in shenyu-plugin/shenyu-plugin-proxy/shenyu-plugin-divide/src/main/java/org/apache/shenyu/plugin/divide/DividePlugin.java [71:130]
protected Mono<Void> doExecute(final ServerWebExchange exchange, final ShenyuPluginChain chain, final SelectorData selector, final RuleData rule) {
    // Purpose: validate the request against the rule's size limits, pick an upstream for
    // the selector, publish the routing attributes on the exchange and continue the chain.
    // Every rejection path below writes an error result to the exchange and returns
    // without calling chain.execute; otherwise the Mono returned by chain.execute is returned.
    ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);
    Objects.requireNonNull(shenyuContext);
    DivideRuleHandle ruleHandle = buildRuleHandle(rule);

    // Header limit (enabled only when > 0): sums the UTF-8 byte length of the header
    // values; header names are not part of the sum.
    if (ruleHandle.getHeaderMaxSize() > 0) {
        long headerSize = exchange.getRequest().getHeaders().values()
                .stream()
                .flatMap(Collection::stream)
                .mapToLong(header -> header.getBytes(StandardCharsets.UTF_8).length)
                .sum();
        if (headerSize > ruleHandle.getHeaderMaxSize()) {
            LOG.error("request header is too large");
            Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.REQUEST_HEADER_TOO_LARGE);
            return WebFluxResultUtils.result(exchange, error);
        }
    }

    // Body limit (enabled only when > 0): only the declared Content-Length header is
    // compared; the body itself is not measured here.
    if (ruleHandle.getRequestMaxSize() > 0) {
        if (exchange.getRequest().getHeaders().getContentLength() > ruleHandle.getRequestMaxSize()) {
            LOG.error("request entity is too large");
            Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.REQUEST_ENTITY_TOO_LARGE);
            return WebFluxResultUtils.result(exchange, error);
        }
    }

    List<Upstream> upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selector.getId());
    if (CollectionUtils.isEmpty(upstreamList)) {
        LOG.error("divide upstream configuration error: {}", selector);
        Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.CANNOT_FIND_HEALTHY_UPSTREAM_URL);
        return WebFluxResultUtils.result(exchange, error);
    }

    // Read once: the value may be null (the LOAD_BALANCE attribute below defaults it),
    // so every later comparison has to be null-safe.
    final String loadBalance = ruleHandle.getLoadBalance();

    // The caller's IP is handed to the load balancer; a missing remote address fails fast.
    String ip = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();
    // NOTE(review): the raw (possibly null) name is passed on unchanged, as before;
    // confirm how LoadBalancerFactory.selector treats a null algorithm name.
    Upstream upstream = LoadBalancerFactory.selector(upstreamList, loadBalance, ip);
    if (Objects.isNull(upstream)) {
        LOG.error("divide has no upstream");
        Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.CANNOT_FIND_HEALTHY_UPSTREAM_URL);
        return WebFluxResultUtils.result(exchange, error);
    }

    // set the http url: the first SPECIFY_DOMAIN header value, when present, overrides the upstream url
    // NOTE(review): this mutates the Upstream returned by the load balancer. If that
    // instance is shared through UpstreamCacheManager, the override would remain visible
    // to later requests - confirm whether a per-request copy is needed.
    List<String> specifiedDomains = exchange.getRequest().getHeaders().get(Constants.SPECIFY_DOMAIN);
    if (CollectionUtils.isNotEmpty(specifiedDomains)) {
        upstream.setUrl(specifiedDomains.get(0));
    }

    // set domain
    String domain = upstream.buildDomain();
    exchange.getAttributes().put(Constants.HTTP_DOMAIN, domain);
    // set the http timeout
    exchange.getAttributes().put(Constants.HTTP_TIME_OUT, ruleHandle.getTimeout());
    exchange.getAttributes().put(Constants.HTTP_RETRY, ruleHandle.getRetry());
    // set retry strategy stuff
    exchange.getAttributes().put(Constants.RETRY_STRATEGY, StringUtils.defaultString(ruleHandle.getRetryStrategy(), RetryEnum.CURRENT.getName()));
    exchange.getAttributes().put(Constants.LOAD_BALANCE, StringUtils.defaultString(loadBalance, LoadBalanceEnum.RANDOM.getName()));
    exchange.getAttributes().put(Constants.DIVIDE_SELECTOR_ID, selector.getId());

    // Objects.equals keeps a null load-balance value from throwing here; such a rule
    // simply takes the plain chain.execute path at the bottom.
    if (Objects.equals(P2C, loadBalance)) {
        // responseTrigger is invoked both when the chain completes successfully and when it errors.
        return chain.execute(exchange)
                .doOnSuccess(e -> responseTrigger(upstream))
                .doOnError(throwable -> responseTrigger(upstream));
    }
    if (Objects.equals(SHORTEST_RESPONSE, loadBalance)) {
        // NOTE(review): beginTime is not declared in this method - presumably an instance
        // field. If this plugin instance serves concurrent requests, these writes can
        // overwrite each other - confirm and consider carrying the start time per request.
        beginTime = System.currentTimeMillis();
        // successResponseTrigger is invoked on successful completion only; there is no error hook.
        return chain.execute(exchange).doOnSuccess(e -> successResponseTrigger(upstream));
    }
    return chain.execute(exchange);
}