| Overall Statistics | |
|---|---|
| Total Orders | 689 |
| Average Win | 1.47% |
| Average Loss | -0.71% |
| Compounding Annual Return | 39.433% |
| Drawdown | 15.000% |
| Expectancy | 0.453 |
| Start Equity | 1000000.00 |
| End Equity | 1285773.52 |
| Net Profit | 28.577% |
| Sharpe Ratio | 1.111 |
| Sortino Ratio | 1.454 |
| Probabilistic Sharpe Ratio | 58.998% |
| Loss Rate | 53% |
| Win Rate | 47% |
| Profit-Loss Ratio | 2.08 |
| Alpha | 0.04 |
| Beta | 0.25 |
| Annual Standard Deviation | 0.209 |
| Annual Variance | 0.044 |
| Information Ratio | -1.548 |
| Tracking Error | 0.346 |
| Treynor Ratio | 0.928 |
| Total Fees | $0.00 |
| Estimated Strategy Capacity | $1200000.00 |
| Lowest Capacity Asset | BTCUSDT 18N |
| Portfolio Turnover | 39.77% |
| Drawdown Recovery | 21 |
from AlgorithmImports import *
from sklearn.ensemble import RandomForestClassifier
import numpy as np
class HybridAlpha(AlphaModel):
    """Alpha model for BTC mixing a directional signal with a cross-exchange spread signal.

    Directional leg: emit a long insight on the Binance symbol when the latest
    daily bar shows a volume surge together with positive momentum. Once the
    random forest has been trained it must also predict an up-move with more
    than 65% probability; until then the volume/momentum signal is used alone.

    Spread leg: when the relative price gap between the two symbols exceeds
    ``spread_threshold`` and there is no Binance position, emit opposite
    insights on the two symbols; cancel them once the gap falls below half the
    threshold.

    All indicator state is built from daily bars consolidated from the Binance
    subscription. Rolling windows are indexed newest-first (index 0 is the most
    recent daily bar), so a *smaller* index is a *later* point in time.
    """

    def __init__(self, algorithm, binance_symbol, coinbase_symbol, stop_loss_percentage=0.03):
        """Create the model, register the daily consolidator and warm it up.

        Args:
            algorithm: The running algorithm; used for history, logging and
                consolidator registration.
            binance_symbol: Symbol whose daily bars drive every indicator.
            coinbase_symbol: Symbol used only for the cross-exchange spread.
            stop_loss_percentage: Fractional drop below the recorded entry
                price at which the directional position is cancelled.
        """
        self.algorithm = algorithm
        self.binance = binance_symbol
        self.coinbase = coinbase_symbol
        # Hour of the last processed update. -4 is not a valid clock hour, so
        # the first call to update() is never throttled.
        self.hour = -4
        self.stop_loss_percentage = stop_loss_percentage
        self.entry_price = None

        # Directional parameters
        self.volume_period = 5
        self.atr_period = 10
        self.momentum_period = 3
        self.rsi_period = 14

        # Arbitrage parameters
        self.spread_threshold = 0.005
        self.arbitrage_active = False

        # ML components
        self.ml_model = RandomForestClassifier(n_estimators=30, max_depth=4, random_state=42, min_samples_split=10)
        self.ml_trained = False
        self.label_horizon = 5  # Bars ahead used to label a training sample
        self.min_train_samples = 20  # Do not fit on fewer samples than this
        # Number of labelled bars offered to the trainer. It has to be at least
        # min_train_samples, otherwise training can never succeed.
        self.ml_lookback = 30
        self.train_once = True  # Train once after warmup

        # Every window must reach back far enough to build features for the
        # oldest training sample: the label horizon, the training span, and the
        # longest feature lookback (RSI needs rsi_period + 1 closes).
        feature_lookback = max(
            self.volume_period,
            self.atr_period,
            self.momentum_period + 1,
            self.rsi_period + 1,
        )
        history_bars = self.label_horizon + self.ml_lookback + feature_lookback
        self.window = RollingWindow[TradeBar](history_bars)
        # One rolling-average volume and one true range per daily bar, kept as
        # long as the bars themselves so that index i lines up with window[i].
        self.volume_window = RollingWindow[float](history_bars)
        self.atr_window = RollingWindow[float](history_bars)

        # Consolidator
        self.consolidator = TradeBarConsolidator(timedelta(days=1))
        self.consolidator.data_consolidated += self.on_consolidated
        algorithm.subscription_manager.add_consolidator(self.binance, self.consolidator)

        # Warmup: replay minute history through the consolidator. A couple of
        # spare days are requested so a partially formed first/last day does
        # not leave the windows short.
        warmup_days = history_bars + 2
        history = algorithm.history[TradeBar](self.binance, warmup_days * 24 * 60, Resolution.MINUTE)
        for bar in history:
            self.consolidator.update(bar)

    def on_consolidated(self, sender, bar):
        """Store a finished daily bar and append its volume average and true range."""
        self.window.add(bar)
        if self.window.count >= self.volume_period:
            recent_volume = sum(self.window[i].volume for i in range(self.volume_period))
            self.volume_window.add(recent_volume / self.volume_period)
        if self.window.count >= 2:
            prev_close = self.window[1].close
            true_range = max(bar.high - bar.low, abs(bar.high - prev_close), abs(prev_close - bar.low))
            self.atr_window.add(true_range)

    def extract_features(self, index=0):
        """Build the feature vector for the daily bar at ``index``.

        Only data at ``index`` or older (larger indices) is used, so features
        for a historical bar contain nothing that was unknown on that day.

        Args:
            index: Position in the newest-first bar window; 0 is the latest bar.

        Returns:
            ``[volume_ratio, atr_ratio, momentum, price_change_1d,
            price_change_3d, rsi / 100]``, or ``None`` when there is not enough
            history behind ``index``.
        """
        if index < 0 or self.window.count < max(self.volume_period, self.atr_period) + index:
            return None
        bar = self.window[index]

        # Volume ratio: bar volume against the rolling average stored for it.
        if self.volume_window.count <= index:
            return None
        avg_volume = self.volume_window[index]
        volume_ratio = bar.volume / avg_volume if avg_volume > 0 else 1

        # ATR ratio: mean true range of the last atr_period bars against the
        # mean of every true range known at that bar.
        if self.atr_window.count < self.atr_period + index:
            return None
        recent_tr = [self.atr_window[i] for i in range(index, index + self.atr_period)]
        known_tr = [self.atr_window[i] for i in range(index, self.atr_window.count)]
        current_atr = sum(recent_tr) / len(recent_tr)
        avg_atr = sum(known_tr) / len(known_tr)
        atr_ratio = current_atr / avg_atr if avg_atr > 0 else 1

        # Momentum
        if self.window.count < self.momentum_period + index + 1:
            return None
        past_price = self.window[self.momentum_period + index].close
        momentum = (bar.close - past_price) / past_price if past_price != 0 else 0

        # Price changes
        price_change_1d = (bar.close - bar.open) / bar.open if bar.open > 0 else 0
        price_change_3d = 0
        if self.window.count >= 3 + index:
            # Change versus the close two positions older than this bar.
            ref_close = self.window[2 + index].close
            if ref_close > 0:
                price_change_3d = (bar.close - ref_close) / ref_close

        # Simple RSI over close-to-close moves; 50 when there is no history
        # or no movement at all.
        rsi = 50
        if self.window.count >= self.rsi_period + index:
            stop = min(index + self.rsi_period, self.window.count - 1)
            moves = [self.window[i].close - self.window[i + 1].close for i in range(index, stop)]
            gains = sum(m for m in moves if m > 0)
            losses = sum(-m for m in moves if m < 0)
            if losses > 0:
                rs = gains / losses
                rsi = 100 - (100 / (1 + rs))
            elif gains > 0:
                # Only up-moves in the lookback: RSI saturates at 100.
                rsi = 100

        return [volume_ratio, atr_ratio, momentum, price_change_1d, price_change_3d, rsi / 100]

    def train_model(self):
        """Fit the random forest on the bars currently held in the window.

        Each sample is a bar at least ``label_horizon`` bars old; its label is
        1 when the close ``label_horizon`` bars later is more than 2% higher,
        else 0.

        Returns:
            True when the model was fitted, False when fewer than
            ``min_train_samples`` usable samples were available.
        """
        if self.window.count < self.label_horizon + self.min_train_samples:
            return False

        X_train = []
        y_train = []
        last_index = min(self.label_horizon + self.ml_lookback, self.window.count)
        for i in range(self.label_horizon, last_index):
            features = self.extract_features(i)
            if features is None:
                continue
            current_price = self.window[i].close
            # Newest-first indexing: a smaller index is further in the future.
            future_price = self.window[i - self.label_horizon].close
            X_train.append(features)
            y_train.append(1 if future_price > current_price * 1.02 else 0)

        if len(X_train) < self.min_train_samples:
            return False

        X = np.array(X_train)
        y = np.array(y_train)
        self.ml_model.fit(X, y)
        self.ml_trained = True
        self.algorithm.log(f"ML model trained with {len(X)} historical samples")
        return True

    def update(self, algorithm, data):
        """Produce insights for the current time slice, at most once per clock hour.

        Args:
            algorithm: The running algorithm (prices, portfolio, logging).
            data: Current slice; both symbols must be present.

        Returns:
            A list of new insights (possibly empty). Exits are expressed by
            cancelling active insights rather than by returning new ones. A
            stop-loss returns immediately, skipping the spread leg for that
            hour.
        """
        has_both = data.contains_key(self.binance) and data.contains_key(self.coinbase)
        if not has_both or self.window.count < max(self.volume_period, self.atr_period):
            return []
        if self.hour == algorithm.time.hour:
            return []
        self.hour = algorithm.time.hour

        # Train model once after warmup in backtest
        if self.train_once and not self.ml_trained:
            if self.train_model():
                self.train_once = False

        binance_price = algorithm.securities[self.binance].price
        coinbase_price = algorithm.securities[self.coinbase].price
        if binance_price <= 0 or coinbase_price <= 0:
            # Without two valid prices neither the momentum nor the spread
            # (which divides by the smaller price) can be computed.
            return []

        bar = self.window[0]
        insights = []

        # Directional strategy with ML
        if not self.arbitrage_active:
            # Stop-loss
            if self.entry_price is not None and algorithm.portfolio[self.binance].quantity > 0:
                stop_loss_price = self.entry_price * (1 - self.stop_loss_percentage)
                if binance_price <= stop_loss_price:
                    algorithm.insights.cancel([self.binance])
                    algorithm.log(f"Stop-loss at {stop_loss_price:.2f}")
                    self.entry_price = None
                    return []

            # Traditional signals
            avg_volume = self.volume_window[0] if self.volume_window.count > 0 else 1
            volume_surge = bar.volume > avg_volume * 1.2
            if self.window.count >= self.momentum_period + 1:
                past_price = self.window[self.momentum_period].close
                momentum = (binance_price - past_price) / past_price if past_price != 0 else 0
            else:
                momentum = 0
            positive_momentum = momentum > 0

            # ML prediction
            ml_signal = False
            if self.ml_trained:
                features = self.extract_features(0)
                if features is not None:
                    try:
                        prediction = self.ml_model.predict([features])[0]
                        proba = self.ml_model.predict_proba([features])[0]
                    except Exception as exc:
                        # Treat a failed prediction as "no signal", but leave
                        # a trace instead of hiding the error.
                        algorithm.log(f"ML prediction failed: {exc}")
                    else:
                        # A model fitted on a single class yields one column.
                        confidence = proba[1] if len(proba) > 1 else 0.5
                        ml_signal = prediction == 1 and confidence > 0.65

            # Combined: Traditional AND ML must agree (stronger signal)
            traditional_signal = volume_surge and positive_momentum
            # NOTE(review): entry_price is overwritten on every hour the signal
            # persists, so the stop-loss reference follows the latest signal
            # price rather than the first fill - confirm this is intended.
            if traditional_signal and ml_signal:
                self.entry_price = binance_price
                algorithm.log(f"ML+Traditional BUY at {binance_price:.2f}")
                insights.append(Insight(self.binance, timedelta(days=30), InsightType.PRICE, InsightDirection.UP))
            elif not self.ml_trained and traditional_signal:
                # Fallback to traditional only if ML not ready
                self.entry_price = binance_price
                algorithm.log(f"Traditional BUY at {binance_price:.2f}")
                insights.append(Insight(self.binance, timedelta(days=30), InsightType.PRICE, InsightDirection.UP))

            # Exit
            if algorithm.portfolio[self.binance].invested and not (positive_momentum or ml_signal):
                algorithm.insights.cancel([self.binance])
                algorithm.log("EXIT signal")
                self.entry_price = None

        # Arbitrage
        spread = abs(binance_price - coinbase_price) / min(binance_price, coinbase_price)
        if spread > self.spread_threshold and not algorithm.portfolio[self.binance].invested:
            self.arbitrage_active = True
            if binance_price < coinbase_price:
                insights.append(Insight(self.binance, timedelta(hours=24), InsightType.PRICE, InsightDirection.UP))
                insights.append(Insight(self.coinbase, timedelta(hours=24), InsightType.PRICE, InsightDirection.DOWN))
                algorithm.log(f"ARB: Buy Binance, Sell Coinbase {spread:.2%}")
            else:
                insights.append(Insight(self.binance, timedelta(hours=24), InsightType.PRICE, InsightDirection.DOWN))
                insights.append(Insight(self.coinbase, timedelta(hours=24), InsightType.PRICE, InsightDirection.UP))
                algorithm.log(f"ARB: Buy Coinbase, Sell Binance {spread:.2%}")
        elif self.arbitrage_active and spread < self.spread_threshold / 2:
            algorithm.insights.cancel([self.binance, self.coinbase])
            algorithm.log(f"ARB exit {spread:.2%}")
            self.arbitrage_active = False

        return insights


