private List listEventsV2()

in repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java [670:809]


    private List<EntityAuditEventV2> listEventsV2(String entityId, EntityAuditEventV2.EntityAuditActionV2 auditAction, String sortByColumn, boolean sortOrderDesc, int offset, short limit, boolean isAgeoutTransaction, boolean createEventsAgeoutAllowed, boolean allowAgeoutByAuditCount, List<EntityAuditEventV2> eventsToKeep) throws AtlasBaseException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("==> HBaseBasedAuditRepository.listEventsV2(entityId={}, auditAction={}, sortByColumn={}, sortOrderDesc={}, offset={}, limit={})",
                    entityId, auditAction, sortByColumn, sortOrderDesc, offset, limit);
        }

        AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("listEventsV2");

        if (sortByColumn == null) {
            sortByColumn = EntityAuditEventV2.SORT_COLUMN_TIMESTAMP;
        }

        if (offset < 0) {
            offset = 0;
        }

        if (!isAgeoutTransaction && limit < 0) {
            limit = 100;
        }

        try (Table table = connection.getTable(tableName)) {
            /*
             * HBase Does not support query with sorted results. To support this API inmemory sort has to be performed.
             * Audit entry can potentially have entire entity dumped into it. Loading entire audit entries for an entity can be
             * memory intensive. Therefore we load audit entries with limited columns first, perform sort on this light weight list,
             * then get the relevant section by removing offsets and reducing to limits. With this reduced list we create
             * MultiRowRangeFilter and then again scan the table to get all the columns from the table this time.
             */
            Scan scan = new Scan().setReversed(true)
                    .setCaching(DEFAULT_CACHING)
                    .setSmall(true)
                    .setStopRow(Bytes.toBytes(entityId))
                    .setStartRow(getKey(entityId, Long.MAX_VALUE, Integer.MAX_VALUE))
                    .addColumn(COLUMN_FAMILY, COLUMN_ACTION)
                    .addColumn(COLUMN_FAMILY, COLUMN_USER);

            FilterList filterList = new FilterList();

            if (auditAction != null) {
                Filter filterAction = new SingleColumnValueFilter(COLUMN_FAMILY, COLUMN_ACTION, CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(auditAction.toString())));

                filterList.addFilter(filterAction);
            }

            if (!createEventsAgeoutAllowed) {
                FilterList createEventFilterList          = new FilterList(FilterList.Operator.MUST_PASS_ALL);
                Filter     filterByCreateActionType       = new SingleColumnValueFilter(COLUMN_FAMILY, COLUMN_ACTION, CompareFilter.CompareOp.NOT_EQUAL, new BinaryComparator(Bytes.toBytes(EntityAuditActionV2.ENTITY_CREATE.toString())));
                Filter     filterByImportCreateActionType = new SingleColumnValueFilter(COLUMN_FAMILY, COLUMN_ACTION, CompareFilter.CompareOp.NOT_EQUAL, new BinaryComparator(Bytes.toBytes(EntityAuditActionV2.ENTITY_IMPORT_CREATE.toString())));

                createEventFilterList.addFilter(filterByCreateActionType);
                createEventFilterList.addFilter(filterByImportCreateActionType);

                filterList.addFilter(createEventFilterList);
            }

            scan.setFilter(filterList);

            List<EntityAuditEventV2> events = new ArrayList<>();

            try (ResultScanner scanner = table.getScanner(scan)) {
                for (Result result = scanner.next(); result != null; result = scanner.next()) {
                    EntityAuditEventV2 event = fromKeyV2(result.getRow());

                    event.setUser(getResultString(result, COLUMN_USER));
                    event.setAction(EntityAuditActionV2.fromString(getResultString(result, COLUMN_ACTION)));

                    events.add(event);
                }
            }

            EntityAuditEventV2.sortEvents(events, sortByColumn, sortOrderDesc);

            int fromIndex = Math.min(events.size(), offset);
            int endIndex  = events.size();

            if (limit > 0) {
                endIndex = Math.min(events.size(), offset + limit);
            }

            if (isAgeoutTransaction) {
                if (!allowAgeoutByAuditCount) { //No audit events allowed to age-out by audit count
                    eventsToKeep.addAll(events);

                    return Collections.emptyList();
                }

                eventsToKeep.addAll(events.subList(0, fromIndex));
            }

            events = events.subList(fromIndex, endIndex);

            if (!events.isEmpty()) {
                List<MultiRowRangeFilter.RowRange> ranges = new ArrayList<>();

                events.forEach(e -> ranges.add(new MultiRowRangeFilter.RowRange(e.getEventKey(), true, e.getEventKey(), true)));

                scan = new Scan().setReversed(true)
                        .setCaching(DEFAULT_CACHING)
                        .setSmall(true)
                        .setStopRow(Bytes.toBytes(entityId))
                        .setStartRow(getKey(entityId, Long.MAX_VALUE, Integer.MAX_VALUE))
                        .setFilter(new MultiRowRangeFilter(ranges));

                try (ResultScanner scanner = table.getScanner(scan)) {
                    events = new ArrayList<>();

                    for (Result result = scanner.next(); result != null; result = scanner.next()) {
                        EntityAuditEventV2 event = fromKeyV2(result.getRow());

                        event.setUser(getResultString(result, COLUMN_USER));
                        event.setAction(EntityAuditActionV2.fromString(getResultString(result, COLUMN_ACTION)));
                        event.setDetails(getResultString(result, COLUMN_DETAIL));

                        if (PERSIST_ENTITY_DEFINITION) {
                            String colDef = getResultString(result, COLUMN_DEFINITION);

                            if (colDef != null) {
                                event.setEntityDefinition(colDef);
                            }
                        }

                        events.add(event);
                    }
                }

                EntityAuditEventV2.sortEvents(events, sortByColumn, sortOrderDesc);
            }

            if (LOG.isDebugEnabled()) {
                LOG.debug("<== HBaseBasedAuditRepository.listEventsV2(entityId={}, auditAction={}, sortByColumn={}, sortOrderDesc={}, offset={}, limit={}): #records returned {}",
                        entityId, auditAction, sortByColumn, sortOrderDesc, offset, limit, events.size());
            }

            return events;
        } catch (IOException e) {
            throw new AtlasBaseException(e);
        } finally {
            RequestContext.get().endMetricRecord(metric);
        }
    }