| Overall Statistics |
|
Total Orders 2838 Average Win 2.86% Average Loss -2.09% Compounding Annual Return 89.676% Drawdown 41.800% Expectancy 0.475 Start Equity 10000 End Equity 102135676.15 Net Profit 1021256.761% Sharpe Ratio 1.967 Sortino Ratio 2.268 Probabilistic Sharpe Ratio 99.792% Loss Rate 38% Win Rate 62% Profit-Loss Ratio 1.36 Alpha 0 Beta 0 Annual Standard Deviation 0.311 Annual Variance 0.097 Information Ratio 2.028 Tracking Error 0.311 Treynor Ratio 0 Total Fees $1495530.72 Estimated Strategy Capacity $1100000.00 Lowest Capacity Asset VIXY UT076X30D0MD Portfolio Turnover 38.21% Drawdown Recovery 378 |
from datetime import timedelta, datetime
import math
from AlgorithmImports import *
import numpy as np
from settings import settings
class RSIRebalanceStrategy(QCAlgorithm):
def Initialize(self):
    """Configure dates, brokerage, subscriptions, schedules and per-run state.

    Subscribes every ticker in ``self.symbols`` and, in live mode, every
    substitute ticker listed in ``self.live_symbols``. Registers three kinds of
    scheduled events: the monthly ``add_cash`` bookkeeping, the daily
    ``reset_daily_variables`` call, and a series of ``rebalance`` calls shortly
    before the SPY close.
    """
    self.name = "Smart QQQ - BND vs QQQ"
    self.set_start_date(2011, 1, 1)
    self.set_end_date(2025, 6, 1)
    self.should_trade = True

    # https://www.interactivebrokers.ca/en/accounts/tradingPermissions.php?ib_entity=ca
    self.ibkr_market_order_buffer = 0.02
    self.ibkr_fee_buffer = 25
    self.set_brokerage_model(
        BrokerageName.INTERACTIVE_BROKERS_BROKERAGE, AccountType.CASH
    )

    self.symbols = [
        "SPLV", "TQQQ", "SPY", "QQQE", "VOOG", "VCR", "VTV",
        "VOOV", "VIXY", "BND", "QQQ", "QLD", "XLU", "PSQ",
        "KMLM", "IBB", "SOXX", "UVXY", "XLP", "LABU", "SOXL", "XME",
    ]
    # Signal ticker -> ticker actually traded when running live (see rebalance).
    self.live_symbols = {
        "QQQ": "QQQM",
    }

    self.equities = {}
    for name in self.symbols:
        self.equities[name] = self.add_equity_symbol(name)
    if self.live_mode:
        # rebalance() hands the substitute ticker to set_holdings_2(), which
        # indexes self.securities / self.portfolio with it, so the substitute
        # must be subscribed as well. Backtests never trade it, so they are
        # left untouched.
        for live_name in self.live_symbols.values():
            if live_name not in self.equities:
                self.equities[live_name] = self.add_equity_symbol(live_name)

    if not self.live_mode:
        self.set_warmup(timedelta(days=300))
    self.set_cash(settings.start_cash)

    self.schedule.on(
        self.date_rules.month_start(1),
        self.time_rules.after_market_close("SPY", 0),
        self.add_cash,
    )
    self.schedule.on(
        self.date_rules.every_day(),
        self.time_rules.before_market_close("SPY", 6),
        self.reset_daily_variables,
    )

    # Rebalance offsets passed to before_market_close: 3, 2, 1 in backtests;
    # 3 down to 0.25 in steps of 0.25 when live (the stop value 0 is excluded).
    step = -0.25 if self.live_mode else -1
    stop = 0
    start = 3
    for offset in np.arange(start, stop, step):
        self.schedule.on(
            self.date_rules.every_day(),
            self.time_rules.before_market_close("SPY", offset),
            self.rebalance,
        )

    self.cashInvested = self.portfolio.cash_book["USD"].amount
    self.liquidated = False
    # Defined up front so allocate() can be called before the first rebalance
    # without raising AttributeError.
    self.target_allocation = {}
    # Per-day memo of indicator values; cleared by reset_daily_variables().
    self.indicator_data = {
        "rsi": {},
        "sma": {},
        "price": {},
    }
    self.start_time = datetime.now()
    self.metrics_sent = False
