Introduction
Aviso Server is a real-time notification and historical replay system built for data dissemination pipelines. It lets publishers announce data availability events and lets subscribers receive those events — either live as they happen, or replayed from a chosen point in history.
It is designed for environments where timely, reliable notification of data availability is critical: scientific computing, operational weather forecasting pipelines, large-scale data distribution, and similar domains.
How It Works
At its core, Aviso Server exposes three operations:
graph LR
P(Publisher) -->|POST /api/v1/notification| A[Aviso Server]
A -->|store| B[("Backend<br/>JetStream / In-Memory")]
B -->|fan-out| W1("Subscriber A<br/>watch")
B -->|fan-out| W2("Subscriber B<br/>watch")
B -->|history| R("Client<br/>replay")
| Operation | Endpoint | Description |
|---|---|---|
| Notify | POST /api/v1/notification | Publish a notification event to the backend |
| Watch | POST /api/v1/watch | Stream live (and optionally historical) events over SSE |
| Replay | POST /api/v1/replay | Stream historical events only, then close |
Key Features
Schema-driven validation
Each event type can have a schema that defines which identifier fields are required, their types (date, time, integer, float, enum, polygon), allowed ranges, and topic ordering. Invalid notifications are rejected at the API boundary with a clear error.
Structured topic routing
Aviso builds a deterministic topic string from the notification's identifier fields. This topic is used to route, store, and filter messages in the backend. Subscribers can use wildcard patterns and constraint objects to filter the stream.
Hybrid filtering
Watch and replay requests are matched using a two-tier strategy: the backend handles coarse routing (broad subject filters), and Aviso applies precise application-level filtering (constraints, spatial checks) on top. This keeps backend subscription counts low while delivering exact results.
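As a rough illustration of the two-tier idea (the helper names here are hypothetical, not Aviso's internals):

```python
# Sketch: a coarse dot-separated subject filter first, then a precise predicate.

def coarse_match(pattern: str, subject: str) -> bool:
    """NATS-style token match: '*' matches exactly one token."""
    p, s = pattern.split("."), subject.split(".")
    return len(p) == len(s) and all(pt in ("*", st) for pt, st in zip(p, s))

def fine_match(identifier: dict, constraints: dict) -> bool:
    """Application-level check, e.g. numeric ranges on top of coarse routing."""
    return all(lo <= identifier[k] <= hi for k, (lo, hi) in constraints.items())

subject = "extreme_event.north.1200.4"
event = {"severity": 4}

# The backend does the cheap routing; the server applies the exact predicate.
matched = coarse_match("extreme_event.north.*.*", subject) and \
          fine_match(event, {"severity": (3, 7)})
```

The payoff is that the backend needs only one coarse subscription per broad pattern, while exact semantics stay in the application layer.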
Spatial awareness
Polygon and point identifiers are first-class — notifications can carry geographic polygons, and subscribers can filter by polygon intersection or point containment.
Pluggable backends
Aviso abstracts storage behind a NotificationBackend trait. Two backends ship today: JetStream (NATS-backed, durable, production-ready) and In-Memory (single-process, for development and testing).
Server-Sent Events (SSE)
Watch and replay streams use SSE — a simple, firewall-friendly HTTP streaming protocol supported natively by browsers and all major HTTP clients. The stream includes typed control frames (connection established, replay started/completed, heartbeats, and graceful close reasons).
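On the client side, consuming such a stream mostly means splitting on blank lines and parsing each data: field as JSON. A minimal, simplified sketch (the frame shapes follow the examples later in this documentation):

```python
import json

def parse_sse_frames(raw: str):
    """Split an SSE stream into JSON payloads (simplified: data-only frames)."""
    events = []
    for block in raw.split("\n\n"):
        for line in block.splitlines():
            if line.startswith("data: "):
                events.append(json.loads(line[len("data: "):]))
    return events

stream = (
    'data: {"type":"connection_established","timestamp":"2026-03-04T10:00:00Z"}\n\n'
    'data: {"type":"heartbeat"}\n\n'
)
frames = parse_sse_frames(stream)
```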
CloudEvents format
All delivered notifications follow the CloudEvents specification, making them easy to integrate with other event-driven systems.
Use Cases
- Data availability monitoring — trigger downstream workflows the moment a dataset lands
- Operational pipelines — coordinate processing steps across distributed services
- Audit and compliance — replay historical events to reconstruct what was published and when
- System integration — connect disparate systems through a standardized event interface
Where to Go Next
If you are new to Aviso, read these pages in order:
- Key Concepts — understand the terminology before anything else
- Installation — get the server running
- Getting Started — send your first notification and watch it arrive
- Practical Examples — copy-paste workflows for common scenarios
If you are configuring for production:
Key Concepts
This page defines the core terms you will encounter throughout Aviso's documentation and API. Reading it before Getting Started will make everything else click faster.
Event Type
An event type is a named category of notification — for example extreme_event, sensor_alert, or data_ready.
Every request to Aviso (notify, watch, or replay) targets exactly one event type. The server uses the event type to:
- look up the matching schema (validation rules, required fields, topic ordering),
- route messages to the correct storage stream,
- apply the correct retention and storage policy.
{ "event_type": "extreme_event", ... }
If no schema is configured for an event type, Aviso falls back to generic behavior: fields are accepted as-is and the topic is built from sorted keys.
Identifier
An identifier is a set of key-value pairs that describe what a notification is about. Think of it as structured metadata that uniquely (or approximately) locates a piece of data.
{
"identifier": {
"region": "north",
"run_time": "1200",
"severity": "4",
"anomaly": "42.5"
}
}
Identifiers serve two purposes depending on the operation:
| Operation | Role of identifier |
|---|---|
| notify | Declares the metadata of the notification being published |
| watch | Acts as a filter — which notifications to receive |
| replay | Acts as a filter — which historical notifications to retrieve |
In watch and replay, identifier values can be constraint objects instead of scalars
(e.g. {"gte": 5}) for numeric and enum fields. See Streaming Semantics.
Topic
A topic is the internal routing key Aviso builds from an identifier. You rarely construct topics manually — Aviso builds them for you.
Topics are dot-separated strings, for example:
extreme_event.north.1200.4.42%2E5
Each token corresponds to one identifier field, in the order defined by key_order in the schema.
Note the %2E — the dot in 42.5 is percent-encoded so it doesn't conflict with the topic separator.
Reserved characters (., *, >, %) in field values are percent-encoded before writing
to the backend so they do not break routing. See Topic Encoding.
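A sketch of the construction, assuming a simple replace-based encoder (illustrative only, not Aviso's implementation):

```python
# Percent-encode the characters reserved by the routing layer (., *, >, %)
# before joining identifier values into a dot-separated topic.

RESERVED = {".": "%2E", "*": "%2A", ">": "%3E"}

def encode_token(value: str) -> str:
    # Encode '%' first so already-present escapes are not double-escaped.
    value = value.replace("%", "%25")
    for ch, esc in RESERVED.items():
        value = value.replace(ch, esc)
    return value

def build_topic(base: str, key_order: list[str], identifier: dict) -> str:
    return ".".join([base] + [encode_token(identifier[k]) for k in key_order])

topic = build_topic(
    "extreme_event",
    ["region", "run_time", "severity", "anomaly"],
    {"region": "north", "run_time": "1200", "severity": "4", "anomaly": "42.5"},
)
# topic == "extreme_event.north.1200.4.42%2E5"
```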
Schema
A schema configures how Aviso handles a specific event type.
Schemas are defined in configuration/config.yaml under notification_schema.
A schema controls:
- topic.base — the stream/prefix for this event type (e.g. diss, mars)
- topic.key_order — the order of identifier fields in the topic string
- identifier.* — validation rules per field (type, required, allowed values, ranges)
- payload.required — whether a payload is mandatory on notify
- storage_policy — per-stream retention, size limits, compression (JetStream only)
Example:
notification_schema:
extreme_event:
topic:
base: "extreme_event"
key_order: ["region", "run_time", "severity", "anomaly", "polygon"]
identifier:
region:
description: "Geographic region label."
type: EnumHandler
values: ["north", "south", "east", "west"]
required: true
run_time:
type: TimeHandler
required: true
severity:
description: "Severity level from 1 to 7."
type: IntHandler
range: [1, 7]
required: true
anomaly:
type: FloatHandler
range: [0.0, 100.0]
required: false
polygon:
type: PolygonHandler
required: false
payload:
required: false
Every key listed in key_order must have a corresponding entry in identifier.
For notify, every identifier key declared in the schema must be present in the request.
Fields marked required: false are optional for validation semantics, but the key itself
is still required on notify so Aviso can build a deterministic topic.
Payload
A payload is arbitrary JSON attached to a notification. Aviso treats it as opaque — it stores and replays the value exactly as sent.
Valid payload types: object, array, string, number, boolean, or null.
{ "payload": { "path": "/data/grib2/file.grib2", "size": 1048576 } }
Whether payload is required or optional is controlled per schema by payload.required.
See Payload Contract for the full input → storage → replay mapping.
Operations: Notify, Watch, Replay
Aviso exposes three operations, each on its own endpoint:
Notify — POST /api/v1/notification
Publishes a notification to the backend. The identifier must match all required schema fields exactly (no wildcards, no constraints).
Watch — POST /api/v1/watch
Opens a persistent Server-Sent Events (SSE) stream. Receives live notifications as they arrive, optionally starting from a historical point.
- Omit from_id/from_date → live-only stream
- Provide one of them → historical replay first, then live
Replay — POST /api/v1/replay
Opens a finite SSE stream of historical notifications only.
Requires exactly one of from_id or from_date.
Stream closes automatically when history is exhausted.
For end-to-end working examples of all three operations — including spatial and constraint filtering — see Practical Examples.
CloudEvents
Aviso delivers notifications to watch/replay clients as CloudEvents — a standard envelope format. Each event includes:
- id — the backend sequence reference (e.g. mars@42), used for targeted delete or resume
- type — the Aviso event type string
- source — the server base URL
- data.identifier — the canonicalized identifier
- data.payload — the notification payload (or null if omitted)
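Assembled, a delivered event might look roughly like this (values are illustrative; the exact attribute set is defined by the CloudEvents specification and the mapping above):

```json
{
  "specversion": "1.0",
  "id": "mars@42",
  "type": "aviso.notification",
  "source": "http://127.0.0.1:8000",
  "data": {
    "identifier": { "region": "north", "run_time": "1200" },
    "payload": null
  }
}
```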
Backend
The backend is the storage and messaging layer that Aviso delegates to. Two backends are supported:
| Backend | Use case |
|---|---|
| jetstream | Production: durable, replicated, persistent history |
| in_memory | Development: fast setup, no persistence |
The backend is selected via notification_backend.kind in config.
See Backends Overview.
Installation
This page covers every way to get Aviso Server running: building from source, using Docker, or deploying to Kubernetes with Helm.
Prerequisites
Rust toolchain
Aviso is written in Rust and requires edition 2024, which means Rust 1.85 or newer. The CI always builds against the latest stable toolchain.
Install or update Rust via rustup:
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
Verify your version:
rustc --version
# rustc 1.85.0 (... ) or newer
System dependencies
Aviso links against OpenSSL. On Debian/Ubuntu:
sudo apt-get install -y libssl-dev pkg-config build-essential
On Fedora/RHEL:
sudo dnf install -y openssl-devel pkg-config gcc
On macOS (with Homebrew):
brew install openssl pkg-config
Build from Source
Clone the repository:
git clone https://github.com/ecmwf/aviso-server.git
cd aviso-server
Development build
Fast to compile, includes debug symbols:
cargo build
Binary location: target/debug/aviso_server
Release build
Optimized for production use:
cargo build --release
Binary location: target/release/aviso_server
Run directly
cargo run # development
cargo run --release # release
./target/release/aviso_server # pre-built binary
The server loads ./configuration/config.yaml by default.
See Configuration for all config loading options.
Docker
The repository includes a multi-stage Dockerfile that produces a minimal
distroless image.
Build the image
# Production image (distroless, minimal attack surface)
docker build --target release -t aviso-server:local .
# Debug image (Debian slim, includes bash for troubleshooting)
docker build --target debug -t aviso-server:debug .
Run with Docker
Mount your config file and expose the port:
docker run --rm \
-p 8000:8000 \
-v $(pwd)/configuration/config.yaml:/app/configuration/config.yaml:ro \
aviso-server:local
Or override settings via environment variables (no config mount needed):
docker run --rm \
-p 8000:8000 \
-e AVISOSERVER_APPLICATION__HOST=0.0.0.0 \
-e AVISOSERVER_APPLICATION__PORT=8000 \
-e AVISOSERVER_NOTIFICATION_BACKEND__KIND=in_memory \
aviso-server:local
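The AVISOSERVER_ prefix plus double-underscore nesting maps one-to-one onto config keys. A sketch of the mapping rule (hypothetical helper, for intuition only):

```python
# "__" denotes nesting, mirroring the variables shown above:
# AVISOSERVER_NOTIFICATION_BACKEND__KIND -> notification_backend.kind

def env_to_config_path(name: str, prefix: str = "AVISOSERVER_") -> list[str]:
    assert name.startswith(prefix)
    return [part.lower() for part in name[len(prefix):].split("__")]

path = env_to_config_path("AVISOSERVER_NOTIFICATION_BACKEND__KIND")
# path == ["notification_backend", "kind"]
```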
Build targets summary
| Target | Base image | Size | Use |
|---|---|---|---|
| release | distroless/cc | minimal | Production |
| debug | debian:bookworm-slim | larger | Troubleshooting |
Local JetStream (Docker)
For local development with the JetStream backend, use the provided script to spin up a NATS server with JetStream enabled:
./scripts/init_nats.sh
This script:
- Generates a NATS config file in ./nats_config/
- Creates a Docker volume for persistent JetStream storage
- Starts a nats:2-alpine container on localhost:4222
- Waits for the server to be ready and prints a connection summary
Requires: Docker
Optional environment variables:
NATS_PORT=4222 # NATS client port (default: 4222)
ENABLE_AUTH=true # Enable token auth (default: false)
MAX_MEMORY=5GB # JetStream memory limit (default: 5GB)
MAX_STORAGE=10GB # JetStream file storage limit (default: 10GB)
Example with auth enabled:
ENABLE_AUTH=true ./scripts/init_nats.sh
The script prints the generated token at the end of its output, for example:
Authentication enabled with token: aviso_secure_token_1712345678
After the script completes, configure Aviso to connect:
Without auth (default):
notification_backend:
kind: jetstream
jetstream:
nats_url: "nats://localhost:4222"
With auth — pass the token printed by the script:
notification_backend:
kind: jetstream
jetstream:
nats_url: "nats://localhost:4222"
token: "aviso_secure_token_1712345678"
Alternatively, set the token as an environment variable (Aviso reads NATS_TOKEN automatically):
export NATS_TOKEN=aviso_secure_token_1712345678
cargo run
Kubernetes / Helm
For production Kubernetes deployments, use the official Helm chart:
- Chart repository: https://github.com/ecmwf/aviso-chart
The chart handles:
- Deployment with configurable replicas
- ConfigMap-based configuration mounting
- Service and Ingress setup
- JetStream connection settings via values
Build Documentation
Aviso docs are built with mdBook.
Install mdBook and the mermaid preprocessor:
cargo install mdbook
cargo install mdbook-mermaid
Serve docs locally with live reload:
mdbook serve docs --open
Build static output to docs/book/:
mdbook build docs
Run Tests
# Unit and integration tests (in-memory backend)
cargo test --workspace
# Include JetStream integration tests (requires running NATS)
AVISO_RUN_NATS_TESTS=1 cargo test --workspace
# Tests must run single-threaded (shared port binding)
cargo test -- --test-threads=1
Getting Started
This guide walks you through running Aviso Server locally and sending your first notification. It assumes you have already completed Installation.
If you haven't read Key Concepts yet, do that first — it will make the commands below much easier to follow.
1. Choose a Backend
Aviso needs a storage backend before it can accept notifications. For local exploration, in-memory requires zero infrastructure.
| Goal | Backend |
|---|---|
| Quick local test, no setup | in_memory |
| Persistent history, realistic behavior | jetstream |
The examples on this page use in_memory.
To use JetStream locally, see Local JetStream setup below.
2. Configure the Server
Open configuration/config.yaml and make sure it contains at minimum:
application:
host: "127.0.0.1"
port: 8000
notification_backend:
kind: in_memory
in_memory:
max_history_per_topic: 100
max_topics: 10000
notification_schema:
my_event:
topic:
base: "my_event"
key_order: ["region", "date"]
identifier:
region:
description: "Geographic region label."
type: EnumHandler
values: ["north", "south", "east", "west"]
required: true
date:
type: DateHandler
required: true
payload:
required: false
You can also use environment variables to override any config value without editing the file. See Configuration for the full precedence rules.
3. Start the Server
cargo run
Or with the release binary:
./target/release/aviso_server
You should see structured JSON log output. Once you see a line like:
{"level":"INFO","message":"aviso-server listening","address":"127.0.0.1:8000"}
the server is ready.
4. Check the Health Endpoint
curl -sS http://127.0.0.1:8000/health
Expected response: 200 OK
5. Open a Watch Stream
Before publishing, open a terminal and start watching for events. This is a live SSE stream — keep it open while you proceed to step 6.
curl -N -X POST "http://127.0.0.1:8000/api/v1/watch" \
-H "Content-Type: application/json" \
-d '{
"event_type": "my_event",
"identifier": {
"region": "north",
"date": "20250706"
}
}'
You will see the SSE connection frame immediately:
data: {"type":"connection_established","timestamp":"2026-03-04T10:00:00Z"}
The stream stays open and will print new events as they arrive.
6. Publish a Notification
In a second terminal, send a notification:
curl -sS -X POST "http://127.0.0.1:8000/api/v1/notification" \
-H "Content-Type: application/json" \
-d '{
"event_type": "my_event",
"identifier": {
"region": "north",
"date": "20250706"
},
"payload": { "note": "data is ready" }
}'
Expected response:
{ "id": "my_event@1", "topic": "my_event.north.20250706" }
The id (my_event@1) is the backend sequence reference.
You can use it later to replay from that point or delete that specific notification.
Switch back to the watch terminal — the notification should have arrived:
data: {"specversion":"1.0","id":"my_event@1","type":"aviso.notification",...}
7. Replay History
Once you have published a few notifications, you can replay them from a specific point:
curl -N -X POST "http://127.0.0.1:8000/api/v1/replay" \
-H "Content-Type: application/json" \
-d '{
"event_type": "my_event",
"identifier": {
"region": "north",
"date": "20250706"
},
"from_id": "1"
}'
The stream will emit all matching historical notifications, then close with:
data: {"type":"connection_closing","reason":"end_of_stream","timestamp":"..."}
Optional: Local JetStream Setup
To test with the JetStream backend (persistent storage, more realistic), see Installation — Local JetStream for the full setup including environment variables and token authentication.
Run the Smoke Test
A Python smoke script covers the full notify → watch → replay cycle. Copy the example config and start the server:
cp configuration/config.yaml.example configuration/config.yaml
cargo run
With auth (default) — start auth-o-tron before running the smoke tests:
python3 -m pip install httpx
./scripts/auth-o-tron-docker.sh
python3 scripts/smoke_test.py
Without auth — set auth.enabled: false in your config (or remove the auth section), then:
AUTH_ENABLED=false python3 scripts/smoke_test.py
AUTH_ENABLED must match the server's auth.enabled setting.
When false, auth headers are omitted and auth-specific smoke tests are skipped.
Useful overrides:
BASE_URL="http://127.0.0.1:8000" python3 scripts/smoke_test.py
BACKEND="jetstream" python3 scripts/smoke_test.py
TIMEOUT_SECONDS=12 python3 scripts/smoke_test.py
SMOKE_VERBOSE=1 python3 scripts/smoke_test.py
python3 scripts/smoke_test.py --verbose
The smoke script covers:
- health endpoint
- replay/watch baseline flows (test_polygon)
- mars replay with dot-containing identifier values, integer and enum predicates
- dissemination watch + from_date with dot-containing identifier values
- read/write auth separation across public, role-restricted, and admin-only streams
What's Next
- Practical Examples — constraint filtering, spatial filtering, admin operations
- Streaming Semantics — full rules for watch/replay behavior
- Configuration Reference — all config fields and defaults
Defining Notification Schemas
A notification schema describes the shape of an event stream: what identifier fields are accepted, how they are validated, how the storage topic is constructed, whether a payload is required, and who can read or write.
Schemas are defined under the notification_schema key in your configuration file. Each top-level key becomes an event type that clients reference when calling /api/v1/notification, /api/v1/watch, or /api/v1/replay.
notification_schema:
my_event: # ← event type name
topic: ...
identifier: ...
payload: ...
auth: ... # optional
storage_policy: ... # optional, JetStream only
Topic Configuration
A schema should have a topic block that tells Aviso how to build the NATS subject for storage and routing.
topic:
base: "weather"
key_order: ["region", "date"]
| Field | Description |
|---|---|
| base | Root prefix for the subject. Must be unique across all schemas (case-insensitive). |
| key_order | Ordered list of identifier field names appended to the base, separated by "." |
Given base: "weather" and key_order: ["region", "date"], a request with region=north and date=20250706 produces the subject:
weather.north.20250706
Values containing reserved characters (., *, >, %) are automatically percent-encoded so they do not interfere with NATS subject routing. See Topic Encoding for details.
Only fields listed in key_order contribute to the subject. Other identifier fields are validated but not part of the topic.
Identifier Fields
The identifier map defines the fields that clients can send. Each field specifies a handler type that controls validation and canonicalization.
identifier:
region:
type: EnumHandler
values: ["north", "south", "east", "west"]
required: true
description: "Geographic region."
date:
type: DateHandler
required: true
Every field supports these common properties:
| Property | Type | Description |
|---|---|---|
| type | string | Handler type (see below). Required. |
| required | bool | If true, requests missing this field are rejected. Required. |
| description | string | Human-readable text exposed by GET /api/v1/schema. Optional. |
Handler Types
StringHandler
Accepts any non-empty string. No transformation.
class:
type: StringHandler
max_length: 2 # optional: reject strings longer than this
required: true
DateHandler
Parses dates in multiple formats and canonicalizes to a configured output format.
Accepted inputs: YYYY-MM-DD, YYYYMMDD, YYYY-DDD (day-of-year).
date:
type: DateHandler
canonical_format: "%Y%m%d" # output format (default: "%Y%m%d")
required: false
| canonical_format | Example output |
|---|---|
| "%Y%m%d" | 20250706 |
| "%Y-%m-%d" | 2025-07-06 |
Invalid dates (e.g. February 30) are rejected.
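The accept-many, emit-one behavior can be sketched like this (a hypothetical helper, not the actual handler):

```python
from datetime import datetime

# Try each accepted input format in turn, then re-emit in canonical form.
INPUT_FORMATS = ["%Y-%m-%d", "%Y%m%d", "%Y-%j"]  # YYYY-MM-DD, YYYYMMDD, YYYY-DDD

def canonicalize_date(value: str, canonical_format: str = "%Y%m%d") -> str:
    for fmt in INPUT_FORMATS:
        try:
            return datetime.strptime(value, fmt).strftime(canonical_format)
        except ValueError:
            continue
    raise ValueError(f"unrecognized or invalid date: {value!r}")
```

Because strptime validates the calendar, impossible dates such as February 30 fall through every format and are rejected.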
TimeHandler
Parses times and canonicalizes to four-digit HHMM format.
Accepted inputs: 14:30, 1430, 14, 9:05.
time:
type: TimeHandler
required: false
Input 14:30 → stored as 1430. Input 9 → stored as 0900.
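A sketch of the same canonicalization rules (hypothetical helper, not the actual handler):

```python
import re

# Canonicalize "14:30", "1430", "14", "9:05" to four-digit HHMM.
def canonicalize_time(value: str) -> str:
    m = re.fullmatch(r"(\d{1,2}):(\d{2})", value)
    if m:
        hh, mm = int(m.group(1)), int(m.group(2))
    elif re.fullmatch(r"\d{3,4}", value):
        hh, mm = int(value[:-2]), int(value[-2:])
    elif re.fullmatch(r"\d{1,2}", value):
        hh, mm = int(value), 0          # hour only, e.g. "9" -> 09:00
    else:
        raise ValueError(f"unrecognized time: {value!r}")
    if not (0 <= hh <= 23 and 0 <= mm <= 59):
        raise ValueError(f"out-of-range time: {value!r}")
    return f"{hh:02d}{mm:02d}"
```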
EnumHandler
Accepts one value from a predefined list. Matching is case-insensitive; stored in lowercase.
domain:
type: EnumHandler
values: ["a", "b", "c"]
required: false
Input "A" → stored as "a". Input "x" → rejected.
IntHandler
Accepts integer strings. Strips leading zeros for canonical storage.
step:
type: IntHandler
range: [0, 100000] # optional: inclusive [min, max] bounds
required: false
Input "007" → stored as "7". Input "-1" with range: [0, 100] → rejected.
FloatHandler
Accepts floating-point strings. Rejects NaN and Inf.
severity:
type: FloatHandler
range: [0.0, 10.0] # optional: inclusive [min, max] bounds
required: false
Input "3.14" → stored as "3.14". Input "NaN" → rejected.
ExpverHandler
Experiment version handler. Numeric values are zero-padded to four digits; non-numeric values are lowercased.
expver:
type: ExpverHandler
default: "0001" # optional: used when the field is empty
required: false
Input "1" → stored as "0001". Input "test" → stored as "test".
PolygonHandler
Accepts a closed polygon as a coordinate string. Used for spatial filtering on /watch and /replay.
Format: lat,lon,lat,lon,...,lat,lon — the first and last coordinate pair must be identical to close the polygon. Parentheses are optional.
polygon:
type: PolygonHandler
required: true
Constraints: at least 3 coordinate pairs (plus closing repeat), latitude in [-90, 90], longitude in [-180, 180].
See Spatial Filtering for usage examples.
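The format rules above can be sketched as a small parser (hypothetical helper, not Aviso's code):

```python
# Parse "lat,lon,lat,lon,...,lat,lon": closed ring, >= 3 distinct pairs,
# lat in [-90, 90], lon in [-180, 180]. Parentheses are stripped if present.

def parse_polygon(value: str) -> list[tuple[float, float]]:
    nums = [float(x) for x in value.replace("(", "").replace(")", "").split(",")]
    if len(nums) % 2 != 0:
        raise ValueError("odd number of coordinates")
    pairs = [(nums[i], nums[i + 1]) for i in range(0, len(nums), 2)]
    if len(pairs) < 4 or pairs[0] != pairs[-1]:
        raise ValueError("polygon must be closed with at least 3 distinct points")
    for lat, lon in pairs:
        if not (-90 <= lat <= 90 and -180 <= lon <= 180):
            raise ValueError(f"coordinate out of range: {lat},{lon}")
    return pairs

ring = parse_polygon("50.0,4.0,51.0,4.0,51.0,5.0,50.0,4.0")
```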
Reserved Query-Time Fields
point (built-in)
The point field is a reserved identifier that clients can send on /watch or /replay to filter notifications whose polygon contains the given point. It accepts a single lat,lon coordinate pair.
point is not a schema-configurable handler — it is always available on any schema that includes a PolygonHandler field. The /notification endpoint rejects requests that include point.
See Spatial Filtering for usage examples.
Payload Configuration
Controls whether requests must include a payload field.
payload:
required: true
| required | Behavior |
|---|---|
| true | Requests without a payload are rejected (400). |
| false | Payload is optional; missing payloads are stored as JSON null. |
The payload can be any valid JSON value (object, array, string, number, boolean, null). It is stored as-is with no reshaping.
See Payload Contract for full semantics.
Per-Stream Authentication
When global authentication is enabled, individual schemas can require credentials and restrict access by role.
auth:
required: true
read_roles:
localrealm: ["analyst", "consumer"]
write_roles:
localrealm: ["producer"]
| Field | Default when omitted | Effect |
|---|---|---|
| required | — | Must be set explicitly to true or false. |
| read_roles | Any authenticated user can read | Maps realm → role list for watch/replay access. |
| write_roles | Only admins can write | Maps realm → role list for notify access. |
Use ["*"] as the role list to grant access to all users from a realm.
Admins (users matching global admin_roles) always have both read and write access.
Omitting the entire auth block makes the stream publicly accessible, even when global auth is enabled.
See Authentication for the full access-control matrix and role-matching rules.
Storage Policy (JetStream Only)
When using the JetStream backend, you can configure per-stream retention limits.
storage_policy:
retention_time: "7d"
max_messages: 500000
max_size: "2Gi"
allow_duplicates: false
compression: true
| Field | Type | Description |
|---|---|---|
| retention_time | duration | Discard messages older than this. Accepts 30m, 1h, 7d, 1w. |
| max_messages | integer | Maximum message count; oldest are discarded when exceeded. |
| max_size | size | Maximum stream size. Accepts 100Mi, 1Gi, etc. |
| allow_duplicates | bool | Allow duplicate message IDs. Default: backend-specific. |
| compression | bool | Enable message-level compression. Default: backend-specific. |
All fields are optional. Omitting storage_policy entirely uses backend defaults.
The in-memory backend does not support storage policies.
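For intuition, the duration strings above read as a number plus a unit. A hypothetical parser (not Aviso's actual implementation):

```python
import re

# Map the documented units to seconds: minutes, hours, days, weeks.
UNIT_SECONDS = {"m": 60, "h": 3600, "d": 86400, "w": 604800}

def parse_duration(value: str) -> int:
    m = re.fullmatch(r"(\d+)([mhdw])", value)
    if not m:
        raise ValueError(f"unrecognized duration: {value!r}")
    return int(m.group(1)) * UNIT_SECONDS[m.group(2)]
```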
Complete Example
This example defines a weather alert stream with date/region routing, enum validation, optional payload, and role-restricted access.
notification_schema:
weather_alert:
payload:
required: false
topic:
base: "alert"
key_order: ["region", "severity_level", "date"]
identifier:
region:
description: "Geographic region."
type: EnumHandler
values: ["europe", "asia", "africa", "americas", "oceania"]
required: true
severity_level:
description: "Alert severity (1–5)."
type: IntHandler
range: [1, 5]
required: true
date:
description: "Alert date."
type: DateHandler
canonical_format: "%Y%m%d"
required: true
issued_by:
description: "Issuing authority identifier."
type: StringHandler
max_length: 64
required: false
auth:
required: true
read_roles:
operations: ["*"]
write_roles:
operations: ["forecaster", "admin"]
storage_policy:
retention_time: "30d"
max_messages: 100000
With this schema:
- Publishing a notification with region=europe, severity_level=3, date=2025-07-06 produces the subject alert.europe.3.20250706.
- The issued_by field is validated if present but does not appear in the subject (not in key_order).
- Any authenticated user in the operations realm can watch/replay.
- Only users with the forecaster or admin role can publish.
- JetStream retains up to 100,000 messages or 30 days, whichever limit is hit first.
Tips
- Start simple. Define only topic, one or two identifier fields, and payload. Add auth and storage policy later.
- Use key_order deliberately. Fields in key_order become part of the NATS subject and affect routing granularity. More fields = more specific topics = more efficient filtering, but also more distinct subjects.
- Mark routing fields required. If a field is in key_order, consider making it required: true so every notification produces a complete subject.
- Keep base short and unique. It is the root of every subject in this stream. Avoid collisions with other schemas.
- Test with GET /api/v1/schema/{event_type}. This endpoint returns the public view of your schema, showing all identifier fields and their validation rules.
Practical Examples
This section provides copy-paste examples for common workflows.
All examples use the same generic event schema so behavior is easy to compare.
Shared Generic Schema
notification_schema:
extreme_event:
topic:
base: extreme_event
key_order: [region, run_time, severity, anomaly, polygon]
identifier:
region:
description: "Geographic region label."
type: EnumHandler
values: ["north", "south", "east", "west"]
required: true
run_time:
type: TimeHandler
required: true
severity:
description: "Severity level from 1 to 7."
type: IntHandler
range: [1, 7]
required: true
anomaly:
type: FloatHandler
range: [0.0, 100.0]
required: false
polygon:
type: PolygonHandler
required: false
payload:
required: false
Shared Assumptions
- Base URL: http://127.0.0.1:8000
- Content type: application/json
- Replay examples use from_id or from_date explicitly.
Next:
- Basic Notify/Watch/Replay
- Spatial Filtering
- Constraint Filtering
- Replay Starting Points
- Admin Operations
Basic Notify/Watch/Replay
Uses the shared generic schema from Practical Examples.
This page is the quickest way to understand the normal API flow.
You first publish (notify), then observe live updates (watch), then read history (replay).
If you are onboarding a new environment, start here before trying filters or admin operations.
Read the examples in order.
1) Notify
curl -sS -X POST "http://127.0.0.1:8000/api/v1/notification" \
-H "Content-Type: application/json" \
-d '{
"event_type":"extreme_event",
"identifier":{
"region":"north",
"run_time":"1200",
"severity":"4",
"anomaly":"42.5"
},
"payload":{"note":"initial forecast"}
}'
Expected:
- HTTP 200
- required identifier keys must be present; optional keys may be omitted
2) Watch (Live Only)
curl -N -X POST "http://127.0.0.1:8000/api/v1/watch" \
-H "Content-Type: application/json" \
-d '{
"event_type":"extreme_event",
"identifier":{
"region":"north",
"run_time":"1200",
"severity":"4",
"anomaly":"42.5"
}
}'
Expected:
- HTTP 200
- SSE starts with connection_established
- only new matching notifications arrive
3) Replay (Historical)
curl -N -X POST "http://127.0.0.1:8000/api/v1/replay" \
-H "Content-Type: application/json" \
-d '{
"event_type":"extreme_event",
"identifier":{
"region":"north",
"run_time":"1200",
"severity":"4",
"anomaly":"42.5"
},
"from_id":"1"
}'
Expected:
- HTTP 200
- SSE emits replay_started, replay events, replay_completed, then closes
Constraint Filtering
Uses the shared generic schema from Practical Examples.
Constraint filtering lets subscribers express conditions over identifier fields instead of exact values.
In practice, this is how you ask for ranges (severity >= 5), numeric bands, or enum subsets.
This page starts with seed data, then shows valid constraint requests, then common failure cases.
It is the best reference for building client-side filter payloads.
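The evaluation model can be sketched like this (hypothetical helper names; the operator set is taken from the examples on this page):

```python
# A scalar identifier value is an implicit "eq"; a constraint object carries
# exactly one operator, as the failure case further down demonstrates.

OPERATORS = {
    "eq": lambda v, arg: v == arg,
    "gte": lambda v, arg: v >= arg,
    "lt": lambda v, arg: v < arg,
    "in": lambda v, arg: v in arg,
    "between": lambda v, arg: arg[0] <= v <= arg[1],
}

def matches(value, constraint) -> bool:
    if not isinstance(constraint, dict):        # scalar = implicit eq
        return value == constraint
    if len(constraint) != 1:
        raise ValueError("constraint object must contain exactly one operator")
    (op, arg), = constraint.items()
    return OPERATORS[op](value, arg)
```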
Seed Notifications
curl -sS -X POST "http://127.0.0.1:8000/api/v1/notification" \
-H "Content-Type: application/json" \
-d '{
"event_type":"extreme_event",
"identifier":{"region":"north","run_time":"1200","severity":"3","anomaly":42.5},
"payload":{"note":"seed-a"}
}'
curl -sS -X POST "http://127.0.0.1:8000/api/v1/notification" \
-H "Content-Type: application/json" \
-d '{
"event_type":"extreme_event",
"identifier":{"region":"south","run_time":"1200","severity":"6","anomaly":87.2},
"payload":{"note":"seed-b"}
}'
Expected:
- both return HTTP 200
Scalar Value (Implicit eq)
curl -N -X POST "http://127.0.0.1:8000/api/v1/replay" \
-H "Content-Type: application/json" \
-d '{
"event_type":"extreme_event",
"identifier":{"region":"south","run_time":"1200","severity":6,"anomaly":87.2},
"from_id":"1"
}'
Expected:
- HTTP 200
- only severity = 6 notifications match
Integer Constraint (gte)
curl -N -X POST "http://127.0.0.1:8000/api/v1/replay" \
-H "Content-Type: application/json" \
-d '{
"event_type":"extreme_event",
"identifier":{
"region":{"in":["north","south"]},
"run_time":"1200",
"severity":{"gte":5},
"anomaly":87.2
},
"from_id":"1"
}'
Expected:
- HTTP 200
- includes severity=6
- excludes severity=3
Float Constraint (between)
curl -N -X POST "http://127.0.0.1:8000/api/v1/replay" \
-H "Content-Type: application/json" \
-d '{
"event_type":"extreme_event",
"identifier":{
"region":"north",
"run_time":"1200",
"severity":"3",
"anomaly":{"between":[40.0,50.0]}
},
"from_id":"1"
}'
Expected:
- HTTP 200
- includes anomaly=42.5
Float eq Is Exact (No Tolerance)
Float eq and in are exact comparisons. This keeps behavior deterministic across replay/live
and avoids hidden tolerance windows.
curl -N -X POST "http://127.0.0.1:8000/api/v1/replay" \
-H "Content-Type: application/json" \
-d '{
"event_type":"extreme_event",
"identifier":{
"region":"north",
"run_time":"1200",
"severity":"3",
"anomaly":{"eq":42.5}
},
"from_id":"1"
}'
Expected:
- HTTP 200
- only notifications with exactly anomaly=42.5 match
- NaN/inf values are rejected by float validation/constraints
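The exactness rule can be demonstrated with plain comparisons. A standalone Python sketch (not Aviso code) of what eq does, and what it deliberately does not do:

```python
# Exact float equality, as the eq operator uses: no tolerance window.
anomaly = 42.5

assert (anomaly == 42.5) is True        # exact value matches
assert (anomaly == 42.500001) is False  # any deviation fails

# NaN never equals itself, which is one reason NaN is rejected outright.
nan = float("nan")
assert nan != nan

# A tolerance-based match (what eq deliberately does NOT do) would look like:
def approx_eq(a, b, tol=1e-6):
    return abs(a - b) <= tol

assert approx_eq(42.5, 42.5000004)  # would match under a hidden tolerance
```

Publishers and subscribers must therefore agree on the exact serialized float value.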
Enum Constraint (in)
curl -N -X POST "http://127.0.0.1:8000/api/v1/watch" \
-H "Content-Type: application/json" \
-d '{
"event_type":"extreme_event",
"identifier":{
"region":{"in":["south","west"]},
"run_time":"1200",
"severity":"6",
"anomaly":87.2
}
}'
Expected:
- HTTP 200
- live notifications pass only for regions in ["south","west"]
Invalid: Two Operators in One Constraint Object
curl -sS -X POST "http://127.0.0.1:8000/api/v1/replay" \
-H "Content-Type: application/json" \
-d '{
"event_type":"extreme_event",
"identifier":{
"region":"north",
"run_time":"1200",
"severity":{"gte":4,"lt":7},
"anomaly":42.5
},
"from_id":"1"
}'
Expected:
- HTTP 400
- message says constraint object must contain exactly one operator
Invalid: Constraint Object on /notification
curl -sS -X POST "http://127.0.0.1:8000/api/v1/notification" \
-H "Content-Type: application/json" \
-d '{
"event_type":"extreme_event",
"identifier":{
"region":"north",
"run_time":"1200",
"severity":{"gte":4},
"anomaly":42.5
},
"payload":{"note":"should-fail"}
}'
Expected:
- HTTP 400
Spatial Filtering
Uses the shared generic schema from Practical Examples.
Spatial filtering has two modes:
- identifier.polygon: keep notifications whose polygon intersects the request polygon.
- identifier.point: keep notifications whose polygon contains the request point.
This matters when many notifications share similar non-spatial identifiers and you need geographic precision.
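For intuition, point containment against the flattened "(lat1,lon1,lat2,lon2,...)" polygon strings used below can be sketched with a standard ray-casting test. The parsing and algorithm here are illustrative only; Aviso's actual spatial engine may differ:

```python
# Illustrative sketch only; not Aviso's spatial engine.
# Parses the "(lat1,lon1,lat2,lon2,...)" polygon string used in the
# examples below and tests point containment by ray casting (even-odd rule).

def parse_polygon(s):
    nums = [float(x) for x in s.strip("()").split(",")]
    return list(zip(nums[0::2], nums[1::2]))  # [(lat, lon), ...]

def contains(polygon, point):
    lat, lon = point
    inside = False
    n = len(polygon)
    for i in range(n):
        (lat1, lon1), (lat2, lon2) = polygon[i], polygon[(i + 1) % n]
        # Consider only edges whose longitude span crosses the point's longitude.
        if (lon1 > lon) != (lon2 > lon):
            crossing_lat = lat1 + (lon - lon1) / (lon2 - lon1) * (lat2 - lat1)
            if lat < crossing_lat:
                inside = not inside
    return inside
```

With the diamond polygon from poly-a below, the point 52.55,13.50 tests inside; against the poly-b square it tests outside, matching the expected replay results.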
Seed Notifications
These two notifications differ only by polygon shape.
curl -sS -X POST "http://127.0.0.1:8000/api/v1/notification" \
-H "Content-Type: application/json" \
-d '{
"event_type":"extreme_event",
"identifier":{
"region":"north",
"run_time":"1200",
"severity":"4",
"polygon":"(52.5,13.4,52.6,13.5,52.5,13.6,52.4,13.5,52.5,13.4)"
},
"payload":{"note":"poly-a"}
}'
curl -sS -X POST "http://127.0.0.1:8000/api/v1/notification" \
-H "Content-Type: application/json" \
-d '{
"event_type":"extreme_event",
"identifier":{
"region":"north",
"run_time":"1200",
"severity":"4",
"polygon":"(10.0,10.0,10.2,10.0,10.2,10.2,10.0,10.2,10.0,10.0)"
},
"payload":{"note":"poly-b"}
}'
Expected:
- both requests return HTTP 200
Replay with Polygon Intersection Filter
This request polygon intersects poly-a but not poly-b.
curl -N -X POST "http://127.0.0.1:8000/api/v1/replay" \
-H "Content-Type: application/json" \
-d '{
"event_type":"extreme_event",
"identifier":{
"region":"north",
"run_time":"1200",
"severity":"4",
"polygon":"(52.52,13.45,52.62,13.55,52.52,13.65,52.42,13.55,52.52,13.45)"
},
"from_id":"1"
}'
Expected:
- HTTP 200
- replay includes poly-a
- replay excludes poly-b
Replay with Point Containment Filter
The point below is inside poly-a and outside poly-b.
curl -N -X POST "http://127.0.0.1:8000/api/v1/replay" \
-H "Content-Type: application/json" \
-d '{
"event_type":"extreme_event",
"identifier":{
"region":"north",
"run_time":"1200",
"severity":"4",
"point":"52.55,13.50"
},
"from_id":"1"
}'
Expected:
- HTTP 200
- replay includes poly-a
- replay excludes poly-b
Replay Without Spatial Filter
No polygon and no point means no spatial narrowing.
curl -N -X POST "http://127.0.0.1:8000/api/v1/replay" \
-H "Content-Type: application/json" \
-d '{
"event_type":"extreme_event",
"identifier":{
"region":"north",
"run_time":"1200",
"severity":"4"
},
"from_id":"1"
}'
Expected:
- HTTP 200
- replay may include both poly-a and poly-b because only non-spatial fields are applied
Invalid: polygon and point Together
curl -sS -X POST "http://127.0.0.1:8000/api/v1/replay" \
-H "Content-Type: application/json" \
-d '{
"event_type":"extreme_event",
"identifier":{
"region":"north",
"run_time":"1200",
"severity":"4",
"polygon":"(52.5,13.4,52.6,13.5,52.5,13.6,52.4,13.5,52.5,13.4)",
"point":"52.55,13.50"
},
"from_id":"1"
}'
Expected:
- HTTP 400
- validation error says both spatial filters cannot be used together
Replay Starting Points
Uses the shared generic schema from Practical Examples.
Replay start parameters control where historical delivery begins.
Choose from_id when you track sequence progress; choose from_date when you track wall-clock time.
These examples cover valid forms and the common invalid combinations that return 400.
Use this page to validate client retry and resume logic.
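The three accepted from_date forms shown below can be told apart with a simple heuristic. This sketch assumes pure-digit strings of 13 or more digits are Unix milliseconds and shorter ones are seconds; the server's exact parsing rules may differ:

```python
from datetime import datetime, timezone

def parse_from_date(value):
    """Classify a from_date string into a UTC datetime (illustrative heuristic,
    not the server's parser)."""
    if value.isdigit():
        n = int(value)
        # Assumption: 13+ digits means Unix milliseconds, fewer means seconds.
        if len(value) >= 13:
            return datetime.fromtimestamp(n / 1000, tz=timezone.utc)
        return datetime.fromtimestamp(n, tz=timezone.utc)
    # Otherwise expect RFC 3339; normalize the trailing Z for fromisoformat.
    return datetime.fromisoformat(value.replace("Z", "+00:00"))
```

Clients that track their own resume position should store one canonical form rather than switching between them.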
Replay from Sequence (from_id)
curl -N -X POST "http://127.0.0.1:8000/api/v1/replay" \
-H "Content-Type: application/json" \
-d '{
"event_type":"extreme_event",
"identifier":{"region":"north","run_time":"1200","severity":"4","anomaly":"42.5"},
"from_id":"10"
}'
Expected:
- HTTP 200
- replay starts from sequence 10 (inclusive)
Replay from Time (from_date) RFC3339
curl -N -X POST "http://127.0.0.1:8000/api/v1/replay" \
-H "Content-Type: application/json" \
-d '{
"event_type":"extreme_event",
"identifier":{"region":"north","run_time":"1200","severity":"4","anomaly":"42.5"},
"from_date":"2026-03-01T12:00:00Z"
}'
Expected:
- HTTP 200
- replay starts from that UTC timestamp (inclusive)
Replay from Time (from_date) Unix Seconds
curl -N -X POST "http://127.0.0.1:8000/api/v1/replay" \
-H "Content-Type: application/json" \
-d '{
"event_type":"extreme_event",
"identifier":{"region":"north","run_time":"1200","severity":"4","anomaly":"42.5"},
"from_date":"1740509903"
}'
Expected:
- HTTP 200
Replay from Time (from_date) Unix Milliseconds
curl -N -X POST "http://127.0.0.1:8000/api/v1/replay" \
-H "Content-Type: application/json" \
-d '{
"event_type":"extreme_event",
"identifier":{"region":"north","run_time":"1200","severity":"4","anomaly":"42.5"},
"from_date":"1740509903710"
}'
Expected:
- HTTP 200
Invalid Replay Start Combinations
Missing Both from_id and from_date
curl -sS -X POST "http://127.0.0.1:8000/api/v1/replay" \
-H "Content-Type: application/json" \
-d '{
"event_type":"extreme_event",
"identifier":{"region":"north","run_time":"1200","severity":"4","anomaly":"42.5"}
}'
Expected:
- HTTP 400
Both from_id and from_date Provided
curl -sS -X POST "http://127.0.0.1:8000/api/v1/watch" \
-H "Content-Type: application/json" \
-d '{
"event_type":"extreme_event",
"identifier":{"region":"north","run_time":"1200","severity":"4","anomaly":"42.5"},
"from_id":"5",
"from_date":"2026-03-01T12:00:00Z"
}'
Expected:
- HTTP 400
Admin Operations (Practical)
See full reference in Admin Operations.
These operations are for cleanup and recovery, not normal data flow. Use delete when one bad record must be removed; use wipe when resetting a stream or environment. Because these endpoints are destructive, validate IDs and stream names carefully before execution.
Delete One Notification by ID
curl -X DELETE "http://127.0.0.1:8000/api/v1/admin/notification/extreme_event@42"
Expected:
- 200 if it exists
- 404 if stream/sequence does not exist
- 400 for invalid format
Wipe One Stream
curl -X DELETE "http://127.0.0.1:8000/api/v1/admin/wipe/stream" \
-H "Content-Type: application/json" \
-d '{"stream_name":"EXTREME_EVENT"}'
Expected:
- 200: stream definition remains, messages are removed
Wipe All Streams
curl -X DELETE "http://127.0.0.1:8000/api/v1/admin/wipe/all"
Expected:
- 200: all stream data is removed
Backends Overview
Aviso abstracts all storage and messaging behind a NotificationBackend trait.
Two implementations ship out of the box.
Which Backend Should I Use?
flowchart TD
A["What do you need?"] --> B{"Persistent history<br/>across restarts?"}
B -->|Yes| C[jetstream]
B -->|No| D{"Multiple replicas<br/>or pods?"}
D -->|Yes| C
D -->|No| E{"Production<br/>workload?"}
E -->|Yes| C
E -->|No| F[in_memory]
C:::jet
F:::mem
classDef jet fill:#1a6b3a,color:#fff,stroke:#0d4a27
classDef mem fill:#1a4d6b,color:#fff,stroke:#0d3347
| Requirement | Recommended backend |
|---|---|
| Persistent history across restarts | jetstream |
| Replay endpoint support | jetstream (or in_memory for local/node-local use) |
| Live watch streaming support | jetstream (or in_memory for local/node-local use) |
| Multi-replica deployment | jetstream |
| Quick local experimentation with minimal setup | in_memory |
Capability Comparison
| Capability | JetStream | In-Memory |
|---|---|---|
| Durable storage | Yes | No — data lost on restart |
| Replay support | Yes | Yes — node-local only |
| Live watch support | Yes | Yes — node-local fan-out |
| Multi-replica / HA | Yes (clustered NATS) | No |
| Per-schema storage policy | Yes | No — rejected at startup |
| Cross-instance consistency | Yes | No |
Backend Details
- In-Memory Backend — behavior, config, production caveats
- JetStream Backend — setup, stream management, operational notes
- Backend Development — how to implement a new backend
In-Memory Backend
Intended use
The in_memory backend is best for:
- local development,
- schema/validation testing,
- lightweight experimentation where persistence is not required.
Behavior
- Data is process-memory only and is lost on restart.
- Topic/message limits are enforced with eviction.
- No shared state across replicas or pods.
- Supports live watch subscriptions (live-only delivery).
- Supports replay batch retrieval for from_id and from_date.
- Uses in-process fan-out only, so subscriptions/replay are node-local.
Configuration
notification_backend.kind: in_memory
Available knobs:
- max_history_per_topic (default 1)
- max_topics (default 10000)
- enable_metrics (default false)
Per-schema storage_policy fields are currently not supported on in_memory and are rejected at startup.
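The retention knobs above amount to a bounded per-topic history plus LRU-style topic eviction. A behavioral sketch (not the actual implementation):

```python
from collections import OrderedDict, deque

class InMemoryStore:
    """Sketch of the documented behavior: per-topic history is capped at
    max_history_per_topic, and when max_topics is exceeded the
    least-recently-used topic is evicted."""

    def __init__(self, max_history_per_topic=1, max_topics=10000):
        self.max_history = max_history_per_topic
        self.max_topics = max_topics
        self.topics = OrderedDict()  # topic -> deque of messages

    def publish(self, topic, message):
        if topic not in self.topics:
            if len(self.topics) >= self.max_topics:
                self.topics.popitem(last=False)       # evict LRU topic
            self.topics[topic] = deque(maxlen=self.max_history)
        self.topics.move_to_end(topic)                # mark recently used
        self.topics[topic].append(message)            # old entries roll off

    def history(self, topic):
        return list(self.topics.get(topic, ()))
```

With the default max_history_per_topic of 1, only the latest message per topic is replayable; raise it if clients need deeper local history.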
Production suitability
Not recommended for production because:
- no durability,
- no HA replication,
- no cross-instance consistency,
- replay/watch history is limited to local in-memory retention.
JetStream Backend
The jetstream backend is the production-oriented storage implementation.
It connects to a NATS server with JetStream enabled and uses it
for durable message storage, replay, and live streaming.
Intended Use
Use jetstream when you need:
- durable storage that survives server restarts
- replay across multiple server instances
- live streaming with cluster-wide fan-out
- configurable retention, size limits, and compression
Local Test Setup
Start a NATS + JetStream instance via Docker:
./scripts/init_nats.sh
Then configure Aviso:
notification_backend:
kind: jetstream
jetstream:
nats_url: "nats://localhost:4222"
For full setup options including authentication and storage limits, see Installation — Local JetStream.
Core Behavior
- Connects to the configured NATS server on startup (with retry).
- Creates JetStream streams on demand, one per topic base (e.g. MARS, DISS, POLYGON).
- Publishes notifications directly to JetStream subjects using the encoded wire format.
- Uses pull consumers for replay batching (from_id, from_date).
- Uses push consumers for live watch subscriptions.
- Reconciles existing streams against current config when they are first accessed.
Configuration Reference
All fields live under notification_backend.jetstream.
Connection & startup
| Field | Default | Notes |
|---|---|---|
nats_url | nats://localhost:4222 | NATS server URL. |
token | None | Token auth; falls back to NATS_TOKEN environment variable. |
timeout_seconds | 30 | Per-attempt connection timeout (> 0). |
retry_attempts | 3 | Startup connection attempts before backend init fails (> 0). |
Runtime reconnect
| Field | Default | Notes |
|---|---|---|
enable_auto_reconnect | true | Enables/disables NATS client reconnect after startup. |
max_reconnect_attempts | 5 | 0 means unlimited reconnect retries. |
reconnect_delay_ms | 2000 | Delay between reconnect attempts and startup connect retries (> 0). |
Publish resilience
| Field | Default | Notes |
|---|---|---|
publish_retry_attempts | 5 | Retries for transient channel closed publish failures (> 0). |
publish_retry_base_delay_ms | 150 | Base backoff in ms for publish retries; grows exponentially per attempt (> 0). |
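As a mental model, these two fields produce a backoff schedule. The doubling factor below is an assumption; the table only states that the delay grows exponentially per attempt:

```python
def publish_backoff_schedule(attempts=5, base_delay_ms=150):
    """Assumed base-2 exponential backoff over the configured attempts.
    With the defaults this yields 150, 300, 600, 1200, 2400 ms."""
    return [base_delay_ms * (2 ** i) for i in range(attempts)]
```

Whatever the exact growth factor, the total retry window stays bounded because both knobs are finite and must be > 0.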
Stream defaults
These apply to every stream created by Aviso unless overridden by a per-schema storage_policy.
| Field | Default | Notes |
|---|---|---|
max_messages | None | Stream message cap (maps to max_messages). |
max_bytes | None | Stream size cap in bytes (maps to max_bytes). |
retention_time | None | Default max age: duration literal (s, m, h, d, w; e.g. 30d). |
storage_type | file | file or memory — parsed as typed enum at config load. |
replicas | None | Stream replica count. |
retention_policy | limits | limits, interest, or workqueue — parsed as typed enum. |
discard_policy | old | old or new — parsed as typed enum. |
Fail-fast validation:
storage_type, retention_policy, and discard_policy are parsed as typed enums during configuration loading. Invalid values fail startup immediately, before any streams are created.
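The fail-fast pattern is simple: parse the raw string into a closed enum at load time and abort on anything unrecognized. A Python sketch (the server itself is Rust-based), using storage_type as the example:

```python
from enum import Enum

class StorageType(Enum):
    FILE = "file"
    MEMORY = "memory"

def parse_storage_type(raw):
    """Fail-fast parse: an invalid value aborts before any stream exists."""
    try:
        return StorageType(raw)
    except ValueError:
        raise SystemExit(
            f"invalid storage_type {raw!r}; expected one of "
            f"{[m.value for m in StorageType]}"
        )
```

The same shape applies to retention_policy (limits/interest/workqueue) and discard_policy (old/new).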
Full example
notification_backend:
kind: jetstream
jetstream:
nats_url: "nats://localhost:4222"
timeout_seconds: 30
retry_attempts: 3
enable_auto_reconnect: true
max_reconnect_attempts: 5
reconnect_delay_ms: 2000
publish_retry_attempts: 5
publish_retry_base_delay_ms: 150
storage_type: file
retention_policy: limits
discard_policy: old
Stream Management
Stream creation
On first access (e.g. first publish for a given event type), Aviso creates a JetStream stream with the following settings applied:
- storage_type, retention_policy, discard_policy
- max_messages, max_bytes, retention_time → max_age
- replicas
The stream subject binding is set to <base>.> (e.g. mars.>) to capture all topics
under that base.
Reconciliation of existing streams
When a stream already exists and is accessed by Aviso, it is reconciled — the current stream config is compared against the desired config and mutable fields are updated if drift is detected:
- limits (retention, size, message count)
- compression
- duplicate window
- replicas
- subject binding
If JetStream rejects an update (e.g. the field is not editable in the current server/stream state), Aviso logs a warning and continues with the existing stream configuration.
Precedence
Backend-level defaults are applied first, then per-schema storage_policy overrides for that stream:
notification_backend.jetstream.* (base defaults)
↓ overridden by
notification_schema.<event_type>.storage_policy.*
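The precedence above is a field-by-field override: any key present in the schema's storage_policy replaces the backend default. A minimal sketch:

```python
def effective_stream_config(backend_defaults, storage_policy=None):
    """Sketch of the precedence rule: per-schema storage_policy values
    override backend-level jetstream defaults, field by field."""
    merged = dict(backend_defaults)
    for key, value in (storage_policy or {}).items():
        merged[key] = value
    return merged
```

For example, a backend default of retention_time 30d with a schema storage_policy of retention_time 7d yields 7d for that stream while all other defaults stand.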
Applying config changes to existing streams
Changes to stream-affecting settings (e.g. compression, retention, limits) in config.yaml
are applied to existing streams automatically during reconciliation when the stream is next accessed.
To force historical data to be physically rewritten with new settings (e.g. re-pack with compression):
- Stop all Aviso writers for the target stream.
- Delete the stream in NATS.
- Restart Aviso (or publish again) — the stream is recreated with current config.
# List streams
nats stream ls
# Delete a stream (example: DISS)
nats stream rm DISS
wipe_stream (admin endpoint) removes messages but preserves stream configuration. Use stream deletion only when you need historical data physically rewritten.
Verifying Effective Stream Policy
Use the nats CLI to inspect the stream config after a publish or reconcile:
# Replace POLYGON with your stream name (MARS, DISS, etc.)
nats --server nats://localhost:4222 stream info POLYGON
Fields to check:
| CLI field | Config field |
|---|---|
Max Age | retention_time |
Max Messages | max_messages |
Max Bytes | max_bytes / per-schema max_size |
Max Messages Per Subject | allow_duplicates: 1 = disabled, -1 = enabled |
Compression | None or S2 |
Replay Behavior
- Sequence replay (from_id): starts from that sequence number, inclusive.
- Time replay (from_date): uses JetStream start-time delivery policy.
- The API enforces mutual exclusivity: from_id and from_date cannot both be present.
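The start-parameter contract for /replay can be expressed as a small validator, a sketch of the documented rules rather than the server's code (note that /watch additionally allows omitting both, for live-only streaming):

```python
def validate_replay_start(body):
    """Sketch of the /replay start rule: exactly one of from_id/from_date.
    Returns an (http_status, message) pair."""
    has_id = "from_id" in body
    has_date = "from_date" in body
    if has_id and has_date:
        return 400, "from_id and from_date are mutually exclusive"
    if not (has_id or has_date):
        return 400, "replay requires from_id or from_date"
    return 200, "ok"
```

This mirrors the 400 responses shown under "Invalid Replay Start Combinations" above.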
Smoke Test (JetStream Mode)
python3 -m pip install httpx
BACKEND=jetstream \
NATS_URL=nats://localhost:4222 \
JETSTREAM_POLICY_STREAM_NAME=POLYGON \
EXPECT_MAX_MESSAGES=500000 \
EXPECT_MAX_BYTES=2147483648 \
EXPECT_MAX_MESSAGES_PER_SUBJECT=1 \
EXPECT_COMPRESSION=None \
python3 scripts/smoke_test.py
Operational Caveats
- Startup connectivity is controlled by timeout_seconds + retry_attempts.
- Runtime reconnect is controlled by enable_auto_reconnect, max_reconnect_attempts, and reconnect_delay_ms.
- Publish retry is a narrow resilience path for transient channel closed failures; non-transient failures fail fast.
- retry_attempts applies only to startup; post-startup reconnect uses the reconnect settings.
- Setting max_reconnect_attempts = 0 enables unlimited reconnect retries.
Deployment Modes
Local experimentation
Recommended backend: in_memory for quick local request/validation testing.
Characteristics:
- No persistence: data is lost on process restart.
- Single-process state only.
- Not suitable for horizontal scaling or replica failover.
- Supports replay and watch in-process, limited by local memory retention.
- For local JetStream testing, see Installation — Local JetStream.
- For a quick end-to-end behavior check, see Getting Started — Run the Smoke Test.
Production-like / persistent mode
Recommended backend: jetstream
Characteristics:
- Durable message storage.
- Retention and size limits.
- Replica support (requires clustered NATS setup).
- Supports replay and live streaming workflows.
Recommended packaging/deployment for Kubernetes:
- Helm chart: https://github.com/ecmwf/aviso-chart
Selection guideline
- Need persistence/replay/streaming robustness: use jetstream.
- Need the fastest setup for local functional checks only: use in_memory (node-local replay/watch).
Configuration
Key Behaviors to Know First
Before diving into field-level settings, these rules affect how everything behaves at runtime:
- Environment variables always win — they override any YAML value regardless of which file it came from.
- Replay and watch behavior is controlled by request parameters, not static config switches.
- Invalid policy values fail startup immediately — storage_type, retention_policy, and discard_policy are parsed as typed enums; bad values are caught before any streams are created.
- Per-schema storage_policy is validated at startup against the selected backend's capabilities. Unsupported fields (e.g. retention_time on in_memory) cause a startup failure with a clear error.
- JetStream stream changes are reconciled on access — updating compression, retention, or limits in config takes effect when that stream is next accessed. Recreate the stream only if you need historical data physically rewritten.
- /api/v1/schema responses are client-focused — internal storage_policy settings are not exposed.
Loading Precedence
Configuration is loaded in this order (later sources override earlier ones):
1. ./configuration/config.yaml
2. /etc/aviso_server/config.yaml
3. $HOME/.aviso_server/config.yaml
4. Environment variables (highest precedence)
If AVISOSERVER_CONFIG_FILE is set, only that single file is loaded (steps 1–3 are skipped). Environment variables still override values from the file.
Environment variable format
Prefix: AVISOSERVER_
Nested separator: __
AVISOSERVER_APPLICATION__HOST=0.0.0.0
AVISOSERVER_APPLICATION__PORT=8000
AVISOSERVER_NOTIFICATION_BACKEND__KIND=jetstream
AVISOSERVER_NOTIFICATION_BACKEND__JETSTREAM__NATS_URL=nats://localhost:4222
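The translation from variable name to nested config key splits on the double underscore, which is why single underscores inside a field name (e.g. notification_backend) stay intact. A sketch of the mapping:

```python
def env_to_config_path(name, prefix="AVISOSERVER_"):
    """Sketch: translate an override variable name into the nested YAML
    path it targets, splitting only on the double-underscore separator."""
    if not name.startswith(prefix):
        raise ValueError(f"not an Aviso override: {name}")
    return [part.lower() for part in name[len(prefix):].split("__")]
```

So AVISOSERVER_NOTIFICATION_BACKEND__JETSTREAM__NATS_URL targets notification_backend.jetstream.nats_url.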
Config File Structure
The top-level sections are:
| Section | Purpose |
|---|---|
application | Server host, port, static files path |
logging | Log level and format |
auth | Authentication mode, secrets, admin roles |
notification_backend | Backend selection and backend-specific settings |
notification_schema | Per-event-type validation, topic ordering, storage policy |
metrics | Optional Prometheus metrics server |
watch_endpoint | SSE heartbeat, connection limits, replay batch settings |
notification_backend.kind selects the storage implementation:
- jetstream — production backend (NATS JetStream)
- in_memory — development backend (process-local, no persistence)
Backend Details
- Backends Overview — choose the right backend
- In-Memory Backend — behavior and caveats
- JetStream Backend — setup, stream management, operational notes
- Kubernetes deployment: Helm chart
For full field-level documentation of every config option, see Configuration Reference.
Configuration Reference
This page documents runtime-relevant configuration fields and defaults.
Topic Wire Format
- Topic wire subjects always use . as separator.
- Per-schema topic.separator is no longer used.
- Token values are percent-encoded for reserved chars (., *, >, %) before writing to backend subjects.
See Topic Encoding for rules and examples.
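The reserved-character rule can be sketched as below. Encoding % first avoids double-encoding ambiguity; the exact escape sequences and hex casing here are assumptions, so see Topic Encoding for the authoritative rules:

```python
def encode_topic_token(token):
    """Sketch of percent-encoding a topic token: escape % first so the
    output is unambiguous, then the subject-reserved chars ., *, >."""
    token = token.replace("%", "%25")
    for ch, esc in ((".", "%2E"), ("*", "%2A"), (">", "%3E")):
        token = token.replace(ch, esc)
    return token
```

For example, a date token like 2026.03.01 cannot be written raw because . is the subject separator; it must be escaped before publishing.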
application
| Field | Type | Default | Notes |
|---|---|---|---|
host | string | none | Bind address. |
port | u16 | none | Bind port. |
base_url | string | http://localhost | Used in generated CloudEvent source links. |
static_files_path | string | /app/static | Static asset root for homepage assets. |
logging
| Field | Type | Default | Notes |
|---|---|---|---|
level | string | implementation default | Example: info, debug, warn, error. |
format | string | implementation default | Kept for compatibility; output is OTel-aligned JSON. |
auth
Authentication is optional. When disabled (the default), all API endpoints are publicly accessible, provided no schema defines stream auth rules: startup fails if global auth is disabled while a schema sets auth.required=true or non-empty auth.read_roles/auth.write_roles.
When enabled:
- Admin endpoints always require a valid JWT and an admin role.
- Stream endpoints (notify, watch, replay) enforce authentication only when the target schema has auth.required: true.
- Schema endpoints (/api/v1/schema) are always public.
- In trusted_proxy mode, Aviso validates Authorization: Bearer <jwt> locally with jwt_secret.
| Field | Type | Default | Notes |
|---|---|---|---|
enabled | bool | false | Set to true to enable authentication. |
mode | "direct"|"trusted_proxy" | "direct" | direct: forward credentials to auth-o-tron. trusted_proxy: validate forwarded JWT locally. |
auth_o_tron_url | string | "" | auth-o-tron base URL. Required when enabled=true and mode=direct. |
jwt_secret | string | "" | Shared HMAC secret for JWT validation. Required when enabled=true. Not exposed via /api/v1/schema endpoints and redacted when auth settings are serialized or logged. |
admin_roles | map<string, string[]> | {} | Realm-scoped roles for admin endpoints (/api/v1/admin/*). Must contain at least one realm with non-empty roles when enabled=true. |
timeout_ms | u64 | 5000 | Timeout for auth-o-tron requests (milliseconds). Must be > 0. |
Per-stream auth (notification_schema.<event_type>.auth)
| Field | Type | Default | Notes |
|---|---|---|---|
required | bool | — | Must be explicitly set whenever an auth block is present. When true, the stream requires authentication. |
read_roles | map<string, string[]> | — | Realm-scoped roles for read access (watch/replay). When omitted, any authenticated user can read. Use ["*"] as the role list to grant realm-wide access. |
write_roles | map<string, string[]> | — | Realm-scoped roles for write access (notify). When omitted, only users matching global admin_roles can write. Use ["*"] as the role list to grant realm-wide access. |
See Authentication for detailed setup, client usage, and error responses.
metrics
Optional Prometheus metrics endpoint. When enabled, a separate HTTP server serves /metrics on an internal port for scraping by Prometheus/ServiceMonitor. This keeps metrics isolated from the public API.
| Field | Type | Default | Notes |
|---|---|---|---|
enabled | bool | false | Enable the metrics endpoint. |
host | string | "127.0.0.1" | Bind address for the metrics server. Defaults to loopback to avoid public exposure. |
port | u16 | none | Required when enabled=true. Must differ from application.port. |
Exposed metrics:
| Metric | Type | Labels | Description |
|---|---|---|---|
aviso_notifications_total | counter | event_type, status | Total notification requests. |
aviso_sse_connections_active | gauge | endpoint, event_type | Currently active SSE connections. |
aviso_sse_connections_total | counter | endpoint, event_type | Total SSE connections opened. |
aviso_sse_unique_users_active | gauge | endpoint | Distinct users with active SSE connections. |
aviso_auth_requests_total | counter | mode, outcome | Authentication attempts. |
Process-level metrics (CPU, memory, open FDs) are automatically collected on Linux.
notification_backend
| Field | Type | Default | Notes |
|---|---|---|---|
kind | string | none | jetstream or in_memory. |
in_memory | object | optional | Used when kind = in_memory. |
jetstream | object | optional | Used when kind = jetstream. |
notification_backend.in_memory
| Field | Type | Default | Notes |
|---|---|---|---|
max_history_per_topic | usize | 1 | Retained messages per topic in memory. |
max_topics | usize | 10000 | Max tracked topics before LRU-style eviction. |
enable_metrics | bool | false | Enables extra internal metrics logs. |
See In-Memory Backend for operational caveats.
notification_backend.jetstream
| Field | Type | Default | Runtime usage summary |
|---|---|---|---|
nats_url | string | nats://localhost:4222 | NATS connection URL. |
token | string? | None | Token auth; NATS_TOKEN env fallback. |
timeout_seconds | u64? | 30 | NATS connection timeout for each startup connect attempt (> 0). |
retry_attempts | u32? | 3 | Startup connect attempts before backend init fails (> 0). |
max_messages | i64? | None | Stream message cap. |
max_bytes | i64? | None | Stream size cap in bytes. |
retention_time | string? | None | Default stream max age (s, m, h, d, w; for example 30d). |
storage_type | string? | file | file or memory (parsed as typed enum at config load). |
replicas | usize? | None | Stream replicas. |
retention_policy | string? | limits | limits/interest/workqueue (parsed as typed enum at config load). |
discard_policy | string? | old | old/new (parsed as typed enum at config load). |
enable_auto_reconnect | bool? | true | Enables/disables NATS client reconnect behavior. |
max_reconnect_attempts | u32? | 5 | Mapped to NATS max_reconnects (0 => unlimited). |
reconnect_delay_ms | u64? | 2000 | Reconnect delay and startup connect retry backoff (> 0). |
publish_retry_attempts | u32? | 5 | Retry attempts for transient publish channel closed failures (> 0). |
publish_retry_base_delay_ms | u64? | 150 | Base backoff in milliseconds for publish retries (> 0). |
See JetStream Backend for detailed behavior.
notification_schema.<event_type>.payload
Schema-level payload contract for notify requests.
| Field | Type | Example | Notes |
|---|---|---|---|
required | bool | true | When true, /notification rejects requests without payload. |
Behavior details and edge cases are documented in Payload Contract.
notification_schema.<event_type>.storage_policy
Optional per-schema storage settings validated at startup against selected backend capabilities.
| Field | Type | Example | Notes |
|---|---|---|---|
retention_time | string | 7d, 12h, 30m | Duration literal (s, m, h, d, w). |
max_messages | integer | 100000 | Must be > 0. |
max_size | string | 512Mi, 2G | Size literal (K, Ki, M, Mi, G, Gi, T, Ti). |
allow_duplicates | bool | true | Backend support is capability-gated. |
compression | bool | true | Backend support is capability-gated. |
Field behavior:
- retention_time overrides backend-level retention for the schema stream.
- max_messages overrides backend-level message cap for the schema stream.
- max_size overrides backend-level byte cap for the schema stream.
- allow_duplicates = false maps to one message per subject (latest kept); true removes this cap.
- compression = true enables stream compression when the backend supports it.
Startup behavior:
- Invalid retention_time / max_size format fails startup.
- Unsupported fields for the selected backend fail startup.
- Validation happens before backend initialization.
- With in_memory, all storage_policy fields are currently unsupported (startup fails if provided).
Runtime application behavior:
- storage_policy is applied on stream create and reconciled for existing JetStream streams when those streams are accessed by Aviso.
- Aviso-managed stream subject binding is also reconciled to the expected <base>.> pattern.
- Mutable fields (retention/limits/compression/duplicates/replicas) are updated when drift is detected.
- Recreate stream(s) only when you need historical data physically rewritten with new settings.
Example:
notification_backend:
kind: jetstream
jetstream:
nats_url: "nats://localhost:4222"
publish_retry_attempts: 5
publish_retry_base_delay_ms: 150
notification_schema:
dissemination:
topic:
base: "diss"
key_order: ["destination", "target", "class", "expver", "domain", "date", "time", "stream", "step"]
storage_policy:
retention_time: "7d"
max_messages: 2000000
max_size: "10Gi"
allow_duplicates: true
compression: true
watch_endpoint
| Field | Type | Default | Notes |
|---|---|---|---|
sse_heartbeat_interval_sec | u64 | 30 | SSE heartbeat period. |
connection_max_duration_sec | u64 | 3600 | Maximum live watch duration. |
replay_batch_size | usize | 100 | Historical fetch batch size. |
max_historical_notifications | usize | 10000 | Replay cap for historical delivery. |
replay_batch_delay_ms | u64 | 100 | Delay between historical replay batches. |
concurrent_notification_processing | usize | 15 | Live stream CloudEvent conversion concurrency. |
Custom config file path
Set AVISOSERVER_CONFIG_FILE to use a specific config file instead of the default search cascade:
AVISOSERVER_CONFIG_FILE=/path/to/config.yaml cargo run
When set, only this file is loaded as a file source (startup fails if it does not exist). The default locations (./configuration/config.yaml, /etc/aviso_server/config.yaml, $HOME/.aviso_server/config.yaml) are skipped. AVISOSERVER_* field-level overrides still apply on top.
Environment override examples
AVISOSERVER_APPLICATION__HOST=0.0.0.0
AVISOSERVER_APPLICATION__PORT=8000
AVISOSERVER_NOTIFICATION_BACKEND__KIND=jetstream
AVISOSERVER_NOTIFICATION_BACKEND__JETSTREAM__NATS_URL=nats://localhost:4222
AVISOSERVER_NOTIFICATION_BACKEND__JETSTREAM__TOKEN=secret
AVISOSERVER_WATCH_ENDPOINT__REPLAY_BATCH_SIZE=200
AVISOSERVER_AUTH__ENABLED=true
AVISOSERVER_AUTH__JWT_SECRET=secret
AVISOSERVER_METRICS__ENABLED=true
AVISOSERVER_METRICS__PORT=9090
Authentication
Authentication is optional. When enabled, Aviso supports two modes:
- Direct — Aviso forwards Bearer or Basic credentials to auth-o-tron, which returns a signed JWT.
- Trusted proxy — an upstream reverse proxy authenticates the user and forwards a signed JWT; Aviso validates it locally.
How It Works
1. Client sends credentials to Aviso.
2. Middleware resolves user identity:
   - direct: forwards the Authorization header to auth-o-tron GET /authenticate and receives a JWT back.
   - trusted_proxy: validates the forwarded Authorization: Bearer <jwt> locally using jwt_secret.
3. Username, realm, and roles are extracted from JWT claims and attached to the request.
4. Route handlers enforce per-stream auth rules on notify, watch, and replay.
5. Admin endpoints (/api/v1/admin/*) always require a valid JWT with an admin role.
Schema endpoints (GET /api/v1/schema, GET /api/v1/schema/{event_type}) are always publicly accessible, even when auth is enabled.
Quick Start (Direct Mode)
1. Start auth-o-tron
# Foreground (Ctrl+C to stop):
./scripts/auth-o-tron-docker.sh start
# Background:
./scripts/auth-o-tron-docker.sh start --detach
By default this uses scripts/example_auth_config.yaml.
To use your own config:
AUTH_O_TRON_CONFIG_FILE=/path/to/auth-config.yaml ./scripts/auth-o-tron-docker.sh start
The bundled example config defines three local test users in realm localrealm:
| User | Password | Role |
|---|---|---|
admin-user | admin-pass | admin |
reader-user | reader-pass | reader |
producer-user | producer-pass | producer |
2. Enable auth in config
auth:
enabled: true
mode: direct
auth_o_tron_url: "http://localhost:8080"
jwt_secret: "your-shared-secret" # must match auth-o-tron jwt.secret
admin_roles:
localrealm: ["admin"]
timeout_ms: 5000
Roles are realm-scoped: admin_roles maps each realm name to its authorized role list. A user must belong to a listed realm and hold one of that realm's roles.
3. Run aviso-server
Auth is now enforced for:
- Admin endpoints (`/api/v1/admin/*`) — always require auth + an admin role.
- Stream endpoints (`/api/v1/notification`, `/api/v1/watch`, `/api/v1/replay`) — only when the target schema sets `auth.required: true`.
For full field-level documentation, see the auth section in Configuration Reference.
Trusted Proxy Mode
Use trusted_proxy when Aviso sits behind a reverse proxy or API gateway that handles authentication. The proxy authenticates the user (via OIDC, SAML, etc.) and forwards a signed JWT to Aviso.
Aviso validates the forwarded Authorization: Bearer <jwt> locally using jwt_secret. Username and roles are read directly from JWT claims — no outbound call to auth-o-tron is made.
auth:
enabled: true
mode: trusted_proxy
jwt_secret: "shared-signing-secret"
admin_roles:
ecmwf: ["admin"]
auth_o_tron_url is not required in this mode.
Per-Stream Authentication
Streams support separate read and write access controls. Read access governs /watch and /replay; write access governs /notification.
Configure authentication per stream in your notification schema:
notification_schema:
# Public — no auth section means anonymous access
public_events:
payload:
required: true
topic:
base: "public"
# Authenticated — any valid user can read, only admins can write
internal_events:
payload:
required: true
topic:
base: "internal"
auth:
required: true
# Separate read/write roles
sensor_data:
payload:
required: true
topic:
base: "sensor"
auth:
required: true
read_roles:
internal: ["analyst", "consumer"]
external: ["partner"]
write_roles:
internal: ["producer"]
# Realm-wide read access using wildcard, restricted write
shared_events:
payload:
required: true
topic:
base: "shared"
auth:
required: true
read_roles:
internal: ["*"]
external: ["analyst"]
write_roles:
internal: ["producer", "operator"]
Read vs. write access defaults
| auth.required | read_roles | write_roles | Read (watch/replay) | Write (notify) |
|---|---|---|---|---|
| false or omitted | — | — | Anyone | Anyone |
| true | omitted | omitted | Any authenticated user | Admins only |
| true | set | omitted | Must match read_roles | Admins only |
| true | omitted | set | Any authenticated user | Must match write_roles or be admin |
| true | set | set | Must match read_roles | Must match write_roles or be admin |
Admins (users matching global admin_roles) always have both read and write access.
Role matching rules
Both read_roles and write_roles map realm names to role lists. A user's realm claim from the JWT must match a key in the map, and the user must hold at least one of that realm's listed roles.
- Wildcard `"*"` — use `["*"]` as the role list to grant access to all users from a realm, regardless of their specific roles.
- Omitted role list — when `read_roles` is omitted, any authenticated user can read. When `write_roles` is omitted, only admins can write.
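The matching rules above can be sketched in a few lines. This is an illustrative Python sketch, not Aviso's implementation; the helper names (`has_role`, `can_read`) and the user dict with `realm` and `roles` claims are assumptions:

```python
def has_role(user_realm, user_roles, role_map):
    """Check a user's JWT realm/roles against a read_roles/write_roles map.

    role_map maps realm name -> allowed role list; ["*"] admits every
    user from that realm regardless of role.
    """
    allowed = role_map.get(user_realm)
    if allowed is None:
        return False                 # user's realm is not listed
    if "*" in allowed:
        return True                  # realm-wide wildcard
    return any(role in allowed for role in user_roles)

def can_read(user, stream_auth, is_admin):
    """Read access per the defaults table: admins always pass; an
    omitted read_roles map admits any authenticated user."""
    if is_admin(user):
        return True
    read_roles = stream_auth.get("read_roles")
    if read_roles is None:
        return True                  # any authenticated user
    return has_role(user["realm"], user["roles"], read_roles)
```

Write access follows the same shape, except an omitted `write_roles` map falls back to admins only.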
When a per-stream auth block is present, auth.required must be explicitly set to either true or false.
Admin Endpoints
Admin endpoints always require authentication and one of the configured admin_roles, regardless of per-stream settings:
- `DELETE /api/v1/admin/notification/{id}`
- `DELETE /api/v1/admin/wipe/stream`
- `DELETE /api/v1/admin/wipe/all`
See Admin Operations for request/response details.
Disabling Authentication
auth:
enabled: false
Or omit the auth section entirely. When auth is disabled, all endpoints are publicly accessible.
Startup fails if global auth is disabled while any schema defines auth.required: true or non-empty auth.read_roles/auth.write_roles. Remove stream-level auth blocks before disabling global auth.
Client Usage
Bearer token (both modes)
# Watch an authenticated stream:
curl -N -H "Authorization: Bearer <jwt-token>" \
-X POST http://localhost:8000/api/v1/watch \
-H "Content-Type: application/json" \
-d '{"event_type": "private_events", "identifier": {}}'
Basic credentials (direct mode only)
# Notify with Basic auth:
curl -X POST http://localhost:8000/api/v1/notification \
-u "admin-user:admin-pass" \
-H "Content-Type: application/json" \
-d '{"event_type": "ops_events", "identifier": {"event_type": "deploy"}, "payload": "ok"}'
In direct mode, Aviso forwards Basic credentials to auth-o-tron, which authenticates the user and returns a JWT. The response JWT is validated and used for authorization.
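For scripted clients, the same headers curl builds can be constructed directly. A small Python sketch (the helper names are illustrative, not part of any Aviso client library):

```python
import base64

def bearer_header(jwt_token):
    """Authorization header for a JWT (works in both auth modes)."""
    return {"Authorization": f"Bearer {jwt_token}"}

def basic_header(username, password):
    """Authorization header for Basic credentials (direct mode only).

    Equivalent to curl's -u "user:pass" flag.
    """
    raw = f"{username}:{password}".encode("utf-8")
    return {"Authorization": "Basic " + base64.b64encode(raw).decode("ascii")}
```

Pass the resulting dict as extra request headers in whatever HTTP client you use.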
Error Responses
Auth errors use a subset of the standard API error shape with three fields (code, error, message; no details):
{
"code": "UNAUTHORIZED",
"error": "unauthorized",
"message": "Authorization header is required"
}
| Code | HTTP Status | When |
|---|---|---|
| UNAUTHORIZED | 401 | Missing Authorization header, invalid token format, expired or bad signature. |
| FORBIDDEN | 403 | Valid credentials but user lacks the required role for the stream or admin endpoint. |
| SERVICE_UNAVAILABLE | 503 | auth-o-tron is unreachable or returned an unexpected error (direct mode only). |
A 401 response includes a WWW-Authenticate header indicating the supported scheme (Bearer in trusted-proxy mode; Bearer, Basic in direct mode).
Streaming Semantics
This page defines the exact behavior of the watch and replay endpoints, including start points, spatial filtering, identifier constraints, and SSE lifecycle events.
SSE Stream Lifecycle
Every streaming response (watch or replay) goes through a typed lifecycle:
stateDiagram-v2
[*] --> Connected : SSE connection established
Connected --> Replaying : from_id or from_date provided
Connected --> Live : no replay parameters (watch only)
Replaying --> Live : replay_completed (watch only)
Replaying --> Closed : end_of_stream (replay only)
Live --> Closed : max_duration_reached
Live --> Closed : server_shutdown
Closed --> [*]
Close reasons emitted in the final connection_closing SSE event:
| Reason | Trigger |
|---|---|
| end_of_stream | Replay finished (/replay endpoint, or watch replay phase if live subscribe fails) |
| max_duration_reached | connection_max_duration_sec elapsed on a watch stream |
| server_shutdown | Server is shutting down gracefully |
POST /api/v1/watch
- If both `from_id` and `from_date` are omitted: the stream is live-only (new notifications from now onward).
- If exactly one replay parameter is present: historical replay runs first, then the stream transitions to live delivery.
- If both are present: the request is rejected with `400`.
flowchart TD
A["Watch Request"] --> B{"from_id or<br/>from_date?"}
B -->|neither| C["Live-only stream"]
B -->|exactly one| D["Historical replay<br/>then live stream"]
B -->|both| E["400 Bad Request"]
style E fill:#8b1a1a,color:#fff
style C fill:#1a6b3a,color:#fff
style D fill:#1a4d6b,color:#fff
POST /api/v1/replay
- Requires exactly one replay start parameter: `from_id` (sequence-based) or `from_date` (time-based).
- If both are missing or both are present: the request is rejected with `400`.
- The stream closes with `end_of_stream` when history is exhausted.
Start Point for Historical Events
from_id starts delivery from that sequence number (inclusive).
from_date accepts any of these formats:
| Format | Example |
|---|---|
| RFC3339 with timezone | 2025-01-15T10:00:00Z |
| RFC3339 with offset | 2025-01-15T10:00:00+02:00 |
| Space-separated with timezone | 2025-01-15 10:00:00+00:00 |
| Naive datetime (interpreted as UTC) | 2025-01-15T10:00:00 |
| Unix seconds (≤ 11 digits) | 1740509903 |
| Unix milliseconds (≥ 12 digits) | 1740509903710 |
All inputs are normalized to UTC internally.
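As a rough illustration, the accepted formats can be parsed with a digit-count heuristic like the following Python sketch. This is a client-side approximation (`parse_from_date` is a hypothetical helper, not Aviso's parser, which may differ in edge cases):

```python
from datetime import datetime, timezone

def parse_from_date(value):
    """Parse a replay start point into a UTC datetime.

    All-digit inputs are treated as Unix timestamps: up to 11 digits
    as seconds, 12 or more as milliseconds. Everything else is parsed
    as an (RFC3339-ish) datetime; naive datetimes are assumed UTC.
    """
    text = str(value)
    if text.isdigit():
        ts = int(text)
        if len(text) >= 12:
            ts = ts / 1000.0           # milliseconds -> seconds
        return datetime.fromtimestamp(ts, tz=timezone.utc)
    # Accept a space separator and a trailing 'Z' in addition to
    # plain ISO-8601 with an offset.
    text = text.replace(" ", "T", 1).replace("Z", "+00:00")
    dt = datetime.fromisoformat(text)
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)   # naive -> UTC
    return dt.astimezone(timezone.utc)
```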
Spatial Filter Model
Spatial filtering applies on top of the identifier field filters. Think of it in two layers:
- Non-spatial identifier fields (`time`, `date`, `class`, etc.) narrow candidates by topic routing.
- Spatial fields (`polygon` or `point`) further narrow that candidate set geographically.
flowchart LR
A["Candidate<br/>notifications"] -->|"non-spatial<br/>identifier filter"| B["Topic-matched<br/>subset"]
B -->|"spatial filter<br/>if provided"| C["Final<br/>results"]
Rules
| identifier.polygon | identifier.point | Result |
|---|---|---|
| provided | omitted | polygon-intersects-polygon filter |
| omitted | provided | point-inside-notification-polygon filter |
| omitted | omitted | no spatial filter |
| provided | provided | 400 Bad Request |
- `identifier.polygon`: keep notifications whose stored polygon intersects the request polygon.
- `identifier.point`: keep notifications whose stored polygon contains the request point.
- Both together: invalid — request is rejected.
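The decision table above can be sketched as follows. This is an illustrative ray-casting/edge-crossing approximation; `spatial_match` and the plain (x, y) coordinate handling are assumptions, not Aviso's actual geometry engine, which may treat touching edges and geographic coordinates differently:

```python
def point_in_polygon(pt, poly):
    """Ray-casting test: is pt (x, y) inside the polygon ring?"""
    x, y = pt
    inside = False
    n = len(poly)
    for i in range(n):
        x1, y1 = poly[i]
        x2, y2 = poly[(i + 1) % n]
        if (y1 > y) != (y2 > y):
            # x-coordinate where this edge crosses the horizontal ray
            xcross = x1 + (y - y1) * (x2 - x1) / (y2 - y1)
            if x < xcross:
                inside = not inside
    return inside

def _segments_cross(p1, p2, q1, q2):
    """Proper (non-collinear) segment intersection via orientation signs."""
    def orient(a, b, c):
        v = (b[0] - a[0]) * (c[1] - a[1]) - (b[1] - a[1]) * (c[0] - a[0])
        return (v > 0) - (v < 0)
    return (orient(p1, p2, q1) != orient(p1, p2, q2)
            and orient(q1, q2, p1) != orient(q1, q2, p2))

def polygons_intersect(a, b):
    """Overlap test: any edge crossing, or one ring containing the other."""
    for i in range(len(a)):
        for j in range(len(b)):
            if _segments_cross(a[i], a[(i + 1) % len(a)],
                               b[j], b[(j + 1) % len(b)]):
                return True
    return point_in_polygon(a[0], b) or point_in_polygon(b[0], a)

def spatial_match(stored_polygon, req_polygon=None, req_point=None):
    """Apply the request's spatial filter to one stored notification."""
    if req_polygon is not None and req_point is not None:
        raise ValueError("polygon and point are mutually exclusive (400)")
    if req_polygon is not None:
        return polygons_intersect(stored_polygon, req_polygon)
    if req_point is not None:
        return point_in_polygon(req_point, stored_polygon)
    return True  # no spatial filter
```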
Identifier Constraints (watch / replay)
For schema-backed event types, identifier fields in watch/replay requests accept
constraint objects instead of (or in addition to) scalar values.
A scalar value is treated as an implicit eq constraint.
Constraint objects are rejected on /notification — notify only accepts scalar values.
Supported operators by field type
| Handler | Operators |
|---|---|
| IntHandler | eq, in, gt, gte, lt, lte, between |
| FloatHandler | eq, in, gt, gte, lt, lte, between |
| EnumHandler | eq, in |
Notes
- `between` expects exactly two values `[min, max]` and is inclusive on both ends.
- Float constraints reject `NaN` and `inf` — only finite values are valid.
- Float `eq` and `in` use exact numeric equality; no tolerance window is applied.
- A constraint object must contain exactly one operator — combining operators in a single object is rejected.
Examples
{ "severity": { "gte": 5 } }
{ "severity": { "between": [3, 7] } }
{ "region": { "in": ["north", "south"] } }
{ "anomaly": { "lt": 50.0 } }
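A minimal sketch of how such constraints could be evaluated, following the notes above (the `matches` helper is hypothetical; Aviso's handlers may differ in detail):

```python
import math

OPERATORS = {
    "eq":      lambda value, arg: value == arg,
    "in":      lambda value, arg: value in arg,
    "gt":      lambda value, arg: value > arg,
    "gte":     lambda value, arg: value >= arg,
    "lt":      lambda value, arg: value < arg,
    "lte":     lambda value, arg: value <= arg,
    "between": lambda value, arg: arg[0] <= value <= arg[1],  # inclusive
}

def matches(constraint, value):
    """Evaluate one identifier constraint against a stored value.

    A bare scalar is an implicit eq; a dict must hold exactly one
    operator; NaN/inf constraint values are rejected.
    """
    if not isinstance(constraint, dict):
        return value == constraint                 # implicit eq
    if len(constraint) != 1:
        raise ValueError("exactly one operator per constraint object")
    op, arg = next(iter(constraint.items()))
    if op not in OPERATORS:
        raise ValueError(f"unsupported operator: {op}")
    for v in (arg if isinstance(arg, list) else [arg]):
        if isinstance(v, float) and not math.isfinite(v):
            raise ValueError("NaN and inf are not valid constraint values")
    return OPERATORS[op](value, arg)
```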
Backend Behavior
| Backend | Historical replay | Live watch |
|---|---|---|
| in_memory | Node-local only; clears on restart | Node-local fan-out |
| jetstream | Durable; survives restarts | Cluster-wide fan-out |
SSE Timestamp Format
All control, heartbeat, and close event timestamps use canonical UTC second precision:
YYYY-MM-DDTHH:MM:SSZ
Example: 2026-02-25T18:58:23Z
Replay Payload Shape
- Replay and watch CloudEvent output always includes `data.payload`.
- If a notify request omitted payload (optional schema), replay returns `data.payload = null`.
- Payload values are not reshaped — scalar strings remain strings, objects remain objects.
See Payload Contract for the full input → storage → output mapping.
For end-to-end examples, see:
Payload Contract
This page defines the canonical payload behavior for Aviso notifications.
Scope
- Applies to `POST /api/v1/notification` input.
- Applies to the stored backend payload representation.
- Applies to the replay/watch CloudEvent output payload field.
Schema Configuration
Per event schema, payload configuration is:
payload:
required: true # or false
There is no payload.type list.
Canonical Rules
- Payload values are JSON values: `object`, `array`, `string`, `number`, `boolean`, `null`.
- If `payload.required = true` and the request omits `payload`, the request is rejected (`400`).
- If `payload.required = false` and the request omits `payload`, Aviso stores canonical JSON `null`.
- Aviso does not wrap or reshape payload values (for example, no auto-wrapping into `{"data": ...}`).
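The canonical rules reduce to a few lines. An illustrative sketch (`canonical_payload` is a hypothetical helper, not Aviso code):

```python
def canonical_payload(request, required):
    """Apply the canonical payload rules to a notify request body.

    request: the decoded JSON request body as a dict.
    required: the schema's payload.required flag.
    """
    if "payload" not in request:
        if required:
            # missing required payload -> INVALID_NOTIFICATION_REQUEST (400)
            raise ValueError("payload is required for this event type")
        return None            # stored as canonical JSON null
    return request["payload"]  # passed through unmodified, never wrapped
```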
Input to Storage to Replay Mapping
| Notify request payload | Stored payload | Replay/Watch CloudEvent data.payload |
|---|---|---|
| omitted (optional schema) | null | null |
| "forecast complete" | "forecast complete" | "forecast complete" |
| 42 | 42 | 42 |
| true | true | true |
| ["a","b"] | ["a","b"] | ["a","b"] |
| {"note":"ok"} | {"note":"ok"} | {"note":"ok"} |
Failure Cases
- Missing required payload:
  - HTTP `400`
  - validation error (`INVALID_NOTIFICATION_REQUEST`)
- Malformed JSON request body:
  - HTTP `400`
  - parse error (`INVALID_JSON`)
Consumer Guidance
- Treat `data.payload` as dynamic JSON.
- If your client requires object-only payloads, normalize on the client side.
  - Example strategy: for non-object payloads, convert to `{"data": <payload>}` in your consumer.
Topic Encoding
Aviso uses a single backend-agnostic wire format for topics across all backends.
Why This Exists
NATS subject tokenization uses . as the separator. Wildcards (*, >) also operate on
dot-delimited tokens. If topic field values contain any of these reserved characters,
they would silently break routing and filtering.
Example of the problem:
Logical value: 1.45
Naive subject: mars.od.1.45 ← looks like 4 tokens, not 3
To prevent this, Aviso percent-encodes each token value before assembling the wire subject.
Encoding Rules
Only four characters are reserved and must be encoded:
| Character | Encoded form | Reason |
|---|---|---|
| `.` | `%2E` | NATS token separator |
| `*` | `%2A` | NATS single-token wildcard |
| `>` | `%3E` | NATS multi-token wildcard |
| `%` | `%25` | Escape character itself (keeps decoding unambiguous) |
All other characters pass through unchanged.
Encode → Wire → Decode Flow
flowchart LR
A["Logical value<br/>e.g. 1.45"] -->|encode| B["Wire token<br/>e.g. 1%2E45"]
B -->|assemble| C["Wire subject<br/>e.g. extreme_event.north.1%2E45"]
C -->|"split on '.'"| D["Wire tokens"]
D -->|decode each| E["Logical tokens<br/>e.g. 1.45"]
style A fill:#2a4a6b,color:#fff
style C fill:#1a6b3a,color:#fff
style E fill:#2a4a6b,color:#fff
The decoder is strict: malformed %HH sequences (e.g. %GG) are rejected, not passed through.
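The encoding rules are small enough to sketch in full. This is illustrative Python with hypothetical names (`encode_token`, `decode_token`), but the escape table mirrors the one above:

```python
# Reserved characters in NATS subjects and their escapes.
_ENCODE = {".": "%2E", "*": "%2A", ">": "%3E", "%": "%25"}
_DECODE = {v: k for k, v in _ENCODE.items()}

def encode_token(value):
    """Percent-encode one logical token for use in a wire subject."""
    return "".join(_ENCODE.get(ch, ch) for ch in value)

def decode_token(token):
    """Strict single-pass decode; malformed escapes are rejected."""
    out, i = [], 0
    while i < len(token):
        if token[i] == "%":
            seq = token[i:i + 3]
            if seq not in _DECODE:       # e.g. %GG, or a truncated escape
                raise ValueError(f"malformed escape: {seq!r}")
            out.append(_DECODE[seq])
            i += 3
        else:
            out.append(token[i])
            i += 1
    return "".join(out)
```

Assembling a wire subject is then just joining encoded tokens with `.`, e.g. `".".join(map(encode_token, ["extreme_event", "north", "1.45"]))`.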
Examples
Encoding
| Logical value | Wire token |
|---|---|
| `1.45` | `1%2E45` |
| `1*34` | `1%2A34` |
| `1>0` | `1%3E0` |
| `1%25` | `1%2525` |
Decoding (single pass)
| Wire token | Logical value |
|---|---|
| `1%2E45` | `1.45` |
| `1%2A34` | `1*34` |
| `1%2525` | `1%25` |
| `1%25` | `1%` |
Impact on Wildcard Matching
Watch and replay requests use a two-step filter:
- Backend coarse filter — operates on wire subjects (NATS wildcard matching)
- App-level wildcard match — operates on decoded logical tokens
Both steps are safe with reserved characters because the app layer always decodes before matching. Subscribers never need to think about encoding in their filter values — Aviso handles it transparently.
Invariants
- The wire subject separator is always `.`
- One shared codec is used for all backends (JetStream and In-Memory)
- Encoding is applied per-token, not per-subject
- Decoding is a strict single-pass operation
API Errors
Aviso returns a consistent JSON error object for 4xx and 5xx responses.
Response Shape
{
"code": "INVALID_REPLAY_REQUEST",
"error": "Invalid Replay Request",
"message": "Replay endpoint requires either from_id or from_date parameter...",
"details": "Replay endpoint requires either from_id or from_date parameter..."
}
Fields:
- `code`: stable machine-readable error code.
- `error`: human-readable error category.
- `message`: top-level failure message (safe for client display).
- `details`: deepest/root detail available.
Notes:
- `error_chain` is logged server-side for diagnostics, but is not returned in API responses.
- `message` and `details` are always present for both `4xx` and `5xx` error responses.
Error Telemetry Events
These event_name values are emitted in structured logs:
| Event Name | Level | Trigger |
|---|---|---|
| api.request.parse.failed | warn | JSON parse/shape/unknown-field failure before domain validation. |
| api.request.validation.failed | warn | Domain/request validation failure (400). |
| api.request.processing.failed | error | Server-side processing/storage failure (500). |
| stream.sse.initialization.failed | error | Replay/watch SSE initialization failure (500). |
For SSE setup failures, response also includes:
- `topic`: decoded logical topic.
- `request_id`: request correlation id.
Error Code Reference
| Code | HTTP Status | Meaning |
|---|---|---|
| INVALID_JSON | 400 | Request body is not valid JSON. |
| UNKNOWN_FIELD | 400 | Request contains fields outside the API contract. |
| INVALID_REQUEST_SHAPE | 400 | JSON structure cannot be deserialized into the request model. |
| INVALID_NOTIFICATION_REQUEST | 400 | Notification request failed business validation. |
| INVALID_WATCH_REQUEST | 400 | Watch request failed validation (replay/spatial/schema rules). |
| INVALID_REPLAY_REQUEST | 400 | Replay request failed validation (start cursor/spatial/schema rules). |
| UNAUTHORIZED | 401 | Missing or invalid credentials (no token, bad format, expired, bad signature). |
| FORBIDDEN | 403 | Valid credentials but user lacks the required role. |
| NOTIFICATION_PROCESSING_FAILED | 500 | Notification processing pipeline failed before storage. |
| NOTIFICATION_STORAGE_FAILED | 500 | Backend write operation failed. |
| SSE_STREAM_INITIALIZATION_FAILED | 500 | Replay/watch SSE stream could not be created. |
| INTERNAL_ERROR | 500 | Fallback internal error code (reserved). |
| SERVICE_UNAVAILABLE | 503 | Auth service (auth-o-tron) unreachable or returned an unexpected error. |
Examples
Invalid replay request:
{
"code": "INVALID_REPLAY_REQUEST",
"error": "Invalid Replay Request",
"message": "Cannot specify both from_id and from_date...",
"details": "Cannot specify both from_id and from_date..."
}
Auth error (missing credentials on a protected stream).
Auth errors use three fields (code, error, message); details is not included:
{
"code": "UNAUTHORIZED",
"error": "unauthorized",
"message": "Authentication is required for this stream"
}
SSE initialization failure:
{
"code": "SSE_STREAM_INITIALIZATION_FAILED",
"error": "SSE stream creation failed",
"message": "Failed to create stream consumer",
"details": "nats connect failed: timeout",
"topic": "test_polygon.*.1200",
"request_id": "0d4f6758-1ce3-4dda-a0f3-0ccf5fcb50d6"
}
Admin Operations
Admin endpoints are destructive. Restrict access in production.
When authentication is enabled, all admin endpoints require a valid credential and one of the configured admin_roles. Add -H "Authorization: Bearer <token>" or -u user:pass (direct mode) to the curl examples below.
Delete One Notification
DELETE /api/v1/admin/notification/{notification_id}
notification_id format:
- `<stream>@<sequence>` (canonical)
- `<event_type>@<sequence>` (alias; resolved through the configured schema `topic.base`)
How notification_id maps to your schema
Delete IDs use the stream key plus backend sequence number.
If your schema contains:
notification_schema:
mars:
topic:
base: "mars"
dissemination:
topic:
base: "diss"
test_polygon:
topic:
base: "polygon"
Then valid delete IDs include:
- `mars@42` (event type alias and stream key are the same)
- `dissemination@42` (alias form, resolved to stream key `diss`)
- `diss@42` (canonical stream key form)
- `test_polygon@306` (alias form, resolved to stream key `polygon`)
- `polygon@306` (canonical stream key form)
Example: replay ID then delete
Replay returns CloudEvent IDs like mars@1:
curl -N -X POST "http://127.0.0.1:8000/api/v1/replay" \
-H "Content-Type: application/json" \
-d '{
"event_type": "mars",
"identifier": {
"class": "od",
"expver": "0001",
"domain": "g",
"date": "20250706",
"time": "1200",
"stream": "enfo",
"step": "1"
},
"from_id": "1"
}'
If one replayed event has "id":"mars@1", delete it with:
curl -X DELETE "http://127.0.0.1:8000/api/v1/admin/notification/mars@1"
Response behavior
- `200`: notification deleted.
- `404`: stream/sequence pair not found.
- `400`: invalid ID format (`<name>@<positive-integer>` required).
Invalid examples:
- `mars` (missing `@sequence`)
- `mars@0` (sequence must be > 0)
- `mars@abc` (sequence must be an integer)
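The ID format check can be sketched with a regular expression. This is an illustrative client-side helper (`parse_notification_id` is not Aviso's implementation):

```python
import re

# <name>@<positive-integer>: sequence must start with a non-zero digit.
_ID_RE = re.compile(r"^(?P<name>[^@]+)@(?P<seq>[1-9][0-9]*)$")

def parse_notification_id(notification_id):
    """Split '<name>@<sequence>' into its parts, rejecting invalid IDs."""
    m = _ID_RE.match(notification_id)
    if m is None:
        raise ValueError(f"invalid notification id: {notification_id!r}")
    return m.group("name"), int(m.group("seq"))
```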
Wipe Endpoints
- `DELETE /api/v1/admin/wipe/stream`
- `DELETE /api/v1/admin/wipe/all`
These endpoints remove many messages at once and should be used with extreme caution.
Wipe One Stream
DELETE /api/v1/admin/wipe/stream
Request body:
{
"stream_name": "MARS"
}
Example:
curl -X DELETE "http://127.0.0.1:8000/api/v1/admin/wipe/stream" \
-H "Content-Type: application/json" \
-d '{"stream_name":"MARS"}'
What it does:
- Removes all stored messages for the selected stream.
- Keeps the stream definition/configuration in place.
- New notifications can still be written to that stream immediately after wipe.
When to use:
- You want to reset one event family (`mars`, `diss`, `polygon`) without affecting the others.
Wipe All Streams
DELETE /api/v1/admin/wipe/all
Example:
curl -X DELETE "http://127.0.0.1:8000/api/v1/admin/wipe/all"
What it does:
- Removes all stored messages from all streams managed by the backend.
- Leaves service configuration intact, but data history is gone.
When to use:
- Local/dev reset before a fresh test run.
- Operational emergency cleanup where full history removal is intended.
Which Admin Operation Should I Use?
- Delete one notification (`/admin/notification/{id}`): use when you know the exact sequence to remove.
- Wipe one stream (`/admin/wipe/stream`): use when one stream is polluted and others must remain untouched.
- Wipe all (`/admin/wipe/all`): use only when a complete history reset is intended.
Wipe Response Shape
Both wipe endpoints return:
{
"success": true,
"message": "..."
}
Failure responses keep the same shape with success: false and an error message.
Architecture
Aviso Server is built around three operations — Notify, Watch, and Replay — each sharing a common validation and schema layer but diverging at the backend interaction.
System Overview
graph TB
subgraph Clients
P(Publisher)
W(Watcher)
R(Replayer)
end
subgraph "Aviso Server"
direction TB
AM["Auth Middleware<br/>(optional)"]
RT["Routes<br/>HTTP handlers"]
VP["Validation &<br/>Processing"]
NC["Notification<br/>Core"]
BE["Backend<br/>Abstraction"]
end
AOT["auth-o-tron<br/>(external)"]
subgraph Backend
JS[("JetStream<br/>NATS")]
IM[("In-Memory<br/>Process")]
end
P -->|POST /api/v1/notification| AM
W -->|POST /api/v1/watch| AM
R -->|POST /api/v1/replay| AM
AM -.->|verify credentials| AOT
AM --> RT
RT --> VP
VP --> NC
NC --> BE
BE --> JS
BE --> IM
JS -.->|SSE stream| W
JS -.->|SSE stream| R
IM -.->|SSE stream| W
IM -.->|SSE stream| R
Notify Request Flow
When a publisher sends POST /api/v1/notification:
sequenceDiagram
participant C as Publisher
participant A as Auth Middleware
participant R as Route Handler
participant V as Validator
participant P as Processor
participant T as Topic Builder
participant B as Backend
C->>A: POST /api/v1/notification (JSON)
alt stream requires auth
A->>A: resolve user (JWT or auth-o-tron)
A-->>C: 401/403 if unauthorized
end
A->>R: forward request (+ user identity)
R->>V: parse & shape-check JSON
V-->>R: 400 if malformed
R->>P: process_notification_request()
P->>P: look up event schema
P->>P: validate each identifier field
P->>P: canonicalize values (dates, enums)
P->>T: build_topic_with_schema()
T-->>P: topic string (e.g. mars.od.0001.g.20250706.1200)
P->>B: put_message_with_headers()
B-->>C: 200 { id, topic }
Key steps:
- Parse — raw JSON bytes are deserialized; unknown fields are rejected (`UNKNOWN_FIELD`)
- Validate — each identifier field is checked against its `ValidationRules` (type, range, enum values)
- Canonicalize — values are normalized (e.g. dates to `YYYYMMDD`, enums to lowercase)
- Build topic — fields are ordered per `key_order`, reserved characters are percent-encoded
- Store — the message is written to the backend with the encoded topic as the subject
Watch Request Flow
POST /api/v1/watch opens a persistent SSE stream. It optionally starts with a historical
replay phase before transitioning to live delivery.
sequenceDiagram
participant C as Subscriber
participant A as Auth Middleware
participant R as Route Handler
participant P as Stream Processor
participant F as Hybrid Filter
participant B as Backend
C->>A: POST /api/v1/watch (JSON)
alt stream requires auth
A->>A: resolve user (JWT or auth-o-tron)
A-->>C: 401/403 if unauthorized
end
A->>R: forward request (+ user identity)
R->>P: process_request (ValidationConfig::for_watch)
P->>P: allow optional fields & constraint objects
P->>P: analyze_watch_pattern() → coarse + precise patterns
alt has from_id or from_date
P->>B: fetch historical batch
B-->>P: NotificationMessage[]
P->>F: apply wildcard + constraint + spatial filter
F-->>C: SSE: replay_started → events → replay_completed
end
P->>B: subscribe(coarse_pattern)
loop live stream
B-->>P: live NotificationMessage
P->>F: apply precise filter
F-->>C: SSE: notification event
end
C-->>R: disconnect / timeout
R-->>C: SSE: connection_closing
Replay Request Flow
POST /api/v1/replay is like watch but historical-only — the stream closes when history ends.
sequenceDiagram
participant C as Client
participant A as Auth Middleware
participant R as Route Handler
participant P as Stream Processor
participant B as Backend
C->>A: POST /api/v1/replay (JSON + from_id or from_date)
alt stream requires auth
A->>A: resolve user (JWT or auth-o-tron)
A-->>C: 401/403 if unauthorized
end
A->>R: forward request (+ user identity)
R->>P: process_request (ValidationConfig::for_replay)
P->>B: batch fetch from StartAt::Sequence or StartAt::Date
loop batches
B-->>P: NotificationMessage[]
P->>P: filter + convert to CloudEvent
P-->>C: SSE: notification events
end
P-->>C: SSE: replay_completed → connection_closing (end_of_stream)
SSE Streaming Pipeline
The streaming layer (src/sse/) is built around typed values rather than raw strings,
which keeps the lifecycle explicit and the endpoint logic thin.
Cursor types — how a start point is represented internally:
- `StartAt::LiveOnly` — no history, subscribe immediately
- `StartAt::Sequence(u64)` — start from a specific backend sequence number
- `StartAt::Date(DateTime<Utc>)` — start from a UTC timestamp
Frame types — what the stream produces before rendering to SSE wire format:
- Control frames — `connection_established`, `replay_started`, `replay_completed`
- Notification frames — a decoded CloudEvent ready for delivery
- Heartbeat frames — periodic keep-alive
- Error frames — non-fatal stream errors
- Close frame — carries one of `end_of_stream`, `max_duration_reached`, `server_shutdown`
Lifecycle (shutdown token, max duration, natural end) is applied once in a shared wrapper, so individual endpoint handlers don't need to reimplement it.
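From the consumer side, the SSE wire format these frames render to can be parsed in a few lines. A minimal client-side sketch (assumes each event's `data` field carries JSON, and ignores comment lines and `id:`/`retry:` fields):

```python
import json

def parse_sse_events(raw):
    """Split a raw SSE body into (event_name, data) pairs.

    Events are separated by blank lines; 'event:' names the frame and
    'data:' lines carry its JSON body.
    """
    events, name, data = [], "message", []
    for line in raw.splitlines():
        if line == "":
            if data:  # blank line closes an event
                events.append((name, json.loads("\n".join(data))))
            name, data = "message", []
        elif line.startswith("event:"):
            name = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data.append(line[len("data:"):].strip())
    if data:  # body not terminated by a blank line
        events.append((name, json.loads("\n".join(data))))
    return events
```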
Component Map
| Component | Path | Role |
|---|---|---|
| Routes | src/routes/ | Thin HTTP handlers — parse request, delegate, return response |
| Auth | src/auth/ | Middleware, JWT validation, role matching, auth-o-tron client |
| Handlers | src/handlers/ | Shared parsing, validation, and processing logic |
| Notification core | src/notification/ | Schema registry, topic builder/codec/parser, wildcard matcher, spatial |
| Backend abstraction | src/notification_backend/ | NotificationBackend trait + JetStream and InMemory implementations |
| SSE layer | src/sse/ | Stream composition, typed frames, heartbeats, lifecycle |
| CloudEvents | src/cloudevents/ | Converts stored messages into CloudEvent envelope |
| Configuration | src/configuration/ | Config loading, schema validation, global snapshot |
| Error model | src/error.rs | Stable HTTP error codes and structured responses |
Hybrid Filtering
Watch subscriptions use a two-tier strategy to balance backend load with filter precision:
graph LR
A[Watch Request] --> B[analyze_watch_pattern]
B --> C["Coarse pattern<br/>e.g. mars.*.*.*"]
B --> D["Precise pattern<br/>full decoded topic"]
C -->|backend subscription| E[(NATS JetStream)]
E -->|candidate messages| F[App-level filter]
D --> F
F -->|matched| G[SSE client]
F -->|rejected| H[dropped]
- The coarse pattern is sent to the backend as the subscription subject filter. It uses NATS wildcards and covers a superset of the desired messages.
- The precise pattern is applied in-process on decoded topics + constraint objects + spatial checks. Only messages that pass both layers reach the client.
This avoids creating one backend subscription per unique topic while still delivering exact results.
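A simplified sketch of how a coarse subject filter could be derived from a watch identifier. This approximates the role of `analyze_watch_pattern` only; the assumed rule is that exact scalar values become literal tokens while omitted fields and constraint objects become `*` wildcards, and the real implementation also percent-encodes tokens and may differ in detail:

```python
def coarse_pattern(key_order, base, identifier):
    """Derive a backend subject filter from a watch identifier.

    key_order and base come from the schema. Constraint objects (dicts)
    cannot be evaluated by the backend, so they widen to '*' and are
    enforced later by the app-level precise filter.
    """
    tokens = [base]
    for key in key_order:
        value = identifier.get(key)
        if value is None or isinstance(value, dict):
            tokens.append("*")        # backend cannot evaluate this field
        else:
            tokens.append(str(value)) # a real impl would percent-encode
    return ".".join(tokens)
```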
JetStream Backend Internals
| Module | Path | Responsibility |
|---|---|---|
| Config | notification_backend/jetstream/config.rs | Decode and validate JetStream settings |
| Connection | notification_backend/jetstream/connection.rs | NATS connect with retry |
| Streams | notification_backend/jetstream/streams.rs | Create and reconcile streams |
| Publisher | notification_backend/jetstream/publisher.rs | Publish with retry on transient failures |
| Subscriber | notification_backend/jetstream/subscriber.rs | Consumer-based live subscriptions |
| Replay | notification_backend/jetstream/replay.rs | Pull consumer batch retrieval |
| Admin | notification_backend/jetstream/admin.rs | Wipe and delete operations |
Backend Development Guide
This guide explains how to add a new notification backend in Aviso.
Required Contract
A backend must implement NotificationBackend in src/notification_backend/mod.rs.
Core requirements:
- Implement publish/replay/subscribe/admin methods.
- Implement
capabilities()and return a stableBackendCapabilitiesmap. - Keep startup/shutdown behavior explicit and logged.
Storage Policy Compatibility
Per-schema storage policy is validated at startup before backend initialization.
Validation entry point:
configuration::validate_schema_storage_policy_support(...)
Capability source:
notification_backend::capabilities_for_backend_kind(...)
If a schema requests unsupported fields, startup fails fast with a clear error. Do not silently ignore unsupported storage policy fields.
Capability Checklist
When adding backend <new_backend>:
- Add backend kind support in
capabilities_for_backend_kind. - Add
NotificationBackend::capabilities()implementation. - Ensure capability values match real backend behavior.
- Add tests for:
- capability map values
- accepted storage-policy fields
- rejected storage-policy fields
- Add backend docs page and update summary links.
Minimal Capability Example
BackendCapabilities {
    retention_time: true,
    max_messages: true,
    max_size: false,
    allow_duplicates: false,
    compression: false,
}
Meaning:
- `retention_time` and `max_messages` can be used in the schema storage policy.
- `max_size`, `allow_duplicates`, and `compression` must be rejected at startup.
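The fail-fast check reduces to comparing requested policy fields against capability flags. An illustrative sketch using dicts as stand-ins for the Rust types:

```python
def validate_storage_policy(policy, capabilities):
    """Fail fast if a schema's storage policy uses unsupported fields.

    policy: storage-policy fields requested by a schema, field -> value.
    capabilities: BackendCapabilities-style map, field -> bool.
    """
    unsupported = [field for field in policy
                   if not capabilities.get(field, False)]
    if unsupported:
        # Never silently ignore unsupported fields: abort startup.
        raise ValueError(
            f"backend does not support storage policy fields: {unsupported}")
```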
Testing Expectations
- Unit tests should verify capability flags are stable.
- Validation tests should verify fail-fast messages for unsupported fields.
- Integration tests should use test-local config/schema fixtures, not developer-local YAML files.