in pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java [240:436]
/**
 * Handles one segment upload request end to end: stages the uploaded payload in temp files, extracts the segment
 * metadata, validates it against the table config and storage quota, optionally (re-)encrypts it, works out the
 * download URI and hands everything over to {@link ZKOperator#completeSegmentOperations}.
 *
 * <p>Three upload modes are supported, selected by the {@code UPLOAD_TYPE} header:
 * <ul>
 *   <li>SEGMENT: the segment file is carried in the multipart body.</li>
 *   <li>URI: the segment file is downloaded from the {@code DOWNLOAD_URI} header.</li>
 *   <li>METADATA: only the metadata is carried in the multipart body; the segment itself stays at
 *   {@code DOWNLOAD_URI} and its size is looked up there on a best-effort basis.</li>
 * </ul>
 *
 * <p>All temp files/directories created by this method are deleted before it returns, whether it succeeds or fails.
 *
 * @param tableName table name from the request, with or without type suffix; when empty the table name stored in
 *                  the segment metadata is used instead
 * @param tableType table type the segment is uploaded to; must agree with the suffix of {@code tableName} if any
 * @param multiPart multipart body, required for SEGMENT and METADATA upload modes
 * @param copySegmentToFinalLocation whether the controller should place the segment under its data dir; in METADATA
 *                                   mode this is replaced by the {@code COPY_SEGMENT_TO_DEEP_STORE} header value
 * @param enableParallelPushProtection passed through to {@link ZKOperator#completeSegmentOperations}
 * @param allowRefresh passed through to {@link ZKOperator#completeSegmentOperations}
 * @param headers request headers carrying upload type, download URI, crypter and ingestion descriptor
 * @param request underlying request, only used to resolve the client address for logging
 * @return success response naming the uploaded segment and its table
 * @throws ControllerApplicationException with BAD_REQUEST for invalid input (mismatching table type, missing
 *         multipart/download URI, unsupported upload type, unknown table, reserved segment name format) and with
 *         INTERNAL_SERVER_ERROR for any other failure; {@link WebApplicationException}s raised while processing are
 *         rethrown unchanged
 */
