Overall Statistics
Total Trades
0
Average Win
0%
Average Loss
0%
Compounding Annual Return
0%
Drawdown
0%
Expectancy
0
Net Profit
0%
Sharpe Ratio
0
Probabilistic Sharpe Ratio
0%
Loss Rate
0%
Win Rate
0%
Profit-Loss Ratio
0
Alpha
0
Beta
0
Annual Standard Deviation
0
Annual Variance
0
Information Ratio
-1.369
Tracking Error
0.119
Treynor Ratio
0
Total Fees
$0.00
Estimated Strategy Capacity
$0
Lowest Capacity Asset
from AlgorithmImports import *

import numpy as np
import pandas as pd
import statistics as stat
from scipy import stats
from datetime import timedelta


class ATH3Alpha(AlphaModel):
    """Breakout alpha model with a built-in trailing stop and profit take.

    Entry: when a symbol is not held and the current bar's close is above the
    newest entries of the daily, weekly and monthly high windows kept in
    ``SymbolData``, an ``Up`` insight is emitted. With ``add_linear_trend``
    enabled the breakout must also be backed by a log-price regression over
    the daily close window whose correlation coefficient is at least 0.3.

    Exit: while a symbol is held, a ``Flat`` insight is emitted when the bar's
    low falls more than ``maximumDrawdownPercent`` below the trailing high, or
    when the bar's high is at least ``tpPercent`` above the average holding
    price recorded when the position was first seen.
    """

    # Minimum regression correlation coefficient required for an entry.
    _MIN_R_VALUE = 0.3

    def __init__(self, algorithm, rolling_window_size, add_linear_trend=True, maximumDrawdownPercent=0.3, tpPercent=0.5):
        """Store the parameters and request an algorithm warm-up.

        Args:
            algorithm: The algorithm instance; used here only to set the warm-up.
            rolling_window_size: Stored on the instance; not read by this class
                (``SymbolData`` sizes its own windows).
            add_linear_trend: When True, entries also require the regression
                trend filter.
            maximumDrawdownPercent: Trailing-stop distance as a fraction; the
                sign is ignored and it is stored as a negative number.
            tpPercent: Profit-take distance as a fraction; the sign is ignored.
        """
        self.rolling_window_size = rolling_window_size
        self.add_linear_trend = add_linear_trend
        self.Data = {}              # symbol -> SymbolData
        self.trailingHighs = dict()  # symbol -> highest bar high seen while invested
        self.profitTakeData = {}     # symbol -> reference price for the profit take

        # Drawdowns are negative numbers, so the limit is stored negative as well.
        self.maximumDrawdownPercent = -abs(maximumDrawdownPercent)
        self.tp_percent = abs(tpPercent)

        algorithm.SetWarmUp(100, Resolution.Daily)

    def Update(self, algorithm, data):
        """Build the insights for the current slice.

        Args:
            algorithm: The algorithm instance (portfolio and holdings lookups).
            data: The current data slice.

        Returns:
            A list of insights; empty when nothing triggered.
        """
        insights = []

        for symbol, symbol_data in self.Data.items():
            # Only act on symbols that have a trade bar in this slice.
            if not data.Bars.ContainsKey(symbol):
                continue
            bar = data.Bars[symbol]
            if bar is None:
                continue

            # All windows read below must be fully populated.
            if not (symbol_data.close_window_day.IsReady
                    and symbol_data.high_window_day.IsReady
                    and symbol_data.high_window_month.IsReady
                    and symbol_data.high_window_week.IsReady):
                continue

            if algorithm.Portfolio[symbol].Invested:
                insight = self._exit_insight(algorithm, symbol, bar)
            else:
                insight = self._entry_insight(symbol, symbol_data, bar)

            if insight is not None:
                insights.append(insight)

        return insights

    def _entry_insight(self, symbol, symbol_data, bar):
        """Return an Up insight when the bar closes above all three highs, else None."""
        close_current = bar.Close
        breakout = (close_current > symbol_data.high_window_week[0]
                    and close_current > symbol_data.high_window_month[0]
                    and close_current > symbol_data.high_window_day[0])
        if not breakout:
            return None

        # Forget exit state left over from a previous position in this symbol.
        self.trailingHighs.pop(symbol, None)
        self.profitTakeData.pop(symbol, None)

        if not self.add_linear_trend:
            # No regression was run, so there is no r-value to report as confidence.
            return Insight.Price(symbol, timedelta(days=999999), InsightDirection.Up)

        r_value = self._trend_r_value(symbol_data.close_window_day)
        # Written as "not >=" so that a NaN correlation is rejected too.
        if not r_value >= self._MIN_R_VALUE:
            return None
        return Insight.Price(symbol, timedelta(days=999999), InsightDirection.Up, confidence=r_value)

    @staticmethod
    def _trend_r_value(close_window):
        """Return the correlation coefficient of log(close) regressed on time.

        The newest window entry (index 0) is left out and the remaining values
        are reversed before the fit.
        """
        closes = list(close_window)[1:]
        closes.reverse()
        log_closes = np.log(np.array(closes))
        time_axis = np.arange(len(closes))
        _slope, _intercept, r_value, _p_value, _std_err = stats.linregress(time_axis, log_closes)
        return float(r_value)

    def _exit_insight(self, algorithm, symbol, bar):
        """Return a Flat insight when the trailing stop or profit take fires, else None."""
        if symbol not in self.trailingHighs or symbol not in self.profitTakeData:
            # First bar seen for this position: start tracking from the average cost.
            entry_price = algorithm.Securities[symbol].Holdings.AveragePrice
            self.trailingHighs[symbol] = entry_price
            self.profitTakeData[symbol] = entry_price
            return None

        stop_hit = False
        if self.trailingHighs[symbol] < bar.High:
            # A new high only postpones the stop check; the profit take below
            # must still be evaluated on this bar.
            self.trailingHighs[symbol] = bar.High
        else:
            drawdown = (bar.Low / self.trailingHighs[symbol]) - 1
            stop_hit = drawdown < self.maximumDrawdownPercent

        profit_percent = bar.High / self.profitTakeData[symbol] - 1
        target_hit = profit_percent >= self.tp_percent

        if stop_hit or target_hit:
            # One Flat insight is enough even when both conditions are met.
            return Insight.Price(symbol, Expiry.EndOfMonth, InsightDirection.Flat, confidence=1)
        return None

    def OnSecuritiesChanged(self, algorithm, changes):
        """Create state for added securities; drop state and liquidate removed ones.

        Args:
            algorithm: The algorithm instance.
            changes: The security additions and removals.
        """
        for security in changes.AddedSecurities:
            symbol = security.Symbol
            if symbol not in self.Data:
                self.Data[symbol] = SymbolData(algorithm, symbol)

        for security in changes.RemovedSecurities:
            symbol = security.Symbol
            if self.Data.pop(symbol, None) is not None:
                # The position is closed below, so its exit state is obsolete.
                self.trailingHighs.pop(symbol, None)
                self.profitTakeData.pop(symbol, None)
                algorithm.Liquidate(symbol, 'Removed from universe')


