in azure-kusto-ingest/azure/kusto/ingest/ingest_client.py [0:0]
def ingest_from_blob(self, blob_descriptor: BlobDescriptor, ingestion_properties: IngestionProperties) -> IngestionResult:
    """Enqueue an ingest command from azure blobs.

    To learn more about ingestion methods go to:
    https://docs.microsoft.com/en-us/azure/data-explorer/ingest-data-overview#ingestion-methods

    The ingestion message is sent to the first queue that accepts it. Queues are tried in the
    order returned by the resource manager, and at most ``min(self._MAX_RETRIES, len(queues))``
    failed attempts are tolerated before giving up.

    :param azure.kusto.ingest.BlobDescriptor blob_descriptor: An object that contains a description of the blob to be ingested.
    :param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
    :return: An ``IngestionResult`` with status ``IngestionStatus.QUEUED`` describing the enqueued blob.
    :raises KustoClosedError: If this client has already been closed.
    :raises KustoQueueError: If the message could not be enqueued to any queue, including the
        case where no ingestion queues are available at all.
    """
    IngestTracingAttributes.set_ingest_descriptor_attributes(blob_descriptor, ingestion_properties)

    if self._is_closed:
        raise KustoClosedError()

    queues = self._resource_manager.get_ingestion_queues()

    authorization_context = self._resource_manager.get_authorization_context()
    ingestion_blob_info = IngestionBlobInfo(
        blob_descriptor,
        ingestion_properties=ingestion_properties,
        auth_context=authorization_context,
        application_for_tracing=self.application_for_tracing,
        client_version_for_tracing=self.client_version_for_tracing,
    )
    ingestion_blob_info_json = ingestion_blob_info.to_json()

    retries_left = min(self._MAX_RETRIES, len(queues))
    # Kept outside the handler because the ``except ... as e`` name is unbound when the handler exits.
    last_error = None
    for queue in queues:
        try:
            with QueueServiceClient(queue.account_uri, proxies=self._proxy_dict) as queue_service:
                with queue_service.get_queue_client(queue=queue.object_name, message_encode_policy=TextBase64EncodePolicy()) as queue_client:
                    # trace enqueuing of blob for ingestion
                    invoker = lambda: queue_client.send_message(content=ingestion_blob_info_json, timeout=self._SERVICE_CLIENT_TIMEOUT_SECONDS)
                    enqueue_trace_attributes = IngestTracingAttributes.create_enqueue_request_attributes(queue_client.queue_name, blob_descriptor.source_id)
                    MonitoredActivity.invoke(invoker, name_of_span="QueuedIngestClient.enqueue_request", tracing_attributes=enqueue_trace_attributes)

            self._resource_manager.report_resource_usage_result(queue.storage_account_name, True)
            return IngestionResult(
                IngestionStatus.QUEUED, ingestion_properties.database, ingestion_properties.table, blob_descriptor.source_id, blob_descriptor.path
            )
        except Exception as e:
            last_error = e
            retries_left = retries_left - 1
            # TODO: log the retry once we have a proper logging system
            self._resource_manager.report_resource_usage_result(queue.storage_account_name, False)
            if retries_left == 0:
                raise KustoQueueError() from e

    # Reaching this point means nothing was enqueued: either there were no queues to try, or
    # every attempt failed without the retry counter ever hitting exactly zero. Fail loudly
    # instead of implicitly returning None from a function annotated to return IngestionResult.
    raise KustoQueueError() from last_error