"""Discovery and polling helpers for Bot-Man-Toe servers.""" import multiprocessing from queue import Empty from typing import Any, Dict, Optional, Tuple, Union import requests def _poll_worker(url: str, payload: Dict[str, Any], timeout: Optional[Union[float, Tuple[float, float]]], queue: multiprocessing.Queue) -> None: """Run a polling request in a separate process.""" try: response = requests.get(url, json=payload, timeout=timeout) response.raise_for_status() content = response.json() if isinstance(content, dict): queue.put(("ok", content)) return except requests.exceptions.RequestException: queue.put(("request_error", None)) return except ValueError: queue.put(("invalid", None)) return queue.put(("invalid", None)) def _poll_request( url: str, payload: Dict[str, Any], request_timeout: Optional[Union[float, Tuple[float, float]]], deadline: float, ) -> tuple[str, Optional[Dict[str, Any]]]: """Execute a bounded JSON request and classify the result.""" context = multiprocessing.get_context("spawn") queue: multiprocessing.Queue = context.Queue(maxsize=1) process = context.Process(target=_poll_worker, args=(url, payload, request_timeout, queue)) process.daemon = True process.start() process.join(deadline) if process.is_alive(): process.terminate() process.join() queue.close() queue.join_thread() return ("timeout", None) try: status, content = queue.get(timeout=0.1) except Empty: queue.close() queue.join_thread() return ("invalid", None) queue.close() queue.join_thread() return status, content class ServerAgent: """Representation of a server that can host one or more games.""" def __init__(self, url: str, name: str, games: Dict[str, Dict[str, Any]], debug : bool = False) -> None: """ Create a server representation. :param url: The URL used to discover the server. :type url: str :param name: Name of the server's player. :type name: str :param games: Games the server is willing to play. :type games: Dict[str, Dict[str, Any]] """ self.debug = debug self.games = games self.name = name self.url = url.strip("/") @classmethod def from_url( cls, url: str, timeout: Optional[Union[float, Tuple[float, float]]] = 10.0, debug : bool = False ) -> "ServerAgent": """ Create a server agent by polling its discovery endpoint. The root endpoint is expected to return a JSON object with an optional ``name`` field and an optional ``games`` mapping. :param url: The URL that the server can be reached at. :type url: str :param timeout: Request timeout passed to ``requests.get``. :type timeout: Optional[Union[float, Tuple[float, float]]] :return: The server's representation. :rtype: ServerAgent :raises requests.exceptions.HTTPError: If the server returns a non-success HTTP status code. :raises requests.exceptions.RequestException: If the request fails before a response is received. :raises ValueError: If the response body is not a JSON object or if the payload contains malformed discovery fields. """ if timeout is None: deadline = 10.0 elif isinstance(timeout, (int, float)): deadline = float(timeout) else: deadline = sum(timeout) status, content = _poll_request(url.rstrip("/") + "/", {}, timeout, deadline) if status == "timeout": raise requests.exceptions.RequestException("Server discovery request timed out.") if status == "request_error": raise requests.exceptions.RequestException("Server discovery request failed.") if status != "ok" or content is None: raise ValueError("Server discovery responses must be JSON objects.") raw_name = content.get("name", "") name = "" if raw_name is None else str(raw_name) games: Dict[str, Dict[str, Any]] = {} raw_games = content.get("games", {}) if raw_games is not None: if not isinstance(raw_games, dict): raise ValueError("The 'games' field must be a JSON object when provided.") for game_name, profile in raw_games.items(): if isinstance(profile, dict): games[str(game_name)] = profile return cls(url=url, name=name, games=games, debug=debug) def poll(self, game: str, payload: Dict[str, Any], timeout: float = 1.0) -> Optional[Dict[str, Any]]: """ Inquire a game to make a move. :param game: The game the ServerAgent is asked to play. :type game: str :param payload: The JSON payload that represents the game's state. :type payload: Dict[str, Any] :param timeout: Maximum number of seconds to wait for a move. :type timeout: float :return: The server's response, or None if the server did not respond in time or returned an invalid response. :rtype: Optional[Dict[str, Any]] """ url = f"{self.url.rstrip('/')}/{game.lstrip('/')}" status, content = _poll_request(url, payload, timeout, 3.0) if self.debug: print(f"[DBG] Agent `{self.name}` returned:") print(( status, content )) if status != "ok": return None return content