DataProcessor#
The first step in any modelling pipeline is to normalise and standardise the data.
In DeepSensor, this is achieved with the DataProcessor class.
Let’s load some environmental data from deepsensor.data.sources and see how it works!
Note
Some of the data downloader functions used here require additional dependencies. To run this yourself you will need to run:
pip install rioxarray
to install the rioxarray package and
pip install git+https://github.com/scott-hosking/get-station-data.git
to install the get_station_data package.
from deepsensor.data import DataProcessor
from deepsensor.data.sources import get_ghcnd_station_data, get_era5_reanalysis_data, get_earthenv_auxiliary_data, get_gldas_land_mask

# Variable IDs, extent, date range, and cache directory (values chosen to match the outputs below)
station_var_IDs = ["TAVG", "PRCP"]
era5_var_IDs = ["2m_temperature", "10m_u_component_of_wind", "10m_v_component_of_wind"]
aux_var_IDs = ["elevation", "tpi"]
extent = "europe"
data_range = ("2015-06-25", "2015-06-30")
cache_dir = ".datacache"

station_raw_df = get_ghcnd_station_data(station_var_IDs, extent, date_range=data_range, cache=True, cache_dir=cache_dir)
era5_raw_ds = get_era5_reanalysis_data(era5_var_IDs, extent, date_range=data_range, cache=True, cache_dir=cache_dir)
aux_raw_ds = get_earthenv_auxiliary_data(aux_var_IDs, extent, "1KM", cache=True, cache_dir=cache_dir)
land_mask_raw_ds = get_gldas_land_mask(extent, cache=True, cache_dir=cache_dir)
Initialising a DataProcessor#
To initialise a DataProcessor object, provide it with the names of the spatiotemporal dimensions in your data (defaults to time, x1, x2).
data_processor = DataProcessor(x1_name="lat", x2_name="lon")
print(data_processor)
DataProcessor with normalisation params:
{'coords': {'time': {'name': 'time'},
'x1': {'map': None, 'name': 'lat'},
'x2': {'map': None, 'name': 'lon'}}}
Normalising data with a DataProcessor#
Calling a DataProcessor on an xarray or pandas object computes normalisation parameters for each variable in the object (if not already computed) and returns the normalised object(s).
era5_ds = data_processor(era5_raw_ds)
era5_ds
<xarray.Dataset>
Dimensions: (time: 6, x1: 141, x2: 221)
Coordinates:
* time (time) datetime64[ns] 2015-06-25 ... 2015-06-30
* x1 (x1) float32 0.6364 0.6318 0.6273 ... 0.004545 0.0
* x2 (x2) float32 0.0 0.004545 0.009091 ... 0.9955 1.0
Data variables:
2m_temperature (time, x1, x2) float32 -2.652 -2.635 ... 2.322
10m_u_component_of_wind (time, x1, x2) float32 1.987 1.985 ... 1.572 1.529
10m_v_component_of_wind (time, x1, x2) float32 1.054 1.018 ... -0.724
station_df = data_processor(station_raw_df)
station_df
| time | x1 | x2 | station | PRCP | TAVG |
|---|---|---|---|---|---|
| 2015-06-25 | 0.000309 | 0.246364 | AGM00060531 | -0.278121 | 0.759107 |
| | 0.001818 | 0.239091 | AGE00147716 | -0.278121 | 0.836200 |
| | 0.002127 | 0.940909 | SYM00040030 | NaN | 1.221668 |
| | 0.003036 | 0.314855 | AGM00060514 | -0.278121 | 1.318035 |
| | 0.003636 | 0.261509 | AGM00060520 | -0.278121 | 1.125301 |
| ... | ... | ... | ... | ... | ... |
| 2015-06-30 | 0.198782 | 0.412727 | ITM00016052 | NaN | -2.575188 |
| | 0.061218 | 0.263636 | SPM00008359 | -0.278121 | 1.645682 |
| | 0.370600 | 0.940000 | RSM00027611 | -0.278121 | -0.358749 |
| | 0.437818 | 0.597455 | SWE00138750 | -0.278121 | NaN |
| | 0.522909 | 0.498727 | SWE00140158 | -0.134323 | NaN |
16922 rows × 2 columns
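The default "mean_std" method is a standard z-score transform. Here is a minimal NumPy sketch of the idea (an illustration of the maths, not DeepSensor's implementation; the data values are synthetic):

```python
import numpy as np

# Synthetic "raw" data standing in for a variable like TAVG
raw = np.array([23.0, 23.4, 25.4, 25.9, 24.9])

# mean_std normalisation: subtract the mean, divide by the standard deviation
mean, std = raw.mean(), raw.std()
normalised = (raw - mean) / std

# The normalised data has (approximately) zero mean and unit standard deviation
print(normalised.mean(), normalised.std())
```

Unnormalising is just the inverse affine map, `normalised * std + mean`.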
You can also process multiple variables in one DataProcessor call, and choose to normalise the data with "min_max", which will scale the data to the range [-1, 1]. For example:
aux_ds, land_mask_ds = data_processor([aux_raw_ds, land_mask_raw_ds], method="min_max")
aux_ds
<xarray.Dataset>
Dimensions: (x1: 4200, x2: 6600)
Coordinates:
* x1 (x1) float64 0.6363 0.6361 0.636 ... 0.0002273 7.576e-05
* x2 (x2) float64 7.576e-05 0.0002273 0.0003788 ... 0.9998 0.9999
Data variables:
elevation (x1, x2) float32 -0.921 -0.921 -0.921 ... -0.8095 -0.8095 -0.8099
tpi        (x1, x2) float32 -0.09401 -0.09401 -0.09401 ... -0.09305 -0.09199
DataProcessor configuration#
The DataProcessor keeps track of the normalisation parameters used to transform the data.
When the DataProcessor is called on data with a variable ID that matches one in the config dictionary, those normalisation parameters are used.
print(data_processor)
DataProcessor with normalisation params:
{'10m_u_component_of_wind': {'method': 'mean_std',
'params': {'mean': 0.644899845123291,
'std': 2.8509252071380615}},
'10m_v_component_of_wind': {'method': 'mean_std',
'params': {'mean': -0.19969606399536133,
'std': 3.2448606491088867}},
'2m_temperature': {'method': 'mean_std',
'params': {'mean': 289.39849853515625,
'std': 5.538551330566406}},
'GLDAS_mask': {'method': 'min_max', 'params': {'max': 1.0, 'min': 0.0}},
'PRCP': {'method': 'mean_std',
'params': {'mean': 1.1604694953552597, 'std': 4.172541647864038}},
'TAVG': {'method': 'mean_std',
'params': {'mean': 19.0613726868119, 'std': 5.188503809362459}},
'coords': {'time': {'name': 'time'},
'x1': {'map': (35.0, 90.0), 'name': 'lat'},
'x2': {'map': (-15.0, 40.0), 'name': 'lon'}},
'elevation': {'method': 'min_max',
'params': {'max': 4504.4375, 'min': -185.125}},
'tpi': {'method': 'min_max', 'params': {'max': 88.9609375, 'min': -73.671875}}}
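To see how the "min_max" parameters are applied, here is a sketch of the [-1, 1] scaling using the elevation min/max from the config above (a reimplementation for illustration, not DeepSensor's code):

```python
# min/max parameters for "elevation" from the config above
vmin, vmax = -185.125, 4504.4375

def min_max_normalise(x):
    # Linearly map [vmin, vmax] onto [-1, 1]
    return 2 * (x - vmin) / (vmax - vmin) - 1

def min_max_unnormalise(x_norm):
    # Inverse map: [-1, 1] back onto [vmin, vmax]
    return (x_norm + 1) / 2 * (vmax - vmin) + vmin

print(min_max_normalise(vmin), min_max_normalise(vmax))  # endpoints map to -1.0 and 1.0
```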
Unnormalising data#
Keeping track of the normalisation parameters allows us to easily unnormalise data back to the original coordinates and units:
import xarray as xr

era5_raw_ds_unnormalised = data_processor.unnormalise(era5_ds)
xr.testing.assert_allclose(era5_raw_ds, era5_raw_ds_unnormalised, atol=1e-6)
era5_raw_ds_unnormalised
<xarray.Dataset>
Dimensions: (time: 6, lat: 141, lon: 221)
Coordinates:
* time (time) datetime64[ns] 2015-06-25 ... 2015-06-30
* lat (lat) float32 70.0 69.75 69.5 ... 35.5 35.25 35.0
* lon (lon) float32 -15.0 -14.75 -14.5 ... 39.75 40.0
Data variables:
2m_temperature (time, lat, lon) float32 274.7 274.8 ... 302.3
10m_u_component_of_wind (time, lat, lon) float32 6.309 6.305 ... 5.004
10m_v_component_of_wind (time, lat, lon) float32 3.221 3.105 ... -2.549
import pandas as pd

station_df_unnormalised = data_processor.unnormalise(station_df)
pd.testing.assert_frame_equal(station_raw_df, station_df_unnormalised)
station_df_unnormalised
| time | lat | lon | station | PRCP | TAVG |
|---|---|---|---|---|---|
| 2015-06-25 | 35.017 | -1.450 | AGM00060531 | 0.0 | 23.0 |
| | 35.100 | -1.850 | AGE00147716 | 0.0 | 23.4 |
| | 35.117 | 36.750 | SYM00040030 | NaN | 25.4 |
| | 35.167 | 2.317 | AGM00060514 | 0.0 | 25.9 |
| | 35.200 | -0.617 | AGM00060520 | 0.0 | 24.9 |
| ... | ... | ... | ... | ... | ... |
| 2015-06-30 | 45.933 | 7.700 | ITM00016052 | NaN | 5.7 |
| | 38.367 | -0.500 | SPM00008359 | 0.0 | 27.6 |
| | 55.383 | 36.700 | RSM00027611 | 0.0 | 17.2 |
| | 59.080 | 17.860 | SWE00138750 | 0.0 | NaN |
| | 63.760 | 12.430 | SWE00140158 | 0.6 | NaN |
16922 rows × 2 columns
Notice how the units of the unnormalised data are the same as the original data.
This functionality will be used under the hood later to map model predictions back to the original units!
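The coordinate mapping works the same way: judging by the 'map' entries in the config ((35.0, 90.0) for lat, (-15.0, 40.0) for lon), raw coordinates appear to be linearly mapped onto the unit interval. A quick sketch of that assumption:

```python
# Coordinate maps from the DataProcessor config above
lat_min, lat_max = 35.0, 90.0
lon_min, lon_max = -15.0, 40.0

def to_x1(lat):
    # Linear map of latitude onto [0, 1]
    return (lat - lat_min) / (lat_max - lat_min)

def to_x2(lon):
    # Linear map of longitude onto [0, 1]
    return (lon - lon_min) / (lon_max - lon_min)

# lat 70.0 -> x1 ~0.6364 and lon -15.0 -> x2 0.0, matching the
# normalised coordinates in the era5_ds output above
print(round(to_x1(70.0), 4), to_x2(-15.0))
```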
Saving and loading a DataProcessor#
The DataProcessor configuration can be saved and loaded to avoid re-computing the normalisation parameters in new sessions.
Use the save method to write the DataProcessor configuration to a folder; instantiating a new DataProcessor with that folder then recovers the same DataProcessor object.
data_processor.save("../deepsensor_config/")
data_processor2 = DataProcessor("../deepsensor_config/")
print(data_processor2)
DataProcessor with normalisation params:
{'10m_u_component_of_wind': {'method': 'mean_std',
'params': {'mean': 0.644899845123291,
'std': 2.8509252071380615}},
'10m_v_component_of_wind': {'method': 'mean_std',
'params': {'mean': -0.19969606399536133,
'std': 3.2448606491088867}},
'2m_temperature': {'method': 'mean_std',
'params': {'mean': 289.39849853515625,
'std': 5.538551330566406}},
'GLDAS_mask': {'method': 'min_max', 'params': {'max': 1.0, 'min': 0.0}},
'PRCP': {'method': 'mean_std',
'params': {'mean': 1.1604694953552597, 'std': 4.172541647864038}},
'TAVG': {'method': 'mean_std',
'params': {'mean': 19.0613726868119, 'std': 5.188503809362459}},
'coords': {'time': {'name': 'time'},
'x1': {'map': (35.0, 90.0), 'name': 'lat'},
'x2': {'map': (-15.0, 40.0), 'name': 'lon'}},
'elevation': {'method': 'min_max',
'params': {'max': 4504.4375, 'min': -185.125}},
'tpi': {'method': 'min_max', 'params': {'max': 88.9609375, 'min': -73.671875}}}
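The idea behind save/load can be sketched with plain JSON: persist the parameter dictionary to disk, then read it back (this only illustrates the pattern; DeepSensor's on-disk format may differ, and the file name here is hypothetical):

```python
import json
import os
import tempfile

# A toy normalisation config, persisted and reloaded
config = {"2m_temperature": {"method": "mean_std", "params": {"mean": 289.4, "std": 5.54}}}

folder = tempfile.mkdtemp()
path = os.path.join(folder, "config.json")  # hypothetical file name
with open(path, "w") as f:
    json.dump(config, f)

with open(path) as f:
    reloaded = json.load(f)

assert reloaded == config  # the reloaded parameters match exactly
```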
Computing normalisation parameters over a subset of the data#
Want to compute normalisation parameters only over a training period?
No problem: just slice the data before passing it to the DataProcessor.
_ = data_processor(era5_raw_ds.sel(time=slice("2015-06-25", "2015-06-27")))
era5_ds = data_processor(era5_raw_ds)  # Reuses the normalisation parameters computed above from the training slice
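This works because the parameters are computed once and then frozen. A NumPy sketch of the pattern (illustrative, not DeepSensor code):

```python
import numpy as np

rng = np.random.default_rng(0)
full = rng.normal(loc=10.0, scale=2.0, size=100)
train = full[:60]  # analogous to slicing out a training period

# Compute parameters on the training subset only...
mean, std = train.mean(), train.std()

# ...then apply those frozen parameters to the full dataset
full_normalised = (full - mean) / std

# The training portion is exactly standardised; the rest reuses the same params
print(full_normalised[:60].mean(), full_normalised[:60].std())
```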