UFC Sell-Through Project - Feature Engineering

Overview

The feature engineering module creates 36 features across four categories:

  1. Fighter Statistics - Rolling performance metrics
  2. Matchup Features - Physical and historical comparisons
  3. Event Features - Card-level aggregations
  4. External Features - Betting, trends, sentiment

Window Functions

PySpark window functions enable rolling calculations without loops:

from pyspark.sql import Window
from pyspark.sql import functions as F

# Each fighter's rows in chronological order; both frames below build on it.
_fighter_timeline = Window.partitionBy("fighter_name").orderBy("event_date")

# Career frame: every row before the current one. The upper bound of -1 stops
# one row short of the current fight, so the current row is never included.
win_all = _fighter_timeline.rowsBetween(Window.unboundedPreceding, -1)

# Recent-form frame: the five rows immediately preceding the current one.
win_n = _fighter_timeline.rowsBetween(-5, -1)

Fighter Statistics

make_fighter_stats()

def make_fighter_stats(fights, stats, n=5):
    """Compute rolling, per-fighter statistics for every fight.

    Each fight is expanded into two rows (one per listed fighter), joined to
    that fighter's per-fight striking/takedown stats, and then summarised
    with window frames that end one row before the current fight, so the
    current fight's outcome never contributes to its own features.

    Args:
        fights: DataFrame with columns fight_id, event_id, event_date,
            fighter1_name, fighter2_name, winner_name, method_category,
            is_title_fight and weight_class.
        stats: DataFrame with columns fight_id, fighter_name,
            sig_strikes_landed, sig_strikes_attempted, takedowns_landed and
            takedowns_attempted.
        n: Number of preceding fights in the "recent form" window. It sets
            both the window size and the ``win_rate_last{n}`` /
            ``finish_rate_last{n}`` column names.

    Returns:
        DataFrame with one row per (fight, fighter) and the columns
        fight_id, event_id, event_date, fighter_name, win_rate_last{n},
        finish_rate_last{n}, total_fights, days_off, strike_acc, td_acc and
        title_fights.

    Raises:
        ValueError: If ``n`` is smaller than 1.
    """
    if n < 1:
        raise ValueError(f"n must be a positive integer, got {n!r}")

    # Windows are built here (rather than taken from module level) so that the
    # recent-form frame really spans n fights. fight_id is only a tie-breaker:
    # it makes the row order deterministic when a fighter has several bouts on
    # the same event_date.
    timeline = Window.partitionBy("fighter_name").orderBy("event_date", "fight_id")
    career = timeline.rowsBetween(Window.unboundedPreceding, -1)
    recent = timeline.rowsBetween(-n, -1)

    # One view per corner, each renamed to a common "fighter_name" column.
    # "won" is NULL when winner_name is NULL, and avg() skips NULLs.
    corner_views = [
        fights.select(
            "fight_id", "event_id", "event_date",
            F.col(corner).alias("fighter_name"),
            "winner_name", "method_category", "is_title_fight", "weight_class",
        ).withColumn(
            "won",
            (F.col("fighter_name") == F.col("winner_name")).cast("int"),
        )
        for corner in ("fighter1_name", "fighter2_name")
    ]
    all_fights = corner_views[0].union(corner_views[1])

    # A "finish" is a win by KO/TKO or submission; everything else counts as 0.
    all_fights = all_fights.withColumn(
        "finished",
        F.when(
            (F.col("won") == 1)
            & F.col("method_category").isin(["KO/TKO", "Submission"]),
            1,
        ).otherwise(0),
    )

    # Left join keeps fights that have no recorded stats (their stats are NULL).
    all_fights = all_fights.join(
        stats.select(
            "fight_id", "fighter_name", "sig_strikes_landed",
            "sig_strikes_attempted", "takedowns_landed", "takedowns_attempted",
        ),
        on=["fight_id", "fighter_name"],
        how="left",
    )

    # Per-fight accuracies; NULL (no otherwise branch) when nothing was
    # attempted, so those fights do not pull the career average towards zero.
    strike_accuracy = F.when(
        F.col("sig_strikes_attempted") > 0,
        F.col("sig_strikes_landed") / F.col("sig_strikes_attempted"),
    )
    td_accuracy = F.when(
        F.col("takedowns_attempted") > 0,
        F.col("takedowns_landed") / F.col("takedowns_attempted"),
    )

    result = (
        all_fights
        # Recent form over the previous n fights.
        .withColumn(f"win_rate_last{n}", F.avg("won").over(recent))
        .withColumn(f"finish_rate_last{n}", F.avg("finished").over(recent))
        # Number of earlier fights on record for this fighter.
        .withColumn("total_fights", F.count("*").over(career))
        # Days since the fighter's previous fight (NULL for the first one).
        .withColumn("last_fight", F.lag("event_date", 1).over(timeline))
        .withColumn("days_off", F.datediff("event_date", "last_fight"))
        # Career means of the per-fight accuracies.
        .withColumn("strike_acc", F.avg(strike_accuracy).over(career))
        .withColumn("td_acc", F.avg(td_accuracy).over(career))
        # Number of earlier title fights.
        .withColumn(
            "title_fights",
            F.sum(F.col("is_title_fight").cast("int")).over(career),
        )
    )

    return result.select([
        "fight_id", "event_id", "event_date", "fighter_name",
        f"win_rate_last{n}", f"finish_rate_last{n}",
        "total_fights", "days_off", "strike_acc", "td_acc", "title_fights",
    ])

