77"""
88# !/usr/bin/env python
99import pandas as pd
10+ import xarray as xr
11+ import numpy as np
12+ import dask
13+ from zarr .codecs import BloscCodec
1014from datetime import datetime
15+
1116import cdsapi , os , sys
1217from dateutil .relativedelta import *
13- import glob
1418import subprocess
1519from multiprocessing .dummy import Pool as ThreadPool
1620from datetime import datetime , timedelta
1721from TopoPyScale import topo_export as te
18- import xarray as xr
22+ from TopoPyScale import topo_utils as tu
23+ from pathlib import Path
1924import cfgrib
20- import os
2125import zipfile
2226import shutil
2327import glob
2428from cdo import *
2529
30+
# TODO: remove the era5_downloader dependency — it brings little value
2632import era5_downloader as era5down
2733
34+ import warnings
35+ warnings .filterwarnings ("ignore" , category = UserWarning )
36+
2837
2938var_surf_name_google = {'geopotential_at_surface' :'z' ,
3039 '2m_dewpoint_temperature' :'d2m' ,
4554 'specific_humidity' :'q' }
4655
4756
def append_nc_to_zarr_region(year, ncfile, zarrstore, surf=False):
    """
    Append one yearly netcdf file into an existing zarr store, writing into
    the matching time region (region='auto').

    Args:
        year (int): year of the file being appended (informational only; the
            target region is inferred from the file's time coordinate)
        ncfile (str or Path): path to the netcdf file to append
        zarrstore (str): path to the target zarr store
        surf (bool or str): True when the file is a SURF file whose 'z'
            variable must be renamed to 'z_surf'. Stringified booleans
            ('True'/'False') coming from multiprocessing argument lists
            are accepted too.
    """
    ncfile = str(ncfile)
    # tolerate stringified booleans passed through multiprocessing pools:
    # bool('False') would otherwise be truthy
    if isinstance(surf, str):
        surf = (surf == 'True')
    print(f"---> Appending {ncfile} to {zarrstore}")
    dp = xr.open_dataset(ncfile, chunks='auto')
    try:
        if surf:
            # surface geopotential is stored as 'z_surf' to avoid clashing
            # with the pressure-level geopotential 'z'
            dp = dp.rename({'z': 'z_surf'})
        dp.to_zarr(zarrstore, mode='a', region='auto', align_chunks=True)
    finally:
        # release file handles even if the append fails
        dp.close()
65+
def convert_netcdf_stack_to_zarr(path_to_netcdfs='inputs/climate/yearly',
                                 zarrout='inputs/climate/ERA5.zarr',
                                 chunks=None,
                                 compressor=None,
                                 plev_name='PLEV_*.nc',
                                 surf_name='SURF_*.nc',
                                 parallelize=False,
                                 n_cores=4):
    """
    Convert a stack of yearly netcdf PLEV and SURF files to a single zarr store.

    A template store covering the full time range is first written with empty
    dask arrays, then each yearly file is appended into its own time region.

    Args:
        path_to_netcdfs (str): path to yearly netcdf files of PLEV and SURF
        zarrout (str): path and name of the output zarr store to be created
        chunks (dict): chunk sizes for the output zarr store. Good practice is
            a time chunk equivalent to one yearly file (8760 hours).
            Default: {'latitude': 3, 'longitude': 3, 'time': 8760, 'level': 13}
        compressor (obj): compressor object used for encoding. See zarr
            compression. Default: BloscCodec(cname='lz4', clevel=5,
            shuffle='bitshuffle', blocksize=0)
        plev_name (str): file pattern of the netcdfs containing the pressure
            levels. Use * instead of the year
        surf_name (str): file pattern of the netcdfs containing the surface
            level. Use * instead of the year
        parallelize (bool): append yearly files using a multiprocessing pool
        n_cores (int): number of cores used when parallelize is True
    """
    # avoid a mutable default argument
    if chunks is None:
        chunks = {'latitude': 3, 'longitude': 3, 'time': 8760, 'level': 13}
    pn = Path(path_to_netcdfs)
    pz = Path(zarrout)

    # lazy load the stack of PLEV netcdfs only to grab dimensions,
    # coordinates and attributes
    ds = xr.open_mfdataset(str(pn / plev_name), parallel=True, chunks='auto')
    tvec = ds.time.values
    shape_plev = (len(tvec), ds.level.shape[0], ds.latitude.shape[0], ds.longitude.shape[0])
    shape_surf = (len(tvec), ds.latitude.shape[0], ds.longitude.shape[0])
    za1 = dask.array.empty(shape_plev, dtype='float32')
    za2 = dask.array.empty(shape_surf, dtype='float32')

    dims_plev = ('time', 'level', 'latitude', 'longitude')
    dims_surf = ('time', 'latitude', 'longitude')
    # template dataset: empty arrays with the final coordinates, used to
    # initialize the zarr store before region-wise appends
    dd = xr.Dataset(
        coords={
            'time': tvec,
            'longitude': ds.longitude.values,
            'latitude': ds.latitude.values,
            'level': ds.level.values,
        },
        data_vars={
            'z': (dims_plev, za1),
            't': (dims_plev, za1),
            'u': (dims_plev, za1),
            'v': (dims_plev, za1),
            'q': (dims_plev, za1),
            'r': (dims_plev, za1),
            'z_surf': (dims_surf, za2),
            'd2m': (dims_surf, za2),
            'sp': (dims_surf, za2),
            'strd': (dims_surf, za2),
            'ssrd': (dims_surf, za2),
            'tp': (dims_surf, za2),
            't2m': (dims_surf, za2),
        },
        attrs=ds.attrs,
    )

    dd = dd.chunk(chunks=chunks).persist()
    var_names = list(ds.keys())
    if compressor is None:
        compressor = BloscCodec(cname='lz4', clevel=5, shuffle='bitshuffle', blocksize=0)
    encoder = {v: {'compressors': compressor} for v in var_names}
    # write the (empty) template store
    dd.to_zarr(zarrout, mode='w', zarr_format=3, encoding=encoder)
    dd.close()
    # compute the list of years before releasing the template datasets
    years = pd.to_datetime(tvec).year.unique()
    ds.close()
    del dd, ds

    if not parallelize:
        # loop through the years sequentially
        for year in years:
            print(f"---> Appending PLEV year {year} to {pz.name}")
            dp = xr.open_dataset(str(pn / plev_name.replace('*', f'{year}')), chunks='auto')
            dp.to_zarr(str(pz), mode='a', region='auto', align_chunks=True)
            dp.close()
            del dp

            print(f"---> Appending SURF year {year} to {pz.name}")
            du = xr.open_dataset(str(pn / surf_name.replace('*', f'{year}')), chunks='auto')
            du = du.rename({'z': 'z_surf'})
            du.to_zarr(str(pz), mode='a', region='auto', align_chunks=True)
            du.close()
            del du
    else:
        # append PLEV files, then SURF files, using a multiprocessing pool
        for pattern, is_surf in ((plev_name, False), (surf_name, True)):
            # materialize and sort the glob once: Path.glob() returns a
            # one-shot generator that cannot be consumed twice
            flist = sorted(pn.glob(pattern))
            n_files = len(flist)
            fun_param = zip(list(years.astype(int)),
                            flist,
                            [str(pz)] * n_files,
                            [is_surf] * n_files)
            tu.multicore_pooling(append_nc_to_zarr_region, fun_param, n_cores)
    print("Conversion done :)")
158+
159+
def to_zarr(ds, fout, chuncks=None, compressor=None, mode='w'):
    """
    Write a dataset to a zarr (format 3) store.

    Args:
        ds (dataset): dataset to convert to zarr
        fout (str): name of the zarr archive
        chuncks (dict): chunk sizes for the store; default
            {'x': 3, 'y': 3, 'time': 8760, 'level': 7}
            (parameter name kept as-is for backward compatibility)
        compressor (obj): default is None, otherwise use an encoder compatible
            with zarr3. BloscCodec lz4 is used by default
        mode (str): mode to write to zarr store. See the xarray to_zarr()
            documentation
    """
    # avoid a mutable default argument
    if chuncks is None:
        chuncks = {'x': 3, 'y': 3, 'time': 8760, 'level': 7}
    var_names = list(ds.keys())

    if compressor is None:
        compressor = BloscCodec(cname='lz4', clevel=5, shuffle='bitshuffle', blocksize=0)

    # same compressor for every variable
    encoder = {v: {'compressors': compressor} for v in var_names}
    ds.chunk(chuncks).to_zarr(fout, zarr_format=3, encoding=encoder, mode=mode)
48178
49179
50180def fetch_era5_google (eraDir , startDate , endDate , lonW , latS , lonE , latN , plevels , step = '3H' ,num_threads = 1 ):
@@ -116,8 +246,7 @@ def fetch_era5_google(eraDir, startDate, endDate, lonW, latS, lonE, latN, plevel
116246
117247
118248
119- def fetch_era5_google_from_zarr (eraDir , startDate , endDate , lonW , latS , lonE , latN , plevels ,
120- bucket = 'gs://gcp-public-data-arco-era5/ar/full_37-1h-0p25deg-chunk-1.zarr-v3' ):
249+ def fetch_era5_google_from_zarr (eraDir , startDate , endDate , lonW , latS , lonE , latN , plevels , bucket = 'gs://gcp-public-data-arco-era5/ar/full_37-1h-0p25deg-chunk-1.zarr-v3' ):
121250 '''
122251 Function to download data from Zarr repository on Google Cloud Storage (https://github.com/google-research/arco-era5/tree/main)
123252
@@ -153,7 +282,7 @@ def fetch_era5_google_from_zarr(eraDir, startDate, endDate, lonW, latS, lonE, la
153282
154283def retrieve_era5 (product , startDate , endDate , eraDir , latN , latS , lonE , lonW , step ,
155284 num_threads = 10 , surf_plev = 'surf' , plevels = None , realtime = False ,
156- output_format = 'netcdf' , download_format = "unarchived" , new_CDS_API = True , rm_daily = False ):
285+ output_format = 'netcdf' , download_format = "unarchived" , new_CDS_API = True , rm_daily = False , store_as_zarr = 'ERA5.zarr' ):
157286 """ Sets up era5 surface retrieval.
158287 * Creates list of year/month pairs to iterate through.
159288 * MARS retrievals are most efficient when subset by time.
@@ -176,6 +305,7 @@ def retrieve_era5(product, startDate, endDate, eraDir, latN, latS, lonE, lonW, s
176305 download_format (str): default "unarchived". Can be "zip"
177306 new_CDS_API: flag to handle new formating of SURF files with the new CDS API (2024).
178307 rm_daily: remove folder containing all daily ERA5 file. Option to clear space of data converted to yearly files.
308+ store_as_zarr (str): name of the Zarr store. None if you want to use the old system with Netcdf
179309
180310 Returns:
181311 Monthly era surface files stored in disk.
@@ -316,12 +446,15 @@ def retrieve_era5(product, startDate, endDate, eraDir, latN, latS, lonE, lonW, s
316446
317447 if surf_plev == 'plev' :
318448 fpat = str (eraDir / "daily" / ("dPLEV_%04d*.nc" % (year )))
319- fout = str (eraDir / ("PLEV_%04d.nc" % (year )))
449+ fout = str (eraDir / "yearly" / ("PLEV_%04d.nc" % (year )))
320450 cdo .mergetime (input = fpat , output = fout )
321451
322452 if rm_daily :
323- shutil .rmtree (eraDir / "daily" )
324-
453+ try :
454+ shutil .rmtree (eraDir / "daily" )
455+ print ('---> Daily files removed' )
456+ except :
457+ raise IOError ('deletion of daily files issue' )
325458
326459 print ("===> ERA5 files ready" )
327460
0 commit comments