Overall Statistics
Total Orders
2838
Average Win
2.86%
Average Loss
-2.09%
Compounding Annual Return
89.676%
Drawdown
41.800%
Expectancy
0.475
Start Equity
10000
End Equity
102135676.15
Net Profit
1021256.761%
Sharpe Ratio
1.967
Sortino Ratio
2.268
Probabilistic Sharpe Ratio
99.792%
Loss Rate
38%
Win Rate
62%
Profit-Loss Ratio
1.36
Alpha
0
Beta
0
Annual Standard Deviation
0.311
Annual Variance
0.097
Information Ratio
2.028
Tracking Error
0.311
Treynor Ratio
0
Total Fees
$1495530.72
Estimated Strategy Capacity
$1100000.00
Lowest Capacity Asset
VIXY UT076X30D0MD
Portfolio Turnover
38.21%
Drawdown Recovery
378
from datetime import timedelta, datetime
import math
from AlgorithmImports import *
import numpy as np
from settings import settings

class RSIRebalanceStrategy(QCAlgorithm):

    def Initialize(self):
        """Set up dates, brokerage model, subscriptions, schedules and state.

        Backtests (``live_mode`` false) additionally get a 300-day warm-up,
        the starting cash from ``settings`` and a monthly ``add_cash`` event.
        Live runs additionally subscribe to every substitute ticker listed in
        ``self.live_symbols`` so that ``rebalance`` can trade it.
        """
        self.name = "Smart QQQ - BND vs QQQ"
        self.set_start_date(2011, 1, 1)
        self.set_end_date(2025, 6, 1)
        self.should_trade = True
        # https://www.interactivebrokers.ca/en/accounts/tradingPermissions.php?ib_entity=ca
        self.ibkr_market_order_buffer = 0.02
        self.ibkr_fee_buffer = 25

        self.set_brokerage_model(
            BrokerageName.INTERACTIVE_BROKERS_BROKERAGE, AccountType.CASH
        )

        self.symbols = [
            "SPLV", "TQQQ", "SPY", "QQQE", "VOOG", "VCR", "VTV",
            "VOOV", "VIXY", "BND", "QQQ", "QLD", "XLU", "PSQ",
            "KMLM", "IBB", "SOXX", "UVXY", "XLP", "LABU", "SOXL", "XME"
        ]

        # Signal ticker -> ticker actually traded when running live.
        self.live_symbols = {
            "QQQ": "QQQM"
        }

        self.equities = {}

        for name in self.symbols:
            self.equities[name] = self.add_equity_symbol(name)

        if self.live_mode:
            # rebalance() passes the substitute ticker to set_holdings_2(),
            # which looks it up in self.portfolio / self.securities, so the
            # substitute has to be subscribed as well.
            for live_name in self.live_symbols.values():
                if live_name not in self.equities:
                    self.equities[live_name] = self.add_equity_symbol(live_name)
        else:
            self.set_warmup(timedelta(days=300))
            self.set_cash(settings.start_cash)

            self.schedule.on(
                self.date_rules.month_start(1),
                self.time_rules.after_market_close("SPY", 0),
                self.add_cash,
            )

        # Clear the per-day caches and flags 6 minutes before the close,
        # i.e. before the first rebalance attempt of the day.
        self.schedule.on(
            self.date_rules.every_day(),
            self.time_rules.before_market_close("SPY", 6),
            self.reset_daily_variables,
        )

        # Rebalance attempts, in minutes before the close:
        # backtest 3, 2, 1; live every quarter minute from 3 down to 0.25.
        step = -0.25 if self.live_mode else -1
        for offset in np.arange(3, 0, step):
            self.schedule.on(
                self.date_rules.every_day(),
                self.time_rules.before_market_close("SPY", offset),
                self.rebalance,
            )

        self.cashInvested = self.portfolio.cash_book["USD"].amount
        self.liquidated = False
        # Defined up front so allocate() / send_live_metrics() can read it
        # before the first rebalance has produced an allocation.
        self.target_allocation = {}
        self.indicator_data = {
            "rsi": {},
            "sma": {},
            "price": {}
        }
        self.start_time = datetime.now()
        self.metrics_sent = False

    def rebalance(self):
        if self.time < self.start_date:
            return

        if not self.securities["SPY"].has_data:
            return

        set_holdings = self.set_holdings_2

        self.target_allocation = self.get_target_allocation()

        # Execute target allocation
        if self.target_allocation:
            for symbol, portion in self.target_allocation.items():
                live_symbol = self.live_symbols.get(symbol, None)
                # Switch to live mode symbol
                if self.live_mode and live_symbol:
                    set_holdings(live_symbol, portion)
                else:                    
                    set_holdings(symbol, portion)

        self.send_live_metrics()

    def get_target_allocation(self) -> dict:
        rsi = self.rsi_2
        sma = self.sma_2
        equities = self.equities
        price = self.price

        if (
            rsi("SPLV", 10) > 79 
            or rsi("QQQE", 10) > 79
            or rsi("VTV", 10) > 79
            or rsi("VOOG", 10) > 79
            or rsi("VOOV", 10) > 79
            or rsi("TQQQ", 10) > 79
            or rsi("SPY", 10) > 78
            or rsi("VCR", 10) > 80
        ):
           return {"VIXY": 1}

        elif rsi("XLP", 10) > 77:
            if rsi("SPY", 10) > 55: 
                return {"VIXY": 1}
            else: 
                return {"BND": 1}

        elif rsi("TQQQ", 10) < 31:
            return {"TQQQ": 1}
        elif rsi("LABU", 10) < 25:
            return {"IBB": 1}
        elif rsi("SOXL", 10) < 31:
            return {"SOXX": 1}

        elif rsi("UVXY", 10) > 79:
            return {"QLD": 1}

        elif price("TQQQ") > sma("TQQQ", 200):

            if rsi("TQQQ", 2) < 20: 
                return {"QLD": 1}

            elif price("TQQQ") > sma("TQQQ", 20):
                if rsi("BND", 15) > rsi("QQQ", 15): 
                    return {"QLD": 1}
                else: 
                    return {"QQQ": 1}
            else:
                if rsi("BND", 15) > rsi("QQQ", 15): 
                    return {"QLD": 1}
                else: 
                    return {"XME": 1}
        # Bear mode            
        elif price("TQQQ") > sma("TQQQ", 20):

            if rsi("TQQQ", 2) < 21: 
                return {"QLD": 1}
            elif rsi("BND", 15) > rsi("QQQ", 15): 
                return {"QLD": 1}
            else: 
                return {"XLU": 1}

        elif price("KMLM") > sma("KMLM", 20):
            return {"PSQ": 1}
        else:
            return {"XLU": 1}

    def allocate(self, allocation: dict):
        if len(self.target_allocation) == 0:
            self.target_allocation = allocation

    def reset_daily_variables(self):
        self.liquidated = False
        self.indicator_data = {
            "rsi": {},
            "sma": {},
            "price": {}
        }
        self.metrics_sent = False

    def add_equity_symbol(self, symbol: str) -> Symbol:
        """Subscribe to ``symbol`` and return the resulting ``Symbol`` object.

        The equity is requested at minute resolution with adjusted prices and
        is given an immediate settlement model and an immediate fill model.
        """
        equity = self.add_equity(
            symbol,
            Resolution.MINUTE,
            data_normalization_mode=DataNormalizationMode.ADJUSTED,
        )
        equity.set_settlement_model(ImmediateSettlementModel())
        equity.set_fill_model(ImmediateFillModel())
        return equity.Symbol

    def add_cash(self):
        """Add the configured DCA amount to the invested-cash tally.

        Only ``self.cashInvested`` changes; the portfolio cash book is not
        modified here.
        """
        self.cashInvested += settings.dca_cash

    def price(self, symbol: str):
        if self.indicator_data["price"].get(symbol) is not None:
            return self.indicator_data["price"][symbol]
        
        p = self.securities[symbol].price
        self.indicator_data["price"][symbol] = p
        return p

    def rsi_2(self, symbol: str, period):
        """Return the ``period``-bar RSI of ``symbol`` from daily closes.

        The series is ``extension - 1`` daily history bars followed by the
        security's current price; if history is shorter, the current price is
        repeated until the window is full. The first ``period`` changes seed
        the average gain/loss, and every later change is blended in as
        ``(average * (period - 1) + change) / period``.

        Results are memoised in ``self.indicator_data["rsi"][symbol][period]``.
        A stored 0 is treated as "not computed yet", as before.

        Returns:
            The RSI rounded to 2 decimals, ``100`` when there were no losses,
            or ``None`` if the window is not ready.
        """
        cache = self.indicator_data["rsi"].setdefault(symbol, {})
        cached = cache.get(period)
        if cached is not None and cached != 0:
            return cached

        warmup = int(round_up(11 * math.sqrt(period) + 5.5 * period, 0))
        extension = min(warmup, 250)
        r_w = RollingWindow[float](extension)
        history = self.history(symbol, extension - 1, Resolution.DAILY)
        for historical_bar in history:
            r_w.add(historical_bar.close)
        # Top the window up with the current price (index 0 is the newest).
        while r_w.count < extension:
            current_price = self.securities[symbol].price
            if self.live_mode:
                self.log(f"{symbol}: {current_price}")
            r_w.add(current_price)
        if not r_w.is_ready:
            return None

        def split_change(i):
            # Change from bar i to the next newer bar, as (gain, loss).
            change = r_w[i - 1] - r_w[i]
            return max(change, 0), abs(min(change, 0))

        # Seed: plain average over the oldest `period` changes.
        gain = 0
        loss = 0
        for i in range(extension - 1, extension - period - 1, -1):
            up, down = split_change(i)
            gain += up
            loss += down
        average_gain = gain / period
        average_loss = loss / period

        # Smooth the remaining changes, oldest to newest.
        for i in range(extension - period - 1, 0, -1):
            up, down = split_change(i)
            average_gain = (average_gain * (period - 1) + up) / period
            average_loss = (average_loss * (period - 1) + down) / period

        if average_loss == 0:
            rsi = 100
        else:
            rsi = round(100 - (100 / (1 + average_gain / average_loss)), 2)

        # Cache every outcome, including the no-loss case, so the report
        # shows the value that was actually used and history is not re-read.
        cache[period] = rsi
        return rsi

    def sma_2(self, symbol: str, period):
        """Return the simple moving average of ``symbol`` over ``period`` points.

        The points are ``period - 1`` daily history closes plus the
        security's current price. If history returns fewer bars, the average
        is taken over the points actually available rather than divided by
        ``period``.

        Results are memoised in ``self.indicator_data["sma"][symbol][period]``.
        A stored 0 is treated as "not computed yet", as before.

        Returns:
            The average rounded to 3 decimals.
        """
        cache = self.indicator_data["sma"].setdefault(symbol, {})
        cached = cache.get(period)
        if cached is not None and cached != 0:
            return cached

        history = self.history(symbol, period - 1, Resolution.DAILY)
        closes = [bar.close for bar in history]
        closes.append(self.securities[symbol].price)

        sma = round(sum(closes) / len(closes), 3)
        cache[period] = sma

        return sma

    def set_holdings_2(self, symbol: str, portion=1):
        """Move the portfolio into ``symbol`` over one or more calls.

        The IBKR brokerage model somehow doesn't wait until liquidation
        finishes in ``set_holdings(symbol, 1, True)``, so we liquidate
        explicitly first and place the buy on a later call.

        Per call, in order:
          1. If trading is disabled, send a notification e-mail and stop.
          2. If other positions are held and ``symbol`` is not, liquidate
             everything, set ``self.liquidated`` and stop.
          3. Stop while orders are open, cash is unsettled, a liquidation is
             still in flight, or ``symbol`` has no data.
          4. For ``portion < 1``, delegate to ``set_holdings`` when the
             portfolio is flat after a liquidation or already holds ``symbol``.
          5. Otherwise size a limit order from the remaining buying power and
             submit it if at least 2 shares fit.
        """
        if not self.should_trade:
            self.notify.email(settings.notify_email_address, 
            f"Live data | {self.name}",
            f"Boolean self.should_trade is {self.should_trade} so we don't trade today")
            return

        # liquidate any other symbols when switching
        if (
            not self.liquidated
            and not self.portfolio[symbol].invested
            and self.portfolio.invested
        ):
            self.liquidate()
            self.liquidated = True
            return

        if (
            len(self.transactions.get_open_orders()) > 0
            or self.portfolio.unsettled_cash > 0
            or (
                self.liquidated
                and self.portfolio.total_portfolio_value != self.portfolio.cash
            )
            or not self.securities[symbol].has_data
        ):
            return

        # Usually portion < 1 is for risky asset like VIXY, no need to keep its portion
        # as we don't hold it for long
        if portion < 1 and (
            (self.liquidated and not self.portfolio.invested)
            or self.portfolio[symbol].invested
        ):
            self.set_holdings(symbol, portion)
            return

        # Calculate for limit order
        # using 98.8% of buying power in backtests to avoid the "Insufficient
        # buying power to complete orders" error
        # no idea why QC's engine has weird initial margin stuff
        # TFSA doesn't have any initial margin requirements, so live we can use 100% - 10 dollars
        buying_power = (
            self.portfolio.margin_remaining - 10
            if self.live_mode
            else self.portfolio.margin_remaining * 0.988
        )
        security = self.securities[symbol]

        # 3 cents buffer for limit order, IBKR will find lowest ask
        limit_price = round_up(security.ask_price + 0.03)

        shares_num: int = math.floor(buying_power / limit_price)

        def margin_for(quantity):
            # Numeric initial margin the buying power model asks for `quantity` shares.
            params = InitialMarginParameters(security, quantity)
            return security.buying_power_model.get_initial_margin_requirement(
                params
            ).value

        spendable = buying_power - self.ibkr_fee_buffer

        # NOTE(review): the first check is made for shares_num - 1 while the
        # order below is placed for shares_num; kept as-is, confirm intent.
        required_margin_value = margin_for(shares_num - 1)

        # Shrink the order until its margin fits under the fee-adjusted budget.
        while shares_num >= 2 and required_margin_value > spendable:
            shares_num = shares_num - 1
            required_margin_value = margin_for(shares_num)

        if shares_num >= 2 and required_margin_value < spendable:
            self.limit_order(symbol, shares_num, limit_price)
            return
    
    def send_live_metrics(self):
        if not self.live_mode or self.metrics_sent:
            return
        
        formatted_data = format_indicator_dict(self.indicator_data, self.target_allocation)
        
        # self.log_indicator_dict(self.indicator_data)
        self.notify.email(settings.notify_email_address, 
            f"Live data | {self.name}",
            formatted_data)
        
        self.metrics_sent = True

    def log_indicator_dict(self, data: dict):
        log = self.log
        log(f"{self.time} ----------------\n")
        for indicator, symbols in data.items():
            log(indicator.upper())
            if indicator == "price":
                for symbol, value in symbols.items():
                    log(f"{symbol}: {value}")
            else:
                for symbol, periods in symbols.items():
                    for period, value in periods.items():
                        log(f"{symbol} {period}-period: {value}")
            log("")  # blank line between indicator groups

    def on_end_of_algorithm(self) -> None:
        self.debug(f"Cash invested: {self.cashInvested}")
        time_elapsed =  datetime.now() - self.start_time
        self.debug(f"Algo took: {time_elapsed.total_seconds()} seconds")

    def sort_tickers_by(self, tickers, eval_function, direction='ascending', period=10):
        """
        Rank tickers by an evaluation function and return the best or worst one.

        Args:
            tickers (list): List of ticker symbols (strings)
            eval_function (callable): Called as ``eval_function(ticker, period)``;
                                    returns a numeric value, or None to skip the ticker.
                                    Example: self.rsi_2
            direction (str): 'ascending' to get the ticker with the lowest value,
                            'descending' to get the ticker with the highest value
                            (case-insensitive)
            period (int): Forwarded unchanged as the second argument of eval_function

        Returns:
            str: The selected ticker, or None when ``tickers`` is empty or no
                 ticker produced a value. On ties the earliest ticker wins.

        Raises:
            ValueError: If ``direction`` is neither 'ascending' nor 'descending'
                        (only checked once at least one ticker has a value).

        Example:
            # Get ticker with highest 10-period RSI
            top_ticker = self.sort_tickers_by(['TQQQ', 'SOXL'], self.rsi_2, 'descending')

            # Get ticker with lowest 20-period SMA
            bottom_ticker = self.sort_tickers_by(['TQQQ', 'SOXL'], self.sma_2, period=20)
        """
        if not tickers:
            return None

        scored = []
        for ticker in tickers:
            try:
                value = eval_function(ticker, period)
            except Exception:
                # Deliberate best-effort: a ticker whose evaluation fails is
                # skipped rather than aborting the whole ranking.
                continue
            if value is not None:  # Skip tickers where function returns None
                scored.append((ticker, value))

        if not scored:
            return None

        order = direction.lower()
        if order == 'ascending':
            select = min
        elif order == 'descending':
            select = max
        else:
            raise ValueError(f"Invalid direction: {direction}. Use 'ascending' or 'descending'.")

        # min/max return the first extreme element, the same winner a stable
        # sort would put first, without sorting the whole list.
        return select(scored, key=lambda pair: pair[1])[0]


