UFC Sell-Through Project - ETL Pipeline

Overview

The ETL pipeline uses PySpark to:

  1. Load raw CSV files
  2. Standardize date formats
  3. Clean and transform data
  4. Save as Parquet for efficient querying

Spark Session Configuration

# Module-level imports used by the snippets below
import os

from pyspark.sql import SparkSession
from pyspark.sql import functions as F


def make_spark(app_name="UFC-ETL", local=True):
    builder = SparkSession.builder
    builder = builder.appName(app_name)
    
    if local:
        builder = builder.master("local[*]")
    
    # Performance configurations
    builder = builder.config("spark.sql.adaptive.enabled", "true")
    builder = builder.config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    builder = builder.config("spark.sql.parquet.compression.codec", "snappy")
    builder = builder.config("spark.driver.memory", "4g")
    builder = builder.config("spark.executor.memory", "4g")
    builder = builder.config("spark.sql.shuffle.partitions", "8")
    builder = builder.config("spark.default.parallelism", "8")
    builder = builder.config("spark.sql.autoBroadcastJoinThreshold", "10485760")
    
    return builder.getOrCreate()
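
A quick sanity check of the factory (the app name below is just an example):

spark = make_spark(app_name="UFC-ETL-dev")
print(spark.sparkContext.master)                       # local[*]
print(spark.conf.get("spark.sql.shuffle.partitions"))  # 8
spark.stop()

One caveat: spark.driver.memory only takes effect if it is set before the driver JVM starts, so when launching through spark-submit it should be passed as --driver-memory on the command line (as in the Roar example further down) rather than relying on the builder config.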

Data Loading

Load Raw CSVs

def load_data(spark, data_dir):
    raw_path = os.path.join(data_dir, "raw")
    
    file_map = {
        "events": "events.csv",
        "fight_results": "fight_results.csv",
        "fight_stats": "fight_stats.csv",
        "fight_details": "fight_details.csv",
        "fighter_details": "fighter_details.csv",
        "fighter_tott": "fighter_tott.csv",
    }
    
    data = {}
    for name, fname in file_map.items():
        full_path = os.path.join(raw_path, fname)
        if os.path.exists(full_path):
            df = spark.read.csv(full_path, header=True, inferSchema=True)
            # Lowercase all column names
            for col_name in df.columns:
                df = df.withColumnRenamed(col_name, col_name.lower())
            data[name] = df
    
    return data

Load External Data

def load_external_data(spark, data_dir):
    external_path = os.path.join(data_dir, "external")
    
    external_files = {
        "betting_odds": "betting_odds.csv",
        "google_trends": "google_trends.csv",
        "fighter_buzz": "fighter_buzz.csv",
        "event_sentiment": "event_sentiment.csv",
        "reddit_comments": "reddit_comments.csv",
    }
    
    external_data = {}
    for name, fname in external_files.items():
        full_path = os.path.join(external_path, fname)
        if os.path.exists(full_path):
            df = spark.read.csv(full_path, header=True, inferSchema=True)
            external_data[name] = df
    
    return external_data
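
A minimal sketch of loading and inspecting the raw tables, assuming a session created with make_spark and the local ./data layout used in the spark-submit example below:

raw = load_data(spark, "./data")
external = load_external_data(spark, "./data")

print(sorted(raw.keys()))        # subset of ['events', 'fight_results', 'fight_stats', ...]
raw["events"].printSchema()
raw["events"].show(5, truncate=False)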

Date Standardization

The raw data contains dates in multiple formats. The ETL standardizes them:

def standardize_date(date_col):
    # Extract embedded ISO date if present (e.g., "2024-03-09 00:00:00")
    extracted = F.when(
        date_col.rlike(r"\d{4}-\d{2}-\d{2}"),
        F.regexp_extract(date_col, r"(\d{4}-\d{2}-\d{2})", 1)
    ).otherwise(date_col)
    
    # Try different date formats
    iso_date = F.when(
        extracted.rlike(r"^\d{4}-\d{2}-\d{2}$"), 
        F.to_date(extracted, "yyyy-MM-dd")
    )
    # Full month names (4+ letters), so abbreviated months fall through to
    # the short format below instead of failing against the "MMMM" pattern
    long_date = F.when(
        extracted.rlike(r"^[A-Za-z]{4,} \d+, \d+"),
        F.to_date(extracted, "MMMM d, yyyy")
    )
    short_date = F.when(
        extracted.rlike(r"^[A-Za-z]{3} \d+, \d+"), 
        F.to_date(extracted, "MMM d, yyyy")
    )
    
    return F.coalesce(iso_date, long_date, short_date)
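
To sanity-check the helper, a tiny throwaway DataFrame with representative strings (the sample values below are illustrative, not taken from the raw data) can be run through it:

sample = spark.createDataFrame(
    [("2024-03-09 00:00:00",), ("March 9, 2024",), ("Mar 9, 2024",)],
    ["date"],
)
sample.withColumn("event_date", standardize_date(F.col("date"))).show()
# All three rows should resolve to 2024-03-09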

Event Cleaning

def clean_events(df):
    # Standardize dates
    df = df.withColumn("event_date", standardize_date(F.col("date")))
    
    # Extract event type
    df = df.withColumn(
        "event_type",
        F.when(F.col("event").rlike(r"^UFC \d+"), "PPV")
        .when(F.col("event").contains("Fight Night"), "Fight Night")
        .when(F.col("event").contains("on ESPN"), "ESPN")
        .when(F.col("event").contains("on ABC"), "ABC")
        .when(F.col("event").contains("on Fox"), "Fox")
        .otherwise("Other")
    )
    
    # Extract location components
    df = df.withColumn(
        "city",
        F.regexp_extract(F.col("location"), r"^([^,]+)", 1)
    )
    df = df.withColumn(
        "state_country",
        F.regexp_extract(F.col("location"), r",\s*(.+)$", 1)
    )
    
    # Determine country
    us_states = ["Nevada", "California", "New York", "Texas", "Florida", ...]
    df = df.withColumn(
        "country",
        F.when(F.col("state_country").isin(us_states), "USA")
        .when(F.col("state_country").contains("Canada"), "Canada")
        .when(F.col("state_country").contains("UK"), "UK")
        .otherwise(F.col("state_country"))
    )
    
    # Extract year
    df = df.withColumn("year", F.year("event_date"))
    
    # Create unique event ID
    df = df.withColumn("event_id", F.monotonically_increasing_id())
    
    return df
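
A quick way to eyeball the derived columns, assuming the raw dictionary from load_data above:

events = clean_events(raw["events"])
events.select("event", "event_date", "event_type", "city", "country", "year").show(5, truncate=False)
events.groupBy("event_type").count().orderBy(F.desc("count")).show()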

Fighter Cleaning

def clean_fighters(fighter_details, fighter_tott):
    # Combine details and physical attributes, matching on full name.
    # Use concat_ws for the name: "+" on Spark columns is numeric addition,
    # not string concatenation, and would produce nulls here.
    df = fighter_details.join(
        fighter_tott,
        on=F.concat_ws(" ", fighter_details["first"], fighter_details["last"])
           == fighter_tott["fighter"],
        how="left"
    )
    
    # Parse height to inches
    df = df.withColumn(
        "height_inches",
        F.when(F.col("height").rlike(r"\d+' \d+"),
            F.regexp_extract(F.col("height"), r"(\d+)'", 1).cast("int") * 12 +
            F.regexp_extract(F.col("height"), r"(\d+)\"", 1).cast("int")
        ).otherwise(None)
    )
    
    # Parse reach
    df = df.withColumn(
        "reach_inches",
        F.when(F.col("reach").rlike(r"\d+"),
            F.regexp_extract(F.col("reach"), r"(\d+)", 1).cast("int")
        ).otherwise(None)
    )
    
    # Parse DOB ("d" rather than "dd" so single-digit days also parse)
    df = df.withColumn("dob", F.to_date(F.col("dob"), "MMM d, yyyy"))
    
    # Full name
    df = df.withColumn(
        "fighter_name",
        F.concat_ws(" ", F.col("first"), F.col("last"))
    )
    
    return df
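
Spot-checking the parsed physical attributes, again assuming raw from load_data:

fighters = clean_fighters(raw["fighter_details"], raw["fighter_tott"])
fighters.select("fighter_name", "height", "height_inches", "reach", "reach_inches", "dob").show(5, truncate=False)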

Fight Cleaning

def clean_fights(fight_results, events_lookup):
    df = fight_results
    
    # Extract fighter names from BOUT column
    df = df.withColumn(
        "fighter1_name",
        F.trim(F.regexp_extract(F.col("bout"), r"^(.+?)\s+vs\.?\s+", 1))
    )
    df = df.withColumn(
        "fighter2_name",
        F.trim(F.regexp_extract(F.col("bout"), r"\s+vs\.?\s+(.+)$", 1))
    )
    
    # Parse method category
    df = df.withColumn(
        "method_category",
        F.when(F.col("method").contains("KO"), "KO/TKO")
        .when(F.col("method").contains("TKO"), "KO/TKO")
        .when(F.col("method").contains("SUB"), "Submission")
        .when(F.col("method").contains("Submission"), "Submission")
        .when(F.col("method").contains("DEC"), "Decision")
        .when(F.col("method").contains("Decision"), "Decision")
        .otherwise("Other")
    )
    
    # Identify title fights
    df = df.withColumn(
        "is_title_fight",
        F.col("weightclass").contains("Title").cast("boolean")
    )
    
    # Extract weight class
    df = df.withColumn(
        "weight_class",
        F.regexp_extract(F.col("weightclass"), r"([\w]+weight|Catch Weight)", 1)
    )
    
    # Join with events to get event_id and date
    df = df.join(events_lookup, on="event", how="left")
    
    # Create fight ID
    df = df.withColumn("fight_id", F.monotonically_increasing_id())
    
    # Determine winner (draws and no-contests also fall into the otherwise branch)
    df = df.withColumn(
        "winner_name",
        F.when(F.col("outcome") == "W", F.col("fighter1_name"))
        .otherwise(F.col("fighter2_name"))
    )
    
    return df
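
A sketch of wiring the events lookup into the fight cleaning step and checking the derived columns (this mirrors the calls made in run_etl below):

fights = clean_fights(
    raw["fight_results"],
    events.select("event", "event_id", "event_date"),
)
fights.groupBy("method_category").count().show()
fights.filter(F.col("is_title_fight")).select("bout", "weight_class", "winner_name").show(5, truncate=False)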

Fight Stats Cleaning

def clean_fight_stats(stats_df, fights_lookup):
    df = stats_df
    
    # Parse "X of Y" format
    def parse_of_stat(col_name):
        return F.regexp_extract(F.col(col_name), r"(\d+) of \d+", 1).cast("int")
    
    def parse_of_attempted(col_name):
        return F.regexp_extract(F.col(col_name), r"\d+ of (\d+)", 1).cast("int")
    
    df = df.withColumn("sig_strikes_landed", parse_of_stat("sig.str."))
    df = df.withColumn("sig_strikes_attempted", parse_of_attempted("sig.str."))
    df = df.withColumn("takedowns_landed", parse_of_stat("td"))
    df = df.withColumn("takedowns_attempted", parse_of_attempted("td"))
    
    # Parse control time (MM:SS) to seconds
    df = df.withColumn(
        "ctrl_seconds",
        F.when(F.col("ctrl").rlike(r"\d+:\d+"),
            F.regexp_extract(F.col("ctrl"), r"(\d+):", 1).cast("int") * 60 +
            F.regexp_extract(F.col("ctrl"), r":(\d+)", 1).cast("int")
        ).otherwise(0)
    )
    
    # Join with fights to get fight_id
    df = df.join(fights_lookup.select("bout", "event", "fight_id"), 
                 on=["bout", "event"], how="left")
    
    return df
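
A quick derived-metric check on the parsed stats (the accuracy column below is illustrative and not part of the pipeline itself):

stats = clean_fight_stats(raw["fight_stats"], fights.select("bout", "event", "fight_id"))
stats.select(
    "sig_strikes_landed",
    "sig_strikes_attempted",
    (F.col("sig_strikes_landed") / F.col("sig_strikes_attempted")).alias("sig_strike_accuracy"),
    "ctrl_seconds",
).show(5)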

Save to Parquet

def save_processed(data, output_dir):
    proc_dir = os.path.join(output_dir, "processed")
    os.makedirs(proc_dir, exist_ok=True)
    
    for name, df in data.items():
        output_path = os.path.join(proc_dir, name)
        df.write.mode("overwrite").parquet(output_path)
        print(f"  Saved {name}: {df.count()} rows")

Main Pipeline

def run_etl(data_dir, output_dir):
    print("Starting UFC ETL Pipeline...")
    
    spark = make_spark()
    spark.sparkContext.setLogLevel("WARN")
    
    try:
        # Load raw data
        raw_data = load_data(spark, data_dir)
        
        # Clean each dataset
        events = clean_events(raw_data["events"])
        fighters = clean_fighters(
            raw_data["fighter_details"], 
            raw_data["fighter_tott"]
        )
        fights = clean_fights(
            raw_data["fight_results"],
            events.select("event", "event_id", "event_date")
        )
        stats = clean_fight_stats(
            raw_data["fight_stats"],
            fights.select("bout", "event", "fight_id")
        )
        
        # Cache for reuse
        events.cache()
        fighters.cache()
        fights.cache()
        stats.cache()
        
        # Save processed data
        processed = {
            "events": events,
            "fighters": fighters,
            "fights": fights,
            "fight_stats": stats
        }
        save_processed(processed, output_dir)
        
        print("ETL Pipeline Complete!")
        
    finally:
        spark.stop()
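
The spark-submit commands below pass --data-dir and --output-dir flags. The script's entry point is not shown above, but a minimal argparse wrapper wiring those flags into run_etl might look like this:

if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="UFC sell-through ETL pipeline")
    parser.add_argument("--data-dir", required=True, help="Directory containing raw/ and external/")
    parser.add_argument("--output-dir", required=True, help="Directory to write processed/ Parquet")
    args = parser.parse_args()

    run_etl(args.data_dir, args.output_dir)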

Running the Pipeline

Local Development

spark-submit src/etl/spark_etl.py \
    --data-dir ./data \
    --output-dir ./data

On Roar Cluster

spark-submit \
    --master yarn \
    --deploy-mode client \
    --driver-memory 8g \
    --executor-memory 8g \
    --num-executors 4 \
    src/etl/spark_etl.py \
    --data-dir /storage/work/user/ufc_data \
    --output-dir /storage/work/user/ufc_data

Data Validation

def validate_processed_data(spark, data_dir):
    proc_dir = os.path.join(data_dir, "processed")
    
    events = spark.read.parquet(os.path.join(proc_dir, "events"))
    fights = spark.read.parquet(os.path.join(proc_dir, "fights"))
    
    # Check for nulls in key columns
    null_dates = events.filter(F.col("event_date").isNull()).count()
    null_fighters = fights.filter(F.col("fighter1_name").isNull()).count()
    
    print(f"Events with null dates: {null_dates}")
    print(f"Fights with null fighters: {null_fighters}")
    
    # Verify joins worked
    orphan_fights = fights.join(
        events.select("event_id"), 
        on="event_id", 
        how="left_anti"
    ).count()
    
    print(f"Orphan fights (no event): {orphan_fights}")