class SymbolData(object):
    """Rolling windows of daily, weekly and monthly bar values for one symbol.

    The daily windows (close, high, volume) hold 22 values each; the weekly
    and monthly high windows hold 2 values each. All windows are fed by
    consolidators registered on the algorithm in ``__init__``.
    """

    def __init__(self, algorithm, symbol):
        """Create the windows, register the consolidators and seed the daily windows.

        Args:
            algorithm: The algorithm instance used for consolidation, history
                requests and logging.
            symbol: The symbol this object tracks.
        """
        self.symbol = symbol
        self.algorithm = algorithm

        # define windows
        self.close_window_day = RollingWindow[float](22)
        self.high_window_day = RollingWindow[float](22)
        self.volume_window_day = RollingWindow[float](22)
        self.high_window_week = RollingWindow[float](2)
        self.high_window_month = RollingWindow[float](2)

        # consolidators: one handler per bar period
        self.algorithm.Consolidate(self.symbol, Resolution.Daily, self.OnDataConsolidated)
        self.algorithm.Consolidate(self.symbol, Calendar.Weekly, self.OnDataConsolidatedWeek)
        self.algorithm.Consolidate(self.symbol, Calendar.Monthly, self.OnDataConsolidatedMonth)

        # Seed the daily windows from history. The weekly and monthly windows
        # are NOT seeded here; they fill only from their consolidators.
        history = self.algorithm.History([self.symbol], 22, Resolution.Daily)
        if history.empty:
            self.algorithm.Log('DataFrame is empty!')
            return

        for row in history.loc[self.symbol].itertuples():
            self.close_window_day.Add(row.close)
            self.high_window_day.Add(row.high)
            self.volume_window_day.Add(row.volume)

    def update(self, close, high, volume):
        """Append ``volume`` to the daily volume window.

        ``close`` and ``high`` are accepted but not stored by this method.
        """
        # Previously wrote to ``self.volume_window``, an attribute that is never created.
        self.volume_window_day.Add(volume)

    def OnDataConsolidated(self, consolidated):
        """Store high, close and volume of a finished daily bar."""
        self.high_window_day.Add(consolidated.High)
        self.close_window_day.Add(consolidated.Close)
        self.volume_window_day.Add(consolidated.Volume)

    def OnDataConsolidatedWeek(self, consolidated):
        """Store the high of a finished weekly bar."""
        self.high_window_week.Add(consolidated.High)

    def OnDataConsolidatedMonth(self, consolidated):
        """Store the high of a finished monthly bar."""
        self.high_window_month.Add(consolidated.High)






