diff --git a/src/chat/message_manager/distribution_manager.py b/src/chat/message_manager/distribution_manager.py index 6ae951663..a3cc985bb 100644 --- a/src/chat/message_manager/distribution_manager.py +++ b/src/chat/message_manager/distribution_manager.py @@ -108,6 +108,17 @@ class StreamLoopManager: Returns: bool: 是否成功启动 """ + # 获取流上下文 + context = await self._get_stream_context(stream_id) + if not context: + logger.warning(f"无法获取流上下文: {stream_id}") + return False + + # 快速路径:如果流已存在且不是强制启动,无需处理 + if not force and context.stream_loop_task and not context.stream_loop_task.done(): + logger.debug(f"🔄 [流循环] stream={stream_id[:8]}, 循环已在运行,跳过启动") + return True + # 获取或创建该流的启动锁 if stream_id not in self._stream_start_locks: self._stream_start_locks[stream_id] = asyncio.Lock() @@ -116,17 +127,6 @@ class StreamLoopManager: # 使用锁防止并发启动同一个流的多个循环任务 async with lock: - # 获取流上下文 - context = await self._get_stream_context(stream_id) - if not context: - logger.warning(f"无法获取流上下文: {stream_id}") - return False - - # 快速路径:如果流已存在且不是强制启动,无需处理 - if not force and context.stream_loop_task and not context.stream_loop_task.done(): - logger.debug(f"🔄 [流循环] stream={stream_id[:8]}, 循环已在运行,跳过启动") - return True - # 如果是强制启动且任务仍在运行,先取消旧任务 if force and context.stream_loop_task and not context.stream_loop_task.done(): logger.warning(f"⚠️ [流循环] stream={stream_id[:8]}, 强制启动模式:先取消现有任务") @@ -238,7 +238,7 @@ class StreamLoopManager: # 3. 在处理前更新能量值(用于下次间隔计算) try: - await self._update_stream_energy(stream_id, context) + asyncio.create_task(self._update_stream_energy(stream_id, context)) except Exception as e: logger.debug(f"更新流能量失败 {stream_id}: {e}") @@ -370,9 +370,6 @@ class StreamLoopManager: if last_message: context.triggering_user_id = last_message.user_info.user_id - # 创建子任务用于刷新能量(不阻塞主流程) - asyncio.create_task(self._refresh_focus_energy(stream_id)) - # 设置 Chatter 正在处理的标志 context.is_chatter_processing = True logger.debug(f"设置 Chatter 处理标志: {stream_id}") @@ -388,11 +385,6 @@ class StreamLoopManager: success = results.get("success", False) if success: - # 处理成功后,再次刷新缓存中可能的新消息 - additional_messages = await self._flush_cached_messages_to_unread(stream_id) - if additional_messages: - logger.debug(f"处理完成后刷新新消息: stream={stream_id}, 数量={len(additional_messages)}") - process_time = time.time() - start_time logger.debug(f"流处理成功: {stream_id} (耗时: {process_time:.2f}s)") else: diff --git a/src/chat/planner_actions/action_manager.py b/src/chat/planner_actions/action_manager.py index 0857089b4..a6f114b2c 100644 --- a/src/chat/planner_actions/action_manager.py +++ b/src/chat/planner_actions/action_manager.py @@ -204,8 +204,7 @@ class ChatterActionManager: action_prompt_display=reason, ) else: - # 改为同步等待,确保存储完成 - await database_api.store_action_info( + asyncio.create_task(database_api.store_action_info( chat_stream=chat_stream, action_build_into_prompt=False, action_prompt_display=reason, @@ -213,7 +212,7 @@ class ChatterActionManager: thinking_id=thinking_id or "", action_data={"reason": reason}, action_name="no_reply", - ) + )) return {"action_type": "no_reply", "success": True, "reply_text": "", "command": ""} @@ -229,9 +228,9 @@ class ChatterActionManager: target_message, ) - # 记录执行的动作到目标消息(改为同步等待) + # 记录执行的动作到目标消息 if success: - await self._record_action_to_message(chat_stream, action_name, target_message, action_data) + asyncio.create_task(self._record_action_to_message(chat_stream, action_name, target_message, action_data)) # 重置打断计数 await self._reset_interruption_count_after_action(chat_stream.stream_id) @@ -314,13 +313,13 @@ class ChatterActionManager: ) # 记录回复动作到目标消息 - await self._record_action_to_message(chat_stream, action_name, target_message, action_data) + asyncio.create_task(self._record_action_to_message(chat_stream, action_name, target_message, action_data)) - # 回复成功,重置打断计数(改为同步等待) + # 回复成功,重置打断计数 await self._reset_interruption_count_after_action(chat_stream.stream_id) return loop_info, reply_text, cycle_timers_reply - loop_info, reply_text, cycle_timers_reply = await asyncio.create_task(_after_reply()) + loop_info, reply_text, _ = await _after_reply() return {"action_type": action_name, "success": True, "reply_text": reply_text, "loop_info": loop_info} except Exception as e: diff --git a/src/chat/replyer/default_generator.py b/src/chat/replyer/default_generator.py index c518d6d1d..2b8472148 100644 --- a/src/chat/replyer/default_generator.py +++ b/src/chat/replyer/default_generator.py @@ -380,8 +380,8 @@ class DefaultReplyer: if not prompt: logger.warning("构建prompt失败,跳过回复生成") return False, None, None + from src.plugin_system.core.event_manager import event_manager - # 触发 POST_LLM 事件(请求 LLM 之前) if not from_plugin: result = await event_manager.trigger_event( @@ -1202,19 +1202,11 @@ class DefaultReplyer: return "" else: # 有 reply_message,正常处理 - # 统一处理 DatabaseMessages 对象和字典 - if isinstance(reply_message, DatabaseMessages): - platform = reply_message.chat_info.platform - user_id = reply_message.user_info.user_id - user_nickname = reply_message.user_info.user_nickname - user_cardname = reply_message.user_info.user_cardname - processed_plain_text = reply_message.processed_plain_text - else: - platform = reply_message.get("chat_info_platform") - user_id = reply_message.get("user_id") - user_nickname = reply_message.get("user_nickname") - user_cardname = reply_message.get("user_cardname") - processed_plain_text = reply_message.get("processed_plain_text") + platform = reply_message.chat_info.platform + user_id = reply_message.user_info.user_id + user_nickname = reply_message.user_info.user_nickname + user_cardname = reply_message.user_info.user_cardname + processed_plain_text = reply_message.processed_plain_text person_id = person_info_manager.get_person_id( platform, # type: ignore @@ -1237,7 +1229,7 @@ class DefaultReplyer: current_user_id = await person_info_manager.get_value(person_id, "user_id") current_platform = platform - if current_user_id == bot_user_id and current_platform == global_config.bot.platform: + if str(current_user_id) == bot_user_id and current_platform == global_config.bot.platform: sender = f"{person_name}(你)" else: # 如果不是bot自己,直接使用person_name diff --git a/src/plugin_system/apis/generator_api.py b/src/plugin_system/apis/generator_api.py index 86c8f3210..9c6fb0840 100644 --- a/src/plugin_system/apis/generator_api.py +++ b/src/plugin_system/apis/generator_api.py @@ -20,7 +20,7 @@ from src.common.logger import get_logger from src.plugin_system.base.component_types import ActionInfo if TYPE_CHECKING: - pass + from chat.replyer.default_generator import DefaultReplyer install(extra_lines=3) @@ -37,7 +37,7 @@ async def get_replyer( chat_stream: ChatStream | None = None, chat_id: str | None = None, request_type: str = "replyer", -) -> Any | None: +) -> "DefaultReplyer | None": """获取回复器对象 优先使用chat_stream,如果没有则使用chat_id直接查找。 @@ -163,6 +163,8 @@ async def generate_reply( assert llm_response_dict is not None, "llm_response_dict不应为None" # 虽然说不会出现llm_response为空的情况 if content := llm_response_dict.get("content", ""): # 处理为拟人化文本 + from src.chat.utils.utils import filter_system_format_content + content = filter_system_format_content(content) reply_set = process_human_text(content, enable_splitter, enable_chinese_typo) else: reply_set = [] diff --git a/src/plugins/built_in/affinity_flow_chatter/core/affinity_chatter.py b/src/plugins/built_in/affinity_flow_chatter/core/affinity_chatter.py index d26379689..509b7889c 100644 --- a/src/plugins/built_in/affinity_flow_chatter/core/affinity_chatter.py +++ b/src/plugins/built_in/affinity_flow_chatter/core/affinity_chatter.py @@ -107,22 +107,20 @@ class AffinityChatter(BaseChatter): logger.info(f"亲和力聊天处理器 {self.stream_id} 处理被取消") self.stats["failed_executions"] += 1 self.last_activity_time = time.time() - # 清理 processing_message_id - context.processing_message_id = None raise except Exception as e: logger.error(f"亲和力聊天处理器 {self.stream_id} 处理StreamContext时出错: {e}\n{traceback.format_exc()}") self.stats["failed_executions"] += 1 self.last_activity_time = time.time() - # 清理 processing_message_id - context.processing_message_id = None - return { "success": False, "stream_id": self.stream_id, "error_message": str(e), "executed_count": 0, } + finally: + # 清理 processing_message_id + context.processing_message_id = None def get_stats(self) -> dict[str, Any]: """ diff --git a/src/plugins/built_in/affinity_flow_chatter/core/affinity_interest_calculator.py b/src/plugins/built_in/affinity_flow_chatter/core/affinity_interest_calculator.py index b6cc5b959..07d58ef6f 100644 --- a/src/plugins/built_in/affinity_flow_chatter/core/affinity_interest_calculator.py +++ b/src/plugins/built_in/affinity_flow_chatter/core/affinity_interest_calculator.py @@ -140,7 +140,7 @@ class AffinityInterestCalculator(BaseInterestCalculator): # 5. 考虑连续不回复的阈值调整 adjusted_score = total_score - adjusted_reply_threshold, adjusted_action_threshold = self._apply_no_reply_threshold_adjustment() + adjusted_reply_threshold, adjusted_action_threshold = self._apply_threshold_adjustment() logger.debug( f"[Affinity兴趣计算] 连续不回复调整: 回复阈值 {self.reply_threshold:.3f} → {adjusted_reply_threshold:.3f}, " f"动作阈值 {global_config.affinity_flow.non_reply_action_interest_threshold:.3f} → {adjusted_action_threshold:.3f}" @@ -282,7 +282,7 @@ class AffinityInterestCalculator(BaseInterestCalculator): logger.debug("[提及分计算] 未提及机器人,返回0.0") return 0.0 - def _apply_no_reply_threshold_adjustment(self) -> tuple[float, float]: + def _apply_threshold_adjustment(self) -> tuple[float, float]: """应用阈值调整(包括连续不回复和回复后降低机制) Returns: diff --git a/src/plugins/built_in/affinity_flow_chatter/planner/plan_executor.py b/src/plugins/built_in/affinity_flow_chatter/planner/plan_executor.py index 4adaf1ade..d6f254b7e 100644 --- a/src/plugins/built_in/affinity_flow_chatter/planner/plan_executor.py +++ b/src/plugins/built_in/affinity_flow_chatter/planner/plan_executor.py @@ -228,10 +228,6 @@ class ChatterPlanExecutor: error_message = str(e) logger.error(f"执行回复动作失败: {action_info.action_type}, 错误: {error_message}") - # 将机器人回复添加到已读消息中 - if success and action_info.action_message: - await self._add_bot_reply_to_read_messages(action_info, plan, reply_content) - execution_time = time.time() - start_time self.execution_stats["execution_times"].append(execution_time) diff --git a/src/plugins/built_in/affinity_flow_chatter/planner/planner.py b/src/plugins/built_in/affinity_flow_chatter/planner/planner.py index 6d664b3cb..63b9f740a 100644 --- a/src/plugins/built_in/affinity_flow_chatter/planner/planner.py +++ b/src/plugins/built_in/affinity_flow_chatter/planner/planner.py @@ -81,16 +81,10 @@ class ChatterActionPlanner: except asyncio.CancelledError: logger.info(f"规划流程被取消: {self.chat_id}") self.planner_stats["failed_plans"] += 1 - # 确保清理 processing_message_id - if context: - context.processing_message_id = None raise except Exception as e: logger.error(f"规划流程出错: {e}") self.planner_stats["failed_plans"] += 1 - # 确保清理 processing_message_id - if context: - context.processing_message_id = None return [], None async def _enhanced_plan_flow(self, context: "StreamContext | None") -> tuple[list[dict[str, Any]], Any | None]: @@ -310,8 +304,6 @@ class ChatterActionPlanner: action_data={}, action_message=None, ) - # 检查是否需要退出Normal模式 - await self._check_exit_normal_mode(context) return [asdict(no_action)], None # 2. 检查是否有消息达到reply阈值 @@ -341,8 +333,6 @@ class ChatterActionPlanner: action_data={}, action_message=None, ) - # 检查是否需要退出Normal模式 - await self._check_exit_normal_mode(context) return [asdict(no_action)], None # 记录当前正在处理的消息ID @@ -387,9 +377,6 @@ class ChatterActionPlanner: context.processing_message_id = None logger.debug("Normal模式 - 已清理处理标记") - # 8. 检查是否需要退出Normal模式 - await self._check_exit_normal_mode(context) - # respond动作不返回目标消息,因为它是统一回应所有未读消息 return [asdict(respond_action)], None else: @@ -406,25 +393,19 @@ class ChatterActionPlanner: # 更新连续不回复计数 await self._update_interest_calculator_state(replied=False) - # 检查是否需要退出Normal模式 - await self._check_exit_normal_mode(context) - return [asdict(no_action)], None except asyncio.CancelledError: logger.info(f"Normal模式流程被取消: {self.chat_id}") self.planner_stats["failed_plans"] += 1 - # 清理处理标记 - if context: - context.processing_message_id = None raise except Exception as e: logger.error(f"Normal模式 - 流程出错: {e}") self.planner_stats["failed_plans"] += 1 - # 清理处理标记 - if context: - context.processing_message_id = None return [], None + finally: + # 检查是否需要退出Normal模式 + await self._check_exit_normal_mode(context) async def _check_enter_normal_mode(self, context: "StreamContext | None") -> None: """检查并执行进入Normal模式的判定