# Files
# yrtv/ETL/L1A.py
# 2026-01-24 17:39:56 +08:00
#
# 90 lines
# 2.7 KiB
# Python

import os
import json
import sqlite3
import glob
import argparse # Added
# Filesystem locations, all derived from where this script lives on disk.
_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
# BASE_DIR is the parent of the directory that contains this script.
BASE_DIR = os.path.dirname(_SCRIPT_DIR)
# Input tree: one sub-directory per match, each holding iframe_network.json.
OUTPUT_ARENA_DIR = os.path.join(BASE_DIR, 'output_arena')
# Directory and file of the L1A SQLite database.
DB_DIR = os.path.join(BASE_DIR, 'database', 'L1A')
DB_PATH = os.path.join(DB_DIR, 'L1A.sqlite')
def init_db(db_path=None):
    """Open the L1A SQLite database, creating its directory and table if needed.

    Args:
        db_path: Optional path of the SQLite file to open. ``None`` (the
            default) uses the module-level ``DB_PATH``.

    Returns:
        An open ``sqlite3.Connection`` with the ``raw_iframe_network`` table
        present. The caller is responsible for closing it.

    Raises:
        OSError: If the parent directory cannot be created.
        sqlite3.Error: If the database cannot be opened or the table created.
    """
    if db_path is None:
        db_path = DB_PATH

    # exist_ok=True replaces the exists()-then-makedirs() pair, which can fail
    # if another process creates the directory between the two calls.
    # A bare filename or ':memory:' has no directory component to create.
    db_dir = os.path.dirname(db_path)
    if db_dir:
        os.makedirs(db_dir, exist_ok=True)

    conn = sqlite3.connect(db_path)
    try:
        conn.execute('''
            CREATE TABLE IF NOT EXISTS raw_iframe_network (
                match_id TEXT PRIMARY KEY,
                content TEXT,
                processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        conn.commit()
    except sqlite3.Error:
        # Do not hand back (or leak) a connection whose schema setup failed.
        conn.close()
        raise
    return conn
def process_files(argv=None):
    """Load every ``output_arena/*/iframe_network.json`` file into the database.

    The raw text of each file is stored in the ``raw_iframe_network`` table,
    keyed by the name of the directory that contains the file (used as the
    match id). Unless ``--force`` is given, match ids already present in the
    table are skipped.

    Args:
        argv: Optional list of command-line arguments to parse. ``None`` (the
            default) makes argparse read ``sys.argv[1:]``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--force', action='store_true', help='Force reprocessing of all files')
    args = parser.parse_args(argv)

    conn = init_db()
    try:
        cursor = conn.cursor()

        # Get existing match_ids to skip
        existing_ids = set()
        if not args.force:
            try:
                cursor.execute("SELECT match_id FROM raw_iframe_network")
                existing_ids = {row[0] for row in cursor.fetchall()}
                print(f"Found {len(existing_ids)} existing matches in DB. Incremental mode active.")
            except sqlite3.Error as e:
                # With no known ids, every file found below is (re)loaded.
                print(f"Error checking existing data: {e}")

        # Pattern to match all iframe_network.json files
        # output_arena/*/iframe_network.json
        pattern = os.path.join(OUTPUT_ARENA_DIR, '*', 'iframe_network.json')
        # Sorted so the processing order is the same on every run.
        files = sorted(glob.glob(pattern))
        print(f"Found {len(files)} files in directory.")

        count = 0
        skipped = 0
        for file_path in files:
            # file_path is like .../output_arena/g161-xxx/iframe_network.json;
            # the parent directory name is the match_id.
            match_id = os.path.basename(os.path.dirname(file_path))
            if match_id in existing_ids:
                skipped += 1
                continue

            # Deliberately broad: one unreadable or unstorable file must not
            # abort the rest of the batch.
            try:
                with open(file_path, 'r', encoding='utf-8') as f:
                    content = f.read()

                # Upsert data
                cursor.execute('''
                    INSERT OR REPLACE INTO raw_iframe_network (match_id, content)
                    VALUES (?, ?)
                ''', (match_id, content))

                count += 1
                # Commit in batches of 100 rows.
                if count % 100 == 0:
                    print(f"Processed {count} files...")
                    conn.commit()
            except Exception as e:
                print(f"Error processing {file_path}: {e}")

        conn.commit()
    finally:
        # Release the connection even if an unexpected error escapes.
        conn.close()
    print(f"Finished. Processed: {count}, Skipped: {skipped}.")
# Run the import only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    process_files()