-
Notifications
You must be signed in to change notification settings - Fork 0
/
eon_groomer.py
1623 lines (1506 loc) · 88.9 KB
/
eon_groomer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (C) 2014, 2015 Patrick Moran for Verizon
#
# Distributes WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License. If not, see <http://www.gnu.org/licenses/>.
from collections import deque
import g_config
import g_eon_api_bridge
# from g_graphics import plot_assets
import time
import logging
import json
from g_lat_lon_distance import lat_lon_distance, move_to_lat_lon, compute_resolution
from sortedcontainers import SortedDict
import pickle
import copy
import pandas
from numpy import int64, fmax, argsort, array, interp, linspace, diff, random
import arrow
import Queue
import os
import threading
# Power-state constants. OFF (=0) also indexes an ONT's alarm-set
# ("power lost" / alarmReceiveTime) event set, and ON (=1) its alarm-clear
# ("power restored" / alarmClearTime) event set in cell['onts'][*]['events'].
ON = 1
OFF = 0
class GroomingMessageHandler(threading.Thread):
def __init__(self,
             incoming_q,
             incoming_queue_lock,
             outgoing_q,
             outgoing_queue_lock,
             module_instance_name='Unnamed',
             shared_data=None, shared_data_lock=None):
    """
    Build a groomer worker thread that drains grooming messages from the
    Rabbit MQ queue and grooms cells via the EON API.

    :param incoming_q: queue of messages arriving from Rabbit MQ.
    :param incoming_queue_lock: lock guarding incoming_q.
    :param outgoing_q: queue for messages this thread emits.
    :param outgoing_queue_lock: lock guarding outgoing_q.
    :param module_instance_name: name used for this instance's logger and
        for thread identification.
    :param shared_data: optional data structure shared across threads.
    :param shared_data_lock: lock guarding shared_data.
    """
    self.incoming_rabbit_mq = incoming_q
    self.incoming_queue_lock = incoming_queue_lock
    self.outgoing_q = outgoing_q
    self.outgoing_queue_lock = outgoing_queue_lock
    self.my_local_logger = logging.getLogger(module_instance_name)
    self.my_local_logger.setLevel(logging.DEBUG)
    self.local_q = deque()  # local FIFO of validated messages awaiting grooming
    self.eon_api_bridge = g_eon_api_bridge.EonApiBridge()
    self.handle_queue = False
    self.instance_name = module_instance_name
    # This is used to run the main loop
    self.run_enable = True
    self.shared_data = shared_data
    self.shared_data_lock = shared_data_lock
    self.start_time = 0
    self.run_start_time = time.time()
    self.groomer_state = "0:IDLE"  # Used to determine the current state of this thread in a multi-threaded env
    self.groom_run_state = "0:IDLE"  # Used to determine the current run mode of this thread
    self.idle_count = 0
    self.end_time = 0
    self.query_count = 0
    self.asset_dictionary = {}
    self.working_radius = g_config.START_RADIUS  # This will hold the radius units 0.12
    self.cell_collection_set = set()
    self.resolution = compute_resolution(self.working_radius)
    self.cell_count = 0
    self.utility_region = g_config.UTILITY_REGION
    self.ttl = g_config.TTL_MAX
    self.SHOW_PLOTS = False  # toggled per-ONT by temporal_filter when the power trace varies
    self.cell_time_event = False  # set by temporal_filter when a cell shows a credible outage event
    threading.Thread.__init__(self)
@staticmethod
def check_message_payload(dequeued_item):
"""
This method checks that the message payload keys matches the required (specified) keys
:return: False is any key is missing otherwise True
"""
key_array = ["dateTime",
"payload",
"messageType"]
# Note that the "ttl" key (and others) may be present but its not checked here!
for key in key_array:
if key not in dequeued_item.keys():
return False
key_array = ["zoomR",
"spatial",
"circuitID",
"reputationEnabled",
"assetID",
"temporal",
"outageTime",
"company",
"votes",
"zoomT",
"longitude",
"latitude"]
for key in key_array:
if key not in dequeued_item["payload"].keys():
return False
return True
def process_incoming_rabbit_mq(self):
    """
    Processes the Rabbit MQ bus messages and process the queue depending on the type.
    If the type is Query (or Save) then put it on the local queue for processing later.

    Acquires self.incoming_queue_lock around the whole drain; the lock is
    released on every exit path (malformed message, normal completion).
    'Test' and 'Clear' messages are currently dropped.
    """
    self.groomer_state = "3:PROCESS QUEUE"
    lock_counter = 0
    # Spin on a non-blocking acquire with short sleeps rather than blocking
    # forever; give up after ~100 attempts so this thread cannot deadlock here.
    while not self.incoming_queue_lock.acquire(False):
        self.my_local_logger.debug("Trying to acquire lock. Sleeping 0.05s.")
        time.sleep(g_config.SLEEP_TIME)
        lock_counter += 1
        if lock_counter > 100:
            self.my_local_logger.debug("Cant acquire incoming queue lock, returning")
            self.my_local_logger.error("Unable to acquire lock in process_incoming_queue, returning!")
            self.groomer_state = "4:PROCESS QUEUE LOCK ERROR"
            return
    while not self.incoming_rabbit_mq.empty():
        self.my_local_logger.debug(
            "Groomer says Incoming Rabbit MQ not empty, length is %d" % self.incoming_rabbit_mq.qsize())
        self.my_local_logger.debug("Acquired lock")
        # This is where the incoming grooming message is pulled off the Rabbit MQ.
        dequeued_item = self.incoming_rabbit_mq.get()
        if self.check_message_payload(dequeued_item):
            self.my_local_logger.info("A %s type message was dequeued " %
                                      dequeued_item['messageType'])
        else:
            # Malformed payload: release the lock and bail out of the drain.
            self.my_local_logger.error("Message payload is malformed in process_incoming_queue, returning")
            if self.incoming_queue_lock:
                self.incoming_queue_lock.release()
                self.my_local_logger.debug("GROOMER rabbit MQ lock was released")
                self.my_local_logger.info("The rabbit MQ lock was released")
            self.groomer_state = "5:PROCESS QUEUE MALFORMED"
            return
        # Determine what is queue command type is and dispatch it.
        if dequeued_item['messageType'] == 'Test':
            # This is a dummy Test which is dropped for now.
            pass
        elif dequeued_item['messageType'] == 'Clear':
            # Restore the previous results
            pass
        elif dequeued_item['messageType'] == 'Save':
            # Save the current groom (filter) settings and kick off a new Utility wide groom process
            # Grab the Query message type and stuff it in a local fifo queue
            self.my_local_logger.debug("Save type message received")
            self.my_local_logger.debug("query_guid = %s" % "None - missing on save")  # dequeued_item['queryGuid'])
            #######################################################
            # Collect interesting payload information here
            #######################################################
            # Save messages default to the utility-wide (longer) TTL.
            if "ttl" not in dequeued_item.keys():
                dequeued_item["ttl"] = g_config.TTL_UTILITY_SPAN
            self.local_q.append(dequeued_item)
            self.my_local_logger.debug("Message queued to the local incoming queue (len=%d)" % len(self.local_q))
            self.my_local_logger.info("Message queued to the local incoming queue (len=%d)" % len(self.local_q))
            pass
        elif dequeued_item['messageType'] == 'Query':
            # Grab the Query message type and stuff it in a local fifo queue
            self.my_local_logger.debug("Query type message received")
            self.my_local_logger.debug("query_guid = %s" % dequeued_item['queryGuid'])
            #######################################################
            # Collect interesting payload information here
            #######################################################
            # Query messages default to the standard maximum TTL.
            if "ttl" not in dequeued_item.keys():
                dequeued_item["ttl"] = g_config.TTL_MAX
            self.local_q.append(dequeued_item)
            self.my_local_logger.debug("Message queued to the local incoming queue (len=%d)" % len(self.local_q))
            self.my_local_logger.info("Message queued to the local incoming queue (len=%d)" % len(self.local_q))
        else:
            self.my_local_logger.error("incoming_rabbit_mq TYPE is a UNKNOWN")
    if self.incoming_queue_lock:
        self.incoming_queue_lock.release()
        self.my_local_logger.debug("GROOMER rabbit MQ lock was released")
        self.my_local_logger.info("The rabbit MQ lock was released")
    self.my_local_logger.debug("process_incoming_rabbit_mq finished")
    self.groomer_state = "0:IDLE"
