in ingest/src/main/java/com/microsoft/azure/kusto/ingest/ResourceManager.java [137:171]
private void refreshIngestionResources() throws IngestionClientException, IngestionServiceException {
// Here we use tryLock(): If there is another instance doing the refresh, then just skip it.
if (ingestionResourcesLock.writeLock().tryLock()) {
try {
log.info("Refreshing Ingestion Resources");
IngestionResourceSet newIngestionResourceSet = new IngestionResourceSet();
Retry retry = Retry.of("get ingestion resources", taskRetryConfig);
CheckedFunction0<KustoOperationResult> retryExecute = Retry.decorateCheckedSupplier(retry,
() -> client.executeMgmt(Commands.INGESTION_RESOURCES_SHOW_COMMAND));
KustoOperationResult ingestionResourcesResults = retryExecute.apply();
if (ingestionResourcesResults != null) {
KustoResultSetTable table = ingestionResourcesResults.getPrimaryResults();
// Add the received values to the new ingestion resources
while (table.next()) {
String resourceTypeName = table.getString(0);
String storageUrl = table.getString(1);
addIngestionResource(newIngestionResourceSet, resourceTypeName, storageUrl);
}
}
populateStorageAccounts(newIngestionResourceSet);
ingestionResourceSet = newIngestionResourceSet;
refreshedAtLeastOnce.clear();
refreshedAtLeastOnce.put(true);
log.info("Refreshing Ingestion Resources Finished");
} catch (DataServiceException e) {
throw new IngestionServiceException(e.getIngestionSource(), "Error refreshing IngestionResources. " + e.getMessage(), e);
} catch (DataClientException e) {
throw new IngestionClientException(e.getIngestionSource(), "Error refreshing IngestionResources. " + e.getMessage(), e);
} catch (Throwable e) {
throw new IngestionClientException(e.getMessage(), e);
} finally {
ingestionResourcesLock.writeLock().unlock();
}
}
}