in spring-ai-alibaba-jmanus/src/main/java/com/alibaba/cloud/ai/example/manus/dynamic/mcp/service/McpService.java [71:160]
/**
 * Builds the MCP client transport described by the given configuration entity and
 * hands it to {@code configureMcpTransport} under the entity's server name.
 *
 * <p>
 * The entity's connection config is parsed as JSON into a {@code McpServerConfig}.
 * Handling then depends on the connection type:
 * <ul>
 * <li>{@code SSE}: requires a non-empty {@code url}; a trailing {@code "/sse"} (or,
 * failing that, a single trailing {@code "/"}) is stripped before the value is used
 * as the WebClient base URL.</li>
 * <li>{@code STUDIO}: requires a non-empty {@code command}; optional {@code args} and
 * {@code env} are forwarded to the stdio server parameters.</li>
 * <li>{@code STREAMING}: not implemented; always rejected.</li>
 * </ul>
 * Any other connection type falls through the switch without doing anything.
 * @param mcpConfigEntity the stored MCP server configuration to load
 * @throws IOException if the connection type is missing, the connection config cannot
 * be parsed or is the JSON literal {@code null}, a required field is absent, or (for
 * {@code STUDIO}) any failure occurs while creating or configuring the transport
 * @throws UnsupportedOperationException if the connection type is {@code STREAMING}
 */
private void loadMcpServices(McpConfigEntity mcpConfigEntity) throws IOException {
	McpConfigType type = mcpConfigEntity.getConnectionType();
	if (type == null) {
		throw new IOException("Connection type is required");
	}
	String serverName = mcpConfigEntity.getMcpServerName();
	switch (type) {
		case SSE -> {
			// One mapper serves both config parsing and the transport.
			ObjectMapper objectMapper = new ObjectMapper();
			try (JsonParser jsonParser = objectMapper.createParser(mcpConfigEntity.getConnectionConfig())) {
				McpServerConfig mcpServerConfig = jsonParser.readValueAs(McpServerConfig.class);
				// A JSON literal `null` deserializes to a null object; fail with the
				// declared exception type instead of a NullPointerException.
				if (mcpServerConfig == null) {
					throw new IOException("Missing MCP server configuration for " + serverName);
				}
				String url = mcpServerConfig.getUrl();
				if (url == null || url.isEmpty()) {
					throw new IOException("Invalid MCP server URL");
				}
				// Drop a trailing "/sse" so only the base address remains
				// (presumably the transport appends its own SSE endpoint path;
				// confirm against WebFluxSseClientTransport).
				if (url.endsWith("/sse")) {
					url = url.substring(0, url.length() - "/sse".length());
				}
				// Otherwise drop a single trailing slash to avoid path problems when
				// the value is combined as a base URL.
				else if (url.endsWith("/")) {
					url = url.substring(0, url.length() - 1);
				}
				logger.info("Connecting to SSE endpoint: {}", url);
				// Build the WebClient with the headers sent on every request.
				WebClient.Builder webClient = WebClient.builder()
					.baseUrl(url)
					.defaultHeader("Accept", "text/event-stream")
					.defaultHeader("Content-Type", "application/json");
				McpClientTransport transport = new WebFluxSseClientTransport(webClient, objectMapper);
				configureMcpTransport(serverName, transport);
			}
		}
		case STUDIO -> {
			// One mapper serves both config parsing and the transport.
			ObjectMapper objectMapper = new ObjectMapper();
			try (JsonParser jsonParser = objectMapper.createParser(mcpConfigEntity.getConnectionConfig())) {
				McpServerConfig mcpServerConfig = jsonParser.readValueAs(McpServerConfig.class);
				// A JSON literal `null` deserializes to a null object; report it
				// explicitly rather than through an opaque NullPointerException.
				if (mcpServerConfig == null) {
					throw new IOException("Missing MCP server configuration for " + serverName);
				}
				// Extract the launch command, its arguments and environment.
				String command = mcpServerConfig.getCommand();
				List<String> args = mcpServerConfig.getArgs();
				Map<String, String> env = mcpServerConfig.getEnv();
				// The command is mandatory.
				if (command == null || command.isEmpty()) {
					throw new IOException(
							"Missing required 'command' field in server configuration for " + serverName);
				}
				// Assemble the server parameters; args and env are optional.
				ServerParameters.Builder builder = ServerParameters.builder(command);
				if (args != null && !args.isEmpty()) {
					builder.args(args);
				}
				if (env != null && !env.isEmpty()) {
					builder.env(env);
				}
				ServerParameters serverParameters = builder.build();
				McpClientTransport transport = new StdioClientTransport(serverParameters, objectMapper);
				// Register the transport for this server.
				configureMcpTransport(serverName, transport);
				logger.info("STUDIO MCP Client configured for server: {}", serverName);
			}
			catch (Exception e) {
				// Every failure in this branch (including the validation errors
				// above) is logged and surfaced as an IOException with its cause.
				logger.error("Error creating STUDIO transport: ", e);
				throw new IOException("Failed to create StdioClientTransport: " + e.getMessage(), e);
			}
		}
		case STREAMING -> {
			// STREAMING connections have no implementation yet; reject them.
			logger.warn("STREAMING connection type is not fully implemented yet");
			throw new UnsupportedOperationException("STREAMING connection type is not supported yet");
		}
	}
}