How to use DynamoDB with AppSync

How to store, retrieve, and query items in a DynamoDB table with an AppSync resolver

Author's image
Tamás Sallai
12 mins

AppSync natively supports reading and writing data from/to DynamoDB tables. This is done through the DynamoDB data source that defines the data structure for the resolvers for each operation. It supports all operations DynamoDB supports, which means you can implement anything database-related natively in resolvers.

In this article we're going to look into a simple data model that stores data in DynamoDB tables. We'll see how to do various operations on the items and how to integrate that with AppSync.

Example data model

In this article, we're going to implement a simple GraphQL API with two tables: users and groups. Both of them have an id and a name and users belong to groups.

We'll implement a query to get back a group based on its ID, and mutations to add groups and users.

The schema:

type User {
	id: ID!
	name: String!
}

type PaginatedUsers {
	users: [User!]!
	nextToken: String
}

type Group {
	id: ID!
	name: String!
	users(count: Int, nextToken: String): PaginatedUsers!
}

type Query {
	groupById(id: String!): Group
}

type Mutation {
	addGroup(name: String!): Group!
	addUser(name: String!, groupId: ID!): ID!
}

schema {
	query: Query
	mutation: Mutation
}

Data source

First, we need the tables:

Then we'll define a data source for each of them:

The first part is straightforward, as it defines the data source as an "Amazon DynamoDB table", and locates it. AppSync uses an IAM Role to gain the necessary permissions to access these tables, so we need to create the role and add the permissions:

This is all we need to start using these DynamoDB tables in AppSync resolvers.

Operations

AppSync supports the full feature set of DynamoDB. You can issue all basic operations (CRUD items, queries, scans, transactions, batching), use conditions, expressions, and specify the types, just like with any other DynamoDB client.

Let's see the most commonly used operations and how to implement them for our data model!

Getting items

Let's start with the entry point to the object graph, the groupById query:

type Query {
	groupById(id: String!): Group
}

This gets an id argument and returns a Group. A query and a response:

query MyQuery {
	groupById(id: "group1") {
		id
		name
	}
}
{
	"data": {
		"groupById": {
			"id": "group1",
			"name": "Group 1"
		}
	}
}

To implement this, we need a resolver that sends a GetItem to the groups table:

{
	"version" : "2018-05-29",
	"operation" : "GetItem",
	"key" : {
		"id" : {"S": $util.toJson($ctx.args.id)}
	},
	"consistentRead" : true
}

This is the structure that most GetItems follow. The $ctx.args.id is the query argument, then the $util.toJson converts it to a String in a safe way. The key specifies what element to get, and the consistentRead defines a strongly consistent read.

The response mapping:

#if($ctx.error)
	$util.error($ctx.error.message, $ctx.error.type)
#end
$util.toJson($ctx.result)

Here, AppSync first needs to check if there was an error during the GetItem call. By default, resolvers don't terminate if there was an error in the underlying data source (as of the 2018-05-29 version) but leave that to the response mapping template. So the template needs to check the $ctx.error and throw an error using $util.error.

Queries

Queries in DynamoDB allows returning a list of items when a table or index has a composite key. The query needs to define the hash key in full, then it can define filters and sorting for the range key.

In our data model, a group can contain a list of users. This is a field in the Group type and that resolves to a list of users:

type Group {
	users(count: Int, nextToken: String): PaginatedUsers!
}

Since the users table has the user id as its key, we need an index to be efficiently query users in a group:

AppSync can then query this index to get back the list of all users for a given group. This operation scales, as it does not matter how many groups the database has, only how many users are in a given group (as opposed to a Scan operation, that would get slower with more groups).

Pagination

Pagination is a central concept in DynamoDB as every operation that returns a list is not guaranteed to return all elements in one go. The exact number of elements is determined by the size of each item DynamoDB needs to read, which means you can't make assumptions of the returned number. For small items, DynamoDB returns maybe hundreds, but if at a later point you start adding new attributes this number can drop. Moreover, filtering is run after the items are fetched by the database, so it can happen that a query returns 0 results for a page. Because of this, always assume that a Query might return partial results.

DynamoDB implements marker-based pagination. This means when it returns a partial response it returns a marker (called nextToken). When you want the next page, run the same query and pass the nextToken from the previous page.

AppSync has a limitation that using the DynamoDB data source you can only send a predefined number of requests to the database. One resolver equals one query, and you can't define a loop that fetches all pages for a query. On the other hand, AppSync has hard limits for the response size and time. Because of this, it is not suited to returns a list of items that can go to large numbers.

It's a best practice to expose DynamoDB pagination to the clients. It even has a named design pattern in GraphQL: the connection pattern.

To implement this in AppSync, the field for the list should include a nextToken both in the arguments and the response:

type PaginatedUsers {
	users: [User!]!
	nextToken: String
}

type Group {
	users(count: Int, nextToken: String): PaginatedUsers!
}

