private def unzip()

in scala/src/main/org/apache/spark/api/csharp/CSharpRDD.scala [114:192]


  private def unzip(cSharpWorkerWorkingDir: File): Unit = {

    val files = cSharpWorkerWorkingDir.list.filter(_.toLowerCase.endsWith(".zip"))

    val lockName = "_unzip_lock"
    val unzippingFlagName = "_unzipping"
    val doneFlagName = "_unzip_done"

    if (files.length == 0) {
      logWarning("Found no zip files.")
      return
    } else {
      logInfo("Found zip files: " + files.mkString(","))
    }

    val doneFlag = new File(cSharpWorkerWorkingDir, doneFlagName)

    // check whether all zip files have already uncompressed
    if (doneFlag.exists()) {
      logInfo("Already unzipped all zip files, skip.")
      return
    }

    val unzippingFlag = new File(cSharpWorkerWorkingDir, unzippingFlagName)

    // if another thread is uncompressing files,
    // current thread just needs to wait the operation done and return
    if (unzippingFlag.exists()) {
      waitUnzipOperationDone(doneFlag)
      return
    }

    val lockFile = new File(cSharpWorkerWorkingDir, lockName)
    var file: RandomAccessFile = null
    var lock: FileLock = null
    var channel: FileChannel = null

    try {
      file = new RandomAccessFile(lockFile, "rw")
      channel = file.getChannel
      lock = channel.tryLock()

      if (lock == null) {
        logWarning("Failed to obtain lock for file " + lockFile.getPath)
        waitUnzipOperationDone(doneFlag)
        return
      }

      // check again whether un-compression operation already done
      if (new File(cSharpWorkerWorkingDir, doneFlagName).exists()) {
        return
      }

      // unzippingFlag file will be deleted before release the lock
      // so if obtain the lock successfully, there is no chance that the unzippingFlag still exists
      unzippingFlag.createNewFile()

      // unzip file
      for (zipFile <- files) {
        CSharpUtils.unzip(new File(cSharpWorkerWorkingDir, zipFile), cSharpWorkerWorkingDir)
        logInfo("Unzip file: " + zipFile)
      }

      doneFlag.createNewFile()
      unzippingFlag.delete()
      logInfo("Unzip done.")

    } catch {
      case e: OverlappingFileLockException =>
        logInfo("Already obtained the lock.")
        waitUnzipOperationDone(doneFlag)
      case e: Exception => logError("Exception when unzipping cSharpWorkerWorkingDir", e)
    }
    finally {
      if (lock != null && lock.isValid) lock.release()
      if (channel != null && channel.isOpen) channel.close()
      if (file != null) file.close()
    }
  }