from typing import Union
import numpy as np
import pandas as pd
import scipy
import xarray as xr
[docs]def construct_x1x2_ds(gridded_ds):
"""Construct an :class:`xarray.Dataset` containing two vars, where each var is
a 2D gridded channel whose values contain the x_1 and x_2 coordinate
values, respectively.
Args:
gridded_ds (:class:`xarray.Dataset`):
...
Returns:
:class:`xarray.Dataset`
...
"""
X1, X2 = np.meshgrid(gridded_ds.x1, gridded_ds.x2, indexing="ij")
ds = xr.Dataset(
coords={"x1": gridded_ds.x1, "x2": gridded_ds.x2},
data_vars={"x1_arr": (("x1", "x2"), X1), "x2_arr": (("x1", "x2"), X2)},
)
return ds
[docs]def construct_circ_time_ds(dates, freq):
"""Return an :class:`xarray.Dataset` containing a circular variable for time.
The ``freq`` entry dictates the frequency of cycling of the circular
variable. E.g.:
- ``'H'``: cycles once per day at hourly intervals
- ``'D'``: cycles once per year at daily intervals
- ``'M'``: cycles once per year at monthly intervals
Args:
dates (...):
...
freq (...):
...
Returns:
:class:`xarray.Dataset`
...
"""
# Ensure dates are pandas
dates = pd.DatetimeIndex(dates)
if freq == "D":
time_var = dates.dayofyear
mod = 365.25
elif freq == "H":
time_var = dates.hour
mod = 24
elif freq == "M":
time_var = dates.month
mod = 12
else:
raise ValueError("Circular time variable not implemented for this frequency.")
cos_time = np.cos(2 * np.pi * time_var / mod)
sin_time = np.sin(2 * np.pi * time_var / mod)
ds = xr.Dataset(
coords={"time": dates},
data_vars={
f"cos_{freq}": ("time", cos_time),
f"sin_{freq}": ("time", sin_time),
},
)
return ds
[docs]def compute_xarray_data_resolution(ds: Union[xr.DataArray, xr.Dataset]) -> float:
"""Computes the resolution of an xarray object with coordinates x1 and x2.
The data resolution is the finer of the two coordinate resolutions (x1 and
x2). For example, if x1 has a resolution of 0.1 degrees and x2 has a
resolution of 0.2 degrees, the data resolution returned will be 0.1
degrees.
Args:
ds (:class:`xarray.DataArray` | :class:`xarray.Dataset`):
Xarray object with coordinates x1 and x2.
Returns:
float: Resolution of the data (in spatial units, e.g. 0.1 degrees).
"""
x1_res = np.abs(np.mean(np.diff(ds["x1"])))
x2_res = np.abs(np.mean(np.diff(ds["x2"])))
data_resolution = np.min([x1_res, x2_res])
return data_resolution
[docs]def compute_pandas_data_resolution(
df: Union[pd.DataFrame, pd.Series],
n_times: int = 1000,
percentile: int = 5,
) -> float:
"""Approximates the resolution of non-gridded pandas data with indexes time,
x1, and x2.
The resolution is approximated as the Nth percentile of the distances
between neighbouring observations, possibly using a subset of the dates in
the data. The default is to use 1000 dates (or all dates if there are fewer
than 1000) and to use the 5th percentile. This means that the resolution is
the distance between the closest 5% of neighbouring observations.
Args:
df (:class:`pandas.DataFrame` | :class:`pandas.Series`):
Dataframe or series with indexes time, x1, and x2.
n_times (int, optional):
Number of dates to sample. Defaults to 1000. If "all", all dates
are used.
percentile (int, optional):
Percentile of pairwise distances for computing the resolution.
Defaults to 5.
Returns:
float: Resolution of the data (in spatial units, e.g. 0.1 degrees).
"""
dates = df.index.get_level_values("time").unique()
if n_times != "all" and len(dates) > n_times:
rng = np.random.default_rng(42)
dates = rng.choice(dates, size=n_times, replace=False)
closest_distances = []
df = df.reset_index().set_index("time")
for time in dates:
df_t = df.loc[[time]]
X = df_t[["x1", "x2"]].values # (N, 2) array of coordinates
if X.shape[0] < 2:
# Skip this time if there are fewer than 2 stationS
continue
X_unique = np.unique(X, axis=0) # (N_unique, 2) array of unique coordinates
pairwise_distances = scipy.spatial.distance.cdist(X_unique, X_unique)
percentile_distances_without_self = np.ma.masked_equal(pairwise_distances, 0)
# Compute the closest distance from each station to each other station
closest_distances_t = np.min(percentile_distances_without_self, axis=1)
closest_distances.extend(closest_distances_t)
data_resolution = np.percentile(closest_distances, percentile)
return data_resolution