With this, the client can send a request, get back a page, then send the next request with the token, and so on, until there is no token in the response. Concatenating the results gives the full list.

Also, it's a best practice to add a limit argument too. But keep in mind the usual restrictions of DynamoDB queries:

  • The client needs to adjust the limit according to how many items it already got
  • Limit = 1 does not mean there is no pagination

Resolver

After all this background, let's see how to implement the AppSync resolver for the Group.users field!

Since it is a field of the Group type, the $ctx.source is the Group object, so $ctx.source.id is the group id. Also, since the field gets a count and the nextToken arguments, these are accessible under $ctx.args.

To send the query to the index:

{
	"version" : "2018-05-29",
	"operation" : "Query",
	"index": "groupId",
	"query": {
		"expression" : "#groupId = :groupId",
		"expressionNames": {
			"#groupId": "groupId"
		},
		"expressionValues" : {
			":groupId" : {"S": $util.toJson($ctx.source.id)}
		}
	}
	#if($ctx.args.count)
		,"limit": $util.toJson($ctx.args.count)
	#end
	#if($ctx.args.nextToken)
		,"nextToken": $util.toJson($ctx.args.nextToken)
	#end
}

It defines quite a few things:

  • operation is Query so it sends a Query to a DynamoDB table
  • index is groupId so it queries an index and not a table
  • The expressions specifies the partition key of the items
  • ":groupId" : {"S": $util.toJson($ctx.source.id)} defines the group's id
  • If there is a $ctx.args.count then specify the limit
  • If there is a $ctx.args.nextToken then specify the nextToken

The response gets what DynamoDB returns and transforms it to the structure of the PaginatedUsers:

#if($ctx.error)
	$util.error($ctx.error.message, $ctx.error.type)
#end
{
	"users": $utils.toJson($ctx.result.items)
	#if($ctx.result.nextToken)
		,"nextToken": $util.toJson($ctx.result.nextToken)
	#end
}

As usual, it check for errors first. Then it converts the $ctx.result.items to User objects, then attaches a nextToken if DynamoDB returned one.

Testing

Finally, let's see how the above query works!

To get back users, we need a group, obtained with groupById, then fetch the users field:

query MyQuery {
	groupById(id: "group1") {
		id
		name
		users {
			users {
				id
				name
			}
			nextToken
		}
	}
}

Result:

{
	"data": {
		"groupById": {
			"id": "group1",
			"name": "Group 1",
			"users": {
				"users": [
					{
						"id": "user1",
						"name": "User 1"
					},
					{
						"id": "user2",
						"name": "User 2"
					}
				],
				"nextToken": null
			}
		}
	}
}

To simulate pagination, let's add a count argument to the field:

query MyQuery {
	groupById(id: "group1") {
		id
		name
		users(count: 1) {
			users {
				id
				name
			}
			nextToken
		}
	}
}

The result contains only one user, and a token:

{
	"data": {
		"groupById": {
			"id": "group1",
			"name": "Group 1",
			"users": {
				"users": [
					{
						"id": "user1",
						"name": "User 1"
					}
				],
				"nextToken": "eyJ2ZX..."
			}
		}
	}
}

To get the next page, add the nextToken to the field:

query MyQuery {
	groupById(id: "group1") {
		id
		name
		users(count: 1, nextToken: "eyJ2ZX...") {
			users {
				id
				name
			}
			nextToken
		}
	}
}
{
	"data": {
		"groupById": {
			"id": "group1",
			"name": "Group 1",
			"users": {
				"users": [
					{
						"id": "user2",
						"name": "User 2"
					}
				],
				"nextToken": "yLCJ0...."
			}
		}
	}
}

Notice that there is another nextToken in the response, which means there might be more results.

query MyQuery {
	groupById(id: "group1") {
		id
		name
		users(count: 1, nextToken: "yLCJ0....") {
			users {
				id
				name
			}
			nextToken
		}
	}
}
{
	"data": {
		"groupById": {
			"id": "group1",
			"name": "Group 1",
			"users": {
				"users": [],
				"nextToken": null
			}
		}
	}
}

No more users, and no nextToken either. That means we got all the results.

Storing items

To add groups we need to implement a mutation that calls the putItem operation:

type Mutation {
	addGroup(name: String!): Group!
}

schema {
	query: Query
	mutation: Mutation
}

This gets a name argument that will be the name of the group, and it returns a Group object. Since groups need an id and the mutation does not get one, AppSync needs to generate it. This is done using the $util.autoId() function:

{
	"version" : "2018-05-29",
	"operation" : "PutItem",
	"key" : {
		"id" : {"S": "$util.autoId()"}
	},
	"attributeValues": {
		"name": {"S": $util.toJson($ctx.args.name)}
	}
}

This defines the id and the name attributes for the new item. The former is generated by AppSync, while the latter comes from the mutation arguments.

