diff --git a/geocoder.py b/geocoder.py new file mode 100644 index 0000000..72cb635 --- /dev/null +++ b/geocoder.py @@ -0,0 +1,411 @@ +# -*- coding: utf-8 -*- +import os +import re +import sqlite3 +from typing import Optional, Dict, Any, Tuple, List + +# 1) Чтение OSM .osm.pbf и извлечение addr:* через pyosmium +import osmium # pip install pyosmium + +# 2) Парсинг/нормализация адресов через libpostal +# Требуется установленная системная библиотека libpostal и Python-биндинги "postal" +from postal.parser import parse_address +from postal.expand import expand_address +import json +# 3) Нечеткий поиск (для автокоррекции) +from rapidfuzz import process, fuzz, utils # pip install rapidfuzz +import math +# (Опционально) геометрический центроид для way, если доступен shapely +try: + from shapely.geometry import LineString, Polygon + HAVE_SHAPELY = True +except Exception: + HAVE_SHAPELY = False + +DB_SCHEMA = """ +PRAGMA journal_mode=WAL; +CREATE TABLE IF NOT EXISTS street_names ( + street_norm TEXT PRIMARY KEY, + street_original TEXT +); +CREATE TABLE IF NOT EXISTS address_index ( + street_norm TEXT, + housenumber_norm TEXT, + street_original TEXT, + housenumber_original TEXT, + city TEXT, + postcode TEXT, + lat REAL, + lon REAL +); +CREATE INDEX IF NOT EXISTS idx_address_street ON address_index(street_norm); +CREATE INDEX IF NOT EXISTS idx_address_street_house ON address_index(street_norm,housenumber_norm); +""" + +# --------------------------- +# Минимальная предобработка +# --------------------------- + +_DASHES = {"–": "-", "—": "-", "−": "-"} + +def _preclean_text(s: str) -> str: + """Минимальная очистка для устойчивости парсера: регистр, дефисы, пробелы, буква-цифра разделение.""" + s = (s or "").strip().lower().replace("ё", "е") + for k, v in _DASHES.items(): + s = s.replace(k, v) + s = re.sub(r"\s+", " ", s) + # Вставить пробел на границе буква<->цифра, чтобы "дом43" -> "дом 43" + s = re.sub(r"(?<=[A-Za-zА-Яа-я])(?=\d)|(?<=\d)(?=[A-Za-zА-Яа-я])", " ", s) + return s.strip() + +def _parts_from_libpostal(addr_text: str) -> Dict[str, str]: + """Собирает метка->значение из parse_address без сложных правил.""" + parts: Dict[str, str] = {} + for comp, label in parse_address(addr_text): + comp = " ".join(comp.split()) + if not comp: + continue + parts[label] = (parts[label] + " " + comp).strip() if label in parts else comp + return parts + +def _prefer_cyrillic_min(expansions: List[str]) -> Optional[str]: + """Выбрать «лучший» вариант из expand_address: предпочесть кириллицу и минимальную длину.""" + if not expansions: + return None + cyr = [e for e in expansions if re.search(r"[А-Яа-я]", e)] + pool = cyr if cyr else expansions + pool = [" ".join(e.split()) for e in pool] + pool.sort(key=len) + return pool[0] if pool else None + +def canonicalize_road(road: str) -> str: + """Канонизация названия улицы через expand_address с минимальной эвристикой выбора.""" + if not road: + return "" + exp = expand_address(road) + return _prefer_cyrillic_min(exp) or road + +def _clean_token(s: str) -> str: + return " ".join((s or "").strip().lower().replace("ё", "е").split()) + +def _unit_suffix_and_value(text: str) -> str: + """ + Преобразует текст unit/доп.компонента в компактный русскоязычный суффикс: + - 'корпус 1' / 'к 1' -> 'К1' + - 'строение 2' / 'стр 2' -> 'С2' + - 'лит а' -> 'ЛА' + - 'влад 5' -> 'ВЛ5' + Если тип не распознан, по умолчанию считаем корпус 'К' (без тяжёлых регексов). + """ + t = _clean_token(text) + # выделяем буквы/цифры (минимум регексов: только удалить пробелы) + val = t.replace(" ", "") + # эвристики по подстрокам + if "строен" in t or t.startswith("стр"): + # строение + val = val.replace("строение", "").replace("стр", "") + return f"С{val}".upper() if val else "С" + if "корп" in t or t == "к" or t.startswith("к"): + # корпус + val = val.replace("корпус", "").replace("корп", "").replace("к", "") + return f"К{val}".upper() if val else "К" + if t.startswith("лит"): + # литера + val = val.replace("литера", "").replace("лит", "") + return f"Л{val}".upper() if val else "Л" + if t.startswith("влад"): + # владение + val = val.replace("владение", "").replace("влад", "") + return f"ВЛ{val}".upper() if val else "ВЛ" + # вход/подъезд/лестница можно при желании кодировать как 'П', 'ЛС', но обычно это не часть addr:housenumber в OSM + # по умолчанию — трактуем как корпус + # оставим только цифро-буквенную часть после удаления пробелов + return f"К{val}".upper() if val else "" + +def build_house_number_from_parts(parts: Dict[str, str], original_line: str) -> str: + """ + Формирует финальный housenumber из libpostal: + - base = house_number + - добавляет суффиксы из unit/entrance/staircase/level, конвертируя их в 'К..'/'С..'/'Л..'/'ВЛ..' + """ + base = (parts.get("house_number") or "").strip() + # защитимся от «дом43» и т.п. минимальной разделиловкой (сделано ранее в _preclean_text) + suffixes = [] + + # unit часто содержит корпус/строение + if parts.get("unit"): + suffixes.append(_unit_suffix_and_value(parts["unit"])) + + # некоторые адреса могут класть строение/литеру в entrance/staircase + for key in ("entrance", "staircase"): + if parts.get(key): + sfx = _unit_suffix_and_value(parts[key]) + if sfx: + suffixes.append(sfx) + + # level — это этаж, обычно не часть housenumber в OSM; по умолчанию не добавляем + + # если базовый номер пуст — попробуем через expand_address всей строки (без сложных правил) + if not base: + for e in expand_address(original_line): + p2 = _parts_from_libpostal(e) + b2 = (p2.get("house_number") or "").strip() + if b2: + base = b2 + if p2.get("unit") and not suffixes: + suffixes.append(_unit_suffix_and_value(p2["unit"])) + break + + # склеиваем: пробелы убираем, верхний регистр для стабильности + hn = "".join(base.split()).upper() + "".join(suffixes) + return hn + +def normalize_street_for_index(street_original: str) -> str: + """Нормализация ключа улицы для индекса (регистронезависимая, с лексической очисткой).""" + # Используем обработчик RapidFuzz по умолчанию + канонизацию libpostal + street_clean = _preclean_text(street_original) + street_canon = canonicalize_road(street_clean) + return utils.default_process(street_canon) or street_canon + +def normalize_house_for_index(house_original: str) -> str: + """Простая нормализация номера для индекса: убрать пробелы, привести к верхнему регистру.""" + return "".join((house_original or "").split()).upper() + + + +# --------------------------- +# Поиск/валидация адреса +# --------------------------- + +def parse_with_libpostal(addr: str) -> Dict[str, str]: + """ + Минимально-шумоустойчивая раскладка через libpostal. + """ + txt = _preclean_text(addr) + parts = _parts_from_libpostal(txt) + road = parts.get("road") or parts.get("pedestrian") or parts.get("footway") or parts.get("residential") or "" + house = build_house_number_from_parts(parts, txt) + city = parts.get("city") or parts.get("city_district") or parts.get("suburb") or "" + postcode = parts.get("postcode", "") + # Канонизируем улицу для сравнения + road_canon = canonicalize_road(road) + return { + "road": road_canon or road or "", + "house_number": house or "", + "city": city or "", + "postcode": postcode or "" + } + +def best_street_match(conn: sqlite3.Connection, street_query: str, cutoff: int = 86) -> Optional[Tuple[str, str]]: + """ + Ищет с учётом опечаток: exact по нормализованному ключу, иначе WRatio к ближайшему. + """ + s_key = normalize_street_for_index(street_query) + cur = conn.execute("SELECT street_original FROM street_names WHERE street_norm = ?", (s_key,)) + row = cur.fetchone() + if row: + return (s_key, row[0]) + + cur = conn.execute("SELECT street_norm, street_original FROM street_names") + rows = cur.fetchall() + if not rows: + return None + choices = [r[0] for r in rows] + match = process.extractOne(s_key, choices, scorer=fuzz.WRatio, processor=None, score_cutoff=cutoff) + if match: + matched_norm = match[0] + for s_n, s_o in rows: + if s_n == matched_norm: + return (s_n, s_o) + return None + +def _base_digits(s: str) -> str: + m = re.search(r"\d+", s or "") + return m.group(0) if m else "" + +def best_house_match(conn, street_norm: str, house_query: str, cutoff: int = 82) -> Optional[Tuple[str, str]]: + # Простой канон запроса: убрать пробелы, верхний регистр + q_norm = ("".join((house_query or "").split())).upper() + q_base = _base_digits(q_norm) + + # Вытаскиваем все номера по улице + cur = conn.execute( + "SELECT housenumber_norm, housenumber_original FROM address_index WHERE street_norm = ?", + (street_norm,) + ) + rows = cur.fetchall() + if not rows: + return None + + # 1) Точное совпадение целиком + for hn, ho in rows: + if hn == q_norm: + return (hn, ho) + + # 2) Жёсткий фильтр по базовому числу (обязательное совпадение) + if q_base: + filtered: List[Tuple[str, str]] = [(hn, ho) for hn, ho in rows if _base_digits(hn) == q_base] + else: + filtered = rows + + if not filtered: + return None + + # 3) Нечёткий матч только среди отфильтрованных кандидатов + choices = [hn for hn, _ in filtered] + match = process.extractOne(q_norm, choices, scorer=fuzz.WRatio, processor=None, score_cutoff=cutoff) + if match: + best_hn = match[0] + for hn, ho in filtered: + if hn == best_hn: + return (hn, ho) + + # 4) Fallback: если остался один кандидат с таким же базовым числом — принять его + if len(filtered) == 1: + return filtered[0] + + return None + +def _interp_point_along(coords: List[List[float]], t: float) -> Dict[str,float]: + # coords: [[lon,lat], ...], t∈[0,1] по длине полилинии + def dist(a,b): + dx=a[0]-b[0]; dy=a[1]-b[1]; return math.hypot(dx, dy) + seg_len=[]; total=0.0 + for i in range(len(coords)-1): + d=dist(coords[i], coords[i+1]); seg_len.append(d); total += d + if total==0.0: + return {"lat": coords[0][1], "lon": coords[0][0]} + target = t*total; acc=0.0 + for i,d in enumerate(seg_len): + if acc + d >= target: + ratio = 0.0 if d==0 else (target-acc)/d + lon = coords[i][0] + (coords[i+1][0]-coords[i][0])*ratio + lat = coords[i][1] + (coords[i+1][1]-coords[i][1])*ratio + return {"lat": float(lat), "lon": float(lon)} + acc += d + return {"lat": coords[-1][1], "lon": coords[-1][0]} + +def _fits_type(n:int, itype:str)->bool: + itype=(itype or "").lower() + if itype=="odd": return n%2==1 + if itype=="even": return n%2==0 + return True # 'all' или неизвестный тип + +def find_interpolation_coord(conn: sqlite3.Connection, street_norm: str, house_query: str) -> Optional[Dict[str,float]]: + m = re.search(r"\d+", house_query or "") + if not m: + return None + n = int(m.group(0)) + cur = conn.execute( + "SELECT itype, start_num, end_num, coords_json FROM interpolation_ways WHERE street_norm=?", + (street_norm,) + ) + best=None + for itype, s_i, e_i, coords_json in cur.fetchall(): + if n < min(s_i,e_i) or n > max(s_i,e_i): + continue + if not _fits_type(n, itype): + continue + coords = json.loads(coords_json) + # позиция вдоль диапазона с плавающей точкой + span = (e_i - s_i) if (e_i - s_i)!=0 else 1 + t = (n - s_i) / span + t = max(0.0, min(1.0, t)) + return _interp_point_along(coords, t) + return best + +def get_street_centroid_any(conn: sqlite3.Connection, street_norm: str) -> Optional[Dict[str, float]]: + # 1) по адресным точкам + cur = conn.execute( + "SELECT AVG(lat), AVG(lon) FROM address_index WHERE street_norm=? AND lat IS NOT NULL AND lon IS NOT NULL", + (street_norm,) + ) + lat, lon = cur.fetchone() or (None,None) + if lat is not None and lon is not None: + return {"lat": float(lat), "lon": float(lon)} + # 2) по геометрии улиц (highway) + cur = conn.execute("SELECT lat, lon FROM street_center WHERE street_norm=?",(street_norm,)) + row = cur.fetchone() + if row: + return {"lat": float(row[0]), "lon": float(row[1])} + return None + +def validate_address(sqlite_path: str, address_str: str) -> Dict[str, Any]: + comps = parse_with_libpostal(address_str) + street_in = comps.get("road", "") + house_in = comps.get("house_number", "") + + conn = sqlite3.connect(sqlite_path) + try: + # улица обязательна + if not street_in: + return { + "valid": False, + "corrected": None, + "components": comps, + "coordinates": None, + "reason": "Не удалось распознать улицу" + } + + st = best_street_match(conn, street_in) + if not st: + return { + "valid": False, + "corrected": None, + "components": comps, + "coordinates": None, + "reason": "Улица не найдена в локальной базе" + } + street_norm, street_orig = st + + # 1) дом не введён: вернуть координату улицы (centroid) + if not house_in: + cent = get_street_centroid_any(conn, street_norm) + return { + "valid": True if cent else False, + "corrected": street_orig, + "components": comps, + "coordinates": cent, + "reason": None if cent else "Для улицы нет координатной опоры" + } + + # 2) дом введён: пробуем точное сопоставление + hn = best_house_match(conn, street_norm, house_in) + if hn: + house_norm, house_orig = hn + cur = conn.execute( + "SELECT city, postcode, lat, lon FROM address_index WHERE street_norm=? AND housenumber_norm=? LIMIT 1", + (street_norm, house_norm) + ) + row = cur.fetchone() + city, postcode, lat, lon = (row or (None,None,None,None)) + coords = {"lat": float(lat), "lon": float(lon)} if lat is not None and lon is not None else None + # если почему-то lat/lon пустые (редкий случай), дополнительно попробуем интерполяцию/центроид + if coords is None: + coords = find_interpolation_coord(conn, street_norm, house_norm) or get_street_centroid_any(conn, street_norm) + corrected = ", ".join(x for x in [f"{street_orig} {house_orig}", city, postcode] if x and str(x).strip()) + return { + "valid": True, + "corrected": corrected, + "components": comps, + "coordinates": coords, + "reason": None + } + + # 3) дом введён, но не найден: invalid, однако координаты улицы — обязательно + fallback = find_interpolation_coord(conn, street_norm, house_in) or get_street_centroid_any(conn, street_norm) + return { + "valid": False, + "corrected": None, + "components": comps, + "coordinates": fallback, + "reason": "Номер дома отсутствует на найденной улице" + } + finally: + conn.close() + +if __name__ == "__main__": + + res = validate_address("addresses.sqlite", "ул. Большая Покровская, 15") + print(res) + pass