deeplearning example
kanghua309 committed Oct 8, 2017
1 parent 40cf95d commit af7ee04
Showing 6 changed files with 6 additions and 1 deletion.
2 changes: 1 addition & 1 deletion campaign/deeplearn/training.py
@@ -1 +1 @@
Removed:

from zipline.pipeline import Pipeline, engine
from zipline.pipeline.factors import AverageDollarVolume, Returns
from zipline.pipeline.engine import (
    ExplodingPipelineEngine,
    SimplePipelineEngine,
)
from zipline.algorithm import TradingAlgorithm
from zipline.data.bundles.core import load
from zipline.data.data_portal import DataPortal
from zipline.finance.trading import TradingEnvironment
from zipline.pipeline.data import USEquityPricing
from zipline.pipeline.loaders import USEquityPricingLoader
from zipline.utils.calendars import get_calendar
from zipline.utils.factory import create_simulation_parameters
from zipline.utils.cli import Date, Timestamp
import pandas as pd
import os
import re

DEFAULT_CAPITAL_BASE = 1e5

from zipline.pipeline.factors import CustomFactor, Latest
from zipline.data.bundles import register
from zipline.data.bundles.viadb import viadb
from me.pipeline.factors.tsfactor import Fundamental

############################################# bundle #############################################
equities1 = {}
register(
    'my-db-bundle',  # name this whatever you like
    viadb(equities1),
    calendar='SHSZ'
)
bundle = 'my-db-bundle'
bundle_timestamp = pd.Timestamp.utcnow()
environ = os.environ
bundle_data = load(
    bundle,
    environ,
    bundle_timestamp,
)
prefix, connstr = re.split(
    r'sqlite:///',
    str(bundle_data.asset_finder.engine.url),
    maxsplit=1,
)
print prefix, connstr
if prefix:
    raise ValueError(
        "invalid url %r, must begin with 'sqlite:///'" %
        str(bundle_data.asset_finder.engine.url),
    )

############################################# trading_environment #############################################
trading_calendar = get_calendar("SHSZ")
trading_environment = TradingEnvironment(bm_symbol=None,
                                         exchange_tz="Asia/Shanghai",
                                         trading_calendar=trading_calendar,
                                         asset_db_path=connstr)
'''
first_trading_day = \
    bundle_data.equity_minute_bar_reader.first_trading_day
data = DataPortal(
    trading_environment.asset_finder,
    get_calendar("SHSZ"),
    first_trading_day=first_trading_day,
    equity_minute_reader=bundle_data.equity_minute_bar_reader,
    equity_daily_reader=bundle_data.equity_daily_bar_reader,
    adjustment_reader=bundle_data.adjustment_reader,
)
'''

############################################# choose_loader #############################################
pipeline_loader = USEquityPricingLoader(
    bundle_data.equity_daily_bar_reader,
    bundle_data.adjustment_reader,
)


def choose_loader(column):
    if column in USEquityPricing.columns:
        return pipeline_loader
    raise ValueError(
        "No PipelineLoader registered for column %s." % column
    )


# data_frequency = 'daily',
# capital_base = DEFAULT_CAPITAL_BASE
start = '2015-9-1'
end = '2015-9-9'
'''
sim_params = create_simulation_parameters(
    capital_base=capital_base,
    start=Date(tz='utc', as_timestamp=True).parser(start),
    end=Date(tz='utc', as_timestamp=True).parser(end),
    data_frequency=data_frequency,
    trading_calendar=trading_calendar,
)
'''
# print Date(tz='utc', as_timestamp=True).parser(start)
# perf_tracker = None
# Pull in the environment's new AssetFinder for quick reference
# print trading_calendar.all_sessions
your_engine = SimplePipelineEngine(get_loader=choose_loader,
                                   calendar=trading_calendar.all_sessions,
                                   asset_finder=trading_environment.asset_finder)


def make_pipeline():
    h2o = (USEquityPricing.high.latest - USEquityPricing.open.latest) / USEquityPricing.open.latest
    l2o = (USEquityPricing.low.latest - USEquityPricing.open.latest) / USEquityPricing.open.latest
    c2o = (USEquityPricing.close.latest - USEquityPricing.open.latest) / USEquityPricing.open.latest
    h2c = (USEquityPricing.high.latest - USEquityPricing.close.latest) / USEquityPricing.close.latest
    l2c = (USEquityPricing.low.latest - USEquityPricing.close.latest) / USEquityPricing.close.latest
    vol = USEquityPricing.volume.latest
    outstanding = Fundamental(trading_environment.asset_finder).outstanding
    outstanding.window_safe = True
    turnover_rate = vol / Latest([outstanding])
    pipe_columns = {
        'h2o': h2o,
        'l2o': l2o,
        'c2o': c2o,
        'h2c': h2c,
        'l2c': l2c,
        'vol': vol,
        'turnover_rate': turnover_rate,
    }
    # pipe_screen = (low_returns | high_returns)
    pipe = Pipeline(columns=pipe_columns)
    return pipe


my_pipe = make_pipeline()
result = your_engine.run_pipeline(my_pipe,
                                  Date(tz='utc', as_timestamp=True).parser(start),
                                  Date(tz='utc', as_timestamp=True).parser(end))
print result
Added:

# -*- coding: utf-8 -*-
from zipline.pipeline import Pipeline, engine
from zipline.pipeline.factors import AverageDollarVolume, Returns
from zipline.pipeline.data import USEquityPricing
from zipline.utils.cli import Date, Timestamp
import pandas as pd
import numpy as np
from zipline.pipeline.factors import CustomFactor, Latest
from me.pipeline.factors.tsfactor import Fundamental

pd.set_option('display.width', 800)

from me.helper.research_env import env_isinstance

your_engine = env_isinstance()


def make_pipeline():
    h2o = (USEquityPricing.high.latest - USEquityPricing.open.latest) / USEquityPricing.open.latest
    l2o = (USEquityPricing.low.latest - USEquityPricing.open.latest) / USEquityPricing.open.latest
    c2o = (USEquityPricing.close.latest - USEquityPricing.open.latest) / USEquityPricing.open.latest
    h2c = (USEquityPricing.high.latest - USEquityPricing.close.latest) / USEquityPricing.close.latest
    l2c = (USEquityPricing.low.latest - USEquityPricing.close.latest) / USEquityPricing.close.latest
    vol = USEquityPricing.volume.latest
    outstanding = Fundamental(your_engine._finder).outstanding
    outstanding.window_safe = True
    turnover_rate = vol / Latest([outstanding])
    returns = Returns(inputs=[USEquityPricing.close], window_length=5)
    pipe_columns = {
        'h2o': h2o.zscore(),
        'l2o': l2o.zscore(),
        'c2o': c2o.zscore(),
        'h2c': h2c.zscore(),
        'l2c': l2c.zscore(),
        'vol': vol.zscore(),
        'turnover_rate': turnover_rate.zscore(),
        'return': returns.zscore(),
    }
    # pipe_screen = (low_returns | high_returns)
    pipe = Pipeline(columns=pipe_columns)
    return pipe


my_pipe = make_pipeline()
start = '2015-9-1'
end = '2017-9-11'
result = your_engine.run_pipeline(my_pipe,
                                  Date(tz='utc', as_timestamp=True).parser(start),
                                  Date(tz='utc', as_timestamp=True).parser(end))

# Make a pivot table from the data
result = result.reset_index()
P = result.pivot_table(index=result['level_0'], columns='level_1',
                       values=['h2o', 'l2o', 'c2o', 'h2c', 'l2c', 'vol', 'turnover_rate', 'return'])
mi = P.columns.tolist()
new_ind = pd.Index(e[1].symbol + '_' + e[0] for e in mi)
P.columns = new_ind
P = P.sort_index(axis=1)  # Sort by columns
P.index.name = 'date'
clean_and_flat = P.dropna(1)  # drop empty columns
print clean_and_flat
print "*" * 50, "flat result", "*" * 50

target_cols = list(filter(lambda x: 'return' in x, clean_and_flat.columns.values))
input_cols = list(filter(lambda x: 'return' not in x, clean_and_flat.columns.values))
size = len(clean_and_flat)
InputDF = clean_and_flat[input_cols][:size]
TargetDF = clean_and_flat[target_cols][:size]
num_features = len(InputDF.columns)
num_stocks = len(TargetDF.columns)
print "num stocks %s,last train data %s,first train data %s" % (num_stocks, TargetDF.index[-1], TargetDF.index[0])
print "*" * 50, "Training a rnn network", "*" * 50

# Build the train/test sets
used_size = 252 * 2  # PARAM
test_size = 25  # ? use only the most recent 100 data points
train_X, train_y = InputDF[-used_size:].values, TargetDF[-used_size:].values
test_X, test_y = InputDF[-test_size:].values, TargetDF[-test_size:].values  # TODO
train_X = train_X.astype(np.float32)
train_y = train_y.astype(np.float32)
test_X = test_X.astype(np.float32)
test_y = test_y.astype(np.float32)
print np.shape(train_X), np.shape(train_y)
print "Train Set <X:y> shape"
print "Train Data Count:%s , Feature Count:%s , Stock Count:%s" % (len(train_X), num_features, num_stocks)  # 3300 stock-days? there aren't that many stocks, around 500

#############################################################################################################
import tensorflow as tf

tf.logging.set_verbosity(tf.logging.INFO)
from tensorflow.contrib.learn.python.learn.estimators.estimator import SKCompat

learn = tf.contrib.learn

RNN_HIDDEN_SIZE = 100  # !!!
NUM_LAYERS = 2
BATCH_SIZE = 25  # PARM
NUM_EPOCHS = 200  # PARM
lr = 0.003
NUM_TRAIN_BATCHES = int(len(train_X) / BATCH_SIZE)  # batches per epoch; BATCH_SIZE acts as the step size, 66 batches in total
ATTN_LENGTH = 10
dropout_keep_prob = 0.5
beta = 0


def LstmCell():
    lstm_cell = tf.nn.rnn_cell.BasicLSTMCell(RNN_HIDDEN_SIZE, state_is_tuple=True)
    return lstm_cell


def makeGRUCells():
    cells = []
    for i in range(NUM_LAYERS):
        cell = tf.nn.rnn_cell.GRUCell(num_units=RNN_HIDDEN_SIZE)
        if len(cells) == 0:
            # Add attention wrapper to first layer.
            cell = tf.contrib.rnn.AttentionCellWrapper(cell, attn_length=ATTN_LENGTH, state_is_tuple=True)
        cell = tf.nn.rnn_cell.DropoutWrapper(cell, output_keep_prob=dropout_keep_prob)
        cells.append(cell)
    # state_is_tuple must be False for GRUCell (True raises an error); for BasicLSTMCell it must be True
    attn_cell = tf.nn.rnn_cell.MultiRNNCell(cells, state_is_tuple=True)
    return attn_cell


def lstm_model(X, y):
    cell = makeGRUCells()
    '''
    output, _ = tf.nn.dynamic_rnn(
        cell,
        inputs=tf.expand_dims(X, -1),
        dtype=tf.float32,
        time_major=False
    )
    '''
    split_inputs = tf.reshape(X, shape=[1, BATCH_SIZE, num_features], name="reshape_l1")
    # Each item in the batch is a time step, iterate through them
    # print split_inputs
    split_inputs = tf.unstack(split_inputs, axis=1, name="unpack_l1")
    output, _ = tf.nn.static_rnn(cell,
                                 inputs=split_inputs,
                                 dtype=tf.float32)
    output = tf.transpose(output, [1, 0, 2])
    output = output[-1]
    # A fully connected layer with no activation function is just a linear regression,
    # and it also flattens the data into a one-dimensional structure.
    predictions = tf.contrib.layers.fully_connected(output, num_stocks, None)
    labels = y
    loss = tf.losses.mean_squared_error(predictions, labels)
    # print "lost:", loss
    train_op = tf.contrib.layers.optimize_loss(loss,
                                               tf.contrib.framework.get_global_step(),
                                               optimizer="Adagrad",
                                               learning_rate=lr)
    return predictions, loss, train_op


PRINT_STEPS = 100
validation_monitor = learn.monitors.ValidationMonitor(test_X, test_y,
                                                      every_n_steps=PRINT_STEPS,
                                                      early_stopping_rounds=1000)
# Train the model
regressor = SKCompat(learn.Estimator(model_fn=lstm_model,
                                     model_dir="Models/model_0",
                                     config=tf.contrib.learn.RunConfig(
                                         save_checkpoints_steps=100,
                                         save_checkpoints_secs=None,
                                         save_summary_steps=100,
                                     )))
print "Total Train Step: ", NUM_TRAIN_BATCHES * NUM_EPOCHS
print "*" * 50, "Training a rnn regress task now", "*" * 50
regressor.fit(train_X, train_y, batch_size=BATCH_SIZE, steps=NUM_TRAIN_BATCHES * NUM_EPOCHS)  # steps = train_labels.shape[0] / batch_size * epochs
print "*" * 50, "Predict tomorrow stock price now", "*" * 50
pred = regressor.predict(test_X[-BATCH_SIZE:])  # use the last 25 days to predict? stock prices for the next 5 days
date = clean_and_flat.index[-1]  # +1? +5? take the last trade day
df = pd.DataFrame(pred[-1:], index=[date], columns=target_cols)  # all stocks are predicted in one shot
df.index.name = "date"
print "*" * 50, "Predicted stock price from last trade day", "*" * 50
print df
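For orientation, a minimal self-contained sketch of the pivot-and-flatten step above; it substitutes plain strings for the Equity objects that level_1 actually holds (so no .symbol attribute), and the dates, tickers, and values are illustrative only:

import pandas as pd

toy = pd.DataFrame({
    'level_0': ['2015-09-01', '2015-09-01', '2015-09-02', '2015-09-02'],  # date
    'level_1': ['A', 'B', 'A', 'B'],  # ticker, standing in for an Equity object
    'h2o': [0.1, 0.2, 0.3, 0.4],
    'vol': [100.0, 200.0, 300.0, 400.0],
})
P = toy.pivot_table(index='level_0', columns='level_1', values=['h2o', 'vol'])
P.columns = pd.Index(e[1] + '_' + e[0] for e in P.columns.tolist())  # flatten (factor, ticker) pairs
print P.sort_index(axis=1)
# One row per date, one column per ticker/factor pair: A_h2o, A_vol, B_h2o, B_vol

This is the layout the RNN consumes: each row becomes one time step carrying num_features inputs.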
2 changes: 2 additions & 0 deletions crontab/collect/stock_daily_fetcher.py
@@ -1,3 +1,5 @@
# -*- coding: utf-8 -*-

import tushare as ts
# from sqlalchemy import create_engine
import sqlite3
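The hunk shown here only adds the UTF-8 coding header; the fetcher body is collapsed in this view. As a loose, hypothetical sketch of the pattern the visible imports suggest (the database path, table name, and stock code are illustrative assumptions, not taken from the diff):

# -*- coding: utf-8 -*-
import tushare as ts
import sqlite3

conn = sqlite3.connect('stock.db')  # illustrative database path
df = ts.get_hist_data('600848')  # illustrative code: daily bars for one stock
df.to_sql('stock_daily', conn, if_exists='append')  # illustrative table name
conn.close()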
2 changes: 2 additions & 0 deletions crontab/collect/stock_fetcher.py
@@ -1,3 +1,5 @@
# -*- coding: utf-8 -*-

import tushare as ts
# from sqlalchemy import create_engine
import sqlite3
1 change: 1 addition & 0 deletions me/helper/research_env.py
@@ -0,0 +1 @@
from zipline.pipeline import Pipeline, engine
from zipline.pipeline.factors import AverageDollarVolume, Returns
from zipline.pipeline.engine import (
    ExplodingPipelineEngine,
    SimplePipelineEngine,
)
from zipline.algorithm import TradingAlgorithm
from zipline.data.bundles.core import load
from zipline.data.data_portal import DataPortal
from zipline.finance.trading import TradingEnvironment
from zipline.pipeline.data import USEquityPricing
from zipline.pipeline.loaders import USEquityPricingLoader
from zipline.utils.calendars import get_calendar
from zipline.utils.factory import create_simulation_parameters
from zipline.utils.cli import Date, Timestamp
import pandas as pd
import numpy as np
import os
import re

DEFAULT_CAPITAL_BASE = 1e5

from zipline.pipeline.factors import CustomFactor, Latest
from zipline.data.bundles import register
from zipline.data.bundles.viadb import viadb
from me.pipeline.factors.tsfactor import Fundamental

pd.set_option('display.width', 800)


def env_isinstance(bundle='my-db-bundle', calendar='SHSZ', exchange_tz="Asia/Shanghai"):
    ############################################# bundle #############################################
    equities1 = {}
    register(
        bundle,  # name this whatever you like
        viadb(equities1),
        calendar=calendar
    )
    bundle = 'my-db-bundle'
    bundle_timestamp = pd.Timestamp.utcnow()
    environ = os.environ
    bundle_data = load(
        bundle,
        environ,
        bundle_timestamp,
    )
    prefix, connstr = re.split(
        r'sqlite:///',
        str(bundle_data.asset_finder.engine.url),
        maxsplit=1,
    )
    print prefix, connstr
    if prefix:
        raise ValueError(
            "invalid url %r, must begin with 'sqlite:///'" %
            str(bundle_data.asset_finder.engine.url),
        )

    ############################################# trading_environment #############################################
    trading_calendar = get_calendar(calendar)
    trading_environment = TradingEnvironment(bm_symbol=None,
                                             exchange_tz=exchange_tz,
                                             trading_calendar=trading_calendar,
                                             asset_db_path=connstr)

    ############################################# choose_loader #############################################
    pipeline_loader = USEquityPricingLoader(
        bundle_data.equity_daily_bar_reader,
        bundle_data.adjustment_reader,
    )

    def choose_loader(column):
        if column in USEquityPricing.columns:
            return pipeline_loader
        raise ValueError(
            "No PipelineLoader registered for column %s." % column
        )

    your_engine = SimplePipelineEngine(get_loader=choose_loader,
                                       calendar=trading_calendar.all_sessions,
                                       asset_finder=trading_environment.asset_finder)
    # your_engine._finder
    return your_engine
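A minimal usage sketch for the new helper, assuming the 'my-db-bundle' bundle has already been ingested; the pipeline and date range below are illustrative and mirror how training.py consumes the engine:

from zipline.pipeline import Pipeline
from zipline.pipeline.data import USEquityPricing
from zipline.utils.cli import Date
from me.helper.research_env import env_isinstance

engine = env_isinstance()  # defaults: bundle='my-db-bundle', calendar='SHSZ'
pipe = Pipeline(columns={'close': USEquityPricing.close.latest})
result = engine.run_pipeline(pipe,
                             Date(tz='utc', as_timestamp=True).parser('2015-9-1'),
                             Date(tz='utc', as_timestamp=True).parser('2015-9-9'))
print result.head()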
File renamed without changes.
