McNeil River Guided Lottery Strategy¶

Source: https://www.adfg.alaska.gov/index.cfm?adfg=viewingpermits.mcneil_guided

In [1]:
from __future__ import annotations

from datetime import date, datetime, timezone
from pathlib import Path
import re
import subprocess

import matplotlib.pyplot as plt
import polars as pl


def find_project_root(start: Path) -> Path:
    for candidate in [start, *start.parents]:
        if (candidate / "pyproject.toml").exists():
            return candidate
    return start


PROJECT_ROOT = find_project_root(Path.cwd().resolve())
URL = "https://www.adfg.alaska.gov/index.cfm?adfg=viewingpermits.mcneil_guided"
GROUP_SIZE = 3
CHOICES_PER_PERSON = 4
SEASON_START_MONTH_DAY = (6, 7)
SEASON_END_MONTH_DAY = (8, 25)
CACHE_PATH = PROJECT_ROOT / "data" / "mcneil_guided_daily.csv"
CACHE_META_PATH = PROJECT_ROOT / "data" / "mcneil_guided_daily.meta.txt"

pl.Config.set_tbl_rows(40)
pl.Config.set_tbl_cols(30)
Out[1]:
polars.config.Config
In [2]:
def fetch_html_via_curl(url: str) -> str:
    """Use curl because direct HTTP clients are intermittently blocked by the site WAF."""
    return subprocess.check_output(["curl", "-sL", url], text=True)


def clean_text(value: str) -> str:
    """Normalize cell text: replace non-breaking spaces, collapse runs of whitespace."""
    return " ".join(value.replace("\xa0", " ").split())


def to_int(value: str) -> int | None:
    match = re.search(r"-?\d+", value)
    return int(match.group()) if match else None


def to_float(value: str) -> float | None:
    match = re.search(r"-?\d+(?:\.\d+)?", value)
    return float(match.group()) if match else None


def parse_guided_daily_from_html(html: str) -> pl.DataFrame:
    from bs4 import BeautifulSoup

    soup = BeautifulSoup(html, "html.parser")

    page_text = " ".join(soup.get_text(" ", strip=True).split())
    year_match = re.search(r"(20\d{2})\s+High Tide Time", page_text)
    if not year_match:
        raise ValueError("Could not infer schedule year from ADFG page.")

    year = int(year_match.group(1))
    season_start = date(year, *SEASON_START_MONTH_DAY)
    season_end = date(year, *SEASON_END_MONTH_DAY)

    schedule_table = None
    for table in soup.find_all("table"):
        text = " ".join(table.get_text(" ", strip=True).split())
        if "Time Block Letter" in text and "% winning 5-Yr. Average by time block" in text:
            schedule_table = table
            break

    if schedule_table is None:
        raise ValueError("Could not locate the guided schedule table on the page.")

    rows: list[dict] = []
    active: dict = {}

    for tr in schedule_table.select("tbody tr"):
        cells = tr.find_all("td")
        if not cells:
            continue

        texts = [clean_text(td.get_text(" ", strip=True)) for td in cells]
        first_cell = texts[0]

        if re.fullmatch(r"[A-Z]", first_cell):
            if len(cells) < 12:
                continue
            active = {
                "block": first_cell,
                "permits_available": to_int(texts[4]),
                "draw_pct": to_float(texts[5]),
                "tide_flats_activity": texts[6] or None,
                "sedge_flats_activity": texts[7] or None,
                "mikfik_activity": texts[8] or None,
                "falls_activity": texts[9] or None,
                "lagoon_activity": texts[10] or None,
            }
            date_text, time_cell, height_cell, bears_text = texts[1], cells[2], cells[3], texts[11]
        else:
            if not active or len(cells) < 4:
                continue
            date_text, time_cell, height_cell, bears_text = texts[0], cells[1], cells[2], texts[3]

        if not re.fullmatch(r"\d{1,2}-[A-Za-z]{3}", date_text):
            continue

        obs_date = datetime.strptime(f"{date_text}-{year}", "%d-%b-%Y").date()
        if not (season_start <= obs_date <= season_end):
            continue

        rows.append(
            {
                **active,
                "date": obs_date,
                "avg_bears_day": to_float(bears_text),
                "access_risk_day": ("tg-uc3z" in (time_cell.get("class") or []))
                or ("tg-uc3z" in (height_cell.get("class") or [])),
            }
        )

    if not rows:
        raise ValueError("No guided rows were parsed from the schedule table.")

    return pl.DataFrame(rows).sort(["block", "date"])


def load_mcneil_guided_daily(
    url: str = URL,
    cache_path: Path = CACHE_PATH,
    cache_meta_path: Path = CACHE_META_PATH,
    refresh_cache: bool = False,
) -> pl.DataFrame:
    if cache_path.exists() and not refresh_cache:
        return pl.read_csv(
            cache_path,
            try_parse_dates=True,
            schema_overrides={"access_risk_day": pl.Boolean},
        )

    html = fetch_html_via_curl(url)
    daily_df = parse_guided_daily_from_html(html)

    cache_path.parent.mkdir(parents=True, exist_ok=True)
    daily_df.write_csv(cache_path)

    cache_meta_path.parent.mkdir(parents=True, exist_ok=True)
    cache_meta_path.write_text(
        "\n".join(
            [
                f"fetched_utc={datetime.now(timezone.utc).isoformat()}",
                f"source_url={url}",
                f"rows={daily_df.height}",
            ]
        )
    )

    return daily_df
In [3]:
def summarize_by_block(daily_df: pl.DataFrame, group_size: int = GROUP_SIZE) -> pl.DataFrame:
    return (
        daily_df.group_by("block")
        .agg(
            [
                pl.col("date").min().alias("start_date"),
                pl.col("date").max().alias("end_date"),
                pl.first("permits_available").alias("permits_available"),
                pl.first("win_pct").alias("draw_pct"),
                pl.col("avg_bears_day").mean().round(2).alias("avg_bears_day"),
                pl.col("avg_bears_day").min().alias("min_bears_day"),
                pl.col("avg_bears_day").max().alias("max_bears_day"),
                pl.col("access_risk_day").cast(pl.UInt8).max().alias("has_access_risk_day"),
                pl.first("tide_flats_activity").alias("tide_flats_activity"),
                pl.first("sedge_flats_activity").alias("sedge_flats_activity"),
                pl.first("mikfik_activity").alias("mikfik_activity"),
                pl.first("falls_activity").alias("falls_activity"),
                pl.first("lagoon_activity").alias("lagoon_activity"),
            ]
        )
        .sort("start_date")
        .with_columns([(pl.col("draw_pct") / 100).alias("p_single")])
        .with_columns(
            [
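                # Binomial probabilities for 3 independent individual draws, each
                # succeeding with probability p_single:
                # P(>=1 of 3), P(>=2 of 3) = 3p^2(1-p) + p^3, and P(all 3) = p^3.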
                (1 - (1 - pl.col("p_single")) ** group_size).alias("p_at_least_1_of_3"),
                (
                    3 * pl.col("p_single") ** 2 * (1 - pl.col("p_single"))
                    + pl.col("p_single") ** 3
                ).alias("p_at_least_2_of_3"),
                (pl.col("p_single") ** group_size).alias("p_all_3_of_3"),
                pl.concat_str(
                    [
                        pl.col("start_date").dt.strftime("%b %-d"),
                        pl.lit(" - "),
                        pl.col("end_date").dt.strftime("%b %-d"),
                    ]
                ).alias("window"),
            ]
        )
    )


def rank_windows(
    block_df: pl.DataFrame,
    probability_col: str,
    n_choices: int = CHOICES_PER_PERSON,
) -> pl.DataFrame:
    return (
        block_df.with_columns(
            [
                (pl.col(probability_col) * pl.col("avg_bears_day")).alias("expected_bears_unadjusted")
            ]
        )
        .with_columns(
            [
                pl.col("expected_bears_unadjusted").alias("score")
            ]
        )
        .sort("score", descending=True)
        .select(
            [
                "block",
                "window",
                "permits_available",
                "draw_pct",
                "avg_bears_day",
                "has_access_risk_day",
                probability_col,
                "expected_bears_unadjusted",
                "score",
            ]
        )
        .head(n_choices)
    )


def build_shared_choice_set(
    block_df: pl.DataFrame,
    probability_col: str,
    choices_per_person: int = CHOICES_PER_PERSON,
) -> pl.DataFrame:
    shared_choices = rank_windows(
        block_df,
        probability_col=probability_col,
        n_choices=choices_per_person,
    ).get_column("block").to_list()

    return pl.DataFrame(
        [
            {
                **{f"choice_{j + 1}": pick for j, pick in enumerate(shared_choices)},
            }
        ]
    )
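
# Usage sketch (hypothetical call, not executed in this notebook): everyone in the
# group submits the same ordered list, ranked under a group-level probability, e.g.
#   build_shared_choice_set(block_df, probability_col="p_at_least_1_of_3")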
In [4]:
# Set refresh_cache=True when you want to pull the latest table from ADFG.
daily_df = load_mcneil_guided_daily(refresh_cache=False)
block_df = summarize_by_block(daily_df)

print(f"Daily rows: {daily_df.height}")
print(f"Time blocks: {block_df.height}")
if CACHE_META_PATH.exists():
    print(CACHE_META_PATH.read_text())

block_df.select(["block", "window", "permits_available", "draw_pct", "avg_bears_day", "has_access_risk_day"])
Daily rows: 80
Time blocks: 20
fetched_utc=2026-02-13T08:51:47.145920+00:00
source_url=https://www.adfg.alaska.gov/index.cfm?adfg=viewingpermits.mcneil_guided
rows=80
Out[4]:
shape: (20, 6)
block | window          | permits_available | draw_pct | avg_bears_day | has_access_risk_day
A     | Jun 7 - Jun 10  | 10                | 15.0     | 14.0          | 1
B     | Jun 11 - Jun 14 | 10                | 10.0     | 16.25         | 1
C     | Jun 15 - Jun 18 | 7                 | 7.0      | 17.25         | 0
D     | Jun 19 - Jun 22 | 10                | 6.0      | 21.0          | 1
E     | Jun 23 - Jun 26 | 7                 | 5.0      | 23.75         | 1
F     | Jun 27 - Jun 30 | 10                | 5.0      | 28.25         | 1
G     | Jul 1 - Jul 4   | 10                | 3.0      | 37.75         | 0
H     | Jul 5 - Jul 8   | 10                | 3.0      | 47.25         | 1
I     | Jul 9 - Jul 12  | 10                | 3.0      | 49.75         | 1
J     | Jul 13 - Jul 16 | 7                 | 2.0      | 50.0          | 0
K     | Jul 17 - Jul 20 | 10                | 3.0      | 51.0          | 0
L     | Jul 21 - Jul 24 | 10                | 3.0      | 38.5          | 1
M     | Jul 25 - Jul 28 | 7                 | 2.0      | 29.0          | 1
N     | Jul 29 - Aug 1  | 10                | 4.0      | 26.75         | 0
O     | Aug 2 - Aug 5   | 10                | 4.0      | 21.75         | 0
P     | Aug 6 - Aug 9   | 7                 | 4.0      | 20.25         | 1
Q     | Aug 10 - Aug 13 | 10                | 8.0      | 18.25         | 0
R     | Aug 14 - Aug 17 | 10                | 11.0     | 16.0          | 0
S     | Aug 18 - Aug 21 | 10                | 13.0     | 11.75         | 1
T     | Aug 22 - Aug 25 | 10                | 18.0     | 9.75          | 1

Core Tradeoff¶

Draw odds vs. bear volume: the edge-of-season windows with the best draw odds (A, S, T) average the fewest bears per day, while the peak July windows (H through K) average roughly 47-51 bears per day but carry only 2-3% individual odds.

Seasonality: Bears Over Time¶

In [5]:
# Seasonal pattern: average bears/day by window over time.
season_df = block_df.select(["block", "start_date", "window", "avg_bears_day", "draw_pct"]).sort("start_date")

dates = season_df.get_column("start_date").to_list()
bears = season_df.get_column("avg_bears_day").to_list()
draw_pct = season_df.get_column("draw_pct").to_list()
blocks = season_df.get_column("block").to_list()

import matplotlib.dates as mdates

fig, ax = plt.subplots(figsize=(14, 6))
ax2 = ax.twinx()

bears_line = ax.plot(
    dates,
    bears,
    marker="o",
    linewidth=2,
    color="#2a9d8f",
    label="Average bears/day",
)
win_line = ax2.plot(
    dates,
    draw_pct,
    marker="s",
    linewidth=2,
    linestyle="--",
    color="#e76f51",
    label="Win %",
)

for d, b, block in zip(dates, bears, blocks):
    ax.annotate(block, (d, b), textcoords="offset points", xytext=(4, 4), fontsize=8)

ax.set_title("Average Bears per Day and Win % by Window Over Time")
ax.set_xlabel("Window start date")
ax.set_ylabel("Average bears/day", color="#2a9d8f")
ax2.set_ylabel("Win %", color="#e76f51")

# Weekly labels plus daily divider lines for easy counting.
ax.xaxis.set_major_locator(mdates.WeekdayLocator(interval=1))
ax.xaxis.set_major_formatter(mdates.DateFormatter("%b %d"))
ax.xaxis.set_minor_locator(mdates.DayLocator(interval=1))
ax.grid(which="minor", axis="x", linestyle="-", linewidth=0.6, alpha=0.18)
ax.grid(which="major", axis="y", linestyle="-", linewidth=0.8, alpha=0.25)

# Shared legend across both axes.
lines = bears_line + win_line
labels = [l.get_label() for l in lines]
ax.legend(lines, labels, loc="upper left")

fig.autofmt_xdate()
plt.tight_layout()
plt.show()
[Figure: Average Bears per Day and Win % by Window Over Time]
In [6]:
tradeoff_df = block_df.select(["block", "window", "draw_pct", "avg_bears_day", "has_access_risk_day"])

print("\nTop windows by individual draw %:")
display(tradeoff_df.sort("draw_pct", descending=True).head(6))

print("\nTop windows by average bears/day:")
display(tradeoff_df.sort("avg_bears_day", descending=True).head(6))
Top windows by individual draw %:
shape: (6, 5)
block | window          | draw_pct | avg_bears_day | has_access_risk_day
T     | Aug 22 - Aug 25 | 18.0     | 9.75          | 1
A     | Jun 7 - Jun 10  | 15.0     | 14.0          | 1
S     | Aug 18 - Aug 21 | 13.0     | 11.75         | 1
R     | Aug 14 - Aug 17 | 11.0     | 16.0          | 0
B     | Jun 11 - Jun 14 | 10.0     | 16.25         | 1
Q     | Aug 10 - Aug 13 | 8.0      | 18.25         | 0
Top windows by average bears/day:
shape: (6, 5)
block | window          | draw_pct | avg_bears_day | has_access_risk_day
K     | Jul 17 - Jul 20 | 3.0      | 51.0          | 0
J     | Jul 13 - Jul 16 | 2.0      | 50.0          | 0
I     | Jul 9 - Jul 12  | 3.0      | 49.75         | 1
H     | Jul 5 - Jul 8   | 3.0      | 47.25         | 1
L     | Jul 21 - Jul 24 | 3.0      | 38.5          | 1
G     | Jul 1 - Jul 4   | 3.0      | 37.75         | 0
In [7]:
# Visual tradeoff: odds vs bears, with label by block.
plot_df = block_df.select(["block", "draw_pct", "avg_bears_day"]).sort("block")
x = plot_df.get_column("draw_pct").to_list()
y = plot_df.get_column("avg_bears_day").to_list()
labels = plot_df.get_column("block").to_list()

fig, ax = plt.subplots(figsize=(14, 7))
sc = ax.scatter(x, y, s=90)
for xi, yi, label in zip(x, y, labels):
    ax.annotate(label, (xi, yi), textcoords="offset points", xytext=(4, 4), fontsize=9)

ax.set_title("Windows: Draw % vs Average Bears/Day")
ax.set_xlabel("ADFG 5-year average draw percentage (individual)")
ax.set_xlim(0, 100)
ax.set_ylabel("Average bears/day viewed")
ax.grid(alpha=0.2)
plt.show()
[Figure: Windows: Draw % vs Average Bears/Day]

Lottery Uncertainty¶

Your group rule is strict: the trip only happens if all 3 can go. The uncertainty is how the lottery mechanics translate into a group success probability per window.

For each window, the notebook defines:

  • p_single: listed individual draw probability (draw_pct / 100)
  • g: group size (GROUP_SIZE = 3)
  • c: Scenario 2 capacity factor (CAPACITY_FACTOR, default 0.85)

We evaluate three plausible lottery-mechanics scenarios using a per-window success probability q_block:

  • Scenario 1: Party-draw model q_block = p_single
  • Scenario 2: One-hit + capacity model q_block = c * (1 - (1 - p_single)^g)
  • Scenario 3: Independent triple-hit model q_block = p_single^g

Interpretation of Scenario 2 (c):

  • c = 1.0: whenever at least one group member gets a hit, capacity is always sufficient for all 3.
  • c < 1.0: only a fraction of those one-hit events convert to a full 3-person trip.

Across your ordered 4-window list, outcomes are modeled as a first-success process:

  • p_trip = Σ_k [q_k * Π_{j<k}(1 - q_j)]
  • xBears = Σ_k [avg_bears_k * q_k * Π_{j<k}(1 - q_j)]
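
As a worked check of the first-success formulas (a sketch using Scenario 1, where q = p_single, for the order R > A > S > T with q = 0.11, 0.15, 0.13, 0.18):

p_trip = 0.11 + 0.89·0.15 + 0.89·0.85·0.13 + 0.89·0.85·0.87·0.18 ≈ 0.4603

which matches the 46.03% trip chance reported for R > A > S > T in the appendix tables.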

In the objective sections below, we optimize separately under each scenario so you can see how strategy changes.

In [8]:
from itertools import permutations

# Tunable policy knobs for discussion.
CAPACITY_FACTOR = 0.85       # Used only in Scenario 2.
QUALITY_MIN_TRIP_PROB = 0.01 # 1% minimum trip chance floor for quality-first objective.


assumption_specs = [
    {
        "name": "Scenario 1: Party-draw model",
        "q_expr": pl.col("p_single"),
    },
    {
        "name": "Scenario 2: One-hit + capacity model",
        "q_expr": (1 - (1 - pl.col("p_single")) ** GROUP_SIZE) * CAPACITY_FACTOR,
    },
    {
        "name": "Scenario 3: Independent triple-hit model",
        "q_expr": pl.col("p_single") ** GROUP_SIZE,
    },
]


all_blocks = block_df.get_column("block").to_list()
bears_map = dict(zip(block_df.get_column("block").to_list(), block_df.get_column("avg_bears_day").to_list()))


assumption_q_maps: dict[str, dict[str, float]] = {}
for spec in assumption_specs:
    tmp = block_df.select(
        [
            pl.col("block"),
            spec["q_expr"].clip(0.0, 1.0).alias("q"),
        ]
    )
    assumption_q_maps[spec["name"]] = dict(zip(tmp.get_column("block").to_list(), tmp.get_column("q").to_list()))


def evaluate_order_metrics(
    order: tuple[str, ...],
    q_map: dict[str, float],
    bears_map: dict[str, float],
) -> dict:
    p_no_prev = 1.0
    p_trip = 0.0
    expected_bears = 0.0

    for block in order:
        q = q_map[block]
        bears = bears_map[block]
        p_this = p_no_prev * q
        p_trip += p_this
        expected_bears += p_this * bears

        p_no_prev *= (1 - q)

    expected_bears_if_trip = expected_bears / p_trip if p_trip > 0 else 0.0

    return {
        "p_trip": p_trip,
        "expected_bears": expected_bears,
        "expected_bears_if_trip": expected_bears_if_trip,
    }


def optimize_order_for_assumption(
    objective: str,
    q_map: dict[str, float],
    quality_min_trip_prob: float = QUALITY_MIN_TRIP_PROB,
) -> tuple[tuple[str, ...], dict, bool]:
    best_order = None
    best_metrics = None
    best_score = float("-inf")

    for order in permutations(all_blocks, CHOICES_PER_PERSON):
        metrics = evaluate_order_metrics(order, q_map, bears_map)

        if objective == "access":
            score = metrics["p_trip"]
        elif objective == "value":
            score = metrics["expected_bears"]
        elif objective == "quality":
            score = metrics["expected_bears_if_trip"]
        elif objective == "quality_with_floor":
            if metrics["p_trip"] < quality_min_trip_prob:
                continue
            score = metrics["expected_bears_if_trip"]
        else:
            raise ValueError(f"Unknown objective: {objective}")

        if score > best_score:
            best_score = score
            best_order = order
            best_metrics = metrics

    used_floor = objective == "quality_with_floor"
    if objective == "quality_with_floor" and best_order is None:
        # The floor is infeasible under this assumption: fall back to the
        # pure quality objective (no trip-chance constraint).
        best_order, best_metrics, _ = optimize_order_for_assumption(
            objective="quality",
            q_map=q_map,
            quality_min_trip_prob=0.0,
        )
        used_floor = False

    return best_order, best_metrics, used_floor


def summarize_objective_by_assumption(
    objective: str,
    objective_label: str,
    quality_min_trip_prob: float = QUALITY_MIN_TRIP_PROB,
) -> pl.DataFrame:
    rows = []

    for spec in assumption_specs:
        assumption_name = spec["name"]
        q_map = assumption_q_maps[assumption_name]

        order, metrics, used_floor = optimize_order_for_assumption(
            objective=objective,
            q_map=q_map,
            quality_min_trip_prob=quality_min_trip_prob,
        )

        row = {
            "assumption": assumption_name,
            "objective": objective_label,
            "choice_1": order[0],
            "choice_2": order[1],
            "choice_3": order[2],
            "choice_4": order[3],
            "trip_chance_pct": metrics["p_trip"] * 100,
            "expected_bears_per_application": metrics["expected_bears"],
            "expected_bears_if_trip_happens": metrics["expected_bears_if_trip"],
        }
        if objective == "quality_with_floor":
            row["quality_floor_applied"] = used_floor

        rows.append(row)

    out = pl.DataFrame(rows).with_columns(
        [
            pl.col("trip_chance_pct").round(2),
            pl.col("expected_bears_per_application").round(3),
            pl.col("expected_bears_if_trip_happens").round(3),
        ]
    )
    return out


def optimize_robust_strategy_across_assumptions() -> tuple[tuple[str, ...], pl.DataFrame]:
    best_order = None
    best_min_score = float("-inf")
    best_tiebreak = float("-inf")

    for order in permutations(all_blocks, CHOICES_PER_PERSON):
        per_assumption = []
        for spec in assumption_specs:
            q_map = assumption_q_maps[spec["name"]]
            metrics = evaluate_order_metrics(order, q_map, bears_map)
            per_assumption.append(metrics)

        min_expected_bears = min(x["expected_bears"] for x in per_assumption)
        min_trip_chance = min(x["p_trip"] for x in per_assumption)

        if (min_expected_bears > best_min_score) or (
            min_expected_bears == best_min_score and min_trip_chance > best_tiebreak
        ):
            best_min_score = min_expected_bears
            best_tiebreak = min_trip_chance
            best_order = order

    rows = []
    for spec in assumption_specs:
        q_map = assumption_q_maps[spec["name"]]
        metrics = evaluate_order_metrics(best_order, q_map, bears_map)
        rows.append(
            {
                "assumption": spec["name"],
                "choice_1": best_order[0],
                "choice_2": best_order[1],
                "choice_3": best_order[2],
                "choice_4": best_order[3],
                "trip_chance_pct": metrics["p_trip"] * 100,
                "expected_bears_per_application": metrics["expected_bears"],
                "expected_bears_if_trip_happens": metrics["expected_bears_if_trip"],
                }
        )

    robust_df = pl.DataFrame(rows).with_columns(
        [
            pl.col("trip_chance_pct").round(2),
            pl.col("expected_bears_per_application").round(3),
            pl.col("expected_bears_if_trip_happens").round(3),
        ]
    )

    return best_order, robust_df

Expected Bears Over Time¶

This chart shows expected bears per application (q_block × avg_bears_day) for each window through the season, one series per lottery scenario; Scenario 3 is plotted on the right axis because its magnitudes are much smaller.

In [9]:
# Opening comparison: expected bears per window across lottery assumptions.
scenario_expected_df = block_df.select(["block", "start_date", "window", "avg_bears_day", "p_single"]).sort("start_date")

for spec in assumption_specs:
    scenario_expected_df = scenario_expected_df.with_columns(
        (spec["q_expr"].clip(0.0, 1.0) * pl.col("avg_bears_day")).alias(spec["name"])
    )

scenario_names = [spec["name"] for spec in assumption_specs]
scenario3_name = "Scenario 3: Independent triple-hit model"
primary_names = [name for name in scenario_names if name != scenario3_name]

fig, ax_left = plt.subplots(figsize=(16, 8))
ax_right = ax_left.twinx()

dates = scenario_expected_df.get_column("start_date").to_list()
window_labels = [
    f"{block} ({window})"
    for block, window in zip(
        scenario_expected_df.get_column("block").to_list(),
        scenario_expected_df.get_column("window").to_list(),
    )
]

primary_colors = ["#1f77b4", "#2ca02c"]
for i, name in enumerate(primary_names):
    ax_left.plot(
        dates,
        scenario_expected_df.get_column(name).to_list(),
        marker="o",
        linewidth=2,
        color=primary_colors[i % len(primary_colors)],
        label=name,
    )

ax_right.plot(
    dates,
    scenario_expected_df.get_column(scenario3_name).to_list(),
    marker="s",
    linewidth=2,
    linestyle="--",
    color="#d62728",
    label=f"{scenario3_name} (right axis)",
)

ax_left.set_title("Expected Bears Seen Over Time by Lottery Scenario")
ax_left.set_xlabel("Window start date")
ax_left.set_ylabel("Expected bears per application window (Scenarios 1-2)")
ax_right.set_ylabel("Expected bears per application window (Scenario 3)")

ax_left.set_xticks(dates)
ax_left.set_xticklabels(window_labels, rotation=45, ha="right", fontsize=8)
ax_left.grid(alpha=0.25)

h1, l1 = ax_left.get_legend_handles_labels()
h2, l2 = ax_right.get_legend_handles_labels()
ax_left.legend(h1 + h2, l1 + l2, loc="upper right")

plt.tight_layout()
plt.show()
[Figure: Expected Bears Seen Over Time by Lottery Scenario]

Expected Bears by Scenario and Window¶

These heatmaps show expected bears per application for each window (columns in date order) and lottery scenario (rows); Scenario 3 gets its own panel and color scale because its values are much smaller.

In [10]:
# Heatmap view with a separate scale for Scenario 3 due to its lower magnitude.
if "scenario_expected_df" not in globals():
    scenario_expected_df = block_df.select(["block", "start_date", "window", "avg_bears_day", "p_single"]).sort("start_date")
    for spec in assumption_specs:
        scenario_expected_df = scenario_expected_df.with_columns(
            (spec["q_expr"].clip(0.0, 1.0) * pl.col("avg_bears_day")).alias(spec["name"])
        )

scenario_names = [spec["name"] for spec in assumption_specs]
scenario3_name = "Scenario 3: Independent triple-hit model"
primary_names = [name for name in scenario_names if name != scenario3_name]

window_labels = [
    f"{block} ({window})"
    for block, window in zip(
        scenario_expected_df.get_column("block").to_list(),
        scenario_expected_df.get_column("window").to_list(),
    )
]

primary_values = [scenario_expected_df.get_column(name).to_list() for name in primary_names]
scenario3_values = [scenario_expected_df.get_column(scenario3_name).to_list()]

fig, (ax_top, ax_bottom) = plt.subplots(
    2,
    1,
    figsize=(max(14, len(window_labels) * 0.7), 7.5),
    gridspec_kw={"height_ratios": [2, 1]},
)

im_top = ax_top.imshow(primary_values, aspect="auto", cmap="YlGn", interpolation="nearest")
ax_top.set_title("Expected Bears Heatmap (Scenarios 1-2, shared scale)")
ax_top.set_ylabel("Lottery scenario")
ax_top.set_yticks(range(len(primary_names)))
ax_top.set_yticklabels(primary_names)
ax_top.set_xticks(range(len(window_labels)))
ax_top.set_xticklabels([])

threshold_top = max(max(row) for row in primary_values) * 0.55
for r, row in enumerate(primary_values):
    for c, value in enumerate(row):
        ax_top.text(
            c,
            r,
            f"{value:.2f}",
            ha="center",
            va="center",
            fontsize=7,
            color="white" if value >= threshold_top else "black",
        )

cbar_top = fig.colorbar(im_top, ax=ax_top)
cbar_top.set_label("Expected bears per application window (Scenarios 1-2)")

im_bottom = ax_bottom.imshow(scenario3_values, aspect="auto", cmap="YlOrRd", interpolation="nearest")
ax_bottom.set_title("Expected Bears Heatmap (Scenario 3, separate scale)")
ax_bottom.set_xlabel("Window (chronological)")
ax_bottom.set_ylabel("Lottery scenario")
ax_bottom.set_yticks([0])
ax_bottom.set_yticklabels([scenario3_name])
ax_bottom.set_xticks(range(len(window_labels)))
ax_bottom.set_xticklabels(window_labels, rotation=45, ha="right", fontsize=8)

threshold_bottom = max(max(row) for row in scenario3_values) * 0.55
for c, value in enumerate(scenario3_values[0]):
    ax_bottom.text(
        c,
        0,
        f"{value:.3f}",
        ha="center",
        va="center",
        fontsize=7,
        color="white" if value >= threshold_bottom else "black",
    )

cbar_bottom = fig.colorbar(im_bottom, ax=ax_bottom)
cbar_bottom.set_label("Expected bears per application window (Scenario 3)")

plt.tight_layout()
plt.show()
[Figure: Expected Bears Heatmaps: Scenarios 1-2 (shared scale) and Scenario 3 (separate scale)]
In [11]:
# Precompute objective outputs once for semantic summary and appendix.
objective_1_results = summarize_objective_by_assumption(
    objective="access",
    objective_label="Maximize Trip Chance",
)
objective_2_results = summarize_objective_by_assumption(
    objective="value",
    objective_label="Maximize Overall Bear Value",
)
objective_3_results = summarize_objective_by_assumption(
    objective="quality_with_floor",
    objective_label="Maximize Quality with Trip-Chance Floor",
    quality_min_trip_prob=QUALITY_MIN_TRIP_PROB,
)
robust_order, objective_4_results = optimize_robust_strategy_across_assumptions()

window_map = dict(zip(block_df.get_column("block").to_list(), block_df.get_column("window").to_list()))

Basic Strategies¶

  • Safety-first: maximize your odds of getting any trip.
  • Balanced: target strong overall value.
  • Quality-first: prioritize the best viewing experience when the trip happens.

Each card below gives the exact 4 windows to submit (with dates), plus the likely range across lottery scenarios.

In [12]:
from IPython.display import Markdown


def order_from_assumption(df: pl.DataFrame, assumption_name: str) -> tuple[str, ...]:
    row = df.filter(pl.col("assumption") == assumption_name).row(0, named=True)
    return (row["choice_1"], row["choice_2"], row["choice_3"], row["choice_4"])


def evaluate_strategy_across_assumptions(order: tuple[str, ...]) -> pl.DataFrame:
    rows = []
    for spec in assumption_specs:
        m = evaluate_order_metrics(order, assumption_q_maps[spec["name"]], bears_map)
        rows.append(
            {
                "assumption": spec["name"],
                "trip_chance_pct": m["p_trip"] * 100,
                "xBears": m["expected_bears"],
                "xBears_if_go": m["expected_bears_if_trip"],
            }
        )
    return pl.DataFrame(rows)


SCENARIO_DEFAULT = "Scenario 1: Party-draw model"

strategy_specs = [
    {
        "strategy": "Safety-first",
        "why": "Best if the group cares most about maximizing the chance of getting any trip.",
        "order": tuple(robust_order),
    },
    {
        "strategy": "Balanced",
        "why": "Best if the group wants strong all-around value rather than extreme odds or extreme quality.",
        "order": order_from_assumption(objective_2_results, SCENARIO_DEFAULT),
    },
    {
        "strategy": "Quality-first",
        "why": "Best if the group prioritizes the best bear viewing quality when the trip succeeds.",
        "order": order_from_assumption(objective_3_results, SCENARIO_DEFAULT),
    },
]

plot_rows = []

for spec in strategy_specs:
    order = spec["order"]
    eval_df = evaluate_strategy_across_assumptions(order)

    trip_vals = eval_df.get_column("trip_chance_pct").to_list()
    quality_vals = eval_df.get_column("xBears_if_go").to_list()

    trip_min, trip_max = min(trip_vals), max(trip_vals)
    quality_min, quality_max = min(quality_vals), max(quality_vals)
    trip_mid = (trip_min + trip_max) / 2
    quality_mid = (quality_min + quality_max) / 2

    lines = [
        f"### {spec['strategy']}",
        spec["why"],
        "",
        "**Recommended window order**",
        f"1. `{order[0]}` — {window_map[order[0]]}",
        f"2. `{order[1]}` — {window_map[order[1]]}",
        f"3. `{order[2]}` — {window_map[order[2]]}",
        f"4. `{order[3]}` — {window_map[order[3]]}",
        "",
        f"**Chance of going (range):** {trip_min:.2f}% to {trip_max:.2f}%",
        f"**Expected bears if you go (range):** {quality_min:.2f} to {quality_max:.2f}",
    ]
    display(Markdown("\n".join(lines)))

    plot_rows.append(
        {
            "strategy": spec["strategy"],
            "trip_mid": trip_mid,
            "trip_min": trip_min,
            "trip_max": trip_max,
            "quality_mid": quality_mid,
            "quality_min": quality_min,
            "quality_max": quality_max,
        }
    )

plot_df = pl.DataFrame(plot_rows)

fig, ax = plt.subplots(figsize=(10, 7))
colors = {
    "Safety-first": "#4C78A8",
    "Balanced": "#F58518",
    "Quality-first": "#54A24B",
}

for row in plot_df.iter_rows(named=True):
    x = row["trip_mid"]
    y = row["quality_mid"]
    xerr = [[x - row["trip_min"]], [row["trip_max"] - x]]
    yerr = [[y - row["quality_min"]], [row["quality_max"] - y]]

    ax.errorbar(
        x,
        y,
        xerr=xerr,
        yerr=yerr,
        fmt="o",
        capsize=6,
        markersize=9,
        color=colors[row["strategy"]],
        label=row["strategy"],
    )
    ax.annotate(row["strategy"], (x, y), textcoords="offset points", xytext=(6, 6), fontsize=10)

ax.set_title("Strategy Map: Chance of Going vs Quality-if-Go (ranges across scenarios)")
ax.set_xlabel("Chance trip happens (%)")
ax.set_ylabel("Expected bears if trip happens")
ax.grid(alpha=0.25)

# De-duplicate legend entries.
handles, labels = ax.get_legend_handles_labels()
seen = set()
uniq_handles, uniq_labels = [], []
for h, l in zip(handles, labels):
    if l not in seen:
        seen.add(l)
        uniq_handles.append(h)
        uniq_labels.append(l)
ax.legend(uniq_handles, uniq_labels)

plt.show()

Safety-first¶

Best if the group cares most about maximizing the chance of getting any trip.

Recommended window order

  1. R — Aug 14 - Aug 17
  2. A — Jun 7 - Jun 10
  3. S — Aug 18 - Aug 21
  4. T — Aug 22 - Aug 25

Chance of going (range): 1.27% to 77.89%
Expected bears if you go (range): 11.88 to 13.48

Balanced¶

Best if the group wants strong all-around value rather than extreme odds or extreme quality.

Recommended window order

  1. K — Jul 17 - Jul 20
  2. I — Jul 9 - Jul 12
  3. R — Aug 14 - Aug 17
  4. A — Jun 7 - Jun 10

Chance of going (range): 0.48% to 56.85%
Expected bears if you go (range): 14.97 to 23.91

Quality-first¶

Best if the group prioritizes the best bear viewing quality when the trip succeeds.

Recommended window order

  1. K — Jul 17 - Jul 20
  2. J — Jul 13 - Jul 16
  3. I — Jul 9 - Jul 12
  4. H — Jul 5 - Jul 8

Chance of going (range): 0.01% to 24.62%
Expected bears if you go (range): 49.39 to 49.56

[Figure: Strategy Map: Chance of Going vs Quality-if-Go (ranges across scenarios)]

Appendix¶

Appendix A1: Detailed Output — Max Trip Chance¶

In [13]:
print("Detailed output: Objective 1 by lottery scenario")
display(objective_1_results)
Detailed output: Objective 1 by lottery scenario
shape: (3, 9)
assumption                               | objective            | choice_1 | choice_2 | choice_3 | choice_4 | trip_chance_pct | expected_bears_per_application | expected_bears_if_trip_happens
Scenario 1: Party-draw model             | Maximize Trip Chance | A        | R        | S        | T        | 46.03           | 5.907                          | 12.832
Scenario 2: One-hit + capacity model     | Maximize Trip Chance | A        | T        | R        | S        | 77.89           | 9.821                          | 12.608
Scenario 3: Independent triple-hit model | Maximize Trip Chance | A        | R        | S        | T        | 1.27            | 0.151                          | 11.88

Appendix A2: Detailed Output — Max Overall Bear Value¶

In [14]:
print("Detailed output: Objective 2 by lottery scenario")
display(objective_2_results)
Detailed output: Objective 2 by lottery scenario
shape: (3, 9)
assumption                               | objective                   | choice_1 | choice_2 | choice_3 | choice_4 | trip_chance_pct | expected_bears_per_application | expected_bears_if_trip_happens
Scenario 1: Party-draw model             | Maximize Overall Bear Value | K        | I        | R        | A        | 28.82           | 6.392                          | 22.179
Scenario 2: One-hit + capacity model     | Maximize Overall Bear Value | K        | I        | H        | A        | 46.68           | 13.854                         | 29.678
Scenario 3: Independent triple-hit model | Maximize Overall Bear Value | R        | A        | S        | T        | 1.27            | 0.151                          | 11.881

Appendix A3: Detailed Output — Max Quality with Chance Floor¶

In [15]:
print("Detailed output: Objective 3 by lottery scenario")
display(objective_3_results)
Detailed output: Objective 3 by lottery scenario
shape: (3, 10)
assumption                               | objective                               | choice_1 | choice_2 | choice_3 | choice_4 | trip_chance_pct | expected_bears_per_application | expected_bears_if_trip_happens | quality_floor_applied
Scenario 1: Party-draw model             | Maximize Quality with Trip-Chance Floor | K        | J        | I        | H        | 10.56           | 5.226                          | 49.496                         | true
Scenario 2: One-hit + capacity model     | Maximize Quality with Trip-Chance Floor | K        | J        | I        | H        | 24.62           | 12.203                         | 49.56                          | true
Scenario 3: Independent triple-hit model | Maximize Quality with Trip-Chance Floor | B        | R        | A        | T        | 1.15            | 0.141                          | 12.283                         | true

Appendix A4: Detailed Output — Robust Across Lottery Scenarios¶

In [16]:
print("Detailed output: Objective 4 robust strategy")
print("Robust window order:", robust_order)
display(objective_4_results)
Detailed output: Objective 4 robust strategy
Robust window order: ('R', 'A', 'S', 'T')
shape: (3, 8)
assumption                               | choice_1 | choice_2 | choice_3 | choice_4 | trip_chance_pct | expected_bears_per_application | expected_bears_if_trip_happens
Scenario 1: Party-draw model             | R        | A        | S        | T        | 46.03           | 5.94                           | 12.903
Scenario 2: One-hit + capacity model     | R        | A        | S        | T        | 77.89           | 10.499                         | 13.478
Scenario 3: Independent triple-hit model | R        | A        | S        | T        | 1.27            | 0.151                          | 11.881

Appendix A5: Full Objective-Scenario Pair Visualization¶

In [17]:
objective_pair_comparison = pl.concat(
    [
        objective_1_results.with_columns(pl.lit("Objective 1: Max Trip Chance").alias("objective")),
        objective_2_results.with_columns(pl.lit("Objective 2: Max Overall Bear Value").alias("objective")),
        objective_3_results.with_columns(pl.lit("Objective 3: Max Quality w/ Chance Floor").alias("objective")),
        objective_4_results.with_columns(pl.lit("Objective 4: Robust Across Scenarios").alias("objective")),
    ],
    how="diagonal_relaxed",
).with_columns(
    [
        pl.col("expected_bears_per_application").alias("xBears"),
        pl.col("expected_bears_if_trip_happens").alias("xBears_if_go"),
    ]
)

objective_name_map = {
    "Objective 1: Max Trip Chance": "Max Trip Chance",
    "Objective 2: Max Overall Bear Value": "Max Overall Bear Value",
    "Objective 3: Max Quality w/ Chance Floor": "Max Quality with Chance Floor",
    "Objective 4: Robust Across Scenarios": "Robust Across Lottery Scenarios",
}

assumption_name_map = {
    "Scenario 1: Party-draw model": "Party-draw model",
    "Scenario 2: One-hit + capacity model": "One-hit + capacity model",
    "Scenario 3: Independent triple-hit model": "Independent triple-hit model",
}

plot_df = objective_pair_comparison.with_columns(
    [
        pl.col("objective").replace_strict(objective_name_map).alias("objective_name"),
        pl.col("assumption").replace_strict(assumption_name_map).alias("assumption_name"),
    ]
).with_columns(
    [
        (pl.col("objective_name") + pl.lit(" | ") + pl.col("assumption_name")).alias("pair_label")
    ]
)

obj_colors = {
    "Max Trip Chance": "#4C78A8",
    "Max Overall Bear Value": "#F58518",
    "Max Quality with Chance Floor": "#54A24B",
    "Robust Across Lottery Scenarios": "#B279A2",
}

value_df = plot_df.sort("xBears", descending=True)
quality_df = plot_df.sort("xBears_if_go", descending=True)

value_labels = value_df.get_column("pair_label").to_list()
quality_labels = quality_df.get_column("pair_label").to_list()
value_vals = value_df.get_column("xBears").to_list()
quality_vals = quality_df.get_column("xBears_if_go").to_list()
value_cols = [obj_colors[x] for x in value_df.get_column("objective_name").to_list()]
quality_cols = [obj_colors[x] for x in quality_df.get_column("objective_name").to_list()]

fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(20, 10), sharey=False)

ax1.barh(value_labels, value_vals, color=value_cols)
ax1.invert_yaxis()
ax1.set_title("Overall Value Ranking (xBears)")
ax1.set_xlabel("Expected bears per application")
ax1.grid(axis="x", alpha=0.25)

ax2.barh(quality_labels, quality_vals, color=quality_cols)
ax2.invert_yaxis()
ax2.set_title("Quality-if-Go Ranking (xBears | go)")
ax2.set_xlabel("Expected bears if trip happens")
ax2.grid(axis="x", alpha=0.25)

from matplotlib.patches import Patch
handles = [Patch(color=c, label=o) for o, c in obj_colors.items()]
fig.legend(handles=handles, loc="lower center", ncol=2, title="Objective")
fig.suptitle("Objective-Scenario Pair Comparison", fontsize=15)
fig.tight_layout(rect=[0, 0.08, 1, 0.98])
plt.show()

best_value = value_df.row(0, named=True)
best_quality = quality_df.row(0, named=True)
print(f"Top xBears pair: {best_value['pair_label']} ({best_value['xBears']:.3f})")
print(f"Top xBears | go pair: {best_quality['pair_label']} ({best_quality['xBears_if_go']:.3f})")

window_map = dict(zip(block_df.get_column("block").to_list(), block_df.get_column("window").to_list()))
pair_detail_df = plot_df.select(
    [
        "objective_name",
        "assumption_name",
        "choice_1",
        "choice_2",
        "choice_3",
        "choice_4",
    ]
).sort(["objective_name", "assumption_name"])

print("\nOptimal window picks (with dates) for each objective-scenario pair:")

try:
    from IPython.display import Markdown
    markdown_available = True
except Exception:
    markdown_available = False

for row in pair_detail_df.iter_rows(named=True):
    lines = [
        f"### {row['objective_name']} | {row['assumption_name']}",
        f"1. `{row['choice_1']}` — {window_map[row['choice_1']]}",
        f"2. `{row['choice_2']}` — {window_map[row['choice_2']]}",
        f"3. `{row['choice_3']}` — {window_map[row['choice_3']]}",
        f"4. `{row['choice_4']}` — {window_map[row['choice_4']]}",
    ]
    block = "\n".join(lines)

    if markdown_available:
        display(Markdown(block))
    else:
        print(block)
        print()
[Figure: Objective-Scenario Pair Comparison (overall value ranking and quality-if-go ranking)]
Top xBears pair: Max Overall Bear Value | One-hit + capacity model (13.854)
Top xBears | go pair: Max Quality with Chance Floor | One-hit + capacity model (49.560)

Optimal window picks (with dates) for each objective-scenario pair:

Max Overall Bear Value | Independent triple-hit model¶

  1. R — Aug 14 - Aug 17
  2. A — Jun 7 - Jun 10
  3. S — Aug 18 - Aug 21
  4. T — Aug 22 - Aug 25

Max Overall Bear Value | One-hit + capacity model¶

  1. K — Jul 17 - Jul 20
  2. I — Jul 9 - Jul 12
  3. H — Jul 5 - Jul 8
  4. A — Jun 7 - Jun 10

Max Overall Bear Value | Party-draw model¶

  1. K — Jul 17 - Jul 20
  2. I — Jul 9 - Jul 12
  3. R — Aug 14 - Aug 17
  4. A — Jun 7 - Jun 10

Max Quality with Chance Floor | Independent triple-hit model¶

  1. B — Jun 11 - Jun 14
  2. R — Aug 14 - Aug 17
  3. A — Jun 7 - Jun 10
  4. T — Aug 22 - Aug 25

Max Quality with Chance Floor | One-hit + capacity model¶

  1. K — Jul 17 - Jul 20
  2. J — Jul 13 - Jul 16
  3. I — Jul 9 - Jul 12
  4. H — Jul 5 - Jul 8

Max Quality with Chance Floor | Party-draw model¶

  1. K — Jul 17 - Jul 20
  2. J — Jul 13 - Jul 16
  3. I — Jul 9 - Jul 12
  4. H — Jul 5 - Jul 8

Max Trip Chance | Independent triple-hit model¶

  1. A — Jun 7 - Jun 10
  2. R — Aug 14 - Aug 17
  3. S — Aug 18 - Aug 21
  4. T — Aug 22 - Aug 25

Max Trip Chance | One-hit + capacity model¶

  1. A — Jun 7 - Jun 10
  2. T — Aug 22 - Aug 25
  3. R — Aug 14 - Aug 17
  4. S — Aug 18 - Aug 21

Max Trip Chance | Party-draw model¶

  1. A — Jun 7 - Jun 10
  2. R — Aug 14 - Aug 17
  3. S — Aug 18 - Aug 21
  4. T — Aug 22 - Aug 25

Robust Across Lottery Scenarios | Independent triple-hit model¶

  1. R — Aug 14 - Aug 17
  2. A — Jun 7 - Jun 10
  3. S — Aug 18 - Aug 21
  4. T — Aug 22 - Aug 25

Robust Across Lottery Scenarios | One-hit + capacity model¶

  1. R — Aug 14 - Aug 17
  2. A — Jun 7 - Jun 10
  3. S — Aug 18 - Aug 21
  4. T — Aug 22 - Aug 25

Robust Across Lottery Scenarios | Party-draw model¶

  1. R — Aug 14 - Aug 17
  2. A — Jun 7 - Jun 10
  3. S — Aug 18 - Aug 21
  4. T — Aug 22 - Aug 25

Appendix A6: How Sensitive Are Results to Key Assumptions?¶

This section stress-tests one uncertain knob:

  • Minimum trip-chance floor in the quality objective (how strict to be about odds before optimizing quality).

Goal: check whether recommended windows are stable, or whether small floor changes cause recommendation flips.

In [18]:
import numpy as np
from IPython.display import Markdown

if "all_orders" not in globals():
    all_orders = list(permutations(all_blocks, CHOICES_PER_PERSON))


def optimize_order_with_knobs(
    objective: str,
    q_map: dict[str, float],
    quality_min_trip_prob: float = QUALITY_MIN_TRIP_PROB,
    tie_eps: float = 1e-12,
) -> tuple[tuple[str, ...], dict, bool]:
    best_order = None
    best_metrics = None
    best_score = float("-inf")

    def objective_score(metrics: dict) -> float:
        if objective == "access":
            return metrics["p_trip"]
        if objective == "value":
            return metrics["expected_bears"]
        if objective in {"quality", "quality_with_floor"}:
            return metrics["expected_bears_if_trip"]
        raise ValueError(f"Unknown objective: {objective}")

    for order in all_orders:
        metrics = evaluate_order_metrics(order, q_map, bears_map)

        if objective == "quality_with_floor" and metrics["p_trip"] < quality_min_trip_prob:
            continue

        score = objective_score(metrics)

        should_take = False
        if score > best_score + tie_eps:
            should_take = True
        elif abs(score - best_score) <= tie_eps and best_metrics is not None:
            # Stable tie-break: prefer better total expected value, then lexicographically smaller order.
            if metrics["expected_bears"] > best_metrics["expected_bears"] + tie_eps:
                should_take = True
            elif abs(metrics["expected_bears"] - best_metrics["expected_bears"]) <= tie_eps and order < best_order:
                should_take = True

        if best_order is None:
            should_take = True

        if should_take:
            best_score = score
            best_order = order
            best_metrics = metrics

    used_floor = objective == "quality_with_floor"
    if objective == "quality_with_floor" and best_order is None:
        # If the chance floor is infeasible, fallback to pure quality.
        best_order, best_metrics, _ = optimize_order_with_knobs(
            objective="quality",
            q_map=q_map,
            quality_min_trip_prob=0.0,
            tie_eps=tie_eps,
        )
        used_floor = False

    return best_order, best_metrics, used_floor


def find_order_switches(df: pl.DataFrame, group_col: str, param_col: str) -> pl.DataFrame:
    rows = []
    for key in df.get_column(group_col).unique().to_list():
        subset = df.filter(pl.col(group_col) == key).sort(param_col)
        prev_order = None
        prev_param = None

        for row in subset.iter_rows(named=True):
            order = (row["choice_1"], row["choice_2"], row["choice_3"], row["choice_4"])
            if prev_order is None:
                prev_order = order
                prev_param = row[param_col]
                continue

            if order != prev_order:
                rows.append(
                    {
                        group_col: key,
                        "from_" + param_col: prev_param,
                        "switch_at_" + param_col: row[param_col],
                        "old_order": " > ".join(prev_order),
                        "new_order": " > ".join(order),
                    }
                )
                prev_order = order
                prev_param = row[param_col]

    if not rows:
        return pl.DataFrame(
            schema={
                group_col: pl.Utf8,
                "from_" + param_col: pl.Float64,
                "switch_at_" + param_col: pl.Float64,
                "old_order": pl.Utf8,
                "new_order": pl.Utf8,
            }
        )

    return pl.DataFrame(rows)


assumption_short = {
    "Scenario 1: Party-draw model": "Party-draw",
    "Scenario 2: One-hit + capacity model": "One-hit+capacity",
    "Scenario 3: Independent triple-hit model": "Triple-hit",
}

floor_grid = [0.005, 0.01, 0.02, 0.03, 0.05, 0.08]
floor_rows = []

for spec in assumption_specs:
    name = spec["name"]
    q_map = assumption_q_maps[name]
    for floor in floor_grid:
        order, metrics, used_floor = optimize_order_with_knobs(
            objective="quality_with_floor",
            q_map=q_map,
            quality_min_trip_prob=floor,
        )
        floor_rows.append(
            {
                "assumption": name,
                "quality_floor": floor,
                "choice_1": order[0],
                "choice_2": order[1],
                "choice_3": order[2],
                "choice_4": order[3],
                "order_label": " > ".join(order),
                "trip_chance_pct": metrics["p_trip"] * 100,
                "xBears": metrics["expected_bears"],
                "xBears_if_go": metrics["expected_bears_if_trip"],
                "quality_floor_applied": used_floor,
            }
        )

floor_sensitivity_df = pl.DataFrame(floor_rows).with_columns(
    [
        pl.col("trip_chance_pct").round(2),
        pl.col("xBears").round(3),
        pl.col("xBears_if_go").round(3),
    ]
)

floor_switches_df = find_order_switches(floor_sensitivity_df, group_col="assumption", param_col="quality_floor")

floor_stability_df = floor_sensitivity_df.group_by("assumption").agg(
    [
        pl.col("order_label").n_unique().alias("distinct_optimal_orders"),
        pl.col("trip_chance_pct").min().alias("min_trip_chance_pct"),
        pl.col("trip_chance_pct").max().alias("max_trip_chance_pct"),
        pl.col("xBears_if_go").min().alias("min_quality_if_go"),
        pl.col("xBears_if_go").max().alias("max_quality_if_go"),
    ]
).with_columns(
    [
        pl.col("min_trip_chance_pct").round(2),
        pl.col("max_trip_chance_pct").round(2),
        pl.col("min_quality_if_go").round(3),
        pl.col("max_quality_if_go").round(3),
    ]
).sort("assumption")

print("Quality objective sensitivity (varying minimum trip-chance floor):")
display(floor_stability_df)

if floor_switches_df.height > 0:
    print("\nOrder switch points while varying quality floor:")
    display(floor_switches_df.sort(["assumption", "switch_at_quality_floor"]))
else:
    print("\nNo order switch points found for the tested quality-floor range.")

fig, ax = plt.subplots(1, 1, figsize=(8, 5))

for name in [spec["name"] for spec in assumption_specs]:
    subset = floor_sensitivity_df.filter(pl.col("assumption") == name).sort("quality_floor")
    ax.plot(
        subset.get_column("quality_floor").to_list(),
        subset.get_column("xBears_if_go").to_list(),
        marker="o",
        linewidth=2,
        label=assumption_short[name],
    )

ax.set_title("Quality Objective: Quality-if-Go vs Chance Floor")
ax.set_xlabel("Minimum trip-chance floor")
ax.set_ylabel("Expected bears if trip happens")
ax.grid(alpha=0.25)
ax.legend()

plt.tight_layout()
plt.show()

summary_lines = ["Sensitivity takeaway:"]
for row in floor_stability_df.iter_rows(named=True):
    summary_lines.append(
        f"- {assumption_short[row['assumption']]} (quality objective): "
        f"{row['distinct_optimal_orders']} optimal order(s) across tested chance floors."
    )

display(Markdown("\n".join(summary_lines)))
Quality objective sensitivity (varying minimum trip-chance floor):
shape: (3, 6)
assumption                               | distinct_optimal_orders | min_trip_chance_pct | max_trip_chance_pct | min_quality_if_go | max_quality_if_go
Scenario 1: Party-draw model             | 1                       | 10.56               | 10.56               | 49.496            | 49.496
Scenario 2: One-hit + capacity model     | 1                       | 24.62               | 24.62               | 49.56             | 49.56
Scenario 3: Independent triple-hit model | 3                       | 0.01                | 1.15                | 12.283            | 49.393
Order switch points while varying quality floor:
shape: (2, 5)
assumption                               | from_quality_floor | switch_at_quality_floor | old_order     | new_order
Scenario 3: Independent triple-hit model | 0.005              | 0.01                    | F > Q > R > A | B > R > A > T
Scenario 3: Independent triple-hit model | 0.01               | 0.02                    | B > R > A > T | K > J > I > H
[Figure: Quality Objective: Quality-if-Go vs Chance Floor]

Sensitivity takeaway:

  • Party-draw (quality objective): 1 optimal order(s) across tested chance floors.
  • One-hit+capacity (quality objective): 1 optimal order(s) across tested chance floors.
  • Triple-hit (quality objective): 3 optimal order(s) across tested chance floors.

Appendix A7: Which Recommended Strategies Are Strictly Inferior?¶

A strategy is marked dominated if there exists another 4-window order in the same scenario with:

  • equal or better xBears, and
  • equal or better xBears | go,
  • with at least one strictly better.

This checks whether any recommended strategy is clearly beaten on both overall value and quality-conditional value.
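
As a minimal sketch of this test (the helper name dominates is illustrative; the cell below implements the same check vectorized over all orders with an epsilon tolerance):

def dominates(b: tuple[float, float], a: tuple[float, float], eps: float = 1e-12) -> bool:
    """True if b beats a on (xBears, xBears_if_go): >= on both metrics, > on at least one."""
    (xb, yb), (xa, ya) = b, a
    return (xb >= xa - eps) and (yb >= ya - eps) and ((xb > xa + eps) or (yb > ya + eps))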

In [19]:
if "all_orders" not in globals():
    all_orders = list(permutations(all_blocks, CHOICES_PER_PERSON))


def evaluate_all_orders_for_assumption(q_map: dict[str, float]) -> pl.DataFrame:
    rows = []
    for order in all_orders:
        metrics = evaluate_order_metrics(order, q_map, bears_map)
        rows.append(
            {
                "choice_1": order[0],
                "choice_2": order[1],
                "choice_3": order[2],
                "choice_4": order[3],
                "xBears": metrics["expected_bears"],
                "xBears_if_go": metrics["expected_bears_if_trip"],
                "trip_chance_pct": metrics["p_trip"] * 100,
            }
        )
    return pl.DataFrame(rows)


recommended_specs = [
    ("Max Trip Chance", objective_1_results),
    ("Max Overall Bear Value", objective_2_results),
    ("Max Quality with Chance Floor", objective_3_results),
    ("Robust Across Lottery Scenarios", objective_4_results),
]

recommended_rows = []
for objective_name, source_df in recommended_specs:
    for row in source_df.iter_rows(named=True):
        order = (row["choice_1"], row["choice_2"], row["choice_3"], row["choice_4"])
        scenario = row["assumption"]
        precise = evaluate_order_metrics(order, assumption_q_maps[scenario], bears_map)

        recommended_rows.append(
            {
                "assumption": scenario,
                "objective_name": objective_name,
                "choice_1": order[0],
                "choice_2": order[1],
                "choice_3": order[2],
                "choice_4": order[3],
                "recommended_order": " > ".join(order),
                "xBears": precise["expected_bears"],
                "xBears_if_go": precise["expected_bears_if_trip"],
                "trip_chance_pct": precise["p_trip"] * 100,
            }
        )

recommended_pairs_df = pl.DataFrame(recommended_rows)

all_order_metrics = {
    spec["name"]: evaluate_all_orders_for_assumption(assumption_q_maps[spec["name"]])
    for spec in assumption_specs
}

pareto_rows = []
eps = 1e-12

for rec in recommended_pairs_df.iter_rows(named=True):
    scenario = rec["assumption"]
    universe = all_order_metrics[scenario]

    x = universe.get_column("xBears").to_numpy()
    y = universe.get_column("xBears_if_go").to_numpy()

    x0 = float(rec["xBears"])
    y0 = float(rec["xBears_if_go"])

    dominating_mask = ((x >= x0 - eps) & (y >= y0 - eps) & ((x > x0 + eps) | (y > y0 + eps)))
    dominated = bool(dominating_mask.any())

    dominating_order = None
    if dominated:
        idx = int(np.where(dominating_mask)[0][0])
        dom = universe.row(idx, named=True)
        dominating_order = f"{dom['choice_1']} > {dom['choice_2']} > {dom['choice_3']} > {dom['choice_4']}"

    pareto_rows.append(
        {
            "assumption": scenario,
            "objective_name": rec["objective_name"],
            "recommended_order": rec["recommended_order"],
            "xBears": x0,
            "xBears_if_go": y0,
            "dominated": dominated,
            "example_dominating_order": dominating_order,
            "xBears_percentile": float((x <= x0 + eps).mean() * 100),
            "xBears_if_go_percentile": float((y <= y0 + eps).mean() * 100),
        }
    )

pareto_check_df = pl.DataFrame(pareto_rows).with_columns(
    [
        pl.col("xBears").round(3),
        pl.col("xBears_if_go").round(3),
        pl.col("xBears_percentile").round(1),
        pl.col("xBears_if_go_percentile").round(1),
    ]
).sort(["assumption", "objective_name"])

display(pareto_check_df)

obj_colors = {
    "Max Trip Chance": "#4C78A8",
    "Max Overall Bear Value": "#F58518",
    "Max Quality with Chance Floor": "#54A24B",
    "Robust Across Lottery Scenarios": "#B279A2",
}

assumption_short = {
    "Scenario 1: Party-draw model": "Scenario 1",
    "Scenario 2: One-hit + capacity model": "Scenario 2",
    "Scenario 3: Independent triple-hit model": "Scenario 3",
}

fig, axes = plt.subplots(1, 3, figsize=(19, 5), sharey=True)

for ax, spec in zip(axes, assumption_specs):
    name = spec["name"]
    universe = all_order_metrics[name]
    recs = pareto_check_df.filter(pl.col("assumption") == name)

    ax.scatter(
        universe.get_column("xBears").to_list(),
        universe.get_column("xBears_if_go").to_list(),
        s=7,
        alpha=0.08,
        color="#808080",
        label="All possible 4-window orders",
    )

    for row in recs.iter_rows(named=True):
        marker = "X" if row["dominated"] else "o"
        ax.scatter(
            row["xBears"],
            row["xBears_if_go"],
            s=120,
            marker=marker,
            color=obj_colors[row["objective_name"]],
            edgecolor="black",
            linewidth=0.5,
        )
        short_label = row["objective_name"].replace("Max ", "").replace(" Across Lottery Scenarios", "")
        ax.annotate(short_label, (row["xBears"], row["xBears_if_go"]), textcoords="offset points", xytext=(4, 4), fontsize=8)

    ax.set_title(f"{assumption_short[name]}: Dominance Map")
    ax.set_xlabel("xBears")
    ax.grid(alpha=0.2)

axes[0].set_ylabel("xBears | go")

from matplotlib.patches import Patch
legend_handles = [Patch(color=c, label=k) for k, c in obj_colors.items()]
fig.legend(handles=legend_handles, loc="lower center", ncol=2, title="Recommended strategy type")
fig.suptitle("Dominance Check: Recommended Strategies vs All Possible Orders", fontsize=14)
plt.tight_layout(rect=[0, 0.10, 1, 0.95])
plt.show()

n_dominated = pareto_check_df.filter(pl.col("dominated")).height
if n_dominated == 0:
    display(Markdown("No recommended objective-scenario strategy is strictly dominated on both xBears and xBears | go."))
else:
    dominated_slice = pareto_check_df.filter(pl.col("dominated")).select(["assumption", "objective_name", "example_dominating_order"])
    display(Markdown(f"{n_dominated} recommended strategy pair(s) are strictly dominated; see table above for examples."))
    display(dominated_slice)
shape: (12, 9)

| assumption | objective_name | recommended_order | xBears | xBears_if_go | dominated | example_dominating_order | xBears_percentile | xBears_if_go_percentile |
| --- | --- | --- | --- | --- | --- | --- | --- | --- |
| Scenario 1: Party-draw model | Max Overall Bear Value | K > I > R > A | 6.392 | 22.179 | false | null | 100.0 | 59.7 |
| Scenario 1: Party-draw model | Max Quality with Chance Floor | K > J > I > H | 5.226 | 49.496 | false | null | 80.4 | 100.0 |
| Scenario 1: Party-draw model | Max Trip Chance | A > R > S > T | 5.907 | 12.832 | true | A > B > I > R | 99.2 | 0.5 |
| Scenario 1: Party-draw model | Robust Across Lottery Scenarios | R > A > S > T | 5.94 | 12.903 | true | A > B > K > R | 99.4 | 0.6 |
| Scenario 2: One-hit + capacity model | Max Overall Bear Value | K > I > H > A | 13.854 | 29.678 | false | null | 100.0 | 88.1 |
| Scenario 2: One-hit + capacity model | Max Quality with Chance Floor | K > J > I > H | 12.203 | 49.56 | false | null | 98.7 | 100.0 |
| Scenario 2: One-hit + capacity model | Max Trip Chance | A > T > R > S | 9.821 | 12.608 | true | A > B > C > D | 45.2 | 0.6 |
| Scenario 2: One-hit + capacity model | Robust Across Lottery Scenarios | R > A > S > T | 10.499 | 13.478 | true | A > B > F > I | 71.2 | 2.2 |
| Scenario 3: Independent triple-hit model | Max Overall Bear Value | R > A > S > T | 0.151 | 11.881 | false | null | 100.0 | 19.5 |
| Scenario 3: Independent triple-hit model | Max Quality with Chance Floor | B > R > A > T | 0.141 | 12.283 | false | null | 100.0 | 20.1 |
| Scenario 3: Independent triple-hit model | Max Trip Chance | A > R > S > T | 0.151 | 11.88 | true | R > A > S > T | 100.0 | 19.5 |
| Scenario 3: Independent triple-hit model | Robust Across Lottery Scenarios | R > A > S > T | 0.151 | 11.881 | false | null | 100.0 | 19.5 |
[Figure: "Dominance Check: Recommended Strategies vs All Possible Orders". One scatter panel per scenario plotting xBears vs xBears | go; gray dots are all possible 4-window orders, colored markers are the recommended strategies, with X marking dominated picks.]

5 recommended strategy pair(s) are strictly dominated; see table above for examples.

shape: (5, 3)

| assumption | objective_name | example_dominating_order |
| --- | --- | --- |
| Scenario 1: Party-draw model | Max Trip Chance | A > B > I > R |
| Scenario 1: Party-draw model | Robust Across Lottery Scenarios | A > B > K > R |
| Scenario 2: One-hit + capacity model | Max Trip Chance | A > B > C > D |
| Scenario 2: One-hit + capacity model | Robust Across Lottery Scenarios | A > B > F > I |
| Scenario 3: Independent triple-hit model | Max Trip Chance | R > A > S > T |

This check guards against accidental "lose-lose" recommendations. A recommended order counts as dominated when some other order under the same lottery model is at least as good on both axes (total expected bears and quality conditional on going) and strictly better on at least one.
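As a toy illustration of the rule coded above (invented numbers, not lottery data): an exact tie on both axes does not dominate, and eps absorbs floating-point noise.

# Toy illustration of the strict-dominance test used above (numbers invented).
import numpy as np

eps = 1e-9
x0, y0 = 5.907, 12.832             # candidate order: (xBears, xBears | go)
x = np.array([5.907, 6.1, 5.2])    # competing orders' xBears
y = np.array([12.832, 13.0, 9.0])  # competing orders' xBears | go

# At least as good on both axes, strictly better on at least one.
mask = (x >= x0 - eps) & (y >= y0 - eps) & ((x > x0 + eps) | (y > y0 + eps))
print(mask)  # [False  True False]: the tie does not dominate; (6.1, 13.0) does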

Appendix A8: Scenario 2 Capacity Calibration and Switch Points¶

Scenario 2 scales each window's group one-hit probability by an uncertain capacity factor c: q_block = c * (1 - (1 - p_single)^3), where p_single is the window's listed individual draw probability and 3 is the group size.

How this appendix calibrates c:

  1. Build a base one-hit probability for each window: one_hit_prob = 1 - (1 - p_single)^3
  2. Sweep c on a grid from 0.25 to 1.00 in 0.05 steps.
  3. For each c and each objective (trip chance, total value, quality-with-floor), re-optimize the 4-window order.
  4. Record:
    • intervals where the optimal order stays constant (capacity_intervals_df)
    • switch points where the optimal order changes (capacity_switch_df)

Use this as a robustness check: if your recommended order is stable across plausible c, the strategy is less sensitive to unknown capacity mechanics.

In [20]:
if "all_orders" not in globals():
    all_orders = list(permutations(all_blocks, CHOICES_PER_PERSON))

scenario2_base = block_df.select(
    [
        pl.col("block"),
        (1 - (1 - pl.col("p_single")) ** GROUP_SIZE).alias("one_hit_prob"),
    ]
)
scenario2_base_map = dict(zip(scenario2_base.get_column("block").to_list(), scenario2_base.get_column("one_hit_prob").to_list()))

capacity_grid = [round(x, 2) for x in np.arange(0.25, 1.01, 0.05)]
objective_grid = [
    ("access", "Max Trip Chance"),
    ("value", "Max Overall Bear Value"),
    ("quality_with_floor", "Max Quality with Chance Floor"),
]

capacity_rows = []
for cap in capacity_grid:
    # Capacity-adjusted win probability per block, clamped to [0, 1].
    q_map = {b: min(max(scenario2_base_map[b] * cap, 0.0), 1.0) for b in all_blocks}

    for objective_key, objective_name in objective_grid:
        order, metrics, used_floor = optimize_order_with_knobs(
            objective=objective_key,
            q_map=q_map,
            quality_min_trip_prob=QUALITY_MIN_TRIP_PROB,
        )
        capacity_rows.append(
            {
                "capacity_factor": cap,
                "objective_name": objective_name,
                "choice_1": order[0],
                "choice_2": order[1],
                "choice_3": order[2],
                "choice_4": order[3],
                "order_label": " > ".join(order),
                "trip_chance_pct": metrics["p_trip"] * 100,
                "xBears": metrics["expected_bears"],
                "xBears_if_go": metrics["expected_bears_if_trip"],
                "quality_floor_applied": used_floor,
            }
        )

capacity_sweep_df = pl.DataFrame(capacity_rows).with_columns(
    [
        pl.col("trip_chance_pct").round(2),
        pl.col("xBears").round(3),
        pl.col("xBears_if_go").round(3),
    ]
)

# Scan each objective's sweep in capacity order, recording runs of identical
# optimal orders (intervals) and the capacities where the order changes (switches).
switch_rows = []
interval_rows = []

for objective_name in [x[1] for x in objective_grid]:
    subset = capacity_sweep_df.filter(pl.col("objective_name") == objective_name).sort("capacity_factor")

    prev_order = None
    prev_cap = None
    interval_start = None

    for row in subset.iter_rows(named=True):
        order = (row["choice_1"], row["choice_2"], row["choice_3"], row["choice_4"])
        cap = row["capacity_factor"]

        if prev_order is None:
            prev_order = order
            prev_cap = cap
            interval_start = cap
            continue

        if order != prev_order:
            switch_rows.append(
                {
                    "objective_name": objective_name,
                    "from_capacity": prev_cap,
                    "switch_at_capacity": cap,
                    "old_order": " > ".join(prev_order),
                    "new_order": " > ".join(order),
                }
            )
            interval_rows.append(
                {
                    "objective_name": objective_name,
                    "from_capacity": interval_start,
                    "to_capacity": prev_cap,
                    "order_label": " > ".join(prev_order),
                }
            )
            interval_start = cap
            prev_order = order

        prev_cap = cap

    interval_rows.append(
        {
            "objective_name": objective_name,
            "from_capacity": interval_start,
            "to_capacity": prev_cap,
            "order_label": " > ".join(prev_order),
        }
    )

capacity_switch_df = pl.DataFrame(switch_rows) if switch_rows else pl.DataFrame(
    schema={
        "objective_name": pl.Utf8,
        "from_capacity": pl.Float64,
        "switch_at_capacity": pl.Float64,
        "old_order": pl.Utf8,
        "new_order": pl.Utf8,
    }
)

capacity_intervals_df = pl.DataFrame(interval_rows).sort(["objective_name", "from_capacity"])

print("Optimal-order intervals by Scenario 2 capacity factor:")
display(capacity_intervals_df)

if capacity_switch_df.height > 0:
    print("\nCapacity switch points:")
    display(capacity_switch_df.sort(["objective_name", "switch_at_capacity"]))
else:
    print("\nNo switch points in the tested capacity range.")

fig, axes = plt.subplots(3, 1, figsize=(12, 11), sharex=True)

for ax, objective_name in zip(axes, [x[1] for x in objective_grid]):
    subset = capacity_sweep_df.filter(pl.col("objective_name") == objective_name).sort("capacity_factor")

    x_vals = subset.get_column("capacity_factor").to_list()
    xbears_vals = subset.get_column("xBears").to_list()
    chance_vals = subset.get_column("trip_chance_pct").to_list()

    ax.plot(x_vals, xbears_vals, marker="o", linewidth=2, color="#1f77b4", label="xBears")
    ax.set_ylabel("xBears", color="#1f77b4")
    ax.tick_params(axis="y", labelcolor="#1f77b4")
    ax.grid(alpha=0.2)

    ax2 = ax.twinx()
    ax2.plot(x_vals, chance_vals, marker="s", linewidth=2, linestyle="--", color="#e76f51", label="Trip chance %")
    ax2.set_ylabel("Trip chance %", color="#e76f51")
    ax2.tick_params(axis="y", labelcolor="#e76f51")

    if capacity_switch_df.height > 0:
        for sw in capacity_switch_df.filter(pl.col("objective_name") == objective_name).iter_rows(named=True):
            ax.axvline(sw["switch_at_capacity"], color="gray", linestyle=":", alpha=0.6)

    ax.set_title(f"{objective_name}: performance vs Scenario 2 capacity factor")

axes[-1].set_xlabel("Scenario 2 capacity factor (c)")
plt.tight_layout()
plt.show()

summary_lines = ["Scenario 2 calibration takeaway:"]
for row in capacity_intervals_df.iter_rows(named=True):
    summary_lines.append(
        f"- {row['objective_name']}: use `{row['order_label']}` for c in [{row['from_capacity']:.2f}, {row['to_capacity']:.2f}]"
    )

display(Markdown("\n".join(summary_lines)))
Optimal-order intervals by Scenario 2 capacity factor:
shape: (4, 4)

| objective_name | from_capacity | to_capacity | order_label |
| --- | --- | --- | --- |
| Max Overall Bear Value | 0.25 | 0.5 | K > I > R > A |
| Max Overall Bear Value | 0.55 | 1.0 | K > I > H > A |
| Max Quality with Chance Floor | 0.25 | 1.0 | K > J > I > H |
| Max Trip Chance | 0.25 | 1.0 | R > A > S > T |
Capacity switch points:
shape: (1, 5)

| objective_name | from_capacity | switch_at_capacity | old_order | new_order |
| --- | --- | --- | --- | --- |
| Max Overall Bear Value | 0.5 | 0.55 | K > I > R > A | K > I > H > A |
[Figure: three stacked panels, one per objective, plotting xBears (left axis) and trip chance % (right axis) against the Scenario 2 capacity factor c, with dotted vertical lines at switch points.]

Scenario 2 calibration takeaway:

  • Max Overall Bear Value: use K > I > R > A for c in [0.25, 0.50]
  • Max Overall Bear Value: use K > I > H > A for c in [0.55, 1.00]
  • Max Quality with Chance Floor: use K > J > I > H for c in [0.25, 1.00]
  • Max Trip Chance: use R > A > S > T for c in [0.25, 1.00]
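
To read the recommendation off for a single capacity guess instead of scanning the whole sweep, the intervals table can be filtered directly. A minimal sketch: c_guess is a hypothetical input, and because interval endpoints sit on the 0.05 sweep grid, the guess is snapped to that grid first so it cannot land in the gap between consecutive intervals.

# Hypothetical lookup: which optimal order applies at a given capacity guess?
c_guess = 0.70  # assumed value for illustration
c_snapped = round(round(c_guess / 0.05) * 0.05, 2)  # snap to the sweep grid

lookup = capacity_intervals_df.filter(
    (pl.col("from_capacity") <= c_snapped) & (pl.col("to_capacity") >= c_snapped)
)
print(lookup.select(["objective_name", "order_label"]))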

Appendix A9: Capacity Model Worked Example (Step-by-Step)¶

This worked example shows exactly how Scenario 2 converts listed draw odds into trip-level outcomes across a 4-window order.

For each ranked window k, the table computes:

  • p_single_k: listed individual draw probability
  • one_hit_k = 1 - (1 - p_single_k)^3
  • q_k = c * one_hit_k, with c = CAPACITY_FACTOR
  • p_this_k = p_no_prev_k * q_k where p_no_prev_k = Π_{j<k}(1 - q_j)
  • xBears_contrib_k = p_this_k * avg_bears_day_k

Summing p_this_k gives total trip chance, and summing xBears_contrib_k gives expected bears per application.
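
To ground the formulas, here is the rank-1 step by hand, using the first window's values from the output below (p_single = 0.03, 51.0 average bears/day, c = 0.85):

# Rank-1 step of the Scenario 2 chain, computed by hand (values from the table below).
p_single = 0.03        # listed individual draw probability (block K)
avg_bears_day = 51.0   # average bears/day (block K)
c = 0.85               # capacity factor

one_hit = 1 - (1 - p_single) ** 3   # 0.0873: at least one of the party of 3 draws
q = c * one_hit                     # 0.0742: capacity-adjusted win probability
p_this = 1.0 * q                    # rank 1, so p_no_prev = 1.0
contrib = p_this * avg_bears_day    # 3.786 expected bears from this choice alone

print(f"one_hit={one_hit:.4f}  q={q:.4f}  p_this={p_this:.4f}  contrib={contrib:.3f}")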

In [21]:
scenario2_name = "Scenario 2: One-hit + capacity model"

if "objective_2_results" in globals() and objective_2_results.height > 0:
    row = objective_2_results.filter(pl.col("assumption") == scenario2_name).row(0, named=True)
    worked_order = (row["choice_1"], row["choice_2"], row["choice_3"], row["choice_4"])
else:
    worked_order = tuple(
        block_df.sort("draw_pct", descending=True).get_column("block").head(CHOICES_PER_PERSON).to_list()
    )

p_single_map = dict(zip(block_df.get_column("block").to_list(), block_df.get_column("p_single").to_list()))
bears_map_local = dict(zip(block_df.get_column("block").to_list(), block_df.get_column("avg_bears_day").to_list()))
window_map_local = dict(zip(block_df.get_column("block").to_list(), block_df.get_column("window").to_list()))

rows = []
p_no_prev = 1.0
cum_trip = 0.0
cum_xbears = 0.0

for rank, block in enumerate(worked_order, start=1):
    p_single = float(p_single_map[block])
    one_hit = 1 - (1 - p_single) ** GROUP_SIZE
    q = max(0.0, min(1.0, CAPACITY_FACTOR * one_hit))

    p_this = p_no_prev * q
    xbears_contrib = p_this * float(bears_map_local[block])

    cum_trip += p_this
    cum_xbears += xbears_contrib

    rows.append(
        {
            "rank": rank,
            "block": block,
            "window": window_map_local[block],
            "p_single": p_single,
            "one_hit_prob": one_hit,
            "q_capacity_adjusted": q,
            "p_no_prev_before": p_no_prev,
            "p_this_window": p_this,
            "avg_bears_day": float(bears_map_local[block]),
            "xBears_contrib": xbears_contrib,
            "cum_trip_prob": cum_trip,
            "cum_xBears": cum_xbears,
        }
    )

    p_no_prev *= (1 - q)

worked_capacity_df = pl.DataFrame(rows).with_columns(
    [
        pl.col("p_single").round(4),
        pl.col("one_hit_prob").round(4),
        pl.col("q_capacity_adjusted").round(4),
        pl.col("p_no_prev_before").round(4),
        pl.col("p_this_window").round(4),
        pl.col("avg_bears_day").round(2),
        pl.col("xBears_contrib").round(3),
        pl.col("cum_trip_prob").round(4),
        pl.col("cum_xBears").round(3),
    ]
)

print(f"Worked order under {scenario2_name}: {' > '.join(worked_order)}")
print(f"Using c = {CAPACITY_FACTOR:.2f}, group size = {GROUP_SIZE}")
display(worked_capacity_df)

print(f"\nTotal trip chance = {cum_trip * 100:.2f}%")
print(f"Expected bears per application = {cum_xbears:.3f}")
print(f"Expected bears if trip happens = {(cum_xbears / cum_trip):.3f}" if cum_trip > 0 else "Expected bears if trip happens = 0.000")
Worked order under Scenario 2: One-hit + capacity model: K > I > H > A
Using c = 0.85, group size = 3
shape: (4, 12)

| rank | block | window | p_single | one_hit_prob | q_capacity_adjusted | p_no_prev_before | p_this_window | avg_bears_day | xBears_contrib | cum_trip_prob | cum_xBears |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 1 | K | Jul 17 - Jul 20 | 0.03 | 0.0873 | 0.0742 | 1.0 | 0.0742 | 51.0 | 3.786 | 0.0742 | 3.786 |
| 2 | I | Jul 9 - Jul 12 | 0.03 | 0.0873 | 0.0742 | 0.9258 | 0.0687 | 49.75 | 3.419 | 0.1429 | 7.204 |
| 3 | H | Jul 5 - Jul 8 | 0.03 | 0.0873 | 0.0742 | 0.8571 | 0.0636 | 47.25 | 3.006 | 0.2066 | 10.21 |
| 4 | A | Jun 7 - Jun 10 | 0.15 | 0.3859 | 0.328 | 0.7934 | 0.2602 | 14.0 | 3.643 | 0.4668 | 13.854 |
Total trip chance = 46.68%
Expected bears per application = 13.854
Expected bears if trip happens = 29.678

Appendix A10: Access-risk day reference¶

This is a planning reference only. Each row shows a permit block and its window, whether any day in the window is flagged, the number of flagged days, and the exact flagged dates. A flagged date means the ADFG row includes the access-risk tide marker and may require extra float logistics.
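
If the flag should influence strategy rather than just packing, one plausible use is to drop flagged windows before re-running an order search. A minimal sketch reusing block_df and its has_access_risk_day column from earlier; risk_free_blocks is a hypothetical name:

# Hypothetical filter: candidate pool restricted to windows with no flagged days.
risk_free_blocks = (
    block_df.filter(~pl.col("has_access_risk_day"))
    .get_column("block")
    .to_list()
)
print(f"{len(risk_free_blocks)} of {block_df.height} windows have no flagged days:")
print(sorted(risk_free_blocks))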

In [22]:
# Compact access-risk reference (planning only).
risk_dates_by_block = (
    daily_df.filter(pl.col("access_risk_day"))
    .group_by("block")
    .agg(pl.col("date").dt.strftime("%b %-d").alias("risk_dates"))
)

window_access_risk_flags = (
    block_df.select(["block", "window", "has_access_risk_day"])
    .join(
        risk_dates_by_block,
        on="block",
        how="left",
    )
    .with_columns([
        # The left join leaves null lists for blocks with no flagged days;
        # fill the derived columns so counts read 0 and labels stay readable.
        # (Filling the list and deriving from it in the same with_columns call
        # would not work: expressions in one call all see the pre-fill column.)
        pl.col("risk_dates").list.len().fill_null(0).alias("risk_day_count"),
        pl.col("risk_dates").list.join(", ").fill_null("No flagged days").alias("risk_days"),
    ])
    .select([
        "block",
        "window",
        pl.col("has_access_risk_day").cast(pl.Int8),
        "risk_day_count",
        "risk_days",
    ])
    # Note: sorting on the window string is lexicographic ("Aug 10" before "Aug 2").
    .sort("window")
)

display(window_access_risk_flags)
shape: (20, 5)

| block | window | has_access_risk_day | risk_day_count | risk_days |
| --- | --- | --- | --- | --- |
| Q | Aug 10 - Aug 13 | 0 | 0 | No flagged days |
| R | Aug 14 - Aug 17 | 0 | 0 | No flagged days |
| S | Aug 18 - Aug 21 | 1 | 2 | Aug 20, Aug 21 |
| O | Aug 2 - Aug 5 | 0 | 0 | No flagged days |
| T | Aug 22 - Aug 25 | 1 | 3 | Aug 22, Aug 23, Aug 24 |
| P | Aug 6 - Aug 9 | 1 | 4 | Aug 6, Aug 7, Aug 8, Aug 9 |
| G | Jul 1 - Jul 4 | 0 | 0 | No flagged days |
| J | Jul 13 - Jul 16 | 0 | 0 | No flagged days |
| K | Jul 17 - Jul 20 | 0 | 0 | No flagged days |
| L | Jul 21 - Jul 24 | 1 | 4 | Jul 21, Jul 22, Jul 23, Jul 24 |
| M | Jul 25 - Jul 28 | 1 | 2 | Jul 25, Jul 26 |
| N | Jul 29 - Aug 1 | 0 | 0 | No flagged days |
| H | Jul 5 - Jul 8 | 1 | 2 | Jul 7, Jul 8 |
| I | Jul 9 - Jul 12 | 1 | 3 | Jul 9, Jul 10, Jul 11 |
| B | Jun 11 - Jun 14 | 1 | 1 | Jun 11 |
| C | Jun 15 - Jun 18 | 0 | 0 | No flagged days |
| D | Jun 19 - Jun 22 | 1 | 2 | Jun 21, Jun 22 |
| E | Jun 23 - Jun 26 | 1 | 4 | Jun 23, Jun 24, Jun 25, Jun 26 |
| F | Jun 27 - Jun 30 | 1 | 1 | Jun 27 |
| A | Jun 7 - Jun 10 | 1 | 4 | Jun 7, Jun 8, Jun 9, Jun 10 |
In [23]:
tradeoff_df = block_df.select(["block", "window", "draw_pct", "avg_bears_day", "has_access_risk_day"])

corr = tradeoff_df.select(pl.corr("draw_pct", "avg_bears_day")).item()
print(f"Correlation(draw_pct, avg_bears_day): {corr:.3f}")
Correlation(draw_pct, avg_bears_day): -0.754

Appendix A11: Draw % vs Bear Volume Model Comparison¶
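
The cell below fits four candidate shapes for avg_bears_day as a function of draw_pct (linear, reciprocal b0 + b1/x, logarithmic b0 + b1 * log x, and piecewise-linear with one fitted knot) and ranks them by leave-one-out cross-validation: each window is held out in turn, the model is refit on the rest, and LOOCV RMSE = sqrt((1/n) * Σ_i (y_i - ŷ_{-i})^2) over the held-out predictions. AICc is reported alongside as a small-sample in-sample criterion.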

In [24]:
import numpy as np

model_df = block_df.select(["draw_pct", "avg_bears_day"]).drop_nulls()
x = np.array(model_df.get_column("draw_pct"), dtype=float)
y = np.array(model_df.get_column("avg_bears_day"), dtype=float)
n = len(x)

if np.any(x <= 0):
    raise ValueError("draw_pct must be strictly positive for reciprocal/log models")


def fit_ols(X: np.ndarray, y: np.ndarray):
    beta, *_ = np.linalg.lstsq(X, y, rcond=None)
    y_hat = X @ beta
    sse = float(np.sum((y - y_hat) ** 2))
    return beta, y_hat, sse


def fit_metrics(y: np.ndarray, y_hat: np.ndarray, sse: float, k_params: int) -> dict[str, float]:
    n = len(y)
    ss_tot = float(np.sum((y - y.mean()) ** 2))
    r2 = 1.0 - sse / ss_tot if ss_tot > 0 else float("nan")

    p = k_params - 1
    adj_r2 = 1.0 - (1.0 - r2) * (n - 1) / (n - p - 1) if n > (p + 1) else float("nan")
    rmse = float(np.sqrt(sse / n))

    if sse <= 0:
        aic = float("-inf")
    else:
        aic = n * np.log(sse / n) + 2 * k_params
    aicc = aic + (2 * k_params * (k_params + 1)) / (n - k_params - 1) if n > (k_params + 1) else float("nan")

    return {"r2": r2, "adj_r2": adj_r2, "rmse": rmse, "aicc": float(aicc)}


def design_linear(xv: np.ndarray) -> np.ndarray:
    return np.column_stack([np.ones_like(xv), xv])


def design_reciprocal(xv: np.ndarray) -> np.ndarray:
    return np.column_stack([np.ones_like(xv), 1.0 / xv])


def design_log(xv: np.ndarray) -> np.ndarray:
    return np.column_stack([np.ones_like(xv), np.log(xv)])


def design_piecewise(xv: np.ndarray, knot: float) -> np.ndarray:
    return np.column_stack([np.ones_like(xv), xv, np.maximum(0.0, xv - knot)])


def fit_piecewise_best(x_train: np.ndarray, y_train: np.ndarray):
    uniq = np.unique(x_train)
    candidates = uniq[2:-2] if len(uniq) >= 6 else uniq[1:-1]
    if len(candidates) == 0:
        candidates = np.array([float(np.median(uniq))])

    best = None
    for knot in candidates:
        X = design_piecewise(x_train, float(knot))
        beta, y_hat, sse = fit_ols(X, y_train)
        m = fit_metrics(y_train, y_hat, sse, k_params=3)

        score = m["aicc"] if np.isfinite(m["aicc"]) else m["rmse"]
        if best is None or score < best["score"]:
            best = {
                "knot": float(knot),
                "beta": beta,
                "y_hat": y_hat,
                "sse": sse,
                "metrics": m,
                "score": float(score),
            }

    return best


def loocv_rmse(model_name: str) -> float:
    preds = np.zeros_like(y)
    for i in range(n):
        mask = np.arange(n) != i
        x_tr, y_tr = x[mask], y[mask]
        x_te = np.array([x[i]], dtype=float)

        if model_name == "piecewise":
            fit = fit_piecewise_best(x_tr, y_tr)
            X_te = design_piecewise(x_te, fit["knot"])
            preds[i] = float((X_te @ fit["beta"])[0])
        else:
            design_fn = {
                "linear": design_linear,
                "reciprocal": design_reciprocal,
                "log": design_log,
            }[model_name]
            X_tr = design_fn(x_tr)
            beta, _, _ = fit_ols(X_tr, y_tr)
            X_te = design_fn(x_te)
            preds[i] = float((X_te @ beta)[0])

    return float(np.sqrt(np.mean((y - preds) ** 2)))


fits = {}

for name, design_fn in [
    ("linear", design_linear),
    ("reciprocal", design_reciprocal),
    ("log", design_log),
]:
    X = design_fn(x)
    beta, y_hat, sse = fit_ols(X, y)
    fits[name] = {
        "beta": beta,
        "y_hat": y_hat,
        "sse": sse,
        "metrics": fit_metrics(y, y_hat, sse, k_params=X.shape[1]),
    }

fits["piecewise"] = fit_piecewise_best(x, y)

rows = []
for name in ["linear", "reciprocal", "log", "piecewise"]:
    m = fits[name]["metrics"]
    rows.append(
        {
            "model": name,
            "r2": m["r2"],
            "adj_r2": m["adj_r2"],
            "rmse": m["rmse"],
            "aicc": m["aicc"],
            "loocv_rmse": loocv_rmse(name),
            "knot": fits[name].get("knot", float("nan")),
        }
    )

metrics_df = pl.DataFrame(rows).with_columns(
    [
        pl.col("r2").round(3),
        pl.col("adj_r2").round(3),
        pl.col("rmse").round(3),
        pl.col("aicc").round(2),
        pl.col("loocv_rmse").round(3),
        pl.col("knot").round(3),
    ]
).sort("loocv_rmse")

print("Model comparison for draw_pct -> avg_bears_day (lower LOOCV RMSE is better):")
display(metrics_df)

best_model = metrics_df.row(0, named=True)["model"]
print(f"Best model by LOOCV RMSE: {best_model}")

lin_beta = fits["linear"]["beta"]
print(f"Linear reference: avg_bears_day = {lin_beta[0]:.3f} + ({lin_beta[1]:.3f}) * draw_pct")
print(f"Linear slope interpretation: +1 draw-point corresponds to {lin_beta[1]:.3f} bears/day on average.")

x_line = np.linspace(float(x.min()), float(x.max()), 400)

fig, ax = plt.subplots(figsize=(10, 6))
ax.scatter(x, y, s=70, alpha=0.85, label="Windows")

colors = {"linear": "#d62728", "reciprocal": "#1f77b4", "log": "#2ca02c", "piecewise": "#9467bd"}
for name in ["linear", "reciprocal", "log", "piecewise"]:
    if name == "piecewise":
        knot = fits[name]["knot"]
        y_line = design_piecewise(x_line, knot) @ fits[name]["beta"]
        suffix = f", knot={knot:.2f}"
    elif name == "linear":
        y_line = design_linear(x_line) @ fits[name]["beta"]
        suffix = ""
    elif name == "reciprocal":
        y_line = design_reciprocal(x_line) @ fits[name]["beta"]
        suffix = ""
    else:
        y_line = design_log(x_line) @ fits[name]["beta"]
        suffix = ""

    rmse_cv = float(metrics_df.filter(pl.col("model") == name).select("loocv_rmse").item())
    ax.plot(
        x_line,
        y_line,
        linewidth=2.2 if name == best_model else 1.6,
        alpha=1.0 if name == best_model else 0.8,
        color=colors[name],
        label=f"{name} (LOOCV RMSE={rmse_cv:.3f}{suffix})",
    )

ax.set_title("Section 1 Model Comparison: Draw % vs Average Bears/Day")
ax.set_xlabel("Draw %")
ax.set_ylabel("Average bears/day")
ax.grid(alpha=0.25)
ax.legend()
plt.tight_layout()
plt.show()
Model comparison for draw_pct -> avg_bears_day (lower LOOCV RMSE is better):
shape: (4, 7)

| model | r2 | adj_r2 | rmse | aicc | loocv_rmse | knot |
| --- | --- | --- | --- | --- | --- | --- |
| log | 0.686 | 0.668 | 7.426 | 84.91 | 8.263 | NaN |
| reciprocal | 0.669 | 0.65 | 7.624 | 85.96 | 9.056 | NaN |
| piecewise | 0.735 | 0.704 | 6.82 | 84.3 | 9.419 | 5.0 |
| linear | 0.568 | 0.544 | 8.703 | 91.25 | 9.686 | NaN |
Best model by LOOCV RMSE: log
Linear reference: avg_bears_day = 41.692 + (-2.214) * draw_pct
Linear slope interpretation: +1 draw-point corresponds to -2.214 bears/day on average.
[Figure: "Section 1 Model Comparison: Draw % vs Average Bears/Day". Scatter of windows with the four fitted curves, each labeled with its LOOCV RMSE.]