in linkis-extensions/linkis-io-file-client/src/main/scala/org/apache/linkis/storage/io/iteraceptor/IOMethodInterceptor.scala [158:264]
override def intercept(
o: scala.Any,
method: Method,
args: Array[AnyRef],
methodProxy: MethodProxy
): AnyRef = {
if (closed && method.getName != "close") {
throw new StorageErrorException(
STORAGE_HAS_BEEN_CLOSED.getErrorCode,
s"$fsType storage has been closed."
)
}
if (System.currentTimeMillis() - lastAccessTime >= iOEngineExecutorMaxFreeTime) synchronized {
method.getName match {
case "init" =>
case "storageName" => return fsType
case "setUser" =>
properties += StorageConfiguration.PROXY_USER.key -> args(0).asInstanceOf[String];
return Unit
case _ =>
if (inited) {
initFS()
logger.info(s"since the $fsType storage($id) is free for too long time, re-inited it.")
}
}
}
lastAccessTime = System.currentTimeMillis()
method.getName match {
case "init" =>
val user =
if (properties.contains(StorageConfiguration.PROXY_USER.key)) {
StorageConfiguration.PROXY_USER.getValue(properties.toMap)
} else {
null
}
if (args.length > 0 && args(0).isInstanceOf[java.util.Map[String, String]]) {
properties ++= args(0).asInstanceOf[java.util.Map[String, String]].asScala
}
if (StringUtils.isNoneBlank(user)) {
properties += StorageConfiguration.PROXY_USER.key -> user
}
initFS()
logger.warn(s"For user($user)inited a $fsType storage($id) .")
Unit
case "fsName" => fsType
case "setUser" =>
properties += StorageConfiguration.PROXY_USER.key -> args(0).asInstanceOf[String]; Unit
case "read" =>
if (!inited) throw new IllegalAccessException("storage has not been inited.")
new IOInputStream(args)
case "write" =>
if (!inited) throw new IllegalAccessException("storage has not been inited.")
new IOOutputStream(args)
case "renameTo" =>
if (!inited || args.length < 2) {
throw new IllegalAccessException("storage has not been inited.")
}
val params =
args.map(MethodEntitySerializer.serializerJavaObject(_)).map(_.asInstanceOf[AnyRef])
executeMethod(method.getName, params)
new java.lang.Boolean(true)
case "list" =>
if (!inited || args.length < 1) {
throw new IllegalAccessException("storage has not been inited.")
}
val params =
Array(MethodEntitySerializer.serializerJavaObject(args(0))).map(_.asInstanceOf[AnyRef])
val msg = executeMethod(method.getName, params)
MethodEntitySerializer.deserializerToJavaObject[java.util.List[FsPath]](
StorageUtils.deserializerResultToString(msg),
new TypeToken[java.util.List[FsPath]]() {}.getType
)
case "listPathWithError" =>
if (!inited || args.length < 1) {
throw new IllegalAccessException("storage has not been inited.")
}
val params =
Array(MethodEntitySerializer.serializerJavaObject(args(0))).map(_.asInstanceOf[AnyRef])
val msg = executeMethod(method.getName, params)
MethodEntitySerializer.deserializerToJavaObject[FsPathListWithError](
StorageUtils.deserializerResultToString(msg),
new TypeToken[FsPathListWithError]() {}.getType
)
case "toString" =>
this.toString
case "finalize" =>
logger.info("no support method")
Unit
case _ =>
if (!inited) throw new IllegalAccessException("storage has not been inited.")
if (method.getName == "close") {
closed = true
bindEngineLabel.setIsJobGroupEnd("true")
bindEngineLabel.setIsJobGroupHead("false")
}
val returnType = method.getReturnType
if (args.length > 0) args(0) = MethodEntitySerializer.serializerJavaObject(args(0))
val msg = executeMethod(method.getName, args)
if (returnType == Void.TYPE) return Unit
val result = MethodEntitySerializer.deserializerToJavaObject(
StorageUtils.deserializerResultToString(msg),
returnType
)
result.asInstanceOf[AnyRef]
}
}