How to use Topic Rules to react to MQTT messages in AWS IoT Core
Run a Lambda function, store the data in a database, or send a notification on IoT device data
MQTT messages on the backend
IoT topic rules provide an event-driven mechanism for the cloud to react to incoming MQTT messages. When a device connects to the IoT Core endpoint, it can publish messages to topics and subscribe to messages on those topics. With topic rules, you can run a Lambda function, store the message in a variety of databases, or publish a notification to SNS or SQS, among the other integrations IoT Core provides. Rules are the link between the MQTT broker and the rest of the cloud backend.
For example, a temperature sensor might periodically publish the current measurement to a topic. A topic rule can then listen to these messages and store them in a DynamoDB table. This is the basic structure for storing measurements in a database.
Or it can run a Lambda function, which opens the door to all sorts of integrations. A new message might trigger an AppSync subscription, send messages to web applications via a WebSocket channel, send an email, or notify a third party.
A rule has an SQL statement that runs for every message published to a matching topic. When the statement returns a result, the actions associated with the rule run. These actions then interact with the rest of the AWS ecosystem.
In this article, we'll build a topic rule that calls a Lambda function every time the reported state of a device changes. This builds upon the shadows service, as that defines the reported/desired state distinction. But topic rules are not tied to shadows; they work with any MQTT topics.
Topic rule SQL
A topic rule needs an SQL statement. For this rule, in a readable form:
SELECT current, topic(3) as thingName, topic(6) as shadowName
FROM '$aws/things/+/shadow/name/+/update/documents'
WHERE isNull(previous) OR previous.state.reported.value <> current.state.reported.value
This mixes MQTT and SQL a bit. Let's break it down!
The FROM part defines an MQTT topic filter. Here, + is a single-level wildcard, meaning it matches any string that does not contain the topic delimiter (/). Since the shadow topic is $aws/things/<thingname>/shadow/name/<shadowname>/update/documents, this statement matches updates to all named shadows (second wildcard) of all things (first wildcard).
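To make the single-level wildcard concrete, here is a minimal sketch of how such a filter could be matched against a topic. The function name matchesTopicFilter and the thing and shadow names are made up for illustration, and this is not how IoT Core implements matching; it only handles the + wildcard used in this article.

```javascript
// Illustrative matcher for MQTT topic filters that only use the "+" wildcard.
// Hypothetical helper, not part of any AWS SDK.
function matchesTopicFilter(filter, topic) {
  const filterLevels = filter.split("/");
  const topicLevels = topic.split("/");
  // "+" stands for exactly one level, so the level counts must be equal.
  if (filterLevels.length !== topicLevels.length) {
    return false;
  }
  return filterLevels.every(
    (level, i) => level === "+" || level === topicLevels[i]
  );
}

const filter = "$aws/things/+/shadow/name/+/update/documents";

// Example topic with an assumed thing name and shadow name.
console.log(
  matchesTopicFilter(
    filter,
    "$aws/things/sensor-1/shadow/name/temperature/update/documents"
  )
);
```

A topic with a different last level (for example update/accepted) or with an extra level in place of a wildcard would not match.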
The SELECT defines how to transform the message. current is the current value of the shadow, as defined by the shadows service. The topic() function returns the specified level of the matched topic. So topic(3) is the third (1-indexed) level, which is the thing's name, and topic(6) is the sixth level, which is the shadow's name.
It is a best practice to extract the variable parts of the topic name as that usually helps with writing reusable expressions.
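The 1-indexed level lookup can be mimicked in a few lines. This is only a sketch of the behavior described above; topicLevel is a hypothetical helper and the topic uses assumed thing and shadow names.

```javascript
// Rough JavaScript equivalent of topic(n) in the rule SQL:
// returns the n-th level of the topic, counting from 1.
function topicLevel(topic, n) {
  return topic.split("/")[n - 1];
}

// Assumed example topic: thing "sensor-1", named shadow "temperature".
const topic = "$aws/things/sensor-1/shadow/name/temperature/update/documents";

const thingName = topicLevel(topic, 3);
const shadowName = topicLevel(topic, 6);
console.log(thingName, shadowName);
```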
The WHERE defines when the topic rule matches. previous and current are objects in the shadow update document, and we are interested in state.reported.value. The above expression matches when there is no previous value (isNull(previous)), which happens for the first message to the shadow, and when the reported value has changed (previous.state.reported.value <> current.state.reported.value).
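The same condition can be written out in JavaScript to see which messages pass. This is a sketch that only approximates the SQL: whereMatches is a hypothetical name, the sample documents are invented, and treating a missing previous the same as null is a simplification that may differ from how the rules engine handles undefined values.

```javascript
// Approximation of:
//   WHERE isNull(previous)
//      OR previous.state.reported.value <> current.state.reported.value
function whereMatches(message) {
  const { previous, current } = message;
  // Assumption: a missing "previous" is treated like null here.
  if (previous === null || previous === undefined) {
    return true;
  }
  return (
    previous.state?.reported?.value !== current.state?.reported?.value
  );
}

// Invented sample documents, reduced to the fields the condition reads.
const firstMessage = {
  previous: null,
  current: { state: { reported: { value: 21 } } },
};
const unchanged = {
  previous: { state: { reported: { value: 21 } } },
  current: { state: { reported: { value: 21 } } },
};
const changed = {
  previous: { state: { reported: { value: 21 } } },
  current: { state: { reported: { value: 22 } } },
};

console.log(whereMatches(firstMessage), whereMatches(unchanged), whereMatches(changed));
```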
Actions
The actions define what to do when a message matches the filter.
In this case, the action is to call a Lambda function.
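As an illustration, the rule and its Lambda action could be described with an object like the one below. It follows the input shape of CreateTopicRuleCommand in the AWS SDK for JavaScript v3 (@aws-sdk/client-iot), but this is only one possible way to create the rule and not necessarily how it was deployed here. The helper name, the rule name, and the function ARN are placeholders.

```javascript
// Builds an input object in the shape expected by CreateTopicRuleCommand.
// buildTopicRuleInput is a hypothetical helper; nothing is sent to AWS here.
function buildTopicRuleInput(ruleName, functionArn) {
  const sql =
    "SELECT current, topic(3) as thingName, topic(6) as shadowName " +
    "FROM '$aws/things/+/shadow/name/+/update/documents' " +
    "WHERE isNull(previous) OR " +
    "previous.state.reported.value <> current.state.reported.value";
  return {
    ruleName,
    topicRulePayload: {
      sql,
      awsIotSqlVersion: "2016-03-23",
      // One action: invoke the Lambda function with the SQL result.
      actions: [{ lambda: { functionArn } }],
    },
  };
}

// Placeholder values for illustration only.
const ruleInput = buildTopicRuleInput(
  "reported_value_changed",
  "arn:aws:lambda:eu-central-1:123456789012:function:on-shadow-change"
);
console.log(JSON.stringify(ruleInput, null, 2));
```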
As usual in AWS, we need to give the topic rule permission to invoke the Lambda. This is done via a resource-based policy attached to the function that grants the lambda:InvokeFunction permission to the iot.amazonaws.com service.
The policy JSON:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "iot.amazonaws.com"
      },
      "Action": "lambda:InvokeFunction",
      "Resource": "<lambda arn>",
      "Condition": {
        "ArnLike": {
          "AWS:SourceArn": "<rule arn>"
        }
      }
    }
  ]
}
The Condition element restricts the permission to the topic rule. It's a best practice to specify it, so that only the intended rule gains access to the function.
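One way to attach such a statement is Lambda's AddPermission API. The sketch below only builds the input object in the shape that AddPermissionCommand from @aws-sdk/client-lambda accepts; the helper name, the statement ID, the function name, and the rule ARN are assumptions made for illustration.

```javascript
// Builds an AddPermission input that lets one specific topic rule
// invoke the function. Hypothetical helper; nothing is sent to AWS here.
function buildInvokePermissionInput(functionName, ruleArn) {
  return {
    FunctionName: functionName,
    StatementId: "allow-iot-topic-rule", // arbitrary ID chosen for this sketch
    Action: "lambda:InvokeFunction",
    Principal: "iot.amazonaws.com",
    // Limits the grant to this rule, like the Condition element above.
    SourceArn: ruleArn,
  };
}

// Placeholder names and ARN.
const permissionInput = buildInvokePermissionInput(
  "on-shadow-change",
  "arn:aws:iot:eu-central-1:123456789012:rule/reported_value_changed"
);
console.log(permissionInput);
```

Leaving out SourceArn would allow any topic rule in any account to invoke the function through the IoT service principal, which is why the restriction matters.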
Lambda
The Lambda function is called with the result of the SQL, so in our case the event object will have current, thingName, and shadowName properties:
export const handler = async (event) => {
  const {thingName, shadowName, current} = event;
  // ...
};
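To try the handler logic locally, it can be fed a hand-written event. The event below is invented to match the SELECT clause, with assumed thing and shadow names and a reduced current object; summarizeEvent is a hypothetical helper, and the handler is left unexported here so the snippet runs as a plain script.

```javascript
// Pure helper that turns the rule's output into a log line.
function summarizeEvent(event) {
  const { thingName, shadowName, current } = event;
  return `${thingName}/${shadowName} reported ${current.state.reported.value}`;
}

// In a real Lambda module this would be exported as "handler".
const handler = async (event) => {
  const summary = summarizeEvent(event);
  console.log(summary);
  return summary;
};

// Invented sample event in the shape produced by the SELECT clause.
const sampleEvent = {
  thingName: "sensor-1",
  shadowName: "temperature",
  current: { state: { reported: { value: 21.5 } } },
};

handler(sampleEvent); // logs "sensor-1/temperature reported 21.5"
```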
Considerations
Topic rules incur costs. This is not surprising, but there seems to be a distinction between "message processed" and "actions executed". I couldn't find definitive confirmation, but the pricing page suggests that when the topic filter matches a message, the rule counts as executed even if the WHERE clause then discards the event. This means that even a topic rule that ends up doing nothing (executing no actions) still costs money.
Also, if an action triggers an MQTT message, it's easy to end up with an infinite loop. For example, a shadow update that triggers a rule that calls a Lambda that then updates the shadow can end up running indefinitely. Make sure you have a filter that breaks the cycle.
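One possible guard on the Lambda side is to skip the write when it would not change anything. The sketch below assumes the function wants to set a desired value in the shadow; planShadowUpdate is a hypothetical helper, and it only decides what to publish without calling any AWS API.

```javascript
// Returns the shadow update payload to publish, or null when the desired
// value is already in place and writing again would only cause more traffic.
function planShadowUpdate(current, targetValue) {
  if (current.state?.desired?.value === targetValue) {
    return null; // nothing to change, so do not touch the shadow
  }
  return { state: { desired: { value: targetValue } } };
}

// Invented shadow states for illustration.
const alreadySet = { state: { desired: { value: 20 }, reported: { value: 22 } } };
const notSet = { state: { reported: { value: 22 } } };

console.log(planShadowUpdate(alreadySet, 20), planShadowUpdate(notSet, 20));
```

The WHERE clause in this article also acts as such a filter: it only matches when the reported value changes, so an update that touches only the desired state does not trigger the rule again.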