Fighter Features Table

Feature Description Calculation
win_rate_last5 Win rate over the previous 5 fights (current fight excluded) avg(won) over window
finish_rate_last5 Finish-win rate over the previous 5 fights avg(finished) over window
total_fights Number of earlier career fights count(*) over career
days_off Days since previous fight datediff(event_date, last_fight)
strike_acc Career strike accuracy avg(landed / attempted) over career
td_acc Career takedown accuracy avg(landed / attempted) over career
title_fights Number of earlier career title fights sum(is_title_fight) over career

Matchup Features

make_matchup_features()

def make_matchup_features(fights, fighter_feats, fighters):
    """Derive per-fight matchup features: rematch flags and physical gaps.

    Args:
        fights: DataFrame with fight_id, event_id, event_date, fighter1_name,
            fighter2_name and is_title_fight.
        fighter_feats: Accepted for interface compatibility; not read here.
        fighters: DataFrame with fighter_name, height_inches, reach_inches
            and dob.

    Returns:
        DataFrame with fight_id, event_id, fighter1_name, fighter2_name,
        is_rematch, is_rivalry, is_title_fight, reach_diff, height_diff and
        age_diff.
    """
    # Order-independent key for a pairing: (A, B) and (B, A) map to the same
    # (pair1, pair2) tuple.
    first_of_pair = F.least("fighter1_name", "fighter2_name")
    second_of_pair = F.greatest("fighter1_name", "fighter2_name")

    bouts = (
        fights
        .withColumn("pair1", first_of_pair)
        .withColumn("pair2", second_of_pair)
    )

    # Every fight of every pairing, to be matched against each current fight.
    history = fights.select(
        first_of_pair.alias("pair1"),
        second_of_pair.alias("pair2"),
        F.col("event_date").alias("prev_event_date"),
    )
    lookup = bouts.select(
        "fight_id", F.col("event_date").alias("curr_date"), "pair1", "pair2"
    )

    # Keep only strictly earlier meetings of the same pair, counted per fight.
    meeting_counts = (
        history.join(lookup, on=["pair1", "pair2"])
        .filter(F.col("prev_event_date") < F.col("curr_date"))
        .groupBy("fight_id")
        .agg(F.count("*").alias("times_fought"))
    )

    # Fights with no earlier meeting are absent from meeting_counts -> 0.
    bouts = bouts.join(meeting_counts, on="fight_id", how="left")
    bouts = bouts.fillna({"times_fought": 0})

    # Rematch: at least one earlier meeting; rivalry: at least two.
    met_before = F.col("times_fought") > 0
    met_twice = F.col("times_fought") >= 2
    bouts = bouts.withColumn("is_rematch", met_before.cast(IntegerType()))
    bouts = bouts.withColumn("is_rivalry", met_twice.cast(IntegerType()))

    # Attach height / reach / date of birth for each corner in turn.
    corner1_profile = fighters.select(
        F.col("fighter_name").alias("fighter1_name"),
        F.col("height_inches").alias("h1"),
        F.col("reach_inches").alias("r1"),
        F.col("dob").alias("dob1"),
    )
    corner2_profile = fighters.select(
        F.col("fighter_name").alias("fighter2_name"),
        F.col("height_inches").alias("h2"),
        F.col("reach_inches").alias("r2"),
        F.col("dob").alias("dob2"),
    )
    for join_key, profile in (
        ("fighter1_name", corner1_profile),
        ("fighter2_name", corner2_profile),
    ):
        bouts = bouts.join(profile, on=join_key, how="left")

    # Absolute physical gaps between the two fighters.
    reach_gap = F.abs(F.col("r1") - F.col("r2"))
    height_gap = F.abs(F.col("h1") - F.col("h2"))

    # Ages in years on the event date, then their absolute difference.
    corner1_age = F.datediff("event_date", "dob1") / 365.25
    corner2_age = F.datediff("event_date", "dob2") / 365.25
    age_gap = F.abs(corner1_age - corner2_age)

    bouts = (
        bouts
        .withColumn("reach_diff", reach_gap)
        .withColumn("height_diff", height_gap)
        .withColumn("age_diff", age_gap)
    )

    output_columns = [
        "fight_id", "event_id", "fighter1_name", "fighter2_name",
        "is_rematch", "is_rivalry", "is_title_fight",
        "reach_diff", "height_diff", "age_diff",
    ]
    return bouts.select(output_columns)

