private def streamBytesIntoKusto()

in connector/src/main/scala/com/microsoft/kusto/spark/datasink/KustoWriter.scala [446:482]


  /**
   * Sends a slice of `bytes` to Kusto through the managed streaming client, wrapped in
   * [[KDSU.retryApplyFunction]] with this writer's `retryConfig`.
   *
   * On success every entry of the returned ingestion status collection is logged at info level,
   * tagged with `batchIdForTracing` and the attempt index. On failure the exception is handed to
   * `KDSU.reportExceptionAndThrow` together with the partition id and request id.
   *
   * @param batchIdForTracing
   *   identifier included in the success log lines
   * @param bytes
   *   buffer holding the payload to ingest
   * @param ingestionProperties
   *   ingestion properties forwarded unchanged to the streaming client
   * @param writeOptions
   *   only `requestId` is read here, for the failure message
   * @param streamingClient
   *   client used to perform the streaming ingestion
   * @param inputStreamLastIdx
   *   passed as the `length` argument of `ByteArrayInputStream(bytes, 0, length)`, i.e. the number
   *   of bytes, counted from the start of `bytes`, that are exposed to the client
   */
  private def streamBytesIntoKusto(
      batchIdForTracing: String,
      bytes: Array[Byte],
      ingestionProperties: IngestionProperties,
      writeOptions: WriteOptions,
      streamingClient: ManagedStreamingIngestClient,
      inputStreamLastIdx: Int): Unit = {
    KDSU.retryApplyFunction(
      // `attempt` is the index supplied by retryApplyFunction; it is only used for logging.
      attempt => {
        // Built inside the retried function so that every invocation reads from offset 0 again
        // instead of reusing a stream that a previous invocation may have consumed.
        val inputStream = new ByteArrayInputStream(bytes, 0, inputStreamLastIdx)
        // The SDK will compress the stream by default.
        val streamSourceInfo = new StreamSourceInfo(inputStream)
        Try(streamingClient.ingestFromStream(streamSourceInfo, ingestionProperties)) match {
          case Success(status) =>
            status.getIngestionStatusCollection.forEach(ingestionStatus => {
              // Fields are interpolated directly (no explicit .toString) so that an unset field
              // is rendered as "null" rather than throwing from the logging code after the
              // ingest itself has already succeeded.
              KDSU.logInfo(
                className,
                s"BatchId $batchIdForTracing IngestionStatus { " +
                  s"status: '${ingestionStatus.status}', " +
                  s"details: ${ingestionStatus.details}, " +
                  s"activityId: ${ingestionStatus.activityId}, " +
                  s"errorCode: ${ingestionStatus.errorCode}, " +
                  s"errorCodeString: ${ingestionStatus.errorCodeString}, " +
                  s"retry: $attempt" +
                  "}")
            })
          case Failure(e: Throwable) =>
            // NOTE(review): presumably rethrows so that retryApplyFunction can decide whether to
            // run another attempt - confirm against KDSU.
            KDSU.reportExceptionAndThrow(
              className,
              e,
              "Streaming ingestion in partition " +
                s"${TaskContext.getPartitionId.toString} for requestId: " +
                s"'${writeOptions.requestId}' failed")
        }
      },
      this.retryConfig,
      "Streaming ingest to Kusto")
  }