Python API

Tensogram provides native Python bindings via PyO3. All tensor data crosses the boundary as NumPy arrays.

Installation

# From PyPI (once published)
pip install tensogram

# From source
pip install maturin numpy
cd python/bindings && maturin develop

Quick Start

import numpy as np
import tensogram

# Encode a 2D temperature field
temps = np.random.randn(100, 200).astype(np.float32) + 273.15
meta = {}
desc = {"type": "ntensor", "shape": [100, 200], "dtype": "float32"}

msg = tensogram.encode(meta, [(desc, temps)])

# Decode it back
meta, objects = tensogram.decode(msg)
desc, array = objects[0]
print(array.shape)  # (100, 200)

Encoding

Basic encoding

tensogram.encode() takes metadata and a list of (descriptor, array) pairs, and returns wire-format bytes:

msg = tensogram.encode(
    {},
    [({"type": "ntensor", "shape": [3], "dtype": "float32"}, np.array([1, 2, 3], dtype=np.float32))],
    hash="xxh3",  # default; use None to skip hashing
)

Descriptor keys

Every object in a message is described by a dict. The three required keys define what the tensor looks like; the optional keys control how it is stored on the wire.

Key            Required  Default    Description
"type"         yes       —          Object type, e.g. "ntensor"
"shape"        yes       —          Tensor dimensions, e.g. [100, 200]
"dtype"        yes       —          Data type name (see Data Types)
"strides"      no        row-major  Element strides; computed automatically if omitted
"byte_order"   no        native     "little" or "big"; defaults to host byte order
"encoding"     no        "none"     Encoding stage — see below
"filter"       no        "none"     Filter stage — see below
"compression"  no        "none"     Compression stage — see below

Any additional keys (e.g. "sp_reference_value", "sp_bits_per_value") are stored in the descriptor’s .params dict and passed through to the encoding pipeline.

The encoding pipeline

Each object passes through a three-stage pipeline before it is stored. You control each stage via descriptor keys:

raw bytes → encoding → filter → compression → wire payload

Encoding transforms the data representation:

Value             What it does                         Use case
"none"            Pass-through (default)               Exact values, integer data
"simple_packing"  Quantize floats to packed integers   Bounded-range scalar fields (GRIB-compatible)

Filter rearranges bytes to improve compressibility:

Value      What it does                                                        Use case
"none"     Pass-through (default)                                              Most cases
"shuffle"  Byte-transpose by element width (requires "shuffle_element_size")   Improves lz4/zstd ratio on typed data

Compression reduces the payload size:

Value     Random access     Type      Use case
"none"    yes               —         No compression
"zstd"    no                lossless  General-purpose, best ratio/speed tradeoff
"lz4"     no                lossless  Fastest decompression
"szip"    yes (RSI blocks)  lossless  Integer/packed data (CCSDS 121.0-B-3)
"blosc2"  yes (chunks)      lossless  Large tensors, multi-codec
"zfp"     yes (fixed-rate)  lossy     Floating-point arrays
"sz3"     no                lossy     Error-bounded scientific data

Compression parameters are passed as extra descriptor keys. For example, zstd level:

desc = {
    "type": "ntensor", "shape": [1000], "dtype": "float32",
    "compression": "zstd", "zstd_level": 9,
}

For the full list of compressor parameters, see Compression.

Common pipeline combinations

# Lossless, fast decompression
desc = {"type": "ntensor", "shape": shape, "dtype": "float32",
        "compression": "lz4"}

# Lossless, best ratio (shuffle_element_size must match dtype byte width)
desc = {"type": "ntensor", "shape": shape, "dtype": "float32",
        "filter": "shuffle", "shuffle_element_size": 4, "compression": "zstd", "zstd_level": 12}

# Quantize a bounded-range float field to 16-bit packed ints, then compress
# (the same pipeline GRIB 2 uses for simple_packing + CCSDS).
# compute_packing_params expects a flat float64 array
values = data.astype(np.float64).ravel()
params = tensogram.compute_packing_params(values, bits_per_value=16, decimal_scale_factor=0)
desc = {"type": "ntensor", "shape": shape, "dtype": "float64",
        "encoding": "simple_packing", "compression": "zstd", **params}

# Lossy float compression with error bound (zfp operates on float64)
desc = {"type": "ntensor", "shape": shape, "dtype": "float64",
        "compression": "zfp", "zfp_mode": "fixed_accuracy", "zfp_tolerance": 0.01}

Invalid combinations: Some pipeline combinations are rejected at encode time — e.g. zfp + shuffle (ZFP operates on typed floats, not byte-shuffled data) or simple_packing + sz3 (both are encoding stages). See Compression — Invalid Combinations.

Multiple objects per message

A single message can contain multiple tensors, each with its own descriptor:

spectrum = np.random.randn(256).astype(np.float64)
mask = np.array([1, 0, 1, 1, 0], dtype=np.uint8)

msg = tensogram.encode(
    {},
    [
        ({"type": "ntensor", "shape": [256], "dtype": "float64", "compression": "zstd"}, spectrum),
        ({"type": "ntensor", "shape": [5], "dtype": "uint8"}, mask),
    ],
)

Pre-encoded data

If you already have compressed/packed payloads (e.g. from another system), use tensogram.encode_pre_encoded() with the same interface. The library skips the encoding pipeline and writes the bytes as-is:

msg = tensogram.encode_pre_encoded(meta, [(desc, pre_compressed_bytes)])

See Pre-Encoded Data API for details.

Decoding

Full decode

meta, objects = tensogram.decode(msg)

Returns a Message namedtuple with .metadata and .objects. Tuple unpacking works directly.

By default, decoded arrays are in the caller’s native byte order — the library handles byte-swapping automatically. Pass native_byte_order=False to receive the raw wire byte order instead:

meta, objects = tensogram.decode(msg, native_byte_order=False)

Metadata

meta is a Metadata object:

meta.version     # int — always 2
meta.base        # list[dict] — per-object metadata (one entry per object)
meta.extra       # dict — message-level annotations (_extra_ in CBOR)
meta.reserved    # dict — library internals (_reserved_ in CBOR, read-only)
meta["key"]      # dict-style access (checks base entries, then extra)

To read metadata without decoding any payloads:

meta = tensogram.decode_metadata(msg)

To read metadata and descriptors (no payload decode):

meta, descriptors = tensogram.decode_descriptors(msg)
for desc in descriptors:
    print(desc.shape, desc.dtype, desc.compression)

Selective decode

Decode a single object without touching the others — O(1) seek via the binary header’s offset table:

meta, desc, array = tensogram.decode_object(msg, index=2)

Decode a sub-range of elements from one object (for compressors that support random access):

# Elements 100-149 and 300-324 from object 0
parts = tensogram.decode_range(msg, object_index=0, ranges=[(100, 50), (300, 25)])
# parts is a list of numpy arrays, one per range

# Or join into a single contiguous array
joined = tensogram.decode_range(msg, object_index=0, ranges=[(100, 50), (300, 25)], join=True)
# joined is a single flat numpy array of shape (75,)

decode_range works with uncompressed data, simple_packing, szip, blosc2, and zfp fixed-rate mode. It returns an error for stream compressors (zstd, lz4, sz3) and for the shuffle filter. See Decoding Data for details.
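decode_range addresses elements by flat index, so multi-dimensional slices must be translated into (start, count) pairs by the caller. A sketch for a row-major 2D sub-rectangle (the helper name is ours, for illustration):

```python
def rect_to_ranges(shape, row0, nrows, col0, ncols):
    """Map a 2D sub-rectangle to flat (start, count) ranges, row-major."""
    _, total_cols = shape
    return [((row0 + r) * total_cols + col0, ncols) for r in range(nrows)]

# Rows 10-11, columns 5-9 of a (100, 200) tensor:
rect_to_ranges((100, 200), 10, 2, 5, 5)  # → [(2005, 5), (2205, 5)]
```

The result can be passed directly as the ranges= argument of decode_range.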

Scanning and iteration

To find message boundaries in a buffer without decoding:

offsets = tensogram.scan(buf)  # list of (offset, length) pairs

To iterate messages in a multi-message buffer:

for meta, objects in tensogram.iter_messages(buf):
    print(meta.version, len(objects))

Hash verification

meta, objects = tensogram.decode(msg, verify_hash=True)

Raises RuntimeError if any object’s payload hash doesn’t match. If the message was encoded without a hash (hash=None), verification is silently skipped.

File API

Writing

with tensogram.TensogramFile.create("forecast.tgm") as f:
    for step in range(24):
        data = model.run(step)
        desc = {"type": "ntensor", "shape": list(data.shape), "dtype": "float32",
                "compression": "zstd"}
        f.append({"base": [{"step": step}]}, [(desc, data)])

Each append encodes one message and writes it to the end of the file. Messages are independent and self-describing.

Reading

with tensogram.TensogramFile.open("forecast.tgm") as f:
    print(len(f))                    # message count

    meta, objects = f[0]             # index (supports negative indices)
    subset = f[1:10:2]              # slice → list[Message]

    for meta, objects in f:          # iterate all messages
        for desc, array in objects:
            print(desc.shape, array.dtype)

    raw = f.read_message(0)          # raw bytes for forwarding/caching

The first access triggers a streaming scan that records message offsets. After that, every read is an O(1) seek.

Streaming encoder

For building a message one object at a time in memory:

enc = tensogram.StreamingEncoder({}, hash="xxh3")
for desc, data in objects:
    enc.write_object(desc, data)
msg = enc.finish()  # returns complete message as bytes

For pre-encoded payloads, use enc.write_object_pre_encoded(desc, raw_bytes).

finish() vs finish_backfilled()

finish() produces a streaming-mode message: both the preamble and postamble carry total_length = 0, signalling “unknown length at write time”. Forward-only readers handle this transparently by walking the message’s END_MAGIC trailer, but backward readers cannot O(1)-jump from the postamble to the message start without the mirrored length.

finish_backfilled() seeks back into the in-memory cursor and patches both length slots with the real message length before returning. The produced bytes satisfy the backward-locatability invariant from wire-format §7 — any reader can read the last 16 bytes of the postamble, take the mirrored total_length, and jump straight to the message start. Required for fixtures or workloads that exercise the bidirectional remote walker (see remote-access.md, Bidirectional Scan).

enc = tensogram.StreamingEncoder({"base": [{"name": "obs"}]})
enc.write_object(desc, payload)
msg = enc.finish_backfilled()  # mirrored total_length in both slots

Async API

AsyncTensogramFile provides the same operations as TensogramFile but as asyncio coroutines. A single handle supports truly concurrent operations with no per-handle mutex; internal caches are thread-safe.

Opening and decoding

import asyncio
import tensogram

async def main():
    f = await tensogram.AsyncTensogramFile.open("forecast.tgm")

    meta, objects = await f.decode_message(0)
    result = await f.file_decode_object(0, 0)
    print(result["data"].shape)

asyncio.run(main())

For remote files with credentials:

    f = await tensogram.AsyncTensogramFile.open_remote(
        "s3://bucket/data.tgm", {"region": "eu-west-1"}
    )

Concurrent decoding with asyncio.gather

Multiple decode calls run concurrently on a single handle:

    results = await asyncio.gather(
        f.file_decode_object(0, 0),
        f.file_decode_object(1, 0),
        f.file_decode_object(2, 0),
    )

Batch decoding from many messages at once

When you need the same data from many messages, for example reading how a value at one grid point changes over 300 time steps, individual requests are slow because each one is a separate HTTP round-trip.

file_decode_range_batch collects the requested element ranges across messages and fetches the underlying data in a batched HTTP call. file_decode_object_batch does the same for full frames:

    indices = list(range(300))
    row, col, grid = 100, 200, 528
    offset = row * grid + col

    values = await f.file_decode_range_batch(indices, 0, [(offset, 1)], join=True)

    frames = await f.file_decode_object_batch(indices, 0)

For even more speed, split the work into chunks and run them concurrently:

    chunks = [indices[i::16] for i in range(16)]
    batch_results = await asyncio.gather(
        *[f.file_decode_range_batch(chunk, 0, [(offset, 1)], join=True)
          for chunk in chunks]
    )
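The indices[i::16] stride-split above is a plain round-robin partition: every index lands in exactly one chunk and chunk sizes stay balanced. A quick illustrative check (no tensogram calls involved):

```python
indices = list(range(300))
chunks = [indices[i::16] for i in range(16)]

# Round-robin partition: chunks are disjoint, cover every index,
# and differ in length by at most one element.
flat = sorted(x for chunk in chunks for x in chunk)
assert flat == indices
assert max(map(len, chunks)) - min(map(len, chunks)) <= 1
```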

The sync TensogramFile also has file_decode_range_batch and file_decode_object_batch with the same signatures. Both batch methods require a remote backend; calling them on a local file raises OSError.

Layout prefetching

Before running many concurrent decodes on a remote file, prefetch the internal layout metadata to avoid repeated discovery requests:

    count = await f.message_count()
    await f.prefetch_layouts(list(range(count)))

Context manager and iteration

    async with await tensogram.AsyncTensogramFile.open("data.tgm") as f:
        await f.message_count()   # required before async for or len(f)
        async for meta, objects in f:
            print(objects[0][1].shape)

Async iteration works on remote files (sync iteration does not). await f.message_count() must be called once before using async for or len(f), to discover the message count without blocking the event loop.

Other methods

    count = await f.message_count()
    raw = await f.read_message(0)
    all_raw = await f.messages()
    print(f.is_remote(), f.source())

Note: len(f) requires a prior await f.message_count() call. Without it, len(f) raises RuntimeError.

When to use async vs sync

Scenario                       Recommendation
Script, CLI, or notebook       TensogramFile (sync)
Inside an asyncio event loop   AsyncTensogramFile
xarray or zarr                 Sync (those frameworks are synchronous)
Many concurrent remote reads   asyncio.gather on one AsyncTensogramFile
Same data from many messages   file_decode_range_batch or file_decode_object_batch

Validation

Two functions check whether messages and files are well-formed without consuming the data. See also the CLI reference.

report = tensogram.validate(msg)
file_report = tensogram.validate_file("data.tgm")

Levels

Level       Checks                                                            hash_verified
"quick"     Structure only: magic bytes, frame layout, lengths                always False
"default"   + metadata (CBOR) + integrity (hash verification, decompression)  True only if hash succeeds and no errors
"checksum"  Hash verification only, structural warnings suppressed            True only if hash succeeds and no errors
"full"      + fidelity (full decode, decoded-size check, NaN/Inf scan)        True only if hash succeeds and no errors

# Full validation with canonical CBOR key-order checking
report = tensogram.validate(msg, level="full", check_canonical=True)

Return values

validate() returns:

{
    "issues": [
        {
            "code": "hash_mismatch",   # stable snake_case string
            "level": "integrity",      # which validation level found it
            "severity": "error",       # "error" or "warning"
            "description": "...",      # human-readable message
            "object_index": 0,         # optional — which object
            "byte_offset": 1234,       # optional — position in buffer
        }
    ],
    "object_count": 1,
    "hash_verified": False,
}

validate_file() returns file-level issues plus per-message reports:

{
    "file_issues": [
        {"byte_offset": 100, "length": 19, "description": "trailing bytes after last message"}
    ],
    "messages": [
        {"issues": [], "object_count": 1, "hash_verified": True}
    ],
}

Interpreting results

report = tensogram.validate(msg)
if not report["issues"]:
    print(f"OK — {report['object_count']} objects, hash verified")
else:
    for issue in report["issues"]:
        print(f"[{issue['severity']}] {issue['code']}: {issue['description']}")

GRIB / NetCDF conversion

Three PyO3-bound helpers wrap tensogram-grib and tensogram-netcdf. They are always callable — when the Python wheel was built without the corresponding Cargo feature, each raises RuntimeError with a pointer to rebuild instructions.

You can probe availability at runtime:

import tensogram

if tensogram.__has_grib__:
    msgs = tensogram.convert_grib("forecast.grib2")

if tensogram.__has_netcdf__:
    msgs = tensogram.convert_netcdf("data.nc")

convert_grib(path, **options) -> list[bytes]

Convert a GRIB file (as many messages as it contains) to Tensogram wire format. Returns one bytes object per output Tensogram message — join or write them sequentially to produce a .tgm file.

msgs = tensogram.convert_grib(
    "forecast.grib2",
    grouping="merge_all",      # "merge_all" | "one_to_one"
    preserve_all_keys=False,   # lift every ecCodes namespace into base[i]["grib"]
    encoding="simple_packing", # "none" | "simple_packing"
    bits=16,                   # None -> defaults to 16; ignored for encoding="none"
    filter="none",             # "none" | "shuffle"
    compression="szip",        # "none" | "zstd" | "lz4" | "blosc2" | "szip"
    compression_level=None,    # applies to zstd / blosc2 (None = codec default)
    threads=0,                 # 0 = sequential; honours TENSOGRAM_THREADS env var
    hash="xxh3",               # "xxh3" | None
    # NaN / Inf handling — see docs/src/guide/nan-inf-handling.md
    allow_nan=False,           # False (default) rejects any NaN input
    allow_inf=False,           # False (default) rejects any ±Inf input
)
with open("forecast.tgm", "wb") as fh:
    for msg in msgs:
        fh.write(msg)

Pipeline defaults and edge cases:

  • bits=None with encoding="simple_packing" defaults to 16 bits.
  • bits outside 1..=64 falls back to encoding="none" with a warning on stderr rather than raising. Validate your inputs before calling if fail-fast is important.
  • Unknown compression / encoding names raise ValueError with the list of valid choices in the message.
  • Unknown grouping / split_by / hash values raise ValueError.
  • Missing input paths raise FileNotFoundError.
  • Building the wheel without the grib / netcdf feature causes the corresponding function to raise RuntimeError at call time with rebuild instructions.
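Because an out-of-range bits falls back to encoding="none" instead of raising, callers who want fail-fast behaviour can add a small guard up front (this helper is our own, not part of the tensogram API):

```python
def checked_bits(bits):
    """Reject out-of-range bit widths before they reach the converter,
    which would otherwise fall back to encoding="none" with a warning."""
    if bits is not None and not 1 <= bits <= 64:
        raise ValueError(f"bits must be in 1..=64 (or None), got {bits}")
    return bits
```

Usage: tensogram.convert_grib(path, bits=checked_bits(user_bits), ...).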

Requires libeccodes at the OS level and the wheel built with --features grib (maturin develop --features grib). Official PyPI wheels do not currently include the grib feature — see Jupyter Notebook Walk-through.

convert_grib_buffer(buf, **options) -> list[bytes]

In-memory variant of convert_grib. Accepts any Python bytes-like object (bytes, bytearray, memoryview, numpy.uint8[:]). Useful when the GRIB bytes come from a byte-range HTTP fetch, a cache, or any other in-memory source — no filesystem staging needed.

import requests

# Byte-range download of a single GRIB message from data.ecmwf.int.
resp = requests.get(
    "https://data.ecmwf.int/forecasts/.../...grib2",
    headers={"Range": "bytes=74573515-75234113"},
)
msgs = tensogram.convert_grib_buffer(
    resp.content,
    encoding="simple_packing",
    bits=16,
    compression="szip",
    # See [NaN / Inf Handling](nan-inf-handling.md) for the
    # `allow_nan` / `allow_inf` opt-in if your data contains
    # non-finite values.
)

convert_grib and convert_grib_buffer produce bit-identical decoded payloads for the same input. The encoded bytes may differ — each call stamps a fresh timestamp and UUID into _reserved_.

convert_netcdf(path, **options) -> list[bytes]

Convert a NetCDF-3 or NetCDF-4 file to Tensogram. Packed variables (scale_factor / add_offset) are automatically unpacked to float64.

msgs = tensogram.convert_netcdf(
    "data.nc",
    split_by="file",           # "file" | "variable" | "record"
    cf=False,                  # lift 16 CF attributes into base[i]["cf"]
    encoding="none",
    bits=None,
    filter="none",
    compression="zstd",
    compression_level=3,
    threads=0,
    hash="xxh3",
    # NaN / Inf handling — see docs/src/guide/nan-inf-handling.md
    allow_nan=False,           # False (default) rejects any NaN input
    allow_inf=False,           # False (default) rejects any ±Inf input
)

Note on NaN and --encoding simple_packing. Since 0.17 the importer hard-fails on NaN or Inf in a variable targeted for simple_packing (previous behaviour: stderr warning + fallback to encoding="none"). If your NetCDF has _FillValue / missing_value fields unpacked to NaN, either stick with the default encoding="none" or pre-process the values. See the NetCDF Importer error-handling reference for the full contract.

Requires libnetcdf + libhdf5 at the OS level and the wheel built with --features netcdf.

Error Handling

Exception          When
FileNotFoundError  convert_grib(path) / convert_netcdf(path) called with a non-existent path (subclass of OSError).
OSError            Other file I/O failures (permission denied, disk error, etc.).
ValueError         Invalid parameters; unknown dtype; NaN in simple packing; unknown validation level; invalid grouping / split_by / hash; unknown codec / bit width in the conversion pipeline; empty/non-GRIB input buffer; split_by="record" on a NetCDF without an unlimited dimension.
RuntimeError       Hash mismatch during decode(..., verify_hash=True); calling convert_grib / convert_grib_buffer / convert_netcdf on a wheel built without the feature; internal ecCodes / libnetcdf C-library failures that cannot be classified as caller-input errors.
KeyError           Missing metadata key via meta["key"].

Supported dtypes

Category          Types
Floating point    float16, bfloat16, float32, float64
Complex           complex64, complex128
Signed integer    int8, int16, int32, int64
Unsigned integer  uint8, uint16, uint32, uint64
Special           bitmask

bfloat16 is returned as ml_dtypes.bfloat16 when ml_dtypes is installed; otherwise it falls back to np.uint16.

See Data Types for byte widths and wire-format details.

Examples

See examples/python/ for complete working examples:

Example                   Topic
01_encode_decode.py       Basic round-trip
02_mars_metadata.py       Per-object metadata (ECMWF MARS vocabulary example)
02b_generic_metadata.py   Per-object metadata using a generic application namespace
03_simple_packing.py      Simple-packing encoding
04_multi_object.py        Multi-object messages, selective decode
05_file_api.py            Multi-message .tgm files
06_hash_and_errors.py     Hash verification and error handling
07_iterators.py           File iteration, indexing, slicing
08_xarray_integration.py  Opening .tgm as xarray Datasets
08_zarr_backend.py        Reading/writing through Zarr v3
09_dask_distributed.py    Dask distributed computing over 4-D tensors
09_streaming_consumer.py  Streaming consumer pattern
11_encode_pre_encoded.py  Pre-encoded data API
12_convert_netcdf.py      NetCDF → Tensogram import via the Python API
13_validate.py            Message and file validation
15_async_operations.py    Async open, decode, and asyncio.gather
17_convert_grib.py        GRIB → Tensogram import (file + in-memory buffer)

For narrative walk-throughs with plots and explanations, see also examples/jupyter/*.ipynb — five journey notebooks covering quickstart/MARS, encoding pipeline fidelity, GRIB conversion, NetCDF conversion with xarray, and validation with multi-threaded encoding.