in repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java [670:809]
/**
 * Lists the V2 audit events of one entity, sorted and paginated in memory.
 *
 * <p>The table is scanned twice. The first scan reads only the action and user columns so the
 * complete set of matching events can be sorted cheaply; the requested page is then cut out of that
 * list, and a second scan restricted to exactly those row keys loads the remaining columns.
 *
 * @param entityId                 id of the entity; used to build the start and stop rows of both scans
 * @param auditAction              when non-null, only events whose action column equals this value are scanned
 * @param sortByColumn             column to sort by; {@code null} falls back to
 *                                 {@code EntityAuditEventV2.SORT_COLUMN_TIMESTAMP}
 * @param sortOrderDesc            sort direction handed to {@code EntityAuditEventV2.sortEvents}
 * @param offset                   number of sorted events to skip; negative values are treated as 0
 * @param limit                    maximum number of events to return; a negative value is replaced by 100
 *                                 unless this is an age-out transaction, and a value that is still not
 *                                 positive afterwards means "no upper bound"
 * @param isAgeoutTransaction      whether the call is made on behalf of audit age-out
 * @param createEventsAgeoutAllowed when {@code false}, ENTITY_CREATE and ENTITY_IMPORT_CREATE events are
 *                                 excluded from the scan
 * @param allowAgeoutByAuditCount  only read for age-out transactions; when {@code false} every scanned
 *                                 event is added to {@code eventsToKeep} and an empty list is returned
 * @param eventsToKeep             output list, only written for age-out transactions; receives the sorted
 *                                 events that precede {@code offset}
 * @return the requested page of events with details (and the entity definition when
 *         {@code PERSIST_ENTITY_DEFINITION} is set), sorted as requested
 * @throws AtlasBaseException if HBase reports an {@link IOException}
 */
