public SamplerResponse sample()

in indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java [97:292]


  public SamplerResponse sample(
      final InputSource inputSource,
      // inputFormat can be null only if inputSource.needsFormat() returns false or a parser is specified.
      @Nullable final InputFormat inputFormat,
      @Nullable final DataSchema dataSchema,
      @Nullable final SamplerConfig samplerConfig
  )
  {
    Preconditions.checkNotNull(inputSource, "inputSource required");
    if (inputSource.needsFormat()) {
      Preconditions.checkNotNull(inputFormat, "inputFormat required");
    }
    final DataSchema nonNullDataSchema = dataSchema == null
                                         ? DEFAULT_DATA_SCHEMA
                                         : dataSchema;
    final SamplerConfig nonNullSamplerConfig = samplerConfig == null
                                               ? SamplerConfig.empty()
                                               : samplerConfig;

    final Closer closer = Closer.create();
    final File tempDir = FileUtils.createTempDir();
    closer.register(() -> FileUtils.deleteDirectory(tempDir));

    try {
      final InputSourceReader reader = buildReader(
          nonNullSamplerConfig,
          nonNullDataSchema,
          inputSource,
          inputFormat,
          tempDir
      );
      try (final CloseableIterator<InputRowListPlusRawValues> iterator = reader.sample();
           final IncrementalIndex index = buildIncrementalIndex(nonNullSamplerConfig, nonNullDataSchema);
           final Closer closer1 = closer) {
        List<SamplerResponseRow> responseRows = new ArrayList<>(nonNullSamplerConfig.getNumRows());
        int numRowsIndexed = 0;

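        // Keep reading until we have enough response rows, the incremental index hits its
        // in-memory byte limit, or the input is exhausted.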
        while (
            responseRows.size() < nonNullSamplerConfig.getNumRows() &&
            index.getBytesInMemory().get() < nonNullSamplerConfig.getMaxBytesInMemory() &&
            iterator.hasNext()
        ) {
          final InputRowListPlusRawValues inputRowListPlusRawValues = iterator.next();

          final List<Map<String, Object>> rawColumnsList = inputRowListPlusRawValues.getRawValuesList();

          final ParseException parseException = inputRowListPlusRawValues.getParseException();
          if (parseException != null) {
            if (rawColumnsList != null) {
              // add all rows to response
              responseRows.addAll(rawColumnsList.stream()
                                                .map(rawColumns -> new SamplerResponseRow(
                                                    rawColumns,
                                                    null,
                                                    true,
                                                    parseException.getMessage()
                                                ))
                                                .collect(Collectors.toList()));
            } else {
              // no data parsed, add one response row
              responseRows.add(new SamplerResponseRow(null, null, true, parseException.getMessage()));
            }
            continue;
          }

          List<InputRow> inputRows = inputRowListPlusRawValues.getInputRows();
          if (inputRows == null) {
            continue;
          }

          for (int i = 0; i < inputRows.size(); i++) {
            // InputRowListPlusRawValues guarantees the size of rawColumnsList and inputRows are the same
            Map<String, Object> rawColumns = rawColumnsList == null ? null : rawColumnsList.get(i);
            InputRow row = inputRows.get(i);

            // keep the index of this row in responseRows so its parsed values can be merged back in later
            final int rowIndex = responseRows.size();
            IncrementalIndexAddResult addResult = index.add(new SamplerInputRow(row, rowIndex), true);
            if (addResult.hasParseException()) {
              responseRows.add(new SamplerResponseRow(
                  rawColumns,
                  null,
                  true,
                  addResult.getParseException().getMessage()
              ));
            } else {
              // store the raw value; will be merged with the data from the IncrementalIndex later
              responseRows.add(new SamplerResponseRow(rawColumns, null, null, null));
              numRowsIndexed++;
            }
          }
        }

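        // Collect the column names discovered while indexing, dropping the internal ordering
        // column that only exists to map indexed rows back to their slot in responseRows.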
        final List<String> columnNames = index.getColumnNames();
        columnNames.remove(SamplerInputRow.SAMPLER_ORDERING_COLUMN);
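        // Merge the parsed values from the IncrementalIndex back into the matching response rows;
        // the ordering column holds the responseRows index recorded when the row was added.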
        for (Row row : index) {
          Map<String, Object> parsed = new LinkedHashMap<>();

          parsed.put(ColumnHolder.TIME_COLUMN_NAME, row.getTimestampFromEpoch());
          columnNames.stream()
                     .filter(k -> !ColumnHolder.TIME_COLUMN_NAME.equals(k))
                     .forEach(k -> parsed.put(k, row.getRaw(k)));

          Number sortKey = row.getMetric(SamplerInputRow.SAMPLER_ORDERING_COLUMN);
          if (sortKey != null) {
            SamplerResponseRow theRow = responseRows.get(sortKey.intValue()).withParsed(parsed);
            responseRows.set(sortKey.intValue(), theRow);
          }
        }

        // make sure the number of response rows does not exceed the requested numRows
        if (responseRows.size() > nonNullSamplerConfig.getNumRows()) {
          responseRows = responseRows.subList(0, nonNullSamplerConfig.getNumRows());
        }

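        // If a client response byte limit is configured, estimate the serialized size row by row
        // and truncate the response (and the indexed-row count) once the limit is exceeded.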
        if (nonNullSamplerConfig.getMaxClientResponseBytes() > 0) {
          long estimatedResponseSize = 0;
          boolean limited = false;
          int rowCounter = 0;
          int parsedCounter = 0;
          for (SamplerResponseRow row : responseRows) {
            rowCounter++;
            if (row.getInput() != null) {
              parsedCounter++;
            }
            estimatedResponseSize += jsonMapper.writeValueAsBytes(row).length;
            if (estimatedResponseSize > nonNullSamplerConfig.getMaxClientResponseBytes()) {
              limited = true;
              break;
            }
          }
          if (limited) {
            responseRows = responseRows.subList(0, rowCounter);
            numRowsIndexed = parsedCounter;
          }
        }

        int numRowsRead = responseRows.size();

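        // Derive the logical and physical dimension schemas and the row signature from the
        // dimensions the index discovered; the time column is added explicitly and the internal
        // ordering column is skipped.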
        List<DimensionSchema> logicalDimensionSchemas = new ArrayList<>();
        List<DimensionSchema> physicalDimensionSchemas = new ArrayList<>();

        RowSignature.Builder signatureBuilder = RowSignature.builder();
        for (final String dimensionName : index.getDimensionNames(true)) {
          if (ColumnHolder.TIME_COLUMN_NAME.equals(dimensionName)) {
            signatureBuilder.addTimeColumn();
          } else if (!SamplerInputRow.SAMPLER_ORDERING_COLUMN.equals(dimensionName)) {
            final IncrementalIndex.DimensionDesc dimensionDesc = index.getDimension(dimensionName);
            signatureBuilder.add(dimensionDesc.getName(), ColumnType.fromCapabilities(dimensionDesc.getCapabilities()));
            // use explicitly specified dimension schema if it exists
            if (dataSchema != null &&
                dataSchema.getDimensionsSpec() != null &&
                dataSchema.getDimensionsSpec().getSchema(dimensionDesc.getName()) != null) {
              logicalDimensionSchemas.add(dataSchema.getDimensionsSpec().getSchema(dimensionDesc.getName()));
            } else {
              logicalDimensionSchemas.add(
                  DimensionSchema.getDefaultSchemaForBuiltInType(
                      dimensionDesc.getName(),
                      dimensionDesc.getCapabilities()
                  )
              );
            }
            physicalDimensionSchemas.add(
                dimensionDesc.getIndexer().getFormat().getColumnSchema(dimensionDesc.getName())
            );
          }
        }
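        // Metric columns produced by the aggregators also go into the signature, again skipping
        // the internal ordering column.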
        for (AggregatorFactory aggregatorFactory : index.getMetricAggs()) {
          if (!SamplerInputRow.SAMPLER_ORDERING_COLUMN.equals(aggregatorFactory.getName())) {
            signatureBuilder.add(
                aggregatorFactory.getName(),
                ColumnType.fromCapabilities(index.getColumnCapabilities(aggregatorFactory.getName()))
            );
          }
        }

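        // Only rows that were parsed successfully or explicitly marked unparseable are returned;
        // placeholder rows that never received parsed data are dropped.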
        return new SamplerResponse(
            numRowsRead,
            numRowsIndexed,
            logicalDimensionSchemas,
            physicalDimensionSchemas,
            signatureBuilder.build(),
            responseRows.stream()
                        .filter(Objects::nonNull)
                        .filter(x -> x.getParsed() != null || x.isUnparseable() != null)
                        .collect(Collectors.toList())
        );
      }
    }
    catch (Exception e) {
      throw new SamplerException(e, "Failed to sample data: %s", e.getMessage());
    }
  }
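
A minimal caller sketch for reference. It assumes an already-constructed InputSourceSampler and an InputSource/InputFormat pair supplied by the caller; the SamplerResponse getter names (getNumRowsRead, getNumRowsIndexed, getData) are inferred from the constructor arguments above, and dataSchema/samplerConfig are passed as null so the defaults shown at the top of the method apply.

  // Hypothetical helper, not part of InputSourceSampler: runs one sampling pass and reports
  // how many rows were read and indexed, and which rows failed to parse.
  static void printSampleSummary(
      InputSourceSampler sampler,
      InputSource inputSource,
      @Nullable InputFormat inputFormat // may be null when inputSource.needsFormat() is false
  )
  {
    // Null dataSchema/samplerConfig fall back to DEFAULT_DATA_SCHEMA and SamplerConfig.empty().
    SamplerResponse response = sampler.sample(inputSource, inputFormat, null, null);

    System.out.printf(
        "rows read: %d, rows indexed: %d%n",
        response.getNumRowsRead(),
        response.getNumRowsIndexed()
    );

    for (SamplerResponseRow row : response.getData()) {
      if (Boolean.TRUE.equals(row.isUnparseable())) {
        System.out.println("unparseable input: " + row.getInput());
      } else {
        System.out.println("parsed: " + row.getParsed());
      }
    }
  }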