Introduction

Aviso Server is a real-time notification and historical replay system built for data dissemination pipelines. It lets publishers announce data availability events and lets subscribers receive those events — either live as they happen, or replayed from a chosen point in history.

It is designed for environments where timely, reliable notification of data availability is critical: scientific computing, operational weather forecasting pipelines, large-scale data distribution, and similar domains.


How It Works

At its core, Aviso Server exposes three operations:

graph LR
    P(Publisher) -->|POST /api/v1/notification| A[Aviso Server]
    A -->|store| B[("Backend<br/>JetStream / In-Memory")]
    B -->|fan-out| W1("Subscriber A<br/>watch")
    B -->|fan-out| W2("Subscriber B<br/>watch")
    B -->|history| R("Client<br/>replay")

| Operation | Endpoint | Description |
|---|---|---|
| Notify | POST /api/v1/notification | Publish a notification event to the backend |
| Watch | POST /api/v1/watch | Stream live (and optionally historical) events over SSE |
| Replay | POST /api/v1/replay | Stream historical events only, then close |

Key Features

Schema-driven validation

Each event type can have a schema that defines which identifier fields are required, their types (date, time, integer, float, enum, polygon), allowed ranges, and topic ordering. Invalid notifications are rejected at the API boundary with a clear error.

Structured topic routing

Aviso builds a deterministic topic string from the notification's identifier fields. This topic is used to route, store, and filter messages in the backend. Subscribers can use wildcard patterns and constraint objects to filter the stream.

Hybrid filtering

Watch and replay requests are matched using a two-tier strategy: the backend handles coarse routing (broad subject filters), and Aviso applies precise application-level filtering (constraints, spatial checks) on top. This keeps backend subscription counts low while delivering exact results.
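
As a rough illustration of the first tier, the sketch below derives a broad backend subject filter from a watch request and leaves every constraint to a later application-level check. The helper name `coarse_subject` and the choice of `*` for each constrained or missing field are assumptions made for this example, not a description of Aviso's internals.

```python
def coarse_subject(base, key_order, identifier):
    """Build a broad subject filter: literal tokens for scalars, '*' otherwise.

    Hypothetical helper. Values are assumed to be canonical already and free
    of reserved characters.
    """
    tokens = [base]
    for key in key_order:
        value = identifier.get(key)
        if isinstance(value, (str, int, float)) and not isinstance(value, bool):
            tokens.append(str(value))
        else:
            # Constraint objects and missing fields cannot be written as one
            # literal token, so the backend matches any value at this position.
            tokens.append("*")
    return ".".join(tokens)


request = {"region": "north", "run_time": "1200", "severity": {"gte": 5}}
subject = coarse_subject(
    "extreme_event", ["region", "run_time", "severity", "anomaly"], request
)
print(subject)  # extreme_event.north.1200.*.*
```

Under this sketch, one backend subscription covers every severity, and the `gte` check runs only on messages that already passed the coarse filter.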

Spatial awareness

Polygon and point identifiers are first-class — notifications can carry geographic polygons, and subscribers can filter by polygon intersection or point containment.

Pluggable backends

Aviso abstracts storage behind a NotificationBackend trait. Today two backends ship: JetStream (NATS-backed, durable, production-ready) and In-Memory (single-process, for development and testing).

Server-Sent Events (SSE)

Watch and replay streams use SSE — a simple, firewall-friendly HTTP streaming protocol supported natively by browsers and all major HTTP clients. The stream includes typed control frames (connection established, replay started/completed, heartbeats, and graceful close reasons).

CloudEvents format

All delivered notifications follow the CloudEvents specification, making them easy to integrate with other event-driven systems.


Use Cases

  • Data availability monitoring — trigger downstream workflows the moment a dataset lands
  • Operational pipelines — coordinate processing steps across distributed services
  • Audit and compliance — replay historical events to reconstruct what was published and when
  • System integration — connect disparate systems through a standardized event interface

Where to Go Next

If you are new to Aviso, read these pages in order:

  1. Key Concepts — understand the terminology before anything else
  2. Installation — get the server running
  3. Getting Started — send your first notification and watch it arrive
  4. Practical Examples — copy-paste workflows for common scenarios

Key Concepts

This page defines the core terms you will encounter throughout Aviso's documentation and API. Reading it before Getting Started will make everything else click faster.


Event Type

An event type is a named category of notification — for example extreme_event, sensor_alert, or data_ready.

Every request to Aviso (notify, watch, or replay) targets exactly one event type. The server uses the event type to:

  • look up the matching schema (validation rules, required fields, topic ordering),
  • route messages to the correct storage stream,
  • apply the correct retention and storage policy.
{ "event_type": "extreme_event", ... }

If no schema is configured for an event type, Aviso falls back to generic behavior: fields are accepted as-is and the topic is built from sorted keys.


Identifier

An identifier is a set of key-value pairs that describe what a notification is about. Think of it as structured metadata that uniquely (or approximately) locates a piece of data.

{
  "identifier": {
    "region":   "north",
    "run_time": "1200",
    "severity": "4",
    "anomaly":  "42.5"
  }
}

Identifiers serve two purposes depending on the operation:

| Operation | Role of identifier |
|---|---|
| notify | Declares the metadata of the notification being published |
| watch | Acts as a filter: which notifications to receive |
| replay | Acts as a filter: which historical notifications to retrieve |

In watch and replay, identifier values can be constraint objects instead of scalars (e.g. {"gte": 5}) for numeric and enum fields. See Streaming Semantics.


Topic

A topic is the internal routing key Aviso builds from an identifier. You rarely construct topics manually — Aviso builds them for you.

Topics are dot-separated strings, for example:

extreme_event.north.1200.4.42%2E5

Each token corresponds to one identifier field, in the order defined by key_order in the schema. Note the %2E — the dot in 42.5 is percent-encoded so it doesn't conflict with the topic separator. Reserved characters (., *, >, %) in field values are percent-encoded before writing to the backend so they do not break routing. See Topic Encoding.
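
A minimal sketch of how such a topic string could be assembled is shown here. The function names are hypothetical, and the two-digit uppercase hex form for characters other than the dot is inferred from the `%2E` example rather than confirmed; the key order omits `polygon` for brevity.

```python
RESERVED = {".", "*", ">", "%"}


def encode_token(value):
    """Percent-encode the reserved characters of a single topic token."""
    return "".join(f"%{ord(ch):02X}" if ch in RESERVED else ch for ch in value)


def build_topic(base, key_order, identifier):
    """Join the base and the encoded identifier values in key_order."""
    return ".".join([base] + [encode_token(identifier[key]) for key in key_order])


topic = build_topic(
    "extreme_event",
    ["region", "run_time", "severity", "anomaly"],
    {"region": "north", "run_time": "1200", "severity": "4", "anomaly": "42.5"},
)
print(topic)  # extreme_event.north.1200.4.42%2E5
```

Encoding character by character means a literal `%` in a value is escaped once and never double-encoded.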


Schema

A schema configures how Aviso handles a specific event type. Schemas are defined in configuration/config.yaml under notification_schema.

A schema controls:

  • topic.base — the stream/prefix for this event type (e.g. diss, mars)
  • topic.key_order — the order of identifier fields in the topic string
  • identifier.* — validation rules per field (type, required, allowed values, ranges)
  • payload.required — whether a payload is mandatory on notify
  • storage_policy — per-stream retention, size limits, compression (JetStream only)

Example:

notification_schema:
  extreme_event:
    topic:
      base: "extreme_event"
      key_order: ["region", "run_time", "severity", "anomaly", "polygon"]
    identifier:
      region:
        description: "Geographic region label."
        type: EnumHandler
        values: ["north", "south", "east", "west"]
        required: true
      run_time:
        type: TimeHandler
        required: true
      severity:
        description: "Severity level from 1 to 7."
        type: IntHandler
        range: [1, 7]
        required: true
      anomaly:
        type: FloatHandler
        range: [0.0, 100.0]
        required: false
      polygon:
        type: PolygonHandler
        required: false
    payload:
      required: false

Every key listed in key_order must have a corresponding entry in identifier. For notify, every identifier key declared in the schema must be present in the request. Fields marked required: false are optional for validation semantics, but the key itself is still required on notify so Aviso can build a deterministic topic.
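
The first rule above is easy to check before starting the server. The following sketch assumes the configuration has already been loaded into a plain dictionary; `check_key_order` is a hypothetical helper, not part of Aviso.

```python
def check_key_order(notification_schema):
    """Return one message per key_order entry that lacks an identifier entry."""
    problems = []
    for event_type, schema in notification_schema.items():
        declared = set(schema.get("identifier", {}))
        for key in schema.get("topic", {}).get("key_order", []):
            if key not in declared:
                problems.append(
                    f"{event_type}: key_order field '{key}' has no identifier entry"
                )
    return problems


config = {
    "my_event": {
        "topic": {"base": "my_event", "key_order": ["region", "date"]},
        "identifier": {"region": {"type": "EnumHandler", "required": True}},
    }
}
print(check_key_order(config))
# ["my_event: key_order field 'date' has no identifier entry"]
```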


Payload

A payload is arbitrary JSON attached to a notification. Aviso treats it as opaque — it stores and replays the value exactly as sent.

Valid payload types: object, array, string, number, boolean, or null.

{ "payload": { "path": "/data/grib2/file.grib2", "size": 1048576 } }

Whether payload is required or optional is controlled per schema by payload.required. See Payload Contract for the full input → storage → replay mapping.


Operations: Notify, Watch, Replay

Aviso exposes three operations, each on its own endpoint:

Notify — POST /api/v1/notification

Publishes a notification to the backend. The identifier must match all required schema fields exactly (no wildcards, no constraints).

Watch — POST /api/v1/watch

Opens a persistent Server-Sent Events (SSE) stream. Receives live notifications as they arrive, optionally starting from a historical point.

  • Omit from_id/from_date → live-only stream
  • Provide one of them → historical replay first, then live

Replay — POST /api/v1/replay

Opens a finite SSE stream of historical notifications only. Requires exactly one of from_id or from_date. Stream closes automatically when history is exhausted.
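
The start-point rules for watch and replay can be summarized in a few lines. This is only a sketch: the mode labels are invented for the example, and rejecting a watch request that carries both fields is an assumption, since the text above only describes the zero-field and one-field cases.

```python
def stream_mode(operation, request):
    """Classify a watch/replay request by its from_id / from_date fields."""
    start_fields = [name for name in ("from_id", "from_date") if name in request]
    if len(start_fields) > 1:
        # Assumption: supplying both fields is an error for either operation.
        raise ValueError("provide at most one of from_id or from_date")
    if operation == "replay":
        if not start_fields:
            raise ValueError("replay requires exactly one of from_id or from_date")
        return "history-only"
    if operation == "watch":
        return "history-then-live" if start_fields else "live-only"
    raise ValueError(f"unknown operation: {operation}")


print(stream_mode("watch", {}))                 # live-only
print(stream_mode("watch", {"from_id": "1"}))   # history-then-live
print(stream_mode("replay", {"from_id": "1"}))  # history-only
```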

For end-to-end working examples of all three operations — including spatial and constraint filtering — see Practical Examples.


CloudEvents

Aviso delivers notifications to watch/replay clients as CloudEvents — a standard envelope format. Each event includes:

  • id — the backend sequence reference (e.g. mars@42), used for targeted delete or resume
  • type — the Aviso event type string
  • source — the server base URL
  • data.identifier — the canonicalized identifier
  • data.payload — the notification payload (or null if omitted)
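
To make the envelope concrete, the sketch below decodes one event and splits its id into stream name and sequence number. The sample event is assembled by hand from the fields listed above; real events may carry additional CloudEvents attributes.

```python
import json

# Hypothetical event built from the documented fields only.
raw = json.dumps({
    "specversion": "1.0",
    "id": "mars@42",
    "type": "aviso.notification",
    "source": "http://127.0.0.1:8000",
    "data": {"identifier": {"region": "north"}, "payload": None},
})

event = json.loads(raw)
stream, _, sequence = event["id"].partition("@")
print(stream, int(sequence))              # mars 42
print(event["data"]["identifier"])        # {'region': 'north'}
print(event["data"]["payload"] is None)   # True
```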

Backend

The backend is the storage and messaging layer that Aviso delegates to. Two backends are supported:

| Backend | Use case |
|---|---|
| jetstream | Production: durable, replicated, persistent history |
| in_memory | Development: fast setup, no persistence |

The backend is selected via notification_backend.kind in config. See Backends Overview.

Installation

This page covers every way to get Aviso Server running: building from source, using Docker, or deploying to Kubernetes with Helm.


Prerequisites

Rust toolchain

Aviso is written in Rust and requires edition 2024, which means Rust 1.85 or newer. The CI always builds against the latest stable toolchain.

Install or update Rust via rustup:

curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh

Verify your version:

rustc --version
# rustc 1.85.0 (... ) or newer

System dependencies

Aviso links against OpenSSL. On Debian/Ubuntu:

sudo apt-get install -y libssl-dev pkg-config build-essential

On Fedora/RHEL:

sudo dnf install -y openssl-devel pkg-config gcc

On macOS (with Homebrew):

brew install openssl pkg-config

Build from Source

Clone the repository:

git clone https://github.com/ecmwf/aviso-server.git
cd aviso-server

Development build

Fast to compile, includes debug symbols:

cargo build

Binary location: target/debug/aviso_server

Release build

Optimized for production use:

cargo build --release

Binary location: target/release/aviso_server

Run directly

cargo run                          # development
cargo run --release                # release
./target/release/aviso_server      # pre-built binary

The server loads ./configuration/config.yaml by default. See Configuration for all config loading options.


Docker

The repository includes a multi-stage Dockerfile that produces a minimal distroless image.

Build the image

# Production image (distroless, minimal attack surface)
docker build --target release -t aviso-server:local .

# Debug image (Debian slim, includes bash for troubleshooting)
docker build --target debug -t aviso-server:debug .

Run with Docker

Mount your config file and expose the port:

docker run --rm \
  -p 8000:8000 \
  -v "$(pwd)/configuration/config.yaml:/app/configuration/config.yaml:ro" \
  aviso-server:local

Or override settings via environment variables (no config mount needed):

docker run --rm \
  -p 8000:8000 \
  -e AVISOSERVER_APPLICATION__HOST=0.0.0.0 \
  -e AVISOSERVER_APPLICATION__PORT=8000 \
  -e AVISOSERVER_NOTIFICATION_BACKEND__KIND=in_memory \
  aviso-server:local
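
The variable names in this command suggest a naming pattern: an `AVISOSERVER_` prefix, with a double underscore separating nesting levels. The sketch below encodes that reading; it is inferred from the three examples above, and `env_to_config_path` is a hypothetical helper. See Configuration for the authoritative rules.

```python
PREFIX = "AVISOSERVER_"


def env_to_config_path(name):
    """Map an override variable name to a dotted config path, or None."""
    if not name.startswith(PREFIX):
        return None
    # A double underscore separates nesting levels; single underscores are
    # part of the key name itself.
    return ".".join(part.lower() for part in name[len(PREFIX):].split("__"))


print(env_to_config_path("AVISOSERVER_APPLICATION__PORT"))
# application.port
print(env_to_config_path("AVISOSERVER_NOTIFICATION_BACKEND__KIND"))
# notification_backend.kind
```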

Build targets summary

| Target | Base image | Size | Use |
|---|---|---|---|
| release | distroless/cc | minimal | Production |
| debug | debian:bookworm-slim | larger | Troubleshooting |

Local JetStream (Docker)

For local development with the JetStream backend, use the provided script to spin up a NATS server with JetStream enabled:

./scripts/init_nats.sh

This script:

  • Generates a NATS config file in ./nats_config/
  • Creates a Docker volume for persistent JetStream storage
  • Starts a nats:2-alpine container on localhost:4222
  • Waits for the server to be ready and prints a connection summary

Requires: Docker

Optional environment variables:

NATS_PORT=4222            # NATS client port (default: 4222)
ENABLE_AUTH=true          # Enable token auth (default: false)
MAX_MEMORY=5GB            # JetStream memory limit (default: 5GB)
MAX_STORAGE=10GB          # JetStream file storage limit (default: 10GB)

Example with auth enabled:

ENABLE_AUTH=true ./scripts/init_nats.sh

The script prints the generated token at the end of its output, for example:

Authentication enabled with token: aviso_secure_token_1712345678

After the script completes, configure Aviso to connect:

Without auth (default):

notification_backend:
  kind: jetstream
  jetstream:
    nats_url: "nats://localhost:4222"

With auth — pass the token printed by the script:

notification_backend:
  kind: jetstream
  jetstream:
    nats_url: "nats://localhost:4222"
    token: "aviso_secure_token_1712345678"

Alternatively, set the token as an environment variable (Aviso reads NATS_TOKEN automatically):

export NATS_TOKEN=aviso_secure_token_1712345678
cargo run

Kubernetes / Helm

For production Kubernetes deployments, use the official Helm chart.

The chart handles:

  • Deployment with configurable replicas
  • ConfigMap-based configuration mounting
  • Service and Ingress setup
  • JetStream connection settings via values

Build Documentation

Aviso docs are built with mdBook.

Install mdBook and the mermaid preprocessor:

cargo install mdbook
cargo install mdbook-mermaid

Serve docs locally with live reload:

mdbook serve docs --open

Build static output to docs/book/:

mdbook build docs

Run Tests

# Unit and integration tests (in-memory backend)
cargo test --workspace

# Include JetStream integration tests (requires running NATS)
AVISO_RUN_NATS_TESTS=1 cargo test --workspace

# Tests must run single-threaded (shared port binding)
cargo test -- --test-threads=1

Getting Started

This guide walks you through running Aviso Server locally and sending your first notification. It assumes you have already completed Installation.

If you haven't read Key Concepts yet, do that first — it will make the commands below much easier to follow.


1. Choose a Backend

Aviso needs a storage backend before it can accept notifications. For local exploration, in-memory requires zero infrastructure.

| Goal | Backend |
|---|---|
| Quick local test, no setup | in_memory |
| Persistent history, realistic behavior | jetstream |

The examples on this page use in_memory. To use JetStream locally, see Local JetStream setup below.


2. Configure the Server

Open configuration/config.yaml and make sure it contains at minimum:

application:
  host: "127.0.0.1"
  port: 8000

notification_backend:
  kind: in_memory
  in_memory:
    max_history_per_topic: 100
    max_topics: 10000

notification_schema:
  my_event:
    topic:
      base: "my_event"
      key_order: ["region", "date"]
    identifier:
      region:
        description: "Geographic region label."
        type: EnumHandler
        values: ["north", "south", "east", "west"]
        required: true
      date:
        type: DateHandler
        required: true
    payload:
      required: false

You can also use environment variables to override any config value without editing the file. See Configuration for the full precedence rules.


3. Start the Server

cargo run

Or with the release binary:

./target/release/aviso_server

You should see structured JSON log output. Once you see a line like:

{"level":"INFO","message":"aviso-server listening","address":"127.0.0.1:8000"}

the server is ready.


4. Check the Health Endpoint

curl -sS http://127.0.0.1:8000/health

Expected response: 200 OK


5. Open a Watch Stream

Before publishing, open a terminal and start watching for events. This is a live SSE stream — keep it open while you proceed to step 6.

curl -N -X POST "http://127.0.0.1:8000/api/v1/watch" \
  -H "Content-Type: application/json" \
  -d '{
    "event_type": "my_event",
    "identifier": {
      "region": "north",
      "date":   "20250706"
    }
  }'

You will see the SSE connection frame immediately:

data: {"type":"connection_established","timestamp":"2026-03-04T10:00:00Z"}

The stream stays open and will print new events as they arrive.
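
If you consume the stream from code rather than curl, each `data:` line has to be decoded. The parser below is a minimal sketch that handles only `data:` fields and blank-line event boundaries. Telling control frames from notifications by the presence of `specversion` is an assumption based on the sample frames on this page.

```python
import json


def parse_sse(lines):
    """Yield one decoded JSON object per SSE event from an iterable of lines."""
    data = []
    for line in lines:
        line = line.rstrip("\r\n")
        if line.startswith("data:"):
            value = line[5:]
            # The SSE format allows one optional space after the colon.
            data.append(value[1:] if value.startswith(" ") else value)
        elif line == "" and data:
            # A blank line terminates the current event.
            yield json.loads("\n".join(data))
            data = []
    if data:
        yield json.loads("\n".join(data))


sample = [
    'data: {"type":"connection_established","timestamp":"2026-03-04T10:00:00Z"}',
    "",
    'data: {"specversion":"1.0","id":"my_event@1","type":"aviso.notification"}',
    "",
]
for frame in parse_sse(sample):
    kind = "notification" if "specversion" in frame else "control"
    print(kind, frame["type"])
```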


6. Publish a Notification

In a second terminal, send a notification:

curl -sS -X POST "http://127.0.0.1:8000/api/v1/notification" \
  -H "Content-Type: application/json" \
  -d '{
    "event_type": "my_event",
    "identifier": {
      "region": "north",
      "date":   "20250706"
    },
    "payload": { "note": "data is ready" }
  }'

Expected response:

{ "id": "my_event@1", "topic": "my_event.north.20250706" }

The id (my_event@1) is the backend sequence reference. You can use it later to replay from that point or delete that specific notification.
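
The same request can be issued from Python with the standard library alone. This sketch mirrors the curl command above; `build_notify_request` and `send` are hypothetical helper names, and `send` is defined but not called so the snippet can be read without a running server.

```python
import json
import urllib.request

BASE_URL = "http://127.0.0.1:8000"  # local server from the steps above


def build_notify_request(event_type, identifier, payload=None):
    """Build (but do not send) the POST request for /api/v1/notification."""
    body = {"event_type": event_type, "identifier": identifier}
    if payload is not None:
        body["payload"] = payload
    return urllib.request.Request(
        f"{BASE_URL}/api/v1/notification",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send(request):
    """Send the request and return the decoded JSON response."""
    with urllib.request.urlopen(request, timeout=10) as response:
        return json.loads(response.read())


request = build_notify_request(
    "my_event", {"region": "north", "date": "20250706"}, {"note": "data is ready"}
)
# With the server running: print(send(request)["id"])
```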

Switch back to the watch terminal — the notification should have arrived:

data: {"specversion":"1.0","id":"my_event@1","type":"aviso.notification",...}

7. Replay History

Once you have published a few notifications, you can replay them from a specific point:

curl -N -X POST "http://127.0.0.1:8000/api/v1/replay" \
  -H "Content-Type: application/json" \
  -d '{
    "event_type": "my_event",
    "identifier": {
      "region": "north",
      "date":   "20250706"
    },
    "from_id": "1"
  }'

The stream will emit all matching historical notifications, then close with:

data: {"type":"connection_closing","reason":"end_of_stream","timestamp":"..."}

Optional: Local JetStream Setup

To test with the JetStream backend (persistent storage, more realistic), see Installation — Local JetStream for the full setup including environment variables and token authentication.


Run the Smoke Test

A Python smoke script covers the full notify → watch → replay cycle. Copy the example config and start the server:

cp configuration/config.yaml.example configuration/config.yaml
cargo run

With auth (default) — start auth-o-tron before running the smoke tests:

python3 -m pip install httpx
./scripts/auth-o-tron-docker.sh
python3 scripts/smoke_test.py

Without auth — set auth.enabled: false in your config (or remove the auth section), then:

AUTH_ENABLED=false python3 scripts/smoke_test.py

AUTH_ENABLED must match the server's auth.enabled setting. When false, auth headers are omitted and auth-specific smoke tests are skipped.

Useful overrides:

BASE_URL="http://127.0.0.1:8000" python3 scripts/smoke_test.py
BACKEND="jetstream"               python3 scripts/smoke_test.py
TIMEOUT_SECONDS=12                python3 scripts/smoke_test.py
SMOKE_VERBOSE=1                   python3 scripts/smoke_test.py
python3 scripts/smoke_test.py --verbose

The smoke script covers:

  • health endpoint
  • replay/watch baseline flows (test_polygon)
  • mars replay with dot-containing identifier values, integer and enum predicates
  • dissemination watch + from_date with dot-containing identifier values
  • read/write auth separation across public, role-restricted, and admin-only streams

Defining Notification Schemas

A notification schema describes the shape of an event stream: what identifier fields are accepted, how they are validated, how the storage topic is constructed, whether a payload is required, and who can read or write.

Schemas are defined under the notification_schema key in your configuration file. Each top-level key becomes an event type that clients reference when calling /api/v1/notification, /api/v1/watch, or /api/v1/replay.

notification_schema:
  my_event:          # ← event type name
    topic: ...
    identifier: ...
    payload: ...
    auth: ...            # optional
    storage_policy: ...  # optional, JetStream only

Topic Configuration

A schema should have a topic block that tells Aviso how to build the NATS subject for storage and routing.

topic:
  base: "weather"
  key_order: ["region", "date"]

| Field | Description |
|---|---|
| base | Root prefix for the subject. Must be unique across all schemas (case-insensitive). |
| key_order | Ordered list of identifier field names appended to the base, separated by a dot (.). |

Given base: "weather" and key_order: ["region", "date"], a request with region=north and date=20250706 produces the subject:

weather.north.20250706

Values containing reserved characters (., *, >, %) are automatically percent-encoded so they do not interfere with NATS subject routing. See Topic Encoding for details.

Only fields listed in key_order contribute to the subject. Other identifier fields are validated but not part of the topic.


Identifier Fields

The identifier map defines the fields that clients can send. Each field specifies a handler type that controls validation and canonicalization.

identifier:
  region:
    type: EnumHandler
    values: ["north", "south", "east", "west"]
    required: true
    description: "Geographic region."
  date:
    type: DateHandler
    required: true

Every field supports these common properties:

| Property | Type | Description |
|---|---|---|
| type | string | Handler type (see below). Required. |
| required | bool | If true, requests missing this field are rejected. Required. |
| description | string | Human-readable text exposed by GET /api/v1/schema. Optional. |

Handler Types

StringHandler

Accepts any non-empty string. No transformation.

class:
  type: StringHandler
  max_length: 2      # optional: reject strings longer than this
  required: true

DateHandler

Parses dates in multiple formats and canonicalizes to a configured output format.

Accepted inputs: YYYY-MM-DD, YYYYMMDD, YYYY-DDD (day-of-year).

date:
  type: DateHandler
  canonical_format: "%Y%m%d"   # output format (default: "%Y%m%d")
  required: false

| canonical_format | Example output |
|---|---|
| "%Y%m%d" | 20250706 |
| "%Y-%m-%d" | 2025-07-06 |

Invalid dates (e.g. February 30) are rejected.
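
The canonicalization can be approximated with Python's `datetime`. This is a sketch of the documented behavior, not Aviso's Rust implementation, and `canonical_date` is a hypothetical name.

```python
import re
from datetime import datetime

# (pattern, strptime format) for each accepted input shape.
_INPUT_FORMATS = [
    (re.compile(r"\d{4}-\d{2}-\d{2}"), "%Y-%m-%d"),
    (re.compile(r"\d{8}"), "%Y%m%d"),
    (re.compile(r"\d{4}-\d{3}"), "%Y-%j"),  # day-of-year
]


def canonical_date(value, canonical_format="%Y%m%d"):
    """Parse an accepted date string and render it in canonical_format."""
    for pattern, fmt in _INPUT_FORMATS:
        if pattern.fullmatch(value):
            # strptime raises ValueError for impossible dates such as Feb 30.
            return datetime.strptime(value, fmt).strftime(canonical_format)
    raise ValueError(f"unrecognized date: {value!r}")


print(canonical_date("2025-07-06"))            # 20250706
print(canonical_date("2025-187"))              # 20250706
print(canonical_date("20250706", "%Y-%m-%d"))  # 2025-07-06
```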

TimeHandler

Parses times and canonicalizes to four-digit HHMM format.

Accepted inputs: 14:30, 1430, 14, 9:05.

time:
  type: TimeHandler
  required: false

Input 14:30 → stored as 1430. Input 9 → stored as 0900.
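
A sketch of the same normalization in Python follows. It accepts only the input shapes listed above; how Aviso treats other shapes (for example three-digit input such as 930) is not stated here, so this version rejects them.

```python
import re

_PATTERNS = [
    re.compile(r"(\d{1,2}):(\d{2})"),  # 14:30, 9:05
    re.compile(r"(\d{2})(\d{2})"),     # 1430
    re.compile(r"(\d{1,2})()"),        # 14, 9 (hour only)
]


def canonical_time(value):
    """Normalize an accepted time string to four-digit HHMM."""
    for pattern in _PATTERNS:
        match = pattern.fullmatch(value)
        if match:
            hour, minute = int(match.group(1)), int(match.group(2) or 0)
            if hour > 23 or minute > 59:
                raise ValueError(f"time out of range: {value!r}")
            return f"{hour:02d}{minute:02d}"
    raise ValueError(f"unrecognized time: {value!r}")


for text in ("14:30", "1430", "14", "9:05", "9"):
    print(text, canonical_time(text))
```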

EnumHandler

Accepts one value from a predefined list. Matching is case-insensitive; stored in lowercase.

domain:
  type: EnumHandler
  values: ["a", "b", "c"]
  required: false

Input "A" → stored as "a". Input "x" → rejected.

IntHandler

Accepts integer strings. Strips leading zeros for canonical storage.

step:
  type: IntHandler
  range: [0, 100000]   # optional: inclusive [min, max] bounds
  required: false

Input "007" → stored as "7". Input "-1" with range: [0, 100] → rejected.
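
The two examples above can be reproduced with a short function. This is an illustrative sketch with a hypothetical name (`canonical_int`); the strict digit pattern is a choice made here to avoid Python's more lenient `int()` parsing.

```python
import re


def canonical_int(value, value_range=None):
    """Validate an integer string and return it without leading zeros."""
    if not re.fullmatch(r"-?\d+", value):
        raise ValueError(f"not an integer: {value!r}")
    number = int(value)
    if value_range is not None:
        low, high = value_range  # inclusive bounds
        if not low <= number <= high:
            raise ValueError(f"{number} is outside [{low}, {high}]")
    return str(number)


print(canonical_int("007"))  # 7
```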

FloatHandler

Accepts floating-point strings. Rejects NaN and Inf.

severity:
  type: FloatHandler
  range: [0.0, 10.0]   # optional: inclusive [min, max] bounds
  required: false

Input "3.14" → stored as "3.14". Input "NaN" → rejected.

ExpverHandler

Experiment version handler. Numeric values are zero-padded to four digits; non-numeric values are lowercased.

expver:
  type: ExpverHandler
  default: "0001"    # optional: used when the field is empty
  required: false

Input "1" → stored as "0001". Input "test" → stored as "test".
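
In Python terms, the rule looks roughly like the sketch below. `canonical_expver` is a hypothetical name, and the handling of an empty value with no default is left open because the text above does not specify it.

```python
def canonical_expver(value, default=None):
    """Zero-pad numeric versions to four digits; lowercase everything else."""
    if value == "" and default is not None:
        value = default
    if value.isascii() and value.isdigit():
        return value.zfill(4)
    # Non-numeric values are lowercased. An empty value without a default
    # passes through unchanged in this sketch.
    return value.lower()


print(canonical_expver("1"))           # 0001
print(canonical_expver("test"))        # test
print(canonical_expver("", "0001"))    # 0001
```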

PolygonHandler

Accepts a closed polygon as a coordinate string. Used for spatial filtering on /watch and /replay.

Format: lat,lon,lat,lon,...,lat,lon — the first and last coordinate pair must be identical to close the polygon. Parentheses are optional.

polygon:
  type: PolygonHandler
  required: true

Constraints: at least 3 coordinate pairs (plus closing repeat), latitude in [-90, 90], longitude in [-180, 180].
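
The format and constraints translate into a small validator. This sketch assumes that optional parentheses simply group each pair, as in `(lat,lon),(lat,lon)`; the exact parenthesized form accepted by Aviso is not shown here, and `parse_polygon` is a hypothetical name.

```python
def parse_polygon(text):
    """Parse 'lat,lon,lat,lon,...' into a validated list of (lat, lon) pairs."""
    cleaned = text.replace("(", "").replace(")", "")
    numbers = [float(part) for part in cleaned.split(",")]
    if len(numbers) % 2:
        raise ValueError("coordinates must come in lat,lon pairs")
    pairs = list(zip(numbers[0::2], numbers[1::2]))
    if len(pairs) < 4:
        raise ValueError("need at least 3 pairs plus the closing repeat")
    if pairs[0] != pairs[-1]:
        raise ValueError("first and last pair must be identical")
    for lat, lon in pairs:
        if not (-90 <= lat <= 90 and -180 <= lon <= 180):
            raise ValueError(f"coordinate out of range: {lat},{lon}")
    return pairs


print(parse_polygon("(50,0),(50,10),(40,10),(50,0)"))
# [(50.0, 0.0), (50.0, 10.0), (40.0, 10.0), (50.0, 0.0)]
```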

See Spatial Filtering for usage examples.

Reserved Query-Time Fields

point (built-in)

The point field is a reserved identifier that clients can send on /watch or /replay to filter notifications whose polygon contains the given point. It accepts a single lat,lon coordinate pair.

point is not a schema-configurable handler — it is always available on any schema that includes a PolygonHandler field. The /notification endpoint rejects requests that include point.
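
For intuition, point containment can be tested with the classic ray-casting method shown below. This is a planar sketch that ignores the antimeridian and spherical geometry; the algorithm Aviso actually uses is not described on this page, and `point_in_polygon` is a hypothetical name.

```python
def point_in_polygon(point, polygon):
    """Ray-casting test: True if (lat, lon) lies inside the closed polygon."""
    lat, lon = point
    inside = False
    # Walk consecutive vertex pairs; the polygon repeats its first vertex last.
    for (lat1, lon1), (lat2, lon2) in zip(polygon, polygon[1:]):
        # Only edges that straddle the point's longitude can be crossed.
        if (lon1 > lon) != (lon2 > lon):
            crossing_lat = lat1 + (lon - lon1) * (lat2 - lat1) / (lon2 - lon1)
            if lat < crossing_lat:
                inside = not inside
    return inside


square = [(0, 0), (0, 10), (10, 10), (10, 0), (0, 0)]
print(point_in_polygon((5, 5), square))   # True
print(point_in_polygon((15, 5), square))  # False
```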

See Spatial Filtering for usage examples.


Payload Configuration

Controls whether requests must include a payload field.

payload:
  required: true

| required | Behavior |
|---|---|
| true | Requests without a payload are rejected (400). |
| false | Payload is optional; missing payloads are stored as JSON null. |

The payload can be any valid JSON value (object, array, string, number, boolean, null). It is stored as-is with no reshaping.

See Payload Contract for full semantics.


Per-Stream Authentication

When global authentication is enabled, individual schemas can require credentials and restrict access by role.

auth:
  required: true
  read_roles:
    localrealm: ["analyst", "consumer"]
  write_roles:
    localrealm: ["producer"]

| Field | Default when omitted | Effect |
|---|---|---|
| required | none | Must be set explicitly to true or false. |
| read_roles | Any authenticated user can read | Maps realm → role list for watch/replay access. |
| write_roles | Only admins can write | Maps realm → role list for notify access. |

Use ["*"] as the role list to grant access to all users from a realm.

Admins (users matching global admin_roles) always have both read and write access.

Omitting the entire auth block makes the stream publicly accessible, even when global auth is enabled.
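
The rules above can be condensed into a small decision function. This sketch models only streams with `required: true` and global auth enabled. The user dictionary shape and the function names are invented for the example, and the Authentication page remains the authoritative reference.

```python
def role_match(role_map, realm, roles):
    """True if any of the user's roles is listed for their realm ('*' = all)."""
    allowed = role_map.get(realm, [])
    return "*" in allowed or any(role in allowed for role in roles)


def can_access(auth, user, action, is_admin=False):
    """Decide access to one stream. action is 'read' or 'write'."""
    if auth is None:
        return True   # no auth block: the stream is public
    if user is None:
        return False  # anonymous caller on a protected stream
    if is_admin:
        return True   # admins always have read and write access
    role_map = auth.get(f"{action}_roles")
    if role_map is None:
        # Documented defaults: any authenticated user reads, only admins write.
        return action == "read"
    return role_match(role_map, user["realm"], user["roles"])


auth = {
    "required": True,
    "read_roles": {"localrealm": ["analyst", "consumer"]},
    "write_roles": {"localrealm": ["producer"]},
}
analyst = {"realm": "localrealm", "roles": ["analyst"]}
print(can_access(auth, analyst, "read"))   # True
print(can_access(auth, analyst, "write"))  # False
```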

See Authentication for the full access-control matrix and role-matching rules.


Storage Policy (JetStream Only)

When using the JetStream backend, you can configure per-stream retention limits.

storage_policy:
  retention_time: "7d"
  max_messages: 500000
  max_size: "2Gi"
  allow_duplicates: false
  compression: true

| Field | Type | Description |
|---|---|---|
| retention_time | duration | Discard messages older than this. Accepts 30m, 1h, 7d, 1w. |
| max_messages | integer | Maximum message count; oldest are discarded when exceeded. |
| max_size | size | Maximum stream size. Accepts 100Mi, 1Gi, etc. |
| allow_duplicates | bool | Allow duplicate message IDs. Default: backend-specific. |
| compression | bool | Enable message-level compression. Default: backend-specific. |

All fields are optional. Omitting storage_policy entirely uses backend defaults.
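
To sanity-check values before deploying, the duration and size strings can be converted to seconds and bytes. This sketch covers only the suffixes shown above and assumes `Mi` and `Gi` are binary multiples (1024-based); Aviso may accept further suffixes, and both function names are hypothetical.

```python
import re

_DURATION_SECONDS = {"m": 60, "h": 3600, "d": 86400, "w": 604800}
_SIZE_BYTES = {"Mi": 1024**2, "Gi": 1024**3}  # assumption: binary units


def parse_duration(text):
    """Convert a duration such as '7d' to seconds."""
    match = re.fullmatch(r"(\d+)([mhdw])", text)
    if not match:
        raise ValueError(f"unrecognized duration: {text!r}")
    return int(match.group(1)) * _DURATION_SECONDS[match.group(2)]


def parse_size(text):
    """Convert a size such as '2Gi' to bytes."""
    match = re.fullmatch(r"(\d+)(Mi|Gi)", text)
    if not match:
        raise ValueError(f"unrecognized size: {text!r}")
    return int(match.group(1)) * _SIZE_BYTES[match.group(2)]


print(parse_duration("7d"))  # 604800
print(parse_size("2Gi"))     # 2147483648
```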

The in-memory backend does not support storage policies.


Complete Example

This example defines a weather alert stream with date/region routing, enum validation, optional payload, and role-restricted access.

notification_schema:
  weather_alert:
    payload:
      required: false

    topic:
      base: "alert"
      key_order: ["region", "severity_level", "date"]

    identifier:
      region:
        description: "Geographic region."
        type: EnumHandler
        values: ["europe", "asia", "africa", "americas", "oceania"]
        required: true
      severity_level:
        description: "Alert severity (1–5)."
        type: IntHandler
        range: [1, 5]
        required: true
      date:
        description: "Alert date."
        type: DateHandler
        canonical_format: "%Y%m%d"
        required: true
      issued_by:
        description: "Issuing authority identifier."
        type: StringHandler
        max_length: 64
        required: false

    auth:
      required: true
      read_roles:
        operations: ["*"]
      write_roles:
        operations: ["forecaster", "admin"]

    storage_policy:
      retention_time: "30d"
      max_messages: 100000

With this schema:

  • Publishing a notification with region=europe, severity_level=3, date=2025-07-06 produces the subject alert.europe.3.20250706.
  • The issued_by field is validated if present but does not appear in the subject (not in key_order).
  • Any authenticated user in the operations realm can watch/replay.
  • Only users with the forecaster or admin role can publish.
  • JetStream retains up to 100,000 messages or 30 days, whichever limit is hit first.

Tips

  • Start simple. Define only topic, one or two identifier fields, and payload. Add auth and storage policy later.
  • Use key_order deliberately. Fields in key_order become part of the NATS subject and affect routing granularity. More fields = more specific topics = more efficient filtering, but also more distinct subjects.
  • Mark routing fields required. If a field is in key_order, consider making it required: true so every notification produces a complete subject.
  • Keep base short and unique. It is the root of every subject in this stream. Avoid collisions with other schemas.
  • Test with GET /api/v1/schema/{event_type}. This endpoint returns the public view of your schema, showing all identifier fields and their validation rules.

Practical Examples

This section provides copy-paste examples for common workflows.

All examples use the same generic event schema so behavior is easy to compare.

Shared Generic Schema

notification_schema:
  extreme_event:
    topic:
      base: extreme_event
      key_order: [region, run_time, severity, anomaly, polygon]
    identifier:
      region:
        description: "Geographic region label."
        type: EnumHandler
        values: ["north", "south", "east", "west"]
        required: true
      run_time:
        type: TimeHandler
        required: true
      severity:
        description: "Severity level from 1 to 7."
        type: IntHandler
        range: [1, 7]
        required: true
      anomaly:
        type: FloatHandler
        range: [0.0, 100.0]
        required: false
      polygon:
        type: PolygonHandler
        required: false
    payload:
      required: false

Shared Assumptions

  • Base URL: http://127.0.0.1:8000
  • Content type: application/json
  • Replay examples use from_id or from_date explicitly.

Basic Notify/Watch/Replay

Uses the shared generic schema from Practical Examples.

This page is the quickest way to understand the normal API flow. You first publish (notify), then observe live updates (watch), then read history (replay). If you are onboarding a new environment, start here before trying filters or admin operations. Read the examples in order.

1) Notify

curl -sS -X POST "http://127.0.0.1:8000/api/v1/notification" \
  -H "Content-Type: application/json" \
  -d '{
    "event_type":"extreme_event",
    "identifier":{
      "region":"north",
      "run_time":"1200",
      "severity":"4",
      "anomaly":"42.5"
    },
    "payload":{"note":"initial forecast"}
  }'

Expected:

  • HTTP 200
  • required identifier keys must be present; optional keys may be omitted

2) Watch (Live Only)

curl -N -X POST "http://127.0.0.1:8000/api/v1/watch" \
  -H "Content-Type: application/json" \
  -d '{
    "event_type":"extreme_event",
    "identifier":{
      "region":"north",
      "run_time":"1200",
      "severity":"4",
      "anomaly":"42.5"
    }
  }'

Expected:

  • HTTP 200
  • SSE starts with connection_established
  • only new matching notifications arrive

3) Replay (Historical)

curl -N -X POST "http://127.0.0.1:8000/api/v1/replay" \
  -H "Content-Type: application/json" \
  -d '{
    "event_type":"extreme_event",
    "identifier":{
      "region":"north",
      "run_time":"1200",
      "severity":"4",
      "anomaly":"42.5"
    },
    "from_id":"1"
  }'

Expected:

  • HTTP 200
  • SSE emits replay_started, replay events, replay_completed, then closes

Constraint Filtering

Uses the shared generic schema from Practical Examples.

Constraint filtering lets subscribers express conditions over identifier fields instead of exact values. In practice, this is how you ask for ranges (severity >= 5), numeric bands, or enum subsets. This page starts with seed data, then shows valid constraint requests, then common failure cases. It is the best reference for building client-side filter payloads.
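
Conceptually, each identifier field in a request is either a scalar (implicit eq) or a constraint object with one operator. The sketch below evaluates the four operators that appear on this page against an already-parsed value. Treating `between` as inclusive and allowing exactly one operator per object are assumptions made for this example, and `matches` is a hypothetical name.

```python
def matches(value, constraint):
    """Check one canonical field value against a scalar or constraint object."""
    if not isinstance(constraint, dict):
        return value == constraint       # a scalar means implicit eq
    (op, operand), = constraint.items()  # assumption: one operator per object
    if op == "eq":
        return value == operand
    if op == "in":
        return value in operand
    if op == "gte":
        return value >= operand
    if op == "between":
        low, high = operand
        return low <= value <= high      # assumption: inclusive bounds
    raise ValueError(f"unsupported operator: {op}")


print(matches(6, {"gte": 5}))                        # True
print(matches(3, {"gte": 5}))                        # False
print(matches(42.5, {"between": [40.0, 50.0]}))      # True
print(matches("south", {"in": ["north", "south"]}))  # True
```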

Seed Notifications

curl -sS -X POST "http://127.0.0.1:8000/api/v1/notification" \
  -H "Content-Type: application/json" \
  -d '{
    "event_type":"extreme_event",
    "identifier":{"region":"north","run_time":"1200","severity":"3","anomaly":42.5},
    "payload":{"note":"seed-a"}
  }'

curl -sS -X POST "http://127.0.0.1:8000/api/v1/notification" \
  -H "Content-Type: application/json" \
  -d '{
    "event_type":"extreme_event",
    "identifier":{"region":"south","run_time":"1200","severity":"6","anomaly":87.2},
    "payload":{"note":"seed-b"}
  }'

Expected:

  • both return HTTP 200

Scalar Value (Implicit eq)

curl -N -X POST "http://127.0.0.1:8000/api/v1/replay" \
  -H "Content-Type: application/json" \
  -d '{
    "event_type":"extreme_event",
    "identifier":{"region":"south","run_time":"1200","severity":6,"anomaly":87.2},
    "from_id":"1"
  }'

Expected:

  • HTTP 200
  • only severity = 6 notifications match

Integer Constraint (gte)

curl -N -X POST "http://127.0.0.1:8000/api/v1/replay" \
  -H "Content-Type: application/json" \
  -d '{
    "event_type":"extreme_event",
    "identifier":{
      "region":{"in":["north","south"]},
      "run_time":"1200",
      "severity":{"gte":5},
      "anomaly":87.2
    },
    "from_id":"1"
  }'

Expected:

  • HTTP 200
  • includes severity=6
  • excludes severity=3

Float Constraint (between)

curl -N -X POST "http://127.0.0.1:8000/api/v1/replay" \
  -H "Content-Type: application/json" \
  -d '{
    "event_type":"extreme_event",
    "identifier":{
      "region":"north",
      "run_time":"1200",
      "severity":"3",
      "anomaly":{"between":[40.0,50.0]}
    },
    "from_id":"1"
  }'

Expected:

  • HTTP 200
  • includes anomaly=42.5

Float eq Is Exact (No Tolerance)

Float eq and in are exact comparisons. This keeps behavior deterministic across replay/live and avoids hidden tolerance windows.

curl -N -X POST "http://127.0.0.1:8000/api/v1/replay" \
  -H "Content-Type: application/json" \
  -d '{
    "event_type":"extreme_event",
    "identifier":{
      "region":"north",
      "run_time":"1200",
      "severity":"3",
      "anomaly":{"eq":42.5}
    },
    "from_id":"1"
  }'

Expected:

  • HTTP 200
  • only notifications with exactly anomaly=42.5 match
  • NaN/inf values are rejected by float validation/constraints

Enum Constraint (in)

curl -N -X POST "http://127.0.0.1:8000/api/v1/watch" \
  -H "Content-Type: application/json" \
  -d '{
    "event_type":"extreme_event",
    "identifier":{
      "region":{"in":["south","west"]},
      "run_time":"1200",
      "severity":"6",
      "anomaly":87.2
    }
  }'

Expected:

  • HTTP 200
  • live notifications pass only for regions in ["south","west"]

Invalid: Two Operators in One Constraint Object

curl -sS -X POST "http://127.0.0.1:8000/api/v1/replay" \
  -H "Content-Type: application/json" \
  -d '{
    "event_type":"extreme_event",
    "identifier":{
      "region":"north",
      "run_time":"1200",
      "severity":{"gte":4,"lt":7},
      "anomaly":42.5
    },
    "from_id":"1"
  }'

Expected:

  • HTTP 400
  • message says constraint object must contain exactly one operator

Invalid: Constraint Object on /notification

curl -sS -X POST "http://127.0.0.1:8000/api/v1/notification" \
  -H "Content-Type: application/json" \
  -d '{
    "event_type":"extreme_event",
    "identifier":{
      "region":"north",
      "run_time":"1200",
      "severity":{"gte":4},
      "anomaly":42.5
    },
    "payload":{"note":"should-fail"}
  }'

Expected:

  • HTTP 400

Spatial Filtering

Uses the shared generic schema from Practical Examples.

Spatial filtering has two modes:

  • identifier.polygon: keep notifications whose polygon intersects the request polygon.
  • identifier.point: keep notifications whose polygon contains the request point.

This matters when many notifications share similar non-spatial identifiers and you need geographic precision.

Seed Notifications

These two notifications differ only by polygon shape.

curl -sS -X POST "http://127.0.0.1:8000/api/v1/notification" \
  -H "Content-Type: application/json" \
  -d '{
    "event_type":"extreme_event",
    "identifier":{
      "region":"north",
      "run_time":"1200",
      "severity":"4",
      "polygon":"(52.5,13.4,52.6,13.5,52.5,13.6,52.4,13.5,52.5,13.4)"
    },
    "payload":{"note":"poly-a"}
  }'

curl -sS -X POST "http://127.0.0.1:8000/api/v1/notification" \
  -H "Content-Type: application/json" \
  -d '{
    "event_type":"extreme_event",
    "identifier":{
      "region":"north",
      "run_time":"1200",
      "severity":"4",
      "polygon":"(10.0,10.0,10.2,10.0,10.2,10.2,10.0,10.2,10.0,10.0)"
    },
    "payload":{"note":"poly-b"}
  }'

Expected:

  • both requests return HTTP 200

Replay with Polygon Intersection Filter

This request polygon intersects poly-a but not poly-b.

curl -N -X POST "http://127.0.0.1:8000/api/v1/replay" \
  -H "Content-Type: application/json" \
  -d '{
    "event_type":"extreme_event",
    "identifier":{
      "region":"north",
      "run_time":"1200",
      "severity":"4",
      "polygon":"(52.52,13.45,52.62,13.55,52.52,13.65,52.42,13.55,52.52,13.45)"
    },
    "from_id":"1"
  }'

Expected:

  • HTTP 200
  • replay includes poly-a
  • replay excludes poly-b

Replay with Point Containment Filter

The point below is inside poly-a and outside poly-b.

curl -N -X POST "http://127.0.0.1:8000/api/v1/replay" \
  -H "Content-Type: application/json" \
  -d '{
    "event_type":"extreme_event",
    "identifier":{
      "region":"north",
      "run_time":"1200",
      "severity":"4",
      "point":"52.55,13.50"
    },
    "from_id":"1"
  }'

Expected:

  • HTTP 200
  • replay includes poly-a
  • replay excludes poly-b
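
The containment claim can be checked offline with a planar ray-casting test. This is only a sketch: it treats the coordinates as flat 2D pairs, ignores points that lie exactly on an edge, and makes no claim about the geometry library or rules the server uses. The helper names are hypothetical.

```python
def parse_polygon(text):
    """Parse "(a1,b1,a2,b2,...)" into a list of (a, b) coordinate pairs."""
    nums = [float(x) for x in text.strip("()").split(",")]
    if len(nums) % 2 != 0:
        raise ValueError("polygon needs an even number of coordinates")
    return list(zip(nums[0::2], nums[1::2]))


def contains(polygon, point):
    """Ray-casting test; points exactly on an edge are not handled specially."""
    x, y = point
    inside = False
    for (x1, y1), (x2, y2) in zip(polygon, polygon[1:] + polygon[:1]):
        # Count edges that cross the ray running from the point towards +x.
        if (y1 > y) != (y2 > y):
            x_cross = x1 + (y - y1) * (x2 - x1) / (y2 - y1)
            if x < x_cross:
                inside = not inside
    return inside


poly_a = parse_polygon("(52.5,13.4,52.6,13.5,52.5,13.6,52.4,13.5,52.5,13.4)")
poly_b = parse_polygon("(10.0,10.0,10.2,10.0,10.2,10.2,10.0,10.2,10.0,10.0)")
point = tuple(float(v) for v in "52.55,13.50".split(","))

print(contains(poly_a, point), contains(poly_b, point))
```

With the seed polygons from this page, the point falls inside poly-a and outside poly-b, which matches the expected replay result.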

Replay Without Spatial Filter

No polygon and no point means no spatial narrowing.

curl -N -X POST "http://127.0.0.1:8000/api/v1/replay" \
  -H "Content-Type: application/json" \
  -d '{
    "event_type":"extreme_event",
    "identifier":{
      "region":"north",
      "run_time":"1200",
      "severity":"4"
    },
    "from_id":"1"
  }'

Expected:

  • HTTP 200
  • replay may include both poly-a and poly-b because only non-spatial fields are applied

Invalid: polygon and point Together

curl -sS -X POST "http://127.0.0.1:8000/api/v1/replay" \
  -H "Content-Type: application/json" \
  -d '{
    "event_type":"extreme_event",
    "identifier":{
      "region":"north",
      "run_time":"1200",
      "severity":"4",
      "polygon":"(52.5,13.4,52.6,13.5,52.5,13.6,52.4,13.5,52.5,13.4)",
      "point":"52.55,13.50"
    },
    "from_id":"1"
  }'

Expected:

  • HTTP 400
  • validation error says both spatial filters cannot be used together

Replay Starting Points

Uses the shared generic schema from Practical Examples.

Replay start parameters control where historical delivery begins. Choose from_id when you track sequence progress; choose from_date when you track wall-clock time. These examples cover valid forms and the common invalid combinations that return 400. Use this page to validate client retry and resume logic.
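
Because a replay request needs exactly one start parameter, a client can enforce that rule before sending anything. A minimal sketch, assuming a hypothetical helper name `build_replay_body`; the field names are the ones used in the requests on this page.

```python
import json


def build_replay_body(event_type, identifier, from_id=None, from_date=None):
    """Build a /api/v1/replay body with exactly one start parameter."""
    if (from_id is None) == (from_date is None):
        raise ValueError("provide exactly one of from_id or from_date")
    body = {"event_type": event_type, "identifier": identifier}
    if from_id is not None:
        body["from_id"] = str(from_id)  # the examples send it as a string
    else:
        body["from_date"] = str(from_date)
    return json.dumps(body)
```

Rejecting the invalid combinations locally avoids a round trip that would end in HTTP 400.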

Replay from Sequence (from_id)

curl -N -X POST "http://127.0.0.1:8000/api/v1/replay" \
  -H "Content-Type: application/json" \
  -d '{
    "event_type":"extreme_event",
    "identifier":{"region":"north","run_time":"1200","severity":"4","anomaly":"42.5"},
    "from_id":"10"
  }'

Expected:

  • HTTP 200
  • replay starts from sequence 10 (inclusive)

Replay from Time (from_date) RFC3339

curl -N -X POST "http://127.0.0.1:8000/api/v1/replay" \
  -H "Content-Type: application/json" \
  -d '{
    "event_type":"extreme_event",
    "identifier":{"region":"north","run_time":"1200","severity":"4","anomaly":"42.5"},
    "from_date":"2026-03-01T12:00:00Z"
  }'

Expected:

  • HTTP 200
  • replay starts from that UTC timestamp (inclusive)

Replay from Time (from_date) Unix Seconds

curl -N -X POST "http://127.0.0.1:8000/api/v1/replay" \
  -H "Content-Type: application/json" \
  -d '{
    "event_type":"extreme_event",
    "identifier":{"region":"north","run_time":"1200","severity":"4","anomaly":"42.5"},
    "from_date":"1740509903"
  }'

Expected:

  • HTTP 200

Replay from Time (from_date) Unix Milliseconds

curl -N -X POST "http://127.0.0.1:8000/api/v1/replay" \
  -H "Content-Type: application/json" \
  -d '{
    "event_type":"extreme_event",
    "identifier":{"region":"north","run_time":"1200","severity":"4","anomaly":"42.5"},
    "from_date":"1740509903710"
  }'

Expected:

  • HTTP 200

Invalid Replay Start Combinations

Missing Both from_id and from_date

curl -sS -X POST "http://127.0.0.1:8000/api/v1/replay" \
  -H "Content-Type: application/json" \
  -d '{
    "event_type":"extreme_event",
    "identifier":{"region":"north","run_time":"1200","severity":"4","anomaly":"42.5"}
  }'

Expected:

  • HTTP 400

Both from_id and from_date Provided

curl -sS -X POST "http://127.0.0.1:8000/api/v1/watch" \
  -H "Content-Type: application/json" \
  -d '{
    "event_type":"extreme_event",
    "identifier":{"region":"north","run_time":"1200","severity":"4","anomaly":"42.5"},
    "from_id":"5",
    "from_date":"2026-03-01T12:00:00Z"
  }'

Expected:

  • HTTP 400

Admin Operations (Practical)

See full reference in Admin Operations.

These operations are for cleanup and recovery, not normal data flow. Use delete when one bad record must be removed; use wipe when resetting a stream or environment. Because these endpoints are destructive, validate IDs and stream names carefully before execution.

Delete One Notification by ID

curl -X DELETE "http://127.0.0.1:8000/api/v1/admin/notification/extreme_event@42"

Expected:

  • 200 if it exists
  • 404 if stream/sequence does not exist
  • 400 for invalid format
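
Since deletion is destructive, it is worth checking the ID locally before calling the endpoint. The sketch below assumes the ID has the shape `<event_type>@<sequence>`, which is inferred from the example above; the server's actual validation rules may be stricter.

```python
def parse_notification_id(notification_id):
    """Split "<event_type>@<sequence>" into (event_type, sequence)."""
    event_type, sep, seq = notification_id.rpartition("@")
    if not sep or not event_type or not (seq.isascii() and seq.isdigit()):
        raise ValueError(f"invalid notification ID: {notification_id!r}")
    return event_type, int(seq)
```

A call such as `parse_notification_id("extreme_event@42")` returns the stream part and the numeric sequence, while a value without `@` or with a non-numeric sequence raises `ValueError`.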

Wipe One Stream

curl -X DELETE "http://127.0.0.1:8000/api/v1/admin/wipe/stream" \
  -H "Content-Type: application/json" \
  -d '{"stream_name":"EXTREME_EVENT"}'

Expected:

  • 200
  • stream definition remains, messages are removed

Wipe All Streams

curl -X DELETE "http://127.0.0.1:8000/api/v1/admin/wipe/all"

Expected:

  • 200
  • all stream data is removed

Backends Overview

Aviso abstracts all storage and messaging behind a NotificationBackend trait. Two implementations ship out of the box.


Which Backend Should I Use?

flowchart TD
    A["What do you need?"] --> B{"Persistent history<br/>across restarts?"}
    B -->|Yes| C[jetstream]
    B -->|No| D{"Multiple replicas<br/>or pods?"}
    D -->|Yes| C
    D -->|No| E{"Production<br/>workload?"}
    E -->|Yes| C
    E -->|No| F[in_memory]

    C:::jet
    F:::mem

    classDef jet fill:#1a6b3a,color:#fff,stroke:#0d4a27
    classDef mem fill:#1a4d6b,color:#fff,stroke:#0d3347

| Requirement | Recommended backend |
|---|---|
| Persistent history across restarts | jetstream |
| Replay endpoint support | jetstream (or in_memory for local/node-local use) |
| Live watch streaming support | jetstream (or in_memory for local/node-local use) |
| Multi-replica deployment | jetstream |
| Quick local experimentation with minimal setup | in_memory |

Capability Comparison

| Capability | JetStream | In-Memory |
|---|---|---|
| Durable storage | Yes | No (data lost on restart) |
| Replay support | Yes | Yes (node-local only) |
| Live watch support | Yes | Yes (node-local fan-out) |
| Multi-replica / HA | Yes (clustered NATS) | No |
| Per-schema storage policy | Yes | No (rejected at startup) |
| Cross-instance consistency | Yes | No |

Backend Details

InMemory Backend

Intended use

The in_memory backend is best for:

  • local development,
  • schema/validation testing,
  • lightweight experimentation where persistence is not required.

Behavior

  • Data is process-memory only and is lost on restart.
  • Topic/message limits are enforced with eviction.
  • No shared state across replicas or pods.
  • Supports live watch subscriptions (live-only delivery).
  • Supports replay batch retrieval for from_id and from_date.
  • Uses in-process fanout only, so subscriptions/replay are node-local.

Configuration

notification_backend.kind: in_memory

Available knobs:

  • max_history_per_topic (default 1)
  • max_topics (default 10000)
  • enable_metrics (default false)

Per-schema storage_policy fields are currently not supported on in_memory and are rejected at startup.
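
The effect of the two retention knobs can be pictured with a toy model. This sketch is not Aviso's implementation; it only illustrates bounded per-topic history combined with LRU-style topic eviction, using the documented defaults as parameter defaults. The class name is hypothetical.

```python
from collections import OrderedDict, deque


class BoundedTopicStore:
    """Toy model of per-topic history with LRU-style topic eviction."""

    def __init__(self, max_history_per_topic=1, max_topics=10000):
        self.max_history = max_history_per_topic
        self.max_topics = max_topics
        self.topics = OrderedDict()  # topic -> deque of messages

    def publish(self, topic, message):
        if topic not in self.topics and len(self.topics) >= self.max_topics:
            self.topics.popitem(last=False)  # evict the least recently used topic
        history = self.topics.setdefault(topic, deque(maxlen=self.max_history))
        history.append(message)  # the deque drops its oldest message when full
        self.topics.move_to_end(topic)

    def history(self, topic):
        return list(self.topics.get(topic, ()))
```

With the default `max_history_per_topic` of 1, a replay can return at most the latest message for each topic, which is worth keeping in mind when testing replay locally.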

Production suitability

Not recommended for production because:

  • no durability,
  • no HA replication,
  • no cross-instance consistency,
  • replay/watch history is limited to local in-memory retention.

JetStream Backend

The jetstream backend is the production-oriented storage implementation. It connects to a NATS server with JetStream enabled and uses it for durable message storage, replay, and live streaming.


Intended Use

Use jetstream when you need:

  • durable storage that survives server restarts
  • replay across multiple server instances
  • live streaming with cluster-wide fan-out
  • configurable retention, size limits, and compression

Local Test Setup

Start a NATS + JetStream instance via Docker:

./scripts/init_nats.sh

Then configure Aviso:

notification_backend:
  kind: jetstream
  jetstream:
    nats_url: "nats://localhost:4222"

For full setup options including authentication and storage limits, see Installation — Local JetStream.


Core Behavior

  • Connects to the configured NATS server on startup (with retry).
  • Creates JetStream streams on demand, one per topic base (e.g. MARS, DISS, POLYGON).
  • Publishes notifications directly to JetStream subjects using the encoded wire format.
  • Uses pull consumers for replay batching (from_id, from_date).
  • Uses push consumers for live watch subscriptions.
  • Reconciles existing streams against current config when they are first accessed.

Configuration Reference

All fields live under notification_backend.jetstream.

Connection & startup

| Field | Default | Notes |
|---|---|---|
| nats_url | nats://localhost:4222 | NATS server URL. |
| token | None | Token auth; falls back to NATS_TOKEN environment variable. |
| timeout_seconds | 30 | Per-attempt connection timeout (> 0). |
| retry_attempts | 3 | Startup connection attempts before backend init fails (> 0). |

Runtime reconnect

| Field | Default | Notes |
|---|---|---|
| enable_auto_reconnect | true | Enables/disables NATS client reconnect after startup. |
| max_reconnect_attempts | 5 | 0 means unlimited reconnect retries. |
| reconnect_delay_ms | 2000 | Delay between reconnect attempts and startup connect retries (> 0). |

Publish resilience

| Field | Default | Notes |
|---|---|---|
| publish_retry_attempts | 5 | Retries for transient channel closed publish failures (> 0). |
| publish_retry_base_delay_ms | 150 | Base backoff in ms for publish retries; grows exponentially per attempt (> 0). |
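
To get a feel for how long a publish can be retried with these settings, the backoff schedule can be tabulated. The sketch assumes that the delay doubles on each attempt, with no jitter and no upper cap; the documentation only states that the delay grows exponentially, so the exact growth factor is an assumption.

```python
def publish_retry_delays_ms(attempts=5, base_delay_ms=150, factor=2):
    """Delay before each retry, assuming the delay is multiplied by factor each time."""
    return [base_delay_ms * factor ** i for i in range(attempts)]


delays = publish_retry_delays_ms()
print(delays, sum(delays))
```

Under these assumptions the defaults give delays of 150, 300, 600, 1200, and 2400 ms, so a publish that keeps failing would be retried for a little under five seconds in total.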

Stream defaults

These apply to every stream created by Aviso unless overridden by a per-schema storage_policy.

| Field | Default | Notes |
|---|---|---|
| max_messages | None | Stream message cap (maps to max_messages). |
| max_bytes | None | Stream size cap in bytes (maps to max_bytes). |
| retention_time | None | Default max age: duration literal (s, m, h, d, w; e.g. 30d). |
| storage_type | file | file or memory; parsed as typed enum at config load. |
| replicas | None | Stream replica count. |
| retention_policy | limits | limits, interest, or workqueue; parsed as typed enum. |
| discard_policy | old | old or new; parsed as typed enum. |

Fail-fast validation: storage_type, retention_policy, and discard_policy are parsed as typed enums during configuration loading. Invalid values fail startup immediately, before any streams are created.

Full example

notification_backend:
  kind: jetstream
  jetstream:
    nats_url: "nats://localhost:4222"
    timeout_seconds: 30
    retry_attempts: 3
    enable_auto_reconnect: true
    max_reconnect_attempts: 5
    reconnect_delay_ms: 2000
    publish_retry_attempts: 5
    publish_retry_base_delay_ms: 150
    storage_type: file
    retention_policy: limits
    discard_policy: old

Stream Management

Stream creation

On first access (e.g. first publish for a given event type), Aviso creates a JetStream stream with the following settings applied:

  • storage_type, retention_policy, discard_policy
  • max_messages, max_bytes, retention_time (mapped to max_age)
  • replicas

The stream subject binding is set to <base>.> (e.g. mars.>) to capture all topics under that base.

Reconciliation of existing streams

When a stream already exists and is accessed by Aviso, it is reconciled — the current stream config is compared against the desired config and mutable fields are updated if drift is detected:

  • limits (retention, size, message count)
  • compression
  • duplicate window
  • replicas
  • subject binding

If JetStream rejects an update (e.g. the field is not editable in the current server/stream state), Aviso logs a warning and continues with the existing stream configuration.

Precedence

Backend-level defaults are applied first, then per-schema storage_policy overrides for that stream:

notification_backend.jetstream.* (base defaults)
    ↓ overridden by
notification_schema.<event_type>.storage_policy.*
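
The precedence rule amounts to overlaying one mapping on another. The sketch below is illustrative only: it uses field names that exist at both levels (`retention_time`, `max_messages`) and ignores the differences between backend and schema field names (for example `max_bytes` versus `max_size`) as well as literal parsing. The function name is hypothetical.

```python
def effective_policy(backend_defaults, storage_policy):
    """Overlay per-schema storage_policy values on backend-level defaults."""
    merged = {k: v for k, v in backend_defaults.items() if v is not None}
    merged.update({k: v for k, v in storage_policy.items() if v is not None})
    return merged


defaults = {"retention_time": "30d", "max_messages": None, "storage_type": "file"}
policy = effective_policy(defaults, {"retention_time": "7d"})
print(policy)
```

Here the schema-level `retention_time` replaces the backend default, while `storage_type` is inherited unchanged.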

Applying config changes to existing streams

Changes to stream-affecting settings (e.g. compression, retention, limits) in config.yaml are applied to existing streams automatically during reconciliation when the stream is next accessed.

To force historical data to be physically rewritten with new settings (e.g. re-pack with compression):

  1. Stop all Aviso writers for the target stream.
  2. Delete the stream in NATS.
  3. Restart Aviso (or publish again) — the stream is recreated with current config.

# List streams
nats stream ls

# Delete a stream (example: DISS)
nats stream rm DISS

wipe_stream (admin endpoint) removes messages but preserves stream configuration. Use stream deletion only when you need historical data physically rewritten.


Verifying Effective Stream Policy

Use the nats CLI to inspect the stream config after a publish or reconcile:

# Replace POLYGON with your stream name (MARS, DISS, etc.)
nats --server nats://localhost:4222 stream info POLYGON

Fields to check:

| CLI field | Config field |
|---|---|
| Max Age | retention_time |
| Max Messages | max_messages |
| Max Bytes | max_bytes / per-schema max_size |
| Max Messages Per Subject | allow_duplicates: 1 = disabled, -1 = enabled |
| Compression | compression: None or S2 |

Replay Behavior

  • Sequence replay (from_id): starts from that sequence number, inclusive.
  • Time replay (from_date): uses JetStream start-time delivery policy.
  • The API enforces mutual exclusivity — from_id and from_date cannot both be present.

Smoke Test (JetStream Mode)

python3 -m pip install httpx

BACKEND=jetstream \
NATS_URL=nats://localhost:4222 \
JETSTREAM_POLICY_STREAM_NAME=POLYGON \
EXPECT_MAX_MESSAGES=500000 \
EXPECT_MAX_BYTES=2147483648 \
EXPECT_MAX_MESSAGES_PER_SUBJECT=1 \
EXPECT_COMPRESSION=None \
python3 scripts/smoke_test.py

Operational Caveats

  • Startup connectivity is controlled by timeout_seconds + retry_attempts.
  • Runtime reconnect is controlled by enable_auto_reconnect, max_reconnect_attempts, reconnect_delay_ms.
  • Publish retry is a narrow resilience path for transient channel closed failures; non-transient failures fail fast.
  • retry_attempts applies only to startup; post-startup reconnect uses the reconnect settings.
  • Setting max_reconnect_attempts = 0 enables unlimited reconnect retries.

Deployment Modes

Local experimentation

Recommended backend:

  • in_memory for quick local request/validation testing.

Characteristics:

  • No persistence: data is lost on process restart.
  • Single-process state only.
  • Not suitable for horizontal scaling or replica failover.
  • Supports replay and watch in-process, limited by local memory retention.
  • For local JetStream testing, see Installation — Local JetStream.
  • For a quick end-to-end behavior check, see Getting Started — Run the Smoke Test.

Production-like / persistent mode

Recommended backend:

  • jetstream

Characteristics:

  • Durable message storage.
  • Retention and size limits.
  • Replica support (requires clustered NATS setup).
  • Supports replay and live streaming workflows.

Recommended packaging/deployment for Kubernetes:

Selection guideline

  • Need persistence/replay/streaming robustness: use jetstream.
  • Need fastest setup for local functional checks only: use in_memory (node-local replay/watch).

Configuration

Key Behaviors to Know First

Before diving into field-level settings, these rules affect how everything behaves at runtime:

  • Environment variables always win — they override any YAML value regardless of which file it came from.
  • Replay and watch behavior is controlled by request parameters, not static config switches.
  • Invalid policy values fail startup immediately: storage_type, retention_policy, and discard_policy are parsed as typed enums, so bad values are caught before any streams are created.
  • Per-schema storage_policy is validated at startup against the selected backend's capabilities. Unsupported fields (e.g. retention_time on in_memory) cause a startup failure with a clear error.
  • JetStream stream changes are reconciled on access — updating compression, retention, or limits in config takes effect when that stream is next accessed. Recreate the stream only if you need historical data physically rewritten.
  • /api/v1/schema responses are client-focused — internal storage_policy settings are not exposed.

Loading Precedence

Configuration is loaded in this order (later sources override earlier ones):

  1. ./configuration/config.yaml
  2. /etc/aviso_server/config.yaml
  3. $HOME/.aviso_server/config.yaml
  4. Environment variables (highest precedence)

If AVISOSERVER_CONFIG_FILE is set, only that single file is loaded (steps 1–3 are skipped). Environment variables still override values from the file.

Environment variable format

Prefix: AVISOSERVER_
Nested separator: __

AVISOSERVER_APPLICATION__HOST=0.0.0.0
AVISOSERVER_APPLICATION__PORT=8000
AVISOSERVER_NOTIFICATION_BACKEND__KIND=jetstream
AVISOSERVER_NOTIFICATION_BACKEND__JETSTREAM__NATS_URL=nats://localhost:4222
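
The naming rule can be made concrete by mapping variable names back to nested config keys. This is a sketch of the convention only, not the loader Aviso uses; values are kept as strings and no type conversion is attempted. The function name is hypothetical.

```python
def env_to_config(environ, prefix="AVISOSERVER_", separator="__"):
    """Turn prefixed environment variables into a nested dict of string values."""
    config = {}
    for name, value in environ.items():
        if not name.startswith(prefix):
            continue
        # Lower-case the key path only; the value is left untouched.
        *parents, leaf = name[len(prefix):].lower().split(separator)
        node = config
        for key in parents:
            node = node.setdefault(key, {})
        node[leaf] = value
    return config


example_env = {
    "AVISOSERVER_APPLICATION__PORT": "8000",
    "AVISOSERVER_NOTIFICATION_BACKEND__JETSTREAM__NATS_URL": "nats://localhost:4222",
    "PATH": "/usr/bin",
}
config = env_to_config(example_env)
```

Each double underscore descends one level, so the second variable ends up under `notification_backend.jetstream.nats_url`.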

Config File Structure

The top-level sections are:

| Section | Purpose |
|---|---|
| application | Server host, port, static files path |
| logging | Log level and format |
| auth | Authentication mode, secrets, admin roles |
| notification_backend | Backend selection and backend-specific settings |
| notification_schema | Per-event-type validation, topic ordering, storage policy |
| metrics | Optional Prometheus metrics server |
| watch_endpoint | SSE heartbeat, connection limits, replay batch settings |

notification_backend.kind selects the storage implementation:

  • jetstream — production backend (NATS JetStream)
  • in_memory — development backend (process-local, no persistence)

Backend Details


For full field-level documentation of every config option, see Configuration Reference.

Configuration Reference

This page documents runtime-relevant configuration fields and defaults.

Topic Wire Format

  • Topic wire subjects always use . as separator.
  • Per-schema topic.separator is no longer used.
  • Token values are percent-encoded for reserved chars (., *, >, %) before writing to backend subjects.

See Topic Encoding for rules and examples.
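
As an illustration of the encoding step, the sketch below percent-encodes the four reserved characters in each token and joins the result with the `.` separator. The two-digit uppercase hex form (for example `%2E`) is an assumption; the Topic Encoding page remains the authoritative description. Both function names are hypothetical.

```python
RESERVED = ".*>%"


def encode_token(value):
    """Percent-encode the reserved characters in one topic token."""
    return "".join(f"%{ord(c):02X}" if c in RESERVED else c for c in value)


def build_subject(base, tokens):
    """Join the base and the encoded tokens with the '.' wire separator."""
    return ".".join([base] + [encode_token(t) for t in tokens])
```

Encoding `%` itself keeps the mapping reversible, and encoding `.`, `*`, and `>` prevents token values from being read as separators or wildcards in backend subjects.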

application

| Field | Type | Default | Notes |
|---|---|---|---|
| host | string | none | Bind address. |
| port | u16 | none | Bind port. |
| base_url | string | http://localhost | Used in generated CloudEvent source links. |
| static_files_path | string | /app/static | Static asset root for homepage assets. |

logging

| Field | Type | Default | Notes |
|---|---|---|---|
| level | string | implementation default | Example: info, debug, warn, error. |
| format | string | implementation default | Kept for compatibility; output is OTel-aligned JSON. |

auth

Authentication is optional and disabled by default. With authentication disabled, all API endpoints are publicly accessible, and schemas must not define stream auth rules: startup fails if global auth is disabled while a schema sets auth.required=true or a non-empty auth.read_roles/auth.write_roles.

When enabled:

  • Admin endpoints always require a valid JWT and an admin role.
  • Stream endpoints (notify, watch, replay) enforce authentication only when the target schema has auth.required: true.
  • Schema endpoints (/api/v1/schema) are always public.
  • In trusted_proxy mode, Aviso validates Authorization: Bearer <jwt> locally with jwt_secret.

| Field | Type | Default | Notes |
|---|---|---|---|
| enabled | bool | false | Set to true to enable authentication. |
| mode | "direct" or "trusted_proxy" | "direct" | direct: forward credentials to auth-o-tron. trusted_proxy: validate forwarded JWT locally. |
| auth_o_tron_url | string | "" | auth-o-tron base URL. Required when enabled=true and mode=direct. |
| jwt_secret | string | "" | Shared HMAC secret for JWT validation. Required when enabled=true. Not exposed via /api/v1/schema endpoints and redacted when auth settings are serialized or logged. |
| admin_roles | map<string, string[]> | {} | Realm-scoped roles for admin endpoints (/api/v1/admin/*). Must contain at least one realm with non-empty roles when enabled=true. |
| timeout_ms | u64 | 5000 | Timeout for auth-o-tron requests (milliseconds). Must be > 0. |

Per-stream auth (notification_schema.<event_type>.auth)

| Field | Type | Default | Notes |
|---|---|---|---|
| required | bool | | Must be explicitly set whenever an auth block is present. When true, the stream requires authentication. |
| read_roles | map<string, string[]> | | Realm-scoped roles for read access (watch/replay). When omitted, any authenticated user can read. Use ["*"] as the role list to grant realm-wide access. |
| write_roles | map<string, string[]> | | Realm-scoped roles for write access (notify). When omitted, only users matching global admin_roles can write. Use ["*"] as the role list to grant realm-wide access. |

See Authentication for detailed setup, client usage, and error responses.

metrics

Optional Prometheus metrics endpoint. When enabled, a separate HTTP server serves /metrics on an internal port for scraping by Prometheus/ServiceMonitor. This keeps metrics isolated from the public API.

| Field | Type | Default | Notes |
|---|---|---|---|
| enabled | bool | false | Enable the metrics endpoint. |
| host | string | "127.0.0.1" | Bind address for the metrics server. Defaults to loopback to avoid public exposure. |
| port | u16 | none | Required when enabled=true. Must differ from application.port. |

Exposed metrics:

| Metric | Type | Labels | Description |
|---|---|---|---|
| aviso_notifications_total | counter | event_type, status | Total notification requests. |
| aviso_sse_connections_active | gauge | endpoint, event_type | Currently active SSE connections. |
| aviso_sse_connections_total | counter | endpoint, event_type | Total SSE connections opened. |
| aviso_sse_unique_users_active | gauge | endpoint | Distinct users with active SSE connections. |
| aviso_auth_requests_total | counter | mode, outcome | Authentication attempts. |

Process-level metrics (CPU, memory, open FDs) are automatically collected on Linux.

notification_backend

| Field | Type | Default | Notes |
|---|---|---|---|
| kind | string | none | jetstream or in_memory. |
| in_memory | object | optional | Used when kind = in_memory. |
| jetstream | object | optional | Used when kind = jetstream. |

notification_backend.in_memory

| Field | Type | Default | Notes |
|---|---|---|---|
| max_history_per_topic | usize | 1 | Retained messages per topic in memory. |
| max_topics | usize | 10000 | Max tracked topics before LRU-style eviction. |
| enable_metrics | bool | false | Enables extra internal metrics logs. |

See InMemory Backend for operational caveats.

notification_backend.jetstream

| Field | Type | Default | Runtime usage summary |
|---|---|---|---|
| nats_url | string | nats://localhost:4222 | NATS connection URL. |
| token | string? | None | Token auth; NATS_TOKEN env fallback. |
| timeout_seconds | u64? | 30 | NATS connection timeout for each startup connect attempt (> 0). |
| retry_attempts | u32? | 3 | Startup connect attempts before backend init fails (> 0). |
| max_messages | i64? | None | Stream message cap. |
| max_bytes | i64? | None | Stream size cap in bytes. |
| retention_time | string? | None | Default stream max age (s, m, h, d, w; for example 30d). |
| storage_type | string? | file | file or memory (parsed as typed enum at config load). |
| replicas | usize? | None | Stream replicas. |
| retention_policy | string? | limits | limits/interest/workqueue (parsed as typed enum at config load). |
| discard_policy | string? | old | old/new (parsed as typed enum at config load). |
| enable_auto_reconnect | bool? | true | Enables/disables NATS client reconnect behavior. |
| max_reconnect_attempts | u32? | 5 | Mapped to NATS max_reconnects (0 => unlimited). |
| reconnect_delay_ms | u64? | 2000 | Reconnect delay and startup connect retry backoff (> 0). |
| publish_retry_attempts | u32? | 5 | Retry attempts for transient publish channel closed failures (> 0). |
| publish_retry_base_delay_ms | u64? | 150 | Base backoff in milliseconds for publish retries (> 0). |

See JetStream Backend for detailed behavior.

notification_schema.<event_type>.payload

Schema-level payload contract for notify requests.

| Field | Type | Example | Notes |
|---|---|---|---|
| required | bool | true | When true, /notification rejects requests without payload. |

Behavior details and edge cases are documented in Payload Contract.

notification_schema.<event_type>.storage_policy

Optional per-schema storage settings validated at startup against selected backend capabilities.

| Field | Type | Example | Notes |
|---|---|---|---|
| retention_time | string | 7d, 12h, 30m | Duration literal (s, m, h, d, w). |
| max_messages | integer | 100000 | Must be > 0. |
| max_size | string | 512Mi, 2G | Size literal (K, Ki, M, Mi, G, Gi, T, Ti). |
| allow_duplicates | bool | true | Backend support is capability-gated. |
| compression | bool | true | Backend support is capability-gated. |

Field behavior:

  • retention_time overrides backend-level retention for the schema stream.
  • max_messages overrides backend-level message cap for the schema stream.
  • max_size overrides backend-level byte cap for the schema stream.
  • allow_duplicates = false maps to one message per subject (latest kept); true removes this cap.
  • compression = true enables stream compression when backend supports it.
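
The duration and size literals can be sanity-checked before they go into config. The sketch below is not Aviso's parser. It assumes whole-number literals only, and it assumes that K/M/G/T are decimal multiples (powers of 1000) while Ki/Mi/Gi/Ti are binary (powers of 1024).

```python
import re

DURATION_SECONDS = {"s": 1, "m": 60, "h": 3600, "d": 86400, "w": 604800}
# Assumption: K/M/G/T are powers of 1000, Ki/Mi/Gi/Ti are powers of 1024.
SIZE_BYTES = {
    "K": 1000, "M": 1000**2, "G": 1000**3, "T": 1000**4,
    "Ki": 1024, "Mi": 1024**2, "Gi": 1024**3, "Ti": 1024**4,
}


def parse_duration(literal):
    """Convert a literal such as "7d" to seconds."""
    match = re.fullmatch(r"(\d+)([smhdw])", literal)
    if not match:
        raise ValueError(f"invalid duration literal: {literal!r}")
    return int(match.group(1)) * DURATION_SECONDS[match.group(2)]


def parse_size(literal):
    """Convert a literal such as "512Mi" to bytes."""
    match = re.fullmatch(r"(\d+)(Ki|Mi|Gi|Ti|K|M|G|T)", literal)
    if not match:
        raise ValueError(f"invalid size literal: {literal!r}")
    return int(match.group(1)) * SIZE_BYTES[match.group(2)]
```

Under these assumptions `parse_duration("7d")` gives 604800 seconds and `parse_size("512Mi")` gives 536870912 bytes, values that can be compared with the Max Age and Max Bytes fields reported by the nats CLI.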

Startup behavior:

  • Invalid retention_time/max_size format fails startup.
  • Unsupported fields for selected backend fail startup.
  • Validation happens before backend initialization.
  • With in_memory, all storage_policy fields are currently unsupported (startup fails if provided).

Runtime application behavior:

  • storage_policy is applied on stream create and reconciled for existing JetStream streams when those streams are accessed by Aviso.
  • Aviso-managed stream subject binding is also reconciled to the expected <base>.> pattern.
  • Mutable fields (retention/limits/compression/duplicates/replicas) are updated when drift is detected.
  • Recreate stream(s) only when you need historical data physically rewritten with new settings.

Example:

notification_backend:
  kind: jetstream
  jetstream:
    nats_url: "nats://localhost:4222"
    publish_retry_attempts: 5
    publish_retry_base_delay_ms: 150

notification_schema:
  dissemination:
    topic:
      base: "diss"
      key_order: ["destination", "target", "class", "expver", "domain", "date", "time", "stream", "step"]
    storage_policy:
      retention_time: "7d"
      max_messages: 2000000
      max_size: "10Gi"
      allow_duplicates: true
      compression: true

watch_endpoint

| Field | Type | Default | Notes |
|---|---|---|---|
| sse_heartbeat_interval_sec | u64 | 30 | SSE heartbeat period. |
| connection_max_duration_sec | u64 | 3600 | Maximum live watch duration. |
| replay_batch_size | usize | 100 | Historical fetch batch size. |
| max_historical_notifications | usize | 10000 | Replay cap for historical delivery. |
| replay_batch_delay_ms | u64 | 100 | Delay between historical replay batches. |
| concurrent_notification_processing | usize | 15 | Live stream CloudEvent conversion concurrency. |

Custom config file path

Set AVISOSERVER_CONFIG_FILE to use a specific config file instead of the default search cascade:

AVISOSERVER_CONFIG_FILE=/path/to/config.yaml cargo run

When set, only this file is loaded as a file source (startup fails if it does not exist). The default locations (./configuration/config.yaml, /etc/aviso_server/config.yaml, $HOME/.aviso_server/config.yaml) are skipped. AVISOSERVER_* field-level overrides still apply on top.

Environment override examples

AVISOSERVER_APPLICATION__HOST=0.0.0.0
AVISOSERVER_APPLICATION__PORT=8000
AVISOSERVER_NOTIFICATION_BACKEND__KIND=jetstream
AVISOSERVER_NOTIFICATION_BACKEND__JETSTREAM__NATS_URL=nats://localhost:4222
AVISOSERVER_NOTIFICATION_BACKEND__JETSTREAM__TOKEN=secret
AVISOSERVER_WATCH_ENDPOINT__REPLAY_BATCH_SIZE=200
AVISOSERVER_AUTH__ENABLED=true
AVISOSERVER_AUTH__JWT_SECRET=secret
AVISOSERVER_METRICS__ENABLED=true
AVISOSERVER_METRICS__PORT=9090

Authentication

Authentication is optional. When enabled, Aviso supports two modes:

  • Direct — Aviso forwards Bearer or Basic credentials to auth-o-tron, which returns a signed JWT.
  • Trusted proxy — an upstream reverse proxy authenticates the user and forwards a signed JWT; Aviso validates it locally.

How It Works

  1. Client sends credentials to Aviso.
  2. Middleware resolves user identity:
    • direct: forwards the Authorization header to auth-o-tron GET /authenticate and receives a JWT back.
    • trusted_proxy: validates the forwarded Authorization: Bearer <jwt> locally using jwt_secret.
  3. Username, realm, and roles are extracted from JWT claims and attached to the request.
  4. Route handlers enforce per-stream auth rules on notify, watch, and replay.
  5. Admin endpoints (/api/v1/admin/*) always require a valid JWT with an admin role.

Schema endpoints (GET /api/v1/schema, GET /api/v1/schema/{event_type}) are always publicly accessible, even when auth is enabled.

Quick Start (Direct Mode)

1. Start auth-o-tron

# Foreground (Ctrl+C to stop):
./scripts/auth-o-tron-docker.sh start

# Background:
./scripts/auth-o-tron-docker.sh start --detach

By default this uses scripts/example_auth_config.yaml. To use your own config:

AUTH_O_TRON_CONFIG_FILE=/path/to/auth-config.yaml ./scripts/auth-o-tron-docker.sh start

The bundled example config defines three local test users in realm localrealm:

| User | Password | Role |
| --- | --- | --- |
| admin-user | admin-pass | admin |
| reader-user | reader-pass | reader |
| producer-user | producer-pass | producer |

2. Enable auth in config

auth:
  enabled: true
  mode: direct
  auth_o_tron_url: "http://localhost:8080"
  jwt_secret: "your-shared-secret"   # must match auth-o-tron jwt.secret
  admin_roles:
    localrealm: ["admin"]
  timeout_ms: 5000

Roles are realm-scoped: admin_roles maps each realm name to its authorized role list. A user must belong to a listed realm and hold one of that realm's roles.

3. Run aviso-server

Auth is now enforced for:

  • Admin endpoints (/api/v1/admin/*) — always require auth + admin role.
  • Stream endpoints (/api/v1/notification, /api/v1/watch, /api/v1/replay) — only when the target schema sets auth.required: true.

For full field-level documentation, see the auth section in Configuration Reference.

Trusted Proxy Mode

Use trusted_proxy when Aviso sits behind a reverse proxy or API gateway that handles authentication. The proxy authenticates the user (via OIDC, SAML, etc.) and forwards a signed JWT to Aviso.

Aviso validates the forwarded Authorization: Bearer <jwt> locally using jwt_secret. Username and roles are read directly from JWT claims — no outbound call to auth-o-tron is made.

auth:
  enabled: true
  mode: trusted_proxy
  jwt_secret: "shared-signing-secret"
  admin_roles:
    ecmwf: ["admin"]

auth_o_tron_url is not required in this mode.

Per-Stream Authentication

Streams support separate read and write access controls. Read access governs /watch and /replay; write access governs /notification.

Configure authentication per stream in your notification schema:

notification_schema:
  # Public — no auth section means anonymous access
  public_events:
    payload:
      required: true
    topic:
      base: "public"

  # Authenticated — any valid user can read, only admins can write
  internal_events:
    payload:
      required: true
    topic:
      base: "internal"
    auth:
      required: true

  # Separate read/write roles
  sensor_data:
    payload:
      required: true
    topic:
      base: "sensor"
    auth:
      required: true
      read_roles:
        internal: ["analyst", "consumer"]
        external: ["partner"]
      write_roles:
        internal: ["producer"]

  # Realm-wide read access using wildcard, restricted write
  shared_events:
    payload:
      required: true
    topic:
      base: "shared"
    auth:
      required: true
      read_roles:
        internal: ["*"]
        external: ["analyst"]
      write_roles:
        internal: ["producer", "operator"]

Read vs. write access defaults

| auth.required | read_roles | write_roles | Read (watch/replay) | Write (notify) |
| --- | --- | --- | --- | --- |
| false or omitted | | | Anyone | Anyone |
| true | omitted | omitted | Any authenticated user | Admins only |
| true | set | omitted | Must match read_roles | Admins only |
| true | omitted | set | Any authenticated user | Must match write_roles or be admin |
| true | set | set | Must match read_roles | Must match write_roles or be admin |

Admins (users matching global admin_roles) always have both read and write access.

Role matching rules

Both read_roles and write_roles map realm names to role lists. A user's realm claim from the JWT must match a key in the map, and the user must hold at least one of that realm's listed roles.

  • Wildcard "*" — use ["*"] as the role list to grant access to all users from a realm, regardless of their specific roles.
  • Omitted role list — when read_roles is omitted, any authenticated user can read. When write_roles is omitted, only admins can write.

When a per-stream auth block is present, auth.required must be explicitly set to either true or false.
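
The defaults table and the matching rules can be read as one decision function. The following is a minimal sketch, not Aviso's implementation: `User`, `StreamAuth`, `single_realm`, `realm_match`, and `is_allowed` are hypothetical names, and applying the `"*"` wildcard to `admin_roles` as well is an assumption of this sketch.

```rust
use std::collections::HashMap;

/// Realm name -> allowed roles, mirroring read_roles / write_roles / admin_roles.
type RoleMap = HashMap<String, Vec<String>>;

/// Identity extracted from JWT claims (hypothetical shape).
struct User {
    realm: String,
    roles: Vec<String>,
}

/// Hypothetical in-memory form of a schema's `auth` block.
struct StreamAuth {
    required: bool,
    read_roles: Option<RoleMap>,
    write_roles: Option<RoleMap>,
}

/// Convenience constructor for a role map with a single realm.
fn single_realm(realm: &str, roles: &[&str]) -> RoleMap {
    let mut map = RoleMap::new();
    map.insert(realm.to_string(), roles.iter().map(|r| r.to_string()).collect());
    map
}

/// True when the user's realm is listed and the user holds a listed role,
/// or the realm's list contains the "*" wildcard.
fn realm_match(map: &RoleMap, user: &User) -> bool {
    match map.get(&user.realm) {
        Some(allowed) => allowed
            .iter()
            .any(|r| r.as_str() == "*" || user.roles.contains(r)),
        None => false,
    }
}

/// Apply the defaults table: `write` selects the notify column,
/// otherwise the watch/replay column is used.
fn is_allowed(auth: &StreamAuth, admin_roles: &RoleMap, user: Option<&User>, write: bool) -> bool {
    if !auth.required {
        return true; // public stream
    }
    let user = match user {
        Some(u) => u,
        None => return false, // anonymous request on a protected stream
    };
    if realm_match(admin_roles, user) {
        return true; // admins always have read and write access
    }
    let roles = if write { &auth.write_roles } else { &auth.read_roles };
    match roles {
        Some(map) => realm_match(map, user),
        // Omitted read_roles: any authenticated user. Omitted write_roles: admins only.
        None => !write,
    }
}
```

In the HTTP layer, a rejected anonymous request corresponds to 401 and a rejected authenticated request to 403; the sketch collapses both into `false`.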

Admin Endpoints

Admin endpoints always require authentication and one of the configured admin_roles, regardless of per-stream settings:

  • DELETE /api/v1/admin/notification/{id}
  • DELETE /api/v1/admin/wipe/stream
  • DELETE /api/v1/admin/wipe/all

See Admin Operations for request/response details.

Disabling Authentication

auth:
  enabled: false

Or omit the auth section entirely. When auth is disabled, all endpoints are publicly accessible.

Startup fails if global auth is disabled while any schema defines auth.required: true or non-empty auth.read_roles/auth.write_roles. Remove stream-level auth blocks before disabling global auth.

Client Usage

Bearer token (both modes)

# Watch an authenticated stream:
curl -N -H "Authorization: Bearer <jwt-token>" \
  -X POST http://localhost:8000/api/v1/watch \
  -H "Content-Type: application/json" \
  -d '{"event_type": "private_events", "identifier": {}}'

Basic credentials (direct mode only)

# Notify with Basic auth:
curl -X POST http://localhost:8000/api/v1/notification \
  -u "admin-user:admin-pass" \
  -H "Content-Type: application/json" \
  -d '{"event_type": "ops_events", "identifier": {"event_type": "deploy"}, "payload": "ok"}'

In direct mode, Aviso forwards Basic credentials to auth-o-tron, which authenticates the user and returns a JWT. The response JWT is validated and used for authorization.

Error Responses

Auth errors use a subset of the standard API error shape with three fields (code, error, message; no details):

{
  "code": "UNAUTHORIZED",
  "error": "unauthorized",
  "message": "Authorization header is required"
}

| Code | HTTP Status | When |
| --- | --- | --- |
| UNAUTHORIZED | 401 | Missing Authorization header, invalid token format, expired token, or bad signature. |
| FORBIDDEN | 403 | Valid credentials but user lacks the required role for the stream or admin endpoint. |
| SERVICE_UNAVAILABLE | 503 | auth-o-tron is unreachable or returned an unexpected error (direct mode only). |

A 401 response includes a WWW-Authenticate header indicating the supported scheme (Bearer in trusted-proxy mode; Bearer, Basic in direct mode).

Streaming Semantics

This page defines the exact behavior of the watch and replay endpoints, including start points, spatial filtering, identifier constraints, and SSE lifecycle events.


SSE Stream Lifecycle

Every streaming response (watch or replay) goes through a typed lifecycle:

stateDiagram-v2
    [*] --> Connected : SSE connection established
    Connected --> Replaying : from_id or from_date provided
    Connected --> Live : no replay parameters (watch only)
    Replaying --> Live : replay_completed (watch only)
    Replaying --> Closed : end_of_stream (replay only)
    Live --> Closed : max_duration_reached
    Live --> Closed : server_shutdown
    Closed --> [*]

Close reasons emitted in the final connection_closing SSE event:

| Reason | Trigger |
| --- | --- |
| end_of_stream | Replay finished (/replay endpoint, or watch replay phase if live subscribe fails) |
| max_duration_reached | connection_max_duration_sec elapsed on a watch stream |
| server_shutdown | Server is shutting down gracefully |

POST /api/v1/watch

  • If both from_id and from_date are omitted:
    • stream is live-only (new notifications from now onward).
  • If exactly one replay parameter is present:
    • historical replay starts first, then transitions to live stream.
  • If both are present:
    • request is rejected with 400.
flowchart TD
    A["Watch Request"] --> B{"from_id or<br/>from_date?"}
    B -->|neither| C["Live-only stream"]
    B -->|exactly one| D["Historical replay<br/>then live stream"]
    B -->|both| E["400 Bad Request"]

    style E fill:#8b1a1a,color:#fff
    style C fill:#1a6b3a,color:#fff
    style D fill:#1a4d6b,color:#fff

POST /api/v1/replay

  • Requires exactly one replay start parameter:
    • from_id (sequence-based), or
    • from_date (time-based).
  • If both are missing or both are present:
    • request is rejected with 400.
  • Stream closes with end_of_stream when history is exhausted.
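
The start-parameter rules for both endpoints can be summarized in one function. This is a sketch under stated assumptions: `Endpoint` and `resolve_start` are hypothetical names, `StartAt` borrows the variant names used in the architecture notes, and the date payload is kept as an unparsed string to stay dependency-free.

```rust
/// Start cursor for a stream. The date is left as raw text in this sketch.
#[derive(Debug, PartialEq)]
enum StartAt {
    LiveOnly,
    Sequence(u64),
    Date(String),
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum Endpoint {
    Watch,
    Replay,
}

/// Resolve the start point, returning Err where the API would answer 400.
fn resolve_start(
    endpoint: Endpoint,
    from_id: Option<u64>,
    from_date: Option<&str>,
) -> Result<StartAt, &'static str> {
    match (from_id, from_date) {
        (Some(_), Some(_)) => Err("cannot specify both from_id and from_date"),
        (Some(seq), None) => Ok(StartAt::Sequence(seq)),
        (None, Some(date)) => Ok(StartAt::Date(date.to_string())),
        (None, None) => match endpoint {
            // Watch without replay parameters is a live-only stream.
            Endpoint::Watch => Ok(StartAt::LiveOnly),
            // Replay always needs a start point.
            Endpoint::Replay => Err("replay requires either from_id or from_date"),
        },
    }
}
```

The only difference between the two endpoints is the case where both parameters are missing: watch falls back to live-only delivery, while replay rejects the request.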

Start Point for Historical Events

from_id starts delivery from that sequence number (inclusive). from_date accepts any of these formats:

| Format | Example |
| --- | --- |
| RFC3339 with timezone | 2025-01-15T10:00:00Z |
| RFC3339 with offset | 2025-01-15T10:00:00+02:00 |
| Space-separated with timezone | 2025-01-15 10:00:00+00:00 |
| Naive datetime (interpreted as UTC) | 2025-01-15T10:00:00 |
| Unix seconds (≤ 11 digits) | 1740509903 |
| Unix milliseconds (≥ 12 digits) | 1740509903710 |

All inputs are normalized to UTC internally.
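
The digit-count rule for numeric inputs can be sketched as follows. The function name `unix_input_to_millis` is hypothetical, and returning milliseconds as the normalized unit is a choice made for this illustration; the textual datetime formats are out of scope here.

```rust
/// Interpret an all-digit from_date value as a Unix timestamp and
/// normalize it to milliseconds since the epoch.
/// Up to 11 digits is read as seconds, 12 or more as milliseconds.
fn unix_input_to_millis(input: &str) -> Option<i64> {
    // Only plain digit strings qualify; anything else is a datetime format.
    if input.is_empty() || !input.bytes().all(|b| b.is_ascii_digit()) {
        return None;
    }
    let value: i64 = input.parse().ok()?;
    if input.len() <= 11 {
        value.checked_mul(1000)
    } else {
        Some(value)
    }
}
```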


Spatial Filter Model

Spatial filtering applies on top of the identifier field filters. Think of it in two layers:

  • Non-spatial identifier fields (time, date, class, etc.) narrow candidates by topic routing.
  • Spatial fields (polygon or point) further narrow that candidate set geographically.
flowchart LR
    A["Candidate<br/>notifications"] -->|"non-spatial<br/>identifier filter"| B["Topic-matched<br/>subset"]
    B -->|"spatial filter<br/>if provided"| C["Final<br/>results"]

Rules

| identifier.polygon | identifier.point | Result |
| --- | --- | --- |
| provided | omitted | polygon-intersects-polygon filter |
| omitted | provided | point-inside-notification-polygon filter |
| omitted | omitted | no spatial filter |
| provided | provided | 400 Bad Request |

  • identifier.polygon: keep notifications whose stored polygon intersects the request polygon.
  • identifier.point: keep notifications whose stored polygon contains the request point.
  • Both together: invalid — request is rejected.
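
A minimal sketch of the rule table and of the point case follows. The names `SpatialMode`, `spatial_mode`, and `point_in_polygon` are hypothetical. The containment test uses the standard ray-casting method on planar coordinates, which is an assumption: the algorithm and coordinate handling Aviso actually uses are not stated here.

```rust
#[derive(Debug, PartialEq)]
enum SpatialMode {
    Unfiltered,
    PolygonIntersects,
    PointInside,
}

/// Select the spatial filter from which identifier fields are present.
/// Err corresponds to the 400 Bad Request row.
fn spatial_mode(has_polygon: bool, has_point: bool) -> Result<SpatialMode, &'static str> {
    match (has_polygon, has_point) {
        (true, true) => Err("identifier.polygon and identifier.point are mutually exclusive"),
        (true, false) => Ok(SpatialMode::PolygonIntersects),
        (false, true) => Ok(SpatialMode::PointInside),
        (false, false) => Ok(SpatialMode::Unfiltered),
    }
}

/// Ray casting: count how many polygon edges a horizontal ray from the
/// point crosses. An odd count means the point is inside.
fn point_in_polygon(point: (f64, f64), polygon: &[(f64, f64)]) -> bool {
    let (px, py) = point;
    let n = polygon.len();
    if n < 3 {
        return false;
    }
    let mut inside = false;
    let mut j = n - 1;
    for i in 0..n {
        let (xi, yi) = polygon[i];
        let (xj, yj) = polygon[j];
        // Edge straddles the ray's y coordinate.
        if (yi > py) != (yj > py) {
            let x_cross = (xj - xi) * (py - yi) / (yj - yi) + xi;
            if px < x_cross {
                inside = !inside;
            }
        }
        j = i;
    }
    inside
}
```

Points lying exactly on an edge are not treated specially in this sketch.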

Identifier Constraints (watch / replay)

For schema-backed event types, identifier fields in watch/replay requests accept constraint objects instead of (or in addition to) scalar values. A scalar value is treated as an implicit eq constraint.

Constraint objects are rejected on /notification — notify only accepts scalar values.

Supported operators by field type

| Handler | Operators |
| --- | --- |
| IntHandler | eq, in, gt, gte, lt, lte, between |
| FloatHandler | eq, in, gt, gte, lt, lte, between |
| EnumHandler | eq, in |

Notes

  • between expects exactly two values [min, max] and is inclusive on both ends.
  • Float constraints reject NaN and inf — only finite values are valid.
  • Float eq and in use exact numeric equality; no tolerance window is applied.
  • A constraint object must contain exactly one operator — combining operators in a single object is rejected.

Examples

{ "severity": { "gte": 5 } }
{ "severity": { "between": [3, 7] } }
{ "region":   { "in": ["north", "south"] } }
{ "anomaly":  { "lt": 50.0 } }
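
The operator semantics for integer fields can be sketched as an enum with one variant per operator, which also reflects the rule that a constraint object carries exactly one operator. `IntConstraint` and `float_operand_is_valid` are hypothetical names for this illustration, not types from the Aviso codebase.

```rust
/// One operator per constraint object, as required by the notes above.
#[derive(Debug, Clone, PartialEq)]
enum IntConstraint {
    Eq(i64),
    In(Vec<i64>),
    Gt(i64),
    Gte(i64),
    Lt(i64),
    Lte(i64),
    /// Inclusive on both ends: [min, max].
    Between(i64, i64),
}

impl IntConstraint {
    /// Check a stored identifier value against the constraint.
    fn matches(&self, value: i64) -> bool {
        match self {
            IntConstraint::Eq(x) => value == *x,
            IntConstraint::In(xs) => xs.contains(&value),
            IntConstraint::Gt(x) => value > *x,
            IntConstraint::Gte(x) => value >= *x,
            IntConstraint::Lt(x) => value < *x,
            IntConstraint::Lte(x) => value <= *x,
            IntConstraint::Between(min, max) => *min <= value && value <= *max,
        }
    }
}

/// Float operands must be finite: NaN and infinities are rejected.
fn float_operand_is_valid(x: f64) -> bool {
    x.is_finite()
}
```

A scalar in a request, such as `"severity": 5`, would correspond to `IntConstraint::Eq(5)` in this model.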

Backend Behavior

| Backend | Historical replay | Live watch |
| --- | --- | --- |
| in_memory | Node-local only; clears on restart | Node-local fan-out |
| jetstream | Durable; survives restarts | Cluster-wide fan-out |

SSE Timestamp Format

All control, heartbeat, and close event timestamps use canonical UTC second precision:

YYYY-MM-DDTHH:MM:SSZ

Example: 2026-02-25T18:58:23Z


Replay Payload Shape

  • Replay and watch CloudEvent output always includes data.payload.
  • If a notify request omitted payload (optional schema), replay returns data.payload = null.
  • Payload values are not reshaped — scalar strings remain strings, objects remain objects.

See Payload Contract for the full input → storage → output mapping.



Payload Contract

This page defines the canonical payload behavior for Aviso notifications.

Scope

  • Applies to POST /api/v1/notification input.
  • Applies to stored backend payload representation.
  • Applies to replay/watch CloudEvent output payload field.

Schema Configuration

Per event schema, payload configuration is:

payload:
  required: true # or false

There is no payload.type list.

Canonical Rules

  1. Payload values are JSON values: object, array, string, number, boolean, null.
  2. If payload.required = true and request omits payload, request is rejected (400).
  3. If payload.required = false and request omits payload, Aviso stores canonical JSON null.
  4. Aviso does not wrap or reshape payload values (for example no auto-wrapping into {"data": ...}).

Input to Storage to Replay Mapping

| Notify request payload | Stored payload | Replay/Watch CloudEvent data.payload |
| --- | --- | --- |
| omitted (optional schema) | null | null |
| "forecast complete" | "forecast complete" | "forecast complete" |
| 42 | 42 | 42 |
| true | true | true |
| ["a","b"] | ["a","b"] | ["a","b"] |
| {"note":"ok"} | {"note":"ok"} | {"note":"ok"} |

Failure Cases

  • Missing required payload:
    • HTTP 400
    • validation error (INVALID_NOTIFICATION_REQUEST)
  • Malformed JSON request body:
    • HTTP 400
    • parse error (INVALID_JSON)

Consumer Guidance

  • Treat data.payload as dynamic JSON.
  • If your client requires object-only payloads, normalize on the client side.
    • Example strategy: for non-object payloads, convert to {"data": <payload>} in your consumer.

Topic Encoding

Aviso uses a single backend-agnostic wire format for topics across all backends.


Why This Exists

NATS subject tokenization uses . as the separator. Wildcards (*, >) also operate on dot-delimited tokens. If topic field values contain any of these reserved characters, they would silently break routing and filtering.

Example of the problem:

Logical value:  1.45
Naive subject:  mars.od.1.45      ← looks like 4 tokens, not 3

To prevent this, Aviso percent-encodes each token value before assembling the wire subject.


Encoding Rules

Only four characters are reserved and must be encoded:

| Character | Encoded form | Reason |
| --- | --- | --- |
| . | %2E | NATS token separator |
| * | %2A | NATS single-token wildcard |
| > | %3E | NATS multi-token wildcard |
| % | %25 | Escape character itself (keeps decoding unambiguous) |

All other characters pass through unchanged.


Encode → Wire → Decode Flow

flowchart LR
    A["Logical value<br/>e.g. 1.45"] -->|encode| B["Wire token<br/>e.g. 1%2E45"]
    B -->|assemble| C["Wire subject<br/>e.g. extreme_event.north.1%2E45"]
    C -->|"split on '.'"| D["Wire tokens"]
    D -->|decode each| E["Logical tokens<br/>e.g. 1.45"]

    style A fill:#2a4a6b,color:#fff
    style C fill:#1a6b3a,color:#fff
    style E fill:#2a4a6b,color:#fff

The decoder is strict: malformed %HH sequences (e.g. %GG) are rejected, not passed through.
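
The codec described here is small enough to sketch in full. This is an illustration under assumptions, not the shared codec itself: the function names are hypothetical, every token (including the base) is encoded, and the decoder accepts any well-formed `%HH` escape rather than only the four reserved ones.

```rust
/// Percent-encode the four reserved characters in a single token.
fn encode_token(value: &str) -> String {
    let mut out = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '.' => out.push_str("%2E"),
            '*' => out.push_str("%2A"),
            '>' => out.push_str("%3E"),
            '%' => out.push_str("%25"),
            other => out.push(other),
        }
    }
    out
}

/// Value of one hex digit, or None if the byte is not a hex digit.
fn hex_value(b: u8) -> Option<u8> {
    match b {
        b'0'..=b'9' => Some(b - b'0'),
        b'a'..=b'f' => Some(b - b'a' + 10),
        b'A'..=b'F' => Some(b - b'A' + 10),
        _ => None,
    }
}

/// Strict single-pass decode: a '%' must be followed by two hex digits.
fn decode_token(token: &str) -> Result<String, String> {
    let bytes = token.as_bytes();
    let mut out: Vec<u8> = Vec::with_capacity(bytes.len());
    let mut i = 0;
    while i < bytes.len() {
        if bytes[i] == b'%' {
            let hi = bytes.get(i + 1).copied().and_then(hex_value);
            let lo = bytes.get(i + 2).copied().and_then(hex_value);
            match (hi, lo) {
                (Some(h), Some(l)) => {
                    out.push(h * 16 + l);
                    i += 3;
                }
                _ => return Err(format!("malformed escape in token '{}'", token)),
            }
        } else {
            out.push(bytes[i]);
            i += 1;
        }
    }
    String::from_utf8(out).map_err(|_| format!("token '{}' is not valid UTF-8", token))
}

/// Encode each logical token, then join with the wire separator.
fn assemble_subject(tokens: &[&str]) -> String {
    tokens.iter().map(|t| encode_token(t)).collect::<Vec<_>>().join(".")
}

/// Split a wire subject on '.' and decode each token.
fn split_subject(subject: &str) -> Result<Vec<String>, String> {
    subject.split('.').map(decode_token).collect()
}
```

Because `%` itself is escaped, decoding output is never decoded again, which is why a wire token of `1%2525` yields the logical value `1%25` and not `1%`.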


Examples

Encoding

| Logical value | Wire token |
| --- | --- |
| 1.45 | 1%2E45 |
| 1*34 | 1%2A34 |
| 1>0 | 1%3E0 |
| 1%25 | 1%2525 |

Decoding (single pass)

| Wire token | Logical value |
| --- | --- |
| 1%2E45 | 1.45 |
| 1%2A34 | 1*34 |
| 1%2525 | 1%25 |
| 1%25 | 1% |

Impact on Wildcard Matching

Watch and replay requests use a two-step filter:

  1. Backend coarse filter — operates on wire subjects (NATS wildcard matching)
  2. App-level wildcard match — operates on decoded logical tokens

Both steps are safe with reserved characters because the app layer always decodes before matching. Subscribers never need to think about encoding in their filter values — Aviso handles it transparently.


Invariants

  • Wire subject separator is always .
  • One shared codec is used for all backends (JetStream and In-Memory)
  • Encoding is applied per-token, not per-subject
  • Decoding is a strict single-pass operation

API Errors

Aviso returns a consistent JSON error object for 4xx and 5xx responses.

Response Shape

{
  "code": "INVALID_REPLAY_REQUEST",
  "error": "Invalid Replay Request",
  "message": "Replay endpoint requires either from_id or from_date parameter...",
  "details": "Replay endpoint requires either from_id or from_date parameter..."
}

Fields:

  • code: stable machine-readable error code.
  • error: human-readable error category.
  • message: top-level failure message (safe for client display).
  • details: deepest/root detail available.

Notes:

  • error_chain is logged server-side for diagnostics, but is not returned in API responses.
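  • message is always present in 4xx and 5xx error responses. details is present as well, except in auth errors (UNAUTHORIZED, FORBIDDEN, SERVICE_UNAVAILABLE), which return only code, error, and message.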

Error Telemetry Events

These event_name values are emitted in structured logs:

| Event Name | Level | Trigger |
| --- | --- | --- |
| api.request.parse.failed | warn | JSON parse/shape/unknown-field failure before domain validation. |
| api.request.validation.failed | warn | Domain/request validation failure (400). |
| api.request.processing.failed | error | Server-side processing/storage failure (500). |
| stream.sse.initialization.failed | error | Replay/watch SSE initialization failure (500). |

For SSE setup failures, the error response also includes:

  • topic: decoded logical topic.
  • request_id: request correlation id.

Error Code Reference

| Code | HTTP Status | Meaning |
| --- | --- | --- |
| INVALID_JSON | 400 | Request body is not valid JSON. |
| UNKNOWN_FIELD | 400 | Request contains fields outside API contract. |
| INVALID_REQUEST_SHAPE | 400 | JSON structure cannot be deserialized into request model. |
| INVALID_NOTIFICATION_REQUEST | 400 | Notification request failed business validation. |
| INVALID_WATCH_REQUEST | 400 | Watch request failed validation (replay/spatial/schema rules). |
| INVALID_REPLAY_REQUEST | 400 | Replay request failed validation (start cursor/spatial/schema rules). |
| UNAUTHORIZED | 401 | Missing or invalid credentials (no token, bad format, expired, bad signature). |
| FORBIDDEN | 403 | Valid credentials but user lacks the required role. |
| NOTIFICATION_PROCESSING_FAILED | 500 | Notification processing pipeline failed before storage. |
| NOTIFICATION_STORAGE_FAILED | 500 | Backend write operation failed. |
| SSE_STREAM_INITIALIZATION_FAILED | 500 | Replay/watch SSE stream could not be created. |
| INTERNAL_ERROR | 500 | Fallback internal error code (reserved). |
| SERVICE_UNAVAILABLE | 503 | Auth service (auth-o-tron) unreachable or returned an unexpected error. |

Examples

Invalid replay request:

{
  "code": "INVALID_REPLAY_REQUEST",
  "error": "Invalid Replay Request",
  "message": "Cannot specify both from_id and from_date...",
  "details": "Cannot specify both from_id and from_date..."
}

Auth error (missing credentials on a protected stream). Auth errors use three fields (code, error, message); details is not included:

{
  "code": "UNAUTHORIZED",
  "error": "unauthorized",
  "message": "Authentication is required for this stream"
}

SSE initialization failure:

{
  "code": "SSE_STREAM_INITIALIZATION_FAILED",
  "error": "SSE stream creation failed",
  "message": "Failed to create stream consumer",
  "details": "nats connect failed: timeout",
  "topic": "test_polygon.*.1200",
  "request_id": "0d4f6758-1ce3-4dda-a0f3-0ccf5fcb50d6"
}

Admin Operations

Admin endpoints are destructive. Restrict access in production.

When authentication is enabled, all admin endpoints require a valid credential and one of the configured admin_roles. Add -H "Authorization: Bearer <token>" or -u user:pass (direct mode) to the curl examples below.

Delete One Notification

DELETE /api/v1/admin/notification/{notification_id}

notification_id format:

  • <stream>@<sequence> (canonical)
  • <event_type>@<sequence> (alias; resolved through configured schema topic.base)

How notification_id maps to your schema

Delete IDs use the stream key plus backend sequence number.

If your schema contains:

notification_schema:
  mars:
    topic:
      base: "mars"
  dissemination:
    topic:
      base: "diss"
  test_polygon:
    topic:
      base: "polygon"

Then valid delete IDs include:

  • mars@42 (event type alias and stream key are the same)
  • dissemination@42 (alias form, resolved to stream key diss)
  • diss@42 (canonical stream key form)
  • test_polygon@306 (alias form, resolved to stream key polygon)
  • polygon@306 (canonical stream key form)

Example: replay ID then delete

Replay returns CloudEvent IDs like mars@1:

curl -N -X POST "http://127.0.0.1:8000/api/v1/replay" \
  -H "Content-Type: application/json" \
  -d '{
    "event_type": "mars",
    "identifier": {
      "class": "od",
      "expver": "0001",
      "domain": "g",
      "date": "20250706",
      "time": "1200",
      "stream": "enfo",
      "step": "1"
    },
    "from_id": "1"
  }'

If one replayed event has "id":"mars@1", delete it with:

curl -X DELETE "http://127.0.0.1:8000/api/v1/admin/notification/mars@1"

Response behavior

  • 200: notification deleted.
  • 404: stream/sequence pair not found.
  • 400: invalid ID format (<name>@<positive-integer> required).

Invalid examples:

  • mars (missing @sequence)
  • mars@0 (sequence must be > 0)
  • mars@abc (sequence must be an integer)
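
The ID format and alias resolution can be sketched as two small functions. Both `parse_notification_id` and `resolve_stream_key` are hypothetical names, and the schema is modelled as a plain list of (event_type, topic.base) pairs for illustration.

```rust
/// Parse "<name>@<positive-integer>" into its two parts.
/// Err corresponds to the 400 response for an invalid ID format.
fn parse_notification_id(id: &str) -> Result<(String, u64), String> {
    let (name, seq) = id
        .split_once('@')
        .ok_or_else(|| format!("'{}' is missing '@<sequence>'", id))?;
    if name.is_empty() {
        return Err(format!("'{}' has an empty name", id));
    }
    // Require plain digits so that inputs like "+5" are rejected too.
    if seq.is_empty() || !seq.bytes().all(|b| b.is_ascii_digit()) {
        return Err(format!("sequence in '{}' must be an integer", id));
    }
    let seq: u64 = seq
        .parse()
        .map_err(|_| format!("sequence in '{}' is out of range", id))?;
    if seq == 0 {
        return Err(format!("sequence in '{}' must be > 0", id));
    }
    Ok((name.to_string(), seq))
}

/// Resolve an event_type alias to its stream key (topic.base).
/// A name that is not a known event type is assumed to be a stream key already.
fn resolve_stream_key(name: &str, schema: &[(&str, &str)]) -> String {
    schema
        .iter()
        .find(|(event_type, _)| *event_type == name)
        .map(|(_, base)| base.to_string())
        .unwrap_or_else(|| name.to_string())
}
```

With the schema shown earlier, `dissemination@42` and `diss@42` both resolve to stream key `diss` and sequence 42.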

Wipe Endpoints

  • DELETE /api/v1/admin/wipe/stream
  • DELETE /api/v1/admin/wipe/all

These endpoints remove many messages at once and should be used with extreme caution.

Wipe One Stream

DELETE /api/v1/admin/wipe/stream

Request body:

{
  "stream_name": "MARS"
}

Example:

curl -X DELETE "http://127.0.0.1:8000/api/v1/admin/wipe/stream" \
  -H "Content-Type: application/json" \
  -d '{"stream_name":"MARS"}'

What it does:

  • Removes all stored messages for the selected stream.
  • Keeps the stream definition/configuration in place.
  • New notifications can still be written to that stream immediately after wipe.

When to use:

  • You want to reset one event family (mars, diss, polygon) without affecting others.

Wipe All Streams

DELETE /api/v1/admin/wipe/all

Example:

curl -X DELETE "http://127.0.0.1:8000/api/v1/admin/wipe/all"

What it does:

  • Removes all stored messages from all streams managed by the backend.
  • Leaves service configuration intact, but data history is gone.

When to use:

  • Local/dev reset before a fresh test run.
  • Operational emergency cleanup where full history removal is intended.

Which Admin Operation Should I Use?

  • Delete one notification (/admin/notification/{id}):
    • Use when you know the exact sequence to remove.
  • Wipe one stream (/admin/wipe/stream):
    • Use when one stream is polluted and others must remain untouched.
  • Wipe all (/admin/wipe/all):
    • Use only when complete history reset is intended.

Wipe Response Shape

Both wipe endpoints return:

{
  "success": true,
  "message": "..."
}

Failure responses keep the same shape with success: false and an error message.

Architecture

Aviso Server is built around three operations — Notify, Watch, and Replay — each sharing a common validation and schema layer but diverging at the backend interaction.


System Overview

graph TB
    subgraph Clients
        P(Publisher)
        W(Watcher)
        R(Replayer)
    end

    subgraph "Aviso Server"
        direction TB
        AM["Auth Middleware<br/>(optional)"]
        RT["Routes<br/>HTTP handlers"]
        VP["Validation &<br/>Processing"]
        NC["Notification<br/>Core"]
        BE["Backend<br/>Abstraction"]
    end

    AOT["auth-o-tron<br/>(external)"]

    subgraph Backend
        JS[("JetStream<br/>NATS")]
        IM[("In-Memory<br/>Process")]
    end

    P -->|POST /api/v1/notification| AM
    W -->|POST /api/v1/watch| AM
    R -->|POST /api/v1/replay| AM

    AM -.->|verify credentials| AOT
    AM --> RT
    RT --> VP
    VP --> NC
    NC --> BE
    BE --> JS
    BE --> IM

    JS -.->|SSE stream| W
    JS -.->|SSE stream| R
    IM -.->|SSE stream| W
    IM -.->|SSE stream| R

Notify Request Flow

When a publisher sends POST /api/v1/notification:

sequenceDiagram
    participant C as Publisher
    participant A as Auth Middleware
    participant R as Route Handler
    participant V as Validator
    participant P as Processor
    participant T as Topic Builder
    participant B as Backend

    C->>A: POST /api/v1/notification (JSON)
    alt stream requires auth
        A->>A: resolve user (JWT or auth-o-tron)
        A-->>C: 401/403 if unauthorized
    end
    A->>R: forward request (+ user identity)
    R->>V: parse & shape-check JSON
    V-->>R: 400 if malformed
    R->>P: process_notification_request()
    P->>P: look up event schema
    P->>P: validate each identifier field
    P->>P: canonicalize values (dates, enums)
    P->>T: build_topic_with_schema()
    T-->>P: topic string (e.g. mars.od.0001.g.20250706.1200)
    P->>B: put_message_with_headers()
    B-->>C: 200 { id, topic }

Key steps:

  1. Parse — raw JSON bytes are deserialized; unknown fields are rejected (UNKNOWN_FIELD)
  2. Validate — each identifier field is checked against its ValidationRules (type, range, enum values)
  3. Canonicalize — values are normalized (e.g. dates to YYYYMMDD, enums to lowercase)
  4. Build topic — fields are ordered per key_order, reserved chars are percent-encoded
  5. Store — the message is written to the backend with the encoded topic as the subject

Watch Request Flow

POST /api/v1/watch opens a persistent SSE stream. It optionally starts with a historical replay phase before transitioning to live delivery.

sequenceDiagram
    participant C as Subscriber
    participant A as Auth Middleware
    participant R as Route Handler
    participant P as Stream Processor
    participant F as Hybrid Filter
    participant B as Backend

    C->>A: POST /api/v1/watch (JSON)
    alt stream requires auth
        A->>A: resolve user (JWT or auth-o-tron)
        A-->>C: 401/403 if unauthorized
    end
    A->>R: forward request (+ user identity)
    R->>P: process_request (ValidationConfig::for_watch)
    P->>P: allow optional fields & constraint objects
    P->>P: analyze_watch_pattern() → coarse + precise patterns

    alt has from_id or from_date
        P->>B: fetch historical batch
        B-->>P: NotificationMessage[]
        P->>F: apply wildcard + constraint + spatial filter
        F-->>C: SSE: replay_started → events → replay_completed
    end

    P->>B: subscribe(coarse_pattern)
    loop live stream
        B-->>P: live NotificationMessage
        P->>F: apply precise filter
        F-->>C: SSE: notification event
    end

    C-->>R: disconnect / timeout
    R-->>C: SSE: connection_closing

Replay Request Flow

POST /api/v1/replay is like watch but historical-only — the stream closes when history ends.

sequenceDiagram
    participant C as Client
    participant A as Auth Middleware
    participant R as Route Handler
    participant P as Stream Processor
    participant B as Backend

    C->>A: POST /api/v1/replay (JSON + from_id or from_date)
    alt stream requires auth
        A->>A: resolve user (JWT or auth-o-tron)
        A-->>C: 401/403 if unauthorized
    end
    A->>R: forward request (+ user identity)
    R->>P: process_request (ValidationConfig::for_replay)
    P->>B: batch fetch from StartAt::Sequence or StartAt::Date
    loop batches
        B-->>P: NotificationMessage[]
        P->>P: filter + convert to CloudEvent
        P-->>C: SSE: notification events
    end
    P-->>C: SSE: replay_completed → connection_closing (end_of_stream)

SSE Streaming Pipeline

The streaming layer (src/sse/) is built around typed values rather than raw strings, which keeps the lifecycle explicit and the endpoint logic thin.

Cursor types — how a start point is represented internally:

  • StartAt::LiveOnly — no history, subscribe immediately
  • StartAt::Sequence(u64) — start from a specific backend sequence number
  • StartAt::Date(DateTime<Utc>) — start from a UTC timestamp

Frame types — what the stream produces before rendering to SSE wire format:

  • Control frames — connection_established, replay_started, replay_completed
  • Notification frames — a decoded CloudEvent ready for delivery
  • Heartbeat frames — periodic keep-alive
  • Error frames — non-fatal stream errors
  • Close frame — carries one of: end_of_stream, max_duration_reached, server_shutdown

Lifecycle (shutdown token, max duration, natural end) is applied once in a shared wrapper, so individual endpoint handlers don't need to reimplement it.


Component Map

| Component | Path | Role |
| --- | --- | --- |
| Routes | src/routes/ | Thin HTTP handlers: parse request, delegate, return response |
| Auth | src/auth/ | Middleware, JWT validation, role matching, auth-o-tron client |
| Handlers | src/handlers/ | Shared parsing, validation, and processing logic |
| Notification core | src/notification/ | Schema registry, topic builder/codec/parser, wildcard matcher, spatial |
| Backend abstraction | src/notification_backend/ | NotificationBackend trait + JetStream and InMemory implementations |
| SSE layer | src/sse/ | Stream composition, typed frames, heartbeats, lifecycle |
| CloudEvents | src/cloudevents/ | Converts stored messages into CloudEvent envelope |
| Configuration | src/configuration/ | Config loading, schema validation, global snapshot |
| Error model | src/error.rs | Stable HTTP error codes and structured responses |

Hybrid Filtering

Watch subscriptions use a two-tier strategy to balance backend load with filter precision:

graph LR
    A[Watch Request] --> B[analyze_watch_pattern]
    B --> C["Coarse pattern<br/>e.g. mars.*.*.*"]
    B --> D["Precise pattern<br/>full decoded topic"]
    C -->|backend subscription| E[(NATS JetStream)]
    E -->|candidate messages| F[App-level filter]
    D --> F
    F -->|matched| G[SSE client]
    F -->|rejected| H[dropped]
  • The coarse pattern is sent to the backend as the subscription subject filter. It uses NATS wildcards and covers a superset of the desired messages.
  • The precise pattern is applied in-process on decoded topics + constraint objects + spatial checks. Only messages that pass both layers reach the client.

This avoids creating one backend subscription per unique topic while still delivering exact results.
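
A minimal sketch of the two tiers follows. It assumes NATS wildcard semantics for the coarse tier (`*` matches exactly one token, `>` matches one or more trailing tokens) and models the precise tier as an arbitrary predicate. `subject_matches` and `hybrid_filter` are hypothetical names; the real precise tier works on decoded topics plus constraint objects and spatial checks.

```rust
/// Coarse tier: NATS-style subject match on dot-delimited tokens.
fn subject_matches(pattern: &str, subject: &str) -> bool {
    let pat: Vec<&str> = pattern.split('.').collect();
    let sub: Vec<&str> = subject.split('.').collect();
    for (i, p) in pat.iter().enumerate() {
        if *p == ">" {
            // '>' must be the last pattern token and cover at least one token.
            return i == pat.len() - 1 && sub.len() > i;
        }
        match sub.get(i) {
            Some(s) if *p == "*" || p == s => {}
            _ => return false,
        }
    }
    pat.len() == sub.len()
}

/// Apply the coarse pattern first, then the precise predicate.
/// Only subjects that pass both tiers are returned.
fn hybrid_filter<'a, F>(coarse: &str, subjects: &[&'a str], precise: F) -> Vec<&'a str>
where
    F: Fn(&str) -> bool,
{
    subjects
        .iter()
        .copied()
        .filter(|s| subject_matches(coarse, *s))
        .filter(|s| precise(*s))
        .collect()
}
```

In the running system the coarse tier is evaluated by the backend, so only the second filter costs application CPU; the sketch runs both in-process purely to show the superset relationship.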


JetStream Backend Internals

| Module | Path | Responsibility |
| --- | --- | --- |
| Config | notification_backend/jetstream/config.rs | Decode and validate JetStream settings |
| Connection | notification_backend/jetstream/connection.rs | NATS connect with retry |
| Streams | notification_backend/jetstream/streams.rs | Create and reconcile streams |
| Publisher | notification_backend/jetstream/publisher.rs | Publish with retry on transient failures |
| Subscriber | notification_backend/jetstream/subscriber.rs | Consumer-based live subscriptions |
| Replay | notification_backend/jetstream/replay.rs | Pull consumer batch retrieval |
| Admin | notification_backend/jetstream/admin.rs | Wipe and delete operations |

Backend Development Guide

This guide explains how to add a new notification backend in Aviso.

Required Contract

A backend must implement NotificationBackend in src/notification_backend/mod.rs.

Core requirements:

  • Implement publish/replay/subscribe/admin methods.
  • Implement capabilities() and return a stable BackendCapabilities map.
  • Keep startup/shutdown behavior explicit and logged.

Storage Policy Compatibility

Per-schema storage policy is validated at startup before backend initialization.

Validation entry point:

  • configuration::validate_schema_storage_policy_support(...)

Capability source:

  • notification_backend::capabilities_for_backend_kind(...)

If a schema requests unsupported fields, startup fails fast with a clear error. Do not silently ignore unsupported storage policy fields.

Capability Checklist

When adding backend <new_backend>:

  1. Add backend kind support in capabilities_for_backend_kind.
  2. Add NotificationBackend::capabilities() implementation.
  3. Ensure capability values match real backend behavior.
  4. Add tests for:
    • capability map values
    • accepted storage-policy fields
    • rejected storage-policy fields
  5. Add backend docs page and update summary links.

Minimal Capability Example

BackendCapabilities {
    retention_time: true,
    max_messages: true,
    max_size: false,
    allow_duplicates: false,
    compression: false,
}

Meaning:

  • retention_time and max_messages can be used in schema storage policy.
  • max_size, allow_duplicates, and compression must be rejected at startup.
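
The fail-fast check implied by this capability map can be sketched as below. The struct mirrors the five flags from the example, but its definition here, the function `unsupported_fields`, and the assumption that storage-policy field names equal the capability flag names are all illustrative. The real validation lives behind `validate_schema_storage_policy_support`.

```rust
/// Local stand-in with the same five flags as the example above.
#[derive(Debug, Clone, Copy)]
struct BackendCapabilities {
    retention_time: bool,
    max_messages: bool,
    max_size: bool,
    allow_duplicates: bool,
    compression: bool,
}

/// Return the requested storage-policy fields the backend cannot honor.
/// An empty result means the schema is compatible with the backend.
fn unsupported_fields(caps: &BackendCapabilities, requested: &[&str]) -> Vec<String> {
    requested
        .iter()
        .filter(|field| {
            let supported = match **field {
                "retention_time" => caps.retention_time,
                "max_messages" => caps.max_messages,
                "max_size" => caps.max_size,
                "allow_duplicates" => caps.allow_duplicates,
                "compression" => caps.compression,
                // Unknown field names are treated as unsupported in this sketch.
                _ => false,
            };
            !supported
        })
        .map(|field| field.to_string())
        .collect()
}
```

A startup routine following the guidance above would abort with an error naming every returned field instead of silently ignoring them.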

Testing Expectations

  • Unit tests should verify capability flags are stable.
  • Validation tests should verify fail-fast messages for unsupported fields.
  • Integration tests should use test-local config/schema fixtures, not developer-local YAML files.