def get_data_in_cell_area(self, cell_parameters, ttl):
    """
    Ask the EON API for onts, circuits and transformers for a given lat, lon and radius.
    Returns a group of items that are inside the circle with a given center (lat, lon) and
    radius.
    Note: convert the time units in the ONT event list into minutes by dividing by 60000
    :param cell_parameters: dict carrying at least "latitude", "longitude",
        "outageTime" and "radius" keys for the cell to query.
    :param ttl: The time to live, stored on the returned cell.
    :return: this_cell # A hexagonal cell dictionary
        this_cell = {'neighbors': [],  # the 6 nearest neighbor cells
                     'assets': {},     # The utility assets including their lat and lon and events
                     'onts': {},       # Verizon's ONTs including their lat and lon and events
                     'state': ''       # A string representing the state of this cell.
                                       # This is used for multi threading purposes so that neighboring cells can see
                                       # whats going on.
                     'circuits': {}    # This is a set of circuits in this cell. All assets on a circuit
                                       # are in the circuits list
                     'lat_lon': []     # The lat and lon array of the center of the cell
                     'radius': 1.00    # The radius of the circumscribed cell.
        ont_items is a dictionary of {'lat_lon':[],'assets':[],'events':[]}
        asset_items is a dictionary of {'lat_lon':[],'onts':[],'events':[]}
        circuit_items is a dictionary of {'connected_items', asset_item_key}
        where asset_item_key is a key entry in the asset_item dictionary
        events is an array of 2 sets of events. events[0] is the "fail_time" and events[1] is the "restore_time"
    A call to the API is done in a loop to gather all items; here is a test of the api call:
    The swagger test example is
        http://10.123.0.27:8080/eon360/api/query
    With a json payload of
        {
        "itemType":"ALL",
        "circle": {
            "unit": "MILES",
            "longitude": -73.8773389,
            "radius": 1.0,
            "latitude": 41.2693778
        },
        "pageParameter": {
            "page": 0,
            "size": 100
        }
        }
    This will return a data structure like this
        dd['eligibility']['dataItems']
        dd['alarm']['dataItems']
        dd['utility']['dataItems']
    """
    # query_guid = payload["query_guid"]
    this_lat = cell_parameters["latitude"]
    this_lon = cell_parameters["longitude"]
    # utility = cell_parameters["company"]
    groom_time = cell_parameters["outageTime"]
    # circuit_id = cell_parameters["circuitID"]
    # asset_id = cell_parameters["assetID"]
    # votes = cell_parameters["votes"]
    # spatial = cell_parameters["spatial"]
    # temporal = cell_parameters["temporal"]
    # reputation_ena = cell_parameters["reputationEnabled"]
    # zoom_t = cell_parameters["zoomT"]
    # zoom_r = cell_parameters["zoomR"]
    this_radius = cell_parameters["radius"]
    # units = cell_parameters["units"]
    query_type = "ALL"
    ont_serial_number_set = set()
    ont_items = {}
    asset_serial_number_set = set()
    asset_items = {}
    circuit_serial_number_set = set()
    circuit_items = {}
    # The six neighbor cells are initially set to be empty
    # This a string quid and an angle (in degrees)
    neighbor_array = [["", 0], ["", 60], ["", 120], ["", 180], ["", 240], ["", 300]]
    this_cell = {'neighbors': neighbor_array,
                 'assets': {},
                 'onts': {},
                 'circuits': {},
                 'state': 'creating',
                 'lat_lon': [this_lat, this_lon],
                 'radius': this_radius,
                 'groom_time': groom_time,
                 'ttl': 0
                 }
    page_number = 0
    page_size = 20
    query_parameter = json.dumps({"itemType": query_type,
                                  "circle": {"longitude": this_lon,
                                             "latitude": this_lat,
                                             "radius": this_radius, "unit": g_config.RADIUS_UNITS},
                                  "pageParameter": {"page": page_number, "size": page_size}})
    self.my_local_logger.debug("Formed query parameter: %s" % query_parameter)
    dd = self.eon_api_bridge.query_post_eon_data_30(query_parameter=query_parameter)
    more_pages = True
    # Loop here until no more utility components of the first collection are found
    while more_pages and dd is not None:
        # This is the ONTs loop through them and find all the ONTs in the area
        for this_ont in dd['eligibility']['dataItems']:
            ont_dictionary_keyword = this_ont['ontSerialNumber']
            ont_serial_number_set.add(ont_dictionary_keyword)
            # NOTE(review): "[PENDING INSTALL]" is added to the serial set above
            # but never given an ont_items entry; a later alarm for that serial
            # would hit the events-update below with a KeyError -- confirm.
            if ont_dictionary_keyword == "[PENDING INSTALL]":
                self.my_local_logger.debug("skipping this ont in eligibility list")
                continue
            ont_items[ont_dictionary_keyword] = {'lat_lon': [this_ont['latitude'], this_ont['longitude']]}
            alarm_set_time = set()
            alarm_clear_time = set()
            ont_items[ont_dictionary_keyword]['events'] = [alarm_set_time, alarm_clear_time]
            ont_items[ont_dictionary_keyword]['assets'] = set()
        for this_alarm in dd['alarm']['dataItems']:
            alarm_dictionary_keyword = this_alarm['ontSerialNumber']
            if alarm_dictionary_keyword not in ont_serial_number_set:
                if alarm_dictionary_keyword == "[PENDING INSTALL]":
                    self.my_local_logger.debug("skipping this ONT in the alarm list")
                    continue
                ont_serial_number_set.add(alarm_dictionary_keyword)
                ont_items[alarm_dictionary_keyword] = {'lat_lon': [this_alarm['latitude'], this_alarm['longitude']]}
                alarm_set_time = set()
                alarm_clear_time = set()
                ont_items[alarm_dictionary_keyword]['events'] = [alarm_set_time, alarm_clear_time]
                ont_items[alarm_dictionary_keyword]['assets'] = set()
            # events[0] collects alarm-set (power fail) times, events[1] the
            # alarm-clear (restore) times; values stay in epoch milliseconds.
            if this_alarm['alarmReceiveTime']:
                alarm_set = float(this_alarm['alarmReceiveTime'])  # * 1e-3) / 60
                ont_items[alarm_dictionary_keyword]['events'][0].add(alarm_set)
            if this_alarm['alarmClearTime']:
                alarm_clear = float(this_alarm['alarmClearTime'])  # * 1e-3) / 60
                ont_items[alarm_dictionary_keyword]['events'][1].add(alarm_clear)
        # Now go through the assets and associate the assets to the ONTs and the ONTs to the assets
        for this_item in dd['utility']['dataItems']:
            asset_dictionary_keyword = this_item['transformerID']
            if asset_dictionary_keyword not in asset_serial_number_set:
                asset_serial_number_set.add(asset_dictionary_keyword)
                asset_items[asset_dictionary_keyword] = {'lat_lon': [this_item['latitude'], this_item['longitude']]}
                asset_items[asset_dictionary_keyword]['events'] = [set(), set()]
                asset_items[asset_dictionary_keyword]['onts'] = set()
                asset_items[asset_dictionary_keyword]['guid'] = this_item['guid']
                asset_items[asset_dictionary_keyword]['serviceAddress'] = this_item['serviceAddress']
            for this_ont in this_item['eligibilityList']:
                ont_dictionary_keyword = this_ont['ontSerialNumber']
                if ont_dictionary_keyword not in ont_serial_number_set:
                    ont_serial_number_set.add(ont_dictionary_keyword)
                    ont_items[ont_dictionary_keyword] = {
                        'lat_lon': [this_ont['latitude'], this_ont['longitude']]}
                    alarm_set_time = set()
                    alarm_clear_time = set()
                    ont_items[ont_dictionary_keyword]['events'] = [alarm_set_time, alarm_clear_time]
                    ont_items[ont_dictionary_keyword]['assets'] = set()
                # Skip the ONTs that don't have an installation.
                if ont_dictionary_keyword == "[PENDING INSTALL]":
                    self.my_local_logger.debug("skipping the ONT listed on eligibility list in asset_id=%s" %
                                               asset_dictionary_keyword)
                    self.my_local_logger.info("Skipping %s because it's status is PENDING INSTALL" %
                                              asset_dictionary_keyword)
                    continue
                # Stitch up the assets in the onts
                ont_items[ont_dictionary_keyword]['assets'].add(asset_dictionary_keyword)
                # Stitch up the onts in the assets
                asset_items[asset_dictionary_keyword]['onts'].add(ont_dictionary_keyword)
            circuit_dictionary_keyword = this_item['circuitID']
            if circuit_dictionary_keyword not in circuit_serial_number_set:
                # add the circuit item to the circuit_serial_number_set is needed
                circuit_serial_number_set.add(circuit_dictionary_keyword)
                # and create an empty set
                circuit_items[circuit_dictionary_keyword] = {'connected_items': set()}
            # Now add the data structure to the set
            circuit_items[circuit_dictionary_keyword]['connected_items'].add(asset_dictionary_keyword)
        ###########################
        #  Look for the next page #
        ###########################
        # A full page in any of the three collections implies there may be
        # another page; re-issue the query with the next page number.
        if (dd['utility']['pageTotalItems'] == page_size) or \
                (dd['alarm']['pageTotalItems'] == page_size) or \
                (dd['eligibility']['pageTotalItems'] == page_size):
            self.my_local_logger.debug("Collecting next page for this message")
            page_number += 1
            more_pages = True
            query_parameter = json.dumps({"itemType": query_type,
                                          "circle": {"longitude": this_lon,
                                                     "latitude": this_lat,
                                                     "radius": this_radius,
                                                     "unit": g_config.RADIUS_UNITS},
                                          "pageParameter": {"page": page_number, "size": page_size}})
            dd = self.eon_api_bridge.query_post_eon_data_30(query_parameter=query_parameter)
        else:
            more_pages = False
    this_cell['assets'] = asset_items
    # Go over the ONT set and see it there are any that don't have alarms. This might happen if there were no alarms
    # posted to this ONT because the main alarm injestion loop failed for some reason. There will still be alarms
    # that are posted on the ONTs and those can be recovered here.
    for this_ont in ont_items:
        if len(ont_items[this_ont]['events'][0]) == 0 or len(ont_items[this_ont]['events'][1]) == 0:
            # To find any ONTs that don't seem to have alarms make this call:
            # where ONT_SERIAL_NUMBER is 00ABB96 in this example.
            # http://10.123.0.27:8080/eon360/api/alarms?sortBy=alarmReceiveTime&ontSerialNumber=000ABB96&p=0&s=20
            dd = self.eon_api_bridge.alarm_get_pons_nms_00(ont_serial_number=this_ont)
            if dd:
                if 'alarms' in dd.keys():
                    for this_alarm in dd['alarms']:
                        if this_alarm['alarmReceiveTime']:
                            alarm_set = float(this_alarm['alarmReceiveTime'])  # * 1e-3) / 60
                            ont_items[this_ont]['events'][0].add(alarm_set)
                            self.my_local_logger.info("Adding an AlarmReceiveTime to the data")
                        if this_alarm['alarmClearTime']:
                            alarm_clear = float(this_alarm['alarmClearTime'])  # * 1e-3) / 60
                            ont_items[this_ont]['events'][1].add(alarm_clear)
                else:
                    self.my_local_logger.warning("No alarms found in call to alarm_get_pons_nms_00(ont_serial_number=%s)" % this_ont)
            else:
                self.my_local_logger.warning("Nothing returned from the API call")
    this_cell['onts'] = ont_items
    this_cell['circuits'] = circuit_items
    this_cell['state'] = 'populated'
    this_cell['ttl'] = ttl
    self.my_local_logger.info("This CELL (radius= %3.3f %s @ lat=%f, lon=%f) has %d circuits, %d assets and %d onts." %
                              (this_radius, g_config.RADIUS_UNITS, this_lat, this_lon,
                               len(circuit_items), len(asset_items), len(ont_items))
                              )
    # Note convert the time units into minutes by dividing by 60000
    return this_cell
