# 💻 data_simulator.py
# python · 286 lines · ⬇️ Download
"""
BLE Crowdsourced Location Data Simulator
=========================================
Simulates the "Find My Device" scenario:
- A lost device broadcasts BLE at a fixed (or moving) location
- Good Samaritans (passersby) detect the BLE signal and report their GPS location + RSSI
BLE Path Loss Model (log-distance):
    RSSI = TX_power - 10 * n * log10(d) + noise
    d = 10^((TX_power - RSSI) / (10 * n))
Where:
    TX_power = expected RSSI at d = 1 m (dBm), since log10(1) = 0
    n = path loss exponent (2.0 in free space, ~2.5-4.0 indoors)
    d = estimated distance from samaritan to lost device (meters)
"""
import numpy as np
from dataclasses import dataclass, field
from typing import List, Optional, Tuple
# Default tx_power of the log-distance model: the modelled RSSI (dBm) at d = 1 m.
TX_POWER_DBM = -59
# Default path loss exponent n used by the RSSI <-> distance conversions.
PATH_LOSS_EXP = 2.7
# Standard deviation of the Gaussian noise the generators add to the ideal RSSI.
RSSI_NOISE_STD = 4.0
# Upper bound (meters) on how far from the device a simulated samaritan is placed.
BLE_RANGE_M = 60.0
@dataclass
class SamaritanReport:
    """A single crowd-sourced location report from one samaritan."""
    # Identifier of the report; the generators in this file use the loop index.
    samaritan_id: int
    # Reported samaritan position: true position plus simulated positioning error.
    gps_position: np.ndarray
    # Measured RSSI in dBm (ideal path-loss value plus Gaussian noise).
    rssi_dbm: float
    # Distance in meters recovered from rssi_dbm via rssi_to_distance().
    estimated_distance: float
    # Time of the report in seconds.
    timestamp: float
    # Positioning source; the generators emit 'gnss', 'wifi' or 'cell'.
    localization_method: str
    # Std-dev (meters) of the position error applied to gps_position.
    # Despite the name, the generators fill it for every localization method.
    gnss_accuracy_m: float
    # Confidence value returned by sample_gps_error() for this report.
    confidence: float
@dataclass
class SimulationScenario:
    """Complete simulated scenario with ground truth."""
    # Ground-truth device positions. The generators store shape (1, 2) for a
    # static device and (n_steps, 2) for a moving one.
    true_device_positions: np.ndarray
    # Samaritan reports generated for this scenario.
    reports: List[SamaritanReport]
    # Scenario label: 'static', or the scenario_type given to
    # generate_moving_scenario().
    scenario_type: str
    # Times in seconds, one per row of true_device_positions.
    timestamps: np.ndarray
def rssi_to_distance(rssi: float, tx_power: float = TX_POWER_DBM,
                     n: float = PATH_LOSS_EXP) -> float:
    """Invert the log-distance path-loss model.

    Args:
        rssi: Measured signal strength in dBm.
        tx_power: Model RSSI at 1 m, in dBm.
        n: Path loss exponent.

    Returns:
        Estimated distance in meters, ``10 ** ((tx_power - rssi) / (10 * n))``.
    """
    attenuation_db = tx_power - rssi
    exponent = attenuation_db / (10.0 * n)
    return 10 ** exponent
def distance_to_rssi(distance: float, tx_power: float = TX_POWER_DBM,
                     n: float = PATH_LOSS_EXP) -> float:
    """Ideal (noise-free) RSSI at a given distance.

    Evaluates ``tx_power - 10 * n * log10(distance)``. Distances below 0.1 m
    (including zero and negative values) are clamped to 0.1 m so the
    logarithm stays finite.

    Args:
        distance: Distance to the transmitter in meters. A NumPy array is
            also accepted and is processed element-wise.
        tx_power: Model RSSI at 1 m, in dBm.
        n: Path loss exponent.

    Returns:
        RSSI in dBm; an array of the same shape when ``distance`` is an array.
    """
    min_distance_m = 0.1
    # np.maximum clamps element-wise, so array input no longer trips over an
    # ambiguous truth value as the scalar ``if distance < 0.1`` test did.
    clamped = np.maximum(distance, min_distance_m)
    return tx_power - 10.0 * n * np.log10(clamped)
