Module sarra_py.data_preparation

View Source
from os import listdir

from os.path import isfile, join

import pandas as pd

import datetime

import rasterio

import os

import rioxarray

from tqdm import tqdm as tqdm

import numpy as np

import yaml

import xarray as xr

import astral

from astral.sun import sun

from astral import LocationInfo

def build_rainfall_files_df(rainfall_path, date_start, duration):

    """

    This function builds a dataframe containing the list of rainfall files

    from the provided path, and the given date_start and duration.

    Helper function used in get_grid_size() and load_TAMSAT_data().

    Args:

        rainfall_path (_type_): _description_

    Returns:

        _type_: _description_

    """

    #rainfall_files = [f for f in listdir(rainfall_path) if isfile(join(rainfall_path, f))]

    # version that should be quicker

    rainfall_files = [f for f in listdir(rainfall_path)]# if isfile(join(rainfall_path, f))]

    rainfall_files_df = pd.DataFrame({"filename":rainfall_files}).sort_values("filename").reset_index(drop=True)

    rainfall_files_df["date"] = rainfall_files_df.apply(

        lambda x: datetime.date(

            int(x["filename"].replace(".tif","").split("_")[-3]),

            int(x["filename"].replace(".tif","").split("_")[-2]),

            int(x["filename"].replace(".tif","").split("_")[-1]),

        ),

        axis=1,

    )

    rainfall_files_df = rainfall_files_df[(rainfall_files_df["date"]>=date_start) & (rainfall_files_df["date"]<date_start+datetime.timedelta(days=duration))].reset_index(drop=True)

    return rainfall_files_df

def get_grid_size(rainfall_path, date_start, duration):

    """

    This function loads the list of rainfall files corresponding to the given

    date_start and duration, loads the first rainfall file, and returns its grid

    size, as dimensions of the rainfall grid define the output resolution of the

    model.

    Args:

        rainfall_path (_type_): _description_

        date_start (_type_): _description_

        duration (_type_): _description_

    Returns:

        _type_: _description_

    """

    rainfall_files_df = build_rainfall_files_df(rainfall_path, date_start, duration)

    # checking coherence between the requested date range and the available rainfall data

    if rainfall_files_df["date"].iloc[-1] != date_start+datetime.timedelta(days=duration-1) :

        raise ValueError("The date range may not be covered by the available rainfall data ; please check rainfall entry files.")

    # loading the first rainfall file to get the grid size

    src = rasterio.open(os.path.join(rainfall_path,rainfall_files_df.loc[0,"filename"]))

    array = src.read(1)

    grid_width = array.shape[0]

    grid_height = array.shape[1]

    return grid_width, grid_height

def load_TAMSAT_data(data, TAMSAT_path, date_start, duration):

    """

    This function loops over the rainfall raster files, and loads them into a

    xarray DataArray, which is then added to the rain data dictionary. It is

    tailored to the TAMSAT rainfall data files, hence its name.

    Args:

        data (_type_): _description_

        TAMSAT_path (_type_): _description_

        date_start (_type_): _description_

        duration (_type_): _description_

    Returns:

        _type_: _description_

    """

    TAMSAT_files_df = build_rainfall_files_df(TAMSAT_path, date_start, duration)

    for i in range(len(TAMSAT_files_df)):

        dataarray = rioxarray.open_rasterio(os.path.join(TAMSAT_path,TAMSAT_files_df.loc[i,"filename"]))

        dataarray = dataarray.squeeze("band").drop_vars(["band", "spatial_ref"])

        dataarray.attrs = {}

        try:

            dataarray_full = xr.concat([dataarray_full, dataarray],"time")

        except:

            dataarray_full = dataarray

    dataarray_full.rio.write_crs(4326,inplace=True)

    data["rain"] = dataarray_full

    data["rain"].attrs = {"units":"mm", "long_name":"rainfall"}

    return data

def load_TAMSAT_data_fast(data, rainfall_data_path, date_start, duration):

    """

    This function loops over the rainfall raster files, and loads them into a

    xarray DataArray, which is then added to the rain data dictionary. It is

    tailored to the TAMSAT rainfall data files, hence its name.

    Args:

        data (_type_): _description_

        rainfall_data_path (_type_): _description_

        date_start (_type_): _description_

        duration (_type_): _description_

    Returns:

        _type_: _description_

    """

    rainfall_data_path_df = build_rainfall_files_df(rainfall_data_path, date_start, duration)

    dataarray_full = xr.open_mfdataset([os.path.join(rainfall_data_path,rainfall_data_path_df.loc[i,"filename"]) for i in range(len(rainfall_data_path_df))], concat_dim="time", combine="nested", chunks={"x":100,"y":100})

    dataarray_full = dataarray_full.squeeze("band").drop_vars(["band"])

    dataarray_full.rio.write_crs(4326,inplace=True)

    data["rain"] = dataarray_full["band_data"]

    return data

def load_AgERA5_data(data, AgERA5_data_path, date_start, duration):

    """

    This function loops over the AgERA5 raster files, and loads them into

    xarray DataArray, which are then added to the data dictionary.

    Args:

        data (_type_): _description_

        AgERA5_data_path (_type_): _description_

        date_start (_type_): _description_

        duration (_type_): _description_

    Returns:

        _type_: _description_

    """

    # TODO: make the file listing robust with respect to the folder structure and the file extensions

    # getting list of variables

    AgERA5_variables = [path.split("/")[-1] for path in [x[0] for x in os.walk(AgERA5_data_path)][1:]]

    # building a dictionary of dataframes containing the list of files for each variable

    AgERA5_files_df_collection = {}

    for variable in AgERA5_variables:

        #AgERA5_files = [f for f in listdir(os.path.join(AgERA5_data_path,variable)) if isfile(join(os.path.join(AgERA5_data_path,variable), f))]

        # alternate version that should be quicker

        AgERA5_files = [f for f in listdir(os.path.join(AgERA5_data_path,variable))]

        AgERA5_files_df_collection[variable] = pd.DataFrame({"filename":AgERA5_files})

        AgERA5_files_df_collection[variable]["date"] = AgERA5_files_df_collection[variable].apply(

            lambda x: datetime.date(

                int(x["filename"].replace(".tif","").split("_")[-3]),

                int(x["filename"].replace(".tif","").split("_")[-2]),

                int(x["filename"].replace(".tif","").split("_")[-1]),

            ),

            axis=1,

        )

    # building a dictionary of correspondence between AgERA5 variables and SARRA variables

    AgERA5_SARRA_correspondance = {

        '10m_wind_speed_24_hour_mean':None,

        '2m_temperature_24_hour_maximum':None,

        '2m_temperature_24_hour_mean':'tpMoy',

        '2m_temperature_24_hour_minimum':None,

        'ET0Hargeaves':'ET0',

        'solar_radiation_flux_daily':'rg',

        'vapour_pressure_24_hour_mean':None,

    }

    # loading the data

    for variable in tqdm(AgERA5_variables) :

        print(variable)

        if AgERA5_SARRA_correspondance[variable] != None :

            try:

                del dataarray_full

            except:

                pass

            for i in range(duration) :

                dataarray = rioxarray.open_rasterio(os.path.join(AgERA5_data_path,variable,AgERA5_files_df_collection[variable].loc[i,"filename"]))

                dataarray = dataarray.rio.reproject_match(data, nodata=np.nan)

                dataarray = dataarray.squeeze("band").drop_vars(["band"])

                # TODO: set dataarray.attrs to specify units and long_name

                try:

                    dataarray_full = xr.concat([dataarray_full, dataarray],"time")

                except:

                    dataarray_full = dataarray

            # storing the variable in the data dictionary

            data[AgERA5_SARRA_correspondance[variable]] = dataarray_full

    # unit conversion for solar radiation, from kJ/m2/day (as provided by AgERA5) to MJ/m2/day

    data["rg"] = data["rg"]/1000

    return data

def load_AgERA5_data_fast_dask(data, AgERA5_data_path, date_start, duration):

    """

    This function loops over the AgERA5 raster files, and loads them into

    xarray DataArray, which are then added to the data dictionary.

    Args:

        data (_type_): _description_

        AgERA5_data_path (_type_): _description_

        date_start (_type_): _description_

        duration (_type_): _description_

    Returns:

        _type_: _description_

    """

    # TODO: make the file listing robust with respect to the folder structure and the file extensions

    # getting list of variables

    AgERA5_variables = [path.split("/")[-1] for path in [x[0] for x in os.walk(AgERA5_data_path)][1:]]

    # building a dictionary of dataframes containing the list of files for each variable

    AgERA5_files_df_collection = {}

    for variable in AgERA5_variables:

        #AgERA5_files = [f for f in listdir(os.path.join(AgERA5_data_path,variable)) if isfile(join(os.path.join(AgERA5_data_path,variable), f))]

        # alternate version that should be quicker

        AgERA5_files = [f for f in listdir(os.path.join(AgERA5_data_path,variable))]

        AgERA5_files_df_collection[variable] = pd.DataFrame({"filename":AgERA5_files})

        AgERA5_files_df_collection[variable]["date"] = AgERA5_files_df_collection[variable].apply(

            lambda x: datetime.date(

                int(x["filename"].replace(".tif","").split("_")[-3]),

                int(x["filename"].replace(".tif","").split("_")[-2]),

                int(x["filename"].replace(".tif","").split("_")[-1]),

            ),

            axis=1,

        )

        AgERA5_files_df_collection[variable] = AgERA5_files_df_collection[variable][(AgERA5_files_df_collection[variable]["date"]>=date_start) & (AgERA5_files_df_collection[variable]["date"]<date_start+datetime.timedelta(days=duration))].reset_index(drop=True)

    # building a dictionary of correspondence between AgERA5 variables and SARRA variables

    AgERA5_SARRA_correspondance = {

        '10m_wind_speed_24_hour_mean':None,

        '2m_temperature_24_hour_maximum':None,

        '2m_temperature_24_hour_mean':'tpMoy',

        '2m_temperature_24_hour_minimum':None,

        'ET0Hargeaves':'ET0',

        'solar_radiation_flux_daily':'rg',

        'vapour_pressure_24_hour_mean':None,

    }

    # loading the data

    for variable in tqdm(AgERA5_variables) :

        if AgERA5_SARRA_correspondance[variable] != None :

            try:

                del dataarray_full

            except:

                pass

            # for i in range(duration) :

            #     dataarray = rioxarray.open_rasterio(os.path.join(AgERA5_data_path,variable,AgERA5_files_df_collection[variable].loc[i,"filename"]))

            #     dataarray = dataarray.rio.reproject_match(data, nodata=np.nan)

            #     dataarray = dataarray.squeeze("band").drop_vars(["band"])

            #     # TODO: add dataarray.attrs = {} to precise units and long_name

            #     try:

            #         dataarray_full = xr.concat([dataarray_full, dataarray],"time")

            #     except:

            #         dataarray_full = dataarray

            dataarray_full = xr.open_mfdataset([os.path.join(AgERA5_data_path,variable,AgERA5_files_df_collection[variable].loc[i,"filename"]) for i in range(len(AgERA5_files_df_collection[variable]))], concat_dim="time", combine="nested", chunks={"x":1000,"y":1000})

            dataarray_full = dataarray_full.squeeze("band").drop_vars(["band"])

            dataarray_full = dataarray_full.rio.reproject_match(data, nodata=np.nan)

            dataarray_full.rio.write_crs(4326,inplace=True)

            data[AgERA5_SARRA_correspondance[variable]] = dataarray_full["band_data"]

    # unit conversion for solar radiation, from kJ/m2/day (as provided by AgERA5) to MJ/m2/day

    data["rg"] = data["rg"]/1000

    return data

def load_paramVariete(file_paramVariete) :

    """

    This function loads the parameters of the variety from the yaml file.

    Args:

        file_paramVariete (_type_): _description_

    Raises:

        exception: _description_

    Returns:

        _type_: _description_

    """

    with open(os.path.join('../data/params/variety/',file_paramVariete), 'r') as stream:

        paramVariete = yaml.safe_load(stream)

    if paramVariete["feuilAeroBase"] == 0.1 :

        raise Exception()

    return paramVariete

def load_paramITK(file_paramITK):

    """

    This function loads the ITK parameters from the yaml file.

    Args:

        file_paramITK (_type_): _description_

    Returns:

        _type_: _description_

    """

    with open(os.path.join('../data/params/itk/',file_paramITK), 'r') as stream:

        paramITK = yaml.safe_load(stream)

    paramITK["DateSemis"] = datetime.datetime.strptime(paramITK["DateSemis"], "%Y-%m-%d").date()

    return paramITK

def load_paramTypeSol(file_paramTypeSol):

    """

    This function loads the soil parameters from the yaml file.

    Args:

        file_paramTypeSol (_type_): _description_

    Returns:

        _type_: _description_

    """

    with open(os.path.join('../data/params/soil/',file_paramTypeSol), 'r') as stream:

        paramTypeSol = yaml.safe_load(stream)

    return paramTypeSol

def load_YAML_parameters(file_paramVariete, file_paramITK, file_paramTypeSol):

    """

    This function loads the parameters from the yaml files.

    It is a wrapper for the three functions above.

    Args:

        file_paramVariete (_type_): _description_

        file_paramITK (_type_): _description_

        file_paramTypeSol (_type_): _description_

    Returns:

        _type_: _description_

    """

    paramVariete = load_paramVariete(file_paramVariete)

    paramITK = load_paramITK(file_paramITK)

    paramTypeSol = load_paramTypeSol(file_paramTypeSol)

    if ~np.isnan(paramITK["NI"]):

        print("NI NON NULL")

        paramVariete["txConversion"] = paramVariete["NIYo"] + paramVariete["NIp"] * (1-np.exp(-paramVariete["NIp"] * paramITK["NI"])) - (np.exp(-0.5*((paramITK["NI"] - paramVariete["LGauss"])/paramVariete["AGauss"])* (paramITK["NI"]- paramVariete["LGauss"])/paramVariete["AGauss"]))/(paramVariete["AGauss"]*2.506628274631)

    return paramVariete, paramITK, paramTypeSol

def initialize_default_irrigation(data):

    # default irrigation scheme

    data["irrigation"] = data["rain"] * 0

    data["irrigation"].attrs = {"units":"mm", "long_name":"irrigation"}

    return data

def load_iSDA_soil_data(data, grid_width, grid_height):

    """

    This function loads iSDA soil data and crops it to the area of interest.

    Remark: it would be nice to modulate the runoff percentage according to slope.

    First, this function loads the iSDA soil data, crops it to the area of

    interest and resamples it to match the grid resolution. The resulting iSDA

    soil class raster map is added to the xarray dataset as a new variable.

    For the sake of example, the file used here has already been downsampled to

    TAMSAT resolution. Its specifications are available on the CIRAD Dataverse

    at the following address: https://doi.org/10.1038/s41598-021-85639-y

    Second, a correspondence table matching the iSDA soil classes to the soil

    physical properties is loaded. Maps of soil physical properties are then

    created and added to the xarray dataset.

    Returns:

        _type_: _description_

    """

    # load soil depth data, using the same code as in the load_iSDA_soil_data_alternate function

    soil_depth_file_path = "../data/assets/gyga_af_erzd__m_1km.tif"

    dataarray = rioxarray.open_rasterio(soil_depth_file_path)

    dataarray = dataarray.astype('float32') # converting to float32 to allow for NaNs

    dataarray = dataarray.rio.reproject_match(data, nodata=np.nan) # reprojecting to match the base data

    dataarray = dataarray.squeeze("band").drop_vars(["band"]) # removing the band dimension and variable

    dataarray = dataarray * 10 # converting from cm to mm

    data["profRu"] = dataarray # we add the soil depth data to the base data

    data["profRu"].attrs = {"units": "mm", "long_name": "Soil root zone depth (mm) adapted from Africa SoilGrids RZD"}

    del dataarray # we delete the temporary dataarray

    # load raster data

    soil_type_file_path = "../data/assets/iSDA_at_TAMSAT_resolution_zeroclass_1E6.tif"

    dataarray = rioxarray.open_rasterio(soil_type_file_path)

    # determine the boundaries of the data xarray before cropping

    xmin, ymin, xmax, ymax = data.rio.set_spatial_dims(x_dim="x", y_dim="y").rio.bounds()

    # crop to the area of interest

    dataarray = dataarray.where((dataarray.y < ymax)

                                & (dataarray.y > ymin)

                                & (dataarray.x > xmin)

                                & (dataarray.x < xmax)

                            ).dropna(dim='y', how='all').dropna(dim='x', how='all')

    # resample to match the grid resolution

    dataarray = dataarray.rio.reproject_match(data, nodata=np.nan)

    dataarray = dataarray.squeeze("band").drop_vars(["band"])

    dataarray.attrs = {"units":"arbitrary", "long_name":"soil_type"}

    # add soil type identifier to the dataset

    data["soil_type"] = dataarray

    data["soil_type"] = data["soil_type"] / 1000000 # conversion from 1E6 to 1E0

    data["soil_type"] = data["soil_type"].astype("float32")

    # load correspondence table

    path_soil_type_correspondance = "../data/assets/TypeSol_Moy13_HWSD.csv"

    df_soil_type_correspondance = pd.read_csv(path_soil_type_correspondance, sep=";", skiprows=1)

    # correspondence between soil property names in the csv file and in the dataset

    soil_variables = {

        "epaisseurProf" : "EpaisseurProf",

        "epaisseurSurf" : "EpaisseurSurf",

        "stockIniProf" : "StockIniProf",

        "stockIniSurf" : "StockIniSurf",

        "runoff_threshold" : "SeuilRuiss",

        "runoff_rate" : "PourcRuiss",

        "ru" : "Ru",

    }

    # create maps of soil properties and add them to the dataset

    for soil_variable in soil_variables :

        dict_values = dict(zip(df_soil_type_correspondance["Id"], df_soil_type_correspondance[soil_variables[soil_variable]]))

        dict_values[0] = np.nan

        soil_types_converted = np.reshape([dict_values[x.astype(int)] for x in data["soil_type"].to_numpy().flatten()], (grid_width, grid_height))

        data[soil_variable] = (data["soil_type"].dims,soil_types_converted)

        data[soil_variable] = data[soil_variable].astype("float32")

        # TODO: set dataarray.attrs to specify units and long_name

    # converting runoff_rate (PourcRuiss) from percent to a decimal fraction

    data["runoff_rate"] = data["runoff_rate"] / 100

    return data

def load_iSDA_soil_data_alternate(data, grid_width, grid_height):

    """

    This function loads iSDA soil data and crops it to the area of interest.

    Remark: it would be nice to modulate the runoff percentage according to slope.

    First, this function loads the iSDA soil data, crops it to the area of

    interest and resamples it to match the grid resolution. The resulting iSDA

    soil class raster map is added to the xarray dataset as a new variable.

    For the sake of example, the file used here has already been downsampled to

    TAMSAT resolution. Its specifications are available on the CIRAD Dataverse

    at the following address: https://doi.org/10.1038/s41598-021-85639-y

    Second, a correspondence table matching the iSDA soil classes to the soil

    physical properties is loaded. Maps of soil physical properties are then

    created and added to the xarray dataset.

    Returns:

        _type_: _description_

    """

    # loading soil depth data

    # soil data is Africa SoilGrids - Root zone depth (cm)

    # reference of the dataset is https://data.isric.org/geonetwork/srv/fre/catalog.search#/metadata/c77d1209-56e9-4cac-b76e-bbf6c7e3a617

    # reference publication : https://doi.org/10.1016/j.geoderma.2018.02.046

    soil_depth_file_path = "../data/assets/gyga_af_erzd__m_1km.tif"

    dataarray = rioxarray.open_rasterio(soil_depth_file_path)

    dataarray = dataarray.astype('float32') # converting to float32 to allow for NaNs

    dataarray = dataarray.rio.reproject_match(data, nodata=np.nan) # reprojecting to match the base data

    dataarray = dataarray.squeeze("band").drop_vars(["band"]) # removing the band dimension and variable

    dataarray = dataarray * 10 # converting from cm to mm

    data["profRu"] = dataarray # we add the soil depth data to the base data

    data["profRu"].attrs = {"units": "mm", "long_name": "Soil root zone depth (mm) adapted from Africa SoilGrids RZD"}

    del dataarray # we delete the temporary dataarray

    # defining the soil surface reservoir depth with default values

    data["epaisseurSurf"] = 200 * xr.ones_like(data["profRu"])

    data["epaisseurSurf"].attrs = {"units": "mm", "long_name": "Soil surface reservoir depth (mm)"}

    # defining the initial water stock in the deep reservoir

    data["stockIniProf"] = 0 * xr.ones_like(data["profRu"])

    data["stockIniProf"].attrs = {"units": "mm", "long_name": "Initial water stock in the deep reservoir (mm)"}

    # loading soil texture data

    # soil data is adapted from iSDA Africa - USDA Soil Texture Class

    # reference of the original dataset is https://zenodo.org/record/4094616

    # reference of the adapted dataset is https://doi.org/10.18167/DVN1/YSVTS2

    soil_texture_class_file_path = "../data/assets/iSDA_at_TAMSAT_resolution_zeroclass_1E6.tif"

    dataarray = rioxarray.open_rasterio(soil_texture_class_file_path)

    dataarray = dataarray.rio.reproject_match(data, nodata=np.nan) # reprojecting to match the base data

    dataarray = dataarray.squeeze("band").drop_vars(["band"]) # removing the band dimension and variable

    dataarray.attrs = {"units":"arbitrary", "long_name":"soil_type"} # adding attributes

    data["soil_type"] = dataarray # add soil type identifier to the dataset

    data["soil_type"] = data["soil_type"] / 1000000 # conversion from 1E6 to 1E0

    data["soil_type"] = data["soil_type"].astype("float32")

    # load correspondence table

    path_soil_type_correspondance = "../data/assets/TypeSol_Moy13_HWSD.csv"

    df_soil_type_correspondance = pd.read_csv(path_soil_type_correspondance, sep=";", skiprows=1)

    # correspondence between soil property names in the csv file and in the dataset

    soil_variables = {

        "runoff_threshold" : "SeuilRuiss", # utilisé dans bilan_hydro.estimate_runoff pour le calcul de lr

        "runoff_rate" : "PourcRuiss", # utilisé dans bilan_hydro.estimate_runoff pour le calcul de lr

    }

    # create maps of soil properties and add them to the dataset

    for soil_variable in soil_variables :

        dict_values = dict(zip(df_soil_type_correspondance["Id"], df_soil_type_correspondance[soil_variables[soil_variable]]))

        dict_values[0] = np.nan

        soil_types_converted = np.reshape([dict_values[x.astype(int)] for x in data["soil_type"].to_numpy().flatten()], (grid_width, grid_height))

        data[soil_variable] = (data["soil_type"].dims,soil_types_converted)

        data[soil_variable] = data[soil_variable].astype("float32")

        # TODO: set dataarray.attrs to specify units and long_name

    # converting runoff_rate (PourcRuiss) from percent to a decimal fraction

    data["runoff_rate"] = data["runoff_rate"] / 100

    del dataarray # we delete the temporary dataarray

    # loading RU

    soil_RZPAWC_file_path = "../data/assets/gyga_af_agg_erzd_tawcpf23mm__m_1km.tif"

    dataarray = rioxarray.open_rasterio(soil_RZPAWC_file_path)

    dataarray = dataarray.astype('float32')

    dataarray = dataarray.rio.reproject_match(data, nodata=np.nan) # reprojecting to match the base data

    dataarray = dataarray.squeeze("band").drop_vars(["band"]) # removing the band dimension and variable

    dataarray.attrs = {"units":"mm", "long_name":"Root zone plant available water capacity (mm)"} # adding attributes

    data["RZPAWC"] = dataarray # add soil type identifier to the dataset

    del dataarray # we delete the temporary dataarray

    #calculating RU (mm/m)

    data["ru"] = data["RZPAWC"] / (data["profRu"]/1000)

    return data

def calc_day_length(day, lat):

    """

    This function calculates the day length for a given day and latitude.

    Args:

        day (_type_): _description_

        lat (_type_): _description_

    Returns:

        _type_: _description_

    """

    # print(day, lat)

    coords = LocationInfo(latitude=float(lat), longitude=0.0)

    daylight = astral.sun.daylight(coords.observer, date=day)

    dureeDuJour = (daylight[1] - daylight[0]).seconds/3600

    return dureeDuJour

def calc_day_length_raster_fast(data, date_start, duration):

    vectorized_func = np.vectorize(calc_day_length)

    days = np.array([date_start + datetime.timedelta(days=i) for i in range(duration)])[...,np.newaxis]

    latitudes = np.array(data["y"])[np.newaxis,...]

    result = vectorized_func(days, latitudes)

    # we will first define an empty array of the same shape as the rain array

    data["dureeDuJour"] = (data["rain"].dims, np.zeros(data["rain"].shape))

    data["dureeDuJour"].values = np.repeat(result[...,np.newaxis], data["dureeDuJour"].shape[2], axis=2)

    return data
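
The functions above are typically chained to assemble the forcing dataset for a simulation. A minimal end-to-end sketch, assuming hypothetical data paths and parameter file names, and assuming the forcing dataset can be started as an empty xarray Dataset, could look like this:

import datetime
import xarray as xr
from sarra_py.data_preparation import (
    get_grid_size, load_TAMSAT_data, load_AgERA5_data, load_iSDA_soil_data,
    initialize_default_irrigation, calc_day_length_raster_fast, load_YAML_parameters,
)

# hypothetical input locations and simulation window
rainfall_path = "../data/rainfall/TAMSAT/"
AgERA5_path = "../data/climate/AgERA5/"
date_start = datetime.date(2021, 6, 1)
duration = 120  # days

grid_width, grid_height = get_grid_size(rainfall_path, date_start, duration)
data = xr.Dataset()  # assumption: the forcing dataset starts empty
data = load_TAMSAT_data(data, rainfall_path, date_start, duration)    # adds data["rain"]
data = load_AgERA5_data(data, AgERA5_path, date_start, duration)      # adds tpMoy, ET0, rg
data = load_iSDA_soil_data(data, grid_width, grid_height)             # adds soil property maps
data = initialize_default_irrigation(data)                            # adds a zero irrigation field
data = calc_day_length_raster_fast(data, date_start, duration)        # adds dureeDuJour

# hypothetical parameter file names, expected under ../data/params/{variety,itk,soil}/
paramVariete, paramITK, paramTypeSol = load_YAML_parameters(
    "variety_example.yaml", "itk_example.yaml", "soil_example.yaml")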

Functions

build_rainfall_files_df

def build_rainfall_files_df(
    rainfall_path,
    date_start,
    duration
)

This function builds a dataframe containing the list of rainfall files

from the provided path, and the given date_start and duration.

Helper function used in get_grid_size() and load_TAMSAT_data().

Parameters:

Name Type Description Default
rainfall_path type description None

Returns:

Type Description
type description
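
A minimal usage sketch, assuming a hypothetical rainfall folder whose GeoTIFF names end in _YYYY_MM_DD (the pattern the function parses into the "date" column):

import datetime
from sarra_py.data_preparation import build_rainfall_files_df

# hypothetical folder containing files named e.g. "rfe_daily_2021_06_01.tif"
files_df = build_rainfall_files_df("../data/rainfall/TAMSAT/",
                                   datetime.date(2021, 6, 1), 30)
print(files_df.columns.tolist())  # ["filename", "date"]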

calc_day_length

def calc_day_length(
    day,
    lat
)

This function calculates the day length for a given day and latitude.

Parameters:

Name Type Description Default
day type description None
lat type description None

Returns:

Type Description
type description
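
A minimal sketch; the returned value is the day length in hours, derived from astral's daylight interval:

import datetime
from sarra_py.data_preparation import calc_day_length

# day length in hours at latitude 14°N on an arbitrary date
hours = calc_day_length(datetime.date(2021, 6, 21), 14.0)
print(round(hours, 2))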

calc_day_length_raster_fast

def calc_day_length_raster_fast(
    data,
    date_start,
    duration
)
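
This function has no docstring; from the source above, it vectorizes calc_day_length over every day of the simulation window and every latitude of the grid, then broadcasts the result along the x dimension into data["dureeDuJour"], which gets the same shape as data["rain"]. A minimal sketch, assuming the rainfall cube has already been loaded:

import datetime
from sarra_py.data_preparation import calc_day_length_raster_fast

# `data` is assumed to already contain data["rain"] (e.g. via load_TAMSAT_data)
data = calc_day_length_raster_fast(data, datetime.date(2021, 6, 1), 120)
print(data["dureeDuJour"].shape == data["rain"].shape)  # expected: True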

get_grid_size

def get_grid_size(
    rainfall_path,
    date_start,
    duration
)

This function loads the list of rainfall files corresponding to the given

date_start and duration, loads the first rainfall file, and returns its grid size, as dimensions of the rainfall grid define the output resolution of the model.

Parameters:

Name Type Description Default
rainfall_path type description None
date_start type description None
duration type description None

Returns:

Type Description
type description
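
A minimal sketch with a hypothetical rainfall folder; note that the function raises a ValueError when the last day of the requested range is not covered by the available files:

import datetime
from sarra_py.data_preparation import get_grid_size

grid_width, grid_height = get_grid_size("../data/rainfall/TAMSAT/",
                                        datetime.date(2021, 6, 1), 120)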

initialize_default_irrigation

def initialize_default_irrigation(
    data
)
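
This function has no docstring; from the source above, it simply creates a zero-filled irrigation field (in mm) with the same shape and coordinates as data["rain"]. A minimal sketch, assuming the rainfall cube has already been loaded:

from sarra_py.data_preparation import initialize_default_irrigation

data = initialize_default_irrigation(data)
print(float(data["irrigation"].sum()))  # 0.0 for the default scheme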

load_AgERA5_data

def load_AgERA5_data(
    data,
    AgERA5_data_path,
    date_start,
    duration
)

This function loops over the AgERA5 raster files, and loads them into

xarray DataArray, which are then added to the data dictionary.

Parameters:

Name Type Description Default
data type description None
AgERA5_data_path type description None
date_start type description None
duration type description None

Returns:

Type Description
type description
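
A minimal sketch; it assumes the hypothetical folder layout shown in the comment, with one sub-folder per AgERA5 variable and daily GeoTIFFs whose names end in _YYYY_MM_DD, and it assumes data already carries the rainfall grid used as the reprojection target:

import datetime
from sarra_py.data_preparation import load_AgERA5_data

# hypothetical layout: ../data/climate/AgERA5/<variable>/<variable>_2021_06_01.tif
data = load_AgERA5_data(data, "../data/climate/AgERA5/",
                        datetime.date(2021, 6, 1), 120)
print(list(data.data_vars))  # now includes "tpMoy", "ET0" and "rg"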

load_AgERA5_data_fast_dask

def load_AgERA5_data_fast_dask(
    data,
    AgERA5_data_path,
    date_start,
    duration
)

This function loops over the AgERA5 raster files, and loads them into

xarray DataArray, which are then added to the data dictionary.

Parameters:

Name Type Description Default
data type description None
AgERA5_data_path type description None
date_start type description None
duration type description None

Returns:

Type Description
type description
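
This is the dask-backed variant of load_AgERA5_data: each variable's file list is filtered by date, opened lazily with xr.open_mfdataset (1000 x 1000 chunks) and reprojected once onto the rainfall grid. A minimal sketch under the same assumptions as for load_AgERA5_data:

import datetime
from sarra_py.data_preparation import load_AgERA5_data_fast_dask

data = load_AgERA5_data_fast_dask(data, "../data/climate/AgERA5/",
                                  datetime.date(2021, 6, 1), 120)
print(data["rg"].dims)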

load_TAMSAT_data

def load_TAMSAT_data(
    data,
    TAMSAT_path,
    date_start,
    duration
)

This function loops over the rainfall raster files, and loads them into a

xarray DataArray, which is then added to the rain data dictionary. It is tailored to the TAMSAT rainfall data files, hence its name.

Parameters:

Name Type Description Default
data type description None
TAMSAT_path type description None
date_start type description None
duration type description None

Returns:

Type Description
type description
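
A minimal sketch with a hypothetical rainfall folder, assuming the forcing dataset can be started as an empty xarray Dataset:

import datetime
import xarray as xr
from sarra_py.data_preparation import load_TAMSAT_data

data = xr.Dataset()  # assumption: the forcing dataset starts empty
data = load_TAMSAT_data(data, "../data/rainfall/TAMSAT/",
                        datetime.date(2021, 6, 1), 120)
print(data["rain"].dims)  # e.g. ("time", "y", "x")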

load_TAMSAT_data_fast

def load_TAMSAT_data_fast(
    data,
    rainfall_data_path,
    date_start,
    duration
)

This function loops over the rainfall raster files, and loads them into a

xarray DataArray, which is then added to the rain data dictionary. It is tailored to the TAMSAT rainfall data files, hence its name.

Parameters:

Name Type Description Default
data type description None
rainfall_data_path type description None
date_start type description None
duration type description None

Returns:

Type Description
type description
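
Same interface and result as load_TAMSAT_data, but the files are opened in one lazy xr.open_mfdataset call (100 x 100 chunks) instead of a per-file loop, and data["rain"] is taken from the "band_data" variable of the opened dataset. A minimal sketch:

import datetime
import xarray as xr
from sarra_py.data_preparation import load_TAMSAT_data_fast

data = xr.Dataset()  # assumption: the forcing dataset starts empty
data = load_TAMSAT_data_fast(data, "../data/rainfall/TAMSAT/",
                             datetime.date(2021, 6, 1), 120)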

load_YAML_parameters

def load_YAML_parameters(
    file_paramVariete,
    file_paramITK,
    file_paramTypeSol
)

This function loads the parameters from the yaml files.

It is a wrapper for the three functions above.

Parameters:

Name Type Description Default
file_paramVariete type description None
file_paramITK type description None
file_paramTypeSol type description None

Returns:

Type Description
type description
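
A minimal sketch; the file names are hypothetical and are expected under ../data/params/variety/, ../data/params/itk/ and ../data/params/soil/ respectively (see the three loaders below). When the ITK file defines a non-NaN NI, txConversion is recomputed from the NIYo, NIp, LGauss and AGauss variety parameters.

from sarra_py.data_preparation import load_YAML_parameters

# hypothetical YAML file names
paramVariete, paramITK, paramTypeSol = load_YAML_parameters(
    "variety_example.yaml", "itk_example.yaml", "soil_example.yaml")
print(paramITK["DateSemis"])  # parsed to a datetime.date by load_paramITK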

load_iSDA_soil_data

def load_iSDA_soil_data(
    data,
    grid_width,
    grid_height
)

This function loads iSDA soil data and crops it to the area of interest. Remark: it would be nice to modulate the runoff percentage according to slope.

First, this function loads the iSDA soil data, crops it to the area of interest and resamples it to match the grid resolution. The resulting iSDA soil class raster map is added to the xarray dataset as a new variable.

For the sake of example, the file used here has already been downsampled to TAMSAT resolution. Its specifications are available on the CIRAD Dataverse at the following address: https://doi.org/10.1038/s41598-021-85639-y

Second, a correspondence table matching the iSDA soil classes to the soil physical properties is loaded. Maps of soil physical properties are then created and added to the xarray dataset.

Returns:

Type Description
type description
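
A minimal sketch; it assumes data already holds the rainfall grid (used as the reprojection target), that grid_width and grid_height come from get_grid_size(), and that the rasters and lookup CSV referenced in the source are present under ../data/assets/:

from sarra_py.data_preparation import load_iSDA_soil_data

data = load_iSDA_soil_data(data, grid_width, grid_height)
# data now carries profRu, soil_type and the mapped soil properties:
# epaisseurProf, epaisseurSurf, stockIniProf, stockIniSurf,
# runoff_threshold, runoff_rate and ru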

load_iSDA_soil_data_alternate

def load_iSDA_soil_data_alternate(
    data,
    grid_width,
    grid_height
)

This function loads iSDA soil data and crops it to the area of interest. Remark: it would be nice to modulate the runoff percentage according to slope.

First, this function loads the iSDA soil data, crops it to the area of interest and resamples it to match the grid resolution. The resulting iSDA soil class raster map is added to the xarray dataset as a new variable.

For the sake of example, the file used here has already been downsampled to TAMSAT resolution. Its specifications are available on the CIRAD Dataverse at the following address: https://doi.org/10.1038/s41598-021-85639-y

Second, a correspondence table matching the iSDA soil classes to the soil physical properties is loaded. Maps of soil physical properties are then created and added to the xarray dataset.

Returns:

Type Description
type description
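
Compared to load_iSDA_soil_data, this variant only maps runoff_threshold and runoff_rate from the lookup table; epaisseurSurf and stockIniProf are filled with default values, and ru is derived from the SoilGrids root-zone plant-available water capacity raster (RZPAWC) divided by the root zone depth. A minimal sketch under the same assumptions:

from sarra_py.data_preparation import load_iSDA_soil_data_alternate

data = load_iSDA_soil_data_alternate(data, grid_width, grid_height)
# adds profRu, epaisseurSurf, stockIniProf, soil_type,
# runoff_threshold, runoff_rate, RZPAWC and ru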

load_paramITK

def load_paramITK(
    file_paramITK
)

This function loads the ITK parameters from the yaml file.

Parameters:

Name Type Description Default
file_paramITK type description None

Returns:

Type Description
type description

load_paramTypeSol

def load_paramTypeSol(
    file_paramTypeSol
)

This function loads the soil parameters from the yaml file.

Parameters:

Name Type Description Default
file_paramTypeSol type description None

Returns:

Type Description
type description

load_paramVariete

def load_paramVariete(
    file_paramVariete
)

This function loads the parameters of the variety from the yaml file.

Parameters:

Name Type Description Default
file_paramVariete type description None

Returns:

Type Description
type description

Raises:

Type Description
exception description
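
A minimal sketch with a hypothetical file name, expected under ../data/params/variety/; note that the function raises a bare Exception when feuilAeroBase equals 0.1:

from sarra_py.data_preparation import load_paramVariete

paramVariete = load_paramVariete("variety_example.yaml")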