public void writeLogData()

in src/main/java/com/aws/logaggregator/connector/StorageConnector.java [82:144]


    /**
     * Writes {@code dataset} to the location configured for the "storage" destination.
     *
     * <p>The following keys are read from the destination's config options:
     * <ul>
     *   <li>{@code outputformat} - passed to {@code DataFrameWriter.format}</li>
     *   <li>{@code savemode} - passed to {@code DataFrameWriter.mode}</li>
     *   <li>{@code outputlocation} - passed to {@code DataFrameWriter.save}</li>
     *   <li>{@code partitioncolumns} - optional comma-separated column names for
     *       {@code partitionBy}; whitespace around each name is ignored</li>
     *   <li>{@code partitionstrategy} - {@code "repartition"} (case-insensitive) selects
     *       {@code Dataset.repartition}; any other value, or none, selects
     *       {@code Dataset.coalesce}</li>
     *   <li>{@code numberofoutputpartitions} - optional partition count, defaults to 1</li>
     * </ul>
     * The destination's Spark options, when present, are forwarded to the writer unchanged.
     *
     * @param dataset  the data to write
     * @param session  not used by this method; kept so existing callers keep compiling
     * @param initBean supplies the "storage" destination configuration
     * @throws NumberFormatException if {@code numberofoutputpartitions} is set but is not an integer
     */
    public void writeLogData(Dataset dataset, SparkSession session, LogAppInitializer initBean) {

        Map<String, String> ancillaryOptions = initBean.getDestination("storage").getConfigoptions();
        Map<String, String> engineoptions = initBean.getDestination("storage").getSparkoptions();

        String outputformat = ancillaryOptions.get("outputformat");
        String savemode = ancillaryOptions.get("savemode");
        String outputDirectory = ancillaryOptions.get("outputlocation");
        String partitioncolumns = ancillaryOptions.get("partitioncolumns");
        String partitionStrategy = ancillaryOptions.get("partitionstrategy");

        int numberofOutputPartitons = 1;
        String configuredPartitionCount = ancillaryOptions.get("numberofoutputpartitions");
        if (configuredPartitionCount != null) {
            // parseInt avoids boxing; trim tolerates stray whitespace in the configuration value.
            numberofOutputPartitons = Integer.parseInt(configuredPartitionCount.trim());
        }

        // Split on commas and drop the whitespace around each name, so that "a, b" yields
        // the columns "a" and "b" rather than "a" and " b".
        String[] pcolumns = new String[0];
        if (partitioncolumns != null && !partitioncolumns.trim().isEmpty()) {
            pcolumns = partitioncolumns.trim().split("\\s*,\\s*");
        }

        // An empty option map adds nothing to the writer, so it stands in for "no Spark
        // options" and removes the need for a separate branch without the options() call.
        Map<String, String> effectiveOptions = engineoptions != null
                ? engineoptions
                : java.util.Collections.<String, String>emptyMap();

        // equalsIgnoreCase on the literal is null-safe: a missing strategy falls back to coalesce.
        Dataset partitioned = "repartition".equalsIgnoreCase(partitionStrategy)
                ? dataset.repartition(numberofOutputPartitons)
                : dataset.coalesce(numberofOutputPartitons);

        if (pcolumns.length > 0) {
            partitioned.write().options(effectiveOptions).format(outputformat).mode(savemode)
                    .partitionBy(pcolumns).save(outputDirectory);
        } else {
            // Also reached when partitioncolumns held only separators (for example ","),
            // so partitionBy is never invoked with an empty column list.
            partitioned.write().options(effectiveOptions).format(outputformat).mode(savemode)
                    .save(outputDirectory);
        }

    }