@staticmethod
def persist_cell_pickle(cell, filename=""):
    """
    Serialize a cell dictionary to a pickle file in the configured pickle
    directory (g_config.BASE_DIR/g_config.PICKLES).

    :param cell: the cell structure that is persisted to disk; must carry
        a 'lat_lon' [lat, lon] entry when no filename is supplied.
    :param filename: optional base file name (without extension). When
        empty, a name of the form cell_<lat>_<lon> is derived from the
        cell center, with the decimal point replaced by 'p' for positive
        values and 'm' for negative ones.
    :return: None
    """
    def _coord_str(value):
        # Format |value| to two decimals; encode the sign in the decimal
        # marker ('m' = minus/negative, 'p' = plus/positive).
        marker = 'm' if value < 0 else 'p'
        return ("%03.2f" % (float(round(abs(value) * 100)) / 100.0)).replace('.', marker)

    if filename == "":
        filename = 'cell_' + _coord_str(cell['lat_lon'][0]) + '_' + _coord_str(cell['lat_lon'][1])
    filename += '.pck'
    full_path = g_config.BASE_DIR + os.sep + g_config.PICKLES + os.sep + filename
    # Binary mode: pickle streams are byte-oriented, and text mode can
    # corrupt them on platforms that translate newlines.
    with open(full_path, "wb") as f:
        pickle.dump(cell, f)
