Overall Statistics |
Total Trades 0 Average Win 0% Average Loss 0% Compounding Annual Return 0% Drawdown 0% Expectancy 0 Net Profit 0% Sharpe Ratio 0 Probabilistic Sharpe Ratio 0% Loss Rate 0% Win Rate 0% Profit-Loss Ratio 0 Alpha 0 Beta 0 Annual Standard Deviation 0 Annual Variance 0 Information Ratio -1.369 Tracking Error 0.119 Treynor Ratio 0 Total Fees $0.00 Estimated Strategy Capacity $0 Lowest Capacity Asset |
from AlgorithmImports import *
import numpy as np
import pandas as pd
import statistics as stat
from scipy import stats
from datetime import timedelta


class ATH3Alpha(AlphaModel):
    """Alpha model that buys a "triple high" breakout and exits on a stop or target.

    Entry: the current close is above the latest completed daily, weekly and
    monthly highs and, when ``add_linear_trend`` is set, the log-price
    regression over the daily close window has an r-value of at least
    ``MIN_TREND_R_VALUE``.

    Exit: a Flat insight is emitted when the bar low falls more than
    ``maximumDrawdownPercent`` below the trailing high, or the bar high is at
    least ``tpPercent`` above the average holding price.
    """

    # Minimum r-value of the log-price regression required to open a position.
    MIN_TREND_R_VALUE = 0.3

    def __init__(self, algorithm, rolling_window_size, add_linear_trend=True,
                 maximumDrawdownPercent=0.3, tpPercent=0.5):
        """Store the model parameters and request a warm-up period.

        Args:
            algorithm: The algorithm instance; used here to set the warm-up.
            rolling_window_size: Length of the daily rolling windows kept per symbol.
            add_linear_trend: When True, entries also require a positive linear trend.
            maximumDrawdownPercent: Trailing-stop distance as a fraction (sign is ignored).
            tpPercent: Profit-take distance as a fraction (sign is ignored).
        """
        self.rolling_window_size = rolling_window_size
        self.add_linear_trend = add_linear_trend
        self.Data = {}
        self.trailingHighs = {}
        self.profitTakeData = {}

        # Drawdown is stored as a negative number, profit target as a positive one.
        self.maximumDrawdownPercent = -abs(maximumDrawdownPercent)
        self.tp_percent = abs(tpPercent)

        algorithm.SetWarmUp(100, Resolution.Daily)

    def Update(self, algorithm, data):
        """Build the insights for the current slice.

        Args:
            algorithm: The algorithm instance (portfolio and holdings are read from it).
            data: The current data slice.

        Returns:
            A list of insights: Up for new breakouts, Flat for stop/target exits.
        """
        insights = []

        for symbol, symbol_data in self.Data.items():
            # Skip symbols without a bar in this slice.
            if not data.Bars.ContainsKey(symbol):
                continue
            if not data.ContainsKey(symbol):
                continue
            bar = data[symbol]
            if bar is None:
                continue
            if not symbol_data.is_ready:
                continue

            invested = algorithm.Portfolio[symbol].Invested

            if not invested:
                close_current = bar.Close
                # Index 0 is the most recently completed day/week/month bar.
                breakout = (
                    close_current > symbol_data.high_window_week[0]
                    and close_current > symbol_data.high_window_month[0]
                    and close_current > symbol_data.high_window_day[0]
                )
                if not breakout:
                    continue

                # A fresh entry restarts the stop / profit-take tracking.
                self.trailingHighs.pop(symbol, None)
                self.profitTakeData.pop(symbol, None)

                # Without the trend filter there is no r-value; leave the
                # confidence unset instead of reading an undefined name.
                confidence = None
                if self.add_linear_trend:
                    confidence = self._trend_r_value(symbol_data)
                    # "not >=" also rejects NaN (e.g. a flat price window).
                    if not confidence >= self.MIN_TREND_R_VALUE:
                        continue

                insights.append(Insight.Price(
                    symbol, timedelta(days=999999), InsightDirection.Up,
                    confidence=confidence))

            elif self._should_exit(algorithm, symbol, bar):
                insights.append(Insight.Price(
                    symbol, Expiry.EndOfMonth, InsightDirection.Flat, confidence=1))

        return insights

    def _trend_r_value(self, symbol_data):
        """Return the r-value of a linear fit to the log of the daily closes."""
        # The window is newest-first; drop index 0 and restore chronological order.
        closes = list(symbol_data.close_window_day)[1:]
        closes.reverse()
        log_closes = np.log(np.array(closes))
        x = np.arange(len(closes))
        _, _, r_value, _, _ = stats.linregress(x, log_closes)
        return r_value

    def _should_exit(self, algorithm, symbol, bar):
        """Update the trailing high for an open position and report whether to exit."""
        if symbol not in self.trailingHighs or symbol not in self.profitTakeData:
            # Newly invested: start tracking from the average holding cost.
            entry_price = algorithm.Securities[symbol].Holdings.AveragePrice
            self.trailingHighs[symbol] = entry_price
            self.profitTakeData[symbol] = entry_price
            return False

        stop_hit = False
        if self.trailingHighs[symbol] < bar.High:
            # New high: move the trailing reference up; no stop check on this bar.
            self.trailingHighs[symbol] = bar.High
        else:
            drawdown = (bar.Low / self.trailingHighs[symbol]) - 1
            stop_hit = drawdown < self.maximumDrawdownPercent

        # The profit take is evaluated on every bar, including bars that make
        # a new high (previously those bars skipped this check).
        profit_percent = bar.High / self.profitTakeData[symbol] - 1
        target_hit = profit_percent >= self.tp_percent

        return stop_hit or target_hit

    def OnSecuritiesChanged(self, algorithm, changes):
        """Create per-symbol state for added securities and tear it down for removed ones."""
        for security in changes.AddedSecurities:
            symbol = security.Symbol
            if symbol not in self.Data:
                self.Data[symbol] = SymbolData(algorithm, symbol, self.rolling_window_size)

        for security in changes.RemovedSecurities:
            symbol = security.Symbol
            symbol_data = self.Data.pop(symbol, None)
            if symbol_data is None:
                continue
            # Stop feeding the windows and forget the stop / target state.
            symbol_data.dispose()
            self.trailingHighs.pop(symbol, None)
            self.profitTakeData.pop(symbol, None)
            algorithm.Liquidate(symbol, 'Removed from universe')


