/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "VeloxPlanConverter.h"
#include <filesystem>

#include "compute/ResultIterator.h"
#include "config/GlutenConfig.h"
#include "operators/plannodes/RowVectorStream.h"
#include "velox/common/file/FileSystems.h"

namespace gluten {

using namespace facebook;

/// Constructs the converter and registers the factory that the wrapped
/// Substrait-to-Velox converter uses to create value-stream plan nodes.
///
/// @param inputIters Input iterators; the factory looks one up by stream index.
/// @param veloxPool Memory pool handed to the wrapped converter and kept in pool_.
/// @param confMap Configuration map forwarded to the wrapped converter.
/// @param writeFilesTempPath Optional path forwarded to the wrapped converter.
/// @param validationMode When true, the factory does not look up any input
///        iterator and builds the stream with a null iterator.
VeloxPlanConverter::VeloxPlanConverter(
    const std::vector<std::shared_ptr<ResultIterator>>& inputIters,
    velox::memory::MemoryPool* veloxPool,
    const std::unordered_map<std::string, std::string>& confMap,
    const std::optional<std::string> writeFilesTempPath,
    bool validationMode)
    : validationMode_(validationMode),
      substraitVeloxPlanConverter_(veloxPool, confMap, writeFilesTempPath, validationMode),
      pool_(veloxPool) {
  // avoid include RowVectorStream.h in SubstraitToVeloxPlan.cpp, it may cause redefinition of array abi.h.
  //
  // `inputIters` is a const reference, so the closure has to own a copy of the
  // vector (std::move on it would silently copy anyway). An init-capture is
  // used so the closure member is a non-const vector and the closure itself
  // stays movable without another copy.
  auto factory = [inputIters = inputIters, validationMode](
                     std::string nodeId, memory::MemoryPool* pool, int32_t streamIdx, RowTypePtr outputType) {
    // Stays null in validation mode: no input iterator is looked up there.
    std::shared_ptr<ResultIterator> iterator;
    if (!validationMode) {
      VELOX_CHECK_LT(streamIdx, inputIters.size(), "Could not find stream index {} in input iterator list.", streamIdx);
      iterator = inputIters[streamIdx];
    }
    auto valueStream = std::make_shared<RowVectorStream>(pool, iterator, outputType);
    return std::make_shared<ValueStreamNode>(nodeId, outputType, std::move(valueStream));
  };
  substraitVeloxPlanConverter_.setValueStreamNodeFactory(std::move(factory));
}

namespace {
/// Converts the file entries of one Substrait local-files node into a SplitInfo.
///
/// For every entry the path, start offset, length and partition-column map are
/// appended in order. partitionIndex and format are overwritten on each
/// iteration, so the values of the last entry are the ones that remain.
///
/// @param fileList File entries of a ReadRel_LocalFiles message.
/// @return A newly allocated SplitInfo describing all entries.
std::shared_ptr<SplitInfo> parseScanSplitInfo(
    const google::protobuf::RepeatedPtrField<substrait::ReadRel_LocalFiles_FileOrFiles>& fileList) {
  using SubstraitFileFormatCase = ::substrait::ReadRel_LocalFiles_FileOrFiles::FileFormatCase;

  auto splitInfo = std::make_shared<SplitInfo>();
  splitInfo->paths.reserve(fileList.size());
  splitInfo->starts.reserve(fileList.size());
  splitInfo->lengths.reserve(fileList.size());
  splitInfo->partitionColumns.reserve(fileList.size());
  for (const auto& file : fileList) {
    // Expect all Partitions share the same index.
    splitInfo->partitionIndex = file.partition_index();

    std::unordered_map<std::string, std::string> partitionColumnMap;
    for (const auto& partitionColumn : file.partition_columns()) {
      partitionColumnMap[partitionColumn.key()] = partitionColumn.value();
    }
    // The map is not used after this point, so hand it over instead of copying it.
    splitInfo->partitionColumns.emplace_back(std::move(partitionColumnMap));

    splitInfo->paths.emplace_back(file.uri_file());
    splitInfo->starts.emplace_back(file.start());
    splitInfo->lengths.emplace_back(file.length());
    // Map the Substrait file-format case to the Velox dwio file format;
    // anything not listed is reported as UNKNOWN.
    switch (file.file_format_case()) {
      case SubstraitFileFormatCase::kOrc:
        splitInfo->format = dwio::common::FileFormat::ORC;
        break;
      case SubstraitFileFormatCase::kDwrf:
        splitInfo->format = dwio::common::FileFormat::DWRF;
        break;
      case SubstraitFileFormatCase::kParquet:
        splitInfo->format = dwio::common::FileFormat::PARQUET;
        break;
      case SubstraitFileFormatCase::kText:
        splitInfo->format = dwio::common::FileFormat::TEXT;
        break;
      default:
        splitInfo->format = dwio::common::FileFormat::UNKNOWN;
        break;
    }
  }
  return splitInfo;
}

/// Builds one SplitInfo per local-files node and hands the resulting list to
/// the plan converter via setSplitInfos().
///
/// @param planConverter Converter that receives the parsed split infos.
/// @param localFiles Local-files nodes to parse; the order is preserved.
void parseLocalFileNodes(
    SubstraitToVeloxPlanConverter* planConverter,
    std::vector<::substrait::ReadRel_LocalFiles>& localFiles) {
  std::vector<std::shared_ptr<SplitInfo>> splitInfos;
  splitInfos.reserve(localFiles.size());
  // Range-for avoids comparing a signed index against the unsigned size().
  for (const auto& localFile : localFiles) {
    // The call yields a temporary, which push_back already moves from.
    splitInfos.push_back(parseScanSplitInfo(localFile.items()));
  }

  planConverter->setSplitInfos(std::move(splitInfos));
}
} // namespace

/// Converts a Substrait plan into a Velox plan node.
///
/// Outside validation mode the given local-files nodes are first parsed and
/// registered with the wrapped converter. The converted plan is written to the
/// debug log before it is returned.
///
/// @param substraitPlan Plan to convert.
/// @param localFiles Local-files nodes; not read in validation mode.
/// @return The plan node produced by the wrapped converter.
std::shared_ptr<const facebook::velox::core::PlanNode> VeloxPlanConverter::toVeloxPlan(
    const ::substrait::Plan& substraitPlan,
    std::vector<::substrait::ReadRel_LocalFiles> localFiles) {
  const bool registerSplitInfos = !validationMode_;
  if (registerSplitInfos) {
    parseLocalFileNodes(&substraitVeloxPlanConverter_, localFiles);
  }

  auto planNode = substraitVeloxPlanConverter_.toVeloxPlan(substraitPlan);
  DLOG(INFO) << "Plan Node: " << std::endl << planNode->toString(true, true);
  return planNode;
}

/// Returns the current plan node id formatted as a string and advances the
/// internal counter by one.
std::string VeloxPlanConverter::nextPlanNodeId() {
  // Post-increment: the value before the increment is the one formatted.
  return fmt::format("{}", planNodeId_++);
}

} // namespace gluten
