public void open()

in connectors/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBCDCSourceFunction.java [94:158]


  public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    Session session =
        new Session.Builder().username(user).password(password).nodeUrls(nodeUrls).build();
    session.open(false);
    try (SessionDataSet dataSet =
        session.executeQueryStatement(String.format("show pipe %s", pipeName))) {
      if (!dataSet.hasNext()) {
        String createPipeCommand;
        if (Options.CDCMode.REALTIME.equals(mode)) {
          createPipeCommand =
              String.format(
                  "CREATE PIPE %s\n"
                      + "WITH EXTRACTOR (\n"
                      + "'extractor' = 'iotdb-extractor',\n"
                      + "'extractor.history.enable' = 'false',\n"
                      + "'extractor.pattern' = '%s',\n"
                      + ") WITH CONNECTOR (\n"
                      + "'connector' = 'websocket-connector',\n"
                      + "'connector.websocket.port' = '%d',\n"
                      // avoid to reuse the pipe's connector
                      + "'connector.websocket.id' = '%d'"
                      + ")",
                  pipeName, pattern, cdcPort, System.currentTimeMillis());
        } else {
          createPipeCommand =
              String.format(
                  "CREATE PIPE %s\n"
                      + "WITH EXTRACTOR (\n"
                      + "'extractor' = 'iotdb-extractor',\n"
                      + "'extractor.pattern' = '%s',\n"
                      + ") WITH CONNECTOR (\n"
                      + "'connector' = 'websocket-connector',\n"
                      + "'connector.websocket.port' = '%d',\n"
                      // avoid to reuse the pipe's connector
                      + "'connector.websocket.id' = '%d'"
                      + ")",
                  pipeName, pattern, cdcPort, System.currentTimeMillis());
        }

        session.executeNonQueryStatement(createPipeCommand);
        session.executeNonQueryStatement(String.format("start pipe %s", pipeName));
      } else {
        RowRecord pipe = dataSet.next();
        String pipePattern = pipe.getFields().get(3).getStringValue().split(",")[0].split("=")[1];
        if (!pipePattern.equals(this.pattern)) {
          throw new IllegalOptionException(
              String.format(
                  "The CDC task `%s` has been created by pattern `%s`.",
                  this.taskName, this.pattern));
        }
        String status = pipe.getFields().get(2).getStringValue();
        if ("STOPPED".equals(status)) {
          session.executeNonQueryStatement(String.format("start pipe %s", pipeName));
        }
      }
    }
    session.close();

    consumeExecutor = Executors.newFixedThreadPool(1);
    for (String nodeUrl : nodeUrls) {
      URI uri = new URI(String.format("ws://%s:%s", nodeUrl.split(":")[0], cdcPort));
      socketClients.add(initAndGet(uri));
    }
  }