Sparky¶

In [1]:
import importlib
import time
import util
import SparkHandler as sh

importlib.reload(util)
importlib.reload(sh)
Out[1]:
<module 'SparkHandler' from '/bigdata/students/rsbhatia2/p3/SparkHandler.py'>
In [2]:
sparky = sh.Sparky(spark, sc)

Strangely Snowy¶

Find a location that contains snow while its surroundings do not. Why does this occur? Is it a high mountain peak in a desert?

Setup¶

  • I am using only months that are considered "winter months" in North America. They are November, December, January, and February
  • Partial results are stored back in the hdfs.
In [3]:
from pyspark.sql import functions as F
import time
# ------------------------------------------------------------
# Parser
# ------------------------------------------------------------
from pyspark.sql.types import StructType, StructField, LongType, DoubleType
def parse_line(line):
    try:
        if line.startswith("1_time"):
            return None

        parts = line.rstrip("\t").split("\t")

        lat = util.safe_float(parts, lat_idx)
        lon = util.safe_float(parts, lon_idx)
        snow = util.safe_float(parts, snow_idx)
        temp = util.safe_float(parts, temp_idx)

        if lat is None or lon is None or snow is None or temp is None:
            return None

        albedo = util.safe_float(parts, albedo_idx)
        vegetation = util.safe_float(parts, veg_idx)
        precip = util.safe_float(parts, precip_idx)

        return (
            lat,
            lon,
            snow,
            temp,
            albedo,
            vegetation,
            precip,
        )

    except Exception:
        return None
schema = StructType([
    StructField("lat", DoubleType(), False),
    StructField("lon", DoubleType(), False),
    StructField("snow", DoubleType(), False),
    StructField("temp", DoubleType(), False),
    StructField("albedo", DoubleType(), True),
    StructField("vegetation", DoubleType(), True),
    StructField("precip", DoubleType(), True),
])

def snow_partial_transform(points):
    return (
        points
        .withColumn("lat_i", F.floor(F.col("lat") * 10).cast("int"))
        .withColumn("lon_i", F.floor(F.col("lon") * 10).cast("int"))
        .groupBy("lat_i", "lon_i")
        .agg(
            F.sum("lat").alias("sum_lat"),
            F.sum("lon").alias("sum_lon"),
            F.max("snow").alias("max_snow"),
            F.sum("snow").alias("sum_snow"),
            F.sum("temp").alias("sum_temp"),

            F.sum("albedo").alias("sum_albedo"),
            F.count("albedo").alias("count_albedo"),

            F.sum("vegetation").alias("sum_vegetation"),
            F.count("vegetation").alias("count_vegetation"),

            F.sum("precip").alias("sum_precip"),
            F.count("precip").alias("count_precip"),

            F.count("*").alias("count_points"),
        )
    )
In [4]:
snow_ctx = sh.SparkyBatchContext(
    glob_path="hdfs:///3hr/*/*_20*{11,12,01,02}*_*",
    partial_out="hdfs:///sparky/strangely_snowy_partials",
    parse_line=parse_line,
    schema=schema,
    partial_transform=snow_partial_transform,
    batch_size=50)
In [5]:
strangely_snowy_runtime_start = time.perf_counter()
input_paths = sparky.setup_job(snow_ctx)
Deleted old partial output: hdfs:/sparky/strangely_snowy_partials
Found 4036 files
                                                                                

Feature Engineering¶

For this question, I am using the following features:

  • time is the timestamp. It has datetime info for the sensor
  • lat is necessary for creating regional buckets and doing neighbor comparisons.
  • lon is necessary for creating regional buckets and doing neighbor comparisons.
  • snow_depth_surface is needed cause it's snowy.
  • temperature_surface a place with lower temperature is more likely to have snow.
  • albedo_surface is the reflective index of the surface of a location. A higher albedo value means that it is cooler.
  • vegetation_surface allows me to determine if the location identified is a desert or not. Lower vegetation is a desert.
  • total_precipitation_surface_3_hour_accumulation gives us an idea of whether the anomaly is because of a snow storm of some kind.
In [6]:
time_idx = sparky.required_col("time")
lat_idx = sparky.required_col("lat")
lon_idx = sparky.required_col("lon")
snow_idx = sparky.required_col("snow_depth_surface")
temp_idx = sparky.required_col("temperature_surface")
albedo_idx = sparky.required_col("albedo_surface")
veg_idx = sparky.required_col("vegetation_surface")
precip_idx = sparky.required_col("total_precipitation_surface_3_hour_accumulation")

print("Using columns:")
print("time:", time_idx)
print("lat:", lat_idx)
print("lon:", lon_idx)
print("snow:", snow_idx)
print("temp:", temp_idx)
print("albedo:", albedo_idx)
print("vegetation:", veg_idx)
print("precip:", precip_idx)
Using columns:
time: 0
lat: 1
lon: 2
snow: 9
temp: 10
albedo: 3
vegetation: 14
precip: 13
In [7]:
for batch_id, start in enumerate(range(0, len(input_paths), snow_ctx.batch_size)):
    batch_paths = input_paths[start:start + snow_ctx.batch_size]
    sparky.process_batch(snow_ctx, batch_paths, batch_id)
Processing batch 0: 50 files
                                                                                
Processing batch 1: 50 files
                                                                                
Processing batch 2: 50 files
                                                                                
Processing batch 3: 50 files
                                                                                
Processing batch 4: 50 files
                                                                                
Processing batch 5: 50 files
                                                                                
Processing batch 6: 50 files
                                                                                
Processing batch 7: 50 files
                                                                                
Processing batch 8: 50 files
                                                                                
Processing batch 9: 50 files
                                                                                
Processing batch 10: 50 files
                                                                                
Processing batch 11: 50 files
                                                                                
Processing batch 12: 50 files
                                                                                
Processing batch 13: 50 files
                                                                                
Processing batch 14: 50 files
                                                                                
Processing batch 15: 50 files
                                                                                
Processing batch 16: 50 files
                                                                                
Processing batch 17: 50 files
                                                                                
Processing batch 18: 50 files
                                                                                
Processing batch 19: 50 files
                                                                                
Processing batch 20: 50 files
                                                                                
Processing batch 21: 50 files
                                                                                
Processing batch 22: 50 files
                                                                                
Processing batch 23: 50 files
                                                                                
Processing batch 24: 50 files
                                                                                
Processing batch 25: 50 files
                                                                                
Processing batch 26: 50 files
                                                                                
Processing batch 27: 50 files
                                                                                
Processing batch 28: 50 files
                                                                                
Processing batch 29: 50 files
                                                                                
Processing batch 30: 50 files
                                                                                
Processing batch 31: 50 files
                                                                                
Processing batch 32: 50 files
                                                                                
Processing batch 33: 50 files
                                                                                
Processing batch 34: 50 files
                                                                                
Processing batch 35: 50 files
                                                                                
Processing batch 36: 50 files
                                                                                
Processing batch 37: 50 files
                                                                                
Processing batch 38: 50 files
                                                                                
Processing batch 39: 50 files
                                                                                
Processing batch 40: 50 files
                                                                                
Processing batch 41: 50 files
                                                                                
Processing batch 42: 50 files
                                                                                
Processing batch 43: 50 files
                                                                                
Processing batch 44: 50 files
                                                                                
Processing batch 45: 50 files
                                                                                
Processing batch 46: 50 files
                                                                                
Processing batch 47: 50 files
                                                                                
Processing batch 48: 50 files
                                                                                
Processing batch 49: 50 files
                                                                                
Processing batch 50: 50 files
                                                                                
Processing batch 51: 50 files
                                                                                
Processing batch 52: 50 files
                                                                                
Processing batch 53: 50 files
                                                                                
Processing batch 54: 50 files
                                                                                
Processing batch 55: 50 files
                                                                                
Processing batch 56: 50 files
                                                                                
Processing batch 57: 50 files
                                                                                
Processing batch 58: 50 files
                                                                                
Processing batch 59: 50 files
                                                                                
Processing batch 60: 50 files
                                                                                
Processing batch 61: 50 files
                                                                                
Processing batch 62: 50 files
                                                                                
Processing batch 63: 50 files
                                                                                
Processing batch 64: 50 files
                                                                                
Processing batch 65: 50 files
                                                                                
Processing batch 66: 50 files
                                                                                
Processing batch 67: 50 files
                                                                                
Processing batch 68: 50 files
                                                                                
Processing batch 69: 50 files
                                                                                
Processing batch 70: 50 files
                                                                                
Processing batch 71: 50 files
                                                                                
Processing batch 72: 50 files
                                                                                
Processing batch 73: 50 files
                                                                                
Processing batch 74: 50 files
                                                                                
Processing batch 75: 50 files
                                                                                
Processing batch 76: 50 files
                                                                                
Processing batch 77: 50 files
                                                                                
Processing batch 78: 50 files
                                                                                
Processing batch 79: 50 files
                                                                                
Processing batch 80: 36 files
                                                                                
In [8]:
from pyspark.sql import functions as F


def combine_snow_partials(partials):
    return (
        partials
        .groupBy("lat_i", "lon_i")
        .agg(
            (F.sum("sum_lat") / F.sum("count_points")).alias("lat"),
            (F.sum("sum_lon") / F.sum("count_points")).alias("lon"),

            F.max("max_snow").alias("cell_max_snow"),
            (F.sum("sum_snow") / F.sum("count_points")).alias("cell_avg_snow"),
            (F.sum("sum_temp") / F.sum("count_points")).alias("cell_avg_temp"),

            (F.sum("sum_albedo") / F.sum("count_albedo")).alias("cell_avg_albedo"),
            (F.sum("sum_vegetation") / F.sum("count_vegetation")).alias("cell_avg_vegetation"),
            (F.sum("sum_precip") / F.sum("count_precip")).alias("cell_avg_precip"),

            F.sum("count_points").alias("cell_points"),
        )
    )


def find_isolated_snow_cells(cell_stats):
    regional_stats = (
        cell_stats
        .withColumn("region_lat", F.floor(F.col("lat_i") / 5))
        .withColumn("region_lon", F.floor(F.col("lon_i") / 5))
        .groupBy("region_lat", "region_lon")
        .agg(
            F.avg("cell_avg_snow").alias("regional_avg_snow"),
            F.max("cell_max_snow").alias("regional_max_snow"),
            F.avg("cell_avg_temp").alias("regional_avg_temp"),
            F.avg("cell_avg_albedo").alias("regional_avg_albedo"),
            F.avg("cell_avg_vegetation").alias("regional_avg_vegetation"),
            F.avg("cell_avg_precip").alias("regional_avg_precip"),
            F.count("*").alias("regional_cell_count"),
        )
    )

    return (
        cell_stats
        .withColumn("region_lat", F.floor(F.col("lat_i") / 5))
        .withColumn("region_lon", F.floor(F.col("lon_i") / 5))
        .join(regional_stats, ["region_lat", "region_lon"], "inner")
        .withColumn("snow_isolation_score", F.col("cell_max_snow") - F.col("regional_avg_snow"))
        .withColumn("temp_colder_than_region", F.col("regional_avg_temp") - F.col("cell_avg_temp"))
        .withColumn("albedo_higher_than_region", F.col("cell_avg_albedo") - F.col("regional_avg_albedo"))
        .withColumn("vegetation_lower_than_region", F.col("regional_avg_vegetation") - F.col("cell_avg_vegetation"))
        .filter(F.col("cell_max_snow") >= 0.5)
        .filter(F.col("regional_avg_snow") <= 0.05)
        .filter(F.col("regional_cell_count") >= 4)
        .orderBy(F.desc("snow_isolation_score"), F.desc("cell_max_snow"))
        .select(
            "lat",
            "lon",
            "cell_max_snow",
            "cell_avg_snow",
            "regional_avg_snow",
            "regional_max_snow",
            "snow_isolation_score",
            "cell_avg_temp",
            "regional_avg_temp",
            "temp_colder_than_region",
            "cell_avg_albedo",
            "regional_avg_albedo",
            "albedo_higher_than_region",
            "cell_avg_vegetation",
            "regional_avg_vegetation",
            "vegetation_lower_than_region",
            "cell_points",
            "regional_cell_count",
        )
        .limit(3)
    )
In [9]:
snow_analysis_ctx = sh.SparkyAnalysisContext(
    partial_in=snow_ctx.partial_out,
    combine_partials=combine_snow_partials,
    analyze=find_isolated_snow_cells,
    result_out=None,
)
In [10]:
cell_stats = sparky.combine(snow_analysis_ctx)
top3 = sparky.analyze(snow_analysis_ctx, cell_stats)

top3.show(3, truncate=False)

strangely_snowy_runtime = time.perf_counter() - strangely_snowy_runtime_start
print(f"Total Strangely Snowy Spark runtime: {strangely_snowy_runtime:.2f} seconds")
                                                                                
+------------------+-------------------+-------------+-------------------+--------------------+-----------------+--------------------+-----------------+------------------+-----------------------+------------------+-------------------+-------------------------+-------------------+-----------------------+----------------------------+-----------+-------------------+
|lat               |lon                |cell_max_snow|cell_avg_snow      |regional_avg_snow   |regional_max_snow|snow_isolation_score|cell_avg_temp    |regional_avg_temp |temp_colder_than_region|cell_avg_albedo   |regional_avg_albedo|albedo_higher_than_region|cell_avg_vegetation|regional_avg_vegetation|vegetation_lower_than_region|cell_points|regional_cell_count|
+------------------+-------------------+-------------+-------------------+--------------------+-----------------+--------------------+-----------------+------------------+-----------------------+------------------+-------------------+-------------------------+-------------------+-----------------------+----------------------------+-----------+-------------------+
|36.48025474536799 |-118.56796062211633|3.5512       |0.2250536953106203 |0.04022571125487863 |3.5512           |3.5109742887451216  |273.6274536550868|282.88273287191134|9.255279216824533      |34.27946140680069 |17.87228976387486  |16.40717164292583        |42.21369727245658  |44.79528352141172      |2.5815862489551407          |4030       |20                 |
|36.374458471843475|-118.54488139420283|2.5751998    |0.16376453928171214|0.04022571125487863 |3.5512           |2.5349740887451215  |275.4141993548388|282.88273287191134|7.468533517072558      |27.601104492429887|17.87228976387486  |9.728814728555026        |43.9483995057072   |44.79528352141172      |0.8468840157045179          |4030       |20                 |
|43.58407050794018 |-86.06345689016926 |2.2184       |0.0411021434923077 |0.033958424925835444|2.2184           |2.1844415750741644  |279.4982177245658|279.81490099386303|0.31668326929724344    |26.151638124596676|23.022939336922576 |3.1286987876741          |33.55467741985112  |27.951770737498375     |-5.602906682352742          |4030       |22                 |
+------------------+-------------------+-------------+-------------------+--------------------+-----------------+--------------------+-----------------+------------------+-----------------------+------------------+-------------------+-------------------------+-------------------+-----------------------+----------------------------+-----------+-------------------+

Total Strangely Snowy Spark runtime: 2527.01 seconds

Result¶

The top place that this job found is located at 36.47, -118.56, which is Black Rock Pass Trail, CA This place has a max_snow value of $3.5512$ while the regional average is $0.0368$ .

This result makes sense because the average temperature at this location is $273.67K$ which is $0.52\degree C$ Interestingly, this is slightly more than water's freezing point of $0\degree C$ but given its high altitude, it makes sense that there is snow present at this temperature because of decrease in pressure at high altitude.
Black Rock Pass Trail is strangely snowy

Climate Chart¶

Given a Geohash prefix as an input, build a function that will create a climate chart for the region. This includes high, low, and average temperatures, as well as monthly average rainfall (precipitation).

Setup¶

For this question, I need to process all available files. The reason for this is that I have no prior information about which geohash is present in which file, and if I want to make the function to be independent of that, I need to parse all files so that the same function can be reused for multiple geohash prefixes. I am still doing batch processing like the previous question, and that helps not filling up the /tmp directory.

In [11]:
import sys
import importlib
import time
import matplotlib.pyplot as plt

from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, LongType, DoubleType

import util
import SparkHandler

importlib.reload(util)
importlib.reload(SparkHandler)

from util import safe_float, geohash_bbox
from SparkHandler import Sparky, SparkyBatchContext, SparkyAnalysisContext


# ------------------------------------------------------------
# Config
# ------------------------------------------------------------
GEOHASH_PREFIX = "9q8"
INPUT_GLOB = "hdfs:///3hr/*/*"
BATCH_SIZE = 50

PARTIAL_OUT = f"hdfs:///climate_chart_partials/{GEOHASH_PREFIX}"

lat_min, lat_max, lon_min, lon_max = geohash_bbox(GEOHASH_PREFIX)

print("Geohash:", GEOHASH_PREFIX)
print("lat range:", lat_min, lat_max)
print("lon range:", lon_min, lon_max)


# ------------------------------------------------------------
# Schema
# ------------------------------------------------------------
raw_schema = StructType([
    StructField("time_ms", LongType(), False),
    StructField("lat", DoubleType(), False),
    StructField("lon", DoubleType(), False),
    StructField("temp_k", DoubleType(), False),
    StructField("precip", DoubleType(), False),
])
Geohash: 9q8
lat range: 36.5625 37.96875
lon range: -123.75 -122.34375
In [12]:
# ------------------------------------------------------------
# Batch partial transform
# ------------------------------------------------------------
def climate_partial_transform(points):
    return (
        points
        .withColumn(
            "timestamp",
            F.from_unixtime((F.col("time_ms") / 1000).cast("long")).cast("timestamp")
        )
        .withColumn("year", F.year("timestamp"))
        .withColumn("month", F.month("timestamp"))
        .withColumn("lat_i", F.floor(F.col("lat") * 100).cast("int"))
        .withColumn("lon_i", F.floor(F.col("lon") * 100).cast("int"))
        .groupBy("year", "month", "lat_i", "lon_i")
        .agg(
            F.min("temp_k").alias("min_temp_k"),
            F.max("temp_k").alias("max_temp_k"),
            F.sum("temp_k").alias("sum_temp_k"),
            F.count("temp_k").alias("count_temp"),

            # NAM field is 3-hour accumulated precipitation.
            # Summing gives monthly accumulated precipitation for the cell.
            F.sum("precip").alias("sum_precip"),
            F.count("precip").alias("count_precip"),
        )
    )


# ------------------------------------------------------------
# Analysis: combine partials
# ------------------------------------------------------------
def combine_climate_partials(partials):
    return (
        partials
        .groupBy("year", "month", "lat_i", "lon_i")
        .agg(
            F.min("min_temp_k").alias("cell_month_min_temp_k"),
            F.max("max_temp_k").alias("cell_month_max_temp_k"),
            (F.sum("sum_temp_k") / F.sum("count_temp")).alias("cell_month_avg_temp_k"),
            F.sum("sum_precip").alias("cell_month_total_precip"),
        )
    )


# ------------------------------------------------------------
# Analysis: build climate chart table
# ------------------------------------------------------------
def analyze_climate(monthly_cell):
    return (
        monthly_cell
        .groupBy("month")
        .agg(
            F.avg("cell_month_min_temp_k").alias("low_temp_k"),
            F.avg("cell_month_max_temp_k").alias("high_temp_k"),
            F.avg("cell_month_avg_temp_k").alias("avg_temp_k"),
            F.avg("cell_month_total_precip").alias("avg_monthly_precip"),
            F.count("*").alias("month_cell_samples"),
        )
        .withColumn("low_temp_c", F.col("low_temp_k") - 273.15)
        .withColumn("high_temp_c", F.col("high_temp_k") - 273.15)
        .withColumn("avg_temp_c", F.col("avg_temp_k") - 273.15)
        .orderBy("month")
        .select(
            "month",
            "low_temp_c",
            "high_temp_c",
            "avg_temp_c",
            "avg_monthly_precip",
            "month_cell_samples",
        )
    )
In [13]:
sparky = Sparky(spark, sc)


setup_ctx = SparkyBatchContext(
    glob_path=INPUT_GLOB,
    partial_out=PARTIAL_OUT,
    schema=raw_schema,
    parse_line=lambda line: None, 
    partial_transform=climate_partial_transform,
    batch_size=BATCH_SIZE,
)

input_paths = sparky.setup_job(setup_ctx)

print("Total input files:", len(input_paths))
print("Batch size:", BATCH_SIZE)
print("Number of batches:", (len(input_paths) + BATCH_SIZE - 1) // BATCH_SIZE)

climate_chart_runtime_start = time.perf_counter()

if len(input_paths) == 0:
    raise ValueError(f"No files matched: {INPUT_GLOB}")


# ------------------------------------------------------------
# Resolve required columns
# ------------------------------------------------------------
time_idx = sparky.required_col("time")
lat_idx = sparky.required_col("lat")
lon_idx = sparky.required_col("lon")
temp_idx = sparky.required_col("temperature_surface")
precip_idx = sparky.required_col("total_precipitation_surface_3_hour_accumulation")

print("Using column indexes:")
print("time:", time_idx)
print("lat:", lat_idx)
print("lon:", lon_idx)
print("temp:", temp_idx)
print("precip:", precip_idx)
Deleted old partial output: hdfs:/climate_chart_partials/9q8
26/05/21 23:12:24 WARN SparkContext: The path /bigdata/students/rsbhatia2/p3/util.py has been added already. Overwriting of added paths is not supported in the current version.
26/05/21 23:12:24 WARN SparkContext: The path /bigdata/students/rsbhatia2/p3/SparkHandler.py has been added already. Overwriting of added paths is not supported in the current version.
Found 6770 files
[Stage 258:>                                                        (0 + 1) / 1]
Total input files: 6770
Batch size: 50
Number of batches: 136
Using column indexes:
time: 0
lat: 1
lon: 2
temp: 10
precip: 13
                                                                                
In [14]:
# ------------------------------------------------------------
# Parser
# ------------------------------------------------------------
def make_climate_parser(
    time_idx,
    lat_idx,
    lon_idx,
    temp_idx,
    precip_idx,
    lat_min,
    lat_max,
    lon_min,
    lon_max,
):
    def parse_climate_line(line):
        try:
            if line.startswith("1_time"):
                return None

            parts = line.rstrip("\t").split("\t")

            time_ms = safe_float(parts, time_idx)
            lat = safe_float(parts, lat_idx)
            lon = safe_float(parts, lon_idx)
            temp_k = safe_float(parts, temp_idx)
            precip = safe_float(parts, precip_idx)

            if (
                time_ms is None
                or lat is None
                or lon is None
                or temp_k is None
                or precip is None
            ):
                return None

            # Filter to geohash region before DataFrame work.
            if not (lat_min <= lat <= lat_max and lon_min <= lon <= lon_max):
                return None

            return (
                int(time_ms),
                lat,
                lon,
                temp_k,
                precip,
            )

        except Exception:
            return None

    return parse_climate_line


parse_climate_line = make_climate_parser(
    time_idx=time_idx,
    lat_idx=lat_idx,
    lon_idx=lon_idx,
    temp_idx=temp_idx,
    precip_idx=precip_idx,
    lat_min=lat_min,
    lat_max=lat_max,
    lon_min=lon_min,
    lon_max=lon_max,
)


# ------------------------------------------------------------
# batch context
# ------------------------------------------------------------
batch_ctx = SparkyBatchContext(
    glob_path=INPUT_GLOB,
    partial_out=PARTIAL_OUT,
    schema=raw_schema,
    parse_line=parse_climate_line,
    partial_transform=climate_partial_transform,
    batch_size=BATCH_SIZE,
)


# ------------------------------------------------------------
# Analysis context
# ------------------------------------------------------------
analysis_ctx = SparkyAnalysisContext(
    partial_in=PARTIAL_OUT,
    combine_partials=combine_climate_partials,
    analyze=analyze_climate,
    result_out=None,
)


# ------------------------------------------------------------
# Run batches
# ------------------------------------------------------------
for batch_id, start in enumerate(range(0, len(input_paths), batch_ctx.batch_size)):
    batch_paths = input_paths[start:start + batch_ctx.batch_size]
    sparky.process_batch(batch_ctx, batch_paths, batch_id)
Processing batch 0: 50 files
                                                                                
Processing batch 1: 50 files
                                                                                
Processing batch 2: 50 files
                                                                                
Processing batch 3: 50 files
                                                                                
Processing batch 4: 50 files
                                                                                
Processing batch 5: 50 files
                                                                                
Processing batch 6: 50 files
                                                                                
Processing batch 7: 50 files
                                                                                
Processing batch 8: 50 files
                                                                                
Processing batch 9: 50 files
                                                                                
Processing batch 10: 50 files
                                                                                
Processing batch 11: 50 files
                                                                                
Processing batch 12: 50 files
                                                                                
Processing batch 13: 50 files
                                                                                
Processing batch 14: 50 files
                                                                                
Processing batch 15: 50 files
                                                                                
Processing batch 16: 50 files
                                                                                
Processing batch 17: 50 files
                                                                                
Processing batch 18: 50 files
                                                                                
Processing batch 19: 50 files
                                                                                
Processing batch 20: 50 files
                                                                                
Processing batch 21: 50 files
                                                                                
Processing batch 22: 50 files
                                                                                
Processing batch 23: 50 files
                                                                                
Processing batch 24: 50 files
                                                                                
Processing batch 25: 50 files
                                                                                
Processing batch 26: 50 files
                                                                                
Processing batch 27: 50 files
                                                                                
Processing batch 28: 50 files
                                                                                
Processing batch 29: 50 files
                                                                                
Processing batch 30: 50 files
                                                                                
Processing batch 31: 50 files
                                                                                
Processing batch 32: 50 files
                                                                                
Processing batch 33: 50 files
                                                                                
Processing batch 34: 50 files
                                                                                
Processing batch 35: 50 files
                                                                                
Processing batch 36: 50 files
                                                                                
Processing batch 37: 50 files
                                                                                
Processing batch 38: 50 files
                                                                                
Processing batch 39: 50 files
                                                                                
Processing batch 40: 50 files
                                                                                
Processing batch 41: 50 files
                                                                                
Processing batch 42: 50 files
                                                                                
Processing batch 43: 50 files
                                                                                
Processing batch 44: 50 files
                                                                                
Processing batch 45: 50 files
                                                                                
Processing batch 46: 50 files
                                                                                
Processing batch 47: 50 files
                                                                                
Processing batch 48: 50 files
                                                                                
Processing batch 49: 50 files
                                                                                
Processing batch 50: 50 files
                                                                                
Processing batch 51: 50 files
                                                                                
Processing batch 52: 50 files
                                                                                
Processing batch 53: 50 files
                                                                                
Processing batch 54: 50 files
                                                                                
Processing batch 55: 50 files
                                                                                
Processing batch 56: 50 files
                                                                                
Processing batch 57: 50 files
                                                                                
Processing batch 58: 50 files
                                                                                
Processing batch 59: 50 files
                                                                                
Processing batch 60: 50 files
                                                                                
Processing batch 61: 50 files
                                                                                
Processing batch 62: 50 files
                                                                                
Processing batch 63: 50 files
                                                                                
Processing batch 64: 50 files
                                                                                
Processing batch 65: 50 files
                                                                                
Processing batch 66: 50 files
                                                                                
Processing batch 67: 50 files
                                                                                
Processing batch 68: 50 files
                                                                                
Processing batch 69: 50 files
                                                                                
Processing batch 70: 50 files
                                                                                
Processing batch 71: 50 files
                                                                                
Processing batch 72: 50 files
                                                                                
Processing batch 73: 50 files
                                                                                
Processing batch 74: 50 files
                                                                                
Processing batch 75: 50 files
                                                                                
Processing batch 76: 50 files
                                                                                
Processing batch 77: 50 files
                                                                                
Processing batch 78: 50 files
                                                                                
Processing batch 79: 50 files
                                                                                
Processing batch 80: 50 files
                                                                                
Processing batch 81: 50 files
                                                                                
Processing batch 82: 50 files
                                                                                
Processing batch 83: 50 files
                                                                                
Processing batch 84: 50 files
                                                                                
Processing batch 85: 50 files
                                                                                
Processing batch 86: 50 files
                                                                                
Processing batch 87: 50 files
                                                                                
Processing batch 88: 50 files
                                                                                
Processing batch 89: 50 files
                                                                                
Processing batch 90: 50 files
                                                                                
Processing batch 91: 50 files
                                                                                
Processing batch 92: 50 files
                                                                                
Processing batch 93: 50 files
                                                                                
Processing batch 94: 50 files
                                                                                
Processing batch 95: 50 files
                                                                                
Processing batch 96: 50 files
                                                                                
Processing batch 97: 50 files
                                                                                
Processing batch 98: 50 files
                                                                                
Processing batch 99: 50 files
                                                                                
Processing batch 100: 50 files
                                                                                
Processing batch 101: 50 files
                                                                                
Processing batch 102: 50 files
                                                                                
Processing batch 103: 50 files
                                                                                
Processing batch 104: 50 files
                                                                                
Processing batch 105: 50 files
                                                                                
Processing batch 106: 50 files
                                                                                
Processing batch 107: 50 files
                                                                                
Processing batch 108: 50 files
                                                                                
Processing batch 109: 50 files
                                                                                
Processing batch 110: 50 files
                                                                                
Processing batch 111: 50 files
                                                                                
Processing batch 112: 50 files
                                                                                
Processing batch 113: 50 files
                                                                                
Processing batch 114: 50 files
                                                                                
Processing batch 115: 50 files
                                                                                
Processing batch 116: 50 files
                                                                                
Processing batch 117: 50 files
                                                                                
Processing batch 118: 50 files
                                                                                
Processing batch 119: 50 files
                                                                                
Processing batch 120: 50 files
                                                                                
Processing batch 121: 50 files
                                                                                
Processing batch 122: 50 files
                                                                                
Processing batch 123: 50 files
                                                                                
Processing batch 124: 50 files
                                                                                
Processing batch 125: 50 files
                                                                                
Processing batch 126: 50 files
                                                                                
Processing batch 127: 50 files
                                                                                
Processing batch 128: 50 files
                                                                                
Processing batch 129: 50 files
                                                                                
Processing batch 130: 50 files
                                                                                
Processing batch 131: 50 files
                                                                                
Processing batch 132: 50 files
                                                                                
Processing batch 133: 50 files
                                                                                
Processing batch 134: 50 files
                                                                                
Processing batch 135: 20 files
                                                                                
In [15]:
# ------------------------------------------------------------
# Combine + analyze
# ------------------------------------------------------------
monthly_cell = sparky.combine(analysis_ctx)
climate = sparky.analyze(analysis_ctx, monthly_cell)

climate.show(12, truncate=False)
[Stage 671:>                                                        (0 + 1) / 1]
+-----+------------------+------------------+------------------+------------------+------------------+
|month|low_temp_c        |high_temp_c       |avg_temp_c        |avg_monthly_precip|month_cell_samples|
+-----+------------------+------------------+------------------+------------------+------------------+
|1    |12.016236348974303|14.309803240470046|13.181206475171223|45.917338709677416|682               |
|2    |11.649982639297434|14.54796824046906 |13.0830678088156  |36.53354105571847 |682               |
|3    |11.734685982403846|14.864220967741858|13.083496387227683|22.082950879765395|682               |
|4    |11.814751147741504|14.763760231990375|12.970997660776447|11.306666687449331|819               |
|5    |11.562745754026366|14.911664436311582|12.97150433422297 |7.391087115666179 |683               |
|6    |11.90829101024758 |16.16154909224224 |13.731071802928227|1.9405197657393864|683               |
|7    |12.695180439238584|16.93223509516764 |14.705720532938074|3.0604868228404074|683               |
|8    |13.853945841875657|18.351347964860622|15.888160752673343|3.1662701317715958|683               |
|9    |14.561720146413222|18.62315465592974 |16.27131729169463 |1.848188140556369 |683               |
|10   |14.205379575402674|17.53579715958972 |15.603527402464351|5.35953513909224  |683               |
|11   |13.244209956076645|16.718046676428912|14.844923994043654|17.75137262079063 |683               |
|12   |12.551296642245688|14.921054383394164|13.73723547992904 |20.238629426129425|819               |
+-----+------------------+------------------+------------------+------------------+------------------+

                                                                                
In [16]:
# ------------------------------------------------------------
# Plot
# ------------------------------------------------------------
pdf = climate.toPandas()

if len(pdf) == 0:
    print("No data found for this geohash prefix.")
else:
    month_labels = [
        "Jan", "Feb", "Mar", "Apr", "May", "Jun",
        "Jul", "Aug", "Sep", "Oct", "Nov", "Dec",
    ]

    pdf["month_name"] = pdf["month"].apply(lambda m: month_labels[int(m) - 1])

    fig, ax1 = plt.subplots(figsize=(10, 5))

    ax1.plot(
        pdf["month_name"],
        pdf["high_temp_c"],
        marker="o",
        color="red",
        label="Max temp",
    )

    ax1.plot(
        pdf["month_name"],
        pdf["avg_temp_c"],
        marker="o",
        color="purple",
        label="Avg temp",
    )

    ax1.plot(
        pdf["month_name"],
        pdf["low_temp_c"],
        marker="o",
        color="blue",
        label="Min temp",
    )

    ax1.set_xlabel("Month")
    ax1.set_ylabel("Temperature (°C)")
    ax1.set_ylim(bottom=0)
    ax1.legend(loc="upper left")

    ax2 = ax1.twinx()

    ax2.bar(
        pdf["month_name"],
        pdf["avg_monthly_precip"],
        alpha=0.3,
        label="Avg monthly precip",
    )

    ax2.set_ylabel("Average monthly precipitation")
    ax2.set_ylim(bottom=0)
    ax2.legend(loc="upper right")

    plt.title(f"Climate Chart for Geohash Prefix {GEOHASH_PREFIX}")
    plt.grid(True, axis="y", alpha=0.3)
    plt.show()

climate_chart_runtime = time.perf_counter() - climate_chart_runtime_start
print(f"Total Climate Chart Spark runtime: {climate_chart_runtime:.2f} seconds")
                                                                                
No description has been provided for this image
Total Climate Chart Spark runtime: 2224.99 seconds

Travel Startup¶

In [17]:
import sys
import importlib
import re
import time

import matplotlib.pyplot as plt
import pandas as pd

from pyspark.sql import functions as F
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    LongType,
    DoubleType,
)
from pyspark.sql.window import Window

