Aviso is a system developed at ECMWF with the aim of:
- Notifying of data availability:
  - Real-time model output data
  - Product dissemination via ECPDS
- Supporting automated systems
- Providing an event system that supports a When <this> … Do <that> … mechanism
It allows users to:
- Define which events to be notified of.
- Define the triggers to be executed once a notification is received.
- Dispatch notifications to the notification server.
Quick Start
Aviso can be used as a Python API or as a Command-Line Interface (CLI) application. Here are a few steps to quickly get a working configuration listening to notifications.
Note: this guide assumes the user has an ECMWF account.
Create a VM with the data flavour. This will automatically install ECMWF packages such as MARS and Aviso. Configure MARS credentials by following the instructions in EWC MARS access. Aviso uses the same credentials.
Create a configuration file in the default location ~/.aviso/config.yaml. Below is an example for product notifications:
listeners:
  - event: dissemination
    request:
      destination: <user_destination>
      class: od
      expver: 1
      domain: g
      stream: enfo
      step: [1,2,3]
    triggers:
      - type: echo
Note the dissemination event listener. The request block describes the dissemination events for which users want to execute the triggers. It is made of a set of fields, and users only have to specify the fields they want to use as filters. The destination field is mandatory and is associated with one or more destinations linked to the user's ECMWF account. Only the notifications complying with all the fields defined will execute the trigger. The trigger in this example is echo, which simply prints the notification to the console output.
Below is an example for real-time model output notifications:
listeners:
  - event: mars
    request:
      class: od
      expver: 1
      domain: g
      stream: enfo
      step: [1,2,3]
    triggers:
      - type: echo
Note the mars event listener. The destination field is not present here.
Activate the base conda environment. Aviso is part of it.
conda activate
Launch the Aviso application:
aviso listen
Once running, this command creates a process that waits for notifications. Users can terminate the application by typing CTRL+C.
Note: the configuration file is only read at start time, so the listening process must be restarted after every change to it.
Upgrading a pre-existing installation
To upgrade Aviso to a newer version than the one available at the time of creating the VM:
Ensure the following settings are present in the configuration file located at /etc/aviso/config.yaml :
username_file: ~/.marsrc/mars.email
key_file: ~/.marsrc/mars.token
notification_engine:
  type: etcd_rest
  host: aviso.ecmwf.int
  port: 443
  https: true
configuration_engine:
  type: etcd_rest
  host: aviso.ecmwf.int
  port: 443
  https: true
schema_parser: ecmwf
remote_schema: True
auth_type: ecmwf
Run the following command:
pip3 install --upgrade pyaviso
Testing my listener
Aviso can submit test notifications to a local server. This functionality can be used to test the listener configuration without any impact on the operational server.
Launch the Aviso application in test mode. It then connects to a local file-based notification server, part of the Aviso application, that simulates the behaviour of the notification server.
aviso listen --test
The console should display a Test Mode message.
Send a test dissemination notification. From another terminal, run the notify command. Here is an example matching the example configuration presented above:
aviso notify event=dissemination,target=E1,class=od,date=20190810,destination=<user_destination>,domain=g,expver=1,step=1,stream=enfo,time=0,location=xxxx --test
Note the list of fields required for a dissemination event: the order is not important, but the command requires all of them. The destination has to match the one in the listener configuration.
Note: to submit a test mars notification, the fields destination, target and location must be removed.
After a few seconds, the console output should display the notification, as the trigger is set to echo.
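The parameter string passed to notify can also be assembled programmatically. The sketch below is only an illustration: build_notify_args is a hypothetical helper that is not part of Aviso, and FOO stands in for a real destination. It builds the argument list for the test command shown above and, following the note on mars notifications, drops the destination, target and location fields when the event is mars.

```python
# Hypothetical helper (not part of Aviso): builds the argument list for
# an `aviso notify ... --test` call from a dictionary of fields.
DISSEMINATION_ONLY = ("destination", "target", "location")


def build_notify_args(fields, test=True):
    """Return the argv list for `aviso notify` given a dict of fields."""
    fields = dict(fields)  # work on a copy
    if fields.get("event") == "mars":
        # mars notifications must not carry the dissemination-only fields
        for key in DISSEMINATION_ONLY:
            fields.pop(key, None)
    params = ",".join(f"{key}={value}" for key, value in fields.items())
    args = ["aviso", "notify", params]
    if test:
        args.append("--test")
    return args


example = {
    "event": "dissemination", "target": "E1", "class": "od", "date": 20190810,
    "destination": "FOO",  # placeholder destination, replace with a real one
    "domain": "g", "expver": 1, "step": 1, "stream": "enfo", "time": 0,
    "location": "xxxx",
}
print(" ".join(build_notify_args(example)))
print(" ".join(build_notify_args({**example, "event": "mars"})))
```

On a machine where Aviso is installed, the returned list could be handed to subprocess.run(args, check=True) to submit the test notification.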
Defining my listener
The Aviso configuration file allows the definition of multiple listeners. Alternatively, the listener configuration can be provided as one or more independent files:
aviso listen listener1.yaml listener2.yaml
Regardless of where it is defined, each listener is composed of:
- an event type
- a request block
- a triggers block
Event
Aviso offers notifications for the following types of events:
- The dissemination event is submitted by the product generation. The related listener configuration must define the destination field. A notification related to a dissemination event will have the field location containing the URL of the product notified.
- The mars event is designed for real-time data from the model output. The related listener configuration has no mandatory fields. Moreover, the related notification will not contain the location field because users will have to access the data through the conventional MARS API.
Request
The table below shows the full list of fields accepted in a request block. These fields represent a subset of the MARS language.
Field | Type | Event | Optional/Mandatory |
---|---|---|---|
destination | String, uppercase | dissemination | Mandatory |
target | String | dissemination | Optional |
class | Enum | All | Optional |
stream | Enum | All | Optional |
domain | Enum | All | Optional |
expver | Integer | All | Optional |
date | Date (e.g. 20190810) | All | Optional |
time | Values: [0,6,12,18] | All | Optional |
step | Integer | All | Optional |
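The filtering rule stated earlier (only notifications complying with all the fields defined execute the trigger) can be pictured with a small sketch. This is not Aviso's implementation: matches and _norm are hypothetical names, and two behaviours are assumptions made for illustration, namely that a list such as step: [1,2,3] means "any of these values" and that numeric values are compared after stripping leading zeros (notifications carry strings such as "001").

```python
def _norm(value):
    """Normalise a value for comparison (assumption: "001" equals 1)."""
    text = str(value)
    return str(int(text)) if text.isdigit() else text


def matches(request_filter, notification_request):
    """Return True only if every field of the filter is satisfied."""
    for key, wanted in request_filter.items():
        if key not in notification_request:
            return False
        # Assumption: a list in the filter means "any of these values"
        allowed = wanted if isinstance(wanted, (list, tuple)) else [wanted]
        if _norm(notification_request[key]) not in {_norm(v) for v in allowed}:
            return False
    return True


listener_request = {"class": "od", "stream": "enfo", "step": [1, 2, 3]}
incoming = {"class": "od", "stream": "enfo", "step": "001", "expver": "0001"}
print(matches(listener_request, incoming))
print(matches(listener_request, {**incoming, "step": "004"}))
```

Fields that appear in the notification but not in the listener request (expver here) do not affect the result, which mirrors the idea that users only specify the fields they want to filter on.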
Triggers
The triggers block accepts a sequence of triggers. Each trigger results in an independent process executed every time a notification is received. This section shows the types of triggers currently available.
Echo
This is the simplest trigger as it prints the notification to the console output. It is used for testing and it does not accept any extra parameters.
triggers:
  - type: echo
Log
This trigger logs the event to a log file. It is useful for recording the received event. Note, it will fail if the directory does not exist.
triggers:
  - type: log
    path: testLog.log
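Because the log trigger fails when the target directory is missing, the directory can be created before the listener is started. The following is a minimal sketch using only the Python standard library; ensure_log_dir is a hypothetical helper name, and the path used in the demonstration is a temporary one chosen purely for illustration.

```python
import tempfile
from pathlib import Path


def ensure_log_dir(log_path):
    """Create the parent directory of log_path if needed and return it."""
    path = Path(log_path).expanduser()
    path.parent.mkdir(parents=True, exist_ok=True)
    return path.parent


# Demonstration in a temporary directory so nothing is left behind
with tempfile.TemporaryDirectory() as tmp:
    created = ensure_log_dir(Path(tmp) / "aviso" / "logs" / "testLog.log")
    print(created.is_dir())
```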
Command
This trigger allows the user to define a shell command to work with the notification.
triggers:
  - type: command
    working_dir: $HOME/aviso/examples
    command: ./script.sh --date ${request.date} -s ${request.stream}
    environment:
      STEP: ${request.step}
      TIME: "The time is ${request.time}"
- command is the command that will be executed for each notification received. This is a mandatory field.
- environment is a user-defined list of local variables that will be passed to the command shell. This is an optional field.
- working_dir defines the working directory that will be set before executing the command. This is an optional field.
Moreover, the system performs parameter substitution in the command and environment fields. For every occurrence of the pattern:
- ${name}, it substitutes the value associated with the corresponding key found in the notification received.
- ${json}, it substitutes the whole notification formatted as an inline JSON string.
- ${jsonpath}, it substitutes the file name of a JSON file containing the notification.
A notification is a dictionary whose keys can be used in the parameter substitution mechanism described above. Here is an example of a notification:
{
  "event": "dissemination",
  "request": {
    "class": "od",
    "date": "20191112",
    "destination": "FOO",
    "domain": "g",
    "expver": "0001",
    "step": "001",
    "stream": "enfo",
    "time": "18",
    "target": "E1"
  },
  "location": "s3://storage.ecmwf.europeanweather.cloud/ecpds/xxx.xx"
}
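To make the substitution mechanism concrete, here is a minimal sketch of how ${name} and ${json} could be expanded against a notification dictionary. It is an illustration only, not Aviso's code: substitute is a hypothetical function, dotted names such as request.date are assumed to walk nested keys, and ${jsonpath} is left out because it involves writing a file.

```python
import json
import re

_PATTERN = re.compile(r"\$\{([^}]+)\}")


def substitute(template, notification):
    """Expand ${name} and ${json} placeholders using the notification."""

    def lookup(match):
        name = match.group(1)
        if name == "json":
            # the whole notification as an inline JSON string
            return json.dumps(notification)
        value = notification
        for part in name.split("."):  # assumption: dots walk nested keys
            value = value[part]
        return str(value)

    return _PATTERN.sub(lookup, template)


notification = {
    "event": "dissemination",
    "request": {"class": "od", "date": "20191112", "destination": "FOO",
                "domain": "g", "expver": "0001", "step": "001",
                "stream": "enfo", "time": "18", "target": "E1"},
    "location": "s3://storage.ecmwf.europeanweather.cloud/ecpds/xxx.xx",
}
print(substitute("./script.sh --date ${request.date} -s ${request.stream}",
                 notification))
```

In this sketch an unknown key raises KeyError; how Aviso itself treats unknown keys is not covered here.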
Here is an example file of a listener command trigger that runs a bash script executing a MARS request.
Post
This trigger allows the user to send the received notification as an HTTP POST message, formatted according to the CloudEvents specification:
triggers:
  - type: post
    protocol:
      type: cloudevents_http
      url: http://my.notification.system/api
This is the basic configuration. More parameters can be specified to customise the CloudEvents message. More information is available in the reference documentation.
The CloudEvents message sent would look like the following:
{
  "type" : "aviso",  # this is customisable by the user
  "data": {  # this is aviso specific
    "event": "dissemination",
    "request": {
      "target": "E1",
      "class": "od",
      "date": "20190810",
      "destination": "FOO",
      "domain": "g",
      "expver": "1",
      "step": "1",
      "stream": "enfo",
      "time": "0",
    },
    "location": "s3://data.ecmwf.int/diss/foo/bar/20190810/xyz",  # location on ceph or s3
  },
  "datacontenttype": "application/json",
  "id": "0c02fdc5-148c-43b5-b2fa-cb1f590369ff",  # UUID random generated by aviso
  "source": "https://aviso.ecmwf.int",  # this is customisable by the user
  "specversion": "1.0",
  "time": "2020-03-02T13:34:40.245Z",  # Timestamp of when this message is created
}
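On the receiving side, an endpoint has to accept the POST and unpack the message. The sketch below uses only the Python standard library and rests on assumptions: the message is taken to arrive as a single JSON body shaped like the example above, and extract_notification, AvisoHandler, serve and port 8080 are hypothetical names and values chosen for illustration.

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer


def extract_notification(body):
    """Return the Aviso payload of a CloudEvents JSON body."""
    message = json.loads(body)
    if message.get("specversion") != "1.0":
        raise ValueError("unsupported CloudEvents specversion")
    return message["data"]


class AvisoHandler(BaseHTTPRequestHandler):
    """Minimal receiver: prints each notification, rejects bad bodies."""

    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        try:
            notification = extract_notification(self.rfile.read(length))
            print("received", notification["event"], notification["request"])
        except (ValueError, KeyError):
            self.send_response(400)  # malformed or unexpected message
        else:
            self.send_response(204)  # accepted, nothing to return
        self.end_headers()


def serve(port=8080):
    """Run the receiver until interrupted (port is an arbitrary choice)."""
    HTTPServer(("0.0.0.0", port), AvisoHandler).serve_forever()
```

Calling serve() starts the receiver; the url in the post trigger would then point at that host and port.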
Aviso can also post events to an AWS Simple Notification Service (SNS) topic. See the relevant documentation page for more details: https://pyaviso.readthedocs.io/en/latest/reference/triggers.html#post
Aviso as a Python API
Aviso can be used as a Python API. This is intended for users who want to integrate Aviso into a larger workflow written in Python, or who simply have their trigger defined as a Python function. Below is an example of a Python script that defines a function to be executed once a notification is received, creates a listener that references this function as its trigger, and finally passes it to Aviso to execute.
from pyaviso import NotificationManager

# define function to be called
def do_something(notification):
    print(f"Notification for step {notification['request']['step']} received")
    # now do something useful with it ...

# define the trigger
trigger = {"type": "function", "function": do_something}

# create an event listener request that uses that trigger
request = {"class": "od", "stream": "oper", "expver": 1, "domain": "g", "step": 1}
listeners = {"listeners": [{"event": "mars", "request": request, "triggers": [trigger]}]}

# run it
aviso = NotificationManager()
aviso.listen(listeners=listeners)
Here is an example file of a Python script running Aviso and executing a MARS request after a notification is received.
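A function trigger is ordinary Python, so it can be exercised without a running listener. The sketch below shows one possible shape for a trigger that turns a mars notification into a MARS request dictionary. build_mars_request and on_notification are hypothetical names, the extra keys (type and param) are illustrative values rather than anything Aviso prescribes, and the actual MARS retrieval is deliberately left out.

```python
def build_mars_request(notification, **extra):
    """Copy the notified request fields and add user-chosen MARS keys."""
    request = dict(notification["request"])
    request.update(extra)
    return request


pending = []  # requests waiting to be retrieved by the rest of the workflow


def on_notification(notification):
    """Function trigger: queue a MARS request for the notified data."""
    # type and param are illustrative values, not defaults of Aviso
    pending.append(build_mars_request(notification, type="fc", param="2t"))


# The trigger definition, in the same form as the example above
trigger = {"type": "function", "function": on_notification}

# Local dry run with a hand-written notification
sample = {"event": "mars",
          "request": {"class": "od", "stream": "oper", "expver": "1",
                      "domain": "g", "date": "20190810", "time": "0",
                      "step": "1"}}
trigger["function"](sample)
print(pending[0])
```

Passing this trigger to NotificationManager.listen, as in the script above, would then call on_notification for each matching notification.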
Dealing with past notifications
Before listening to new notifications, Aviso by default checks which notification was received last and returns all the notifications that have been missed since. It then carries on listening to new ones. The very first time the application runs, however, no previous notifications are returned. This behaviour ensures that users do not miss any notifications in case of machine reboots.
To override this behaviour and ignore the missed notifications, listening only to new ones, run the following:
aviso listen --now
This command will also reset the previous history.
Users can also explicitly replay past notifications. Aviso can deliver notifications from the ECMWF server up to 14 days in the past. This can also be used to test the listener configuration with real notifications.
For example, launch Aviso with the following options:
aviso listen --from 2020-01-20T00:00:00.0Z --to 2020-01-21T00:00:00.0Z
It will replay all the notifications sent from 20 January to 21 January and the ones complying with the listener request will execute the triggers.
Note: the dates must be in the past, and --to can only be defined together with --from. The dates are defined in ISO format and are in UTC.
In the absence of --to, the system retrieves the past notifications and then continues listening to future ones. If --to is defined, Aviso terminates once it has retrieved all the past notifications.
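The rules for --from and --to can be summarised in a few lines of Python. The sketch below is a paraphrase of the rules stated above, not Aviso's own validation: check_replay_window and parse_utc are hypothetical names, the timestamp format is taken from the example command, and the requirement that --from precedes --to is an added assumption.

```python
from datetime import datetime, timezone

ISO_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"  # format used in the example command


def parse_utc(text):
    """Parse a timestamp such as 2020-01-20T00:00:00.0Z as UTC."""
    return datetime.strptime(text, ISO_FORMAT).replace(tzinfo=timezone.utc)


def check_replay_window(from_date=None, to_date=None, now=None):
    """Validate the options; return (start, end, keeps_listening)."""
    now = now or datetime.now(timezone.utc)
    if to_date is not None and from_date is None:
        raise ValueError("--to can only be defined together with --from")
    start = parse_utc(from_date) if from_date is not None else None
    end = parse_utc(to_date) if to_date is not None else None
    for label, value in (("--from", start), ("--to", end)):
        if value is not None and value >= now:
            raise ValueError(f"{label} must be in the past")
    if start is not None and end is not None and start >= end:
        # assumption: an empty or reversed window is rejected
        raise ValueError("--from must be earlier than --to")
    # Without --to the listener carries on with future notifications
    return start, end, end is None


reference = datetime(2020, 2, 1, tzinfo=timezone.utc)  # fixed "now" for the demo
print(check_replay_window("2020-01-20T00:00:00.0Z", "2020-01-21T00:00:00.0Z",
                          now=reference))
```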
Running as a service
Aviso can be executed as a system service. This helps automate its restart in case of machine reboots. The following steps configure Aviso to run as a service that restarts automatically:
Create a system service unit, by creating the following file in /etc/systemd/system/aviso.service:
[Unit]
Description=Aviso

[Service]
# If User is omitted the service will run as root
User=<username>
# Group is optional
Group=<groupname>
# WorkingDirectory is optional
WorkingDirectory=<home_directory>
ExecStart=/opt/anaconda3/bin/aviso listen
Restart=always

[Install]
WantedBy=multi-user.target
Enable the aviso service:
sudo systemctl enable aviso.service
Reload systemd:
sudo systemctl daemon-reload
Start the service:
sudo systemctl start aviso.service
Note: if users change the Aviso configuration, the Aviso service must be restarted, otherwise the change will have no effect.