| Overall Statistics |
|
Total Orders 3938 Average Win 2.60% Average Loss -1.69% Compounding Annual Return 75.860% Drawdown 33.400% Expectancy 0.433 Start Equity 10000 End Equity 154080413.09 Net Profit 1540704.131% Sharpe Ratio 1.751 Sortino Ratio 2.134 Probabilistic Sharpe Ratio 99.314% Loss Rate 44% Win Rate 56% Profit-Loss Ratio 1.54 Alpha 0 Beta 0 Annual Standard Deviation 0.299 Annual Variance 0.089 Information Ratio 1.814 Tracking Error 0.299 Treynor Ratio 0 Total Fees $5827122.02 Estimated Strategy Capacity $1000.00 Lowest Capacity Asset EUM TXA0MGR7LUG5 Portfolio Turnover 46.48% Drawdown Recovery 339 |
from datetime import timedelta, datetime
import math
from AlgorithmImports import *
import numpy as np
from settings import settings
class RSIRebalanceStrategy(QCAlgorithm):
def Initialize(self):
    """Set up dates, brokerage, universe, scheduled events and strategy state.

    NOTE(review): the file's indentation was lost before review; the
    placement of ``set_cash`` outside the ``live_mode`` check is assumed --
    confirm against the deployed version.
    """
    self.set_start_date(2009, 1, 1)
    self.should_trade = True

    # https://www.interactivebrokers.ca/en/accounts/tradingPermissions.php?ib_entity=ca
    self.ibkr_market_order_buffer = 0.02
    self.ibkr_fee_buffer = 25
    self.set_brokerage_model(
        BrokerageName.INTERACTIVE_BROKERS_BROKERAGE,
        AccountType.CASH,
    )

    # Every ticker the strategy reads an indicator from or may trade.
    self.symbols = [
        'SLV', "SHV", "EEM", "MMT", "XLU", "PIM", "IBB", "MHD", "XLP",
        "IEI", "IWM", "IGIB", "DBE", "IEF", "DIA", "EUM", "EET", "EEV",
        "EDC", "EDZ", "SPY", 'GLD', 'UGL', 'DLN', 'ISCB', 'TQQQ', 'VIXY',
        'VCR', 'XLP', 'VTV', 'VOOG', 'VOOV', 'SPY', 'QQQE',
    ]
    # Ticker -> Symbol map; repeated tickers are simply registered again.
    self.equities = {
        ticker: self.add_equity_symbol(ticker) for ticker in self.symbols
    }

    if not self.live_mode:
        self.set_warmup(timedelta(days=300))
    self.set_cash(settings.start_cash)

    scheduler = self.schedule
    dates = self.date_rules
    times = self.time_rules

    # Monthly contribution bookkeeping, right at the close.
    scheduler.on(
        dates.month_start(1),
        times.after_market_close("EEM", 0),
        self.add_cash,
    )
    # self.schedule.on(
    #     self.date_rules.month_start(15),
    #     self.time_rules.after_market_close("SPY", 0),
    #     self.add_cash,
    # )

    # Clear the per-day flags and indicator cache before the rebalance runs.
    scheduler.on(
        dates.every_day(),
        times.before_market_close("EEM", 6),
        self.reset_daily_variables,
    )

    # Rebalance attempts count down from 3 minutes before the close:
    # every 15 seconds when live, once a minute in backtests.
    minute_step = -0.25 if self.live_mode else -1
    first_offset = 3
    last_offset = 0  # exclusive bound, identical in live and backtest
    for minutes_before_close in np.arange(first_offset, last_offset, minute_step):
        scheduler.on(
            dates.every_day(),
            times.before_market_close("EEM", minutes_before_close),
            self.rebalance,
        )

    # Mutable strategy state.
    self.cashInvested = self.portfolio.cash_book["USD"].amount
    self.liquidated = False
    self.additional_cash = 0
    self.prev_tqqq_rsi = 100
    self.prev_tqqq_sma = 0
    self.less_than_40 = 0
    self.indicator_data = {name: {} for name in ("rsi", "sma")}
    self.start_time = datetime.now()
    self.metrics_sent = False
