How to send notifications to SNS, SQS, and EventBridge from AppSync
Use the HTTP data source to send messages directly from AppSync
With the HTTP data source, AppSync can interact directly with HTTP-based APIs. All AWS services (or at least all I know of) provide HTTP APIs that expose every operation of the service. These are usually hidden behind the SDKs and the CLI, but it means you can send an HTTP request signed with the AWS Signature Version 4 algorithm and do everything you can with an SDK.
In this article we'll look into how to send notifications to different services from AppSync. In particular, we'll look into SNS, SQS, and EventBridge. We'll discuss how to configure AppSync to interact with the service and how to construct the HTTP requests and process the responses.
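AppSync does the request signing for us once the data source is configured, so none of this has to be written by hand. Still, it helps to see why the region and the service name show up in the configuration later. The sketch below is a minimal Python illustration of how a Signature Version 4 signing key is derived; the secret and the date are placeholders, and the function names are made up for this example.

```python
import hashlib
import hmac


def _hmac_sha256(key: bytes, msg: str) -> bytes:
    """Return the raw HMAC-SHA256 digest of msg under key."""
    return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()


def derive_signing_key(secret_key: str, date: str, region: str, service: str) -> bytes:
    """Derive a Signature Version 4 signing key.

    date is the request date as YYYYMMDD. The resulting key is scoped
    to that date, the region, and the service name.
    """
    k_date = _hmac_sha256(("AWS4" + secret_key).encode("utf-8"), date)
    k_region = _hmac_sha256(k_date, region)
    k_service = _hmac_sha256(k_region, service)
    return _hmac_sha256(k_service, "aws4_request")


# Placeholder secret, not a real credential.
sns_key = derive_signing_key("EXAMPLE-SECRET", "20220723", "eu-central-1", "sns")
sqs_key = derive_signing_key("EXAMPLE-SECRET", "20220723", "eu-central-1", "sqs")
print(sns_key.hex())
print(sns_key != sqs_key)
```

Because the service name is part of the key derivation, a request signed for one service is not valid for another, which is why each data source needs its own signing_service_name.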
SNS
In this example, we'll have an SNS topic and we'll publish messages there.
resource "aws_sns_topic" "topic" {
}
Permission
First, permissions. AppSync needs an IAM policy to publish messages to a topic:
{
  "Effect": "Allow",
  "Action": "sns:Publish",
  "Resource": "<topic arn>"
}
Data source
Then we need the data source to define the endpoint and the signing options.
data "aws_arn" "sns" {
  arn = aws_sns_topic.topic.arn
}
resource "aws_appsync_datasource" "sns" {
  api_id           = aws_appsync_graphql_api.appsync.id
  name             = "sns"
  service_role_arn = aws_iam_role.appsync.arn
  type             = "HTTP"
  http_config {
    endpoint = "https://sns.${data.aws_arn.sns.region}.amazonaws.com"
    authorization_config {
      authorization_type = "AWS_IAM"
      aws_iam_config {
        signing_region       = data.aws_arn.sns.region
        signing_service_name = "sns"
      }
    }
  }
}
The endpoint needs the region of the topic, which is extracted from its ARN. Then the signing_service_name is sns.
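To illustrate what the aws_arn data source gives us, here is a small Python sketch that splits an ARN and builds the regional endpoint from it. The function name and the account ID are hypothetical, and the sketch assumes the standard aws partition, where endpoints end in amazonaws.com.

```python
def endpoint_from_arn(arn: str) -> str:
    """Build the regional service endpoint for the resource behind an ARN.

    ARN layout: arn:partition:service:region:account-id:resource
    Assumes the standard "aws" partition.
    """
    parts = arn.split(":", 5)
    if len(parts) != 6 or parts[0] != "arn":
        raise ValueError(f"not a valid ARN: {arn!r}")
    service, region = parts[2], parts[3]
    return f"https://{service}.{region}.amazonaws.com"


# Made-up account ID and topic name.
print(endpoint_from_arn("arn:aws:sns:eu-central-1:123456789012:example-topic"))
```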
Resolver
The resolver needs to assemble the request according to the documentation of the service.
{
  "version": "2018-05-29",
  "method": "POST",
  "params": {
    "query": {
      "Action": "Publish",
      "Version": "2010-03-31",
      "TopicArn": "$utils.urlEncode("${aws_sns_topic.topic.arn}")",
      "Message": "$utils.urlEncode($ctx.args.message)"
    },
    "body": " ",
    "headers": {
      "Content-Type": "application/xml"
    }
  },
  "resourcePath": "/"
}
Notice the "body": " " part. I encountered a strange error when the body was empty:
<ErrorResponse xmlns=\"http://sns.amazonaws.com/doc/2010-03-31/\">
<Error>
<Type>Sender</Type>
<Code>SignatureDoesNotMatch</Code>
<Message>The request signature we calculated does not match the signature you provided. Check your AWS Secret Access Key and signing method. Consult the service documentation for details.</Message>
</Error>
<RequestId>60d591cd-d26a-50d9-8b6e-ca987439233d</RequestId>
</ErrorResponse>
The value of the body does not seem to matter; the only requirement is that it's not empty.
Other than that, the TopicArn and the Message need to be URL-encoded. This blog post helped me figure out that part.
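To see why the encoding matters, here is a Python sketch of the query string that the request ends up with. It is only an analogue of what the template does, not AppSync code, and the ARN and the message are made-up values. Without encoding, the colons in the ARN and an & in the message would change how the query string is parsed.

```python
from urllib.parse import urlencode


def build_publish_query(topic_arn: str, message: str) -> str:
    """Return the form-encoded query string for an SNS Publish call."""
    params = {
        "Action": "Publish",
        "Version": "2010-03-31",
        "TopicArn": topic_arn,
        "Message": message,
    }
    # urlencode percent-encodes reserved characters and turns spaces into "+"
    return urlencode(params)


query = build_publish_query(
    "arn:aws:sns:eu-central-1:123456789012:example-topic",  # made-up ARN
    "hello world & more",
)
print(query)
```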
Process the response:
#if ($ctx.error)
$util.error($ctx.error.message, $ctx.error.type)
#end
#if ($ctx.result.statusCode < 200 || $ctx.result.statusCode >= 300)
$util.error($ctx.result.body, "StatusCode$ctx.result.statusCode")
#end
$util.toJson($util.xml.toMap($ctx.result.body).PublishResponse.PublishResult.MessageId)
The usual error handling comes first. Then, since the API returns XML, $util.xml.toMap converts the body to a map, and the template extracts the published message's ID from it.
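The path PublishResponse.PublishResult.MessageId mirrors the structure of the XML that SNS returns. As a rough illustration, this Python sketch parses a response of that shape and pulls out the same value. The response body is a hand-written sample with placeholder IDs, and the function name is hypothetical.

```python
import xml.etree.ElementTree as ET

# The SNS response elements live in this XML namespace.
SNS_NS = {"sns": "http://sns.amazonaws.com/doc/2010-03-31/"}

# Hand-written sample; both IDs are placeholders.
SAMPLE_RESPONSE = """<PublishResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
  <PublishResult>
    <MessageId>00000000-0000-0000-0000-000000000001</MessageId>
  </PublishResult>
  <ResponseMetadata>
    <RequestId>00000000-0000-0000-0000-000000000002</RequestId>
  </ResponseMetadata>
</PublishResponse>"""


def extract_message_id(body: str) -> str:
    """Return the MessageId from an SNS Publish response body."""
    root = ET.fromstring(body)
    node = root.find("sns:PublishResult/sns:MessageId", SNS_NS)
    if node is None or node.text is None:
        raise ValueError("no MessageId in response")
    return node.text


print(extract_message_id(SAMPLE_RESPONSE))
```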
Testing
Let's send a GraphQL request:
mutation MyMutation {
  sns(message: "testmsg")
}
This publishes a message to the topic. A Lambda function subscribed to the topic receives this event:
{
  "Records": [
    {
      "EventSource": "aws:sns",
      "EventVersion": "1.0",
      "EventSubscriptionArn": "arn:aws:sns:eu-...",
      "Sns": {
        "Type": "Notification",
        "MessageId": "432e191e-3825-5e1b-989f-424b829e3ade",
        "TopicArn": "arn:aws:sns:eu-central-...",
        "Subject": null,
        "Message": "testmsg",
        "Timestamp": "2022-07-23T08:59:39.282Z",
        "SignatureVersion": "1",
        "Signature": "LJXCC...",
        "SigningCertUrl": "https://sns.eu-cen...",
        "UnsubscribeUrl": "https://sns.eu-cen...",
        "MessageAttributes": {}
      }
    }
  ]
}
The message is in Records[0].Sns.Message.
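On the consuming side, reading the message is a one-liner. The following is a minimal sketch of a Python Lambda handler that collects the message text from every record. The handler is an assumption for illustration, and the event in the usage lines is a trimmed version of the one above.

```python
def handler(event, context):
    """Return the message text of every SNS record in the event."""
    return [record["Sns"]["Message"] for record in event["Records"]]


# Trimmed version of the event above, keeping only the fields used here.
sample_event = {
    "Records": [
        {"EventSource": "aws:sns", "Sns": {"Message": "testmsg"}},
    ]
}
print(handler(sample_event, None))
```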
SQS
In this example, we have an SQS queue.
resource "aws_sqs_queue" "queue" {
}
Permission
Give AppSync permission to send messages to this queue:
{
  "Effect": "Allow",
  "Action": "sqs:SendMessage",
  "Resource": "<queue arn>"
}
Data source
data "aws_arn" "sqs" {
  arn = aws_sqs_queue.queue.arn
}
resource "aws_appsync_datasource" "sqs" {
  api_id           = aws_appsync_graphql_api.appsync.id
  name             = "sqs"
  service_role_arn = aws_iam_role.appsync.arn
  type             = "HTTP"
  http_config {
    endpoint = "https://sqs.${data.aws_arn.sqs.region}.amazonaws.com"
    authorization_config {
      authorization_type = "AWS_IAM"
      aws_iam_config {
        signing_region       = data.aws_arn.sqs.region
        signing_service_name = "sqs"
      }
    }
  }
}
No surprises here: the endpoint depends on the region, and the service name is sqs.
Resolver
The request follows the documentation pages for the SQS API:
{
  "version": "2018-05-29",
  "method": "POST",
  "params": {
    "body": "Action=SendMessage&MessageBody=$util.urlEncode($ctx.args.message)&Version=2012-11-05",
    "headers": {
      "Content-Type": "application/x-www-form-urlencoded"
    }
  },
  "resourcePath": "/${data.aws_arn.sqs.account}/${aws_sqs_queue.queue.name}/"
}
The body contains the form values and the Content-Type is application/x-www-form-urlencoded accordingly. Then the resource path contains the account ID and the queue name.
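Unlike the SNS call, the parameters travel in the request body here and the queue is identified by the path. The Python sketch below assembles the same pieces outside of AppSync to show how they fit together. The function name, the account ID, and the queue name are all made up for the example.

```python
from urllib.parse import urlencode


def build_send_message_request(account_id: str, queue_name: str, message: str) -> dict:
    """Assemble the parts of an SQS SendMessage HTTP request."""
    body = urlencode({
        "Action": "SendMessage",
        "MessageBody": message,
        "Version": "2012-11-05",
    })
    return {
        "method": "POST",
        # The queue is addressed by account ID and queue name in the path.
        "resourcePath": f"/{account_id}/{queue_name}/",
        "headers": {"Content-Type": "application/x-www-form-urlencoded"},
        "body": body,
    }


# Made-up account ID and queue name.
request = build_send_message_request("123456789012", "example-queue", "a=b&c d")
print(request["resourcePath"])
print(request["body"])
```

The message in the usage lines contains = and & on purpose: after encoding they can no longer be mistaken for form separators.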
Process the response:
#if ($ctx.error)
$util.error($ctx.error.message, $ctx.error.type)
#end
#if ($ctx.result.statusCode < 200 || $ctx.result.statusCode >= 300)
$util.error($ctx.result.body, "StatusCode$ctx.result.statusCode")
#end
$util.toJson($util.xml.toMap($ctx.result.body).SendMessageResponse.SendMessageResult.MessageId)
The usual error handling comes first, then the template converts the XML response to a map and extracts the message ID.
Testing
The GraphQL request:
mutation MyMutation {
  sqs(message: "testmsg")
}
The event a Lambda function consuming the queue receives:
{
  "Records": [
    {
      "messageId": "fbd73eb1-1faa-4dee-a269-53c049f349df",
      "receiptHandle": "AQEBeQc+6pE98pqX...",
      "body": "testmsg",
      "attributes": {
        "ApproximateReceiveCount": "1",
        "SentTimestamp": "1658566842444",
        "SenderId": "AROAUB3O2IQ5MJG3Z3LKN:APPSYNC_ASSUME_ROLE",
        "ApproximateFirstReceiveTimestamp": "1658566842449"
      },
      "messageAttributes": {},
      "md5OfBody": "bad44ae996b8b7aec1ded43d7cb079b1",
      "eventSource": "aws:sqs",
      "eventSourceARN": "arn:aws:sqs:eu-c...",
      "awsRegion": "eu-central-1"
    }
  ]
}
The message is in Records[0].body.
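The event also carries an md5OfBody field, which a consumer can use as a sanity check on the body. Here is a minimal sketch of a Python Lambda handler that reads the bodies and verifies the digest, assuming the digest is calculated over the UTF-8 bytes of the message body. The handler itself is hypothetical.

```python
import hashlib


def handler(event, context):
    """Return the body of every SQS record after checking its MD5 digest."""
    bodies = []
    for record in event["Records"]:
        body = record["body"]
        # Assumption: the digest covers the UTF-8 bytes of the body.
        digest = hashlib.md5(body.encode("utf-8")).hexdigest()
        if digest != record["md5OfBody"]:
            raise ValueError(f"MD5 mismatch for message {record['messageId']}")
        bodies.append(body)
    return bodies
```

Whether the extra check is worth it depends on the consumer; the body alone is enough to get the message.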
EventBus
We'll use a custom event bus:
resource "aws_cloudwatch_event_bus" "eventbus" {
  name = "eventbus-${random_id.id.hex}"
}
Of course, it's possible to use the default one: just remove the EventBusName parameter.
Permission
To allow putting events to the bus:
{
  "Effect": "Allow",
  "Action": "events:PutEvents",
  "Resource": "<event bus arn>"
}
Data source
data "aws_arn" "eventbus" {
  arn = aws_cloudwatch_event_bus.eventbus.arn
}
resource "aws_appsync_datasource" "eventbus" {
  api_id           = aws_appsync_graphql_api.appsync.id
  name             = "eventbus"
  service_role_arn = aws_iam_role.appsync.arn
  type             = "HTTP"
  http_config {
    endpoint = "https://events.${data.aws_arn.eventbus.region}.amazonaws.com"
    authorization_config {
      authorization_type = "AWS_IAM"
      aws_iam_config {
        signing_region       = data.aws_arn.eventbus.region
        signing_service_name = "events"
      }
    }
  }
}
Resolver
The EventBridge API is much more modern than the previous two and follows the structure of most other AWS APIs.
{
  "version": "2018-05-29",
  "method": "POST",
  "params": {
    "headers": {
      "Content-Type": "application/x-amz-json-1.1",
      "X-Amz-Target": "AWSEvents.PutEvents"
    },
    "body": $util.toJson({
      "Entries": [
        {
          "Source": "test",
          "Detail": $util.toJson({"Message": $ctx.args.message}),
          "DetailType": "test",
          "EventBusName": "${aws_cloudwatch_event_bus.eventbus.name}"
        }
      ]
    })
  },
  "resourcePath": "/"
}
The X-Amz-Target defines the operation, which is AWSEvents.PutEvents. Then the Content-Type needs to be application/x-amz-json-1.1.
Finally, the request body contains the parameters in JSON format.
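One detail that is easy to miss in the template: Detail is not a nested object but a JSON document serialized into a string, which is why $util.toJson appears twice. This Python sketch builds an equivalent body to make the double encoding visible. The function name and the bus name are hypothetical.

```python
import json


def build_put_events_body(message: str, bus_name: str) -> str:
    """Return the JSON request body for a single-entry PutEvents call."""
    entry = {
        "Source": "test",
        "DetailType": "test",
        # Detail is itself a JSON document, passed as a string.
        "Detail": json.dumps({"Message": message}),
        "EventBusName": bus_name,
    }
    return json.dumps({"Entries": [entry]})


body = build_put_events_body("testmsg", "example-bus")  # made-up bus name
print(body)
```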
The response is also in a friendlier format:
#if ($ctx.error)
$util.error($ctx.error.message, $ctx.error.type)
#end
#if ($ctx.result.statusCode < 200 || $ctx.result.statusCode >= 300)
$util.error($ctx.result.body, "StatusCode$ctx.result.statusCode")
#end
#if($util.parseJson($ctx.result.body).FailedEntryCount > 0)
$util.error($ctx.result.body)
#end
$util.toJson($util.parseJson($ctx.result.body).Entries[0].EventId)
Since the response is JSON, there is no need for XML parsing.
Note the third error check: since the PutEvents operation supports sending multiple events, it can partially succeed. Checking the FailedEntryCount makes sure any partial failure results in an AppSync error.
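The same check can be sketched in Python to show what a partial failure looks like. A PutEvents call can return a success status while individual entries carry an ErrorCode instead of an EventId. The function name and the sample bodies in the usage lines are made up, and unlike the template, this version returns the IDs of all entries instead of only the first one.

```python
import json


def event_ids_or_raise(body: str) -> list:
    """Return the EventIds from a PutEvents response, or raise on any failure."""
    result = json.loads(body)
    if result.get("FailedEntryCount", 0) > 0:
        # Failed entries have ErrorCode and ErrorMessage instead of EventId.
        failed = [entry for entry in result["Entries"] if "ErrorCode" in entry]
        raise RuntimeError(f"{len(failed)} event(s) failed: {failed}")
    return [entry["EventId"] for entry in result["Entries"]]


# Made-up response bodies with placeholder values.
ok_body = json.dumps({"FailedEntryCount": 0, "Entries": [{"EventId": "id-1"}]})
partial_body = json.dumps({
    "FailedEntryCount": 1,
    "Entries": [
        {"EventId": "id-1"},
        {"ErrorCode": "ExampleError", "ErrorMessage": "placeholder"},
    ],
})
print(event_ids_or_raise(ok_body))
```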
Testing
The GraphQL request:
mutation MyMutation {
  eventbus(message: "testmsg")
}
And the event JSON:
{
  "version": "0",
  "id": "b41cd580-ae85-cd2e-401e-57a5151dbc86",
  "detail-type": "test",
  "source": "test",
  "account": "278868411450",
  "time": "2022-07-23T09:01:43Z",
  "region": "eu-central-1",
  "resources": [],
  "detail": {
    "Message": "testmsg"
  }
}