Overall Statistics |
Total Trades 4434 Average Win 0.15% Average Loss -0.09% Compounding Annual Return -56.833% Drawdown 29.600% Expectancy -0.285 Net Profit -29.621% Sharpe Ratio -3.084 Probabilistic Sharpe Ratio 0.000% Loss Rate 74% Win Rate 26% Profit-Loss Ratio 1.76 Alpha -0.427 Beta 0.226 Annual Standard Deviation 0.141 Annual Variance 0.02 Information Ratio -2.152 Tracking Error 0.184 Treynor Ratio -1.925 Total Fees $21535.43 Estimated Strategy Capacity $24000000.00 Lowest Capacity Asset VFC R735QTJ8XC9X |
from AlgorithmImports import *
import numpy as np
import pandas as pd
import statistics as stat
from scipy import stats
from datetime import timedelta


class ATH3Alpha(AlphaModel):
    """Alpha model that buys breakouts above the recent day/week/month highs.

    Entry: for a symbol that is not held, an Up insight is emitted when the
    current close is above the latest consolidated day, 5-day and 22-day
    highs, is less than ``MAX_EXTENSION`` above the 22-day high, and the
    portfolio holds more than ``MIN_CASH`` in cash. When ``add_linear_trend``
    is set, the r-value of a log-linear fit over the daily closes must also be
    at least ``MIN_R_VALUE``; that r-value becomes the insight confidence.

    Exit: held symbols are tracked with a trailing high and the entry cost;
    a Flat insight is emitted when the bar low falls far enough below the
    trailing high, or the bar high reaches the profit target.
    """

    MIN_CASH = 10000        # minimum free cash required before opening a position
    MAX_EXTENSION = 0.2     # maximum allowed distance of the close above the month high
    MIN_R_VALUE = 0.3       # minimum trend r-value when add_linear_trend is enabled

    def __init__(self, algorithm, rolling_window_size, add_linear_trend=True,
                 maximumDrawdownPercent=0.3, tpPercent=0.5, buy_bond=False):
        """Store the parameters and optionally subscribe to the bond ETF.

        Args:
            algorithm: The owning QCAlgorithm instance.
            rolling_window_size: Stored on the instance; the windows built by
                SymbolData currently use fixed sizes and do not read it.
            add_linear_trend: Require a positive log-linear trend for entries.
            maximumDrawdownPercent: Trailing-stop distance; the sign is ignored.
            tpPercent: Profit-take distance; the sign is ignored.
            buy_bond: When True, subscribe to "TLT" hourly and keep it held.
        """
        # init
        self.algorithm = algorithm
        self.rolling_window_size = rolling_window_size
        self.add_linear_trend = add_linear_trend
        self.Data = {}
        self.trailingHighs = dict()
        self.profitTakeData = {}

        # parameters (stored as a negative drawdown and a positive target)
        self.maximumDrawdownPercent = -abs(maximumDrawdownPercent)
        self.tp_percent = abs(tpPercent)
        self.buy_bond = buy_bond

        # add bond data
        if self.buy_bond:
            self.bond = algorithm.AddEquity("TLT", Resolution.Hour).Symbol

    def Update(self, algorithm, data):
        """Return the insights generated for the current time slice.

        Args:
            algorithm: The running algorithm.
            data: The current data slice.

        Returns:
            A list of insights; it is empty when nothing triggers. The method
            always returns a list, never None.
        """
        insights = []

        # stop time is the time after which we don't trade
        stop_time = algorithm.Time.replace(hour=15, minute=30)
        if algorithm.Time > stop_time:
            return insights

        # buy bond
        if self.buy_bond and not algorithm.Portfolio[self.bond].Invested:
            insights.append(Insight.Price(self.bond, timedelta(days=999999),
                                          InsightDirection.Up, confidence=0.1))

        for symbol, symbol_data in self.Data.items():
            # skip symbols that have no bar in this slice
            if not data.Bars.ContainsKey(symbol) or not data.ContainsKey(symbol):
                continue
            bar = data[symbol]
            if bar is None:
                continue

            # skip symbols whose windows are not filled yet
            if not (symbol_data.close_window_day.IsReady
                    and symbol_data.high_window_day.IsReady
                    and symbol_data.high_window_month.IsReady
                    and symbol_data.high_window_week.IsReady):
                continue

            # the bond is handled separately above
            if self.buy_bond and symbol == self.bond:
                continue

            # util values: index 0 is the most recently consolidated bar
            close_current = bar.Close
            week_high = symbol_data.high_window_week[0]
            month_high = symbol_data.high_window_month[0]
            day_high = symbol_data.high_window_day[0]

            # trading rule
            signal = (close_current > week_high
                      and close_current > month_high
                      and close_current > day_high
                      and (close_current / month_high - 1) < self.MAX_EXTENSION)

            invested = algorithm.Portfolio[symbol].Invested
            if not invested and signal and algorithm.Portfolio.Cash > self.MIN_CASH:
                # remove symbol from the trailing stop / profit take state
                self.trailingHighs.pop(symbol, None)
                self.profitTakeData.pop(symbol, None)

                # Without the trend filter there is no r-value, so the insight
                # carries no confidence instead of raising NameError.
                confidence = None
                if self.add_linear_trend:
                    confidence = self._trend_r_value(symbol_data.close_window_day)
                    if confidence < self.MIN_R_VALUE:
                        continue

                insights.append(Insight.Price(symbol, timedelta(days=999999),
                                              InsightDirection.Up, confidence=confidence))
            elif invested:
                insights.extend(self._exit_insights(algorithm, symbol, symbol_data, bar))

        return insights

    @staticmethod
    def _trend_r_value(close_window):
        """Return the r-value of a linear fit to the log of the daily closes.

        The most recent close (index 0) is excluded and the remaining closes
        are ordered oldest first before fitting.
        """
        closes = list(close_window)[1:close_window.Count]
        closes.reverse()
        log_closes = np.log(np.array(closes))
        return stats.linregress(np.arange(len(closes)), log_closes).rvalue

    def _exit_insights(self, algorithm, symbol, symbol_data, bar):
        """Return the Flat insights (possibly none) for a held symbol."""
        holding = algorithm.Portfolio[symbol]

        # mean of the daily log returns over the close window, oldest first
        closes = np.array(list(symbol_data.close_window_day))
        mean_daily_return = np.mean(np.diff(np.log(np.flip(closes))))

        # Skip only this symbol. The previous bare `return` made Update return
        # None, discarding collected insights and all remaining symbols.
        if (holding.Price / holding.AveragePrice) < (mean_daily_return * 2):
            return []

        # NOTE(review): this overwrites the constructor's stop distance with a
        # per-symbol value shared by every symbol, and it becomes positive when
        # the mean return is negative. Kept as-is; confirm it is intended.
        self.maximumDrawdownPercent = -mean_daily_return

        # Add newly invested securities, starting from the average holding cost
        if symbol not in self.trailingHighs and symbol not in self.profitTakeData:
            average_price = algorithm.Securities[symbol].Holdings.AveragePrice
            self.trailingHighs[symbol] = average_price
            self.profitTakeData[symbol] = average_price
            return []

        # Check for new highs and update - set to tradebar high
        if self.trailingHighs[symbol] < bar.High:
            self.trailingHighs[symbol] = bar.High
            return []

        exits = []

        # Check for securities past the drawdown limit
        drawdown = (bar.Low / self.trailingHighs[symbol]) - 1
        if drawdown < self.maximumDrawdownPercent:
            exits.append(Insight.Price(symbol, Expiry.EndOfMonth,
                                       InsightDirection.Flat, confidence=1))

        # profit take
        profit_percent = bar.High / self.profitTakeData[symbol] - 1
        if profit_percent >= self.tp_percent:
            exits.append(Insight.Price(symbol, Expiry.EndOfMonth,
                                       InsightDirection.Flat, confidence=1))

        return exits

    def OnSecuritiesChanged(self, algorithm, changes):
        """Create per-symbol state for added securities and drop it for removed ones.

        Removed securities that were tracked are liquidated and their
        consolidators are unregistered.
        """
        # action on added securities
        for security in changes.AddedSecurities:
            symbol = security.Symbol
            if symbol not in self.Data:
                self.Data[symbol] = SymbolData(algorithm, symbol)

        # action on removed securities
        for security in changes.RemovedSecurities:
            symbol = security.Symbol
            symbol_data = self.Data.pop(symbol, None)
            if symbol_data is not None:
                symbol_data.dispose()
                algorithm.Liquidate(symbol, 'Removed from universe')