import util
import SparkHandler

importlib.reload(util)
importlib.reload(SparkHandler)

from util import safe_float
from SparkHandler import Sparky, SparkyBatchContext, SparkyAnalysisContext


# ============================================================
# CONFIG
# ============================================================
sc.setLogLevel("ERROR")
INPUT_GLOB = "hdfs:///3hr/2018/*"
BATCH_SIZE = 50
PARTIAL_OUT = "hdfs:///travel_startup_2018_partials"

TRAVEL_WINDOWS = [
    ("Winter Break / New Year", [12, 1]),
    ("Spring", [2, 3, 4]),
    ("Early Summer", [5, 6, 7]),
    ("Late Summer", [8, 9]),
    ("Fall", [10, 11]),
]

REGIONS = [
    {
        "region": "San Francisco Bay Area",
        "lat_min": 37.2,
        "lat_max": 38.2,
        "lon_min": -122.8,
        "lon_max": -121.7,
    },
    {
        "region": "Los Angeles Coast",
        "lat_min": 33.5,
        "lat_max": 34.4,
        "lon_min": -118.8,
        "lon_max": -117.7,
    },
    {
        "region": "San Diego Coast",
        "lat_min": 32.5,
        "lat_max": 33.1,
        "lon_min": -117.4,
        "lon_max": -116.8,
    },
    {
        "region": "Seattle / Puget Sound",
        "lat_min": 47.2,
        "lat_max": 47.9,
        "lon_min": -122.7,
        "lon_max": -121.9,
    },
    {
        "region": "Portland / Willamette Valley",
        "lat_min": 45.2,
        "lat_max": 45.8,
        "lon_min": -123.1,
        "lon_max": -122.2,
    },
    {
        "region": "Denver / Front Range",
        "lat_min": 39.4,
        "lat_max": 40.1,
        "lon_min": -105.4,
        "lon_max": -104.5,
    },
    {
        "region": "Las Vegas",
        "lat_min": 35.9,
        "lat_max": 36.5,
        "lon_min": -115.5,
        "lon_max": -114.8,
    },
    {
        "region": "Phoenix / Scottsdale",
        "lat_min": 33.2,
        "lat_max": 33.8,
        "lon_min": -112.4,
        "lon_max": -111.6,
    },
    {
        "region": "New York City",
        "lat_min": 40.3,
        "lat_max": 41.1,
        "lon_min": -74.4,
        "lon_max": -73.4,
    },
    {
        "region": "Chicago",
        "lat_min": 41.5,
        "lat_max": 42.2,
        "lon_min": -88.1,
        "lon_max": -87.3,
    },
    {
        "region": "Miami",
        "lat_min": 25.5,
        "lat_max": 26.1,
        "lon_min": -80.5,
        "lon_max": -80.0,
    },
    {
        "region": "New Orleans",
        "lat_min": 29.7,
        "lat_max": 30.3,
        "lon_min": -90.4,
        "lon_max": -89.7,
    },
    {
        "region": "Boston",
        "lat_min": 42.1,
        "lat_max": 42.6,
        "lon_min": -71.4,
        "lon_max": -70.8,
    },
    {
        "region": "Vancouver",
        "lat_min": 49.0,
        "lat_max": 49.5,
        "lon_min": -123.4,
        "lon_max": -122.7,
    },
]

IDEAL_TEMP_C = 21.0
MAX_COMFORT_HIGH_C = 30.0
MIN_COMFORT_LOW_C = 8.0


def safe_window_name(window_name):
    return re.sub(r"[^a-zA-Z0-9_]+", "_", window_name.lower()).strip("_")


def month_from_path(path):
    match = re.search(r"(20\d{2})(\d{2})(\d{2})", path)
    if match is None:
        return None
    return int(match.group(2))


def region_for_point(lat, lon):
    for region in REGIONS:
        if (
            region["lat_min"] <= lat <= region["lat_max"]
            and region["lon_min"] <= lon <= region["lon_max"]
        ):
            return region["region"]
    return None


# ============================================================
# SCHEMA
# ============================================================
raw_schema = StructType([
    StructField("region", StringType(), False),
    StructField("time_ms", LongType(), False),
    StructField("lat", DoubleType(), False),
    StructField("lon", DoubleType(), False),
    StructField("temp_k", DoubleType(), False),
    StructField("precip", DoubleType(), False),
    StructField("rh", DoubleType(), False),
    StructField("cloud", DoubleType(), False),
    StructField("snow", DoubleType(), False),
    StructField("wind_speed", DoubleType(), False),
])


# ============================================================
# PARTIAL TRANSFORM
# ============================================================
def travel_partial_transform(points):
    return (
        points
        .withColumn("lat_i", F.floor(F.col("lat") * 100).cast("int"))
        .withColumn("lon_i", F.floor(F.col("lon") * 100).cast("int"))
        .groupBy("region", "lat_i", "lon_i")
        .agg(
            F.sum("lat").alias("sum_lat"),
            F.sum("lon").alias("sum_lon"),
            F.count("*").alias("count_points"),

            F.sum("temp_k").alias("sum_temp_k"),
            F.count("temp_k").alias("count_temp"),
            F.min("temp_k").alias("min_temp_k"),
            F.max("temp_k").alias("max_temp_k"),

            F.sum("precip").alias("sum_precip"),
            F.count("precip").alias("count_precip"),

            F.sum("rh").alias("sum_rh"),
            F.count("rh").alias("count_rh"),

            F.sum("cloud").alias("sum_cloud"),
            F.count("cloud").alias("count_cloud"),

            F.sum("snow").alias("sum_snow"),
            F.max("snow").alias("max_snow"),
            F.count("snow").alias("count_snow"),

            F.sum("wind_speed").alias("sum_wind"),
            F.count("wind_speed").alias("count_wind"),
        )
    )


# ============================================================
# COMBINE PARTIALS
# ============================================================
def combine_travel_partials(partials):
    return (
        partials
        .groupBy("region", "lat_i", "lon_i")
        .agg(
            (F.sum("sum_lat") / F.sum("count_points")).alias("lat"),
            (F.sum("sum_lon") / F.sum("count_points")).alias("lon"),

            F.sum("sum_temp_k").alias("sum_temp_k"),
            F.sum("count_temp").alias("count_temp"),
            F.min("min_temp_k").alias("low_temp_k"),
            F.max("max_temp_k").alias("high_temp_k"),

            F.sum("sum_precip").alias("sum_precip"),
            F.sum("count_precip").alias("count_precip"),

            F.sum("sum_rh").alias("sum_rh"),
            F.sum("count_rh").alias("count_rh"),

            F.sum("sum_cloud").alias("sum_cloud"),
            F.sum("count_cloud").alias("count_cloud"),

            F.sum("sum_snow").alias("sum_snow"),
            F.max("max_snow").alias("max_snow"),
            F.sum("count_snow").alias("count_snow"),

            F.sum("sum_wind").alias("sum_wind"),
            F.sum("count_wind").alias("count_wind"),

            F.sum("count_points").alias("cell_points"),
        )
        .withColumn("avg_temp_k", F.col("sum_temp_k") / F.col("count_temp"))
        .withColumn("avg_precip_per_3hr", F.col("sum_precip") / F.col("count_precip"))
        .withColumn("avg_daily_precip", F.col("avg_precip_per_3hr") * F.lit(8.0))
        .withColumn("avg_rh", F.col("sum_rh") / F.col("count_rh"))
        .withColumn("avg_cloud", F.col("sum_cloud") / F.col("count_cloud"))
        .withColumn("avg_snow", F.col("sum_snow") / F.col("count_snow"))
        .withColumn("avg_wind", F.col("sum_wind") / F.col("count_wind"))
    )


# ============================================================
# ANALYZER
# ============================================================
def make_travel_analyzer(window_name, months):
    def analyze_travel_window(cell_window):
        region_window = (
            cell_window
            .groupBy("region")
            .agg(
                F.avg("lat").alias("lat"),
                F.avg("lon").alias("lon"),

                F.avg("avg_temp_k").alias("avg_temp_k"),
                F.avg("low_temp_k").alias("low_temp_k"),
                F.avg("high_temp_k").alias("high_temp_k"),

                F.avg("avg_daily_precip").alias("avg_daily_precip"),
                F.avg("avg_rh").alias("avg_rh"),
                F.avg("avg_cloud").alias("avg_cloud"),
                F.avg("avg_snow").alias("avg_snow"),
                F.max("max_snow").alias("max_snow"),
                F.avg("avg_wind").alias("avg_wind"),

                F.sum("cell_points").alias("region_points"),
                F.count("*").alias("region_cells"),
            )
            .withColumn("travel_window", F.lit(window_name))
            .withColumn("months", F.lit(",".join(str(month) for month in months)))
            .withColumn("avg_temp_c", F.col("avg_temp_k") - F.lit(273.15))
            .withColumn("low_temp_c", F.col("low_temp_k") - F.lit(273.15))
            .withColumn("high_temp_c", F.col("high_temp_k") - F.lit(273.15))
        )

        return (
            region_window
            .withColumn(
                "temp_penalty",
                F.abs(F.col("avg_temp_c") - F.lit(IDEAL_TEMP_C)) * F.lit(3.0),
            )
            .withColumn(
                "heat_penalty",
                F.greatest(
                    F.col("high_temp_c") - F.lit(MAX_COMFORT_HIGH_C),
                    F.lit(0.0),
                ) * F.lit(2.0),
            )
            .withColumn(
                "cold_penalty",
                F.greatest(
                    F.lit(MIN_COMFORT_LOW_C) - F.col("low_temp_c"),
                    F.lit(0.0),
                ) * F.lit(2.0),
            )
            .withColumn(
                "rain_penalty",
                F.least(F.col("avg_daily_precip") * F.lit(5.0), F.lit(25.0)),
            )
            .withColumn(
                "humidity_penalty",
                F.greatest(F.col("avg_rh") - F.lit(70.0), F.lit(0.0)) * F.lit(0.35),
            )
            .withColumn(
                "cloud_penalty",
                F.least(F.col("avg_cloud") / F.lit(10.0), F.lit(10.0)),
            )
            .withColumn(
                "wind_penalty",
                F.greatest(F.col("avg_wind") - F.lit(6.0), F.lit(0.0)) * F.lit(3.0),
            )
            .withColumn(
                "snow_penalty",
                F.least(F.col("max_snow") * F.lit(20.0), F.lit(20.0)),
            )
            .withColumn(
                "comfort_index_raw",
                F.lit(100.0)
                - F.col("temp_penalty")
                - F.col("heat_penalty")
                - F.col("cold_penalty")
                - F.col("rain_penalty")
                - F.col("humidity_penalty")
                - F.col("cloud_penalty")
                - F.col("wind_penalty")
                - F.col("snow_penalty"),
            )
            .withColumn(
                "comfort_index",
                F.least(F.greatest(F.col("comfort_index_raw"), F.lit(0.0)), F.lit(100.0)),
            )
            .select(
                "travel_window",
                "months",
                "region",
                "lat",
                "lon",
                "comfort_index",
                "avg_temp_c",
                "low_temp_c",
                "high_temp_c",
                "avg_daily_precip",
                "avg_rh",
                "avg_cloud",
                "avg_wind",
                "avg_snow",
                "max_snow",
                "temp_penalty",
                "heat_penalty",
                "cold_penalty",
                "rain_penalty",
                "humidity_penalty",
                "cloud_penalty",
                "wind_penalty",
                "snow_penalty",
                "region_points",
                "region_cells",
            )
        )

    return analyze_travel_window


sparky = Sparky(spark, sc)

setup_ctx = SparkyBatchContext(
    glob_path=INPUT_GLOB,
    partial_out=PARTIAL_OUT,
    schema=raw_schema,
    parse_line=lambda line: None,
    partial_transform=travel_partial_transform,
    batch_size=BATCH_SIZE,
)

all_paths = sparky.setup_job(setup_ctx)

print("Total 2018 files:", len(all_paths))

if len(all_paths) == 0:
    raise ValueError(f"No files matched: {INPUT_GLOB}")

paths_by_window = {}

for window_name, months in TRAVEL_WINDOWS:
    month_set = set(months)
    window_paths = [path for path in all_paths if month_from_path(path) in month_set]
    paths_by_window[window_name] = window_paths
    print(f"{window_name}: {len(window_paths)} 2018 files")


# ============================================================
# RESOLVE COLUMNS
# ============================================================
time_idx = sparky.required_col("time")
lat_idx = sparky.required_col("lat")
lon_idx = sparky.required_col("lon")
temp_idx = sparky.required_col("temperature_surface")
precip_idx = sparky.required_col("total_precipitation_surface_3_hour_accumulation")
rh_idx = sparky.required_col("relative_humidity_zerodegc_isotherm")
cloud_idx = sparky.required_col("total_cloud_cover_entire_atmosphere_single_layer")
snow_idx = sparky.required_col("snow_depth_surface")
wind_speed_idx = sparky.required_col("wind_speed_gust_surface")

print("Using column indexes:")
print("time:", time_idx)
print("lat:", lat_idx)
print("lon:", lon_idx)
print("temp:", temp_idx)
print("precip:", precip_idx)
print("rh:", rh_idx)
print("cloud:", cloud_idx)
print("snow:", snow_idx)
print("wind_speed:", wind_speed_idx)


# ============================================================
# PARSER
# ============================================================
def make_travel_parser(
    time_idx,
    lat_idx,
    lon_idx,
    temp_idx,
    precip_idx,
    rh_idx,
    cloud_idx,
    snow_idx,
    wind_speed_idx,
):
    def parse_travel_line(line):
        try:
            if line.startswith("1_time"):
                return None

            parts = line.rstrip("\t").split("\t")

            time_ms = safe_float(parts, time_idx)
            lat = safe_float(parts, lat_idx)
            lon = safe_float(parts, lon_idx)
            temp_k = safe_float(parts, temp_idx)
            precip = safe_float(parts, precip_idx)
            rh = safe_float(parts, rh_idx)
            cloud = safe_float(parts, cloud_idx)
            snow = safe_float(parts, snow_idx)
            wind_speed = safe_float(parts, wind_speed_idx)

            if (
                time_ms is None
                or lat is None
                or lon is None
                or temp_k is None
                or precip is None
                or rh is None
                or cloud is None
                or snow is None
                or wind_speed is None
            ):
                return None

            region = region_for_point(lat, lon)

            if region is None:
                return None

            return (
                region,
                int(time_ms),
                lat,
                lon,
                temp_k,
                precip,
                rh,
                cloud,
                snow,
                wind_speed,
            )

        except Exception:
            return None

    return parse_travel_line


parse_travel_line = make_travel_parser(
    time_idx=time_idx,
    lat_idx=lat_idx,
    lon_idx=lon_idx,
    temp_idx=temp_idx,
    precip_idx=precip_idx,
    rh_idx=rh_idx,
    cloud_idx=cloud_idx,
    snow_idx=snow_idx,
    wind_speed_idx=wind_speed_idx,
)


# ============================================================
# PROCESS ONE WINDOW
# ============================================================
def process_window(window_name, months, window_paths):
    cleaned_window_name = safe_window_name(window_name)
    window_out = f"{PARTIAL_OUT}/{cleaned_window_name}"

    print()
    print("=" * 80)
    print(f"Processing 2018 window: {window_name}")
    print(f"Months: {months}")
    print(f"Files: {len(window_paths)}")
    print("=" * 80)

    if len(window_paths) == 0:
        print(f"No 2018 files for {window_name}; skipping.")
        return None

    batch_ctx = SparkyBatchContext(
        glob_path=INPUT_GLOB,
        partial_out=window_out,
        schema=raw_schema,
        parse_line=parse_travel_line,
        partial_transform=travel_partial_transform,
        batch_size=BATCH_SIZE,
    )

    analysis_ctx = SparkyAnalysisContext(
        partial_in=window_out,
        combine_partials=combine_travel_partials,
        analyze=make_travel_analyzer(window_name, months),
        result_out=None,
    )

    window_start = time.perf_counter()

    for batch_id, start in enumerate(range(0, len(window_paths), batch_ctx.batch_size)):
        batch_start = time.perf_counter()
        batch_paths = window_paths[start:start + batch_ctx.batch_size]
        sparky.process_batch(batch_ctx, batch_paths, batch_id)
        batch_elapsed = time.perf_counter() - batch_start
        print(f"Finished {window_name} batch {batch_id} in {batch_elapsed:.2f} seconds")

    cell_window = sparky.combine(analysis_ctx)
    scored = sparky.analyze(analysis_ctx, cell_window)

    window_elapsed = time.perf_counter() - window_start
    print(f"Finished 2018 window {window_name} in {window_elapsed:.2f} seconds")

    return scored


# ============================================================
# RUN ALL TRAVEL WINDOWS
# ============================================================
scored_windows = []
overall_start = time.perf_counter()

for window_name, months in TRAVEL_WINDOWS:
    scored = process_window(
        window_name=window_name,
        months=months,
        window_paths=paths_by_window[window_name],
    )

    if scored is not None:
        scored_windows.append(scored)

overall_elapsed = time.perf_counter() - overall_start
print(f"Total 2018 Travel Startup Spark runtime: {overall_elapsed:.2f} seconds")

if len(scored_windows) == 0:
    raise ValueError("No scored 2018 windows were produced.")

all_scores = scored_windows[0]

for scored in scored_windows[1:]:
    all_scores = all_scores.unionByName(scored)


# ============================================================
# BEST REGION PER TRAVEL WINDOW
# ============================================================
ranked = (
    all_scores
    .withColumn(
        "rank_in_window",
        F.row_number().over(
            Window.partitionBy("travel_window").orderBy(F.desc("comfort_index"))
        ),
    )
)

best_per_window = (
    ranked
    .filter(F.col("rank_in_window") == 1)
    .drop("rank_in_window")
    .orderBy(
        F.expr("""
            CASE travel_window
                WHEN 'Winter Break / New Year' THEN 1
                WHEN 'Spring' THEN 2
                WHEN 'Early Summer' THEN 3
                WHEN 'Late Summer' THEN 4
                WHEN 'Fall' THEN 5
                ELSE 99
            END
        """)
    )
)

print("Best destination for each 2018 travel window:")
best_per_window.select(
    "travel_window",
    "months",
    "region",
    "comfort_index",
    "avg_temp_c",
    "low_temp_c",
    "high_temp_c",
    "avg_daily_precip",
    "avg_rh",
    "avg_cloud",
    "avg_wind",
    "max_snow",
    "region_cells",
).show(truncate=False)

print("All 2018 destination scores:")
all_scores.orderBy("travel_window", F.desc("comfort_index")).select(
    "travel_window",
    "months",
    "region",
    "comfort_index",
    "avg_temp_c",
    "low_temp_c",
    "high_temp_c",
    "avg_daily_precip",
    "avg_rh",
    "avg_cloud",
    "avg_wind",
    "max_snow",
    "region_cells",
).show(200, truncate=False)
Deleted old partial output: hdfs:/travel_startup_2018_partials
Found 1443 files
                                                                                
Total 2018 files: 1443
Winter Break / New Year: 248 2018 files
Spring: 340 2018 files
Early Summer: 367 2018 files
Late Summer: 244 2018 files
Fall: 244 2018 files
Using column indexes:
time: 0
lat: 1
lon: 2
temp: 10
precip: 13
rh: 8
cloud: 12
snow: 9
wind_speed: 17

================================================================================
Processing 2018 window: Winter Break / New Year
Months: [12, 1]
Files: 248
================================================================================
Processing batch 0: 50 files
                                                                                
Finished Winter Break / New Year batch 0 in 23.74 seconds
Processing batch 1: 50 files
                                                                                
Finished Winter Break / New Year batch 1 in 21.65 seconds
Processing batch 2: 50 files
                                                                                
Finished Winter Break / New Year batch 2 in 21.77 seconds
Processing batch 3: 50 files
                                                                                
Finished Winter Break / New Year batch 3 in 17.31 seconds
Processing batch 4: 48 files
                                                                                
Finished Winter Break / New Year batch 4 in 16.17 seconds
Finished 2018 window Winter Break / New Year in 101.05 seconds

================================================================================
Processing 2018 window: Spring
Months: [2, 3, 4]
Files: 340
================================================================================
Processing batch 0: 50 files
                                                                                
Finished Spring batch 0 in 16.84 seconds
Processing batch 1: 50 files
                                                                                
Finished Spring batch 1 in 16.98 seconds
Processing batch 2: 50 files
                                                                                
Finished Spring batch 2 in 16.17 seconds
Processing batch 3: 50 files
                                                                                
Finished Spring batch 3 in 16.85 seconds
Processing batch 4: 50 files
                                                                                
Finished Spring batch 4 in 17.14 seconds
Processing batch 5: 50 files
                                                                                
Finished Spring batch 5 in 17.22 seconds
Processing batch 6: 40 files
                                                                                
Finished Spring batch 6 in 16.28 seconds
Finished 2018 window Spring in 117.94 seconds

================================================================================
Processing 2018 window: Early Summer
Months: [5, 6, 7]
Files: 367
================================================================================
Processing batch 0: 50 files
                                                                                
Finished Early Summer batch 0 in 16.98 seconds
Processing batch 1: 50 files
                                                                                
Finished Early Summer batch 1 in 15.34 seconds
Processing batch 2: 50 files
                                                                                
Finished Early Summer batch 2 in 16.10 seconds
Processing batch 3: 50 files
                                                                                
Finished Early Summer batch 3 in 17.33 seconds
Processing batch 4: 50 files
                                                                                
Finished Early Summer batch 4 in 16.82 seconds
Processing batch 5: 50 files
                                                                                
Finished Early Summer batch 5 in 17.62 seconds
Processing batch 6: 50 files
                                                                                
Finished Early Summer batch 6 in 17.86 seconds
Processing batch 7: 17 files
                                                                                
Finished Early Summer batch 7 in 10.67 seconds
Finished 2018 window Early Summer in 129.16 seconds

================================================================================
Processing 2018 window: Late Summer
Months: [8, 9]
Files: 244
================================================================================
Processing batch 0: 50 files
                                                                                
Finished Late Summer batch 0 in 16.75 seconds
Processing batch 1: 50 files
                                                                                
Finished Late Summer batch 1 in 17.77 seconds
Processing batch 2: 50 files
                                                                                
Finished Late Summer batch 2 in 18.31 seconds
Processing batch 3: 50 files
                                                                                
Finished Late Summer batch 3 in 16.74 seconds
Processing batch 4: 44 files
                                                                                
Finished Late Summer batch 4 in 17.10 seconds
Finished 2018 window Late Summer in 87.10 seconds

================================================================================
Processing 2018 window: Fall
Months: [10, 11]
Files: 244
================================================================================
Processing batch 0: 50 files
                                                                                
Finished Fall batch 0 in 17.50 seconds
Processing batch 1: 50 files
                                                                                
Finished Fall batch 1 in 16.95 seconds
Processing batch 2: 50 files
                                                                                
Finished Fall batch 2 in 15.63 seconds
Processing batch 3: 50 files
                                                                                
Finished Fall batch 3 in 16.51 seconds
Processing batch 4: 44 files
                                                                                
Finished Fall batch 4 in 17.10 seconds
Finished 2018 window Fall in 84.15 seconds
Total 2018 Travel Startup Spark runtime: 519.40 seconds
Best destination for each 2018 travel window:
                                                                                
+-----------------------+------+----------------------+-----------------+------------------+------------------+------------------+-------------------+------------------+------------------+------------------+--------+------------+
|travel_window          |months|region                |comfort_index    |avg_temp_c        |low_temp_c        |high_temp_c       |avg_daily_precip   |avg_rh            |avg_cloud         |avg_wind          |max_snow|region_cells|
+-----------------------+------+----------------------+-----------------+------------------+------------------+------------------+-------------------+------------------+------------------+------------------+--------+------------+
|Winter Break / New Year|12,1  |Miami                 |78.07625905889968|22.60698234062977 |12.499861904761872|30.206612857142943|1.7699692780337943 |30.741935483870964|43.60349462365591 |7.159790784130184 |0.0     |21          |
|Spring                 |2,3,4 |New Orleans           |75.72603480160166|19.214442488843872|6.202835517241397 |29.882799999999918|1.987981744421906  |34.21196754563895 |46.18225152129816 |6.254943275057809 |0.0     |29          |
|Early Summer           |5,6,7 |Vancouver             |76.88059243809968|17.738883249614958|7.667636521739155 |31.854160000000036|0.9808079611420447 |58.77898125245823 |36.24570548513209 |2.939737445120247 |0.02172 |23          |
|Late Summer            |8,9   |San Francisco Bay Area|78.30875940109274|19.323642155191294|10.4081458666667  |36.11454746666681 |0.09863387978142077|20.676612021857917|39.375027322404385|4.793583148777318 |1.2E-4  |75          |
|Fall                   |10,11 |Los Angeles Coast     |82.2612633729508 |19.143465542740046|11.329275571428695|33.20703571428572 |0.5796545667447308 |26.397599531615924|28.535889929742382|3.3943909924268145|1.6E-4  |70          |
+-----------------------+------+----------------------+-----------------+------------------+------------------+------------------+-------------------+------------------+------------------+------------------+--------+------------+

