# location_estimator.py
"""
Core Location Estimation Algorithm
====================================
Implements the solution to Problem 1: Crowdsourced Offline High-Precision
Location Aggregation based on Nonlinear Optimization.
Key components:
1. Weight Composer โ multi-factor wแตข design
2. Robust WNLS Solver โ IRLS with M-estimator (Huber / Tukey)
3. Scene Classifier โ static vs. moving vs. metro detection
4. Trajectory Estimator โ Kalman-filter-based moving position estimate
5. Pipeline โ end-to-end pipeline combining all components
Mathematical formulation:
p* = argmin_p ฮฃแตข wแตข ยท ฯ( f(rแตข) - ||p - pแตข||โ )
Where:
wแตข = composite weight for samaritan i
rแตข = RSSI measurement from samaritan i
pแตข = GPS position reported by samaritan i
f(rแตข)= estimated distance from RSSI
ฯ(ยท) = Huber robust loss (reduces to L2 near 0, L1 for large residuals)
"""
import numpy as np
from scipy.optimize import minimize
from scipy.spatial.distance import cdist
from typing import List, Tuple, Optional
from dataclasses import dataclass
from data_simulator import (
SamaritanReport, SimulationScenario,
rssi_to_distance, BLE_RANGE_M,
)
@dataclass
class LocationEstimate:
    """Result from the location estimation pipeline."""
    position: np.ndarray                     # estimated device position [x, y], meters
    uncertainty_m: float                     # weighted residual spread — rough error proxy (m)
    scene_type: str                          # 'static' | 'moving' | 'metro'
    used_reports: int                        # reports that survived outlier filtering
    total_reports: int                       # reports received before filtering
    convergence_score: float                 # heuristic quality score in [0, 1]
    trajectory: Optional[np.ndarray] = None  # optional smoothed path (not populated by the pipeline in this file)
class WeightComposer:
    """
    Computes a composite weight wᵢ for each samaritan report.

    Weight factors (each mapped into (0, 1] before combination):
        w_method   : localization method quality (gnss > wifi > cell)
        w_rssi     : RSSI strength (stronger signal => samaritan closer => more info)
        w_freshness: recency of report (more recent => higher weight)
        w_conf     : device-reported GPS confidence
        w_accuracy : reported GNSS accuracy (tighter accuracy => higher weight)

    Final per-report weight is the *geometric mean* of the five factors:
        wᵢ = (w_method · w_rssi · w_freshness · w_conf · w_accuracy)^(1/5)
    so a single poor factor dampens, but never zeroes, the weight.
    A geometric-diversity bonus in [0.8, 1.2] is applied as a post-hoc
    multiplier to favor spread-out triangulation geometry.
    """
    # Relative trust in each localization method; unknown methods get 0.3.
    METHOD_QUALITY = {'gnss': 1.0, 'wifi': 0.55, 'cell': 0.15}
    RSSI_REFERENCE_DBM = -70.0

    def __init__(self, freshness_halflife_s: float = 1800.0):
        """
        Args:
            freshness_halflife_s: Time constant for recency decay (seconds).
                Default 30 minutes — older reports lose weight.
        """
        self.freshness_halflife = freshness_halflife_s

    def compute_weights(
        self,
        reports: List["SamaritanReport"],
        reference_time: Optional[float] = None,
    ) -> np.ndarray:
        """
        Compute composite weights for a list of reports.

        Args:
            reports: Samaritan reports to weight.
            reference_time: Timestamp against which freshness is measured;
                defaults to the newest report's timestamp.

        Returns:
            weights: np.ndarray of shape (N,), strictly positive, not yet
                normalized. Empty array for an empty report list.
        """
        if not reports:
            return np.array([])
        if reference_time is None:
            reference_time = max(r.timestamp for r in reports)
        weights = np.ones(len(reports))
        for i, report in enumerate(reports):
            w_method = self.METHOD_QUALITY.get(report.localization_method, 0.3)
            # Map RSSI from roughly [-110, -60] dBm onto [0, 1], then keep a
            # 0.2 floor so weak-signal reports still contribute.
            rssi_normalized = np.clip((report.rssi_dbm + 110) / 50.0, 0.0, 1.0)
            w_rssi = 0.2 + 0.8 * rssi_normalized
            age_s = max(0, reference_time - report.timestamp)
            # Floor of 0.5: stale reports are de-emphasized, never discarded.
            w_freshness = 0.5 + 0.5 * np.exp(-age_s / self.freshness_halflife)
            w_conf = max(0.1, report.confidence)
            # <= 10 m accuracy maps to full weight; worse accuracy decays
            # hyperbolically, floored at 0.2.
            w_accuracy = max(0.2, min(1.0, 10.0 / max(report.gnss_accuracy_m, 0.5)))
            factors = [w_method, w_rssi, w_freshness, w_conf, w_accuracy]
            # Geometric mean keeps any single poor factor from zeroing wᵢ.
            weights[i] = float(np.prod(factors) ** (1.0 / len(factors)))
        positions = np.array([r.gps_position for r in reports])
        diversity_bonus = self._geometric_diversity_bonus(positions)
        weights *= diversity_bonus
        # Keep weights strictly positive so downstream normalization is safe.
        weights = np.maximum(weights, 1e-6)
        return weights

    def _geometric_diversity_bonus(self, positions: np.ndarray) -> np.ndarray:
        """
        Give a diversity bonus to reports that cover unique angular directions
        from the centroid, encouraging spread-out triangulation geometry.
        Scale: [0.8, 1.2] so it modulates but doesn't dominate.
        """
        if len(positions) <= 1:
            return np.ones(len(positions))
        centroid = positions.mean(axis=0)
        vectors = positions - centroid
        # Epsilon avoids division by zero for a report sitting on the centroid.
        norms = np.linalg.norm(vectors, axis=1, keepdims=True) + 1e-6
        unit_vectors = vectors / norms
        cos_sim = unit_vectors @ unit_vectors.T
        np.fill_diagonal(cos_sim, 0)
        # High |cos| similarity to the other bearings => redundant direction.
        avg_similarity = np.abs(cos_sim).mean(axis=1)
        diversity = 1.0 - avg_similarity
        bonus = 0.8 + 0.4 * (diversity - diversity.min()) / (diversity.max() - diversity.min() + 1e-9)
        return bonus