#region imports
# from AlgorithmImports import *
#endregion
# #Imports
# from itertools import groupby
# from datetime import datetime, timedelta
# from pytz import utc

# from clr import AddReference
# AddReference("System")
# AddReference("QuantConnect.Common")
# AddReference("QuantConnect.Algorithm")
# AddReference("QuantConnect.Algorithm.Framework")

# from QuantConnect import Resolution, Extensions
# from QuantConnect.Algorithm.Framework.Portfolio import *
# from QuantConnect.Algorithm.Framework.Risk import *


# #Global variables
# Zero = int(0)

# class RiskManagement(RiskManagementModel):

#     def __init__(self):
#         '''
#         Initialization variables
        
#         '''
        
#         # Long Position Variables
#         self.LongTrail = {}
#         self.LongTrailingDrawdown = float(0.1)
        
#         # Short Position Variables
#         self.ShortTrail = {}
#         self.ShortTrailingDrawdown = float(0.1)


#     def ManageRisk(self, algorithm, targets):
#         '''
#         Main risk management handler. Passes algorithm and targets
        
#         '''
        
#         RiskAdjustedTargets = []
        
#         for asset in self.LongTrail:
#             if not algorithm.Portfolio[asset].Invested:
#                 self.LongTrail[asset] = [algorithm.Securities[asset].Price, 0]
            
#         # for asset in self.ShortTrail:
#         #     if not algorithm.Portfolio[asset].Invested:
#         #         self.ShortTrail[asset] = [algorithm.Securities[asset].Price, 0]
        
#         invested = [x.Key for x in algorithm.Portfolio if x.Value.Invested]
    
#         if invested:
#             for asset in invested:
                
#                 if algorithm.Portfolio[asset].IsLong:
#                     if asset not in self.LongTrail or self.LongTrail[asset][1] == 0:
#                         self.LongTrail[asset] = [algorithm.Portfolio[asset].Price, algorithm.Portfolio[asset].Quantity]
                    