All 2018 destination scores:
[Stage 847:=============================================>           (4 + 1) / 5]
+-----------------------+------+----------------------------+------------------+-------------------+--------------------+------------------+-------------------+------------------+------------------+------------------+--------+------------+
|travel_window          |months|region                      |comfort_index     |avg_temp_c         |low_temp_c          |high_temp_c       |avg_daily_precip   |avg_rh            |avg_cloud         |avg_wind          |max_snow|region_cells|
+-----------------------+------+----------------------------+------------------+-------------------+--------------------+------------------+-------------------+------------------+------------------+------------------+--------+------------+
|Early Summer           |5,6,7 |Vancouver                   |76.8805924380998  |17.738883249614958 |7.667636521739155   |31.85415999999998 |0.9808079611420449 |58.778981252458244|36.24570548513209 |2.939737445120247 |0.02172 |23          |
|Early Summer           |5,6,7 |San Francisco Bay Area      |75.23419130027237 |18.594241428519638 |8.945867600000042   |36.24976840000022 |0.1333878292461399 |28.30153861398728 |43.81257039055402 |5.973426027602543 |4.0E-5  |75          |
|Early Summer           |5,6,7 |Boston                      |68.71109728719576 |21.34224604774886  |6.2502580952381095  |36.7153295238096  |1.661411703646036  |45.31821435785649 |47.5632541845076  |6.089545925244583 |0.0     |21          |
|Early Summer           |5,6,7 |Chicago                     |67.27978765866976 |20.157102663653916 |6.3914039024390945  |33.806859024390235|3.0575197713830002 |45.534758081876774|40.73011231474713 |5.5604947237523765|0.0     |41          |
|Early Summer           |5,6,7 |Los Angeles Coast           |65.29434501907262 |23.09865853016754  |12.361159000000043  |41.73394728571452 |0.15988711560918653|26.86214844920203 |41.415492409497865|2.9103841196220315|4.0E-5  |70          |
|Early Summer           |5,6,7 |Seattle / Puget Sound       |65.08614318948342 |19.54412570807858  |6.672773513513505   |39.57482486486492 |0.8950217247219973 |51.90821853243979 |42.670226084395026|3.145185755161205 |0.0     |37          |
|Early Summer           |5,6,7 |San Diego Coast             |65.04266619945477 |22.34063886648505  |11.719746400000076  |42.14704920000008 |0.3068664850136239 |26.71588920326975 |51.069863760217984|2.655238927747139 |0.0     |25          |
|Early Summer           |5,6,7 |New York City               |61.49962273024549 |21.583587757740816 |5.88843345454552    |35.008065818181876|3.529749814218481  |45.51155926182808 |48.61600198166955 |5.661918150071341 |0.0     |55          |
|Early Summer           |5,6,7 |Portland / Willamette Valley|60.256616593248694|19.270396457008815 |4.911733333333359   |41.103897500000016|0.4607553739025128 |42.67463276899789 |38.6566757493188  |3.592475078387603 |4.0E-5  |36          |
|Early Summer           |5,6,7 |Denver / Front Range        |47.38774939600382 |21.7473702925912   |2.846803095238158   |44.16499380952371 |1.5713961333852342 |56.44744753341117 |33.83177630725314 |4.02809529046062  |0.02468 |42          |
|Early Summer           |5,6,7 |New Orleans                 |35.072104705440026|30.00199334398195  |16.794864827586252  |39.02017965517251 |3.3674245983275393 |47.916197388142436|30.444329606314003|3.780804649848726 |0.0     |29          |
|Early Summer           |5,6,7 |Miami                       |34.061678971065085|28.8349238782925   |22.670041904761945  |36.3000366666667  |6.954262358894512  |58.154596877124696|48.33476060724017 |4.438683072408201 |0.0     |21          |
|Early Summer           |5,6,7 |Las Vegas                   |28.930224415985528|29.585398483197082 |8.889546666666718   |51.28879733333332 |0.29037238873751137|38.99650682942779 |12.84123524069028 |4.251871068663942 |0.0     |30          |
|Early Summer           |5,6,7 |Phoenix / Scottsdale        |8.828341656826964 |33.53106707462916  |11.292545277777776  |55.0890052777778  |0.445201332122313  |36.030672734786556|11.744399031183772|3.893489302041704 |0.0     |36          |
|Fall                   |10,11 |Los Angeles Coast           |82.26126337295062 |19.143465542740103 |11.329275571428695  |33.207035714285894|0.5796545667447308 |26.397599531615928|28.535889929742396|3.394390992426815 |1.6E-4  |70          |
|Fall                   |10,11 |San Diego Coast             |77.14535499999987 |18.99092882950822  |10.273522800000023  |34.9915764000001  |0.6877868852459017 |22.71262295081967 |34.05344262295082 |3.165551414683607 |0.0     |25          |
|Fall                   |10,11 |Phoenix / Scottsdale        |67.55244025387108 |19.080388020264195 |5.16328666666675    |37.17733777777778 |0.9747836976320584 |32.21106557377048 |17.867030965391624|3.7215491372085605|0.0     |36          |
|Fall                   |10,11 |New Orleans                 |66.67447120972346 |20.994673595251697 |5.788291724137935   |33.45762275862069 |3.456543244771056  |40.045364612775586|46.88171283210854 |5.718812567163651 |0.0     |29          |
|Fall                   |10,11 |San Francisco Bay Area      |65.5836584136613  |15.128004705464491 |4.417730666666728   |30.504970266666703|1.086912568306011  |30.83426229508197 |31.825136612021847|4.1014530418155735|4.4E-4  |75          |
|Fall                   |10,11 |Miami                       |60.71799652810271 |27.721469432084405 |15.817580000000078  |34.57002904761907 |1.3241608118657298 |31.10284933645589 |33.56733021077283 |5.457027606291764 |0.0     |21          |
|Fall                   |10,11 |Las Vegas                   |46.068832419399044|13.821840908469994 |-0.3497513333333586 |36.78049299999998 |0.1452185792349727 |38.76051912568307 |13.773087431693995|4.306510428008197 |0.00164 |30          |
|Fall                   |10,11 |Seattle / Puget Sound       |28.802133630926317|10.013167131147611 |-0.13314378378373704|25.34349945945945 |3.1597252990695615 |61.618741692512195|61.68453699601239 |5.141573562887573 |2.0E-4  |37          |
|Fall                   |10,11 |Portland / Willamette Valley|26.141521756602994|10.11086843920765  |-1.5116486111110703 |27.839578333333407|3.347620673952642  |54.04371584699454 |54.16882969034607 |5.3844589703239985|6.4E-4  |36          |
|Fall                   |10,11 |Vancouver                   |22.151275017819106|9.276054590163994  |2.343271304347809   |19.821615652173932|8.912508909479687  |65.85620099786175 |62.770313613684955|4.235724615003564 |0.00432 |23          |
|Fall                   |10,11 |Chicago                     |14.187471462151123|8.38664103758498   |-3.547795609756122  |26.02454682926833 |2.2090163934426235 |70.70251899240306 |63.580667732906846|7.8367433477489   |0.08588 |41          |
|Fall                   |10,11 |New York City               |8.315710972645121 |11.637580602086587 |-3.591253272727215  |25.13070018181827 |4.896646795827125  |60.90901639344262 |54.182265275707884|8.212087927151266 |0.19384 |55          |
|Fall                   |10,11 |Boston                      |0.0               |8.262699182279562  |-9.057264761904719  |26.807930952380957|3.5885050741608118 |65.60421545667448 |55.65007806401248 |8.338916739945354 |0.075   |21          |
|Fall                   |10,11 |Denver / Front Range        |0.0               |5.501489242779087  |-11.614429285714266 |34.01414880952382 |0.8140124902419986 |61.883489461358316|32.98877829820453 |4.150118931835481 |0.09696 |42          |
|Late Summer            |8,9   |San Francisco Bay Area      |78.30875940109274 |19.323642155191294 |10.4081458666667    |36.11454746666681 |0.09863387978142077|20.676612021857917|39.375027322404385|4.793583148777319 |1.2E-4  |75          |
|Late Summer            |8,9   |Chicago                     |67.59501320171896 |23.055984289284368 |10.329692682926861  |34.863978780487855|2.4895041983206725 |41.45821671331468 |40.615553778488604|5.809869288940424 |0.0     |41          |
|Late Summer            |8,9   |Portland / Willamette Valley|64.04983545081998 |19.53145956739536  |7.052050277777823   |41.57213611111115 |0.7118055555555555 |35.825022768670316|29.45343806921675 |3.4570917949554874|0.0     |36          |
|Late Summer            |8,9   |Denver / Front Range        |63.544098880757   |21.854133951014887 |4.2673942857143174  |40.73763142857149 |0.5020979703356753 |53.998926619828254|24.425351288056202|3.7554996199902426|0.0     |42          |
|Late Summer            |8,9   |Los Angeles Coast           |63.46463046779824 |25.5025065450821   |17.12462942857144   |39.69468899999998 |0.06548594847775176|23.2179156908665  |33.11042154566745 |2.559896578267565 |0.0     |70          |
|Late Summer            |8,9   |Seattle / Puget Sound       |62.595128789321926|19.28495769273377  |7.295451621621623   |40.64622540540557 |1.1364643331856448 |44.487040319007534|38.75875055383251 |3.547820332633252 |0.0     |37          |
|Late Summer            |8,9   |San Diego Coast             |62.515425852458904|25.1287854721312   |15.36540279999997   |40.03727279999998 |0.1315573770491803 |24.371967213114754|43.65885245901639 |2.341145993368853 |0.0     |25          |
|Late Summer            |8,9   |Vancouver                   |61.085878718816986|17.12099320563084  |8.751992173913038   |32.3494556521739  |3.6994832501781896 |53.47416250890948 |40.80773342836778 |3.05841683521561  |0.0     |23          |
|Late Summer            |8,9   |Boston                      |60.475383813426625|22.874553337236648 |10.541677142857225  |36.40105380952383 |3.130171740827479  |45.77263856362217 |54.47989851678376 |5.910847016366121 |0.0     |21          |
|Late Summer            |8,9   |New York City               |53.554772501550914|24.29320278837571  |15.008871272727447  |34.05159654545457 |4.528576751117734  |51.527943368107316|58.16743666169895 |6.000932873551417 |0.0     |55          |
|Late Summer            |8,9   |Miami                       |37.21132795081948 |29.986971323185116 |25.488982857142787  |35.84957380952375 |4.130269320843093  |50.199258391881344|34.77263856362217 |4.114730698774747 |0.0     |21          |
|Late Summer            |8,9   |Las Vegas                   |31.57596641256785 |30.165631024590255 |14.602295333333359  |49.74296233333342 |0.17568306010928966|34.38360655737704 |5.628005464480875 |3.881807200107924 |0.0     |30          |
|Late Summer            |8,9   |New Orleans                 |27.726474590163754|30.061190486150394 |24.846032413793125  |38.12879551724143 |5.821014697569249  |60.85952515545505 |38.32362916902205 |3.5354297642278127|0.0     |29          |
|Late Summer            |8,9   |Phoenix / Scottsdale        |6.316249412568248 |35.07858279143903  |22.01064416666668   |53.764961944444394|0.569159836065574  |46.07001366120219 |10.722791438979964|3.2285171197928038|0.0     |36          |
|Spring                 |2,3,4 |New Orleans                 |75.72603480160161 |19.214442488843872 |6.202835517241397   |29.88280000000003 |1.987981744421907  |34.21196754563895 |46.182251521298184|6.25494327505781  |0.0     |29          |
|Spring                 |2,3,4 |Los Angeles Coast           |67.32192970546242 |15.854634450840422 |5.759157285714366   |32.67008528571438 |0.7150420168067227 |32.986092436974786|31.40907563025211 |3.7258175199794534|0.0352  |70          |
|Spring                 |2,3,4 |Miami                       |66.6593355924369  |26.03618034873955  |14.9790971428572    |34.39626476190472 |1.3870448179271708 |23.60266106442577 |25.043697478991596|5.747430959560225 |0.0     |21          |
|Spring                 |2,3,4 |San Diego Coast             |63.89257408941175 |16.147180037647047 |4.782679200000018   |34.08803280000001 |0.514              |29.998941176470588|43.66658823529413 |3.338605755732941 |8.0E-5  |25          |
|Spring                 |2,3,4 |Phoenix / Scottsdale        |46.866694205065315|18.73842736029411  |1.3934558333334053  |45.22852444444453 |0.13684640522875818|30.580882352941167|19.878186274509808|3.7140952890197716|3.2E-4  |36          |
|Spring                 |2,3,4 |San Francisco Bay Area      |46.26255494431377 |12.064810163137224 |1.000993600000072   |28.165284933333282|1.8529803921568624 |44.96227450980391 |36.30560784313726 |5.168715563990668 |0.00192 |75          |
|Spring                 |2,3,4 |Las Vegas                   |27.68696998823535 |13.480617956862773 |-3.506648666666649  |41.6889736666667  |0.21823529411764706|38.970882352941175|20.94862745098039 |5.255337115343138 |0.00888 |30          |
|Spring                 |2,3,4 |Chicago                     |0.0               |2.000405583931183  |-18.409008292682984 |18.795235121951293|1.9676470588235293 |67.87632711621234 |52.11592539454806 |7.426830421054519 |0.39528 |41          |
|Spring                 |2,3,4 |Seattle / Puget Sound       |0.0               |7.741653746422799  |-5.608539189189116  |31.68416756756767 |3.955802861685214  |79.36669316375199 |67.88076311605722 |6.003372366403815 |0.08496 |37          |
|Spring                 |2,3,4 |New York City               |0.0               |5.414488437967918  |-6.163292181818235  |21.241486909090952|4.269598930481284  |66.61695187165775 |54.56636363636364 |8.284004699645456 |0.1928  |55          |
|Spring                 |2,3,4 |Portland / Willamette Valley|0.0               |7.53191030310461   |-8.309794444444378  |31.825212222222262|3.540522875816993  |75.3702614379085  |66.64158496732027 |5.709095301198527 |0.13168 |36          |
|Spring                 |2,3,4 |Vancouver                   |0.0               |6.371353310741711  |-4.735428260869583  |24.55640173913048 |5.820140664961637  |79.62020460358056 |65.54322250639386 |4.590978649189259 |0.39872 |23          |
|Spring                 |2,3,4 |Boston                      |0.0               |4.218594557422932  |-9.836204285714302  |20.798897142857186|3.7221988795518217 |67.0672268907563  |54.00252100840335 |8.588929579785715 |0.2516  |21          |
|Spring                 |2,3,4 |Denver / Front Range        |0.0               |5.043801283613391  |-16.69673476190468  |30.87942666666669 |0.725              |54.851050420168065|28.650910364145656|4.98148557092437  |0.23    |42          |
|Winter Break / New Year|12,1  |Miami                       |78.07625905889981 |22.60698234062977  |12.499861904761872  |30.206612857142886|1.769969278033794  |30.741935483870968|43.60349462365592 |7.159790784130182 |0.0     |21          |
|Winter Break / New Year|12,1  |Los Angeles Coast           |65.83647238018449 |14.298108762672825 |5.970274428571486   |27.54496342857135 |1.235483870967742  |34.924135944700446|34.257834101382485|3.7482681401814513|0.01976 |70          |
|Winter Break / New Year|12,1  |San Diego Coast             |64.82845066129059 |14.488365635483945 |4.525967200000025   |28.148825999999985|0.9341129032258065 |32.261129032258076|40.18016129032259 |3.5142669938241933|0.0     |25          |
|Winter Break / New Year|12,1  |Phoenix / Scottsdale        |51.15139809251822 |11.888095666442666 |-1.0716261111110725 |30.444485277777687|0.05157930107526881|34.670250896057354|22.003696236559133|3.420932316322805 |0.00112 |36          |
|Winter Break / New Year|12,1  |San Francisco Bay Area      |45.16968248064529 |10.919329927957051 |3.368869466666638   |20.579259999999977|2.006155913978495  |45.240860215053765|52.85666666666664 |4.7233207515919355|4.8E-4  |75          |
|Winter Break / New Year|12,1  |New Orleans                 |27.59940128508737 |11.367425848164544 |-0.440084482758607  |22.93719724137941 |3.9201890989988883 |42.01738042269188 |54.3615127919911  |6.458136839898499 |0.01056 |29          |
|Winter Break / New Year|12,1  |Las Vegas                   |26.7458974112906  |6.657908212365669  |-4.320478666666645  |24.73723366666661 |0.5116935483870967 |40.034946236559136|27.836021505376337|3.5227938626599458|0.01224 |30          |
|Winter Break / New Year|12,1  |Chicago                     |0.0               |-1.2660238611329646|-19.576156097560954 |8.661305853658632 |1.5816778127458697 |75.00472069236821 |54.67142014162075 |8.473555726898798 |0.09976 |41          |
|Winter Break / New Year|12,1  |Seattle / Puget Sound       |0.0               |5.368115336748076  |-2.877403243243293  |13.775614594594572|5.110015257192676  |82.35287707061902 |74.23964690496949 |7.750997140253924 |0.00788 |37          |
|Winter Break / New Year|12,1  |New York City               |0.0               |1.8344318812315805 |-10.799302363636343 |11.711979636363651|3.10949413489736   |71.43438416422288 |44.73592375366568 |8.07446787323717  |0.31328 |55          |
|Winter Break / New Year|12,1  |Portland / Willamette Valley|0.0               |5.484651470654171  |-3.4774858333333327 |15.192656666666778|5.419298835125448  |79.97838261648744 |75.41028225806451 |8.045170659446686 |0.02308 |36          |
|Winter Break / New Year|12,1  |Vancouver                   |0.0               |4.578253290673217  |-1.0017613043478377 |10.597806086956552|13.025070126227211 |85.19021739130434 |75.64025245441796 |6.913380591456872 |0.204   |23          |
|Winter Break / New Year|12,1  |Boston                      |0.0               |-0.9721785963901084|-17.480080476190523 |14.808346666666637|2.5719086021505375 |69.93951612903227 |40.0583717357911  |8.763074218473504 |0.36264 |21          |
|Winter Break / New Year|12,1  |Denver / Front Range        |0.0               |-1.4821784840628993|-14.899612619047616 |18.068379523809654|0.3289170506912442 |53.10647081413212 |26.278033794162827|4.259061145825652 |0.14872 |42          |
+-----------------------+------+----------------------------+------------------+-------------------+--------------------+------------------+-------------------+------------------+------------------+------------------+--------+------------+

                                                                                

Your Itinerary¶

The itinerary has been designed for travelling in 2019-20, with 2018 data as a base for our trademarked comfort scorer pro max. We will start our travels in Feb, so that we can end the year with a bang with Christmas + New Year. We will divide your journey into 5 time periods.

  • Feb - Apr
  • May - Jul
  • Aug - Sep
  • Oct - Nov
  • Dec - Jan

Spring (Feb - Apr)¶

Spring time in New Orleans would be amazing! Check out one of the beautiful gardens you can visit: Spring2

There are also festivals to attend, like this one: Spring

Early Summer (May - Jul)¶

The longest leg of this year-long journey takes us to Vancouver, Canada!

vc

Late Summer (Aug - Sept)¶

San Francisco Bay Area

There are lots of things to do in San Francisco, and it might be a good idea to visit Seattle, and other surrounding places.

  • San Francisco

sf

Fall (Oct - Nov)¶

Los Angeles coast had the best score for fall. You can enjoy:

  • Santa Monica Beach

beach

  • Venice Beach

beach2

and many more beaches, each with its own charm!

Although Los Angeles coast scored the highest on this leg of the trip, we will also visit other places nearby!

Time to go to San Diego.

sd

Enjoy the festivities in October!

skull

Las Vegas is close too!

  • Las Vegas

lv

Christmas - New Year¶

Time to head to Miami for Christmas and New year celebrations! This party and beach town averages 70F at this time.

miami

Remarks¶

I think it is a little awkward that Late Summer was SF bay area and Fall was LA. If I had to do this differently, maybe I would have enforced a distance too? Something to make sure that places close to each other wouldn't be picked in conscutive travel windows...

LEAST FOGGY (I AM RICH)¶

In [18]:
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, LongType, DoubleType

from SparkHandler import Sparky, SparkyBatchContext, SparkyAnalysisContext
import util

import math
import gc
import os
import time
import folium
from IPython.display import IFrame, display

# Restore SparkContext in case `sc` got overwritten by matplotlib
sc = spark.sparkContext

# ============================================================
# CONFIG
# ============================================================

INPUT_GLOB = "hdfs:///3hr/*/*_20*{07,08,09,10}*_*"
BATCH_SIZE = 50

PARTIAL_OUT = f"hdfs:///user/{sc.sparkUser()}/least_foggy_bay_area_partials"
RESULT_OUT = None

# Bay Area bounding box
LAT_MIN = 36.8
LAT_MAX = 38.9
LON_MIN = -123.2
LON_MAX = -121.4

# Fog season
FOG_MONTHS = [5, 6, 7, 8, 9]

# 20 -> 0.05 degree cells
CELL_SCALE = 20

MIN_CELL_POINTS = 100
TOP_N = 10
In [19]:
# ============================================================
# SCHEMA
# ============================================================

raw_schema = StructType([
    StructField("time_ms", LongType(), False),
    StructField("lat", DoubleType(), False),
    StructField("lon", DoubleType(), False),
    StructField("cloud", DoubleType(), True),
    StructField("visibility", DoubleType(), True),
])

time_idx = None
lat_idx = None
lon_idx = None
cloud_idx = None
vis_idx = None
In [20]:
# ============================================================
# PARSER
# ============================================================

def parse_fog_line(line):
    try:
        if line.startswith("1_time"):
            return None

        parts = line.rstrip("\t").split("\t")

        time_ms = util.safe_float(parts, time_idx)
        lat = util.safe_float(parts, lat_idx)
        lon = util.safe_float(parts, lon_idx)

        if time_ms is None or lat is None or lon is None:
            return None

        # Early geographic filter
        if not (LAT_MIN <= lat <= LAT_MAX and LON_MIN <= lon <= LON_MAX):
            return None

        cloud = util.safe_float(parts, cloud_idx)
        visibility = util.safe_float(parts, vis_idx)
    
        return (
            int(time_ms),
            lat,
            lon,
            cloud,
            visibility,
        )

    except Exception:
        return None

# ============================================================
# PARTIAL TRANSFORM
# This runs once per 50-file batch
# ============================================================

def partial_transform(points):
    points = (
        points
        .withColumn(
            "timestamp",
            F.from_unixtime((F.col("time_ms") / 1000).cast("long")).cast("timestamp")
        )
        .withColumn("month", F.month("timestamp"))
    )

    if FOG_MONTHS is not None:
        points = points.filter(F.col("month").isin(FOG_MONTHS))

    return (
        points
        .withColumn("lat_i", F.floor(F.col("lat") * CELL_SCALE).cast("int"))
        .withColumn("lon_i", F.floor(F.col("lon") * CELL_SCALE).cast("int"))
        .groupBy("lat_i", "lon_i")
        .agg(
            F.sum("lat").alias("sum_lat"),
            F.sum("lon").alias("sum_lon"),
            F.count("*").alias("count_points"),

            F.sum("cloud").alias("sum_cloud"),
            F.count("cloud").alias("count_cloud"),

            F.sum("visibility").alias("sum_visibility"),
            F.count("visibility").alias("count_visibility"),
        )
    )

# ============================================================
# COMBINE PARTIALS
# This combines all batch parquet files
# ============================================================

def combine_fog_partials(partials):
    combined = (
        partials
        .groupBy("lat_i", "lon_i")
        .agg(
            F.sum("sum_lat").alias("sum_lat"),
            F.sum("sum_lon").alias("sum_lon"),
            F.sum("count_points").alias("cell_points"),

            F.sum("sum_cloud").alias("sum_cloud"),
            F.sum("count_cloud").alias("count_cloud"),

            F.sum("sum_visibility").alias("sum_visibility"),
            F.sum("count_visibility").alias("count_visibility"),
        )
    )

    return (
        combined
        .withColumn("lat", F.col("sum_lat") / F.col("cell_points"))
        .withColumn("lon", F.col("sum_lon") / F.col("cell_points"))

        .withColumn(
            "avg_cloud",
            F.when(F.col("count_cloud") > 0, F.col("sum_cloud") / F.col("count_cloud"))
        )
        .withColumn(
            "avg_visibility",
            F.when(
                F.col("count_visibility") > 0,
                F.col("sum_visibility") / F.col("count_visibility")
            )
        )

        .select(
            "lat_i",
            "lon_i",
            "lat",
            "lon",
            "cell_points",
            "avg_cloud",
            "avg_visibility",
        )
    )
In [21]:
# ============================================================
# ANALYZE
# Builds fog_score and clarity_score
# ============================================================

def clamp01(col):
    return F.least(F.greatest(col, F.lit(0.0)), F.lit(1.0))

def analyze_least_foggy(cell_stats):
    scored = (
        cell_stats
        .withColumn("avg_cloud_filled", F.coalesce(F.col("avg_cloud"), F.lit(40.0)))
        .withColumn("avg_visibility_filled", F.coalesce(F.col("avg_visibility"), F.lit(20000.0)))
    )

    cloud_term = clamp01(F.col("avg_cloud_filled") / F.lit(100.0))
    vis_term = clamp01((F.lit(20000.0) - F.col("avg_visibility_filled")) / F.lit(20000.0))

    if vis_idx is not None:
        scored = scored.withColumn(
            "fog_score",
            F.lit(0.30) * cloud_term +
            F.lit(0.20) * vis_term
        )
    else:
        scored = scored.withColumn(
            "fog_score", cloud_term
        )

    return (
        scored
        .withColumn("clarity_score", F.lit(100.0) * (F.lit(1.0) - F.col("fog_score")))
        .filter(F.col("cell_points") >= MIN_CELL_POINTS)
        .orderBy(
            F.desc("clarity_score"),
            F.asc("avg_cloud_filled"),
        )
    )

# ============================================================
# SETUP SPARKY
# ============================================================

sparky = Sparky(spark, sc)

batch_ctx = SparkyBatchContext(
    glob_path=INPUT_GLOB,
    partial_out=PARTIAL_OUT,
    schema=raw_schema,
    parse_line=parse_fog_line,
    partial_transform=partial_transform,
    batch_size=BATCH_SIZE,
)

least_foggy_runtime_start = time.perf_counter()
input_paths = sparky.setup_job(batch_ctx)