class SymbolData(object):
    """Rolling daily/weekly/monthly windows for one symbol, fed by consolidators."""

    def __init__(self, algorithm, symbol, window_size=22):
        """Create the windows, register the consolidators and warm up the daily windows.

        Args:
            algorithm: The algorithm instance used for consolidation, history and logging.
            symbol: The symbol this object tracks.
            window_size: Length of the daily windows and of the daily history request.
        """
        self.symbol = symbol
        self.algorithm = algorithm
        window_size = int(window_size)

        self.close_window_day = RollingWindow[float](window_size)
        self.high_window_day = RollingWindow[float](window_size)
        self.volume_window_day = RollingWindow[float](window_size)
        # NOTE(review): the weekly/monthly windows are not warmed up from
        # history, so they only become ready after two completed weeks/months
        # in the universe - confirm this delay is intended.
        self.high_window_week = RollingWindow[float](2)
        self.high_window_month = RollingWindow[float](2)

        # Keep the consolidators so they can be removed in dispose().
        self._consolidators = [
            algorithm.Consolidate(symbol, Resolution.Daily, self.OnDataConsolidated),
            algorithm.Consolidate(symbol, Calendar.Weekly, self.OnDataConsolidatedWeek),
            algorithm.Consolidate(symbol, Calendar.Monthly, self.OnDataConsolidatedMonth),
        ]

        # Warm up the daily windows from history.
        history = algorithm.History([symbol], window_size, Resolution.Daily)
        if history.empty:
            algorithm.Log('DataFrame is empty!')
            return
        for _, row in history.loc[symbol].iterrows():
            self.close_window_day.Add(row["close"])
            self.high_window_day.Add(row["high"])
            self.volume_window_day.Add(row["volume"])

    @property
    def is_ready(self):
        """True once the close and high windows (day, week, month) are all full."""
        return (
            self.close_window_day.IsReady
            and self.high_window_day.IsReady
            and self.high_window_month.IsReady
            and self.high_window_week.IsReady
        )

    def update(self, close, high, volume):
        """Add a volume sample to the daily volume window.

        ``close`` and ``high`` are accepted for interface compatibility but
        are not stored; the daily consolidator handler adds those values.
        """
        self.volume_window_day.Add(volume)

    def dispose(self):
        """Remove this symbol's consolidators from the subscription manager."""
        for consolidator in self._consolidators:
            if consolidator is not None:
                self.algorithm.SubscriptionManager.RemoveConsolidator(self.symbol, consolidator)
        self._consolidators = []

    def OnDataConsolidated(self, consolidated):
        """Handle a completed daily bar."""
        self.high_window_day.Add(consolidated.High)
        self.close_window_day.Add(consolidated.Close)
        self.volume_window_day.Add(consolidated.Volume)

    def OnDataConsolidatedWeek(self, consolidated):
        """Handle a completed weekly bar."""
        self.high_window_week.Add(consolidated.High)

    def OnDataConsolidatedMonth(self, consolidated):
        """Handle a completed monthly bar."""
        self.high_window_month.Add(consolidated.High)
