This commit is contained in:
tt-P607
2025-11-13 20:39:14 +08:00
4 changed files with 46 additions and 92 deletions

View File

@@ -417,7 +417,12 @@ class StatisticOutputTask(AsyncTask):
avg_key = f"AVG_TIME_COST_BY_{items.upper()}" avg_key = f"AVG_TIME_COST_BY_{items.upper()}"
std_key = f"STD_TIME_COST_BY_{items.upper()}" std_key = f"STD_TIME_COST_BY_{items.upper()}"
for item_name in period_stats[category_key]: # Ensure the stat dicts exist before trying to access them, making the process more robust.
period_stats.setdefault(time_cost_key, defaultdict(list))
period_stats.setdefault(avg_key, defaultdict(float))
period_stats.setdefault(std_key, defaultdict(float))
for item_name in period_stats.get(category_key, {}):
time_costs = period_stats[time_cost_key].get(item_name, []) time_costs = period_stats[time_cost_key].get(item_name, [])
if time_costs: if time_costs:
avg_time = sum(time_costs) / len(time_costs) avg_time = sum(time_costs) / len(time_costs)
@@ -614,37 +619,31 @@ class StatisticOutputTask(AsyncTask):
# 统计数据合并 # 统计数据合并
# 合并三类统计数据 # 合并三类统计数据
for period_key, _ in stat_start_timestamp: for period_key, _ in stat_start_timestamp:
stat[period_key].update(model_req_stat[period_key]) stat[period_key].update(model_req_stat.get(period_key, {}))
stat[period_key].update(online_time_stat[period_key]) stat[period_key].update(online_time_stat.get(period_key, {}))
stat[period_key].update(message_count_stat[period_key]) stat[period_key].update(message_count_stat.get(period_key, {}))
if last_all_time_stat: if last_all_time_stat:
# 若存在上次完整统计数据,则将其与当前统计数据合并 # 若存在上次完整统计数据,则将其与当前统计数据合并
for key, val in last_all_time_stat.items(): for key, val in last_all_time_stat.items():
# 确保当前统计数据中存在该key # If a key from old stats is not in the current period's stats, it means no new data was generated.
# In this case, we carry over the old data.
if key not in stat["all_time"]: if key not in stat["all_time"]:
stat["all_time"][key] = val
continue continue
# If the key exists in both, we merge.
if isinstance(val, dict): if isinstance(val, dict):
# 是字典类型,则进行合并 # It's a dictionary-like object (e.g., COST_BY_MODEL, TIME_COST_BY_TYPE)
current_dict = stat["all_time"][key]
for sub_key, sub_val in val.items(): for sub_key, sub_val in val.items():
# 普通的数值或字典合并 if sub_key in current_dict:
if sub_key in stat["all_time"][key]: # For lists (like TIME_COST), this extends. For numbers, this adds.
# 检查是否为嵌套的字典类型(如版本统计) current_dict[sub_key] += sub_val
if isinstance(sub_val, dict) and isinstance(stat["all_time"][key][sub_key], dict):
# 合并嵌套字典
for nested_key, nested_val in sub_val.items():
if nested_key in stat["all_time"][key][sub_key]:
stat["all_time"][key][sub_key][nested_key] += nested_val
else: else:
stat["all_time"][key][sub_key][nested_key] = nested_val current_dict[sub_key] = sub_val
else: else:
# 普通数值累加 # It's a simple value (e.g., TOTAL_COST)
stat["all_time"][key][sub_key] += sub_val
else:
stat["all_time"][key][sub_key] = sub_val
else:
# 直接合并
stat["all_time"][key] += val stat["all_time"][key] += val
# 更新上次完整统计数据的时间戳 # 更新上次完整统计数据的时间戳
@@ -686,10 +685,10 @@ class StatisticOutputTask(AsyncTask):
""" """
output = [ output = [
f"总在线时间: {_format_online_time(stats[ONLINE_TIME])}", f"总在线时间: {_format_online_time(stats.get(ONLINE_TIME, 0))}",
f"总消息数: {stats[TOTAL_MSG_CNT]}", f"总消息数: {stats.get(TOTAL_MSG_CNT, 0)}",
f"总请求数: {stats[TOTAL_REQ_CNT]}", f"总请求数: {stats.get(TOTAL_REQ_CNT, 0)}",
f"总花费: {stats[TOTAL_COST]:.4f}¥", f"总花费: {stats.get(TOTAL_COST, 0.0):.4f}¥",
"", "",
] ]
@@ -700,21 +699,21 @@ class StatisticOutputTask(AsyncTask):
""" """
格式化按模型分类的统计数据 格式化按模型分类的统计数据
""" """
if stats[TOTAL_REQ_CNT] <= 0: if stats.get(TOTAL_REQ_CNT, 0) <= 0:
return "" return ""
data_fmt = "{:<32} {:>10} {:>12} {:>12} {:>12} {:>9.4f}¥ {:>10} {:>10}" data_fmt = "{:<32} {:>10} {:>12} {:>12} {:>12} {:>9.4f}¥ {:>10} {:>10}"
output = [ output = [
" 模型名称 调用次数 输入Token 输出Token Token总量 累计花费 平均耗时(秒) 标准差(秒)", " 模型名称 调用次数 输入Token 输出Token Token总量 累计花费 平均耗时(秒) 标准差(秒)",
] ]
for model_name, count in sorted(stats[REQ_CNT_BY_MODEL].items()): for model_name, count in sorted(stats.get(REQ_CNT_BY_MODEL, {}).items()):
name = f"{model_name[:29]}..." if len(model_name) > 32 else model_name name = f"{model_name[:29]}..." if len(model_name) > 32 else model_name
in_tokens = stats[IN_TOK_BY_MODEL][model_name] in_tokens = stats.get(IN_TOK_BY_MODEL, {}).get(model_name, 0)
out_tokens = stats[OUT_TOK_BY_MODEL][model_name] out_tokens = stats.get(OUT_TOK_BY_MODEL, {}).get(model_name, 0)
tokens = stats[TOTAL_TOK_BY_MODEL][model_name] tokens = stats.get(TOTAL_TOK_BY_MODEL, {}).get(model_name, 0)
cost = stats[COST_BY_MODEL][model_name] cost = stats.get(COST_BY_MODEL, {}).get(model_name, 0.0)
avg_time_cost = stats[AVG_TIME_COST_BY_MODEL][model_name] avg_time_cost = stats.get(AVG_TIME_COST_BY_MODEL, {}).get(model_name, 0.0)
std_time_cost = stats[STD_TIME_COST_BY_MODEL][model_name] std_time_cost = stats.get(STD_TIME_COST_BY_MODEL, {}).get(model_name, 0.0)
output.append( output.append(
data_fmt.format(name, count, in_tokens, out_tokens, tokens, cost, avg_time_cost, std_time_cost) data_fmt.format(name, count, in_tokens, out_tokens, tokens, cost, avg_time_cost, std_time_cost)
) )
@@ -726,12 +725,12 @@ class StatisticOutputTask(AsyncTask):
""" """
格式化聊天统计数据 格式化聊天统计数据
""" """
if stats[TOTAL_MSG_CNT] <= 0: if stats.get(TOTAL_MSG_CNT, 0) <= 0:
return "" return ""
output = ["聊天消息统计:", " 联系人/群组名称 消息数量"] output = ["聊天消息统计:", " 联系人/群组名称 消息数量"]
output.extend( output.extend(
f"{self.name_mapping.get(chat_id, (chat_id, 0))[0][:32]:<32} {count:>10}" f"{self.name_mapping.get(chat_id, (chat_id, 0))[0][:32]:<32} {count:>10}"
for chat_id, count in sorted(stats[MSG_CNT_BY_CHAT].items()) for chat_id, count in sorted(stats.get(MSG_CNT_BY_CHAT, {}).items())
) )
output.append("") output.append("")
return "\n".join(output) return "\n".join(output)

