protected void readChangeRecords()

in flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/OceanBaseRichSourceFunction.java [359:460]


    protected void readChangeRecords() throws InterruptedException, TimeoutException {
        if (resolvedTimestamp > 0) {
            obReaderConfig.updateCheckpoint(Long.toString(resolvedTimestamp));
            LOG.info("Restore from timestamp: {}", resolvedTimestamp);
        }

        ClientConf clientConf =
                ClientConf.builder()
                        .clientId(
                                logProxyClientId != null
                                        ? logProxyClientId
                                        : String.format(
                                                "%s_%s_%s",
                                                ClientUtil.generateClientId(),
                                                Thread.currentThread().getId(),
                                                tenantName))
                        .maxReconnectTimes(0)
                        .connectTimeoutMs((int) connectTimeout.toMillis())
                        .build();

        logProxyClient = new LogProxyClient(logProxyHost, logProxyPort, obReaderConfig, clientConf);

        final CountDownLatch latch = new CountDownLatch(1);

        logProxyClient.addListener(
                new RecordListener() {

                    boolean started = false;

                    @Override
                    public void notify(LogMessage message) {
                        switch (message.getOpt()) {
                            case HEARTBEAT:
                            case BEGIN:
                                if (!started) {
                                    started = true;
                                    latch.countDown();
                                }
                                break;
                            case INSERT:
                            case UPDATE:
                            case DELETE:
                                if (!started) {
                                    break;
                                }
                                SourceRecord record = getChangeRecord(message);
                                if (record != null) {
                                    changeRecordBuffer.add(record);
                                }
                                break;
                            case COMMIT:
                                changeRecordBuffer.forEach(
                                        r -> {
                                            try {
                                                deserializer.deserialize(r, outputCollector);
                                            } catch (Exception e) {
                                                throw new FlinkRuntimeException(e);
                                            }
                                        });
                                changeRecordBuffer.clear();
                                long timestamp = Long.parseLong(message.getSafeTimestamp());
                                if (timestamp > resolvedTimestamp) {
                                    resolvedTimestamp = timestamp;
                                }
                                break;
                            case DDL:
                                // TODO record ddl and remove expired table schema
                                LOG.trace(
                                        "Ddl: {}",
                                        message.getFieldList().get(0).getValue().toString());
                                break;
                            default:
                                throw new UnsupportedOperationException(
                                        "Unsupported type: " + message.getOpt());
                        }
                    }

                    @Override
                    public void onException(LogProxyClientException e) {
                        logProxyClientException = e;
                        logProxyClient.stop();
                    }
                });

        LOG.info(
                "Try to start LogProxyClient with client id: {}, config: {}",
                clientConf.getClientId(),
                obReaderConfig);
        logProxyClient.start();

        if (!latch.await(connectTimeout.getSeconds(), TimeUnit.SECONDS)) {
            throw new TimeoutException(
                    "Timeout to receive log messages in LogProxyClient.RecordListener");
        }
        LOG.info("LogProxyClient started successfully");

        logProxyClient.join();

        if (logProxyClientException != null) {
            throw new RuntimeException("LogProxyClient exception", logProxyClientException);
        }
    }