-
Notifications
You must be signed in to change notification settings - Fork 0
/
run_predict.py
66 lines (52 loc) · 2.11 KB
/
run_predict.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import datetime
import os
from datetime import date
import luigi
import mlflow.pyfunc
import numpy as np
import pandas as pd
import yaml
from src.get_data import get_ticker_list, get_tickers_data
from src.preprocessing import make_stationary_series, compute_lags
from src.tda import transform_data_to_tda
config_path = os.path.join('config/config.yaml')
config = yaml.safe_load(open(config_path))['predict']
mlflow.set_tracking_uri(config['mlflow']['tracking_uri'])
class PredictionTask(luigi.Task):
def requires(self):
return None
def output(self):
return None
def run(self):
result_path = 'data/prediction.csv'
sp500_tickers = get_ticker_list(is_random=True, size=5)
data = get_tickers_data(sp500_tickers, config['data']['period'], config['data']['info_keys'])
grouped = data.groupby('symbol')
result = {}
for key, group in grouped:
df = group.drop(
config['data']['info_keys'],
axis=1
).dropna()
df = make_stationary_series(df)
df.loc[df.shape[0]] = [0, 0, 0]
df = compute_lags(df).dropna()
y = df['log_diff'].tail(5)
X = df.drop(['log_diff', 'log', 'price'], axis=1).tail(5)
model = mlflow.pyfunc.load_model(
model_uri=f"models:/rf-model-{key}/Staging"
)
X_test, y_test = transform_data_to_tda(X, y, artefacts_postfix=key)
prediction = df.iloc[-2]['price'] + np.exp(model.predict(X_test))
result[key] = prediction[0]
today = date.today() + datetime.timedelta(days=1)
today = today.strftime('%Y-%m-%d')
if os.path.exists(result_path):
df = pd.read_csv(result_path)
result = pd.DataFrame(result.items(), columns=['shortName', today])
df = df.merge(result, how='inner', on='shortName')
df.to_csv(result_path, index=False)
else:
pd.DataFrame(result.items(), columns=['shortName', today]).to_csv(result_path, index=False)
if __name__ == '__main__':
luigi.build([PredictionTask()])