from AlgorithmImports import *
from alpha import HybridAlpha # Ensure this matches your alpha.py file
class MyAlgorithm(QCAlgorithm):
    """Backtest harness that runs HybridAlpha on BTC quoted on Binance and Coinbase."""

    def initialize(self):
        """Set the backtest window, cash, securities, reality models and framework models."""
        self.set_start_date(2018, 1, 1)
        self.set_end_date(2025, 8, 1)
        self.set_cash(100000)

        # Pre-load 30 days of data before trading starts.
        self.set_warm_up(timedelta(days=30))

        # Confirms the alpha module import worked.
        self.debug("Imported HybridAlpha successfully")

        # BTC only, one subscription per exchange.
        binance_btc = self.add_crypto("BTCUSDT", Resolution.MINUTE, Market.BINANCE)
        coinbase_btc = self.add_crypto("BTCUSD", Resolution.MINUTE, Market.COINBASE)

        # Per-exchange fee rate: Binance ~0.1% (base tier, adjust for BNB
        # discounts), Coinbase ~0.6% (mid tier, adjust for volume tier).
        # Both get 0.01% constant slippage.
        fee_rates = (
            (binance_btc, 0.001),
            (coinbase_btc, 0.006),
        )
        for security, rate in fee_rates:
            security.fee_model = CustomFeeModel(self, rate)
            security.slippage_model = ConstantSlippageModel(0.0001)

        # Framework models: the hybrid alpha, equal weighting, immediate
        # execution, and a 5% per-security drawdown cap.
        self.add_alpha(HybridAlpha(self, binance_btc.symbol, coinbase_btc.symbol))
        self.set_portfolio_construction(EqualWeightingPortfolioConstructionModel())
        self.set_execution(ImmediateExecutionModel())
        self.set_risk_management(MaximumDrawdownPercentPerSecurity(0.05))

        # Default brokerage, chosen for mixed-market compatibility.
        self.set_brokerage_model(BrokerageName.DEFAULT)

        self.set_benchmark("BTCUSDT")

        # Note: trading Binance and Coinbase from one algorithm is not
        # supported live; live deployment needs one algorithm per brokerage.

    def on_warmup_finished(self):
        """Announce that the warm-up period is over."""
        self.debug("Warm-up period finished - ready to trade")

    def on_data(self, data):
        """Intentionally empty; all trading decisions come from the alpha model."""
