in src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkTask.java [196:274]
/**
 * Validates a single topic-to-table mapping against the Kusto engine.
 *
 * <p>The checks run in this order, each one only if the previous ones left {@code hasAccess} true:
 * <ol>
 *   <li>the table can be fetched ({@code FETCH_TABLE_COMMAND}) and the first cell of the primary
 *       result equals {@code VALIDATION_OK};</li>
 *   <li>if a mapping name is configured, the ingestion mapping can be fetched
 *       ({@code FETCH_TABLE_MAPPING_COMMAND});</li>
 *   <li>the configured application principal is reported as allowed to ingest
 *       ({@code FETCH_PRINCIPAL_ROLES_COMMAND});</li>
 *   <li>if streaming is enabled for the mapping and no streaming policy was found at database
 *       level, a streaming policy is looked up at table level.</li>
 * </ol>
 *
 * <p>Problems are collected into the supplied lists rather than thrown, so the caller can report
 * them together.
 *
 * @param engineClient           client used to execute the validation commands
 * @param mapping                topic-to-table mapping supplying database, table, format, mapping name and
 *                               the streaming flag
 * @param config                 sink configuration; its application id and authority are used to build the
 *                               principal for the roles query
 * @param databaseTableErrorList receives messages about table, mapping, roles-query and streaming-policy
 *                               failures
 * @param accessErrorList        receives a message when the roles query reports that ingestion is not allowed
 * @throws ConfigException  if the application id or authority is empty when the principal check is reached
 * @throws ConnectException if a {@code KustoDataExceptionBase} is not handled by one of the inner catches
 */
private static void validateTableAccess(Client engineClient, TopicToTableMapping mapping, KustoSinkConfig config, List<String> databaseTableErrorList,
        List<String> accessErrorList) {
    String database = mapping.getDb();
    String table = mapping.getTable();
    String format = mapping.getFormat();
    String mappingName = mapping.getMapping();
    boolean streamingEnabled = mapping.isStreaming();
    // Every JSON flavour is looked up under the plain JSON mapping kind.
    if (isDataFormatAnyTypeOfJson(format)) {
        format = IngestionProperties.DataFormat.JSON.name();
    }
    boolean hasAccess = false;
    boolean shouldCheckStreaming = streamingEnabled;
    try {
        // A streaming policy on the database makes the later table-level lookup unnecessary.
        if (shouldCheckStreaming && isStreamingPolicyEnabled(DATABASE, database, engineClient, database)) {
            shouldCheckStreaming = false;
        }
        try {
            KustoOperationResult rs = engineClient.execute(database, String.format(FETCH_TABLE_COMMAND, table),
                    validateOnlyClientRequestProperties);
            if (VALIDATION_OK.equals(rs.getPrimaryResults().getData().get(0).get(0))) {
                hasAccess = true;
            }
        } catch (DataServiceException e) {
            databaseTableErrorList.add(String.format("Couldn't validate access to Database '%s' Table '%s', with exception '%s'", database, table,
                    ExceptionUtils.getStackTrace(e)));
        }
        if (hasAccess && StringUtils.isNotEmpty(mappingName)) {
            try {
                // Only the absence of an exception matters here; the result itself is not inspected.
                engineClient.execute(database, String.format(FETCH_TABLE_MAPPING_COMMAND, table,
                        format.toLowerCase(Locale.ROOT), mappingName));
            } catch (DataServiceException e) {
                hasAccess = false;
                databaseTableErrorList.add(String.format("Database:%s Table:%s | %s mapping '%s' not found, with exception '%s'", database, table, format,
                        mappingName, ExceptionUtils.getStackTrace(e)));
            }
        }
        if (hasAccess) {
            // TODO check this for managed identity
            if (StringUtils.isEmpty(config.getAuthAppId()) || StringUtils.isEmpty(config.getAuthAuthority())) {
                throw new ConfigException("Authority ID and Application ID must be provided to validate table accesses.");
            }
            String authenticateWith = String.format("aadapp=%s;%s", config.getAuthAppId(),
                    config.getAuthAuthority());
            String query = String.format(FETCH_PRINCIPAL_ROLES_COMMAND, authenticateWith, database, table);
            try {
                KustoOperationResult rs = engineClient.execute(database, query);
                hasAccess = (boolean) rs.getPrimaryResults().getData().get(0).get(INGESTION_ALLOWED_INDEX);
                if (hasAccess) {
                    log.info("User has appropriate permissions to sink data into the Kusto table={}", table);
                } else {
                    accessErrorList.add(String.format("User does not have appropriate permissions " +
                            "to sink data into the Kusto database %s", database));
                }
            } catch (DataServiceException e) {
                // getCause() may legitimately be null; dereferencing it blindly would replace the real
                // failure with a NullPointerException that escapes the outer catch. A missing cause is
                // treated as "not Forbidden" so the failure is still reported with its stack trace.
                Throwable cause = e.getCause();
                boolean forbidden = cause != null && cause.toString().contains("Forbidden");
                if (!forbidden) {
                    databaseTableErrorList.add(
                            String.format("Fetching principal roles using query '%s' resulted in exception '%s'", query, ExceptionUtils.getStackTrace(e)));
                } else {
                    // Logging the error so that the trace is not lost. hasAccess stays true on purpose:
                    // being forbidden from listing roles does not prove the principal cannot ingest.
                    log.warn(
                            "Failed to check permissions with query '{}', will continue the run as the principal might still be able to ingest",
                            query, e);
                }
            }
        }
        // Reached with shouldCheckStreaming == true only when no database-level policy was found above.
        if (hasAccess && shouldCheckStreaming
                && !isStreamingPolicyEnabled(MAPPING_TABLE, table, engineClient, database)) {
            databaseTableErrorList.add(String.format("Ingestion is configured as streaming, but a streaming" +
                    " ingestion policy was not found on either database '%s' or table '%s'", database, table));
        }
    } catch (KustoDataExceptionBase e) {
        throw new ConnectException("Unable to connect to ADX(Kusto) instance", e);
    }
}