Overall Statistics
Total Orders
1172
Average Win
3.53%
Average Loss
-1.98%
Compounding Annual Return
263.776%
Drawdown
29.800%
Expectancy
0.809
Start Equity
10000
End Equity
6012774.91
Net Profit
60027.749%
Sharpe Ratio
4.119
Sortino Ratio
5.567
Probabilistic Sharpe Ratio
100.000%
Loss Rate
35%
Win Rate
65%
Profit-Loss Ratio
1.78
Alpha
0
Beta
0
Annual Standard Deviation
0.383
Annual Variance
0.147
Information Ratio
4.226
Tracking Error
0.383
Treynor Ratio
0
Total Fees
$79089.28
Estimated Strategy Capacity
$520000.00
Lowest Capacity Asset
VIXY UT076X30D0MD
Portfolio Turnover
46.86%
Drawdown Recovery
90
from datetime import timedelta, datetime
import math
from AlgorithmImports import *
import numpy as np
from settings import settings


class RSIRebalanceStrategy(QCAlgorithm):

    def Initialize(self):
        self.name = "Smart QQQ - BND vs QQQ"
        self.set_start_date(2021, 6, 1)
        # self.set_end_date(2025, 6, 1)
        self.should_trade = True
        # https://www.interactivebrokers.ca/en/accounts/tradingPermissions.php?ib_entity=ca
        self.ibkr_market_order_buffer = 0.02
        self.ibkr_fee_buffer = 25

        self.set_brokerage_model(
            BrokerageName.INTERACTIVE_BROKERS_BROKERAGE, AccountType.CASH
        )

        self.symbols = [
            "SPLV", "TQQQ", "SPY", "QQQE", "VOOG", "VCR", "VTV",
            "VOOV", "VIXY", "BND", "QQQ", "QLD", "XLU", "PSQ", "TLT", "IYT", "QQQM",
            "KMLM", "IBB", "SOXX", "UVXY", "XLP", "LABU", "SOXL", "XME", "QID", "UVXY", "VIG", "SCHD",
            "IEF", "FDN", "SH", "GLD", "SQQQ", "XLK"
        ]

        self.live_symbols = {
            "QQQ": "QQQM"
        }

        self.equities = {}

        for name in self.symbols:
            self.equities[name] = self.add_equity_symbol(name)

        if not self.live_mode:
            self.set_warmup(timedelta(days=300))
            self.set_cash(settings.start_cash)

            self.schedule.on(
                self.date_rules.month_start(1),
                self.time_rules.after_market_close("SPY", 0),
                self.add_cash,
            )

        self.schedule.on(
            self.date_rules.every_day(),
            self.time_rules.before_market_close("SPY", 6),
            self.reset_daily_variables,
        )

        step = -0.25 if self.live_mode else -1
        stop = 0
        start = 2
        for offset in np.arange(start, stop, step):
            self.schedule.on(
                self.date_rules.every_day(),
                self.time_rules.before_market_close("SPY", offset),
                self.rebalance,
            )

        self.cashInvested = self.portfolio.cash_book["USD"].amount
        self.liquidated = False
        self.indicator_data = {
            "rsi": {},
            "sma": {},
            "price": {},
            "cum_ret": {}
        }
        self.condition_triggered = ""
        self.start_time = datetime.now()
        self.metrics_sent = False

    def rebalance(self):
        if self.time < self.start_date:
            return

        if not self.securities["SPY"].has_data:
            return

        set_holdings = self.set_holdings_2

        if (self.live_mode):
            self.get_target_allocation(False)

        self.target_allocation = self.get_target_allocation()

        # Execute target allocation
        if self.target_allocation:
            for symbol, portion in self.target_allocation.items():
                live_symbol = self.live_symbols.get(symbol, None)
                # Switch to live mode symbol or if live symbol has data
                if (live_symbol and self.securities[live_symbol].has_data) or (self.live_mode and live_symbol):
                    set_holdings(live_symbol, portion)
                else:
                    set_holdings(symbol, portion)
        try:
            self.send_live_metrics()
        except Exception as e:
            self.log(f"Error sending live metrics: {e}")

    def get_target_allocation(self, dni=True) -> dict:
        """
        Args: 
            dni (bool): dont_need_indicators. Don't need indicators, return
        """

        rsi = self.rsi_2
        sma = self.sma_2
        price = self.price
        result = None
        buffer = 0.07
        self.condition_triggered = ""
        cum_ret = self.cumulative_return

        sidewayMarket = cum_ret("QQQ", 60) <= -9
        
        if rsi("TQQQ", 10) > 82 - buffer:
            result = {"UVXY": 1}
            self.condition_triggered = f"TQQQ RSI(10)={rsi('TQQQ',10):.1f} >82"

        # ── overbought protection ─────────────────────────────────────────
        if (
            rsi("SPLV", 10) > 79 - buffer
            or rsi("QQQE", 10) > 79 - buffer
            or rsi("VTV",  10) > 79 - buffer
            or rsi("VOOG", 10) > 79 - buffer
            or rsi("VOOV", 10) > 79 - buffer
            or rsi("TQQQ", 10) > 80 - buffer
            or rsi("SPY",  10) > 79 - buffer
            or rsi("VCR",  10) > 80 - buffer
        ):
            result = {"VIXY": 1}
            self.condition_triggered = "overbought"

        if result is None:
            if rsi("XLP", 10) > 77:
                if rsi("SPY", 10) > 55:
                    result = {"VIXY": 1}
                    self.condition_triggered = f"XLP={rsi('XLP',10):.1f}>77 SPY={rsi('SPY',10):.1f}>55"
                else:
                    result = {"BND": 1}
                    self.condition_triggered = f"XLP={rsi('XLP',10):.1f}>77 SPY={rsi('SPY',10):.1f}≤55"

        if result is None:
            # ── oversold bounces ──────────────────────────────────────────────
            if rsi("TQQQ", 10) < 31:
                result = {"TQQQ": 1}
                self.condition_triggered = f"TQQQ RSI(10)={rsi('TQQQ',10):.1f} <31"
            elif rsi("LABU", 10) < 25:
                result = {"IBB": 1}
                self.condition_triggered = f"LABU RSI(10)={rsi('LABU',10):.1f} <25"
            elif rsi("SOXL", 10) < 31:
                result = {"SOXX": 1}
                self.condition_triggered = f"SOXL RSI(10)={rsi('SOXL',10):.1f} <31"

        if result is None:
            # ── bull/bear signal count ────────────────────────────────────────
            s1 = 1 if rsi("BND", 15) > rsi("QQQ", 15) else 0
            s2 = 1 if rsi("TLT", 10) > 50 else 0
            s3 = 1 if rsi("BND", 20) > rsi("SH",  60) else 0
            s4 = 1 if rsi("IEF", 10) > rsi("PSQ", 20) else 0
            s5 = 1 if rsi("XLK", 10) > rsi("KMLM", 10) else 0
            inner_bull_signals = s1 + s2 + s3 + s4 + s5
            sig = f"{inner_bull_signals}/5 [{s1}{s2}{s3}{s4}{s5}] [BND15>QQQ15 TLT>50 BND20>SH60 IEF>PSQ20 XLK>KMLM]"

            # ── trend-following ───────────────────────────────────────────────
            if price("TQQQ") > sma("TQQQ", 200):
                # Bull mode (above 200-day SMA)
                if rsi("TQQQ", 2) < 16:
                    result = {"QLD": 1}
                    self.condition_triggered = f"bull>200 TQQQ RSI(2)={rsi('TQQQ',2):.1f} <16"
                elif price("TQQQ") > sma("TQQQ", 20):
                    if inner_bull_signals == 5:
                        result = {"TQQQ": 1} if not sidewayMarket else {"QLD": 1}
                        self.condition_triggered = f"bull>200+20 sig={sig} == 5 | sideway={sidewayMarket}"
                    elif inner_bull_signals >= 2:
                        result = {"QLD": 1} if not sidewayMarket else {"QQQ": 1}    
                        self.condition_triggered = f"bull>200+20 sig={sig} >=2 | sideway={sidewayMarket}"
                    else:
                        if rsi("PSQ", 10) > rsi("IEF", 10):
                            if inner_bull_signals == 1: 
                                result = {"PSQ": 1}
                            else:
                                result = {"QID": 1}
                        else:
                            result = {"IEF": 1}
                        self.condition_triggered = f"bull>200+20 sig={sig} <2 PSQ10={rsi('PSQ',10):.1f} vs IEF10={rsi('IEF',10):.1f}"
                else:
                    result = {"GLD": 1}
                    self.condition_triggered = f"bull>200 <20SMA"

            elif price("TQQQ") > sma("TQQQ", 20):
                # Mild bear mode (below 200 but above 20-day SMA)
                if rsi("TQQQ", 2) < 16:
                    result = {"QLD": 1}
                    self.condition_triggered = f"bear<200 >20 QLD RSI(2)={rsi('TQQQ',2):.1f} <16"
                elif inner_bull_signals == 5:
                    result = {"TQQQ": 1} if not sidewayMarket else {"QLD": 1}
                    self.condition_triggered = f"bear<200 >20 sig={sig} == 5"
                elif inner_bull_signals >= 2:
                    result = {"QLD": 1} if not sidewayMarket else {"QQQ": 1}    
                    self.condition_triggered = f"bear<200 >20 sig={sig} >=2"
                else:
                    result = {"XLU": 1}
                    self.condition_triggered = f"bear<200 >20 sig={sig} <2"

            else:
                # Bear mode (below both SMAs)         
                if price("KMLM") > sma("KMLM", 20) or rsi("SQQQ", 10) < 32 or inner_bull_signals <= 2:
                    result = {"QID": 1}
                    self.condition_triggered = "very bear short"
                else:
                    result = {"XLU": 1}
                    self.condition_triggered = f"bear<200+20 sig={sig} >=2"

        return result if dni else None

    def reset_daily_variables(self):
        self.liquidated = False
        self.indicator_data = {
            "rsi": {},
            "sma": {},
            "price": {},
            "cum_ret": {}
        }
        self.metrics_sent = False

    def add_equity_symbol(self, symbol: str) -> Symbol:
        s = self.add_equity(
            symbol, Resolution.MINUTE, data_normalization_mode=DataNormalizationMode.ADJUSTED
        )
        s.set_settlement_model(ImmediateSettlementModel())
        s.set_fill_model(ImmediateFillModel())
        return s.Symbol

    def add_cash(self):
        dcaCash = settings.dca_cash
        # self.additional_cash += dcaCash
        # if self.rsi_2("TQQQ", 10) < 45:
        # self.portfolio.cash_book["USD"].add_amount(self.additional_cash)
        # self.additional_cash = 0
        self.cashInvested += dcaCash

    def price(self, symbol: str):
        if self.indicator_data["price"].get(symbol) is not None:
            return self.indicator_data["price"][symbol]

        p = self.securities[symbol].price
        self.indicator_data["price"][symbol] = p
        return p

    def rsi_2(self, symbol: str, period):
        if self.indicator_data["rsi"].get(symbol) is None:
            self.indicator_data["rsi"][symbol] = {}

        if self.indicator_data["rsi"][symbol].get(period) is None or self.indicator_data["rsi"][symbol].get(period) == 0:
            self.indicator_data["rsi"][symbol][period] = 0
        else:
            return self.indicator_data["rsi"][symbol][period]

        warmup = int(round_up(11 * math.sqrt(period) + 5.5 * period, 0))
        extension = min(warmup, 250)
        r_w = RollingWindow[float](extension)
        history = self.history(symbol, extension - 1, Resolution.DAILY)
        for historical_bar in history:
            r_w.add(historical_bar.close)
        while r_w.count < extension:
            current_price = self.securities[symbol].price
            if self.live_mode:
                self.log(f"{symbol}: {current_price}")
            r_w.add(current_price)
        if r_w.is_ready:
            average_gain = 0
            average_loss = 0
            gain = 0
            loss = 0
            for i in range(extension - 1, extension - period - 1, -1):
                gain += max(r_w[i - 1] - r_w[i], 0)
                loss += abs(min(r_w[i - 1] - r_w[i], 0))
            average_gain = gain / period
            average_loss = loss / period
            for i in range(extension - period - 1, 0, -1):
                average_gain = (
                    average_gain * (period - 1) + max(r_w[i - 1] - r_w[i], 0)
                ) / period
                average_loss = (
                    average_loss * (period - 1) +
                    abs(min(r_w[i - 1] - r_w[i], 0))
                ) / period
            if average_loss == 0:
                return 100
            else:
                rsi = round(100 - (100 / (1 + average_gain / average_loss)), 2)

                self.indicator_data["rsi"][symbol][period] = rsi

                return rsi
        else:
            return None

    def sma_2(self, symbol: str, period):
        if self.indicator_data["sma"].get(symbol) is None:
            self.indicator_data["sma"][symbol] = {}

        if self.indicator_data["sma"][symbol].get(period) is None or self.indicator_data["sma"][symbol].get(period) == 0:
            self.indicator_data["sma"][symbol][period] = 0
        else:
            return self.indicator_data["sma"][symbol][period]

        r_w = RollingWindow[float](period)
        history = self.history(symbol, period - 1, Resolution.DAILY)
        total = sum(bar.close for bar in history) + \
            self.securities[symbol].price

        sma = round(total / period, 3)
        self.indicator_data["sma"][symbol][period] = sma

        return sma

    def cumulative_return(self, symbol: str, period: int) -> float:
        """Cumulative Return over `period` daily bars.

        Returns (close_today / close_N_days_ago - 1) * 100
        """
        cache = self.indicator_data["cum_ret"]
        key = (symbol, period)
        if cache.get(key) is not None:
            return cache[key]

        history = self.history(symbol, period, Resolution.DAILY)
        bars = [bar.close for bar in history]
        if not bars:
            return None

        first_close = bars[0]
        current_price = self.securities[symbol].price
        if first_close == 0:
            return None

        cr = round((current_price / first_close - 1) * 100, 2)
        cache[key] = cr
        return cr

    def set_holdings_2(self, symbol: str, portion=1):
        """IBKR Brokage Model somehow doesn't wait till liquidation finishes in set_holdings(symbol, 1, True)
        So we liquidate explicitly first and set_holdings after
        """

        if (self.should_trade == False):
            self.notify.email(settings.notify_email_address,
                              f"Live data | {self.name}",
                              f"Boolean self.should_trade is {self.should_trade} so we don't trade today")
            return

        # liquidate any other symbols when switching
        if (
            not self.liquidated
            and not self.portfolio[symbol].invested
            and self.portfolio.invested
        ):
            # for curr_pos in current_positions:
            #     self.liquidate(curr_pos, tag="Liquidated")
            self.liquidate()
            self.liquidated = True
            return

        if (
            len(self.transactions.get_open_orders()) > 0
            or self.portfolio.unsettled_cash > 0
            or (
                self.liquidated
                and self.portfolio.total_portfolio_value != self.portfolio.cash
            )
            or not self.securities[symbol].has_data
        ):
            return

        # Usually portion < 1 is for risky asset like VIXY, no need to keep it's portion
        # as we don't hold it for long
        if portion < 1:
            if self.liquidated and not self.portfolio.invested:
                self.set_holdings(symbol, portion)
                return
            elif self.portfolio[symbol].invested:
                self.set_holdings(symbol, portion)
                return

        # Calculate for limit order
        # using 99% of buying power to avoid "Insufficient buying power to complete orders" error
        # no idea why QC's engine has weird initial margin stuff
        # TFSA doesn't have any initial margin requirements, so we can use 100% - 10 dollars
        buying_power = (
            self.portfolio.margin_remaining - 10 - self.ibkr_fee_buffer
            if self.live_mode
            else self.portfolio.margin_remaining * 0.988 - self.ibkr_fee_buffer
        )
        symbol_price = self.securities[symbol].ask_price

        # 3 cents buffer for limit order, IBKR will find lowest ask
        limit_price = round_up(symbol_price + 0.03)

        shares_num: int = math.floor(buying_power / limit_price)

        security = self.securities[symbol]
        initial_margin_params = InitialMarginParameters(
            security, shares_num - 1)
        initial_margin_required = (
            security.buying_power_model.get_initial_margin_requirement(
                initial_margin_params
            )
        )
        # Extract the numeric value from the InitialMargin object
        required_margin_value = initial_margin_required.value

        while (
            shares_num >= 2
            and required_margin_value > buying_power
        ):
            shares_num = shares_num - 1
            initial_margin_params = InitialMarginParameters(
                security, shares_num)
            initial_margin_required = (
                security.buying_power_model.get_initial_margin_requirement(
                    initial_margin_params
                )
            )
            required_margin_value = initial_margin_required.value

        # order_amount = shares_num * limit_price
        # satisfy_ibkr_market_order = (1 - (order_amount / cash)) > self.ibkr_market_order_buffer

        if (
            shares_num >= 2
            and required_margin_value < buying_power
        ):
            self.limit_order(symbol, shares_num, limit_price)
            return

    def send_live_metrics(self):
        if not self.live_mode or self.metrics_sent:
            return

        formatted_data = format_indicator_dict(
            self.indicator_data, self.target_allocation, self.condition_triggered)

        date_str = self.time.strftime("%Y-%m-%d %H:%M")
        self.notify.email(
            settings.notify_email_address,
            f"{self.name} | {date_str}",
            formatted_data,
        )

        self.metrics_sent = True

    def log_indicator_dict(self, data: dict):
        log = self.log
        log(f"{self.time} ----------------\n")
        for indicator, symbols in data.items():
            log(indicator.upper())
            if indicator == "price":
                for symbol, value in symbols.items():
                    log(f"{symbol}: {value}")
            else:
                for symbol, periods in symbols.items():
                    for period, value in periods.items():
                        log(f"{symbol} {period}-period: {value}")
            log("")  # blank line between indicator groups

    def on_end_of_algorithm(self) -> None:
        self.debug(f"Cash invested: {self.cashInvested}")
        time_elapsed = datetime.now() - self.start_time
        self.debug(f"Algo took: {time_elapsed.total_seconds()} seconds")

    def sort_tickers_by(self, tickers, eval_function, direction='ascending', period=10):
        """
        Sort tickers by a given evaluation function and return the top or bottom ticker.

        Args:
            tickers (list): List of ticker symbols (strings)
            eval_function (callable): Function that takes a ticker and returns a numeric value.
                                    Example: lambda ticker: get_rsi(ticker)
            direction (str): 'ascending' to get the ticker with the lowest value (bottom 1),
                            'descending' to get the ticker with the highest value (top 1)

        Returns:
            str: The ticker symbol that ranks either top 1 or bottom 1 based on direction

        Example:
            # Get ticker with highest RSI
            top_ticker = sort_tickers(['AAPL', 'MSFT', 'GOOGL'], lambda t: get_rsi(t), 'descending')

            # Get ticker with lowest price
            bottom_ticker = sort_tickers(['AAPL', 'MSFT', 'GOOGL'], lambda t: get_price(t), 'ascending')
        """
        if not tickers:
            return None

        # Evaluate all tickers with the provided function
        ticker_values = []
        for ticker in tickers:
            try:
                value = eval_function(ticker, period)
                if value is not None:  # Skip tickers where function returns None
                    ticker_values.append((ticker, value))
            except Exception as e:
                # Skip tickers that cause errors in evaluation
                continue

        if not ticker_values:
            return None

        # Sort based on direction
        if direction.lower() == 'ascending':
            # ascending: lowest value first (bottom 1)
            ticker_values.sort(key=lambda x: x[1])
            return ticker_values[0][0]
        elif direction.lower() == 'descending':
            # descending: highest value first (top 1)
            ticker_values.sort(key=lambda x: x[1], reverse=True)
            return ticker_values[0][0]
        else:
            raise ValueError(
                f"Invalid direction: {direction}. Use 'ascending' or 'descending'.")


