Overall Statistics
Total Orders
1478
Average Win
0.01%
Average Loss
-0.04%
Compounding Annual Return
2.951%
Drawdown
12.800%
Expectancy
-0.466
Start Equity
1000000
End Equity
1029644.45
Net Profit
2.964%
Sharpe Ratio
-0.255
Sortino Ratio
-0.28
Probabilistic Sharpe Ratio
20.546%
Loss Rate
59%
Win Rate
41%
Profit-Loss Ratio
0.29
Alpha
0
Beta
0
Annual Standard Deviation
0.093
Annual Variance
0.009
Information Ratio
0.317
Tracking Error
0.093
Treynor Ratio
0
Total Fees
$3481.03
Estimated Strategy Capacity
$16000.00
Lowest Capacity Asset
GDL TPNAWDP49CO5
Portfolio Turnover
1.50%
#region imports
from AlgorithmImports import *

import pandas as pd
from dateutil.relativedelta import relativedelta
#endregion


class ROCAndNearnessAlphaModel(AlphaModel):
    """
    This class ranks securities in the universe by their historical rate of change and nearness to trailing highs.
    """
    _securities = []
    _month = -1
    
    def __init__(self, algorithm, roc_lookback_months=6, nearness_lookback_months=12, holding_months=6, pct_long=50):
        """
        Input:
         - algorithm
            The algorithm instance
         - roc_lookback_months
            Number of trailing months to calculate the rate of change over (> 0)
         - nearness_lookback_months
            Number of trailing months to calculate the nearness factor over (> 0)
         - holding_months
            Number of months to hold positions (> 0)
         - pct_long
            The percentage of the universe we go long (0 < pct_long <= 100)
        """
        if roc_lookback_months <= 0 or nearness_lookback_months <= 0 or holding_months <= 0:
            algorithm.quit(f"Requirement violated:  roc_lookback_months > 0 and nearness_lookback_months > 0 and holding_months > 0")
        
        if pct_long <= 0 or pct_long > 100:
            algorithm.quit(f"Requirement violated: 0 < pct_long <= 100")

        self._algorithm = algorithm        
        self._roc_lookback_months = roc_lookback_months
        self._nearness_lookback_months = nearness_lookback_months
        self._lookback_months = max(roc_lookback_months, nearness_lookback_months)
        self._holding_months = holding_months
        self._pct_long = pct_long
    
    def update(self, algorithm, data):
        # Reset price history when corporate actions occur
        for symbol in set(data.splits.keys() + data.dividends.keys()):
            self._reset(algorithm.securities[symbol])

        # Only emit insights when there is quote data, not when we get corporate action or alt data
        if data.quote_bars.count == 0:
            return []
        
        # Rebalance monthly
        if self._month == algorithm.time.month:
            return []
        self._month = algorithm.time.month
        
        # Rank symbols. A higher rank (or index in the list) is associated with relatively greater rate of change
        # over the roc lookback window and a closer closing price to the max price in the nearness lookback window
        ranking_df = pd.DataFrame()
        for security in self._securities:
            if data.contains_key(security.symbol) and self._is_ready(security) and security.is_tradable and security.price > 0:
                row = pd.DataFrame({'ROC': self._roc(security), 'Nearness': self._nearness(security)}, index=[security.symbol])
                ranking_df = pd.concat([ranking_df, row])
        ranked_symbols =  ranking_df.rank().sum(axis=1).sort_values().index
        
        # Generate insights 
        insights = []
        num_long = int(len(ranked_symbols) * (self._pct_long / 100))
        if num_long > 0:
            for symbol in ranked_symbols[-num_long:]:
                insights.append(Insight.price(symbol, Expiry.END_OF_MONTH, InsightDirection.UP))
        return insights
    
    def on_securities_changed(self, algorithm, changes):
        for security in changes.added_securities:
            self._securities.append(security)

            # Warm up history
            self._warm_up_history(security)

            # Setup indicator consolidator
            self._set_up_consolidator(security)

        for security in changes.removed_securities:
            if security in self._securities:
                self._securities.remove(security)
            algorithm.subscription_manager.remove_consolidator(security.symbol, security.consolidator)

    def _warm_up_history(self, security):
        """
        Warms up the `historical_prices_by_symbol` dictionary with historical data as far back as the 
        earliest lookback window.
        
        Input:
         - security
            Security that needs history warm up
        """
        symbol = security.symbol
        # Get the historical data
        start_lookback = Expiry.END_OF_MONTH(self._algorithm.time) - relativedelta(months=self._lookback_months + 1)
        history = self._algorithm.history(symbol, start_lookback, self._algorithm.time, Resolution.DAILY, data_normalization_mode=DataNormalizationMode.SCALED_RAW)
        
        # Rollback history timestamp by 1 day to ensure accurate monthly ROC values
        if history.shape[0] > 0:
            history = history.unstack(level=0)
            history = history.set_index(history.index.map(lambda x: x - timedelta(days=1))).stack().swaplevel()
        
        security.history = history.loc[symbol, ['open', 'high', 'close']] if symbol in history.index else pd.DataFrame()
    
    def _set_up_consolidator(self, security):
        security.consolidator = TradeBarConsolidator(timedelta(1))
        security.consolidator.data_consolidated += self._consolidation_handler
        self._algorithm.subscription_manager.add_consolidator(security.symbol, security.consolidator)

    def _consolidation_handler(self, sender, consolidated):
        """
        Updates the rolling lookback window with the latest data.
        
        Inputs
         - sender
            Function calling the consolidator
         - consolidated
            Tradebar representing the latest completed trading day
        """
        security = self._algorithm.securities[consolidated.symbol]

        # Add new data point to history
        if consolidated.time not in security.history.index:
            row = pd.DataFrame({'open': consolidated.open, 'high': consolidated.high, 'close': consolidated.close},
                                index=[consolidated.time])
            security.history = pd.concat([security.history, row])

        # Remove expired history
        start_lookback = Expiry.END_OF_MONTH(self._algorithm.time) - relativedelta(months=self._lookback_months + 1)
        security.history = security.history[security.history.index >= start_lookback]

    def _reset(self, security):
        self._warm_up_history(security)
        
        # Reset the consolidator to clear our pricing data from before the corporate action
        self._algorithm.subscription_manager.remove_consolidator(security.symbol, security.consolidator)
        self._set_up_consolidator(security)
    
    def _is_ready(self, security):
        """
        Boolean to signal if the security has sufficient history to fill the ROC lookback window.
        """
        return self._get_lookback(security, self._roc_lookback_months).shape[0] > 1

    def _roc(self, security):
        """
        Calculates the rate of change over the ROC lookback window.
        """
        lookback = self._get_lookback(security, self._roc_lookback_months)
        start_price = lookback.iloc[0].open
        end_price = lookback.iloc[-1].close
        return (end_price - start_price) / start_price 

    def _nearness(self, security):
        """
        Calculates how close the closing price of the nearness lookback window was to its maximum price.
        """
        lookback = self._get_lookback(security, self._nearness_lookback_months)
        return lookback.iloc[-1].close / lookback.high.max()
    
    def _get_lookback(self, security, num_months):
        """
        Slices the historical data into the trailing `num_months` months.
        
        Input:
         - security
            The security for which to check history
         - num_months
            Number of trailing months in the lookback window
        
        Returns DataFrame containing data for the lookback window.
        """
        start_lookback = Expiry.END_OF_MONTH(self._algorithm.time) - relativedelta(months=num_months + 1)
        end_lookback = Expiry.END_OF_MONTH(self._algorithm.time) - relativedelta(months=1)
        
        return security.history[(security.history.index >= start_lookback) & (security.history.index < end_lookback)]
