private SuccessResponse uploadSegment()

in pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java [240:436]


  private SuccessResponse uploadSegment(@Nullable String tableName, TableType tableType,
      @Nullable FormDataMultiPart multiPart, boolean copySegmentToFinalLocation, boolean enableParallelPushProtection,
      boolean allowRefresh, HttpHeaders headers, Request request) {
    long segmentUploadStartTimeMs = System.currentTimeMillis();
    if (StringUtils.isNotEmpty(tableName)) {
      TableType tableTypeFromTableName = TableNameBuilder.getTableTypeFromTableName(tableName);
      if (tableTypeFromTableName != null && tableTypeFromTableName != tableType) {
        throw new ControllerApplicationException(LOGGER,
            String.format("Table name: %s does not match table type: %s", tableName, tableType),
            Response.Status.BAD_REQUEST);
      }
    }

    // TODO: Consider validating the segment name and table name from the header against the actual segment
    extractHttpHeader(headers, CommonConstants.Controller.SEGMENT_NAME_HTTP_HEADER);
    extractHttpHeader(headers, CommonConstants.Controller.TABLE_NAME_HTTP_HEADER);

    String uploadTypeStr = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE);
    String sourceDownloadURIStr = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI);
    String crypterClassNameInHeader = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.CRYPTER);
    String ingestionDescriptor = extractHttpHeader(headers, CommonConstants.Controller.INGESTION_DESCRIPTOR);

    File tempEncryptedFile = null;
    File tempDecryptedFile = null;
    File tempSegmentDir = null;
    // The downloadUri for putting into segment zk metadata
    String segmentDownloadURIStr = sourceDownloadURIStr;
    try {
      ControllerFilePathProvider provider = ControllerFilePathProvider.getInstance();
      String tempFileName = TMP_DIR_PREFIX + UUID.randomUUID();
      tempEncryptedFile = new File(provider.getFileUploadTempDir(), tempFileName + ENCRYPTED_SUFFIX);
      tempDecryptedFile = new File(provider.getFileUploadTempDir(), tempFileName);
      tempSegmentDir = new File(provider.getUntarredFileTempDir(), tempFileName);

      boolean uploadedSegmentIsEncrypted = StringUtils.isNotEmpty(crypterClassNameInHeader);
      FileUploadType uploadType = getUploadType(uploadTypeStr);
      File destFile = uploadedSegmentIsEncrypted ? tempEncryptedFile : tempDecryptedFile;
      long segmentSizeInBytes;
      switch (uploadType) {
        case SEGMENT:
          if (multiPart == null) {
            throw new ControllerApplicationException(LOGGER,
                "Segment file (as multipart/form-data) is required for SEGMENT upload mode",
                Response.Status.BAD_REQUEST);
          }
          if (!copySegmentToFinalLocation && StringUtils.isEmpty(sourceDownloadURIStr)) {
            throw new ControllerApplicationException(LOGGER,
                "Source download URI is required in header field 'DOWNLOAD_URI' if segment should not be copied to "
                    + "the deep store",
                Response.Status.BAD_REQUEST);
          }
          createSegmentFileFromMultipart(multiPart, destFile);
          segmentSizeInBytes = destFile.length();
          break;
        case URI:
          if (StringUtils.isEmpty(sourceDownloadURIStr)) {
            throw new ControllerApplicationException(LOGGER,
                "Source download URI is required in header field 'DOWNLOAD_URI' for URI upload mode",
                Response.Status.BAD_REQUEST);
          }
          downloadSegmentFileFromURI(sourceDownloadURIStr, destFile, tableName);
          segmentSizeInBytes = destFile.length();
          break;
        case METADATA:
          if (multiPart == null) {
            throw new ControllerApplicationException(LOGGER,
                "Segment metadata file (as multipart/form-data) is required for METADATA upload mode",
                Response.Status.BAD_REQUEST);
          }
          if (StringUtils.isEmpty(sourceDownloadURIStr)) {
            throw new ControllerApplicationException(LOGGER,
                "Source download URI is required in header field 'DOWNLOAD_URI' for METADATA upload mode",
                Response.Status.BAD_REQUEST);
          }
          // override copySegmentToFinalLocation if override provided in headers:COPY_SEGMENT_TO_DEEP_STORE
          // else set to false for backward compatibility
          String copySegmentToDeepStore =
              extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.COPY_SEGMENT_TO_DEEP_STORE);
          copySegmentToFinalLocation = Boolean.parseBoolean(copySegmentToDeepStore);
          createSegmentFileFromMultipart(multiPart, destFile);
          PinotFS pinotFS = null;
          try {
            URI segmentURI = new URI(sourceDownloadURIStr);
            pinotFS = PinotFSFactory.create(segmentURI.getScheme());
            segmentSizeInBytes = pinotFS.length(segmentURI);
          } catch (Exception e) {
            segmentSizeInBytes = -1;
            LOGGER.warn("Could not fetch segment size for metadata push", e);
          } finally {
            if (pinotFS != null) {
              pinotFS.close();
            }
          }
          break;
        default:
          throw new ControllerApplicationException(LOGGER, "Unsupported upload type: " + uploadType,
              Response.Status.BAD_REQUEST);
      }

      if (uploadedSegmentIsEncrypted) {
        decryptFile(crypterClassNameInHeader, tempEncryptedFile, tempDecryptedFile);
      }

      String metadataProviderClass = DefaultMetadataExtractor.class.getName();
      SegmentMetadata segmentMetadata = getSegmentMetadata(tempDecryptedFile, tempSegmentDir, metadataProviderClass);

      // Fetch segment name
      String segmentName = segmentMetadata.getName();

      // Fetch table name. Try to derive the table name from the parameter and then from segment metadata
      String rawTableName;
      if (StringUtils.isNotEmpty(tableName)) {
        rawTableName = TableNameBuilder.extractRawTableName(tableName);
      } else {
        // TODO: remove this when we completely deprecate the table name from segment metadata
        rawTableName = segmentMetadata.getTableName();
        LOGGER.warn("Table name is not provided as request query parameter when uploading segment: {} for table: {}",
            segmentName, rawTableName);
      }
      String tableNameWithType = tableType == TableType.OFFLINE
          ? TableNameBuilder.OFFLINE.tableNameWithType(rawTableName)
          : TableNameBuilder.REALTIME.tableNameWithType(rawTableName);

      if (UploadedRealtimeSegmentName.isUploadedRealtimeSegmentName(segmentName) && tableType != TableType.REALTIME) {
        throw new ControllerApplicationException(LOGGER, "Cannot upload segment: " + segmentName
            + " to OFFLINE table as this format is reserved for uploaded real-time segment",
            Response.Status.BAD_REQUEST);
      }

      String clientAddress = InetAddress.getByName(request.getRemoteAddr()).getHostName();
      LOGGER.info("Processing upload request for segment: {} of table: {} with upload type: {} from client: {}, "
          + "ingestion descriptor: {}", segmentName, tableNameWithType, uploadType, clientAddress, ingestionDescriptor);

      // Validate segment
      TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
      if (tableConfig == null) {
        throw new ControllerApplicationException(LOGGER, "Failed to find table: " + tableNameWithType,
            Response.Status.BAD_REQUEST);
      }
      if (tableConfig.getIngestionConfig() == null || tableConfig.getIngestionConfig().isSegmentTimeValueCheck()) {
        SegmentValidationUtils.validateTimeInterval(segmentMetadata, tableConfig);
      }
      long untarredSegmentSizeInBytes;
      if (uploadType == FileUploadType.METADATA && segmentSizeInBytes > 0) {
        // TODO: Include the untarred segment size when using the METADATA push rest API. Currently we can only use the
        //       tarred segment size as an approximation.
        untarredSegmentSizeInBytes = segmentSizeInBytes;
      } else {
        untarredSegmentSizeInBytes = FileUtils.sizeOfDirectory(tempSegmentDir);
      }
      SegmentValidationUtils.checkStorageQuota(segmentName, untarredSegmentSizeInBytes, tableConfig,
          _storageQuotaChecker);

      // Encrypt segment
      String crypterNameInTableConfig = tableConfig.getValidationConfig().getCrypterClassName();
      Pair<String, File> encryptionInfo =
          encryptSegmentIfNeeded(tempDecryptedFile, tempEncryptedFile, uploadedSegmentIsEncrypted,
              crypterClassNameInHeader, crypterNameInTableConfig, segmentName, tableNameWithType);

      String crypterName = encryptionInfo.getLeft();
      File segmentFile = encryptionInfo.getRight();

      // Update download URI if controller is responsible for moving the segment to the deep store
      URI finalSegmentLocationURI = null;
      if (copySegmentToFinalLocation) {
        URI dataDirURI = provider.getDataDirURI();
        String dataDirPath = dataDirURI.toString();
        String encodedSegmentName = URIUtils.encode(segmentName);
        String finalSegmentLocationPath = URIUtils.getPath(dataDirPath, rawTableName, encodedSegmentName);
        if (dataDirURI.getScheme().equalsIgnoreCase(CommonConstants.Segment.LOCAL_SEGMENT_SCHEME)) {
          segmentDownloadURIStr = URIUtils.getPath(provider.getVip(), "segments", rawTableName, encodedSegmentName);
        } else {
          segmentDownloadURIStr = finalSegmentLocationPath;
        }
        finalSegmentLocationURI = URIUtils.getUri(finalSegmentLocationPath);
      }
      LOGGER.info("Using segment download URI: {} for segment: {} of table: {} (move segment: {})",
          segmentDownloadURIStr, segmentFile, tableNameWithType, copySegmentToFinalLocation);

      ZKOperator zkOperator = new ZKOperator(_pinotHelixResourceManager, _controllerConf, _controllerMetrics);
      zkOperator.completeSegmentOperations(tableNameWithType, segmentMetadata, uploadType, finalSegmentLocationURI,
          segmentFile, sourceDownloadURIStr, segmentDownloadURIStr, crypterName, segmentSizeInBytes,
          enableParallelPushProtection, allowRefresh, headers);
      return new SuccessResponse("Successfully uploaded segment: " + segmentName + " of table: " + tableNameWithType);
    } catch (WebApplicationException e) {
      throw e;
    } catch (Exception e) {
      _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_SEGMENT_UPLOAD_ERROR, 1L);
      _controllerMetrics.addMeteredTableValue(tableName, ControllerMeter.CONTROLLER_TABLE_SEGMENT_UPLOAD_ERROR, 1L);
      throw new ControllerApplicationException(LOGGER, "Exception while uploading segment: " + e.getMessage(),
          Response.Status.INTERNAL_SERVER_ERROR, e);
    } finally {
      FileUtils.deleteQuietly(tempEncryptedFile);
      FileUtils.deleteQuietly(tempDecryptedFile);
      FileUtils.deleteQuietly(tempSegmentDir);
    }
  }