fix:pfc优化,会检查新消息
This commit is contained in:
@@ -1,6 +1,6 @@
|
|||||||
import time
|
import time
|
||||||
import asyncio
|
import asyncio
|
||||||
from typing import Optional, Dict, Any, List
|
from typing import Optional, Dict, Any, List, Tuple
|
||||||
from src.common.logger import get_module_logger
|
from src.common.logger import get_module_logger
|
||||||
from src.common.database import db
|
from src.common.database import db
|
||||||
from ..message.message_base import UserInfo
|
from ..message.message_base import UserInfo
|
||||||
@@ -63,8 +63,28 @@ class ChatObserver:
|
|||||||
Returns:
|
Returns:
|
||||||
bool: 是否有新消息
|
bool: 是否有新消息
|
||||||
"""
|
"""
|
||||||
return self.new_message_after(self.last_check_time)
|
logger.debug(f"检查距离上一次观察之后是否有了新消息: {self.last_check_time}")
|
||||||
|
|
||||||
|
query = {
|
||||||
|
"chat_id": self.stream_id,
|
||||||
|
"time": {"$gt": self.last_check_time}
|
||||||
|
}
|
||||||
|
|
||||||
|
# 只需要查询是否存在,不需要获取具体消息
|
||||||
|
new_message_exists = db.messages.find_one(query) is not None
|
||||||
|
|
||||||
|
if new_message_exists:
|
||||||
|
logger.debug("发现新消息")
|
||||||
|
self.last_check_time = time.time()
|
||||||
|
|
||||||
|
return new_message_exists
|
||||||
|
|
||||||
|
def get_new_message(self) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
|
||||||
|
"""获取上一次观察的时间点后的新消息,插入到历史记录中,并返回新消息和历史记录两个对象"""
|
||||||
|
messages = self.get_message_history(self.last_check_time)
|
||||||
|
for message in messages:
|
||||||
|
self._add_message_to_history(message)
|
||||||
|
return messages, self.message_history
|
||||||
|
|
||||||
def new_message_after(self, time_point: float) -> bool:
|
def new_message_after(self, time_point: float) -> bool:
|
||||||
"""判断是否在指定时间点后有新消息
|
"""判断是否在指定时间点后有新消息
|
||||||
@@ -75,6 +95,7 @@ class ChatObserver:
|
|||||||
Returns:
|
Returns:
|
||||||
bool: 是否有新消息
|
bool: 是否有新消息
|
||||||
"""
|
"""
|
||||||
|
logger.debug(f"判断是否在指定时间点后有新消息: {self.last_message_time} > {time_point}")
|
||||||
return self.last_message_time is None or self.last_message_time > time_point
|
return self.last_message_time is None or self.last_message_time > time_point
|
||||||
|
|
||||||
def _add_message_to_history(self, message: Dict[str, Any]):
|
def _add_message_to_history(self, message: Dict[str, Any]):
|
||||||
|
|||||||
@@ -348,19 +348,18 @@ class ReplyGenerator:
|
|||||||
knowledge_cache: Dict[str, str],
|
knowledge_cache: Dict[str, str],
|
||||||
previous_reply: Optional[str] = None,
|
previous_reply: Optional[str] = None,
|
||||||
retry_count: int = 0
|
retry_count: int = 0
|
||||||
) -> Tuple[str, bool]:
|
) -> str:
|
||||||
"""生成回复
|
"""生成回复
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
goal: 对话目标
|
goal: 对话目标
|
||||||
method: 实现方式
|
|
||||||
chat_history: 聊天历史
|
chat_history: 聊天历史
|
||||||
knowledge_cache: 知识缓存
|
knowledge_cache: 知识缓存
|
||||||
previous_reply: 上一次生成的回复(如果有)
|
previous_reply: 上一次生成的回复(如果有)
|
||||||
retry_count: 当前重试次数
|
retry_count: 当前重试次数
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Tuple[str, bool]: (生成的回复, 是否需要重新规划)
|
str: 生成的回复
|
||||||
"""
|
"""
|
||||||
# 构建提示词
|
# 构建提示词
|
||||||
logger.debug(f"开始生成回复:当前目标: {goal}")
|
logger.debug(f"开始生成回复:当前目标: {goal}")
|
||||||
@@ -421,29 +420,40 @@ class ReplyGenerator:
|
|||||||
try:
|
try:
|
||||||
content, _ = await self.llm.generate_response_async(prompt)
|
content, _ = await self.llm.generate_response_async(prompt)
|
||||||
logger.info(f"生成的回复: {content}")
|
logger.info(f"生成的回复: {content}")
|
||||||
|
is_new = self.chat_observer.check()
|
||||||
|
logger.debug(f"再看一眼聊天记录,{'有' if is_new else '没有'}新消息")
|
||||||
|
|
||||||
# 检查生成的回复是否合适
|
# 如果有新消息,重新生成回复
|
||||||
is_suitable, reason, need_replan = await self.reply_checker.check(
|
if is_new:
|
||||||
content, goal, retry_count
|
logger.info("检测到新消息,重新生成回复")
|
||||||
)
|
return await self.generate(
|
||||||
|
goal, chat_history, knowledge_cache,
|
||||||
|
None, retry_count
|
||||||
|
)
|
||||||
|
|
||||||
if not is_suitable:
|
return content
|
||||||
logger.warning(f"生成的回复不合适,原因: {reason}")
|
|
||||||
if need_replan:
|
|
||||||
logger.info("需要重新规划对话目标")
|
|
||||||
return "让我重新思考一下...", True
|
|
||||||
else:
|
|
||||||
# 递归调用,将当前回复作为previous_reply传入
|
|
||||||
return await self.generate(
|
|
||||||
goal, chat_history, knowledge_cache,
|
|
||||||
content, retry_count + 1
|
|
||||||
)
|
|
||||||
|
|
||||||
return content, False
|
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"生成回复时出错: {e}")
|
logger.error(f"生成回复时出错: {e}")
|
||||||
return "抱歉,我现在有点混乱,让我重新思考一下...", True
|
return "抱歉,我现在有点混乱,让我重新思考一下..."
|
||||||
|
|
||||||
|
async def check_reply(
|
||||||
|
self,
|
||||||
|
reply: str,
|
||||||
|
goal: str,
|
||||||
|
retry_count: int = 0
|
||||||
|
) -> Tuple[bool, str, bool]:
|
||||||
|
"""检查回复是否合适
|
||||||
|
|
||||||
|
Args:
|
||||||
|
reply: 生成的回复
|
||||||
|
goal: 对话目标
|
||||||
|
retry_count: 当前重试次数
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Tuple[bool, str, bool]: (是否合适, 原因, 是否需要重新规划)
|
||||||
|
"""
|
||||||
|
return await self.reply_checker.check(reply, goal, retry_count)
|
||||||
|
|
||||||
|
|
||||||
class Conversation:
|
class Conversation:
|
||||||
@@ -620,17 +630,53 @@ class Conversation:
|
|||||||
if action == "direct_reply":
|
if action == "direct_reply":
|
||||||
self.state = ConversationState.GENERATING
|
self.state = ConversationState.GENERATING
|
||||||
messages = self.chat_observer.get_message_history(limit=30)
|
messages = self.chat_observer.get_message_history(limit=30)
|
||||||
self.generated_reply, need_replan = await self.reply_generator.generate(
|
self.generated_reply = await self.reply_generator.generate(
|
||||||
self.current_goal,
|
self.current_goal,
|
||||||
self.current_method,
|
self.current_method,
|
||||||
[self._convert_to_message(msg) for msg in messages],
|
[self._convert_to_message(msg) for msg in messages],
|
||||||
self.knowledge_cache
|
self.knowledge_cache
|
||||||
)
|
)
|
||||||
if need_replan:
|
|
||||||
self.state = ConversationState.RETHINKING
|
# 检查回复是否合适
|
||||||
self.current_goal, self.current_method, self.goal_reasoning = await self.goal_analyzer.analyze_goal()
|
is_suitable, reason, need_replan = await self.reply_generator.check_reply(
|
||||||
else:
|
self.generated_reply,
|
||||||
await self._send_reply()
|
self.current_goal
|
||||||
|
)
|
||||||
|
|
||||||
|
if not is_suitable:
|
||||||
|
logger.warning(f"生成的回复不合适,原因: {reason}")
|
||||||
|
if need_replan:
|
||||||
|
self.state = ConversationState.RETHINKING
|
||||||
|
self.current_goal, self.current_method, self.goal_reasoning = await self.goal_analyzer.analyze_goal()
|
||||||
|
return
|
||||||
|
else:
|
||||||
|
# 重新生成回复
|
||||||
|
self.generated_reply = await self.reply_generator.generate(
|
||||||
|
self.current_goal,
|
||||||
|
self.current_method,
|
||||||
|
[self._convert_to_message(msg) for msg in messages],
|
||||||
|
self.knowledge_cache,
|
||||||
|
self.generated_reply # 将不合适的回复作为previous_reply传入
|
||||||
|
)
|
||||||
|
|
||||||
|
while self.chat_observer.check():
|
||||||
|
if not is_suitable:
|
||||||
|
logger.warning(f"生成的回复不合适,原因: {reason}")
|
||||||
|
if need_replan:
|
||||||
|
self.state = ConversationState.RETHINKING
|
||||||
|
self.current_goal, self.current_method, self.goal_reasoning = await self.goal_analyzer.analyze_goal()
|
||||||
|
return
|
||||||
|
else:
|
||||||
|
# 重新生成回复
|
||||||
|
self.generated_reply = await self.reply_generator.generate(
|
||||||
|
self.current_goal,
|
||||||
|
self.current_method,
|
||||||
|
[self._convert_to_message(msg) for msg in messages],
|
||||||
|
self.knowledge_cache,
|
||||||
|
self.generated_reply # 将不合适的回复作为previous_reply传入
|
||||||
|
)
|
||||||
|
|
||||||
|
await self._send_reply()
|
||||||
|
|
||||||
elif action == "fetch_knowledge":
|
elif action == "fetch_knowledge":
|
||||||
self.state = ConversationState.GENERATING
|
self.state = ConversationState.GENERATING
|
||||||
@@ -644,17 +690,36 @@ class Conversation:
|
|||||||
if knowledge != "未找到相关知识":
|
if knowledge != "未找到相关知识":
|
||||||
self.knowledge_cache[sources] = knowledge
|
self.knowledge_cache[sources] = knowledge
|
||||||
|
|
||||||
self.generated_reply, need_replan = await self.reply_generator.generate(
|
self.generated_reply = await self.reply_generator.generate(
|
||||||
self.current_goal,
|
self.current_goal,
|
||||||
self.current_method,
|
self.current_method,
|
||||||
[self._convert_to_message(msg) for msg in messages],
|
[self._convert_to_message(msg) for msg in messages],
|
||||||
self.knowledge_cache
|
self.knowledge_cache
|
||||||
)
|
)
|
||||||
if need_replan:
|
|
||||||
self.state = ConversationState.RETHINKING
|
# 检查回复是否合适
|
||||||
self.current_goal, self.current_method, self.goal_reasoning = await self.goal_analyzer.analyze_goal()
|
is_suitable, reason, need_replan = await self.reply_generator.check_reply(
|
||||||
else:
|
self.generated_reply,
|
||||||
await self._send_reply()
|
self.current_goal
|
||||||
|
)
|
||||||
|
|
||||||
|
if not is_suitable:
|
||||||
|
logger.warning(f"生成的回复不合适,原因: {reason}")
|
||||||
|
if need_replan:
|
||||||
|
self.state = ConversationState.RETHINKING
|
||||||
|
self.current_goal, self.current_method, self.goal_reasoning = await self.goal_analyzer.analyze_goal()
|
||||||
|
return
|
||||||
|
else:
|
||||||
|
# 重新生成回复
|
||||||
|
self.generated_reply = await self.reply_generator.generate(
|
||||||
|
self.current_goal,
|
||||||
|
self.current_method,
|
||||||
|
[self._convert_to_message(msg) for msg in messages],
|
||||||
|
self.knowledge_cache,
|
||||||
|
self.generated_reply # 将不合适的回复作为previous_reply传入
|
||||||
|
)
|
||||||
|
|
||||||
|
await self._send_reply()
|
||||||
|
|
||||||
elif action == "rethink_goal":
|
elif action == "rethink_goal":
|
||||||
self.state = ConversationState.RETHINKING
|
self.state = ConversationState.RETHINKING
|
||||||
|
|||||||
@@ -28,7 +28,7 @@ class ChatStream:
|
|||||||
self.platform = platform
|
self.platform = platform
|
||||||
self.user_info = user_info
|
self.user_info = user_info
|
||||||
self.group_info = group_info
|
self.group_info = group_info
|
||||||
self.create_time = data.get("create_time", int(time.time())) if data else int(time.time())
|
self.create_time = data.get("create_time", time.time()) if data else time.time()
|
||||||
self.last_active_time = data.get("last_active_time", self.create_time) if data else self.create_time
|
self.last_active_time = data.get("last_active_time", self.create_time) if data else self.create_time
|
||||||
self.saved = False
|
self.saved = False
|
||||||
|
|
||||||
@@ -60,7 +60,7 @@ class ChatStream:
|
|||||||
|
|
||||||
def update_active_time(self):
|
def update_active_time(self):
|
||||||
"""更新最后活跃时间"""
|
"""更新最后活跃时间"""
|
||||||
self.last_active_time = int(time.time())
|
self.last_active_time = time.time()
|
||||||
self.saved = False
|
self.saved = False
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -168,7 +168,7 @@ class MessageProcessBase(Message):
|
|||||||
# 调用父类初始化
|
# 调用父类初始化
|
||||||
super().__init__(
|
super().__init__(
|
||||||
message_id=message_id,
|
message_id=message_id,
|
||||||
time=int(time.time()),
|
time=round(time.time(), 3), # 保留3位小数
|
||||||
chat_stream=chat_stream,
|
chat_stream=chat_stream,
|
||||||
user_info=bot_user_info,
|
user_info=bot_user_info,
|
||||||
message_segment=message_segment,
|
message_segment=message_segment,
|
||||||
|
|||||||
Reference in New Issue
Block a user