in scala/src/main/org/apache/spark/api/csharp/CSharpRDD.scala [114:192]
private def unzip(cSharpWorkerWorkingDir: File): Unit = {
val files = cSharpWorkerWorkingDir.list.filter(_.toLowerCase.endsWith(".zip"))
val lockName = "_unzip_lock"
val unzippingFlagName = "_unzipping"
val doneFlagName = "_unzip_done"
if (files.length == 0) {
logWarning("Found no zip files.")
return
} else {
logInfo("Found zip files: " + files.mkString(","))
}
val doneFlag = new File(cSharpWorkerWorkingDir, doneFlagName)
// check whether all zip files have already uncompressed
if (doneFlag.exists()) {
logInfo("Already unzipped all zip files, skip.")
return
}
val unzippingFlag = new File(cSharpWorkerWorkingDir, unzippingFlagName)
// if another thread is uncompressing files,
// current thread just needs to wait the operation done and return
if (unzippingFlag.exists()) {
waitUnzipOperationDone(doneFlag)
return
}
val lockFile = new File(cSharpWorkerWorkingDir, lockName)
var file: RandomAccessFile = null
var lock: FileLock = null
var channel: FileChannel = null
try {
file = new RandomAccessFile(lockFile, "rw")
channel = file.getChannel
lock = channel.tryLock()
if (lock == null) {
logWarning("Failed to obtain lock for file " + lockFile.getPath)
waitUnzipOperationDone(doneFlag)
return
}
// check again whether un-compression operation already done
if (new File(cSharpWorkerWorkingDir, doneFlagName).exists()) {
return
}
// unzippingFlag file will be deleted before release the lock
// so if obtain the lock successfully, there is no chance that the unzippingFlag still exists
unzippingFlag.createNewFile()
// unzip file
for (zipFile <- files) {
CSharpUtils.unzip(new File(cSharpWorkerWorkingDir, zipFile), cSharpWorkerWorkingDir)
logInfo("Unzip file: " + zipFile)
}
doneFlag.createNewFile()
unzippingFlag.delete()
logInfo("Unzip done.")
} catch {
case e: OverlappingFileLockException =>
logInfo("Already obtained the lock.")
waitUnzipOperationDone(doneFlag)
case e: Exception => logError("Exception when unzipping cSharpWorkerWorkingDir", e)
}
finally {
if (lock != null && lock.isValid) lock.release()
if (channel != null && channel.isOpen) channel.close()
if (file != null) file.close()
}
}