Skip to content

Commit

Permalink
Merge pull request #170 from TAlonglong/issue169
Browse files Browse the repository at this point in the history
adding backup targets
  • Loading branch information
pnuu authored Jun 5, 2023
2 parents 8cdb573 + 95c847f commit 38af438
Show file tree
Hide file tree
Showing 6 changed files with 194 additions and 25 deletions.
21 changes: 21 additions & 0 deletions examples/move_it_client.ini
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,27 @@ login = user
# Port for file requests: 0 = random
publish_port = 0

# Example using backup targets in adition to the primary target
[eumetcast_hrit_0deg_scp_backup_targets]
# Servers to listen, <server>:<port>
# Use the port number defined with the -p flag for server or mirror
providers = satmottag2:9010
# REmote destination for the data using SCP
destination = scp://<user>@<primary target>/tmp/foo
# Login credentials for local SSH server. Using keys, so no password given
login = user
# Topic to follow. Common for every provider. Optional.
# topic = /1b/hrit-segment/0deg
# Port for file requests: 0 = random
publish_port = 0
# Backup targets to try if primary target is not working.
# Assumption: Uses the same protocol as the defined in destination.
# Also only implemented for scp
# Worth noteing is server connection_uptime. If a backup host is used and
# the next request- comes within the server connection uptime this backup
# host will be reused.
backup_targets = backup_host1 backup_host2

# Example acting as a hot spare
# NOTE: all of the clients are required to have the same section names
# as they are used to set the heartbeat publishing and subscription
Expand Down
15 changes: 14 additions & 1 deletion trollmoves/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ def read_config(filename):
_set_config_defaults(res[section])
_parse_boolean_config_items(res[section], cp_[section])
_parse_nameservers(res[section], cp_[section])
_parse_backup_targets(res[section], cp_[section])
if not _check_provider_config(res, section):
continue
if not _check_destination(res, section):
Expand All @@ -110,6 +111,7 @@ def _set_config_defaults(conf):
conf.setdefault("transfer_req_timeout", 10 * DEFAULT_REQ_TIMEOUT)
conf.setdefault("nameservers", None)
conf.setdefault("create_target_directory", True)
conf.setdefault("backup_targets", None)


def _parse_boolean_config_items(conf, raw_conf):
Expand All @@ -132,6 +134,13 @@ def _parse_nameservers(conf, raw_conf):
conf["nameservers"] = val


def _parse_backup_targets(conf, raw_conf):
val = raw_conf.get("backup_targets")
if isinstance(val, str):
val = val.split()
conf["backup_targets"] = val