Matchup Features Table

Feature Description
is_rematch Fighters have met before
is_rivalry Met 2+ times before
reach_diff Absolute reach difference (inches)
height_diff Absolute height difference (inches)
age_diff Absolute age difference (years)

Event Features

make_event_features()

def make_event_features(events, fights, matchups, fighter_feats):
    """Aggregate fight-, matchup- and fighter-level data to one row per event.

    Args:
        events: DataFrame with event_id, event_date, event_type, city and
            country.
        fights: DataFrame with event_id and is_title_fight.
        matchups: DataFrame with event_id, is_rematch, is_rivalry,
            reach_diff, height_diff and age_diff.
        fighter_feats: DataFrame with event_id, total_fights and
            win_rate_last5.

    Returns:
        ``events`` with the card-level aggregates, calendar features and
        location flags appended as extra columns.
    """
    title_flag = F.col("is_title_fight").cast(IntegerType())

    # Card size and title-fight presence.
    card_size = fights.groupBy("event_id").agg(
        F.count("*").alias("num_fights"),
        F.sum(title_flag).alias("num_title_fights"),
        F.max(title_flag).alias("has_title"),
    )

    # Rematch count and whether any bout on the card is a rivalry.
    rematch_summary = matchups.groupBy("event_id").agg(
        F.sum(F.col("is_rematch").cast(IntegerType())).alias("num_rematches"),
        F.max("is_rivalry").alias("has_rivalry"),
    )

    # Mean physical differentials across the card.
    physical_summary = matchups.groupBy("event_id").agg(
        F.avg("reach_diff").alias("avg_reach_diff"),
        F.avg("height_diff").alias("avg_height_diff"),
        F.avg("age_diff").alias("avg_age_diff"),
    )

    # Experience and recent form of the fighters on the card.
    experience_summary = fighter_feats.groupBy("event_id").agg(
        F.avg("total_fights").alias("avg_exp"),
        F.avg("win_rate_last5").alias("avg_win_rate"),
        F.max("total_fights").alias("max_exp"),
    )

    # Left joins, in this order, so every event row survives even when an
    # aggregate has no entry for it.
    card = events
    for summary in (
        card_size, rematch_summary, physical_summary, experience_summary
    ):
        card = card.join(summary, on="event_id", how="left")

    # Calendar features; dayofweek numbers Sunday as 1 and Saturday as 7.
    card = (
        card
        .withColumn("day_of_week", F.dayofweek("event_date"))
        .withColumn("month", F.month("event_date"))
        .withColumn(
            "is_saturday", (F.col("day_of_week") == 7).cast(IntegerType())
        )
    )

    # Gap to the previous event of the same event_type (NULL for the first).
    same_type_timeline = Window.partitionBy("event_type").orderBy("event_date")
    previous_event_date = F.lag("event_date", 1).over(same_type_timeline)
    card = card.withColumn(
        "days_since_last", F.datediff("event_date", previous_event_date)
    )

    # Location and broadcast-type flags.
    in_vegas = F.lower(F.col("city")).contains("las vegas")
    in_usa = F.col("country") == "USA"
    on_ppv = F.col("event_type") == "PPV"
    card = (
        card
        .withColumn("is_vegas", in_vegas.cast(IntegerType()))
        .withColumn("is_usa", in_usa.cast(IntegerType()))
        .withColumn("is_ppv", on_ppv.cast(IntegerType()))
    )

    return card

