| Overall Statistics |
|
Total Orders 1835 Average Win 0.80% Average Loss -0.38% Compounding Annual Return 2.562% Drawdown 12.200% Expectancy 0.309 Start Equity 100000.00 End Equity 157733.54 Net Profit 57.734% Sharpe Ratio -0.014 Sortino Ratio -0.016 Probabilistic Sharpe Ratio 0.084% Loss Rate 57% Win Rate 43% Profit-Loss Ratio 2.07 Alpha -0.001 Beta -0 Annual Standard Deviation 0.043 Annual Variance 0.002 Information Ratio -0.39 Tracking Error 0.168 Treynor Ratio 1.93 Total Fees $0.00 Estimated Strategy Capacity $160000.00 Lowest Capacity Asset NZDUSD 8G Portfolio Turnover 22.72% |
from AlgorithmImports import *
import numpy as np
import pandas as pd
class ForexStochasticBollinger(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2007, 1, 1) # Test period start
self.SetEndDate(2024, 12, 31) # Test period end
self.SetCash(100000)
# List of currency pairs
self.pairs = ["EURUSD", "GBPUSD", "USDCAD", "USDCHF", "AUDUSD", "NZDUSD", "USDJPY", "USDSEK", "USDNOK"]
self.symbols = [self.AddForex(pair, Resolution.DAILY).Symbol for pair in self.pairs]
# Add VIX index as a data feed
self.vix = self.AddData(CBOE, "VIX", Resolution.Daily).Symbol
# Initialize indicators and data structures
self.indicators = {symbol: (self.STO(symbol, 14, 1, 3), self.BB(symbol, 20, 2)) for symbol in self.symbols}
self.daily_returns = {symbol: [] for symbol in self.symbols}
self.last_trade_direction = {}
self.trades_today = 0
self.multiplier = 0
self.no_multiplier = 0
# Initialize transaction log
self.transaction_log = []
# Initialize rolling window for ADX values
recent_days = 126
self.adx_window = {symbol: RollingWindow[float](recent_days) for symbol in self.symbols}
# Set warm-up period for indicators
self.SetWarmUp(30, Resolution.Daily)
self.set_time_zone("UTC")
# Schedule trading logic to run at 00:01 daily
self.Schedule.On(self.DateRules.EveryDay(),
self.TimeRules.At(0, 1),
self.ExecuteStrategy)
# Schedule liquidation event at 00:00 next day
self.Schedule.On(self.DateRules.EveryDay(),
self.TimeRules.At(0, 0),
self.LiquidatePositions)
self.Schedule.On(self.DateRules.Every([DayOfWeek.Friday]),
self.TimeRules.At(22, 0),
self.CloseOnFriday)
def LiquidatePositions(self):
#self.Debug("Liquidating all positions at end of day.")
self.liquidate()
def CloseOnFriday(self):
if self.Portfolio.Invested:
#self.Debug(f"{self.Time}: Closing all positions on Friday")
self.Liquidate()
def ExecuteStrategy(self):
if self.IsWarmingUp:
return
if self.Time.weekday == 4 or self.Time.weekday == 5:
#self.Debug("Do not act on signals on Fri, Sat")
return
# Reset trades count at the start of the day
self.trades_today = 0
# Get the last vix
vix_history = self.History(self.vix, 1, Resolution.Daily)
if not vix_history.empty:
latest_vix = vix_history['close'].values[0]
else:
latest_vix = 0
# Calculate the 75th percentile of vix in past 5 years
vix_history = self.History(self.vix, 1000, Resolution.Daily)
if not vix_history.empty:
vix_75th_percentile = vix_history['close'].quantile(0.75)
else:
vix_75th_percentile = 30 # Default value if history is not available
# Skip trades if the rolling average VIX is above the 75th percentile
if latest_vix > vix_75th_percentile:
self.Debug(f"Skipping trades due to high rolling VIX: {latest_vix:.2f} (75th percentile: {vix_75th_percentile:.2f})")
return
long_signals = []
short_signals = []
# Evaluate signals for each pair
for symbol in self.symbols:
stoch, bb = self.indicators[symbol]
current_k = stoch.StochK.Current.Value
current_d = stoch.StochD.Current.Value
upper_band = bb.UpperBand.Current.Value
lower_band = bb.LowerBand.Current.Value
# Calculate ADX for the current day
adx_value = self.CalculateADX(symbol)
# Update the ADX rolling window
self.adx_window[symbol].Add(adx_value)
# Calculate Q3 ADX value for the recent days
if self.adx_window[symbol].IsReady:
adx_values = list(self.adx_window[symbol])
adx_q3 = max(np.percentile(adx_values, 75), 40)
else:
adx_q3 = 40 # Default values if history is not available
# Skip trade if ADX is outside Q1 and Q3
if adx_value > adx_q3 or adx_value < 20:
self.Debug(f"Skipping trade for {symbol} due to ADX {adx_value:.4f} (Q1: {20:.4f}, Q3: {adx_q3:.4f}).")
continue
history = self.History(symbol, 2, Resolution.Daily)
if history.empty or len(history) < 2:
prev_day_return = 0
else:
previous_close = history['close'].iloc[-2]
current_close = history['close'].iloc[-1]
prev_day_return = (current_close - previous_close) / previous_close
# Check long entry condition
if current_k < 20 and current_d < 20 and self.Securities[symbol].Price < lower_band:
if symbol not in self.last_trade_direction or self.last_trade_direction[symbol] != 1 or (self.last_trade_direction[symbol] == 1 and prev_day_return > 0):
long_signals.append(symbol)
elif self.last_trade_direction[symbol] == 1 and prev_day_return < 0:
self.Debug(f"Long signal detected for {symbol} but skipping long trade since a long trade has been entered previously and previous day's return is negative: {prev_day_return * 100:.2f}%")
# Check short entry condition
elif current_k > 80 and current_d > 80 and self.Securities[symbol].Price > upper_band:
if symbol not in self.last_trade_direction or self.last_trade_direction[symbol] != -1 or (self.last_trade_direction[symbol] == -1 and prev_day_return < 0):
short_signals.append(symbol)
elif self.last_trade_direction[symbol] == -1 and prev_day_return > 0:
self.Debug(f"Short signal detected for {symbol} but skipping short trade since a short trade has been entered previously and previous day's return is positive: {prev_day_return * 100:.2f}%")
total_signals = len(long_signals) + len(short_signals)
if total_signals > 0:
transaction_date = self.Time
self.Debug("For " + str(transaction_date) + ":")
self.Debug(f"Detected Long Signals: {', '.join(str(symbol.Value) for symbol in long_signals)}")
self.Debug(f"Detected Short Signals: {', '.join(str(symbol.Value) for symbol in short_signals)}")
else:
transaction_date = str(self.Time)
self.Debug("For " + str(transaction_date) + ":")
self.Debug("No trade signal is detected for the day.")
# When signals > 2: Select the two least correlated pairs
if total_signals > 2:
# Calculate pairwise correlation for returns in the past 30 days
history = self.History(long_signals + short_signals, 30, Resolution.Daily)
if history.empty:
self.Debug("No historical data returned.")
return
close_prices = history['close']
price_data = close_prices.reset_index().pivot(index='time', columns='symbol', values='close')
daily_returns = price_data.pct_change().dropna()
correlation_matrix = daily_returns.corr()
# Log the correlation matrix for debugging
self.Debug(f"Correlation Matrix:\n{correlation_matrix}")
# Find the pairs with the lowest absolute correlation
lowest_abs_corr = float('inf')
selected_symbols = None
# Iterate through the correlation matrix
for i in range(len(correlation_matrix.columns)):
for j in range(i + 1, len(correlation_matrix.columns)):
current_corr = correlation_matrix.iloc[i, j]
abs_corr = abs(current_corr)
if abs_corr < lowest_abs_corr:
lowest_abs_corr = abs_corr
selected_symbols = (correlation_matrix.columns[i], correlation_matrix.columns[j])
if selected_symbols is not None:
self.Debug(f"Selected Symbols: {selected_symbols[0]} and {selected_symbols[1]} with lowest absolute correlation {lowest_abs_corr:.4f}")
max_loss = 0.015
# Execute trades for the selected symbols
for symbol in selected_symbols:
if symbol in long_signals:
self.Trade(symbol, True, max_loss)
elif symbol in short_signals:
self.Trade(symbol, False, max_loss)
else:
#self.Debug("No pairs found with sufficient correlation.")
return
# When there are exactly 2 signals, execute both trades
elif total_signals == 2:
max_loss = 0.015
selected_symbols = long_signals + short_signals
for symbol in selected_symbols:
if symbol in long_signals:
self.Trade(symbol, True, max_loss)
elif symbol in short_signals:
self.Trade(symbol, False, max_loss)
elif total_signals == 1:
max_loss = 0.02
selected_symbols = long_signals + short_signals
for symbol in selected_symbols:
if symbol in long_signals:
self.Trade(symbol, True, max_loss)
elif symbol in short_signals:
self.Trade(symbol, False, max_loss)
def Trade(self, symbol, long_signal, max_loss):
if self.trades_today < 2:
history_22 = self.History(symbol, 22, Resolution.Daily)
if history_22.empty or len(history_22) < 22:
self.Debug(f"Insufficient data to calculate volatility for {symbol}")
return
history_2 = self.History(symbol, 2, Resolution.Daily)
if history_2.empty or len(history_2) < 2:
pass
else:
previous_high = history_2['high'].iloc[-2]
current_high = history_2['high'].iloc[-1]
previous_low = history_2['low'].iloc[-2]
current_low = history_2['low'].iloc[-1]
# Calculate log returns
volatility_22 = history_22['close'].std() # Standard deviation of log returns
# Determine stop-loss and take-profit sizes as n standard deviations from the entry price
stop_loss_size = 1.5 * volatility_22
take_profit_size = 3 * volatility_22
# Calculate position size based on stop-loss size
if (long_signal and (previous_high < current_high or previous_low < current_low)) or ((not long_signal) and (previous_low > current_low or previous_high > current_high)):
position_size = 2 * max_loss / stop_loss_size
self.multiplier += 1
else:
position_size = max_loss / stop_loss_size
self.no_multiplier += 1
# Calculate the quantity to trade
quantity = self.CalculateOrderQuantity(symbol, position_size)
if long_signal:
self.Liquidate(symbol)
self.MarketOrder(symbol, quantity) # Enter long position
self.last_trade_direction[symbol] = 1
self.Debug(f"Entered Long for {symbol} with quantity {quantity}, stop-loss size {stop_loss_size:.4f}, and take-profit size {take_profit_size:.4f}")
self.LogTransaction(symbol, quantity) # Log the transaction
# Set stop-loss order with a tag
stop_loss_price = self.Securities[symbol].Price - stop_loss_size
self.StopMarketOrder(symbol, -quantity, stop_loss_price, tag="StopLoss")
# Set take-profit orders with a tag
take_profit_up = self.Securities[symbol].Price + take_profit_size
self.LimitOrder(symbol, -quantity, take_profit_up, tag="TakeProfitUp")
else:
self.Liquidate(symbol)
self.MarketOrder(symbol, -quantity) # Enter short position
self.last_trade_direction[symbol] = -1
self.Debug(f"Entered Short for {symbol} with quantity {quantity}, stop-loss size {stop_loss_size:.4f}, and take-profit size {take_profit_size:.4f}")
self.LogTransaction(symbol, -quantity) # Log the transaction
# Set stop-loss order with a tag
stop_loss_price = self.Securities[symbol].Price + stop_loss_size
self.StopMarketOrder(symbol, quantity, stop_loss_price, tag="StopLoss")
# Set take-profit orders with a tag
take_profit_down = self.Securities[symbol].Price - take_profit_size
self.LimitOrder(symbol, quantity, take_profit_down, tag="TakeProfitDown")
self.trades_today += 1
def LogTransaction(self, symbol, amount, stop_loss=False, take_profit=False):
transaction_date = self.Time # Get the current date and time
transaction_type = "StopLoss" if stop_loss else ("TakeProfit" if take_profit else "Trade")
self.transaction_log.append({
"date": transaction_date,
"pair": symbol,
"amount": amount,
"type": transaction_type
})
self.Debug(f"Logged Transaction: {transaction_date}, {symbol}, Amount: {amount:.4f}, Type: {transaction_type}")
def OnOrderEvent(self, orderEvent):
# Fetch the order associated with the order event
order = self.Transactions.GetOrderById(orderEvent.OrderId)
# Check if the order is filled and has a tag of "StopLoss"
if orderEvent.Status == OrderStatus.Filled:
symbol = order.Symbol
fill_price = orderEvent.FillPrice
fill_quantity = orderEvent.FillQuantity
direction = "Long" if fill_quantity < 0 else "Short" # Negative quantity indicates closing a long position
# Log stop-loss events
if order.Tag == "StopLoss":
# Log the stop-loss event
self.Debug(f"Stop-loss hit for {symbol} ({direction}) at price {fill_price:.4f} with quantity {abs(fill_quantity):.4f}")
self.LogTransaction(symbol, fill_quantity, stop_loss=True)
# Log take-profit events
elif order.Tag == "TakeProfitUp":
self.Debug(f"Upward take-profit hit for {symbol} ({direction}) at price {fill_price:.4f} with quantity {abs(fill_quantity):.4f}")
self.LogTransaction(symbol, fill_quantity, take_profit=True)
elif order.Tag == "TakeProfitDown":
self.Debug(f"Downward take-profit hit for {symbol} ({direction}) at price {fill_price:.4f} with quantity {abs(fill_quantity):.4f}")
self.LogTransaction(symbol, fill_quantity, take_profit=True)
def CalculateADX(self, symbol, period=14):
"""Calculate ADX for the given symbol."""
# Fetch historical data for ADX calculation
history = self.History(symbol, period + 1, Resolution.Daily)
if history.empty:
return 0
high = history['high'].values
low = history['low'].values
close = history['close'].values
# Calculate the directional movements
up_move = np.maximum(high[1:] - high[:-1], 0)
down_move = np.maximum(low[:-1] - low[1:], 0)
# Calculate True Range
tr1 = high[1:] - low[1:]
tr2 = abs(high[1:] - close[:-1])
tr3 = abs(low[1:] - close[:-1])
true_range = np.maximum(np.maximum(tr1, tr2), tr3)
# Calculate the smoothed averages
atr = np.mean(true_range[-period:]) # Average True Range
up_movement_avg = np.mean(up_move[-period:]) # Average Up Movement
down_movement_avg = np.mean(down_move[-period:]) # Average Down Movement
# Calculate the Directional Index
if up_movement_avg + down_movement_avg == 0:
return 0
plus_di = 100 * (up_movement_avg / atr)
minus_di = 100 * (down_movement_avg / atr)
# Calculate ADX
adx = 100 * np.mean(np.abs(plus_di - minus_di) / (plus_di + minus_di)) if (plus_di + minus_di) != 0 else 0
return adx
def OnEndOfAlgorithm(self):
self.Debug(f"Number of instances where multiplier is applied: {self.multiplier}")
self.Debug(f"Number of instances where no multiplier is applied: {self.no_multiplier}")