def rebalance(self):
    """Recompute the target allocation and move the portfolio towards it.

    Does nothing while ``self.time`` is earlier than ``self.start_date`` or
    while SPY has no data. In live mode a ticker that has an entry in
    ``self.live_symbols`` is traded through its substitute ticker instead.
    """
    too_early = self.time < self.start_date
    if too_early or not self.securities["SPY"].has_data:
        return

    place = self.set_holdings_2
    allocation = self.get_target_allocation()
    self.target_allocation = allocation

    # Execute target allocation
    if allocation:
        for ticker, weight in allocation.items():
            substitute = self.live_symbols.get(ticker, None)
            # Switch to the live-mode ticker only when one is configured.
            use_substitute = self.live_mode and substitute
            place(substitute if use_substitute else ticker, weight)

    self.send_live_metrics()
def get_target_allocation(self) -> dict:
    """Walk the RSI/SMA decision tree and return the target allocation.

    Checks are evaluated top to bottom and the first match wins, so the order
    below is part of the strategy. Indicator values come from ``self.rsi_2``,
    ``self.sma_2`` and ``self.price``.

    Returns:
        dict: A single-entry mapping ``{ticker: 1}``.
    """
    rsi = self.rsi_2
    sma = self.sma_2
    price = self.price

    # 10-period RSI ceilings; any() stops at the first breach, in this order.
    overbought_limits = (
        ("SPLV", 79),
        ("QQQE", 79),
        ("VTV", 79),
        ("VOOG", 79),
        ("VOOV", 79),
        ("TQQQ", 79),
        ("SPY", 78),
        ("VCR", 80),
    )
    if any(rsi(ticker, 10) > limit for ticker, limit in overbought_limits):
        return {"VIXY": 1}

    if rsi("XLP", 10) > 77:
        return {"VIXY": 1} if rsi("SPY", 10) > 55 else {"BND": 1}

    # Low-RSI entries and the high-UVXY-RSI entry.
    if rsi("TQQQ", 10) < 31:
        return {"TQQQ": 1}
    if rsi("LABU", 10) < 25:
        return {"IBB": 1}
    if rsi("SOXL", 10) < 31:
        return {"SOXX": 1}
    if rsi("UVXY", 10) > 79:
        return {"QLD": 1}

    if price("TQQQ") > sma("TQQQ", 200):
        # TQQQ above its 200-period SMA.
        if rsi("TQQQ", 2) < 20:
            return {"QLD": 1}
        above_short_trend = price("TQQQ") > sma("TQQQ", 20)
        if rsi("BND", 15) > rsi("QQQ", 15):
            return {"QLD": 1}
        return {"QQQ": 1} if above_short_trend else {"XME": 1}

    # Bear mode
    if price("TQQQ") > sma("TQQQ", 20):
        if rsi("TQQQ", 2) < 21 or rsi("BND", 15) > rsi("QQQ", 15):
            return {"QLD": 1}
        return {"XLU": 1}
    if price("KMLM") > sma("KMLM", 20):
        return {"PSQ": 1}
    return {"XLU": 1}
def allocate(self, allocation: dict):
    """Store ``allocation`` as the target, unless a target is already set.

    Args:
        allocation (dict): Mapping of ticker to portfolio portion.
    """
    already_allocated = len(self.target_allocation) != 0
    if already_allocated:
        return
    self.target_allocation = allocation
def reset_daily_variables(self):
    """Clear the per-day flags and drop all cached indicator values."""
    self.liquidated = False
    self.metrics_sent = False
    # A fresh, separate dict per indicator family so the caches never alias.
    self.indicator_data = {family: {} for family in ("rsi", "sma", "price")}
def add_equity_symbol(self, symbol: str) -> Symbol:
    """Subscribe to ``symbol`` at minute resolution and return its Symbol.

    The security is requested with adjusted data normalization and is given
    an ``ImmediateSettlementModel`` and an ``ImmediateFillModel``.

    Args:
        symbol (str): Ticker to subscribe to.

    Returns:
        Symbol: The ``Symbol`` attribute of the security that was added.
    """
    security = self.add_equity(
        symbol,
        Resolution.MINUTE,
        data_normalization_mode=DataNormalizationMode.ADJUSTED,
    )
    security.set_settlement_model(ImmediateSettlementModel())
    security.set_fill_model(ImmediateFillModel())
    return security.Symbol
def add_cash(self):
    """Add ``settings.dca_cash`` to the running ``cashInvested`` total.

    Only the bookkeeping counter is updated here; this method does not
    deposit anything into the portfolio's cash book.
    """
    self.cashInvested = self.cashInvested + settings.dca_cash
