| Overall Statistics |
|
Total Orders 174 Average Win 7.01% Average Loss -4.86% Compounding Annual Return 7.851% Drawdown 45.300% Expectancy 0.046 Start Equity 10000 End Equity 15747.91 Net Profit 57.479% Sharpe Ratio 0.228 Sortino Ratio 0.161 Probabilistic Sharpe Ratio 2.788% Loss Rate 57% Win Rate 43% Profit-Loss Ratio 1.44 Alpha -0.014 Beta 0.442 Annual Standard Deviation 0.191 Annual Variance 0.036 Information Ratio -0.43 Tracking Error 0.204 Treynor Ratio 0.099 Total Fees $86.00 Estimated Strategy Capacity $41000000.00 Lowest Capacity Asset QQQ 32SD71INLKSIU|QQQ RIWIV7K5Z9LX Portfolio Turnover 3.44% Drawdown Recovery 651 |
#region imports
from AlgorithmImports import *
#endregion
class MarketOpenExecutionModel(ExecutionModel):
    '''Execution model that submits market orders to reach the desired portfolio
    targets, but only while the security's exchange is open.'''

    def __init__(self):
        # Targets received so far that have not been fulfilled yet.
        self.targets_collection = PortfolioTargetCollection()

    def execute(self, algorithm, targets):
        '''Submit market orders for the outstanding quantity of every pending target.

        Args:
            algorithm: The algorithm instance used to look up securities and place orders
            targets: The new portfolio targets to add to the pending collection
        '''
        pending = self.targets_collection
        pending.add_range(targets)

        # Ordering by margin impact and clearing fulfilled targets are expensive,
        # so bail out early when nothing is pending.
        if pending.is_empty:
            return

        settings = algorithm.settings
        for target in pending.order_by_margin_impact(algorithm):
            security = algorithm.securities[target.symbol]

            # Quantity still missing to reach the target.
            remaining = OrderSizing.get_unordered_quantity(algorithm, target, security)
            if remaining == 0:
                continue

            large_enough = BuyingPowerModelExtensions.above_minimum_order_margin_portfolio_percentage(
                security.buying_power_model,
                security,
                remaining,
                algorithm.portfolio,
                settings.minimum_order_margin_portfolio_percentage)

            # Regular-hours check only (extended market hours flag is False).
            exchange_open = security.exchange.hours.is_open(algorithm.time, False)

            if large_enough and exchange_open:
                algorithm.market_order(security, remaining)

        pending.clear_fulfilled(algorithm)