@staticmethod
def un_persist_cell_pickle(this_lat, this_lon):
    """
    Load a previously persisted cell pickle for the given cell center.

    Reads from the same directory persist_cell_pickle() writes to
    (g_config.BASE_DIR/g_config.PICKLES) and uses the same file-name
    convention (decimal point replaced by 'm' for negative coordinates,
    'p' for positive ones).

    :param this_lat: latitude of the cell center.
    :param this_lon: longitude of the cell center.
    :return: the unpickled cell dictionary.
    """
    def _coord_str(value):
        # Same encoding as persist_cell_pickle: sign folded into the
        # decimal marker ('m' = minus/negative, 'p' = plus/positive).
        marker = 'm' if value < 0 else 'p'
        return ("%03.2f" % (float(round(abs(value) * 100)) / 100.0)).replace('.', marker)

    filename = 'cell_' + _coord_str(this_lat) + '_' + _coord_str(this_lon) + '.pck'
    full_path = g_config.BASE_DIR + os.sep + g_config.PICKLES + os.sep + filename
    # Bug fixes: the original called pickle.load(open(f)) on an already-open
    # file object (a TypeError at runtime), and looked in the current working
    # directory rather than the pickle directory persist_cell_pickle uses.
    with open(full_path, "rb") as f:
        cell = pickle.load(f)
    return cell
