Sparky¶
import importlib
import time
import util
import SparkHandler as sh
importlib.reload(util)
importlib.reload(sh)
<module 'SparkHandler' from '/bigdata/students/rsbhatia2/p3/SparkHandler.py'>
sparky = sh.Sparky(spark, sc)
Strangely Snowy¶
Find a location that contains snow while its surroundings do not. Why does this occur? Is it a high mountain peak in a desert?
Setup¶
- I am using only months that are considered "winter months" in North America. They are November, December, January, and February
- Partial results are stored back in the hdfs.
from pyspark.sql import functions as F
import time
# ------------------------------------------------------------
# Parser
# ------------------------------------------------------------
from pyspark.sql.types import StructType, StructField, LongType, DoubleType
def parse_line(line):
try:
if line.startswith("1_time"):
return None
parts = line.rstrip("\t").split("\t")
lat = util.safe_float(parts, lat_idx)
lon = util.safe_float(parts, lon_idx)
snow = util.safe_float(parts, snow_idx)
temp = util.safe_float(parts, temp_idx)
if lat is None or lon is None or snow is None or temp is None:
return None
albedo = util.safe_float(parts, albedo_idx)
vegetation = util.safe_float(parts, veg_idx)
precip = util.safe_float(parts, precip_idx)
return (
lat,
lon,
snow,
temp,
albedo,
vegetation,
precip,
)
except Exception:
return None
schema = StructType([
StructField("lat", DoubleType(), False),
StructField("lon", DoubleType(), False),
StructField("snow", DoubleType(), False),
StructField("temp", DoubleType(), False),
StructField("albedo", DoubleType(), True),
StructField("vegetation", DoubleType(), True),
StructField("precip", DoubleType(), True),
])
def snow_partial_transform(points):
return (
points
.withColumn("lat_i", F.floor(F.col("lat") * 10).cast("int"))
.withColumn("lon_i", F.floor(F.col("lon") * 10).cast("int"))
.groupBy("lat_i", "lon_i")
.agg(
F.sum("lat").alias("sum_lat"),
F.sum("lon").alias("sum_lon"),
F.max("snow").alias("max_snow"),
F.sum("snow").alias("sum_snow"),
F.sum("temp").alias("sum_temp"),
F.sum("albedo").alias("sum_albedo"),
F.count("albedo").alias("count_albedo"),
F.sum("vegetation").alias("sum_vegetation"),
F.count("vegetation").alias("count_vegetation"),
F.sum("precip").alias("sum_precip"),
F.count("precip").alias("count_precip"),
F.count("*").alias("count_points"),
)
)
snow_ctx = sh.SparkyBatchContext(
glob_path="hdfs:///3hr/*/*_20*{11,12,01,02}*_*",
partial_out="hdfs:///sparky/strangely_snowy_partials",
parse_line=parse_line,
schema=schema,
partial_transform=snow_partial_transform,
batch_size=50)
strangely_snowy_runtime_start = time.perf_counter()
input_paths = sparky.setup_job(snow_ctx)
Deleted old partial output: hdfs:/sparky/strangely_snowy_partials Found 4036 files
Feature Engineering¶
For this question, I am using the following features:
timeis the timestamp. It has datetime info for the sensorlatis necessary for creating regional buckets and doing neighbor comparisons.lonis necessary for creating regional buckets and doing neighbor comparisons.snow_depth_surfaceis needed cause it's snowy.temperature_surfacea place with lower temperature is more likely to have snow.albedo_surfaceis the reflective index of the surface of a location. A higher albedo value means that it is cooler.vegetation_surfaceallows me to determine if the location identified is a desert or not. Lower vegetation is a desert.total_precipitation_surface_3_hour_accumulationgives us an idea of whether the anomaly is because of a snow storm of some kind.
time_idx = sparky.required_col("time")
lat_idx = sparky.required_col("lat")
lon_idx = sparky.required_col("lon")
snow_idx = sparky.required_col("snow_depth_surface")
temp_idx = sparky.required_col("temperature_surface")
albedo_idx = sparky.required_col("albedo_surface")
veg_idx = sparky.required_col("vegetation_surface")
precip_idx = sparky.required_col("total_precipitation_surface_3_hour_accumulation")
print("Using columns:")
print("time:", time_idx)
print("lat:", lat_idx)
print("lon:", lon_idx)
print("snow:", snow_idx)
print("temp:", temp_idx)
print("albedo:", albedo_idx)
print("vegetation:", veg_idx)
print("precip:", precip_idx)
Using columns: time: 0 lat: 1 lon: 2 snow: 9 temp: 10 albedo: 3 vegetation: 14 precip: 13
for batch_id, start in enumerate(range(0, len(input_paths), snow_ctx.batch_size)):
batch_paths = input_paths[start:start + snow_ctx.batch_size]
sparky.process_batch(snow_ctx, batch_paths, batch_id)
Processing batch 0: 50 files
Processing batch 1: 50 files
Processing batch 2: 50 files
Processing batch 3: 50 files
Processing batch 4: 50 files
Processing batch 5: 50 files
Processing batch 6: 50 files
Processing batch 7: 50 files
Processing batch 8: 50 files
Processing batch 9: 50 files
Processing batch 10: 50 files
Processing batch 11: 50 files
Processing batch 12: 50 files
Processing batch 13: 50 files
Processing batch 14: 50 files
Processing batch 15: 50 files
Processing batch 16: 50 files
Processing batch 17: 50 files
Processing batch 18: 50 files
Processing batch 19: 50 files
Processing batch 20: 50 files
Processing batch 21: 50 files
Processing batch 22: 50 files
Processing batch 23: 50 files
Processing batch 24: 50 files
Processing batch 25: 50 files
Processing batch 26: 50 files
Processing batch 27: 50 files
Processing batch 28: 50 files
Processing batch 29: 50 files
Processing batch 30: 50 files
Processing batch 31: 50 files
Processing batch 32: 50 files
Processing batch 33: 50 files
Processing batch 34: 50 files
Processing batch 35: 50 files
Processing batch 36: 50 files
Processing batch 37: 50 files
Processing batch 38: 50 files
Processing batch 39: 50 files
Processing batch 40: 50 files
Processing batch 41: 50 files
Processing batch 42: 50 files
Processing batch 43: 50 files
Processing batch 44: 50 files
Processing batch 45: 50 files
Processing batch 46: 50 files
Processing batch 47: 50 files
Processing batch 48: 50 files
Processing batch 49: 50 files
Processing batch 50: 50 files
Processing batch 51: 50 files
Processing batch 52: 50 files
Processing batch 53: 50 files
Processing batch 54: 50 files
Processing batch 55: 50 files
Processing batch 56: 50 files
Processing batch 57: 50 files
Processing batch 58: 50 files
Processing batch 59: 50 files
Processing batch 60: 50 files
Processing batch 61: 50 files
Processing batch 62: 50 files
Processing batch 63: 50 files
Processing batch 64: 50 files
Processing batch 65: 50 files
Processing batch 66: 50 files
Processing batch 67: 50 files
Processing batch 68: 50 files
Processing batch 69: 50 files
Processing batch 70: 50 files
Processing batch 71: 50 files
Processing batch 72: 50 files
Processing batch 73: 50 files
Processing batch 74: 50 files
Processing batch 75: 50 files
Processing batch 76: 50 files
Processing batch 77: 50 files
Processing batch 78: 50 files
Processing batch 79: 50 files
Processing batch 80: 36 files
from pyspark.sql import functions as F
def combine_snow_partials(partials):
return (
partials
.groupBy("lat_i", "lon_i")
.agg(
(F.sum("sum_lat") / F.sum("count_points")).alias("lat"),
(F.sum("sum_lon") / F.sum("count_points")).alias("lon"),
F.max("max_snow").alias("cell_max_snow"),
(F.sum("sum_snow") / F.sum("count_points")).alias("cell_avg_snow"),
(F.sum("sum_temp") / F.sum("count_points")).alias("cell_avg_temp"),
(F.sum("sum_albedo") / F.sum("count_albedo")).alias("cell_avg_albedo"),
(F.sum("sum_vegetation") / F.sum("count_vegetation")).alias("cell_avg_vegetation"),
(F.sum("sum_precip") / F.sum("count_precip")).alias("cell_avg_precip"),
F.sum("count_points").alias("cell_points"),
)
)
def find_isolated_snow_cells(cell_stats):
regional_stats = (
cell_stats
.withColumn("region_lat", F.floor(F.col("lat_i") / 5))
.withColumn("region_lon", F.floor(F.col("lon_i") / 5))
.groupBy("region_lat", "region_lon")
.agg(
F.avg("cell_avg_snow").alias("regional_avg_snow"),
F.max("cell_max_snow").alias("regional_max_snow"),
F.avg("cell_avg_temp").alias("regional_avg_temp"),
F.avg("cell_avg_albedo").alias("regional_avg_albedo"),
F.avg("cell_avg_vegetation").alias("regional_avg_vegetation"),
F.avg("cell_avg_precip").alias("regional_avg_precip"),
F.count("*").alias("regional_cell_count"),
)
)
return (
cell_stats
.withColumn("region_lat", F.floor(F.col("lat_i") / 5))
.withColumn("region_lon", F.floor(F.col("lon_i") / 5))
.join(regional_stats, ["region_lat", "region_lon"], "inner")
.withColumn("snow_isolation_score", F.col("cell_max_snow") - F.col("regional_avg_snow"))
.withColumn("temp_colder_than_region", F.col("regional_avg_temp") - F.col("cell_avg_temp"))
.withColumn("albedo_higher_than_region", F.col("cell_avg_albedo") - F.col("regional_avg_albedo"))
.withColumn("vegetation_lower_than_region", F.col("regional_avg_vegetation") - F.col("cell_avg_vegetation"))
.filter(F.col("cell_max_snow") >= 0.5)
.filter(F.col("regional_avg_snow") <= 0.05)
.filter(F.col("regional_cell_count") >= 4)
.orderBy(F.desc("snow_isolation_score"), F.desc("cell_max_snow"))
.select(
"lat",
"lon",
"cell_max_snow",
"cell_avg_snow",
"regional_avg_snow",
"regional_max_snow",
"snow_isolation_score",
"cell_avg_temp",
"regional_avg_temp",
"temp_colder_than_region",
"cell_avg_albedo",
"regional_avg_albedo",
"albedo_higher_than_region",
"cell_avg_vegetation",
"regional_avg_vegetation",
"vegetation_lower_than_region",
"cell_points",
"regional_cell_count",
)
.limit(3)
)
snow_analysis_ctx = sh.SparkyAnalysisContext(
partial_in=snow_ctx.partial_out,
combine_partials=combine_snow_partials,
analyze=find_isolated_snow_cells,
result_out=None,
)
cell_stats = sparky.combine(snow_analysis_ctx)
top3 = sparky.analyze(snow_analysis_ctx, cell_stats)
top3.show(3, truncate=False)
strangely_snowy_runtime = time.perf_counter() - strangely_snowy_runtime_start
print(f"Total Strangely Snowy Spark runtime: {strangely_snowy_runtime:.2f} seconds")
+------------------+-------------------+-------------+-------------------+--------------------+-----------------+--------------------+-----------------+------------------+-----------------------+------------------+-------------------+-------------------------+-------------------+-----------------------+----------------------------+-----------+-------------------+ |lat |lon |cell_max_snow|cell_avg_snow |regional_avg_snow |regional_max_snow|snow_isolation_score|cell_avg_temp |regional_avg_temp |temp_colder_than_region|cell_avg_albedo |regional_avg_albedo|albedo_higher_than_region|cell_avg_vegetation|regional_avg_vegetation|vegetation_lower_than_region|cell_points|regional_cell_count| +------------------+-------------------+-------------+-------------------+--------------------+-----------------+--------------------+-----------------+------------------+-----------------------+------------------+-------------------+-------------------------+-------------------+-----------------------+----------------------------+-----------+-------------------+ |36.48025474536799 |-118.56796062211633|3.5512 |0.2250536953106203 |0.04022571125487863 |3.5512 |3.5109742887451216 |273.6274536550868|282.88273287191134|9.255279216824533 |34.27946140680069 |17.87228976387486 |16.40717164292583 |42.21369727245658 |44.79528352141172 |2.5815862489551407 |4030 |20 | |36.374458471843475|-118.54488139420283|2.5751998 |0.16376453928171214|0.04022571125487863 |3.5512 |2.5349740887451215 |275.4141993548388|282.88273287191134|7.468533517072558 |27.601104492429887|17.87228976387486 |9.728814728555026 |43.9483995057072 |44.79528352141172 |0.8468840157045179 |4030 |20 | |43.58407050794018 |-86.06345689016926 |2.2184 |0.0411021434923077 |0.033958424925835444|2.2184 |2.1844415750741644 |279.4982177245658|279.81490099386303|0.31668326929724344 |26.151638124596676|23.022939336922576 |3.1286987876741 |33.55467741985112 |27.951770737498375 |-5.602906682352742 |4030 |22 | +------------------+-------------------+-------------+-------------------+--------------------+-----------------+--------------------+-----------------+------------------+-----------------------+------------------+-------------------+-------------------------+-------------------+-----------------------+----------------------------+-----------+-------------------+ Total Strangely Snowy Spark runtime: 2527.01 seconds
Result¶
The top place that this job found is located at 36.47, -118.56, which is Black Rock Pass Trail, CA
This place has a max_snow value of $3.5512$ while the regional average is $0.0368$ .
This result makes sense because the average temperature at this location is $273.67K$ which is $0.52\degree C$
Interestingly, this is slightly more than water's freezing point of $0\degree C$ but given its high altitude, it makes sense that there is snow present at this temperature because of decrease in pressure at high altitude.

