Files
Input_to_route/geocoder.py

412 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import os
import re
import sqlite3
from typing import Optional, Dict, Any, Tuple, List
# 1) Чтение OSM .osm.pbf и извлечение addr:* через pyosmium
import osmium # pip install pyosmium
# 2) Парсинг/нормализация адресов через libpostal
# Требуется установленная системная библиотека libpostal и Python-биндинги "postal"
from postal.parser import parse_address
from postal.expand import expand_address
import json
# 3) Нечеткий поиск (для автокоррекции)
from rapidfuzz import process, fuzz, utils # pip install rapidfuzz
import math
# (Опционально) геометрический центроид для way, если доступен shapely
try:
from shapely.geometry import LineString, Polygon
HAVE_SHAPELY = True
except Exception:
HAVE_SHAPELY = False
DB_SCHEMA = """
PRAGMA journal_mode=WAL;
CREATE TABLE IF NOT EXISTS street_names (
street_norm TEXT PRIMARY KEY,
street_original TEXT
);
CREATE TABLE IF NOT EXISTS address_index (
street_norm TEXT,
housenumber_norm TEXT,
street_original TEXT,
housenumber_original TEXT,
city TEXT,
postcode TEXT,
lat REAL,
lon REAL
);
CREATE INDEX IF NOT EXISTS idx_address_street ON address_index(street_norm);
CREATE INDEX IF NOT EXISTS idx_address_street_house ON address_index(street_norm,housenumber_norm);
"""
# ---------------------------
# Минимальная предобработка
# ---------------------------
_DASHES = {"": "-", "": "-", "": "-"}
def _preclean_text(s: str) -> str:
"""Минимальная очистка для устойчивости парсера: регистр, дефисы, пробелы, буква-цифра разделение."""
s = (s or "").strip().lower().replace("ё", "е")
for k, v in _DASHES.items():
s = s.replace(k, v)
s = re.sub(r"\s+", " ", s)
# Вставить пробел на границе буква<->цифра, чтобы "дом43" -> "дом 43"
s = re.sub(r"(?<=[A-Za-zА-Яа-я])(?=\d)|(?<=\d)(?=[A-Za-zА-Яа-я])", " ", s)
return s.strip()
def _parts_from_libpostal(addr_text: str) -> Dict[str, str]:
"""Собирает метка->значение из parse_address без сложных правил."""
parts: Dict[str, str] = {}
for comp, label in parse_address(addr_text):
comp = " ".join(comp.split())
if not comp:
continue
parts[label] = (parts[label] + " " + comp).strip() if label in parts else comp
return parts
def _prefer_cyrillic_min(expansions: List[str]) -> Optional[str]:
"""Выбрать «лучший» вариант из expand_address: предпочесть кириллицу и минимальную длину."""
if not expansions:
return None
cyr = [e for e in expansions if re.search(r"[А-Яа-я]", e)]
pool = cyr if cyr else expansions
pool = [" ".join(e.split()) for e in pool]
pool.sort(key=len)
return pool[0] if pool else None
def canonicalize_road(road: str) -> str:
"""Канонизация названия улицы через expand_address с минимальной эвристикой выбора."""
if not road:
return ""
exp = expand_address(road)
return _prefer_cyrillic_min(exp) or road
def _clean_token(s: str) -> str:
return " ".join((s or "").strip().lower().replace("ё", "е").split())
def _unit_suffix_and_value(text: str) -> str:
"""
Преобразует текст unit/доп.компонента в компактный русскоязычный суффикс:
- 'корпус 1' / 'к 1' -> 'К1'
- 'строение 2' / 'стр 2' -> 'С2'
- 'лит а' -> 'ЛА'
- 'влад 5' -> 'ВЛ5'
Если тип не распознан, по умолчанию считаем корпус 'К' (без тяжёлых регексов).
"""
t = _clean_token(text)
# выделяем буквы/цифры (минимум регексов: только удалить пробелы)
val = t.replace(" ", "")
# эвристики по подстрокам
if "строен" in t or t.startswith("стр"):
# строение
val = val.replace("строение", "").replace("стр", "")
return f"С{val}".upper() if val else "С"
if "корп" in t or t == "к" or t.startswith("к"):
# корпус
val = val.replace("корпус", "").replace("корп", "").replace("к", "")
return f"К{val}".upper() if val else "К"
if t.startswith("лит"):
# литера
val = val.replace("литера", "").replace("лит", "")
return f"Л{val}".upper() if val else "Л"
if t.startswith("влад"):
# владение
val = val.replace("владение", "").replace("влад", "")
return f"ВЛ{val}".upper() if val else "ВЛ"
# вход/подъезд/лестница можно при желании кодировать как 'П', 'ЛС', но обычно это не часть addr:housenumber в OSM
# по умолчанию — трактуем как корпус
# оставим только цифро-буквенную часть после удаления пробелов
return f"К{val}".upper() if val else ""
def build_house_number_from_parts(parts: Dict[str, str], original_line: str) -> str:
"""
Формирует финальный housenumber из libpostal:
- base = house_number
- добавляет суффиксы из unit/entrance/staircase/level, конвертируя их в 'К..'/'С..'/'Л..'/'ВЛ..'
"""
base = (parts.get("house_number") or "").strip()
# защитимся от «дом43» и т.п. минимальной разделиловкой (сделано ранее в _preclean_text)
suffixes = []
# unit часто содержит корпус/строение
if parts.get("unit"):
suffixes.append(_unit_suffix_and_value(parts["unit"]))
# некоторые адреса могут класть строение/литеру в entrance/staircase
for key in ("entrance", "staircase"):
if parts.get(key):
sfx = _unit_suffix_and_value(parts[key])
if sfx:
suffixes.append(sfx)
# level — это этаж, обычно не часть housenumber в OSM; по умолчанию не добавляем
# если базовый номер пуст — попробуем через expand_address всей строки (без сложных правил)
if not base:
for e in expand_address(original_line):
p2 = _parts_from_libpostal(e)
b2 = (p2.get("house_number") or "").strip()
if b2:
base = b2
if p2.get("unit") and not suffixes:
suffixes.append(_unit_suffix_and_value(p2["unit"]))
break
# склеиваем: пробелы убираем, верхний регистр для стабильности
hn = "".join(base.split()).upper() + "".join(suffixes)
return hn
def normalize_street_for_index(street_original: str) -> str:
"""Нормализация ключа улицы для индекса (регистронезависимая, с лексической очисткой)."""
# Используем обработчик RapidFuzz по умолчанию + канонизацию libpostal
street_clean = _preclean_text(street_original)
street_canon = canonicalize_road(street_clean)
return utils.default_process(street_canon) or street_canon
def normalize_house_for_index(house_original: str) -> str:
"""Простая нормализация номера для индекса: убрать пробелы, привести к верхнему регистру."""
return "".join((house_original or "").split()).upper()
# ---------------------------
# Поиск/валидация адреса
# ---------------------------
def parse_with_libpostal(addr: str) -> Dict[str, str]:
"""
Минимально-шумоустойчивая раскладка через libpostal.
"""
txt = _preclean_text(addr)
parts = _parts_from_libpostal(txt)
road = parts.get("road") or parts.get("pedestrian") or parts.get("footway") or parts.get("residential") or ""
house = build_house_number_from_parts(parts, txt)
city = parts.get("city") or parts.get("city_district") or parts.get("suburb") or ""
postcode = parts.get("postcode", "")
# Канонизируем улицу для сравнения
road_canon = canonicalize_road(road)
return {
"road": road_canon or road or "",
"house_number": house or "",
"city": city or "",
"postcode": postcode or ""
}
def best_street_match(conn: sqlite3.Connection, street_query: str, cutoff: int = 86) -> Optional[Tuple[str, str]]:
"""
Ищет с учётом опечаток: exact по нормализованному ключу, иначе WRatio к ближайшему.
"""
s_key = normalize_street_for_index(street_query)
cur = conn.execute("SELECT street_original FROM street_names WHERE street_norm = ?", (s_key,))
row = cur.fetchone()
if row:
return (s_key, row[0])
cur = conn.execute("SELECT street_norm, street_original FROM street_names")
rows = cur.fetchall()
if not rows:
return None
choices = [r[0] for r in rows]
match = process.extractOne(s_key, choices, scorer=fuzz.WRatio, processor=None, score_cutoff=cutoff)
if match:
matched_norm = match[0]
for s_n, s_o in rows:
if s_n == matched_norm:
return (s_n, s_o)
return None
def _base_digits(s: str) -> str:
m = re.search(r"\d+", s or "")
return m.group(0) if m else ""
def best_house_match(conn, street_norm: str, house_query: str, cutoff: int = 82) -> Optional[Tuple[str, str]]:
# Простой канон запроса: убрать пробелы, верхний регистр
q_norm = ("".join((house_query or "").split())).upper()
q_base = _base_digits(q_norm)
# Вытаскиваем все номера по улице
cur = conn.execute(
"SELECT housenumber_norm, housenumber_original FROM address_index WHERE street_norm = ?",
(street_norm,)
)
rows = cur.fetchall()
if not rows:
return None
# 1) Точное совпадение целиком
for hn, ho in rows:
if hn == q_norm:
return (hn, ho)
# 2) Жёсткий фильтр по базовому числу (обязательное совпадение)
if q_base:
filtered: List[Tuple[str, str]] = [(hn, ho) for hn, ho in rows if _base_digits(hn) == q_base]
else:
filtered = rows
if not filtered:
return None
# 3) Нечёткий матч только среди отфильтрованных кандидатов
choices = [hn for hn, _ in filtered]
match = process.extractOne(q_norm, choices, scorer=fuzz.WRatio, processor=None, score_cutoff=cutoff)
if match:
best_hn = match[0]
for hn, ho in filtered:
if hn == best_hn:
return (hn, ho)
# 4) Fallback: если остался один кандидат с таким же базовым числом — принять его
if len(filtered) == 1:
return filtered[0]
return None
def _interp_point_along(coords: List[List[float]], t: float) -> Dict[str,float]:
# coords: [[lon,lat], ...], t∈[0,1] по длине полилинии
def dist(a,b):
dx=a[0]-b[0]; dy=a[1]-b[1]; return math.hypot(dx, dy)
seg_len=[]; total=0.0
for i in range(len(coords)-1):
d=dist(coords[i], coords[i+1]); seg_len.append(d); total += d
if total==0.0:
return {"lat": coords[0][1], "lon": coords[0][0]}
target = t*total; acc=0.0
for i,d in enumerate(seg_len):
if acc + d >= target:
ratio = 0.0 if d==0 else (target-acc)/d
lon = coords[i][0] + (coords[i+1][0]-coords[i][0])*ratio
lat = coords[i][1] + (coords[i+1][1]-coords[i][1])*ratio
return {"lat": float(lat), "lon": float(lon)}
acc += d
return {"lat": coords[-1][1], "lon": coords[-1][0]}
def _fits_type(n:int, itype:str)->bool:
itype=(itype or "").lower()
if itype=="odd": return n%2==1
if itype=="even": return n%2==0
return True # 'all' или неизвестный тип
def find_interpolation_coord(conn: sqlite3.Connection, street_norm: str, house_query: str) -> Optional[Dict[str,float]]:
m = re.search(r"\d+", house_query or "")
if not m:
return None
n = int(m.group(0))
cur = conn.execute(
"SELECT itype, start_num, end_num, coords_json FROM interpolation_ways WHERE street_norm=?",
(street_norm,)
)
best=None
for itype, s_i, e_i, coords_json in cur.fetchall():
if n < min(s_i,e_i) or n > max(s_i,e_i):
continue
if not _fits_type(n, itype):
continue
coords = json.loads(coords_json)
# позиция вдоль диапазона с плавающей точкой
span = (e_i - s_i) if (e_i - s_i)!=0 else 1
t = (n - s_i) / span
t = max(0.0, min(1.0, t))
return _interp_point_along(coords, t)
return best
def get_street_centroid_any(conn: sqlite3.Connection, street_norm: str) -> Optional[Dict[str, float]]:
# 1) по адресным точкам
cur = conn.execute(
"SELECT AVG(lat), AVG(lon) FROM address_index WHERE street_norm=? AND lat IS NOT NULL AND lon IS NOT NULL",
(street_norm,)
)
lat, lon = cur.fetchone() or (None,None)
if lat is not None and lon is not None:
return {"lat": float(lat), "lon": float(lon)}
# 2) по геометрии улиц (highway)
cur = conn.execute("SELECT lat, lon FROM street_center WHERE street_norm=?",(street_norm,))
row = cur.fetchone()
if row:
return {"lat": float(row[0]), "lon": float(row[1])}
return None
def validate_address(sqlite_path: str, address_str: str) -> Dict[str, Any]:
comps = parse_with_libpostal(address_str)
street_in = comps.get("road", "")
house_in = comps.get("house_number", "")
conn = sqlite3.connect(sqlite_path)
try:
# улица обязательна
if not street_in:
return {
"valid": False,
"corrected": None,
"components": comps,
"coordinates": None,
"reason": "Не удалось распознать улицу"
}
st = best_street_match(conn, street_in)
if not st:
return {
"valid": False,
"corrected": None,
"components": comps,
"coordinates": None,
"reason": "Улица не найдена в локальной базе"
}
street_norm, street_orig = st
# 1) дом не введён: вернуть координату улицы (centroid)
if not house_in:
cent = get_street_centroid_any(conn, street_norm)
return {
"valid": True if cent else False,
"corrected": street_orig,
"components": comps,
"coordinates": cent,
"reason": None if cent else "Для улицы нет координатной опоры"
}
# 2) дом введён: пробуем точное сопоставление
hn = best_house_match(conn, street_norm, house_in)
if hn:
house_norm, house_orig = hn
cur = conn.execute(
"SELECT city, postcode, lat, lon FROM address_index WHERE street_norm=? AND housenumber_norm=? LIMIT 1",
(street_norm, house_norm)
)
row = cur.fetchone()
city, postcode, lat, lon = (row or (None,None,None,None))
coords = {"lat": float(lat), "lon": float(lon)} if lat is not None and lon is not None else None
# если почему-то lat/lon пустые (редкий случай), дополнительно попробуем интерполяцию/центроид
if coords is None:
coords = find_interpolation_coord(conn, street_norm, house_norm) or get_street_centroid_any(conn, street_norm)
corrected = ", ".join(x for x in [f"{street_orig} {house_orig}", city, postcode] if x and str(x).strip())
return {
"valid": True,
"corrected": corrected,
"components": comps,
"coordinates": coords,
"reason": None
}
# 3) дом введён, но не найден: invalid, однако координаты улицы — обязательно
fallback = find_interpolation_coord(conn, street_norm, house_in) or get_street_centroid_any(conn, street_norm)
return {
"valid": False,
"corrected": None,
"components": comps,
"coordinates": fallback,
"reason": "Номер дома отсутствует на найденной улице"
}
finally:
conn.close()
if __name__ == "__main__":
res = validate_address("addresses.sqlite", "ул. Большая Покровская, 15")
print(res)
pass