def huber_loss(r: float, delta: float = 5.0) -> float:
    """Huber robust loss: quadratic near zero, linear in the tails."""
    magnitude = abs(r)
    if magnitude > delta:
        # Linear tail, matched in value and slope at |r| == delta.
        return delta * (magnitude - 0.5 * delta)
    return 0.5 * magnitude * magnitude
def huber_weight(r: float, delta: float = 5.0) -> float:
    """IRLS weight for the Huber loss: w(r) = min(1, delta/|r|)."""
    magnitude = abs(r)
    # Tiny residuals get full weight; also guards the division below.
    return 1.0 if magnitude < 1e-6 else min(1.0, delta / magnitude)
def tukey_weight(r: float, c: float = 10.0) -> float:
    """Tukey bisquare IRLS weight — hard-rejects residuals beyond c."""
    if abs(r) > c:
        # Extreme outliers contribute nothing at all.
        return 0.0
    ratio_sq = (r / c) ** 2
    return (1 - ratio_sq) ** 2
class RobustWNLSSolver:
    """
    Iteratively Reweighted Least Squares (IRLS) solver for:
        p* = argmin_p Σᵢ wᵢ · ρ(f(rᵢ) - ||p - pᵢ||₂)
    Strategy:
    - Initialize via weighted centroid (fast, reasonable starting point)
    - IRLS: alternate between updating p (optimization) and
      updating IRLS weights based on residuals (outlier down-weighting)
    - Final refinement with scipy.optimize for precision
    """
    def __init__(
        self,
        m_estimator: str = 'huber',
        huber_delta: float = 8.0,
        tukey_c: float = 15.0,
        max_irls_iter: int = 20,
        convergence_tol: float = 0.1,
    ):
        """
        Args:
            m_estimator: 'huber' or 'tukey'; any other value disables
                robust reweighting (plain least squares).
            huber_delta: Residual scale (m) where Huber turns quadratic->linear.
            tukey_c: Tukey cutoff (m); residuals beyond it get zero weight.
            max_irls_iter: Maximum number of IRLS outer iterations.
            convergence_tol: Stop when the solution moves less than this (m).
        """
        self.m_estimator = m_estimator
        self.huber_delta = huber_delta
        self.tukey_c = tukey_c
        self.max_irls_iter = max_irls_iter
        self.convergence_tol = convergence_tol
    def _irls_weight(self, residual: float) -> float:
        # Dispatch to the configured M-estimator; unknown names fall back
        # to weight 1.0 (no outlier down-weighting).
        if self.m_estimator == 'huber':
            return huber_weight(residual, self.huber_delta)
        elif self.m_estimator == 'tukey':
            return tukey_weight(residual, self.tukey_c)
        return 1.0
    def _objective_gps(self, p: np.ndarray, positions: np.ndarray,
                       combined_weights: np.ndarray) -> float:
        """
        GPS-position objective: minimize weighted squared distance from GPS reports.
        This is the primary objective for static scenes where GPS accuracy dominates.

        Args:
            p: Candidate device position, shape (2,).
            positions: Samaritan GPS positions, shape (N, 2).
            combined_weights: Prior weights × IRLS weights, shape (N,).
        """
        diffs = positions - p[None, :]
        sq_dists = np.sum(diffs ** 2, axis=1)
        return float(np.sum(combined_weights * sq_dists))
    def _gradient_gps(self, p: np.ndarray, positions: np.ndarray,
                      combined_weights: np.ndarray) -> np.ndarray:
        """Analytical gradient of GPS objective: d/dp of Σ w·||p - pᵢ||²."""
        diffs = p[None, :] - positions
        return 2.0 * (combined_weights[:, None] * diffs).sum(axis=0)
    def _objective_rssi(self, p: np.ndarray, positions: np.ndarray,
                        distances: np.ndarray, combined_weights: np.ndarray) -> float:
        """
        RSSI-distance objective (original formulation from problem statement):
        weighted squared mismatch between RSSI-implied range and geometric
        distance. Used as a soft regularizer blended with the GPS objective.
        """
        diffs = np.linalg.norm(positions - p[None, :], axis=1)
        residuals = distances - diffs
        return float(np.sum(combined_weights * residuals ** 2))
    def _gradient_rssi(self, p: np.ndarray, positions: np.ndarray,
                       distances: np.ndarray, combined_weights: np.ndarray) -> np.ndarray:
        """Analytical gradient of RSSI objective."""
        diffs_vec = p[None, :] - positions
        # Epsilon keeps the gradient finite when p coincides with a samaritan.
        diffs_norm = np.linalg.norm(diffs_vec, axis=1) + 1e-6
        residuals = distances - diffs_norm
        grad_factors = -2.0 * combined_weights * residuals / diffs_norm
        return (grad_factors[:, None] * diffs_vec).sum(axis=0)
    def solve(
        self,
        reports: List[SamaritanReport],
        prior_weights: np.ndarray,
        initial_position: Optional[np.ndarray] = None,
        rssi_blend: float = 0.15,
    ) -> Tuple[np.ndarray, float, np.ndarray]:
        """
        Solve for device position using a blended GPS+RSSI objective.
        The primary insight: GPS position accuracy (3-15m for GNSS) is far better
        than RSSI ranging accuracy (15-50m typical). So the GPS objective
        dominates, with RSSI acting as a soft direction-correcting regularizer.
        Blended objective: (1-α)·GPS_obj + α·RSSI_obj, α = rssi_blend
        Args:
            reports: List of samaritan reports
            prior_weights: wᵢ from WeightComposer (shape N,)
            initial_position: Starting point for optimizer
            rssi_blend: Blend coefficient α for RSSI objective [0, 1]
        Returns:
            (position, residual_std, irls_final_weights)
        Raises:
            ValueError: If ``reports`` is empty.
        """
        positions = np.array([r.gps_position for r in reports])
        distances = np.array([r.estimated_distance for r in reports])
        N = len(reports)
        if N == 0:
            raise ValueError("No reports to process")
        # Sigmoid seed weight used only for the starting centroid.
        # NOTE(review): as written this gives weaker RSSI (more negative dBm)
        # a HIGHER seed weight — confirm the sign is intended.
        rssi_init_w = np.array([
            1.0 / (1.0 + np.exp(0.1 * (r.rssi_dbm + 80))) for r in reports
        ])
        combined_init = prior_weights * rssi_init_w
        w_norm_init = combined_init / combined_init.sum()
        if initial_position is None:
            # Weighted centroid of the reported GPS fixes.
            initial_position = (w_norm_init[:, None] * positions).sum(axis=0)
        p_current = initial_position.copy()
        irls_weights = np.ones(N)
        # Characteristic spatial scale (m) of the report cloud; used below to
        # bring the RSSI term to a magnitude comparable with the GPS term.
        pos_scale = max(np.std(positions), 1.0)
        for iteration in range(self.max_irls_iter):
            combined_weights = prior_weights * irls_weights
            # Closures capture the current iteration's combined_weights.
            def objective(p):
                gps_loss = self._objective_gps(p, positions, combined_weights)
                rssi_loss = self._objective_rssi(p, positions, distances, combined_weights)
                return (1 - rssi_blend) * gps_loss + rssi_blend * rssi_loss * (pos_scale ** 2)
            def gradient(p):
                gps_grad = self._gradient_gps(p, positions, combined_weights)
                rssi_grad = self._gradient_rssi(p, positions, distances, combined_weights)
                return (1 - rssi_blend) * gps_grad + rssi_blend * rssi_grad * (pos_scale ** 2)
            result = minimize(
                fun=objective,
                x0=p_current,
                jac=gradient,
                method='L-BFGS-B',
                options={'maxiter': 300, 'ftol': 1e-12, 'gtol': 1e-7},
            )
            p_new = result.x
            # Re-weight on distance to each GPS report: reports far from the
            # current solution are treated as outliers by the M-estimator.
            gps_residuals = np.linalg.norm(positions - p_new[None, :], axis=1)
            irls_weights_new = np.array([self._irls_weight(r) for r in gps_residuals])
            position_change = np.linalg.norm(p_new - p_current)
            p_current = p_new
            irls_weights = irls_weights_new
            if position_change < self.convergence_tol:
                # Solution stabilized — stop iterating.
                break
        final_gps_diffs = np.linalg.norm(positions - p_current[None, :], axis=1)
        final_combined = prior_weights * irls_weights
        w_norm = final_combined / final_combined.sum()
        # Weighted RMS distance to the reports — used as an uncertainty proxy.
        weighted_residual_std = np.sqrt(np.sum(w_norm * final_gps_diffs ** 2))
        return p_current, weighted_residual_std, irls_weights
