src/main/java/com/aliyun/dms/subscribe/clients/UserRecordGeneratorWithDBMapping.java [57:77]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
                // Wrap the parsed record together with its Kafka coordinates (topic/partition and offset)
                // and a commit callback.
                // NOTE(review): the callback is presumably invoked by DefaultUserRecord when the downstream
                // consumer commits this record -- confirm against DefaultUserRecord.
                DefaultUserRecord defaultUserRecord = new DefaultUserRecord(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), consumerRecord.offset(),
                        record,
                        (tp, commitRecord, offset, metadata) -> {
                            // Metrics: count one record and add the length of the raw consumer record value.
                            recordStoreOutCountSensor.record(1);
                            recordStoreOutByteSensor.record(consumerRecord.value().length);
                            // Remember the position to commit (built from the record's source timestamp,
                            // offset and metadata), then call commit().
                            commitCheckpoint = new Checkpoint(tp, commitRecord.getSourceTimestamp(), offset, metadata);
                            commit();
                        });

                // Number of consecutive failed offer attempts for this record.
                int offerTryCount = 0;

                // Keep retrying the offer with a 1000 ms timeout per attempt until it succeeds
                // or the consumer context reports that it has exited.
                while (!offerRecord(1000, TimeUnit.MILLISECONDS, defaultUserRecord) && !consumerContext.isExited()) {
                    // Log every 10th failed attempt; the "10s" in the message assumes each attempt
                    // waits for the full timeout -- TODO confirm against offerRecord.
                    if (++offerTryCount % 10 == 0) {
                        log.info("UserRecordGenerator: offer user record has failed for a period (10s) [ " + record + "]");
                    }
                }

                // Presumably drops the head entry of the pending-record queue now that it has been handled.
                // NOTE(review): this also runs when the loop above ended because the context exited,
                // i.e. when the record was never offered -- confirm that is intended.
                toProcessRecord.poll();
            } catch (Exception e) {
                // Any failure while processing is logged with both the raw and the parsed record
                // (plus the stack trace), and the consumer context is then asked to exit.
                log.error("UserRecordGenerator: process record failed, raw consumer record [" + toProcess + "], parsed record [" + record + "], cause " + e.getMessage(), e);
                consumerContext.exit();
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



src/main/java/com/aliyun/dts/subscribe/clients/recordgenerator/UserRecordGenerator.java [97:117]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
                // Build the user-facing record from the parsed record, the Kafka topic/partition and offset
                // it came from, and a commit callback.
                // NOTE(review): when the callback fires is decided by DefaultUserRecord, which is not
                // visible here -- presumably on user commit; confirm.
                DefaultUserRecord defaultUserRecord = new DefaultUserRecord(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), consumerRecord.offset(),
                        record,
                        (tp, commitRecord, offset, metadata) -> {
                            // Update the outbound metrics: one record, plus the raw value's length.
                            recordStoreOutCountSensor.record(1);
                            recordStoreOutByteSensor.record(consumerRecord.value().length);
                            // Store the checkpoint derived from this record's source timestamp, offset
                            // and metadata, then call commit().
                            commitCheckpoint = new Checkpoint(tp, commitRecord.getSourceTimestamp(), offset, metadata);
                            commit();
                        });

                // Counts failed offer attempts so the log below is only emitted periodically.
                int offerTryCount = 0;

                // Retry offering the record (1000 ms timeout per attempt) until the offer succeeds
                // or the consumer context has exited.
                while (!offerRecord(1000, TimeUnit.MILLISECONDS, defaultUserRecord) && !consumerContext.isExited()) {
                    // Emit an info log on every 10th failure; the message's "10s" presumably reflects
                    // 10 attempts x 1000 ms -- verify that offerRecord blocks for the full timeout.
                    if (++offerTryCount % 10 == 0) {
                        log.info("UserRecordGenerator: offer user record has failed for a period (10s) [ " + record + "]");
                    }
                }

                // Presumably removes the just-handled entry from the pending-record queue.
                // NOTE(review): reached even if the loop ended due to context exit without a
                // successful offer -- confirm the record may be dropped in that case.
                toProcessRecord.poll();
            } catch (Exception e) {
                // On any exception: log the raw consumer record, the parsed record and the cause
                // (with stack trace), then signal the consumer context to exit.
                log.error("UserRecordGenerator: process record failed, raw consumer record [" + toProcess + "], parsed record [" + record + "], cause " + e.getMessage(), e);
                consumerContext.exit();
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



