in java/dataflow-connector-examples/src/main/java/com/google/cloud/bigtable/dataflow/example/CsvImport.java [211:245]
/**
 * Command-line entry point for the CSV import example.
 *
 * <p>Builds {@link BigtableCsvOptions} from the supplied arguments, checks that an input file
 * pattern and a set of headers were provided, and then runs a pipeline that matches the input
 * files, reads them as text, hands each element to {@code MutationTransform}, and passes the
 * result to {@code CloudBigtableIO.writeToTable}. The call does not return until
 * {@code waitUntilFinish()} on the pipeline result returns.
 *
 * @param args command-line arguments that are parsed into {@link BigtableCsvOptions}
 * @throws IllegalArgumentException if {@code inputFile} is blank, or if {@code headers} is
 *     {@code null} or has size zero
 */
public static void main(String[] args) throws IllegalArgumentException {
  BigtableCsvOptions csvOptions =
      PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableCsvOptions.class);

  // Fail fast on the two options this method checks explicitly before building anything.
  if (isBlank(csvOptions.getInputFile())) {
    throw new IllegalArgumentException("Please provide value for inputFile.");
  }
  if (csvOptions.getHeaders() == null || csvOptions.getHeaders().size() == 0) {
    throw new IllegalArgumentException("Please provide value for headers.");
  }

  // Destination table, identified by the project / instance / table ids from the options.
  CloudBigtableTableConfiguration tableConfig =
      new CloudBigtableTableConfiguration.Builder()
          .withProjectId(csvOptions.getBigtableProjectId())
          .withInstanceId(csvOptions.getBigtableInstanceId())
          .withTableId(csvOptions.getBigtableTableId())
          .build();

  Pipeline pipeline = Pipeline.create(csvOptions);

  // Stages, in order: match file pattern -> open matches -> read text -> transform -> write.
  pipeline
      .apply("DefineCsvFileMatches", FileIO.match().filepattern(csvOptions.getInputFile()))
      .apply("MatchCsvFiles", FileIO.readMatches())
      .apply("ReadCsvFiles", TextIO.readFiles())
      .apply(
          "TransformParsingsToBigtable",
          ParDo.of(
              MutationTransform.builder()
                  .setCsvFormat(csvOptions.getCsvFormat().getFormat())
                  .setCsvHeaders(csvOptions.getHeaders())
                  .setRowKeyColumnName(csvOptions.getRowKeyHeader())
                  .setBigTableFamilyName(csvOptions.getBigtableColumnFamilyName())
                  .build()))
      .apply("WriteToBigtable", CloudBigtableIO.writeToTable(tableConfig));

  pipeline.run().waitUntilFinish();
}