#                 # elif algorithm.Portfolio[asset].IsShort:
#                 #     if asset not in self.ShortTrail or self.ShortTrail[asset][1] == 0:
#                 #         self.ShortTrail[asset] = [algorithm.Portfolio[asset].Price, algorithm.Portfolio[asset].Quantity]

#                 self.TrailingStop(algorithm, asset, RiskAdjustedTargets)
            
#         return RiskAdjustedTargets
        
        
#     def TrailingStop(self, algorithm, asset, RiskAdjustedTargets):
#         '''
#         Manages trailing stop for both long and short assets respectively
        
#         '''
    
#         if algorithm.Portfolio[asset].IsLong:
            
#             if algorithm.Portfolio[asset].Price > self.LongTrail[asset][0]:
#                 self.LongTrail[asset][0] = algorithm.Portfolio[asset].Price
               
#             elif algorithm.Portfolio[asset].Price / self.LongTrail[asset][0] < (1-self.LongTrailingDrawdown):
#                 RiskAdjustedTargets.append(PortfolioTarget(asset, 0))
#                 algorithm.Debug(f'Long trailing Stop Triggered for {asset}.  Current Price: {algorithm.Portfolio[asset].Price} | Highest Price: {self.LongTrail[asset][0]} | Loss: {algorithm.Portfolio[asset].Price / self.LongTrail[asset][0]} | Date: {algorithm.Time}')
#                 self.LongTrail.pop(asset)
                
        
#         # if algorithm.Portfolio[asset].IsShort:
            
#         #     if algorithm.Portfolio[asset].Price < self.ShortTrail[asset][0]:
#         #         self.ShortTrail[asset][0] = algorithm.Portfolio[asset].Price
               
#         #     elif algorithm.Portfolio[asset].Price / self.ShortTrail[asset][0] > 1 / (1-self.ShortTrailingDrawdown):
#         #         RiskAdjustedTargets.append(PortfolioTarget(asset, 0))
#         #         algorithm.Debug(f'Short trailing Stop Triggered for {asset}. Current Price: {algorithm.Portfolio[asset].Price} | Lowest Price: {self.ShortTrail[asset][0]} | Loss: {algorithm.Portfolio[asset].Price / self.ShortTrail[asset][0]} | Date: {algorithm.Time}')
#         #         self.ShortTrail.pop(asset)
        
#         return RiskAdjustedTargets
    
#     def ProfitTake(self, algorithm, asset, RiskAdjustedTargets):
#         '''
#         Manages profit take for long assets
                
#         '''
#         if algorithm.Portfolio[asset].IsLong:
#             if algorithm.Portfolio[asset].Price > self.LongTrail[asset][0]:
                
#         return
        
from AlgorithmImports import *
# from ConfidenceWeightedPortfolioConstructionModel import ConfidenceWeightedPortfolioConstructionModel
# Algo framework modules
from ATH3Alpha import ATH3Alpha
from datetime import timedelta


