Source code for get_weather_data.core.distance

"""Geographic distance calculations for get-weather-data."""

import math
from dataclasses import dataclass
from typing import Any, Sequence, TypeVar

# Constants for distance calculation
NAUTICAL_MILE_PER_LAT = 60.00721
NAUTICAL_MILE_PER_LON = 60.10793
RAD = math.pi / 180.0
METERS_PER_NAUTICAL_MILE = 1852

# Try to import scipy for KDTree optimization
try:
    import numpy as np
    from scipy.spatial import cKDTree

    KDTREE_AVAILABLE = True
except ImportError:
    KDTREE_AVAILABLE = False
    np = None  # type: ignore[assignment]
    cKDTree = None  # type: ignore[assignment, misc]


def meters_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Calculate distance between two lat/lon coordinates in meters.

    Uses a simplified spherical approximation suitable for distances
    within the continental US.

    Args:
        lat1: Latitude of first point.
        lon1: Longitude of first point.
        lat2: Latitude of second point.
        lon2: Longitude of second point.

    Returns:
        Distance in meters.
    """
    y_distance = (lat2 - lat1) * NAUTICAL_MILE_PER_LAT
    x_distance = (
        (math.cos(lat1 * RAD) + math.cos(lat2 * RAD))
        * (lon2 - lon1)
        * (NAUTICAL_MILE_PER_LON / 2)
    )

    distance = math.sqrt(y_distance**2 + x_distance**2)
    return distance * METERS_PER_NAUTICAL_MILE


[docs] @dataclass class Station: """A weather station with location.""" id: str name: str lat: float lon: float type: str state: str = "" elevation: float | None = None
@dataclass class StationDistance: """A station with its distance from a reference point.""" station: Station distance_meters: int T = TypeVar("T") class StationIndex: """Pre-built spatial index for fast nearest-neighbor queries. Build once, query many times. Uses KDTree when scipy is available. """ def __init__(self, stations: Sequence[Station]) -> None: """Build the spatial index from a list of stations.""" self.stations = [s for s in stations if s.lat is not None and s.lon is not None] self._tree: Any = None if KDTREE_AVAILABLE and self.stations: coords = np.array([(s.lat, s.lon) for s in self.stations]) self._tree = cKDTree(coords) def find_closest( self, lat: float, lon: float, n: int, max_distance_km: float | None = None, ) -> list[StationDistance]: """Query the pre-built index for closest stations. Args: lat: Latitude of reference point. lon: Longitude of reference point. n: Maximum number of stations to return. max_distance_km: Maximum distance in kilometers. Returns: List of StationDistance objects, sorted by distance. """ if not self.stations: return [] if self._tree is not None: k = min(n, len(self.stations)) distances, indices = self._tree.query([lat, lon], k=k) # Handle single result case if isinstance(distances, float): distances = [distances] indices = [indices] results = [] for dist_deg, idx in zip(distances, indices): station = self.stations[idx] dist_m = int(dist_deg * 111000) if max_distance_km is not None and dist_m > max_distance_km * 1000: break results.append(StationDistance(station=station, distance_meters=dist_m)) return results # Fallback to brute force return _find_closest_brute(lat, lon, self.stations, n, max_distance_km) def find_closest( lat: float, lon: float, stations: Sequence[Station], n: int | None = None, max_distance_km: float | None = None, ) -> list[StationDistance]: """Find closest stations to a given coordinate. Uses KDTree for O(log n) lookup when scipy is available and there are many stations. Falls back to brute force O(n) otherwise. Args: lat: Latitude of reference point. lon: Longitude of reference point. stations: Sequence of Station objects to search. n: Maximum number of stations to return (None = all). max_distance_km: Maximum distance in kilometers (None = no limit). Returns: List of StationDistance objects, sorted by distance. """ if not stations: return [] # Filter out stations without coordinates valid_stations = [s for s in stations if s.lat is not None and s.lon is not None] if not valid_stations: return [] # Use KDTree for large datasets if KDTREE_AVAILABLE and len(valid_stations) > 100: return _find_closest_kdtree(lat, lon, valid_stations, n, max_distance_km) return _find_closest_brute(lat, lon, valid_stations, n, max_distance_km) def _find_closest_kdtree( lat: float, lon: float, stations: list[Station], n: int | None, max_distance_km: float | None, ) -> list[StationDistance]: """Find closest stations using KDTree (scipy).""" coords = np.array([(s.lat, s.lon) for s in stations]) tree = cKDTree(coords) k = len(stations) if n is None else min(n, len(stations)) distances, indices = tree.query([lat, lon], k=k) # Handle single result case if isinstance(distances, float): distances = [distances] indices = [indices] results = [] for dist_deg, idx in zip(distances, indices): station = stations[idx] # Convert degree distance to meters (approximate) dist_m = int(dist_deg * 111000) if max_distance_km is not None and dist_m > max_distance_km * 1000: break results.append(StationDistance(station=station, distance_meters=dist_m)) return results def _find_closest_brute( lat: float, lon: float, stations: list[Station], n: int | None, max_distance_km: float | None, ) -> list[StationDistance]: """Find closest stations using brute force O(n) search.""" results = [] for station in stations: try: dist = meters_distance(lat, lon, station.lat, station.lon) except Exception: continue if max_distance_km is not None and dist > max_distance_km * 1000: continue results.append(StationDistance(station=station, distance_meters=int(dist))) # Sort by distance results.sort(key=lambda x: x.distance_meters) if n is not None: results = results[:n] return results def f2c(fahrenheit: float) -> float: """Convert Fahrenheit to Celsius.""" return (fahrenheit - 32) * 5.0 / 9.0 def c2f(celsius: float) -> float: """Convert Celsius to Fahrenheit.""" return celsius * 9.0 / 5.0 + 32 def knots_to_ms(knots: float) -> float: """Convert knots to meters per second.""" return 0.51444 * knots def ms_to_knots(ms: float) -> float: """Convert meters per second to knots.""" return ms / 0.51444