Skip to content

Commit

Permalink
feat: stop listener
Browse files Browse the repository at this point in the history
* created listener for "oa.stop" sequence

* fixed issue with comparing objects of diff types

* moved list of sequences to config.STOP_SEQUENCES and changed code to accomadate multiple stop sequences, + minor changes to naming and logging

* moved list of stop sequences to config.STOP_SEQUENCES

* filter out stop sequence in crud.get_action_events

* combined keyboard listeners for macOS compatability

* style changes

* code cleanup

* special char support

* change to config.STOP_STRS and split by character in record.py and crud.py

* black

* add todo and fix special char functionality

* fix filter_stop_sequences

* added SPECIAL_CHAR_STOP_SEQUENCES and STOP_SEQUENCES that combines STOP_STRS and SPECIAL_CHAR_STOP_SEQUENCES

* STOP_SEQUENCES moved to config.py

* black

* black
  • Loading branch information
angelala3252 authored Jul 2, 2023
1 parent 5b9f735 commit 385963c
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 26 deletions.
14 changes: 14 additions & 0 deletions openadapt/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,20 @@
],
}

# each string in STOP_STRS should only contain strings that don't contain special characters
STOP_STRS = [
"oa.stop",
# TODO:
# "<ctrl>+c,<ctrl>+c,<ctrl>+c"
]
# each list in SPECIAL_CHAR_STOP_SEQUENCES should contain sequences
# containing special chars, separated by keys
SPECIAL_CHAR_STOP_SEQUENCES = [["ctrl", "ctrl", "ctrl"]]
# sequences that when typed, will stop the recording of ActionEvents in record.py
STOP_SEQUENCES = [
list(stop_str) for stop_str in STOP_STRS
] + SPECIAL_CHAR_STOP_SEQUENCES


def getenv_fallback(var_name):
rval = os.getenv(var_name) or _DEFAULTS.get(var_name)
Expand Down
97 changes: 74 additions & 23 deletions openadapt/crud.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
WindowEvent,
PerformanceStat,
)

from openadapt.config import STOP_SEQUENCES

BATCH_SIZE = 1

Expand All @@ -19,13 +19,11 @@
window_events = []
performance_stats = []


def _insert(event_data, table, buffer=None):
"""Insert using Core API for improved performance (no rows are returned)"""

db_obj = {
column.name: None
for column in table.__table__.columns
}
db_obj = {column.name: None for column in table.__table__.columns}
for key in db_obj:
if key in event_data:
val = event_data[key]
Expand Down Expand Up @@ -74,6 +72,7 @@ def insert_window_event(recording_timestamp, event_timestamp, event_data):
}
_insert(event_data, WindowEvent, window_events)


