Using DynamoDB to track changes to DynamoDB

Create a table with running diffs for DynamoDB

Dan Moore
Sep 24th, 2019

DynamoDB is a fast, serverless NoSQL database. It is a great place to store persistent data that your Transposit application might need. You have access to all the DynamoDB operations.

I was recently working on an application that had a main table and needed an audit log of changes to that origin table. I could have managed this in the application code, but I knew that DynamoDB has streams and thought it’d be easy enough to capture the stream output in another DynamoDB table. This lets users search the changes to the origin table. You can also control access to the two tables separately, so that people and applications with access to one table can’t get to the other.

I found this handy AWS tutorial that sends email when a new item is added to a DynamoDB table. It sets up a Lambda function that publishes to an SNS topic. I modified it to write to a table instead. We will capture the entire stream event, the old and new data, and a diff of the data (to make it easy to see what changed).

Before you start

You’ll need your AWS account number (I’ll use 999 as a placeholder) and your AWS region (I’ll use us-west-2). Replace both with your own values in the JSON and scripts below.

First, create a DynamoDB table (I’ll call it orders throughout this post) with a numeric partition key, order_id. Then enable streams on orders, with the view type set to send both new and old images (NEW_AND_OLD_IMAGES).

You’ll also want to create a table called audit. It will have a numeric partition key of order_id and a numeric sort key of Timestamp. Attribute names are case-sensitive, so this capitalization must match what the Lambda function writes.
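If you prefer to create both tables from the command line, the objects below are a sketch of the CreateTable input for each one. They assume on-demand billing (PAY_PER_REQUEST) and numeric keys, matching the .N accessors in the Lambda code; the output file names are only suggestions. Print each object to a file and pass it to aws dynamodb create-table --cli-input-json file://<name>.json.

```javascript
// Sketch: CreateTable input for the orders and audit tables.
// Assumption: on-demand billing; switch to provisioned capacity if you prefer.
const ordersTable = {
  TableName: "orders",
  AttributeDefinitions: [{ AttributeName: "order_id", AttributeType: "N" }],
  KeySchema: [{ AttributeName: "order_id", KeyType: "HASH" }],
  BillingMode: "PAY_PER_REQUEST",
  // Emit both the before and after image of every changed item.
  StreamSpecification: { StreamEnabled: true, StreamViewType: "NEW_AND_OLD_IMAGES" },
};

const auditTable = {
  TableName: "audit",
  AttributeDefinitions: [
    { AttributeName: "order_id", AttributeType: "N" },
    { AttributeName: "Timestamp", AttributeType: "N" },
  ],
  KeySchema: [
    { AttributeName: "order_id", KeyType: "HASH" },   // partition key
    { AttributeName: "Timestamp", KeyType: "RANGE" }, // sort key, same case as the Lambda writes
  ],
  BillingMode: "PAY_PER_REQUEST",
};

// e.g. node tables.js, then copy each object into orders-table.json / audit-table.json
console.log(JSON.stringify(ordersTable, null, 2));
console.log(JSON.stringify(auditTable, null, 2));
```

The audit table has no stream of its own here; it only receives writes from the Lambda function.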

You also need sufficient permissions to:

  • create and modify Lambda functions
  • view CloudWatch logs
  • create and modify IAM roles (you can remove this permission after creating the role)
  • create and modify DynamoDB tables

You will want to set up an AWS profile since we are going to execute everything from the command line. I’ll call that profile audit-profile.

All commands assume you run them from the directory that contains the JSON files.

Set up roles

I highly suggest you review the AWS tutorial I worked off of. Go ahead, I’ll wait.

First, create a role with a trust relationship that allows the Lambda service to assume it, then attach a policy that grants the role access to the resources the function needs.

The trust relationship (save it as trust-relationship.json):

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

The command to create the role:

aws --profile audit-profile iam create-role --role-name AuditLambdaRole \
 --path "/service-role/" \
 --assume-role-policy-document file://trust-relationship.json

The role policy (save it as role-policy.json):

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "lambda:InvokeFunction",
      "Resource": "arn:aws:lambda:us-west-2:999:function:publishAuditChange*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents"
      ],
      "Resource": "arn:aws:logs:us-west-2:999:*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "dynamodb:DescribeStream",
        "dynamodb:GetRecords",
        "dynamodb:GetShardIterator",
        "dynamodb:ListStreams"
      ],
      "Resource": "arn:aws:dynamodb:us-west-2:999:table/orders/stream/*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "dynamodb:*"
      ],
      "Resource": "arn:aws:dynamodb:us-west-2:999:table/audit/*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "dynamodb:*"
      ],
      "Resource": "arn:aws:dynamodb:us-west-2:999:table/audit"
    }
  ]
}

The command to attach the policy to the role:

aws --profile audit-profile iam put-role-policy --role-name AuditLambdaRole \
 --policy-name AuditLambdaRolePolicy \
 --policy-document file://role-policy.json
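Rather than hand-editing the 999 and us-west-2 placeholders, you could generate the policy document. The helper below is a hypothetical sketch, and it is also narrower than the policy above: it drops the lambda:InvokeFunction statement and grants only dynamodb:PutItem on audit, on the assumption that putItem is the only audit call the function makes (true of the handler in this post). Keep the original statements if you plan to extend the function.

```javascript
// Hypothetical helper: build role-policy.json for a given account and region.
// Assumption: the Lambda only reads the orders stream and calls PutItem on audit.
function rolePolicy(accountId, region) {
  const arn = (service, resource) => `arn:aws:${service}:${region}:${accountId}:${resource}`;
  return {
    Version: "2012-10-17",
    Statement: [
      {
        // CloudWatch Logs for the function's console.log output
        Effect: "Allow",
        Action: ["logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents"],
        Resource: arn("logs", "*"),
      },
      {
        // reading change records from the orders stream
        Effect: "Allow",
        Action: [
          "dynamodb:DescribeStream",
          "dynamodb:GetRecords",
          "dynamodb:GetShardIterator",
          "dynamodb:ListStreams",
        ],
        Resource: arn("dynamodb", "table/orders/stream/*"),
      },
      {
        // writing audit entries; nothing else is allowed on the audit table
        Effect: "Allow",
        Action: "dynamodb:PutItem",
        Resource: arn("dynamodb", "table/audit"),
      },
    ],
  };
}

// Example with a made-up account ID; redirect the output to role-policy.json.
console.log(JSON.stringify(rolePolicy("123456789012", "us-west-2"), null, 2));
```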

Set up the Lambda

Save the following as publishAuditChange.js. The Lambda function receives change records from the orders table’s stream and writes each one to the audit table. Again, we’ll capture the entire stream record, the new and old data, and the changed attributes (if the change is an update).

'use strict';
var AWS = require("aws-sdk");
AWS.config.update({
  region: 'us-west-2'
});

var ddb = new AWS.DynamoDB({
  apiVersion: '2012-08-10'
});

// async handler: Lambda waits for every write and marks the invocation as
// failed (so the stream batch is retried) if any putItem call rejects.
exports.handler = async (event) => {
  // Seconds since the epoch, shared by every record in this invocation. Two
  // changes to the same order within one second get the same sort key, and
  // the second write overwrites the first.
  var timestamp = Math.floor(new Date().getTime() / 1000);
  var table_name = 'audit';

  for (const record of event.Records) {
    console.log('Stream record: ', JSON.stringify(record, null, 2));
    var orderID = ""+record.dynamodb.Keys.order_id.N;
    var item = {
      'order_id': {
        N: orderID
      },
      'Timestamp': {
        N: timestamp + ""
      },
      'FullJSON': {
        S: JSON.stringify(record, null, 2)
      }
    };
    if (record.eventName === 'INSERT') {
      item['NewData'] = {
        S: JSON.stringify(record.dynamodb.NewImage, null, 2)
      };
    }
    if (record.eventName === 'REMOVE') {
      item['OldData'] = {
        S: JSON.stringify(record.dynamodb.OldImage, null, 2)
      };
    }
    if (record.eventName === 'MODIFY') {
      item['NewData'] = {
        S: JSON.stringify(record.dynamodb.NewImage, null, 2)
      };
      item['OldData'] = {
        S: JSON.stringify(record.dynamodb.OldImage, null, 2)
      };
      // items in o2 (new image) that are different than o1 (old image)
      var o2 = record.dynamodb.NewImage;
      var o1 = record.dynamodb.OldImage;
      var rawDiff = objectDifference(o1, o2);
      var diff = {};
      Object.keys(rawDiff).forEach(key => {
        var valN = rawDiff[key].N;
        var valS = rawDiff[key].S;
        var valBOOL = rawDiff[key].BOOL;

        var val = valN || valS || valBOOL;
        if (!val) {
          // attribute added or removed, or a type other than N, S, or BOOL (always kept)
          diff[key] = rawDiff[key];
        } else if (val.type !== 'unchanged') {
          // change of value
          diff[key] = rawDiff[key];
        }
      });

      item['DiffContainingOldValue'] = {
        S: JSON.stringify(diff, null, 2)
      };
    }

    var params = {
      TableName: table_name,
      Item: item
    };

    // Call DynamoDB to add the item to the table and wait for the write, so an
    // error fails the invocation instead of being logged and dropped.
    var data = await ddb.putItem(params).promise();
    console.log("Success", data);
  }

  return `Successfully processed ${event.Records.length} records.`;
};

// from https://stackoverflow.com/questions/8572826/generic-deep-diff-between-two-objects
function objectDifference(obj1, obj2) {
  // Leaf values (anything that is not an array or object): classify the change.
  if ((dataType(obj1) !== 'array' && dataType(obj1) !== 'object') ||
      (dataType(obj2) !== 'array' && dataType(obj2) !== 'object')) {
    var type = '';

    if (obj1 === obj2 || (dataType(obj1) === 'date' && dataType(obj2) === 'date' && obj1.getTime() === obj2.getTime()))
      type = 'unchanged';
    else if (dataType(obj1) === 'undefined')
      type = 'created';
    else if (dataType(obj2) === 'undefined')
      type = 'deleted';
    else
      type = 'updated';

    return {
      type: type,
      // for updates this keeps the OLD value, hence DiffContainingOldValue
      data: (obj1 === undefined) ? obj2 : obj1
    };
  }

  if (dataType(obj1) === 'array' && dataType(obj2) === 'array') {
    var diff = [];
    // note: sorts both arrays in place
    obj1.sort();
    obj2.sort();
    for (var i = 0; i < obj2.length; i++) {
      var type = obj1.indexOf(obj2[i]) === -1 ? 'created' : 'unchanged';
      if (type === 'created' && (dataType(obj2[i]) === 'array' || dataType(obj2[i]) === 'object')) {
        diff.push(
          objectDifference(obj1[i], obj2[i])
        );
        continue;
      }
      diff.push({
        type: type,
        data: obj2[i]
      });
    }

    for (var i = 0; i < obj1.length; i++) {
      if (obj2.indexOf(obj1[i]) !== -1 || dataType(obj1[i]) === 'array' || dataType(obj1[i]) === 'object')
        continue;
      diff.push({
        type: 'deleted',
        data: obj1[i]
      });
    }
  } else {
    // Objects: diff every key of obj1, then add keys that only exist in obj2.
    var diff = {};
    var key = Object.keys(obj1);
    for (var i = 0; i < key.length; i++) {
      var value2 = undefined;
      if (dataType(obj2[key[i]]) !== 'undefined')
        value2 = obj2[key[i]];

      diff[key[i]] = objectDifference(obj1[key[i]], value2);
    }

    var key = Object.keys(obj2);
    for (var i = 0; i < key.length; i++) {
      if (dataType(diff[key[i]]) !== 'undefined')
        continue;

      diff[key[i]] = objectDifference(undefined, obj2[key[i]]);
    }
  }

  return diff;
}

function dataType(data) {
  if (data === undefined || data === null) return 'undefined';
  if (data.constructor === String) return 'string';
  if (data.constructor === Array) return 'array';
  if (data.constructor === Object) return 'object';
  if (data.constructor === Number) return 'number';
  if (data.constructor === Boolean) return 'boolean';
  if (data.constructor === Function) return 'function';
  if (data.constructor === Date) return 'date';
  if (data.constructor === RegExp) return 'regex';
  return 'unknown';
}

Some interesting bits of code:

  • If your tables use different key names, change the item definition to match.
  • The low-level DynamoDB API passes numbers as strings inside an N attribute, which is why there is code like var orderID = ""+record.dynamodb.Keys.order_id.N; and timestamp + "".
  • We have a diff function that I pulled from Stack Overflow (thanks, SO!), but it reports every key of the object, including unchanged ones. Therefore, I added the loop over rawDiff, Object.keys(rawDiff).forEach(key => { ..., which keeps only the attributes that changed. That filter only looks inside N, S, and BOOL values, so it is not guaranteed to work with nested maps, lists, or sets; the table I tested it on only had scalars as values.
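If your items contain maps, lists, or sets, one alternative (a swapped-in technique, not the Stack Overflow diff used above) is to compare the two stream images attribute by attribute, treating each DynamoDB AttributeValue as an opaque value. The sketch below serializes each value with sorted object keys so that nested maps compare correctly; canonical and changedAttributes are hypothetical names. One assumption to be aware of: set types such as SS and NS are compared in list order, so a reordered set would be reported as a change.

```javascript
// Serialize a value with object keys sorted, so {a, b} and {b, a} compare equal.
function canonical(value) {
  if (Array.isArray(value)) return `[${value.map(canonical).join(",")}]`;
  if (value !== null && typeof value === "object") {
    const keys = Object.keys(value).sort();
    return `{${keys.map((k) => `${JSON.stringify(k)}:${canonical(value[k])}`).join(",")}}`;
  }
  return JSON.stringify(value); // undefined stays undefined
}

// Return {attributeName: {old, new}} for every attribute that was added,
// removed, or changed between the old and new images. Missing images (INSERT
// or REMOVE records) are treated as empty.
function changedAttributes(oldImage = {}, newImage = {}) {
  const changes = {};
  const names = new Set([...Object.keys(oldImage), ...Object.keys(newImage)]);
  for (const name of names) {
    const before = oldImage[name];
    const after = newImage[name];
    // An absent attribute serializes to undefined, so additions and removals
    // also fail this equality check.
    if (canonical(before) !== canonical(after)) {
      changes[name] = { old: before, new: after };
    }
  }
  return changes;
}

const oldImage = { Count: { N: "0" }, Username: { S: "John Doe" } };
const newImage = { Count: { N: "1" }, Username: { S: "John Doe" }, Tags: { L: [{ S: "rush" }] } };
console.log(JSON.stringify(changedAttributes(oldImage, newImage), null, 2));
```

Inside the handler, JSON.stringify(changedAttributes(o1, o2)) could replace the rawDiff loop; JSON.stringify drops the undefined side of an added or removed attribute.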

Here’s the script to package and publish it. The aws-sdk module is provided by the nodejs10.x runtime, so the zip only needs the one file. Note that the script deletes the Lambda function before creating it each time, because I didn’t want to figure out the differing semantics of updating the code in place. (This means the first time you run it, you’ll see the message “An error occurred (ResourceNotFoundException) when calling the DeleteFunction operation.”)

zip publishAuditChange.zip publishAuditChange.js

role_arn=$(aws --profile audit-profile iam get-role --role-name AuditLambdaRole --query 'Role.Arn' --output text)

aws --profile audit-profile lambda delete-function \
 --region us-west-2 \
 --function-name publishAuditChange

sleep 2

aws --profile audit-profile lambda create-function \
 --region us-west-2 \
 --function-name publishAuditChange \
 --zip-file fileb://publishAuditChange.zip \
 --role $role_arn \
 --handler publishAuditChange.handler \
 --timeout 5 \
 --runtime nodejs10.x

You can now test that the Lambda function accepts data and writes it correctly by saving this sample stream event as payload.json:

{
  "Records": [
    {
      "eventID": "7de3041dd709b024af6f29e4fa13d34c",
      "eventName": "INSERT",
      "eventVersion": "1.1",
      "eventSource": "aws:dynamodb",
      "awsRegion": "us-west-2",
      "dynamodb": {
        "ApproximateCreationDateTime": 1479499740,
        "Keys": {
          "order_id": {
            "N": "123"
          }
        },
        "NewImage": {
          "Timestamp": {
            "S": "2016-11-18:12:09:36"
          },
          "Message": {
            "S": "This is a bark from the Woofer social network"
          },
          "Count": {
            "N": "1"
          },
          "Username": {
            "S": "John Doe"
          }
        },
        "OldImage": {
          "Timestamp": {
            "S": "2016-11-18:12:09:36"
          },
          "Message": {
            "S": "This is an old bark from the Woofer social network"
          },
          "Count": {
            "N": "0"
          },
          "Username": {
            "S": "John Doe"
          }
        },
        "SequenceNumber": "13021600000000001596893679",
        "SizeBytes": 112,
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "eventSourceARN": "arn:aws:dynamodb:us-west-2:999:table/orders/stream/2016-11-16T20:42:48.104"
    }
  ]
}

and this command:

aws --region us-west-2 --profile audit-profile lambda invoke --function-name publishAuditChange --payload file://payload.json output.txt

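You should see a StatusCode of 200 and no FunctionError field in the response; output.txt holds the function’s return value. If not, head to CloudWatch Logs and do some debugging. If you’re using version 2 of the AWS CLI, add --cli-binary-format raw-in-base64-out to the invoke command so the payload file is sent as raw JSON.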
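You can also exercise the item-building logic locally, without deploying or touching AWS. The sketch below is a hypothetical refactor: buildAuditItem pulls the item construction out of the handler into a pure function (minus the diff step) so plain Node.js can check what would be written. It assumes the same attribute names as the Lambda above, and the trimmed MODIFY record is made up for illustration.

```javascript
// Hypothetical pure version of the handler's item construction (no diff step).
function buildAuditItem(record, timestamp) {
  const item = {
    order_id: { N: String(record.dynamodb.Keys.order_id.N) },
    Timestamp: { N: String(timestamp) },
    FullJSON: { S: JSON.stringify(record, null, 2) },
  };
  const { NewImage, OldImage } = record.dynamodb;
  // INSERT and MODIFY records carry the new image...
  if (record.eventName === "INSERT" || record.eventName === "MODIFY") {
    item.NewData = { S: JSON.stringify(NewImage, null, 2) };
  }
  // ...REMOVE and MODIFY records carry the old one.
  if (record.eventName === "REMOVE" || record.eventName === "MODIFY") {
    item.OldData = { S: JSON.stringify(OldImage, null, 2) };
  }
  return item;
}

// A trimmed, made-up MODIFY record in the same shape as payload.json.
const sampleRecord = {
  eventName: "MODIFY",
  dynamodb: {
    Keys: { order_id: { N: "123" } },
    NewImage: { order_id: { N: "123" }, Count: { N: "1" } },
    OldImage: { order_id: { N: "123" }, Count: { N: "0" } },
  },
};

console.log(JSON.stringify(buildAuditItem(sampleRecord, 1569283200), null, 2));
```

Keeping AWS calls out of the function being tested is the design point here; the handler would then only loop, call buildAuditItem, and await putItem.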

Attach the trigger

aws --profile audit-profile lambda create-event-source-mapping \
 --region us-west-2 \
 --function-name publishAuditChange \
 --event-source-arn arn:aws:dynamodb:us-west-2:999:table/orders/stream/2019-09-19T18:12:59.448 \
 --batch-size 1 \
 --starting-position TRIM_HORIZON

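Replace the event source ARN with your own stream’s ARN, including the timestamp at the end. You can copy it from the table’s page in the DynamoDB console, or print it with aws --profile audit-profile --region us-west-2 dynamodb describe-table --table-name orders --query 'Table.LatestStreamArn' --output text. TRIM_HORIZON starts with the oldest record still in the stream (stream records are kept for 24 hours), so recent changes made before you attached the trigger are audited too.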

The big moment

Go ahead and change something in the orders table. You should see the change reflected in the audit table. It may take a bit of time (the first time I ran it, I think it took about ten seconds), but after that first audit table insert, you should see changes in less than a second.
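To check the audit trail from code rather than the console, you can query the audit table by order. The function below is a sketch that only builds the input for a low-level DynamoDB Query (orderHistoryQuery is a hypothetical name); pass the result to ddb.query(params).promise() with the aws-sdk v2 client used in the Lambda, or save it as JSON for aws dynamodb query --cli-input-json. It assumes the numeric order_id and Timestamp keys described earlier.

```javascript
// Sketch: Query input returning one order's audit history, newest change first.
function orderHistoryQuery(orderId, limit = 20) {
  return {
    TableName: "audit",
    KeyConditionExpression: "order_id = :id",
    // Low-level API: the number is passed as a string inside an N attribute.
    ExpressionAttributeValues: { ":id": { N: String(orderId) } },
    ScanIndexForward: false, // descending order of the Timestamp sort key
    Limit: limit,
  };
}

console.log(JSON.stringify(orderHistoryQuery(123), null, 2));
```

The key condition only names order_id, so the sort key attribute (Timestamp, a DynamoDB reserved word) never needs an ExpressionAttributeNames alias here.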

Watch for this

Depending on how much write traffic the orders table sees, you may want to adjust the audit table’s capacity settings. Storing a full copy of every change can also get expensive at high volume, so consider expiring old audit items with a TTL and archiving them to S3.
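For the TTL approach, each audit item needs a Number attribute holding its expiry time as a Unix epoch in seconds, and TTL must be enabled on the audit table for that attribute. The sketch below assumes a hypothetical attribute named ExpiresAt and a 90-day retention period; in the handler you could add item['ExpiresAt'] = ttlAttribute(timestamp, 90). Items that TTL deletes appear as REMOVE records if you enable a stream on the audit table, which is one place to hook in an S3 archiver.

```javascript
// Sketch: TTL value for an audit item. DynamoDB TTL reads a Number attribute
// containing a Unix epoch time in seconds. "ExpiresAt" and 90 days are assumptions.
const SECONDS_PER_DAY = 24 * 60 * 60;

function ttlAttribute(timestampSeconds, retentionDays) {
  // Low-level AttributeValue: numbers are sent as strings.
  return { N: String(timestampSeconds + retentionDays * SECONDS_PER_DAY) };
}

// An item written at 1569283200 (2019-09-24T00:00:00Z) expires 90 days later,
// at 2019-12-23T00:00:00Z.
console.log(ttlAttribute(1569283200, 90)); // { N: '1577059200' }
```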

Conclusion

This setup captures a changelog of your DynamoDB table with very little code to maintain. Users with access to the audit table can look up any order from the orders table and see how it has changed over time.

