jobs/eam-integrations/scripts/slack_channels/api/Slack/slack.py (85 lines of code) (raw):
import os
from api.util import APIAdaptor
from .secrets import config
import time
from api.util.decorators import wait
def retry(count):
def decorator(func):
def wrapper(*args, **kwargs):
for _ in range(count):
try:
ret = func(*args, **kwargs)
return ret
except Exception as e:
print('retry except')
if 'ratelimited' in e.args[0].get('error'):
time.sleep(1)
continue
return wrapper
return decorator
class SlackAPI:
def __init__(self):
self.api_adapter = APIAdaptor(host=config['slack_host'])
self._token = config['slack_token']
@retry(3)
def get_conversations_list(self, types):
channels_dict = {}
params = {'limit': 100,
'types': types,
'team_id': 'T07JXFQU132',
# 'last_message_activity_before': 1724426147
}
headers = {'Authorization': f'Bearer {self._token }'}
endpoint = "api/conversations.list"
#endpoint = "api/admin.conversations.lookup"
while (True):
data = self.api_adapter.get(endpoint=endpoint,
headers=headers,
params=params)
if not data.data.get('ok'):
raise Exception(data.data)
# channels_dict = {x.get('id'):x for x in data.data.get('channels',[])}
for x in data.data.get('channels',''):
channels_dict[x.get('id')] = x
if data.data.get('response_metadata').get('next_cursor', ''):
params['cursor'] = data.data.get('response_metadata').get('next_cursor', '')
else:
break
return channels_dict
@retry(3)
def get_conversations_history(self, params):
headers = {'Authorization': f'Bearer {self._token }'}
endpoint = "api/conversations.history"
return self.api_adapter.get(endpoint=endpoint,
headers=headers,
params=params)
@retry(3)
def conversations_archive(self, channel_id):
params = {'channel': channel_id}
headers = {'Authorization': f'Bearer {self._token }'}
endpoint = "api/conversations.archive"
return self.api_adapter.post(endpoint=endpoint,
headers=headers,
params=params)
@retry(3)
def conversations_delete(self, channel_id):
params = {'channel_id': channel_id}
headers = {'Authorization': f'Bearer {self._token }'}
endpoint = "api/admin.conversations.delete"
return self.api_adapter.post(endpoint=endpoint,
headers=headers,
params=params)
@retry(3)
def join_channel(self, channel_id):
params = {'channel': channel_id}
endpoint = "api/conversations.join"
headers = {'Authorization': f'Bearer {self._token }'}
return self.api_adapter.post(endpoint=endpoint,
headers=headers,
params=params)
@retry(3)
def chat_post_message(self, channel_id, text):
params = {'channel': channel_id,
'text': text}
headers = {'Authorization': f'Bearer {self._token }'}
endpoint = "api/chat.postMessage"
return self.api_adapter.post(endpoint=endpoint,
headers=headers,
params=params)