Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hop 0.8.0 #66

Merged
merged 16 commits into from
Apr 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ testtools==2.5.0
virtualenv==20.13.0
wheel==0.34.2
inquirer>=2.8.0
hop-client<=0.5.1
hop-client==0.8.0
attrs~=21.4.0
docutils==0.17.1
myst-parser==0.16.1
Expand Down
2 changes: 1 addition & 1 deletion snews_pt/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version = '1.2.0'
version = '1.3.0'
1 change: 1 addition & 0 deletions snews_pt/auxiliary/test-config.env
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ ALERT_TOPIC="kafka://${HOP_BROKER}/snews.alert-test"

FIREDRILL_OBSERVATION_TOPIC="kafka://${HOP_BROKER}/snews.experiments-firedrill"
FIREDRILL_ALERT_TOPIC="kafka://${HOP_BROKER}/snews.alert-firedrill"
CONNECTION_TEST_TOPIC="kafka://${HOP_BROKER}/snews.connection-testing"
1 change: 0 additions & 1 deletion snews_pt/message_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ def get_schema(self, tier, data, sent_time, version=__version__):
message['meta'] = data['meta']

if tier == 'Retraction':
message['which_tier'] = data['which_tier']
message['retract_latest'] = data['retract_latest']
message['retraction_reason'] = data['retraction_reason']
message['meta'] = data['meta']
Expand Down
9 changes: 7 additions & 2 deletions snews_pt/remote_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ def test_connection(detector_name=None, firedrill=True, start_at=-5, wait=10):
:param wait: `int` seconds to wait before terminating the check
"""
detector_name = detector_name or os.getenv("DETECTOR_NAME")
default_connection_topic = "kafka://kafka.scimma.org/snews.connection-testing"
connection_broker = os.getenv("CONNECTION_TEST_TOPIC", default_connection_topic)
stamp_time = datetime.utcnow().isoformat()
message = {'_id': '0_test-connection',
'detector_name': detector_name,
Expand All @@ -27,14 +29,17 @@ def test_connection(detector_name=None, firedrill=True, start_at=-5, wait=10):
topic = os.getenv("FIREDRILL_OBSERVATION_TOPIC")
else:
topic = os.getenv("OBSERVATION_TOPIC")

# substream = Stream(until_eos=False, auth=True, start_at=start_at) # when False, while loop doesn't break
substream = Stream(until_eos=True, auth=True, start_at=start_at)
pubstream = Stream(until_eos=True, auth=True)
click.secho(f"\n> Testing your connection to {topic}. \n> Should take ~{wait} seconds...\n")
click.secho(f"\n> Testing your connection.\n> Sending to {topic}\n"
f"> Expecting from {connection_broker}. \n> Should take ~{wait} seconds...\n")

start_time = datetime.utcnow()
confirmed = False
with pubstream.open(topic, "w") as ps, substream.open(topic, "r") as ss:
# with pubstream.open(topic, "w") as ps, substream.open(topic, "r") as ss:
with pubstream.open(topic, "w") as ps, substream.open(connection_broker, "r") as ss:
ps.write(message)
while (datetime.utcnow() - start_time) < timedelta(seconds=wait):
for read in ss:
Expand Down
5 changes: 5 additions & 0 deletions snews_pt/snews_format_checker.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ def check_if_test(self):
if "meta" in self.message_keys:
if "is_test" in self.message['meta'].keys():
return self.message["meta"]["is_test"]
return False

def check_id(self):
""" check if the format is correct
Expand Down Expand Up @@ -119,6 +120,10 @@ def check_message_type(self):
return False
self.bypass = True

elif "display-heartbeats" in self.message['_id']:
self.log.debug(f"\t> display-heartbeat is passed. Skipping format check.")
self.bypass = True

elif [i in self.message['_id'] for i in ['TimeTier', 'SigTier', 'CoincidenceTier']]:
self.log.debug(f"\t> Tier message Passed. Checking times.")

