in oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/handler/ContinuousProfilingServiceHandler.java [88:166]
public void queryPolicies(ContinuousProfilingPolicyQuery request, StreamObserver<Commands> responseObserver) {
final Map<String, String> policiesQuery = request.getPoliciesList().stream()
.collect(Collectors.toMap(s -> IDManager.ServiceID.buildId(s.getServiceName(), true), ContinuousProfilingServicePolicyQuery::getUuid, (s1, s2) -> s1));
if (CollectionUtils.isEmpty(policiesQuery)) {
responseObserver.onNext(Commands.newBuilder().build());
responseObserver.onCompleted();
return;
}
try {
final List<String> serviceIdList = new ArrayList<>(policiesQuery.keySet());
final HashMap<String, ContinuousProfilingPolicy> policiesInDB = new HashMap<>();
// query from the cache first
for (ListIterator<String> serviceIdIt = serviceIdList.listIterator(); serviceIdIt.hasNext(); ) {
final String serviceId = serviceIdIt.next();
final PolicyWrapper wrapper = this.policyCache.getIfPresent(serviceId);
if (wrapper == null) {
continue;
}
serviceIdIt.remove();
if (wrapper.policy != null) {
policiesInDB.put(serviceId, wrapper.policy);
}
}
// if no service need to check from DB then return empty
if (serviceIdList.isEmpty()) {
sendEmptyCommands(responseObserver);
return;
}
// query the policies which not in the cache
final List<ContinuousProfilingPolicy> queriedFromDB = policyDAO.queryPolicies(serviceIdList);
for (ContinuousProfilingPolicy dbPolicy : queriedFromDB) {
policiesInDB.put(dbPolicy.getServiceId(), dbPolicy);
this.policyCache.put(dbPolicy.getServiceId(), new PolicyWrapper(dbPolicy));
serviceIdList.remove(dbPolicy.getServiceId());
}
// Also add the cache if the service haven't policy
for (String serviceId : serviceIdList) {
this.policyCache.put(serviceId, new PolicyWrapper(null));
}
final ArrayList<ContinuousProfilingPolicy> updatePolicies = new ArrayList<>();
for (Map.Entry<String, String> entry : policiesQuery.entrySet()) {
final String serviceId = entry.getKey();
final ContinuousProfilingPolicy policyInDB = policiesInDB.get(serviceId);
// policy not exist in DB or uuid not same
// needs to send commands to downstream
if (policyInDB == null && StringUtil.isNotEmpty(entry.getValue())) {
final ContinuousProfilingPolicy emptyPolicy = new ContinuousProfilingPolicy();
emptyPolicy.setServiceId(entry.getKey());
emptyPolicy.setUuid("");
updatePolicies.add(emptyPolicy);
} else if (policyInDB != null && !Objects.equals(policyInDB.getUuid(), entry.getValue())) {
updatePolicies.add(policyInDB);
}
}
if (CollectionUtils.isEmpty(updatePolicies)) {
sendEmptyCommands(responseObserver);
return;
}
final ContinuousProfilingPolicyCommand command = commandService.newContinuousProfilingServicePolicyCommand(updatePolicies);
final Commands.Builder builder = Commands.newBuilder();
builder.addCommands(command.serialize());
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
return;
} catch (Exception e) {
log.warn("query continuous profiling service policies failure", e);
}
sendEmptyCommands(responseObserver);
}