Overall Statistics
Total Trades
2217
Average Win
0.01%
Average Loss
-0.01%
Compounding Annual Return
41.780%
Drawdown
10.800%
Expectancy
-0.191
Net Profit
7.339%
Sharpe Ratio
1.393
Probabilistic Sharpe Ratio
54.649%
Loss Rate
53%
Win Rate
47%
Profit-Loss Ratio
0.73
Alpha
0.194
Beta
0.904
Annual Standard Deviation
0.221
Annual Variance
0.049
Information Ratio
1.828
Tracking Error
0.1
Treynor Ratio
0.341
Total Fees
$3589.90
Estimated Strategy Capacity
$730000000.00
Lowest Capacity Asset
TMK R735QTJ8XC9X
#region imports
from AlgorithmImports import *
#endregion


class CPIData(PythonData):
    # 12-month unadjusted CPI data
    # Source: https://www.bls.gov/charts/consumer-price-index/consumer-price-index-by-category-line-chart.htm
    # Release dates source: https://www.bls.gov/bls/news-release/cpi.htm

    def GetSource(self,
         config: SubscriptionDataConfig,
         date: datetime,
         isLive: bool) -> SubscriptionDataSource: 
        return SubscriptionDataSource("https://www.dropbox.com/s/99bjb6rnoqq4ihj/CPI%20data%202.csv?dl=1", SubscriptionTransportMedium.RemoteFile)

    def Reader(self,
         config: SubscriptionDataConfig,
         line: str,
         date: datetime,
         isLive: bool) -> BaseData:

        if not (line.strip()):
            return None

        cpi = CPIData()
        cpi.Symbol = config.Symbol

        try:
            def parse(pct):
                return float(pct[:-1]) / 100

            data = line.split(',')
            cpi.EndTime = datetime.strptime(data[0], "%m%d%Y %H:%M %p")
            cpi["month"] = data[1]
            cpi['all-items'] = parse(data[2])
            cpi['food'] = parse(data[3])
            cpi['food-at-home'] = parse(data[4])
            cpi['food-away-from-home'] = parse(data[5])
            cpi['energy'] = parse(data[6])
            cpi['gasoline'] = parse(data[7])
            cpi['electricity'] = parse(data[8])
            cpi['natural-gas'] = parse(data[9])
            cpi['all-items-less-food-and-energy'] = parse(data[10])
            cpi['commodities-less-food-and-energy-commodities'] = parse(data[11])
            cpi['apparel'] = parse(data[12])
            cpi['new-vehicles'] = parse(data[13])
            cpi['medical-car-commodities'] = parse(data[14])
            cpi['services-less-energy-services'] = parse(data[15])
            cpi['shelter'] = parse(data[16])
            cpi['medical-care-services'] = parse(data[17])
            cpi['education-and-communication'] = parse(data[18])
            
            cpi.Value = cpi['all-items']


        except ValueError:
            # Do nothing
            return None

        return cpi
# region imports
from AlgorithmImports import *
from data import CPIData
from symbol import SymbolData
# endregion