def temporal_filter(self, cell):
    """
    :param cell:
    This method does the filter model of the ont and returns a filtered outage based on the
    alarm_condition (a value between 0 and 1).
    Start with the alarm_condition = 0 which is no alarm (These are alarm_conditions for ALARMs).
    This is how the EPOCH number can be converted back and forth to a date.
    In this context ON means power is ON, OFF means power is off.
    t is in milliseconds. To convert to minutes divide by 1000 and by 60.

    For every ONT in the cell this builds a power-state timeline from its
    alarm set/clear events, deglitches short OFF intervals, resamples the
    timeline every 5 minutes around the groom time, and records a
    'temporal_filter' dict ({'reliability', 'event_durations',
    'event_times', 'time_domain_vector'}) on the ONT. Also sets
    self.cell_time_event True when any ONT shows a credible outage
    (reliability > 0.8).
    :return: the same cell, with per-ONT 'temporal_filter' entries added.
    """
    self.cell_time_event = False
    for this_ont in cell['onts']:
        # Seed the timeline with power ON at the engine epoch so interp has
        # a defined left edge.
        event_vector = {'t': [int64(g_config.ENGINE_BEGIN_TIME)], 'a': [ON]}
        # events[ON] holds alarm-clear (restore) times, events[OFF] holds
        # alarm-set (fail) times -- see get_data_in_cell_area.
        on_times = cell['onts'][this_ont]['events'][ON]
        off_times = cell['onts'][this_ont]['events'][OFF]
        if len(on_times) > 0:
            for this_alarm in on_times:
                event_vector['t'].append(this_alarm)
                event_vector['a'].append(ON)
        if len(off_times) > 0:
            for this_alarm in off_times:
                event_vector['t'].append(this_alarm)
                event_vector['a'].append(OFF)
        # At this point we have a temporal vector of event for this ONT.
        time_vector = array(event_vector['t'])
        ind = argsort(time_vector)
        power_state = array(event_vector['a'])[ind]
        t = time_vector[ind]
        # At this point the sorted time and alarm vectors are ready
        # tw = t[t > t[-1] - config.ALARM_DETECT_WINDOW * 1000]
        # aw = a[t > t[-1] - config.ALARM_DETECT_WINDOW * 1000]
        # Deglitch the vectors now
        # To deglitch the time vector take all the values that at ON and extend them by 5 minutes then
        # and add (or) them back to the time vector
        # time_of_alarm_condition = tw[-1] # The last time vector point (the sorted value)
        # alarm_condition = aw[-1]
        time_count = len(t)
        deglitched_power_state = copy.copy(power_state)
        # see for example http://pandas.pydata.org/pandas-docs/stable/timeseries.html
        # An OFF immediately followed by an ON within DEGLITCH_TIME ms is
        # treated as a glitch and flattened to ON.
        for i in range(time_count - 1):
            if power_state[i] == OFF and power_state[i + 1] == ON:
                if t[i + 1] < t[i] + g_config.DEGLITCH_TIME:
                    self.my_local_logger.debug(
                        "Deglitched the power at %s" % (pandas.to_datetime(t[i], unit='ms')))
                    deglitched_power_state[i] = ON
                else:
                    self.my_local_logger.debug("off time is %f min (%f hours) (days %f)" % (
                        (t[i + 1] - t[i]) / 1000 / 60, (t[i + 1] - t[i]) / 1000 / 60 / 60,
                        (t[i + 1] - t[i]) / 1000 / 60 / 60 / 24))
        # Expand each interval into a step waveform (two points per segment)
        # so linear interpolation reproduces the square power profile.
        power_state_array = []
        time_array = []
        for i in range(time_count-1):
            time_array.append(t[i])
            time_array.append(t[i+1] - g_config.MS_TIME_RESOLUTION)  # something around 5 seconds
            power_state_array.append(deglitched_power_state[i])
            power_state_array.append(deglitched_power_state[i])
            if deglitched_power_state[i] == ON:
                self.my_local_logger.debug("power on at %s" % (pandas.to_datetime(t[i], unit='ms')))
            if deglitched_power_state[i] == OFF:
                self.my_local_logger.debug("power off at %s" % (pandas.to_datetime(t[i], unit='ms')))
        time_array.append(t[-1])
        power_state_array.append(deglitched_power_state[-1])
        sample_time = cell['groom_time']
        if sample_time > t[-1]:
            self.my_local_logger.debug(
                "sample time is after the end of time in the time event list, using interpolated value")
            # Hold the last known state out to the groom time.
            time_array.append(sample_time - g_config.MS_TIME_RESOLUTION)
            power_state_array.append(deglitched_power_state[-1])
        time_array_sec = [round(x / 1000) for x in time_array]
        # time_domain_vector = [time_array, power_state_array] # column_stack((time_array,power_state_array))
        # Calculate a +/- 1 week interval every 5 minutes from the groom time unless the groom time is the same as
        # the current time then the last 30 minutes are used to compute the time vector.
        # This is done to allow the real time groomer to run a bit faster than the interactive groomer during the
        # interp call.
        # The arrow library produces timestamp values in seconds.
        # NOTE(review): arrow's replace(weeks=-1) relative-offset form is
        # deprecated in newer arrow releases in favor of shift() -- confirm
        # the pinned arrow version before upgrading.
        current_time = arrow.utcnow().to('US/Eastern')
        a_week_ago = current_time.replace(weeks=-1)
        sample_time_arrow = arrow.get(sample_time/1000)
        if sample_time_arrow.timestamp < a_week_ago.timestamp:
            # This is a grooming operation that fits in the 2 week span of time.
            start_time = sample_time_arrow.replace(weeks=-1)
            stop_time = sample_time_arrow.replace(weeks=1)
        else:
            start_time = sample_time_arrow.replace(weeks=-2)
            stop_time = sample_time_arrow
        # The time vector will be in seconds
        # One minute = 60
        # One hour = 60*60
        # One day = 24*60*60
        # One week = 7*24*60*60
        # Five minute intervals are 5*60
        delta_time = 5*60  # This is the sample interval of the time vector (Every 5 minutes)
        number_of_points = (stop_time.timestamp - start_time.timestamp) / delta_time
        sample_time_array = linspace(start_time.timestamp, stop_time.timestamp, number_of_points)
        sample_power_array = interp(sample_time_array, time_array_sec, power_state_array)
        time_domain_vector = [sample_time_array, sample_power_array]
        # Fraction of samples with power ON over the sampled window.
        reliability = sum(sample_power_array)/len(sample_power_array)
        event_durations = []
        event_times = []
        # A flat trace (no transitions) is not worth plotting.
        if sample_power_array.min() == sample_power_array.max():
            self.SHOW_PLOTS = False
        else:
            self.SHOW_PLOTS = True
        if self.SHOW_PLOTS:
            if not g_config.IS_DEPLOYED:
                print "Reliability = %4.4f" % reliability
        if reliability > 0.8:
            self.cell_time_event = True
            if not g_config.IS_DEPLOYED:
                try:
                    import matplotlib.pyplot as plt
                    # plt.plot(time_array, power_state_array, 'o')
                    plt.plot(sample_time_array, sample_power_array, '-x')
                    plt.show(block=False)
                except:
                    print "Something went wrong with the matplotlib command, skipping!"
            # Only a window that starts and ends powered-on yields matched
            # off/on edge pairs when diffed.
            if (sample_power_array[0] > 0) and (sample_power_array[-1] > 0):
                if not g_config.IS_DEPLOYED:
                    print "Diff the time vector to find the on and off times."
                diff_sample_power_array = diff(sample_power_array)
                index_on = diff_sample_power_array > 0
                on_times = sample_time_array[index_on]
                index_off = diff_sample_power_array < 0
                off_times = sample_time_array[index_off]
                if len(on_times) == len(off_times):
                    for k, t_off in enumerate(off_times):
                        # The power will be off from the time it turns minus the time it turned off.
                        power_fail_event_duration = on_times[k] - t_off
                        if not g_config.IS_DEPLOYED:
                            print "power fail event duration = %f" % power_fail_event_duration
                        event_durations.append(power_fail_event_duration)
                        event_times.append(t_off)
                        if not g_config.IS_DEPLOYED:
                            print "Found a %10.2f minute outage on %s" % (
                                (power_fail_event_duration/60),
                                arrow.get(t_off).format("MMMM DD, YYYY @ hh:mm A")
                            )
                else:
                    self.my_local_logger.info('Power event edges are mismatched, skipping this: ')
            else:
                self.my_local_logger.info('Power event edges in the window are mismatched, skipping this: ')
        else:
            self.my_local_logger.info('Power event outage has low reliability, skipping this: ')
        self.my_local_logger.info('temporal data for cell has %d points from %s to %s' % (
            number_of_points, start_time, stop_time))
        cell['onts'][this_ont]['temporal_filter'] = {'reliability': reliability,
                                                     'event_durations': event_durations,
                                                     'event_times': event_times,
                                                     'time_domain_vector': time_domain_vector}
    return cell
