in namingserver/src/main/java/org/apache/seata/namingserver/filter/ConsoleRemotingFilter.java [72:154]
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain)
throws IOException, ServletException {
if (servletRequest instanceof HttpServletRequest) {
if (urlPattern.matcher(((HttpServletRequest)servletRequest).getRequestURI()).matches()) {
CachedBodyHttpServletRequest request =
new CachedBodyHttpServletRequest((HttpServletRequest)servletRequest);
HttpServletResponse response = (HttpServletResponse)servletResponse;
String namespace = request.getHeader("x-seata-namespace");
String cluster = request.getHeader("x-seata-cluster");
String vgroup = request.getParameter("vgroup");
if (StringUtils.isNotBlank(namespace)
&& (StringUtils.isNotBlank(cluster) || StringUtils.isNotBlank(vgroup))) {
List<Node> list = null;
if (StringUtils.isNotBlank(vgroup)) {
list = namingManager.getInstancesByVgroupAndNamespace(namespace, vgroup, StringUtils.equalsIgnoreCase(request.getMethod(), HttpMethod.GET.name()));
} else if (StringUtils.isNotBlank(cluster)) {
list = namingManager.getInstances(namespace, cluster);
}
if (CollectionUtils.isNotEmpty(list)) {
// Randomly select a node from the list
NamingServerNode node = (NamingServerNode) list.get(ThreadLocalRandom.current().nextInt(list.size()));
Node.Endpoint controlEndpoint = node.getControl();
if (controlEndpoint != null) {
// Construct the target URL
String targetUrl = "http://" + controlEndpoint.getHost() + ":" + controlEndpoint.getPort()
+ request.getRequestURI()
+ (request.getQueryString() != null ? "?" + request.getQueryString() : "");
// Copy headers from the original request
HttpHeaders headers = new HttpHeaders();
if (node.getRole() == ClusterRole.LEADER) {
headers.add(RAFT_GROUP_HEADER, node.getUnit());
}
Collections.list(request.getHeaderNames())
.forEach(headerName -> headers.add(headerName, request.getHeader(headerName)));
// Create the HttpEntity with headers and body
HttpEntity<byte[]> httpEntity = new HttpEntity<>(request.getCachedBody(), headers);
// Forward the request
AsyncContext asyncContext = servletRequest.startAsync();
asyncContext.setTimeout(5000L);
ListenableFuture<ResponseEntity<byte[]>> responseEntityFuture = asyncRestTemplate.exchange(
URI.create(targetUrl), Objects.requireNonNull(HttpMethod.resolve(request.getMethod())),
httpEntity, byte[].class);
responseEntityFuture.addCallback(new ListenableFutureCallback<ResponseEntity<byte[]>>() {
@Override
public void onFailure(Throwable ex) {
try {
logger.error(ex.getMessage(), ex);
response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
} finally {
asyncContext.complete();
}
}
@Override
public void onSuccess(ResponseEntity<byte[]> responseEntity) {
// Copy response headers and status code
responseEntity.getHeaders().forEach((key, value) -> {
value.forEach(v -> response.addHeader(key, v));
});
response.setStatus(responseEntity.getStatusCodeValue());
// Write response body
Optional.ofNullable(responseEntity.getBody()).ifPresent(body -> {
try (ServletOutputStream outputStream = response.getOutputStream()) {
outputStream.write(body);
outputStream.flush();
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
});
asyncContext.complete();
}
});
return;
}
}
}
}
}
filterChain.doFilter(servletRequest, servletResponse);
}