def sample_gps_error(method: str, rng: np.random.Generator) -> Tuple[float, float]:
    """Draw a positioning-error level and a confidence for one report.

    Each localization method has an outlier probability, an outlier error
    range with a fixed low confidence, and a typical error range with a
    typical confidence range:

    - 'gnss': 5% outliers (std 40-120 m, confidence 0.15),
      otherwise std 3-15 m, confidence 0.70-0.98
    - 'wifi': 10% outliers (std 60-150 m, confidence 0.20),
      otherwise std 10-35 m, confidence 0.45-0.75
    - anything else (treated as cell): 20% outliers (std 200-600 m,
      confidence 0.05), otherwise std 80-300 m, confidence 0.10-0.35

    Args:
        method: Localization method name.
        rng: NumPy random generator to draw from.

    Returns:
        ``(gps_error_std_m, confidence)``.
    """
    # One uniform draw decides whether this fix is an outlier.
    roll = rng.random()

    # (outlier probability, outlier std range, outlier confidence,
    #  typical std range, typical confidence range)
    if method == 'gnss':
        profile = (0.05, (40, 120), 0.15, (3, 15), (0.7, 0.98))
    elif method == 'wifi':
        profile = (0.10, (60, 150), 0.20, (10, 35), (0.45, 0.75))
    else:
        profile = (0.20, (200, 600), 0.05, (80, 300), (0.10, 0.35))
    outlier_prob, outlier_std, outlier_conf, typical_std, typical_conf = profile

    if roll < outlier_prob:
        return rng.uniform(*outlier_std), outlier_conf

    # Std is drawn before confidence to keep the random stream unchanged.
    std = rng.uniform(*typical_std)
    confidence = rng.uniform(*typical_conf)
    return std, confidence
def generate_static_scenario(
    n_samaritans: int = 20,
    device_pos: Optional[np.ndarray] = None,
    area_size_m: float = 200.0,
    seed: int = 42,
) -> SimulationScenario:
    """
    Lost device is stationary. Samaritans walk by randomly within BLE range.

    Each samaritan is placed at a uniformly random bearing and a uniformly
    random radius between 1 m and ``min(BLE_RANGE_M, area_size_m / 2)`` from
    the device. The reported position is the true position plus Gaussian
    error whose std comes from ``sample_gps_error``. The reported RSSI is
    the ideal path-loss value plus Gaussian noise. Report timestamps are
    uniform in [0, 3600) seconds.

    Args:
        n_samaritans: Number of helper reports to generate
        device_pos: True device position [x, y] in meters; random if None.
            Any array-like is accepted and converted to a float ndarray.
        area_size_m: Size of simulation area
        seed: RNG seed for reproducibility

    Returns:
        A SimulationScenario with ``scenario_type='static'``, a single
        ground-truth position of shape (1, 2) and ``timestamps == [0.0]``.
    """
    rng = np.random.default_rng(seed)
    if device_pos is None:
        device_pos = rng.uniform(50, area_size_m - 50, size=2)
    else:
        # A plain [x, y] list would otherwise fail at the final .reshape().
        device_pos = np.asarray(device_pos, dtype=float)

    # Loop-invariant upper bound on the samaritan-to-device radius.
    max_radius = min(BLE_RANGE_M, area_size_m / 2)
    # Repeated entries weight the choice: 3/6 gnss, 2/6 wifi, 1/6 cell.
    method_choices = ['gnss', 'gnss', 'gnss', 'wifi', 'wifi', 'cell']

    reports = []
    # The order of rng draws below is part of the seeded output; keep it.
    for i in range(n_samaritans):
        method = rng.choice(method_choices)
        angle = rng.uniform(0, 2 * np.pi)
        radius = rng.uniform(1, max_radius)
        true_samaritan_pos = device_pos + radius * np.array([np.cos(angle), np.sin(angle)])

        gps_std, confidence = sample_gps_error(method, rng)
        gps_error = rng.normal(0, gps_std, size=2)
        reported_gps = true_samaritan_pos + gps_error

        true_distance = np.linalg.norm(true_samaritan_pos - device_pos)
        ideal_rssi = distance_to_rssi(true_distance)
        measured_rssi = ideal_rssi + rng.normal(0, RSSI_NOISE_STD)
        estimated_distance = rssi_to_distance(measured_rssi)

        timestamp = rng.uniform(0, 3600)
        reports.append(SamaritanReport(
            samaritan_id=i,
            gps_position=reported_gps,
            rssi_dbm=measured_rssi,
            estimated_distance=estimated_distance,
            timestamp=timestamp,
            localization_method=method,
            gnss_accuracy_m=gps_std,
            confidence=confidence,
        ))

    return SimulationScenario(
        true_device_positions=device_pos.reshape(1, 2),
        reports=reports,
        scenario_type='static',
        timestamps=np.array([0.0]),
    )
