You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
239 lines
6.4 KiB
239 lines
6.4 KiB
import asyncio
|
|
import os
|
|
import re
|
|
|
|
import aiofiles.os
|
|
|
|
from nio import AsyncClient, MatrixRoom, RoomMessageText, UploadResponse
|
|
from PIL import Image
|
|
from pyboy import PyBoy, WindowEvent
|
|
import yaml
|
|
|
|
# --------------------------
|
|
# Constant definitions
|
|
|
|
# Read the YAML configuration once, at import time. Every constant below is
# taken from this mapping.
with open('config/config.yml', 'r', encoding='utf-8') as open_file:
    config = yaml.safe_load(open_file)

# Paths from the `files` section of the configuration.
GAME_FILE_NAME, STATE_FILE_NAME, SCREENSHOT_FILE_NAME = (
    config['files'][key]
    for key in ('game_save', 'latest_state', 'screenshot')
)

# Connection details from the `matrix` section of the configuration.
HOMESERVER, ACCESS_TOKEN, GAME_ROOM = (
    config['matrix'][key]
    for key in ('homeserver', 'access_token', 'game_room')
)
|
|
|
|
# --------------------------
|
|
# Program setup
|
|
|
|
# Start emulator
pyboy = PyBoy(GAME_FILE_NAME)
# Shared status flag; run_simulation() sets it to False once its loop ends.
progress = dict(still_running=True)

# Load old state, if an earlier run left one behind. The file is opened in a
# `with` block so the handle is closed as soon as the state has been read
# instead of staying open for the lifetime of the process.
try:
    with open(STATE_FILE_NAME, 'rb') as state_file:
        pyboy.load_state(state_file)
except FileNotFoundError:
    # No saved state yet: start the game from the beginning.
    pass

# Set up a Matrix client that authenticates with the configured access token
client = AsyncClient(HOMESERVER)
client.access_token = ACCESS_TOKEN
|
|
|
|
# --------------------------
|
|
# GameBoy Emulation
|
|
|
|
async def run_simulation():
    """
    Keep the game running while no-one's giving any input.

    The emulator is advanced one tick at a time until `pyboy.tick()` returns
    a truthy value. After every tick, control is handed back to the event
    loop so other coroutines get a chance to run.

    Raises:
    ------
    RuntimeError : always, once the tick loop has ended and the emulator
                   has been stopped.
    """
    while True:
        emulator_finished = pyboy.tick()
        if emulator_finished:
            break

        # Zero-length sleep: only yields to the event loop, adds no delay.
        await asyncio.sleep(0)

    progress['still_running'] = False
    pyboy.stop()
    raise RuntimeError("We are done here.")
|
|
|
|
def press_button_shortly(press, release, ticks = 5):
    """
    Tap a button: send `press`, advance the emulator, then send `release`.

    Arguments:
    ---------
    press : input event sent before the emulator is advanced
    release : input event sent after the emulator has been advanced
    ticks : int, how many emulator ticks pass between the two events
    """
    # Button goes down ...
    pyboy.send_input(press)

    # ... stays down while the emulator advances ...
    for _held_tick in range(ticks):
        pyboy.tick()

    # ... and comes back up.
    pyboy.send_input(release)