class CustomFeeModel(FeeModel):
    """Fee model charging a fixed fraction of the traded value."""

    def __init__(self, algorithm, fee_percent):
        """Store the fee rate.

        Args:
            algorithm: The owning algorithm (kept for callers; not used in the
                fee calculation).
            fee_percent: Fee as a fraction of order value, e.g. 0.001 for 0.1%.
        """
        self.algorithm = algorithm
        self.fee_percent = fee_percent

    def get_order_fee(self, parameters):
        """Return the fee for an order as ``fee_percent`` of its value.

        Args:
            parameters: Fee parameters carrying ``order`` and ``security``.

        Returns:
            An ``OrderFee`` denominated in the security's quote currency.
        """
        order = parameters.order
        security = parameters.security
        # NOTE(review): order.price appears not to be populated for market
        # orders when the fee is computed (LEAN's built-in fee models price
        # them from the security instead), which would make the fee zero.
        # Use the order price when it is set, otherwise the security's current
        # price.
        unit_price = order.price if order.price > 0 else security.price
        order_value = order.absolute_quantity * unit_price
        # The value is expressed in the pair's quote currency (USDT for
        # BTCUSDT), so charge the fee in that currency rather than a
        # hard-coded "USD".
        fee_currency = security.quote_currency.symbol
        return OrderFee(CashAmount(order_value * self.fee_percent, fee_currency))


