From 8ced5bd6db4ff78945801f8e9391e56b0af02947 Mon Sep 17 00:00:00 2001
From: Andrew Gorton
Date: Mon, 8 May 2023 16:35:47 -0400
Subject: [PATCH] Save event performance data.

Closes #85
Fixes #93
---
Review notes (text between "---" and the first "diff --git" is not part of
the commit message):
- record(): join perf_stat_writer after setting terminate_perf_event, so
  plot_performance() reads the stats back only once the writer has exited.
- .gitignore: ignore ".vscode", the directory name VS Code creates.
- Removed the stray space in "def performance_stats_writer (".
- Diffstat and the last record.py hunk size are updated for the two lines
  added to record(); the blob hashes on the "index" lines still describe
  the original commit.
- Line breaks and indentation were reconstructed from a whitespace-collapsed
  copy of this patch; verify context lines against the target tree.

 .gitignore                                    |  6 ++
 ...50c04cc_add_recording_performance_stats.py | 35 +++++++++
 puterbot/crud.py                              | 42 +++++++++-
 puterbot/models.py                            |  8 ++
 puterbot/record.py                            | 80 ++++++++++++++-----
 5 files changed, 150 insertions(+), 21 deletions(-)
 create mode 100644 alembic/versions/c74e250c04cc_add_recording_performance_stats.py

diff --git a/.gitignore b/.gitignore
index fea93ba97..493849311 100644
--- a/.gitignore
+++ b/.gitignore
@@ -13,3 +13,9 @@ cache
 
 # db
 *.db
+
+# VSCode
+.vscode
+
+# Generated performance charts
+performance
diff --git a/alembic/versions/c74e250c04cc_add_recording_performance_stats.py b/alembic/versions/c74e250c04cc_add_recording_performance_stats.py
new file mode 100644
index 000000000..2199a3005
--- /dev/null
+++ b/alembic/versions/c74e250c04cc_add_recording_performance_stats.py
@@ -0,0 +1,35 @@
+"""add recording performance stats
+
+Revision ID: c74e250c04cc
+Revises: 5139d7df38f6
+Create Date: 2023-05-08 12:45:10.082401
+
+"""
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision = 'c74e250c04cc'
+down_revision = '5139d7df38f6'
+branch_labels = None
+depends_on = None
+
+
+def upgrade() -> None:
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.create_table('performance_stat',
+    sa.Column('id', sa.Integer(), nullable=False),
+    sa.Column('recording_timestamp', sa.Integer(), nullable=True),
+    sa.Column('event_type', sa.String(), nullable=True),
+    sa.Column('start_time', sa.Integer(), nullable=True),
+    sa.Column('end_time', sa.Integer(), nullable=True),
+    sa.PrimaryKeyConstraint('id', name=op.f('pk_performance_stat'))
+    )
+    # ### end Alembic commands ###
+
+
+def downgrade() -> None:
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.drop_table('performance_stat')
+    # ### end Alembic commands ###
diff --git a/puterbot/crud.py b/puterbot/crud.py
index 76f24bcf8..dedbae6b1 100644
--- a/puterbot/crud.py
+++ b/puterbot/crud.py
@@ -2,7 +2,7 @@
 import sqlalchemy as sa
 
 from puterbot.db import Session
-from puterbot.models import InputEvent, Screenshot, Recording, WindowEvent
+from puterbot.models import InputEvent, Screenshot, Recording, WindowEvent, PerformanceStat
 
 
 BATCH_SIZE = 1
@@ -68,6 +68,46 @@ def insert_window_event(recording_timestamp, event_timestamp, event_data):
     }
     _insert(event_data, WindowEvent, window_events)
 
+def insert_perf_stat(recording_timestamp, event_type, start_time, end_time, buffer=None):
+    """
+    Insert event performance stat into db
+    """
+
+    # Insert using Core API for improved performance (no rows are returned)
+    db_obj = {
+        column.name: None
+        for column in PerformanceStat.__table__.columns
+    }
+
+    db_obj["recording_timestamp"] = recording_timestamp
+    db_obj["event_type"] = event_type
+    db_obj["start_time"] = start_time
+    db_obj["end_time"] = end_time
+
+    if buffer is not None:
+        buffer.append(db_obj)
+
+    if buffer is None or len(buffer) >= BATCH_SIZE:
+        to_insert = buffer or [db_obj]
+        result = db.execute(sa.insert(PerformanceStat), to_insert)
+        db.commit()
+        if buffer:
+            buffer.clear()
+        # Note: this does not contain the inserted row(s)
+        return result
+
+def get_perf_stats(recording_timestamp):
+    """
+    return performance stats for a given recording
+    """
+
+    return (
+        db
+        .query(PerformanceStat)
+        .filter(PerformanceStat.recording_timestamp == recording_timestamp)
+        .order_by(PerformanceStat.start_time)
+        .all()
+    )
 
 def insert_recording(recording_data):
     db_obj = Recording(**recording_data)
diff --git a/puterbot/models.py b/puterbot/models.py
index deccac479..2df73ba61 100644
--- a/puterbot/models.py
+++ b/puterbot/models.py
@@ -222,3 +222,11 @@ class WindowEvent(Base):
     top = sa.Column(sa.Integer)
     width = sa.Column(sa.Integer)
     height = sa.Column(sa.Integer)
