| Overall Statistics |
|
Total Orders 23 Average Win 8.51% Average Loss -4.31% Compounding Annual Return 33.727% Drawdown 17.000% Expectancy 0.891 Start Equity 100000.00 End Equity 179225.91 Net Profit 79.226% Sharpe Ratio 0.936 Sortino Ratio 1.234 Probabilistic Sharpe Ratio 54.561% Loss Rate 36% Win Rate 64% Profit-Loss Ratio 1.97 Alpha 0.139 Beta 0.458 Annual Standard Deviation 0.209 Annual Variance 0.044 Information Ratio 0.346 Tracking Error 0.211 Treynor Ratio 0.427 Total Fees $0.00 Estimated Strategy Capacity $3700000.00 Lowest Capacity Asset BTCUSD 2XR Portfolio Turnover 2.33% Drawdown Recovery 169 |
# region imports
from AlgorithmImports import *
import numpy as np
import pandas as pd
# endregion
class BTC_ETHPairsTrading(QCAlgorithm):
def Initialize(self):
"""Initialize the algorithm"""
# Set backtest period to match research data (2023-2025)
self.SetStartDate(2023, 1, 1)
self.SetEndDate(2025, 1, 1)
self.SetCash(100000)
# Add cryptocurrency symbols using proper QuantConnect syntax
self.btc = self.AddCrypto("BTCUSD", Resolution.Daily).Symbol
self.eth = self.AddCrypto("ETHUSD", Resolution.Daily).Symbol
# Initialize rolling windows for price history (store prices as float to avoid type issues)
self.btc_price_window = RollingWindow[float](60) # 60 days of BTC prices
self.eth_price_window = RollingWindow[float](60) # 60 days of ETH prices
self.ratio_window = RollingWindow[float](60) # 60 days of ratio data
# Technical indicators using QuantConnect's built-in indicators
self.btc_sma_5 = self.SMA(self.btc, 5)
self.btc_sma_10 = self.SMA(self.btc, 10)
self.btc_sma_20 = self.SMA(self.btc, 20)
self.btc_sma_50 = self.SMA(self.btc, 50)
self.eth_sma_5 = self.SMA(self.eth, 5)
self.eth_sma_10 = self.SMA(self.eth, 10)
self.eth_sma_20 = self.SMA(self.eth, 20)
self.eth_sma_50 = self.SMA(self.eth, 50)
# Bollinger Bands for ratio (we'll calculate manually)
self.ratio_sma_20 = SimpleMovingAverage(20)
self.ratio_std_20 = StandardDeviation(20)
# Strategy state
self.in_btc = False
self.position_size = 0.0
# Feature storage for analysis
self.feature_history = []
# Warm up period to ensure indicators are ready
self.SetWarmUp(60, Resolution.Daily)
# Schedule feature calculation daily after market close
self.Schedule.On(self.DateRules.EveryDay(self.btc),
self.TimeRules.BeforeMarketClose(self.btc, 30),
self.CalculateFeatures)
def OnData(self, data):
"""Main trading logic"""
# Skip during warm-up
if self.IsWarmingUp:
return
# Ensure we have data for both symbols
if not (data.ContainsKey(self.btc) and data.ContainsKey(self.eth)):
return
# Get current prices - handle both TradeBar and QuoteBar robustly
btc_bar = data[self.btc]
eth_bar = data[self.eth]
# Extract close prices - try multiple properties to handle different data types
btc_price = None
if hasattr(btc_bar, 'Close'):
btc_price = btc_bar.Close
elif hasattr(btc_bar, 'Value'):
btc_price = btc_bar.Value
elif hasattr(btc_bar, 'Price'):
btc_price = btc_bar.Price
eth_price = None
if hasattr(eth_bar, 'Close'):
eth_price = eth_bar.Close
elif hasattr(eth_bar, 'Value'):
eth_price = eth_bar.Value
elif hasattr(eth_bar, 'Price'):
eth_price = eth_bar.Price
if btc_price is None or eth_price is None or btc_price <= 0 or eth_price <= 0:
return
# Calculate current BTC/ETH ratio
current_ratio = btc_price / eth_price
# Update rolling windows with prices (no type issues)
self.btc_price_window.Add(float(btc_price))
self.eth_price_window.Add(float(eth_price))
self.ratio_window.Add(float(current_ratio))
# Update ratio indicators
self.ratio_sma_20.Update(IndicatorDataPoint(data.Time, current_ratio))
self.ratio_std_20.Update(IndicatorDataPoint(data.Time, current_ratio))
# Only trade if we have enough data
if not self.ratio_sma_20.IsReady or not self.ratio_std_20.IsReady:
return
# Calculate features for ML scoring
features = self.GetCurrentFeatures(current_ratio, btc_price, eth_price)
if features is None:
return
# Calculate ML scores
signal_score = self.CalculateSignalScore(features)
exit_score = self.CalculateExitScore(features)
# Store features for leak analysis
self.feature_history.append({
'Time': data.Time,
'Ratio': current_ratio,
'SignalScore': signal_score,
'ExitScore': exit_score,
'Features': features.copy()
})
# Dynamic position sizing based on signal strength
if signal_score > 0.8:
position_size = 1.0
elif signal_score > 0.7:
position_size = 0.8
elif signal_score > 0.6:
position_size = 0.6
else:
position_size = 0.4
# Entry logic - high signal confidence
entry_threshold = 0.6
if signal_score > entry_threshold and not self.in_btc:
# Calculate position size in dollars
portfolio_value = self.Portfolio.TotalPortfolioValue
position_value = portfolio_value * position_size
btc_quantity = position_value / btc_price
# Execute buy order
self.MarketOrder(self.btc, btc_quantity)
self.in_btc = True
self.position_size = position_size
self.Debug(f"BUY BTC: Ratio={current_ratio:.4f}, Signal={signal_score:.3f}, Size={position_size:.2f}")
# Exit logic - low confidence or high exit signal
exit_threshold = 0.4
if self.in_btc and (exit_score > exit_threshold or signal_score < 0.3):
# Close BTC position
self.Liquidate(self.btc)
self.in_btc = False
self.position_size = 0.0
self.Debug(f"SELL BTC: Ratio={current_ratio:.4f}, Signal={signal_score:.3f}, Exit={exit_score:.3f}")
def GetCurrentFeatures(self, current_ratio, btc_price, eth_price):
"""Calculate features avoiding lookahead bias"""
if self.ratio_window.Count < 50: # Need enough history
return None
features = {}
# 1. Moving average ratios (using QuantConnect indicators)
if self.btc_sma_5.IsReady and self.eth_sma_5.IsReady:
ratio_ma_5 = self.btc_sma_5.Current.Value / self.eth_sma_5.Current.Value
features['MA_Ratio_5'] = current_ratio / ratio_ma_5
if self.btc_sma_10.IsReady and self.eth_sma_10.IsReady:
ratio_ma_10 = self.btc_sma_10.Current.Value / self.eth_sma_10.Current.Value
features['MA_Ratio_10'] = current_ratio / ratio_ma_10
if self.btc_sma_20.IsReady and self.eth_sma_20.IsReady:
ratio_ma_20 = self.btc_sma_20.Current.Value / self.eth_sma_20.Current.Value
features['MA_Ratio_20'] = current_ratio / ratio_ma_20
if self.btc_sma_50.IsReady and self.eth_sma_50.IsReady:
ratio_ma_50 = self.btc_sma_50.Current.Value / self.eth_sma_50.Current.Value
features['MA_Ratio_50'] = current_ratio / ratio_ma_50
# 2. Momentum features
if 'MA_Ratio_5' in features and 'MA_Ratio_20' in features:
ma_5 = self.btc_sma_5.Current.Value / self.eth_sma_5.Current.Value
ma_20 = self.btc_sma_20.Current.Value / self.eth_sma_20.Current.Value
features['Momentum_5_20'] = (ma_5 - ma_20) / ma_20 if ma_20 > 0 else 0
if 'MA_Ratio_10' in features and 'MA_Ratio_50' in features:
ma_10 = self.btc_sma_10.Current.Value / self.eth_sma_10.Current.Value
ma_50 = self.btc_sma_50.Current.Value / self.eth_sma_50.Current.Value
features['Momentum_10_50'] = (ma_10 - ma_50) / ma_50 if ma_50 > 0 else 0
# 3. Volatility features using price windows
if self.ratio_window.Count >= 20:
recent_ratios = [self.ratio_window[i] for i in range(min(20, self.ratio_window.Count))]
recent_returns = [recent_ratios[i] / recent_ratios[i+1] - 1 for i in range(len(recent_ratios)-1)]
if len(recent_returns) >= 5:
vol_5 = np.std(recent_returns[:5])
vol_20 = np.std(recent_returns) if len(recent_returns) >= 20 else vol_5
features['Volatility_5'] = vol_5
features['Volatility_20'] = vol_20
features['Vol_Ratio'] = vol_5 / vol_20 if vol_20 > 0 else 1.0
# 4. Trend strength
if 'Momentum_10_50' in features:
features['Trend_Strength'] = abs(features['Momentum_10_50'])
# 5. Price position in range using price windows
if self.ratio_window.Count >= 20:
recent_ratios = [self.ratio_window[i] for i in range(min(20, self.ratio_window.Count))]
min_ratio = min(recent_ratios)
max_ratio = max(recent_ratios)
range_20 = max_ratio - min_ratio
features['Price_Position'] = (current_ratio - min_ratio) / range_20 if range_20 > 0 else 0.5
# 6. Bollinger Band position
if self.ratio_sma_20.IsReady and self.ratio_std_20.IsReady:
bb_middle = self.ratio_sma_20.Current.Value
bb_std = self.ratio_std_20.Current.Value
bb_upper = bb_middle + (2 * bb_std)
bb_lower = bb_middle - (2 * bb_std)
features['BB_Position'] = (current_ratio - bb_lower) / (bb_upper - bb_lower) if (bb_upper - bb_lower) > 0 else 0.5
# 7. Statistical features using price windows
if self.ratio_window.Count >= 20:
recent_ratios = [self.ratio_window[i] for i in range(min(20, self.ratio_window.Count))]
recent_returns = [recent_ratios[i] / recent_ratios[i+1] - 1 for i in range(len(recent_ratios)-1)]
if len(recent_returns) >= 10:
features['Skewness'] = self.CalculateSkewness(recent_returns)
features['Kurtosis'] = self.CalculateKurtosis(recent_returns)
return features
def CalculateSignalScore(self, features):
"""Calculate ensemble signal score - same logic as research notebook"""
score = 0
# Momentum signals (weight: 30%)
if features.get('Momentum_5_20', 0) > 0.02:
score += 0.15
if features.get('Momentum_10_50', 0) > 0.01:
score += 0.15
# Trend strength (weight: 20%)
if features.get('Trend_Strength', 0) > 0.02:
score += 0.20
# Price position signals (weight: 15%)
price_pos = features.get('Price_Position', 0.5)
if 0.2 < price_pos < 0.8: # Not at extremes
score += 0.075
bb_pos = features.get('BB_Position', 0.5)
if bb_pos < 0.3: # Near lower Bollinger Band
score += 0.075
# Volatility signals (weight: 15%)
vol_ratio = features.get('Vol_Ratio', 1.0)
if 0.8 < vol_ratio < 1.2: # Stable volatility
score += 0.075
vol_20 = features.get('Volatility_20', 0)
# Simplified volatility regime detection
if vol_20 < 0.02: # Low volatility
score += 0.075
# Statistical signals (weight: 10%)
skewness = features.get('Skewness', 0)
if -0.5 < skewness < 0.5: # Normal distribution
score += 0.05
kurtosis = features.get('Kurtosis', 3)
if kurtosis < 3: # Not too heavy tails
score += 0.05
# MA signals (weight: 10%)
ma_ratio_5 = features.get('MA_Ratio_5', 1.0)
ma_ratio_10 = features.get('MA_Ratio_10', 1.0)
if ma_ratio_5 > 1.0 and ma_ratio_10 > 1.0:
score += 0.10
return score
def CalculateExitScore(self, features):
"""Calculate exit signal score - same logic as research notebook"""
exit_score = 0
# Exit signals
if features.get('Momentum_5_20', 0) < -0.01:
exit_score += 0.3
if features.get('BB_Position', 0.5) > 0.8: # Near upper Bollinger Band
exit_score += 0.2
if features.get('Vol_Ratio', 1.0) > 1.5: # Volatility spike
exit_score += 0.2
if features.get('Trend_Strength', 0) < 0.005: # Weak trend
exit_score += 0.3
return exit_score
def CalculateFeatures(self):
"""Scheduled feature calculation for analysis"""
if len(self.feature_history) > 0:
latest = self.feature_history[-1]
self.Debug(f"Latest Signal Score: {latest['SignalScore']:.3f}, Ratio: {latest['Ratio']:.4f}")
def CalculateSkewness(self, data):
"""Calculate skewness of data"""
if len(data) < 3:
return 0
return float(np.mean([((x - np.mean(data)) / np.std(data)) ** 3 for x in data])) if np.std(data) > 0 else 0
def CalculateKurtosis(self, data):
"""Calculate kurtosis of data"""
if len(data) < 4:
return 3
return float(np.mean([((x - np.mean(data)) / np.std(data)) ** 4 for x in data]) - 3) if np.std(data) > 0 else 0
def OnEndOfAlgorithm(self):
"""Analysis at end of backtest"""
self.Debug(f"Algorithm completed. Total feature records: {len(self.feature_history)}")
self.Debug(f"Final portfolio value: ${self.Portfolio.TotalPortfolioValue:,.2f}")
if len(self.feature_history) > 0:
# Calculate some basic statistics
signal_scores = [f['SignalScore'] for f in self.feature_history]
avg_signal = np.mean(signal_scores)
max_signal = max(signal_scores)
self.Debug(f"Average signal score: {avg_signal:.3f}, Max: {max_signal:.3f}")
# Log data for leak analysis
first_record = self.feature_history[0]
last_record = self.feature_history[-1]
self.Debug(f"First record: {first_record['Time']}, Last: {last_record['Time']}")
self.Debug(f"Strategy used {len([f for f in self.feature_history if f['SignalScore'] > 0.6])} high-confidence signals")