"""Interactive command-line report of expression usage statistics per chat."""

import os
import sys
import time
from bisect import bisect_right
from collections import Counter
from typing import Dict, Iterable, List, Sequence

# The project root must be on sys.path *before* any project module is
# imported; otherwise running this file directly as a script cannot resolve
# the ``src`` package.
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, project_root)

from src.common.database.database_model import Expression, ChatStreams  # noqa: E402

_SECONDS_PER_DAY = 24 * 3600

# Exclusive upper bounds (in days) of every bucket but the last, and the
# matching labels. A value equal to a bound falls into the next bucket.
_DAY_BOUNDS = (1, 3, 7, 14, 30, 60, 90)
_DAY_LABELS = ('0-1天', '1-3天', '3-7天', '7-14天', '14-30天', '30-60天', '60-90天', '90+天')

# Same layout for the ``count`` field.
_COUNT_BOUNDS = (1, 2, 3, 4, 5, 10)
_COUNT_LABELS = ('0-1', '1-2', '2-3', '3-4', '4-5', '5-10', '10+')


def _bucket_counts(
    values: Iterable[float],
    upper_bounds: Sequence[float],
    labels: Sequence[str],
) -> Dict[str, int]:
    """Count how many of *values* fall into each labelled bucket.

    ``labels`` must have exactly one more entry than ``upper_bounds``; the
    final label catches everything at or above the last bound. Every label is
    present in the result (with 0 when empty), in the order given.
    """
    distribution = {label: 0 for label in labels}
    for value in values:
        # bisect_right sends a value equal to a bound into the next bucket,
        # which matches a chain of strict ``value < bound`` comparisons.
        distribution[labels[bisect_right(upper_bounds, value)]] += 1
    return distribution


def _percent(count: int, total: int) -> float:
    """Return *count* as a percentage of *total*, or 0.0 when *total* is 0."""
    return count / total * 100 if total else 0.0


def get_chat_name(chat_id: str) -> str:
    """Return a display name for *chat_id* looked up in the ChatStreams table.

    The group name is preferred, then the user's nickname (private chat).
    A placeholder string is returned when no row or no name is found, or when
    the lookup itself fails.
    """
    try:
        # Query the ChatStreams table directly.
        chat_stream = ChatStreams.get_or_none(ChatStreams.stream_id == chat_id)
        if chat_stream is None:
            return f"未知聊天 ({chat_id})"

        # Group chat: show the group name.
        if chat_stream.group_name:
            return f"{chat_stream.group_name} ({chat_id})"
        # Private chat: show the user's nickname.
        if chat_stream.user_nickname:
            return f"{chat_stream.user_nickname}的私聊 ({chat_id})"
        return f"未知聊天 ({chat_id})"
    except Exception:
        # Deliberate best-effort: a failed lookup must not abort the report,
        # so the failure is surfaced in the returned label instead.
        return f"查询失败 ({chat_id})"


def calculate_time_distribution(expressions) -> Dict[str, int]:
    """Bucket expressions by the number of days since ``last_active_time``.

    Each item of *expressions* must expose a numeric ``last_active_time``
    that is compared against ``time.time()``.
    """
    now = time.time()
    ages_in_days = ((now - expr.last_active_time) / _SECONDS_PER_DAY for expr in expressions)
    return _bucket_counts(ages_in_days, _DAY_BOUNDS, _DAY_LABELS)


def calculate_count_distribution(expressions) -> Dict[str, int]:
    """Bucket expressions by their ``count`` attribute."""
    return _bucket_counts((expr.count for expr in expressions), _COUNT_BOUNDS, _COUNT_LABELS)


def get_top_expressions_by_chat(chat_id: str, top_n: int = 5) -> List[Expression]:
    """Return up to *top_n* expressions of *chat_id*, highest ``count`` first."""
    query = (
        Expression.select()
        .where(Expression.chat_id == chat_id)
        .order_by(Expression.count.desc())
        .limit(top_n)
    )
    # Materialize the query so the result really is the annotated list.
    return list(query)