+
+class PerformanceStat(Base):
+    __tablename__ = "performance_stat"
+    id = sa.Column(sa.Integer, primary_key=True)
+    recording_timestamp = sa.Column(sa.Integer)
+    event_type = sa.Column(sa.String)
+    start_time = sa.Column(sa.Integer)
+    end_time = sa.Column(sa.Integer)
diff --git a/puterbot/record.py b/puterbot/record.py
index 5e5349f07..7d52ba69c 100644
--- a/puterbot/record.py
+++ b/puterbot/record.py
@@ -16,7 +16,6 @@
 import sys
 import threading
 import time
-import zlib
 
 from loguru import logger
 from pynput import keyboard, mouse
@@ -25,12 +24,13 @@
 import mss.tools
 import pygetwindow as pgw
 
-from puterbot.config import ROOT_DIRPATH
 from puterbot.crud import (
     insert_input_event,
     insert_screenshot,
     insert_recording,
     insert_window_event,
+    insert_perf_stat,
+    get_perf_stats,
 )
 from puterbot.utils import (
     configure_logging,
@@ -40,6 +40,7 @@
     take_screenshot,
     get_timestamp,
     set_start_time,
+    rows2dicts,
 )
 
 
@@ -51,7 +52,7 @@
     "window": True,
 }
 DIRNAME_PERFORMANCE_PLOTS = "performance"
-PLOT_PERFORMANCE = False
+PLOT_PERFORMANCE = True
 
 
 Event = namedtuple("Event", ("timestamp", "type", "data"))
@@ -431,10 +432,35 @@ def read_window_events(
         prev_geometry = geometry
 
 
-def plot_performance(
-    recording_timestamp: float,
+def performance_stats_writer(
     perf_q: multiprocessing.Queue,
-) -> None:
+    recording_timestamp: float,
+    terminate_event: multiprocessing.Event,
+):
+    """
+    Write performance stats to the db.
+    Each entry includes the event type, start time and end time
+
+    Args:
+        perf_q: A queue for collecting performance data.
+        recording_timestamp: The timestamp of the recording.
+        terminate_event: An event to signal the termination of the process.
+    """
+
+    configure_logging(logger, LOG_LEVEL)
+    set_start_time(recording_timestamp)
+    logger.info("performance stats writer starting")
+    signal.signal(signal.SIGINT, signal.SIG_IGN)
+    while not terminate_event.is_set() or not perf_q.empty():
+        try:
+            event_type, start_time, end_time = perf_q.get_nowait()
+        except queue.Empty:
+            continue
+
+        insert_perf_stat(recording_timestamp, event_type, start_time, end_time)
+    logger.info("performance stats writer done")
+
+def plot_performance(recording_timestamp: float) -> None:
     """
     Plot the performance of the event processing and writing.
 
@@ -448,18 +474,17 @@ def plot_performance(
     type_to_proc_times = defaultdict(list)
     type_to_count = Counter()
     type_to_timestamps = defaultdict(list)
-    while not perf_q.empty():
-        event_type, start_time, end_time = perf_q.get()
-        prev_start_time = type_to_prev_start_time.get(event_type, start_time)
-        start_time_delta = start_time - prev_start_time
-        type_to_start_time_deltas[event_type].append(start_time_delta)
-        type_to_prev_start_time[event_type] = start_time
-        type_to_proc_times[event_type].append(end_time - start_time)
-        type_to_count[event_type] += 1
-        type_to_timestamps[event_type].append(start_time)
-
-    if not PLOT_PERFORMANCE:
-        return
+
+    perf_stats = get_perf_stats(recording_timestamp)
+    perf_stat_dicts = rows2dicts(perf_stats)
+    for perf_stat in perf_stat_dicts:
+        prev_start_time = type_to_prev_start_time.get(perf_stat["event_type"], perf_stat["start_time"])
+        start_time_delta = perf_stat["start_time"] - prev_start_time
+        type_to_start_time_deltas[perf_stat["event_type"]].append(start_time_delta)
+        type_to_prev_start_time[perf_stat["event_type"]] = perf_stat["start_time"]
+        type_to_proc_times[perf_stat["event_type"]].append(perf_stat["end_time"] - perf_stat["start_time"])
+        type_to_count[perf_stat["event_type"]] += 1
+        type_to_timestamps[perf_stat["event_type"]].append(perf_stat["start_time"])
 
     y_data = {"proc_times": {}, "start_time_deltas": {}}
     for i, event_type in enumerate(type_to_count):
@@ -668,6 +693,17 @@ def record(
     )
     window_event_writer.start()
 
+    terminate_perf_event = multiprocessing.Event()
+    perf_stat_writer = multiprocessing.Process(
+        target=performance_stats_writer,
+        args=(
+            perf_q,
+            recording_timestamp,
+            terminate_perf_event,
+        ),
+    )
+    perf_stat_writer.start()
+
     # TODO: discard events until everything is ready
 
     try:
@@ -686,10 +722,14 @@ def record(
     input_event_writer.join()
     window_event_writer.join()
 
-    plot_performance(recording_timestamp, perf_q)
+    terminate_perf_event.set()
+    # wait for the stats writer to drain perf_q and exit before reading stats back
+    perf_stat_writer.join()
 
-    logger.info("done")
+    if PLOT_PERFORMANCE:
+        plot_performance(recording_timestamp)
 
+    logger.info("done")
 
 if __name__ == "__main__":
     fire.Fire(record)