private SuccessResponse uploadSegment(@Nullable String tableName, TableType tableType,
    @Nullable FormDataMultiPart multiPart, boolean copySegmentToFinalLocation, boolean enableParallelPushProtection,
    boolean allowRefresh, HttpHeaders headers, Request request) {
  if (StringUtils.isNotEmpty(tableName)) {
    TableType tableTypeFromTableName = TableNameBuilder.getTableTypeFromTableName(tableName);
    if (tableTypeFromTableName != null && tableTypeFromTableName != tableType) {
      throw new ControllerApplicationException(LOGGER,
          String.format("Table name: %s does not match table type: %s", tableName, tableType),
          Response.Status.BAD_REQUEST);
    }
  }

  // TODO: Consider validating the segment name and table name from the header against the actual segment
  // The returned values are intentionally unused; the calls are kept for whatever extractHttpHeader does besides
  // returning the value.
  extractHttpHeader(headers, CommonConstants.Controller.SEGMENT_NAME_HTTP_HEADER);
  extractHttpHeader(headers, CommonConstants.Controller.TABLE_NAME_HTTP_HEADER);
  String uploadTypeStr = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE);
  String sourceDownloadURIStr = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI);
  String crypterClassNameInHeader = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.CRYPTER);
  String ingestionDescriptor = extractHttpHeader(headers, CommonConstants.Controller.INGESTION_DESCRIPTOR);

  File tempEncryptedFile = null;
  File tempDecryptedFile = null;
  File tempSegmentDir = null;
  // The downloadUri for putting into segment zk metadata
  String segmentDownloadURIStr = sourceDownloadURIStr;
  // Table name used for the per-table error meter. Starts as the (nullable) request parameter and falls back to the
  // name read from the segment metadata, so the meter is never recorded under a null/empty table key.
  String tableNameForMetrics = tableName;
  try {
    ControllerFilePathProvider provider = ControllerFilePathProvider.getInstance();
    String tempFileName = TMP_DIR_PREFIX + UUID.randomUUID();
    tempEncryptedFile = new File(provider.getFileUploadTempDir(), tempFileName + ENCRYPTED_SUFFIX);
    tempDecryptedFile = new File(provider.getFileUploadTempDir(), tempFileName);
    tempSegmentDir = new File(provider.getUntarredFileTempDir(), tempFileName);

    boolean uploadedSegmentIsEncrypted = StringUtils.isNotEmpty(crypterClassNameInHeader);
    FileUploadType uploadType = getUploadType(uploadTypeStr);
    // An encrypted upload lands in the encrypted temp file first and is decrypted after the switch
    File destFile = uploadedSegmentIsEncrypted ? tempEncryptedFile : tempDecryptedFile;
    long segmentSizeInBytes;
    switch (uploadType) {
      case SEGMENT:
        if (multiPart == null) {
          throw new ControllerApplicationException(LOGGER,
              "Segment file (as multipart/form-data) is required for SEGMENT upload mode",
              Response.Status.BAD_REQUEST);
        }
        if (!copySegmentToFinalLocation && StringUtils.isEmpty(sourceDownloadURIStr)) {
          throw new ControllerApplicationException(LOGGER,
              "Source download URI is required in header field 'DOWNLOAD_URI' if segment should not be copied to "
                  + "the deep store",
              Response.Status.BAD_REQUEST);
        }
        createSegmentFileFromMultipart(multiPart, destFile);
        segmentSizeInBytes = destFile.length();
        break;
      case URI:
        if (StringUtils.isEmpty(sourceDownloadURIStr)) {
          throw new ControllerApplicationException(LOGGER,
              "Source download URI is required in header field 'DOWNLOAD_URI' for URI upload mode",
              Response.Status.BAD_REQUEST);
        }
        downloadSegmentFileFromURI(sourceDownloadURIStr, destFile, tableName);
        segmentSizeInBytes = destFile.length();
        break;
      case METADATA:
        if (multiPart == null) {
          throw new ControllerApplicationException(LOGGER,
              "Segment metadata file (as multipart/form-data) is required for METADATA upload mode",
              Response.Status.BAD_REQUEST);
        }
        if (StringUtils.isEmpty(sourceDownloadURIStr)) {
          throw new ControllerApplicationException(LOGGER,
              "Source download URI is required in header field 'DOWNLOAD_URI' for METADATA upload mode",
              Response.Status.BAD_REQUEST);
        }
        // override copySegmentToFinalLocation if override provided in headers:COPY_SEGMENT_TO_DEEP_STORE
        // else set to false for backward compatibility
        String copySegmentToDeepStore =
            extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.COPY_SEGMENT_TO_DEEP_STORE);
        copySegmentToFinalLocation = Boolean.parseBoolean(copySegmentToDeepStore);
        createSegmentFileFromMultipart(multiPart, destFile);
        // Best-effort lookup of the real segment size at its source location; -1 marks "unknown"
        PinotFS pinotFS = null;
        try {
          URI segmentURI = new URI(sourceDownloadURIStr);
          pinotFS = PinotFSFactory.create(segmentURI.getScheme());
          segmentSizeInBytes = pinotFS.length(segmentURI);
        } catch (Exception e) {
          segmentSizeInBytes = -1;
          LOGGER.warn("Could not fetch segment size for metadata push", e);
        } finally {
          // NOTE(review): if PinotFSFactory.create() hands out a shared instance, closing it per request is
          // questionable - confirm against PinotFSFactory before keeping this close.
          if (pinotFS != null) {
            try {
              pinotFS.close();
            } catch (Exception e) {
              // The size lookup is best-effort, so a failure to close must not fail the upload either
              LOGGER.warn("Could not close PinotFS after fetching segment size for metadata push", e);
            }
          }
        }
        break;
      default:
        throw new ControllerApplicationException(LOGGER, "Unsupported upload type: " + uploadType,
            Response.Status.BAD_REQUEST);
    }

    if (uploadedSegmentIsEncrypted) {
      decryptFile(crypterClassNameInHeader, tempEncryptedFile, tempDecryptedFile);
    }

    String metadataProviderClass = DefaultMetadataExtractor.class.getName();
    SegmentMetadata segmentMetadata = getSegmentMetadata(tempDecryptedFile, tempSegmentDir, metadataProviderClass);

    // Fetch segment name
    String segmentName = segmentMetadata.getName();

    // Fetch table name. Try to derive the table name from the parameter and then from segment metadata
    String rawTableName;
    if (StringUtils.isNotEmpty(tableName)) {
      rawTableName = TableNameBuilder.extractRawTableName(tableName);
    } else {
      // TODO: remove this when we completely deprecate the table name from segment metadata
      rawTableName = segmentMetadata.getTableName();
      tableNameForMetrics = rawTableName;
      LOGGER.warn("Table name is not provided as request query parameter when uploading segment: {} for table: {}",
          segmentName, rawTableName);
    }
    String tableNameWithType = tableType == TableType.OFFLINE
        ? TableNameBuilder.OFFLINE.tableNameWithType(rawTableName)
        : TableNameBuilder.REALTIME.tableNameWithType(rawTableName);

    if (UploadedRealtimeSegmentName.isUploadedRealtimeSegmentName(segmentName) && tableType != TableType.REALTIME) {
      throw new ControllerApplicationException(LOGGER, "Cannot upload segment: " + segmentName
          + " to OFFLINE table as this format is reserved for uploaded real-time segment",
          Response.Status.BAD_REQUEST);
    }

    String clientAddress = InetAddress.getByName(request.getRemoteAddr()).getHostName();
    LOGGER.info("Processing upload request for segment: {} of table: {} with upload type: {} from client: {}, "
        + "ingestion descriptor: {}", segmentName, tableNameWithType, uploadType, clientAddress, ingestionDescriptor);

    // Validate segment
    TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
    if (tableConfig == null) {
      throw new ControllerApplicationException(LOGGER, "Failed to find table: " + tableNameWithType,
          Response.Status.BAD_REQUEST);
    }
    // The time interval check runs unless the ingestion config explicitly turns it off
    if (tableConfig.getIngestionConfig() == null || tableConfig.getIngestionConfig().isSegmentTimeValueCheck()) {
      SegmentValidationUtils.validateTimeInterval(segmentMetadata, tableConfig);
    }
    long untarredSegmentSizeInBytes;
    if (uploadType == FileUploadType.METADATA && segmentSizeInBytes > 0) {
      // TODO: Include the untarred segment size when using the METADATA push rest API. Currently we can only use the
      //  tarred segment size as an approximation.
      untarredSegmentSizeInBytes = segmentSizeInBytes;
    } else {
      untarredSegmentSizeInBytes = FileUtils.sizeOfDirectory(tempSegmentDir);
    }
    SegmentValidationUtils.checkStorageQuota(segmentName, untarredSegmentSizeInBytes, tableConfig,
        _storageQuotaChecker);

    // Encrypt segment
    String crypterNameInTableConfig = tableConfig.getValidationConfig().getCrypterClassName();
    Pair<String, File> encryptionInfo =
        encryptSegmentIfNeeded(tempDecryptedFile, tempEncryptedFile, uploadedSegmentIsEncrypted,
            crypterClassNameInHeader, crypterNameInTableConfig, segmentName, tableNameWithType);
    String crypterName = encryptionInfo.getLeft();
    File segmentFile = encryptionInfo.getRight();

    // Update download URI if controller is responsible for moving the segment to the deep store
    URI finalSegmentLocationURI = null;
    if (copySegmentToFinalLocation) {
      URI dataDirURI = provider.getDataDirURI();
      String dataDirPath = dataDirURI.toString();
      String encodedSegmentName = URIUtils.encode(segmentName);
      String finalSegmentLocationPath = URIUtils.getPath(dataDirPath, rawTableName, encodedSegmentName);
      // With a local data dir the download URI is built from the controller VIP; otherwise the final segment
      // location itself is used as the download URI
      if (dataDirURI.getScheme().equalsIgnoreCase(CommonConstants.Segment.LOCAL_SEGMENT_SCHEME)) {
        segmentDownloadURIStr = URIUtils.getPath(provider.getVip(), "segments", rawTableName, encodedSegmentName);
      } else {
        segmentDownloadURIStr = finalSegmentLocationPath;
      }
      finalSegmentLocationURI = URIUtils.getUri(finalSegmentLocationPath);
    }
    LOGGER.info("Using segment download URI: {} for segment: {} of table: {} (move segment: {})",
        segmentDownloadURIStr, segmentFile, tableNameWithType, copySegmentToFinalLocation);

    ZKOperator zkOperator = new ZKOperator(_pinotHelixResourceManager, _controllerConf, _controllerMetrics);
    zkOperator.completeSegmentOperations(tableNameWithType, segmentMetadata, uploadType, finalSegmentLocationURI,
        segmentFile, sourceDownloadURIStr, segmentDownloadURIStr, crypterName, segmentSizeInBytes,
        enableParallelPushProtection, allowRefresh, headers);

    return new SuccessResponse("Successfully uploaded segment: " + segmentName + " of table: " + tableNameWithType);
  } catch (WebApplicationException e) {
    // Already carries the intended HTTP status; rethrown without touching the error meters
    throw e;
  } catch (Exception e) {
    _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_SEGMENT_UPLOAD_ERROR, 1L);
    if (StringUtils.isNotEmpty(tableNameForMetrics)) {
      _controllerMetrics.addMeteredTableValue(tableNameForMetrics,
          ControllerMeter.CONTROLLER_TABLE_SEGMENT_UPLOAD_ERROR, 1L);
    }
    throw new ControllerApplicationException(LOGGER, "Exception while uploading segment: " + e.getMessage(),
        Response.Status.INTERNAL_SERVER_ERROR, e);
  } finally {
    // deleteQuietly tolerates null and missing files, so this is safe even when setup failed early
    FileUtils.deleteQuietly(tempEncryptedFile);
    FileUtils.deleteQuietly(tempDecryptedFile);
    FileUtils.deleteQuietly(tempSegmentDir);
  }
}