DataProcessor#
The first step in any modelling pipeline is to normalise and standardise the data. In DeepSensor, this is achieved with the DataProcessor class. Let's load some environmental data from deepsensor.data.sources and see how it works!
Note
Some of the data downloader functions used here require additional dependencies. To run this yourself, you will need to run:

pip install rioxarray

to install the rioxarray package, and

pip install git+https://github.com/scott-hosking/get-station-data.git

to install the get_station_data package.
import logging
logging.captureWarnings(True)
import xarray as xr
import pandas as pd
# Using the same settings allows us to use pre-downloaded cached data
data_range = ("2015-06-25", "2015-06-30")
extent = "europe"
station_var_IDs = ["TAVG", "PRCP"]
era5_var_IDs = ["2m_temperature", "10m_u_component_of_wind", "10m_v_component_of_wind"]
aux_var_IDs = ["elevation", "tpi"]
cache_dir = "../../.datacache"
from deepsensor.data import DataProcessor
from deepsensor.data.sources import get_ghcnd_station_data, get_era5_reanalysis_data, get_earthenv_auxiliary_data, get_gldas_land_mask
station_raw_df = get_ghcnd_station_data(station_var_IDs, extent, date_range=data_range, cache=True, cache_dir=cache_dir)
era5_raw_ds = get_era5_reanalysis_data(era5_var_IDs, extent, date_range=data_range, cache=True, cache_dir=cache_dir)
aux_raw_ds = get_earthenv_auxiliary_data(aux_var_IDs, extent, "1KM", cache=True, cache_dir=cache_dir)
land_mask_raw_ds = get_gldas_land_mask(extent, cache=True, cache_dir=cache_dir)
100%|████████████████████████████████████████████████████████████████| 3142/3142 [02:42<00:00, 19.33it/s]
Initialising a DataProcessor#
To initialise a DataProcessor object, provide it with the names of the spatiotemporal dimensions in your data (these default to time, x1, and x2).
data_processor = DataProcessor(x1_name="lat", x2_name="lon")
print(data_processor)
DataProcessor with normalisation params:
{'coords': {'time': {'name': 'time'},
'x1': {'map': None, 'name': 'lat'},
'x2': {'map': None, 'name': 'lon'}}}
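Under the hood, the coordinate normalisation is a linear map from the raw latitude/longitude ranges onto the unit interval, with both dimensions sharing a single scale so the spatial aspect ratio is preserved. Here is a minimal sketch of that arithmetic (an illustration, not DeepSensor's actual implementation), assuming the map tuples stored in the config, such as (35.0, 90.0) for x1, hold the (lower, upper) raw-coordinate bounds:

```python
import numpy as np

# Assumption: each coordinate map stores (lower, upper) raw bounds,
# and normalisation is a simple linear rescale onto [0, 1].
lat_map = (35.0, 90.0)   # x1 map, as stored in the DataProcessor config
lon_map = (-15.0, 40.0)  # x2 map

def normalise_coord(values, coord_map):
    lower, upper = coord_map
    return (np.asarray(values) - lower) / (upper - lower)

x1 = normalise_coord([70.0, 35.0], lat_map)   # raw latitudes -> x1
x2 = normalise_coord([-15.0, 40.0], lon_map)  # raw longitudes -> x2
print(x1)  # ~[0.6364, 0.0]
print(x2)  # [0.0, 1.0]
```

These values match the normalised x1/x2 coordinates of the ERA5 dataset in the next section, since the data spans latitudes 35-70 and longitudes -15 to 40.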
Normalising data with a DataProcessor#
Calling a DataProcessor on an xarray or pandas object will compute normalisation parameters for each variable in the object (if they have not already been computed) and return the normalised object(s).
era5_ds = data_processor(era5_raw_ds)
era5_ds
<xarray.Dataset>
Dimensions:                  (time: 6, x1: 141, x2: 221)
Coordinates:
  * time                     (time) datetime64[ns] 2015-06-25 ... 2015-06-30
  * x1                       (x1) float32 0.6364 0.6318 0.6273 ... 0.004545 0.0
  * x2                       (x2) float32 0.0 0.004545 0.009091 ... 0.9955 1.0
Data variables:
    2m_temperature           (time, x1, x2) float32 -2.652 -2.635 ... 2.322
    10m_u_component_of_wind  (time, x1, x2) float32 1.987 1.985 ... 1.572 1.529
    10m_v_component_of_wind  (time, x1, x2) float32 1.054 1.018 ... -0.724
station_df = data_processor(station_raw_df)
station_df
| time | x1 | x2 | station | PRCP | TAVG |
|---|---|---|---|---|---|
| 2015-06-25 | 0.000309 | 0.246364 | AGM00060531 | -0.278121 | 0.759107 |
| | 0.001818 | 0.239091 | AGE00147716 | -0.278121 | 0.836200 |
| | 0.002127 | 0.940909 | SYM00040030 | NaN | 1.221668 |
| | 0.003036 | 0.314855 | AGM00060514 | -0.278121 | 1.318035 |
| | 0.003636 | 0.261509 | AGM00060520 | -0.278121 | 1.125301 |
| ... | ... | ... | ... | ... | ... |
| 2015-06-30 | 0.198782 | 0.412727 | ITM00016052 | NaN | -2.575188 |
| | 0.061218 | 0.263636 | SPM00008359 | -0.278121 | 1.645682 |
| | 0.370600 | 0.940000 | RSM00027611 | -0.278121 | -0.358749 |
| | 0.437818 | 0.597455 | SWE00138750 | -0.278121 | NaN |
| | 0.522909 | 0.498727 | SWE00140158 | -0.134323 | NaN |

16922 rows × 2 columns
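The default "mean_std" method standardises each variable to zero mean and unit standard deviation. A minimal numpy sketch of that transform (an illustration of the maths, not DeepSensor's internals), using the TAVG parameters this DataProcessor computes (visible in its config printed later):

```python
import numpy as np

# mean_std normalisation: x_norm = (x - mean) / std
tavg_mean, tavg_std = 19.0613726868119, 5.188503809362459  # TAVG params from the config

def mean_std_normalise(x, mean, std):
    return (np.asarray(x) - mean) / std

# The first station row: a raw TAVG of 23.0 degC -> ~0.759107
tavg_norm = mean_std_normalise(23.0, tavg_mean, tavg_std)
print(tavg_norm)
```

This reproduces the 0.759107 value in the first row of the normalised station table above.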
You can also process multiple variables in one DataProcessor call, and choose to normalise the data with the "min_max" method, which scales the data to the range [-1, 1]. For example:
aux_ds, land_mask_ds = data_processor([aux_raw_ds, land_mask_raw_ds], method="min_max")
aux_ds
<xarray.Dataset>
Dimensions:    (x1: 4200, x2: 6600)
Coordinates:
  * x1         (x1) float64 0.6363 0.6361 0.636 ... 0.0002273 7.576e-05
  * x2         (x2) float64 7.576e-05 0.0002273 0.0003788 ... 0.9998 0.9999
Data variables:
    elevation  (x1, x2) float32 -0.921 -0.921 -0.921 ... -0.8095 -0.8095 -0.8099
    tpi        (x1, x2) float32 -0.09401 -0.09401 -0.09401 ... -0.09305 -0.09199
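The "min_max" scaling is likewise a simple linear map, this time onto [-1, 1]. A sketch of the arithmetic (not the library's actual implementation), using the elevation parameters from the config printed in the next section:

```python
import numpy as np

# min_max normalisation to [-1, 1]: x_norm = 2 * (x - min) / (max - min) - 1
elev_min, elev_max = -185.125, 4504.4375  # elevation params from the config

def min_max_normalise(x, vmin, vmax):
    return 2 * (np.asarray(x) - vmin) / (vmax - vmin) - 1

# Sea-level elevation (0 m) maps close to the -0.921 values visible above
elev_norm = min_max_normalise(0.0, elev_min, elev_max)
print(elev_norm)
```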
DataProcessor configuration#
The DataProcessor keeps track of the normalisation parameters used to transform the data. When the DataProcessor is called on data whose variable ID matches one in the config dictionary, those stored normalisation parameters are reused.
print(data_processor)
DataProcessor with normalisation params:
{'10m_u_component_of_wind': {'method': 'mean_std',
'params': {'mean': 0.644899845123291,
'std': 2.8509252071380615}},
'10m_v_component_of_wind': {'method': 'mean_std',
'params': {'mean': -0.19969606399536133,
'std': 3.2448606491088867}},
'2m_temperature': {'method': 'mean_std',
'params': {'mean': 289.39849853515625,
'std': 5.538551330566406}},
'GLDAS_mask': {'method': 'min_max', 'params': {'max': 1.0, 'min': 0.0}},
'PRCP': {'method': 'mean_std',
'params': {'mean': 1.1604694953552597, 'std': 4.172541647864038}},
'TAVG': {'method': 'mean_std',
'params': {'mean': 19.0613726868119, 'std': 5.188503809362459}},
'coords': {'time': {'name': 'time'},
'x1': {'map': (35.0, 90.0), 'name': 'lat'},
'x2': {'map': (-15.0, 40.0), 'name': 'lon'}},
'elevation': {'method': 'min_max',
'params': {'max': 4504.4375, 'min': -185.125}},
'tpi': {'method': 'min_max', 'params': {'max': 88.9609375, 'min': -73.671875}}}
Unnormalising data#
Keeping track of the normalisation parameters allows us to easily unnormalise data back to the original coordinates and units:
era5_raw_ds_unnormalised = data_processor.unnormalise(era5_ds)
xr.testing.assert_allclose(era5_raw_ds, era5_raw_ds_unnormalised, atol=1e-6)
era5_raw_ds_unnormalised
<xarray.Dataset>
Dimensions:                  (time: 6, lat: 141, lon: 221)
Coordinates:
  * time                     (time) datetime64[ns] 2015-06-25 ... 2015-06-30
  * lat                      (lat) float32 70.0 69.75 69.5 ... 35.5 35.25 35.0
  * lon                      (lon) float32 -15.0 -14.75 -14.5 ... 39.75 40.0
Data variables:
    2m_temperature           (time, lat, lon) float32 274.7 274.8 ... 302.3
    10m_u_component_of_wind  (time, lat, lon) float32 6.309 6.305 ... 5.004
    10m_v_component_of_wind  (time, lat, lon) float32 3.221 3.105 ... -2.549
station_df_unnormalised = data_processor.unnormalise(station_df)
pd.testing.assert_frame_equal(station_raw_df, station_df_unnormalised)
station_df_unnormalised
| time | lat | lon | station | PRCP | TAVG |
|---|---|---|---|---|---|
| 2015-06-25 | 35.017 | -1.450 | AGM00060531 | 0.0 | 23.0 |
| | 35.100 | -1.850 | AGE00147716 | 0.0 | 23.4 |
| | 35.117 | 36.750 | SYM00040030 | NaN | 25.4 |
| | 35.167 | 2.317 | AGM00060514 | 0.0 | 25.9 |
| | 35.200 | -0.617 | AGM00060520 | 0.0 | 24.9 |
| ... | ... | ... | ... | ... | ... |
| 2015-06-30 | 45.933 | 7.700 | ITM00016052 | NaN | 5.7 |
| | 38.367 | -0.500 | SPM00008359 | 0.0 | 27.6 |
| | 55.383 | 36.700 | RSM00027611 | 0.0 | 17.2 |
| | 59.080 | 17.860 | SWE00138750 | 0.0 | NaN |
| | 63.760 | 12.430 | SWE00140158 | 0.6 | NaN |

16922 rows × 2 columns
Notice how the units of the unnormalised data are the same as the original data.
This functionality will be used under the hood later to map model predictions back to the original units!
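Unnormalising is just the inverse of each linear transform. As a quick sanity check of the arithmetic (a sketch, not DeepSensor's code), inverting mean_std for the TAVG value in the first normalised station row:

```python
# Inverse of mean_std normalisation: x = x_norm * std + mean
tavg_mean, tavg_std = 19.0613726868119, 5.188503809362459  # TAVG params from the config

def mean_std_unnormalise(x_norm, mean, std):
    return x_norm * std + mean

# 0.759107 was the normalised TAVG for station AGM00060531 on 2015-06-25
tavg_raw = mean_std_unnormalise(0.759107, tavg_mean, tavg_std)
print(round(tavg_raw, 1))  # -> 23.0, the original value in degC
```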
Saving and loading a DataProcessor#
The DataProcessor configuration can be saved and loaded to avoid re-computing the normalisation parameters in new sessions. Use the save method to write the DataProcessor configuration to a folder; instantiating a new DataProcessor with that folder path will then recover the same DataProcessor object.
data_processor.save("../deepsensor_config/")
data_processor2 = DataProcessor("../deepsensor_config/")
print(data_processor2)
DataProcessor with normalisation params:
{'10m_u_component_of_wind': {'method': 'mean_std',
'params': {'mean': 0.644899845123291,
'std': 2.8509252071380615}},
'10m_v_component_of_wind': {'method': 'mean_std',
'params': {'mean': -0.19969606399536133,
'std': 3.2448606491088867}},
'2m_temperature': {'method': 'mean_std',
'params': {'mean': 289.39849853515625,
'std': 5.538551330566406}},
'GLDAS_mask': {'method': 'min_max', 'params': {'max': 1.0, 'min': 0.0}},
'PRCP': {'method': 'mean_std',
'params': {'mean': 1.1604694953552597, 'std': 4.172541647864038}},
'TAVG': {'method': 'mean_std',
'params': {'mean': 19.0613726868119, 'std': 5.188503809362459}},
'coords': {'time': {'name': 'time'},
'x1': {'map': (35.0, 90.0), 'name': 'lat'},
'x2': {'map': (-15.0, 40.0), 'name': 'lon'}},
'elevation': {'method': 'min_max',
'params': {'max': 4504.4375, 'min': -185.125}},
'tpi': {'method': 'min_max', 'params': {'max': 88.9609375, 'min': -73.671875}}}
Computing normalisation parameters over a subset of the data#
Want to compute normalisation parameters only over a training period? No problem: just slice the data before passing it to the DataProcessor.
_ = data_processor(era5_raw_ds.sel(time=slice("2015-06-25", "2015-06-27")))
era5_ds = data_processor(era5_raw_ds)  # Reuses the normalisation parameters computed above on the training slice