class SceneClassifier:
    """
    Classifies the device scenario as static, moving (pedestrian), or metro.

    Signals used:
    - Spread of samaritan GPS positions (large spread -> device likely moved)
    - End-to-end displacement over elapsed time (high speed -> metro)
    """
    STATIC_SPREAD_THRESHOLD_M = 80.0
    METRO_SPEED_THRESHOLD_MPS = 5.0

    def classify(self, reports: List[SamaritanReport]) -> str:
        """Return one of 'static', 'moving' or 'metro' for the report set."""
        # Too few observations to infer motion: assume the device is parked.
        if len(reports) < 3:
            return 'static'
        stamps = np.array([r.timestamp for r in reports])
        fixes = np.array([r.gps_position for r in reports])
        chronological = np.argsort(stamps)
        stamps = stamps[chronological]
        fixes = fixes[chronological]
        # Mean per-axis standard deviation as a compact spread measure.
        spread_m = np.std(fixes, axis=0).mean()
        if spread_m < self.STATIC_SPREAD_THRESHOLD_M:
            return 'static'
        elapsed_s = stamps[-1] - stamps[0]
        if elapsed_s < 1.0:
            # All reports effectively simultaneous: spread is noise, not motion.
            return 'static'
        displacement_m = np.linalg.norm(fixes[-1] - fixes[0])
        speed_mps = displacement_m / elapsed_s
        return 'metro' if speed_mps > self.METRO_SPEED_THRESHOLD_MPS else 'moving'
