public static void main()

in src/main/java/com/aliyun/emr/example/spark/SparkTableStoreJavaDemo.java [29:72]


    public static void main(String[] args) {
        String accessKeyId = args[0];
        String accessKeySecret = args[1];
        Filter filter = new Filter(Filter.CompareOperator.GREATER_THAN,"PK", ColumnValue.fromLong(-1000));
        List<String> list = new ArrayList<>();
        list.add("VALUE");
        TableStoreFilterWritable tableStoreFilterWritable = new TableStoreFilterWritable(filter, list);

        String endpoint = args[2];
        String instance = args[3];
        String tableName = args[4];
        String primaryKeyColumnName = args[5];
        ComputeParams computeParams = new ComputeParams(100, 1, ComputeParameters.ComputeMode.Auto.name());
        SparkConf sparkConf = new SparkConf().setAppName("E-MapReduce Demo 5: Spark TableStore Demo (Java)");
        JavaSparkContext sc = null;
        try {
            sc = new JavaSparkContext(sparkConf);
            Configuration hadoopConf = new Configuration();
            hadoopConf.set("computeParams", computeParams.serialize());
            hadoopConf.set("tableName", tableName);
            hadoopConf.set("filters", tableStoreFilterWritable.serialize());
            TableStore.setCredential(
                    hadoopConf,
                    new Credential(accessKeyId, accessKeySecret, null));
            Endpoint ep = new Endpoint(endpoint, instance);
            TableStore.setEndpoint(hadoopConf, ep);
            com.aliyun.openservices.tablestore.hadoop.TableStoreInputFormat.addCriteria(hadoopConf,
                    fetchCriteria(tableName, primaryKeyColumnName));
            JavaPairRDD<PrimaryKeyWritable, RowWritable> rdd = sc.newAPIHadoopRDD(
                    hadoopConf, com.aliyun.openservices.tablestore.hadoop.TableStoreInputFormat.class,
                    PrimaryKeyWritable.class, RowWritable.class);
            System.out.println(
                    new Formatter().format("TOTAL: %d", rdd.count()).toString());
            rdd.take(10).forEach((primaryKeyWritableRowWritableTuple2) -> {
                System.out.println(String.format("Key: %s, VALUE: %s",
                        primaryKeyWritableRowWritableTuple2._1.getPrimaryKey().toString(),
                        primaryKeyWritableRowWritableTuple2._2.getRow().toString()));
            });
        } finally {
            if (sc != null) {
                sc.close();
            }
        }
    }