src/main/java/com/aliyun/dts/subscribe/clients/metastore/AbstractUserMetaStore.java [33:70]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    /**
     * Renders a store element as a JSON string.
     *
     * <p>The resulting object carries the group name under {@code GROUP_ID_NAME} and, under
     * {@code STREAM_CHECKPOINT_NAME}, an array with one object per entry of
     * {@code storeElement.streamCheckpoint}. Each array object holds the topic, partition,
     * offset, timestamp and info of that entry.
     *
     * @param storeElement the element whose group name and checkpoints are written
     * @return the JSON text of the element
     */
    private String toJson(StoreElement storeElement) {
        // One JSON object per (topic-partition, checkpoint) pair.
        JSONArray checkpoints = new JSONArray();
        for (Map.Entry<TopicPartition, Checkpoint> entry : storeElement.streamCheckpoint.entrySet()) {
            TopicPartition topicPartition = entry.getKey();
            Checkpoint current = entry.getValue();

            JSONObject item = new JSONObject();
            item.put(TOPIC_NAME, topicPartition.topic());
            item.put(PARTITION_NAME, topicPartition.partition());
            item.put(OFFSET_NAME, current.getOffset());
            item.put(TIMESTAMP_NAME, current.getTimeStamp());
            item.put(INFO_NAME, current.getInfo());
            checkpoints.add(item);
        }

        // Group name goes in first, then the checkpoint array.
        JSONObject root = new JSONObject();
        root.put(GROUP_ID_NAME, storeElement.groupName);
        root.put(STREAM_CHECKPOINT_NAME, checkpoints);
        return root.toJSONString();
    }

    /**
     * Rebuilds a {@link StoreElement} from JSON text in the layout written by {@code toJson}.
     *
     * <p>The group name is read from {@code GROUP_ID_NAME}. Every object in the
     * {@code STREAM_CHECKPOINT_NAME} array is turned into a {@link Checkpoint} keyed by its
     * {@link TopicPartition}. When that array is absent the element is returned with an empty
     * checkpoint map instead of failing on a null iteration.
     *
     * <p>The partition, offset and timestamp values are unboxed into primitives, so an array
     * entry lacking one of them still fails with a {@link NullPointerException}.
     *
     * @param jsonString the JSON text to parse
     * @return the element holding the parsed group name and checkpoints
     */
    private StoreElement fromString(String jsonString) {
        JSONObject jsonObject = JSONObject.parseObject(jsonString);
        String groupName = jsonObject.getString(GROUP_ID_NAME);
        JSONArray checkpointArray = jsonObject.getJSONArray(STREAM_CHECKPOINT_NAME);
        Map<TopicPartition, Checkpoint> checkpointInfo = new HashMap<>();
        // A record without the checkpoint array simply contributes no checkpoints.
        if (checkpointArray != null) {
            for (Object o : checkpointArray) {
                JSONObject tpAndCheckpoint = (JSONObject) o;
                String topic = tpAndCheckpoint.getString(TOPIC_NAME);
                int partition = tpAndCheckpoint.getInteger(PARTITION_NAME);
                long offset = tpAndCheckpoint.getLong(OFFSET_NAME);
                long timestamp = tpAndCheckpoint.getLong(TIMESTAMP_NAME);
                String info = tpAndCheckpoint.getString(INFO_NAME);
                // Build the key once and share it with the checkpoint instead of allocating twice.
                TopicPartition topicPartition = new TopicPartition(topic, partition);
                checkpointInfo.put(topicPartition, new Checkpoint(topicPartition, timestamp, offset, info));
            }
        }

        return new StoreElement(groupName, checkpointInfo);
    }

    @Override
    public Future<Checkpoint> serializeTo(TopicPartition topicPartition, String groupID, Checkpoint value) {
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



src/main/java/com/aliyun/dts/subscribe/clients/metastore/LocalFileMetaStore.java [43:80]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    /**
     * Renders a store element as a JSON string.
     *
     * <p>The resulting object carries the group name under {@code GROUP_ID_NAME} and, under
     * {@code STREAM_CHECKPOINT_NAME}, an array with one object per entry of
     * {@code storeElement.streamCheckpoint}. Each array object holds the topic, partition,
     * offset, timestamp and info of that entry.
     *
     * @param storeElement the element whose group name and checkpoints are written
     * @return the JSON text of the element
     */
    private String toJson(StoreElement storeElement) {
        // One JSON object per (topic-partition, checkpoint) pair.
        JSONArray checkpoints = new JSONArray();
        for (Map.Entry<TopicPartition, Checkpoint> entry : storeElement.streamCheckpoint.entrySet()) {
            TopicPartition topicPartition = entry.getKey();
            Checkpoint current = entry.getValue();

            JSONObject item = new JSONObject();
            item.put(TOPIC_NAME, topicPartition.topic());
            item.put(PARTITION_NAME, topicPartition.partition());
            item.put(OFFSET_NAME, current.getOffset());
            item.put(TIMESTAMP_NAME, current.getTimeStamp());
            item.put(INFO_NAME, current.getInfo());
            checkpoints.add(item);
        }

        // Group name goes in first, then the checkpoint array.
        JSONObject root = new JSONObject();
        root.put(GROUP_ID_NAME, storeElement.groupName);
        root.put(STREAM_CHECKPOINT_NAME, checkpoints);
        return root.toJSONString();
    }

    /**
     * Rebuilds a {@link StoreElement} from JSON text in the layout written by {@code toJson}.
     *
     * <p>The group name is read from {@code GROUP_ID_NAME}. Every object in the
     * {@code STREAM_CHECKPOINT_NAME} array is turned into a {@link Checkpoint} keyed by its
     * {@link TopicPartition}. When that array is absent the element is returned with an empty
     * checkpoint map instead of failing on a null iteration.
     *
     * <p>The partition, offset and timestamp values are unboxed into primitives, so an array
     * entry lacking one of them still fails with a {@link NullPointerException}.
     *
     * @param jsonString the JSON text to parse
     * @return the element holding the parsed group name and checkpoints
     */
    private StoreElement fromString(String jsonString) {
        JSONObject jsonObject = JSONObject.parseObject(jsonString);
        String groupName = jsonObject.getString(GROUP_ID_NAME);
        JSONArray checkpointArray = jsonObject.getJSONArray(STREAM_CHECKPOINT_NAME);
        Map<TopicPartition, Checkpoint> checkpointInfo = new HashMap<>();
        // A record without the checkpoint array simply contributes no checkpoints.
        if (checkpointArray != null) {
            for (Object o : checkpointArray) {
                JSONObject tpAndCheckpoint = (JSONObject) o;
                String topic = tpAndCheckpoint.getString(TOPIC_NAME);
                int partition = tpAndCheckpoint.getInteger(PARTITION_NAME);
                long offset = tpAndCheckpoint.getLong(OFFSET_NAME);
                long timestamp = tpAndCheckpoint.getLong(TIMESTAMP_NAME);
                String info = tpAndCheckpoint.getString(INFO_NAME);
                // Build the key once and share it with the checkpoint instead of allocating twice.
                TopicPartition topicPartition = new TopicPartition(topic, partition);
                checkpointInfo.put(topicPartition, new Checkpoint(topicPartition, timestamp, offset, info));
            }
        }

        return new StoreElement(groupName, checkpointInfo);
    }

    @Override
    public Future<Checkpoint> serializeTo(TopicPartition topicPartition, String groupID, Checkpoint value) {
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



