diff --git a/src/chat/message_receive/bot.py b/src/chat/message_receive/bot.py index e7e503aad..3bb39be2c 100644 --- a/src/chat/message_receive/bot.py +++ b/src/chat/message_receive/bot.py @@ -21,12 +21,12 @@ import re # 定义日志配置 # 获取项目根目录(假设本文件在src/chat/message_receive/下,根目录为上上上级目录) -PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), '../../..')) +PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../..")) -ENABLE_S4U_CHAT = os.path.isfile(os.path.join(PROJECT_ROOT, 's4u.s4u')) +ENABLE_S4U_CHAT = os.path.isfile(os.path.join(PROJECT_ROOT, "s4u.s4u")) if ENABLE_S4U_CHAT: - print('''\nS4U私聊模式已开启\n!!!!!!!!!!!!!!!!!\n''') + print("""\nS4U私聊模式已开启\n!!!!!!!!!!!!!!!!!\n""") # 仅内部开启 # 配置主程序日志格式 diff --git a/src/mais4u/mais4u_chat/s4u_chat.py b/src/mais4u/mais4u_chat/s4u_chat.py index 149ecd9ea..579d36013 100644 --- a/src/mais4u/mais4u_chat/s4u_chat.py +++ b/src/mais4u/mais4u_chat/s4u_chat.py @@ -154,14 +154,9 @@ class S4UChat: # 两个消息队列 self._vip_queue = asyncio.PriorityQueue() self._normal_queue = asyncio.PriorityQueue() - - # 优先级管理配置 - self.normal_queue_max_size = 20 # 普通队列最大容量,可以后续移到配置文件 - self.interest_dict = {} # 用户兴趣分字典,可以后续移到配置文件. e.g. {"user_id": 5.0} - self.at_bot_priority_bonus = 100.0 # @机器人时的额外优先分 self._entry_counter = 0 # 保证FIFO的全局计数器 - self._new_message_event = asyncio.Event() # 用于唤醒处理器 + self._new_message_event = asyncio.Event() # 用于唤醒处理器 self._processing_task = asyncio.create_task(self._message_processor()) self._current_generation_task: Optional[asyncio.Task] = None @@ -201,18 +196,17 @@ class S4UChat: async def add_message(self, message: MessageRecv) -> None: """根据VIP状态和中断逻辑将消息放入相应队列。""" is_vip = self._is_vip(message) - # 优先级分数越高,优先级越高。 - new_priority_score = self._calculate_base_priority_score(message) - + new_priority = self._get_message_priority(message) + should_interrupt = False if self._current_generation_task and not self._current_generation_task.done(): if self._current_message_being_replied: - current_queue, current_priority_score, _, current_msg = self._current_message_being_replied - + current_queue, current_priority, _, current_msg = self._current_message_being_replied + # 规则:VIP从不被打断 if current_queue == "vip": - pass # Do nothing - + pass # Do nothing + # 规则:普通消息可以被打断 elif current_queue == "normal": # VIP消息可以打断普通消息 @@ -231,10 +225,12 @@ class S4UChat: elif new_sender_id == current_sender_id and new_priority_score >= current_priority_score: should_interrupt = True logger.info(f"[{self.stream_name}] Same user sent new message, interrupting.") - + if should_interrupt: if self.gpt.partial_response: - logger.warning(f"[{self.stream_name}] Interrupting reply. Already generated: '{self.gpt.partial_response}'") + logger.warning( + f"[{self.stream_name}] Interrupting reply. Already generated: '{self.gpt.partial_response}'" + ) self._current_generation_task.cancel() # asyncio.PriorityQueue 是最小堆,所以我们存入分数的相反数 @@ -253,9 +249,9 @@ class S4UChat: return await self._normal_queue.put(item) - + self._entry_counter += 1 - self._new_message_event.set() # 唤醒处理器 + self._new_message_event.set() # 唤醒处理器 async def _message_processor(self): """调度器:优先处理VIP队列,然后处理普通队列。""" @@ -276,12 +272,14 @@ class S4UChat: priority = -neg_priority # 检查普通消息是否超时 if time.time() - timestamp > self._MESSAGE_TIMEOUT_SECONDS: - logger.info(f"[{self.stream_name}] Discarding stale normal message: {message.processed_plain_text[:20]}...") + logger.info( + f"[{self.stream_name}] Discarding stale normal message: {message.processed_plain_text[:20]}..." + ) self._normal_queue.task_done() - continue # 处理下一条 + continue # 处理下一条 queue_name = "normal" else: - continue # 没有消息了,回去等事件 + continue # 没有消息了,回去等事件 self._current_message_being_replied = (queue_name, priority, entry_count, message) self._current_generation_task = asyncio.create_task(self._generate_and_send(message)) @@ -289,7 +287,9 @@ class S4UChat: try: await self._current_generation_task except asyncio.CancelledError: - logger.info(f"[{self.stream_name}] Reply generation was interrupted externally for {queue_name} message. The message will be discarded.") + logger.info( + f"[{self.stream_name}] Reply generation was interrupted externally for {queue_name} message. The message will be discarded." + ) # 被中断的消息应该被丢弃,而不是重新排队,以响应最新的用户输入。 # 旧的重新入队逻辑会导致所有中断的消息最终都被回复。 @@ -299,11 +299,11 @@ class S4UChat: self._current_generation_task = None self._current_message_being_replied = None # 标记任务完成 - if queue_name == 'vip': + if queue_name == "vip": self._vip_queue.task_done() else: self._normal_queue.task_done() - + # 检查是否还有任务,有则立即再次触发事件 if not self._vip_queue.empty() or not self._normal_queue.empty(): self._new_message_event.set() diff --git a/src/mais4u/mais4u_chat/s4u_prompt.py b/src/mais4u/mais4u_chat/s4u_prompt.py index c2aa4e654..24dba6029 100644 --- a/src/mais4u/mais4u_chat/s4u_prompt.py +++ b/src/mais4u/mais4u_chat/s4u_prompt.py @@ -104,7 +104,9 @@ class PromptBuilder: ) relation_info = "".join(relation_info_list) if relation_info: - relation_prompt = await global_prompt_manager.format_prompt("relation_prompt", relation_info=relation_info) + relation_prompt = await global_prompt_manager.format_prompt( + "relation_prompt", relation_info=relation_info + ) return relation_prompt async def build_memory_block(self, text: str) -> str: @@ -127,7 +129,7 @@ class PromptBuilder: ) talk_type = message.message_info.platform + ":" + message.chat_stream.user_info.user_id - + core_dialogue_list = [] background_dialogue_list = [] bot_id = str(global_config.bot.qq_account) @@ -147,7 +149,7 @@ class PromptBuilder: background_dialogue_list.append(msg_dict) except Exception as e: logger.error(f"无法处理历史消息记录: {msg_dict}, 错误: {e}") - + background_dialogue_prompt = "" if background_dialogue_list: latest_25_msgs = background_dialogue_list[-25:] @@ -195,9 +197,8 @@ class PromptBuilder: all_msg_seg_list.append(msg_seg_str) for msg in all_msg_seg_list: core_msg_str += msg - - return core_msg_str, background_dialogue_prompt + return core_msg_str, background_dialogue_prompt async def build_prompt_normal( self, @@ -206,19 +207,16 @@ class PromptBuilder: message_txt: str, sender_name: str = "某人", ) -> str: - identity_block, relation_info_block, memory_block = await asyncio.gather( - self.build_identity_block(), - self.build_relation_info(chat_stream), - self.build_memory_block(message_txt) + self.build_identity_block(), self.build_relation_info(chat_stream), self.build_memory_block(message_txt) ) core_dialogue_prompt, background_dialogue_prompt = self.build_chat_history_prompts(chat_stream, message) - + time_block = f"当前时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" - + template_name = "s4u_prompt" - + prompt = await global_prompt_manager.format_prompt( template_name, identity_block=identity_block, diff --git a/src/mais4u/mais4u_chat/s4u_stream_generator.py b/src/mais4u/mais4u_chat/s4u_stream_generator.py index fd6967823..449922886 100644 --- a/src/mais4u/mais4u_chat/s4u_stream_generator.py +++ b/src/mais4u/mais4u_chat/s4u_stream_generator.py @@ -135,7 +135,7 @@ class S4UStreamGenerator: to_yield = punctuation_buffer + sentence if to_yield.endswith((",", ",")): to_yield = to_yield.rstrip(",,") - + self.partial_response += to_yield yield to_yield punctuation_buffer = "" # 清空标点符号缓冲区