The response needs to check for errors, then return the item:

#if($ctx.error)
	$util.error($ctx.error.message, $ctx.error.type)
#end
$util.toJson($ctx.result)

Why does it return a Group object?

The DynamoDB PutItem returns the object that is inserted into the database, converted to Javascript types (which means instead of {"name": {"S": "group1"}}, it will be {"name": "group1"}) and that will be the value of $ctx.result. Because of this, there is no need to refetch the item.

Retrying

One important thing is missing here: retrying failed operations. DynamoDB might fail to insert the item for all sorts of reasons, and AppSync won't try again after a failure but return an error to the client.

This is called fail-fast behavior and it's the preferred way to handle errors. But because of this, the clients need to implement a retry strategy that automatically sends the same request if there is a failure.

Testing

To test it, send a mutation that adds a new group:

mutation MyMutation {
	addGroup(name: "group3") {
		id
		name
	}
}

The response, if everything went fine:

{
	"data": {
		"addGroup": {
			"id": "254603e9-8a17-4a3a-8eea-4d4b4d2a4f69",
			"name": "group3"
		}
	}
}

Transactions

Transactions are the primary way to implement cross-item (and cross-table) consistency guarantees in DynamoDB. A transaction is an all-or-nothing set of operations.

We'll need a transaction to insert a new user as DynamoDB needs to check that the group exists. The mutation needs a name for the user and the group's id:

type Mutation {
	addUser(name: String!, groupId: ID!): ID!
}

Transactions are made of transaction items. Each item is an operation that adds (PutItem), deletes (DeleteItem), updates (UpdateItem), or checks a condition (ConditionCheck) for an item. In our case, we need to add a user (PutItem) and also check the existence of the group (ConditionCheck).

{
	"version": "2018-05-29",
	"operation": "TransactWriteItems",
	"transactItems": [
		{
			"table": "user-13b359215bd4d5fb",
			"operation": "PutItem",
			"key": {
				"id" : {"S": "$util.autoId()"}
			},
			"attributeValues": {
				"name": {"S": $util.toJson($ctx.args.name)},
				"groupId": {"S": $util.toJson($ctx.args.groupId)}
			}
		},
		{
			"table": "group-13b359215bd4d5fb",
			"operation": "ConditionCheck",
			"key":{
				"id": {"S": $util.toJson($ctx.args.groupId)}
			},
			"condition":{
				"expression": "attribute_exists(#pk)",
				"expressionNames": {
					"#pk": "id"
				}
			}
		}
	]
}

The $util.autoId() generates a new UUID for the user, and the $ctx.args contains the name and the groupId specified in the arguments. Then the attribute_exists(#pk) checks the existence of the group item.

The global DynamoDB limit of at most 25 items in a single transaction still applies. While it's rare to hit that limit, it's good to be aware of it.

The response mapping template checks if there was an error, and extracts the id from the response:

#if($ctx.error)
	$util.error($ctx.error.message, $ctx.error.type)
#end
$util.toJson($ctx.result.keys[0].id)

Data source

The DynamoDB data source defines the table, but transactions can have operations targeting different tables. That means each transactItem has a table attribute that specifies what table the operation is sent to. Because of this, the table for the data source does not matter.

What matters though is the IAM role of the data source. AppSync uses that role to send the transaction, so it needs the necessary permissions for all of the involved tables.

Return type

You might have noticed that the addUser returns the ID of the new user and not the full User object. This is because DynamoDB returns only the ID and not the full object, so AppSync can not resolve the fields that need other properties of the source object (such as the name).

It is possible to return the full object but that needs a second resolver that reads the item from DynamoDB and returns that. It's possible with a pipeline resolver.

Testing

To add a new user:

mutation MyMutation {
	addUser(groupId: "group1", name: "user5")
}

It returns the ID of the user:

{
	"data": {
		"addUser": "3174153b-cbcc-468c-ad22-67267c8d8fbd"
	}
}

If the group does not exist it returns an error:

mutation MyMutation {
	addUser(groupId: "none", name: "user5")
}
{
	"data": null,
	"errors": [
		{
			"path": [
				"addUser"
			],
			"data": null,
			"errorType": "DynamoDB:TransactionCanceledException",
			"errorInfo": null,
			"locations": [
				{
					"line": 2,
					"column": 3,
					"sourceName": null
				}
			],
			"message": "Transaction cancelled, please refer cancellation reasons for specific reasons [None, ConditionalCheckFailed] (Service: DynamoDb, Status Code: 400, Request ID: D82EOU9B0DM440THDU9U8L0H7FVV4KQNSO5AEMVJF66Q9ASUAAJG, Extended Request ID: null)"
		}
	]
}

Conclusion

The DynamoDB data source allows an AppSync API to directly interface with tables in the AWS account. You can define resolvers for queries, fields, and mutations that create/retrieve/update/delete items, even across multiple tables.

December 7, 2021
In this article