JetStream Backend

The jetstream backend is the production-oriented storage implementation. It connects to a NATS server with JetStream enabled and uses it for durable message storage, replay, and live streaming.


Intended Use

Use jetstream when you need:

  • durable storage that survives server restarts
  • replay across multiple server instances
  • live streaming with cluster-wide fan-out
  • configurable retention, size limits, and compression

Local Test Setup

Start a NATS + JetStream instance via Docker:

./scripts/init_nats.sh

Then configure Aviso:

notification_backend:
  kind: jetstream
  jetstream:
    nats_url: "nats://localhost:4222"

For full setup options including authentication and storage limits, see Installation — Local JetStream.


Core Behavior

  • Connects to the configured NATS server on startup (with retry).
  • Creates JetStream streams on demand, one per topic base (e.g. MARS, DISS, POLYGON).
  • Publishes notifications directly to JetStream subjects using the encoded wire format.
  • Uses pull consumers for replay batching (from_id, from_date).
  • Uses push consumers for live watch subscriptions.
  • Reconciles existing streams against current config when they are first accessed.

Configuration Reference

All fields live under notification_backend.jetstream.

Connection & startup

| Field | Default | Notes |
| --- | --- | --- |
| nats_url | nats://localhost:4222 | NATS server URL. |
| token | None | Token auth; falls back to NATS_TOKEN environment variable. |
| timeout_seconds | 30 | Per-attempt connection timeout (> 0). |
| retry_attempts | 3 | Startup connection attempts before backend init fails (> 0). |
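
The startup fields can be read as a bounded retry loop. The following is a minimal sketch of that reading, not Aviso's actual implementation: connect_once is a hypothetical callable standing in for the real NATS connect call, and the pause between attempts uses reconnect_delay_ms as described under Runtime reconnect.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def connect_with_retry(
    connect_once: Callable[[float], T],  # hypothetical: receives the timeout in seconds
    timeout_seconds: float = 30,
    retry_attempts: int = 3,
    reconnect_delay_ms: int = 2000,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Try to connect up to retry_attempts times, then fail backend init."""
    if timeout_seconds <= 0 or retry_attempts <= 0 or reconnect_delay_ms <= 0:
        raise ValueError("timeout_seconds, retry_attempts, reconnect_delay_ms must be > 0")
    last_error = None
    for attempt in range(1, retry_attempts + 1):
        try:
            return connect_once(timeout_seconds)
        except Exception as exc:  # sketch only; real code would catch narrower errors
            last_error = exc
            if attempt < retry_attempts:
                # Wait before the next attempt, but not after the final one.
                sleep(reconnect_delay_ms / 1000)
    raise ConnectionError(
        f"backend init failed after {retry_attempts} attempts"
    ) from last_error
```

With the defaults, a server that never answers would be tried three times with a two-second pause between attempts before initialization fails.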

Runtime reconnect

| Field | Default | Notes |
| --- | --- | --- |
| enable_auto_reconnect | true | Enables/disables NATS client reconnect after startup. |
| max_reconnect_attempts | 5 | 0 means unlimited reconnect retries. |
| reconnect_delay_ms | 2000 | Delay between reconnect attempts and startup connect retries (> 0). |
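
How the two reconnect limits interact can be illustrated with a small decision function. This is a sketch under the semantics stated in the table, with a hypothetical function name; the real decision is made inside the NATS client.

```python
def should_reconnect(
    attempts_so_far: int,
    enable_auto_reconnect: bool = True,
    max_reconnect_attempts: int = 5,
) -> bool:
    """Return True if another reconnect attempt is allowed."""
    if not enable_auto_reconnect:
        return False
    if max_reconnect_attempts == 0:
        # 0 is the documented sentinel for unlimited retries.
        return True
    return attempts_so_far < max_reconnect_attempts
```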

Publish resilience

| Field | Default | Notes |
| --- | --- | --- |
| publish_retry_attempts | 5 | Retries for transient channel closed publish failures (> 0). |
| publish_retry_base_delay_ms | 150 | Base backoff in ms for publish retries; grows exponentially per attempt (> 0). |
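
To get a feel for the resulting wait times, the schedule can be computed as below. This sketch assumes the delay doubles on each retry with no jitter and no upper cap; the source only states that the delay grows exponentially, so the factor of 2 is an assumption.

```python
from typing import List


def publish_retry_delays_ms(
    attempts: int = 5,
    base_delay_ms: int = 150,
    factor: int = 2,  # assumed growth factor; not confirmed by the source
) -> List[int]:
    """Return the delay before each retry, in milliseconds."""
    if attempts <= 0 or base_delay_ms <= 0:
        raise ValueError("attempts and base_delay_ms must be > 0")
    return [base_delay_ms * factor**i for i in range(attempts)]


print(publish_retry_delays_ms())  # [150, 300, 600, 1200, 2400]
```

Under these assumptions the default settings add up to roughly 4.65 seconds of waiting before a publish is reported as failed.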

Stream defaults

These apply to every stream created by Aviso unless overridden by a per-schema storage_policy.

| Field | Default | Notes |
| --- | --- | --- |
| max_messages | None | Stream message cap (maps to max_messages). |
| max_bytes | None | Stream size cap in bytes (maps to max_bytes). |
| retention_time | None | Default max age as a duration literal (units s, m, h, d, w; e.g. 30d). |
| storage_type | file | file or memory; parsed as typed enum at config load. |
| replicas | None | Stream replica count. |
| retention_policy | limits | limits, interest, or workqueue; parsed as typed enum. |
| discard_policy | old | old or new; parsed as typed enum. |
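
The retention_time literal can be interpreted roughly as follows. This is a sketch that assumes a single non-negative integer followed by one unit letter; whether Aviso also accepts compound literals such as 1h30m is not stated in the source, so they are rejected here.

```python
import re
from datetime import timedelta

# Unit letters from the table above, mapped to timedelta keyword arguments.
_UNITS = {"s": "seconds", "m": "minutes", "h": "hours", "d": "days", "w": "weeks"}
_PATTERN = re.compile(r"(\d+)([smhdw])")


def parse_retention_time(literal: str) -> timedelta:
    """Parse a duration literal such as '30d' into a timedelta."""
    match = _PATTERN.fullmatch(literal.strip())
    if match is None:
        raise ValueError(f"invalid duration literal: {literal!r}")
    value, unit = match.groups()
    return timedelta(**{_UNITS[unit]: int(value)})
```

For example, parse_retention_time("30d") yields a 30-day duration, which JetStream would receive as the stream's max age.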

Fail-fast validation: storage_type, retention_policy, and discard_policy are parsed as typed enums during configuration loading. Invalid values fail startup immediately, before any streams are created.
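
The fail-fast behavior can be illustrated with typed enums. This is a sketch of the idea in Python, with hypothetical class and function names; it is not Aviso's own configuration loader.

```python
from enum import Enum


class StorageType(Enum):
    FILE = "file"
    MEMORY = "memory"


class RetentionPolicy(Enum):
    LIMITS = "limits"
    INTEREST = "interest"
    WORKQUEUE = "workqueue"


class DiscardPolicy(Enum):
    OLD = "old"
    NEW = "new"


# Field name -> (enum type, documented default)
_ENUM_FIELDS = {
    "storage_type": (StorageType, "file"),
    "retention_policy": (RetentionPolicy, "limits"),
    "discard_policy": (DiscardPolicy, "old"),
}


def parse_stream_enums(cfg: dict) -> dict:
    """Convert raw config strings to enums, raising on the first invalid value."""
    parsed = {}
    for name, (enum_cls, default) in _ENUM_FIELDS.items():
        raw = cfg.get(name, default)
        try:
            parsed[name] = enum_cls(raw)
        except ValueError:
            allowed = ", ".join(member.value for member in enum_cls)
            raise ValueError(
                f"invalid {name} {raw!r}; expected one of: {allowed}"
            ) from None
    return parsed
```

Because parsing happens while the configuration is loaded, a typo such as storage_type: disk is rejected before any connection or stream creation is attempted.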

Full example

notification_backend:
  kind: jetstream
  jetstream:
    nats_url: "nats://localhost:4222"
    timeout_seconds: 30
    retry_attempts: 3
    enable_auto_reconnect: true
    max_reconnect_attempts: 5
    reconnect_delay_ms: 2000
    publish_retry_attempts: 5
    publish_retry_base_delay_ms: 150
    storage_type: file
    retention_policy: limits
    discard_policy: old

Stream Management

Stream creation

On first access (e.g. first publish for a given event type), Aviso creates a JetStream stream with the following settings applied:

  • storage_type, retention_policy, discard_policy
  • max_messages, max_bytes, retention_time → max_age
  • replicas

The stream subject binding is set to <base>.> (e.g. mars.>) to capture all topics under that base.
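
The effect of the > wildcard can be checked with a small matcher. The sketch below implements standard NATS subject matching (* matches exactly one token, > matches one or more trailing tokens); the subjects used with it, such as mars.od.0001, are illustrative and not taken from Aviso.

```python
def subject_matches(pattern: str, subject: str) -> bool:
    """Return True if a NATS subject matches a wildcard pattern."""
    p_tokens = pattern.split(".")
    s_tokens = subject.split(".")
    for i, p in enumerate(p_tokens):
        if p == ">":
            # '>' is only valid as the last token and needs at least one token to match.
            return i == len(p_tokens) - 1 and len(s_tokens) > i
        if i >= len(s_tokens):
            return False
        if p != "*" and p != s_tokens[i]:
            return False
    return len(s_tokens) == len(p_tokens)
```

Note that mars.> matches mars.od.0001 but not the bare subject mars, since > requires at least one token after the base.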

Reconciliation of existing streams

When a stream already exists and is accessed by Aviso, it is reconciled — the current stream config is compared against the desired config and mutable fields are updated if drift is detected:

  • limits (retention, size, message count)
  • compression
  • duplicate window
  • replicas
  • subject binding

If JetStream rejects an update (e.g. the field is not editable in the current server/stream state), Aviso logs a warning and continues with the existing stream configuration.
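
The reconcile-then-tolerate-rejection flow described above can be sketched as follows. The field names, update_stream, and warn are hypothetical stand-ins chosen for illustration; the actual drift detection lives inside Aviso and may compare fields differently.

```python
from typing import Callable

# Illustrative names for the mutable fields listed above.
MUTABLE_FIELDS = (
    "max_age", "max_bytes", "max_messages",
    "compression", "duplicate_window", "replicas", "subjects",
)


def plan_reconcile(current: dict, desired: dict) -> dict:
    """Return only the mutable fields whose desired value differs from the current one."""
    return {
        field: desired[field]
        for field in MUTABLE_FIELDS
        if field in desired and current.get(field) != desired[field]
    }


def reconcile(
    current: dict,
    desired: dict,
    update_stream: Callable[[dict], None],  # hypothetical: sends the update to JetStream
    warn: Callable[[str], None] = print,
) -> dict:
    """Apply drifted fields; on rejection, warn and keep the existing config."""
    changes = plan_reconcile(current, desired)
    if not changes:
        return current
    updated = {**current, **changes}
    try:
        update_stream(updated)
    except Exception as exc:  # sketch only; real code would catch the client's error type
        warn(f"stream update rejected, keeping existing config: {exc}")
        return current
    return updated
```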

Precedence

Backend-level defaults are applied first, then per-schema storage_policy overrides for that stream:

notification_backend.jetstream.* (base defaults)
    ↓ overridden by
notification_schema.<event_type>.storage_policy.*
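
In code terms, this precedence amounts to a layered merge. The sketch below assumes that a field left unset (None) in storage_policy does not override the backend default, and the keys in the usage note are illustrative; neither detail is confirmed by the source.

```python
from typing import Optional


def effective_stream_config(
    backend_defaults: dict,
    storage_policy: Optional[dict] = None,
) -> dict:
    """Layer per-schema storage_policy values over backend-level defaults."""
    merged = dict(backend_defaults)
    for key, value in (storage_policy or {}).items():
        if value is not None:  # assumption: unset fields fall through to the default
            merged[key] = value
    return merged
```

For example, with backend defaults of retention_time 30d and storage_type file, a schema whose storage_policy sets only retention_time 7d would end up with retention_time 7d and storage_type file.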

Applying config changes to existing streams

Changes to stream-affecting settings (e.g. compression, retention, limits) in config.yaml are applied to existing streams automatically during reconciliation when the stream is next accessed.

To force historical data to be physically rewritten with new settings (e.g. re-pack with compression):

  1. Stop all Aviso writers for the target stream.
  2. Delete the stream in NATS.
  3. Restart Aviso (or publish again); the stream is recreated with the current config.

# List streams
nats stream ls

# Delete a stream (example: DISS)
nats stream rm DISS

wipe_stream (admin endpoint) removes messages but preserves stream configuration. Use stream deletion only when you need historical data physically rewritten.


Verifying Effective Stream Policy

Use the nats CLI to inspect the stream config after a publish or reconcile:

# Replace POLYGON with your stream name (MARS, DISS, etc.)
nats --server nats://localhost:4222 stream info POLYGON

Fields to check:

| CLI field | Config field |
| --- | --- |
| Max Age | retention_time |
| Max Messages | max_messages |
| Max Bytes | max_bytes / per-schema max_size |
| Max Messages Per Subject | allow_duplicates: 1 = disabled, -1 = enabled |
| Compression | None or S2 |
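
The same check can be scripted against the CLI's JSON output. The sketch below assumes the output of nats stream info with the --json flag is available as a string, and that it carries the JetStream stream configuration under a config key with fields such as max_msgs, max_bytes, max_msgs_per_subject, and compression; the function name and sample values are illustrative.

```python
import json


def check_stream_config(info_json: str, expected: dict) -> list:
    """Compare expected values against the 'config' object; return mismatch messages."""
    config = json.loads(info_json)["config"]
    mismatches = []
    for key, want in expected.items():
        got = config.get(key)
        if isinstance(want, str) and isinstance(got, str):
            ok = got.lower() == want.lower()  # tolerate 'None' vs 'none'
        else:
            ok = got == want
        if not ok:
            mismatches.append(f"{key}: expected {want!r}, got {got!r}")
    return mismatches


# Illustrative sample shaped like the assumed JSON output, not captured from a server.
sample = json.dumps({
    "config": {
        "name": "POLYGON",
        "max_msgs": 500000,
        "max_bytes": 2147483648,
        "max_msgs_per_subject": 1,
        "compression": "none",
    }
})
print(check_stream_config(sample, {"max_msgs": 500000, "compression": "None"}))  # []
```

An empty list means every expected field matched; any drift is reported by field name.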

Replay Behavior

  • Sequence replay (from_id): starts from that sequence number, inclusive.
  • Time replay (from_date): uses JetStream start-time delivery policy.
  • The API enforces mutual exclusivity — from_id and from_date cannot both be present.
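
The replay rules above can be sketched as a small request validator. The returned keys follow JetStream consumer configuration naming (deliver_policy, opt_start_seq, opt_start_time); the function itself is hypothetical, and the behavior when neither parameter is given (new messages only) is an assumption.

```python
from typing import Optional


def replay_start(from_id: Optional[int] = None, from_date: Optional[str] = None) -> dict:
    """Map replay parameters to a JetStream-style delivery policy."""
    if from_id is not None and from_date is not None:
        raise ValueError("from_id and from_date are mutually exclusive")
    if from_id is not None:
        # Sequence replay: delivery starts at this sequence number, inclusive.
        return {"deliver_policy": "by_start_sequence", "opt_start_seq": from_id}
    if from_date is not None:
        # Time replay: delivery starts at the given timestamp.
        return {"deliver_policy": "by_start_time", "opt_start_time": from_date}
    # Assumption: without replay parameters, only new messages are delivered.
    return {"deliver_policy": "new"}
```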

Smoke Test (JetStream Mode)

python3 -m pip install httpx

BACKEND=jetstream \
NATS_URL=nats://localhost:4222 \
JETSTREAM_POLICY_STREAM_NAME=POLYGON \
EXPECT_MAX_MESSAGES=500000 \
EXPECT_MAX_BYTES=2147483648 \
EXPECT_MAX_MESSAGES_PER_SUBJECT=1 \
EXPECT_COMPRESSION=None \
python3 scripts/smoke_test.py

Operational Caveats

  • Startup connectivity is controlled by timeout_seconds + retry_attempts.
  • Runtime reconnect is controlled by enable_auto_reconnect, max_reconnect_attempts, reconnect_delay_ms.
  • Publish retry is a narrow resilience path for transient channel closed failures; non-transient failures fail fast.
  • retry_attempts applies only to startup; post-startup reconnect uses the reconnect settings.
  • Setting max_reconnect_attempts = 0 enables unlimited reconnect retries.