def _check_provider_config(conf, section):
if "providers" not in conf[section]:
LOGGER.warning("Incomplete section %s: add an 'providers' item.",
Expand Down Expand Up @@ -268,6 +277,10 @@ def _handle_beat_message(self, msg):

def _process_message(self, msg):
delay = self.ckwargs.get("processing_delay", False)
backup_targets = self.ckwargs.get('backup_targets', None)
if backup_targets:
LOGGER.debug("Adding backup_targets %s to the message.", str(backup_targets))
msg.data['backup_targets'] = backup_targets
if delay:
# If this is a hot spare client, wait for a while
# for a public "push" message which will update
Expand Down Expand Up @@ -521,7 +534,7 @@ def replace_mda(msg, kwargs):
try:
replacement = dict(item.split(':') for item in kwargs[key].split('|'))
replacement = replacement[msg.data[key]]
except ValueError:
except (ValueError, AttributeError):
replacement = kwargs[key]
msg.data[key] = replacement
return msg
Expand Down
41 changes: 30 additions & 11 deletions trollmoves/movers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2012-2020
# Copyright (c) 2012-2023
#
# Author(s):
#
Expand Down Expand Up @@ -45,7 +45,7 @@
LOGGER = logging.getLogger(__name__)


def move_it(pathname, destination, attrs=None, hook=None, rel_path=None):
def move_it(pathname, destination, attrs=None, hook=None, rel_path=None, backup_targets=None):
"""Check if the file pointed by *pathname* is in the filelist, and move it if it is.
The *destination* provided is used, and if *rel_path* is provided, it will
Expand All @@ -71,7 +71,12 @@ def move_it(pathname, destination, attrs=None, hook=None, rel_path=None):
raise

try:
mover(pathname, new_dest, attrs=attrs).copy()
m = mover(pathname, new_dest, attrs=attrs, backup_targets=backup_targets)
m.copy()
last_dest = m.destination
if last_dest != new_dest:
new_dest = last_dest
fake_dest = clean_url(new_dest)
if hook:
hook(pathname, new_dest)
except Exception as err:
Expand All @@ -83,12 +88,12 @@ def move_it(pathname, destination, attrs=None, hook=None, rel_path=None):
else:
LOGGER.info("Successfully copied %s to %s",
pathname, str(fake_dest))

return m.destination

class Mover(object):
"""Base mover object. Doesn't do anything as it has to be subclassed."""

def __init__(self, origin, destination, attrs=None):
def __init__(self, origin, destination, attrs=None, backup_targets=None):
"""Initialize the Mover."""
LOGGER.debug("destination = %s", str(destination))
try:
Expand All @@ -102,7 +107,7 @@ def __init__(self, origin, destination, attrs=None):
LOGGER.debug("Destination: %s", str(destination))
self.origin = origin
self.attrs = attrs or {}

self.backup_targets = backup_targets
def copy(self):
"""Copy the file."""
raise NotImplementedError("Copy for scheme " + self.destination.scheme +
Expand Down Expand Up @@ -133,7 +138,7 @@ def get_connection(self, hostname, port, username=None):
timer = CTimer(int(self.attrs.get('connection_uptime', 30)),
self.delete_connection, (connection,))
timer.start()
self.active_connections[(hostname, port, username)] = connection, timer
self.active_connections[(self.destination.hostname, port, username)] = connection, timer

return connection

Expand Down Expand Up @@ -299,10 +304,19 @@ class ScpMover(Mover):
def open_connection(self):
"""Open a connection."""
from paramiko import SSHClient, SSHException

import copy
retries = 3
ssh_key_filename = self.attrs.get("ssh_key_filename", None)
timeout = self.attrs.get("ssh_connection_timeout", None)
try:
timeout = float(self.attrs.get("ssh_connection_timeout", None))
except TypeError:
timeout = None
backup_targets = copy.deepcopy(self.backup_targets)
backup_targets_message = ""
try:
num_backup_targets = len(backup_targets)
except TypeError:
num_backup_targets = None
while retries > 0:
retries -= 1
try:
Expand All @@ -329,7 +343,13 @@ def open_connection(self):
ssh_connection.close()
time.sleep(2)
LOGGER.debug("Retrying ssh connect ...")
raise IOError("Failed to ssh connect after 3 attempts")
if retries == 0 and backup_targets:
backup_target = backup_targets.pop(0)
self.destination = self.destination._replace(netloc=f"{self.destination.username}@{backup_target}")
LOGGER.info("Changing destination to backup target: %s", self.destination.hostname)
retries = 3
backup_targets_message = f" to primary and {num_backup_targets} backup host(s)"
raise IOError(f"Failed to ssh connect after 3 attempts{backup_targets_message}.")

@staticmethod
def is_connected(connection):
Expand Down Expand Up @@ -363,7 +383,6 @@ def copy(self):
ssh_connection = self.get_connection(self.destination.hostname,
self.destination.port or 22,
self._dest_username)

try:
scp = SCPClient(ssh_connection.get_transport())
except Exception as err:
Expand Down
31 changes: 18 additions & 13 deletions trollmoves/server.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2012-2020
# Copyright (c) 2012-2023
#
# Author(s):
#
Expand Down Expand Up @@ -129,27 +129,28 @@ def pong(self, message):
def push(self, message):
"""Reply to push request."""
new_msg = self._move_files(message)
if new_msg is None:
if new_msg and new_msg.type is not 'err':
_destination = clean_url(new_msg.data['destination'])
new_msg = Message(message.subject,
_get_push_message_type(message),
data=message.data.copy())
new_msg.data['destination'] = clean_url(new_msg.data['destination'])
new_msg.data['destination'] = _destination

return new_msg

def _move_files(self, message):
error_message = None
return_message = None
for data in gen_dict_contains(message.data, 'uri'):
pathname = urlparse(data['uri']).path
rel_path = data.get('path', None)
error_message = self._validate_requested_file(pathname, message)
if error_message is not None:
return_message = self._validate_requested_file(pathname, message)
if return_message is not None:
break
error_message = self._move_file(pathname, message, rel_path)
if error_message is not None:
return_message = self._move_file(pathname, message, rel_path)
if return_message.type is "err":
break

return error_message
return return_message

def _validate_requested_file(self, pathname, message):
# FIXME: check against file_cache
Expand All @@ -161,14 +162,18 @@ def _validate_requested_file(self, pathname, message):
return None

def _move_file(self, pathname, message, rel_path):
error_message = None
return_message = None
try:
move_it(pathname, message.data['destination'], self._attrs, rel_path=rel_path)
destination = move_it(pathname, message.data['destination'],
self._attrs, rel_path=rel_path,
backup_targets=message.data.get('backup_targets', None))
message.data['destination'] = destination
except Exception as err:
error_message = Message(message.subject, "err", data=str(err))
return_message = Message(message.subject, "err", data=str(err))
else:
self._add_to_deleter(pathname)
return error_message
return_message = message
return return_message

def _add_to_deleter(self, pathname):
if self._attrs.get('compression') or self._is_delete_set():
Expand Down
85 changes: 85 additions & 0 deletions trollmoves/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,17 @@
nameservers = ns1 ns2
"""

CLIENT_CONFIG_BACKUP_TARGETS = """
[foo]
providers = bar
destination = scp://primary_host/tmp/foo
login = user
topic = /1b/hrit-segment/0deg
publish_port = 0
nameservers = ns1
backup_targets=backup_host1 backup_host2
"""

LOCAL_DIR = "/local"

CHAIN_BASIC_CONFIG = {"login": "user:pass", "topic": "/foo", "publish_port": 12345, "nameservers": None,
Expand Down Expand Up @@ -265,6 +276,12 @@ def client_config_2_items():
yield _write_named_temporary_config(CLIENT_CONFIG_2_ITEMS)


@pytest.fixture
def client_config_backup_targets():
"""Create a fixture for a client config."""
yield _write_named_temporary_config(CLIENT_CONFIG_BACKUP_TARGETS)


@pytest.fixture
def compression_config():
"""Create a fixture for compression config."""
Expand Down Expand Up @@ -917,6 +934,37 @@ def test_request_push_single_call(send_ack, send_request, clean_ongoing_transfer
assert len(file_cache) == 1


@patch('trollmoves.client.ongoing_transfers', new_callable=dict)
@patch('trollmoves.client.file_cache', new_callable=deque)
@patch('trollmoves.client.clean_ongoing_transfer')
@patch('trollmoves.client.send_request')
@patch('trollmoves.client.send_ack')
def test_request_push_backup_targets(send_ack, send_request, clean_ongoing_transfer, file_cache, ongoing_transfers):
"""Test trollmoves.client.request_push() with a single file."""
from trollmoves.client import request_push
from tempfile import gettempdir

msg_file_backup_targets = MSG_FILE2
msg_file_backup_targets.data['backup_targets'] = ['backup_host1', 'backup_host2']
clean_ongoing_transfer.return_value = [msg_file_backup_targets]
send_request.return_value = [msg_file_backup_targets, 'localhost']
publisher = MagicMock()
kwargs = {'transfer_req_timeout': 1.0, 'req_timeout': 1.0}

request_push(msg_file_backup_targets, gettempdir(), 'login', publisher=publisher,
**kwargs)

send_request.assert_called_once()
send_ack.assert_called_once()
# The file should be added to ongoing transfers
assert UID_FILE2 in ongoing_transfers
# And removed
clean_ongoing_transfer.assert_called_once_with(UID_FILE2)
# The transferred file should be in the cache
assert MSG_FILE2.data['uid'] in file_cache
assert len(file_cache) == 1


@patch('trollmoves.client.ongoing_transfers', new_callable=dict)
@patch('trollmoves.client.file_cache', new_callable=deque)
@patch('trollmoves.client.clean_ongoing_transfer')
Expand Down Expand Up @@ -1679,6 +1727,17 @@ def test_read_config_nameservers_are_a_list_or_tuple(client_config_2_items):
assert isinstance(conf['foo']['nameservers'], (list, tuple))


def test_read_config_backup_targets(client_config_backup_targets):
"""Test that backup targets are given as a list."""
from trollmoves.client import read_config

try:
conf = read_config(client_config_backup_targets)
finally:
os.remove(client_config_backup_targets)
assert isinstance(conf['foo']['backup_targets'], list)


@patch('trollmoves.client.ongoing_transfers', new_callable=dict)
@patch('trollmoves.client.file_cache', new_callable=deque)
@patch('trollmoves.client.clean_ongoing_transfer')
Expand All @@ -1703,3 +1762,29 @@ def test_request_push_ftp(send_ack, send_request, clean_ongoing_transfer, file_c
assert "somepass" not in file_msg.data["uri"]
assert "/some/dir" in file_msg.data["uri"]
assert not file_msg.data["uri"].startswith("ftp://")


@patch('trollmoves.client.ongoing_transfers', new_callable=dict)
@patch('trollmoves.client.file_cache', new_callable=deque)
@patch('trollmoves.client.clean_ongoing_transfer')
@patch('trollmoves.client.send_request')
@patch('trollmoves.client.send_ack')
def test_request_push_scp(send_ack, send_request, clean_ongoing_transfer, file_cache, ongoing_transfers, tmp_path):
"""Test trollmoves.client.request_push() using scp with a single file."""
from trollmoves.client import request_push

clean_ongoing_transfer.return_value = [MSG_FILE_FTP]
send_request.return_value = [MSG_FILE_FTP, 'localhost']
publisher = MagicMock()
kwargs = {'transfer_req_timeout': 1.0, 'req_timeout': 1.0}

destination = f"ftp://{os.fspath(tmp_path)}/some/dir"

request_push(MSG_FILE_FTP, destination, 'someuser:somepass', publisher=publisher,
**kwargs)

file_msg = Message(rawstr=publisher.send.mock_calls[-1][1][0])
assert "someuser" not in file_msg.data["uri"]
assert "somepass" not in file_msg.data["uri"]
assert "/some/dir" in file_msg.data["uri"]
assert not file_msg.data["uri"].startswith("ftp://")
Loading

0 comments on commit 38af438

Please sign in to comment.