#region imports
from AlgorithmImports import *

from universe import AssetManagementUniverseSelection
from alpha import ROCAndNearnessAlphaModel
from portfolio import NetDirectionWeightedPortfolioConstructionModel
#endregion


class AssetManagementFirmMomentum(QCAlgorithm):

    _undesired_symbols_from_previous_deployment = []
    _checked_symbols_from_previous_deployment = False

    def initialize(self):
        holding_months = self.get_parameter("holding_months", 6)

        # Set backtest start date and warm-up period
        WARM_UP_FOR_LIVE_MODE = self.get_parameter("warm_up_for_live_mode", 0)
        MORNING_STAR_LIVE_MODE_HISTORY = timedelta(30) # US Fundamental Data by Morningstar is limited to the last 30 days
        if self.live_mode:
            self.set_warm_up(MORNING_STAR_LIVE_MODE_HISTORY, Resolution.DAILY)
        else: # Backtest mode
            if WARM_UP_FOR_LIVE_MODE: # Need to run a backtest before you can deploy this algorithm live
                self.set_start_date(2023, 3, 1)
                self.set_end_date(2024, 3, 1)
            else: # Regular backtest
                self.set_start_date(2023, 3, 1)
                self.set_end_date(2024, 3, 1)
            self.set_warm_up(timedelta(31 * holding_months), Resolution.DAILY)
        self.set_cash(1_000_000)

        self.settings.minimum_order_margin_portfolio_percentage = 0
        self.set_brokerage_model(BrokerageName.INTERACTIVE_BROKERS_BROKERAGE, AccountType.MARGIN)
        
        self.set_security_initializer(CustomSecurityInitializer(self))

        self.universe_settings.data_normalization_mode = DataNormalizationMode.RAW
        self.universe_settings.schedule.on(self.date_rules.month_start())
        self.set_universe_selection(AssetManagementUniverseSelection(self, self.universe_settings))
        
        self.set_alpha(ROCAndNearnessAlphaModel(
            self,
            self.get_parameter("roc_lookback_months", 6),
            self.get_parameter("nearness_lookback_months", 12),
            holding_months,
            self.get_parameter("pct_long", 50)
        ))
        
        self.set_portfolio_construction(NetDirectionWeightedPortfolioConstructionModel())

        self.add_risk_management(NullRiskManagementModel())
        
        self.set_execution(ImmediateExecutionModel())


    def on_data(self, data):
        # Exit positions that aren't backed by existing insights.
        # If you don't want this behavior, delete this method definition.
        if not self.is_warming_up and not self._checked_symbols_from_previous_deployment:
            for security_holding in self.portfolio.values():
                if not security_holding.invested:
                    continue
                symbol = security_holding.symbol
                if not self.insights.has_active_insights(symbol, self.utc_time):
                    self._undesired_symbols_from_previous_deployment.append(symbol)
            self._checked_symbols_from_previous_deployment = True
        
        for symbol in self._undesired_symbols_from_previous_deployment:
            if self.is_market_open(symbol):
                self.liquidate(symbol, tag="Holding from previous deployment that's no longer desired")
                self._undesired_symbols_from_previous_deployment.remove(symbol)


