Files
chatgpt-on-wechat/channel/wechat/wechat_channel.py
2022-12-17 23:08:08 +08:00

105 lines
3.5 KiB
Python

"""
wechat channel
"""
import itchat
import json
from itchat.content import *
from channel.channel import Channel
from concurrent.futures import ThreadPoolExecutor
from common.log import logger
from config import conf
thead_pool = ThreadPoolExecutor(max_workers=8)
@itchat.msg_register(TEXT)
def handler_single_msg(msg):
WechatChannel().handle(msg)
@itchat.msg_register(TEXT, isGroupChat=True)
def handler_group_msg(msg):
WechatChannel().handle_group(msg)
class WechatChannel(Channel):
def __init__(self):
pass
def startup(self):
# login by scan QRCode
itchat.auto_login(enableCmdQR=2)
# start message listener
itchat.run()
def handle(self, msg):
logger.info("[WX]receive msg: " + json.dumps(msg, ensure_ascii=False))
from_user_id = msg['FromUserName']
to_user_id = msg['ToUserName']
other_user_id = msg['User']['UserName']
content = msg['Text']
if from_user_id == other_user_id and \
self.check_prefix(content, conf().get('single_chat_prefix')):
str_list = content.split('bot', 1)
if len(str_list) == 2:
content = str_list[1].strip()
thead_pool.submit(self._do_send, content, from_user_id)
elif to_user_id == other_user_id and \
self.check_prefix(content, conf().get('single_chat_prefix')):
str_list = content.split('bot', 1)
if len(str_list) == 2:
content = str_list[1].strip()
thead_pool.submit(self._do_send, content, to_user_id)
def handle_group(self, msg):
logger.info("[WX]receive group msg: " + json.dumps(msg, ensure_ascii=False))
group_name = msg['User'].get('NickName', None)
if not group_name:
return ""
origin_content = msg['Content']
content = msg['Content']
content_list = content.split(' ', 1)
context_special_list = content.split('\u2005', 1)
if len(context_special_list) == 2:
content = context_special_list[1]
elif len(content_list) == 2:
content = content_list[1]
config = conf()
if group_name in config.get('group_name_white_list') \
and (msg['IsAt'] or self.check_prefix(origin_content, config.get('group_chat_prefix'))):
thead_pool.submit(self._do_send_group, content, msg)
def send(self, msg, receiver):
# time.sleep(random.randint(1, 3))
logger.info('[WX] sendMsg={}, receiver={}'.format(msg, receiver))
itchat.send(msg, toUserName=receiver)
def _do_send(self, query, reply_user_id):
if not query:
return
context = dict()
context['from_user_id'] = reply_user_id
reply_text = super().build_reply_content(query, context).strip()
if reply_text:
self.send(conf().get("single_chat_reply_prefix") + reply_text, reply_user_id)
def _do_send_group(self, query, msg):
if not query:
return
context = dict()
context['from_user_id'] = msg['ActualUserName']
reply_text = super().build_reply_content(query, context)
reply_text = '@' + msg['ActualNickName'] + ' ' + reply_text.strip()
if reply_text:
self.send(reply_text, msg['User']['UserName'])
def check_prefix(self, content, prefix_list):
for prefix in prefix_list:
if content.lower().startswith(prefix):
return True
return False