import re import sqlite3 from typing import Optional, Dict, Any, Tuple, List import json from rapidfuzz import process, fuzz, utils import math from libpostal_service import LibpostalService lp_service = LibpostalService() # --------------------------- # Минимальная предобработка # --------------------------- _DASHES = {"–": "-", "—": "-", "−": "-"} def _preclean_text(s: str) -> str: """Минимальная очистка для устойчивости парсера: регистр, дефисы, пробелы, буква-цифра разделение.""" s = (s or "").strip().lower().replace("ё", "е") for k, v in _DASHES.items(): s = s.replace(k, v) s = re.sub(r"\s+", " ", s) # Вставить пробел на границе буква<->цифра, чтобы "дом43" -> "дом 43" s = re.sub(r"(?<=[A-Za-zА-Яа-я])(?=\d)|(?<=\d)(?=[A-Za-zА-Яа-я])", " ", s) return s.strip() def _parts_from_libpostal(addr_text: str) -> Dict[str, str]: """Собирает метка->значение из parse_address без сложных правил, работая с ответом от сервиса.""" parts: Dict[str, str] = {} lp_response = lp_service.parse_address(addr_text) for item in lp_response: label = item.get('label') comp = item.get('value') if not comp or not label: continue comp = " ".join(comp.split()) if not comp: continue parts[label] = (parts[label] + " " + comp).strip() if label in parts else comp return parts def _prefer_cyrillic_min(expansions: List[str]) -> Optional[str]: """Выбрать «лучший» вариант из expand_address: предпочесть кириллицу и минимальную длину.""" if not expansions: return None cyr = [e for e in expansions if re.search(r"[А-Яа-я]", e)] pool = cyr if cyr else expansions pool = [" ".join(e.split()) for e in pool] pool.sort(key=len) return pool[0] if pool else None def canonicalize_road(road: str) -> str: """Канонизация названия улицы через expand_address с минимальной эвристикой выбора.""" if not road: return "" exp = lp_service.expand_address(road) return _prefer_cyrillic_min(exp) or road def _clean_token(s: str) -> str: return " ".join((s or "").strip().lower().replace("ё", "е").split()) def _unit_suffix_and_value(text: str) -> str: """ Преобразует текст unit/доп.компонента в компактный русскоязычный суффикс: - 'корпус 1' / 'к 1' -> 'К1' - 'строение 2' / 'стр 2' -> 'С2' - 'лит а' -> 'ЛА' - 'влад 5' -> 'ВЛ5' Если тип не распознан, по умолчанию считаем корпус 'К' (без тяжёлых регексов). """ t = _clean_token(text) # выделяем буквы/цифры (минимум регексов: только удалить пробелы) val = t.replace(" ", "") # эвристики по подстрокам if "строен" in t or t.startswith("стр"): # строение val = val.replace("строение", "").replace("стр", "") return f"С{val}".upper() if val else "С" if "корп" in t or t == "к" or t.startswith("к"): # корпус val = val.replace("корпус", "").replace("корп", "").replace("к", "") return f"К{val}".upper() if val else "К" if t.startswith("лит"): # литера val = val.replace("литера", "").replace("лит", "") return f"Л{val}".upper() if val else "Л" if t.startswith("влад"): # владение val = val.replace("владение", "").replace("влад", "") return f"ВЛ{val}".upper() if val else "ВЛ" # вход/подъезд/лестница можно при желании кодировать как 'П', 'ЛС', но обычно это не часть addr:housenumber в OSM # по умолчанию — трактуем как корпус # оставим только цифро-буквенную часть после удаления пробелов return f"К{val}".upper() if val else "" def build_house_number_from_parts(parts: Dict[str, str], original_line: str) -> str: """ Формирует финальный housenumber из libpostal: - base = house_number - добавляет суффиксы из unit/entrance/staircase/level, конвертируя их в 'К..'/'С..'/'Л..'/'ВЛ..' """ base = (parts.get("house_number") or "").strip() # защитимся от «дом43» и т.п. минимальной разделиловкой (сделано ранее в _preclean_text) suffixes = [] # unit часто содержит корпус/строение if parts.get("unit"): suffixes.append(_unit_suffix_and_value(parts["unit"])) # некоторые адреса могут класть строение/литеру в entrance/staircase for key in ("entrance", "staircase"): if parts.get(key): sfx = _unit_suffix_and_value(parts[key]) if sfx: suffixes.append(sfx) # level — это этаж, обычно не часть housenumber в OSM; по умолчанию не добавляем # если базовый номер пуст — попробуем через expand_address всей строки (без сложных правил) if not base: for e in lp_service.expand_address(original_line): p2 = _parts_from_libpostal(e) b2 = (p2.get("house_number") or "").strip() if b2: base = b2 if p2.get("unit") and not suffixes: suffixes.append(_unit_suffix_and_value(p2["unit"])) break # склеиваем: пробелы убираем, верхний регистр для стабильности hn = "".join(base.split()).upper() + "".join(suffixes) return hn def normalize_street_for_index(street_original: str) -> str: """Нормализация ключа улицы для индекса (регистронезависимая, с лексической очисткой).""" # Используем обработчик RapidFuzz по умолчанию + канонизацию libpostal street_clean = _preclean_text(street_original) street_canon = canonicalize_road(street_clean) return utils.default_process(street_canon) or street_canon def normalize_house_for_index(house_original: str) -> str: """Простая нормализация номера для индекса: убрать пробелы, привести к верхнему регистру.""" return "".join((house_original or "").split()).upper() # --------------------------- # Поиск/валидация адреса # --------------------------- def parse_with_libpostal(addr: str) -> Dict[str, str]: """ Минимально-шумоустойчивая раскладка через libpostal. """ txt = _preclean_text(addr) parts = _parts_from_libpostal(txt) print("Parts:", parts) road = parts.get("road") or parts.get("pedestrian") or parts.get("footway") or parts.get("residential") or "" house = build_house_number_from_parts(parts, txt) city = parts.get("city") or parts.get("city_district") or parts.get("suburb") or "" postcode = parts.get("postcode", "") # Канонизируем улицу для сравнения road_canon = canonicalize_road(road) return { "road": road_canon or road or "", "house_number": house or "", "city": city or "", "postcode": postcode or "" } def best_street_match(conn: sqlite3.Connection, street_query: str, cutoff: int = 86) -> Optional[Tuple[str, str]]: """ Ищет с учётом опечаток: exact по нормализованному ключу, иначе WRatio к ближайшему. """ s_key = normalize_street_for_index(street_query) cur = conn.execute("SELECT street_original FROM street_names WHERE street_norm = ?", (s_key,)) row = cur.fetchone() if row: return (s_key, row[0]) cur = conn.execute("SELECT street_norm, street_original FROM street_names") rows = cur.fetchall() if not rows: return None choices = [r[0] for r in rows] match = process.extractOne(s_key, choices, scorer=fuzz.WRatio, processor=None, score_cutoff=cutoff) if match: matched_norm = match[0] for s_n, s_o in rows: if s_n == matched_norm: return (s_n, s_o) return None def _base_digits(s: str) -> str: m = re.search(r"\d+", s or "") return m.group(0) if m else "" def best_house_match(conn, street_norm: str, house_query: str, cutoff: int = 82) -> Optional[Tuple[str, str]]: # Простой канон запроса: убрать пробелы, верхний регистр q_norm = ("".join((house_query or "").split())).upper() q_base = _base_digits(q_norm) # Вытаскиваем все номера по улице cur = conn.execute( "SELECT housenumber_norm, housenumber_original FROM address_index WHERE street_norm = ?", (street_norm,) ) rows = cur.fetchall() if not rows: return None # 1) Точное совпадение целиком for hn, ho in rows: if hn == q_norm: return (hn, ho) # 2) Жёсткий фильтр по базовому числу (обязательное совпадение) if q_base: filtered: List[Tuple[str, str]] = [(hn, ho) for hn, ho in rows if _base_digits(hn) == q_base] else: filtered = rows if not filtered: return None # 3) Нечёткий матч только среди отфильтрованных кандидатов choices = [hn for hn, _ in filtered] match = process.extractOne(q_norm, choices, scorer=fuzz.WRatio, processor=None, score_cutoff=cutoff) if match: best_hn = match[0] for hn, ho in filtered: if hn == best_hn: return (hn, ho) # 4) Fallback: если остался один кандидат с таким же базовым числом — принять его if len(filtered) == 1: return filtered[0] return None def _interp_point_along(coords: List[List[float]], t: float) -> Dict[str,float]: # coords: [[lon,lat], ...], t∈[0,1] по длине полилинии def dist(a,b): dx=a[0]-b[0]; dy=a[1]-b[1]; return math.hypot(dx, dy) seg_len=[]; total=0.0 for i in range(len(coords)-1): d=dist(coords[i], coords[i+1]); seg_len.append(d); total += d if total==0.0: return {"lat": coords[0][1], "lon": coords[0][0]} target = t*total; acc=0.0 for i,d in enumerate(seg_len): if acc + d >= target: ratio = 0.0 if d==0 else (target-acc)/d lon = coords[i][0] + (coords[i+1][0]-coords[i][0])*ratio lat = coords[i][1] + (coords[i+1][1]-coords[i][1])*ratio return {"lat": float(lat), "lon": float(lon)} acc += d return {"lat": coords[-1][1], "lon": coords[-1][0]} def _fits_type(n:int, itype:str)->bool: itype=(itype or "").lower() if itype=="odd": return n%2==1 if itype=="even": return n%2==0 return True # 'all' или неизвестный тип def find_interpolation_coord(conn: sqlite3.Connection, street_norm: str, house_query: str) -> Optional[Dict[str,float]]: m = re.search(r"\d+", house_query or "") if not m: return None n = int(m.group(0)) cur = conn.execute( "SELECT itype, start_num, end_num, coords_json FROM interpolation_ways WHERE street_norm=?", (street_norm,) ) best=None for itype, s_i, e_i, coords_json in cur.fetchall(): if n < min(s_i,e_i) or n > max(s_i,e_i): continue if not _fits_type(n, itype): continue coords = json.loads(coords_json) # позиция вдоль диапазона с плавающей точкой span = (e_i - s_i) if (e_i - s_i)!=0 else 1 t = (n - s_i) / span t = max(0.0, min(1.0, t)) return _interp_point_along(coords, t) return best def get_street_centroid_any(conn: sqlite3.Connection, street_norm: str) -> Optional[Dict[str, float]]: # 1) по адресным точкам cur = conn.execute( "SELECT AVG(lat), AVG(lon) FROM address_index WHERE street_norm=? AND lat IS NOT NULL AND lon IS NOT NULL", (street_norm,) ) lat, lon = cur.fetchone() or (None,None) if lat is not None and lon is not None: return {"lat": float(lat), "lon": float(lon)} # 2) по геометрии улиц (highway) cur = conn.execute("SELECT lat, lon FROM street_center WHERE street_norm=?",(street_norm,)) row = cur.fetchone() if row: return {"lat": float(row[0]), "lon": float(row[1])} return None def validate_address(sqlite_path: str, address_str: str) -> Dict[str, Any]: comps = parse_with_libpostal(address_str) street_in = comps.get("road", "") house_in = comps.get("house_number", "") conn = sqlite3.connect(sqlite_path) try: # улица обязательна if not street_in: return { "valid": False, "corrected": None, "components": comps, "coordinates": None, "reason": "Не удалось распознать улицу" } st = best_street_match(conn, street_in) if not st: return { "valid": False, "corrected": None, "components": comps, "coordinates": None, "reason": "Улица не найдена в локальной базе" } street_norm, street_orig = st # 1) дом не введён: вернуть координату улицы (centroid) if not house_in: cent = get_street_centroid_any(conn, street_norm) return { "valid": True if cent else False, "corrected": street_orig, "components": comps, "coordinates": cent, "reason": None if cent else "Для улицы нет координатной опоры" } # 2) дом введён: пробуем точное сопоставление hn = best_house_match(conn, street_norm, house_in) if hn: house_norm, house_orig = hn cur = conn.execute( "SELECT city, postcode, lat, lon FROM address_index WHERE street_norm=? AND housenumber_norm=? LIMIT 1", (street_norm, house_norm) ) row = cur.fetchone() city, postcode, lat, lon = (row or (None,None,None,None)) coords = {"lat": float(lat), "lon": float(lon)} if lat is not None and lon is not None else None # если почему-то lat/lon пустые (редкий случай), дополнительно попробуем интерполяцию/центроид if coords is None: coords = find_interpolation_coord(conn, street_norm, house_norm) or get_street_centroid_any(conn, street_norm) corrected = ", ".join(x for x in [f"{street_orig} {house_orig}", city, postcode] if x and str(x).strip()) return { "valid": True, "corrected": corrected, "components": comps, "coordinates": coords, "reason": None } # 3) дом введён, но не найден: invalid, однако координаты улицы — обязательно fallback = find_interpolation_coord(conn, street_norm, house_in) or get_street_centroid_any(conn, street_norm) return { "valid": False, "corrected": None, "components": comps, "coordinates": fallback, "reason": "Номер дома отсутствует на найденной улице" } finally: conn.close() if __name__ == "__main__": res = validate_address("addresses.sqlite", "Дальняя 8") print(res) pass