Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove client listener callback #103

Merged
merged 4 commits into from
Nov 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 1 addition & 8 deletions bin/move_it_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@
import time

from trollmoves.move_it_base import MoveItBase
from trollmoves.client import StatCollector

LOGGER = logging.getLogger("move_it_client")
LOG_FORMAT = "[%(asctime)s %(levelname)-8s %(name)s] %(message)s"
Expand Down Expand Up @@ -117,8 +116,6 @@ def parse_args():
help="The configuration file to run on.")
parser.add_argument("-l", "--log",
help="The file to log to. stdout otherwise.")
parser.add_argument("-s", "--stats",
help="Save stats to this file")
parser.add_argument("-v", "--verbose", default=False, action="store_true",
help="Toggle verbose logging")
return parser.parse_args()
Expand All @@ -130,11 +127,7 @@ def main():
client = MoveItClient(cmd_args)

try:
if cmd_args.stats:
stat = StatCollector(cmd_args.stats)
client.reload_cfg_file(cmd_args.config_file, callback=stat.collect)
else:
client.reload_cfg_file(cmd_args.config_file)
client.reload_cfg_file(cmd_args.config_file)
client.run()
except KeyboardInterrupt:
LOGGER.debug("Interrupting")
Expand Down
72 changes: 14 additions & 58 deletions trollmoves/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
from contextlib import suppress

import tarfile
import pyinotify
from zmq import LINGER, POLLIN, REQ, Poller
import bz2
from posttroll import get_context
Expand Down Expand Up @@ -137,12 +136,11 @@ def read_config(filename):
class Listener(Thread):
"""PyTroll listener class for reading messages for Trollduction."""

def __init__(self, address, topics, callback, *args, die_event=None, **kwargs):
def __init__(self, address, topics, *args, die_event=None, **kwargs):
"""Init Listener object."""
super(Listener, self).__init__()

self.topics = topics
self.callback = callback
self.subscriber = None
self.address = address
self.running = False
Expand All @@ -156,7 +154,7 @@ def __init__(self, address, topics, callback, *args, die_event=None, **kwargs):
def restart(self):
"""Restart the listener, returns a new running instance."""
self.stop()
new_listener = self.__class__(self.address, self.topics, self.callback, *self.cargs,
new_listener = self.__class__(self.address, self.topics, *self.cargs,
die_event=self.die_event, **self.ckwargs)
new_listener.death_count = self.death_count + 1
new_listener.start()
Expand Down Expand Up @@ -235,10 +233,10 @@ def run(self):
# the ongoing transfers before starting processing here
delay = self.ckwargs.get("processing_delay", False)
if delay:
add_timer(float(delay), self.callback, msg, *self.cargs,
add_timer(float(delay), request_push, msg, *self.cargs,
**self.ckwargs)
else:
self.callback(msg, *self.cargs, **self.ckwargs)
request_push(msg, *self.cargs, **self.ckwargs)

LOGGER.debug("Exiting listener %s", str(self.address))
except Exception as err:
Expand Down Expand Up @@ -513,12 +511,12 @@ def get_next_msg(uid):
return ongoing_transfers[uid].pop(0)


def add_timer(timeout, callback, msg, *args, **kwargs):
def add_timer(timeout, func, msg, *args, **kwargs):
"""Add a timer for hot spare."""
huid = get_msg_uid(msg)
cargs = [msg] + list(args)
with hot_spare_timer_lock:
timer = CTimer(timeout, callback, args=cargs, kwargs=kwargs)
timer = CTimer(timeout, func, args=cargs, kwargs=kwargs)
ongoing_hot_spare_timers[huid] = timer
ongoing_hot_spare_timers[huid].start()
LOGGER.debug("Added timer for UID %s.", huid)
Expand Down Expand Up @@ -649,9 +647,8 @@ def setup_publisher(self):
except (KeyError, NameError):
pass

def setup_listeners(self, callback, keep_providers=None):
def setup_listeners(self, keep_providers=None):
"""Set up the listeners."""
self.callback = callback
keep_providers = keep_providers or []
try:
topics = []
Expand Down Expand Up @@ -681,7 +678,6 @@ def setup_listeners(self, callback, keep_providers=None):
listener = Listener(
provider,
topics,
callback,
publisher=self.publisher,
die_event=self.listener_died_event,
**self._config)
Expand Down Expand Up @@ -746,24 +742,24 @@ def publisher_needs_restarting(self, other_config):
return True
return False

def refresh(self, new_config, callback):
def refresh(self, new_config):
"""Refresh the chain with new config."""
publisher_needs_restarting = self.publisher_needs_restarting(new_config)
unchanged_providers = self.get_unchanged_providers(new_config)
self._config = new_config
if publisher_needs_restarting:
self._refresh_publisher()
self._refresh_listeners(callback, unchanged_providers)
self._refresh_listeners(unchanged_providers)
if not self.running:
self.start()

def _refresh_publisher(self):
self._stop_publisher()
self.setup_publisher()

def _refresh_listeners(self, callback, unchanged_providers):
def _refresh_listeners(self, unchanged_providers):
self.reset_listeners(keep_providers=unchanged_providers)
self.setup_listeners(callback, keep_providers=unchanged_providers)
self.setup_listeners(keep_providers=unchanged_providers)

def reset_listeners(self, keep_providers=None):
"""Reset the listeners."""
Expand Down Expand Up @@ -791,12 +787,12 @@ def restart(self):
"""Restart the chain, return a new running instance."""
self.stop()
new_chain = self.__class__(self._name, self._config)
new_chain.setup_listeners(self.callback)
new_chain.setup_listeners()
new_chain.start()
return new_chain


def reload_config(filename, chains, callback=request_push):
def reload_config(filename, chains):
"""Rebuild chains if needed (if the configuration changed) from *filename*."""
LOGGER.debug("New config file detected: %s", filename)

Expand All @@ -815,7 +811,7 @@ def reload_config(filename, chains, callback=request_push):
chains[key] = Chain(key, new_config)
chains[key].start()

chains[key].refresh(new_config, callback)
chains[key].refresh(new_config)

LOGGER.debug("%s %s", verb, key)

Expand Down Expand Up @@ -922,46 +918,6 @@ def send_and_recv(self, msg, timeout=DEFAULT_REQ_TIMEOUT):

return rep

# Generic event handler


class EventHandler(pyinotify.ProcessEvent):
"""Handle events with a generic *fun* function."""

def __init__(self, fun, *args, **kwargs):
"""Initialize handler."""
pyinotify.ProcessEvent.__init__(self, *args, **kwargs)
self._fun = fun

def process_IN_CLOSE_WRITE(self, event):
"""Process on closing after writing."""
self._fun(event.pathname)

def process_IN_CREATE(self, event):
"""Process on closing after linking."""
try:
if os.stat(event.pathname).st_nlink > 1:
self._fun(event.pathname)
except OSError:
return

def process_IN_MOVED_TO(self, event):
"""Process on closing after moving."""
self._fun(event.pathname)


class StatCollector(object):
"""StatCollector class."""

def __init__(self, statfile):
"""Initialize collector."""
self.statfile = statfile

def collect(self, msg, *args, **kwargs):
"""Collect."""
with open(self.statfile, 'a') as fd:
fd.write(time.asctime() + " - " + str(msg) + "\n")


def terminate(chains):
"""Terminate client chains."""
Expand Down
Loading