735 lines
23 KiB
Python
735 lines
23 KiB
Python
"""
|
|
This app hosts the client that'll perform the ELO tracking.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import copy
|
|
import json
|
|
import os
|
|
import random
|
|
import threading
|
|
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timezone
|
|
from typing import Any, Sequence
|
|
|
|
import pyclient
|
|
|
|
from pyclient.games import FinishState, Game
|
|
|
|
|
|
DEFAULT_ELO = 1000
|
|
STD_DEV_DIFF = 400
|
|
ELO_K_FACTOR = 32
|
|
|
|
@dataclass(frozen=True)
|
|
class PlayerIdentifier:
|
|
"""
|
|
The Player Identifier uniquely identifies each player for the ELO
|
|
tracker.
|
|
"""
|
|
name : str
|
|
url : str
|
|
version : str | None
|
|
|
|
def __key__(self) -> tuple[str, str, str | None]:
|
|
return (self.name, self.url, self.version)
|
|
|
|
@classmethod
|
|
def from_server_agent(cls, agent : pyclient.ServerAgent) -> "PlayerIdentifier":
|
|
"""
|
|
Gain a player identifier from an agent.
|
|
"""
|
|
return cls(
|
|
name=agent.name,
|
|
url=agent.url,
|
|
version=agent.profile.get("version",
|
|
agent.profile.get("me.noordstar.peanuts.agent.version", None)
|
|
),
|
|
)
|
|
|
|
@dataclass()
|
|
class EloStat:
|
|
"""
|
|
The EloStat records the ELO statistics of a single player.
|
|
What's their score, and how much did they win?
|
|
"""
|
|
|
|
# Identity
|
|
player_id : PlayerIdentifier
|
|
|
|
# Statistics
|
|
losses : int
|
|
draws : int
|
|
wins : int
|
|
elo : float
|
|
|
|
@classmethod
|
|
def new(cls, player_id : PlayerIdentifier) -> "EloStat":
|
|
"""
|
|
Create a new ELO type based on a player.
|
|
|
|
:param player_id: Unique player identifier
|
|
:type player_id: PlayerIdentifier
|
|
:return: New empty Elo statistics for the player
|
|
:rtype: EloStat
|
|
"""
|
|
return cls(
|
|
player_id=player_id, losses=0, draws=0, wins=0, elo=DEFAULT_ELO,
|
|
)
|
|
|
|
def to_json(self) -> dict[str, Any]:
|
|
"""
|
|
Convert EloStat to a JSON-formatted dictionary
|
|
|
|
:return: The EloStat in JSON format
|
|
:rtype: dict[str, Any]
|
|
"""
|
|
d = dict(
|
|
name=self.player_id.name,
|
|
url=self.player_id.url,
|
|
losses=self.losses,
|
|
draws=self.draws,
|
|
wins=self.wins,
|
|
elo=int(self.elo),
|
|
)
|
|
|
|
if self.player_id.version is not None:
|
|
d["version"] = self.player_id.version
|
|
|
|
return d
|
|
|
|
@dataclass(frozen=True)
|
|
class Match:
|
|
"""
|
|
A Match represents a game written to disk in JSONL format.
|
|
"""
|
|
game_name : str
|
|
participants : list[tuple[PlayerIdentifier, FinishState]]
|
|
timestamp : str
|
|
|
|
@staticmethod
|
|
def now() -> str:
|
|
"""
|
|
Get a timestamp of now.
|
|
|
|
:return: Timestamp in ISO format
|
|
:rtype: str
|
|
"""
|
|
return datetime.now(tz=timezone.utc).isoformat()
|
|
|
|
@classmethod
|
|
def from_json_record(cls, record : dict[str, Any]) -> "Match":
|
|
"""
|
|
Create a new match from a decoded JSON object.
|
|
|
|
:param participants: Decoded JSON object
|
|
:type participants: dict[str, Any]
|
|
:return: An initialized match
|
|
:rtype: Match
|
|
:raises KeyError: The JSON is missing required keys.
|
|
:raises ValueError: The JSON is formatted improperly.
|
|
"""
|
|
participants : list[dict[str, Any]] = record["participants"]
|
|
if not isinstance(participants, list):
|
|
raise ValueError(
|
|
"Key `participants` must be list of objects"
|
|
)
|
|
|
|
game_name : str = str(record["name"])
|
|
timestamp : str = str(record["timestamp"]) # TODO: Perhaps verify ISO format?
|
|
new_participants : list[tuple[PlayerIdentifier, FinishState]] = []
|
|
|
|
for i, participant in enumerate(participants):
|
|
# Sanity assertions
|
|
if not isinstance(participant, dict):
|
|
raise ValueError(
|
|
f"Participant #{i+1} must be dictionary"
|
|
)
|
|
|
|
for key in ["name", "url", "result"]:
|
|
if not isinstance(participant[key], str):
|
|
raise ValueError(
|
|
f"Participant #{i+1} must have the `{key}` key as string"
|
|
)
|
|
|
|
# Initialize participant
|
|
name : str = participant["name"]
|
|
url : str = participant["url"]
|
|
result = FinishState.from_str(participant["result"])
|
|
version : str | None = participant.get("version", None)
|
|
if version is not None:
|
|
version = str(version)
|
|
|
|
new_participants.append((
|
|
PlayerIdentifier(name=name, url=url, version=version),
|
|
result,
|
|
))
|
|
|
|
if len(new_participants) < 2:
|
|
raise ValueError(
|
|
"Expected at least 2 participants in a game for which ELO can be tracked"
|
|
)
|
|
|
|
return cls(
|
|
game_name=game_name,
|
|
participants=new_participants,
|
|
timestamp=timestamp,
|
|
)
|
|
|
|
@classmethod
|
|
def from_replay(
|
|
cls,
|
|
players : list[pyclient.ServerAgent],
|
|
replay : pyclient.GameReplay,
|
|
timestamp : str | None,
|
|
) -> "Match":
|
|
"""
|
|
Convert a GameReplay into a match.
|
|
|
|
:param players: The participants of the match.
|
|
:type players: list[pyclient.ServerAgent]
|
|
:param replay: Game summary.
|
|
:type replay: pyclient.GameReplay
|
|
:param timestamp: ISO formatted timestamp of when the game was planned.
|
|
:type timestamp: str
|
|
:return: An initialized match.
|
|
:rtype: Match
|
|
:raises ValueError: The replay shows an unfinished match.
|
|
"""
|
|
results = replay.turns[-1].state.winner()
|
|
if results is None:
|
|
raise ValueError(
|
|
"Game hasn't finished yet."
|
|
)
|
|
|
|
participants : list[tuple[PlayerIdentifier, FinishState]] = []
|
|
for i, agent in enumerate(players):
|
|
finish_state = results.get(i, None)
|
|
if finish_state is None:
|
|
continue
|
|
|
|
participants.append((
|
|
PlayerIdentifier.from_server_agent(agent=agent),
|
|
finish_state,
|
|
))
|
|
|
|
return cls(
|
|
game_name=replay.game_name,
|
|
participants=participants,
|
|
timestamp=timestamp or cls.now()
|
|
)
|
|
|
|
def log(self, file_name : str) -> None:
|
|
"""
|
|
Log the current match to disk.
|
|
|
|
:param file_name: File name to write the match to.
|
|
:type file_name: str
|
|
"""
|
|
with open(file_name, "a", encoding="utf-8") as wp:
|
|
wp.write(json.dumps(self.to_json(), sort_keys=True) + "\n")
|
|
|
|
def to_json(self) -> dict[str, Any]:
|
|
"""
|
|
Convert the Match back to JSON.
|
|
|
|
:return: The Match in a dictionary that's can be converted to JSON.
|
|
:rtype: dict[str, Any]
|
|
"""
|
|
participants : list[dict[str, str]] = []
|
|
|
|
for player_id, result in self.participants:
|
|
d : dict[str, str]= dict(
|
|
name=player_id.name,
|
|
url=player_id.url,
|
|
result=result.name,
|
|
)
|
|
|
|
if player_id.version is not None:
|
|
d["version"] = player_id.version
|
|
|
|
participants.append(d)
|
|
|
|
return dict(
|
|
name=self.game_name,
|
|
participants=participants,
|
|
timestamp=self.timestamp,
|
|
)
|
|
|
|
class EloTracker:
|
|
"""
|
|
The Elo tracker tracks matches between URLs that it is familiar with.
|
|
"""
|
|
def __init__(
|
|
self,
|
|
game_file_name: str,
|
|
player_file_name: str,
|
|
debug: bool = False,
|
|
name: str = "Bot-Man-Toe Elo Tracker",
|
|
) -> None:
|
|
"""
|
|
Create an EloTracker.
|
|
|
|
:param game_file_name: The file name to write game results to.
|
|
:type game_file_name: str
|
|
:param player_file_name: The file name to read player URLs from.
|
|
:type player_file_name: str
|
|
:param debug: Whether to print scheduler errors.
|
|
:type debug: bool
|
|
:param name: Display name for the leaderboard.
|
|
:type name: str
|
|
"""
|
|
|
|
# Threading variables
|
|
self.__lock = threading.RLock()
|
|
self.__scheduler_stop = threading.Event()
|
|
self.__scheduler_thread: threading.Thread | None = None
|
|
|
|
# Immutable values
|
|
self.debug: bool = debug
|
|
self.game_file_name: str = game_file_name
|
|
self.player_file_name: str = player_file_name
|
|
self.name: str = name
|
|
|
|
# Thread-unsafe variables
|
|
# Please use a lock while doing CRUD operations on them
|
|
self.players: list[pyclient.ServerAgent] = []
|
|
self.__matches: list[Match] = []
|
|
self.__stats: dict[PlayerIdentifier, EloStat] = {}
|
|
|
|
# Initialize tracker
|
|
self.__load_matches()
|
|
self.load_players()
|
|
|
|
def __debug(self, message: str) -> None:
|
|
"""
|
|
Send a debug message to stdout. Ignored when not in debug mode.
|
|
|
|
:param message: The message to debug log
|
|
:type message: str
|
|
"""
|
|
if self.debug:
|
|
with self.__lock:
|
|
print(f"[EloTracker] {message}")
|
|
|
|
def __get_stat(self, player_id : PlayerIdentifier) -> EloStat:
|
|
"""
|
|
Get a player's statistics based on their player identifier.
|
|
|
|
If the player wasn't known, the function returns a newly
|
|
initialized record in the database for them.
|
|
|
|
:param player_id: Unique player identifier.
|
|
:type player_id: PlayerIdentifier
|
|
:return: Elo statistics
|
|
"""
|
|
with self.__lock:
|
|
stat = self.__stats.get(player_id, None)
|
|
|
|
if stat is not None:
|
|
return stat
|
|
|
|
stat = EloStat.new(player_id=player_id)
|
|
self.__stats[player_id] = stat
|
|
|
|
return stat
|
|
|
|
def __load_matches(self) -> None:
|
|
"""
|
|
Load persisted JSONL records and rebuild in-memory statistics.
|
|
"""
|
|
if not os.path.exists(self.game_file_name):
|
|
return
|
|
|
|
with self.__lock:
|
|
self.__matches = []
|
|
self.__stats = {}
|
|
|
|
with open(self.game_file_name, encoding="utf-8") as fp:
|
|
for line_no, line in enumerate(fp, start=1):
|
|
line = line.strip()
|
|
if line == "":
|
|
continue
|
|
|
|
try:
|
|
record = json.loads(line)
|
|
except json.JSONDecodeError:
|
|
self.__debug(
|
|
f"Skipping malformed match record on line {line_no}."
|
|
)
|
|
continue
|
|
|
|
if not isinstance(record, dict):
|
|
self.__debug(
|
|
f"Skipping non-object match record on line {line_no}."
|
|
)
|
|
continue
|
|
|
|
try:
|
|
m = Match.from_json_record(record=record)
|
|
except ( KeyError, ValueError ):
|
|
self.__debug(
|
|
f"Skipping malformed JSON object on line {line_no}."
|
|
)
|
|
else:
|
|
self.__matches.append(m)
|
|
self.__register_match(m)
|
|
|
|
def __register_match(self, m : Match) -> None:
|
|
"""
|
|
Apply a newly registered match to the aggregate statistics.
|
|
|
|
:param m: Newly created match with results & outcomes.
|
|
:type m: Match
|
|
"""
|
|
|
|
effective_k = ELO_K_FACTOR / (len(m.participants) - 1)
|
|
scores : dict[PlayerIdentifier, float] = {}
|
|
|
|
# First, calculate the pairwise ELO results
|
|
# Do not apply them yet, in order to guarantee fair ELO shifts
|
|
for player_id1, result1 in m.participants:
|
|
total_k = 0.0
|
|
rating_1 = self.__get_stat(player_id=player_id1).elo
|
|
|
|
for player_id2, result2 in m.participants:
|
|
rating_2 = self.__get_stat(player_id=player_id2).elo
|
|
|
|
expected_score = 1 / (1 + 10 ** ((rating_2 - rating_1) / STD_DEV_DIFF))
|
|
|
|
actual_score = 0
|
|
if result1.score() > 0.0:
|
|
actual_score = result1.score() / (result1.score() + result2.score())
|
|
|
|
total_k += effective_k * (actual_score - expected_score)
|
|
|
|
scores[player_id1] = total_k
|
|
|
|
all_scores = sum(scores.values())
|
|
|
|
if 0.001 <= abs(all_scores):
|
|
self.__debug(
|
|
f"In total, all ELO score changes added together are {all_scores} (should be 0.0)"
|
|
)
|
|
|
|
# Then, apply the ELO score update + count the wins, draws & losses
|
|
for player_id, result in m.participants:
|
|
player = self.__get_stat(player_id=player_id)
|
|
player.elo += scores[player_id]
|
|
|
|
match result:
|
|
case FinishState.draw:
|
|
player.draws += 1
|
|
case FinishState.loss:
|
|
player.losses += 1
|
|
case FinishState.win:
|
|
player.wins += 1
|
|
|
|
def __scheduler_loop(
|
|
self,
|
|
game: Game,
|
|
interval_seconds: float,
|
|
player_count: int,
|
|
) -> None:
|
|
"""
|
|
Perform a schedule in which you play a random game.
|
|
|
|
:param game: Game to play.
|
|
:type game: pyclient.Game
|
|
:param interval_seconds: Number of seconds to sleep between games
|
|
:type interval_seconds: float
|
|
:param player_count: The number of players that are supposed to participate
|
|
:type player_count: int
|
|
"""
|
|
while not self.__scheduler_stop.is_set():
|
|
try:
|
|
self.load_players()
|
|
|
|
with self.__lock:
|
|
available = len(self.players)
|
|
|
|
if available < player_count:
|
|
self.__debug(
|
|
f"Skipping scheduled match: {available} players available, "
|
|
f"{player_count} required."
|
|
)
|
|
else:
|
|
self.__debug(
|
|
"Playing a new scheduled match"
|
|
)
|
|
self.play_random_match(game=game, player_count=player_count)
|
|
except Exception as exc:
|
|
self.__debug(f"Scheduled match failed: {exc}")
|
|
raise exc
|
|
|
|
if self.__scheduler_stop.wait(interval_seconds):
|
|
break
|
|
|
|
def create_flask_app(self, import_name : str) -> Any:
|
|
"""
|
|
Create a Flask app that exposes tracker statistics.
|
|
|
|
:param import_name: The name of the application package.
|
|
:type import_name: str
|
|
"""
|
|
try:
|
|
from flask import Flask, Response, jsonify
|
|
except ImportError as exc:
|
|
raise ImportError(
|
|
"Flask is required to host the EloTracker server. "
|
|
"Install the project requirements before calling create_app()."
|
|
) from exc
|
|
|
|
app = Flask(__name__)
|
|
|
|
@app.get("/")
|
|
def index() -> Response:
|
|
return Response(
|
|
"""
|
|
<!doctype html>
|
|
<html lang="en">
|
|
<head>
|
|
<meta charset="utf-8">
|
|
<title>Bot-Man-Toe Elo Tracker</title>
|
|
</head>
|
|
<body>
|
|
<main>
|
|
<h1>Bot-Man-Toe Elo Tracker</h1>
|
|
<p>The JSON API is available at /leaderboard, /matches, /players, and /health.</p>
|
|
</main>
|
|
</body>
|
|
</html>
|
|
""".strip(),
|
|
mimetype="text/html",
|
|
)
|
|
|
|
@app.get("/health")
|
|
def health() -> Response:
|
|
return jsonify({
|
|
"ok": True,
|
|
"periodic_matches": self.is_running_periodic_matches(),
|
|
})
|
|
|
|
@app.get("/leaderboard")
|
|
def leaderboard() -> Response:
|
|
return jsonify(self.get_json_leaderboard())
|
|
|
|
@app.get("/matches")
|
|
def matches() -> Response:
|
|
return jsonify(self.get_json_matches())
|
|
|
|
@app.get("/players")
|
|
def players() -> Response:
|
|
return jsonify(self.get_json_players())
|
|
|
|
return app
|
|
|
|
def is_running_periodic_matches(self) -> bool:
|
|
"""
|
|
Return whether the scheduler thread is currently alive.
|
|
|
|
:return: Whether the scheduler thread is currently alive.
|
|
:rtype: bool
|
|
"""
|
|
with self.__lock:
|
|
return self.__scheduler_thread is not None and self.__scheduler_thread.is_alive()
|
|
|
|
def load_players(self) -> None:
|
|
"""
|
|
Load the known players from disk.
|
|
|
|
:raises ValueError: File was not properly JSON-formatted.
|
|
"""
|
|
with open(self.player_file_name, encoding="utf-8") as fp:
|
|
obj = json.load(fp)
|
|
if not isinstance(obj, dict):
|
|
raise ValueError(
|
|
"Expected list of URLs in player file."
|
|
)
|
|
|
|
urls = obj.get("players", [])
|
|
if not isinstance(urls, list):
|
|
raise ValueError(
|
|
"Expected `players` field to be a list of strings."
|
|
)
|
|
|
|
players : list[pyclient.ServerAgent] = []
|
|
for url in urls:
|
|
if not isinstance(url, str):
|
|
continue
|
|
|
|
try:
|
|
agent = pyclient.Agent.from_url(url, debug=self.debug)
|
|
except ValueError:
|
|
pass # Not an available player right now
|
|
else:
|
|
players.append(agent)
|
|
|
|
with self.__lock:
|
|
self.players = players
|
|
|
|
for agent in players:
|
|
self.__get_stat(PlayerIdentifier.from_server_agent(agent))
|
|
|
|
def play_match(self, players: list[str], game: Game) -> pyclient.GameReplay:
|
|
"""
|
|
Play a single match with appointed players.
|
|
|
|
:param players: List of URLs that participate.
|
|
:type players: list[str]
|
|
:return: A summary of the game.
|
|
:rtype: pyclient.GameReplay
|
|
:raises ValueError: One of the URLs could not be accessed.
|
|
"""
|
|
agents = [
|
|
pyclient.Agent.from_url(url, debug=self.debug)
|
|
for url in players
|
|
]
|
|
|
|
replay = pyclient.PyClient(debug=self.debug).play_game(
|
|
players=agents,
|
|
start=game,
|
|
)
|
|
|
|
m = Match.from_replay(players=agents, replay=replay, timestamp=Match.now())
|
|
|
|
# Record match
|
|
m.log(self.game_file_name)
|
|
self.__register_match(m)
|
|
|
|
return replay
|
|
|
|
def play_random_match(
|
|
self,
|
|
game: Game,
|
|
player_count: int | None = None,
|
|
) -> pyclient.GameReplay:
|
|
"""
|
|
Play a game with any known players.
|
|
|
|
:param game: The game to start playing
|
|
:type game: Game
|
|
:param player_count: Optional number of players to select.
|
|
:type player_count: int | None
|
|
:raises ValueError: One of the randomly chosen URLs could not be accessed.
|
|
"""
|
|
with self.__lock:
|
|
players = [agent.url for agent in self.players]
|
|
|
|
random.shuffle(players)
|
|
|
|
if player_count is not None:
|
|
players = players[:player_count]
|
|
|
|
return self.play_match(players=players, game=game)
|
|
|
|
def start_periodic_matches(
|
|
self,
|
|
game: Game,
|
|
interval_seconds: float = 300,
|
|
player_count: int = 2,
|
|
) -> None:
|
|
"""
|
|
Start running matches periodically in a daemon thread.
|
|
|
|
:param game: Game to play.
|
|
:type game: pyclient.Game
|
|
:param interval_seconds: Number of seconds to sleep between games
|
|
:type interval_seconds: float
|
|
:param player_count: The number of players that are supposed to participate
|
|
:type player_count: int
|
|
"""
|
|
if interval_seconds <= 0:
|
|
raise ValueError("interval_seconds must be greater than zero.")
|
|
if player_count <= 1:
|
|
raise ValueError("player_count must be greater than one.")
|
|
|
|
with self.__lock:
|
|
if self.is_running_periodic_matches():
|
|
self.stop_periodic_matches()
|
|
|
|
self.__scheduler_stop.clear()
|
|
self.__scheduler_thread = threading.Thread(
|
|
target=self.__scheduler_loop,
|
|
args=(game, interval_seconds, player_count),
|
|
daemon=True,
|
|
)
|
|
self.__scheduler_thread.start()
|
|
|
|
def stop_periodic_matches(self) -> None:
|
|
"""
|
|
Stop the periodic match scheduler and wait briefly for it to exit.
|
|
"""
|
|
thread: threading.Thread | None
|
|
|
|
with self.__lock:
|
|
self.__scheduler_stop.set()
|
|
thread = self.__scheduler_thread
|
|
|
|
if thread is not None:
|
|
thread.join(timeout=5)
|
|
|
|
with self.__lock:
|
|
if self.__scheduler_thread is thread:
|
|
self.__scheduler_thread = None
|
|
|
|
def get_json_players(self) -> list[dict[str, Any]]:
|
|
"""
|
|
Return known currently available players as notebook-friendly dicts.
|
|
"""
|
|
with self.__lock:
|
|
players : list[EloStat] = list(self.__stats.values())
|
|
|
|
players.sort(
|
|
key=lambda player: (-int(player.elo), player.player_id.name)
|
|
)
|
|
|
|
return [ stat.to_json() for stat in players ]
|
|
|
|
def get_json_matches(self, limit: int | None = None) -> list[dict[str, Any]]:
|
|
"""
|
|
Return persisted match records, newest last unless limited.
|
|
|
|
:param limit: Maximum number of most recent matches
|
|
:type limit: int | None
|
|
:return: A list of most recent matches
|
|
:rtype: list[dict[str, Any]]
|
|
"""
|
|
with self.__lock:
|
|
if limit is None:
|
|
matches = self.__matches
|
|
elif limit <= 0:
|
|
matches = []
|
|
else:
|
|
matches = self.__matches[-limit:]
|
|
|
|
return [ m.to_json() for m in matches ]
|
|
|
|
def get_json_leaderboard(self) -> dict[str, Any]:
|
|
"""
|
|
Return aggregate player statistics for local use or JSON APIs.
|
|
"""
|
|
return {
|
|
"name": self.name,
|
|
"players": self.get_json_players(),
|
|
}
|
|
|
|
def start_server(
|
|
self,
|
|
host: str = "127.0.0.1",
|
|
import_name : str = __name__,
|
|
port: int = 5000,
|
|
debug: bool = False,
|
|
**kwargs: Any,
|
|
) -> None:
|
|
"""
|
|
Start a Flask development server from which the ELO scores can be
|
|
viewed interactively.
|
|
"""
|
|
return (
|
|
self.create_flask_app(import_name=import_name)
|
|
.run(host=host, port=port, debug=debug, **kwargs)
|
|
)
|