Overall Statistics
Total Trades
3732
Average Win
0.21%
Average Loss
-0.39%
Compounding Annual Return
108.706%
Drawdown
38.000%
Expectancy
0.150
Net Profit
188.212%
Sharpe Ratio
1.892
Probabilistic Sharpe Ratio
71.914%
Loss Rate
26%
Win Rate
74%
Profit-Loss Ratio
0.55
Alpha
0.916
Beta
-0.503
Annual Standard Deviation
0.435
Annual Variance
0.189
Information Ratio
1.153
Tracking Error
0.552
Treynor Ratio
-1.634
Total Fees
$18945.03
Estimated Strategy Capacity
$3700000.00
Lowest Capacity Asset
FD R735QTJ8XC9X
from Execution.ImmediateExecutionModel import ImmediateExecutionModel
from Portfolio.EqualWeightingPortfolioConstructionModel import EqualWeightingPortfolioConstructionModel
from Risk.MaximumDrawdownPercentPerSecurity import MaximumDrawdownPercentPerSecurity
import operator
from datetime import timedelta, time

class DeterminedBlueSheep(QCAlgorithm):
    """Gap-following strategy over a fixed list of tickers.

    Every ticker in ``initialList`` is subscribed up front and gets a
    ``SymbolData`` entry in ``symDict`` that tracks its gap state.  A scheduled
    universe selection (``SelectSymbols``) picks the tickers whose gap is
    currently active, and ``SimpleGapAlphaModel`` reads ``symDict`` to emit
    insights.  The gap detection itself lives in ``SymbolData.Update``.
    """

    def Initialize(self):
        """Configure dates, cash, strategy parameters, framework models and per-ticker state."""
        self.SetStartDate(2020, 1, 1)  # Set Start Date
        self.SetCash(100000)  # Set Strategy Cash

        ################################### PARAMETERS #####################################################
        # Start-up list: every ticker the strategy may ever trade
        self.initialList = ["AMD", "TSLA", "M", "MSFT"]

        # Safety margin as a fraction of the period's standard deviation (1 = 100%)
        self.entryZscore = 0

        # Look-back, in days, of the rolling standard deviation
        self.myStdDays = 90

        # Window, in days, of the fast moving average
        self.fastAverage = 5

        # Window, in days, of the slow moving average
        self.slowAverage = 30

        # Maximum number of securities handled at the same time
        self.concurrentEquities = 10

        # Maximum lifetime of an insight, in days
        self.insightsDaysDuration = 100
        ####################################################################################################

        self.UniverseSettings.Resolution = Resolution.Hour

        self.SetExecution(ImmediateExecutionModel())

        self.SetPortfolioConstruction(EqualWeightingPortfolioConstructionModel())

        # NOTE(review): 0.01 is presumably a 1% per-security drawdown limit - confirm against the risk model docs.
        self.SetRiskManagement(MaximumDrawdownPercentPerSecurity(0.01))

        # Indicators and consolidated data for every ticker, keyed by ticker string
        self.symDict = {}

        # Subscribe to every ticker on the list before starting the filter
        for ticker in self.initialList:
            self.AddEquity(ticker, Resolution.Hour)
            self.symDict[ticker] = SymbolData(
                self, ticker, self.entryZscore, self.myStdDays, self.fastAverage, self.slowAverage
            )

        # For now scheduling and the coarse/fine filter cannot work together:
        # https://github.com/QuantConnect/Lean/issues/3890
        self.SetUniverseSelection(ScheduledUniverseSelectionModel(
            self.DateRules.EveryDay(),
            self.TimeRules.At(10, 1),
            self.SelectSymbols
        ))

        # Tickers currently selected for trading
        self.symbols = []

        self.AddAlpha(SimpleGapAlphaModel(self))  # AlphaModel needs the dictionary

        # One day more than the longest look-back
        self.SetWarmUp(max(self.myStdDays, self.fastAverage, self.slowAverage) + 1, Resolution.Daily)

    # Scheduled Universe Construction

    def SelectSymbols(self, dateTime):
        """Return the symbols whose gap is active, strongest gaps first.

        Tickers with a non-zero ``gapIndicator`` join ``self.symbols`` and
        tickers whose indicator fell back to zero leave it.  The survivors are
        ranked by ``gapLevel`` in descending order and capped at
        ``concurrentEquities``.

        Args:
            dateTime: time supplied by the scheduled universe model (unused).

        Returns:
            list of ``Symbol`` objects created for the selected tickers.
        """
        # Update membership in place so that already-selected tickers keep
        # their position ahead of newcomers (matters only for gapLevel ties).
        for ticker in self.initialList:
            gap_active = self.symDict[ticker].gapIndicator != 0
            tracked = ticker in self.symbols

            if gap_active and not tracked:
                self.symbols.append(ticker)
            elif tracked and not gap_active:
                self.symbols.remove(ticker)

        # Largest gap first: the cap below must drop the weakest gaps, not the strongest.
        ranked = sorted(
            (self.symDict[ticker] for ticker in self.symbols),
            key=operator.attrgetter('gapLevel'),
            reverse=True,
        )

        self.symbols = [entry.symbol for entry in ranked[:self.concurrentEquities]]

        return [Symbol.Create(ticker, SecurityType.Equity, Market.USA) for ticker in self.symbols]

    # I need to get Real Time data for some indicators

    def OnData(self, data):
        '''OnData event is the primary entry point for your algorithm. Each new data point will be pumped in here.

        Logs splits and dividends, then feeds bars time-stamped at or before
        10:00 to each ready ``SymbolData`` so it can refresh its gap state.
        After warm-up it also reports slices that do not carry a bar for every
        tracked ticker.

            Arguments:
                data: Slice object keyed by symbol containing the stock data
        '''
        split = {}
        dividend = {}

        if data.Splits.Count > 0:
            self.Debug("Split")
            for symbol in self.symDict:
                if data.Splits.ContainsKey(symbol):
                    split[symbol] = data.Splits[symbol].SplitFactor
                    self.Debug(f"Split for {symbol}; data.ContainsKey {data.ContainsKey(symbol)}; data[symbol] {data[symbol] is not None}")

        if data.Dividends.Count > 0:
            self.Debug("Dividend")
            for symbol in self.symDict:
                if data.Dividends.ContainsKey(symbol):
                    dividend[symbol] = data.Dividends[symbol].Distribution
                    self.Debug(f"Dividend for {symbol}; data.ContainsKey {data.ContainsKey(symbol)}; data[symbol] {data[symbol] is not None}")

        for symbol, symbol_data in self.symDict.items():
            symbol_data.Update1(split.get(symbol), dividend.get(symbol))

        # Only bars stamped at or before 10:00 take part in gap detection.
        if data.Time.time() > time(10, 0):
            return

        has_corporate_action = data.Splits.Count > 0 or data.Dividends.Count > 0

        for symbol, symbol_data in self.symDict.items():
            if data.ContainsKey(symbol) and data[symbol] is not None and symbol_data.IsReady:
                # Diagnostic: stop as soon as a corporate action shares a slice with a usable bar.
                if has_corporate_action:
                    self.Quit("Found it!")

                # The factors collected above come from this same slice, so reuse them.
                symbol_data.Update(data[symbol], split.get(symbol), dividend.get(symbol))

        if self.IsWarmingUp:
            return

        num_bars = 0
        for symbol, symbol_data in self.symDict.items():
            if data.ContainsKey(symbol) and data[symbol] is not None:
                num_bars += 1
            if not symbol_data.IsReady:
                self.Quit('Not Ready!')

        # Report slices that are missing a bar for at least one tracked ticker.
        if num_bars != len(self.symDict):
            self.Debug(f'{num_bars} bars at {self.Time}. Splits: {data.Splits.Count}; Dividends: {data.Dividends.Count}')
        
        
        
        