class SymbolData(object):
    """Rolling windows of consolidated day, 5-day and 22-day bars for one symbol."""

    def __init__(self, algorithm, symbol):
        """Register the consolidators and warm the windows up from daily history.

        Args:
            algorithm: The running algorithm, used for history and subscriptions.
            symbol: The symbol this object tracks.
        """
        self.symbol = symbol
        self.algorithm = algorithm

        # define windows
        self.close_window_day = RollingWindow[float](22)
        self.high_window_day = RollingWindow[float](22)
        self.volume_window_day = RollingWindow[float](22)
        self.high_window_week = RollingWindow[float](2)
        self.high_window_month = RollingWindow[float](2)

        # consolidators
        self.dayConsolidator = TradeBarConsolidator(timedelta(days=1))
        self.weekConsolidator = TradeBarConsolidator(timedelta(days=5))
        self.monthConsolidator = TradeBarConsolidator(timedelta(days=22))
        self.dayConsolidator.DataConsolidated += self.OnDataConsolidatedDaily
        self.weekConsolidator.DataConsolidated += self.OnDataConsolidatedWeek
        self.monthConsolidator.DataConsolidated += self.OnDataConsolidatedMonth
        for consolidator in self._consolidators():
            algorithm.SubscriptionManager.AddConsolidator(self.symbol, consolidator)

        # warm up: replay daily history through the consolidators
        history = self.algorithm.History([self.symbol], 22 * 4, Resolution.Daily)
        if history.shape[0] == 0:
            self.algorithm.Log('DataFrame is empty!')
            return

        for row in history.loc[self.symbol].itertuples():
            tradebar = TradeBar(row.Index, self.symbol, row.open, row.high,
                                row.low, row.close, row.volume)
            self.dayConsolidator.Update(tradebar)
            self.weekConsolidator.Update(tradebar)
            self.monthConsolidator.Update(tradebar)

    def _consolidators(self):
        """Return the three consolidators owned by this object."""
        return (self.dayConsolidator, self.weekConsolidator, self.monthConsolidator)

    def dispose(self):
        """Unregister the consolidators so they stop receiving data."""
        for consolidator in self._consolidators():
            self.algorithm.SubscriptionManager.RemoveConsolidator(self.symbol, consolidator)

    def OnDataConsolidatedDaily(self, sender, consolidated):
        """Push the daily bar's high, close and volume into the day windows."""
        self.high_window_day.Add(consolidated.High)
        self.close_window_day.Add(consolidated.Close)
        self.volume_window_day.Add(consolidated.Volume)

    def OnDataConsolidatedWeek(self, sender, consolidated):
        """Push the 5-day bar's high into the week window."""
        self.high_window_week.Add(consolidated.High)

    def OnDataConsolidatedMonth(self, sender, consolidated):
        """Push the 22-day bar's high into the month window."""
        self.high_window_month.Add(consolidated.High)
