diff --git a/README.md b/README.md
index 8b9e044e..70b2707f 100644
--- a/README.md
+++ b/README.md
@@ -30,7 +30,7 @@ CowAgent is lightweight, easy to deploy, and built to extend. Plug in any major
| [Knowledge](https://docs.cowagent.ai/en/knowledge/index) | Auto-curates structured knowledge into a Markdown wiki, builds an evolving knowledge graph with visual browsing |
| [Skills](https://docs.cowagent.ai/en/skills/index) | One-click install from [Skill Hub](https://skills.cowagent.ai/), GitHub, ClawHub; or create custom skills via natural-language conversation |
| [Tools](https://docs.cowagent.ai/en/tools/index) | Built-in file I/O, terminal, browser, scheduler, memory retrieval, web search, and 10+ more tools — with native MCP integration |
-| [Channels](https://docs.cowagent.ai/en/channels/index) | Integrates with Web, WeChat, Feishu, DingTalk, WeCom, QQ, and Official Accounts |
+| [Channels](https://docs.cowagent.ai/en/channels/index) | Integrates with Web, WeChat, Feishu, DingTalk, WeCom, QQ, Official Accounts, and Telegram |
| Multimodal | First-class support for text, images, voice, and files — recognition, generation, and delivery |
| [Models](https://docs.cowagent.ai/en/models/index) | Claude, GPT, Gemini, DeepSeek, Qwen, GLM, Kimi, MiniMax, Doubao, and more — swap providers from the Web console with one click |
| [Deploy](https://docs.cowagent.ai/en/guide/quick-start) | One-line installer, unified Web console, multiple deployment modes (local, Docker, server) |
@@ -126,6 +126,7 @@ A single Agent instance can serve multiple channels in parallel. Most channels c
| [QQ](https://docs.cowagent.ai/en/channels/qq) | ✅ | ✅ | ✅ | | ✅ |
| [WeCom App](https://docs.cowagent.ai/en/channels/wecom) | ✅ | ✅ | ✅ | ✅ | |
| [WeChat Official Account](https://docs.cowagent.ai/en/channels/wechatmp) | ✅ | ✅ | | ✅ | |
+| [Telegram](https://docs.cowagent.ai/en/channels/telegram) | ✅ | ✅ | ✅ | ✅ | ✅ |
> See the [Channels overview](https://docs.cowagent.ai/en/channels/index) for setup details.
diff --git a/channel/channel_factory.py b/channel/channel_factory.py
index cf6bfea8..fd8cf0e8 100644
--- a/channel/channel_factory.py
+++ b/channel/channel_factory.py
@@ -39,6 +39,9 @@ def create_channel(channel_type) -> Channel:
elif channel_type == const.QQ:
from channel.qq.qq_channel import QQChannel
ch = QQChannel()
+ elif channel_type == const.TELEGRAM:
+ from channel.telegram.telegram_channel import TelegramChannel
+ ch = TelegramChannel()
elif channel_type in (const.WEIXIN, "wx"):
from channel.weixin.weixin_channel import WeixinChannel
ch = WeixinChannel()
diff --git a/channel/telegram/__init__.py b/channel/telegram/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/channel/telegram/telegram_channel.py b/channel/telegram/telegram_channel.py
new file mode 100644
index 00000000..b3b46589
--- /dev/null
+++ b/channel/telegram/telegram_channel.py
@@ -0,0 +1,676 @@
+"""
+Telegram channel via Bot API (long polling mode).
+
+Features:
+- Single chat & group chat (text / photo / voice / video / document)
+- Group trigger: @mention or reply-to-bot (configurable)
+- /cancel fast-path matches Web channel behaviour
+- Auto-register bot commands menu on startup (mirrors Web slash menu)
+- Optional HTTP/SOCKS5 proxy support for restricted networks
+
+Implementation note:
+ python-telegram-bot is async-first. We run the bot inside a dedicated
+ thread with its own asyncio loop so the rest of cow (which is sync)
+ stays untouched. Inbound updates are dispatched onto cow's existing
+ sync ChatChannel.produce() pipeline; outbound send() schedules
+ coroutines back onto that loop via asyncio.run_coroutine_threadsafe.
+"""
+
+import asyncio
+import os
+import re
+import threading
+
+from bridge.context import Context, ContextType
+from bridge.reply import Reply, ReplyType
+from channel.chat_channel import ChatChannel, check_prefix
+from channel.telegram.telegram_message import TelegramMessage
+from common.expired_dict import ExpiredDict
+from common.log import logger
+from common.singleton import singleton
+from config import conf
+
+# Bot command menu, aligned with Web slash commands.
+# Top-level commands only; sub-commands are entered with a space (e.g. "/skill list").
+TELEGRAM_BOT_COMMANDS = [
+ ("help", "Show command help"),
+ ("status", "Show running status"),
+ ("context", "View/clear conversation context (sub: clear)"),
+ ("skill", "Manage skills (list/search/install/...)"),
+ ("memory", "Manage memory (sub: dream)"),
+ ("knowledge", "Manage knowledge base (list/on/off)"),
+ ("config", "Show current config"),
+ ("cancel", "Cancel running agent task"),
+ ("logs", "Show recent logs"),
+ ("version", "Show version"),
+]
+
+
+@singleton
+class TelegramChannel(ChatChannel):
+ NOT_SUPPORT_REPLYTYPE = []
+
+ def __init__(self):
+ super().__init__()
+ self.bot_token = ""
+ self.bot_username = "" # used for @-mention matching
+ self._bot = None
+ self._application = None
+ self._loop = None
+ self._loop_thread = None
+ self._stop_event = threading.Event()
+ # Idempotent dedup; TG occasionally redelivers the same update on flaky networks
+ self._received_msgs = ExpiredDict(60 * 60 * 1)
+
+ # Disable group whitelist / prefix checks (we handle triggering ourselves
+ # in _should_reply_in_group), aligned with feishu / wecom_bot channels.
+ conf()["group_name_white_list"] = ["ALL_GROUP"]
+ conf()["single_chat_prefix"] = [""]
+
+ # ------------------------------------------------------------------
+ # Lifecycle
+ # ------------------------------------------------------------------
+
+ def startup(self):
+ self.bot_token = conf().get("telegram_token", "")
+ if not self.bot_token:
+ err = "[Telegram] telegram_token is required"
+ logger.error(err)
+ self.report_startup_error(err)
+ return
+
+ try:
+ from telegram.ext import (
+ Application,
+ MessageHandler,
+ CommandHandler,
+ filters,
+ )
+ except ImportError:
+ err = (
+ "[Telegram] python-telegram-bot is not installed. "
+ "Run: pip install python-telegram-bot"
+ )
+ logger.error(err)
+ self.report_startup_error(err)
+ return
+
+ # Run the asyncio event loop in a dedicated thread so the sync cow body
+ # is untouched.
+ self._loop = asyncio.new_event_loop()
+
+ def _run_loop():
+ asyncio.set_event_loop(self._loop)
+ try:
+ self._loop.run_until_complete(self._async_main(Application, MessageHandler, CommandHandler, filters))
+ except Exception as e:
+ logger.error(f"[Telegram] event loop crashed: {e}", exc_info=True)
+ self.report_startup_error(str(e))
+ finally:
+ try:
+ self._loop.close()
+ except Exception:
+ pass
+ logger.info("[Telegram] event loop exited")
+
+ self._loop_thread = threading.Thread(target=_run_loop, daemon=True, name="telegram-loop")
+ self._loop_thread.start()
+ # Block startup() until the loop thread exits, matching other channels'
+ # behaviour (startup is a blocking call).
+ self._loop_thread.join()
+
+ async def _async_main(self, Application, MessageHandler, CommandHandler, filters):
+ """Build Application, register handlers, and run polling."""
+ builder = Application.builder().token(self.bot_token)
+
+ # Proxy: prefer telegram_proxy config, fall back to HTTPS_PROXY env var
+ proxy_url = conf().get("telegram_proxy", "") or os.environ.get("HTTPS_PROXY", "")
+ if proxy_url:
+ try:
+ builder = builder.proxy(proxy_url).get_updates_proxy(proxy_url)
+ logger.info(f"[Telegram] using proxy: {proxy_url}")
+ except Exception as e:
+ logger.warning(f"[Telegram] proxy config failed, fallback to direct: {e}")
+
+ # Media uploads (photo/voice/video/document) over a proxy can be slow,
+ # bump read/write/connect/pool timeouts.
+ builder = (
+ builder
+ .read_timeout(60)
+ .write_timeout(120)
+ .connect_timeout(30)
+ .pool_timeout(30)
+ )
+
+ application = builder.build()
+ self._application = application
+ self._bot = application.bot
+
+ # Fetch our own username (needed for @-mention matching in groups)
+ try:
+ me = await self._bot.get_me()
+ self.bot_username = me.username or ""
+ self.name = self.bot_username # ChatChannel uses self.name to strip @-mention
+ logger.info(f"[Telegram] Bot logged in as @{self.bot_username} (id={me.id})")
+ except Exception as e:
+ err = f"[Telegram] get_me failed: {e}"
+ logger.error(err)
+ self.report_startup_error(err)
+ return
+
+ # Register the command menu (failure is non-fatal)
+ if conf().get("telegram_register_commands", True):
+ try:
+ from telegram import BotCommand
+ cmds = [BotCommand(name, desc) for name, desc in TELEGRAM_BOT_COMMANDS]
+ await self._bot.set_my_commands(cmds)
+ logger.info(f"[Telegram] Registered {len(cmds)} bot commands")
+ except Exception as e:
+ logger.warning(f"[Telegram] set_my_commands failed: {e}")
+
+ # Handlers:
+ # 1) /cancel uses the fast-path
+ application.add_handler(CommandHandler("cancel", self._on_cancel))
+ # 2) Normal messages (text + media)
+ application.add_handler(MessageHandler(filters.ALL & ~filters.COMMAND, self._on_message))
+ # 3) Other slash commands are forwarded as plain text for the agent to handle
+ application.add_handler(MessageHandler(filters.COMMAND, self._on_command_passthrough))
+
+ # Start polling. drop_pending_updates avoids replaying backlog after restart.
+ logger.info("[Telegram] Starting long polling...")
+ await application.initialize()
+ await application.start()
+ await application.updater.start_polling(drop_pending_updates=True)
+ self.report_startup_success()
+ logger.info("[Telegram] ✅ Telegram bot ready, polling for updates")
+
+ # Block until stop()
+ try:
+ while not self._stop_event.is_set():
+ await asyncio.sleep(0.5)
+ finally:
+ try:
+ await application.updater.stop()
+ await application.stop()
+ await application.shutdown()
+ except Exception as e:
+ logger.warning(f"[Telegram] shutdown error: {e}")
+
+ def stop(self):
+ logger.info("[Telegram] stop() called")
+ self._stop_event.set()
+ if self._loop_thread and self._loop_thread.is_alive():
+ try:
+ self._loop_thread.join(timeout=10)
+ except Exception:
+ pass
+ logger.info("[Telegram] stop() completed")
+
+ # ------------------------------------------------------------------
+ # Inbound: telegram update -> ChatMessage -> ChatChannel.produce
+ # ------------------------------------------------------------------
+
+ async def _on_cancel(self, update, _context):
+ """Fast-path: /cancel calls cancel_session directly without going through agent."""
+ try:
+ from agent.protocol import get_cancel_registry
+ session_id = self._compute_session_id(update)
+ cancelled = get_cancel_registry().cancel_session(session_id)
+ text = "Current task cancelled." if cancelled else "No running task to cancel."
+ await update.effective_message.reply_text(text)
+ logger.info(f"[Telegram] /cancel session={session_id}, cancelled={cancelled}")
+ except Exception as e:
+ logger.error(f"[Telegram] /cancel error: {e}", exc_info=True)
+ try:
+ await update.effective_message.reply_text(f"⚠️ /cancel failed: {e}")
+ except Exception:
+ pass
+
+ async def _on_command_passthrough(self, update, _context):
+ """All non-/cancel commands fall through to plain message handling."""
+ await self._on_message(update, _context)
+
+ async def _on_message(self, update, _context):
+ """Telegram update entry: parse message -> build ChatMessage -> produce()."""
+ try:
+ message = update.effective_message
+ chat = update.effective_chat
+ if not message or not chat:
+ return
+
+ # Idempotent dedup
+ msg_uid = f"{chat.id}:{message.message_id}"
+ if self._received_msgs.get(msg_uid):
+ return
+ self._received_msgs[msg_uid] = True
+
+ is_group = chat.type in ("group", "supergroup")
+
+ # Debug log: helpful when group messages are silently dropped
+ if is_group:
+ logger.debug(
+ f"[Telegram] group update received: chat_id={chat.id}, "
+ f"text={(message.text or message.caption or '')[:40]!r}, "
+ f"reply_to_bot={bool(message.reply_to_message and message.reply_to_message.from_user and message.reply_to_message.from_user.username == self.bot_username)}"
+ )
+
+ # Group trigger gate (silently drop if not triggered)
+ if is_group and not self._should_reply_in_group(update):
+ logger.debug(f"[Telegram] group message not triggered (need @{self.bot_username} or reply), skip")
+ return
+
+ # Parse message type + download media if needed.
+ # Media messages with caption return both the local path and the caption text.
+ ctype, content, caption = await self._parse_message(message)
+ if ctype is None:
+ logger.debug(f"[Telegram] unsupported message type, skip. msg={message}")
+ return
+
+ # Strip @bot mention for group text/caption
+ if is_group and self.bot_username:
+ if ctype == ContextType.TEXT and content:
+ content = self._strip_at_mention(content)
+ if caption:
+ caption = self._strip_at_mention(caption)
+
+ tg_msg = TelegramMessage(
+ update,
+ is_group=is_group,
+ bot_username=self.bot_username,
+ ctype=ctype,
+ content=content,
+ )
+ tg_msg.is_at = is_group # If we got here in a group, the bot is mentioned/replied
+
+ # File cache: standalone media goes into cache, the next text query attaches them
+ from channel.file_cache import get_file_cache
+ file_cache = get_file_cache()
+ session_id = self._compute_session_id(update)
+
+ # Media + caption together: treat as a complete query and bypass the cache
+ if ctype in (ContextType.IMAGE, ContextType.FILE) and caption:
+ tag = "image" if ctype == ContextType.IMAGE else "file"
+ merged_text = f"{caption}\n[{tag}: {content}]"
+ tg_msg.ctype = ContextType.TEXT
+ tg_msg.content = merged_text
+ ctype = ContextType.TEXT
+ logger.info(f"[Telegram] Media+caption merged for session {session_id}")
+ # fallthrough to the TEXT branch below
+
+ elif ctype == ContextType.IMAGE:
+ file_cache.add(session_id, content, file_type="image")
+ logger.info(f"[Telegram] Image cached for session {session_id}, waiting for query...")
+ return
+ elif ctype == ContextType.FILE:
+ file_cache.add(session_id, content, file_type="file")
+ logger.info(f"[Telegram] File cached for session {session_id}: {content}")
+ return
+
+ if ctype == ContextType.TEXT:
+ cached_files = file_cache.get(session_id)
+ if cached_files:
+ refs = []
+ for fi in cached_files:
+ ftype = fi["type"]
+ tag = ftype if ftype in ("image", "video") else "file"
+ refs.append(f"[{tag}: {fi['path']}]")
+ tg_msg.content = (tg_msg.content or "") + "\n" + "\n".join(refs)
+ file_cache.clear(session_id)
+ logger.info(f"[Telegram] Attached {len(cached_files)} cached file(s) to query")
+
+ # Dispatch to cow main pipeline (reuses ChatChannel._compose_context routing)
+ context = self._compose_context(
+ tg_msg.ctype,
+ tg_msg.content,
+ isgroup=is_group,
+ msg=tg_msg,
+ )
+ if context:
+ context["session_id"] = session_id
+ context["receiver"] = str(chat.id)
+ context["telegram_chat_id"] = chat.id
+ context["telegram_reply_to_msg_id"] = message.message_id if is_group else None
+ self.produce(context)
+ logger.debug(f"[Telegram] received: type={ctype}, content={str(tg_msg.content)[:80]}")
+
+ except Exception as e:
+ logger.error(f"[Telegram] _on_message error: {e}", exc_info=True)
+
+ async def _parse_message(self, message):
+ """Parse a telegram message and return (ctype, content, caption).
+
+ - content is text for ContextType.TEXT, otherwise the local file path
+ - caption is the optional text accompanying a media message; empty for plain text
+ """
+ caption = (message.caption or "").strip()
+
+ if message.photo:
+ largest = message.photo[-1]
+ path = await self._download_file(largest.file_id, suffix=".jpg")
+ return (ContextType.IMAGE, path, caption) if path else (None, None, "")
+
+ if message.voice or message.audio:
+ audio_obj = message.voice or message.audio
+ suffix = ".ogg" if message.voice else (
+ "." + (audio_obj.mime_type.split("/")[-1] if getattr(audio_obj, "mime_type", "") else "mp3")
+ )
+ path = await self._download_file(audio_obj.file_id, suffix=suffix)
+ return (ContextType.VOICE, path, caption) if path else (None, None, "")
+
+ if message.video or message.video_note:
+ video_obj = message.video or message.video_note
+ path = await self._download_file(video_obj.file_id, suffix=".mp4")
+ return (ContextType.FILE, path, caption) if path else (None, None, "")
+
+ if message.document:
+ doc = message.document
+ ext = ""
+ if doc.file_name and "." in doc.file_name:
+ ext = "." + doc.file_name.rsplit(".", 1)[-1]
+ path = await self._download_file(doc.file_id, suffix=ext, original_name=doc.file_name)
+ if not path:
+ return (None, None, "")
+ # Image-typed documents (user picked "send as file") are treated as images
+ mime = (doc.mime_type or "").lower()
+ if mime.startswith("image/"):
+ return (ContextType.IMAGE, path, caption)
+ return (ContextType.FILE, path, caption)
+
+ if message.text:
+ return (ContextType.TEXT, message.text.strip(), "")
+
+ return (None, None, "")
+
+ async def _download_file(self, file_id: str, suffix: str = "", original_name: str = ""):
+ """Download via bot.get_file into the local tmp dir; return path or None on failure."""
+ try:
+ f = await self._bot.get_file(file_id)
+ tmp_dir = TelegramMessage.get_tmp_dir()
+ base = original_name or f"{file_id}{suffix or ''}"
+ # Prefix with file_id to avoid name collisions / weird chars
+ safe_name = f"{file_id}_{base}" if original_name else base
+ local_path = os.path.join(tmp_dir, safe_name)
+ await f.download_to_drive(custom_path=local_path)
+ logger.debug(f"[Telegram] downloaded file_id={file_id} -> {local_path}")
+ return local_path
+ except Exception as e:
+ logger.error(f"[Telegram] download_file failed (file_id={file_id}): {e}")
+ return None
+
+ # ------------------------------------------------------------------
+ # Group trigger logic
+ # ------------------------------------------------------------------
+
+ def _should_reply_in_group(self, update) -> bool:
+ """Decide whether to reply to a group message based on configuration."""
+ mode = conf().get("telegram_group_trigger", "mention_or_reply")
+ if mode == "all":
+ return True
+
+ message = update.effective_message
+ if not message:
+ return False
+
+ # 1) Mentioned
+ if self.bot_username and self._is_mentioned(message, self.bot_username):
+ return True
+
+ # 2) Reply to a bot message
+ if mode == "mention_or_reply":
+ reply = message.reply_to_message
+ if reply and reply.from_user and reply.from_user.username == self.bot_username:
+ return True
+
+ return False
+
+ @staticmethod
+ def _is_mentioned(message, bot_username: str) -> bool:
+ """Check whether entities/caption_entities contain a @mention of the bot."""
+ bot_at = "@" + bot_username.lower()
+ text = (message.text or message.caption or "").lower()
+ if bot_at in text:
+ return True
+ # Also check entities strictly to support text_mention (no-username @)
+ for ent in (message.entities or []) + (message.caption_entities or []):
+ if ent.type == "mention":
+ src = message.text or message.caption or ""
+ if src[ent.offset: ent.offset + ent.length].lower() == bot_at:
+ return True
+ return False
+
+ def _strip_at_mention(self, content: str) -> str:
+ """Strip @bot_username from group text (case-insensitive)."""
+ if not content or not self.bot_username:
+ return content
+ pattern = re.compile(r"@" + re.escape(self.bot_username), re.IGNORECASE)
+ return pattern.sub("", content).strip()
+
+ @staticmethod
+ def _compute_session_id(update) -> str:
+ chat = update.effective_chat
+ user = update.effective_user
+ is_group = chat.type in ("group", "supergroup")
+ if is_group:
+ if conf().get("group_shared_session", True):
+ return f"tg_group_{chat.id}"
+ return f"tg_group_{chat.id}_{user.id}"
+ return f"tg_user_{user.id}"
+
+ # ------------------------------------------------------------------
+ # Override _compose_context: skip the parent's group whitelist/at checks
+ # (already handled in _on_message via _should_reply_in_group). Same idea
+ # as the feishu channel.
+ # ------------------------------------------------------------------
+
+ def _compose_context(self, ctype: ContextType, content, **kwargs):
+ context = Context(ctype, content)
+ context.kwargs = kwargs
+ if "channel_type" not in context:
+ context["channel_type"] = self.channel_type
+ if "origin_ctype" not in context:
+ context["origin_ctype"] = ctype
+
+ cmsg = context["msg"]
+ if cmsg.is_group:
+ if conf().get("group_shared_session", True):
+ context["session_id"] = cmsg.other_user_id
+ else:
+ context["session_id"] = f"{cmsg.from_user_id}:{cmsg.other_user_id}"
+ else:
+ context["session_id"] = cmsg.from_user_id
+ context["receiver"] = cmsg.other_user_id
+
+ if ctype == ContextType.TEXT:
+ img_match_prefix = check_prefix(content, conf().get("image_create_prefix"))
+ if img_match_prefix:
+ content = content.replace(img_match_prefix, "", 1)
+ context.type = ContextType.IMAGE_CREATE
+ else:
+ context.type = ContextType.TEXT
+ context.content = (content or "").strip()
+ if "desire_rtype" not in context and conf().get("always_reply_voice"):
+ context["desire_rtype"] = ReplyType.VOICE
+ elif ctype == ContextType.VOICE:
+ if "desire_rtype" not in context and (
+ conf().get("voice_reply_voice") or conf().get("always_reply_voice")
+ ):
+ context["desire_rtype"] = ReplyType.VOICE
+
+ return context
+
+ # ------------------------------------------------------------------
+ # Outbound: ChatChannel.send -> Telegram API
+ # ------------------------------------------------------------------
+
+ def send(self, reply: Reply, context: Context):
+ """Called from cow's sync main thread; we marshal the coroutine onto the loop thread."""
+ if self._loop is None or self._bot is None:
+ logger.warning("[Telegram] bot not ready, drop reply")
+ return
+
+ chat_id = context.get("telegram_chat_id")
+ reply_to = context.get("telegram_reply_to_msg_id")
+ if chat_id is None:
+ logger.warning("[Telegram] no telegram_chat_id in context, drop reply")
+ return
+
+ coro = self._async_send(reply, chat_id, reply_to)
+ try:
+ future = asyncio.run_coroutine_threadsafe(coro, self._loop)
+ # Media uploads through a proxy can be slow; let PTB's own timeouts win
+ future.result(timeout=180)
+ except Exception as e:
+ logger.error(f"[Telegram] send failed: {e}")
+
+ # Number of retries for transient network errors (proxy hiccups etc.)
+ _SEND_RETRIES = 2
+ _SEND_RETRY_BACKOFF = 2.0 # seconds
+
+ async def _send_with_retry(self, send_fn, *, label: str):
+ """Run a single Telegram API call with retries for transient network errors."""
+ from telegram.error import NetworkError, TimedOut
+ last_err = None
+ for attempt in range(self._SEND_RETRIES + 1):
+ try:
+ return await send_fn()
+ except (NetworkError, TimedOut) as e:
+ last_err = e
+ if attempt >= self._SEND_RETRIES:
+ break
+ wait = self._SEND_RETRY_BACKOFF * (attempt + 1)
+ logger.warning(
+ f"[Telegram] {label} transient error (attempt {attempt + 1}/"
+ f"{self._SEND_RETRIES + 1}): {e}; retry in {wait}s"
+ )
+ await asyncio.sleep(wait)
+ raise last_err
+
+ async def _async_send(self, reply: Reply, chat_id, reply_to_msg_id):
+ try:
+ rtype = reply.type
+ content = reply.content
+
+ if rtype == ReplyType.TEXT or rtype == ReplyType.INFO or rtype == ReplyType.ERROR:
+ # Telegram caps a single text message at 4096 chars; auto-split
+ text = str(content) if content is not None else ""
+ if not text:
+ return
+ for chunk in _split_text(text, 4000):
+ await self._send_with_retry(
+ lambda c=chunk: self._bot.send_message(
+ chat_id=chat_id,
+ text=c,
+ reply_to_message_id=reply_to_msg_id,
+ # Avoid failing the whole send if reply_to was deleted
+ allow_sending_without_reply=True,
+ ),
+ label="send_message",
+ )
+
+ elif rtype == ReplyType.IMAGE:
+ # Already a local BytesIO; send it directly
+ content.seek(0)
+ await self._send_with_retry(
+ lambda: self._bot.send_photo(
+ chat_id=chat_id,
+ photo=content,
+ reply_to_message_id=reply_to_msg_id,
+ allow_sending_without_reply=True,
+ ),
+ label="send_photo",
+ )
+
+ elif rtype == ReplyType.IMAGE_URL:
+ url = str(content)
+ if url.startswith("file://"):
+ local = url[7:]
+ # Open inside the lambda so each retry gets a fresh stream
+ async def _send_local_photo():
+ with open(local, "rb") as f:
+ return await self._bot.send_photo(
+ chat_id=chat_id, photo=f,
+ reply_to_message_id=reply_to_msg_id,
+ allow_sending_without_reply=True,
+ )
+ await self._send_with_retry(_send_local_photo, label="send_photo(file)")
+ else:
+ await self._send_with_retry(
+ lambda: self._bot.send_photo(
+ chat_id=chat_id, photo=url,
+ reply_to_message_id=reply_to_msg_id,
+ allow_sending_without_reply=True,
+ ),
+ label="send_photo(url)",
+ )
+
+ elif rtype == ReplyType.VOICE:
+ local = content[7:] if isinstance(content, str) and content.startswith("file://") else content
+ async def _send_voice():
+ with open(local, "rb") as f:
+ return await self._bot.send_voice(
+ chat_id=chat_id, voice=f,
+ reply_to_message_id=reply_to_msg_id,
+ allow_sending_without_reply=True,
+ )
+ await self._send_with_retry(_send_voice, label="send_voice")
+
+ elif rtype == ReplyType.FILE:
+ # Videos go through send_video, everything else through send_document
+ local = content[7:] if isinstance(content, str) and content.startswith("file://") else content
+ # File replies may carry an accompanying text caption
+ caption = getattr(reply, "text_content", None) or None
+ is_video = isinstance(local, str) and local.lower().endswith(
+ (".mp4", ".mov", ".avi", ".mkv", ".webm")
+ )
+
+ async def _send_file():
+ with open(local, "rb") as f:
+ if is_video:
+ return await self._bot.send_video(
+ chat_id=chat_id, video=f, caption=caption,
+ reply_to_message_id=reply_to_msg_id,
+ allow_sending_without_reply=True,
+ )
+ return await self._bot.send_document(
+ chat_id=chat_id, document=f, caption=caption,
+ reply_to_message_id=reply_to_msg_id,
+ allow_sending_without_reply=True,
+ )
+ await self._send_with_retry(_send_file, label="send_video" if is_video else "send_document")
+
+ else:
+ # Fallback: send as plain text
+ await self._send_with_retry(
+ lambda: self._bot.send_message(
+ chat_id=chat_id, text=str(content),
+ reply_to_message_id=reply_to_msg_id,
+ allow_sending_without_reply=True,
+ ),
+ label="send_message(fallback)",
+ )
+
+ logger.info(f"[Telegram] sent reply (type={rtype}, chat_id={chat_id})")
+
+ except Exception as e:
+ logger.error(f"[Telegram] _async_send error: {e}", exc_info=True)
+
+
+def _split_text(text: str, limit: int):
+ """Split long text preferring line breaks to keep markdown structure intact."""
+ if len(text) <= limit:
+ yield text
+ return
+ buf = []
+ size = 0
+ for line in text.splitlines(keepends=True):
+ if size + len(line) > limit and buf:
+ yield "".join(buf)
+ buf, size = [], 0
+ # Hard-split single lines that exceed the limit
+ while len(line) > limit:
+ yield line[:limit]
+ line = line[limit:]
+ buf.append(line)
+ size += len(line)
+ if buf:
+ yield "".join(buf)
diff --git a/channel/telegram/telegram_message.py b/channel/telegram/telegram_message.py
new file mode 100644
index 00000000..c97c6059
--- /dev/null
+++ b/channel/telegram/telegram_message.py
@@ -0,0 +1,62 @@
+"""
+Telegram message adapter.
+
+Convert a python-telegram-bot Update into cow's unified ChatMessage.
+File downloads are NOT performed here; the channel layer triggers
+bot.get_file() on demand because it requires the async event loop.
+"""
+import os
+
+from bridge.context import ContextType
+from channel.chat_message import ChatMessage
+from common.utils import expand_path
+from config import conf
+
+
+class TelegramMessage(ChatMessage):
+ """Wrap a Telegram Update into the unified ChatMessage."""
+
+ def __init__(self, update, is_group: bool = False, bot_username: str = "",
+ ctype: ContextType = ContextType.TEXT, content: str = ""):
+ super().__init__(update)
+ message = update.effective_message
+ chat = update.effective_chat
+ user = update.effective_user
+
+ # Basic fields
+ self.msg_id = str(message.message_id) if message else ""
+ self.create_time = int(message.date.timestamp()) if message and message.date else 0
+ self.ctype = ctype
+ self.content = content
+
+ # Sender / chat info
+ from_user_id = str(user.id) if user else "unknown"
+ from_user_nick = (
+ user.full_name if user and user.full_name else (user.username if user else "unknown")
+ )
+ self.from_user_id = from_user_id
+ self.from_user_nickname = from_user_nick or from_user_id
+ self.to_user_id = bot_username or "telegram_bot"
+ self.to_user_nickname = bot_username or "telegram_bot"
+
+ self.is_group = is_group
+ if is_group:
+ # Group: other_user_id = group_id, actual_user_id = sender id
+ self.other_user_id = str(chat.id)
+ self.other_user_nickname = chat.title or str(chat.id)
+ self.actual_user_id = from_user_id
+ self.actual_user_nickname = self.from_user_nickname
+ else:
+ self.other_user_id = from_user_id
+ self.other_user_nickname = self.from_user_nickname
+
+ # Whether the bot was triggered by @-mention or reply (set by channel layer)
+ self.is_at = False
+
+ @staticmethod
+ def get_tmp_dir() -> str:
+ """Local download directory, aligned with other channels (agent_workspace/tmp)."""
+ workspace_root = expand_path(conf().get("agent_workspace", "~/cow"))
+ tmp_dir = os.path.join(workspace_root, "tmp")
+ os.makedirs(tmp_dir, exist_ok=True)
+ return tmp_dir
diff --git a/channel/web/web_channel.py b/channel/web/web_channel.py
index af4e241e..57891855 100644
--- a/channel/web/web_channel.py
+++ b/channel/web/web_channel.py
@@ -2909,6 +2909,14 @@ class ChannelsHandler:
{"key": "wechatmp_port", "label": "Port", "type": "number", "default": 8080},
],
}),
+ ("telegram", {
+ "label": {"zh": "Telegram", "en": "Telegram"},
+ "icon": "fa-paper-plane",
+ "color": "sky",
+ "fields": [
+ {"key": "telegram_token", "label": "Bot Token", "type": "secret"},
+ ],
+ }),
])
@staticmethod
diff --git a/common/const.py b/common/const.py
index 7addd6af..4daea457 100644
--- a/common/const.py
+++ b/common/const.py
@@ -243,3 +243,4 @@ DINGTALK = "dingtalk"
WECOM_BOT = "wecom_bot"
QQ = "qq"
WEIXIN = "weixin"
+TELEGRAM = "telegram"
diff --git a/config.py b/config.py
index 1d44dcc5..cf8f43fe 100644
--- a/config.py
+++ b/config.py
@@ -166,6 +166,11 @@ available_setting = {
# 企微智能机器人配置(长连接模式)
"wecom_bot_id": "", # 企微智能机器人BotID
"wecom_bot_secret": "", # 企微智能机器人长连接Secret
+ # Telegram 配置
+ "telegram_token": "", # 从 @BotFather 申请的 bot token
+ "telegram_proxy": "", # 可选的 HTTP/SOCKS5 代理,例如 http://127.0.0.1:7890 或 socks5://127.0.0.1:1080(留空则走系统环境变量)
+ "telegram_group_trigger": "mention_or_reply", # 群聊触发方式: mention_or_reply(@或回复触发,推荐) | mention_only(仅@) | all(所有消息)
+ "telegram_register_commands": True, # 启动时是否自动向 BotFather 注册命令菜单(与 web 端 slash 命令一致)
# 微信配置
"weixin_token": "", # 微信登录后获取的bot_token,留空则启动时自动扫码登录
"weixin_base_url": "https://ilinkai.weixin.qq.com", # Weixin ilink API base URL
@@ -174,7 +179,7 @@ available_setting = {
# chatgpt指令自定义触发词
"clear_memory_commands": ["#清除记忆"], # 重置会话指令,必须以#开头
# channel配置
- "channel_type": "", # 通道类型,支持多渠道同时运行。单个: "feishu",多个: "feishu, dingtalk" 或 ["feishu", "dingtalk"]。可选值: web,feishu,dingtalk,wecom_bot,weixin,wechatmp,wechatmp_service,wechatcom_app
+ "channel_type": "", # 通道类型,支持多渠道同时运行。单个: "feishu",多个: "feishu, dingtalk" 或 ["feishu", "dingtalk"]。可选值: web,feishu,dingtalk,wecom_bot,weixin,wechatmp,wechatmp_service,wechatcom_app,telegram
"web_console": True, # 是否自动启动Web控制台(默认启动)。设为False可禁用
"subscribe_msg": "", # 订阅消息, 支持: wechatmp, wechatmp_service, wechatcom_app
"debug": False, # 是否开启debug模式,开启后会打印更多日志
diff --git a/docs/channels/index.mdx b/docs/channels/index.mdx
index 9049a51d..5d3fffdc 100644
--- a/docs/channels/index.mdx
+++ b/docs/channels/index.mdx
@@ -19,6 +19,7 @@ CowAgent 支持接入多种聊天通道,启动时通过 `channel_type` 切换
| [QQ](/channels/qq) | ✅ | ✅ | ✅ | | ✅ |
| [企业微信应用](/channels/wecom) | ✅ | ✅ | ✅ | ✅ | |
| [公众号](/channels/wechatmp) | ✅ | ✅ | | ✅ | |
+| [Telegram](/channels/telegram) | ✅ | ✅ | ✅ | ✅ | ✅ |
- **图片 / 文件 / 语音**列表示通道支持收发对应消息类型,具体细节详见各通道文档
- **群聊**列指可识别并响应群消息
@@ -37,3 +38,4 @@ CowAgent 支持接入多种聊天通道,启动时通过 `channel_type` 切换
- [QQ](/channels/qq) — QQ 官方机器人开放平台
- [企业微信应用](/channels/wecom) — 企业微信自建应用接入
- [公众号](/channels/wechatmp) — 微信公众号(订阅号 / 服务号)
+- [Telegram](/channels/telegram) — 海外 IM,5 分钟接入,无需公网 IP
diff --git a/docs/channels/telegram.mdx b/docs/channels/telegram.mdx
new file mode 100644
index 00000000..d7ab7a44
--- /dev/null
+++ b/docs/channels/telegram.mdx
@@ -0,0 +1,112 @@
+---
+title: Telegram
+description: 将 CowAgent 接入 Telegram Bot
+---
+
+> 通过 Telegram Bot API 接入 CowAgent,支持单聊与群聊(@机器人 / 回复机器人触发),使用 Long Polling 模式无需公网 IP,开箱即用。
+
+
+## 一、接入步骤
+
+### 步骤一:通过 BotFather 创建 Bot
+
+1. 在 Telegram 中搜索并打开官方账号 [@BotFather](https://t.me/BotFather)。
+2. 发送 `/newbot` 命令,按提示输入:
+ - **Bot 名称**(显示名,可中文,例如 `My CowAgent Bot`)
+ - **Bot 用户名**(必须以 `bot` 结尾,例如 `my_cowagent_bot`)
+3. 创建成功后,BotFather 会返回一段 **HTTP API Token**(形如 `123456789:ABCdefGhIJKlmNoPQRsTUVwxyZ`),妥善保存。
+
+
🌐 官网 · @@ -30,7 +30,7 @@ CowAgent 轻量、易部署、可扩展,自由接入主流大模型,覆盖 | [知识库](https://docs.cowagent.ai/knowledge) | 自动整理结构化知识为 Markdown Wiki,构建持续增长的知识图谱,可视化浏览 | | [技能](https://docs.cowagent.ai/skills) | 从 [Skill Hub](https://skills.cowagent.ai/)、GitHub、ClawHub 等一键安装;也可通过对话创造自定义技能 | | [工具](https://docs.cowagent.ai/tools) | 内置文件读写、终端、浏览器、定时任务、记忆检索、联网搜索等 10+ 工具,支持 MCP 协议 | -| [通道](https://docs.cowagent.ai/channels) | 一个 Agent 同时接入 Web、微信、飞书、钉钉、企微、QQ、公众号 等多个渠道 | +| [通道](https://docs.cowagent.ai/channels) | 一个 Agent 同时接入 Web、微信、飞书、钉钉、企微、QQ、公众号、Telegram 等多个渠道 | | 多模态 | 文本、图片、语音、文件全消息类型支持,覆盖识别、生成、收发 | | [模型](https://docs.cowagent.ai/models) | DeepSeek、Claude、Gemini、GPT、GLM、Qwen、Kimi、MiniMax、Doubao 等主流厂商,配置一行切换 | | [部署](https://docs.cowagent.ai/guide/quick-start) | 一键脚本安装,Web 控制台统一管理;本地、Docker、服务器多种部署方式 | @@ -126,6 +126,7 @@ CowAgent 支持国内外主流厂商的大语言模型。**文本对话、图像 | [QQ](https://docs.cowagent.ai/channels/qq) | ✅ | ✅ | ✅ | | ✅ | | [企业微信应用](https://docs.cowagent.ai/channels/wecom) | ✅ | ✅ | ✅ | ✅ | | | [微信公众号](https://docs.cowagent.ai/channels/wechatmp) | ✅ | ✅ | | ✅ | | +| [Telegram](https://docs.cowagent.ai/channels/telegram) | ✅ | ✅ | ✅ | ✅ | ✅ | > 飞书、企微智能机器人支持在 Web 控制台内**扫码一键接入**,无需公网 IP。详见 [通道概览](https://docs.cowagent.ai/channels)。 diff --git a/requirements.txt b/requirements.txt index 706d9894..b6592d70 100644 --- a/requirements.txt +++ b/requirements.txt @@ -27,3 +27,5 @@ dingtalk_stream # wecom bot websocket mode websocket-client>=1.4.0 pycryptodome +# telegram bot (requires Python >= 3.9; skipped on older versions) +python-telegram-bot; python_version >= "3.9"