By Robert Bruce – Chief Technology Officer
Here at synvert TCM (formerly Crimson Macaw), we have typically created AWS Lambda functions in Python and used the abstract factory design pattern as a way to handle different AWS event structures. Achieving the same principle within Go requires a different approach. Read on to find out how we handle multiple AWS Lambda event types in Go.
Being able to trigger a lambda function from multiple sources gives great flexibility to deploy our solutions into different configurations depending on our clients’ requirements.
An excellent example of this is a lambda function that handles AWS S3 events; you can do this either by directly triggering from S3 or by sending S3 events to AWS SNS or AWS SQS and then subsequently setting up as an event source to the lambda function.
Expected Data Structures
When S3, SNS or SQS trigger a lambda function, the JSON structure contains a top-level Records
array; however, the structure of each will differ depending on the source.
{
"Records": []
}
Direct From S3
In this setup, we configure an S3 bucket for its events to invoke a Lambda function directly.
When S3 asynchronously invokes the Lambda function, an example of the JSON structure would be:
{
"Records": [
{
"eventVersion": "2.1",
"eventSource": "aws:s3",
"awsRegion": "eu-west-1",
"eventTime": "2020-04-05T19:37:27.192Z",
"eventName": "ObjectCreated:Put",
"userIdentity": {
"principalId": "AWS:AIDAINPONIXQXHT3IKHL2"
},
"requestParameters": {
"sourceIPAddress": "205.255.255.255"
},
"responseElements": {
"x-amz-request-id": "D82B88E5F771F645",
"x-amz-id-2": "vlR7PnpV2Ce81l0PRw6jlUpck7Jo5ZsQjryTjKlc5aLWGVHPZLj5NeC6qMa0emYBDXOo6QBU0Wo=" },
"s3": {
"s3SchemaVersion": "1.0",
"configurationId": "828aa6fc-f7b5-4305-8584-487c791949c1",
"bucket": {
"name": "lambda-artifacts-deafc19498e3f2df",
"ownerIdentity": {
"principalId": "A3I5XTEXAMAI3E"
},
"arn": "arn:aws:s3:::lambda-artifacts-deafc19498e3f2df"
},
"object": {
"key": "b21b84d653bb07b05b1e6b33684dc11b",
"size": 1305107,
"eTag": "b21b84d653bb07b05b1e6b33684dc11b",
"sequencer": "0C0F6F405D6ED209E1"
}
}
}
]
}
Referenced from https://docs.aws.amazon.com/lambda/latest/dg/with-s3.html
AWS S3 events via AWS SNS
In this setup, we configure S3 to publish events to an SNS Topic. We add the Lambda function as a subscriber of the SNS Topic.
When events occur on the S3 bucket, it pushes messages to the SNS Topic in the same format as above.
When SNS asynchronously invokes the Lambda function, after S3 has published a message, an example of the JSON structure would be:
{
"Records": [
{ "EventVersion": "1.0",
"EventSubscriptionArn": "arn:aws:sns:us-east-2:123456789012:sns-lambda:21be56ed-a058-49f5-8c98-aedd2564c486",
"EventSource": "aws:sns",
"Sns": {
"SignatureVersion": "1",
"Timestamp": "2020-04-05T19:37:27.318Z",
"Signature": "tcc6faL2yUC6dgZdmrwh1Y4cGa/ebXEkAi6RibDsvpi+tE/1+82j...65r==",
"SigningCertUrl": "https://sns.us-east-2.amazonaws.com/SimpleNotificationService-ac565b8b1a6c5d002d285f9598aa1d9b.pem",
"MessageId": "95df01b4-ee98-5cb9-9903-4c221d41eb5e",
"Message": "{\"Records\":[{\"eventVersion\":\"2.1\",\"eventSource\":\"aws:s3\",\"awsRegion\":\"eu-west-1\",\"eventTime\":\"2020-04-05T19:37:27.192Z\",\"eventName\":\"ObjectCreated:Put\",\"userIdentity\":{\"principalId\":\"AWS:AIDAINPONIXQXHT3IKHL2\"},\"requestParameters\":{\"sourceIPAddress\":\"205.255.255.255\"},\"responseElements\":{\"x-amz-request-id\":\"D82B88E5F771F645\",\"x-amz-id-2\":\"vlR7PnpV2Ce81l0PRw6jlUpck7Jo5ZsQjryTjKlc5aLWGVHPZLj5NeC6qMa0emYBDXOo6QBU0Wo=\"},\"s3\":{\"s3SchemaVersion\":\"1.0\",\"configurationId\":\"828aa6fc-f7b5-4305-8584-487c791949c1\",\"bucket\":{\"name\":\"lambda-artifacts-deafc19498e3f2df\",\"ownerIdentity\":{\"principalId\":\"A3I5XTEXAMAI3E\"},\"arn\":\"arn:aws:s3:::lambda-artifacts-deafc19498e3f2df\"},\"object\":{\"key\":\"b21b84d653bb07b05b1e6b33684dc11b\",\"size\":1305107,\"eTag\":\"b21b84d653bb07b05b1e6b33684dc11b\",\"sequencer\":\"0C0F6F405D6ED209E1\"}}}]}",
"MessageAttributes": {},
"Type": "Notification",
"UnsubscribeUrl": "https://sns.eu-west-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:eu-west-1:123456789012:test-lambda:21be56ed-a058-49f5-8c98-aedd2564c486",
"TopicArn":"arn:aws:sns:eu-west-1:123456789012:sns-lambda",
"Subject": "TestInvoke"
}
}
]
}
Referenced from https://docs.aws.amazon.com/lambda/latest/dg/with-sns.html
As you can see, the S3 event is the original JSON structure but encoded as a string in the Message property.
AWS S3 events via AWS SQS
In this setup, S3 was configured to send events to an SQS Queue. The SQS Queue is added as an event source to the Lambda function.
When events occur on the S3 bucket, messages are pushed to the SQS Queue in the same format as above.
When SNS synchronously invokes the Lambda function, after S3 has sent a message, an example of the JSON structure would be:
{
"Records": [
{
"messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
"receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
"body": "{\"Records\":[{\"eventVersion\":\"2.1\",\"eventSource\":\"aws:s3\",\"awsRegion\":\"eu-west-1\",\"eventTime\":\"2020-04-05T19:37:27.192Z\",\"eventName\":\"ObjectCreated:Put\",\"userIdentity\":{\"principalId\":\"AWS:AIDAINPONIXQXHT3IKHL2\"},\"requestParameters\":{\"sourceIPAddress\":\"205.255.255.255\"},\"responseElements\":{\"x-amz-request-id\":\"D82B88E5F771F645\",\"x-amz-id-2\":\"vlR7PnpV2Ce81l0PRw6jlUpck7Jo5ZsQjryTjKlc5aLWGVHPZLj5NeC6qMa0emYBDXOo6QBU0Wo=\"},\"s3\":{\"s3SchemaVersion\":\"1.0\",\"configurationId\":\"828aa6fc-f7b5-4305-8584-487c791949c1\",\"bucket\":{\"name\":\"lambda-artifacts-deafc19498e3f2df\",\"ownerIdentity\":{\"principalId\":\"A3I5XTEXAMAI3E\"},\"arn\":\"arn:aws:s3:::lambda-artifacts-deafc19498e3f2df\"},\"object\":{\"key\":\"b21b84d653bb07b05b1e6b33684dc11b\",\"size\":1305107,\"eTag\":\"b21b84d653bb07b05b1e6b33684dc11b\",\"sequencer\":\"0C0F6F405D6ED209E1\"}}}]}",
"attributes": {
"ApproximateReceiveCount": "1",
"SentTimestamp": "1586111847318",
"SenderId": "AIDAIENQZJOLO23YVJ4VO",
"ApproximateFirstReceiveTimestamp": "15861118483091"
},
"messageAttributes": {},
"md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
"eventSource": "aws:sqs",
"eventSourceARN": "arn:aws:sqs:eu-west-1:123456789012:my-queue",
"awsRegion": "eu-west-1"
}
]
}
Referenced from https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html
Just like when we use SNS, the S3 event is the original JSON structure but coded as a string in the body of the SQS message.
AWS Lambda in Python
Decoding the variant structures in Python is relatively simple, as the event arrives as a dict
, then using Python classes and **kwargs
on a class’s __init__
method is enough to recursively decode the structure.
An incomplete example:
from abc import ABC
class Record(ABC):
@staticmethod
def factory(record: dict) -> 'Record':
try:
return S3Record(**record)
except (KeyError, ValueError):
pass
try:
return SNSRecord(**record)
except (KeyError, ValueError):
pass
try:
return SQSRecord(**record)
except (KeyError, ValueError):
pass
raise TypeError
class S3Record(Record):
__slots__ = ('event_source',)
def __init__(self, **kwargs):
self.event_source = kwargs.pop('eventSource')
if 'aws:s3' != self.event_source:
raise ValueError
class SNSRecord(Record):
__slots__ = ('event_source',)
def __init__(self, **kwargs):
self.event_source = kwargs.pop('EventSource')
if 'aws:sns' != self.event_source:
raise ValueError
class SQSRecord(Record): __slots__ = ('event_source',)
def __init__(self, **kwargs):
self.event_source = kwargs.pop('eventSource')
if 'aws:sns' != self.event_source:
raise ValueError
AWS Lambda in Go
Using the AWS Lambda for Go library, it is not so simple as the event structure is already pre-decoded using the encoding/json
library internally.
Fortunately, the library already contains the expected data structures as struct types with the json tags needed for decoding. These can be used for much of the leg work, but to handle the variant structure then the Unmarshaller
interface needs to be implemented on the data type used on the lambda handle function.
Setup the main
If you are unfamiliar with writing Lambda functions in Go, then I would recommend reading the AWS Lambda Function Handler in Go documentation.
I create a custom structure data type called Event
, for which I know will always contain, be a dictionary with a field called Records
.
Each Record
will always be about an S3 event, but its source could come from any of the above. To keep the original data, I can store the event source information and if an SNS or SQS payload is received, then save that too within additional fields.
package main import ( "context" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) //Record each data record type Record struct { EventSource string EventSourceArn string AWSRegion string S3 events.S3Entity SQS events.SQSMessage SNS events.SNSEntity } //Event incoming event type Event struct { Records []Record } func handle(ctx context.Context, event Event) { // execute the lambda } func main() { lambda.Start(handle) }
Implement the Unmarshaller interface
The Unmarshaller
interface only requires a single function. Initially, the most apparent type to implement this would be on a Record
, however as an SNS or SQS event could theoretically have multiple S3 records internally then the function must exist on the Event
type.
func (event *Event) UnmarshalJSON(data []byte) error {
}
Detecting the Event Source
Before I decode the data, first, I need to detect the structure type. I add another function on an *Event
to get the event type; our actual implementation of this has much more error handling, but a simplified version would be:
type eventType int
const ( unknownEventType eventType = iota
s3EventType
snsEventType
sqsEventType
)
func (event *Event) getEventType(data []byte) eventType {
temp := make(map[string]interface{})
json.Unmarshal(data, &temp)
recordsList, _ := temp["Records"].([]interface{})
record, _ := recordsList[0].(map[string]interface{})
var eventSource string
if es, ok := record["EventSource"]; ok {
eventSource = es.(string)
} else if es, ok := record["eventSource"]; ok {
eventSource = es.(string)
}
switch eventSource {
case "aws:s3":
return s3EventType
case "aws:sns":
return snsEventType
case "aws:sqs":
return sqsEventType
}
return unknownEventType
}
The first step in the UnmarshalJSON
function is now to call the getEventType
function.
func (event *Event) UnmarshalJSON(data []byte) error {
eType := event.getEventType(data)
}
Mapping the Data
Now that I know the event type, I can safely use the event structures inside the AWS Lambda for Go package as these have json struct tags for decoding the data.
func (event *Event) UnmarshalJSON(data []byte) error {
var err error
switch event.getEventType(data) {
case s3EventType:
s3Event := &events.S3Event{}
err = json.Unmarshal(data, s3Event)
if err == nil {
return event.mapS3EventRecords(s3Event)
}
case snsEventType:
snsEvent := &events.SNSEvent{}
err = json.Unmarshal(data, snsEvent)
if err == nil {
return event.mapSNSEventRecords(snsEvent)
}
case sqsEventType:
sqsEvent := &events.SQSEvent{}
err = json.Unmarshal(data, sqsEvent)
if err == nil {
return event.mapSQSEventRecords(sqsEvent)
}
}
return err
}
In the above, I have referenced some map functions. These take each source event and map each record to our Record
structure.
Mapping S3 Records
The mapS3EventRecords
function is the simplest, each S3EventRecord
can be directly mapped to my Record
structure.
func (event *Event) mapS3EventRecords(s3Event *events.S3Event) error {
event.Records = make([]Record, 0)
for _, s3Record := range s3Event.Records {
event.Records = append(event.Records, Record{
EventSource: s3Record.EventSource,
EventSourceArn: s3Record.S3.Bucket.Arn,
AWSRegion: s3Record.AWSRegion,
S3: s3Record.S3,
})
}
return nil
}
Mapping SNS Records
The mapSNSEventRecords
function requires a little bit extra.
An SNS event does not contain any region information in its payload, by using arn.Parse
function available in the AWS SDK for Go I can extract the region from SNS TopicArn.
Also, I use the json.Unmarshal
function to decode the SNS message to an S3Event. As this, itself holds an array, the overall mapping of a single SNSEventRecord
can produce multiple records.
You may also notice the use of github.com/pkg/errors
package here!
func (event *Event) mapSNSEventRecords(snsEvent *events.SNSEvent) error {
event.Records = make([]Record, 0)
for _, snsRecord := range snsEvent.Records {
// decode sns message to s3 event
s3Event := &events.S3Event{}
err := json.Unmarshal([]byte(snsRecord.SNS.Message), s3Event)
if err != nil {
return errors.Wrap(err, "Failed to decode sns message to an S3 event")
}
if len(s3Event.Records) == 0 {
return errors.New("S3 Event Records is empty")
}
for _, s3Record := range s3Event.Records {
topicArn, err := arn.Parse(snsRecord.SNS.TopicArn)
if err != nil {
return err
}
event.Records = append(event.Records, Record{
EventSource: snsRecord.EventSource,
EventSourceArn: snsRecord.SNS.TopicArn,
AWSRegion: topicArn.Region,
SNS: snsRecord.SNS,
S3: s3Record.S3,
})
}
}
return nil
}
Mapping SQS Records
The mapSQSEventRecords
is similar to mapSNSEventRecords
, except that the region is part of a SQS structure.
func (event *Event) mapSQSEventRecords(sqsEvent *events.SQSEvent) error {
event.Records = make([]Record, 0)
for _, sqsRecord := range sqsEvent.Records {
// decode sqs body to s3 event s3Event := &events.S3Event{}
err := json.Unmarshal([]byte(sqsRecord.Body), s3Event)
if err != nil {
return errors.Wrap(err, "Failed to decode sqs body to an S3 event")
}
if len(s3Event.Records) == 0 {
return errors.New("S3 Event Records is empty")
}
for _, s3Record := range s3Event.Records {
event.Records = append(event.Records,
Record{ EventSource: sqsRecord.EventSource,
EventSourceArn: sqsRecord.EventSourceARN,
AWSRegion: sqsRecord.AWSRegion,
SQS: sqsRecord,
S3: s3Record.S3,
})
}
}
return nil
}
Conclusion
As you can see from the above, by using the internal functionality of Go’s encoding/json
package, it is possible to populate your lambda function’s event type dynamically.
This approach, of course, is expandible for any underlying data type you wish.
Find out more about how we’ve used json to help our clients here.