Overall Statistics
Total Orders
74
Average Win
2.54%
Average Loss
-1.85%
Compounding Annual Return
58.051%
Drawdown
10.600%
Expectancy
0.442
Start Equity
13000
End Equity
16304.15
Net Profit
25.417%
Sharpe Ratio
1.695
Sortino Ratio
1.564
Probabilistic Sharpe Ratio
69.412%
Loss Rate
39%
Win Rate
61%
Profit-Loss Ratio
1.38
Alpha
0
Beta
0
Annual Standard Deviation
0.205
Annual Variance
0.042
Information Ratio
1.964
Tracking Error
0.205
Treynor Ratio
0
Total Fees
$86.50
Estimated Strategy Capacity
$2100000.00
Lowest Capacity Asset
VNQ T2FCD04TATET
Portfolio Turnover
31.28%
Drawdown Recovery
60
from datetime import timedelta, datetime
import math
from AlgorithmImports import *
import numpy as np
from settings import settings

class RSIRebalanceStrategy(QCAlgorithm):

    def Initialize(self):
        """Configure brokerage, subscriptions, schedules and strategy state.

        Backtests (``live_mode`` false) additionally get a 250-day warm-up,
        the starting cash from ``settings`` and a month-start ``add_cash``
        schedule. ``rebalance`` is scheduled every day before the SPY close:
        at 3, 2 and 1 minutes in backtests, and every quarter minute from
        3 down to 0.25 minutes in live mode.
        """
        self.set_start_date(2025, 6, 1)
        # self.set_end_date(2025, 1, 1)

        # https://www.interactivebrokers.ca/en/accounts/tradingPermissions.php?ib_entity=ca
        self.ibkr_market_order_buffer = 0.02
        # Dollar cushion subtracted from buying power when sizing orders
        # in set_holdings_2.
        self.ibkr_fee_buffer = 25

        self.set_brokerage_model(
            BrokerageName.INTERACTIVE_BROKERS_BROKERAGE, AccountType.CASH
        )

        # Every ticker appears exactly once so no symbol is subscribed twice.
        self.symbols = [
            "QLD", "TQQQ", "SPY", "PSQ", "VIXY",
            "IBB", "QQQE", "VTV", "VOOG", "VOOV", "XLP", "XLY", "LABU",
            "BND", "UTSL", "SH", "SOXX", "SOXL", "VNQ", "XLU",
        ]

        # Ticker -> Symbol returned by add_equity_symbol.
        self.equities = {
            name: self.add_equity_symbol(name) for name in self.symbols
        }

        if not self.live_mode:
            self.set_warmup(timedelta(days=250))
            self.set_cash(settings.start_cash)

            self.schedule.on(
                self.date_rules.month_start(1),
                self.time_rules.after_market_close("SPY", 0),
                self.add_cash,
            )
            # self.schedule.on(
            #     self.date_rules.month_start(15),
            #     self.time_rules.after_market_close("SPY", 0),
            #     self.add_cash,
            # )

        # Clear the per-day flags and indicator cache before the first
        # rebalance attempt of the day.
        self.schedule.on(
            self.date_rules.every_day(),
            self.time_rules.before_market_close("SPY", 6),
            self.reset_daily_variables,
        )

        # Several attempts per day: set_holdings_2 liquidates on one call and
        # buys on a later one.
        step = -0.25 if self.live_mode else -1
        for offset in np.arange(3, 0, step):
            self.schedule.on(
                self.date_rules.every_day(),
                # Hand the scheduler a plain float rather than a numpy scalar.
                self.time_rules.before_market_close("SPY", float(offset)),
                self.rebalance,
            )

        self.cashInvested = self.portfolio.cash_book["USD"].amount
        self.liquidated = False
        # Contributions queued by add_cash until rebalance deposits them.
        self.additional_cash = 0
        self.prev_tqqq_rsi = 100
        self.prev_tqqq_sma = 0
        self.less_than_40 = 0
        # Indicator cache: kind -> ticker -> period -> value.
        self.indicator_data = {
            "rsi": {},
            "sma": {}
        }
        # Wall-clock start, reported in on_end_of_algorithm.
        self.start_time = datetime.now()
        self.metrics_sent = False

    def rebalance(self):
        """Choose the single asset to hold from RSI and SMA readings.

        Does nothing before the start date or while SPY has no data. Any
        10-period RSI above its threshold selects VIXY; otherwise TQQQ's
        price relative to its 20- and 200-period SMAs selects QLD, VNQ or
        PSQ. Queued cash is deposited while the TQQQ RSI is below 80.
        """
        if self.time < self.start_date or not self.securities["SPY"].has_data:
            return

        # Live runs recompute every indicator on each invocation.
        if self.live_mode:
            self.indicator_data = {"rsi": {}, "sma": {}}

        rsi, sma, price = self.rsi_2, self.sma_2, self.price
        hold = self.set_holdings_2

        if rsi("TQQQ", 10) < 80:
            self.portfolio.cash_book["USD"].add_amount(self.additional_cash)
            self.additional_cash = 0

        # (ticker, 10-period RSI level) pairs, checked lazily in this order.
        overbought_levels = (
            ("QQQE", 79),
            ("VTV", 79),
            ("VOOG", 79),
            ("VOOV", 79),
            ("XLP", 75),
            ("TQQQ", 79),
            ("XLY", 80),
            ("SPY", 80),
        )

        if any(rsi(ticker, 10) > level for ticker, level in overbought_levels):
            hold("VIXY", 1)
        else:
            # The 200-period comparison is evaluated first to keep the same
            # indicator evaluation order; it only matters when price is not
            # above the 20-period SMA.
            above_long_sma = price("TQQQ") > sma("TQQQ", 200)
            if price("TQQQ") > sma("TQQQ", 20):
                hold("QLD", 1)
            elif above_long_sma:
                hold("VNQ", 1)
            else:
                hold("PSQ", 1)

        self.prev_tqqq_rsi = rsi("TQQQ", 10)
        self.prev_tqqq_sma = sma("TQQQ", 200)
        self.send_live_metrics()

    def reset_daily_variables(self):
        """Clear the per-day flags and empty the indicator cache."""
        self.liquidated = self.metrics_sent = False
        self.indicator_data = dict(rsi={}, sma={})

    def add_equity_symbol(self, symbol: str) -> Symbol:
        """Subscribe to ``symbol`` at minute resolution and return its Symbol.

        Prices use adjusted data normalization, and the security is given the
        immediate settlement and immediate fill models.
        """
        equity = self.add_equity(
            symbol,
            Resolution.MINUTE,
            data_normalization_mode=DataNormalizationMode.ADJUSTED,
        )
        equity.set_settlement_model(ImmediateSettlementModel())
        equity.set_fill_model(ImmediateFillModel())
        return equity.Symbol

    def add_cash(self):
        """Queue the configured DCA contribution and count it as invested.

        The amount is only accumulated in ``additional_cash`` here; it is not
        added to the cash book by this method.
        """
        contribution = settings.dca_cash
        self.cashInvested += contribution
        self.additional_cash += contribution

    def price(self, symbol: str):
        """Return the current price of the subscribed security ``symbol``."""
        security = self.securities[symbol]
        return security.price

    # def rsi_2(self, symbol: str, period):
    #     if self.indicator_data["rsi"].get(symbol) is None:
    #         self.indicator_data["rsi"][symbol] = {}
        
    #     if self.indicator_data["rsi"][symbol].get(period) is None or self.indicator_data["rsi"][symbol].get(period) == 0: 
    #         self.indicator_data["rsi"][symbol][period] = 0
    #     else: 
    #         return self.indicator_data["rsi"][symbol][period]

    #     extension = min(round(11 * period**0.5 + 5.5*period), 2000)
    #     r_w = RollingWindow[float](extension) #90
    #     change = RollingWindow[float](extension - 1) #89
    #     history = self.history(symbol, extension, Resolution.DAILY)

    #     for historical_bar in history:
    #         r_w.add(historical_bar.close)

    #     r_w.add(self.securities[symbol].price)
        
    #     for i in range(r_w.count - 2, -1, -1):
    #         change.add(round(r_w[i] - r_w[i+1], 10))

    #     if r_w.is_ready and change.is_ready:
    #         gain = 0
    #         loss = 0
    #         for i in range(change.count - 1, change.count - 1 - period, -1):
    #             gain += max(change[i], 0)
    #             loss += abs(min(change[i], 0))

    #         avgGainStartingPt = round(gain / period, 10)
    #         avgLossStartingPt = round(loss / period, 10)

    #         #Start avgGain and avgLoss
    #         avgGain = round(((period - 1) * avgGainStartingPt + max(change[change.count - 1 - period], 0)) / period, 10)
    #         avgLoss = round(((period - 1) * avgLossStartingPt + abs(min(change[change.count - 1 - period], 0))) / period, 10)
            
    #         for i in range(change.count - 1 - period - 1, -1, -1):
    #             avgGain = round(((period - 1) * avgGain + max(change[i], 0)) / period, 10)
    #             avgLoss = round(((period - 1) * avgLoss + abs(min(change[i], 0))) / period, 10)
            
    #         if avgLoss == 0:
    #             return 100
    #         else:
    #             rs = round(avgGain / avgLoss, 10)
    #             rsi = round(100 - 100/(1+rs), 5)

    #             self.indicator_data["rsi"][symbol][period] = rsi

    #             return rsi
    #     else:
    #         return 50 #None

    def rsi_2(self, symbol: str, period):
        """Return the RSI of ``symbol`` over ``period`` daily samples.

        The series is the daily history closes followed by the security's
        current price as the newest sample. If history is shorter than the
        window, the current price is repeated to fill it. The average gain
        and loss are seeded with a simple mean of the oldest ``period``
        changes and then smoothed with ``(prev * (period - 1) + x) / period``.

        Results are cached in ``self.indicator_data["rsi"][symbol][period]``.
        A stored ``None`` or ``0`` counts as "not computed yet". The no-loss
        result (100) is cached like any other value, so repeated calls do not
        re-request history and the stored reading is not left at the 0
        placeholder.

        Returns:
            The RSI rounded to 2 decimals, ``100`` when there were no losses,
            or ``None`` if the window is not full.
        """
        cache = self.indicator_data["rsi"].setdefault(symbol, {})
        cached = cache.get(period)
        if cached is not None and cached != 0:
            return cached
        # Placeholder until a value has been computed.
        cache[period] = 0

        warmup = int(round_up(11 * math.sqrt(period) + 5.5 * period, 0))
        extension = min(warmup, 250)

        # Index 0 of the window is the newest sample, extension - 1 the oldest.
        r_w = RollingWindow[float](extension)
        for historical_bar in self.history(symbol, extension - 1, Resolution.DAILY):
            r_w.add(historical_bar.close)

        # Append the current price; repeats only if history came back short.
        current_price = self.securities[symbol].price
        while r_w.count < extension:
            r_w.add(current_price)

        if not r_w.is_ready:
            return None

        def gain_and_loss(i):
            """Split the change from sample ``i`` to the next newer one."""
            change = r_w[i - 1] - r_w[i]
            return max(change, 0), abs(min(change, 0))

        # Seed: simple average over the oldest `period` changes.
        gain = 0
        loss = 0
        for i in range(extension - 1, extension - period - 1, -1):
            step_gain, step_loss = gain_and_loss(i)
            gain += step_gain
            loss += step_loss
        average_gain = gain / period
        average_loss = loss / period

        # Smooth over the remaining changes, oldest to newest.
        for i in range(extension - period - 1, 0, -1):
            step_gain, step_loss = gain_and_loss(i)
            average_gain = (average_gain * (period - 1) + step_gain) / period
            average_loss = (average_loss * (period - 1) + step_loss) / period

        if average_loss == 0:
            rsi = 100
        else:
            rsi = round(100 - (100 / (1 + average_gain / average_loss)), 2)

        cache[period] = rsi
        return rsi

    def sma_2(self, symbol: str, period):
        """Return the simple moving average of ``symbol`` over ``period`` samples.

        The samples are the last ``period - 1`` daily history closes plus the
        security's current price. The mean is taken over the samples actually
        available, so a short history is not biased toward zero by dividing
        by the nominal ``period``.

        Results are cached in ``self.indicator_data["sma"][symbol][period]``.
        A stored ``None`` or ``0`` counts as "not computed yet".

        Returns:
            The average rounded to 3 decimals.
        """
        cache = self.indicator_data["sma"].setdefault(symbol, {})
        cached = cache.get(period)
        if cached is not None and cached != 0:
            return cached
        # Placeholder until a value has been computed.
        cache[period] = 0

        closes = [
            bar.close
            for bar in self.history(symbol, period - 1, Resolution.DAILY)
        ]
        closes.append(self.securities[symbol].price)

        sma = round(sum(closes) / len(closes), 3)
        cache[period] = sma

        return sma

    def set_holdings_2(self, symbol: str, portion=1) -> None:
        """Switch the portfolio into ``symbol`` using liquidate-then-buy steps.

        The IBKR brokerage model somehow doesn't wait until liquidation
        finishes in ``set_holdings(symbol, 1, True)``, so this method
        liquidates explicitly on one call and places the buy on a later call.

        Each call does at most one thing:
        1. If other positions are held and ``symbol`` is not, liquidate
           everything, set ``self.liquidated`` and return.
        2. If orders are open, cash is unsettled, the liquidation has not
           fully turned into cash, or ``symbol`` has no data, return.
        3. For ``portion < 1``, delegate to ``set_holdings`` when the
           portfolio was just liquidated or already holds ``symbol``.
        4. Otherwise size a limit order from the remaining margin and
           place it when at least 2 shares fit under the fee buffer.

        Args:
            symbol: Ticker of the security to hold.
            portion: Target portfolio fraction; only used by the
                ``portion < 1`` branch.
        """
        # liquidate any other symbols when switching
        if (
            not self.liquidated
            and not self.portfolio[symbol].invested
            and self.portfolio.invested
        ):
            # for curr_pos in current_positions:
            #     self.liquidate(curr_pos, tag="Liquidated")
            self.liquidate()
            self.liquidated = True
            return

        # Wait until nothing is pending: no open orders, no unsettled cash,
        # and (after a liquidation) the whole portfolio value is cash again.
        if (
            len(self.transactions.get_open_orders()) > 0
            or self.portfolio.unsettled_cash > 0
            or (
                self.liquidated
                and self.portfolio.total_portfolio_value != self.portfolio.cash
            )
            or not self.securities[symbol].has_data
        ):
            return

        # Usually portion < 1 is for a risky asset like VIXY; no need to keep its
        # portion as we don't hold it for long.
        # NOTE(review): when portion < 1 and neither branch below matches, control
        # falls through to the full-size limit order, which ignores `portion`.
        if portion < 1:
            if self.liquidated and not self.portfolio.invested:
                self.set_holdings(symbol, portion)
                return
            elif self.portfolio[symbol].invested:
                self.set_holdings(symbol, portion)
                return

        # Calculate for limit order
        # Backtests use 98.8% of the remaining margin to avoid the
        # "Insufficient buying power to complete orders" error
        # (no idea why QC's engine has weird initial margin stuff).
        # TFSA doesn't have any initial margin requirements, so live mode uses
        # the full remaining margin minus 10 dollars.
        buying_power = (
            self.portfolio.margin_remaining - 10
            if self.live_mode
            else self.portfolio.margin_remaining * 0.988
        )
        symbol_price = self.securities[symbol].ask_price

        # 3 cents buffer above the ask, rounded up to the next cent;
        # IBKR will find the lowest ask
        limit_price = round_up(symbol_price + 0.03)

        shares_num: int = math.floor(buying_power / limit_price)

        security = self.securities[symbol]
        # NOTE(review): the first margin check is computed for shares_num - 1,
        # while the loop below and the final order use shares_num - confirm
        # whether this off-by-one is intentional.
        initial_margin_params = InitialMarginParameters(security, shares_num - 1)
        initial_margin_required = (
            security.buying_power_model.get_initial_margin_requirement(
                initial_margin_params
            )
        )
        # Extract the numeric value from the InitialMargin object
        required_margin_value = initial_margin_required.value

        # Drop one share at a time until the required margin fits under the
        # buying power less the fee buffer (or fewer than 2 shares remain).
        while (
            shares_num >= 2
            and required_margin_value > buying_power - self.ibkr_fee_buffer
        ):
            shares_num = shares_num - 1
            initial_margin_params = InitialMarginParameters(security, shares_num)
            initial_margin_required = (
                security.buying_power_model.get_initial_margin_requirement(
                    initial_margin_params
                )
            )
            required_margin_value = initial_margin_required.value

        # order_amount = shares_num * limit_price
        # satisfy_ibkr_market_order = (1 - (order_amount / cash)) > self.ibkr_market_order_buffer

        # Strict "<": a required margin exactly equal to the limit places no order.
        if (
            shares_num >= 2
            and required_margin_value < buying_power - self.ibkr_fee_buffer
        ):
            self.limit_order(symbol, shares_num, limit_price)
            return
    
    def send_live_metrics(self):
        """Log and email the indicator cache once per reset, live mode only.

        Sets ``metrics_sent`` afterwards so later calls are no-ops until the
        flag is cleared again.
        """
        if self.live_mode and not self.metrics_sent:
            snapshot = self.indicator_data
            body = format_indicator_dict(snapshot)

            self.log_indicator_dict(snapshot)
            self.notify.email(
                settings.notify_email_address,
                "Live data - Holy grail smart qqq",
                body,
            )

            self.metrics_sent = True

    def log_indicator_dict(self, data: dict):
        """Write ``data`` (kind -> ticker -> period -> value) to the log.

        Emits a timestamped header, then for each indicator kind its
        upper-cased name, one line per ticker/period reading, and an empty
        line.
        """
        emit = self.log
        emit(f"{self.time} ----------------\n")
        for kind, by_ticker in data.items():
            emit(kind.upper())
            for ticker, by_period in by_ticker.items():
                for length in by_period:
                    emit(f"{ticker} {length}-period: {by_period[length]}")
            emit("")  # separator between indicator groups

    def on_end_of_algorithm(self) -> None:
        """Report total cash invested and the wall-clock run time via debug."""
        self.debug(f"Cash invested: {self.cashInvested}")
        elapsed = datetime.now() - self.start_time
        seconds = elapsed.total_seconds()
        self.debug(f"Algo took: {seconds} seconds")

