Overall Statistics
Total Orders
46
Average Win
4.69%
Average Loss
-3.03%
Compounding Annual Return
4.093%
Drawdown
21.700%
Expectancy
0.414
Start Equity
120000
End Equity
146289.88
Net Profit
21.908%
Sharpe Ratio
-0.064
Sortino Ratio
-0.065
Probabilistic Sharpe Ratio
5.184%
Loss Rate
44%
Win Rate
56%
Profit-Loss Ratio
1.54
Alpha
-0.01
Beta
0.062
Annual Standard Deviation
0.079
Annual Variance
0.006
Information Ratio
-0.502
Tracking Error
0.155
Treynor Ratio
-0.082
Total Fees
$221.96
Estimated Strategy Capacity
$1800000.00
Lowest Capacity Asset
IEF SGNKIKYGE9NP
Portfolio Turnover
2.18%
Drawdown Recovery
1041
# region imports
from AlgorithmImports import *
# endregion

class SymbolData():
    """Rolling state used for the inflation and momentum signals.

    Holds three rolling windows:

    * ``_prices`` - price samples pushed through :meth:`update_price`
      (``mom_period`` entries).
    * ``_cpi_mom`` - raw CPI readings pushed through :meth:`update`
      (``period`` entries).
    * ``_cpi_mom_change`` - change between consecutive CPI readings,
      multiplied by 100 (``period`` entries).

    All windows are indexed newest-first (index 0 is the latest sample).
    """

    def __init__(self, period: int, mom_period: int) -> None:
        """Create empty windows.

        Args:
            period: Capacity of the CPI window and of the CPI-change window.
            mom_period: Capacity of the price window used for momentum.
        """
        self._prices: RollingWindow = RollingWindow[float](mom_period)
        self._cpi_mom: RollingWindow = RollingWindow[float](period)
        self._cpi_mom_change: RollingWindow = RollingWindow[float](period)
        # Last decided inflation direction: 1 = rising, -1 = falling,
        # 0 = no direction decided yet.
        self._last_inflation_change: int = 0

    def price_is_ready(self) -> bool:
        """Return True once the price window is full."""
        return self._prices.is_ready

    def is_ready(self) -> bool:
        """Return True once both the CPI window and the CPI-change window are full."""
        return self._cpi_mom.is_ready and self._cpi_mom_change.is_ready

    def update_price(self, price: float) -> None:
        """Push a new price sample into the price window."""
        self._prices.add(price)

    def update(self, cpi_value: float) -> None:
        """Push a new CPI reading and, once possible, the change it implies.

        The stored change is ``(newest - previous) * 100``. It is added as a
        plain ``float``: the window holds floats, and storing a NumPy array
        (what ``np.diff`` returns) only works through implicit size-1
        array-to-scalar conversion and breaks for any ``period`` other than 2.
        """
        self._cpi_mom.add(cpi_value)
        # A difference needs at least two readings, whatever `period` is.
        if self._cpi_mom.is_ready and self._cpi_mom.count > 1:
            latest_change: float = float(self._cpi_mom[0]) - float(self._cpi_mom[1])
            self._cpi_mom_change.add(latest_change * 100)

    def inflation_increase(self) -> bool:
        """Return True when inflation is considered to be rising.

        The direction is rising when every stored change is positive and
        falling when every stored change is negative. With mixed (or zero)
        changes the previously decided direction is kept. The decided
        direction is remembered for the next call.

        An empty change window carries no information, so the previous
        direction is kept instead of treating "all of nothing" as rising.
        """
        changes = list(self._cpi_mom_change)

        if changes and all(change > 0 for change in changes):
            direction: int = 1
        elif changes and all(change < 0 for change in changes):
            direction = -1
        else:
            direction = self._last_inflation_change

        self._last_inflation_change = direction
        return direction == 1

    def positive_momentum(self) -> bool:
        """Return True when the newest price is above the oldest price in the window."""
        newest_price = self._prices[0]
        oldest_price = self._prices[self._prices.count - 1]
        momentum: float = newest_price / oldest_price - 1
        return momentum > 0

