Files
Input_to_route/route.py

462 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import math
import itertools
import requests
from typing import List, Tuple, Dict, Optional, Set
import random
class Point:
def __init__(self, coord: List[float], tag: str, visit_time: int, list_id=0):
self.coord = coord
self.tag = tag
self.visit_time = visit_time
self.matrix_index = None # Индекс точки в матрице расстояний
self.estimated_time = None # Оценочное время (перемещение + посещение)
self.list_id = list_id
def haversine(coord1: List[float], coord2: List[float]) -> float:
"""Calculate the great-circle distance between two points in kilometers."""
lat1, lon1 = coord1
lat2, lon2 = coord2
R = 6371 # Earth radius in km
dlat = math.radians(lat2 - lat1)
dlon = math.radians(lon2 - lon1)
a = (math.sin(dlat/2) * math.sin(dlat/2) +
math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) *
math.sin(dlon/2) * math.sin(dlon/2))
return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
def filter_points_by_time(start_coord: List[float], points: List[Point], total_time: int) -> List[Point]:
"""Filter points based on straight-line distance and visit time."""
filtered = []
for point in points:
distance = haversine(start_coord, point.coord)
travel_time = distance * 10 # Assume 6 km/h walking speed (10 min/km)
point.estimated_time = travel_time + point.visit_time
if point.estimated_time <= total_time:
filtered.append(point)
return filtered
def filter_points_by_tag_proximity(points: List[Point], max_per_tag: int = 10) -> List[Point]:
"""For each tag, keep only the closest points (by estimated time)."""
# Group points by tag
tag_to_points = {}
for point in points:
if point.tag not in tag_to_points:
tag_to_points[point.tag] = []
tag_to_points[point.tag].append(point)
# For each tag, sort by estimated time and keep top max_per_tag
filtered_points = []
for tag, tag_points in tag_to_points.items():
# Sort by estimated time (ascending)
sorted_points = sorted(tag_points, key=lambda p: p.estimated_time)
# Keep at most max_per_tag points
kept_points = sorted_points[:max_per_tag]
filtered_points.extend(kept_points)
print(f"Tag '{tag}': kept {len(kept_points)} out of {len(tag_points)} points")
return filtered_points
def get_duration_matrix(points: List[List[float]]) -> Optional[Tuple[List[List[float]], List[List[float]]]]:
"""Get duration matrix from server."""
url = "https://ha1m-maap-pdmc.gw-1a.dockhost.net/table"
payload = {"points": points}
headers = {"content-type": "application/json"}
try:
response = requests.post(url, json=payload, headers=headers, timeout=30)
if response.status_code == 200:
data = response.json()
return data.get("distances"), data.get("durations")
else:
print(f"Server error: {response.status_code}")
return None
except Exception as e:
print(f"Error requesting duration matrix: {e}")
return None
def group_points_by_significance(points: List[Point], tag_importance: Dict[str, int]) -> Dict[int, List[Point]]:
"""Group points by their importance level."""
grouped = {}
for point in points:
importance = tag_importance.get(point.tag, float('inf'))[0]
if importance not in grouped:
grouped[importance] = []
grouped[importance].append(point)
return grouped
def calculate_route_time_with_matrix(route: List[Point], start_coord: List[float],
duration_matrix: List[List[float]]) -> float:
"""Calculate total time for a route using the duration matrix."""
total_time = 0
current_index = 0 # Start point index
for point in route:
next_index = point.matrix_index
travel_time_seconds = duration_matrix[current_index][next_index]
travel_time_minutes = travel_time_seconds / 60.0
total_time += travel_time_minutes + point.visit_time
current_index = next_index
return total_time
def check_tags_constraint(points: List[Point]) -> bool:
"""Check if there are no more than 5 unique tags."""
unique_tags = set(point.tag for point in points)
return len(unique_tags) <= 5
def generate_routes_exact_tags(grouped_points: Dict[int, List[Point]],
all_tags: Set[str],
tag_importance: Dict[str, int]) -> List[List[Point]]:
"""Generate routes where each tag is visited exactly once using different coordinates."""
# Create a mapping from tag to points
tag_to_points = {}
for points_list in grouped_points.values():
for point in points_list:
if point.tag not in tag_to_points:
tag_to_points[point.tag] = []
tag_to_points[point.tag].append(point)
# For each tag, we need to select exactly one point
tag_selections = []
for tag in all_tags:
tag_selections.append(tag_to_points[tag])
# Generate all combinations of points (one per tag)
all_routes = []
print(len(list(itertools.product(*tag_selections))))
for point_combination in itertools.product(*tag_selections):
# Check if all points have unique coordinates
coords = [tuple(point.coord) for point in point_combination]
if len(coords) != len(set(coords)):
continue # Skip if any coordinates are duplicated
# Group points by importance
points_by_importance = {}
for point in point_combination:
imp = tag_importance[point.tag][0]
if imp not in points_by_importance:
points_by_importance[imp] = []
points_by_importance[imp].append(point)
# Sort by importance
sorted_importances = sorted(points_by_importance.keys())
# Generate all permutations within each importance group
importance_groups = [points_by_importance[imp] for imp in sorted_importances]
for ordering in itertools.product(*[itertools.permutations(group) for group in importance_groups]):
route = []
for group in ordering:
route.extend(group)
all_routes.append(route)
return all_routes
def generate_routes_with_repeats(grouped_points: Dict[int, List[Point]],
all_tags: Set[str],
tag_importance: Dict[str, int],
num_points: int) -> List[List[Point]]:
"""Generate routes when we need to repeat tags to reach the required number of points, ensuring unique coordinates."""
# Create a mapping from tag to points
tag_to_points = {}
for points_list in grouped_points.values():
for point in points_list:
if point.tag not in tag_to_points:
tag_to_points[point.tag] = []
tag_to_points[point.tag].append(point)
all_routes = []
# First, select one point for each tag (mandatory points)
mandatory_selections = [tag_to_points[tag] for tag in all_tags]
# Generate all combinations of mandatory points (one per tag)
for mandatory_combo in itertools.product(*mandatory_selections):
mandatory_points = list(mandatory_combo)
# Check if mandatory points have unique coordinates
mandatory_coords = [tuple(point.coord) for point in mandatory_points]
if len(mandatory_coords) != len(set(mandatory_coords)):
continue # Skip if any coordinates are duplicated in mandatory points
# We need to add (num_points - len(mandatory_points)) additional points
num_additional = num_points - len(mandatory_points)
if num_additional == 0:
# We have exactly the right number of points
points_by_importance = {}
for point in mandatory_points:
imp = tag_importance[point.tag][0]
if imp not in points_by_importance:
points_by_importance[imp] = []
points_by_importance[imp].append(point)
sorted_importances = sorted(points_by_importance.keys())
importance_groups = [points_by_importance[imp] for imp in sorted_importances]
for ordering in itertools.product(*[itertools.permutations(group) for group in importance_groups]):
route = []
for group in ordering:
route.extend(group)
all_routes.append(route)
else:
# We need to add additional points (can be from any tag, including repeats)
# But we must ensure all coordinates are unique
# Get all available points excluding mandatory points
all_available_points = []
for key in grouped_points.keys():
if tag_importance[key][1]:
all_available_points.extend(grouped_points[key])
# Remove mandatory points from available points
available_points = [p for p in all_available_points if p not in mandatory_points]
# Generate combinations of additional points
for additional_combo in itertools.combinations(available_points, num_additional):
# Check if additional points have unique coordinates and don't duplicate with mandatory
additional_coords = [tuple(point.coord) for point in additional_combo]
if len(additional_coords) != len(set(additional_coords)):
continue # Skip if any coordinates are duplicated in additional points
# Check if additional points don't duplicate with mandatory points
all_coords = mandatory_coords + additional_coords
if len(all_coords) != len(set(all_coords)):
continue # Skip if any coordinates are duplicated between mandatory and additional
full_route_candidate = mandatory_points + list(additional_combo)
# Group by importance
points_by_importance = {}
for point in full_route_candidate:
imp = tag_importance[point.tag]
if imp not in points_by_importance:
points_by_importance[imp] = []
points_by_importance[imp].append(point)
sorted_importances = sorted(points_by_importance.keys())
importance_groups = [points_by_importance[imp] for imp in sorted_importances]
for ordering in itertools.product(*[itertools.permutations(group) for group in importance_groups]):
route = []
for group in ordering:
route.extend(group)
all_routes.append(route)
return all_routes
def form_point_list(data):
point_list =[]
for list_id,entry in enumerate(data):
point = Point(list(map(float,entry['coordinate'].split(', '))),entry['type'],entry['time_to_visit'],list_id)
point_list.append(point)
return point_list
def build_route(data, mapping,start_coord,total_time,n_nodes,strategy='best'):
# Example input data - теперь с не более чем 5 уникальными тегами
start_coord_test = [56.331576, 44.003277]
total_time_test = 180 # Увеличим время до 4 часов для большего выбора
points = form_point_list(data)
tag_importance =mapping
# Используем 3 уникальных тега для демонстрации
points_test = [
Point([56.32448, 43.983546], "Памятник", 20),
Point([56.335607, 43.97481], "Архитектура", 20),
Point([56.313472, 43.990747], "Памятник", 20),
#Point([56.324157, 44.002696], "Памятник", 20),
#Point([56.316436, 43.994177], "Памятник", 20),
#Point([56.32377, 44.001879], "Памятник", 20),
#Point([56.329867, 43.99687], "Памятник", 20),
Point([56.311066, 43.94595], "Памятник", 20),
Point([56.333265, 43.972417], "Памятник", 20),
# Point([56.332166, 44.012111], "Памятник", 20),
#Point([56.326786, 44.006836], "Памятник", 20),
Point([56.330232, 44.010941], "Парк", 20),
Point([56.282221, 43.979263], "Парк", 20),
Point([56.277315, 43.921408], "Мозаика", 20),
Point([56.284829, 44.01893], "Парк", 20),
Point([56.308973, 43.99821], "Парк", 20),
Point([56.321545, 44.001921], "Парк", 20),
#Point([56.301798, 44.044003], "Мозаика", 20),
Point([56.268282, 43.919475], "Парк", 20),
Point([56.239625, 43.854551], "Парк", 20),
#Point([56.311214, 43.933981], "Парк", 20),
Point([56.314984, 44.007347], "Парк", 20),
Point([56.32509, 43.983433], "Парк", 20),
Point([56.27449, 43.973357], "Парк", 20),
Point([56.278073, 43.940886], "Парк", 20),
Point([56.358805, 43.825376], "Парк", 20),
Point([56.329995, 44.009444], "Памятник", 20),
Point([56.328551, 43.998718], "Памятник", 20),
Point([56.330355, 43.993105], "Архитектура", 20),
Point([56.321416, 43.973897], "Архитектура", 20),
# Point([56.327298, 44.005706], "Архитектура", 20),
#Point([56.328757, 43.998183], "Архитектура", 20),
# Point([56.328908, 43.995645], "Архитектура", 20),
Point([56.317578, 43.995805], "Архитектура", 20),
Point([56.329433, 44.012764], "Архитектура", 20),
Point([56.3301, 44.008831], "Архитектура", 20),
#Point([56.32995, 43.999495], "Архитектура", 20),
Point([56.327454, 44.041745], "Архитектура", 20),
#Point([56.328576, 44.004872], "Архитектура", 20),
Point([56.3275, 44.007658], "Архитектура", 20),
Point([56.330679, 44.013874], "Архитектура", 20),
# Point([56.331541, 44.001747], "Архитектура", 20),
# Point([56.335071, 43.974627], "Архитектура", 20),
#Point([56.317707, 43.995847], "Архитектура", 20),
#Point([56.323851, 43.985939], "Архитектура", 20),
Point([56.325701, 44.001527], "Архитектура", 20),
Point([56.328754, 43.998954], "Архитектура", 20),
#Point([56.323937, 43.990728], "Музей", 20),
#Point([56.2841, 43.84621], "Музей", 20),
#Point([56.328646, 44.028973], "Музей", 20),
Point([56.327391, 43.857522], "Мозаика", 20),
#Point([56.252239, 43.889066], "Мозаика", 20),
#Point([56.248436, 43.88106], "Мозаика", 20),
#Point([56.321257, 43.94545], "Мозаика", 20),
# Point([56.365284, 43.823251], "Мозаика", 20),
Point([56.294371, 43.912625], "Мозаика", 20),
#Point([56.241768, 43.859687], "Мозаика", 20),
#Point([56.300073, 43.938526], "Мозаика", 20),
#Point([56.229652, 43.947973], "Мозаика", 20),
# Point([56.269486, 43.9238], "Мозаика", 20),
Point([56.299251, 43.985146], "Мозаика", 20),
Point([56.293297, 44.034095], "Мозаика", 20),
Point([56.299251, 43.985146], "Мозаика", 20),
Point([56.229652, 43.947973], "Мозаика", 20),
Point([56.269486, 43.9238], "Мозаика", 20),
#Point([56.293297, 44.034095], "Мозаика", 20),
#Point([56.229652, 43.947973], "Мозаика", 20)
]
tag_importance_test = {
"Памятник": 1,
"Парк": 1,
"Мозаика": 1,
"Архитектура": 1,
#"Музей": 1
}
# Check tags constraint
if not check_tags_constraint(points):
print("Error: More than 5 unique tags in the input data")
return
print("Input data validation: OK (5 or fewer unique tags)")
# Step 1: Filter points using straight-line distance and total time
filtered_by_time = filter_points_by_time(start_coord, points, total_time)
print(f"After initial time filtering: {len(filtered_by_time)} points")
if len(filtered_by_time) < 3:
print("Not enough points after time filtering")
return
# Step 2: Filter points by tag proximity (keep max 10 closest points per tag)
filtered_points = filter_points_by_tag_proximity(filtered_by_time, max_per_tag=10)
print(f"After tag proximity filtering: {len(filtered_points)} points")
if len(filtered_points) < 3:
print("Not enough points after tag proximity filtering")
return
# Step 3: Prepare points for server request (start point + filtered points)
points_for_matrix = [start_coord] + [point.coord for point in filtered_points]
print("Requesting duration matrix from server...")
# Step 4: Get duration matrix from server
result = get_duration_matrix(points_for_matrix)
if result is None:
print("Failed to get duration matrix from server")
return
distances_matrix, durations_matrix = result
print("Duration matrix received successfully")
# Assign matrix indices to points
for i, point in enumerate(filtered_points):
point.matrix_index = i + 1 # +1 because index 0 is the start point
# Step 5: Group by importance
grouped_points = group_points_by_significance(filtered_points, tag_importance)
# Get all unique tags
all_tags = set(point.tag for point in filtered_points)
num_unique_tags = len(all_tags)
print(f"Unique tags: {all_tags} ({num_unique_tags} tags)")
# Step 6: Generate possible routes
print("Generating possible routes...")
# Determine the number of points in the route
if num_unique_tags >= n_nodes:
# Each tag must be visited exactly once
print("Each tag will be visited exactly once with unique coordinates")
possible_routes = generate_routes_exact_tags(grouped_points, all_tags, tag_importance)
else:
# We have fewer than 3 unique tags, need to repeat some tags
print(f"Only {num_unique_tags} unique tags available, will repeat tags to reach 3 points with unique coordinates")
possible_routes = generate_routes_with_repeats(grouped_points, all_tags, tag_importance, n_nodes)
if not possible_routes:
print("No valid routes found that cover all tags with unique coordinates")
return
# Step 7: Calculate time for each route and filter by total_time
valid_routes = []
for route in possible_routes:
route_time = calculate_route_time_with_matrix(route, start_coord, durations_matrix)
if route_time <= total_time:
valid_routes.append((route, route_time))
if not valid_routes:
print("No valid routes found within time constraint")
return
# Step 8: Find optimal route (minimum time)
if strategy=='random':
optimal_route, min_time = random.choice(valid_routes)
elif strategy=='longest':
optimal_route, min_time = max(valid_routes, key=lambda x: x[1])
else:
optimal_route, min_time = min(valid_routes, key=lambda x: x[1])
print(f"\nOptimal route (time: {min_time:.2f} min):")
for i, point in enumerate(optimal_route, 1):
print(f"{i}. {point.tag} at {point.coord} ({point.visit_time} min)")
# Print route details with travel times
print("\nRoute details:")
current_index = 0
total_route_time = 0
for i, point in enumerate(optimal_route):
travel_time_seconds = durations_matrix[current_index][point.matrix_index]
travel_time_minutes = travel_time_seconds / 60.0
segment_time = travel_time_minutes + point.visit_time
total_route_time += segment_time
print(f"Segment {i+1}: {travel_time_minutes:.2f} min travel + {point.visit_time} min visit = {segment_time:.2f} min")
current_index = point.matrix_index
print(f"Total route time: {total_route_time:.2f} min")
# Display all tags covered by the route
route_tags = set(point.tag for point in optimal_route)
#print(f"\nTags covered in this route: {', '.join(route_tags)}")
if all_tags.issubset(route_tags):
print("All tags are covered in this route!")
# Verify all coordinates are unique
route_coords = [tuple(point.coord) for point in optimal_route]
if len(route_coords) == len(set(route_coords)):
print("All coordinates in the route are unique!")
else:
print("ERROR: Duplicate coordinates found in the route!")
places = [data[point.list_id] for point in optimal_route]
return route_coords,places
#if __name__ == "__main__":
# build_route()