override def executeLine()

in linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/executor/IoEngineConnExecutor.scala [111:224]


  override def executeLine(
      engineExecutionContext: EngineExecutionContext,
      code: String
  ): ExecuteResponse = {
    val method = MethodEntitySerializer.deserializer(code)
    val methodName = method.methodName
    val jobID = engineExecutionContext.getJobId.get
    logger.info(
      s"jobID($jobID):creator ${method.creatorUser} proxy user: ${method.proxyUser} to execute a method: ${method.methodName}.,fsId=${method.id}"
    )
    val executeResponse = methodName match {
      case "init" =>
        val fsId: Long = if (!existsUserFS(method)) {
          createUserFS(method)
        } else {
          method.id
        }
        logger.info(s"jobID($jobID),user(${method.proxyUser}) execute init and fsID($fsId)")
        AliasOutputExecuteResponse(
          fsId.toString,
          StorageUtils.serializerStringToResult(fsId.toString)
        )
      case "close" => closeUserFS(method); SuccessExecuteResponse()
      case "read" =>
        val fs = getUserFS(method)
        AliasOutputExecuteResponse(method.id.toString, IOHelp.read(fs, method))
      case "available" =>
        val fs = getUserFS(method)
        if (method.params == null || method.params.length != 2) {
          throw new StorageErrorException(
            PARAMETER_CALLS.getErrorCode,
            PARAMETER_CALLS.getErrorDesc
          )
        }
        val dest = MethodEntitySerializer.deserializerToJavaObject(
          method.params(0).asInstanceOf[String],
          classOf[FsPath]
        )
        val position =
          if (method.params(1).toString.toInt < 0) 0 else method.params(1).toString.toInt
        val inputStream = fs.read(dest)
        Utils.tryFinally(
          AliasOutputExecuteResponse(
            method.id.toString,
            StorageUtils.serializerStringToResult((inputStream.available() - position).toString)
          )
        )(IOUtils.closeQuietly(inputStream))
      case "write" =>
        val fs = getUserFS(method)
        IOHelp.write(fs, method)
        SuccessExecuteResponse()
      case "renameTo" =>
        val fs = getUserFS(method)
        if (method.params == null || method.params.length != 2) {
          throw new StorageErrorException(
            PARAMETER_CALLS.getErrorCode,
            PARAMETER_CALLS.getErrorDesc
          )
        }
        fs.renameTo(
          MethodEntitySerializer
            .deserializerToJavaObject(method.params(0).asInstanceOf[String], classOf[FsPath]),
          MethodEntitySerializer.deserializerToJavaObject(
            method.params(1).asInstanceOf[String],
            classOf[FsPath]
          )
        )
        SuccessExecuteResponse()
      case "list" =>
        if (method.params == null || method.params.length != 1) {
          throw new StorageErrorException(
            PARAMETER_CALLS.getErrorCode,
            PARAMETER_CALLS.getErrorDesc
          )
        }
        val fs = getUserFS(method)
        val dest = MethodEntitySerializer.deserializerToJavaObject(
          method.params(0).asInstanceOf[String],
          classOf[FsPath]
        )
        AliasOutputExecuteResponse(
          method.id.toString,
          StorageUtils.serializerStringToResult(
            MethodEntitySerializer.serializerJavaObject(fs.list(dest))
          )
        )
      case "listPathWithError" =>
        if (method.params == null || method.params.length != 1) {
          throw new StorageErrorException(
            PARAMETER_CALLS.getErrorCode,
            PARAMETER_CALLS.getErrorDesc
          )
        }
        val fs = getUserFS(method).asInstanceOf[FileSystem]
        val dest = MethodEntitySerializer.deserializerToJavaObject(
          method.params(0).asInstanceOf[String],
          classOf[FsPath]
        )
        AliasOutputExecuteResponse(
          method.id.toString,
          StorageUtils.serializerStringToResult(
            MethodEntitySerializer.serializerJavaObject(fs.listPathWithError(dest))
          )
        )
      case "get" | "create" | "isOwner" =>
        invokeMethod(method, classOf[String], jobID)
      case _ =>
        invokeMethod(method, classOf[FsPath], jobID)
    }
    logger.info(
      s"jobID($jobID):creator ${method.creatorUser} proxy user: ${method.proxyUser} finished to  execute a method: ${method.methodName}.,fsId=${method.id}"
    )
    executeResponse
  }