in connector/src/main/scala/com/microsoft/kusto/spark/datasink/KustoWriter.scala [446:482]
/**
 * Ingests an in-memory byte buffer into Kusto via the managed streaming client,
 * retrying the whole ingestion according to `retryConfig`.
 *
 * On success, logs one line per returned ingestion status (including the retry
 * attempt index). On failure, reports and rethrows through
 * `KDSU.reportExceptionAndThrow`, tagging the message with the Spark partition
 * id and the request id for traceability.
 *
 * @param batchIdForTracing  batch identifier used only for log correlation
 * @param bytes              buffer holding the serialized rows to ingest
 * @param ingestionProperties Kusto ingestion properties (target table, format, ...)
 * @param writeOptions       write options; only `requestId` is read here, for logging
 * @param streamingClient    managed streaming ingest client performing the call
 * @param inputStreamLastIdx number of valid bytes in `bytes`; the rest is ignored
 */
private def streamBytesIntoKusto(
    batchIdForTracing: String,
    bytes: Array[Byte],
    ingestionProperties: IngestionProperties,
    writeOptions: WriteOptions,
    streamingClient: ManagedStreamingIngestClient,
    inputStreamLastIdx: Int): Unit = {
  KDSU.retryApplyFunction(
    i => {
      // Only the first inputStreamLastIdx bytes are valid payload; a fresh stream
      // is created on every retry attempt so the read position starts at 0.
      val inputStream = new ByteArrayInputStream(bytes, 0, inputStreamLastIdx)
      // The SDK will compress the stream by default.
      val streamSourceInfo = new StreamSourceInfo(inputStream)
      // Try only captures NonFatal throwables; fatal errors still propagate.
      Try(streamingClient.ingestFromStream(streamSourceInfo, ingestionProperties)) match {
        case Success(status) =>
          status.getIngestionStatusCollection.forEach(ingestionStatus => {
            KDSU.logInfo(
              className,
              s"BatchId $batchIdForTracing IngestionStatus { " +
                s"status: '${ingestionStatus.status.toString}', " +
                s"details: ${ingestionStatus.details}, " +
                s"activityId: ${ingestionStatus.activityId}, " +
                s"errorCode: ${ingestionStatus.errorCode}, " +
                // Fixed: missing space after the comma and before the closing brace,
                // to match the formatting of the preceding fields.
                s"errorCodeString: ${ingestionStatus.errorCodeString}, " +
                s"retry: $i" +
                " }")
          })
        case Failure(e) =>
          // Fixed: the requestId quote was unbalanced ("'${...} failed").
          KDSU.reportExceptionAndThrow(
            className,
            e,
            "Streaming ingestion in partition " +
              s"${TaskContext.getPartitionId.toString} for requestId: '${writeOptions.requestId}' failed")
      }
    },
    this.retryConfig,
    "Streaming ingest to Kusto")
}