-
Notifications
You must be signed in to change notification settings - Fork 20
/
Copy pathdatasource.py
296 lines (256 loc) · 12.2 KB
/
datasource.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
import time
from typing import List, Tuple
import matplotlib.pyplot as plt
import numpy as np
from fuzzywuzzy import fuzz
from nilmtk import DataSet, MeterGroup
from pandas import DataFrame
from datasources.paths_manager import UK_DALE, REDD
from nilmlab.lab_exceptions import LabelNormalizationError
from utils.logger import timing, TIMING, info, debug
# Names passed to Datasource by DatasourceFactory for the respective data sets.
NAME_UK_DALE = 'UK DALE'
NAME_REDD = 'REDD'
# Label that Datasource.normalize_columns keeps unchanged when it is not one of the given appliance names.
SITE_METER = 'Site meter'
class Datasource():
    """
    Wraps a NILMTK DataSet under a name and reads its meters into DataFrames.
    """

    def __init__(self, dataset: DataSet, name: str):
        """
        Args:
            dataset (DataSet): The data set that the records are read from.
            name (str): The name of this data source.
        """
        self.dataset = dataset
        self.name = name

    def get_dataset(self):
        """Returns the wrapped data set."""
        return self.dataset

    def get_name(self):
        """Returns the name of this data source."""
        return self.name

    @staticmethod
    def _elapsed(start_time: float) -> float:
        """Returns the seconds passed since start_time, rounded to two decimals."""
        return round(time.time() - start_time, 2)

    def read_all_meters(self, start: str, end: str, sample_period: int = 6, building: int = 1) \
            -> Tuple[DataFrame, MeterGroup]:
        """
        Read the records during the given start and end dates, for all the meters of the given building.
        Missing values of the resulting DataFrame are replaced with 0.
        Args:
            start (str): The starting date in the format "{month}-{day of month}-{year}" e.g. "05-30-2012".
            end (str): The final date in the format "{month}-{day of month}-{year}" e.g. "08-30-2012".
            sample_period (int): The sample period of the records.
            building (int): The building to read the records from.
        Returns:
            Returns a tuple containing the respective DataFrame and MeterGroup of the data that are read.
        """
        # The clock is always read: the elapsed time is formatted before timing() is called,
        # so a None start would raise a TypeError whenever TIMING is disabled.
        start_time = time.time()
        self.dataset.set_window(start=start, end=end)
        elec = self.dataset.buildings[building].elec
        timing('NILMTK selecting all meters: {}'.format(self._elapsed(start_time)))

        start_time = time.time()
        df = elec.dataframe_of_meters(sample_period=sample_period)
        timing('NILMTK converting all meters to dataframe: {}'.format(self._elapsed(start_time)))
        df.fillna(0, inplace=True)
        return df, elec

    def read_selected_appliances(self, appliances: List, start: str, end: str, sample_period=6, building=1,
                                 include_mains=True) -> Tuple[DataFrame, MeterGroup]:
        """
        Loads the data of the specified appliances.
        Missing values of the resulting DataFrame are replaced with 0.
        Args:
            appliances (List): A list of appliances to read their records.
            start (str): The starting date in the format "{month}-{day of month}-{year}" e.g. "05-30-2012".
            end (str): The final date in the format "{month}-{day of month}-{year}" e.g. "08-30-2012".
            sample_period (int): The sample period of the records.
            building (int): The building to read the records from.
            include_mains (bool): True if should include main meters.
        Returns:
            Returns a tuple containing the respective DataFrame and MeterGroup of the data that are read.
        """
        debug(f" read_selected_appliances {appliances}, {building}, {start}, {end}, {include_mains}")
        selected_metergroup = self.get_selected_metergroup(appliances=appliances, building=building, end=end,
                                                           start=start, include_mains=include_mains)

        start_time = time.time()
        df = selected_metergroup.dataframe_of_meters(sample_period=sample_period)
        timing('NILMTK converting specified appliances to dataframe: {}'.format(self._elapsed(start_time)))
        debug(f"Length of data of read_selected_appliances {len(df)}")
        df.fillna(0, inplace=True)
        return df, selected_metergroup

    def read_mains(self, start, end, sample_period=6, building=1) -> Tuple[DataFrame, MeterGroup]:
        """
        Loads the data of the mains of the given building.
        Missing values of the resulting DataFrame are replaced with 0.
        Args:
            start (str): The starting date in the format "{month}-{day of month}-{year}" e.g. "05-30-2012".
            end (str): The final date in the format "{month}-{day of month}-{year}" e.g. "08-30-2012".
            sample_period (int): The sample period of the records.
            building (int): The building to read the records from.
        Returns:
            Returns a tuple containing the respective DataFrame and MeterGroup of the data that are read.
        """
        self.dataset.set_window(start=start, end=end)
        mains_meter = self.dataset.buildings[building].elec.mains()
        # mains() may give back something that is not a MeterGroup; it is wrapped so that
        # dataframe_of_meters is available in both cases.
        if isinstance(mains_meter, MeterGroup):
            mains_metergroup = mains_meter
        else:
            mains_metergroup = MeterGroup(meters=[mains_meter])

        start_time = time.time()
        df = mains_metergroup.dataframe_of_meters(sample_period=sample_period)
        timing('NILMTK converting mains to dataframe: {}'.format(self._elapsed(start_time)))
        df.fillna(0, inplace=True)
        return df, mains_metergroup

    # def read_data_of_appliance(self, start, end, sample_period=6, building=1, device=None) -> np.ndarray:
    #     """
    #     Reads the data of a specific appliance. If no device is specified then it reads the main meter.
    #     :param start:
    #     :type start:
    #     :param end:
    #     :type end:
    #     :param sample_period:
    #     :type sample_period:
    #     :param building:
    #     :type building:
    #     :param device:
    #     :type device:
    #     :return:
    #     :rtype:
    #     """
    #     start_time = time.time() if TIMING else None
    #
    #     power_df = self.read_df(start, end, sample_period, building, device)
    #     power_data = power_df.values
    #
    #     debug('Power data shape {}'.format(power_data.shape))
    #     debug('Type of power_data {}'.format(type(power_data)))
    #     debug('Size of power_data {}'.format(len(power_data)))
    #     timing('NILMTK reading and getting power series: {}'.format(round(time.time() - start_time, 2)))
    #     self.clean_nans(power_data)
    #
    #     return power_data

    def get_selected_metergroup(self, appliances, building, end, start, include_mains) -> MeterGroup:
        """
        Gets a MeterGroup with the specified appliances for the given building during the given dates.
        An appliance that is measured by more than one meter contributes a single instance:
        instance 4 for 'sockets' of building 3, instance 1 in any other case.
        Args:
            appliances (List): A list of appliances to read their records.
            building (int): The building to read the records from.
            end (str): The final date in the format "{month}-{day of month}-{year}" e.g. "08-30-2012".
            start (str): The starting date in the format "{month}-{day of month}-{year}" e.g. "05-30-2012".
            include_mains (bool): True if should include main meters.
        Returns:
            A MeterGroup containing the specified appliances.
        """
        start_time = time.time()
        self.dataset.set_window(start=start, end=end)
        elec = self.dataset.buildings[building].elec

        appliances_with_one_meter = []
        appliances_with_more_meters = []
        for appliance in appliances:
            # Each appliance is looked up on its own; looking up the whole list would classify
            # every appliance by the combined number of meters.
            metergroup = elec.select_using_appliances(type=appliance)
            if len(metergroup.meters) > 1:
                appliances_with_more_meters.append(appliance)
            else:
                appliances_with_one_meter.append(appliance)

        special_metergroup = None
        for appliance in appliances_with_more_meters:
            inst = 4 if appliance == 'sockets' and building == 3 else 1
            instance_metergroup = elec.select_using_appliances(type=appliance, instance=inst)
            if special_metergroup is None:
                special_metergroup = instance_metergroup
            else:
                special_metergroup = special_metergroup.union(instance_metergroup)

        selected_metergroup = elec.select_using_appliances(type=appliances_with_one_meter)
        # There is nothing to merge when no appliance has more than one meter.
        if special_metergroup is not None:
            selected_metergroup = selected_metergroup.union(special_metergroup)

        if include_mains:
            mains_meter = elec.mains()
            if isinstance(mains_meter, MeterGroup):
                if len(mains_meter.meters) > 1:
                    # Only the first meter of the mains is used.
                    mains_meter = mains_meter.meters[0]
                    mains_metergroup = MeterGroup(meters=[mains_meter])
                else:
                    mains_metergroup = mains_meter
            else:
                mains_metergroup = MeterGroup(meters=[mains_meter])
            selected_metergroup = selected_metergroup.union(mains_metergroup)

        timing('NILMTK select using appliances: {}'.format(self._elapsed(start_time)))
        return selected_metergroup

    @staticmethod
    def _comparison_key(text: str) -> str:
        """Returns the first word of the lower-cased text without 'electric', or '' when no word is left."""
        words = text.lower().replace('electric', "").split()
        return words[0] if words else ''

    @staticmethod
    def normalize_columns(df: DataFrame, meter_group: MeterGroup, appliance_names: List[str]) -> Tuple[DataFrame, dict]:
        """
        It normalizes the names of the columns for compatibility.
        Each label of the meter group is replaced by the appliance name whose first word
        (ignoring case and the word 'electric') has a fuzzy ratio above 90 with the first word of the label.
        The SITE_METER label is kept as it is when it is not one of the appliance names.
        Args:
            df (DataFrame): The DataFrame whose columns are renamed in place.
            meter_group (MeterGroup): The meter group that provides the labels of the columns.
            appliance_names (List[str]): The names that the labels are normalized to.
        Returns:
            A tuple with a DataFrame and a dictionary mapping labels to ids.
        Raises:
            LabelNormalizationError: If a label does not match exactly one appliance name.
        """
        labels = meter_group.get_labels(df.columns)
        info(f"Df columns before normalization {df.columns}")
        info(f"Labels before normalization {labels}")

        normalized_labels = []
        for label in labels:
            if label == SITE_METER and SITE_METER not in appliance_names:
                normalized_labels.append(SITE_METER)
                continue

            label_key = Datasource._comparison_key(label)
            matches = []
            for name in appliance_names:
                name_key = Datasource._comparison_key(name)
                # A label or a name without any word left cannot be compared.
                if not label_key or not name_key:
                    continue
                ratio = fuzz.ratio(label_key, name_key)
                if ratio > 90:
                    info(f"{name} ~ {label} ({ratio}%)")
                    matches.append(name)

            # Checking every label separately keeps the names aligned with the columns; comparing
            # only the totals lets an ambiguous label compensate for an unmatched one.
            if len(matches) != 1:
                debug(f"Label {label} matched {len(matches)} appliance names {matches}")
                raise LabelNormalizationError()
            normalized_labels.append(matches[0])

        label2id = {l: i for l, i in zip(normalized_labels, df.columns)}
        df.columns = normalized_labels
        info(f"Normalized labels {normalized_labels}")
        return df, label2id

    @staticmethod
    def rename_columns(df: DataFrame, meter_group: MeterGroup) -> Tuple[DataFrame, dict, dict]:
        """
        Rename columns of the given DataFrame using the respective labels of each meter.
        The new name of a column is the label of its meter followed by the first element of the column id.
        A column that raises a KeyError while its label is built keeps its current name and is left
        out of both dictionaries.
        Args:
            df (DataFrame): The DataFrame whose columns are renamed in place.
            meter_group (MeterGroup): The meter group that is indexed with the columns of the DataFrame.
        Returns:
            Returns a DataFrame with renamed columns and two dictionaries to convert labels to ids and vice versa.
        """
        new_columns = []
        label2id = dict()
        id2label = dict()
        for col in df.columns:
            try:
                label = meter_group[col].label() + str(col[0])
            except KeyError:
                info(f"KeyError key={col}")
                # The column keeps its name so that new_columns has one entry per column of df.
                new_columns.append(col)
                continue
            new_columns.append(label)
            label2id[label] = col
            id2label[col] = label
        df.columns = new_columns
        return df, label2id, id2label

    # def read_df(self, start, end, sample_period=6, building=1, device=None):
    #     self.dataset.set_window(start=start, end=end)
    #     elec = self.dataset.buildings[building].elec
    #     if device is not None:
    #         mains = elec.submeters()[device]
    #         debug('Reading data of {}.'.format(device))
    #     else:
    #         mains = elec.mains()
    #         debug('Reading data of mains.')
    #     power_df = mains.power_series_all_data(sample_period=sample_period)
    #     return power_df

    @staticmethod
    def clean_nans(data):
        """
        Replaces the NaN values of the given data using numpy.nan_to_num with copy=False.
        NOTE(review): the replacement happens in place only when numpy does not need to copy
        the data to convert it to an array - confirm that callers pass an ndarray.
        Args:
            data: The data to clean.
        """
        start_time = time.time()
        np.nan_to_num(data, copy=False)
        timing('None to num: {}'.format(Datasource._elapsed(start_time)))
