Overall Statistics
Total Orders
868
Average Win
6.10%
Average Loss
-3.08%
Compounding Annual Return
495.558%
Drawdown
40.600%
Expectancy
0.855
Start Equity
2000
End Equity
6200600.82
Net Profit
309930.041%
Sharpe Ratio
5.173
Sortino Ratio
6.742
Probabilistic Sharpe Ratio
99.999%
Loss Rate
38%
Win Rate
62%
Profit-Loss Ratio
1.98
Alpha
0
Beta
0
Annual Standard Deviation
0.511
Annual Variance
0.261
Information Ratio
5.241
Tracking Error
0.511
Treynor Ratio
0
Total Fees
$59578.72
Estimated Strategy Capacity
$1500000.00
Lowest Capacity Asset
QID TK9BTYMV6Q1X
Portfolio Turnover
33.42%
Drawdown Recovery
72
from datetime import timedelta
import math
from AlgorithmImports import *
import time
import numpy as np


class ActualIbkrFeeModel(FeeModel):
    def get_order_fee(self, parameters: OrderFeeParameters) -> OrderFee:
        security = parameters.security
        order = parameters.order
        # Optional check if it's equity
        if security.type != SecurityType.EQUITY:
            return OrderFee(CashAmount(0, "USD"))

        quantity = abs(order.quantity)
        price = security.price

        return OrderFee(CashAmount(get_fee(quantity, price), "USD"))


def get_fee(quantity, price, buffer=0):
    # Commission: $0.006 per share
    commission = 0.006 * quantity
    # Minimum commission: $0.80
    min_fee = 0.80
    # Maximum commission: 0.4% of trade value
    max_fee = 0.004 * quantity * price

    # Final commission per order
    final_fee = max(min_fee, min(commission, max_fee))

    return final_fee + buffer