# region imports
from AlgorithmImports import *
# endregion
class BullPutOnUptrend(QCAlgorithm):
    """Hold at most one QQQ bull put spread while QQQ trades above its daily SMA.

    Every trading day, shortly after the open, the algorithm checks whether the
    underlying price is above the simple moving average. If so, and no
    previously opened spread is still held, it picks the put expiry closest to
    ``TARGET_DAYS_TO_EXPIRY`` calendar days, shorts the nearest strike below the
    price, goes long the strike ``SPREAD_WIDTH_IN_STRIKES`` listed strikes lower
    and enters one spread.
    """

    # Strategy parameters (values are the ones previously hard-coded inline).
    # NOTE: the indicator attribute, the plot series and the debug message are
    # all labelled "SMA200" although the period actually used is 180.
    SMA_PERIOD = 180
    TARGET_DAYS_TO_EXPIRY = 30
    SPREAD_WIDTH_IN_STRIKES = 5
    MINUTES_AFTER_OPEN = 10

    def Initialize(self) -> None:
        """Configure dates, cash, data subscriptions, the SMA and the daily schedule."""
        self.SetStartDate(2019, 7, 1)
        self.SetEndDate(2025, 7, 1)
        self.SetCash(10000)

        # 1) Add the underlying equity explicitly
        underlying_equity = self.AddEquity("QQQ", Resolution.Daily)
        self.underlying = underlying_equity.Symbol

        # 2) Add options on that equity (canonical)
        option = self.AddOption("QQQ", Resolution.Minute)
        self.symbol = option.Symbol
        option.SetFilter(lambda u: u.IncludeWeeklys().Strikes(-7, +7).Expiration(0, 1))

        # --- SMA fed by a DAILY consolidator (explicit daily updates) ---
        self.sma200 = SimpleMovingAverage(self.SMA_PERIOD)
        self.daily_consolidator = self.Consolidate(self.underlying, Resolution.Daily, self.OnDailyBar)
        self.RegisterIndicator(self.underlying, self.sma200, self.daily_consolidator)

        # Warm up for as many daily bars as the SMA needs
        self.SetWarmup(self.SMA_PERIOD, Resolution.Daily)

        # Use underlying as benchmark
        self.SetBenchmark(self.underlying)

        # Symbols of the legs of the most recently opened spread (None until the first trade)
        self.openSpread = None

        # --- Schedule the daily trading routine ---
        # Run a few minutes after market open to use the most recent SMA (yesterday's close)
        self.Schedule.On(
            self.DateRules.EveryDay(self.underlying),
            self.TimeRules.AfterMarketOpen(self.underlying, self.MINUTES_AFTER_OPEN),
            self.TradeDaily
        )

    def OnDailyBar(self, bar: TradeBar) -> None:
        """Plot the close and the SMA for each consolidated daily bar once the SMA is ready."""
        if self.sma200.IsReady:
            self.Plot("Indicators", "Close", bar.Close)
            self.Plot("Indicators", "SMA200", float(self.sma200.Current.Value))

    def OnData(self, slice: Slice) -> None:
        """Intentionally empty: all trading happens in the scheduled ``TradeDaily``."""
        pass

    @staticmethod
    def _pick_strikes(strikes, price, width):
        """Return ``(short_strike, long_strike)`` or ``None`` if the chain is too shallow.

        ``strikes`` must be sorted ascending and non-empty. The short strike is
        the highest strike strictly below ``price``; if there is none, the strike
        nearest to ``price`` is used. The long strike sits ``width`` positions
        lower in ``strikes``.
        """
        below_price = [k for k in strikes if k < price]
        if below_price:
            short_strike = max(below_price)
        else:
            # With no strike below the price the nearest one is the lowest
            # strike, so the width check below then rejects the trade.
            short_strike = min(strikes, key=lambda k: abs(k - price))

        long_index = strikes.index(short_strike) - width
        if long_index < 0:
            return None
        return short_strike, strikes[long_index]

    def TradeDaily(self) -> None:
        """Scheduled daily routine: open one bull put spread when the trend filter passes."""
        # Ensure SMA is ready and we're not in warmup
        if self.IsWarmingUp or not self.sma200.IsReady:
            return

        # Only place one spread at a time (avoid stacking)
        if self.openSpread and any(self.Portfolio[x].Invested for x in self.openSpread):
            return

        # Use the equity price for the trend check
        underlying_price = self.Securities[self.underlying].Price
        if not underlying_price:
            return

        # Trade only if the underlying is in an uptrend: price above the SMA
        if underlying_price <= self.sma200.Current.Value:
            return

        # Pull the current option contract list (scheduled events don't get a Slice)
        contracts = list(self.OptionChainProvider.GetOptionContractList(self.underlying, self.Time))
        puts = [c for c in contracts if c.ID.OptionRight == OptionRight.Put]
        if not puts:
            return

        # 1) Expiry closest to the target number of calendar days
        #    (sorted first so that ties resolve to the earlier expiry)
        today = self.Time.date()
        expiries = sorted({c.ID.Date for c in puts})
        target_expiry = min(
            expiries,
            key=lambda d: abs((d.date() - today).days - self.TARGET_DAYS_TO_EXPIRY)
        )

        # 2) Distinct strikes listed for puts at that expiry, ascending
        strikes = sorted({c.ID.StrikePrice for c in puts if c.ID.Date == target_expiry})
        if not strikes:
            return

        # 3) Short put just below the price, long put a fixed number of strikes lower
        legs = self._pick_strikes(strikes, underlying_price, self.SPREAD_WIDTH_IN_STRIKES)
        if legs is None:
            return
        short_strike, long_strike = legs

        # 4) Build the Bull Put Spread (short higher strike, long lower strike)
        spread = OptionStrategies.BullPutSpread(
            self.symbol,
            short_strike,  # short (higher) strike
            long_strike,   # long (lower) strike
            target_expiry
        )

        # Build each leg's symbol once; they are reused for the subscription
        # check below and for tracking the open position.
        long_put_symbol = Symbol.CreateOption(self.underlying, Market.USA, OptionStyle.American, OptionRight.Put, long_strike, target_expiry)
        short_put_symbol = Symbol.CreateOption(self.underlying, Market.USA, OptionStyle.American, OptionRight.Put, short_strike, target_expiry)

        # Ensure exact legs exist as securities to avoid resolution issues
        for sym in (long_put_symbol, short_put_symbol):
            if sym not in self.Securities:
                self.AddOptionContract(sym, Resolution.Minute)

        # Enter one unit of the strategy (the short leg is the higher strike)
        self.Buy(spread, 1)
        self.Debug(
            f"[{self.Time}] Sold Bull Put Spread: long {long_strike}P / short {short_strike}P @ {target_expiry.date()} | "
            f"Price={underlying_price:.2f}, SMA200={self.sma200.Current.Value:.2f}"
        )

        # Track legs to avoid stacking
        self.openSpread = [long_put_symbol, short_put_symbol]