def spatial_filter(self, cell):
"""
The spatial filter does a filtering of the ont collection based on the asset called this_asset.
:param cell:
A cell that contains of onts along with their locations and states.
The onts values must have been filtered temporally first.
:return:
"""
if self.cell_time_event:
# Only append outages on assets for the cells that have events
if not g_config.IS_DEPLOYED:
print "An interesting time event has occurred in this cell..."
for this_ont in cell['onts']:
event_durations = cell['onts'][this_ont]['temporal_filter']['event_durations']
event_times = cell['onts'][this_ont]['temporal_filter']['event_times']
if not g_config.IS_DEPLOYED:
if this_ont == "0016FE13":
print "found an event"
for this_asset in cell['onts'][this_ont]['assets']:
if not g_config.IS_DEPLOYED:
if this_asset == "TR1000489404_108":
print "found a matching asset"
try:
event_activities = cell['assets'][this_asset]['spatial_filter']
except KeyError:
event_activities = {'distance': [], 'events': []}
if len(event_durations) > 0:
ont_lat = cell['onts'][this_ont]['lat_lon'][0]
ont_lon = cell['onts'][this_ont]['lat_lon'][1]
lat_lon = cell['assets'][this_asset]['lat_lon']
asset_lat = lat_lon[0]
asset_lon = lat_lon[1]
this_distance = lat_lon_distance(asset_lat, asset_lon, ont_lat, ont_lon, units='mi')
event_activities['distance'].append(this_distance)
event_activities['events'].append(
{'event_durations': event_durations, 'event_times': event_times}
)
cell['assets'][this_asset]['spatial_filter'] = event_activities
if not g_config.IS_DEPLOYED:
print " ...done with interesting cell."
return cell
def vote_on_assets(self, cell, temporal_data, spatial_data, voting_data):
"""
:param cell:
:param voting_data: an integer that is the number of votes to use
:return:
"""
try:
this_filter = json.loads(spatial_data)
total_counts = len(this_filter['r'])
weights = []
for i in range(total_counts):
weights.append(this_filter['r'][i])
except TypeError as e:
self.my_local_logger.error('Spatial data has a Type Error: %s, %s' % (spatial_data, e))
except ValueError as e:
self.my_local_logger.error('Spatial data has a ValueError: %s, %s' % (spatial_data, e))
self.my_local_logger.info('spatial data = %s', spatial_data)
self.my_local_logger.info('temporal data = %s', temporal_data)
if voting_data:
try:
number_of_votes = int(voting_data)
except ValueError as e:
self.my_local_logger.error('Voting data has en error in the passed value %s' % e)
number_of_votes = 1
except TypeError as e:
self.my_local_logger.error('Voting data is not a string %s' % e)
number_of_votes = 1
else:
number_of_votes = 1
self.my_local_logger.info('Number of votes passed: %d' % number_of_votes)
for this_asset in cell['assets']:
cell['assets'][this_asset]['outage_events'] = None
try:
# these_distances = cell['assets'][this_asset]['spatial_filter']['distance']
these_events = cell['assets'][this_asset]['spatial_filter']['events']
except KeyError:
# print "No outages on this asset"
continue
if len(these_events) > 0:
if len(these_events) >= 1: # number_of_votes:
# This is where the filter will take place.
# These events is an array.
# I must iterate over an array of these event items
try:
outage_events = cell['assets'][this_asset]['outage_events']
except KeyError:
outage_events = {'event_durations': [], 'event_times': []}
if outage_events is None:
outage_events = {'event_durations': [], 'event_times': []}
for this_event_dict in these_events:
for j, this_event in enumerate(this_event_dict['event_durations']):
outage_events['event_durations'].append(this_event)
outage_events['event_times'].append(this_event_dict['event_times'][j])
cell['assets'][this_asset]['outage_events'] = outage_events
return cell
def post_outage_on_asset(self, cell, payload):
"""
:param cell:
:param payload: this will be of the form
http://10.123.0.27:8080/eon360/api/utilities?p=0&s=20
"eonUtilityEntries": [
{
"id": "5508dacee4b0df5309df591e",
"version": 0,
#######################
## ADD THIS GUID
"guid": "46f7655c-9160-4c08-b272-59c32232ba9f",
#######################
"company": "CEDRAFT",
"serviceAddress": "{\"CE Map ID\": \"None\",
\"Municipality\": \"New Castle\",
\"Provenance\":\"Report A\",
\"Attached Assets\": [],
\"Next Hop\": \"PS302355612\",
\"Type\": \"HOUSE\",
\"Downstream\": \"None\",
\"Transformer Supply\": [\"TR302355616_T4\"],
\"Upstream\":\"PS302355612\",
\"Connections\": [],
\"Address\":\"10 VALLEY VIEW RD, Chappaqua NY, 10514-2532\",
\"Utility ID\": \"None\"}",
"errorCode": "0",
"circuitID": "10U2",
"transformerID": "HS01c902165608e5f12ce4c01c78c70415",
"eligibilityList": [
{
"id": "54a079aae4b040db636a2d95",
"version": 0,
"guid": "23697667-4810-4169-8802-46ad6efae3a3",
"company": "",
"ontSerialNumber": "59054969",
"errorCode": "0.91",
"alarmID": "CHPQNYCPOL1*LET-3*11*1*1",
"ontAddress": "8 Brookside Cir,Chappaqua,NY,10514",
"modelCoefficients": null,
"longitude": f-73.787811,
"latitude": 41.175064,
"createdAtTimestamp": 1419803050366,
"lastModifiedAtTimestamp": 1419803050366
},
"payload": {
"company": "CEDRAFT",
"outageTime": 1430452800000,
"longitude": lon,
"latitude": lat,
"circuitID": "",
"assetID": "",
"votes": 3,
"spatial": '{"r":[1,1]}',
"temporal": "[1,0; .8,24; .3, 60]",
"reputationEnabled": True,
"zoomT": 1,
"zoomR": 1,
"radius": 0.12,
"units": "MI"
},
The post must be of the form
{
"eventDuration": "long",
"guid": "",
"id": "",
"utility": {
"assetType": "",
"circuitID": "",
"company": "",
"outageID": "",
"transformerID": ""
},
"timeOfEvent": "Date",
"company": "",
"longitude": 0,
"internalUtilityGuid": "",
"latitude": 0,
"algorithm": "",
"version": "long"
}
:return:
"""
# circuit_id = ""
# First loop over all circuits:
try:
for this_circuit in cell['circuits']:
# Now loop over all the items on that circuit
for this_asset in cell['circuits'][this_circuit]['connected_items']:
asset_item = cell['assets'][this_asset]
outages = asset_item['outage_events']
# This is the form of an event (If there is one!)
# It will be None if there are no events otherwise it will be:
# 'event_durations': copy.deepcopy(these_events['event_durations']),
# 'event_times': copy.deepcopy(these_events['event_times'])
if outages:
self.my_local_logger.info('Examining circuit=%s, asset=%s. which has %d outages to post!' % (this_circuit, this_asset, len(outages)))
if this_asset[0:2] == "TR":
asset_type = "TRANSFORMER"
elif this_asset[0:2] == "HS":
asset_type = "HOUSE"
elif this_asset[0:2] == "PS":
asset_type = "POLE, SECONDARY"
elif this_asset[0:2] == "PP":
asset_type = "POLE, PRIMARY"
else:
asset_type = "OTHER"
for i, this_event_duration in enumerate(outages['event_durations']):
address_string = cell['assets'][this_asset]['serviceAddress']
self.my_local_logger.info("address_string = %s" % address_string)
address_string_pairs = json.loads(address_string)
this_address = ''
if "Municipality" in address_string_pairs.keys():
this_address += 'Municipality:' + address_string_pairs['Municipality'] + '|'
if "Address" in address_string_pairs.keys():
this_address += 'Address:' + address_string_pairs['Address'] + '|'
# Here's how to include the CE Map ID and the Utility ID if needed
# this_address += 'CE MapID:' + this_asset.split('_')[1] + '|'
# this_address += 'UtilityID:' + this_asset.split('_')[0][2:]
if this_address[-1] == '|':
this_address = this_address[:-1]
utility_document = {
"internalUtilityGuid": asset_item['guid'],
"eventDuration": int(round(this_event_duration * 1000)),
# "guid": "guid-here",
# "id": 'id-here',
"utility": {
"assetType": asset_type,
"circuitID": this_circuit,
"company": payload["company"],
"outageID": 'outage-id-here',
"transformerID": this_asset,
"address": this_address
},
"timeOfEvent": int(round(outages['event_times'][i] * 1000)),
# "longitude": asset_item['lat_lon'][1],
# "latitude": asset_item['lat_lon'][0],
"algorithm": "NEAR10"
# "version": 0
}
if not g_config.IS_DEPLOYED:
print "Posting a %10.2f minute outage on %s, circuit: %s, asset_id: %s" % (
(utility_document['eventDuration'] / 1000 / 60),
arrow.get(utility_document['timeOfEvent'] / 1000).format("MMMM DD, YYYY @ hh:mm A"),
utility_document['utility']['circuitID'],
utility_document['utility']['transformerID']
)
self.my_local_logger.info('Posting: %s' % json.dumps(utility_document))
self.eon_api_bridge.groomed_outages_post_20(utility_document)
else:
if not g_config.IS_DEPLOYED:
print "Nothing to post for circuit: %s, asset_id: %s" % (
this_circuit,
this_asset
)
except:
self.my_local_logger.error('Posting outage error')
def build_in_memory_cell_db(self, cell):
    """
    Build self.asset_dictionary, an in-memory index of ONT events per asset.

    :param cell: a cell of data that represents the collection of onts,
        assets and circuits along with the alarms:
        this_cell = {'neighbors': [],  # the 6 nearest neighbors
                     'assets': {},     # utility assets with lat/lon and events
                     'onts': {},       # Verizon's ONTs with lat/lon and events
                     'state': '',      # cell state string, used for
                                       # multi-threading so neighboring cells
                                       # can see what's going on
                     'circuits': {},   # circuits in this cell; all assets on a
                                       # circuit are in the circuits list
                     'lat_lon': [],    # center of the cell
                     'radius': 1.00}   # radius of the circumscribed cell
        ont_items is a dictionary of {'lat_lon': [], 'assets': [], 'events': []}
        asset_items is a dictionary of {'lat_lon': [], 'onts': [], 'events': []}

    Result layout (stored on self.asset_dictionary):
        {'groom_time': ...,
         <asset_id>: SortedDict{<event_sec>: {'state': 0|1,
                                              'voters': SortedDict{distance: ont_id}}}}
    :return: None
    """
    asset_dict = {'groom_time': cell['groom_time']}
    for this_asset in cell['assets']:
        asset_dict[this_asset] = SortedDict()
        for this_ont in cell['assets'][this_asset]['onts']:
            this_distance = lat_lon_distance(cell['assets'][this_asset]['lat_lon'][0],
                                             cell['assets'][this_asset]['lat_lon'][1],
                                             cell['onts'][this_ont]['lat_lon'][0],
                                             cell['onts'][this_ont]['lat_lon'][1])
            # events[0] holds state-0 events and events[1] state-1 events; the
            # original two loop bodies were identical apart from the state
            # flag, so they are folded into one loop over both states.
            for state in (0, 1):
                for this_event in cell['onts'][this_ont]['events'][state]:
                    event_key = int(this_event / 1000)  # ms -> whole seconds
                    if event_key in asset_dict[this_asset]:
                        # Existing entry keeps its original state; this ONT
                        # just joins the voter list, keyed by its distance.
                        asset_dict[this_asset][event_key]['voters'].update({this_distance: this_ont})
                    else:
                        voters = SortedDict()
                        voters.update({this_distance: this_ont})
                        asset_dict[this_asset].update({event_key: {'state': state, 'voters': voters}})
                    # self.my_local_logger.debug("%d,%d,%s,%s,%f" % (event_key, state, this_ont, this_asset, this_distance))
    self.asset_dictionary = asset_dict
    self.my_local_logger.debug("done with build_in_memory_cell_db")
