Overall Statistics
Total Orders
23
Average Win
8.51%
Average Loss
-4.31%
Compounding Annual Return
33.727%
Drawdown
17.000%
Expectancy
0.891
Start Equity
100000.00
End Equity
179225.91
Net Profit
79.226%
Sharpe Ratio
0.936
Sortino Ratio
1.234
Probabilistic Sharpe Ratio
54.561%
Loss Rate
36%
Win Rate
64%
Profit-Loss Ratio
1.97
Alpha
0.139
Beta
0.458
Annual Standard Deviation
0.209
Annual Variance
0.044
Information Ratio
0.346
Tracking Error
0.211
Treynor Ratio
0.427
Total Fees
$0.00
Estimated Strategy Capacity
$3700000.00
Lowest Capacity Asset
BTCUSD 2XR
Portfolio Turnover
2.33%
Drawdown Recovery
169
# region imports
from AlgorithmImports import *
import numpy as np
import pandas as pd
# endregion

class BTC_ETHPairsTrading(QCAlgorithm):

    def Initialize(self):
        """Initialize the algorithm"""
        # Set backtest period to match research data (2023-2025)
        self.SetStartDate(2023, 1, 1)
        self.SetEndDate(2025, 1, 1)
        self.SetCash(100000)
        
        # Add cryptocurrency symbols using proper QuantConnect syntax
        self.btc = self.AddCrypto("BTCUSD", Resolution.Daily).Symbol
        self.eth = self.AddCrypto("ETHUSD", Resolution.Daily).Symbol
        
        # Initialize rolling windows for price history (store prices as float to avoid type issues)
        self.btc_price_window = RollingWindow[float](60)  # 60 days of BTC prices
        self.eth_price_window = RollingWindow[float](60)  # 60 days of ETH prices
        self.ratio_window = RollingWindow[float](60)      # 60 days of ratio data
        
        # Technical indicators using QuantConnect's built-in indicators
        self.btc_sma_5 = self.SMA(self.btc, 5)
        self.btc_sma_10 = self.SMA(self.btc, 10)
        self.btc_sma_20 = self.SMA(self.btc, 20)
        self.btc_sma_50 = self.SMA(self.btc, 50)
        
        self.eth_sma_5 = self.SMA(self.eth, 5)
        self.eth_sma_10 = self.SMA(self.eth, 10)
        self.eth_sma_20 = self.SMA(self.eth, 20)
        self.eth_sma_50 = self.SMA(self.eth, 50)
        
        # Bollinger Bands for ratio (we'll calculate manually)
        self.ratio_sma_20 = SimpleMovingAverage(20)
        self.ratio_std_20 = StandardDeviation(20)
        
        # Strategy state
        self.in_btc = False
        self.position_size = 0.0
        
        # Feature storage for analysis
        self.feature_history = []
        
        # Warm up period to ensure indicators are ready
        self.SetWarmUp(60, Resolution.Daily)
        
        # Schedule feature calculation daily after market close
        self.Schedule.On(self.DateRules.EveryDay(self.btc), 
                        self.TimeRules.BeforeMarketClose(self.btc, 30), 
                        self.CalculateFeatures)

    def OnData(self, data):
        """Main trading logic"""
        # Skip during warm-up
        if self.IsWarmingUp:
            return
            
        # Ensure we have data for both symbols
        if not (data.ContainsKey(self.btc) and data.ContainsKey(self.eth)):
            return
            
        # Get current prices - handle both TradeBar and QuoteBar robustly
        btc_bar = data[self.btc]
        eth_bar = data[self.eth]
        
        # Extract close prices - try multiple properties to handle different data types
        btc_price = None
        if hasattr(btc_bar, 'Close'):
            btc_price = btc_bar.Close
        elif hasattr(btc_bar, 'Value'):
            btc_price = btc_bar.Value
        elif hasattr(btc_bar, 'Price'):
            btc_price = btc_bar.Price
            
        eth_price = None
        if hasattr(eth_bar, 'Close'):
            eth_price = eth_bar.Close
        elif hasattr(eth_bar, 'Value'):
            eth_price = eth_bar.Value
        elif hasattr(eth_bar, 'Price'):
            eth_price = eth_bar.Price
        
        if btc_price is None or eth_price is None or btc_price <= 0 or eth_price <= 0:
            return
            
        # Calculate current BTC/ETH ratio
        current_ratio = btc_price / eth_price
        
        # Update rolling windows with prices (no type issues)
        self.btc_price_window.Add(float(btc_price))
        self.eth_price_window.Add(float(eth_price))
        self.ratio_window.Add(float(current_ratio))
        
        # Update ratio indicators
        self.ratio_sma_20.Update(IndicatorDataPoint(data.Time, current_ratio))
        self.ratio_std_20.Update(IndicatorDataPoint(data.Time, current_ratio))
        
        # Only trade if we have enough data
        if not self.ratio_sma_20.IsReady or not self.ratio_std_20.IsReady:
            return
            
        # Calculate features for ML scoring
        features = self.GetCurrentFeatures(current_ratio, btc_price, eth_price)
        
        if features is None:
            return
            
        # Calculate ML scores
        signal_score = self.CalculateSignalScore(features)
        exit_score = self.CalculateExitScore(features)
        
        # Store features for leak analysis
        self.feature_history.append({
            'Time': data.Time,
            'Ratio': current_ratio,
            'SignalScore': signal_score,
            'ExitScore': exit_score,
            'Features': features.copy()
        })
        
        # Dynamic position sizing based on signal strength
        if signal_score > 0.8:
            position_size = 1.0
        elif signal_score > 0.7:
            position_size = 0.8
        elif signal_score > 0.6:
            position_size = 0.6
        else:
            position_size = 0.4
            
        # Entry logic - high signal confidence
        entry_threshold = 0.6
        if signal_score > entry_threshold and not self.in_btc:
            # Calculate position size in dollars
            portfolio_value = self.Portfolio.TotalPortfolioValue
            position_value = portfolio_value * position_size
            btc_quantity = position_value / btc_price
            
            # Execute buy order
            self.MarketOrder(self.btc, btc_quantity)
            self.in_btc = True
            self.position_size = position_size
            
            self.Debug(f"BUY BTC: Ratio={current_ratio:.4f}, Signal={signal_score:.3f}, Size={position_size:.2f}")
            
        # Exit logic - low confidence or high exit signal
        exit_threshold = 0.4
        if self.in_btc and (exit_score > exit_threshold or signal_score < 0.3):
            # Close BTC position
            self.Liquidate(self.btc)
            self.in_btc = False
            self.position_size = 0.0
            
            self.Debug(f"SELL BTC: Ratio={current_ratio:.4f}, Signal={signal_score:.3f}, Exit={exit_score:.3f}")

    def GetCurrentFeatures(self, current_ratio, btc_price, eth_price):
        """Calculate features avoiding lookahead bias"""
        if self.ratio_window.Count < 50:  # Need enough history
            return None
            
        features = {}
        
        # 1. Moving average ratios (using QuantConnect indicators)
        if self.btc_sma_5.IsReady and self.eth_sma_5.IsReady:
            ratio_ma_5 = self.btc_sma_5.Current.Value / self.eth_sma_5.Current.Value
            features['MA_Ratio_5'] = current_ratio / ratio_ma_5
            
        if self.btc_sma_10.IsReady and self.eth_sma_10.IsReady:
            ratio_ma_10 = self.btc_sma_10.Current.Value / self.eth_sma_10.Current.Value
            features['MA_Ratio_10'] = current_ratio / ratio_ma_10
            
        if self.btc_sma_20.IsReady and self.eth_sma_20.IsReady:
            ratio_ma_20 = self.btc_sma_20.Current.Value / self.eth_sma_20.Current.Value
            features['MA_Ratio_20'] = current_ratio / ratio_ma_20
            
        if self.btc_sma_50.IsReady and self.eth_sma_50.IsReady:
            ratio_ma_50 = self.btc_sma_50.Current.Value / self.eth_sma_50.Current.Value
            features['MA_Ratio_50'] = current_ratio / ratio_ma_50
        
        # 2. Momentum features
        if 'MA_Ratio_5' in features and 'MA_Ratio_20' in features:
            ma_5 = self.btc_sma_5.Current.Value / self.eth_sma_5.Current.Value
            ma_20 = self.btc_sma_20.Current.Value / self.eth_sma_20.Current.Value
            features['Momentum_5_20'] = (ma_5 - ma_20) / ma_20 if ma_20 > 0 else 0
            
        if 'MA_Ratio_10' in features and 'MA_Ratio_50' in features:
            ma_10 = self.btc_sma_10.Current.Value / self.eth_sma_10.Current.Value
            ma_50 = self.btc_sma_50.Current.Value / self.eth_sma_50.Current.Value
            features['Momentum_10_50'] = (ma_10 - ma_50) / ma_50 if ma_50 > 0 else 0
        
        # 3. Volatility features using price windows
        if self.ratio_window.Count >= 20:
            recent_ratios = [self.ratio_window[i] for i in range(min(20, self.ratio_window.Count))]
            recent_returns = [recent_ratios[i] / recent_ratios[i+1] - 1 for i in range(len(recent_ratios)-1)]
            
            if len(recent_returns) >= 5:
                vol_5 = np.std(recent_returns[:5])
                vol_20 = np.std(recent_returns) if len(recent_returns) >= 20 else vol_5
                features['Volatility_5'] = vol_5
                features['Volatility_20'] = vol_20
                features['Vol_Ratio'] = vol_5 / vol_20 if vol_20 > 0 else 1.0
        
        # 4. Trend strength
        if 'Momentum_10_50' in features:
            features['Trend_Strength'] = abs(features['Momentum_10_50'])
        
        # 5. Price position in range using price windows
        if self.ratio_window.Count >= 20:
            recent_ratios = [self.ratio_window[i] for i in range(min(20, self.ratio_window.Count))]
            min_ratio = min(recent_ratios)
            max_ratio = max(recent_ratios)
            range_20 = max_ratio - min_ratio
            features['Price_Position'] = (current_ratio - min_ratio) / range_20 if range_20 > 0 else 0.5
        
        # 6. Bollinger Band position
        if self.ratio_sma_20.IsReady and self.ratio_std_20.IsReady:
            bb_middle = self.ratio_sma_20.Current.Value
            bb_std = self.ratio_std_20.Current.Value
            bb_upper = bb_middle + (2 * bb_std)
            bb_lower = bb_middle - (2 * bb_std)
            
            features['BB_Position'] = (current_ratio - bb_lower) / (bb_upper - bb_lower) if (bb_upper - bb_lower) > 0 else 0.5
        
        # 7. Statistical features using price windows
        if self.ratio_window.Count >= 20:
            recent_ratios = [self.ratio_window[i] for i in range(min(20, self.ratio_window.Count))]
            recent_returns = [recent_ratios[i] / recent_ratios[i+1] - 1 for i in range(len(recent_ratios)-1)]
            
            if len(recent_returns) >= 10:
                features['Skewness'] = self.CalculateSkewness(recent_returns)
                features['Kurtosis'] = self.CalculateKurtosis(recent_returns)
        
        return features

    def CalculateSignalScore(self, features):
        """Calculate ensemble signal score - same logic as research notebook"""
        score = 0
        
        # Momentum signals (weight: 30%)
        if features.get('Momentum_5_20', 0) > 0.02:
            score += 0.15
        if features.get('Momentum_10_50', 0) > 0.01:
            score += 0.15
        
        # Trend strength (weight: 20%)
        if features.get('Trend_Strength', 0) > 0.02:
            score += 0.20
        
        # Price position signals (weight: 15%)
        price_pos = features.get('Price_Position', 0.5)
        if 0.2 < price_pos < 0.8:  # Not at extremes
            score += 0.075
        bb_pos = features.get('BB_Position', 0.5)
        if bb_pos < 0.3:  # Near lower Bollinger Band
            score += 0.075
        
        # Volatility signals (weight: 15%)
        vol_ratio = features.get('Vol_Ratio', 1.0)
        if 0.8 < vol_ratio < 1.2:  # Stable volatility
            score += 0.075
        vol_20 = features.get('Volatility_20', 0)
        # Simplified volatility regime detection
        if vol_20 < 0.02:  # Low volatility
            score += 0.075
        
        # Statistical signals (weight: 10%)
        skewness = features.get('Skewness', 0)
        if -0.5 < skewness < 0.5:  # Normal distribution
            score += 0.05
        kurtosis = features.get('Kurtosis', 3)
        if kurtosis < 3:  # Not too heavy tails
            score += 0.05
        
        # MA signals (weight: 10%)
        ma_ratio_5 = features.get('MA_Ratio_5', 1.0)
        ma_ratio_10 = features.get('MA_Ratio_10', 1.0)
        if ma_ratio_5 > 1.0 and ma_ratio_10 > 1.0:
            score += 0.10
        
        return score

    def CalculateExitScore(self, features):
        """Calculate exit signal score - same logic as research notebook"""
        exit_score = 0
        
        # Exit signals
        if features.get('Momentum_5_20', 0) < -0.01:
            exit_score += 0.3
        if features.get('BB_Position', 0.5) > 0.8:  # Near upper Bollinger Band
            exit_score += 0.2
        if features.get('Vol_Ratio', 1.0) > 1.5:  # Volatility spike
            exit_score += 0.2
        if features.get('Trend_Strength', 0) < 0.005:  # Weak trend
            exit_score += 0.3
        
        return exit_score

    def CalculateFeatures(self):
        """Scheduled feature calculation for analysis"""
        if len(self.feature_history) > 0:
            latest = self.feature_history[-1]
            self.Debug(f"Latest Signal Score: {latest['SignalScore']:.3f}, Ratio: {latest['Ratio']:.4f}")

    def CalculateSkewness(self, data):
        """Calculate skewness of data"""
        if len(data) < 3:
            return 0
        return float(np.mean([((x - np.mean(data)) / np.std(data)) ** 3 for x in data])) if np.std(data) > 0 else 0

    def CalculateKurtosis(self, data):
        """Calculate kurtosis of data"""
        if len(data) < 4:
            return 3
        return float(np.mean([((x - np.mean(data)) / np.std(data)) ** 4 for x in data]) - 3) if np.std(data) > 0 else 0

    def OnEndOfAlgorithm(self):
        """Analysis at end of backtest"""
        self.Debug(f"Algorithm completed. Total feature records: {len(self.feature_history)}")
        self.Debug(f"Final portfolio value: ${self.Portfolio.TotalPortfolioValue:,.2f}")
        
        if len(self.feature_history) > 0:
            # Calculate some basic statistics
            signal_scores = [f['SignalScore'] for f in self.feature_history]
            avg_signal = np.mean(signal_scores)
            max_signal = max(signal_scores)
            self.Debug(f"Average signal score: {avg_signal:.3f}, Max: {max_signal:.3f}")
            
            # Log data for leak analysis
            first_record = self.feature_history[0]
            last_record = self.feature_history[-1]
            self.Debug(f"First record: {first_record['Time']}, Last: {last_record['Time']}")
            self.Debug(f"Strategy used {len([f for f in self.feature_history if f['SignalScore'] > 0.6])} high-confidence signals")