in web-console/src/helpers/spec-conversion.ts [78:369]
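
/**
 * Converts a classic batch ingestion spec ('index_parallel', 'index', or 'index_hadoop')
 * into an equivalent SQL-based ingestion query plus the query context it needs.
 *
 * Minimal usage sketch (the spec object below is hypothetical):
 *
 *   const { queryString, queryContext } = convertSpecToSql(someIndexParallelSpec);
 *   // queryString holds the generated INSERT/REPLACE ... SELECT statement,
 *   // queryContext carries settings such as indexSpec.
 */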
export function convertSpecToSql(spec: any): QueryWithContext {
  if (!oneOf(spec.type, 'index_parallel', 'index', 'index_hadoop')) {
    throw new Error('Only index_parallel, index, and index_hadoop specs are supported');
  }
  spec = upgradeSpec(spec, true);
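
  // Carry the tuningConfig indexSpec, if present, over into the query context.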
  const context: QueryContext = {};

  const indexSpec = deepGet(spec, 'spec.tuningConfig.indexSpec');
  if (indexSpec) {
    context.indexSpec = indexSpec;
  }
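
  // Build the query one line of SQL text at a time, starting with a header comment.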
  const lines: string[] = [`-- This SQL query was auto generated from an ingestion spec`];

  lines.push(`SET arrayIngestMode = ${L(getArrayIngestMode(spec, DEFAULT_ARRAY_INGEST_MODE))};`);
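
  // Translate relevant tuningConfig and dimensionsSpec settings into SET statements.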
  const forceSegmentSortByTime = deepGet(
    spec,
    'spec.dataSchema.dimensionsSpec.forceSegmentSortByTime',
  );
  if (typeof forceSegmentSortByTime !== 'undefined') {
    lines.push(`SET forceSegmentSortByTime = ${L(forceSegmentSortByTime)};`);
  }

  const maxNumConcurrentSubTasks = deepGet(spec, 'spec.tuningConfig.maxNumConcurrentSubTasks');
  if (maxNumConcurrentSubTasks > 1) {
    lines.push(`SET maxNumTasks = ${maxNumConcurrentSubTasks + 1};`);
  }

  const maxParseExceptions = deepGet(spec, 'spec.tuningConfig.maxParseExceptions');
  if (typeof maxParseExceptions === 'number') {
    lines.push(`SET maxParseExceptions = ${maxParseExceptions};`);
  }

  const rollup = deepGet(spec, 'spec.dataSchema.granularitySpec.rollup') ?? true;

  if (nonEmptyArray(deepGet(spec, 'spec.dataSchema.dimensionsSpec.spatialDimensions'))) {
    throw new Error(`spatialDimensions are not currently supported in SQL-based ingestion`);
  }
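
  // Validate the timestampSpec and dimensions, and declare an input column for each dimension.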
  const timestampSpec: TimestampSpec = deepGet(spec, 'spec.dataSchema.timestampSpec');
  if (!timestampSpec) throw new Error(`spec.dataSchema.timestampSpec is not defined`);

  const specDimensions: (string | DimensionSpec)[] = deepGet(
    spec,
    'spec.dataSchema.dimensionsSpec.dimensions',
  );
  if (!Array.isArray(specDimensions)) {
    throw new Error(`spec.dataSchema.dimensionsSpec.dimensions must be an array`);
  }
  const dimensions = specDimensions.map(inflateDimensionSpec);

  let columnDeclarations: SqlColumnDeclaration[] = dimensions.map(d =>
    SqlColumnDeclaration.create(d.name, dimensionSpecToSqlType(d)),
  );
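
  // Metrics that read a fieldName also need an input column declaration of the matching type.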
  const metricsSpec = deepGet(spec, 'spec.dataSchema.metricsSpec');
  if (Array.isArray(metricsSpec)) {
    columnDeclarations = columnDeclarations.concat(
      filterMap(metricsSpec, metricSpec =>
        metricSpec.fieldName
          ? SqlColumnDeclaration.create(
              metricSpec.fieldName,
              SqlType.fromNativeType(metricSpecTypeToNativeDataInputType(metricSpec.type)),
            )
          : undefined,
      ),
    );
  }

  columnDeclarations = dedupe(columnDeclarations, d => d.getColumnName());

  const transforms: Transform[] = deepGet(spec, 'spec.dataSchema.transformSpec.transforms') || [];
  if (!Array.isArray(transforms)) {
    throw new Error(`spec.dataSchema.transformSpec.transforms is not an array`);
  }
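
  // Derive the SQL expression that will populate __time, based on the timestampSpec,
  // any transform that targets __time, and the queryGranularity.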
  let timeExpression: string;
  const timestampColumnName = timestampSpec.column || 'timestamp';
  const timeTransform = transforms.find(t => t.name === TIME_COLUMN);
  if (timestampColumnName === NO_SUCH_COLUMN) {
    timeExpression = timestampSpec.missingValue
      ? `TIME_PARSE(${L(timestampSpec.missingValue)})`
      : `TIMESTAMP '1970-01-01'`;
  } else {
    const timestampColumn = C(timestampColumnName);
    const format = timestampSpec.format || 'auto';
    if (timeTransform) {
      timeExpression = `REWRITE_[${timeTransform.expression}]_TO_SQL`;
    } else if (timestampColumnName === TIME_COLUMN) {
      timeExpression = String(timestampColumn);
      columnDeclarations.unshift(SqlColumnDeclaration.create(timestampColumnName, SqlType.BIGINT));
    } else {
      let timestampColumnType: SqlType;
      switch (format) {
        case 'auto':
          timestampColumnType = SqlType.VARCHAR;
          timeExpression = `CASE WHEN CAST(${timestampColumn} AS BIGINT) > 0 THEN MILLIS_TO_TIMESTAMP(CAST(${timestampColumn} AS BIGINT)) ELSE TIME_PARSE(TRIM(${timestampColumn})) END`;
          break;
        case 'iso':
          timestampColumnType = SqlType.VARCHAR;
          timeExpression = `TIME_PARSE(${timestampColumn})`;
          break;
        case 'posix':
          timestampColumnType = SqlType.BIGINT;
          timeExpression = `MILLIS_TO_TIMESTAMP(${timestampColumn} * 1000)`;
          break;
        case 'millis':
          timestampColumnType = SqlType.BIGINT;
          timeExpression = `MILLIS_TO_TIMESTAMP(${timestampColumn})`;
          break;
        case 'micro':
          timestampColumnType = SqlType.BIGINT;
          timeExpression = `MILLIS_TO_TIMESTAMP(${timestampColumn} / 1000)`;
          break;
        case 'nano':
          timestampColumnType = SqlType.BIGINT;
          timeExpression = `MILLIS_TO_TIMESTAMP(${timestampColumn} / 1000000)`;
          break;
        default:
          timestampColumnType = SqlType.VARCHAR;
          timeExpression = `TIME_PARSE(${timestampColumn}, ${L(format)})`;
          break;
      }
      columnDeclarations.unshift(
        SqlColumnDeclaration.create(timestampColumnName, timestampColumnType),
      );
    }

    if (timestampSpec.missingValue) {
      timeExpression = `COALESCE(${timeExpression}, TIME_PARSE(${L(timestampSpec.missingValue)}))`;
    }

    timeExpression = convertQueryGranularity(
      timeExpression,
      deepGet(spec, 'spec.dataSchema.granularitySpec.queryGranularity'),
    );
  }
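
  // Always disable aggregation finalization and multi-value unnesting in the generated query.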
  lines.push(`SET finalizeAggregations = FALSE;`, `SET groupByEnableMultiValueUnnesting = FALSE;`);

  const dataSource = deepGet(spec, 'spec.dataSchema.dataSource');
  if (typeof dataSource !== 'string') throw new Error(`spec.dataSchema.dataSource is not a string`);
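
  // appendToExisting maps to INSERT INTO; otherwise REPLACE INTO is used, overwriting only
  // the configured intervals when dropExisting is set and everything otherwise.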
  if (deepGet(spec, 'spec.ioConfig.appendToExisting')) {
    lines.push(`INSERT INTO ${T(dataSource)}`);
  } else {
    const overwrite = deepGet(spec, 'spec.ioConfig.dropExisting')
      ? 'WHERE ' +
        SqlExpression.fromTimeExpressionAndInterval(
          C('__time'),
          deepGet(spec, 'spec.dataSchema.granularitySpec.intervals'),
        )
      : 'ALL';

    lines.push(`REPLACE INTO ${T(dataSource)} OVERWRITE ${overwrite}`);
  }
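
  // Resolve the input source. For Hadoop specs, only 'static' inputSpecs can be mapped
  // onto an equivalent native input source.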
  let inputSource: any;
  if (oneOf(spec.type, 'index_parallel', 'index')) {
    inputSource = deepGet(spec, 'spec.ioConfig.inputSource');
    if (!inputSource) throw new Error(`spec.ioConfig.inputSource is not defined`);
  } else {
    // index_hadoop
    const inputSpec = deepGet(spec, 'spec.ioConfig.inputSpec');
    if (!inputSpec) throw new Error(`spec.ioConfig.inputSpec is not defined`);

    if (inputSpec.type !== 'static') {
      throw new Error(`can only convert when spec.ioConfig.inputSpec.type = 'static'`);
    }

    const paths = inputSpec.paths.split(',');
    const firstPath = paths[0];
    if (firstPath.startsWith('s3://')) {
      inputSource = { type: 's3', uris: paths };
    } else if (firstPath.startsWith('gs://')) {
      inputSource = { type: 'google', uris: paths };
    } else if (firstPath.startsWith('hdfs://')) {
      inputSource = { type: 'hdfs', paths };
    } else {
      throw new Error(`unsupported path scheme in spec.ioConfig.inputSpec.paths: ${firstPath}`);
    }
  }
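
  // A 'druid' input source becomes a plain sub-query on the source datasource;
  // every other input source is read through EXTERN with the declared columns.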
  if (inputSource.type === 'druid') {
    lines.push(
      `WITH ${SOURCE_TABLE} AS (`,
      ` SELECT *`,
      ` FROM ${T(inputSource.dataSource)}`,
      ` WHERE ${SqlExpression.fromTimeExpressionAndInterval(C('__time'), inputSource.interval)}`,
      ')',
    );
  } else {
    lines.push(
      `WITH ${SOURCE_TABLE} AS (SELECT * FROM TABLE(`,
      ` EXTERN(`,
      ` ${L(JSONBig.stringify(inputSource))},`,
    );

    const inputFormat = deepGet(spec, 'spec.ioConfig.inputFormat');
    if (!inputFormat) throw new Error(`spec.ioConfig.inputFormat is not defined`);
    lines.push(
      ` ${L(JSONBig.stringify(inputFormat))}`,
      ` )`,
      `) EXTEND (${columnDeclarations.join(', ')}))`,
    );
  }
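
  // Build the SELECT list: __time first, then dimensions, then metric aggregators,
  // flagging any transforms that need to be converted by hand.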
  lines.push(`SELECT`);

  if (transforms.length) {
    lines.push(
      ` --:ISSUE: The spec contained transforms that could not be automatically converted.`,
    );
  }

  const dimensionExpressions = [
    ` ${timeExpression} AS "__time",${
      timeTransform ? ` --:ISSUE: Transform for __time could not be converted` : ''
    }`,
  ].concat(
    dimensions.flatMap((dimension: DimensionSpec) => {
      const dimensionName = dimension.name;
      const relevantTransform = transforms.find(t => t.name === dimensionName);
      return ` ${
        relevantTransform ? `REWRITE_[${relevantTransform.expression}]_TO_SQL AS ` : ''
      }${C(dimensionName)},${
        relevantTransform ? ` --:ISSUE: Transform for dimension could not be converted` : ''
      }`;
    }),
  );

  const selectExpressions = dimensionExpressions.concat(
    Array.isArray(metricsSpec)
      ? metricsSpec.map(metricSpec => ` ${metricSpecToSelect(metricSpec)},`)
      : [],
  );

  // Remove trailing comma from last expression
  selectExpressions[selectExpressions.length - 1] = selectExpressions[selectExpressions.length - 1]
    .replace(/,$/, '')
    .replace(/,(\s+--)/, '$1');

  lines.push(selectExpressions.join('\n'));
  lines.push(`FROM ${SOURCE_TABLE}`);
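
  // Convert the transformSpec filter into a WHERE clause where possible.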
  const filter = deepGet(spec, 'spec.dataSchema.transformSpec.filter');
  if (filter) {
    try {
      lines.push(`WHERE ${convertFilter(filter)}`);
    } catch {
      lines.push(
        `WHERE REWRITE_[${JSONBig.stringify(
          filter,
        )}]_TO_SQL --:ISSUE: The spec contained a filter that could not be automatically converted, please convert it manually`,
      );
    }
  }
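
  // With rollup enabled, group by every dimension expression (referenced by ordinal).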
  if (rollup) {
    lines.push(`GROUP BY ${dimensionExpressions.map((_, i) => i + 1).join(', ')}`);
  }
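
  // segmentGranularity maps to PARTITIONED BY; 'all' becomes ALL TIME.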
  const segmentGranularity = deepGet(spec, 'spec.dataSchema.granularitySpec.segmentGranularity');
  if (typeof segmentGranularity !== 'string') {
    throw new Error(`spec.dataSchema.granularitySpec.segmentGranularity is not a string`);
  }
  lines.push(
    `PARTITIONED BY ${
      segmentGranularity.toLowerCase() === 'all' ? 'ALL TIME' : segmentGranularity.toUpperCase()
    }`,
  );
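
  // partitionsSpec dimensions map to CLUSTERED BY.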
  const partitionsSpec = deepGet(spec, 'spec.tuningConfig.partitionsSpec') || {};
  const partitionDimensions =
    partitionsSpec.partitionDimensions ||
    (partitionsSpec.partitionDimension ? [partitionsSpec.partitionDimension] : undefined);
  if (Array.isArray(partitionDimensions)) {
    lines.push(`CLUSTERED BY ${partitionDimensions.map(d => C(d)).join(', ')}`);
  }
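
  // Return the assembled SQL text together with the collected query context.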
  return {
    queryString: lines.join('\n'),
    queryContext: context,
  };
}