in indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java [97:292]
public SamplerResponse sample(
final InputSource inputSource,
// inputFormat may be null only if inputSource.needsFormat() returns false or a parser is specified.
@Nullable final InputFormat inputFormat,
@Nullable final DataSchema dataSchema,
@Nullable final SamplerConfig samplerConfig
)
{
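// Overall flow: (1) validate inputs and fall back to defaults, (2) read up to
// numRows rows through an InputSourceReader, feeding each into a temporary
// IncrementalIndex, (3) merge the indexed (parsed) form of each row back onto
// the raw response rows, and (4) derive dimension schemas and a RowSignature
// from the index before building the SamplerResponse.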
Preconditions.checkNotNull(inputSource, "inputSource required");
if (inputSource.needsFormat()) {
Preconditions.checkNotNull(inputFormat, "inputFormat required");
}
final DataSchema nonNullDataSchema = dataSchema == null
? DEFAULT_DATA_SCHEMA
: dataSchema;
final SamplerConfig nonNullSamplerConfig = samplerConfig == null
? SamplerConfig.empty()
: samplerConfig;
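// The temp directory gives the reader scratch space (e.g. for fetched data);
// registering its deletion with the closer ties cleanup to the
// try-with-resources block below.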
final Closer closer = Closer.create();
final File tempDir = FileUtils.createTempDir();
closer.register(() -> FileUtils.deleteDirectory(tempDir));
try {
final InputSourceReader reader = buildReader(
nonNullSamplerConfig,
nonNullDataSchema,
inputSource,
inputFormat,
tempDir
);
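// Aliasing `closer` as a try-with-resources resource guarantees the temp
// directory registered above is deleted whether sampling completes normally
// or throws.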
try (final CloseableIterator<InputRowListPlusRawValues> iterator = reader.sample();
final IncrementalIndex index = buildIncrementalIndex(nonNullSamplerConfig, nonNullDataSchema);
final Closer closer1 = closer) {
List<SamplerResponseRow> responseRows = new ArrayList<>(nonNullSamplerConfig.getNumRows());
int numRowsIndexed = 0;
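// Stop reading once we have enough response rows, once the index exceeds its
// in-memory byte budget, or once the input is exhausted, whichever comes first.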
while (
responseRows.size() < nonNullSamplerConfig.getNumRows() &&
index.getBytesInMemory().get() < nonNullSamplerConfig.getMaxBytesInMemory() &&
iterator.hasNext()
) {
final InputRowListPlusRawValues inputRowListPlusRawValues = iterator.next();
final List<Map<String, Object>> rawColumnsList = inputRowListPlusRawValues.getRawValuesList();
final ParseException parseException = inputRowListPlusRawValues.getParseException();
if (parseException != null) {
if (rawColumnsList != null) {
// raw values were read but failed to parse; report every raw row with the parse error
responseRows.addAll(rawColumnsList.stream()
.map(rawColumns -> new SamplerResponseRow(
rawColumns,
null,
true,
parseException.getMessage()
))
.collect(Collectors.toList()));
} else {
// nothing was read at all; add a single error row carrying the parse exception
responseRows.add(new SamplerResponseRow(null, null, true, parseException.getMessage()));
}
continue;
}
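// A null input row list means nothing was parsed for this chunk, so there is
// nothing to index or to report.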
List<InputRow> inputRows = inputRowListPlusRawValues.getInputRows();
if (inputRows == null) {
continue;
}
for (int i = 0; i < inputRows.size(); i++) {
// InputRowListPlusRawValues guarantees that rawColumnsList and inputRows have the same size
Map<String, Object> rawColumns = rawColumnsList == null ? null : rawColumnsList.get(i);
InputRow row = inputRows.get(i);
// remember this row's position in responseRows; the parsed values are merged back in at that position after indexing
final int rowIndex = responseRows.size();
IncrementalIndexAddResult addResult = index.add(new SamplerInputRow(row, rowIndex), true);
if (addResult.hasParseException()) {
responseRows.add(new SamplerResponseRow(
rawColumns,
null,
true,
addResult.getParseException().getMessage()
));
} else {
// store only the raw values for now; the parsed data from the IncrementalIndex is merged in later
responseRows.add(new SamplerResponseRow(rawColumns, null, null, null));
numRowsIndexed++;
}
}
}
final List<String> columnNames = index.getColumnNames();
columnNames.remove(SamplerInputRow.SAMPLER_ORDERING_COLUMN);
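// SamplerInputRow recorded each row's responseRows position under
// SAMPLER_ORDERING_COLUMN. Walk the index and, for every surviving row, copy
// its parsed column values onto the matching response row; rows that were
// rolled up into another row keep a null parsed form and are dropped by the
// filter in the final return statement.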
for (Row row : index) {
Map<String, Object> parsed = new LinkedHashMap<>();
parsed.put(ColumnHolder.TIME_COLUMN_NAME, row.getTimestampFromEpoch());
columnNames.stream()
.filter(k -> !ColumnHolder.TIME_COLUMN_NAME.equals(k))
.forEach(k -> parsed.put(k, row.getRaw(k)));
Number sortKey = row.getMetric(SamplerInputRow.SAMPLER_ORDERING_COLUMN);
if (sortKey != null) {
SamplerResponseRow theRow = responseRows.get(sortKey.intValue()).withParsed(parsed);
responseRows.set(sortKey.intValue(), theRow);
}
}
// trim the response to at most the requested number of rows
if (responseRows.size() > nonNullSamplerConfig.getNumRows()) {
responseRows = responseRows.subList(0, nonNullSamplerConfig.getNumRows());
}
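// If a client response size cap is set, estimate the payload by serializing
// rows one at a time and cut the response off where the cap is crossed.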
if (nonNullSamplerConfig.getMaxClientResponseBytes() > 0) {
long estimatedResponseSize = 0;
boolean limited = false;
int rowCounter = 0;
int parsedCounter = 0;
for (SamplerResponseRow row : responseRows) {
rowCounter++;
if (row.getInput() != null) {
parsedCounter++;
}
estimatedResponseSize += jsonMapper.writeValueAsBytes(row).length;
if (estimatedResponseSize > nonNullSamplerConfig.getMaxClientResponseBytes()) {
limited = true;
break;
}
}
if (limited) {
responseRows = responseRows.subList(0, rowCounter);
numRowsIndexed = parsedCounter;
}
}
int numRowsRead = responseRows.size();
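// Derive the logical/physical dimension schemas and the row signature from
// what the IncrementalIndex actually observed, skipping the internal
// ordering column.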
List<DimensionSchema> logicalDimensionSchemas = new ArrayList<>();
List<DimensionSchema> physicalDimensionSchemas = new ArrayList<>();
RowSignature.Builder signatureBuilder = RowSignature.builder();
for (final String dimensionName : index.getDimensionNames(true)) {
if (ColumnHolder.TIME_COLUMN_NAME.equals(dimensionName)) {
signatureBuilder.addTimeColumn();
} else if (!SamplerInputRow.SAMPLER_ORDERING_COLUMN.equals(dimensionName)) {
final IncrementalIndex.DimensionDesc dimensionDesc = index.getDimension(dimensionName);
signatureBuilder.add(dimensionDesc.getName(), ColumnType.fromCapabilities(dimensionDesc.getCapabilities()));
// prefer an explicitly specified dimension schema when one exists
if (dataSchema != null &&
dataSchema.getDimensionsSpec() != null &&
dataSchema.getDimensionsSpec().getSchema(dimensionDesc.getName()) != null) {
logicalDimensionSchemas.add(dataSchema.getDimensionsSpec().getSchema(dimensionDesc.getName()));
} else {
logicalDimensionSchemas.add(
DimensionSchema.getDefaultSchemaForBuiltInType(
dimensionDesc.getName(),
dimensionDesc.getCapabilities()
)
);
}
physicalDimensionSchemas.add(
dimensionDesc.getIndexer().getFormat().getColumnSchema(dimensionDesc.getName())
);
}
}
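// Metric aggregators contribute to the signature as well, again excluding the
// internal ordering column.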
for (AggregatorFactory aggregatorFactory : index.getMetricAggs()) {
if (!SamplerInputRow.SAMPLER_ORDERING_COLUMN.equals(aggregatorFactory.getName())) {
signatureBuilder.add(
aggregatorFactory.getName(),
ColumnType.fromCapabilities(index.getColumnCapabilities(aggregatorFactory.getName()))
);
}
}
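// Drop rows that were neither parsed nor marked unparseable: their data was
// rolled up into another response row.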
return new SamplerResponse(
numRowsRead,
numRowsIndexed,
logicalDimensionSchemas,
physicalDimensionSchemas,
signatureBuilder.build(),
responseRows.stream()
.filter(Objects::nonNull)
.filter(x -> x.getParsed() != null || x.isUnparseable() != null)
.collect(Collectors.toList())
);
}
}
catch (Exception e) {
throw new SamplerException(e, "Failed to sample data: %s", e.getMessage());
}
}
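// Illustrative usage (not part of this file). Names and constructor arities
// are assumptions and vary across Druid versions; shown only to make the null
// fallbacks above concrete:
//
//   InputSourceSampler sampler = new InputSourceSampler(objectMapper);
//   SamplerResponse response = sampler.sample(
//       new InlineInputSource("{\"time\": \"2024-01-01T00:00:00Z\", \"x\": 1}"),
//       new JsonInputFormat(null, null, null, null, null),
//       null,   // dataSchema == null -> DEFAULT_DATA_SCHEMA
//       null    // samplerConfig == null -> SamplerConfig.empty()
//   );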