UFC Sell-Through Project - Model Training

Overview

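The project uses Spark MLlib for distributed machine learning. This document covers model performance, the Spark session, data loading, feature preparation, the time-based split, model training, evaluation, and the saved outputs.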

Model Performance

Improved Model (36 Features)

Metric Value
Test RMSE 0.3231
Test MAE 0.1801
Test R² 0.4763
Features 36
Training Events 565
Test Events 80

Baseline Model (9 Features)

Metric Value
Test RMSE 0.3829
Test MAE 0.3460
Test R² 0.2644
Features 9

Improvement: the extended 36-feature model raises test R² from 0.2644 to 0.4763, a relative increase of about 80% over the 9-feature baseline.

Spark Session

def create_spark_session():
    """Return the Spark session used for model training.

    The session is obtained with ``getOrCreate`` under the application name
    ``UFC-Model-Training-Improved``, with ``spark.driver.memory`` set to
    ``6g`` and ``spark.sql.shuffle.partitions`` set to ``50``. The Spark
    context's log level is set to ``WARN`` before the session is returned.
    """
    session = (
        SparkSession.builder
        .appName("UFC-Model-Training-Improved")
        .config("spark.driver.memory", "6g")
        .config("spark.sql.shuffle.partitions", "50")
        .getOrCreate()
    )
    session.sparkContext.setLogLevel("WARN")
    return session

Data Loading

def load_data(spark, data_dir):
    """Load event features and join them with the sell-through target.

    Args:
        spark: Active Spark session used for reading.
        data_dir: Root data directory; the feature table is read from
            ``features/event_features`` (parquet) and the target from
            ``external/attendance.csv`` beneath it.

    Returns:
        DataFrame of feature rows inner-joined on ``event_name`` with the
        ``sell_through`` column of the attendance file.
    """
    # Feature table (parquet)
    event_features = spark.read.parquet(
        os.path.join(data_dir, "features", "event_features")
    )

    # Target variable (CSV with a header row)
    raw_attendance = spark.read.option("header", "true").csv(
        os.path.join(data_dir, "external", "attendance.csv")
    )

    # Cast the target to double and drop rows where the cast yields null
    sell_through = F.col("sell_through").cast("double")
    target = (
        raw_attendance
        .withColumn("sell_through", sell_through)
        .filter(F.col("sell_through").isNotNull())
        .select("event_name", "sell_through")
    )

    return event_features.join(target, on="event_name", how="inner")

Feature Preparation

def prepare_features(df):
    """Select the model's feature columns present in ``df`` and fill nulls.

    Args:
        df: DataFrame that may contain any subset of the known feature
            columns.

    Returns:
        A ``(df, available)`` tuple: the DataFrame with nulls in each
        available feature column replaced by ``0.0``, and the list of
        feature column names that were found in ``df`` (in canonical order).
    """
    base_features = [
        "num_fights", "num_title_fights", "has_title",
        "num_rematches", "has_rivalry",
        "avg_exp", "avg_win_rate", "max_exp",
        "avg_reach_diff", "avg_height_diff", "avg_age_diff",
        "day_of_week", "month", "is_saturday", "days_since_last",
        "is_vegas", "is_usa", "is_ppv",
    ]  # 18
    graph_features = [
        "avg_combined_pagerank", "max_combined_pagerank",
        "avg_pagerank_diff", "avg_network_size", "num_same_community_fights",
    ]  # 5
    betting_features = [
        "avg_betting_spread", "max_betting_spread", "min_betting_spread",
        "num_competitive_fights", "num_heavy_favorites",
    ]  # 5
    sentiment_features = [
        "avg_buzz_7d", "max_buzz_7d", "total_buzz_7d", "avg_buzz_diff",
        "reddit_sentiment", "reddit_hype", "reddit_engagement", "reddit_comments",
    ]  # 8
    feature_columns = (
        base_features + graph_features + betting_features + sentiment_features
    )

    # Keep only the features this DataFrame actually provides
    present = df.columns
    available = [name for name in feature_columns if name in present]

    # Replace nulls with 0.0, one column at a time
    for name in available:
        filled = F.coalesce(F.col(name), F.lit(0.0))
        df = df.withColumn(name, filled)

    return df, available

Time-Based Split

def train_test_split(df, test_year=2024):
    """Split events chronologically so the test set never precedes training.

    Args:
        df: DataFrame with an ``event_date`` column.
        test_year: First year assigned to the test set.

    Returns:
        ``(train_df, test_df)``: rows whose event year is before
        ``test_year`` and rows whose event year is ``test_year`` or later.
        Both carry an added ``year`` column. The size of each split is
        printed.
    """
    with_year = df.withColumn("year", F.year("event_date"))
    event_year = F.col("year")

    train_df = with_year.filter(event_year < test_year)
    test_df = with_year.filter(event_year >= test_year)

    train_count = train_df.count()
    print(f"Training set: {train_count} events (before {test_year})")
    test_count = test_df.count()
    print(f"Test set: {test_count} events ({test_year}+)")

    return train_df, test_df

Model Training

Gradient Boosted Trees

from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import GBTRegressor, RandomForestRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

def train_model(train_df, feature_columns, do_cross_validation=True, model_type="gbt"):
    """Fit a feature-assembly + regressor pipeline on the training events.

    Args:
        train_df: Spark DataFrame containing every column named in
            ``feature_columns`` plus the ``sell_through`` label.
        feature_columns: Column names assembled into the ``features`` vector.
        do_cross_validation: When true, tune hyperparameters with 3-fold
            cross-validation (selected by RMSE) and return the best pipeline;
            otherwise fit the pipeline once with the default settings below.
        model_type: ``"gbt"`` trains gradient-boosted trees; any other value
            trains a random forest.

    Returns:
        The fitted pipeline model (the best one found when cross-validating).
    """
    # Assemble features into a single vector column
    assembler = VectorAssembler(
        inputCols=feature_columns,
        outputCol="features"
    )

    use_gbt = model_type == "gbt"
    if use_gbt:
        model = GBTRegressor(
            featuresCol="features",
            labelCol="sell_through",
            maxDepth=7,
            maxIter=100,
            stepSize=0.1,
            seed=42
        )
    else:  # Random Forest
        model = RandomForestRegressor(
            featuresCol="features",
            labelCol="sell_through",
            numTrees=100,
            maxDepth=10,
            seed=42
        )

    pipeline = Pipeline(stages=[assembler, model])

    if not do_cross_validation:
        return pipeline.fit(train_df)

    # The grid may only reference params the chosen estimator defines:
    # maxIter and stepSize exist on GBTRegressor but not on
    # RandomForestRegressor, whose ensemble size is numTrees instead.
    grid_builder = ParamGridBuilder().addGrid(model.maxDepth, [7, 10, 12])
    if use_gbt:
        grid_builder = (
            grid_builder
            .addGrid(model.maxIter, [100, 150])
            .addGrid(model.stepSize, [0.05, 0.1])
        )
    else:
        grid_builder = grid_builder.addGrid(model.numTrees, [100, 150])
    param_grid = grid_builder.build()

    evaluator = RegressionEvaluator(
        labelCol="sell_through",
        predictionCol="prediction",
        metricName="rmse"
    )

    cv = CrossValidator(
        estimator=pipeline,
        estimatorParamMaps=param_grid,
        evaluator=evaluator,
        numFolds=3,
        seed=42
    )

    cv_model = cv.fit(train_df)
    return cv_model.bestModel

Model Evaluation

def evaluate_model(model, test_df, feature_columns, output_dir):
    """Score a fitted pipeline on the test set and save its predictions.

    Args:
        model: Fitted pipeline model; its last stage is inspected for
            ``featureImportances``.
        test_df: Test DataFrame with ``event_name``, ``sell_through`` and the
            feature columns the pipeline expects.
        feature_columns: Feature names, in the order they were assembled,
            used to label the importance values.
        output_dir: Directory under which ``models/predictions.csv`` is
            written.

    Returns:
        Dict with ``rmse``, ``mae``, ``r2``, ``mse``, ``test_size`` and
        ``feature_importance`` (a list of ``(feature, importance)`` pairs
        sorted by descending importance, or ``None`` when the final stage
        exposes no importances).
    """
    # Generate predictions
    predictions = model.transform(test_df)

    def _score(metric_name):
        # One evaluator per metric, all on the same label/prediction columns
        return RegressionEvaluator(
            labelCol="sell_through",
            predictionCol="prediction",
            metricName=metric_name
        ).evaluate(predictions)

    rmse = _score("rmse")
    mae = _score("mae")
    r2 = _score("r2")
    mse = _score("mse")

    print(f"RMSE: {rmse:.4f}")
    print(f"MAE: {mae:.4f}")
    print(f"R²: {r2:.4f}")

    # Feature importance (only tree ensembles expose it)
    feature_importance = None
    final_stage = model.stages[-1]
    if hasattr(final_stage, 'featureImportances'):
        importances = final_stage.featureImportances.toArray()
        feature_importance = sorted(
            zip(feature_columns, importances),
            key=lambda pair: pair[1],
            reverse=True
        )

        print("\nTop 10 Features:")
        for feat, imp in feature_importance[:10]:
            print(f"  {feat}: {imp:.4f}")

    # Save predictions for visualization. The models directory may not exist
    # yet when evaluation runs before results are saved, so create it here.
    models_dir = os.path.join(output_dir, "models")
    os.makedirs(models_dir, exist_ok=True)
    pred_pandas = predictions.select("event_name", "sell_through", "prediction").toPandas()
    pred_pandas.to_csv(os.path.join(models_dir, "predictions.csv"), index=False)

    return {
        "rmse": rmse,
        "mae": mae,
        "r2": r2,
        "mse": mse,
        "test_size": test_df.count(),
        "feature_importance": feature_importance
    }

Feature Importance (Top 10)

Rank Feature Importance
1 is_ppv 0.1842
2 has_title 0.1523
3 num_title_fights 0.0987
4 reddit_hype 0.0812
5 avg_buzz_7d 0.0745
6 max_combined_pagerank 0.0689
7 is_vegas 0.0534
8 avg_betting_spread 0.0478
9 num_fights 0.0423
10 avg_win_rate 0.0389

Saving Results

def save_results(model, metrics, output_dir, feature_columns):
    """Persist the fitted model, its metrics and the feature list.

    Everything is written under ``<output_dir>/models`` (created if needed):
    the model at ``gbt_model_improved`` (overwriting any previous save),
    the metrics as ``metrics_improved.json`` and the feature names, one per
    line, as ``features_used.txt``.

    Args:
        model: Fitted model exposing ``write().overwrite().save(path)``.
        metrics: Metrics dict; it is copied, not modified. A truthy
            ``feature_importance`` entry is expected to be an iterable of
            ``(feature, importance)`` pairs.
        output_dir: Root output directory.
        feature_columns: Names of the features the model was trained on.
    """
    target_dir = os.path.join(output_dir, "models")
    os.makedirs(target_dir, exist_ok=True)

    # Model
    model.write().overwrite().save(os.path.join(target_dir, "gbt_model_improved"))

    # Metrics, augmented with run metadata
    report = {
        **metrics,
        "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "num_features": len(feature_columns),
    }
    ranked = report.get("feature_importance")
    if ranked:
        # Convert pairs to JSON-friendly records with plain floats
        report["feature_importance"] = [
            {"feature": name, "importance": float(weight)}
            for name, weight in ranked
        ]

    metrics_file = os.path.join(target_dir, "metrics_improved.json")
    with open(metrics_file, "w") as out:
        json.dump(report, out, indent=2)

    # Feature list
    features_file = os.path.join(target_dir, "features_used.txt")
    with open(features_file, "w") as out:
        out.write("\n".join(feature_columns))

Running Training

Basic Training

spark-submit src/models/train_improved.py \
    --data-dir ./data \
    --output-dir ./data \
    --test-year 2024

With Cross-Validation

# Cross-validation is enabled by default, so no extra flag is needed
spark-submit src/models/train_improved.py \
    --data-dir ./data \
    --output-dir ./data \
    --test-year 2024

Skip CV (faster)

spark-submit src/models/train_improved.py \
    --data-dir ./data \
    --output-dir ./data \
    --test-year 2024 \
    --no-cv

Random Forest Alternative

spark-submit src/models/train_improved.py \
    --data-dir ./data \
    --output-dir ./data \
    --test-year 2024 \
    --model rf

Output Files

data/models/
├── gbt_model_improved/      # Saved Spark ML model
├── metrics_improved.json    # Performance metrics
├── predictions.csv          # Test set predictions
└── features_used.txt        # Feature list

metrics_improved.json Example

{
  "rmse": 0.3231,
  "mae": 0.1801,
  "r2": 0.4763,
  "mse": 0.1044,
  "test_size": 80,
  "timestamp": "2025-01-04 12:00:00",
  "num_features": 36,
  "feature_importance": [
    {"feature": "is_ppv", "importance": 0.1842},
    {"feature": "has_title", "importance": 0.1523},
    ...
  ]
}