def rebalance(self):
    """Pick the single instrument to hold and route it to ``set_holdings_2``.

    Decision order (first match wins):
      1. EEM RSI(14) below 30          -> EDC
      2. EEM RSI(10) above 80          -> EDZ
      3. any overbought RSI(10) check  -> VIXY
      4. otherwise the weighted signal ratio from ``create_signals`` is
         mapped to one of four instruments (EET / GLD / EUM / EEV).

    Only the fourth path records reporting attributes
    (``market_conditions``, ``signal_details``, ``bullish_weight``,
    ``total_weight``, ``bullish_ratio``, ``decision``) and calls
    ``send_live_metrics``.
    """
    if self.time < self.start_date:
        return
    if not self.securities["SPY"].has_data:
        return

    # Live runs recompute every indicator on each attempt; backtests keep
    # the cache until reset_daily_variables clears it.
    if self.live_mode:
        self.indicator_data = {"rsi": {}, "sma": {}}

    rsi = self.rsi_2
    sma = self.sma_2
    set_holdings = self.set_holdings_2
    price = self.price

    # Extreme EEM readings override everything else.
    if rsi("EEM", 14) < 30:
        set_holdings("EDC", 1)
        return
    if rsi("EEM", 10) > 80:
        set_holdings("EDZ", 1)
        return

    # (ticker, RSI(10) ceiling); evaluated lazily in this order, so the
    # scan stops at the first overbought ticker just like an `or` chain.
    overbought_limits = (
        ("QQQE", 79),
        ("VTV", 79),
        ("VOOG", 79),
        ("VOOV", 79),
        ("XLP", 77),
        ("TQQQ", 79),
        ("SPY", 80),
        ("VCR", 80),
    )
    if any(rsi(ticker, 10) > limit for ticker, limit in overbought_limits):
        set_holdings("VIXY", 1)
        return

    shv_price = price("SHV")
    shv_sma = sma("SHV", 50)
    eem_price = price("EEM")
    eem_sma = sma("EEM", 200)
    shv_bull = shv_price > shv_sma
    eem_bull = eem_price > eem_sma

    # Store market conditions for reporting
    self.market_conditions = {
        "shv_bull": shv_bull,
        "shv_price": shv_price,
        "shv_sma": shv_sma,
        "eem_bull": eem_bull,
        "eem_price": eem_price,
        "eem_sma": eem_sma,
    }

    bullish_weight, total_weight, signal_details = self.create_signals(
        rsi, shv_bull, eem_bull
    )

    # Store signal details for reporting
    self.signal_details = signal_details
    self.bullish_weight = bullish_weight
    self.total_weight = total_weight

    # Convert to ratio (0 to 1); with no active weight fall back to 0.5.
    bullish_ratio = bullish_weight / total_weight if total_weight > 0 else 0.5
    self.bullish_ratio = bullish_ratio

    # Map the ratio to one of four buckets, most bullish first.
    if bullish_ratio >= 0.6:
        self.decision = "EET"
    elif bullish_ratio >= 0.4:
        self.decision = "GLD"
    elif bullish_ratio >= 0.2:
        self.decision = "EUM"
    else:
        self.decision = "EEV"
    set_holdings(self.decision, 1)

    self.send_live_metrics()
