public static void main()

in flink-connector/flink-examples-gcp-pubsub/pubsub-streaming/src/main/java/com/google/pubsub/flink/PubSubExample.java [43:106]


  /**
   * Builds and runs a Flink streaming job that reads from a Pub/Sub subscription via {@code
   * PubSubSource} and writes every element to a Pub/Sub topic via {@code PubSubSink}.
   *
   * <p>Recognized command-line parameters:
   *
   * <ul>
   *   <li>{@code --sink-topic}: required; either a bare topic name or a full path of the form
   *       {@code projects/PROJECT-NAME/topics/TOPIC-NAME}.
   *   <li>{@code --source-subscription}: required; either a bare subscription name or a full path
   *       of the form {@code projects/PROJECT-NAME/subscriptions/SUBSCRIPTION-NAME}.
   *   <li>{@code --project}: project used for any of the above that is given as a bare name. When
   *       a full path is given, the project embedded in the path takes precedence.
   * </ul>
   *
   * <p>If a required parameter is missing, or a project cannot be determined, a message is printed
   * to standard output and the method returns without building or executing the job.
   *
   * @param args command-line arguments, parsed with {@code ParameterTool.fromArgs}
   * @throws Exception if building or executing the job fails
   */
  public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    final ParameterTool parameterTool = ParameterTool.fromArgs(args);

    // Parse sink topic from parameters.
    String topicName = parameterTool.get("sink-topic");
    String topicProject = parameterTool.get("project");
    if (topicName == null) {
      System.out.println("Failed to start! The parameter --sink-topic must be specified.");
      // Abort here: continuing would hand a null topic name to the parser and the sink builder.
      return;
    }
    if (ProjectTopicName.isParsableFrom(topicName)) {
      // A fully qualified topic path carries its own project, which overrides --project.
      ProjectTopicName projectTopicName = ProjectTopicName.parse(topicName);
      topicName = projectTopicName.getTopic();
      topicProject = projectTopicName.getProject();
    }
    if (topicProject == null) {
      System.out.println(
          "Failed to start! The sink topic project must be specified using either [--project"
              + " PROJECT-NAME] or [--sink-topic projects/PROJECT-NAME/topics/TOPIC-NAME].");
      return;
    }

    // Parse source subscription from parameters.
    String subscriptionName = parameterTool.get("source-subscription");
    String subscriptionProject = parameterTool.get("project");
    if (subscriptionName == null) {
      System.out.println("Failed to start! The parameter --source-subscription must be specified.");
      // Abort here: continuing would hand a null subscription name to the parser and the source
      // builder.
      return;
    }
    if (ProjectSubscriptionName.isParsableFrom(subscriptionName)) {
      // A fully qualified subscription path carries its own project, which overrides --project.
      ProjectSubscriptionName projectSubscriptionName =
          ProjectSubscriptionName.parse(subscriptionName);
      subscriptionName = projectSubscriptionName.getSubscription();
      subscriptionProject = projectSubscriptionName.getProject();
    }
    if (subscriptionProject == null) {
      System.out.println(
          "Failed to start! The source subscription project must be specified using either"
              + " [--project PROJECT-NAME] or [--source-subscription"
              + " projects/PROJECT-NAME/subscriptions/SUBSCRIPTION-NAME].");
      return;
    }

    // Source: read from the subscription; no watermarks are generated for this stream.
    DataStream<String> stream =
        env.fromSource(
            PubSubSource.<String>builder()
                .setDeserializationSchema(
                    PubSubDeserializationSchema.dataOnly(new SimpleStringSchema()))
                .setProjectName(subscriptionProject)
                .setSubscriptionName(subscriptionName)
                .build(),
            WatermarkStrategy.noWatermarks(),
            "PubSubSource");

    // Sink: forward every element of the stream, unmodified, to the topic.
    stream
        .sinkTo(
            PubSubSink.<String>builder()
                .setSerializationSchema(
                    PubSubSerializationSchema.dataOnly(new SimpleStringSchema()))
                .setProjectName(topicProject)
                .setTopicName(topicName)
                .build())
        .name("PubSubSink");

    // Start a checkpoint every 1000 ms.
    env.enableCheckpointing(1000);
    env.execute("Streaming PubSub Example");
  }