forked from tonina/timeseries
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtime_series.py
231 lines (201 loc) · 8.4 KB
/
time_series.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
import math
import numpy as np
import pandas as pd
import scipy
import scipy.fftpack
from scipy.cluster.vq import kmeans, vq
from scipy.signal import argrelextrema
from rpy2.robjects import pandas2ri
pandas2ri.activate()
from smoothing import *
from r_packages import *
class TimeSeries:
'''
Class for analyzing of time series. Compute harmonic components, smoothed and reconstructed data, extremes.
'''
def __init__(self, data_list, dates, num_comp=3, num_harm=10, start_harm=2):
'''
Initialize instance of class with data list and corresponding date list.
:param data_list: list of float
:param dates: list of date
:param num_comp: int, number of main periods (harmonic components)
:param num_harm: int, number of first used harmonics
:param start_harm: int, number of start harmonic
:return:
'''
self.data_list = data_list
self.dates = dates
data_frame = pd.DataFrame(data={'Value': data_list}, index=dates)
self.raw_data = data_frame['Value']
self.num_comp = num_comp
self.num_harm = num_harm
self.start_harm = start_harm
self.ssa = rssa.ssa(
pd.DataFrame(data=list(self.raw_data), index=list(self.raw_data.index)))
self._periods = {} # will contain main periods
# will contain dict of list of harmonics for every corresponding period
self._period_harmonics = {}
self._deviations = {} # will contain standard deviation
@property
def deviations(self):
'''
Return std.deviations. Automatically call comp_main_periods() if needed.
'''
if not self._deviations:
self.comp_main_periods()
return self._deviations
@property
def periods(self):
'''
Return periods. Automatically call comp_main_periods() if needed.
'''
if not self._periods:
self.comp_main_periods()
return self._periods
@property
def period_harmonics(self):
'''
Return self._period_harmonics Automatically call comp_main_periods() if needed.
'''
if not self._periods:
self.comp_main_periods()
return self._periods
def get_raw_data(self):
'''
Return raw_data in pandas dataframe format.
:return:
'''
return self.raw_data
def _comp_periods(self):
'''
Return list of first n periods. Compute it by using Fourier transform to every reconstructed harmonic.
:return: list of floats
'''
periods = []
for i in range(self.start_harm, self.num_harm + 1):
r = rssa.reconstruct(self.ssa, base.list(base.c(i)))
signal = np.array(r[0][0])
fft = abs(scipy.fft(signal))
freqs = scipy.fftpack.fftfreq(signal.size)
max_freq = abs(freqs[np.argmax(fft)])
periods.append(1 / max_freq if max_freq != 0 else float('inf'))
return periods
def comp_main_periods(self):
'''
Return list of main periods. It's computed by clustering periods.
:return: tuple of dicts:
dict with component number as key and main periods as value
dict with component number as key and numbers of harmonics as value
'''
periods = self._comp_periods()
harm_data = np.vstack(np.array(periods))
centroids, _ = kmeans(harm_data, self.num_comp)
idx, _ = vq(harm_data, centroids)
# list of lists with number of harmonics
harm_numbers = [[]] * self.num_comp
# list of lists with periods for corresponding harmonics
harm_periods = [[]] * self.num_comp
for i in range(self.num_comp):
periods_copy = periods.copy()
for item in harm_data[idx == i]:
index = periods_copy.index(item)
harm_numbers[i].append(index + self.start_harm)
harm_periods[i].append(periods[index])
periods_copy[index] = 0
# sort main periods and components of period
centroids_list = [item[0] for item in centroids]
centroids_skeys = sorted(range(len(centroids_list) key=centroids_list.__getitem__, reverse=True)
for k in enumerate(centroids_skeys):
self._periods(k + 1)=centroids_list[k]
self._period_harmonics[k + 1]=harm_numbers[k]
self._deviations[k + 1]=np.std(np.array(harm_periods[k]))
return self._periods, self._period_harmonics, self._deviations
def get_main_periods(self, order):
'''
Retrun main periods by order of components.
:param order: tuple of int
:return: dict with component number as keys and periods as values
'''
return {o: self.periods[o] for o in order}
def get_deviations(self, order):
'''
Return standard deviations of main periods.
:param order: tuple of int
:return: dict with component number as keys and standard deviations as values
'''
return {o: self.deviations[o] for o in order}
def get_smooth(self, comp_num):
'''
Compute smoothed graph by component.
:param comp_num: int, component number
:return: smoothed graph in pandas dataframe format
'''
if not self.periods:
self.comp_main_periods()
sm_window = self.periods[comp_num]
smoothed = pd.rolling_mean(self.raw_data, sm_window, min_periods=0)
return smoothed
def get_reconstruct(self, comp_num):
'''
Compute reconstructed graph by component.
:param comp_num: int, component number
:return: reconstructed timeseries in pandas dataframe format
'''
comp_list = [
1, ] # include trend component to harmonic components for reconstruction
for i in range(comp_num):
comp_list = comp_list + self.period_harmonics[i + 1]
r = rssa.reconstruct(self.ssa, base.list(base.c(*comp_list)))
row = r[0][0]
row_frame = pd.DataFrame(data={'Value': row}, index=self.dates)
reconstructed = row_frame['Value']
return reconstructed
def get_recon_harm(self, comp_num):
'''
Compute reconstructed harmonics by component.
:param comp_num: int, component number
:return: reconsturcted timeseries in pandas dataframe format
'''
comp_list = self.period_harmonics[comp_num]
r = rssa.reconstruct(self.ssa, base.list(base.c(*comp_list)))
row = r[0][0]
row_frame = pd.DataFrame(data={'Value': row}, index=self.dates)
reconstructed = row_frame['Value']
return reconstructed
def local_extremes(self, timeseries, golay_window=51, golay_order=3):
'''
Defines local maximum and minimums of timeseries.
Return maximums and minimums in pandas dataframe format.
:param timeseries: timeseries in pandas dataframe format, timeseries for looking of extremes
:param golay_window: int, window of smoothing for savitzky_golay filter
:param golay order: int, polynomial order of savitzky_golay filter
:return: tuple of dataframe
'''
if golay_window % 2 == 0:
golay_window = golay_window + 1
smooths = savitzky_golay(timeseries, golay_window, golay_order)
# compute maximums
maxs = argrelextrema(smooths, np.greater)
max_series = timeseries[maxs[0]]
# compute minimums
mins = argrelextrema(smooths, np.less)
min_series = timeseries[mins[0]]
return max_series, min_series
def trend(self, last_max, last_min, current_point):
'''
Compute and return trend rate and trend angle from last extreme.
:param last_max: tuple of date and float
:param last_min: tuple of date and float
:param current_point: tuple of date and float
:return: tuple of str, float and float, type of extreme, trend rate, trend angle
'''
if last_max[0] > last_min[0]:
extreme_type = 'max'
last_extreme = last_max
else:
extreme_type = 'min'
last_extreme = last_min
days_delta = len(self.raw_data[self.raw_data.index > last_extreme[0]])
trend_rate = (current_point[1] - last_extreme[1]) / days_delta
trend_angle = 10000 * 180 * (math.atan(trend_rate)) / math.pi
return extreme_type, trend_rate, trend_angle