Python API
Tensogram provides native Python bindings via PyO3. All tensor data crosses the boundary as NumPy arrays.
Installation
# From PyPI (once published)
pip install tensogram
# From source
pip install maturin numpy
cd python/bindings && maturin develop
Quick Start
import numpy as np
import tensogram
# Encode a 2D temperature field
temps = np.random.randn(100, 200).astype(np.float32) + 273.15
meta = {}
desc = {"type": "ntensor", "shape": [100, 200], "dtype": "float32"}
msg = tensogram.encode(meta, [(desc, temps)])
# Decode it back
meta, objects = tensogram.decode(msg)
desc, array = objects[0]
print(array.shape) # (100, 200)
Encoding
Basic encoding
tensogram.encode() takes metadata, a list of (descriptor, array) pairs, and returns wire-format bytes:
msg = tensogram.encode(
{},
[({"type": "ntensor", "shape": [3], "dtype": "float32"}, np.array([1, 2, 3], dtype=np.float32))],
hash="xxh3", # default; use None to skip hashing
)
Descriptor keys
Every object in a message is described by a dict. The three required keys define what the tensor looks like; the optional keys control how it is stored on the wire.
| Key | Required | Default | Description |
|---|---|---|---|
"type" | yes | — | Object type, e.g. "ntensor" |
"shape" | yes | — | Tensor dimensions, e.g. [100, 200] |
"dtype" | yes | — | Data type name (see Data Types) |
"strides" | no | row-major | Element strides; computed automatically if omitted |
"byte_order" | no | native | "little" or "big"; defaults to host byte order |
"encoding" | no | "none" | Encoding stage — see below |
"filter" | no | "none" | Filter stage — see below |
"compression" | no | "none" | Compression stage — see below |
Any additional keys (e.g. "sp_reference_value", "sp_bits_per_value") are stored in the descriptor’s .params dict and passed through to the encoding pipeline.
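The "computed automatically if omitted" default for "strides" follows the usual row-major (C-order) convention. A small sketch of that computation, purely for illustration (this is not the library's code):

```python
import numpy as np

def row_major_strides(shape):
    # Element strides for C-order layout: the last axis moves by 1,
    # each earlier axis by the product of all trailing dimensions.
    return [int(np.prod(shape[i + 1:])) for i in range(len(shape))]

print(row_major_strides([100, 200]))  # [200, 1]
```

Pass an explicit "strides" key only when your array deviates from this layout.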
The encoding pipeline
Each object passes through a three-stage pipeline before it is stored. You control each stage via descriptor keys:
raw bytes → encoding → filter → compression → wire payload
Encoding transforms the data representation:
| Value | What it does | Use case |
|---|---|---|
"none" | Pass-through (default) | Exact values, integer data |
"simple_packing" | Quantize floats to packed integers | Bounded-range scalar fields (GRIB-compatible) |
Filter rearranges bytes to improve compressibility:
| Value | What it does | Use case |
|---|---|---|
"none" | Pass-through (default) | Most cases |
"shuffle" | Byte-transpose by element width (requires "shuffle_element_size") | Improves lz4/zstd ratio on typed data |
Compression reduces the payload size:
| Value | Random access | Type | Use case |
|---|---|---|---|
"none" | yes | — | No compression |
"zstd" | no | lossless | General-purpose, best ratio/speed tradeoff |
"lz4" | no | lossless | Fastest decompression |
"szip" | yes (RSI blocks) | lossless | Integer/packed data (CCSDS 121.0-B-3) |
"blosc2" | yes (chunks) | lossless | Large tensors, multi-codec |
"zfp" | yes (fixed-rate) | lossy | Floating-point arrays |
"sz3" | no | lossy | Error-bounded scientific data |
Compression parameters are passed as extra descriptor keys. For example, zstd level:
desc = {
"type": "ntensor", "shape": [1000], "dtype": "float32",
"compression": "zstd", "zstd_level": 9,
}
For the full list of compressor parameters, see Compression.
Common pipeline combinations
# Lossless, fast decompression
desc = {"type": "ntensor", "shape": shape, "dtype": "float32",
"compression": "lz4"}
# Lossless, best ratio (shuffle_element_size must match dtype byte width)
desc = {"type": "ntensor", "shape": shape, "dtype": "float32",
"filter": "shuffle", "shuffle_element_size": 4, "compression": "zstd", "zstd_level": 12}
# Quantise a bounded-range float field to 16-bit packed ints, then compress
# (the same pipeline GRIB 2 uses for simple_packing + CCSDS).
# compute_packing_params expects a flat float64 array
values = data.astype(np.float64).ravel()
params = tensogram.compute_packing_params(values, bits_per_value=16, decimal_scale_factor=0)
desc = {"type": "ntensor", "shape": shape, "dtype": "float64",
"encoding": "simple_packing", "compression": "zstd", **params}
# Lossy float compression with error bound (zfp operates on float64)
desc = {"type": "ntensor", "shape": shape, "dtype": "float64",
"compression": "zfp", "zfp_mode": "fixed_accuracy", "zfp_tolerance": 0.01}
Invalid combinations: Some pipeline combinations are rejected at encode time — e.g.
zfp + shuffle (ZFP operates on typed floats, not byte-shuffled data) or simple_packing + sz3 (both apply lossy quantisation). See Compression — Invalid Combinations.
Multiple objects per message
A single message can contain multiple tensors, each with its own descriptor:
spectrum = np.random.randn(256).astype(np.float64)
mask = np.array([1, 0, 1, 1, 0], dtype=np.uint8)
msg = tensogram.encode(
{},
[
({"type": "ntensor", "shape": [256], "dtype": "float64", "compression": "zstd"}, spectrum),
({"type": "ntensor", "shape": [5], "dtype": "uint8"}, mask),
],
)
Pre-encoded data
If you already have compressed/packed payloads (e.g. from another system), use tensogram.encode_pre_encoded() with the same interface. The library skips the encoding pipeline and writes the bytes as-is:
msg = tensogram.encode_pre_encoded(meta, [(desc, pre_compressed_bytes)])
See Pre-Encoded Data API for details.
Decoding
Full decode
meta, objects = tensogram.decode(msg)
Returns a Message namedtuple with .metadata and .objects. Tuple unpacking works directly.
By default, decoded arrays are in the caller’s native byte order — the library handles byte-swapping automatically. Pass native_byte_order=False to receive the raw wire byte order instead:
meta, objects = tensogram.decode(msg, native_byte_order=False)
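What the swap means in NumPy terms, as a conceptual equivalent of what the library does internally:

```python
import numpy as np

wire = np.array([1.5, 2.5], dtype=">f4")  # suppose big-endian bytes on the wire
# Equivalent of the default native_byte_order=True behaviour:
native = wire.astype(wire.dtype.newbyteorder("="))
assert native.dtype.isnative
assert np.array_equal(native, wire)  # values unchanged, only byte layout differs
```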
Metadata
meta is a Metadata object:
meta.version # int — always 2
meta.base # list[dict] — per-object metadata (one entry per object)
meta.extra # dict — message-level annotations (_extra_ in CBOR)
meta.reserved # dict — library internals (_reserved_ in CBOR, read-only)
meta["key"] # dict-style access (checks base entries, then extra)
To read metadata without decoding any payloads:
meta = tensogram.decode_metadata(msg)
To read metadata and descriptors (no payload decode):
meta, descriptors = tensogram.decode_descriptors(msg)
for desc in descriptors:
print(desc.shape, desc.dtype, desc.compression)
Selective decode
Decode a single object without touching the others — O(1) seek via the binary header’s offset table:
meta, desc, array = tensogram.decode_object(msg, index=2)
Decode a sub-range of elements from one object (for compressors that support random access):
# Elements 100-149 and 300-324 from object 0
parts = tensogram.decode_range(msg, object_index=0, ranges=[(100, 50), (300, 25)])
# parts is a list of numpy arrays, one per range
# Or join into a single contiguous array
joined = tensogram.decode_range(msg, object_index=0, ranges=[(100, 50), (300, 25)], join=True)
# joined is a single flat numpy array of shape (75,)
decode_range works with uncompressed data, simple_packing, szip, blosc2, and zfp fixed-rate mode. It returns an error for stream compressors (zstd, lz4, sz3) and for the shuffle filter. See Decoding Data for details.
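The (start, count) range semantics and the join behaviour have a straightforward pure-NumPy equivalent, shown here for illustration only:

```python
import numpy as np

flat = np.arange(1000, dtype=np.float32)
ranges = [(100, 50), (300, 25)]

# Like the default: one array per (start, count) range
parts = [flat[start:start + count] for start, count in ranges]
# Like join=True: one contiguous concatenation
joined = np.concatenate(parts)
assert [p.shape for p in parts] == [(50,), (25,)]
assert joined.shape == (75,)
```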
Scanning and iteration
To find message boundaries in a buffer without decoding:
offsets = tensogram.scan(buf) # list of (offset, length) pairs
To iterate messages in a multi-message buffer:
for meta, objects in tensogram.iter_messages(buf):
print(meta.version, len(objects))
Hash verification
meta, objects = tensogram.decode(msg, verify_hash=True)
Raises RuntimeError if any object’s payload hash doesn’t match. If the message was encoded without a hash (hash=None), verification is silently skipped.
File API
Writing
with tensogram.TensogramFile.create("forecast.tgm") as f:
for step in range(24):
data = model.run(step)
desc = {"type": "ntensor", "shape": list(data.shape), "dtype": "float32",
"compression": "zstd"}
f.append({"base": [{"step": step}]}, [(desc, data)])
Each append encodes one message and writes it to the end of the file. Messages are independent and self-describing.
Reading
with tensogram.TensogramFile.open("forecast.tgm") as f:
print(len(f)) # message count
meta, objects = f[0] # index (supports negative indices)
subset = f[1:10:2] # slice → list[Message]
for meta, objects in f: # iterate all messages
for desc, array in objects:
print(desc.shape, array.dtype)
raw = f.read_message(0) # raw bytes for forwarding/caching
The first access triggers a streaming scan that records message offsets. After that, every read is an O(1) seek.
Streaming encoder
For building a message one object at a time in memory:
enc = tensogram.StreamingEncoder({}, hash="xxh3")
for desc, data in objects:
enc.write_object(desc, data)
msg = enc.finish() # returns complete message as bytes
For pre-encoded payloads, use enc.write_object_pre_encoded(desc, raw_bytes).
finish() vs finish_backfilled()
finish() produces a streaming-mode message: both the preamble and
postamble carry total_length = 0, signalling “unknown length at
write time”. Forward-only readers handle this transparently by
walking the message’s END_MAGIC trailer, but backward readers
cannot O(1)-jump from the postamble to the message start without the
mirrored length.
finish_backfilled() seeks back into the in-memory cursor and patches
both length slots with the real message length before returning. The
produced bytes satisfy the backward-locatability invariant from
wire-format §7 — any reader can read the last 16 bytes of the
postamble, take the mirrored total_length, and jump straight to
the message start. Required for fixtures or workloads that
exercise the bidirectional remote walker (see
remote-access.md → Bidirectional Scan).
enc = tensogram.StreamingEncoder({"base": [{"name": "obs"}]})
enc.write_object(desc, payload)
msg = enc.finish_backfilled() # mirrored total_length in both slots
Async API
AsyncTensogramFile provides the same operations as TensogramFile but as
asyncio coroutines. A single handle supports truly concurrent operations
with no per-handle mutex; internal caches are thread-safe.
Opening and decoding
import asyncio
import tensogram
async def main():
f = await tensogram.AsyncTensogramFile.open("forecast.tgm")
meta, objects = await f.decode_message(0)
result = await f.file_decode_object(0, 0)
print(result["data"].shape)
asyncio.run(main())
For remote files with credentials:
f = await tensogram.AsyncTensogramFile.open_remote(
"s3://bucket/data.tgm", {"region": "eu-west-1"}
)
Concurrent decoding with asyncio.gather
Multiple decode calls run concurrently on a single handle:
results = await asyncio.gather(
f.file_decode_object(0, 0),
f.file_decode_object(1, 0),
f.file_decode_object(2, 0),
)
Batch decoding from many messages at once
When you need the same data from many messages (for example, how the value at one grid point changes over 300 time steps), per-message requests are slow: each one is a separate HTTP round-trip.
file_decode_range_batch collects the requested element ranges across
messages and fetches the underlying data in a batched HTTP call.
file_decode_object_batch does the same for full frames:
indices = list(range(300))
row, col, grid = 100, 200, 528
offset = row * grid + col
values = await f.file_decode_range_batch(indices, 0, [(offset, 1)], join=True)
frames = await f.file_decode_object_batch(indices, 0)
For even more speed, split the work into chunks and run them concurrently:
chunks = [indices[i::16] for i in range(16)]
batch_results = await asyncio.gather(
*[f.file_decode_range_batch(chunk, 0, [(offset, 1)], join=True)
for chunk in chunks]
)
The sync TensogramFile also has file_decode_range_batch and
file_decode_object_batch with the same signatures. Both batch methods
require a remote backend; calling them on a local file raises OSError.
Layout prefetching
Before running many concurrent decodes on a remote file, prefetch the internal layout metadata to avoid repeated discovery requests:
count = await f.message_count()
await f.prefetch_layouts(list(range(count)))
Context manager and iteration
async with await tensogram.AsyncTensogramFile.open("data.tgm") as f:
await f.message_count() # required before async for or len(f)
async for meta, objects in f:
print(objects[0][1].shape)
Async iteration works on remote files (sync iteration does not).
await f.message_count() must be called once before using async for
or len(f), to discover the message count without blocking the event loop.
Other methods
count = await f.message_count()
raw = await f.read_message(0)
all_raw = await f.messages()
print(f.is_remote(), f.source())
Note:
len(f) requires a prior await f.message_count() call. Without it, len(f) raises RuntimeError.
When to use async vs sync
| Scenario | Recommendation |
|---|---|
| Script, CLI, or notebook | TensogramFile (sync) |
| Inside an asyncio event loop | AsyncTensogramFile |
| xarray or zarr | Sync (those frameworks are synchronous) |
| Many concurrent remote reads | asyncio.gather on one AsyncTensogramFile |
| Same data from many messages | file_decode_range_batch or file_decode_object_batch |
Validation
Two functions check whether messages and files are well-formed without consuming the data. See also the CLI reference.
report = tensogram.validate(msg)
file_report = tensogram.validate_file("data.tgm")
Levels
| Level | Checks | hash_verified |
|---|---|---|
"quick" | Structure only: magic bytes, frame layout, lengths | always False |
"default" | + metadata (CBOR) + integrity (hash verification, decompression) | True only if hash succeeds and no errors |
"checksum" | Hash verification only, structural warnings suppressed | True only if hash succeeds and no errors |
"full" | + fidelity (full decode, decoded-size check, NaN/Inf scan) | True only if hash succeeds and no errors |
# Full validation with canonical CBOR key-order checking
report = tensogram.validate(msg, level="full", check_canonical=True)
Return values
validate() returns:
{
"issues": [
{
"code": "hash_mismatch", # stable snake_case string
"level": "integrity", # which validation level found it
"severity": "error", # "error" or "warning"
"description": "...", # human-readable message
"object_index": 0, # optional — which object
"byte_offset": 1234, # optional — position in buffer
}
],
"object_count": 1,
"hash_verified": False,
}
validate_file() returns file-level issues plus per-message reports:
{
"file_issues": [
{"byte_offset": 100, "length": 19, "description": "trailing bytes after last message"}
],
"messages": [
{"issues": [], "object_count": 1, "hash_verified": True}
],
}
Interpreting results
report = tensogram.validate(msg)
if not report["issues"]:
print(f"OK — {report['object_count']} objects, hash verified")
else:
for issue in report["issues"]:
print(f"[{issue['severity']}] {issue['code']}: {issue['description']}")
GRIB / NetCDF conversion
Three PyO3-bound helpers wrap tensogram-grib
and tensogram-netcdf.
They are always callable — when the Python wheel was built without
the corresponding Cargo feature, each raises RuntimeError with a
pointer to rebuild instructions.
You can probe availability at runtime:
import tensogram
if tensogram.__has_grib__:
msgs = tensogram.convert_grib("forecast.grib2")
if tensogram.__has_netcdf__:
msgs = tensogram.convert_netcdf("data.nc")
convert_grib(path, **options) -> list[bytes]
Convert a GRIB file (with however many messages it contains) to Tensogram
wire format. Returns one bytes object per output Tensogram message — write
them sequentially to produce a .tgm file.
msgs = tensogram.convert_grib(
"forecast.grib2",
grouping="merge_all", # "merge_all" | "one_to_one"
preserve_all_keys=False, # lift every ecCodes namespace into base[i]["grib"]
encoding="simple_packing", # "none" | "simple_packing"
bits=16, # None -> defaults to 16; ignored for encoding="none"
filter="none", # "none" | "shuffle"
compression="szip", # "none" | "zstd" | "lz4" | "blosc2" | "szip"
compression_level=None, # applies to zstd / blosc2 (None = codec default)
threads=0, # 0 = sequential; honours TENSOGRAM_THREADS env var
hash="xxh3", # "xxh3" | None
# NaN / Inf handling — see docs/src/guide/nan-inf-handling.md
allow_nan=False, # False (default) rejects any NaN input
allow_inf=False, # False (default) rejects any ±Inf input
)
with open("forecast.tgm", "wb") as fh:
for msg in msgs:
fh.write(msg)
Pipeline defaults and edge cases:
- bits=None with encoding="simple_packing" defaults to 16 bits. bits outside 1..=64 silently falls back to encoding="none" and emits a warning to stderr. Validate your inputs before calling if fail-fast is important.
- Unknown compression / encoding names raise ValueError with the list of valid choices in the message.
- Unknown grouping / split_by / hash values raise ValueError.
- Missing input paths raise FileNotFoundError.
- Building the wheel without the grib / netcdf feature causes the corresponding function to raise RuntimeError at call time with rebuild instructions.
Requires libeccodes at the OS level and the wheel built with
--features grib (maturin develop --features grib). Official PyPI
wheels do not currently include the grib feature — see
Jupyter Notebook Walk-through.
convert_grib_buffer(buf, **options) -> list[bytes]
In-memory variant of convert_grib. Accepts any Python bytes-like
object (bytes, bytearray, memoryview, numpy.uint8[:]).
Useful when the GRIB bytes come from a byte-range HTTP fetch, a
cache, or any other in-memory source — no filesystem staging needed.
import requests
# Byte-range download of a single GRIB message from data.ecmwf.int.
resp = requests.get(
"https://data.ecmwf.int/forecasts/.../...grib2",
headers={"Range": "bytes=74573515-75234113"},
)
msgs = tensogram.convert_grib_buffer(
resp.content,
encoding="simple_packing",
bits=16,
compression="szip",
# See [NaN / Inf Handling](nan-inf-handling.md) for the
# `allow_nan` / `allow_inf` opt-in if your data contains
# non-finite values.
)
convert_grib and convert_grib_buffer produce bit-identical
decoded payloads for the same input. The encoded bytes may
differ — each call stamps a fresh timestamp and UUID into
_reserved_.
convert_netcdf(path, **options) -> list[bytes]
Convert a NetCDF-3 or NetCDF-4 file to Tensogram. Packed variables
(scale_factor / add_offset) are automatically unpacked to
float64.
msgs = tensogram.convert_netcdf(
"data.nc",
split_by="file", # "file" | "variable" | "record"
cf=False, # lift 16 CF attributes into base[i]["cf"]
encoding="none",
bits=None,
filter="none",
compression="zstd",
compression_level=3,
threads=0,
hash="xxh3",
# NaN / Inf handling — see docs/src/guide/nan-inf-handling.md
allow_nan=False, # False (default) rejects any NaN input
allow_inf=False, # False (default) rejects any ±Inf input
)
Note on NaN and --encoding simple_packing. Since 0.17 the importer hard-fails on NaN or Inf in a variable targeted for simple_packing (previous behaviour: stderr warning + fallback to encoding="none"). If your NetCDF has _FillValue / missing_value fields unpacked to NaN, either stick with the default encoding="none" or pre-process the values. See the NetCDF Importer error-handling reference for the full contract.
Requires libnetcdf + libhdf5 at the OS level and the wheel built
with --features netcdf.
Error Handling
| Exception | When |
|---|---|
FileNotFoundError | convert_grib(path) / convert_netcdf(path) called with a non-existent path (subclass of OSError). |
OSError | Other file I/O failures (permission denied, disk error, etc.). |
ValueError | Invalid parameters; unknown dtype; NaN in simple packing; unknown validation level; invalid grouping / split_by / hash; unknown codec / bit width in the conversion pipeline; empty/non-GRIB input buffer; split_by="record" on a NetCDF without an unlimited dimension. |
RuntimeError | Hash mismatch during decode(..., verify_hash=True); calling convert_grib / convert_grib_buffer / convert_netcdf on a wheel built without the feature; internal ecCodes / libnetcdf C-library failures that cannot be classified as caller-input errors. |
KeyError | Missing metadata key via meta["key"]. |
Supported dtypes
| Category | Types |
|---|---|
| Floating point | float16, bfloat16, float32, float64 |
| Complex | complex64, complex128 |
| Signed integer | int8, int16, int32, int64 |
| Unsigned integer | uint8, uint16, uint32, uint64 |
| Special | bitmask |
bfloat16 is returned as ml_dtypes.bfloat16 when ml_dtypes is installed; otherwise it falls back to np.uint16.
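A defensive pattern for handling the documented fallback in your own code (ml_dtypes may or may not be installed in a given environment):

```python
import numpy as np

try:
    from ml_dtypes import bfloat16  # real bfloat16 scalar type
    bf16 = np.dtype(bfloat16)
except ImportError:
    bf16 = np.dtype(np.uint16)      # raw 16-bit bit-pattern fallback

assert bf16.itemsize == 2  # two bytes per element either way
```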
See Data Types for byte widths and wire-format details.
Examples
See examples/python/ for complete working examples:
| Example | Topic |
|---|---|
01_encode_decode.py | Basic round-trip |
02_mars_metadata.py | Per-object metadata (ECMWF MARS vocabulary example) |
02b_generic_metadata.py | Per-object metadata using a generic application namespace |
03_simple_packing.py | Simple-packing encoding |
04_multi_object.py | Multi-object messages, selective decode |
05_file_api.py | Multi-message .tgm files |
06_hash_and_errors.py | Hash verification and error handling |
07_iterators.py | File iteration, indexing, slicing |
08_xarray_integration.py | Opening .tgm as xarray Datasets |
08_zarr_backend.py | Reading/writing through Zarr v3 |
09_dask_distributed.py | Dask distributed computing over 4-D tensors |
09_streaming_consumer.py | Streaming consumer pattern |
11_encode_pre_encoded.py | Pre-encoded data API |
12_convert_netcdf.py | NetCDF → Tensogram import via the Python API |
13_validate.py | Message and file validation |
15_async_operations.py | Async open, decode, and asyncio.gather |
17_convert_grib.py | GRIB → Tensogram import (file + in-memory buffer) |
For narrative walk-throughs with plots and explanations, see also
examples/jupyter/*.ipynb — five journey notebooks covering
quickstart/MARS, encoding pipeline fidelity, GRIB conversion, NetCDF
conversion with xarray, and validation with multi-threaded encoding.