in common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala [60:130]
/**
 * Serializes a store version into the byte form of a `PbStoreVersion` message.
 *
 * @param major value written to the message's `major` field
 * @param minor value written to the message's `minor` field
 * @return the bytes produced by `toByteArray` on the built message
 */
def toPbStoreVersion(major: Int, minor: Int): Array[Byte] = {
  val storeVersion = PbStoreVersion.newBuilder.setMajor(major).setMinor(minor).build
  storeVersion.toByteArray
}
/**
 * Converts a `PbDiskInfo` message into a [[DiskInfo]].
 *
 * The mount point, usable space, average flush/fetch times and used slots are
 * passed to the `DiskInfo` constructor; the status is applied afterwards via
 * `setStatus`, whose result is returned.
 *
 * @param pbDiskInfo the protobuf message to convert
 * @return the `DiskInfo` returned by `setStatus`
 */
def fromPbDiskInfo(pbDiskInfo: PbDiskInfo): DiskInfo = {
  val diskInfo = new DiskInfo(
    pbDiskInfo.getMountPoint,
    pbDiskInfo.getUsableSpace,
    pbDiskInfo.getAvgFlushTime,
    pbDiskInfo.getAvgFetchTime,
    pbDiskInfo.getUsedSlots)
  // The status is not a constructor argument here; it is set on the new instance.
  diskInfo.setStatus(Utils.toDiskStatus(pbDiskInfo.getStatus))
}
/**
 * Converts a [[DiskInfo]] into its `PbDiskInfo` protobuf message.
 *
 * Note the field mapping: the message's `usableSpace` is filled from
 * `diskInfo.actualUsableSpace` and its `usedSlots` from `diskInfo.activeSlots`.
 *
 * @param diskInfo the disk information to convert
 * @return the built `PbDiskInfo`
 */
def toPbDiskInfo(diskInfo: DiskInfo): PbDiskInfo = {
  val pbBuilder = PbDiskInfo.newBuilder
    .setMountPoint(diskInfo.mountPoint)
    .setUsableSpace(diskInfo.actualUsableSpace)
    .setAvgFlushTime(diskInfo.avgFlushTime)
    .setAvgFetchTime(diskInfo.avgFetchTime)
    .setUsedSlots(diskInfo.activeSlots)
    .setStatus(diskInfo.status.getValue)
  pbBuilder.build
}
/**
 * Converts a `PbFileInfo` message into a [[FileInfo]], deriving the user
 * identifier from the one embedded in the message.
 *
 * @param pbFileInfo the protobuf message to convert
 * @return the resulting `FileInfo`
 */
def fromPbFileInfo(pbFileInfo: PbFileInfo): FileInfo = {
  val userIdentifier = fromPbUserIdentifier(pbFileInfo.getUserIdentifier)
  fromPbFileInfo(pbFileInfo, userIdentifier)
}
/**
 * Converts a `PbFileInfo` message into a [[FileInfo]] using a caller-supplied
 * user identifier.
 *
 * The user identifier embedded in `pbFileInfo` is not read by this overload;
 * `userIdentifier` is passed to the `FileInfo` constructor instead, which lets
 * callers share one identifier instance across many files.
 *
 * @param pbFileInfo     the protobuf message providing path, chunk offsets,
 *                       partition type, buffer size, sub-partition count and
 *                       flushed bytes
 * @param userIdentifier the identifier to attach to the resulting `FileInfo`
 * @return the resulting `FileInfo`
 */
def fromPbFileInfo(pbFileInfo: PbFileInfo, userIdentifier: UserIdentifier): FileInfo =
  new FileInfo(
    pbFileInfo.getFilePath,
    pbFileInfo.getChunkOffsetsList,
    userIdentifier,
    Utils.toPartitionType(pbFileInfo.getPartitionType),
    pbFileInfo.getBufferSize,
    pbFileInfo.getNumSubpartitions,
    pbFileInfo.getBytesFlushed)
/**
 * Converts a [[FileInfo]] into its `PbFileInfo` protobuf message.
 *
 * Note the field mapping: the message's `bytesFlushed` is filled from
 * `fileInfo.getFileLength`.
 *
 * @param fileInfo the file information to convert
 * @return the built `PbFileInfo`
 */
def toPbFileInfo(fileInfo: FileInfo): PbFileInfo = {
  val pbBuilder = PbFileInfo.newBuilder
    .setFilePath(fileInfo.getFilePath)
    .addAllChunkOffsets(fileInfo.getChunkOffsets)
    .setUserIdentifier(toPbUserIdentifier(fileInfo.getUserIdentifier))
    .setPartitionType(fileInfo.getPartitionType.getValue)
    .setBufferSize(fileInfo.getBufferSize)
    .setNumSubpartitions(fileInfo.getNumSubpartitions)
    .setBytesFlushed(fileInfo.getFileLength)
  pbBuilder.build
}
/**
 * Parses a serialized `PbFileInfoMap` and converts every entry into a
 * [[FileInfo]], keyed by the same file name as in the message.
 *
 * User identifiers are shared through `cache`: the key is
 * `tenantId + "-" + name` of the entry's protobuf user identifier. When the
 * key is already cached, the cached `UserIdentifier` is reused; otherwise the
 * identifier produced while converting the entry is stored in `cache`.
 *
 * @param data  bytes of a serialized `PbFileInfoMap`
 * @param cache map of already-built user identifiers; it is read and may be
 *              updated by this method
 * @return a new concurrent map from file name to `FileInfo`
 * @throws InvalidProtocolBufferException if `data` cannot be parsed
 */
@throws[InvalidProtocolBufferException]
def fromPbFileInfoMap(
    data: Array[Byte],
    cache: ConcurrentHashMap[String, UserIdentifier]): ConcurrentHashMap[String, FileInfo] = {
  val pbFileInfoMap = PbFileInfoMap.parseFrom(data)
  val fileInfoMap = JavaUtils.newConcurrentHashMap[String, FileInfo]
  pbFileInfoMap.getValuesMap.entrySet().asScala.foreach { entry =>
    val pbFileInfo = entry.getValue
    val pbUserIdentifier = pbFileInfo.getUserIdentifier
    val userIdentifierKey = s"${pbUserIdentifier.getTenantId}-${pbUserIdentifier.getName}"
    // A single lookup replaces containsKey + get: ConcurrentHashMap never holds
    // null values, so a null result means "absent", and the cached identifier
    // cannot vanish between the check and the read.
    val fileInfo = Option(cache.get(userIdentifierKey)) match {
      case Some(cachedIdentifier) =>
        fromPbFileInfo(pbFileInfo, cachedIdentifier)
      case None =>
        val converted = fromPbFileInfo(pbFileInfo)
        cache.put(userIdentifierKey, converted.getUserIdentifier)
        converted
    }
    fileInfoMap.put(entry.getKey, fileInfo)
  }
  fileInfoMap
}