class BLS_CPI(PythonData):
    """Custom data type that reads CPI values from a remote ';'-separated CSV file.

    Expected row layout, as parsed by :meth:`Reader`: the row starts with a
    digit, the second field is a ``YYYY-MM-DD`` date and the last field is the
    numeric value. Rows that do not start with a digit (e.g. a header) are
    skipped.
    """

    # Date of the most recent data point parsed so far (shared across instances).
    _last_update_date: datetime.date = datetime(1,1,1).date()

    @staticmethod
    def get_last_update_date() -> datetime.date:
        """Return the date of the most recent data point parsed so far."""
        return BLS_CPI._last_update_date

    def GetSource(self, config: SubscriptionDataConfig, date: datetime, isLiveMode: bool) -> SubscriptionDataSource:
        """Return the remote CSV file that backs this data type."""
        return SubscriptionDataSource(
            'https://data.quantpedia.com/backtesting_data/economic/BLS_INFLATION_AS_REPORTED.csv',
            SubscriptionTransportMedium.REMOTE_FILE,
            FileFormat.CSV
        )

    def Reader(self, config: SubscriptionDataConfig, line: str, date: datetime, isLiveMode: bool) -> BaseData:
        """Parse one CSV row into a data point.

        Returns:
            A ``BLS_CPI`` instance, or ``None`` when the row is empty, is not a
            data row, has too few fields, or carries no value.
        """
        # Guard against empty lines before indexing into the string.
        if not line or not line[0].isdigit():
            return None

        fields = line.split(';')
        if len(fields) < 2:
            return None

        # A row without a value must not be emitted: it would otherwise be
        # delivered with the default value and be treated as a real reading.
        value_text = fields[-1].strip()
        if not value_text:
            return None

        data_point_date = datetime.strptime(fields[1], "%Y-%m-%d")

        data = BLS_CPI()
        data.symbol = config.symbol
        data.time = data_point_date
        data.end_time = data_point_date + timedelta(days=1)
        data.value = float(value_text)

        # Track the newest date seen so consumers can detect a stalled feed.
        if data.time.date() > BLS_CPI._last_update_date:
            BLS_CPI._last_update_date = data.time.date()

        return data
# region imports
from AlgorithmImports import *
import data_tools
from dateutil.relativedelta import relativedelta
import math
# endregion

# Strategy is rebalanced once a month (at the month start). 
# Initial rebalance after data warmup is done at the start of the execution.
# CPI data is updated once a month (in the middle of the month) and comes to the algorithm at the end of the month.
# CPI data source - https://www.bls.gov/news.release/cpi.nr0.htm
#
# Trading rules:
# If inflation has increased and GLD has positive momentum -> trade GLD.
# If inflation has not increased and IEF has positive momentum -> trade IEF.
# Else -> trade SHY.

