Overall Statistics
Total Trades
626
Average Win
0.21%
Average Loss
-0.09%
Compounding Annual Return
-10.572%
Drawdown
4.200%
Expectancy
-0.020
Net Profit
-0.934%
Sharpe Ratio
-0.966
Probabilistic Sharpe Ratio
28.379%
Loss Rate
72%
Win Rate
28%
Profit-Loss Ratio
2.50
Alpha
-0.539
Beta
1.1
Annual Standard Deviation
0.227
Annual Variance
0.051
Information Ratio
-2.701
Tracking Error
0.189
Treynor Ratio
-0.199
Total Fees
$2085.49
Estimated Strategy Capacity
$1600000.00
from SymbolData import SymbolData

class PSARStrategy(QCAlgorithm):
    '''Long-only strategy driven by three Parabolic SAR indicators on 5-minute bars.

    Every five minutes (clocked by a consolidator attached to the benchmark)
    the algorithm:

    1. collects uninvested symbols whose entry signal is valid,
    2. liquidates invested symbols whose exit signal is valid,
    3. splits the portfolio equally across the remaining and new positions.

    Per-symbol indicator state lives in SymbolData objects stored in
    self.symbols, keyed by Symbol.'''

    def Initialize(self):
        '''Sets the backtest window and cash, the benchmark, the 5-minute
        clock and the data subscriptions for the traded tickers.'''
        self.SetStartDate(2020, 11, 16)  # Set Start Date
        self.SetEndDate(2020, 12, 16)  # Set End Date
        self.SetCash(100000)  # Set Strategy Cash

        # dictionary to hold symbol_data objects for each symbol
        # in our universe; created before any subscription is added so
        # OnSecuritiesChanged can never see it missing
        self.symbols = {}

        # Handles setting benchmark symbol for statistics
        self.benchmark = "SPY"
        # Keep the Symbol object as well: OnSecuritiesChanged compares
        # Symbol to Symbol instead of Symbol to ticker string, whose outcome
        # depends on the Python/.NET interop layer
        self.benchmark_symbol = self.AddEquity(self.benchmark, Resolution.Minute).Symbol
        self.SetBenchmark(self.benchmark)

        # Creates a dummy 5 minute consolidator, we can use to calculate signals
        # on a regular interval
        self.five_minute_consolidator = QuoteBarConsolidator(timedelta(minutes=5))
        self.SubscriptionManager.AddConsolidator(self.benchmark_symbol, self.five_minute_consolidator)
        self.five_minute_consolidator.DataConsolidated += self.EveryFiveMinutes

        # our chosen tickers
        tickers = ["JNJ", "AAPL", "WFC"]

        # data subscriptions; SymbolData objects are created later,
        # in OnSecuritiesChanged
        for ticker in tickers:
            self.AddEquity(ticker, Resolution.Minute)

    def EveryFiveMinutes(self, sender, bar):
        '''Fires every 5 minutes (on each consolidated benchmark bar)

        Handles portfolio management logic

        1. Loops through universe and finds securities with valid entry signals and no existing position
        2. Liquidates all invested securities which meet exit signal
        3. Distributes portfolio equally across all existing positions

        sender and bar are supplied by the consolidator event and are not used.'''

        # list to hold new entry symbols
        selected_entry_symbols = []

        # loop through universe and find uninvested symbols which meet entry signal
        # does not submit orders, just collects them into a list
        for symbol, symbol_data in self.symbols.items():

            if not symbol_data.IsReady:
                continue

            # Check holdings first: the entry signal (and its Debug message)
            # is only meaningful for symbols we do not already hold
            if self.Portfolio[symbol].Invested:
                continue

            if self.CalculateEntrySignal(symbol_data):
                selected_entry_symbols.append(symbol)

        # list of all currently invested symbols
        currently_invested_symbols = [symbol for symbol in self.Portfolio.Keys if self.Portfolio[symbol].Invested]

        ### EXIT LOGIC
        # Loops through universe
        # Looks for invested symbols which meet exit criteria
        # Liquidates them and removes them from the list of invested symbols
        for symbol, symbol_data in self.symbols.items():

            if not self.Portfolio[symbol].Invested or not symbol_data.IsReady:
                continue

            if self.CalculateExitSignal(symbol_data):
                self.Liquidate(symbol)
                currently_invested_symbols.remove(symbol)
                self.Debug(f"Liquidating {symbol}....")

        # combines list of new entry symbols and currently invested symbols;
        # existing positions come first, so they are resized before new
        # positions are opened
        total_long_symbols = currently_invested_symbols + selected_entry_symbols

        # if there are no new entries or existing positions
        if not total_long_symbols:
            return

        # proportion of margin we will allocate to each symbol
        portfolio_allocation_per_security = 1 / len(total_long_symbols)

        # rebalance portfolio
        for symbol in total_long_symbols:
            self.SetHoldings(symbol, portfolio_allocation_per_security)

    def OnSecuritiesChanged(self, changes):
        '''Keeps self.symbols in sync with the universe.

        Creates a SymbolData for every added security except the benchmark,
        and removes the consolidator of every security that leaves.'''

        for security in changes.AddedSecurities:
            symbol = security.Symbol
            # the benchmark only drives the clock and statistics; it is not traded
            if symbol == self.benchmark_symbol or symbol in self.symbols:
                continue
            self.symbols[symbol] = SymbolData(self, symbol)

        for security in changes.RemovedSecurities:
            symbol_data = self.symbols.pop(security.Symbol, None)
            if symbol_data is not None:
                symbol_data.KillConsolidators()

    def CalculateEntrySignal(self, symbol_data):
        '''Called from EveryFiveMinutes for each ready symbol

        Computes total signal: Signal One MUST be met and at least one of
        Signals 2, 3, 4 must be met. Signal Four is currently a placeholder
        that is always False.

        Returns True when the entry is valid, and logs the contributing
        signals via Debug in that case.'''

        ## ENTRY LOGIC
        signal_one = self.SignalOne(symbol_data)

        signal_two = self.SignalTwo(symbol_data)

        signal_three = self.SignalThree(symbol_data)

        signal_four = False

        # Signal One MUST be met and one of Signals 2,3,4 must be met
        valid_entry_signal = signal_one and (signal_two or signal_three or signal_four)

        if valid_entry_signal:
            self.Debug(f"{symbol_data.symbol} valid entry...signal_two:{signal_two}, signal_three:{signal_three} " \
                    + f", signal_four: {signal_four}")

        return valid_entry_signal

    def SignalOne(self, symbol_data):
        '''All 3 currently trending up (slow < moderate < fast)'''
        psar_slow = symbol_data.psar_slow.Current.Value
        psar_moderate = symbol_data.psar_moderate.Current.Value
        psar_fast = symbol_data.psar_fast.Current.Value

        return psar_slow < psar_moderate < psar_fast

    def SignalTwo(self, symbol_data):
        '''One of the following:
            a. All 3 indicators equal on the previous bar (slow == moderate == fast)
            b. All 3 indicators within .02 of each other on the previous bar (slow + 0.02/0.01 == moderate/fast)
            c. Slow from 2 bars ago is > open

        Reads window indices 1 and 2, so the caller must only invoke it
        once symbol_data.IsReady is True.'''

        # Condition A
        # All 3 indicators equal on the previous bar (slow == moderate == fast)
        previous_psar_slow = symbol_data.psar_slow_window[1].Value
        previous_psar_moderate = symbol_data.psar_moderate_window[1].Value
        previous_psar_fast = symbol_data.psar_fast_window[1].Value

        # NOTE: whenever A holds, B holds too (all distances are 0); A is
        # kept as its own condition to mirror the documented rules
        condition_a = previous_psar_slow == previous_psar_moderate == previous_psar_fast

        # Condition B
        # All 3 indicators within .02 of each other on the previous bar (slow + 0.02/0.01 == moderate/fast)
        max_distance = 0.02

        condition_b = abs(previous_psar_slow - previous_psar_moderate) < max_distance and \
                    abs(previous_psar_slow - previous_psar_fast) < max_distance and \
                    abs(previous_psar_fast - previous_psar_moderate) < max_distance

        # Condition C
        # Slow from 2 bars ago is > open
        two_bars_ago_psar_slow = symbol_data.psar_slow_window[2].Value

        current_bar_open = symbol_data.bar_window[0].Open

        condition_c = two_bars_ago_psar_slow > current_bar_open

        return condition_a or condition_b or condition_c

    def SignalThree(self, symbol_data):
        '''Slow/moderate/fast < open of the most recent 5-minute bar'''

        psar_slow = symbol_data.psar_slow.Current.Value
        psar_moderate = symbol_data.psar_moderate.Current.Value
        psar_fast = symbol_data.psar_fast.Current.Value

        current_bar_open = symbol_data.bar_window[0].Open

        return psar_slow < current_bar_open and \
                psar_moderate < current_bar_open and \
                psar_fast < current_bar_open

    def CalculateExitSignal(self, symbol_data):
        '''Exit when the current slow PSAR is above the open of the most
        recent 5-minute bar.'''

        current_psar_slow = symbol_data.psar_slow.Current.Value
        current_bar_open = symbol_data.bar_window[0].Open

        return current_psar_slow > current_bar_open
