in src/main/java/com/aws/logaggregator/connector/StorageConnector.java [82:144]
/**
 * Writes {@code dataset} to the output location configured for the "storage" destination.
 *
 * <p>The following keys are read from the destination's config options:
 * <ul>
 *   <li>{@code outputformat} - passed to the writer's {@code format(...)}</li>
 *   <li>{@code savemode} - passed to the writer's {@code mode(...)}</li>
 *   <li>{@code outputlocation} - passed to the writer's {@code save(...)}</li>
 *   <li>{@code partitioncolumns} - optional comma-separated column names for
 *       {@code partitionBy(...)}; surrounding whitespace is ignored</li>
 *   <li>{@code partitionstrategy} - {@code "repartition"} (case-insensitive) calls
 *       {@code repartition(n)}; any other value, or none, calls {@code coalesce(n)}</li>
 *   <li>{@code numberofoutputpartitions} - optional partition count {@code n}, default 1</li>
 * </ul>
 * The destination's Spark options, when present, are handed to the writer's {@code options(...)}.
 *
 * @param dataset  the data to write
 * @param session  not used by this method
 * @param initBean configuration holder that supplies the "storage" destination settings
 * @throws NumberFormatException if {@code numberofoutputpartitions} is set but is not an integer
 */
public void writeLogData(Dataset dataset, SparkSession session, LogAppInitializer initBean) {
    Map<String, String> ancillaryOptions = initBean.getDestination("storage").getConfigoptions();
    Map<String, String> engineoptions = initBean.getDestination("storage").getSparkoptions();
    String outputformat = ancillaryOptions.get("outputformat");
    String savemode = ancillaryOptions.get("savemode");
    String partitioncolumns = ancillaryOptions.get("partitioncolumns");
    String outputDirectory = ancillaryOptions.get("outputlocation");
    String partitionStrategy = ancillaryOptions.get("partitionstrategy");

    int numberofOutputPartitons = 1;
    String configuredPartitions = ancillaryOptions.get("numberofoutputpartitions");
    if (configuredPartitions != null) {
        numberofOutputPartitons = Integer.parseInt(configuredPartitions.trim());
    }

    // Decide the partitioning strategy once; a null strategy falls through to coalesce.
    Dataset sized = "repartition".equalsIgnoreCase(partitionStrategy)
            ? dataset.repartition(numberofOutputPartitons)
            : dataset.coalesce(numberofOutputPartitons);

    // An empty option map adds nothing to the writer, so no separate "no options" branch is needed.
    Map<String, String> writerOptions = engineoptions != null
            ? engineoptions
            : java.util.Collections.<String, String>emptyMap();

    String[] pcolumns = splitPartitionColumns(partitioncolumns);
    if (pcolumns.length > 0) {
        sized.write().options(writerOptions).format(outputformat).mode(savemode)
                .partitionBy(pcolumns).save(outputDirectory);
    } else {
        sized.write().options(writerOptions).format(outputformat).mode(savemode)
                .save(outputDirectory);
    }
}

/**
 * Splits a comma-separated column list into trimmed, non-empty column names.
 *
 * @param partitioncolumns the raw configuration value; may be {@code null} or blank
 * @return the column names in their original order; an empty array when none are configured
 */
private static String[] splitPartitionColumns(String partitioncolumns) {
    if (partitioncolumns == null) {
        return new String[0];
    }
    String[] rawNames = partitioncolumns.split(",");
    String[] names = new String[rawNames.length];
    int count = 0;
    for (String rawName : rawNames) {
        // "a, b" must yield "b", not " b", or partitionBy will not find the column.
        String name = rawName.trim();
        if (!name.isEmpty()) {
            names[count++] = name;
        }
    }
    return java.util.Arrays.copyOf(names, count);
}