print("Total input files:", len(input_paths))
print("Batch size:", BATCH_SIZE)
print("Number of batches:", (len(input_paths) + BATCH_SIZE - 1) // BATCH_SIZE)

if len(input_paths) == 0:
    raise ValueError(f"No files matched: {INPUT_GLOB}")

# ============================================================
# COLUMN DISCOVERY FROM sparky.columns
# ============================================================

def required_idx(name):
    key = name.strip().lower()
    if key not in sparky.columns:
        raise ValueError(f"Missing required column: {name}")
    return sparky.columns[key]

def optional_idx(name):
    key = name.strip().lower()
    return sparky.columns.get(key)

def optional_idx_contains(*tokens):
    tokens = [t.strip().lower() for t in tokens]

    for col_name, idx in sparky.columns.items():
        if all(token in col_name for token in tokens):
            return idx

    return None

time_idx = required_idx("time")
lat_idx = required_idx("lat")
lon_idx = required_idx("lon")

cloud_idx = optional_idx("total_cloud_cover_entire_atmosphere_single_layer")
vis_idx = optional_idx("visibility_surface")

print("Using column indices:")
print("time:", time_idx)
print("lat:", lat_idx)
print("lon:", lon_idx)
print("cloud cover:", cloud_idx)
print("visibility:", vis_idx)
Deleted old partial output: hdfs:/user/rsbhatia2/least_foggy_bay_area_partials
Found 3194 files
[Stage 848:>                                                        (0 + 1) / 1]
Total input files: 3194
Batch size: 50
Number of batches: 64
Using column indices:
time: 0
lat: 1
lon: 2
cloud cover: 12
visibility: 15
                                                                                
In [22]:
# ============================================================
# RUN BATCHES THROUGH SPARKY
# ============================================================

for batch_id, start in enumerate(range(0, len(input_paths), BATCH_SIZE)):
    batch_paths = input_paths[start:start + BATCH_SIZE]
    sparky.process_batch(batch_ctx, batch_paths, batch_id)
Processing batch 0: 50 files
                                                                                
Processing batch 1: 50 files
                                                                                
Processing batch 2: 50 files
                                                                                
Processing batch 3: 50 files
                                                                                
Processing batch 4: 50 files
                                                                                
Processing batch 5: 50 files
                                                                                
Processing batch 6: 50 files
                                                                                
Processing batch 7: 50 files
                                                                                
Processing batch 8: 50 files
                                                                                
Processing batch 9: 50 files
                                                                                
Processing batch 10: 50 files
                                                                                
Processing batch 11: 50 files
                                                                                
Processing batch 12: 50 files
                                                                                
Processing batch 13: 50 files
                                                                                
Processing batch 14: 50 files
                                                                                
Processing batch 15: 50 files
                                                                                
Processing batch 16: 50 files
                                                                                
Processing batch 17: 50 files
                                                                                
Processing batch 18: 50 files
                                                                                
Processing batch 19: 50 files
                                                                                
Processing batch 20: 50 files
                                                                                
Processing batch 21: 50 files
                                                                                
Processing batch 22: 50 files
                                                                                
Processing batch 23: 50 files
                                                                                
Processing batch 24: 50 files
                                                                                
Processing batch 25: 50 files
                                                                                
Processing batch 26: 50 files
                                                                                
Processing batch 27: 50 files
                                                                                
Processing batch 28: 50 files
                                                                                
Processing batch 29: 50 files
                                                                                
Processing batch 30: 50 files
                                                                                
Processing batch 31: 50 files
                                                                                
Processing batch 32: 50 files
                                                                                
Processing batch 33: 50 files
                                                                                
Processing batch 34: 50 files
                                                                                
Processing batch 35: 50 files
                                                                                
Processing batch 36: 50 files
                                                                                
Processing batch 37: 50 files
                                                                                
Processing batch 38: 50 files
                                                                                
Processing batch 39: 50 files
                                                                                
Processing batch 40: 50 files
                                                                                
Processing batch 41: 50 files
                                                                                
Processing batch 42: 50 files
                                                                                
Processing batch 43: 50 files
                                                                                
Processing batch 44: 50 files
                                                                                
Processing batch 45: 50 files
                                                                                
Processing batch 46: 50 files
                                                                                
Processing batch 47: 50 files
                                                                                
Processing batch 48: 50 files
                                                                                
Processing batch 49: 50 files
                                                                                
Processing batch 50: 50 files
                                                                                
Processing batch 51: 50 files
                                                                                
Processing batch 52: 50 files
                                                                                
Processing batch 53: 50 files
                                                                                
Processing batch 54: 50 files
                                                                                
Processing batch 55: 50 files
                                                                                
Processing batch 56: 50 files
                                                                                
Processing batch 57: 50 files
                                                                                
Processing batch 58: 50 files
                                                                                
Processing batch 59: 50 files
                                                                                
Processing batch 60: 50 files
                                                                                
Processing batch 61: 50 files
                                                                                
Processing batch 62: 50 files
                                                                                
Processing batch 63: 44 files
                                                                                
In [23]:
# ============================================================
# COMBINE + ANALYZE THROUGH SPARKY
# ============================================================

analysis_ctx = SparkyAnalysisContext(
    partial_in=PARTIAL_OUT,
    combine_partials=combine_fog_partials,
    analyze=analyze_least_foggy,
    result_out=RESULT_OUT,
)

cell_stats = sparky.combine(analysis_ctx)
least_foggy = sparky.analyze(analysis_ctx, cell_stats)
In [24]:
# ============================================================
# SHOW RESULTS
# ============================================================

print("Top least-foggy Bay Area cells:")

top_pdf = (
    least_foggy
    .select(
        "lat",
        "lon",
        "clarity_score",
        "fog_score",
        "avg_cloud",
        "avg_visibility",
        "cell_points",
    )
    .limit(TOP_N)
    .toPandas()
)

display(top_pdf)

least_foggy_runtime = time.perf_counter() - least_foggy_runtime_start
print(f"Total Least Foggy Spark runtime: {least_foggy_runtime:.2f} seconds")
Top least-foggy Bay Area cells:
                                                                                
lat lon clarity_score fog_score avg_cloud avg_visibility cell_points
0 38.877293 -122.166343 98.172023 0.018280 6.093256 24106.990715 1892
1 38.850548 -122.303162 98.143547 0.018565 6.188177 24084.512906 1201
2 38.773071 -122.139185 98.137302 0.018627 6.208993 24103.445741 1892
3 38.788658 -122.008523 98.038884 0.019611 6.537052 24080.099917 1201
4 38.689884 -121.978465 98.010408 0.019896 6.631973 24102.070079 1892
5 38.892870 -122.035550 98.001166 0.019988 6.662781 24054.870941 1201
6 38.746358 -122.275882 97.996919 0.020031 6.676936 24090.507910 1201
7 38.668795 -122.112081 97.958951 0.020410 6.803497 24092.757910 1892
8 37.580908 -121.420552 97.957952 0.020420 6.806828 24069.001296 1892
9 38.834758 -122.433918 97.953455 0.020465 6.821815 24105.667963 1892
Total Least Foggy Spark runtime: 896.84 seconds
In [25]:
# ============================================================
# REAL MAP: TOP LAT/LON POINTS ONLY
# ============================================================

def fmt(value, digits=2):
    if value is None:
        return "N/A"
    try:
        if math.isnan(value):
            return "N/A"
    except TypeError:
        pass
    return f"{value:.{digits}f}"

center_lat = top_pdf["lat"].mean() if len(top_pdf) > 0 else 37.7
center_lon = top_pdf["lon"].mean() if len(top_pdf) > 0 else -122.2

m = folium.Map(
    location=[center_lat, center_lon],
    zoom_start=9,
    tiles=None,
)

folium.TileLayer(
    tiles="https://{s}.basemaps.cartocdn.com/light_all/{z}/{x}/{y}{r}.png",
    attr='&copy; <a href="https://www.openstreetmap.org/copyright">OpenStreetMap</a> contributors &copy; <a href="https://carto.com/attributions">CARTO</a>',
    name="CartoDB Positron",
    control=False,
).add_to(m)

for i, row in top_pdf.reset_index(drop=True).iterrows():
    rank = i + 1

    popup_html = f"""
    <b>Rank {rank}: Least-foggy candidate</b><br>
    Latitude: {row['lat']:.4f}<br>
    Longitude: {row['lon']:.4f}<br>
    Clarity score: {fmt(row['clarity_score'])}<br>
    Fog score: {fmt(row['fog_score'], 3)}<br>
    Avg cloud cover: {fmt(row['avg_cloud'])}<br>
    Avg visibility: {fmt(row['avg_visibility'])}<br>
    Cell points: {int(row['cell_points'])}
    """

    folium.Marker(
        location=[row["lat"], row["lon"]],
        popup=folium.Popup(popup_html, max_width=350),
        tooltip=f"#{rank}: clarity {row['clarity_score']:.2f}",
        icon=folium.DivIcon(
            html=f"""
            <div style="
                background-color: black;
                color: white;
                border-radius: 50%;
                width: 30px;
                height: 30px;
                text-align: center;
                line-height: 30px;
                font-weight: bold;
                border: 2px solid white;
                box-shadow: 0 0 4px gray;
            ">
                {rank}
            </div>
            """
        ),
    ).add_to(m)

if len(top_pdf) > 0:
    bounds = [
        [top_pdf["lat"].min(), top_pdf["lon"].min()],
        [top_pdf["lat"].max(), top_pdf["lon"].max()],
    ]
    m.fit_bounds(bounds)

html_path = "least_foggy_bay_area_points.html"
m.save(html_path)

print("Saved map to:", html_path)
display(IFrame(src=html_path, width=950, height=700))
Saved map to: least_foggy_bay_area_points.html

YOLO¶

Once I am rich and famous and rich and famous and rich, I am going to live in YOLO county, CA. It is the best place for a fogphobic trillionaire like me.
yolo

SolarWind Inc¶

In [26]:
from pyspark.sql import functions as F
from pyspark.sql.types import (
    StructType, StructField,
    StringType, DoubleType
)

from SparkHandler import Sparky, SparkyBatchContext, SparkyAnalysisContext
import util

import os
import time
import math
import folium
from IPython.display import display, IFrame

sc = spark.sparkContext

# ============================================================
# CONFIG
# ============================================================

INPUT_GLOB = "hdfs:///3hr/*/*"
BATCH_SIZE = 50

PARTIAL_OUT = f"hdfs:///user/{sc.sparkUser()}/solarwind_partials_vegetation_land_proxy"
RESULT_OUT = None

GEOHASH_PRECISION = 4

FILE_STRIDE = 24

MIN_GEOHASH_POINTS = 200
TOP_N = 3

MIN_AVG_VEGETATION = 1.0

LAT_COLUMN = "lat"
LON_COLUMN = "lon"
CLOUD_COLUMN = "total_cloud_cover_entire_atmosphere_single_layer"
PRECIP_COLUMN = "total_precipitation_surface_3_hour_accumulation"
WIND_GUST_COLUMN = "wind_speed_gust_surface"
TEMP_COLUMN = "temperature_surface"
VEGETATION_COLUMN = "vegetation_surface"

# ============================================================
# GEOHASH ENCODER
# ============================================================

_BASE32 = "0123456789bcdefghjkmnpqrstuvwxyz"

def encode_geohash(lat, lon, precision=4):
    lat_range = [-90.0, 90.0]
    lon_range = [-180.0, 180.0]

    bits = [16, 8, 4, 2, 1]
    bit = 0
    ch = 0
    even = True
    geohash = []

    while len(geohash) < precision:
        if even:
            mid = (lon_range[0] + lon_range[1]) / 2.0
            if lon >= mid:
                ch |= bits[bit]
                lon_range[0] = mid
            else:
                lon_range[1] = mid
        else:
            mid = (lat_range[0] + lat_range[1]) / 2.0
            if lat >= mid:
                ch |= bits[bit]
                lat_range[0] = mid
            else:
                lat_range[1] = mid

        even = not even

        if bit < 4:
            bit += 1
        else:
            geohash.append(_BASE32[ch])
            bit = 0
            ch = 0

    return "".join(geohash)

# ============================================================
# SCHEMA 
# ============================================================

raw_schema = StructType([
    StructField("geohash", StringType(), False),
    StructField("lat", DoubleType(), False),
    StructField("lon", DoubleType(), False),
    StructField("cloud", DoubleType(), False),
    StructField("precip", DoubleType(), False),
    StructField("wind_speed", DoubleType(), False),
    StructField("wind_power_proxy", DoubleType(), False),
    StructField("temp_k", DoubleType(), False),
    StructField("vegetation", DoubleType(), True),
])

lat_idx = None
lon_idx = None
cloud_idx = None
precip_idx = None
wind_gust_idx = None
temp_idx = None
vegetation_idx = None

# ============================================================
# PARSER
# ============================================================

def parse_solarwind_line(line):
    try:
        if line.startswith("1_time"):
            return None

        parts = line.rstrip("\t").split("\t")

        lat = util.safe_float(parts, lat_idx)
        lon = util.safe_float(parts, lon_idx)
        cloud = util.safe_float(parts, cloud_idx)
        precip = util.safe_float(parts, precip_idx)
        wind_speed = util.safe_float(parts, wind_gust_idx)
        temp_k = util.safe_float(parts, temp_idx)
        vegetation = util.safe_float(parts, vegetation_idx)

        if (
            lat is None
            or lon is None
            or cloud is None
            or precip is None
            or wind_speed is None
            or temp_k is None
        ):
            return None

        geohash = encode_geohash(lat, lon, GEOHASH_PRECISION)

        # Wind power is approximately proportional to wind speed cubed.
        wind_power_proxy = wind_speed ** 3

        return (
            geohash,
            lat,
            lon,
            cloud,
            precip,
            wind_speed,
            wind_power_proxy,
            temp_k,
            vegetation,
        )

    except Exception:
        return None

# ============================================================
# PARTIAL TRANSFORM
# Runs once per batch
# ============================================================

def partial_transform(points):
    return (
        points
        .groupBy("geohash")
        .agg(
            F.sum("lat").alias("sum_lat"),
            F.sum("lon").alias("sum_lon"),
            F.count("*").alias("count_points"),

            F.sum("cloud").alias("sum_cloud"),
            F.count("cloud").alias("count_cloud"),

            F.sum("precip").alias("sum_precip"),
            F.count("precip").alias("count_precip"),

            F.sum("wind_speed").alias("sum_wind_speed"),
            F.sum("wind_power_proxy").alias("sum_wind_power_proxy"),
            F.max("wind_speed").alias("max_wind_speed"),
            F.count("wind_speed").alias("count_wind"),

            F.sum("temp_k").alias("sum_temp_k"),
            F.count("temp_k").alias("count_temp"),

            F.sum("vegetation").alias("sum_vegetation"),
            F.count("vegetation").alias("count_vegetation"),
        )
    )

# ============================================================
# COMBINE PARTIALS
# Combines batch parquet files into geohash-level rows
# ============================================================

def combine_solarwind_partials(partials):
    combined = (
        partials
        .groupBy("geohash")
        .agg(
            F.sum("sum_lat").alias("sum_lat"),
            F.sum("sum_lon").alias("sum_lon"),
            F.sum("count_points").alias("points"),

            F.sum("sum_cloud").alias("sum_cloud"),
            F.sum("count_cloud").alias("count_cloud"),

            F.sum("sum_precip").alias("sum_precip"),
            F.sum("count_precip").alias("count_precip"),

            F.sum("sum_wind_speed").alias("sum_wind_speed"),
            F.sum("sum_wind_power_proxy").alias("sum_wind_power_proxy"),
            F.max("max_wind_speed").alias("max_wind_speed"),
            F.sum("count_wind").alias("count_wind"),

            F.sum("sum_temp_k").alias("sum_temp_k"),
            F.sum("count_temp").alias("count_temp"),

            F.sum("sum_vegetation").alias("sum_vegetation"),
            F.sum("count_vegetation").alias("count_vegetation"),
        )
    )

    return (
        combined
        .withColumn("lat", F.col("sum_lat") / F.col("points"))
        .withColumn("lon", F.col("sum_lon") / F.col("points"))

        .withColumn("avg_cloud_cover", F.col("sum_cloud") / F.col("count_cloud"))

        # NAM total precipitation field is a 3-hour accumulation.
        .withColumn("avg_precip_3hr", F.col("sum_precip") / F.col("count_precip"))
        .withColumn("avg_daily_precip", F.col("avg_precip_3hr") * F.lit(8.0))

        .withColumn("avg_wind_speed", F.col("sum_wind_speed") / F.col("count_wind"))
        .withColumn("avg_wind_power_proxy", F.col("sum_wind_power_proxy") / F.col("count_wind"))

        .withColumn("avg_temp_k", F.col("sum_temp_k") / F.col("count_temp"))
        .withColumn("avg_temp_c", F.col("avg_temp_k") - F.lit(273.15))

        .withColumn(
            "avg_vegetation",
            F.when(
                F.col("count_vegetation") > 0,
                F.col("sum_vegetation") / F.col("count_vegetation")
            )
        )

        .filter(F.col("points") >= MIN_GEOHASH_POINTS)

        # Approximate land filter.
        # This removes open-ocean-like geohashes using vegetation_surface.
        .filter(F.col("avg_vegetation") >= MIN_AVG_VEGETATION)

        .select(
            "geohash",
            "lat",
            "lon",
            "points",
            "avg_vegetation",
            "avg_cloud_cover",
            "avg_precip_3hr",
            "avg_daily_precip",
            "avg_wind_speed",
            "avg_wind_power_proxy",
            "max_wind_speed",
            "avg_temp_c",
        )
    )

# ============================================================
# ANALYZE / SCORE
# ============================================================

def norm_expr(col_name, min_value, max_value):
    if min_value is None or max_value is None or max_value == min_value:
        return F.lit(50.0)

    return (
        (F.col(col_name) - F.lit(float(min_value))) /
        (F.lit(float(max_value)) - F.lit(float(min_value))) *
        F.lit(100.0)
    )

def analyze_solarwind(geohash_stats):
    ranges = geohash_stats.agg(
        F.min("avg_cloud_cover").alias("cloud_min"),
        F.max("avg_cloud_cover").alias("cloud_max"),

        F.min("avg_daily_precip").alias("precip_min"),
        F.max("avg_daily_precip").alias("precip_max"),

        F.min("avg_wind_power_proxy").alias("wind_power_min"),
        F.max("avg_wind_power_proxy").alias("wind_power_max"),
    ).first().asDict()

    scored = (
        geohash_stats
        .withColumn(
            "cloud_norm",
            norm_expr("avg_cloud_cover", ranges["cloud_min"], ranges["cloud_max"])
        )
        .withColumn(
            "precip_norm",
            norm_expr("avg_daily_precip", ranges["precip_min"], ranges["precip_max"])
        )
        .withColumn(
            "wind_norm",
            norm_expr("avg_wind_power_proxy", ranges["wind_power_min"], ranges["wind_power_max"])
        )

        # Solar proxy:
        # lower cloud cover + lower precipitation = better solar farm candidate.
        .withColumn(
            "solar_score",
            (
                F.lit(0.75) * (F.lit(100.0) - F.col("cloud_norm")) +
                F.lit(0.25) * (F.lit(100.0) - F.col("precip_norm"))
            )
        )

        .withColumn("wind_score", F.col("wind_norm"))

        .withColumn(
            "combined_score",
            F.lit(0.50) * F.col("solar_score") + F.lit(0.50) * F.col("wind_score")
        )
    )

    base_cols = [
        "geohash",
        "lat",
        "lon",
        "points",
        "avg_vegetation",
        "solar_score",
        "wind_score",
        "combined_score",
        "avg_cloud_cover",
        "avg_daily_precip",
        "avg_wind_speed",
        "avg_wind_power_proxy",
        "max_wind_speed",
        "avg_temp_c",
    ]

    solar_top3 = (
        scored
        .orderBy(F.desc("solar_score"))
        .limit(TOP_N)
        .select(*base_cols)
        .withColumn("category", F.lit("Solar"))
        .withColumn("category_score", F.col("solar_score"))
    )

    wind_top3 = (
        scored
        .orderBy(F.desc("wind_score"))
        .limit(TOP_N)
        .select(*base_cols)
        .withColumn("category", F.lit("Wind"))
        .withColumn("category_score", F.col("wind_score"))
    )

    combined_top3 = (
        scored
        .orderBy(F.desc("combined_score"))
        .limit(TOP_N)
        .select(*base_cols)
        .withColumn("category", F.lit("Solar + Wind"))
        .withColumn("category_score", F.col("combined_score"))
    )

    return (
        solar_top3
        .unionByName(wind_top3)
        .unionByName(combined_top3)
        .select(
            "category",
            "geohash",
            "lat",
            "lon",
            "category_score",
            "solar_score",
            "wind_score",
            "combined_score",
            "avg_vegetation",
            "avg_cloud_cover",
            "avg_daily_precip",
            "avg_wind_speed",
            "avg_wind_power_proxy",
            "max_wind_speed",
            "avg_temp_c",
            "points",
        )
    )

# ============================================================
# SETUP SPARKY
# ============================================================

sparky = Sparky(spark, sc)

batch_ctx = SparkyBatchContext(
    glob_path=INPUT_GLOB,
    partial_out=PARTIAL_OUT,
    schema=raw_schema,
    parse_line=parse_solarwind_line,
    partial_transform=partial_transform,
    batch_size=BATCH_SIZE,
)

input_paths = sparky.setup_job(batch_ctx)

input_paths = input_paths[::FILE_STRIDE]

print("Total files after stride:", len(input_paths))
print("Batch size:", BATCH_SIZE)
print("Number of batches:", (len(input_paths) + BATCH_SIZE - 1) // BATCH_SIZE)

if len(input_paths) == 0:
    raise ValueError(f"No files matched: {INPUT_GLOB}")


def required_idx(name):
    key = name.strip().lower()
    if key not in sparky.columns:
        print("Available related columns:")
        for col_name, idx in sparky.columns.items():
            if name.split("_")[0] in col_name:
                print(idx, col_name)
        raise ValueError(f"Missing required column: {name}")
    return sparky.columns[key]

lat_idx = required_idx(LAT_COLUMN)
lon_idx = required_idx(LON_COLUMN)
cloud_idx = required_idx(CLOUD_COLUMN)
precip_idx = required_idx(PRECIP_COLUMN)
wind_gust_idx = required_idx(WIND_GUST_COLUMN)
temp_idx = required_idx(TEMP_COLUMN)
vegetation_idx = required_idx(VEGETATION_COLUMN)

print("Using column indices:")
print("lat:", lat_idx)
print("lon:", lon_idx)
print("cloud:", cloud_idx)
print("precip:", precip_idx)
print("wind gust:", wind_gust_idx)
print("temp:", temp_idx)
print("vegetation:", vegetation_idx)

# ============================================================
# RUN BATCHES 
# ============================================================

overall_start = time.perf_counter()

for batch_id, start in enumerate(range(0, len(input_paths), BATCH_SIZE)):
    batch_paths = input_paths[start:start + BATCH_SIZE]
    sparky.process_batch(batch_ctx, batch_paths, batch_id)

overall_elapsed = time.perf_counter() - overall_start
print(f"Total SolarWind batch runtime: {overall_elapsed:.2f} seconds")

# ============================================================
# COMBINE + ANALYZE
# ============================================================

analysis_ctx = SparkyAnalysisContext(
    partial_in=PARTIAL_OUT,
    combine_partials=combine_solarwind_partials,
    analyze=analyze_solarwind,
    result_out=RESULT_OUT,
)

geohash_stats = sparky.combine(analysis_ctx)
top9 = sparky.analyze(analysis_ctx, geohash_stats)


display_cols = [
    "category",
    "geohash",
    "lat",
    "lon",
    "category_score",
    "solar_score",
    "wind_score",
    "combined_score",
    "avg_vegetation",
    "avg_cloud_cover",
    "avg_daily_precip",
    "avg_wind_speed",
    "avg_wind_power_proxy",
    "max_wind_speed",
    "avg_temp_c",
    "points",
]

top9_pdf = top9.select(*display_cols).toPandas()

for col in [
    "lat",
    "lon",
    "category_score",
    "solar_score",
    "wind_score",
    "combined_score",
    "avg_vegetation",
    "avg_cloud_cover",
    "avg_daily_precip",
    "avg_wind_speed",
    "avg_wind_power_proxy",
    "max_wind_speed",
    "avg_temp_c",
]:
    top9_pdf[col] = top9_pdf[col].round(3)

print("Top 3 solar, top 3 wind, top 3 solar+wind:")
display(top9_pdf)

print("Solar top 3:")
display(top9_pdf[top9_pdf["category"] == "Solar"].reset_index(drop=True))

print("Wind top 3:")
display(top9_pdf[top9_pdf["category"] == "Wind"].reset_index(drop=True))

print("Solar + Wind top 3:")
display(top9_pdf[top9_pdf["category"] == "Solar + Wind"].reset_index(drop=True))

# ============================================================
# SAVE CSV + INTERACTIVE MAP
# ============================================================

csv_path = "solarwind_top9.csv"
top9_pdf.to_csv(csv_path, index=False)
print("Saved CSV:", csv_path)

def fmt(value, digits=2):
    if value is None:
        return "N/A"
    try:
        if math.isnan(value):
            return "N/A"
    except TypeError:
        pass
    return f"{value:.{digits}f}"

center_lat = top9_pdf["lat"].mean() if len(top9_pdf) > 0 else 39.0
center_lon = top9_pdf["lon"].mean() if len(top9_pdf) > 0 else -98.0

m = folium.Map(
    location=[center_lat, center_lon],
    zoom_start=4,
    tiles=None,
)

folium.TileLayer(
    tiles="https://{s}.basemaps.cartocdn.com/light_all/{z}/{x}/{y}{r}.png",
    attr='&copy; <a href="https://www.openstreetmap.org/copyright">OpenStreetMap</a> contributors &copy; <a href="https://carto.com/attributions">CARTO</a>',
    name="CartoDB Positron",
    control=False,
).add_to(m)

category_colors = {
    "Solar": "orange",
    "Wind": "blue",
    "Solar + Wind": "green",
}

category_labels = {
    "Solar": "S",
    "Wind": "W",
    "Solar + Wind": "B",
}

category_offsets = {
    "Solar": (0.00, -0.08),
    "Wind": (0.00, 0.08),
    "Solar + Wind": (0.08, 0.00),
}

for i, row in top9_pdf.reset_index(drop=True).iterrows():
    rank = i + 1
    category = row["category"]

    lat_offset, lon_offset = category_offsets.get(category, (0.0, 0.0))
    marker_lat = row["lat"] + lat_offset
    marker_lon = row["lon"] + lon_offset

    popup_html = f"""
    <b>{category} candidate #{rank}</b><br>
    Geohash: <b>{row['geohash']}</b><br>
    Latitude: {row['lat']:.4f}<br>
    Longitude: {row['lon']:.4f}<br>
    Category score: {fmt(row['category_score'])}<br>
    Solar score: {fmt(row['solar_score'])}<br>
    Wind score: {fmt(row['wind_score'])}<br>
    Combined score: {fmt(row['combined_score'])}<br>
    Avg vegetation: {fmt(row['avg_vegetation'])}<br>
    Avg cloud cover: {fmt(row['avg_cloud_cover'])}<br>
    Avg daily precip: {fmt(row['avg_daily_precip'], 3)}<br>
    Avg wind speed: {fmt(row['avg_wind_speed'], 3)}<br>
    Avg wind power proxy: {fmt(row['avg_wind_power_proxy'], 3)}<br>
    Max wind speed: {fmt(row['max_wind_speed'], 3)}<br>
    Avg temp C: {fmt(row['avg_temp_c'])}<br>
    Points: {int(row['points'])}
    """

    marker_html = f"""
    <div style="
        background-color: {category_colors.get(category, 'black')};
        color: white;
        border-radius: 50%;
        width: 32px;
        height: 32px;
        text-align: center;
        line-height: 32px;
        font-weight: bold;
        border: 2px solid white;
        box-shadow: 0 0 5px gray;
        font-size: 13px;
    ">
        {category_labels.get(category, '?')}{rank}
    </div>
    """

    folium.Marker(
        location=[marker_lat, marker_lon],
        popup=folium.Popup(popup_html, max_width=400),
        tooltip=f"{category}: {row['geohash']} | score {row['category_score']:.2f}",
        icon=folium.DivIcon(html=marker_html),
    ).add_to(m)

legend_html = """
<div style="
    position: fixed;
    bottom: 30px;
    left: 30px;
    z-index: 9999;
    background-color: white;
    padding: 10px 12px;
    border: 2px solid #999;
    border-radius: 8px;
    font-size: 14px;
    box-shadow: 0 0 5px gray;
">
<b>SolarWind Categories</b><br>
<span style="color: orange; font-weight: bold;">●</span> Solar<br>
<span style="color: blue; font-weight: bold;">●</span> Wind<br>
<span style="color: green; font-weight: bold;">●</span> Solar + Wind<br>
</div>
"""

m.get_root().html.add_child(folium.Element(legend_html))

if len(top9_pdf) > 0:
    bounds = [
        [top9_pdf["lat"].min(), top9_pdf["lon"].min()],
        [top9_pdf["lat"].max(), top9_pdf["lon"].max()],
    ]
    m.fit_bounds(bounds)

html_path = "solarwind_top9_map.html"
m.save(html_path)

print("Saved interactive map to:", html_path)
display(IFrame(src=html_path, width=950, height=700))
Deleted old partial output: hdfs:/user/rsbhatia2/solarwind_partials_vegetation_land_proxy
Found 6770 files
Total files after stride: 283
Batch size: 50
Number of batches: 6
Using column indices:
lat: 1
lon: 2
cloud: 12
precip: 13
wind gust: 17
temp: 10
vegetation: 14
Processing batch 0: 50 files
                                                                                
Processing batch 1: 50 files
                                                                                
Processing batch 2: 50 files
                                                                                
Processing batch 3: 50 files
                                                                                
Processing batch 4: 50 files
                                                                                
Processing batch 5: 33 files
                                                                                
Total SolarWind batch runtime: 177.01 seconds
[Stage 1051:=============>(14 + 1) / 15][Stage 1053:=============>(14 + 1) / 15]
Top 3 solar, top 3 wind, top 3 solar+wind:
                                                                                
category geohash lat lon category_score solar_score wind_score combined_score avg_vegetation avg_cloud_cover avg_daily_precip avg_wind_speed avg_wind_power_proxy max_wind_speed avg_temp_c points
0 Solar 9mve 32.956 -115.867 99.997 99.997 6.541 53.269 15.324 9.372 0.003 3.986 164.238 16.208 18.479 984
1 Solar 9mvd 32.772 -115.800 99.699 99.699 12.526 56.112 15.594 9.663 0.001 4.578 310.271 18.808 18.547 656
2 Solar 9mqr 30.840 -114.777 99.673 99.673 5.519 52.596 1.000 9.427 0.242 3.982 139.307 16.534 20.138 328
3 Wind 9xmp 42.097 -105.316 100.000 84.474 100.000 92.237 22.888 23.421 0.884 8.803 2444.358 30.145 6.011 656
4 Wind f8kf 46.860 -60.592 99.185 40.757 99.185 69.971 43.327 62.215 4.077 10.095 2424.465 33.301 6.727 492
5 Wind f8kc 46.667 -60.645 97.334 35.933 97.334 66.634 43.839 65.168 5.654 10.100 2379.319 32.601 5.982 820
6 Solar + Wind 9xmp 42.097 -105.316 92.237 84.474 100.000 92.237 22.888 23.421 0.884 8.803 2444.358 30.145 6.011 656
7 Solar + Wind 9xmn 41.907 -105.273 90.685 85.231 96.139 90.685 19.524 22.720 0.857 8.905 2350.160 30.245 6.323 492
8 Solar + Wind 9xm4 41.229 -105.333 87.434 84.414 90.454 87.434 22.397 23.637 0.739 9.239 2211.469 28.337 4.174 656
Solar top 3:
category geohash lat lon category_score solar_score wind_score combined_score avg_vegetation avg_cloud_cover avg_daily_precip avg_wind_speed avg_wind_power_proxy max_wind_speed avg_temp_c points
0 Solar 9mve 32.956 -115.867 99.997 99.997 6.541 53.269 15.324 9.372 0.003 3.986 164.238 16.208 18.479 984
1 Solar 9mvd 32.772 -115.800 99.699 99.699 12.526 56.112 15.594 9.663 0.001 4.578 310.271 18.808 18.547 656
2 Solar 9mqr 30.840 -114.777 99.673 99.673 5.519 52.596 1.000 9.427 0.242 3.982 139.307 16.534 20.138 328
Wind top 3:
category geohash lat lon category_score solar_score wind_score combined_score avg_vegetation avg_cloud_cover avg_daily_precip avg_wind_speed avg_wind_power_proxy max_wind_speed avg_temp_c points
0 Wind 9xmp 42.097 -105.316 100.000 84.474 100.000 92.237 22.888 23.421 0.884 8.803 2444.358 30.145 6.011 656
1 Wind f8kf 46.860 -60.592 99.185 40.757 99.185 69.971 43.327 62.215 4.077 10.095 2424.465 33.301 6.727 492
2 Wind f8kc 46.667 -60.645 97.334 35.933 97.334 66.634 43.839 65.168 5.654 10.100 2379.319 32.601 5.982 820
Solar + Wind top 3:
category geohash lat lon category_score solar_score wind_score combined_score avg_vegetation avg_cloud_cover avg_daily_precip avg_wind_speed avg_wind_power_proxy max_wind_speed avg_temp_c points
0 Solar + Wind 9xmp 42.097 -105.316 92.237 84.474 100.000 92.237 22.888 23.421 0.884 8.803 2444.358 30.145 6.011 656
1 Solar + Wind 9xmn 41.907 -105.273 90.685 85.231 96.139 90.685 19.524 22.720 0.857 8.905 2350.160 30.245 6.323 492
2 Solar + Wind 9xm4 41.229 -105.333 87.434 84.414 90.454 87.434 22.397 23.637 0.739 9.239 2211.469 28.337 4.174 656
Saved CSV: solarwind_top9.csv
Saved interactive map to: solarwind_top9_map.html

SOLARWIND INC. REPORT (CLASSIFIED)¶

We tried to find the best place to put our wind and solar farms. Our initial run recommended the ocean as a place for our wind farms. This idea was scrapped after told us that they didn't have enough power to claim the ocean yet.

As such, the oceanic wind farms have been delayed by at least 15 years, till has become the king of the world. We adjusted the methodology to use surface vegetation as a marker for land mass.

For now, we shall release a public facing report, we think Canada might be a good shout.

SOLARWIND INC. REPORT¶

We ran our spark data analysis tools on the datasets, to find the best place to setup solar and wind farms. Here are the results of our groundbeaking technology:

Solar Farms
The following places were identified as being ideal for solar farms:

  • Imperial County, CA
Solar Score Geohash Avg Cloud Cover
100 9mve 9.37
  • Dunaway Road, Imperial County, CA
Solar Score Geohash Avg Cloud Cover
99.7 9mvd 9.66
  • Municipio de San Felipe, Mexico
Solar Score Geohash Avg Cloud Cover
99.67 9mqr 9.43

The council decided that they were not ready to expand business into Mexico yet. Imperial County shall be the target for the next 5 years.
To see more detailed information, kindly refer to the tables attached above the map.

Wind Farms
The following places were identifed as being ideal for wind farms.

  • Albany County, Wyoming
Wind Score Geohash Avg Wind Speed
100 9xmp 8.803
  • Cape North, Nova Scotia, Canada
Wind Score Geohash Avg Wind Speed
99.19 f8kf 10.095
  • Cheticamp Flats, Nova Scotia, Canada
Wind Score Geohash Avg Wind Speed
97.33 f8kc 10.1

Solar + Wind Farms

This is some prime real estate. These locations are number one priority, because they support good solar and wind farms in the same location, that will be cost-effective for maintenance.

  • Britania Mountain, Wyoming
Total Score Wind Score Solar Score Geohash
92.24 100 84.47 9xmp
  • Albany County, Wyoming, United States of America
Total Score Wind Score Solar Score Geohash
90.69 96.14 85.23 9xmn

This location is very close to a government park, so it is unlikely we will be able to do anything here.

  • Albany County, Wyoming. This time, near a lake.
Total Score Wind Score Solar Score Geohash
87.43 90.45 84.41 9xm4

This concludes our report. For more information, please take a look at the tables above the map.

CLIMATE CHANGE¶

In [27]:
from pyspark.sql import functions as F
from pyspark.sql.types import (
    StructType, StructField,
    StringType, LongType, DoubleType
)

from SparkHandler import Sparky, SparkyBatchContext, SparkyAnalysisContext
import util

from IPython.display import display
import pandas as pd
import matplotlib.pyplot as plt
import os
import time
import math

sc = spark.sparkContext

# ============================================================
# CONFIG
# ============================================================

INPUT_GLOB = "hdfs:///3hr/*/*"
BATCH_SIZE = 50

PARTIAL_OUT = f"hdfs:///user/{sc.sparkUser()}/climate_change_geohash2_partials"
RESULT_OUT = None

FILE_STRIDE = 1

GEOHASH_PRECISION = 2

YEARS_BACK = 5

MIN_YEARS_FOR_TREND = 2

TOP_REGIONS_TO_DISPLAY = 10

TIME_COLUMN = "time"
LAT_COLUMN = "lat"
LON_COLUMN = "lon"

ALBEDO_COLUMN = "albedo_surface"
PRECIPITABLE_WATER_COLUMN = "precipitable_water_entire_atmosphere_single_layer"
PRESSURE_SURFACE_COLUMN = "pressure_surface"
RH_ZERO_COLUMN = "relative_humidity_zerodegc_isotherm"
SNOW_DEPTH_COLUMN = "snow_depth_surface"
TEMP_COLUMN = "temperature_surface"
CLOUD_COLUMN = "total_cloud_cover_entire_atmosphere_single_layer"
PRECIP_COLUMN = "total_precipitation_surface_3_hour_accumulation"
VEGETATION_COLUMN = "vegetation_surface"
VISIBILITY_COLUMN = "visibility_surface"
WIND_GUST_COLUMN = "wind_speed_gust_surface"


_BASE32 = "0123456789bcdefghjkmnpqrstuvwxyz"

def encode_geohash(lat, lon, precision=2):
    lat_range = [-90.0, 90.0]
    lon_range = [-180.0, 180.0]

    bits = [16, 8, 4, 2, 1]
    bit = 0
    ch = 0
    even = True
    geohash = []

    while len(geohash) < precision:
        if even:
            mid = (lon_range[0] + lon_range[1]) / 2.0
            if lon >= mid:
                ch |= bits[bit]
                lon_range[0] = mid
            else:
                lon_range[1] = mid
        else:
            mid = (lat_range[0] + lat_range[1]) / 2.0
            if lat >= mid:
                ch |= bits[bit]
                lat_range[0] = mid
            else:
                lat_range[1] = mid

        even = not even

        if bit < 4:
            bit += 1
        else:
            geohash.append(_BASE32[ch])
            bit = 0
            ch = 0

    return "".join(geohash)

# ============================================================
# SCHEMA 
# ============================================================

raw_schema = StructType([
    StructField("geohash2", StringType(), False),
    StructField("time_ms", LongType(), False),
    StructField("lat", DoubleType(), False),
    StructField("lon", DoubleType(), False),

    StructField("temp_k", DoubleType(), False),
    StructField("precipitable_water", DoubleType(), False),
    StructField("pressure_surface", DoubleType(), False),
    StructField("rh_zerodegc", DoubleType(), False),
    StructField("snow_depth", DoubleType(), False),
    StructField("cloud_cover", DoubleType(), False),
    StructField("precip_3hr", DoubleType(), False),
    StructField("vegetation", DoubleType(), False),
    StructField("visibility", DoubleType(), False),
    StructField("wind_gust", DoubleType(), False),
    StructField("albedo", DoubleType(), False),
])


time_idx = None
lat_idx = None
lon_idx = None

albedo_idx = None
precipitable_water_idx = None
pressure_surface_idx = None
rh_zero_idx = None
snow_depth_idx = None
temp_idx = None
cloud_idx = None
precip_idx = None
vegetation_idx = None
visibility_idx = None
wind_gust_idx = None

# ============================================================
# PARSER
# ============================================================

def parse_climate_change_line(line):
    try:
        if line.startswith("1_time"):
            return None

        parts = line.rstrip("\t").split("\t")

        time_ms = util.safe_float(parts, time_idx)
        lat = util.safe_float(parts, lat_idx)
        lon = util.safe_float(parts, lon_idx)

        temp_k = util.safe_float(parts, temp_idx)
        precipitable_water = util.safe_float(parts, precipitable_water_idx)
        pressure_surface = util.safe_float(parts, pressure_surface_idx)
        rh_zerodegc = util.safe_float(parts, rh_zero_idx)
        snow_depth = util.safe_float(parts, snow_depth_idx)
        cloud_cover = util.safe_float(parts, cloud_idx)
        precip_3hr = util.safe_float(parts, precip_idx)
        vegetation = util.safe_float(parts, vegetation_idx)
        visibility = util.safe_float(parts, visibility_idx)
        wind_gust = util.safe_float(parts, wind_gust_idx)
        albedo = util.safe_float(parts, albedo_idx)

        if (
            time_ms is None
            or lat is None
            or lon is None
            or temp_k is None
            or precipitable_water is None
            or pressure_surface is None
            or rh_zerodegc is None
            or snow_depth is None
            or cloud_cover is None
            or precip_3hr is None
            or vegetation is None
            or visibility is None
            or wind_gust is None
            or albedo is None
        ):
            return None

        geohash2 = encode_geohash(lat, lon, GEOHASH_PRECISION)

        return (
            geohash2,
            int(time_ms),
            lat,
            lon,
            temp_k,
            precipitable_water,
            pressure_surface,
            rh_zerodegc,
            snow_depth,
            cloud_cover,
            precip_3hr,
            vegetation,
            visibility,
            wind_gust,
            albedo,
        )

    except Exception:
        return None

# ============================================================
# PARTIAL TRANSFORM
# geohash2 + year aggregates
# ============================================================

def partial_transform(points):
    yearly = (
        points
        .withColumn(
            "timestamp",
            F.from_unixtime((F.col("time_ms") / 1000).cast("long")).cast("timestamp")
        )
        .withColumn("year", F.year("timestamp"))
    )

    return (
        yearly
        .groupBy("geohash2", "year")
        .agg(
            F.sum("lat").alias("sum_lat"),
            F.sum("lon").alias("sum_lon"),
            F.count("*").alias("count_points"),

            F.sum("temp_k").alias("sum_temp_k"),
            F.count("temp_k").alias("count_temp"),

            F.sum("precipitable_water").alias("sum_precipitable_water"),
            F.count("precipitable_water").alias("count_precipitable_water"),

            F.sum("pressure_surface").alias("sum_pressure_surface"),
            F.count("pressure_surface").alias("count_pressure_surface"),

            F.sum("rh_zerodegc").alias("sum_rh_zerodegc"),
            F.count("rh_zerodegc").alias("count_rh_zerodegc"),

            F.sum("snow_depth").alias("sum_snow_depth"),
            F.count("snow_depth").alias("count_snow_depth"),

            F.sum("cloud_cover").alias("sum_cloud_cover"),
            F.count("cloud_cover").alias("count_cloud_cover"),

            F.sum("precip_3hr").alias("sum_precip_3hr"),
            F.count("precip_3hr").alias("count_precip_3hr"),

            F.sum("vegetation").alias("sum_vegetation"),
            F.count("vegetation").alias("count_vegetation"),

            F.sum("visibility").alias("sum_visibility"),
            F.count("visibility").alias("count_visibility"),

            F.sum("wind_gust").alias("sum_wind_gust"),
            F.count("wind_gust").alias("count_wind_gust"),

            F.sum("albedo").alias("sum_albedo"),
            F.count("albedo").alias("count_albedo"),
        )
    )

# ============================================================
# COMBINE PARTIALS
# ============================================================

def combine_climate_partials(partials):
    combined = (
        partials
        .groupBy("geohash2", "year")
        .agg(
            F.sum("sum_lat").alias("sum_lat"),
            F.sum("sum_lon").alias("sum_lon"),
            F.sum("count_points").alias("points"),

            F.sum("sum_temp_k").alias("sum_temp_k"),
            F.sum("count_temp").alias("count_temp"),

            F.sum("sum_precipitable_water").alias("sum_precipitable_water"),
            F.sum("count_precipitable_water").alias("count_precipitable_water"),

            F.sum("sum_pressure_surface").alias("sum_pressure_surface"),
            F.sum("count_pressure_surface").alias("count_pressure_surface"),

            F.sum("sum_rh_zerodegc").alias("sum_rh_zerodegc"),
            F.sum("count_rh_zerodegc").alias("count_rh_zerodegc"),

            F.sum("sum_snow_depth").alias("sum_snow_depth"),
            F.sum("count_snow_depth").alias("count_snow_depth"),

            F.sum("sum_cloud_cover").alias("sum_cloud_cover"),
            F.sum("count_cloud_cover").alias("count_cloud_cover"),

            F.sum("sum_precip_3hr").alias("sum_precip_3hr"),
            F.sum("count_precip_3hr").alias("count_precip_3hr"),

            F.sum("sum_vegetation").alias("sum_vegetation"),
            F.sum("count_vegetation").alias("count_vegetation"),

            F.sum("sum_visibility").alias("sum_visibility"),
            F.sum("count_visibility").alias("count_visibility"),

            F.sum("sum_wind_gust").alias("sum_wind_gust"),
            F.sum("count_wind_gust").alias("count_wind_gust"),

            F.sum("sum_albedo").alias("sum_albedo"),
            F.sum("count_albedo").alias("count_albedo"),
        )
    )

    return (
        combined
        .withColumn("lat", F.col("sum_lat") / F.col("points"))
        .withColumn("lon", F.col("sum_lon") / F.col("points"))

        .withColumn("avg_temp_k", F.col("sum_temp_k") / F.col("count_temp"))
        .withColumn("avg_temp_c", F.col("avg_temp_k") - F.lit(273.15))

        .withColumn("avg_precipitable_water", F.col("sum_precipitable_water") / F.col("count_precipitable_water"))
        .withColumn("avg_pressure_surface", F.col("sum_pressure_surface") / F.col("count_pressure_surface"))
        .withColumn("avg_rh_zerodegc", F.col("sum_rh_zerodegc") / F.col("count_rh_zerodegc"))
        .withColumn("avg_snow_depth", F.col("sum_snow_depth") / F.col("count_snow_depth"))
        .withColumn("avg_cloud_cover", F.col("sum_cloud_cover") / F.col("count_cloud_cover"))

        # NAM total precipitation is a 3-hour accumulation.
        .withColumn("avg_precip_3hr", F.col("sum_precip_3hr") / F.col("count_precip_3hr"))
        .withColumn("avg_daily_precip", F.col("avg_precip_3hr") * F.lit(8.0))

        .withColumn("avg_vegetation", F.col("sum_vegetation") / F.col("count_vegetation"))
        .withColumn("avg_visibility", F.col("sum_visibility") / F.col("count_visibility"))
        .withColumn("avg_wind_gust", F.col("sum_wind_gust") / F.col("count_wind_gust"))
        .withColumn("avg_albedo", F.col("sum_albedo") / F.col("count_albedo"))

        .select(
            "geohash2",
            "year",
            "lat",
            "lon",
            "points",
            "avg_temp_c",
            "avg_precipitable_water",
            "avg_pressure_surface",
            "avg_rh_zerodegc",
            "avg_snow_depth",
            "avg_cloud_cover",
            "avg_daily_precip",
            "avg_vegetation",
            "avg_visibility",
            "avg_wind_gust",
            "avg_albedo",
        )
    )

# ============================================================
# SPARKY SETUP
# ============================================================

sparky = Sparky(spark, sc)

batch_ctx = SparkyBatchContext(
    glob_path=INPUT_GLOB,
    partial_out=PARTIAL_OUT,
    schema=raw_schema,
    parse_line=parse_climate_change_line,
    partial_transform=partial_transform,
    batch_size=BATCH_SIZE,
)

input_paths = sparky.setup_job(batch_ctx)

input_paths = input_paths[::FILE_STRIDE]

print("Files used:", len(input_paths))
print("Batch size:", BATCH_SIZE)
print("Number of batches:", (len(input_paths) + BATCH_SIZE - 1) // BATCH_SIZE)

if len(input_paths) == 0:
    raise ValueError(f"No files matched: {INPUT_GLOB}")


def required_idx(name):
    key = name.strip().lower()
    if key not in sparky.columns:
        print("Available related columns:")
        for col_name, idx in sparky.columns.items():
            if name.split("_")[0] in col_name:
                print(idx, col_name)
        raise ValueError(f"Missing required column: {name}")
    return sparky.columns[key]

time_idx = required_idx(TIME_COLUMN)
lat_idx = required_idx(LAT_COLUMN)
lon_idx = required_idx(LON_COLUMN)

albedo_idx = required_idx(ALBEDO_COLUMN)
precipitable_water_idx = required_idx(PRECIPITABLE_WATER_COLUMN)
pressure_surface_idx = required_idx(PRESSURE_SURFACE_COLUMN)
rh_zero_idx = required_idx(RH_ZERO_COLUMN)
snow_depth_idx = required_idx(SNOW_DEPTH_COLUMN)
temp_idx = required_idx(TEMP_COLUMN)
cloud_idx = required_idx(CLOUD_COLUMN)
precip_idx = required_idx(PRECIP_COLUMN)
vegetation_idx = required_idx(VEGETATION_COLUMN)
visibility_idx = required_idx(VISIBILITY_COLUMN)
wind_gust_idx = required_idx(WIND_GUST_COLUMN)

print("Using column indices:")
print("time:", time_idx)
print("lat:", lat_idx)
print("lon:", lon_idx)
print("albedo:", albedo_idx)
print("precipitable water:", precipitable_water_idx)
print("pressure surface:", pressure_surface_idx)
print("RH zero C:", rh_zero_idx)
print("snow depth:", snow_depth_idx)
print("temp:", temp_idx)
print("cloud:", cloud_idx)
print("precip:", precip_idx)
print("vegetation:", vegetation_idx)
print("visibility:", visibility_idx)
print("wind gust:", wind_gust_idx)

# ============================================================
# RUN BATCHES
# ============================================================

overall_start = time.perf_counter()

for batch_id, start in enumerate(range(0, len(input_paths), BATCH_SIZE)):
    batch_paths = input_paths[start:start + BATCH_SIZE]
    sparky.process_batch(batch_ctx, batch_paths, batch_id)

overall_elapsed = time.perf_counter() - overall_start
print(f"Total Climate Change batch runtime: {overall_elapsed:.2f} seconds")

# ============================================================
# COMBINE
# ============================================================

analysis_ctx = SparkyAnalysisContext(
    partial_in=PARTIAL_OUT,
    combine_partials=combine_climate_partials,
    analyze=lambda df: df,
    result_out=RESULT_OUT,
)

yearly_stats_all = sparky.combine(analysis_ctx)


year_bounds = yearly_stats_all.agg(
    F.min("year").alias("min_year"),
    F.max("year").alias("max_year"),
    F.countDistinct("year").alias("n_years"),
).first().asDict()

min_year_available = year_bounds["min_year"]
max_year_available = year_bounds["max_year"]
n_years_available = year_bounds["n_years"]

start_year = max_year_available - YEARS_BACK + 1

print("Available years:", min_year_available, "to", max_year_available)
print("Distinct years:", n_years_available)
print("Using years:", start_year, "to", max_year_available)

yearly_stats = (
    yearly_stats_all
    .filter((F.col("year") >= start_year) & (F.col("year") <= max_year_available))
)

display(
    yearly_stats
    .groupBy("year")
    .agg(F.countDistinct("geohash2").alias("geohash2_regions"))
    .orderBy("year")
    .toPandas()
)

# ============================================================
# TEMPERATURE TRENDS BY TWO-CHARACTER GEOHASH
# ============================================================

trend_stats = (
    yearly_stats
    .groupBy("geohash2")
    .agg(
        F.avg("lat").alias("lat"),
        F.avg("lon").alias("lon"),
        F.countDistinct("year").alias("n_years"),
        F.min("year").alias("first_year"),
        F.max("year").alias("last_year"),

        F.avg("year").alias("avg_year"),
        F.avg("avg_temp_c").alias("mean_temp_c"),
        F.avg(F.col("year") * F.col("avg_temp_c")).alias("avg_year_temp"),
        F.avg(F.col("year") * F.col("year")).alias("avg_year_sq"),
    )
    .withColumn(
        "temp_slope_c_per_year",
        (
            F.col("avg_year_temp") - F.col("avg_year") * F.col("mean_temp_c")
        ) / (
            F.col("avg_year_sq") - F.col("avg_year") * F.col("avg_year")
        )
    )
    .withColumn(
        "estimated_temp_change_c",
        F.col("temp_slope_c_per_year") * (F.col("last_year") - F.col("first_year"))
    )
    .filter(F.col("n_years") >= MIN_YEARS_FOR_TREND)
)

warming_regions = (
    trend_stats
    .filter(F.col("temp_slope_c_per_year") > 0)
    .orderBy(F.desc("temp_slope_c_per_year"))
)

cooling_or_flat_regions = (
    trend_stats
    .filter(F.col("temp_slope_c_per_year") <= 0)
    .orderBy(F.asc("temp_slope_c_per_year"))
)

print("Top warming two-character geohash regions:")
display(
    warming_regions
    .select(
        "geohash2",
        "lat",
        "lon",
        "n_years",
        "first_year",
        "last_year",
        "mean_temp_c",
        "temp_slope_c_per_year",
        "estimated_temp_change_c",
    )
    .limit(TOP_REGIONS_TO_DISPLAY)
    .toPandas()
)

print("Cooling/flat two-character geohash regions:")
display(
    cooling_or_flat_regions
    .select(
        "geohash2",
        "lat",
        "lon",
        "n_years",
        "first_year",
        "last_year",
        "mean_temp_c",
        "temp_slope_c_per_year",
        "estimated_temp_change_c",
    )
    .limit(TOP_REGIONS_TO_DISPLAY)
    .toPandas()
)

# ============================================================
# CORRELATION MATRIX FOR WARMING REGIONS
# ============================================================

warming_yearly = (
    yearly_stats
    .join(
        warming_regions.select("geohash2"),
        on="geohash2",
        how="inner",
    )
)

feature_cols = [
    "avg_temp_c",
    "avg_precipitable_water",
    "avg_pressure_surface",
    "avg_rh_zerodegc",
    "avg_snow_depth",
    "avg_cloud_cover",
    "avg_daily_precip",
    "avg_vegetation",
    "avg_visibility",
    "avg_wind_gust",
    "avg_albedo",
]

warming_pdf = warming_yearly.select(
    "geohash2",
    "year",
    *feature_cols,
).toPandas()

print("Rows used for warming-region correlation matrix:", len(warming_pdf))

if len(warming_pdf) == 0:
    print("No warming regions found with the current trend filter.")
else:
    global_corr = warming_pdf[feature_cols].corr(method="pearson")

    print("Global PCC correlation matrix across warming regions:")
    display(global_corr.round(3))

    # Heatmap
    plt.figure(figsize=(11, 9))
    im = plt.imshow(global_corr.values, vmin=-1, vmax=1)
    plt.colorbar(im, label="Pearson correlation coefficient")
    plt.xticks(range(len(feature_cols)), feature_cols, rotation=60, ha="right")
    plt.yticks(range(len(feature_cols)), feature_cols)
    plt.title("PCC Correlation Matrix for Warming Geohash-2 Regions")
    plt.tight_layout()

    plot_dir = f"/bigdata/students/{os.environ['USER']}/plots"
    os.makedirs(plot_dir, exist_ok=True)

    corr_path = f"{plot_dir}/climate_change_global_corr.png"
    plt.savefig(corr_path, dpi=150, bbox_inches="tight")
    plt.show()

    print("Saved correlation heatmap:", corr_path)

# ============================================================
# Do correlations between temperature and other features differ across warming regions?
# ============================================================

regional_corr_exprs = []

for feature in feature_cols:
    if feature == "avg_temp_c":
        continue

    regional_corr_exprs.append(
        F.corr("avg_temp_c", feature).alias(f"pcc_temp_vs_{feature}")
    )

regional_temp_corrs = (
    warming_yearly
    .groupBy("geohash2")
    .agg(
        F.countDistinct("year").alias("n_years"),
        *regional_corr_exprs,
    )
    .filter(F.col("n_years") >= MIN_YEARS_FOR_TREND)
)

print("Temperature-vs-feature PCC by warming region:")
regional_temp_corrs_pdf = regional_temp_corrs.toPandas()
display(regional_temp_corrs_pdf.round(3))

# ============================================================
# Compare variation in correlations across regions
# ============================================================

corr_value_cols = [
    c for c in regional_temp_corrs_pdf.columns
    if c.startswith("pcc_temp_vs_")
]

if len(regional_temp_corrs_pdf) > 0:
    corr_variation = []

    for col in corr_value_cols:
        series = regional_temp_corrs_pdf[col].dropna()

        if len(series) > 0:
            corr_variation.append({
                "correlation": col,
                "min_pcc": series.min(),
                "max_pcc": series.max(),
                "mean_pcc": series.mean(),
                "std_pcc": series.std(),
                "range_pcc": series.max() - series.min(),
            })

    corr_variation_pdf = pd.DataFrame(corr_variation).sort_values(
        "range_pcc",
        ascending=False,
    )

    print("How much temp-feature correlations differ by region:")
    display(corr_variation_pdf.round(3))


top_warming_geohashes = [
    row["geohash2"]
    for row in warming_regions
        .select("geohash2")
        .limit(5)
        .collect()
]

for gh in top_warming_geohashes:
    region_pdf = warming_pdf[warming_pdf["geohash2"] == gh].copy()

    if len(region_pdf) < MIN_YEARS_FOR_TREND:
        continue

    region_corr = region_pdf[feature_cols].corr(method="pearson")

    print(f"Full PCC matrix for warming region {gh}:")
    display(region_corr.round(3))

# ============================================================
# SAVE TABLES
# ============================================================

# NOTE: Matthew, change this if you want to run stuff. idk you are allowed you to save things in my dir?
plot_dir = f"/bigdata/students/{os.environ['USER']}/plots"
os.makedirs(plot_dir, exist_ok=True)

warming_csv = f"{plot_dir}/climate_change_warming_regions.csv"
regional_corr_csv = f"{plot_dir}/climate_change_regional_temp_correlations.csv"

warming_regions.toPandas().to_csv(warming_csv, index=False)
regional_temp_corrs_pdf.to_csv(regional_corr_csv, index=False)

print("Saved warming regions:", warming_csv)
print("Saved regional temp correlations:", regional_corr_csv)
Deleted old partial output: hdfs:/user/rsbhatia2/climate_change_geohash2_partials
Found 6770 files
                                                                                
Files used: 6770
Batch size: 50
Number of batches: 136
Using column indices:
time: 0
lat: 1
lon: 2
albedo: 3
precipitable water: 4
pressure surface: 6
RH zero C: 8
snow depth: 9
temp: 10
cloud: 12
precip: 13
vegetation: 14
visibility: 15
wind gust: 17
Processing batch 0: 50 files
                                                                                
Processing batch 1: 50 files
                                                                                
Processing batch 2: 50 files
                                                                                
Processing batch 3: 50 files
                                                                                
Processing batch 4: 50 files
                                                                                
Processing batch 5: 50 files
                                                                                
Processing batch 6: 50 files
                                                                                
Processing batch 7: 50 files
                                                                                
Processing batch 8: 50 files
                                                                                
Processing batch 9: 50 files
                                                                                
Processing batch 10: 50 files
                                                                                
Processing batch 11: 50 files
                                                                                
Processing batch 12: 50 files
                                                                                
Processing batch 13: 50 files
                                                                                
Processing batch 14: 50 files
                                                                                
Processing batch 15: 50 files
                                                                                
Processing batch 16: 50 files
                                                                                
Processing batch 17: 50 files
                                                                                
Processing batch 18: 50 files
                                                                                
Processing batch 19: 50 files
                                                                                
Processing batch 20: 50 files
                                                                                
Processing batch 21: 50 files
                                                                                
Processing batch 22: 50 files
                                                                                
Processing batch 23: 50 files
                                                                                
Processing batch 24: 50 files
                                                                                
Processing batch 25: 50 files
                                                                                
Processing batch 26: 50 files
                                                                                
Processing batch 27: 50 files
                                                                                
Processing batch 28: 50 files
                                                                                
Processing batch 29: 50 files
                                                                                
Processing batch 30: 50 files
                                                                                
Processing batch 31: 50 files
                                                                                
Processing batch 32: 50 files
                                                                                
Processing batch 33: 50 files
                                                                                
Processing batch 34: 50 files
                                                                                
Processing batch 35: 50 files
                                                                                
Processing batch 36: 50 files
                                                                                
Processing batch 37: 50 files
                                                                                
Processing batch 38: 50 files
                                                                                
Processing batch 39: 50 files
                                                                                
Processing batch 40: 50 files
                                                                                
Processing batch 41: 50 files
                                                                                
Processing batch 42: 50 files
                                                                                
Processing batch 43: 50 files
                                                                                
Processing batch 44: 50 files
                                                                                
Processing batch 45: 50 files
                                                                                
Processing batch 46: 50 files
                                                                                
Processing batch 47: 50 files
                                                                                
Processing batch 48: 50 files
                                                                                
Processing batch 49: 50 files
                                                                                
Processing batch 50: 50 files
                                                                                
Processing batch 51: 50 files
                                                                                
Processing batch 52: 50 files
                                                                                
Processing batch 53: 50 files
                                                                                
Processing batch 54: 50 files
                                                                                
Processing batch 55: 50 files
                                                                                
Processing batch 56: 50 files
                                                                                
Processing batch 57: 50 files
                                                                                
Processing batch 58: 50 files
                                                                                
Processing batch 59: 50 files
                                                                                
Processing batch 60: 50 files
                                                                                
Processing batch 61: 50 files
                                                                                
Processing batch 62: 50 files
                                                                                
Processing batch 63: 50 files
                                                                                
Processing batch 64: 50 files
                                                                                
Processing batch 65: 50 files
                                                                                
Processing batch 66: 50 files
                                                                                
Processing batch 67: 50 files
                                                                                
Processing batch 68: 50 files
                                                                                
Processing batch 69: 50 files
                                                                                
Processing batch 70: 50 files
                                                                                
Processing batch 71: 50 files
                                                                                
Processing batch 72: 50 files
                                                                                
Processing batch 73: 50 files
                                                                                
Processing batch 74: 50 files
                                                                                
Processing batch 75: 50 files
                                                                                
Processing batch 76: 50 files
                                                                                
Processing batch 77: 50 files
                                                                                
Processing batch 78: 50 files
                                                                                
Processing batch 79: 50 files
                                                                                
Processing batch 80: 50 files
                                                                                
Processing batch 81: 50 files
                                                                                
Processing batch 82: 50 files
                                                                                
Processing batch 83: 50 files
                                                                                
Processing batch 84: 50 files
                                                                                
Processing batch 85: 50 files
                                                                                
Processing batch 86: 50 files
                                                                                
Processing batch 87: 50 files
                                                                                
Processing batch 88: 50 files
                                                                                
Processing batch 89: 50 files
                                                                                
Processing batch 90: 50 files
                                                                                
Processing batch 91: 50 files
                                                                                
Processing batch 92: 50 files
                                                                                
Processing batch 93: 50 files
                                                                                
Processing batch 94: 50 files
                                                                                
Processing batch 95: 50 files
                                                                                
Processing batch 96: 50 files
                                                                                
Processing batch 97: 50 files
                                                                                
Processing batch 98: 50 files
                                                                                
Processing batch 99: 50 files
                                                                                
Processing batch 101: 50 files
                                                                                
Processing batch 102: 50 files
                                                                                
Processing batch 103: 50 files
                                                                                
Processing batch 104: 50 files
                                                                                
Processing batch 105: 50 files
                                                                                
Processing batch 106: 50 files
                                                                                
Processing batch 107: 50 files
                                                                                
Processing batch 108: 50 files
                                                                                
Processing batch 109: 50 files
                                                                                
Processing batch 110: 50 files
                                                                                
Processing batch 111: 50 files
                                                                                
Processing batch 112: 50 files
                                                                                
Processing batch 113: 50 files
                                                                                
Processing batch 114: 50 files
                                                                                
Processing batch 115: 50 files
                                                                                
Processing batch 116: 50 files
                                                                                
Processing batch 117: 50 files
                                                                                
Processing batch 118: 50 files
                                                                                
Processing batch 119: 50 files
                                                                                
Processing batch 120: 50 files
                                                                                
Processing batch 121: 50 files
                                                                                
Processing batch 122: 50 files
                                                                                
Processing batch 123: 50 files
                                                                                
Processing batch 124: 50 files
                                                                                
Processing batch 125: 50 files
                                                                                
Processing batch 126: 50 files
                                                                                
Processing batch 127: 50 files
                                                                                
Processing batch 128: 50 files
                                                                                
Processing batch 129: 50 files
                                                                                
Processing batch 130: 50 files
                                                                                
Processing batch 131: 50 files
                                                                                
Processing batch 132: 50 files
                                                                                
Processing batch 133: 50 files
                                                                                
Processing batch 134: 50 files
                                                                                
Processing batch 135: 20 files
                                                                                
Total Climate Change batch runtime: 4697.74 seconds
                                                                                
Available years: 2017 to 2019
Distinct years: 3
Using years: 2015 to 2019
                                                                                
year geohash2_regions
0 2017 77
1 2018 77
2 2019 77
Top warming two-character geohash regions:
                                                                                
geohash2 lat lon n_years first_year last_year mean_temp_c temp_slope_c_per_year estimated_temp_change_c
0 8x 44.818510 -146.372595 3 2017 2019 12.233501 0.309031 0.618062
1 b8 48.606524 -147.518343 3 2017 2019 10.344493 0.251628 0.503257
2 bb 47.805079 -140.624287 3 2017 2019 11.473247 0.231085 0.462169
3 b9 53.291323 -148.854043 3 2017 2019 8.761258 0.230133 0.460267
4 8z 42.307755 -140.070052 3 2017 2019 14.511733 0.196659 0.393319
5 bc 53.431775 -140.625296 3 2017 2019 9.985080 0.003220 0.006440
Cooling/flat two-character geohash regions:
geohash2 lat lon n_years first_year last_year mean_temp_c temp_slope_c_per_year estimated_temp_change_c
0 f4 58.692086 -84.460206 3 2017 2019 -5.820832 -2.999996 -5.999993
1 cd 58.668861 -106.779269 3 2017 2019 -0.262855 -2.982981 -5.965962
2 c8 47.805292 -106.877264 3 2017 2019 6.949232 -2.922989 -5.845978
3 f3 53.435855 -73.124153 3 2017 2019 -0.518192 -2.784224 -5.568448
4 c9 53.433213 -106.876210 3 2017 2019 3.537299 -2.709362 -5.418725
5 cb 47.798558 -95.625294 3 2017 2019 6.174566 -2.702436 -5.404873
6 f6 58.369469 -73.322819 3 2017 2019 -4.076636 -2.698814 -5.397629
7 cc 53.438562 -95.625057 3 2017 2019 2.391764 -2.644396 -5.288792
8 f1 53.431203 -84.376120 3 2017 2019 -0.525059 -2.624037 -5.248074
9 cf 58.797982 -95.624255 3 2017 2019 -3.979622 -2.606682 -5.213364
                                                                                
Rows used for warming-region correlation matrix: 18
Global PCC correlation matrix across warming regions:
avg_temp_c avg_precipitable_water avg_pressure_surface avg_rh_zerodegc avg_snow_depth avg_cloud_cover avg_daily_precip avg_vegetation avg_visibility avg_wind_gust avg_albedo
avg_temp_c 1.000 0.920 0.963 -0.957 NaN -0.108 -0.028 NaN 0.356 -0.576 NaN
avg_precipitable_water 0.920 1.000 0.925 -0.894 NaN 0.032 0.334 NaN 0.013 -0.310 NaN
avg_pressure_surface 0.963 0.925 1.000 -0.973 NaN -0.000 0.076 NaN 0.231 -0.441 NaN
avg_rh_zerodegc -0.957 -0.894 -0.973 1.000 NaN 0.039 0.034 NaN -0.249 0.478 NaN
avg_snow_depth NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN
avg_cloud_cover -0.108 0.032 -0.000 0.039 NaN 1.000 0.444 NaN -0.525 0.508 NaN
avg_daily_precip -0.028 0.334 0.076 0.034 NaN 0.444 1.000 NaN -0.713 0.545 NaN
avg_vegetation NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN
avg_visibility 0.356 0.013 0.231 -0.249 NaN -0.525 -0.713 NaN 1.000 -0.647 NaN
avg_wind_gust -0.576 -0.310 -0.441 0.478 NaN 0.508 0.545 NaN -0.647 1.000 NaN
avg_albedo NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN
No description has been provided for this image
Saved correlation heatmap: /bigdata/students/rsbhatia2/plots/climate_change_global_corr.png
Temperature-vs-feature PCC by warming region:
                                                                                
geohash2 n_years pcc_temp_vs_avg_precipitable_water pcc_temp_vs_avg_pressure_surface pcc_temp_vs_avg_rh_zerodegc pcc_temp_vs_avg_snow_depth pcc_temp_vs_avg_cloud_cover pcc_temp_vs_avg_daily_precip pcc_temp_vs_avg_vegetation pcc_temp_vs_avg_visibility pcc_temp_vs_avg_wind_gust pcc_temp_vs_avg_albedo
0 8z 3 0.573 -0.992 0.979 NaN 0.148 0.266 NaN 0.417 -0.993 NaN
1 b9 3 -0.664 0.742 -0.265 NaN 0.409 -0.993 NaN 1.000 -0.753 NaN
2 b8 3 -0.953 0.543 -0.625 NaN -0.716 -0.981 NaN 1.000 -0.126 NaN
3 bb 3 -0.401 0.306 -0.615 NaN -0.264 -0.452 NaN 0.923 -0.954 NaN
4 bc 3 0.098 0.354 0.290 NaN -0.198 -0.429 NaN 0.057 -0.880 NaN
5 8x 3 -0.359 -0.099 -0.726 NaN 0.267 -0.749 NaN 0.703 0.723 NaN
How much temp-feature correlations differ by region:
correlation min_pcc max_pcc mean_pcc std_pcc range_pcc
1 pcc_temp_vs_avg_pressure_surface -0.992 0.742 0.142 0.622 1.734
6 pcc_temp_vs_avg_wind_gust -0.993 0.723 -0.497 0.677 1.716
2 pcc_temp_vs_avg_rh_zerodegc -0.726 0.979 -0.160 0.671 1.705
0 pcc_temp_vs_avg_precipitable_water -0.953 0.573 -0.284 0.546 1.525
4 pcc_temp_vs_avg_daily_precip -0.993 0.266 -0.556 0.472 1.259
3 pcc_temp_vs_avg_cloud_cover -0.716 0.409 -0.059 0.415 1.125
5 pcc_temp_vs_avg_visibility 0.057 1.000 0.683 0.380 0.943
Full PCC matrix for warming region 8x:
avg_temp_c avg_precipitable_water avg_pressure_surface avg_rh_zerodegc avg_snow_depth avg_cloud_cover avg_daily_precip avg_vegetation avg_visibility avg_wind_gust avg_albedo
avg_temp_c 1.000 -0.359 -0.099 -0.726 NaN 0.267 -0.749 NaN 0.703 0.723 NaN
avg_precipitable_water -0.359 1.000 -0.893 0.903 NaN 0.803 0.887 NaN -0.916 -0.904 NaN
avg_pressure_surface -0.099 -0.893 1.000 -0.613 NaN -0.985 -0.585 NaN 0.638 0.615 NaN
avg_rh_zerodegc -0.726 0.903 -0.613 1.000 NaN 0.469 0.999 NaN -0.999 -1.000 NaN
avg_snow_depth NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN
avg_cloud_cover 0.267 0.803 -0.985 0.469 NaN 1.000 0.438 NaN -0.497 -0.472 NaN
avg_daily_precip -0.749 0.887 -0.585 0.999 NaN 0.438 1.000 NaN -0.998 -0.999 NaN
avg_vegetation NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN
avg_visibility 0.703 -0.916 0.638 -0.999 NaN -0.497 -0.998 NaN 1.000 1.000 NaN
avg_wind_gust 0.723 -0.904 0.615 -1.000 NaN -0.472 -0.999 NaN 1.000 1.000 NaN
avg_albedo NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN
Full PCC matrix for warming region b8:
avg_temp_c avg_precipitable_water avg_pressure_surface avg_rh_zerodegc avg_snow_depth avg_cloud_cover avg_daily_precip avg_vegetation avg_visibility avg_wind_gust avg_albedo
avg_temp_c 1.000 -0.953 0.543 -0.625 NaN -0.716 -0.981 NaN 1.000 -0.126 NaN
avg_precipitable_water -0.953 1.000 -0.262 0.357 NaN 0.895 0.993 NaN -0.958 0.422 NaN
avg_pressure_surface 0.543 -0.262 1.000 -0.995 NaN 0.196 -0.371 NaN 0.527 0.764 NaN
avg_rh_zerodegc -0.625 0.357 -0.995 1.000 NaN -0.097 0.462 NaN -0.609 -0.696 NaN
avg_snow_depth NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN
avg_cloud_cover -0.716 0.895 0.196 -0.097 NaN 1.000 0.837 NaN -0.730 0.782 NaN
avg_daily_precip -0.981 0.993 -0.371 0.462 NaN 0.837 1.000 NaN -0.985 0.315 NaN
avg_vegetation NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN
avg_visibility 1.000 -0.958 0.527 -0.609 NaN -0.730 -0.985 NaN 1.000 -0.145 NaN
avg_wind_gust -0.126 0.422 0.764 -0.696 NaN 0.782 0.315 NaN -0.145 1.000 NaN
avg_albedo NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN
Full PCC matrix for warming region bb:
avg_temp_c avg_precipitable_water avg_pressure_surface avg_rh_zerodegc avg_snow_depth avg_cloud_cover avg_daily_precip avg_vegetation avg_visibility avg_wind_gust avg_albedo
avg_temp_c 1.000 -0.401 0.306 -0.615 NaN -0.264 -0.452 NaN 0.923 -0.954 NaN
avg_precipitable_water -0.401 1.000 -0.995 0.969 NaN -0.777 0.998 NaN -0.723 0.659 NaN
avg_pressure_surface 0.306 -0.995 1.000 -0.939 NaN 0.837 -0.988 NaN 0.649 -0.579 NaN
avg_rh_zerodegc -0.615 0.969 -0.939 1.000 NaN -0.598 0.981 NaN -0.871 0.824 NaN
avg_snow_depth NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN
avg_cloud_cover -0.264 -0.777 0.837 -0.598 NaN 1.000 -0.741 NaN 0.127 -0.039 NaN
avg_daily_precip -0.452 0.998 -0.988 0.981 NaN -0.741 1.000 NaN -0.761 0.700 NaN
avg_vegetation NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN
avg_visibility 0.923 -0.723 0.649 -0.871 NaN 0.127 -0.761 NaN 1.000 -0.996 NaN
avg_wind_gust -0.954 0.659 -0.579 0.824 NaN -0.039 0.700 NaN -0.996 1.000 NaN
avg_albedo NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN
Full PCC matrix for warming region b9:
avg_temp_c avg_precipitable_water avg_pressure_surface avg_rh_zerodegc avg_snow_depth avg_cloud_cover avg_daily_precip avg_vegetation avg_visibility avg_wind_gust avg_albedo
avg_temp_c 1.000 -0.664 0.742 -0.265 NaN 0.409 -0.993 NaN 1.000 -0.753 NaN
avg_precipitable_water -0.664 1.000 -0.994 0.897 NaN -0.954 0.574 NaN -0.673 0.992 NaN
avg_pressure_surface 0.742 -0.994 1.000 -0.843 NaN 0.915 -0.660 NaN 0.750 -1.000 NaN
avg_rh_zerodegc -0.265 0.897 -0.843 1.000 NaN -0.988 0.152 NaN -0.276 0.834 NaN
avg_snow_depth NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN
avg_cloud_cover 0.409 -0.954 0.915 -0.988 NaN 1.000 -0.301 NaN 0.419 -0.909 NaN
avg_daily_precip -0.993 0.574 -0.660 0.152 NaN -0.301 1.000 NaN -0.992 0.672 NaN
avg_vegetation NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN
avg_visibility 1.000 -0.673 0.750 -0.276 NaN 0.419 -0.992 NaN 1.000 -0.760 NaN
avg_wind_gust -0.753 0.992 -1.000 0.834 NaN -0.909 0.672 NaN -0.760 1.000 NaN
avg_albedo NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN
Full PCC matrix for warming region 8z:
avg_temp_c avg_precipitable_water avg_pressure_surface avg_rh_zerodegc avg_snow_depth avg_cloud_cover avg_daily_precip avg_vegetation avg_visibility avg_wind_gust avg_albedo
avg_temp_c 1.000 0.573 -0.992 0.979 NaN 0.148 0.266 NaN 0.417 -0.993 NaN
avg_precipitable_water 0.573 1.000 -0.464 0.395 NaN -0.726 0.942 NaN -0.507 -0.472 NaN
avg_pressure_surface -0.992 -0.464 1.000 -0.997 NaN -0.272 -0.141 NaN -0.529 1.000 NaN
avg_rh_zerodegc 0.979 0.395 -0.997 1.000 NaN 0.345 0.065 NaN 0.592 -0.996 NaN
avg_snow_depth NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN
avg_cloud_cover 0.148 -0.726 -0.272 0.345 NaN 1.000 -0.914 NaN 0.961 -0.264 NaN
avg_daily_precip 0.266 0.942 -0.141 0.065 NaN -0.914 1.000 NaN -0.766 -0.150 NaN
avg_vegetation NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN
avg_visibility 0.417 -0.507 -0.529 0.592 NaN 0.961 -0.766 NaN 1.000 -0.521 NaN
avg_wind_gust -0.993 -0.472 1.000 -0.996 NaN -0.264 -0.150 NaN -0.521 1.000 NaN
avg_albedo NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN
Saved warming regions: /bigdata/students/rsbhatia2/plots/climate_change_warming_regions.csv
Saved regional temp correlations: /bigdata/students/rsbhatia2/plots/climate_change_regional_temp_correlations.csv

Weather Station¶

I am only using August 2019 data. The result is a 10 second, 4fps video that shows how different metrics changed over the month.

Data selection is through the server. Here is how I ran the server for this:

python server.py --input-glob "hdfs:///3hr/2019/*_20*{08}*_*" --host 0.0.0.0 --port 28420 --threads 3 --max-files 1500 --hold-open-after-done --file-stride 4

In [44]:
from collections import defaultdict, deque
import math
import os
import socket
import subprocess
import time


import matplotlib.pyplot as plt
from IPython.display import clear_output, display
from pyspark.streaming import StreamingContext

sc = spark.sparkContext

# ============================================================
# CONFIG
# ============================================================

WEATHER_HOST = "mc08"
WEATHER_PORT = 28420
WEATHER_BATCH_INTERVAL_SECONDS = 5
WEATHER_HISTORY_LIMIT = 500
WEATHER_SAVE_FRAMES = True
WEATHER_FRAME_DIR = f"/bigdata/students/rsbhatia2/weather_station_frames"
WEATHER_VIDEO_OUT = f"/bigdata/students/rsbhatia2/weather_station.mp4"
WEATHER_VIDEO_FPS = 2
WEATHER_CHECKPOINT_DIR = f"hdfs:///user/{sc.sparkUser()}/weather_station_streaming_checkpoint"

TIME_IDX = 0
LAT_IDX = 1
LON_IDX = 2
PRESSURE_SURFACE_IDX = 6
HUMIDITY_IDX = 8
TEMP_SURFACE_IDX = 10
PRECIP_3HR_IDX = 13
VISIBILITY_IDX = 15
WIND_GUST_IDX = 17

WEATHER_LOCATIONS = [
    {
        "name": "San Francisco Bay Area",
        "lat_min": 37.2,
        "lat_max": 38.2,
        "lon_min": -122.8,
        "lon_max": -121.7,
    },
    {
        "name": "Los Angeles Coast",
        "lat_min": 33.5,
        "lat_max": 34.4,
        "lon_min": -118.8,
        "lon_max": -117.7,
    },
    {
        "name": "Seattle / Puget Sound",
        "lat_min": 47.2,
        "lat_max": 47.9,
        "lon_min": -122.7,
        "lon_max": -121.9,
    },
    {
        "name": "Chicago",
        "lat_min": 41.5,
        "lat_max": 42.2,
        "lon_min": -88.1,
        "lon_max": -87.3,
    },
    {
        "name": "Miami",
        "lat_min": 25.5,
        "lat_max": 26.1,
        "lon_min": -80.5,
        "lon_max": -80.0,
    },
]

WEATHER_METRICS = [
    ("temp_c", "Temp C"),
    ("pressure_hpa", "Pressure hPa"),
    ("humidity_pct", "Humidity %"),
    ("precip", "Precip"),
    ("visibility_km", "Visibility km"),
    ("wind_gust", "Wind Gust"),
]

WEATHER_STREAMING_CONTEXT = None
WEATHER_HISTORY = {
    location["name"]: {metric_key: deque(maxlen=WEATHER_HISTORY_LIMIT) for metric_key, _ in WEATHER_METRICS}
    for location in WEATHER_LOCATIONS
}
WEATHER_BATCH_LABELS = deque(maxlen=WEATHER_HISTORY_LIMIT)
WEATHER_FRAME_COUNT = 0
WEATHER_STOP_REQUESTED = False

# ============================================================
# PARSING AND AGGREGATION 
# ============================================================

def weather_safe_float(parts, idx):
    if idx is None or idx >= len(parts):
        return None

    value = parts[idx].strip()
    if value == "" or value.lower() == "null":
        return None

    return float(value)


def weather_location_for(lat, lon):
    for location in WEATHER_LOCATIONS:
        if (
            location["lat_min"] <= lat <= location["lat_max"]
            and location["lon_min"] <= lon <= location["lon_max"]
        ):
            return location["name"]

    return None


def empty_weather_summary():
    return (0,) + tuple(0.0 for _ in range(len(WEATHER_METRICS) * 2))


def metric_pair(value):
    if value is None:
        return (0.0, 0.0)

    try:
        value = float(value)
    except Exception:
        return (0.0, 0.0)

    if not math.isfinite(value):
        return (0.0, 0.0)

    return (value, 1.0)


def extract_weather_values(parts):
    temp_k = weather_safe_float(parts, TEMP_SURFACE_IDX)
    pressure_pa = weather_safe_float(parts, PRESSURE_SURFACE_IDX)
    humidity_pct = weather_safe_float(parts, HUMIDITY_IDX)
    precip = weather_safe_float(parts, PRECIP_3HR_IDX)
    visibility_m = weather_safe_float(parts, VISIBILITY_IDX)
    wind_gust = weather_safe_float(parts, WIND_GUST_IDX)

    return [
        temp_k - 273.15 if temp_k is not None else None,
        pressure_pa / 100.0 if pressure_pa is not None else None,
        humidity_pct,
        precip,
        visibility_m / 1000.0 if visibility_m is not None else None,
        wind_gust,
    ]


def parse_weather_station_line(line):
    try:
        if not line or line.startswith("1_time"):
            return None

        parts = line.rstrip("\t").split("\t")

        lat = weather_safe_float(parts, LAT_IDX)
        lon = weather_safe_float(parts, LON_IDX)
        if lat is None or lon is None:
            return None

        location_name = weather_location_for(lat, lon)
        if location_name is None:
            return None

        metric_values = extract_weather_values(parts)
        metric_pairs = [metric_pair(value) for value in metric_values]

        if not any(count > 0 for _, count in metric_pairs):
            return None

        flattened = []
        for metric_sum, metric_count in metric_pairs:
            flattened.extend([metric_sum, metric_count])

        return (location_name, (1,) + tuple(flattened))

    except Exception:
        return None


def classify_weather_station_line(line):
    counts = [("raw_lines", 1)]

    try:
        if not line or line.startswith("1_time"):
            return counts

        parts = line.rstrip("\t").split("\t")

        lat = weather_safe_float(parts, LAT_IDX)
        lon = weather_safe_float(parts, LON_IDX)
        if lat is None or lon is None:
            return counts

        counts.append(("valid_latlon", 1))

        if weather_location_for(lat, lon) is None:
            return counts

        counts.append(("selected_location", 1))

        metric_values = extract_weather_values(parts)
        if any(metric_pair(value)[1] > 0 for value in metric_values):
            counts.append(("usable_metric", 1))

        return counts

    except Exception:
        counts.append(("parse_errors", 1))
        return counts


def combine_weather_summaries(left, right):
    return tuple(left[i] + right[i] for i in range(len(left)))


def update_weather_state(new_values, current_state):
    if current_state is None:
        current_state = empty_weather_summary()

    total = current_state
    for value in new_values:
        total = combine_weather_summaries(total, value)

    return total


def weather_state_to_means(state):
    means = {}

    for idx, (metric_key, _) in enumerate(WEATHER_METRICS):
        pair_start = 1 + idx * 2
        metric_sum = state[pair_start]
        metric_count = state[pair_start + 1]
        means[metric_key] = metric_sum / metric_count if metric_count > 0 else None

    return means


# ============================================================
# VISUALIZATION
# ============================================================

def update_weather_history(timestamp, rows):
    batch_label = timestamp.strftime("%H:%M:%S") if hasattr(timestamp, "strftime") else str(timestamp)
    WEATHER_BATCH_LABELS.append(batch_label)

    row_by_location = dict(rows)
    for location in WEATHER_LOCATIONS:
        location_name = location["name"]
        means = weather_state_to_means(row_by_location[location_name]) if location_name in row_by_location else None

        for metric_key, _ in WEATHER_METRICS:
            value = means[metric_key] if means is not None else None
            WEATHER_HISTORY[location_name][metric_key].append(value)


def draw_weather_dashboard():
    global WEATHER_FRAME_COUNT

    fig, axes = plt.subplots(
        nrows=len(WEATHER_METRICS),
        ncols=len(WEATHER_LOCATIONS),
        figsize=(18, 12),
        sharex=True,
    )

    x_values = list(range(len(WEATHER_BATCH_LABELS)))

    for row_idx, (metric_key, metric_label) in enumerate(WEATHER_METRICS):
        for col_idx, location in enumerate(WEATHER_LOCATIONS):
            ax = axes[row_idx][col_idx]
            location_name = location["name"]
            y_values = list(WEATHER_HISTORY[location_name][metric_key])

            ax.plot(x_values[-len(y_values):], y_values, marker="o", linewidth=1.5, markersize=3)
            ax.grid(True, alpha=0.25)

            if row_idx == 0:
                ax.set_title(location_name, fontsize=10)
            if col_idx == 0:
                ax.set_ylabel(metric_label, fontsize=9)
            if row_idx == len(WEATHER_METRICS) - 1:
                ax.set_xlabel("Batch", fontsize=9)

    fig.suptitle("Weather Station Online Summary", fontsize=16)
    fig.tight_layout(rect=[0, 0, 1, 0.97])

    if WEATHER_SAVE_FRAMES:
        os.makedirs(WEATHER_FRAME_DIR, exist_ok=True)
        frame_path = os.path.join(WEATHER_FRAME_DIR, f"weather_station_{WEATHER_FRAME_COUNT:05d}.png")
        fig.savefig(frame_path, dpi=120, bbox_inches="tight")
        WEATHER_FRAME_COUNT += 1

    clear_output(wait=True)
    display(fig)
    plt.close(fig)


def handle_weather_batch(timestamp, rdd):
    if WEATHER_STOP_REQUESTED:
        return

    rows = rdd.collect()
    update_weather_history(timestamp, rows)
    draw_weather_dashboard()

    total_records = sum(row[1][0] for row in rows)
    print(f"Batch: {timestamp} | Online locations: {len(rows)} | Location rows summarized: {total_records}")


def handle_weather_debug_batch(timestamp, rdd):
    if WEATHER_STOP_REQUESTED:
        return

    counts = dict(rdd.collect()) if not rdd.isEmpty() else {}
    print(
        "Debug counts | "
        f"Batch: {timestamp} | "
        f"raw lines: {counts.get('raw_lines', 0)} | "
        f"valid lat/lon: {counts.get('valid_latlon', 0)} | "
        f"selected location: {counts.get('selected_location', 0)} | "
        f"usable metric: {counts.get('usable_metric', 0)} | "
        f"parse errors: {counts.get('parse_errors', 0)}"
    )


# ============================================================
# STREAM CONTROLS
# ============================================================

def build_weather_streaming_context():
    ssc = StreamingContext(sc, WEATHER_BATCH_INTERVAL_SECONDS)
    ssc.checkpoint(WEATHER_CHECKPOINT_DIR)

    lines = ssc.socketTextStream(WEATHER_HOST, WEATHER_PORT)

    debug_counts = lines.flatMap(classify_weather_station_line).reduceByKey(lambda left, right: left + right)
    debug_counts.foreachRDD(handle_weather_debug_batch)

    parsed = lines.map(parse_weather_station_line).filter(lambda row: row is not None)
    batch_summaries = parsed.reduceByKey(combine_weather_summaries)
    online_summaries = batch_summaries.updateStateByKey(update_weather_state)
    online_summaries.foreachRDD(handle_weather_batch)

    return ssc


def weather_server_is_listening(timeout_seconds=2.0):
    try:
        with socket.create_connection((WEATHER_HOST, WEATHER_PORT), timeout=timeout_seconds):
            return True
    except OSError as exc:
        print(f"Could not connect to {WEATHER_HOST}:{WEATHER_PORT}: {exc}")
        print("Start server.py first and wait until it prints the Listening message.")
        return False


def start_weather_station():
    global WEATHER_STREAMING_CONTEXT, WEATHER_STOP_REQUESTED

    WEATHER_STOP_REQUESTED = False

    if WEATHER_SAVE_FRAMES:
        os.makedirs(WEATHER_FRAME_DIR, exist_ok=True)
        print(f"Saving dashboard frames to: {WEATHER_FRAME_DIR}")

    if WEATHER_STREAMING_CONTEXT is not None:
        print("Weather station stream is already running. Call stop_weather_station() first if needed.")
        return WEATHER_STREAMING_CONTEXT

    print(f"Connecting Spark Streaming to {WEATHER_HOST}:{WEATHER_PORT}")
    print("Start server.py first, then call stop_weather_station() when the demo is done.")

    if not weather_server_is_listening():
        print("Spark Streaming was not started.")
        return None

    WEATHER_STREAMING_CONTEXT = build_weather_streaming_context()
    WEATHER_STREAMING_CONTEXT.start()
    return WEATHER_STREAMING_CONTEXT


def run_weather_station():
    return start_weather_station()


def stop_weather_station():
    global WEATHER_STREAMING_CONTEXT, WEATHER_STOP_REQUESTED

    WEATHER_STOP_REQUESTED = True

    if WEATHER_STREAMING_CONTEXT is None:
        print("Weather station stream is not running.")
        return

    WEATHER_STREAMING_CONTEXT.stop(stopSparkContext=False, stopGraceFully=False)
    WEATHER_STREAMING_CONTEXT = None
    print("Weather station stream stopped.")

def reset_weather_station_frames():
    global WEATHER_FRAME_COUNT, WEATHER_STOP_REQUESTED

    os.makedirs(WEATHER_FRAME_DIR, exist_ok=True)
    for filename in os.listdir(WEATHER_FRAME_DIR):
        if filename.startswith("weather_station_") and filename.endswith(".png"):
            os.remove(os.path.join(WEATHER_FRAME_DIR, filename))

    WEATHER_FRAME_COUNT = 0
    WEATHER_STOP_REQUESTED = False
    print(f"Cleared weather station frames from: {WEATHER_FRAME_DIR}")

print("Weather Station code loaded.")
print("Remote test server command is in the comments at the top of this cell.")
print("Frames save automatically while the dashboard updates.")
print("Run stop_weather_station() when the video has enough dashboard updates.")
Weather Station code loaded.
Remote test server command is in the comments at the top of this cell.
Frames save automatically while the dashboard updates.
Run stop_weather_station() when the video has enough dashboard updates.
In [29]:
reset_weather_station_frames()
Cleared weather station frames from: /bigdata/students/rsbhatia2/weather_station_frames
In [42]:
weather_station_streaming_runtime_start = time.perf_counter()
run_weather_station()
No description has been provided for this image
[Stage 2424:>                                                       (0 + 1) / 1]
In [43]:
stop_weather_station()

weather_station_streaming_runtime = time.perf_counter() - weather_station_streaming_runtime_start
print(f"Total Weather Station streaming runtime: {weather_station_streaming_runtime:.2f} seconds")
26/05/22 02:26:33 ERROR TaskSchedulerImpl: Lost executor 8 on 10.0.2.31: Command exited with code 50
26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_7768_17 !
26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_6597_46 !
26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_9580_45 !
26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_6133_9 !
26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_6154_52 !
26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_9580_14 !
26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_6154_9 !
26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_7314_15 !
26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_9490_45 !
26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_7799_8 !
26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_7082_16 !
26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_6139_9 !
26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_6315_32 !
26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_7051_39 !
26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_9557_45 !
26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_9512_14 !
26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_9535_14 !
26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_7051_16 !
26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_7096_16 !
26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_6133_32 !
26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_7314_38 !
26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_6154_32 !
26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_6139_32 !
26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_6597_35 !
26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_9490_14 !
26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_6133_52 !
26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_6139_52 !
26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_9535_45 !
26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_9512_45 !
26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_6315_52 !
26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_9557_14 !
26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_7813_8 !
26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_6597_12 !
26/05/22 02:26:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_6315_9 !
Weather station stream stopped.
Total Weather Station streaming runtime: 420.03 seconds
No description has been provided for this image

You can find the video in the repository. It is called weather_station.mp4

Prediction/Classification¶

Fog Classification With MLlib¶

This extends the least-foggy Bay Area analysis with a logistic regression classifier. The model predicts whether an individual Bay Area weather observation is foggy or clear.

The label is built from visibility_surface: observations with visibility at or below 10000 meters are labeled foggy, observations with visibility at or above 18000 meters are labeled clear, and middle-visibility observations are dropped. Visibility is intentionally not used as a training feature so the model does not simply learn the label rule.

Training features are cloud cover, surface temperature, relative humidity at the zero-degree isotherm, surface pressure, wind gust speed, month, hour, latitude, and longitude. The data is partitioned by time: 2015-2018 train the model, and 2019 tests it.

This improves the least-foggy analysis by checking whether weather variables other than visibility can distinguish foggy from clear Bay Area conditions. If the model performs well, the hand-built clarity ranking is supported by a learned classifier; if it performs poorly, that suggests visibility is doing most of the work and the score should be interpreted more cautiously.

In [32]:
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, LongType, DoubleType
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.feature import StandardScaler, VectorAssembler
import time

from SparkHandler import Sparky, SparkyBatchContext, SparkyAnalysisContext
import util

sc = spark.sparkContext
fog_classification_runtime_start = time.perf_counter()

# ============================================================
# CONFIG
# ============================================================

FOG_CLASSIFICATION_INPUT_GLOB = "hdfs:///3hr/*/*_20*{05,06,07,08,09}*_*"
FOG_CLASSIFICATION_BATCH_SIZE = 50
FOG_CLASSIFICATION_PARTIAL_OUT = f"hdfs:///user/{sc.sparkUser()}/fog_classification_partials"

# Bay Area bounding box, same as least foggy cause this is directly comparing to that.
FOG_LAT_MIN = 36.8
FOG_LAT_MAX = 38.9
FOG_LON_MIN = -123.2
FOG_LON_MAX = -121.4

FOG_MONTHS = [5, 6, 7, 8, 9]
FOGGY_VISIBILITY_MAX = 10000.0
CLEAR_VISIBILITY_MIN = 18000.0

FOG_FEATURE_COLS = [
    "cloud",
    "temp_c",
    "rh",
    "pressure",
    "wind",
    "month",
    "hour",
    "lat",
    "lon",
]

fog_raw_schema = StructType([
    StructField("time_ms", LongType(), False),
    StructField("lat", DoubleType(), False),
    StructField("lon", DoubleType(), False),
    StructField("cloud", DoubleType(), False),
    StructField("temp_k", DoubleType(), False),
    StructField("rh", DoubleType(), False),
    StructField("pressure", DoubleType(), False),
    StructField("wind", DoubleType(), False),
    StructField("visibility", DoubleType(), False),
])

fog_time_idx = None
fog_lat_idx = None
fog_lon_idx = None
fog_cloud_idx = None
fog_temp_idx = None
fog_rh_idx = None
fog_pressure_idx = None
fog_wind_idx = None
fog_visibility_idx = None
In [33]:
def parse_fog_classification_line(line):
    try:
        if line.startswith("1_time"):
            return None

        parts = line.rstrip("\t").split("\t")

        time_ms = util.safe_float(parts, fog_time_idx)
        lat = util.safe_float(parts, fog_lat_idx)
        lon = util.safe_float(parts, fog_lon_idx)

        if time_ms is None or lat is None or lon is None:
            return None

        if not (FOG_LAT_MIN <= lat <= FOG_LAT_MAX and FOG_LON_MIN <= lon <= FOG_LON_MAX):
            return None

        cloud = util.safe_float(parts, fog_cloud_idx)
        temp_k = util.safe_float(parts, fog_temp_idx)
        rh = util.safe_float(parts, fog_rh_idx)
        pressure = util.safe_float(parts, fog_pressure_idx)
        wind = util.safe_float(parts, fog_wind_idx)
        visibility = util.safe_float(parts, fog_visibility_idx)

        if any(value is None for value in [cloud, temp_k, rh, pressure, wind, visibility]):
            return None

        return (
            int(time_ms),
            lat,
            lon,
            cloud,
            temp_k,
            rh,
            pressure,
            wind,
            visibility,
        )

    except Exception:
        return None


def fog_classification_partial_transform(points):
    labeled = (
        points
        .withColumn(
            "timestamp",
            F.from_unixtime((F.col("time_ms") / 1000).cast("long")).cast("timestamp")
        )
        .withColumn("year", F.year("timestamp"))
        .withColumn("month", F.month("timestamp"))
        .withColumn("hour", F.hour("timestamp"))
        .filter(F.col("month").isin(FOG_MONTHS))
        .withColumn("temp_c", F.col("temp_k") - F.lit(273.15))
        .withColumn(
            "label",
            F.when(F.col("visibility") <= F.lit(FOGGY_VISIBILITY_MAX), F.lit(1.0))
             .when(F.col("visibility") >= F.lit(CLEAR_VISIBILITY_MIN), F.lit(0.0))
        )
        .filter(F.col("label").isNotNull())
    )

    return labeled.select(
        "year",
        "month",
        "hour",
        "lat",
        "lon",
        "cloud",
        "temp_c",
        "rh",
        "pressure",
        "wind",
        "label",
    )


def combine_fog_classification_partials(partials):
    return partials


def analyze_fog_classification_rows(rows):
    return rows
In [34]:
fog_sparky = Sparky(spark, sc)

fog_batch_ctx = SparkyBatchContext(
    glob_path=FOG_CLASSIFICATION_INPUT_GLOB,
    partial_out=FOG_CLASSIFICATION_PARTIAL_OUT,
    schema=fog_raw_schema,
    parse_line=parse_fog_classification_line,
    partial_transform=fog_classification_partial_transform,
    batch_size=FOG_CLASSIFICATION_BATCH_SIZE,
)

fog_input_paths = fog_sparky.setup_job(fog_batch_ctx)

if len(fog_input_paths) == 0:
    raise ValueError(f"No files matched: {FOG_CLASSIFICATION_INPUT_GLOB}")


def fog_required_idx(name):
    key = name.strip().lower()
    if key not in fog_sparky.columns:
        raise ValueError(f"Missing required column: {name}")
    return fog_sparky.columns[key]

fog_time_idx = fog_required_idx("time")
fog_lat_idx = fog_required_idx("lat")
fog_lon_idx = fog_required_idx("lon")
fog_cloud_idx = fog_required_idx("total_cloud_cover_entire_atmosphere_single_layer")
fog_temp_idx = fog_required_idx("temperature_surface")
fog_rh_idx = fog_required_idx("relative_humidity_zerodegc_isotherm")
fog_pressure_idx = fog_required_idx("pressure_surface")
fog_wind_idx = fog_required_idx("wind_speed_gust_surface")
fog_visibility_idx = fog_required_idx("visibility_surface")

print("Using column indices:")
print("time:", fog_time_idx)
print("lat:", fog_lat_idx)
print("lon:", fog_lon_idx)
print("cloud cover:", fog_cloud_idx)
print("temperature:", fog_temp_idx)
print("relative humidity:", fog_rh_idx)
print("pressure:", fog_pressure_idx)
print("wind gust:", fog_wind_idx)
print("visibility:", fog_visibility_idx)
print("visibility is used only for labels, not model features")

for batch_id, start in enumerate(range(0, len(fog_input_paths), fog_batch_ctx.batch_size)):
    batch_paths = fog_input_paths[start:start + fog_batch_ctx.batch_size]
    fog_sparky.process_batch(fog_batch_ctx, batch_paths, batch_id)
Deleted old partial output: hdfs:/user/rsbhatia2/fog_classification_partials
Found 4381 files
                                                                                
Using column indices:
time: 0
lat: 1
lon: 2
cloud cover: 12
temperature: 10
relative humidity: 8
pressure: 6
wind gust: 17
visibility: 15
visibility is used only for labels, not model features
Processing batch 0: 50 files
                                                                                
Processing batch 1: 50 files
                                                                                
Processing batch 2: 50 files
                                                                                
Processing batch 3: 50 files
                                                                                
Processing batch 4: 50 files
                                                                                
Processing batch 5: 50 files
                                                                                
Processing batch 6: 50 files
                                                                                
Processing batch 7: 50 files
                                                                                
Processing batch 8: 50 files
                                                                                
Processing batch 9: 50 files
                                                                                
Processing batch 10: 50 files
                                                                                
Processing batch 11: 50 files
                                                                                
Processing batch 12: 50 files
                                                                                
Processing batch 13: 50 files
                                                                                
Processing batch 14: 50 files
                                                                                
Processing batch 15: 50 files
                                                                                
Processing batch 16: 50 files
                                                                                
Processing batch 17: 50 files
                                                                                
Processing batch 18: 50 files
                                                                                
Processing batch 19: 50 files
                                                                                
Processing batch 20: 50 files
                                                                                
Processing batch 21: 50 files
                                                                                
Processing batch 22: 50 files
                                                                                
Processing batch 23: 50 files
                                                                                
Processing batch 24: 50 files
                                                                                
Processing batch 25: 50 files
                                                                                
Processing batch 26: 50 files
                                                                                
Processing batch 27: 50 files
                                                                                
Processing batch 28: 50 files
                                                                                
Processing batch 29: 50 files
                                                                                
Processing batch 30: 50 files
                                                                                
Processing batch 31: 50 files
                                                                                
Processing batch 32: 50 files
                                                                                
Processing batch 33: 50 files
                                                                                
Processing batch 34: 50 files
                                                                                
Processing batch 35: 50 files
                                                                                
Processing batch 36: 50 files
                                                                                
Processing batch 37: 50 files
                                                                                
Processing batch 38: 50 files
                                                                                
Processing batch 39: 50 files
                                                                                
Processing batch 40: 50 files
                                                                                
Processing batch 41: 50 files
                                                                                
Processing batch 42: 50 files
                                                                                
Processing batch 43: 50 files
                                                                                
Processing batch 44: 50 files
                                                                                
Processing batch 45: 50 files
                                                                                
Processing batch 46: 50 files
                                                                                
Processing batch 47: 50 files
                                                                                
Processing batch 48: 50 files
                                                                                
Processing batch 49: 50 files
                                                                                
Processing batch 50: 50 files
                                                                                
Processing batch 51: 50 files
                                                                                
Processing batch 52: 50 files
                                                                                
Processing batch 53: 50 files
                                                                                
Processing batch 54: 50 files
                                                                                
Processing batch 55: 50 files
                                                                                
Processing batch 56: 50 files
                                                                                
Processing batch 57: 50 files
                                                                                
Processing batch 58: 50 files
                                                                                
Processing batch 59: 50 files
                                                                                
Processing batch 60: 50 files
                                                                                
Processing batch 61: 50 files
                                                                                
Processing batch 62: 50 files
                                                                                
Processing batch 63: 50 files
                                                                                
Processing batch 64: 50 files
                                                                                
Processing batch 65: 50 files
                                                                                
Processing batch 66: 50 files
                                                                                
Processing batch 67: 50 files
                                                                                
Processing batch 68: 50 files
                                                                                
Processing batch 69: 50 files
                                                                                
Processing batch 70: 50 files
                                                                                
Processing batch 71: 50 files
                                                                                
Processing batch 72: 50 files
                                                                                
Processing batch 73: 50 files
                                                                                
Processing batch 74: 50 files
                                                                                
Processing batch 75: 50 files
                                                                                
Processing batch 76: 50 files
                                                                                
Processing batch 77: 50 files
                                                                                
Processing batch 78: 50 files
                                                                                
Processing batch 79: 50 files
                                                                                
Processing batch 80: 50 files
                                                                                
Processing batch 81: 50 files
                                                                                
Processing batch 82: 50 files
                                                                                
Processing batch 83: 50 files
                                                                                
Processing batch 84: 50 files
                                                                                
Processing batch 85: 50 files
                                                                                
Processing batch 86: 50 files
                                                                                
Processing batch 87: 31 files
                                                                                
In [35]:
fog_analysis_ctx = SparkyAnalysisContext(
    partial_in=FOG_CLASSIFICATION_PARTIAL_OUT,
    combine_partials=combine_fog_classification_partials,
    analyze=analyze_fog_classification_rows,
    result_out=None,
)

fog_rows = fog_sparky.analyze(fog_analysis_ctx, fog_sparky.combine(fog_analysis_ctx)).cache()

train = fog_rows.filter(F.col("year").between(2015, 2018)).cache()
test = fog_rows.filter(F.col("year") == 2019).cache()

print("Training label counts:")
train.groupBy("label").count().orderBy("label").show()

print("Test label counts:")
test.groupBy("label").count().orderBy("label").show()

train_label_count = train.select("label").distinct().count()
test_label_count = test.select("label").distinct().count()

if train_label_count < 2:
    raise ValueError("Training set does not contain both foggy and clear labels.")

if test_label_count < 2:
    raise ValueError("Test set does not contain both foggy and clear labels.")

print("Model feature columns:", FOG_FEATURE_COLS)
print("visibility_surface excluded from features:", "visibility" not in FOG_FEATURE_COLS)

assembler = VectorAssembler(
    inputCols=FOG_FEATURE_COLS,
    outputCol="features",
    handleInvalid="skip",
)

scaler = StandardScaler(
    inputCol="features",
    outputCol="scaled_features",
    withStd=True,
    withMean=True,
)

lr = LogisticRegression(
    labelCol="label",
    featuresCol="scaled_features",
    maxIter=50,
    regParam=0.01,
    elasticNetParam=0.0,
)

fog_pipeline = Pipeline(stages=[assembler, scaler, lr])
fog_model = fog_pipeline.fit(train)
fog_predictions = fog_model.transform(test).cache()

print("Prediction rows:", fog_predictions.count())
Training label counts:
                                                                                
+-----+------+
|label| count|
+-----+------+
|  0.0|271661|
|  1.0| 33021|
+-----+------+

Test label counts:
+-----+------+
|label| count|
+-----+------+
|  0.0|131402|
|  1.0| 17243|
+-----+------+

Model feature columns: ['cloud', 'temp_c', 'rh', 'pressure', 'wind', 'month', 'hour', 'lat', 'lon']
visibility_surface excluded from features: True
[Stage 1638:====================================================> (57 + 2) / 59]
Prediction rows: 148645
                                                                                
In [36]:
lr_model = fog_model.stages[-1]

coefficient_rows = [
    (feature, float(coef))
    for feature, coef in zip(FOG_FEATURE_COLS, lr_model.coefficients)
]

coefficient_df = spark.createDataFrame(coefficient_rows, ["feature", "coefficient"])
print("Logistic regression coefficients:")
coefficient_df.orderBy(F.desc(F.abs(F.col("coefficient")))).show(truncate=False)

print("Confusion matrix: rows are actual labels, columns are predictions")
(
    fog_predictions
    .groupBy("label")
    .pivot("prediction", [0.0, 1.0])
    .count()
    .na.fill(0)
    .orderBy("label")
    .show()
)

accuracy = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
).evaluate(fog_predictions)

weighted_f1 = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="f1",
).evaluate(fog_predictions)

area_under_roc = BinaryClassificationEvaluator(
    labelCol="label",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC",
).evaluate(fog_predictions)

print(f"Accuracy: {accuracy:.4f}")
print(f"Weighted F1: {weighted_f1:.4f}")
print(f"Area under ROC: {area_under_roc:.4f}")

print("Example predictions:")
(
    fog_predictions
    .select(
        "year",
        "month",
        "hour",
        "lat",
        "lon",
        "label",
        "prediction",
        "probability",
        "cloud",
        "temp_c",
        "rh",
        "pressure",
        "wind",
    )
    .orderBy("year", "month", "hour")
    .show(20, truncate=False)
)

fog_classification_runtime = time.perf_counter() - fog_classification_runtime_start
print(f"Total Prediction/Classification Spark runtime: {fog_classification_runtime:.2f} seconds")
Logistic regression coefficients:
                                                                                
+--------+--------------------+
|feature |coefficient         |
+--------+--------------------+
|cloud   |1.1067942230378602  |
|temp_c  |-0.7872210748882439 |
|lon     |-0.400521811557659  |
|pressure|0.3690058883045009  |
|wind    |-0.36672343270150165|
|month   |0.21371538457949868 |
|hour    |0.15716091232829532 |
|lat     |-0.15161985573516182|
|rh      |-0.06357281862133732|
+--------+--------------------+

Confusion matrix: rows are actual labels, columns are predictions
+-----+------+----+
|label|   0.0| 1.0|
+-----+------+----+
|  0.0|129296|2106|
|  1.0| 14647|2596|
+-----+------+----+

                                                                                
Accuracy: 0.8873
Weighted F1: 0.8577
Area under ROC: 0.9072
Example predictions:
+----+-----+----+------------------+-------------------+-----+----------+-----------------------------------------+-----+------------------+----+----------+---------+
|year|month|hour|lat               |lon                |label|prediction|probability                              |cloud|temp_c            |rh  |pressure  |wind     |
+----+-----+----+------------------+-------------------+-----+----------+-----------------------------------------+-----+------------------+----+----------+---------+
|2019|5    |2   |37.76021306282369 |-122.9866273902384 |0.0  |0.0       |[0.6557099198197698,0.3442900801802302]  |100.0|14.695400000000006|37.0|101081.0  |4.8545904|
|2019|5    |2   |37.76021306282369 |-122.9866273902384 |0.0  |0.0       |[0.6026044674460598,0.3973955325539402]  |100.0|13.338459999999998|66.0|101222.89 |3.1258583|
|2019|5    |2   |37.76021306282369 |-122.9866273902384 |0.0  |0.0       |[0.6040506172227778,0.39594938277722225] |100.0|13.251059999999995|23.0|101211.266|4.5      |
|2019|5    |2   |38.49513874119703 |-122.48812487581753|0.0  |0.0       |[0.8234601659614486,0.17653983403855145] |100.0|11.678500000000042|93.0|97879.695 |2.6258583|
|2019|5    |2   |37.76021306282369 |-122.9866273902384 |0.0  |0.0       |[0.7772587866947785,0.22274121330522145] |100.0|13.197660000000042|5.0 |101257.664|13.644336|
|2019|5    |2   |37.07226085949479 |-121.8466103871987 |0.0  |0.0       |[0.8415230708804309,0.15847692911956912] |100.0|14.138480000000015|28.0|98074.89  |2.7258582|
|2019|5    |2   |37.80626966684229 |-122.03276508131742|1.0  |0.0       |[0.8951205781739128,0.10487942182608723] |79.0 |9.611080000000015 |24.0|98697.67  |6.3      |
|2019|5    |2   |37.13199613541767 |-122.82142370021901|0.0  |0.0       |[0.6123541199467968,0.3876458800532032]  |100.0|13.59847000000002 |46.0|101278.09 |4.325858 |
|2019|5    |2   |37.76021306282369 |-122.9866273902384 |0.0  |0.0       |[0.8160051563116283,0.1839948436883717]  |95.0 |13.342200000000048|32.0|101263.4  |13.7301  |
|2019|5    |2   |38.68439162763422 |-121.98154998865351|0.0  |0.0       |[0.8545880423737439,0.14541195762625614] |100.0|13.478480000000047|56.0|99533.3   |3.4258583|
|2019|5    |2   |37.07226085949479 |-121.8466103871987 |0.0  |0.0       |[0.8876160529681716,0.11238394703182841] |78.0 |11.221060000000023|24.0|97992.87  |2.8      |
|2019|5    |2   |37.44953919998765 |-122.2168242814588 |0.0  |0.0       |[0.7763555894809914,0.22364441051900863] |100.0|15.238500000000045|32.0|99916.49  |3.0258584|
|2019|5    |2   |38.80965104619415 |-121.87476520122758|0.0  |0.0       |[0.9803487412712196,0.019651258728780374]|17.0 |11.217650000000049|39.0|100447.266|4.1443357|
|2019|5    |2   |37.5548823631671  |-121.55606012175988|0.0  |0.0       |[0.9220186583455454,0.07798134165445458] |100.0|12.278470000000027|32.0|95062.89  |2.7258582|
|2019|5    |2   |37.13199613541767 |-122.82142370021901|0.0  |0.0       |[0.5844937625445045,0.41550623745549553] |100.0|13.461080000000038|22.0|101248.87 |3.9      |
|2019|5    |2   |37.5967978678328  |-121.97931844540852|0.0  |0.0       |[0.817165050891673,0.18283494910832698]  |100.0|13.688469999999995|34.0|99323.695 |3.4258583|
|2019|5    |2   |37.76021306282369 |-122.9866273902384 |0.0  |0.0       |[0.7055133871642577,0.2944866128357423]  |100.0|14.050900000000013|21.0|101608.766|9.306425 |
|2019|5    |2   |36.838970056062415|-122.60836677431931|0.0  |0.0       |[0.6105370835193823,0.3894629164806177]  |100.0|13.798500000000047|25.0|101310.09 |3.8258584|
|2019|5    |2   |38.68439162763422 |-121.98154998865351|0.0  |0.0       |[0.9802840247373842,0.019715975262615815]|0.0  |6.141080000000045 |8.0 |99388.87  |3.4      |
|2019|5    |2   |36.84100827316722 |-121.92525054696785|0.0  |0.0       |[0.7255518025469067,0.27444819745309335] |100.0|14.37847000000005 |27.0|101310.09 |3.2258582|
+----+-----+----+------------------+-------------------+-----+----------+-----------------------------------------+-----+------------------+----+----------+---------+
only showing top 20 rows

Total Prediction/Classification Spark runtime: 1312.98 seconds

Fog Classification Results¶

The classifier was trained on 304682 labeled Bay Area observations from 2015-2018 and tested on 148645 labeled observations from 2019. The training set contains 271661 clear observations and 33021 foggy observations. The test set contains 131402 clear observations and 17243 foggy observations, so the data is strongly weighted toward clear conditions.

The model performs well overall on the 2019 holdout set: accuracy is 0.8873, weighted F1 is 0.8577, and area under ROC is 0.9072. The ROC score is the most useful headline metric here because it shows the model can separate foggy and clear observations well even though visibility_surface was excluded from the training features.

The strongest learned signal is cloud cover. Its coefficient is 1.1068, so higher cloud cover pushes predictions toward foggy. Surface temperature has a coefficient of -0.7872, meaning warmer observations push predictions toward clear. Longitude, pressure, wind, month, hour, and latitude also affect the prediction. Relative humidity has the smallest coefficient, -0.0636, so it contributes little in this fitted model.

The confusion matrix shows that the model is much stronger at identifying clear observations than foggy observations. It correctly predicts 129296 clear observations and misclassifies only 2106 clear observations as foggy. For foggy observations, it correctly predicts 2596 and misses 14647. In other words, this model is conservative about predicting fog, which makes sense given the class imbalance in the test set.

This improves the least-foggy analysis because it checks the hand-built clarity score against a learned model. The model uses weather variables other than visibility and still gets strong holdout performance, especially for identifying clear conditions. The main caveat is that the current classifier should be treated as a clear-condition detector more than a fog-event detector unless the training data is rebalanced or the decision threshold is tuned.

The main problem here, is that clear observations dominate the dataset. So the classifier mostly learns how to avoid false positives for fog predictions. As a model that is limiting, but since the we are talking about least foggy here, being able to accurately predict if there is going to be fog is useful for Least Foggy.

Part 2¶

Clash Royale¶

Clash Royale is a 2016 real-time strategy mobile game developed and published by Supercell. It combines elements of collectible card games, tower defense, and multiplayer online battle arena (MOBA). Players collect cards and create a deck to use in 3–5 minute battles, with the aim to destroy opposing towers whilst defending. (from wikipedia)

Number of games in the dataset: 37.9 Million

Task 1: Full-Deck Elo¶

In this task, I am going to apply an aggregated Elo system to decks. Each deck is made of 8 cards, and I consider all combinations of the cards as the same deck.

Example: for 3 card decks, [a, b, c] and [c, a, b] are the same deck for this task.

In [37]:
from pyspark import StorageLevel
from pyspark.sql import functions as F
import time

ROYALE_BATTLES_GLOB = "hdfs:///royale/*/*.csv"
ROYALE_CARD_MASTER_PATH = "hdfs:///royale/CardMasterListSeason18_12082020.csv"
ROYALE_TASK1_PARTIAL_OUT = "hdfs:///sparky/royale_task1_matchup_partials"
ROYALE_TASK1_MATCHUP_OUT = "hdfs:///sparky/royale_task1_matchups"
ROYALE_TASK1_RATINGS_OUT = "hdfs:///sparky/royale_task1_ratings"
ROYALE_TASK1_BATCH_SIZE = 1
ROYALE_TASK1_K = 32.0
ROYALE_TASK1_ITERATIONS = 5
ROYALE_TASK1_MIN_GAMES = 100

# Prevent Spark from spending heap building enormous plan strings when an action starts.
# Kept running out of heap space when doing this, so our overlords suggested this.
spark.conf.set("spark.sql.maxPlanStringLength", "4096")

previous_log_level = sc.getLogLevel() if hasattr(sc, "getLogLevel") else "WARN"
sc.setLogLevel("ERROR")
royale_task1_runtime_start = time.perf_counter()

winner_card_cols = [f"winner.card{i}.id" for i in range(1, 9)]
loser_card_cols = [f"loser.card{i}.id" for i in range(1, 9)]


def clear_hdfs_path(path):
    hadoop_path = spark._jvm.org.apache.hadoop.fs.Path(path)
    fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
    if fs.exists(hadoop_path):
        fs.delete(hadoop_path, True)
        print(f"Deleted old output: {path}")


def hdfs_glob_paths(glob_path):
    fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
    statuses = fs.globStatus(spark._jvm.org.apache.hadoop.fs.Path(glob_path))
    if statuses is None:
        return []
    return sorted(str(status.getPath()) for status in statuses)


def deck_key(cols):
    return F.concat_ws(
        "|",
        F.array_sort(F.array(*[F.col(f"`{col}`").cast("string") for col in cols])),
    )


def build_matchup_counts(df):
    return (
        df
        .select(
            deck_key(winner_card_cols).alias("winner_deck"),
            deck_key(loser_card_cols).alias("loser_deck"),
        )
        .where(
            F.col("winner_deck").isNotNull()
            & F.col("loser_deck").isNotNull()
            & (F.size(F.split(F.col("winner_deck"), "\\|")) == 8)
            & (F.size(F.split(F.col("loser_deck"), "\\|")) == 8)
        )
        .groupBy("winner_deck", "loser_deck")
        .agg(F.count("*").alias("games"))
    )


battle_paths = hdfs_glob_paths(ROYALE_BATTLES_GLOB)
if len(battle_paths) == 0:
    raise ValueError(f"No battle CSV files matched: {ROYALE_BATTLES_GLOB}")

print(f"Found {len(battle_paths)} battle CSV files")
clear_hdfs_path(ROYALE_TASK1_PARTIAL_OUT)
clear_hdfs_path(ROYALE_TASK1_MATCHUP_OUT)
clear_hdfs_path(ROYALE_TASK1_RATINGS_OUT)

for batch_id, start in enumerate(range(0, len(battle_paths), ROYALE_TASK1_BATCH_SIZE)):
    batch_paths = battle_paths[start:start + ROYALE_TASK1_BATCH_SIZE]
    print(f"Processing Clash Royale Task 1 batch {batch_id}: {len(batch_paths)} file(s)")

    batch_df = (
        spark.read
        .option("header", "true")
        .option("multiLine", "false")
        .csv(batch_paths)
    )

    missing_cols = [col for col in winner_card_cols + loser_card_cols if col not in batch_df.columns]
    if missing_cols:
        raise ValueError(f"Missing expected battle columns in batch {batch_id}: {missing_cols}")

    partial_matchups = build_matchup_counts(batch_df)
    (
        partial_matchups
        .write
        .mode("overwrite")
        .parquet(f"{ROYALE_TASK1_PARTIAL_OUT}/batch_{batch_id:04d}")
    )

    print(f"Wrote partial matchup counts for batch {batch_id}")

combined_matchups = (
    spark.read
    .parquet(f"{ROYALE_TASK1_PARTIAL_OUT}/batch_*")
    .groupBy("winner_deck", "loser_deck")
    .agg(F.sum("games").alias("games"))
)

(
    combined_matchups
    .write
    .mode("overwrite")
    .parquet(ROYALE_TASK1_MATCHUP_OUT)
)

matchups = spark.read.parquet(ROYALE_TASK1_MATCHUP_OUT).persist(StorageLevel.DISK_ONLY)
matchup_count = matchups.count()
print(f"Combined matchup rows: {matchup_count}")

initial_ratings = (
    matchups.select(F.col("winner_deck").alias("deck"))
    .union(matchups.select(F.col("loser_deck").alias("deck")))
    .distinct()
    .withColumn("elo", F.lit(800.0))
)

initial_rating_path = f"{ROYALE_TASK1_RATINGS_OUT}/iteration_0000"
(
    initial_ratings
    .write
    .mode("overwrite")
    .parquet(initial_rating_path)
)

ratings = spark.read.parquet(initial_rating_path).persist(StorageLevel.DISK_ONLY)
ratings.count()

for iteration in range(ROYALE_TASK1_ITERATIONS):
    w = ratings.select(F.col("deck").alias("winner_deck"), F.col("elo").alias("winner_elo"))
    l = ratings.select(F.col("deck").alias("loser_deck"), F.col("elo").alias("loser_elo"))

    matchup_deltas = (
        matchups
        .join(w, "winner_deck")
        .join(l, "loser_deck")
        .withColumn(
            "expected_win",
            F.expr("1.0 / (1.0 + pow(10.0, (loser_elo - winner_elo) / 400.0))"),
        )
        .withColumn("winner_delta", F.lit(ROYALE_TASK1_K) * F.col("games") * (1.0 - F.col("expected_win")))
        .withColumn("loser_delta", -F.col("winner_delta"))
    )

    deck_deltas = (
        matchup_deltas.select(
            F.col("winner_deck").alias("deck"),
            F.col("winner_delta").alias("delta"),
            F.col("games").alias("games_seen"),
        )
        .union(
            matchup_deltas.select(
                F.col("loser_deck").alias("deck"),
                F.col("loser_delta").alias("delta"),
                F.col("games").alias("games_seen"),
            )
        )
        .groupBy("deck")
        .agg(
            F.sum("delta").alias("total_delta"),
            F.sum("games_seen").alias("games_seen"),
        )
        .select("deck", (F.col("total_delta") / F.col("games_seen")).alias("delta"))
    )

    updated_ratings = (
        ratings
        .join(deck_deltas, "deck", "left")
        .select("deck", (F.col("elo") + F.coalesce(F.col("delta"), F.lit(0.0))).alias("elo"))
    )

    next_rating_path = f"{ROYALE_TASK1_RATINGS_OUT}/iteration_{iteration + 1:04d}"
    (
        updated_ratings
        .write
        .mode("overwrite")
        .parquet(next_rating_path)
    )

    ratings.unpersist(blocking=False)
    ratings = spark.read.parquet(next_rating_path).persist(StorageLevel.DISK_ONLY)
    ratings.count()
    print(f"Finished Elo iteration {iteration + 1}/{ROYALE_TASK1_ITERATIONS}")

deck_game_counts = (
    matchups.select(F.col("winner_deck").alias("deck"), F.col("games").alias("games_seen"))
    .union(matchups.select(F.col("loser_deck").alias("deck"), F.col("games").alias("games_seen")))
    .groupBy("deck")
    .agg(F.sum("games_seen").alias("games_seen"))
)

rating_rows = (
    ratings
    .join(deck_game_counts, "deck", "left")
    .select("deck", F.round("elo", 2).alias("elo"), F.coalesce(F.col("games_seen"), F.lit(0)).alias("games_seen"))
    .persist(StorageLevel.DISK_ONLY)
)
unique_deck_count = rating_rows.count()
ranked_rating_rows = rating_rows.where(F.col("games_seen") >= ROYALE_TASK1_MIN_GAMES).persist(StorageLevel.DISK_ONLY)
ranked_deck_count = ranked_rating_rows.count()

card_master_df = (
    spark.read
    .option("header", "true")
    .csv(ROYALE_CARD_MASTER_PATH)
)
card_id_col = "team.card1.id" if "team.card1.id" in card_master_df.columns else card_master_df.columns[0]
card_name_col = "team.card1.name" if "team.card1.name" in card_master_df.columns else card_master_df.columns[1]
card_name_rows = (
    card_master_df
    .select(F.col(f"`{card_id_col}`").cast("string").alias("card_id"), F.col(f"`{card_name_col}`").alias("card_name"))
    .where(F.col("card_id").isNotNull() & F.col("card_name").isNotNull())
    .collect()
)
card_names = {row["card_id"]: row["card_name"] for row in card_name_rows}


def display_deck(deck):
    return ", ".join(card_names.get(card_id, card_id) for card_id in deck.split("|"))


def show_ranked_decks(title, rows):
    print(title)
    for rank, row in enumerate(rows, start=1):
        print(f"{rank}. Elo {row['elo']:.2f}, games {row['games_seen']}: {display_deck(row['deck'])}")
    print()

print(f"Unique decks seen: {unique_deck_count}")
print(f"Decks with at least {ROYALE_TASK1_MIN_GAMES} games: {ranked_deck_count}")

top_5_decks = ranked_rating_rows.orderBy(F.desc("elo")).limit(5).collect()
bottom_5_decks = ranked_rating_rows.orderBy(F.asc("elo")).limit(5).collect()

show_ranked_decks(f"Top 5 full decks with at least {ROYALE_TASK1_MIN_GAMES} games", top_5_decks)
show_ranked_decks(f"Bottom 5 full decks with at least {ROYALE_TASK1_MIN_GAMES} games", bottom_5_decks)

royale_task1_runtime = time.perf_counter() - royale_task1_runtime_start
print(f"Total Clash Royale Task 1 Spark runtime: {royale_task1_runtime:.2f} seconds")

sc.setLogLevel(previous_log_level)
Found 10 battle CSV files
Deleted old output: hdfs:///sparky/royale_task1_matchup_partials
Deleted old output: hdfs:///sparky/royale_task1_matchups
Deleted old output: hdfs:///sparky/royale_task1_ratings
Processing Clash Royale Task 1 batch 0: 1 file(s)
                                                                                
Wrote partial matchup counts for batch 0
Processing Clash Royale Task 1 batch 1: 1 file(s)
                                                                                
Wrote partial matchup counts for batch 1
Processing Clash Royale Task 1 batch 2: 1 file(s)
                                                                                
Wrote partial matchup counts for batch 2
Processing Clash Royale Task 1 batch 3: 1 file(s)
                                                                                
Wrote partial matchup counts for batch 3
Processing Clash Royale Task 1 batch 4: 1 file(s)
                                                                                
Wrote partial matchup counts for batch 4
Processing Clash Royale Task 1 batch 5: 1 file(s)
                                                                                
Wrote partial matchup counts for batch 5
Processing Clash Royale Task 1 batch 6: 1 file(s)
                                                                                
Wrote partial matchup counts for batch 6
Processing Clash Royale Task 1 batch 7: 1 file(s)
                                                                                
Wrote partial matchup counts for batch 7
Processing Clash Royale Task 1 batch 8: 1 file(s)
                                                                                
Wrote partial matchup counts for batch 8
Processing Clash Royale Task 1 batch 9: 1 file(s)
                                                                                
Wrote partial matchup counts for batch 9
                                                                                
Combined matchup rows: 35768615
                                                                                
Finished Elo iteration 1/5
                                                                                
Finished Elo iteration 2/5
                                                                                
Finished Elo iteration 3/5
                                                                                ]
