Bot-Man-Toe/pyclient/poll.py

157 lines
5.4 KiB
Python

"""Discovery and polling helpers for Bot-Man-Toe servers."""
import multiprocessing
from queue import Empty
from typing import Any, Dict, Optional, Tuple, Union
import requests
def _poll_worker(url: str, payload: Dict[str, Any], timeout: Optional[Union[float, Tuple[float, float]]], queue: multiprocessing.Queue) -> None:
"""Run a polling request in a separate process."""
try:
response = requests.get(url, json=payload, timeout=timeout)
response.raise_for_status()
content = response.json()
if isinstance(content, dict):
queue.put(("ok", content))
return
except requests.exceptions.RequestException:
queue.put(("request_error", None))
return
except ValueError:
queue.put(("invalid", None))
return
queue.put(("invalid", None))
def _poll_request(
url: str,
payload: Dict[str, Any],
request_timeout: Optional[Union[float, Tuple[float, float]]],
deadline: float,
) -> tuple[str, Optional[Dict[str, Any]]]:
"""Execute a bounded JSON request and classify the result."""
context = multiprocessing.get_context("spawn")
queue: multiprocessing.Queue = context.Queue(maxsize=1)
process = context.Process(target=_poll_worker, args=(url, payload, request_timeout, queue))
process.daemon = True
process.start()
process.join(deadline)
if process.is_alive():
process.terminate()
process.join()
queue.close()
queue.join_thread()
return ("timeout", None)
try:
status, content = queue.get(timeout=0.1)
except Empty:
queue.close()
queue.join_thread()
return ("invalid", None)
queue.close()
queue.join_thread()
return status, content
class ServerAgent:
"""Representation of a server that can host one or more games."""
def __init__(self, url: str, name: str, games: Dict[str, Dict[str, Any]]) -> None:
"""
Create a server representation.
:param url: The URL used to discover the server.
:type url: str
:param name: Name of the server's player.
:type name: str
:param games: Games the server is willing to play.
:type games: Dict[str, Dict[str, Any]]
"""
self.url = url.strip("/")
self.name = name
self.games = games
@classmethod
def from_url(
cls,
url: str,
timeout: Optional[Union[float, Tuple[float, float]]] = 10.0,
) -> "ServerAgent":
"""
Create a server agent by polling its discovery endpoint.
The root endpoint is expected to return a JSON object with an optional
``name`` field and an optional ``games`` mapping.
:param url: The URL that the server can be reached at.
:type url: str
:param timeout: Request timeout passed to ``requests.get``.
:type timeout: Optional[Union[float, Tuple[float, float]]]
:return: The server's representation.
:rtype: ServerAgent
:raises requests.exceptions.HTTPError: If the server returns a
non-success HTTP status code.
:raises requests.exceptions.RequestException: If the request fails
before a response is received.
:raises ValueError: If the response body is not a JSON object or if the
payload contains malformed discovery fields.
"""
if timeout is None:
deadline = 10.0
elif isinstance(timeout, (int, float)):
deadline = float(timeout)
else:
deadline = sum(timeout)
status, content = _poll_request(url.rstrip("/") + "/", {}, timeout, deadline)
if status == "timeout":
raise requests.exceptions.RequestException("Server discovery request timed out.")
if status == "request_error":
raise requests.exceptions.RequestException("Server discovery request failed.")
if status != "ok" or content is None:
raise ValueError("Server discovery responses must be JSON objects.")
raw_name = content.get("name", "")
name = "" if raw_name is None else str(raw_name)
games: Dict[str, Dict[str, Any]] = {}
raw_games = content.get("games", {})
if raw_games is not None:
if not isinstance(raw_games, dict):
raise ValueError("The 'games' field must be a JSON object when provided.")
for game_name, profile in raw_games.items():
if isinstance(profile, dict):
games[str(game_name)] = profile
return cls(url=url, name=name, games=games)
def poll(self, game: str, payload: Dict[str, Any], timeout: float = 1.0) -> Optional[Dict[str, Any]]:
"""
Inquire a game to make a move.
:param game: The game the ServerAgent is asked to play.
:type game: str
:param payload: The JSON payload that represents the game's state.
:type payload: Dict[str, Any]
:param timeout: Maximum number of seconds to wait for a move.
:type timeout: float
:return: The server's response, or None if the server did not respond
in time or returned an invalid response.
:rtype: Optional[Dict[str, Any]]
"""
url = f"{self.url.rstrip('/')}/{game.lstrip('/')}"
status, content = _poll_request(url, payload, timeout, timeout)
if status != "ok":
return None
return content