From 46945942e1df9b2808296600d47cb4ac0e84e379 Mon Sep 17 00:00:00 2001 From: zhayujie Date: Fri, 13 Feb 2026 12:38:52 +0800 Subject: [PATCH 1/4] feat: support channel start in sub thread --- app.py | 162 +++++++++++++++++++--- channel/channel.py | 6 + channel/dingtalk/dingtalk_channel.py | 21 ++- channel/feishu/feishu_channel.py | 25 +++- channel/web/web_channel.py | 24 +++- channel/wechatcom/wechatcomapp_channel.py | 24 +++- channel/wechatmp/wechatmp_channel.py | 19 ++- common/linkai_client.py | 144 ++++++++++++++++++- 8 files changed, 382 insertions(+), 43 deletions(-) diff --git a/app.py b/app.py index 022fdd70..97b58a1b 100644 --- a/app.py +++ b/app.py @@ -7,11 +7,152 @@ import time from channel import channel_factory from common import const -from config import load_config +from common.log import logger +from config import load_config, conf from plugins import * import threading +# Global channel manager for restart support +_channel_mgr = None + + +def get_channel_manager(): + return _channel_mgr + + +class ChannelManager: + """ + Manage the lifecycle of a channel, supporting restart from sub-threads. + The channel.startup() runs in a daemon thread so that the main thread + remains available and a new channel can be started at any time. + """ + + def __init__(self): + self._channel = None + self._channel_thread = None + self._lock = threading.Lock() + + @property + def channel(self): + return self._channel + + def start(self, channel_name: str, first_start: bool = False): + """ + Create and start a channel in a sub-thread. + If first_start is True, plugins and linkai client will also be initialized. + """ + with self._lock: + channel = channel_factory.create_channel(channel_name) + self._channel = channel + + if first_start: + if channel_name in ["wx", "wxy", "terminal", "wechatmp", "web", + "wechatmp_service", "wechatcom_app", "wework", + const.FEISHU, const.DINGTALK]: + PluginManager().load_plugins() + + if conf().get("use_linkai"): + try: + from common import linkai_client + threading.Thread(target=linkai_client.start, args=(channel, self), daemon=True).start() + except Exception as e: + pass + + # Run channel.startup() in a daemon thread so we can restart later + self._channel_thread = threading.Thread( + target=self._run_channel, args=(channel,), daemon=True + ) + self._channel_thread.start() + logger.info(f"[ChannelManager] Channel '{channel_name}' started in sub-thread") + + def _run_channel(self, channel): + try: + channel.startup() + except Exception as e: + logger.error(f"[ChannelManager] Channel startup error: {e}") + logger.exception(e) + + def stop(self): + """ + Stop the current channel. Since most channel startup() methods block + on an HTTP server or stream client, we stop by terminating the thread. + """ + with self._lock: + if self._channel is None: + return + channel_type = getattr(self._channel, 'channel_type', 'unknown') + logger.info(f"[ChannelManager] Stopping channel '{channel_type}'...") + + # Try graceful stop if channel implements it + try: + if hasattr(self._channel, 'stop'): + self._channel.stop() + except Exception as e: + logger.warning(f"[ChannelManager] Error during channel stop: {e}") + + self._channel = None + self._channel_thread = None + + def restart(self, new_channel_name: str): + """ + Restart the channel with a new channel type. + Can be called from any thread (e.g. linkai config callback). + """ + logger.info(f"[ChannelManager] Restarting channel to '{new_channel_name}'...") + self.stop() + + # Clear singleton cache so a fresh channel instance is created + _clear_singleton_cache(new_channel_name) + + time.sleep(1) # Brief pause to allow resources to release + self.start(new_channel_name, first_start=False) + logger.info(f"[ChannelManager] Channel restarted to '{new_channel_name}' successfully") + + +def _clear_singleton_cache(channel_name: str): + """ + Clear the singleton cache for the channel class so that + a new instance can be created with updated config. + """ + cls_map = { + "wx": "channel.wechat.wechat_channel.WechatChannel", + "wxy": "channel.wechat.wechaty_channel.WechatyChannel", + "wcf": "channel.wechat.wcf_channel.WechatfChannel", + "web": "channel.web.web_channel.WebChannel", + "wechatmp": "channel.wechatmp.wechatmp_channel.WechatMPChannel", + "wechatmp_service": "channel.wechatmp.wechatmp_channel.WechatMPChannel", + "wechatcom_app": "channel.wechatcom.wechatcomapp_channel.WechatComAppChannel", + "wework": "channel.wework.wework_channel.WeworkChannel", + const.FEISHU: "channel.feishu.feishu_channel.FeiShuChanel", + const.DINGTALK: "channel.dingtalk.dingtalk_channel.DingTalkChanel", + } + module_path = cls_map.get(channel_name) + if not module_path: + return + # The singleton decorator stores instances in a closure dict keyed by class. + # We need to find the actual class and clear it from the closure. + try: + parts = module_path.rsplit(".", 1) + module_name, class_name = parts[0], parts[1] + import importlib + module = importlib.import_module(module_name) + # The module-level name is the wrapper function from @singleton + wrapper = getattr(module, class_name, None) + if wrapper and hasattr(wrapper, '__closure__') and wrapper.__closure__: + for cell in wrapper.__closure__: + try: + cell_contents = cell.cell_contents + if isinstance(cell_contents, dict): + cell_contents.clear() + logger.debug(f"[ChannelManager] Cleared singleton cache for {class_name}") + break + except ValueError: + pass + except Exception as e: + logger.warning(f"[ChannelManager] Failed to clear singleton cache: {e}") + + def sigterm_handler_wrap(_signo): old_handler = signal.getsignal(_signo) @@ -25,22 +166,8 @@ def sigterm_handler_wrap(_signo): signal.signal(_signo, func) -def start_channel(channel_name: str): - channel = channel_factory.create_channel(channel_name) - if channel_name in ["wx", "wxy", "terminal", "wechatmp", "web", "wechatmp_service", "wechatcom_app", "wework", - const.FEISHU, const.DINGTALK]: - PluginManager().load_plugins() - - if conf().get("use_linkai"): - try: - from common import linkai_client - threading.Thread(target=linkai_client.start, args=(channel,)).start() - except Exception as e: - pass - channel.startup() - - def run(): + global _channel_mgr try: # load config load_config() @@ -58,7 +185,8 @@ def run(): if channel_name == "wxy": os.environ["WECHATY_LOG"] = "warn" - start_channel(channel_name) + _channel_mgr = ChannelManager() + _channel_mgr.start(channel_name, first_start=True) while True: time.sleep(1) diff --git a/channel/channel.py b/channel/channel.py index 08799c68..42d613f8 100644 --- a/channel/channel.py +++ b/channel/channel.py @@ -19,6 +19,12 @@ class Channel(object): """ raise NotImplementedError + def stop(self): + """ + stop channel gracefully, called before restart + """ + pass + def handle_text(self, msg): """ process received msg diff --git a/channel/dingtalk/dingtalk_channel.py b/channel/dingtalk/dingtalk_channel.py index 0094f562..4c41d3b7 100644 --- a/channel/dingtalk/dingtalk_channel.py +++ b/channel/dingtalk/dingtalk_channel.py @@ -90,13 +90,9 @@ class DingTalkChanel(ChatChannel, dingtalk_stream.ChatbotHandler): dingtalk_client_secret = conf().get('dingtalk_client_secret') def setup_logger(self): - logger = logging.getLogger() - handler = logging.StreamHandler() - handler.setFormatter( - logging.Formatter('%(asctime)s %(name)-8s %(levelname)-8s %(message)s [%(filename)s:%(lineno)d]')) - logger.addHandler(handler) - logger.setLevel(logging.INFO) - return logger + # Suppress verbose logs from dingtalk_stream SDK + logging.getLogger("dingtalk_stream").setLevel(logging.WARNING) + return logging.getLogger("DingTalk") def __init__(self): super().__init__() @@ -104,6 +100,7 @@ class DingTalkChanel(ChatChannel, dingtalk_stream.ChatbotHandler): self.logger = self.setup_logger() # 历史消息id暂存,用于幂等控制 self.receivedMsgs = ExpiredDict(conf().get("expires_in_seconds", 3600)) + self._stream_client = None logger.debug("[DingTalk] client_id={}, client_secret={} ".format( self.dingtalk_client_id, self.dingtalk_client_secret)) # 无需群校验和前缀 @@ -119,9 +116,19 @@ class DingTalkChanel(ChatChannel, dingtalk_stream.ChatbotHandler): def startup(self): credential = dingtalk_stream.Credential(self.dingtalk_client_id, self.dingtalk_client_secret) client = dingtalk_stream.DingTalkStreamClient(credential) + self._stream_client = client client.register_callback_handler(dingtalk_stream.chatbot.ChatbotMessage.TOPIC, self) logger.info("[DingTalk] ✅ Stream connected, ready to receive messages") client.start_forever() + + def stop(self): + if self._stream_client: + try: + self._stream_client.stop() + logger.info("[DingTalk] Stream client stopped") + except Exception as e: + logger.warning(f"[DingTalk] Error stopping stream client: {e}") + self._stream_client = None def get_access_token(self): """ diff --git a/channel/feishu/feishu_channel.py b/channel/feishu/feishu_channel.py index f2d6c4b6..e6a8ff7f 100644 --- a/channel/feishu/feishu_channel.py +++ b/channel/feishu/feishu_channel.py @@ -12,6 +12,7 @@ """ import json +import logging import os import ssl import threading @@ -32,6 +33,9 @@ from common.log import logger from common.singleton import singleton from config import conf +# Suppress verbose logs from Lark SDK +logging.getLogger("Lark").setLevel(logging.WARNING) + URL_VERIFICATION = "url_verification" # 尝试导入飞书SDK,如果未安装则websocket模式不可用 @@ -56,6 +60,7 @@ class FeiShuChanel(ChatChannel): super().__init__() # 历史消息id暂存,用于幂等控制 self.receivedMsgs = ExpiredDict(60 * 60 * 7.1) + self._http_server = None logger.debug("[FeiShu] app_id={}, app_secret={}, verification_token={}, event_mode={}".format( self.feishu_app_id, self.feishu_app_secret, self.feishu_token, self.feishu_event_mode)) # 无需群校验和前缀 @@ -73,6 +78,15 @@ class FeiShuChanel(ChatChannel): else: self._startup_webhook() + def stop(self): + if self._http_server: + try: + self._http_server.stop() + logger.info("[FeiShu] HTTP server stopped") + except Exception as e: + logger.warning(f"[FeiShu] Error stopping HTTP server: {e}") + self._http_server = None + def _startup_webhook(self): """启动HTTP服务器接收事件(webhook模式)""" logger.debug("[FeiShu] Starting in webhook mode...") @@ -81,7 +95,14 @@ class FeiShuChanel(ChatChannel): ) app = web.application(urls, globals(), autoreload=False) port = conf().get("feishu_port", 9891) - web.httpserver.runsimple(app.wsgifunc(), ("0.0.0.0", port)) + func = web.httpserver.StaticMiddleware(app.wsgifunc()) + func = web.httpserver.LogMiddleware(func) + server = web.httpserver.WSGIServer(("0.0.0.0", port), func) + self._http_server = server + try: + server.start() + except (KeyboardInterrupt, SystemExit): + server.stop() def _startup_websocket(self): """启动长连接接收事件(websocket模式)""" @@ -138,7 +159,7 @@ class FeiShuChanel(ChatChannel): self.feishu_app_id, self.feishu_app_secret, event_handler=event_handler, - log_level=lark.LogLevel.DEBUG if conf().get("debug") else lark.LogLevel.INFO + log_level=lark.LogLevel.DEBUG if conf().get("debug") else lark.LogLevel.WARNING ) logger.debug("[FeiShu] Websocket client starting...") diff --git a/channel/web/web_channel.py b/channel/web/web_channel.py index 88c04fd8..43c245e0 100644 --- a/channel/web/web_channel.py +++ b/channel/web/web_channel.py @@ -50,6 +50,7 @@ class WebChannel(ChatChannel): self.msg_id_counter = 0 # 添加消息ID计数器 self.session_queues = {} # 存储session_id到队列的映射 self.request_to_session = {} # 存储request_id到session_id的映射 + self._http_server = None def _generate_msg_id(self): @@ -235,13 +236,24 @@ class WebChannel(ChatChannel): logging.getLogger("web").setLevel(logging.ERROR) logging.getLogger("web.httpserver").setLevel(logging.ERROR) - # 抑制 web.py 默认的服务器启动消息 - old_stdout = sys.stdout - sys.stdout = io.StringIO() + # Build WSGI app with middleware (same as runsimple but without print) + func = web.httpserver.StaticMiddleware(app.wsgifunc()) + func = web.httpserver.LogMiddleware(func) + server = web.httpserver.WSGIServer(("0.0.0.0", port), func) + self._http_server = server try: - web.httpserver.runsimple(app.wsgifunc(), ("0.0.0.0", port)) - finally: - sys.stdout = old_stdout + server.start() + except (KeyboardInterrupt, SystemExit): + server.stop() + + def stop(self): + if self._http_server: + try: + self._http_server.stop() + logger.info("[WebChannel] HTTP server stopped") + except Exception as e: + logger.warning(f"[WebChannel] Error stopping HTTP server: {e}") + self._http_server = None class RootHandler: diff --git a/channel/wechatcom/wechatcomapp_channel.py b/channel/wechatcom/wechatcomapp_channel.py index 262702ae..3321f541 100644 --- a/channel/wechatcom/wechatcomapp_channel.py +++ b/channel/wechatcom/wechatcomapp_channel.py @@ -36,6 +36,7 @@ class WechatComAppChannel(ChatChannel): self.agent_id = conf().get("wechatcomapp_agent_id") self.token = conf().get("wechatcomapp_token") self.aes_key = conf().get("wechatcomapp_aes_key") + self._http_server = None logger.info( "[wechatcom] Initializing WeCom app channel, corp_id: {}, agent_id: {}".format(self.corp_id, self.agent_id) ) @@ -51,13 +52,24 @@ class WechatComAppChannel(ChatChannel): logger.info("[wechatcom] 📡 Listening on http://0.0.0.0:{}/wxcomapp/".format(port)) logger.info("[wechatcom] 🤖 Ready to receive messages") - # Suppress web.py's default server startup message - old_stdout = sys.stdout - sys.stdout = io.StringIO() + # Build WSGI app with middleware (same as runsimple but without print) + func = web.httpserver.StaticMiddleware(app.wsgifunc()) + func = web.httpserver.LogMiddleware(func) + server = web.httpserver.WSGIServer(("0.0.0.0", port), func) + self._http_server = server try: - web.httpserver.runsimple(app.wsgifunc(), ("0.0.0.0", port)) - finally: - sys.stdout = old_stdout + server.start() + except (KeyboardInterrupt, SystemExit): + server.stop() + + def stop(self): + if self._http_server: + try: + self._http_server.stop() + logger.info("[wechatcom] HTTP server stopped") + except Exception as e: + logger.warning(f"[wechatcom] Error stopping HTTP server: {e}") + self._http_server = None def send(self, reply: Reply, context: Context): receiver = context["receiver"] diff --git a/channel/wechatmp/wechatmp_channel.py b/channel/wechatmp/wechatmp_channel.py index e0a07dda..c066f286 100644 --- a/channel/wechatmp/wechatmp_channel.py +++ b/channel/wechatmp/wechatmp_channel.py @@ -41,6 +41,7 @@ class WechatMPChannel(ChatChannel): super().__init__() self.passive_reply = passive_reply self.NOT_SUPPORT_REPLYTYPE = [] + self._http_server = None appid = conf().get("wechatmp_app_id") secret = conf().get("wechatmp_app_secret") token = conf().get("wechatmp_token") @@ -69,7 +70,23 @@ class WechatMPChannel(ChatChannel): urls = ("/wx", "channel.wechatmp.active_reply.Query") app = web.application(urls, globals(), autoreload=False) port = conf().get("wechatmp_port", 8080) - web.httpserver.runsimple(app.wsgifunc(), ("0.0.0.0", port)) + func = web.httpserver.StaticMiddleware(app.wsgifunc()) + func = web.httpserver.LogMiddleware(func) + server = web.httpserver.WSGIServer(("0.0.0.0", port), func) + self._http_server = server + try: + server.start() + except (KeyboardInterrupt, SystemExit): + server.stop() + + def stop(self): + if self._http_server: + try: + self._http_server.stop() + logger.info("[wechatmp] HTTP server stopped") + except Exception as e: + logger.warning(f"[wechatmp] Error stopping HTTP server: {e}") + self._http_server = None def start_loop(self, loop): asyncio.set_event_loop(loop) diff --git a/common/linkai_client.py b/common/linkai_client.py index 16c7049a..7ee5e8aa 100644 --- a/common/linkai_client.py +++ b/common/linkai_client.py @@ -2,9 +2,12 @@ from bridge.context import Context, ContextType from bridge.reply import Reply, ReplyType from common.log import logger from linkai import LinkAIClient, PushMsg -from config import conf, pconf, plugin_config, available_setting, write_plugin_config +from config import conf, pconf, plugin_config, available_setting, write_plugin_config, get_root from plugins import PluginManager +import threading import time +import json +import os chat_client: LinkAIClient @@ -15,6 +18,7 @@ class ChatClient(LinkAIClient): super().__init__(api_key, host) self.channel = channel self.client_type = channel.channel_type + self.channel_mgr = None def on_message(self, push_msg: PushMsg): session_id = push_msg.session_id @@ -34,9 +38,12 @@ class ChatClient(LinkAIClient): return local_config = conf() + need_restart_channel = False + for key in config.keys(): if key in available_setting and config.get(key) is not None: local_config[key] = config.get(key) + # 语音配置 reply_voice_mode = config.get("reply_voice_mode") if reply_voice_mode: @@ -50,6 +57,55 @@ class ChatClient(LinkAIClient): local_config["always_reply_voice"] = False local_config["voice_reply_voice"] = False + # Model configuration + if config.get("model"): + local_config["model"] = config.get("model") + + # Channel configuration + if config.get("channelType"): + if local_config.get("channel_type") != config.get("channelType"): + local_config["channel_type"] = config.get("channelType") + need_restart_channel = True + + # Channel-specific app credentials + current_channel_type = local_config.get("channel_type", "") + + if config.get("app_id") is not None: + if current_channel_type == "feishu": + if local_config.get("feishu_app_id") != config.get("app_id"): + local_config["feishu_app_id"] = config.get("app_id") + need_restart_channel = True + elif current_channel_type == "dingtalk": + if local_config.get("dingtalk_client_id") != config.get("app_id"): + local_config["dingtalk_client_id"] = config.get("app_id") + need_restart_channel = True + elif current_channel_type == "wechatmp" or current_channel_type == "wechatmp_service": + if local_config.get("wechatmp_app_id") != config.get("app_id"): + local_config["wechatmp_app_id"] = config.get("app_id") + need_restart_channel = True + elif current_channel_type == "wechatcom_app": + if local_config.get("wechatcomapp_agent_id") != config.get("app_id"): + local_config["wechatcomapp_agent_id"] = config.get("app_id") + need_restart_channel = True + + if config.get("app_secret"): + if current_channel_type == "feishu": + if local_config.get("feishu_app_secret") != config.get("app_secret"): + local_config["feishu_app_secret"] = config.get("app_secret") + need_restart_channel = True + elif current_channel_type == "dingtalk": + if local_config.get("dingtalk_client_secret") != config.get("app_secret"): + local_config["dingtalk_client_secret"] = config.get("app_secret") + need_restart_channel = True + elif current_channel_type == "wechatmp" or current_channel_type == "wechatmp_service": + if local_config.get("wechatmp_app_secret") != config.get("app_secret"): + local_config["wechatmp_app_secret"] = config.get("app_secret") + need_restart_channel = True + elif current_channel_type == "wechatcom_app": + if local_config.get("wechatcomapp_secret") != config.get("app_secret"): + local_config["wechatcomapp_secret"] = config.get("app_secret") + need_restart_channel = True + if config.get("admin_password"): if not pconf("Godcmd"): write_plugin_config({"Godcmd": {"password": config.get("admin_password"), "admin_users": []} }) @@ -71,11 +127,67 @@ class ChatClient(LinkAIClient): elif config.get("text_to_image") and config.get("text_to_image") in ["dall-e-2", "dall-e-3"]: if pconf("linkai")["midjourney"]: pconf("linkai")["midjourney"]["use_image_create_prefix"] = False + + # Save configuration to config.json file + self._save_config_to_file(local_config) + + if need_restart_channel: + self._restart_channel(local_config.get("channel_type", "")) + + def _restart_channel(self, new_channel_type: str): + """ + Restart the channel via ChannelManager when channel type changes. + """ + if self.channel_mgr: + logger.info(f"[LinkAI] Restarting channel to '{new_channel_type}'...") + threading.Thread(target=self._do_restart_channel, args=(self.channel_mgr, new_channel_type), daemon=True).start() + else: + logger.warning("[LinkAI] ChannelManager not available, please restart the application manually") + + def _do_restart_channel(self, mgr, new_channel_type: str): + """ + Perform the channel restart in a separate thread to avoid blocking the config callback. + """ + try: + mgr.restart(new_channel_type) + # Update the linkai client's channel reference + if mgr.channel: + self.channel = mgr.channel + self.client_type = mgr.channel.channel_type + logger.info(f"[LinkAI] Channel reference updated to '{new_channel_type}'") + except Exception as e: + logger.error(f"[LinkAI] Channel restart failed: {e}") + + def _save_config_to_file(self, local_config: dict): + """ + Save configuration to config.json file + """ + try: + config_path = os.path.join(get_root(), "config.json") + if not os.path.exists(config_path): + logger.warning(f"[LinkAI] config.json not found at {config_path}, skip saving") + return + + # Read current config file + with open(config_path, "r", encoding="utf-8") as f: + file_config = json.load(f) + + # Update file config with memory config + file_config.update(dict(local_config)) + + # Write back to file + with open(config_path, "w", encoding="utf-8") as f: + json.dump(file_config, f, indent=4, ensure_ascii=False) + + logger.info("[LinkAI] Configuration saved to config.json successfully") + except Exception as e: + logger.error(f"[LinkAI] Failed to save configuration to config.json: {e}") -def start(channel): +def start(channel, channel_mgr=None): global chat_client - chat_client = ChatClient(api_key=conf().get("linkai_api_key"), host="", channel=channel) + chat_client = ChatClient(api_key=conf().get("linkai_api_key"), channel=channel) + chat_client.channel_mgr = channel_mgr chat_client.config = _build_config() chat_client.start() time.sleep(1.5) @@ -97,14 +209,38 @@ def _build_config(): "nick_name_black_list": local_conf.get("nick_name_black_list"), "speech_recognition": "Y" if local_conf.get("speech_recognition") else "N", "text_to_image": local_conf.get("text_to_image"), - "image_create_prefix": local_conf.get("image_create_prefix") + "image_create_prefix": local_conf.get("image_create_prefix"), + "model": local_conf.get("model"), + "agent_max_context_turns": local_conf.get("agent_max_context_turns"), + "agent_max_context_tokens": local_conf.get("agent_max_context_tokens"), + "agent_max_steps": local_conf.get("agent_max_steps"), + "channelType": local_conf.get("channel_type") } + if local_conf.get("always_reply_voice"): config["reply_voice_mode"] = "always_reply_voice" elif local_conf.get("voice_reply_voice"): config["reply_voice_mode"] = "voice_reply_voice" + if pconf("linkai"): config["group_app_map"] = pconf("linkai").get("group_app_map") + if plugin_config.get("Godcmd"): config["admin_password"] = plugin_config.get("Godcmd").get("password") + + # Add channel-specific app credentials + current_channel_type = local_conf.get("channel_type", "") + if current_channel_type == "feishu": + config["app_id"] = local_conf.get("feishu_app_id") + config["app_secret"] = local_conf.get("feishu_app_secret") + elif current_channel_type == "dingtalk": + config["app_id"] = local_conf.get("dingtalk_client_id") + config["app_secret"] = local_conf.get("dingtalk_client_secret") + elif current_channel_type == "wechatmp" or current_channel_type == "wechatmp_service": + config["app_id"] = local_conf.get("wechatmp_app_id") + config["app_secret"] = local_conf.get("wechatmp_app_secret") + elif current_channel_type == "wechatcom_app": + config["app_id"] = local_conf.get("wechatcomapp_agent_id") + config["app_secret"] = local_conf.get("wechatcomapp_secret") + return config From e1dc037eb9e8c7b5412f6cd569c032b4565b03bb Mon Sep 17 00:00:00 2001 From: zhayujie Date: Fri, 20 Feb 2026 23:23:04 +0800 Subject: [PATCH 2/4] feat: cloud skills manage --- agent/protocol/agent.py | 4 +- agent/skills/__init__.py | 2 + agent/skills/loader.py | 77 +++---- agent/skills/manager.py | 165 ++++++++++----- agent/skills/service.py | 204 +++++++++++++++++++ agent/skills/types.py | 2 +- app.py | 6 +- bridge/agent_bridge.py | 2 +- bridge/agent_initializer.py | 2 +- channel/wechat/wechat_channel.py | 8 +- common/{linkai_client.py => cloud_client.py} | 135 ++++++++---- 11 files changed, 463 insertions(+), 144 deletions(-) create mode 100644 agent/skills/service.py rename common/{linkai_client.py => cloud_client.py} (70%) diff --git a/agent/protocol/agent.py b/agent/protocol/agent.py index 8b1c2420..3f0167d3 100644 --- a/agent/protocol/agent.py +++ b/agent/protocol/agent.py @@ -1,4 +1,5 @@ import json +import os import time import threading @@ -61,7 +62,8 @@ class Agent: # Auto-create skill manager try: from agent.skills import SkillManager - self.skill_manager = SkillManager(workspace_dir=workspace_dir) + custom_dir = os.path.join(workspace_dir, "skills") if workspace_dir else None + self.skill_manager = SkillManager(custom_dir=custom_dir) logger.debug(f"Initialized SkillManager with {len(self.skill_manager.skills)} skills") except Exception as e: logger.warning(f"Failed to initialize SkillManager: {e}") diff --git a/agent/skills/__init__.py b/agent/skills/__init__.py index 9b2e26c2..c6345791 100644 --- a/agent/skills/__init__.py +++ b/agent/skills/__init__.py @@ -15,6 +15,7 @@ from agent.skills.types import ( ) from agent.skills.loader import SkillLoader from agent.skills.manager import SkillManager +from agent.skills.service import SkillService from agent.skills.formatter import format_skills_for_prompt __all__ = [ @@ -25,5 +26,6 @@ __all__ = [ "LoadSkillsResult", "SkillLoader", "SkillManager", + "SkillService", "format_skills_for_prompt", ] diff --git a/agent/skills/loader.py b/agent/skills/loader.py index a7fa5f9e..210ab733 100644 --- a/agent/skills/loader.py +++ b/agent/skills/loader.py @@ -12,25 +12,20 @@ from agent.skills.frontmatter import parse_frontmatter, parse_metadata, parse_bo class SkillLoader: """Loads skills from various directories.""" - - def __init__(self, workspace_dir: Optional[str] = None): - """ - Initialize the skill loader. - - :param workspace_dir: Agent workspace directory (for workspace-specific skills) - """ - self.workspace_dir = workspace_dir + + def __init__(self): + pass def load_skills_from_dir(self, dir_path: str, source: str) -> LoadSkillsResult: """ Load skills from a directory. - + Discovery rules: - Direct .md files in the root directory - Recursive SKILL.md files under subdirectories - + :param dir_path: Directory path to scan - :param source: Source identifier (e.g., 'managed', 'workspace', 'bundled') + :param source: Source identifier ('builtin' or 'custom') :return: LoadSkillsResult with skills and diagnostics """ skills = [] @@ -216,61 +211,49 @@ class SkillLoader: def load_all_skills( self, - managed_dir: Optional[str] = None, - workspace_skills_dir: Optional[str] = None, - extra_dirs: Optional[List[str]] = None, + builtin_dir: Optional[str] = None, + custom_dir: Optional[str] = None, ) -> Dict[str, SkillEntry]: """ - Load skills from all configured locations with precedence. - + Load skills from builtin and custom directories. + Precedence (lowest to highest): - 1. Extra directories - 2. Managed skills directory - 3. Workspace skills directory - - :param managed_dir: Managed skills directory (e.g., ~/.cow/skills) - :param workspace_skills_dir: Workspace skills directory (e.g., workspace/skills) - :param extra_dirs: Additional directories to load skills from + 1. builtin — project root ``skills/``, shipped with the codebase + 2. custom — workspace ``skills/``, installed via cloud console or skill creator + + Same-name custom skills override builtin ones. + + :param builtin_dir: Built-in skills directory + :param custom_dir: Custom skills directory :return: Dictionary mapping skill name to SkillEntry """ skill_map: Dict[str, SkillEntry] = {} all_diagnostics = [] - - # Load from extra directories (lowest precedence) - if extra_dirs: - for extra_dir in extra_dirs: - if not os.path.exists(extra_dir): - continue - result = self.load_skills_from_dir(extra_dir, source='extra') - all_diagnostics.extend(result.diagnostics) - for skill in result.skills: - entry = self._create_skill_entry(skill) - skill_map[skill.name] = entry - - # Load from managed directory - if managed_dir and os.path.exists(managed_dir): - result = self.load_skills_from_dir(managed_dir, source='managed') + + # Load builtin skills (lower precedence) + if builtin_dir and os.path.exists(builtin_dir): + result = self.load_skills_from_dir(builtin_dir, source='builtin') all_diagnostics.extend(result.diagnostics) for skill in result.skills: entry = self._create_skill_entry(skill) skill_map[skill.name] = entry - - # Load from workspace directory (highest precedence) - if workspace_skills_dir and os.path.exists(workspace_skills_dir): - result = self.load_skills_from_dir(workspace_skills_dir, source='workspace') + + # Load custom skills (higher precedence, overrides builtin) + if custom_dir and os.path.exists(custom_dir): + result = self.load_skills_from_dir(custom_dir, source='custom') all_diagnostics.extend(result.diagnostics) for skill in result.skills: entry = self._create_skill_entry(skill) skill_map[skill.name] = entry - + # Log diagnostics if all_diagnostics: logger.debug(f"Skill loading diagnostics: {len(all_diagnostics)} issues") - for diag in all_diagnostics[:5]: # Log first 5 + for diag in all_diagnostics[:5]: logger.debug(f" - {diag}") - - logger.debug(f"Loaded {len(skill_map)} skills from all sources") - + + logger.debug(f"Loaded {len(skill_map)} skills total") + return skill_map def _create_skill_entry(self, skill: Skill) -> SkillEntry: diff --git a/agent/skills/manager.py b/agent/skills/manager.py index adb4b010..5cf57357 100644 --- a/agent/skills/manager.py +++ b/agent/skills/manager.py @@ -3,6 +3,7 @@ Skill manager for managing skill lifecycle and operations. """ import os +import json from typing import Dict, List, Optional from pathlib import Path from common.log import logger @@ -10,56 +11,131 @@ from agent.skills.types import Skill, SkillEntry, SkillSnapshot from agent.skills.loader import SkillLoader from agent.skills.formatter import format_skill_entries_for_prompt +SKILLS_CONFIG_FILE = "skills_config.json" + class SkillManager: """Manages skills for an agent.""" - + def __init__( self, - workspace_dir: Optional[str] = None, - managed_skills_dir: Optional[str] = None, - extra_dirs: Optional[List[str]] = None, + builtin_dir: Optional[str] = None, + custom_dir: Optional[str] = None, config: Optional[Dict] = None, ): """ Initialize the skill manager. - - :param workspace_dir: Agent workspace directory - :param managed_skills_dir: Managed skills directory (e.g., ~/.cow/skills) - :param extra_dirs: Additional skill directories + + :param builtin_dir: Built-in skills directory (project root ``skills/``) + :param custom_dir: Custom skills directory (workspace ``skills/``) :param config: Configuration dictionary """ - self.workspace_dir = workspace_dir - self.managed_skills_dir = managed_skills_dir or self._get_default_managed_dir() - self.extra_dirs = extra_dirs or [] + project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + self.builtin_dir = builtin_dir or os.path.join(project_root, 'skills') + self.custom_dir = custom_dir or os.path.join(project_root, 'workspace', 'skills') self.config = config or {} - - self.loader = SkillLoader(workspace_dir=workspace_dir) + self._skills_config_path = os.path.join(self.custom_dir, SKILLS_CONFIG_FILE) + + # skills_config: full skill metadata keyed by name + # { "web-fetch": {"name": ..., "description": ..., "source": ..., "enabled": true}, ... } + self.skills_config: Dict[str, dict] = {} + + self.loader = SkillLoader() self.skills: Dict[str, SkillEntry] = {} - + # Load skills on initialization self.refresh_skills() - - def _get_default_managed_dir(self) -> str: - """Get the default managed skills directory.""" - # Use project root skills directory as default - import os - project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) - return os.path.join(project_root, 'skills') - + def refresh_skills(self): - """Reload all skills from configured directories.""" - workspace_skills_dir = None - if self.workspace_dir: - workspace_skills_dir = os.path.join(self.workspace_dir, 'skills') - + """Reload all skills from builtin and custom directories, then sync config.""" self.skills = self.loader.load_all_skills( - managed_dir=self.managed_skills_dir, - workspace_skills_dir=workspace_skills_dir, - extra_dirs=self.extra_dirs, + builtin_dir=self.builtin_dir, + custom_dir=self.custom_dir, ) - + self._sync_skills_config() logger.debug(f"SkillManager: Loaded {len(self.skills)} skills") + + # ------------------------------------------------------------------ + # skills_config.json management + # ------------------------------------------------------------------ + def _load_skills_config(self) -> Dict[str, dict]: + """Load skills_config.json from custom_dir. Returns empty dict if not found.""" + if not os.path.exists(self._skills_config_path): + return {} + try: + with open(self._skills_config_path, "r", encoding="utf-8") as f: + data = json.load(f) + if isinstance(data, dict): + return data + except Exception as e: + logger.warning(f"[SkillManager] Failed to load {SKILLS_CONFIG_FILE}: {e}") + return {} + + def _save_skills_config(self): + """Persist skills_config to custom_dir/skills_config.json.""" + os.makedirs(self.custom_dir, exist_ok=True) + try: + with open(self._skills_config_path, "w", encoding="utf-8") as f: + json.dump(self.skills_config, f, indent=4, ensure_ascii=False) + except Exception as e: + logger.error(f"[SkillManager] Failed to save {SKILLS_CONFIG_FILE}: {e}") + + def _sync_skills_config(self): + """ + Merge directory-scanned skills with the persisted config file. + + - New skills discovered on disk are added with enabled=True. + - Skills that no longer exist on disk are removed. + - Existing entries preserve their enabled state; name/description/source + are refreshed from the latest scan. + """ + saved = self._load_skills_config() + merged: Dict[str, dict] = {} + + for name, entry in self.skills.items(): + skill = entry.skill + prev = saved.get(name, {}) + merged[name] = { + "name": name, + "description": skill.description, + "source": skill.source, + "enabled": prev.get("enabled", True), + } + + self.skills_config = merged + self._save_skills_config() + + def is_skill_enabled(self, name: str) -> bool: + """ + Check if a skill is enabled according to skills_config. + + :param name: skill name + :return: True if enabled (default True if not in config) + """ + entry = self.skills_config.get(name) + if entry is None: + return True + return entry.get("enabled", True) + + def set_skill_enabled(self, name: str, enabled: bool): + """ + Set a skill's enabled state and persist. + + :param name: skill name + :param enabled: True to enable, False to disable + """ + if name not in self.skills_config: + raise ValueError(f"skill '{name}' not found in config") + self.skills_config[name]["enabled"] = enabled + self._save_skills_config() + + def get_skills_config(self) -> Dict[str, dict]: + """ + Return the full skills_config dict (for query API). + + :return: copy of skills_config + """ + return dict(self.skills_config) def get_skill(self, name: str) -> Optional[SkillEntry]: """ @@ -85,25 +161,24 @@ class SkillManager: ) -> List[SkillEntry]: """ Filter skills based on criteria. - + Simple rule: Skills are auto-enabled if requirements are met. - - Has required API keys → included - - Missing API keys → excluded - + - Has required API keys -> included + - Missing API keys -> excluded + :param skill_filter: List of skill names to include (None = all) - :param include_disabled: Whether to include skills with disable_model_invocation=True + :param include_disabled: Whether to include disabled skills :return: Filtered list of skill entries """ from agent.skills.config import should_include_skill - + entries = list(self.skills.values()) - + # Check requirements (platform, binaries, env vars) entries = [e for e in entries if should_include_skill(e, self.config)] - + # Apply skill filter if skill_filter is not None: - # Flatten and normalize skill names (handle both strings and nested lists) normalized = [] for item in skill_filter: if isinstance(item, str): @@ -111,20 +186,18 @@ class SkillManager: if name: normalized.append(name) elif isinstance(item, list): - # Handle nested lists for subitem in item: if isinstance(subitem, str): name = subitem.strip() if name: normalized.append(name) - if normalized: entries = [e for e in entries if e.skill.name in normalized] - - # Filter out disabled skills unless explicitly requested + + # Filter out disabled skills based on skills_config.json if not include_disabled: - entries = [e for e in entries if not e.skill.disable_model_invocation] - + entries = [e for e in entries if self.is_skill_enabled(e.skill.name)] + return entries def build_skills_prompt( diff --git a/agent/skills/service.py b/agent/skills/service.py new file mode 100644 index 00000000..8aa6ca07 --- /dev/null +++ b/agent/skills/service.py @@ -0,0 +1,204 @@ +""" +Skill service for handling skill CRUD operations. + +This service provides a unified interface for managing skills, which can be +called from the cloud control client (LinkAI), the local web console, or any +other management entry point. +""" + +import os +import shutil +from typing import Dict, List, Optional +from common.log import logger +from agent.skills.types import Skill, SkillEntry +from agent.skills.manager import SkillManager + +try: + import requests +except ImportError: + requests = None + + +class SkillService: + """ + High-level service for skill lifecycle management. + Wraps SkillManager and provides network-aware operations such as + downloading skill files from remote URLs. + """ + + def __init__(self, skill_manager: SkillManager): + """ + :param skill_manager: The SkillManager instance to operate on + """ + self.manager = skill_manager + + # ------------------------------------------------------------------ + # query + # ------------------------------------------------------------------ + def query(self) -> List[dict]: + """ + Query all skills and return a serialisable list. + Reads from skills_config.json (refreshes from disk if needed). + + :return: list of skill info dicts + """ + self.manager.refresh_skills() + config = self.manager.get_skills_config() + result = list(config.values()) + logger.info(f"[SkillService] query: {len(result)} skills found") + return result + + # ------------------------------------------------------------------ + # add / install + # ------------------------------------------------------------------ + def add(self, payload: dict) -> None: + """ + Add (install) a skill from a remote payload. + + The payload follows the socket protocol:: + + { + "name": "web_search", + "type": "url", + "enabled": true, + "files": [ + {"url": "https://...", "path": "README.md"}, + {"url": "https://...", "path": "scripts/main.py"} + ] + } + + Files are downloaded and saved under the custom skills directory + using *name* as the sub-directory. + + :param payload: skill add payload from server + """ + name = payload.get("name") + if not name: + raise ValueError("skill name is required") + + files = payload.get("files", []) + if not files: + raise ValueError("skill files list is empty") + + skill_dir = os.path.join(self.manager.custom_dir, name) + os.makedirs(skill_dir, exist_ok=True) + + for file_info in files: + url = file_info.get("url") + rel_path = file_info.get("path") + if not url or not rel_path: + logger.warning(f"[SkillService] add: skip invalid file entry {file_info}") + continue + dest = os.path.join(skill_dir, rel_path) + self._download_file(url, dest) + + # Reload to pick up the new skill and sync config + self.manager.refresh_skills() + logger.info(f"[SkillService] add: skill '{name}' installed ({len(files)} files)") + + # ------------------------------------------------------------------ + # open / close (enable / disable) + # ------------------------------------------------------------------ + def open(self, payload: dict) -> None: + """ + Enable a skill by name. + + :param payload: {"name": "skill_name"} + """ + name = payload.get("name") + if not name: + raise ValueError("skill name is required") + self.manager.set_skill_enabled(name, enabled=True) + logger.info(f"[SkillService] open: skill '{name}' enabled") + + def close(self, payload: dict) -> None: + """ + Disable a skill by name. + + :param payload: {"name": "skill_name"} + """ + name = payload.get("name") + if not name: + raise ValueError("skill name is required") + self.manager.set_skill_enabled(name, enabled=False) + logger.info(f"[SkillService] close: skill '{name}' disabled") + + # ------------------------------------------------------------------ + # delete + # ------------------------------------------------------------------ + def delete(self, payload: dict) -> None: + """ + Delete a skill by removing its directory entirely. + + :param payload: {"name": "skill_name"} + """ + name = payload.get("name") + if not name: + raise ValueError("skill name is required") + + skill_dir = os.path.join(self.manager.custom_dir, name) + if os.path.exists(skill_dir): + shutil.rmtree(skill_dir) + logger.info(f"[SkillService] delete: removed directory {skill_dir}") + else: + logger.warning(f"[SkillService] delete: skill directory not found: {skill_dir}") + + # Refresh will remove the deleted skill from config automatically + self.manager.refresh_skills() + logger.info(f"[SkillService] delete: skill '{name}' deleted") + + # ------------------------------------------------------------------ + # dispatch - single entry point for protocol messages + # ------------------------------------------------------------------ + def dispatch(self, action: str, payload: Optional[dict] = None) -> dict: + """ + Dispatch a skill management action and return a protocol-compatible + response dict. + + :param action: one of query / add / open / close / delete + :param payload: action-specific payload (may be None for query) + :return: dict with action, code, message, payload + """ + payload = payload or {} + try: + if action == "query": + result_payload = self.query() + return {"action": action, "code": 200, "message": "success", "payload": result_payload} + elif action == "add": + self.add(payload) + elif action == "open": + self.open(payload) + elif action == "close": + self.close(payload) + elif action == "delete": + self.delete(payload) + else: + return {"action": action, "code": 400, "message": f"unknown action: {action}", "payload": None} + return {"action": action, "code": 200, "message": "success", "payload": None} + except Exception as e: + logger.error(f"[SkillService] dispatch error: action={action}, error={e}") + return {"action": action, "code": 500, "message": str(e), "payload": None} + + # ------------------------------------------------------------------ + # internal helpers + # ------------------------------------------------------------------ + @staticmethod + def _download_file(url: str, dest: str): + """ + Download a file from *url* and save to *dest*. + + :param url: remote file URL + :param dest: local destination path + """ + if requests is None: + raise RuntimeError("requests library is required for downloading skill files") + + dest_dir = os.path.dirname(dest) + if dest_dir: + os.makedirs(dest_dir, exist_ok=True) + + resp = requests.get(url, timeout=60) + resp.raise_for_status() + with open(dest, "wb") as f: + f.write(resp.content) + logger.debug(f"[SkillService] downloaded {url} -> {dest}") diff --git a/agent/skills/types.py b/agent/skills/types.py index e44189cb..1b27479b 100644 --- a/agent/skills/types.py +++ b/agent/skills/types.py @@ -45,7 +45,7 @@ class Skill: description: str file_path: str base_dir: str - source: str # managed, workspace, bundled, etc. + source: str # builtin or custom content: str # Full markdown content disable_model_invocation: bool = False frontmatter: Dict[str, Any] = field(default_factory=dict) diff --git a/app.py b/app.py index 97b58a1b..cc4dd334 100644 --- a/app.py +++ b/app.py @@ -54,8 +54,8 @@ class ChannelManager: if conf().get("use_linkai"): try: - from common import linkai_client - threading.Thread(target=linkai_client.start, args=(channel, self), daemon=True).start() + from common import cloud_client + threading.Thread(target=cloud_client.start, args=(channel, self), daemon=True).start() except Exception as e: pass @@ -64,7 +64,7 @@ class ChannelManager: target=self._run_channel, args=(channel,), daemon=True ) self._channel_thread.start() - logger.info(f"[ChannelManager] Channel '{channel_name}' started in sub-thread") + logger.debug(f"[ChannelManager] Channel '{channel_name}' started in sub-thread") def _run_channel(self, channel): try: diff --git a/bridge/agent_bridge.py b/bridge/agent_bridge.py index 001c4c05..6c465689 100644 --- a/bridge/agent_bridge.py +++ b/bridge/agent_bridge.py @@ -28,7 +28,7 @@ def add_openai_compatible_support(bot_instance): """ if hasattr(bot_instance, 'call_with_tools'): # Bot already has tool calling support (e.g., ZHIPUAIBot) - logger.info(f"[AgentBridge] {type(bot_instance).__name__} already has native tool calling support") + logger.debug(f"[AgentBridge] {type(bot_instance).__name__} already has native tool calling support") return bot_instance # Create a temporary mixin class that combines the bot with OpenAI compatibility diff --git a/bridge/agent_initializer.py b/bridge/agent_initializer.py index 00ef2730..b9aae389 100644 --- a/bridge/agent_initializer.py +++ b/bridge/agent_initializer.py @@ -291,7 +291,7 @@ class AgentInitializer: """Initialize skill manager""" try: from agent.skills import SkillManager - skill_manager = SkillManager(workspace_dir=workspace_root) + skill_manager = SkillManager(custom_dir=os.path.join(workspace_root, "skills")) return skill_manager except Exception as e: logger.warning(f"[AgentInitializer] Failed to initialize SkillManager: {e}") diff --git a/channel/wechat/wechat_channel.py b/channel/wechat/wechat_channel.py index 394a9f6c..19db71d5 100644 --- a/channel/wechat/wechat_channel.py +++ b/channel/wechat/wechat_channel.py @@ -151,7 +151,7 @@ class WechatChannel(ChatChannel): def exitCallback(self): try: - from common.linkai_client import chat_client + from common.cloud_client import chat_client if chat_client.client_id and conf().get("use_linkai"): _send_logout() time.sleep(2) @@ -283,7 +283,7 @@ class WechatChannel(ChatChannel): def _send_login_success(): try: - from common.linkai_client import chat_client + from common.cloud_client import chat_client if chat_client.client_id: chat_client.send_login_success() except Exception as e: @@ -292,7 +292,7 @@ def _send_login_success(): def _send_logout(): try: - from common.linkai_client import chat_client + from common.cloud_client import chat_client if chat_client.client_id: chat_client.send_logout() except Exception as e: @@ -301,7 +301,7 @@ def _send_logout(): def _send_qr_code(qrcode_list: list): try: - from common.linkai_client import chat_client + from common.cloud_client import chat_client if chat_client.client_id: chat_client.send_qrcode(qrcode_list) except Exception as e: diff --git a/common/linkai_client.py b/common/cloud_client.py similarity index 70% rename from common/linkai_client.py rename to common/cloud_client.py index 7ee5e8aa..6840fe4a 100644 --- a/common/linkai_client.py +++ b/common/cloud_client.py @@ -1,3 +1,10 @@ +""" +Cloud management client for connecting to the LinkAI control console. + +Handles remote configuration sync, message push, and skill management +via the LinkAI socket protocol. +""" + from bridge.context import Context, ContextType from bridge.reply import Reply, ReplyType from common.log import logger @@ -13,13 +20,34 @@ import os chat_client: LinkAIClient -class ChatClient(LinkAIClient): - def __init__(self, api_key, host, channel): +class CloudClient(LinkAIClient): + def __init__(self, api_key: str, channel, host: str = ""): super().__init__(api_key, host) self.channel = channel self.client_type = channel.channel_type self.channel_mgr = None + self._skill_service = None + @property + def skill_service(self): + """Lazy-init SkillService so it is available once SkillManager exists.""" + if self._skill_service is None: + try: + from agent.skills.manager import SkillManager + from agent.skills.service import SkillService + from config import conf + from common.utils import expand_path + workspace_root = expand_path(conf().get("agent_workspace", "~/cow")) + manager = SkillManager(custom_dir=os.path.join(workspace_root, "skills")) + self._skill_service = SkillService(manager) + logger.debug("[CloudClient] SkillService initialised") + except Exception as e: + logger.error(f"[CloudClient] Failed to init SkillService: {e}") + return self._skill_service + + # ------------------------------------------------------------------ + # message push callback + # ------------------------------------------------------------------ def on_message(self, push_msg: PushMsg): session_id = push_msg.session_id msg_content = push_msg.msg_content @@ -30,21 +58,24 @@ class ChatClient(LinkAIClient): context["isgroup"] = push_msg.is_group self.channel.send(Reply(ReplyType.TEXT, content=msg_content), context) + # ------------------------------------------------------------------ + # config callback + # ------------------------------------------------------------------ def on_config(self, config: dict): if not self.client_id: return - logger.info(f"[LinkAI] 从客户端管理加载远程配置: {config}") + logger.info(f"[CloudClient] Loading remote config: {config}") if config.get("enabled") != "Y": return local_config = conf() need_restart_channel = False - + for key in config.keys(): if key in available_setting and config.get(key) is not None: local_config[key] = config.get(key) - - # 语音配置 + + # Voice settings reply_voice_mode = config.get("reply_voice_mode") if reply_voice_mode: if reply_voice_mode == "voice_reply_voice": @@ -60,16 +91,16 @@ class ChatClient(LinkAIClient): # Model configuration if config.get("model"): local_config["model"] = config.get("model") - + # Channel configuration if config.get("channelType"): if local_config.get("channel_type") != config.get("channelType"): local_config["channel_type"] = config.get("channelType") need_restart_channel = True - + # Channel-specific app credentials current_channel_type = local_config.get("channel_type", "") - + if config.get("app_id") is not None: if current_channel_type == "feishu": if local_config.get("feishu_app_id") != config.get("app_id"): @@ -79,7 +110,7 @@ class ChatClient(LinkAIClient): if local_config.get("dingtalk_client_id") != config.get("app_id"): local_config["dingtalk_client_id"] = config.get("app_id") need_restart_channel = True - elif current_channel_type == "wechatmp" or current_channel_type == "wechatmp_service": + elif current_channel_type in ("wechatmp", "wechatmp_service"): if local_config.get("wechatmp_app_id") != config.get("app_id"): local_config["wechatmp_app_id"] = config.get("app_id") need_restart_channel = True @@ -87,7 +118,7 @@ class ChatClient(LinkAIClient): if local_config.get("wechatcomapp_agent_id") != config.get("app_id"): local_config["wechatcomapp_agent_id"] = config.get("app_id") need_restart_channel = True - + if config.get("app_secret"): if current_channel_type == "feishu": if local_config.get("feishu_app_secret") != config.get("app_secret"): @@ -97,7 +128,7 @@ class ChatClient(LinkAIClient): if local_config.get("dingtalk_client_secret") != config.get("app_secret"): local_config["dingtalk_client_secret"] = config.get("app_secret") need_restart_channel = True - elif current_channel_type == "wechatmp" or current_channel_type == "wechatmp_service": + elif current_channel_type in ("wechatmp", "wechatmp_service"): if local_config.get("wechatmp_app_secret") != config.get("app_secret"): local_config["wechatmp_app_secret"] = config.get("app_secret") need_restart_channel = True @@ -108,7 +139,7 @@ class ChatClient(LinkAIClient): if config.get("admin_password"): if not pconf("Godcmd"): - write_plugin_config({"Godcmd": {"password": config.get("admin_password"), "admin_users": []} }) + write_plugin_config({"Godcmd": {"password": config.get("admin_password"), "admin_users": []}}) else: pconf("Godcmd")["password"] = config.get("admin_password") PluginManager().instances["GODCMD"].reload() @@ -127,22 +158,46 @@ class ChatClient(LinkAIClient): elif config.get("text_to_image") and config.get("text_to_image") in ["dall-e-2", "dall-e-3"]: if pconf("linkai")["midjourney"]: pconf("linkai")["midjourney"]["use_image_create_prefix"] = False - + # Save configuration to config.json file self._save_config_to_file(local_config) if need_restart_channel: self._restart_channel(local_config.get("channel_type", "")) - + + # ------------------------------------------------------------------ + # skill callback + # ------------------------------------------------------------------ + def on_skill(self, data: dict) -> dict: + """ + Handle SKILL messages from the cloud console. + Delegates to SkillService.dispatch for the actual operations. + + :param data: message data with 'action', 'clientId', 'payload' + :return: response dict + """ + action = data.get("action", "") + payload = data.get("payload") + logger.info(f"[CloudClient] on_skill: action={action}") + + svc = self.skill_service + if svc is None: + return {"action": action, "code": 500, "message": "SkillService not available", "payload": None} + + return svc.dispatch(action, payload) + + # ------------------------------------------------------------------ + # channel restart helpers + # ------------------------------------------------------------------ def _restart_channel(self, new_channel_type: str): """ Restart the channel via ChannelManager when channel type changes. """ if self.channel_mgr: - logger.info(f"[LinkAI] Restarting channel to '{new_channel_type}'...") + logger.info(f"[CloudClient] Restarting channel to '{new_channel_type}'...") threading.Thread(target=self._do_restart_channel, args=(self.channel_mgr, new_channel_type), daemon=True).start() else: - logger.warning("[LinkAI] ChannelManager not available, please restart the application manually") + logger.warning("[CloudClient] ChannelManager not available, please restart the application manually") def _do_restart_channel(self, mgr, new_channel_type: str): """ @@ -150,49 +205,49 @@ class ChatClient(LinkAIClient): """ try: mgr.restart(new_channel_type) - # Update the linkai client's channel reference + # Update the client's channel reference if mgr.channel: self.channel = mgr.channel self.client_type = mgr.channel.channel_type - logger.info(f"[LinkAI] Channel reference updated to '{new_channel_type}'") + logger.info(f"[CloudClient] Channel reference updated to '{new_channel_type}'") except Exception as e: - logger.error(f"[LinkAI] Channel restart failed: {e}") + logger.error(f"[CloudClient] Channel restart failed: {e}") + # ------------------------------------------------------------------ + # config persistence + # ------------------------------------------------------------------ def _save_config_to_file(self, local_config: dict): """ - Save configuration to config.json file + Save configuration to config.json file. """ try: config_path = os.path.join(get_root(), "config.json") if not os.path.exists(config_path): - logger.warning(f"[LinkAI] config.json not found at {config_path}, skip saving") + logger.warning(f"[CloudClient] config.json not found at {config_path}, skip saving") return - - # Read current config file + with open(config_path, "r", encoding="utf-8") as f: file_config = json.load(f) - - # Update file config with memory config + file_config.update(dict(local_config)) - - # Write back to file + with open(config_path, "w", encoding="utf-8") as f: json.dump(file_config, f, indent=4, ensure_ascii=False) - - logger.info("[LinkAI] Configuration saved to config.json successfully") + + logger.info("[CloudClient] Configuration saved to config.json successfully") except Exception as e: - logger.error(f"[LinkAI] Failed to save configuration to config.json: {e}") + logger.error(f"[CloudClient] Failed to save configuration to config.json: {e}") def start(channel, channel_mgr=None): global chat_client - chat_client = ChatClient(api_key=conf().get("linkai_api_key"), channel=channel) + chat_client = CloudClient(api_key=conf().get("linkai_api_key"), host=conf().get("cloud_host", ""), channel=channel) chat_client.channel_mgr = channel_mgr chat_client.config = _build_config() chat_client.start() time.sleep(1.5) if chat_client.client_id: - logger.info("[LinkAI] 可前往控制台进行线上登录和配置:https://link-ai.tech/console/clients") + logger.info("[CloudClient] Console: https://link-ai.tech/console/clients") def _build_config(): @@ -214,20 +269,20 @@ def _build_config(): "agent_max_context_turns": local_conf.get("agent_max_context_turns"), "agent_max_context_tokens": local_conf.get("agent_max_context_tokens"), "agent_max_steps": local_conf.get("agent_max_steps"), - "channelType": local_conf.get("channel_type") + "channelType": local_conf.get("channel_type"), } - + if local_conf.get("always_reply_voice"): config["reply_voice_mode"] = "always_reply_voice" elif local_conf.get("voice_reply_voice"): config["reply_voice_mode"] = "voice_reply_voice" - + if pconf("linkai"): config["group_app_map"] = pconf("linkai").get("group_app_map") - + if plugin_config.get("Godcmd"): config["admin_password"] = plugin_config.get("Godcmd").get("password") - + # Add channel-specific app credentials current_channel_type = local_conf.get("channel_type", "") if current_channel_type == "feishu": @@ -236,11 +291,11 @@ def _build_config(): elif current_channel_type == "dingtalk": config["app_id"] = local_conf.get("dingtalk_client_id") config["app_secret"] = local_conf.get("dingtalk_client_secret") - elif current_channel_type == "wechatmp" or current_channel_type == "wechatmp_service": + elif current_channel_type in ("wechatmp", "wechatmp_service"): config["app_id"] = local_conf.get("wechatmp_app_id") config["app_secret"] = local_conf.get("wechatmp_app_secret") elif current_channel_type == "wechatcom_app": config["app_id"] = local_conf.get("wechatcomapp_agent_id") config["app_secret"] = local_conf.get("wechatcomapp_secret") - + return config From 5e3eccb3f617a97c4e62f312a6508f12a3ee7848 Mon Sep 17 00:00:00 2001 From: zhayujie Date: Fri, 20 Feb 2026 23:44:05 +0800 Subject: [PATCH 3/4] feat: support memory service --- agent/memory/service.py | 167 ++++++++++++++++++++++++++++++++++++++++ common/cloud_client.py | 37 +++++++++ 2 files changed, 204 insertions(+) create mode 100644 agent/memory/service.py diff --git a/agent/memory/service.py b/agent/memory/service.py new file mode 100644 index 00000000..6456e296 --- /dev/null +++ b/agent/memory/service.py @@ -0,0 +1,167 @@ +""" +Memory service for handling memory query operations via cloud protocol. + +Provides a unified interface for listing and reading memory files, +callable from the cloud client (LinkAI) or a future web console. + +Memory file layout (under workspace_root): + MEMORY.md -> type: global + memory/2026-02-20.md -> type: daily +""" + +import os +from datetime import datetime +from typing import Dict, List, Optional +from pathlib import Path +from common.log import logger + + +class MemoryService: + """ + High-level service for memory file queries. + Operates directly on the filesystem — no MemoryManager dependency. + """ + + def __init__(self, workspace_root: str): + """ + :param workspace_root: Workspace root directory (e.g. ~/cow) + """ + self.workspace_root = workspace_root + self.memory_dir = os.path.join(workspace_root, "memory") + + # ------------------------------------------------------------------ + # list — paginated file metadata + # ------------------------------------------------------------------ + def list_files(self, page: int = 1, page_size: int = 20) -> dict: + """ + List all memory files with metadata (without content). + + Returns:: + + { + "page": 1, + "page_size": 20, + "total": 15, + "list": [ + {"filename": "MEMORY.md", "type": "global", "size": 2048, "updated_at": "2026-02-20 10:00:00"}, + {"filename": "2026-02-20.md", "type": "daily", "size": 512, "updated_at": "2026-02-20 09:30:00"}, + ... + ] + } + """ + files: List[dict] = [] + + # 1. Global memory — MEMORY.md in workspace root + global_path = os.path.join(self.workspace_root, "MEMORY.md") + if os.path.isfile(global_path): + files.append(self._file_info(global_path, "MEMORY.md", "global")) + + # 2. Daily memory files — memory/*.md (sorted newest first) + if os.path.isdir(self.memory_dir): + daily_files = [] + for name in os.listdir(self.memory_dir): + full = os.path.join(self.memory_dir, name) + if os.path.isfile(full) and name.endswith(".md"): + daily_files.append((name, full)) + # Sort by filename descending (newest date first) + daily_files.sort(key=lambda x: x[0], reverse=True) + for name, full in daily_files: + files.append(self._file_info(full, name, "daily")) + + total = len(files) + + # Paginate + start = (page - 1) * page_size + end = start + page_size + page_items = files[start:end] + + return { + "page": page, + "page_size": page_size, + "total": total, + "list": page_items, + } + + # ------------------------------------------------------------------ + # content — read a single file + # ------------------------------------------------------------------ + def get_content(self, filename: str) -> dict: + """ + Read the full content of a memory file. + + :param filename: File name, e.g. ``MEMORY.md`` or ``2026-02-20.md`` + :return: dict with ``filename`` and ``content`` + :raises FileNotFoundError: if the file does not exist + """ + path = self._resolve_path(filename) + if not os.path.isfile(path): + raise FileNotFoundError(f"Memory file not found: {filename}") + + with open(path, "r", encoding="utf-8") as f: + content = f.read() + + return { + "filename": filename, + "content": content, + } + + # ------------------------------------------------------------------ + # dispatch — single entry point for protocol messages + # ------------------------------------------------------------------ + def dispatch(self, action: str, payload: Optional[dict] = None) -> dict: + """ + Dispatch a memory management action. + + :param action: ``list`` or ``content`` + :param payload: action-specific payload + :return: protocol-compatible response dict + """ + payload = payload or {} + try: + if action == "list": + page = payload.get("page", 1) + page_size = payload.get("page_size", 20) + result_payload = self.list_files(page=page, page_size=page_size) + return {"action": action, "code": 200, "message": "success", "payload": result_payload} + + elif action == "content": + filename = payload.get("filename") + if not filename: + return {"action": action, "code": 400, "message": "filename is required", "payload": None} + result_payload = self.get_content(filename) + return {"action": action, "code": 200, "message": "success", "payload": result_payload} + + else: + return {"action": action, "code": 400, "message": f"unknown action: {action}", "payload": None} + + except FileNotFoundError as e: + return {"action": action, "code": 404, "message": str(e), "payload": None} + except Exception as e: + logger.error(f"[MemoryService] dispatch error: action={action}, error={e}") + return {"action": action, "code": 500, "message": str(e), "payload": None} + + # ------------------------------------------------------------------ + # internal helpers + # ------------------------------------------------------------------ + def _resolve_path(self, filename: str) -> str: + """ + Resolve a filename to its absolute path. + + - ``MEMORY.md`` → ``{workspace_root}/MEMORY.md`` + - ``2026-02-20.md`` → ``{workspace_root}/memory/2026-02-20.md`` + """ + if filename == "MEMORY.md": + return os.path.join(self.workspace_root, filename) + return os.path.join(self.memory_dir, filename) + + @staticmethod + def _file_info(path: str, filename: str, file_type: str) -> dict: + """Build a file metadata dict.""" + stat = os.stat(path) + updated_at = datetime.fromtimestamp(stat.st_mtime).strftime("%Y-%m-%d %H:%M:%S") + return { + "filename": filename, + "type": file_type, + "size": stat.st_size, + "updated_at": updated_at, + } diff --git a/common/cloud_client.py b/common/cloud_client.py index 6840fe4a..c3dd1b83 100644 --- a/common/cloud_client.py +++ b/common/cloud_client.py @@ -27,6 +27,7 @@ class CloudClient(LinkAIClient): self.client_type = channel.channel_type self.channel_mgr = None self._skill_service = None + self._memory_service = None @property def skill_service(self): @@ -45,6 +46,21 @@ class CloudClient(LinkAIClient): logger.error(f"[CloudClient] Failed to init SkillService: {e}") return self._skill_service + @property + def memory_service(self): + """Lazy-init MemoryService.""" + if self._memory_service is None: + try: + from agent.memory.service import MemoryService + from config import conf + from common.utils import expand_path + workspace_root = expand_path(conf().get("agent_workspace", "~/cow")) + self._memory_service = MemoryService(workspace_root) + logger.debug("[CloudClient] MemoryService initialised") + except Exception as e: + logger.error(f"[CloudClient] Failed to init MemoryService: {e}") + return self._memory_service + # ------------------------------------------------------------------ # message push callback # ------------------------------------------------------------------ @@ -186,6 +202,27 @@ class CloudClient(LinkAIClient): return svc.dispatch(action, payload) + # ------------------------------------------------------------------ + # memory callback + # ------------------------------------------------------------------ + def on_memory(self, data: dict) -> dict: + """ + Handle MEMORY messages from the cloud console. + Delegates to MemoryService.dispatch for the actual operations. + + :param data: message data with 'action', 'clientId', 'payload' + :return: response dict + """ + action = data.get("action", "") + payload = data.get("payload") + logger.info(f"[CloudClient] on_memory: action={action}") + + svc = self.memory_service + if svc is None: + return {"action": action, "code": 500, "message": "MemoryService not available", "payload": None} + + return svc.dispatch(action, payload) + # ------------------------------------------------------------------ # channel restart helpers # ------------------------------------------------------------------ From 1c71c4e38bfab060f60bf2bfca0c4b9227bf51df Mon Sep 17 00:00:00 2001 From: zhayujie Date: Sat, 21 Feb 2026 00:39:36 +0800 Subject: [PATCH 4/4] feat: agent chat service --- agent/chat/__init__.py | 3 + agent/chat/service.py | 168 ++++++++++++++++++++++++++++++++++ bridge/agent_event_handler.py | 2 +- common/cloud_client.py | 37 ++++++++ 4 files changed, 209 insertions(+), 1 deletion(-) create mode 100644 agent/chat/__init__.py create mode 100644 agent/chat/service.py diff --git a/agent/chat/__init__.py b/agent/chat/__init__.py new file mode 100644 index 00000000..69a90a9d --- /dev/null +++ b/agent/chat/__init__.py @@ -0,0 +1,3 @@ +from agent.chat.service import ChatService + +__all__ = ["ChatService"] diff --git a/agent/chat/service.py b/agent/chat/service.py new file mode 100644 index 00000000..23a121a7 --- /dev/null +++ b/agent/chat/service.py @@ -0,0 +1,168 @@ +""" +ChatService - Wraps the Agent stream execution to produce CHAT protocol chunks. + +Translates agent events (message_update, message_end, tool_execution_end, etc.) +into the CHAT socket protocol format (content chunks with segment_id, tool_calls chunks). +""" + +import time +from typing import Callable, Optional + +from common.log import logger + + +class ChatService: + """ + High-level service that runs an Agent for a given query and streams + the results as CHAT protocol chunks via a callback. + + Usage: + svc = ChatService(agent_bridge) + svc.run(query, session_id, send_chunk_fn) + """ + + def __init__(self, agent_bridge): + """ + :param agent_bridge: AgentBridge instance (manages agent lifecycle) + """ + self.agent_bridge = agent_bridge + + def run(self, query: str, session_id: str, send_chunk_fn: Callable[[dict], None]): + """ + Run the agent for *query* and stream results back via *send_chunk_fn*. + + The method blocks until the agent finishes. After it returns the SDK + will automatically send the final (streaming=false) message. + + :param query: user query text + :param session_id: session identifier for agent isolation + :param send_chunk_fn: callable(chunk_data: dict) to send a streaming chunk + """ + agent = self.agent_bridge.get_agent(session_id=session_id) + if agent is None: + raise RuntimeError("Failed to initialise agent for the session") + + # State shared between the event callback and this method + state = _StreamState() + + def on_event(event: dict): + """Translate agent events into CHAT protocol chunks.""" + event_type = event.get("type") + data = event.get("data", {}) + + if event_type == "message_update": + # Incremental text delta + delta = data.get("delta", "") + if delta: + send_chunk_fn({ + "chunk_type": "content", + "delta": delta, + "segment_id": state.segment_id, + }) + + elif event_type == "message_end": + # A content segment finished. + tool_calls = data.get("tool_calls", []) + if tool_calls: + # After tool_calls are executed the next content will be + # a new segment; collect tool results until turn_end. + state.pending_tool_results = [] + + elif event_type == "tool_execution_end": + tool_name = data.get("tool_name", "") + arguments = data.get("arguments", {}) + result = data.get("result", "") + status = data.get("status", "unknown") + execution_time = data.get("execution_time", 0) + elapsed_str = f"{execution_time:.2f}s" + + # Serialise result to string if needed + if not isinstance(result, str): + import json + try: + result = json.dumps(result, ensure_ascii=False) + except Exception: + result = str(result) + + tool_info = { + "name": tool_name, + "arguments": arguments, + "result": result, + "status": status, + "elapsed": elapsed_str, + } + + if state.pending_tool_results is not None: + state.pending_tool_results.append(tool_info) + + elif event_type == "turn_end": + has_tool_calls = data.get("has_tool_calls", False) + if has_tool_calls and state.pending_tool_results: + # Flush collected tool results as a single tool_calls chunk + send_chunk_fn({ + "chunk_type": "tool_calls", + "tool_calls": state.pending_tool_results, + }) + state.pending_tool_results = None + # Next content belongs to a new segment + state.segment_id += 1 + + # Run the agent with our event callback --------------------------- + logger.info(f"[ChatService] Starting agent run: session={session_id}, query={query[:80]}") + + from config import conf + max_context_turns = conf().get("agent_max_context_turns", 30) + + # Get full system prompt with skills + full_system_prompt = agent.get_full_system_prompt() + + # Create a copy of messages for this execution + with agent.messages_lock: + messages_copy = agent.messages.copy() + original_length = len(agent.messages) + + from agent.protocol.agent_stream import AgentStreamExecutor + + executor = AgentStreamExecutor( + agent=agent, + model=agent.model, + system_prompt=full_system_prompt, + tools=agent.tools, + max_turns=agent.max_steps, + on_event=on_event, + messages=messages_copy, + max_context_turns=max_context_turns, + ) + + try: + response = executor.run_stream(query) + except Exception: + # If executor cleared messages (context overflow), sync back + if len(executor.messages) == 0: + with agent.messages_lock: + agent.messages.clear() + logger.info("[ChatService] Cleared agent message history after executor recovery") + raise + + # Append only the NEW messages from this execution (thread-safe) + with agent.messages_lock: + new_messages = executor.messages[original_length:] + agent.messages.extend(new_messages) + + # Store executor reference for files_to_send access + agent.stream_executor = executor + + # Execute post-process tools + agent._execute_post_process_tools() + + logger.info(f"[ChatService] Agent run completed: session={session_id}") + + +class _StreamState: + """Mutable state shared between the event callback and the run method.""" + + def __init__(self): + self.segment_id: int = 0 + # None means we are not accumulating tool results right now. + # A list means we are in the middle of a tool-execution phase. + self.pending_tool_results: Optional[list] = None diff --git a/bridge/agent_event_handler.py b/bridge/agent_event_handler.py index 17a09200..8bc7f628 100644 --- a/bridge/agent_event_handler.py +++ b/bridge/agent_event_handler.py @@ -74,7 +74,7 @@ class AgentEventHandler: # Only send thinking process if followed by tool calls if tool_calls: if self.current_thinking.strip(): - logger.debug(f"💭 {self.current_thinking.strip()[:200]}{'...' if len(self.current_thinking) > 200 else ''}") + logger.info(f"💭 {self.current_thinking.strip()[:200]}{'...' if len(self.current_thinking) > 200 else ''}") # Send thinking process to channel self._send_to_channel(f"{self.current_thinking.strip()}") else: diff --git a/common/cloud_client.py b/common/cloud_client.py index c3dd1b83..de3af472 100644 --- a/common/cloud_client.py +++ b/common/cloud_client.py @@ -28,6 +28,7 @@ class CloudClient(LinkAIClient): self.channel_mgr = None self._skill_service = None self._memory_service = None + self._chat_service = None @property def skill_service(self): @@ -61,6 +62,20 @@ class CloudClient(LinkAIClient): logger.error(f"[CloudClient] Failed to init MemoryService: {e}") return self._memory_service + @property + def chat_service(self): + """Lazy-init ChatService (requires AgentBridge via Bridge singleton).""" + if self._chat_service is None: + try: + from agent.chat.service import ChatService + from bridge.bridge import Bridge + agent_bridge = Bridge().get_agent_bridge() + self._chat_service = ChatService(agent_bridge) + logger.debug("[CloudClient] ChatService initialised") + except Exception as e: + logger.error(f"[CloudClient] Failed to init ChatService: {e}") + return self._chat_service + # ------------------------------------------------------------------ # message push callback # ------------------------------------------------------------------ @@ -223,6 +238,28 @@ class CloudClient(LinkAIClient): return svc.dispatch(action, payload) + # ------------------------------------------------------------------ + # chat callback + # ------------------------------------------------------------------ + def on_chat(self, data: dict, send_chunk_fn): + """ + Handle CHAT messages from the cloud console. + Runs the agent in streaming mode and sends chunks back via send_chunk_fn. + + :param data: message data with 'action' and 'payload' (query, session_id) + :param send_chunk_fn: callable(chunk_data: dict) to send one streaming chunk + """ + payload = data.get("payload", {}) + query = payload.get("query", "") + session_id = payload.get("session_id", "cloud_console") + logger.info(f"[CloudClient] on_chat: session={session_id}, query={query[:80]}") + + svc = self.chat_service + if svc is None: + raise RuntimeError("ChatService not available") + + svc.run(query=query, session_id=session_id, send_chunk_fn=send_chunk_fn) + # ------------------------------------------------------------------ # channel restart helpers # ------------------------------------------------------------------