Finished Elo iteration 4/5
                                                                                
Finished Elo iteration 5/5
                                                                                
Unique decks seen: 16074675
Decks with at least 100 games: 23929
Top 5 full decks with at least 100 games
1. Elo 859.35, games 115: P.E.K.K.A, Valkyrie, Mega Minion, Goblin Hut, Furnace, Fireball, Arrows, Zap
2. Elo 859.16, games 721: Goblins, Valkyrie, Musketeer, Hog Rider, Inferno Tower, Fireball, Arrows, Zap
3. Elo 858.25, games 111: Knight, Minions, Balloon, Lumberjack, Tesla, Fireball, Arrows, Zap
4. Elo 854.86, games 217: Balloon, Valkyrie, Inferno Dragon, Electro Wizard, Mega Knight, Electro Spirit, Poison, Giant Snowball
5. Elo 853.19, games 188: Balloon, Valkyrie, Wizard, Electro Wizard, Mega Knight, Firecracker, Goblin Barrel, The Log

Bottom 5 full decks with at least 100 games
1. Elo 729.01, games 174: Fireball, Arrows, Lightning, Zap, Poison, The Log, Tornado, Giant Snowball
2. Elo 729.52, games 172: Fireball, Arrows, Rocket, Lightning, Zap, Poison, The Log, Giant Snowball
3. Elo 729.82, games 194: Fireball, Arrows, Rocket, Zap, Poison, The Log, Tornado, Heal Spirit
4. Elo 730.04, games 106: Fireball, Arrows, Rage, Zap, Poison, Tornado, Earthquake, Giant Snowball
5. Elo 730.60, games 178: Fireball, Arrows, Zap, Poison, The Log, Tornado, Earthquake, Giant Snowball

