in BDB-701-Moving-From-Transactional-to-Stateful-Batch-Processing/emr/src/main/java/com/amazon/aws/blog/StatefulMain.java [40:100]
public void run(String[] args) {
final String statelessFile = args[0];
final String preFetchedFile = args[1];
final String outputName = args[2];
final String inflightFiles;
if (args.length > 3) {
inflightFiles = args[3];
} else {
inflightFiles = null;
}
System.out.println("Run in beginning");
// Creates a new Spark session
final SparkSession session = SparkSession.builder().appName("Demo").getOrCreate();
DataFrameReader dataFrameReader = session.read();
System.out.println("Before reading");
// Stateless artifacts produced by the Stateless Ingestion Engine
Dataset<Row> statelessRows = dataFrameReader.option("header", "true").csv(statelessFile);
// Existing stateful artifacts produced by the Pre-Fetcher
Dataset<Row> preFetchedRows = dataFrameReader.option("header", "false").csv(preFetchedFile);
if (preFetchedRows.count() > 0) {
preFetchedRows = preFetchedRows.toDF(ORDER_ID, AMOUNT, VERSION);
} else {
// Sometimes there aren't any existing stateful artifacts, so we use a placeholder empty data frame
preFetchedRows = session.createDataFrame(new LinkedList<>(), statelessRows.schema());
}
Dataset<Row> existingRows;
if (inflightFiles != null) {
// In-flight files contain stateful artifacts that have been indexed yet
Dataset<Row> inflightRows = dataFrameReader.option("header", "false").csv(inflightFiles)
.toDF(ORDER_ID, AMOUNT, VERSION);
existingRows = preFetchedRows.union(inflightRows);
} else {
existingRows = preFetchedRows;
}
// Keep only the rows which have the latest version between the pre-fetched rows and the unindexed rows
Dataset<Row> latestVersions = existingRows.withColumn("temp_col", row_number().over(Window.partitionBy(ORDER_ID).orderBy(desc(VERSION))));
latestVersions = latestVersions.where(latestVersions.col("temp_col").equalTo(1)).drop(latestVersions.col("temp_col"));
System.out.println("Latest versions:");
System.out.println(latestVersions.toString());
// Update the existing rows with the new stateless artifacts
latestVersions = latestVersions.union(statelessRows)
.groupBy(col(ORDER_ID))
.agg(sum(AMOUNT), max(VERSION))
.withColumnRenamed("sum(amount)", AMOUNT)
.withColumnRenamed("max(version)", VERSION)
.withColumn(VERSION, expr(VERSION + " +1").cast("integer"));
// Write the grouped data back to S3 in one single coalesced file
latestVersions.coalesce(1).write().option("header", "false").csv(outputName);
session.stop();
}