private static void validateTableAccess()

in src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkTask.java [196:274]


    /**
     * Validates that the sink can ingest into the table described by {@code mapping}.
     *
     * <p>The checks run in order, and each later check is skipped once an earlier one has failed:
     * <ol>
     *   <li>the table can be fetched from the database;</li>
     *   <li>the named ingestion mapping (if one is configured) can be fetched for the table;</li>
     *   <li>the configured principal is reported as allowed to ingest;</li>
     *   <li>when streaming is configured, a streaming ingestion policy is enabled on the database or the table.</li>
     * </ol>
     * Validation failures are appended to the supplied lists instead of being thrown, so that the caller
     * can report all problems together.
     *
     * @param engineClient client used to run the validation commands
     * @param mapping topic-to-table mapping supplying database, table, format, mapping name and streaming flag
     * @param config sink configuration supplying the application id and authority used for the role check
     * @param databaseTableErrorList receives errors about the database, table, mapping or streaming policy
     * @param accessErrorList receives an error when the principal is reported as not allowed to ingest
     * @throws ConfigException if the table is accessible but the application id or authority is empty
     * @throws ConnectException if a {@code KustoDataExceptionBase} is not handled by one of the inner catch blocks
     */
    private static void validateTableAccess(Client engineClient, TopicToTableMapping mapping, KustoSinkConfig config, List<String> databaseTableErrorList,
            List<String> accessErrorList) {
        String database = mapping.getDb();
        String table = mapping.getTable();
        String format = mapping.getFormat();
        String mappingName = mapping.getMapping();
        boolean streamingEnabled = mapping.isStreaming();
        // All JSON flavours are looked up under the single JSON mapping kind.
        if (isDataFormatAnyTypeOfJson(format)) {
            format = IngestionProperties.DataFormat.JSON.name();
        }
        boolean hasAccess = false;
        boolean shouldCheckStreaming = streamingEnabled;

        try {
            // A streaming policy enabled at database level makes the table-level check below unnecessary.
            if (shouldCheckStreaming && isStreamingPolicyEnabled(DATABASE, database, engineClient, database)) {
                shouldCheckStreaming = false;
            }
            try {
                KustoOperationResult rs = engineClient.execute(database, String.format(FETCH_TABLE_COMMAND, table),
                        validateOnlyClientRequestProperties);
                if (VALIDATION_OK.equals(rs.getPrimaryResults().getData().get(0).get(0))) {
                    hasAccess = true;
                }
            } catch (DataServiceException e) {
                databaseTableErrorList.add(String.format("Couldn't validate access to Database '%s' Table '%s', with exception '%s'", database, table,
                        ExceptionUtils.getStackTrace(e)));
            }

            if (hasAccess && StringUtils.isNotEmpty(mappingName)) {
                try {
                    // Only the absence of an exception matters here; the result itself is not inspected.
                    engineClient.execute(database, String.format(FETCH_TABLE_MAPPING_COMMAND, table,
                            format.toLowerCase(Locale.ROOT), mappingName));
                } catch (DataServiceException e) {
                    hasAccess = false;
                    databaseTableErrorList.add(String.format("Database:%s Table:%s | %s mapping '%s' not found, with exception '%s'", database, table, format,
                            mappingName, ExceptionUtils.getStackTrace(e)));
                }
            }

            if (hasAccess) {
                // TODO check this for managed identity
                if (StringUtils.isEmpty(config.getAuthAppId()) || StringUtils.isEmpty(config.getAuthAuthority())) {
                    throw new ConfigException("Authority ID and Application ID must be provided to validate table accesses.");
                }

                String authenticateWith = String.format("aadapp=%s;%s", config.getAuthAppId(),
                        config.getAuthAuthority());
                String query = String.format(FETCH_PRINCIPAL_ROLES_COMMAND, authenticateWith, database, table);
                try {
                    KustoOperationResult rs = engineClient.execute(database, query);
                    // The first row's column at INGESTION_ALLOWED_INDEX carries the "may ingest" flag.
                    hasAccess = (boolean) rs.getPrimaryResults().getData().get(0).get(INGESTION_ALLOWED_INDEX);
                    if (hasAccess) {
                        log.info("User has appropriate permissions to sink data into the Kusto table={}", table);
                    } else {
                        accessErrorList.add(String.format("User does not have appropriate permissions " +
                                "to sink data into the Kusto database %s", database));
                    }
                } catch (DataServiceException e) {
                    // Logging the error so that the trace is not lost.
                    // getCause() is null when the exception was created without a cause; fall back to the
                    // exception itself so that this handler cannot fail with a NullPointerException.
                    Throwable cause = e.getCause();
                    Throwable inspected = cause != null ? cause : e;
                    if (!inspected.toString().contains("Forbidden")) {
                        databaseTableErrorList.add(
                                String.format("Fetching principal roles using query '%s' resulted in exception '%s'", query, ExceptionUtils.getStackTrace(e)));
                    } else {
                        // Not being allowed to list roles does not prove that ingestion is denied,
                        // so this case is only logged and validation continues.
                        log.warn(
                                "Failed to check permissions with query '{}', will continue the run as the principal might still be able to ingest",
                                query, e);
                    }
                }
            }
            // Reached with shouldCheckStreaming still true only when the database-level policy was not enabled.
            if (hasAccess && shouldCheckStreaming
                    && !isStreamingPolicyEnabled(MAPPING_TABLE, table, engineClient, database)) {
                databaseTableErrorList.add(String.format("Ingestion is configured as streaming, but a streaming" +
                        " ingestion policy was not found on either database '%s' or table '%s'", database, table));
            }

        } catch (KustoDataExceptionBase e) {
            throw new ConnectException("Unable to connect to ADX(Kusto) instance", e);
        }
    }