diff --git a/python/pathfinding_demo.py b/python/pathfinding_demo.py index 9d184db..2f245ee 100644 --- a/python/pathfinding_demo.py +++ b/python/pathfinding_demo.py @@ -8,9 +8,10 @@ import matplotlib.pyplot as plt import numpy as np import time -from typing import Optional, NewType +from typing import Optional, NewType, Any from abc import ABC, abstractmethod -from queue import Queue +from queue import Queue, PriorityQueue +from dataclasses import dataclass, field # # Type and interfaces definition @@ -224,7 +225,7 @@ class BFS(PathFinderBase): self._came_from: dict[Point2D, Optional[Point2D]] = { end_point: None } self._distance: dict[Point2D, float] = { end_point: 0.0 } - # build "flow map" + # build flow field early_exit = False while not frontier.empty() and not early_exit: current = frontier.get() @@ -248,15 +249,52 @@ class BFS(PathFinderBase): return path -class Dijkstra(PathFinderBase): +class DijkstraAlgorithm(PathFinderBase): """ + Dijsktra's algorithm (Uniform Cost Search) Like BFS, but takes into account cost of nodes """ - name = "Dijkstra" + name = "Dijkstra's Algorithm" + + @dataclass(order=True) + class PrioritizedItem: + """ + Helper class for wrapping items in the PriorityQueue, + so that it can compare items with priority + """ + item: Any = field(compare=False) + priority: float def _CalculatePath(self, start_point: Point2D, end_point: Point2D) -> Optional[Path]: - ... + frontier: PriorityQueue[self.PrioritizedItem] = PriorityQueue() + came_from: dict[Point2D, Optional[Point2D]] = {end_point: None} # we start from end node + cost_so_far: dict[Point2D, float] = {end_point: 0.0} + + frontier.put(self.PrioritizedItem(end_point, 0.0)) + while not frontier.empty(): + current = frontier.get().item + #print(f"{current=}") + if current == start_point: + # early exit - remove if you want to build the whole flow map + break + for next_point in self._map.GetNeighbours(current): + new_cost = cost_so_far[current] + self._map.Visit(next_point) + if next_point not in cost_so_far or new_cost < cost_so_far[next_point]: + cost_so_far[next_point] = new_cost + priority = new_cost + frontier.put(self.PrioritizedItem(next_point, priority)) + came_from[next_point] = current + # build the actual path + path: Path = [] + current = start_point + path.append(current) + while came_from[current] is not None: + current = came_from[current] + path.append(current) + return path + + class A_star(PathFinderBase): @@ -264,6 +302,8 @@ class A_star(PathFinderBase): def _CalculatePath(self, start_point: Point2D, end_point: Point2D) -> Optional[Path]: ... + + # # Calculate paths using various methods and visualize them # @@ -278,7 +318,7 @@ def main(): path_finder_classes: list[type[PathFinderBase]] = [ DFS, BFS, - Dijkstra, + DijkstraAlgorithm, A_star ]