Overall Statistics
Total Trades
24
Average Win
0.15%
Average Loss
-0.15%
Compounding Annual Return
0.723%
Drawdown
1.100%
Expectancy
0.008
Net Profit
0.012%
Sharpe Ratio
0.116
Probabilistic Sharpe Ratio
45.587%
Loss Rate
50%
Win Rate
50%
Profit-Loss Ratio
1.02
Alpha
0.497
Beta
-0.382
Annual Standard Deviation
0.067
Annual Variance
0.005
Information Ratio
-12.432
Tracking Error
0.102
Treynor Ratio
-0.02
Total Fees
$24.00
import scipy as sp
from sklearn.linear_model import LinearRegression
import numpy as np

class UncoupledTachyonContainmentField(QCAlgorithm):

    def Initialize(self):
        '''Configure the backtest window, universe selection, brokerage model,
        the per-day state containers and the scheduled DataGenerator run.
        '''
        # Backtest window and starting capital.
        self.SetStartDate(2017, 1, 1)
        self.SetEndDate(2017, 1, 7)
        self.SetCash(100000)

        # Cap applied by CoarseSelectionFunction (the original note mentions 600
        # as an alternative value).
        self.__numberOfSymbols = 3
        universe_model = FineFundamentalUniverseSelectionModel(
            self.CoarseSelectionFunction,
            self.FineSelectionFunction,
        )
        self.SetUniverseSelection(universe_model)

        # Universe settings: minute bars, no extended-hours data.
        universe_settings = self.UniverseSettings
        universe_settings.Resolution = Resolution.Minute
        universe_settings.ExtendedMarketHours = False

        # Every security is switched to raw data normalization.
        self.SetSecurityInitializer(
            lambda security: security.SetDataNormalizationMode(DataNormalizationMode.Raw)
        )
        self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Cash)
        #self.SetBenchmark('SPY')

        # State shared between DataGenerator (writer) and OnData (reader).
        self.slope_dict = {}        # ticker -> {"Slope", "R²"} of the log-price fit
        self.ranked_ra = {}         # ticker -> rank by R²
        self.volume_profiles = {}   # ticker -> {price level: volume}
        self.val = {}               # ticker -> value area low
        self.vah = {}               # ticker -> value area high
        self.pct_change = {}        # ticker -> 10:30 metric used for the second ranking
        self.top_stocks = {}        # ticker -> combined rank
        self.number_stocks = 3      # divisor of the 0.5 target weight in OnData
        self.purchased_stocks = []  # tickers bought on the current day

        # Run DataGenerator every day at 03:10.
        every_day = self.DateRules.EveryDay()
        at_three_ten = self.TimeRules.At(3, 10)
        self.Schedule.On(every_day, at_three_ten, self.DataGenerator)

    

 

    def OnData(self, data):
        '''OnData event is the primary entry point for your algorithm. Each new data point will be pumped in here.

        At 10:30 the securities ranked by DataGenerator are ranked a second time
        by the value stored in self.pct_change, both ranks are summed, and the
        self.number_stocks best (lowest) totals are bought. At 15:00 every open
        position is liquidated.

            Arguments:
                data: Slice object keyed by symbol containing the stock data
        '''
        now = self.Time

        if now.hour == 10 and now.minute == 30:
            # Build today's values from scratch: reusing the dictionaries of the
            # previous day would keep tickers that are no longer ranked.
            todays_values = {}
            for key in self.ranked_ra:
                if data.ContainsKey(key) and data[key] is not None:
                    # Some calculation with the price at 10:30 AM; the result is
                    # stored per ticker (the calculation itself was removed on
                    # purpose by the original author).
                    todays_values[key] = data[key].Price

            # Highest value first; rank 1 is the best.
            self.pct_change = dict(
                sorted(todays_values.items(), key=lambda kv: kv[1], reverse=True)
            )
            ranked_pct_change = {
                key: rank for rank, key in enumerate(self.pct_change, 1)
            }

            # Combine the R² rank from DataGenerator with the 10:30 rank; tickers
            # without a 10:30 value are left out.
            combined = {
                key: ra_rank + ranked_pct_change[key]
                for key, ra_rank in self.ranked_ra.items()
                if key in ranked_pct_change
            }

            # Lowest total rank first.
            self.top_stocks = dict(sorted(combined.items(), key=lambda kv: kv[1]))

            # Buy only the best self.number_stocks tickers, each with a target
            # weight of 0.5 / self.number_stocks.
            self.purchased_stocks = []
            for ticker in list(self.top_stocks)[:self.number_stocks]:
                self.SetHoldings(ticker, 0.5 / self.number_stocks)
                self.purchased_stocks.append(ticker)

        if now.hour == 15 and now.minute == 0:
            if self.Portfolio.Invested:
                self.Liquidate()

        #self.Log(str(f"Number of stocks in Universe: {len(self.ranked_ra)}, Bought these tickers: {self.purchased_stocks}"))

 

    def CoarseSelectionFunction(self, coarse):
        '''Return the symbols of the highest dollar-volume entries that have
        fundamental data, capped at the symbol count set in Initialize.

            Arguments:
                coarse: iterable of coarse universe entries exposing
                        HasFundamentalData, DollarVolume and Symbol
        '''
        # Drop entries without fundamental data first; the sort is stable, so the
        # resulting order matches sorting everything and filtering afterwards.
        with_fundamentals = (entry for entry in coarse if entry.HasFundamentalData)

        # Most liquid first.
        by_dollar_volume = sorted(
            with_fundamentals,
            key=lambda entry: entry.DollarVolume,
            reverse=True,
        )

        limit = self.__numberOfSymbols
        return [entry.Symbol for entry in by_dollar_volume[:limit]]
    
    def FineSelectionFunction(self, fine):
        '''Return the symbols of the fine universe entries that pass all filters.

        An entry is kept when its Morningstar industry group is not Banks, none
        of the three quarterly figures (InterestIncome, EBIT, CurrentDebt) is
        exactly zero, its primary exchange ID is "NYS" or "NAS", and its country
        ID is "USA".

            Arguments:
                fine: iterable of fine fundamental entries
        '''
        def missing_or_nonzero(value):
            # A missing figure (None) does not exclude an entry; only an explicit
            # zero does.
            return value is None or value != 0

        def keep(stock):
            if stock.AssetClassification.MorningstarIndustryGroupCode == MorningstarIndustryGroupCode.Banks:
                return False

            statements = stock.FinancialStatements
            quarterly_figures = (
                statements.IncomeStatement.InterestIncome.ThreeMonths,
                statements.IncomeStatement.EBIT.ThreeMonths,
                statements.BalanceSheet.CurrentDebt.ThreeMonths,
            )
            if not all(missing_or_nonzero(figure) for figure in quarterly_figures):
                return False

            reference = stock.CompanyReference
            return (
                reference.PrimaryExchangeID in ("NYS", "NAS")
                and reference.CountryId == "USA"
            )

        # Take all entries from the filtered collection.
        return [stock.Symbol for stock in fine if keep(stock)]

 

    def DataGenerator(self):
        '''Recompute the daily trend ranking and the value-area bounds per security.

        Registered with the scheduler in Initialize. Results are written to
        instance attributes rather than returned:

            slope_dict:      ticker -> {"Slope": ..., "R²": ...} from a linear fit of
                             log(close) over up to 10 daily bars, sorted by R²
                             in descending order
            ranked_ra:       ticker -> 1-based rank in that R² ordering
            volume_profiles: ticker -> {str(price level): summed volume} built
                             from the last 15 minute bars
            val / vah:       ticker -> lower / upper bound of the price window
                             around the highest-volume level that holds at
                             least 70% of the profiled volume

        Securities without usable history are skipped instead of raising.
        '''
        # Imported here so the fit does not depend on scipy.stats having been
        # loaded as a side effect of another import (plain `import scipy` does
        # not expose the submodule on older SciPy releases).
        from scipy import stats

        # Clear all the data from the past before starting the calculation for
        # this day again.
        self.slope_dict.clear()
        self.volume_profiles.clear()
        self.val.clear()
        self.vah.clear()

        slices_ra = self.History(self.Securities.Keys, 10, Resolution.Daily)

        fits = {}
        for symbol in self.Securities.Keys:
            ticker = str(symbol)
            try:
                closes = slices_ra.loc[ticker]["close"].iloc[-10:]
            except KeyError:
                # No daily history came back for this security.
                continue
            if len(closes) < 2:
                # A line cannot be fitted through fewer than two points.
                continue
            log_closes = np.log(closes)
            fit = stats.linregress(np.arange(len(log_closes)), log_closes)
            fits[ticker] = {"Slope": fit.slope, "R²": fit.rvalue ** 2}

        # Best fit (highest R²) first; rank 1 is the best.
        self.slope_dict = dict(
            sorted(fits.items(), key=lambda kv: kv[1]["R²"], reverse=True)
        )
        self.ranked_ra = {
            ticker: rank for rank, ticker in enumerate(self.slope_dict, 1)
        }
        if not self.ranked_ra:
            # Nothing was ranked, so there is nothing to profile.
            return

        slices_vp = self.History(list(self.ranked_ra), 15, Resolution.Minute)

        for ticker in self.ranked_ra:
            self.volume_profiles[ticker] = {}
            try:
                bars = slices_vp.loc[ticker]
            except KeyError:
                # No minute history came back for this security.
                continue

            profile = self._build_volume_profile(bars)
            # Stored with string price keys, the format this attribute has
            # always used.
            self.volume_profiles[ticker] = {
                str(level): volume for level, volume in profile.items()
            }
            if not profile:
                # No bar carried volume, so no value area can be derived.
                continue

            # Save VAL, value area low & VAH, value area high for each stock.
            self.val[ticker], self.vah[ticker] = self._value_area(profile, 0.7)

    @staticmethod
    def _build_volume_profile(bars):
        '''Sum the volume per close price (rounded to two decimals).

            Arguments:
                bars: DataFrame with "close" and "volume" columns
            Returns:
                dict mapping price level (float) to summed volume; bars whose
                volume is not positive are ignored
        '''
        profile = {}
        for row in bars.itertuples():
            if row.volume > 0:
                level = round(float(row.close), 2)
                # Accumulate: several bars can close at the same price level.
                profile[level] = profile.get(level, 0) + row.volume
        return profile

    @staticmethod
    def _value_area(profile, coverage=0.7):
        '''Grow a price window around the highest-volume level until it holds
        the requested share of the total volume.

            Arguments:
                profile: non-empty dict mapping price level to positive volume
                coverage: share of the total volume the window has to reach
            Returns:
                (lowest price level, highest price level) of the window
        '''
        levels = sorted(profile)
        target_volume = sum(profile.values()) * coverage

        # Start at the level with the largest volume (first one wins on ties).
        peak_level = max(profile, key=profile.get)
        low_idx = high_idx = levels.index(peak_level)
        captured = profile[peak_level]

        while captured < target_volume:
            has_up = high_idx + 1 < len(levels)
            has_down = low_idx > 0
            up_volume = profile[levels[high_idx + 1]] if has_up else 0
            down_volume = profile[levels[low_idx - 1]] if has_down else 0

            # Grow the window in the direction of more volume; ties go down.
            if has_up and up_volume > down_volume:
                high_idx += 1
                captured += up_volume
            elif has_down:
                low_idx -= 1
                captured += down_volume
            else:
                # Every level is already inside the window.
                break

        return levels[low_idx], levels[high_idx]