class SymbolData(object):
    """Per-ticker state: daily bars, return volatility, two EMAs and the gap status."""

    def __init__(self, algorithm, symbol, entryZscore, myStDays, fastAverage, slowAverage):
        """Store the parameters, attach a daily consolidator and create the indicators.

        Args:
            algorithm: owning algorithm; used for its clock, logging and subscription manager.
            symbol: ticker this object tracks.
            entryZscore: safety margin as a fraction of the period's standard deviation.
            myStDays: look-back of the rolling standard deviation of returns.
            fastAverage: window, in days, of the fast moving average.
            slowAverage: window, in days, of the slow moving average.
        """
        self.algo = algorithm
        self.symbol = symbol

        # Strategy parameters
        self.entryZscore = entryZscore
        self.myStdDays = myStDays
        self.fastAverage = fastAverage
        self.slowAverage = slowAverage

        # 0 = trend over, 1 = gapped up and in uptrend, -1 = gapped down and in downtrend
        self.gapIndicator = 0
        self.gapLevel = 0

        # Latest consolidated daily bar and the one the last return was measured from
        self.daily_bar = None
        self.prev_bar = None

        # One-day TradeBar consolidator; DailyConsolidator runs on every finished day
        daily = TradeBarConsolidator(timedelta(days=1))
        daily.DataConsolidated += self.DailyConsolidator
        self.daily_consolidator = daily
        self.algo.SubscriptionManager.AddConsolidator(symbol, daily)

        # The indicators are not registered with the algorithm: DailyConsolidator
        # feeds them by hand.
        self._retStd = StandardDeviation(symbol, myStDays)  # standard deviation of returns
        self.fast = ExponentialMovingAverage(fastAverage)
        self.slow = ExponentialMovingAverage(slowAverage)

        self.splitFactors = []
        self.dividendFactors = []

    def DailyConsolidator(self, sender, bar):
        """Consume a finished daily bar and push it into the indicators.

        The very first bar only seeds ``prev_bar``; from the second bar on the
        close-to-close return updates the volatility and the close updates
        both EMAs.
        """
        last = self.prev_bar
        self.daily_bar = bar

        if last is not None:
            daily_return = (bar.Close - last.Close) / last.Close

            self._retStd.Update(self.algo.Time, daily_return)
            self.fast.Update(self.algo.Time, bar.Close)
            self.slow.Update(self.algo.Time, bar.Close)

        self.prev_bar = bar

    @property
    def IsReady(self):
        """True once the volatility and both EMAs report ready."""
        if not self._retStd.IsReady:
            return self._retStd.IsReady
        if not self.fast.IsReady:
            return self.fast.IsReady
        return self.slow.IsReady

    def Update1(self, split, dividend):
        """Log the split factor / dividend distribution when either is present."""
        if split is None and dividend is None:
            return
        self.algo.Log(f'Split factor {split}; Dividend distribution {dividend}')

    def Update(self, bar, split_factor, dividend_distribution):
        """Refresh the gap state from an intraday bar.

        Args:
            bar: bar whose ``Open`` is compared with the last daily high/low.
            split_factor: split factor for this bar, or None.
            dividend_distribution: dividend distribution for this bar, or None.
        """
        if not (split_factor is None and dividend_distribution is None):
            self.algo.Quit(f'Split factor {split_factor}; Dividend distribution {dividend_distribution}')

        reference = self.daily_bar
        opening = bar.Open

        # Gap up: open at or above the previous high widened by the volatility band.
        if opening >= reference.High * (1 + self.entryZscore * self._retStd.Current.Value):
            self.gapIndicator = 1
            self.gapLevel = (opening - reference.High) / reference.High
        # Gap down: open at or below the previous low narrowed by the volatility band.
        elif opening <= reference.Low * (1 - self.entryZscore * self._retStd.Current.Value):
            self.gapIndicator = -1
            self.gapLevel = (reference.Low - opening) / reference.Low

        # The gap trend ends once the fast EMA crosses to the wrong side of the slow EMA.
        uptrend_broken = self.gapIndicator > 0 and self.fast.Current.Value < self.slow.Current.Value
        if uptrend_broken or (self.gapIndicator < 0 and self.fast.Current.Value > self.slow.Current.Value):
            self.gapIndicator = 0
            
            
            