from AlgorithmImports import *
import numpy as np
import pandas as pd
import statistics as stat
from scipy import stats
from datetime import timedelta


class ATHAlpha(AlphaModel):
    """Alpha model that buys when the close exceeds the rolling window high.

    Entry: for a symbol that is not held, an Up insight is emitted when the
    current close is above the maximum of the earlier highs in the window.
    When ``add_linear_trend`` is set, the r-value of a log-linear fit over the
    earlier closes must be at least ``MIN_R_VALUE``; that r-value becomes the
    insight confidence.

    Exit: held symbols are tracked with a trailing high and the entry cost;
    a Flat insight is emitted on a trailing-stop breach or when the profit
    target is reached.
    """

    MIN_R_VALUE = 0.3  # minimum trend r-value when add_linear_trend is enabled

    def __init__(self, algorithm, rolling_window_size, add_linear_trend=True,
                 maximumDrawdownPercent=0.3, tpPercent=0.5):
        """Store the parameters.

        Args:
            algorithm: The owning QCAlgorithm instance.
            rolling_window_size: Stored on the instance; SymbolData currently
                uses a fixed window size and does not read it.
            add_linear_trend: Require a positive log-linear trend for entries.
            maximumDrawdownPercent: Trailing-stop distance; the sign is ignored.
            tpPercent: Profit-take distance; the sign is ignored.
        """
        # init
        self.algorithm = algorithm
        self.rolling_window_size = rolling_window_size
        self.add_linear_trend = add_linear_trend
        self.Data = {}
        self.trailingHighs = dict()
        self.profitTakeData = {}

        # parameters (stored as a negative drawdown and a positive target)
        self.maximumDrawdownPercent = -abs(maximumDrawdownPercent)
        self.tp_percent = abs(tpPercent)

    def Update(self, algorithm, data):
        """Return the list of insights generated for the current time slice."""
        insights = []

        for symbol, symbol_data in self.Data.items():
            # skip symbols that have no bar in this slice
            if not data.Bars.ContainsKey(symbol) or not data.ContainsKey(symbol):
                continue
            bar = data[symbol]
            if bar is None:
                continue

            # Feed the windows BEFORE the readiness check. Checking first meant
            # a window that history did not fill was never updated, so the
            # symbol could never become tradable.
            symbol_data.update(bar.Close, bar.High)
            if not symbol_data.high_window.IsReady:
                continue

            # util values: index 0 is the bar just added, so it is excluded
            close_current = bar.Close
            high_window = symbol_data.high_window
            rolling_month_high = max(list(high_window)[1:high_window.Count])

            # trading rule
            signal = close_current > rolling_month_high

            invested = algorithm.Portfolio[symbol].Invested
            if not invested and signal:
                # remove symbol from the trailing stop / profit take state
                self.trailingHighs.pop(symbol, None)
                self.profitTakeData.pop(symbol, None)

                # Without the trend filter there is no r-value, so the insight
                # carries no confidence instead of raising NameError.
                confidence = None
                if self.add_linear_trend:
                    confidence = self._trend_r_value(symbol_data.close_window)
                    if confidence < self.MIN_R_VALUE:
                        continue

                insights.append(Insight.Price(symbol, timedelta(days=999999),
                                              InsightDirection.Up, confidence=confidence))

            elif invested:
                # Add newly invested securities, starting from the average holding cost
                if symbol not in self.trailingHighs and symbol not in self.profitTakeData:
                    average_price = algorithm.Securities[symbol].Holdings.AveragePrice
                    self.trailingHighs[symbol] = average_price
                    self.profitTakeData[symbol] = average_price
                    continue

                # Check for new highs and update - set to tradebar high
                if self.trailingHighs[symbol] < bar.High:
                    self.trailingHighs[symbol] = bar.High
                    continue

                # Check for securities past the drawdown limit
                drawdown = (bar.Low / self.trailingHighs[symbol]) - 1
                if drawdown < self.maximumDrawdownPercent:
                    insights.append(Insight.Price(symbol, Expiry.EndOfMonth,
                                                  InsightDirection.Flat, confidence=1))

                # profit take
                profit_percent = bar.High / self.profitTakeData[symbol] - 1
                if profit_percent >= self.tp_percent:
                    insights.append(Insight.Price(symbol, Expiry.EndOfMonth,
                                                  InsightDirection.Flat, confidence=1))

        return insights

    @staticmethod
    def _trend_r_value(close_window):
        """Return the r-value of a linear fit to the log of the closes.

        The most recent close (index 0) is excluded and the remaining closes
        are ordered oldest first before fitting.
        """
        closes = list(close_window)[1:close_window.Count]
        closes.reverse()
        log_closes = np.log(np.array(closes))
        return stats.linregress(np.arange(len(closes)), log_closes).rvalue

    def OnSecuritiesChanged(self, algorithm, changes):
        """Create per-symbol state for added securities; liquidate removed ones."""
        # action on added securities
        for security in changes.AddedSecurities:
            symbol = security.Symbol
            if symbol not in self.Data:
                self.Data[symbol] = SymbolData(algorithm, symbol)

        # action on removed securities
        for security in changes.RemovedSecurities:
            symbol = security.Symbol
            if self.Data.pop(symbol, None) is not None:
                algorithm.Liquidate(symbol, 'Removed from universe')