View File

@@ -307,6 +307,7 @@ class AffinityInterestCalculator(BaseInterestCalculator):
self.post_reply_boost_max_count - self.post_reply_boost_remaining self.post_reply_boost_max_count - self.post_reply_boost_remaining
) )
post_reply_reduction = self.post_reply_threshold_reduction * decay_factor post_reply_reduction = self.post_reply_threshold_reduction * decay_factor
self.post_reply_boost_remaining -= 1
total_reduction += post_reply_reduction total_reduction += post_reply_reduction
logger.debug( logger.debug(
f"[阈值调整] 回复后降低: {post_reply_reduction:.3f} " f"[阈值调整] 回复后降低: {post_reply_reduction:.3f} "
@@ -319,17 +320,6 @@ class AffinityInterestCalculator(BaseInterestCalculator):
return adjusted_reply_threshold, adjusted_action_threshold return adjusted_reply_threshold, adjusted_action_threshold
def _apply_no_reply_boost(self, base_score: float) -> float:
"""【已弃用】应用连续不回复的概率提升
注意:此方法已被 _apply_no_reply_threshold_adjustment 替代
保留用于向后兼容
"""
if self.no_reply_count > 0 and self.no_reply_count < self.max_no_reply_count:
boost = self.no_reply_count * self.probability_boost_per_no_reply
return min(1.0, base_score + boost)
return base_score
def _extract_keywords_from_database(self, message: "DatabaseMessages") -> list[str]: def _extract_keywords_from_database(self, message: "DatabaseMessages") -> list[str]:
"""从数据库消息中提取关键词""" """从数据库消息中提取关键词"""
keywords = [] keywords = []
@@ -394,7 +384,7 @@ class AffinityInterestCalculator(BaseInterestCalculator):
def on_reply_sent(self): def on_reply_sent(self):
"""当机器人发送回复后调用,激活回复后阈值降低机制""" """当机器人发送回复后调用,激活回复后阈值降低机制"""
if self.enable_post_reply_boost: if self.enable_post_reply_boost and not self.post_reply_boost_remaining:
# 重置回复后降低计数器 # 重置回复后降低计数器
self.post_reply_boost_remaining = self.post_reply_boost_max_count self.post_reply_boost_remaining = self.post_reply_boost_max_count
logger.debug( logger.debug(

View File

@@ -107,15 +107,15 @@ class ChatterPlanGenerator:
""" """
try: try:
# 从组件注册表获取可用动作 # 从组件注册表获取可用动作
available_actions = component_registry.get_enabled_actions() available_actions = self.action_manager.get_using_actions()
# 根据聊天类型和模式筛选动作 # 根据聊天类型和模式筛选动作
filtered_actions = {} filtered_actions = {}
for action_name, action_info in available_actions.items(): for action_name, action_info in available_actions.items():
# 检查动作是否支持当前聊天类型 # 检查动作是否支持当前聊天类型
if chat_type in action_info.chat_types: if chat_type == action_info.chat_type_allow:
# 检查动作是否支持当前模式 # 检查动作是否支持当前模式
if mode in action_info.chat_modes: if mode == action_info.mode_enable:
filtered_actions[action_name] = action_info filtered_actions[action_name] = action_info
return filtered_actions return filtered_actions

View File

@@ -190,9 +190,13 @@ class ChatterActionPlanner:
# 4. 生成初始计划 # 4. 生成初始计划
initial_plan = await self.generator.generate(ChatMode.FOCUS) initial_plan = await self.generator.generate(ChatMode.FOCUS)
# 5. 确保Plan中包含所有当前可用的动作 # 5. 过滤回复动作(如果未达到回复阈值)
initial_plan.available_actions = self.action_manager.get_using_actions() if reply_not_available:
initial_plan.available_actions = {
action_name: action_info
for action_name, action_info in initial_plan.available_actions.items()
if action_name not in ["reply", "respond"]
}
# 6. 筛选 Plan # 6. 筛选 Plan
available_actions = list(initial_plan.available_actions.keys()) available_actions = list(initial_plan.available_actions.keys())
plan_filter = ChatterPlanFilter(self.chat_id, available_actions) plan_filter = ChatterPlanFilter(self.chat_id, available_actions)
@@ -527,45 +531,6 @@ class ChatterActionPlanner:
except Exception as e: except Exception as e:
logger.warning(f"同步chat_mode到ChatStream失败: {e}") logger.warning(f"同步chat_mode到ChatStream失败: {e}")
async def _flush_cached_messages_to_unread(self, context: "StreamContext | None") -> list:
"""在planner开始时将缓存消息刷新到未读消息列表
此方法在动作修改器执行后、生成初始计划前调用,确保计划阶段能看到所有积累的消息。
Args:
context: 流上下文
Returns:
list: 刷新的消息列表
"""
if not context:
return []
try:
from src.chat.message_manager.message_manager import message_manager
stream_id = context.stream_id
if message_manager.is_running and message_manager.has_cached_messages(stream_id):
# 获取缓存消息
cached_messages = message_manager.flush_cached_messages(stream_id)
if cached_messages:
# 直接添加到上下文的未读消息列表
for message in cached_messages:
context.unread_messages.append(message)
logger.info(f"Planner开始前刷新缓存消息到未读列表: stream={stream_id}, 数量={len(cached_messages)}")
return cached_messages
return []
except ImportError:
logger.debug("MessageManager不可用跳过缓存刷新")
return []
except Exception as e:
logger.warning(f"Planner刷新缓存消息失败: error={e}")
return []
def _update_stats_from_execution_result(self, execution_result: dict[str, Any]): def _update_stats_from_execution_result(self, execution_result: dict[str, Any]):
"""根据执行结果更新规划器统计""" """根据执行结果更新规划器统计"""
if not execution_result: if not execution_result: