Examples#
These examples demonstrate how to use the common functionality of PyFDB.
The section contains examples for all methods of the FDB object.
In general PyFDB is used to refer to the Python API of the FDB, whereas
FDB is used to refer to the underlying C++ class or the created Python instance.
MARS Selections#
One main concept of interacting with the FDB is a MARS selection. A
selection is a dictionary-like object describing the ranges or sets of
coordinates which point to multiple elements or sub-datacubes within a
datacube, see Datacube Spec.
The recommended way of interacting with the FDB is to use a dictionary,
describing a MARS selection. The following example shows how to create a MARS
selection with it:
mars_selection = {
    "key-1": ["value-2", "value-4", "value-5"], # String values
    "key-2": ["0.1", 0.2, "0.5"], # Mixed types
    "key-3": [0.1, 0.2, 0.5], # Float values
    "key-4": [1, 2, 0.5], # Integer and float values
    "key-5": 1, # Single int value
    "key-6": [1 + 0.5 * x for x in range(2)], # List of float values generated by a list comprehension
}
Note
The type of a MarsSelection is Mapping[str, str | int | float | Collection[str | int | float]] which can be given to
the FDB object of the PyFDB module.
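To make the accepted value shapes concrete, the following sketch normalises a selection so that every value becomes a list of strings. The helper `normalise_selection` and the type aliases are hypothetical names introduced for illustration only; how PyFDB converts values internally is not described here.

```python
from collections.abc import Collection, Mapping
from typing import Union

# Type aliases mirroring the MarsSelection type quoted in the note above.
Scalar = Union[str, int, float]
MarsSelection = Mapping[str, Union[Scalar, Collection[Scalar]]]


def normalise_selection(selection: MarsSelection) -> dict[str, list[str]]:
    """Hypothetical helper: return a copy in which every value is a list of strings."""
    normalised = {}
    for key, value in selection.items():
        # A str is itself a Collection, so scalars are detected explicitly.
        if isinstance(value, (str, int, float)):
            values = [value]
        else:
            values = list(value)
        normalised[key] = [str(v) for v in values]
    return normalised


example = normalise_selection({"key-2": ["0.1", 0.2, "0.5"], "key-5": 1})
print(example)  # {'key-2': ['0.1', '0.2', '0.5'], 'key-5': ['1']}
```

Writing every value as a list up front keeps single values and multi-value keys on the same code path.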
Some of the methods, e.g. list, accept wildcard selections. For those it’s possible to pass the wildcard selection directly.
MARS Identifier#
There is also the concept of a MARS identifier. Those are strict subsets of the MARS selections and differ by only allowing singular values.
mars_identifier = {
    "key-1": "value-1", # String value
    "key-2": 2, # Integer value
    # ...
}
For further information about the individual MARS data types, see the Datacube Spec.
PyFDB Initialisation#
fdb = pyfdb.FDB()
If no configuration is supplied, FDB falls back to deriving the configuration
location from a predefined set of locations. You can supply a custom location
by specifying the FDB_HOME environment variable. If you only want to set the
location of the configuration file, use the FDB5_CONFIG_FILE
environment variable. There are many configuration options;
if in doubt, refer to the official FDB documentation at ECMWF DOCS.
You can also pass (dynamically) created custom configurations as parameters to
the FDB constructor. Those can be supplied as a Path pointing to the
location of the configuration file, as a str which is the yaml
representation of the configuration or as a Dict[str, Any] as shown below.
config = {
    "type": "local",
    "engine": "toc",
    "schema": "/path/to/fdb_schema",
    "spaces": [
        {
            "handler": "Default",
            "roots": [
                {"path": "/path/to/root"}
            ]
        }
    ],
}
fdb = pyfdb.FDB(config=config, user_config={})
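The three accepted forms (dictionary, string, Path) can be derived from one another. The sketch below builds the string and Path variants from the dictionary using only the standard library. It relies on JSON being a subset of YAML 1.2, and it assumes the FDB's YAML parser accepts this JSON-style syntax; the file name `fdb_config.yaml` in a temporary directory is chosen for illustration only.

```python
import json
import tempfile
from pathlib import Path

config = {
    "type": "local",
    "engine": "toc",
    "schema": "/path/to/fdb_schema",
    "spaces": [{"handler": "Default", "roots": [{"path": "/path/to/root"}]}],
}

# JSON is a subset of YAML 1.2, so this string is also a YAML document.
config_str = json.dumps(config, indent=2)

# Writing the same text to disk yields the Path variant.
config_path = Path(tempfile.mkdtemp()) / "fdb_config.yaml"
config_path.write_text(config_str)

# All three objects describe the same configuration.
assert json.loads(config_path.read_text()) == config
```

Any of `config`, `config_str` or `config_path` could then be handed to the constructor as described above.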
For convenience, the FDB instance implements the Python context manager interface:
with pyfdb.FDB() as fdb:
    # Use fdb in here
    pass
On exiting the context, Flush is called to guarantee that any Archive operation has been synced. This can lead to unintended sync behaviour; see Archive for further information.
The different methods of the FDB class can be leveraged for different
use cases. Below we list examples of the most common method calls, showing
different ways of using the Python API.
Archive#
Archive binary data into the underlying FDB.
with pyfdb.FDB(fdb_config_path) as fdb:
    filename = data_path / "x138-300.grib"
    fdb.archive(filename.read_bytes())
# On exit of this scope fdb is flushed
In this scenario a GRIB file is archived to the configured FDB. If no
identifier is supplied, the FDB reads the metadata from the given GRIB file
and stores the data under it. If we set an identifier, no
consistency checks take place and our data is saved with the metadata given
by the supplied identifier. This enables us to store arbitrary binary data
under the given key, as shown below:
identifier = {
    "class": "rd",
    "expver": "zzzz",
    "stream": "oper",
    "date": "20191110",
    "time": "0000",
    "domain": "g",
    "type": "an",
    "levtype": "pl",
    "step": "0",
    "levelist": "300",
    "param": "138",
}
with pyfdb.FDB(fdb_config_path) as fdb:
    fdb.archive(b"test-binary-data", identifier=identifier)
# On exit of this scope fdb is flushed
The flush command guarantees that the archived data has been flushed to the
FDB. In combination with using the context manager of the FDB object,
the syncing may behave differently from what the user expects. Take a look at
the following code, utilizing the archive method:
fdb = pyfdb.FDB()
for step in range(240):
    with fdb:
        fdb.archive(…)
        fdb.archive(…)
This would call the exit function of the FDB after each iteration of
the step loop, therefore causing a flush of the archived data. Compare this
with the following:
fdb = pyfdb.FDB()
for step in range(240):
    fdb.archive(…)
    fdb.archive(…)
    fdb.flush()
Both of these examples achieve the same result in the normal, successful execution case. However, with a manual call to flush(), if an exception interrupts execution after some archive() calls have succeeded and others have not, none of that data becomes visible to the user. When using the context manager, flush() is called implicitly on leaving the scope, including when an exception is thrown, which makes partial output visible to the user. Which of these outcomes is preferable depends on the workflow.
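The difference in failure behaviour can be reproduced without an FDB. The sketch below uses `FakeStore`, a stand-in class written for this illustration only; it is not part of PyFDB and merely mimics the visibility rule described above (archived data becomes visible on flush, and leaving a `with` block always flushes).

```python
class FakeStore:
    """Stand-in mimicking archive/flush visibility; not the PyFDB API."""

    def __init__(self):
        self.pending = []  # archived but not yet flushed
        self.visible = []  # flushed and visible to readers

    def archive(self, item):
        self.pending.append(item)

    def flush(self):
        self.visible.extend(self.pending)
        self.pending.clear()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.flush()  # runs even when an exception leaves the block
        return False  # do not swallow the exception


def write_two(store, fail):
    """Archive two fields, optionally failing between them."""
    store.archive("field-1")
    if fail:
        raise RuntimeError("interrupted")
    store.archive("field-2")


# Context manager: the flush on exit publishes the partial output.
managed = FakeStore()
try:
    with managed:
        write_two(managed, fail=True)
except RuntimeError:
    pass

# Manual flush: the exception skips flush(), so nothing is published.
manual = FakeStore()
try:
    write_two(manual, fail=True)
    manual.flush()
except RuntimeError:
    pass

print(managed.visible)  # ['field-1']
print(manual.visible)   # []
```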
Flush#
Flush all buffers and close all data handles of the underlying FDB into a consistent DB state.
Tip
It’s always safe to call flush.
fdb = pyfdb.FDB(fdb_config_path)
filename = data_path / "x138-300.grib"
# read_bytes() closes the file after reading
fdb.archive(filename.read_bytes())
fdb.flush()
The flush command guarantees that the archived data
has been flushed to the FDB. It’s always safe to call flush. You can
either call the method explicitly or rely on the context manager capabilities
of the FDB. See Archive.
Retrieving#
Retrieve data which is specified by a MARS selection.
To Memory#
fdb = pyfdb.FDB(fdb_config_path)
selection = {
    "type": "an",
    "class": "ea",
    "domain": "g",
    "expver": "0001",
    "stream": "oper",
    "date": "20200101",
    "levtype": "sfc",
    "step": "0",
    "param": ["167", "165", "166"],
    "time": "1800",
}
with fdb.retrieve(selection) as data_handle:
    data_handle.read(4) # == b"GRIB"
    # data_handle.readall() # As an alternative to read all messages
The code above shows how to retrieve a MARS selection given as a dictionary.
The retrieved data_handle has to be opened before being read and closed
afterwards. If you are interested in reading the entire data_handle, you
could use the readall method.
Tip
For the readall method there is no need to open or close the
data_handle after the call to readall.
To File#
Another common use case is saving certain GRIB data to a
file on your local machine. The following code shows how to achieve this:
import shutil
fdb = pyfdb.FDB(fdb_config_path)
# Specify selection here
# --------------------
selection = {
    "type": "an",
    "class": "ea",
    "domain": "g",
    "expver": "0001",
    "stream": "oper",
    "date": "20200101",
    "levtype": "sfc",
    "step": "0",
    "param": ["167", "165", "166"],
    "time": "1800",
}
filename = test_case_tmp / 'output.grib'
with open(filename, 'wb') as out:
    with fdb.retrieve(selection) as data_handle:
        out.write(data_handle.readall())
The example above first reads all data into memory and then writes the
data to the specified file. In case the data of the selection is
too large to fit in memory, we can leverage the shutil functions to read the content
in buffered chunks and write them to a single file on disk:
import tempfile
with tempfile.TemporaryFile() as out:
    with fdb.retrieve(selection) as data_handle:
        assert data_handle
        shutil.copyfileobj(data_handle, out)
Depending on the implementation, shutil.copyfileobj uses either the read method or the
readinto method. The former copies one buffer at a time until
the entire data handle is depleted. The latter leverages a memoryview to make it a zero-copy
function.
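The chunked copy can be tried without an FDB. In the sketch below an `io.BytesIO` object stands in for the data handle (an assumption made purely so the example runs on its own), and the optional `length` argument of `shutil.copyfileobj` sets the chunk size.

```python
import io
import shutil
import tempfile

# Stand-in for a data handle: any object with a read() method works here.
source = io.BytesIO(b"GRIB" + bytes(1_000_000))

with tempfile.TemporaryFile() as out:
    # Copy in 64 KiB chunks instead of loading everything at once.
    shutil.copyfileobj(source, out, length=64 * 1024)
    copied = out.tell()  # number of bytes written
    out.seek(0)
    magic = out.read(4)

print(copied, magic)  # 1000004 b'GRIB'
```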
List#
List data that is present in the underlying FDB archive and can be retrieved.
fdb = pyfdb.FDB(fdb_config_path)
selection = {
    "type": "an",
    "class": "ea"
}
list_iterator = fdb.list(selection, level=1)
elements = list(list_iterator)
for el in elements:
    print(el)
assert len(elements) == 32
The code above shows an example of listing the contents of the FDB for a given selection.
The selection describes a MarsSelection (a MARS request without the verb).
Note
A MarsSelection doesn’t need to be fully specified. In the example above
you can see that many of the MARS keys aren’t specified. In case of list, the given keys
are treated as a selector, meaning that all data which matches those keys is returned. For
every key, which isn’t explicitly stated, all found data is returned.
We recommend using lists of values at all times, as seen in MARS Selections.
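The selector semantics described in the note can be sketched in plain Python. The function `matches` and the sample keys below are hypothetical and only model the rule "every key named in the selection must match, unnamed keys match anything"; they say nothing about how the FDB evaluates a selection internally.

```python
def matches(selection, key):
    """Return True if every key named in the selection accepts the value in `key`."""
    for name, accepted in selection.items():
        # Allow single values as well as lists of values.
        if not isinstance(accepted, (list, tuple, set)):
            accepted = [accepted]
        if str(key.get(name)) not in {str(v) for v in accepted}:
            return False
    return True


# Invented field keys, used only to exercise the function.
stored = [
    {"class": "ea", "type": "an", "param": "167"},
    {"class": "ea", "type": "fc", "param": "167"},
    {"class": "rd", "type": "an", "param": "131"},
]

selected = [k for k in stored if matches({"type": ["an"], "class": ["ea"]}, k)]
print(selected)  # [{'class': 'ea', 'type': 'an', 'param': '167'}]
```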
level=1 refers to the schema level of the FDB. A rule in an
FDB schema could look like:
[ class, expver, stream, date, time, domain?
  ^^^^^^^^^^^^^^^ Level 1 ^^^^^^^^^^^^^^^^^^
    [ type, levtype
      ^^ Level 2 ^^
        [ step, levelist?, param ]]
          ^^^^^^^ Level 3 ^^^^^^^
]
Depending on the given level different outputs are to be expected:
fdb = pyfdb.FDB(fdb_config_path)
selection = {
    "type": "an",
    "class": "ea",
    "domain": "g",
    "expver": "0001",
    "stream": "oper",
    "date": "20200101",
    "levtype": "sfc",
    "step": "0",
    "time": "1800",
}
list_iterator = fdb.list(selection) # level == 3
elements = list(list_iterator)
print(elements[0])
{class=ea,expver=0001,stream=oper,date=20200101,time=1800,domain=g}
{type=an,levtype=sfc}
{step=0,param=131},
TocFieldLocation[uri=URI[scheme=file,name=/<path-to-db_store>/ea:0001:oper:20200101:1800:g/an:sfc.20251118.151917.<?>.375861178007828.data],offset=10732,length=10732,remapKey={}],
length=10732,
timestamp=1763479157
{class=ea,expver=0001,stream=oper,date=20200101,time=1800,domain=g}
{type=an,levtype=sfc}
{step=0,param=132},
TocFieldLocation[uri=URI[scheme=file,name=/<path-to-db_store>/db_store/ea:0001:oper:20200101:1800:g/an:sfc.20251118.151917.<?>.375861178007828.data],offset=21464,length=10732,remapKey={}],
length=10732,
timestamp=1763479157
{class=ea,expver=0001,stream=oper,date=20200101,time=1800,domain=g}
{type=an,levtype=sfc}
{step=0,param=167},
TocFieldLocation[uri=URI[scheme=file,name=/<path-to-db_store>/db_store/ea:0001:oper:20200101:1800:g/an:sfc.20251118.151917.<?>.375861178007828.data],offset=0,length=10732,remapKey={}],
length=10732,
timestamp=1763479157
fdb = pyfdb.FDB(fdb_config_path)
selection = {
    "type": "an",
    "class": "ea",
    "domain": "g",
    "expver": "0001",
    "stream": "oper",
    "date": "20200101",
    "levtype": "sfc",
    "step": "0",
    "time": "1800",
}
list_iterator = fdb.list(selection, level=2)
elements = list(list_iterator)
print(elements[0])
{class=ea,expver=0001,stream=oper,date=20200101,time=1800,domain=g}
{type=an,levtype=sfc},
length=0,
timestamp=0
fdb = pyfdb.FDB(fdb_config_path)
selection = {
    "type": "an",
    "class": "ea",
    "domain": "g",
    "expver": "0001",
    "stream": "oper",
    "date": "20200101",
    "levtype": "sfc",
    "step": "0",
    "time": "1800",
}
list_iterator = fdb.list(selection, level=1)
elements = list(list_iterator)
print(elements[0])
{class=ea,expver=0001,stream=oper,date=20200101,time=1800,domain=g},
length=0,
timestamp=0
For each level, the returned iterator of ListElement restricts the elements to the corresponding
level of the underlying FDB. level=1 returns elements whose key only contains MARS keys of level 1,
level=2 returns elements whose key contains MARS keys up to level 2, and level=3 returns elements
whose key contains all MARS keys together with the corresponding DataHandle pointing to the location of the
file on disk.
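To see which keys of a selection belong to which level, the example rule shown earlier can be written down as plain data. The names `LEVELS` and `split_by_level` are hypothetical; a real FDB reads the levels from its schema file rather than from a hard-coded table like this one.

```python
# Key names per level, copied from the example rule above.
LEVELS = [
    ("class", "expver", "stream", "date", "time", "domain"),
    ("type", "levtype"),
    ("step", "levelist", "param"),
]


def split_by_level(selection):
    """Split a selection into one dictionary per schema level."""
    return [{k: selection[k] for k in names if k in selection} for names in LEVELS]


selection = {
    "type": "an",
    "class": "ea",
    "domain": "g",
    "expver": "0001",
    "stream": "oper",
    "date": "20200101",
    "levtype": "sfc",
    "step": "0",
    "time": "1800",
}

levels = split_by_level(selection)
for number, keys in enumerate(levels, start=1):
    print(f"Level {number}: {keys}")
```

The optional keys of the rule (marked with `?`) are simply absent from the result when the selection does not name them.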
You can use this directly to read the message represented by the ListElement, e.g.:
fdb = pyfdb.FDB(fdb_config_path)
selection = {
    "type": "an",
    "class": "ea",
    "domain": "g",
    "expver": "0001",
    "stream": "oper",
    "date": "20200101",
    "levtype": "sfc",
    "step": "0",
    "param": ["167", "131", "132"],
    "time": "1800",
}
# The selection has to be defined before it is passed to list
list_iterator = fdb.list(selection, level=3)
for el in list_iterator:
    data_handle = el.data_handle
    data_handle.open()
    assert data_handle.read(4) == b"GRIB"
    data_handle.close()
If you want to access the keys of any given ListElement, you can use either
the combined_key or the keys method of the ListElement class. keys
returns a list with 3 elements. Each entry is a dictionary, resembling the
corresponding level of the FDB schema, e.g., the first entry contains all
MARS keys of the first level, etc. In case the list call specifies level=2 or
level=1, the entries of the dictionaries linked to the lower levels are
empty.
fdb = pyfdb.FDB(fdb_config_path)
selection = {
    "type": "an",
    "class": "ea",
    "domain": "g",
    "expver": "0001",
    "stream": "oper",
    "date": "20200101",
    "levtype": "sfc",
    "step": "0",
    "param": ["167", "131", "132"],
    "time": "1800",
}
list_iterator = fdb.list(selection, level=2)
for el in list_iterator:
    keys = el.keys()
    print(keys)
For a single element of this list_iterator its keys would have the following structure:
...
[{'class': 'ea', 'date': '20200101', 'domain': 'g', 'expver': '0001', 'stream': 'oper', 'time': '1800'}, {'levtype': 'sfc', 'type': 'an'}, {}]
...
If we called fdb.list with level=1, the result would have the following structure:
...
[{'class': 'ea', 'date': '20200101', 'domain': 'g', 'expver': '0001', 'stream': 'oper', 'time': '1800'}, {}, {}]
...
combined_key is a convenience method for combining all dictionaries of the
keys function into a single dictionary.
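The relationship between the two can be sketched as a simple merge of the per-level dictionaries. The function `combine` below is a hypothetical illustration of that idea, applied to the level=2 structure shown above; it is not taken from the PyFDB implementation.

```python
def combine(keys):
    """Merge the per-level dictionaries into a single dictionary."""
    combined = {}
    for level in keys:
        combined.update(level)
    return combined


# Structure as printed above for a list call with level=2.
keys = [
    {"class": "ea", "date": "20200101", "domain": "g",
     "expver": "0001", "stream": "oper", "time": "1800"},
    {"levtype": "sfc", "type": "an"},
    {},
]

combined = combine(keys)
print(sorted(combined))
```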
Inspect#
Inspects the content of the underlying FDB and returns a generator of list elements describing which field was part of the MARS selection.
fdb = pyfdb.FDB(fdb_config_path)
identifier = {
    "type": "an",
    "class": "ea",
    "domain": "g",
    "expver": "0001",
    "stream": "oper",
    "date": "20200101",
    "levtype": "sfc",
    "step": "0",
    "param": "131",
    "time": "1800",
}
inspect_iterator = fdb.inspect(identifier)
elements = list(inspect_iterator)
# Because the identifier needs to be fully specified, there
# should be only a single element returned
assert len(elements) == 1
for el in elements:
    with el.data_handle as data_handle:
        assert data_handle.read(4) == b"GRIB"
The code above shows how to inspect certain elements stored in the FDB. This call is similar to
a list call with level=3, although the internals are quite different. The functionality is
designed to list a large number of individual fields.
Similar to the list command, each ListElement returned contains a DataHandle which can
be used to directly access the data associated with the element, see the example of list.
Note
Due to the internals of the FDB only a fully specified MARS selection
with singular values (also called Identifier) is accepted. If a list is given
for a key, e.g. param=131/132, the second value is silently dropped.
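Because additional values are dropped silently, one way to avoid surprises is to expand a selection into one identifier per combination of values before inspecting. The helper `expand_to_identifiers` below is a hypothetical sketch built on `itertools.product`; it only prepares the dictionaries and does not call the FDB.

```python
from itertools import product


def expand_to_identifiers(selection):
    """Return one single-valued dictionary per combination of values."""
    names = list(selection)
    value_lists = [
        list(v) if isinstance(v, (list, tuple)) else [v]
        for v in selection.values()
    ]
    return [dict(zip(names, combo)) for combo in product(*value_lists)]


identifiers = expand_to_identifiers(
    {"class": "ea", "param": ["131", "132"], "step": "0"}
)
for identifier in identifiers:
    print(identifier)
# {'class': 'ea', 'param': '131', 'step': '0'}
# {'class': 'ea', 'param': '132', 'step': '0'}
```

Each resulting dictionary contains singular values only and could then be inspected individually.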
Status#
List the status of all FDB entries with their control identifiers, e.g., whether a certain database was locked for retrieval.
fdb = pyfdb.FDB(fdb_config_path)
selection = {
    "type": "an",
    "class": "ea",
    "domain": "g",
}
status_iterator = fdb.status(selection)
elements = list(status_iterator)
len(elements) # == 32
The output of such a command can look like the following and is the same output you get from the
call to control <control_label> when setting certain ControlIdentifiers for elements of the FDB.
ControlElement(
    control_identifiers=[WIPE],
    key={'class': ['ea'], 'date': ['20200104'], 'domain': ['g'], 'expver': ['0001'], 'stream': ['oper'], 'time': ['2100']},
    location=/<some-path>/db_store/ea:0001:oper:20200104:2100:g
)
You can see that the ControlIdentifier for WIPE is active for the given entry of the FDB.
Tip
Use the control functionality of FDB to switch certain properties of FDB elements.
Refer to the Control section for further information.
Wipe#
Wipe data from the database.
Delete FDB databases and the data they contain. Use the passed selection to identify the database to delete. This is equivalent to a UNIX rm command. This function deletes either whole databases or whole indexes within databases.
Tip
You should check the elements of a deletion before running it with the doit flag.
Double check that the dry-run, which is active per default, really returns the elements you are
expecting.
A potential deletion operation could look like this:
fdb = pyfdb.FDB(fdb_config_path)
# Dry run (the default): nothing is deleted yet
elements = list(fdb.wipe({"class": "ea"}))
assert len(elements) > 0
# NOTE: Double check that the returned elements are those you want to delete
for element in elements:
    print(element)
# Do the actual deletion with the `doit=True` flag
wipe_iterator = fdb.wipe({"class": "ea"}, doit=True)
wiped_elements = list(wipe_iterator)
for element in wiped_elements:
    print(element)
Purge#
Remove duplicate data from the database.
Purge duplicate entries from the database and remove the associated data if the data is owned and not adopted.
Data in the FDB is immutable. It is masked, but not removed, when overwritten with new data using the same key.
Masked data can no longer be accessed. Indexes and data files that only contain masked data may be removed.
If an index refers to data that is not owned by the FDB (in particular data which has been adopted from an
existing FDB), this data will not be removed.
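The masking behaviour can be modelled with a small append-only structure. `AppendOnlyIndex` below is a toy class written for this illustration; it ignores data ownership and adoption, works per entry rather than per index or data file, and is not how the FDB stores data.

```python
class AppendOnlyIndex:
    """Toy model: entries are never overwritten, only masked by newer ones."""

    def __init__(self):
        self.entries = []  # (key, data) tuples in write order

    def archive(self, key, data):
        self.entries.append((key, data))

    def retrieve(self, key):
        # The most recent entry for a key masks all older ones.
        for k, data in reversed(self.entries):
            if k == key:
                return data
        raise KeyError(key)

    def purge(self, doit=False):
        """Report masked entries; remove them only if doit is True."""
        seen, keep, masked = set(), [], []
        for k, data in reversed(self.entries):
            (masked if k in seen else keep).append((k, data))
            seen.add(k)
        if doit:
            self.entries = list(reversed(keep))
        return masked


index = AppendOnlyIndex()
index.archive("a", b"1")
index.archive("b", b"2")
index.archive("a", b"3")  # masks the first entry for "a"

print(index.retrieve("a"))      # b'3'
print(index.purge())            # [('a', b'1')], dry run
print(len(index.entries))       # 3
index.purge(doit=True)
print(index.entries)            # [('b', b'2'), ('a', b'3')]
```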
Tip
It’s always advised to check the elements of a deletion before running it with the doit flag.
Double check that the dry-run, which is active per default, really returns the elements you are
expecting.
fdb = pyfdb.FDB(fdb_config_path)
# Dry run (the default): nothing is deleted yet
elements = list(fdb.purge({"class": "ea"}))
assert len(elements) > 0
# NOTE: Double check that the returned elements are those you want to delete
for element in elements:
    print(element)
# Do the actual deletion with the `doit=True` flag
purge_iterator = fdb.purge({"class": "ea"}, doit=True)
purge_elements = list(purge_iterator)
for element in purge_elements:
    print(element)
Stats#
Print information about FDB databases, aggregating the information over all the databases visited into a final summary.
fdb = pyfdb.FDB(fdb_config_path)
selection = {
    "type": "an",
    "class": "ea",
    "domain": "g",
    "expver": "0001",
    "stream": "oper",
    "date": "20200101",
    "levtype": "sfc",
    "step": "0",
    "param": ["167", "165", "166"],
    "time": "1800",
}
elements = list(fdb.stats(selection))
for el in elements:
    print(el)
The example above shows how to use the stats function to get an overview of the statistics for a given MARS selection.
For every database and every index the selection touches, it aggregates statistics and shows the result in a table.
The StatsElement objects returned from the call are Python strings representing individual lines of the output generated by
the underlying FDB. A call of the example above could produce the following output:
Index Statistics:
Fields : 3
Size of fields : 32,196 (31.4414 Kbytes)
Reacheable fields : 3
Reachable size : 32,196 (31.4414 Kbytes)
DB Statistics:
Databases : 1
TOC records : 2
Size of TOC files : 2,048 (2 Kbytes)
Size of schemas files : 228 (228 bytes)
TOC records : 2
Owned data files : 1
Size of owned data files : 32,196 (31.4414 Kbytes)
Index files : 1
Size of index files : 131,072 (128 Kbytes)
Size of TOC files : 2,048 (2 Kbytes)
Total owned size : 165,544 (161.664 Kbytes)
Total size : 165,544 (161.664 Kbytes)
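Since the output arrives as text lines, it may be convenient to turn them into numbers. The parser below is a hypothetical sketch that assumes the `label : value` layout shown above; the sample lines are copied from that output, and the format is not guaranteed to be stable.

```python
import re

# Matches "label : 1,234 ..." and captures the label and the leading integer.
LINE = re.compile(r"^\s*(.+?)\s*:\s*(\d[\d,]*)")


def parse_stats(lines):
    """Map each label to its integer value; header lines are skipped."""
    stats = {}
    for line in lines:
        match = LINE.match(line)
        if match:
            stats[match.group(1)] = int(match.group(2).replace(",", ""))
    return stats


sample = [
    "Index Statistics:",
    "Fields : 3",
    "Size of fields : 32,196 (31.4414 Kbytes)",
    "DB Statistics:",
    "Databases : 1",
    "Total size : 165,544 (161.664 Kbytes)",
]

stats = parse_stats(sample)
print(stats["Size of fields"])  # 32196
```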
Control#
Enable or disable certain features of FDB databases, e.g. retrieving, listing, etc.
The example given below shows how the activation/deactivation of the wipe functionality of the FDB
works for a certain selection.
Tip
Consume the iterator, returned by the control call, completely. Otherwise, the lock file
won’t be created.
import pytest

fdb = pyfdb.FDB(fdb_config_path)
selection = {
    "class": "ea",
    "domain": "g",
    "expver": "0001",
    "stream": "oper",
    "date": "20200101",
    "time": "1800",
}
print("Lock the database for wiping")
control_iterator = fdb.control(selection, pyfdb.ControlAction.DISABLE, [pyfdb.ControlIdentifier.WIPE])
# Consume the iterator completely so that the lock file is created
elements = list(control_iterator)
assert len(elements) == 1
assert (fdb_config_path.parent / "db_store" / "ea:0001:oper:20200101:1800:g" / "wipe.lock").exists()
print("Try Wipe")
wipe_iterator = fdb.wipe(selection, doit=True)
elements = []
with pytest.raises(RuntimeError):
    for el in wipe_iterator:
        elements.append(el)
assert len(elements) == 0
print("Unlock the database for wiping")
control_iterator = fdb.control(selection, pyfdb.ControlAction.ENABLE, [pyfdb.ControlIdentifier.WIPE])
elements = list(control_iterator)
assert len(elements) > 0
assert not (fdb_config_path.parent / "db_store" / "ea:0001:oper:20200101:1800:g" / "wipe.lock").exists()
print("Wipe")
fdb.wipe(selection, doit=True)
fdb.flush()
print("Success")
After specifying the selection we want to target (this has to be a selection which contains keys of
the first and second level of the schema), we can call the control function and specify the desired action:
in this case ControlIdentifier.WIPE and ControlAction.DISABLE, which translates to disabling
wiping for the specified database. Multiple ControlIdentifier values can be specified in a single call.
For each ControlIdentifier the underlying FDB creates a <control-identifier-name>.lock file,
which resides inside the database specified by the MARS selection. If we enable the action again, this
file is deleted.
After disabling the action, a call to it results in an empty iterator being returned.
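The lock files can be checked from Python with pathlib alone. The sketch below simulates a database directory in a temporary location and lists the identifiers that are currently disabled. The directory name `example_db` and the helper `disabled_identifiers` are hypothetical, and mapping a file stem such as `wipe` back to an upper-case identifier name is an assumption based on the `wipe.lock` file in the example above.

```python
import tempfile
from pathlib import Path


def disabled_identifiers(db_dir: Path) -> list[str]:
    """List identifier names that have a <name>.lock file in db_dir."""
    return sorted(p.stem.upper() for p in db_dir.glob("*.lock"))


# Simulated database directory; a real one lives below the FDB root.
db_dir = Path(tempfile.mkdtemp()) / "example_db"
db_dir.mkdir()

# Simulate what disabling wipe leaves behind.
(db_dir / "wipe.lock").touch()
locked = disabled_identifiers(db_dir)
print(locked)  # ['WIPE']

# Simulate re-enabling wipe: the lock file is removed again.
(db_dir / "wipe.lock").unlink()
unlocked = disabled_identifiers(db_dir)
print(unlocked)  # []
```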
Axes#
Return the ‘axes’ and their extent of a selection for a given level of the schema in an IndexAxis object.
If a key is not specified, its entire extent (all values) is returned.
fdb = pyfdb.FDB(fdb_config_path)
selection = {
    "type": "an",
    "class": "ea",
    "domain": "g",
    "expver": "0001",
    "stream": "oper",
    # "date": "20200101", # Left out to show all values are returned
    "levtype": "sfc",
    "step": "0",
    "time": "1800",
}
print("---------- Level 3: ----------")
index_axis = fdb.axes(selection)
# len(index_axis.items()) == 11
for k, v in index_axis.items():
    print(f"k={k} | v={v}")
print("---------- Level 2: ----------")
index_axis = fdb.axes(selection, level=2)
# len(index_axis.items()) == 8
for k, v in index_axis.items():
    print(f"k={k} | v={v}")
print("---------- Level 1: ----------")
index_axis = fdb.axes(selection, level=1)
# len(index_axis.items()) == 6
for k, v in index_axis.items():
    print(f"k={k} | v={v}")
The example above produces the following output:
---------- Level 3: ----------
k=class | v=['ea']
k=date | v=['20200101', '20200102', '20200103', '20200104']
k=domain | v=['g']
k=expver | v=['0001']
k=levelist | v=['']
k=levtype | v=['sfc']
k=param | v=['131', '132', '167']
k=step | v=['0']
k=stream | v=['oper']
k=time | v=['1800']
k=type | v=['an']
---------- Level 2: ----------
k=class | v=['ea']
k=date | v=['20200101', '20200102', '20200103', '20200104']
k=domain | v=['g']
k=expver | v=['0001']
k=levtype | v=['sfc']
k=stream | v=['oper']
k=time | v=['1800']
k=type | v=['an']
---------- Level 1: ----------
k=class | v=['ea']
k=date | v=['20200101', '20200102', '20200103', '20200104']
k=domain | v=['g']
k=expver | v=['0001']
k=stream | v=['oper']
k=time | v=['1800']
For each specified level, the keys affected by the MARS selection at that level are returned.
Optional keys in the FDB schema appear as empty lists. If a key is missing from the selection,
the key and all values stored in the FDB are returned (see the date key above).
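One use of the axes is to estimate how many fields a selection could cover. The sketch below copies the level 3 output from above into a plain dictionary (standing in for the IndexAxis object) and multiplies the number of values per key. The result is only an upper bound, since not every combination of values is necessarily stored.

```python
from math import prod

# Level 3 axes as printed above, written as a plain dictionary.
axes = {
    "class": ["ea"],
    "date": ["20200101", "20200102", "20200103", "20200104"],
    "domain": ["g"],
    "expver": ["0001"],
    "levelist": [""],
    "levtype": ["sfc"],
    "param": ["131", "132", "167"],
    "step": ["0"],
    "stream": ["oper"],
    "time": ["1800"],
    "type": ["an"],
}

# Upper bound on the number of distinct fields spanned by these axes.
upper_bound = prod(len(values) for values in axes.values())
print(upper_bound)  # 12

# Keys with more than one value are the ones that actually vary.
varying = sorted(k for k, v in axes.items() if len(v) > 1)
print(varying)  # ['date', 'param']
```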
In case you want to see the ‘span’ of all elements stored in an FDB you could use the following code:
Warning
The following code is an expensive call (depending on the size of the FDB).
For testing purposes or locally configured FDB instances this is fine.
fdb = pyfdb.FDB(fdb_config_path)
index_axis: pyfdb.IndexAxis = fdb.axes({})
Enabled#
Check whether a specific control identifier is enabled.
from pyfdb import ControlIdentifier
fdb = pyfdb.FDB(fdb_config_path)
assert fdb.enabled(ControlIdentifier.NONE) is True
assert fdb.enabled(ControlIdentifier.LIST) is True
assert fdb.enabled(ControlIdentifier.RETRIEVE) is True
assert fdb.enabled(ControlIdentifier.ARCHIVE) is True
assert fdb.enabled(ControlIdentifier.WIPE) is True
assert fdb.enabled(ControlIdentifier.UNIQUEROOT) is True
The example above shows how a default FDB is configured, that is, all control identifiers
are enabled by default.
If the FDB is configured to disallow writing by setting writable = False in the fdb_config.yaml,
we end up with the following ControlIdentifier states:
import yaml
from pyfdb import ControlIdentifier
fdb_config = yaml.safe_load(fdb_config_path.read_text())
fdb_config["writable"] = False
fdb = pyfdb.FDB(fdb_config)
assert fdb.enabled(ControlIdentifier.NONE) is True
assert fdb.enabled(ControlIdentifier.LIST) is True
assert fdb.enabled(ControlIdentifier.RETRIEVE) is True
assert fdb.enabled(ControlIdentifier.ARCHIVE) is False
assert fdb.enabled(ControlIdentifier.WIPE) is False
assert fdb.enabled(ControlIdentifier.UNIQUEROOT) is True
The configuration changes accordingly, if we substitute writable = False with visitable = False.
Dirty#
Return whether a flush of the FDB is needed, for example if data was archived since the last flush.
fdb = pyfdb.FDB(fdb_config_path)
filename = data_path / "x138-300.grib"
# read_bytes() closes the file after reading
fdb.archive(filename.read_bytes())
fdb.dirty() # == True
fdb.flush()
fdb.dirty() # == False
The example above shows that dirty returns True after an archive command.
Flushing resets the internal status of the FDB, and the call to dirty returns False afterwards.