func()

in dth/job.go [473:536]


func (w *Worker) processMessage(ctx context.Context, msg, rh *string) (obj *Object, action int) {
	// log.Println("Processing Event Message...")
	action = Ignore // Default to ignore

	if strings.Contains(*msg, `"s3:TestEvent"`) {
		// Once S3 Notification is set up, a TestEvent message will be generated by service.
		// Delete the test message
		log.Println("Test Event Message received, deleting the message...")
		w.sqs.DeleteMessage(ctx, rh)
		return
	}

	// Simply check if msg body contains "eventSource" to determine if it's a event message
	// might need to change in the future
	if strings.Contains(*msg, `"eventSource":`) {

		event := newS3Event(msg)

		// log.Println(*event)
		// log.Printf("Event is %s", event.Records[0].EventName)

		// There shouldn't be more than 1 record in the event message
		if len(event.Records) > 1 {
			log.Println("Warning - Found event message with more than 1 record, Skipped...")
			return
		}

		if event.Records[0].EventSource != "aws:s3" {
			log.Println("Error - Event message from Unknown source, expect S3 event message only")
			return
		}

		log.Printf("Received an event message of %s, start processing...\n", event.Records[0].EventName)

		obj = &event.Records[0].S3.Object
		obj.Key = unescape(&obj.Key)
		seq := getHex(&event.Records[0].S3.Sequencer)

		var oldSeq int64 = 0
		// Get old sequencer from DynamoDB
		item, _ := w.db.QueryItem(ctx, &event.Records[0].S3.Key)
		if item != nil {
			oldSeq = getHex(&item.Sequencer)
		}

		// equals might be a retry
		if seq < oldSeq {
			log.Printf("Old Event, ignored")
			action = Ignore
		}

		if strings.HasPrefix(event.Records[0].EventName, "ObjectRemoved") {
			action = Delete
		} else if strings.HasPrefix(event.Records[0].EventName, "ObjectCreated") {
			action = Transfer
		} else {
			log.Printf("Unknown S3 Event %s, do nothing", event.Records[0].EventName)
		}
	} else {
		obj = newObject(msg)
		action = Transfer
	}
	return
}