4 changes: 4 additions & 0 deletions configs/esm_software/esm_runscripts/esm_plugins.yaml
@@ -111,3 +111,7 @@ core:
    workflow:
        - "assemble_workflow"

    catalog:
        - "create_intake_esm_catalog"
        - "write_intake_esm_catalog"

2 changes: 2 additions & 0 deletions configs/esm_software/esm_runscripts/esm_runscripts.yaml
@@ -72,6 +72,8 @@ choose_job_type:
- "copy_stuff_back_from_work"
- "copy_all_results_to_exp"
- "clean_run_dir"
- "create_intake_esm_catalog"
- "write_intake_esm_catalog"

    prepcompute:
        recipe:
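Both new recipe steps are no-ops unless explicitly switched on. A minimal sketch of enabling them in a run configuration (the key names are the ones checked in src/esm_runscripts/catalog.py below; placing them in a top-level "intake" section of the runscript is an assumption):

intake:
    create_catalog: true
    write_catalog: true
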
90 changes: 90 additions & 0 deletions examples/catalogs/example-intake-tutorial_1.yaml
@@ -0,0 +1,90 @@
aliases:
  capitals: capitals
  inverted: inverted
  multi: multi
  tute: tute
data:
  7e0b327a50eef58d:
    datatype: intake.readers.datatypes:CSV
    kwargs:
      metadata: {}
      storage_options:
        anon: true
      url: '{CATALOG_DIR}/intake_1.csv'
    metadata: {}
    user_parameters: {}
entries:
  capitals:
    kwargs:
      out_instances:
      - pandas:DataFrame
      - pandas:DataFrame
      - pandas:DataFrame
      - pandas:DataFrame
      steps:
      - - '{data(tute)}'
        - []
        - {}
      - - '{func(intake.readers.transform:Method)}'
        - []
        - method_name: a
      - - '{func(intake.readers.transform:Method)}'
        - []
        - method_name: str
      - - '{func(intake.readers.transform:Method)}'
        - []
        - method_name: capitalize
    metadata: {}
    output_instance: pandas:DataFrame
    reader: intake.readers.convert:Pipeline
    user_parameters: {}
  inverted:
    kwargs:
      out_instances:
      - pandas:DataFrame
      - pandas:DataFrame
      steps:
      - - '{data(tute)}'
        - []
        - {}
      - - '{func(intake.readers.transform:Method)}'
        - []
        - args:
          - b
          ascending: false
          method_name: sort_values
    metadata: {}
    output_instance: pandas:DataFrame
    reader: intake.readers.convert:Pipeline
    user_parameters: {}
  multi:
    kwargs:
      out_instances:
      - pandas:DataFrame
      - pandas:DataFrame
      steps:
      - - '{data(tute)}'
        - []
        - {}
      - - '{func(intake.readers.transform:Method)}'
        - []
        - c: '{data(capitals)}'
          method_name: assign
    metadata: {}
    output_instance: pandas:DataFrame
    reader: intake.readers.convert:Pipeline
    user_parameters: {}
  tute:
    kwargs:
      data: '{data(7e0b327a50eef58d)}'
    metadata: {}
    output_instance: pandas:DataFrame
    reader: intake.readers.readers:PandasCSV
    user_parameters: {}
metadata: {}
user_parameters:
  CATALOG_DIR: s3://mymdtemp
  CATALOG_PATH: s3://mymdtemp/intake_1.yaml
  STORAGE_OPTIONS:
    anon: true
version: 2
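
For orientation, the pipeline entries above are equivalent to plain pandas operations. A sketch of the equivalents (reading intake_1.csv from the local directory is an assumption; the catalog itself resolves the file relative to the s3 bucket in CATALOG_DIR):

import pandas as pd

# "tute": the raw CSV, read by intake.readers.readers:PandasCSV
tute = pd.read_csv("intake_1.csv")
# "capitals": Method steps a -> str -> capitalize, chained onto "tute"
capitals = tute.a.str.capitalize()
# "inverted": sort_values("b", ascending=False) applied to "tute"
inverted = tute.sort_values("b", ascending=False)
# "multi": assign(c=...) taking the "capitals" pipeline as input
multi = tute.assign(c=capitals)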
25 changes: 25 additions & 0 deletions examples/catalogs/example-multiple-netcdf-inside-remote-zip.yaml
@@ -0,0 +1,25 @@
version: 2
data:
  topo:
    user_parameters: {}
    description: "Specific NetCDF file from remote Zip"
    driver: netcdf
    args:
      urlpath: "zip://Supp/herold_etal_eocene_topo_1x1.nc::https://gmd.copernicus.org/articles/7/2077/2014/gmd-7-2077-2014-supplement.zip"
    metadata:
      file_in_zip: "Supp/herold_etal_eocene_topo_1x1.nc"
      origin: "Remote Zip"
  all_files:
    user_parameters: {}
    description: "All NetCDF files from remote Zip"
    driver: netcdf
    args:
      urlpath: "zip://**/*.nc::https://gmd.copernicus.org/articles/7/2077/2014/gmd-7-2077-2014-supplement.zip"
    metadata:
      file_in_zip: "**/*.nc"
      origin: "Remote Zip"
user_parameters: {}
aliases: {}
metadata: {}
entries: {}

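The double colon in these urlpaths chains fsspec filesystems: the right-hand side is fetched over HTTPS, and the left-hand side selects a member (or glob) inside the resulting zip archive. A minimal sketch of what the all_files glob resolves to, using fsspec directly:

import fsspec

# "::" chains protocols: fetch the zip over HTTPS, then glob for NetCDF
# members inside the archive; one OpenFile per match.
files = fsspec.open_files(
    "zip://**/*.nc"
    "::https://gmd.copernicus.org/articles/7/2077/2014/gmd-7-2077-2014-supplement.zip"
)
print([f.path for f in files])
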
15 changes: 15 additions & 0 deletions examples/catalogs/example-netcdf-inside-remote-zip.yaml
@@ -0,0 +1,15 @@
version: 2
data: {}
user_parameters: {}
aliases: {}
metadata: {}
entries:
  topo:
    user_parameters: {}
    description: "Specific NetCDF file from remote Zip"
    driver: netcdf
    args:
      urlpath: "zip://Supp/herold_etal_eocene_topo_1x1.nc::https://gmd.copernicus.org/articles/7/2077/2014/gmd-7-2077-2014-supplement.zip"
    metadata:
      file_in_zip: "Supp/herold_etal_eocene_topo_1x1.nc"
      origin: "Remote Zip"
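
The same chained URL also works without intake. A hedged sketch using fsspec and xarray (engine="h5netcdf" is an assumption: it accepts file-like objects but requires the member to be a NetCDF-4/HDF5 file; the default netcdf4 engine cannot read file-like objects):

import fsspec
import xarray as xr

url = (
    "zip://Supp/herold_etal_eocene_topo_1x1.nc"
    "::https://gmd.copernicus.org/articles/7/2077/2014/gmd-7-2077-2014-supplement.zip"
)
with fsspec.open(url) as f:
    # Assumes a NetCDF-4/HDF5 member; classic NetCDF-3 would need engine="scipy"
    ds = xr.open_dataset(f, engine="h5netcdf")
    print(ds)
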
4 changes: 4 additions & 0 deletions setup.py
@@ -14,13 +14,17 @@

requirements = [
"Click>=8.0.4", # Maximum version for Python 3.6 support
"cfgrib",
"PyGithub==1.55",
"colorama==0.4.5",
"coloredlogs==15.0.1", # NOTE(PG): Should be removed during cleanup for loguru instead
"dpath",
"emoji==1.7.0",
"f90nml==1.4.2",
"gfw-creator==0.2.2",
"gitpython==3.1.41", # Maximum version for Python 3.6 support
"intake",
"intake-esm",
"jinja2==3.1.4",
"loguru==0.6.0",
"numpy>=1.19.5", # Maximum version for Python 3.6 support
1 change: 1 addition & 0 deletions src/esm_runscripts/__init__.py
@@ -6,6 +6,7 @@

from .batch_system import *
from .catalog import *
from .chunky_parts import *
from .database import *
from .database_actions import *
from .dataprocess import *
203 changes: 203 additions & 0 deletions src/esm_runscripts/catalog.py
@@ -0,0 +1,203 @@
import datetime
import getpass
import hashlib
import json
import pathlib

import dpath
import intake
import intake_esm # noqa: F401, import only needed to register intake-esm driver
import pandas as pd
import xarray as xr
from loguru import logger


def create_intake_esm_catalog(config):
"""
Create an intake-esm catalog based on the configuration.

This creates an intake-esm compatible catalog based on the simulation
configuration. The catalog is stored under ``config["intake"]``. Write
to disk as a separate step, this is not handled here!

Creation of the catalog can be controlled via the configuration key::

config["general"]["create_catalog"] = True

Default is ``False`` during the testing phase. To enable catalog creation, please
use your run configuration file.


Parameters
----------
config : dict
The simulation configuration

Notes
-----
The catalog attributes for each entry define what can be searched for and filtered. The exact
specifications of what should be included seems to be ambiguous and could be defined however
we want. Here, we use NextGEMS as a basis, and remove some of the fields which are not obvious
as a start. To see what is used in the DKRZ NextGems catalogue::

$ jq .attributes[].column_name /pool/data/Catalogs/dkrz_nextgems_disk.json

See Also
--------
* https://intake.readthedocs.io/en/latest/index.html
* https://github.com/NCAR/esm-collection-spec/blob/master/collection-spec/collection-spec.md
* https://tutorials.dkrz.de/tutorial_intake-5-create-esm-collection.html
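
    Examples
    --------
    Enabled via a run configuration; a minimal sketch (the key names are the
    ones this function checks, their placement at the top level of the
    runscript is an assumption)::

        intake:
            create_catalog: true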
"""
    if not config.get("intake", {}).get("create_catalog", False):
        return config
    catalog = config.get("intake", {}).get("catalog", {})
    catalog["esmcat_version"] = "0.1.0"
    attributes = catalog["attributes"] = []
    catalog_attrs = [
        "variable_id",
        "project",
        "institution_id",
        "source_id",
        "experiment_id",
        "realm",
        "time_min",
        "time_max",
    ]
    for attr in catalog_attrs:
        attributes.append(dict(column_name=attr, vocabulary=""))
    catalog["assets"] = dict(column_name="uri", format_column_name="format")
    catalog["aggregation_control"] = {
        "variable_column_name": "variable_id",
        "groupby_attrs": [
            "project",
            "institution_id",
            "source_id",
            "experiment_id",
            "realm",
        ],
        "aggregations": [
            {"type": "union", "attribute_name": "variable_id", "options": {}},
            {
                "type": "join_existing",
                "attribute_name": "time_min",
                "options": {"dim": "time", "coords": "minimal", "compat": "override"},
            },
        ],
    }
    catalog["id"] = hashlib.sha256(
        f"{config['general']['expid']}_{datetime.datetime.now()}_{getpass.getuser()}".encode()
    ).hexdigest()
    catalog["description"] = str(config["general"].get("description"))
    catalog["title"] = f"Intake-ESM Catalog for Experiment {config['general']['expid']}"
    catalog["last_updated"] = str(datetime.datetime.now())
    catalog_dict = catalog["catalog_dict"] = []
    # Each entry in catalog_dict should correspond to the schema provided
    # in catalog_attrs plus assets
    for model in config["general"]["valid_model_names"]:
        logger.info(f"Cataloguing output of model {model}")
        mconfig = config[model]
        # FIXME(PG): This is not how we should determine which files are in the
        # experiment outdata, since this will list **all** files, not just the
        # ones added during this run.
        for output_file in pathlib.Path(mconfig["experiment_outdata_dir"]).iterdir():
            # TODO(PG): @JanStreffing, what does OIFS output look like? GRIB, NetCDF?
            # Known GRIB output models:
            if mconfig["model"] in ["echam", "jsbach"]:
                if "codes" in output_file.suffix or "idx" in output_file.suffix:
                    logger.debug(
                        "Skipping codes file or already processed grib output file"
                    )
                    continue
            # TODO(PG): Add zarr support later on
            xarray_engine = "netcdf4" if "nc" in output_file.suffix else "cfgrib"
            # NOTE(PG): Determine which variables are contained in the file;
            # there may well be a better way to do this...
            try:
                var_list = list(
                    xr.open_dataset(output_file, engine=xarray_engine).variables.keys()
                )
            except Exception as e:
                logger.warning(f"Could not determine variables in {output_file}: {e}")
                logger.warning("This file is not added to the catalog!")
                continue
            this_asset = dict(
                project=config["general"].get("project", "esm_tools"),
                institution_id="AWI",
                source_id=f"{config['general']['model']}-{config['general']['version']}",
                experiment_id=str(config["general"]["expid"]),
                realm=str(mconfig.get("type", "UNKNOWN")),
                time_min=str(config["general"]["start_date"]),
                time_max=str(config["general"]["end_date"]),
                uri=f"file://{output_file}",
                # _data_format_=xarray_engine,  # NOTE(PG): I don't like needing this...
                format=xarray_engine,
                variable_id=var_list,
            )
            catalog_dict.append(this_asset)

    catalog_df = pd.DataFrame(catalog_dict)
    # Try to construct the esm_datastore object to validate the catalog:
    validated_cat = intake.open_esm_datastore(obj=dict(esmcat=catalog, df=catalog_df))
    config["intake"] = config.get("intake", {})
    config["intake"]["catalog"] = validated_cat
    return config


def write_intake_esm_catalog(config):
"""
Save the intake catalog to disk.

This function saves the intake catalog specified in the configuration to a YAML file
on disk. If a previous catalog exists, it merges the new catalog with the existing one.

Saving of the catalog can be controlled via the configuration key::

config["intake"]["write_catalog"] = True

Default is ``False`` during the testing phase. Please enable this in your run configuration.

Parameters
----------
config : dict
Configuration dictionary containing the catalog and general settings.
Expected keys are "general" and "intake".

Returns
-------
dict
The updated configuration dictionary with the merged intake catalog.
"""
    if not config.get("intake", {}).get("write_catalog", False):
        return config

    cat_file = pathlib.Path(
        f'{config["general"]["experiment_dir"]}/{config["general"]["expid"]}_intake_catalog.json'
    )
    catalog = config["intake"]["catalog"]

    if cat_file.exists():
        with open(cat_file, "r") as f:
            prev_cat = json.load(f)
        # Back up the previous catalog before ``serialize`` overwrites it below
        backup_file = cat_file.with_suffix(".json_backup")
        with open(backup_file, "w") as backup:
            json.dump(prev_cat, backup, indent=4)
    else:
        prev_cat = {}

    # ``serialize`` writes ``{directory}/{name}.json``, i.e. ``cat_file``
    catalog_name = cat_file.stem
    catalog.serialize(catalog_name, directory=cat_file.parent)

    # Merge the new catalog into the previous one
    with open(cat_file, "r") as f:
        new_cat = json.load(f)
    dpath.merge(prev_cat, new_cat)

    # Save the merged catalog back to disk
    with open(cat_file, "w") as f:
        json.dump(prev_cat, f, indent=4)
    config["intake"]["catalog_json"] = prev_cat
    return config
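
Once both steps have run, the file written by write_intake_esm_catalog is a regular intake-esm datastore. A hedged usage sketch (the path follows the {experiment_dir}/{expid}_intake_catalog.json scheme above; the directory, expid, and query values are placeholders):

import intake

# Placeholder path; real runs write {experiment_dir}/{expid}_intake_catalog.json
cat = intake.open_esm_datastore("/work/my_runs/test001/test001_intake_catalog.json")
# Query columns correspond to catalog_attrs in create_intake_esm_catalog;
# "temp2" and "atmosphere" are hypothetical values.
subset = cat.search(variable_id="temp2", realm="atmosphere")
datasets = subset.to_dataset_dict()  # one xarray.Dataset per groupby_attrs group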