Total Clash Royale Task 1 Spark runtime: 788.18 seconds

Interpretation¶

The highest rated deck is as shown above with 115 games. I think deck 2 is probably the best one, giving that its Elo is very close (0.19 diff) and there have been almost 6 times as many games as the first deck.

Just looking at the Bottom decks, itseems like haing a bunch of spells is really really bad.

Task 2: First-Three-Slot Sub-Deck Elo¶

In this task , I am looking at the first-three slots only, as those are powered slots that allow cards that can be powered up to activate their powers.

This is less accurate, because there are games in the dataset where players might choose to not have powered cards or maybe they don't have enough powered cards. However, this was still interesting.

In [38]:
from pyspark import StorageLevel
from pyspark.sql import functions as F
import time

ROYALE_BATTLES_GLOB = "hdfs:///royale/*/*.csv"
ROYALE_CARD_MASTER_PATH = "hdfs:///royale/CardMasterListSeason18_12082020.csv"
ROYALE_TASK2_PARTIAL_OUT = "hdfs:///sparky/royale_task2_matchup_partials"
ROYALE_TASK2_MATCHUP_OUT = "hdfs:///sparky/royale_task2_matchups"
ROYALE_TASK2_RATINGS_OUT = "hdfs:///sparky/royale_task2_ratings"
ROYALE_TASK2_BATCH_SIZE = 1
ROYALE_TASK2_K = 32.0
ROYALE_TASK2_ITERATIONS = 5
ROYALE_TASK2_MIN_GAMES = 100

