private void loadStruct()

in hugegraph-loader/src/main/java/org/apache/hugegraph/loader/HugeGraphLoader.java [227:276]


    /**
     * Drains the given reader for one input struct, collecting the lines
     * into batches and running the parse tasks built from each batch.
     *
     * <p>The loop ends when the reader reports no more lines, when
     * {@code reachedMaxReadLines()} returns true, or when the load context
     * reports that it has been stopped. In the last case the loop exits
     * immediately and a partially filled batch is not flushed.
     *
     * <p>A batch is flushed once it holds at least {@code batchSize} lines
     * or when reading is finished. Flushing builds the parse tasks, executes
     * each of them, confirms the reader offset, records progress and then
     * calls {@code handleParseFailure()}.
     *
     * @param struct the input struct being loaded
     * @param reader the source the lines are read from
     */
    private void loadStruct(InputStruct struct, InputReader reader) {
        LOG.info("Start loading '{}'", struct);
        LoadMetrics metrics = this.context.summary().metrics(struct);
        metrics.startInFlight();

        ParseTaskBuilder builder = new ParseTaskBuilder(this.context, struct);
        final int capacity = this.context.options().batchSize;
        List<Line> batch = new ArrayList<>(capacity);
        boolean done = false;
        while (!done) {
            if (this.context.stopped()) {
                break;
            }
            try {
                // Pull one more line, or note that the source is exhausted
                if (!reader.hasNext()) {
                    done = true;
                } else {
                    batch.add(reader.next());
                    metrics.increaseReadSuccess();
                }
            } catch (ReadException e) {
                metrics.increaseReadFailure();
                this.handleReadFailure(struct, e);
            }
            // Hitting the read-line limit ends the loop after this pass
            final boolean hitReadLimit = this.reachedMaxReadLines();
            done = done || hitReadLimit;

            // Keep reading until the batch is full or reading has ended
            if (!done && batch.size() < capacity) {
                continue;
            }

            for (ParseTaskBuilder.ParseTask task : builder.build(batch)) {
                this.executeParseTask(struct, task.mapping(), task);
            }
            // Confirm offset to avoid lost records
            reader.confirmOffset();
            this.context.newProgress().markLoaded(struct, done);

            this.handleParseFailure();
            if (hitReadLimit) {
                LOG.warn("Read lines exceed limit, stopped loading tasks");
                this.context.stopLoading();
            }
            // Start a fresh list instead of clearing the old one; presumably
            // the tasks built above may still reference it - TODO confirm
            batch = new ArrayList<>(capacity);
        }

        // NOTE(review): not reached if anything above throws - confirm that
        // leaving the metrics "in flight" in that case is acceptable
        metrics.stopInFlight();
        LOG.info("Finish loading '{}'", struct);
    }