@staticmethod
def compute_cell_guid(payload, resolution):
"""
Computes a GUID based on the lat lon and time value
"""
# query_guid = payload["query_guid"]
this_lat = payload["latitude"]
this_lon = payload["longitude"]
# utility = payload["company"]
outage_test_time = payload["outageTime"]
# circuit_id = payload["circuitID"]
# asset_id = payload["assetID"]
# votes = payload["votes"]
# spatial = payload["spatial"]
# temporal = payload["temporal"]
# reputation_ena = payload["reputationEnabled"]
# zoom_t = payload["zoomT"]
# zoom_r = payload["zoomR"]
# radius = payload["radius"]
# units = payload["units"]
# The number of decimal points in the lat and lon gridify the guid
fmt_str = "%%4.%df_%%4.%df_%%d" % (resolution, resolution)
this_guid = fmt_str % (this_lat, this_lon, outage_test_time)
cell_guid = this_guid.replace(".", "p").replace("-", "m")
timestamp_guid = "%d" % outage_test_time
return cell_guid, timestamp_guid
def save_cell_in_shared_mem(self, this_cell_guid, cell):
    """
    Record a groomed cell in the process-wide shared data under its GUID.

    Spin-waits on the shared-data lock (non-blocking acquire + sleep so the
    wait can be logged, matching get_shared_data), then registers the GUID in
    'cell_collection_set' and stores the cell in 'cell_collection_dict'.

    :param this_cell_guid: identifier-safe cell GUID (see compute_cell_guid).
    :param cell: the cell structure to store.
    :return: None
    """
    while not self.shared_data_lock.acquire(False):
        self.my_local_logger.info('Waiting to acquire lock for shared data.')
        time.sleep(g_config.SLEEP_TIME)
    try:
        # BUG FIX: the updates were not guarded, so any exception here (e.g.
        # a missing key in shared_data) left the lock held forever and
        # deadlocked every other spin-waiter; try/finally guarantees release.
        self.shared_data['cell_collection_set'].add(this_cell_guid)
        self.shared_data['cell_collection_dict'][this_cell_guid] = cell
    finally:
        self.shared_data_lock.release()
def get_shared_data(self, query_type="all", dict_key=None):
my_shared_data = None
if query_type == "all":
while not self.shared_data_lock.acquire(False):
self.my_local_logger.info('groom_outages: waiting to acquire lock for shared data.')
time.sleep(g_config.SLEEP_TIME)
my_shared_data = copy.copy(self.shared_data)
self.shared_data_lock.release()
elif query_type == "cell_collection_dict":
while not self.shared_data_lock.acquire(False):
self.my_local_logger.info('groom_outages: waiting to acquire lock for shared data.')
time.sleep(g_config.SLEEP_TIME)
if dict_key is not None:
my_shared_data = copy.copy(self.shared_data['cell_collection_dict'][dict_key])
else:
my_shared_data = copy.copy(self.shared_data['cell_collection_dict'])
self.shared_data_lock.release()