#region imports # from AlgorithmImports import * #endregion # #Imports # from itertools import groupby # from datetime import datetime, timedelta # from pytz import utc # from clr import AddReference # AddReference("System") # AddReference("QuantConnect.Common") # AddReference("QuantConnect.Algorithm") # AddReference("QuantConnect.Algorithm.Framework") # from QuantConnect import Resolution, Extensions # from QuantConnect.Algorithm.Framework.Portfolio import * # from QuantConnect.Algorithm.Framework.Risk import * # #Global variables # Zero = int(0) # class RiskManagement(RiskManagementModel): # def __init__(self): # ''' # Initialization variables # ''' # # Long Position Variables # self.LongTrail = {} # self.LongTrailingDrawdown = float(0.1) # # Short Position Variables # self.ShortTrail = {} # self.ShortTrailingDrawdown = float(0.1) # def ManageRisk(self, algorithm, targets): # ''' # Main risk management handler. Passes algorithm and targets # ''' # RiskAdjustedTargets = [] # for asset in self.LongTrail: # if not algorithm.Portfolio[asset].Invested: # self.LongTrail[asset] = [algorithm.Securities[asset].Price, 0] # # for asset in self.ShortTrail: # # if not algorithm.Portfolio[asset].Invested: # # self.ShortTrail[asset] = [algorithm.Securities[asset].Price, 0] # invested = [x.Key for x in algorithm.Portfolio if x.Value.Invested] # if invested: # for asset in invested: # if algorithm.Portfolio[asset].IsLong: # if asset not in self.LongTrail or self.LongTrail[asset][1] == 0: # self.LongTrail[asset] = [algorithm.Portfolio[asset].Price, algorithm.Portfolio[asset].Quantity] # # elif algorithm.Portfolio[asset].IsShort: # # if asset not in self.ShortTrail or self.ShortTrail[asset][1] == 0: # # self.ShortTrail[asset] = [algorithm.Portfolio[asset].Price, algorithm.Portfolio[asset].Quantity] # self.TrailingStop(algorithm, asset, RiskAdjustedTargets) # return RiskAdjustedTargets # def TrailingStop(self, algorithm, asset, RiskAdjustedTargets): # ''' # Manages trailing stop for 
both long and short assets respectively # ''' # if algorithm.Portfolio[asset].IsLong: # if algorithm.Portfolio[asset].Price > self.LongTrail[asset][0]: # self.LongTrail[asset][0] = algorithm.Portfolio[asset].Price # elif algorithm.Portfolio[asset].Price / self.LongTrail[asset][0] < (1-self.LongTrailingDrawdown): # RiskAdjustedTargets.append(PortfolioTarget(asset, 0)) # algorithm.Debug(f'Long trailing Stop Triggered for {asset}. Current Price: {algorithm.Portfolio[asset].Price} | Highest Price: {self.LongTrail[asset][0]} | Loss: {algorithm.Portfolio[asset].Price / self.LongTrail[asset][0]} | Date: {algorithm.Time}') # self.LongTrail.pop(asset) # # if algorithm.Portfolio[asset].IsShort: # # if algorithm.Portfolio[asset].Price < self.ShortTrail[asset][0]: # # self.ShortTrail[asset][0] = algorithm.Portfolio[asset].Price # # elif algorithm.Portfolio[asset].Price / self.ShortTrail[asset][0] > 1 / (1-self.ShortTrailingDrawdown): # # RiskAdjustedTargets.append(PortfolioTarget(asset, 0)) # # algorithm.Debug(f'Short trailing Stop Triggered for {asset}. Current Price: {algorithm.Portfolio[asset].Price} | Lowest Price: {self.ShortTrail[asset][0]} | Loss: {algorithm.Portfolio[asset].Price / self.ShortTrail[asset][0]} | Date: {algorithm.Time}') # # self.ShortTrail.pop(asset) # return RiskAdjustedTargets # def ProfitTake(self, algorithm, asset, RiskAdjustedTargets): # ''' # Manages profit take for long assets # ''' # if algorithm.Portfolio[asset].IsLong: # if algorithm.Portfolio[asset].Price > self.LongTrail[asset][0]: # return
from AlgorithmImports import *
# from ConfidenceWeightedPortfolioConstructionModel import ConfidenceWeightedPortfolioConstructionModel

# Algo framework modules
from ATH3Alpha import ATH3Alpha
from datetime import timedelta


