JetStream Backend
The jetstream backend is the production-oriented storage implementation.
It connects to a NATS server with JetStream enabled and uses it
for durable message storage, replay, and live streaming.
Intended Use
Use jetstream when you need:
- durable storage that survives server restarts
- replay across multiple server instances
- live streaming with cluster-wide fan-out
- configurable retention, size limits, and compression
Local Test Setup
Start a NATS + JetStream instance via Docker:
./scripts/init_nats.sh
Then configure Aviso:
notification_backend:
kind: jetstream
jetstream:
nats_url: "nats://localhost:4222"
For full setup options including authentication and storage limits, see Installation — Local JetStream.
Core Behavior
- Connects to the configured NATS server on startup (with retry).
- Creates JetStream streams on demand, one per topic
base(e.g.MARS,DISS,POLYGON). - Publishes notifications directly to JetStream subjects using the encoded wire format.
- Uses pull consumers for replay batching (
from_id,from_date). - Uses push consumers for live watch subscriptions.
- Reconciles existing streams against current config when they are first accessed.
Configuration Reference
All fields live under notification_backend.jetstream.
Connection & startup
| Field | Default | Notes |
|---|---|---|
nats_url | nats://localhost:4222 | NATS server URL. |
token | None | Token auth; falls back to NATS_TOKEN environment variable. |
timeout_seconds | 30 | Per-attempt connection timeout (> 0). |
retry_attempts | 3 | Startup connection attempts before backend init fails (> 0). |
Runtime reconnect
| Field | Default | Notes |
|---|---|---|
enable_auto_reconnect | true | Enables/disables NATS client reconnect after startup. |
max_reconnect_attempts | 5 | 0 means unlimited reconnect retries. |
reconnect_delay_ms | 2000 | Delay between reconnect attempts and startup connect retries (> 0). |
Publish resilience
| Field | Default | Notes |
|---|---|---|
publish_retry_attempts | 5 | Retries for transient channel closed publish failures (> 0). |
publish_retry_base_delay_ms | 150 | Base backoff in ms for publish retries; grows exponentially per attempt (> 0). |
Stream defaults
These apply to every stream created by Aviso unless overridden by a per-schema storage_policy.
| Field | Default | Notes |
|---|---|---|
max_messages | None | Stream message cap (maps to max_messages). |
max_bytes | None | Stream size cap in bytes (maps to max_bytes). |
retention_time | None | Default max age: duration literal (s, m, h, d, w; e.g. 30d). |
storage_type | file | file or memory — parsed as typed enum at config load. |
replicas | None | Stream replica count. |
retention_policy | limits | limits, interest, or workqueue — parsed as typed enum. |
discard_policy | old | old or new — parsed as typed enum. |
Fail-fast validation:
storage_type,retention_policy, anddiscard_policyare parsed as typed enums during configuration loading. Invalid values fail startup immediately, before any streams are created.
Full example
notification_backend:
kind: jetstream
jetstream:
nats_url: "nats://localhost:4222"
timeout_seconds: 30
retry_attempts: 3
enable_auto_reconnect: true
max_reconnect_attempts: 5
reconnect_delay_ms: 2000
publish_retry_attempts: 5
publish_retry_base_delay_ms: 150
storage_type: file
retention_policy: limits
discard_policy: old
Stream Management
Stream creation
On first access (e.g. first publish for a given event type), Aviso creates a JetStream stream with the following settings applied:
storage_type,retention_policy,discard_policymax_messages,max_bytes,retention_time→max_agereplicas
The stream subject binding is set to <base>.> (e.g. mars.>) to capture all topics
under that base.
Reconciliation of existing streams
When a stream already exists and is accessed by Aviso, it is reconciled — the current stream config is compared against the desired config and mutable fields are updated if drift is detected:
- limits (retention, size, message count)
- compression
- duplicate window
- replicas
- subject binding
If JetStream rejects an update (e.g. the field is not editable in the current server/stream state), Aviso logs a warning and continues with the existing stream configuration.
Precedence
Backend-level defaults are applied first, then per-schema storage_policy overrides for that stream:
notification_backend.jetstream.* (base defaults)
↓ overridden by
notification_schema.<event_type>.storage_policy.*
Applying config changes to existing streams
Changes to stream-affecting settings (e.g. compression, retention, limits) in config.yaml
are applied to existing streams automatically during reconciliation when the stream is next accessed.
To force historical data to be physically rewritten with new settings (e.g. re-pack with compression):
- Stop all Aviso writers for the target stream.
- Delete the stream in NATS.
- Restart Aviso (or publish again) — the stream is recreated with current config.
# List streams
nats stream ls
# Delete a stream (example: DISS)
nats stream rm DISS
wipe_stream(admin endpoint) removes messages but preserves stream configuration. Use stream deletion only when you need historical data physically rewritten.
Verifying Effective Stream Policy
Use the nats CLI to inspect the stream config after a publish or reconcile:
# Replace POLYGON with your stream name (MARS, DISS, etc.)
nats --server nats://localhost:4222 stream info POLYGON
Fields to check:
| CLI field | Config field |
|---|---|
Max Age | retention_time |
Max Messages | max_messages |
Max Bytes | max_bytes / per-schema max_size |
Max Messages Per Subject | allow_duplicates: 1 = disabled, -1 = enabled |
Compression | None or S2 |
Replay Behavior
- Sequence replay (
from_id): starts from that sequence number, inclusive. - Time replay (
from_date): uses JetStream start-time delivery policy. - The API enforces mutual exclusivity —
from_idandfrom_datecannot both be present.
Smoke Test (JetStream Mode)
python3 -m pip install httpx
BACKEND=jetstream \
NATS_URL=nats://localhost:4222 \
JETSTREAM_POLICY_STREAM_NAME=POLYGON \
EXPECT_MAX_MESSAGES=500000 \
EXPECT_MAX_BYTES=2147483648 \
EXPECT_MAX_MESSAGES_PER_SUBJECT=1 \
EXPECT_COMPRESSION=None \
python3 scripts/smoke_test.py
Operational Caveats
- Startup connectivity is controlled by
timeout_seconds+retry_attempts. - Runtime reconnect is controlled by
enable_auto_reconnect,max_reconnect_attempts,reconnect_delay_ms. - Publish retry is a narrow resilience path for transient
channel closedfailures; non-transient failures fail fast. retry_attemptsapplies only to startup; post-startup reconnect uses the reconnect settings.- Setting
max_reconnect_attempts = 0enables unlimited reconnect retries.