mirror of
https://github.com/zhayujie/chatgpt-on-wechat.git
synced 2026-06-02 09:48:22 +08:00
Compare commits
8 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
94004b095b | ||
|
|
f652d592bd | ||
|
|
186e18fe94 | ||
|
|
28eb67bc24 | ||
|
|
6c7e4aaf37 | ||
|
|
709a1317ef | ||
|
|
371e38cfa6 | ||
|
|
5a221848e9 |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -10,3 +10,4 @@ nohup.out
|
||||
tmp
|
||||
plugins.json
|
||||
itchat.pkl
|
||||
*.log
|
||||
@@ -86,6 +86,7 @@ class ChatGPTBot(Bot,OpenAIImage):
|
||||
"top_p":1,
|
||||
"frequency_penalty":conf().get('frequency_penalty', 0.0), # [-2,2]之间,该值越大则更倾向于产生不同的内容
|
||||
"presence_penalty":conf().get('presence_penalty', 0.0), # [-2,2]之间,该值越大则更倾向于产生不同的内容
|
||||
"request_timeout": conf().get('request_timeout', 30), # 请求超时时间
|
||||
}
|
||||
|
||||
def reply_text(self, session:ChatGPTSession, session_id, retry_count=0) -> dict:
|
||||
|
||||
@@ -1,10 +1,12 @@
|
||||
|
||||
|
||||
|
||||
from asyncio import CancelledError
|
||||
from concurrent.futures import Future, ThreadPoolExecutor
|
||||
import os
|
||||
import re
|
||||
import threading
|
||||
import time
|
||||
from common.expired_dict import ExpiredDict
|
||||
from common.dequeue import Dequeue
|
||||
from channel.channel import Channel
|
||||
from bridge.reply import *
|
||||
from bridge.context import *
|
||||
@@ -20,8 +22,16 @@ except Exception as e:
|
||||
class ChatChannel(Channel):
|
||||
name = None # 登录的用户名
|
||||
user_id = None # 登录的用户id
|
||||
futures = {} # 记录每个session_id提交到线程池的future对象, 用于重置会话时把没执行的future取消掉,正在执行的不会被取消
|
||||
sessions = {} # 用于控制并发,每个session_id同时只能有一个context在处理
|
||||
lock = threading.Lock() # 用于控制对sessions的访问
|
||||
handler_pool = ThreadPoolExecutor(max_workers=8) # 处理消息的线程池
|
||||
|
||||
def __init__(self):
|
||||
pass
|
||||
_thread = threading.Thread(target=self.consume)
|
||||
_thread.setDaemon(True)
|
||||
_thread.start()
|
||||
|
||||
|
||||
# 根据消息构造context,消息内容相关的触发项写在这里
|
||||
def _compose_context(self, ctype: ContextType, content, **kwargs):
|
||||
@@ -215,6 +225,74 @@ class ChatChannel(Channel):
|
||||
time.sleep(3+3*retry_cnt)
|
||||
self._send(reply, context, retry_cnt+1)
|
||||
|
||||
def thread_pool_callback(self, session_id):
|
||||
def func(worker:Future):
|
||||
try:
|
||||
worker_exception = worker.exception()
|
||||
if worker_exception:
|
||||
logger.exception("Worker return exception: {}".format(worker_exception))
|
||||
except CancelledError as e:
|
||||
logger.info("Worker cancelled, session_id = {}".format(session_id))
|
||||
except Exception as e:
|
||||
logger.exception("Worker raise exception: {}".format(e))
|
||||
with self.lock:
|
||||
self.sessions[session_id][1].release()
|
||||
return func
|
||||
|
||||
def produce(self, context: Context):
|
||||
session_id = context['session_id']
|
||||
with self.lock:
|
||||
if session_id not in self.sessions:
|
||||
self.sessions[session_id] = [Dequeue(), threading.BoundedSemaphore(conf().get("concurrency_in_session", 1))]
|
||||
if context.type == ContextType.TEXT and context.content.startswith("#"):
|
||||
self.sessions[session_id][0].putleft(context) # 优先处理管理命令
|
||||
else:
|
||||
self.sessions[session_id][0].put(context)
|
||||
|
||||
# 消费者函数,单独线程,用于从消息队列中取出消息并处理
|
||||
def consume(self):
|
||||
while True:
|
||||
with self.lock:
|
||||
session_ids = list(self.sessions.keys())
|
||||
for session_id in session_ids:
|
||||
context_queue, semaphore = self.sessions[session_id]
|
||||
if semaphore.acquire(blocking = False): # 等线程处理完毕才能删除
|
||||
if not context_queue.empty():
|
||||
context = context_queue.get()
|
||||
logger.debug("[WX] consume context: {}".format(context))
|
||||
future:Future = self.handler_pool.submit(self._handle, context)
|
||||
future.add_done_callback(self.thread_pool_callback(session_id))
|
||||
if session_id not in self.futures:
|
||||
self.futures[session_id] = []
|
||||
self.futures[session_id].append(future)
|
||||
elif semaphore._initial_value == semaphore._value+1: # 除了当前,没有任务再申请到信号量,说明所有任务都处理完毕
|
||||
self.futures[session_id] = [t for t in self.futures[session_id] if not t.done()]
|
||||
assert len(self.futures[session_id]) == 0, "thread pool error"
|
||||
del self.sessions[session_id]
|
||||
else:
|
||||
semaphore.release()
|
||||
time.sleep(0.1)
|
||||
|
||||
# 取消session_id对应的所有任务,只能取消排队的消息和已提交线程池但未执行的任务
|
||||
def cancel_session(self, session_id):
|
||||
with self.lock:
|
||||
if session_id in self.sessions:
|
||||
for future in self.futures[session_id]:
|
||||
future.cancel()
|
||||
cnt = self.sessions[session_id][0].qsize()
|
||||
if cnt>0:
|
||||
logger.info("Cancel {} messages in session {}".format(cnt, session_id))
|
||||
self.sessions[session_id][0] = Dequeue()
|
||||
|
||||
def cancel_all_session(self):
|
||||
with self.lock:
|
||||
for session_id in self.sessions:
|
||||
for future in self.futures[session_id]:
|
||||
future.cancel()
|
||||
cnt = self.sessions[session_id][0].qsize()
|
||||
if cnt>0:
|
||||
logger.info("Cancel {} messages in session {}".format(cnt, session_id))
|
||||
self.sessions[session_id][0] = Dequeue()
|
||||
|
||||
|
||||
def check_prefix(content, prefix_list):
|
||||
|
||||
@@ -5,6 +5,7 @@ wechat channel
|
||||
"""
|
||||
|
||||
import os
|
||||
import threading
|
||||
import requests
|
||||
import io
|
||||
import time
|
||||
@@ -17,18 +18,10 @@ from lib import itchat
|
||||
from lib.itchat.content import *
|
||||
from bridge.reply import *
|
||||
from bridge.context import *
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from config import conf
|
||||
from common.time_check import time_checker
|
||||
from common.expired_dict import ExpiredDict
|
||||
from plugins import *
|
||||
thread_pool = ThreadPoolExecutor(max_workers=8)
|
||||
|
||||
def thread_pool_callback(worker):
|
||||
worker_exception = worker.exception()
|
||||
if worker_exception:
|
||||
logger.exception("Worker return exception: {}".format(worker_exception))
|
||||
|
||||
|
||||
@itchat.msg_register(TEXT)
|
||||
def handler_single_msg(msg):
|
||||
@@ -73,7 +66,9 @@ def qrCallback(uuid,status,qrcode):
|
||||
try:
|
||||
from PIL import Image
|
||||
img = Image.open(io.BytesIO(qrcode))
|
||||
thread_pool.submit(img.show,"QRCode")
|
||||
_thread = threading.Thread(target=img.show, args=("QRCode",))
|
||||
_thread.setDaemon(True)
|
||||
_thread.start()
|
||||
except Exception as e:
|
||||
pass
|
||||
|
||||
@@ -142,7 +137,7 @@ class WechatChannel(ChatChannel):
|
||||
logger.debug("[WX]receive voice msg: {}".format(cmsg.content))
|
||||
context = self._compose_context(ContextType.VOICE, cmsg.content, isgroup=False, msg=cmsg)
|
||||
if context:
|
||||
thread_pool.submit(self._handle, context).add_done_callback(thread_pool_callback)
|
||||
self.produce(context)
|
||||
|
||||
@time_checker
|
||||
@_check
|
||||
@@ -150,7 +145,7 @@ class WechatChannel(ChatChannel):
|
||||
logger.debug("[WX]receive text msg: {}, cmsg={}".format(json.dumps(cmsg._rawmsg, ensure_ascii=False), cmsg))
|
||||
context = self._compose_context(ContextType.TEXT, cmsg.content, isgroup=False, msg=cmsg)
|
||||
if context:
|
||||
thread_pool.submit(self._handle, context).add_done_callback(thread_pool_callback)
|
||||
self.produce(context)
|
||||
|
||||
@time_checker
|
||||
@_check
|
||||
@@ -158,7 +153,7 @@ class WechatChannel(ChatChannel):
|
||||
logger.debug("[WX]receive group msg: {}, cmsg={}".format(json.dumps(cmsg._rawmsg, ensure_ascii=False), cmsg))
|
||||
context = self._compose_context(ContextType.TEXT, cmsg.content, isgroup=True, msg=cmsg)
|
||||
if context:
|
||||
thread_pool.submit(self._handle, context).add_done_callback(thread_pool_callback)
|
||||
self.produce(context)
|
||||
|
||||
@time_checker
|
||||
@_check
|
||||
@@ -168,7 +163,7 @@ class WechatChannel(ChatChannel):
|
||||
logger.debug("[WX]receive voice for group msg: {}".format(cmsg.content))
|
||||
context = self._compose_context(ContextType.VOICE, cmsg.content, isgroup=True, msg=cmsg)
|
||||
if context:
|
||||
thread_pool.submit(self._handle, context).add_done_callback(thread_pool_callback)
|
||||
self.produce(context)
|
||||
|
||||
# 统一的发送函数,每个Channel自行实现,根据reply的type字段发送不同类型的消息
|
||||
def send(self, reply: Reply, context: Context):
|
||||
|
||||
@@ -5,7 +5,6 @@ wechaty channel
|
||||
Python Wechaty - https://github.com/wechaty/python-wechaty
|
||||
"""
|
||||
import base64
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
import os
|
||||
import time
|
||||
import asyncio
|
||||
@@ -18,21 +17,18 @@ from bridge.context import *
|
||||
from channel.chat_channel import ChatChannel
|
||||
from channel.wechat.wechaty_message import WechatyMessage
|
||||
from common.log import logger
|
||||
from common.singleton import singleton
|
||||
from config import conf
|
||||
try:
|
||||
from voice.audio_convert import any_to_sil
|
||||
except Exception as e:
|
||||
pass
|
||||
|
||||
thread_pool = ThreadPoolExecutor(max_workers=8)
|
||||
def thread_pool_callback(worker):
|
||||
worker_exception = worker.exception()
|
||||
if worker_exception:
|
||||
logger.exception("Worker return exception: {}".format(worker_exception))
|
||||
@singleton
|
||||
class WechatyChannel(ChatChannel):
|
||||
|
||||
def __init__(self):
|
||||
pass
|
||||
super().__init__()
|
||||
|
||||
def startup(self):
|
||||
config = conf()
|
||||
@@ -41,6 +37,10 @@ class WechatyChannel(ChatChannel):
|
||||
asyncio.run(self.main())
|
||||
|
||||
async def main(self):
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
#将asyncio的loop传入处理线程
|
||||
self.handler_pool._initializer= lambda: asyncio.set_event_loop(loop)
|
||||
self.bot = Wechaty()
|
||||
self.bot.on('login', self.on_login)
|
||||
self.bot.on('message', self.on_message)
|
||||
@@ -122,8 +122,4 @@ class WechatyChannel(ChatChannel):
|
||||
context = self._compose_context(ctype, cmsg.content, isgroup=isgroup, msg=cmsg)
|
||||
if context:
|
||||
logger.info('[WX] receiveMsg={}, context={}'.format(cmsg, context))
|
||||
thread_pool.submit(self._handle_loop, context, asyncio.get_event_loop()).add_done_callback(thread_pool_callback)
|
||||
|
||||
def _handle_loop(self,context,loop):
|
||||
asyncio.set_event_loop(loop)
|
||||
self._handle(context)
|
||||
self.produce(context)
|
||||
33
common/dequeue.py
Normal file
33
common/dequeue.py
Normal file
@@ -0,0 +1,33 @@
|
||||
|
||||
from queue import Full, Queue
|
||||
from time import monotonic as time
|
||||
|
||||
# add implementation of putleft to Queue
|
||||
class Dequeue(Queue):
|
||||
def putleft(self, item, block=True, timeout=None):
|
||||
with self.not_full:
|
||||
if self.maxsize > 0:
|
||||
if not block:
|
||||
if self._qsize() >= self.maxsize:
|
||||
raise Full
|
||||
elif timeout is None:
|
||||
while self._qsize() >= self.maxsize:
|
||||
self.not_full.wait()
|
||||
elif timeout < 0:
|
||||
raise ValueError("'timeout' must be a non-negative number")
|
||||
else:
|
||||
endtime = time() + timeout
|
||||
while self._qsize() >= self.maxsize:
|
||||
remaining = endtime - time()
|
||||
if remaining <= 0.0:
|
||||
raise Full
|
||||
self.not_full.wait(remaining)
|
||||
self._putleft(item)
|
||||
self.unfinished_tasks += 1
|
||||
self.not_empty.notify()
|
||||
|
||||
def putleft_nowait(self, item):
|
||||
return self.putleft(item, block=False)
|
||||
|
||||
def _putleft(self, item):
|
||||
self.queue.appendleft(item)
|
||||
@@ -8,6 +8,10 @@ def _get_logger():
|
||||
console_handle = logging.StreamHandler(sys.stdout)
|
||||
console_handle.setFormatter(logging.Formatter('[%(levelname)s][%(asctime)s][%(filename)s:%(lineno)d] - %(message)s',
|
||||
datefmt='%Y-%m-%d %H:%M:%S'))
|
||||
file_handle = logging.FileHandler('run.log', encoding='utf-8')
|
||||
file_handle.setFormatter(logging.Formatter('[%(levelname)s][%(asctime)s][%(filename)s:%(lineno)d] - %(message)s',
|
||||
datefmt='%Y-%m-%d %H:%M:%S'))
|
||||
log.addHandler(file_handle)
|
||||
log.addHandler(console_handle)
|
||||
return log
|
||||
|
||||
|
||||
11
config.py
11
config.py
@@ -1,6 +1,7 @@
|
||||
# encoding:utf-8
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
from common.log import logger
|
||||
|
||||
@@ -27,6 +28,7 @@ available_setting = {
|
||||
"group_chat_in_one_session": ["ChatGPT测试群"], # 支持会话上下文共享的群名称
|
||||
"trigger_by_self": False, # 是否允许机器人触发
|
||||
"image_create_prefix": ["画", "看", "找"], # 开启图片回复的前缀
|
||||
"concurrency_in_session": 1, # 同一会话最多有多少条消息在处理中,大于1可能乱序
|
||||
|
||||
# chatgpt会话参数
|
||||
"expires_in_seconds": 3600, # 无操作会话的过期时间
|
||||
@@ -37,12 +39,12 @@ available_setting = {
|
||||
"rate_limit_chatgpt": 20, # chatgpt的调用频率限制
|
||||
"rate_limit_dalle": 50, # openai dalle的调用频率限制
|
||||
|
||||
|
||||
# chatgpt api参数 参考https://platform.openai.com/docs/api-reference/chat/create
|
||||
"temperature": 0.9,
|
||||
"top_p": 1,
|
||||
"frequency_penalty": 0,
|
||||
"presence_penalty": 0,
|
||||
"request_timeout": 30, # chatgpt请求超时时间
|
||||
|
||||
# 语音设置
|
||||
"speech_recognition": False, # 是否开启语音识别
|
||||
@@ -75,11 +77,12 @@ available_setting = {
|
||||
"wechaty_puppet_service_token": "", # wechaty的token
|
||||
|
||||
# chatgpt指令自定义触发词
|
||||
"clear_memory_commands": ['#清除记忆'], # 重置会话指令
|
||||
"clear_memory_commands": ['#清除记忆'], # 重置会话指令,必须以#开头
|
||||
|
||||
# channel配置
|
||||
"channel_type": "wx", # 通道类型,支持wx,wxy和terminal
|
||||
|
||||
"debug": False, # 是否开启debug模式,开启后会打印更多日志
|
||||
|
||||
}
|
||||
|
||||
@@ -137,6 +140,10 @@ def load_config():
|
||||
else:
|
||||
config[name] = value
|
||||
|
||||
if config.get("debug", False):
|
||||
logger.setLevel(logging.DEBUG)
|
||||
logger.debug("[INIT] set log level to DEBUG")
|
||||
|
||||
logger.info("[INIT] load config: {}".format(config))
|
||||
|
||||
|
||||
|
||||
@@ -7,7 +7,7 @@ from typing import Tuple
|
||||
from bridge.bridge import Bridge
|
||||
from bridge.context import ContextType
|
||||
from bridge.reply import Reply, ReplyType
|
||||
from config import load_config
|
||||
from config import conf, load_config
|
||||
import plugins
|
||||
from plugins import *
|
||||
from common import const
|
||||
@@ -126,7 +126,14 @@ class Godcmd(Plugin):
|
||||
else:
|
||||
with open(config_path,"r") as f:
|
||||
gconf=json.load(f)
|
||||
|
||||
|
||||
custom_commands = conf().get("clear_memory_commands", [])
|
||||
for custom_command in custom_commands:
|
||||
if custom_command and custom_command.startswith("#"):
|
||||
custom_command = custom_command[1:]
|
||||
if custom_command and custom_command not in COMMANDS["reset"]["alias"]:
|
||||
COMMANDS["reset"]["alias"].append(custom_command)
|
||||
|
||||
self.password = gconf["password"]
|
||||
self.admin_users = gconf["admin_users"] # 预存的管理员账号,这些账号不需要认证 TODO: 用户名每次都会变,目前不可用
|
||||
self.isrunning = True # 机器人是否运行中
|
||||
@@ -146,6 +153,7 @@ class Godcmd(Plugin):
|
||||
logger.debug("[Godcmd] on_handle_context. content: %s" % content)
|
||||
if content.startswith("#"):
|
||||
# msg = e_context['context']['msg']
|
||||
channel = e_context['channel']
|
||||
user = e_context['context']['receiver']
|
||||
session_id = e_context['context']['session_id']
|
||||
isgroup = e_context['context']['isgroup']
|
||||
@@ -181,6 +189,7 @@ class Godcmd(Plugin):
|
||||
elif cmd == "reset":
|
||||
if bottype in (const.CHATGPT, const.OPEN_AI):
|
||||
bot.sessions.clear_session(session_id)
|
||||
channel.cancel_session(session_id)
|
||||
ok, result = True, "会话已重置"
|
||||
else:
|
||||
ok, result = False, "当前对话机器人不支持重置会话"
|
||||
@@ -202,6 +211,7 @@ class Godcmd(Plugin):
|
||||
ok, result = True, "配置已重载"
|
||||
elif cmd == "resetall":
|
||||
if bottype in (const.CHATGPT, const.OPEN_AI):
|
||||
channel.cancel_all_session()
|
||||
bot.sessions.clear_all_session()
|
||||
ok, result = True, "重置所有会话成功"
|
||||
else:
|
||||
|
||||
Reference in New Issue
Block a user