integration/presto/src/main/prestodb/org/apache/carbondata/presto/impl/CarbonTableReader.java [95:371]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
public class CarbonTableReader {

  /**
   * Connector configuration handed to the constructor; never null once constructed.
   */
  public CarbonTableConfig config;

  /**
   * Cache of table metadata keyed by schema/table name. While a cached entry is valid
   * the metadata of that table is not read from the file system again.
   */
  private AtomicReference<Map<SchemaTableName, CarbonTableCacheModel>> carbonCache;

  /**
   * Unique query id used for query statistics; written to the hadoop configuration
   * under "query.id" when input splits are computed.
   */
  private String queryId;

  /**
   * Presto cli query id; written to the hadoop configuration under
   * "presto.cli.query.id" when input splits are computed.
   */
  private String prestoQueryId;

  /**
   * Logger instance
   */
  private static final Logger LOGGER =
      LogServiceFactory.getLogService(CarbonTableReader.class.getName());

  /**
   * List of schema names, initially empty.
   */
  private List<String> schemaNames = new ArrayList<>();

  /**
   * Creates the reader with an empty table-metadata cache and then calls
   * {@code populateCarbonProperties()}.
   *
   * @param config connector configuration, must not be null
   * @throws NullPointerException if {@code config} is null
   */
  @Inject public CarbonTableReader(CarbonTableConfig config) {
    this.config = Objects.requireNonNull(config, "CarbonTableConfig is null");
    // diamond instead of a raw AtomicReference, so the assignment is type-checked
    this.carbonCache = new AtomicReference<>(new ConcurrentHashMap<>());
    populateCarbonProperties();
  }

  /**
   * Returns the metadata cache model of a table for the presto worker node,
   * parsing the table metadata when the cache holds no valid entry.
   *
   * @param table    the name of the table and schema, used as the cache key
   * @param location path of the table, handed to the metadata parser
   * @param config   hadoop configuration used for file access
   * @return the cached model when it is present and valid, otherwise the parsed one
   */
  public CarbonTableCacheModel getCarbonCache(SchemaTableName table, String location,
      Configuration config) {
    // records the latest schema file time on an existing entry before validity is checked
    updateSchemaTables(table, config);
    CarbonTableCacheModel cached = carbonCache.get().get(table);
    boolean usable = cached != null && cached.isValid();
    return usable ? cached : parseCarbonMetadata(table, location, config);
  }

  /**
   * Refreshes the schema timestamp of an already cached transactional table and,
   * when the cached entry is no longer valid afterwards, clears the indexes of that table.
   * Does nothing when the table is not cached or is not a transactional table.
   *
   * @param schemaTableName the name of the table and schema, used as the cache key
   * @param config          hadoop configuration used to access the schema file
   */
  private void updateSchemaTables(SchemaTableName schemaTableName, Configuration config) {
    CarbonTableCacheModel carbonTableCacheModel = carbonCache.get().get(schemaTableName);
    if (carbonTableCacheModel == null) {
      return;
    }
    CarbonTable carbonTable = carbonTableCacheModel.getCarbonTable();
    if (!carbonTable.isTransactionalTable()) {
      return;
    }
    // resolve the schema file with the caller's configuration, the same way
    // parseCarbonMetadata does, instead of the overload that takes no configuration
    String schemaFilePath =
        CarbonTablePath.getSchemaFilePath(carbonTable.getTablePath(), config);
    long latestTime = FileFactory.getCarbonFile(schemaFilePath, config).getLastModifiedTime();
    carbonTableCacheModel.setCurrentSchemaTime(latestTime);
    if (!carbonTableCacheModel.isValid()) {
      // Invalidate indexes
      IndexStoreManager.getInstance().clearIndex(carbonTable.getAbsoluteTableIdentifier());
    }
  }