class TrajectoryEstimator:
    """
    Estimates device position over time for moving scenarios using a
    Kalman Filter with constant-velocity motion model.
    State vector: [x, y, vx, vy]
    Observation: [x_obs, y_obs] derived from each samaritan report
    For each samaritan report, we compute a "shadow position" — our best
    estimate of where the device was when the samaritan detected it,
    by stepping from the samaritan's GPS fix toward the centroid of the
    surrounding reports by the RSSI-estimated distance.
    """
    def __init__(
        self,
        process_noise: float = 1.0,
        obs_noise_m: float = 20.0,
    ):
        """
        Args:
            process_noise: 1-sigma acceleration noise driving Q.
            obs_noise_m: Baseline 1-sigma observation noise (m) for R.
        """
        # Stored as variances (squared), as consumed by the filter matrices.
        self.Q_scale = process_noise ** 2
        self.R_scale = obs_noise_m ** 2
    def estimate(
        self,
        reports: List[SamaritanReport],
        prior_weights: np.ndarray,
        scene_type: str = 'moving',
    ) -> np.ndarray:
        """
        Estimate the most recent device position from a moving sequence.
        Strategy:
        1. Sort reports by time
        2. For each report: estimate device position at that time
           = samaritan_gps + unit(centroid - samaritan_gps) * rssi_distance
        3. Kalman Filter over these shadow positions to smooth the trajectory
        4. Return state at the last observation time

        Args:
            reports: Samaritan reports (any order; sorted internally).
            prior_weights: Composite report weights, shape (N,).
            scene_type: Accepted for interface symmetry; not read by the filter.

        Returns:
            position: np.ndarray [x, y] — best estimate at latest timestamp
        Raises:
            ValueError: If ``reports`` is empty.
        """
        if len(reports) == 0:
            raise ValueError("No reports")
        order = np.argsort([r.timestamp for r in reports])
        sorted_reports = [reports[i] for i in order]
        sorted_weights = prior_weights[order]
        timestamps = np.array([r.timestamp for r in sorted_reports])
        positions = np.array([r.gps_position for r in sorted_reports])
        # Weighted centroid of reporter fixes — anchor for shadow directions.
        rough_centroid = (sorted_weights[:, None] * positions).sum(axis=0) / sorted_weights.sum()
        shadow_positions = []
        shadow_weights = []
        for i, report in enumerate(sorted_reports):
            direction = rough_centroid - report.gps_position
            dir_norm = np.linalg.norm(direction)
            if dir_norm < 1e-3:
                # Report sits (almost) on the centroid: skip normalization so
                # the shadow stays essentially at the GPS fix.
                dir_norm = 1.0
            shadow = report.gps_position + (report.estimated_distance / dir_norm) * direction
            shadow_positions.append(shadow)
            shadow_weights.append(sorted_weights[i])
        shadow_positions = np.array(shadow_positions)
        shadow_weights = np.array(shadow_weights)
        if len(shadow_positions) == 1:
            # A single observation: nothing to filter.
            return shadow_positions[0]
        # Average report spacing, clamped to >= 1 s, used as a FIXED time step;
        # individual per-report dt values are not used by the filter.
        dt_avg = max((timestamps[-1] - timestamps[0]) / max(len(timestamps) - 1, 1), 1.0)
        # Constant-velocity transition for state [x, y, vx, vy].
        F = np.array([[1, 0, dt_avg, 0],
                      [0, 1, 0, dt_avg],
                      [0, 0, 1, 0],
                      [0, 0, 0, 1]])
        # Maps white acceleration noise into position/velocity process noise.
        G = np.array([[0.5 * dt_avg**2, 0],
                      [0, 0.5 * dt_avg**2],
                      [dt_avg, 0],
                      [0, dt_avg]])
        Q = self.Q_scale * G @ G.T
        # Observation model: position components only.
        H = np.array([[1, 0, 0, 0],
                      [0, 1, 0, 0]])
        # Initialize at the first shadow position with zero velocity.
        x = np.array([shadow_positions[0, 0], shadow_positions[0, 1], 0.0, 0.0])
        P = np.diag([100.0, 100.0, 10.0, 10.0])
        filtered_positions = [x[:2].copy()]
        for i in range(1, len(shadow_positions)):
            # Predict step.
            x = F @ x
            P = F @ P @ F.T + Q
            # Low-weight reports get proportionally inflated observation noise.
            r_scale = self.R_scale / (shadow_weights[i] / shadow_weights.max() + 1e-6)
            R = np.eye(2) * r_scale
            # Update step (standard Kalman gain form).
            S = H @ P @ H.T + R
            K = P @ H.T @ np.linalg.inv(S)
            innovation = shadow_positions[i] - H @ x
            x = x + K @ innovation
            P = (np.eye(4) - K @ H) @ P
            filtered_positions.append(x[:2].copy())
        return np.array(filtered_positions[-1])
