| Overall Statistics |
|
Total Orders 266 Average Win 0.78% Average Loss -0.48% Compounding Annual Return -1.433% Drawdown 10.900% Expectancy -0.131 Start Equity 30000.00 End Equity 27489.17 Net Profit -8.369% Sharpe Ratio -1.201 Sortino Ratio -1.121 Probabilistic Sharpe Ratio 0.006% Loss Rate 67% Win Rate 33% Profit-Loss Ratio 1.63 Alpha -0.038 Beta 0.007 Annual Standard Deviation 0.031 Annual Variance 0.001 Information Ratio -0.427 Tracking Error 0.378 Treynor Ratio -5.639 Total Fees $693.95 Estimated Strategy Capacity $7200000.00 Lowest Capacity Asset ETHUSDT 18N Portfolio Turnover 1.10% |
from AlgorithmImports import *
import numpy as np
import pandas as pd
from collections import defaultdict
from decimal import Decimal
import pickle
import base64
class Config:
def __init__(self, algorithm):
# Basic configuration
self.start_date = datetime(2019, 1, 1)
self.end_date = datetime(2025, 3, 31)
self.initial_cash = 30000
self.commission_rate = float(algorithm.GetParameter("commission_rate") or 0.001)
self.base_slippage = float(algorithm.GetParameter("slippage") or 0.0005)
self.benchmark_symbol = "BTCUSDT"
self.trading_symbol = "ETHUSDT"
self.exchange = algorithm.GetParameter("exchange", "binance")
self.resolution = Resolution.Hour
# Indicator parameters - Adjusted for better signal quality
self.ema_short = 12
self.ema_medium = 50
self.ema_long = 200
self.rsi_period = 14
self.atr_period = 14
self.macd_fast = 8
self.macd_slow = 21
self.macd_signal = 9
# Risk management parameters - Significantly improved
self.stop_loss_pct = 0.12 # Wider stop loss to avoid premature exits
self.take_profit_pct = 0.12 # Higher profit target
self.max_drawdown_exit = 0.15
self.trailing_stop_pct = 0.09 # Wider trailing stop
self.max_position_pct = 0.20 # Maximum allocation reduced
self.initial_position_pct = 0.10 # Starting allocation reduced
# Trading parameters - Optimized for holding periods
self.min_holding_days = 3
self.max_holding_days = 21 # Extended to allow trends to develop
self.min_trade_interval_hours = 120 # Increased to reduce overtrading
# Model parameters
self.best_qtable_file = algorithm.GetParameter("best_qtable_file", "best_qtable.pkl")
self.warmup_period = 200
# Market regime parameters - Drastically improved for stability
self.regime_change_threshold = 0.08 # Much higher threshold for regime change
self.regime_lookback_days = 14 # Extended lookback for more stability
self.regime_update_hours = 48 # Less frequent updates
self.regime_stability_days = 5 # Minimum days to maintain a regime
def get_market(self):
"""Convert exchange name to QC Market enum value"""
exchange_map = {
"binance": Market.Binance,
"coinbase": Market.GDAX,
"gdax": Market.GDAX,
"bitfinex": Market.Bitfinex,
"kraken": Market.Kraken
}
return exchange_map.get(self.exchange.lower(), Market.Binance)
class CustomFeeModel(FeeModel):
def __init__(self, commission_rate):
self.commission_rate = commission_rate
def GetOrderFee(self, parameters):
security = parameters.Security
order = parameters.Order
trade_value = abs(order.Quantity) * security.Price
fee = Decimal(str(trade_value)) * Decimal(str(self.commission_rate))
currency = security.QuoteCurrency.Symbol
return OrderFee(CashAmount(fee, currency))
class MBPOTraderAlgorithm(QCAlgorithm):
def Initialize(self):
self.config = Config(self)
self.SetStartDate(self.config.start_date)
self.SetEndDate(self.config.end_date)
self.SetCash(self.config.initial_cash)
# Setup markets and securities
market = self.config.get_market()
self.SetBrokerageModel(DefaultBrokerageModel(AccountType.Margin))
self.btcSymbol = self.AddCrypto(self.config.benchmark_symbol, self.config.resolution, market).Symbol
self.SetBenchmark(self.btcSymbol)
self.symbol = self.AddCrypto(self.config.trading_symbol, self.config.resolution, market).Symbol
# Set fee and slippage models
for security in self.Securities.Values:
security.SetFeeModel(CustomFeeModel(self.config.commission_rate))
security.SetSlippageModel(ConstantSlippageModel(self.config.base_slippage))
# Create indicators
self.ema_short = self.EMA(self.symbol, self.config.ema_short, self.config.resolution)
self.ema_medium = self.EMA(self.symbol, self.config.ema_medium, self.config.resolution)
self.ema_long = self.EMA(self.symbol, self.config.ema_long, self.config.resolution)
self.rsi = self.RSI(self.symbol, self.config.rsi_period, MovingAverageType.Simple, self.config.resolution)
self.macd = self.MACD(self.symbol, self.config.macd_fast, self.config.macd_slow, self.config.macd_signal,
MovingAverageType.Exponential, self.config.resolution)
self.atr = self.ATR(self.symbol, self.config.atr_period, MovingAverageType.Simple, self.config.resolution)
self.bbands = self.BB(self.symbol, 20, 2, MovingAverageType.Simple, self.config.resolution)
# Create separate components for Stochastic calculation
# High, low, close for stochastic calculation
self.high14 = self.MAX(self.symbol, 14, Resolution.Hour, Field.High)
self.low14 = self.MIN(self.symbol, 14, Resolution.Hour, Field.Low)
# We'll manually calculate stochastic values
self.stoch_k_values = []
self.stoch_d_values = []
self.volatility = self.STD(self.symbol, 20, Resolution.Hour)
# For tracking volatility history
self.volatility_history = []
# Setup warmup
self.SetWarmUp(TimeSpan.FromDays(self.config.warmup_period))
# Initialize variables
self.market_regime = "neutral"
self.last_regime_update = None
self.last_regime_change = None
self.is_data_ready = False
self.price_history = []
self.volume_history = []
self.regime_history = []
self.trade_history = []
self.position_entry_time = None
self.position_entry_price = 0
self.highest_price_since_entry = 0
self.lowest_price_since_entry = float('inf')
self.last_trade_time = datetime(1900, 1, 1)
self.consecutive_losses = 0
self.consecutive_profits = 0
self.equity_curve = {}
self.btc_history = {}
self.q_table = None
self.model_loaded = False
self.daily_returns = []
self.last_regime_log = None
self.successful_trade_signals = []
self.last_signal_scores = {}
self.market_conditions = {
"trend_strength": 0,
"volatility": 0,
"momentum": 0
}
# Load model
self.LoadModel()
# Add charts
self.AddCharts()
# Log initialization
self.Log(f"Initialized MBPO Trader on {str(market).split('.')[-1]}")
self.Log(f"Trading {self.config.trading_symbol} with benchmark {self.config.benchmark_symbol}")
def LoadModel(self):
"""Load the trained Q-table model"""
try:
qtable_data_b64 = self.ObjectStore.Read(self.config.best_qtable_file)
if qtable_data_b64 is not None:
qtable_data = base64.b64decode(qtable_data_b64)
loaded_dict = pickle.loads(qtable_data)
# Convert keys to proper format
converted_dict = {tuple(map(int, k.strip('()').split(', '))) if isinstance(k, str) else k: v
for k, v in loaded_dict.items()}
self.q_table = defaultdict(lambda: np.zeros(3), converted_dict)
self.Log(f"QTable loaded from ObjectStore: '{self.config.best_qtable_file}'")
self.model_loaded = True
else:
self.Log(f"QTable not found, using rule-based strategy")
self.q_table = defaultdict(lambda: np.zeros(3))
self.model_loaded = True
except Exception as e:
self.Log(f"Error loading model: {str(e)}")
self.q_table = defaultdict(lambda: np.zeros(3))
self.model_loaded = True
def AddCharts(self):
"""Add charts for monitoring performance"""
# Performance chart
chart = Chart("Performance")
chart.AddSeries(Series("Strategy", SeriesType.Line))
chart.AddSeries(Series("Benchmark", SeriesType.Line))
self.AddChart(chart)
# Risk metrics chart
risk_chart = Chart("Risk")
risk_chart.AddSeries(Series("Drawdown", SeriesType.Line))
risk_chart.AddSeries(Series("Profit_Factor", SeriesType.Line))
self.AddChart(risk_chart)
# Regime chart
regime_chart = Chart("Market Regime")
regime_chart.AddSeries(Series("Regime", SeriesType.Line))
self.AddChart(regime_chart)
def OnWarmupFinished(self):
self.Log("Algorithm finished warming up")
self.is_data_ready = self.CheckIndicatorsReady()
def CheckIndicatorsReady(self):
"""Check if all indicators are ready"""
if not (self.ema_short.IsReady and self.ema_medium.IsReady and
self.rsi.IsReady and self.macd.IsReady and
self.atr.IsReady and self.bbands.IsReady and
self.high14.IsReady and self.low14.IsReady and
self.volatility.IsReady):
return False
return True
def CalculateStochastic(self):
"""Calculate stochastic K% and D% values manually"""
if not self.high14.IsReady or not self.low14.IsReady:
return None, None
# Get current close price
current_close = self.Securities[self.symbol].Close
# Calculate %K
highest_high = self.high14.Current.Value
lowest_low = self.low14.Current.Value
# Avoid division by zero
if highest_high == lowest_low:
stoch_k = 50 # Default to middle value
else:
stoch_k = 100 * (current_close - lowest_low) / (highest_high - lowest_low)
# Store K values
self.stoch_k_values.append(stoch_k)
# Keep only the last 20 values to save memory
if len(self.stoch_k_values) > 20:
self.stoch_k_values.pop(0)
# Calculate %D (3-period simple moving average of %K)
if len(self.stoch_k_values) >= 3:
stoch_d = sum(self.stoch_k_values[-3:]) / 3
self.stoch_d_values.append(stoch_d)
if len(self.stoch_d_values) > 20:
self.stoch_d_values.pop(0)
else:
stoch_d = stoch_k # Default if not enough history
return stoch_k, stoch_d
def UpdateMarketRegime(self):
"""Update market regime with reduced frequency and improved stability"""
# Only update regime every regime_update_hours
if (self.last_regime_update is not None and
(self.Time - self.last_regime_update).total_seconds() < self.config.regime_update_hours * 3600):
return
# Enforce minimum days before changing regime again
if (self.last_regime_change is not None and
(self.Time - self.last_regime_change).total_seconds() < self.config.regime_stability_days * 24 * 3600 and
self.market_regime != "neutral"):
return
if len(self.price_history) < self.config.regime_lookback_days * 24: # Need enough history
return
try:
# Get price history for the lookback period
lookback_prices = self.price_history[-self.config.regime_lookback_days * 24:]
current_price = lookback_prices[-1]
# Calculate trend indicators
ema_short_above_medium = self.ema_short.Current.Value > self.ema_medium.Current.Value
ema_medium_above_long = self.ema_medium.Current.Value > self.ema_long.Current.Value if self.ema_long.IsReady else False
# Calculate long-term trend
price_trend = 0
if len(lookback_prices) > 5 * 24: # Use 5-day price trend
price_5days_ago = lookback_prices[-5 * 24]
price_trend = (current_price / price_5days_ago - 1) * 100
# Calculate price momentum
recent_returns = [(p2/p1-1) for p1, p2 in zip(lookback_prices[:-1], lookback_prices[1:])]
avg_return = np.mean(recent_returns) * 100 # Average return in percent
# Get RSI and MACD values
rsi_value = self.rsi.Current.Value
macd_value = self.macd.Current.Value
macd_signal = self.macd.Signal.Current.Value
# Get Stochastic values - using manual calculation
stoch_k, stoch_d = self.CalculateStochastic()
# Default values if calculation fails
if stoch_k is None:
stoch_k = 50
stoch_d = 50
# Get volatility (normalized)
current_vol = self.volatility.Current.Value
# Use volatility history instead of accessing Samples directly
if len(self.volatility_history) > 0:
avg_vol = np.mean(self.volatility_history[-20:])
else:
avg_vol = current_vol
normalized_vol = current_vol / current_price * 100 if current_price > 0 else 0
# Calculate volume trend if available
volume_trend = 0
if len(self.volume_history) > 20:
recent_volume = np.mean(self.volume_history[-5:])
past_volume = np.mean(self.volume_history[-20:-5])
volume_trend = 1 if recent_volume > past_volume else -1 if recent_volume < past_volume else 0
# Determine regime based on weighted factors
bullish_signals = [
ema_short_above_medium and ema_medium_above_long, # Weight: 3
rsi_value > 55, # Weight: 1
macd_value > 0, # Weight: 2
macd_value > macd_signal, # Weight: 2
price_trend > 2, # Weight: 3
avg_return > 0.25, # Weight: 2
stoch_k > stoch_d and stoch_k < 80, # Weight: 1
normalized_vol < avg_vol * 1.2, # Weight: 1
volume_trend > 0 # Weight: 1
]
bearish_signals = [
not ema_short_above_medium and not ema_medium_above_long, # Weight: 3
rsi_value < 45, # Weight: 1
macd_value < 0, # Weight: 2
macd_value < macd_signal, # Weight: 2
price_trend < -2, # Weight: 3
avg_return < -0.25, # Weight: 2
stoch_k < stoch_d and stoch_k > 20, # Weight: 1
normalized_vol > avg_vol * 1.5, # Weight: 1
volume_trend < 0 # Weight: 1
]
# Weights for each signal
bullish_weights = [3, 1, 2, 2, 3, 2, 1, 1, 1]
bearish_weights = [3, 1, 2, 2, 3, 2, 1, 1, 1]
# Calculate weighted scores
bullish_score = sum(signal * weight for signal, weight in zip(bullish_signals, bullish_weights))
bearish_score = sum(signal * weight for signal, weight in zip(bearish_signals, bearish_weights))
# Total possible score
max_score = sum(bullish_weights)
# Update market conditions for reference
self.market_conditions["trend_strength"] = bullish_score - bearish_score
self.market_conditions["volatility"] = normalized_vol
self.market_conditions["momentum"] = avg_return
# Strong signal requires significant confidence
new_regime = "neutral"
if bullish_score > max_score * 0.7 and bullish_score > bearish_score * 2:
new_regime = "bullish"
elif bearish_score > max_score * 0.7 and bearish_score > bullish_score * 2:
new_regime = "bearish"
# Only log regime changes, not neutral to neutral
if new_regime != self.market_regime:
old_regime = self.market_regime
# Store regime change regardless
self.market_regime = new_regime
self.last_regime_update = self.Time
# Only update last_regime_change if it's a significant change (not to/from neutral)
if (old_regime == "bullish" and new_regime == "bearish") or \
(old_regime == "bearish" and new_regime == "bullish"):
self.last_regime_change = self.Time
self.regime_history.append((self.Time, old_regime, new_regime))
# Only log major transitions (not to/from neutral)
major_transition = (old_regime == "bullish" and new_regime == "bearish") or \
(old_regime == "bearish" and new_regime == "bullish")
# Prevent duplicate logs for the same transition type
log_key = f"{old_regime}_to_{new_regime}"
if major_transition and log_key != self.last_regime_log:
self.Log(f"Market regime changed from {old_regime} to {new_regime}")
self.last_regime_log = log_key
# Plot regime (1=bullish, 0=neutral, -1=bearish)
regime_value = 1 if self.market_regime == "bullish" else -1 if self.market_regime == "bearish" else 0
self.Plot("Market Regime", "Regime", regime_value)
except Exception as e:
self.Debug(f"Error updating market regime: {str(e)}")
def ExecuteTrade(self, action, signal_strength=0):
"""Execute trade based on the action"""
if not self.HasValidPrice() or not self.CanTrade():
return
pre_trade_price = self.Securities[self.symbol].Price
# Adjust position size based on market conditions and signal strength
allocation = self.GetPositionSize(signal_strength)
try:
if action == 1: # LONG
# Skip if in clear bearish regime
if self.market_regime == "bearish":
return
# Close any short position first
if self.Portfolio[self.symbol].Invested and self.Portfolio[self.symbol].IsShort:
self.ClosePosition("SHORT")
# Open long position
if not self.Portfolio[self.symbol].Invested:
self.SetHoldings(self.symbol, allocation)
self.position_entry_time = self.Time
self.position_entry_price = self.Securities[self.symbol].Price
self.highest_price_since_entry = self.position_entry_price
self.lowest_price_since_entry = self.position_entry_price
self.last_trade_time = self.Time
signal_strength = max(0.5, signal_strength) # Ensure minimum logging value
self.Log(f"Opening LONG at {self.Time} with allocation {allocation:.2f}, signal strength: {signal_strength:.2f}")
# Store signal score for post-trade analysis
self.last_signal_scores[self.Time] = {
"direction": "LONG",
"strength": signal_strength,
"market_regime": self.market_regime,
"trend_strength": self.market_conditions["trend_strength"],
"volatility": self.market_conditions["volatility"],
"momentum": self.market_conditions["momentum"]
}
elif action == 2: # SHORT
# Skip if in clear bullish regime
if self.market_regime == "bullish":
return
# Close any long position first
if self.Portfolio[self.symbol].Invested and not self.Portfolio[self.symbol].IsShort:
self.ClosePosition("LONG")
# Open short position
if not self.Portfolio[self.symbol].Invested:
self.SetHoldings(self.symbol, -allocation)
self.position_entry_time = self.Time
self.position_entry_price = self.Securities[self.symbol].Price
self.highest_price_since_entry = self.position_entry_price
self.lowest_price_since_entry = self.position_entry_price
self.last_trade_time = self.Time
signal_strength = max(0.5, signal_strength) # Ensure minimum logging value
self.Log(f"Opening SHORT at {self.Time} with allocation {allocation:.2f}, signal strength: {signal_strength:.2f}")
# Store signal score for post-trade analysis
self.last_signal_scores[self.Time] = {
"direction": "SHORT",
"strength": signal_strength,
"market_regime": self.market_regime,
"trend_strength": self.market_conditions["trend_strength"],
"volatility": self.market_conditions["volatility"],
"momentum": self.market_conditions["momentum"]
}
else: # HOLD - Check for exits
if self.Portfolio[self.symbol].Invested and self.position_entry_time is not None:
should_exit, exit_reason = self.ShouldExitPosition()
if should_exit:
position_type = "SHORT" if self.Portfolio[self.symbol].IsShort else "LONG"
self.ClosePosition(position_type, exit_reason)
except Exception as e:
self.Log(f"Error executing trade: {str(e)}")
def GetPositionSize(self, signal_strength=0):
"""Determine appropriate position size based on market conditions and signal strength"""
base_allocation = self.config.initial_position_pct
# Adjust based on signal strength
if signal_strength > 0:
# Increase allocation for strong signals (up to 1.5x)
signal_factor = 1 + min(0.5, signal_strength * 0.5)
base_allocation *= signal_factor
# Reduce size based on volatility
if self.atr.IsReady:
price = self.Securities[self.symbol].Price
atr_percent = self.atr.Current.Value / price if price > 0 else 0
# High volatility = smaller position
if atr_percent > 0.03: # 3% daily moves
base_allocation *= 0.8
elif atr_percent > 0.05: # 5% daily moves
base_allocation *= 0.6
elif atr_percent > 0.07: # 7% daily moves
base_allocation *= 0.4
# Reduce size after consecutive losses
if self.consecutive_losses > 0:
base_allocation *= max(0.4, 1 - (self.consecutive_losses * 0.15)) # More aggressive reduction
# Increase size after consecutive profits (up to 1.5x)
if self.consecutive_profits > 1:
profit_boost = min(0.5, self.consecutive_profits * 0.1)
base_allocation *= (1 + profit_boost)
# Reduce position size in neutral market regime
if self.market_regime == "neutral":
base_allocation *= 0.7
# Respect maximum position size
return min(base_allocation, self.config.max_position_pct)
def _get_signal_strength(self, direction):
"""Calculate signal strength for improved position sizing and logging"""
rsi = self.rsi.Current.Value
macd = self.macd.Current.Value
macd_signal = self.macd.Signal.Current.Value
# Calculate price in relation to Bollinger Bands
bb_width = (self.bbands.UpperBand.Current.Value - self.bbands.LowerBand.Current.Value) / self.bbands.MiddleBand.Current.Value
price = self.Securities[self.symbol].Price
bb_position = (price - self.bbands.LowerBand.Current.Value) / (self.bbands.UpperBand.Current.Value - self.bbands.LowerBand.Current.Value) if self.bbands.UpperBand.Current.Value > self.bbands.LowerBand.Current.Value else 0.5
# Get latest stochastic values - manual calculation
stoch_k, stoch_d = self.CalculateStochastic()
# Default values if calculation fails
if stoch_k is None:
stoch_k = 50
stoch_d = 50
if direction == "long":
# RSI contribution (higher is better for longs)
rsi_score = 0
if rsi < 30: # Oversold - strong buy
rsi_score = 1.0
elif rsi < 40: # Still attractive
rsi_score = 0.8
elif rsi < 50: # Neutral to positive
rsi_score = 0.6
elif rsi < 70: # Still acceptable
rsi_score = 0.3
else: # Overbought - not ideal
rsi_score = 0.1
# MACD contribution (positive and rising is better for longs)
macd_score = 0
if macd > 0 and macd > macd_signal: # Strong bullish
macd_score = 1.0
elif macd > 0: # Positive but not rising
macd_score = 0.7
elif macd > macd_signal: # Rising but negative
macd_score = 0.5
else: # Negative and falling
macd_score = 0.1
# Bollinger Band position (lower is better for longs - buying dips)
bb_score = 0
if bb_position < 0.2: # Near or below lower band
bb_score = 1.0
elif bb_position < 0.4: # Lower section
bb_score = 0.8
elif bb_position < 0.6: # Middle section
bb_score = 0.4
elif bb_position < 0.8: # Upper section
bb_score = 0.2
else: # Near or above upper band
bb_score = 0.1
# Stochastic (lower K with rising K-D is better for longs)
stoch_score = 0
if stoch_k < 20 and stoch_k > stoch_d: # Oversold and rising
stoch_score = 1.0
elif stoch_k < 30 and stoch_k > stoch_d: # Near oversold and rising
stoch_score = 0.8
elif stoch_k > stoch_d: # Rising
stoch_score = 0.6
elif stoch_k < 30: # Oversold but not rising
stoch_score = 0.4
else: # Not ideal
stoch_score = 0.2
# EMA alignment (all aligned is better)
ema_score = 0
if self.ema_short.Current.Value > self.ema_medium.Current.Value:
ema_score += 0.5
if self.ema_medium.IsReady and self.ema_long.IsReady and self.ema_medium.Current.Value > self.ema_long.Current.Value:
ema_score += 0.5
# Calculate weighted average (giving more weight to trend and momentum)
return (rsi_score * 0.15 + macd_score * 0.25 + bb_score * 0.15 + stoch_score * 0.15 + ema_score * 0.3)
else: # short
# RSI contribution (higher is better for shorts)
rsi_score = 0
if rsi > 70: # Overbought - strong sell
rsi_score = 1.0
elif rsi > 60: # Still attractive
rsi_score = 0.8
elif rsi > 50: # Neutral to positive
rsi_score = 0.6
elif rsi > 30: # Still acceptable
rsi_score = 0.3
else: # Oversold - not ideal
rsi_score = 0.1
# MACD contribution (negative and falling is better for shorts)
macd_score = 0
if macd < 0 and macd < macd_signal: # Strong bearish
macd_score = 1.0
elif macd < 0: # Negative but not falling
macd_score = 0.7
elif macd < macd_signal: # Falling but positive
macd_score = 0.5
else: # Positive and rising
macd_score = 0.1
# Bollinger Band position (higher is better for shorts - selling rallies)
bb_score = 0
if bb_position > 0.8: # Near or above upper band
bb_score = 1.0
elif bb_position > 0.6: # Upper section
bb_score = 0.8
elif bb_position > 0.4: # Middle section
bb_score = 0.4
elif bb_position > 0.2: # Lower section
bb_score = 0.2
else: # Near or below lower band
bb_score = 0.1
# Stochastic (higher K with falling K-D is better for shorts)
stoch_score = 0
if stoch_k > 80 and stoch_k < stoch_d: # Overbought and falling
stoch_score = 1.0
elif stoch_k > 70 and stoch_k < stoch_d: # Near overbought and falling
stoch_score = 0.8
elif stoch_k < stoch_d: # Falling
stoch_score = 0.6
elif stoch_k > 70: # Overbought but not falling
stoch_score = 0.4
else: # Not ideal
stoch_score = 0.2
# EMA alignment (all aligned is better)
ema_score = 0
if self.ema_short.Current.Value < self.ema_medium.Current.Value:
ema_score += 0.5
if self.ema_medium.IsReady and self.ema_long.IsReady and self.ema_medium.Current.Value < self.ema_long.Current.Value:
ema_score += 0.5
# Calculate weighted average (giving more weight to trend and momentum)
return (rsi_score * 0.15 + macd_score * 0.25 + bb_score * 0.15 + stoch_score * 0.15 + ema_score * 0.3)
def ClosePosition(self, position_type, reason=""):
"""Close current position and log the result"""
if not self.Portfolio[self.symbol].Invested:
return
current_holdings = self.Portfolio[self.symbol].Quantity
entry_price = self.position_entry_price
current_price = self.Securities[self.symbol].Price
self.Liquidate(self.symbol)
# Calculate profit/loss
if entry_price > 0:
if position_type == "SHORT":
pnl_pct = (entry_price - current_price) / entry_price
else:
pnl_pct = (current_price - entry_price) / entry_price
result = "PROFIT" if pnl_pct > 0 else "LOSS"
duration = self.Time - self.position_entry_time
# Update trade history
trade_record = {
"time": self.Time,
"type": position_type,
"pnl_pct": pnl_pct,
"duration": duration,
"reason": reason,
"entry_price": entry_price,
"exit_price": current_price
}
# Add signal info if available
if self.position_entry_time in self.last_signal_scores:
trade_record.update(self.last_signal_scores[self.position_entry_time])
# If profitable, store signal characteristics
if pnl_pct > 0:
self.successful_trade_signals.append(self.last_signal_scores[self.position_entry_time])
self.trade_history.append(trade_record)
# Update consecutive wins/losses
if pnl_pct > 0:
self.consecutive_losses = 0
self.consecutive_profits += 1
else:
self.consecutive_losses += 1
self.consecutive_profits = 0
reason_text = f" due to {reason}" if reason else ""
self.Log(f"Trade {result}: {position_type} closed with {pnl_pct:.2%} P&L after {duration}{reason_text}")
self.position_entry_time = None
self.position_entry_price = 0
def ShouldExitPosition(self):
"""Determine if current position should be exited with dynamic thresholds"""
if not self.Portfolio[self.symbol].Invested or self.position_entry_time is None:
return False, ""
current_price = self.Securities[self.symbol].Price
is_short = self.Portfolio[self.symbol].IsShort
# Update tracking values
if current_price > self.highest_price_since_entry:
self.highest_price_since_entry = current_price
if current_price < self.lowest_price_since_entry:
self.lowest_price_since_entry = current_price
# Time holding position
holding_days = (self.Time - self.position_entry_time).days
# Adjust exit criteria based on holding time
# For longer-held positions, we loosen the trailing stops
time_factor = min(1.0, holding_days / 7) # Scale factor based on holding time (max 1.0)
adjusted_trailing_stop = self.config.trailing_stop_pct * (1 - time_factor * 0.3) # Reduce by up to 30%
# Exit condition 1: Max holding period
if holding_days >= self.config.max_holding_days:
return True, f"max holding period ({holding_days} days)"
# Exit condition 2: Stop loss - fixed
if is_short:
loss_pct = (current_price - self.position_entry_price) / self.position_entry_price
if loss_pct > self.config.stop_loss_pct:
return True, f"stop loss triggered ({loss_pct:.2%})"
else:
loss_pct = (self.position_entry_price - current_price) / self.position_entry_price
if loss_pct > self.config.stop_loss_pct:
return True, f"stop loss triggered ({loss_pct:.2%})"
# Exit condition 3: Take profit - based on volatility and time in trade
# The longer a position is held, the higher the take profit target
profit_target = self.config.take_profit_pct * (1 + time_factor * 0.5) # Increase by up to 50%
if is_short:
profit_pct = (self.position_entry_price - current_price) / self.position_entry_price
if profit_pct > profit_target:
return True, f"take profit triggered ({profit_pct:.2%})"
else:
profit_pct = (current_price - self.position_entry_price) / self.position_entry_price
if profit_pct > profit_target:
return True, f"take profit triggered ({profit_pct:.2%})"
# Exit condition 4: Trailing stop - dynamic
if is_short:
if self.lowest_price_since_entry < self.position_entry_price:
bounce = (current_price - self.lowest_price_since_entry) / self.lowest_price_since_entry
if bounce > adjusted_trailing_stop:
return True, f"trailing stop ({bounce:.2%} from lowest)"
else:
if self.highest_price_since_entry > self.position_entry_price:
drop = (self.highest_price_since_entry - current_price) / self.highest_price_since_entry
if drop > adjusted_trailing_stop:
return True, f"trailing stop ({drop:.2%} from highest)"
# Exit condition 5: Market regime change - only if the position is against the regime
# AND the position hasn't moved in our favor significantly
if is_short and self.market_regime == "bullish":
# For shorts, check if we're not in significant profit already
profit_pct = (self.position_entry_price - current_price) / self.position_entry_price
if profit_pct < 0.05: # Less than 5% profit
return True, f"market regime change to {self.market_regime}"
elif not is_short and self.market_regime == "bearish":
# For longs, check if we're not in significant profit already
profit_pct = (current_price - self.position_entry_price) / self.position_entry_price
if profit_pct < 0.05: # Less than 5% profit
return True, f"market regime change to {self.market_regime}"
# No exit condition met
return False, ""
def CanTrade(self):
"""Check if trading is allowed based on various conditions"""
# Check minimum time between trades
hours_since_last_trade = (self.Time - self.last_trade_time).total_seconds() / 3600
min_interval = self.config.min_trade_interval_hours * (1 + 0.25 * self.consecutive_losses)
if hours_since_last_trade < min_interval:
return False
# Check minimum holding period
if self.Portfolio[self.symbol].Invested and self.position_entry_time is not None:
holding_days = (self.Time - self.position_entry_time).days
if holding_days < self.config.min_holding_days:
return False
return True
def HasValidPrice(self):
"""Check if we have valid price data"""
if not self.Securities.ContainsKey(self.symbol) or not self.Securities[self.symbol].HasData:
return False
price = self.Securities[self.symbol].Price
return (price is not None) and (price > 0)
def GetSignals(self):
"""Get improved trading signals with stronger filtering"""
if not self.is_data_ready:
return 0, 0 # HOLD, 0 signal strength
# Basic indicator checks
rsi = self.rsi.Current.Value
macd = self.macd.Current.Value
macd_signal = self.macd.Signal.Current.Value
price = self.Securities[self.symbol].Price
# Get Stochastic values - using manual calculation
stoch_k, stoch_d = self.CalculateStochastic()
# Default values if calculation fails
if stoch_k is None:
stoch_k = 50
stoch_d = 50
# Calculate price in relation to Bollinger Bands
bb_width = (self.bbands.UpperBand.Current.Value - self.bbands.LowerBand.Current.Value) / self.bbands.MiddleBand.Current.Value
bb_position = (price - self.bbands.LowerBand.Current.Value) / (self.bbands.UpperBand.Current.Value - self.bbands.LowerBand.Current.Value) if self.bbands.UpperBand.Current.Value > self.bbands.LowerBand.Current.Value else 0.5
# Define signal scores (weighted approach)
long_score = 0
short_score = 0
# Check volatility
# Store current volatility value
current_vol = self.volatility.Current.Value
self.volatility_history.append(current_vol)
# Keep only the most recent values
if len(self.volatility_history) > 20:
self.volatility_history.pop(0)
# Calculate average volatility from history
avg_volatility = np.mean(self.volatility_history) if self.volatility_history else current_vol
# Use this condition for volatility comparison (fixed from previous error)
# Check if current volatility is lower than average (good for trend)
lower_volatility = current_vol < avg_volatility
# === Long signals ===
# RSI signals (0-100 scale)
if rsi < 30: # Oversold
long_score += 3
elif rsi < 40:
long_score += 2
elif rsi < 50:
long_score += 1
# Stochastic (oversold and rising)
if stoch_k < 20 and stoch_k > stoch_d:
long_score += 3
elif stoch_k < 30 and stoch_k > stoch_d:
long_score += 2
elif stoch_k < 50 and stoch_k > stoch_d:
long_score += 1
# Bollinger Band signals (price at lower band)
if bb_position < 0.1: # Price at or below lower band
long_score += 3
elif bb_position < 0.3:
long_score += 2
elif bb_position < 0.5:
long_score += 1
# MACD signals
if macd > 0 and macd > macd_signal: # Bullish crossover
long_score += 3
elif macd > 0:
long_score += 2
elif macd > macd_signal:
long_score += 1
# Trend signals (EMA)
if self.ema_short.Current.Value > self.ema_medium.Current.Value and self.ema_medium.Current.Value > self.ema_long.Current.Value:
long_score += 4 # Strong uptrend
elif self.ema_short.Current.Value > self.ema_medium.Current.Value:
long_score += 2 # Potential start of uptrend
# Volatility factor (lower volatility is better for trend following)
if lower_volatility:
long_score += 1
# Market regime bias
if self.market_regime == "bullish":
long_score += 4
elif self.market_regime == "neutral":
long_score += 1
# === Short signals ===
# RSI signals
if rsi > 70: # Overbought
short_score += 3
elif rsi > 60:
short_score += 2
elif rsi > 50:
short_score += 1
# Stochastic (overbought and falling)
if stoch_k > 80 and stoch_k < stoch_d:
short_score += 3
elif stoch_k > 70 and stoch_k < stoch_d:
short_score += 2
elif stoch_k > 50 and stoch_k < stoch_d:
short_score += 1
# Bollinger Band signals (price at upper band)
if bb_position > 0.9: # Price at or above upper band
short_score += 3
elif bb_position > 0.7:
short_score += 2
elif bb_position > 0.5:
short_score += 1
# MACD signals
if macd < 0 and macd < macd_signal: # Bearish crossover
short_score += 3
elif macd < 0:
short_score += 2
elif macd < macd_signal:
short_score += 1
# Trend signals (EMA)
if self.ema_short.Current.Value < self.ema_medium.Current.Value and self.ema_medium.Current.Value < self.ema_long.Current.Value:
short_score += 4 # Strong downtrend
elif self.ema_short.Current.Value < self.ema_medium.Current.Value:
short_score += 2 # Potential start of downtrend
# Volatility factor (higher volatility can be good for shorts)
# Using a different approach than checking length of samples directly
if current_vol > avg_volatility * 1.5:
short_score += 1
# Market regime bias
if self.market_regime == "bearish":
short_score += 4
elif self.market_regime == "neutral":
short_score += 1
# Calculate signal strength (normalize to 0-1 scale)
max_possible_score = 20 # Maximum possible score
long_strength = min(1.0, long_score / max_possible_score)
short_strength = min(1.0, short_score / max_possible_score)
# Stronger thresholds for trade initiation
min_score_threshold = 12 # 60% of max score
score_difference = 5 # Require clear dominance
if long_score > min_score_threshold and long_score > short_score + score_difference:
return 1, long_strength # LONG
elif short_score > min_score_threshold and short_score > long_score + score_difference:
return 2, short_strength # SHORT
else:
# HOLD - with strength of best signal (for reference)
return 0, max(long_strength, short_strength)
def OnData(self, data):
"""Process market data updates"""
# Skip if warming up or data not ready
if self.IsWarmingUp:
return
# Ensure we have data for our symbols
if not (data.ContainsKey(self.symbol) and data.ContainsKey(self.btcSymbol)) or not self.HasValidPrice():
return
# Update data ready status if needed
if not self.is_data_ready:
self.is_data_ready = self.CheckIndicatorsReady()
if not self.is_data_ready:
return
# Update price and volume history
current_price = self.Securities[self.symbol].Close
self.price_history.append(current_price)
# Record volume if available
if hasattr(data[self.symbol], 'Volume') and data[self.symbol].Volume > 0:
self.volume_history.append(data[self.symbol].Volume)
# Keep history at a manageable size
if len(self.price_history) > 500:
self.price_history.pop(0)
if len(self.volume_history) > 500:
self.volume_history.pop(0)
# Update market regime
self.UpdateMarketRegime()
# Track portfolio and benchmark history
current_date = self.Time.date()
# Calculate daily returns if we have a new date
if current_date not in self.equity_curve:
if len(self.equity_curve) > 0:
prev_date = max(self.equity_curve.keys())
prev_value = self.equity_curve[prev_date]
current_value = self.Portfolio.TotalPortfolioValue
daily_return = (current_value / prev_value) - 1
self.daily_returns.append(daily_return)
self.equity_curve[current_date] = self.Portfolio.TotalPortfolioValue
self.btc_history[current_date] = data[self.btcSymbol].Close
# Update performance charts (less frequently to avoid quota issues)
if self.Time.minute == 0: # Update once per hour
self.UpdatePerformanceCharts(current_date)
# Check exit conditions for existing position
if self.Portfolio[self.symbol].Invested and self.position_entry_time is not None:
should_exit, exit_reason = self.ShouldExitPosition()
if should_exit:
position_type = "SHORT" if self.Portfolio[self.symbol].IsShort else "LONG"
self.ClosePosition(position_type, exit_reason)
return # Skip entry logic after an exit
# Get trade signal
action, signal_strength = self.GetSignals()
# Execute the trade
self.ExecuteTrade(action, signal_strength)
def UpdatePerformanceCharts(self, current_date):
"""Update performance charts"""
if len(self.equity_curve) <= 1:
return
# Update strategy vs benchmark performance
start_date = list(self.equity_curve.keys())[0]
initial_portfolio = self.equity_curve[start_date]
initial_btc = self.btc_history.get(start_date, 1)
current_portfolio = self.equity_curve[current_date]
current_btc = self.btc_history[current_date]
portfolio_return = (current_portfolio / initial_portfolio - 1) * 100
btc_return = (current_btc / initial_btc - 1) * 100
self.Plot("Performance", "Strategy", portfolio_return)
self.Plot("Performance", "Benchmark", btc_return)
# Plot drawdown
equity_values = list(self.equity_curve.values())
peak = max(equity_values)
current = equity_values[-1]
drawdown = (peak - current) / peak * 100 if peak > 0 else 0
self.Plot("Risk", "Drawdown", drawdown)
# Calculate profit factor if we have trades
if len(self.trade_history) > 0:
profits = sum(max(0, trade["pnl_pct"]) for trade in self.trade_history)
losses = sum(abs(min(0, trade["pnl_pct"])) for trade in self.trade_history)
profit_factor = profits / losses if losses > 0 else 1
self.Plot("Risk", "Profit_Factor", profit_factor)
def OnEndOfAlgorithm(self):
"""Report final performance metrics"""
if not hasattr(self, 'equity_curve') or not self.equity_curve:
self.Log("No portfolio history recorded.")
return
# Calculate Sharpe ratio
if hasattr(self, 'daily_returns') and len(self.daily_returns) > 0:
annual_sharpe = np.mean(self.daily_returns) / (np.std(self.daily_returns) + 1e-10) * np.sqrt(252)
self.Log(f"Strategy Sharpe Ratio: {annual_sharpe:.2f}")
# Calculate max drawdown
peak = 0
max_drawdown = 0
for date in sorted(self.equity_curve.keys()):
value = self.equity_curve[date]
if value > peak:
peak = value
drawdown = (peak - value) / peak if peak > 0 else 0
max_drawdown = max(max_drawdown, drawdown)
self.Log(f"Maximum Drawdown: {max_drawdown:.2%}")
# Log annual performance
self.LogAnnualPerformance()
# Log trade statistics
self.LogTradeStatistics()
# Log total performance
self.LogTotalPerformance()
# Analyze successful trades for future improvement
self.AnalyzeSuccessfulTrades()
def AnalyzeSuccessfulTrades(self):
"""Analyze characteristics of successful trades for future strategy improvement"""
if len(self.successful_trade_signals) < 3:
return # Not enough data for meaningful analysis
try:
# Extract key metrics
signal_strengths = [record.get('strength', 0) for record in self.successful_trade_signals]
trend_strengths = [record.get('trend_strength', 0) for record in self.successful_trade_signals]
volatilities = [record.get('volatility', 0) for record in self.successful_trade_signals]
self.Log("\n=== Successful Trade Analysis ===")
self.Log(f"Average Signal Strength: {np.mean(signal_strengths):.2f}")
self.Log(f"Average Trend Strength: {np.mean(trend_strengths):.2f}")
self.Log(f"Average Volatility: {np.mean(volatilities):.2f}%")
# Count successful trades by market regime
regime_counts = {}
for record in self.successful_trade_signals:
regime = record.get('market_regime', 'unknown')
regime_counts[regime] = regime_counts.get(regime, 0) + 1
total = len(self.successful_trade_signals)
self.Log("Market Regimes:")
for regime, count in regime_counts.items():
self.Log(f" {regime}: {count} trades ({count/total*100:.1f}%)")
except Exception as e:
self.Debug(f"Error analyzing successful trades: {str(e)}")
def LogAnnualPerformance(self):
"""Log annual performance summary"""
portfolio_by_year = {}
btc_by_year = {}
for date, value in self.equity_curve.items():
portfolio_by_year.setdefault(date.year, {})[date] = value
for date, price in self.btc_history.items():
btc_by_year.setdefault(date.year, {})[date] = price
self.Log("=== Annual Performance Summary ===")
self.Log("Year Strategy Return Benchmark Return Beat BTC?")
self.Log("-----------------------------------------------")
total_years = 0
years_beat_btc = 0
for year in sorted(portfolio_by_year.keys()):
if year not in btc_by_year:
continue
dates = sorted(portfolio_by_year[year].keys())
if not dates:
continue
start_value = portfolio_by_year[year][dates[0]]
end_value = portfolio_by_year[year][dates[-1]]
strategy_return = (end_value / start_value - 1) * 100
btc_dates = sorted(btc_by_year[year].keys())
if not btc_dates:
continue
btc_start = btc_by_year[year][btc_dates[0]]
btc_end = btc_by_year[year][btc_dates[-1]]
benchmark_return = (btc_end / btc_start - 1) * 100
beat = "✓" if strategy_return > benchmark_return else "✗"
self.Log(f"{year} {strategy_return:7.2f}% {benchmark_return:7.2f}% {beat}")
total_years += 1
if strategy_return > benchmark_return:
years_beat_btc += 1
win_rate = (years_beat_btc / total_years) * 100 if total_years > 0 else 0
self.Log(f"\nTotal Years: {total_years}")
self.Log(f"Years Beat BTC: {years_beat_btc}")
self.Log(f"Win Rate: {win_rate:.1f}%")
def LogTradeStatistics(self):
"""Log detailed trade statistics"""
if not hasattr(self, 'trade_history') or not self.trade_history:
self.Log("No trades recorded.")
return
winning_trades = [t for t in self.trade_history if t['pnl_pct'] > 0]
losing_trades = [t for t in self.trade_history if t['pnl_pct'] <= 0]
avg_winner = np.mean([t['pnl_pct'] for t in winning_trades]) * 100 if winning_trades else 0
avg_loser = np.mean([t['pnl_pct'] for t in losing_trades]) * 100 if losing_trades else 0
win_rate = len(winning_trades) / len(self.trade_history) * 100 if self.trade_history else 0
# Count trades by type
long_trades = len([t for t in self.trade_history if t['type'] == 'LONG'])
short_trades = len([t for t in self.trade_history if t['type'] == 'SHORT'])
# Count trades by exit reason
exit_reasons = {}
for trade in self.trade_history:
reason = trade.get('reason', 'unknown')
exit_reasons[reason] = exit_reasons.get(reason, 0) + 1
self.Log("\n=== Trade Performance ===")
self.Log(f"Total Trades: {len(self.trade_history)}")
self.Log(f"Long Trades: {long_trades}, Short Trades: {short_trades}")
self.Log(f"Winning Trades: {len(winning_trades)} ({win_rate:.1f}%)")
self.Log(f"Losing Trades: {len(losing_trades)} ({100 - win_rate:.1f}%)")
self.Log(f"Average Winner: {avg_winner:.2f}%")
self.Log(f"Average Loser: {avg_loser:.2f}%")
# Calculate profit factor
total_gains = sum(max(0, t['pnl_pct']) for t in self.trade_history)
total_losses = sum(abs(min(0, t['pnl_pct'])) for t in self.trade_history)
profit_factor = total_gains / total_losses if total_losses > 0 else float('inf')
self.Log(f"Profit Factor: {profit_factor:.2f}")
# Average holding period
avg_duration = np.mean([t['duration'].total_seconds() / 86400 for t in self.trade_history])
self.Log(f"Average Holding Period: {avg_duration:.1f} days")
# Top exit reasons
self.Log("\nExit Reasons:")
for reason, count in sorted(exit_reasons.items(), key=lambda x: x[1], reverse=True):
self.Log(f" {reason}: {count} trades ({count/len(self.trade_history)*100:.1f}%)")
def LogTotalPerformance(self):
"""Log overall strategy performance"""
first_date = min(self.equity_curve.keys())
last_date = max(self.equity_curve.keys())
initial_value = self.equity_curve[first_date]
final_value = self.equity_curve[last_date]
total_return = (final_value / initial_value - 1) * 100
if first_date in self.btc_history and last_date in self.btc_history:
initial_btc = self.btc_history[first_date]
final_btc = self.btc_history[last_date]
benchmark_return = (final_btc / initial_btc - 1) * 100
self.Log(f"\n=== Total Performance ===")
self.Log(f"Strategy Total Return: {total_return:.2f}%")
self.Log(f"Benchmark Total Return: {benchmark_return:.2f}%")
self.Log(f"Alpha: {total_return - benchmark_return:.2f}%")
# Annualized returns
years = (last_date - first_date).days / 365.25
annualized_return = ((1 + total_return/100) ** (1/years) - 1) * 100
annualized_benchmark = ((1 + benchmark_return/100) ** (1/years) - 1) * 100
self.Log(f"Annualized Return: {annualized_return:.2f}%")
self.Log(f"Annualized Benchmark: {annualized_benchmark:.2f}%")