# 💻 data_simulator.py

# python · 286 lines · ⬇️ Download

"""
BLE Crowdsourced Location Data Simulator
=========================================
Simulates the "Find My Device" scenario:
- A lost device broadcasts BLE at a fixed (or moving) location
- Good Samaritans (passersby) detect the BLE signal and report their GPS location + RSSI

BLE Path Loss Model (log-distance):
    RSSI = TX_power - 10 * n * log10(d) + noise
    d = 10^((TX_power - RSSI) / (10 * n))

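Where:
    TX_power = RSSI measured at the 1 m reference distance (dBm)
    n        = path loss exponent (2.0 in free space, ~2.5-4.0 indoors)
    d        = estimated distance from samaritan to lost device (meters)
    noise    = zero-mean Gaussian measurement noise (dB)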
"""

import numpy as np
from dataclasses import dataclass, field
from typing import List, Optional, Tuple


# ---------------------------------------------------------------------------
# BLE radio parameters
# ---------------------------------------------------------------------------
TX_POWER_DBM = -59        # Expected RSSI (dBm) at 1 meter (calibration constant)
PATH_LOSS_EXP = 2.7       # Path loss exponent n; urban / mixed environment
RSSI_NOISE_STD = 4.0      # dB, std-dev of the Gaussian noise added to each RSSI measurement
BLE_RANGE_M = 60.0        # Maximum practical BLE detection range (meters)


@dataclass
class SamaritanReport:
    """A single crowd-sourced location report from one samaritan.

    The scenario generators in this module create one instance per simulated
    passerby, pairing the passerby's noisy self-reported position with the
    noisy BLE signal strength they measured.
    """
    samaritan_id: int             # Sequential index assigned by the generator
    gps_position: np.ndarray      # Reported position in meters; the generators store planar [x, y]
    rssi_dbm: float               # Measured BLE RSSI (dBm), measurement noise included
    estimated_distance: float     # d = f(RSSI) in meters (log-distance path loss model)
    timestamp: float              # Report time in seconds; the generators use offsets from scenario start
    localization_method: str      # 'gnss', 'wifi', 'cell'
    gnss_accuracy_m: float        # Position accuracy (meters, 1-sigma); filled for every method, not only GNSS
    confidence: float             # Composite confidence in (0, 1]


@dataclass
class SimulationScenario:
    """Complete simulated scenario with ground truth.

    Bundles the true device trajectory with the noisy samaritan reports
    generated around it, so an estimator's output can be scored against it.
    """
    true_device_positions: np.ndarray   # (T, 2) trajectory in meters; T=1 for static
    reports: List[SamaritanReport]      # Simulated reports; the moving generator emits them in timestamp order
    scenario_type: str                  # 'static' | 'moving' | 'metro'
    timestamps: np.ndarray              # (T,) time axis in seconds for the rows of true_device_positions


# ---------------------------------------------------------------------------
# RSSI <-> distance conversion
# ---------------------------------------------------------------------------

def rssi_to_distance(rssi: float, tx_power: float = TX_POWER_DBM,
                     n: float = PATH_LOSS_EXP) -> float:
    """Invert the log-distance path loss model: RSSI (dBm) -> distance (m).

    Args:
        rssi:     Measured signal strength in dBm.
        tx_power: Reference RSSI at 1 meter, in dBm.
        n:        Path loss exponent.

    Returns:
        d = 10^((tx_power - rssi) / (10 * n)), the distance in meters at
        which the noise-free model would produce ``rssi``.
    """
    # Attenuation relative to the 1 m reference, expressed in decades of distance.
    decades = (tx_power - rssi) / (10.0 * n)
    return 10 ** decades


def distance_to_rssi(distance: float, tx_power: float = TX_POWER_DBM,
                     n: float = PATH_LOSS_EXP,
                     min_distance: float = 0.1) -> float:
    """Ideal (noise-free) RSSI at a given distance.

    Evaluates the log-distance model ``tx_power - 10 * n * log10(d)``.

    Args:
        distance:     Distance to the transmitter in meters. A scalar or a
                      NumPy array; arrays are processed element-wise.
        tx_power:     Reference RSSI at 1 meter, in dBm.
        n:            Path loss exponent.
        min_distance: Lower clamp (meters) applied to ``distance`` before the
                      logarithm, so zero or negative inputs yield a finite
                      RSSI instead of inf/NaN.

    Returns:
        RSSI in dBm, with the same shape as ``distance``.
    """
    # np.maximum clamps element-wise, so array inputs work; a plain
    # `if distance < ...` raises "truth value is ambiguous" on arrays.
    clamped = np.maximum(distance, min_distance)
    return tx_power - 10.0 * n * np.log10(clamped)


# ---------------------------------------------------------------------------
# Samaritan GPS error model
# ---------------------------------------------------------------------------

def sample_gps_error(method: str, rng: np.random.Generator) -> Tuple[float, float]:
    """
    Draw a positioning-error level and a confidence score for one report.

    Each localization method has a nominal regime and an outlier regime:

      method | P(outlier) | outlier std (m) | outlier conf | nominal std (m) | nominal conf
      -------+------------+-----------------+--------------+-----------------+--------------
      gnss   |    0.05    |    40 - 120     |     0.15     |     3 - 15      | 0.70 - 0.98
      wifi   |    0.10    |    60 - 150     |     0.20     |    10 - 35      | 0.45 - 0.75
      cell   |    0.20    |   200 - 600     |     0.05     |    80 - 300     | 0.10 - 0.35

    Any method name other than 'gnss' or 'wifi' is treated as 'cell'.

    Args:
        method: Localization method name.
        rng:    Random generator to draw from (advanced by 2 or 3 draws).

    Returns:
        (gps_error_std_m, confidence)
    """
    # (outlier probability, outlier std range, outlier confidence,
    #  nominal std range, nominal confidence range)
    profiles = {
        'gnss': (0.05, (40, 120), 0.15, (3, 15), (0.7, 0.98)),
        'wifi': (0.10, (60, 150), 0.20, (10, 35), (0.45, 0.75)),
        'cell': (0.20, (200, 600), 0.05, (80, 300), (0.10, 0.35)),
    }
    p_outlier, bad_std, bad_conf, ok_std, ok_conf = profiles.get(
        method, profiles['cell'])

    # The regime draw always comes first so the RNG stream is consumed in a
    # fixed order regardless of method.
    regime_draw = rng.random()

    if regime_draw < p_outlier:
        # Outlier regime: large error, fixed low confidence (no extra draw).
        return rng.uniform(*bad_std), bad_conf

    sigma = rng.uniform(*ok_std)
    return sigma, rng.uniform(*ok_conf)


# ---------------------------------------------------------------------------
# Static scenario generator
# ---------------------------------------------------------------------------

def generate_static_scenario(
    n_samaritans: int = 20,
    device_pos: Optional[np.ndarray] = None,
    area_size_m: float = 200.0,
    seed: int = 42,
) -> SimulationScenario:
    """
    Lost device is stationary. Samaritans walk by randomly within BLE range.

    For each samaritan the generator draws a true position around the device,
    perturbs it with a method-dependent positioning error to get the reported
    position, and derives a noisy RSSI (and the distance it implies) from the
    true samaritan-device distance.

    Args:
        n_samaritans: Number of helper reports to generate
        device_pos:   True device position [x, y] in meters (any 2-element
                      sequence or array); random if None. The input is
                      copied, never modified or aliased.
        area_size_m:  Size of simulation area
        seed:         RNG seed for reproducibility

    Returns:
        A 'static' SimulationScenario whose ground truth is a single (1, 2)
        position at time 0 and whose reports carry timestamps spread
        uniformly over one hour.

    Raises:
        ValueError: If ``device_pos`` does not contain exactly two values.
    """
    rng = np.random.default_rng(seed)

    if device_pos is None:
        device_pos = rng.uniform(50, area_size_m - 50, size=2)
    else:
        # Accept lists/tuples/integer arrays and take a private float copy so
        # the returned ground truth does not share memory with the caller's
        # array.
        device_pos = np.array(device_pos, dtype=float).reshape(2)

    reports = []
    method_choices = ['gnss', 'gnss', 'gnss', 'wifi', 'wifi', 'cell']  # distribution

    # Loop-invariant: samaritans are placed no farther than BLE range (or
    # half the area, whichever is smaller) from the device.
    max_radius = min(BLE_RANGE_M, area_size_m / 2)

    for i in range(n_samaritans):
        # rng.choice returns a NumPy string scalar; store a plain str, as the
        # SamaritanReport field is declared.
        method = str(rng.choice(method_choices))

        # Samaritan true position: uniform in angle and in radius around the
        # device (so denser near the device than an area-uniform draw).
        angle = rng.uniform(0, 2 * np.pi)
        radius = rng.uniform(1, max_radius)
        true_samaritan_pos = device_pos + radius * np.array([np.cos(angle), np.sin(angle)])

        # GPS error applied to samaritan's own position
        gps_std, confidence = sample_gps_error(method, rng)
        gps_error = rng.normal(0, gps_std, size=2)
        reported_gps = true_samaritan_pos + gps_error

        # True distance from samaritan to device
        true_distance = np.linalg.norm(true_samaritan_pos - device_pos)

        # Gaussian noise in the dB domain (log-normal shadowing), then map
        # the noisy RSSI back to the distance an estimator would infer.
        ideal_rssi = distance_to_rssi(true_distance)
        measured_rssi = ideal_rssi + rng.normal(0, RSSI_NOISE_STD)
        estimated_distance = rssi_to_distance(measured_rssi)

        timestamp = rng.uniform(0, 3600)   # spread over 1 hour

        reports.append(SamaritanReport(
            samaritan_id=i,
            gps_position=reported_gps,
            rssi_dbm=measured_rssi,
            estimated_distance=estimated_distance,
            timestamp=timestamp,
            localization_method=method,
            gnss_accuracy_m=gps_std,
            confidence=confidence,
        ))

    return SimulationScenario(
        true_device_positions=device_pos.reshape(1, 2),
        reports=reports,
        scenario_type='static',
        timestamps=np.array([0.0]),
    )


# ---------------------------------------------------------------------------
# Moving scenario generator (e.g. device in bag, walking/metro)
# ---------------------------------------------------------------------------

def generate_moving_scenario(
    n_samaritans: int = 8,
    duration_s: float = 600.0,
    speed_mps: float = 1.5,
    device_start: Optional[np.ndarray] = None,
    scenario_type: str = 'moving',
    seed: int = 99,
) -> SimulationScenario:
    """
    Lost device is moving (person carrying it unknowingly, or in transit).
    Samaritans are sparse - key challenge of the problem.

    The trajectory is sampled once per second. Each report is generated
    around the device position at the trajectory sample covering the
    report's time.

    Args:
        n_samaritans: Few reports (typically 2-8 for moving scenario)
        duration_s:   Total movement duration (seconds); must be >= 0.
                      Durations shorter than one step produce a single-sample
                      trajectory.
        speed_mps:    Device movement speed (m/s). Used for the pedestrian
                      walk only; 'metro' draws its own speed from 8-15 m/s.
        device_start: Starting position [x, y] in meters; random if None
        scenario_type: 'moving' (walk) or 'metro' (faster, more linear).
                      Any value other than 'metro' uses the pedestrian walk.
        seed:         RNG seed

    Returns:
        SimulationScenario with a (T, 2) trajectory, its (T,) time axis and
        the reports ordered by timestamp.

    Raises:
        ValueError: If ``duration_s`` is negative.
    """
    if duration_s < 0:
        raise ValueError(f"duration_s must be non-negative, got {duration_s}")

    rng = np.random.default_rng(seed)

    if device_start is None:
        device_start = rng.uniform(100, 400, size=2)

    # Generate device trajectory via random walk (or linear for metro)
    dt = 1.0   # 1 second steps
    # Keep at least one sample so trajectory[0] exists when duration_s < dt.
    n_steps = max(1, int(duration_s / dt))
    trajectory = np.zeros((n_steps, 2))
    trajectory[0] = device_start

    if scenario_type == 'metro':
        # Linear direction with slight noise (metro line)
        direction = rng.uniform(0, 2 * np.pi)
        metro_speed = rng.uniform(8, 15)   # m/s
        # Per-step displacement is constant along the line; compute it once.
        dx = metro_speed * np.array([np.cos(direction), np.sin(direction)]) * dt
        for t in range(1, n_steps):
            noise = rng.normal(0, 0.5, 2)   # small isotropic jitter on both axes
            trajectory[t] = trajectory[t - 1] + dx + noise
    else:
        # Pedestrian random walk with momentum (smooth direction changes)
        heading = rng.uniform(0, 2 * np.pi)
        for t in range(1, n_steps):
            heading += rng.normal(0, 0.1)   # slow heading change
            dx = speed_mps * np.array([np.cos(heading), np.sin(heading)]) * dt
            trajectory[t] = trajectory[t - 1] + dx

    # Sparse samaritan reports at random times along the trajectory
    report_times = np.sort(rng.uniform(0, duration_s, size=n_samaritans))
    reports = []
    method_choices = ['gnss', 'gnss', 'wifi']

    last_idx = n_steps - 1
    max_radius = min(BLE_RANGE_M, 40)   # loop-invariant upper bound (meters)

    for i, t in enumerate(report_times):
        # Clamp to the last sample: when duration_s is not a multiple of dt
        # (e.g. 600.5 s -> 600 samples) a report time can fall past the end
        # of the trajectory and int(t / dt) would index out of bounds.
        t_idx = min(int(t / dt), last_idx)
        device_pos_at_t = trajectory[t_idx]

        # rng.choice returns a NumPy string scalar; store a plain str, as the
        # SamaritanReport field is declared.
        method = str(rng.choice(method_choices))
        angle = rng.uniform(0, 2 * np.pi)
        radius = rng.uniform(5, max_radius)
        true_samaritan_pos = device_pos_at_t + radius * np.array([np.cos(angle), np.sin(angle)])

        # Positioning error applied to the samaritan's own reported position
        gps_std, confidence = sample_gps_error(method, rng)
        gps_error = rng.normal(0, gps_std, size=2)
        reported_gps = true_samaritan_pos + gps_error

        # Noisy RSSI from the true distance, then the distance it implies
        true_distance = np.linalg.norm(true_samaritan_pos - device_pos_at_t)
        ideal_rssi = distance_to_rssi(true_distance)
        measured_rssi = ideal_rssi + rng.normal(0, RSSI_NOISE_STD)
        estimated_distance = rssi_to_distance(measured_rssi)

        reports.append(SamaritanReport(
            samaritan_id=i,
            gps_position=reported_gps,
            rssi_dbm=measured_rssi,
            estimated_distance=estimated_distance,
            timestamp=t,
            localization_method=method,
            gnss_accuracy_m=gps_std,
            confidence=confidence,
        ))

    return SimulationScenario(
        true_device_positions=trajectory,
        reports=reports,
        scenario_type=scenario_type,
        timestamps=np.arange(n_steps) * dt,
    )


# ---------------------------------------------------------------------------
# Quick sanity check
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    # Smoke test 1: stationary device -- show ground truth and report count.
    static_scene = generate_static_scenario(n_samaritans=15, seed=0)
    static_truth = static_scene.true_device_positions[0]
    static_count = len(static_scene.reports)
    print(f"Static scenario: device @ {static_truth}, "
          f"{static_count} reports")

    # Smoke test 2: moving device -- show start point, report count and the
    # number of trajectory samples (one per second, hence the "s" suffix).
    moving_scene = generate_moving_scenario(n_samaritans=6, seed=1)
    path = moving_scene.true_device_positions
    moving_count = len(moving_scene.reports)
    print(f"Moving scenario: start @ {path[0]}, "
          f"{moving_count} reports over "
          f"{path.shape[0]}s")