spark.conf.set("spark.sql.maxPlanStringLength", "4096")

previous_log_level = sc.getLogLevel() if hasattr(sc, "getLogLevel") else "WARN"
sc.setLogLevel("ERROR")
royale_task2_runtime_start = time.perf_counter()

winner_subdeck_cols = [f"winner.card{i}.id" for i in range(1, 4)]
loser_subdeck_cols = [f"loser.card{i}.id" for i in range(1, 4)]


def clear_hdfs_path_task2(path):
    hadoop_path = spark._jvm.org.apache.hadoop.fs.Path(path)
    fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
    if fs.exists(hadoop_path):
        fs.delete(hadoop_path, True)
        print(f"Deleted old output: {path}")


def hdfs_glob_paths_task2(glob_path):
    fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
    statuses = fs.globStatus(spark._jvm.org.apache.hadoop.fs.Path(glob_path))
    if statuses is None:
        return []
    return sorted(str(status.getPath()) for status in statuses)


def ordered_subdeck_key(cols):
    return F.concat_ws("|", *[F.trim(F.col(f"`{col}`").cast("string")) for col in cols])


def complete_subdeck_filter(cols):
    checks = [F.col(f"`{col}`").isNotNull() & (F.trim(F.col(f"`{col}`").cast("string")) != "") for col in cols]
    condition = checks[0]
    for check in checks[1:]:
        condition = condition & check
    return condition


def build_task2_matchup_counts(df):
    return (
        df
        .where(complete_subdeck_filter(winner_subdeck_cols) & complete_subdeck_filter(loser_subdeck_cols))
        .select(
            ordered_subdeck_key(winner_subdeck_cols).alias("winner_deck"),
            ordered_subdeck_key(loser_subdeck_cols).alias("loser_deck"),
        )
        .groupBy("winner_deck", "loser_deck")
        .agg(F.count("*").alias("games"))
    )


battle_paths = hdfs_glob_paths_task2(ROYALE_BATTLES_GLOB)
if len(battle_paths) == 0:
    raise ValueError(f"No battle CSV files matched: {ROYALE_BATTLES_GLOB}")

print(f"Found {len(battle_paths)} battle CSV files")
clear_hdfs_path_task2(ROYALE_TASK2_PARTIAL_OUT)
clear_hdfs_path_task2(ROYALE_TASK2_MATCHUP_OUT)
clear_hdfs_path_task2(ROYALE_TASK2_RATINGS_OUT)

for batch_id, start in enumerate(range(0, len(battle_paths), ROYALE_TASK2_BATCH_SIZE)):
    batch_paths = battle_paths[start:start + ROYALE_TASK2_BATCH_SIZE]
    print(f"Processing Clash Royale Task 2 batch {batch_id}: {len(batch_paths)} file(s)")

    batch_df = (
        spark.read
        .option("header", "true")
        .option("multiLine", "false")
        .csv(batch_paths)
    )

    missing_cols = [col for col in winner_subdeck_cols + loser_subdeck_cols if col not in batch_df.columns]
    if missing_cols:
        raise ValueError(f"Missing expected battle columns in batch {batch_id}: {missing_cols}")

    partial_matchups = build_task2_matchup_counts(batch_df)
    (
        partial_matchups
        .write
        .mode("overwrite")
        .parquet(f"{ROYALE_TASK2_PARTIAL_OUT}/batch_{batch_id:04d}")
    )

    print(f"Wrote partial matchup counts for batch {batch_id}")

combined_matchups = (
    spark.read
    .parquet(f"{ROYALE_TASK2_PARTIAL_OUT}/batch_*")
    .groupBy("winner_deck", "loser_deck")
    .agg(F.sum("games").alias("games"))
)

(
    combined_matchups
    .write
    .mode("overwrite")
    .parquet(ROYALE_TASK2_MATCHUP_OUT)
)

matchups = spark.read.parquet(ROYALE_TASK2_MATCHUP_OUT).persist(StorageLevel.DISK_ONLY)
matchup_count = matchups.count()
print(f"Combined matchup rows: {matchup_count}")

initial_ratings = (
    matchups.select(F.col("winner_deck").alias("deck"))
    .union(matchups.select(F.col("loser_deck").alias("deck")))
    .distinct()
    .withColumn("elo", F.lit(800.0))
)

initial_rating_path = f"{ROYALE_TASK2_RATINGS_OUT}/iteration_0000"
(
    initial_ratings
    .write
    .mode("overwrite")
    .parquet(initial_rating_path)
)

ratings = spark.read.parquet(initial_rating_path).persist(StorageLevel.DISK_ONLY)
ratings.count()

for iteration in range(ROYALE_TASK2_ITERATIONS):
    w = ratings.select(F.col("deck").alias("winner_deck"), F.col("elo").alias("winner_elo"))
    l = ratings.select(F.col("deck").alias("loser_deck"), F.col("elo").alias("loser_elo"))

    matchup_deltas = (
        matchups
        .join(w, "winner_deck")
        .join(l, "loser_deck")
        .withColumn(
            "expected_win",
            F.expr("1.0 / (1.0 + pow(10.0, (loser_elo - winner_elo) / 400.0))"),
        )
        .withColumn("winner_delta", F.lit(ROYALE_TASK2_K) * F.col("games") * (1.0 - F.col("expected_win")))
        .withColumn("loser_delta", -F.col("winner_delta"))
    )

    deck_deltas = (
        matchup_deltas.select(
            F.col("winner_deck").alias("deck"),
            F.col("winner_delta").alias("delta"),
            F.col("games").alias("games_seen"),
        )
        .union(
            matchup_deltas.select(
                F.col("loser_deck").alias("deck"),
                F.col("loser_delta").alias("delta"),
                F.col("games").alias("games_seen"),
            )
        )
        .groupBy("deck")
        .agg(
            F.sum("delta").alias("total_delta"),
            F.sum("games_seen").alias("games_seen"),
        )
        .select("deck", (F.col("total_delta") / F.col("games_seen")).alias("delta"))
    )

    updated_ratings = (
        ratings
        .join(deck_deltas, "deck", "left")
        .select("deck", (F.col("elo") + F.coalesce(F.col("delta"), F.lit(0.0))).alias("elo"))
    )

    next_rating_path = f"{ROYALE_TASK2_RATINGS_OUT}/iteration_{iteration + 1:04d}"
    (
        updated_ratings
        .write
        .mode("overwrite")
        .parquet(next_rating_path)
    )

    ratings.unpersist(blocking=False)
    ratings = spark.read.parquet(next_rating_path).persist(StorageLevel.DISK_ONLY)
    ratings.count()
    print(f"Finished Elo iteration {iteration + 1}/{ROYALE_TASK2_ITERATIONS}")

deck_game_counts = (
    matchups.select(F.col("winner_deck").alias("deck"), F.col("games").alias("games_seen"))
    .union(matchups.select(F.col("loser_deck").alias("deck"), F.col("games").alias("games_seen")))
    .groupBy("deck")
    .agg(F.sum("games_seen").alias("games_seen"))
)

rating_rows = (
    ratings
    .join(deck_game_counts, "deck", "left")
    .select("deck", F.round("elo", 2).alias("elo"), F.coalesce(F.col("games_seen"), F.lit(0)).alias("games_seen"))
    .persist(StorageLevel.DISK_ONLY)
)
unique_deck_count = rating_rows.count()
ranked_rating_rows = rating_rows.where(F.col("games_seen") >= ROYALE_TASK2_MIN_GAMES).persist(StorageLevel.DISK_ONLY)
ranked_deck_count = ranked_rating_rows.count()

card_master_df = (
    spark.read
    .option("header", "true")
    .csv(ROYALE_CARD_MASTER_PATH)
)
card_id_col = "team.card1.id" if "team.card1.id" in card_master_df.columns else card_master_df.columns[0]
card_name_col = "team.card1.name" if "team.card1.name" in card_master_df.columns else card_master_df.columns[1]
card_name_rows = (
    card_master_df
    .select(F.col(f"`{card_id_col}`").cast("string").alias("card_id"), F.col(f"`{card_name_col}`").alias("card_name"))
    .where(F.col("card_id").isNotNull() & F.col("card_name").isNotNull())
    .collect()
)
card_names = {row["card_id"]: row["card_name"] for row in card_name_rows}


def display_subdeck(deck):
    return ", ".join(card_names.get(card_id, card_id) for card_id in deck.split("|"))


def show_ranked_subdecks(title, rows):
    print(title)
    for rank, row in enumerate(rows, start=1):
        print(f"{rank}. Elo {row['elo']:.2f}, games {row['games_seen']}: {display_subdeck(row['deck'])}")
    print()

print(f"Unique ordered first-three-slot sub-decks seen: {unique_deck_count}")
print(f"Sub-decks with at least {ROYALE_TASK2_MIN_GAMES} games: {ranked_deck_count}")

top_5_decks = ranked_rating_rows.orderBy(F.desc("elo")).limit(5).collect()
bottom_5_decks = ranked_rating_rows.orderBy(F.asc("elo")).limit(5).collect()

show_ranked_subdecks(f"Top 5 ordered first-three-slot sub-decks with at least {ROYALE_TASK2_MIN_GAMES} games", top_5_decks)
show_ranked_subdecks(f"Bottom 5 ordered first-three-slot sub-decks with at least {ROYALE_TASK2_MIN_GAMES} games", bottom_5_decks)

royale_task2_runtime = time.perf_counter() - royale_task2_runtime_start
print(f"Total Clash Royale Task 2 Spark runtime: {royale_task2_runtime:.2f} seconds")

sc.setLogLevel(previous_log_level)
Found 10 battle CSV files
Deleted old output: hdfs:///sparky/royale_task2_matchup_partials
Deleted old output: hdfs:///sparky/royale_task2_matchups
Deleted old output: hdfs:///sparky/royale_task2_ratings
Processing Clash Royale Task 2 batch 0: 1 file(s)
                                                                                
Wrote partial matchup counts for batch 0
Processing Clash Royale Task 2 batch 1: 1 file(s)
                                                                                
Wrote partial matchup counts for batch 1
Processing Clash Royale Task 2 batch 2: 1 file(s)
                                                                                
Wrote partial matchup counts for batch 2
Processing Clash Royale Task 2 batch 3: 1 file(s)
                                                                                
Wrote partial matchup counts for batch 3
Processing Clash Royale Task 2 batch 4: 1 file(s)
                                                                                
Wrote partial matchup counts for batch 4
Processing Clash Royale Task 2 batch 5: 1 file(s)
                                                                                
Wrote partial matchup counts for batch 5
Processing Clash Royale Task 2 batch 6: 1 file(s)
                                                                                
Wrote partial matchup counts for batch 6
Processing Clash Royale Task 2 batch 7: 1 file(s)
                                                                                
Wrote partial matchup counts for batch 7
Processing Clash Royale Task 2 batch 8: 1 file(s)
                                                                                
Wrote partial matchup counts for batch 8
Processing Clash Royale Task 2 batch 9: 1 file(s)
                                                                                
Wrote partial matchup counts for batch 9
                                                                                
Combined matchup rows: 36691387
                                                                                
Finished Elo iteration 1/5
                                                                                
Finished Elo iteration 2/5
                                                                                
Finished Elo iteration 3/5
                                                                                
Finished Elo iteration 4/5
                                                                                
Finished Elo iteration 5/5
                                                                                
Unique ordered first-three-slot sub-decks seen: 805306
Sub-decks with at least 100 games: 140643
Top 5 ordered first-three-slot sub-decks with at least 100 games
1. Elo 847.63, games 202: Inferno Tower, Zap, Goblins
2. Elo 839.37, games 117: Royal Recruits, Balloon, Cannon Cart
3. Elo 837.57, games 143: Valkyrie, Hunter, Spear Goblins
4. Elo 834.83, games 111: Battle Ram, Witch, Hunter
5. Elo 834.09, games 113: Dark Prince, Hunter, Magic Archer

Bottom 5 ordered first-three-slot sub-decks with at least 100 games
1. Elo 728.79, games 681: Earthquake, Giant Snowball, Rage
2. Elo 728.85, games 1008: Poison, Zap, Earthquake
3. Elo 731.28, games 103: Zap, Giant Snowball, Earthquake
4. Elo 731.31, games 103: The Log, Zap, Giant Snowball
5. Elo 731.70, games 431: Furnace, Tombstone, Inferno Tower

Total Clash Royale Task 2 Spark runtime: 349.05 seconds

Interpretation¶

Inferno tower, Zap, and Goblins form the best power trio. Interestingly, it is the structure and spell only trios that perform the worst.

Based on this result and the one before, I think a player might want to pull back on how many spells and structures are added to the deck.

Task 3: Level the playing field.¶

In [39]:
from pyspark import StorageLevel
from pyspark.sql import functions as F
import time

# A level-agnostic card keeps a strong win rate across multiple observed card levels.
ROYALE_BATTLES_GLOB = "hdfs:///royale/*/*.csv"
ROYALE_CARD_MASTER_PATH = "hdfs:///royale/CardMasterListSeason18_12082020.csv"
ROYALE_TASK3_PARTIAL_OUT = "hdfs:///sparky/royale_task3_card_level_partials"
ROYALE_TASK3_CARD_LEVEL_OUT = "hdfs:///sparky/royale_task3_card_level_summary"
ROYALE_TASK3_BATCH_SIZE = 1
ROYALE_TASK3_MIN_LEVEL_GAMES = 1000
ROYALE_TASK3_MIN_TOTAL_GAMES = 10000
ROYALE_TASK3_MIN_LEVELS = 3
ROYALE_TASK3_WIN_RATE_FLOOR = 0.50

spark.conf.set("spark.sql.maxPlanStringLength", "4096")

previous_log_level = sc.getLogLevel() if hasattr(sc, "getLogLevel") else "WARN"
sc.setLogLevel("ERROR")
royale_task3_runtime_start = time.perf_counter()

winner_task3_cols = [(f"winner.card{i}.id", f"winner.card{i}.level") for i in range(1, 9)]
loser_task3_cols = [(f"loser.card{i}.id", f"loser.card{i}.level") for i in range(1, 9)]


def clear_hdfs_path_task3(path):
    hadoop_path = spark._jvm.org.apache.hadoop.fs.Path(path)
    fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
    if fs.exists(hadoop_path):
        fs.delete(hadoop_path, True)
        print(f"Deleted old output: {path}")


def hdfs_glob_paths_task3(glob_path):
    fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
    statuses = fs.globStatus(spark._jvm.org.apache.hadoop.fs.Path(glob_path))
    if statuses is None:
        return []
    return sorted(str(status.getPath()) for status in statuses)


def card_level_struct(card_col, level_col, wins):
    return F.struct(
        F.trim(F.col(f"`{card_col}`").cast("string")).alias("card_id"),
        F.col(f"`{level_col}`").cast("int").alias("card_level"),
        F.lit(wins).cast("long").alias("wins"),
        F.lit(1).cast("long").alias("games"),
    )


def build_task3_card_level_counts(df):
    winner_observations = [card_level_struct(card_col, level_col, 1) for card_col, level_col in winner_task3_cols]
    loser_observations = [card_level_struct(card_col, level_col, 0) for card_col, level_col in loser_task3_cols]

    return (
        df
        .select(F.explode(F.array(*(winner_observations + loser_observations))).alias("obs"))
        .select("obs.card_id", "obs.card_level", "obs.wins", "obs.games")
        .where(
            F.col("card_id").isNotNull()
            & (F.col("card_id") != "")
            & F.col("card_level").isNotNull()
        )
        .groupBy("card_id", "card_level")
        .agg(
            F.sum("wins").alias("wins"),
            F.sum("games").alias("games"),
        )
    )