def round_up(n, decimals=2):
    """Round ``n`` up (toward +infinity) to ``decimals`` decimal places.

    Binary floating point can leave the scaled value a hair above the
    intended number (``1.1 * 100 == 110.00000000000001``), which would make
    ``ceil`` overshoot by a whole step. The scaled value is therefore rounded
    to 9 decimal places before taking the ceiling.
    """
    multiplier = 10**decimals
    return math.ceil(round(n * multiplier, 9)) / multiplier

def format_indicator_dict(data: dict, target_allocation: dict = None) -> str:
    """Render cached indicator values, and optionally the allocation, as text.

    ``data`` maps an indicator name to its values: the ``"price"`` group maps
    symbol -> value, every other group maps symbol -> {period: value}. Each
    group is an upper-cased heading, its indented entries and a blank line.
    A non-empty ``target_allocation`` adds a ``TARGET ALLOCATION`` section.
    The result has leading and trailing whitespace stripped.
    """
    report = []
    for indicator, symbols in data.items():
        report.append(indicator.upper())
        if indicator == "price":
            report.extend(
                f"  {symbol}: {value}" for symbol, value in symbols.items()
            )
        else:
            report.extend(
                f"  {symbol} {period}-period: {value}"
                for symbol, periods in symbols.items()
                for period, value in periods.items()
            )
        report.append("")

    if target_allocation:
        report.append("TARGET ALLOCATION")
        report.extend(
            f"  {symbol}: {portion}"
            for symbol, portion in target_allocation.items()
        )

    return "\n".join(report).strip()
# region imports
from AlgorithmImports import *
# endregion

class Settings:
    """User-tunable values read by the strategy."""

    def __init__(self):
        # Starting cash for the run.
        self.start_cash = 10000
        # Amount counted as a contribution each time the DCA event fires.
        self.dca_cash = 0
        # Recipient of the notification e-mails.
        self.notify_email_address = "tuenguyen12329@gmail.com"


# Shared instance imported by the algorithm module.
settings = Settings()