in connector/src/main/scala/com/microsoft/kusto/spark/utils/ContainerProvider.scala [84:124]
private def refresh(exportContainer: Boolean = false): ContainerAndSas = {
if (exportContainer) {
Try(client.executeDM(command, None, "refreshContainers", Some(retryConfigExportContainers))) match {
case Success(res) =>
val storage = res.getPrimaryResults.getData.asScala.map(row => {
val parts = row.get(0).toString.split('?')
ContainerAndSas(parts(0), s"?${parts(1)}")
})
processContainerResults(storage)
case Failure(exception) =>
KDSU.reportExceptionAndThrow(
className,
exception,
"Error querying for create export containers",
clusterAlias,
shouldNotThrow = storageUris.nonEmpty)
storageUris(roundRobinIdx)
}
} else {
val retryExecute: CheckedFunction0[ContainerAndSas] = Retry.decorateCheckedSupplier(
Retry.of("refresh ingestion resources", retryConfigIngestionRefresh),
() => {
Try(client.ingestClient.getResourceManager.getShuffledContainers) match {
case Success(res) =>
val storage = res.asScala.map(row => {
ContainerAndSas(row.getContainer.getBlobContainerUrl, s"${row.getSas}")
})
processContainerResults(storage)
case Failure(exception) =>
KDSU.reportExceptionAndThrow(
className,
exception,
"Error querying for create tempstorage",
clusterAlias,
shouldNotThrow = storageUris.nonEmpty)
storageUris(roundRobinIdx)
}
})
retryExecute.apply()
}
}