# evaluate.py
"""
Evaluation Framework
=====================
Computes key metrics:
- CEP50 / CEP90: Circular Error Probable at 50th/90th percentile
- Mean / Median absolute error
- Improvement ratio over baseline (for moving scenarios)
Target: CEP90 โค 30m for static (โฅ10 samaritans), moving precision +20% vs baseline
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
import numpy as np
import pandas as pd
from typing import Dict, List, Optional
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from data_simulator import (
generate_static_scenario, generate_moving_scenario, SimulationScenario,
)
from location_estimator import (
LocationAggregationPipeline, baseline_weighted_centroid, LocationEstimate,
)
def cep(errors: np.ndarray, percentile: float = 90.0) -> float:
    """Circular Error Probable: error below which `percentile`% of samples fall."""
    threshold = np.percentile(errors, percentile)
    return float(threshold)
def evaluate_static(
    n_trials: int = 500,
    n_samaritans: int = 15,
    base_seed: int = 1000,
) -> Dict:
    """
    Monte Carlo evaluation of the static scenario.

    Runs `n_trials` independent trials, each seeded with `base_seed + trial`
    for reproducibility. Per trial, measures the Euclidean distance between
    the true device position and (a) our pipeline's estimate, (b) the
    weighted-centroid baseline.

    Returns a dict with the raw error arrays plus CEP50/CEP90 and mean
    errors for both methods.
    """
    pipeline = LocationAggregationPipeline()
    ours, base = [], []
    for k in range(n_trials):
        scene = generate_static_scenario(
            n_samaritans=n_samaritans,
            seed=base_seed + k,
        )
        target = scene.true_device_positions[0]
        ours.append(np.linalg.norm(pipeline.estimate(scene).position - target))
        base.append(np.linalg.norm(baseline_weighted_centroid(scene) - target))
    ours = np.asarray(ours)
    base = np.asarray(base)
    return {
        'our_errors': ours,
        'baseline_errors': base,
        'our_cep50': cep(ours, 50),
        'our_cep90': cep(ours, 90),
        'baseline_cep50': cep(base, 50),
        'baseline_cep90': cep(base, 90),
        'our_mean': ours.mean(),
        'baseline_mean': base.mean(),
        'n_samaritans': n_samaritans,
    }
def evaluate_moving(
    n_trials: int = 300,
    n_samaritans: int = 6,
    base_seed: int = 5000,
    scenario_type: str = 'moving',
) -> Dict:
    """
    Monte Carlo evaluation of the moving scenario.

    The ground-truth reference is the device position at the time of the
    most recent samaritan report (clamped to the trajectory length), so the
    error reflects how well each method tracks the *latest* position.

    Returns a dict with raw error arrays, CEP50/CEP90, means, and the
    percentage improvement of our mean error over the baseline's.
    """
    pipeline = LocationAggregationPipeline()
    ours, base = [], []
    for k in range(n_trials):
        scene = generate_moving_scenario(
            n_samaritans=n_samaritans,
            seed=base_seed + k,
            scenario_type=scenario_type,
        )
        # Index the trajectory at the newest report time, clamped in-range.
        newest = max(r.timestamp for r in scene.reports)
        idx = min(int(newest), len(scene.true_device_positions) - 1)
        target = scene.true_device_positions[idx]
        ours.append(np.linalg.norm(pipeline.estimate(scene).position - target))
        base.append(np.linalg.norm(baseline_weighted_centroid(scene) - target))
    ours = np.asarray(ours)
    base = np.asarray(base)
    gain_pct = (base.mean() - ours.mean()) / base.mean() * 100
    return {
        'our_errors': ours,
        'baseline_errors': base,
        'our_cep50': cep(ours, 50),
        'our_cep90': cep(ours, 90),
        'baseline_cep50': cep(base, 50),
        'baseline_cep90': cep(base, 90),
        'our_mean': ours.mean(),
        'baseline_mean': base.mean(),
        'improvement_pct': gain_pct,
        'n_samaritans': n_samaritans,
        'scenario_type': scenario_type,
    }
def sensitivity_analysis(
    samaritan_counts: Optional[List[int]] = None,
    n_trials: int = 200,
) -> pd.DataFrame:
    """
    Sweep the samaritan count and collect static-scenario accuracy metrics.

    Parameters
    ----------
    samaritan_counts : counts to evaluate; defaults to [3, 5, 8, 10, 15, 20].
    n_trials : Monte Carlo trials per count.

    Returns a DataFrame with one row per count: CEP50/CEP90 for our method
    and the baseline, plus our mean error.
    """
    # None sentinel instead of a mutable list default (shared across calls).
    if samaritan_counts is None:
        samaritan_counts = [3, 5, 8, 10, 15, 20]
    rows = []
    for n in samaritan_counts:
        res = evaluate_static(n_trials=n_trials, n_samaritans=n)
        rows.append({
            'n_samaritans': n,
            'our_cep50': res['our_cep50'],
            'our_cep90': res['our_cep90'],
            'baseline_cep50': res['baseline_cep50'],
            'baseline_cep90': res['baseline_cep90'],
            'our_mean': res['our_mean'],
        })
    return pd.DataFrame(rows)
def plot_results(
    static_res: Dict,
    moving_res: Dict,
    metro_res: Dict,
    sensitivity_df: pd.DataFrame,
    output_dir: str = '../results',
):
    """
    Render the 2x2 summary figure and save it to evaluation_results.png.

    Panels: (0,0) static-scenario error CDF, (0,1) moving-scenario error CDF,
    (1,0) CEP vs. samaritan count, (1,1) per-scenario CEP90 bar comparison.

    Note: the mojibake-damaged title strings were restored to em dashes.
    """
    os.makedirs(output_dir, exist_ok=True)
    fig, axes = plt.subplots(2, 2, figsize=(14, 11))
    fig.suptitle('Problem 1: Crowdsourced BLE Location Aggregation — Results',
                 fontsize=14, fontweight='bold')

    # --- Static scenario: empirical error CDF vs. baseline ---
    ax = axes[0, 0]
    errors_our = np.sort(static_res['our_errors'])
    errors_bl = np.sort(static_res['baseline_errors'])
    # Both arrays come from the same trials, so one CDF axis serves both.
    cdf = np.arange(1, len(errors_our) + 1) / len(errors_our)
    ax.plot(errors_our, cdf * 100, 'b-', lw=2, label=f"Ours (CEP90={static_res['our_cep90']:.1f}m)")
    ax.plot(errors_bl, cdf * 100, 'r--', lw=2, label=f"Baseline (CEP90={static_res['baseline_cep90']:.1f}m)")
    ax.axhline(90, color='gray', linestyle=':', alpha=0.7, label='90th percentile')
    ax.axvline(30, color='green', linestyle='--', alpha=0.7, label='Target: 30m')
    ax.set_xlabel('Positioning Error (m)')
    ax.set_ylabel('CDF (%)')
    ax.set_title(f'Static Scene — CDF (N={static_res["n_samaritans"]} samaritans)')
    ax.legend(fontsize=8)
    ax.set_xlim(0, min(100, errors_bl.max() * 1.05))
    ax.grid(True, alpha=0.3)

    # --- Moving scenario: empirical error CDF vs. baseline ---
    ax = axes[0, 1]
    m_our = np.sort(moving_res['our_errors'])
    m_bl = np.sort(moving_res['baseline_errors'])
    cdf_m = np.arange(1, len(m_our) + 1) / len(m_our)
    ax.plot(m_our, cdf_m * 100, 'b-', lw=2,
            label=f"Ours (CEP90={moving_res['our_cep90']:.1f}m)")
    ax.plot(m_bl, cdf_m * 100, 'r--', lw=2,
            label=f"Baseline (CEP90={moving_res['baseline_cep90']:.1f}m)")
    ax.axhline(90, color='gray', linestyle=':', alpha=0.7)
    ax.set_xlabel('Positioning Error (m)')
    ax.set_ylabel('CDF (%)')
    improvement = moving_res['improvement_pct']
    ax.set_title(f'Moving Scene — CDF (Improvement: {improvement:.1f}%)')
    ax.legend(fontsize=8)
    ax.set_xlim(0, min(300, m_bl.max() * 1.05))
    ax.grid(True, alpha=0.3)

    # --- Sensitivity: accuracy as a function of samaritan count ---
    ax = axes[1, 0]
    ax.plot(sensitivity_df['n_samaritans'], sensitivity_df['our_cep90'],
            'b-o', lw=2, label='Ours CEP90')
    ax.plot(sensitivity_df['n_samaritans'], sensitivity_df['baseline_cep90'],
            'r--s', lw=2, label='Baseline CEP90')
    ax.plot(sensitivity_df['n_samaritans'], sensitivity_df['our_cep50'],
            'b:^', lw=1, alpha=0.6, label='Ours CEP50')
    ax.axhline(30, color='green', linestyle='--', alpha=0.8, label='Target CEP90=30m')
    ax.axvline(10, color='orange', linestyle=':', alpha=0.7, label='Min 10 samaritans')
    ax.set_xlabel('Number of Samaritans')
    ax.set_ylabel('Error (m)')
    ax.set_title('Sensitivity: CEP90 vs. Number of Samaritans')
    ax.legend(fontsize=8)
    ax.grid(True, alpha=0.3)

    # --- Per-scenario CEP90 bars, ours vs. baseline ---
    ax = axes[1, 1]
    scenarios = ['Static', 'Moving (Walk)', 'Moving (Metro)']
    our_cep90s = [static_res['our_cep90'], moving_res['our_cep90'], metro_res['our_cep90']]
    bl_cep90s = [static_res['baseline_cep90'], moving_res['baseline_cep90'],
                 metro_res['baseline_cep90']]
    x = np.arange(len(scenarios))
    w = 0.35
    bars1 = ax.bar(x - w/2, our_cep90s, w, label='Ours', color='steelblue', alpha=0.85)
    bars2 = ax.bar(x + w/2, bl_cep90s, w, label='Baseline', color='tomato', alpha=0.85)
    ax.bar_label(bars1, fmt='%.1fm', fontsize=8, padding=2)
    ax.bar_label(bars2, fmt='%.1fm', fontsize=8, padding=2)
    ax.axhline(30, color='green', linestyle='--', alpha=0.8, label='CEP90 target 30m')
    ax.set_xticks(x)
    ax.set_xticklabels(scenarios)
    ax.set_ylabel('CEP90 (m)')
    ax.set_title('CEP90 Comparison by Scenario')
    ax.legend(fontsize=8)
    ax.grid(True, alpha=0.3, axis='y')

    plt.tight_layout()
    out_path = os.path.join(output_dir, 'evaluation_results.png')
    plt.savefig(out_path, dpi=150, bbox_inches='tight')
    print(f"Figure saved: {out_path}")
    plt.close()
if __name__ == '__main__':
    # Driver script: runs all evaluations, prints a report, saves the figure.
    print("=" * 60)
    print("PROBLEM 1 — EVALUATION")
    print("=" * 60)

    print("\n[1/4] Evaluating static scenario (500 trials, 15 samaritans)...")
    static_res = evaluate_static(n_trials=500, n_samaritans=15)
    print(f" Our method — CEP50: {static_res['our_cep50']:.1f}m "
          f"CEP90: {static_res['our_cep90']:.1f}m Mean: {static_res['our_mean']:.1f}m")
    print(f" Baseline — CEP50: {static_res['baseline_cep50']:.1f}m "
          f"CEP90: {static_res['baseline_cep90']:.1f}m Mean: {static_res['baseline_mean']:.1f}m")
    # Restored from a mojibake-broken literal (original used check/cross marks).
    target_met = "✅ MET" if static_res['our_cep90'] <= 30 else "❌ MISSED"
    print(f" Target CEP90 ≤ 30m: {target_met} ({static_res['our_cep90']:.1f}m)")

    print("\n[2/4] Evaluating moving scenario (300 trials, 6 samaritans)...")
    moving_res = evaluate_moving(n_trials=300, n_samaritans=6, scenario_type='moving')
    print(f" Our method — CEP50: {moving_res['our_cep50']:.1f}m "
          f"CEP90: {moving_res['our_cep90']:.1f}m Mean: {moving_res['our_mean']:.1f}m")
    print(f" Baseline — CEP50: {moving_res['baseline_cep50']:.1f}m "
          f"CEP90: {moving_res['baseline_cep90']:.1f}m Mean: {moving_res['baseline_mean']:.1f}m")
    # Restored from a mojibake-broken literal (original used check/cross marks).
    impr_target = "✅ MET" if moving_res['improvement_pct'] >= 20 else "❌ MISSED"
    print(f" Improvement over baseline: {moving_res['improvement_pct']:.1f}% "
          f"(target ≥ 20%): {impr_target}")

    print("\n[3/4] Evaluating metro scenario (200 trials)...")
    metro_res = evaluate_moving(n_trials=200, n_samaritans=5, scenario_type='metro')
    print(f" Our method — CEP90: {metro_res['our_cep90']:.1f}m Mean: {metro_res['our_mean']:.1f}m")
    print(f" Baseline — CEP90: {metro_res['baseline_cep90']:.1f}m Mean: {metro_res['baseline_mean']:.1f}m")
    print(f" Improvement: {metro_res['improvement_pct']:.1f}%")

    print("\n[4/4] Sensitivity analysis (varying samaritan count)...")
    sens_df = sensitivity_analysis(
        samaritan_counts=[3, 5, 8, 10, 15, 20], n_trials=200
    )
    print(sens_df.round(1).to_string(index=False))

    print("\n[Plotting results...]")
    plot_results(static_res, moving_res, metro_res, sens_df)

    print("\n" + "=" * 60)
    print("SUMMARY")
    print("=" * 60)
    print(f" Static CEP90: {static_res['our_cep90']:.1f}m (target ≤ 30m)")
    print(f" Moving improvement: {moving_res['improvement_pct']:.1f}% (target ≥ 20%)")
    print(f" Metro improvement: {metro_res['improvement_pct']:.1f}%")