in metacat-main/src/main/java/com/netflix/metacat/main/services/search/ElasticSearchRefresh.java [375:497]
/**
 * Removes search-index entries (and associated user metadata) for databases and tables that were
 * not marked by the current refresh run and that no longer exist in the catalog.
 *
 * <p>Databases are processed first, then tables. For each entity type:
 * <ol>
 *   <li>Fetch the entities under {@code qNames} that
 *       {@code elasticSearchUtil.getQualifiedNamesByMarkerByNames} reports for the current
 *       {@code refreshMarker} ("unmarked" entities), skipping {@code excludeQualifiedNames}.</li>
 *   <li>If their number exceeds the configured threshold
 *       ({@code getElasticSearchThresholdUnmarkedDatabasesDelete} /
 *       {@code getElasticSearchThresholdUnmarkedTablesDelete}), nothing is deleted for that type.
 *       The corresponding "threshold reached" counter is incremented instead (presumably a guard
 *       against mass deletion).</li>
 *   <li>Otherwise each unmarked entity is looked up in the catalog service. It is deleted only
 *       when the lookup positively reports it as missing. For databases, that means a
 *       {@code null} result or a {@code DatabaseNotFoundException}. For tables, that means an
 *       empty {@code Optional}. Any other exception is logged and the entity is kept (best
 *       effort).</li>
 *   <li>Missing databases have their definition metadata deleted and are soft-deleted in the
 *       index. Missing tables have their user metadata and tags deleted, and a
 *       {@link MetacatDeleteTablePostEvent} is posted for each one.</li>
 * </ol>
 *
 * @param qNames                qualified names whose children are checked for unmarked entries
 * @param excludeQualifiedNames qualified names to leave untouched
 */
