How to send notifications to SNS, SQS, and EventBridge from AppSync

Use the HTTP data source to send messages directly from AppSync

Author's image
Tamás Sallai
5 mins

With the HTTP data source AppSync can interact directly with HTTP-based APIs. All (or all I know of) AWS services provide HTTP APIs exposing all possible operations for the service. This is usually hidden behind the SDKs and the CLI, but because of this you can send an HTTP request signed with the AWS signature algorithm and do everything you can with an SDK.

In this article we'll look into how to send notifications to different services from AppSync. In particular, we'll look into SNS, SQS, and EventBridge. We'll discuss how to configure AppSync to interact with the service and how to construct the HTTP requests and process the responses.

SNS

In this example, we'll have an SNS topic and we'll publish messages there.

resource "aws_sns_topic" "topic" {
}

Permission

First, permissions. AppSync needs an IAM policy to publish messages to a topic:

{
	"Effect": "Allow",
	"Action": "sns:Publish",
	"Resource": "<topic arn>"
}

Data source

Then we need the data source to define the endpoint and the signing options.

data "aws_arn" "sns" {
  arn = aws_sns_topic.topic.arn
}

resource "aws_appsync_datasource" "sns" {
  api_id           = aws_appsync_graphql_api.appsync.id
  name             = "sns"
  service_role_arn = aws_iam_role.appsync.arn
  type             = "HTTP"
	http_config {
		endpoint = "https://sns.${data.aws_arn.sns.region}.amazonaws.com"
		authorization_config {
			authorization_type = "AWS_IAM"
			aws_iam_config {
				signing_region = data.aws_arn.sns.region
				signing_service_name = "sns"
			}
		}
	}
}

The endpoint needs the region of the topic, which is extracted from its ARN. Then the signing_service_name is sns.

Resolver

The resolver needs to assemble the request according to the documentation of the service.

{
	"version": "2018-05-29",
	"method": "POST",
	"params": {
		"query": {
			"Action": "Publish",
			"Version": "2010-03-31",
			"TopicArn": "$utils.urlEncode("${aws_sns_topic.topic.arn}")",
			"Message": "$utils.urlEncode($ctx.args.message)"
		},
		"body": " ",
		"headers": {
			"Content-Type" : "application/xml"
		},
	},
	"resourcePath": "/"
}

Notice the "body": " " part. I encountered a strange error when the body was empty:

<ErrorResponse xmlns=\"http://sns.amazonaws.com/doc/2010-03-31/\">
	<Error>
		<Type>Sender</Type>
		<Code>SignatureDoesNotMatch</Code>
		<Message>The request signature we calculated does not match the signature you provided. Check your AWS Secret Access Key and signing method. Consult the service documentation for details.</Message>
	</Error>
	<RequestId>60d591cd-d26a-50d9-8b6e-ca987439233d</RequestId>
</ErrorResponse>

It seems like the value of the body is not important, the only requirement is that it's not empty.

Other than that, the TopicArn and the Message needs to be URL-encoded. This blog post helped me figure out that part.

Process the response:

#if ($ctx.error)
	$util.error($ctx.error.message, $ctx.error.type)
#end
#if ($ctx.result.statusCode < 200 || $ctx.result.statusCode >= 300)
	$util.error($ctx.result.body, "StatusCode$ctx.result.statusCode")
#end
$util.toJson($util.xml.toMap($ctx.result.body).PublishResponse.PublishResult.MessageId)

The usual error handling is there, then since the API returns an XML, we need to use the $util.xml.toMap to convert it to JSON. Then this template gets the published message's ID.

Testing

Let's send a GraphQL request:

mutation MyMutation {
  sns(message: "testmsg")
}

This publishes this event to the topic:

{
	"Records": [
		{
			"EventSource": "aws:sns",
			"EventVersion": "1.0",
			"EventSubscriptionArn": "arn:aws:sns:eu-...",
			"Sns": {
				"Type": "Notification",
				"MessageId": "432e191e-3825-5e1b-989f-424b829e3ade",
				"TopicArn": "arn:aws:sns:eu-central-...",
				"Subject": null,
				"Message": "testmsg",
				"Timestamp": "2022-07-23T08:59:39.282Z",
				"SignatureVersion": "1",
				"Signature": "LJXCC...",
				"SigningCertUrl": "https://sns.eu-cen...",
				"UnsubscribeUrl": "https://sns.eu-cen...",
				"MessageAttributes": {}
			}
		}
	]
}

The message is in the Records[0].Sns.Message.

SQS

In this example, we have an SQS queue.

resource "aws_sqs_queue" "queue" {
}

Permission

Give AppSync permission to send messages to this queue:

{
	"Effect": "Allow",
	"Action": "sqs:SendMessage",
	"Resource": "<queue arn>"
}

Data source

data "aws_arn" "sqs" {
  arn = aws_sqs_queue.queue.arn
}

resource "aws_appsync_datasource" "sqs" {
  api_id           = aws_appsync_graphql_api.appsync.id
  name             = "sqs"
  service_role_arn = aws_iam_role.appsync.arn
  type             = "HTTP"
	http_config {
		endpoint = "https://sqs.${data.aws_arn.sqs.region}.amazonaws.com"
		authorization_config {
			authorization_type = "AWS_IAM"
			aws_iam_config {
				signing_region = data.aws_arn.sqs.region
				signing_service_name = "sqs"
			}
		}
	}
}