class GeekyApricotKitten(QCAlgorithm):
    """Framework algorithm that trades a monthly universe read from a CSV file.

    The universe is refreshed at the start of each month from a downloaded
    file. Depending on ``self.algo`` the algorithm runs either a constant
    "factor" alpha with equal weighting or the ``ATH3Alpha`` breakout model.
    """

    def Initialize(self):
        """Configure dates, cash, universe selection and the framework models."""
        self.SetStartDate(2010, 1, 1)
        self.SetEndDate(2010, 5, 1)
        self.SetCash(100000)

        # init
        self.backtestSymbolsPerDay = {}   # "YYYYMMDD" -> list of ticker strings
        self.current_universe = []
        self.time = -1
        self.Data = {}

        # PARAMETERS
        self.algo = "3ath"                  # algo type, can be "factor" or "3ath"
        self.frequency = Resolution.Hour    # universe data resolution
        # 3ATH
        self.sl_percent = 0.05    # stop loss percent
        self.pt_percent = 0.05    # profit take percent

        # ALGO FRAMEWORK
        # universe
        self.UniverseSettings.Resolution = self.frequency
        self.UniverseSettings.Leverage = 1
        self.AddUniverseSelection(ScheduledUniverseSelectionModel(
            self.DateRules.MonthStart(0),
            self.TimeRules.At(10, 0),
            self.SelectSymbols
        ))
        self.SetSecurityInitializer(self.CustomSecurityInitializer)

        if self.algo == "factor":
            # monthly rebalancing strategy
            self.AddAlpha(ConstantAlphaModel(InsightType.Price, InsightDirection.Up, timedelta(days = 30), 0.25, None))
            self.Settings.RebalancePortfolioOnInsightChanges = False
            self.Settings.RebalancePortfolioOnSecurityChanges = True
            self.SetPortfolioConstruction(EqualWeightingPortfolioConstructionModel(lambda dt: None))
        elif self.algo == "3ath":
            # alpha
            self.AddAlpha(ATH3Alpha(self, 22, True, maximumDrawdownPercent = self.sl_percent, tpPercent = self.pt_percent))
            # portfolio construction
            self.Settings.RebalancePortfolioOnInsightChanges = True
            self.Settings.RebalancePortfolioOnSecurityChanges = False
            # NOTE(review): the null construction model presumably creates no
            # portfolio targets, so insights would never become orders - confirm
            # this is intentional. Alternatives tried earlier: equal weighting,
            # accumulative insight (percent = 0.05), confidence weighted.
            self.SetPortfolioConstruction(NullPortfolioConstructionModel())
            # execution
            self.SetExecution(ImmediateExecutionModel())
            # risk: no risk management model is attached

        # warm up
        self.SetWarmUp(100)

    def CustomSecurityInitializer(self, security):
        '''Initialize the security with split-adjusted prices'''
        security.SetDataNormalizationMode(DataNormalizationMode.SplitAdjusted)

    def SelectSymbols(self, date):
        """Return the universe for ``date``.

        Live mode downloads a comma separated ticker list; an empty download
        keeps the previous universe. Backtest mode downloads the whole
        schedule once, caches it by ``YYYYMMDD`` key, and returns
        ``Universe.Unchanged`` for dates that are not in the file.

        Args:
            date: The date/time of the scheduled selection.

        Returns:
            A list of ``Symbol`` objects, or ``Universe.Unchanged``.
        """
        # handle live mode file format
        if self.LiveMode:
            # fetch the file from dropbox
            content = self.Download("https://www.dropbox.com/s/2l73mu97gcehmh7/daily-stock-picker-live.csv?dl=1")
            # if we have a file for today, use its symbols, else leave universe unchanged
            if content:
                self.current_universe = self._to_symbols(content.split(','))
            return self.current_universe

        # backtest - first cache the entire file
        if len(self.backtestSymbolsPerDay) == 0:
            content = self.Download("https://contentiobatch.blob.core.windows.net/qc-backtest/universe.csv")
            for line in content.splitlines():
                fields = line.split(',')
                self.backtestSymbolsPerDay[fields[0]] = fields[1:]

        index = date.strftime("%Y%m%d")
        if index not in self.backtestSymbolsPerDay:
            return Universe.Unchanged

        self.current_universe = self._to_symbols(self.backtestSymbolsPerDay[index])
        for symbol in self.current_universe:
            self.Debug(f"{date}: {symbol}")

        return self.current_universe

    @staticmethod
    def _to_symbols(tickers):
        """Convert ticker strings to US equity symbols, skipping blanks and "|" separators."""
        cleaned = (ticker.strip() for ticker in tickers)
        return [Symbol.Create(ticker, SecurityType.Equity, Market.USA)
                for ticker in cleaned if ticker and ticker != "|"]