Parser Hooks¶
With parser hooks, you can augment the behaviour of existing parsers without having to change their source code.
Here are some examples the hooks can be used for:
- Renaming reading fields
- Filtering readings you don't want to see
- Converting reading values to different units
- Filtering packets before they hit the parser
- Writing data to profiles
Note
We are preparing more documentation for all Elixir modules and functions that can be used when writing hooks and when using the new parser behaviour callbacks.
Example Post-hook¶
This example post-hook will consume all readings that have been added in this parser pipeline.
For each reading, it will add the field temperature_f
, with the value of temperature
but converted from °C to °F.
The transform_reading_fields/1
callback is implemented so the reading list shows the unit correctly.
defmodule PostHook do
use Platform.Parsing.PostHook
def take_orb(orb) do
Orb.modify_reading(orb, fn reading ->
temperature = get(reading, [:data, :temperature])
temperature_f = round(temperature * (9.0 / 5.0) + 32)
Reading.add_value(reading, :temperature_f, temperature_f)
end)
end
def transform_reading_fields(existing_fields) do
{:ok,
[
%{
field: "temperature_f",
display: "Temperature",
unit: "°F"
}
| existing_fields
]}
end
end
Testing the post-hook¶
We can test if our post-hook works by heading to the Test
tab of the parser.
Here, we enter the value 00000015
as the Payload in hexadecimal.
After clicking the "Run Test" button, we can see that the main parser returns a reading with temperature
set to 21 °C, and the post-hook then modifies that reading, adding another field temperature_f
set to 70 °F:
We can also see that the transform_reading_fields/1
callback worked, by going to the General
tab of the parser:
Example main parser¶
For reference, this is the main parser module used to demonstrate the post-hook:
defmodule Parser do
use Platform.Parsing.Behaviour
def take_orb(orb) do
Orb.parse_packet(orb, fn <<temp::32>> = payload, slice ->
Orb.Slice.add_reading(slice, %{temperature: temp})
end)
end
def fields() do
[
%{field: "temperature", display: "Temperature", unit: "°C"}
]
end
end
New parser pipeline¶
When a packet is added to a device with a parser, it will take the following route through a parser configured with pre-hooks, the main parser and post-hooks:
- Preprocessing
In this step done by the system, data is preloaded and prepared for the callbacks in the parser and hooks
- Pre-hooks
- Main parser
- Post-hooks
Orbs, events and slices¶
In the new parser pipeline, every packet that goes in, and every reading that goes out is tracked as an event that can be modified or dropped. Even updating the profile data (fields) of a device tracked as an event so it can be undone.
The preprocessing stage adds only events of the type packet
to the data structure called Orb
. An orb has 2 lists of events, one is the outbox, containing all events that were added in the current stage of the current pipeline, and the inbox is the list of unconsumed events that were added by previous stages.
After each stage, the orb is turned, which simply means that the events in the outbox zone fall into the inbox zone for the next hook or the main parser.
Orb.consume/3
¶
Usually, a take_orb
implementation will start with a call to Orb.consume/3
.
This function takes the orb, a type of event and a function.
The given function is then called multiple times with a slice for each event.
The function must put new events into that slice and then return it.
Orb.consume(orb, :packet, fn slice ->
slice
|> Slice.add_reading(%{foo: 3})
|> Slice.put_fields(%{info: %{foo: "bar"}})
end)
Orbs¶
A data structure used while executing parsers.
The orb starts with packets in its input and is then passed through all pre-hooks, the main parser, and the post hooks.
Each step of the pipeline can consume a type of events from the orb and transform them into new events using the builtin functions in Platform.Parsing.Orb.Slice
.
For example:
Orb.parse_packet(orb, fn
<< 1 :: 8, t :: 32>>, slice ->
Slice.add_reading(slice, %{temperature: t / 10})
_, slice ->
Slice.drop(slice)
end)
Would consume all packets from the orb, in case of a packet with the type 1
, adds a temperature reading, and drops the packet otherwise, which stops the packet from being processed by later steps in the pipeline.
Functions of Orb
¶
The Orb module contains these functions as part of the public API:
-
turn(orb, next_phase \\ nil)
Warning
Parsers and Hooks usually don't need to call this function. If you feel like it is necessary to turn an orb, think about creating a hook instead.
Turning an orb around prepares it for the next step in the processing.
All the outputs will be put back into the input list. Unconsumed inputs will stay there as well.
Additionally,
next_phase
can be passed to set the phase field.If a phase is passed and the orb is in
:test
env, the output events of the previous phase will be recorded in therecorded_outputs
field in reversed order. -
consume(orb, event, fun)
Consuming an event type removes all events with that type from the input list, and passes the events to the given
consumer
function as slices.Each slice only contains one event and also the device for that event.
The consumer must return the slice back, and it can use functions of
Orb.Slice
to add events to the orbs output list. -
consume_all(orb, event_type, consumer)
Consumes all events from the given type in the input and passes them to the given function as a list in the first argument, then passes the orb as the second argument.
The function can add events to the orb and must then return the orb.
consume_all/3
should be used with caution, most users will be happy withconsume/3
. -
add(orb, event, payload)
Adds multiple events with the same type to the orb.
-
parse_packet(orb, fun)
Convenience function for consuming packet payloads.
The given function is called with the payload of all packets in the orb's input and a slice. The function can add events to the slice and must then return it.
Example:
parse_packet(orb, fn <<temp::16>>, slice -> Orb.Slice.add_reading(slice, %{temperature: temp / 10}) end)
-
modify_reading(orb, fun)
Convenience function to modify readings.
The given function is called with the reading, and it should return the reading back.
Info
This function will always create one reading for every incoming reading.
If you want to drop some readings, or create multiple readings from one, use the generic
consume/3
function to consume events of the type:reading
.Example:
modify_reading(orb, fn reading -> reading |> Reading.rename(:temp, :temperature) |> Reading.update_value(:temperature, &(&1 - 32) * 5/9) |> Reading.add_value(:converted, true) end)
Slices¶
A slice contains these fields as part of the public API:
event
- the type of the input eventdata
- the input event datadevice
- the device of the event
It also keeps track of:
- any events that were added to the slice
- whether the input event should be kept or dropped. The default behaviour is to drop the event.
- the relationship between events (only tracked when using the testing feature)
Functions of Orb.Slice
¶
The module Orb.Slice
contains functions to add events to the slice:
-
add_reading(slice, data :: %{}, opts :: keyword)
Adds a
:reading
event to the processing pipeline.data
is a flat map with atom keys and numbers, binaries or booleans as values.Available options:
location
- aGeo.Point
struct,{lng, lat}
tuple, or[lng, lat]
list.measured_at
- set to backdate the reading to an earlier date.
Example with all options:
Slice.add_reading( slice, %{ temperature: 25.6, indoor: true }, location: {10.014203339627025, 53.54821087842766}, measured_at: ~U[2024-10-28T13:51:10Z] )
-
put_fields(slice, fields :: %{binary => %{binary => term}})
Adds a
:field
event to the processing pipeline.fields
is a map of maps, containing the fields that should be merged into the fields of the current device.
It also has the functions keep(slice)
and drop(slice)
. The default behaviour when consuming an event is to drop it, expecting the function to return a new event instead.