class SimpleGapAlphaModel(AlphaModel):
    """Alpha model that turns each ticker's gap state into price insights.

    The gap state is read from the owning algorithm's ``symDict``; this model
    only remembers which symbols it has seen and which were removed by the
    latest universe change.
    """

    def __init__(self, algorithm):
        """Keep a reference to the algorithm and start with empty symbol lists."""
        self.algo = algorithm

        # Every symbol ever added (never pruned) and the ones removed by the last change
        self.all_symbols = []
        self.removed_symbols = []

    def OnSecuritiesChanged(self, algorithm, changes):
        """Record added/removed symbols and plot how many changed.

        Args:
            algorithm: algorithm instance used for plotting.
            changes: object exposing ``AddedSecurities`` and ``RemovedSecurities``.
        """
        added = [security.Symbol for security in changes.AddedSecurities]
        for symbol in added:
            if symbol not in self.all_symbols:
                self.all_symbols.append(symbol)

        # Rebuilt from scratch on every notification
        self.removed_symbols = [security.Symbol for security in changes.RemovedSecurities]

        algorithm.Plot('Changes', 'Added', len(added))
        algorithm.Plot('Changes', 'Removed', len(self.removed_symbols))

    def Update(self, algorithm, data):
        """Build the insights for the current time step.

        Removed symbols get a Flat insight; the others get Up or Down according
        to the sign of their ``gapIndicator`` and nothing when it is zero.

        Returns:
            list of insights, empty while the algorithm is warming up.
        """
        if self.algo.IsWarmingUp:
            return []

        insights = []

        for symbol in self.all_symbols:
            if symbol in self.removed_symbols:
                direction = InsightDirection.Flat
            else:
                gap = self.algo.symDict[symbol.Value].gapIndicator
                if gap > 0:
                    direction = InsightDirection.Up
                elif gap < 0:
                    direction = InsightDirection.Down
                else:
                    continue

            lifetime = timedelta(days=self.algo.insightsDaysDuration)
            insights.append(Insight.Price(symbol, lifetime, direction))

        return insights