Event Features Table

Feature Description
num_fights Total fights on card
num_title_fights Number of title fights
has_title At least one title fight
num_rematches Rematch count
has_rivalry Card has a rivalry
avg_reach_diff Mean reach differential
avg_height_diff Mean height differential
avg_age_diff Mean age differential
avg_exp Average fighter experience
avg_win_rate Average fighter win rate
max_exp Fight count of the most experienced fighter
day_of_week Day (1=Sunday, 7=Saturday)
month Month number
is_saturday Saturday event
days_since_last Days since previous event of the same event type
is_vegas Las Vegas location
is_usa USA location
is_ppv Pay-per-view event

Extended Features (External Data)

Graph Features

Feature Description
avg_combined_pagerank Mean PageRank sum
max_combined_pagerank Max star power
avg_pagerank_diff Mean PageRank differential
avg_network_size Mean opponent count
num_same_community_fights Same-division matchups

Betting Features

Feature Description
avg_betting_spread Mean odds spread
max_betting_spread Biggest mismatch
min_betting_spread Closest matchup
num_competitive_fights Spread < 10%
num_heavy_favorites Favorite > 70%

Sentiment Features

Feature Description
avg_buzz_7d Mean 7-day search interest
max_buzz_7d Peak search interest
total_buzz_7d Sum of all buzz
avg_buzz_diff Mean buzz differential
reddit_sentiment Average r/MMA sentiment
reddit_hype Engagement-weighted hype
reddit_engagement Total Reddit engagement
reddit_comments Comment count

Complete Feature List (36 Features)

# Card-level features computed from the core fight/event tables.
_BASE_FEATURES = [
    "num_fights",
    "num_title_fights",
    "has_title",
    "num_rematches",
    "has_rivalry",
    "avg_exp",
    "avg_win_rate",
    "max_exp",
    "avg_reach_diff",
    "avg_height_diff",
    "avg_age_diff",
    "day_of_week",
    "month",
    "is_saturday",
    "days_since_last",
    "is_vegas",
    "is_usa",
    "is_ppv",
]

# Graph features.
_GRAPH_FEATURES = [
    "avg_combined_pagerank",
    "max_combined_pagerank",
    "avg_pagerank_diff",
    "avg_network_size",
    "num_same_community_fights",
]

# Betting features.
_BETTING_FEATURES = [
    "avg_betting_spread",
    "max_betting_spread",
    "min_betting_spread",
    "num_competitive_fights",
    "num_heavy_favorites",
]

# Sentiment features.
_SENTIMENT_FEATURES = [
    "avg_buzz_7d",
    "max_buzz_7d",
    "total_buzz_7d",
    "avg_buzz_diff",
    "reddit_sentiment",
    "reddit_hype",
    "reddit_engagement",
    "reddit_comments",
]

# Full model input, in group order: base, graph, betting, sentiment.
feature_columns = [
    *_BASE_FEATURES,
    *_GRAPH_FEATURES,
    *_BETTING_FEATURES,
    *_SENTIMENT_FEATURES,
]

Running Feature Engineering

# Run the feature engineering script through spark-submit, passing ./data as
# both --data-dir and --output-dir.
spark-submit src/features/feature_engineering.py \
    --data-dir ./data \
    --output-dir ./data \
    --no-external  # Skip external data if not available

Output

Features saved to Parquet: