def _register_tools()

in src/mcp_server_aliyun_observability/toolkit/arms_toolkit.py [0:0]


    def _register_tools(self):
        """register arms related tools functions"""

        @self.server.tool()
        def arms_search_apps(
            ctx: Context,
            appNameQuery: str = Field(..., description="app name query"),
            regionId: str = Field(
                ...,
                description="region id,region id format like 'xx-xxx',like 'cn-hangzhou'",
            ),
            pageSize: int = Field(
                20, description="page size,max is 100", ge=1, le=100
            ),
            pageNumber: int = Field(1, description="page number,default is 1", ge=1),
        ) -> list[dict[str, Any]]:
            """搜索ARMS应用。

            ## 功能概述

            该工具用于根据应用名称搜索ARMS应用,返回应用的基本信息,包括应用名称、PID、用户ID和类型。

            ## 使用场景

            - 当需要查找特定名称的应用时
            - 当需要获取应用的PID以便进行其他ARMS操作时
            - 当需要检查用户拥有的应用列表时

            ## 搜索条件

            - app_name_query必须是应用名称的一部分,而非自然语言
            - 搜索结果将分页返回,可以指定页码和每页大小

            ## 返回数据结构

            返回一个字典,包含以下信息:
            - total: 符合条件的应用总数
            - page_size: 每页大小
            - page_number: 当前页码
            - trace_apps: 应用列表,每个应用包含app_name、pid、user_id和type

            ## 查询示例

            - "帮我查询下 XXX 的应用"
            - "找出名称包含'service'的应用"

            Args:
                ctx: MCP上下文,用于访问ARMS客户端
                app_name_query: 应用名称查询字符串
                region_id: 阿里云区域ID
                page_size: 每页大小,范围1-100,默认20
                page_number: 页码,默认1

            Returns:
                包含应用信息的字典
            """
            arms_client: ArmsClient = ctx.request_context.lifespan_context[
                "arms_client"
            ].with_region(regionId)
            request: SearchTraceAppByPageRequest = SearchTraceAppByPageRequest(
                trace_app_name=appNameQuery,
                region_id=regionId,
                page_size=pageSize,
                page_number=pageNumber,
            )
            response: SearchTraceAppByPageResponse = (
                arms_client.search_trace_app_by_page(request)
            )
            page_bean: SearchTraceAppByPageResponseBodyPageBean = (
                response.body.page_bean
            )
            result = {
                "total": page_bean.total_count,
                "page_size": page_bean.page_size,
                "page_number": page_bean.page_number,
                "trace_apps": [],
            }
            if page_bean:
                result["trace_apps"] = [
                    {
                        "app_name": app.app_name,
                        "pid": app.pid,
                        "user_id": app.user_id,
                        "type": app.type,
                    }
                    for app in page_bean.trace_apps
                ]

            return result

        @self.server.tool()
        @retry(
            stop=stop_after_attempt(2),
            wait=wait_fixed(1),
            retry=retry_if_exception_type(Exception),
            reraise=True,
        )
        def arms_generate_trace_query(
            ctx: Context,
            user_id: int = Field(..., description="user aliyun account id"),
            pid: str = Field(..., description="pid,the pid of the app"),
            region_id: str = Field(
                ...,
                description="region id,region id format like 'xx-xxx',like 'cn-hangzhou'",
            ),
            question: str = Field(
                ..., description="question,the question to query the trace"
            ),
        ) -> dict:
            """生成ARMS应用的调用链查询语句。

            ## 功能概述

            该工具用于将自然语言描述转换为ARMS调用链查询语句,便于分析应用性能和问题。

            ## 使用场景

            - 当需要查询应用的调用链信息时
            - 当需要分析应用性能问题时
            - 当需要跟踪特定请求的执行路径时
            - 当需要分析服务间调用关系时

            ## 查询处理

            工具会将自然语言问题转换为SLS查询,并返回:
            - 生成的SLS查询语句
            - 存储调用链数据的项目名
            - 存储调用链数据的日志库名

            ## 查询上下文

            查询会考虑以下信息:
            - 应用的PID
            - 响应时间以纳秒存储,需转换为毫秒
            - 数据以span记录存储,查询耗时需要对符合条件的span进行求和
            - 服务相关信息使用serviceName字段
            - 如果用户明确提出要查询 trace信息,则需要在查询问题上question 上添加说明返回trace信息

            ## 查询示例

            - "帮我查询下 XXX 的 trace 信息"
            - "分析最近一小时内响应时间超过1秒的调用链"

            Args:
                ctx: MCP上下文,用于访问ARMS和SLS客户端
                user_id: 用户阿里云账号ID
                pid: 应用的PID
                region_id: 阿里云区域ID
                question: 查询调用链的自然语言问题

            Returns:
                包含查询信息的字典,包括sls_query、project和log_store
            """

            data: dict[str, str] = get_arms_user_trace_log_store(user_id, region_id)
            instructions = [
                "1. pid为" + pid,
                "2. 响应时间字段为 duration,单位为纳秒,转换成毫秒",
                "3. 注意因为保存的是每个 span 记录,如果是耗时,需要对所有符合条件的span 耗时做求和",
                "4. 涉及到接口服务等字段,使用 serviceName字段",
                "5. 如果用户明确提出要查询 trace信息,则需要返回 trace_id",
            ]
            instructions_str = "\n".join(instructions)
            prompt = f"""
            问题:
            {question}
            补充信息:
            {instructions_str}
            请根据以上信息生成sls查询语句
            """
            sls_text_to_query = text_to_sql(
                ctx, prompt, data["project"], data["log_store"], region_id
            )
            return {
                "sls_query": sls_text_to_query["data"],
                "requestId": sls_text_to_query["requestId"],
                "project": data["project"],
                "log_store": data["log_store"],
            }
          
        @self.server.tool()
        def arms_profile_flame_analysis(
                ctx: Context,
                pid: str = Field(..., description="arms application id"),
                startMs: str = Field(..., description="profile start ms"),
                endMs: str = Field(..., description="profile end ms"),
                profileType: str = Field(default="cpu", description="profile type, like 'cpu' 'memory'"),
                ip: str = Field(None, description="arms service host ip"),
                thread: str = Field(None, description="arms service thread id"),
                threadGroup: str = Field(None, description="arms service thread group"),
                regionId: str = Field(default=...,
                    description="aliyun region id,region id format like 'xx-xxx',like 'cn-hangzhou'",
                ),
        ) -> dict:
            """分析ARMS应用火焰图性能热点。

            ## 功能概述

            当应用存在性能问题且开启持续剖析时,可以调用该工具对ARMS应用火焰图性能热点进行分析,生成分析结果。分析结果会包含火焰图的性能热点问题、优化建议等信息。

            ## 使用场景

            - 当需要分析ARMS应用火焰图性能问题时

            ## 查询示例

            - "帮我分析下ARMS应用 XXX 的火焰图性能热点"

            Args:
                ctx: MCP上下文,用于访问SLS客户端
                pid: ARMS应用监控服务PID
                startMs: 分析的开始时间,通过get_current_time工具获取毫秒级时间戳
                endMs: 分析的结束时间,通过get_current_time工具获取毫秒级时间戳
                profileType: Profile类型,用于选择需要分析的Profile指标,支持CPU热点和内存热点,如'cpu'、'memory'
                ip: ARMS应用服务主机地址,非必要参数,用于选择所在的服务机器,如有多个填写时以英文逗号","分隔,如'192.168.0.1,192.168.0.2',不填写默认查询服务所在的所有IP
                thread: 服务线程名称,非必要参数,用于选择对应线程,如有多个填写时以英文逗号","分隔,如'C1 CompilerThre,C2 CompilerThre',不填写默认查询服务所有线程
                threadGroup: 服务聚合线程组名称,非必要参数,用于选择对应线程组,如有多个填写时以英文逗号","分隔,如'http-nio-*-exec-*,http-nio-*-ClientPoller-*',不填写默认查询服务所有聚合线程组
                regionId: 阿里云区域ID,如'cn-hangzhou'、'cn-shanghai'等
            """
            try:
                valid_types = ['cpu', 'memory']
                profileType = profileType.lower()
                if profileType not in valid_types:
                    raise ValueError(f"无效的profileType: {profileType}, 仅支持: {', '.join(valid_types)}")

                # Connect to ARMS client
                arms_client: ArmsClient = ctx.request_context.lifespan_context["arms_client"].with_region(regionId)
                request: GetTraceAppRequest = GetTraceAppRequest(pid=pid, region_id=regionId)
                response: GetTraceAppResponse = arms_client.get_trace_app(request)
                trace_app: GetTraceAppResponseBodyTraceApp = response.body.trace_app

                if not trace_app:
                    raise ValueError("无法找到应用信息")

                # Extract application details
                service_name = trace_app.app_name
                language = trace_app.language

                # Validate language parameter
                if language not in ['java', 'go']:
                    raise ValueError(f"暂不支持的语言类型: {language}. 当前仅支持 'java' 和 'go'")

                # Prepare SLS client for Flame analysis
                sls_client: Client = ctx.request_context.lifespan_context["sls_client"].with_region("cn-shanghai")
                ai_request: CallAiToolsRequest = CallAiToolsRequest(
                    tool_name="profile_flame_analysis",
                    region_id=regionId
                )

                params: dict[str, Any] = {
                    "serviceName": service_name,
                    "startMs": startMs,
                    "endMs": endMs,
                    "profileType": profileType,
                    "ip": ip,
                    "language": language,
                    "thread": thread,
                    "threadGroup": threadGroup,
                    "sys.query": f"帮我分析下应用 {service_name} 的火焰图性能热点问题",
                }

                ai_request.params = params
                runtime: util_models.RuntimeOptions = util_models.RuntimeOptions(read_timeout=60000,
                                                                                 connect_timeout=60000)

                tool_response: CallAiToolsResponse = sls_client.call_ai_tools_with_options(request=ai_request,
                                                                                           headers={},
                                                                                           runtime=runtime)
                data = tool_response.body

                if "------answer------\n" in data:
                    data = data.split("------answer------\n")[1]

                return {
                    "data": data
                }

            except Exception as e:
                logger.error(f"调用火焰图数据性能热点AI工具失败: {str(e)}")
                raise

        @self.server.tool()
        def arms_diff_profile_flame_analysis(
                ctx: Context,
                pid: str = Field(..., description="arms application id"),
                currentStartMs: str = Field(..., description="current profile start ms"),
                currentEndMs: str = Field(..., description="current profile end ms"),
                referenceStartMs: str = Field(..., description="reference profile start ms (for comparison)"),
                referenceEndMs: str = Field(..., description="reference profile end ms (for comparison)"),
                profileType: str = Field(default="cpu", description="profile type, like 'cpu' 'memory'"),
                ip: str = Field(None, description="arms service host ip"),
                thread: str = Field(None, description="arms service thread id"),
                threadGroup: str = Field(None, description="arms service thread group"),
                regionId: str = Field(default=...,
                                      description="aliyun region id,region id format like 'xx-xxx',like 'cn-hangzhou'")
        ) -> dict:
            """对比两个时间段火焰图的性能变化。

            ## 功能概述

            对应用在两个不同时间段内的性能进行分析,生成差分火焰图。通常用于发布前后或性能优化前后性能对比,帮助识别性能提升或退化。

            ## 使用场景

            - 发布前后、性能优化前后不同时间段火焰图性能对比

            ## 查询示例

            - "帮我分析应用 XXX 在发布前后的性能变化情况"

            Args:
                ctx: MCP上下文,用于访问SLS客户端
                pid: ARMS应用监控服务PID
                currentStartMs: 火焰图当前(基准)时间段的开始时间戳,通过get_current_time工具获取毫秒级时间戳
                currentEndMs: 火焰图当前(基准)时间段的结束时间戳,通过get_current_time工具获取毫秒级时间戳
                referenceStartMs: 火焰图对比时间段(参考时间段)的开始时间戳,通过get_current_time工具获取毫秒级时间戳
                referenceEndMs: 火焰图对比时间段(参考时间段)的结束时间戳,通过get_current_time工具获取毫秒级时间戳
                profileType: Profile类型,如'cpu'、'memory'
                ip: ARMS应用服务主机地址,非必要参数,用于选择所在的服务机器,如有多个填写时以英文逗号","分隔,如'192.168.0.1,192.168.0.2',不填写默认查询服务所在的所有IP
                thread: 服务线程名称,非必要参数,用于选择对应线程,如有多个填写时以英文逗号","分隔,如'C1 CompilerThre,C2 CompilerThre',不填写默认查询服务所有线程
                threadGroup: 服务聚合线程组名称,非必要参数,用于选择对应线程组,如有多个填写时以英文逗号","分隔,如'http-nio-*-exec-*,http-nio-*-ClientPoller-*',不填写默认查询服务所有聚合线程组
                regionId: 阿里云区域ID,如'cn-hangzhou'、'cn-shanghai'等
            """
            try:
                valid_types = ['cpu', 'memory']
                profileType = profileType.lower()
                if profileType not in valid_types:
                    raise ValueError(f"无效的profileType: {profileType}, 仅支持: {', '.join(valid_types)}")

                arms_client: ArmsClient = ctx.request_context.lifespan_context["arms_client"].with_region(regionId)
                request: GetTraceAppRequest = GetTraceAppRequest(
                    pid=pid,
                    region_id=regionId,
                )
                response: GetTraceAppResponse = arms_client.get_trace_app(request)
                trace_app: GetTraceAppResponseBodyTraceApp = response.body.trace_app

                if not trace_app:
                    raise ValueError("无法找到应用信息")

                service_name = trace_app.app_name
                language = trace_app.language

                if language not in ['java', 'go']:
                    raise ValueError(f"暂不支持的语言类型: {language}. 当前仅支持 'java' 和 'go'")

                sls_client: Client = ctx.request_context.lifespan_context["sls_client"].with_region("cn-shanghai")
                ai_request: CallAiToolsRequest = CallAiToolsRequest(
                    tool_name="diff_profile_flame_analysis",
                    region_id=regionId
                )

                params: dict[str, Any] = {
                    "serviceName": service_name,
                    "startMs": currentStartMs,
                    "endMs": currentEndMs,
                    "baseStartMs": referenceStartMs,
                    "baseEndMs": referenceEndMs,
                    "profileType": profileType,
                    "ip": ip,
                    "language": language,
                    "thread": thread,
                    "threadGroup": threadGroup,
                    "sys.query": f"帮我分析应用 {service_name} 在两个时间段前后的性能变化情况",
                }

                ai_request.params = params
                runtime: util_models.RuntimeOptions = util_models.RuntimeOptions(read_timeout=60000, connect_timeout=60000)

                tool_response: CallAiToolsResponse = sls_client.call_ai_tools_with_options(request=ai_request, headers={}, runtime=runtime)
                data = tool_response.body

                if "------answer------\n" in data:
                    data = data.split("------answer------\n")[1]

                return {
                    "data": data
                }

            except Exception as e:
                logger.error(f"调用差分火焰图性能变化分析工具失败: {str(e)}")
                raise

        @self.server.tool()
        def arms_get_application_info(ctx: Context,
                                      pid: str = Field(..., description="pid,the pid of the app"),
                                      regionId: str = Field(...,
                                        description="aliyun region id,region id format like 'xx-xxx',like 'cn-hangzhou'",
                                      ),
                                      ) -> dict:
            """
            根据 PID获取具体某个应用的信息,
            ## 功能概述
            1. 获取ARMS应用信息,会返回应用的 PID,AppName,开发语言类型比如 java,python 等
            
            ## 使用场景
            1. 当用户明确提出要查询某个应用的信息时,可以调用该工具
            2. 有场景需要获取应用的开发语言类型,可以调用该工具
            """
            arms_client: ArmsClient = ctx.request_context.lifespan_context[
                "arms_client"
            ].with_region(regionId)
            request: GetTraceAppRequest = GetTraceAppRequest(
                pid=pid,
                region_id=regionId,
            )
            response: GetTraceAppResponse = arms_client.get_trace_app(request)
            if response.body:
                trace_app: GetTraceAppResponseBodyTraceApp = response.body.trace_app
                return {
                    "pid": trace_app.pid,
                    "app_name": trace_app.app_name,
                    "language": trace_app.language,
                }
            else:
                return "没有找到应用信息"
        
        @self.server.tool()
        def arms_trace_quality_analysis(ctx: Context,
                traceId: str = Field(..., description="traceId"),
                startMs: int = Field(..., description="start time (ms) for trace query. unit is millisecond, should be unix timestamp, only number, no other characters"),
                endMs: int = Field(..., description="end time (ms) for trace query. unit is millisecond, should be unix timestamp, only number, no other characters"),
                regionId: str = Field(default=...,
                                      description="aliyun region id,region id format like 'xx-xxx',like 'cn-hangzhou'")
        ) -> dict:
            """Trace 质量检测

            ## 功能概述
            识别指定 traceId 的 Trace 是否存在完整性问题(断链)和性能问题(错慢调用)

            ## 使用场景

            - 检测调用链是否存在问题

            ## 查询示例

            - "帮我分析调用链"

            Args:
                ctx: MCP上下文,用于访问SLS客户端
                traceId: 待分析的 Trace 的 traceId,必要参数
                startMs: 分析的开始时间,通过get_current_time工具获取毫秒级时间戳
                endMs: 分析的结束时间,通过get_current_time工具获取毫秒级时间戳
                regionId: 阿里云区域ID,如'cn-hangzhou'、'cn-shanghai'等
            """
            try:

                sls_client: Client = ctx.request_context.lifespan_context["sls_client"].with_region("cn-shanghai")
                ai_request: CallAiToolsRequest = CallAiToolsRequest(
                    tool_name="trace_struct_analysis",
                    region_id=regionId
                )

                params: dict[str, Any] = {
                    "startMs": startMs,
                    "endMs": endMs,
                    "traceId": traceId,
                    "sys.query": f"分析这个trace",
                }

                ai_request.params = params
                runtime: util_models.RuntimeOptions = util_models.RuntimeOptions(read_timeout=60000, connect_timeout=60000)

                tool_response: CallAiToolsResponse = sls_client.call_ai_tools_with_options(request=ai_request, headers={}, runtime=runtime)
                data = tool_response.body

                if "------answer------\n" in data:
                    data = data.split("------answer------\n")[1]

                return {
                    "data": data
                }

            except Exception as e:
                logger.error(f"调用Trace质量检测工具失败: {str(e)}")
                raise

        @self.server.tool()
        def arms_slow_trace_analysis(ctx: Context,
                                     traceId: str = Field(..., description="traceId"),
                                     startMs: int = Field(..., description="start time (ms) for trace query. unit is millisecond, should be unix timestamp, only number, no other characters"),
                                     endMs: int = Field(..., description="end time (ms) for trace query. unit is millisecond, should be unix timestamp, only number, no other characters"),
                                     regionId: str = Field(default=...,
                                                           description="aliyun region id,region id format like 'xx-xxx',like 'cn-hangzhou'")
                                     ) -> dict:
            """深入分析 Trace 慢调用根因

            ## 功能概述

            针对 Trace 中的慢调用进行诊断分析,输出包含概述、根因、影响范围及解决方案的诊断报告。

            ## 使用场景

            - 性能问题定位和修复

            ## 查询示例

            - "请分析 ${traceId} 这个 trace 慢的原因"

            Args:
                ctx: MCP上下文,用于访问SLS客户端
                traceId: 待分析的Trace的 traceId,必要参数
                startMs: 分析的开始时间,通过get_current_time工具获取毫秒级时间戳
                endMs: 分析的结束时间,通过get_current_time工具获取毫秒级时间戳
                regionId: 阿里云区域ID,如'cn-hangzhou'、'cn-shanghai'等
            """
            try:

                sls_client: Client = ctx.request_context.lifespan_context["sls_client"].with_region("cn-shanghai")
                ai_request: CallAiToolsRequest = CallAiToolsRequest(
                    tool_name="trace_slow_analysis",
                    region_id=regionId
                )

                params: dict[str, Any] = {
                    "startMs": startMs,
                    "endMs": endMs,
                    "traceId": traceId,
                    "sys.query": f"深入分析慢调用根因",
                }

                ai_request.params = params
                runtime: util_models.RuntimeOptions = util_models.RuntimeOptions(read_timeout=60000,
                                                                                 connect_timeout=60000)

                tool_response: CallAiToolsResponse = sls_client.call_ai_tools_with_options(request=ai_request,
                                                                                           headers={}, runtime=runtime)
                data = tool_response.body

                if "------answer------\n" in data:
                    data = data.split("------answer------\n")[1]

                return {
                    "data": data
                }

            except Exception as e:
                logger.error(f"调用Trace慢调用分析工具失败: {str(e)}")
                raise

        @self.server.tool()
        def arms_error_trace_analysis(ctx: Context,
                                     traceId: str = Field(..., description="traceId"),
                                     startMs: int = Field(..., description="start time (ms) for trace query. unit is millisecond, should be unix timestamp, only number, no other characters"),
                                     endMs: int = Field(..., description="end time (ms) for trace query. unit is millisecond, should be unix timestamp, only number, no other characters"),
                                     regionId: str = Field(default=...,
                                                           description="aliyun region id,region id format like 'xx-xxx',like 'cn-hangzhou'")
                                     ) -> dict:
            """深入分析 Trace 错误根因

            ## 功能概述

            针对 Trace 中的错误调用进行深入诊断分析,输出包含概述、根因、影响范围及解决方案的错误诊断报告。

            ## 使用场景

            - 性能问题定位和修复

            ## 查询示例

            - "请分析 ${traceId} 这个 trace 发生错误的原因"

            Args:
                ctx: MCP上下文,用于访问SLS客户端
                traceId: 待分析的Trace的 traceId,必要参数
                startMs: 分析的开始时间,通过get_current_time工具获取毫秒级时间戳
                endMs: 分析的结束时间,通过get_current_time工具获取毫秒级时间戳
                regionId: 阿里云区域ID,如'cn-hangzhou'、'cn-shanghai'等
            """
            try:

                sls_client: Client = ctx.request_context.lifespan_context["sls_client"].with_region("cn-shanghai")
                ai_request: CallAiToolsRequest = CallAiToolsRequest(
                    tool_name="trace_error_analysis",
                    region_id=regionId
                )

                params: dict[str, Any] = {
                    "startMs": startMs,
                    "endMs": endMs,
                    "traceId": traceId,
                    "sys.query": f"深入分析错误根因",
                }

                ai_request.params = params
                runtime: util_models.RuntimeOptions = util_models.RuntimeOptions(read_timeout=60000,
                                                                                 connect_timeout=60000)

                tool_response: CallAiToolsResponse = sls_client.call_ai_tools_with_options(request=ai_request,
                                                                                           headers={}, runtime=runtime)
                data = tool_response.body

                if "------answer------\n" in data:
                    data = data.split("------answer------\n")[1]

                return {
                    "data": data
                }

            except Exception as e:
                logger.error(f"调用Trace错误分析工具失败: {str(e)}")
                raise