def insert_perf_stat(recording_timestamp, event_type, start_time, end_time):
"""
Insert event performance stat into db
Expand All @@ -87,19 +86,20 @@ def insert_perf_stat(recording_timestamp, event_type, start_time, end_time):
}
_insert(event_perf_stat, PerformanceStat, performance_stats)


def get_perf_stats(recording_timestamp):
"""
return performance stats for a given recording
"""

return (
db
.query(PerformanceStat)
db.query(PerformanceStat)
.filter(PerformanceStat.recording_timestamp == recording_timestamp)
.order_by(PerformanceStat.start_time)
.all()
)


def insert_recording(recording_data):
db_obj = Recording(**recording_data)
db.add(db_obj)
Expand All @@ -109,36 +109,87 @@ def insert_recording(recording_data):


def get_latest_recording():
return (
db
.query(Recording)
.order_by(sa.desc(Recording.timestamp))
.limit(1)
.first()
)
return db.query(Recording).order_by(sa.desc(Recording.timestamp)).limit(1).first()


def get_recording(timestamp):
return (
db
.query(Recording)
.filter(Recording.timestamp == timestamp)
.first()
)
return db.query(Recording).filter(Recording.timestamp == timestamp).first()


def _get(table, recording_timestamp):
return (
db
.query(table)
db.query(table)
.filter(table.recording_timestamp == recording_timestamp)
.order_by(table.timestamp)
.all()
)


def get_action_events(recording):
return _get(ActionEvent, recording.timestamp)
action_events = _get(ActionEvent, recording.timestamp)
# filter out stop sequences listed in STOP_SEQUENCES and Ctrl + C
filter_stop_sequences(action_events)
return action_events


def filter_stop_sequences(action_events):
# check for ctrl c first
# TODO: want to handle sequences like ctrl c the same way as normal sequences
if len(action_events) >= 2:
if (
action_events[-1].canonical_key_char == "c"
and action_events[-2].canonical_key_name == "ctrl"
):
# remove ctrl c
# ctrl c must be held down at same time, so no release event
action_events.pop()
action_events.pop()
return

# create list of indices for sequence detection
# one index for each stop sequence in STOP_SEQUENCES
# start from the back of the sequence
stop_sequence_indices = [len(sequence) - 1 for sequence in STOP_SEQUENCES]

# index of sequence to remove, -1 if none found
sequence_to_remove = -1
# number of events to remove
num_to_remove = 0

for i in range(0, len(STOP_SEQUENCES)):
# iterate backwards through list of action events
for j in range(len(action_events) - 1, -1, -1):
# never go past 1st action event, so if a sequence is longer than
# len(action_events), it can't have been in the recording
if (
action_events[j].canonical_key_char
== STOP_SEQUENCES[i][stop_sequence_indices[i]]
or action_events[j].canonical_key_name
== STOP_SEQUENCES[i][stop_sequence_indices[i]]
) and action_events[j].name == "press":
# for press events, compare the characters
stop_sequence_indices[i] -= 1
num_to_remove += 1
elif action_events[j].name == "release" and (
action_events[j].canonical_key_char in STOP_SEQUENCES[i]
or action_events[j].canonical_key_name in STOP_SEQUENCES[i]
):
# can consider any release event with any sequence char as part of the sequence
num_to_remove += 1
else:
# not part of the sequence, so exit inner loop
break

if stop_sequence_indices[i] == -1:
# completed whole sequence, so set sequence_to_remove to
# current sequence and exit outer loop
sequence_to_remove = i
break

if sequence_to_remove != -1:
# remove that sequence
for _ in range(0, num_to_remove):
action_events.pop()


def get_screenshots(recording, precompute_diffs=False):
Expand Down
48 changes: 45 additions & 3 deletions openadapt/record.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@
}
PLOT_PERFORMANCE = False


Event = namedtuple("Event", ("timestamp", "type", "data"))

global sequence_detected # Flag to indicate if a stop sequence is detected
STOP_SEQUENCES = config.STOP_SEQUENCES

utils.configure_logging(logger, LOG_LEVEL)


Expand Down Expand Up @@ -407,6 +409,7 @@ def read_window_events(
window_data = window.get_active_window_data()
if not window_data:
continue

if window_data["title"] != prev_window_data.get("title") or window_data[
"window_id"
] != prev_window_data.get("window_id"):
Expand All @@ -433,7 +436,7 @@ def read_window_events(


@trace(logger)
def performance_stats_writer (
def performance_stats_writer(
perf_q: multiprocessing.Queue,
recording_timestamp: float,
terminate_event: multiprocessing.Event,
Expand Down Expand Up @@ -506,12 +509,46 @@ def read_keyboard_events(
terminate_event: multiprocessing.Event,
recording_timestamp: float,
) -> None:
# create list of indices for sequence detection
# one index for each stop sequence in STOP_SEQUENCES
stop_sequence_indices = [0 for _ in STOP_SEQUENCES]

def on_press(event_q, key, injected):
canonical_key = keyboard_listener.canonical(key)
logger.debug(f"{key=} {injected=} {canonical_key=}")
if not injected:
handle_key(event_q, "press", key, canonical_key)

# stop sequence code
nonlocal stop_sequence_indices
global sequence_detected
canonical_key_name = getattr(canonical_key, "name", None)

for i in range(0, len(STOP_SEQUENCES)):
# check each stop sequence
stop_sequence = STOP_SEQUENCES[i]
# stop_sequence_indices[i] is the index for this stop sequence
# get canonical KeyCode of current letter in this sequence
canonical_sequence = keyboard_listener.canonical(
keyboard.KeyCode.from_char(stop_sequence[stop_sequence_indices[i]])
)

# Check if the pressed key matches the current key in this sequence
if (
canonical_key == canonical_sequence
or canonical_key_name == stop_sequence[stop_sequence_indices[i]]
):
# increment this index
stop_sequence_indices[i] += 1
else:
# Reset index since pressed key doesn't match sequence key
stop_sequence_indices[i] = 0

# Check if the entire sequence has been entered correctly
if stop_sequence_indices[i] == len(stop_sequence):
logger.info("Stop sequence entered! Stopping recording now.")
sequence_detected = True # Set global flag to end recording

def on_release(event_q, key, injected):
canonical_key = keyboard_listener.canonical(key)
logger.debug(f"{key=} {injected=} {canonical_key=}")
Expand Down Expand Up @@ -659,9 +696,14 @@ def record(

# TODO: discard events until everything is ready

global sequence_detected
sequence_detected = False

try:
while True:
while not sequence_detected:
time.sleep(1)

terminate_event.set()
except KeyboardInterrupt:
terminate_event.set()

Expand Down

0 comments on commit 385963c

Please sign in to comment.