src/main/java/org/apache/flink/connector/rocketmq/source/InnerConsumerImpl.java [370:388]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
                                                                                            messageQueue))
                                                            .thenAccept(
                                                                    future -> {
                                                                        try {
                                                                            result.put(
                                                                                    messageQueue,
                                                                                    future.get());
                                                                        } catch (Exception e) {
                                                                            LOG.error(
                                                                                    "Consumer offsets retriever fetch committed offset error",
                                                                                    e);
                                                                        }
                                                                    }))
                                    .toArray(CompletableFuture[]::new))
                    .join();
            return result;
        }

        @Override
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



src/main/java/org/apache/flink/connector/rocketmq/source/InnerConsumerImpl.java [428:446]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
                                                                                            messageQueue))
                                                            .thenAccept(
                                                                    future -> {
                                                                        try {
                                                                            result.put(
                                                                                    messageQueue,
                                                                                    future.get());
                                                                        } catch (Exception e) {
                                                                            LOG.error(
                                                                                    "Consumer offsets retriever fetch committed offset error",
                                                                                    e);
                                                                        }
                                                                    }))
                                    .toArray(CompletableFuture[]::new))
                    .join();
            return result;
        }

        @Override
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