Climate Chart¶
Given a Geohash prefix as an input, build a function that will create a climate chart for the region. This includes high, low, and average temperatures, as well as monthly average rainfall (precipitation).
Setup¶
For this question, I need to process all available files.
The reason for this is that I have no prior information about which geohash is present in which file, and if I want to make the function to be independent of that, I need to parse all files so that the same function can be reused for multiple geohash prefixes.
I am still doing batch processing like the previous question, and that helps not filling up the /tmp directory.
import sys
import importlib
import time
import matplotlib.pyplot as plt
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, LongType, DoubleType
import util
import SparkHandler
importlib.reload(util)
importlib.reload(SparkHandler)
from util import safe_float, geohash_bbox
from SparkHandler import Sparky, SparkyBatchContext, SparkyAnalysisContext
# ------------------------------------------------------------
# Config
# ------------------------------------------------------------
GEOHASH_PREFIX = "9q8"
INPUT_GLOB = "hdfs:///3hr/*/*"
BATCH_SIZE = 50
PARTIAL_OUT = f"hdfs:///climate_chart_partials/{GEOHASH_PREFIX}"
lat_min, lat_max, lon_min, lon_max = geohash_bbox(GEOHASH_PREFIX)
print("Geohash:", GEOHASH_PREFIX)
print("lat range:", lat_min, lat_max)
print("lon range:", lon_min, lon_max)
# ------------------------------------------------------------
# Schema
# ------------------------------------------------------------
raw_schema = StructType([
StructField("time_ms", LongType(), False),
StructField("lat", DoubleType(), False),
StructField("lon", DoubleType(), False),
StructField("temp_k", DoubleType(), False),
StructField("precip", DoubleType(), False),
])
Geohash: 9q8 lat range: 36.5625 37.96875 lon range: -123.75 -122.34375
# ------------------------------------------------------------
# Batch partial transform
# ------------------------------------------------------------
def climate_partial_transform(points):
return (
points
.withColumn(
"timestamp",
F.from_unixtime((F.col("time_ms") / 1000).cast("long")).cast("timestamp")
)
.withColumn("year", F.year("timestamp"))
.withColumn("month", F.month("timestamp"))
.withColumn("lat_i", F.floor(F.col("lat") * 100).cast("int"))
.withColumn("lon_i", F.floor(F.col("lon") * 100).cast("int"))
.groupBy("year", "month", "lat_i", "lon_i")
.agg(
F.min("temp_k").alias("min_temp_k"),
F.max("temp_k").alias("max_temp_k"),
F.sum("temp_k").alias("sum_temp_k"),
F.count("temp_k").alias("count_temp"),
# NAM field is 3-hour accumulated precipitation.
# Summing gives monthly accumulated precipitation for the cell.
F.sum("precip").alias("sum_precip"),
F.count("precip").alias("count_precip"),
)
)
# ------------------------------------------------------------
# Analysis: combine partials
# ------------------------------------------------------------
def combine_climate_partials(partials):
return (
partials
.groupBy("year", "month", "lat_i", "lon_i")
.agg(
F.min("min_temp_k").alias("cell_month_min_temp_k"),
F.max("max_temp_k").alias("cell_month_max_temp_k"),
(F.sum("sum_temp_k") / F.sum("count_temp")).alias("cell_month_avg_temp_k"),
F.sum("sum_precip").alias("cell_month_total_precip"),
)
)
# ------------------------------------------------------------
# Analysis: build climate chart table
# ------------------------------------------------------------
def analyze_climate(monthly_cell):
return (
monthly_cell
.groupBy("month")
.agg(
F.avg("cell_month_min_temp_k").alias("low_temp_k"),
F.avg("cell_month_max_temp_k").alias("high_temp_k"),
F.avg("cell_month_avg_temp_k").alias("avg_temp_k"),
F.avg("cell_month_total_precip").alias("avg_monthly_precip"),
F.count("*").alias("month_cell_samples"),
)
.withColumn("low_temp_c", F.col("low_temp_k") - 273.15)
.withColumn("high_temp_c", F.col("high_temp_k") - 273.15)
.withColumn("avg_temp_c", F.col("avg_temp_k") - 273.15)
.orderBy("month")
.select(
"month",
"low_temp_c",
"high_temp_c",
"avg_temp_c",
"avg_monthly_precip",
"month_cell_samples",
)
)
sparky = Sparky(spark, sc)
setup_ctx = SparkyBatchContext(
glob_path=INPUT_GLOB,
partial_out=PARTIAL_OUT,
schema=raw_schema,
parse_line=lambda line: None,
partial_transform=climate_partial_transform,
batch_size=BATCH_SIZE,
)
input_paths = sparky.setup_job(setup_ctx)
print("Total input files:", len(input_paths))
print("Batch size:", BATCH_SIZE)
print("Number of batches:", (len(input_paths) + BATCH_SIZE - 1) // BATCH_SIZE)
climate_chart_runtime_start = time.perf_counter()
if len(input_paths) == 0:
raise ValueError(f"No files matched: {INPUT_GLOB}")
# ------------------------------------------------------------
# Resolve required columns
# ------------------------------------------------------------
time_idx = sparky.required_col("time")
lat_idx = sparky.required_col("lat")
lon_idx = sparky.required_col("lon")
temp_idx = sparky.required_col("temperature_surface")
precip_idx = sparky.required_col("total_precipitation_surface_3_hour_accumulation")
print("Using column indexes:")
print("time:", time_idx)
print("lat:", lat_idx)
print("lon:", lon_idx)
print("temp:", temp_idx)
print("precip:", precip_idx)
Deleted old partial output: hdfs:/climate_chart_partials/9q8
26/05/21 23:12:24 WARN SparkContext: The path /bigdata/students/rsbhatia2/p3/util.py has been added already. Overwriting of added paths is not supported in the current version. 26/05/21 23:12:24 WARN SparkContext: The path /bigdata/students/rsbhatia2/p3/SparkHandler.py has been added already. Overwriting of added paths is not supported in the current version.
Found 6770 files
[Stage 258:> (0 + 1) / 1]
Total input files: 6770 Batch size: 50 Number of batches: 136 Using column indexes: time: 0 lat: 1 lon: 2 temp: 10 precip: 13
# ------------------------------------------------------------
# Parser
# ------------------------------------------------------------
def make_climate_parser(
time_idx,
lat_idx,
lon_idx,
temp_idx,
precip_idx,
lat_min,
lat_max,
lon_min,
lon_max,
):
def parse_climate_line(line):
try:
if line.startswith("1_time"):
return None
parts = line.rstrip("\t").split("\t")
time_ms = safe_float(parts, time_idx)
lat = safe_float(parts, lat_idx)
lon = safe_float(parts, lon_idx)
temp_k = safe_float(parts, temp_idx)
precip = safe_float(parts, precip_idx)
if (
time_ms is None
or lat is None
or lon is None
or temp_k is None
or precip is None
):
return None
# Filter to geohash region before DataFrame work.
if not (lat_min <= lat <= lat_max and lon_min <= lon <= lon_max):
return None
return (
int(time_ms),
lat,
lon,
temp_k,
precip,
)
except Exception:
return None
return parse_climate_line
parse_climate_line = make_climate_parser(
time_idx=time_idx,
lat_idx=lat_idx,
lon_idx=lon_idx,
temp_idx=temp_idx,
precip_idx=precip_idx,
lat_min=lat_min,
lat_max=lat_max,
lon_min=lon_min,
lon_max=lon_max,
)
# ------------------------------------------------------------
# batch context
# ------------------------------------------------------------
batch_ctx = SparkyBatchContext(
glob_path=INPUT_GLOB,
partial_out=PARTIAL_OUT,
schema=raw_schema,
parse_line=parse_climate_line,
partial_transform=climate_partial_transform,
batch_size=BATCH_SIZE,
)
# ------------------------------------------------------------
# Analysis context
# ------------------------------------------------------------
analysis_ctx = SparkyAnalysisContext(
partial_in=PARTIAL_OUT,
combine_partials=combine_climate_partials,
analyze=analyze_climate,
result_out=None,
)
# ------------------------------------------------------------
# Run batches
# ------------------------------------------------------------
for batch_id, start in enumerate(range(0, len(input_paths), batch_ctx.batch_size)):
batch_paths = input_paths[start:start + batch_ctx.batch_size]
sparky.process_batch(batch_ctx, batch_paths, batch_id)
Processing batch 0: 50 files
Processing batch 1: 50 files
Processing batch 2: 50 files
Processing batch 3: 50 files
Processing batch 4: 50 files
Processing batch 5: 50 files
Processing batch 6: 50 files
Processing batch 7: 50 files
Processing batch 8: 50 files
Processing batch 9: 50 files
Processing batch 10: 50 files
Processing batch 11: 50 files
Processing batch 12: 50 files
Processing batch 13: 50 files
Processing batch 14: 50 files
Processing batch 15: 50 files
Processing batch 16: 50 files
Processing batch 17: 50 files
Processing batch 18: 50 files
Processing batch 19: 50 files
Processing batch 20: 50 files
Processing batch 21: 50 files
Processing batch 22: 50 files
Processing batch 23: 50 files
Processing batch 24: 50 files
Processing batch 25: 50 files
Processing batch 26: 50 files
Processing batch 27: 50 files
Processing batch 28: 50 files
Processing batch 29: 50 files
Processing batch 30: 50 files
Processing batch 31: 50 files
Processing batch 32: 50 files
Processing batch 33: 50 files
Processing batch 34: 50 files
Processing batch 35: 50 files
Processing batch 36: 50 files
Processing batch 37: 50 files
Processing batch 38: 50 files
Processing batch 39: 50 files
Processing batch 40: 50 files
Processing batch 41: 50 files
Processing batch 42: 50 files
Processing batch 43: 50 files
Processing batch 44: 50 files
Processing batch 45: 50 files
Processing batch 46: 50 files
Processing batch 47: 50 files
Processing batch 48: 50 files
Processing batch 49: 50 files
Processing batch 50: 50 files
Processing batch 51: 50 files
Processing batch 52: 50 files
Processing batch 53: 50 files
Processing batch 54: 50 files
Processing batch 55: 50 files
Processing batch 56: 50 files
Processing batch 57: 50 files
Processing batch 58: 50 files
Processing batch 59: 50 files
Processing batch 60: 50 files
Processing batch 61: 50 files
Processing batch 62: 50 files
Processing batch 63: 50 files
Processing batch 64: 50 files
Processing batch 65: 50 files
Processing batch 66: 50 files
Processing batch 67: 50 files
Processing batch 68: 50 files
Processing batch 69: 50 files
Processing batch 70: 50 files
Processing batch 71: 50 files
Processing batch 72: 50 files
Processing batch 73: 50 files
Processing batch 74: 50 files
Processing batch 75: 50 files
Processing batch 76: 50 files
Processing batch 77: 50 files
Processing batch 78: 50 files
Processing batch 79: 50 files
Processing batch 80: 50 files
Processing batch 81: 50 files
Processing batch 82: 50 files
Processing batch 83: 50 files
Processing batch 84: 50 files
Processing batch 85: 50 files
Processing batch 86: 50 files
Processing batch 87: 50 files
Processing batch 88: 50 files
Processing batch 89: 50 files
Processing batch 90: 50 files
Processing batch 91: 50 files
Processing batch 92: 50 files
Processing batch 93: 50 files
Processing batch 94: 50 files
Processing batch 95: 50 files
Processing batch 96: 50 files
Processing batch 97: 50 files
Processing batch 98: 50 files
Processing batch 99: 50 files
Processing batch 100: 50 files
Processing batch 101: 50 files
Processing batch 102: 50 files
Processing batch 103: 50 files
Processing batch 104: 50 files
Processing batch 105: 50 files
Processing batch 106: 50 files
Processing batch 107: 50 files
Processing batch 108: 50 files
Processing batch 109: 50 files
Processing batch 110: 50 files
Processing batch 111: 50 files
Processing batch 112: 50 files
Processing batch 113: 50 files
Processing batch 114: 50 files
Processing batch 115: 50 files
Processing batch 116: 50 files
Processing batch 117: 50 files
Processing batch 118: 50 files
Processing batch 119: 50 files
Processing batch 120: 50 files
Processing batch 121: 50 files
Processing batch 122: 50 files
Processing batch 123: 50 files
Processing batch 124: 50 files
Processing batch 125: 50 files
Processing batch 126: 50 files
Processing batch 127: 50 files
Processing batch 128: 50 files
Processing batch 129: 50 files
Processing batch 130: 50 files
Processing batch 131: 50 files
Processing batch 132: 50 files
Processing batch 133: 50 files
Processing batch 134: 50 files
Processing batch 135: 20 files
# ------------------------------------------------------------
# Combine + analyze
# ------------------------------------------------------------
monthly_cell = sparky.combine(analysis_ctx)
climate = sparky.analyze(analysis_ctx, monthly_cell)
climate.show(12, truncate=False)
[Stage 671:> (0 + 1) / 1]
+-----+------------------+------------------+------------------+------------------+------------------+ |month|low_temp_c |high_temp_c |avg_temp_c |avg_monthly_precip|month_cell_samples| +-----+------------------+------------------+------------------+------------------+------------------+ |1 |12.016236348974303|14.309803240470046|13.181206475171223|45.917338709677416|682 | |2 |11.649982639297434|14.54796824046906 |13.0830678088156 |36.53354105571847 |682 | |3 |11.734685982403846|14.864220967741858|13.083496387227683|22.082950879765395|682 | |4 |11.814751147741504|14.763760231990375|12.970997660776447|11.306666687449331|819 | |5 |11.562745754026366|14.911664436311582|12.97150433422297 |7.391087115666179 |683 | |6 |11.90829101024758 |16.16154909224224 |13.731071802928227|1.9405197657393864|683 | |7 |12.695180439238584|16.93223509516764 |14.705720532938074|3.0604868228404074|683 | |8 |13.853945841875657|18.351347964860622|15.888160752673343|3.1662701317715958|683 | |9 |14.561720146413222|18.62315465592974 |16.27131729169463 |1.848188140556369 |683 | |10 |14.205379575402674|17.53579715958972 |15.603527402464351|5.35953513909224 |683 | |11 |13.244209956076645|16.718046676428912|14.844923994043654|17.75137262079063 |683 | |12 |12.551296642245688|14.921054383394164|13.73723547992904 |20.238629426129425|819 | +-----+------------------+------------------+------------------+------------------+------------------+
# ------------------------------------------------------------
# Plot
# ------------------------------------------------------------
pdf = climate.toPandas()
if len(pdf) == 0:
print("No data found for this geohash prefix.")
else:
month_labels = [
"Jan", "Feb", "Mar", "Apr", "May", "Jun",
"Jul", "Aug", "Sep", "Oct", "Nov", "Dec",
]
pdf["month_name"] = pdf["month"].apply(lambda m: month_labels[int(m) - 1])
fig, ax1 = plt.subplots(figsize=(10, 5))
ax1.plot(
pdf["month_name"],
pdf["high_temp_c"],
marker="o",
color="red",
label="Max temp",
)
ax1.plot(
pdf["month_name"],
pdf["avg_temp_c"],
marker="o",
color="purple",
label="Avg temp",
)
ax1.plot(
pdf["month_name"],
pdf["low_temp_c"],
marker="o",
color="blue",
label="Min temp",
)
ax1.set_xlabel("Month")
ax1.set_ylabel("Temperature (°C)")
ax1.set_ylim(bottom=0)
ax1.legend(loc="upper left")
ax2 = ax1.twinx()
ax2.bar(
pdf["month_name"],
pdf["avg_monthly_precip"],
alpha=0.3,
label="Avg monthly precip",
)
ax2.set_ylabel("Average monthly precipitation")
ax2.set_ylim(bottom=0)
ax2.legend(loc="upper right")
plt.title(f"Climate Chart for Geohash Prefix {GEOHASH_PREFIX}")
plt.grid(True, axis="y", alpha=0.3)
plt.show()
climate_chart_runtime = time.perf_counter() - climate_chart_runtime_start
print(f"Total Climate Chart Spark runtime: {climate_chart_runtime:.2f} seconds")
Total Climate Chart Spark runtime: 2224.99 seconds
Travel Startup¶
import sys
import importlib
import re
import time
import matplotlib.pyplot as plt
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.types import (
StructType,
StructField,
StringType,
LongType,
DoubleType,
)
from pyspark.sql.window import Window
import util
import SparkHandler
importlib.reload(util)
importlib.reload(SparkHandler)
from util import safe_float
from SparkHandler import Sparky, SparkyBatchContext, SparkyAnalysisContext
# ============================================================
# CONFIG
# ============================================================
sc.setLogLevel("ERROR")
INPUT_GLOB = "hdfs:///3hr/2018/*"
BATCH_SIZE = 50
PARTIAL_OUT = "hdfs:///travel_startup_2018_partials"
TRAVEL_WINDOWS = [
("Winter Break / New Year", [12, 1]),
("Spring", [2, 3, 4]),
("Early Summer", [5, 6, 7]),
("Late Summer", [8, 9]),
("Fall", [10, 11]),
]
REGIONS = [
{
"region": "San Francisco Bay Area",
"lat_min": 37.2,
"lat_max": 38.2,
"lon_min": -122.8,
"lon_max": -121.7,
},
{
"region": "Los Angeles Coast",
"lat_min": 33.5,
"lat_max": 34.4,
"lon_min": -118.8,
"lon_max": -117.7,
},
{
"region": "San Diego Coast",
"lat_min": 32.5,
"lat_max": 33.1,
"lon_min": -117.4,
"lon_max": -116.8,
},
{
"region": "Seattle / Puget Sound",
"lat_min": 47.2,
"lat_max": 47.9,
"lon_min": -122.7,
"lon_max": -121.9,
},
{
"region": "Portland / Willamette Valley",
"lat_min": 45.2,
"lat_max": 45.8,
"lon_min": -123.1,
"lon_max": -122.2,
},
{
"region": "Denver / Front Range",
"lat_min": 39.4,
"lat_max": 40.1,
"lon_min": -105.4,
"lon_max": -104.5,
},
{
"region": "Las Vegas",
"lat_min": 35.9,
"lat_max": 36.5,
"lon_min": -115.5,
"lon_max": -114.8,
},
{
"region": "Phoenix / Scottsdale",
"lat_min": 33.2,
"lat_max": 33.8,
"lon_min": -112.4,
"lon_max": -111.6,
},
{
"region": "New York City",
"lat_min": 40.3,
"lat_max": 41.1,
"lon_min": -74.4,
"lon_max": -73.4,
},
{
"region": "Chicago",
"lat_min": 41.5,
"lat_max": 42.2,
"lon_min": -88.1,
"lon_max": -87.3,
},
{
"region": "Miami",
"lat_min": 25.5,
"lat_max": 26.1,
"lon_min": -80.5,
"lon_max": -80.0,
},
{
"region": "New Orleans",
"lat_min": 29.7,
"lat_max": 30.3,
"lon_min": -90.4,
"lon_max": -89.7,
},
{
"region": "Boston",
"lat_min": 42.1,
"lat_max": 42.6,
"lon_min": -71.4,
"lon_max": -70.8,
},
{
"region": "Vancouver",
"lat_min": 49.0,
"lat_max": 49.5,
"lon_min": -123.4,
"lon_max": -122.7,
},
]
IDEAL_TEMP_C = 21.0
MAX_COMFORT_HIGH_C = 30.0
MIN_COMFORT_LOW_C = 8.0
def safe_window_name(window_name):
return re.sub(r"[^a-zA-Z0-9_]+", "_", window_name.lower()).strip("_")
def month_from_path(path):
match = re.search(r"(20\d{2})(\d{2})(\d{2})", path)
if match is None:
return None
return int(match.group(2))
def region_for_point(lat, lon):
for region in REGIONS:
if (
region["lat_min"] <= lat <= region["lat_max"]
and region["lon_min"] <= lon <= region["lon_max"]
):
return region["region"]
return None
# ============================================================
# SCHEMA
# ============================================================
raw_schema = StructType([
StructField("region", StringType(), False),
StructField("time_ms", LongType(), False),
StructField("lat", DoubleType(), False),
StructField("lon", DoubleType(), False),
StructField("temp_k", DoubleType(), False),
StructField("precip", DoubleType(), False),
StructField("rh", DoubleType(), False),
StructField("cloud", DoubleType(), False),
StructField("snow", DoubleType(), False),
StructField("wind_speed", DoubleType(), False),
])
# ============================================================
# PARTIAL TRANSFORM
# ============================================================
def travel_partial_transform(points):
return (
points
.withColumn("lat_i", F.floor(F.col("lat") * 100).cast("int"))
.withColumn("lon_i", F.floor(F.col("lon") * 100).cast("int"))
.groupBy("region", "lat_i", "lon_i")
.agg(
F.sum("lat").alias("sum_lat"),
F.sum("lon").alias("sum_lon"),
F.count("*").alias("count_points"),
F.sum("temp_k").alias("sum_temp_k"),
F.count("temp_k").alias("count_temp"),
F.min("temp_k").alias("min_temp_k"),
F.max("temp_k").alias("max_temp_k"),
F.sum("precip").alias("sum_precip"),
F.count("precip").alias("count_precip"),
F.sum("rh").alias("sum_rh"),
F.count("rh").alias("count_rh"),
F.sum("cloud").alias("sum_cloud"),
F.count("cloud").alias("count_cloud"),
F.sum("snow").alias("sum_snow"),
F.max("snow").alias("max_snow"),
F.count("snow").alias("count_snow"),
F.sum("wind_speed").alias("sum_wind"),
F.count("wind_speed").alias("count_wind"),
)
)
# ============================================================
# COMBINE PARTIALS
# ============================================================
def combine_travel_partials(partials):
return (
partials
.groupBy("region", "lat_i", "lon_i")
.agg(
(F.sum("sum_lat") / F.sum("count_points")).alias("lat"),
(F.sum("sum_lon") / F.sum("count_points")).alias("lon"),
F.sum("sum_temp_k").alias("sum_temp_k"),
F.sum("count_temp").alias("count_temp"),
F.min("min_temp_k").alias("low_temp_k"),
F.max("max_temp_k").alias("high_temp_k"),
F.sum("sum_precip").alias("sum_precip"),
F.sum("count_precip").alias("count_precip"),
F.sum("sum_rh").alias("sum_rh"),
F.sum("count_rh").alias("count_rh"),
F.sum("sum_cloud").alias("sum_cloud"),
F.sum("count_cloud").alias("count_cloud"),
F.sum("sum_snow").alias("sum_snow"),
F.max("max_snow").alias("max_snow"),
F.sum("count_snow").alias("count_snow"),
F.sum("sum_wind").alias("sum_wind"),
F.sum("count_wind").alias("count_wind"),
F.sum("count_points").alias("cell_points"),
)
.withColumn("avg_temp_k", F.col("sum_temp_k") / F.col("count_temp"))
.withColumn("avg_precip_per_3hr", F.col("sum_precip") / F.col("count_precip"))
.withColumn("avg_daily_precip", F.col("avg_precip_per_3hr") * F.lit(8.0))
.withColumn("avg_rh", F.col("sum_rh") / F.col("count_rh"))
.withColumn("avg_cloud", F.col("sum_cloud") / F.col("count_cloud"))
.withColumn("avg_snow", F.col("sum_snow") / F.col("count_snow"))
.withColumn("avg_wind", F.col("sum_wind") / F.col("count_wind"))
)
# ============================================================
# ANALYZER
# ============================================================
def make_travel_analyzer(window_name, months):
def analyze_travel_window(cell_window):
region_window = (
cell_window
.groupBy("region")
.agg(
F.avg("lat").alias("lat"),
F.avg("lon").alias("lon"),
F.avg("avg_temp_k").alias("avg_temp_k"),
F.avg("low_temp_k").alias("low_temp_k"),
F.avg("high_temp_k").alias("high_temp_k"),
F.avg("avg_daily_precip").alias("avg_daily_precip"),
F.avg("avg_rh").alias("avg_rh"),
F.avg("avg_cloud").alias("avg_cloud"),
F.avg("avg_snow").alias("avg_snow"),
F.max("max_snow").alias("max_snow"),
F.avg("avg_wind").alias("avg_wind"),
F.sum("cell_points").alias("region_points"),
F.count("*").alias("region_cells"),
)
.withColumn("travel_window", F.lit(window_name))
.withColumn("months", F.lit(",".join(str(month) for month in months)))
.withColumn("avg_temp_c", F.col("avg_temp_k") - F.lit(273.15))
.withColumn("low_temp_c", F.col("low_temp_k") - F.lit(273.15))
.withColumn("high_temp_c", F.col("high_temp_k") - F.lit(273.15))
)
return (
region_window
.withColumn(
"temp_penalty",
F.abs(F.col("avg_temp_c") - F.lit(IDEAL_TEMP_C)) * F.lit(3.0),
)
.withColumn(
"heat_penalty",
F.greatest(
F.col("high_temp_c") - F.lit(MAX_COMFORT_HIGH_C),
F.lit(0.0),
) * F.lit(2.0),
)
.withColumn(
"cold_penalty",
F.greatest(
F.lit(MIN_COMFORT_LOW_C) - F.col("low_temp_c"),
F.lit(0.0),
) * F.lit(2.0),
)
.withColumn(
"rain_penalty",
F.least(F.col("avg_daily_precip") * F.lit(5.0), F.lit(25.0)),
)
.withColumn(
"humidity_penalty",
F.greatest(F.col("avg_rh") - F.lit(70.0), F.lit(0.0)) * F.lit(0.35),
)
.withColumn(
"cloud_penalty",
F.least(F.col("avg_cloud") / F.lit(10.0), F.lit(10.0)),
)
.withColumn(
"wind_penalty",
F.greatest(F.col("avg_wind") - F.lit(6.0), F.lit(0.0)) * F.lit(3.0),
)
.withColumn(
"snow_penalty",
F.least(F.col("max_snow") * F.lit(20.0), F.lit(20.0)),
)
.withColumn(
"comfort_index_raw",
F.lit(100.0)
- F.col("temp_penalty")
- F.col("heat_penalty")
- F.col("cold_penalty")
- F.col("rain_penalty")
- F.col("humidity_penalty")
- F.col("cloud_penalty")
- F.col("wind_penalty")
- F.col("snow_penalty"),
)
.withColumn(
"comfort_index",
F.least(F.greatest(F.col("comfort_index_raw"), F.lit(0.0)), F.lit(100.0)),
)
.select(
"travel_window",
"months",
"region",
"lat",
"lon",
"comfort_index",
"avg_temp_c",
"low_temp_c",
"high_temp_c",
"avg_daily_precip",
"avg_rh",
"avg_cloud",
"avg_wind",
"avg_snow",
"max_snow",
"temp_penalty",
"heat_penalty",
"cold_penalty",
"rain_penalty",
"humidity_penalty",
"cloud_penalty",
"wind_penalty",
"snow_penalty",
"region_points",
"region_cells",
)
)
return analyze_travel_window
sparky = Sparky(spark, sc)
setup_ctx = SparkyBatchContext(
glob_path=INPUT_GLOB,
partial_out=PARTIAL_OUT,
schema=raw_schema,
parse_line=lambda line: None,
partial_transform=travel_partial_transform,
batch_size=BATCH_SIZE,
)
all_paths = sparky.setup_job(setup_ctx)
print("Total 2018 files:", len(all_paths))
if len(all_paths) == 0:
raise ValueError(f"No files matched: {INPUT_GLOB}")
paths_by_window = {}
for window_name, months in TRAVEL_WINDOWS:
month_set = set(months)
window_paths = [path for path in all_paths if month_from_path(path) in month_set]
paths_by_window[window_name] = window_paths
print(f"{window_name}: {len(window_paths)} 2018 files")
# ============================================================
# RESOLVE COLUMNS
# ============================================================
time_idx = sparky.required_col("time")
lat_idx = sparky.required_col("lat")
lon_idx = sparky.required_col("lon")
temp_idx = sparky.required_col("temperature_surface")
precip_idx = sparky.required_col("total_precipitation_surface_3_hour_accumulation")
rh_idx = sparky.required_col("relative_humidity_zerodegc_isotherm")
cloud_idx = sparky.required_col("total_cloud_cover_entire_atmosphere_single_layer")
snow_idx = sparky.required_col("snow_depth_surface")
wind_speed_idx = sparky.required_col("wind_speed_gust_surface")
print("Using column indexes:")
print("time:", time_idx)
print("lat:", lat_idx)
print("lon:", lon_idx)
print("temp:", temp_idx)
print("precip:", precip_idx)
print("rh:", rh_idx)
print("cloud:", cloud_idx)
print("snow:", snow_idx)
print("wind_speed:", wind_speed_idx)
# ============================================================
# PARSER
# ============================================================
def make_travel_parser(
time_idx,
lat_idx,
lon_idx,
temp_idx,
precip_idx,
rh_idx,
cloud_idx,
snow_idx,
wind_speed_idx,
):
def parse_travel_line(line):
try:
if line.startswith("1_time"):
return None
parts = line.rstrip("\t").split("\t")
time_ms = safe_float(parts, time_idx)
lat = safe_float(parts, lat_idx)
lon = safe_float(parts, lon_idx)
temp_k = safe_float(parts, temp_idx)
precip = safe_float(parts, precip_idx)
rh = safe_float(parts, rh_idx)
cloud = safe_float(parts, cloud_idx)
snow = safe_float(parts, snow_idx)
wind_speed = safe_float(parts, wind_speed_idx)
if (
time_ms is None
or lat is None
or lon is None
or temp_k is None
or precip is None
or rh is None
or cloud is None
or snow is None
or wind_speed is None
):
return None
region = region_for_point(lat, lon)
if region is None:
return None
return (
region,
int(time_ms),
lat,
lon,
temp_k,
precip,
rh,
cloud,
snow,
wind_speed,
)
except Exception:
return None
return parse_travel_line
parse_travel_line = make_travel_parser(
time_idx=time_idx,
lat_idx=lat_idx,
lon_idx=lon_idx,
temp_idx=temp_idx,
precip_idx=precip_idx,
rh_idx=rh_idx,
cloud_idx=cloud_idx,
snow_idx=snow_idx,
wind_speed_idx=wind_speed_idx,
)
# ============================================================
# PROCESS ONE WINDOW
# ============================================================
def process_window(window_name, months, window_paths):
cleaned_window_name = safe_window_name(window_name)
window_out = f"{PARTIAL_OUT}/{cleaned_window_name}"
print()
print("=" * 80)
print(f"Processing 2018 window: {window_name}")
print(f"Months: {months}")
print(f"Files: {len(window_paths)}")
print("=" * 80)
if len(window_paths) == 0:
print(f"No 2018 files for {window_name}; skipping.")
return None
batch_ctx = SparkyBatchContext(
glob_path=INPUT_GLOB,
partial_out=window_out,
schema=raw_schema,
parse_line=parse_travel_line,
partial_transform=travel_partial_transform,
batch_size=BATCH_SIZE,
)
analysis_ctx = SparkyAnalysisContext(
partial_in=window_out,
combine_partials=combine_travel_partials,
analyze=make_travel_analyzer(window_name, months),
result_out=None,
)
window_start = time.perf_counter()
for batch_id, start in enumerate(range(0, len(window_paths), batch_ctx.batch_size)):
batch_start = time.perf_counter()
batch_paths = window_paths[start:start + batch_ctx.batch_size]
sparky.process_batch(batch_ctx, batch_paths, batch_id)
batch_elapsed = time.perf_counter() - batch_start
print(f"Finished {window_name} batch {batch_id} in {batch_elapsed:.2f} seconds")
cell_window = sparky.combine(analysis_ctx)
scored = sparky.analyze(analysis_ctx, cell_window)
window_elapsed = time.perf_counter() - window_start
print(f"Finished 2018 window {window_name} in {window_elapsed:.2f} seconds")
return scored
# ============================================================
# RUN ALL TRAVEL WINDOWS
# ============================================================
scored_windows = []
overall_start = time.perf_counter()
for window_name, months in TRAVEL_WINDOWS:
scored = process_window(
window_name=window_name,
months=months,
window_paths=paths_by_window[window_name],
)
if scored is not None:
scored_windows.append(scored)
overall_elapsed = time.perf_counter() - overall_start
print(f"Total 2018 Travel Startup Spark runtime: {overall_elapsed:.2f} seconds")
if len(scored_windows) == 0:
raise ValueError("No scored 2018 windows were produced.")
all_scores = scored_windows[0]
for scored in scored_windows[1:]:
all_scores = all_scores.unionByName(scored)
# ============================================================
# BEST REGION PER TRAVEL WINDOW
# ============================================================
ranked = (
all_scores
.withColumn(
"rank_in_window",
F.row_number().over(
Window.partitionBy("travel_window").orderBy(F.desc("comfort_index"))
),
)
)
best_per_window = (
ranked
.filter(F.col("rank_in_window") == 1)
.drop("rank_in_window")
.orderBy(
F.expr("""
CASE travel_window
WHEN 'Winter Break / New Year' THEN 1
WHEN 'Spring' THEN 2
WHEN 'Early Summer' THEN 3
WHEN 'Late Summer' THEN 4
WHEN 'Fall' THEN 5
ELSE 99
END
""")
)
)
print("Best destination for each 2018 travel window:")
best_per_window.select(
"travel_window",
"months",
"region",
"comfort_index",
"avg_temp_c",
"low_temp_c",
"high_temp_c",
"avg_daily_precip",
"avg_rh",
"avg_cloud",
"avg_wind",
"max_snow",
"region_cells",
).show(truncate=False)
print("All 2018 destination scores:")
all_scores.orderBy("travel_window", F.desc("comfort_index")).select(
"travel_window",
"months",
"region",
"comfort_index",
"avg_temp_c",
"low_temp_c",
"high_temp_c",
"avg_daily_precip",
"avg_rh",
"avg_cloud",
"avg_wind",
"max_snow",
"region_cells",
).show(200, truncate=False)
Deleted old partial output: hdfs:/travel_startup_2018_partials Found 1443 files
Total 2018 files: 1443 Winter Break / New Year: 248 2018 files Spring: 340 2018 files Early Summer: 367 2018 files Late Summer: 244 2018 files Fall: 244 2018 files Using column indexes: time: 0 lat: 1 lon: 2 temp: 10 precip: 13 rh: 8 cloud: 12 snow: 9 wind_speed: 17 ================================================================================ Processing 2018 window: Winter Break / New Year Months: [12, 1] Files: 248 ================================================================================ Processing batch 0: 50 files
Finished Winter Break / New Year batch 0 in 23.74 seconds Processing batch 1: 50 files
Finished Winter Break / New Year batch 1 in 21.65 seconds Processing batch 2: 50 files
Finished Winter Break / New Year batch 2 in 21.77 seconds Processing batch 3: 50 files
Finished Winter Break / New Year batch 3 in 17.31 seconds Processing batch 4: 48 files
Finished Winter Break / New Year batch 4 in 16.17 seconds Finished 2018 window Winter Break / New Year in 101.05 seconds ================================================================================ Processing 2018 window: Spring Months: [2, 3, 4] Files: 340 ================================================================================ Processing batch 0: 50 files
Finished Spring batch 0 in 16.84 seconds Processing batch 1: 50 files
Finished Spring batch 1 in 16.98 seconds Processing batch 2: 50 files
Finished Spring batch 2 in 16.17 seconds Processing batch 3: 50 files
Finished Spring batch 3 in 16.85 seconds Processing batch 4: 50 files
Finished Spring batch 4 in 17.14 seconds Processing batch 5: 50 files
Finished Spring batch 5 in 17.22 seconds Processing batch 6: 40 files
Finished Spring batch 6 in 16.28 seconds Finished 2018 window Spring in 117.94 seconds ================================================================================ Processing 2018 window: Early Summer Months: [5, 6, 7] Files: 367 ================================================================================ Processing batch 0: 50 files
Finished Early Summer batch 0 in 16.98 seconds Processing batch 1: 50 files
Finished Early Summer batch 1 in 15.34 seconds Processing batch 2: 50 files
Finished Early Summer batch 2 in 16.10 seconds Processing batch 3: 50 files
Finished Early Summer batch 3 in 17.33 seconds Processing batch 4: 50 files
Finished Early Summer batch 4 in 16.82 seconds Processing batch 5: 50 files
Finished Early Summer batch 5 in 17.62 seconds Processing batch 6: 50 files
Finished Early Summer batch 6 in 17.86 seconds Processing batch 7: 17 files
Finished Early Summer batch 7 in 10.67 seconds Finished 2018 window Early Summer in 129.16 seconds ================================================================================ Processing 2018 window: Late Summer Months: [8, 9] Files: 244 ================================================================================ Processing batch 0: 50 files
Finished Late Summer batch 0 in 16.75 seconds Processing batch 1: 50 files
Finished Late Summer batch 1 in 17.77 seconds Processing batch 2: 50 files
Finished Late Summer batch 2 in 18.31 seconds Processing batch 3: 50 files
Finished Late Summer batch 3 in 16.74 seconds Processing batch 4: 44 files
Finished Late Summer batch 4 in 17.10 seconds Finished 2018 window Late Summer in 87.10 seconds ================================================================================ Processing 2018 window: Fall Months: [10, 11] Files: 244 ================================================================================ Processing batch 0: 50 files
Finished Fall batch 0 in 17.50 seconds Processing batch 1: 50 files
Finished Fall batch 1 in 16.95 seconds Processing batch 2: 50 files
Finished Fall batch 2 in 15.63 seconds Processing batch 3: 50 files
Finished Fall batch 3 in 16.51 seconds Processing batch 4: 44 files
Finished Fall batch 4 in 17.10 seconds Finished 2018 window Fall in 84.15 seconds Total 2018 Travel Startup Spark runtime: 519.40 seconds Best destination for each 2018 travel window:
+-----------------------+------+----------------------+-----------------+------------------+------------------+------------------+-------------------+------------------+------------------+------------------+--------+------------+ |travel_window |months|region |comfort_index |avg_temp_c |low_temp_c |high_temp_c |avg_daily_precip |avg_rh |avg_cloud |avg_wind |max_snow|region_cells| +-----------------------+------+----------------------+-----------------+------------------+------------------+------------------+-------------------+------------------+------------------+------------------+--------+------------+ |Winter Break / New Year|12,1 |Miami |78.07625905889968|22.60698234062977 |12.499861904761872|30.206612857142943|1.7699692780337943 |30.741935483870964|43.60349462365591 |7.159790784130184 |0.0 |21 | |Spring |2,3,4 |New Orleans |75.72603480160166|19.214442488843872|6.202835517241397 |29.882799999999918|1.987981744421906 |34.21196754563895 |46.18225152129816 |6.254943275057809 |0.0 |29 | |Early Summer |5,6,7 |Vancouver |76.88059243809968|17.738883249614958|7.667636521739155 |31.854160000000036|0.9808079611420447 |58.77898125245823 |36.24570548513209 |2.939737445120247 |0.02172 |23 | |Late Summer |8,9 |San Francisco Bay Area|78.30875940109274|19.323642155191294|10.4081458666667 |36.11454746666681 |0.09863387978142077|20.676612021857917|39.375027322404385|4.793583148777318 |1.2E-4 |75 | |Fall |10,11 |Los Angeles Coast |82.2612633729508 |19.143465542740046|11.329275571428695|33.20703571428572 |0.5796545667447308 |26.397599531615924|28.535889929742382|3.3943909924268145|1.6E-4 |70 | +-----------------------+------+----------------------+-----------------+------------------+------------------+------------------+-------------------+------------------+------------------+------------------+--------+------------+ All 2018 destination scores:
[Stage 847:=============================================> (4 + 1) / 5]
+-----------------------+------+----------------------------+------------------+-------------------+--------------------+------------------+-------------------+------------------+------------------+------------------+--------+------------+ |travel_window |months|region |comfort_index |avg_temp_c |low_temp_c |high_temp_c |avg_daily_precip |avg_rh |avg_cloud |avg_wind |max_snow|region_cells| +-----------------------+------+----------------------------+------------------+-------------------+--------------------+------------------+-------------------+------------------+------------------+------------------+--------+------------+ |Early Summer |5,6,7 |Vancouver |76.8805924380998 |17.738883249614958 |7.667636521739155 |31.85415999999998 |0.9808079611420449 |58.778981252458244|36.24570548513209 |2.939737445120247 |0.02172 |23 | |Early Summer |5,6,7 |San Francisco Bay Area |75.23419130027237 |18.594241428519638 |8.945867600000042 |36.24976840000022 |0.1333878292461399 |28.30153861398728 |43.81257039055402 |5.973426027602543 |4.0E-5 |75 | |Early Summer |5,6,7 |Boston |68.71109728719576 |21.34224604774886 |6.2502580952381095 |36.7153295238096 |1.661411703646036 |45.31821435785649 |47.5632541845076 |6.089545925244583 |0.0 |21 | |Early Summer |5,6,7 |Chicago |67.27978765866976 |20.157102663653916 |6.3914039024390945 |33.806859024390235|3.0575197713830002 |45.534758081876774|40.73011231474713 |5.5604947237523765|0.0 |41 | |Early Summer |5,6,7 |Los Angeles Coast |65.29434501907262 |23.09865853016754 |12.361159000000043 |41.73394728571452 |0.15988711560918653|26.86214844920203 |41.415492409497865|2.9103841196220315|4.0E-5 |70 | |Early Summer |5,6,7 |Seattle / Puget Sound |65.08614318948342 |19.54412570807858 |6.672773513513505 |39.57482486486492 |0.8950217247219973 |51.90821853243979 |42.670226084395026|3.145185755161205 |0.0 |37 | |Early Summer |5,6,7 |San Diego Coast |65.04266619945477 |22.34063886648505 |11.719746400000076 |42.14704920000008 |0.3068664850136239 |26.71588920326975 |51.069863760217984|2.655238927747139 |0.0 |25 | |Early Summer |5,6,7 |New York City |61.49962273024549 |21.583587757740816 |5.88843345454552 |35.008065818181876|3.529749814218481 |45.51155926182808 |48.61600198166955 |5.661918150071341 |0.0 |55 | |Early Summer |5,6,7 |Portland / Willamette Valley|60.256616593248694|19.270396457008815 |4.911733333333359 |41.103897500000016|0.4607553739025128 |42.67463276899789 |38.6566757493188 |3.592475078387603 |4.0E-5 |36 | |Early Summer |5,6,7 |Denver / Front Range |47.38774939600382 |21.7473702925912 |2.846803095238158 |44.16499380952371 |1.5713961333852342 |56.44744753341117 |33.83177630725314 |4.02809529046062 |0.02468 |42 | |Early Summer |5,6,7 |New Orleans |35.072104705440026|30.00199334398195 |16.794864827586252 |39.02017965517251 |3.3674245983275393 |47.916197388142436|30.444329606314003|3.780804649848726 |0.0 |29 | |Early Summer |5,6,7 |Miami |34.061678971065085|28.8349238782925 |22.670041904761945 |36.3000366666667 |6.954262358894512 |58.154596877124696|48.33476060724017 |4.438683072408201 |0.0 |21 | |Early Summer |5,6,7 |Las Vegas |28.930224415985528|29.585398483197082 |8.889546666666718 |51.28879733333332 |0.29037238873751137|38.99650682942779 |12.84123524069028 |4.251871068663942 |0.0 |30 | |Early Summer |5,6,7 |Phoenix / Scottsdale |8.828341656826964 |33.53106707462916 |11.292545277777776 |55.0890052777778 |0.445201332122313 |36.030672734786556|11.744399031183772|3.893489302041704 |0.0 |36 | |Fall |10,11 |Los Angeles Coast |82.26126337295062 |19.143465542740103 |11.329275571428695 |33.207035714285894|0.5796545667447308 |26.397599531615928|28.535889929742396|3.394390992426815 |1.6E-4 |70 | |Fall |10,11 |San Diego Coast |77.14535499999987 |18.99092882950822 |10.273522800000023 |34.9915764000001 |0.6877868852459017 |22.71262295081967 |34.05344262295082 |3.165551414683607 |0.0 |25 | |Fall |10,11 |Phoenix / Scottsdale |67.55244025387108 |19.080388020264195 |5.16328666666675 |37.17733777777778 |0.9747836976320584 |32.21106557377048 |17.867030965391624|3.7215491372085605|0.0 |36 | |Fall |10,11 |New Orleans |66.67447120972346 |20.994673595251697 |5.788291724137935 |33.45762275862069 |3.456543244771056 |40.045364612775586|46.88171283210854 |5.718812567163651 |0.0 |29 | |Fall |10,11 |San Francisco Bay Area |65.5836584136613 |15.128004705464491 |4.417730666666728 |30.504970266666703|1.086912568306011 |30.83426229508197 |31.825136612021847|4.1014530418155735|4.4E-4 |75 | |Fall |10,11 |Miami |60.71799652810271 |27.721469432084405 |15.817580000000078 |34.57002904761907 |1.3241608118657298 |31.10284933645589 |33.56733021077283 |5.457027606291764 |0.0 |21 | |Fall |10,11 |Las Vegas |46.068832419399044|13.821840908469994 |-0.3497513333333586 |36.78049299999998 |0.1452185792349727 |38.76051912568307 |13.773087431693995|4.306510428008197 |0.00164 |30 | |Fall |10,11 |Seattle / Puget Sound |28.802133630926317|10.013167131147611 |-0.13314378378373704|25.34349945945945 |3.1597252990695615 |61.618741692512195|61.68453699601239 |5.141573562887573 |2.0E-4 |37 | |Fall |10,11 |Portland / Willamette Valley|26.141521756602994|10.11086843920765 |-1.5116486111110703 |27.839578333333407|3.347620673952642 |54.04371584699454 |54.16882969034607 |5.3844589703239985|6.4E-4 |36 | |Fall |10,11 |Vancouver |22.151275017819106|9.276054590163994 |2.343271304347809 |19.821615652173932|8.912508909479687 |65.85620099786175 |62.770313613684955|4.235724615003564 |0.00432 |23 | |Fall |10,11 |Chicago |14.187471462151123|8.38664103758498 |-3.547795609756122 |26.02454682926833 |2.2090163934426235 |70.70251899240306 |63.580667732906846|7.8367433477489 |0.08588 |41 | |Fall |10,11 |New York City |8.315710972645121 |11.637580602086587 |-3.591253272727215 |25.13070018181827 |4.896646795827125 |60.90901639344262 |54.182265275707884|8.212087927151266 |0.19384 |55 | |Fall |10,11 |Boston |0.0 |8.262699182279562 |-9.057264761904719 |26.807930952380957|3.5885050741608118 |65.60421545667448 |55.65007806401248 |8.338916739945354 |0.075 |21 | |Fall |10,11 |Denver / Front Range |0.0 |5.501489242779087 |-11.614429285714266 |34.01414880952382 |0.8140124902419986 |61.883489461358316|32.98877829820453 |4.150118931835481 |0.09696 |42 | |Late Summer |8,9 |San Francisco Bay Area |78.30875940109274 |19.323642155191294 |10.4081458666667 |36.11454746666681 |0.09863387978142077|20.676612021857917|39.375027322404385|4.793583148777319 |1.2E-4 |75 | |Late Summer |8,9 |Chicago |67.59501320171896 |23.055984289284368 |10.329692682926861 |34.863978780487855|2.4895041983206725 |41.45821671331468 |40.615553778488604|5.809869288940424 |0.0 |41 | |Late Summer |8,9 |Portland / Willamette Valley|64.04983545081998 |19.53145956739536 |7.052050277777823 |41.57213611111115 |0.7118055555555555 |35.825022768670316|29.45343806921675 |3.4570917949554874|0.0 |36 | |Late Summer |8,9 |Denver / Front Range |63.544098880757 |21.854133951014887 |4.2673942857143174 |40.73763142857149 |0.5020979703356753 |53.998926619828254|24.425351288056202|3.7554996199902426|0.0 |42 | |Late Summer |8,9 |Los Angeles Coast |63.46463046779824 |25.5025065450821 |17.12462942857144 |39.69468899999998 |0.06548594847775176|23.2179156908665 |33.11042154566745 |2.559896578267565 |0.0 |70 | |Late Summer |8,9 |Seattle / Puget Sound |62.595128789321926|19.28495769273377 |7.295451621621623 |40.64622540540557 |1.1364643331856448 |44.487040319007534|38.75875055383251 |3.547820332633252 |0.0 |37 | |Late Summer |8,9 |San Diego Coast |62.515425852458904|25.1287854721312 |15.36540279999997 |40.03727279999998 |0.1315573770491803 |24.371967213114754|43.65885245901639 |2.341145993368853 |0.0 |25 | |Late Summer |8,9 |Vancouver |61.085878718816986|17.12099320563084 |8.751992173913038 |32.3494556521739 |3.6994832501781896 |53.47416250890948 |40.80773342836778 |3.05841683521561 |0.0 |23 | |Late Summer |8,9 |Boston |60.475383813426625|22.874553337236648 |10.541677142857225 |36.40105380952383 |3.130171740827479 |45.77263856362217 |54.47989851678376 |5.910847016366121 |0.0 |21 | |Late Summer |8,9 |New York City |53.554772501550914|24.29320278837571 |15.008871272727447 |34.05159654545457 |4.528576751117734 |51.527943368107316|58.16743666169895 |6.000932873551417 |0.0 |55 | |Late Summer |8,9 |Miami |37.21132795081948 |29.986971323185116 |25.488982857142787 |35.84957380952375 |4.130269320843093 |50.199258391881344|34.77263856362217 |4.114730698774747 |0.0 |21 | |Late Summer |8,9 |Las Vegas |31.57596641256785 |30.165631024590255 |14.602295333333359 |49.74296233333342 |0.17568306010928966|34.38360655737704 |5.628005464480875 |3.881807200107924 |0.0 |30 | |Late Summer |8,9 |New Orleans |27.726474590163754|30.061190486150394 |24.846032413793125 |38.12879551724143 |5.821014697569249 |60.85952515545505 |38.32362916902205 |3.5354297642278127|0.0 |29 | |Late Summer |8,9 |Phoenix / Scottsdale |6.316249412568248 |35.07858279143903 |22.01064416666668 |53.764961944444394|0.569159836065574 |46.07001366120219 |10.722791438979964|3.2285171197928038|0.0 |36 | |Spring |2,3,4 |New Orleans |75.72603480160161 |19.214442488843872 |6.202835517241397 |29.88280000000003 |1.987981744421907 |34.21196754563895 |46.182251521298184|6.25494327505781 |0.0 |29 | |Spring |2,3,4 |Los Angeles Coast |67.32192970546242 |15.854634450840422 |5.759157285714366 |32.67008528571438 |0.7150420168067227 |32.986092436974786|31.40907563025211 |3.7258175199794534|0.0352 |70 | |Spring |2,3,4 |Miami |66.6593355924369 |26.03618034873955 |14.9790971428572 |34.39626476190472 |1.3870448179271708 |23.60266106442577 |25.043697478991596|5.747430959560225 |0.0 |21 | |Spring |2,3,4 |San Diego Coast |63.89257408941175 |16.147180037647047 |4.782679200000018 |34.08803280000001 |0.514 |29.998941176470588|43.66658823529413 |3.338605755732941 |8.0E-5 |25 | |Spring |2,3,4 |Phoenix / Scottsdale |46.866694205065315|18.73842736029411 |1.3934558333334053 |45.22852444444453 |0.13684640522875818|30.580882352941167|19.878186274509808|3.7140952890197716|3.2E-4 |36 | |Spring |2,3,4 |San Francisco Bay Area |46.26255494431377 |12.064810163137224 |1.000993600000072 |28.165284933333282|1.8529803921568624 |44.96227450980391 |36.30560784313726 |5.168715563990668 |0.00192 |75 | |Spring |2,3,4 |Las Vegas |27.68696998823535 |13.480617956862773 |-3.506648666666649 |41.6889736666667 |0.21823529411764706|38.970882352941175|20.94862745098039 |5.255337115343138 |0.00888 |30 | |Spring |2,3,4 |Chicago |0.0 |2.000405583931183 |-18.409008292682984 |18.795235121951293|1.9676470588235293 |67.87632711621234 |52.11592539454806 |7.426830421054519 |0.39528 |41 | |Spring |2,3,4 |Seattle / Puget Sound |0.0 |7.741653746422799 |-5.608539189189116 |31.68416756756767 |3.955802861685214 |79.36669316375199 |67.88076311605722 |6.003372366403815 |0.08496 |37 | |Spring |2,3,4 |New York City |0.0 |5.414488437967918 |-6.163292181818235 |21.241486909090952|4.269598930481284 |66.61695187165775 |54.56636363636364 |8.284004699645456 |0.1928 |55 | |Spring |2,3,4 |Portland / Willamette Valley|0.0 |7.53191030310461 |-8.309794444444378 |31.825212222222262|3.540522875816993 |75.3702614379085 |66.64158496732027 |5.709095301198527 |0.13168 |36 | |Spring |2,3,4 |Vancouver |0.0 |6.371353310741711 |-4.735428260869583 |24.55640173913048 |5.820140664961637 |79.62020460358056 |65.54322250639386 |4.590978649189259 |0.39872 |23 | |Spring |2,3,4 |Boston |0.0 |4.218594557422932 |-9.836204285714302 |20.798897142857186|3.7221988795518217 |67.0672268907563 |54.00252100840335 |8.588929579785715 |0.2516 |21 | |Spring |2,3,4 |Denver / Front Range |0.0 |5.043801283613391 |-16.69673476190468 |30.87942666666669 |0.725 |54.851050420168065|28.650910364145656|4.98148557092437 |0.23 |42 | |Winter Break / New Year|12,1 |Miami |78.07625905889981 |22.60698234062977 |12.499861904761872 |30.206612857142886|1.769969278033794 |30.741935483870968|43.60349462365592 |7.159790784130182 |0.0 |21 | |Winter Break / New Year|12,1 |Los Angeles Coast |65.83647238018449 |14.298108762672825 |5.970274428571486 |27.54496342857135 |1.235483870967742 |34.924135944700446|34.257834101382485|3.7482681401814513|0.01976 |70 | |Winter Break / New Year|12,1 |San Diego Coast |64.82845066129059 |14.488365635483945 |4.525967200000025 |28.148825999999985|0.9341129032258065 |32.261129032258076|40.18016129032259 |3.5142669938241933|0.0 |25 | |Winter Break / New Year|12,1 |Phoenix / Scottsdale |51.15139809251822 |11.888095666442666 |-1.0716261111110725 |30.444485277777687|0.05157930107526881|34.670250896057354|22.003696236559133|3.420932316322805 |0.00112 |36 | |Winter Break / New Year|12,1 |San Francisco Bay Area |45.16968248064529 |10.919329927957051 |3.368869466666638 |20.579259999999977|2.006155913978495 |45.240860215053765|52.85666666666664 |4.7233207515919355|4.8E-4 |75 | |Winter Break / New Year|12,1 |New Orleans |27.59940128508737 |11.367425848164544 |-0.440084482758607 |22.93719724137941 |3.9201890989988883 |42.01738042269188 |54.3615127919911 |6.458136839898499 |0.01056 |29 | |Winter Break / New Year|12,1 |Las Vegas |26.7458974112906 |6.657908212365669 |-4.320478666666645 |24.73723366666661 |0.5116935483870967 |40.034946236559136|27.836021505376337|3.5227938626599458|0.01224 |30 | |Winter Break / New Year|12,1 |Chicago |0.0 |-1.2660238611329646|-19.576156097560954 |8.661305853658632 |1.5816778127458697 |75.00472069236821 |54.67142014162075 |8.473555726898798 |0.09976 |41 | |Winter Break / New Year|12,1 |Seattle / Puget Sound |0.0 |5.368115336748076 |-2.877403243243293 |13.775614594594572|5.110015257192676 |82.35287707061902 |74.23964690496949 |7.750997140253924 |0.00788 |37 | |Winter Break / New Year|12,1 |New York City |0.0 |1.8344318812315805 |-10.799302363636343 |11.711979636363651|3.10949413489736 |71.43438416422288 |44.73592375366568 |8.07446787323717 |0.31328 |55 | |Winter Break / New Year|12,1 |Portland / Willamette Valley|0.0 |5.484651470654171 |-3.4774858333333327 |15.192656666666778|5.419298835125448 |79.97838261648744 |75.41028225806451 |8.045170659446686 |0.02308 |36 | |Winter Break / New Year|12,1 |Vancouver |0.0 |4.578253290673217 |-1.0017613043478377 |10.597806086956552|13.025070126227211 |85.19021739130434 |75.64025245441796 |6.913380591456872 |0.204 |23 | |Winter Break / New Year|12,1 |Boston |0.0 |-0.9721785963901084|-17.480080476190523 |14.808346666666637|2.5719086021505375 |69.93951612903227 |40.0583717357911 |8.763074218473504 |0.36264 |21 | |Winter Break / New Year|12,1 |Denver / Front Range |0.0 |-1.4821784840628993|-14.899612619047616 |18.068379523809654|0.3289170506912442 |53.10647081413212 |26.278033794162827|4.259061145825652 |0.14872 |42 | +-----------------------+------+----------------------------+------------------+-------------------+--------------------+------------------+-------------------+------------------+------------------+------------------+--------+------------+
Your Itinerary¶
The itinerary has been designed for travelling in 2019-20, with 2018 data as a base for our trademarked comfort scorer pro max. We will start our travels in Feb, so that we can end the year with a bang with Christmas + New Year. We will divide your journey into 5 time periods.
- Feb - Apr
- May - Jul
- Aug - Sep
- Oct - Nov
- Dec - Jan
Spring (Feb - Apr)¶
Spring time in New Orleans would be amazing! Check out one of the beautiful gardens you can visit:

There are also festivals to attend, like this one:

Early Summer (May - Jul)¶
The longest leg of this year-long journey takes us to Vancouver, Canada!

Late Summer (Aug - Sept)¶
San Francisco Bay Area
There are lots of things to do in San Francisco, and it might be a good idea to visit Seattle, and other surrounding places.
- San Francisco

Fall (Oct - Nov)¶
Los Angeles coast had the best score for fall. You can enjoy:
- Santa Monica Beach

- Venice Beach

and many more beaches, each with its own charm!
Although Los Angeles coast scored the highest on this leg of the trip, we will also visit other places nearby!
Time to go to San Diego.

Enjoy the festivities in October!

Las Vegas is close too!
- Las Vegas

Christmas - New Year¶
Time to head to Miami for Christmas and New year celebrations! This party and beach town averages 70F at this time.

Remarks¶
I think it is a little awkward that Late Summer was SF bay area and Fall was LA. If I had to do this differently, maybe I would have enforced a distance too? Something to make sure that places close to each other wouldn't be picked in conscutive travel windows...
LEAST FOGGY (I AM RICH)¶
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, LongType, DoubleType
from SparkHandler import Sparky, SparkyBatchContext, SparkyAnalysisContext
import util
import math
import gc
import os
import time
import folium
from IPython.display import IFrame, display
# Restore SparkContext in case `sc` got overwritten by matplotlib
sc = spark.sparkContext
# ============================================================
# CONFIG
# ============================================================
INPUT_GLOB = "hdfs:///3hr/*/*_20*{07,08,09,10}*_*"
BATCH_SIZE = 50
PARTIAL_OUT = f"hdfs:///user/{sc.sparkUser()}/least_foggy_bay_area_partials"
RESULT_OUT = None
# Bay Area bounding box
LAT_MIN = 36.8
LAT_MAX = 38.9
LON_MIN = -123.2
LON_MAX = -121.4
# Fog season
FOG_MONTHS = [5, 6, 7, 8, 9]
# 20 -> 0.05 degree cells
CELL_SCALE = 20
MIN_CELL_POINTS = 100
TOP_N = 10
# ============================================================
# SCHEMA
# ============================================================
raw_schema = StructType([
StructField("time_ms", LongType(), False),
StructField("lat", DoubleType(), False),
StructField("lon", DoubleType(), False),
StructField("cloud", DoubleType(), True),
StructField("visibility", DoubleType(), True),
])
time_idx = None
lat_idx = None
lon_idx = None
cloud_idx = None
vis_idx = None
# ============================================================
# PARSER
# ============================================================
def parse_fog_line(line):
try:
if line.startswith("1_time"):
return None
parts = line.rstrip("\t").split("\t")
time_ms = util.safe_float(parts, time_idx)
lat = util.safe_float(parts, lat_idx)
lon = util.safe_float(parts, lon_idx)
if time_ms is None or lat is None or lon is None:
return None
# Early geographic filter
if not (LAT_MIN <= lat <= LAT_MAX and LON_MIN <= lon <= LON_MAX):
return None
cloud = util.safe_float(parts, cloud_idx)
visibility = util.safe_float(parts, vis_idx)
return (
int(time_ms),
lat,
lon,
cloud,
visibility,
)
except Exception:
return None
# ============================================================
# PARTIAL TRANSFORM
# This runs once per 50-file batch
# ============================================================
def partial_transform(points):
points = (
points
.withColumn(
"timestamp",
F.from_unixtime((F.col("time_ms") / 1000).cast("long")).cast("timestamp")
)
.withColumn("month", F.month("timestamp"))
)
if FOG_MONTHS is not None:
points = points.filter(F.col("month").isin(FOG_MONTHS))
return (
points
.withColumn("lat_i", F.floor(F.col("lat") * CELL_SCALE).cast("int"))
.withColumn("lon_i", F.floor(F.col("lon") * CELL_SCALE).cast("int"))
.groupBy("lat_i", "lon_i")
.agg(
F.sum("lat").alias("sum_lat"),
F.sum("lon").alias("sum_lon"),
F.count("*").alias("count_points"),
F.sum("cloud").alias("sum_cloud"),
F.count("cloud").alias("count_cloud"),
F.sum("visibility").alias("sum_visibility"),
F.count("visibility").alias("count_visibility"),
)
)
# ============================================================
# COMBINE PARTIALS
# This combines all batch parquet files
# ============================================================
def combine_fog_partials(partials):
combined = (
partials
.groupBy("lat_i", "lon_i")
.agg(
F.sum("sum_lat").alias("sum_lat"),
F.sum("sum_lon").alias("sum_lon"),
F.sum("count_points").alias("cell_points"),
F.sum("sum_cloud").alias("sum_cloud"),
F.sum("count_cloud").alias("count_cloud"),
F.sum("sum_visibility").alias("sum_visibility"),
F.sum("count_visibility").alias("count_visibility"),
)
)
return (
combined
.withColumn("lat", F.col("sum_lat") / F.col("cell_points"))
.withColumn("lon", F.col("sum_lon") / F.col("cell_points"))
.withColumn(
"avg_cloud",
F.when(F.col("count_cloud") > 0, F.col("sum_cloud") / F.col("count_cloud"))
)
.withColumn(
"avg_visibility",
F.when(
F.col("count_visibility") > 0,
F.col("sum_visibility") / F.col("count_visibility")
)
)
.select(
"lat_i",
"lon_i",
"lat",
"lon",
"cell_points",
"avg_cloud",
"avg_visibility",
)
)
# ============================================================
# ANALYZE
# Builds fog_score and clarity_score
# ============================================================
def clamp01(col):
return F.least(F.greatest(col, F.lit(0.0)), F.lit(1.0))
def analyze_least_foggy(cell_stats):
scored = (
cell_stats
.withColumn("avg_cloud_filled", F.coalesce(F.col("avg_cloud"), F.lit(40.0)))
.withColumn("avg_visibility_filled", F.coalesce(F.col("avg_visibility"), F.lit(20000.0)))
)
cloud_term = clamp01(F.col("avg_cloud_filled") / F.lit(100.0))
vis_term = clamp01((F.lit(20000.0) - F.col("avg_visibility_filled")) / F.lit(20000.0))
if vis_idx is not None:
scored = scored.withColumn(
"fog_score",
F.lit(0.30) * cloud_term +
F.lit(0.20) * vis_term
)
else:
scored = scored.withColumn(
"fog_score", cloud_term
)
return (
scored
.withColumn("clarity_score", F.lit(100.0) * (F.lit(1.0) - F.col("fog_score")))
.filter(F.col("cell_points") >= MIN_CELL_POINTS)
.orderBy(
F.desc("clarity_score"),
F.asc("avg_cloud_filled"),
)
)
# ============================================================
# SETUP SPARKY
# ============================================================
sparky = Sparky(spark, sc)
batch_ctx = SparkyBatchContext(
glob_path=INPUT_GLOB,
partial_out=PARTIAL_OUT,
schema=raw_schema,
parse_line=parse_fog_line,
partial_transform=partial_transform,
batch_size=BATCH_SIZE,
)
least_foggy_runtime_start = time.perf_counter()
input_paths = sparky.setup_job(batch_ctx)
print("Total input files:", len(input_paths))
print("Batch size:", BATCH_SIZE)
print("Number of batches:", (len(input_paths) + BATCH_SIZE - 1) // BATCH_SIZE)
if len(input_paths) == 0:
raise ValueError(f"No files matched: {INPUT_GLOB}")
# ============================================================
# COLUMN DISCOVERY FROM sparky.columns
# ============================================================
def required_idx(name):
key = name.strip().lower()
if key not in sparky.columns:
raise ValueError(f"Missing required column: {name}")
return sparky.columns[key]
def optional_idx(name):
key = name.strip().lower()
return sparky.columns.get(key)
def optional_idx_contains(*tokens):
tokens = [t.strip().lower() for t in tokens]
for col_name, idx in sparky.columns.items():
if all(token in col_name for token in tokens):
return idx
return None
time_idx = required_idx("time")
lat_idx = required_idx("lat")
lon_idx = required_idx("lon")
cloud_idx = optional_idx("total_cloud_cover_entire_atmosphere_single_layer")
vis_idx = optional_idx("visibility_surface")
print("Using column indices:")
print("time:", time_idx)
print("lat:", lat_idx)
print("lon:", lon_idx)
print("cloud cover:", cloud_idx)
print("visibility:", vis_idx)
Deleted old partial output: hdfs:/user/rsbhatia2/least_foggy_bay_area_partials Found 3194 files
[Stage 848:> (0 + 1) / 1]
Total input files: 3194 Batch size: 50 Number of batches: 64 Using column indices: time: 0 lat: 1 lon: 2 cloud cover: 12 visibility: 15
# ============================================================
# RUN BATCHES THROUGH SPARKY
# ============================================================
for batch_id, start in enumerate(range(0, len(input_paths), BATCH_SIZE)):
batch_paths = input_paths[start:start + BATCH_SIZE]
sparky.process_batch(batch_ctx, batch_paths, batch_id)
Processing batch 0: 50 files
Processing batch 1: 50 files
Processing batch 2: 50 files
Processing batch 3: 50 files
Processing batch 4: 50 files
Processing batch 5: 50 files
Processing batch 6: 50 files
Processing batch 7: 50 files
Processing batch 8: 50 files
Processing batch 9: 50 files
Processing batch 10: 50 files
Processing batch 11: 50 files
Processing batch 12: 50 files
Processing batch 13: 50 files
Processing batch 14: 50 files
Processing batch 15: 50 files
Processing batch 16: 50 files
Processing batch 17: 50 files
Processing batch 18: 50 files
Processing batch 19: 50 files
Processing batch 20: 50 files
Processing batch 21: 50 files
Processing batch 22: 50 files
Processing batch 23: 50 files
Processing batch 24: 50 files
Processing batch 25: 50 files
Processing batch 26: 50 files
Processing batch 27: 50 files
Processing batch 28: 50 files
Processing batch 29: 50 files
Processing batch 30: 50 files
Processing batch 31: 50 files
Processing batch 32: 50 files
Processing batch 33: 50 files
Processing batch 34: 50 files
Processing batch 35: 50 files
Processing batch 36: 50 files
Processing batch 37: 50 files
Processing batch 38: 50 files
Processing batch 39: 50 files
Processing batch 40: 50 files
Processing batch 41: 50 files
Processing batch 42: 50 files
Processing batch 43: 50 files
Processing batch 44: 50 files
Processing batch 45: 50 files
Processing batch 46: 50 files
Processing batch 47: 50 files
Processing batch 48: 50 files
Processing batch 49: 50 files
Processing batch 50: 50 files
Processing batch 51: 50 files
Processing batch 52: 50 files
Processing batch 53: 50 files
Processing batch 54: 50 files
Processing batch 55: 50 files
Processing batch 56: 50 files
Processing batch 57: 50 files
Processing batch 58: 50 files
Processing batch 59: 50 files
Processing batch 60: 50 files
Processing batch 61: 50 files
Processing batch 62: 50 files
Processing batch 63: 44 files
# ============================================================
# COMBINE + ANALYZE THROUGH SPARKY
# ============================================================
analysis_ctx = SparkyAnalysisContext(
partial_in=PARTIAL_OUT,
combine_partials=combine_fog_partials,
analyze=analyze_least_foggy,
result_out=RESULT_OUT,
)
cell_stats = sparky.combine(analysis_ctx)
least_foggy = sparky.analyze(analysis_ctx, cell_stats)
# ============================================================
# SHOW RESULTS
# ============================================================
print("Top least-foggy Bay Area cells:")
top_pdf = (
least_foggy
.select(
"lat",
"lon",
"clarity_score",
"fog_score",
"avg_cloud",
"avg_visibility",
"cell_points",
)
.limit(TOP_N)
.toPandas()
)
display(top_pdf)
least_foggy_runtime = time.perf_counter() - least_foggy_runtime_start
print(f"Total Least Foggy Spark runtime: {least_foggy_runtime:.2f} seconds")
Top least-foggy Bay Area cells:
| lat | lon | clarity_score | fog_score | avg_cloud | avg_visibility | cell_points | |
|---|---|---|---|---|---|---|---|
| 0 | 38.877293 | -122.166343 | 98.172023 | 0.018280 | 6.093256 | 24106.990715 | 1892 |
| 1 | 38.850548 | -122.303162 | 98.143547 | 0.018565 | 6.188177 | 24084.512906 | 1201 |
| 2 | 38.773071 | -122.139185 | 98.137302 | 0.018627 | 6.208993 | 24103.445741 | 1892 |
| 3 | 38.788658 | -122.008523 | 98.038884 | 0.019611 | 6.537052 | 24080.099917 | 1201 |
| 4 | 38.689884 | -121.978465 | 98.010408 | 0.019896 | 6.631973 | 24102.070079 | 1892 |
| 5 | 38.892870 | -122.035550 | 98.001166 | 0.019988 | 6.662781 | 24054.870941 | 1201 |
| 6 | 38.746358 | -122.275882 | 97.996919 | 0.020031 | 6.676936 | 24090.507910 | 1201 |
| 7 | 38.668795 | -122.112081 | 97.958951 | 0.020410 | 6.803497 | 24092.757910 | 1892 |
| 8 | 37.580908 | -121.420552 | 97.957952 | 0.020420 | 6.806828 | 24069.001296 | 1892 |
| 9 | 38.834758 | -122.433918 | 97.953455 | 0.020465 | 6.821815 | 24105.667963 | 1892 |
Total Least Foggy Spark runtime: 896.84 seconds
# ============================================================
# REAL MAP: TOP LAT/LON POINTS ONLY
# ============================================================
def fmt(value, digits=2):
if value is None:
return "N/A"
try:
if math.isnan(value):
return "N/A"
except TypeError:
pass
return f"{value:.{digits}f}"
center_lat = top_pdf["lat"].mean() if len(top_pdf) > 0 else 37.7
center_lon = top_pdf["lon"].mean() if len(top_pdf) > 0 else -122.2
m = folium.Map(
location=[center_lat, center_lon],
zoom_start=9,
tiles=None,
)
folium.TileLayer(
tiles="https://{s}.basemaps.cartocdn.com/light_all/{z}/{x}/{y}{r}.png",
attr='© <a href="https://www.openstreetmap.org/copyright">OpenStreetMap</a> contributors © <a href="https://carto.com/attributions">CARTO</a>',
name="CartoDB Positron",
control=False,
).add_to(m)
for i, row in top_pdf.reset_index(drop=True).iterrows():
rank = i + 1
popup_html = f"""
<b>Rank {rank}: Least-foggy candidate</b><br>
Latitude: {row['lat']:.4f}<br>
Longitude: {row['lon']:.4f}<br>
Clarity score: {fmt(row['clarity_score'])}<br>
Fog score: {fmt(row['fog_score'], 3)}<br>
Avg cloud cover: {fmt(row['avg_cloud'])}<br>
Avg visibility: {fmt(row['avg_visibility'])}<br>
Cell points: {int(row['cell_points'])}
"""
folium.Marker(
location=[row["lat"], row["lon"]],
popup=folium.Popup(popup_html, max_width=350),
tooltip=f"#{rank}: clarity {row['clarity_score']:.2f}",
icon=folium.DivIcon(
html=f"""
<div style="
background-color: black;
color: white;
border-radius: 50%;
width: 30px;
height: 30px;
text-align: center;
line-height: 30px;
font-weight: bold;
border: 2px solid white;
box-shadow: 0 0 4px gray;
">
{rank}
</div>
"""
),
).add_to(m)
if len(top_pdf) > 0:
bounds = [
[top_pdf["lat"].min(), top_pdf["lon"].min()],
[top_pdf["lat"].max(), top_pdf["lon"].max()],
]
m.fit_bounds(bounds)
html_path = "least_foggy_bay_area_points.html"
m.save(html_path)
print("Saved map to:", html_path)
display(IFrame(src=html_path, width=950, height=700))
Saved map to: least_foggy_bay_area_points.html
YOLO¶
Once I am rich and famous and rich and famous and rich, I am going to live in YOLO county, CA. It is the best place for a fogphobic trillionaire like me.

SolarWind Inc¶
from pyspark.sql import functions as F
from pyspark.sql.types import (
StructType, StructField,
StringType, DoubleType
)
from SparkHandler import Sparky, SparkyBatchContext, SparkyAnalysisContext
import util
import os
import time
import math
import folium
from IPython.display import display, IFrame
sc = spark.sparkContext
# ============================================================
# CONFIG
# ============================================================
INPUT_GLOB = "hdfs:///3hr/*/*"
BATCH_SIZE = 50
PARTIAL_OUT = f"hdfs:///user/{sc.sparkUser()}/solarwind_partials_vegetation_land_proxy"
RESULT_OUT = None
GEOHASH_PRECISION = 4
FILE_STRIDE = 24
MIN_GEOHASH_POINTS = 200
TOP_N = 3
MIN_AVG_VEGETATION = 1.0
LAT_COLUMN = "lat"
LON_COLUMN = "lon"
CLOUD_COLUMN = "total_cloud_cover_entire_atmosphere_single_layer"
PRECIP_COLUMN = "total_precipitation_surface_3_hour_accumulation"
WIND_GUST_COLUMN = "wind_speed_gust_surface"
TEMP_COLUMN = "temperature_surface"
VEGETATION_COLUMN = "vegetation_surface"
# ============================================================
# GEOHASH ENCODER
# ============================================================
_BASE32 = "0123456789bcdefghjkmnpqrstuvwxyz"
def encode_geohash(lat, lon, precision=4):
lat_range = [-90.0, 90.0]
lon_range = [-180.0, 180.0]
bits = [16, 8, 4, 2, 1]
bit = 0
ch = 0
even = True
geohash = []
while len(geohash) < precision:
if even:
mid = (lon_range[0] + lon_range[1]) / 2.0
if lon >= mid:
ch |= bits[bit]
lon_range[0] = mid
else:
lon_range[1] = mid
else:
mid = (lat_range[0] + lat_range[1]) / 2.0
if lat >= mid:
ch |= bits[bit]
lat_range[0] = mid
else:
lat_range[1] = mid
even = not even
if bit < 4:
bit += 1
else:
geohash.append(_BASE32[ch])
bit = 0
ch = 0
return "".join(geohash)
# ============================================================
# SCHEMA
# ============================================================
raw_schema = StructType([
StructField("geohash", StringType(), False),
StructField("lat", DoubleType(), False),
StructField("lon", DoubleType(), False),
StructField("cloud", DoubleType(), False),
StructField("precip", DoubleType(), False),
StructField("wind_speed", DoubleType(), False),
StructField("wind_power_proxy", DoubleType(), False),
StructField("temp_k", DoubleType(), False),
StructField("vegetation", DoubleType(), True),
])
lat_idx = None
lon_idx = None
cloud_idx = None
precip_idx = None
wind_gust_idx = None
temp_idx = None
vegetation_idx = None
# ============================================================
# PARSER
# ============================================================
def parse_solarwind_line(line):
try:
if line.startswith("1_time"):
return None
parts = line.rstrip("\t").split("\t")
lat = util.safe_float(parts, lat_idx)
lon = util.safe_float(parts, lon_idx)
cloud = util.safe_float(parts, cloud_idx)
precip = util.safe_float(parts, precip_idx)
wind_speed = util.safe_float(parts, wind_gust_idx)
temp_k = util.safe_float(parts, temp_idx)
vegetation = util.safe_float(parts, vegetation_idx)
if (
lat is None
or lon is None
or cloud is None
or precip is None
or wind_speed is None
or temp_k is None
):
return None
geohash = encode_geohash(lat, lon, GEOHASH_PRECISION)
# Wind power is approximately proportional to wind speed cubed.
wind_power_proxy = wind_speed ** 3
return (
geohash,
lat,
lon,
cloud,
precip,
wind_speed,
wind_power_proxy,
temp_k,
vegetation,
)
except Exception:
return None
# ============================================================
# PARTIAL TRANSFORM
# Runs once per batch
# ============================================================
def partial_transform(points):
return (
points
.groupBy("geohash")
.agg(
F.sum("lat").alias("sum_lat"),
F.sum("lon").alias("sum_lon"),
F.count("*").alias("count_points"),
F.sum("cloud").alias("sum_cloud"),
F.count("cloud").alias("count_cloud"),
F.sum("precip").alias("sum_precip"),
F.count("precip").alias("count_precip"),
F.sum("wind_speed").alias("sum_wind_speed"),
F.sum("wind_power_proxy").alias("sum_wind_power_proxy"),
F.max("wind_speed").alias("max_wind_speed"),
F.count("wind_speed").alias("count_wind"),
F.sum("temp_k").alias("sum_temp_k"),
F.count("temp_k").alias("count_temp"),
F.sum("vegetation").alias("sum_vegetation"),
F.count("vegetation").alias("count_vegetation"),
)
)
# ============================================================
# COMBINE PARTIALS
# Combines batch parquet files into geohash-level rows
# ============================================================
def combine_solarwind_partials(partials):
combined = (
partials
.groupBy("geohash")
.agg(
F.sum("sum_lat").alias("sum_lat"),
F.sum("sum_lon").alias("sum_lon"),
F.sum("count_points").alias("points"),
F.sum("sum_cloud").alias("sum_cloud"),
F.sum("count_cloud").alias("count_cloud"),
F.sum("sum_precip").alias("sum_precip"),
F.sum("count_precip").alias("count_precip"),
F.sum("sum_wind_speed").alias("sum_wind_speed"),
F.sum("sum_wind_power_proxy").alias("sum_wind_power_proxy"),
F.max("max_wind_speed").alias("max_wind_speed"),
F.sum("count_wind").alias("count_wind"),
F.sum("sum_temp_k").alias("sum_temp_k"),
F.sum("count_temp").alias("count_temp"),
F.sum("sum_vegetation").alias("sum_vegetation"),
F.sum("count_vegetation").alias("count_vegetation"),
)
)
return (
combined
.withColumn("lat", F.col("sum_lat") / F.col("points"))
.withColumn("lon", F.col("sum_lon") / F.col("points"))
.withColumn("avg_cloud_cover", F.col("sum_cloud") / F.col("count_cloud"))
# NAM total precipitation field is a 3-hour accumulation.
.withColumn("avg_precip_3hr", F.col("sum_precip") / F.col("count_precip"))
.withColumn("avg_daily_precip", F.col("avg_precip_3hr") * F.lit(8.0))
.withColumn("avg_wind_speed", F.col("sum_wind_speed") / F.col("count_wind"))
.withColumn("avg_wind_power_proxy", F.col("sum_wind_power_proxy") / F.col("count_wind"))
.withColumn("avg_temp_k", F.col("sum_temp_k") / F.col("count_temp"))
.withColumn("avg_temp_c", F.col("avg_temp_k") - F.lit(273.15))
.withColumn(
"avg_vegetation",
F.when(
F.col("count_vegetation") > 0,
F.col("sum_vegetation") / F.col("count_vegetation")
)
)
.filter(F.col("points") >= MIN_GEOHASH_POINTS)
# Approximate land filter.
# This removes open-ocean-like geohashes using vegetation_surface.
.filter(F.col("avg_vegetation") >= MIN_AVG_VEGETATION)
.select(
"geohash",
"lat",
"lon",
"points",
"avg_vegetation",
"avg_cloud_cover",
"avg_precip_3hr",
"avg_daily_precip",
"avg_wind_speed",
"avg_wind_power_proxy",
"max_wind_speed",
"avg_temp_c",
)
)
# ============================================================
# ANALYZE / SCORE
# ============================================================
def norm_expr(col_name, min_value, max_value):
if min_value is None or max_value is None or max_value == min_value:
return F.lit(50.0)
return (
(F.col(col_name) - F.lit(float(min_value))) /
(F.lit(float(max_value)) - F.lit(float(min_value))) *
F.lit(100.0)
)
def analyze_solarwind(geohash_stats):
ranges = geohash_stats.agg(
F.min("avg_cloud_cover").alias("cloud_min"),
F.max("avg_cloud_cover").alias("cloud_max"),
F.min("avg_daily_precip").alias("precip_min"),
F.max("avg_daily_precip").alias("precip_max"),
F.min("avg_wind_power_proxy").alias("wind_power_min"),
F.max("avg_wind_power_proxy").alias("wind_power_max"),
).first().asDict()
scored = (
geohash_stats
.withColumn(
"cloud_norm",
norm_expr("avg_cloud_cover", ranges["cloud_min"], ranges["cloud_max"])
)
.withColumn(
"precip_norm",
norm_expr("avg_daily_precip", ranges["precip_min"], ranges["precip_max"])
)
.withColumn(
"wind_norm",
norm_expr("avg_wind_power_proxy", ranges["wind_power_min"], ranges["wind_power_max"])
)
# Solar proxy:
# lower cloud cover + lower precipitation = better solar farm candidate.
.withColumn(
"solar_score",
(
F.lit(0.75) * (F.lit(100.0) - F.col("cloud_norm")) +
F.lit(0.25) * (F.lit(100.0) - F.col("precip_norm"))
)
)
.withColumn("wind_score", F.col("wind_norm"))
.withColumn(
"combined_score",
F.lit(0.50) * F.col("solar_score") + F.lit(0.50) * F.col("wind_score")
)
)
base_cols = [
"geohash",
"lat",
"lon",
"points",
"avg_vegetation",
"solar_score",
"wind_score",
"combined_score",
"avg_cloud_cover",
"avg_daily_precip",
"avg_wind_speed",
"avg_wind_power_proxy",
"max_wind_speed",
"avg_temp_c",
]
solar_top3 = (
scored
.orderBy(F.desc("solar_score"))
.limit(TOP_N)
.select(*base_cols)
.withColumn("category", F.lit("Solar"))
.withColumn("category_score", F.col("solar_score"))
)
wind_top3 = (
scored
.orderBy(F.desc("wind_score"))
.limit(TOP_N)
.select(*base_cols)
.withColumn("category", F.lit("Wind"))
.withColumn("category_score", F.col("wind_score"))
)
combined_top3 = (
scored
.orderBy(F.desc("combined_score"))
.limit(TOP_N)
.select(*base_cols)
.withColumn("category", F.lit("Solar + Wind"))
.withColumn("category_score", F.col("combined_score"))
)
return (
solar_top3
.unionByName(wind_top3)
.unionByName(combined_top3)
.select(
"category",
"geohash",
"lat",
"lon",
"category_score",
"solar_score",
"wind_score",
"combined_score",
"avg_vegetation",
"avg_cloud_cover",
"avg_daily_precip",
"avg_wind_speed",
"avg_wind_power_proxy",
"max_wind_speed",
"avg_temp_c",
"points",
)
)
# ============================================================
# SETUP SPARKY
# ============================================================
sparky = Sparky(spark, sc)
batch_ctx = SparkyBatchContext(
glob_path=INPUT_GLOB,
partial_out=PARTIAL_OUT,
schema=raw_schema,
parse_line=parse_solarwind_line,
partial_transform=partial_transform,
batch_size=BATCH_SIZE,
)
input_paths = sparky.setup_job(batch_ctx)
input_paths = input_paths[::FILE_STRIDE]
print("Total files after stride:", len(input_paths))
print("Batch size:", BATCH_SIZE)
print("Number of batches:", (len(input_paths) + BATCH_SIZE - 1) // BATCH_SIZE)
if len(input_paths) == 0:
raise ValueError(f"No files matched: {INPUT_GLOB}")
def required_idx(name):
key = name.strip().lower()
if key not in sparky.columns:
print("Available related columns:")
for col_name, idx in sparky.columns.items():
if name.split("_")[0] in col_name:
print(idx, col_name)
raise ValueError(f"Missing required column: {name}")
return sparky.columns[key]
lat_idx = required_idx(LAT_COLUMN)
lon_idx = required_idx(LON_COLUMN)
cloud_idx = required_idx(CLOUD_COLUMN)
precip_idx = required_idx(PRECIP_COLUMN)
wind_gust_idx = required_idx(WIND_GUST_COLUMN)
temp_idx = required_idx(TEMP_COLUMN)
vegetation_idx = required_idx(VEGETATION_COLUMN)
print("Using column indices:")
print("lat:", lat_idx)
print("lon:", lon_idx)
print("cloud:", cloud_idx)
print("precip:", precip_idx)
print("wind gust:", wind_gust_idx)
print("temp:", temp_idx)
print("vegetation:", vegetation_idx)
# ============================================================
# RUN BATCHES
# ============================================================
overall_start = time.perf_counter()
for batch_id, start in enumerate(range(0, len(input_paths), BATCH_SIZE)):
batch_paths = input_paths[start:start + BATCH_SIZE]
sparky.process_batch(batch_ctx, batch_paths, batch_id)
overall_elapsed = time.perf_counter() - overall_start
print(f"Total SolarWind batch runtime: {overall_elapsed:.2f} seconds")
# ============================================================
# COMBINE + ANALYZE
# ============================================================
analysis_ctx = SparkyAnalysisContext(
partial_in=PARTIAL_OUT,
combine_partials=combine_solarwind_partials,
analyze=analyze_solarwind,
result_out=RESULT_OUT,
)
geohash_stats = sparky.combine(analysis_ctx)
top9 = sparky.analyze(analysis_ctx, geohash_stats)
display_cols = [
"category",
"geohash",
"lat",
"lon",
"category_score",
"solar_score",
"wind_score",
"combined_score",
"avg_vegetation",
"avg_cloud_cover",
"avg_daily_precip",
"avg_wind_speed",
"avg_wind_power_proxy",
"max_wind_speed",
"avg_temp_c",
"points",
]
top9_pdf = top9.select(*display_cols).toPandas()
for col in [
"lat",
"lon",
"category_score",
"solar_score",
"wind_score",
"combined_score",
"avg_vegetation",
"avg_cloud_cover",
"avg_daily_precip",
"avg_wind_speed",
"avg_wind_power_proxy",
"max_wind_speed",
"avg_temp_c",
]:
top9_pdf[col] = top9_pdf[col].round(3)
print("Top 3 solar, top 3 wind, top 3 solar+wind:")
display(top9_pdf)
print("Solar top 3:")
display(top9_pdf[top9_pdf["category"] == "Solar"].reset_index(drop=True))
print("Wind top 3:")
display(top9_pdf[top9_pdf["category"] == "Wind"].reset_index(drop=True))
print("Solar + Wind top 3:")
display(top9_pdf[top9_pdf["category"] == "Solar + Wind"].reset_index(drop=True))
# ============================================================
# SAVE CSV + INTERACTIVE MAP
# ============================================================
csv_path = "solarwind_top9.csv"
top9_pdf.to_csv(csv_path, index=False)
print("Saved CSV:", csv_path)
def fmt(value, digits=2):
if value is None:
return "N/A"
try:
if math.isnan(value):
return "N/A"
except TypeError:
pass
return f"{value:.{digits}f}"
center_lat = top9_pdf["lat"].mean() if len(top9_pdf) > 0 else 39.0
center_lon = top9_pdf["lon"].mean() if len(top9_pdf) > 0 else -98.0
m = folium.Map(
location=[center_lat, center_lon],
zoom_start=4,
tiles=None,
)
folium.TileLayer(
tiles="https://{s}.basemaps.cartocdn.com/light_all/{z}/{x}/{y}{r}.png",
attr='© <a href="https://www.openstreetmap.org/copyright">OpenStreetMap</a> contributors © <a href="https://carto.com/attributions">CARTO</a>',
name="CartoDB Positron",
control=False,
).add_to(m)
category_colors = {
"Solar": "orange",
"Wind": "blue",
"Solar + Wind": "green",
}
category_labels = {
"Solar": "S",
"Wind": "W",
"Solar + Wind": "B",
}
category_offsets = {
"Solar": (0.00, -0.08),
"Wind": (0.00, 0.08),
"Solar + Wind": (0.08, 0.00),
}
for i, row in top9_pdf.reset_index(drop=True).iterrows():
rank = i + 1
category = row["category"]
lat_offset, lon_offset = category_offsets.get(category, (0.0, 0.0))
marker_lat = row["lat"] + lat_offset
marker_lon = row["lon"] + lon_offset
popup_html = f"""
<b>{category} candidate #{rank}</b><br>
Geohash: <b>{row['geohash']}</b><br>
Latitude: {row['lat']:.4f}<br>
Longitude: {row['lon']:.4f}<br>
Category score: {fmt(row['category_score'])}<br>
Solar score: {fmt(row['solar_score'])}<br>
Wind score: {fmt(row['wind_score'])}<br>
Combined score: {fmt(row['combined_score'])}<br>
Avg vegetation: {fmt(row['avg_vegetation'])}<br>
Avg cloud cover: {fmt(row['avg_cloud_cover'])}<br>
Avg daily precip: {fmt(row['avg_daily_precip'], 3)}<br>
Avg wind speed: {fmt(row['avg_wind_speed'], 3)}<br>
Avg wind power proxy: {fmt(row['avg_wind_power_proxy'], 3)}<br>
Max wind speed: {fmt(row['max_wind_speed'], 3)}<br>
Avg temp C: {fmt(row['avg_temp_c'])}<br>
Points: {int(row['points'])}
"""
marker_html = f"""
<div style="
background-color: {category_colors.get(category, 'black')};
color: white;
border-radius: 50%;
width: 32px;
height: 32px;
text-align: center;
line-height: 32px;
font-weight: bold;
border: 2px solid white;
box-shadow: 0 0 5px gray;
font-size: 13px;
">
{category_labels.get(category, '?')}{rank}
</div>
"""
folium.Marker(
location=[marker_lat, marker_lon],
popup=folium.Popup(popup_html, max_width=400),
tooltip=f"{category}: {row['geohash']} | score {row['category_score']:.2f}",
icon=folium.DivIcon(html=marker_html),
).add_to(m)
legend_html = """
<div style="
position: fixed;
bottom: 30px;
left: 30px;
z-index: 9999;
background-color: white;
padding: 10px 12px;
border: 2px solid #999;
border-radius: 8px;
font-size: 14px;
box-shadow: 0 0 5px gray;
">
<b>SolarWind Categories</b><br>
<span style="color: orange; font-weight: bold;">●</span> Solar<br>
<span style="color: blue; font-weight: bold;">●</span> Wind<br>
<span style="color: green; font-weight: bold;">●</span> Solar + Wind<br>
</div>
"""
m.get_root().html.add_child(folium.Element(legend_html))
if len(top9_pdf) > 0:
bounds = [
[top9_pdf["lat"].min(), top9_pdf["lon"].min()],
[top9_pdf["lat"].max(), top9_pdf["lon"].max()],
]
m.fit_bounds(bounds)
html_path = "solarwind_top9_map.html"
m.save(html_path)
print("Saved interactive map to:", html_path)
display(IFrame(src=html_path, width=950, height=700))
Deleted old partial output: hdfs:/user/rsbhatia2/solarwind_partials_vegetation_land_proxy Found 6770 files Total files after stride: 283 Batch size: 50 Number of batches: 6 Using column indices: lat: 1 lon: 2 cloud: 12 precip: 13 wind gust: 17 temp: 10 vegetation: 14 Processing batch 0: 50 files
Processing batch 1: 50 files
Processing batch 2: 50 files
Processing batch 3: 50 files
Processing batch 4: 50 files
Processing batch 5: 33 files
Total SolarWind batch runtime: 177.01 seconds
[Stage 1051:=============>(14 + 1) / 15][Stage 1053:=============>(14 + 1) / 15]
Top 3 solar, top 3 wind, top 3 solar+wind:
| category | geohash | lat | lon | category_score | solar_score | wind_score | combined_score | avg_vegetation | avg_cloud_cover | avg_daily_precip | avg_wind_speed | avg_wind_power_proxy | max_wind_speed | avg_temp_c | points | |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Solar | 9mve | 32.956 | -115.867 | 99.997 | 99.997 | 6.541 | 53.269 | 15.324 | 9.372 | 0.003 | 3.986 | 164.238 | 16.208 | 18.479 | 984 |
| 1 | Solar | 9mvd | 32.772 | -115.800 | 99.699 | 99.699 | 12.526 | 56.112 | 15.594 | 9.663 | 0.001 | 4.578 | 310.271 | 18.808 | 18.547 | 656 |
| 2 | Solar | 9mqr | 30.840 | -114.777 | 99.673 | 99.673 | 5.519 | 52.596 | 1.000 | 9.427 | 0.242 | 3.982 | 139.307 | 16.534 | 20.138 | 328 |
| 3 | Wind | 9xmp | 42.097 | -105.316 | 100.000 | 84.474 | 100.000 | 92.237 | 22.888 | 23.421 | 0.884 | 8.803 | 2444.358 | 30.145 | 6.011 | 656 |
| 4 | Wind | f8kf | 46.860 | -60.592 | 99.185 | 40.757 | 99.185 | 69.971 | 43.327 | 62.215 | 4.077 | 10.095 | 2424.465 | 33.301 | 6.727 | 492 |
| 5 | Wind | f8kc | 46.667 | -60.645 | 97.334 | 35.933 | 97.334 | 66.634 | 43.839 | 65.168 | 5.654 | 10.100 | 2379.319 | 32.601 | 5.982 | 820 |
| 6 | Solar + Wind | 9xmp | 42.097 | -105.316 | 92.237 | 84.474 | 100.000 | 92.237 | 22.888 | 23.421 | 0.884 | 8.803 | 2444.358 | 30.145 | 6.011 | 656 |
| 7 | Solar + Wind | 9xmn | 41.907 | -105.273 | 90.685 | 85.231 | 96.139 | 90.685 | 19.524 | 22.720 | 0.857 | 8.905 | 2350.160 | 30.245 | 6.323 | 492 |
| 8 | Solar + Wind | 9xm4 | 41.229 | -105.333 | 87.434 | 84.414 | 90.454 | 87.434 | 22.397 | 23.637 | 0.739 | 9.239 | 2211.469 | 28.337 | 4.174 | 656 |
Solar top 3:
| category | geohash | lat | lon | category_score | solar_score | wind_score | combined_score | avg_vegetation | avg_cloud_cover | avg_daily_precip | avg_wind_speed | avg_wind_power_proxy | max_wind_speed | avg_temp_c | points | |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Solar | 9mve | 32.956 | -115.867 | 99.997 | 99.997 | 6.541 | 53.269 | 15.324 | 9.372 | 0.003 | 3.986 | 164.238 | 16.208 | 18.479 | 984 |
| 1 | Solar | 9mvd | 32.772 | -115.800 | 99.699 | 99.699 | 12.526 | 56.112 | 15.594 | 9.663 | 0.001 | 4.578 | 310.271 | 18.808 | 18.547 | 656 |
| 2 | Solar | 9mqr | 30.840 | -114.777 | 99.673 | 99.673 | 5.519 | 52.596 | 1.000 | 9.427 | 0.242 | 3.982 | 139.307 | 16.534 | 20.138 | 328 |
Wind top 3:
| category | geohash | lat | lon | category_score | solar_score | wind_score | combined_score | avg_vegetation | avg_cloud_cover | avg_daily_precip | avg_wind_speed | avg_wind_power_proxy | max_wind_speed | avg_temp_c | points | |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Wind | 9xmp | 42.097 | -105.316 | 100.000 | 84.474 | 100.000 | 92.237 | 22.888 | 23.421 | 0.884 | 8.803 | 2444.358 | 30.145 | 6.011 | 656 |
| 1 | Wind | f8kf | 46.860 | -60.592 | 99.185 | 40.757 | 99.185 | 69.971 | 43.327 | 62.215 | 4.077 | 10.095 | 2424.465 | 33.301 | 6.727 | 492 |
| 2 | Wind | f8kc | 46.667 | -60.645 | 97.334 | 35.933 | 97.334 | 66.634 | 43.839 | 65.168 | 5.654 | 10.100 | 2379.319 | 32.601 | 5.982 | 820 |
Solar + Wind top 3:
| category | geohash | lat | lon | category_score | solar_score | wind_score | combined_score | avg_vegetation | avg_cloud_cover | avg_daily_precip | avg_wind_speed | avg_wind_power_proxy | max_wind_speed | avg_temp_c | points | |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Solar + Wind | 9xmp | 42.097 | -105.316 | 92.237 | 84.474 | 100.000 | 92.237 | 22.888 | 23.421 | 0.884 | 8.803 | 2444.358 | 30.145 | 6.011 | 656 |
| 1 | Solar + Wind | 9xmn | 41.907 | -105.273 | 90.685 | 85.231 | 96.139 | 90.685 | 19.524 | 22.720 | 0.857 | 8.905 | 2350.160 | 30.245 | 6.323 | 492 |
| 2 | Solar + Wind | 9xm4 | 41.229 | -105.333 | 87.434 | 84.414 | 90.454 | 87.434 | 22.397 | 23.637 | 0.739 | 9.239 | 2211.469 | 28.337 | 4.174 | 656 |
Saved CSV: solarwind_top9.csv Saved interactive map to: solarwind_top9_map.html
SOLARWIND INC. REPORT (CLASSIFIED)¶
We tried to find the best place to put our wind and solar farms.
Our initial run recommended the ocean as a place for our wind farms.
This idea was scrapped after told us that they didn't have enough power to claim the ocean yet.
As such, the oceanic wind farms have been delayed by at least 15 years, till has become the king of the world.
We adjusted the methodology to use surface vegetation as a marker for land mass.
For now, we shall release a public facing report, we think Canada might be a good shout.
SOLARWIND INC. REPORT¶
We ran our spark data analysis tools on the datasets, to find the best place to setup solar and wind farms. Here are the results of our groundbeaking technology:
Solar Farms
The following places were identified as being ideal for solar farms:
- Imperial County, CA
| Solar Score | Geohash | Avg Cloud Cover |
|---|---|---|
| 100 | 9mve | 9.37 |
- Dunaway Road, Imperial County, CA
| Solar Score | Geohash | Avg Cloud Cover |
|---|---|---|
| 99.7 | 9mvd | 9.66 |
- Municipio de San Felipe, Mexico
| Solar Score | Geohash | Avg Cloud Cover |
|---|---|---|
| 99.67 | 9mqr | 9.43 |
The council decided that they were not ready to expand business into Mexico yet. Imperial County shall be the target for the next 5 years.
To see more detailed information, kindly refer to the tables attached above the map.
Wind Farms
The following places were identifed as being ideal for wind farms.
- Albany County, Wyoming
| Wind Score | Geohash | Avg Wind Speed |
|---|---|---|
| 100 | 9xmp | 8.803 |
- Cape North, Nova Scotia, Canada
| Wind Score | Geohash | Avg Wind Speed |
|---|---|---|
| 99.19 | f8kf | 10.095 |
- Cheticamp Flats, Nova Scotia, Canada
| Wind Score | Geohash | Avg Wind Speed |
|---|---|---|
| 97.33 | f8kc | 10.1 |
Solar + Wind Farms
This is some prime real estate. These locations are number one priority, because they support good solar and wind farms in the same location, that will be cost-effective for maintenance.
- Britania Mountain, Wyoming
| Total Score | Wind Score | Solar Score | Geohash |
|---|---|---|---|
| 92.24 | 100 | 84.47 | 9xmp |
- Albany County, Wyoming, United States of America
| Total Score | Wind Score | Solar Score | Geohash |
|---|---|---|---|
| 90.69 | 96.14 | 85.23 | 9xmn |
This location is very close to a government park, so it is unlikely we will be able to do anything here.
- Albany County, Wyoming. This time, near a lake.
| Total Score | Wind Score | Solar Score | Geohash |
|---|---|---|---|
| 87.43 | 90.45 | 84.41 | 9xm4 |
This concludes our report. For more information, please take a look at the tables above the map.
CLIMATE CHANGE¶
from pyspark.sql import functions as F
from pyspark.sql.types import (
StructType, StructField,
StringType, LongType, DoubleType
)
from SparkHandler import Sparky, SparkyBatchContext, SparkyAnalysisContext
import util
from IPython.display import display
import pandas as pd
import matplotlib.pyplot as plt
import os
import time
import math
sc = spark.sparkContext
# ============================================================
# CONFIG
# ============================================================
INPUT_GLOB = "hdfs:///3hr/*/*"
BATCH_SIZE = 50
PARTIAL_OUT = f"hdfs:///user/{sc.sparkUser()}/climate_change_geohash2_partials"
RESULT_OUT = None
FILE_STRIDE = 1
GEOHASH_PRECISION = 2
YEARS_BACK = 5
MIN_YEARS_FOR_TREND = 2
TOP_REGIONS_TO_DISPLAY = 10
TIME_COLUMN = "time"
LAT_COLUMN = "lat"
LON_COLUMN = "lon"
ALBEDO_COLUMN = "albedo_surface"
PRECIPITABLE_WATER_COLUMN = "precipitable_water_entire_atmosphere_single_layer"
PRESSURE_SURFACE_COLUMN = "pressure_surface"
RH_ZERO_COLUMN = "relative_humidity_zerodegc_isotherm"
SNOW_DEPTH_COLUMN = "snow_depth_surface"
TEMP_COLUMN = "temperature_surface"
CLOUD_COLUMN = "total_cloud_cover_entire_atmosphere_single_layer"
PRECIP_COLUMN = "total_precipitation_surface_3_hour_accumulation"
VEGETATION_COLUMN = "vegetation_surface"
VISIBILITY_COLUMN = "visibility_surface"
WIND_GUST_COLUMN = "wind_speed_gust_surface"
_BASE32 = "0123456789bcdefghjkmnpqrstuvwxyz"
def encode_geohash(lat, lon, precision=2):
lat_range = [-90.0, 90.0]
lon_range = [-180.0, 180.0]
bits = [16, 8, 4, 2, 1]
bit = 0
ch = 0
even = True
geohash = []
while len(geohash) < precision:
if even:
mid = (lon_range[0] + lon_range[1]) / 2.0
if lon >= mid:
ch |= bits[bit]
lon_range[0] = mid
else:
lon_range[1] = mid
else:
mid = (lat_range[0] + lat_range[1]) / 2.0
if lat >= mid:
ch |= bits[bit]
lat_range[0] = mid
else:
lat_range[1] = mid
even = not even
if bit < 4:
bit += 1
else:
geohash.append(_BASE32[ch])
bit = 0
ch = 0
return "".join(geohash)
# ============================================================
# SCHEMA
# ============================================================
raw_schema = StructType([
StructField("geohash2", StringType(), False),
StructField("time_ms", LongType(), False),
StructField("lat", DoubleType(), False),
StructField("lon", DoubleType(), False),
StructField("temp_k", DoubleType(), False),
StructField("precipitable_water", DoubleType(), False),
StructField("pressure_surface", DoubleType(), False),
StructField("rh_zerodegc", DoubleType(), False),
StructField("snow_depth", DoubleType(), False),
StructField("cloud_cover", DoubleType(), False),
StructField("precip_3hr", DoubleType(), False),
StructField("vegetation", DoubleType(), False),
StructField("visibility", DoubleType(), False),
StructField("wind_gust", DoubleType(), False),
StructField("albedo", DoubleType(), False),
])
time_idx = None
lat_idx = None
lon_idx = None
albedo_idx = None
precipitable_water_idx = None
pressure_surface_idx = None
rh_zero_idx = None
snow_depth_idx = None
temp_idx = None
cloud_idx = None
precip_idx = None
vegetation_idx = None
visibility_idx = None
wind_gust_idx = None
# ============================================================
# PARSER
# ============================================================
def parse_climate_change_line(line):
try:
if line.startswith("1_time"):
return None
parts = line.rstrip("\t").split("\t")
time_ms = util.safe_float(parts, time_idx)
lat = util.safe_float(parts, lat_idx)
lon = util.safe_float(parts, lon_idx)
temp_k = util.safe_float(parts, temp_idx)
precipitable_water = util.safe_float(parts, precipitable_water_idx)
pressure_surface = util.safe_float(parts, pressure_surface_idx)
rh_zerodegc = util.safe_float(parts, rh_zero_idx)
snow_depth = util.safe_float(parts, snow_depth_idx)
cloud_cover = util.safe_float(parts, cloud_idx)
precip_3hr = util.safe_float(parts, precip_idx)
vegetation = util.safe_float(parts, vegetation_idx)
visibility = util.safe_float(parts, visibility_idx)
wind_gust = util.safe_float(parts, wind_gust_idx)
albedo = util.safe_float(parts, albedo_idx)
if (
time_ms is None
or lat is None
or lon is None
or temp_k is None
or precipitable_water is None
or pressure_surface is None
or rh_zerodegc is None
or snow_depth is None
or cloud_cover is None
or precip_3hr is None
or vegetation is None
or visibility is None
or wind_gust is None
or albedo is None
):
return None
geohash2 = encode_geohash(lat, lon, GEOHASH_PRECISION)
return (
geohash2,
int(time_ms),
lat,
lon,
temp_k,
precipitable_water,
pressure_surface,
rh_zerodegc,
snow_depth,
cloud_cover,
precip_3hr,
vegetation,
visibility,
wind_gust,
albedo,
)
except Exception:
return None
# ============================================================
# PARTIAL TRANSFORM
# geohash2 + year aggregates
# ============================================================
def partial_transform(points):
yearly = (
points
.withColumn(
"timestamp",
F.from_unixtime((F.col("time_ms") / 1000).cast("long")).cast("timestamp")
)
.withColumn("year", F.year("timestamp"))
)
return (
yearly
.groupBy("geohash2", "year")
.agg(
F.sum("lat").alias("sum_lat"),
F.sum("lon").alias("sum_lon"),
F.count("*").alias("count_points"),
F.sum("temp_k").alias("sum_temp_k"),
F.count("temp_k").alias("count_temp"),
F.sum("precipitable_water").alias("sum_precipitable_water"),
F.count("precipitable_water").alias("count_precipitable_water"),
F.sum("pressure_surface").alias("sum_pressure_surface"),
F.count("pressure_surface").alias("count_pressure_surface"),
F.sum("rh_zerodegc").alias("sum_rh_zerodegc"),
F.count("rh_zerodegc").alias("count_rh_zerodegc"),
F.sum("snow_depth").alias("sum_snow_depth"),
F.count("snow_depth").alias("count_snow_depth"),
F.sum("cloud_cover").alias("sum_cloud_cover"),
F.count("cloud_cover").alias("count_cloud_cover"),
F.sum("precip_3hr").alias("sum_precip_3hr"),
F.count("precip_3hr").alias("count_precip_3hr"),
F.sum("vegetation").alias("sum_vegetation"),
F.count("vegetation").alias("count_vegetation"),
F.sum("visibility").alias("sum_visibility"),
F.count("visibility").alias("count_visibility"),
F.sum("wind_gust").alias("sum_wind_gust"),
F.count("wind_gust").alias("count_wind_gust"),
F.sum("albedo").alias("sum_albedo"),
F.count("albedo").alias("count_albedo"),
)
)
# ============================================================
# COMBINE PARTIALS
# ============================================================
def combine_climate_partials(partials):
combined = (
partials
.groupBy("geohash2", "year")
.agg(
F.sum("sum_lat").alias("sum_lat"),
F.sum("sum_lon").alias("sum_lon"),
F.sum("count_points").alias("points"),
F.sum("sum_temp_k").alias("sum_temp_k"),
F.sum("count_temp").alias("count_temp"),
F.sum("sum_precipitable_water").alias("sum_precipitable_water"),
F.sum("count_precipitable_water").alias("count_precipitable_water"),
F.sum("sum_pressure_surface").alias("sum_pressure_surface"),
F.sum("count_pressure_surface").alias("count_pressure_surface"),
F.sum("sum_rh_zerodegc").alias("sum_rh_zerodegc"),
F.sum("count_rh_zerodegc").alias("count_rh_zerodegc"),
F.sum("sum_snow_depth").alias("sum_snow_depth"),
F.sum("count_snow_depth").alias("count_snow_depth"),
F.sum("sum_cloud_cover").alias("sum_cloud_cover"),
F.sum("count_cloud_cover").alias("count_cloud_cover"),
F.sum("sum_precip_3hr").alias("sum_precip_3hr"),
F.sum("count_precip_3hr").alias("count_precip_3hr"),
F.sum("sum_vegetation").alias("sum_vegetation"),
F.sum("count_vegetation").alias("count_vegetation"),
F.sum("sum_visibility").alias("sum_visibility"),
F.sum("count_visibility").alias("count_visibility"),
F.sum("sum_wind_gust").alias("sum_wind_gust"),
F.sum("count_wind_gust").alias("count_wind_gust"),
F.sum("sum_albedo").alias("sum_albedo"),
F.sum("count_albedo").alias("count_albedo"),
)
)
return (
combined
.withColumn("lat", F.col("sum_lat") / F.col("points"))
.withColumn("lon", F.col("sum_lon") / F.col("points"))
.withColumn("avg_temp_k", F.col("sum_temp_k") / F.col("count_temp"))
.withColumn("avg_temp_c", F.col("avg_temp_k") - F.lit(273.15))
.withColumn("avg_precipitable_water", F.col("sum_precipitable_water") / F.col("count_precipitable_water"))
.withColumn("avg_pressure_surface", F.col("sum_pressure_surface") / F.col("count_pressure_surface"))
.withColumn("avg_rh_zerodegc", F.col("sum_rh_zerodegc") / F.col("count_rh_zerodegc"))
.withColumn("avg_snow_depth", F.col("sum_snow_depth") / F.col("count_snow_depth"))
.withColumn("avg_cloud_cover", F.col("sum_cloud_cover") / F.col("count_cloud_cover"))
# NAM total precipitation is a 3-hour accumulation.
.withColumn("avg_precip_3hr", F.col("sum_precip_3hr") / F.col("count_precip_3hr"))
.withColumn("avg_daily_precip", F.col("avg_precip_3hr") * F.lit(8.0))
.withColumn("avg_vegetation", F.col("sum_vegetation") / F.col("count_vegetation"))
.withColumn("avg_visibility", F.col("sum_visibility") / F.col("count_visibility"))
.withColumn("avg_wind_gust", F.col("sum_wind_gust") / F.col("count_wind_gust"))
.withColumn("avg_albedo", F.col("sum_albedo") / F.col("count_albedo"))
.select(
"geohash2",
"year",
"lat",
"lon",
"points",
"avg_temp_c",
"avg_precipitable_water",
"avg_pressure_surface",
"avg_rh_zerodegc",
"avg_snow_depth",
"avg_cloud_cover",
"avg_daily_precip",
"avg_vegetation",
"avg_visibility",
"avg_wind_gust",
"avg_albedo",
)
)
# ============================================================
# SPARKY SETUP
# ============================================================
sparky = Sparky(spark, sc)
batch_ctx = SparkyBatchContext(
glob_path=INPUT_GLOB,
partial_out=PARTIAL_OUT,
schema=raw_schema,
parse_line=parse_climate_change_line,
partial_transform=partial_transform,
batch_size=BATCH_SIZE,
)
input_paths = sparky.setup_job(batch_ctx)
input_paths = input_paths[::FILE_STRIDE]
print("Files used:", len(input_paths))
print("Batch size:", BATCH_SIZE)
print("Number of batches:", (len(input_paths) + BATCH_SIZE - 1) // BATCH_SIZE)
if len(input_paths) == 0:
raise ValueError(f"No files matched: {INPUT_GLOB}")
def required_idx(name):
key = name.strip().lower()
if key not in sparky.columns:
print("Available related columns:")
for col_name, idx in sparky.columns.items():
if name.split("_")[0] in col_name:
print(idx, col_name)
raise ValueError(f"Missing required column: {name}")
return sparky.columns[key]
time_idx = required_idx(TIME_COLUMN)
lat_idx = required_idx(LAT_COLUMN)
lon_idx = required_idx(LON_COLUMN)
albedo_idx = required_idx(ALBEDO_COLUMN)
precipitable_water_idx = required_idx(PRECIPITABLE_WATER_COLUMN)
pressure_surface_idx = required_idx(PRESSURE_SURFACE_COLUMN)
rh_zero_idx = required_idx(RH_ZERO_COLUMN)
snow_depth_idx = required_idx(SNOW_DEPTH_COLUMN)
temp_idx = required_idx(TEMP_COLUMN)
cloud_idx = required_idx(CLOUD_COLUMN)
precip_idx = required_idx(PRECIP_COLUMN)
vegetation_idx = required_idx(VEGETATION_COLUMN)
visibility_idx = required_idx(VISIBILITY_COLUMN)
wind_gust_idx = required_idx(WIND_GUST_COLUMN)
print("Using column indices:")
print("time:", time_idx)
print("lat:", lat_idx)
print("lon:", lon_idx)
print("albedo:", albedo_idx)
print("precipitable water:", precipitable_water_idx)
print("pressure surface:", pressure_surface_idx)
print("RH zero C:", rh_zero_idx)
print("snow depth:", snow_depth_idx)
print("temp:", temp_idx)
print("cloud:", cloud_idx)
print("precip:", precip_idx)
print("vegetation:", vegetation_idx)
print("visibility:", visibility_idx)
print("wind gust:", wind_gust_idx)
# ============================================================
# RUN BATCHES
# ============================================================
overall_start = time.perf_counter()
for batch_id, start in enumerate(range(0, len(input_paths), BATCH_SIZE)):
batch_paths = input_paths[start:start + BATCH_SIZE]
sparky.process_batch(batch_ctx, batch_paths, batch_id)
overall_elapsed = time.perf_counter() - overall_start
print(f"Total Climate Change batch runtime: {overall_elapsed:.2f} seconds")
# ============================================================
# COMBINE
# ============================================================
analysis_ctx = SparkyAnalysisContext(
partial_in=PARTIAL_OUT,
combine_partials=combine_climate_partials,
analyze=lambda df: df,
result_out=RESULT_OUT,
)
yearly_stats_all = sparky.combine(analysis_ctx)
year_bounds = yearly_stats_all.agg(
F.min("year").alias("min_year"),
F.max("year").alias("max_year"),
F.countDistinct("year").alias("n_years"),
).first().asDict()
min_year_available = year_bounds["min_year"]
max_year_available = year_bounds["max_year"]
n_years_available = year_bounds["n_years"]
start_year = max_year_available - YEARS_BACK + 1
print("Available years:", min_year_available, "to", max_year_available)
print("Distinct years:", n_years_available)
print("Using years:", start_year, "to", max_year_available)
yearly_stats = (
yearly_stats_all
.filter((F.col("year") >= start_year) & (F.col("year") <= max_year_available))
)
display(
yearly_stats
.groupBy("year")
.agg(F.countDistinct("geohash2").alias("geohash2_regions"))
.orderBy("year")
.toPandas()
)
# ============================================================
# TEMPERATURE TRENDS BY TWO-CHARACTER GEOHASH
# ============================================================
trend_stats = (
yearly_stats
.groupBy("geohash2")
.agg(
F.avg("lat").alias("lat"),
F.avg("lon").alias("lon"),
F.countDistinct("year").alias("n_years"),
F.min("year").alias("first_year"),
F.max("year").alias("last_year"),
F.avg("year").alias("avg_year"),
F.avg("avg_temp_c").alias("mean_temp_c"),
F.avg(F.col("year") * F.col("avg_temp_c")).alias("avg_year_temp"),
F.avg(F.col("year") * F.col("year")).alias("avg_year_sq"),
)
.withColumn(
"temp_slope_c_per_year",
(
F.col("avg_year_temp") - F.col("avg_year") * F.col("mean_temp_c")
) / (
F.col("avg_year_sq") - F.col("avg_year") * F.col("avg_year")
)
)
.withColumn(
"estimated_temp_change_c",
F.col("temp_slope_c_per_year") * (F.col("last_year") - F.col("first_year"))
)
.filter(F.col("n_years") >= MIN_YEARS_FOR_TREND)
)
warming_regions = (
trend_stats
.filter(F.col("temp_slope_c_per_year") > 0)
.orderBy(F.desc("temp_slope_c_per_year"))
)
cooling_or_flat_regions = (
trend_stats
.filter(F.col("temp_slope_c_per_year") <= 0)
.orderBy(F.asc("temp_slope_c_per_year"))
)
print("Top warming two-character geohash regions:")
display(
warming_regions
.select(
"geohash2",
"lat",
"lon",
"n_years",
"first_year",
"last_year",
"mean_temp_c",
"temp_slope_c_per_year",
"estimated_temp_change_c",
)
.limit(TOP_REGIONS_TO_DISPLAY)
.toPandas()
)
print("Cooling/flat two-character geohash regions:")
display(
cooling_or_flat_regions
.select(
"geohash2",
"lat",
"lon",
"n_years",
"first_year",
"last_year",
"mean_temp_c",
"temp_slope_c_per_year",
"estimated_temp_change_c",
)
.limit(TOP_REGIONS_TO_DISPLAY)
.toPandas()
)
# ============================================================
# CORRELATION MATRIX FOR WARMING REGIONS
# ============================================================
warming_yearly = (
yearly_stats
.join(
warming_regions.select("geohash2"),
on="geohash2",
how="inner",
)
)
feature_cols = [
"avg_temp_c",
"avg_precipitable_water",
"avg_pressure_surface",
"avg_rh_zerodegc",
"avg_snow_depth",
"avg_cloud_cover",
"avg_daily_precip",
"avg_vegetation",
"avg_visibility",
"avg_wind_gust",
"avg_albedo",
]
warming_pdf = warming_yearly.select(
"geohash2",
"year",
*feature_cols,
).toPandas()
print("Rows used for warming-region correlation matrix:", len(warming_pdf))
if len(warming_pdf) == 0:
print("No warming regions found with the current trend filter.")
else:
global_corr = warming_pdf[feature_cols].corr(method="pearson")
print("Global PCC correlation matrix across warming regions:")
display(global_corr.round(3))
# Heatmap
plt.figure(figsize=(11, 9))
im = plt.imshow(global_corr.values, vmin=-1, vmax=1)
plt.colorbar(im, label="Pearson correlation coefficient")
plt.xticks(range(len(feature_cols)), feature_cols, rotation=60, ha="right")
plt.yticks(range(len(feature_cols)), feature_cols)
plt.title("PCC Correlation Matrix for Warming Geohash-2 Regions")
plt.tight_layout()
plot_dir = f"/bigdata/students/{os.environ['USER']}/plots"
os.makedirs(plot_dir, exist_ok=True)
corr_path = f"{plot_dir}/climate_change_global_corr.png"
plt.savefig(corr_path, dpi=150, bbox_inches="tight")
plt.show()
print("Saved correlation heatmap:", corr_path)
# ============================================================
# Do correlations between temperature and other features differ across warming regions?
# ============================================================
regional_corr_exprs = []
for feature in feature_cols:
if feature == "avg_temp_c":
continue
regional_corr_exprs.append(
F.corr("avg_temp_c", feature).alias(f"pcc_temp_vs_{feature}")
)
regional_temp_corrs = (
warming_yearly
.groupBy("geohash2")
.agg(
F.countDistinct("year").alias("n_years"),
*regional_corr_exprs,
)
.filter(F.col("n_years") >= MIN_YEARS_FOR_TREND)
)
print("Temperature-vs-feature PCC by warming region:")
regional_temp_corrs_pdf = regional_temp_corrs.toPandas()
display(regional_temp_corrs_pdf.round(3))
# ============================================================
# Compare variation in correlations across regions
# ============================================================
corr_value_cols = [
c for c in regional_temp_corrs_pdf.columns
if c.startswith("pcc_temp_vs_")
]
if len(regional_temp_corrs_pdf) > 0:
corr_variation = []
for col in corr_value_cols:
series = regional_temp_corrs_pdf[col].dropna()
if len(series) > 0:
corr_variation.append({
"correlation": col,
"min_pcc": series.min(),
"max_pcc": series.max(),
"mean_pcc": series.mean(),
"std_pcc": series.std(),
"range_pcc": series.max() - series.min(),
})
corr_variation_pdf = pd.DataFrame(corr_variation).sort_values(
"range_pcc",
ascending=False,
)
print("How much temp-feature correlations differ by region:")
display(corr_variation_pdf.round(3))
top_warming_geohashes = [
row["geohash2"]
for row in warming_regions
.select("geohash2")
.limit(5)
.collect()
]
for gh in top_warming_geohashes:
region_pdf = warming_pdf[warming_pdf["geohash2"] == gh].copy()
if len(region_pdf) < MIN_YEARS_FOR_TREND:
continue
region_corr = region_pdf[feature_cols].corr(method="pearson")
print(f"Full PCC matrix for warming region {gh}:")
display(region_corr.round(3))
# ============================================================
# SAVE TABLES
# ============================================================
# NOTE: Matthew, change this if you want to run stuff. idk you are allowed you to save things in my dir?
plot_dir = f"/bigdata/students/{os.environ['USER']}/plots"
os.makedirs(plot_dir, exist_ok=True)
warming_csv = f"{plot_dir}/climate_change_warming_regions.csv"
regional_corr_csv = f"{plot_dir}/climate_change_regional_temp_correlations.csv"
warming_regions.toPandas().to_csv(warming_csv, index=False)
regional_temp_corrs_pdf.to_csv(regional_corr_csv, index=False)
print("Saved warming regions:", warming_csv)
print("Saved regional temp correlations:", regional_corr_csv)
Deleted old partial output: hdfs:/user/rsbhatia2/climate_change_geohash2_partials Found 6770 files
Files used: 6770 Batch size: 50 Number of batches: 136 Using column indices: time: 0 lat: 1 lon: 2 albedo: 3 precipitable water: 4 pressure surface: 6 RH zero C: 8 snow depth: 9 temp: 10 cloud: 12 precip: 13 vegetation: 14 visibility: 15 wind gust: 17 Processing batch 0: 50 files
Processing batch 1: 50 files
Processing batch 2: 50 files
Processing batch 3: 50 files
Processing batch 4: 50 files
Processing batch 5: 50 files
Processing batch 6: 50 files
Processing batch 7: 50 files
Processing batch 8: 50 files
Processing batch 9: 50 files
Processing batch 10: 50 files
Processing batch 11: 50 files
Processing batch 12: 50 files
Processing batch 13: 50 files
Processing batch 14: 50 files
Processing batch 15: 50 files
Processing batch 16: 50 files
Processing batch 17: 50 files
Processing batch 18: 50 files
Processing batch 19: 50 files
Processing batch 20: 50 files
Processing batch 21: 50 files
Processing batch 22: 50 files
Processing batch 23: 50 files
Processing batch 24: 50 files
Processing batch 25: 50 files
Processing batch 26: 50 files
Processing batch 27: 50 files
Processing batch 28: 50 files
Processing batch 29: 50 files
Processing batch 30: 50 files
Processing batch 31: 50 files
Processing batch 32: 50 files
Processing batch 33: 50 files
Processing batch 34: 50 files
Processing batch 35: 50 files
Processing batch 36: 50 files
Processing batch 37: 50 files
Processing batch 38: 50 files
Processing batch 39: 50 files
Processing batch 40: 50 files
Processing batch 41: 50 files
Processing batch 42: 50 files
Processing batch 43: 50 files
Processing batch 44: 50 files
Processing batch 45: 50 files
Processing batch 46: 50 files
Processing batch 47: 50 files
Processing batch 48: 50 files
Processing batch 49: 50 files
Processing batch 50: 50 files
Processing batch 51: 50 files
Processing batch 52: 50 files
Processing batch 53: 50 files
Processing batch 54: 50 files
Processing batch 55: 50 files
Processing batch 56: 50 files
Processing batch 57: 50 files
Processing batch 58: 50 files
Processing batch 59: 50 files
Processing batch 60: 50 files
Processing batch 61: 50 files
Processing batch 62: 50 files
Processing batch 63: 50 files
Processing batch 64: 50 files
Processing batch 65: 50 files
Processing batch 66: 50 files
Processing batch 67: 50 files
Processing batch 68: 50 files
Processing batch 69: 50 files
Processing batch 70: 50 files
Processing batch 71: 50 files
Processing batch 72: 50 files
Processing batch 73: 50 files
Processing batch 74: 50 files
Processing batch 75: 50 files
Processing batch 76: 50 files
Processing batch 77: 50 files
Processing batch 78: 50 files
Processing batch 79: 50 files
Processing batch 80: 50 files
Processing batch 81: 50 files
Processing batch 82: 50 files
Processing batch 83: 50 files
Processing batch 84: 50 files
Processing batch 85: 50 files
Processing batch 86: 50 files
Processing batch 87: 50 files
Processing batch 88: 50 files
Processing batch 89: 50 files
Processing batch 90: 50 files
Processing batch 91: 50 files
Processing batch 92: 50 files
Processing batch 93: 50 files
Processing batch 94: 50 files
Processing batch 95: 50 files
Processing batch 96: 50 files
Processing batch 97: 50 files
Processing batch 98: 50 files
Processing batch 99: 50 files
Processing batch 101: 50 files
Processing batch 102: 50 files
Processing batch 103: 50 files
Processing batch 104: 50 files
Processing batch 105: 50 files
Processing batch 106: 50 files
Processing batch 107: 50 files
Processing batch 108: 50 files
Processing batch 109: 50 files
Processing batch 110: 50 files
Processing batch 111: 50 files
Processing batch 112: 50 files
Processing batch 113: 50 files
Processing batch 114: 50 files
Processing batch 115: 50 files
Processing batch 116: 50 files
Processing batch 117: 50 files
Processing batch 118: 50 files
Processing batch 119: 50 files
Processing batch 120: 50 files
Processing batch 121: 50 files
Processing batch 122: 50 files
Processing batch 123: 50 files
Processing batch 124: 50 files
Processing batch 125: 50 files
Processing batch 126: 50 files
Processing batch 127: 50 files
Processing batch 128: 50 files
Processing batch 129: 50 files
Processing batch 130: 50 files
Processing batch 131: 50 files
Processing batch 132: 50 files
Processing batch 133: 50 files
Processing batch 134: 50 files
Processing batch 135: 20 files
Total Climate Change batch runtime: 4697.74 seconds
Available years: 2017 to 2019 Distinct years: 3 Using years: 2015 to 2019
| year | geohash2_regions | |
|---|---|---|
| 0 | 2017 | 77 |
| 1 | 2018 | 77 |
| 2 | 2019 | 77 |
Top warming two-character geohash regions:
| geohash2 | lat | lon | n_years | first_year | last_year | mean_temp_c | temp_slope_c_per_year | estimated_temp_change_c | |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 8x | 44.818510 | -146.372595 | 3 | 2017 | 2019 | 12.233501 | 0.309031 | 0.618062 |
| 1 | b8 | 48.606524 | -147.518343 | 3 | 2017 | 2019 | 10.344493 | 0.251628 | 0.503257 |
| 2 | bb | 47.805079 | -140.624287 | 3 | 2017 | 2019 | 11.473247 | 0.231085 | 0.462169 |
| 3 | b9 | 53.291323 | -148.854043 | 3 | 2017 | 2019 | 8.761258 | 0.230133 | 0.460267 |
| 4 | 8z | 42.307755 | -140.070052 | 3 | 2017 | 2019 | 14.511733 | 0.196659 | 0.393319 |
| 5 | bc | 53.431775 | -140.625296 | 3 | 2017 | 2019 | 9.985080 | 0.003220 | 0.006440 |
Cooling/flat two-character geohash regions:
| geohash2 | lat | lon | n_years | first_year | last_year | mean_temp_c | temp_slope_c_per_year | estimated_temp_change_c | |
|---|---|---|---|---|---|---|---|---|---|
| 0 | f4 | 58.692086 | -84.460206 | 3 | 2017 | 2019 | -5.820832 | -2.999996 | -5.999993 |
| 1 | cd | 58.668861 | -106.779269 | 3 | 2017 | 2019 | -0.262855 | -2.982981 | -5.965962 |
| 2 | c8 | 47.805292 | -106.877264 | 3 | 2017 | 2019 | 6.949232 | -2.922989 | -5.845978 |
| 3 | f3 | 53.435855 | -73.124153 | 3 | 2017 | 2019 | -0.518192 | -2.784224 | -5.568448 |
| 4 | c9 | 53.433213 | -106.876210 | 3 | 2017 | 2019 | 3.537299 | -2.709362 | -5.418725 |
| 5 | cb | 47.798558 | -95.625294 | 3 | 2017 | 2019 | 6.174566 | -2.702436 | -5.404873 |
| 6 | f6 | 58.369469 | -73.322819 | 3 | 2017 | 2019 | -4.076636 | -2.698814 | -5.397629 |
| 7 | cc | 53.438562 | -95.625057 | 3 | 2017 | 2019 | 2.391764 | -2.644396 | -5.288792 |
| 8 | f1 | 53.431203 | -84.376120 | 3 | 2017 | 2019 | -0.525059 | -2.624037 | -5.248074 |
| 9 | cf | 58.797982 | -95.624255 | 3 | 2017 | 2019 | -3.979622 | -2.606682 | -5.213364 |
Rows used for warming-region correlation matrix: 18 Global PCC correlation matrix across warming regions:
| avg_temp_c | avg_precipitable_water | avg_pressure_surface | avg_rh_zerodegc | avg_snow_depth | avg_cloud_cover | avg_daily_precip | avg_vegetation | avg_visibility | avg_wind_gust | avg_albedo | |
|---|---|---|---|---|---|---|---|---|---|---|---|
| avg_temp_c | 1.000 | 0.920 | 0.963 | -0.957 | NaN | -0.108 | -0.028 | NaN | 0.356 | -0.576 | NaN |
| avg_precipitable_water | 0.920 | 1.000 | 0.925 | -0.894 | NaN | 0.032 | 0.334 | NaN | 0.013 | -0.310 | NaN |
| avg_pressure_surface | 0.963 | 0.925 | 1.000 | -0.973 | NaN | -0.000 | 0.076 | NaN | 0.231 | -0.441 | NaN |
| avg_rh_zerodegc | -0.957 | -0.894 | -0.973 | 1.000 | NaN | 0.039 | 0.034 | NaN | -0.249 | 0.478 | NaN |
| avg_snow_depth | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN |
| avg_cloud_cover | -0.108 | 0.032 | -0.000 | 0.039 | NaN | 1.000 | 0.444 | NaN | -0.525 | 0.508 | NaN |
| avg_daily_precip | -0.028 | 0.334 | 0.076 | 0.034 | NaN | 0.444 | 1.000 | NaN | -0.713 | 0.545 | NaN |
| avg_vegetation | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN |
| avg_visibility | 0.356 | 0.013 | 0.231 | -0.249 | NaN | -0.525 | -0.713 | NaN | 1.000 | -0.647 | NaN |
| avg_wind_gust | -0.576 | -0.310 | -0.441 | 0.478 | NaN | 0.508 | 0.545 | NaN | -0.647 | 1.000 | NaN |
| avg_albedo | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN |
Saved correlation heatmap: /bigdata/students/rsbhatia2/plots/climate_change_global_corr.png Temperature-vs-feature PCC by warming region:
| geohash2 | n_years | pcc_temp_vs_avg_precipitable_water | pcc_temp_vs_avg_pressure_surface | pcc_temp_vs_avg_rh_zerodegc | pcc_temp_vs_avg_snow_depth | pcc_temp_vs_avg_cloud_cover | pcc_temp_vs_avg_daily_precip | pcc_temp_vs_avg_vegetation | pcc_temp_vs_avg_visibility | pcc_temp_vs_avg_wind_gust | pcc_temp_vs_avg_albedo | |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 8z | 3 | 0.573 | -0.992 | 0.979 | NaN | 0.148 | 0.266 | NaN | 0.417 | -0.993 | NaN |
| 1 | b9 | 3 | -0.664 | 0.742 | -0.265 | NaN | 0.409 | -0.993 | NaN | 1.000 | -0.753 | NaN |
| 2 | b8 | 3 | -0.953 | 0.543 | -0.625 | NaN | -0.716 | -0.981 | NaN | 1.000 | -0.126 | NaN |
| 3 | bb | 3 | -0.401 | 0.306 | -0.615 | NaN | -0.264 | -0.452 | NaN | 0.923 | -0.954 | NaN |
| 4 | bc | 3 | 0.098 | 0.354 | 0.290 | NaN | -0.198 | -0.429 | NaN | 0.057 | -0.880 | NaN |
| 5 | 8x | 3 | -0.359 | -0.099 | -0.726 | NaN | 0.267 | -0.749 | NaN | 0.703 | 0.723 | NaN |
How much temp-feature correlations differ by region:
| correlation | min_pcc | max_pcc | mean_pcc | std_pcc | range_pcc | |
|---|---|---|---|---|---|---|
| 1 | pcc_temp_vs_avg_pressure_surface | -0.992 | 0.742 | 0.142 | 0.622 | 1.734 |
| 6 | pcc_temp_vs_avg_wind_gust | -0.993 | 0.723 | -0.497 | 0.677 | 1.716 |
| 2 | pcc_temp_vs_avg_rh_zerodegc | -0.726 | 0.979 | -0.160 | 0.671 | 1.705 |
| 0 | pcc_temp_vs_avg_precipitable_water | -0.953 | 0.573 | -0.284 | 0.546 | 1.525 |
| 4 | pcc_temp_vs_avg_daily_precip | -0.993 | 0.266 | -0.556 | 0.472 | 1.259 |
| 3 | pcc_temp_vs_avg_cloud_cover | -0.716 | 0.409 | -0.059 | 0.415 | 1.125 |
| 5 | pcc_temp_vs_avg_visibility | 0.057 | 1.000 | 0.683 | 0.380 | 0.943 |
Full PCC matrix for warming region 8x:
| avg_temp_c | avg_precipitable_water | avg_pressure_surface | avg_rh_zerodegc | avg_snow_depth | avg_cloud_cover | avg_daily_precip | avg_vegetation | avg_visibility | avg_wind_gust | avg_albedo | |
|---|---|---|---|---|---|---|---|---|---|---|---|
| avg_temp_c | 1.000 | -0.359 | -0.099 | -0.726 | NaN | 0.267 | -0.749 | NaN | 0.703 | 0.723 | NaN |
| avg_precipitable_water | -0.359 | 1.000 | -0.893 | 0.903 | NaN | 0.803 | 0.887 | NaN | -0.916 | -0.904 | NaN |
| avg_pressure_surface | -0.099 | -0.893 | 1.000 | -0.613 | NaN | -0.985 | -0.585 | NaN | 0.638 | 0.615 | NaN |
| avg_rh_zerodegc | -0.726 | 0.903 | -0.613 | 1.000 | NaN | 0.469 | 0.999 | NaN | -0.999 | -1.000 | NaN |
| avg_snow_depth | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN |
| avg_cloud_cover | 0.267 | 0.803 | -0.985 | 0.469 | NaN | 1.000 | 0.438 | NaN | -0.497 | -0.472 | NaN |
| avg_daily_precip | -0.749 | 0.887 | -0.585 | 0.999 | NaN | 0.438 | 1.000 | NaN | -0.998 | -0.999 | NaN |
| avg_vegetation | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN |
| avg_visibility | 0.703 | -0.916 | 0.638 | -0.999 | NaN | -0.497 | -0.998 | NaN | 1.000 | 1.000 | NaN |
| avg_wind_gust | 0.723 | -0.904 | 0.615 | -1.000 | NaN | -0.472 | -0.999 | NaN | 1.000 | 1.000 | NaN |
| avg_albedo | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN |
Full PCC matrix for warming region b8:
| avg_temp_c | avg_precipitable_water | avg_pressure_surface | avg_rh_zerodegc | avg_snow_depth | avg_cloud_cover | avg_daily_precip | avg_vegetation | avg_visibility | avg_wind_gust | avg_albedo | |
|---|---|---|---|---|---|---|---|---|---|---|---|
| avg_temp_c | 1.000 | -0.953 | 0.543 | -0.625 | NaN | -0.716 | -0.981 | NaN | 1.000 | -0.126 | NaN |
| avg_precipitable_water | -0.953 | 1.000 | -0.262 | 0.357 | NaN | 0.895 | 0.993 | NaN | -0.958 | 0.422 | NaN |
| avg_pressure_surface | 0.543 | -0.262 | 1.000 | -0.995 | NaN | 0.196 | -0.371 | NaN | 0.527 | 0.764 | NaN |
| avg_rh_zerodegc | -0.625 | 0.357 | -0.995 | 1.000 | NaN | -0.097 | 0.462 | NaN | -0.609 | -0.696 | NaN |
| avg_snow_depth | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN |
| avg_cloud_cover | -0.716 | 0.895 | 0.196 | -0.097 | NaN | 1.000 | 0.837 | NaN | -0.730 | 0.782 | NaN |
| avg_daily_precip | -0.981 | 0.993 | -0.371 | 0.462 | NaN | 0.837 | 1.000 | NaN | -0.985 | 0.315 | NaN |
| avg_vegetation | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN |
| avg_visibility | 1.000 | -0.958 | 0.527 | -0.609 | NaN | -0.730 | -0.985 | NaN | 1.000 | -0.145 | NaN |
| avg_wind_gust | -0.126 | 0.422 | 0.764 | -0.696 | NaN | 0.782 | 0.315 | NaN | -0.145 | 1.000 | NaN |
| avg_albedo | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN |
Full PCC matrix for warming region bb:
| avg_temp_c | avg_precipitable_water | avg_pressure_surface | avg_rh_zerodegc | avg_snow_depth | avg_cloud_cover | avg_daily_precip | avg_vegetation | avg_visibility | avg_wind_gust | avg_albedo | |
|---|---|---|---|---|---|---|---|---|---|---|---|
| avg_temp_c | 1.000 | -0.401 | 0.306 | -0.615 | NaN | -0.264 | -0.452 | NaN | 0.923 | -0.954 | NaN |
| avg_precipitable_water | -0.401 | 1.000 | -0.995 | 0.969 | NaN | -0.777 | 0.998 | NaN | -0.723 | 0.659 | NaN |
| avg_pressure_surface | 0.306 | -0.995 | 1.000 | -0.939 | NaN | 0.837 | -0.988 | NaN | 0.649 | -0.579 | NaN |
| avg_rh_zerodegc | -0.615 | 0.969 | -0.939 | 1.000 | NaN | -0.598 | 0.981 | NaN | -0.871 | 0.824 | NaN |
| avg_snow_depth | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN |
| avg_cloud_cover | -0.264 | -0.777 | 0.837 | -0.598 | NaN | 1.000 | -0.741 | NaN | 0.127 | -0.039 | NaN |
| avg_daily_precip | -0.452 | 0.998 | -0.988 | 0.981 | NaN | -0.741 | 1.000 | NaN | -0.761 | 0.700 | NaN |
| avg_vegetation | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN |
| avg_visibility | 0.923 | -0.723 | 0.649 | -0.871 | NaN | 0.127 | -0.761 | NaN | 1.000 | -0.996 | NaN |
| avg_wind_gust | -0.954 | 0.659 | -0.579 | 0.824 | NaN | -0.039 | 0.700 | NaN | -0.996 | 1.000 | NaN |
| avg_albedo | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN |
Full PCC matrix for warming region b9:
| avg_temp_c | avg_precipitable_water | avg_pressure_surface | avg_rh_zerodegc | avg_snow_depth | avg_cloud_cover | avg_daily_precip | avg_vegetation | avg_visibility | avg_wind_gust | avg_albedo | |
|---|---|---|---|---|---|---|---|---|---|---|---|
| avg_temp_c | 1.000 | -0.664 | 0.742 | -0.265 | NaN | 0.409 | -0.993 | NaN | 1.000 | -0.753 | NaN |
| avg_precipitable_water | -0.664 | 1.000 | -0.994 | 0.897 | NaN | -0.954 | 0.574 | NaN | -0.673 | 0.992 | NaN |
| avg_pressure_surface | 0.742 | -0.994 | 1.000 | -0.843 | NaN | 0.915 | -0.660 | NaN | 0.750 | -1.000 | NaN |
| avg_rh_zerodegc | -0.265 | 0.897 | -0.843 | 1.000 | NaN | -0.988 | 0.152 | NaN | -0.276 | 0.834 | NaN |
| avg_snow_depth | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN |
| avg_cloud_cover | 0.409 | -0.954 | 0.915 | -0.988 | NaN | 1.000 | -0.301 | NaN | 0.419 | -0.909 | NaN |
| avg_daily_precip | -0.993 | 0.574 | -0.660 | 0.152 | NaN | -0.301 | 1.000 | NaN | -0.992 | 0.672 | NaN |
| avg_vegetation | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN |
| avg_visibility | 1.000 | -0.673 | 0.750 | -0.276 | NaN | 0.419 | -0.992 | NaN | 1.000 | -0.760 | NaN |
| avg_wind_gust | -0.753 | 0.992 | -1.000 | 0.834 | NaN | -0.909 | 0.672 | NaN | -0.760 | 1.000 | NaN |
| avg_albedo | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN |
Full PCC matrix for warming region 8z:
| avg_temp_c | avg_precipitable_water | avg_pressure_surface | avg_rh_zerodegc | avg_snow_depth | avg_cloud_cover | avg_daily_precip | avg_vegetation | avg_visibility | avg_wind_gust | avg_albedo | |
|---|---|---|---|---|---|---|---|---|---|---|---|
| avg_temp_c | 1.000 | 0.573 | -0.992 | 0.979 | NaN | 0.148 | 0.266 | NaN | 0.417 | -0.993 | NaN |
| avg_precipitable_water | 0.573 | 1.000 | -0.464 | 0.395 | NaN | -0.726 | 0.942 | NaN | -0.507 | -0.472 | NaN |
| avg_pressure_surface | -0.992 | -0.464 | 1.000 | -0.997 | NaN | -0.272 | -0.141 | NaN | -0.529 | 1.000 | NaN |
| avg_rh_zerodegc | 0.979 | 0.395 | -0.997 | 1.000 | NaN | 0.345 | 0.065 | NaN | 0.592 | -0.996 | NaN |
| avg_snow_depth | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN |
| avg_cloud_cover | 0.148 | -0.726 | -0.272 | 0.345 | NaN | 1.000 | -0.914 | NaN | 0.961 | -0.264 | NaN |
| avg_daily_precip | 0.266 | 0.942 | -0.141 | 0.065 | NaN | -0.914 | 1.000 | NaN | -0.766 | -0.150 | NaN |
| avg_vegetation | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN |
| avg_visibility | 0.417 | -0.507 | -0.529 | 0.592 | NaN | 0.961 | -0.766 | NaN | 1.000 | -0.521 | NaN |
| avg_wind_gust | -0.993 | -0.472 | 1.000 | -0.996 | NaN | -0.264 | -0.150 | NaN | -0.521 | 1.000 | NaN |
| avg_albedo | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN |
Saved warming regions: /bigdata/students/rsbhatia2/plots/climate_change_warming_regions.csv Saved regional temp correlations: /bigdata/students/rsbhatia2/plots/climate_change_regional_temp_correlations.csv
Weather Station¶
I am only using August 2019 data. The result is a 10 second, 4fps video that shows how different metrics changed over the month.
Data selection is through the server. Here is how I ran the server for this:
python server.py --input-glob "hdfs:///3hr/2019/*_20*{08}*_*" --host 0.0.0.0 --port 28420 --threads 3 --max-files 1500 --hold-open-after-done --file-stride 4
from collections import defaultdict, deque
import math
import os
import socket
import subprocess
import time
import matplotlib.pyplot as plt
from IPython.display import clear_output, display
from pyspark.streaming import StreamingContext
sc = spark.sparkContext
# ============================================================
# CONFIG
# ============================================================
WEATHER_HOST = "mc08"
WEATHER_PORT = 28420
WEATHER_BATCH_INTERVAL_SECONDS = 5
WEATHER_HISTORY_LIMIT = 500
WEATHER_SAVE_FRAMES = True
WEATHER_FRAME_DIR = f"/bigdata/students/rsbhatia2/weather_station_frames"
WEATHER_VIDEO_OUT = f"/bigdata/students/rsbhatia2/weather_station.mp4"
WEATHER_VIDEO_FPS = 2
WEATHER_CHECKPOINT_DIR = f"hdfs:///user/{sc.sparkUser()}/weather_station_streaming_checkpoint"
TIME_IDX = 0
LAT_IDX = 1
LON_IDX = 2
PRESSURE_SURFACE_IDX = 6
HUMIDITY_IDX = 8
TEMP_SURFACE_IDX = 10
PRECIP_3HR_IDX = 13
VISIBILITY_IDX = 15
WIND_GUST_IDX = 17
WEATHER_LOCATIONS = [
{
"name": "San Francisco Bay Area",
"lat_min": 37.2,
"lat_max": 38.2,
"lon_min": -122.8,
"lon_max": -121.7,
},
{
"name": "Los Angeles Coast",
"lat_min": 33.5,
"lat_max": 34.4,
"lon_min": -118.8,
"lon_max": -117.7,
},
{
"name": "Seattle / Puget Sound",
"lat_min": 47.2,
"lat_max": 47.9,
"lon_min": -122.7,
"lon_max": -121.9,
},
{
"name": "Chicago",
"lat_min": 41.5,
"lat_max": 42.2,
"lon_min": -88.1,
"lon_max": -87.3,
},
{
"name": "Miami",
"lat_min": 25.5,
"lat_max": 26.1,
"lon_min": -80.5,
"lon_max": -80.0,
},
]
WEATHER_METRICS = [
("temp_c", "Temp C"),
("pressure_hpa", "Pressure hPa"),
("humidity_pct", "Humidity %"),
("precip", "Precip"),
("visibility_km", "Visibility km"),
("wind_gust", "Wind Gust"),
]
WEATHER_STREAMING_CONTEXT = None
WEATHER_HISTORY = {
location["name"]: {metric_key: deque(maxlen=WEATHER_HISTORY_LIMIT) for metric_key, _ in WEATHER_METRICS}
for location in WEATHER_LOCATIONS
}
WEATHER_BATCH_LABELS = deque(maxlen=WEATHER_HISTORY_LIMIT)
WEATHER_FRAME_COUNT = 0
WEATHER_STOP_REQUESTED = False
# ============================================================
# PARSING AND AGGREGATION
# ============================================================
def weather_safe_float(parts, idx):
if idx is None or idx >= len(parts):
return None
value = parts[idx].strip()
if value == "" or value.lower() == "null":
return None
return float(value)
def weather_location_for(lat, lon):
for location in WEATHER_LOCATIONS:
if (
location["lat_min"] <= lat <= location["lat_max"]
and location["lon_min"] <= lon <= location["lon_max"]
):
return location["name"]
return None
def empty_weather_summary():
return (0,) + tuple(0.0 for _ in range(len(WEATHER_METRICS) * 2))
def metric_pair(value):
if value is None:
return (0.0, 0.0)
try:
value = float(value)
except Exception:
return (0.0, 0.0)
if not math.isfinite(value):
return (0.0, 0.0)
return (value, 1.0)
def extract_weather_values(parts):
temp_k = weather_safe_float(parts, TEMP_SURFACE_IDX)
pressure_pa = weather_safe_float(parts, PRESSURE_SURFACE_IDX)
humidity_pct = weather_safe_float(parts, HUMIDITY_IDX)
precip = weather_safe_float(parts, PRECIP_3HR_IDX)
visibility_m = weather_safe_float(parts, VISIBILITY_IDX)
wind_gust = weather_safe_float(parts, WIND_GUST_IDX)
return [
temp_k - 273.15 if temp_k is not None else None,
pressure_pa / 100.0 if pressure_pa is not None else None,
humidity_pct,
precip,
visibility_m / 1000.0 if visibility_m is not None else None,
wind_gust,
]
def parse_weather_station_line(line):
try:
if not line or line.startswith("1_time"):
return None
parts = line.rstrip("\t").split("\t")
lat = weather_safe_float(parts, LAT_IDX)
lon = weather_safe_float(parts, LON_IDX)
if lat is None or lon is None:
return None
location_name = weather_location_for(lat, lon)
if location_name is None:
return None
metric_values = extract_weather_values(parts)
metric_pairs = [metric_pair(value) for value in metric_values]
if not any(count > 0 for _, count in metric_pairs):
return None
flattened = []
for metric_sum, metric_count in metric_pairs:
flattened.extend([metric_sum, metric_count])
return (location_name, (1,) + tuple(flattened))
except Exception:
return None
def classify_weather_station_line(line):
counts = [("raw_lines", 1)]
try:
if not line or line.startswith("1_time"):
return counts
parts = line.rstrip("\t").split("\t")
lat = weather_safe_float(parts, LAT_IDX)
lon = weather_safe_float(parts, LON_IDX)
if lat is None or lon is None:
return counts
counts.append(("valid_latlon", 1))
if weather_location_for(lat, lon) is None:
return counts
counts.append(("selected_location", 1))
metric_values = extract_weather_values(parts)
if any(metric_pair(value)[1] > 0 for value in metric_values):
counts.append(("usable_metric", 1))
return counts
except Exception:
counts.append(("parse_errors", 1))
return counts
def combine_weather_summaries(left, right):
return tuple(left[i] + right[i] for i in range(len(left)))
def update_weather_state(new_values, current_state):
if current_state is None:
current_state = empty_weather_summary()
total = current_state
for value in new_values:
total = combine_weather_summaries(total, value)
return total
def weather_state_to_means(state):
means = {}
for idx, (metric_key, _) in enumerate(WEATHER_METRICS):
pair_start = 1 + idx * 2
metric_sum = state[pair_start]
metric_count = state[pair_start + 1]
means[metric_key] = metric_sum / metric_count if metric_count > 0 else None
return means
# ============================================================
# VISUALIZATION
# ============================================================
def update_weather_history(timestamp, rows):
batch_label = timestamp.strftime("%H:%M:%S") if hasattr(timestamp, "strftime") else str(timestamp)
WEATHER_BATCH_LABELS.append(batch_label)
row_by_location = dict(rows)
for location in WEATHER_LOCATIONS:
location_name = location["name"]
means = weather_state_to_means(row_by_location[location_name]) if location_name in row_by_location else None
for metric_key, _ in WEATHER_METRICS:
value = means[metric_key] if means is not None else None
WEATHER_HISTORY[location_name][metric_key].append(value)
def draw_weather_dashboard():
global WEATHER_FRAME_COUNT
fig, axes = plt.subplots(
nrows=len(WEATHER_METRICS),
ncols=len(WEATHER_LOCATIONS),
figsize=(18, 12),
sharex=True,
)
x_values = list(range(len(WEATHER_BATCH_LABELS)))
for row_idx, (metric_key, metric_label) in enumerate(WEATHER_METRICS):
for col_idx, location in enumerate(WEATHER_LOCATIONS):
ax = axes[row_idx][col_idx]
location_name = location["name"]
y_values = list(WEATHER_HISTORY[location_name][metric_key])
ax.plot(x_values[-len(y_values):], y_values, marker="o", linewidth=1.5, markersize=3)
ax.grid(True, alpha=0.25)
if row_idx == 0:
ax.set_title(location_name, fontsize=10)
if col_idx == 0:
ax.set_ylabel(metric_label, fontsize=9)
if row_idx == len(WEATHER_METRICS) - 1:
ax.set_xlabel("Batch", fontsize=9)
fig.suptitle("Weather Station Online Summary", fontsize=16)
fig.tight_layout(rect=[0, 0, 1, 0.97])
if WEATHER_SAVE_FRAMES:
os.makedirs(WEATHER_FRAME_DIR, exist_ok=True)
frame_path = os.path.join(WEATHER_FRAME_DIR, f"weather_station_{WEATHER_FRAME_COUNT:05d}.png")
fig.savefig(frame_path, dpi=120, bbox_inches="tight")
WEATHER_FRAME_COUNT += 1
clear_output(wait=True)
display(fig)
plt.close(fig)
def handle_weather_batch(timestamp, rdd):
if WEATHER_STOP_REQUESTED:
return
rows = rdd.collect()
update_weather_history(timestamp, rows)
draw_weather_dashboard()
total_records = sum(row[1][0] for row in rows)
print(f"Batch: {timestamp} | Online locations: {len(rows)} | Location rows summarized: {total_records}")
def handle_weather_debug_batch(timestamp, rdd):
if WEATHER_STOP_REQUESTED:
return
counts = dict(rdd.collect()) if not rdd.isEmpty() else {}
print(
"Debug counts | "
f"Batch: {timestamp} | "
f"raw lines: {counts.get('raw_lines', 0)} | "
f"valid lat/lon: {counts.get('valid_latlon', 0)} | "
f"selected location: {counts.get('selected_location', 0)} | "
f"usable metric: {counts.get('usable_metric', 0)} | "
f"parse errors: {counts.get('parse_errors', 0)}"
)
# ============================================================
# STREAM CONTROLS
# ============================================================
def build_weather_streaming_context():
ssc = StreamingContext(sc, WEATHER_BATCH_INTERVAL_SECONDS)
ssc.checkpoint(WEATHER_CHECKPOINT_DIR)
lines = ssc.socketTextStream(WEATHER_HOST, WEATHER_PORT)
debug_counts = lines.flatMap(classify_weather_station_line).reduceByKey(lambda left, right: left + right)
debug_counts.foreachRDD(handle_weather_debug_batch)
parsed = lines.map(parse_weather_station_line).filter(lambda row: row is not None)
batch_summaries = parsed.reduceByKey(combine_weather_summaries)
online_summaries = batch_summaries.updateStateByKey(update_weather_state)
online_summaries.foreachRDD(handle_weather_batch)
return ssc
def weather_server_is_listening(timeout_seconds=2.0):
try:
with socket.create_connection((WEATHER_HOST, WEATHER_PORT), timeout=timeout_seconds):
return True
except OSError as exc:
print(f"Could not connect to {WEATHER_HOST}:{WEATHER_PORT}: {exc}")
print("Start server.py first and wait until it prints the Listening message.")
return False
def start_weather_station():
global WEATHER_STREAMING_CONTEXT, WEATHER_STOP_REQUESTED
WEATHER_STOP_REQUESTED = False
if WEATHER_SAVE_FRAMES:
os.makedirs(WEATHER_FRAME_DIR, exist_ok=True)
print(f"Saving dashboard frames to: {WEATHER_FRAME_DIR}")
if WEATHER_STREAMING_CONTEXT is not None:
print("Weather station stream is already running. Call stop_weather_station() first if needed.")
return WEATHER_STREAMING_CONTEXT
print(f"Connecting Spark Streaming to {WEATHER_HOST}:{WEATHER_PORT}")
print("Start server.py first, then call stop_weather_station() when the demo is done.")
if not weather_server_is_listening():
print("Spark Streaming was not started.")
return None
WEATHER_STREAMING_CONTEXT = build_weather_streaming_context()
WEATHER_STREAMING_CONTEXT.start()
return WEATHER_STREAMING_CONTEXT
def run_weather_station():
return start_weather_station()
def stop_weather_station():
global WEATHER_STREAMING_CONTEXT, WEATHER_STOP_REQUESTED
WEATHER_STOP_REQUESTED = True
if WEATHER_STREAMING_CONTEXT is None:
print("Weather station stream is not running.")
return
WEATHER_STREAMING_CONTEXT.stop(stopSparkContext=False, stopGraceFully=False)
WEATHER_STREAMING_CONTEXT = None
print("Weather station stream stopped.")
def reset_weather_station_frames():
global WEATHER_FRAME_COUNT, WEATHER_STOP_REQUESTED
os.makedirs(WEATHER_FRAME_DIR, exist_ok=True)
for filename in os.listdir(WEATHER_FRAME_DIR):
if filename.startswith("weather_station_") and filename.endswith(".png"):
os.remove(os.path.join(WEATHER_FRAME_DIR, filename))
WEATHER_FRAME_COUNT = 0
WEATHER_STOP_REQUESTED = False
print(f"Cleared weather station frames from: {WEATHER_FRAME_DIR}")
print("Weather Station code loaded.")
print("Remote test server command is in the comments at the top of this cell.")
print("Frames save automatically while the dashboard updates.")
print("Run stop_weather_station() when the video has enough dashboard updates.")
Weather Station code loaded. Remote test server command is in the comments at the top of this cell. Frames save automatically while the dashboard updates. Run stop_weather_station() when the video has enough dashboard updates.
reset_weather_station_frames()
Cleared weather station frames from: /bigdata/students/rsbhatia2/weather_station_frames
weather_station_streaming_runtime_start = time.perf_counter()
run_weather_station()
[Stage 2424:> (0 + 1) / 1]
stop_weather_station()
weather_station_streaming_runtime = time.perf_counter() - weather_station_streaming_runtime_start
print(f"Total Weather Station streaming runtime: {weather_station_streaming_runtime:.2f} seconds")
26/05/22 02:26:33 ERROR TaskSchedulerImpl: Lost executor 8 on 10.0.2.31: Command exited with code 50 26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_7768_17 ! 26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_6597_46 ! 26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_9580_45 ! 26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_6133_9 ! 26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_6154_52 ! 26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_9580_14 ! 26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_6154_9 ! 26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_7314_15 ! 26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_9490_45 ! 26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_7799_8 ! 26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_7082_16 ! 26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_6139_9 ! 26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_6315_32 ! 26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_7051_39 ! 26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_9557_45 ! 26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_9512_14 ! 26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_9535_14 ! 26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_7051_16 ! 26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_7096_16 ! 26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_6133_32 ! 26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_7314_38 ! 26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_6154_32 ! 26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_6139_32 ! 26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_6597_35 ! 26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_9490_14 ! 26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_6133_52 ! 26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_6139_52 ! 26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_9535_45 ! 26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_9512_45 ! 26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_6315_52 ! 26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_9557_14 ! 26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_7813_8 ! 26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_6597_12 ! 26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_6315_9 !
Weather station stream stopped. Total Weather Station streaming runtime: 420.03 seconds
You can find the video in the repository.
It is called weather_station.mp4
Prediction/Classification¶
Fog Classification With MLlib¶
This extends the least-foggy Bay Area analysis with a logistic regression classifier. The model predicts whether an individual Bay Area weather observation is foggy or clear.
The label is built from visibility_surface: observations with visibility at or below 10000 meters are labeled foggy, observations with visibility at or above 18000 meters are labeled clear, and middle-visibility observations are dropped. Visibility is intentionally not used as a training feature so the model does not simply learn the label rule.
Training features are cloud cover, surface temperature, relative humidity at the zero-degree isotherm, surface pressure, wind gust speed, month, hour, latitude, and longitude. The data is partitioned by time: 2015-2018 train the model, and 2019 tests it.
This improves the least-foggy analysis by checking whether weather variables other than visibility can distinguish foggy from clear Bay Area conditions. If the model performs well, the hand-built clarity ranking is supported by a learned classifier; if it performs poorly, that suggests visibility is doing most of the work and the score should be interpreted more cautiously.
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, LongType, DoubleType
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.feature import StandardScaler, VectorAssembler
import time
from SparkHandler import Sparky, SparkyBatchContext, SparkyAnalysisContext
import util
sc = spark.sparkContext
fog_classification_runtime_start = time.perf_counter()
# ============================================================
# CONFIG
# ============================================================
FOG_CLASSIFICATION_INPUT_GLOB = "hdfs:///3hr/*/*_20*{05,06,07,08,09}*_*"
FOG_CLASSIFICATION_BATCH_SIZE = 50
FOG_CLASSIFICATION_PARTIAL_OUT = f"hdfs:///user/{sc.sparkUser()}/fog_classification_partials"
# Bay Area bounding box, same as least foggy cause this is directly comparing to that.
FOG_LAT_MIN = 36.8
FOG_LAT_MAX = 38.9
FOG_LON_MIN = -123.2
FOG_LON_MAX = -121.4
FOG_MONTHS = [5, 6, 7, 8, 9]
FOGGY_VISIBILITY_MAX = 10000.0
CLEAR_VISIBILITY_MIN = 18000.0
FOG_FEATURE_COLS = [
"cloud",
"temp_c",
"rh",
"pressure",
"wind",
"month",
"hour",
"lat",
"lon",
]
fog_raw_schema = StructType([
StructField("time_ms", LongType(), False),
StructField("lat", DoubleType(), False),
StructField("lon", DoubleType(), False),
StructField("cloud", DoubleType(), False),
StructField("temp_k", DoubleType(), False),
StructField("rh", DoubleType(), False),
StructField("pressure", DoubleType(), False),
StructField("wind", DoubleType(), False),
StructField("visibility", DoubleType(), False),
])
fog_time_idx = None
fog_lat_idx = None
fog_lon_idx = None
fog_cloud_idx = None
fog_temp_idx = None
fog_rh_idx = None
fog_pressure_idx = None
fog_wind_idx = None
fog_visibility_idx = None
def parse_fog_classification_line(line):
try:
if line.startswith("1_time"):
return None
parts = line.rstrip("\t").split("\t")
time_ms = util.safe_float(parts, fog_time_idx)
lat = util.safe_float(parts, fog_lat_idx)
lon = util.safe_float(parts, fog_lon_idx)
if time_ms is None or lat is None or lon is None:
return None
if not (FOG_LAT_MIN <= lat <= FOG_LAT_MAX and FOG_LON_MIN <= lon <= FOG_LON_MAX):
return None
cloud = util.safe_float(parts, fog_cloud_idx)
temp_k = util.safe_float(parts, fog_temp_idx)
rh = util.safe_float(parts, fog_rh_idx)
pressure = util.safe_float(parts, fog_pressure_idx)
wind = util.safe_float(parts, fog_wind_idx)
visibility = util.safe_float(parts, fog_visibility_idx)
if any(value is None for value in [cloud, temp_k, rh, pressure, wind, visibility]):
return None
return (
int(time_ms),
lat,
lon,
cloud,
temp_k,
rh,
pressure,
wind,
visibility,
)
except Exception:
return None
def fog_classification_partial_transform(points):
labeled = (
points
.withColumn(
"timestamp",
F.from_unixtime((F.col("time_ms") / 1000).cast("long")).cast("timestamp")
)
.withColumn("year", F.year("timestamp"))
.withColumn("month", F.month("timestamp"))
.withColumn("hour", F.hour("timestamp"))
.filter(F.col("month").isin(FOG_MONTHS))
.withColumn("temp_c", F.col("temp_k") - F.lit(273.15))
.withColumn(
"label",
F.when(F.col("visibility") <= F.lit(FOGGY_VISIBILITY_MAX), F.lit(1.0))
.when(F.col("visibility") >= F.lit(CLEAR_VISIBILITY_MIN), F.lit(0.0))
)
.filter(F.col("label").isNotNull())
)
return labeled.select(
"year",
"month",
"hour",
"lat",
"lon",
"cloud",
"temp_c",
"rh",
"pressure",
"wind",
"label",
)
def combine_fog_classification_partials(partials):
return partials
def analyze_fog_classification_rows(rows):
return rows
fog_sparky = Sparky(spark, sc)
fog_batch_ctx = SparkyBatchContext(
glob_path=FOG_CLASSIFICATION_INPUT_GLOB,
partial_out=FOG_CLASSIFICATION_PARTIAL_OUT,
schema=fog_raw_schema,
parse_line=parse_fog_classification_line,
partial_transform=fog_classification_partial_transform,
batch_size=FOG_CLASSIFICATION_BATCH_SIZE,
)
fog_input_paths = fog_sparky.setup_job(fog_batch_ctx)
if len(fog_input_paths) == 0:
raise ValueError(f"No files matched: {FOG_CLASSIFICATION_INPUT_GLOB}")
def fog_required_idx(name):
key = name.strip().lower()
if key not in fog_sparky.columns:
raise ValueError(f"Missing required column: {name}")
return fog_sparky.columns[key]
fog_time_idx = fog_required_idx("time")
fog_lat_idx = fog_required_idx("lat")
fog_lon_idx = fog_required_idx("lon")
fog_cloud_idx = fog_required_idx("total_cloud_cover_entire_atmosphere_single_layer")
fog_temp_idx = fog_required_idx("temperature_surface")
fog_rh_idx = fog_required_idx("relative_humidity_zerodegc_isotherm")
fog_pressure_idx = fog_required_idx("pressure_surface")
fog_wind_idx = fog_required_idx("wind_speed_gust_surface")
fog_visibility_idx = fog_required_idx("visibility_surface")
print("Using column indices:")
print("time:", fog_time_idx)
print("lat:", fog_lat_idx)
print("lon:", fog_lon_idx)
print("cloud cover:", fog_cloud_idx)
print("temperature:", fog_temp_idx)
print("relative humidity:", fog_rh_idx)
print("pressure:", fog_pressure_idx)
print("wind gust:", fog_wind_idx)
print("visibility:", fog_visibility_idx)
print("visibility is used only for labels, not model features")
for batch_id, start in enumerate(range(0, len(fog_input_paths), fog_batch_ctx.batch_size)):
batch_paths = fog_input_paths[start:start + fog_batch_ctx.batch_size]
fog_sparky.process_batch(fog_batch_ctx, batch_paths, batch_id)
Deleted old partial output: hdfs:/user/rsbhatia2/fog_classification_partials Found 4381 files
Using column indices: time: 0 lat: 1 lon: 2 cloud cover: 12 temperature: 10 relative humidity: 8 pressure: 6 wind gust: 17 visibility: 15 visibility is used only for labels, not model features Processing batch 0: 50 files
Processing batch 1: 50 files
Processing batch 2: 50 files
Processing batch 3: 50 files
Processing batch 4: 50 files
Processing batch 5: 50 files
Processing batch 6: 50 files
Processing batch 7: 50 files
Processing batch 8: 50 files
Processing batch 9: 50 files
Processing batch 10: 50 files
Processing batch 11: 50 files
Processing batch 12: 50 files
Processing batch 13: 50 files
Processing batch 14: 50 files
Processing batch 15: 50 files
Processing batch 16: 50 files
Processing batch 17: 50 files
Processing batch 18: 50 files
Processing batch 19: 50 files
Processing batch 20: 50 files
Processing batch 21: 50 files
Processing batch 22: 50 files
Processing batch 23: 50 files
Processing batch 24: 50 files
Processing batch 25: 50 files
Processing batch 26: 50 files
Processing batch 27: 50 files
Processing batch 28: 50 files
Processing batch 29: 50 files
Processing batch 30: 50 files
Processing batch 31: 50 files
Processing batch 32: 50 files
Processing batch 33: 50 files
Processing batch 34: 50 files
Processing batch 35: 50 files
Processing batch 36: 50 files
Processing batch 37: 50 files
Processing batch 38: 50 files
Processing batch 39: 50 files
Processing batch 40: 50 files
Processing batch 41: 50 files
Processing batch 42: 50 files
Processing batch 43: 50 files
Processing batch 44: 50 files
Processing batch 45: 50 files
Processing batch 46: 50 files
Processing batch 47: 50 files
Processing batch 48: 50 files
Processing batch 49: 50 files
Processing batch 50: 50 files
Processing batch 51: 50 files
Processing batch 52: 50 files
Processing batch 53: 50 files
Processing batch 54: 50 files
Processing batch 55: 50 files
Processing batch 56: 50 files
Processing batch 57: 50 files
Processing batch 58: 50 files
Processing batch 59: 50 files
Processing batch 60: 50 files
Processing batch 61: 50 files
Processing batch 62: 50 files
Processing batch 63: 50 files
Processing batch 64: 50 files
Processing batch 65: 50 files
Processing batch 66: 50 files
Processing batch 67: 50 files
Processing batch 68: 50 files
Processing batch 69: 50 files
Processing batch 70: 50 files
Processing batch 71: 50 files
Processing batch 72: 50 files
Processing batch 73: 50 files
Processing batch 74: 50 files
Processing batch 75: 50 files
Processing batch 76: 50 files
Processing batch 77: 50 files
Processing batch 78: 50 files
Processing batch 79: 50 files
Processing batch 80: 50 files
Processing batch 81: 50 files
Processing batch 82: 50 files
Processing batch 83: 50 files
Processing batch 84: 50 files
Processing batch 85: 50 files
Processing batch 86: 50 files
Processing batch 87: 31 files
fog_analysis_ctx = SparkyAnalysisContext(
partial_in=FOG_CLASSIFICATION_PARTIAL_OUT,
combine_partials=combine_fog_classification_partials,
analyze=analyze_fog_classification_rows,
result_out=None,
)
fog_rows = fog_sparky.analyze(fog_analysis_ctx, fog_sparky.combine(fog_analysis_ctx)).cache()
train = fog_rows.filter(F.col("year").between(2015, 2018)).cache()
test = fog_rows.filter(F.col("year") == 2019).cache()
print("Training label counts:")
train.groupBy("label").count().orderBy("label").show()
print("Test label counts:")
test.groupBy("label").count().orderBy("label").show()
train_label_count = train.select("label").distinct().count()
test_label_count = test.select("label").distinct().count()
if train_label_count < 2:
raise ValueError("Training set does not contain both foggy and clear labels.")
if test_label_count < 2:
raise ValueError("Test set does not contain both foggy and clear labels.")
print("Model feature columns:", FOG_FEATURE_COLS)
print("visibility_surface excluded from features:", "visibility" not in FOG_FEATURE_COLS)
assembler = VectorAssembler(
inputCols=FOG_FEATURE_COLS,
outputCol="features",
handleInvalid="skip",
)
scaler = StandardScaler(
inputCol="features",
outputCol="scaled_features",
withStd=True,
withMean=True,
)
lr = LogisticRegression(
labelCol="label",
featuresCol="scaled_features",
maxIter=50,
regParam=0.01,
elasticNetParam=0.0,
)
fog_pipeline = Pipeline(stages=[assembler, scaler, lr])
fog_model = fog_pipeline.fit(train)
fog_predictions = fog_model.transform(test).cache()
print("Prediction rows:", fog_predictions.count())
Training label counts:
+-----+------+ |label| count| +-----+------+ | 0.0|271661| | 1.0| 33021| +-----+------+ Test label counts: +-----+------+ |label| count| +-----+------+ | 0.0|131402| | 1.0| 17243| +-----+------+ Model feature columns: ['cloud', 'temp_c', 'rh', 'pressure', 'wind', 'month', 'hour', 'lat', 'lon'] visibility_surface excluded from features: True
[Stage 1638:====================================================> (57 + 2) / 59]
Prediction rows: 148645
lr_model = fog_model.stages[-1]
coefficient_rows = [
(feature, float(coef))
for feature, coef in zip(FOG_FEATURE_COLS, lr_model.coefficients)
]
coefficient_df = spark.createDataFrame(coefficient_rows, ["feature", "coefficient"])
print("Logistic regression coefficients:")
coefficient_df.orderBy(F.desc(F.abs(F.col("coefficient")))).show(truncate=False)
print("Confusion matrix: rows are actual labels, columns are predictions")
(
fog_predictions
.groupBy("label")
.pivot("prediction", [0.0, 1.0])
.count()
.na.fill(0)
.orderBy("label")
.show()
)
accuracy = MulticlassClassificationEvaluator(
labelCol="label",
predictionCol="prediction",
metricName="accuracy",
).evaluate(fog_predictions)
weighted_f1 = MulticlassClassificationEvaluator(
labelCol="label",
predictionCol="prediction",
metricName="f1",
).evaluate(fog_predictions)
area_under_roc = BinaryClassificationEvaluator(
labelCol="label",
rawPredictionCol="rawPrediction",
metricName="areaUnderROC",
).evaluate(fog_predictions)
print(f"Accuracy: {accuracy:.4f}")
print(f"Weighted F1: {weighted_f1:.4f}")
print(f"Area under ROC: {area_under_roc:.4f}")
print("Example predictions:")
(
fog_predictions
.select(
"year",
"month",
"hour",
"lat",
"lon",
"label",
"prediction",
"probability",
"cloud",
"temp_c",
"rh",
"pressure",
"wind",
)
.orderBy("year", "month", "hour")
.show(20, truncate=False)
)
fog_classification_runtime = time.perf_counter() - fog_classification_runtime_start
print(f"Total Prediction/Classification Spark runtime: {fog_classification_runtime:.2f} seconds")
Logistic regression coefficients:
+--------+--------------------+ |feature |coefficient | +--------+--------------------+ |cloud |1.1067942230378602 | |temp_c |-0.7872210748882439 | |lon |-0.400521811557659 | |pressure|0.3690058883045009 | |wind |-0.36672343270150165| |month |0.21371538457949868 | |hour |0.15716091232829532 | |lat |-0.15161985573516182| |rh |-0.06357281862133732| +--------+--------------------+ Confusion matrix: rows are actual labels, columns are predictions +-----+------+----+ |label| 0.0| 1.0| +-----+------+----+ | 0.0|129296|2106| | 1.0| 14647|2596| +-----+------+----+
Accuracy: 0.8873 Weighted F1: 0.8577 Area under ROC: 0.9072 Example predictions: +----+-----+----+------------------+-------------------+-----+----------+-----------------------------------------+-----+------------------+----+----------+---------+ |year|month|hour|lat |lon |label|prediction|probability |cloud|temp_c |rh |pressure |wind | +----+-----+----+------------------+-------------------+-----+----------+-----------------------------------------+-----+------------------+----+----------+---------+ |2019|5 |2 |37.76021306282369 |-122.9866273902384 |0.0 |0.0 |[0.6557099198197698,0.3442900801802302] |100.0|14.695400000000006|37.0|101081.0 |4.8545904| |2019|5 |2 |37.76021306282369 |-122.9866273902384 |0.0 |0.0 |[0.6026044674460598,0.3973955325539402] |100.0|13.338459999999998|66.0|101222.89 |3.1258583| |2019|5 |2 |37.76021306282369 |-122.9866273902384 |0.0 |0.0 |[0.6040506172227778,0.39594938277722225] |100.0|13.251059999999995|23.0|101211.266|4.5 | |2019|5 |2 |38.49513874119703 |-122.48812487581753|0.0 |0.0 |[0.8234601659614486,0.17653983403855145] |100.0|11.678500000000042|93.0|97879.695 |2.6258583| |2019|5 |2 |37.76021306282369 |-122.9866273902384 |0.0 |0.0 |[0.7772587866947785,0.22274121330522145] |100.0|13.197660000000042|5.0 |101257.664|13.644336| |2019|5 |2 |37.07226085949479 |-121.8466103871987 |0.0 |0.0 |[0.8415230708804309,0.15847692911956912] |100.0|14.138480000000015|28.0|98074.89 |2.7258582| |2019|5 |2 |37.80626966684229 |-122.03276508131742|1.0 |0.0 |[0.8951205781739128,0.10487942182608723] |79.0 |9.611080000000015 |24.0|98697.67 |6.3 | |2019|5 |2 |37.13199613541767 |-122.82142370021901|0.0 |0.0 |[0.6123541199467968,0.3876458800532032] |100.0|13.59847000000002 |46.0|101278.09 |4.325858 | |2019|5 |2 |37.76021306282369 |-122.9866273902384 |0.0 |0.0 |[0.8160051563116283,0.1839948436883717] |95.0 |13.342200000000048|32.0|101263.4 |13.7301 | |2019|5 |2 |38.68439162763422 |-121.98154998865351|0.0 |0.0 |[0.8545880423737439,0.14541195762625614] |100.0|13.478480000000047|56.0|99533.3 |3.4258583| |2019|5 |2 |37.07226085949479 |-121.8466103871987 |0.0 |0.0 |[0.8876160529681716,0.11238394703182841] |78.0 |11.221060000000023|24.0|97992.87 |2.8 | |2019|5 |2 |37.44953919998765 |-122.2168242814588 |0.0 |0.0 |[0.7763555894809914,0.22364441051900863] |100.0|15.238500000000045|32.0|99916.49 |3.0258584| |2019|5 |2 |38.80965104619415 |-121.87476520122758|0.0 |0.0 |[0.9803487412712196,0.019651258728780374]|17.0 |11.217650000000049|39.0|100447.266|4.1443357| |2019|5 |2 |37.5548823631671 |-121.55606012175988|0.0 |0.0 |[0.9220186583455454,0.07798134165445458] |100.0|12.278470000000027|32.0|95062.89 |2.7258582| |2019|5 |2 |37.13199613541767 |-122.82142370021901|0.0 |0.0 |[0.5844937625445045,0.41550623745549553] |100.0|13.461080000000038|22.0|101248.87 |3.9 | |2019|5 |2 |37.5967978678328 |-121.97931844540852|0.0 |0.0 |[0.817165050891673,0.18283494910832698] |100.0|13.688469999999995|34.0|99323.695 |3.4258583| |2019|5 |2 |37.76021306282369 |-122.9866273902384 |0.0 |0.0 |[0.7055133871642577,0.2944866128357423] |100.0|14.050900000000013|21.0|101608.766|9.306425 | |2019|5 |2 |36.838970056062415|-122.60836677431931|0.0 |0.0 |[0.6105370835193823,0.3894629164806177] |100.0|13.798500000000047|25.0|101310.09 |3.8258584| |2019|5 |2 |38.68439162763422 |-121.98154998865351|0.0 |0.0 |[0.9802840247373842,0.019715975262615815]|0.0 |6.141080000000045 |8.0 |99388.87 |3.4 | |2019|5 |2 |36.84100827316722 |-121.92525054696785|0.0 |0.0 |[0.7255518025469067,0.27444819745309335] |100.0|14.37847000000005 |27.0|101310.09 |3.2258582| +----+-----+----+------------------+-------------------+-----+----------+-----------------------------------------+-----+------------------+----+----------+---------+ only showing top 20 rows Total Prediction/Classification Spark runtime: 1312.98 seconds
Fog Classification Results¶
The classifier was trained on 304682 labeled Bay Area observations from 2015-2018 and tested on 148645 labeled observations from 2019. The training set contains 271661 clear observations and 33021 foggy observations. The test set contains 131402 clear observations and 17243 foggy observations, so the data is strongly weighted toward clear conditions.
The model performs well overall on the 2019 holdout set: accuracy is 0.8873, weighted F1 is 0.8577, and area under ROC is 0.9072. The ROC score is the most useful headline metric here because it shows the model can separate foggy and clear observations well even though visibility_surface was excluded from the training features.
The strongest learned signal is cloud cover. Its coefficient is 1.1068, so higher cloud cover pushes predictions toward foggy. Surface temperature has a coefficient of -0.7872, meaning warmer observations push predictions toward clear. Longitude, pressure, wind, month, hour, and latitude also affect the prediction. Relative humidity has the smallest coefficient, -0.0636, so it contributes little in this fitted model.
The confusion matrix shows that the model is much stronger at identifying clear observations than foggy observations. It correctly predicts 129296 clear observations and misclassifies only 2106 clear observations as foggy. For foggy observations, it correctly predicts 2596 and misses 14647. In other words, this model is conservative about predicting fog, which makes sense given the class imbalance in the test set.
This improves the least-foggy analysis because it checks the hand-built clarity score against a learned model. The model uses weather variables other than visibility and still gets strong holdout performance, especially for identifying clear conditions. The main caveat is that the current classifier should be treated as a clear-condition detector more than a fog-event detector unless the training data is rebalanced or the decision threshold is tuned.
The main problem here, is that clear observations dominate the dataset. So the classifier mostly learns how to avoid false positives for fog predictions. As a model that is limiting, but since the we are talking about least foggy here, being able to accurately predict if there is going to be fog is useful for Least Foggy.
Part 2¶
Clash Royale¶
Clash Royale is a 2016 real-time strategy mobile game developed and published by Supercell. It combines elements of collectible card games, tower defense, and multiplayer online battle arena (MOBA). Players collect cards and create a deck to use in 3–5 minute battles, with the aim to destroy opposing towers whilst defending. (from wikipedia)
Number of games in the dataset: 37.9 Million
Task 1: Full-Deck Elo¶
In this task, I am going to apply an aggregated Elo system to decks. Each deck is made of 8 cards, and I consider all combinations of the cards as the same deck.
Example: for 3 card decks, [a, b, c] and [c, a, b] are the same deck for this task.
from pyspark import StorageLevel
from pyspark.sql import functions as F
import time
ROYALE_BATTLES_GLOB = "hdfs:///royale/*/*.csv"
ROYALE_CARD_MASTER_PATH = "hdfs:///royale/CardMasterListSeason18_12082020.csv"
ROYALE_TASK1_PARTIAL_OUT = "hdfs:///sparky/royale_task1_matchup_partials"
ROYALE_TASK1_MATCHUP_OUT = "hdfs:///sparky/royale_task1_matchups"
ROYALE_TASK1_RATINGS_OUT = "hdfs:///sparky/royale_task1_ratings"
ROYALE_TASK1_BATCH_SIZE = 1
ROYALE_TASK1_K = 32.0
ROYALE_TASK1_ITERATIONS = 5
ROYALE_TASK1_MIN_GAMES = 100
# Prevent Spark from spending heap building enormous plan strings when an action starts.
# Kept running out of heap space when doing this, so our overlords suggested this.
spark.conf.set("spark.sql.maxPlanStringLength", "4096")
previous_log_level = sc.getLogLevel() if hasattr(sc, "getLogLevel") else "WARN"
sc.setLogLevel("ERROR")
royale_task1_runtime_start = time.perf_counter()
winner_card_cols = [f"winner.card{i}.id" for i in range(1, 9)]
loser_card_cols = [f"loser.card{i}.id" for i in range(1, 9)]
def clear_hdfs_path(path):
hadoop_path = spark._jvm.org.apache.hadoop.fs.Path(path)
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
if fs.exists(hadoop_path):
fs.delete(hadoop_path, True)
print(f"Deleted old output: {path}")
def hdfs_glob_paths(glob_path):
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
statuses = fs.globStatus(spark._jvm.org.apache.hadoop.fs.Path(glob_path))
if statuses is None:
return []
return sorted(str(status.getPath()) for status in statuses)
def deck_key(cols):
return F.concat_ws(
"|",
F.array_sort(F.array(*[F.col(f"`{col}`").cast("string") for col in cols])),
)
def build_matchup_counts(df):
return (
df
.select(
deck_key(winner_card_cols).alias("winner_deck"),
deck_key(loser_card_cols).alias("loser_deck"),
)
.where(
F.col("winner_deck").isNotNull()
& F.col("loser_deck").isNotNull()
& (F.size(F.split(F.col("winner_deck"), "\\|")) == 8)
& (F.size(F.split(F.col("loser_deck"), "\\|")) == 8)
)
.groupBy("winner_deck", "loser_deck")
.agg(F.count("*").alias("games"))
)
battle_paths = hdfs_glob_paths(ROYALE_BATTLES_GLOB)
if len(battle_paths) == 0:
raise ValueError(f"No battle CSV files matched: {ROYALE_BATTLES_GLOB}")
print(f"Found {len(battle_paths)} battle CSV files")
clear_hdfs_path(ROYALE_TASK1_PARTIAL_OUT)
clear_hdfs_path(ROYALE_TASK1_MATCHUP_OUT)
clear_hdfs_path(ROYALE_TASK1_RATINGS_OUT)
for batch_id, start in enumerate(range(0, len(battle_paths), ROYALE_TASK1_BATCH_SIZE)):
batch_paths = battle_paths[start:start + ROYALE_TASK1_BATCH_SIZE]
print(f"Processing Clash Royale Task 1 batch {batch_id}: {len(batch_paths)} file(s)")
batch_df = (
spark.read
.option("header", "true")
.option("multiLine", "false")
.csv(batch_paths)
)
missing_cols = [col for col in winner_card_cols + loser_card_cols if col not in batch_df.columns]
if missing_cols:
raise ValueError(f"Missing expected battle columns in batch {batch_id}: {missing_cols}")
partial_matchups = build_matchup_counts(batch_df)
(
partial_matchups
.write
.mode("overwrite")
.parquet(f"{ROYALE_TASK1_PARTIAL_OUT}/batch_{batch_id:04d}")
)
print(f"Wrote partial matchup counts for batch {batch_id}")
combined_matchups = (
spark.read
.parquet(f"{ROYALE_TASK1_PARTIAL_OUT}/batch_*")
.groupBy("winner_deck", "loser_deck")
.agg(F.sum("games").alias("games"))
)
(
combined_matchups
.write
.mode("overwrite")
.parquet(ROYALE_TASK1_MATCHUP_OUT)
)
matchups = spark.read.parquet(ROYALE_TASK1_MATCHUP_OUT).persist(StorageLevel.DISK_ONLY)
matchup_count = matchups.count()
print(f"Combined matchup rows: {matchup_count}")
initial_ratings = (
matchups.select(F.col("winner_deck").alias("deck"))
.union(matchups.select(F.col("loser_deck").alias("deck")))
.distinct()
.withColumn("elo", F.lit(800.0))
)
initial_rating_path = f"{ROYALE_TASK1_RATINGS_OUT}/iteration_0000"
(
initial_ratings
.write
.mode("overwrite")
.parquet(initial_rating_path)
)
ratings = spark.read.parquet(initial_rating_path).persist(StorageLevel.DISK_ONLY)
ratings.count()
for iteration in range(ROYALE_TASK1_ITERATIONS):
w = ratings.select(F.col("deck").alias("winner_deck"), F.col("elo").alias("winner_elo"))
l = ratings.select(F.col("deck").alias("loser_deck"), F.col("elo").alias("loser_elo"))
matchup_deltas = (
matchups
.join(w, "winner_deck")
.join(l, "loser_deck")
.withColumn(
"expected_win",
F.expr("1.0 / (1.0 + pow(10.0, (loser_elo - winner_elo) / 400.0))"),
)
.withColumn("winner_delta", F.lit(ROYALE_TASK1_K) * F.col("games") * (1.0 - F.col("expected_win")))
.withColumn("loser_delta", -F.col("winner_delta"))
)
deck_deltas = (
matchup_deltas.select(
F.col("winner_deck").alias("deck"),
F.col("winner_delta").alias("delta"),
F.col("games").alias("games_seen"),
)
.union(
matchup_deltas.select(
F.col("loser_deck").alias("deck"),
F.col("loser_delta").alias("delta"),
F.col("games").alias("games_seen"),
)
)
.groupBy("deck")
.agg(
F.sum("delta").alias("total_delta"),
F.sum("games_seen").alias("games_seen"),
)
.select("deck", (F.col("total_delta") / F.col("games_seen")).alias("delta"))
)
updated_ratings = (
ratings
.join(deck_deltas, "deck", "left")
.select("deck", (F.col("elo") + F.coalesce(F.col("delta"), F.lit(0.0))).alias("elo"))
)
next_rating_path = f"{ROYALE_TASK1_RATINGS_OUT}/iteration_{iteration + 1:04d}"
(
updated_ratings
.write
.mode("overwrite")
.parquet(next_rating_path)
)
ratings.unpersist(blocking=False)
ratings = spark.read.parquet(next_rating_path).persist(StorageLevel.DISK_ONLY)
ratings.count()
print(f"Finished Elo iteration {iteration + 1}/{ROYALE_TASK1_ITERATIONS}")
deck_game_counts = (
matchups.select(F.col("winner_deck").alias("deck"), F.col("games").alias("games_seen"))
.union(matchups.select(F.col("loser_deck").alias("deck"), F.col("games").alias("games_seen")))
.groupBy("deck")
.agg(F.sum("games_seen").alias("games_seen"))
)
rating_rows = (
ratings
.join(deck_game_counts, "deck", "left")
.select("deck", F.round("elo", 2).alias("elo"), F.coalesce(F.col("games_seen"), F.lit(0)).alias("games_seen"))
.persist(StorageLevel.DISK_ONLY)
)
unique_deck_count = rating_rows.count()
ranked_rating_rows = rating_rows.where(F.col("games_seen") >= ROYALE_TASK1_MIN_GAMES).persist(StorageLevel.DISK_ONLY)
ranked_deck_count = ranked_rating_rows.count()
card_master_df = (
spark.read
.option("header", "true")
.csv(ROYALE_CARD_MASTER_PATH)
)
card_id_col = "team.card1.id" if "team.card1.id" in card_master_df.columns else card_master_df.columns[0]
card_name_col = "team.card1.name" if "team.card1.name" in card_master_df.columns else card_master_df.columns[1]
card_name_rows = (
card_master_df
.select(F.col(f"`{card_id_col}`").cast("string").alias("card_id"), F.col(f"`{card_name_col}`").alias("card_name"))
.where(F.col("card_id").isNotNull() & F.col("card_name").isNotNull())
.collect()
)
card_names = {row["card_id"]: row["card_name"] for row in card_name_rows}
def display_deck(deck):
return ", ".join(card_names.get(card_id, card_id) for card_id in deck.split("|"))
def show_ranked_decks(title, rows):
print(title)
for rank, row in enumerate(rows, start=1):
print(f"{rank}. Elo {row['elo']:.2f}, games {row['games_seen']}: {display_deck(row['deck'])}")
print()
print(f"Unique decks seen: {unique_deck_count}")
print(f"Decks with at least {ROYALE_TASK1_MIN_GAMES} games: {ranked_deck_count}")
top_5_decks = ranked_rating_rows.orderBy(F.desc("elo")).limit(5).collect()
bottom_5_decks = ranked_rating_rows.orderBy(F.asc("elo")).limit(5).collect()
show_ranked_decks(f"Top 5 full decks with at least {ROYALE_TASK1_MIN_GAMES} games", top_5_decks)
show_ranked_decks(f"Bottom 5 full decks with at least {ROYALE_TASK1_MIN_GAMES} games", bottom_5_decks)
royale_task1_runtime = time.perf_counter() - royale_task1_runtime_start
print(f"Total Clash Royale Task 1 Spark runtime: {royale_task1_runtime:.2f} seconds")
sc.setLogLevel(previous_log_level)
Found 10 battle CSV files Deleted old output: hdfs:///sparky/royale_task1_matchup_partials Deleted old output: hdfs:///sparky/royale_task1_matchups Deleted old output: hdfs:///sparky/royale_task1_ratings Processing Clash Royale Task 1 batch 0: 1 file(s)
Wrote partial matchup counts for batch 0 Processing Clash Royale Task 1 batch 1: 1 file(s)
Wrote partial matchup counts for batch 1 Processing Clash Royale Task 1 batch 2: 1 file(s)
Wrote partial matchup counts for batch 2 Processing Clash Royale Task 1 batch 3: 1 file(s)
Wrote partial matchup counts for batch 3 Processing Clash Royale Task 1 batch 4: 1 file(s)
Wrote partial matchup counts for batch 4 Processing Clash Royale Task 1 batch 5: 1 file(s)
Wrote partial matchup counts for batch 5 Processing Clash Royale Task 1 batch 6: 1 file(s)
Wrote partial matchup counts for batch 6 Processing Clash Royale Task 1 batch 7: 1 file(s)
Wrote partial matchup counts for batch 7 Processing Clash Royale Task 1 batch 8: 1 file(s)
Wrote partial matchup counts for batch 8 Processing Clash Royale Task 1 batch 9: 1 file(s)
Wrote partial matchup counts for batch 9
Combined matchup rows: 35768615
Finished Elo iteration 1/5
Finished Elo iteration 2/5
Finished Elo iteration 3/5
]
Finished Elo iteration 4/5
Finished Elo iteration 5/5
Unique decks seen: 16074675 Decks with at least 100 games: 23929 Top 5 full decks with at least 100 games 1. Elo 859.35, games 115: P.E.K.K.A, Valkyrie, Mega Minion, Goblin Hut, Furnace, Fireball, Arrows, Zap 2. Elo 859.16, games 721: Goblins, Valkyrie, Musketeer, Hog Rider, Inferno Tower, Fireball, Arrows, Zap 3. Elo 858.25, games 111: Knight, Minions, Balloon, Lumberjack, Tesla, Fireball, Arrows, Zap 4. Elo 854.86, games 217: Balloon, Valkyrie, Inferno Dragon, Electro Wizard, Mega Knight, Electro Spirit, Poison, Giant Snowball 5. Elo 853.19, games 188: Balloon, Valkyrie, Wizard, Electro Wizard, Mega Knight, Firecracker, Goblin Barrel, The Log Bottom 5 full decks with at least 100 games 1. Elo 729.01, games 174: Fireball, Arrows, Lightning, Zap, Poison, The Log, Tornado, Giant Snowball 2. Elo 729.52, games 172: Fireball, Arrows, Rocket, Lightning, Zap, Poison, The Log, Giant Snowball 3. Elo 729.82, games 194: Fireball, Arrows, Rocket, Zap, Poison, The Log, Tornado, Heal Spirit 4. Elo 730.04, games 106: Fireball, Arrows, Rage, Zap, Poison, Tornado, Earthquake, Giant Snowball 5. Elo 730.60, games 178: Fireball, Arrows, Zap, Poison, The Log, Tornado, Earthquake, Giant Snowball Total Clash Royale Task 1 Spark runtime: 788.18 seconds
Interpretation¶
The highest rated deck is as shown above with 115 games. I think deck 2 is probably the best one, giving that its Elo is very close (0.19 diff) and there have been almost 6 times as many games as the first deck.
Just looking at the Bottom decks, itseems like haing a bunch of spells is really really bad.
Task 2: First-Three-Slot Sub-Deck Elo¶
In this task , I am looking at the first-three slots only, as those are powered slots that allow cards that can be powered up to activate their powers.
This is less accurate, because there are games in the dataset where players might choose to not have powered cards or maybe they don't have enough powered cards. However, this was still interesting.
from pyspark import StorageLevel
from pyspark.sql import functions as F
import time
ROYALE_BATTLES_GLOB = "hdfs:///royale/*/*.csv"
ROYALE_CARD_MASTER_PATH = "hdfs:///royale/CardMasterListSeason18_12082020.csv"
ROYALE_TASK2_PARTIAL_OUT = "hdfs:///sparky/royale_task2_matchup_partials"
ROYALE_TASK2_MATCHUP_OUT = "hdfs:///sparky/royale_task2_matchups"
ROYALE_TASK2_RATINGS_OUT = "hdfs:///sparky/royale_task2_ratings"
ROYALE_TASK2_BATCH_SIZE = 1
ROYALE_TASK2_K = 32.0
ROYALE_TASK2_ITERATIONS = 5
ROYALE_TASK2_MIN_GAMES = 100
spark.conf.set("spark.sql.maxPlanStringLength", "4096")
previous_log_level = sc.getLogLevel() if hasattr(sc, "getLogLevel") else "WARN"
sc.setLogLevel("ERROR")
royale_task2_runtime_start = time.perf_counter()
winner_subdeck_cols = [f"winner.card{i}.id" for i in range(1, 4)]
loser_subdeck_cols = [f"loser.card{i}.id" for i in range(1, 4)]
def clear_hdfs_path_task2(path):
hadoop_path = spark._jvm.org.apache.hadoop.fs.Path(path)
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
if fs.exists(hadoop_path):
fs.delete(hadoop_path, True)
print(f"Deleted old output: {path}")
def hdfs_glob_paths_task2(glob_path):
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
statuses = fs.globStatus(spark._jvm.org.apache.hadoop.fs.Path(glob_path))
if statuses is None:
return []
return sorted(str(status.getPath()) for status in statuses)
def ordered_subdeck_key(cols):
return F.concat_ws("|", *[F.trim(F.col(f"`{col}`").cast("string")) for col in cols])
def complete_subdeck_filter(cols):
checks = [F.col(f"`{col}`").isNotNull() & (F.trim(F.col(f"`{col}`").cast("string")) != "") for col in cols]
condition = checks[0]
for check in checks[1:]:
condition = condition & check
return condition
def build_task2_matchup_counts(df):
return (
df
.where(complete_subdeck_filter(winner_subdeck_cols) & complete_subdeck_filter(loser_subdeck_cols))
.select(
ordered_subdeck_key(winner_subdeck_cols).alias("winner_deck"),
ordered_subdeck_key(loser_subdeck_cols).alias("loser_deck"),
)
.groupBy("winner_deck", "loser_deck")
.agg(F.count("*").alias("games"))
)
battle_paths = hdfs_glob_paths_task2(ROYALE_BATTLES_GLOB)
if len(battle_paths) == 0:
raise ValueError(f"No battle CSV files matched: {ROYALE_BATTLES_GLOB}")
print(f"Found {len(battle_paths)} battle CSV files")
clear_hdfs_path_task2(ROYALE_TASK2_PARTIAL_OUT)
clear_hdfs_path_task2(ROYALE_TASK2_MATCHUP_OUT)
clear_hdfs_path_task2(ROYALE_TASK2_RATINGS_OUT)
for batch_id, start in enumerate(range(0, len(battle_paths), ROYALE_TASK2_BATCH_SIZE)):
batch_paths = battle_paths[start:start + ROYALE_TASK2_BATCH_SIZE]
print(f"Processing Clash Royale Task 2 batch {batch_id}: {len(batch_paths)} file(s)")
batch_df = (
spark.read
.option("header", "true")
.option("multiLine", "false")
.csv(batch_paths)
)
missing_cols = [col for col in winner_subdeck_cols + loser_subdeck_cols if col not in batch_df.columns]
if missing_cols:
raise ValueError(f"Missing expected battle columns in batch {batch_id}: {missing_cols}")
partial_matchups = build_task2_matchup_counts(batch_df)
(
partial_matchups
.write
.mode("overwrite")
.parquet(f"{ROYALE_TASK2_PARTIAL_OUT}/batch_{batch_id:04d}")
)
print(f"Wrote partial matchup counts for batch {batch_id}")
combined_matchups = (
spark.read
.parquet(f"{ROYALE_TASK2_PARTIAL_OUT}/batch_*")
.groupBy("winner_deck", "loser_deck")
.agg(F.sum("games").alias("games"))
)
(
combined_matchups
.write
.mode("overwrite")
.parquet(ROYALE_TASK2_MATCHUP_OUT)
)
matchups = spark.read.parquet(ROYALE_TASK2_MATCHUP_OUT).persist(StorageLevel.DISK_ONLY)
matchup_count = matchups.count()
print(f"Combined matchup rows: {matchup_count}")
initial_ratings = (
matchups.select(F.col("winner_deck").alias("deck"))
.union(matchups.select(F.col("loser_deck").alias("deck")))
.distinct()
.withColumn("elo", F.lit(800.0))
)
initial_rating_path = f"{ROYALE_TASK2_RATINGS_OUT}/iteration_0000"
(
initial_ratings
.write
.mode("overwrite")
.parquet(initial_rating_path)
)
ratings = spark.read.parquet(initial_rating_path).persist(StorageLevel.DISK_ONLY)
ratings.count()
for iteration in range(ROYALE_TASK2_ITERATIONS):
w = ratings.select(F.col("deck").alias("winner_deck"), F.col("elo").alias("winner_elo"))
l = ratings.select(F.col("deck").alias("loser_deck"), F.col("elo").alias("loser_elo"))
matchup_deltas = (
matchups
.join(w, "winner_deck")
.join(l, "loser_deck")
.withColumn(
"expected_win",
F.expr("1.0 / (1.0 + pow(10.0, (loser_elo - winner_elo) / 400.0))"),
)
.withColumn("winner_delta", F.lit(ROYALE_TASK2_K) * F.col("games") * (1.0 - F.col("expected_win")))
.withColumn("loser_delta", -F.col("winner_delta"))
)
deck_deltas = (
matchup_deltas.select(
F.col("winner_deck").alias("deck"),
F.col("winner_delta").alias("delta"),
F.col("games").alias("games_seen"),
)
.union(
matchup_deltas.select(
F.col("loser_deck").alias("deck"),
F.col("loser_delta").alias("delta"),
F.col("games").alias("games_seen"),
)
)
.groupBy("deck")
.agg(
F.sum("delta").alias("total_delta"),
F.sum("games_seen").alias("games_seen"),
)
.select("deck", (F.col("total_delta") / F.col("games_seen")).alias("delta"))
)
updated_ratings = (
ratings
.join(deck_deltas, "deck", "left")
.select("deck", (F.col("elo") + F.coalesce(F.col("delta"), F.lit(0.0))).alias("elo"))
)
next_rating_path = f"{ROYALE_TASK2_RATINGS_OUT}/iteration_{iteration + 1:04d}"
(
updated_ratings
.write
.mode("overwrite")
.parquet(next_rating_path)
)
ratings.unpersist(blocking=False)
ratings = spark.read.parquet(next_rating_path).persist(StorageLevel.DISK_ONLY)
ratings.count()
print(f"Finished Elo iteration {iteration + 1}/{ROYALE_TASK2_ITERATIONS}")
deck_game_counts = (
matchups.select(F.col("winner_deck").alias("deck"), F.col("games").alias("games_seen"))
.union(matchups.select(F.col("loser_deck").alias("deck"), F.col("games").alias("games_seen")))
.groupBy("deck")
.agg(F.sum("games_seen").alias("games_seen"))
)
rating_rows = (
ratings
.join(deck_game_counts, "deck", "left")
.select("deck", F.round("elo", 2).alias("elo"), F.coalesce(F.col("games_seen"), F.lit(0)).alias("games_seen"))
.persist(StorageLevel.DISK_ONLY)
)
unique_deck_count = rating_rows.count()
ranked_rating_rows = rating_rows.where(F.col("games_seen") >= ROYALE_TASK2_MIN_GAMES).persist(StorageLevel.DISK_ONLY)
ranked_deck_count = ranked_rating_rows.count()
card_master_df = (
spark.read
.option("header", "true")
.csv(ROYALE_CARD_MASTER_PATH)
)
card_id_col = "team.card1.id" if "team.card1.id" in card_master_df.columns else card_master_df.columns[0]
card_name_col = "team.card1.name" if "team.card1.name" in card_master_df.columns else card_master_df.columns[1]
card_name_rows = (
card_master_df
.select(F.col(f"`{card_id_col}`").cast("string").alias("card_id"), F.col(f"`{card_name_col}`").alias("card_name"))
.where(F.col("card_id").isNotNull() & F.col("card_name").isNotNull())
.collect()
)
card_names = {row["card_id"]: row["card_name"] for row in card_name_rows}
def display_subdeck(deck):
return ", ".join(card_names.get(card_id, card_id) for card_id in deck.split("|"))
def show_ranked_subdecks(title, rows):
print(title)
for rank, row in enumerate(rows, start=1):
print(f"{rank}. Elo {row['elo']:.2f}, games {row['games_seen']}: {display_subdeck(row['deck'])}")
print()
print(f"Unique ordered first-three-slot sub-decks seen: {unique_deck_count}")
print(f"Sub-decks with at least {ROYALE_TASK2_MIN_GAMES} games: {ranked_deck_count}")
top_5_decks = ranked_rating_rows.orderBy(F.desc("elo")).limit(5).collect()
bottom_5_decks = ranked_rating_rows.orderBy(F.asc("elo")).limit(5).collect()
show_ranked_subdecks(f"Top 5 ordered first-three-slot sub-decks with at least {ROYALE_TASK2_MIN_GAMES} games", top_5_decks)
show_ranked_subdecks(f"Bottom 5 ordered first-three-slot sub-decks with at least {ROYALE_TASK2_MIN_GAMES} games", bottom_5_decks)
royale_task2_runtime = time.perf_counter() - royale_task2_runtime_start
print(f"Total Clash Royale Task 2 Spark runtime: {royale_task2_runtime:.2f} seconds")
sc.setLogLevel(previous_log_level)
Found 10 battle CSV files Deleted old output: hdfs:///sparky/royale_task2_matchup_partials Deleted old output: hdfs:///sparky/royale_task2_matchups Deleted old output: hdfs:///sparky/royale_task2_ratings Processing Clash Royale Task 2 batch 0: 1 file(s)
Wrote partial matchup counts for batch 0 Processing Clash Royale Task 2 batch 1: 1 file(s)
Wrote partial matchup counts for batch 1 Processing Clash Royale Task 2 batch 2: 1 file(s)
Wrote partial matchup counts for batch 2 Processing Clash Royale Task 2 batch 3: 1 file(s)
Wrote partial matchup counts for batch 3 Processing Clash Royale Task 2 batch 4: 1 file(s)
Wrote partial matchup counts for batch 4 Processing Clash Royale Task 2 batch 5: 1 file(s)
Wrote partial matchup counts for batch 5 Processing Clash Royale Task 2 batch 6: 1 file(s)
Wrote partial matchup counts for batch 6 Processing Clash Royale Task 2 batch 7: 1 file(s)
Wrote partial matchup counts for batch 7 Processing Clash Royale Task 2 batch 8: 1 file(s)
Wrote partial matchup counts for batch 8 Processing Clash Royale Task 2 batch 9: 1 file(s)
Wrote partial matchup counts for batch 9
Combined matchup rows: 36691387
Finished Elo iteration 1/5
Finished Elo iteration 2/5
Finished Elo iteration 3/5
Finished Elo iteration 4/5
Finished Elo iteration 5/5
Unique ordered first-three-slot sub-decks seen: 805306 Sub-decks with at least 100 games: 140643 Top 5 ordered first-three-slot sub-decks with at least 100 games 1. Elo 847.63, games 202: Inferno Tower, Zap, Goblins 2. Elo 839.37, games 117: Royal Recruits, Balloon, Cannon Cart 3. Elo 837.57, games 143: Valkyrie, Hunter, Spear Goblins 4. Elo 834.83, games 111: Battle Ram, Witch, Hunter 5. Elo 834.09, games 113: Dark Prince, Hunter, Magic Archer Bottom 5 ordered first-three-slot sub-decks with at least 100 games 1. Elo 728.79, games 681: Earthquake, Giant Snowball, Rage 2. Elo 728.85, games 1008: Poison, Zap, Earthquake 3. Elo 731.28, games 103: Zap, Giant Snowball, Earthquake 4. Elo 731.31, games 103: The Log, Zap, Giant Snowball 5. Elo 731.70, games 431: Furnace, Tombstone, Inferno Tower Total Clash Royale Task 2 Spark runtime: 349.05 seconds
Interpretation¶
Inferno tower, Zap, and Goblins form the best power trio. Interestingly, it is the structure and spell only trios that perform the worst.
Based on this result and the one before, I think a player might want to pull back on how many spells and structures are added to the deck.
Task 3: Level the playing field.¶
from pyspark import StorageLevel
from pyspark.sql import functions as F
import time
# A level-agnostic card keeps a strong win rate across multiple observed card levels.
ROYALE_BATTLES_GLOB = "hdfs:///royale/*/*.csv"
ROYALE_CARD_MASTER_PATH = "hdfs:///royale/CardMasterListSeason18_12082020.csv"
ROYALE_TASK3_PARTIAL_OUT = "hdfs:///sparky/royale_task3_card_level_partials"
ROYALE_TASK3_CARD_LEVEL_OUT = "hdfs:///sparky/royale_task3_card_level_summary"
ROYALE_TASK3_BATCH_SIZE = 1
ROYALE_TASK3_MIN_LEVEL_GAMES = 1000
ROYALE_TASK3_MIN_TOTAL_GAMES = 10000
ROYALE_TASK3_MIN_LEVELS = 3
ROYALE_TASK3_WIN_RATE_FLOOR = 0.50
spark.conf.set("spark.sql.maxPlanStringLength", "4096")
previous_log_level = sc.getLogLevel() if hasattr(sc, "getLogLevel") else "WARN"
sc.setLogLevel("ERROR")
royale_task3_runtime_start = time.perf_counter()
winner_task3_cols = [(f"winner.card{i}.id", f"winner.card{i}.level") for i in range(1, 9)]
loser_task3_cols = [(f"loser.card{i}.id", f"loser.card{i}.level") for i in range(1, 9)]
def clear_hdfs_path_task3(path):
hadoop_path = spark._jvm.org.apache.hadoop.fs.Path(path)
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
if fs.exists(hadoop_path):
fs.delete(hadoop_path, True)
print(f"Deleted old output: {path}")
def hdfs_glob_paths_task3(glob_path):
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
statuses = fs.globStatus(spark._jvm.org.apache.hadoop.fs.Path(glob_path))
if statuses is None:
return []
return sorted(str(status.getPath()) for status in statuses)
def card_level_struct(card_col, level_col, wins):
return F.struct(
F.trim(F.col(f"`{card_col}`").cast("string")).alias("card_id"),
F.col(f"`{level_col}`").cast("int").alias("card_level"),
F.lit(wins).cast("long").alias("wins"),
F.lit(1).cast("long").alias("games"),
)
def build_task3_card_level_counts(df):
winner_observations = [card_level_struct(card_col, level_col, 1) for card_col, level_col in winner_task3_cols]
loser_observations = [card_level_struct(card_col, level_col, 0) for card_col, level_col in loser_task3_cols]
return (
df
.select(F.explode(F.array(*(winner_observations + loser_observations))).alias("obs"))
.select("obs.card_id", "obs.card_level", "obs.wins", "obs.games")
.where(
F.col("card_id").isNotNull()
& (F.col("card_id") != "")
& F.col("card_level").isNotNull()
)
.groupBy("card_id", "card_level")
.agg(
F.sum("wins").alias("wins"),
F.sum("games").alias("games"),
)
)
battle_paths = hdfs_glob_paths_task3(ROYALE_BATTLES_GLOB)
if len(battle_paths) == 0:
raise ValueError(f"No battle CSV files matched: {ROYALE_BATTLES_GLOB}")
print(f"Found {len(battle_paths)} battle CSV files")
clear_hdfs_path_task3(ROYALE_TASK3_PARTIAL_OUT)
clear_hdfs_path_task3(ROYALE_TASK3_CARD_LEVEL_OUT)
expected_task3_cols = [col for pair in winner_task3_cols + loser_task3_cols for col in pair]
for batch_id, start in enumerate(range(0, len(battle_paths), ROYALE_TASK3_BATCH_SIZE)):
batch_paths = battle_paths[start:start + ROYALE_TASK3_BATCH_SIZE]
print(f"Processing Clash Royale Task 3 batch {batch_id}: {len(batch_paths)} file(s)")
batch_df = (
spark.read
.option("header", "true")
.option("multiLine", "false")
.csv(batch_paths)
)
missing_cols = [col for col in expected_task3_cols if col not in batch_df.columns]
if missing_cols:
raise ValueError(f"Missing expected battle columns in batch {batch_id}: {missing_cols}")
partial_card_levels = build_task3_card_level_counts(batch_df)
(
partial_card_levels
.write
.mode("overwrite")
.parquet(f"{ROYALE_TASK3_PARTIAL_OUT}/batch_{batch_id:04d}")
)
print(f"Wrote partial card-level counts for batch {batch_id}")
combined_card_levels = (
spark.read
.parquet(f"{ROYALE_TASK3_PARTIAL_OUT}/batch_*")
.groupBy("card_id", "card_level")
.agg(
F.sum("wins").alias("wins"),
F.sum("games").alias("games"),
)
.withColumn("level_win_rate", F.col("wins") / F.col("games"))
)
(
combined_card_levels
.write
.mode("overwrite")
.parquet(ROYALE_TASK3_CARD_LEVEL_OUT)
)
card_level_summary = spark.read.parquet(ROYALE_TASK3_CARD_LEVEL_OUT).persist(StorageLevel.DISK_ONLY)
card_level_row_count = card_level_summary.count()
print(f"Combined card-level aggregate rows: {card_level_row_count}")
eligible_card_levels = card_level_summary.where(F.col("games") >= ROYALE_TASK3_MIN_LEVEL_GAMES)
card_scores = (
eligible_card_levels
.groupBy("card_id")
.agg(
F.sum("wins").alias("total_wins"),
F.sum("games").alias("total_games"),
F.count("*").alias("eligible_levels"),
F.min("level_win_rate").alias("min_level_win_rate"),
F.max("level_win_rate").alias("max_level_win_rate"),
F.coalesce(F.stddev_pop("level_win_rate"), F.lit(0.0)).alias("win_rate_stddev"),
)
.withColumn("overall_win_rate", F.col("total_wins") / F.col("total_games"))
.where(
(F.col("total_games") >= ROYALE_TASK3_MIN_TOTAL_GAMES)
& (F.col("eligible_levels") >= ROYALE_TASK3_MIN_LEVELS)
& (F.col("overall_win_rate") >= ROYALE_TASK3_WIN_RATE_FLOOR)
)
.withColumn("level_agnostic_score", F.col("overall_win_rate") - (F.lit(2.0) * F.col("win_rate_stddev")))
.persist(StorageLevel.DISK_ONLY)
)
eligible_card_count = card_scores.count()
print(f"Cards passing Task 3 filters: {eligible_card_count}")
card_master_df = (
spark.read
.option("header", "true")
.csv(ROYALE_CARD_MASTER_PATH)
)
card_id_col = "team.card1.id" if "team.card1.id" in card_master_df.columns else card_master_df.columns[0]
card_name_col = "team.card1.name" if "team.card1.name" in card_master_df.columns else card_master_df.columns[1]
card_name_rows = (
card_master_df
.select(F.col(f"`{card_id_col}`").cast("string").alias("card_id"), F.col(f"`{card_name_col}`").alias("card_name"))
.where(F.col("card_id").isNotNull() & F.col("card_name").isNotNull())
.collect()
)
card_names = {row["card_id"]: row["card_name"] for row in card_name_rows}
def display_card(card_id):
return card_names.get(card_id, card_id)
top_3_cards = (
card_scores
.orderBy(F.desc("level_agnostic_score"), F.desc("overall_win_rate"), F.desc("total_games"))
.limit(3)
.collect()
)
print("Top 3 level-agnostic winning cards")
for rank, row in enumerate(top_3_cards, start=1):
print(
f"{rank}. {display_card(row['card_id'])}: "
f"score {row['level_agnostic_score']:.4f}, "
f"win rate {row['overall_win_rate']:.2%}, "
f"games {row['total_games']}, "
f"eligible levels {row['eligible_levels']}, "
f"level win-rate range {row['min_level_win_rate']:.2%}-{row['max_level_win_rate']:.2%}, "
f"stddev {row['win_rate_stddev']:.4f}"
)
card_level_summary.unpersist(blocking=False)
card_scores.unpersist(blocking=False)
royale_task3_runtime = time.perf_counter() - royale_task3_runtime_start
print(f"Total Clash Royale Task 3 Spark runtime: {royale_task3_runtime:.2f} seconds")
sc.setLogLevel(previous_log_level)
Found 10 battle CSV files Deleted old output: hdfs:///sparky/royale_task3_card_level_partials Deleted old output: hdfs:///sparky/royale_task3_card_level_summary Processing Clash Royale Task 3 batch 0: 1 file(s)
Wrote partial card-level counts for batch 0 Processing Clash Royale Task 3 batch 1: 1 file(s)
Wrote partial card-level counts for batch 1 Processing Clash Royale Task 3 batch 2: 1 file(s)
Wrote partial card-level counts for batch 2 Processing Clash Royale Task 3 batch 3: 1 file(s)
Wrote partial card-level counts for batch 3 Processing Clash Royale Task 3 batch 4: 1 file(s)
Wrote partial card-level counts for batch 4 Processing Clash Royale Task 3 batch 5: 1 file(s)
Wrote partial card-level counts for batch 5 Processing Clash Royale Task 3 batch 6: 1 file(s)
Wrote partial card-level counts for batch 6 Processing Clash Royale Task 3 batch 7: 1 file(s)
Wrote partial card-level counts for batch 7 Processing Clash Royale Task 3 batch 8: 1 file(s)
Wrote partial card-level counts for batch 8 Processing Clash Royale Task 3 batch 9: 1 file(s)
Wrote partial card-level counts for batch 9 Combined card-level aggregate rows: 986
Cards passing Task 3 filters: 49 Top 3 level-agnostic winning cards 1. The Log: score 0.4831, win rate 50.37%, games 20915126, eligible levels 5, level win-rate range 48.39%-51.36%, stddev 0.0103 2. Night Witch: score 0.4788, win rate 51.31%, games 4568406, eligible levels 5, level win-rate range 47.83%-52.86%, stddev 0.0171 3. Mega Knight: score 0.4760, win rate 50.14%, games 15362533, eligible levels 5, level win-rate range 48.39%-51.94%, stddev 0.0127 Total Clash Royale Task 3 Spark runtime: 83.62 seconds
Interpretation¶
- The Log
- Night Witch
- Mega Knight
The best 3 cards all have things in common:
- They are all legendary in rarity. The rarest class of cards.
- They are all area damage cards. All these cards can do area damage. Either in the form of targeting multiple enemies or spawning minions.
The interesting thing here is that maybe cost doesn't matter too much, since The Log costs 2 elxir (in-game currency), Night Witch costs 4, and Mega Knight costs 7.
Task 4: Card rarity vs wins¶
from pyspark import StorageLevel
from pyspark.sql import functions as F
import time
ROYALE_BATTLES_GLOB = "hdfs:///royale/*/*.csv"
ROYALE_TASK4_PARTIAL_OUT = "hdfs:///sparky/royale_task4_rarity_partials"
ROYALE_TASK4_SUMMARY_OUT = "hdfs:///sparky/royale_task4_rarity_summary"
ROYALE_TASK4_BATCH_SIZE = 1
spark.conf.set("spark.sql.maxPlanStringLength", "4096")
previous_log_level = sc.getLogLevel() if hasattr(sc, "getLogLevel") else "WARN"
sc.setLogLevel("ERROR")
royale_task4_runtime_start = time.perf_counter()
rarity_names = ["common", "rare", "epic", "legendary"]
winner_rarity_cols = [f"winner.{rarity}.count" for rarity in rarity_names]
loser_rarity_cols = [f"loser.{rarity}.count" for rarity in rarity_names]
def clear_hdfs_path_task4(path):
hadoop_path = spark._jvm.org.apache.hadoop.fs.Path(path)
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
if fs.exists(hadoop_path):
fs.delete(hadoop_path, True)
print(f"Deleted old output: {path}")
def hdfs_glob_paths_task4(glob_path):
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
statuses = fs.globStatus(spark._jvm.org.apache.hadoop.fs.Path(glob_path))
if statuses is None:
return []
return sorted(str(status.getPath()) for status in statuses)
def rarity_observation_struct(column_name, rarity, won):
return F.struct(
F.lit(rarity).alias("rarity"),
F.col(f"`{column_name}`").cast("int").alias("rarity_count"),
F.lit(won).cast("long").alias("wins"),
F.lit(1).cast("long").alias("games"),
)
def build_task4_rarity_counts(df):
winner_observations = [
rarity_observation_struct(f"winner.{rarity}.count", rarity, 1)
for rarity in rarity_names
]
loser_observations = [
rarity_observation_struct(f"loser.{rarity}.count", rarity, 0)
for rarity in rarity_names
]
return (
df
.select(F.explode(F.array(*(winner_observations + loser_observations))).alias("obs"))
.select("obs.rarity", "obs.rarity_count", "obs.wins", "obs.games")
.where(F.col("rarity_count").isNotNull())
.groupBy("rarity", "rarity_count")
.agg(
F.sum("wins").alias("wins"),
F.sum("games").alias("games"),
)
)
battle_paths = hdfs_glob_paths_task4(ROYALE_BATTLES_GLOB)
if len(battle_paths) == 0:
raise ValueError(f"No battle CSV files matched: {ROYALE_BATTLES_GLOB}")
print(f"Found {len(battle_paths)} battle CSV files")
clear_hdfs_path_task4(ROYALE_TASK4_PARTIAL_OUT)
clear_hdfs_path_task4(ROYALE_TASK4_SUMMARY_OUT)
expected_task4_cols = winner_rarity_cols + loser_rarity_cols
for batch_id, start in enumerate(range(0, len(battle_paths), ROYALE_TASK4_BATCH_SIZE)):
batch_paths = battle_paths[start:start + ROYALE_TASK4_BATCH_SIZE]
print(f"Processing Clash Royale Task 4 batch {batch_id}: {len(batch_paths)} file(s)")
batch_df = (
spark.read
.option("header", "true")
.option("multiLine", "false")
.csv(batch_paths)
)
missing_cols = [col for col in expected_task4_cols if col not in batch_df.columns]
if missing_cols:
raise ValueError(f"Missing expected battle columns in batch {batch_id}: {missing_cols}")
partial_rarity_counts = build_task4_rarity_counts(batch_df)
(
partial_rarity_counts
.write
.mode("overwrite")
.parquet(f"{ROYALE_TASK4_PARTIAL_OUT}/batch_{batch_id:04d}")
)
print(f"Wrote partial rarity counts for batch {batch_id}")
combined_rarity_counts = (
spark.read
.parquet(f"{ROYALE_TASK4_PARTIAL_OUT}/batch_*")
.groupBy("rarity", "rarity_count")
.agg(
F.sum("wins").alias("wins"),
F.sum("games").alias("games"),
)
.withColumn("win_rate", F.col("wins") / F.col("games"))
)
(
combined_rarity_counts
.write
.mode("overwrite")
.parquet(f"{ROYALE_TASK4_SUMMARY_OUT}/win_rates_by_count")
)
rarity_counts = spark.read.parquet(f"{ROYALE_TASK4_SUMMARY_OUT}/win_rates_by_count").persist(StorageLevel.DISK_ONLY)
rarity_count_rows = rarity_counts.count()
print(f"Combined rarity/count aggregate rows: {rarity_count_rows}")
correlation_summary = (
rarity_counts
.withColumn("x", F.col("rarity_count").cast("double"))
.withColumn("n", F.col("games").cast("double"))
.withColumn("sum_x", F.col("x") * F.col("n"))
.withColumn("sum_y", F.col("wins").cast("double"))
.withColumn("sum_xy", F.col("x") * F.col("wins").cast("double"))
.withColumn("sum_x2", F.col("x") * F.col("x") * F.col("n"))
.withColumn("sum_y2", F.col("wins").cast("double"))
.groupBy("rarity")
.agg(
F.sum("n").alias("observations"),
F.sum("sum_x").alias("sum_x"),
F.sum("sum_y").alias("wins"),
F.sum("sum_xy").alias("sum_xy"),
F.sum("sum_x2").alias("sum_x2"),
F.sum("sum_y2").alias("sum_y2"),
)
.withColumn("avg_rarity_count", F.col("sum_x") / F.col("observations"))
.withColumn("win_rate", F.col("wins") / F.col("observations"))
.withColumn(
"pearson_correlation",
(
(F.col("observations") * F.col("sum_xy")) - (F.col("sum_x") * F.col("wins"))
) / F.sqrt(
((F.col("observations") * F.col("sum_x2")) - (F.col("sum_x") * F.col("sum_x")))
* ((F.col("observations") * F.col("sum_y2")) - (F.col("wins") * F.col("wins")))
)
)
.select(
"rarity",
F.col("observations").cast("long").alias("observations"),
F.round("avg_rarity_count", 4).alias("avg_rarity_count"),
F.round("win_rate", 6).alias("win_rate"),
F.round("pearson_correlation", 6).alias("pearson_correlation"),
F.round(F.abs("pearson_correlation"), 6).alias("abs_correlation"),
)
)
(
correlation_summary
.write
.mode("overwrite")
.parquet(f"{ROYALE_TASK4_SUMMARY_OUT}/correlations")
)
correlation_rows = (
spark.read
.parquet(f"{ROYALE_TASK4_SUMMARY_OUT}/correlations")
.orderBy(F.desc("abs_correlation"), "rarity")
.collect()
)
print("Rarity count vs win correlation, strongest absolute correlation first")
for row in correlation_rows:
print(
f"{row['rarity']}: correlation {row['pearson_correlation']:.6f}, "
f"avg count {row['avg_rarity_count']:.4f}, "
f"win rate {row['win_rate']:.2%}, "
f"observations {row['observations']}"
)
print()
for rarity in rarity_names:
rows = (
rarity_counts
.where(F.col("rarity") == rarity)
.orderBy("rarity_count")
.collect()
)
print(f"{rarity.title()} count win rates")
for row in rows:
print(
f" {row['rarity_count']}: "
f"win rate {row['win_rate']:.2%}, "
f"wins {row['wins']}, "
f"games {row['games']}"
)
print()
rarity_counts.unpersist(blocking=False)
royale_task4_runtime = time.perf_counter() - royale_task4_runtime_start
print(f"Total Clash Royale Task 4 Spark runtime: {royale_task4_runtime:.2f} seconds")
sc.setLogLevel(previous_log_level)
Found 10 battle CSV files Deleted old output: hdfs:///sparky/royale_task4_rarity_partials Deleted old output: hdfs:///sparky/royale_task4_rarity_summary Processing Clash Royale Task 4 batch 0: 1 file(s)
Wrote partial rarity counts for batch 0 Processing Clash Royale Task 4 batch 1: 1 file(s)
Wrote partial rarity counts for batch 1 Processing Clash Royale Task 4 batch 2: 1 file(s)
Wrote partial rarity counts for batch 2 Processing Clash Royale Task 4 batch 3: 1 file(s)
Wrote partial rarity counts for batch 3 Processing Clash Royale Task 4 batch 4: 1 file(s)
Wrote partial rarity counts for batch 4 Processing Clash Royale Task 4 batch 5: 1 file(s)
Wrote partial rarity counts for batch 5 Processing Clash Royale Task 4 batch 6: 1 file(s)
Wrote partial rarity counts for batch 6 Processing Clash Royale Task 4 batch 7: 1 file(s)
Wrote partial rarity counts for batch 7 Processing Clash Royale Task 4 batch 8: 1 file(s)
Wrote partial rarity counts for batch 8 Processing Clash Royale Task 4 batch 9: 1 file(s)
Wrote partial rarity counts for batch 9
Combined rarity/count aggregate rows: 36
Rarity count vs win correlation, strongest absolute correlation first rare: correlation -0.008498, avg count 2.0598, win rate 50.00%, observations 75946774 epic: correlation 0.008474, avg count 2.2634, win rate 50.00%, observations 75946774 common: correlation -0.004201, avg count 2.0377, win rate 50.00%, observations 75946774 legendary: correlation 0.003590, avg count 1.6391, win rate 50.00%, observations 75946774 Common count win rates 0: win rate 49.69%, wins 4364117, games 8783445 1: win rate 50.20%, wins 9838091, games 19595919 2: win rate 50.46%, wins 10546771, games 20901584 3: win rate 49.95%, wins 8299707, games 16615318 4: win rate 48.63%, wins 3557320, games 7315818 5: win rate 50.22%, wins 1131604, games 2253514 6: win rate 50.02%, wins 202056, games 403934 7: win rate 49.39%, wins 29647, games 60026 8: win rate 23.66%, wins 4074, games 17216 Rare count win rates 0: win rate 50.14%, wins 3770799, games 7520151 1: win rate 50.38%, wins 10058467, games 19966130 2: win rate 50.41%, wins 11308236, games 22430635 3: win rate 49.38%, wins 7670131, games 15533119 4: win rate 49.59%, wins 3920614, games 7906260 5: win rate 47.82%, wins 1049326, games 2194467 6: win rate 49.48%, wins 179982, games 363765 7: win rate 49.52%, wins 14968, games 30228 8: win rate 42.79%, wins 864, games 2019 Epic count win rates 0: win rate 48.37%, wins 2973996, games 6148803 1: win rate 49.59%, wins 9077034, games 18303394 2: win rate 50.34%, wins 10873635, games 21600199 3: win rate 50.37%, wins 8064627, games 16011484 4: win rate 50.32%, wins 4381865, games 8708699 5: win rate 50.08%, wins 1865407, games 3724848 6: win rate 50.89%, wins 642689, games 1262868 7: win rate 50.53%, wins 83501, games 165246 8: win rate 50.08%, wins 10633, games 21233 Legendary count win rates 0: win rate 48.59%, wins 6635081, games 13655748 1: win rate 50.46%, wins 12560681, games 24894109 2: win rate 50.48%, wins 10861760, games 21516648 3: win rate 50.07%, wins 4891953, games 9770413 4: win rate 49.87%, wins 2081498, games 4174210 5: win rate 48.95%, wins 661477, games 1351454 6: win rate 48.53%, wins 185665, games 382557 7: win rate 47.86%, wins 57401, games 119925 8: win rate 46.35%, wins 37871, games 81710 Total Clash Royale Task 4 Spark runtime: 64.47 seconds
Interpretation¶
There is no strong evidence that having more cards of a rarity leads to more wins.
Having a lot of legendaries doesn't seem like a good idea. Though from the data, I think having more epics than anything else might be a good idea. So having some epics and rares with 1-2 commons and legendaries might be a good starting point for a deck.
Task 5: Card Category Count vs Wins¶
In this task, I want to see if having a particular category of cards is correlated with wins.
There are different categories:
- Troops
- Structures
- Spells
from pyspark import StorageLevel
from pyspark.sql import functions as F
import time
ROYALE_BATTLES_GLOB = "hdfs:///royale/*/*.csv"
ROYALE_TASK5_PARTIAL_OUT = "hdfs:///sparky/royale_task5_category_partials"
ROYALE_TASK5_SUMMARY_OUT = "hdfs:///sparky/royale_task5_category_summary"
ROYALE_TASK5_BATCH_SIZE = 1
spark.conf.set("spark.sql.maxPlanStringLength", "4096")
previous_log_level = sc.getLogLevel() if hasattr(sc, "getLogLevel") else "WARN"
sc.setLogLevel("ERROR")
royale_task5_runtime_start = time.perf_counter()
category_names = ["troop", "structure", "spell"]
winner_category_cols = [f"winner.{category}.count" for category in category_names]
loser_category_cols = [f"loser.{category}.count" for category in category_names]
def clear_hdfs_path_task5(path):
hadoop_path = spark._jvm.org.apache.hadoop.fs.Path(path)
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
if fs.exists(hadoop_path):
fs.delete(hadoop_path, True)
print(f"Deleted old output: {path}")
def hdfs_glob_paths_task5(glob_path):
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
statuses = fs.globStatus(spark._jvm.org.apache.hadoop.fs.Path(glob_path))
if statuses is None:
return []
return sorted(str(status.getPath()) for status in statuses)
def category_observation_struct(column_name, category, won):
return F.struct(
F.lit(category).alias("category"),
F.col(f"`{column_name}`").cast("int").alias("category_count"),
F.lit(won).cast("long").alias("wins"),
F.lit(1).cast("long").alias("games"),
)
def build_task5_category_counts(df):
winner_observations = [
category_observation_struct(f"winner.{category}.count", category, 1)
for category in category_names
]
loser_observations = [
category_observation_struct(f"loser.{category}.count", category, 0)
for category in category_names
]
return (
df
.select(F.explode(F.array(*(winner_observations + loser_observations))).alias("obs"))
.select("obs.category", "obs.category_count", "obs.wins", "obs.games")
.where(F.col("category_count").isNotNull())
.groupBy("category", "category_count")
.agg(
F.sum("wins").alias("wins"),
F.sum("games").alias("games"),
)
)
battle_paths = hdfs_glob_paths_task5(ROYALE_BATTLES_GLOB)
if len(battle_paths) == 0:
raise ValueError(f"No battle CSV files matched: {ROYALE_BATTLES_GLOB}")
print(f"Found {len(battle_paths)} battle CSV files")
clear_hdfs_path_task5(ROYALE_TASK5_PARTIAL_OUT)
clear_hdfs_path_task5(ROYALE_TASK5_SUMMARY_OUT)
expected_task5_cols = winner_category_cols + loser_category_cols
for batch_id, start in enumerate(range(0, len(battle_paths), ROYALE_TASK5_BATCH_SIZE)):
batch_paths = battle_paths[start:start + ROYALE_TASK5_BATCH_SIZE]
print(f"Processing Clash Royale Task 5 batch {batch_id}: {len(batch_paths)} file(s)")
batch_df = (
spark.read
.option("header", "true")
.option("multiLine", "false")
.csv(batch_paths)
)
missing_cols = [col for col in expected_task5_cols if col not in batch_df.columns]
if missing_cols:
raise ValueError(f"Missing expected battle columns in batch {batch_id}: {missing_cols}")
partial_category_counts = build_task5_category_counts(batch_df)
(
partial_category_counts
.write
.mode("overwrite")
.parquet(f"{ROYALE_TASK5_PARTIAL_OUT}/batch_{batch_id:04d}")
)
print(f"Wrote partial category counts for batch {batch_id}")
combined_category_counts = (
spark.read
.parquet(f"{ROYALE_TASK5_PARTIAL_OUT}/batch_*")
.groupBy("category", "category_count")
.agg(
F.sum("wins").alias("wins"),
F.sum("games").alias("games"),
)
.withColumn("win_rate", F.col("wins") / F.col("games"))
)
(
combined_category_counts
.write
.mode("overwrite")
.parquet(f"{ROYALE_TASK5_SUMMARY_OUT}/win_rates_by_count")
)
category_counts = spark.read.parquet(f"{ROYALE_TASK5_SUMMARY_OUT}/win_rates_by_count").persist(StorageLevel.DISK_ONLY)
category_count_rows = category_counts.count()
print(f"Combined category/count aggregate rows: {category_count_rows}")
correlation_summary = (
category_counts
.withColumn("x", F.col("category_count").cast("double"))
.withColumn("n", F.col("games").cast("double"))
.withColumn("sum_x", F.col("x") * F.col("n"))
.withColumn("sum_y", F.col("wins").cast("double"))
.withColumn("sum_xy", F.col("x") * F.col("wins").cast("double"))
.withColumn("sum_x2", F.col("x") * F.col("x") * F.col("n"))
.withColumn("sum_y2", F.col("wins").cast("double"))
.groupBy("category")
.agg(
F.sum("n").alias("observations"),
F.sum("sum_x").alias("sum_x"),
F.sum("sum_y").alias("wins"),
F.sum("sum_xy").alias("sum_xy"),
F.sum("sum_x2").alias("sum_x2"),
F.sum("sum_y2").alias("sum_y2"),
)
.withColumn("avg_category_count", F.col("sum_x") / F.col("observations"))
.withColumn("win_rate", F.col("wins") / F.col("observations"))
.withColumn(
"pearson_correlation",
(
(F.col("observations") * F.col("sum_xy")) - (F.col("sum_x") * F.col("wins"))
) / F.sqrt(
((F.col("observations") * F.col("sum_x2")) - (F.col("sum_x") * F.col("sum_x")))
* ((F.col("observations") * F.col("sum_y2")) - (F.col("wins") * F.col("wins")))
)
)
.select(
"category",
F.col("observations").cast("long").alias("observations"),
F.round("avg_category_count", 4).alias("avg_category_count"),
F.round("win_rate", 6).alias("win_rate"),
F.round("pearson_correlation", 6).alias("pearson_correlation"),
F.round(F.abs("pearson_correlation"), 6).alias("abs_correlation"),
)
)
(
correlation_summary
.write
.mode("overwrite")
.parquet(f"{ROYALE_TASK5_SUMMARY_OUT}/correlations")
)
correlation_rows = (
spark.read
.parquet(f"{ROYALE_TASK5_SUMMARY_OUT}/correlations")
.orderBy(F.desc("abs_correlation"), "category")
.collect()
)
print("Category count vs win correlation, strongest absolute correlation first")
for row in correlation_rows:
print(
f"{row['category']}: correlation {row['pearson_correlation']:.6f}, "
f"avg count {row['avg_category_count']:.4f}, "
f"win rate {row['win_rate']:.2%}, "
f"observations {row['observations']}"
)
print()
for category in category_names:
rows = (
category_counts
.where(F.col("category") == category)
.orderBy("category_count")
.collect()
)
print(f"{category.title()} count win rates")
for row in rows:
print(
f" {row['category_count']}: "
f"win rate {row['win_rate']:.2%}, "
f"wins {row['wins']}, "
f"games {row['games']}"
)
print()
category_counts.unpersist(blocking=False)
royale_task5_runtime = time.perf_counter() - royale_task5_runtime_start
print(f"Total Clash Royale Task 5 Spark runtime: {royale_task5_runtime:.2f} seconds")
sc.setLogLevel(previous_log_level)
Found 10 battle CSV files Deleted old output: hdfs:///sparky/royale_task5_category_partials Deleted old output: hdfs:///sparky/royale_task5_category_summary Processing Clash Royale Task 5 batch 0: 1 file(s)
Wrote partial category counts for batch 0 Processing Clash Royale Task 5 batch 1: 1 file(s)
Wrote partial category counts for batch 1 Processing Clash Royale Task 5 batch 2: 1 file(s)
Wrote partial category counts for batch 2 Processing Clash Royale Task 5 batch 3: 1 file(s)
Wrote partial category counts for batch 3 Processing Clash Royale Task 5 batch 4: 1 file(s)
Wrote partial category counts for batch 4 Processing Clash Royale Task 5 batch 5: 1 file(s)
Wrote partial category counts for batch 5 Processing Clash Royale Task 5 batch 6: 1 file(s)
Wrote partial category counts for batch 6 Processing Clash Royale Task 5 batch 7: 1 file(s)
Wrote partial category counts for batch 7 Processing Clash Royale Task 5 batch 8: 1 file(s)
Wrote partial category counts for batch 8 Processing Clash Royale Task 5 batch 9: 1 file(s)
Wrote partial category counts for batch 9
Combined category/count aggregate rows: 27 Category count vs win correlation, strongest absolute correlation first structure: correlation -0.010709, avg count 0.4233, win rate 50.00%, observations 75946774 spell: correlation 0.006955, avg count 1.8873, win rate 50.00%, observations 75946774 troop: correlation 0.000136, avg count 5.6894, win rate 50.00%, observations 75946774 Troop count win rates 0: win rate 6.99%, wins 2372, games 33939 1: win rate 23.39%, wins 4848, games 20725 2: win rate 43.49%, wins 41412, games 95212 3: win rate 49.69%, wins 811090, games 1632248 4: win rate 50.23%, wins 4311608, games 8584163 5: win rate 50.21%, wins 10677092, games 21263647 6: win rate 50.01%, wins 13480285, games 26954968 7: win rate 49.95%, wins 7141065, games 14297157 8: win rate 49.06%, wins 1503615, games 3064715 Structure count win rates 0: win rate 50.14%, wins 23703609, games 47274637 1: win rate 50.22%, wins 12842223, games 25571620 2: win rate 46.21%, wins 1315216, games 2846166 3: win rate 48.26%, wins 96567, games 200115 4: win rate 37.66%, wins 12020, games 31918 5: win rate 37.32%, wins 2121, games 5683 6: win rate 31.93%, wins 718, games 2249 7: win rate 25.56%, wins 340, games 1330 8: win rate 4.39%, wins 573, games 13056 Spell count win rates 0: win rate 46.91%, wins 2015971, games 4297746 1: win rate 49.87%, wins 9757285, games 19565577 2: win rate 50.33%, wins 17476103, games 34723280 3: win rate 50.56%, wins 7741046, games 15311555 4: win rate 49.07%, wins 947716, games 1931356 5: win rate 40.26%, wins 32517, games 80771 6: win rate 16.70%, wins 1835, games 10989 7: win rate 5.43%, wins 536, games 9866 8: win rate 2.42%, wins 378, games 15634 Total Clash Royale Task 5 Spark runtime: 57.97 seconds
Interpretation¶
According to Pearson Correlation, there is no significant effect of different categories' participation in a deck.
Structure count had the strongest relationship, but even that was negative and minimal.
Based on the count vs win rates data:
- Decks with a lot of spells or structures perform very poorly. However, looking at the number of games played, it might be that they were challenge games played by youtubers. (I PLAYED ALL SPELLS AND WON _)
- Reflecting the previous point, decks with no troops perform extremely poorly.
Conclusion¶
The best 2 decks we saw were:
- Elo 859.35, games 115: P.E.K.K.A, Valkyrie, Mega Minion, Goblin Hut, Furnace, Fireball, Arrows, Zap
- Elo 859.16, games 721: Goblins, Valkyrie, Musketeer, Hog Rider, Inferno Tower, Fireball, Arrows, Zap
To make my point clearer, I name: deck1 -> Larry deck2 -> Barry
Now, here are the stats for Larry:
| #Common | #Rare | #Epic | #Legendary | #Structures | #Troops | #Spells |
|---|---|---|---|---|---|---|
| 2 | 5 | 1 | 0 | 2 | 3 | 3 |
Now Barry:
| #Common | #Rare | #Epic | #Legendary | #Structures | #Troops | #Spells |
|---|---|---|---|---|---|---|
| 3 | 5 | 0 | 0 | 1 | 4 | 3 |
Interestingly, Barry doesn't have any epic cards which only has an overall winrate of $48.37\%$ It doesn't seem too bad, but there are other combinations for epic cards that touch winrates of $50.89\%$ which is about $2.5\%$ better.
What I would do differently¶
If the goal wasn't to explore spark's distributed analysis capabilities, I would have employed a traditional Elo system instead of an aggregated one. The problem with traditional method is that it has to be chronological, which isn't very spark friendly.