from AlgorithmImports import *
class HybridAlpha(AlphaModel):
    """Rule-based BTC alpha: volume/volatility/momentum entries plus a cross-exchange spread signal.

    Directional leg: go long the Binance symbol when the latest daily bar
    shows a volume surge, elevated ATR and positive momentum; flatten on a
    stop-loss or when both volume surge and momentum have disappeared.

    Spread leg: when the relative price gap between the two symbols exceeds
    ``spread_threshold`` and neither symbol is held, emit opposite insights on
    the two symbols; flatten both once the gap drops below half the threshold.

    Indicator state is built from daily bars of the Binance symbol. Rolling
    windows are indexed newest-first (index 0 is the most recent value).
    """

    def __init__(self, algorithm, binance_symbol, coinbase_symbol, stop_loss_percentage=0.03):
        """Create the model, register daily consolidators and warm them up.

        Args:
            algorithm: The running algorithm; used for history, logging and
                consolidator registration.
            binance_symbol: Symbol whose daily bars drive the indicators.
            coinbase_symbol: Symbol used for the cross-exchange spread.
            stop_loss_percentage: Fractional drop below the recorded entry
                price that triggers a flat insight.
        """
        self.algorithm = algorithm
        self.binance_symbol = binance_symbol
        self.coinbase_symbol = coinbase_symbol
        self.stop_loss_percentage = stop_loss_percentage
        # datetime.min guarantees the first update is never throttled.
        self.last_signal_time = datetime.min
        self.entry_price = None
        self.arbitrage_active = False

        self.volume_period = 10
        self.atr_period = 14
        self.momentum_period = 5
        self.atr_sma_period = 10
        self.window_size = max(self.volume_period, self.atr_period, self.momentum_period)

        # Element types are stated explicitly, matching how RollingWindow is
        # declared elsewhere in this project.
        self.volume_window = RollingWindow[float](self.volume_period)
        self.atr_window = RollingWindow[float](self.atr_period)
        self.atr_window_sma = RollingWindow[float](self.atr_sma_period)
        self.window = RollingWindow[TradeBar](self.window_size)

        self.volume_multiplier = 1.2
        self.atr_multiplier = 1.1
        self.spread_threshold = 0.015

        # Consolidators for both symbols
        self.consolidators = {}
        for symbol in [self.binance_symbol, self.coinbase_symbol]:
            consolidator = TradeBarConsolidator(timedelta(days=1))
            # Bind the symbol as a default argument so each handler keeps its
            # own symbol instead of the loop's last value.
            consolidator.data_consolidated += (
                lambda sender, bar, sym=symbol: self.on_consolidated(sym, bar)
            )
            self.consolidators[symbol] = consolidator
            algorithm.subscription_manager.add_consolidator(symbol, consolidator)

        # Warmup with daily historical data. Besides filling the bar window,
        # enough extra bars are replayed for the ATR SMA window to fill;
        # otherwise the volatility filter stays False until atr_sma_period
        # further live days have passed.
        warmup_bars = self.window_size + self.atr_sma_period + 1
        for symbol in [self.binance_symbol, self.coinbase_symbol]:
            history = algorithm.history[TradeBar](symbol, warmup_bars, Resolution.DAILY)
            bar_found = False
            for bar in history:
                self.consolidators[symbol].update(bar)
                bar_found = True
            if not bar_found:
                algorithm.log(f"Warning: No historical data for {symbol}")

    def on_consolidated(self, symbol, bar):
        """Update the indicator windows from a finished daily bar.

        Only bars of the Binance symbol are used; bars of any other symbol are
        ignored.
        """
        if symbol != self.binance_symbol:
            return

        self.window.add(bar)
        self.algorithm.log(f"Updated window for {symbol}, count: {self.window.count}")

        if self.window.count >= self.volume_period:
            volumes = [self.window[i].volume for i in range(self.volume_period)]
            avg_volume = sum(volumes) / self.volume_period if volumes else 0
            self.volume_window.add(avg_volume)
            self.algorithm.log(f"Volume SMA for {symbol}: {avg_volume:.0f}")

        if self.window.count >= 2:
            prev_close = self.window[1].close
            tr = max(
                bar.high - bar.low,
                abs(bar.high - prev_close),
                abs(prev_close - bar.low),
            )
            self.atr_window.add(tr)
            if self.atr_window.count >= self.atr_period:
                atr = sum(self.atr_window) / self.atr_period
                self.atr_window_sma.add(atr)
                self.algorithm.log(f"ATR SMA for {symbol}: {atr:.2f}")

    def update(self, algorithm, data):
        """Produce insights for the current slice, at most once every 15 minutes.

        Args:
            algorithm: The running algorithm (prices, portfolio, logging).
            data: Current slice; both symbols must be present.

        Returns:
            A list of insights (possibly empty). A directional stop-loss
            returns immediately with a single flat insight, skipping the
            spread leg for that call.
        """
        insights = []

        if not (data.contains_key(self.binance_symbol) and data.contains_key(self.coinbase_symbol)):
            algorithm.log(f"Missing data for {self.binance_symbol} or {self.coinbase_symbol}")
            return insights

        if self.window.count < self.window_size:
            algorithm.log(
                f"Insufficient window data for {self.binance_symbol}: "
                f"{self.window.count}/{self.window_size}"
            )
            return insights

        if (algorithm.time - self.last_signal_time).total_seconds() / 60 < 15:
            return insights
        self.last_signal_time = algorithm.time

        binance_price = algorithm.securities[self.binance_symbol].price
        coinbase_price = algorithm.securities[self.coinbase_symbol].price
        if binance_price == 0 or coinbase_price == 0:
            algorithm.log(f"Zero price detected for {self.binance_symbol} or {self.coinbase_symbol}")
            return insights

        bar = self.window[0]

        if not self.arbitrage_active:
            # Stop-loss on the directional position.
            if self.entry_price is not None and algorithm.portfolio[self.binance_symbol].quantity > 0:
                stop_loss_price = self.entry_price * (1 - self.stop_loss_percentage)
                if binance_price <= stop_loss_price:
                    insights.append(
                        Insight(self.binance_symbol, timedelta(hours=1), InsightType.PRICE, InsightDirection.FLAT)
                    )
                    algorithm.log(f"Directional stop-loss for {self.binance_symbol} at {stop_loss_price:.2f}")
                    self.entry_price = None
                    return insights

            # Volume: latest daily bar against the rolling average.
            current_volume = bar.volume
            avg_volume = self.volume_window[0] if self.volume_window.count > 0 else 0
            volume_surge = current_volume > avg_volume * self.volume_multiplier if avg_volume > 0 else False

            # Volatility: current ATR against the average of recent ATR values.
            current_atr = (
                sum(self.atr_window) / self.atr_period
                if self.atr_window.count >= self.atr_period
                else 0
            )
            avg_atr = (
                sum(self.atr_window_sma) / self.atr_sma_period
                if self.atr_window_sma.count >= self.atr_sma_period
                else 0
            )
            high_volatility = current_atr > avg_atr * self.atr_multiplier if avg_atr > 0 else False

            # Momentum: live price against the close momentum_period bars back.
            if self.window.count >= self.momentum_period + 1:
                past_price = self.window[self.momentum_period].close
                momentum = (binance_price - past_price) / past_price if past_price != 0 else 0
            else:
                momentum = 0
            positive_momentum = momentum > 0

            algorithm.log(
                f"{self.binance_symbol}: Volume={current_volume:.0f}, AvgVol={avg_volume:.0f}, "
                f"VolumeSurge={volume_surge}, ATR={current_atr:.2f}, AvgATR={avg_atr:.2f}, "
                f"HighVol={high_volatility}, Momentum={momentum:.2%}, PositiveMom={positive_momentum}"
            )

            if volume_surge and high_volatility and positive_momentum:
                self.entry_price = binance_price
                algorithm.log(f"Directional buy for {self.binance_symbol}: Volume surge, high volatility, positive momentum")
                insights.append(
                    Insight(self.binance_symbol, timedelta(days=30), InsightType.PRICE, InsightDirection.UP)
                )

            if algorithm.portfolio[self.binance_symbol].invested and not (volume_surge or positive_momentum):
                insights.append(
                    Insight(self.binance_symbol, timedelta(hours=1), InsightType.PRICE, InsightDirection.FLAT)
                )
                algorithm.log(f"Directional sell for {self.binance_symbol}: Conditions weakened")
                self.entry_price = None

        # Cross-exchange spread, relative to the cheaper venue.
        spread = (
            abs(binance_price - coinbase_price) / min(binance_price, coinbase_price)
            if min(binance_price, coinbase_price) > 0
            else 0
        )
        algorithm.log(f"Arbitrage {self.binance_symbol}/{self.coinbase_symbol}: Spread={spread:.2%}, Threshold={self.spread_threshold:.2%}")

        any_position = (
            algorithm.portfolio[self.binance_symbol].invested
            or algorithm.portfolio[self.coinbase_symbol].invested
        )
        if spread > self.spread_threshold and not any_position:
            self.arbitrage_active = True
            if binance_price < coinbase_price:
                insights.append(
                    Insight(self.binance_symbol, timedelta(hours=24), InsightType.PRICE, InsightDirection.UP)
                )
                insights.append(
                    Insight(self.coinbase_symbol, timedelta(hours=24), InsightType.PRICE, InsightDirection.DOWN)
                )
                algorithm.log(
                    f"Arbitrage: Buy {self.binance_symbol} ({binance_price:.2f}), Sell {self.coinbase_symbol} ({coinbase_price:.2f}), Spread {spread:.2%}"
                )
            else:
                insights.append(
                    Insight(self.binance_symbol, timedelta(hours=24), InsightType.PRICE, InsightDirection.DOWN)
                )
                insights.append(
                    Insight(self.coinbase_symbol, timedelta(hours=24), InsightType.PRICE, InsightDirection.UP)
                )
                algorithm.log(
                    f"Arbitrage: Buy {self.coinbase_symbol} ({coinbase_price:.2f}), Sell {self.binance_symbol} ({binance_price:.2f}), Spread {spread:.2%}"
                )
        elif self.arbitrage_active and spread < self.spread_threshold / 2:
            insights.append(
                Insight(self.binance_symbol, timedelta(hours=1), InsightType.PRICE, InsightDirection.FLAT)
            )
            insights.append(
                Insight(self.coinbase_symbol, timedelta(hours=1), InsightType.PRICE, InsightDirection.FLAT)
            )
            algorithm.log(
                f"Arbitrage exit for {self.binance_symbol}/{self.coinbase_symbol}: Spread narrowed to {spread:.2%}"
            )
            self.arbitrage_active = False

        return insights