class SymbolData(object):
    """Rolling windows of closes and highs for one symbol."""

    def __init__(self, algorithm, symbol):
        """Create the windows and warm them up from hourly history.

        Args:
            algorithm: The running algorithm, used to request history.
            symbol: The symbol this object tracks.
        """
        self.symbol = symbol
        self.algorithm = algorithm

        self.close_window = RollingWindow[float](8 * 22)
        self.high_window = RollingWindow[float](8 * 22)

        # Warm up from hourly history; the windows stay empty when none exists.
        # NOTE(review): live updates arrive at the universe resolution, which
        # may differ from hourly - confirm the mix is intended.
        history = self.algorithm.History([self.symbol], 8 * 22, Resolution.Hour)
        if history.shape[0] == 0:
            self.algorithm.Log('DataFrame is empty!')
        else:
            for _, row in history.loc[self.symbol].iterrows():
                self.close_window.Add(row["close"])
                self.high_window.Add(row["high"])

    def update(self, close, high):
        """Append the latest close and high to the rolling windows."""
        self.close_window.Add(close)
        self.high_window.Add(high)
from AlgorithmImports import *
from EqualWeightingPortfolioConstructionModel import EqualWeightingPortfolioConstructionModel, PortfolioBias


class InsightWeightingPortfolioConstructionModel(EqualWeightingPortfolioConstructionModel):
    '''Provides an implementation of IPortfolioConstructionModel that generates percent targets based on the
    Insight.Weight. The target percent holdings of each Symbol is given by the Insight.Weight from the last
    active Insight for that symbol.
    For insights of direction InsightDirection.Up, long targets are returned and for insights of direction
    InsightDirection.Down, short targets are returned.
    If the sum of all the last active Insight per symbol is bigger than 1, it will factor down each target
    percent holdings proportionally so the sum is 1.
    It will ignore Insight that have no Insight.Weight value.'''

    def __init__(self, rebalance = Resolution.Daily, portfolioBias = PortfolioBias.LongShort):
        '''Initialize a new instance of InsightWeightingPortfolioConstructionModel
        Args:
            rebalance: Rebalancing parameter. If it is a timedelta, date rules or Resolution, it will be converted into a function.
                       If None will be ignored.
                       The function returns the next expected rebalance time for a given algorithm UTC DateTime.
                       The function returns null if unknown, in which case the function will be called again in the
                       next loop. Returning current time will trigger rebalance.
            portfolioBias: Specifies the bias of the portfolio (Short, Long/Short, Long)'''
        super().__init__(rebalance, portfolioBias)

    def ShouldCreateTargetForInsight(self, insight):
        '''Method that will determine if the portfolio construction model should create a
        target for this insight
        Args:
            insight: The insight to create a target for
        Returns:
            True when the insight carries a Weight value'''
        # Ignore insights that don't have Weight value
        return insight.Weight is not None

    def DetermineTargetPercent(self, activeInsights):
        '''Will determine the target percent for each insight
        Args:
            activeInsights: The active insights to generate a target for
        Returns:
            A dictionary mapping each active insight to its signed target percent'''
        result = {}

        # We will adjust weights proportionally in case the sum is > 1 so it sums to 1.
        weightSums = sum(self.GetValue(insight) for insight in activeInsights if self.RespectPortfolioBias(insight))
        weightFactor = 1.0
        if weightSums > 1:
            weightFactor = 1 / weightSums

        # A stray early `return result` used to sit before this loop, so the
        # targets were returned empty instead of being scaled down.
        for insight in activeInsights:
            # Insights that violate the portfolio bias are flattened
            direction = insight.Direction if self.RespectPortfolioBias(insight) else InsightDirection.Flat
            result[insight] = direction * self.GetValue(insight) * weightFactor
        return result

    def GetValue(self, insight):
        '''Method that will determine which member will be used to compute the weights and gets its value
        Args:
            insight: The insight to create a target for
        Returns:
            The value of the selected insight member'''
        return insight.Weight