def create_signals(self, rsi, shv_bull, eem_bull):
    """Score every RSI pair and total up the bullish weight.

    A pair is bullish when the left ticker's RSI is strictly greater than
    the right ticker's RSI. Its weight depends on the SHV / EEM regime
    flags and may be zero, in which case the pair is still evaluated and
    reported but contributes nothing.

    Args:
        rsi: callable ``rsi(ticker, window)`` returning the RSI value.
        shv_bull: regime flag passed as first argument to each weight rule.
        eem_bull: regime flag passed as second argument to each weight rule.

    Returns:
        ``(bullish_weight, possible_weight, details)`` where ``details`` is
        a list with one dict per pair, in table order.
    """
    # Weight rules shared by several rows; each takes (shv_bull, eem_bull).
    only_when_eem_bear = lambda s, e: 1 if not e else 0
    only_when_eem_bull = lambda s, e: 1 if e else 0
    always_one_third = lambda s, e: 1/3

    # (left window, left ticker, right window, right ticker, weight rule)
    table = (
        # active only when neither SHV nor EEM is bullish
        (10, "IGIB", 10, "SPY", lambda s, e: 0 if (s or e) else 1),
        # half weight when SHV is bullish but EEM is not
        (15, "IGIB", 15, "EEM", lambda s, e: 0.5 if (s and not e) else 0),
        # active only when both are bullish
        (10, "IEI", 15, "IWM", lambda s, e: 1 if (s and e) else 0),
        (10, "IGIB", 10, "DLN", only_when_eem_bear),
        (10, "ISCB", 10, "IWM", only_when_eem_bear),
        (10, "IGIB", 10, "DBE", only_when_eem_bull),
        (10, "IEF", 10, "DIA", only_when_eem_bull),
        # Always-on signals (1/3 weight each)
        (10, "MMT", 10, "XLU", always_one_third),
        (10, "PIM", 10, "IBB", always_one_third),
        (10, "MHD", 10, "XLP", always_one_third),
    )

    bullish_sum = 0
    possible_sum = 0
    details = []
    for lhs_window, lhs_ticker, rhs_window, rhs_ticker, rule in table:
        pair_weight = rule(shv_bull, eem_bull)
        possible_sum += pair_weight

        lhs_value = rsi(lhs_ticker, lhs_window)
        rhs_value = rsi(rhs_ticker, rhs_window)
        lhs_wins = lhs_value > rhs_value
        if lhs_wins:
            bullish_sum += pair_weight

        details.append(
            dict(
                left_ticker=lhs_ticker,
                left_window=lhs_window,
                right_ticker=rhs_ticker,
                right_window=rhs_window,
                weight=pair_weight,
                left_rsi=lhs_value,
                right_rsi=rhs_value,
                is_bullish=lhs_wins,
            )
        )

    return bullish_sum, possible_sum, details
def reset_daily_variables(self):
    """Clear the per-day flags and drop every cached indicator value."""
    self.metrics_sent = False
    self.liquidated = False
    # Fresh, empty caches for both indicator families.
    self.indicator_data = {family: {} for family in ("rsi", "sma")}
def add_equity_symbol(self, symbol: str) -> Symbol:
    """Subscribe to *symbol* at minute resolution and return its Symbol.

    The security is requested with adjusted price normalization and is
    given immediate settlement and immediate fill models.
    """
    equity = self.add_equity(
        symbol,
        Resolution.MINUTE,
        data_normalization_mode=DataNormalizationMode.ADJUSTED,
    )
    settlement_model = ImmediateSettlementModel()
    equity.set_settlement_model(settlement_model)
    fill_model = ImmediateFillModel()
    equity.set_fill_model(fill_model)
    return equity.Symbol
def add_cash(self):
    """Record one scheduled contribution of ``settings.dca_cash``.

    Only the bookkeeping counters change here; the deposit into the cash
    book is currently disabled (see the commented-out lines below).
    """
    contribution = settings.dca_cash
    self.additional_cash = self.additional_cash + contribution
    # if self.rsi_2("TQQQ", 10) < 45:
    #     self.portfolio.cash_book["USD"].add_amount(self.additional_cash)
    #     self.additional_cash = 0
    self.cashInvested = self.cashInvested + contribution
def price(self, symbol: str):
    """Return the current price of *symbol* from ``self.securities``."""
    security = self.securities[symbol]
    return security.price
