in flink-connector/flink-examples-gcp-pubsub/pubsub-streaming/src/main/java/com/google/pubsub/flink/PubSubExample.java [43:106]
/**
 * Entry point of the example: builds a streaming job that reads messages from a Pub/Sub
 * subscription as strings and writes them to a Pub/Sub topic.
 *
 * <p>Recognized command-line parameters:
 *
 * <ul>
 *   <li>{@code --sink-topic}: required; either a bare topic name or a full path of the form
 *       {@code projects/PROJECT-NAME/topics/TOPIC-NAME}.
 *   <li>{@code --source-subscription}: required; either a bare subscription name or a full path
 *       of the form {@code projects/PROJECT-NAME/subscriptions/SUBSCRIPTION-NAME}.
 *   <li>{@code --project}: project used for the topic and/or subscription when the
 *       corresponding parameter is not given as a full path.
 * </ul>
 *
 * <p>If a required parameter is missing, or a project cannot be determined, an error message is
 * printed to standard output and the method returns without building or executing the job.
 *
 * @param args command-line arguments, parsed with {@link ParameterTool#fromArgs(String[])}
 * @throws Exception propagated from setting up or executing the job
 */
public static void main(String[] args) throws Exception {
  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  final ParameterTool parameterTool = ParameterTool.fromArgs(args);

  // Parse sink topic from parameters.
  String topicName = parameterTool.get("sink-topic");
  String topicProject = parameterTool.get("project");
  if (topicName == null) {
    System.out.println("Failed to start! The parameter --sink-topic must be specified.");
    // Stop here: without this return a null topic name would flow into the parsing and the
    // sink builder below.
    return;
  }
  // A fully-qualified topic path carries its own project, which takes precedence over --project.
  if (ProjectTopicName.isParsableFrom(topicName)) {
    ProjectTopicName projectTopicName = ProjectTopicName.parse(topicName);
    topicName = projectTopicName.getTopic();
    topicProject = projectTopicName.getProject();
  }
  if (topicProject == null) {
    System.out.println(
        "Failed to start! The sink topic project must be specified using either [--project"
            + " PROJECT-NAME] or [--sink-topic projects/PROJECT-NAME/topics/TOPIC-NAME].");
    return;
  }

  // Parse source subscription from parameters.
  String subscriptionName = parameterTool.get("source-subscription");
  String subscriptionProject = parameterTool.get("project");
  if (subscriptionName == null) {
    System.out.println("Failed to start! The parameter --source-subscription must be specified.");
    // Stop here: without this return a null subscription name would flow into the parsing and
    // the source builder below.
    return;
  }
  // A fully-qualified subscription path carries its own project, which takes precedence over
  // --project.
  if (ProjectSubscriptionName.isParsableFrom(subscriptionName)) {
    ProjectSubscriptionName projectSubscriptionName =
        ProjectSubscriptionName.parse(subscriptionName);
    subscriptionName = projectSubscriptionName.getSubscription();
    subscriptionProject = projectSubscriptionName.getProject();
  }
  if (subscriptionProject == null) {
    System.out.println(
        "Failed to start! The source subscription project must be specified using either"
            + " [--project PROJECT-NAME] or [--source-subscription"
            + " projects/PROJECT-NAME/subscriptions/SUBSCRIPTION-NAME].");
    return;
  }

  // Source: read from the subscription as strings, with no watermarks.
  DataStream<String> stream =
      env.fromSource(
          PubSubSource.<String>builder()
              .setDeserializationSchema(
                  PubSubDeserializationSchema.dataOnly(new SimpleStringSchema()))
              .setProjectName(subscriptionProject)
              .setSubscriptionName(subscriptionName)
              .build(),
          WatermarkStrategy.noWatermarks(),
          "PubSubSource");

  // Sink: forward every string read from the source to the topic.
  stream
      .sinkTo(
          PubSubSink.<String>builder()
              .setSerializationSchema(
                  PubSubSerializationSchema.dataOnly(new SimpleStringSchema()))
              .setProjectName(topicProject)
              .setTopicName(topicName)
              .build())
      .name("PubSubSink");

  // Start a checkpoint every 1000 ms.
  env.enableCheckpointing(1000);
  env.execute("Streaming PubSub Example");
}