def ingest_from_stream()

in azure-kusto-ingest/azure/kusto/ingest/managed_streaming_ingest_client.py [0:0]


    def ingest_from_stream(self, stream_descriptor: Union[StreamDescriptor, IO[AnyStr]], ingestion_properties: IngestionProperties) -> IngestionResult:
        """Ingest a stream, trying the streaming path first and falling back to the queued client.

        The prepared stream is buffered through ``read_until_size_or_end`` with a limit of
        ``MAX_STREAMING_SIZE_IN_BYTES + 1`` (presumably so that one extra byte reveals an
        over-sized payload -- confirm against that helper). The buffered size and the
        descriptor, now pointing at the buffer, are handed to ``_stream_with_retries``.

        * A truthy result from ``_stream_with_retries`` is returned as-is.
        * A falsy result sends the buffer chained with the rest of the original stream
          to ``self.queued_client``.
        * A non-permanent ``KustoApiError`` rewinds the buffer and sends it to
          ``self.queued_client``.

        :param stream_descriptor: A ``StreamDescriptor`` or a raw stream; normalised with
            ``StreamDescriptor.get_instance``.
        :param ingestion_properties: Ingestion properties forwarded to every step.
        :return: The streaming result when it is truthy, otherwise the result of the queued ingestion.
        :raises KustoApiError: Re-raised when the reported API error is flagged ``permanent``.
        """
        descriptor = StreamDescriptor.get_instance(stream_descriptor)
        IngestTracingAttributes.set_ingest_descriptor_attributes(descriptor, ingestion_properties)

        super().ingest_from_stream(descriptor, ingestion_properties)

        descriptor = BaseIngestClient._prepare_stream(descriptor, ingestion_properties)
        source_stream = descriptor.stream

        # Ask for one byte more than the streaming limit allows.
        read_limit = self.MAX_STREAMING_SIZE_IN_BYTES + 1
        head = read_until_size_or_end(source_stream, read_limit)
        # Keep getbuffer() inline so no memoryview export outlives this expression.
        buffered_size = len(head.getbuffer())

        descriptor.stream = head

        try:
            streamed = self._stream_with_retries(buffered_size, descriptor, ingestion_properties)
            if not streamed:
                # No streaming result: queue the buffered head followed by whatever is
                # still unread in the original stream.
                descriptor.stream = chain_streams([head, source_stream])
            else:
                return streamed
        except KustoApiError as api_failure:
            if api_failure.get_api_error().permanent:
                raise
            # Non-permanent failure: rewind the buffer so the queued client reads it from the start.
            head.seek(0, SEEK_SET)

        return self.queued_client.ingest_from_stream(descriptor, ingestion_properties)