McNeil River Guided Lottery Strategy¶
Source: https://www.adfg.alaska.gov/index.cfm?adfg=viewingpermits.mcneil_guided
from __future__ import annotations
from datetime import date, datetime, timezone
from pathlib import Path
import re
import subprocess
import matplotlib.pyplot as plt
import polars as pl
def find_project_root(start: Path) -> Path:
for candidate in [start, *start.parents]:
if (candidate / "pyproject.toml").exists():
return candidate
return start
PROJECT_ROOT = find_project_root(Path.cwd().resolve())
URL = "https://www.adfg.alaska.gov/index.cfm?adfg=viewingpermits.mcneil_guided"
GROUP_SIZE = 3
CHOICES_PER_PERSON = 4
SEASON_START_MONTH_DAY = (6, 7)
SEASON_END_MONTH_DAY = (8, 25)
CACHE_PATH = PROJECT_ROOT / "data" / "mcneil_guided_daily.csv"
CACHE_META_PATH = PROJECT_ROOT / "data" / "mcneil_guided_daily.meta.txt"
pl.Config.set_tbl_rows(40)
pl.Config.set_tbl_cols(30)
def fetch_html_via_curl(url: str) -> str:
"""Use curl because direct HTTP clients are intermittently blocked by the site WAF."""
return subprocess.check_output(["curl", "-sL", url], text=True)
def clean_text(value: str) -> str:
return " ".join(value.replace("\xa0", " " ).split())
def to_int(value: str) -> int | None:
match = re.search(r"-?\d+", value)
return int(match.group()) if match else None
def to_float(value: str) -> float | None:
match = re.search(r"-?\d+(?:\.\d+)?", value)
return float(match.group()) if match else None
def parse_guided_daily_from_html(html: str) -> pl.DataFrame:
from bs4 import BeautifulSoup
soup = BeautifulSoup(html, "html.parser")
page_text = " ".join(soup.get_text(" ", strip=True).split())
year_match = re.search(r"(20\d{2})\s+High Tide Time", page_text)
if not year_match:
raise ValueError("Could not infer schedule year from ADFG page.")
year = int(year_match.group(1))
season_start = date(year, *SEASON_START_MONTH_DAY)
season_end = date(year, *SEASON_END_MONTH_DAY)
schedule_table = None
for table in soup.find_all("table"):
text = " ".join(table.get_text(" ", strip=True).split())
if "Time Block Letter" in text and "% winning 5-Yr. Average by time block" in text:
schedule_table = table
break
if schedule_table is None:
raise ValueError("Could not locate the guided schedule table on the page.")
rows: list[dict] = []
active: dict = {}
for tr in schedule_table.select("tbody tr"):
cells = tr.find_all("td")
if not cells:
continue
texts = [clean_text(td.get_text(" ", strip=True)) for td in cells]
first_cell = texts[0]
if re.fullmatch(r"[A-Z]", first_cell):
if len(cells) < 12:
continue
active = {
"block": first_cell,
"permits_available": to_int(texts[4]),
"draw_pct": to_float(texts[5]),
"tide_flats_activity": texts[6] or None,
"sedge_flats_activity": texts[7] or None,
"mikfik_activity": texts[8] or None,
"falls_activity": texts[9] or None,
"lagoon_activity": texts[10] or None,
}
date_text, time_cell, height_cell, bears_text = texts[1], cells[2], cells[3], texts[11]
else:
if not active or len(cells) < 4:
continue
date_text, time_cell, height_cell, bears_text = texts[0], cells[1], cells[2], texts[3]
if not re.fullmatch(r"\d{1,2}-[A-Za-z]{3}", date_text):
continue
obs_date = datetime.strptime(f"{date_text}-{year}", "%d-%b-%Y").date()
if not (season_start <= obs_date <= season_end):
continue
rows.append(
{
**active,
"date": obs_date,
"avg_bears_day": to_float(bears_text),
"access_risk_day": ("tg-uc3z" in (time_cell.get("class") or []))
or ("tg-uc3z" in (height_cell.get("class") or [])),
}
)
if not rows:
raise ValueError("No guided rows were parsed from the schedule table.")
return pl.DataFrame(rows).sort(["block", "date"])
def load_mcneil_guided_daily(
url: str = URL,
cache_path: Path = CACHE_PATH,
cache_meta_path: Path = CACHE_META_PATH,
refresh_cache: bool = False,
) -> pl.DataFrame:
if cache_path.exists() and not refresh_cache:
return pl.read_csv(
cache_path,
try_parse_dates=True,
schema_overrides={"access_risk_day": pl.Boolean},
)
html = fetch_html_via_curl(url)
daily_df = parse_guided_daily_from_html(html)
cache_path.parent.mkdir(parents=True, exist_ok=True)
daily_df.write_csv(cache_path)
cache_meta_path.parent.mkdir(parents=True, exist_ok=True)
cache_meta_path.write_text(
"\n".join(
[
f"fetched_utc={datetime.now(timezone.utc).isoformat()}",
f"source_url={url}",
f"rows={daily_df.height}",
]
)
)
return daily_df
def summarize_by_block(daily_df: pl.DataFrame, group_size: int = GROUP_SIZE) -> pl.DataFrame:
return (
daily_df.group_by("block")
.agg(
[
pl.col("date").min().alias("start_date"),
pl.col("date").max().alias("end_date"),
pl.first("permits_available").alias("permits_available"),
pl.first("win_pct").alias("draw_pct"),
pl.col("avg_bears_day").mean().round(2).alias("avg_bears_day"),
pl.col("avg_bears_day").min().alias("min_bears_day"),
pl.col("avg_bears_day").max().alias("max_bears_day"),
pl.col("access_risk_day").cast(pl.UInt8).max().alias("has_access_risk_day"),
pl.first("tide_flats_activity").alias("tide_flats_activity"),
pl.first("sedge_flats_activity").alias("sedge_flats_activity"),
pl.first("mikfik_activity").alias("mikfik_activity"),
pl.first("falls_activity").alias("falls_activity"),
pl.first("lagoon_activity").alias("lagoon_activity"),
]
)
.sort("start_date")
.with_columns([(pl.col("draw_pct") / 100).alias("p_single")])
.with_columns(
[
(1 - (1 - pl.col("p_single")) ** group_size).alias("p_at_least_1_of_3"),
(
3 * pl.col("p_single") ** 2 * (1 - pl.col("p_single"))
+ pl.col("p_single") ** 3
).alias("p_at_least_2_of_3"),
(pl.col("p_single") ** group_size).alias("p_all_3_of_3"),
pl.concat_str(
[
pl.col("start_date").dt.strftime("%b %-d"),
pl.lit(" - "),
pl.col("end_date").dt.strftime("%b %-d"),
]
).alias("window"),
]
)
)
def rank_windows(
block_df: pl.DataFrame,
probability_col: str,
    n_choices: int = CHOICES_PER_PERSON,
) -> pl.DataFrame:
return (
block_df.with_columns(
[
(pl.col(probability_col) * pl.col("avg_bears_day")).alias("expected_bears_unadjusted")
]
)
.with_columns(
[
pl.col("expected_bears_unadjusted").alias("score")
]
)
.sort("score", descending=True)
.select(
[
"block",
"window",
"permits_available",
"draw_pct",
"avg_bears_day",
"has_access_risk_day",
probability_col,
"expected_bears_unadjusted",
"score",
]
)
.head(n_choices)
)
def build_shared_choice_set(
block_df: pl.DataFrame,
probability_col: str,
choices_per_person: int = CHOICES_PER_PERSON,
) -> pl.DataFrame:
shared_choices = rank_windows(
block_df,
probability_col=probability_col,
n_choices=choices_per_person,
).get_column("block").to_list()
return pl.DataFrame(
[
{
**{f"choice_{j + 1}": pick for j, pick in enumerate(shared_choices)},
}
]
)
# Set refresh_cache=True when you want to pull the latest table from ADFG.
daily_df = load_mcneil_guided_daily(refresh_cache=False)
block_df = summarize_by_block(daily_df)
print(f"Daily rows: {daily_df.height}")
print(f"Time blocks: {block_df.height}")
if CACHE_META_PATH.exists():
print(CACHE_META_PATH.read_text())
block_df.select(["block", "window", "permits_available", "draw_pct", "avg_bears_day", "has_access_risk_day"])
Daily rows: 80
Time blocks: 20
fetched_utc=2026-02-13T08:51:47.145920+00:00
source_url=https://www.adfg.alaska.gov/index.cfm?adfg=viewingpermits.mcneil_guided
rows=80
| block | window | permits_available | draw_pct | avg_bears_day | has_access_risk_day |
|---|---|---|---|---|---|
| str | str | i64 | f64 | f64 | u8 |
| "A" | "Jun 7 - Jun 10" | 10 | 15.0 | 14.0 | 1 |
| "B" | "Jun 11 - Jun 14" | 10 | 10.0 | 16.25 | 1 |
| "C" | "Jun 15 - Jun 18" | 7 | 7.0 | 17.25 | 0 |
| "D" | "Jun 19 - Jun 22" | 10 | 6.0 | 21.0 | 1 |
| "E" | "Jun 23 - Jun 26" | 7 | 5.0 | 23.75 | 1 |
| "F" | "Jun 27 - Jun 30" | 10 | 5.0 | 28.25 | 1 |
| "G" | "Jul 1 - Jul 4" | 10 | 3.0 | 37.75 | 0 |
| "H" | "Jul 5 - Jul 8" | 10 | 3.0 | 47.25 | 1 |
| "I" | "Jul 9 - Jul 12" | 10 | 3.0 | 49.75 | 1 |
| "J" | "Jul 13 - Jul 16" | 7 | 2.0 | 50.0 | 0 |
| "K" | "Jul 17 - Jul 20" | 10 | 3.0 | 51.0 | 0 |
| "L" | "Jul 21 - Jul 24" | 10 | 3.0 | 38.5 | 1 |
| "M" | "Jul 25 - Jul 28" | 7 | 2.0 | 29.0 | 1 |
| "N" | "Jul 29 - Aug 1" | 10 | 4.0 | 26.75 | 0 |
| "O" | "Aug 2 - Aug 5" | 10 | 4.0 | 21.75 | 0 |
| "P" | "Aug 6 - Aug 9" | 7 | 4.0 | 20.25 | 1 |
| "Q" | "Aug 10 - Aug 13" | 10 | 8.0 | 18.25 | 0 |
| "R" | "Aug 14 - Aug 17" | 10 | 11.0 | 16.0 | 0 |
| "S" | "Aug 18 - Aug 21" | 10 | 13.0 | 11.75 | 1 |
| "T" | "Aug 22 - Aug 25" | 10 | 18.0 | 9.75 | 1 |
Core Tradeoff¶
Draw odds vs. bear volume: the windows with the best draw odds sit at the edges of the season, where average bear counts are lowest, while the July windows with the most bears carry the longest odds.
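One way to quantify this tension is the correlation between listed draw odds and average bear counts across the 20 windows. The sketch below assumes the block_df built above; it is an illustrative check, not part of the original analysis.

# Sketch: quantify the odds-vs-bears tradeoff across windows (assumes block_df above).
tradeoff_r = block_df.select(pl.corr("draw_pct", "avg_bears_day")).item()
print(f"Pearson r between draw % and avg bears/day: {tradeoff_r:.2f}")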
Seasonality: Bears Over Time¶
# Seasonal pattern: average bears/day by window over time.
season_df = block_df.select(["block", "start_date", "window", "avg_bears_day", "draw_pct"]).sort("start_date")
dates = season_df.get_column("start_date").to_list()
bears = season_df.get_column("avg_bears_day").to_list()
draw_pct = season_df.get_column("draw_pct").to_list()
blocks = season_df.get_column("block").to_list()
import matplotlib.dates as mdates
fig, ax = plt.subplots(figsize=(14, 6))
ax2 = ax.twinx()
bears_line = ax.plot(
dates,
bears,
marker="o",
linewidth=2,
color="#2a9d8f",
label="Average bears/day",
)
win_line = ax2.plot(
dates,
draw_pct,
marker="s",
linewidth=2,
linestyle="--",
color="#e76f51",
label="Win %",
)
for d, b, block in zip(dates, bears, blocks):
ax.annotate(block, (d, b), textcoords="offset points", xytext=(4, 4), fontsize=8)
ax.set_title("Average Bears per Day and Win % by Window Over Time")
ax.set_xlabel("Window start date")
ax.set_ylabel("Average bears/day", color="#2a9d8f")
ax2.set_ylabel("Win %", color="#e76f51")
# Weekly labels plus daily divider lines for easy counting.
ax.xaxis.set_major_locator(mdates.WeekdayLocator(interval=1))
ax.xaxis.set_major_formatter(mdates.DateFormatter("%b %d"))
ax.xaxis.set_minor_locator(mdates.DayLocator(interval=1))
ax.grid(which="minor", axis="x", linestyle="-", linewidth=0.6, alpha=0.18)
ax.grid(which="major", axis="y", linestyle="-", linewidth=0.8, alpha=0.25)
# Shared legend across both axes.
lines = bears_line + win_line
labels = [l.get_label() for l in lines]
ax.legend(lines, labels, loc="upper left")
fig.autofmt_xdate()
plt.tight_layout()
plt.show()
tradeoff_df = block_df.select(["block", "window", "draw_pct", "avg_bears_day", "has_access_risk_day"])
print("\nTop windows by individual draw %:")
display(tradeoff_df.sort("draw_pct", descending=True).head(6))
print("\nTop windows by average bears/day:")
display(tradeoff_df.sort("avg_bears_day", descending=True).head(6))
Top windows by individual draw %:
| block | window | draw_pct | avg_bears_day | has_access_risk_day |
|---|---|---|---|---|
| str | str | f64 | f64 | u8 |
| "T" | "Aug 22 - Aug 25" | 18.0 | 9.75 | 1 |
| "A" | "Jun 7 - Jun 10" | 15.0 | 14.0 | 1 |
| "S" | "Aug 18 - Aug 21" | 13.0 | 11.75 | 1 |
| "R" | "Aug 14 - Aug 17" | 11.0 | 16.0 | 0 |
| "B" | "Jun 11 - Jun 14" | 10.0 | 16.25 | 1 |
| "Q" | "Aug 10 - Aug 13" | 8.0 | 18.25 | 0 |
Top windows by average bears/day:
| block | window | draw_pct | avg_bears_day | has_access_risk_day |
|---|---|---|---|---|
| str | str | f64 | f64 | u8 |
| "K" | "Jul 17 - Jul 20" | 3.0 | 51.0 | 0 |
| "J" | "Jul 13 - Jul 16" | 2.0 | 50.0 | 0 |
| "I" | "Jul 9 - Jul 12" | 3.0 | 49.75 | 1 |
| "H" | "Jul 5 - Jul 8" | 3.0 | 47.25 | 1 |
| "L" | "Jul 21 - Jul 24" | 3.0 | 38.5 | 1 |
| "G" | "Jul 1 - Jul 4" | 3.0 | 37.75 | 0 |
# Visual tradeoff: odds vs bears, with label by block.
plot_df = block_df.select(["block", "draw_pct", "avg_bears_day"]).sort("block")
x = plot_df.get_column("draw_pct").to_list()
y = plot_df.get_column("avg_bears_day").to_list()
labels = plot_df.get_column("block").to_list()
fig, ax = plt.subplots(figsize=(14, 7))
sc = ax.scatter(x, y, s=90)
for xi, yi, label in zip(x, y, labels):
ax.annotate(label, (xi, yi), textcoords="offset points", xytext=(4, 4), fontsize=9)
ax.set_title("Windows: Draw % vs Average Bears/Day")
ax.set_xlabel("ADFG 5-year average draw percentage (individual)")
ax.set_xlim(left=0)  # Odds top out under 20%, so let the right edge autoscale.
ax.set_ylabel("Average bears/day viewed")
ax.grid(alpha=0.2)
plt.show()
Lottery Uncertainty¶
Your group rule is strict: the trip only happens if all 3 can go. The uncertainty is how the lottery mechanics translate into a group success probability per window.
For each window, the notebook defines:
- p_single: listed individual draw probability (draw_pct / 100)
- g: group size (GROUP_SIZE = 3)
- c: Scenario 2 capacity factor (CAPACITY_FACTOR, default 0.85)
We evaluate three plausible lottery-mechanics scenarios using a per-window success probability q_block:
- Scenario 1 (Party-draw model): q_block = p_single
- Scenario 2 (One-hit + capacity model): q_block = c * (1 - (1 - p_single)^g)
- Scenario 3 (Independent triple-hit model): q_block = p_single^g
Interpretation of Scenario 2 (c):
- c = 1.0: whenever at least one group member gets a hit, capacity is always sufficient for all 3.
- c < 1.0: only a fraction of those one-hit events convert to a full 3-person trip.
Across your ordered 4-window list, outcomes are modeled as a first-success process:
- p_trip = Σ_k [q_k * Π_{j<k}(1 - q_j)]
- xBears = Σ_k [avg_bears_k * q_k * Π_{j<k}(1 - q_j)]
In the objective sections below, we optimize separately under each scenario so you can see how strategy changes.
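Before handing these formulas to the optimizer, the first-success arithmetic can be checked by hand. The sketch below uses hypothetical q and bears values (not computed from the table) and mirrors the two sums above; it is a verification aid, not part of the original notebook.

# Sketch: first-success process on hypothetical per-window success probabilities.
# q_demo and bears_demo are illustrative only, not drawn from block_df.
q_demo = [0.03, 0.05, 0.10, 0.15]
bears_demo = [51.0, 49.75, 16.0, 14.0]
p_no_prev, p_trip, x_bears = 1.0, 0.0, 0.0
for q, bears in zip(q_demo, bears_demo):
    p_this = p_no_prev * q   # first success lands on this window
    p_trip += p_this
    x_bears += p_this * bears
    p_no_prev *= (1 - q)     # survive (lose) through to the next-ranked window
print(f"p_trip={p_trip:.4f}, xBears={x_bears:.3f}")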
from itertools import permutations
# Tunable policy knobs for discussion.
CAPACITY_FACTOR = 0.85 # Used only in Scenario 2.
QUALITY_MIN_TRIP_PROB = 0.01 # 1% minimum trip chance floor for quality-first objective.
assumption_specs = [
{
"name": "Scenario 1: Party-draw model",
"q_expr": pl.col("p_single"),
},
{
"name": "Scenario 2: One-hit + capacity model",
"q_expr": (1 - (1 - pl.col("p_single")) ** GROUP_SIZE) * CAPACITY_FACTOR,
},
{
"name": "Scenario 3: Independent triple-hit model",
"q_expr": pl.col("p_single") ** GROUP_SIZE,
},
]
all_blocks = block_df.get_column("block").to_list()
bears_map = dict(zip(block_df.get_column("block").to_list(), block_df.get_column("avg_bears_day").to_list()))
assumption_q_maps: dict[str, dict[str, float]] = {}
for spec in assumption_specs:
tmp = block_df.select(
[
pl.col("block"),
spec["q_expr"].clip(0.0, 1.0).alias("q"),
]
)
assumption_q_maps[spec["name"]] = dict(zip(tmp.get_column("block").to_list(), tmp.get_column("q").to_list()))
def evaluate_order_metrics(
order: tuple[str, ...],
q_map: dict[str, float],
bears_map: dict[str, float],
) -> dict:
p_no_prev = 1.0
p_trip = 0.0
expected_bears = 0.0
for block in order:
q = q_map[block]
bears = bears_map[block]
p_this = p_no_prev * q
p_trip += p_this
expected_bears += p_this * bears
p_no_prev *= (1 - q)
expected_bears_if_trip = expected_bears / p_trip if p_trip > 0 else 0.0
return {
"p_trip": p_trip,
"expected_bears": expected_bears,
"expected_bears_if_trip": expected_bears_if_trip,
}
def optimize_order_for_assumption(
    objective: str,
    q_map: dict[str, float],
    quality_min_trip_prob: float = QUALITY_MIN_TRIP_PROB,
) -> tuple[tuple[str, ...], dict, bool]:
    best_order = None
    best_metrics = None
    best_score = float("-inf")
    for order in permutations(all_blocks, CHOICES_PER_PERSON):
        metrics = evaluate_order_metrics(order, q_map, bears_map)
        if objective == "access":
            score = metrics["p_trip"]
        elif objective == "value":
            score = metrics["expected_bears"]
        elif objective == "quality_with_floor":
            if metrics["p_trip"] < quality_min_trip_prob:
                continue
            score = metrics["expected_bears_if_trip"]
        elif objective == "quality":
            # Pure quality objective (no floor); used only as the fallback below.
            score = metrics["expected_bears_if_trip"]
        else:
            raise ValueError(f"Unknown objective: {objective}")
        if score > best_score:
            best_score = score
            best_order = order
            best_metrics = metrics
    used_floor = objective == "quality_with_floor"
    if objective == "quality_with_floor" and best_order is None:
        # The floor is infeasible under this assumption; fall back to pure quality.
        best_order, best_metrics, _ = optimize_order_for_assumption(
            objective="quality",
            q_map=q_map,
        )
        used_floor = False
    return best_order, best_metrics, used_floor
def summarize_objective_by_assumption(
objective: str,
objective_label: str,
quality_min_trip_prob: float = QUALITY_MIN_TRIP_PROB,
) -> pl.DataFrame:
rows = []
for spec in assumption_specs:
assumption_name = spec["name"]
q_map = assumption_q_maps[assumption_name]
order, metrics, used_floor = optimize_order_for_assumption(
objective=objective,
q_map=q_map,
quality_min_trip_prob=quality_min_trip_prob,
)
row = {
"assumption": assumption_name,
"objective": objective_label,
"choice_1": order[0],
"choice_2": order[1],
"choice_3": order[2],
"choice_4": order[3],
"trip_chance_pct": metrics["p_trip"] * 100,
"expected_bears_per_application": metrics["expected_bears"],
"expected_bears_if_trip_happens": metrics["expected_bears_if_trip"],
}
if objective == "quality_with_floor":
row["quality_floor_applied"] = used_floor
rows.append(row)
out = pl.DataFrame(rows).with_columns(
[
pl.col("trip_chance_pct").round(2),
pl.col("expected_bears_per_application").round(3),
pl.col("expected_bears_if_trip_happens").round(3),
]
)
return out
def optimize_robust_strategy_across_assumptions() -> tuple[tuple[str, ...], pl.DataFrame]:
best_order = None
best_min_score = float("-inf")
best_tiebreak = float("-inf")
for order in permutations(all_blocks, CHOICES_PER_PERSON):
per_assumption = []
for spec in assumption_specs:
q_map = assumption_q_maps[spec["name"]]
metrics = evaluate_order_metrics(order, q_map, bears_map)
per_assumption.append(metrics)
min_expected_bears = min(x["expected_bears"] for x in per_assumption)
min_trip_chance = min(x["p_trip"] for x in per_assumption)
if (min_expected_bears > best_min_score) or (
min_expected_bears == best_min_score and min_trip_chance > best_tiebreak
):
best_min_score = min_expected_bears
best_tiebreak = min_trip_chance
best_order = order
rows = []
for spec in assumption_specs:
q_map = assumption_q_maps[spec["name"]]
metrics = evaluate_order_metrics(best_order, q_map, bears_map)
rows.append(
{
"assumption": spec["name"],
"choice_1": best_order[0],
"choice_2": best_order[1],
"choice_3": best_order[2],
"choice_4": best_order[3],
"trip_chance_pct": metrics["p_trip"] * 100,
"expected_bears_per_application": metrics["expected_bears"],
"expected_bears_if_trip_happens": metrics["expected_bears_if_trip"],
}
)
robust_df = pl.DataFrame(rows).with_columns(
[
pl.col("trip_chance_pct").round(2),
pl.col("expected_bears_per_application").round(3),
pl.col("expected_bears_if_trip_happens").round(3),
]
)
return best_order, robust_df
Expected Bears Over Time¶
This chart shows expected bears seen per application window through the season under each lottery scenario (one series per scenario).
# Opening comparison: expected bears per window across lottery assumptions.
scenario_expected_df = block_df.select(["block", "start_date", "window", "avg_bears_day", "p_single"]).sort("start_date")
for spec in assumption_specs:
scenario_expected_df = scenario_expected_df.with_columns(
(spec["q_expr"].clip(0.0, 1.0) * pl.col("avg_bears_day")).alias(spec["name"])
)
scenario_names = [spec["name"] for spec in assumption_specs]
scenario3_name = "Scenario 3: Independent triple-hit model"
primary_names = [name for name in scenario_names if name != scenario3_name]
fig, ax_left = plt.subplots(figsize=(16, 8))
ax_right = ax_left.twinx()
dates = scenario_expected_df.get_column("start_date").to_list()
window_labels = [
f"{block} ({window})"
for block, window in zip(
scenario_expected_df.get_column("block").to_list(),
scenario_expected_df.get_column("window").to_list(),
)
]
primary_colors = ["#1f77b4", "#2ca02c"]
for i, name in enumerate(primary_names):
ax_left.plot(
dates,
scenario_expected_df.get_column(name).to_list(),
marker="o",
linewidth=2,
color=primary_colors[i % len(primary_colors)],
label=name,
)
ax_right.plot(
dates,
scenario_expected_df.get_column(scenario3_name).to_list(),
marker="s",
linewidth=2,
linestyle="--",
color="#d62728",
label=f"{scenario3_name} (right axis)",
)
ax_left.set_title("Expected Bears Seen Over Time by Lottery Scenario")
ax_left.set_xlabel("Window start date")
ax_left.set_ylabel("Expected bears per application window (Scenarios 1-2)")
ax_right.set_ylabel("Expected bears per application window (Scenario 3)")
ax_left.set_xticks(dates)
ax_left.set_xticklabels(window_labels, rotation=45, ha="right", fontsize=8)
ax_left.grid(alpha=0.25)
h1, l1 = ax_left.get_legend_handles_labels()
h2, l2 = ax_right.get_legend_handles_labels()
ax_left.legend(h1 + h2, l1 + l2, loc="upper right")
plt.tight_layout()
plt.show()
Expected Bears by Scenario and Window¶
This heatmap shows expected bears per application window (columns in date order) for each lottery scenario (rows).
# Heatmap view with a separate scale for Scenario 3 due to its lower magnitude.
if "scenario_expected_df" not in globals():
scenario_expected_df = block_df.select(["block", "start_date", "window", "avg_bears_day", "p_single"]).sort("start_date")
for spec in assumption_specs:
scenario_expected_df = scenario_expected_df.with_columns(
(spec["q_expr"].clip(0.0, 1.0) * pl.col("avg_bears_day")).alias(spec["name"])
)
scenario_names = [spec["name"] for spec in assumption_specs]
scenario3_name = "Scenario 3: Independent triple-hit model"
primary_names = [name for name in scenario_names if name != scenario3_name]
window_labels = [
f"{block} ({window})"
for block, window in zip(
scenario_expected_df.get_column("block").to_list(),
scenario_expected_df.get_column("window").to_list(),
)
]
primary_values = [scenario_expected_df.get_column(name).to_list() for name in primary_names]
scenario3_values = [scenario_expected_df.get_column(scenario3_name).to_list()]
fig, (ax_top, ax_bottom) = plt.subplots(
2,
1,
figsize=(max(14, len(window_labels) * 0.7), 7.5),
gridspec_kw={"height_ratios": [2, 1]},
)
im_top = ax_top.imshow(primary_values, aspect="auto", cmap="YlGn", interpolation="nearest")
ax_top.set_title("Expected Bears Heatmap (Scenarios 1-2, shared scale)")
ax_top.set_ylabel("Lottery scenario")
ax_top.set_yticks(range(len(primary_names)))
ax_top.set_yticklabels(primary_names)
ax_top.set_xticks(range(len(window_labels)))
ax_top.set_xticklabels([])
threshold_top = max(max(row) for row in primary_values) * 0.55
for r, row in enumerate(primary_values):
for c, value in enumerate(row):
ax_top.text(
c,
r,
f"{value:.2f}",
ha="center",
va="center",
fontsize=7,
color="white" if value >= threshold_top else "black",
)
cbar_top = fig.colorbar(im_top, ax=ax_top)
cbar_top.set_label("Expected bears per application window (Scenarios 1-2)")
im_bottom = ax_bottom.imshow(scenario3_values, aspect="auto", cmap="YlOrRd", interpolation="nearest")
ax_bottom.set_title("Expected Bears Heatmap (Scenario 3, separate scale)")
ax_bottom.set_xlabel("Window (chronological)")
ax_bottom.set_ylabel("Lottery scenario")
ax_bottom.set_yticks([0])
ax_bottom.set_yticklabels([scenario3_name])
ax_bottom.set_xticks(range(len(window_labels)))
ax_bottom.set_xticklabels(window_labels, rotation=45, ha="right", fontsize=8)
threshold_bottom = max(max(row) for row in scenario3_values) * 0.55
for c, value in enumerate(scenario3_values[0]):
ax_bottom.text(
c,
0,
f"{value:.3f}",
ha="center",
va="center",
fontsize=7,
color="white" if value >= threshold_bottom else "black",
)
cbar_bottom = fig.colorbar(im_bottom, ax=ax_bottom)
cbar_bottom.set_label("Expected bears per application window (Scenario 3)")
plt.tight_layout()
plt.show()
# Precompute objective outputs once for semantic summary and appendix.
objective_1_results = summarize_objective_by_assumption(
objective="access",
objective_label="Maximize Trip Chance",
)
objective_2_results = summarize_objective_by_assumption(
objective="value",
objective_label="Maximize Overall Bear Value",
)
objective_3_results = summarize_objective_by_assumption(
objective="quality_with_floor",
objective_label="Maximize Quality with Trip-Chance Floor",
quality_min_trip_prob=QUALITY_MIN_TRIP_PROB,
)
robust_order, objective_4_results = optimize_robust_strategy_across_assumptions()
window_map = dict(zip(block_df.get_column("block").to_list(), block_df.get_column("window").to_list()))
Basic Strategies¶
- Safety-first: maximize your odds of getting any trip.
- Balanced: target strong overall value.
- Quality-first: prioritize the best viewing experience when the trip happens.
Each card below gives the exact 4 windows to submit (with dates), plus the likely range across lottery scenarios.
from IPython.display import Markdown
def order_from_assumption(df: pl.DataFrame, assumption_name: str) -> tuple[str, ...]:
row = df.filter(pl.col("assumption") == assumption_name).row(0, named=True)
return (row["choice_1"], row["choice_2"], row["choice_3"], row["choice_4"])
def evaluate_strategy_across_assumptions(order: tuple[str, ...]) -> pl.DataFrame:
rows = []
for spec in assumption_specs:
m = evaluate_order_metrics(order, assumption_q_maps[spec["name"]], bears_map)
rows.append(
{
"assumption": spec["name"],
"trip_chance_pct": m["p_trip"] * 100,
"xBears": m["expected_bears"],
"xBears_if_go": m["expected_bears_if_trip"],
}
)
return pl.DataFrame(rows)
SCENARIO_DEFAULT = "Scenario 1: Party-draw model"
strategy_specs = [
{
"strategy": "Safety-first",
"why": "Best if the group cares most about maximizing the chance of getting any trip.",
"order": tuple(robust_order),
},
{
"strategy": "Balanced",
"why": "Best if the group wants strong all-around value rather than extreme odds or extreme quality.",
"order": order_from_assumption(objective_2_results, SCENARIO_DEFAULT),
},
{
"strategy": "Quality-first",
"why": "Best if the group prioritizes the best bear viewing quality when the trip succeeds.",
"order": order_from_assumption(objective_3_results, SCENARIO_DEFAULT),
},
]
plot_rows = []
for spec in strategy_specs:
order = spec["order"]
eval_df = evaluate_strategy_across_assumptions(order)
trip_vals = eval_df.get_column("trip_chance_pct").to_list()
quality_vals = eval_df.get_column("xBears_if_go").to_list()
trip_min, trip_max = min(trip_vals), max(trip_vals)
quality_min, quality_max = min(quality_vals), max(quality_vals)
trip_mid = (trip_min + trip_max) / 2
quality_mid = (quality_min + quality_max) / 2
lines = [
f"### {spec['strategy']}",
spec["why"],
"",
"**Recommended window order**",
f"1. `{order[0]}` — {window_map[order[0]]}",
f"2. `{order[1]}` — {window_map[order[1]]}",
f"3. `{order[2]}` — {window_map[order[2]]}",
f"4. `{order[3]}` — {window_map[order[3]]}",
"",
f"**Chance of going (range):** {trip_min:.2f}% to {trip_max:.2f}%",
f"**Expected bears if you go (range):** {quality_min:.2f} to {quality_max:.2f}",
]
display(Markdown("\n".join(lines)))
plot_rows.append(
{
"strategy": spec["strategy"],
"trip_mid": trip_mid,
"trip_min": trip_min,
"trip_max": trip_max,
"quality_mid": quality_mid,
"quality_min": quality_min,
"quality_max": quality_max,
}
)
plot_df = pl.DataFrame(plot_rows)
fig, ax = plt.subplots(figsize=(10, 7))
colors = {
"Safety-first": "#4C78A8",
"Balanced": "#F58518",
"Quality-first": "#54A24B",
}
for row in plot_df.iter_rows(named=True):
x = row["trip_mid"]
y = row["quality_mid"]
xerr = [[x - row["trip_min"]], [row["trip_max"] - x]]
yerr = [[y - row["quality_min"]], [row["quality_max"] - y]]
ax.errorbar(
x,
y,
xerr=xerr,
yerr=yerr,
fmt="o",
capsize=6,
markersize=9,
color=colors[row["strategy"]],
label=row["strategy"],
)
ax.annotate(row["strategy"], (x, y), textcoords="offset points", xytext=(6, 6), fontsize=10)
ax.set_title("Strategy Map: Chance of Going vs Quality-if-Go (ranges across scenarios)")
ax.set_xlabel("Chance trip happens (%)")
ax.set_ylabel("Expected bears if trip happens")
ax.grid(alpha=0.25)
# De-duplicate legend entries.
handles, labels = ax.get_legend_handles_labels()
seen = set()
uniq_handles, uniq_labels = [], []
for h, l in zip(handles, labels):
if l not in seen:
seen.add(l)
uniq_handles.append(h)
uniq_labels.append(l)
ax.legend(uniq_handles, uniq_labels)
plt.show()
Safety-first¶
Best if the group cares most about maximizing the chance of getting any trip.
Recommended window order
1. R — Aug 14 - Aug 17
2. A — Jun 7 - Jun 10
3. S — Aug 18 - Aug 21
4. T — Aug 22 - Aug 25
Chance of going (range): 1.27% to 77.89%
Expected bears if you go (range): 11.88 to 13.48
Balanced¶
Best if the group wants strong all-around value rather than extreme odds or extreme quality.
Recommended window order
1. K — Jul 17 - Jul 20
2. I — Jul 9 - Jul 12
3. R — Aug 14 - Aug 17
4. A — Jun 7 - Jun 10
Chance of going (range): 0.48% to 56.85%
Expected bears if you go (range): 14.97 to 23.91
Quality-first¶
Best if the group prioritizes the best bear viewing quality when the trip succeeds.
Recommended window order
1. K — Jul 17 - Jul 20
2. J — Jul 13 - Jul 16
3. I — Jul 9 - Jul 12
4. H — Jul 5 - Jul 8
Chance of going (range): 0.01% to 24.62%
Expected bears if you go (range): 49.39 to 49.56
Appendix¶
Appendix A1: Detailed Output — Max Trip Chance¶
print("Detailed output: Objective 1 by lottery scenario")
display(objective_1_results)
Detailed output: Objective 1 by lottery scenario
| assumption | objective | choice_1 | choice_2 | choice_3 | choice_4 | trip_chance_pct | expected_bears_per_application | expected_bears_if_trip_happens |
|---|---|---|---|---|---|---|---|---|
| str | str | str | str | str | str | f64 | f64 | f64 |
| "Scenario 1: Party-draw model" | "Maximize Trip Chance" | "A" | "R" | "S" | "T" | 46.03 | 5.907 | 12.832 |
| "Scenario 2: One-hit + capacity… | "Maximize Trip Chance" | "A" | "T" | "R" | "S" | 77.89 | 9.821 | 12.608 |
| "Scenario 3: Independent triple… | "Maximize Trip Chance" | "A" | "R" | "S" | "T" | 1.27 | 0.151 | 11.88 |
Appendix A2: Detailed Output — Max Overall Bear Value¶
print("Detailed output: Objective 2 by lottery scenario")
display(objective_2_results)
Detailed output: Objective 2 by lottery scenario
| assumption | objective | choice_1 | choice_2 | choice_3 | choice_4 | trip_chance_pct | expected_bears_per_application | expected_bears_if_trip_happens |
|---|---|---|---|---|---|---|---|---|
| str | str | str | str | str | str | f64 | f64 | f64 |
| "Scenario 1: Party-draw model" | "Maximize Overall Bear Value" | "K" | "I" | "R" | "A" | 28.82 | 6.392 | 22.179 |
| "Scenario 2: One-hit + capacity… | "Maximize Overall Bear Value" | "K" | "I" | "H" | "A" | 46.68 | 13.854 | 29.678 |
| "Scenario 3: Independent triple… | "Maximize Overall Bear Value" | "R" | "A" | "S" | "T" | 1.27 | 0.151 | 11.881 |
Appendix A3: Detailed Output — Max Quality with Chance Floor¶
print("Detailed output: Objective 3 by lottery scenario")
display(objective_3_results)
Detailed output: Objective 3 by lottery scenario
| assumption | objective | choice_1 | choice_2 | choice_3 | choice_4 | trip_chance_pct | expected_bears_per_application | expected_bears_if_trip_happens | quality_floor_applied |
|---|---|---|---|---|---|---|---|---|---|
| str | str | str | str | str | str | f64 | f64 | f64 | bool |
| "Scenario 1: Party-draw model" | "Maximize Quality with Trip-Cha… | "K" | "J" | "I" | "H" | 10.56 | 5.226 | 49.496 | true |
| "Scenario 2: One-hit + capacity… | "Maximize Quality with Trip-Cha… | "K" | "J" | "I" | "H" | 24.62 | 12.203 | 49.56 | true |
| "Scenario 3: Independent triple… | "Maximize Quality with Trip-Cha… | "B" | "R" | "A" | "T" | 1.15 | 0.141 | 12.283 | true |
Appendix A4: Detailed Output — Robust Across Lottery Scenarios¶
print("Detailed output: Objective 4 robust strategy")
print("Robust window order:", robust_order)
display(objective_4_results)
Detailed output: Objective 4 robust strategy
Robust window order: ('R', 'A', 'S', 'T')
| assumption | choice_1 | choice_2 | choice_3 | choice_4 | trip_chance_pct | expected_bears_per_application | expected_bears_if_trip_happens |
|---|---|---|---|---|---|---|---|
| str | str | str | str | str | f64 | f64 | f64 |
| "Scenario 1: Party-draw model" | "R" | "A" | "S" | "T" | 46.03 | 5.94 | 12.903 |
| "Scenario 2: One-hit + capacity… | "R" | "A" | "S" | "T" | 77.89 | 10.499 | 13.478 |
| "Scenario 3: Independent triple… | "R" | "A" | "S" | "T" | 1.27 | 0.151 | 11.881 |
Appendix A5: Full Objective-Scenario Pair Visualization¶
objective_pair_comparison = pl.concat(
[
objective_1_results.with_columns(pl.lit("Objective 1: Max Trip Chance").alias("objective")),
objective_2_results.with_columns(pl.lit("Objective 2: Max Overall Bear Value").alias("objective")),
objective_3_results.with_columns(pl.lit("Objective 3: Max Quality w/ Chance Floor").alias("objective")),
objective_4_results.with_columns(pl.lit("Objective 4: Robust Across Scenarios").alias("objective")),
],
how="diagonal_relaxed",
).with_columns(
[
pl.col("expected_bears_per_application").alias("xBears"),
pl.col("expected_bears_if_trip_happens").alias("xBears_if_go"),
]
)
objective_name_map = {
"Objective 1: Max Trip Chance": "Max Trip Chance",
"Objective 2: Max Overall Bear Value": "Max Overall Bear Value",
"Objective 3: Max Quality w/ Chance Floor": "Max Quality with Chance Floor",
"Objective 4: Robust Across Scenarios": "Robust Across Lottery Scenarios",
}
assumption_name_map = {
"Scenario 1: Party-draw model": "Party-draw model",
"Scenario 2: One-hit + capacity model": "One-hit + capacity model",
"Scenario 3: Independent triple-hit model": "Independent triple-hit model",
}
plot_df = objective_pair_comparison.with_columns(
[
pl.col("objective").replace_strict(objective_name_map).alias("objective_name"),
pl.col("assumption").replace_strict(assumption_name_map).alias("assumption_name"),
]
).with_columns(
[
(pl.col("objective_name") + pl.lit(" | ") + pl.col("assumption_name")).alias("pair_label")
]
)
obj_colors = {
"Max Trip Chance": "#4C78A8",
"Max Overall Bear Value": "#F58518",
"Max Quality with Chance Floor": "#54A24B",
"Robust Across Lottery Scenarios": "#B279A2",
}
value_df = plot_df.sort("xBears", descending=True)
quality_df = plot_df.sort("xBears_if_go", descending=True)
value_labels = value_df.get_column("pair_label").to_list()
quality_labels = quality_df.get_column("pair_label").to_list()
value_vals = value_df.get_column("xBears").to_list()
quality_vals = quality_df.get_column("xBears_if_go").to_list()
value_cols = [obj_colors[x] for x in value_df.get_column("objective_name").to_list()]
quality_cols = [obj_colors[x] for x in quality_df.get_column("objective_name").to_list()]
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(20, 10), sharey=False)
ax1.barh(value_labels, value_vals, color=value_cols)
ax1.invert_yaxis()
ax1.set_title("Overall Value Ranking (xBears)")
ax1.set_xlabel("Expected bears per application")
ax1.grid(axis="x", alpha=0.25)
ax2.barh(quality_labels, quality_vals, color=quality_cols)
ax2.invert_yaxis()
ax2.set_title("Quality-if-Go Ranking (xBears | go)")
ax2.set_xlabel("Expected bears if trip happens")
ax2.grid(axis="x", alpha=0.25)
from matplotlib.patches import Patch
handles = [Patch(color=c, label=o) for o, c in obj_colors.items()]
fig.legend(handles=handles, loc="lower center", ncol=2, title="Objective")
fig.suptitle("Objective-Scenario Pair Comparison", fontsize=15)
fig.tight_layout(rect=[0, 0.08, 1, 0.98])
plt.show()
best_value = value_df.row(0, named=True)
best_quality = quality_df.row(0, named=True)
print(f"Top xBears pair: {best_value['pair_label']} ({best_value['xBears']:.3f})")
print(f"Top xBears | go pair: {best_quality['pair_label']} ({best_quality['xBears_if_go']:.3f})")
window_map = dict(zip(block_df.get_column("block").to_list(), block_df.get_column("window").to_list()))
pair_detail_df = plot_df.select(
[
"objective_name",
"assumption_name",
"choice_1",
"choice_2",
"choice_3",
"choice_4",
]
).sort(["objective_name", "assumption_name"])
print("\nOptimal window picks (with dates) for each objective-scenario pair:")
try:
from IPython.display import Markdown
markdown_available = True
except Exception:
markdown_available = False
for row in pair_detail_df.iter_rows(named=True):
lines = [
f"### {row['objective_name']} | {row['assumption_name']}",
f"1. `{row['choice_1']}` — {window_map[row['choice_1']]}",
f"2. `{row['choice_2']}` — {window_map[row['choice_2']]}",
f"3. `{row['choice_3']}` — {window_map[row['choice_3']]}",
f"4. `{row['choice_4']}` — {window_map[row['choice_4']]}",
]
block = "\n".join(lines)
if markdown_available:
display(Markdown(block))
else:
print(block)
print()
Top xBears pair: Max Overall Bear Value | One-hit + capacity model (13.854)
Top xBears | go pair: Max Quality with Chance Floor | One-hit + capacity model (49.560)

Optimal window picks (with dates) for each objective-scenario pair:
Max Overall Bear Value | Independent triple-hit model¶
1. R — Aug 14 - Aug 17
2. A — Jun 7 - Jun 10
3. S — Aug 18 - Aug 21
4. T — Aug 22 - Aug 25
Max Overall Bear Value | One-hit + capacity model¶
1. K — Jul 17 - Jul 20
2. I — Jul 9 - Jul 12
3. H — Jul 5 - Jul 8
4. A — Jun 7 - Jun 10
Max Overall Bear Value | Party-draw model¶
1. K — Jul 17 - Jul 20
2. I — Jul 9 - Jul 12
3. R — Aug 14 - Aug 17
4. A — Jun 7 - Jun 10
Max Quality with Chance Floor | Independent triple-hit model¶
1. B — Jun 11 - Jun 14
2. R — Aug 14 - Aug 17
3. A — Jun 7 - Jun 10
4. T — Aug 22 - Aug 25
Max Quality with Chance Floor | One-hit + capacity model¶
1. K — Jul 17 - Jul 20
2. J — Jul 13 - Jul 16
3. I — Jul 9 - Jul 12
4. H — Jul 5 - Jul 8
Max Quality with Chance Floor | Party-draw model¶
1. K — Jul 17 - Jul 20
2. J — Jul 13 - Jul 16
3. I — Jul 9 - Jul 12
4. H — Jul 5 - Jul 8
Max Trip Chance | Independent triple-hit model¶
1. A — Jun 7 - Jun 10
2. R — Aug 14 - Aug 17
3. S — Aug 18 - Aug 21
4. T — Aug 22 - Aug 25
Max Trip Chance | One-hit + capacity model¶
1. A — Jun 7 - Jun 10
2. T — Aug 22 - Aug 25
3. R — Aug 14 - Aug 17
4. S — Aug 18 - Aug 21
Max Trip Chance | Party-draw model¶
1. A — Jun 7 - Jun 10
2. R — Aug 14 - Aug 17
3. S — Aug 18 - Aug 21
4. T — Aug 22 - Aug 25
Robust Across Lottery Scenarios | Independent triple-hit model¶
1. R — Aug 14 - Aug 17
2. A — Jun 7 - Jun 10
3. S — Aug 18 - Aug 21
4. T — Aug 22 - Aug 25
Robust Across Lottery Scenarios | One-hit + capacity model¶
1. R — Aug 14 - Aug 17
2. A — Jun 7 - Jun 10
3. S — Aug 18 - Aug 21
4. T — Aug 22 - Aug 25
Robust Across Lottery Scenarios | Party-draw model¶
1. R — Aug 14 - Aug 17
2. A — Jun 7 - Jun 10
3. S — Aug 18 - Aug 21
4. T — Aug 22 - Aug 25
Appendix A6: How Sensitive Are Results to Key Assumptions?¶
This section stress-tests one uncertain knob: the minimum trip-chance floor in the quality objective (how strict to be about odds before optimizing quality).
Goal: check whether recommended windows are stable, or whether small floor changes cause recommendation flips.
import numpy as np
from IPython.display import Markdown
if "all_orders" not in globals():
all_orders = list(permutations(all_blocks, CHOICES_PER_PERSON))
def optimize_order_with_knobs(
objective: str,
q_map: dict[str, float],
quality_min_trip_prob: float = QUALITY_MIN_TRIP_PROB,
tie_eps: float = 1e-12,
) -> tuple[tuple[str, ...], dict, bool]:
best_order = None
best_metrics = None
best_score = float("-inf")
def objective_score(metrics: dict) -> float:
if objective == "access":
return metrics["p_trip"]
if objective == "value":
return metrics["expected_bears"]
if objective in {"quality", "quality_with_floor"}:
return metrics["expected_bears_if_trip"]
raise ValueError(f"Unknown objective: {objective}")
for order in all_orders:
metrics = evaluate_order_metrics(order, q_map, bears_map)
if objective == "quality_with_floor" and metrics["p_trip"] < quality_min_trip_prob:
continue
score = objective_score(metrics)
should_take = False
if score > best_score + tie_eps:
should_take = True
elif abs(score - best_score) <= tie_eps and best_metrics is not None:
# Stable tie-break: prefer better total expected value, then lexicographically smaller order.
if metrics["expected_bears"] > best_metrics["expected_bears"] + tie_eps:
should_take = True
elif abs(metrics["expected_bears"] - best_metrics["expected_bears"]) <= tie_eps and order < best_order:
should_take = True
if best_order is None:
should_take = True
if should_take:
best_score = score
best_order = order
best_metrics = metrics
used_floor = objective == "quality_with_floor"
if objective == "quality_with_floor" and best_order is None:
# If the chance floor is infeasible, fallback to pure quality.
best_order, best_metrics, _ = optimize_order_with_knobs(
objective="quality",
q_map=q_map,
quality_min_trip_prob=0.0,
tie_eps=tie_eps,
)
used_floor = False
return best_order, best_metrics, used_floor
def find_order_switches(df: pl.DataFrame, group_col: str, param_col: str) -> pl.DataFrame:
rows = []
for key in df.get_column(group_col).unique().to_list():
subset = df.filter(pl.col(group_col) == key).sort(param_col)
prev_order = None
prev_param = None
for row in subset.iter_rows(named=True):
order = (row["choice_1"], row["choice_2"], row["choice_3"], row["choice_4"])
if prev_order is None:
prev_order = order
prev_param = row[param_col]
continue
if order != prev_order:
rows.append(
{
group_col: key,
"from_" + param_col: prev_param,
"switch_at_" + param_col: row[param_col],
"old_order": " > ".join(prev_order),
"new_order": " > ".join(order),
}
)
prev_order = order
prev_param = row[param_col]
if not rows:
return pl.DataFrame(
schema={
group_col: pl.Utf8,
"from_" + param_col: pl.Float64,
"switch_at_" + param_col: pl.Float64,
"old_order": pl.Utf8,
"new_order": pl.Utf8,
}
)
return pl.DataFrame(rows)
assumption_short = {
"Scenario 1: Party-draw model": "Party-draw",
"Scenario 2: One-hit + capacity model": "One-hit+capacity",
"Scenario 3: Independent triple-hit model": "Triple-hit",
}
floor_grid = [0.005, 0.01, 0.02, 0.03, 0.05, 0.08]
floor_rows = []
for spec in assumption_specs:
name = spec["name"]
q_map = assumption_q_maps[name]
for floor in floor_grid:
order, metrics, used_floor = optimize_order_with_knobs(
objective="quality_with_floor",
q_map=q_map,
quality_min_trip_prob=floor,
)
floor_rows.append(
{
"assumption": name,
"quality_floor": floor,
"choice_1": order[0],
"choice_2": order[1],
"choice_3": order[2],
"choice_4": order[3],
"order_label": " > ".join(order),
"trip_chance_pct": metrics["p_trip"] * 100,
"xBears": metrics["expected_bears"],
"xBears_if_go": metrics["expected_bears_if_trip"],
"quality_floor_applied": used_floor,
}
)
floor_sensitivity_df = pl.DataFrame(floor_rows).with_columns(
[
pl.col("trip_chance_pct").round(2),
pl.col("xBears").round(3),
pl.col("xBears_if_go").round(3),
]
)
floor_switches_df = find_order_switches(floor_sensitivity_df, group_col="assumption", param_col="quality_floor")
floor_stability_df = floor_sensitivity_df.group_by("assumption").agg(
[
pl.col("order_label").n_unique().alias("distinct_optimal_orders"),
pl.col("trip_chance_pct").min().alias("min_trip_chance_pct"),
pl.col("trip_chance_pct").max().alias("max_trip_chance_pct"),
pl.col("xBears_if_go").min().alias("min_quality_if_go"),
pl.col("xBears_if_go").max().alias("max_quality_if_go"),
]
).with_columns(
[
pl.col("min_trip_chance_pct").round(2),
pl.col("max_trip_chance_pct").round(2),
pl.col("min_quality_if_go").round(3),
pl.col("max_quality_if_go").round(3),
]
).sort("assumption")
print("Quality objective sensitivity (varying minimum trip-chance floor):")
display(floor_stability_df)
if floor_switches_df.height > 0:
print("\nOrder switch points while varying quality floor:")
display(floor_switches_df.sort(["assumption", "switch_at_quality_floor"]))
else:
print("\nNo order switch points found for the tested quality-floor range.")
fig, ax = plt.subplots(1, 1, figsize=(8, 5))
for name in [spec["name"] for spec in assumption_specs]:
subset = floor_sensitivity_df.filter(pl.col("assumption") == name).sort("quality_floor")
ax.plot(
subset.get_column("quality_floor").to_list(),
subset.get_column("xBears_if_go").to_list(),
marker="o",
linewidth=2,
label=assumption_short[name],
)
ax.set_title("Quality Objective: Quality-if-Go vs Chance Floor")
ax.set_xlabel("Minimum trip-chance floor")
ax.set_ylabel("Expected bears if trip happens")
ax.grid(alpha=0.25)
ax.legend()
plt.tight_layout()
plt.show()
summary_lines = ["Sensitivity takeaway:"]
for row in floor_stability_df.iter_rows(named=True):
summary_lines.append(
f"- {assumption_short[row['assumption']]} (quality objective): "
f"{row['distinct_optimal_orders']} optimal order(s) across tested chance floors."
)
display(Markdown("\n".join(summary_lines)))
Quality objective sensitivity (varying minimum trip-chance floor):
| assumption | distinct_optimal_orders | min_trip_chance_pct | max_trip_chance_pct | min_quality_if_go | max_quality_if_go |
|---|---|---|---|---|---|
| str | u32 | f64 | f64 | f64 | f64 |
| "Scenario 1: Party-draw model" | 1 | 10.56 | 10.56 | 49.496 | 49.496 |
| "Scenario 2: One-hit + capacity… | 1 | 24.62 | 24.62 | 49.56 | 49.56 |
| "Scenario 3: Independent triple… | 3 | 0.01 | 1.15 | 12.283 | 49.393 |
Order switch points while varying quality floor:
| assumption | from_quality_floor | switch_at_quality_floor | old_order | new_order |
|---|---|---|---|---|
| str | f64 | f64 | str | str |
| "Scenario 3: Independent triple… | 0.005 | 0.01 | "F > Q > R > A" | "B > R > A > T" |
| "Scenario 3: Independent triple… | 0.01 | 0.02 | "B > R > A > T" | "K > J > I > H" |
Sensitivity takeaway:
- Party-draw (quality objective): 1 optimal order(s) across tested chance floors.
- One-hit+capacity (quality objective): 1 optimal order(s) across tested chance floors.
- Triple-hit (quality objective): 3 optimal order(s) across tested chance floors.
Appendix A7: Which Recommended Strategies Are Strictly Inferior?¶
A strategy is marked dominated if there exists another 4-window order in the same scenario with:
- equal or better xBears, and
- equal or better xBears | go,
- with at least one strictly better.
This checks whether any recommended strategy is clearly beaten on both overall value and quality-conditional value.
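The strict-dominance test reduces to a vectorized mask: weakly better on both metrics, strictly better on at least one, with an epsilon tolerance for float noise. The sketch below demonstrates the same logic on invented points (the values are illustrative, not taken from the analysis).

# Sketch: epsilon-tolerant strict-dominance test on hypothetical (xBears, xBears_if_go) points.
import numpy as np
eps = 1e-12
x = np.array([5.9, 6.4, 5.2])     # hypothetical xBears for three orders
y = np.array([12.8, 22.2, 49.5])  # hypothetical xBears_if_go for the same orders
x0, y0 = 5.9, 12.8                # candidate order being checked
dominating = (x >= x0 - eps) & (y >= y0 - eps) & ((x > x0 + eps) | (y > y0 + eps))
print(bool(dominating.any()))     # True: the (6.4, 22.2) order beats the candidate on both metrics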
if "all_orders" not in globals():
all_orders = list(permutations(all_blocks, CHOICES_PER_PERSON))
def evaluate_all_orders_for_assumption(q_map: dict[str, float]) -> pl.DataFrame:
rows = []
for order in all_orders:
metrics = evaluate_order_metrics(order, q_map, bears_map)
rows.append(
{
"choice_1": order[0],
"choice_2": order[1],
"choice_3": order[2],
"choice_4": order[3],
"xBears": metrics["expected_bears"],
"xBears_if_go": metrics["expected_bears_if_trip"],
"trip_chance_pct": metrics["p_trip"] * 100,
}
)
return pl.DataFrame(rows)
recommended_specs = [
("Max Trip Chance", objective_1_results),
("Max Overall Bear Value", objective_2_results),
("Max Quality with Chance Floor", objective_3_results),
("Robust Across Lottery Scenarios", objective_4_results),
]
recommended_rows = []
for objective_name, source_df in recommended_specs:
for row in source_df.iter_rows(named=True):
order = (row["choice_1"], row["choice_2"], row["choice_3"], row["choice_4"])
scenario = row["assumption"]
precise = evaluate_order_metrics(order, assumption_q_maps[scenario], bears_map)
recommended_rows.append(
{
"assumption": scenario,
"objective_name": objective_name,
"choice_1": order[0],
"choice_2": order[1],
"choice_3": order[2],
"choice_4": order[3],
"recommended_order": " > ".join(order),
"xBears": precise["expected_bears"],
"xBears_if_go": precise["expected_bears_if_trip"],
"trip_chance_pct": precise["p_trip"] * 100,
}
)
recommended_pairs_df = pl.DataFrame(recommended_rows)
all_order_metrics = {
spec["name"]: evaluate_all_orders_for_assumption(assumption_q_maps[spec["name"]])
for spec in assumption_specs
}
pareto_rows = []
eps = 1e-12
for rec in recommended_pairs_df.iter_rows(named=True):
scenario = rec["assumption"]
universe = all_order_metrics[scenario]
x = universe.get_column("xBears").to_numpy()
y = universe.get_column("xBears_if_go").to_numpy()
x0 = float(rec["xBears"])
y0 = float(rec["xBears_if_go"])
dominating_mask = ((x >= x0 - eps) & (y >= y0 - eps) & ((x > x0 + eps) | (y > y0 + eps)))
dominated = bool(dominating_mask.any())
dominating_order = None
if dominated:
idx = int(np.where(dominating_mask)[0][0])
dom = universe.row(idx, named=True)
dominating_order = f"{dom['choice_1']} > {dom['choice_2']} > {dom['choice_3']} > {dom['choice_4']}"
pareto_rows.append(
{
"assumption": scenario,
"objective_name": rec["objective_name"],
"recommended_order": rec["recommended_order"],
"xBears": x0,
"xBears_if_go": y0,
"dominated": dominated,
"example_dominating_order": dominating_order,
"xBears_percentile": float((x <= x0 + eps).mean() * 100),
"xBears_if_go_percentile": float((y <= y0 + eps).mean() * 100),
}
)
pareto_check_df = pl.DataFrame(pareto_rows).with_columns(
[
pl.col("xBears").round(3),
pl.col("xBears_if_go").round(3),
pl.col("xBears_percentile").round(1),
pl.col("xBears_if_go_percentile").round(1),
]
).sort(["assumption", "objective_name"])
display(pareto_check_df)
obj_colors = {
"Max Trip Chance": "#4C78A8",
"Max Overall Bear Value": "#F58518",
"Max Quality with Chance Floor": "#54A24B",
"Robust Across Lottery Scenarios": "#B279A2",
}
assumption_short = {
"Scenario 1: Party-draw model": "Scenario 1",
"Scenario 2: One-hit + capacity model": "Scenario 2",
"Scenario 3: Independent triple-hit model": "Scenario 3",
}
fig, axes = plt.subplots(1, 3, figsize=(19, 5), sharey=True)
for ax, spec in zip(axes, assumption_specs):
name = spec["name"]
universe = all_order_metrics[name]
recs = pareto_check_df.filter(pl.col("assumption") == name)
ax.scatter(
universe.get_column("xBears").to_list(),
universe.get_column("xBears_if_go").to_list(),
s=7,
alpha=0.08,
color="#808080",
label="All possible 4-window orders",
)
for row in recs.iter_rows(named=True):
marker = "X" if row["dominated"] else "o"
ax.scatter(
row["xBears"],
row["xBears_if_go"],
s=120,
marker=marker,
color=obj_colors[row["objective_name"]],
edgecolor="black",
linewidth=0.5,
)
short_label = row["objective_name"].replace("Max ", "").replace(" Across Lottery Scenarios", "")
ax.annotate(short_label, (row["xBears"], row["xBears_if_go"]), textcoords="offset points", xytext=(4, 4), fontsize=8)
ax.set_title(f"{assumption_short[name]}: Dominance Map")
ax.set_xlabel("xBears")
ax.grid(alpha=0.2)
axes[0].set_ylabel("xBears | go")
from matplotlib.patches import Patch
legend_handles = [Patch(color=c, label=k) for k, c in obj_colors.items()]
fig.legend(handles=legend_handles, loc="lower center", ncol=2, title="Recommended strategy type")
fig.suptitle("Dominance Check: Recommended Strategies vs All Possible Orders", fontsize=14)
plt.tight_layout(rect=[0, 0.10, 1, 0.95])
plt.show()
n_dominated = pareto_check_df.filter(pl.col("dominated")).height
if n_dominated == 0:
display(Markdown("No recommended objective-scenario strategy is strictly dominated on both xBears and xBears | go."))
else:
dominated_slice = pareto_check_df.filter(pl.col("dominated")).select(["assumption", "objective_name", "example_dominating_order"])
display(Markdown(f"{n_dominated} recommended strategy pair(s) are strictly dominated; see table above for examples."))
display(dominated_slice)
| assumption | objective_name | recommended_order | xBears | xBears_if_go | dominated | example_dominating_order | xBears_percentile | xBears_if_go_percentile |
|---|---|---|---|---|---|---|---|---|
| str | str | str | f64 | f64 | bool | str | f64 | f64 |
| "Scenario 1: Party-draw model" | "Max Overall Bear Value" | "K > I > R > A" | 6.392 | 22.179 | false | null | 100.0 | 59.7 |
| "Scenario 1: Party-draw model" | "Max Quality with Chance Floor" | "K > J > I > H" | 5.226 | 49.496 | false | null | 80.4 | 100.0 |
| "Scenario 1: Party-draw model" | "Max Trip Chance" | "A > R > S > T" | 5.907 | 12.832 | true | "A > B > I > R" | 99.2 | 0.5 |
| "Scenario 1: Party-draw model" | "Robust Across Lottery Scenario… | "R > A > S > T" | 5.94 | 12.903 | true | "A > B > K > R" | 99.4 | 0.6 |
| "Scenario 2: One-hit + capacity… | "Max Overall Bear Value" | "K > I > H > A" | 13.854 | 29.678 | false | null | 100.0 | 88.1 |
| "Scenario 2: One-hit + capacity… | "Max Quality with Chance Floor" | "K > J > I > H" | 12.203 | 49.56 | false | null | 98.7 | 100.0 |
| "Scenario 2: One-hit + capacity… | "Max Trip Chance" | "A > T > R > S" | 9.821 | 12.608 | true | "A > B > C > D" | 45.2 | 0.6 |
| "Scenario 2: One-hit + capacity… | "Robust Across Lottery Scenario… | "R > A > S > T" | 10.499 | 13.478 | true | "A > B > F > I" | 71.2 | 2.2 |
| "Scenario 3: Independent triple… | "Max Overall Bear Value" | "R > A > S > T" | 0.151 | 11.881 | false | null | 100.0 | 19.5 |
| "Scenario 3: Independent triple… | "Max Quality with Chance Floor" | "B > R > A > T" | 0.141 | 12.283 | false | null | 100.0 | 20.1 |
| "Scenario 3: Independent triple… | "Max Trip Chance" | "A > R > S > T" | 0.151 | 11.88 | true | "R > A > S > T" | 100.0 | 19.5 |
| "Scenario 3: Independent triple… | "Robust Across Lottery Scenario… | "R > A > S > T" | 0.151 | 11.881 | false | null | 100.0 | 19.5 |
5 recommended strategy pair(s) are strictly dominated; see table above for examples.
| assumption | objective_name | example_dominating_order |
|---|---|---|
| str | str | str |
| "Scenario 1: Party-draw model" | "Max Trip Chance" | "A > B > I > R" |
| "Scenario 1: Party-draw model" | "Robust Across Lottery Scenario… | "A > B > K > R" |
| "Scenario 2: One-hit + capacity… | "Max Trip Chance" | "A > B > C > D" |
| "Scenario 2: One-hit + capacity… | "Robust Across Lottery Scenario… | "A > B > F > I" |
| "Scenario 3: Independent triple… | "Max Trip Chance" | "R > A > S > T" |
This check helps prevent accidental "lose-lose" recommendations. If a strategy is dominated, there is a clearly better option under the same lottery model on both total expected bears and quality conditional on success.
Appendix A8: Scenario 2 Capacity Calibration and Switch Points¶
Scenario 2 uses an uncertain capacity factor (c) in:
q_block = c * (1 - (1 - p_single)^3)
How this appendix calibrates c:
- Build a base one-hit probability for each window: one_hit_prob = 1 - (1 - p_single)^3.
- Sweep c on a grid from 0.25 to 1.00 in 0.05 steps.
- For each c and each objective (trip chance, total value, quality-with-floor), re-optimize the 4-window order.
- Record:
  - intervals where the optimal order stays constant (capacity_intervals_df)
  - switch points where the optimal order changes (capacity_switch_df)
Use this as a robustness check: if your recommended order is stable across plausible c, the strategy is less sensitive to unknown capacity mechanics.
if "all_orders" not in globals():
all_orders = list(permutations(all_blocks, CHOICES_PER_PERSON))
scenario2_base = block_df.select(
[
pl.col("block"),
(1 - (1 - pl.col("p_single")) ** GROUP_SIZE).alias("one_hit_prob"),
]
)
scenario2_base_map = dict(zip(scenario2_base.get_column("block").to_list(), scenario2_base.get_column("one_hit_prob").to_list()))
capacity_grid = [round(x, 2) for x in np.arange(0.25, 1.01, 0.05)]
objective_grid = [
("access", "Max Trip Chance"),
("value", "Max Overall Bear Value"),
("quality_with_floor", "Max Quality with Chance Floor"),
]
capacity_rows = []
for cap in capacity_grid:
q_map = {b: min(max(scenario2_base_map[b] * cap, 0.0), 1.0) for b in all_blocks}
for objective_key, objective_name in objective_grid:
order, metrics, used_floor = optimize_order_with_knobs(
objective=objective_key,
q_map=q_map,
quality_min_trip_prob=QUALITY_MIN_TRIP_PROB,
)
capacity_rows.append(
{
"capacity_factor": cap,
"objective_name": objective_name,
"choice_1": order[0],
"choice_2": order[1],
"choice_3": order[2],
"choice_4": order[3],
"order_label": " > ".join(order),
"trip_chance_pct": metrics["p_trip"] * 100,
"xBears": metrics["expected_bears"],
"xBears_if_go": metrics["expected_bears_if_trip"],
"quality_floor_applied": used_floor,
}
)
capacity_sweep_df = pl.DataFrame(capacity_rows).with_columns(
[
pl.col("trip_chance_pct").round(2),
pl.col("xBears").round(3),
pl.col("xBears_if_go").round(3),
]
)
switch_rows = []
interval_rows = []
for objective_name in [x[1] for x in objective_grid]:
subset = capacity_sweep_df.filter(pl.col("objective_name") == objective_name).sort("capacity_factor")
prev_order = None
prev_cap = None
interval_start = None
for row in subset.iter_rows(named=True):
order = (row["choice_1"], row["choice_2"], row["choice_3"], row["choice_4"])
cap = row["capacity_factor"]
if prev_order is None:
prev_order = order
prev_cap = cap
interval_start = cap
continue
if order != prev_order:
switch_rows.append(
{
"objective_name": objective_name,
"from_capacity": prev_cap,
"switch_at_capacity": cap,
"old_order": " > ".join(prev_order),
"new_order": " > ".join(order),
}
)
interval_rows.append(
{
"objective_name": objective_name,
"from_capacity": interval_start,
"to_capacity": prev_cap,
"order_label": " > ".join(prev_order),
}
)
interval_start = cap
prev_order = order
prev_cap = cap
interval_rows.append(
{
"objective_name": objective_name,
"from_capacity": interval_start,
"to_capacity": prev_cap,
"order_label": " > ".join(prev_order),
}
)
capacity_switch_df = pl.DataFrame(switch_rows) if switch_rows else pl.DataFrame(
schema={
"objective_name": pl.Utf8,
"from_capacity": pl.Float64,
"switch_at_capacity": pl.Float64,
"old_order": pl.Utf8,
"new_order": pl.Utf8,
}
)
capacity_intervals_df = pl.DataFrame(interval_rows).sort(["objective_name", "from_capacity"])
print("Optimal-order intervals by Scenario 2 capacity factor:")
display(capacity_intervals_df)
if capacity_switch_df.height > 0:
print("\nCapacity switch points:")
display(capacity_switch_df.sort(["objective_name", "switch_at_capacity"]))
else:
print("\nNo switch points in the tested capacity range.")
fig, axes = plt.subplots(3, 1, figsize=(12, 11), sharex=True)
for ax, objective_name in zip(axes, [x[1] for x in objective_grid]):
subset = capacity_sweep_df.filter(pl.col("objective_name") == objective_name).sort("capacity_factor")
x_vals = subset.get_column("capacity_factor").to_list()
xbears_vals = subset.get_column("xBears").to_list()
chance_vals = subset.get_column("trip_chance_pct").to_list()
ax.plot(x_vals, xbears_vals, marker="o", linewidth=2, color="#1f77b4", label="xBears")
ax.set_ylabel("xBears", color="#1f77b4")
ax.tick_params(axis="y", labelcolor="#1f77b4")
ax.grid(alpha=0.2)
ax2 = ax.twinx()
ax2.plot(x_vals, chance_vals, marker="s", linewidth=2, linestyle="--", color="#e76f51", label="Trip chance %")
ax2.set_ylabel("Trip chance %", color="#e76f51")
ax2.tick_params(axis="y", labelcolor="#e76f51")
# Merge handles from both y-axes so a single legend covers both series.
lines1, labels1 = ax.get_legend_handles_labels()
lines2, labels2 = ax2.get_legend_handles_labels()
ax.legend(lines1 + lines2, labels1 + labels2, loc="upper left")
if capacity_switch_df.height > 0:
for sw in capacity_switch_df.filter(pl.col("objective_name") == objective_name).iter_rows(named=True):
ax.axvline(sw["switch_at_capacity"], color="gray", linestyle=":", alpha=0.6)
ax.set_title(f"{objective_name}: performance vs Scenario 2 capacity factor")
axes[-1].set_xlabel("Scenario 2 capacity factor (c)")
plt.tight_layout()
plt.show()
summary_lines = ["Scenario 2 calibration takeaway:"]
for row in capacity_intervals_df.iter_rows(named=True):
summary_lines.append(
f"- {row['objective_name']}: use `{row['order_label']}` for c in [{row['from_capacity']:.2f}, {row['to_capacity']:.2f}]"
)
display(Markdown("\n".join(summary_lines)))
Optimal-order intervals by Scenario 2 capacity factor:
| objective_name | from_capacity | to_capacity | order_label |
|---|---|---|---|
| str | f64 | f64 | str |
| "Max Overall Bear Value" | 0.25 | 0.5 | "K > I > R > A" |
| "Max Overall Bear Value" | 0.55 | 1.0 | "K > I > H > A" |
| "Max Quality with Chance Floor" | 0.25 | 1.0 | "K > J > I > H" |
| "Max Trip Chance" | 0.25 | 1.0 | "R > A > S > T" |
Capacity switch points:
| objective_name | from_capacity | switch_at_capacity | old_order | new_order |
|---|---|---|---|---|
| str | f64 | f64 | str | str |
| "Max Overall Bear Value" | 0.5 | 0.55 | "K > I > R > A" | "K > I > H > A" |
Scenario 2 calibration takeaway:
- Max Overall Bear Value: use `K > I > R > A` for c in [0.25, 0.50]
- Max Overall Bear Value: use `K > I > H > A` for c in [0.55, 1.00]
- Max Quality with Chance Floor: use `K > J > I > H` for c in [0.25, 1.00]
- Max Trip Chance: use `R > A > S > T` for c in [0.25, 1.00]
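To apply the intervals table programmatically, here is a minimal lookup sketch for a specific capacity factor; `c_pick = 0.85` mirrors the working value used in Appendix A9.
# Lookup sketch: which order does each objective recommend at c = 0.85?
c_pick = 0.85
recommended = capacity_intervals_df.filter(
    (pl.col("from_capacity") <= c_pick) & (pl.col("to_capacity") >= c_pick)
)
display(recommended.select(["objective_name", "order_label"]))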
Appendix A9: Capacity Model Worked Example (Step-by-Step)¶
This worked example shows exactly how Scenario 2 converts listed draw odds into trip-level outcomes across a 4-window order.
For each ranked window k, the table computes:
- `p_single_k`: the listed individual draw probability for the window ranked k
- `one_hit_k = 1 - (1 - p_single_k)^3`: chance that at least one of the 3 group members draws it
- `q_k = c * one_hit_k`, with `c = CAPACITY_FACTOR`
- `p_this_k = p_no_prev_k * q_k`, where `p_no_prev_k = Π_{j<k} (1 - q_j)`
- `xBears_contrib_k = p_this_k * avg_bears_day_k`
Summing p_this_k gives total trip chance, and summing xBears_contrib_k gives expected bears per application.
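Before the full implementation, a quick hand check of the rank-1 row (inputs match the worked table below: `p_single = 0.03`, `c = 0.85`, 51.0 avg bears/day):
# Hand check of the k = 1 row; no earlier window exists, so p_no_prev = 1.
p1 = 0.03
one_hit_1 = 1 - (1 - p1) ** 3  # 0.0873
q1 = 0.85 * one_hit_1  # 0.0742
print(round(one_hit_1, 4), round(q1, 4), round(q1 * 51.0, 3))  # 0.0873 0.0742 3.786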
scenario2_name = "Scenario 2: One-hit + capacity model"
if "objective_2_results" in globals() and objective_2_results.height > 0:
row = objective_2_results.filter(pl.col("assumption") == scenario2_name).row(0, named=True)
worked_order = (row["choice_1"], row["choice_2"], row["choice_3"], row["choice_4"])
else:
worked_order = tuple(
block_df.sort("draw_pct", descending=True).get_column("block").head(CHOICES_PER_PERSON).to_list()
)
p_single_map = dict(zip(block_df.get_column("block").to_list(), block_df.get_column("p_single").to_list()))
bears_map_local = dict(zip(block_df.get_column("block").to_list(), block_df.get_column("avg_bears_day").to_list()))
window_map_local = dict(zip(block_df.get_column("block").to_list(), block_df.get_column("window").to_list()))
rows = []
# Running state: probability that no earlier-ranked window was awarded, plus
# cumulative trip probability and expected bears.
p_no_prev = 1.0
cum_trip = 0.0
cum_xbears = 0.0
for rank, block in enumerate(worked_order, start=1):
p_single = float(p_single_map[block])
one_hit = 1 - (1 - p_single) ** GROUP_SIZE
q = max(0.0, min(1.0, CAPACITY_FACTOR * one_hit))
p_this = p_no_prev * q
xbears_contrib = p_this * float(bears_map_local[block])
cum_trip += p_this
cum_xbears += xbears_contrib
rows.append(
{
"rank": rank,
"block": block,
"window": window_map_local[block],
"p_single": p_single,
"one_hit_prob": one_hit,
"q_capacity_adjusted": q,
"p_no_prev_before": p_no_prev,
"p_this_window": p_this,
"avg_bears_day": float(bears_map_local[block]),
"xBears_contrib": xbears_contrib,
"cum_trip_prob": cum_trip,
"cum_xBears": cum_xbears,
}
)
p_no_prev *= (1 - q)
worked_capacity_df = pl.DataFrame(rows).with_columns(
[
pl.col("p_single").round(4),
pl.col("one_hit_prob").round(4),
pl.col("q_capacity_adjusted").round(4),
pl.col("p_no_prev_before").round(4),
pl.col("p_this_window").round(4),
pl.col("avg_bears_day").round(2),
pl.col("xBears_contrib").round(3),
pl.col("cum_trip_prob").round(4),
pl.col("cum_xBears").round(3),
]
)
print(f"Worked order under {scenario2_name}: {' > '.join(worked_order)}")
print(f"Using c = {CAPACITY_FACTOR:.2f}, group size = {GROUP_SIZE}")
display(worked_capacity_df)
print(f"\nTotal trip chance = {cum_trip * 100:.2f}%")
print(f"Expected bears per application = {cum_xbears:.3f}")
print(f"Expected bears if trip happens = {(cum_xbears / cum_trip):.3f}" if cum_trip > 0 else "Expected bears if trip happens = 0.000")
Worked order under Scenario 2: One-hit + capacity model: K > I > H > A
Using c = 0.85, group size = 3
| rank | block | window | p_single | one_hit_prob | q_capacity_adjusted | p_no_prev_before | p_this_window | avg_bears_day | xBears_contrib | cum_trip_prob | cum_xBears |
|---|---|---|---|---|---|---|---|---|---|---|---|
| i64 | str | str | f64 | f64 | f64 | f64 | f64 | f64 | f64 | f64 | f64 |
| 1 | "K" | "Jul 17 - Jul 20" | 0.03 | 0.0873 | 0.0742 | 1.0 | 0.0742 | 51.0 | 3.786 | 0.0742 | 3.786 |
| 2 | "I" | "Jul 9 - Jul 12" | 0.03 | 0.0873 | 0.0742 | 0.9258 | 0.0687 | 49.75 | 3.419 | 0.1429 | 7.204 |
| 3 | "H" | "Jul 5 - Jul 8" | 0.03 | 0.0873 | 0.0742 | 0.8571 | 0.0636 | 47.25 | 3.006 | 0.2066 | 10.21 |
| 4 | "A" | "Jun 7 - Jun 10" | 0.15 | 0.3859 | 0.328 | 0.7934 | 0.2602 | 14.0 | 3.643 | 0.4668 | 13.854 |
Total trip chance = 46.68%
Expected bears per application = 13.854
Expected bears if trip happens = 29.678
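A quick arithmetic tie-out of the three totals: the conditional value is the unconditional value divided by the trip probability.
# Consistency check: expected bears if a trip happens = cum_xBears / cum_trip_prob.
print(round(13.854 / 0.4668, 2))  # 29.68, matching the 29.678 above up to rounding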
Appendix A10: Access-risk day reference¶
This is a planning reference only. Each row shows the requested window, whether any day is flagged, and the exact flagged dates. A flagged date means the ADFG row includes the access-risk tide marker and may require extra float logistics.
# Compact access-risk reference (planning only).
risk_dates_by_block = (
    daily_df.filter(pl.col("access_risk_day"))
    .group_by("block")
    .agg(pl.col("date").dt.strftime("%b %-d").alias("risk_dates"))
)
window_access_risk_flags = (
    block_df.select(["block", "window", "has_access_risk_day"])
    .join(risk_dates_by_block, on="block", how="left")
    .with_columns(
        [
            # list.len() is null for windows with no flagged days (the left join
            # produced a null list), so fill with 0 to keep the count filterable.
            pl.col("risk_dates").list.len().fill_null(0).cast(pl.UInt32).alias("risk_day_count"),
            pl.col("risk_dates").list.join(", ").fill_null("No flagged days").alias("risk_days"),
        ]
    )
    .select(
        [
            "block",
            "window",
            pl.col("has_access_risk_day").cast(pl.Int8),
            "risk_day_count",
            "risk_days",
        ]
    )
    .sort("block")  # block letters run chronologically through the season
)
display(window_access_risk_flags)
| block | window | has_access_risk_day | risk_day_count | risk_days |
|---|---|---|---|---|
| str | str | i8 | u32 | str |
| "A" | "Jun 7 - Jun 10" | 1 | 4 | "Jun 7, Jun 8, Jun 9, Jun 10" |
| "B" | "Jun 11 - Jun 14" | 1 | 1 | "Jun 11" |
| "C" | "Jun 15 - Jun 18" | 0 | 0 | "No flagged days" |
| "D" | "Jun 19 - Jun 22" | 1 | 2 | "Jun 21, Jun 22" |
| "E" | "Jun 23 - Jun 26" | 1 | 4 | "Jun 23, Jun 24, Jun 25, Jun 26" |
| "F" | "Jun 27 - Jun 30" | 1 | 1 | "Jun 27" |
| "G" | "Jul 1 - Jul 4" | 0 | 0 | "No flagged days" |
| "H" | "Jul 5 - Jul 8" | 1 | 2 | "Jul 7, Jul 8" |
| "I" | "Jul 9 - Jul 12" | 1 | 3 | "Jul 9, Jul 10, Jul 11" |
| "J" | "Jul 13 - Jul 16" | 0 | 0 | "No flagged days" |
| "K" | "Jul 17 - Jul 20" | 0 | 0 | "No flagged days" |
| "L" | "Jul 21 - Jul 24" | 1 | 4 | "Jul 21, Jul 22, Jul 23, Jul 24" |
| "M" | "Jul 25 - Jul 28" | 1 | 2 | "Jul 25, Jul 26" |
| "N" | "Jul 29 - Aug 1" | 0 | 0 | "No flagged days" |
| "O" | "Aug 2 - Aug 5" | 0 | 0 | "No flagged days" |
| "P" | "Aug 6 - Aug 9" | 1 | 4 | "Aug 6, Aug 7, Aug 8, Aug 9" |
| "Q" | "Aug 10 - Aug 13" | 0 | 0 | "No flagged days" |
| "R" | "Aug 14 - Aug 17" | 0 | 0 | "No flagged days" |
| "S" | "Aug 18 - Aug 21" | 1 | 2 | "Aug 20, Aug 21" |
| "T" | "Aug 22 - Aug 25" | 1 | 3 | "Aug 22, Aug 23, Aug 24" |
tradeoff_df = block_df.select(["block", "window", "draw_pct", "avg_bears_day", "has_access_risk_day"])
corr = tradeoff_df.select(pl.corr("draw_pct", "avg_bears_day")).item()
print(f"Correlation(draw_pct, avg_bears_day): {corr:.3f}")
Correlation(draw_pct, avg_bears_day): -0.754
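The strong negative correlation confirms the core tension: windows that are easier to draw average fewer bears. Because the relationship is nonlinear (the log model wins in Appendix A11 below), a rank-based correlation is a reasonable robustness check; this sketch assumes your polars version supports `method="spearman"` in `pl.corr`.
# Robustness sketch: Spearman rank correlation is invariant to monotone
# transforms, so it is unaffected by the curvature the model comparison finds.
spearman = tradeoff_df.select(pl.corr("draw_pct", "avg_bears_day", method="spearman")).item()
print(f"Spearman(draw_pct, avg_bears_day): {spearman:.3f}")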
Appendix A11: Draw % vs Bear Volume Model Comparison¶
import numpy as np
model_df = block_df.select(["draw_pct", "avg_bears_day"]).drop_nulls()
x = np.array(model_df.get_column("draw_pct"), dtype=float)
y = np.array(model_df.get_column("avg_bears_day"), dtype=float)
n = len(x)
if np.any(x <= 0):
raise ValueError("draw_pct must be strictly positive for reciprocal/log models")
def fit_ols(X: np.ndarray, y: np.ndarray):
beta, *_ = np.linalg.lstsq(X, y, rcond=None)
y_hat = X @ beta
sse = float(np.sum((y - y_hat) ** 2))
return beta, y_hat, sse
def fit_metrics(y: np.ndarray, y_hat: np.ndarray, sse: float, k_params: int) -> dict[str, float]:
n = len(y)
ss_tot = float(np.sum((y - y.mean()) ** 2))
r2 = 1.0 - sse / ss_tot if ss_tot > 0 else float("nan")
p = k_params - 1
adj_r2 = 1.0 - (1.0 - r2) * (n - 1) / (n - p - 1) if n > (p + 1) else float("nan")
rmse = float(np.sqrt(sse / n))
if sse <= 0:
aic = float("-inf")
else:
aic = n * np.log(sse / n) + 2 * k_params
aicc = aic + (2 * k_params * (k_params + 1)) / (n - k_params - 1) if n > (k_params + 1) else float("nan")
return {"r2": r2, "adj_r2": adj_r2, "rmse": rmse, "aicc": float(aicc)}
def design_linear(xv: np.ndarray) -> np.ndarray:
return np.column_stack([np.ones_like(xv), xv])
def design_reciprocal(xv: np.ndarray) -> np.ndarray:
return np.column_stack([np.ones_like(xv), 1.0 / xv])
def design_log(xv: np.ndarray) -> np.ndarray:
return np.column_stack([np.ones_like(xv), np.log(xv)])
def design_piecewise(xv: np.ndarray, knot: float) -> np.ndarray:
return np.column_stack([np.ones_like(xv), xv, np.maximum(0.0, xv - knot)])
def fit_piecewise_best(x_train: np.ndarray, y_train: np.ndarray):
"""Grid-search the hinge knot over interior unique x values, scoring by AICc (RMSE as fallback)."""
uniq = np.unique(x_train)
candidates = uniq[2:-2] if len(uniq) >= 6 else uniq[1:-1]
if len(candidates) == 0:
candidates = np.array([float(np.median(uniq))])
best = None
for knot in candidates:
X = design_piecewise(x_train, float(knot))
beta, y_hat, sse = fit_ols(X, y_train)
m = fit_metrics(y_train, y_hat, sse, k_params=3)
score = m["aicc"] if np.isfinite(m["aicc"]) else m["rmse"]
if best is None or score < best["score"]:
best = {
"knot": float(knot),
"beta": beta,
"y_hat": y_hat,
"sse": sse,
"metrics": m,
"score": float(score),
}
return best
def loocv_rmse(model_name: str) -> float:
"""Leave-one-out CV: refit on n-1 points (re-selecting the piecewise knot each fold) and predict the held-out point."""
preds = np.zeros_like(y)
for i in range(n):
mask = np.arange(n) != i
x_tr, y_tr = x[mask], y[mask]
x_te = np.array([x[i]], dtype=float)
if model_name == "piecewise":
fit = fit_piecewise_best(x_tr, y_tr)
X_te = design_piecewise(x_te, fit["knot"])
preds[i] = float((X_te @ fit["beta"])[0])
else:
design_fn = {
"linear": design_linear,
"reciprocal": design_reciprocal,
"log": design_log,
}[model_name]
X_tr = design_fn(x_tr)
beta, _, _ = fit_ols(X_tr, y_tr)
X_te = design_fn(x_te)
preds[i] = float((X_te @ beta)[0])
return float(np.sqrt(np.mean((y - preds) ** 2)))
fits = {}
for name, design_fn in [
("linear", design_linear),
("reciprocal", design_reciprocal),
("log", design_log),
]:
X = design_fn(x)
beta, y_hat, sse = fit_ols(X, y)
fits[name] = {
"beta": beta,
"y_hat": y_hat,
"sse": sse,
"metrics": fit_metrics(y, y_hat, sse, k_params=X.shape[1]),
}
fits["piecewise"] = fit_piecewise_best(x, y)
rows = []
for name in ["linear", "reciprocal", "log", "piecewise"]:
m = fits[name]["metrics"]
rows.append(
{
"model": name,
"r2": m["r2"],
"adj_r2": m["adj_r2"],
"rmse": m["rmse"],
"aicc": m["aicc"],
"loocv_rmse": loocv_rmse(name),
"knot": fits[name].get("knot", float("nan")),
}
)
metrics_df = pl.DataFrame(rows).with_columns(
[
pl.col("r2").round(3),
pl.col("adj_r2").round(3),
pl.col("rmse").round(3),
pl.col("aicc").round(2),
pl.col("loocv_rmse").round(3),
pl.col("knot").round(3),
]
).sort("loocv_rmse")
print("Model comparison for draw_pct -> avg_bears_day (lower LOOCV RMSE is better):")
display(metrics_df)
best_model = metrics_df.row(0, named=True)["model"]
print(f"Best model by LOOCV RMSE: {best_model}")
lin_beta = fits["linear"]["beta"]
print(f"Linear reference: avg_bears_day = {lin_beta[0]:.3f} + ({lin_beta[1]:.3f}) * draw_pct")
print(f"Linear slope interpretation: +1 draw-point corresponds to {lin_beta[1]:.3f} bears/day on average.")
x_line = np.linspace(float(x.min()), float(x.max()), 400)
fig, ax = plt.subplots(figsize=(10, 6))
ax.scatter(x, y, s=70, alpha=0.85, label="Windows")
colors = {"linear": "#d62728", "reciprocal": "#1f77b4", "log": "#2ca02c", "piecewise": "#9467bd"}
for name in ["linear", "reciprocal", "log", "piecewise"]:
if name == "piecewise":
knot = fits[name]["knot"]
y_line = design_piecewise(x_line, knot) @ fits[name]["beta"]
suffix = f", knot={knot:.2f}"
elif name == "linear":
y_line = design_linear(x_line) @ fits[name]["beta"]
suffix = ""
elif name == "reciprocal":
y_line = design_reciprocal(x_line) @ fits[name]["beta"]
suffix = ""
else:
y_line = design_log(x_line) @ fits[name]["beta"]
suffix = ""
rmse_cv = float(metrics_df.filter(pl.col("model") == name).select("loocv_rmse").item())
ax.plot(
x_line,
y_line,
linewidth=2.2 if name == best_model else 1.6,
alpha=1.0 if name == best_model else 0.8,
color=colors[name],
label=f"{name} (LOOCV RMSE={rmse_cv:.3f}{suffix})",
)
ax.set_title("Section 1 Model Comparison: Draw % vs Average Bears/Day")
ax.set_xlabel("Draw %")
ax.set_ylabel("Average bears/day")
ax.grid(alpha=0.25)
ax.legend()
plt.tight_layout()
plt.show()
Model comparison for draw_pct -> avg_bears_day (lower LOOCV RMSE is better):
| model | r2 | adj_r2 | rmse | aicc | loocv_rmse | knot |
|---|---|---|---|---|---|---|
| str | f64 | f64 | f64 | f64 | f64 | f64 |
| "log" | 0.686 | 0.668 | 7.426 | 84.91 | 8.263 | NaN |
| "reciprocal" | 0.669 | 0.65 | 7.624 | 85.96 | 9.056 | NaN |
| "piecewise" | 0.735 | 0.704 | 6.82 | 84.3 | 9.419 | 5.0 |
| "linear" | 0.568 | 0.544 | 8.703 | 91.25 | 9.686 | NaN |
Best model by LOOCV RMSE: log
Linear reference: avg_bears_day = 41.692 + (-2.214) * draw_pct
Linear slope interpretation: +1 draw-point corresponds to -2.214 bears/day on average.
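To put the winning log fit to work, a small prediction sketch; `predict_bears_log` is a hypothetical helper name, and the coefficients come from the `fits` dict built above.
# Hypothetical helper: predict average bears/day for a listed draw % using the
# log model's fitted coefficients (intercept, slope on log(draw_pct)).
def predict_bears_log(draw_pct: float) -> float:
    b0, b1 = (float(v) for v in fits["log"]["beta"])
    return b0 + b1 * np.log(draw_pct)
print(f"Predicted bears/day at 3% draw odds: {predict_bears_log(3.0):.1f}")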