class DatasourceFactory:
    """
    Factory of the available data sources, one for every supported data set.
    """

    @staticmethod
    def get_uk_dale_dataset():
        """Returns a DataSet created from the UK_DALE constant."""
        return DataSet(UK_DALE)

    @staticmethod
    def get_redd_dataset():
        """Returns a DataSet created from the REDD constant."""
        return DataSet(REDD)

    @staticmethod
    def create_uk_dale_datasource():
        """Returns a Datasource named NAME_UK_DALE that wraps the UK DALE data set."""
        uk_dale = DatasourceFactory.get_uk_dale_dataset()
        return Datasource(uk_dale, NAME_UK_DALE)

    @staticmethod
    def create_redd_datasource():
        """Returns a Datasource named NAME_REDD that wraps the REDD data set."""
        redd = DatasourceFactory.get_redd_dataset()
        return Datasource(redd, NAME_REDD)
def save_and_plot(sequence, plot=False, save_figure=False, filename=None):
    """
    Plots the given sequence with matplotlib and optionally saves and/or shows the figure.
    Nothing happens when both plot and save_figure are False.
    Args:
        sequence: The values to plot.
        plot (bool): True if the figure should be shown.
        save_figure (bool): True if the figure should be saved; it has effect only when a filename is given.
        filename (str): The path of the saved figure without the '.png' extension.
    """
    if not (plot or save_figure):
        return

    plt.plot(sequence)

    should_save = save_figure and filename is not None
    if should_save:
        plt.savefig(filename + '.png')

    if plot:
        plt.show()