fix:pfc多重存在

This commit is contained in:
SengokuCola
2025-04-05 00:11:28 +08:00
parent d108a9ab2a
commit 6e89534376
6 changed files with 204 additions and 147 deletions

View File

@@ -17,6 +17,7 @@ from ..storage.storage import MessageStorage
from .chat_observer import ChatObserver
from .pfc_KnowledgeFetcher import KnowledgeFetcher
from .reply_checker import ReplyChecker
from .pfc_utils import get_items_from_json
import json
import time
@@ -128,43 +129,18 @@ judge_conversation: 判断对话是否结束,当发现对话目标已经达到
content, _ = await self.llm.generate_response_async(prompt)
logger.debug(f"LLM原始返回内容: {content}")
# 清理内容,尝试提取JSON部分
content = content.strip()
try:
# 尝试直接解析
result = json.loads(content)
except json.JSONDecodeError:
# 如果直接解析失败尝试查找和提取JSON部分
import re
json_pattern = r'\{[^{}]*\}'
json_match = re.search(json_pattern, content)
if json_match:
try:
result = json.loads(json_match.group())
except json.JSONDecodeError:
logger.error("提取的JSON内容解析失败返回默认行动")
return "direct_reply", "JSON解析失败选择直接回复"
else:
# 如果找不到JSON尝试从文本中提取行动和原因
if "direct_reply" in content.lower():
return "direct_reply", "从文本中提取的行动"
elif "fetch_knowledge" in content.lower():
return "fetch_knowledge", "从文本中提取的行动"
elif "wait" in content.lower():
return "wait", "从文本中提取的行动"
elif "listening" in content.lower():
return "listening", "从文本中提取的行动"
elif "rethink_goal" in content.lower():
return "rethink_goal", "从文本中提取的行动"
elif "judge_conversation" in content.lower():
return "judge_conversation", "从文本中提取的行动"
else:
logger.error("无法从返回内容中提取行动类型")
return "direct_reply", "无法解析响应,选择直接回复"
# 使用简化函数提取JSON内容
success, result = get_items_from_json(
content,
"action", "reason",
default_values={"action": "direct_reply", "reason": "默认原因"}
)
# 验证JSON字段
action = result.get("action", "direct_reply")
reason = result.get("reason", "默认原因")
if not success:
return "direct_reply", "JSON解析失败选择直接回复"
action = result["action"]
reason = result["reason"]
# 验证action类型
if action not in ["direct_reply", "fetch_knowledge", "wait", "listening", "rethink_goal", "judge_conversation"]:
@@ -195,6 +171,8 @@ class GoalAnalyzer:
self.name = global_config.BOT_NICKNAME
self.nick_name = global_config.BOT_ALIAS_NAMES
self.chat_observer = ChatObserver.get_instance(stream_id)
self.current_goal_and_reason = None
async def analyze_goal(self) -> Tuple[str, str, str]:
"""分析对话历史并设定目标
@@ -239,48 +217,20 @@ class GoalAnalyzer:
content, _ = await self.llm.generate_response_async(prompt)
logger.debug(f"LLM原始返回内容: {content}")
# 清理和验证返回内容
if not content or not isinstance(content, str):
logger.error("LLM返回内容为空或格式不正确")
continue
# 尝试提取JSON部分
content = content.strip()
try:
# 尝试直接解析
result = json.loads(content)
except json.JSONDecodeError:
# 如果直接解析失败尝试查找和提取JSON部分
import re
json_pattern = r'\{[^{}]*\}'
json_match = re.search(json_pattern, content)
if json_match:
try:
result = json.loads(json_match.group())
except json.JSONDecodeError:
logger.error(f"提取的JSON内容解析失败重试第{retry + 1}")
continue
else:
logger.error(f"无法在返回内容中找到有效的JSON重试第{retry + 1}")
continue
# 使用简化函数提取JSON内容
success, result = get_items_from_json(
content,
"goal", "reasoning",
required_types={"goal": str, "reasoning": str}
)
# 验证JSON字段
if not all(key in result for key in ["goal", "reasoning"]):
logger.error(f"JSON缺少必要字段实际内容: {result},重试第{retry + 1}")
if not success:
logger.error(f"无法解析JSON重试第{retry + 1}")
continue
goal = result["goal"]
reasoning = result["reasoning"]
# 验证字段内容
if not isinstance(goal, str) or not isinstance(reasoning, str):
logger.error(f"JSON字段类型错误goal和reasoning必须是字符串重试第{retry + 1}")
continue
if not goal.strip() or not reasoning.strip():
logger.error(f"JSON字段内容为空重试第{retry + 1}")
continue
# 使用默认的方法
method = "以友好的态度回应"
return goal, method, reasoning
@@ -330,58 +280,21 @@ class GoalAnalyzer:
content, _ = await self.llm.generate_response_async(prompt)
logger.debug(f"LLM原始返回内容: {content}")
# 清理和验证返回内容
if not content or not isinstance(content, str):
logger.error("LLM返回内容为空或格式不正确")
return False, False, "确保对话顺利进行"
# 尝试提取JSON部分
content = content.strip()
try:
# 尝试直接解析
result = json.loads(content)
except json.JSONDecodeError:
# 如果直接解析失败尝试查找和提取JSON部分
import re
json_pattern = r'\{[^{}]*\}'
json_match = re.search(json_pattern, content)
if json_match:
try:
result = json.loads(json_match.group())
except json.JSONDecodeError as e:
logger.error(f"提取的JSON内容解析失败: {e}")
return False, False, "确保对话顺利进行"
else:
logger.error("无法在返回内容中找到有效的JSON")
return False, False, "确保对话顺利进行"
# 使用简化函数提取JSON内容
success, result = get_items_from_json(
content,
"goal_achieved", "stop_conversation", "reason",
required_types={
"goal_achieved": bool,
"stop_conversation": bool,
"reason": str
}
)
# 验证JSON字段
if not all(key in result for key in ["goal_achieved", "stop_conversation", "reason"]):
logger.error(f"JSON缺少必要字段实际内容: {result}")
return False, False, "确保对话顺利进行"
goal_achieved = result["goal_achieved"]
stop_conversation = result["stop_conversation"]
reason = result["reason"]
# 验证字段类型
if not isinstance(goal_achieved, bool):
logger.error("goal_achieved 必须是布尔值")
return False, False, "确保对话顺利进行"
if not isinstance(stop_conversation, bool):
logger.error("stop_conversation 必须是布尔值")
return False, False, "确保对话顺利进行"
if not isinstance(reason, str):
logger.error("reason 必须是字符串")
return False, False, "确保对话顺利进行"
if not reason.strip():
logger.error("reason 不能为空")
if not success:
return False, False, "确保对话顺利进行"
return goal_achieved, stop_conversation, reason
return result["goal_achieved"], result["stop_conversation"], result["reason"]
except Exception as e:
logger.error(f"分析对话目标时出错: {str(e)}")
@@ -536,25 +449,66 @@ class ReplyGenerator:
class Conversation:
# 类级别的实例管理
_instances: Dict[str, 'Conversation'] = {}
_instance_lock = asyncio.Lock() # 类级别的全局锁
_init_events: Dict[str, asyncio.Event] = {} # 初始化完成事件
_initializing: Dict[str, bool] = {} # 标记是否正在初始化
@classmethod
def get_instance(cls, stream_id: str) -> 'Conversation':
"""获取或创建对话实例"""
if stream_id not in cls._instances:
cls._instances[stream_id] = cls(stream_id)
logger.info(f"创建新的对话实例: {stream_id}")
return cls._instances[stream_id]
async def get_instance(cls, stream_id: str) -> Optional['Conversation']:
"""获取或创建对话实例
Args:
stream_id: 聊天流ID
Returns:
Optional[Conversation]: 对话实例如果创建或等待失败则返回None
"""
try:
# 使用全局锁来确保线程安全
async with cls._instance_lock:
# 如果已经在初始化中,等待初始化完成
if stream_id in cls._initializing and cls._initializing[stream_id]:
# 释放锁等待初始化
cls._instance_lock.release()
try:
await asyncio.wait_for(cls._init_events[stream_id].wait(), timeout=5.0)
except asyncio.TimeoutError:
logger.error(f"等待实例 {stream_id} 初始化超时")
return None
finally:
await cls._instance_lock.acquire()
# 如果实例不存在,创建新实例
if stream_id not in cls._instances:
cls._instances[stream_id] = cls(stream_id)
cls._init_events[stream_id] = asyncio.Event()
cls._initializing[stream_id] = True
logger.info(f"创建新的对话实例: {stream_id}")
return cls._instances[stream_id]
except Exception as e:
logger.error(f"获取对话实例失败: {e}")
return None
@classmethod
def remove_instance(cls, stream_id: str):
"""删除对话实例"""
if stream_id in cls._instances:
# 停止相关组件
instance = cls._instances[stream_id]
instance.chat_observer.stop()
# 删除实例
del cls._instances[stream_id]
logger.info(f"已删除对话实例 {stream_id}")
async def remove_instance(cls, stream_id: str):
"""删除对话实例
Args:
stream_id: 聊天流ID
"""
async with cls._instance_lock:
if stream_id in cls._instances:
# 停止相关组件
instance = cls._instances[stream_id]
instance.chat_observer.stop()
# 删除实例
del cls._instances[stream_id]
if stream_id in cls._init_events:
del cls._init_events[stream_id]
if stream_id in cls._initializing:
del cls._initializing[stream_id]
logger.info(f"已删除对话实例 {stream_id}")
def __init__(self, stream_id: str):
"""初始化对话系统"""
@@ -592,13 +546,21 @@ class Conversation:
async def start(self):
"""开始对话流程"""
logger.info("对话系统启动")
self.should_continue = True
self.chat_observer.start() # 启动观察器
await asyncio.sleep(1)
# 启动对话循环
await self._conversation_loop()
try:
logger.info("对话系统启动")
self.should_continue = True
self.chat_observer.start() # 启动观察器
await asyncio.sleep(1)
# 启动对话循环
await self._conversation_loop()
except Exception as e:
logger.error(f"启动对话系统失败: {e}")
raise
finally:
# 标记初始化完成
self._init_events[self.stream_id].set()
self._initializing[self.stream_id] = False
async def _conversation_loop(self):
"""对话循环"""
# 获取最近的消息历史
@@ -724,7 +686,7 @@ class Conversation:
self.should_continue = False
self.state = ConversationState.ENDED
# 删除实例这会同时停止chat_observer
self.remove_instance(self.stream_id)
await self.remove_instance(self.stream_id)
async def _send_timeout_message(self):
"""发送超时结束消息"""