| Overall Statistics |
|
Total Orders 74 Average Win 2.54% Average Loss -1.85% Compounding Annual Return 58.051% Drawdown 10.600% Expectancy 0.442 Start Equity 13000 End Equity 16304.15 Net Profit 25.417% Sharpe Ratio 1.695 Sortino Ratio 1.564 Probabilistic Sharpe Ratio 69.412% Loss Rate 39% Win Rate 61% Profit-Loss Ratio 1.38 Alpha 0 Beta 0 Annual Standard Deviation 0.205 Annual Variance 0.042 Information Ratio 1.964 Tracking Error 0.205 Treynor Ratio 0 Total Fees $86.50 Estimated Strategy Capacity $2100000.00 Lowest Capacity Asset VNQ T2FCD04TATET Portfolio Turnover 31.28% Drawdown Recovery 60 |
from datetime import timedelta, datetime
import math
from AlgorithmImports import *
import numpy as np
from settings import settings
class RSIRebalanceStrategy(QCAlgorithm):
def Initialize(self):
self.set_start_date(2025, 6, 1)
# self.set_end_date(2025, 1, 1)
# https://www.interactivebrokers.ca/en/accounts/tradingPermissions.php?ib_entity=ca
self.ibkr_market_order_buffer = 0.02
self.ibkr_fee_buffer = 25
self.set_brokerage_model(
BrokerageName.INTERACTIVE_BROKERS_BROKERAGE, AccountType.CASH
)
self.symbols = [ "QLD", "TQQQ", "SPY", "PSQ", "VIXY",
"IBB", "QQQE", "VTV", "VOOG", "VOOV", "XLP", "XLY", "LABU",
"BND", "UTSL", "SH", "SOXX", "SOXL", "VNQ", "UTSL", "XLU"
]
self.equities = {}
for name in self.symbols:
self.equities[name] = self.add_equity_symbol(name)
if not self.live_mode:
self.set_warmup(timedelta(days=250))
self.set_cash(settings.start_cash)
self.schedule.on(
self.date_rules.month_start(1),
self.time_rules.after_market_close("SPY", 0),
self.add_cash,
)
# self.schedule.on(
# self.date_rules.month_start(15),
# self.time_rules.after_market_close("SPY", 0),
# self.add_cash,
# )
self.schedule.on(
self.date_rules.every_day(),
self.time_rules.before_market_close("SPY", 6),
self.reset_daily_variables,
)
step = -0.25 if self.live_mode else -1
stop = 0 if self.live_mode else 0
start = 3
for offset in np.arange(start, stop, step):
self.schedule.on(
self.date_rules.every_day(),
self.time_rules.before_market_close("SPY", offset),
self.rebalance,
)
self.cashInvested = self.portfolio.cash_book["USD"].amount
self.liquidated = False
self.additional_cash = 0
self.prev_tqqq_rsi = 100
self.prev_tqqq_sma = 0
self.less_than_40 = 0
self.indicator_data = {
"rsi": {},
"sma": {}
}
self.start_time = datetime.now()
self.metrics_sent = False
def rebalance(self):
if self.time < self.start_date:
return
if not self.securities["SPY"].has_data:
return
# reset indicator data
if self.live_mode:
self.indicator_data = {
"rsi": {},
"sma": {}
}
rsi = self.rsi_2
sma = self.sma_2
set_holdings = self.set_holdings_2
equities = self.equities
price = self.price
if (rsi("TQQQ", 10) < 80):
self.portfolio.cash_book["USD"].add_amount(self.additional_cash)
self.additional_cash = 0
if (
rsi("QQQE", 10) > 79
or rsi("VTV", 10) > 79
or rsi("VOOG", 10) > 79
or rsi("VOOV", 10) > 79
or rsi("XLP", 10) > 75
or rsi("TQQQ", 10) > 79
or rsi("XLY", 10) > 80
or rsi("SPY", 10) > 80
):
set_holdings("VIXY", 1)
elif price("TQQQ") > sma("TQQQ", 200):
if price("TQQQ") > sma("TQQQ", 20):
set_holdings("QLD", 1)
else:
set_holdings("VNQ", 1)
else:
if price("TQQQ") > sma("TQQQ", 20):
set_holdings("QLD", 1)
else:
set_holdings("PSQ", 1)
self.prev_tqqq_rsi = rsi("TQQQ", 10)
self.prev_tqqq_sma = sma("TQQQ", 200)
self.send_live_metrics()
def reset_daily_variables(self):
self.liquidated = False
self.indicator_data = {
"rsi": {},
"sma": {}
}
self.metrics_sent = False
def add_equity_symbol(self, symbol: str) -> Symbol:
s = self.add_equity(
symbol, Resolution.MINUTE, data_normalization_mode=DataNormalizationMode.ADJUSTED
)
s.set_settlement_model(ImmediateSettlementModel())
s.set_fill_model(ImmediateFillModel())
return s.Symbol
def add_cash(self):
dcaCash = settings.dca_cash
self.additional_cash += dcaCash
# if self.rsi_2("TQQQ", 10) < 45:
# self.portfolio.cash_book["USD"].add_amount(self.additional_cash)
# self.additional_cash = 0
self.cashInvested += dcaCash
def price(self, symbol: str):
return self.securities[symbol].price
# def rsi_2(self, symbol: str, period):
# if self.indicator_data["rsi"].get(symbol) is None:
# self.indicator_data["rsi"][symbol] = {}
# if self.indicator_data["rsi"][symbol].get(period) is None or self.indicator_data["rsi"][symbol].get(period) == 0:
# self.indicator_data["rsi"][symbol][period] = 0
# else:
# return self.indicator_data["rsi"][symbol][period]
# extension = min(round(11 * period**0.5 + 5.5*period), 2000)
# r_w = RollingWindow[float](extension) #90
# change = RollingWindow[float](extension - 1) #89
# history = self.history(symbol, extension, Resolution.DAILY)
# for historical_bar in history:
# r_w.add(historical_bar.close)
# r_w.add(self.securities[symbol].price)
# for i in range(r_w.count - 2, -1, -1):
# change.add(round(r_w[i] - r_w[i+1], 10))
# if r_w.is_ready and change.is_ready:
# gain = 0
# loss = 0
# for i in range(change.count - 1, change.count - 1 - period, -1):
# gain += max(change[i], 0)
# loss += abs(min(change[i], 0))
# avgGainStartingPt = round(gain / period, 10)
# avgLossStartingPt = round(loss / period, 10)
# #Start avgGain and avgLoss
# avgGain = round(((period - 1) * avgGainStartingPt + max(change[change.count - 1 - period], 0)) / period, 10)
# avgLoss = round(((period - 1) * avgLossStartingPt + abs(min(change[change.count - 1 - period], 0))) / period, 10)
# for i in range(change.count - 1 - period - 1, -1, -1):
# avgGain = round(((period - 1) * avgGain + max(change[i], 0)) / period, 10)
# avgLoss = round(((period - 1) * avgLoss + abs(min(change[i], 0))) / period, 10)
# if avgLoss == 0:
# return 100
# else:
# rs = round(avgGain / avgLoss, 10)
# rsi = round(100 - 100/(1+rs), 5)
# self.indicator_data["rsi"][symbol][period] = rsi
# return rsi
# else:
# return 50 #None
def rsi_2(self, symbol: str, period):
if self.indicator_data["rsi"].get(symbol) is None:
self.indicator_data["rsi"][symbol] = {}
if self.indicator_data["rsi"][symbol].get(period) is None or self.indicator_data["rsi"][symbol].get(period) == 0:
self.indicator_data["rsi"][symbol][period] = 0
else:
return self.indicator_data["rsi"][symbol][period]
warmup = int(round_up(11 * math.sqrt(period) + 5.5 * period, 0))
extension = min(warmup, 250)
r_w = RollingWindow[float](extension)
history = self.history(symbol, extension - 1, Resolution.DAILY)
for historical_bar in history:
r_w.add(historical_bar.close)
while r_w.count < extension:
current_price = self.securities[symbol].price
r_w.add(current_price)
if r_w.is_ready:
average_gain = 0
average_loss = 0
gain = 0
loss = 0
for i in range(extension - 1, extension - period - 1, -1):
gain += max(r_w[i - 1] - r_w[i], 0)
loss += abs(min(r_w[i - 1] - r_w[i], 0))
average_gain = gain / period
average_loss = loss / period
for i in range(extension - period - 1, 0, -1):
average_gain = (
average_gain * (period - 1) + max(r_w[i - 1] - r_w[i], 0)
) / period
average_loss = (
average_loss * (period - 1) + abs(min(r_w[i - 1] - r_w[i], 0))
) / period
if average_loss == 0:
return 100
else:
rsi = round(100 - (100 / (1 + average_gain / average_loss)), 2)
self.indicator_data["rsi"][symbol][period] = rsi
return rsi
else:
return None
def sma_2(self, symbol: str, period):
if self.indicator_data["sma"].get(symbol) is None:
self.indicator_data["sma"][symbol] = {}
if self.indicator_data["sma"][symbol].get(period) is None or self.indicator_data["sma"][symbol].get(period) == 0:
self.indicator_data["sma"][symbol][period] = 0
else:
return self.indicator_data["sma"][symbol][period]
r_w = RollingWindow[float](period)
history = self.history(symbol, period - 1, Resolution.DAILY)
total = sum(bar.close for bar in history) + self.securities[symbol].price
sma = round(total / period, 3)
self.indicator_data["sma"][symbol][period] = sma
return sma
def set_holdings_2(self, symbol: str, portion=1):
"""IBKR Brokage Model somehow doesn't wait till liquidation finishes in set_holdings(symbol, 1, True)
So we liquidate explicitly first and set_holdings after
"""
# liquidate any other symbols when switching
if (
not self.liquidated
and not self.portfolio[symbol].invested
and self.portfolio.invested
):
# for curr_pos in current_positions:
# self.liquidate(curr_pos, tag="Liquidated")
self.liquidate()
self.liquidated = True
return
if (
len(self.transactions.get_open_orders()) > 0
or self.portfolio.unsettled_cash > 0
or (
self.liquidated
and self.portfolio.total_portfolio_value != self.portfolio.cash
)
or not self.securities[symbol].has_data
):
return
# Usually portion < 1 is for risky asset like VIXY, no need to keep it's portion
# as we don't hold it for long
if portion < 1:
if self.liquidated and not self.portfolio.invested:
self.set_holdings(symbol, portion)
return
elif self.portfolio[symbol].invested:
self.set_holdings(symbol, portion)
return
# Calculate for limit order
# using 99% of buying power to avoid "Insufficient buying power to complete orders" error
# no idea why QC's engine has weird initial margin stuff
# TFSA doesn't have any initial margin requirements, so we can use 100% - 10 dollars
buying_power = (
self.portfolio.margin_remaining - 10
if self.live_mode
else self.portfolio.margin_remaining * 0.988
)
symbol_price = self.securities[symbol].ask_price
# 5 cents buffer for limit order, IBKR will find lowest ask
limit_price = round_up(symbol_price + 0.03)
shares_num: int = math.floor(buying_power / limit_price)
security = self.securities[symbol]
initial_margin_params = InitialMarginParameters(security, shares_num - 1)
initial_margin_required = (
security.buying_power_model.get_initial_margin_requirement(
initial_margin_params
)
)
# Extract the numeric value from the InitialMargin object
required_margin_value = initial_margin_required.value
while (
shares_num >= 2
and required_margin_value > buying_power - self.ibkr_fee_buffer
):
shares_num = shares_num - 1
initial_margin_params = InitialMarginParameters(security, shares_num)
initial_margin_required = (
security.buying_power_model.get_initial_margin_requirement(
initial_margin_params
)
)
required_margin_value = initial_margin_required.value
# order_amount = shares_num * limit_price
# satisfy_ibkr_market_order = (1 - (order_amount / cash)) > self.ibkr_market_order_buffer
if (
shares_num >= 2
and required_margin_value < buying_power - self.ibkr_fee_buffer
):
self.limit_order(symbol, shares_num, limit_price)
return
def send_live_metrics(self):
if not self.live_mode or self.metrics_sent:
return
formatted_data = format_indicator_dict(self.indicator_data)
self.log_indicator_dict(self.indicator_data)
self.notify.email(settings.notify_email_address,
"Live data - Holy grail smart qqq",
formatted_data)
self.metrics_sent = True
def log_indicator_dict(self, data: dict):
log = self.log
log(f"{self.time} ----------------\n")
for indicator, symbols in data.items():
log(indicator.upper())
for symbol, periods in symbols.items():
for period, value in periods.items():
log(f"{symbol} {period}-period: {value}")
log("") # blank line between indicator groups
def on_end_of_algorithm(self) -> None:
self.debug(f"Cash invested: {self.cashInvested}")
time_elapsed = datetime.now() - self.start_time
self.debug(f"Algo took: {time_elapsed.total_seconds()} seconds")
def round_up(n, decimals=2):
multiplier = 10**decimals
return math.ceil(n * multiplier) / multiplier
def format_indicator_dict(data: dict) -> str:
lines = []
for indicator, symbols in data.items():
lines.append(indicator.upper())
for symbol, periods in symbols.items():
parts = []
for period, value in periods.items():
parts.append(f"{symbol} {period}-period: {value}")
lines.append(" | ".join(parts))
lines.append("") # blank line between indicators
return "\n".join(lines).strip()
# region imports
from AlgorithmImports import *
# endregion
class Settings():
def __init__(self):
self.start_cash = 13000
self.dca_cash = 0
self.notify_email_address = ""
settings = Settings()