in dth/job.go [473:536]
func (w *Worker) processMessage(ctx context.Context, msg, rh *string) (obj *Object, action int) {
// log.Println("Processing Event Message...")
action = Ignore // Default to ignore
if strings.Contains(*msg, `"s3:TestEvent"`) {
// Once S3 Notification is set up, a TestEvent message will be generated by service.
// Delete the test message
log.Println("Test Event Message received, deleting the message...")
w.sqs.DeleteMessage(ctx, rh)
return
}
// Simply check if msg body contains "eventSource" to determine if it's a event message
// might need to change in the future
if strings.Contains(*msg, `"eventSource":`) {
event := newS3Event(msg)
// log.Println(*event)
// log.Printf("Event is %s", event.Records[0].EventName)
// There shouldn't be more than 1 record in the event message
if len(event.Records) > 1 {
log.Println("Warning - Found event message with more than 1 record, Skipped...")
return
}
if event.Records[0].EventSource != "aws:s3" {
log.Println("Error - Event message from Unknown source, expect S3 event message only")
return
}
log.Printf("Received an event message of %s, start processing...\n", event.Records[0].EventName)
obj = &event.Records[0].S3.Object
obj.Key = unescape(&obj.Key)
seq := getHex(&event.Records[0].S3.Sequencer)
var oldSeq int64 = 0
// Get old sequencer from DynamoDB
item, _ := w.db.QueryItem(ctx, &event.Records[0].S3.Key)
if item != nil {
oldSeq = getHex(&item.Sequencer)
}
// equals might be a retry
if seq < oldSeq {
log.Printf("Old Event, ignored")
action = Ignore
}
if strings.HasPrefix(event.Records[0].EventName, "ObjectRemoved") {
action = Delete
} else if strings.HasPrefix(event.Records[0].EventName, "ObjectCreated") {
action = Transfer
} else {
log.Printf("Unknown S3 Event %s, do nothing", event.Records[0].EventName)
}
} else {
obj = newObject(msg)
action = Transfer
}
return
}