def price(self, symbol: str):
    """Return the price of ``symbol``, memoised in ``indicator_data["price"]``.

    Args:
        symbol (str): Ticker to look up in ``self.securities``.

    Returns:
        The cached price if one is stored, otherwise the security's current
        ``price`` (which is then stored for later calls).
    """
    cache = self.indicator_data["price"]
    value = cache.get(symbol)
    if value is None:
        value = self.securities[symbol].price
        cache[symbol] = value
    return value
def rsi_2(self, symbol: str, period):
    """Return the RSI of ``symbol`` over ``period``, memoised per day.

    The value is computed from daily history closes plus the security's
    current price. The first ``period`` price changes seed simple average
    gain/loss figures; every later change is folded in with
    ``(previous * (period - 1) + change) / period``.

    Args:
        symbol (str): Ticker to evaluate.
        period: RSI look-back length.

    Returns:
        The RSI rounded to 2 decimals, ``100`` when the average loss is zero,
        or ``None`` if the rolling window is not ready.
    """
    rsi_cache = self.indicator_data["rsi"]
    if rsi_cache.get(symbol) is None:
        rsi_cache[symbol] = {}
    by_period = rsi_cache[symbol]

    cached = by_period.get(period)
    if cached is not None and cached != 0:
        return cached
    # 0 doubles as the "not computed yet" marker, as in the original code.
    by_period[period] = 0

    # Number of samples used for the smoothing, capped at 250.
    warmup = int(round_up(11 * math.sqrt(period) + 5.5 * period, 0))
    extension = min(warmup, 250)

    window = RollingWindow[float](extension)
    for historical_bar in self.history(symbol, extension - 1, Resolution.DAILY):
        window.add(historical_bar.close)
    # Fill the remaining slot(s) with the current price; index 0 is the most
    # recently added sample.
    while window.count < extension:
        current_price = self.securities[symbol].price
        if self.live_mode:
            self.log(f"{symbol}: {current_price}")
        window.add(current_price)

    if not window.is_ready:
        return None

    # Seed: plain averages over the oldest `period` changes.
    gain = 0
    loss = 0
    for i in range(extension - 1, extension - period - 1, -1):
        change = window[i - 1] - window[i]
        gain += max(change, 0)
        loss += abs(min(change, 0))
    average_gain = gain / period
    average_loss = loss / period

    # Smooth the remaining changes, oldest to newest.
    for i in range(extension - period - 1, 0, -1):
        change = window[i - 1] - window[i]
        average_gain = (average_gain * (period - 1) + max(change, 0)) / period
        average_loss = (average_loss * (period - 1) + abs(min(change, 0))) / period

    if average_loss == 0:
        rsi = 100
    else:
        rsi = round(100 - (100 / (1 + average_gain / average_loss)), 2)
    # Cache the saturated value too; previously it was returned uncached, so
    # the history request and smoothing were repeated on every call.
    by_period[period] = rsi
    return rsi
def sma_2(self, symbol: str, period):
    """Return the simple moving average of ``symbol``, memoised per day.

    The average covers ``period - 1`` daily history closes plus the
    security's current price.

    Args:
        symbol (str): Ticker to evaluate.
        period: Number of samples in the average.

    Returns:
        The average rounded to 3 decimals.
    """
    sma_cache = self.indicator_data["sma"]
    if sma_cache.get(symbol) is None:
        sma_cache[symbol] = {}
    by_period = sma_cache[symbol]

    cached = by_period.get(period)
    if cached is not None and cached != 0:
        return cached
    # 0 doubles as the "not computed yet" marker, as in the original code.
    by_period[period] = 0

    history = self.history(symbol, period - 1, Resolution.DAILY)
    total = sum(bar.close for bar in history) + self.securities[symbol].price
    # NOTE(review): the divisor is always `period`, so the result is understated
    # if the history request yields fewer than period - 1 bars -- confirm
    # whether that can happen for recently listed tickers.
    sma = round(total / period, 3)
    by_period[period] = sma
    return sma
