| Overall Statistics |
|
Total Orders 1538 Average Win 1.07% Average Loss -0.89% Compounding Annual Return 1.817% Drawdown 24.500% Expectancy 0.110 Start Equity 100000.00 End Equity 138316.42 Net Profit 38.316% Sharpe Ratio -0.069 Sortino Ratio -0.074 Probabilistic Sharpe Ratio 0.003% Loss Rate 50% Win Rate 50% Profit-Loss Ratio 1.21 Alpha -0.005 Beta -0.004 Annual Standard Deviation 0.065 Annual Variance 0.004 Information Ratio 0.21 Tracking Error 0.1 Treynor Ratio 1.005 Total Fees $0.00 Estimated Strategy Capacity $11000000.00 Lowest Capacity Asset NZDUSD 8G Portfolio Turnover 32.15% |
from AlgorithmImports import *
import numpy as np
import pandas as pd
class ForexStochasticBollinger(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2007, 1, 1) # Test period start
self.SetEndDate(2024, 12, 31) # Test period end
self.SetCash(100000)
# Set the brokerage model
self.SetBrokerageModel(BrokerageName.OANDA_BROKERAGE, AccountType.Margin)
# List of currency pairs
self.pairs = ["EURUSD", "GBPUSD", "USDCAD", "USDCHF", "AUDUSD", "NZDUSD", "USDJPY", "USDSEK", "USDNOK"]
self.symbols = [self.AddForex(pair, Resolution.MINUTE).Symbol for pair in self.pairs]
# Add VIX index as a data feed
self.vix = self.AddData(CBOE, "VIX", Resolution.Daily).Symbol
# Initialize indicators and data structures
self.indicators = {symbol: (self.STO(symbol, 14*24*60, 1*24*60, 3*24*60), self.BB(symbol, 20*24*60, 2)) for symbol in self.symbols}
self.daily_returns = {symbol: [] for symbol in self.symbols}
self.last_trade_direction = {}
self.traded_symbols_today = set() # Track which symbols were traded today
self.trades_today = 0
self.multiplier = 0
self.no_multiplier = 0
self.strategy_executed_today = False # Flag to track if the strategy has been executed
# Initialize transaction log
self.transaction_log = []
# Initialize rolling window for ADX values
recent_days = 126
self.adx_window = {symbol: RollingWindow[float](recent_days) for symbol in self.symbols}
# Track order IDs for stop-loss and take-profit
self.order_ids = {}
# Set warm-up period for indicators
self.SetWarmUp(30*24*60, Resolution.Minute)
# Schedule trading logic to run at 20:00 ET daily
self.Schedule.On(self.DateRules.EveryDay(),
self.TimeRules.At(20, 0, TimeZones.NEW_YORK),
self.ExecuteStrategy)
def LiquidatePositions(self):
if not self.strategy_executed_today:
return # Skip if the strategy was not executed today
transaction_date = self.Time
for symbol in self.traded_symbols_today:
if not self.Portfolio[symbol].Invested:
continue
# Cancel all stop-loss and take-profit levels not hit for this symbol
self.Transactions.CancelOpenOrders()
quantity = self.Portfolio[symbol].Quantity
order = self.MarketOrder(symbol, -quantity, tag="EOD_Liquidation")
self.Liquidate(symbol)
# Schedule a check for remaining positions at 16:50 ET
self.Schedule.On(self.DateRules.EveryDay(),
self.TimeRules.At(16, 50, TimeZones.NEW_YORK),
self.CheckAndLiquidateRemainingPositions)
# Reset the strategy executed flag
self.strategy_executed_today = False
self.traded_symbols_today = set()
def CheckAndLiquidateRemainingPositions(self):
for symbol in self.Portfolio.Keys:
if self.Portfolio[symbol].Invested:
self.Debug(f"Liquidating remaining position for {symbol} at market.")
self.Liquidate(symbol)
def ExecuteStrategy(self):
if self.IsWarmingUp:
return
# Check for restricted trading dates
restricted_dates = [
self.Time.date().replace(month=12, day=23), # December 23
self.Time.date().replace(month=12, day=24), # December 24
self.Time.date().replace(month=12, day=25), # December 25
self.Time.date().replace(month=12, day=30), # December 30
self.Time.date().replace(month=12, day=31), # December 31
self.Time.date().replace(month=1, day=1) # January 1
]
if self.Time.date() in restricted_dates:
self.Debug(f"Skipping trades on restricted date: {self.Time.date()}")
return
if self.Time.weekday() == 4:
self.Debug("Skipping trades on Friday.")
return
elif self.Time.weekday() == 5:
self.Debug("Skipping trades on Saturday.")
return
transaction_date = self.Time
self.Debug("For " + str(transaction_date) + ":")
# Reset trades count at the start of the day
self.trades_today = 0
self.strategy_executed_today = False
self.traded_symbols_today = set()
# Get the last vix
vix_history = self.History(self.vix, 1, Resolution.Daily)
if not vix_history.empty:
latest_vix = vix_history['close'].values[0]
else:
latest_vix = 0
# Calculate the 75th percentile of vix in past 5 years
vix_history = self.History(self.vix, 1000, Resolution.Daily)
if not vix_history.empty:
vix_75th_percentile = vix_history['close'].quantile(0.75)
else:
vix_75th_percentile = 30 # Default value if history is not available
# Skip trades if the rolling average VIX is above the 75th percentile
if latest_vix > vix_75th_percentile:
self.Debug(f"Skipping trades due to high rolling VIX: {latest_vix:.2f} (75th percentile: {vix_75th_percentile:.2f})")
return
long_signals = []
short_signals = []
# Evaluate signals for each pair
for symbol in self.symbols:
stoch, bb = self.indicators[symbol]
current_k = stoch.StochK.Current.Value
current_d = stoch.StochD.Current.Value
upper_band = bb.UpperBand.Current.Value
lower_band = bb.LowerBand.Current.Value
# Calculate ADX for the current day
adx_value = self.CalculateADX(symbol)
# Update the ADX rolling window
self.adx_window[symbol].Add(adx_value)
# Calculate Q3 ADX value for the recent days
if self.adx_window[symbol].IsReady:
adx_values = list(self.adx_window[symbol])
adx_q3 = max(np.percentile(adx_values, 75), 40)
else:
adx_q3 = 40 # Default values if history is not available
# Skip trade if ADX is outside Q1 and Q3
if adx_value > adx_q3 or adx_value < 20:
self.Debug(f"Skipping trade for {symbol} due to ADX {adx_value:.4f} (Q1: {20:.4f}, Q3: {adx_q3:.4f}).")
continue
history = self.History(symbol, 2, Resolution.Daily)
if history.empty or len(history) < 2:
prev_day_return = 0
else:
previous_close = history['close'].iloc[-2]
current_close = history['close'].iloc[-1]
prev_day_return = (current_close - previous_close) / previous_close
# Check long entry condition
if current_k < 20 and current_d < 20 and self.Securities[symbol].Price < lower_band:
if symbol not in self.last_trade_direction or self.last_trade_direction[symbol] != 1 or (self.last_trade_direction[symbol] == 1 and prev_day_return > 0):
long_signals.append(symbol)
elif self.last_trade_direction[symbol] == 1 and prev_day_return < 0:
self.Debug(f"Long signal detected for {symbol} but skipping long trade since a long trade has been entered previously and previous day's return is negative: {prev_day_return * 100:.2f}%")
# Check short entry condition
elif current_k > 80 and current_d > 80 and self.Securities[symbol].Price > upper_band:
if symbol not in self.last_trade_direction or self.last_trade_direction[symbol] != -1 or (self.last_trade_direction[symbol] == -1 and prev_day_return < 0):
short_signals.append(symbol)
elif self.last_trade_direction[symbol] == -1 and prev_day_return > 0:
self.Debug(f"Short signal detected for {symbol} but skipping short trade since a short trade has been entered previously and previous day's return is positive: {prev_day_return * 100:.2f}%")
total_signals = len(long_signals) + len(short_signals)
if total_signals > 0:
self.Debug(f"Detected Long Signals: {', '.join(str(symbol.Value) for symbol in long_signals)}")
self.Debug(f"Detected Short Signals: {', '.join(str(symbol.Value) for symbol in short_signals)}")
else:
self.Debug(f"No trade signal is detected for the day {transaction_date:%Y-%m-%d}.")
# When signals > 2: Select the two least correlated pairs
if total_signals > 2:
# Calculate pairwise correlation for returns in the past 30 days
history = self.History(long_signals + short_signals, 30, Resolution.Daily)
if history.empty:
self.Debug("No historical data returned.")
return
close_prices = history['close']
price_data = close_prices.reset_index().pivot(index='time', columns='symbol', values='close')
daily_returns = price_data.pct_change().dropna()
correlation_matrix = daily_returns.corr()
# Log the correlation matrix for debugging
self.Debug(f"Correlation Matrix:\n{correlation_matrix}")
# Find the pairs with the lowest absolute correlation
lowest_abs_corr = float('inf')
selected_symbols = None
# Iterate through the correlation matrix
for i in range(len(correlation_matrix.columns)):
for j in range(i + 1, len(correlation_matrix.columns)):
current_corr = correlation_matrix.iloc[i, j]
abs_corr = abs(current_corr)
if abs_corr < lowest_abs_corr:
lowest_abs_corr = abs_corr
selected_symbols = (correlation_matrix.columns[i], correlation_matrix.columns[j])
if selected_symbols is not None:
self.Debug(f"Selected Symbols: {selected_symbols[0]} and {selected_symbols[1]} with lowest absolute correlation {lowest_abs_corr:.4f}")
max_loss = 0.015
# Execute trades for the selected symbols
for symbol in selected_symbols:
if symbol in long_signals:
self.Trade(symbol, True, max_loss)
elif symbol in short_signals:
self.Trade(symbol, False, max_loss)
else:
#self.Debug("No pairs found with sufficient correlation.")
return
# When there are exactly 2 signals, execute both trades
elif total_signals == 2:
max_loss = 0.015
selected_symbols = long_signals + short_signals
for symbol in selected_symbols:
if symbol in long_signals:
self.Trade(symbol, True, max_loss)
elif symbol in short_signals:
self.Trade(symbol, False, max_loss)
elif total_signals == 1:
max_loss = 0.02
selected_symbols = long_signals + short_signals
for symbol in selected_symbols:
if symbol in long_signals:
self.Trade(symbol, True, max_loss)
elif symbol in short_signals:
self.Trade(symbol, False, max_loss)
if self.strategy_executed_today:
# Schedule liquidation for the next day at 16:45 ET
self.Schedule.On(self.DateRules.EveryDay(),
self.TimeRules.At(16, 45, TimeZones.NEW_YORK),
self.LiquidatePositions)
def Trade(self, symbol, long_signal, max_loss):
if self.trades_today <= 2:
history_22 = self.History(symbol, 22*24*60, Resolution.Minute)
history_2 = self.History(symbol, 2, Resolution.Daily)
if history_2.empty or len(history_2) < 2:
return
previous_low = history_2['low'].iloc[-2]
current_low = history_2['low'].iloc[-1]
previous_high = history_2['high'].iloc[-2]
current_high = history_2['high'].iloc[-1]
# Calculate log returns
volatility_22 = history_22['close'].dropna().std() # Standard deviation of log returns
# Determine stop-loss and take-profit sizes as n standard deviations from the entry price
stop_loss_size = 1 * volatility_22
take_profit_size = 2.5 * volatility_22
# Calculate position size based on stop-loss size
if (long_signal and (previous_high < current_high or previous_low < current_low)) or ((not long_signal) and (previous_low > current_low or previous_high > current_high)):
position_size = 2 * max_loss / stop_loss_size
self.multiplier += 1
else:
position_size = max_loss / stop_loss_size
self.no_multiplier += 1
# Calculate the quantity to trade
quantity = self.CalculateOrderQuantity(symbol, position_size)
if long_signal:
self.Liquidate(symbol)
self.MarketOrder(symbol, quantity, tag = "EnterPosition")
self.last_trade_direction[symbol] = 1
self.LogTransaction(symbol, quantity) # Log the transaction
# Set stop-loss order with a tag
stop_loss_price = self.Securities[symbol].Price - stop_loss_size
stop_loss_order = self.StopMarketOrder(symbol, -quantity, stop_loss_price, tag="SetStopLoss")
self.order_ids[symbol] = {'stop_loss': stop_loss_order.OrderId}
# Set take-profit orders with a tag
take_profit_up = self.Securities[symbol].Price + take_profit_size
take_profit_order = self.LimitOrder(symbol, -quantity, take_profit_up, tag="TakeProfitUpLimit")
self.order_ids[symbol]['take_profit'] = take_profit_order.OrderId
else:
self.Liquidate(symbol)
self.MarketOrder(symbol, -quantity, tag = "EnterPosition")
self.last_trade_direction[symbol] = -1
self.LogTransaction(symbol, -quantity) # Log the transaction
# Set stop-loss order with a tag
stop_loss_price = self.Securities[symbol].Price + stop_loss_size
stop_loss_order = self.StopMarketOrder(symbol, quantity, stop_loss_price, tag="SetStopLoss")
self.order_ids[symbol] = {'stop_loss': stop_loss_order.OrderId}
# Set take-profit orders with a tag
take_profit_down = self.Securities[symbol].Price - take_profit_size
take_profit_order = self.LimitOrder(symbol, quantity, take_profit_down, tag="TakeProfitDownLimit")
self.order_ids[symbol]['take_profit'] = take_profit_order.OrderId
self.strategy_executed_today = True
self.traded_symbols_today.add(symbol)
self.trades_today += 1
def LogTransaction(self, symbol, amount, stop_loss=False, take_profit=False):
transaction_date = self.Time
transaction_type = "StopLoss" if stop_loss else ("TakeProfit" if take_profit else "Trade")
self.transaction_log.append({
"date": transaction_date,
"pair": symbol,
"amount": amount,
"type": transaction_type
})
self.Debug(f"Logged Transaction: {transaction_date}, {symbol}, quantity: {amount:.4f}, Type: {transaction_type}")
def OnOrderEvent(self, orderEvent):
# Fetch the order associated with the order event
order = self.Transactions.GetOrderById(orderEvent.OrderId)
# Check if the order is filled and has a tag of "StopLoss"
if orderEvent.Status == OrderStatus.Filled:
symbol = order.Symbol
fill_price = orderEvent.FillPrice
fill_quantity = orderEvent.FillQuantity
direction = "Long" if fill_quantity < 0 else "Short" # Negative quantity indicates closing a long position
# Log stop-loss events
if order.Tag == "SetStopLoss":
# Log the stop-loss event
self.Debug(f"Stop-loss hit for {symbol} ({direction}) at price {fill_price:.4f} with quantity {abs(fill_quantity):.4f}")
self.LogTransaction(symbol, fill_quantity, stop_loss=True)
# Cancel the take-profit order
if symbol in self.order_ids and 'take_profit' in self.order_ids[symbol]:
self.Transactions.CancelOrder(self.order_ids[symbol]['take_profit'])
# Log take-profit events
elif order.Tag == "TakeProfitUpLimit":
self.Debug(f"Upward take-profit hit for {symbol} ({direction}) at price {fill_price:.4f} with quantity {abs(fill_quantity):.4f}")
self.LogTransaction(symbol, fill_quantity, take_profit=True)
# Cancel the stop-loss order
if symbol in self.order_ids and 'stop_loss' in self.order_ids[symbol]:
self.Transactions.CancelOrder(self.order_ids[symbol]['stop_loss'])
elif order.Tag == "TakeProfitDownLimit":
self.Debug(f"Downward take-profit hit for {symbol} ({direction}) at price {fill_price:.4f} with quantity {abs(fill_quantity):.4f}")
self.LogTransaction(symbol, fill_quantity, take_profit=True)
# Cancel the stop-loss order
if symbol in self.order_ids and 'stop_loss' in self.order_ids[symbol]:
self.Transactions.CancelOrder(self.order_ids[symbol]['stop_loss'])
def CalculateADX(self, symbol, period=14):
"""Calculate ADX for the given symbol."""
# Fetch historical data for ADX calculation
history = self.History(symbol, period + 1, Resolution.Daily)
if history.empty:
return 0
high = history['high'].values
low = history['low'].values
close = history['close'].values
# Calculate the directional movements
up_move = np.maximum(high[1:] - high[:-1], 0)
down_move = np.maximum(low[:-1] - low[1:], 0)
# Calculate True Range
tr1 = high[1:] - low[1:]
tr2 = abs(high[1:] - close[:-1])
tr3 = abs(low[1:] - close[:-1])
true_range = np.maximum(np.maximum(tr1, tr2), tr3)
# Calculate the smoothed averages
atr = np.mean(true_range[-period:]) # Average True Range
up_movement_avg = np.mean(up_move[-period:]) # Average Up Movement
down_movement_avg = np.mean(down_move[-period:]) # Average Down Movement
# Calculate the Directional Index
if up_movement_avg + down_movement_avg == 0:
return 0
plus_di = 100 * (up_movement_avg / atr)
minus_di = 100 * (down_movement_avg / atr)
# Calculate ADX
adx = 100 * np.mean(np.abs(plus_di - minus_di) / (plus_di + minus_di)) if (plus_di + minus_di) != 0 else 0
return adx
def on_margin_call_warning(self) -> None:
self.debug(f"Warning: Close to margin call")
def OnEndOfAlgorithm(self):
self.Debug(f"Number of instances where multiplier is applied: {self.multiplier}")
self.Debug(f"Number of instances where no multiplier is applied: {self.no_multiplier}")