Overall Statistics
Total Orders
1538
Average Win
1.07%
Average Loss
-0.89%
Compounding Annual Return
1.817%
Drawdown
24.500%
Expectancy
0.110
Start Equity
100000.00
End Equity
138316.42
Net Profit
38.316%
Sharpe Ratio
-0.069
Sortino Ratio
-0.074
Probabilistic Sharpe Ratio
0.003%
Loss Rate
50%
Win Rate
50%
Profit-Loss Ratio
1.21
Alpha
-0.005
Beta
-0.004
Annual Standard Deviation
0.065
Annual Variance
0.004
Information Ratio
0.21
Tracking Error
0.1
Treynor Ratio
1.005
Total Fees
$0.00
Estimated Strategy Capacity
$11000000.00
Lowest Capacity Asset
NZDUSD 8G
Portfolio Turnover
32.15%
from AlgorithmImports import *
import numpy as np
import pandas as pd
 

class ForexStochasticBollinger(QCAlgorithm):
   def Initialize(self):
       self.SetStartDate(2007, 1, 1)  # Test period start
       self.SetEndDate(2024, 12, 31)  # Test period end
       self.SetCash(100000)
       
       # Set the brokerage model
       self.SetBrokerageModel(BrokerageName.OANDA_BROKERAGE, AccountType.Margin)
 
       # List of currency pairs
       self.pairs =  ["EURUSD", "GBPUSD", "USDCAD", "USDCHF", "AUDUSD", "NZDUSD", "USDJPY", "USDSEK", "USDNOK"]
       self.symbols = [self.AddForex(pair, Resolution.MINUTE).Symbol for pair in self.pairs]
 
       # Add VIX index as a data feed
       self.vix = self.AddData(CBOE, "VIX", Resolution.Daily).Symbol
 
       # Initialize indicators and data structures
       self.indicators = {symbol: (self.STO(symbol, 14*24*60, 1*24*60, 3*24*60), self.BB(symbol, 20*24*60, 2)) for symbol in self.symbols}
       self.daily_returns = {symbol: [] for symbol in self.symbols}
       self.last_trade_direction = {}
       self.traded_symbols_today = set()  # Track which symbols were traded today
       self.trades_today = 0
 
       self.multiplier = 0
       self.no_multiplier = 0
 
       self.strategy_executed_today = False  # Flag to track if the strategy has been executed
 
       # Initialize transaction log
       self.transaction_log = []
 
       # Initialize rolling window for ADX values
       recent_days = 126
       self.adx_window = {symbol: RollingWindow[float](recent_days) for symbol in self.symbols}
 
       # Track order IDs for stop-loss and take-profit
       self.order_ids = {}
 
       # Set warm-up period for indicators
       self.SetWarmUp(30*24*60, Resolution.Minute)
 
       # Schedule trading logic to run at 20:00 ET daily
       self.Schedule.On(self.DateRules.EveryDay(),
                           self.TimeRules.At(20, 0, TimeZones.NEW_YORK),
                           self.ExecuteStrategy)
   
 
   def LiquidatePositions(self):
       if not self.strategy_executed_today:
           return  # Skip if the strategy was not executed today
 
       transaction_date = self.Time
 
       for symbol in self.traded_symbols_today:
           if not self.Portfolio[symbol].Invested:
               continue
           
           # Cancel all stop-loss and take-profit levels not hit for this symbol
           self.Transactions.CancelOpenOrders()
     
           quantity = self.Portfolio[symbol].Quantity
           
           order = self.MarketOrder(symbol, -quantity, tag="EOD_Liquidation")
 
           self.Liquidate(symbol)
         
 
       # Schedule a check for remaining positions at 16:50 ET
       self.Schedule.On(self.DateRules.EveryDay(),
                       self.TimeRules.At(16, 50, TimeZones.NEW_YORK),
                       self.CheckAndLiquidateRemainingPositions)
 

       # Reset the strategy executed flag
       self.strategy_executed_today = False
       self.traded_symbols_today = set()
   
 
   def CheckAndLiquidateRemainingPositions(self):
       for symbol in self.Portfolio.Keys:
           if self.Portfolio[symbol].Invested:
               self.Debug(f"Liquidating remaining position for {symbol} at market.")
               self.Liquidate(symbol)
 

   def ExecuteStrategy(self):
       if self.IsWarmingUp:
           return
 
       # Check for restricted trading dates
       restricted_dates = [
           self.Time.date().replace(month=12, day=23),  # December 23
           self.Time.date().replace(month=12, day=24),  # December 24
           self.Time.date().replace(month=12, day=25),  # December 25
           self.Time.date().replace(month=12, day=30),  # December 30
           self.Time.date().replace(month=12, day=31),  # December 31
           self.Time.date().replace(month=1, day=1)     # January 1
       ]
       
       if self.Time.date() in restricted_dates:
           self.Debug(f"Skipping trades on restricted date: {self.Time.date()}")
           return
 
       if self.Time.weekday() == 4:
           self.Debug("Skipping trades on Friday.")
           return
       elif self.Time.weekday() == 5:
           self.Debug("Skipping trades on Saturday.")
           return
 

       transaction_date = self.Time
       self.Debug("For " + str(transaction_date) + ":")
 
       # Reset trades count at the start of the day
       self.trades_today = 0
       self.strategy_executed_today = False
       self.traded_symbols_today = set()
       
 
       # Get the last vix
       vix_history = self.History(self.vix, 1, Resolution.Daily)
       if not vix_history.empty:
           latest_vix = vix_history['close'].values[0]
       else:
           latest_vix = 0
 

       # Calculate the 75th percentile of vix in past 5 years
       vix_history = self.History(self.vix, 1000, Resolution.Daily)
       if not vix_history.empty:
           vix_75th_percentile = vix_history['close'].quantile(0.75)
       else:
           vix_75th_percentile = 30  # Default value if history is not available
 

       # Skip trades if the rolling average VIX is above the 75th percentile
       if latest_vix > vix_75th_percentile:
           self.Debug(f"Skipping trades due to high rolling VIX: {latest_vix:.2f} (75th percentile: {vix_75th_percentile:.2f})")
           return
 

       long_signals = []
       short_signals = []
 

       # Evaluate signals for each pair
       for symbol in self.symbols:
           stoch, bb = self.indicators[symbol]
           current_k = stoch.StochK.Current.Value
           current_d = stoch.StochD.Current.Value
           upper_band = bb.UpperBand.Current.Value
           lower_band = bb.LowerBand.Current.Value
 

           # Calculate ADX for the current day
           adx_value = self.CalculateADX(symbol)
 

           # Update the ADX rolling window
           self.adx_window[symbol].Add(adx_value)
 

           # Calculate Q3 ADX value for the recent days
           if self.adx_window[symbol].IsReady:
               adx_values = list(self.adx_window[symbol])
               adx_q3 = max(np.percentile(adx_values, 75), 40)
           else:
               adx_q3 = 40  # Default values if history is not available
           
           # Skip trade if ADX is outside Q1 and Q3
           if adx_value > adx_q3 or adx_value < 20:
               self.Debug(f"Skipping trade for {symbol} due to ADX {adx_value:.4f} (Q1: {20:.4f}, Q3: {adx_q3:.4f}).")
               continue
 

           history = self.History(symbol, 2, Resolution.Daily)
           if history.empty or len(history) < 2:
               prev_day_return = 0
           else:
               previous_close = history['close'].iloc[-2]
               current_close = history['close'].iloc[-1]
               prev_day_return = (current_close - previous_close) / previous_close
 

           # Check long entry condition
           if current_k < 20 and current_d < 20 and self.Securities[symbol].Price < lower_band:
               if symbol not in self.last_trade_direction or self.last_trade_direction[symbol] != 1 or (self.last_trade_direction[symbol] == 1 and prev_day_return > 0):
                   long_signals.append(symbol)
               elif self.last_trade_direction[symbol] == 1 and prev_day_return < 0:
                   self.Debug(f"Long signal detected for {symbol} but skipping long trade since a long trade has been entered previously and previous day's return is negative: {prev_day_return * 100:.2f}%")
               
           # Check short entry condition
           elif current_k > 80 and current_d > 80 and self.Securities[symbol].Price > upper_band:
               if symbol not in self.last_trade_direction or self.last_trade_direction[symbol] != -1 or (self.last_trade_direction[symbol] == -1 and prev_day_return < 0):
                   short_signals.append(symbol)
               elif self.last_trade_direction[symbol] == -1 and prev_day_return > 0:
                   self.Debug(f"Short signal detected for {symbol} but skipping short trade since a short trade has been entered previously and previous day's return is positive: {prev_day_return * 100:.2f}%")
 

       total_signals = len(long_signals) + len(short_signals)
 

       if total_signals > 0:
           self.Debug(f"Detected Long Signals: {', '.join(str(symbol.Value) for symbol in long_signals)}")
           self.Debug(f"Detected Short Signals: {', '.join(str(symbol.Value) for symbol in short_signals)}")
       else:
           self.Debug(f"No trade signal is detected for the day {transaction_date:%Y-%m-%d}.")
 

       # When signals > 2: Select the two least correlated pairs
       if total_signals > 2:
           # Calculate pairwise correlation for returns in the past 30 days
           history = self.History(long_signals + short_signals, 30, Resolution.Daily)
           if history.empty:
               self.Debug("No historical data returned.")
               return
 

           close_prices = history['close']
           price_data = close_prices.reset_index().pivot(index='time', columns='symbol', values='close')
           daily_returns = price_data.pct_change().dropna()
           correlation_matrix = daily_returns.corr()
 

           # Log the correlation matrix for debugging
           self.Debug(f"Correlation Matrix:\n{correlation_matrix}")
           
           # Find the pairs with the lowest absolute correlation
           lowest_abs_corr = float('inf')
           selected_symbols = None
 

           # Iterate through the correlation matrix
           for i in range(len(correlation_matrix.columns)):
               for j in range(i + 1, len(correlation_matrix.columns)):
                   current_corr = correlation_matrix.iloc[i, j]
                   abs_corr = abs(current_corr)
                   if abs_corr < lowest_abs_corr:
                       lowest_abs_corr = abs_corr
                       selected_symbols = (correlation_matrix.columns[i], correlation_matrix.columns[j])
 

           if selected_symbols is not None:
               self.Debug(f"Selected Symbols: {selected_symbols[0]} and {selected_symbols[1]} with lowest absolute correlation {lowest_abs_corr:.4f}")
               
               max_loss = 0.015
               # Execute trades for the selected symbols
               for symbol in selected_symbols:
                   if symbol in long_signals:
                       self.Trade(symbol, True, max_loss)
                   elif symbol in short_signals:
                       self.Trade(symbol, False, max_loss)
           else:
               #self.Debug("No pairs found with sufficient correlation.")
               return
 

       # When there are exactly 2 signals, execute both trades
       elif total_signals == 2:
           max_loss = 0.015
           selected_symbols = long_signals + short_signals
           for symbol in selected_symbols:
               if symbol in long_signals:
                   self.Trade(symbol, True, max_loss)
               elif symbol in short_signals:
                   self.Trade(symbol, False, max_loss)
 

       elif total_signals == 1:
           max_loss = 0.02
           selected_symbols = long_signals + short_signals
           for symbol in selected_symbols:
               if symbol in long_signals:
                   self.Trade(symbol, True, max_loss)
               elif symbol in short_signals:
                   self.Trade(symbol, False, max_loss)
       
       if self.strategy_executed_today:
           # Schedule liquidation for the next day at 16:45 ET
           self.Schedule.On(self.DateRules.EveryDay(),
                           self.TimeRules.At(16, 45, TimeZones.NEW_YORK),
                           self.LiquidatePositions)
 

   def Trade(self, symbol, long_signal, max_loss):
 

       if self.trades_today <= 2:
 
           history_22 = self.History(symbol, 22*24*60, Resolution.Minute)
 
           history_2 = self.History(symbol, 2, Resolution.Daily)
           if history_2.empty or len(history_2) < 2:
               return
           
           previous_low = history_2['low'].iloc[-2]
           current_low = history_2['low'].iloc[-1]
           previous_high = history_2['high'].iloc[-2]
           current_high = history_2['high'].iloc[-1]
       
           # Calculate log returns
           volatility_22 = history_22['close'].dropna().std()  # Standard deviation of log returns
           
           # Determine stop-loss and take-profit sizes as n standard deviations from the entry price
           stop_loss_size = 1 * volatility_22
           take_profit_size = 2.5 * volatility_22
 

           # Calculate position size based on stop-loss size
           if (long_signal and (previous_high < current_high or previous_low < current_low)) or ((not long_signal) and (previous_low > current_low or previous_high > current_high)):
               position_size = 2 * max_loss / stop_loss_size
               self.multiplier += 1
           else:
               position_size = max_loss / stop_loss_size
               self.no_multiplier += 1
 
           # Calculate the quantity to trade
           quantity = self.CalculateOrderQuantity(symbol, position_size)
 
           if long_signal:
 
               self.Liquidate(symbol)
               self.MarketOrder(symbol, quantity, tag = "EnterPosition")
               self.last_trade_direction[symbol] = 1
               self.LogTransaction(symbol, quantity)  # Log the transaction
               
               # Set stop-loss order with a tag
               stop_loss_price = self.Securities[symbol].Price - stop_loss_size
               stop_loss_order = self.StopMarketOrder(symbol, -quantity, stop_loss_price, tag="SetStopLoss")
               self.order_ids[symbol] = {'stop_loss': stop_loss_order.OrderId}
 
               # Set take-profit orders with a tag
               take_profit_up = self.Securities[symbol].Price + take_profit_size
               take_profit_order = self.LimitOrder(symbol, -quantity, take_profit_up, tag="TakeProfitUpLimit")
               self.order_ids[symbol]['take_profit'] = take_profit_order.OrderId
               
           else:
 
               self.Liquidate(symbol)
               self.MarketOrder(symbol, -quantity, tag = "EnterPosition")
               self.last_trade_direction[symbol] = -1
               self.LogTransaction(symbol, -quantity)  # Log the transaction
               
               # Set stop-loss order with a tag
               stop_loss_price = self.Securities[symbol].Price + stop_loss_size
               stop_loss_order = self.StopMarketOrder(symbol, quantity, stop_loss_price, tag="SetStopLoss")
               self.order_ids[symbol] = {'stop_loss': stop_loss_order.OrderId}
 

               # Set take-profit orders with a tag
               take_profit_down = self.Securities[symbol].Price - take_profit_size
               take_profit_order = self.LimitOrder(symbol, quantity, take_profit_down, tag="TakeProfitDownLimit")
               self.order_ids[symbol]['take_profit'] = take_profit_order.OrderId
           
           self.strategy_executed_today = True
           self.traded_symbols_today.add(symbol)
           self.trades_today += 1
 

   def LogTransaction(self, symbol, amount, stop_loss=False, take_profit=False):
       transaction_date = self.Time
       transaction_type = "StopLoss" if stop_loss else ("TakeProfit" if take_profit else "Trade")
       self.transaction_log.append({
           "date": transaction_date,
           "pair": symbol,
           "amount": amount,
           "type": transaction_type
       })
       self.Debug(f"Logged Transaction: {transaction_date}, {symbol}, quantity: {amount:.4f}, Type: {transaction_type}")
 

   def OnOrderEvent(self, orderEvent):
       # Fetch the order associated with the order event
       order = self.Transactions.GetOrderById(orderEvent.OrderId)
       
       # Check if the order is filled and has a tag of "StopLoss"
       if orderEvent.Status == OrderStatus.Filled:
           symbol = order.Symbol
           fill_price = orderEvent.FillPrice
           fill_quantity = orderEvent.FillQuantity
           direction = "Long" if fill_quantity < 0 else "Short"  # Negative quantity indicates closing a long position
           
           # Log stop-loss events
           if order.Tag == "SetStopLoss":
               # Log the stop-loss event
               self.Debug(f"Stop-loss hit for {symbol} ({direction}) at price {fill_price:.4f} with quantity {abs(fill_quantity):.4f}")
               self.LogTransaction(symbol, fill_quantity, stop_loss=True)
               # Cancel the take-profit order
               if symbol in self.order_ids and 'take_profit' in self.order_ids[symbol]:
                   self.Transactions.CancelOrder(self.order_ids[symbol]['take_profit'])
 

           # Log take-profit events
           elif order.Tag == "TakeProfitUpLimit":
               self.Debug(f"Upward take-profit hit for {symbol} ({direction}) at price {fill_price:.4f} with quantity {abs(fill_quantity):.4f}")
               self.LogTransaction(symbol, fill_quantity, take_profit=True)
               # Cancel the stop-loss order
               if symbol in self.order_ids and 'stop_loss' in self.order_ids[symbol]:
                   self.Transactions.CancelOrder(self.order_ids[symbol]['stop_loss'])
           
           elif order.Tag == "TakeProfitDownLimit":
               self.Debug(f"Downward take-profit hit for {symbol} ({direction}) at price {fill_price:.4f} with quantity {abs(fill_quantity):.4f}")
               self.LogTransaction(symbol, fill_quantity, take_profit=True)
               # Cancel the stop-loss order
               if symbol in self.order_ids and 'stop_loss' in self.order_ids[symbol]:
                   self.Transactions.CancelOrder(self.order_ids[symbol]['stop_loss'])
 

   def CalculateADX(self, symbol, period=14):
       """Calculate ADX for the given symbol."""
       # Fetch historical data for ADX calculation
       history = self.History(symbol, period + 1, Resolution.Daily)
       if history.empty:
           return 0
       
       high = history['high'].values
       low = history['low'].values
       close = history['close'].values
 

       # Calculate the directional movements
       up_move = np.maximum(high[1:] - high[:-1], 0)
       down_move = np.maximum(low[:-1] - low[1:], 0)
 

       # Calculate True Range
       tr1 = high[1:] - low[1:]
       tr2 = abs(high[1:] - close[:-1])
       tr3 = abs(low[1:] - close[:-1])
       true_range = np.maximum(np.maximum(tr1, tr2), tr3)
 

       # Calculate the smoothed averages
       atr = np.mean(true_range[-period:])  # Average True Range
       up_movement_avg = np.mean(up_move[-period:])  # Average Up Movement
       down_movement_avg = np.mean(down_move[-period:])  # Average Down Movement
 

       # Calculate the Directional Index
       if up_movement_avg + down_movement_avg == 0:
           return 0
       plus_di = 100 * (up_movement_avg / atr)
       minus_di = 100 * (down_movement_avg / atr)
 
       # Calculate ADX
       adx = 100 * np.mean(np.abs(plus_di - minus_di) / (plus_di + minus_di)) if (plus_di + minus_di) != 0 else 0
 
       return adx
   
   def on_margin_call_warning(self) -> None:
       self.debug(f"Warning: Close to margin call")
   
   
   def OnEndOfAlgorithm(self):
       self.Debug(f"Number of instances where multiplier is applied: {self.multiplier}")
       self.Debug(f"Number of instances where no multiplier is applied: {self.no_multiplier}")