def ingest_from_blob()

in azure-kusto-ingest/azure/kusto/ingest/ingest_client.py [0:0]


    def ingest_from_blob(self, blob_descriptor: BlobDescriptor, ingestion_properties: IngestionProperties) -> IngestionResult:
        """Enqueue an ingest command from azure blobs.

        To learn more about ingestion methods go to:
        https://docs.microsoft.com/en-us/azure/data-explorer/ingest-data-overview#ingestion-methods

        The serialized ingestion message is sent to the queues returned by the resource
        manager, one queue at a time, until a send succeeds. The outcome of every attempt
        is reported back to the resource manager via ``report_resource_usage_result``.

        :param azure.kusto.ingest.BlobDescriptor blob_descriptor: An object that contains a description of the blob to be ingested.
        :param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
        :return: An ``IngestionResult`` with status ``IngestionStatus.QUEUED`` carrying the target database and table
            and the blob's source id and path.
        :raises KustoClosedError: If this client has already been closed.
        :raises KustoQueueError: If the message could not be enqueued: either ``min(_MAX_RETRIES, len(queues))``
            sends failed, or no queue accepted the message (including when no queues are available).
        """
        IngestTracingAttributes.set_ingest_descriptor_attributes(blob_descriptor, ingestion_properties)

        if self._is_closed:
            raise KustoClosedError()

        queues = self._resource_manager.get_ingestion_queues()

        authorization_context = self._resource_manager.get_authorization_context()
        ingestion_blob_info = IngestionBlobInfo(
            blob_descriptor,
            ingestion_properties=ingestion_properties,
            auth_context=authorization_context,
            application_for_tracing=self.application_for_tracing,
            client_version_for_tracing=self.client_version_for_tracing,
        )
        # Serialize once; the same payload is reused for every queue attempt.
        ingestion_blob_info_json = ingestion_blob_info.to_json()

        # Never attempt more failures than there are queues to try.
        retries_left = min(self._MAX_RETRIES, len(queues))
        # Remembered so the fall-through error below can still be chained to its cause.
        last_error = None
        for queue in queues:
            try:
                with QueueServiceClient(queue.account_uri, proxies=self._proxy_dict) as queue_service:
                    with queue_service.get_queue_client(queue=queue.object_name, message_encode_policy=TextBase64EncodePolicy()) as queue_client:
                        # trace enqueuing of blob for ingestion
                        invoker = lambda: queue_client.send_message(content=ingestion_blob_info_json, timeout=self._SERVICE_CLIENT_TIMEOUT_SECONDS)
                        enqueue_trace_attributes = IngestTracingAttributes.create_enqueue_request_attributes(queue_client.queue_name, blob_descriptor.source_id)
                        MonitoredActivity.invoke(invoker, name_of_span="QueuedIngestClient.enqueue_request", tracing_attributes=enqueue_trace_attributes)

                self._resource_manager.report_resource_usage_result(queue.storage_account_name, True)
                return IngestionResult(
                    IngestionStatus.QUEUED, ingestion_properties.database, ingestion_properties.table, blob_descriptor.source_id, blob_descriptor.path
                )
            except Exception as e:
                last_error = e
                retries_left = retries_left - 1
                # TODO: log the retry once we have a proper logging system
                self._resource_manager.report_resource_usage_result(queue.storage_account_name, False)
                if retries_left == 0:
                    raise KustoQueueError() from e

        # Reaching this point means nothing was enqueued: either there were no queues to
        # try, or every queue failed without the retry budget hitting zero. Fail loudly
        # instead of implicitly returning None from a method typed to return IngestionResult.
        raise KustoQueueError() from last_error