in workshop/workshop-java-repo-vulnerable/src/main/java/com/shipmentEvents/handlers/EventHandler.java [72:106]
/**
 * Processes every shipment-update file in each bucket listed in
 * {@code Constants.BUCKETS_TO_PROCESS}, accumulating the latest
 * (timestamp, status) pair per tracking number, then writes a summary
 * object to {@code Constants.SUMMARY_BUCKET}. The processed update files
 * are deleted only after the summary object is confirmed to exist.
 *
 * @param logger the Lambda logger used for progress/diagnostic output
 * @throws InterruptedException if the thread is interrupted while polling
 *         for the summary object to appear
 * @throws RuntimeException if the summary object is not visible within
 *         one minute (the Lambda will be retried on its schedule)
 */
private void processShipmentUpdates(final LambdaLogger logger) throws InterruptedException {
    final List<String> bucketsToProcess = Constants.BUCKETS_TO_PROCESS;
    // Latest (timestamp, status) observed per tracking number across all buckets.
    final Map<String, Pair<Long, String>> latestStatusForTrackingNumber = new HashMap<>();
    // Per-bucket list of processed object keys, deleted once the summary is durable.
    final Map<String, List<KeyVersion>> filesToDelete = new HashMap<>();
    for (final String bucketName : bucketsToProcess) {
        final List<KeyVersion> filesProcessed =
                processEventsInBucket(bucketName, logger, latestStatusForTrackingNumber);
        filesToDelete.put(bucketName, filesProcessed);
    }

    // Reuse a single client for the write and both existence checks
    // (the original re-fetched it twice, which may yield different instances).
    final AmazonS3 s3Client = EventHandler.getS3Client();

    // Create a new summary file in Constants.SUMMARY_BUCKET, keyed by the current epoch millis.
    logger.log("Map of statuses -> " + latestStatusForTrackingNumber);
    final String summaryUpdateName = Long.toString(System.currentTimeMillis());
    s3Client.putObject(Constants.SUMMARY_BUCKET, summaryUpdateName, latestStatusForTrackingNumber.toString());

    // Poll for up to one minute for the summary object to become visible.
    final long expirationTime = System.currentTimeMillis() + Duration.ofMinutes(1).toMillis();
    while (System.currentTimeMillis() < expirationTime) {
        if (s3Client.doesObjectExist(Constants.SUMMARY_BUCKET, summaryUpdateName)) {
            break;
        }
        logger.log("waiting for file to be created " + summaryUpdateName);
        Thread.sleep(1000);
    }

    // Before we delete the shipment updates make sure the summary update file exists.
    if (s3Client.doesObjectExist(Constants.SUMMARY_BUCKET, summaryUpdateName)) {
        deleteProcessedFiles(filesToDelete);
        logger.log("All updates successfully processed");
    } else {
        throw new RuntimeException("Failed to write summary status, will be retried in 15 minutes");
    }
}