private void deleteUnmarkedEntities(final List<QualifiedName> qNames,
                                    final List<QualifiedName> excludeQualifiedNames) {
    log.info("Start: Delete unmarked entities");
    // Refresh the index before querying it for unmarked documents
    // (presumably so that markers written by this run are visible to the query).
    elasticSearchUtil.refresh();
    final MetacatRequestContext context = MetacatRequestContext.builder().userName("admin")
        .clientAppName("metacat-refresh")
        .apiUri("esRefresh")
        .scheme("internal").build();

    // ---- Databases ----
    final List<DatabaseDto> unmarkedDatabaseDtos = elasticSearchUtil
        .getQualifiedNamesByMarkerByNames("database", qNames, refreshMarker, excludeQualifiedNames,
            DatabaseDto.class);
    if (!unmarkedDatabaseDtos.isEmpty()) {
        if (unmarkedDatabaseDtos.size() <= config.getElasticSearchThresholdUnmarkedDatabasesDelete()) {
            log.info("Start: Delete unmarked databases({})", unmarkedDatabaseDtos.size());
            // Collected with map() rather than as a side effect inside the filter predicate below.
            final List<String> unmarkedDatabaseNames = unmarkedDatabaseDtos.stream()
                .map(databaseDto -> String.valueOf(databaseDto.getName()))
                .collect(Collectors.toList());
            final List<DatabaseDto> deleteDatabaseDtos = unmarkedDatabaseDtos.stream()
                .filter(databaseDto -> {
                    try {
                        final DatabaseDto dto = databaseService.get(databaseDto.getName(),
                            GetDatabaseServiceParameters.builder()
                                .includeUserMetadata(false)
                                .includeTableNames(false)
                                .disableOnReadMetadataIntercetor(false)
                                .build());
                        return dto == null;
                    } catch (DatabaseNotFoundException ignored) {
                        return true;
                    } catch (Exception e) {
                        // Best effort: existence is unknown, so keep the entry. Passing `e` as the
                        // trailing argument makes SLF4J log the stack trace as well.
                        log.warn("Ignoring exception during deleteUnmarkedEntities for {}. Message: {}",
                            databaseDto.getName(), e.getMessage(), e);
                        return false;
                    }
                })
                .collect(Collectors.toList());
            log.info("Unmarked databases({}): {}", unmarkedDatabaseNames.size(), unmarkedDatabaseNames);
            log.info("Deleting databases({})", deleteDatabaseDtos.size());
            if (!deleteDatabaseDtos.isEmpty()) {
                final List<QualifiedName> deleteDatabaseQualifiedNames = deleteDatabaseDtos.stream()
                    .map(DatabaseDto::getName)
                    .collect(Collectors.toList());
                final List<String> deleteDatabaseNames = deleteDatabaseQualifiedNames.stream()
                    .map(QualifiedName::toString)
                    .collect(Collectors.toList());
                log.info("Deleting databases({}): {}", deleteDatabaseNames.size(), deleteDatabaseNames);
                userMetadataService.deleteDefinitionMetadata(deleteDatabaseQualifiedNames);
                elasticSearchUtil.softDelete("database", deleteDatabaseNames, context);
            }
            log.info("End: Delete unmarked databases({})", unmarkedDatabaseDtos.size());
        } else {
            log.info("Count of unmarked databases({}) is more than the threshold {}", unmarkedDatabaseDtos.size(),
                config.getElasticSearchThresholdUnmarkedDatabasesDelete());
            registry.counter(
                registry.createId(Metrics.CounterElasticSearchUnmarkedDatabaseThreshholdReached.getMetricName()))
                .increment();
        }
    }

    // ---- Tables ----
    final List<TableDto> unmarkedTableDtos = elasticSearchUtil
        .getQualifiedNamesByMarkerByNames("table",
            qNames, refreshMarker, excludeQualifiedNames, TableDto.class);
    if (!unmarkedTableDtos.isEmpty()) {
        if (unmarkedTableDtos.size() <= config.getElasticSearchThresholdUnmarkedTablesDelete()) {
            log.info("Start: Delete unmarked tables({})", unmarkedTableDtos.size());
            // Collected with map() rather than as a side effect inside the filter predicate below.
            final List<String> unmarkedTableNames = unmarkedTableDtos.stream()
                .map(tableDto -> String.valueOf(tableDto.getName()))
                .collect(Collectors.toList());
            final List<TableDto> deleteTableDtos = unmarkedTableDtos.stream()
                .filter(tableDto -> {
                    try {
                        final Optional<TableDto> dto = tableService.get(tableDto.getName(),
                            GetTableServiceParameters.builder()
                                .includeDataMetadata(false)
                                .disableOnReadMetadataIntercetor(false)
                                .includeInfo(true)
                                .includeDefinitionMetadata(false)
                                .build());
                        return !dto.isPresent();
                    } catch (Exception e) {
                        // Best effort: existence is unknown, so keep the entry. Passing `e` as the
                        // trailing argument makes SLF4J log the stack trace as well.
                        log.warn("Ignoring exception during deleteUnmarkedEntities for {}. Message: {}",
                            tableDto.getName(), e.getMessage(), e);
                        return false;
                    }
                })
                .collect(Collectors.toList());
            log.info("Unmarked tables({}): {}", unmarkedTableNames.size(), unmarkedTableNames);
            log.info("Deleting tables({})", deleteTableDtos.size());
            if (!deleteTableDtos.isEmpty()) {
                final List<String> deleteTableNames = deleteTableDtos.stream()
                    .map(dto -> dto.getName().toString())
                    .collect(Collectors.toList());
                log.info("Deleting tables({}): {}", deleteTableNames.size(), deleteTableNames);
                userMetadataService.deleteMetadata("admin", Lists.newArrayList(deleteTableDtos));
                // Publish event. Elasticsearch event handler will take care of updating the index already
                // TODO: Re-evaluate events vs. direct calls for these types of situations like in Genie
                // NOTE(review): an exception from tagService.delete or eventBus.post aborts the loop, so the
                // remaining tables get no delete event even though their metadata was already removed above.
                deleteTableDtos.forEach(
                    tableDto -> {
                        tagService.delete(tableDto.getName(), false);
                        this.eventBus.post(
                            new MetacatDeleteTablePostEvent(tableDto.getName(), context, this, tableDto)
                        );
                    }
                );
            }
            log.info("End: Delete unmarked tables({})", unmarkedTableDtos.size());
        } else {
            log.info("Count of unmarked tables({}) is more than the threshold {}", unmarkedTableDtos.size(),
                config.getElasticSearchThresholdUnmarkedTablesDelete());
            registry.counter(
                registry.createId(Metrics.CounterElasticSearchUnmarkedTableThreshholdReached.getMetricName()))
                .increment();
        }
    }
    log.info("End: Delete unmarked entities");
}