in connectors/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBCDCSourceFunction.java [94:158]
/**
 * Prepares this CDC source: makes sure the IoTDB pipe named {@code pipeName} exists and is
 * started, then creates one websocket client per entry of {@code nodeUrls}.
 *
 * <p>If {@code show pipe} returns no row, a pipe with a websocket connector is created and
 * started. Otherwise the pattern parsed from the existing pipe must equal this source's pattern,
 * and the pipe is started again when its status is {@code STOPPED}.
 *
 * @param parameters passed through to {@code super.open}
 * @throws IllegalOptionException if the existing pipe was created with a different pattern
 * @throws Exception if the session, the URI construction or {@code initAndGet} fails
 */
public void open(Configuration parameters) throws Exception {
  super.open(parameters);
  Session session =
      new Session.Builder().username(user).password(password).nodeUrls(nodeUrls).build();
  session.open(false);
  try (SessionDataSet dataSet =
      session.executeQueryStatement(String.format("show pipe %s", pipeName))) {
    if (!dataSet.hasNext()) {
      // The two modes differ only in this clause: REALTIME switches history extraction off.
      String historyClause =
          Options.CDCMode.REALTIME.equals(mode) ? "'extractor.history.enable' = 'false',\n" : "";
      String createPipeCommand =
          String.format(
              "CREATE PIPE %s\n"
                  + "WITH EXTRACTOR (\n"
                  + "'extractor' = 'iotdb-extractor',\n"
                  + "%s"
                  + "'extractor.pattern' = '%s',\n"
                  + ") WITH CONNECTOR (\n"
                  + "'connector' = 'websocket-connector',\n"
                  + "'connector.websocket.port' = '%d',\n"
                  // a fresh id per pipe avoids reusing another pipe's connector
                  + "'connector.websocket.id' = '%d'"
                  + ")",
              pipeName, historyClause, pattern, cdcPort, System.currentTimeMillis());
      session.executeNonQueryStatement(createPipeCommand);
      session.executeNonQueryStatement(String.format("start pipe %s", pipeName));
    } else {
      RowRecord pipe = dataSet.next();
      // NOTE(review): presumably field 3 lists the extractor attributes as comma-separated
      // key=value pairs with the pattern first - confirm against the `show pipe` output format.
      String pipePattern = pipe.getFields().get(3).getStringValue().split(",")[0].split("=")[1];
      if (!pipePattern.equals(this.pattern)) {
        // Report the pattern the existing pipe was created with, not the one just requested.
        throw new IllegalOptionException(
            String.format(
                "The CDC task `%s` has been created by pattern `%s`.",
                this.taskName, pipePattern));
      }
      String status = pipe.getFields().get(2).getStringValue();
      if ("STOPPED".equals(status)) {
        session.executeNonQueryStatement(String.format("start pipe %s", pipeName));
      }
    }
  } finally {
    // Runs after the data set has been closed, and also when the checks above throw, so the
    // session is never leaked.
    session.close();
  }
  consumeExecutor = Executors.newFixedThreadPool(1);
  for (String nodeUrl : nodeUrls) {
    // Only the host part of each node URL is kept; the websocket endpoint uses the CDC port.
    URI uri = new URI(String.format("ws://%s:%s", nodeUrl.split(":")[0], cdcPort));
    socketClients.add(initAndGet(uri));
  }
}