class LocationAggregationPipeline:
    """
    End-to-end pipeline for crowdsourced device location estimation.
    For static scenes:
        -> WeightComposer + RobustWNLSSolver (IRLS + Huber)
    For moving/metro scenes:
        -> SceneClassifier confirms type
        -> TrajectoryEstimator (Kalman filter over shadow positions)
    """
    def __init__(
        self,
        outlier_rejection_percentile: float = 80.0,
        min_reports_for_clustering: int = 3,
    ):
        """
        Args:
            outlier_rejection_percentile: Percentage of reports (by weight)
                retained by the outlier filter; the rest are dropped.
            min_reports_for_clustering: Never filter below this many reports.
        """
        self.weight_composer = WeightComposer()
        self.solver = RobustWNLSSolver(m_estimator='huber', huber_delta=8.0)
        self.scene_classifier = SceneClassifier()
        self.trajectory_estimator = TrajectoryEstimator()
        self.outlier_percentile = outlier_rejection_percentile
        self.min_reports = min_reports_for_clustering
    def _filter_outliers(
        self,
        reports: List[SamaritanReport],
        weights: np.ndarray,
    ) -> Tuple[List[SamaritanReport], np.ndarray]:
        """Drop low-weight reports, always keeping at least ``min_reports``."""
        if len(reports) <= self.min_reports:
            return reports, weights
        # Keep the top `outlier_percentile` percent of reports by weight.
        threshold = np.percentile(weights, 100 - self.outlier_percentile)
        mask = weights >= threshold
        if mask.sum() < self.min_reports:
            # Degenerate/tied weights can over-filter; fall back to top-k.
            top_indices = np.argsort(weights)[-self.min_reports:]
            mask = np.zeros(len(weights), dtype=bool)
            mask[top_indices] = True
        filtered_reports = [r for r, m in zip(reports, mask) if m]
        filtered_weights = weights[mask]
        return filtered_reports, filtered_weights
    def _solve_static(
        self,
        filtered_reports: List[SamaritanReport],
        filtered_weights: np.ndarray,
        scene_type: str,
        total: int,
    ) -> LocationEstimate:
        """Static-scene path: centroid-distance inlier gating + robust WNLS solve."""
        positions_arr = np.array([r.gps_position for r in filtered_reports])
        w_norm = filtered_weights / filtered_weights.sum()
        initial_centroid = (w_norm[:, None] * positions_arr).sum(axis=0)
        distances_to_centroid = np.linalg.norm(
            positions_arr - initial_centroid[None, :], axis=1
        )
        # Weighted *mean* distance from the centroid sets the inlier gate
        # radius (np.average is a mean, not a median).
        weighted_mean_dist = np.average(distances_to_centroid, weights=filtered_weights)
        dist_threshold = weighted_mean_dist * 2.5 + 10.0
        gps_inlier_mask = distances_to_centroid <= dist_threshold
        if gps_inlier_mask.sum() >= self.min_reports:
            final_reports = [r for r, m in zip(filtered_reports, gps_inlier_mask) if m]
            final_weights = filtered_weights[gps_inlier_mask]
        else:
            # Gating would leave too few reports; keep the full filtered set.
            final_reports = filtered_reports
            final_weights = filtered_weights
        position, residual_std, _ = self.solver.solve(final_reports, final_weights)
        # Map residual spread onto [0, 1]: 0 m -> 1.0, >= 50 m -> 0.0.
        convergence = max(0.0, 1.0 - residual_std / 50.0)
        return LocationEstimate(
            position=position,
            uncertainty_m=residual_std,
            scene_type=scene_type,
            used_reports=len(final_reports),
            total_reports=total,
            convergence_score=convergence,
        )
    def _solve_moving(
        self,
        filtered_reports: List[SamaritanReport],
        filtered_weights: np.ndarray,
        scene_type: str,
        total: int,
    ) -> LocationEstimate:
        """Moving/metro path: Kalman-smoothed shadow-position estimate."""
        position = self.trajectory_estimator.estimate(
            filtered_reports, filtered_weights, scene_type=scene_type
        )
        positions_arr = np.array([r.gps_position for r in filtered_reports])
        diffs = np.linalg.norm(positions_arr - position[None, :], axis=1)
        dist_est = np.array([r.estimated_distance for r in filtered_reports])
        # Disagreement between geometric and RSSI-implied distances serves
        # as the uncertainty proxy in the moving case.
        residuals = np.abs(diffs - dist_est)
        residual_std = np.average(residuals, weights=filtered_weights)
        return LocationEstimate(
            position=position,
            uncertainty_m=residual_std,
            scene_type=scene_type,
            used_reports=len(filtered_reports),
            total_reports=total,
            convergence_score=max(0.0, 1.0 - residual_std / 100.0),
        )
    def estimate(self, scenario: SimulationScenario) -> LocationEstimate:
        """
        Main entry point. Processes a simulation scenario and returns
        the estimated device location.

        Raises:
            ValueError: If the scenario contains no reports.
        """
        reports = scenario.reports
        total = len(reports)
        if total == 0:
            raise ValueError("Empty reports list")
        weights = self.weight_composer.compute_weights(reports)
        scene_type = self.scene_classifier.classify(reports)
        filtered_reports, filtered_weights = self._filter_outliers(reports, weights)
        if scene_type == 'static':
            return self._solve_static(filtered_reports, filtered_weights, scene_type, total)
        return self._solve_moving(filtered_reports, filtered_weights, scene_type, total)