class SymbolData:
    '''Per-symbol state: a 5-minute quote-bar consolidator, three Parabolic
    SAR indicators fed by it, and rolling windows holding recent bars and
    indicator values.'''

    def __init__(self, algorithm, symbol):
        '''Wires up the consolidator, indicators and history windows for symbol.

        algorithm is the owning algorithm; its SubscriptionManager and
        RegisterIndicator are used to attach the consolidator and indicators.'''
        self.algorithm = algorithm
        self.symbol = symbol

        # History buffers: last 5 consolidated bars and last 3 values of
        # each indicator
        self.bar_window = RollingWindow[QuoteBar](5)
        self.psar_slow_window = RollingWindow[IndicatorDataPoint](3)
        self.psar_moderate_window = RollingWindow[IndicatorDataPoint](3)
        self.psar_fast_window = RollingWindow[IndicatorDataPoint](3)

        # Five-minute bars; the bar handler is attached before the
        # indicators are registered on the same consolidator
        bar_period = timedelta(minutes=5)
        self.consolidator = QuoteBarConsolidator(bar_period)
        subscription_manager = self.algorithm.SubscriptionManager
        subscription_manager.AddConsolidator(self.symbol, self.consolidator)
        self.consolidator.DataConsolidated += self.OnFiveMinuteBar

        # The three indicators differ only in their second argument
        # (presumably the acceleration-factor increment -- confirm against
        # the indicator's constructor)
        self.psar_slow = ParabolicStopAndReverse(0.01, 0.01, 0.20)
        self.psar_moderate = ParabolicStopAndReverse(0.01, 0.02, 0.20)
        self.psar_fast = ParabolicStopAndReverse(0.01, 0.03, 0.20)

        indicators_and_handlers = (
            (self.psar_slow, self.OnPSARSlow),
            (self.psar_moderate, self.OnPSARModerate),
            (self.psar_fast, self.OnPSARFast),
        )

        # Feed every indicator from the five-minute consolidator ...
        for indicator, _ in indicators_and_handlers:
            self.algorithm.RegisterIndicator(self.symbol, indicator, self.consolidator)

        # ... then listen for their updates so values can be recorded
        for indicator, handler in indicators_and_handlers:
            indicator.Updated += handler

    @staticmethod
    def _record_when_ready(indicator, window):
        '''Appends the indicator's current value to window once the indicator is ready'''
        if indicator.IsReady:
            window.Add(indicator.Current)

    def OnFiveMinuteBar(self, sender, bar):
        '''Called for each new five minute bar; keeps it in bar_window'''
        self.bar_window.Add(bar)

    def OnPSARSlow(self, sender, updated):
        '''Called on every psar_slow update; records the value when ready'''
        self._record_when_ready(self.psar_slow, self.psar_slow_window)

    def OnPSARModerate(self, sender, updated):
        '''Called on every psar_moderate update; records the value when ready'''
        self._record_when_ready(self.psar_moderate, self.psar_moderate_window)

    def OnPSARFast(self, sender, updated):
        '''Called on every psar_fast update; records the value when ready'''
        self._record_when_ready(self.psar_fast, self.psar_fast_window)

    @property
    def IsReady(self):
        '''True once every indicator window and the bar window report ready'''
        required_windows = (
            self.psar_slow_window,
            self.psar_fast_window,
            self.psar_moderate_window,
            self.bar_window,
        )
        return all(window.IsReady for window in required_windows)

    def KillConsolidators(self):
        '''Detaches this symbol's consolidator from the subscription manager'''
        subscription_manager = self.algorithm.SubscriptionManager
        subscription_manager.RemoveConsolidator(self.symbol, self.consolidator)