Загрузить файлы в «/»

This commit is contained in:
2025-10-27 13:51:58 +03:00
parent 093af9e2a2
commit 5c7f7921e7

411
geocoder.py Normal file
View File

@@ -0,0 +1,411 @@
# -*- coding: utf-8 -*-
import os
import re
import sqlite3
from typing import Optional, Dict, Any, Tuple, List
# 1) Чтение OSM .osm.pbf и извлечение addr:* через pyosmium
import osmium # pip install pyosmium
# 2) Парсинг/нормализация адресов через libpostal
# Требуется установленная системная библиотека libpostal и Python-биндинги "postal"
from postal.parser import parse_address
from postal.expand import expand_address
import json
# 3) Нечеткий поиск (для автокоррекции)
from rapidfuzz import process, fuzz, utils # pip install rapidfuzz
import math
# (Опционально) геометрический центроид для way, если доступен shapely
try:
from shapely.geometry import LineString, Polygon
HAVE_SHAPELY = True
except Exception:
HAVE_SHAPELY = False
DB_SCHEMA = """
PRAGMA journal_mode=WAL;
CREATE TABLE IF NOT EXISTS street_names (
street_norm TEXT PRIMARY KEY,
street_original TEXT
);
CREATE TABLE IF NOT EXISTS address_index (
street_norm TEXT,
housenumber_norm TEXT,
street_original TEXT,
housenumber_original TEXT,
city TEXT,
postcode TEXT,
lat REAL,
lon REAL
);
CREATE INDEX IF NOT EXISTS idx_address_street ON address_index(street_norm);
CREATE INDEX IF NOT EXISTS idx_address_street_house ON address_index(street_norm,housenumber_norm);
"""
# ---------------------------
# Минимальная предобработка
# ---------------------------
_DASHES = {"": "-", "": "-", "": "-"}
def _preclean_text(s: str) -> str:
"""Минимальная очистка для устойчивости парсера: регистр, дефисы, пробелы, буква-цифра разделение."""
s = (s or "").strip().lower().replace("ё", "е")
for k, v in _DASHES.items():
s = s.replace(k, v)
s = re.sub(r"\s+", " ", s)
# Вставить пробел на границе буква<->цифра, чтобы "дом43" -> "дом 43"
s = re.sub(r"(?<=[A-Za-zА-Яа-я])(?=\d)|(?<=\d)(?=[A-Za-zА-Яа-я])", " ", s)
return s.strip()
def _parts_from_libpostal(addr_text: str) -> Dict[str, str]:
"""Собирает метка->значение из parse_address без сложных правил."""
parts: Dict[str, str] = {}
for comp, label in parse_address(addr_text):
comp = " ".join(comp.split())
if not comp:
continue
parts[label] = (parts[label] + " " + comp).strip() if label in parts else comp
return parts
def _prefer_cyrillic_min(expansions: List[str]) -> Optional[str]:
"""Выбрать «лучший» вариант из expand_address: предпочесть кириллицу и минимальную длину."""
if not expansions:
return None
cyr = [e for e in expansions if re.search(r"[А-Яа-я]", e)]
pool = cyr if cyr else expansions
pool = [" ".join(e.split()) for e in pool]
pool.sort(key=len)
return pool[0] if pool else None
def canonicalize_road(road: str) -> str:
"""Канонизация названия улицы через expand_address с минимальной эвристикой выбора."""
if not road:
return ""
exp = expand_address(road)
return _prefer_cyrillic_min(exp) or road
def _clean_token(s: str) -> str:
return " ".join((s or "").strip().lower().replace("ё", "е").split())
def _unit_suffix_and_value(text: str) -> str:
"""
Преобразует текст unit/доп.компонента в компактный русскоязычный суффикс:
- 'корпус 1' / 'к 1' -> 'К1'
- 'строение 2' / 'стр 2' -> 'С2'
- 'лит а' -> 'ЛА'
- 'влад 5' -> 'ВЛ5'
Если тип не распознан, по умолчанию считаем корпус 'К' (без тяжёлых регексов).
"""
t = _clean_token(text)
# выделяем буквы/цифры (минимум регексов: только удалить пробелы)
val = t.replace(" ", "")
# эвристики по подстрокам
if "строен" in t or t.startswith("стр"):
# строение
val = val.replace("строение", "").replace("стр", "")
return f"С{val}".upper() if val else "С"
if "корп" in t or t == "к" or t.startswith("к"):
# корпус
val = val.replace("корпус", "").replace("корп", "").replace("к", "")
return f"К{val}".upper() if val else "К"
if t.startswith("лит"):
# литера
val = val.replace("литера", "").replace("лит", "")
return f"Л{val}".upper() if val else "Л"
if t.startswith("влад"):
# владение
val = val.replace("владение", "").replace("влад", "")
return f"ВЛ{val}".upper() if val else "ВЛ"
# вход/подъезд/лестница можно при желании кодировать как 'П', 'ЛС', но обычно это не часть addr:housenumber в OSM
# по умолчанию — трактуем как корпус
# оставим только цифро-буквенную часть после удаления пробелов
return f"К{val}".upper() if val else ""
def build_house_number_from_parts(parts: Dict[str, str], original_line: str) -> str:
"""
Формирует финальный housenumber из libpostal:
- base = house_number
- добавляет суффиксы из unit/entrance/staircase/level, конвертируя их в 'К..'/'С..'/'Л..'/'ВЛ..'
"""
base = (parts.get("house_number") or "").strip()
# защитимся от «дом43» и т.п. минимальной разделиловкой (сделано ранее в _preclean_text)
suffixes = []
# unit часто содержит корпус/строение
if parts.get("unit"):
suffixes.append(_unit_suffix_and_value(parts["unit"]))
# некоторые адреса могут класть строение/литеру в entrance/staircase
for key in ("entrance", "staircase"):
if parts.get(key):
sfx = _unit_suffix_and_value(parts[key])
if sfx:
suffixes.append(sfx)
# level — это этаж, обычно не часть housenumber в OSM; по умолчанию не добавляем
# если базовый номер пуст — попробуем через expand_address всей строки (без сложных правил)
if not base:
for e in expand_address(original_line):
p2 = _parts_from_libpostal(e)
b2 = (p2.get("house_number") or "").strip()
if b2:
base = b2
if p2.get("unit") and not suffixes:
suffixes.append(_unit_suffix_and_value(p2["unit"]))
break
# склеиваем: пробелы убираем, верхний регистр для стабильности
hn = "".join(base.split()).upper() + "".join(suffixes)
return hn
def normalize_street_for_index(street_original: str) -> str:
"""Нормализация ключа улицы для индекса (регистронезависимая, с лексической очисткой)."""
# Используем обработчик RapidFuzz по умолчанию + канонизацию libpostal
street_clean = _preclean_text(street_original)
street_canon = canonicalize_road(street_clean)
return utils.default_process(street_canon) or street_canon
def normalize_house_for_index(house_original: str) -> str:
"""Простая нормализация номера для индекса: убрать пробелы, привести к верхнему регистру."""
return "".join((house_original or "").split()).upper()
# ---------------------------
# Поиск/валидация адреса
# ---------------------------
def parse_with_libpostal(addr: str) -> Dict[str, str]:
"""
Минимально-шумоустойчивая раскладка через libpostal.
"""
txt = _preclean_text(addr)
parts = _parts_from_libpostal(txt)
road = parts.get("road") or parts.get("pedestrian") or parts.get("footway") or parts.get("residential") or ""
house = build_house_number_from_parts(parts, txt)
city = parts.get("city") or parts.get("city_district") or parts.get("suburb") or ""
postcode = parts.get("postcode", "")
# Канонизируем улицу для сравнения
road_canon = canonicalize_road(road)
return {
"road": road_canon or road or "",
"house_number": house or "",
"city": city or "",
"postcode": postcode or ""
}
def best_street_match(conn: sqlite3.Connection, street_query: str, cutoff: int = 86) -> Optional[Tuple[str, str]]:
"""
Ищет с учётом опечаток: exact по нормализованному ключу, иначе WRatio к ближайшему.
"""
s_key = normalize_street_for_index(street_query)
cur = conn.execute("SELECT street_original FROM street_names WHERE street_norm = ?", (s_key,))
row = cur.fetchone()
if row:
return (s_key, row[0])
cur = conn.execute("SELECT street_norm, street_original FROM street_names")
rows = cur.fetchall()
if not rows:
return None
choices = [r[0] for r in rows]
match = process.extractOne(s_key, choices, scorer=fuzz.WRatio, processor=None, score_cutoff=cutoff)
if match:
matched_norm = match[0]
for s_n, s_o in rows:
if s_n == matched_norm:
return (s_n, s_o)
return None
def _base_digits(s: str) -> str:
m = re.search(r"\d+", s or "")
return m.group(0) if m else ""
def best_house_match(conn, street_norm: str, house_query: str, cutoff: int = 82) -> Optional[Tuple[str, str]]:
# Простой канон запроса: убрать пробелы, верхний регистр
q_norm = ("".join((house_query or "").split())).upper()
q_base = _base_digits(q_norm)
# Вытаскиваем все номера по улице
cur = conn.execute(
"SELECT housenumber_norm, housenumber_original FROM address_index WHERE street_norm = ?",
(street_norm,)
)
rows = cur.fetchall()
if not rows:
return None
# 1) Точное совпадение целиком
for hn, ho in rows:
if hn == q_norm:
return (hn, ho)
# 2) Жёсткий фильтр по базовому числу (обязательное совпадение)
if q_base:
filtered: List[Tuple[str, str]] = [(hn, ho) for hn, ho in rows if _base_digits(hn) == q_base]
else:
filtered = rows
if not filtered:
return None
# 3) Нечёткий матч только среди отфильтрованных кандидатов
choices = [hn for hn, _ in filtered]
match = process.extractOne(q_norm, choices, scorer=fuzz.WRatio, processor=None, score_cutoff=cutoff)
if match:
best_hn = match[0]
for hn, ho in filtered:
if hn == best_hn:
return (hn, ho)
# 4) Fallback: если остался один кандидат с таким же базовым числом — принять его
if len(filtered) == 1:
return filtered[0]
return None
def _interp_point_along(coords: List[List[float]], t: float) -> Dict[str,float]:
# coords: [[lon,lat], ...], t∈[0,1] по длине полилинии
def dist(a,b):
dx=a[0]-b[0]; dy=a[1]-b[1]; return math.hypot(dx, dy)
seg_len=[]; total=0.0
for i in range(len(coords)-1):
d=dist(coords[i], coords[i+1]); seg_len.append(d); total += d
if total==0.0:
return {"lat": coords[0][1], "lon": coords[0][0]}
target = t*total; acc=0.0
for i,d in enumerate(seg_len):
if acc + d >= target:
ratio = 0.0 if d==0 else (target-acc)/d
lon = coords[i][0] + (coords[i+1][0]-coords[i][0])*ratio
lat = coords[i][1] + (coords[i+1][1]-coords[i][1])*ratio
return {"lat": float(lat), "lon": float(lon)}
acc += d
return {"lat": coords[-1][1], "lon": coords[-1][0]}
def _fits_type(n:int, itype:str)->bool:
itype=(itype or "").lower()
if itype=="odd": return n%2==1
if itype=="even": return n%2==0
return True # 'all' или неизвестный тип
def find_interpolation_coord(conn: sqlite3.Connection, street_norm: str, house_query: str) -> Optional[Dict[str,float]]:
m = re.search(r"\d+", house_query or "")
if not m:
return None
n = int(m.group(0))
cur = conn.execute(
"SELECT itype, start_num, end_num, coords_json FROM interpolation_ways WHERE street_norm=?",
(street_norm,)
)
best=None
for itype, s_i, e_i, coords_json in cur.fetchall():
if n < min(s_i,e_i) or n > max(s_i,e_i):
continue
if not _fits_type(n, itype):
continue
coords = json.loads(coords_json)
# позиция вдоль диапазона с плавающей точкой
span = (e_i - s_i) if (e_i - s_i)!=0 else 1
t = (n - s_i) / span
t = max(0.0, min(1.0, t))
return _interp_point_along(coords, t)
return best
def get_street_centroid_any(conn: sqlite3.Connection, street_norm: str) -> Optional[Dict[str, float]]:
# 1) по адресным точкам
cur = conn.execute(
"SELECT AVG(lat), AVG(lon) FROM address_index WHERE street_norm=? AND lat IS NOT NULL AND lon IS NOT NULL",
(street_norm,)
)
lat, lon = cur.fetchone() or (None,None)
if lat is not None and lon is not None:
return {"lat": float(lat), "lon": float(lon)}
# 2) по геометрии улиц (highway)
cur = conn.execute("SELECT lat, lon FROM street_center WHERE street_norm=?",(street_norm,))
row = cur.fetchone()
if row:
return {"lat": float(row[0]), "lon": float(row[1])}
return None
def validate_address(sqlite_path: str, address_str: str) -> Dict[str, Any]:
comps = parse_with_libpostal(address_str)
street_in = comps.get("road", "")
house_in = comps.get("house_number", "")
conn = sqlite3.connect(sqlite_path)
try:
# улица обязательна
if not street_in:
return {
"valid": False,
"corrected": None,
"components": comps,
"coordinates": None,
"reason": "Не удалось распознать улицу"
}
st = best_street_match(conn, street_in)
if not st:
return {
"valid": False,
"corrected": None,
"components": comps,
"coordinates": None,
"reason": "Улица не найдена в локальной базе"
}
street_norm, street_orig = st
# 1) дом не введён: вернуть координату улицы (centroid)
if not house_in:
cent = get_street_centroid_any(conn, street_norm)
return {
"valid": True if cent else False,
"corrected": street_orig,
"components": comps,
"coordinates": cent,
"reason": None if cent else "Для улицы нет координатной опоры"
}
# 2) дом введён: пробуем точное сопоставление
hn = best_house_match(conn, street_norm, house_in)
if hn:
house_norm, house_orig = hn
cur = conn.execute(
"SELECT city, postcode, lat, lon FROM address_index WHERE street_norm=? AND housenumber_norm=? LIMIT 1",
(street_norm, house_norm)
)
row = cur.fetchone()
city, postcode, lat, lon = (row or (None,None,None,None))
coords = {"lat": float(lat), "lon": float(lon)} if lat is not None and lon is not None else None
# если почему-то lat/lon пустые (редкий случай), дополнительно попробуем интерполяцию/центроид
if coords is None:
coords = find_interpolation_coord(conn, street_norm, house_norm) or get_street_centroid_any(conn, street_norm)
corrected = ", ".join(x for x in [f"{street_orig} {house_orig}", city, postcode] if x and str(x).strip())
return {
"valid": True,
"corrected": corrected,
"components": comps,
"coordinates": coords,
"reason": None
}
# 3) дом введён, но не найден: invalid, однако координаты улицы — обязательно
fallback = find_interpolation_coord(conn, street_norm, house_in) or get_street_centroid_any(conn, street_norm)
return {
"valid": False,
"corrected": None,
"components": comps,
"coordinates": fallback,
"reason": "Номер дома отсутствует на найденной улице"
}
finally:
conn.close()
if __name__ == "__main__":
res = validate_address("addresses.sqlite", "ул. Большая Покровская, 15")
print(res)
pass