def baseline_weighted_centroid(scenario: "SimulationScenario") -> np.ndarray:
    """
    Approximate the current method: weighted centroid of all GPS reports.
    No RSSI-distance correction, simple confidence-based weights.

    Args:
        scenario: Scenario whose ``reports`` each expose ``confidence`` and
            ``gps_position``.

    Returns:
        np.ndarray [x, y]: confidence-weighted mean of the reported positions.

    Raises:
        ValueError: If the scenario contains no reports.
    """
    reports = scenario.reports
    if not reports:
        raise ValueError("Empty reports")
    weights = np.array([r.confidence for r in reports])
    positions = np.array([r.gps_position for r in reports])
    total_weight = weights.sum()
    if total_weight <= 0:
        # All-zero confidences would divide by zero and yield a NaN position;
        # fall back to the unweighted mean instead.
        return positions.mean(axis=0)
    w_norm = weights / total_weight
    return (w_norm[:, None] * positions).sum(axis=0)
if __name__ == '__main__':
    # Smoke-test driver: compares the full pipeline against the naive
    # weighted-centroid baseline on one static and one moving scenario.
    import sys
    sys.path.insert(0, '.')
    from data_simulator import generate_static_scenario, generate_moving_scenario
    pipeline = LocationAggregationPipeline()
    # --- Static scene ---
    scene = generate_static_scenario(n_samaritans=15, seed=42)
    est = pipeline.estimate(scene)
    true_pos = scene.true_device_positions[0]
    error = np.linalg.norm(est.position - true_pos)
    baseline_pos = baseline_weighted_centroid(scene)
    baseline_error = np.linalg.norm(baseline_pos - true_pos)
    print(f"[STATIC] True: {true_pos}, Estimated: {est.position.round(1)}")
    print(f" Error: {error:.1f}m Baseline: {baseline_error:.1f}m "
          f"Scene: {est.scene_type} Reports: {est.used_reports}/{est.total_reports}")
    # --- Moving scene ---
    scene_m = generate_moving_scenario(n_samaritans=6, seed=1)
    est_m = pipeline.estimate(scene_m)
    last_report_time = max(r.timestamp for r in scene_m.reports)
    # Assumes true_device_positions is indexed by whole seconds — TODO confirm
    # against data_simulator; clamped to the last entry either way.
    t_idx = int(last_report_time)
    true_pos_m = scene_m.true_device_positions[min(t_idx, len(scene_m.true_device_positions)-1)]
    error_m = np.linalg.norm(est_m.position - true_pos_m)
    baseline_pos_m = baseline_weighted_centroid(scene_m)
    baseline_error_m = np.linalg.norm(baseline_pos_m - true_pos_m)
    print(f"\n[MOVING] True (at last obs): {true_pos_m.round(1)}, Estimated: {est_m.position.round(1)}")
    print(f" Error: {error_m:.1f}m Baseline: {baseline_error_m:.1f}m "
          f"Scene: {est_m.scene_type} Reports: {est_m.used_reports}/{est_m.total_reports}")