in flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/OceanBaseRichSourceFunction.java [359:460]
/**
 * Starts a {@code LogProxyClient} and consumes change records until the client terminates.
 *
 * <p>When {@code resolvedTimestamp} is positive it is applied as the reader checkpoint before
 * the client is created. The registered listener buffers INSERT/UPDATE/DELETE records, hands
 * the buffered records to the deserializer on COMMIT and then advances {@code
 * resolvedTimestamp} to the commit's safe timestamp when that is larger.
 *
 * <p>The calling thread blocks twice: first, for at most {@code connectTimeout}, until the
 * listener sees a HEARTBEAT or BEGIN message (or the client reports an exception), and then
 * on {@code logProxyClient.join()}.
 *
 * @throws InterruptedException if the thread is interrupted while waiting for the first
 *     message
 * @throws TimeoutException if neither a HEARTBEAT/BEGIN message nor a client exception is
 *     observed within {@code connectTimeout}
 * @throws RuntimeException wrapping the {@code LogProxyClientException} reported through the
 *     listener, either during startup or after the client has terminated
 */
protected void readChangeRecords() throws InterruptedException, TimeoutException {
    if (resolvedTimestamp > 0) {
        obReaderConfig.updateCheckpoint(Long.toString(resolvedTimestamp));
        LOG.info("Restore from timestamp: {}", resolvedTimestamp);
    }

    ClientConf clientConf =
            ClientConf.builder()
                    .clientId(
                            logProxyClientId != null
                                    ? logProxyClientId
                                    : String.format(
                                            "%s_%s_%s",
                                            ClientUtil.generateClientId(),
                                            Thread.currentThread().getId(),
                                            tenantName))
                    .maxReconnectTimes(0)
                    .connectTimeoutMs((int) connectTimeout.toMillis())
                    .build();

    logProxyClient = new LogProxyClient(logProxyHost, logProxyPort, obReaderConfig, clientConf);

    // Released by the first HEARTBEAT/BEGIN message, or by onException on failure.
    final CountDownLatch latch = new CountDownLatch(1);

    logProxyClient.addListener(
            new RecordListener() {

                // Becomes true once the first HEARTBEAT or BEGIN message has been seen.
                boolean started = false;

                @Override
                public void notify(LogMessage message) {
                    switch (message.getOpt()) {
                        case HEARTBEAT:
                        case BEGIN:
                            if (!started) {
                                started = true;
                                latch.countDown();
                            }
                            break;
                        case INSERT:
                        case UPDATE:
                        case DELETE:
                            // Row changes arriving before the first HEARTBEAT/BEGIN are
                            // dropped.
                            if (!started) {
                                break;
                            }
                            SourceRecord record = getChangeRecord(message);
                            if (record != null) {
                                changeRecordBuffer.add(record);
                            }
                            break;
                        case COMMIT:
                            // Emit everything buffered so far, then advance the resolved
                            // timestamp.
                            changeRecordBuffer.forEach(
                                    r -> {
                                        try {
                                            deserializer.deserialize(r, outputCollector);
                                        } catch (Exception e) {
                                            throw new FlinkRuntimeException(e);
                                        }
                                    });
                            changeRecordBuffer.clear();
                            long timestamp = Long.parseLong(message.getSafeTimestamp());
                            if (timestamp > resolvedTimestamp) {
                                resolvedTimestamp = timestamp;
                            }
                            break;
                        case DDL:
                            // TODO record ddl and remove expired table schema
                            // Only touch the field list when the statement is actually
                            // logged, so a disabled TRACE level costs nothing and cannot fail.
                            if (LOG.isTraceEnabled()) {
                                LOG.trace(
                                        "Ddl: {}",
                                        message.getFieldList().get(0).getValue().toString());
                            }
                            break;
                        default:
                            throw new UnsupportedOperationException(
                                    "Unsupported type: " + message.getOpt());
                    }
                }

                @Override
                public void onException(LogProxyClientException e) {
                    logProxyClientException = e;
                    try {
                        logProxyClient.stop();
                    } finally {
                        // Wake the startup wait so the failure is reported right away
                        // instead of surfacing later as a generic timeout.
                        latch.countDown();
                    }
                }
            });

    LOG.info(
            "Try to start LogProxyClient with client id: {}, config: {}",
            clientConf.getClientId(),
            obReaderConfig);
    logProxyClient.start();

    // Wait in milliseconds: Duration.getSeconds() would truncate sub-second timeouts and
    // disagree with the connectTimeoutMs value handed to the client above.
    boolean signalled = latch.await(connectTimeout.toMillis(), TimeUnit.MILLISECONDS);
    if (logProxyClientException != null) {
        throw new RuntimeException("LogProxyClient exception", logProxyClientException);
    }
    if (!signalled) {
        throw new TimeoutException(
                "Timeout to receive log messages in LogProxyClient.RecordListener");
    }
    LOG.info("LogProxyClient started successfully");

    logProxyClient.join();

    if (logProxyClientException != null) {
        throw new RuntimeException("LogProxyClient exception", logProxyClientException);
    }
}