class CalculatingAsparagusFlamingo(QCAlgorithm):

    def Initialize(self):
        self.SetStartDate(2022, 9, 13)
        self.SetEndDate(2022, 11, 25)
        self.SetCash(10_000_000)
        self.dataset_symbol = self.AddData(CPIData, "CPIData").Symbol

        self.SetSecurityInitializer(BrokerageModelSecurityInitializer(self.BrokerageModel, FuncSecuritySeeder(self.GetLastKnownPrices)))

        # Add ETF constituents universe
        self.UniverseSettings.Resolution = Resolution.Daily
        self.UniverseSettings.FillForward = False
        filter_function = lambda constituents: [x.Symbol for x in constituents]
        universe = self.Universe.ETF("SPY", Market.USA, self.UniverseSettings, filter_function)
        self.AddUniverse(universe, self.FineFundamentalFunction)

        # Add ETF
        self.benchmark_symbol = self.AddEquity('SPY', Resolution.Daily, fillDataForward=False).Symbol

        self.symbol_data_by_symbol = {}
        self.rebalance = False
        self.quantiles = int(self.GetParameter('quantiles'))

        # Warm up CPI data history
        self.cpi_lookback = timedelta(days=395)
        self.cpi_history = self.History(self.dataset_symbol, self.StartDate - self.cpi_lookback, self.StartDate)
        if not self.cpi_history.empty:
            self.cpi_history = self.cpi_history.loc[self.dataset_symbol]['value']

    def FineFundamentalFunction(self, fine: List[FineFundamental]) -> List[Symbol]:
        return [f.Symbol for f in fine if f.MarketCap > 0]


    def OnData(self, data: Slice):
        # Update CPI historical data
        if data.ContainsKey(self.dataset_symbol):
            self.rebalance = True
            self.cpi_history.loc[self.Time] = data[self.dataset_symbol].Value
            self.cpi_history = self.cpi_history.loc[self.cpi_history.index >= self.Time - self.cpi_lookback]

        # Check if the algorithm should rebalance
        if not self.rebalance or len(self.symbol_data_by_symbol) <= 1 or data.Time.hour != 0:
            return
        self.rebalance = False

        # Get the current environment and environment history
        cpi_growth = self.cpi_history.pct_change().dropna()
        current_environment = 1 if cpi_growth[-1] > 0 else -1

        # Get benchmark history
        benchmark_history = self.symbol_data_by_symbol[self.benchmark_symbol].history

        # Get ETF constituent history
        universe_history = pd.DataFrame()
        for symbol, symbol_data in self.symbol_data_by_symbol.items():
            if symbol == self.benchmark_symbol:
                continue
            universe_history[symbol] = symbol_data.history
        universe_history.dropna(axis=1, how='any', inplace=True)

        # Get historical environments data
        benchmark_environment_history = pd.Series()
        universe_environment_history = pd.DataFrame()
        for i in range(1, len(cpi_growth)):
            start = cpi_growth.index[i-1]
            end = cpi_growth.index[i]

            sample_environment = 1 if cpi_growth[i] > 0 else -1
            if current_environment != sample_environment:
                continue

            trade_open_date = universe_history.loc[universe_history.index >= start]
            if len(trade_open_date) == 0:
                break
            trade_open_date = trade_open_date.index[0]
            trade_close_date = universe_history.loc[universe_history.index >= end]
            if len(trade_close_date) == 0:
                break
            trade_close_date = trade_close_date.index[0]

            universe_env_daily_returns = universe_history.loc[trade_open_date:trade_close_date].pct_change().iloc[1:]
            benchmark_daily_returns = benchmark_history.loc[trade_open_date:trade_close_date].pct_change().iloc[1:]

            universe_environment_history = pd.concat([universe_environment_history, universe_env_daily_returns])
            benchmark_environment_history = pd.concat([benchmark_environment_history, benchmark_daily_returns])

        # Calculate Sharpe ratios
        universe_sharpes = self.sharpe_ratio(universe_environment_history)
        benchmark_sharpe = self.sharpe_ratio(benchmark_environment_history)
        
        # Select assets that outperform the benchmark in the current environment
        symbols_to_buy = universe_sharpes[universe_sharpes > benchmark_sharpe].index
        if len(symbols_to_buy) == 0:
            self.Liquidate()
            self.SetHoldings(self.benchmark_symbol, 1)
            self.Debug(f"{self.Time}: No assets outperform the benchmark. Buying the benchmark.")
            return
        outperforming_sharpes = universe_sharpes[universe_sharpes > benchmark_sharpe]
        symbols_to_buy = outperforming_sharpes.sort_values(ascending=False).index
        symbols_to_buy = symbols_to_buy[:max(1, int(len(symbols_to_buy)/self.quantiles))]

        # Calculate total market cap of all selected assets
        total_market_cap = 0
        for symbol, symbol_data in self.symbol_data_by_symbol.items():
            if symbol in symbols_to_buy:
                total_market_cap += symbol_data.security.Fundamentals.MarketCap

        # Create portfolio targets
        portfolio_targets = []
        for symbol, symbol_data in self.symbol_data_by_symbol.items():
            weight = 0
            if symbol in symbols_to_buy:
                weight = symbol_data.security.Fundamentals.MarketCap / total_market_cap
            portfolio_targets.append(PortfolioTarget(symbol, weight))

        # Submit orders
        self.SetHoldings(portfolio_targets)
        
        # Plot data
        self.Plot("Universe", "Total", len(self.symbol_data_by_symbol) - 1)
        self.Plot("Universe", "Selected", len(symbols_to_buy))

    def OnSecuritiesChanged(self, changes: SecurityChanges) -> None:
        self.rebalance = True
        for security in changes.AddedSecurities:
            self.symbol_data_by_symbol[security.Symbol] = SymbolData(self, security, self.cpi_lookback - timedelta(days=30))

        for security in changes.RemovedSecurities:
            symbol_data = self.symbol_data_by_symbol.pop(security.Symbol, None)
            self.Liquidate(security.Symbol)
            if symbol_data:
                symbol_data.dispose()

    def sharpe_ratio(self, returns):
        ann_returns = ((returns.mean() + 1) ** 252) - 1
        ann_std = returns.std() * np.sqrt(252)
        return ann_returns / ann_std

    def OnEndOfAlgorithm(self) -> None:
        holdings_value_by_symbol = {}
        for kvp in self.Portfolio:
            holdings_value_by_symbol[kvp.Key.Value] = kvp.Value.HoldingsValue
        self.ObjectStore.Save(f"{self.ProjectId}/holdings_value_by_symbol", json.dumps(holdings_value_by_symbol))
#region imports
from AlgorithmImports import *
#endregion

  
class SymbolData:
    def __init__(self, algorithm, security, lookback):
        self.algorithm = algorithm
        self.symbol = security.Symbol
        self.security = security

        # Set up consolidators to collect pricing data
        self.consolidator = TradeBarConsolidator(1)
        self.consolidator.DataConsolidated += self.consolidation_handler
        algorithm.SubscriptionManager.AddConsolidator(self.symbol, self.consolidator)

        self.history = pd.Series()
        self.lookback = lookback

        # Get historical data
        history = algorithm.History(self.symbol, self.lookback, Resolution.Daily)
        if not history.empty:
            self.history = history.loc[self.symbol].open

    def consolidation_handler(self, sender: object, consolidated_bar: TradeBar) -> None:
        self.history.loc[consolidated_bar.EndTime] = consolidated_bar.Open
        self.history = self.history.loc[self.history.index > consolidated_bar.EndTime - self.lookback]

    def dispose(self):
        self.algorithm.SubscriptionManager.RemoveConsolidator(self.symbol, self.consolidator)