|
|
|
|
def press_button(button : str) -> str | None:
|
|
"""
|
|
Press the button that's given as a string.
|
|
"""
|
|
button = button.replace(' ', '').lower().strip()
|
|
|
|
if not re.fullmatch(
|
|
r'a|b|up?|d(own)?|l(eft)?|r(ight)?|start|select|screen(shot)?', button
|
|
):
|
|
return None
|
|
|
|
match button[0]:
|
|
case 'a':
|
|
press_button_shortly(
|
|
WindowEvent.PRESS_BUTTON_A,
|
|
WindowEvent.RELEASE_BUTTON_A
|
|
)
|
|
return 'a'
|
|
case 'b':
|
|
press_button_shortly(
|
|
WindowEvent.PRESS_BUTTON_B,
|
|
WindowEvent.RELEASE_BUTTON_B
|
|
)
|
|
return 'b'
|
|
case 'u':
|
|
press_button_shortly(
|
|
WindowEvent.PRESS_ARROW_UP,
|
|
WindowEvent.RELEASE_ARROW_UP
|
|
)
|
|
return 'up'
|
|
case 'd':
|
|
press_button_shortly(
|
|
WindowEvent.PRESS_ARROW_DOWN,
|
|
WindowEvent.RELEASE_ARROW_DOWN
|
|
)
|
|
return 'down'
|
|
case 'l':
|
|
press_button_shortly(
|
|
WindowEvent.PRESS_ARROW_LEFT,
|
|
WindowEvent.RELEASE_ARROW_LEFT
|
|
)
|
|
return 'left'
|
|
case 'r':
|
|
press_button_shortly(
|
|
WindowEvent.PRESS_ARROW_RIGHT,
|
|
WindowEvent.RELEASE_ARROW_RIGHT
|
|
)
|
|
return 'right'
|
|
case 's':
|
|
match button[1]:
|
|
# start
|
|
case 't':
|
|
press_button_shortly(
|
|
WindowEvent.PRESS_BUTTON_START,
|
|
WindowEvent.RELEASE_BUTTON_START
|
|
)
|
|
return 'start'
|
|
|
|
# select
|
|
case 'e':
|
|
press_button_shortly(
|
|
WindowEvent.PRESS_BUTTON_SELECT,
|
|
WindowEvent.RELEASE_BUTTON_SELECT
|
|
)
|
|
return 'select'
|
|
|
|
# screenshot
|
|
case 'c':
|
|
img = pyboy.screen_image()
|
|
img.save(SCREENSHOT_FILE_NAME)
|
|
return 'screenshot'
|
|
|
|
|
|
# --------------------------
|
|
# Matrix API
|
|
|
|
async def send_image(client, room_id, image):
    """Send image to room.

    The file is first uploaded through `client.upload`; the content URI from
    the upload response is then posted to the room as an `m.image` message.
    If the upload fails, the failure is printed and nothing is sent.

    Arguments:
    ---------
    client : Client
    room_id : str
    image : str, file name of image

    Returns None. Errors raised while sending the room message are printed,
    not propagated.

    https://matrix-nio.readthedocs.io/en/latest/examples.html#sending-an-image
    """
    # Only the dimensions are needed, so release the image file right away.
    with Image.open(image) as im:
        (width, height) = im.size  # im.size returns (width,height) tuple

    # first do an upload of image, then send URI of upload to room
    file_stat = await aiofiles.os.stat(image)
    async with aiofiles.open(image, "r+b") as f:
        resp, _maybe_keys = await client.upload(
            f,
            content_type="image/png",  # image/jpeg
            filename=os.path.basename(image),
            filesize=file_stat.st_size,
        )

    if not isinstance(resp, UploadResponse):
        print(f"Failed to upload image. Failure response: {resp}")
        # Without a successful upload there is no content URI to send, so
        # stop here instead of failing on `resp.content_uri` below.
        return
    print("Image was uploaded successfully to server. ")

    content = {
        "body": os.path.basename(image),  # descriptive title
        "info": {
            "size": file_stat.st_size,
            "mimetype": 'image/png',
            "thumbnail_info": None,  # TODO
            "w": width,  # width in pixel
            "h": height,  # height in pixel
            "thumbnail_url": None,  # TODO
        },
        "msgtype": "m.image",
        "url": resp.content_uri,
    }

    try:
        await client.room_send(room_id, message_type="m.room.message", content=content)
        print("Sent screenshot!")
    except Exception as error:
        # Best effort: a failed screenshot must not take the bot down, but
        # the reason is reported so the failure can be diagnosed.
        print(f"Failed to send screenshot: {error}")
|
|
|
|
async def message_callback(room : MatrixRoom, event : RoomMessageText) -> None:
    """
    Process a text message if it was sent in the game room.

    The message body is handed to `press_button`. When it is accepted as a
    command, the emulator state is written to STATE_FILE_NAME, the message
    is marked as read, and, for the 'screenshot' command, the stored
    screenshot is posted to the room. Messages from other rooms and
    unrecognised messages are ignored.

    Arguments:
    ---------
    room : MatrixRoom, the room the message was sent in
    event : RoomMessageText, the received message
    """
    if room.room_id != GAME_ROOM:
        return

    pressed = press_button(event.body)
    if pressed is None:
        # Not a recognised command: nothing to save, mark or send.
        return

    # Save current state. The `with` block closes the file so the state is
    # flushed to disk and the handle is not leaked on every message.
    with open(STATE_FILE_NAME, 'wb') as state_file:
        pyboy.save_state(state_file)

    # Mark accepted button message as read
    await client.room_read_markers(
        room_id=GAME_ROOM,
        fully_read_event=event.event_id,
        read_event=event.event_id
    )

    # Send screenshot if requested
    if pressed == 'screenshot':
        await send_image(client, GAME_ROOM, SCREENSHOT_FILE_NAME)
|
|
|
|
|
|
async def setup_matrix_client():
    """
    Register the message handler, join the game room and start syncing.

    Text messages are routed to `message_callback`. The final
    `sync_forever` call is awaited with a timeout value of 30000.
    """
    sync_timeout = 30_000

    # Register the handler first so it is in place when syncing starts.
    client.add_event_callback(message_callback, RoomMessageText)

    await client.join(GAME_ROOM)
    await client.sync_forever(timeout=sync_timeout)
|
|
|
|
# --------------------------
|
|
# Run the program
|
|
|
|
async def main():
    """
    Run the emulator loop and the Matrix client side by side.
    """
    # Emulate the game
    emulation = run_simulation()

    # Gather input from Matrix, then handle it
    matrix_listener = setup_matrix_client()

    await asyncio.gather(emulation, matrix_listener)
|
|
|
|
if __name__ == '__main__':
    # Script entry point: run main() in a fresh asyncio event loop.
    asyncio.run(main())