#region imports # from AlgorithmImports import * #endregion # #Imports # from itertools import groupby # from datetime import datetime, timedelta # from pytz import utc # from clr import AddReference # AddReference("System") # AddReference("QuantConnect.Common") # AddReference("QuantConnect.Algorithm") # AddReference("QuantConnect.Algorithm.Framework") # from QuantConnect import Resolution, Extensions # from QuantConnect.Algorithm.Framework.Portfolio import * # from QuantConnect.Algorithm.Framework.Risk import * # #Global variables # Zero = int(0) # class RiskManagement(RiskManagementModel): # def __init__(self): # ''' # Initialization variables # ''' # # Long Position Variables # self.LongTrail = {} # self.LongTrailingDrawdown = float(0.1) # # Short Position Variables # self.ShortTrail = {} # self.ShortTrailingDrawdown = float(0.1) # def ManageRisk(self, algorithm, targets): # ''' # Main risk management handler. Passes algorithm and targets # ''' # RiskAdjustedTargets = [] # for asset in self.LongTrail: # if not algorithm.Portfolio[asset].Invested: # self.LongTrail[asset] = [algorithm.Securities[asset].Price, 0] # # for asset in self.ShortTrail: # # if not algorithm.Portfolio[asset].Invested: # # self.ShortTrail[asset] = [algorithm.Securities[asset].Price, 0] # invested = [x.Key for x in algorithm.Portfolio if x.Value.Invested] # if invested: # for asset in invested: # if algorithm.Portfolio[asset].IsLong: # if asset not in self.LongTrail or self.LongTrail[asset][1] == 0: # self.LongTrail[asset] = [algorithm.Portfolio[asset].Price, algorithm.Portfolio[asset].Quantity] # # elif algorithm.Portfolio[asset].IsShort: # # if asset not in self.ShortTrail or self.ShortTrail[asset][1] == 0: # # self.ShortTrail[asset] = [algorithm.Portfolio[asset].Price, algorithm.Portfolio[asset].Quantity] # self.TrailingStop(algorithm, asset, RiskAdjustedTargets) # return RiskAdjustedTargets # def TrailingStop(self, algorithm, asset, RiskAdjustedTargets): # ''' # Manages trailing stop for 
both long and short assets respectively # ''' # if algorithm.Portfolio[asset].IsLong: # if algorithm.Portfolio[asset].Price > self.LongTrail[asset][0]: # self.LongTrail[asset][0] = algorithm.Portfolio[asset].Price # elif algorithm.Portfolio[asset].Price / self.LongTrail[asset][0] < (1-self.LongTrailingDrawdown): # RiskAdjustedTargets.append(PortfolioTarget(asset, 0)) # algorithm.Debug(f'Long trailing Stop Triggered for {asset}. Current Price: {algorithm.Portfolio[asset].Price} | Highest Price: {self.LongTrail[asset][0]} | Loss: {algorithm.Portfolio[asset].Price / self.LongTrail[asset][0]} | Date: {algorithm.Time}') # self.LongTrail.pop(asset) # # if algorithm.Portfolio[asset].IsShort: # # if algorithm.Portfolio[asset].Price < self.ShortTrail[asset][0]: # # self.ShortTrail[asset][0] = algorithm.Portfolio[asset].Price # # elif algorithm.Portfolio[asset].Price / self.ShortTrail[asset][0] > 1 / (1-self.ShortTrailingDrawdown): # # RiskAdjustedTargets.append(PortfolioTarget(asset, 0)) # # algorithm.Debug(f'Short trailing Stop Triggered for {asset}. Current Price: {algorithm.Portfolio[asset].Price} | Lowest Price: {self.ShortTrail[asset][0]} | Loss: {algorithm.Portfolio[asset].Price / self.ShortTrail[asset][0]} | Date: {algorithm.Time}') # # self.ShortTrail.pop(asset) # return RiskAdjustedTargets # def ProfitTake(self, algorithm, asset, RiskAdjustedTargets): # ''' # Manages profit take for long assets # ''' # if algorithm.Portfolio[asset].IsLong: # if algorithm.Portfolio[asset].Price > self.LongTrail[asset][0]: # return
from AlgorithmImports import *
# from ConfidenceWeightedPortfolioConstructionModel import ConfidenceWeightedPortfolioConstructionModel