private List<EntityAuditEventV2> listEventsV2(String entityId, EntityAuditEventV2.EntityAuditActionV2 auditAction, String sortByColumn, boolean sortOrderDesc, int offset, short limit, boolean isAgeoutTransaction, boolean createEventsAgeoutAllowed, boolean allowAgeoutByAuditCount, List<EntityAuditEventV2> eventsToKeep) throws AtlasBaseException {
    if (LOG.isDebugEnabled()) {
        LOG.debug("==> HBaseBasedAuditRepository.listEventsV2(entityId={}, auditAction={}, sortByColumn={}, sortOrderDesc={}, offset={}, limit={})",
                entityId, auditAction, sortByColumn, sortOrderDesc, offset, limit);
    }

    AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("listEventsV2");

    // Normalize the paging/sorting arguments before they are used.
    if (sortByColumn == null) {
        sortByColumn = EntityAuditEventV2.SORT_COLUMN_TIMESTAMP;
    }

    if (offset < 0) {
        offset = 0;
    }

    // Age-out transactions keep a negative limit, which later means "no upper bound".
    if (!isAgeoutTransaction && limit < 0) {
        limit = 100;
    }

    try (Table table = connection.getTable(tableName)) {
        /*
         * HBase does not support queries with sorted results, so the sort has to be performed in memory.
         * An audit entry can potentially have the entire entity dumped into it, which makes loading all
         * audit entries of an entity memory intensive. Therefore the audit entries are first loaded with a
         * limited set of columns, this lightweight list is sorted, and the relevant section is selected by
         * applying offset and limit. A MultiRowRangeFilter built from that reduced list is then used to scan
         * the table again, this time fetching all columns.
         */
        Scan scan = new Scan().setReversed(true)
                .setCaching(DEFAULT_CACHING)
                .setSmall(true)
                .setStopRow(Bytes.toBytes(entityId))
                .setStartRow(getKey(entityId, Long.MAX_VALUE, Integer.MAX_VALUE))
                .addColumn(COLUMN_FAMILY, COLUMN_ACTION)
                .addColumn(COLUMN_FAMILY, COLUMN_USER);

        FilterList filterList = new FilterList();

        if (auditAction != null) {
            Filter filterAction = new SingleColumnValueFilter(COLUMN_FAMILY, COLUMN_ACTION, CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(auditAction.toString())));

            filterList.addFilter(filterAction);
        }

        if (!createEventsAgeoutAllowed) {
            // Both conditions must hold: the action is neither ENTITY_CREATE nor ENTITY_IMPORT_CREATE.
            FilterList createEventFilterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
            Filter filterByCreateActionType = new SingleColumnValueFilter(COLUMN_FAMILY, COLUMN_ACTION, CompareFilter.CompareOp.NOT_EQUAL, new BinaryComparator(Bytes.toBytes(EntityAuditActionV2.ENTITY_CREATE.toString())));
            Filter filterByImportCreateActionType = new SingleColumnValueFilter(COLUMN_FAMILY, COLUMN_ACTION, CompareFilter.CompareOp.NOT_EQUAL, new BinaryComparator(Bytes.toBytes(EntityAuditActionV2.ENTITY_IMPORT_CREATE.toString())));

            createEventFilterList.addFilter(filterByCreateActionType);
            createEventFilterList.addFilter(filterByImportCreateActionType);
            filterList.addFilter(createEventFilterList);
        }

        scan.setFilter(filterList);

        // First pass: lightweight events (row key, user, action) for sorting and paging.
        List<EntityAuditEventV2> events = new ArrayList<>();

        try (ResultScanner scanner = table.getScanner(scan)) {
            for (Result result = scanner.next(); result != null; result = scanner.next()) {
                EntityAuditEventV2 event = fromKeyV2(result.getRow());

                event.setUser(getResultString(result, COLUMN_USER));
                event.setAction(EntityAuditActionV2.fromString(getResultString(result, COLUMN_ACTION)));

                events.add(event);
            }
        }

        EntityAuditEventV2.sortEvents(events, sortByColumn, sortOrderDesc);

        int fromIndex = Math.min(events.size(), offset);
        int endIndex = events.size();

        if (limit > 0) {
            // Add in long arithmetic: "offset + limit" as int wraps to a negative value for offsets close
            // to Integer.MAX_VALUE, which would make subList() below throw instead of yielding an empty page.
            endIndex = (int) Math.min((long) events.size(), (long) offset + limit);
        }

        if (isAgeoutTransaction) {
            if (!allowAgeoutByAuditCount) { //No audit events allowed to age-out by audit count
                eventsToKeep.addAll(events);

                return Collections.emptyList();
            }

            // Everything that sorts before the offset is reported back to the caller as "to keep".
            eventsToKeep.addAll(events.subList(0, fromIndex));
        }

        events = events.subList(fromIndex, endIndex);

        if (!events.isEmpty()) {
            // One single-row range per selected event so the second scan touches only those rows.
            // NOTE(review): this filter is applied to a reversed scan - confirm the deployed HBase
            // version supports MultiRowRangeFilter together with setReversed(true).
            List<MultiRowRangeFilter.RowRange> ranges = new ArrayList<>(events.size());

            events.forEach(e -> ranges.add(new MultiRowRangeFilter.RowRange(e.getEventKey(), true, e.getEventKey(), true)));

            scan = new Scan().setReversed(true)
                    .setCaching(DEFAULT_CACHING)
                    .setSmall(true)
                    .setStopRow(Bytes.toBytes(entityId))
                    .setStartRow(getKey(entityId, Long.MAX_VALUE, Integer.MAX_VALUE))
                    .setFilter(new MultiRowRangeFilter(ranges));

            // Second pass: reload the selected rows with all columns.
            try (ResultScanner scanner = table.getScanner(scan)) {
                events = new ArrayList<>();

                for (Result result = scanner.next(); result != null; result = scanner.next()) {
                    EntityAuditEventV2 event = fromKeyV2(result.getRow());

                    event.setUser(getResultString(result, COLUMN_USER));
                    event.setAction(EntityAuditActionV2.fromString(getResultString(result, COLUMN_ACTION)));
                    event.setDetails(getResultString(result, COLUMN_DETAIL));

                    if (PERSIST_ENTITY_DEFINITION) {
                        String colDef = getResultString(result, COLUMN_DEFINITION);

                        if (colDef != null) {
                            event.setEntityDefinition(colDef);
                        }
                    }

                    events.add(event);
                }
            }

            // The second scan returns rows in scan order, so the requested order has to be restored.
            EntityAuditEventV2.sortEvents(events, sortByColumn, sortOrderDesc);
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("<== HBaseBasedAuditRepository.listEventsV2(entityId={}, auditAction={}, sortByColumn={}, sortOrderDesc={}, offset={}, limit={}): #records returned {}",
                    entityId, auditAction, sortByColumn, sortOrderDesc, offset, limit, events.size());
        }

        return events;
    } catch (IOException e) {
        throw new AtlasBaseException(e);
    } finally {
        // Runs on every exit path, including the early age-out return and the exception path.
        RequestContext.get().endMetricRecord(metric);
    }
}