Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Synchronize flush_all(). #2191

Merged
merged 2 commits into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion server/fishtest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,12 @@ def init_rundb(event):
# We do not want to do the following in the constructor of rundb since
# it writes to the db and starts the flush timer.
if rundb.is_primary_instance():
rundb.update_aggregated_data()
# We install signal handlers when all cache sensitive code in the
# main thread is finished. In that way we can safely use
# locks in the signal handlers (which also run in the main thread).
signal.signal(signal.SIGINT, rundb.exit_run)
signal.signal(signal.SIGTERM, rundb.exit_run)
rundb.update_aggregated_data()
rundb.schedule_tasks()

config.add_subscriber(add_rundb, NewRequest)
Expand Down
11 changes: 5 additions & 6 deletions server/fishtest/run_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,12 +156,11 @@ def flush_buffers(self):
self.runs.replace_one({"_id": oldest_run_id}, oldest_run)

def flush_all(self):
# Note that we do not grab locks because this method is
# called from a signal handler and grabbing locks might deadlock
for run_id in list(self.run_cache):
entry = self.run_cache.get(run_id, None)
if entry is not None and entry["is_changed"]:
self.runs.replace_one({"_id": ObjectId(run_id)}, entry["run"])
with self.run_cache_lock:
for run_id, entry in self.run_cache.items():
if entry["is_changed"]:
with self.active_run_lock(run_id):
self.runs.replace_one({"_id": ObjectId(run_id)}, entry["run"])

def clean_cache(self):
now = time.time()
Expand Down
2 changes: 1 addition & 1 deletion server/fishtest/rundb.py
Original file line number Diff line number Diff line change
Expand Up @@ -681,7 +681,7 @@ def exit_run(self, signum, frame):
if self.scheduler is not None:
print("Stopping scheduler... ", flush=True)
self.scheduler.stop()
if self.__is_primary_instance:
if self.is_primary_instance():
print("Flushing cache... ", flush=True)
self.run_cache.flush_all()
if self.port >= 0:
Expand Down
6 changes: 6 additions & 0 deletions server/fishtest/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,10 +182,16 @@ def create_task(
self._refresh()
return task

def join(self):
"""Join worker thread - if possible"""
if threading.current_thread() != self.__worker_thread:
self.__worker_thread.join()

def stop(self):
"""This stops the scheduler"""
self.__thread_stopped = True
self._refresh()
self.join()

def _refresh(self):
self.__event.set()
Expand Down
Loading