#region imports
from AlgorithmImports import *
from Portfolio.EqualWeightingPortfolioConstructionModel import EqualWeightingPortfolioConstructionModel
from utils import reset_and_warm_up
#endregion
class SparseOptimizationPortfolioConstructionModel(EqualWeightingPortfolioConstructionModel):
    '''Using sparse optimization to construct our own replicated portfolio compared to a benchmark
    In this model, we only model the upside while discarding downside datapoints. For details, refer to
    https://www.quantconnect.com/docs/v2/research-environment/applying-research/sparse-optimization
    '''

    def __init__(self, algorithm, benchmark, lookback = 252, resolution = None,
                 rebalance = Expiry.END_OF_QUARTER, portfolio_bias = PortfolioBias.LONG_SHORT):
        '''Initialize the model

        Args:
            algorithm: The algorithm instance, used to reach securities and history
            benchmark: Symbol of the benchmark whose returns are replicated
            lookback: Number of log-return observations kept per security
            resolution: Resolution passed to the history request when warming up
            rebalance: Rebalancing rule forwarded to the base class
            portfolio_bias: Portfolio bias forwarded to the base class
        '''
        super().__init__(rebalance, portfolio_bias)
        self.algorithm = algorithm
        self.lookback = lookback
        self.resolution = resolution
        self.benchmark = benchmark
        # Last valid weight vector (pd.Series indexed by symbol); None until the
        # first successful optimization.
        self.w = None

    def determine_target_percent(self, active_insights):
        '''Compute target weights for the active insights via sparse index tracking

        Args:
            active_insights: The insights currently active

        Returns:
            Dictionary mapping insight to target weight. Every insight maps to 0
            when there is no usable return data or no valid weights exist yet.
        '''
        result = {}

        # get the log return series for the benchmark and composites to construct the replicated portfolio
        # (the list of wanted symbols is built once instead of once per security)
        wanted_symbols = [x.symbol for x in active_insights] + [self.benchmark]
        pct_change = {symbol: self.returns(self.algorithm.securities[symbol])
                      for symbol in self.algorithm.securities.keys()
                      if symbol in wanted_symbols}
        pct_change = pd.DataFrame(pct_change)
        pct_change = pct_change.fillna(pct_change.mean())
        pct_change_portfolio = pct_change[[x for x in pct_change.columns if x != self.benchmark]].dropna(axis=1)
        if pct_change_portfolio.empty:
            return {insight: 0 for insight in active_insights}
        pct_change_benchmark = pct_change[self.benchmark].values.reshape(-1, 1)
        active_symbols = pct_change_portfolio.columns
        pct_change_portfolio = pct_change_portfolio.values

        p = 0.1             # penalty term for exceeding upper limit, between 0 & 1
        M = 0.01            # huber loss term size to adapt for outliers, between 0 & 1
        l = 0.005           # minimum size of inclusive constituents, between 0 & 1
        u = 0.10            # maximum size of inclusive constituents, between 0 & 1
        tol = 0.001         # optimization problem tolerance
        max_iter = 100      # optimization maximum iteration for speed
        iters = 1           # counter for number of iteration
        hdr = 10000         # optimization function output, an arbitary large number as initial state

        m = pct_change_portfolio.shape[0]; n = pct_change_portfolio.shape[1]

        # Use previous weightings as starting point if valid, otherwise use equal size
        w_ = self.w.values.reshape(-1, 1) if self.w is not None and self.w.size == n \
            else np.array([1/n] * n).reshape(n, 1)

        # placeholders
        a = np.array([None] * m).reshape(m, 1)
        c = np.array([None] * m).reshape(m, 1)
        d = np.array([None] * n).reshape(n, 1)

        # Optimization Methods for Financial Index Tracking: From Theory to Practice. K. Benidis, Y. Feng, D. P. Palomer (2018)
        # https://palomar.home.ece.ust.hk/papers/2018/BenidisFengPalomar-FnT2018.pdf
        while iters < max_iter:
            x_k = (pct_change_benchmark - pct_change_portfolio @ w_)

            for i in range(n):
                d[i] = 1/(np.log(1+l/p)*(p+w_[i]))

            for i in range(m):
                xk = float(x_k[i])
                if xk < 0:
                    a[i] = M / (M - 2*xk)
                    c[i] = xk
                else:
                    c[i] = 0
                    if 0 <= xk <= M:
                        a[i] = 1
                    else:
                        a[i] = M/abs(xk)

            L3 = 1/m * pct_change_portfolio.T @ np.diagflat(a.T) @ pct_change_portfolio
            eig_val, eig_vec = np.linalg.eig(L3.astype(float))
            eig_val = np.real(eig_val); eig_vec = np.real(eig_vec)
            q3 = 1/max(eig_val) * (2 * (L3 - max(eig_val) * np.eye(n)) @ w_ + eig_vec @ d \
                - 2/m * pct_change_portfolio.T @ np.diagflat(a.T) @ (c - pct_change_benchmark))

            mu = float(-(np.sum(q3) + 2)/n); mu_ = 0
            while mu > mu_:
                mu = mu_
                index1 = [i for i, q in enumerate(q3) if mu + q < -u*2]
                index2 = [i for i, q in enumerate(q3) if -u*2 < mu + q < 0]
                mu_ = float(-(np.sum([q3[i] for i in index2]) + 2 - len(index1)*u*2)/max(1, len(index2)))

            # Obtain the weights and HDR (optimization function result) of this iteration.
            w_ = np.amax(np.concatenate((-(mu + q3)/2, u*np.ones((n, 1))), axis=1), axis=1).reshape(-1, 1)
            w_ = w_/np.sum(abs(w_))
            hdr_ = float(w_.T @ w_ + q3.T @ w_)

            # If the HDR converges, we take the current weights
            if abs(hdr - hdr_) < tol:
                break

            # Else, we would increase the iteration count and use the current weights for the next iteration.
            iters += 1
            hdr = hdr_

        # Only accept the new weights when none of them is NaN. `x != np.nan` is
        # always True (NaN never compares equal), so np.isnan must be used; the
        # float cast is needed because the object-dtype placeholders above can
        # make w_ an object array, which np.isnan rejects.
        new_weights = np.asarray(w_, dtype=float)
        if not np.isnan(new_weights).any():
            self.w = pd.Series(data = new_weights.flatten(), index = active_symbols)

        # No valid weights have ever been produced: emit zero targets
        if self.w is None:
            return {insight: 0 for insight in active_insights}

        # Normalize the weights. Membership is tested against the stored weights'
        # own index because they may come from an earlier rebalance.
        total_weight = np.sum(np.abs(self.w.values))
        known_symbols = self.w.index
        for insight in active_insights:
            symbol = insight.symbol
            if symbol in known_symbols:
                result[insight] = float(self.w[symbol] / total_weight)
        return result

    def on_securities_changed(self, algorithm, changes):
        '''Set up data for added securities and tear it down for removed ones'''
        super().on_securities_changed(algorithm, changes)
        for added in changes.added_securities:
            self.init_security_data(algorithm, added)

        for removed in changes.removed_securities:
            self.dispose_security_data(algorithm, removed)

    def handle_corporate_actions(self, algorithm, slice):
        '''Re-warm the indicators of every symbol with a split or dividend in the slice'''
        symbols = set(slice.dividends.keys())
        symbols.update(slice.splits.keys())

        for symbol in symbols:
            self.warm_up_indicator(algorithm.securities[symbol])

    def warm_up_indicator(self, security):
        '''Reset the security's indicator data and warm it up again from history'''
        self.reset(security)
        # reset_and_warm_up returns the replacement consolidator, which is stored
        # back on the security
        security['consolidator'] = reset_and_warm_up(self.algorithm, security, self.resolution, self.lookback)

    def init_security_data(self, algorithm, security):
        '''Attach the rolling window, log-return indicator and consolidator to a security'''
        # To store the historical daily log return
        security['window'] = RollingWindow[IndicatorDataPoint](self.lookback)

        # Use daily log return to predict cointegrating vector
        security['logr'] = LogReturn(1)
        security['logr'].updated += lambda sender, updated: security['window'].add(IndicatorDataPoint(updated.end_time, updated.value))
        security['consolidator'] = TradeBarConsolidator(timedelta(1))

        # Subscribe the consolidator and indicator to data for automatic update
        algorithm.register_indicator(security.symbol, security['logr'], security['consolidator'])

        self.warm_up_indicator(security)

    def reset(self, security):
        '''Clear the security's log-return indicator and rolling window'''
        security['logr'].reset()
        security['window'].reset()

    def dispose_security_data(self, algorithm, security):
        '''Clear the security's data and unsubscribe its consolidator'''
        self.reset(security)
        algorithm.subscription_manager.remove_consolidator(security.symbol, security['consolidator'])

    def returns(self, security):
        '''Return the security's stored log returns as a pd.Series indexed by end time,
        in the reverse of the rolling window's iteration order'''
        return pd.Series(
            data = [x.value for x in security['window']],
            index = [x.end_time for x in security['window']])[::-1]