Expand Down
11 changes: 3 additions & 8 deletions snews_pt/snews_pt_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,16 +188,13 @@ def time_tier_data(machine_time=None, neutrino_time=None, p_val=None, timing_ser
return time_tier_dict


def retraction_data(machine_time=None, which_tier=None,
retract_latest=0, retraction_reason=None, meta=None):
def retraction_data(machine_time=None, retract_latest=0, retraction_reason=None, meta=None):
""" Formats data for Retraction as dict object

Parameters
----------
machine_time : `str`
The machine time at the time of execution of command
which_tier : 'str'
OBS type of false message ['CoincidenceTier', 'SigTier', 'TimeTier, 'ALL']
retract_latest: 'int' or 'str'
Tells retraction methods to look for N latest message sent by a detector. can also pass 'ALL'
to retract all messages in a OBS tier.
Expand All @@ -213,10 +210,8 @@ def retraction_data(machine_time=None, which_tier=None,
dictionary of the retraction data

"""
keys = ['machine_time',
'retract_latest', 'which_tier', 'retraction_reason', 'meta']
values = [machine_time, retract_latest,
which_tier, retraction_reason, meta]
keys = ['machine_time', 'retract_latest', 'retraction_reason', 'meta']
values = [machine_time, retract_latest, retraction_reason, meta]
zip_iterator = zip(keys, values)
retraction_dict = dict(zip_iterator)
return retraction_dict
Expand Down
12 changes: 6 additions & 6 deletions snews_pt/snews_pub.py
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am going to try all these new updates, as soon as I find some time. Thanks for upgrade!

Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@
from dateutil.parser import isoparse as fromisoformat
import os, click
from hop import Stream
try:
from hop.models import JSONBlob
except ImportError:
raise ImportError(f"SNEWS Publishing Tools and Coincidence System requires hop version>=0.8.0\n"
f"Please upgrade your `pip install -U hop-client`")
from . import snews_pt_utils
from .snews_format_checker import SnewsFormat
from .snews_pt_utils import prettyprint_dictionary
Expand Down Expand Up @@ -99,7 +104,7 @@ def send(self, messages):
messages = list(messages)
for message in messages:
message["sent_time"] = datetime.utcnow().isoformat()
self.stream.write(message)
self.stream.write(JSONBlob(message))
self.display_message(message)


Expand All @@ -123,7 +128,6 @@ def __init__(self, env_file=None,
p_values=None,
t_bin_width=None,
timing_series=None,
which_tier=None,
retract_latest=None,
retraction_reason=None,
detector_status=None,
Expand Down Expand Up @@ -157,9 +161,6 @@ def __init__(self, env_file=None,
width of time window [sec] ,defaults to None.
timing_series: `list`,
defaults to None, list of strings following ISO format
which_tier: `str`
which tier are you trying to retract from, defaults to None.
Options: 'CoincidenceTier' / 'SigTier' / 'TimingTier' / 'ALL'
retract_latest: `int`
how many of your last messages do you want to retract, defaults to None
retraction_reason: `str`
Expand All @@ -186,7 +187,6 @@ def __init__(self, env_file=None,
'p_values': p_values,
't_bin_width': t_bin_width,
'timing_series': timing_series,
'which_tier': which_tier,
'retract_latest': retract_latest,
'retraction_reason': retraction_reason,
'detector_status': detector_status,
Expand Down
9 changes: 9 additions & 0 deletions snews_pt/snews_sub.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ def save_message(message, outputfolder, return_file=False):
file = make_file(outputfolder)
with open(file, 'w') as outfile:
json.dump(message, outfile, indent=4, sort_keys=True)

if return_file:
return file

def display(message):
""" Function to format output messages
"""
click.echo(click.style('ALERT MESSAGE'.center(65, '_'), bg='red', bold=True))

for k, v in message.items():
key_type = type(v)
if key_type == type(None):
Expand All @@ -52,6 +54,7 @@ def display(message):
click.echo(f'{k:<20s}' + click.style(f':{items:<45}', bg='blue'))
else:
click.echo(f'{k:<20s}:{items:<45}')

click.secho('_'.center(65, '_'), bg='bright_red')


Expand Down Expand Up @@ -100,6 +103,9 @@ def subscribe(self, outputfolder=None, auth=True):
try:
with stream.open(self.alert_topic, "r") as s:
for message in s:
# Access message dictionary from JSOBlob
message = message.content
# Save and display
save_message(message, outputfolder)
snews_pt_utils.display_gif()
display(message)
Expand All @@ -120,6 +126,9 @@ def subscribe_and_redirect_alert(self, outputfolder=None, auth=True):
try:
with stream.open(self.alert_topic, "r") as s:
for message in s:
# Access message dictionary from JSONBlobg
message = message.content
# Save and display
file = save_message(message, outputfolder, return_file=True)
snews_pt_utils.display_gif()
display(message)
Expand Down
2 changes: 1 addition & 1 deletion snews_pt/test/test_coincidence_tier.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def test_coincidence_expected():
assert coin.message_data == {'detector_name': 'KamLAND', 'machine_time': None,
'neutrino_time': '2012-06-09T15:31:08.891011',
'p_val': None, 'p_values': None, 't_bin_width': None, 'timing_series': None,
'which_tier': None, 'retract_latest': None, 'retraction_reason': None,
'retract_latest': None, 'retraction_reason': None,
'detector_status': None, 'is_pre_sn': False, 'is_test':True}

# the SNEWSTierPublisher already checks this in creation, but we can double check
Expand Down
1 change: 0 additions & 1 deletion snews_pt/test/test_significance_tier.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ def test_significance_expected():
'p_values': [0.4, 0.5],
't_bin_width': 0.8,
'timing_series': None,
'which_tier': None,
'retract_latest': None,
'retraction_reason': None,
'detector_status': None,
Expand Down
2 changes: 1 addition & 1 deletion snews_pt/test/test_timing_tier.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def test_timing_expected():
'neutrino_time': '2012-06-09T15:31:08.109876', 'p_val': None,
'p_values': None, 't_bin_width': None,
'timing_series': ['2012-06-09T15:31:08.109876', '2012-06-09T15:33:07.891011'],
'which_tier': None, 'retract_latest': None, 'retraction_reason': None,
'retract_latest': None, 'retraction_reason': None,
'detector_status': None, 'is_pre_sn': False, 'is_test':True}
assert tims.env_file == None

Expand Down
2 changes: 1 addition & 1 deletion snews_pt/tier_decider.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from datetime import datetime
import os, sys

valid_keys = ["detector_name", "machine_time", "neutrino_time", "p_val", "p_values", "timing_series", "which_tier",
valid_keys = ["detector_name", "machine_time", "neutrino_time", "p_val", "p_values", "timing_series",
"retract_latest", "retraction_reason", "detector_status", "is_pre_sn", 't_bin_width']

class TierDecider:
Expand Down