class CustomSecurityInitializer(BrokerageModelSecurityInitializer):

    def __init__(self, algorithm):
        self._algorithm = algorithm
        super().__init__(algorithm.brokerage_model, SecuritySeeder.NULL)

    def initialize(self, security: Security) -> None:
        # First, call the superclass definition
        # This method sets the reality models of each security using the default reality models of the brokerage model
        super().initialize(security)

        # Seed the security with Daily bar
        bars = self._algorithm.history[TradeBar](security.symbol, 5, Resolution.DAILY)
        for bar in bars:
            security.set_market_price(bar)
#region imports
from AlgorithmImports import *
#endregion


class NetDirectionWeightedPortfolioConstructionModel(PortfolioConstructionModel):
    """
    This PCM allocates its portfolio based on the net direction of insights for all the symbols. A symbol
    that has two active insights with an up direction will have twice the allocation than a symbol with
    only one. Additionally, a symbol that has an up active insight and a down active insight will have no
    position. This PCM doesn't liquidate securities when they are removed from the universe. If it's 
    removed from the universe, it will remain invested until all the security's insights expire.
    """
    _insights = []
    _month = -1
    
    def create_targets(self, algorithm, insights):
        """
        Called each time the alpha model emits a list of insights.
        
        Input:
         - algorithm
            Algorithm instance running the backtest
         - insights
            List of insights
            
        Returns a list of portfolio targets.
        """
        for i in insights:
            self._insights.append(i)
        
        if algorithm.is_warming_up or algorithm.current_slice.quote_bars.count == 0 or self._month == algorithm.time.month:
            return []
        self._month = algorithm.time.month

        # Remove insights of delisted symbols
        self._insights = [i for i in self._insights if algorithm.securities[i.symbol].is_tradable]
        
        # Classify insights
        active_insights = []
        expired_insights = []
        while (len(self._insights) > 0):
            insight = self._insights.pop()
            (active_insights if insight.is_active(algorithm.utc_time) else expired_insights).append(insight)
        self._insights = active_insights

        # Liquidate symbols that have expired insights with no active insights
        active_symbols = set([i.symbol for i in active_insights])
        expired_symbols = set([i.symbol for i in expired_insights])
        liquidate_symbols = expired_symbols.difference(active_symbols)
        portfolio_targets = [PortfolioTarget.percent(algorithm, symbol, 0) for symbol in liquidate_symbols]
        
        # Get net direction by symbol and total number of directional insights
        net_direction_by_symbol, num_directional_insights = self._get_net_direction(active_insights)
        
        # Create portfolio targets for active symbols
        for symbol, net_direction in net_direction_by_symbol.items():
            percent = 0 if num_directional_insights == 0 else net_direction / num_directional_insights
            portfolio_targets.append( PortfolioTarget.percent(algorithm, symbol, percent) )
        
        return portfolio_targets
        
    def _get_net_direction(self, insights):
        """
        Determines the net direction of each symbol and the number of active directional insights.
        
        Input:
         - insights
            A list of active insights
        
        Returns a dictionary showing the net direction of each symbol, and the number of directional insights.
        """
        net_direction_by_symbol = {}
        num_directional_insights = 0
        
        for insight in insights:
            symbol = insight.symbol
            direction = insight.direction
            if symbol in net_direction_by_symbol:
                net_direction_by_symbol[symbol] += direction
            else:
                net_direction_by_symbol[symbol] = direction
                
            num_directional_insights += abs(direction)
        
        return net_direction_by_symbol, num_directional_insights

    def on_securities_changed(self, algorithm, changes):
        pass
