UFC Sell-Through Project - Model Training
Overview
The project uses Spark MLlib for distributed machine learning:
- Primary Model: Gradient Boosted Trees (GBT)
- Alternative Model: Random Forest
- Validation: Time-based split (train on pre-2024, test on 2024+)
- Tuning: 3-fold cross-validation with hyperparameter grid
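The snippets below are excerpts from a single training script. For reference, they assume these module-level imports (a minimal set collected here, not necessarily the script's exact header):

```python
import json
import os
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
```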
Improved Model (36 Features)

| Metric          | Value  |
|-----------------|--------|
| Test RMSE       | 0.3231 |
| Test MAE        | 0.1801 |
| Test R²         | 0.4763 |
| Features        | 36     |
| Training Events | 565    |
| Test Events     | 80     |
Baseline Model (9 Features)

| Metric    | Value  |
|-----------|--------|
| Test RMSE | 0.3829 |
| Test MAE  | 0.3460 |
| Test R²   | 0.2644 |
| Features  | 9      |
Improvement: the extended feature set lifts test R² from 0.2644 to 0.4763 (an 80% relative increase) and roughly halves the MAE.
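The headline figure follows directly from the two tables:

```python
baseline_r2, improved_r2 = 0.2644, 0.4763

# (0.4763 - 0.2644) / 0.2644 = 0.2119 / 0.2644 ≈ 0.801
relative_gain = (improved_r2 - baseline_r2) / baseline_r2
print(f"{relative_gain:.1%}")  # 80.1%
```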
Spark Session

```python
def create_spark_session():
    spark = (
        SparkSession.builder
        .appName("UFC-Model-Training-Improved")
        .config("spark.driver.memory", "6g")
        # The joined dataset is small (~645 events), so 50 shuffle
        # partitions avoids the overhead of the 200-partition default.
        .config("spark.sql.shuffle.partitions", "50")
        .getOrCreate()
    )
    spark.sparkContext.setLogLevel("WARN")  # keep driver logs readable
    return spark
```
Data Loading

```python
def load_data(spark, data_dir):
    # Load the per-event feature table
    features_path = os.path.join(data_dir, "features", "event_features")
    event_features = spark.read.parquet(features_path)

    # Load the attendance file carrying the target variable
    attendance_path = os.path.join(data_dir, "external", "attendance.csv")
    attendance = spark.read.option("header", "true").csv(attendance_path)

    # CSV columns arrive as strings: cast the target to double and drop
    # rows where it is missing or unparseable
    attendance = attendance.withColumn(
        "sell_through",
        F.col("sell_through").cast("double")
    ).filter(F.col("sell_through").isNotNull())

    # Inner join keeps only events that have both features and a target
    joined = event_features.join(
        attendance.select("event_name", "sell_through"),
        on="event_name",
        how="inner"
    )
    return joined
```
Feature Preparation

```python
def prepare_features(df):
    feature_columns = [
        # Base features (18)
        "num_fights", "num_title_fights", "has_title",
        "num_rematches", "has_rivalry",
        "avg_exp", "avg_win_rate", "max_exp",
        "avg_reach_diff", "avg_height_diff", "avg_age_diff",
        "day_of_week", "month", "is_saturday", "days_since_last",
        "is_vegas", "is_usa", "is_ppv",
        # Graph features (5)
        "avg_combined_pagerank", "max_combined_pagerank",
        "avg_pagerank_diff", "avg_network_size", "num_same_community_fights",
        # Betting features (5)
        "avg_betting_spread", "max_betting_spread", "min_betting_spread",
        "num_competitive_fights", "num_heavy_favorites",
        # Sentiment features (8)
        "avg_buzz_7d", "max_buzz_7d", "total_buzz_7d", "avg_buzz_diff",
        "reddit_sentiment", "reddit_hype", "reddit_engagement", "reddit_comments"
    ]
    # Use only the columns present in this run's feature table
    available = [c for c in feature_columns if c in df.columns]
    # Fill nulls with 0 (VectorAssembler cannot handle missing values)
    for col in available:
        df = df.withColumn(col, F.coalesce(F.col(col), F.lit(0.0)))
    return df, available
```
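Zero-filling is a deliberate simplification: zero is a natural default for count-style features, but it biases ratio-style columns such as avg_win_rate toward the low end. If mean or median imputation fits some columns better, Spark's Imputer can run before the assembler. A sketch under that assumption, not part of the project's pipeline (ratio_cols is illustrative):

```python
from pyspark.ml.feature import Imputer

# Hypothetical: impute ratio-style columns with the training-set median.
# Spark transformers append columns, hence the "_imp" output suffix.
ratio_cols = ["avg_win_rate", "avg_exp"]
imputer = Imputer(
    inputCols=ratio_cols,
    outputCols=[c + "_imp" for c in ratio_cols],
    strategy="median",
)
imputer_model = imputer.fit(train_df)        # learn medians on train only
train_imp = imputer_model.transform(train_df)
test_imp = imputer_model.transform(test_df)  # reuse train medians on test
```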
Time-Based Split

```python
def train_test_split(df, test_year=2024):
    """Split data by year to avoid leaking future events into training."""
    # Extract the year from event_date
    df = df.withColumn("year", F.year("event_date"))
    # Train on events before test_year, test on test_year onward
    train_df = df.filter(F.col("year") < test_year)
    test_df = df.filter(F.col("year") >= test_year)
    print(f"Training set: {train_df.count()} events (before {test_year})")
    print(f"Test set: {test_df.count()} events ({test_year}+)")
    return train_df, test_df
```
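For the dataset behind the metrics tables above, the split prints:

```
Training set: 565 events (before 2024)
Test set: 80 events (2024+)
```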
Model Training

Gradient Boosted Trees

```python
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import GBTRegressor, RandomForestRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

def train_model(train_df, feature_columns, do_cross_validation=True, model_type="gbt"):
    # Assemble the feature columns into a single vector column
    assembler = VectorAssembler(
        inputCols=feature_columns,
        outputCol="features"
    )

    if model_type == "gbt":
        model = GBTRegressor(
            featuresCol="features",
            labelCol="sell_through",
            maxDepth=7,
            maxIter=100,
            stepSize=0.1,
            seed=42
        )
        # maxIter and stepSize exist only on the GBT estimator
        grid_params = [
            (model.maxDepth, [7, 10, 12]),
            (model.maxIter, [100, 150]),
            (model.stepSize, [0.05, 0.1]),
        ]
    else:  # Random Forest
        model = RandomForestRegressor(
            featuresCol="features",
            labelCol="sell_through",
            numTrees=100,
            maxDepth=10,
            seed=42
        )
        # RF has no maxIter/stepSize, so only tune the shared depth param
        grid_params = [(model.maxDepth, [7, 10, 12])]

    pipeline = Pipeline(stages=[assembler, model])

    if do_cross_validation:
        # Hyperparameter grid for whichever estimator was chosen above
        builder = ParamGridBuilder()
        for param, values in grid_params:
            builder = builder.addGrid(param, values)
        param_grid = builder.build()

        evaluator = RegressionEvaluator(
            labelCol="sell_through",
            predictionCol="prediction",
            metricName="rmse"
        )
        cv = CrossValidator(
            estimator=pipeline,
            estimatorParamMaps=param_grid,
            evaluator=evaluator,
            numFolds=3,
            seed=42
        )
        cv_model = cv.fit(train_df)
        return cv_model.bestModel
    else:
        return pipeline.fit(train_df)
```
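With the GBT grid, cross-validation trains 3 × 2 × 2 = 12 parameter combinations over 3 folds, i.e. 36 pipeline fits plus a final refit on the full training set, which is why the --no-cv path exists. The winning settings can be read off the best model; a quick inspection sketch (the fitted regressor is the pipeline's last stage):

```python
best = train_model(train_df, available, do_cross_validation=True)
regressor = best.stages[-1]  # fitted GBTRegressionModel

# getOrDefault returns the value the winning grid point used
for param in (regressor.maxDepth, regressor.maxIter, regressor.stepSize):
    print(param.name, "=", regressor.getOrDefault(param))
```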
Model Evaluation

```python
def evaluate_model(model, test_df, feature_columns, output_dir):
    # Generate predictions on the held-out events
    predictions = model.transform(test_df)

    # One evaluator per metric; only metricName differs
    def make_evaluator(metric):
        return RegressionEvaluator(
            labelCol="sell_through",
            predictionCol="prediction",
            metricName=metric
        )

    rmse = make_evaluator("rmse").evaluate(predictions)
    mae = make_evaluator("mae").evaluate(predictions)
    r2 = make_evaluator("r2").evaluate(predictions)
    print(f"RMSE: {rmse:.4f}")
    print(f"MAE: {mae:.4f}")
    print(f"R²: {r2:.4f}")

    # Feature importance: the fitted regressor is the pipeline's last
    # stage (both GBT and RF models expose featureImportances)
    feature_importance = None
    regressor = model.stages[-1]
    if hasattr(regressor, "featureImportances"):
        importances = regressor.featureImportances.toArray()
        feature_importance = sorted(
            zip(feature_columns, importances),
            key=lambda x: x[1], reverse=True
        )
        print("\nTop 10 Features:")
        for feat, imp in feature_importance[:10]:
            print(f"  {feat}: {imp:.4f}")

    # Save predictions for visualization; create the directory in case
    # this runs before save_results()
    models_dir = os.path.join(output_dir, "models")
    os.makedirs(models_dir, exist_ok=True)
    pred_pandas = predictions.select(
        "event_name", "sell_through", "prediction"
    ).toPandas()
    pred_pandas.to_csv(os.path.join(models_dir, "predictions.csv"), index=False)

    return {
        "rmse": rmse,
        "mae": mae,
        "mse": rmse ** 2,  # matches the mse field in metrics_improved.json
        "r2": r2,
        "test_size": test_df.count(),
        "feature_importance": feature_importance
    }
```
Feature Importance (Top 10)

| Rank | Feature               | Importance |
|------|-----------------------|------------|
| 1    | is_ppv                | 0.1842     |
| 2    | has_title             | 0.1523     |
| 3    | num_title_fights      | 0.0987     |
| 4    | reddit_hype           | 0.0812     |
| 5    | avg_buzz_7d           | 0.0745     |
| 6    | max_combined_pagerank | 0.0689     |
| 7    | is_vegas              | 0.0534     |
| 8    | avg_betting_spread    | 0.0478     |
| 9    | num_fights            | 0.0423     |
| 10   | avg_win_rate          | 0.0389     |
Saving Results

```python
def save_results(model, metrics, output_dir, feature_columns):
    models_dir = os.path.join(output_dir, "models")
    os.makedirs(models_dir, exist_ok=True)

    # Save the fitted pipeline (assembler + regressor)
    model_path = os.path.join(models_dir, "gbt_model_improved")
    model.write().overwrite().save(model_path)

    # Save metrics; copy first so the caller's dict is left untouched
    metrics_copy = metrics.copy()
    metrics_copy["timestamp"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    metrics_copy["num_features"] = len(feature_columns)
    # Tuples of NumPy floats are not JSON-serializable; convert them
    if metrics_copy.get("feature_importance"):
        metrics_copy["feature_importance"] = [
            {"feature": f, "importance": float(i)}
            for f, i in metrics_copy["feature_importance"]
        ]
    with open(os.path.join(models_dir, "metrics_improved.json"), "w") as f:
        json.dump(metrics_copy, f, indent=2)

    # Save the feature list so the model's input order is reproducible
    with open(os.path.join(models_dir, "features_used.txt"), "w") as f:
        f.write("\n".join(feature_columns))
```
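A minimal driver tying the steps together; the argparse flags are inferred from the commands in the next section, so treat this as a sketch rather than the script's actual entry point:

```python
import argparse

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--data-dir", default="./data")
    parser.add_argument("--output-dir", default="./data")
    parser.add_argument("--test-year", type=int, default=2024)
    parser.add_argument("--no-cv", action="store_true")
    parser.add_argument("--model", choices=["gbt", "rf"], default="gbt")
    args = parser.parse_args()

    spark = create_spark_session()
    df, features = prepare_features(load_data(spark, args.data_dir))
    train_df, test_df = train_test_split(df, test_year=args.test_year)
    model = train_model(train_df, features,
                        do_cross_validation=not args.no_cv,
                        model_type=args.model)
    metrics = evaluate_model(model, test_df, features, args.output_dir)
    save_results(model, metrics, args.output_dir, features)
    spark.stop()

if __name__ == "__main__":
    main()
```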
Running Training
Basic Training

```bash
spark-submit src/models/train_improved.py \
  --data-dir ./data \
  --output-dir ./data \
  --test-year 2024
```
With Cross-Validation

```bash
# Cross-validation is enabled by default, so no extra flag is needed
spark-submit src/models/train_improved.py \
  --data-dir ./data \
  --output-dir ./data \
  --test-year 2024
```
Skip CV (faster)

```bash
spark-submit src/models/train_improved.py \
  --data-dir ./data \
  --output-dir ./data \
  --test-year 2024 \
  --no-cv
```
Random Forest Alternative

```bash
spark-submit src/models/train_improved.py \
  --data-dir ./data \
  --output-dir ./data \
  --test-year 2024 \
  --model rf
```
Output Files

```
data/models/
├── gbt_model_improved/     # Saved Spark ML pipeline
├── metrics_improved.json   # Performance metrics
├── predictions.csv         # Test set predictions
└── features_used.txt       # Feature list
```
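The saved pipeline can be reloaded for scoring without retraining. A minimal sketch (new_events stands in for any DataFrame carrying the same feature columns):

```python
from pyspark.ml import PipelineModel

# Reload the fitted pipeline (VectorAssembler + regressor)
model = PipelineModel.load("data/models/gbt_model_improved")

# new_events: a DataFrame with the same feature columns used in training
scored = model.transform(new_events)
scored.select("event_name", "prediction").show()
```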
metrics_improved.json Example

```json
{
  "rmse": 0.3231,
  "mae": 0.1801,
  "r2": 0.4763,
  "mse": 0.1044,
  "test_size": 80,
  "timestamp": "2025-01-04 12:00:00",
  "num_features": 36,
  "feature_importance": [
    {"feature": "is_ppv", "importance": 0.1842},
    {"feature": "has_title", "importance": 0.1523},
    ...
  ]
}
```