public static void run(String jobInputParam)

in aws-blog-event-driven-batch-analytics/src/main/java/com/amazonaws/bigdatablog/edba/emr/ProcessVendorTrasactions.java [35:96]


	public static void run(String jobInputParam) throws Exception{
		
		// Schema for the incoming comma-delimited transaction records; every column is read as a string
		List<StructField> schemaFields = new ArrayList<StructField>();
		schemaFields.add(DataTypes.createStructField("vendor_id", DataTypes.StringType, true));
		schemaFields.add(DataTypes.createStructField("trans_amount", DataTypes.StringType, true));
		schemaFields.add(DataTypes.createStructField("trans_type", DataTypes.StringType, true));
		schemaFields.add(DataTypes.createStructField("item_id", DataTypes.StringType, true));
		schemaFields.add(DataTypes.createStructField("trans_date", DataTypes.StringType, true));
		StructType schema = DataTypes.createStructType(schemaFields);

		// Spark setup; as the app name notes, no static access keys are embedded in the job
		SparkConf conf = new SparkConf().setAppName("Spark Redshift No Access-Keys");
		SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
		JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

		// Redshift endpoint and S3 staging location come from the job properties
		String redshiftJDBCURL = props.getProperty("redshift.jdbc.url");
		String s3TempPath = props.getProperty("s3.temp.path");
		System.out.println("props: " + props);
		
		// Parse each input line into a Row that matches the schema defined above
		JavaRDD<Row> salesRDD = sc.textFile(jobInputParam).map(new Function<String, Row>() {
			public Row call(String saleRec) {
				String[] fields = saleRec.split(",");
				return RowFactory.create(fields[0], fields[1], fields[2], fields[3], fields[4]);
			}
		});
		Dataset<Row> salesDF = spark.createDataFrame(salesRDD, schema);
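		// Split the transactions by type and sum the amounts per vendor, item, and transaction date:
		// type 4 = sale amount, type 5 = tax amount, type 6 = discount amount (per the variable names below)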
		Dataset<Row> vendorItemSaleAmountDF = salesDF.filter(salesDF.col("trans_type").equalTo("4"))
				.groupBy(salesDF.col("vendor_id"), salesDF.col("item_id"), salesDF.col("trans_date"))
				.agg(ImmutableMap.of("trans_amount", "sum"));
		Dataset<Row> vendorItemTaxAmountDF = salesDF.filter(salesDF.col("trans_type").equalTo("5"))
				.groupBy(salesDF.col("vendor_id"), salesDF.col("item_id"), salesDF.col("trans_date"))
				.agg(ImmutableMap.of("trans_amount", "sum"));
		Dataset<Row> vendorItemDiscountAmountDF = salesDF.filter(salesDF.col("trans_type").equalTo("6"))
				.groupBy(salesDF.col("vendor_id"), salesDF.col("item_id"), salesDF.col("trans_date"))
				.agg(ImmutableMap.of("trans_amount", "sum"));
		String[] joinColArray = {"vendor_id","item_id","trans_date"};
		vendorItemSaleAmountDF.printSchema();
		Seq<String> commonJoinColumns = scala.collection.JavaConversions.asScalaBuffer(Arrays.asList(joinColArray)).seq();

		// Left-outer join the three aggregates so vendor/item/date combinations without tax or
		// discount records are still carried into the summary
		Dataset<Row> vendorAggregatedDF = vendorItemSaleAmountDF.join(vendorItemTaxAmountDF, commonJoinColumns, "left_outer")
				.join(vendorItemDiscountAmountDF, commonJoinColumns, "left_outer")
				.toDF("vendor_id", "item_id", "trans_date", "sale_amount", "tax_amount", "discount_amount");
		
		vendorAggregatedDF.printSchema();
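		// Fetch temporary session credentials (access key, secret, token) from the default
		// provider chain so they can be handed to the spark-redshift connector below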
		DefaultAWSCredentialsProviderChain provider = new DefaultAWSCredentialsProviderChain();
		AWSSessionCredentials creds  = (AWSSessionCredentials) provider.getCredentials();
		
		// Unique suffix for the per-run Redshift staging table so concurrent runs do not collide
		String appendix = new StringBuilder(String.valueOf(System.currentTimeMillis()))
				.append("_").append(String.valueOf(new Random().nextInt(10) + 1)).toString();
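		// Post-action SQL executed after the load: in a single transaction, delete the summary rows
		// being replaced, copy the staging table into vendortranssummary, then drop the staging table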
		String vendorTransSummarySQL = new StringBuilder("begin transaction;delete from vendortranssummary using vendortranssummary_temp")
				 .append(appendix)
				 .append(" where vendortranssummary.vendor_id=vendortranssummary_temp")
				 .append(appendix)
				 .append(".vendor_id and vendortranssummary.item_id=vendortranssummary_temp")
				 .append(appendix)
				 .append(".item_id and vendortranssummary.trans_date = vendortranssummary_temp")
				 .append(appendix)
				 .append(".trans_date;")
				 .append("insert into vendortranssummary select * from vendortranssummary_temp")
				 .append(appendix)
				 .append(";drop table vendortranssummary_temp")
				 .append(appendix)
				 .append(";end transaction;").toString();
		// Load the aggregate into the per-run staging table via the spark-redshift connector,
		// passing the temporary credentials and the merge SQL as a post-action
		vendorAggregatedDF.write().format("com.databricks.spark.redshift")
				.option("url", redshiftJDBCURL)
				.option("dbtable", "vendortranssummary_temp" + appendix)
				.option("usestagingtable", "false")
				.option("postactions", vendorTransSummarySQL)
				.option("temporary_aws_access_key_id", creds.getAWSAccessKeyId())
				.option("temporary_aws_secret_access_key", creds.getAWSSecretKey())
				.option("temporary_aws_session_token", creds.getSessionToken())
				.option("tempdir", s3TempPath)
				.mode(SaveMode.Overwrite)
				.save();
			
	}
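
Note that `props` is referenced above but not defined in this excerpt (lines [35:96] of the file); it is a field of the enclosing class. The sketch below is one plausible way it could be initialized, assuming the job configuration ships as a classpath resource named config.properties. The resource name and loading strategy are assumptions for illustration, not necessarily how the repository actually wires it up.

	// Hypothetical initialization of `props` (requires java.io.InputStream and java.util.Properties);
	// the real class may load its configuration differently, e.g. from S3 or a bootstrap step.
	private static final Properties props = new Properties();
	static {
		try (InputStream in = ProcessVendorTrasactions.class.getResourceAsStream("/config.properties")) {
			props.load(in);
		} catch (Exception e) {
			throw new ExceptionInInitializerError(e);
		}
	}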