in azure-kusto-ingest/azure/kusto/ingest/managed_streaming_ingest_client.py [0:0]
def ingest_from_stream(self, stream_descriptor: Union[StreamDescriptor, IO[AnyStr]], ingestion_properties: IngestionProperties) -> IngestionResult:
stream_descriptor = StreamDescriptor.get_instance(stream_descriptor)
IngestTracingAttributes.set_ingest_descriptor_attributes(stream_descriptor, ingestion_properties)
super().ingest_from_stream(stream_descriptor, ingestion_properties)
stream_descriptor = BaseIngestClient._prepare_stream(stream_descriptor, ingestion_properties)
stream = stream_descriptor.stream
buffered_stream = read_until_size_or_end(stream, self.MAX_STREAMING_SIZE_IN_BYTES + 1)
length = len(buffered_stream.getbuffer())
stream_descriptor.stream = buffered_stream
try:
res = self._stream_with_retries(length, stream_descriptor, ingestion_properties)
if res:
return res
stream_descriptor.stream = chain_streams([buffered_stream, stream])
except KustoApiError as ex:
error = ex.get_api_error()
if error.permanent:
raise
buffered_stream.seek(0, SEEK_SET)
return self.queued_client.ingest_from_stream(stream_descriptor, ingestion_properties)