# def rsi_2(self, symbol: str, period):
# if self.indicator_data["rsi"].get(symbol) is None:
# self.indicator_data["rsi"][symbol] = {}
# if self.indicator_data["rsi"][symbol].get(period) is None or self.indicator_data["rsi"][symbol].get(period) == 0:
# self.indicator_data["rsi"][symbol][period] = 0
# else:
# return self.indicator_data["rsi"][symbol][period]
# extension = min(round(11 * period**0.5 + 5.5*period), 2000)
# r_w = RollingWindow[float](extension) #90
# change = RollingWindow[float](extension - 1) #89
# history = self.history(symbol, extension, Resolution.DAILY)
# for historical_bar in history:
# r_w.add(historical_bar.close)
# r_w.add(self.securities[symbol].price)
# for i in range(r_w.count - 2, -1, -1):
# change.add(round(r_w[i] - r_w[i+1], 10))
# if r_w.is_ready and change.is_ready:
# gain = 0
# loss = 0
# for i in range(change.count - 1, change.count - 1 - period, -1):
# gain += max(change[i], 0)
# loss += abs(min(change[i], 0))
# avgGainStartingPt = round(gain / period, 10)
# avgLossStartingPt = round(loss / period, 10)
# #Start avgGain and avgLoss
# avgGain = round(((period - 1) * avgGainStartingPt + max(change[change.count - 1 - period], 0)) / period, 10)
# avgLoss = round(((period - 1) * avgLossStartingPt + abs(min(change[change.count - 1 - period], 0))) / period, 10)
# for i in range(change.count - 1 - period - 1, -1, -1):
# avgGain = round(((period - 1) * avgGain + max(change[i], 0)) / period, 10)
# avgLoss = round(((period - 1) * avgLoss + abs(min(change[i], 0))) / period, 10)
# if avgLoss == 0:
# return 100
# else:
# rs = round(avgGain / avgLoss, 10)
# rsi = round(100 - 100/(1+rs), 5)
# self.indicator_data["rsi"][symbol][period] = rsi
# return rsi
# else:
# return 50 #None
def rsi_2(self, symbol: str, period):
    """Return the RSI of *symbol* over *period* daily bars, with caching.

    The value is built from daily history plus the current price as the
    newest sample. The first *period* changes seed the average gain/loss
    with a plain mean; every later change is folded in as
    ``(previous * (period - 1) + change) / period``.

    A cached value of ``None`` or ``0`` counts as "not computed". The
    result ``100`` (average loss of zero) is returned without being cached,
    so it is recomputed on the next call.

    NOTE(review): the indexing assumes ``RollingWindow[0]`` is the most
    recently added sample -- confirm against the LEAN documentation.
    """
    rsi_cache = self.indicator_data["rsi"]
    if rsi_cache.get(symbol) is None:
        rsi_cache[symbol] = {}
    per_period = rsi_cache[symbol]

    known = per_period.get(period)
    if not (known is None or known == 0):
        return known
    per_period[period] = 0

    # Sample count grows with the period but is capped at 250.
    span = min(int(round_up(11 * math.sqrt(period) + 5.5 * period, 0)), 250)
    closes = RollingWindow[float](span)
    for bar in self.history(symbol, span - 1, Resolution.DAILY):
        closes.add(bar.close)

    # Top up with the current price until the window is full; this adds at
    # least the live sample and pads when history came back short.
    while closes.count < span:
        latest = self.securities[symbol].price
        if self.live_mode:
            self.log(f"{symbol}: {latest}")
        closes.add(latest)

    if not closes.is_ready:
        return None

    def split_move(i):
        """Return (gain, loss) for the step from sample i to sample i - 1."""
        move = closes[i - 1] - closes[i]
        return max(move, 0), abs(min(move, 0))

    # Seed: simple mean over the first `period` changes.
    gain_sum = 0
    loss_sum = 0
    for i in range(span - 1, span - period - 1, -1):
        up, down = split_move(i)
        gain_sum += up
        loss_sum += down
    avg_gain = gain_sum / period
    avg_loss = loss_sum / period

    # Fold in the remaining changes up to the newest one.
    for i in range(span - period - 1, 0, -1):
        up, down = split_move(i)
        avg_gain = (avg_gain * (period - 1) + up) / period
        avg_loss = (avg_loss * (period - 1) + down) / period

    if avg_loss == 0:
        return 100

    value = round(100 - (100 / (1 + avg_gain / avg_loss)), 2)
    per_period[period] = value
    return value
