import json
import logging
import os
from bisect import bisect_right

from flask import current_app, url_for

from web.database import query_db, execute_db

logger = logging.getLogger(__name__)


class StatsService:
    """Static query helpers over the 'l2' (match facts) and 'l3' (player features) databases.

    Every public method is a ``@staticmethod`` that issues one or more
    ``query_db`` calls and returns plain rows / dicts for the web layer.
    """

    # File extensions probed (in this order) for a local avatar override.
    _AVATAR_EXTENSIONS = ('.jpg', '.png', '.jpeg')

    # Upper-exclusive ELO boundaries and the labels of the buckets they delimit.
    _ELO_BOUNDS = (1000, 1200, 1400, 1600, 1800, 2000)
    _ELO_LABELS = ('<1000', '1000-1200', '1200-1400', '1400-1600', '1600-1800', '1800-2000', '2000+')

    # Upper-exclusive match duration boundaries (minutes) and their labels.
    _DURATION_BOUNDS_MIN = (30, 45)
    _DURATION_LABELS = ('<30m', '30-45m', '45m+')

    _ROUND_LABELS = ('Stomp (<15)', 'Normal', 'Close (>23)', 'Choke (24)')

    # Metrics where a LOWER value ranks better.
    _LOWER_IS_BETTER = frozenset({
        'pace_avg_time_to_first_contact',
        'pace_opening_kill_time',
        'rd_invalid_death_rate',
        'map_stability_coef',
    })

    # Legacy aliases used by the top cards (rating, kd, adr, kast).
    _LEGACY_METRIC_ALIASES = {
        'basic_avg_rating': 'rating',
        'basic_avg_kd': 'kd',
        'basic_avg_adr': 'adr',
        'basic_avg_kast': 'kast',
    }

    # Metrics ranked by get_roster_stats_distribution (must match Detailed Panel keys).
    _DISTRIBUTION_METRICS = (
        'basic_avg_rating', 'basic_avg_kd', 'basic_avg_kast', 'basic_avg_rws', 'basic_avg_adr',
        'basic_avg_headshot_kills', 'basic_headshot_rate', 'basic_avg_assisted_kill',
        'basic_avg_awp_kill', 'basic_avg_jump_count', 'basic_avg_knife_kill',
        'basic_avg_zeus_kill', 'basic_zeus_pick_rate', 'basic_avg_mvps', 'basic_avg_plants',
        'basic_avg_defuses', 'basic_avg_flash_assists', 'basic_avg_first_kill',
        'basic_avg_first_death', 'basic_first_kill_rate', 'basic_first_death_rate',
        'basic_avg_kill_2', 'basic_avg_kill_3', 'basic_avg_kill_4', 'basic_avg_kill_5',
        'basic_avg_perfect_kill', 'basic_avg_revenge_kill',
        # L3 Advanced Dimensions
        'sta_last_30_rating', 'sta_win_rating', 'sta_loss_rating', 'sta_rating_volatility',
        'sta_time_rating_corr',
        'bat_kd_diff_high_elo', 'bat_avg_duel_win_rate', 'bat_win_rate_vs_all',
        'hps_clutch_win_rate_1v1', 'hps_clutch_win_rate_1v3_plus', 'hps_match_point_win_rate',
        'hps_pressure_entry_rate', 'hps_comeback_kd_diff', 'hps_losing_streak_kd_diff',
        'ptl_pistol_kills', 'ptl_pistol_win_rate', 'ptl_pistol_kd', 'ptl_pistol_util_efficiency',
        'side_rating_ct', 'side_rating_t', 'side_first_kill_rate_ct', 'side_first_kill_rate_t',
        'side_kd_diff_ct_t', 'side_hold_success_rate_ct', 'side_entry_success_rate_t',
        'side_win_rate_ct', 'side_win_rate_t', 'side_kd_ct', 'side_kd_t',
        'side_kast_ct', 'side_kast_t', 'side_rws_ct', 'side_rws_t',
        'side_first_death_rate_ct', 'side_first_death_rate_t',
        'side_multikill_rate_ct', 'side_multikill_rate_t',
        'side_headshot_rate_ct', 'side_headshot_rate_t',
        'side_defuses_ct', 'side_plants_t',
        'util_avg_nade_dmg', 'util_avg_flash_time', 'util_avg_flash_enemy', 'util_usage_rate',
        # New: ECO & PACE
        'eco_avg_damage_per_1k', 'eco_rating_eco_rounds', 'eco_kd_ratio', 'eco_avg_rounds',
        'pace_avg_time_to_first_contact', 'pace_trade_kill_rate', 'pace_opening_kill_time',
        'pace_avg_life_time',
        # New: ROUND (Round Dynamics)
        'rd_phase_kill_early_share', 'rd_phase_kill_mid_share', 'rd_phase_kill_late_share',
        'rd_phase_death_early_share', 'rd_phase_death_mid_share', 'rd_phase_death_late_share',
        'rd_firstdeath_team_first_death_win_rate', 'rd_invalid_death_rate',
        'rd_pressure_kpr_ratio', 'rd_matchpoint_kpr_ratio', 'rd_trade_response_10s_rate',
        'rd_pressure_perf_ratio', 'rd_matchpoint_perf_ratio',
        'rd_comeback_kill_share', 'map_stability_coef',
        # New: Party Size Stats
        'party_1_win_rate', 'party_1_rating', 'party_1_adr',
        'party_2_win_rate', 'party_2_rating', 'party_2_adr',
        'party_3_win_rate', 'party_3_rating', 'party_3_adr',
        'party_4_win_rate', 'party_4_rating', 'party_4_adr',
        'party_5_win_rate', 'party_5_rating', 'party_5_adr',
        # New: Rating Distribution
        'rating_dist_carry_rate', 'rating_dist_normal_rate',
        'rating_dist_sacrifice_rate', 'rating_dist_sleeping_rate',
        # New: ELO Stratification
        'elo_lt1200_rating', 'elo_1200_1400_rating', 'elo_1400_1600_rating',
        'elo_1600_1800_rating', 'elo_1800_2000_rating', 'elo_gt2000_rating',
        # New: Clutch & Multi (Real Calculation)
        'clutch_rate_1v1', 'clutch_rate_1v2', 'clutch_rate_1v3', 'clutch_rate_1v4', 'clutch_rate_1v5',
        'multikill_rate_2k', 'multikill_rate_3k', 'multikill_rate_4k', 'multikill_rate_5k',
        'multiassist_rate_2a', 'multiassist_rate_3a', 'multiassist_rate_4a', 'multiassist_rate_5a',
        'total_multikill_rate', 'total_multiassist_rate',
    )

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _placeholders(values):
        """Return a comma-separated run of '?' markers, one per item in *values*."""
        return ','.join('?' for _ in values)

    @staticmethod
    def _get_active_roster_ids():
        """Return the IDs of the first lineup as a list of strings ([] when unavailable)."""
        # Imported lazily, as in the original code (presumably to avoid a circular import).
        from web.services.web_service import WebService

        lineups = WebService.get_lineups()
        if not lineups:
            return []
        try:
            raw_ids = json.loads(lineups[0]['player_ids_json'])
            # Force strings so comparisons against CAST(... AS TEXT) are consistent.
            return [str(uid) for uid in raw_ids]
        except (TypeError, ValueError, KeyError, IndexError):
            # Best effort: a malformed lineup means "no roster", not a crash.
            logger.warning("Could not parse player_ids_json of the active lineup", exc_info=True)
            return []

    @staticmethod
    def _with_resolved_avatars(rows):
        """Convert rows to dicts and replace 'avatar_url' with the resolved URL."""
        resolved = []
        for row in rows or []:
            record = dict(row)
            record['avatar_url'] = StatsService.resolve_avatar_url(
                record.get('steam_id_64'), record.get('avatar_url')
            )
            resolved.append(record)
        return resolved

    @staticmethod
    def _same_team(our_tid, winner_tid):
        """Compare two team ids numerically when possible, else as strings.

        A falsy id on either side (None, 0, '') is never treated as a match.
        """
        if not (our_tid and winner_tid):
            return False
        try:
            return int(our_tid) == int(winner_tid)
        except (TypeError, ValueError, OverflowError):
            return str(our_tid) == str(winner_tid)

    @staticmethod
    def _new_buckets(labels):
        """Return an ordered {label: {'wins': 0, 'total': 0}} mapping."""
        return {label: {'wins': 0, 'total': 0} for label in labels}

    @staticmethod
    def _bump(bucket, is_win):
        """Count one match (and one win when *is_win*) in *bucket*."""
        bucket['total'] += 1
        if is_win:
            bucket['wins'] += 1

    @staticmethod
    def _format_buckets(buckets):
        """Turn a bucket mapping into a list of label/count/wins/win_rate dicts."""
        formatted = []
        for label, bucket in buckets.items():
            total = bucket['total']
            rate = (bucket['wins'] / total * 100) if total > 0 else 0
            formatted.append({'label': label, 'count': total, 'wins': bucket['wins'], 'win_rate': rate})
        return formatted

    @staticmethod
    def _rank_of(values, target, descending=True):
        """Return the 1-based position of *target* in *values* once sorted.

        Falls back to ``len(values)`` (last place) when *target* is absent.
        """
        ordered = sorted(values, reverse=descending)
        try:
            return ordered.index(target) + 1
        except ValueError:
            return len(ordered)

    @staticmethod
    def _rank_summary(values, target, descending=True):
        """Return the rank/min/max/avg summary dict for *target* within *values*.

        Returns None for an empty *values*. 'inverted' tells the frontend
        that lower is better (i.e. the sort was ascending).
        """
        if not values:
            return None
        return {
            'val': target,
            'rank': StatsService._rank_of(values, target, descending),
            'total': len(values),
            'min': min(values),
            'max': max(values),
            'avg': sum(values) / len(values),
            'inverted': not descending,
        }

    @staticmethod
    def _roundtype_value(player_stats, round_type, subkey):
        """Extract ``rd_roundtype_split_json[round_type][subkey]`` as a float, or None."""
        raw = player_stats.get('rd_roundtype_split_json') or ''
        if not raw:
            return None
        try:
            parsed = json.loads(raw) if isinstance(raw, str) else raw
        except (TypeError, ValueError):
            return None
        if not isinstance(parsed, dict):
            return None
        bucket = parsed.get(round_type)
        if not isinstance(bucket, dict):
            return None
        value = bucket.get(subkey)
        if value is None:
            return None
        try:
            return float(value)
        except (TypeError, ValueError):
            return None

    @staticmethod
    def _top_weapons(player_stats, limit=5):
        """Return up to *limit* weapon names from the player's 'rd_weapon_top_json'."""
        raw = player_stats.get('rd_weapon_top_json') or '[]'
        try:
            items = json.loads(raw) if isinstance(raw, str) else raw
        except (TypeError, ValueError):
            return []
        if not isinstance(items, list):
            return []
        names = [str(it.get('weapon')) for it in items if isinstance(it, dict) and it.get('weapon')]
        return names[:limit]

    @staticmethod
    def _get_our_results(match_ids):
        """Map match_id -> 'win' / 'loss' / 'mixed' for the active roster.

        A match is a 'win' when roster UIDs appear only in the winning group of
        fact_match_teams, a 'loss' when only in other groups, and 'mixed' when
        in both. Matches where no roster UID is found are omitted.
        """
        roster_ids = StatsService._get_active_roster_ids()
        if not roster_ids:
            # Without a roster we cannot determine "our" result.
            return {}

        placeholders = StatsService._placeholders(match_ids)
        roster_placeholders = StatsService._placeholders(roster_ids)

        # 1. UIDs actually used by roster members in these matches.
        uid_sql = f"""
            SELECT DISTINCT steam_id_64, uid
            FROM fact_match_players
            WHERE match_id IN ({placeholders})
              AND CAST(steam_id_64 AS TEXT) IN ({roster_placeholders})
        """
        uid_rows = query_db('l2', uid_sql, list(match_ids) + roster_ids) or []
        our_uids = {str(r['uid']) for r in uid_rows if r['uid']}

        # 2. Group membership and winner per match.
        teams_sql = f"""
            SELECT fmt.match_id, fmt.group_id, fmt.group_uids, m.winner_team
            FROM fact_match_teams fmt
            JOIN fact_matches m ON fmt.match_id = m.match_id
            WHERE fmt.match_id IN ({placeholders})
        """
        teams_rows = query_db('l2', teams_sql, match_ids) or []

        # match_id -> {'groups': {group_id: {uids}}, 'winner': winner_team}
        match_groups = {}
        for r in teams_rows:
            uids = {str(u).strip() for u in (r['group_uids'] or "").split(',') if u.strip()}
            entry = match_groups.setdefault(r['match_id'], {'groups': {}, 'winner': r['winner_team']})
            entry['groups'][r['group_id']] = uids

        # 3. Classify each match.
        result_map = {}
        for mid, data in match_groups.items():
            in_winner = False
            in_loser = False
            for gid, uids in data['groups'].items():
                if our_uids.intersection(uids):
                    if gid == data['winner']:
                        in_winner = True
                    else:
                        in_loser = True
            if in_winner and in_loser:
                result_map[mid] = 'mixed'
            elif in_winner:
                result_map[mid] = 'win'
            elif in_loser:
                result_map[mid] = 'loss'
            # else: no roster UID matched; leave the match out (reads as None).
        return result_map

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    @staticmethod
    def resolve_avatar_url(steam_id, avatar_url):
        """Resolve a player's avatar URL.

        Priority:
        1. Local file ``static/avatars/{steam_id}`` with extension .jpg, .png
           or .jpeg under the app root (user override).
        2. The DB value *avatar_url*, when it is non-blank.

        Returns None when neither is available. On any error (for example no
        usable application context) *avatar_url* is returned unchanged.
        """
        try:
            base = os.path.join(current_app.root_path, 'static', 'avatars')
            for ext in StatsService._AVATAR_EXTENSIONS:
                fname = f"{steam_id}{ext}"
                if os.path.exists(os.path.join(base, fname)):
                    return url_for('static', filename=f'avatars/{fname}')
            # Fallback to DB value if valid
            if avatar_url and str(avatar_url).strip():
                return avatar_url
            return None
        except Exception:
            # Deliberate best effort: never let avatar resolution break a page.
            return avatar_url

    @staticmethod
    def get_team_stats_summary():
        """Aggregate win statistics for matches with at least 2 roster members.

        "Our team" is taken as ``MAX(team_id)`` of the roster players in the
        match and compared with ``winner_team``; this trusts simple team-id
        matching rather than the group-UID logic used by ``get_matches``.

        Returns:
            {} when there is no roster or no candidate match, otherwise
            {
                'map_stats':      [{'label', 'count', 'wins', 'win_rate'}],  # sorted by count desc
                'elo_stats':      [{'label', 'count', 'wins', 'win_rate'}],
                'duration_stats': [{'label', 'count', 'wins', 'win_rate'}],
                'round_stats':    [{'label', 'count', 'wins', 'win_rate'}]
            }
            with ``win_rate`` expressed as a percentage (0-100).
        """
        # 1. Get Active Roster
        roster_ids = StatsService._get_active_roster_ids()
        if not roster_ids:
            return {}

        # 2. Candidate matches (>= 2 roster players) with our team id.
        placeholders = StatsService._placeholders(roster_ids)
        candidate_sql = f"""
            SELECT mp.match_id, MAX(mp.team_id) as our_team_id
            FROM fact_match_players mp
            WHERE CAST(mp.steam_id_64 AS TEXT) IN ({placeholders})
            GROUP BY mp.match_id
            HAVING COUNT(DISTINCT mp.steam_id_64) >= 2
        """
        candidate_rows = query_db('l2', candidate_sql, roster_ids)
        if not candidate_rows:
            return {}

        candidate_map = {row['match_id']: row['our_team_id'] for row in candidate_rows}
        match_ids = list(candidate_map.keys())
        match_placeholders = StatsService._placeholders(match_ids)

        match_sql = f"""
            SELECT m.match_id, m.map_name, m.score_team1, m.score_team2, m.winner_team, m.duration,
                   AVG(fmt.group_origin_elo) as avg_elo
            FROM fact_matches m
            LEFT JOIN fact_match_teams fmt ON m.match_id = fmt.match_id AND fmt.group_origin_elo > 0
            WHERE m.match_id IN ({match_placeholders})
            GROUP BY m.match_id
        """
        match_rows = query_db('l2', match_sql, match_ids) or []

        # 3. Process Data
        bump = StatsService._bump
        map_stats = {}
        elo_stats = StatsService._new_buckets(StatsService._ELO_LABELS)
        dur_stats = StatsService._new_buckets(StatsService._DURATION_LABELS)
        round_stats = StatsService._new_buckets(StatsService._ROUND_LABELS)

        for m in match_rows:
            is_win = StatsService._same_team(candidate_map[m['match_id']], m['winner_team'])

            # Map
            map_name = m['map_name'] or 'Unknown'
            bump(map_stats.setdefault(map_name, {'wins': 0, 'total': 0}), is_win)

            # ELO (skipped when the match has no positive group ELO)
            elo = m['avg_elo']
            if elo:
                label = StatsService._ELO_LABELS[bisect_right(StatsService._ELO_BOUNDS, elo)]
                bump(elo_stats[label], is_win)

            # Duration (stored in seconds)
            dur = m['duration']
            if dur:
                idx = bisect_right(StatsService._DURATION_BOUNDS_MIN, dur / 60)
                bump(dur_stats[StatsService._DURATION_LABELS[idx]], is_win)

            # Rounds
            total_rounds = (m['score_team1'] or 0) + (m['score_team2'] or 0)
            if total_rounds == 24:
                bump(round_stats['Choke (24)'], is_win)
            # 'Close (>23)' intentionally overlaps 'Choke (24)': it counts ALL matches > 23 rounds.
            if total_rounds > 23:
                bump(round_stats['Close (>23)'], is_win)
            if total_rounds < 15:
                bump(round_stats['Stomp (<15)'], is_win)
            elif total_rounds <= 23:
                # Neither Stomp nor Close: 15 <= rounds <= 23.
                bump(round_stats['Normal'], is_win)

        # 4. Format Results
        map_res = StatsService._format_buckets(map_stats)
        map_res.sort(key=lambda x: x['count'], reverse=True)

        return {
            'map_stats': map_res,
            'elo_stats': StatsService._format_buckets(elo_stats),       # Keep bucket order
            'duration_stats': StatsService._format_buckets(dur_stats),  # Keep bucket order
            'round_stats': StatsService._format_buckets(round_stats),   # Keep bucket order
        }

    @staticmethod
    def get_recent_matches(limit=5):
        """Return the *limit* most recent matches (by start_time) with the MVP's username."""
        sql = """
            SELECT m.match_id, m.start_time, m.map_name, m.score_team1, m.score_team2, m.winner_team,
                   p.username as mvp_name
            FROM fact_matches m
            LEFT JOIN dim_players p ON m.mvp_uid = p.uid
            ORDER BY m.start_time DESC
            LIMIT ?
        """
        return query_db('l2', sql, [limit])

    @staticmethod
    def get_matches(page=1, per_page=20, map_name=None, date_from=None, date_to=None):
        """Return one page of matches plus the total count for pagination.

        Args:
            page: 1-based page number.
            per_page: page size.
            map_name: optional exact map filter.
            date_from / date_to: optional inclusive bounds compared against start_time.

        Returns:
            (matches, total) where ``matches`` is a list of dicts enriched with
            'avg_elo' (0 when unknown), 'max_party' (1 when unknown) and
            'our_result' ('win' / 'loss' / 'mixed' / None), and ``total`` is
            the number of matches satisfying the filters.
        """
        offset = (page - 1) * per_page
        filter_args = []
        where_clauses = ["1=1"]

        if map_name:
            where_clauses.append("map_name = ?")
            filter_args.append(map_name)
        if date_from:
            where_clauses.append("start_time >= ?")
            filter_args.append(date_from)
        if date_to:
            where_clauses.append("start_time <= ?")
            filter_args.append(date_to)

        where_str = " AND ".join(where_clauses)

        sql = f"""
            SELECT m.match_id, m.start_time, m.map_name, m.score_team1, m.score_team2, m.winner_team, m.duration
            FROM fact_matches m
            WHERE {where_str}
            ORDER BY m.start_time DESC
            LIMIT ? OFFSET ?
        """
        rows = query_db('l2', sql, filter_args + [per_page, offset])
        # Convert to dicts once so the rows can be enriched below.
        matches = [dict(r) for r in rows or []]

        if matches:
            match_ids = [m['match_id'] for m in matches]
            placeholders = StatsService._placeholders(match_ids)

            # Average ELO per match
            elo_sql = f"""
                SELECT match_id, AVG(group_origin_elo) as avg_elo
                FROM fact_match_teams
                WHERE match_id IN ({placeholders}) AND group_origin_elo > 0
                GROUP BY match_id
            """
            elo_rows = query_db('l2', elo_sql, match_ids) or []
            elo_map = {row['match_id']: row['avg_elo'] for row in elo_rows}

            # Largest party per match
            party_sql = f"""
                SELECT match_id, MAX(cnt) as max_party
                FROM (
                    SELECT match_id, match_team_id, COUNT(*) as cnt
                    FROM fact_match_players
                    WHERE match_id IN ({placeholders}) AND match_team_id > 0
                    GROUP BY match_id, match_team_id
                )
                GROUP BY match_id
            """
            party_rows = query_db('l2', party_sql, match_ids) or []
            party_map = {row['match_id']: row['max_party'] for row in party_rows}

            # Result from the active roster's point of view
            result_map = StatsService._get_our_results(match_ids)

            for m in matches:
                mid = m['match_id']
                m['avg_elo'] = elo_map.get(mid, 0)
                m['max_party'] = party_map.get(mid, 1)
                m['our_result'] = result_map.get(mid)

        # Count total for pagination (same filters, no LIMIT/OFFSET)
        count_sql = f"SELECT COUNT(*) as cnt FROM fact_matches WHERE {where_str}"
        total = query_db('l2', count_sql, filter_args, one=True)['cnt']

        return matches, total

    @staticmethod
    def get_match_detail(match_id):
        """Return the fact_matches row for *match_id* (single-row query)."""
        sql = "SELECT * FROM fact_matches WHERE match_id = ?"
        return query_db('l2', sql, [match_id], one=True)

    @staticmethod
    def get_match_players(match_id):
        """Return the players of a match as dicts, ordered by team then rating desc.

        Each dict carries the fact_match_players columns plus 'username' and a
        resolved 'avatar_url'.
        """
        sql = """
            SELECT mp.*, p.username, p.avatar_url
            FROM fact_match_players mp
            LEFT JOIN dim_players p ON mp.steam_id_64 = p.steam_id_64
            WHERE mp.match_id = ?
            ORDER BY mp.team_id, mp.rating DESC
        """
        return StatsService._with_resolved_avatars(query_db('l2', sql, [match_id]))

    @staticmethod
    def get_match_rounds(match_id):
        """Return the fact_rounds rows of a match ordered by round_num."""
        sql = "SELECT * FROM fact_rounds WHERE match_id = ? ORDER BY round_num"
        return query_db('l2', sql, [match_id])

    @staticmethod
    def get_players(page=1, per_page=20, search=None, sort_by='rating_desc'):
        """Return one page of dim_players rows plus the total count.

        Args:
            page: 1-based page number.
            per_page: page size.
            search: optional substring matched case-insensitively against
                username, or against steam_id_64.
            sort_by: accepted for interface compatibility but currently
                ignored; dim_players only holds static info and the query has
                no ORDER BY, so row order is whatever the database returns.

        Returns:
            (players, total) with each player a dict with a resolved 'avatar_url'.
        """
        offset = (page - 1) * per_page
        filter_args = []
        where_clauses = ["1=1"]

        if search:
            # Force case-insensitive search
            where_clauses.append("(LOWER(username) LIKE LOWER(?) OR steam_id_64 LIKE ?)")
            pattern = f"%{search}%"
            filter_args.extend([pattern, pattern])

        where_str = " AND ".join(where_clauses)

        sql = f"""
            SELECT * FROM dim_players
            WHERE {where_str}
            LIMIT ? OFFSET ?
        """
        rows = query_db('l2', sql, filter_args + [per_page, offset])
        players = StatsService._with_resolved_avatars(rows)

        total = query_db(
            'l2', f"SELECT COUNT(*) as cnt FROM dim_players WHERE {where_str}", filter_args, one=True
        )['cnt']
        return players, total

    @staticmethod
    def get_player_info(steam_id):
        """Return the dim_players row for *steam_id* as a dict with a resolved avatar, or None."""
        sql = "SELECT * FROM dim_players WHERE steam_id_64 = ?"
        row = query_db('l2', sql, [steam_id], one=True)
        if not row:
            return None
        info = dict(row)
        info['avatar_url'] = StatsService.resolve_avatar_url(steam_id, info.get('avatar_url'))
        return info

    @staticmethod
    def get_daily_match_counts(days=365):
        """Return rows of (day 'YYYY-MM-DD', count) for matches started in the last *days* days."""
        sql = """
            SELECT date(start_time, 'unixepoch') as day, COUNT(*) as count
            FROM fact_matches
            WHERE start_time > strftime('%s', 'now', ?)
            GROUP BY day
            ORDER BY day
        """
        # sqlite modifier for 'now' needs format like '-365 days'
        modifier = f'-{days} days'
        return query_db('l2', sql, [modifier])

    @staticmethod
    def get_players_by_ids(steam_ids):
        """Return dim_players rows (as dicts with resolved avatars) for the given ids."""
        if not steam_ids:
            return []
        placeholders = StatsService._placeholders(steam_ids)
        sql = f"SELECT * FROM dim_players WHERE steam_id_64 IN ({placeholders})"
        return StatsService._with_resolved_avatars(query_db('l2', sql, steam_ids))

    @staticmethod
    def get_player_basic_stats(steam_id):
        """Return career aggregates for a player from fact_match_players, or None.

        The result holds rating, total_kills, total_deaths, avg_kd, kast, adr,
        matches_played and a derived 'kd':
        - total_kills / total_deaths when deaths > 0, else total_kills;
        - replaced by avg_kd when the computed value is 0 and avg_kd is positive.
        'adr' is 0.0 when no ADR is recorded. None is returned when the player
        has no matches.
        """
        # AVG(adr) is used as damage totals might be missing in some sources.
        sql = """
            SELECT
                AVG(rating) as rating,
                SUM(kills) as total_kills,
                SUM(deaths) as total_deaths,
                AVG(kd_ratio) as avg_kd,
                AVG(kast) as kast,
                AVG(adr) as adr,
                COUNT(*) as matches_played
            FROM fact_match_players
            WHERE steam_id_64 = ?
        """
        row = query_db('l2', sql, [steam_id], one=True)
        if not row or row['matches_played'] <= 0:
            return None

        stats = dict(row)
        kills = stats.get('total_kills') or 0
        deaths = stats.get('total_deaths') or 0
        # With 0 deaths, show kills instead of infinity (safer for display).
        stats['kd'] = kills / deaths if deaths > 0 else kills

        # Fallback to avg_kd if the computed value is 0 but avg_kd exists.
        if stats['kd'] == 0 and stats['avg_kd'] and stats['avg_kd'] > 0:
            stats['kd'] = stats['avg_kd']

        if stats['adr'] is None:
            stats['adr'] = 0.0
        return stats

    @staticmethod
    def get_shared_matches(steam_ids):
        """Return matches in which ALL given players took part, newest first.

        Each result dict holds the match columns, 'player_team_id'
        (MAX(team_id) of the given players, assuming they share a team),
        'is_win' (bool) and 'result_str' ('Win' / 'Loss' / 'Draw'). A match
        without a winner is reported as a draw and is never flagged as a win.
        """
        if not steam_ids:
            return []

        # De-duplicate: COUNT(DISTINCT ...) could never equal a length that counts repeats.
        unique_ids = list(dict.fromkeys(steam_ids))
        placeholders = StatsService._placeholders(unique_ids)

        sql = f"""
            SELECT m.match_id, m.start_time, m.map_name, m.score_team1, m.score_team2, m.winner_team,
                   MAX(mp.team_id) as player_team_id -- Just take one team_id (assuming same)
            FROM fact_matches m
            JOIN fact_match_players mp ON m.match_id = mp.match_id
            WHERE mp.steam_id_64 IN ({placeholders})
            GROUP BY m.match_id
            HAVING COUNT(DISTINCT mp.steam_id_64) = ?
            ORDER BY m.start_time DESC
        """
        rows = query_db('l2', sql, unique_ids + [len(unique_ids)]) or []

        results = []
        for r in rows:
            winner = r['winner_team']
            # Win when the winner matches the players' team id; no winner means a draw.
            is_win = bool(winner) and winner == r['player_team_id']
            if not winner:
                result_str = 'Draw'
            elif is_win:
                result_str = 'Win'
            else:
                result_str = 'Loss'

            res = dict(r)
            res['is_win'] = is_win          # Boolean for styling
            res['result_str'] = result_str  # Text for display
            results.append(res)
        return results

    @staticmethod
    def get_player_trend(steam_id, limit=20):
        """Return the player's last *limit* matches in chronological order.

        Each row carries rating, kd_ratio, adr, map, is_win, match_team_id,
        'party_size' (players sharing the same positive match_team_id in that
        match) and 'match_index' (number of matches in fact_matches starting
        at or before this one).
        """
        sql = """
            SELECT * FROM (
                SELECT
                    m.start_time,
                    mp.rating,
                    mp.kd_ratio,
                    mp.adr,
                    m.match_id,
                    m.map_name,
                    mp.is_win,
                    mp.match_team_id,
                    (SELECT COUNT(*)
                     FROM fact_match_players p2
                     WHERE p2.match_id = mp.match_id
                       AND p2.match_team_id = mp.match_team_id
                       AND p2.match_team_id > 0 -- Ensure we don't count 0 (solo default) as a massive party
                    ) as party_size,
                    (
                        SELECT COUNT(*)
                        FROM fact_matches m2
                        WHERE m2.start_time <= m.start_time
                    ) as match_index
                FROM fact_match_players mp
                JOIN fact_matches m ON mp.match_id = m.match_id
                WHERE mp.steam_id_64 = ?
                ORDER BY m.start_time DESC
                LIMIT ?
            ) ORDER BY start_time ASC
        """
        return query_db('l2', sql, [steam_id, limit])

    @staticmethod
    def get_roster_stats_distribution(target_steam_id):
        """Rank the target player within the active roster on every tracked metric.

        Combines L3 ``dm_player_features`` rows with clutch / multi-kill /
        multi-assist rates computed from L2, then for each metric reports the
        target's value, rank, and the roster min / max / avg.

        Returns:
            None when there is no roster or no L3 data; otherwise a dict with
            - one entry per metric: {'val', 'rank', 'total', 'min', 'max',
              'avg', 'inverted'} (missing values count as 0);
            - legacy aliases 'rating', 'kd', 'adr', 'kast';
            - 'rd_rt_kpr_<type>' / 'rd_rt_perf_<type>' entries (or None);
            - 'top_weapon_rank_map': {weapon: {'kpm_rank', 'kpm_total',
              'hs_rank', 'hs_total'}} for the target's top weapons.
        """
        # 1. Get Active Roster IDs
        roster_ids = StatsService._get_active_roster_ids()
        if not roster_ids:
            return None

        # 2. Fetch L3 features for all roster members
        roster_placeholders = StatsService._placeholders(roster_ids)
        sql = f"SELECT * FROM dm_player_features WHERE steam_id_64 IN ({roster_placeholders})"
        rows = query_db('l3', sql, roster_ids)
        if not rows:
            return None

        # Key by str so L3 rows, L2 rows and the target all share one key type.
        stats_map = {str(row['steam_id_64']): dict(row) for row in rows}
        target_steam_id = str(target_steam_id)

        # If target has no L3 data, rank it with empty defaults.
        target_stats = stats_map.setdefault(target_steam_id, {})

        # Enrich with L2 clutch / multi-kill / multi-assist rates.
        sql_l2 = f"""
            SELECT
                p.steam_id_64,
                SUM(p.clutch_1v1) as c1, SUM(p.clutch_1v2) as c2, SUM(p.clutch_1v3) as c3,
                SUM(p.clutch_1v4) as c4, SUM(p.clutch_1v5) as c5,
                SUM(a.attempt_1v1) as att1, SUM(a.attempt_1v2) as att2, SUM(a.attempt_1v3) as att3,
                SUM(a.attempt_1v4) as att4, SUM(a.attempt_1v5) as att5,
                SUM(p.kill_2) as k2, SUM(p.kill_3) as k3, SUM(p.kill_4) as k4, SUM(p.kill_5) as k5,
                SUM(p.many_assists_cnt2) as a2, SUM(p.many_assists_cnt3) as a3,
                SUM(p.many_assists_cnt4) as a4, SUM(p.many_assists_cnt5) as a5,
                SUM(p.round_total) as total_rounds
            FROM fact_match_players p
            LEFT JOIN fact_match_clutch_attempts a ON p.match_id = a.match_id AND p.steam_id_64 = a.steam_id_64
            WHERE CAST(p.steam_id_64 AS TEXT) IN ({roster_placeholders})
            GROUP BY p.steam_id_64
        """
        l2_rows = query_db('l2', sql_l2, roster_ids) or []

        for r in l2_rows:
            player = stats_map.setdefault(str(r['steam_id_64']), {})

            # Clutch rates: wins / attempts per 1vN situation
            for i in range(1, 6):
                won = r[f'c{i}'] or 0
                attempts = r[f'att{i}'] or 0
                player[f'clutch_rate_1v{i}'] = (won / attempts) if attempts > 0 else 0

            rounds = r['total_rounds'] or 1  # Avoid div by 0

            # Multi-kill rates per round
            total_mk = 0
            for i in range(2, 6):
                k = r[f'k{i}'] or 0
                total_mk += k
                player[f'multikill_rate_{i}k'] = k / rounds
            player['total_multikill_rate'] = total_mk / rounds

            # Multi-assist rates per round
            total_ma = 0
            for i in range(2, 6):
                a = r[f'a{i}'] or 0
                total_ma += a
                player[f'multiassist_rate_{i}a'] = a / rounds
            player['total_multiassist_rate'] = total_ma / rounds

        # 3. Distribution for every metric
        result = {}
        for metric in StatsService._DISTRIBUTION_METRICS:
            values = [p.get(metric, 0) or 0 for p in stats_map.values()]
            target_val = target_stats.get(metric, 0) or 0
            # High-to-low by default; ascending for lower-is-better metrics.
            descending = metric not in StatsService._LOWER_IS_BETTER
            summary = StatsService._rank_summary(values, target_val, descending)
            result[metric] = summary

            alias = StatsService._LEGACY_METRIC_ALIASES.get(metric)
            if alias:
                result[alias] = summary

        # 4. Round-type splits stored as JSON in 'rd_roundtype_split_json'
        def roundtype_distribution(round_type, subkey):
            target_value = StatsService._roundtype_value(target_stats, round_type, subkey)
            if target_value is None:
                return None
            candidates = (
                StatsService._roundtype_value(p, round_type, subkey) for p in stats_map.values()
            )
            values = [v for v in candidates if v is not None]
            return StatsService._rank_summary(values, target_value, descending=True)

        for t in ('pistol', 'reg', 'overtime'):
            result[f'rd_rt_kpr_{t}'] = roundtype_distribution(t, 'kpr')
        for t in ('eco', 'rifle', 'fullbuy', 'overtime'):
            result[f'rd_rt_perf_{t}'] = roundtype_distribution(t, 'perf')

        # 5. Roster ranks on the target's top weapons
        top_weapon_rank_map = {}
        weapons = StatsService._top_weapons(target_stats)
        if weapons:
            w_placeholders = StatsService._placeholders(weapons)
            sql_w = f"""
                SELECT attacker_steam_id as steam_id_64,
                       weapon,
                       COUNT(*) as kills,
                       SUM(is_headshot) as hs
                FROM fact_round_events
                WHERE event_type='kill'
                  AND attacker_steam_id IN ({roster_placeholders})
                  AND weapon IN ({w_placeholders})
                GROUP BY attacker_steam_id, weapon
            """
            weapon_rows = query_db('l2', sql_w, roster_ids + weapons) or []

            # weapon -> {steam_id: {'kpm': kills per match, 'hs_rate': headshot share}}
            per_weapon = {}
            for r in weapon_rows:
                sid = str(r['steam_id_64'])
                w = str(r['weapon'] or '')
                if not w:
                    continue
                kills = int(r['kills'] or 0)
                hs = int(r['hs'] or 0)
                try:
                    matches_played = float(stats_map.get(sid, {}).get('total_matches') or 0)
                except (TypeError, ValueError):
                    matches_played = 0
                kpm = (kills / matches_played) if (kills > 0 and matches_played > 0) else None
                hs_rate = (hs / kills) if kills > 0 else None
                per_weapon.setdefault(w, {})[sid] = {"kpm": kpm, "hs_rate": hs_rate}

            for w in weapons:
                by_player = per_weapon.get(w) or {}
                target_entry = by_player.get(target_steam_id) or {}
                target_kpm = target_entry.get("kpm")
                target_hs = target_entry.get("hs_rate")

                kpm_vals = [v.get("kpm") for v in by_player.values() if v.get("kpm") is not None]
                hs_vals = [v.get("hs_rate") for v in by_player.values() if v.get("hs_rate") is not None]

                kpm_rank = None
                if kpm_vals and target_kpm is not None:
                    kpm_rank = StatsService._rank_of(kpm_vals, target_kpm)
                hs_rank = None
                if hs_vals and target_hs is not None:
                    hs_rank = StatsService._rank_of(hs_vals, target_hs)

                top_weapon_rank_map[w] = {
                    "kpm_rank": kpm_rank,
                    "kpm_total": len(kpm_vals),
                    "hs_rank": hs_rank,
                    "hs_total": len(hs_vals),
                }

        result['top_weapon_rank_map'] = top_weapon_rank_map
        return result

    @staticmethod
    def get_live_matches():
        """Return matches started within the last 2 hours that have no winner yet."""
        sql = """
            SELECT m.match_id, m.map_name, m.score_team1, m.score_team2, m.start_time
            FROM fact_matches m
            WHERE m.winner_team IS NULL
              AND m.start_time > strftime('%s', 'now', '-2 hours')
        """
        return query_db('l2', sql)

    @staticmethod
    def get_head_to_head_stats(match_id):
        """Return the kill matrix of a match.

        Rows of {attacker_steam_id, victim_steam_id, kills}, one per
        attacker/victim pair with at least one 'kill' event.
        """
        sql = """
            SELECT attacker_steam_id, victim_steam_id, COUNT(*) as kills
            FROM fact_round_events
            WHERE match_id = ? AND event_type = 'kill'
            GROUP BY attacker_steam_id, victim_steam_id
        """
        return query_db('l2', sql, [match_id])

    @staticmethod
    def get_match_round_details(match_id):
        """Return rounds, events and economy of a match grouped by round.

        Returns:
            {} when the match has no rounds, otherwise
            {
                round_num: {
                    'info': {...fact_rounds columns...},
                    'events': [{...fact_round_events columns...}, ...],  # ordered by event_time
                    'economy': {steam_id_64: {...fact_round_player_economy columns...}}
                }
            }
            Events and economy rows for rounds missing from fact_rounds are dropped.
        """
        # 1. Base Round Info
        rounds_sql = "SELECT * FROM fact_rounds WHERE match_id = ? ORDER BY round_num"
        rounds_rows = query_db('l2', rounds_sql, [match_id])
        if not rounds_rows:
            return {}

        # 2. Events
        events_sql = """
            SELECT * FROM fact_round_events
            WHERE match_id = ?
            ORDER BY round_num, event_time
        """
        events_rows = query_db('l2', events_sql, [match_id]) or []

        # 3. Economy (if available)
        eco_sql = """
            SELECT * FROM fact_round_player_economy
            WHERE match_id = ?
        """
        eco_rows = query_db('l2', eco_sql, [match_id]) or []

        result = {
            r['round_num']: {'info': dict(r), 'events': [], 'economy': {}}
            for r in rounds_rows
        }

        for event in events_rows:
            round_entry = result.get(event['round_num'])
            if round_entry is not None:
                round_entry['events'].append(dict(event))

        for eco in eco_rows:
            round_entry = result.get(eco['round_num'])
            if round_entry is not None:
                round_entry['economy'][eco['steam_id_64']] = dict(eco)

        return result