diff --git a/elo.py b/elo.py new file mode 100644 index 0000000..7678f65 --- /dev/null +++ b/elo.py @@ -0,0 +1,36 @@ +""" + Create an ELO tracker that compares various server agents out there. +""" + +from elo_tracker import EloTracker +from pyclient.games import TicTacToe + +import pyclient +import time + +GAME_FILE = "games.jsonl" +PLAYER_FILE = "known_players.json" + +def main() -> int: + tracker = EloTracker( + game_file_name=GAME_FILE, + player_file_name=PLAYER_FILE, + debug=True, + ) + + tracker.start_periodic_matches( + game=TicTacToe.empty(), + interval_seconds=60, + player_count=2, + ) + + try: + while True: + pass + except KeyboardInterrupt: + tracker.stop_periodic_matches() + + return 0 + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/elo_tracker/__init__.py b/elo_tracker/__init__.py new file mode 100644 index 0000000..2bd5a9c --- /dev/null +++ b/elo_tracker/__init__.py @@ -0,0 +1,5 @@ +from .app import EloTracker + +__all__ = [ + "EloTracker", +] diff --git a/elo_tracker/app.py b/elo_tracker/app.py new file mode 100644 index 0000000..ce02e35 --- /dev/null +++ b/elo_tracker/app.py @@ -0,0 +1,734 @@ +""" + This app hosts the client that'll perform the ELO tracking. +""" + +from __future__ import annotations + +import copy +import json +import os +import random +import threading + +from dataclasses import dataclass +from datetime import datetime, timezone +from typing import Any, Sequence + +import pyclient + +from pyclient.games import FinishState, Game + + +DEFAULT_ELO = 1000 +STD_DEV_DIFF = 400 +ELO_K_FACTOR = 32 + +@dataclass(frozen=True) +class PlayerIdentifier: + """ + The Player Identifier uniquely identifies each player for the ELO + tracker. + """ + name : str + url : str + version : str | None + + def __key__(self) -> tuple[str, str, str | None]: + return (self.name, self.url, self.version) + + @classmethod + def from_server_agent(cls, agent : pyclient.ServerAgent) -> "PlayerIdentifier": + """ + Gain a player identifier from an agent. + """ + return cls( + name=agent.name, + url=agent.url, + version=agent.profile.get("version", + agent.profile.get("me.noordstar.peanuts.agent.version", None) + ), + ) + +@dataclass() +class EloStat: + """ + The EloStat records the ELO statistics of a single player. + What's their score, and how much did they win? + """ + + # Identity + player_id : PlayerIdentifier + + # Statistics + losses : int + draws : int + wins : int + elo : float + + @classmethod + def new(cls, player_id : PlayerIdentifier) -> "EloStat": + """ + Create a new ELO type based on a player. + + :param player_id: Unique player identifier + :type player_id: PlayerIdentifier + :return: New empty Elo statistics for the player + :rtype: EloStat + """ + return cls( + player_id=player_id, losses=0, draws=0, wins=0, elo=DEFAULT_ELO, + ) + + def to_json(self) -> dict[str, Any]: + """ + Convert EloStat to a JSON-formatted dictionary + + :return: The EloStat in JSON format + :rtype: dict[str, Any] + """ + d = dict( + name=self.player_id.name, + url=self.player_id.url, + losses=self.losses, + draws=self.draws, + wins=self.wins, + elo=int(self.elo), + ) + + if self.player_id.version is not None: + d["version"] = self.player_id.version + + return d + +@dataclass(frozen=True) +class Match: + """ + A Match represents a game written to disk in JSONL format. + """ + game_name : str + participants : list[tuple[PlayerIdentifier, FinishState]] + timestamp : str + + @staticmethod + def now() -> str: + """ + Get a timestamp of now. + + :return: Timestamp in ISO format + :rtype: str + """ + return datetime.now(tz=timezone.utc).isoformat() + + @classmethod + def from_json_record(cls, record : dict[str, Any]) -> "Match": + """ + Create a new match from a decoded JSON object. + + :param participants: Decoded JSON object + :type participants: dict[str, Any] + :return: An initialized match + :rtype: Match + :raises KeyError: The JSON is missing required keys. + :raises ValueError: The JSON is formatted improperly. + """ + participants : list[dict[str, Any]] = record["participants"] + if not isinstance(participants, list): + raise ValueError( + "Key `participants` must be list of objects" + ) + + game_name : str = str(record["name"]) + timestamp : str = str(record["timestamp"]) # TODO: Perhaps verify ISO format? + new_participants : list[tuple[PlayerIdentifier, FinishState]] = [] + + for i, participant in enumerate(participants): + # Sanity assertions + if not isinstance(participant, dict): + raise ValueError( + f"Participant #{i+1} must be dictionary" + ) + + for key in ["name", "url", "result"]: + if not isinstance(participant[key], str): + raise ValueError( + f"Participant #{i+1} must have the `{key}` key as string" + ) + + # Initialize participant + name : str = participant["name"] + url : str = participant["url"] + result = FinishState.from_str(participant["result"]) + version : str | None = participant.get("version", None) + if version is not None: + version = str(version) + + new_participants.append(( + PlayerIdentifier(name=name, url=url, version=version), + result, + )) + + if len(new_participants) < 2: + raise ValueError( + "Expected at least 2 participants in a game for which ELO can be tracked" + ) + + return cls( + game_name=game_name, + participants=new_participants, + timestamp=timestamp, + ) + + @classmethod + def from_replay( + cls, + players : list[pyclient.ServerAgent], + replay : pyclient.GameReplay, + timestamp : str | None, + ) -> "Match": + """ + Convert a GameReplay into a match. + + :param players: The participants of the match. + :type players: list[pyclient.ServerAgent] + :param replay: Game summary. + :type replay: pyclient.GameReplay + :param timestamp: ISO formatted timestamp of when the game was planned. + :type timestamp: str + :return: An initialized match. + :rtype: Match + :raises ValueError: The replay shows an unfinished match. + """ + results = replay.turns[-1].state.winner() + if results is None: + raise ValueError( + "Game hasn't finished yet." + ) + + participants : list[tuple[PlayerIdentifier, FinishState]] = [] + for i, agent in enumerate(players): + finish_state = results.get(i, None) + if finish_state is None: + continue + + participants.append(( + PlayerIdentifier.from_server_agent(agent=agent), + finish_state, + )) + + return cls( + game_name=replay.game_name, + participants=participants, + timestamp=timestamp or cls.now() + ) + + def log(self, file_name : str) -> None: + """ + Log the current match to disk. + + :param file_name: File name to write the match to. + :type file_name: str + """ + with open(file_name, "a", encoding="utf-8") as wp: + wp.write(json.dumps(self.to_json(), sort_keys=True) + "\n") + + def to_json(self) -> dict[str, Any]: + """ + Convert the Match back to JSON. + + :return: The Match in a dictionary that's can be converted to JSON. + :rtype: dict[str, Any] + """ + participants : list[dict[str, str]] = [] + + for player_id, result in self.participants: + d : dict[str, str]= dict( + name=player_id.name, + url=player_id.url, + result=result.name, + ) + + if player_id.version is not None: + d["version"] = player_id.version + + participants.append(d) + + return dict( + name=self.game_name, + participants=participants, + timestamp=self.timestamp, + ) + +class EloTracker: + """ + The Elo tracker tracks matches between URLs that it is familiar with. + """ + def __init__( + self, + game_file_name: str, + player_file_name: str, + debug: bool = False, + name: str = "Bot-Man-Toe Elo Tracker", + ) -> None: + """ + Create an EloTracker. + + :param game_file_name: The file name to write game results to. + :type game_file_name: str + :param player_file_name: The file name to read player URLs from. + :type player_file_name: str + :param debug: Whether to print scheduler errors. + :type debug: bool + :param name: Display name for the leaderboard. + :type name: str + """ + + # Threading variables + self.__lock = threading.RLock() + self.__scheduler_stop = threading.Event() + self.__scheduler_thread: threading.Thread | None = None + + # Immutable values + self.debug: bool = debug + self.game_file_name: str = game_file_name + self.player_file_name: str = player_file_name + self.name: str = name + + # Thread-unsafe variables + # Please use a lock while doing CRUD operations on them + self.players: list[pyclient.ServerAgent] = [] + self.__matches: list[Match] = [] + self.__stats: dict[PlayerIdentifier, EloStat] = {} + + # Initialize tracker + self.__load_matches() + self.load_players() + + def __debug(self, message: str) -> None: + """ + Send a debug message to stdout. Ignored when not in debug mode. + + :param message: The message to debug log + :type message: str + """ + if self.debug: + with self.__lock: + print(f"[EloTracker] {message}") + + def __get_stat(self, player_id : PlayerIdentifier) -> EloStat: + """ + Get a player's statistics based on their player identifier. + + If the player wasn't known, the function returns a newly + initialized record in the database for them. + + :param player_id: Unique player identifier. + :type player_id: PlayerIdentifier + :return: Elo statistics + """ + with self.__lock: + stat = self.__stats.get(player_id, None) + + if stat is not None: + return stat + + stat = EloStat.new(player_id=player_id) + self.__stats[player_id] = stat + + return stat + + def __load_matches(self) -> None: + """ + Load persisted JSONL records and rebuild in-memory statistics. + """ + if not os.path.exists(self.game_file_name): + return + + with self.__lock: + self.__matches = [] + self.__stats = {} + + with open(self.game_file_name, encoding="utf-8") as fp: + for line_no, line in enumerate(fp, start=1): + line = line.strip() + if line == "": + continue + + try: + record = json.loads(line) + except json.JSONDecodeError: + self.__debug( + f"Skipping malformed match record on line {line_no}." + ) + continue + + if not isinstance(record, dict): + self.__debug( + f"Skipping non-object match record on line {line_no}." + ) + continue + + try: + m = Match.from_json_record(record=record) + except ( KeyError, ValueError ): + self.__debug( + f"Skipping malformed JSON object on line {line_no}." + ) + else: + self.__matches.append(m) + self.__register_match(m) + + def __register_match(self, m : Match) -> None: + """ + Apply a newly registered match to the aggregate statistics. + + :param m: Newly created match with results & outcomes. + :type m: Match + """ + + effective_k = ELO_K_FACTOR / (len(m.participants) - 1) + scores : dict[PlayerIdentifier, float] = {} + + # First, calculate the pairwise ELO results + # Do not apply them yet, in order to guarantee fair ELO shifts + for player_id1, result1 in m.participants: + total_k = 0.0 + rating_1 = self.__get_stat(player_id=player_id1).elo + + for player_id2, result2 in m.participants: + rating_2 = self.__get_stat(player_id=player_id2).elo + + expected_score = 1 / (1 + 10 ** ((rating_2 - rating_1) / STD_DEV_DIFF)) + + actual_score = 0 + if result1.score() > 0.0: + actual_score = result1.score() / (result1.score() + result2.score()) + + total_k += effective_k * (actual_score - expected_score) + + scores[player_id1] = total_k + + all_scores = sum(scores.values()) + + if 0.001 <= abs(all_scores): + self.__debug( + f"In total, all ELO score changes added together are {all_scores} (should be 0.0)" + ) + + # Then, apply the ELO score update + count the wins, draws & losses + for player_id, result in m.participants: + player = self.__get_stat(player_id=player_id) + player.elo += scores[player_id] + + match result: + case FinishState.draw: + player.draws += 1 + case FinishState.loss: + player.losses += 1 + case FinishState.win: + player.wins += 1 + + def __scheduler_loop( + self, + game: Game, + interval_seconds: float, + player_count: int, + ) -> None: + """ + Perform a schedule in which you play a random game. + + :param game: Game to play. + :type game: pyclient.Game + :param interval_seconds: Number of seconds to sleep between games + :type interval_seconds: float + :param player_count: The number of players that are supposed to participate + :type player_count: int + """ + while not self.__scheduler_stop.is_set(): + try: + self.load_players() + + with self.__lock: + available = len(self.players) + + if available < player_count: + self.__debug( + f"Skipping scheduled match: {available} players available, " + f"{player_count} required." + ) + else: + self.__debug( + "Playing a new scheduled match" + ) + self.play_random_match(game=game, player_count=player_count) + except Exception as exc: + self.__debug(f"Scheduled match failed: {exc}") + raise exc + + if self.__scheduler_stop.wait(interval_seconds): + break + + def create_flask_app(self, import_name : str) -> Any: + """ + Create a Flask app that exposes tracker statistics. + + :param import_name: The name of the application package. + :type import_name: str + """ + try: + from flask import Flask, Response, jsonify + except ImportError as exc: + raise ImportError( + "Flask is required to host the EloTracker server. " + "Install the project requirements before calling create_app()." + ) from exc + + app = Flask(__name__) + + @app.get("/") + def index() -> Response: + return Response( + """ + + + + + Bot-Man-Toe Elo Tracker + + +
+

