UFC Sell-Through Project - ETL Pipeline
Overview
The ETL pipeline uses PySpark to:
- Load raw CSV files
- Standardize date formats
- Clean and transform data
- Save as Parquet for efficient querying
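The code snippets in this document assume a shared set of imports; a minimal header, assuming the standard PySpark distribution:

import os

from pyspark.sql import SparkSession
from pyspark.sql import functions as F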
Spark Session Configuration
def make_spark(app_name="UFC-ETL", local=True):
    builder = SparkSession.builder
    builder = builder.appName(app_name)
    if local:
        builder = builder.master("local[*]")
    # Performance configurations
    builder = builder.config("spark.sql.adaptive.enabled", "true")
    builder = builder.config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    builder = builder.config("spark.sql.parquet.compression.codec", "snappy")
    builder = builder.config("spark.driver.memory", "4g")
    builder = builder.config("spark.executor.memory", "4g")
    builder = builder.config("spark.sql.shuffle.partitions", "8")
    builder = builder.config("spark.default.parallelism", "8")
    builder = builder.config("spark.sql.autoBroadcastJoinThreshold", "10485760")
    return builder.getOrCreate()
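A quick smoke test of the session factory (a sketch, assuming a local run; the app name is illustrative):

spark = make_spark(app_name="UFC-ETL-dev", local=True)
print(spark.version)
print(spark.conf.get("spark.sql.shuffle.partitions"))  # "8"
spark.stop()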
Data Loading
Load Raw CSVs
def load_data(spark, data_dir):
    raw_path = os.path.join(data_dir, "raw")
    file_map = {
        "events": "events.csv",
        "fight_results": "fight_results.csv",
        "fight_stats": "fight_stats.csv",
        "fight_details": "fight_details.csv",
        "fighter_details": "fighter_details.csv",
        "fighter_tott": "fighter_tott.csv",
    }
    data = {}
    for name, fname in file_map.items():
        full_path = os.path.join(raw_path, fname)
        if os.path.exists(full_path):
            df = spark.read.csv(full_path, header=True, inferSchema=True)
            # Lowercase all column names
            for col_name in df.columns:
                df = df.withColumnRenamed(col_name, col_name.lower())
            data[name] = df
    return data
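For example, with the ./data/raw layout assumed above, the loader returns a dict of DataFrames keyed by table name:

data = load_data(spark, "./data")
for name, df in data.items():
    print(f"{name}: {df.count()} rows, {len(df.columns)} columns")
data["events"].printSchema()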
Load External Data
def load_external_data(spark, data_dir):
    external_path = os.path.join(data_dir, "external")
    external_files = {
        "betting_odds": "betting_odds.csv",
        "google_trends": "google_trends.csv",
        "fighter_buzz": "fighter_buzz.csv",
        "event_sentiment": "event_sentiment.csv",
        "reddit_comments": "reddit_comments.csv",
    }
    external_data = {}
    for name, fname in external_files.items():
        full_path = os.path.join(external_path, fname)
        if os.path.exists(full_path):
            df = spark.read.csv(full_path, header=True, inferSchema=True)
            external_data[name] = df
    return external_data
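Usage mirrors load_data; only the files that actually exist under ./data/external are loaded:

external = load_external_data(spark, "./data")
print(sorted(external.keys()))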
Date Standardization
The raw data contains dates in multiple formats. The ETL standardizes them:
def standardize_date(date_col):
    # Extract embedded ISO date if present (e.g., "2024-03-09 00:00:00")
    extracted = F.when(
        date_col.rlike(r"\d{4}-\d{2}-\d{2}"),
        F.regexp_extract(date_col, r"(\d{4}-\d{2}-\d{2})", 1)
    ).otherwise(date_col)
    # Try different date formats
    iso_date = F.when(
        extracted.rlike(r"^\d{4}-\d{2}-\d{2}$"),
        F.to_date(extracted, "yyyy-MM-dd")
    )
    long_date = F.when(
        extracted.rlike(r"^[A-Za-z]+ \d+, \d+"),
        F.to_date(extracted, "MMMM d, yyyy")
    )
    short_date = F.when(
        extracted.rlike(r"^[A-Za-z]{3} \d+, \d+"),
        F.to_date(extracted, "MMM d, yyyy")
    )
    return F.coalesce(iso_date, long_date, short_date)
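A small check of the parser against the three formats it targets, using a throwaway DataFrame (the raw_date column name is illustrative):

sample = spark.createDataFrame(
    [("2024-03-09 00:00:00",), ("March 9, 2024",), ("Mar 9, 2024",)],
    ["raw_date"],
)
sample.withColumn("event_date", standardize_date(F.col("raw_date"))).show()
# All three rows should resolve to 2024-03-09. If Spark 3's stricter datetime
# parser rejects the text month patterns, spark.sql.legacy.timeParserPolicy
# can be set to "LEGACY".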
Event Cleaning
def clean_events(df):
    # Standardize dates
    df = df.withColumn("event_date", standardize_date(F.col("date")))
    # Extract event type
    df = df.withColumn(
        "event_type",
        F.when(F.col("event").rlike(r"^UFC \d+"), "PPV")
        .when(F.col("event").contains("Fight Night"), "Fight Night")
        .when(F.col("event").contains("on ESPN"), "ESPN")
        .when(F.col("event").contains("on ABC"), "ABC")
        .when(F.col("event").contains("on Fox"), "Fox")
        .otherwise("Other")
    )
    # Extract location components
    df = df.withColumn(
        "city",
        F.regexp_extract(F.col("location"), r"^([^,]+)", 1)
    )
    df = df.withColumn(
        "state_country",
        F.regexp_extract(F.col("location"), r",\s*(.+)$", 1)
    )
    # Determine country
    us_states = ["Nevada", "California", "New York", "Texas", "Florida", ...]
    df = df.withColumn(
        "country",
        F.when(F.col("state_country").isin(us_states), "USA")
        .when(F.col("state_country").contains("Canada"), "Canada")
        .when(F.col("state_country").contains("UK"), "UK")
        .otherwise(F.col("state_country"))
    )
    # Extract year
    df = df.withColumn("year", F.year("event_date"))
    # Create unique event ID
    df = df.withColumn("event_id", F.monotonically_increasing_id())
    return df
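A quick look at the derived columns (assuming raw_data was produced by load_data above):

events = clean_events(raw_data["events"])
events.select("event", "event_date", "event_type", "city", "country", "year").show(5, truncate=False)
events.groupBy("event_type").count().show()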
Fighter Cleaning
def clean_fighters(fighter_details, fighter_tott):
    # Combine details and physical attributes; join on full name.
    # Note: string columns must be concatenated with concat_ws, not "+".
    df = fighter_details.join(
        fighter_tott,
        on=F.concat_ws(" ", F.col("first"), F.col("last")) == F.col("fighter"),
        how="left"
    )
    # Parse height (e.g. 5' 11") to inches
    df = df.withColumn(
        "height_inches",
        F.when(F.col("height").rlike(r"\d+' \d+"),
               F.regexp_extract(F.col("height"), r"(\d+)'", 1).cast("int") * 12 +
               F.regexp_extract(F.col("height"), r"(\d+)\"", 1).cast("int")
        ).otherwise(None)
    )
    # Parse reach
    df = df.withColumn(
        "reach_inches",
        F.when(F.col("reach").rlike(r"\d+"),
               F.regexp_extract(F.col("reach"), r"(\d+)", 1).cast("int")
        ).otherwise(None)
    )
    # Parse DOB
    df = df.withColumn("dob", F.to_date(F.col("dob"), "MMM dd, yyyy"))
    # Full name
    df = df.withColumn(
        "fighter_name",
        F.concat_ws(" ", F.col("first"), F.col("last"))
    )
    return df
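A spot check of the parsed attributes; for instance, a height of 5' 11" should yield height_inches = 71:

fighters = clean_fighters(raw_data["fighter_details"], raw_data["fighter_tott"])
fighters.select("fighter_name", "height", "height_inches",
                "reach", "reach_inches", "dob").show(5, truncate=False)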
Fight Cleaning
def clean_fights(fight_results, events_lookup):
    df = fight_results
    # Extract fighter names from BOUT column
    df = df.withColumn(
        "fighter1_name",
        F.trim(F.regexp_extract(F.col("bout"), r"^(.+?)\s+vs\.?\s+", 1))
    )
    df = df.withColumn(
        "fighter2_name",
        F.trim(F.regexp_extract(F.col("bout"), r"\s+vs\.?\s+(.+)$", 1))
    )
    # Parse method category
    df = df.withColumn(
        "method_category",
        F.when(F.col("method").contains("KO"), "KO/TKO")
        .when(F.col("method").contains("TKO"), "KO/TKO")
        .when(F.col("method").contains("SUB"), "Submission")
        .when(F.col("method").contains("Submission"), "Submission")
        .when(F.col("method").contains("DEC"), "Decision")
        .when(F.col("method").contains("Decision"), "Decision")
        .otherwise("Other")
    )
    # Identify title fights
    df = df.withColumn(
        "is_title_fight",
        F.col("weightclass").contains("Title").cast("boolean")
    )
    # Extract weight class
    df = df.withColumn(
        "weight_class",
        F.regexp_extract(F.col("weightclass"), r"([\w]+weight|Catch Weight)", 1)
    )
    # Join with events to get event_id and date
    df = df.join(events_lookup, on="event", how="left")
    # Create fight ID
    df = df.withColumn("fight_id", F.monotonically_increasing_id())
    # Determine winner
    df = df.withColumn(
        "winner_name",
        F.when(F.col("outcome") == "W", F.col("fighter1_name"))
        .otherwise(F.col("fighter2_name"))
    )
    return df
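Continuing the example, with the events lookup built from clean_events above:

fights = clean_fights(
    raw_data["fight_results"],
    events.select("event", "event_id", "event_date")
)
fights.select("bout", "fighter1_name", "fighter2_name",
              "method_category", "is_title_fight").show(5, truncate=False)
fights.groupBy("method_category").count().show()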
Fight Stats Cleaning
def clean_fight_stats(stats_df, fights_lookup):
    df = stats_df
    # Parse "X of Y" format; backticks escape the dots in scraped column
    # names such as "sig.str."
    def parse_of_stat(col_name):
        return F.regexp_extract(F.col(f"`{col_name}`"), r"(\d+) of \d+", 1).cast("int")
    def parse_of_attempted(col_name):
        return F.regexp_extract(F.col(f"`{col_name}`"), r"\d+ of (\d+)", 1).cast("int")
    df = df.withColumn("sig_strikes_landed", parse_of_stat("sig.str."))
    df = df.withColumn("sig_strikes_attempted", parse_of_attempted("sig.str."))
    df = df.withColumn("takedowns_landed", parse_of_stat("td"))
    df = df.withColumn("takedowns_attempted", parse_of_attempted("td"))
    # Parse control time (MM:SS) to seconds
    df = df.withColumn(
        "ctrl_seconds",
        F.when(F.col("ctrl").rlike(r"\d+:\d+"),
               F.regexp_extract(F.col("ctrl"), r"(\d+):", 1).cast("int") * 60 +
               F.regexp_extract(F.col("ctrl"), r":(\d+)", 1).cast("int")
        ).otherwise(0)
    )
    # Join with fights to get fight_id
    df = df.join(fights_lookup.select("bout", "event", "fight_id"),
                 on=["bout", "event"], how="left")
    return df
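A spot check of the "X of Y" parsing; e.g. a sig.str. value of "45 of 112" should produce landed = 45 and attempted = 112:

stats = clean_fight_stats(raw_data["fight_stats"], fights.select("bout", "event", "fight_id"))
stats.select("bout", "sig_strikes_landed", "sig_strikes_attempted",
             "takedowns_landed", "takedowns_attempted", "ctrl_seconds").show(5)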
Save to Parquet
def save_processed(data, output_dir):
    proc_dir = os.path.join(output_dir, "processed")
    os.makedirs(proc_dir, exist_ok=True)
    for name, df in data.items():
        output_path = os.path.join(proc_dir, name)
        df.write.mode("overwrite").parquet(output_path)
        print(f"  Saved {name}: {df.count()} rows")
Main Pipeline
def run_etl(data_dir, output_dir):
    print("Starting UFC ETL Pipeline...")
    spark = make_spark()
    spark.sparkContext.setLogLevel("WARN")
    try:
        # Load raw data
        raw_data = load_data(spark, data_dir)
        # Clean each dataset
        events = clean_events(raw_data["events"])
        fighters = clean_fighters(
            raw_data["fighter_details"],
            raw_data["fighter_tott"]
        )
        fights = clean_fights(
            raw_data["fight_results"],
            events.select("event", "event_id", "event_date")
        )
        stats = clean_fight_stats(
            raw_data["fight_stats"],
            fights.select("bout", "event", "fight_id")
        )
        # Cache for reuse
        events.cache()
        fighters.cache()
        fights.cache()
        stats.cache()
        # Save processed data
        processed = {
            "events": events,
            "fighters": fighters,
            "fights": fights,
            "fight_stats": stats
        }
        save_processed(processed, output_dir)
        print("ETL Pipeline Complete!")
    finally:
        spark.stop()
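The spark-submit commands below pass --data-dir and --output-dir flags; a minimal argparse entry point that would accept them (a sketch; the flag names simply mirror those commands):

if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="UFC sell-through ETL pipeline")
    parser.add_argument("--data-dir", required=True, help="directory containing raw/ and external/")
    parser.add_argument("--output-dir", required=True, help="directory to write processed/ Parquet")
    args = parser.parse_args()
    run_etl(args.data_dir, args.output_dir)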
Running the Pipeline
Local Development
spark-submit src/etl/spark_etl.py \
--data-dir ./data \
--output-dir ./data
On Roar Cluster
spark-submit \
--master yarn \
--deploy-mode client \
--driver-memory 8g \
--executor-memory 8g \
--num-executors 4 \
src/etl/spark_etl.py \
--data-dir /storage/work/user/ufc_data \
--output-dir /storage/work/user/ufc_data
Data Validation
def validate_processed_data(spark, data_dir):
    proc_dir = os.path.join(data_dir, "processed")
    events = spark.read.parquet(os.path.join(proc_dir, "events"))
    fights = spark.read.parquet(os.path.join(proc_dir, "fights"))
    # Check for nulls in key columns
    null_dates = events.filter(F.col("event_date").isNull()).count()
    null_fighters = fights.filter(F.col("fighter1_name").isNull()).count()
    print(f"Events with null dates: {null_dates}")
    print(f"Fights with null fighters: {null_fighters}")
    # Verify joins worked
    orphan_fights = fights.join(
        events.select("event_id"),
        on="event_id",
        how="left_anti"
    ).count()
    print(f"Orphan fights (no event): {orphan_fights}")