from System.Collections.Generic import List
from QuantConnect.Data.UniverseSelection import *
import math
class BasicTemplateAlgorithm(QCAlgorithm):
def __init__(self):
# please adjust your parameters below:
self.price_above = 10 # select only sotcks with price higher than this number
self.top_volume = 0.15 # percentage of top volume stocks to choose
self.market_open_minute = 15 # trade execution time.
self.first_gap_minute = 15 # if this variable == self.market_open_minute, the initia price for calculating the first gap would be yesterday close price.
self.second_gap_days = 5 # previous trading day to calculate the second gap
self.rebalance_day = 2 # holding period for each stock
self.first_selection_num = 3 # number of stocks to select according to the first gap
self.second_selection_num = 2 # number of stocks to buy
self.leverage = 0.2 # Leverage setting
self.holdings = []
def Initialize(self):
self.SetCash(100000) # initial cash
self.SetStartDate(2016,5,1)
self.SetEndDate(2016,6,1)
# Add spy as the benchmark for market open
self.AddUniverse(self.CoarseSelectionFunction)
self.spy = self.AddEquity("SPY", Resolution.Minute).Symbol
def CoarseSelectionFunction(self, coarse):
# select stocks with prices higher than a certain level.
selected = [x for x in coarse if float(x.Price) > self.price_above]
sortedByVolume = sorted(selected, key=lambda x: x.Volume, reverse=True)
top = sortedByVolume[:int(math.floor(self.top_volume*len(sortedByVolume)))]
list = List[Symbol]()
for x in top:
list.Add(x.Symbol)
return list
def OnData(self, slice):
self.data = slice
if self.Time.hour != 9 or self.Time.minute != 30+self.market_open_minute:
return
self.symbols = [x.Symbol for x in self.Portfolio.Values]
if len(self.symbols) == 1:
return
# select according to the first gap
for i in self.symbols:
if not self.data.ContainsKey(i):
continue
history = self.History(i, self.first_gap_minute+1, Resolution.Minute)
if 'close' not in history.columns:
continue
_price = float(history.close[0])
i.gap = float(self.data[i].Price)/_price - 1
self.symbols = [x for x in self.symbols if hasattr(x,'gap')]
self.symbols.sort(key = lambda x: x.gap, reverse = True)
selected = self.symbols[:self.first_selection_num]
# select according to the second gap
for i in selected:
long_history = self.History(i, self.second_gap_days, Resolution.Daily)
if 'close' not in long_history.columns:
continue
long_his_price = long_history.close[0]
i.long_gap = float(self.data[i].Price)/long_his_price - 1
i.total_gap = i.gap + i.long_gap
# sorted by total gap
selected = [x for x in selected if hasattr(x,'total_gap')]
selected.sort(key = lambda x: x.total_gap, reverse = True)
final = selected[:self.second_selection_num]
# log holdings information
for i in self.holdings:
i.holding_day +=1
self.Log('Equity %s current %f, cost basis %f'%(str(i).split(' ')[0], float(self.data[i].Price), i.cost))
if i.holding_day == self.rebalance_day:
self.Log('sell %s at %f, cost basis %f'%(str(i).split(' ')[0], float(self.data[i].Price), i.cost))
self.Liquidate(i)
self.holdings = [x for x in self.holdings if self.Portfolio[x].Invested]
# long stocks and log out long information.
for i in final:
i.cost = float(self.data[i].Price)
self.Log('long %s, at price %f, target percentage %f'%(str(i).split(' ')[0], i.cost, self.leverage/(self.second_selection_num*self.rebalance_day)))
self.SetHoldings(i, self.leverage/(self.second_selection_num*self.rebalance_day))
i.holding_day = 0
self.holdings.append(i)