def sma_2(self, symbol: str, period):
    """Return the simple moving average of *symbol*, with caching.

    The average covers up to ``period - 1`` daily closes from history plus
    the current price as the newest sample, rounded to 3 decimals.

    A cached value of ``None`` or ``0`` counts as "not computed".

    The divisor is the number of samples actually collected. Previously
    the sum was always divided by *period*, which understated the average
    whenever history returned fewer than ``period - 1`` bars. With a full
    history the result is unchanged.
    """
    sma_cache = self.indicator_data["sma"]
    if sma_cache.get(symbol) is None:
        sma_cache[symbol] = {}
    per_period = sma_cache[symbol]

    cached = per_period.get(period)
    if cached is not None and cached != 0:
        return cached
    per_period[period] = 0

    samples = [
        bar.close for bar in self.history(symbol, period - 1, Resolution.DAILY)
    ]
    samples.append(self.securities[symbol].price)

    # `samples` always holds at least the current price, so no zero division.
    sma = round(sum(samples) / len(samples), 3)
    per_period[period] = sma
    return sma
def set_holdings_2(self, symbol: str, portion=1):
    """Move the portfolio into *symbol* using an explicit two-step switch.

    The IBKR brokerage model does not wait for liquidation to finish in
    ``set_holdings(symbol, 1, True)``, so this method liquidates first and
    returns; a later call, once nothing is pending, places the buy.

    Steps, in order:
      1. If trading is disabled, send a notification email and stop.
      2. If something else is held and *symbol* is not, liquidate
         everything, set ``self.liquidated`` and stop.
      3. Stop while orders are open, cash is unsettled, a liquidation has
         not fully turned into cash, or *symbol* has no data.
      4. For ``portion < 1`` delegate to ``set_holdings`` when the
         portfolio was just liquidated or already holds *symbol*.
      5. Otherwise size a limit order from remaining buying power and
         submit it when at least 2 shares fit.

    Args:
        symbol: ticker to hold.
        portion: target fraction of the portfolio (default 1).
    """
    if not self.should_trade:
        self.notify.email(
            settings.notify_email_address,
            "Live data | HG QQQ - IEF KMLM Short",
            f"Boolean self.should_trade is {self.should_trade} so we don't trade today",
        )
        return

    # Liquidate any other symbols when switching.
    if (
        not self.liquidated
        and not self.portfolio[symbol].invested
        and self.portfolio.invested
    ):
        self.liquidate()
        self.liquidated = True
        return

    if (
        len(self.transactions.get_open_orders()) > 0
        or self.portfolio.unsettled_cash > 0
        or (
            self.liquidated
            and self.portfolio.total_portfolio_value != self.portfolio.cash
        )
        or not self.securities[symbol].has_data
    ):
        return

    # Usually portion < 1 is for a risky asset like VIXY; there is no need
    # to maintain its portion because it is not held for long.
    if portion < 1:
        if self.liquidated and not self.portfolio.invested:
            self.set_holdings(symbol, portion)
            return
        elif self.portfolio[symbol].invested:
            self.set_holdings(symbol, portion)
            return

    # Keep a margin below full buying power to avoid "Insufficient buying
    # power to complete orders": 10 dollars when live, 1.2% in backtests.
    buying_power = (
        self.portfolio.margin_remaining - 10
        if self.live_mode
        else self.portfolio.margin_remaining * 0.988
    )
    security = self.securities[symbol]
    symbol_price = security.ask_price
    # 3-cent buffer above the ask, rounded up to a whole cent.
    limit_price = round_up(symbol_price + 0.03)
    shares_num: int = math.floor(buying_power / limit_price)
    spendable = buying_power - self.ibkr_fee_buffer

    def margin_for(quantity):
        """Return the initial margin requirement for *quantity* shares."""
        params = InitialMarginParameters(security, quantity)
        requirement = security.buying_power_model.get_initial_margin_requirement(
            params
        )
        # Extract the numeric value from the InitialMargin object.
        return requirement.value

    # Check the quantity that will actually be ordered. The requirement
    # used to be computed for `shares_num - 1`, so an order for
    # `shares_num` could pass a check made for one share fewer.
    required_margin_value = margin_for(shares_num)
    while shares_num >= 2 and required_margin_value > spendable:
        shares_num -= 1
        required_margin_value = margin_for(shares_num)

    if shares_num >= 2 and required_margin_value < spendable:
        self.limit_order(symbol, shares_num, limit_price)
