How to use Topic Rules to react to MQTT messages in AWS IoT Core

Run a Lambda function, store the data in a database, or send a notification on IoT device data

Author's image
Tamás Sallai
4 mins

MQTT messages on the backend

IoT topic rules provide an event-driven mechanism for the cloud to react to incoming MQTT messages. When a device connects to the IoT Core endpoint it can publish messages to topics and can subscribe to changes to these topics. With topic rules, you can run a Lambda function, store the message in a variety of databases, publish a notification to SNS or SQS, among the other possible integrations IoT Core provides. Rules are the link between the MQTT broker and the rest of the cloud backend.

For example, a temperature sensor might periodically update a topic with the current measurement. Then a topic rule can listen to these messages and store them in a DynamoDB table. This is the basic structure of storing measurements in a database.

Or it can run a Lambda function and that opens the door for all sorts of integrations. A new message might trigger an AppSync subscription, send messages to web applications via a WebSocket channel, send an email, or notify a third-party.

A rule has an SQL statement that runs for every publish and when it returns a value the actions associated with the rule will run. These actions then interact with the rest of the AWS ecosystem.

In this article, we'll build a topic rule that calls a Lambda function every time the reported state of a device changes. This builds upon the shadows service, as that defines the reported/desired state distinction. But topic rules are not tied to shadows; they work with any MQTT topics.

Topic rule SQL

A topic rule needs an SQL statement.

SQL statement for the topic rule

The SQL, in a more readable form:

SELECT current, topic(3) as thingName, topic(6) as shadowName
	FROM '$aws/things/+/shadow/name/+/update/documents'
	WHERE isNull(previous) OR previous.state.reported.value <> current.state.reported.value

This mixes MQTT and SQL a bit. Let's break it down!

The FROM part defines an MQTT topic filter. Here, the + defines a one-level wildcard, meaning it can be any string except the topic delimiter (/). Since the shadow topic is $aws/things/<thingname>/shadow/name<shadowname>/update/documents, this statement matches all shadow updates (second wildcard) for all things (first wildcard).

The SELECT defines how to transform the message. The current is the current value of the shadow, as defined by the shadows service. The topic() function returns the specified level from the matched topic. So topic(3) is the third (1-indexed) level, which is the thing's name. Then topic(6) is the sixth level, which is the shadow's name. It is a best practice to extract the variable parts of the topic name as that usually helps with writing reusable expressions.

The WHERE defines when to match the topic rule. The previous and the current are objects in the shadow and we are interested in the reported.value. The above expression matches when there is no previous value (isNull(previous)) that happens for the first message to the shadow, and when the reported value is changed (previous.state.reported.value <> current.state.reported.value).

Actions

The actions define what to do when a message matches the filter.

Lambda action for the topic rule

In this case, the action is to call a Lambda function.

As usual in AWS, we need to give the topic rule permission to invoke the Lambda. This is done via a resource-based policy attached to the function that grants the lambda:InvokeFunction to the iot.amazonaws.com service.

Permission for the Lambda function

The policy JSON:

{
	"Version": "2012-10-17",
	"Statement": [
		{
			"Effect": "Allow",
			"Principal": {
				"Service": "iot.amazonaws.com"
			},
			"Action": "lambda:InvokeFunction",
			"Resource": "<lambda arn>",
			"Condition": {
				"ArnLike": {
					"AWS:SourceArn": "<rule arn>"
				}
			}
		}
	]
}

The Condition element specifies the topic rule. It's a best practice to specify that as well, so that only the intended rule gains access to the function.

Lambda

The Lambda function is called with the result of the SQL, so in our case it will have a current, a thingName, and a shadowName properties in the event object:

export const handler = async (event) => {
	const {thingName, shadowName, current} = event;
	// ...
};

Considerations

Topic rules incur costs. This is not surprising, but it seems like there is a distinction between "message processed" and "actions executed". I couldn't find any evidence, but the pricing page suggests that when the topic filter matches a message then the rule will be executed no matter if the WHERE clause then discards the event. This means even if a topic rule does nothing (not executing any actions) it still costs money.

Also, if an action triggers an MQTT message it's easy to end up with an infinite loop. For example, if a shadow update triggers a rule that calls a Lambda that then updates the shadow can end up running indefinitely. Make sure you have a filter that breaks the circuit.

February 21, 2023
In this article