in processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java [234:381]
public static boolean recordNewLoadMetadata(LoadMetadataDetails newMetaEntry,
CarbonLoadModel loadModel, boolean loadStartEntry, boolean insertOverwrite, String uuid,
List<Segment> segmentsToBeDeleted, List<Segment> segmentFilesTobeUpdated,
boolean isUpdateStatusRequired) throws IOException {
boolean status = false;
AbsoluteTableIdentifier identifier =
loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
if (loadModel.isCarbonTransactionalTable()) {
String metadataPath = CarbonTablePath.getMetadataPath(identifier.getTablePath());
if (!FileFactory.isFileExist(metadataPath)) {
FileFactory.mkdirs(metadataPath);
}
}
SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier,
loadModel.getCarbonDataLoadSchema().getCarbonTable().getTableStatusVersion());
ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
int retryCount = CarbonLockUtil
.getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT);
int maxTimeout = CarbonLockUtil
.getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT);
try {
if (carbonLock.lockWithRetries(retryCount, maxTimeout)) {
LOGGER.info(
"Acquired lock for table" + loadModel.getDatabaseName() + "." + loadModel.getTableName()
+ " for table status updation");
loadModel.setLatestTableStatusWriteVersion(String.valueOf(System.currentTimeMillis()));
String newTblStatusVersion = loadModel.getLatestTableStatusWriteVersion();
String tableStatusPath = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath(),
loadModel.getLatestTableStatusWriteVersion());
if (newMetaEntry.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS
|| newMetaEntry.getSegmentStatus() == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS) {
newTblStatusVersion =
loadModel.getCarbonDataLoadSchema().getCarbonTable().getTableStatusVersion();
}
LoadMetadataDetails[] listOfLoadFolderDetailsArray =
SegmentStatusManager.readLoadMetadata(
CarbonTablePath.getMetadataPath(identifier.getTablePath()), newTblStatusVersion);
List<LoadMetadataDetails> listOfLoadFolderDetails =
new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
Collections.addAll(listOfLoadFolderDetails, listOfLoadFolderDetailsArray);
// create a new segment Id if load has just begun else add the already generated Id
if (loadStartEntry) {
String segmentId =
String.valueOf(SegmentStatusManager.createNewSegmentId(listOfLoadFolderDetailsArray));
loadModel.setLoadMetadataDetails(listOfLoadFolderDetails);
LoadMetadataDetails entryTobeRemoved = null;
if (loadModel.getCarbonDataLoadSchema().getCarbonTable().isMV()
&& !loadModel.getSegmentId().isEmpty()) {
for (LoadMetadataDetails entry : listOfLoadFolderDetails) {
if (entry.getLoadName().equalsIgnoreCase(loadModel.getSegmentId())) {
newMetaEntry.setLoadName(loadModel.getSegmentId());
newMetaEntry.setExtraInfo(entry.getExtraInfo());
entryTobeRemoved = entry;
}
}
} else {
if (isUpdateStatusRequired && segmentId.equalsIgnoreCase("0") && !StringUtils
.isBlank(uuid)) {
newMetaEntry.setUpdateStatusFileName(CarbonUpdateUtil.getUpdateStatusFileName(uuid));
}
newMetaEntry.setLoadName(segmentId);
loadModel.setSegmentId(segmentId);
}
listOfLoadFolderDetails.remove(entryTobeRemoved);
// Exception should be thrown if:
// 1. If insert overwrite is in progress and any other load or insert operation
// is triggered
// 2. If load or insert into operation is in progress and insert overwrite operation
// is triggered
for (LoadMetadataDetails entry : listOfLoadFolderDetails) {
if (entry.getSegmentStatus() == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS
&& SegmentStatusManager.isLoadInProgress(
identifier, entry.getLoadName())) {
throw new RuntimeException("Already insert overwrite is in progress");
} else if (newMetaEntry.getSegmentStatus() == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS
&& entry.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS
&& SegmentStatusManager.isLoadInProgress(
identifier, entry.getLoadName())) {
throw new RuntimeException("Already insert into or load is in progress");
}
}
listOfLoadFolderDetails.add(newMetaEntry);
} else {
newMetaEntry.setLoadName(String.valueOf(loadModel.getSegmentId()));
// existing entry needs to be overwritten as the entry will exist with some
// intermediate status
int indexToOverwriteNewMetaEntry = 0;
boolean found = false;
for (LoadMetadataDetails entry : listOfLoadFolderDetails) {
if (entry.getLoadName().equals(newMetaEntry.getLoadName())
&& entry.getLoadStartTime() == newMetaEntry.getLoadStartTime()) {
newMetaEntry.setExtraInfo(entry.getExtraInfo());
found = true;
break;
}
indexToOverwriteNewMetaEntry++;
}
if (insertOverwrite) {
for (LoadMetadataDetails entry : listOfLoadFolderDetails) {
if (entry.getSegmentStatus() != SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS) {
entry.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
}
}
}
if (!found) {
LOGGER.error("Entry not found to update " + newMetaEntry + " From list :: "
+ listOfLoadFolderDetails);
throw new IOException("Entry not found to update in the table status file");
}
listOfLoadFolderDetails.set(indexToOverwriteNewMetaEntry, newMetaEntry);
}
for (LoadMetadataDetails detail: listOfLoadFolderDetails) {
// if the segments is in the list of marked for delete then update the status.
if (segmentsToBeDeleted.contains(new Segment(detail.getLoadName()))) {
detail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
} else if (segmentFilesTobeUpdated
.contains(Segment.toSegment(detail.getLoadName(), null))) {
detail.setSegmentFile(
detail.getLoadName() + "_" + newMetaEntry.getUpdateStatusFileName()
+ CarbonTablePath.SEGMENT_EXT);
} else if (isUpdateStatusRequired && detail.getLoadName().equalsIgnoreCase("0")
&& !StringUtils.isBlank(uuid)) {
detail.setUpdateStatusFileName(CarbonUpdateUtil.getUpdateStatusFileName(uuid));
}
}
SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetails
.toArray(new LoadMetadataDetails[0]));
status = true;
} else {
LOGGER.error("Not able to acquire the lock for Table status updation for table " + loadModel
.getDatabaseName() + "." + loadModel.getTableName());
}
} finally {
if (carbonLock.unlock()) {
LOGGER.info(
"Table unlocked successfully after table status updation" + loadModel.getDatabaseName()
+ "." + loadModel.getTableName());
} else {
LOGGER.error(
"Unable to unlock Table lock for table" + loadModel.getDatabaseName() + "." + loadModel
.getTableName() + " during table status updation");
}
}
return status;
}