export function convertSpecToSql()

in web-console/src/helpers/spec-conversion.ts [78:369]


/**
 * Converts a supported native batch ingestion spec ('index_parallel', 'index', or
 * 'index_hadoop') into an equivalent SQL-based ingestion query plus its query context.
 */
export function convertSpecToSql(spec: any): QueryWithContext {
  if (!oneOf(spec.type, 'index_parallel', 'index', 'index_hadoop')) {
    throw new Error('Only index_parallel, index, and index_hadoop specs are supported');
  }
  spec = upgradeSpec(spec, true);

  const context: QueryContext = {};

  // Pass the tuning config's indexSpec through as a query context parameter
  const indexSpec = deepGet(spec, 'spec.tuningConfig.indexSpec');
  if (indexSpec) {
    context.indexSpec = indexSpec;
  }

  const lines: string[] = [`-- This SQL query was auto-generated from an ingestion spec`];

  // Emit SET statements for query parameters derived from the spec's tuning and granularity configs
  lines.push(`SET arrayIngestMode = ${L(getArrayIngestMode(spec, DEFAULT_ARRAY_INGEST_MODE))};`);

  const forceSegmentSortByTime = deepGet(
    spec,
    'spec.dataSchema.dimensionsSpec.forceSegmentSortByTime',
  );
  if (typeof forceSegmentSortByTime !== 'undefined') {
    lines.push(`SET forceSegmentSortByTime = ${L(forceSegmentSortByTime)};`);
  }

  const maxNumConcurrentSubTasks = deepGet(spec, 'spec.tuningConfig.maxNumConcurrentSubTasks');
  if (maxNumConcurrentSubTasks > 1) {
    // maxNumTasks also counts the controller task, hence the +1
    lines.push(`SET maxNumTasks = ${maxNumConcurrentSubTasks + 1};`);
  }

  const maxParseExceptions = deepGet(spec, 'spec.tuningConfig.maxParseExceptions');
  if (typeof maxParseExceptions === 'number') {
    lines.push(`SET maxParseExceptions = ${maxParseExceptions};`);
  }

  // Rollup controls whether a GROUP BY clause is generated at the end of the query
  const rollup = deepGet(spec, 'spec.dataSchema.granularitySpec.rollup') ?? true;

  if (nonEmptyArray(deepGet(spec, 'spec.dataSchema.dimensionsSpec.spatialDimensions'))) {
    throw new Error(`spatialDimensions are not currently supported in SQL-based ingestion`);
  }

  const timestampSpec: TimestampSpec = deepGet(spec, 'spec.dataSchema.timestampSpec');
  if (!timestampSpec) throw new Error(`spec.dataSchema.timestampSpec is not defined`);

  const specDimensions: (string | DimensionSpec)[] = deepGet(
    spec,
    'spec.dataSchema.dimensionsSpec.dimensions',
  );
  if (!Array.isArray(specDimensions)) {
    throw new Error(`spec.dataSchema.dimensionsSpec.dimensions must be an array`);
  }
  const dimensions = specDimensions.map(inflateDimensionSpec);

  // Declare the external input columns: dimensions first, then any metric input fields
  let columnDeclarations: SqlColumnDeclaration[] = dimensions.map(d =>
    SqlColumnDeclaration.create(d.name, dimensionSpecToSqlType(d)),
  );

  const metricsSpec = deepGet(spec, 'spec.dataSchema.metricsSpec');
  if (Array.isArray(metricsSpec)) {
    columnDeclarations = columnDeclarations.concat(
      filterMap(metricsSpec, metricSpec =>
        metricSpec.fieldName
          ? SqlColumnDeclaration.create(
              metricSpec.fieldName,
              SqlType.fromNativeType(metricSpecTypeToNativeDataInputType(metricSpec.type)),
            )
          : undefined,
      ),
    );
  }

  columnDeclarations = dedupe(columnDeclarations, d => d.getColumnName());

  const transforms: Transform[] = deepGet(spec, 'spec.dataSchema.transformSpec.transforms') || [];
  if (!Array.isArray(transforms)) {
    throw new Error(`spec.dataSchema.transformSpec.transforms is not an array`);
  }

  // Derive the SQL expression that populates __time from the timestampSpec (and any __time transform)
  let timeExpression: string;
  const timestampColumnName = timestampSpec.column || 'timestamp';
  const timeTransform = transforms.find(t => t.name === TIME_COLUMN);
  if (timestampColumnName === NO_SUCH_COLUMN) {
    timeExpression = timestampSpec.missingValue
      ? `TIME_PARSE(${L(timestampSpec.missingValue)})`
      : `TIMESTAMP '1970-01-01'`;
  } else {
    const timestampColumn = C(timestampColumnName);
    const format = timestampSpec.format || 'auto';
    if (timeTransform) {
      timeExpression = `REWRITE_[${timeTransform.expression}]_TO_SQL`;
    } else if (timestampColumnName === TIME_COLUMN) {
      timeExpression = String(timestampColumn);
      columnDeclarations.unshift(SqlColumnDeclaration.create(timestampColumnName, SqlType.BIGINT));
    } else {
      let timestampColumnType: SqlType;
      switch (format) {
        case 'auto':
          timestampColumnType = SqlType.VARCHAR;
          timeExpression = `CASE WHEN CAST(${timestampColumn} AS BIGINT) > 0 THEN MILLIS_TO_TIMESTAMP(CAST(${timestampColumn} AS BIGINT)) ELSE TIME_PARSE(TRIM(${timestampColumn})) END`;
          break;

        case 'iso':
          timestampColumnType = SqlType.VARCHAR;
          timeExpression = `TIME_PARSE(${timestampColumn})`;
          break;

        case 'posix':
          timestampColumnType = SqlType.BIGINT;
          timeExpression = `MILLIS_TO_TIMESTAMP(${timestampColumn} * 1000)`;
          break;

        case 'millis':
          timestampColumnType = SqlType.BIGINT;
          timeExpression = `MILLIS_TO_TIMESTAMP(${timestampColumn})`;
          break;

        case 'micro':
          timestampColumnType = SqlType.BIGINT;
          timeExpression = `MILLIS_TO_TIMESTAMP(${timestampColumn} / 1000)`;
          break;

        case 'nano':
          timestampColumnType = SqlType.BIGINT;
          timeExpression = `MILLIS_TO_TIMESTAMP(${timestampColumn} / 1000000)`;
          break;

        default:
          timestampColumnType = SqlType.VARCHAR;
          timeExpression = `TIME_PARSE(${timestampColumn}, ${L(format)})`;
          break;
      }
      columnDeclarations.unshift(
        SqlColumnDeclaration.create(timestampColumnName, timestampColumnType),
      );
    }

    if (timestampSpec.missingValue) {
      timeExpression = `COALESCE(${timeExpression}, TIME_PARSE(${L(timestampSpec.missingValue)}))`;
    }

    timeExpression = convertQueryGranularity(
      timeExpression,
      deepGet(spec, 'spec.dataSchema.granularitySpec.queryGranularity'),
    );
  }

  // Ingestion queries always run with aggregation finalization and multi-value unnesting disabled
  lines.push(`SET finalizeAggregations = FALSE;`, `SET groupByEnableMultiValueUnnesting = FALSE;`);

  const dataSource = deepGet(spec, 'spec.dataSchema.dataSource');
  if (typeof dataSource !== 'string') throw new Error(`spec.dataSchema.dataSource is not a string`);

  // appendToExisting maps to INSERT INTO; otherwise emit REPLACE INTO, scoped to the spec's intervals when dropExisting is set
  if (deepGet(spec, 'spec.ioConfig.appendToExisting')) {
    lines.push(`INSERT INTO ${T(dataSource)}`);
  } else {
    const overwrite = deepGet(spec, 'spec.ioConfig.dropExisting')
      ? 'WHERE ' +
        SqlExpression.fromTimeExpressionAndInterval(
          C('__time'),
          deepGet(spec, 'spec.dataSchema.granularitySpec.intervals'),
        )
      : 'ALL';

    lines.push(`REPLACE INTO ${T(dataSource)} OVERWRITE ${overwrite}`);
  }

  // Resolve the input source; for index_hadoop, map a 'static' inputSpec onto the equivalent native inputSource
  let inputSource: any;
  if (oneOf(spec.type, 'index_parallel', 'index')) {
    inputSource = deepGet(spec, 'spec.ioConfig.inputSource');
    if (!inputSource) throw new Error(`spec.ioConfig.inputSource is not defined`);
  } else {
    // index_hadoop
    const inputSpec = deepGet(spec, 'spec.ioConfig.inputSpec');
    if (!inputSpec) throw new Error(`spec.ioConfig.inputSpec is not defined`);
    if (inputSpec.type !== 'static') {
      throw new Error(`can only convert when spec.ioConfig.inputSpec.type = 'static'`);
    }

    const paths = inputSpec.paths.split(',');
    const firstPath = paths[0];
    if (firstPath.startsWith('s3://')) {
      inputSource = { type: 's3', uris: paths };
    } else if (firstPath.startsWith('gs://')) {
      inputSource = { type: 'google', uris: paths };
    } else if (firstPath.startsWith('hdfs://')) {
      inputSource = { type: 'hdfs', paths };
    } else {
      // Only s3://, gs://, and hdfs:// path schemes can be mapped to an equivalent inputSource
      throw new Error('unsupported');
    }
  }

  // Source data: a subquery over an existing datasource for 'druid' input, otherwise an EXTERN() table function
  if (inputSource.type === 'druid') {
    lines.push(
      `WITH ${SOURCE_TABLE} AS (`,
      `  SELECT *`,
      `  FROM ${T(inputSource.dataSource)}`,
      `  WHERE ${SqlExpression.fromTimeExpressionAndInterval(C('__time'), inputSource.interval)}`,
      ')',
    );
  } else {
    lines.push(
      `WITH ${SOURCE_TABLE} AS (SELECT * FROM TABLE(`,
      `  EXTERN(`,
      `    ${L(JSONBig.stringify(inputSource))},`,
    );

    const inputFormat = deepGet(spec, 'spec.ioConfig.inputFormat');
    if (!inputFormat) throw new Error(`spec.ioConfig.inputFormat is not defined`);
    lines.push(
      `    ${L(JSONBig.stringify(inputFormat))}`,
      `  )`,
      `) EXTEND (${columnDeclarations.join(', ')}))`,
    );
  }

  lines.push(`SELECT`);

  if (transforms.length) {
    lines.push(
      `  --:ISSUE: The spec contained transforms that could not be automatically converted.`,
    );
  }

  // Build the SELECT expressions; transforms that cannot be converted are flagged with --:ISSUE: markers
  const dimensionExpressions = [
    `  ${timeExpression} AS "__time",${
      timeTransform ? ` --:ISSUE: Transform for __time could not be converted` : ''
    }`,
  ].concat(
    dimensions.flatMap((dimension: DimensionSpec) => {
      const dimensionName = dimension.name;
      const relevantTransform = transforms.find(t => t.name === dimensionName);
      return `  ${
        relevantTransform ? `REWRITE_[${relevantTransform.expression}]_TO_SQL AS ` : ''
      }${C(dimensionName)},${
        relevantTransform ? ` --:ISSUE: Transform for dimension could not be converted` : ''
      }`;
    }),
  );

  const selectExpressions = dimensionExpressions.concat(
    Array.isArray(metricsSpec)
      ? metricsSpec.map(metricSpec => `  ${metricSpecToSelect(metricSpec)},`)
      : [],
  );

  // Remove trailing comma from last expression
  selectExpressions[selectExpressions.length - 1] = selectExpressions[selectExpressions.length - 1]
    .replace(/,$/, '')
    .replace(/,(\s+--)/, '$1');

  lines.push(selectExpressions.join('\n'));
  lines.push(`FROM ${SOURCE_TABLE}`);

  const filter = deepGet(spec, 'spec.dataSchema.transformSpec.filter');
  if (filter) {
    try {
      lines.push(`WHERE ${convertFilter(filter)}`);
    } catch {
      lines.push(
        `WHERE REWRITE_[${JSONBig.stringify(
          filter,
        )}]_TO_SQL --:ISSUE: The spec contained a filter that could not be automatically converted, please convert it manually`,
      );
    }
  }

  if (rollup) {
    lines.push(`GROUP BY ${dimensionExpressions.map((_, i) => i + 1).join(', ')}`);
  }

  const segmentGranularity = deepGet(spec, 'spec.dataSchema.granularitySpec.segmentGranularity');
  if (typeof segmentGranularity !== 'string') {
    throw new Error(`spec.dataSchema.granularitySpec.segmentGranularity is not a string`);
  }
  lines.push(
    `PARTITIONED BY ${
      segmentGranularity.toLowerCase() === 'all' ? 'ALL TIME' : segmentGranularity.toUpperCase()
    }`,
  );

  // Map partition dimensions from the partitionsSpec to a CLUSTERED BY clause
  const partitionsSpec = deepGet(spec, 'spec.tuningConfig.partitionsSpec') || {};
  const partitionDimensions =
    partitionsSpec.partitionDimensions ||
    (partitionsSpec.partitionDimension ? [partitionsSpec.partitionDimension] : undefined);
  if (Array.isArray(partitionDimensions)) {
    lines.push(`CLUSTERED BY ${partitionDimensions.map(d => C(d)).join(', ')}`);
  }

  return {
    queryString: lines.join('\n'),
    queryContext: context,
  };
}
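
A minimal usage sketch for reference (the spec and all of its field values below are hypothetical and purely illustrative; the exact SET statements, EXTERN JSON, and --:ISSUE: markers in the result depend entirely on the spec passed in):

// Hypothetical example: convert a simple index_parallel spec into a SQL ingestion query.
const { queryString, queryContext } = convertSpecToSql({
  type: 'index_parallel',
  spec: {
    ioConfig: {
      type: 'index_parallel',
      inputSource: { type: 'http', uris: ['https://example.com/wikipedia.json.gz'] },
      inputFormat: { type: 'json' },
    },
    dataSchema: {
      dataSource: 'wikipedia',
      timestampSpec: { column: 'timestamp', format: 'iso' },
      dimensionsSpec: { dimensions: ['channel', 'page'] },
      granularitySpec: { segmentGranularity: 'day', queryGranularity: 'none', rollup: false },
    },
    tuningConfig: { type: 'index_parallel' },
  },
});

// queryString holds the generated statement: here a REPLACE INTO "wikipedia" OVERWRITE ALL
// query reading from an EXTERN() table function and ending in PARTITIONED BY DAY.
// queryContext holds any context parameters (such as indexSpec) carried over from the spec.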