Bot-Man-Toe Elo Tracker

+

The JSON API is available at /leaderboard, /matches, /players, and /health.

+
+ + +""".strip(), + mimetype="text/html", + ) + + @app.get("/health") + def health() -> Response: + return jsonify({ + "ok": True, + "periodic_matches": self.is_running_periodic_matches(), + }) + + @app.get("/leaderboard") + def leaderboard() -> Response: + return jsonify(self.get_json_leaderboard()) + + @app.get("/matches") + def matches() -> Response: + return jsonify(self.get_json_matches()) + + @app.get("/players") + def players() -> Response: + return jsonify(self.get_json_players()) + + return app + + def is_running_periodic_matches(self) -> bool: + """ + Return whether the scheduler thread is currently alive. + + :return: Whether the scheduler thread is currently alive. + :rtype: bool + """ + with self.__lock: + return self.__scheduler_thread is not None and self.__scheduler_thread.is_alive() + + def load_players(self) -> None: + """ + Load the known players from disk. + + :raises ValueError: File was not properly JSON-formatted. + """ + with open(self.player_file_name, encoding="utf-8") as fp: + obj = json.load(fp) + if not isinstance(obj, dict): + raise ValueError( + "Expected list of URLs in player file." + ) + + urls = obj.get("players", []) + if not isinstance(urls, list): + raise ValueError( + "Expected `players` field to be a list of strings." + ) + + players : list[pyclient.ServerAgent] = [] + for url in urls: + if not isinstance(url, str): + continue + + try: + agent = pyclient.Agent.from_url(url, debug=self.debug) + except ValueError: + pass # Not an available player right now + else: + players.append(agent) + + with self.__lock: + self.players = players + + for agent in players: + self.__get_stat(PlayerIdentifier.from_server_agent(agent)) + + def play_match(self, players: list[str], game: Game) -> pyclient.GameReplay: + """ + Play a single match with appointed players. + + :param players: List of URLs that participate. + :type players: list[str] + :return: A summary of the game. + :rtype: pyclient.GameReplay + :raises ValueError: One of the URLs could not be accessed. + """ + agents = [ + pyclient.Agent.from_url(url, debug=self.debug) + for url in players + ] + + replay = pyclient.PyClient(debug=self.debug).play_game( + players=agents, + start=game, + ) + + m = Match.from_replay(players=agents, replay=replay, timestamp=Match.now()) + + # Record match + m.log(self.game_file_name) + self.__register_match(m) + + return replay + + def play_random_match( + self, + game: Game, + player_count: int | None = None, + ) -> pyclient.GameReplay: + """ + Play a game with any known players. + + :param game: The game to start playing + :type game: Game + :param player_count: Optional number of players to select. + :type player_count: int | None + :raises ValueError: One of the randomly chosen URLs could not be accessed. + """ + with self.__lock: + players = [agent.url for agent in self.players] + + random.shuffle(players) + + if player_count is not None: + players = players[:player_count] + + return self.play_match(players=players, game=game) + + def start_periodic_matches( + self, + game: Game, + interval_seconds: float = 300, + player_count: int = 2, + ) -> None: + """ + Start running matches periodically in a daemon thread. + + :param game: Game to play. + :type game: pyclient.Game + :param interval_seconds: Number of seconds to sleep between games + :type interval_seconds: float + :param player_count: The number of players that are supposed to participate + :type player_count: int + """ + if interval_seconds <= 0: + raise ValueError("interval_seconds must be greater than zero.") + if player_count <= 1: + raise ValueError("player_count must be greater than one.") + + with self.__lock: + if self.is_running_periodic_matches(): + self.stop_periodic_matches() + + self.__scheduler_stop.clear() + self.__scheduler_thread = threading.Thread( + target=self.__scheduler_loop, + args=(game, interval_seconds, player_count), + daemon=True, + ) + self.__scheduler_thread.start() + + def stop_periodic_matches(self) -> None: + """ + Stop the periodic match scheduler and wait briefly for it to exit. + """ + thread: threading.Thread | None + + with self.__lock: + self.__scheduler_stop.set() + thread = self.__scheduler_thread + + if thread is not None: + thread.join(timeout=5) + + with self.__lock: + if self.__scheduler_thread is thread: + self.__scheduler_thread = None + + def get_json_players(self) -> list[dict[str, Any]]: + """ + Return known currently available players as notebook-friendly dicts. + """ + with self.__lock: + players : list[EloStat] = list(self.__stats.values()) + + players.sort( + key=lambda player: (-int(player.elo), player.player_id.name) + ) + + return [ stat.to_json() for stat in players ] + + def get_json_matches(self, limit: int | None = None) -> list[dict[str, Any]]: + """ + Return persisted match records, newest last unless limited. + + :param limit: Maximum number of most recent matches + :type limit: int | None + :return: A list of most recent matches + :rtype: list[dict[str, Any]] + """ + with self.__lock: + if limit is None: + matches = self.__matches + elif limit <= 0: + matches = [] + else: + matches = self.__matches[-limit:] + + return [ m.to_json() for m in matches ] + + def get_json_leaderboard(self) -> dict[str, Any]: + """ + Return aggregate player statistics for local use or JSON APIs. + """ + return { + "name": self.name, + "players": self.get_json_players(), + } + + def start_server( + self, + host: str = "127.0.0.1", + import_name : str = __name__, + port: int = 5000, + debug: bool = False, + **kwargs: Any, + ) -> None: + """ + Start a Flask development server from which the ELO scores can be + viewed interactively. + """ + return ( + self.create_flask_app(import_name=import_name) + .run(host=host, port=port, debug=debug, **kwargs) + ) diff --git a/games.jsonl b/games.jsonl new file mode 100644 index 0000000..e69de29 diff --git a/known_players.json b/known_players.json new file mode 100644 index 0000000..cf92b60 --- /dev/null +++ b/known_players.json @@ -0,0 +1,5 @@ +{ + "players": [ + "https://bmt001.noordstar.me" + ] +} \ No newline at end of file diff --git a/pyclient/agent.py b/pyclient/agent.py index 51b01d7..0493bea 100644 --- a/pyclient/agent.py +++ b/pyclient/agent.py @@ -34,8 +34,16 @@ class Agent: :type url: str :return: An agent that contacts a server when polled. :rtype: ServerAgent + :raises ValueError: The server fails to reach out one of the URLs. """ - return ServerAgent.from_server_url(url=url, **kwargs) + try: + return ServerAgent.from_server_url(url=url, **kwargs) + except (ValueError, requests.RequestException, requests.HTTPError): + pass + + raise ValueError( + "URL did not lead to a willing agent" + ) @property def name(self) -> str: diff --git a/pyclient/client.py b/pyclient/client.py index 316c72c..4f3fdb2 100644 --- a/pyclient/client.py +++ b/pyclient/client.py @@ -66,6 +66,7 @@ class PyClient: :rtype: GameReplay """ return GameReplay( + game_name=start.game_name(), start=start, turns=list(self.gen_game(players=players, start=start)), ) diff --git a/pyclient/games/game.py b/pyclient/games/game.py index f4eecc8..448bc24 100644 --- a/pyclient/games/game.py +++ b/pyclient/games/game.py @@ -15,6 +15,40 @@ class FinishState(Enum): loss = auto() win = auto() + @classmethod + def from_str(cls, s : str) -> "FinishState": + """ + Convert the finish state from a string. + + :param s: String to convert. + :type s: str + :return: Finish state + :rtype: FinishState + :raises ValueError: Invalid string value. + """ + for option in FinishState: + if s == option.name: + return option + else: + raise ValueError( + f"Unknown finish state `{s}`" + ) + + def score(self) -> float: + """ + As a score between 0 and 1, convert how "good" an outcome is. + + :return: A score determining how beneficial a finish state is. + :rtype: float + """ + match self: + case FinishState.draw: + return 0.5 + case FinishState.loss: + return 0.0 + case FinishState.win: + return 1.0 + class Game: """ Base class for all games. diff --git a/pyclient/replay.py b/pyclient/replay.py index 694c152..b165a60 100644 --- a/pyclient/replay.py +++ b/pyclient/replay.py @@ -16,6 +16,7 @@ class GameReplay: played game. """ + game_name : str start : Game turns : list[Turn]