diff --git a/python/pathfinding_demo.ipynb b/python/pathfinding_demo.ipynb index c76475c..3945409 100644 --- a/python/pathfinding_demo.ipynb +++ b/python/pathfinding_demo.ipynb @@ -13,24 +13,75 @@ "source": [ "# Pathfinding demo\n", "\n", - "## List of methods\n", + "Pathfinding is the task of finding shortest (or any) path from one point to the other [1]. Majority of practically used pathfinding methods are based on graph search, e.g. representing the map as a graph (e.g. grid of nodes covering the map) and finding (shortest) path between nodes on this graph. Edges between the nodes may have some cost, which represents length or difficulty of getting from one node to the other.\n", "\n", - "Non-exhaustive list of methods follows.\n", + "## Brief overview of methods\n", "\n", - "$V$ is the set of all vertices; $E$ is the set of all edges; $|V|$ is the size of the set\n", + "### Graph-based\n", "\n", - "1. Depth-first search\n", - " - s\n", - "1. [Bellman-Ford algorithm](https://en.wikipedia.org/wiki/Bellman%E2%80%93Ford_algorithm)\n", - " - exhaustive method\n", - " - $O(|V|*|E|)$\n", + "Before we can use graph-based search, we have to map the graph nodes and edges to the positions in the world. This may be as simple as creating regular grid, or more efficiently create nodes only in some critical points in the world map. We can also use 3D space (navmeshes in games: jumps, climbing, ...).\n", + "The less nodes there are, the faster the search. \n", + "\n", + "* Depth-first search\n", + "* Breadth-first search\n", + " * does not take the cost into account\n", + " * can be used to create flow fields and distance maps\n", + " * useful for efficiently calculating paths for many agents with one destination\n", + "* Dijkstra\n", + " * similar to BFS, but takes edge cost into account\n", + " * prioritizes search to the direction of lesser cost\n", + "* Greedy Best-First search (GBFS)\n", + " * similar to BFS, but uses some heuristic to prioritize search\n", + " * this may be e.g. manhattan distance to the destination\n", + " * may get a bit \"stuck\" if there are obstacles\n", + "* A*\n", + " * combines GBFS and Dijkstra\n", + " * priority is the sum of heuristic and the cost-so-far\n", + " * usually the best option \n", + "\n", + "There are many possible modifications and optimizations to these methods. Graph-based methods may also be combined with non-graph ones (e.g. having one unit in RTS search the path using A*, and all the other units within a group attracted to it using potential field).\n", + "\n", + "### Non-graph based\n", + "\n", + "* Gradient descent\n", + " * Optimizes paths by following the steepest descent in a potential field\n", + "* Potential field methods\n", + " * Simulate attractive forces toward goals and repulsive forces from obstacles\n", + "* Straight-line or Euclidean paths\n", + " * Simply connect points directly, often with collision checks; basic for open spaces but may require smoothing for obstacles.\n", + "\n", + "## Selecting a method\n", + "\n", + "Selecting a method depends on several factors:\n", + "\n", + "* use case\n", + " * what \"world\" are we navigating in: is it possible to represent it efficiently as a graph?\n", + " * if we do just one-off calculation, or if we intend to do that very often\n", + " * if we do \"one source, one destination\", \"one source, all destinations\", \"all sources, one destination\" or \"all sources, all destinations\" - see [this article](https://www.redblobgames.com/pathfinding/tower-defense/) \n", + " * if there are moving obstacles\n", + "* performance requirements: how fast we want to calculate the path, how often we do it\n", + "* memory requirements: some methods are more memory-intensive\n", + "\n", + "## Sources\n", + "\n", + "[[1] Wikipedia on pathfinding](https://en.wikipedia.org/wiki/Pathfinding)\n", + "\n", + "[Redblobgames](https://www.redblobgames.com/) has excellent set of articles about pathfinding and other related topics for game development.\n", + "\n", + "## List of implemented methods\n", + "\n", + "This Python demo implements few of the described methods.\n", + "\n", + "1. Breadth-first search\n", "2. Dijkstra\n", - "3. A*" + "3. Greedy Best-First search (GBFS)\n", + "4. A*\n", + "\n" ] }, { "cell_type": "code", - "execution_count": 85, + "execution_count": 7, "id": "fbdf9d2c-d050-4744-b559-abc71e550725", "metadata": { "editable": true, @@ -47,12 +98,17 @@ "\n", "import matplotlib.pyplot as plt\n", "import numpy as np\n", - "from typing import Protocol, Optional" + "import time\n", + "import random\n", + "from typing import Optional, NewType, Any\n", + "from abc import ABC, abstractmethod\n", + "from queue import Queue, PriorityQueue\n", + "from dataclasses import dataclass, field" ] }, { "cell_type": "code", - "execution_count": 86, + "execution_count": 8, "id": "c704cf15-95fa-49c1-af1b-c99f7b5c8b95", "metadata": { "editable": true, @@ -67,57 +123,104 @@ "# Type and interfaces definition\n", "#\n", "\n", - "type Point2D = tuple[int, int] # tuple(x, y)\n", + "Point2D = NewType(\"Point2D\", tuple[int, int])\n", + "# type Point2D = tuple[int, int] # tuple(x, y)\n", "type Path = list[Point2D]\n", - "type ElapsedTime_ns = float # nanoseconds\n", - "type VisitedNodeCount = int\n", "\n", "class Map:\n", " \"\"\"\n", " 2D map consisting of cells with given cost\n", " \"\"\"\n", - " array: np.array\n", + " # array not defined as private, as plotting utilities work with it directly\n", + " array: np.ndarray\n", + " _visited_nodes: int\n", "\n", " def __init__(self, width: int, height: int) -> None:\n", " assert width > 0\n", " assert height > 0\n", - " self.array = np.zeros((width, height), dtype=np.float64)\n", + " rows = height\n", + " cols = width\n", + " self.array = np.zeros((rows, cols), dtype=np.float64)\n", + " self._visited_nodes = 0\n", "\n", " def Randomize(self, low: float = 0.0, high: float = 1.0) -> None:\n", " self.array = np.random.uniform(low, high, self.array.shape)\n", "\n", - " def GetCost(self, point: Point2D) -> float:\n", - " return self.array[point]\n", - " \n", " def IsPointValid(self, point: Point2D) -> bool:\n", - " ...\n", + " x, y = point\n", + " y_max, x_max = self.array.shape\n", + " x_in_bounds = (0 <= x < x_max) \n", + " y_in_bounds = (0 <= y < y_max) \n", + " return x_in_bounds and y_in_bounds\n", " \n", - " def GetNeighbours(self) -> list[Point2D]:\n", - " ...\n", - "\n", - " \n", - "\n", - "class PathFinder(Protocol):\n", - " def SetMap(m: Map) -> None:\n", - " ...\n", - "\n", - " def CalculatePath(start: Point2D, end: Point2D) -> Path:\n", + " def GetNeighbours(self, center_point: Point2D) -> list[Point2D]:\n", " \"\"\"\n", - " Calculate path on a given map.\n", - " Note: map must be set first using SetMap (or using constructor)\n", + " Get list of neighboring points (without actually visiting them)\n", " \"\"\"\n", + " points: list[Point2D] = []\n", + " x_center, y_center = center_point\n", + " for x in range(-1,2):\n", + " for y in range(-1,2):\n", + " if x == 0 and y == 0:\n", + " continue\n", + " p = Point2D((x + x_center, y + y_center))\n", + " if self.IsPointValid(p):\n", + " points.append(p)\n", + " return points\n", + " \n", + " def GetPointCost(self, point: Point2D) -> float:\n", + " x, y = point\n", + " row, col = y, x\n", + " return self.array[(row, col)]\n", + " \n", + " def GetPathCost(self, path: Path) -> float:\n", + " return sum([self.GetPointCost(p) for p in path])\n", "\n", - " def GetStats() -> (ElapsedTime_ns, VisitedNodeCount):\n", + " def ResetVisitedCount(self) -> None:\n", + " self._visited_nodes = 0\n", + "\n", + " def GetVisitedCount(self) -> int:\n", + " return self._visited_nodes\n", + "\n", + " def Visit(self, point: Point2D) -> float:\n", " \"\"\"\n", - " Return performance stats for the last calculation:\n", - " - elapsed time in nanoseconds,\n", - " - number of visited nodes during search\n", - " \"\"\"\n" + " Visit the node and return its cost\n", + " \"\"\"\n", + " if not self.IsPointValid(point):\n", + " raise ValueError(\"Point out of bounds\")\n", + " self._visited_nodes += 1\n", + " return self.GetPointCost(point)\n", + "\n", + " def CreateMaze(self, wall_probability: float = 0.3) -> None:\n", + " \"\"\"\n", + " Note: generated with Grok\n", + " Generate a simple maze on the map.\n", + " - Borders are set as walls (cost 1000).\n", + " - Internal cells are randomly set to 1 (path) or 1000 (wall) based on wall_probability.\n", + "\n", + " Args:\n", + " wall_probability (float): Probability (0-1) that an internal cell becomes a wall.\n", + " \"\"\"\n", + " rows, cols = self.array.shape\n", + "\n", + " # Set borders to walls (cost 1000)\n", + " self.array[0, :] = 1000 # Top row\n", + " self.array[-1, :] = 1000 # Bottom row\n", + " self.array[:, 0] = 1000 # Left column\n", + " self.array[:, -1] = 1000 # Right column\n", + "\n", + " # Set internal cells randomly\n", + " for y in range(1, rows - 1): # Skip borders\n", + " for x in range(1, cols - 1):\n", + " if random.random() < wall_probability:\n", + " self.array[y, x] = 1000 # Wall\n", + " else:\n", + " self.array[y, x] = 1 # Normal tile\n" ] }, { "cell_type": "code", - "execution_count": 87, + "execution_count": 9, "id": "043a1f1c-a7a7-4f24-b69c-c6c809830111", "metadata": { "editable": true, @@ -136,19 +239,20 @@ " _axes: Optional[plt.Axes]\n", " _cmap: plt.Colormap\n", " _cmap_counter: int\n", - " \n", + "\n", " def __init__(self):\n", " self._axes = None\n", " self._cmap = plt.get_cmap('tab10')\n", " self._cmap_counter = 0\n", - " \n", + "\n", " def DrawMap(self, m: Map):\n", " M, N = m.array.shape\n", " _, ax = plt.subplots()\n", - " ax.imshow(m.array, cmap='terrain', origin='lower', interpolation='none')\n", + " ax.imshow(m.array, cmap='gist_earth', origin='lower', interpolation='none')\n", " self._axes = ax\n", "\n", " def DrawPath(self, path: Path, label: str = \"Path\"):\n", + "\n", " \"\"\"\n", " Draw path on a map. Note that DrawMap has to be called first\n", " \"\"\"\n", @@ -159,12 +263,26 @@ " self._axes.plot(xs, ys, 'o-', color=color, label=label)\n", " self._axes.plot(xs[0], ys[0], 'o', color='lime', markersize=8) # starting point\n", " self._axes.plot(xs[-1], ys[-1], 'o', color='magenta', markersize=8) # end point\n", - " " + " self._axes.legend()\n", + "\n", + "\n", + "#\n", + "# Utilities and helper classes\n", + "#\n", + "\n", + "@dataclass(order=True)\n", + "class PrioritizedItem:\n", + " \"\"\"\n", + " Helper class for wrapping items in the PriorityQueue,\n", + " so that it can compare items with priority\n", + " \"\"\"\n", + " item: Any = field(compare=False)\n", + " priority: float\n" ] }, { "cell_type": "code", - "execution_count": 88, + "execution_count": 10, "id": "859c64f4-e65c-4905-a775-c6f17542eac8", "metadata": {}, "outputs": [], @@ -173,47 +291,232 @@ "# Method: depth-first search\n", "#\n", "\n", - "class DFS:\n", + "#\n", + "# Pathfinding implementations\n", + "#\n", "\n", - " name = \"Depth First Search\"\n", + "class PathFinderBase(ABC):\n", + " name: str\n", " _map: Optional[Map]\n", - " \n", + " _elapsed_time_ns: int\n", + " _visited_node_count: int\n", + "\n", " def __init__(self) -> None:\n", " self._map = None\n", - " \n", + " self._elapsed_time_ns = 0\n", + " self._visited_node_count = 0\n", + "\n", + "\n", " def SetMap(self, m: Map) -> None:\n", " self._map = m\n", - " \n", - " def CalculatePath(self, start: Point2D, end: Point2D) -> Path:\n", - " assert m is not None, \"SetMap must be called first\"\n", - " return [(0,0), (5,5), (6,6), (1,9)]\n", "\n", - " def GetStats(self) -> (ElapsedTime_ns, VisitedNodeCount):\n", - " return 150.0, 42\n", + " def CalculatePath(self, start: Point2D, end: Point2D) -> Optional[Path]:\n", + " \"\"\"\n", + " Calculate path on a given map.\n", + " Note: map must be set first using SetMap\n", + " \"\"\"\n", + " assert self._map is not None, \"SetMap must be called first\"\n", + " self._map.ResetVisitedCount()\n", + " start_time = time.perf_counter_ns()\n", + " res = self._CalculatePath(start, end)\n", + " stop_time = time.perf_counter_ns()\n", + " self._elapsed_time_ns = stop_time - start_time\n", + " self._visited_node_count = self._map.GetVisitedCount()\n", + " return res\n", + "\n", + " @abstractmethod\n", + " def _CalculatePath(self, start: Point2D, end: Point2D) -> Optional[Path]:\n", + " \"\"\"\n", + " This method must be implemented by the derived classes\n", + " \"\"\"\n", + "\n", + " def GetStats(self) -> tuple[int, int]:\n", + " \"\"\"\n", + " Return performance stats for the last calculation:\n", + " - elapsed time in nanoseconds,\n", + " - number of visited nodes during search\n", + " \"\"\"\n", + " return self._elapsed_time_ns, self._visited_node_count\n", "\n", "\n", - "class BFS:\n", + "class BFS(PathFinderBase):\n", + " \"\"\"\n", + " Iterative breadth-first search\n", + " Finds optimal path and creates flow-field, does not take the node cost into account.\n", + " This would be good match for static maps with lots of agents with one\n", + " destination.\n", + " Compared to A*, this is more computationally expensive if we only want\n", + " to find path for one agent.\n", + " \"\"\"\n", "\n", " name = \"Breadth First Search\"\n", - " _map: Optional[Map]\n", - " \n", - " def __init__(self) -> None:\n", - " self._map = None\n", - " \n", - " def SetMap(self, m: Map) -> None:\n", - " self._map = m\n", - " \n", - " def CalculatePath(self, start: Point2D, end: Point2D) -> Path:\n", - " assert m is not None, \"SetMap must be called first\"\n", - " return [(0,0), (1,0), (2,0), (3,0)]\n", + " # flow field and distance map\n", + " _came_from: dict[Point2D, Point2D]\n", + " _distance: dict[Point2D, float]\n", "\n", - " def GetStats(self) -> (ElapsedTime_ns, VisitedNodeCount):\n", - " return 300.0, 21" + " def _CalculatePath(self, start_point: Point2D, end_point: Point2D) -> Optional[Path]:\n", + " frontier: Queue[Point2D] = Queue()\n", + " frontier.put(start_point)\n", + " self._came_from: dict[Point2D, Optional[Point2D]] = { start_point: None }\n", + " self._distance: dict[Point2D, float] = { start_point: 0.0 }\n", + "\n", + " # build flow field\n", + " early_exit = False\n", + " while not frontier.empty() and not early_exit:\n", + " current = frontier.get()\n", + " for next_point in self._map.GetNeighbours(current):\n", + " if next_point not in self._came_from:\n", + " frontier.put(next_point)\n", + " self._distance[next_point] = self._distance[current] + 1.0\n", + " _ = self._map.Visit(next_point) # visit only to track visited node count\n", + " self._came_from[next_point] = current\n", + " if next_point == end_point:\n", + " # early exit - if you want to build the whole flow field, remove this\n", + " early_exit = True\n", + " break\n", + " # find actual path\n", + " path: Path = []\n", + " current = end_point\n", + " path.append(current)\n", + " while self._came_from[current] is not None:\n", + " current = self._came_from[current]\n", + " path.append(current)\n", + " path.reverse()\n", + " return path\n", + "\n", + "\n", + "class DijkstraAlgorithm(PathFinderBase):\n", + " \"\"\"\n", + " Dijsktra's algorithm (Uniform Cost Search)\n", + " Like BFS, but takes into account cost of nodes\n", + " (priority for the search being the distance from the start)\n", + " \"\"\"\n", + "\n", + " name = \"Dijkstra's Algorithm\"\n", + "\n", + " def _CalculatePath(self, start_point: Point2D, end_point: Point2D) -> Optional[Path]:\n", + " frontier: PriorityQueue[PrioritizedItem] = PriorityQueue()\n", + " came_from: dict[Point2D, Optional[Point2D]] = {start_point: None}\n", + " cost_so_far: dict[Point2D, float] = {start_point: 0.0}\n", + "\n", + " frontier.put(PrioritizedItem(start_point, 0.0))\n", + " while not frontier.empty():\n", + " current = frontier.get().item\n", + " if current == end_point:\n", + " # early exit - remove if you want to build the whole flow map\n", + " break\n", + " for next_point in self._map.GetNeighbours(current):\n", + " new_cost = cost_so_far[current] + self._map.Visit(next_point)\n", + " if next_point not in cost_so_far or new_cost < cost_so_far[next_point]:\n", + " cost_so_far[next_point] = new_cost\n", + " priority = new_cost\n", + " frontier.put(PrioritizedItem(next_point, priority))\n", + " came_from[next_point] = current\n", + " # build the actual path\n", + " path: Path = []\n", + " current = end_point\n", + " path.append(current)\n", + " while came_from[current] is not None:\n", + " current = came_from[current]\n", + " path.append(current)\n", + " path.reverse()\n", + " return path\n", + " \n", + "\n", + "class GBFS(PathFinderBase):\n", + " \"\"\"\n", + " Like Dijsktra's Algorithm, but uses some heuristic as a priority \n", + " instead of the cost of the node\n", + " \"\"\"\n", + " \n", + " name = \"Greedy Best First Search\"\n", + "\n", + " @staticmethod\n", + " def heuristic(a: Point2D, b: Point2D) -> float:\n", + " # for now we use Manhattan distance, although\n", + " # it is probably not entirely correct, given that\n", + " # we can also move diagonally in the grid\n", + " # TODO a problem for future me\n", + " x_a, y_a = a\n", + " x_b, y_b = b\n", + " return abs(x_a - x_b) + abs(y_a - y_b)\n", + "\n", + " def _CalculatePath(self, start_point: Point2D, end_point: Point2D) -> Optional[Path]:\n", + " frontier: PriorityQueue[PrioritizedItem] = PriorityQueue()\n", + " came_from: dict[Point2D, Optional[Point2D]] = {start_point: None}\n", + " \n", + " frontier.put(PrioritizedItem(start_point, 0.0))\n", + " # create the flow field\n", + " while not frontier.empty():\n", + " current = frontier.get().item\n", + " if current == end_point:\n", + " # early exit\n", + " break\n", + " for next_point in self._map.GetNeighbours(current):\n", + " if next_point not in came_from:\n", + " priority = self.heuristic(end_point, next_point)\n", + " frontier.put(PrioritizedItem(next_point, priority))\n", + " _ = self._map.Visit(next_point) # visit only to track visited node count\n", + " came_from[next_point] = current\n", + " # create the actual path\n", + " path: Path = [end_point]\n", + " while came_from[current] is not None:\n", + " current = came_from[current]\n", + " path.append(current)\n", + " path.reverse()\n", + " return path\n", + "\n", + "\n", + "class A_star(PathFinderBase):\n", + " \"\"\"\n", + " Combines Dijsktra's Algorithm and GBFS:\n", + " priority is the sum of the heuristic and distance from the start\n", + " \"\"\"\n", + "\n", + " name = \"A*\"\n", + "\n", + " @staticmethod\n", + " def heuristic(a: Point2D, b: Point2D) -> float:\n", + " # for now we use Manhattan distance, although\n", + " # it is probably not entirely correct, given that\n", + " # we can also move diagonally in the grid\n", + " # TODO a problem for future me\n", + " x_a, y_a = a\n", + " x_b, y_b = b\n", + " return abs(x_a - x_b) + abs(y_a - y_b)\n", + "\n", + " def _CalculatePath(self, start_point: Point2D, end_point: Point2D) -> Optional[Path]:\n", + " frontier: PriorityQueue[PrioritizedItem] = PriorityQueue()\n", + " came_from: dict[Point2D, Optional[Point2D]] = { start_point: None }\n", + " cost_so_far: dict[Point2D, float] = { start_point: 0.0 }\n", + "\n", + " frontier.put(PrioritizedItem(start_point, 0.0))\n", + " while not frontier.empty():\n", + " current = frontier.get().item\n", + " if current == end_point:\n", + " # early exit\n", + " break\n", + " for next_point in self._map.GetNeighbours(current):\n", + " new_cost = cost_so_far[current] + self._map.Visit(next_point)\n", + " if next_point not in cost_so_far or new_cost < cost_so_far[next_point]:\n", + " cost_so_far[next_point] = new_cost\n", + " priority = new_cost + self.heuristic(end_point, next_point)\n", + " frontier.put(PrioritizedItem(next_point, priority))\n", + " came_from[next_point] = current\n", + " # create the actual path\n", + " path: Path = [end_point]\n", + " current = end_point\n", + " while came_from[current] is not None:\n", + " current = came_from[current]\n", + " path.append(current)\n", + " path.reverse()\n", + " return path\n", + "\n" ] }, { "cell_type": "code", - "execution_count": 89, + "execution_count": 11, "id": "ece3a6c8-aa1d-49a8-9f4c-06ebff72f991", "metadata": { "editable": true, @@ -227,13 +530,15 @@ "name": "stdout", "output_type": "stream", "text": [ - "Breadth First Search : took 300.0 ns, visited 21 nodes\n", - "Depth First Search : took 150.0 ns, visited 42 nodes\n" + "Breadth First Search : took 1.941 ms, visited 561 nodes, cost 17012.00\n", + "Dijkstra's Algorithm : took 2.708 ms, visited 2925 nodes, cost 2027.00\n", + "Greedy Best First Search: took 0.268 ms, visited 120 nodes, cost 10019.00\n", + "A* : took 0.370 ms, visited 275 nodes, cost 2028.00\n" ] }, { "data": { - "image/png": "", + "image/png": "", "text/plain": [ "
" ] @@ -243,31 +548,34 @@ } ], "source": [ - "# Define the map and start/stop points\n", - "m = Map(10, 10)\n", - "m.Randomize()\n", - "starting_point: Point2D = (0,0)\n", - "end_point: Point2D = (9,9)\n", + " # Define the map and start/stop points\n", + "m = Map(30,20)\n", + "#m.Randomize()\n", + "m.CreateMaze()\n", + "starting_point: Point2D = Point2D((29,19))\n", + "end_point: Point2D = Point2D((1,1))\n", "\n", - "#\n", - "# Calculate paths using various methods and visualize them\n", - "#\n", - "\n", - "path_finder_classes: list[PathFinder] = {\n", - " DFS, BFS\n", - "}\n", + "path_finder_classes: list[type[PathFinderBase]] = [\n", + " BFS,\n", + " DijkstraAlgorithm,\n", + " GBFS,\n", + " A_star,\n", + "]\n", "\n", "v = Visualizer()\n", "v.DrawMap(m)\n", "\n", - "for pt in path_finder_classes:\n", - " path_finder = pt()\n", + "for pfc in path_finder_classes:\n", + " path_finder = pfc()\n", " path_finder.SetMap(m)\n", " path = path_finder.CalculatePath(starting_point, end_point)\n", " elapsed_time, visited_nodes = path_finder.GetStats()\n", - " print(f\"{path_finder.name:22}: took {elapsed_time} ns, visited {visited_nodes} nodes\")\n", - " v.DrawPath(path)\n", - "\n" + " if path is not None: \n", + " cost = m.GetPathCost(path)\n", + " print(f\"{path_finder.name:24}: took {elapsed_time/1e6:.3f} ms, visited {visited_nodes} nodes, cost {cost:.2f}\")\n", + " v.DrawPath(path, label=path_finder.name)\n", + " else:\n", + " print(f\"{path_finder.name}: No path found\")\n" ] }, {