def show_overall_statistics(expressions, total: int) -> None:
    """Print time and count distributions over all *expressions*.

    *total* is the denominator for the percentages; when it is 0 every
    percentage is printed as 0.00% instead of raising ZeroDivisionError.
    """
    time_dist = calculate_time_distribution(expressions)
    count_dist = calculate_count_distribution(expressions)

    print("\n=== 总体统计 ===")
    print(f"总表达式数量: {total}")

    print("\n上次激活时间分布:")
    for period, count in time_dist.items():
        print(f"{period}: {count} ({_percent(count, total):.2f}%)")

    print("\ncount分布:")
    for range_, count in count_dist.items():
        print(f"{range_}: {count} ({_percent(count, total):.2f}%)")


def show_chat_statistics(chat_id: str, chat_name: str) -> None:
    """Print distributions and the most-used expressions for one chat.

    Only non-empty buckets are listed. Nothing beyond the header is printed
    when the chat has no expressions.
    """
    top_n = 10
    chat_exprs = list(Expression.select().where(Expression.chat_id == chat_id))
    chat_total = len(chat_exprs)

    print(f"\n=== {chat_name} ===")
    print(f"表达式数量: {chat_total}")

    if chat_total == 0:
        print("该聊天没有表达式数据")
        return

    # Time distribution for this chat.
    print("\n上次激活时间分布:")
    for period, count in calculate_time_distribution(chat_exprs).items():
        if count > 0:
            print(f"{period}: {count} ({count/chat_total*100:.2f}%)")

    # Count distribution for this chat.
    print("\ncount分布:")
    for range_, count in calculate_count_distribution(chat_exprs).items():
        if count > 0:
            print(f"{range_}: {count} ({count/chat_total*100:.2f}%)")

    # Most-used expressions.
    print(f"\nTop {top_n}使用最多的表达式:")
    for i, expr in enumerate(get_top_expressions_by_chat(chat_id, top_n), 1):
        print(f"{i}. [{expr.type}] Count: {expr.count}")
        print(f" Situation: {expr.situation}")
        print(f" Style: {expr.style}")
        print()


def interactive_menu() -> None:
    """Run the interactive menu loop until the user enters ``q``.

    All expressions are loaded once up front; option 0 shows the overall
    statistics and options 1..N show the statistics of a single chat.
    """
    expressions = list(Expression.select())
    if not expressions:
        print("数据库中没有找到表达式")
        return

    total = len(expressions)

    # Count expressions per chat once instead of rescanning the full list
    # for every menu entry on every redraw.
    chat_counts = Counter(expr.chat_id for expr in expressions)
    chat_info = sorted(
        ((chat_id, get_chat_name(chat_id)) for chat_id in chat_counts),
        key=lambda item: item[1],  # sort by chat name
    )

    while True:
        print("\n" + "=" * 50)
        print("表达式统计分析")
        print("=" * 50)
        print("0. 显示总体统计")

        for i, (chat_id, chat_name) in enumerate(chat_info, 1):
            print(f"{i}. {chat_name} ({chat_counts[chat_id]}个表达式)")

        print("q. 退出")

        choice = input("\n请选择要查看的统计 (输入序号): ").strip()

        if choice.lower() == 'q':
            print("再见!")
            break

        # Only the integer conversion is guarded, so a ValueError raised while
        # computing statistics is not misreported as bad user input.
        try:
            choice_num = int(choice)
        except ValueError:
            print("请输入有效的数字")
        else:
            if choice_num == 0:
                show_overall_statistics(expressions, total)
            elif 1 <= choice_num <= len(chat_info):
                chat_id, chat_name = chat_info[choice_num - 1]
                show_chat_statistics(chat_id, chat_name)
            else:
                print("无效的选择,请重新输入")

        input("\n按回车键继续...")


if __name__ == "__main__":
    interactive_menu()