def set_holdings_2(self, symbol: str, portion=1):
    """Move the portfolio into ``symbol`` in stages across repeated calls.

    IBKR Brokerage Model somehow doesn't wait till liquidation finishes in
    set_holdings(symbol, 1, True), so we liquidate explicitly first and place
    the buy on a later call.

    Stages, one per call:
        1. If trading is disabled, send a notification email and stop.
        2. If invested in something other than ``symbol`` and nothing has been
           liquidated yet today, liquidate everything and stop.
        3. Wait while orders are open, cash is unsettled, the liquidation has
           not fully turned into cash, or ``symbol`` has no data.
        4. For ``portion < 1`` delegate to ``set_holdings``; otherwise size a
           limit order from the remaining margin and submit it.

    Args:
        symbol (str): Ticker to hold.
        portion: Target portfolio fraction; values below 1 take the
            ``set_holdings`` path.
    """
    if not self.should_trade:
        self.notify.email(settings.notify_email_address,
                          f"Live data | {self.name}",
                          f"Boolean self.should_trade is {self.should_trade} so we don't trade today")
        return

    # liquidate any other symbols when switching
    if (
        not self.liquidated
        and not self.portfolio[symbol].invested
        and self.portfolio.invested
    ):
        self.liquidate()
        self.liquidated = True
        return

    if (
        len(self.transactions.get_open_orders()) > 0
        or self.portfolio.unsettled_cash > 0
        or (
            self.liquidated
            and self.portfolio.total_portfolio_value != self.portfolio.cash
        )
        or not self.securities[symbol].has_data
    ):
        return

    # Usually portion < 1 is for a risky asset like VIXY; no need to maintain
    # its portion as we don't hold it for long.
    if portion < 1:
        if self.liquidated and not self.portfolio.invested:
            self.set_holdings(symbol, portion)
            return
        elif self.portfolio[symbol].invested:
            self.set_holdings(symbol, portion)
            return

    # Calculate for limit order.
    # Backtests use 98.8% of the remaining margin to avoid the "Insufficient
    # buying power to complete orders" error; live trading keeps 10 dollars
    # back instead.
    buying_power = (
        self.portfolio.margin_remaining - 10
        if self.live_mode
        else self.portfolio.margin_remaining * 0.988
    )
    security = self.securities[symbol]
    # 3 cents buffer for limit order, IBKR will find lowest ask
    limit_price = round_up(security.ask_price + 0.03)
    shares_num: int = math.floor(buying_power / limit_price)
    if shares_num < 2:
        # Orders below 2 shares were never submitted; skip the margin query.
        return

    spendable = buying_power - self.ibkr_fee_buffer

    def margin_for(quantity):
        """Return the initial margin value required for ``quantity`` shares."""
        params = InitialMarginParameters(security, quantity)
        requirement = security.buying_power_model.get_initial_margin_requirement(params)
        # Extract the numeric value from the InitialMargin object
        return requirement.value

    # Probe the quantity that will actually be ordered. The previous code
    # probed shares_num - 1 first and could then order shares_num unchecked.
    required_margin_value = margin_for(shares_num)
    while shares_num >= 2 and required_margin_value > spendable:
        shares_num -= 1
        required_margin_value = margin_for(shares_num)

    if shares_num >= 2 and required_margin_value < spendable:
        self.limit_order(symbol, shares_num, limit_price)
def send_live_metrics(self):
    """Email the cached indicator values and target allocation once per day.

    Only runs in live mode, and only while ``metrics_sent`` is still False;
    the flag is set after the email call.
    """
    if self.live_mode and not self.metrics_sent:
        body = format_indicator_dict(self.indicator_data, self.target_allocation)
        self.notify.email(
            settings.notify_email_address,
            f"Live data | {self.name}",
            body,
        )
        self.metrics_sent = True
def log_indicator_dict(self, data: dict):
    """Write the indicator cache to the algorithm log, one value per line.

    Args:
        data (dict): Mapping of indicator name to per-symbol values. The
            "price" entry maps symbol to value; every other entry maps
            symbol to a ``{period: value}`` dict.
    """
    emit = self.log
    emit(f"{self.time} ----------------\n")
    for family, per_symbol in data.items():
        emit(family.upper())
        for ticker, entry in per_symbol.items():
            if family == "price":
                emit(f"{ticker}: {entry}")
                continue
            for length, reading in entry.items():
                emit(f"{ticker} {length}-period: {reading}")
        emit("")  # blank line between indicator groups
def on_end_of_algorithm(self) -> None:
    """Report the tracked cash contributions and the wall-clock run time."""
    self.debug(f"Cash invested: {self.cashInvested}")
    elapsed_seconds = (datetime.now() - self.start_time).total_seconds()
    self.debug(f"Algo took: {elapsed_seconds} seconds")
