in linkis-engineconn-plugins/io_file/src/main/scala/org/apache/linkis/manager/engineplugin/io/executor/IoEngineConnExecutor.scala [111:224]
/**
 * Executes one serialized file-system call.
 *
 * `code` is deserialized into a method entity and dispatched on its `methodName`:
 *  - `init`: creates a user FS when `existsUserFS` reports none and returns the fs id;
 *  - `close`: closes the user FS;
 *  - `read` / `write`: delegated to `IOHelp`;
 *  - `available`: returns the stream's `available()` minus the requested position,
 *    never less than zero;
 *  - `renameTo`, `list`, `listPathWithError`: called directly on the user FS;
 *  - `get`, `create`, `isOwner`: delegated to `invokeMethod` with `classOf[String]`;
 *  - any other name: delegated to `invokeMethod` with `classOf[FsPath]`.
 *
 * @param engineExecutionContext context of the running job; its job id is used in log lines
 *                               and passed to `invokeMethod`
 * @param code                   serialized method entity to execute
 * @return the response produced by the dispatched operation
 * @throws StorageErrorException when `available`, `renameTo`, `list` or `listPathWithError`
 *                               is called with a missing or wrongly sized parameter array
 */
override def executeLine(
    engineExecutionContext: EngineExecutionContext,
    code: String
): ExecuteResponse = {
  val method = MethodEntitySerializer.deserializer(code)
  val methodName = method.methodName
  // NOTE(review): assumes getJobId is an Option that is always populated here; `.get`
  // throws if it is empty — confirm against the caller.
  val jobID = engineExecutionContext.getJobId.get
  logger.info(
    s"jobID($jobID):creator ${method.creatorUser} proxy user: ${method.proxyUser} to execute a method: ${method.methodName}.,fsId=${method.id}"
  )

  // Rejects the call when the parameter array is absent or does not have `expected` entries.
  def requireParamCount(expected: Int): Unit =
    if (method.params == null || method.params.length != expected) {
      throw new StorageErrorException(
        PARAMETER_CALLS.getErrorCode,
        PARAMETER_CALLS.getErrorDesc
      )
    }

  // Deserializes the parameter at `index` into an FsPath.
  def fsPathParam(index: Int): FsPath =
    MethodEntitySerializer.deserializerToJavaObject(
      method.params(index).asInstanceOf[String],
      classOf[FsPath]
    )

  val executeResponse: ExecuteResponse = methodName match {
    case "init" =>
      // Reuse the id carried by the request when a user FS already exists for it.
      val fsId: Long = if (existsUserFS(method)) method.id else createUserFS(method)
      logger.info(s"jobID($jobID),user(${method.proxyUser}) execute init and fsID($fsId)")
      AliasOutputExecuteResponse(
        fsId.toString,
        StorageUtils.serializerStringToResult(fsId.toString)
      )
    case "close" =>
      closeUserFS(method)
      SuccessExecuteResponse()
    case "read" =>
      val fs = getUserFS(method)
      AliasOutputExecuteResponse(method.id.toString, IOHelp.read(fs, method))
    case "available" =>
      val fs = getUserFS(method)
      requireParamCount(2)
      val dest = fsPathParam(0)
      // Parse the position once; negative positions are treated as 0.
      val position = math.max(0, method.params(1).toString.toInt)
      val inputStream = fs.read(dest)
      Utils.tryFinally {
        // A position past what the stream reports must not yield a negative count.
        val remaining = math.max(0, inputStream.available() - position)
        AliasOutputExecuteResponse(
          method.id.toString,
          StorageUtils.serializerStringToResult(remaining.toString)
        )
      }(IOUtils.closeQuietly(inputStream))
    case "write" =>
      val fs = getUserFS(method)
      IOHelp.write(fs, method)
      SuccessExecuteResponse()
    case "renameTo" =>
      val fs = getUserFS(method)
      requireParamCount(2)
      fs.renameTo(fsPathParam(0), fsPathParam(1))
      SuccessExecuteResponse()
    case "list" =>
      requireParamCount(1)
      val fs = getUserFS(method)
      val dest = fsPathParam(0)
      AliasOutputExecuteResponse(
        method.id.toString,
        StorageUtils.serializerStringToResult(
          MethodEntitySerializer.serializerJavaObject(fs.list(dest))
        )
      )
    case "listPathWithError" =>
      requireParamCount(1)
      // listPathWithError is called on FileSystem, hence the cast of the user FS.
      val fs = getUserFS(method).asInstanceOf[FileSystem]
      val dest = fsPathParam(0)
      AliasOutputExecuteResponse(
        method.id.toString,
        StorageUtils.serializerStringToResult(
          MethodEntitySerializer.serializerJavaObject(fs.listPathWithError(dest))
        )
      )
    case "get" | "create" | "isOwner" =>
      invokeMethod(method, classOf[String], jobID)
    case _ =>
      invokeMethod(method, classOf[FsPath], jobID)
  }
  logger.info(
    s"jobID($jobID):creator ${method.creatorUser} proxy user: ${method.proxyUser} finished to execute a method: ${method.methodName}.,fsId=${method.id}"
  )
  executeResponse
}