def send_live_metrics(self):
    """Log and email the signal report, at most once per day, live only.

    Does nothing in backtests or when the report has already gone out
    since the last ``reset_daily_variables``. The guard previously read
    ``not self.metrics_sent``, which returned on every call because the
    flag starts as ``False``, so no report was ever sent.
    """
    if self.metrics_sent or not self.live_mode:
        return

    formatted_indicators = format_indicator_dict(self.indicator_data)
    formatted_signals = format_signal_report(
        self.market_conditions,
        self.signal_details,
        self.bullish_weight,
        self.total_weight,
        self.bullish_ratio,
        self.decision,
    )
    full_report = f"{formatted_signals}\n\n{formatted_indicators}"

    self.log_indicator_dict(self.indicator_data)
    self.log(formatted_signals)
    self.notify.email(
        settings.notify_email_address,
        "Live data | EEM Signals Strategy",
        full_report,
    )
    # Remember the send so later rebalance attempts today stay quiet.
    self.metrics_sent = True
def log_indicator_dict(self, data: dict):
    """Write every cached indicator value to the algorithm log.

    Output is one timestamp header, then for each indicator family its
    upper-cased name, one line per (symbol, period) value, and an empty
    line closing the family.
    """
    def messages():
        yield f"{self.time} ----------------\n"
        for family, by_symbol in data.items():
            yield family.upper()
            for ticker, by_period in by_symbol.items():
                for window, reading in by_period.items():
                    yield f"{ticker} {window}-period: {reading}"
            yield ""  # blank line between indicator groups

    write = self.log
    for message in messages():
        write(message)
def on_end_of_algorithm(self) -> None:
    """Emit the closing debug lines: cash contributed and wall-clock runtime."""
    emit = self.debug
    emit(f"Cash invested: {self.cashInvested}")
    # Runtime is measured from the timestamp stored in self.start_time.
    runtime = datetime.now() - self.start_time
    seconds = runtime.total_seconds()
    emit(f"Algo took: {seconds} seconds")
def round_up(n, decimals=2):
    """Return *n* rounded up (toward +infinity) to *decimals* decimal places."""
    scale = pow(10, decimals)
    scaled = n * scale
    return math.ceil(scaled) / scale
def format_indicator_dict(data: dict) -> str:
    """Render cached indicator values as text for the email report.

    Each indicator family gets its upper-cased name, then one line per
    symbol with that symbol's periods joined by " | ", then an empty line.
    Leading and trailing whitespace of the whole result is stripped.
    """
    rendered = []
    for family, by_symbol in data.items():
        rendered.append(family.upper())
        rendered.extend(
            " | ".join(
                f"{ticker} {window}-period: {reading}"
                for window, reading in by_period.items()
            )
            for ticker, by_period in by_symbol.items()
        )
        rendered.append("")  # blank line between indicators
    text = "\n".join(rendered)
    return text.strip()
