mirror of
https://github.com/zhayujie/chatgpt-on-wechat.git
synced 2026-06-02 00:57:41 +08:00
feat(channel): support slack bot
This commit is contained in:
@@ -42,6 +42,9 @@ def create_channel(channel_type) -> Channel:
|
||||
elif channel_type == const.TELEGRAM:
|
||||
from channel.telegram.telegram_channel import TelegramChannel
|
||||
ch = TelegramChannel()
|
||||
elif channel_type == const.SLACK:
|
||||
from channel.slack.slack_channel import SlackChannel
|
||||
ch = SlackChannel()
|
||||
elif channel_type in (const.WEIXIN, "wx"):
|
||||
from channel.weixin.weixin_channel import WeixinChannel
|
||||
ch = WeixinChannel()
|
||||
|
||||
1
channel/slack/__init__.py
Normal file
1
channel/slack/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
|
||||
506
channel/slack/slack_channel.py
Normal file
506
channel/slack/slack_channel.py
Normal file
@@ -0,0 +1,506 @@
|
||||
"""
|
||||
Slack channel via Bolt for Python (Socket Mode).
|
||||
|
||||
Features:
|
||||
- Direct message & channel chat (text / image / file)
|
||||
- Channel trigger: @mention or reply in a thread the bot is in (configurable)
|
||||
- /cancel fast-path matches Web channel behaviour
|
||||
- Socket Mode: no public IP / callback URL required, works behind NAT
|
||||
|
||||
Implementation note:
|
||||
slack_bolt's SocketModeHandler is blocking and runs its own background
|
||||
threads. We start it in a dedicated thread so the rest of cow (sync) stays
|
||||
untouched. Inbound events are dispatched onto cow's existing sync
|
||||
ChatChannel.produce() pipeline; outbound send() calls the Slack Web API
|
||||
client directly (it is sync-safe).
|
||||
"""
|
||||
|
||||
import os
|
||||
import re
|
||||
import threading
|
||||
|
||||
import requests
|
||||
|
||||
from bridge.context import Context, ContextType
|
||||
from bridge.reply import Reply, ReplyType
|
||||
from channel.chat_channel import ChatChannel, check_prefix
|
||||
from channel.slack.slack_message import SlackMessage
|
||||
from common.expired_dict import ExpiredDict
|
||||
from common.log import logger
|
||||
from common.singleton import singleton
|
||||
from config import conf
|
||||
|
||||
|
||||
@singleton
|
||||
class SlackChannel(ChatChannel):
|
||||
NOT_SUPPORT_REPLYTYPE = []
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.bot_token = ""
|
||||
self.app_token = ""
|
||||
self.bot_user_id = "" # used to strip @mention and ignore self messages
|
||||
self._app = None
|
||||
self._handler = None
|
||||
self._client = None
|
||||
self._loop_thread = None
|
||||
# Idempotent dedup; Slack retries event delivery on slow ack
|
||||
self._received_msgs = ExpiredDict(60 * 60 * 1)
|
||||
|
||||
# Disable group whitelist / prefix checks (we handle triggering ourselves
|
||||
# in _should_reply_in_channel), aligned with telegram / feishu channels.
|
||||
conf()["group_name_white_list"] = ["ALL_GROUP"]
|
||||
conf()["single_chat_prefix"] = [""]
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Lifecycle
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def startup(self):
|
||||
self.bot_token = conf().get("slack_bot_token", "")
|
||||
self.app_token = conf().get("slack_app_token", "")
|
||||
if not self.bot_token or not self.app_token:
|
||||
err = "[Slack] slack_bot_token and slack_app_token are both required"
|
||||
logger.error(err)
|
||||
self.report_startup_error(err)
|
||||
return
|
||||
|
||||
# Guard against the common mistake of swapping the two tokens:
|
||||
# bot token must start with xoxb-, app-level token with xapp-.
|
||||
if not self.bot_token.startswith("xoxb-") or not self.app_token.startswith("xapp-"):
|
||||
err = (
|
||||
"[Slack] token type mismatch: slack_bot_token must start with 'xoxb-' "
|
||||
"and slack_app_token must start with 'xapp-' (they look swapped)"
|
||||
)
|
||||
logger.error(err)
|
||||
self.report_startup_error(err)
|
||||
return
|
||||
|
||||
try:
|
||||
from slack_bolt import App
|
||||
from slack_bolt.adapter.socket_mode import SocketModeHandler
|
||||
except ImportError:
|
||||
err = (
|
||||
"[Slack] slack_bolt is not installed. "
|
||||
"Run: pip install slack_bolt"
|
||||
)
|
||||
logger.error(err)
|
||||
self.report_startup_error(err)
|
||||
return
|
||||
|
||||
try:
|
||||
self._app = App(token=self.bot_token)
|
||||
self._client = self._app.client
|
||||
|
||||
# Resolve our own bot user id (needed for @mention strip / self-ignore)
|
||||
auth = self._client.auth_test()
|
||||
self.bot_user_id = auth.get("user_id", "")
|
||||
self.name = self.bot_user_id # ChatChannel uses self.name to strip @-mention
|
||||
logger.info(f"[Slack] Bot logged in as user_id={self.bot_user_id}, team={auth.get('team')}")
|
||||
except Exception as e:
|
||||
err = f"[Slack] auth_test failed: {e}"
|
||||
logger.error(err)
|
||||
self.report_startup_error(err)
|
||||
return
|
||||
|
||||
self._register_handlers()
|
||||
|
||||
self._handler = SocketModeHandler(self._app, self.app_token)
|
||||
|
||||
def _run():
|
||||
try:
|
||||
logger.info("[Slack] Starting Socket Mode connection...")
|
||||
self.report_startup_success()
|
||||
logger.info("[Slack] ✅ Slack bot ready, listening for events")
|
||||
self._handler.start()
|
||||
except Exception as e:
|
||||
logger.error(f"[Slack] socket mode crashed: {e}", exc_info=True)
|
||||
self.report_startup_error(str(e))
|
||||
finally:
|
||||
logger.info("[Slack] socket mode exited")
|
||||
|
||||
self._loop_thread = threading.Thread(target=_run, daemon=True, name="slack-socket")
|
||||
self._loop_thread.start()
|
||||
# Block startup() until the handler thread exits, matching other channels'
|
||||
# behaviour (startup is a blocking call).
|
||||
self._loop_thread.join()
|
||||
|
||||
def _register_handlers(self):
|
||||
app = self._app
|
||||
|
||||
# app_mention: bot is @-mentioned in a channel
|
||||
@app.event("app_mention")
|
||||
def _on_app_mention(event, ack):
|
||||
ack()
|
||||
self._handle_event(event, is_group=True)
|
||||
|
||||
# message: DMs and channel messages (including thread replies)
|
||||
@app.event("message")
|
||||
def _on_message(event, ack):
|
||||
ack()
|
||||
self._handle_message_event(event)
|
||||
|
||||
def stop(self):
|
||||
logger.info("[Slack] stop() called")
|
||||
try:
|
||||
if self._handler is not None:
|
||||
self._handler.close()
|
||||
except Exception as e:
|
||||
logger.warning(f"[Slack] handler close error: {e}")
|
||||
if self._loop_thread and self._loop_thread.is_alive():
|
||||
try:
|
||||
self._loop_thread.join(timeout=10)
|
||||
except Exception:
|
||||
pass
|
||||
logger.info("[Slack] stop() completed")
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Inbound: slack event -> ChatMessage -> ChatChannel.produce
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _handle_message_event(self, event: dict):
|
||||
"""Route a raw `message` event: skip bot/system noise, decide grouping."""
|
||||
try:
|
||||
logger.debug(
|
||||
f"[Slack] message event: channel_type={event.get('channel_type')}, "
|
||||
f"subtype={event.get('subtype')}, user={event.get('user')}, "
|
||||
f"ts={event.get('ts')}, thread_ts={event.get('thread_ts')}"
|
||||
)
|
||||
# Ignore bot messages (including our own) and message edits/deletes
|
||||
if event.get("bot_id") or event.get("subtype") in ("bot_message", "message_changed", "message_deleted"):
|
||||
return
|
||||
if event.get("user") == self.bot_user_id:
|
||||
return
|
||||
|
||||
channel_type = event.get("channel_type", "")
|
||||
# DM (im) is single chat; channel/group is group chat. app_mention
|
||||
# already covers channel @-mentions, so for plain channel messages we
|
||||
# only react when configured / thread-following.
|
||||
is_group = channel_type in ("channel", "group", "mpim")
|
||||
if is_group:
|
||||
# app_mention handler covers explicit @bot; here we only handle
|
||||
# follow-up replies in threads the bot participates in.
|
||||
if not self._should_reply_in_channel(event):
|
||||
return
|
||||
self._handle_event(event, is_group=is_group)
|
||||
except Exception as e:
|
||||
logger.error(f"[Slack] _handle_message_event error: {e}", exc_info=True)
|
||||
|
||||
def _handle_event(self, event: dict, is_group: bool):
|
||||
"""Parse event -> build SlackMessage -> produce()."""
|
||||
try:
|
||||
channel_id = event.get("channel", "")
|
||||
ts = event.get("ts", "")
|
||||
if not channel_id:
|
||||
return
|
||||
|
||||
# Idempotent dedup
|
||||
msg_uid = f"{channel_id}:{ts}"
|
||||
if self._received_msgs.get(msg_uid):
|
||||
return
|
||||
self._received_msgs[msg_uid] = True
|
||||
|
||||
# Parse type + download media if needed.
|
||||
ctype, content, caption = self._parse_event(event)
|
||||
if ctype is None:
|
||||
logger.debug(f"[Slack] unsupported message type, skip. event={event}")
|
||||
return
|
||||
|
||||
# Strip <@bot_user_id> mention from channel text
|
||||
if is_group and self.bot_user_id:
|
||||
if ctype == ContextType.TEXT and content:
|
||||
content = self._strip_at_mention(content)
|
||||
if caption:
|
||||
caption = self._strip_at_mention(caption)
|
||||
|
||||
slack_msg = SlackMessage(
|
||||
event,
|
||||
is_group=is_group,
|
||||
bot_user_id=self.bot_user_id,
|
||||
ctype=ctype,
|
||||
content=content,
|
||||
)
|
||||
slack_msg.is_at = is_group # if we reached here in a channel, bot is mentioned/threaded
|
||||
|
||||
from channel.file_cache import get_file_cache
|
||||
file_cache = get_file_cache()
|
||||
session_id = self._compute_session_id(event, is_group)
|
||||
|
||||
# Media + caption together: treat as a complete query and bypass the cache
|
||||
if ctype in (ContextType.IMAGE, ContextType.FILE) and caption:
|
||||
tag = "image" if ctype == ContextType.IMAGE else "file"
|
||||
merged_text = f"{caption}\n[{tag}: {content}]"
|
||||
slack_msg.ctype = ContextType.TEXT
|
||||
slack_msg.content = merged_text
|
||||
ctype = ContextType.TEXT
|
||||
logger.info(f"[Slack] Media+caption merged for session {session_id}")
|
||||
# fallthrough to the TEXT branch below
|
||||
|
||||
elif ctype == ContextType.IMAGE:
|
||||
file_cache.add(session_id, content, file_type="image")
|
||||
logger.info(f"[Slack] Image cached for session {session_id}, waiting for query...")
|
||||
return
|
||||
elif ctype == ContextType.FILE:
|
||||
file_cache.add(session_id, content, file_type="file")
|
||||
logger.info(f"[Slack] File cached for session {session_id}: {content}")
|
||||
return
|
||||
|
||||
if ctype == ContextType.TEXT:
|
||||
# Fast-path: /cancel mirrors Web channel behaviour
|
||||
if (content or "").strip().lower() in ("/cancel", "cancel"):
|
||||
self._do_cancel(session_id, channel_id, event)
|
||||
return
|
||||
|
||||
cached_files = file_cache.get(session_id)
|
||||
if cached_files:
|
||||
refs = []
|
||||
for fi in cached_files:
|
||||
ftype = fi["type"]
|
||||
tag = ftype if ftype in ("image", "video") else "file"
|
||||
refs.append(f"[{tag}: {fi['path']}]")
|
||||
slack_msg.content = (slack_msg.content or "") + "\n" + "\n".join(refs)
|
||||
file_cache.clear(session_id)
|
||||
logger.info(f"[Slack] Attached {len(cached_files)} cached file(s) to query")
|
||||
|
||||
# Reply in the originating thread when present, else start one on this msg
|
||||
thread_ts = event.get("thread_ts") or ts
|
||||
|
||||
context = self._compose_context(
|
||||
slack_msg.ctype,
|
||||
slack_msg.content,
|
||||
isgroup=is_group,
|
||||
msg=slack_msg,
|
||||
# Replies go back into the thread, no manual @mention needed
|
||||
no_need_at=True,
|
||||
)
|
||||
if context:
|
||||
context["session_id"] = session_id
|
||||
context["receiver"] = channel_id
|
||||
context["slack_channel"] = channel_id
|
||||
context["slack_thread_ts"] = thread_ts if is_group else None
|
||||
self.produce(context)
|
||||
logger.debug(f"[Slack] received: type={ctype}, content={str(slack_msg.content)[:80]}")
|
||||
except Exception as e:
|
||||
logger.error(f"[Slack] _handle_event error: {e}", exc_info=True)
|
||||
|
||||
def _do_cancel(self, session_id: str, channel_id: str, event: dict):
|
||||
"""Fast-path: /cancel calls cancel_session directly without going through agent."""
|
||||
try:
|
||||
from agent.protocol import get_cancel_registry
|
||||
cancelled = get_cancel_registry().cancel_session(session_id)
|
||||
text = "Current task cancelled." if cancelled else "No running task to cancel."
|
||||
thread_ts = event.get("thread_ts") or event.get("ts")
|
||||
self._client.chat_postMessage(channel=channel_id, text=text, thread_ts=thread_ts)
|
||||
logger.info(f"[Slack] /cancel session={session_id}, cancelled={cancelled}")
|
||||
except Exception as e:
|
||||
logger.error(f"[Slack] /cancel error: {e}", exc_info=True)
|
||||
|
||||
def _parse_event(self, event: dict):
|
||||
"""Parse a slack event and return (ctype, content, caption).
|
||||
|
||||
- content is text for ContextType.TEXT, otherwise the local file path
|
||||
- caption is the optional text accompanying a file; empty for plain text
|
||||
"""
|
||||
text = (event.get("text") or "").strip()
|
||||
files = event.get("files") or []
|
||||
|
||||
if files:
|
||||
# Handle the first attachment; caption is the accompanying message text
|
||||
f = files[0]
|
||||
mimetype = (f.get("mimetype") or "").lower()
|
||||
url = f.get("url_private_download") or f.get("url_private")
|
||||
name = f.get("name") or f.get("id") or "file"
|
||||
if not url:
|
||||
return (None, None, "")
|
||||
path = self._download_file(url, name)
|
||||
if not path:
|
||||
return (None, None, "")
|
||||
if mimetype.startswith("image/"):
|
||||
return (ContextType.IMAGE, path, text)
|
||||
return (ContextType.FILE, path, text)
|
||||
|
||||
if text:
|
||||
return (ContextType.TEXT, text, "")
|
||||
|
||||
return (None, None, "")
|
||||
|
||||
def _download_file(self, url: str, name: str):
|
||||
"""Download a Slack private file (requires bot token auth) to local tmp dir."""
|
||||
try:
|
||||
headers = {"Authorization": f"Bearer {self.bot_token}"}
|
||||
resp = requests.get(url, headers=headers, timeout=60, stream=True)
|
||||
resp.raise_for_status()
|
||||
tmp_dir = SlackMessage.get_tmp_dir()
|
||||
# Sanitize the name and keep it unique-ish via the url tail
|
||||
safe_name = re.sub(r"[^\w.\-]", "_", name)
|
||||
local_path = os.path.join(tmp_dir, safe_name)
|
||||
with open(local_path, "wb") as fp:
|
||||
for chunk in resp.iter_content(chunk_size=8192):
|
||||
if chunk:
|
||||
fp.write(chunk)
|
||||
logger.debug(f"[Slack] downloaded {name} -> {local_path}")
|
||||
return local_path
|
||||
except Exception as e:
|
||||
logger.error(f"[Slack] download_file failed ({name}): {e}")
|
||||
return None
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Channel trigger logic
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _should_reply_in_channel(self, event: dict) -> bool:
|
||||
"""Decide whether to reply to a plain channel message (no @mention).
|
||||
|
||||
app_mention already handles explicit @bot, so here we only deal with
|
||||
follow-up messages. `all` replies to every message; `mention_or_reply`
|
||||
replies inside threads the bot already participates in.
|
||||
"""
|
||||
mode = conf().get("slack_group_trigger", "mention_or_reply")
|
||||
if mode == "all":
|
||||
return True
|
||||
if mode == "mention_only":
|
||||
return False
|
||||
# mention_or_reply: follow up only within an existing thread
|
||||
return bool(event.get("thread_ts"))
|
||||
|
||||
def _strip_at_mention(self, content: str) -> str:
|
||||
"""Strip <@BOT_USER_ID> from channel text."""
|
||||
if not content or not self.bot_user_id:
|
||||
return content
|
||||
pattern = re.compile(r"<@" + re.escape(self.bot_user_id) + r">", re.IGNORECASE)
|
||||
return pattern.sub("", content).strip()
|
||||
|
||||
@staticmethod
|
||||
def _compute_session_id(event: dict, is_group: bool) -> str:
|
||||
channel_id = event.get("channel", "")
|
||||
user_id = event.get("user", "")
|
||||
if is_group:
|
||||
if conf().get("group_shared_session", True):
|
||||
return f"slack_channel_{channel_id}"
|
||||
return f"slack_channel_{channel_id}_{user_id}"
|
||||
return f"slack_user_{user_id}"
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Override _compose_context: skip the parent's group whitelist/at checks
|
||||
# (already handled via _should_reply_in_channel). Same idea as telegram.
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _compose_context(self, ctype: ContextType, content, **kwargs):
|
||||
context = Context(ctype, content)
|
||||
context.kwargs = kwargs
|
||||
if "channel_type" not in context:
|
||||
context["channel_type"] = self.channel_type
|
||||
if "origin_ctype" not in context:
|
||||
context["origin_ctype"] = ctype
|
||||
|
||||
cmsg = context["msg"]
|
||||
if cmsg.is_group:
|
||||
if conf().get("group_shared_session", True):
|
||||
context["session_id"] = cmsg.other_user_id
|
||||
else:
|
||||
context["session_id"] = f"{cmsg.from_user_id}:{cmsg.other_user_id}"
|
||||
else:
|
||||
context["session_id"] = cmsg.from_user_id
|
||||
context["receiver"] = cmsg.other_user_id
|
||||
|
||||
if ctype == ContextType.TEXT:
|
||||
img_match_prefix = check_prefix(content, conf().get("image_create_prefix"))
|
||||
if img_match_prefix:
|
||||
content = content.replace(img_match_prefix, "", 1)
|
||||
context.type = ContextType.IMAGE_CREATE
|
||||
else:
|
||||
context.type = ContextType.TEXT
|
||||
context.content = (content or "").strip()
|
||||
if "desire_rtype" not in context and conf().get("always_reply_voice"):
|
||||
context["desire_rtype"] = ReplyType.VOICE
|
||||
elif ctype == ContextType.VOICE:
|
||||
if "desire_rtype" not in context and (
|
||||
conf().get("voice_reply_voice") or conf().get("always_reply_voice")
|
||||
):
|
||||
context["desire_rtype"] = ReplyType.VOICE
|
||||
|
||||
return context
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Outbound: ChatChannel.send -> Slack Web API
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def send(self, reply: Reply, context: Context):
|
||||
"""Called from cow's sync main thread; Slack Web client is sync-safe."""
|
||||
if self._client is None:
|
||||
logger.warning("[Slack] client not ready, drop reply")
|
||||
return
|
||||
|
||||
channel_id = context.get("slack_channel")
|
||||
thread_ts = context.get("slack_thread_ts")
|
||||
if not channel_id:
|
||||
logger.warning("[Slack] no slack_channel in context, drop reply")
|
||||
return
|
||||
|
||||
try:
|
||||
self._do_send(reply, channel_id, thread_ts)
|
||||
logger.info(f"[Slack] sent reply (type={reply.type}, channel={channel_id})")
|
||||
except Exception as e:
|
||||
logger.error(f"[Slack] send failed: {e}", exc_info=True)
|
||||
|
||||
def _do_send(self, reply: Reply, channel_id: str, thread_ts):
|
||||
rtype = reply.type
|
||||
content = reply.content
|
||||
|
||||
if rtype in (ReplyType.TEXT, ReplyType.INFO, ReplyType.ERROR):
|
||||
text = str(content) if content is not None else ""
|
||||
if not text:
|
||||
return
|
||||
# Slack caps a message around 40k chars; split conservatively
|
||||
for chunk in _split_text(text, 3500):
|
||||
self._client.chat_postMessage(channel=channel_id, text=chunk, thread_ts=thread_ts)
|
||||
|
||||
elif rtype == ReplyType.IMAGE:
|
||||
# Already a local BytesIO; upload it directly
|
||||
content.seek(0)
|
||||
self._client.files_upload_v2(
|
||||
channel=channel_id, file=content, filename="image.png", thread_ts=thread_ts,
|
||||
)
|
||||
|
||||
elif rtype == ReplyType.IMAGE_URL:
|
||||
url = str(content)
|
||||
if url.startswith("file://"):
|
||||
local = url[7:]
|
||||
self._client.files_upload_v2(
|
||||
channel=channel_id, file=local, thread_ts=thread_ts,
|
||||
)
|
||||
else:
|
||||
# Post the URL as text; Slack will unfurl it as an image preview
|
||||
self._client.chat_postMessage(channel=channel_id, text=url, thread_ts=thread_ts)
|
||||
|
||||
elif rtype in (ReplyType.VOICE, ReplyType.FILE):
|
||||
local = content[7:] if isinstance(content, str) and content.startswith("file://") else content
|
||||
caption = getattr(reply, "text_content", None) or None
|
||||
self._client.files_upload_v2(
|
||||
channel=channel_id, file=local, initial_comment=caption, thread_ts=thread_ts,
|
||||
)
|
||||
|
||||
else:
|
||||
# Fallback: send as plain text
|
||||
self._client.chat_postMessage(channel=channel_id, text=str(content), thread_ts=thread_ts)
|
||||
|
||||
|
||||
def _split_text(text: str, limit: int):
|
||||
"""Split long text preferring line breaks to keep markdown structure intact."""
|
||||
if len(text) <= limit:
|
||||
yield text
|
||||
return
|
||||
buf = []
|
||||
size = 0
|
||||
for line in text.splitlines(keepends=True):
|
||||
if size + len(line) > limit and buf:
|
||||
yield "".join(buf)
|
||||
buf, size = [], 0
|
||||
# Hard-split single lines that exceed the limit
|
||||
while len(line) > limit:
|
||||
yield line[:limit]
|
||||
line = line[limit:]
|
||||
buf.append(line)
|
||||
size += len(line)
|
||||
if buf:
|
||||
yield "".join(buf)
|
||||
60
channel/slack/slack_message.py
Normal file
60
channel/slack/slack_message.py
Normal file
@@ -0,0 +1,60 @@
|
||||
"""
|
||||
Slack message adapter.
|
||||
|
||||
Convert a Slack event payload into cow's unified ChatMessage.
|
||||
File downloads are NOT performed here; the channel layer downloads files
|
||||
on demand because it needs the bot token for authenticated download URLs.
|
||||
"""
|
||||
import os
|
||||
|
||||
from bridge.context import ContextType
|
||||
from channel.chat_message import ChatMessage
|
||||
from common.utils import expand_path
|
||||
from config import conf
|
||||
|
||||
|
||||
class SlackMessage(ChatMessage):
|
||||
"""Wrap a Slack event into the unified ChatMessage."""
|
||||
|
||||
def __init__(self, event: dict, is_group: bool = False, bot_user_id: str = "",
|
||||
ctype: ContextType = ContextType.TEXT, content: str = ""):
|
||||
super().__init__(event)
|
||||
# Basic fields
|
||||
self.msg_id = event.get("client_msg_id") or event.get("ts") or ""
|
||||
try:
|
||||
self.create_time = int(float(event.get("ts", 0)))
|
||||
except (TypeError, ValueError):
|
||||
self.create_time = 0
|
||||
self.ctype = ctype
|
||||
self.content = content
|
||||
|
||||
# Sender / chat info
|
||||
from_user_id = event.get("user", "unknown")
|
||||
channel_id = event.get("channel", "")
|
||||
self.from_user_id = from_user_id
|
||||
self.from_user_nickname = from_user_id
|
||||
self.to_user_id = bot_user_id or "slack_bot"
|
||||
self.to_user_nickname = bot_user_id or "slack_bot"
|
||||
|
||||
self.is_group = is_group
|
||||
if is_group:
|
||||
# Channel chat: other_user_id = channel_id, actual_user_id = sender id
|
||||
self.other_user_id = channel_id
|
||||
self.other_user_nickname = channel_id
|
||||
self.actual_user_id = from_user_id
|
||||
self.actual_user_nickname = from_user_id
|
||||
else:
|
||||
# DM: use channel_id so replies go back to the same DM channel
|
||||
self.other_user_id = channel_id or from_user_id
|
||||
self.other_user_nickname = from_user_id
|
||||
|
||||
# Whether the bot was triggered by @-mention (set by channel layer)
|
||||
self.is_at = False
|
||||
|
||||
@staticmethod
|
||||
def get_tmp_dir() -> str:
|
||||
"""Local download directory, aligned with other channels (agent_workspace/tmp)."""
|
||||
workspace_root = expand_path(conf().get("agent_workspace", "~/cow"))
|
||||
tmp_dir = os.path.join(workspace_root, "tmp")
|
||||
os.makedirs(tmp_dir, exist_ok=True)
|
||||
return tmp_dir
|
||||
@@ -177,10 +177,21 @@ class TelegramChannel(ChatChannel):
|
||||
application.add_handler(MessageHandler(filters.COMMAND, self._on_command_passthrough))
|
||||
|
||||
# Start polling. drop_pending_updates avoids replaying backlog after restart.
|
||||
# Transient "Server disconnected" / RemoteProtocolError during get_updates
|
||||
# are common over proxies/flaky networks; PTB's network loop auto-retries,
|
||||
# so we only need to keep the noise down (see _quiet_polling_network_errors).
|
||||
self._quiet_polling_network_errors()
|
||||
logger.info("[Telegram] Starting long polling...")
|
||||
await application.initialize()
|
||||
await application.start()
|
||||
await application.updater.start_polling(drop_pending_updates=True)
|
||||
await application.updater.start_polling(
|
||||
drop_pending_updates=True,
|
||||
# Long-poll hold time on the server side; smaller value = reconnect more
|
||||
# often but each hung connection fails faster.
|
||||
timeout=30,
|
||||
# Retry forever on transient get_updates network errors instead of giving up.
|
||||
bootstrap_retries=-1,
|
||||
)
|
||||
self.report_startup_success()
|
||||
logger.info("[Telegram] ✅ Telegram bot ready, polling for updates")
|
||||
|
||||
@@ -196,6 +207,38 @@ class TelegramChannel(ChatChannel):
|
||||
except Exception as e:
|
||||
logger.warning(f"[Telegram] shutdown error: {e}")
|
||||
|
||||
@staticmethod
|
||||
def _quiet_polling_network_errors():
|
||||
"""Downgrade PTB's noisy 'Exception happened while polling for updates' logs.
|
||||
|
||||
These transient get_updates errors (RemoteProtocolError / NetworkError /
|
||||
TimedOut, typically over a proxy) are auto-retried by PTB's network loop,
|
||||
so logging the full traceback at ERROR is just noise. We attach a filter
|
||||
that drops these specific records while leaving real errors untouched.
|
||||
"""
|
||||
import logging
|
||||
|
||||
class _PollingNoiseFilter(logging.Filter):
|
||||
_NEEDLES = (
|
||||
"Exception happened while polling for updates",
|
||||
"Server disconnected without sending a response",
|
||||
)
|
||||
|
||||
def filter(self, record: logging.LogRecord) -> bool:
|
||||
try:
|
||||
msg = record.getMessage()
|
||||
except Exception:
|
||||
return True
|
||||
if any(n in msg for n in self._NEEDLES):
|
||||
# Keep a single-line breadcrumb at DEBUG, drop the traceback.
|
||||
logger.debug(f"[Telegram] transient polling network error (auto-retrying): {msg.splitlines()[0]}")
|
||||
return False
|
||||
return True
|
||||
|
||||
noise_filter = _PollingNoiseFilter()
|
||||
for name in ("telegram.ext.Updater", "telegram.ext._updater", "telegram.ext"):
|
||||
logging.getLogger(name).addFilter(noise_filter)
|
||||
|
||||
def stop(self):
|
||||
logger.info("[Telegram] stop() called")
|
||||
self._stop_event.set()
|
||||
|
||||
@@ -1315,7 +1315,13 @@ class FileServeHandler:
|
||||
file_path = params.path
|
||||
if not file_path or not os.path.isabs(file_path):
|
||||
raise web.notfound()
|
||||
file_path = os.path.normpath(file_path)
|
||||
# Resolve symlinks and confine access to the configured root dir,
|
||||
# so this endpoint can't be abused to read arbitrary files (e.g. /etc/passwd, ~/.ssh).
|
||||
# Defaults to the user home dir; set web_file_serve_root="/" to allow the whole filesystem.
|
||||
file_path = os.path.realpath(file_path)
|
||||
serve_root = os.path.realpath(os.path.expanduser(conf().get("web_file_serve_root", "~") or "~"))
|
||||
if serve_root != os.sep and os.path.commonpath([file_path, serve_root]) != serve_root:
|
||||
raise web.notfound()
|
||||
if not os.path.isfile(file_path):
|
||||
raise web.notfound()
|
||||
content_type = mimetypes.guess_type(file_path)[0] or "application/octet-stream"
|
||||
@@ -2917,6 +2923,15 @@ class ChannelsHandler:
|
||||
{"key": "telegram_token", "label": "Bot Token", "type": "secret"},
|
||||
],
|
||||
}),
|
||||
("slack", {
|
||||
"label": {"zh": "Slack", "en": "Slack"},
|
||||
"icon": "fa-hashtag",
|
||||
"color": "purple",
|
||||
"fields": [
|
||||
{"key": "slack_bot_token", "label": "Bot Token (xoxb-)", "type": "secret"},
|
||||
{"key": "slack_app_token", "label": "App Token (xapp-)", "type": "secret"},
|
||||
],
|
||||
}),
|
||||
])
|
||||
|
||||
@staticmethod
|
||||
|
||||
Reference in New Issue
Block a user