class InflationforGoldandTreasuries(QCAlgorithm):
    """Monthly rotation between GLD, IEF and SHY based on CPI direction and price momentum.

    Rules applied at each rebalance:

    * inflation rising and GLD momentum positive  -> hold GLD
    * inflation not rising and IEF momentum positive -> hold IEF
    * otherwise -> hold SHY

    The target position is a fixed notional amount (``_notional_value``),
    not a fraction of current equity.
    """

    # Fixed amount targeted in the selected asset; also the starting cash.
    _notional_value: int = 120_000
    # Rebalance is scheduled this many minutes before the market close.
    _trade_exec_minute_offset: int = 15

    # Window length for CPI readings / CPI changes.
    _period: int = 2
    # Window length for prices; prices are sampled each time CPI data arrives,
    # so momentum spans `_mom_period` CPI releases.
    _mom_period: int = 12

    def initialize(self) -> None:
        """Subscribe to data, create signal state and schedule the monthly rebalance."""
        self.set_start_date(2021, 1, 1)
        self.set_cash(self._notional_value)

        leverage: int = 2

        tickers: List[str] = ['GLD', 'IEF', 'SHY']

        self._gld, self._ief, self._shy = [
            self.add_equity(ticker, Resolution.MINUTE, leverage=leverage).symbol for ticker in tickers
        ]
        self._CPI: Symbol = self.add_data(data_tools.BLS_CPI, 'BLSCPI', Resolution.DAILY).symbol

        self._data: Dict[Symbol, data_tools.SymbolData] = {}

        # Only GLD and IEF need momentum; SHY is the unconditional fallback.
        self._price_symbols: List[Symbol] = [self._gld, self._ief]
        for symbol in self._price_symbols:
            self._data[symbol] = data_tools.SymbolData(self._period, self._mom_period)
        self._data[self._CPI] = data_tools.SymbolData(self._period, self._mom_period)

        self.settings.daily_precise_end_time = False
        self.settings.minimum_order_margin_portfolio_percentage = 0.

        # SPY is only used as the reference security for the schedule.
        market: Symbol = self.add_equity('SPY', Resolution.MINUTE).symbol

        # Roughly (_mom_period + 2) months of history to fill the windows.
        self.set_warm_up((self._mom_period+2)*30, Resolution.DAILY)

        self._rebalance_flag: bool = False
        self.schedule.on(
            self.date_rules.month_start(market),
            self.time_rules.before_market_close(market, self._trade_exec_minute_offset),
            self._rebalance
        )

    def on_warmup_finished(self) -> None:
        """Log the moment the warm-up period ends."""
        self.log(f"Finished warming up {self.time}")

        # NOTE temporary, just to see the execution
        # self._rebalance_flag = True

    def on_data(self, slice: Slice) -> None:
        """Record new CPI data and, when a rebalance is pending, trade the selected asset."""
        # Update data when new CPI data arrives (also during warm-up).
        if slice.contains_key(self._CPI) and slice[self._CPI]:
            self._record_cpi_release(slice[self._CPI])

        if self.is_warming_up: return

        # Monthly rebalance.
        if not self._rebalance_flag:
            return
        self._rebalance_flag = False

        self.log('new monthly rebalance')

        if not self._inputs_ready():
            return

        # Check if data is still coming.
        CPI_last_update_date: datetime.date = data_tools.BLS_CPI.get_last_update_date()
        if self.securities[self._CPI].get_last_data() and self.time.date() > CPI_last_update_date + relativedelta(months=1):
            self.log(f'data for CPI stopped coming in; last update date: {CPI_last_update_date}')
            self.liquidate()
            return

        self.log(f'trade asset decision')

        self._rebalance_into(self._select_asset())

    def _record_cpi_release(self, cpi_bar) -> None:
        """Store a new CPI reading and sample the latest price of each momentum asset."""
        self.log(f'new CPI data arrived {cpi_bar.end_time} - CPI: {cpi_bar.value}')

        # store CPI
        self._data[self._CPI].update(cpi_bar.value)

        # store asset prices
        for s in self._price_symbols:
            if self.securities.contains_key(s) and self.securities[s].get_last_data():
                self.log(f'{s} asset price data updated')
                self._data[s].update_price(self.securities[s].get_last_data().price)

    def _inputs_ready(self) -> bool:
        """Return True when price and CPI windows are full; log what is missing otherwise."""
        price_ready: Dict[str, bool] = { s.value : self._data[s].price_is_ready() for s in self._price_symbols }
        if not all(price_ready.values()):
            self.log(f'price data not ready yet - {price_ready}')
            return False
        if not self._data[self._CPI].is_ready():
            self.log(f'CPI data not ready yet')
            return False
        return True

    def _select_asset(self) -> Symbol:
        """Pick GLD, IEF or SHY from the inflation direction and price momentum."""
        # inflation_increase() updates internal state, so evaluate it once.
        inflation_up: bool = self._data[self._CPI].inflation_increase()

        if inflation_up and self._data[self._gld].positive_momentum():
            return self._gld
        if not inflation_up and self._data[self._ief].positive_momentum():
            return self._ief
        return self._shy

    def _rebalance_into(self, traded_asset: Symbol) -> None:
        """Move the portfolio into ``traded_asset`` up to the fixed notional value.

        Other holdings are liquidated when the asset is not already held.
        An existing position is only topped up (by whole shares), never reduced.
        """
        # Without a valid price the target quantity cannot be computed
        # (division by zero); skip before touching existing holdings.
        price: float = self.securities[traded_asset].price
        if price <= 0:
            self.log(f'no valid price for {traded_asset.value}; rebalance skipped')
            return

        # Trade execution.
        if not self.portfolio[traded_asset].invested:
            self.log(f'liquidating existing holdings')
            self.liquidate()

        # Calculate additional quantity.
        quantity: int = int(self._notional_value // price)
        additional_q: float = quantity - self.portfolio[traded_asset].quantity
        self.log(f'additional quantity for {traded_asset.value}: {additional_q}; existing quantity: {self.portfolio[traded_asset].quantity}')
        if additional_q >= 1:
            self.market_order(traded_asset, additional_q)
        else:
            self.log(f'additional quanitity of {additional_q} shares not executed')

    def _rebalance(self) -> None:
        """Scheduled callback: mark a rebalance as pending for the next on_data call."""
        self._rebalance_flag = True