
Introduction

Aviso Server is a real-time notification and historical replay system built for data dissemination pipelines. It lets publishers announce data availability events and lets subscribers receive those events — either live as they happen, or replayed from a chosen point in history.

It is designed for environments where timely, reliable notification of data availability is critical: scientific computing, operational weather forecasting pipelines, large-scale data distribution, and similar domains.


How It Works

At its core, Aviso Server exposes three operations:

graph LR
    P(Publisher) -->|POST /api/v1/notification| A[Aviso Server]
    A -->|store| B[("Backend<br/>JetStream / In-Memory")]
    B -->|fan-out| W1("Subscriber A<br/>watch")
    B -->|fan-out| W2("Subscriber B<br/>watch")
    B -->|history| R("Client<br/>replay")

Operation  Endpoint                   Description
Notify     POST /api/v1/notification  Publish a notification event to the backend
Watch      POST /api/v1/watch         Stream live (and optionally historical) events over SSE
Replay     POST /api/v1/replay        Stream historical events only, then close

Key Features

Schema-driven validation

Each event type can have a schema that defines which identifier fields are required, their types (date, time, integer, float, enum, polygon), allowed ranges, and topic ordering. Invalid notifications are rejected at the API boundary with a clear error.

Structured topic routing

Aviso builds a deterministic topic string from the notification's identifier fields. This topic is used to route, store, and filter messages in the backend. Subscribers can use wildcard patterns and constraint objects to filter the stream.
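
Since the backend is NATS-based, the wildcard patterns follow NATS-style subject matching: * matches exactly one dot-separated token, > matches one or more trailing tokens. A minimal sketch of that matching rule, as an illustration rather than the server's actual code:

```python
def subject_matches(pattern: str, subject: str) -> bool:
    """NATS-style subject match: '*' = exactly one token, '>' = the rest."""
    p_toks = pattern.split(".")
    s_toks = subject.split(".")
    for i, tok in enumerate(p_toks):
        if tok == ">":
            return i < len(s_toks)   # '>' must swallow at least one token
        if i >= len(s_toks):
            return False             # pattern longer than subject
        if tok not in ("*", s_toks[i]):
            return False             # literal token mismatch
    return len(p_toks) == len(s_toks)
```

For example, the pattern extreme_event.*.1200.> matches any region at run time 1200, regardless of the remaining fields.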

Hybrid filtering

Watch and replay requests are matched using a two-tier strategy: the backend handles coarse routing (broad subject filters), and Aviso applies precise application-level filtering (constraints, spatial checks) on top. This keeps backend subscription counts low while delivering exact results.

Spatial awareness

Polygon and point identifiers are first-class — notifications can carry geographic polygons, and subscribers can filter by polygon intersection or point containment.

Pluggable backends

Aviso abstracts storage behind a NotificationBackend trait. Today two backends ship: JetStream (NATS-backed, durable, production-ready) and In-Memory (single-process, for development and testing).

Server-Sent Events (SSE)

Watch and replay streams use SSE — a simple, firewall-friendly HTTP streaming protocol supported natively by browsers and all major HTTP clients. The stream includes typed control frames (connection established, replay started/completed, heartbeats, and graceful close reasons).
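
To give a feel for the wire format, here is a minimal sketch of turning such an SSE stream into JSON events. It assumes each frame carries a single `data:` line, which is a simplification of the full SSE protocol (multi-line data and id:/event: fields are ignored):

```python
import json

def parse_sse(raw: str) -> list[dict]:
    """Parse 'data:' lines from an SSE stream into JSON objects (simplified)."""
    events = []
    for frame in raw.split("\n\n"):          # frames are blank-line separated
        for line in frame.splitlines():
            if line.startswith("data: "):
                events.append(json.loads(line[len("data: "):]))
    return events

stream = (
    'data: {"type":"connection_established","timestamp":"2026-03-04T10:00:00Z"}\n\n'
    'data: {"type":"connection_closing","reason":"end_of_stream"}\n\n'
)
frames = parse_sse(stream)
# frames[0]["type"] == "connection_established"
```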

CloudEvents format

All delivered notifications follow the CloudEvents specification, making them easy to integrate with other event-driven systems.


Use Cases

  • Data availability monitoring — trigger downstream workflows the moment a dataset lands
  • Operational pipelines — coordinate processing steps across distributed services
  • Audit and compliance — replay historical events to reconstruct what was published and when
  • System integration — connect disparate systems through a standardized event interface

Where to Go Next

If you are new to Aviso, read these pages in order:

  1. Key Concepts — understand the terminology before anything else
  2. Installation — get the server running
  3. Getting Started — send your first notification and watch it arrive
  4. Practical Examples — copy-paste workflows for common scenarios

If you are configuring for production, start with Configuration, Backends Overview, and Authentication.

Key Concepts

This page defines the core terms you will encounter throughout Aviso's documentation and API. Reading it before Getting Started will make everything else click faster.


Event Type

An event type is a named category of notification — for example extreme_event, sensor_alert, or data_ready.

Every request to Aviso (notify, watch, or replay) targets exactly one event type. The server uses the event type to:

  • look up the matching schema (validation rules, required fields, topic ordering),
  • route messages to the correct storage stream,
  • apply the correct retention and storage policy.
{ "event_type": "extreme_event", ... }

If no schema is configured for an event type, Aviso falls back to generic behavior: fields are accepted as-is and the topic is built from sorted keys.


Identifier

An identifier is a set of key-value pairs that describe what a notification is about. Think of it as structured metadata that uniquely (or approximately) locates a piece of data.

{
  "identifier": {
    "region":   "north",
    "run_time": "1200",
    "severity": "4",
    "anomaly":  "42.5"
  }
}

Identifiers serve two purposes depending on the operation:

Operation  Role of identifier
notify     Declares the metadata of the notification being published
watch      Acts as a filter — which notifications to receive
replay     Acts as a filter — which historical notifications to retrieve

In watch and replay, identifier values can be constraint objects instead of scalars (e.g. {"gte": 5}) for numeric and enum fields. See Streaming Semantics.
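
A sketch of how such matching might work: a scalar filter value requires an exact match, while a constraint object applies its operators to the notification's value. Only "gte" appears in the example above; the other operator names here are illustrative assumptions:

```python
def satisfies(value: str, spec) -> bool:
    """Match an identifier value against a scalar or a constraint object.

    Only "gte" is shown in the documentation; "lte"/"gt"/"lt" are
    assumed here for illustration."""
    if not isinstance(spec, dict):
        return str(value) == str(spec)      # scalar: exact match
    ops = {
        "gte": lambda v, t: v >= t,
        "lte": lambda v, t: v <= t,
        "gt":  lambda v, t: v > t,
        "lt":  lambda v, t: v < t,
    }
    v = float(value)
    return all(ops[name](v, float(t)) for name, t in spec.items())
```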


Topic

A topic is the internal routing key Aviso builds from an identifier. You rarely construct topics manually — Aviso builds them for you.

Topics are dot-separated strings, for example:

extreme_event.north.1200.4.42%2E5

Each token corresponds to one identifier field, in the order defined by key_order in the schema. Note the %2E — the dot in 42.5 is percent-encoded so it doesn't conflict with the topic separator. Reserved characters (., *, >, %) in field values are percent-encoded before writing to the backend so they do not break routing. See Topic Encoding.
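
The construction can be sketched in a few lines. This is an illustration of the documented behavior (percent-encode reserved characters, then join base and ordered identifier values with dots), not the server's own implementation:

```python
RESERVED = ".*>%"  # characters with special meaning in NATS subjects

def encode_token(value: str) -> str:
    """Percent-encode characters that are reserved in NATS subjects."""
    return "".join("%%%02X" % ord(c) if c in RESERVED else c for c in value)

def build_topic(base: str, key_order: list[str], identifier: dict) -> str:
    """Join base + identifier values (in key_order) into a dot-separated topic."""
    return ".".join([base] + [encode_token(identifier[k]) for k in key_order])
```

With the example identifier above, build_topic("extreme_event", ["region", "run_time", "severity", "anomaly"], ...) yields extreme_event.north.1200.4.42%2E5.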


Schema

A schema configures how Aviso handles a specific event type. Schemas are defined in configuration/config.yaml under notification_schema.

A schema controls:

  • topic.base — the stream/prefix for this event type (e.g. diss, mars)
  • topic.key_order — the order of identifier fields in the topic string
  • identifier.* — validation rules per field (type, required, allowed values, ranges)
  • payload.required — whether a payload is mandatory on notify
  • storage_policy — per-stream retention, size limits, compression (JetStream only)

Example:

notification_schema:
  extreme_event:
    topic:
      base: "extreme_event"
      key_order: ["region", "run_time", "severity", "anomaly", "polygon"]
    identifier:
      region:
        description: "Geographic region label."
        type: EnumHandler
        values: ["north", "south", "east", "west"]
        required: true
      run_time:
        type: TimeHandler
        required: true
      severity:
        description: "Severity level from 1 to 7."
        type: IntHandler
        range: [1, 7]
        required: true
      anomaly:
        type: FloatHandler
        range: [0.0, 100.0]
        required: false
      polygon:
        type: PolygonHandler
        required: false
    payload:
      required: false

Every key listed in key_order must have a corresponding entry in identifier. For notify, every identifier key declared in the schema must be present in the request. Fields marked required: false are optional for validation semantics, but the key itself is still required on notify so Aviso can build a deterministic topic.


Payload

A payload is arbitrary JSON attached to a notification. Aviso treats it as opaque — it stores and replays the value exactly as sent.

Valid payload types: object, array, string, number, boolean, or null.

{ "payload": { "path": "/data/grib2/file.grib2", "size": 1048576 } }

Whether payload is required or optional is controlled per schema by payload.required. See Payload Contract for the full input → storage → replay mapping.


Operations: Notify, Watch, Replay

Aviso exposes three operations, each on its own endpoint:

Notify — POST /api/v1/notification

Publishes a notification to the backend. The identifier must match all required schema fields exactly (no wildcards, no constraints).

Watch — POST /api/v1/watch

Opens a persistent Server-Sent Events (SSE) stream. Receives live notifications as they arrive, optionally starting from a historical point.

  • Omit from_id/from_date → live-only stream
  • Provide one of them → historical replay first, then live

Replay — POST /api/v1/replay

Opens a finite SSE stream of historical notifications only. Requires exactly one of from_id or from_date. Stream closes automatically when history is exhausted.

For end-to-end working examples of all three operations — including spatial and constraint filtering — see Practical Examples.


CloudEvents

Aviso delivers notifications to watch/replay clients as CloudEvents — a standard envelope format. Each event includes:

  • id — the backend sequence reference (e.g. mars@42), used for targeted delete or resume
  • type — the Aviso event type string
  • source — the server base URL
  • data.identifier — the canonicalized identifier
  • data.payload — the notification payload (or null if omitted)

Backend

The backend is the storage and messaging layer that Aviso delegates to. Two backends are supported:

Backend    Use case
jetstream  Production: durable, replicated, persistent history
in_memory  Development: fast setup, no persistence

The backend is selected via notification_backend.kind in config. See Backends Overview.

Installation

This page covers every way to get Aviso Server running: building from source, using Docker, or deploying to Kubernetes with Helm.


Prerequisites

Rust toolchain

Aviso is written in Rust and requires edition 2024, which means Rust 1.85 or newer. The CI always builds against the latest stable toolchain.

Install or update Rust via rustup:

curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh

Verify your version:

rustc --version
# rustc 1.85.0 (... ) or newer

System dependencies

Aviso links against OpenSSL. On Debian/Ubuntu:

sudo apt-get install -y libssl-dev pkg-config build-essential

On Fedora/RHEL:

sudo dnf install -y openssl-devel pkg-config gcc

On macOS (with Homebrew):

brew install openssl pkg-config

Build from Source

Clone the repository:

git clone https://github.com/ecmwf/aviso-server.git
cd aviso-server

Development build

Fast to compile, includes debug symbols:

cargo build

Binary location: target/debug/aviso_server

Release build

Optimized for production use:

cargo build --release

Binary location: target/release/aviso_server

Run directly

cargo run                          # development
cargo run --release                # release
./target/release/aviso_server      # pre-built binary

The server loads ./configuration/config.yaml by default. See Configuration for all config loading options.


Docker

The repository includes a multi-stage Dockerfile that produces a minimal distroless image.

Build the image

# Production image (distroless, minimal attack surface)
docker build --target release -t aviso-server:local .

# Debug image (Debian slim, includes bash for troubleshooting)
docker build --target debug -t aviso-server:debug .

Run with Docker

Mount your config file and expose the port:

docker run --rm \
  -p 8000:8000 \
  -v $(pwd)/configuration/config.yaml:/app/configuration/config.yaml:ro \
  aviso-server:local

Or override settings via environment variables (no config mount needed):

docker run --rm \
  -p 8000:8000 \
  -e AVISOSERVER_APPLICATION__HOST=0.0.0.0 \
  -e AVISOSERVER_APPLICATION__PORT=8000 \
  -e AVISOSERVER_NOTIFICATION_BACKEND__KIND=in_memory \
  aviso-server:local
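
The naming scheme reads as a prefix plus a double-underscore-separated path into the YAML config. A sketch of the mapping, inferred from the examples above:

```python
def env_override_path(var_name: str, prefix: str = "AVISOSERVER_") -> list[str]:
    """Map an env var name to the nested config keys it overrides.

    '__' separates nesting levels; keys are lowercased. Inferred from the
    documented examples, not the server's own parsing code."""
    assert var_name.startswith(prefix), "unexpected prefix"
    return [part.lower() for part in var_name[len(prefix):].split("__")]
```

So AVISOSERVER_NOTIFICATION_BACKEND__KIND overrides notification_backend.kind.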

Build targets summary

Target   Base image            Size     Use
release  distroless/cc         minimal  Production
debug    debian:bookworm-slim  larger   Troubleshooting

Local JetStream (Docker)

For local development with the JetStream backend, use the provided script to spin up a NATS server with JetStream enabled:

./scripts/init_nats.sh

This script:

  • Generates a NATS config file in ./nats_config/
  • Creates a Docker volume for persistent JetStream storage
  • Starts a nats:2-alpine container on localhost:4222
  • Waits for the server to be ready and prints a connection summary

Requires: Docker

Optional environment variables:

NATS_PORT=4222            # NATS client port (default: 4222)
ENABLE_AUTH=true          # Enable token auth (default: false)
MAX_MEMORY=5GB            # JetStream memory limit (default: 5GB)
MAX_STORAGE=10GB          # JetStream file storage limit (default: 10GB)

Example with auth enabled:

ENABLE_AUTH=true ./scripts/init_nats.sh

The script prints the generated token at the end of its output, for example:

Authentication enabled with token: aviso_secure_token_1712345678

After the script completes, configure Aviso to connect:

Without auth (default):

notification_backend:
  kind: jetstream
  jetstream:
    nats_url: "nats://localhost:4222"

With auth — pass the token printed by the script:

notification_backend:
  kind: jetstream
  jetstream:
    nats_url: "nats://localhost:4222"
    token: "aviso_secure_token_1712345678"

Alternatively, set the token as an environment variable (Aviso reads NATS_TOKEN automatically):

export NATS_TOKEN=aviso_secure_token_1712345678
cargo run

Kubernetes / Helm

For production Kubernetes deployments, use the official Helm chart.

The chart handles:

  • Deployment with configurable replicas
  • ConfigMap-based configuration mounting
  • Service and Ingress setup
  • JetStream connection settings via values

Build Documentation

Aviso docs are built with mdBook.

Install mdBook and the mermaid preprocessor:

cargo install mdbook
cargo install mdbook-mermaid

Serve docs locally with live reload:

mdbook serve docs --open

Build static output to docs/book/:

mdbook build docs

Run Tests

# Unit and integration tests (in-memory backend)
cargo test --workspace

# Include JetStream integration tests (requires running NATS)
AVISO_RUN_NATS_TESTS=1 cargo test --workspace

# Tests must run single-threaded (shared port binding)
cargo test -- --test-threads=1

Getting Started

This guide walks you through running Aviso Server locally and sending your first notification. It assumes you have already completed Installation.

If you haven't read Key Concepts yet, do that first — it will make the commands below much easier to follow.


1. Choose a Backend

Aviso needs a storage backend before it can accept notifications. For local exploration, in-memory requires zero infrastructure.

Goal                                    Backend
Quick local test, no setup              in_memory
Persistent history, realistic behavior  jetstream

The examples on this page use in_memory. To use JetStream locally, see Local JetStream setup below.


2. Configure the Server

Open configuration/config.yaml and make sure it contains at minimum:

application:
  host: "127.0.0.1"
  port: 8000

notification_backend:
  kind: in_memory
  in_memory:
    max_history_per_topic: 100
    max_topics: 10000

notification_schema:
  my_event:
    topic:
      base: "my_event"
      key_order: ["region", "date"]
    identifier:
      region:
        description: "Geographic region label."
        type: EnumHandler
        values: ["north", "south", "east", "west"]
        required: true
      date:
        type: DateHandler
        required: true
    payload:
      required: false

You can also use environment variables to override any config value without editing the file. See Configuration for the full precedence rules.


3. Start the Server

cargo run

Or with the release binary:

./target/release/aviso_server

You should see structured JSON log output. Once you see a line like:

{"level":"INFO","message":"aviso-server listening","address":"127.0.0.1:8000"}

the server is ready.


4. Check the Health Endpoint

curl -sS http://127.0.0.1:8000/health

Expected response: 200 OK


5. Open a Watch Stream

Before publishing, open a terminal and start watching for events. This is a live SSE stream — keep it open while you proceed to step 6.

curl -N -X POST "http://127.0.0.1:8000/api/v1/watch" \
  -H "Content-Type: application/json" \
  -d '{
    "event_type": "my_event",
    "identifier": {
      "region": "north",
      "date":   "20250706"
    }
  }'

You will see the SSE connection frame immediately:

data: {"type":"connection_established","timestamp":"2026-03-04T10:00:00Z"}

The stream stays open and will print new events as they arrive.


6. Publish a Notification

In a second terminal, send a notification:

curl -sS -X POST "http://127.0.0.1:8000/api/v1/notification" \
  -H "Content-Type: application/json" \
  -d '{
    "event_type": "my_event",
    "identifier": {
      "region": "north",
      "date":   "20250706"
    },
    "payload": { "note": "data is ready" }
  }'

Expected response:

{ "id": "my_event@1", "topic": "my_event.north.20250706" }

The id (my_event@1) is the backend sequence reference. You can use it later to replay from that point or delete that specific notification.
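
The reference splits into a stream name and a numeric sequence at the @ separator. A trivial sketch of pulling it apart client-side, assuming the stream@sequence shape shown in the examples:

```python
def parse_event_id(event_id: str) -> tuple[str, int]:
    """Split a backend sequence reference like 'my_event@1' into (stream, seq)."""
    stream, seq = event_id.rsplit("@", 1)
    return stream, int(seq)
```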

Switch back to the watch terminal — the notification should have arrived:

data: {"specversion":"1.0","id":"my_event@1","type":"aviso.notification",...}

7. Replay History

Once you have published a few notifications, you can replay them from a specific point:

curl -N -X POST "http://127.0.0.1:8000/api/v1/replay" \
  -H "Content-Type: application/json" \
  -d '{
    "event_type": "my_event",
    "identifier": {
      "region": "north",
      "date":   "20250706"
    },
    "from_id": "1"
  }'

The stream will emit all matching historical notifications, then close with:

data: {"type":"connection_closing","reason":"end_of_stream","timestamp":"..."}

Optional: Local JetStream Setup

To test with the JetStream backend (persistent storage, more realistic), see Installation — Local JetStream for the full setup including environment variables and token authentication.


Run the Smoke Test

A Python smoke script covers the full notify → watch → replay cycle. Copy the example config and start the server:

cp configuration/config.yaml.example configuration/config.yaml
cargo run

With auth (default) — start auth-o-tron before running the smoke tests:

python3 -m pip install httpx
./scripts/auth-o-tron-docker.sh
python3 scripts/smoke_test.py

Without auth — set auth.enabled: false in your config (or remove the auth section), then:

AUTH_ENABLED=false python3 scripts/smoke_test.py

AUTH_ENABLED must match the server's auth.enabled setting. When false, auth headers are omitted and auth-specific smoke tests are skipped.

Useful overrides:

BASE_URL="http://127.0.0.1:8000" python3 scripts/smoke_test.py
BACKEND="jetstream"               python3 scripts/smoke_test.py
TIMEOUT_SECONDS=12                python3 scripts/smoke_test.py
SMOKE_VERBOSE=1                   python3 scripts/smoke_test.py
python3 scripts/smoke_test.py --verbose

The smoke script covers:

  • health endpoint
  • replay/watch baseline flows (test_polygon)
  • mars replay with dot-containing identifier values, integer and enum predicates
  • dissemination watch + from_date with dot-containing identifier values
  • read/write auth separation across public, role-restricted, and admin-only streams
  • (optional, off by default) ECPDS plugin allow/deny/notify-bypass (see below)

Optional: end-to-end ECPDS plugin smoke test

If your build has --features ecpds enabled and your config has a working ecpds: block pointing at your real ECPDS servers, the smoke script can verify the plugin end-to-end against that ECPDS. It's off by default.

Prerequisites:

  • The server must run with auth.enabled: true and auth.mode: direct. The smoke script sends HTTP Basic credentials, which Aviso forwards to auth-o-tron only in direct mode. Trusted-proxy mode would require an upstream proxy to mint a JWT, which is out of scope for the smoke script.

  • Your auth-o-tron config must know two users: an admin user (defaults admin-user / admin-pass) for the NOTIFY-bypass case, and your ECPDS user (ECPDS_ALLOWED_USER / ECPDS_ALLOWED_PASS) for the watch cases.

  • You need a destination value the ECPDS user is entitled to and one they are not (the latter can be a deliberately-fake string).

  • Add a minimal ECPDS test schema to your config, with match_key (e.g. destination) as the only required identifier field. The smoke test sends a minimal request body and does not populate any other required identifier fields. Don't point it at a richer schema like your production dissemination. Add this dedicated test schema instead:

    notification_schema:
      ecpds_test:
        payload:
          required: false
        topic:
          base: "ecpds_test"
          key_order: ["destination"]
        identifier:
          destination:
            type: StringHandler
            required: true
        auth:
          required: true
          plugins: ["ecpds"]
    

Then:

ECPDS_ENABLED=true \
  ECPDS_EVENT_TYPE=ecpds_test \
  ECPDS_MATCH_KEY=destination \
  ECPDS_ALLOWED_USER="<auth-o-tron-username>" \
  ECPDS_ALLOWED_PASS="<auth-o-tron-password>" \
  ECPDS_ALLOWED_DESTINATION="<destination-the-user-can-read>" \
  ECPDS_DENIED_DESTINATION="NOT-A-REAL-DEST" \
  AUTH_ADMIN_USER=admin-user \
  AUTH_ADMIN_PASS=admin-pass \
  python3 scripts/smoke_test.py

What the three ECPDS smoke cases verify:

Case                                                  What it asserts
ecpds: allowed user + entitled destination -> 200     POST /api/v1/watch returns HTTP 200 for ECPDS_ALLOWED_USER reading ECPDS_ALLOWED_DESTINATION.
ecpds: allowed user + DENIED destination -> 403       Same endpoint returns HTTP 403 for the same user reading ECPDS_DENIED_DESTINATION.
ecpds: notify on ECPDS-protected stream is not gated  POST /api/v1/notification returns 2xx for AUTH_ADMIN_USER. The plugin is read-only; a 503 here would mean it incorrectly ran on a write.

Troubleshooting:

  • All three skip with [INFO] skipping ECPDS smoke test → check ECPDS_ENABLED=true and that the required env vars are set.
  • The allow case fails with 400 and a "schema validator before the plugin" hint → your ECPDS_EVENT_TYPE schema has additional required identifier fields. Add the minimal test schema above, or simplify the schema you're pointing at. The schema validator rejecting the request before ECPDS runs is the correct behavior; the smoke test fails loudly here rather than papering over it.
  • The allow case fails with 503 → the issue is between Aviso and ECPDS rather than at the plugin layer; see the ECPDS Plugin Runbook.
  • The notify-bypass case fails with 401/403 → your AUTH_ADMIN_USER / AUTH_ADMIN_PASS don't match your auth-o-tron config; that's an auth setup issue, not an ECPDS issue.

Reporting a Problem

Every Aviso response carries an X-Request-ID HTTP header and (for error responses) a request_id field in the JSON body. Streaming responses also include the same UUID in the first SSE event. When something goes wrong, include this ID in the bug report so the operator can find the matching server logs in seconds. See API Errors for details.

What's Next

Defining Notification Schemas

A notification schema describes the shape of an event stream: what identifier fields are accepted, how they are validated, how the storage topic is constructed, whether a payload is required, and who can read or write.

Schemas are defined under the notification_schema key in your configuration file. Each top-level key becomes an event type that clients reference when calling /api/v1/notification, /api/v1/watch, or /api/v1/replay.

notification_schema:
  my_event:          # ← event type name
    topic: ...
    identifier: ...
    payload: ...
    auth: ...            # optional
    storage_policy: ...  # optional, JetStream only

Topic Configuration

A schema should have a topic block that tells Aviso how to build the NATS subject for storage and routing.

topic:
  base: "weather"
  key_order: ["region", "date"]

Field      Description
base       Root prefix for the subject. Must be unique across all schemas (case-insensitive).
key_order  Ordered list of identifier field names appended to the base, separated by ".".

Given base: "weather" and key_order: ["region", "date"], a request with region=north and date=20250706 produces the subject:

weather.north.20250706

Values containing reserved characters (., *, >, %) are automatically percent-encoded so they do not interfere with NATS subject routing. See Topic Encoding for details.

Only fields listed in key_order contribute to the subject. Other identifier fields are validated but not part of the topic.


Identifier Fields

The identifier map defines the fields that clients can send. Each field specifies a handler type that controls validation and canonicalization.

identifier:
  region:
    type: EnumHandler
    values: ["north", "south", "east", "west"]
    required: true
    description: "Geographic region."
  date:
    type: DateHandler
    required: true

Every field supports these common properties:

Property     Type    Description
type         string  Handler type (see below). Required.
required     bool    If true, requests missing this field are rejected. Required.
description  string  Human-readable text exposed by GET /api/v1/schema. Optional.

Handler Types

StringHandler

Accepts any non-empty string. No transformation.

class:
  type: StringHandler
  max_length: 2      # optional: reject strings longer than this
  required: true

DateHandler

Parses dates in multiple formats and canonicalizes to a configured output format.

Accepted inputs: YYYY-MM-DD, YYYYMMDD, YYYY-DDD (day-of-year).

date:
  type: DateHandler
  canonical_format: "%Y%m%d"   # output format (default: "%Y%m%d")
  required: false

canonical_format  Example output
"%Y%m%d"          20250706
"%Y-%m-%d"        2025-07-06

Invalid dates (e.g. February 30) are rejected.
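
The parsing rule can be sketched as "try each accepted input format, then re-emit in the canonical format". This is an illustration of the documented behavior, not the handler's actual Rust code:

```python
from datetime import datetime

def canonical_date(value: str, out_fmt: str = "%Y%m%d") -> str:
    """Try each accepted input format; emit the canonical form (a sketch)."""
    for in_fmt in ("%Y-%m-%d", "%Y%m%d", "%Y-%j"):   # ISO, compact, day-of-year
        try:
            return datetime.strptime(value, in_fmt).strftime(out_fmt)
        except ValueError:
            continue
    raise ValueError(f"invalid date: {value!r}")
```

Note that the day-of-year form 2025-187 canonicalizes to the same value as 2025-07-06, and impossible dates like February 30 fail every format.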

TimeHandler

Parses times and canonicalizes to four-digit HHMM format.

Accepted inputs: 14:30, 1430, 14, 9:05.

time:
  type: TimeHandler
  required: false

Input 14:30 → stored as 1430. Input 9 → stored as 0900.
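
A sketch of that canonicalization, assuming the documented examples (hour-only inputs get :00 appended; out-of-range values are rejected):

```python
def canonical_time(value: str) -> str:
    """Canonicalize a time string to four-digit HHMM (a sketch)."""
    digits = value.replace(":", "")
    if not digits.isdigit():
        raise ValueError(f"invalid time: {value!r}")
    if len(digits) <= 2:                  # hour only, e.g. "14" or "9"
        hh, mm = int(digits), 0
    else:                                 # "1430", or "905" from "9:05"
        hh, mm = int(digits[:-2]), int(digits[-2:])
    if not (0 <= hh <= 23 and 0 <= mm <= 59):
        raise ValueError(f"time out of range: {value!r}")
    return f"{hh:02d}{mm:02d}"
```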

EnumHandler

Accepts one value from a predefined list. Matching is case-insensitive; stored in lowercase.

domain:
  type: EnumHandler
  values: ["a", "b", "c"]
  required: false

Input "A" → stored as "a". Input "x" → rejected.

IntHandler

Accepts integer strings. Strips leading zeros for canonical storage.

step:
  type: IntHandler
  range: [0, 100000]   # optional: inclusive [min, max] bounds
  required: false

Input "007" → stored as "7". Input "-1" with range: [0, 100] → rejected.

FloatHandler

Accepts floating-point strings. Rejects NaN and Inf.

severity:
  type: FloatHandler
  range: [0.0, 10.0]   # optional: inclusive [min, max] bounds
  required: false

Input "3.14" → stored as "3.14". Input "NaN" → rejected.

ExpverHandler

Experiment version handler. Numeric values are zero-padded to four digits; non-numeric values are lowercased.

expver:
  type: ExpverHandler
  default: "0001"    # optional: used when the field is empty
  required: false

Input "1" → stored as "0001". Input "test" → stored as "test".
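
The rule sketches to a one-liner, assuming the documented examples and the default fallback for empty values:

```python
def canonical_expver(value: str, default: str = "0001") -> str:
    """Zero-pad numeric expvers to four digits; lowercase everything else."""
    v = value.strip() or default          # empty field falls back to the default
    return v.zfill(4) if v.isdigit() else v.lower()
```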

PolygonHandler

Accepts a closed polygon as a coordinate string. Used for spatial filtering on /watch and /replay.

Format: lat,lon,lat,lon,...,lat,lon — the first and last coordinate pair must be identical to close the polygon. Parentheses are optional.

polygon:
  type: PolygonHandler
  required: true

Constraints: at least 3 coordinate pairs (plus closing repeat), latitude in [-90, 90], longitude in [-180, 180].

See Spatial Filtering for usage examples.

Reserved Query-Time Fields

point (built-in)

The point field is a reserved identifier that clients can send on /watch or /replay to filter notifications whose polygon contains the given point. It accepts a single lat,lon coordinate pair.

point is not a schema-configurable handler — it is always available on any schema that includes a PolygonHandler field. The /notification endpoint rejects requests that include point.

See Spatial Filtering for usage examples.
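
For intuition, point containment can be sketched with even-odd ray casting on a flat lat/lon plane. This is a textbook illustration under that flat-plane assumption; it is not necessarily the server's algorithm (geodesic edge cases, the antimeridian, and the poles are ignored here):

```python
def parse_pairs(s: str) -> list[tuple[float, float]]:
    """Parse 'lat,lon,lat,lon,...' (parentheses optional) into (lat, lon) pairs."""
    nums = [float(t) for t in
            s.replace("(", " ").replace(")", " ").replace(",", " ").split()]
    return list(zip(nums[0::2], nums[1::2]))

def polygon_contains(polygon: list[tuple[float, float]],
                     point: tuple[float, float]) -> bool:
    """Even-odd ray casting over a closed polygon (first pair == last pair)."""
    lat, lon = point
    inside = False
    for (lat1, lon1), (lat2, lon2) in zip(polygon, polygon[1:]):
        if (lon1 > lon) != (lon2 > lon):          # edge crosses the ray's longitude
            t = (lon - lon1) / (lon2 - lon1)
            if lat < lat1 + t * (lat2 - lat1):    # crossing lies beyond the point
                inside = not inside
    return inside
```

For example, the closed square 0,0,0,10,10,10,10,0,0,0 contains the point 5,5 but not 20,20.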


Payload Configuration

Controls whether requests must include a payload field.

payload:
  required: true

required  Behavior
true      Requests without a payload are rejected (400).
false     Payload is optional; missing payloads are stored as JSON null.

The payload can be any valid JSON value (object, array, string, number, boolean, null). It is stored as-is with no reshaping.

See Payload Contract for full semantics.


Per-Stream Authentication

When global authentication is enabled, individual schemas can require credentials and restrict access by role.

auth:
  required: true
  read_roles:
    localrealm: ["analyst", "consumer"]
  write_roles:
    localrealm: ["producer"]

Field        Default when omitted             Effect
required     (no default)                     Must be set explicitly to true or false.
read_roles   Any authenticated user can read  Maps realm → role list for watch/replay access.
write_roles  Only admins can write            Maps realm → role list for notify access.

Use ["*"] as the role list to grant access to all users from a realm.

Admins (users matching global admin_roles) always have both read and write access.

Omitting the entire auth block makes the stream publicly accessible, even when global auth is enabled.

See Authentication for the full access-control matrix and role-matching rules.


Storage Policy (JetStream Only)

When using the JetStream backend, you can configure per-stream retention limits.

storage_policy:
  retention_time: "7d"
  max_messages: 500000
  max_size: "2Gi"
  allow_duplicates: false
  compression: true

Field             Type      Description
retention_time    duration  Discard messages older than this. Accepts 30m, 1h, 7d, 1w.
max_messages      integer   Maximum message count; oldest are discarded when exceeded.
max_size          size      Maximum stream size. Accepts 100Mi, 1Gi, etc.
allow_duplicates  bool      Allow duplicate message IDs. Default: backend-specific.
compression       bool      Enable message-level compression. Default: backend-specific.

All fields are optional. Omitting storage_policy entirely uses backend defaults.

The in-memory backend does not support storage policies.


Complete Example

This example defines a weather alert stream with date/region routing, enum validation, optional payload, and role-restricted access.

notification_schema:
  weather_alert:
    payload:
      required: false

    topic:
      base: "alert"
      key_order: ["region", "severity_level", "date"]

    identifier:
      region:
        description: "Geographic region."
        type: EnumHandler
        values: ["europe", "asia", "africa", "americas", "oceania"]
        required: true
      severity_level:
        description: "Alert severity (1–5)."
        type: IntHandler
        range: [1, 5]
        required: true
      date:
        description: "Alert date."
        type: DateHandler
        canonical_format: "%Y%m%d"
        required: true
      issued_by:
        description: "Issuing authority identifier."
        type: StringHandler
        max_length: 64
        required: false

    auth:
      required: true
      read_roles:
        operations: ["*"]
      write_roles:
        operations: ["forecaster", "admin"]

    storage_policy:
      retention_time: "30d"
      max_messages: 100000

With this schema:

  • Publishing a notification with region=europe, severity_level=3, date=2025-07-06 produces the subject alert.europe.3.20250706.
  • The issued_by field is validated if present but does not appear in the subject (not in key_order).
  • Any authenticated user in the operations realm can watch/replay.
  • Only users with the forecaster or admin role can publish.
  • JetStream retains up to 100,000 messages or 30 days, whichever limit is hit first.

Tips

  • Start simple. Define only topic, one or two identifier fields, and payload. Add auth and storage policy later.
  • Use key_order deliberately. Fields in key_order become part of the NATS subject and affect routing granularity. More fields = more specific topics = more efficient filtering, but also more distinct subjects.
  • Mark routing fields required. If a field is in key_order, consider making it required: true so every notification produces a complete subject.
  • Keep base short and unique. It is the root of every subject in this stream. Avoid collisions with other schemas.
  • Test with GET /api/v1/schema/{event_type}. This endpoint returns the public view of your schema, showing all identifier fields and their validation rules.

Practical Examples

This section provides copy-paste examples for common workflows.

All examples use the same generic event schema so behavior is easy to compare.

Shared Generic Schema

notification_schema:
  extreme_event:
    topic:
      base: extreme_event
      key_order: [region, run_time, severity, anomaly, polygon]
    identifier:
      region:
        description: "Geographic region label."
        type: EnumHandler
        values: ["north", "south", "east", "west"]
        required: true
      run_time:
        type: TimeHandler
        required: true
      severity:
        description: "Severity level from 1 to 7."
        type: IntHandler
        range: [1, 7]
        required: true
      anomaly:
        type: FloatHandler
        range: [0.0, 100.0]
        required: false
      polygon:
        type: PolygonHandler
        required: false
    payload:
      required: false

Shared Assumptions

  • Base URL: http://127.0.0.1:8000
  • Content type: application/json
  • Replay examples use from_id or from_date explicitly.

Basic Notify/Watch/Replay

Uses the shared generic schema from Practical Examples.

This page is the quickest way to understand the normal API flow. You first publish (notify), then observe live updates (watch), then read history (replay). If you are onboarding a new environment, start here before trying filters or admin operations. Read the examples in order.

1) Notify

curl -sS -X POST "http://127.0.0.1:8000/api/v1/notification" \
  -H "Content-Type: application/json" \
  -d '{
    "event_type":"extreme_event",
    "identifier":{
      "region":"north",
      "run_time":"1200",
      "severity":"4",
      "anomaly":"42.5"
    },
    "payload":{"note":"initial forecast"}
  }'

Expected:

  • HTTP 200
  • required identifier keys must be present; optional keys may be omitted

2) Watch (Live Only)

curl -N -X POST "http://127.0.0.1:8000/api/v1/watch" \
  -H "Content-Type: application/json" \
  -d '{
    "event_type":"extreme_event",
    "identifier":{
      "region":"north",
      "run_time":"1200",
      "severity":"4",
      "anomaly":"42.5"
    }
  }'

Expected:

  • HTTP 200
  • SSE starts with connection_established
  • only new matching notifications arrive

3) Replay (Historical)

curl -N -X POST "http://127.0.0.1:8000/api/v1/replay" \
  -H "Content-Type: application/json" \
  -d '{
    "event_type":"extreme_event",
    "identifier":{
      "region":"north",
      "run_time":"1200",
      "severity":"4",
      "anomaly":"42.5"
    },
    "from_id":"1"
  }'

Expected:

  • HTTP 200
  • SSE emits replay_started, replay events, replay_completed, then closes

Constraint Filtering

Uses the shared generic schema from Practical Examples.

Constraint filtering lets subscribers express conditions over identifier fields instead of exact values. In practice, this is how you ask for ranges (severity >= 5), numeric bands, or enum subsets. This page starts with seed data, then shows valid constraint requests, then common failure cases. It is the best reference for building client-side filter payloads.

Seed Notifications

curl -sS -X POST "http://127.0.0.1:8000/api/v1/notification" \
  -H "Content-Type: application/json" \
  -d '{
    "event_type":"extreme_event",
    "identifier":{"region":"north","run_time":"1200","severity":"3","anomaly":42.5},
    "payload":{"note":"seed-a"}
  }'

curl -sS -X POST "http://127.0.0.1:8000/api/v1/notification" \
  -H "Content-Type: application/json" \
  -d '{
    "event_type":"extreme_event",
    "identifier":{"region":"south","run_time":"1200","severity":"6","anomaly":87.2},
    "payload":{"note":"seed-b"}
  }'

Expected:

  • both return HTTP 200

Scalar Value (Implicit eq)

curl -N -X POST "http://127.0.0.1:8000/api/v1/replay" \
  -H "Content-Type: application/json" \
  -d '{
    "event_type":"extreme_event",
    "identifier":{"region":"south","run_time":"1200","severity":6,"anomaly":87.2},
    "from_id":"1"
  }'

Expected:

  • HTTP 200
  • only severity = 6 notifications match

Integer Constraint (gte)

curl -N -X POST "http://127.0.0.1:8000/api/v1/replay" \
  -H "Content-Type: application/json" \
  -d '{
    "event_type":"extreme_event",
    "identifier":{
      "region":{"in":["north","south"]},
      "run_time":"1200",
      "severity":{"gte":5},
      "anomaly":87.2
    },
    "from_id":"1"
  }'

Expected:

  • HTTP 200
  • includes severity=6
  • excludes severity=3

Float Constraint (between)

curl -N -X POST "http://127.0.0.1:8000/api/v1/replay" \
  -H "Content-Type: application/json" \
  -d '{
    "event_type":"extreme_event",
    "identifier":{
      "region":"north",
      "run_time":"1200",
      "severity":"3",
      "anomaly":{"between":[40.0,50.0]}
    },
    "from_id":"1"
  }'

Expected:

  • HTTP 200
  • includes anomaly=42.5

Float eq Is Exact (No Tolerance)

Float eq and in are exact comparisons. This keeps behavior deterministic across replay/live and avoids hidden tolerance windows.

curl -N -X POST "http://127.0.0.1:8000/api/v1/replay" \
  -H "Content-Type: application/json" \
  -d '{
    "event_type":"extreme_event",
    "identifier":{
      "region":"north",
      "run_time":"1200",
      "severity":"3",
      "anomaly":{"eq":42.5}
    },
    "from_id":"1"
  }'

Expected:

  • HTTP 200
  • only notifications with exactly anomaly=42.5 match
  • NaN/inf values are rejected by float validation/constraints

Enum Constraint (in)

curl -N -X POST "http://127.0.0.1:8000/api/v1/watch" \
  -H "Content-Type: application/json" \
  -d '{
    "event_type":"extreme_event",
    "identifier":{
      "region":{"in":["south","west"]},
      "run_time":"1200",
      "severity":"6",
      "anomaly":87.2
    }
  }'

Expected:

  • HTTP 200
  • live notifications pass only for regions in ["south","west"]

Invalid: Two Operators in One Constraint Object

curl -sS -X POST "http://127.0.0.1:8000/api/v1/replay" \
  -H "Content-Type: application/json" \
  -d '{
    "event_type":"extreme_event",
    "identifier":{
      "region":"north",
      "run_time":"1200",
      "severity":{"gte":4,"lt":7},
      "anomaly":42.5
    },
    "from_id":"1"
  }'

Expected:

  • HTTP 400
  • message says constraint object must contain exactly one operator
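The single-operator rule can be sketched as a small validator (hypothetical helper; the operator set below is assembled from the operators shown on this page and may not be exhaustive):

```python
# Sketch of the documented rule: a constraint object must contain exactly
# one operator; bare scalars are treated as an implicit "eq".
OPERATORS = {"eq", "gte", "gt", "lte", "lt", "in", "between"}  # assumed set

def validate_constraint(value):
    """Return (op, operand); reject objects that do not have exactly one operator."""
    if not isinstance(value, dict):
        return ("eq", value)  # scalar -> implicit eq
    ops = set(value) & OPERATORS
    if len(value) != 1 or len(ops) != 1:
        raise ValueError("constraint object must contain exactly one operator")
    op = next(iter(ops))
    return (op, value[op])

print(validate_constraint({"gte": 5}))        # ('gte', 5)
print(validate_constraint(6))                 # ('eq', 6)
try:
    validate_constraint({"gte": 4, "lt": 7})  # two operators -> rejected
except ValueError as e:
    print(e)
```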

Invalid: Constraint Object on /notification

curl -sS -X POST "http://127.0.0.1:8000/api/v1/notification" \
  -H "Content-Type: application/json" \
  -d '{
    "event_type":"extreme_event",
    "identifier":{
      "region":"north",
      "run_time":"1200",
      "severity":{"gte":4},
      "anomaly":42.5
    },
    "payload":{"note":"should-fail"}
  }'

Expected:

  • HTTP 400

Spatial Filtering

Uses the shared generic schema from Practical Examples.

Spatial filtering has two modes:

  • identifier.polygon: keep notifications whose polygon intersects the request polygon.
  • identifier.point: keep notifications whose polygon contains the request point.

This matters when many notifications share similar non-spatial identifiers and you need geographic precision.
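For intuition, point containment can be sketched with the classic ray-casting test (an illustration only; the server's geometry handling, including edge and winding cases, may differ):

```python
# Sketch: count how many polygon edges a ray from the point (in the +x
# direction) crosses; an odd count means the point is inside.
def point_in_polygon(point, polygon):
    """polygon: list of (lat, lon) vertices with first == last; point: (lat, lon)."""
    x, y = point
    inside = False
    for (x1, y1), (x2, y2) in zip(polygon, polygon[1:]):
        if (y1 > y) != (y2 > y):  # edge straddles the ray's y level
            x_cross = x1 + (y - y1) * (x2 - x1) / (y2 - y1)
            if x < x_cross:
                inside = not inside
    return inside

# poly-a from the seed notifications below, parsed into vertex pairs.
poly_a = [(52.5, 13.4), (52.6, 13.5), (52.5, 13.6), (52.4, 13.5), (52.5, 13.4)]
print(point_in_polygon((52.55, 13.50), poly_a))  # True: inside poly-a
print(point_in_polygon((10.10, 10.10), poly_a))  # False: outside poly-a
```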

Seed Notifications

These two notifications differ only by polygon shape.

curl -sS -X POST "http://127.0.0.1:8000/api/v1/notification" \
  -H "Content-Type: application/json" \
  -d '{
    "event_type":"extreme_event",
    "identifier":{
      "region":"north",
      "run_time":"1200",
      "severity":"4",
      "polygon":"(52.5,13.4,52.6,13.5,52.5,13.6,52.4,13.5,52.5,13.4)"
    },
    "payload":{"note":"poly-a"}
  }'

curl -sS -X POST "http://127.0.0.1:8000/api/v1/notification" \
  -H "Content-Type: application/json" \
  -d '{
    "event_type":"extreme_event",
    "identifier":{
      "region":"north",
      "run_time":"1200",
      "severity":"4",
      "polygon":"(10.0,10.0,10.2,10.0,10.2,10.2,10.0,10.2,10.0,10.0)"
    },
    "payload":{"note":"poly-b"}
  }'

Expected:

  • both requests return HTTP 200

Replay with Polygon Intersection Filter

This request polygon intersects poly-a but not poly-b.

curl -N -X POST "http://127.0.0.1:8000/api/v1/replay" \
  -H "Content-Type: application/json" \
  -d '{
    "event_type":"extreme_event",
    "identifier":{
      "region":"north",
      "run_time":"1200",
      "severity":"4",
      "polygon":"(52.52,13.45,52.62,13.55,52.52,13.65,52.42,13.55,52.52,13.45)"
    },
    "from_id":"1"
  }'

Expected:

  • HTTP 200
  • replay includes poly-a
  • replay excludes poly-b

Replay with Point Containment Filter

The point below is inside poly-a and outside poly-b.

curl -N -X POST "http://127.0.0.1:8000/api/v1/replay" \
  -H "Content-Type: application/json" \
  -d '{
    "event_type":"extreme_event",
    "identifier":{
      "region":"north",
      "run_time":"1200",
      "severity":"4",
      "point":"52.55,13.50"
    },
    "from_id":"1"
  }'

Expected:

  • HTTP 200
  • replay includes poly-a
  • replay excludes poly-b

Replay Without Spatial Filter

No polygon and no point means no spatial narrowing.

curl -N -X POST "http://127.0.0.1:8000/api/v1/replay" \
  -H "Content-Type: application/json" \
  -d '{
    "event_type":"extreme_event",
    "identifier":{
      "region":"north",
      "run_time":"1200",
      "severity":"4"
    },
    "from_id":"1"
  }'

Expected:

  • HTTP 200
  • replay may include both poly-a and poly-b because only non-spatial fields are applied

Invalid: polygon and point Together

curl -sS -X POST "http://127.0.0.1:8000/api/v1/replay" \
  -H "Content-Type: application/json" \
  -d '{
    "event_type":"extreme_event",
    "identifier":{
      "region":"north",
      "run_time":"1200",
      "severity":"4",
      "polygon":"(52.5,13.4,52.6,13.5,52.5,13.6,52.4,13.5,52.5,13.4)",
      "point":"52.55,13.50"
    },
    "from_id":"1"
  }'

Expected:

  • HTTP 400
  • validation error says both spatial filters cannot be used together

Replay Starting Points

Uses the shared generic schema from Practical Examples.

Replay start parameters control where historical delivery begins. Choose from_id when you track sequence progress; choose from_date when you track wall-clock time. These examples cover valid forms and the common invalid combinations that return 400. Use this page to validate client retry and resume logic.
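A minimal resume sketch, assuming from_id delivery is inclusive (so a client resumes at the last stored sequence plus one); next_replay_request is a hypothetical helper, not part of the API:

```python
# Sketch: build the next replay body from the highest sequence already stored.
def next_replay_request(base_request, last_seen_id):
    """Resume just past the last stored id; drop from_date, since the two
    start parameters are mutually exclusive."""
    req = dict(base_request)
    req.pop("from_date", None)
    req["from_id"] = str(last_seen_id + 1)
    return req

base = {"event_type": "extreme_event",
        "identifier": {"region": "north", "run_time": "1200", "severity": "4"},
        "from_date": "2026-03-01T12:00:00Z"}
print(next_replay_request(base, last_seen_id=10)["from_id"])  # "11"
```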

Replay from Sequence (from_id)

curl -N -X POST "http://127.0.0.1:8000/api/v1/replay" \
  -H "Content-Type: application/json" \
  -d '{
    "event_type":"extreme_event",
    "identifier":{"region":"north","run_time":"1200","severity":"4","anomaly":"42.5"},
    "from_id":"10"
  }'

Expected:

  • HTTP 200
  • replay starts from sequence 10 (inclusive)

Replay from Time (from_date) RFC3339

curl -N -X POST "http://127.0.0.1:8000/api/v1/replay" \
  -H "Content-Type: application/json" \
  -d '{
    "event_type":"extreme_event",
    "identifier":{"region":"north","run_time":"1200","severity":"4","anomaly":"42.5"},
    "from_date":"2026-03-01T12:00:00Z"
  }'

Expected:

  • HTTP 200
  • replay starts from that UTC timestamp (inclusive)

Replay from Time (from_date) Unix Seconds

curl -N -X POST "http://127.0.0.1:8000/api/v1/replay" \
  -H "Content-Type: application/json" \
  -d '{
    "event_type":"extreme_event",
    "identifier":{"region":"north","run_time":"1200","severity":"4","anomaly":"42.5"},
    "from_date":"1740509903"
  }'

Expected:

  • HTTP 200

Replay from Time (from_date) Unix Milliseconds

curl -N -X POST "http://127.0.0.1:8000/api/v1/replay" \
  -H "Content-Type: application/json" \
  -d '{
    "event_type":"extreme_event",
    "identifier":{"region":"north","run_time":"1200","severity":"4","anomaly":"42.5"},
    "from_date":"1740509903710"
  }'

Expected:

  • HTTP 200

Invalid Replay Start Combinations

Missing Both from_id and from_date

curl -sS -X POST "http://127.0.0.1:8000/api/v1/replay" \
  -H "Content-Type: application/json" \
  -d '{
    "event_type":"extreme_event",
    "identifier":{"region":"north","run_time":"1200","severity":"4","anomaly":"42.5"}
  }'

Expected:

  • HTTP 400

Both from_id and from_date Provided

curl -sS -X POST "http://127.0.0.1:8000/api/v1/watch" \
  -H "Content-Type: application/json" \
  -d '{
    "event_type":"extreme_event",
    "identifier":{"region":"north","run_time":"1200","severity":"4","anomaly":"42.5"},
    "from_id":"5",
    "from_date":"2026-03-01T12:00:00Z"
  }'

Expected:

  • HTTP 400

Admin Operations (Practical)

See full reference in Admin Operations.

These operations are for cleanup and recovery, not normal data flow. Use delete when one bad record must be removed; use wipe when resetting a stream or environment. Because these endpoints are destructive, validate IDs and stream names carefully before execution.

Delete One Notification by ID

curl -X DELETE "http://127.0.0.1:8000/api/v1/admin/notification/extreme_event@42"

Expected:

  • 200 if it exists
  • 404 if stream/sequence does not exist
  • 400 for invalid format

Wipe One Stream

curl -X DELETE "http://127.0.0.1:8000/api/v1/admin/wipe/stream" \
  -H "Content-Type: application/json" \
  -d '{"stream_name":"EXTREME_EVENT"}'

Expected:

  • 200
  • stream definition remains, messages are removed

Wipe All Streams

curl -X DELETE "http://127.0.0.1:8000/api/v1/admin/wipe/all"

Expected:

  • 200
  • all stream data is removed

Backends Overview

Aviso abstracts all storage and messaging behind a NotificationBackend trait. Two implementations ship out of the box.


Which Backend Should I Use?

flowchart TD
    A["What do you need?"] --> B{"Persistent history<br/>across restarts?"}
    B -->|Yes| C[jetstream]
    B -->|No| D{"Multiple replicas<br/>or pods?"}
    D -->|Yes| C
    D -->|No| E{"Production<br/>workload?"}
    E -->|Yes| C
    E -->|No| F[in_memory]

    C:::jet
    F:::mem

    classDef jet fill:#1a6b3a,color:#fff,stroke:#0d4a27
    classDef mem fill:#1a4d6b,color:#fff,stroke:#0d3347

| Requirement | Recommended backend |
|---|---|
| Persistent history across restarts | jetstream |
| Replay endpoint support | jetstream (or in_memory for local/node-local use) |
| Live watch streaming support | jetstream (or in_memory for local/node-local use) |
| Multi-replica deployment | jetstream |
| Quick local experimentation with minimal setup | in_memory |

Capability Comparison

| Capability | JetStream | In-Memory |
|---|---|---|
| Durable storage | Yes | No — data lost on restart |
| Replay support | Yes | Yes — node-local only |
| Live watch support | Yes | Yes — node-local fan-out |
| Multi-replica / HA | Yes (clustered NATS) | No |
| Per-schema storage policy | Yes | No — rejected at startup |
| Cross-instance consistency | Yes | No |

Backend Details

InMemory Backend

Intended use

The in_memory backend is best for:

  • local development,
  • schema/validation testing,
  • lightweight experimentation where persistence is not required.

Behavior

  • Data is process-memory only and is lost on restart.
  • Topic/message limits are enforced with eviction.
  • No shared state across replicas or pods.
  • Supports live watch subscriptions (live-only delivery).
  • Supports replay batch retrieval for from_id and from_date.
  • Uses in-process fanout only, so subscriptions/replay are node-local.
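The limit semantics can be sketched as a bounded per-topic store (a Python illustration of the documented knobs; the actual backend is not implemented this way):

```python
# Sketch: per-topic history capped at max_history_per_topic, total topics
# capped at max_topics, with oldest-first eviction on both limits.
from collections import OrderedDict, deque

class InMemoryHistory:
    def __init__(self, max_history_per_topic=1, max_topics=10000):
        self.max_history = max_history_per_topic
        self.max_topics = max_topics
        self.topics = OrderedDict()  # topic -> deque of messages

    def publish(self, topic, message):
        if topic not in self.topics and len(self.topics) >= self.max_topics:
            self.topics.popitem(last=False)  # evict the oldest topic
        q = self.topics.setdefault(topic, deque(maxlen=self.max_history))
        q.append(message)  # deque drops the oldest message when full

store = InMemoryHistory(max_history_per_topic=1, max_topics=2)
store.publish("extreme_event.north.1200.4", {"note": "a"})
store.publish("extreme_event.north.1200.4", {"note": "b"})  # evicts "a"
print(list(store.topics["extreme_event.north.1200.4"]))  # [{'note': 'b'}]
```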

Configuration

notification_backend.kind: in_memory

Available knobs:

  • max_history_per_topic (default 1)
  • max_topics (default 10000)
  • enable_metrics (default false)

Per-schema storage_policy fields are currently not supported on in_memory and are rejected at startup.

Production suitability

Not recommended for production because:

  • no durability,
  • no HA replication,
  • no cross-instance consistency,
  • replay/watch history is limited to local in-memory retention.

JetStream Backend

The jetstream backend is the production-oriented storage implementation. It connects to a NATS server with JetStream enabled and uses it for durable message storage, replay, and live streaming.


Intended Use

Use jetstream when you need:

  • durable storage that survives server restarts
  • replay across multiple server instances
  • live streaming with cluster-wide fan-out
  • configurable retention, size limits, and compression

Local Test Setup

Start a NATS + JetStream instance via Docker:

./scripts/init_nats.sh

Then configure Aviso:

notification_backend:
  kind: jetstream
  jetstream:
    nats_url: "nats://localhost:4222"

For full setup options including authentication and storage limits, see Installation — Local JetStream.


Core Behavior

  • Connects to the configured NATS server on startup (with retry).
  • Creates JetStream streams on demand, one per topic base (e.g. MARS, DISS, POLYGON).
  • Publishes notifications directly to JetStream subjects using the encoded wire format.
  • Uses pull consumers for replay batching (from_id, from_date).
  • Uses push consumers for live watch subscriptions.
  • Reconciles existing streams against current config when they are first accessed.

Configuration Reference

All fields live under notification_backend.jetstream.

Connection & startup

| Field | Default | Notes |
|---|---|---|
| nats_url | nats://localhost:4222 | NATS server URL. |
| token | None | Token auth; falls back to NATS_TOKEN environment variable. |
| timeout_seconds | 30 | Per-attempt connection timeout (> 0). |
| retry_attempts | 3 | Startup connection attempts before backend init fails (> 0). |

Runtime reconnect

| Field | Default | Notes |
|---|---|---|
| enable_auto_reconnect | true | Enables/disables NATS client reconnect after startup. |
| max_reconnect_attempts | 5 | 0 means unlimited reconnect retries. |
| reconnect_delay_ms | 2000 | Delay between reconnect attempts and startup connect retries (> 0). |

Publish resilience

| Field | Default | Notes |
|---|---|---|
| publish_retry_attempts | 5 | Retries for transient channel closed publish failures (> 0). |
| publish_retry_base_delay_ms | 150 | Base backoff in ms for publish retries; grows exponentially per attempt (> 0). |
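As a sketch of the retry shape (the doubling factor is an assumption; the documentation above only says the delay grows exponentially per attempt):

```python
# Sketch: exponential publish-retry backoff. Delays are computed rather
# than slept so the growth is easy to inspect.
def backoff_delays_ms(attempts=5, base_delay_ms=150):
    """Delay before retry n (1-based): base * 2**(n-1), assuming doubling."""
    return [base_delay_ms * 2 ** (n - 1) for n in range(1, attempts + 1)]

print(backoff_delays_ms())  # [150, 300, 600, 1200, 2400]
```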

Stream defaults

These apply to every stream created by Aviso unless overridden by a per-schema storage_policy.

FieldDefaultNotes
max_messagesNoneStream message cap (maps to max_messages).
max_bytesNoneStream size cap in bytes (maps to max_bytes).
retention_timeNoneDefault max age: duration literal (s, m, h, d, w; e.g. 30d).
storage_typefilefile or memory — parsed as typed enum at config load.
replicasNoneStream replica count.
retention_policylimitslimits, interest, or workqueue — parsed as typed enum.
discard_policyoldold or new — parsed as typed enum.

Fail-fast validation: storage_type, retention_policy, and discard_policy are parsed as typed enums during configuration loading. Invalid values fail startup immediately, before any streams are created.

Full example

notification_backend:
  kind: jetstream
  jetstream:
    nats_url: "nats://localhost:4222"
    timeout_seconds: 30
    retry_attempts: 3
    enable_auto_reconnect: true
    max_reconnect_attempts: 5
    reconnect_delay_ms: 2000
    publish_retry_attempts: 5
    publish_retry_base_delay_ms: 150
    storage_type: file
    retention_policy: limits
    discard_policy: old

Stream Management

Stream creation

On first access (e.g. first publish for a given event type), Aviso creates a JetStream stream with the following settings applied:

  • storage_type, retention_policy, discard_policy
  • max_messages, max_bytes, retention_time (applied as the stream's max_age)
  • replicas

The stream subject binding is set to <base>.> (e.g. mars.>) to capture all topics under that base.

Reconciliation of existing streams

When a stream already exists and is accessed by Aviso, it is reconciled — the current stream config is compared against the desired config and mutable fields are updated if drift is detected:

  • limits (retention, size, message count)
  • compression
  • duplicate window
  • replicas
  • subject binding

If JetStream rejects an update (e.g. the field is not editable in the current server/stream state), Aviso logs a warning and continues with the existing stream configuration.

Precedence

Backend-level defaults are applied first, then per-schema storage_policy overrides for that stream:

notification_backend.jetstream.* (base defaults)
    ↓ overridden by
notification_schema.<event_type>.storage_policy.*
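The precedence can be sketched as a simple two-layer merge (hypothetical helper; field names follow the tables above):

```python
# Sketch: per-schema storage_policy values override backend-level stream
# defaults; unset fields fall back to the defaults.
def effective_stream_policy(backend_defaults, storage_policy):
    merged = dict(backend_defaults)
    merged.update({k: v for k, v in (storage_policy or {}).items()
                   if v is not None})
    return merged

defaults = {"retention_time": None, "max_messages": None, "storage_type": "file"}
policy = {"retention_time": "30d", "max_messages": 100000}
print(effective_stream_policy(defaults, policy))
# {'retention_time': '30d', 'max_messages': 100000, 'storage_type': 'file'}
```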

Applying config changes to existing streams

Changes to stream-affecting settings (e.g. compression, retention, limits) in config.yaml are applied to existing streams automatically during reconciliation when the stream is next accessed.

To force historical data to be physically rewritten with new settings (e.g. re-pack with compression):

  1. Stop all Aviso writers for the target stream.
  2. Delete the stream in NATS.
  3. Restart Aviso (or publish again) — the stream is recreated with current config.
# List streams
nats stream ls

# Delete a stream (example: DISS)
nats stream rm DISS

wipe_stream (admin endpoint) removes messages but preserves stream configuration. Use stream deletion only when you need historical data physically rewritten.


Verifying Effective Stream Policy

Use the nats CLI to inspect the stream config after a publish or reconcile:

# Replace POLYGON with your stream name (MARS, DISS, etc.)
nats --server nats://localhost:4222 stream info POLYGON

Fields to check:

| CLI field | Config field |
|---|---|
| Max Age | retention_time |
| Max Messages | max_messages |
| Max Bytes | max_bytes / per-schema max_size |
| Max Messages Per Subject | allow_duplicates: 1 = disabled, -1 = enabled |
| Compression | None or S2 |

Replay Behavior

  • Sequence replay (from_id): starts from that sequence number, inclusive.
  • Time replay (from_date): uses JetStream start-time delivery policy.
  • The API enforces mutual exclusivity — from_id and from_date cannot both be present.

Smoke Test (JetStream Mode)

python3 -m pip install httpx

BACKEND=jetstream \
NATS_URL=nats://localhost:4222 \
JETSTREAM_POLICY_STREAM_NAME=POLYGON \
EXPECT_MAX_MESSAGES=500000 \
EXPECT_MAX_BYTES=2147483648 \
EXPECT_MAX_MESSAGES_PER_SUBJECT=1 \
EXPECT_COMPRESSION=None \
python3 scripts/smoke_test.py

Operational Caveats

  • Startup connectivity is controlled by timeout_seconds + retry_attempts.
  • Runtime reconnect is controlled by enable_auto_reconnect, max_reconnect_attempts, reconnect_delay_ms.
  • Publish retry is a narrow resilience path for transient channel closed failures; non-transient failures fail fast.
  • retry_attempts applies only to startup; post-startup reconnect uses the reconnect settings.
  • Setting max_reconnect_attempts = 0 enables unlimited reconnect retries.

Deployment Modes

Local experimentation

Recommended backend:

  • in_memory for quick local request/validation testing.

Characteristics:

  • No persistence: data is lost on process restart.
  • Single-process state only.
  • Not suitable for horizontal scaling or replica failover.
  • Supports replay and watch in-process, limited by local memory retention.
  • For local JetStream testing, see Installation — Local JetStream.
  • For a quick end-to-end behavior check, see Getting Started — Run the Smoke Test.

Production-like / persistent mode

Recommended backend:

  • jetstream

Characteristics:

  • Durable message storage.
  • Retention and size limits.
  • Replica support (requires clustered NATS setup).
  • Supports replay and live streaming workflows.

Recommended packaging/deployment for Kubernetes:

Selection guideline

  • Need persistence/replay/streaming robustness: use jetstream.
  • Need fastest setup for local functional checks only: use in_memory (node-local replay/watch).

Configuration

Key Behaviors to Know First

Before diving into field-level settings, these rules affect how everything behaves at runtime:

  • Environment variables always win — they override any YAML value regardless of which file it came from.
  • Replay and watch behavior is controlled by request parameters, not static config switches.
  • Invalid policy values fail startup immediately: storage_type, retention_policy, and discard_policy are parsed as typed enums; bad values are caught before any streams are created.
  • Per-schema storage_policy is validated at startup against the selected backend's capabilities. Unsupported fields (e.g. retention_time on in_memory) cause a startup failure with a clear error.
  • JetStream stream changes are reconciled on access — updating compression, retention, or limits in config takes effect when that stream is next accessed. Recreate the stream only if you need historical data physically rewritten.
  • /api/v1/schema responses are client-focused — internal storage_policy settings are not exposed.

Loading Precedence

Configuration is loaded in this order (later sources override earlier ones):

  1. ./configuration/config.yaml
  2. /etc/aviso_server/config.yaml
  3. $HOME/.aviso_server/config.yaml
  4. Environment variables (highest precedence)

If AVISOSERVER_CONFIG_FILE is set, only that single file is loaded (steps 1–3 are skipped). Environment variables still override values from the file.

Environment variable format

Prefix: AVISOSERVER_
Nested separator: __

AVISOSERVER_APPLICATION__HOST=0.0.0.0
AVISOSERVER_APPLICATION__PORT=8000
AVISOSERVER_NOTIFICATION_BACKEND__KIND=jetstream
AVISOSERVER_NOTIFICATION_BACKEND__JETSTREAM__NATS_URL=nats://localhost:4222
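The mapping can be sketched as follows (an illustration; the lower-casing of key segments is an assumption based on the examples above):

```python
# Sketch: strip the AVISOSERVER_ prefix, split on the "__" nested
# separator, lower-case each segment, and set the value at that path.
def env_to_nested(env):
    config = {}
    for key, value in env.items():
        if not key.startswith("AVISOSERVER_"):
            continue
        path = key[len("AVISOSERVER_"):].lower().split("__")
        node = config
        for part in path[:-1]:
            node = node.setdefault(part, {})
        node[path[-1]] = value
    return config

env = {
    "AVISOSERVER_APPLICATION__PORT": "8000",
    "AVISOSERVER_NOTIFICATION_BACKEND__JETSTREAM__NATS_URL": "nats://localhost:4222",
}
print(env_to_nested(env))
# {'application': {'port': '8000'}, 'notification_backend': {'jetstream': {'nats_url': 'nats://localhost:4222'}}}
```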

Config File Structure

The top-level sections are:

| Section | Purpose |
|---|---|
| application | Server host, port, static files path |
| logging | Log level and format |
| auth | Authentication mode, secrets, admin roles |
| notification_backend | Backend selection and backend-specific settings |
| notification_schema | Per-event-type validation, topic ordering, storage policy |
| metrics | Optional Prometheus metrics server |
| watch_endpoint | SSE heartbeat, connection limits, replay batch settings |

notification_backend.kind selects the storage implementation:

  • jetstream — production backend (NATS JetStream)
  • in_memory — development backend (process-local, no persistence)

Backend Details


For full field-level documentation of every config option, see Configuration Reference.

Configuration Reference

This page documents runtime-relevant configuration fields and defaults.

Topic Wire Format

  • Topic wire subjects always use . as separator.
  • Per-schema topic.separator is no longer used.
  • Token values are percent-encoded for reserved chars (., *, >, %) before writing to backend subjects.

See Topic Encoding for rules and examples.
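A sketch of the token encoding (the %XX escape format is an assumption; see Topic Encoding for the authoritative rules):

```python
# Sketch: percent-encode the reserved subject characters '.', '*', '>'
# and '%' before a token is embedded in a backend subject. '%' is
# escaped first so existing escapes are not double-read.
def encode_token(value):
    out = value.replace("%", "%25")
    for ch in (".", "*", ">"):
        out = out.replace(ch, f"%{ord(ch):02X}")
    return out

print(encode_token("2.5*north>x"))  # 2%2E5%2Anorth%3Ex
```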

application

| Field | Type | Default | Notes |
|---|---|---|---|
| host | string | none | Bind address. |
| port | u16 | none | Bind port. |
| base_url | string | http://localhost | Used in generated CloudEvent source links. |
| static_files_path | string | /app/static | Static asset root for homepage assets. |

logging

| Field | Type | Default | Notes |
|---|---|---|---|
| level | string | implementation default | Example: info, debug, warn, error. |
| format | string | implementation default | Kept for compatibility; output is OTel-aligned JSON. |

auth

Authentication is optional. When disabled (the default), all API endpoints are publicly accessible, and schemas must not define stream auth rules: startup fails if global auth is disabled while a schema sets auth.required=true or non-empty auth.read_roles/auth.write_roles.

When enabled:

  • Admin endpoints always require a valid JWT and an admin role.
  • Stream endpoints (notify, watch, replay) enforce authentication only when the target schema has auth.required: true.
  • Schema endpoints (/api/v1/schema) are always public.
  • In trusted_proxy mode, Aviso validates Authorization: Bearer <jwt> locally with jwt_secret.

| Field | Type | Default | Notes |
|---|---|---|---|
| enabled | bool | false | Set to true to enable authentication. |
| mode | "direct" \| "trusted_proxy" | "direct" | direct: forward credentials to auth-o-tron. trusted_proxy: validate forwarded JWT locally. |
| auth_o_tron_url | string | "" | auth-o-tron base URL. Required when enabled=true and mode=direct. |
| jwt_secret | string | "" | Shared HMAC secret for JWT validation. Required when enabled=true. Not exposed via /api/v1/schema endpoints and redacted when auth settings are serialized or logged. |
| admin_roles | map<string, string[]> | {} | Realm-scoped roles for admin endpoints (/api/v1/admin/*). Must contain at least one realm with non-empty roles when enabled=true. |
| timeout_ms | u64 | 5000 | Timeout for auth-o-tron requests (milliseconds). Must be > 0. |

Per-stream auth (notification_schema.<event_type>.auth)

| Field | Type | Default | Notes |
|---|---|---|---|
| required | bool | (none) | Must be explicitly set whenever an auth block is present. When true, the stream requires authentication. |
| read_roles | map<string, string[]> | (none) | Realm-scoped roles for read access (watch/replay). When omitted, any authenticated user can read. Use ["*"] as the role list to grant realm-wide access. |
| write_roles | map<string, string[]> | (none) | Realm-scoped roles for write access (notify). When omitted, only users matching global admin_roles can write. Use ["*"] as the role list to grant realm-wide access. |
| plugins | string[] | (none) | Optional list of authorization plugins to run after role-based checks. Currently supported: "ecpds" (requires --features ecpds build). On a build without the required feature, startup fails with a clear error pointing at the offending stream. (Silent skip would widen access.) Empty plugins: [] is rejected; omit the field instead. Plugins only run when auth.required is true. |

See Authentication for detailed setup, client usage, and error responses.

ecpds

Optional ECPDS destination authorization. Only available when built with --features ecpds. When configured, streams can reference the "ecpds" plugin in their auth.plugins list to enforce destination-level access control on watch and replay requests.

| Field | Type | Default | Notes |
|---|---|---|---|
| username | string | none | Service account username used for HTTP Basic Auth to ECPDS. Must not be empty. |
| password | string | none | Service account password. Redacted to [REDACTED] in Debug output (and therefore in any structured-log dump of the configuration). Must not be empty. The /api/v1/schema endpoint never exposes the top-level ecpds block at all, only per-event identifier and payload fields, so the password is not reachable through it. |
| servers | string[] | none | List of ECPDS server base URLs. Use https:// for any reachable host: the plugin authenticates with HTTP Basic Auth, so plain http:// to a real host would put the service-account password and per-user destination lookups on the wire without TLS. Plain http:// is accepted only for loopback (127.0.0.1, [::1], localhost) for local testing; a typo from https:// to http:// on a non-loopback host fails closed at startup. Each URL must parse with no query string and no fragment. Path prefixes (e.g. https://proxy.example/ecpds-api/) are accepted. The plugin appends /ecpds/v1/destination/list?id=<username> itself. |
| match_key | string | none | Identifier field to match against the user's destination list (e.g. "destination"). Must be a single bare identifier name (no whitespace, / or NUL) and must be present in the schema's identifier with required: true (so the value is guaranteed before the plugin runs). It does NOT need to appear in topic.key_order; the plugin reads the value from the request's canonicalized identifier params, not from topic routing. |
| target_field | string | "name" | JSON field to extract from each ECPDS destination record. Records that lack this field are silently skipped (logged at info as auth.ecpds.fetch.skipped_record). |
| cache_ttl_seconds | u64 | 300 | How long (in seconds) to cache a user's destination list before re-fetching. Must be > 0. |
| max_entries | u64 | 10000 | Maximum number of distinct usernames held in the cache; eviction policy is moka's TinyLFU. Must be > 0. |
| request_timeout_seconds | u64 | 30 | Total wall-clock budget for a single ECPDS HTTP request: DNS lookup, TCP connect, TLS handshake, request send, AND response body read must all complete within this. (reqwest::ClientBuilder::timeout is a total deadline that starts when the request is issued; tune this as an upper bound that includes connection setup, not just response time.) Must be > 0. |
| connect_timeout_seconds | u64 | 5 | Sub-budget within request_timeout_seconds for the dial-through-TLS-handshake phase only (DNS + TCP connect + TLS). If this elapses first the request fails with a connect timeout; otherwise the remainder of request_timeout_seconds covers request send and response body. Must be > 0. |
| partial_outage_policy | "strict" \| "any_success" | "strict" | How tolerant the merge is when one configured server fails. The destination list itself is always the union of per-server responses. strict: every server must respond successfully or the call fails with 503. any_success: take the union of whichever servers responded; only fails if no server responded. See ECPDS Destination Authorization for the failure-tolerance trade-off. |

See ECPDS Destination Authorization for setup and runtime behavior, and the ECPDS runbook for operational triage.

metrics

Optional Prometheus metrics endpoint. When enabled, a separate HTTP server serves /metrics on an internal port for scraping by Prometheus/ServiceMonitor. This keeps metrics isolated from the public API.

| Field | Type | Default | Notes |
|---|---|---|---|
| enabled | bool | false | Enable the metrics endpoint. |
| host | string | "127.0.0.1" | Bind address for the metrics server. Defaults to loopback to avoid public exposure. |
| port | u16 | none | Required when enabled=true. Must differ from application.port. |
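
Put together, a minimal config fragment enabling the endpoint looks like this (the port value is illustrative):

```yaml
metrics:
  enabled: true
  host: "127.0.0.1"   # loopback default; widen only if the port is firewalled
  port: 9090          # required when enabled; must differ from application.port
```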

Exposed metrics:

| Metric | Type | Labels | Description |
|---|---|---|---|
| aviso_notifications_total | counter | event_type, status | Total notification requests. |
| aviso_sse_connections_active | gauge | endpoint, event_type | Currently active SSE connections. |
| aviso_sse_connections_total | counter | endpoint, event_type | Total SSE connections opened. |
| aviso_sse_unique_users_active | gauge | endpoint | Distinct users with active SSE connections. |
| aviso_auth_requests_total | counter | mode, outcome | Authentication attempts. |

A binary built with --features ecpds registers the following five metrics. The unlabelled counters and the gauge appear as Prometheus series at process startup. The two labelled counters (access_decisions_total, fetch_total) are pre-initialised at startup with every documented outcome value, so each outcome label appears as a series at zero before any ECPDS traffic; this lets alert rules of the form rate(metric{outcome="error"}[5m]) > 0 start evaluating on a known-zero baseline rather than on a missing series.

| Metric | Type | Labels | Description |
|---|---|---|---|
| aviso_ecpds_cache_hits_total | counter | (none) | ECPDS destination cache hits (requests served from cache without an upstream call). |
| aviso_ecpds_cache_misses_total | counter | (none) | ECPDS destination cache misses (requests not served from cache). Includes coalesced waiters that did not trigger an upstream call themselves; aviso_ecpds_fetch_total is the right metric for "actual upstream calls". |
| aviso_ecpds_cache_size | gauge | (none) | Number of usernames in the ECPDS destination cache, sampled from moka after eviction passes. Expired entries are pruned by moka asynchronously, so this gauge can briefly include not-yet-pruned expired entries until the next pending-tasks run. |
| aviso_ecpds_access_decisions_total | counter | outcome | Access decisions. outcome ∈ {allow, deny_destination, deny_match_key_missing, unavailable, admin_bypass, error}. |
| aviso_ecpds_fetch_total | counter | outcome | Upstream fetch outcomes (recorded once per access check whose request actually ran the upstream call; coalesced waiters do not contribute). outcome ∈ {success, http_401, http_403, http_4xx, http_5xx, invalid_response, unreachable}. |
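
The pre-initialised outcome labels are what make startup-safe rate alerts possible. As a sketch, a Prometheus alerting rule built on that known-zero baseline might look like this (the group and alert names, window, and threshold are illustrative assumptions, not shipped defaults):

```yaml
groups:
  - name: aviso-ecpds            # illustrative group name
    rules:
      - alert: EcpdsFetchFailures
        # Any non-success upstream fetch outcome. Because every outcome label
        # exists as a zero series from startup, this evaluates immediately
        # instead of waiting for the first failure to create the series.
        expr: rate(aviso_ecpds_fetch_total{outcome!="success"}[5m]) > 0
        for: 5m
```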

Process-level metrics (CPU, memory, open FDs) are automatically collected on Linux.

notification_backend

| Field | Type | Default | Notes |
|---|---|---|---|
| kind | string | none | jetstream or in_memory. |
| in_memory | object | optional | Used when kind = in_memory. |
| jetstream | object | optional | Used when kind = jetstream. |

notification_backend.in_memory

| Field | Type | Default | Notes |
|---|---|---|---|
| max_history_per_topic | usize | 1 | Retained messages per topic in memory. |
| max_topics | usize | 10000 | Max tracked topics before LRU-style eviction. |
| enable_metrics | bool | false | Enables extra internal metrics logs. |

See InMemory Backend for operational caveats.
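
Put together, an in-memory backend section looks like this (defaults written out explicitly):

```yaml
notification_backend:
  kind: in_memory
  in_memory:
    max_history_per_topic: 1   # retained messages per topic
    max_topics: 10000          # LRU-style eviction beyond this
    enable_metrics: false
```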

notification_backend.jetstream

| Field | Type | Default | Runtime usage summary |
|---|---|---|---|
| nats_url | string | nats://localhost:4222 | NATS connection URL. |
| token | string? | None | Token auth; NATS_TOKEN env fallback. |
| timeout_seconds | u64? | 30 | NATS connection timeout for each startup connect attempt (> 0). |
| retry_attempts | u32? | 3 | Startup connect attempts before backend init fails (> 0). |
| max_messages | i64? | None | Stream message cap. |
| max_bytes | i64? | None | Stream size cap in bytes. |
| retention_time | string? | None | Default stream max age (s, m, h, d, w; for example 30d). |
| storage_type | string? | file | file or memory (parsed as typed enum at config load). |
| replicas | usize? | None | Stream replicas. |
| retention_policy | string? | limits | limits/interest/workqueue (parsed as typed enum at config load). |
| discard_policy | string? | old | old/new (parsed as typed enum at config load). |
| enable_auto_reconnect | bool? | true | Enables/disables NATS client reconnect behavior. |
| max_reconnect_attempts | u32? | 5 | Mapped to NATS max_reconnects (0 => unlimited). |
| reconnect_delay_ms | u64? | 2000 | Reconnect delay and startup connect retry backoff (> 0). |
| publish_retry_attempts | u32? | 5 | Retry attempts for transient publish channel closed failures (> 0). |
| publish_retry_base_delay_ms | u64? | 150 | Base backoff in milliseconds for publish retries (> 0). |

See JetStream Backend for detailed behavior.

notification_schema.<event_type>.payload

Schema-level payload contract for notify requests.

| Field | Type | Example | Notes |
|---|---|---|---|
| required | bool | true | When true, /notification rejects requests without payload. |

Behavior details and edge cases are documented in Payload Contract.
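
For example, a schema that rejects payload-less notifications (the event type name is illustrative):

```yaml
notification_schema:
  my_events:            # illustrative event type
    payload:
      required: true    # /notification without a payload is rejected
```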

notification_schema.<event_type>.storage_policy

Optional per-schema storage settings validated at startup against selected backend capabilities.

| Field | Type | Example | Notes |
|---|---|---|---|
| retention_time | string | 7d, 12h, 30m | Duration literal (s, m, h, d, w). |
| max_messages | integer | 100000 | Must be > 0. |
| max_size | string | 512Mi, 2G | Size literal (K, Ki, M, Mi, G, Gi, T, Ti). |
| allow_duplicates | bool | true | Backend support is capability-gated. |
| compression | bool | true | Backend support is capability-gated. |

Field behavior:

  • retention_time overrides backend-level retention for the schema stream.
  • max_messages overrides backend-level message cap for the schema stream.
  • max_size overrides backend-level byte cap for the schema stream.
  • allow_duplicates = false maps to one message per subject (latest kept); true removes this cap.
  • compression = true enables stream compression when backend supports it.

Startup behavior:

  • Invalid retention_time/max_size format fails startup.
  • Unsupported fields for selected backend fail startup.
  • Validation happens before backend initialization.
  • With in_memory, all storage_policy fields are currently unsupported (startup fails if provided).

Runtime application behavior:

  • storage_policy is applied on stream create and reconciled for existing JetStream streams when those streams are accessed by Aviso.
  • Aviso-managed stream subject binding is also reconciled to the expected <base>.> pattern.
  • Mutable fields (retention/limits/compression/duplicates/replicas) are updated when drift is detected.
  • Recreate stream(s) only when you need historical data physically rewritten with new settings.

Example:

notification_backend:
  kind: jetstream
  jetstream:
    nats_url: "nats://localhost:4222"
    publish_retry_attempts: 5
    publish_retry_base_delay_ms: 150

notification_schema:
  dissemination:
    topic:
      base: "diss"
      key_order: ["destination", "target", "class", "expver", "domain", "date", "time", "stream", "step"]
    storage_policy:
      retention_time: "7d"
      max_messages: 2000000
      max_size: "10Gi"
      allow_duplicates: true
      compression: true

watch_endpoint

| Field | Type | Default | Notes |
|---|---|---|---|
| sse_heartbeat_interval_sec | u64 | 30 | SSE heartbeat period. |
| connection_max_duration_sec | u64 | 3600 | Maximum live watch duration. |
| replay_batch_size | usize | 100 | Historical fetch batch size. |
| max_historical_notifications | usize | 10000 | Replay cap for historical delivery. |
| replay_batch_delay_ms | u64 | 100 | Delay between historical replay batches. |
| concurrent_notification_processing | usize | 15 | Live stream CloudEvent conversion concurrency. |
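
As a config fragment with the defaults written out:

```yaml
watch_endpoint:
  sse_heartbeat_interval_sec: 30
  connection_max_duration_sec: 3600
  replay_batch_size: 100
  max_historical_notifications: 10000
  replay_batch_delay_ms: 100
  concurrent_notification_processing: 15
```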

Custom config file path

Set AVISOSERVER_CONFIG_FILE to use a specific config file instead of the default search cascade:

AVISOSERVER_CONFIG_FILE=/path/to/config.yaml cargo run

When set, only this file is loaded as a file source (startup fails if it does not exist). The default locations (./configuration/config.yaml, /etc/aviso_server/config.yaml, $HOME/.aviso_server/config.yaml) are skipped. AVISOSERVER_* field-level overrides still apply on top.

Environment override examples

AVISOSERVER_APPLICATION__HOST=0.0.0.0
AVISOSERVER_APPLICATION__PORT=8000
AVISOSERVER_NOTIFICATION_BACKEND__KIND=jetstream
AVISOSERVER_NOTIFICATION_BACKEND__JETSTREAM__NATS_URL=nats://localhost:4222
AVISOSERVER_NOTIFICATION_BACKEND__JETSTREAM__TOKEN=secret
AVISOSERVER_WATCH_ENDPOINT__REPLAY_BATCH_SIZE=200
AVISOSERVER_AUTH__ENABLED=true
AVISOSERVER_AUTH__JWT_SECRET=secret
AVISOSERVER_METRICS__ENABLED=true
AVISOSERVER_METRICS__PORT=9090

Authentication

Authentication is optional. When enabled, Aviso supports two modes:

  • Direct — Aviso forwards Bearer or Basic credentials to auth-o-tron, which returns a signed JWT.
  • Trusted proxy — an upstream reverse proxy authenticates the user and forwards a signed JWT; Aviso validates it locally.

How It Works

  1. Client sends credentials to Aviso.
  2. Middleware resolves user identity:
    • direct: forwards the Authorization header to auth-o-tron GET /authenticate and receives a JWT back.
    • trusted_proxy: validates the forwarded Authorization: Bearer <jwt> locally using jwt_secret.
  3. Username, realm, and roles are extracted from JWT claims and attached to the request.
  4. Route handlers enforce per-stream auth rules on notify, watch, and replay.
  5. Admin endpoints (/api/v1/admin/*) always require a valid JWT with an admin role.

Schema endpoints (GET /api/v1/schema, GET /api/v1/schema/{event_type}) are always publicly accessible, even when auth is enabled.

Quick Start (Direct Mode)

1. Start auth-o-tron

# Foreground (Ctrl+C to stop):
./scripts/auth-o-tron-docker.sh start

# Background:
./scripts/auth-o-tron-docker.sh start --detach

By default this uses scripts/example_auth_config.yaml. To use your own config:

AUTH_O_TRON_CONFIG_FILE=/path/to/auth-config.yaml ./scripts/auth-o-tron-docker.sh start

The bundled example config defines three local test users in realm localrealm:

| User | Password | Role |
|---|---|---|
| admin-user | admin-pass | admin |
| reader-user | reader-pass | reader |
| producer-user | producer-pass | producer |

2. Enable auth in config

auth:
  enabled: true
  mode: direct
  auth_o_tron_url: "http://localhost:8080"
  jwt_secret: "your-shared-secret"   # must match auth-o-tron jwt.secret
  admin_roles:
    localrealm: ["admin"]
  timeout_ms: 5000

Roles are realm-scoped: admin_roles maps each realm name to its authorized role list. A user must belong to a listed realm and hold one of that realm's roles.
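
For example, to authorize different role names per realm (the second realm's "operator" role is illustrative):

```yaml
auth:
  admin_roles:
    localrealm: ["admin"]
    ecmwf: ["admin", "operator"]   # "operator" is an illustrative extra role
```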

3. Run aviso-server

Auth is now enforced for:

  • Admin endpoints (/api/v1/admin/*) — always require auth + admin role.
  • Stream endpoints (/api/v1/notification, /api/v1/watch, /api/v1/replay) — only when the target schema sets auth.required: true.

For full field-level documentation, see the auth section in Configuration Reference.

Trusted Proxy Mode

Use trusted_proxy when Aviso sits behind a reverse proxy or API gateway that handles authentication. The proxy authenticates the user (via OIDC, SAML, etc.) and forwards a signed JWT to Aviso.

Aviso validates the forwarded Authorization: Bearer <jwt> locally using jwt_secret. Username and roles are read directly from JWT claims — no outbound call to auth-o-tron is made.

auth:
  enabled: true
  mode: trusted_proxy
  jwt_secret: "shared-signing-secret"
  admin_roles:
    ecmwf: ["admin"]

auth_o_tron_url is not required in this mode.

Per-Stream Authentication

Streams support separate read and write access controls. Read access governs /watch and /replay; write access governs /notification.

Configure authentication per stream in your notification schema:

notification_schema:
  # Public — no auth section means anonymous access
  public_events:
    payload:
      required: true
    topic:
      base: "public"

  # Authenticated — any valid user can read, only admins can write
  internal_events:
    payload:
      required: true
    topic:
      base: "internal"
    auth:
      required: true

  # Separate read/write roles
  sensor_data:
    payload:
      required: true
    topic:
      base: "sensor"
    auth:
      required: true
      read_roles:
        internal: ["analyst", "consumer"]
        external: ["partner"]
      write_roles:
        internal: ["producer"]

  # Realm-wide read access using wildcard, restricted write
  shared_events:
    payload:
      required: true
    topic:
      base: "shared"
    auth:
      required: true
      read_roles:
        internal: ["*"]
        external: ["analyst"]
      write_roles:
        internal: ["producer", "operator"]

Read vs. write access defaults

| auth.required | read_roles | write_roles | Read (watch/replay) | Write (notify) |
|---|---|---|---|---|
| false or omitted | n/a | n/a | Anyone | Anyone |
| true | omitted | omitted | Any authenticated user | Admins only |
| true | set | omitted | Must match read_roles | Admins only |
| true | omitted | set | Any authenticated user | Must match write_roles or be admin |
| true | set | set | Must match read_roles | Must match write_roles or be admin |

Admins (users matching global admin_roles) always have both read and write access.

Role matching rules

Both read_roles and write_roles map realm names to role lists. A user's realm claim from the JWT must match a key in the map, and the user must hold at least one of that realm's listed roles.

  • Wildcard "*" — use ["*"] as the role list to grant access to all users from a realm, regardless of their specific roles.
  • Omitted role list — when read_roles is omitted, any authenticated user can read. When write_roles is omitted, only admins can write.

When a per-stream auth block is present, auth.required must be explicitly set to either true or false.

ECPDS Destination Authorization

When built with --features ecpds, Aviso supports an optional authorization plugin that checks whether a user has access to a specific ECPDS destination before allowing watch or replay requests. The plugin is read-only: it never runs on notify.

Enabling the plugin

  1. Build Aviso with the ecpds feature:
cargo build --release --features ecpds

On a build without this feature, any YAML containing plugins: ["ecpds"] is rejected at startup with an error pointing at the offending stream. This is deliberate: silently skipping the plugin would widen access.

  2. Add a top-level ecpds section to your config with ECPDS service credentials:
ecpds:
  username: "ecpds-service-account"
  password: "service-password"
  servers:
    - "https://ecpds-primary.ecmwf.int"
    - "https://ecpds-secondary.ecmwf.int"
  match_key: "destination"
  target_field: "name"            # default: "name"
  cache_ttl_seconds: 300          # default: 300 (5 min)
  max_entries: 10000              # default: 10000
  request_timeout_seconds: 30     # default: 30
  connect_timeout_seconds: 5      # default: 5
  partial_outage_policy: strict   # default: strict; alternative: any_success
  3. Enable the plugin on a stream by adding plugins: ["ecpds"] to its auth block. Minimal canonical shape:
notification_schema:
  dissemination:
    payload:
      required: true
    topic:
      base: "diss"
      key_order: ["destination", "target", "class", "expver", "domain", "date", "time", "stream", "step"]
    identifier:
      destination:
        type: StringHandler
        required: true                # MUST be required
      # ... other fields ...
    auth:
      required: true                  # MUST be true
      plugins: ["ecpds"]

read_roles is optional. If you want a realm-wide gate before ECPDS even runs (e.g. block users from realms you don't trust to query ECPDS in the first place), add it; if not, omit it and the plugin runs for every authenticated user.
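
A sketch combining such a realm gate with the plugin (the realm name is illustrative; the rest mirrors the canonical shape above):

```yaml
notification_schema:
  dissemination:
    # ... payload / topic / identifier as in the canonical shape ...
    auth:
      required: true
      read_roles:
        ecmwf: ["*"]        # illustrative: only this realm even reaches ECPDS
      plugins: ["ecpds"]
```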

The plugin requires (and startup validation enforces):

  • match_key (default "destination") is present in the schema's identifier and marked required: true there. Without this, an operator could deploy a schema where the destination value is optional and a client could bypass the check by simply omitting the field. The plugin reads match_key from the request's canonicalized identifier params at runtime, so the field does NOT have to appear in topic.key_order.
  • auth.required is true. The plugin runs after standard stream auth, so plugins on a stream where auth.required is false would never execute.

How it works at runtime

  1. Standard role-based stream auth runs first. If it fails (missing token, wrong realm/role), the request fails before the ECPDS plugin sees it.
  2. The plugin extracts the match_key value (e.g. destination) from the request's canonicalised identifier.
  3. It looks up the user's destination list in an in-process cache. If absent, it queries the configured ECPDS servers in parallel, then merges per the partial_outage_policy.
  4. If the requested destination is in the user's list, the request proceeds. Otherwise, 403 Forbidden.
  5. Users matching the global auth.admin_roles bypass steps 2-4 entirely.

Partial-outage policy

When more than one ECPDS server is configured, the user's effective destination list is always the union of every per-server response. ECMWF ECPDS deployments are typically federated (e.g. diss-monitor and aux-monitor cover different destination namespaces), so a user's full entitlement is the combination of what each server reports. The partial_outage_policy field only governs how tolerant the merge is when one of those servers fails or times out.

| Value | Behaviour | Operational implication |
|---|---|---|
| strict (default) | Every configured server must reply successfully within the per-request timeout. The destination list is the union of their responses. Any one server failing fails the whole lookup with 503. | A single ECPDS server going down takes the plugin to 503. The trade is: 503 (try again later) is preferred over 403 (definitely no access) when we can't be sure we saw the user's complete entitlement set. |
| any_success | Take the union of whichever servers responded successfully within the per-request timeout. Failed servers are silently dropped from the merge. Only fails if no server responded usefully. | Keeps serving during a partial outage. The cost: if a user's only entitlement to a destination lived on an unreachable server, that user will see 403 until the server is back, even though their access is genuinely valid. |

Error responses

| Code | HTTP Status | When |
|---|---|---|
| FORBIDDEN | 403 | User does not have access to the requested destination, or the required identifier field is missing. Tracing event: auth.ecpds.check.denied (with reason ∈ {DestinationNotInList, MatchKeyMissing}). |
| SERVICE_UNAVAILABLE | 503 | Upstream / network problem: the lookup failed under the active partial_outage_policy. Tracing event: auth.ecpds.check.unavailable. The cause is on the aviso_ecpds_fetch_total{outcome=…} metric (e.g. unreachable, http_401, http_4xx, http_5xx, invalid_response). Investigate ECPDS, the network, and the service-account credentials. |
| INTERNAL_ERROR | 500 | Aviso-side server error: missing AuthSettings in app_data, no checker registered, or an unexpected plugin error. Tracing event: auth.ecpds.check.error (with error_kind for the misconfiguration cases). Investigate Aviso, not ECPDS. |

Caching

Destination lists are cached per user for cache_ttl_seconds (default 300 seconds). The cache holds at most max_entries users (default 10 000) and uses moka's TinyLFU eviction policy when full (TinyLFU mixes recency with admission frequency, so a one-shot scan does not flush the working set). Successful results are cached; errors are not, so a short ECPDS outage is not prolonged by stale 503s sitting in the cache.

The cache is single-flight: when many requests for the same user arrive at the same time and the user is not yet cached, only one upstream call goes to ECPDS. The rest wait for that one call's result. This protects ECPDS when many SSE clients reconnect at once.

The cache lives in process memory. Restarting Aviso clears it. Multiple replicas have independent caches.

What is not checked

notify (write) is never gated by ECPDS. The plugin applies only to reads (watch, replay).

No retries by design

Aviso does not retry failed ECPDS calls. A 503 is the signal to investigate ECPDS itself, not to bump timeouts. See the ECPDS runbook for triage steps.

For the full ecpds field reference, see the ecpds section in Configuration Reference. For metrics and tracing event names, see the ECPDS runbook.

Admin Endpoints

Admin endpoints always require authentication and one of the configured admin_roles, regardless of per-stream settings:

  • DELETE /api/v1/admin/notification/{id}
  • DELETE /api/v1/admin/wipe/stream
  • DELETE /api/v1/admin/wipe/all

See Admin Operations for request/response details.

Disabling Authentication

auth:
  enabled: false

Or omit the auth section entirely. When auth is disabled, all endpoints are publicly accessible.

Startup fails if global auth is disabled while any schema defines auth.required: true or non-empty auth.read_roles/auth.write_roles. Remove stream-level auth blocks before disabling global auth.

Client Usage

Bearer token (both modes)

# Watch an authenticated stream:
curl -N -H "Authorization: Bearer <jwt-token>" \
  -X POST http://localhost:8000/api/v1/watch \
  -H "Content-Type: application/json" \
  -d '{"event_type": "private_events", "identifier": {}}'

Basic credentials (direct mode only)

# Notify with Basic auth:
curl -X POST http://localhost:8000/api/v1/notification \
  -u "admin-user:admin-pass" \
  -H "Content-Type: application/json" \
  -d '{"event_type": "ops_events", "identifier": {"event_type": "deploy"}, "payload": "ok"}'

In direct mode, Aviso forwards Basic credentials to auth-o-tron, which authenticates the user and returns a JWT. The response JWT is validated and used for authorization.

Error Responses

Auth errors use a subset of the standard API error shape with three fields (code, error, message; no details):

{
  "code": "UNAUTHORIZED",
  "error": "unauthorized",
  "message": "Authorization header is required"
}

| Code | HTTP Status | When |
|---|---|---|
| UNAUTHORIZED | 401 | Missing Authorization header, invalid token format, expired or bad signature. |
| FORBIDDEN | 403 | Valid credentials but user lacks the required role for the stream or admin endpoint. |
| SERVICE_UNAVAILABLE | 503 | auth-o-tron is unreachable or returned an unexpected error (direct mode only). |

A 401 response includes a WWW-Authenticate header indicating the supported scheme (Bearer in trusted-proxy mode; Bearer, Basic in direct mode).

ECPDS Plugin Runbook

This page is for the on-call engineer dealing with an ECPDS authorization issue at 3 AM. Read the ECPDS Destination Authorization page first if you haven't already.

At a glance

  • The plugin is read-only (watch, replay). The notify endpoint is never gated by ECPDS.
  • The plugin fails closed: it will never accidentally allow a request. The status code distinguishes where the problem is. 503 Service Unavailable means the ECPDS check could not reach a verdict (an upstream / partial-outage problem); investigate ECPDS and the network. 500 Internal Server Error means the plugin itself hit a server-side bug or a misconfiguration on Aviso's side (missing AuthSettings, no checker registered, an unexpected plugin error); investigate Aviso. The full mapping is in the response codes table below.
  • The plugin does not retry. A 503 is the signal to investigate ECPDS; a 500 is the signal to investigate Aviso.
  • The cache lives in process memory. Restarting Aviso clears it. Replicas have independent caches.
  • The default partial_outage_policy is strict: every configured ECPDS server must respond successfully or the call fails with 503. A single ECPDS server going away takes the whole plugin down. This is intentional. The destination list itself is the union of every server's response under both policies; the choice is purely about how tolerant we are of per-server failures.

Response codes the plugin emits

| HTTP | Where the problem is | Tracing event | Trigger |
|---|---|---|---|
| 200 | Allowed | auth.ecpds.check.allowed | Destination is in the user's ECPDS allow-list. |
| 403 | Authorisation | auth.ecpds.check.denied | Destination is not in the user's allow-list (reason=DestinationNotInList), or the request omitted the configured match_key field (reason=MatchKeyMissing). |
| 503 | Upstream / network | auth.ecpds.check.unavailable | The merged ECPDS fetch could not reach a verdict under the active partial_outage_policy. Investigate ECPDS, the network, and the service-account credentials. The fetch_outcome field on the event narrows it down further. |
| 500 | Aviso (this binary) | auth.ecpds.check.error | A server-side bug or local misconfiguration: AuthSettings not registered as app_data, no EcpdsChecker in app_data, or an unexpected plugin error. Investigate Aviso, not ECPDS. |

Symptom and first checks

What "a storm of X" means here. Throughout this section, "503 storm" or "403 storm" means: the rate of that response is far above the normal baseline. The rule of thumb: if your dashboard shows the rate climbing fast or staying high for more than a minute or two, treat it as a storm. The first metric and first log lines below are how you confirm it.

Metrics count requests, not users. Every counter in this section increments per request. A single client retrying in a tight loop can inflate any of them, so the metric alone cannot tell you whether you are looking at one bad client or hundreds of unhappy users. To answer that, grep the relevant warn or error event in your logs and count distinct values of the username field. Every auth.ecpds.check.* and auth.ecpds.fetch.* warn/error event carries username as a structured field for exactly this purpose.

One useful tell. Successful and deny_destination results are cached for the user; errors are not. So for deny_destination, retries by the same user keep adding to aviso_ecpds_access_decisions_total{outcome="deny_destination"} but stop adding to aviso_ecpds_fetch_total until the TTL expires. If access_decisions_total is climbing fast and fetch_total is flat, you are almost certainly seeing one or a few users retrying against a cached deny rather than a real population spike. For unavailable (503), errors are not cached, so retries do reach ECPDS and both counters move together.
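
This tell can be expressed as a Prometheus rule. The sketch below is illustrative only: the group and alert names, window, and thresholds are assumptions to tune against your own baseline, not shipped defaults.

```yaml
groups:
  - name: aviso-ecpds-runbook        # illustrative group name
    rules:
      - alert: EcpdsCachedDenyRetryLoop
        # Denies climbing while upstream fetches stay flat suggests a few
        # clients retrying against a cached deny, not a real entitlement change.
        expr: >
          sum(rate(aviso_ecpds_access_decisions_total{outcome="deny_destination"}[5m])) > 1
          and sum(rate(aviso_ecpds_fetch_total[5m])) < 0.1
        for: 10m
```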

Field-name reading guide. When this section says reason=DestinationNotInList, the actual log line will look like ... reason=DestinationNotInList "ECPDS access denied". Use the exact strings shown when grepping. Metric outcome=... labels use snake_case (deny_destination, http_401, etc.).

503 storm on watch/replay

  • What you see: sustained HTTP 503 responses on /api/v1/watch and /api/v1/replay. From a user's perspective: "I cannot start a watch; Aviso says ECPDS is inaccessible."
  • Why it happens: the ECPDS plugin tried to fetch destination lists from your ECPDS servers and could not reach a verdict, so it failed safely with 503 rather than guess.
  • First metric: aviso_ecpds_fetch_total rate, broken down by outcome.
  • First log: event_name=auth.ecpds.fetch.failed and event_name=auth.ecpds.check.unavailable.
  • Confirm scope: count distinct username values in event_name=auth.ecpds.check.unavailable log lines. Many distinct usernames means an ECPDS-side or network problem. One or two usernames means a single misbehaving client is moving the metric.
  • Likely causes (read off the dominant outcome label):
    • unreachable: ECPDS server down, network partition, DNS, or wrong servers URLs in config.
    • http_401 or http_403: service-account credentials wrong or revoked.
    • http_4xx: an unexpected client-side response, most often 404 (a misconfigured base URL pointing somewhere that isn't ECPDS) or 429 (the service-account is being rate-limited).
    • http_5xx: ECPDS itself is broken.
    • invalid_response: ECPDS response shape no longer matches what the parser expects (the contract has changed).

403 storm on watch/replay

  • What you see: sustained HTTP 403 responses on /api/v1/watch and /api/v1/replay. From a user's perspective: "I used to be able to read this destination, now Aviso says I'm not allowed."
  • Why it happens: authentication is fine (otherwise it would be a 401), and Aviso did reach ECPDS (otherwise it would be a 503), but ECPDS replied that the user does not have the requested destination on their list.
  • First metric: aviso_ecpds_access_decisions_total{outcome="deny_destination"} rate.
  • First log: event_name=auth.ecpds.check.denied with reason=DestinationNotInList.
  • Confirm scope: count distinct username values in those log lines. If you see many distinct users, the cause is upstream of Aviso (ECPDS revoked destinations for several users, or a client batch suddenly started passing the wrong destination). If you see one or two, focus on those clients. Cross-check by hitting the ECPDS web UI directly with the same user.

403 with reason=MatchKeyMissing

  • What you see: any 403 on /api/v1/watch or /api/v1/replay whose log carries reason=MatchKeyMissing. Even one of these is suspicious; a stream of them means a misconfigured deployment is in production.
  • Why it happens: the request body did not include the configured match-key field (e.g. no destination value at all). The plugin can't check what isn't there, so it denies. Startup validation enforces that this field is required: true in the schema, so the only way to see this in practice is if the running config has drifted from what was validated.
  • First metric: aviso_ecpds_access_decisions_total{outcome="deny_match_key_missing"} rate.
  • First log: event_name=auth.ecpds.check.denied with reason=MatchKeyMissing.
  • Likely cause: the schema's match_key field is required, but a client is omitting it. Startup validation should have prevented this configuration in the first place. Investigate config drift.

Quiet, no allows

  • What you see: ECPDS-protected reads are happening (you see watch/replay traffic in your access logs and they return 200), but the aviso_ecpds_access_decisions_total{outcome="allow"} counter stays flat at zero, and you never see event_name=auth.ecpds.check.allowed in logs.
  • Why it happens: the plugin is not running for those reads. Either the build doesn't include it, or the schema isn't wired up to use it.
  • First metric: aviso_ecpds_access_decisions_total{outcome="allow"} rate is zero.
  • First log: there isn't one. The plugin is not running.
  • Likely causes:
    • The binary was built without --features ecpds. Startup would have errored if any schema referenced ["ecpds"], so this is unlikely on a real deployment.
    • The schema does not actually have plugins: ["ecpds"].
    • auth.required is false on the schema, so the plugin is unreachable.

Cache thrashing or latency spike

  • What you see: average and p99 latency on /api/v1/watch and /api/v1/replay are climbing, and the cache miss counter is rising significantly faster than the cache hit counter. From a user's perspective: "My watches and replays feel slower than usual."
  • Why it happens: "cache thrashing" means the cache is barely helping. Most requests are missing the cache and Aviso ends up making a fresh ECPDS call on every request. That call adds latency to every request and load to ECPDS. Possible reasons: cache TTL is too short and entries expire before they're reused; cache is too small and entries get evicted before reuse; or there are genuinely so many distinct usernames that no cache size would fit them all.
  • First metric: ratio of aviso_ecpds_cache_misses_total to aviso_ecpds_cache_hits_total, plus aviso_ecpds_cache_size.
  • First log: rate of event_name=auth.ecpds.cache.miss.
  • Likely cause: a high miss rate with many distinct usernames means cache_ttl_seconds is too short, max_entries is too small, or there are genuinely many unique users.
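
If the user population is stable, the first knobs to try are the cache settings (the values below are illustrative starting points, not recommendations):

```yaml
ecpds:
  cache_ttl_seconds: 900   # up from the 300 s default; trades staleness for hit rate
  max_entries: 50000       # up from the 10000 default if distinct-user count justifies it
```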

Tracing event reference

Every event uses the codebase's standard structured shape (service_name, service_version, event_name, plus event-specific fields). The list below covers each event with a one-line meaning. Field-value details follow.

| Event | Level | Meaning |
|---|---|---|
| auth.ecpds.check.started | debug | The plugin started checking access for a request. |
| auth.ecpds.check.allowed | info | The plugin allowed the request. |
| auth.ecpds.check.denied | warn | The plugin denied the request. See reason field. |
| auth.ecpds.check.unavailable | warn | The plugin failed to reach a verdict. See fetch_outcome field. |
| auth.ecpds.check.error | error | An unexpected error in the plugin. See error_kind or error field. |
| auth.ecpds.admin.bypass | info | An admin user skipped the ECPDS check. |
| auth.ecpds.cache.hit | debug | The destination list came from cache. |
| auth.ecpds.cache.miss | debug | The destination list was not in cache; a fetch was triggered. |
| auth.ecpds.fetch.succeeded | debug | A fetch to one ECPDS server succeeded. |
| auth.ecpds.fetch.failed | warn | A fetch to one ECPDS server failed. See error field. |
| auth.ecpds.fetch.skipped_inactive | info | One or more ECPDS records returned by a single server had active != true (false, missing, or not a boolean) and got dropped from the user's allow-list. Carries server_index, server, username, skipped, total. |
| auth.ecpds.fetch.skipped_record | info | One or more ECPDS records returned by a single server were active but missing the configured target_field and got dropped. Carries server_index, server, username, target_field, skipped, total so on-call can pinpoint which ECPDS server is producing the malformed records. |

Common fields

Most events carry event_type (the schema name) and username (the JWT subject). Per-server events (auth.ecpds.fetch.succeeded, .failed, .skipped_inactive, and .skipped_record) also carry server_index (zero-based) and server (the parsed URL).

Field value reference

Some events carry a typed enum field. The values you will see in logs are listed below. They are spelled exactly as shown.

  • reason (on auth.ecpds.check.denied):
    • DestinationNotInList: the user is not entitled to the requested destination.
    • MatchKeyMissing: the request body did not include the configured match-key field.
  • fetch_outcome (on auth.ecpds.check.unavailable):
    • Unauthorized, Forbidden: an ECPDS server returned 401 or 403.
    • ClientError: an ECPDS server returned a 4xx other than 401 or 403 (commonly 404 for a misconfigured base URL or 429 for throttling).
    • ServerError: an ECPDS server returned 5xx.
    • InvalidResponse: an ECPDS server returned a body the parser could not read.
    • Unreachable: network or timeout failure.
  • cache_outcome (on every auth.ecpds.check.* event: .allowed, .denied, .unavailable, .error):
    • hit: served from cache.
    • miss_coalesced: the cache was empty for this key but a concurrent caller's fetch was in flight; this request waited on it.
    • miss_fetched: this request ran the upstream fetch itself. The merged per-server result of that fetch is recorded as the outcome label on the aviso_ecpds_fetch_total metric, and on the auth.ecpds.check.unavailable event also as fetch_outcome (see above). It is intentionally NOT inlined into cache_outcome so log filters keyed on cache_outcome:miss_fetched stay stable as new FetchOutcome variants are added.
    • none: cache lookup was deliberately skipped because the request fell at the MatchKeyMissing deny path before any cache call ran. Only appears on auth.ecpds.check.denied events alongside reason=MatchKeyMissing.

How to confirm "config error vs. upstream outage"

  1. Is the ECPDS plugin even compiled in? Check /metrics for aviso_ecpds_* series. The unlabelled counters and gauge plus the pre-initialised label values on aviso_ecpds_access_decisions_total and aviso_ecpds_fetch_total register at process startup whenever the binary is built with --features ecpds, regardless of whether an ecpds: config block exists. If the series are absent, the binary does not have the feature, OR the metrics endpoint itself is disabled (metrics.enabled: false in your config). If the series exist but aviso_ecpds_access_decisions_total{outcome="allow"} plus outcome="deny_*" are all flat at zero under load, the plugin is compiled in but no stream actually opts in via plugins: ["ecpds"].
  2. Are the configured server URLs reachable from this Aviso host? Run this from the same host as Aviso:
    curl -i -u "<service-username>:<service-password>" \
         "https://<your-ecpds-host>/ecpds/v1/destination/list?id=<some-test-username>"
    
    • 200 with a JSON destinationList: ECPDS is up and credentials are valid. Problem is on the Aviso side.
    • 401 or 403: service-account credentials are wrong (rotated, revoked, typoed).
    • 5xx or hang: ECPDS itself is broken.
    • DNS error or connection refused: network-level issue.
  3. Is one specific user being denied while others succeed? Run the curl above with that user's id and compare with the destination they tried to read.

Blast radius of partial_outage_policy=strict

With strict, one ECPDS server going away takes the whole plugin to 503. Any reader on a stream with plugins: ["ecpds"] will see 503 until the missing server returns. The destination list itself would still be a union once both servers respond again; the policy only governs how strictly we treat per-server failures.

If you would rather keep serving requests during a partial outage at the cost of possibly missing entitlements that lived only on the unreachable server, switch to partial_outage_policy: any_success. Read the trade-off in the Partial-outage policy section before flipping.

What "the cache is process-local" implies

  • Restarting Aviso flushes everyone's destination cache. Expect a brief upstream-call spike right after a restart.
  • Multiple Aviso replicas keep independent caches. A user routed to a different replica will see a fresh fetch.
  • There is no admin endpoint to flush a single user's cache. The next request after cache_ttl_seconds will re-fetch automatically. For an immediate flush, restart the replica.

What this runbook deliberately does not tell you

  • ECPDS API specifics. There is no public ECPDS REST documentation as of this writing. What Aviso assumes about the response shape (e.g. destinationList[].name, success: "yes") is captured as automated contract tests under aviso-ecpds/tests/fixtures/ and aviso-ecpds/tests/contract.rs. If those tests start failing on a real ECPDS environment, the contract has changed and Aviso needs an update.
  • Kerberos, mTLS, or SSO to ECPDS. Aviso uses HTTP Basic Auth only. Switching to a different auth mechanism would need code changes.

Streaming Semantics

This page defines the exact behavior of the watch and replay endpoints, including start points, spatial filtering, identifier constraints, and SSE lifecycle events.


Request ID Correlation

Every HTTP response carries an X-Request-ID header with a per-request UUID. The same UUID is also embedded in the JSON data: payload of certain SSE events so that a client which only sees the body (not headers) can still quote it back when reporting a problem.

A note on terminology before the tables: SSE has two related but distinct labels for an event. The event: line is the SSE-level type that an EventSource.addEventListener(name, handler) call dispatches on. The data.type field (when present) is an aviso-level discriminator inside the JSON body, used for the cases where we reuse a single event: name for several control events. A wire example:

event: live-notification
data: {"type":"connection_established","topic":"...","timestamp":"...","connection_will_close_in_seconds":3600,"request_id":"<uuid>"}

The event: line above is live-notification (so an EventSource client listens for live-notification); the data.type value is connection_established (so the client distinguishes it from a normal notification, which has no type field).

In-stream events that include request_id:

| SSE event: | data.type (when present) | Frequency | Purpose |
|---|---|---|---|
| live-notification | connection_established | Once at the start of a live-only watch | First-event correlation |
| replay-control | replay_started | Once at the start of any stream that begins with replay | First-event correlation |
| error | (none; uses error field as discriminator) | Rare, on mid-stream backend or CloudEvent-creation failure | Failure-event correlation |
| connection-closing | (none; uses reason field as discriminator) | Once on graceful close | Final-event correlation |

In-stream events that intentionally do not include request_id:

| SSE event: | data.type (when present) | Frequency | Why |
|---|---|---|---|
| live-notification | (none; CloudEvent body) | Per message | Repeating the same UUID on every notification would inflate the wire for no extra value (correlation is already covered by the first event and the response header) |
| replay | (none; CloudEvent body) | Per message | Same |
| heartbeat | (none) | Every few seconds | Same |
| replay-control | replay_completed, notification_replay_limit_reached | Replay phase boundaries | The first replay-control event (replay_started) already carries the UUID; repeating it is noise |

The first event of any stream is guaranteed to carry the request_id (a live-notification event with data.type = "connection_established" for live-only watches, or a replay-control event with data.type = "replay_started" for any stream that begins with replay).

Reconnecting after disconnect

If a stream drops (network blip, client restart, connection_closing with reason max_duration_reached, etc.), the recommended reconnect protocol is:

  1. Remember the sequence field of the last live-notification or replay event you successfully processed. The sequence is in the CloudEvent payload of every notification.
  2. Issue a fresh POST /api/v1/watch (or /api/v1/replay) with from_id set to that sequence value plus 1.

That gives you exact at-least-once continuation without losing or duplicating notifications.
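The two-step protocol above can be sketched as a helper that derives the reconnect body from the previous request. The helper name is hypothetical, and the wire type of from_id should follow whatever your client already sends:

```python
def next_reconnect_body(previous_body: dict, last_sequence: int) -> dict:
    """Build the next watch/replay body: keep the original filters, drop any
    stale start cursor, and resume one past the last processed sequence."""
    body = {k: v for k, v in previous_body.items() if k not in ("from_id", "from_date")}
    body["from_id"] = last_sequence + 1  # from_id is inclusive, so +1 avoids a duplicate
    return body
```

On each received live-notification or replay event, update last_sequence from the CloudEvent's sequence field before the next potential disconnect.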

Aviso does not use the SSE id: field or the Last-Event-ID request header. Both are part of the browser EventSource auto-reconnect mechanism; aviso supports a richer from_id + from_date reconnect contract via the POST request body, and we deliberately keep one explicit reconnect mechanism rather than expose two overlapping ones.

If you need time-based catch-up rather than sequence-based, use from_date instead of from_id (see Start Point for Historical Events below for accepted formats).

SSE Stream Lifecycle

Every streaming response (watch or replay) goes through a typed lifecycle:

stateDiagram-v2
    [*] --> Connected : SSE connection established
    Connected --> Replaying : from_id or from_date provided
    Connected --> Live : no replay parameters (watch only)
    Replaying --> Live : replay_completed (watch only)
    Replaying --> Closed : end_of_stream (replay only)
    Live --> Closed : max_duration_reached
    Live --> Closed : server_shutdown
    Closed --> [*]

Close reasons emitted in the final connection_closing SSE event:

| Reason | Trigger |
|---|---|
| end_of_stream | Replay finished (/replay endpoint, or watch replay phase if live subscribe fails) |
| max_duration_reached | connection_max_duration_sec elapsed on a watch stream |
| server_shutdown | Server is shutting down gracefully |

POST /api/v1/watch

  • If both from_id and from_date are omitted:
    • stream is live-only (new notifications from now onward).
  • If exactly one replay parameter is present:
    • historical replay starts first, then transitions to live stream.
  • If both are present:
    • request is rejected with 400.
flowchart TD
    A["Watch Request"] --> B{"from_id or<br/>from_date?"}
    B -->|neither| C["Live-only stream"]
    B -->|exactly one| D["Historical replay<br/>then live stream"]
    B -->|both| E["400 Bad Request"]

    style E fill:#8b1a1a,color:#fff
    style C fill:#1a6b3a,color:#fff
    style D fill:#1a4d6b,color:#fff

POST /api/v1/replay

  • Requires exactly one replay start parameter:
    • from_id (sequence-based), or
    • from_date (time-based).
  • If both are missing or both are present:
    • request is rejected with 400.
  • Stream closes with end_of_stream when history is exhausted.

Start Point for Historical Events

from_id starts delivery from that sequence number (inclusive). from_date accepts any of these formats:

| Format | Example |
|---|---|
| RFC3339 with timezone | 2025-01-15T10:00:00Z |
| RFC3339 with offset | 2025-01-15T10:00:00+02:00 |
| Space-separated with timezone | 2025-01-15 10:00:00+00:00 |
| Naive datetime (interpreted as UTC) | 2025-01-15T10:00:00 |
| Unix seconds (≤ 11 digits) | 1740509903 |
| Unix milliseconds (≥ 12 digits) | 1740509903710 |

All inputs are normalized to UTC internally.
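A client-side sketch of the accepted formats, mirroring the table above (including the ≤ 11 / ≥ 12 digit rule for unix timestamps). The server's own parser is authoritative:

```python
from datetime import datetime, timezone

def parse_from_date(value: str) -> datetime:
    """Parse a from_date value into a UTC datetime, mirroring the format table."""
    if value.isdigit():
        n = int(value)
        if len(value) >= 12:            # >= 12 digits: unix milliseconds
            return datetime.fromtimestamp(n / 1000.0, tz=timezone.utc)
        return datetime.fromtimestamp(n, tz=timezone.utc)
    text = value.replace(" ", "T", 1)   # accept the space-separated form
    dt = datetime.fromisoformat(text.replace("Z", "+00:00"))
    if dt.tzinfo is None:               # naive datetime: interpreted as UTC
        dt = dt.replace(tzinfo=timezone.utc)
    return dt.astimezone(timezone.utc)
```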


Spatial Filter Model

Spatial filtering applies on top of the identifier field filters. Think of it in two layers:

  • Non-spatial identifier fields (time, date, class, etc.) narrow candidates by topic routing.
  • Spatial fields (polygon or point) further narrow that candidate set geographically.
flowchart LR
    A["Candidate<br/>notifications"] -->|"non-spatial<br/>identifier filter"| B["Topic-matched<br/>subset"]
    B -->|"spatial filter<br/>if provided"| C["Final<br/>results"]

Rules

| identifier.polygon | identifier.point | Result |
|---|---|---|
| provided | omitted | polygon-intersects-polygon filter |
| omitted | provided | point-inside-notification-polygon filter |
| omitted | omitted | no spatial filter |
| provided | provided | 400 Bad Request |

  • identifier.polygon: keep notifications whose stored polygon intersects the request polygon.
  • identifier.point: keep notifications whose stored polygon contains the request point.
  • Both together: invalid — request is rejected.

Identifier Constraints (watch / replay)

For schema-backed event types, identifier fields in watch/replay requests accept constraint objects instead of (or in addition to) scalar values. A scalar value is treated as an implicit eq constraint.

Constraint objects are rejected on /notification — notify only accepts scalar values.

Supported operators by field type

| Handler | Operators |
|---|---|
| IntHandler | eq, in, gt, gte, lt, lte, between |
| FloatHandler | eq, in, gt, gte, lt, lte, between |
| EnumHandler | eq, in |

Notes

  • between expects exactly two values [min, max] and is inclusive on both ends.
  • Float constraints reject NaN and inf — only finite values are valid.
  • Float eq and in use exact numeric equality; no tolerance window is applied.
  • A constraint object must contain exactly one operator — combining operators in a single object is rejected.

Examples

{ "severity": { "gte": 5 } }
{ "severity": { "between": [3, 7] } }
{ "region":   { "in": ["north", "south"] } }
{ "anomaly":  { "lt": 50.0 } }
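How such a constraint object behaves can be sketched as follows. Operator semantics are taken from the table and notes above; this is illustrative, not Aviso's implementation:

```python
def matches(value, constraint) -> bool:
    """Evaluate one identifier constraint against a notification field value."""
    if not isinstance(constraint, dict):      # scalar value = implicit eq
        return value == constraint
    if len(constraint) != 1:                  # exactly one operator per object
        raise ValueError("constraint object must contain exactly one operator")
    op, arg = next(iter(constraint.items()))
    if op == "eq":  return value == arg
    if op == "in":  return value in arg
    if op == "gt":  return value > arg
    if op == "gte": return value >= arg
    if op == "lt":  return value < arg
    if op == "lte": return value <= arg
    if op == "between":
        lo, hi = arg                          # exactly two values, inclusive
        return lo <= value <= hi
    raise ValueError(f"unknown operator: {op}")
```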

Backend Behavior

| Backend | Historical replay | Live watch |
|---|---|---|
| in_memory | Node-local only; clears on restart | Node-local fan-out |
| jetstream | Durable; survives restarts | Cluster-wide fan-out |

SSE Timestamp Format

All control, heartbeat, and close event timestamps use canonical UTC second precision:

YYYY-MM-DDTHH:MM:SSZ

Example: 2026-02-25T18:58:23Z


Replay Payload Shape

  • Replay and watch CloudEvent output always includes data.payload.
  • If a notify request omitted payload (optional schema), replay returns data.payload = null.
  • Payload values are not reshaped — scalar strings remain strings, objects remain objects.

See Payload Contract for the full input → storage → output mapping.


For end-to-end examples, see:

Payload Contract

This page defines the canonical payload behavior for Aviso notifications.

Scope

  • Applies to POST /api/v1/notification input.
  • Applies to stored backend payload representation.
  • Applies to replay/watch CloudEvent output payload field.

Schema Configuration

Per event schema, payload configuration is:

payload:
  required: true # or false

There is no payload.type list.

Canonical Rules

  1. Payload values are JSON values: object, array, string, number, boolean, null.
  2. If payload.required = true and request omits payload, request is rejected (400).
  3. If payload.required = false and request omits payload, Aviso stores canonical JSON null.
  4. Aviso does not wrap or reshape payload values (for example no auto-wrapping into {"data": ...}).
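The four rules reduce to a small decision, sketched here for clarity (function name is illustrative; the server's validator is authoritative):

```python
def canonical_payload(request: dict, required: bool):
    """Apply the canonical payload rules to a notify request body."""
    if "payload" not in request:
        if required:
            raise ValueError("INVALID_NOTIFICATION_REQUEST: payload is required")
        return None                 # stored as canonical JSON null
    return request["payload"]       # passed through unreshaped, whatever its type
```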

Input to Storage to Replay Mapping

| Notify request payload | Stored payload | Replay/Watch CloudEvent data.payload |
|---|---|---|
| omitted (optional schema) | null | null |
| "forecast complete" | "forecast complete" | "forecast complete" |
| 42 | 42 | 42 |
| true | true | true |
| ["a","b"] | ["a","b"] | ["a","b"] |
| {"note":"ok"} | {"note":"ok"} | {"note":"ok"} |

Failure Cases

  • Missing required payload:
    • HTTP 400
    • validation error (INVALID_NOTIFICATION_REQUEST)
  • Malformed JSON request body:
    • HTTP 400
    • parse error (INVALID_JSON)

Consumer Guidance

  • Treat data.payload as dynamic JSON.
  • If your client requires object-only payloads, normalize on the client side.
    • Example strategy: for non-object payloads, convert to {"data": <payload>} in your consumer.
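The suggested client-side strategy is a one-liner; the wrapper key "data" is your choice, not an Aviso convention:

```python
def normalize_payload(payload):
    """Normalize any JSON payload to an object, wrapping non-objects."""
    if isinstance(payload, dict):
        return payload
    return {"data": payload}   # wraps scalars, arrays, and null alike
```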

Topic Encoding

Aviso uses a single backend-agnostic wire format for topics across all backends.


Why This Exists

NATS subject tokenization uses . as the separator. Wildcards (*, >) also operate on dot-delimited tokens. If topic field values contained any of these reserved characters, they would silently break routing and filtering.

Example of the problem:

Logical value:  1.45
Naive subject:  mars.od.1.45      ← looks like 4 tokens, not 3

To prevent this, Aviso percent-encodes each token value before assembling the wire subject.


Encoding Rules

Only four characters are reserved and must be encoded:

| Character | Encoded form | Reason |
|---|---|---|
| . | %2E | NATS token separator |
| * | %2A | NATS single-token wildcard |
| > | %3E | NATS multi-token wildcard |
| % | %25 | Escape character itself (keeps decoding unambiguous) |

All other characters pass through unchanged.


Encode → Wire → Decode Flow

flowchart LR
    A["Logical value<br/>e.g. 1.45"] -->|encode| B["Wire token<br/>e.g. 1%2E45"]
    B -->|assemble| C["Wire subject<br/>e.g. extreme_event.north.1%2E45"]
    C -->|"split on '.'"| D["Wire tokens"]
    D -->|decode each| E["Logical tokens<br/>e.g. 1.45"]

    style A fill:#2a4a6b,color:#fff
    style C fill:#1a6b3a,color:#fff
    style E fill:#2a4a6b,color:#fff

The decoder is strict: malformed %HH sequences (e.g. %GG) are rejected, not passed through.
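A sketch of the codec under the rules above. Treating any escape outside the four-entry table as malformed is an assumption here; Aviso's shared codec is authoritative:

```python
_RESERVED = {".": "%2E", "*": "%2A", ">": "%3E", "%": "%25"}
_DECODE = {v: k for k, v in _RESERVED.items()}

def encode_token(value: str) -> str:
    """Percent-encode the four reserved characters; everything else passes through."""
    return "".join(_RESERVED.get(ch, ch) for ch in value)

def decode_token(token: str) -> str:
    """Strict single-pass decode: dangling '%' or unknown escapes are rejected."""
    out, i = [], 0
    while i < len(token):
        if token[i] == "%":
            seq = token[i:i + 3]
            if seq not in _DECODE:
                raise ValueError(f"malformed escape: {seq!r}")
            out.append(_DECODE[seq])
            i += 3
        else:
            out.append(token[i])
            i += 1
    return "".join(out)
```

Single-pass decoding means decode_token("1%2525") yields "1%25" and stops — it never re-decodes the result.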


Examples

Encoding

| Logical value | Wire token |
|---|---|
| 1.45 | 1%2E45 |
| 1*34 | 1%2A34 |
| 1>0 | 1%3E0 |
| 1%25 | 1%2525 |

Decoding (single pass)

| Wire token | Logical value |
|---|---|
| 1%2E45 | 1.45 |
| 1%2A34 | 1*34 |
| 1%2525 | 1%25 |
| 1%25 | 1% |

Impact on Wildcard Matching

Watch and replay requests use a two-step filter:

  1. Backend coarse filter — operates on wire subjects (NATS wildcard matching)
  2. App-level wildcard match — operates on decoded logical tokens

Both steps are safe with reserved characters because the app layer always decodes before matching. Subscribers never need to think about encoding in their filter values — Aviso handles it transparently.
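The app-level step can be sketched as a wildcard match over decoded token lists. Operating on lists rather than joined strings is exactly what makes values containing "." safe (illustrative only; Aviso's precise filter also applies constraints and spatial checks):

```python
def tokens_match(pattern: list, tokens: list) -> bool:
    """NATS-style match over decoded logical tokens:
    '*' matches exactly one token, '>' matches one or more trailing tokens."""
    for i, p in enumerate(pattern):
        if p == ">":
            return len(tokens) > i    # at least one token must remain
        if i >= len(tokens):
            return False
        if p != "*" and p != tokens[i]:
            return False
    return len(pattern) == len(tokens)
```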


Invariants

  • Wire subject separator is always .
  • One shared codec is used for all backends (JetStream and In-Memory)
  • Encoding is applied per-token, not per-subject
  • Decoding is a strict single-pass operation

API Errors

Errors produced by aviso's own handlers use a consistent JSON error object on 4xx and 5xx responses. A small number of 4xx responses are framework-level fallbacks rather than aviso-handled errors and use a different shape; see Framework-Level Fallbacks below.

Response Shape

{
  "code": "INVALID_REPLAY_REQUEST",
  "error": "Invalid Replay Request",
  "message": "Replay endpoint requires either from_id or from_date parameter...",
  "details": "Replay endpoint requires either from_id or from_date parameter...",
  "request_id": "0d4f6758-1ce3-4dda-a0f3-0ccf5fcb50d6"
}

Fields:

  • code: stable machine-readable error code.
  • error: human-readable error category.
  • message: top-level failure message (safe for client display).
  • details: deepest/root detail available.
  • request_id: per-request UUID. The same value is also returned in the X-Request-ID HTTP response header and in every server-side log line for this request. Quoting it is the easiest way to ask the operator to look up the corresponding traces.

Notes:

  • error_chain is logged server-side for diagnostics, but is not returned in API responses.
  • message is always present.
  • details is present on 4xx and 5xx errors emitted from the notification/watch/replay request path (parse, validation, processing, and SSE stream initialization). It is intentionally omitted from authentication errors (401/403/503) and from the streaming-auth helpers (forbidden, ECPDS service unavailable, internal misconfiguration), where the upstream service or authorization plugin does not provide a stable detailed message.
  • request_id is present on every error response body produced by aviso's own handlers (notification, watch, replay, schema, admin, auth and ECPDS authorization helpers). The X-Request-ID HTTP response header carries the same UUID on every response (success, aviso error, or framework-level fallback); see Framework-Level Fallbacks for the fallback cases where a JSON body field is not available.
  • SSE stream initialization failures additionally include topic (the decoded logical topic) so the operator can scope log queries faster.
  • The admin endpoints (/api/v1/admin/*) and POST /api/v1/notification return typed responses where request_id is a struct field rather than a free-form JSON key, but the field name and value are the same.

How to report a problem

Capture either of these and pass them to the operator:

  1. The X-Request-ID HTTP response header. Visible to curl -i, browser devtools, every reverse proxy, and most log aggregators. Present on every response, success or failure.
  2. The request_id field in any error response body. Same UUID as the header.

For streaming responses (/api/v1/watch, /api/v1/replay), the same UUID also appears in the JSON data: payload of the very first event and in any error or connection-closing event the stream emits before terminating. The exact wire shape depends on the stream variant:

  • For a live-only watch, the first event has SSE event: live-notification and a JSON body with "type": "connection_established".
  • For a stream that begins with replay, the first event has SSE event: replay-control and a JSON body with "type": "replay_started".

In both cases the request_id field is in the JSON body alongside type. This means a user who only sees the open-ended SSE body (no header parsing) can still recover the id without running the request again. See Streaming Semantics for the full event-by-event payload table, including the SSE event: versus data.type distinction (relevant for clients using EventSource.addEventListener).

Error Telemetry Events

These event_name values are emitted in structured logs:

| Event Name | Level | Trigger |
|---|---|---|
| api.request.parse.failed | warn | JSON parse/shape/unknown-field failure before domain validation. |
| api.request.validation.failed | warn | Domain/request validation failure (400). |
| api.request.processing.failed | error | Server-side processing/storage failure (500). |
| stream.sse.initialization.failed | error | Replay/watch SSE initialization failure (500). |

Error Code Reference

| Code | HTTP Status | Meaning |
|---|---|---|
| INVALID_JSON | 400 | Request body is not valid JSON. |
| UNKNOWN_FIELD | 400 | Request contains fields outside API contract. |
| INVALID_REQUEST_SHAPE | 400 | JSON structure cannot be deserialized into request model. |
| INVALID_NOTIFICATION_REQUEST | 400 | Notification request failed business validation. |
| INVALID_WATCH_REQUEST | 400 | Watch request failed validation (replay/spatial/schema rules). |
| INVALID_REPLAY_REQUEST | 400 | Replay request failed validation (start cursor/spatial/schema rules). |
| UNAUTHORIZED | 401 | Missing or invalid credentials (no token, bad format, expired, bad signature). |
| FORBIDDEN | 403 | Valid credentials but user lacks the required role. |
| NOTIFICATION_PROCESSING_FAILED | 500 | Notification processing pipeline failed before storage. |
| NOTIFICATION_STORAGE_FAILED | 500 | Backend write operation failed. |
| SSE_STREAM_INITIALIZATION_FAILED | 500 | Replay/watch SSE stream could not be created. |
| INTERNAL_ERROR | 500 | Fallback internal error code (reserved). |
| SERVICE_UNAVAILABLE | 503 | Auth service (auth-o-tron) unreachable or returned an unexpected error. |

Examples

Invalid replay request:

{
  "code": "INVALID_REPLAY_REQUEST",
  "error": "Invalid Replay Request",
  "message": "Cannot specify both from_id and from_date...",
  "details": "Cannot specify both from_id and from_date...",
  "request_id": "0d4f6758-1ce3-4dda-a0f3-0ccf5fcb50d6"
}

Auth error (missing credentials on a protected stream). Auth errors use four fields (code, error, message, request_id); details is not included:

{
  "code": "UNAUTHORIZED",
  "error": "unauthorized",
  "message": "Authentication is required for this stream",
  "request_id": "0d4f6758-1ce3-4dda-a0f3-0ccf5fcb50d6"
}

SSE initialization failure:

{
  "code": "SSE_STREAM_INITIALIZATION_FAILED",
  "error": "SSE stream creation failed",
  "message": "Failed to create stream consumer",
  "details": "nats connect failed: timeout",
  "topic": "test_polygon.*.1200",
  "request_id": "0d4f6758-1ce3-4dda-a0f3-0ccf5fcb50d6"
}

Framework-Level Fallbacks

A few 4xx responses are produced by the Actix HTTP framework itself before any aviso handler runs, so the body shape is whatever the framework defaults to (typically text/plain or empty) and not the JSON object documented above:

| Status | Trigger |
|---|---|
| 404 Not Found | Request path does not match any registered aviso route. |
| 405 Method Not Allowed | Path matches an aviso route but the HTTP method does not. |
| 400 Bad Request (rare) | Request fails framework-level checks (malformed Content-Length, etc.) before reaching aviso's body parsers. |

For these cases:

  • code, error, message, details, and request_id JSON fields are not in the body.
  • The X-Request-ID HTTP response header is still set on 404 and 405 (the middleware stack runs before Actix's default route-mismatch responses), so an operator can correlate the request with server logs by header alone. Errors raised even earlier in the HTTP stack (e.g., malformed Content-Length, TLS handshake failure) bypass aviso's middleware entirely and produce no X-Request-ID; in those cases the request never reaches aviso, no log line is generated, and no correlation is possible.
  • The aviso code reference table above does not apply.

If you need a stable JSON shape on these paths, hit a known-good route (GET /health is the simplest); a 4xx from there indicates an actual aviso-handled error and follows the documented contract.

Admin Operations

Admin endpoints are destructive. Restrict access in production.

When authentication is enabled, all admin endpoints require a valid credential and one of the configured admin_roles. Add -H "Authorization: Bearer <token>" or -u user:pass (direct mode) to the curl examples below.

Delete One Notification

DELETE /api/v1/admin/notification/{notification_id}

notification_id format:

  • <stream>@<sequence> (canonical)
  • <event_type>@<sequence> (alias; resolved through configured schema topic.base)

How notification_id maps to your schema

Delete IDs use the stream key plus backend sequence number.

If your schema contains:

notification_schema:
  mars:
    topic:
      base: "mars"
  dissemination:
    topic:
      base: "diss"
  test_polygon:
    topic:
      base: "polygon"

Then valid delete IDs include:

  • mars@42 (event type alias and stream key are the same)
  • dissemination@42 (alias form, resolved to stream key diss)
  • diss@42 (canonical stream key form)
  • test_polygon@306 (alias form, resolved to stream key polygon)
  • polygon@306 (canonical stream key form)

Example: replay ID then delete

Replay returns CloudEvent IDs like mars@1:

curl -N -X POST "http://127.0.0.1:8000/api/v1/replay" \
  -H "Content-Type: application/json" \
  -d '{
    "event_type": "mars",
    "identifier": {
      "class": "od",
      "expver": "0001",
      "domain": "g",
      "date": "20250706",
      "time": "1200",
      "stream": "enfo",
      "step": "1"
    },
    "from_id": "1"
  }'

If one replayed event has "id":"mars@1", delete it with:

curl -X DELETE "http://127.0.0.1:8000/api/v1/admin/notification/mars@1"

Response behavior

  • 200: notification deleted.
  • 404: stream/sequence pair not found.
  • 400: invalid ID format (<name>@<positive-integer> required).

Invalid examples:

  • mars (missing @sequence)
  • mars@0 (sequence must be > 0)
  • mars@abc (sequence must be an integer)
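The format check amounts to a single pattern, sketched here for pre-flight validation in a client (function name is illustrative; the server performs its own check):

```python
import re

_ID_RE = re.compile(r"^(?P<name>[^@]+)@(?P<seq>[1-9][0-9]*)$")

def parse_notification_id(notification_id: str):
    """Split a <name>@<positive-integer> delete ID into (name, sequence)."""
    m = _ID_RE.match(notification_id)
    if m is None:
        raise ValueError(f"invalid notification id: {notification_id!r}")
    return m.group("name"), int(m.group("seq"))
```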

Wipe Endpoints

  • DELETE /api/v1/admin/wipe/stream
  • DELETE /api/v1/admin/wipe/all

These endpoints remove many messages at once and should be used with extreme caution.

Wipe One Stream

DELETE /api/v1/admin/wipe/stream

Request body:

{
  "stream_name": "MARS"
}

Example:

curl -X DELETE "http://127.0.0.1:8000/api/v1/admin/wipe/stream" \
  -H "Content-Type: application/json" \
  -d '{"stream_name":"MARS"}'

What it does:

  • Removes all stored messages for the selected stream.
  • Keeps the stream definition/configuration in place.
  • New notifications can still be written to that stream immediately after wipe.

When to use:

  • You want to reset one event family (mars, diss, polygon) without affecting others.

Wipe All Streams

DELETE /api/v1/admin/wipe/all

Example:

curl -X DELETE "http://127.0.0.1:8000/api/v1/admin/wipe/all"

What it does:

  • Removes all stored messages from all streams managed by the backend.
  • Leaves service configuration intact, but data history is gone.

When to use:

  • Local/dev reset before a fresh test run.
  • Operational emergency cleanup where full history removal is intended.

Which Admin Operation Should I Use?

  • Delete one notification (/admin/notification/{id}):
    • Use when you know the exact sequence to remove.
  • Wipe one stream (/admin/wipe/stream):
    • Use when one stream is polluted and others must remain untouched.
  • Wipe all (/admin/wipe/all):
    • Use only when complete history reset is intended.

Wipe Response Shape

Both wipe endpoints return:

{
  "success": true,
  "message": "..."
}

Failure responses keep the same shape with success: false and an error message.

Architecture

Aviso Server is built around three operations — Notify, Watch, and Replay — each sharing a common validation and schema layer but diverging at the backend interaction.


System Overview

graph TB
    subgraph Clients
        P(Publisher)
        W(Watcher)
        R(Replayer)
    end

    subgraph "Aviso Server"
        direction TB
        AM["Auth Middleware<br/>(optional)"]
        RT["Routes<br/>HTTP handlers"]
        VP["Validation &<br/>Processing"]
        NC["Notification<br/>Core"]
        BE["Backend<br/>Abstraction"]
    end

    AOT["auth-o-tron<br/>(external)"]

    subgraph Backend
        JS[("JetStream<br/>NATS")]
        IM[("In-Memory<br/>Process")]
    end

    P -->|POST /api/v1/notification| AM
    W -->|POST /api/v1/watch| AM
    R -->|POST /api/v1/replay| AM

    AM -.->|verify credentials| AOT
    AM --> RT
    RT --> VP
    VP --> NC
    NC --> BE
    BE --> JS
    BE --> IM

    JS -.->|SSE stream| W
    JS -.->|SSE stream| R
    IM -.->|SSE stream| W
    IM -.->|SSE stream| R

Notify Request Flow

When a publisher sends POST /api/v1/notification:

sequenceDiagram
    participant C as Publisher
    participant A as Auth Middleware
    participant R as Route Handler
    participant V as Validator
    participant P as Processor
    participant T as Topic Builder
    participant B as Backend

    C->>A: POST /api/v1/notification (JSON)
    alt stream requires auth
        A->>A: resolve user (JWT or auth-o-tron)
        A-->>C: 401/403 if unauthorized
    end
    A->>R: forward request (+ user identity)
    R->>V: parse & shape-check JSON
    V-->>R: 400 if malformed
    R->>P: process_notification_request()
    P->>P: look up event schema
    P->>P: validate each identifier field
    P->>P: canonicalize values (dates, enums)
    P->>T: build_topic_with_schema()
    T-->>P: topic string (e.g. mars.od.0001.g.20250706.1200)
    P->>B: put_message_with_headers()
    B-->>C: 200 { id, topic }

Key steps:

  1. Parse — raw JSON bytes are deserialized; unknown fields are rejected (UNKNOWN_FIELD)
  2. Validate — each identifier field is checked against its ValidationRules (type, range, enum values)
  3. Canonicalize — values are normalized (e.g. dates to YYYYMMDD, enums to lowercase)
  4. Build topic — fields are ordered per key_order, reserved chars are percent-encoded
  5. Store — the message is written to the backend with the encoded topic as the subject
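Steps 4 and 5 can be sketched together: order the identifier fields per key_order, percent-encode each value, and join into the wire subject. This is illustrative; Aviso's real builder also canonicalizes values first (step 3):

```python
_RESERVED = {".": "%2E", "*": "%2A", ">": "%3E", "%": "%25"}

def build_topic(base: str, identifier: dict, key_order: list) -> str:
    """Assemble the wire subject: base token plus ordered, encoded field values."""
    def enc(value: str) -> str:
        return "".join(_RESERVED.get(ch, ch) for ch in value)
    tokens = [enc(base)] + [enc(str(identifier[key])) for key in key_order]
    return ".".join(tokens)
```

For example, a mars notification with class=od, expver=0001, domain=g and key_order [class, expver, domain] yields mars.od.0001.g, while a value like 1.45 would be carried as the single token 1%2E45.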

Watch Request Flow

POST /api/v1/watch opens a persistent SSE stream. It optionally starts with a historical replay phase before transitioning to live delivery.

sequenceDiagram
    participant C as Subscriber
    participant A as Auth Middleware
    participant R as Route Handler
    participant P as Stream Processor
    participant F as Hybrid Filter
    participant B as Backend

    C->>A: POST /api/v1/watch (JSON)
    alt stream requires auth
        A->>A: resolve user (JWT or auth-o-tron)
        A-->>C: 401/403 if unauthorized
    end
    A->>R: forward request (+ user identity)
    R->>P: process_request (ValidationConfig::for_watch)
    P->>P: allow optional fields & constraint objects
    P->>P: analyze_watch_pattern() → coarse + precise patterns

    alt has from_id or from_date
        P->>B: fetch historical batch
        B-->>P: NotificationMessage[]
        P->>F: apply wildcard + constraint + spatial filter
        F-->>C: SSE: replay_started → events → replay_completed
    end

    P->>B: subscribe(coarse_pattern)
    loop live stream
        B-->>P: live NotificationMessage
        P->>F: apply precise filter
        F-->>C: SSE: notification event
    end

    C-->>R: disconnect / timeout
    R-->>C: SSE: connection_closing

Replay Request Flow

POST /api/v1/replay is like watch but historical-only — the stream closes when history ends.

sequenceDiagram
    participant C as Client
    participant A as Auth Middleware
    participant R as Route Handler
    participant P as Stream Processor
    participant B as Backend

    C->>A: POST /api/v1/replay (JSON + from_id or from_date)
    alt stream requires auth
        A->>A: resolve user (JWT or auth-o-tron)
        A-->>C: 401/403 if unauthorized
    end
    A->>R: forward request (+ user identity)
    R->>P: process_request (ValidationConfig::for_replay)
    P->>B: batch fetch from StartAt::Sequence or StartAt::Date
    loop batches
        B-->>P: NotificationMessage[]
        P->>P: filter + convert to CloudEvent
        P-->>C: SSE: notification events
    end
    P-->>C: SSE: replay_completed → connection_closing (end_of_stream)
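The batch loop in the diagram above can be sketched like this. `fetch_batch` is a stand-in closure for the backend call, sequence numbers stand in for full messages, and treating a short batch as end-of-history is an assumption for illustration:

```rust
/// Pull fixed-size batches from the backend until a short batch signals
/// that history is exhausted, then stop (the real flow would then emit
/// replay_completed and connection_closing over SSE).
fn replay_all(mut fetch_batch: impl FnMut(u64, usize) -> Vec<u64>, batch_size: usize) -> Vec<u64> {
    let mut delivered = Vec::new();
    let mut next_seq = 1u64;
    loop {
        let batch = fetch_batch(next_seq, batch_size);
        let n = batch.len();
        for seq in batch {
            next_seq = seq + 1;  // advance the cursor past the last delivered message
            delivered.push(seq); // real flow: filter, convert to CloudEvent, emit SSE
        }
        if n < batch_size {
            break; // short batch => history exhausted
        }
    }
    delivered
}
```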

SSE Streaming Pipeline

The streaming layer (src/sse/) is built around typed values rather than raw strings, which keeps the lifecycle explicit and the endpoint logic thin.

Cursor types — how a start point is represented internally:

  • StartAt::LiveOnly — no history, subscribe immediately
  • StartAt::Sequence(u64) — start from a specific backend sequence number
  • StartAt::Date(DateTime<Utc>) — start from a UTC timestamp
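The cursor type and its resolution from the request fields can be sketched as below. The real `Date` variant holds a chrono `DateTime<Utc>`; epoch milliseconds stand in here to keep the example dependency-free, and the from_id-over-from_date precedence is an assumption:

```rust
/// Sketch of the cursor type from the list above.
#[derive(Debug, PartialEq)]
enum StartAt {
    LiveOnly,          // no history, subscribe immediately
    Sequence(u64),     // start from a backend sequence number
    DateMillis(u64),   // stand-in for DateTime<Utc> in the real code
}

/// Hypothetical mapping of request fields to a cursor; from_id wins
/// over from_date when both are present (an illustrative assumption).
fn resolve_start(from_id: Option<u64>, from_date_millis: Option<u64>) -> StartAt {
    match (from_id, from_date_millis) {
        (Some(seq), _) => StartAt::Sequence(seq),
        (None, Some(ts)) => StartAt::DateMillis(ts),
        (None, None) => StartAt::LiveOnly,
    }
}
```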

Frame types — what the stream produces before rendering to SSE wire format:

  • Control frames — connection_established, replay_started, replay_completed
  • Notification frames — a decoded CloudEvent ready for delivery
  • Heartbeat frames — periodic keep-alive
  • Error frames — non-fatal stream errors
  • Close frame — carries one of: end_of_stream, max_duration_reached, server_shutdown
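The frame model and its rendering to the SSE wire format might look like this sketch; the variant payloads and the exact wire formatting are assumptions, while the event names come from the list above:

```rust
/// Typed frames, rendered to SSE wire format only at the edge.
enum Frame {
    Control(&'static str), // e.g. "replay_started", "replay_completed"
    Notification(String),  // serialized CloudEvent JSON
    Heartbeat,             // periodic keep-alive
    Error(String),         // non-fatal stream error
    Close(&'static str),   // e.g. "end_of_stream", "server_shutdown"
}

/// Render one frame as an SSE event block (illustrative formatting).
fn render_sse(frame: &Frame) -> String {
    match frame {
        Frame::Control(name) => format!("event: {name}\ndata: {{}}\n\n"),
        Frame::Notification(json) => format!("event: notification\ndata: {json}\n\n"),
        Frame::Heartbeat => ": keep-alive\n\n".to_string(), // SSE comment line
        Frame::Error(msg) => format!("event: error\ndata: {msg}\n\n"),
        Frame::Close(reason) => format!("event: connection_closing\ndata: {reason}\n\n"),
    }
}
```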

Lifecycle (shutdown token, max duration, natural end) is applied once in a shared wrapper, so individual endpoint handlers don't need to reimplement it.


Component Map

| Component | Path | Role |
|-----------|------|------|
| Routes | src/routes/ | Thin HTTP handlers — parse request, delegate, return response |
| Auth | src/auth/ | Middleware, JWT validation, role matching, auth-o-tron client |
| Handlers | src/handlers/ | Shared parsing, validation, and processing logic |
| Notification core | src/notification/ | Schema registry, topic builder/codec/parser, wildcard matcher, spatial |
| Backend abstraction | src/notification_backend/ | NotificationBackend trait + JetStream and InMemory implementations |
| SSE layer | src/sse/ | Stream composition, typed frames, heartbeats, lifecycle |
| CloudEvents | src/cloudevents/ | Converts stored messages into CloudEvent envelope |
| Configuration | src/configuration/ | Config loading, schema validation, global snapshot |
| Error model | src/error.rs | Stable HTTP error codes and structured responses |

Hybrid Filtering

Watch subscriptions use a two-tier strategy to balance backend load with filter precision:

graph LR
    A[Watch Request] --> B[analyze_watch_pattern]
    B --> C["Coarse pattern<br/>e.g. mars.*.*.*"]
    B --> D["Precise pattern<br/>full decoded topic"]
    C -->|backend subscription| E[(NATS JetStream)]
    E -->|candidate messages| F[App-level filter]
    D --> F
    F -->|matched| G[SSE client]
    F -->|rejected| H[dropped]

  • The coarse pattern is sent to the backend as the subscription subject filter. It uses NATS wildcards and covers a superset of the desired messages.
  • The precise pattern is applied in-process on decoded topics + constraint objects + spatial checks. Only messages that pass both layers reach the client.

This avoids creating one backend subscription per unique topic while still delivering exact results.
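A minimal sketch of the two tiers, using NATS token semantics ('*' matches exactly one token, '>' matches the remainder). The precise check here is a simplified stand-in for Aviso's constraint and spatial filtering:

```rust
/// Coarse tier: NATS-style wildcard match over dot-separated tokens.
fn coarse_match(pattern: &str, subject: &str) -> bool {
    let pat: Vec<&str> = pattern.split('.').collect();
    let sub: Vec<&str> = subject.split('.').collect();
    for (i, p) in pat.iter().enumerate() {
        match *p {
            ">" => return true, // matches all remaining tokens
            "*" => {
                if i >= sub.len() {
                    return false; // '*' needs a token to consume
                }
            }
            lit => {
                if sub.get(i) != Some(&lit) {
                    return false;
                }
            }
        }
    }
    pat.len() == sub.len()
}

/// Precise tier: stand-in for constraint filtering — here, require the
/// last topic token to equal a constrained value.
fn precise_match(subject: &str, required_time: &str) -> bool {
    subject.rsplit('.').next() == Some(required_time)
}

/// A message reaches the client only if it passes both tiers.
fn hybrid_match(coarse: &str, required_time: &str, subject: &str) -> bool {
    coarse_match(coarse, subject) && precise_match(subject, required_time)
}
```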


JetStream Backend Internals

| Module | Path | Responsibility |
|--------|------|----------------|
| Config | notification_backend/jetstream/config.rs | Decode and validate JetStream settings |
| Connection | notification_backend/jetstream/connection.rs | NATS connect with retry |
| Streams | notification_backend/jetstream/streams.rs | Create and reconcile streams |
| Publisher | notification_backend/jetstream/publisher.rs | Publish with retry on transient failures |
| Subscriber | notification_backend/jetstream/subscriber.rs | Consumer-based live subscriptions |
| Replay | notification_backend/jetstream/replay.rs | Pull consumer batch retrieval |
| Admin | notification_backend/jetstream/admin.rs | Wipe and delete operations |

Backend Development Guide

This guide explains how to add a new notification backend to Aviso.

Required Contract

A backend must implement NotificationBackend in src/notification_backend/mod.rs.

Core requirements:

  • Implement publish/replay/subscribe/admin methods.
  • Implement capabilities() and return a stable BackendCapabilities map.
  • Keep startup/shutdown behavior explicit and logged.
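A hedged sketch of what the contract could look like, with simplified synchronous signatures (the real trait is async and uses richer message and cursor types), plus a toy in-memory implementation for illustration:

```rust
/// Simplified stand-ins for the real message and capability types.
struct NotificationMessage {
    subject: String,
    payload: Vec<u8>,
}

#[derive(Clone, Copy)]
struct BackendCapabilities {
    retention_time: bool,
    max_messages: bool,
    max_size: bool,
    allow_duplicates: bool,
    compression: bool,
}

/// Sketch of the backend contract; method names follow the docs above,
/// signatures are illustrative placeholders.
trait NotificationBackend {
    /// Publish one message (notify path); returns its sequence number.
    fn put_message(&mut self, msg: NotificationMessage) -> Result<u64, String>;
    /// Fetch a historical batch starting at a sequence number (replay path).
    fn fetch_batch(&self, from_seq: u64, max: usize) -> Result<Vec<NotificationMessage>, String>;
    /// Declare which storage-policy fields this backend honors.
    fn capabilities(&self) -> BackendCapabilities;
}

/// Toy in-memory implementation with 1-based sequence numbers.
struct MemoryBackend {
    log: Vec<NotificationMessage>,
}

impl NotificationBackend for MemoryBackend {
    fn put_message(&mut self, msg: NotificationMessage) -> Result<u64, String> {
        self.log.push(msg);
        Ok(self.log.len() as u64)
    }

    fn fetch_batch(&self, from_seq: u64, max: usize) -> Result<Vec<NotificationMessage>, String> {
        let start = (from_seq.saturating_sub(1) as usize).min(self.log.len());
        Ok(self.log[start..]
            .iter()
            .take(max)
            .map(|m| NotificationMessage { subject: m.subject.clone(), payload: m.payload.clone() })
            .collect())
    }

    fn capabilities(&self) -> BackendCapabilities {
        BackendCapabilities {
            retention_time: true,
            max_messages: true,
            max_size: false,
            allow_duplicates: false,
            compression: false,
        }
    }
}
```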

Storage Policy Compatibility

Per-schema storage policy is validated at startup before backend initialization.

Validation entry point:

  • configuration::validate_schema_storage_policy_support(...)

Capability source:

  • notification_backend::capabilities_for_backend_kind(...)

If a schema requests unsupported fields, startup fails fast with a clear error. Do not silently ignore unsupported storage policy fields.
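The fail-fast check can be sketched as follows; the schema shape is simplified to a list of requested field names, and only three capability fields are modeled here:

```rust
/// Subset of the capability map, enough to illustrate the check.
struct BackendCapabilities {
    retention_time: bool,
    max_messages: bool,
    max_size: bool,
}

/// Reject startup if any requested storage-policy field is unsupported,
/// instead of silently ignoring it.
fn validate_storage_policy(requested: &[&str], caps: &BackendCapabilities) -> Result<(), String> {
    for field in requested {
        let supported = match *field {
            "retention_time" => caps.retention_time,
            "max_messages" => caps.max_messages,
            "max_size" => caps.max_size,
            _ => false, // unknown fields are never supported
        };
        if !supported {
            return Err(format!(
                "storage policy field '{field}' is not supported by this backend"
            ));
        }
    }
    Ok(())
}
```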

Capability Checklist

When adding backend <new_backend>:

  1. Add backend kind support in capabilities_for_backend_kind.
  2. Add NotificationBackend::capabilities() implementation.
  3. Ensure capability values match real backend behavior.
  4. Add tests for:
    • capability map values
    • accepted storage-policy fields
    • rejected storage-policy fields
  5. Add backend docs page and update summary links.

Minimal Capability Example

BackendCapabilities {
    retention_time: true,
    max_messages: true,
    max_size: false,
    allow_duplicates: false,
    compression: false,
}

Meaning:

  • retention_time and max_messages can be used in schema storage policy.
  • max_size, allow_duplicates, and compression must be rejected at startup.

Testing Expectations

  • Unit tests should verify capability flags are stable.
  • Validation tests should verify fail-fast messages for unsupported fields.
  • Integration tests should use test-local config/schema fixtures, not developer-local YAML files.