# Algo framework modules
from ATH3Alpha import ATH3Alpha
from ATHAlpha import ATHAlpha
from datetime import timedelta


class GeekyApricotKitten(QCAlgorithm):
    '''Framework algorithm that trades a monthly, file-driven universe with a
    selectable alpha model and portfolio construction model.'''

    def Initialize(self):
        '''Configure dates, cash, parameters and the algorithm framework models.'''
        self.SetStartDate(2009, 12, 31)
        self.SetEndDate(2010, 6, 1)
        self.SetCash(100000)

        # init
        self.backtestSymbolsPerDay = {}
        self.current_universe = []
        self.time = -1
        self.Data = {}

        # PARAMETERS
        self.algo = "3ath"                           # algo type, can be "factor" or "3ath" or "ath"
        self.frequency = Resolution.Minute           # Universe frequency
        self.portfolio_construction = "confidence"   # Portfolio construction type; can be "equal", "confidence", "none" (or "null")

        # 3ATH
        self.sl_percent = 0.2   # stop loss percent
        self.pt_percent = 0.2   # profit take percent

        # ALGO FRAMEWORK
        # universe
        self.UniverseSettings.Resolution = self.frequency
        self.UniverseSettings.Leverage = 1
        self.AddUniverseSelection(ScheduledUniverseSelectionModel(
            self.DateRules.MonthStart(0),
            self.TimeRules.At(10, 0),
            self.SelectSymbols
        ))
        self.SetSecurityInitializer(self.CustomSecurityInitializer)

        if self.algo == "factor":
            # monthly rebalancing strategy
            self.AddAlpha(ConstantAlphaModel(InsightType.Price, InsightDirection.Up, timedelta(days = 30), 0.25, None))
            self.Settings.RebalancePortfolioOnInsightChanges = False
            self.Settings.RebalancePortfolioOnSecurityChanges = True
            self.SetPortfolioConstruction(EqualWeightingPortfolioConstructionModel(lambda dt: None))
        elif self.algo == "3ath" or self.algo == "ath":
            # alpha
            if self.algo == "3ath":
                self.AddAlpha(ATH3Alpha(self, 22, True, maximumDrawdownPercent = self.sl_percent, tpPercent = self.pt_percent))
            elif self.algo == "ath":
                self.AddAlpha(ATHAlpha(self, 22, True, maximumDrawdownPercent = self.sl_percent, tpPercent = self.pt_percent))

            # portfolio construction
            self.Settings.RebalancePortfolioOnInsightChanges = True
            self.Settings.RebalancePortfolioOnSecurityChanges = False
            if self.portfolio_construction == "equal":
                self.SetPortfolioConstruction(EqualWeightingPortfolioConstructionModel(lambda dt: None))
            elif self.portfolio_construction == "confidence":
                self.SetPortfolioConstruction(ConfidenceWeightedPortfolioConstructionModel(lambda dt: None, PortfolioBias.Long))
            elif self.portfolio_construction in ("none", "null"):
                # The options comment documents "none" while the code used to
                # test only "null"; both spellings are accepted.
                self.SetPortfolioConstruction(NullPortfolioConstructionModel())

        # execution
        self.SetExecution(ImmediateExecutionModel())

        # warm up
        self.SetWarmUp(100)

    def CustomSecurityInitializer(self, security):
        '''Initialize the security with split-adjusted prices'''
        security.SetDataNormalizationMode(DataNormalizationMode.SplitAdjusted)

    def _to_symbols(self, tickers):
        '''Convert ticker strings to US equity symbols, dropping "|" and blank tokens.'''
        cleaned = (ticker.strip() for ticker in tickers)
        return [Symbol.Create(ticker, SecurityType.Equity, Market.USA)
                for ticker in cleaned if ticker and ticker != "|"]

    def SelectSymbols(self, date):
        '''Return the universe for the given date.

        In live mode the symbols come from a single comma-separated download;
        when it is empty the previous universe is kept. In backtests a file
        with one "date,ticker,ticker,..." line per day is downloaded once and
        cached; dates missing from the file leave the universe unchanged.

        Args:
            date: The date the universe is being selected for.
        Returns:
            A list of symbols, or Universe.Unchanged.'''
        # handle live mode file format
        if self.LiveMode:
            # fetch the file from dropbox
            content = self.Download("https://www.dropbox.com/s/2l73mu97gcehmh7/daily-stock-picker-live.csv?dl=1")
            # if we have a file for today, return symbols, else leave universe unchanged
            if len(content) > 0:
                self.current_universe = self._to_symbols(content.split(','))
            return self.current_universe

        # backtest - first cache the entire file
        if len(self.backtestSymbolsPerDay) == 0:
            content = self.Download("https://contentiobatch.blob.core.windows.net/qc-backtest/universe.csv")
            for line in content.splitlines():
                if not line.strip():
                    continue  # ignore blank lines
                fields = line.split(',')
                self.backtestSymbolsPerDay[fields[0]] = fields[1:]

        index = date.strftime("%Y%m%d")
        if index not in self.backtestSymbolsPerDay:
            return Universe.Unchanged

        self.current_universe = self._to_symbols(self.backtestSymbolsPerDay[index])
        for symbol in self.current_universe:
            self.Debug(f"{date}: {symbol}")
        return self.current_universe