Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adding backup targets #170

Merged
merged 23 commits into from
Jun 5, 2023
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions examples/move_it_client.ini
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,27 @@ login = user
# Port for file requests: 0 = random
publish_port = 0

# Example using backup targets in adition to the primary target
[eumetcast_hrit_0deg_scp_backup_targets]
# Servers to listen, <server>:<port>
# Use the port number defined with the -p flag for server or mirror
providers = satmottag2:9010
# REmote destination for the data using SCP
destination = scp://<user>@<primary target>/tmp/foo
# Login credentials for local SSH server. Using keys, so no password given
login = user
# Topic to follow. Common for every provider. Optional.
# topic = /1b/hrit-segment/0deg
# Port for file requests: 0 = random
publish_port = 0
# Backup targets to try if primary target is not working.
# Assumption: Uses the same protocol as the defined in destination.
# Also only implemented for scp
# Worth noteing is server connection_uptime. If a backup host is used and
# the next request- comes within the server connection uptime this backup
# host will be reused.
backup_targets = backup_host1 backup_host2

# Example acting as a hot spare
# NOTE: all of the clients are required to have the same section names
# as they are used to set the heartbeat publishing and subscription
Expand Down
15 changes: 14 additions & 1 deletion trollmoves/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ def read_config(filename):
_set_config_defaults(res[section])
_parse_boolean_config_items(res[section], cp_[section])
_parse_nameservers(res[section], cp_[section])
_parse_backup_targets(res[section], cp_[section])
if not _check_provider_config(res, section):
continue
if not _check_destination(res, section):
Expand All @@ -110,6 +111,7 @@ def _set_config_defaults(conf):
conf.setdefault("transfer_req_timeout", 10 * DEFAULT_REQ_TIMEOUT)
conf.setdefault("nameservers", None)
conf.setdefault("create_target_directory", True)
conf.setdefault("backup_targets", None)


def _parse_boolean_config_items(conf, raw_conf):
Expand All @@ -132,6 +134,13 @@ def _parse_nameservers(conf, raw_conf):
conf["nameservers"] = val


def _parse_backup_targets(conf, raw_conf):
val = raw_conf.get("backup_targets")
if isinstance(val, str):
val = val.split()
conf["backup_targets"] = val