def format_signal_report(market_conditions, signal_details, bullish_weight, total_weight, bullish_ratio, decision):
    """Build the plain-text report: market regime, signal table, decision.

    Args:
        market_conditions: dict with ``shv_*`` / ``eem_*`` ``bull``,
            ``price`` and ``sma`` entries.
        signal_details: list of per-signal dicts as built by
            ``create_signals``.
        bullish_weight: summed weight of bullish signals.
        total_weight: summed weight of all signals.
        bullish_ratio: bullish share in the range 0..1.
        decision: ticker that was chosen.

    Returns:
        The report as a single newline-joined string.
    """
    rule = "=" * 70

    def regime_line(label, prefix):
        """One 'X Bull' line for the market-conditions section."""
        is_bull = market_conditions[prefix + "_bull"]
        last = market_conditions[prefix + "_price"]
        average = market_conditions[prefix + "_sma"]
        verdict = "✓ YES" if is_bull else "✗ NO"
        relation = ">" if is_bull else "<"
        return f"{label}: {verdict} | Price: {last:.2f} {relation} SMA: {average:.2f}"

    def rsi_cell(reading):
        """RSI with one decimal, or N/A when missing."""
        return "N/A" if reading is None else f"{reading:.1f}"

    def signal_row(sig):
        """One table row for a single signal."""
        title = f"{sig['left_ticker']}({sig['left_window']}) vs {sig['right_ticker']}({sig['right_window']})"
        if sig['weight'] == 0:
            outcome = "SKIP"
        else:
            outcome = "✓ BULL" if sig['is_bullish'] else "✗ BEAR"
        lhs = rsi_cell(sig['left_rsi'])
        rhs = rsi_cell(sig['right_rsi'])
        return f"{title:<25} | {sig['weight']:>6.2f} | {lhs:>7} | {rhs:>7} | {outcome}"

    # Market Conditions
    report = [
        rule,
        "MARKET CONDITIONS",
        rule,
        regime_line("SHV Bull (SHV > SMA50)", "shv"),
        regime_line("EEM Bull (EEM > SMA200)", "eem"),
    ]

    # Signals Summary
    report += [
        "",
        rule,
        f"SIGNALS (Bullish: {bullish_weight:.2f} / {total_weight:.2f} = {bullish_ratio*100:.1f}%)",
        rule,
    ]

    # Table header and its underline
    report.append(
        " | ".join(
            (
                "Signal".ljust(25),
                "Weight".rjust(6),
                "RSI L".rjust(7),
                "RSI R".rjust(7),
                "Result",
            )
        )
    )
    report.append("-|-".join("-" * width for width in (25, 6, 7, 7, 8)))
    report.extend(signal_row(sig) for sig in signal_details)

    # Decision: first threshold the ratio reaches names the bucket.
    bucket = "Short (<20%)"
    for floor, label in (
        (0.6, "Long (>=60%)"),
        (0.4, "Neutral (40-60%)"),
        (0.2, "Defensive (20-40%)"),
    ):
        if bullish_ratio >= floor:
            bucket = label
            break
    report += [
        "",
        rule,
        f"DECISION: {decision} (Bullish Ratio: {bullish_ratio*100:.1f}% -> {bucket})",
        rule,
    ]
    return "\n".join(report)
# region imports
from AlgorithmImports import *
# endregion
class Settings:
    """Plain container for the strategy's configurable values."""

    def __init__(self):
        # Cash the algorithm starts with (passed to set_cash).
        # Amount recorded by each scheduled add_cash call.
        self.start_cash, self.dca_cash = 10000, 0
        # Recipient of the notification emails.
        self.notify_email_address = "tuenguyen12329@gmail.com"


# Shared module-level instance imported by the algorithm.
settings = Settings()