class KMLM_SIMPLE(QCAlgorithm):

    def Initialize(self):
        self.set_start_date(2021, 1, 1)
        # self.set_end_date(2022, 1, 1)
        # https://www.interactivebrokers.ca/en/accounts/tradingPermissions.php?ib_entity=ca
        # 5% ish for market orders
        self.ibkr_market_order_buffer = 0.02
        self.ibkr_fee_buffer = 25

        self.set_brokerage_model(
            BrokerageName.INTERACTIVE_BROKERS_BROKERAGE, AccountType.CASH
        )
        # self.fee_model = ActualIbkrFeeModel()
        self.SetSecurityInitializer(lambda s: s.SetLeverage(1))

        self.symbols = [
            "QQQ", "QLD", "TQQQ", "SPY", "GLD", "UVXY", "XLK", "KMLM", "QID", "QQQE",
            "VTV", "VOX", "TECL", "VOOG", "VOOV", "XLP", "XLY", "FAS", "SOXL", "SPXL",
            "LABU", "QQQM", "IGIB", "IYT", "ROM"
        ]

        self.equities = {}
        
        for name in self.symbols:
            self.equities[name] = self.add_equity_symbol(name)
        
        
        # self.live_mode_symbols: dict[str, Symbol] = {
        #     self.qqq.value: self.qqqm,
        # }

        # self.strat_position_symbols: list[Symbol] = [self.tqqq, self.qld, self.qqq, self.vixy, self.psq, self.gld]

        if not self.live_mode:
            self.set_warmup(timedelta(days=250))
            self.set_cash(2000)

            # self.schedule.on(self.date_rules.week_end(), self.time_rules.after_market_close(self.spy, 0), self.add_cash)
            self.schedule.on(
                self.date_rules.month_start(1),
                self.time_rules.after_market_close(self.equities["SPY"], 0),
                self.add_cash,
            )
            # self.schedule.on(
            #     self.date_rules.month_end(15),
            #     self.time_rules.after_market_close(self.spy, 0),
            #     self.add_cash,
            # )

        self.schedule.on(
            self.date_rules.every_day(),
            self.time_rules.before_market_close(self.equities["SPY"], 6),
            self.reset_liquidated,
        )

        # self.schedule.on(
        #     self.date_rules.year_start(1),
        #     self.time_rules.after_market_open(self.equities["SPY"], 1),
        #     self.save_start_equity,
        # )
        
        # self.schedule.on(
        #     self.date_rules.year_end(1),
        #     self.time_rules.after_market_open(self.equities["SPY"], 1),
        #     self.calculate_tax_and_liquidate,
        # )
        
        # self.schedule.on(
        #     self.date_rules.year_end(1),
        #     self.time_rules.after_market_open(self.equities["SPY"], 5),
        #     self.pay_tax,
        # )
        
        for offset in np.arange(2, 0, -1):
            self.schedule.on(
                self.date_rules.every_day(),
                self.time_rules.before_market_close(self.equities["SPY"], offset),
                self.rebalance,
            )

        self.cashInvested = self.portfolio.cash_book["USD"].amount
        self.liquidated = False
        self.stupid_order_log = ""
        self.tax = 0
        self.start_equity = 0
    # def on_data(self, slice: Slice) -> None:
    #     # Obtain the mapped TradeBar of the symbol if any
    #     # if not self.portfolio[self.spy].invested:
    #     #     self.set_holdings(self.spy, 1, True)
    #     if self.live_mode and self.securities[self.qqq].has_data:
    #         self.rebalance()

    def save_start_equity(self):
        self.start_equity = self.portfolio.total_portfolio_value
        self.tax = 0
        
    def calculate_tax_and_liquidate(self):
        capital_gain = self.portfolio.total_portfolio_value - self.start_equity
        if capital_gain < 100:
            return 
        self.debug(f"{self.time.year} capital gain {capital_gain}")
        tax = calculate_tax_only_on_capital_gain(100_000, capital_gain)
        
        symbols = [x.key for x in self.portfolio if x.value.invested]
        
        if len(symbols) == 0:
            return
        
        symbol = symbols[0]
        
        if(tax < self.portfolio.cash):
            shares_needed_to_sell = math.ceil((tax - self.portfolio.cash) / self.securities[symbol].ask_price)
            percentage_of_portfolio = ((shares_needed_to_sell * self.securities[symbol].ask_price) / self.portfolio.total_holdings_value)
            self.tax = tax
            self.set_holdings(symbol, percentage=(1 - percentage_of_portfolio))
        else:
            self.tax = tax

        
    def pay_tax(self):
        if self.tax == 0:
            return
        self.debug(f"{self.time.year} {self.tax} tax")
        self.portfolio.cash_book["USD"].add_amount(-self.tax)

    def reset_liquidated(self):
        self.liquidated = False

    def add_equity_symbol(self, symbol: str) -> Symbol:
        s = self.add_equity(
            symbol, Resolution.MINUTE, data_normalization_mode=DataNormalizationMode.RAW
        )
        s.set_settlement_model(ImmediateSettlementModel())
        # s.set_fee_model(self.fee_model)
        s.set_fill_model(ImmediateFillModel())
        return s.Symbol

    def add_cash(self):
        dcaCash = 150
        self.portfolio.cash_book["USD"].add_amount(dcaCash)
        self.cashInvested += dcaCash

    def check_and_get_live_symbol(self, symbol: Symbol):
        if (
            self.portfolio.total_portfolio_value < 20000
            and self.live_mode
            and symbol.value in self.live_mode_symbols
        ):
            return self.live_mode_symbols[symbol.value]
        return symbol

    def rsi_2(self, symbol, period):
        warmup = int(round_up(11 * math.sqrt(period) + 5.5 * period, 0))
        extension = min(warmup, 250)
        r_w = RollingWindow[float](extension)
        history = self.history(symbol.value, extension - 1, Resolution.DAILY)
        for historical_bar in history:
            r_w.add(historical_bar.close)
        while r_w.count < extension:
            current_price = self.securities[symbol.value].price
            r_w.add(current_price)
        if r_w.is_ready:
            average_gain = 0
            average_loss = 0
            gain = 0
            loss = 0
            for i in range(extension - 1, extension - period - 1, -1):
                gain += max(r_w[i - 1] - r_w[i], 0)
                loss += abs(min(r_w[i - 1] - r_w[i], 0))
            average_gain = gain / period
            average_loss = loss / period
            for i in range(extension - period - 1, 0, -1):
                average_gain = (
                    average_gain * (period - 1) + max(r_w[i - 1] - r_w[i], 0)
                ) / period
                average_loss = (
                    average_loss * (period - 1) + abs(min(r_w[i - 1] - r_w[i], 0))
                ) / period
            if average_loss == 0:
                return 100
            else:
                rsi = 100 - (100 / (1 + average_gain / average_loss))
                return rsi
        else:
            return None

    def sma_2(self, symbol: Symbol, period):
        r_w = RollingWindow[float](period)
        history = self.history(symbol.value, period - 1, Resolution.DAILY)
        for historical_bar in history:
            r_w.add(historical_bar.close)
        while r_w.count < period:
            current_price = self.securities[symbol.value].price
            r_w.add(current_price)
        if r_w.is_ready:
            sma = sum(r_w) / period
            return sma
        else:
            return 0

    # def get_current_strat_positions(self):
    #     """
    #     Returns:
    #         list[Symbol]: A list of stock symbols that this strategy buys and sells, not symbols for indicators.
    #     """
    #     current_positions = []
    #     for strat_symbol in self.strat_position_symbols:
    #         strat_symbol = self.check_and_get_live_symbol(strat_symbol)

    #         if self.portfolio[strat_symbol].invested:
    #             current_positions.append(strat_symbol)

    #     return current_positions

    def custom_log(self, message: str, only_live_mode: bool = True):
        if only_live_mode and self.live_mode:
            self.log(message)

    def set_holdings_2(self, symbol: Symbol, portion=1):
        """IBKR Brokage Model somehow doesn't wait till liquidation finishes in set_holdings(symbol, 1, True)
        So we liquidate explicitly first and set_holdings after
        """
        # symbol = self.check_and_get_live_symbol(symbol)

        # liquidate any other symbols when switching
        if (
            not self.liquidated
            and not self.portfolio[symbol].invested
            and self.portfolio.invested
        ):
            # for curr_pos in current_positions:
            #     self.liquidate(curr_pos, tag="Liquidated")
            self.liquidate()
            self.liquidated = True
            return

        if (
            len(self.transactions.get_open_orders()) > 0
            or self.portfolio.unsettled_cash > 0
            or (
                self.liquidated
                and self.portfolio.total_portfolio_value != self.portfolio.cash
            )
            or not self.securities[symbol].has_data
        ):
            self.custom_log(f"Skip order submissions")
            return

        # Usually portion < 1 is for risky asset like UVXY, no need to keep it's portion
        # as we don't hold it for long
        if portion < 1:
            if self.liquidated and not self.portfolio.invested:
                self.set_holdings(symbol, portion)
                return
            elif self.portfolio[symbol].invested:
                self.set_holdings(symbol, portion)
                return

        # Calculate for limit order
        # using 99% of buying power to avoid "Insufficient buying power to complete orders" error
        # no idea why QC's engine has weird initial margin stuff
        # TFSA doesn't have any initial margin requirements, so we can use 100% - 10 dollars
        buying_power = (
            self.portfolio.margin_remaining - 10
            if self.live_mode
            else self.portfolio.margin_remaining * 0.99
        )
        symbol_price = self.securities[symbol].ask_price

        # 5 cents buffer for limit order, IBKR will find lowest ask
        limit_price = round_up(symbol_price + 0.03)

        shares_num: int = math.floor(buying_power / limit_price)

        security = self.securities[symbol]
        initial_margin_params = InitialMarginParameters(security, shares_num)
        initial_margin_required = (
            security.buying_power_model.get_initial_margin_requirement(
                initial_margin_params
            )
        )
        # Extract the numeric value from the InitialMargin object
        required_margin_value = initial_margin_required.value

        while shares_num >= 2 and required_margin_value > buying_power - get_fee(
            shares_num, limit_price, 10
        ):
            shares_num = shares_num - 1
            initial_margin_params = InitialMarginParameters(security, shares_num)
            initial_margin_required = (
                security.buying_power_model.get_initial_margin_requirement(
                    initial_margin_params
                )
            )
            required_margin_value = initial_margin_required.value

        # order_amount = shares_num * limit_price
        # satisfy_ibkr_market_order = (1 - (order_amount / cash)) > self.ibkr_market_order_buffer

        if shares_num >= 2 and required_margin_value < buying_power - get_fee(
            shares_num, limit_price, 10
        ):
            self.stupid_order_log = f"Placing order for {symbol.value}, {shares_num} shares, {symbol_price} price, {required_margin_value} amount while buying power is {buying_power} and fee is {get_fee(shares_num, limit_price, 10)}"

            self.custom_log(
                f"Placing order for {symbol.value}, {shares_num} shares, {symbol_price} price, {required_margin_value} amount"
            )
            self.limit_order(symbol, shares_num, limit_price)
            return

    def rebalance(self):
        if self.time < self.start_date:
            return
        if not self.securities["SPY"].has_data:
            return
        
        rsi = self.rsi_2
        equities = self.equities

        if (
            rsi(equities["QQQ"], 10) > 79 or
            rsi(equities["VTV"], 10) > 79 or
            rsi(equities["VOX"], 10) > 79 or
            rsi(equities["TECL"], 10) > 79 or
            rsi(equities["VOOG"], 10) > 79 or
            rsi(equities["VOOV"], 10) > 79 or
            rsi(equities["XLP"], 10) > 75 or
            rsi(equities["XLY"], 10) > 80 or
            rsi(equities["FAS"], 10) > 80 or
            rsi(equities["SPY"], 10) > 80
        ):
            self.set_holdings_2(equities["UVXY"], 0.4)

        elif rsi(equities["TQQQ"], 10) < 31:
            self.set_holdings_2(equities["TQQQ"], 1)

        elif rsi(equities["SOXL"], 10) < 31:
            self.set_holdings_2(equities["SOXL"], 1)

        elif rsi(equities["SPXL"], 10) < 31:
            self.set_holdings_2(equities["SPXL"], 1)

        elif rsi(equities["LABU"], 10) < 25:
            self.set_holdings_2(equities["LABU"], 1)

        else:
            xlk_rsi_10 = rsi(equities["XLK"], 10)
            kmlm_rsi_10 = rsi(equities["KMLM"], 10)

            if xlk_rsi_10 is None or kmlm_rsi_10 is None:
                return  # Not enough data

            if xlk_rsi_10 > kmlm_rsi_10:
                if rsi(equities["IYT"], 10) > rsi(equities["SPY"], 10):
                    self.set_holdings_2(equities["TQQQ"], 1)
                elif rsi(equities["IGIB"], 10) > xlk_rsi_10:
                    self.set_holdings_2(equities["TQQQ"], 1)
                else:
                    self.set_holdings_2(equities["TQQQ"], 1)
            else:
                kmlm_price = self.securities[equities["KMLM"]].price
                kmlm_sma_20 = self.sma_2(equities["KMLM"], 20)

                if kmlm_sma_20 is None or kmlm_sma_20 == 0:
                    return  # Not enough data

                if kmlm_price > kmlm_sma_20:
                    self.set_holdings_2(equities["QID"], 1)  # PSQ (short QQQ)
                else:
                    self.set_holdings_2(equities["GLD"], 1)


    def on_end_of_algorithm(self) -> None:
        self.debug(f"Cash invested: {self.cashInvested}")

    def on_order_event(self, order_event: OrderEvent) -> None:
        # order = self.transactions.get_order_by_id(order_event.order_id)
        if order_event.status == OrderStatus.INVALID:
            # amount = order.quantity * order.limit_price
            self.debug(self.stupid_order_log)