def _check_provider_config(conf, section):
if "providers" not in conf[section]:
LOGGER.warning("Incomplete section %s: add an 'providers' item.",
Expand Down Expand Up @@ -268,6 +277,10 @@ def _handle_beat_message(self, msg):

def _process_message(self, msg):
delay = self.ckwargs.get("processing_delay", False)
backup_targets = self.ckwargs.get('backup_targets', None)
if backup_targets:
LOGGER.debug("Adding backup_targets %s to the message.", str(backup_targets))
msg.data['backup_targets'] = backup_targets
if delay:
# If this is a hot spare client, wait for a while
# for a public "push" message which will update
Expand Down Expand Up @@ -521,7 +534,7 @@ def replace_mda(msg, kwargs):
try:
replacement = dict(item.split(':') for item in kwargs[key].split('|'))
replacement = replacement[msg.data[key]]
except ValueError:
except (ValueError, AttributeError):
replacement = kwargs[key]
msg.data[key] = replacement
return msg
Expand Down
2 changes: 1 addition & 1 deletion trollmoves/move_it_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def _run(self):
def create_publisher(port, publisher_name):
"""Create a publisher using port *port*."""
LOGGER.info("Starting publisher on port %s.", str(port))
return Publisher("tcp://*:" + str(port), publisher_name)
return Publisher("tcp://*:" + str(port), publisher_name).start()


# Generic event handler
Expand Down
39 changes: 29 additions & 10 deletions trollmoves/movers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2012-2020
# Copyright (c) 2012-2023
#
# Author(s):
#
Expand Down Expand Up @@ -45,7 +45,7 @@
LOGGER = logging.getLogger(__name__)


def move_it(pathname, destination, attrs=None, hook=None, rel_path=None):
def move_it(pathname, destination, attrs=None, hook=None, rel_path=None, backup_targets=None):
"""Check if the file pointed by *pathname* is in the filelist, and move it if it is.

The *destination* provided is used, and if *rel_path* is provided, it will
Expand All @@ -71,7 +71,12 @@ def move_it(pathname, destination, attrs=None, hook=None, rel_path=None):
raise

try:
mover(pathname, new_dest, attrs=attrs).copy()
m = mover(pathname, new_dest, attrs=attrs, backup_targets=backup_targets)
m.copy()
last_dest = m.destination
TAlonglong marked this conversation as resolved.
Show resolved Hide resolved
if last_dest != new_dest:
new_dest = last_dest
fake_dest = clean_url(new_dest)
if hook:
hook(pathname, new_dest)
except Exception as err:
Expand All @@ -83,12 +88,12 @@ def move_it(pathname, destination, attrs=None, hook=None, rel_path=None):
else:
LOGGER.info("Successfully copied %s to %s",
pathname, str(fake_dest))

return m.destination

class Mover(object):
"""Base mover object. Doesn't do anything as it has to be subclassed."""

def __init__(self, origin, destination, attrs=None):
def __init__(self, origin, destination, attrs=None, backup_targets=None):
"""Initialize the Mover."""
LOGGER.debug("destination = %s", str(destination))
try:
Expand All @@ -102,7 +107,7 @@ def __init__(self, origin, destination, attrs=None):
LOGGER.debug("Destination: %s", str(destination))
self.origin = origin
self.attrs = attrs or {}

self.backup_targets = backup_targets
def copy(self):
"""Copy the file."""
raise NotImplementedError("Copy for scheme " + self.destination.scheme +
Expand Down Expand Up @@ -299,10 +304,19 @@ class ScpMover(Mover):
def open_connection(self):
"""Open a connection."""
from paramiko import SSHClient, SSHException

import copy
retries = 3
ssh_key_filename = self.attrs.get("ssh_key_filename", None)
timeout = self.attrs.get("ssh_connection_timeout", None)
try:
timeout = float(self.attrs.get("ssh_connection_timeout", None))
except TypeError:
timeout = None
backup_targets = copy.deepcopy(self.backup_targets)
backup_targets_message = ""
try:
num_backup_targets = len(backup_targets)
except TypeError:
num_backup_targets = None
while retries > 0:
retries -= 1
try:
Expand All @@ -329,7 +343,13 @@ def open_connection(self):
ssh_connection.close()
time.sleep(2)
LOGGER.debug("Retrying ssh connect ...")
raise IOError("Failed to ssh connect after 3 attempts")
if retries == 0 and backup_targets:
backup_target = backup_targets.pop(0)
self.destination = self.destination._replace(netloc=f"{self.destination.username}@{backup_target}")
TAlonglong marked this conversation as resolved.
Show resolved Hide resolved
LOGGER.info("Changing destination to backup target: %s", self.destination.hostname)
retries = 3
backup_targets_message = f" to primary and {num_backup_targets} backup host(s)"
raise IOError(f"Failed to ssh connect after 3 attempts{backup_targets_message}.")

@staticmethod
def is_connected(connection):
Expand Down Expand Up @@ -363,7 +383,6 @@ def copy(self):
ssh_connection = self.get_connection(self.destination.hostname,
self.destination.port or 22,
self._dest_username)

try:
scp = SCPClient(ssh_connection.get_transport())
except Exception as err:
Expand Down
31 changes: 18 additions & 13 deletions trollmoves/server.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2012-2020
# Copyright (c) 2012-2023
#
# Author(s):
#
Expand Down Expand Up @@ -129,27 +129,28 @@ def pong(self, message):
def push(self, message):
"""Reply to push request."""
new_msg = self._move_files(message)
if new_msg is None:
if new_msg and new_msg.type is not 'err':
_destination = clean_url(new_msg.data['destination'])
TAlonglong marked this conversation as resolved.
Show resolved Hide resolved
new_msg = Message(message.subject,
_get_push_message_type(message),
data=message.data.copy())
new_msg.data['destination'] = clean_url(new_msg.data['destination'])
new_msg.data['destination'] = _destination

return new_msg

def _move_files(self, message):
error_message = None
return_message = None
for data in gen_dict_contains(message.data, 'uri'):
pathname = urlparse(data['uri']).path
rel_path = data.get('path', None)
error_message = self._validate_requested_file(pathname, message)
if error_message is not None:
return_message = self._validate_requested_file(pathname, message)
if return_message is not None:
break
error_message = self._move_file(pathname, message, rel_path)
if error_message is not None:
return_message = self._move_file(pathname, message, rel_path)
if return_message.type is "err":
break

return error_message
return return_message

def _validate_requested_file(self, pathname, message):
# FIXME: check against file_cache
Expand All @@ -161,14 +162,18 @@ def _validate_requested_file(self, pathname, message):
return None

def _move_file(self, pathname, message, rel_path):
error_message = None
return_message = None
try:
move_it(pathname, message.data['destination'], self._attrs, rel_path=rel_path)
destination = move_it(pathname, message.data['destination'],
self._attrs, rel_path=rel_path,
backup_targets=message.data.get('backup_targets', None))
message.data['destination'] = destination
except Exception as err:
error_message = Message(message.subject, "err", data=str(err))
return_message = Message(message.subject, "err", data=str(err))
else:
self._add_to_deleter(pathname)
return error_message
return_message = message
return return_message

def _add_to_deleter(self, pathname):
if self._attrs.get('compression') or self._is_delete_set():
Expand Down
85 changes: 85 additions & 0 deletions trollmoves/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,17 @@
nameservers = ns1 ns2
"""

CLIENT_CONFIG_BACKUP_TARGETS = """
[foo]
providers = bar
destination = scp://primary_host/tmp/foo
login = user
topic = /1b/hrit-segment/0deg
publish_port = 0
nameservers = ns1
backup_targets=backup_host1 backup_host2
TAlonglong marked this conversation as resolved.
Show resolved Hide resolved
"""

LOCAL_DIR = "/local"

CHAIN_BASIC_CONFIG = {"login": "user:pass", "topic": "/foo", "publish_port": 12345, "nameservers": None,
Expand Down Expand Up @@ -265,6 +276,12 @@ def client_config_2_items():
yield _write_named_temporary_config(CLIENT_CONFIG_2_ITEMS)


@pytest.fixture
def client_config_backup_targets():
"""Create a fixture for a client config."""
yield _write_named_temporary_config(CLIENT_CONFIG_BACKUP_TARGETS)


@pytest.fixture
def compression_config():
"""Create a fixture for compression config."""
Expand Down Expand Up @@ -917,6 +934,37 @@ def test_request_push_single_call(send_ack, send_request, clean_ongoing_transfer
assert len(file_cache) == 1


@patch('trollmoves.client.ongoing_transfers', new_callable=dict)
@patch('trollmoves.client.file_cache', new_callable=deque)
@patch('trollmoves.client.clean_ongoing_transfer')
@patch('trollmoves.client.send_request')
@patch('trollmoves.client.send_ack')
def test_request_push_backup_targets(send_ack, send_request, clean_ongoing_transfer, file_cache, ongoing_transfers):
"""Test trollmoves.client.request_push() with a single file."""
from trollmoves.client import request_push
from tempfile import gettempdir

msg_file_backup_targets = MSG_FILE2
msg_file_backup_targets.data['backup_targets'] = ['backup_host1', 'backup_host2']
clean_ongoing_transfer.return_value = [msg_file_backup_targets]
send_request.return_value = [msg_file_backup_targets, 'localhost']
publisher = MagicMock()
kwargs = {'transfer_req_timeout': 1.0, 'req_timeout': 1.0}

request_push(msg_file_backup_targets, gettempdir(), 'login', publisher=publisher,
**kwargs)

send_request.assert_called_once()
send_ack.assert_called_once()
# The file should be added to ongoing transfers
assert UID_FILE2 in ongoing_transfers
# And removed
clean_ongoing_transfer.assert_called_once_with(UID_FILE2)
# The transferred file should be in the cache
assert MSG_FILE2.data['uid'] in file_cache
assert len(file_cache) == 1


@patch('trollmoves.client.ongoing_transfers', new_callable=dict)
@patch('trollmoves.client.file_cache', new_callable=deque)
@patch('trollmoves.client.clean_ongoing_transfer')
Expand Down Expand Up @@ -1679,6 +1727,17 @@ def test_read_config_nameservers_are_a_list_or_tuple(client_config_2_items):
assert isinstance(conf['foo']['nameservers'], (list, tuple))


def test_read_config_backup_targets(client_config_backup_targets):
"""Test that backup targets are given as a list."""
from trollmoves.client import read_config

try:
conf = read_config(client_config_backup_targets)
finally:
os.remove(client_config_backup_targets)
assert isinstance(conf['foo']['backup_targets'], list)


@patch('trollmoves.client.ongoing_transfers', new_callable=dict)
@patch('trollmoves.client.file_cache', new_callable=deque)
@patch('trollmoves.client.clean_ongoing_transfer')
Expand All @@ -1703,3 +1762,29 @@ def test_request_push_ftp(send_ack, send_request, clean_ongoing_transfer, file_c
assert "somepass" not in file_msg.data["uri"]
assert "/some/dir" in file_msg.data["uri"]
assert not file_msg.data["uri"].startswith("ftp://")


@patch('trollmoves.client.ongoing_transfers', new_callable=dict)
@patch('trollmoves.client.file_cache', new_callable=deque)
@patch('trollmoves.client.clean_ongoing_transfer')
@patch('trollmoves.client.send_request')
@patch('trollmoves.client.send_ack')
def test_request_push_scp(send_ack, send_request, clean_ongoing_transfer, file_cache, ongoing_transfers, tmp_path):
"""Test trollmoves.client.request_push() using scp with a single file."""
from trollmoves.client import request_push

clean_ongoing_transfer.return_value = [MSG_FILE_FTP]
send_request.return_value = [MSG_FILE_FTP, 'localhost']
publisher = MagicMock()
kwargs = {'transfer_req_timeout': 1.0, 'req_timeout': 1.0}

destination = f"ftp://{os.fspath(tmp_path)}/some/dir"

request_push(MSG_FILE_FTP, destination, 'someuser:somepass', publisher=publisher,
**kwargs)

file_msg = Message(rawstr=publisher.send.mock_calls[-1][1][0])
assert "someuser" not in file_msg.data["uri"]
assert "somepass" not in file_msg.data["uri"]
assert "/some/dir" in file_msg.data["uri"]
assert not file_msg.data["uri"].startswith("ftp://")
Loading