def sort_tickers_by(self, tickers, eval_function, direction='ascending', period=10):
    """
    Rank tickers by an evaluation function and return the top or bottom ticker.

    Args:
        tickers (list): List of ticker symbols (strings)
        eval_function (callable): Called as ``eval_function(ticker, period)``;
            returns a numeric value, or None to exclude the ticker.
            Example: ``self.rsi_2``
        direction (str): 'ascending' to get the ticker with the lowest value,
            'descending' to get the ticker with the highest value
            (case-insensitive)
        period (int): Look-back length forwarded to ``eval_function``

    Returns:
        str: The selected ticker, or None when ``tickers`` is empty or no
        ticker produced a value. On ties the earliest ticker in ``tickers``
        wins.

    Raises:
        ValueError: If ``direction`` is neither 'ascending' nor 'descending'
            (only checked once at least one ticker has a value).

    Example:
        # Get ticker with highest 10-period RSI
        top_ticker = self.sort_tickers_by(['AAPL', 'MSFT', 'GOOGL'], self.rsi_2, 'descending')
        # Get ticker with lowest 20-period SMA
        bottom_ticker = self.sort_tickers_by(['AAPL', 'MSFT', 'GOOGL'], self.sma_2, 'ascending', period=20)
    """
    if not tickers:
        return None

    scored = []
    for ticker in tickers:
        try:
            value = eval_function(ticker, period)
        except Exception:
            # Deliberate best-effort: a ticker whose evaluation fails is
            # skipped rather than aborting the whole ranking.
            continue
        if value is not None:  # Skip tickers where function returns None
            scored.append((ticker, value))

    if not scored:
        return None

    order = direction.lower()
    if order == 'ascending':
        choose = min
    elif order == 'descending':
        choose = max
    else:
        raise ValueError(f"Invalid direction: {direction}. Use 'ascending' or 'descending'.")

    # min/max return the first extreme element, matching the tie-breaking of
    # the stable sort this replaces, without sorting the whole list.
    return choose(scored, key=lambda pair: pair[1])[0]
def round_up(n, decimals=2):
    """Round ``n`` up to ``decimals`` decimal places.

    Binary floating point can push an exactly representable result just above
    the intended value (``0.07 * 100`` evaluates to ``7.000000000000001``),
    which made a bare ``math.ceil`` jump a whole step: ``round_up(0.07)``
    returned ``0.08``. The scaled value is therefore rounded to 6 decimals
    first, so excesses below 5e-7 of one step are treated as noise rather
    than rounded up.

    Args:
        n: Number to round.
        decimals (int): Number of decimal places to keep.

    Returns:
        float: The smallest multiple of ``10**-decimals`` that is not below
        ``n`` (within the noise tolerance described above).
    """
    multiplier = 10**decimals
    scaled = round(n * multiplier, 6)
    return math.ceil(scaled) / multiplier
def format_indicator_dict(data: dict, target_allocation: dict = None) -> str:
    """Render the indicator cache, and optionally the allocation, as text.

    Args:
        data (dict): Mapping of indicator name to per-symbol values. The
            "price" entry maps symbol to value; every other entry maps
            symbol to a ``{period: value}`` dict.
        target_allocation (dict): Optional mapping of symbol to portion,
            appended under a "TARGET ALLOCATION" heading when non-empty.

    Returns:
        str: Newline-joined report with surrounding whitespace stripped.
    """
    report = []
    for family, per_symbol in data.items():
        report.append(family.upper())
        if family == "price":
            report.extend(f" {ticker}: {reading}" for ticker, reading in per_symbol.items())
        else:
            report.extend(
                f" {ticker} {length}-period: {reading}"
                for ticker, by_period in per_symbol.items()
                for length, reading in by_period.items()
            )
        report.append("")  # blank line after each indicator group

    if target_allocation:
        report.append("TARGET ALLOCATION")
        report.extend(f" {ticker}: {share}" for ticker, share in target_allocation.items())

    return "\n".join(report).strip()
# region imports
from AlgorithmImports import *
# endregion
class Settings:
    """Run configuration shared with the algorithm via the ``settings`` singleton."""

    def __init__(self) -> None:
        # Address that receives the notification emails.
        self.notify_email_address: str = "tuenguyen12329@gmail.com"
        # Starting cash handed to set_cash().
        self.start_cash: int = 10000
        # Amount added to the cashInvested counter by add_cash().
        self.dca_cash: int = 0


# Module-level instance imported by the algorithm module.
settings = Settings()