class GeekyApricotKitten(QCAlgorithm):
    """Framework algorithm trading a monthly, file-driven equity universe."""

    def Initialize(self):
        """Configure dates, cash, universe selection and the framework models."""
        self.SetStartDate(2010, 1, 1)
        self.SetEndDate(2010, 5, 1)
        self.SetCash(100000)

        # init
        self.backtestSymbolsPerDay = {}
        self.current_universe = []
        self.time = -1
        self.Data = {}

        # PARAMETERS
        self.algo = "3ath"  # algo type, can be "factor" or "3ath"
        self.frequency = Resolution.Hour  # universe frequency
        # 3ATH
        self.sl_percent = 0.05  # stop loss percent
        self.pt_percent = 0.05  # profit take percent

        # ALGO FRAMEWORK
        # universe
        self.UniverseSettings.Resolution = self.frequency
        self.UniverseSettings.Leverage = 1
        self.AddUniverseSelection(ScheduledUniverseSelectionModel(
            self.DateRules.MonthStart(0),
            self.TimeRules.At(10, 0),
            self.SelectSymbols
        ))
        self.SetSecurityInitializer(self.CustomSecurityInitializer)

        if self.algo == "factor":
            # monthly rebalancing strategy
            self.AddAlpha(ConstantAlphaModel(InsightType.Price, InsightDirection.Up, timedelta(days = 30), 0.25, None))
            self.Settings.RebalancePortfolioOnInsightChanges = False
            self.Settings.RebalancePortfolioOnSecurityChanges = True
            self.SetPortfolioConstruction( EqualWeightingPortfolioConstructionModel(lambda dt: None) )  # self.DateRules.MonthStart(0))

        elif self.algo == "3ath":
            # alpha
            # self.AddAlpha(ConstantAlphaModel(InsightType.Price, InsightDirection.Up, timedelta(minutes = 50), 0.025, None))
            self.AddAlpha(ATH3Alpha(self, 22, True, maximumDrawdownPercent = self.sl_percent, tpPercent = self.pt_percent))

            # portfolio construction
            self.Settings.RebalancePortfolioOnInsightChanges = True
            self.Settings.RebalancePortfolioOnSecurityChanges = False
            # self.SetPortfolioConstruction( EqualWeightingPortfolioConstructionModel(lambda dt: None) )
            # self.SetPortfolioConstruction( AccumulativeInsightPortfolioConstructionModel(percent = 0.05))
            # self.SetPortfolioConstruction(ConfidenceWeightedPortfolioConstructionModel(lambda dt: None, PortfolioBias.Long))
            # NOTE(review): the null model presumably turns no insights into
            # portfolio targets, which would explain a backtest with zero
            # trades - confirm this is only a temporary debugging setting.
            self.SetPortfolioConstruction( NullPortfolioConstructionModel() )

        # execution
        self.SetExecution( ImmediateExecutionModel() )

        # risk
        # self.AddRiskManagement(RiskManagement())

        # warm up
        self.SetWarmUp(100)

    def CustomSecurityInitializer(self, security):
        '''Initialize the security with split-adjusted prices'''
        security.SetDataNormalizationMode(DataNormalizationMode.SplitAdjusted)

    def SelectSymbols(self, date):
        """Return the universe for ``date``.

        In live mode the tickers come from a single comma-separated download;
        an empty download leaves the universe as it was. In backtests the
        whole per-day file is downloaded once and cached, and dates missing
        from it return ``Universe.Unchanged``.
        """
        # handle live mode file format
        if self.LiveMode:
            content = self.Download("https://www.dropbox.com/s/2l73mu97gcehmh7/daily-stock-picker-live.csv?dl=1")
            # if we have a file for today, return symbols, else leave universe unchanged
            if content:
                # Build Symbol objects, matching what the backtest branch returns.
                symbols = self._create_symbols(content.split(','))
                if symbols:
                    self.current_universe = symbols
            return self.current_universe

        # backtest - first cache the entire file
        if not self.backtestSymbolsPerDay:
            content = self.Download("https://contentiobatch.blob.core.windows.net/qc-backtest/universe.csv")
            for line in (content or "").splitlines():
                if not line.strip():
                    continue
                fields = line.split(',')
                # first column is the yyyymmdd key, the rest are tickers
                self.backtestSymbolsPerDay[fields[0].strip()] = fields[1:]

        index = date.strftime("%Y%m%d")
        if index not in self.backtestSymbolsPerDay:
            return Universe.Unchanged

        self.current_universe = self._create_symbols(self.backtestSymbolsPerDay[index])
        for symbol in self.current_universe:
            self.Debug(f"{date}: {symbol}")
        return self.current_universe

    def _create_symbols(self, tickers):
        """Turn ticker strings into US equity symbols, skipping blanks and "|"."""
        symbols = []
        for raw_ticker in tickers:
            ticker = raw_ticker.strip()
            if not ticker or ticker == "|":
                continue
            symbols.append(Symbol.Create(ticker, SecurityType.Equity, Market.USA))
        return symbols