shared/openai-slackbot/openai_slackbot/clients/slack.py (94 lines of code) (raw):

import json
import os
import typing as t
from logging import getLogger

from jinja2 import Environment, FileSystemLoader
from pydantic import BaseModel
from slack_sdk.errors import SlackApiError
from slack_sdk.web.async_client import AsyncWebClient

logger = getLogger(__name__)


class SlackMessage(BaseModel):
    """Message object nested inside a ``CreateSlackMessageResponse``."""

    app_id: t.Optional[str] = None
    blocks: t.Optional[t.List[t.Any]] = None
    bot_id: t.Optional[str] = None
    bot_profile: t.Optional[t.Dict[str, t.Any]] = None
    team: str
    text: str
    ts: str
    type: str
    user: t.Optional[str] = None


class CreateSlackMessageResponse(BaseModel):
    """Validated shape of the data returned by ``SlackClient.post_message``."""

    ok: bool
    channel: str
    ts: str
    message: SlackMessage


class SlackClient:
    """
    SlackClient wraps the Slack AsyncWebClient implementation and provides
    some additional functionality specific to the Slackbot implementation.
    """

    def __init__(self, client: AsyncWebClient, template_path: str) -> None:
        """Store the Slack client and build the Jinja environment.

        Args:
            client: Async Slack web client every API call is delegated to.
            template_path: Directory that block templates are loaded from.
        """
        self._client = client
        self._jinja = self._init_jinja(template_path)

    async def get_message_link(self, **kwargs) -> str:
        """Return the permalink of a message.

        Args:
            **kwargs: Forwarded unchanged to ``chat_getPermalink``.

        Raises:
            Exception: If the response is not flagged ``ok``.
        """
        response = await self._client.chat_getPermalink(**kwargs)
        if not response["ok"]:
            raise Exception(f"Failed to get Slack message link: {response['error']}")
        return response["permalink"]

    async def get_message(self, channel: str, ts: str) -> t.Optional[t.Dict[str, t.Any]]:
        """Fetch a single message from a channel's history.

        Follows: https://api.slack.com/messaging/retrieving.

        Args:
            channel: Channel to query.
            ts: Timestamp passed as the inclusive ``latest`` bound.

        Returns:
            The first message of the history page, or ``None`` when the page
            is empty.
        """
        result = await self._client.conversations_history(
            channel=channel,
            inclusive=True,
            latest=ts,
            limit=1,
        )
        messages = result["messages"]
        return messages[0] if messages else None

    async def post_message(self, **kwargs) -> CreateSlackMessageResponse:
        """Post a message and return the parsed response.

        Args:
            **kwargs: Forwarded unchanged to ``chat_postMessage``.

        Raises:
            Exception: If the response is not flagged ``ok``.
        """
        response = await self._client.chat_postMessage(**kwargs)
        if not response["ok"]:
            raise Exception(f"Failed to post Slack message: {response['error']}")

        # Narrows the payload type before it is unpacked into the model.
        assert isinstance(response.data, dict)
        return CreateSlackMessageResponse(**response.data)

    async def update_message(self, **kwargs) -> t.Dict[str, t.Any]:
        """Update an existing message and return the raw response data.

        Args:
            **kwargs: Forwarded unchanged to ``chat_update``.

        Raises:
            Exception: If the response is not flagged ``ok``.
        """
        response = await self._client.chat_update(**kwargs)
        if not response["ok"]:
            raise Exception(f"Failed to update Slack message: {response['error']}")

        assert isinstance(response.data, dict)
        return response.data

    async def add_reaction(self, **kwargs) -> t.Dict[str, t.Any]:
        """Add a reaction, treating an already-present reaction as success.

        Args:
            **kwargs: Forwarded unchanged to ``reactions_add``.

        Returns:
            The raw response data, or an empty dict when Slack reports
            ``already_reacted``.

        Raises:
            SlackApiError: For any API error other than ``already_reacted``.
        """
        try:
            response = await self._client.reactions_add(**kwargs)
        except SlackApiError as e:
            if e.response["error"] == "already_reacted":
                return {}
            # Bare raise keeps the original traceback intact.
            raise

        assert isinstance(response.data, dict)
        return response.data

    async def get_thread_messages(self, channel: str, thread_ts: str) -> t.List[t.Dict[str, t.Any]]:
        """Return the ``messages`` list of a thread.

        Args:
            channel: Channel containing the thread.
            thread_ts: Timestamp passed as ``ts`` to ``conversations_replies``.

        Raises:
            Exception: If the response is not flagged ``ok``.
        """
        response = await self._client.conversations_replies(channel=channel, ts=thread_ts)
        if not response["ok"]:
            raise Exception(f"Failed to get thread messages: {response['error']}")

        assert isinstance(response.data, dict)
        return response.data["messages"]

    async def get_user_display_name(self, user_id: str) -> str:
        """Return the profile display name of a user.

        Args:
            user_id: User to look up via ``users_info``.

        Raises:
            Exception: If the response is not flagged ``ok``.
        """
        response = await self._client.users_info(user=user_id)
        if not response["ok"]:
            raise Exception(f"Failed to get user info: {response['error']}")
        return response["user"]["profile"]["display_name"]

    async def get_original_blocks(
        self, thread_ts: str, channel: str
    ) -> t.Optional[t.List[t.Any]]:
        """Given a thread_ts, get original message block

        Args:
            thread_ts: Timestamp passed as ``ts`` to ``conversations_replies``.
            channel: Channel containing the thread.

        Returns:
            The ``blocks`` of the first message in the thread, or ``None``
            when there are no messages or no blocks; that failure is logged
            rather than raised.
        """
        # The API call sits outside the try on purpose: only failures while
        # reading the response are downgraded to a log entry.
        response = await self._client.conversations_replies(
            channel=channel,
            ts=thread_ts,
        )

        try:
            messages = response.get("messages", [])
            if not messages:
                raise ValueError(f"Error fetching original message for thread_ts {thread_ts}")

            blocks = messages[0].get("blocks")
            if not blocks:
                raise ValueError(f"Error fetching original message for thread_ts {thread_ts}")

            return blocks
        except Exception as e:
            logger.exception(f"Error fetching original message for thread_ts {thread_ts}: {e}")
            return None

    def render_blocks_from_template(
        self,
        template_filename: str,
        context: t.Optional[t.Dict[str, t.Any]] = None,
    ) -> t.Any:
        """Render a Jinja template and parse the output as JSON.

        Args:
            template_filename: Template name resolved by the Jinja environment.
            context: Variables for the template; omitted or ``None`` means an
                empty context.

        Returns:
            The value decoded from the rendered template by ``json.loads``.
        """
        # A fresh dict per call avoids sharing one mutable default between calls.
        render_context = {} if context is None else context
        rendered_template = self._jinja.get_template(template_filename).render(render_context)
        return json.loads(rendered_template)

    def _init_jinja(self, template_path: str) -> Environment:
        """Build a Jinja environment that loads templates from ``template_path``."""
        templates_dir = os.path.join(template_path)
        return Environment(loader=FileSystemLoader(templates_dir))