battle_paths = hdfs_glob_paths_task3(ROYALE_BATTLES_GLOB)
if len(battle_paths) == 0:
    raise ValueError(f"No battle CSV files matched: {ROYALE_BATTLES_GLOB}")

print(f"Found {len(battle_paths)} battle CSV files")
clear_hdfs_path_task3(ROYALE_TASK3_PARTIAL_OUT)
clear_hdfs_path_task3(ROYALE_TASK3_CARD_LEVEL_OUT)

expected_task3_cols = [col for pair in winner_task3_cols + loser_task3_cols for col in pair]

for batch_id, start in enumerate(range(0, len(battle_paths), ROYALE_TASK3_BATCH_SIZE)):
    batch_paths = battle_paths[start:start + ROYALE_TASK3_BATCH_SIZE]
    print(f"Processing Clash Royale Task 3 batch {batch_id}: {len(batch_paths)} file(s)")

    batch_df = (
        spark.read
        .option("header", "true")
        .option("multiLine", "false")
        .csv(batch_paths)
    )

    missing_cols = [col for col in expected_task3_cols if col not in batch_df.columns]
    if missing_cols:
        raise ValueError(f"Missing expected battle columns in batch {batch_id}: {missing_cols}")

    partial_card_levels = build_task3_card_level_counts(batch_df)
    (
        partial_card_levels
        .write
        .mode("overwrite")
        .parquet(f"{ROYALE_TASK3_PARTIAL_OUT}/batch_{batch_id:04d}")
    )

    print(f"Wrote partial card-level counts for batch {batch_id}")

combined_card_levels = (
    spark.read
    .parquet(f"{ROYALE_TASK3_PARTIAL_OUT}/batch_*")
    .groupBy("card_id", "card_level")
    .agg(
        F.sum("wins").alias("wins"),
        F.sum("games").alias("games"),
    )
    .withColumn("level_win_rate", F.col("wins") / F.col("games"))
)

(
    combined_card_levels
    .write
    .mode("overwrite")
    .parquet(ROYALE_TASK3_CARD_LEVEL_OUT)
)

card_level_summary = spark.read.parquet(ROYALE_TASK3_CARD_LEVEL_OUT).persist(StorageLevel.DISK_ONLY)
card_level_row_count = card_level_summary.count()
print(f"Combined card-level aggregate rows: {card_level_row_count}")

eligible_card_levels = card_level_summary.where(F.col("games") >= ROYALE_TASK3_MIN_LEVEL_GAMES)

card_scores = (
    eligible_card_levels
    .groupBy("card_id")
    .agg(
        F.sum("wins").alias("total_wins"),
        F.sum("games").alias("total_games"),
        F.count("*").alias("eligible_levels"),
        F.min("level_win_rate").alias("min_level_win_rate"),
        F.max("level_win_rate").alias("max_level_win_rate"),
        F.coalesce(F.stddev_pop("level_win_rate"), F.lit(0.0)).alias("win_rate_stddev"),
    )
    .withColumn("overall_win_rate", F.col("total_wins") / F.col("total_games"))
    .where(
        (F.col("total_games") >= ROYALE_TASK3_MIN_TOTAL_GAMES)
        & (F.col("eligible_levels") >= ROYALE_TASK3_MIN_LEVELS)
        & (F.col("overall_win_rate") >= ROYALE_TASK3_WIN_RATE_FLOOR)
    )
    .withColumn("level_agnostic_score", F.col("overall_win_rate") - (F.lit(2.0) * F.col("win_rate_stddev")))
    .persist(StorageLevel.DISK_ONLY)
)

eligible_card_count = card_scores.count()
print(f"Cards passing Task 3 filters: {eligible_card_count}")

card_master_df = (
    spark.read
    .option("header", "true")
    .csv(ROYALE_CARD_MASTER_PATH)
)
card_id_col = "team.card1.id" if "team.card1.id" in card_master_df.columns else card_master_df.columns[0]
card_name_col = "team.card1.name" if "team.card1.name" in card_master_df.columns else card_master_df.columns[1]
card_name_rows = (
    card_master_df
    .select(F.col(f"`{card_id_col}`").cast("string").alias("card_id"), F.col(f"`{card_name_col}`").alias("card_name"))
    .where(F.col("card_id").isNotNull() & F.col("card_name").isNotNull())
    .collect()
)
card_names = {row["card_id"]: row["card_name"] for row in card_name_rows}


def display_card(card_id):
    return card_names.get(card_id, card_id)


top_3_cards = (
    card_scores
    .orderBy(F.desc("level_agnostic_score"), F.desc("overall_win_rate"), F.desc("total_games"))
    .limit(3)
    .collect()
)

print("Top 3 level-agnostic winning cards")
for rank, row in enumerate(top_3_cards, start=1):
    print(
        f"{rank}. {display_card(row['card_id'])}: "
        f"score {row['level_agnostic_score']:.4f}, "
        f"win rate {row['overall_win_rate']:.2%}, "
        f"games {row['total_games']}, "
        f"eligible levels {row['eligible_levels']}, "
        f"level win-rate range {row['min_level_win_rate']:.2%}-{row['max_level_win_rate']:.2%}, "
        f"stddev {row['win_rate_stddev']:.4f}"
    )

card_level_summary.unpersist(blocking=False)
card_scores.unpersist(blocking=False)

royale_task3_runtime = time.perf_counter() - royale_task3_runtime_start
print(f"Total Clash Royale Task 3 Spark runtime: {royale_task3_runtime:.2f} seconds")
sc.setLogLevel(previous_log_level)
Found 10 battle CSV files
Deleted old output: hdfs:///sparky/royale_task3_card_level_partials
Deleted old output: hdfs:///sparky/royale_task3_card_level_summary
Processing Clash Royale Task 3 batch 0: 1 file(s)
                                                                                
Wrote partial card-level counts for batch 0
Processing Clash Royale Task 3 batch 1: 1 file(s)
                                                                                
Wrote partial card-level counts for batch 1
Processing Clash Royale Task 3 batch 2: 1 file(s)
                                                                                
Wrote partial card-level counts for batch 2
Processing Clash Royale Task 3 batch 3: 1 file(s)
                                                                                
Wrote partial card-level counts for batch 3
Processing Clash Royale Task 3 batch 4: 1 file(s)
                                                                                
Wrote partial card-level counts for batch 4
Processing Clash Royale Task 3 batch 5: 1 file(s)
                                                                                
Wrote partial card-level counts for batch 5
Processing Clash Royale Task 3 batch 6: 1 file(s)
                                                                                
Wrote partial card-level counts for batch 6
Processing Clash Royale Task 3 batch 7: 1 file(s)
                                                                                
Wrote partial card-level counts for batch 7
Processing Clash Royale Task 3 batch 8: 1 file(s)
                                                                                
Wrote partial card-level counts for batch 8
Processing Clash Royale Task 3 batch 9: 1 file(s)
                                                                                
Wrote partial card-level counts for batch 9
Combined card-level aggregate rows: 986
                                                                                
Cards passing Task 3 filters: 49
Top 3 level-agnostic winning cards
1. The Log: score 0.4831, win rate 50.37%, games 20915126, eligible levels 5, level win-rate range 48.39%-51.36%, stddev 0.0103
2. Night Witch: score 0.4788, win rate 51.31%, games 4568406, eligible levels 5, level win-rate range 47.83%-52.86%, stddev 0.0171
3. Mega Knight: score 0.4760, win rate 50.14%, games 15362533, eligible levels 5, level win-rate range 48.39%-51.94%, stddev 0.0127
Total Clash Royale Task 3 Spark runtime: 83.62 seconds

Interpretation¶

  1. The Log
  2. Night Witch
  3. Mega Knight

The best 3 cards all have things in common:

  • They are all legendary in rarity. The rarest class of cards.
  • They are all area damage cards. All these cards can do area damage. Either in the form of targeting multiple enemies or spawning minions.

The interesting thing here is that maybe cost doesn't matter too much, since The Log costs 2 elxir (in-game currency), Night Witch costs 4, and Mega Knight costs 7.

Task 4: Card rarity vs wins¶

In [40]:
from pyspark import StorageLevel
from pyspark.sql import functions as F
import time

ROYALE_BATTLES_GLOB = "hdfs:///royale/*/*.csv"
ROYALE_TASK4_PARTIAL_OUT = "hdfs:///sparky/royale_task4_rarity_partials"
ROYALE_TASK4_SUMMARY_OUT = "hdfs:///sparky/royale_task4_rarity_summary"
ROYALE_TASK4_BATCH_SIZE = 1

spark.conf.set("spark.sql.maxPlanStringLength", "4096")

previous_log_level = sc.getLogLevel() if hasattr(sc, "getLogLevel") else "WARN"
sc.setLogLevel("ERROR")
royale_task4_runtime_start = time.perf_counter()

rarity_names = ["common", "rare", "epic", "legendary"]
winner_rarity_cols = [f"winner.{rarity}.count" for rarity in rarity_names]
loser_rarity_cols = [f"loser.{rarity}.count" for rarity in rarity_names]


def clear_hdfs_path_task4(path):
    hadoop_path = spark._jvm.org.apache.hadoop.fs.Path(path)
    fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
    if fs.exists(hadoop_path):
        fs.delete(hadoop_path, True)
        print(f"Deleted old output: {path}")


def hdfs_glob_paths_task4(glob_path):
    fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
    statuses = fs.globStatus(spark._jvm.org.apache.hadoop.fs.Path(glob_path))
    if statuses is None:
        return []
    return sorted(str(status.getPath()) for status in statuses)


def rarity_observation_struct(column_name, rarity, won):
    return F.struct(
        F.lit(rarity).alias("rarity"),
        F.col(f"`{column_name}`").cast("int").alias("rarity_count"),
        F.lit(won).cast("long").alias("wins"),
        F.lit(1).cast("long").alias("games"),
    )


def build_task4_rarity_counts(df):
    winner_observations = [
        rarity_observation_struct(f"winner.{rarity}.count", rarity, 1)
        for rarity in rarity_names
    ]
    loser_observations = [
        rarity_observation_struct(f"loser.{rarity}.count", rarity, 0)
        for rarity in rarity_names
    ]

    return (
        df
        .select(F.explode(F.array(*(winner_observations + loser_observations))).alias("obs"))
        .select("obs.rarity", "obs.rarity_count", "obs.wins", "obs.games")
        .where(F.col("rarity_count").isNotNull())
        .groupBy("rarity", "rarity_count")
        .agg(
            F.sum("wins").alias("wins"),
            F.sum("games").alias("games"),
        )
    )


battle_paths = hdfs_glob_paths_task4(ROYALE_BATTLES_GLOB)
if len(battle_paths) == 0:
    raise ValueError(f"No battle CSV files matched: {ROYALE_BATTLES_GLOB}")

print(f"Found {len(battle_paths)} battle CSV files")
clear_hdfs_path_task4(ROYALE_TASK4_PARTIAL_OUT)
clear_hdfs_path_task4(ROYALE_TASK4_SUMMARY_OUT)

expected_task4_cols = winner_rarity_cols + loser_rarity_cols

for batch_id, start in enumerate(range(0, len(battle_paths), ROYALE_TASK4_BATCH_SIZE)):
    batch_paths = battle_paths[start:start + ROYALE_TASK4_BATCH_SIZE]
    print(f"Processing Clash Royale Task 4 batch {batch_id}: {len(batch_paths)} file(s)")

    batch_df = (
        spark.read
        .option("header", "true")
        .option("multiLine", "false")
        .csv(batch_paths)
    )

    missing_cols = [col for col in expected_task4_cols if col not in batch_df.columns]
    if missing_cols:
        raise ValueError(f"Missing expected battle columns in batch {batch_id}: {missing_cols}")

    partial_rarity_counts = build_task4_rarity_counts(batch_df)
    (
        partial_rarity_counts
        .write
        .mode("overwrite")
        .parquet(f"{ROYALE_TASK4_PARTIAL_OUT}/batch_{batch_id:04d}")
    )

    print(f"Wrote partial rarity counts for batch {batch_id}")

combined_rarity_counts = (
    spark.read
    .parquet(f"{ROYALE_TASK4_PARTIAL_OUT}/batch_*")
    .groupBy("rarity", "rarity_count")
    .agg(
        F.sum("wins").alias("wins"),
        F.sum("games").alias("games"),
    )
    .withColumn("win_rate", F.col("wins") / F.col("games"))
)

(
    combined_rarity_counts
    .write
    .mode("overwrite")
    .parquet(f"{ROYALE_TASK4_SUMMARY_OUT}/win_rates_by_count")
)

rarity_counts = spark.read.parquet(f"{ROYALE_TASK4_SUMMARY_OUT}/win_rates_by_count").persist(StorageLevel.DISK_ONLY)
rarity_count_rows = rarity_counts.count()
print(f"Combined rarity/count aggregate rows: {rarity_count_rows}")

correlation_summary = (
    rarity_counts
    .withColumn("x", F.col("rarity_count").cast("double"))
    .withColumn("n", F.col("games").cast("double"))
    .withColumn("sum_x", F.col("x") * F.col("n"))
    .withColumn("sum_y", F.col("wins").cast("double"))
    .withColumn("sum_xy", F.col("x") * F.col("wins").cast("double"))
    .withColumn("sum_x2", F.col("x") * F.col("x") * F.col("n"))
    .withColumn("sum_y2", F.col("wins").cast("double"))
    .groupBy("rarity")
    .agg(
        F.sum("n").alias("observations"),
        F.sum("sum_x").alias("sum_x"),
        F.sum("sum_y").alias("wins"),
        F.sum("sum_xy").alias("sum_xy"),
        F.sum("sum_x2").alias("sum_x2"),
        F.sum("sum_y2").alias("sum_y2"),
    )
    .withColumn("avg_rarity_count", F.col("sum_x") / F.col("observations"))
    .withColumn("win_rate", F.col("wins") / F.col("observations"))
    .withColumn(
        "pearson_correlation",
        (
            (F.col("observations") * F.col("sum_xy")) - (F.col("sum_x") * F.col("wins"))
        ) / F.sqrt(
            ((F.col("observations") * F.col("sum_x2")) - (F.col("sum_x") * F.col("sum_x")))
            * ((F.col("observations") * F.col("sum_y2")) - (F.col("wins") * F.col("wins")))
        )
    )
    .select(
        "rarity",
        F.col("observations").cast("long").alias("observations"),
        F.round("avg_rarity_count", 4).alias("avg_rarity_count"),
        F.round("win_rate", 6).alias("win_rate"),
        F.round("pearson_correlation", 6).alias("pearson_correlation"),
        F.round(F.abs("pearson_correlation"), 6).alias("abs_correlation"),
    )
)

(
    correlation_summary
    .write
    .mode("overwrite")
    .parquet(f"{ROYALE_TASK4_SUMMARY_OUT}/correlations")
)

correlation_rows = (
    spark.read
    .parquet(f"{ROYALE_TASK4_SUMMARY_OUT}/correlations")
    .orderBy(F.desc("abs_correlation"), "rarity")
    .collect()
)

print("Rarity count vs win correlation, strongest absolute correlation first")
for row in correlation_rows:
    print(
        f"{row['rarity']}: correlation {row['pearson_correlation']:.6f}, "
        f"avg count {row['avg_rarity_count']:.4f}, "
        f"win rate {row['win_rate']:.2%}, "
        f"observations {row['observations']}"
    )

print()
for rarity in rarity_names:
    rows = (
        rarity_counts
        .where(F.col("rarity") == rarity)
        .orderBy("rarity_count")
        .collect()
    )
    print(f"{rarity.title()} count win rates")
    for row in rows:
        print(
            f"  {row['rarity_count']}: "
            f"win rate {row['win_rate']:.2%}, "
            f"wins {row['wins']}, "
            f"games {row['games']}"
        )
    print()

rarity_counts.unpersist(blocking=False)

royale_task4_runtime = time.perf_counter() - royale_task4_runtime_start
print(f"Total Clash Royale Task 4 Spark runtime: {royale_task4_runtime:.2f} seconds")
sc.setLogLevel(previous_log_level)
Found 10 battle CSV files
Deleted old output: hdfs:///sparky/royale_task4_rarity_partials
Deleted old output: hdfs:///sparky/royale_task4_rarity_summary
Processing Clash Royale Task 4 batch 0: 1 file(s)
                                                                                
Wrote partial rarity counts for batch 0
Processing Clash Royale Task 4 batch 1: 1 file(s)
                                                                                
Wrote partial rarity counts for batch 1
Processing Clash Royale Task 4 batch 2: 1 file(s)
                                                                                
Wrote partial rarity counts for batch 2
Processing Clash Royale Task 4 batch 3: 1 file(s)
                                                                                
Wrote partial rarity counts for batch 3
Processing Clash Royale Task 4 batch 4: 1 file(s)
                                                                                
Wrote partial rarity counts for batch 4
Processing Clash Royale Task 4 batch 5: 1 file(s)
                                                                                
Wrote partial rarity counts for batch 5
Processing Clash Royale Task 4 batch 6: 1 file(s)
                                                                                
Wrote partial rarity counts for batch 6
Processing Clash Royale Task 4 batch 7: 1 file(s)
                                                                                
Wrote partial rarity counts for batch 7
Processing Clash Royale Task 4 batch 8: 1 file(s)
                                                                                
Wrote partial rarity counts for batch 8
Processing Clash Royale Task 4 batch 9: 1 file(s)
                                                                                
Wrote partial rarity counts for batch 9
                                                                                
Combined rarity/count aggregate rows: 36
                                                                                
Rarity count vs win correlation, strongest absolute correlation first
rare: correlation -0.008498, avg count 2.0598, win rate 50.00%, observations 75946774
epic: correlation 0.008474, avg count 2.2634, win rate 50.00%, observations 75946774
common: correlation -0.004201, avg count 2.0377, win rate 50.00%, observations 75946774
legendary: correlation 0.003590, avg count 1.6391, win rate 50.00%, observations 75946774

Common count win rates
  0: win rate 49.69%, wins 4364117, games 8783445
  1: win rate 50.20%, wins 9838091, games 19595919
  2: win rate 50.46%, wins 10546771, games 20901584
  3: win rate 49.95%, wins 8299707, games 16615318
  4: win rate 48.63%, wins 3557320, games 7315818
  5: win rate 50.22%, wins 1131604, games 2253514
  6: win rate 50.02%, wins 202056, games 403934
  7: win rate 49.39%, wins 29647, games 60026
  8: win rate 23.66%, wins 4074, games 17216

Rare count win rates
  0: win rate 50.14%, wins 3770799, games 7520151
  1: win rate 50.38%, wins 10058467, games 19966130
  2: win rate 50.41%, wins 11308236, games 22430635
  3: win rate 49.38%, wins 7670131, games 15533119
  4: win rate 49.59%, wins 3920614, games 7906260
  5: win rate 47.82%, wins 1049326, games 2194467
  6: win rate 49.48%, wins 179982, games 363765
  7: win rate 49.52%, wins 14968, games 30228
  8: win rate 42.79%, wins 864, games 2019

Epic count win rates
  0: win rate 48.37%, wins 2973996, games 6148803
  1: win rate 49.59%, wins 9077034, games 18303394
  2: win rate 50.34%, wins 10873635, games 21600199
  3: win rate 50.37%, wins 8064627, games 16011484
  4: win rate 50.32%, wins 4381865, games 8708699
  5: win rate 50.08%, wins 1865407, games 3724848
  6: win rate 50.89%, wins 642689, games 1262868
  7: win rate 50.53%, wins 83501, games 165246
  8: win rate 50.08%, wins 10633, games 21233

Legendary count win rates
  0: win rate 48.59%, wins 6635081, games 13655748
  1: win rate 50.46%, wins 12560681, games 24894109
  2: win rate 50.48%, wins 10861760, games 21516648
  3: win rate 50.07%, wins 4891953, games 9770413
  4: win rate 49.87%, wins 2081498, games 4174210
  5: win rate 48.95%, wins 661477, games 1351454
  6: win rate 48.53%, wins 185665, games 382557
  7: win rate 47.86%, wins 57401, games 119925
  8: win rate 46.35%, wins 37871, games 81710

Total Clash Royale Task 4 Spark runtime: 64.47 seconds

Interpretation¶

There is no strong evidence that having more cards of a rarity leads to more wins.

Having a lot of legendaries doesn't seem like a good idea. Though from the data, I think having more epics than anything else might be a good idea. So having some epics and rares with 1-2 commons and legendaries might be a good starting point for a deck.

Task 5: Card Category Count vs Wins¶

In this task, I want to see if having a particular category of cards is correlated with wins.

There are different categories:

  • Troops
  • Structures
  • Spells
In [41]:
from pyspark import StorageLevel
from pyspark.sql import functions as F
import time

ROYALE_BATTLES_GLOB = "hdfs:///royale/*/*.csv"
ROYALE_TASK5_PARTIAL_OUT = "hdfs:///sparky/royale_task5_category_partials"
ROYALE_TASK5_SUMMARY_OUT = "hdfs:///sparky/royale_task5_category_summary"
ROYALE_TASK5_BATCH_SIZE = 1

spark.conf.set("spark.sql.maxPlanStringLength", "4096")

previous_log_level = sc.getLogLevel() if hasattr(sc, "getLogLevel") else "WARN"
sc.setLogLevel("ERROR")
royale_task5_runtime_start = time.perf_counter()

category_names = ["troop", "structure", "spell"]
winner_category_cols = [f"winner.{category}.count" for category in category_names]
loser_category_cols = [f"loser.{category}.count" for category in category_names]


def clear_hdfs_path_task5(path):
    hadoop_path = spark._jvm.org.apache.hadoop.fs.Path(path)
    fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
    if fs.exists(hadoop_path):
        fs.delete(hadoop_path, True)
        print(f"Deleted old output: {path}")


def hdfs_glob_paths_task5(glob_path):
    fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
    statuses = fs.globStatus(spark._jvm.org.apache.hadoop.fs.Path(glob_path))
    if statuses is None:
        return []
    return sorted(str(status.getPath()) for status in statuses)


def category_observation_struct(column_name, category, won):
    return F.struct(
        F.lit(category).alias("category"),
        F.col(f"`{column_name}`").cast("int").alias("category_count"),
        F.lit(won).cast("long").alias("wins"),
        F.lit(1).cast("long").alias("games"),
    )


def build_task5_category_counts(df):
    winner_observations = [
        category_observation_struct(f"winner.{category}.count", category, 1)
        for category in category_names
    ]
    loser_observations = [
        category_observation_struct(f"loser.{category}.count", category, 0)
        for category in category_names
    ]

    return (
        df
        .select(F.explode(F.array(*(winner_observations + loser_observations))).alias("obs"))
        .select("obs.category", "obs.category_count", "obs.wins", "obs.games")
        .where(F.col("category_count").isNotNull())
        .groupBy("category", "category_count")
        .agg(
            F.sum("wins").alias("wins"),
            F.sum("games").alias("games"),
        )
    )


battle_paths = hdfs_glob_paths_task5(ROYALE_BATTLES_GLOB)
if len(battle_paths) == 0:
    raise ValueError(f"No battle CSV files matched: {ROYALE_BATTLES_GLOB}")

print(f"Found {len(battle_paths)} battle CSV files")
clear_hdfs_path_task5(ROYALE_TASK5_PARTIAL_OUT)
clear_hdfs_path_task5(ROYALE_TASK5_SUMMARY_OUT)

expected_task5_cols = winner_category_cols + loser_category_cols

for batch_id, start in enumerate(range(0, len(battle_paths), ROYALE_TASK5_BATCH_SIZE)):
    batch_paths = battle_paths[start:start + ROYALE_TASK5_BATCH_SIZE]
    print(f"Processing Clash Royale Task 5 batch {batch_id}: {len(batch_paths)} file(s)")

    batch_df = (
        spark.read
        .option("header", "true")
        .option("multiLine", "false")
        .csv(batch_paths)
    )

    missing_cols = [col for col in expected_task5_cols if col not in batch_df.columns]
    if missing_cols:
        raise ValueError(f"Missing expected battle columns in batch {batch_id}: {missing_cols}")

    partial_category_counts = build_task5_category_counts(batch_df)
    (
        partial_category_counts
        .write
        .mode("overwrite")
        .parquet(f"{ROYALE_TASK5_PARTIAL_OUT}/batch_{batch_id:04d}")
    )

    print(f"Wrote partial category counts for batch {batch_id}")

combined_category_counts = (
    spark.read
    .parquet(f"{ROYALE_TASK5_PARTIAL_OUT}/batch_*")
    .groupBy("category", "category_count")
    .agg(
        F.sum("wins").alias("wins"),
        F.sum("games").alias("games"),
    )
    .withColumn("win_rate", F.col("wins") / F.col("games"))
)

(
    combined_category_counts
    .write
    .mode("overwrite")
    .parquet(f"{ROYALE_TASK5_SUMMARY_OUT}/win_rates_by_count")
)

category_counts = spark.read.parquet(f"{ROYALE_TASK5_SUMMARY_OUT}/win_rates_by_count").persist(StorageLevel.DISK_ONLY)
category_count_rows = category_counts.count()
print(f"Combined category/count aggregate rows: {category_count_rows}")

correlation_summary = (
    category_counts
    .withColumn("x", F.col("category_count").cast("double"))
    .withColumn("n", F.col("games").cast("double"))
    .withColumn("sum_x", F.col("x") * F.col("n"))
    .withColumn("sum_y", F.col("wins").cast("double"))
    .withColumn("sum_xy", F.col("x") * F.col("wins").cast("double"))
    .withColumn("sum_x2", F.col("x") * F.col("x") * F.col("n"))
    .withColumn("sum_y2", F.col("wins").cast("double"))
    .groupBy("category")
    .agg(
        F.sum("n").alias("observations"),
        F.sum("sum_x").alias("sum_x"),
        F.sum("sum_y").alias("wins"),
        F.sum("sum_xy").alias("sum_xy"),
        F.sum("sum_x2").alias("sum_x2"),
        F.sum("sum_y2").alias("sum_y2"),
    )
    .withColumn("avg_category_count", F.col("sum_x") / F.col("observations"))
    .withColumn("win_rate", F.col("wins") / F.col("observations"))
    .withColumn(
        "pearson_correlation",
        (
            (F.col("observations") * F.col("sum_xy")) - (F.col("sum_x") * F.col("wins"))
        ) / F.sqrt(
            ((F.col("observations") * F.col("sum_x2")) - (F.col("sum_x") * F.col("sum_x")))
            * ((F.col("observations") * F.col("sum_y2")) - (F.col("wins") * F.col("wins")))
        )
    )
    .select(
        "category",
        F.col("observations").cast("long").alias("observations"),
        F.round("avg_category_count", 4).alias("avg_category_count"),
        F.round("win_rate", 6).alias("win_rate"),
        F.round("pearson_correlation", 6).alias("pearson_correlation"),
        F.round(F.abs("pearson_correlation"), 6).alias("abs_correlation"),
    )
)

(
    correlation_summary
    .write
    .mode("overwrite")
    .parquet(f"{ROYALE_TASK5_SUMMARY_OUT}/correlations")
)

correlation_rows = (
    spark.read
    .parquet(f"{ROYALE_TASK5_SUMMARY_OUT}/correlations")
    .orderBy(F.desc("abs_correlation"), "category")
    .collect()
)

print("Category count vs win correlation, strongest absolute correlation first")
for row in correlation_rows:
    print(
        f"{row['category']}: correlation {row['pearson_correlation']:.6f}, "
        f"avg count {row['avg_category_count']:.4f}, "
        f"win rate {row['win_rate']:.2%}, "
        f"observations {row['observations']}"
    )

print()
for category in category_names:
    rows = (
        category_counts
        .where(F.col("category") == category)
        .orderBy("category_count")
        .collect()
    )
    print(f"{category.title()} count win rates")
    for row in rows:
        print(
            f"  {row['category_count']}: "
            f"win rate {row['win_rate']:.2%}, "
            f"wins {row['wins']}, "
            f"games {row['games']}"
        )
    print()

category_counts.unpersist(blocking=False)

royale_task5_runtime = time.perf_counter() - royale_task5_runtime_start
print(f"Total Clash Royale Task 5 Spark runtime: {royale_task5_runtime:.2f} seconds")
sc.setLogLevel(previous_log_level)
Found 10 battle CSV files
Deleted old output: hdfs:///sparky/royale_task5_category_partials
Deleted old output: hdfs:///sparky/royale_task5_category_summary
Processing Clash Royale Task 5 batch 0: 1 file(s)
                                                                                
Wrote partial category counts for batch 0
Processing Clash Royale Task 5 batch 1: 1 file(s)
                                                                                
Wrote partial category counts for batch 1
Processing Clash Royale Task 5 batch 2: 1 file(s)
                                                                                
Wrote partial category counts for batch 2
Processing Clash Royale Task 5 batch 3: 1 file(s)
                                                                                
Wrote partial category counts for batch 3
Processing Clash Royale Task 5 batch 4: 1 file(s)
                                                                                
Wrote partial category counts for batch 4
Processing Clash Royale Task 5 batch 5: 1 file(s)
                                                                                
Wrote partial category counts for batch 5
Processing Clash Royale Task 5 batch 6: 1 file(s)
                                                                                
Wrote partial category counts for batch 6
Processing Clash Royale Task 5 batch 7: 1 file(s)
                                                                                
Wrote partial category counts for batch 7
Processing Clash Royale Task 5 batch 8: 1 file(s)
                                                                                
Wrote partial category counts for batch 8
Processing Clash Royale Task 5 batch 9: 1 file(s)
                                                                                
Wrote partial category counts for batch 9
                                                                                
Combined category/count aggregate rows: 27
Category count vs win correlation, strongest absolute correlation first
structure: correlation -0.010709, avg count 0.4233, win rate 50.00%, observations 75946774
spell: correlation 0.006955, avg count 1.8873, win rate 50.00%, observations 75946774
troop: correlation 0.000136, avg count 5.6894, win rate 50.00%, observations 75946774

Troop count win rates
  0: win rate 6.99%, wins 2372, games 33939
  1: win rate 23.39%, wins 4848, games 20725
  2: win rate 43.49%, wins 41412, games 95212
  3: win rate 49.69%, wins 811090, games 1632248
  4: win rate 50.23%, wins 4311608, games 8584163
  5: win rate 50.21%, wins 10677092, games 21263647
  6: win rate 50.01%, wins 13480285, games 26954968
  7: win rate 49.95%, wins 7141065, games 14297157
  8: win rate 49.06%, wins 1503615, games 3064715

Structure count win rates
  0: win rate 50.14%, wins 23703609, games 47274637
  1: win rate 50.22%, wins 12842223, games 25571620
  2: win rate 46.21%, wins 1315216, games 2846166
  3: win rate 48.26%, wins 96567, games 200115
  4: win rate 37.66%, wins 12020, games 31918
  5: win rate 37.32%, wins 2121, games 5683
  6: win rate 31.93%, wins 718, games 2249
  7: win rate 25.56%, wins 340, games 1330
  8: win rate 4.39%, wins 573, games 13056

Spell count win rates
  0: win rate 46.91%, wins 2015971, games 4297746
  1: win rate 49.87%, wins 9757285, games 19565577
  2: win rate 50.33%, wins 17476103, games 34723280
  3: win rate 50.56%, wins 7741046, games 15311555
  4: win rate 49.07%, wins 947716, games 1931356
  5: win rate 40.26%, wins 32517, games 80771
  6: win rate 16.70%, wins 1835, games 10989
  7: win rate 5.43%, wins 536, games 9866
  8: win rate 2.42%, wins 378, games 15634

Total Clash Royale Task 5 Spark runtime: 57.97 seconds

Interpretation¶

According to Pearson Correlation, there is no significant effect of different categories' participation in a deck.

Structure count had the strongest relationship, but even that was negative and minimal.

Based on the count vs win rates data:

  • Decks with a lot of spells or structures perform very poorly. However, looking at the number of games played, it might be that they were challenge games played by youtubers. (I PLAYED ALL SPELLS AND WON _)
  • Reflecting the previous point, decks with no troops perform extremely poorly.

Conclusion¶

The best 2 decks we saw were:

  1. Elo 859.35, games 115: P.E.K.K.A, Valkyrie, Mega Minion, Goblin Hut, Furnace, Fireball, Arrows, Zap
  2. Elo 859.16, games 721: Goblins, Valkyrie, Musketeer, Hog Rider, Inferno Tower, Fireball, Arrows, Zap

To make my point clearer, I name: deck1 -> Larry deck2 -> Barry

Now, here are the stats for Larry:

#Common #Rare #Epic #Legendary #Structures #Troops #Spells
2 5 1 0 2 3 3

Now Barry:

#Common #Rare #Epic #Legendary #Structures #Troops #Spells
3 5 0 0 1 4 3

Interestingly, Barry doesn't have any epic cards which only has an overall winrate of $48.37\%$ It doesn't seem too bad, but there are other combinations for epic cards that touch winrates of $50.89\%$ which is about $2.5\%$ better.

What I would do differently¶

If the goal wasn't to explore spark's distributed analysis capabilities, I would have employed a traditional Elo system instead of an aggregated one. The problem with traditional method is that it has to be chronological, which isn't very spark friendly.