def round_up(n, decimals=2):
    """Round ``n`` up (toward positive infinity) to ``decimals`` places.

    The scaled value is rounded to 9 decimal places before taking the
    ceiling. Without this, binary floating-point noise pushes values that are
    already exact at the requested precision up a full step: for example
    ``1.1 * 100`` evaluates to ``110.00000000000001``, whose ceiling is 111.

    Args:
        n: Number to round.
        decimals: Number of decimal places to keep.

    Returns:
        The smallest multiple of ``10 ** -decimals`` that is >= ``n``,
        ignoring noise below 1e-9 of a step.
    """
    multiplier = 10**decimals
    scaled = round(n * multiplier, 9)
    return math.ceil(scaled) / multiplier

def format_indicator_dict(data: dict) -> str:
    """Render ``data`` (kind -> ticker -> period -> value) as plain text.

    Each indicator kind becomes a section: its upper-cased name, then one
    line per ticker with that ticker's readings joined by " | ", then an
    empty line. Leading and trailing whitespace is stripped from the result.
    """
    sections = []
    for kind, by_ticker in data.items():
        rows = [
            " | ".join(
                f"{ticker} {length}-period: {reading}"
                for length, reading in by_period.items()
            )
            for ticker, by_period in by_ticker.items()
        ]
        # Header, the ticker rows, then a separator line.
        sections.extend([kind.upper(), *rows, ""])
    return "\n".join(sections).strip()
# region imports
from AlgorithmImports import *
# endregion

class Settings:
    """User-tunable configuration values for the strategy."""

    def __init__(self):
        # Starting cash and recurring contribution amount.
        self.start_cash, self.dca_cash = 13000, 0
        # Recipient address for notification emails; empty by default.
        self.notify_email_address = ""


# Shared module-level instance.
settings = Settings()