diff --git a/configs/esm_software/esm_runscripts/esm_plugins.yaml b/configs/esm_software/esm_runscripts/esm_plugins.yaml
index 47158d88e..346382069 100644
--- a/configs/esm_software/esm_runscripts/esm_plugins.yaml
+++ b/configs/esm_software/esm_runscripts/esm_plugins.yaml
@@ -111,3 +111,7 @@ core:
     workflow:
         - "assemble_workflow"
+    catalog:
+        - "create_intake_esm_catalog"
+        - "write_intake_esm_catalog"
+
diff --git a/configs/esm_software/esm_runscripts/esm_runscripts.yaml b/configs/esm_software/esm_runscripts/esm_runscripts.yaml
index 8e425a98e..b5610dc35 100644
--- a/configs/esm_software/esm_runscripts/esm_runscripts.yaml
+++ b/configs/esm_software/esm_runscripts/esm_runscripts.yaml
@@ -72,6 +72,8 @@ choose_job_type:
            - "copy_stuff_back_from_work"
            - "copy_all_results_to_exp"
            - "clean_run_dir"
+           - "create_intake_esm_catalog"
+           - "write_intake_esm_catalog"
    prepcompute:
        recipe:
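The two recipe steps added above follow the esm_runscripts plugin convention: each step is a function that receives the config dictionary and returns it, possibly extended. Below is a minimal sketch of that calling convention only; the `run_recipe` helper is hypothetical, and the real dispatcher is the esm_runscripts recipe machinery referenced in esm_plugins.yaml.

```python
# Sketch of the recipe-step calling convention assumed by the entries above.
# `run_recipe` is a hypothetical stand-in for the real esm_runscripts dispatcher.
from esm_runscripts.catalog import create_intake_esm_catalog, write_intake_esm_catalog


def run_recipe(config, step_names):
    """Call each named step with the config dict and carry the result forward."""
    steps = {
        "create_intake_esm_catalog": create_intake_esm_catalog,
        "write_intake_esm_catalog": write_intake_esm_catalog,
    }
    for name in step_names:
        config = steps[name](config)
    return config
```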
"Supp/herold_etal_eocene_topo_1x1.nc" + origin: "Remote Zip" + all_files: + user_parameters: {} + description: "Specific NetCDF file from remote Zip" + driver: netcdf + args: + urlpath: "zip://**/*.nc::https://gmd.copernicus.org/articles/7/2077/2014/gmd-7-2077-2014-supplement.zip" + metadata: + file_in_zip: "Supp/herold_etal_eocene_topo_1x1.nc" + origin: "Remote Zip" +user_parameters: {} +aliases: {} +metadata: {} +entries: {} + diff --git a/examples/catalogs/example-netcdf-inside-remote-zip.yaml b/examples/catalogs/example-netcdf-inside-remote-zip.yaml new file mode 100644 index 000000000..9571c2293 --- /dev/null +++ b/examples/catalogs/example-netcdf-inside-remote-zip.yaml @@ -0,0 +1,15 @@ +version: 2 +data: {} +user_parameters: {} +aliases: {} +metadata: {} +entries: + topo: + user_parameters: {} + description: "Specific NetCDF file from remote Zip" + driver: netcdf + args: + urlpath: "zip://Supp/herold_etal_eocene_topo_1x1.nc::https://gmd.copernicus.org/articles/7/2077/2014/gmd-7-2077-2014-supplement.zip" + metadata: + file_in_zip: "Supp/herold_etal_eocene_topo_1x1.nc" + origin: "Remote Zip" diff --git a/setup.py b/setup.py index 74323b429..079008f9d 100644 --- a/setup.py +++ b/setup.py @@ -14,13 +14,17 @@ requirements = [ "Click>=8.0.4", # Maximum version for Python 3.6 support + "cfgrib", "PyGithub==1.55", "colorama==0.4.5", "coloredlogs==15.0.1", # NOTE(PG): Should be removed during cleanup for loguru instead + "dpath", "emoji==1.7.0", "f90nml==1.4.2", "gfw-creator==0.2.2", "gitpython==3.1.41", # Maximum version for Python 3.6 support + "intake", + "intake-esm", "jinja2==3.1.4", "loguru==0.6.0", "numpy>=1.19.5", # Maximum version for Python 3.6 support diff --git a/src/esm_runscripts/__init__.py b/src/esm_runscripts/__init__.py index 0ce5ebf4e..9a6b02c2c 100644 --- a/src/esm_runscripts/__init__.py +++ b/src/esm_runscripts/__init__.py @@ -6,6 +6,7 @@ from .batch_system import * from .chunky_parts import * +from .catalog import * from .database import * from .database_actions import * from .dataprocess import * diff --git a/src/esm_runscripts/catalog.py b/src/esm_runscripts/catalog.py new file mode 100644 index 000000000..641d002a1 --- /dev/null +++ b/src/esm_runscripts/catalog.py @@ -0,0 +1,203 @@ +import datetime +import getpass +import hashlib +import json +import pathlib + +import dpath +import intake +import intake_esm # noqa: F401, import only needed to register intake-esm driver +import pandas as pd +import xarray as xr +from loguru import logger + + +def create_intake_esm_catalog(config): + """ + Create an intake-esm catalog based on the configuration. + + This creates an intake-esm compatible catalog based on the simulation + configuration. The catalog is stored under ``config["intake"]``. Write + to disk as a separate step, this is not handled here! + + Creation of the catalog can be controlled via the configuration key:: + + config["general"]["create_catalog"] = True + + Default is ``False`` during the testing phase. To enable catalog creation, please + use your run configuration file. + + + Parameters + ---------- + config : dict + The simulation configuration + + Notes + ----- + The catalog attributes for each entry define what can be searched for and filtered. The exact + specifications of what should be included seems to be ambiguous and could be defined however + we want. Here, we use NextGEMS as a basis, and remove some of the fields which are not obvious + as a start. 
diff --git a/setup.py b/setup.py
index 74323b429..079008f9d 100644
--- a/setup.py
+++ b/setup.py
@@ -14,13 +14,17 @@ requirements = [
     "Click>=8.0.4",  # Maximum version for Python 3.6 support
+    "cfgrib",
     "PyGithub==1.55",
     "colorama==0.4.5",
     "coloredlogs==15.0.1",  # NOTE(PG): Should be removed during cleanup for loguru instead
+    "dpath",
     "emoji==1.7.0",
     "f90nml==1.4.2",
     "gfw-creator==0.2.2",
     "gitpython==3.1.41",  # Maximum version for Python 3.6 support
+    "intake",
+    "intake-esm",
     "jinja2==3.1.4",
     "loguru==0.6.0",
     "numpy>=1.19.5",  # Maximum version for Python 3.6 support
diff --git a/src/esm_runscripts/__init__.py b/src/esm_runscripts/__init__.py
index 0ce5ebf4e..9a6b02c2c 100644
--- a/src/esm_runscripts/__init__.py
+++ b/src/esm_runscripts/__init__.py
@@ -6,6 +6,7 @@ from .batch_system import *
 from .chunky_parts import *
+from .catalog import *
 from .database import *
 from .database_actions import *
 from .dataprocess import *
diff --git a/src/esm_runscripts/catalog.py b/src/esm_runscripts/catalog.py
new file mode 100644
index 000000000..641d002a1
--- /dev/null
+++ b/src/esm_runscripts/catalog.py
@@ -0,0 +1,203 @@
+import datetime
+import getpass
+import hashlib
+import json
+import pathlib
+
+import dpath
+import intake
+import intake_esm  # noqa: F401, import only needed to register the intake-esm driver
+import pandas as pd
+import xarray as xr
+from loguru import logger
+
+
+def create_intake_esm_catalog(config):
+    """
+    Create an intake-esm catalog based on the configuration.
+
+    This creates an intake-esm compatible catalog from the simulation
+    configuration. The catalog is stored under ``config["intake"]``. Writing
+    it to disk is a separate step and is not handled here.
+
+    Creation of the catalog can be controlled via the configuration key::
+
+        config["intake"]["create_catalog"] = True
+
+    The default is ``False`` during the testing phase. To enable catalog
+    creation, set this key in your run configuration file.
+
+    Parameters
+    ----------
+    config : dict
+        The simulation configuration
+
+    Notes
+    -----
+    The catalog attributes for each entry define what can be searched for and
+    filtered. The exact specification of what should be included appears to be
+    ambiguous and could be defined however we want. As a starting point, we use
+    NextGEMS as a basis and drop some of the less obvious fields. To see what is
+    used in the DKRZ NextGEMS catalogue::
+
+        $ jq .attributes[].column_name /pool/data/Catalogs/dkrz_nextgems_disk.json
+
+    See Also
+    --------
+    * https://intake.readthedocs.io/en/latest/index.html
+    * https://github.com/NCAR/esm-collection-spec/blob/master/collection-spec/collection-spec.md
+    * https://tutorials.dkrz.de/tutorial_intake-5-create-esm-collection.html
+    """
+    if not config.get("intake", {}).get("create_catalog", False):
+        return config
+    catalog = config.get("intake", {}).get("catalog", {})
+    catalog["esmcat_version"] = "0.1.0"
+    attributes = catalog["attributes"] = []
+    catalog_attrs = [
+        "variable_id",
+        "project",
+        "institution_id",
+        "source_id",
+        "experiment_id",
+        "realm",
+        "time_min",
+        "time_max",
+    ]
+    for attr in catalog_attrs:
+        attributes.append(dict(column_name=attr, vocabulary=""))
+    assets = catalog["assets"] = dict(column_name="uri", format_column_name="format")
+    aggregation_control = catalog["aggregation_control"] = {
+        "variable_column_name": "variable_id",
+        "groupby_attrs": [
+            "project",
+            "institution_id",
+            "source_id",
+            "experiment_id",
+            "realm",
+        ],
+        "aggregations": [
+            {"type": "union", "attribute_name": "variable_id", "options": {}},
+            {
+                "type": "join_existing",
+                "attribute_name": "time_min",
+                "options": {"dim": "time", "coords": "minimal", "compat": "override"},
+            },
+        ],
+    }
+    catalog["id"] = hashlib.sha256(
+        f"{config['general']['expid']}_{datetime.datetime.now()}_{getpass.getuser()}".encode()
+    ).hexdigest()
+    catalog["description"] = str(config["general"].get("description"))
+    catalog["title"] = f"Intake-ESM Catalog for Experiment {config['general']['expid']}"
+    catalog["last_updated"] = str(datetime.datetime.now())
+    catalog_dict = catalog["catalog_dict"] = []
+    # Each entry in catalog_dict should correspond to the schema provided
+    # in catalog_attrs plus assets
+    for model in config["general"]["valid_model_names"]:
+        logger.info(f"Cataloguing output of model {model}")
+        mconfig = config[model]
+        # FIXME(PG): This is not how we should determine which files are in the experiment outdata
+        # since this will list **all** files, not just the ones added during this run.
+        for output_file in pathlib.Path(mconfig["experiment_outdata_dir"]).iterdir():
+            # TODO(PG): @JanStreffing, what does OIFS output look like? GRIB, NetCDF?
+            # Known GRIB output models:
+            if mconfig["model"] in ["echam", "jsbach"]:
+                if "codes" in output_file.suffix or "idx" in output_file.suffix:
+                    logger.debug(
+                        "Skipping codes file or already processed grib outputfile"
+                    )
+                    continue
+            # TODO(PG): Add zarr support later on
+            xarray_engine = "netcdf4" if "nc" in output_file.suffix else "cfgrib"
+            # NOTE(PG): Determine which variables are contained in the file; there
+            # is probably a better way to do this...
+            try:
+                var_list = list(
+                    xr.open_dataset(output_file, engine=xarray_engine).variables.keys()
+                )
+            except Exception as e:
+                logger.warning(f"Could not determine variables in {output_file}: {e}")
+                logger.warning("This file is not added to the catalog!")
+                continue
+            this_asset = dict(
+                project=config["general"].get("project", "esm_tools"),
+                institution_id="AWI",
+                source_id=f"{config['general']['model']}-{config['general']['version']}",
+                experiment_id=str(config["general"]["expid"]),
+                realm=str(mconfig.get("type", "UNKNOWN")),
+                time_min=str(config["general"]["start_date"]),
+                time_max=str(config["general"]["end_date"]),
+                uri=f"file://{output_file}",
+                # _data_format_=xarray_engine,  # NOTE(PG): I don't like needing this...
+                format=xarray_engine,
+                variable_id=var_list,
+            )
+            catalog_dict.append(this_asset)
+
+    catalog_dict = catalog["catalog_dict"]
+    catalog_df = pd.DataFrame(catalog_dict)
+    # Try to construct the esm_datastore object:
+    validated_cat = intake.open_esm_datastore(obj=dict(esmcat=catalog, df=catalog_df))
+    config["intake"] = config.get("intake", {})
+    config["intake"]["catalog"] = validated_cat
+    return config
+
+
+def write_intake_esm_catalog(config):
+    """
+    Save the intake catalog to disk.
+
+    This function saves the intake catalog specified in the configuration to a
+    JSON file on disk. If a previous catalog exists, the new catalog is merged
+    into the existing one.
+
+    Saving of the catalog can be controlled via the configuration key::
+
+        config["intake"]["write_catalog"] = True
+
+    The default is ``False`` during the testing phase. To enable writing, set
+    this key in your run configuration file.
+
+    Parameters
+    ----------
+    config : dict
+        Configuration dictionary containing the catalog and general settings.
+        Expected keys are "general" and "intake".
+
+    Returns
+    -------
+    dict
+        The updated configuration dictionary with the merged intake catalog.
+    """
+    if not config.get("intake", {}).get("write_catalog", False):
+        return config
+
+    cat_file = pathlib.Path(
+        f'{config["general"]["experiment_dir"]}/{config["general"]["expid"]}_intake_catalog.json'
+    )
+    catalog = config["intake"]["catalog"]
+
+    # Load the previously written catalog (if any) before it is overwritten
+    if cat_file.exists():
+        with open(cat_file, "r") as f:
+            prev_cat = json.load(f)
+    else:
+        prev_cat = {}
+
+    catalog_name = cat_file.stem
+    catalog.serialize(catalog_name, directory=cat_file.parent)
+    # catalog.serialize("paul", directory=cat_file.parent)
+
+    # Keep a copy of the freshly serialized catalog next to the merged one
+    backup_file = cat_file.with_suffix(".json_backup")
+    if cat_file.exists():
+        with open(cat_file, "r") as f:
+            with open(backup_file, "w") as backup:
+                backup.write(f.read())
+
+    # Merge the new catalog into the previous one
+    with open(cat_file, "r") as f:
+        new_cat = json.load(f)
+    dpath.merge(prev_cat, new_cat)
+
+    # Save the merged catalog back to disk
+    with open(cat_file, "w") as f:
+        json.dump(prev_cat, f, indent=4)
+    config["intake"]["catalog_json"] = prev_cat
+    return config
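For orientation, here is a hedged sketch of how the catalog written by `write_intake_esm_catalog` could be consumed after a run. The file path and the variable name are placeholders, not values guaranteed by this PR, while `open_esm_datastore`, `.search`, and `.to_dataset_dict` are standard intake-esm API.

```python
# Usage sketch: open the experiment catalog written above and load a subset.
# "/path/to/experiment_dir" and "temp2" are hypothetical placeholders.
import intake

cat = intake.open_esm_datastore("/path/to/experiment_dir/EXPID_intake_catalog.json")
print(cat.df.head())                       # one row per catalogued output file
subset = cat.search(variable_id="temp2")   # filter on the attributes defined above
datasets = subset.to_dataset_dict()        # xarray Datasets, grouped by groupby_attrs
```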