  /**
   * Read the metadata of the given table
   * and cache it in this.carbonCache (CarbonTableReader cache).
   * A valid cache entry is returned as is; otherwise the metadata is loaded while
   * holding the lock on this reader.
   *
   * @param table     name of the given table.
   * @param tablePath path of the table, used to locate the schema file.
   * @param config    hadoop configuration used for file access.
   * @return the CarbonTableCacheModel instance which contains all the needed metadata for a table.
   * @throws RuntimeException wrapping any exception raised while reading or loading the metadata.
   */
  private CarbonTableCacheModel parseCarbonMetadata(SchemaTableName table, String tablePath,
      Configuration config) {
    try {
      CarbonTableCacheModel cache = getValidCacheBySchemaTableName(table);
      if (cache != null) {
        return cache;
      }
      // multiple tasks can be launched in a worker concurrently. Hence need to synchronize this.
      synchronized (this) {
        // cache might be filled by another thread, so if filled use that cache.
        CarbonTableCacheModel cacheModel = getValidCacheBySchemaTableName(table);
        if (cacheModel != null) {
          return cacheModel;
        }
        // Step 1: locate the schema file of the table.
        String schemaFilePath = CarbonTablePath.getSchemaFilePath(tablePath, config);
        // If the schema file exists, it is a transactional table
        CarbonFile schemaFile = FileFactory.getCarbonFile(schemaFilePath, config);
        boolean isTransactionalTable = schemaFile.exists();
        org.apache.carbondata.format.TableInfo tableInfo;
        long modifiedTime = System.currentTimeMillis();
        if (isTransactionalTable) {
          //Step 2: read the metadata (tableInfo) of the table.
          ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
            // TBase is used to read and write thrift objects.
            // TableInfo is a kind of TBase used to read and write table information.
            // TableInfo is generated by thrift,
            // see schema.thrift under format/src/main/thrift for details.
            @Override
            public TBase create() {
              return new org.apache.carbondata.format.TableInfo();
            }
          };
          ThriftReader thriftReader = new ThriftReader(schemaFilePath, createTBase, config);
          thriftReader.open();
          try {
            tableInfo = (org.apache.carbondata.format.TableInfo) thriftReader.read();
          } finally {
            // release the reader even when reading the schema fails
            thriftReader.close();
          }
          modifiedTime = schemaFile.getLastModifiedTime();
        } else {
          tableInfo = CarbonUtil.inferSchema(tablePath, table.getTableName(), false, config);
        }
        // Step 3: convert format level TableInfo to code level TableInfo
        SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
        // wrapperTableInfo is the code level information of a table in carbondata core,
        // different from the Thrift TableInfo.
        TableInfo wrapperTableInfo = schemaConverter
            .fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(), table.getTableName(),
                tablePath);
        wrapperTableInfo.setTransactionalTable(isTransactionalTable);
        CarbonMetadata.getInstance().removeTable(wrapperTableInfo.getTableUniqueName());
        // Step 4: Load metadata info into CarbonMetadata
        CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
        CarbonTable carbonTable = Objects.requireNonNull(CarbonMetadata.getInstance()
            .getCarbonTable(table.getSchemaName(), table.getTableName()), "carbontable is null");
        refreshIndexInfo(carbonTable, config);
        cache = new CarbonTableCacheModel(modifiedTime, carbonTable);
        cache.setCarbonTable(carbonTable);
        // cache the table; published last because the cache is also read without this lock
        carbonCache.get().put(table, cache);
      }
      return cache;
    } catch (Exception ex) {
      throw new RuntimeException(ex);
    }
  }

  /**
   * Builds the index metadata of a table from the hadoop configuration and stores its
   * serialized form in the table properties of the fact table, keyed by the table id.
   * The configuration keys read are "indexInfo", "parentTableName", "parentTableId",
   * "parentTablePath" and "isIndexTable". A failure to serialize the index metadata
   * is logged and not propagated.
   *
   * @param carbonTable table whose table properties receive the serialized index metadata
   * @param config      hadoop configuration carrying the index information
   */
  private void refreshIndexInfo(CarbonTable carbonTable, Configuration config) {
    // index provider name -> index table name -> index properties
    Map<String, Map<String, Map<String, String>>> indexTableMap = new ConcurrentHashMap<>();
    String indexInfo = config.get("indexInfo", IndexTableInfo.toGson(new IndexTableInfo[0]));
    String parentTableName = config.get("parentTableName", "");
    String parentTableId = config.get("parentTableId", "");
    String parentTablePath = config.get("parentTablePath", "");
    // parseBoolean parses the configured value; Boolean.getBoolean would instead look up
    // a JVM system property named after that value
    boolean isIndexTable = Boolean.parseBoolean(config.get("isIndexTable", "false"));
    IndexTableInfo[] indexTableInfos = IndexTableInfo.fromGson(indexInfo);
    for (IndexTableInfo indexTableInfo : indexTableInfos) {
      Map<String, String> indexProperties = indexTableInfo.getIndexProperties();
      String indexProvider;
      if (indexProperties != null) {
        indexProvider = indexProperties.get(CarbonCommonConstants.INDEX_PROVIDER);
      } else {
        // in case if SI table has been created before the change CARBONDATA-3765,
        // indexProperties variable will not be present. On direct upgrade of SI store,
        // indexProperties will be null, in that case, create indexProperties from indexCols
        // For details, refer
        // {@link org.apache.spark.sql.secondaryindex.hive.CarbonInternalMetastore#refreshIndexInfo}
        indexProperties = new HashMap<>();
        indexProperties.put(CarbonCommonConstants.INDEX_COLUMNS,
            String.join(",", indexTableInfo.getIndexCols()));
        indexProvider = IndexType.SI.getIndexProviderName();
        indexProperties.put(CarbonCommonConstants.INDEX_PROVIDER, indexProvider);
        indexProperties.put(CarbonCommonConstants.INDEX_STATUS, IndexStatus.DISABLED.name());
      }
      indexTableMap.computeIfAbsent(indexProvider, provider -> new HashMap<>())
          .put(indexTableInfo.getTableName(), indexProperties);
    }
    IndexMetadata indexMetadata =
        new IndexMetadata(indexTableMap, parentTableName, isIndexTable, parentTablePath,
            parentTableId);
    try {
      carbonTable.getTableInfo().getFactTable().getTableProperties()
          .put(carbonTable.getCarbonTableIdentifier().getTableId(), indexMetadata.serialize());
    } catch (IOException e) {
      LOGGER.error("Error serializing index metadata", e);
    }
  }

  /**
   * Looks up the cache entry of a table and returns it only when it is valid.
   *
   * @param schemaTableName the name of the table and schema, used as the cache key
   * @return the cached model, or null when there is no entry or the entry is not valid
   */
  private CarbonTableCacheModel getValidCacheBySchemaTableName(SchemaTableName schemaTableName) {
    CarbonTableCacheModel candidate = carbonCache.get().get(schemaTableName);
    if (candidate == null || !candidate.isValid()) {
      return null;
    }
    return candidate;
  }

  /**
   * Get the carbon multi-block input splits
   *
   * @param tableCacheModel cached table
   * @param filters carbonData filters
   * @param filteredPartitions matched partitionSpec for the filter
   * @param config hadoop conf
   * @return list of multiblock split
   * @throws IOException
   */
  public List<CarbonLocalMultiBlockSplit> getInputSplits(
      CarbonTableCacheModel tableCacheModel,
      Expression filters,
      List<PartitionSpec> filteredPartitions,
      Configuration config) throws IOException {
    List<CarbonLocalInputSplit> result = new ArrayList<>();
    List<CarbonLocalMultiBlockSplit> multiBlockSplitList = new ArrayList<>();
    CarbonTable carbonTable = tableCacheModel.getCarbonTable();
    TableInfo tableInfo = tableCacheModel.getCarbonTable().getTableInfo();
    config.set("presto.cli.query.id", prestoQueryId);
    config.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, "");
    String carbonTablePath = carbonTable.getAbsoluteTableIdentifier().getTablePath();
    config.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath);
    config.set(CarbonTableInputFormat.DATABASE_NAME, carbonTable.getDatabaseName());
    config.set(CarbonTableInputFormat.TABLE_NAME, carbonTable.getTableName());
    config.set("query.id", queryId);
    CarbonInputFormat.setTransactionalTable(config, carbonTable.isTransactionalTable());
    CarbonInputFormat.setTableInfo(config, carbonTable.getTableInfo());
    if (CarbonProperties.getInstance().isCoarseGrainSecondaryIndex(tableInfo.getDatabaseName(),
        tableInfo.getFactTable().getTableName(), "true")) {
      CarbonInputFormat
          .checkAndSetSecondaryIndexPruning(carbonTable.getTableInfo(), filters, config);
    }

    JobConf jobConf = new JobConf(config);
    try {
      CarbonTableInputFormat.setTableInfo(config, tableInfo);
      CarbonTableInputFormat<Object> carbonTableInputFormat =
          createInputFormat(jobConf, carbonTable.getAbsoluteTableIdentifier(),
              new IndexFilter(carbonTable, filters, true), filteredPartitions);
      Job job = Job.getInstance(jobConf);
      List<InputSplit> splits = carbonTableInputFormat.getSplits(job);
      Gson gson = new Gson();
      if (splits != null && splits.size() > 0) {
        for (InputSplit inputSplit : splits) {
          CarbonInputSplit carbonInputSplit = (CarbonInputSplit) inputSplit;
          result.add(new CarbonLocalInputSplit(carbonInputSplit.getSegmentId(),
              carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(),
              carbonInputSplit.getLength(), Arrays.asList(carbonInputSplit.getLocations()),
              carbonInputSplit.getNumberOfBlocklets(), carbonInputSplit.getVersion().number(),
              carbonInputSplit.getDeleteDeltaFiles(), carbonInputSplit.getBlockletId(),
              gson.toJson(carbonInputSplit.getDetailInfo()),
              carbonInputSplit.getFileFormat().ordinal()));
        }
        // Use block distribution
        List<List<CarbonLocalInputSplit>> inputSplits =
            new ArrayList<>(result.stream().collect(Collectors.groupingBy(carbonInput -> {
              if (FileFormat.ROW_V1.equals(carbonInput.getFileFormat())) {
                return carbonInput.getSegmentId().concat(carbonInput.getPath())
                    .concat(carbonInput.getStart() + "");
              }
              return carbonInput.getSegmentId().concat(carbonInput.getPath());
            })).values());
        // TODO : try to optimize the below logic as it may slow down for huge splits
        for (int j = 0; j < inputSplits.size(); j++) {
          multiBlockSplitList.add(new CarbonLocalMultiBlockSplit(inputSplits.get(j),
              inputSplits.get(j).stream().flatMap(f -> Arrays.stream(getLocations(f))).distinct()
                  .toArray(String[]::new)));
        }
        LOGGER.error("Size fo MultiblockList   " + multiBlockSplitList.size());
      }
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
    return multiBlockSplitList;
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java [90:366]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
public class CarbonTableReader {

  /**
   * Connector configuration handed to the constructor; never null once constructed.
   */
  public CarbonTableConfig config;

  /**
   * Cache of table metadata keyed by schema/table name. While a cached entry is valid
   * the metadata of that table is not read from the file system again.
   */
  private AtomicReference<Map<SchemaTableName, CarbonTableCacheModel>> carbonCache;

  /**
   * Unique query id used for query statistics; written to the hadoop configuration
   * under "query.id" when input splits are computed.
   */
  private String queryId;

  /**
   * Presto cli query id; written to the hadoop configuration under
   * "presto.cli.query.id" when input splits are computed.
   */
  private String prestoQueryId;

  /**
   * Logger instance
   */
  private static final Logger LOGGER =
      LogServiceFactory.getLogService(CarbonTableReader.class.getName());

  /**
   * List of schema names, initially empty.
   */
  private List<String> schemaNames = new ArrayList<>();

  /**
   * Creates the reader with an empty table-metadata cache and then calls
   * {@code populateCarbonProperties()}.
   *
   * @param config connector configuration, must not be null
   * @throws NullPointerException if {@code config} is null
   */
  @Inject public CarbonTableReader(CarbonTableConfig config) {
    this.config = Objects.requireNonNull(config, "CarbonTableConfig is null");
    // diamond instead of a raw AtomicReference, so the assignment is type-checked
    this.carbonCache = new AtomicReference<>(new ConcurrentHashMap<>());
    populateCarbonProperties();
  }

  /**
   * Returns the metadata cache model of a table for the presto worker node,
   * parsing the table metadata when the cache holds no valid entry.
   *
   * @param table    the name of the table and schema, used as the cache key
   * @param location path of the table, handed to the metadata parser
   * @param config   hadoop configuration used for file access
   * @return the cached model when it is present and valid, otherwise the parsed one
   */
  public CarbonTableCacheModel getCarbonCache(SchemaTableName table, String location,
      Configuration config) {
    // records the latest schema file time on an existing entry before validity is checked
    updateSchemaTables(table, config);
    CarbonTableCacheModel cached = carbonCache.get().get(table);
    boolean usable = cached != null && cached.isValid();
    return usable ? cached : parseCarbonMetadata(table, location, config);
  }

  /**
   * Refreshes the schema timestamp of an already cached transactional table and,
   * when the cached entry is no longer valid afterwards, clears the indexes of that table.
   * Does nothing when the table is not cached or is not a transactional table.
   *
   * @param schemaTableName the name of the table and schema, used as the cache key
   * @param config          hadoop configuration used to access the schema file
   */
  private void updateSchemaTables(SchemaTableName schemaTableName, Configuration config) {
    CarbonTableCacheModel carbonTableCacheModel = carbonCache.get().get(schemaTableName);
    if (carbonTableCacheModel == null) {
      return;
    }
    CarbonTable carbonTable = carbonTableCacheModel.getCarbonTable();
    if (!carbonTable.isTransactionalTable()) {
      return;
    }
    // resolve the schema file with the caller's configuration, the same way
    // parseCarbonMetadata does, instead of the overload that takes no configuration
    String schemaFilePath =
        CarbonTablePath.getSchemaFilePath(carbonTable.getTablePath(), config);
    long latestTime = FileFactory.getCarbonFile(schemaFilePath, config).getLastModifiedTime();
    carbonTableCacheModel.setCurrentSchemaTime(latestTime);
    if (!carbonTableCacheModel.isValid()) {
      // Invalidate indexes
      IndexStoreManager.getInstance().clearIndex(carbonTable.getAbsoluteTableIdentifier());
    }
  }

  /**
   * Read the metadata of the given table
   * and cache it in this.carbonCache (CarbonTableReader cache).
   * A valid cache entry is returned as is; otherwise the metadata is loaded while
   * holding the lock on this reader.
   *
   * @param table     name of the given table.
   * @param tablePath path of the table, used to locate the schema file.
   * @param config    hadoop configuration used for file access.
   * @return the CarbonTableCacheModel instance which contains all the needed metadata for a table.
   * @throws RuntimeException wrapping any exception raised while reading or loading the metadata.
   */
  private CarbonTableCacheModel parseCarbonMetadata(SchemaTableName table, String tablePath,
      Configuration config) {
    try {
      CarbonTableCacheModel cache = getValidCacheBySchemaTableName(table);
      if (cache != null) {
        return cache;
      }
      // multiple tasks can be launched in a worker concurrently. Hence need to synchronize this.
      synchronized (this) {
        // cache might be filled by another thread, so if filled use that cache.
        CarbonTableCacheModel cacheModel = getValidCacheBySchemaTableName(table);
        if (cacheModel != null) {
          return cacheModel;
        }
        // Step 1: locate the schema file of the table.
        String schemaFilePath = CarbonTablePath.getSchemaFilePath(tablePath, config);
        // If the schema file exists, it is a transactional table
        CarbonFile schemaFile = FileFactory.getCarbonFile(schemaFilePath, config);
        boolean isTransactionalTable = schemaFile.exists();
        org.apache.carbondata.format.TableInfo tableInfo;
        long modifiedTime = System.currentTimeMillis();
        if (isTransactionalTable) {
          //Step 2: read the metadata (tableInfo) of the table.
          ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
            // TBase is used to read and write thrift objects.
            // TableInfo is a kind of TBase used to read and write table information.
            // TableInfo is generated by thrift,
            // see schema.thrift under format/src/main/thrift for details.
            @Override
            public TBase create() {
              return new org.apache.carbondata.format.TableInfo();
            }
          };
          ThriftReader thriftReader = new ThriftReader(schemaFilePath, createTBase, config);
          thriftReader.open();
          try {
            tableInfo = (org.apache.carbondata.format.TableInfo) thriftReader.read();
          } finally {
            // release the reader even when reading the schema fails
            thriftReader.close();
          }
          modifiedTime = schemaFile.getLastModifiedTime();
        } else {
          tableInfo = CarbonUtil.inferSchema(tablePath, table.getTableName(), false, config);
        }
        // Step 3: convert format level TableInfo to code level TableInfo
        SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
        // wrapperTableInfo is the code level information of a table in carbondata core,
        // different from the Thrift TableInfo.
        TableInfo wrapperTableInfo = schemaConverter
            .fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(), table.getTableName(),
                tablePath);
        wrapperTableInfo.setTransactionalTable(isTransactionalTable);
        CarbonMetadata.getInstance().removeTable(wrapperTableInfo.getTableUniqueName());
        // Step 4: Load metadata info into CarbonMetadata
        CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
        CarbonTable carbonTable = Objects.requireNonNull(CarbonMetadata.getInstance()
            .getCarbonTable(table.getSchemaName(), table.getTableName()), "carbontable is null");
        refreshIndexInfo(carbonTable, config);
        cache = new CarbonTableCacheModel(modifiedTime, carbonTable);
        cache.setCarbonTable(carbonTable);
        // cache the table; published last because the cache is also read without this lock
        carbonCache.get().put(table, cache);
      }
      return cache;
    } catch (Exception ex) {
      throw new RuntimeException(ex);
    }
  }

  /**
   * Builds the index metadata of a table from the hadoop configuration and stores its
   * serialized form in the table properties of the fact table, keyed by the table id.
   * The configuration keys read are "indexInfo", "parentTableName", "parentTableId",
   * "parentTablePath" and "isIndexTable". A failure to serialize the index metadata
   * is logged and not propagated.
   *
   * @param carbonTable table whose table properties receive the serialized index metadata
   * @param config      hadoop configuration carrying the index information
   */
  private void refreshIndexInfo(CarbonTable carbonTable, Configuration config) {
    // index provider name -> index table name -> index properties
    Map<String, Map<String, Map<String, String>>> indexTableMap = new ConcurrentHashMap<>();
    String indexInfo = config.get("indexInfo", IndexTableInfo.toGson(new IndexTableInfo[0]));
    String parentTableName = config.get("parentTableName", "");
    String parentTableId = config.get("parentTableId", "");
    String parentTablePath = config.get("parentTablePath", "");
    // parseBoolean parses the configured value; Boolean.getBoolean would instead look up
    // a JVM system property named after that value
    boolean isIndexTable = Boolean.parseBoolean(config.get("isIndexTable", "false"));
    IndexTableInfo[] indexTableInfos = IndexTableInfo.fromGson(indexInfo);
    for (IndexTableInfo indexTableInfo : indexTableInfos) {
      Map<String, String> indexProperties = indexTableInfo.getIndexProperties();
      String indexProvider;
      if (indexProperties != null) {
        indexProvider = indexProperties.get(CarbonCommonConstants.INDEX_PROVIDER);
      } else {
        // in case if SI table has been created before the change CARBONDATA-3765,
        // indexProperties variable will not be present. On direct upgrade of SI store,
        // indexProperties will be null, in that case, create indexProperties from indexCols
        // For details, refer
        // {@link org.apache.spark.sql.secondaryindex.hive.CarbonInternalMetastore#refreshIndexInfo}
        indexProperties = new HashMap<>();
        indexProperties.put(CarbonCommonConstants.INDEX_COLUMNS,
            String.join(",", indexTableInfo.getIndexCols()));
        indexProvider = IndexType.SI.getIndexProviderName();
        indexProperties.put(CarbonCommonConstants.INDEX_PROVIDER, indexProvider);
        indexProperties.put(CarbonCommonConstants.INDEX_STATUS, IndexStatus.DISABLED.name());
      }
      indexTableMap.computeIfAbsent(indexProvider, provider -> new HashMap<>())
          .put(indexTableInfo.getTableName(), indexProperties);
    }
    IndexMetadata indexMetadata =
        new IndexMetadata(indexTableMap, parentTableName, isIndexTable, parentTablePath,
            parentTableId);
    try {
      carbonTable.getTableInfo().getFactTable().getTableProperties()
          .put(carbonTable.getCarbonTableIdentifier().getTableId(), indexMetadata.serialize());
    } catch (IOException e) {
      LOGGER.error("Error serializing index metadata", e);
    }
  }

  /**
   * Looks up the cache entry of a table and returns it only when it is valid.
   *
   * @param schemaTableName the name of the table and schema, used as the cache key
   * @return the cached model, or null when there is no entry or the entry is not valid
   */
  private CarbonTableCacheModel getValidCacheBySchemaTableName(SchemaTableName schemaTableName) {
    CarbonTableCacheModel candidate = carbonCache.get().get(schemaTableName);
    if (candidate == null || !candidate.isValid()) {
      return null;
    }
    return candidate;
  }

  /**
   * Get the carbon multi-block input splits
   *
   * @param tableCacheModel cached table
   * @param filters carbonData filters
   * @param filteredPartitions matched partitionSpec for the filter
   * @param config hadoop conf
   * @return list of multiblock split
   * @throws IOException
   */
  public List<CarbonLocalMultiBlockSplit> getInputSplits(
      CarbonTableCacheModel tableCacheModel,
      Expression filters,
      List<PartitionSpec> filteredPartitions,
      Configuration config) throws IOException {
    List<CarbonLocalInputSplit> result = new ArrayList<>();
    List<CarbonLocalMultiBlockSplit> multiBlockSplitList = new ArrayList<>();
    CarbonTable carbonTable = tableCacheModel.getCarbonTable();
    TableInfo tableInfo = tableCacheModel.getCarbonTable().getTableInfo();
    config.set("presto.cli.query.id", prestoQueryId);
    config.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, "");
    String carbonTablePath = carbonTable.getAbsoluteTableIdentifier().getTablePath();
    config.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath);
    config.set(CarbonTableInputFormat.DATABASE_NAME, carbonTable.getDatabaseName());
    config.set(CarbonTableInputFormat.TABLE_NAME, carbonTable.getTableName());
    config.set("query.id", queryId);
    CarbonInputFormat.setTransactionalTable(config, carbonTable.isTransactionalTable());
    CarbonInputFormat.setTableInfo(config, carbonTable.getTableInfo());
    if (CarbonProperties.getInstance().isCoarseGrainSecondaryIndex(tableInfo.getDatabaseName(),
        tableInfo.getFactTable().getTableName(), "true")) {
      CarbonInputFormat
          .checkAndSetSecondaryIndexPruning(carbonTable.getTableInfo(), filters, config);
    }

    JobConf jobConf = new JobConf(config);
    try {
      CarbonTableInputFormat.setTableInfo(config, tableInfo);
      CarbonTableInputFormat<Object> carbonTableInputFormat =
          createInputFormat(jobConf, carbonTable.getAbsoluteTableIdentifier(),
              new IndexFilter(carbonTable, filters, true), filteredPartitions);
      Job job = Job.getInstance(jobConf);
      List<InputSplit> splits = carbonTableInputFormat.getSplits(job);
      Gson gson = new Gson();
      if (splits != null && splits.size() > 0) {
        for (InputSplit inputSplit : splits) {
          CarbonInputSplit carbonInputSplit = (CarbonInputSplit) inputSplit;
          result.add(new CarbonLocalInputSplit(carbonInputSplit.getSegmentId(),
              carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(),
              carbonInputSplit.getLength(), Arrays.asList(carbonInputSplit.getLocations()),
              carbonInputSplit.getNumberOfBlocklets(), carbonInputSplit.getVersion().number(),
              carbonInputSplit.getDeleteDeltaFiles(), carbonInputSplit.getBlockletId(),
              gson.toJson(carbonInputSplit.getDetailInfo()),
              carbonInputSplit.getFileFormat().ordinal()));
        }
        // Use block distribution
        List<List<CarbonLocalInputSplit>> inputSplits =
            new ArrayList<>(result.stream().collect(Collectors.groupingBy(carbonInput -> {
              if (FileFormat.ROW_V1.equals(carbonInput.getFileFormat())) {
                return carbonInput.getSegmentId().concat(carbonInput.getPath())
                    .concat(carbonInput.getStart() + "");
              }
              return carbonInput.getSegmentId().concat(carbonInput.getPath());
            })).values());
        // TODO : try to optimize the below logic as it may slow down for huge splits
        for (int j = 0; j < inputSplits.size(); j++) {
          multiBlockSplitList.add(new CarbonLocalMultiBlockSplit(inputSplits.get(j),
              inputSplits.get(j).stream().flatMap(f -> Arrays.stream(getLocations(f))).distinct()
                  .toArray(String[]::new)));
        }
        LOGGER.error("Size fo MultiblockList   " + multiBlockSplitList.size());
      }
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
    return multiBlockSplitList;
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