No surprises here: the endpoint depends on the region, and the service name is sqs.

Resolver

The documentation pages for the SQS API:

{
	"version": "2018-05-29",
	"method": "POST",
	"params": {
		"body": "Action=SendMessage&MessageBody=$util.urlEncode($ctx.args.message)&Version=2012-11-05",
		"headers": {
			"Content-Type" : "application/x-www-form-urlencoded"
		},
	},
	"resourcePath": "/${data.aws_arn.sqs.account}/${aws_sqs_queue.queue.name}/"
}

The body contains the form values and the Content-Type is application/x-www-form-urlencoded accordingly. Then the resource path contains the account ID and the queue name.

Process the response:

#if ($ctx.error)
	$util.error($ctx.error.message, $ctx.error.type)
#end
#if ($ctx.result.statusCode < 200 || $ctx.result.statusCode >= 300)
	$util.error($ctx.result.body, "StatusCode$ctx.result.statusCode")
#end
$util.toJson($util.xml.toMap($ctx.result.body).SendMessageResponse.SendMessageResult.MessageId)

The usual error handling, converting the response to JSON from XML, then extact the message ID.

Testing

The GraphQL request:

mutation MyMutation {
	sqs(message: "testmsg")
}

The event generated:

{
	"Records": [
		{
			"messageId": "fbd73eb1-1faa-4dee-a269-53c049f349df",
			"receiptHandle": "AQEBeQc+6pE98pqX...",
			"body": "testmsg",
			"attributes": {
				"ApproximateReceiveCount": "1",
				"SentTimestamp": "1658566842444",
				"SenderId": "AROAUB3O2IQ5MJG3Z3LKN:APPSYNC_ASSUME_ROLE",
				"ApproximateFirstReceiveTimestamp": "1658566842449"
			},
			"messageAttributes": {},
			"md5OfBody": "bad44ae996b8b7aec1ded43d7cb079b1",
			"eventSource": "aws:sqs",
			"eventSourceARN": "arn:aws:sqs:eu-c...",
			"awsRegion": "eu-central-1"
		}
	]
}

The message is in Records[0].body.

EventBus

We'll use a custom event bus:

resource "aws_cloudwatch_event_bus" "eventbus" {
	name = "eventbus-${random_id.id.hex}"
}

Of course, it's possible to use the default one, just remove the EventBusName parameter.

Permission

To allow putting events to the bus:

{
	"Effect": "Allow",
	"Action": "events:PutEvents",
	"Resource": "<event bus arn>"
}

Data source

data "aws_arn" "eventbus" {
  arn = aws_cloudwatch_event_bus.eventbus.arn
}

resource "aws_appsync_datasource" "eventbus" {
  api_id           = aws_appsync_graphql_api.appsync.id
  name             = "eventbus"
  service_role_arn = aws_iam_role.appsync.arn
  type             = "HTTP"
	http_config {
		endpoint = "https://events.${data.aws_arn.eventbus.region}.amazonaws.com"
		authorization_config {
			authorization_type = "AWS_IAM"
			aws_iam_config {
				signing_region = data.aws_arn.eventbus.region
				signing_service_name = "events"
			}
		}
	}
}

Resolver

The EventBridge API is much more modern than the previous two and follows the structure of most other AWS APIs.

{
	"version": "2018-05-29",
	"method": "POST",
	"params": {
		"headers": {
			"Content-Type": "application/x-amz-json-1.1",
			"X-Amz-Target": "AWSEvents.PutEvents"
		},
		"body":$util.toJson({
			"Entries":[
				{
					"Source":"test",
					"Detail": $util.toJson({"Message": $ctx.args.message}),
					"DetailType":"test",
					"EventBusName": "${aws_cloudwatch_event_bus.eventbus.name}"
				}
			]
		}),
	},
	"resourcePath": "/"
}

The X-Amz-Target defines the operation, which is AWSEvents.PutEvents. Then the Content-Type needs to be application/x-amz-json-1.1. Finally, the request body contains the parameters in JSON format.

The response is also in a friendlier format:

#if ($ctx.error)
	$util.error($ctx.error.message, $ctx.error.type)
#end
#if ($ctx.result.statusCode < 200 || $ctx.result.statusCode >= 300)
	$util.error($ctx.result.body, "StatusCode$ctx.result.statusCode")
#end
#if($util.parseJson($ctx.result.body).FailedEntryCount > 0)
	$util.error($ctx.result.body)
#end
$util.toJson($util.parseJson($ctx.result.body).Entries[0].EventId)

Since the response is a JSON there is no need for XML parsing.

Note the third error handling: since the PutEvents operation supports sending multiple events, it can partially succeed. Checking the FailedEntryCount makes sure any partial failure results in an AppSync error.

Testing

The GraphQL query:

mutation MyMutation {
	eventbus(message: "testmsg")
}

And the event JSON:

{
	"version": "0",
	"id": "b41cd580-ae85-cd2e-401e-57a5151dbc86",
	"detail-type": "test",
	"source": "test",
	"account": "278868411450",
	"time": "2022-07-23T09:01:43Z",
	"region": "eu-central-1",
	"resources": [],
	"detail": {
		"Message": "testmsg"
	}
}
August 30, 2022
In this article