DataProcessor#

The first step in any modelling pipeline is to normalise and standardise the data. In DeepSensor, this is achieved with the DataProcessor class. Let’s load some environmental data from deepsensor.data.sources and see how it works!

Note

Some of the data downloader functions used here require additional dependencies. To run this yourself, you will need to run:

pip install rioxarray

to install the rioxarray package and

pip install git+https://github.com/scott-hosking/get-station-data.git

to install the get_station_data package.

import logging

logging.captureWarnings(True)

import pandas as pd
import xarray as xr

from deepsensor.data import DataProcessor
from deepsensor.data.sources import (
    get_earthenv_auxiliary_data,
    get_era5_reanalysis_data,
    get_ghcnd_station_data,
    get_gldas_land_mask,
)

# Using the same settings allows us to use pre-downloaded cached data
data_range = ("2015-06-25", "2015-06-30")
extent = "europe"
station_var_IDs = ["TAVG", "PRCP"]
era5_var_IDs = ["2m_temperature", "10m_u_component_of_wind", "10m_v_component_of_wind"]
aux_var_IDs = ["elevation", "tpi"]
cache_dir = "../../.datacache"

station_raw_df = get_ghcnd_station_data(
    station_var_IDs, extent, date_range=data_range, cache=True, cache_dir=cache_dir
)
era5_raw_ds = get_era5_reanalysis_data(
    era5_var_IDs, extent, date_range=data_range, cache=True, cache_dir=cache_dir
)
aux_raw_ds = get_earthenv_auxiliary_data(
    aux_var_IDs, extent, "1KM", cache=True, cache_dir=cache_dir
)
land_mask_raw_ds = get_gldas_land_mask(extent, cache=True, cache_dir=cache_dir)
100%|████████████████████████████████████████████████████████████████| 3142/3142 [02:42<00:00, 19.33it/s]

Initialising a DataProcessor#

To initialise a DataProcessor object, provide it with the names of the spatiotemporal dimensions in your data (defaults to time, x1, x2).

data_processor = DataProcessor(x1_name="lat", x2_name="lon")
print(data_processor)
DataProcessor with normalisation params:
{'coords': {'time': {'name': 'time'},
            'x1': {'map': None, 'name': 'lat'},
            'x2': {'map': None, 'name': 'lon'}}}
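
If you already know the spatial extent of your data, you can also fix the coordinate mapping up front instead of letting it be inferred from the first object you process. The sketch below assumes the x1_map/x2_map keyword arguments of DataProcessor; the (min, max) values match the 'map' entries that appear in the config output later on this page.

# Hedged sketch (x1_map/x2_map are assumed keyword arguments): fix the
# linear map from raw coordinates onto the normalised [0, 1] range up
# front, using known (min, max) extents, rather than inferring it from
# the first dataset processed
data_processor_fixed = DataProcessor(
    x1_name="lat", x1_map=(35.0, 90.0),
    x2_name="lon", x2_map=(-15.0, 40.0),
)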

Normalising data with a DataProcessor#

Calling a DataProcessor on an xarray or pandas object computes normalisation parameters for each variable in the object (if not already computed) and returns the normalised object (or objects).

era5_ds = data_processor(era5_raw_ds)
era5_ds
<xarray.Dataset>
Dimensions:                  (time: 6, x1: 141, x2: 221)
Coordinates:
  * time                     (time) datetime64[ns] 2015-06-25 ... 2015-06-30
  * x1                       (x1) float32 0.6364 0.6318 0.6273 ... 0.004545 0.0
  * x2                       (x2) float32 0.0 0.004545 0.009091 ... 0.9955 1.0
Data variables:
    2m_temperature           (time, x1, x2) float32 -2.652 -2.635 ... 2.322
    10m_u_component_of_wind  (time, x1, x2) float32 1.987 1.985 ... 1.572 1.529
    10m_v_component_of_wind  (time, x1, x2) float32 1.054 1.018 ... -0.724
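
The default method is "mean_std", which subtracts the per-variable mean and divides by the standard deviation. As a quick sanity check, here is a minimal sketch reproducing the transform by hand (not part of the DeepSensor API):

import numpy as np

# Minimal sketch: "mean_std" normalisation is (x - mean) / std, with the
# statistics computed over all dimensions of the variable
mean = float(era5_raw_ds["2m_temperature"].mean())
std = float(era5_raw_ds["2m_temperature"].std())
manual = (era5_raw_ds["2m_temperature"] - mean) / std

# Compare raw values (the coordinates differ: lat/lon vs normalised x1/x2)
np.testing.assert_allclose(manual.values, era5_ds["2m_temperature"].values, atol=1e-4)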
station_df = data_processor(station_raw_df)
station_df
                                              PRCP      TAVG
time       x1       x2       station
2015-06-25 0.000309 0.246364 AGM00060531 -0.278121  0.759107
           0.001818 0.239091 AGE00147716 -0.278121  0.836200
           0.002127 0.940909 SYM00040030       NaN  1.221668
           0.003036 0.314855 AGM00060514 -0.278121  1.318035
           0.003636 0.261509 AGM00060520 -0.278121  1.125301
...                                            ...       ...
2015-06-30 0.198782 0.412727 ITM00016052       NaN -2.575188
           0.061218 0.263636 SPM00008359 -0.278121  1.645682
           0.370600 0.940000 RSM00027611 -0.278121 -0.358749
           0.437818 0.597455 SWE00138750 -0.278121       NaN
           0.522909 0.498727 SWE00140158 -0.134323       NaN

[16922 rows x 2 columns]

You can also process multiple data objects in a single DataProcessor call, and you can choose to normalise with the "min_max" method, which scales the data to the range [-1, 1]. For example:

aux_ds, land_mask_ds = data_processor([aux_raw_ds, land_mask_raw_ds], method="min_max")
aux_ds
<xarray.Dataset>
Dimensions:    (x1: 4200, x2: 6600)
Coordinates:
  * x1         (x1) float64 0.6363 0.6361 0.636 ... 0.0002273 7.576e-05
  * x2         (x2) float64 7.576e-05 0.0002273 0.0003788 ... 0.9998 0.9999
Data variables:
    elevation  (x1, x2) float32 -0.921 -0.921 -0.921 ... -0.8095 -0.8095 -0.8099
    tpi        (x1, x2) float32 -0.09401 -0.09401 -0.09401 ... -0.09305 -0.09199
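
The "min_max" method maps [min, max] onto [-1, 1], i.e. x_norm = 2 * (x - min) / (max - min) - 1. Here is a minimal sketch reproducing that transform by hand (inferred from the [-1, 1] range stated above, not DeepSensor's exact code):

# Minimal sketch of the "min_max" transform: map [min, max] onto [-1, 1]
elev = aux_raw_ds["elevation"]
manual = 2 * (elev - elev.min()) / (elev.max() - elev.min()) - 1
print(float(manual.min()), float(manual.max()))  # -1.0 1.0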

DataProcessor configuration#

The DataProcessor keeps track of the normalisation parameters used to transform the data.

When the DataProcessor is called on data whose variable ID matches an entry in its config dictionary, the stored normalisation parameters are reused rather than recomputed.

print(data_processor)
DataProcessor with normalisation params:
{'10m_u_component_of_wind': {'method': 'mean_std',
                             'params': {'mean': 0.644899845123291,
                                        'std': 2.8509252071380615}},
 '10m_v_component_of_wind': {'method': 'mean_std',
                             'params': {'mean': -0.19969606399536133,
                                        'std': 3.2448606491088867}},
 '2m_temperature': {'method': 'mean_std',
                    'params': {'mean': 289.39849853515625,
                               'std': 5.538551330566406}},
 'GLDAS_mask': {'method': 'min_max', 'params': {'max': 1.0, 'min': 0.0}},
 'PRCP': {'method': 'mean_std',
          'params': {'mean': 1.1604694953552597, 'std': 4.172541647864038}},
 'TAVG': {'method': 'mean_std',
          'params': {'mean': 19.0613726868119, 'std': 5.188503809362459}},
 'coords': {'time': {'name': 'time'},
            'x1': {'map': (35.0, 90.0), 'name': 'lat'},
            'x2': {'map': (-15.0, 40.0), 'name': 'lon'}},
 'elevation': {'method': 'min_max',
               'params': {'max': 4504.4375, 'min': -185.125}},
 'tpi': {'method': 'min_max', 'params': {'max': 88.9609375, 'min': -73.671875}}}
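
The parameters are stored in a plain dictionary, so individual entries can also be read programmatically. This assumes a config attribute exposes the dictionary printed above:

# `config` is assumed to be the attribute backing the printout above
print(data_processor.config["2m_temperature"]["params"]["mean"])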

Unnormalising data#

Keeping track of the normalisation parameters allows us to easily unnormalise data back to the original coordinates and units:

era5_raw_ds_unnormalised = data_processor.unnormalise(era5_ds)
xr.testing.assert_allclose(era5_raw_ds, era5_raw_ds_unnormalised, atol=1e-6)
era5_raw_ds_unnormalised
<xarray.Dataset>
Dimensions:                  (time: 6, lat: 141, lon: 221)
Coordinates:
  * time                     (time) datetime64[ns] 2015-06-25 ... 2015-06-30
  * lat                      (lat) float32 70.0 69.75 69.5 ... 35.5 35.25 35.0
  * lon                      (lon) float32 -15.0 -14.75 -14.5 ... 39.75 40.0
Data variables:
    2m_temperature           (time, lat, lon) float32 274.7 274.8 ... 302.3
    10m_u_component_of_wind  (time, lat, lon) float32 6.309 6.305 ... 5.004
    10m_v_component_of_wind  (time, lat, lon) float32 3.221 3.105 ... -2.549
station_df_unnormalised = data_processor.unnormalise(station_df)
pd.testing.assert_frame_equal(station_raw_df, station_df_unnormalised)
station_df_unnormalised
                                        PRCP  TAVG
time       lat     lon     station
2015-06-25 35.017  -1.450  AGM00060531   0.0  23.0
           35.100  -1.850  AGE00147716   0.0  23.4
           35.117  36.750  SYM00040030   NaN  25.4
           35.167   2.317  AGM00060514   0.0  25.9
           35.200  -0.617  AGM00060520   0.0  24.9
...                                      ...   ...
2015-06-30 45.933   7.700  ITM00016052   NaN   5.7
           38.367  -0.500  SPM00008359   0.0  27.6
           55.383  36.700  RSM00027611   0.0  17.2
           59.080  17.860  SWE00138750   0.0   NaN
           63.760  12.430  SWE00140158   0.6   NaN

[16922 rows x 2 columns]

Notice that the coordinates and units of the unnormalised data match those of the original data.

This functionality will be used under the hood later to map model predictions back to the original units!
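
One detail worth noting: a predicted standard deviation should only be rescaled by the std parameter, without the mean being added back. The sketch below assumes unnormalise accepts an add_offset flag for this; era5_ds stands in for a normalised uncertainty field purely for illustration.

# Hedged sketch (add_offset is an assumed keyword of unnormalise): undo
# only the scale factor, not the mean offset, as appropriate for a
# standard-deviation field
scale_only = data_processor.unnormalise(era5_ds, add_offset=False)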

Saving and loading a DataProcessor#

The DataProcessor configuration can be saved and loaded to avoid re-computing the normalisation parameters in new sessions.

Use the save method to save the DataProcessor configuration to a folder; instantiating a new DataProcessor with that folder path then recovers the same DataProcessor object.

data_processor.save("../deepsensor_config/")
data_processor2 = DataProcessor("../deepsensor_config/")
print(data_processor2)
DataProcessor with normalisation params:
{'10m_u_component_of_wind': {'method': 'mean_std',
                             'params': {'mean': 0.644899845123291,
                                        'std': 2.8509252071380615}},
 '10m_v_component_of_wind': {'method': 'mean_std',
                             'params': {'mean': -0.19969606399536133,
                                        'std': 3.2448606491088867}},
 '2m_temperature': {'method': 'mean_std',
                    'params': {'mean': 289.39849853515625,
                               'std': 5.538551330566406}},
 'GLDAS_mask': {'method': 'min_max', 'params': {'max': 1.0, 'min': 0.0}},
 'PRCP': {'method': 'mean_std',
          'params': {'mean': 1.1604694953552597, 'std': 4.172541647864038}},
 'TAVG': {'method': 'mean_std',
          'params': {'mean': 19.0613726868119, 'std': 5.188503809362459}},
 'coords': {'time': {'name': 'time'},
            'x1': {'map': (35.0, 90.0), 'name': 'lat'},
            'x2': {'map': (-15.0, 40.0), 'name': 'lon'}},
 'elevation': {'method': 'min_max',
               'params': {'max': 4504.4375, 'min': -185.125}},
 'tpi': {'method': 'min_max', 'params': {'max': 88.9609375, 'min': -73.671875}}}

Computing normalisation parameters over a subset of the data#

Want to compute normalisation parameters only over a training period? No problem: just slice the data before passing it to the DataProcessor.

_ = data_processor(era5_raw_ds.sel(time=slice("2015-06-25", "2015-06-27")))
era5_ds = data_processor(era5_raw_ds)  # Will reuse the normalisation parameters computed above, even though it is called on the full dataset
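
Since parameters are computed once per variable ID and reused on every subsequent call, recomputing them over a different period means starting from a fresh DataProcessor. A minimal sketch:

# Minimal sketch: a fresh DataProcessor computes its parameters from the
# first (sliced) data it sees, then reuses them on the full dataset
data_processor_train = DataProcessor(x1_name="lat", x2_name="lon")
_ = data_processor_train(era5_raw_ds.sel(time=slice("2015-06-25", "2015-06-27")))
era5_ds_train = data_processor_train(era5_raw_ds)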