#region imports
from AlgorithmImports import *
#endregion
class MarketIndexETFUniverseSelectionModel(ETFConstituentsUniverseSelectionModel):
    '''Universe made of the heaviest constituents of a benchmark ETF.'''

    def __init__(self, benchmark, universe_settings: UniverseSettings = None) -> None:
        '''Initialize the model

        Args:
            benchmark: The ETF whose constituents form the universe
            universe_settings: Optional universe settings forwarded to the base class
        '''
        super().__init__(benchmark, universe_settings, self.etf_constituents_filter)

    def etf_constituents_filter(self, constituents):
        '''Return the symbols of the 20 constituents with the largest weight.

        Constituents whose weight is missing or zero are ignored.
        '''
        weighted = [constituent for constituent in constituents if constituent.weight]
        # Heaviest first; the sort is stable, so ties keep their input order
        weighted.sort(key=lambda constituent: constituent.weight, reverse=True)
        top_constituents = weighted[:20]
        return [constituent.symbol for constituent in top_constituents]
#region imports
from AlgorithmImports import *
#endregion
def reset_and_warm_up(algorithm, security, resolution, lookback = None):
    '''Reset a security's log-return indicator and warm it up again from history.

    The security's current consolidator is unsubscribed and replaced by a new
    daily TradeBarConsolidator, which is registered with the indicator and fed
    the historical bars.

    Args:
        algorithm: The algorithm instance used for history and subscriptions
        security: Security carrying the 'logr' indicator and 'consolidator' entries
        resolution: Resolution of the history request
        lookback: Number of bars to request; falls back to the indicator's
            warm-up period when not provided (or 0)

    Returns:
        The newly created consolidator
    '''
    log_return = security['logr']
    stale_consolidator = security['consolidator']
    bar_count = lookback or log_return.warm_up_period

    # Historical request whose bars are replayed through the new consolidator
    # to warm the indicator up
    bars = algorithm.history[stale_consolidator.input_type](
        security.symbol, bar_count, resolution,
        data_normalization_mode = DataNormalizationMode.SCALED_RAW)

    log_return.reset()

    # A consolidator cannot be reset, so it is swapped for a fresh one.
    # Not ideal: the type and period of the old consolidator are not known
    # here, so a daily TradeBarConsolidator is assumed.
    algorithm.subscription_manager.remove_consolidator(security.symbol, stale_consolidator)
    fresh_consolidator = TradeBarConsolidator(timedelta(1))
    algorithm.register_indicator(security.symbol, log_return, fresh_consolidator)

    # The most recent history bar is intentionally not replayed
    # NOTE(review): presumably because it arrives again through the live
    # subscription - confirm
    for bar in list(bars)[:-1]:
        fresh_consolidator.update(bar)

    return fresh_consolidator
'''
# In main.py, OnData and call HandleCorporateActions for framework models (if necessary)
def on_data(self, slice):
if slice.splits or slice.dividends:
self.alpha.handle_corporate_actions(self, slice)
self.pcm.handle_corporate_actions(self, slice)
self.risk.handle_corporate_actions(self, slice)
self.execution.handle_corporate_actions(self, slice)
# In the framework models, add
from utils import reset_and_warm_up
and implement HandleCorporateActions. E.g.:
def handle_corporate_actions(self, algorithm, slice):
for symbol, data in self.symbol_data.items():
if slice.splits.contains_key(symbol) or slice.dividends.contains_key(symbol):
data.warm_up_indicator()
where WarmUpIndicator will call reset_and_warm_up for each indicator/consolidator pair
'''