in streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamTableTrait.scala [86:295]
def sql(sql: String = null)(implicit callback: String => Unit = null): Unit =
FlinkSqlExecutor.executeSql(sql, parameter, this)
// ...streamEnv api start...
def getJavaEnv: JavaStreamExecutionEnvironment = this.streamEnv.getJavaEnv
def $getCachedFiles: JavaList[tuple.Tuple2[String, DistributedCache.DistributedCacheEntry]] =
this.streamEnv.getCachedFiles
def $getJobListeners: JavaList[JobListener] = this.streamEnv.getJobListeners
def $setParallelism(parallelism: Int): Unit = this.streamEnv.setParallelism(parallelism)
def $setRuntimeMode(executionMode: RuntimeExecutionMode): StreamExecutionEnvironment =
this.streamEnv.setRuntimeMode(executionMode)
def $setMaxParallelism(maxParallelism: Int): Unit =
this.streamEnv.setMaxParallelism(maxParallelism)
def $getParallelism: Int = this.streamEnv.getParallelism
def $getMaxParallelism: Int = this.streamEnv.getMaxParallelism
def $setBufferTimeout(timeoutMillis: Long): StreamExecutionEnvironment =
this.streamEnv.setBufferTimeout(timeoutMillis)
def $getBufferTimeout: Long = this.streamEnv.getBufferTimeout
def $disableOperatorChaining(): StreamExecutionEnvironment =
this.streamEnv.disableOperatorChaining()
def $getCheckpointConfig: CheckpointConfig = this.streamEnv.getCheckpointConfig
def $enableCheckpointing(interval: Long, mode: CheckpointingMode): StreamExecutionEnvironment =
this.streamEnv.enableCheckpointing(interval, mode)
def $enableCheckpointing(interval: Long): StreamExecutionEnvironment =
this.streamEnv.enableCheckpointing(interval)
def $getCheckpointingMode: CheckpointingMode = this.streamEnv.getCheckpointingMode
def $setStateBackend(backend: StateBackend): StreamExecutionEnvironment =
this.streamEnv.setStateBackend(backend)
def $getStateBackend: StateBackend = this.streamEnv.getStateBackend
def $setRestartStrategy(
restartStrategyConfiguration: RestartStrategies.RestartStrategyConfiguration): Unit =
this.streamEnv.setRestartStrategy(restartStrategyConfiguration)
def $getRestartStrategy: RestartStrategies.RestartStrategyConfiguration =
this.streamEnv.getRestartStrategy
def $setNumberOfExecutionRetries(numRetries: Int): Unit =
this.streamEnv.setNumberOfExecutionRetries(numRetries)
def $getNumberOfExecutionRetries: Int = this.streamEnv.getNumberOfExecutionRetries
def $addDefaultKryoSerializer[T <: Serializer[_] with Serializable](
`type`: Class[_],
serializer: T): Unit =
this.streamEnv.addDefaultKryoSerializer(`type`, serializer)
def $addDefaultKryoSerializer(
`type`: Class[_],
serializerClass: Class[_ <: Serializer[_]]): Unit =
this.streamEnv.addDefaultKryoSerializer(`type`, serializerClass)
def $registerTypeWithKryoSerializer[T <: Serializer[_] with Serializable](
clazz: Class[_],
serializer: T): Unit =
this.streamEnv.registerTypeWithKryoSerializer(clazz, serializer)
def $registerTypeWithKryoSerializer(
clazz: Class[_],
serializer: Class[_ <: Serializer[_]]): Unit =
this.streamEnv.registerTypeWithKryoSerializer(clazz, serializer)
def $registerType(typeClass: Class[_]): Unit = this.streamEnv.registerType(typeClass)
def $getStreamTimeCharacteristic: TimeCharacteristic = this.streamEnv.getStreamTimeCharacteristic
def $configure(configuration: ReadableConfig, classLoader: ClassLoader): Unit =
this.streamEnv.configure(configuration, classLoader)
def $fromSequence(from: Long, to: Long): DataStream[Long] = this.streamEnv.fromSequence(from, to)
def $fromElements[T](data: T*)(implicit info: TypeInformation[T]): DataStream[T] =
this.streamEnv.fromElements(data: _*)
def $fromCollection[T](data: Seq[T])(implicit info: TypeInformation[T]): DataStream[T] =
this.streamEnv.fromCollection(data)
def $fromCollection[T](data: Iterator[T])(implicit info: TypeInformation[T]): DataStream[T] =
this.streamEnv.fromCollection(data)
def $fromParallelCollection[T](data: SplittableIterator[T])(implicit
info: TypeInformation[T]): DataStream[T] = this.streamEnv.fromParallelCollection(data)
def $readTextFile(filePath: String): DataStream[String] = this.streamEnv.readTextFile(filePath)
def $readTextFile(filePath: String, charsetName: String): DataStream[String] =
this.streamEnv.readTextFile(filePath, charsetName)
def $readFile[T](inputFormat: FileInputFormat[T], filePath: String)(implicit
info: TypeInformation[T]): DataStream[T] =
this.streamEnv.readFile(inputFormat, filePath)
def $readFile[T](
inputFormat: FileInputFormat[T],
filePath: String,
watchType: FileProcessingMode,
interval: Long)(implicit info: TypeInformation[T]): DataStream[T] =
this.streamEnv.readFile(inputFormat, filePath, watchType, interval)
def $socketTextStream(
hostname: String,
port: Int,
delimiter: Char,
maxRetry: Long): DataStream[String] =
this.streamEnv.socketTextStream(hostname, port, delimiter, maxRetry)
def $createInput[T](inputFormat: InputFormat[T, _])(implicit
info: TypeInformation[T]): DataStream[T] = this.streamEnv.createInput(inputFormat)
def $addSource[T](function: SourceFunction[T])(implicit info: TypeInformation[T]): DataStream[T] =
this.streamEnv.addSource(function)
def $addSource[T](function: SourceFunction.SourceContext[T] => Unit)(implicit
info: TypeInformation[T]): DataStream[T] = this.streamEnv.addSource(function)
def $fromSource[T](
source: Source[T, _ <: SourceSplit, _],
watermarkStrategy: WatermarkStrategy[T],
sourceName: String)(implicit info: TypeInformation[T]): DataStream[T] =
this.streamEnv.fromSource(source, watermarkStrategy, sourceName)
def $registerJobListener(jobListener: JobListener): Unit =
this.streamEnv.registerJobListener(jobListener)
def $clearJobListeners(): Unit = this.streamEnv.clearJobListeners()
def $executeAsync(): JobClient = this.streamEnv.executeAsync()
def $executeAsync(jobName: String): JobClient = this.streamEnv.executeAsync(jobName)
def $getExecutionPlan: String = this.streamEnv.getExecutionPlan
def $getStreamGraph: StreamGraph = this.streamEnv.getStreamGraph
def $getWrappedStreamExecutionEnvironment: JavaStreamExecutionEnvironment =
this.streamEnv.getWrappedStreamExecutionEnvironment
def $registerCachedFile(filePath: String, name: String): Unit =
this.streamEnv.registerCachedFile(filePath, name)
def $registerCachedFile(filePath: String, name: String, executable: Boolean): Unit =
this.streamEnv.registerCachedFile(filePath, name, executable)
def $isUnalignedCheckpointsEnabled: Boolean = this.streamEnv.isUnalignedCheckpointsEnabled
def $isForceUnalignedCheckpoints: Boolean = this.streamEnv.isForceUnalignedCheckpoints
@deprecated def $enableCheckpointing(
interval: Long,
mode: CheckpointingMode,
force: Boolean): StreamExecutionEnvironment =
this.streamEnv.enableCheckpointing(interval, mode, force)
@deprecated def $enableCheckpointing(): StreamExecutionEnvironment =
this.streamEnv.enableCheckpointing()
@deprecated def $generateSequence(from: Long, to: Long): DataStream[Long] =
this.streamEnv.generateSequence(from, to)
@deprecated def $readFileStream(
StreamPath: String,
intervalMillis: Long,
watchType: FileMonitoringFunction.WatchType): DataStream[String] =
this.streamEnv.readFileStream(StreamPath, intervalMillis, watchType)
@deprecated def $readFile[T](
inputFormat: FileInputFormat[T],
filePath: String,
watchType: FileProcessingMode,
interval: Long,
filter: FilePathFilter)(implicit info: TypeInformation[T]): DataStream[T] =
this.streamEnv.readFile(inputFormat, filePath, watchType, interval, filter)
// ...streamEnv api end...
override def fromDataStream[T](dataStream: DataStream[T]): Table =
tableEnv.fromDataStream(dataStream)
override def fromDataStream[T](dataStream: DataStream[T], fields: Expression*): Table =
tableEnv.fromDataStream(dataStream, fields: _*)
override def createTemporaryView[T](path: String, dataStream: DataStream[T]): Unit =
tableEnv.createTemporaryView(path, dataStream)
override def createTemporaryView[T](
path: String,
dataStream: DataStream[T],
fields: Expression*): Unit =
tableEnv.createTemporaryView(path, dataStream, fields: _*)
override def toAppendStream[T](table: Table)(implicit info: TypeInformation[T]): DataStream[T] = {
isConvertedToDataStream = true
tableEnv.toAppendStream(table)
}