def round_up(n, decimals=2):
    multiplier = 10**decimals
    return math.ceil(n * multiplier) / multiplier


def format_indicator_dict(data: dict, target_allocation: dict = None, condition: str = "") -> str:
    SEP = "-" * 50
    lines = []

    # ── Decision ─────────────────────────────────────────────────────────
    lines.append("DECISION")
    lines.append(SEP)
    if target_allocation:
        for symbol, portion in target_allocation.items():
            lines.append(f"  → {symbol}  ({int(portion * 100)}%)")
    else:
        lines.append("  (none)")
    if condition:
        lines.append(f"  Reason: {condition}")
    lines.append("")

    # ── Pivot: symbol → {price, rsi: {period: val}, sma: {period: val}} ─
    sym_data: dict = {}
    for symbol, value in data.get("price", {}).items():
        sym_data.setdefault(symbol, {})["price"] = value
    for symbol, periods in data.get("rsi", {}).items():
        sym_data.setdefault(symbol, {})["rsi"] = dict(sorted(periods.items()))
    for symbol, periods in data.get("sma", {}).items():
        sym_data.setdefault(symbol, {})["sma"] = dict(sorted(periods.items()))

    # ── Per-symbol rows ───────────────────────────────────────────────────
    lines.append("INDICATORS")
    lines.append(SEP)
    for sym in sorted(sym_data.keys()):
        ind = sym_data[sym]
        parts = []
        if "price" in ind:
            parts.append(f"Price={ind['price']:.2f}")
        for period, val in ind.get("rsi", {}).items():
            parts.append(f"RSI({period})={val:.1f}")
        for period, val in ind.get("sma", {}).items():
            parts.append(f"SMA({period})={val:.2f}")
        lines.append(f"  {sym:<6}  " + "   ".join(parts))

    return "\n".join(lines).strip()
# region imports
from AlgorithmImports import *
# endregion

class Settings():
    def __init__(self):
        self.start_cash = 10000
        self.dca_cash = 0
        self.notify_email_address = "tuenguyen12329@gmail.com"

settings = Settings()