-
Notifications
You must be signed in to change notification settings - Fork 867
/
base.py
145 lines (105 loc) · 3.94 KB
/
base.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# Copyright 2014-2015 Numenta Inc.
#
# Copyright may exist in Contributors' modifications
# and/or contributions to the work.
#
# Use of this source code is governed by the MIT
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.
import abc
import os
import pandas
import sys
from datetime import datetime
from nab.util import createPath, getProbationPeriod
class AnomalyDetector(object, metaclass=abc.ABCMeta):
  """
  Base class for all anomaly detectors. When inheriting from this class please
  take note of which methods MUST be overridden, as documented below.
  """

  def __init__(self,
               dataSet,
               probationaryPercent):
    """
    @param dataSet (object) Data set wrapper whose ``data`` attribute is a
                   pandas DataFrame containing (at least) a "value" column.
    @param probationaryPercent Forwarded, together with the number of rows in
                   dataSet.data, to getProbationPeriod() to compute
                   self.probationaryPeriod.
    """
    self.dataSet = dataSet

    self.probationaryPeriod = getProbationPeriod(
      probationaryPercent, dataSet.data.shape[0])

    # Range of the input signal over the whole file. Not used by this base
    # class itself; exposed for subclasses.
    self.inputMin = self.dataSet.data["value"].min()
    self.inputMax = self.dataSet.data["value"].max()


  def initialize(self):
    """Do anything needed to initialize your detector before calling run.

    Pooling across cores forces a pickling operation when moving objects from
    the main core to the pool and this may not always be possible. This function
    allows you to create objects within the pool itself to avoid this issue.
    """
    pass


  def getAdditionalHeaders(self):
    """
    Returns a list of strings. Subclasses can add in additional columns per
    record.

    This method MAY be overridden to provide the names for those
    columns.
    """
    return []


  @abc.abstractmethod
  def handleRecord(self, inputData):
    """
    Returns a list [anomalyScore, *]. It is required that the first
    element of the list is the anomalyScore. The other elements may
    be anything, but should correspond to the names returned by
    getAdditionalHeaders().

    This method MUST be overridden by subclasses

    @param inputData (dict) One row of dataSet.data, as produced by
                     pandas.Series.to_dict() (column name -> value).
    """
    raise NotImplementedError


  def getHeader(self):
    """
    Returns the column names of the DataFrame produced by run():
    "timestamp", "value", "anomaly_score", followed by the names returned by
    getAdditionalHeaders().
    """
    headers = ["timestamp",
               "value",
               "anomaly_score"]

    headers.extend(self.getAdditionalHeaders())

    return headers


  def run(self):
    """
    Main function that is called to collect anomaly scores for a given file.

    Feeds every row of self.dataSet.data, in order, to handleRecord() and
    collects the input row's values followed by the detector's output values.

    @return (pandas.DataFrame) One row per input row, with the columns given
            by getHeader().
    @raises ValueError If handleRecord() returns a first element (the anomaly
            score) outside [0, 1]. NaN is rejected as well, because the
            chained comparison below is False for NaN.
    """
    headers = self.getHeader()

    rows = []
    # Use a positional counter for the progress report instead of the
    # DataFrame index label, so non-integer indexes do not break "% 1000".
    for position, (_, row) in enumerate(self.dataSet.data.iterrows()):
      inputData = row.to_dict()

      detectorValues = self.handleRecord(inputData)

      # Make sure anomalyScore is between 0 and 1
      if not 0 <= detectorValues[0] <= 1:
        raise ValueError(
          "anomalyScore must be a number between 0 and 1. "
          f"Please verify if '{self.handleRecord.__qualname__}' method is "
          "returning a value between 0 and 1")

      # NOTE: the output row is *all* input columns followed by the detector
      # values, while the headers assume the input has exactly the columns
      # "timestamp" and "value", in that order. Any other layout makes the
      # DataFrame construction below fail or mislabel columns.
      rows.append(list(row) + list(detectorValues))

      # Progress report
      if position % 1000 == 0:
        print(".", end=' ')
        sys.stdout.flush()

    return pandas.DataFrame(rows, columns=headers)
def detectDataSet(args):
  """
  Function called in each detector process to run the detector it is given
  on one data file and write the results to disk.

  All inputs arrive packed in a single tuple (presumably so the function can
  be handed to a process pool's map(); verify against the caller).

  @param args (tuple) (i, detectorInstance, detectorName, labels, outputDir,
              relativePath), where:
              i                -- identifier used only to prefix the progress
                                  messages printed here
              detectorInstance -- detector object; its initialize() and then
                                  run() are called, and run() must return a
                                  pandas DataFrame
              detectorName     -- name used both as a subdirectory under
                                  outputDir and as the output file prefix
              labels           -- values stored in the "label" column of the
                                  results
              outputDir        -- root directory for the results files
              relativePath     -- relative path of the input data file; its
                                  directory part is mirrored under
                                  outputDir/detectorName
  """
  (i, detectorInstance, detectorName, labels, outputDir, relativePath) = args

  # Results go to <outputDir>/<detectorName>/<relativeDir>/<detectorName>_<file>
  relativeDir, fileName = os.path.split(relativePath)
  fileName = detectorName + "_" + fileName
  outputPath = os.path.join(outputDir, detectorName, relativeDir, fileName)
  # presumably creates the directories leading to outputPath (nab.util helper)
  createPath(outputPath)

  print(f"{i}: Beginning detection with {detectorName} for {relativePath}")
  detectorInstance.initialize()

  results = detectorInstance.run()

  # label=1 for relaxed windows, 0 otherwise
  results["label"] = labels

  results.to_csv(outputPath, index=False)

  print(f"{i}: Completed processing {len(results.index)} records at "
        f"{datetime.now()}")
  print(f"{i}: Results have been written to {outputPath}")