#region imports
from AlgorithmImports import *
#endregion
# 08/15/2023: -Adjusted algorithm so that it can warm-up properly even though US Fundamental data is limited to last 30-days in warm-up
#             https://www.quantconnect.com/terminal/processCache?request=embedded_backtest_ec4dc1c6b728147dbbd847040e91eb33.html
#region imports
from AlgorithmImports import *
#endregion


class AssetManagementUniverseSelection(FundamentalUniverseSelectionModel):
    """
    This universe selection model refreshes monthly to contain US securities in the asset management industry.
    """
    _fine_symbols_by_date = {}

    def __init__(self, algorithm: QCAlgorithm, universe_settings: UniverseSettings = None):
        def select(fundamental):
            fine = [x for x in fundamental if x.has_fundamental_data]
            
            key = f"{algorithm.time.year}{algorithm.time.month}"
            if key in self._fine_symbols_by_date:
                symbols = [algorithm.symbol(security_id) for security_id in self._fine_symbols_by_date[key]]
            else:
                if not list(fine):
                    algorithm.quit("Insufficient fundamental data in the ObjectStore. Run a backtest before deploying live.")
                    return []
                symbols = [f.symbol for f in fine if f.asset_classification.morningstar_industry_code == MorningstarIndustryCode.ASSET_MANAGEMENT]
                self._fine_symbols_by_date[key] = [str(symbol) for symbol in symbols]
                algorithm.object_store.save(self.OBJECT_STORE_KEY, json.dumps(self._fine_symbols_by_date))

            return symbols
        
        super().__init__(None, select, universe_settings)

        self.OBJECT_STORE_KEY = str(algorithm.project_id)
        if algorithm.live_mode:
            if not algorithm.object_store.contains_key(self.OBJECT_STORE_KEY):
                algorithm.quit("No fundamental data in the ObjectStore. Run a backtest before deploying live.")
                return
            self._fine_symbols_by_date = json.loads(algorithm.object_store.read(self.OBJECT_STORE_KEY))