def round_up(n, decimals=2):
    multiplier = 10**decimals
    return math.ceil(n * multiplier) / multiplier

def calculate_tax_only_on_capital_gain(employment_income, capital_gain):
    # 50% of capital gains are taxable
    taxable_capital_gain = 0.5 * capital_gain
    total_income = employment_income + taxable_capital_gain

    # Federal tax brackets (2025)
    fed_brackets = [0, 55198, 110396, 177000, 240000]
    fed_rates = [0.15, 0.205, 0.26, 0.29, 0.33]

    # Ontario tax brackets (2025 approx)
    on_brackets = [0, 49231, 98463, 150000, 220000]
    on_rates = [0.0505, 0.0915, 0.1116, 0.1216, 0.1316]

    def bracket_tax(income, brackets, rates):
        tax = 0
        for i in range(1, len(brackets)):
            lower = brackets[i-1]
            upper = brackets[i]
            if income > lower:
                tax += (min(income, upper) - lower) * rates[i-1]
        if income > brackets[-1]:
            tax += (income - brackets[-1]) * rates[-1]
        return tax

    # Tax with capital gain
    tax_with_gain_fed = bracket_tax(total_income, fed_brackets, fed_rates)
    tax_with_gain_prov = bracket_tax(total_income, on_brackets, on_rates)

    # Tax without capital gain
    tax_without_gain_fed = bracket_tax(employment_income, fed_brackets, fed_rates)
    tax_without_gain_prov = bracket_tax(employment_income, on_brackets, on_rates)

    # Difference is tax due to capital gain
    tax_on_capital_gain = (tax_with_gain_fed + tax_with_gain_prov) - (tax_without_gain_fed + tax_without_gain_prov)

    return round(tax_on_capital_gain, 2)