def generate_moving_scenario(
    n_samaritans: int = 8,
    duration_s: float = 600.0,
    speed_mps: float = 1.5,
    device_start: Optional[np.ndarray] = None,
    scenario_type: str = 'moving',
    seed: int = 99,
) -> SimulationScenario:
    """
    Lost device is moving (person carrying it unknowingly, or in transit).
    Samaritans are sparse - key challenge of the problem.

    The trajectory is sampled once per second. In 'metro' mode the device
    keeps one random heading at a speed drawn uniformly from 8-15 m/s, with
    Gaussian jitter (std 0.5 m) added at each step. In any other mode the
    heading performs a random walk (std 0.1 rad per step) at ``speed_mps``.
    Report times are uniform over the duration and sorted. Each samaritan
    is placed 5 m to ``min(BLE_RANGE_M, 40)`` m from the device position at
    its report time.

    Args:
        n_samaritans: Few reports (typically 2-8 for moving scenario)
        duration_s: Total movement duration (seconds)
        speed_mps: Device movement speed (m/s); not used in 'metro' mode
        device_start: Starting position; random in [100, 400) per axis if None
        scenario_type: 'moving' (walk) or 'metro' (faster, more linear)
        seed: RNG seed

    Returns:
        A SimulationScenario whose ``true_device_positions`` has shape
        (n_steps, 2) and whose ``timestamps`` are ``arange(n_steps) * dt``.

    Raises:
        ValueError: If ``duration_s`` is negative.
    """
    if duration_s < 0:
        raise ValueError(f"duration_s must be non-negative, got {duration_s}")

    rng = np.random.default_rng(seed)
    if device_start is None:
        device_start = rng.uniform(100, 400, size=2)

    dt = 1.0
    # Always keep at least the starting sample, so a duration shorter than
    # dt does not produce an empty trajectory.
    n_steps = max(1, int(duration_s / dt))
    trajectory = np.zeros((n_steps, 2))
    trajectory[0] = device_start

    if scenario_type == 'metro':
        direction = rng.uniform(0, 2 * np.pi)
        metro_speed = rng.uniform(8, 15)
        # Constant per-step displacement, computed once outside the loop.
        dx = metro_speed * np.array([np.cos(direction), np.sin(direction)]) * dt
        for t in range(1, n_steps):
            noise = rng.normal(0, 0.5, 2)
            trajectory[t] = trajectory[t - 1] + dx + noise
    else:
        heading = rng.uniform(0, 2 * np.pi)
        for t in range(1, n_steps):
            heading += rng.normal(0, 0.1)
            dx = speed_mps * np.array([np.cos(heading), np.sin(heading)]) * dt
            trajectory[t] = trajectory[t - 1] + dx

    report_times = np.sort(rng.uniform(0, duration_s, size=n_samaritans))
    reports = []
    # Repeated entries weight the choice: 2/3 gnss, 1/3 wifi.
    method_choices = ['gnss', 'gnss', 'wifi']
    last_idx = n_steps - 1
    for i, t in enumerate(report_times):
        # When duration_s is not a multiple of dt, a report time can fall
        # past the last trajectory sample; clamp to the final sample.
        t_idx = min(int(t / dt), last_idx)
        device_pos_at_t = trajectory[t_idx]

        method = rng.choice(method_choices)
        angle = rng.uniform(0, 2 * np.pi)
        radius = rng.uniform(5, min(BLE_RANGE_M, 40))
        true_samaritan_pos = device_pos_at_t + radius * np.array([np.cos(angle), np.sin(angle)])

        gps_std, confidence = sample_gps_error(method, rng)
        gps_error = rng.normal(0, gps_std, size=2)
        reported_gps = true_samaritan_pos + gps_error

        true_distance = np.linalg.norm(true_samaritan_pos - device_pos_at_t)
        ideal_rssi = distance_to_rssi(true_distance)
        measured_rssi = ideal_rssi + rng.normal(0, RSSI_NOISE_STD)
        estimated_distance = rssi_to_distance(measured_rssi)

        reports.append(SamaritanReport(
            samaritan_id=i,
            gps_position=reported_gps,
            rssi_dbm=measured_rssi,
            estimated_distance=estimated_distance,
            timestamp=t,
            localization_method=method,
            gnss_accuracy_m=gps_std,
            confidence=confidence,
        ))

    return SimulationScenario(
        true_device_positions=trajectory,
        reports=reports,
        scenario_type=scenario_type,
        timestamps=np.arange(n_steps) * dt,
    )
def _run_demo() -> None:
    """Build one static and one moving scenario and print a summary of each."""
    static_scene = generate_static_scenario(n_samaritans=15, seed=0)
    static_summary = (
        f"Static scenario: device @ {static_scene.true_device_positions[0]}, "
        f"{len(static_scene.reports)} reports"
    )
    print(static_summary)

    moving_scene = generate_moving_scenario(n_samaritans=6, seed=1)
    moving_summary = (
        f"Moving scenario: start @ {moving_scene.true_device_positions[0]}, "
        f"{len(moving_scene.reports)} reports over "
        f"{moving_scene.true_device_positions.shape[0]}s"
    )
    print(moving_summary)


if __name__ == '__main__':
    _run_demo()