Overall Statistics
Total Orders
1538
Average Win
1.07%
Average Loss
-0.89%
Compounding Annual Return
1.817%
Drawdown
24.500%
Expectancy
0.110
Start Equity
100000.00
End Equity
138316.42
Net Profit
38.316%
Sharpe Ratio
-0.069
Sortino Ratio
-0.074
Probabilistic Sharpe Ratio
0.003%
Loss Rate
50%
Win Rate
50%
Profit-Loss Ratio
1.21
Alpha
-0.005
Beta
-0.004
Annual Standard Deviation
0.065
Annual Variance
0.004
Information Ratio
0.21
Tracking Error
0.1
Treynor Ratio
1.005
Total Fees
$0.00
Estimated Strategy Capacity
$11000000.00
Lowest Capacity Asset
NZDUSD 8G
Portfolio Turnover
32.15%
from AlgorithmImports import *
import numpy as np
import pandas as pd


class ForexStochasticBollinger(QCAlgorithm):
    def Initialize(self):
        """Set up the backtest window, data feeds, indicators, state and schedule.

        Subscribes to nine FX pairs at minute resolution and to the CBOE VIX at
        daily resolution, creates one (Stochastic, Bollinger Bands) indicator
        pair per FX symbol, initialises the bookkeeping containers used by the
        other methods, requests a warm-up period and schedules
        ``ExecuteStrategy`` for 20:00 New York time on every day.
        """
        minutes_per_day = 24 * 60

        # Backtest window and starting capital.
        self.SetStartDate(2007, 1, 1)
        self.SetEndDate(2024, 12, 31)
        self.SetCash(100000)

        # OANDA brokerage model with a margin account.
        self.SetBrokerageModel(BrokerageName.OANDA_BROKERAGE, AccountType.Margin)

        # Traded currency pairs, each subscribed at minute resolution.
        self.pairs = ["EURUSD", "GBPUSD", "USDCAD", "USDCHF", "AUDUSD", "NZDUSD", "USDJPY", "USDSEK", "USDNOK"]
        fx_symbols = []
        for pair in self.pairs:
            fx_symbols.append(self.AddForex(pair, Resolution.MINUTE).Symbol)
        self.symbols = fx_symbols

        # Daily VIX feed, read by ExecuteStrategy as a volatility filter.
        self.vix = self.AddData(CBOE, "VIX", Resolution.Daily).Symbol

        # Indicator periods are expressed in minute bars (N days * 24 * 60).
        self.indicators = {}
        for symbol in self.symbols:
            stochastic = self.STO(symbol, 14 * minutes_per_day, 1 * minutes_per_day, 3 * minutes_per_day)
            bollinger = self.BB(symbol, 20 * minutes_per_day, 2)
            self.indicators[symbol] = (stochastic, bollinger)

        # Per-symbol and per-day bookkeeping.
        self.daily_returns = {}
        for symbol in self.symbols:
            self.daily_returns[symbol] = []
        self.last_trade_direction = {}      # symbol -> 1 (long) or -1 (short)
        self.traded_symbols_today = set()   # symbols entered by the latest run
        self.trades_today = 0

        # Counters reported by OnEndOfAlgorithm.
        self.multiplier = 0
        self.no_multiplier = 0

        # True once the latest ExecuteStrategy run has placed a trade.
        self.strategy_executed_today = False

        # In-memory record written by LogTransaction.
        self.transaction_log = []

        # Rolling window of the most recent ADX readings for each symbol.
        recent_days = 126
        self.adx_window = {}
        for symbol in self.symbols:
            self.adx_window[symbol] = RollingWindow[float](recent_days)

        # symbol -> {'stop_loss': order id, 'take_profit': order id}
        self.order_ids = {}

        # Warm-up length in minute bars.
        self.SetWarmUp(30 * minutes_per_day, Resolution.Minute)

        # Run the entry logic at 20:00 ET every day.
        self.Schedule.On(
            self.DateRules.EveryDay(),
            self.TimeRules.At(20, 0, TimeZones.NEW_YORK),
            self.ExecuteStrategy,
        )
     

    def LiquidatePositions(self):
        """Close the positions opened by the most recent strategy run.

        Does nothing unless ``strategy_executed_today`` is set. For every symbol
        in ``traded_symbols_today`` that is still invested, that symbol's open
        orders (the unfilled stop-loss / take-profit legs) are cancelled and
        the holding is flattened with a market order tagged
        ``"EOD_Liquidation"``. Afterwards the per-run flags are reset.

        The daily 16:50 New York safety-net sweep
        (``CheckAndLiquidateRemainingPositions``) is registered only the first
        time this method gets past the guard; registering it on every call
        would stack up one more recurring event each time.
        """
        if not self.strategy_executed_today:
            return  # Nothing was traded by the latest run

        for symbol in self.traded_symbols_today:
            holding = self.Portfolio[symbol]
            if not holding.Invested:
                continue

            # Cancel only this symbol's resting bracket orders; an argument-less
            # CancelOpenOrders() would also cancel every other symbol's orders.
            self.Transactions.CancelOpenOrders(symbol)

            # A single closing order. A follow-up Liquidate() could send a
            # second closing order if this one has not filled yet.
            self.MarketOrder(symbol, -holding.Quantity, tag="EOD_Liquidation")

        # Register the recurring 16:50 ET sweep exactly once.
        if not getattr(self, "_remaining_check_scheduled", False):
            self.Schedule.On(self.DateRules.EveryDay(),
                             self.TimeRules.At(16, 50, TimeZones.NEW_YORK),
                             self.CheckAndLiquidateRemainingPositions)
            self._remaining_check_scheduled = True

        # Reset the per-run state
        self.strategy_executed_today = False
        self.traded_symbols_today = set()
    

    def CheckAndLiquidateRemainingPositions(self):
        for symbol in self.Portfolio.Keys:
            if self.Portfolio[symbol].Invested:
                self.Debug(f"Liquidating remaining position for {symbol} at market.")
                self.Liquidate(symbol)


    def ExecuteStrategy(self):
        """Daily entry routine (scheduled in ``Initialize`` for 20:00 New York time).

        Steps:
          1. Return during warm-up, on the fixed year-end dates (Dec 23-25,
             Dec 30-31, Jan 1), on Fridays and on Saturdays.
          2. Reset the per-run counters and flags.
          3. Return when the latest daily VIX close is above the 75th
             percentile of the last 1000 daily VIX closes.
          4. Collect long/short candidates (see ``_collect_signals``).
          5. With more than two candidates keep only the least-correlated pair
             (see ``_least_correlated_pair``); otherwise trade all candidates.
             ``max_loss`` passed to ``Trade`` is 0.02 for a single candidate
             and 0.015 otherwise.
          6. Register the recurring 16:45 New York ``LiquidatePositions`` event
             the first time a run has placed a trade. It is registered once
             only: the event already repeats every day, so adding it again on
             each trading day would accumulate duplicate events.
        """
        if self.IsWarmingUp:
            return

        # Fixed (month, day) dates on which no new trades are opened.
        restricted_month_days = {(12, 23), (12, 24), (12, 25), (12, 30), (12, 31), (1, 1)}
        today = self.Time.date()
        if (today.month, today.day) in restricted_month_days:
            self.Debug(f"Skipping trades on restricted date: {self.Time.date()}")
            return

        weekday = self.Time.weekday()
        if weekday == 4:
            self.Debug("Skipping trades on Friday.")
            return
        if weekday == 5:
            self.Debug("Skipping trades on Saturday.")
            return

        transaction_date = self.Time
        self.Debug("For " + str(transaction_date) + ":")

        # Reset the per-run state
        self.trades_today = 0
        self.strategy_executed_today = False
        self.traded_symbols_today = set()

        # One history request supplies both the latest VIX close and the
        # sample for the 75th-percentile threshold (last 1000 daily bars).
        vix_history = self.History(self.vix, 1000, Resolution.Daily)
        if not vix_history.empty:
            vix_close = vix_history['close']
            latest_vix = vix_close.values[-1]
            vix_75th_percentile = vix_close.quantile(0.75)
        else:
            latest_vix = 0
            vix_75th_percentile = 30  # Default value if history is not available

        # Skip trades if the latest VIX close is above the 75th percentile
        if latest_vix > vix_75th_percentile:
            self.Debug(f"Skipping trades due to high rolling VIX: {latest_vix:.2f} (75th percentile: {vix_75th_percentile:.2f})")
            return

        long_signals, short_signals = self._collect_signals()
        candidates = long_signals + short_signals
        total_signals = len(candidates)

        if total_signals > 0:
            self.Debug(f"Detected Long Signals: {', '.join(str(symbol.Value) for symbol in long_signals)}")
            self.Debug(f"Detected Short Signals: {', '.join(str(symbol.Value) for symbol in short_signals)}")
        else:
            self.Debug(f"No trade signal is detected for the day {transaction_date:%Y-%m-%d}.")

        # More than two candidates: trade only the two least correlated ones
        if total_signals > 2:
            selected_symbols = self._least_correlated_pair(candidates)
            if selected_symbols is None:
                return
        else:
            selected_symbols = candidates

        # A single trade may risk more than each of two simultaneous trades
        max_loss = 0.02 if total_signals == 1 else 0.015

        for symbol in selected_symbols:
            if symbol in long_signals:
                self.Trade(symbol, True, max_loss)
            elif symbol in short_signals:
                self.Trade(symbol, False, max_loss)

        # Register the recurring 16:45 ET liquidation exactly once.
        if self.strategy_executed_today and not getattr(self, "_liquidation_scheduled", False):
            self.Schedule.On(self.DateRules.EveryDay(),
                             self.TimeRules.At(16, 45, TimeZones.NEW_YORK),
                             self.LiquidatePositions)
            self._liquidation_scheduled = True

    def _collect_signals(self):
        """Return ``(long_signals, short_signals)`` for the pairs passing every entry filter.

        For each symbol the ADX reading is computed and pushed into its rolling
        window. The symbol is skipped when the reading is below 20 or above
        ``max(75th percentile of the window, 40)`` (40 while the window is not
        full). A long candidate needs %K and %D below 20 and the price below
        the lower Bollinger band; a short candidate needs %K and %D above 80
        and the price above the upper band. A candidate repeating the last
        traded direction is kept only when the previous daily return was
        positive (long) or negative (short).
        """
        long_signals = []
        short_signals = []

        for symbol in self.symbols:
            stoch, bb = self.indicators[symbol]
            current_k = stoch.StochK.Current.Value
            current_d = stoch.StochD.Current.Value
            upper_band = bb.UpperBand.Current.Value
            lower_band = bb.LowerBand.Current.Value

            # The window is updated on every run, even if the symbol is skipped
            adx_value = self.CalculateADX(symbol)
            window = self.adx_window[symbol]
            window.Add(adx_value)

            if window.IsReady:
                adx_q3 = max(np.percentile(list(window), 75), 40)
            else:
                adx_q3 = 40  # Default value until the window is full

            if adx_value > adx_q3 or adx_value < 20:
                self.Debug(f"Skipping trade for {symbol} due to ADX {adx_value:.4f} (Q1: {20:.4f}, Q3: {adx_q3:.4f}).")
                continue

            price = self.Securities[symbol].Price
            if current_k < 20 and current_d < 20 and price < lower_band:
                direction = 1
            elif current_k > 80 and current_d > 80 and price > upper_band:
                direction = -1
            else:
                continue  # No entry signal: no need to request daily history

            history = self.History(symbol, 2, Resolution.Daily)
            if history.empty or len(history) < 2:
                prev_day_return = 0
            else:
                previous_close = history['close'].iloc[-2]
                current_close = history['close'].iloc[-1]
                prev_day_return = (current_close - previous_close) / previous_close

            repeats_last_direction = self.last_trade_direction.get(symbol) == direction

            if direction == 1:
                if not repeats_last_direction or prev_day_return > 0:
                    long_signals.append(symbol)
                elif prev_day_return < 0:
                    self.Debug(f"Long signal detected for {symbol} but skipping long trade since a long trade has been entered previously and previous day's return is negative: {prev_day_return * 100:.2f}%")
            else:
                if not repeats_last_direction or prev_day_return < 0:
                    short_signals.append(symbol)
                elif prev_day_return > 0:
                    self.Debug(f"Short signal detected for {symbol} but skipping short trade since a short trade has been entered previously and previous day's return is positive: {prev_day_return * 100:.2f}%")

        return long_signals, short_signals

    def _least_correlated_pair(self, candidates):
        """Return the two candidates whose 30-bar daily returns are least correlated.

        Returns a 2-tuple of column labels taken from the correlation matrix,
        or ``None`` when no history is returned or no pair with a comparable
        correlation exists.
        """
        history = self.History(candidates, 30, Resolution.Daily)
        if history.empty:
            self.Debug("No historical data returned.")
            return None

        close_prices = history['close']
        price_data = close_prices.reset_index().pivot(index='time', columns='symbol', values='close')
        daily_returns = price_data.pct_change().dropna()
        correlation_matrix = daily_returns.corr()

        # Log the correlation matrix for debugging
        self.Debug(f"Correlation Matrix:\n{correlation_matrix}")

        columns = correlation_matrix.columns
        lowest_abs_corr = float('inf')
        selected_symbols = None

        # Scan the upper triangle; NaN correlations never compare as smaller
        for i in range(len(columns)):
            for j in range(i + 1, len(columns)):
                abs_corr = abs(correlation_matrix.iloc[i, j])
                if abs_corr < lowest_abs_corr:
                    lowest_abs_corr = abs_corr
                    selected_symbols = (columns[i], columns[j])

        if selected_symbols is not None:
            self.Debug(f"Selected Symbols: {selected_symbols[0]} and {selected_symbols[1]} with lowest absolute correlation {lowest_abs_corr:.4f}")

        return selected_symbols


    def Trade(self, symbol, long_signal, max_loss):
        """Open a bracketed position in ``symbol``.

        Args:
            symbol: Symbol to trade.
            long_signal: True to go long, False to go short.
            max_loss: Numerator of the position-size formula
                (``max_loss / stop distance``, doubled when the last two daily
                bars move in the trade's direction).

        The stop distance is the standard deviation of the minute closes
        returned for the last ``22*24*60`` minute bars; the take-profit
        distance is 2.5 times that. Any existing holding is liquidated first,
        then a market entry, a stop-market order and a limit order are placed,
        and their ids are stored in ``self.order_ids[symbol]``.

        Returns without trading when the daily or minute history is missing,
        when the volatility estimate is not a positive finite number, or when
        the computed order quantity is zero.
        """
        # NOTE(review): `<= 2` admits a third trade per run, although
        # ExecuteStrategy never passes more than two symbols - confirm the cap.
        if self.trades_today > 2:
            return

        # Cheap daily request first; the large minute request is only made
        # when the trade can actually proceed.
        history_2 = self.History(symbol, 2, Resolution.Daily)
        if history_2.empty or len(history_2) < 2:
            return

        history_22 = self.History(symbol, 22*24*60, Resolution.Minute)
        if history_22.empty:
            self.Debug(f"Skipping trade for {symbol}: no minute history available.")
            return

        # Standard deviation of the close *prices* (not of returns)
        volatility_22 = history_22['close'].dropna().std()
        if not np.isfinite(volatility_22) or volatility_22 <= 0:
            # A NaN or zero stop distance would yield a NaN or infinite size
            self.Debug(f"Skipping trade for {symbol}: unusable volatility estimate {volatility_22}.")
            return

        previous_low = history_2['low'].iloc[-2]
        current_low = history_2['low'].iloc[-1]
        previous_high = history_2['high'].iloc[-2]
        current_high = history_2['high'].iloc[-1]

        # Stop-loss and take-profit distances, in price units
        stop_loss_size = 1 * volatility_22
        take_profit_size = 2.5 * volatility_22

        # Double the size when the latest daily bar extends in the trade direction
        if long_signal:
            use_multiplier = previous_high < current_high or previous_low < current_low
        else:
            use_multiplier = previous_low > current_low or previous_high > current_high

        # NOTE(review): this divides max_loss by a price distance, so the
        # resulting portfolio weight depends on the pair's price level
        # - confirm whether the stop distance should first be divided by price.
        if use_multiplier:
            position_size = 2 * max_loss / stop_loss_size
        else:
            position_size = max_loss / stop_loss_size

        # Flatten first: CalculateOrderQuantity returns the order needed to
        # reach the target from the *current* holding, so it must see the
        # holding as it is after liquidation.
        self.Liquidate(symbol)
        quantity = self.CalculateOrderQuantity(symbol, position_size)
        if quantity == 0:
            return

        if use_multiplier:
            self.multiplier += 1
        else:
            self.no_multiplier += 1

        direction = 1 if long_signal else -1
        signed_quantity = direction * quantity

        self.MarketOrder(symbol, signed_quantity, tag="EnterPosition")
        self.last_trade_direction[symbol] = direction
        self.LogTransaction(symbol, signed_quantity)  # Log the transaction

        entry_price = self.Securities[symbol].Price

        # Protective stop on the losing side of the entry price
        stop_loss_price = entry_price - direction * stop_loss_size
        stop_loss_order = self.StopMarketOrder(symbol, -signed_quantity, stop_loss_price, tag="SetStopLoss")
        self.order_ids[symbol] = {'stop_loss': stop_loss_order.OrderId}

        # Profit target on the winning side; OnOrderEvent tells the two tags apart
        take_profit_price = entry_price + direction * take_profit_size
        take_profit_tag = "TakeProfitUpLimit" if long_signal else "TakeProfitDownLimit"
        take_profit_order = self.LimitOrder(symbol, -signed_quantity, take_profit_price, tag=take_profit_tag)
        self.order_ids[symbol]['take_profit'] = take_profit_order.OrderId

        self.strategy_executed_today = True
        self.traded_symbols_today.add(symbol)
        self.trades_today += 1


    def LogTransaction(self, symbol, amount, stop_loss=False, take_profit=False):
        transaction_date = self.Time
        transaction_type = "StopLoss" if stop_loss else ("TakeProfit" if take_profit else "Trade")
        self.transaction_log.append({
            "date": transaction_date,
            "pair": symbol,
            "amount": amount,
            "type": transaction_type
        })
        self.Debug(f"Logged Transaction: {transaction_date}, {symbol}, quantity: {amount:.4f}, Type: {transaction_type}")


    def OnOrderEvent(self, orderEvent):
        """React to filled bracket orders.

        Only events with status ``Filled`` are processed. Depending on the
        order tag (``"SetStopLoss"``, ``"TakeProfitUpLimit"`` or
        ``"TakeProfitDownLimit"``) the fill is reported through ``Debug``,
        recorded with ``LogTransaction`` and the opposite bracket leg stored
        in ``self.order_ids`` is cancelled. Fills with any other tag are
        ignored.
        """
        order = self.Transactions.GetOrderById(orderEvent.OrderId)

        if orderEvent.Status != OrderStatus.Filled:
            return

        symbol = order.Symbol
        fill_price = orderEvent.FillPrice
        fill_quantity = orderEvent.FillQuantity
        # A negative fill closes a long position, anything else closes a short
        direction = "Short" if not fill_quantity < 0 else "Long"
        tag = order.Tag

        if tag == "SetStopLoss":
            self.Debug(f"Stop-loss hit for {symbol} ({direction}) at price {fill_price:.4f} with quantity {abs(fill_quantity):.4f}")
            self.LogTransaction(symbol, fill_quantity, stop_loss=True)
            sibling_leg = 'take_profit'
        elif tag == "TakeProfitUpLimit":
            self.Debug(f"Upward take-profit hit for {symbol} ({direction}) at price {fill_price:.4f} with quantity {abs(fill_quantity):.4f}")
            self.LogTransaction(symbol, fill_quantity, take_profit=True)
            sibling_leg = 'stop_loss'
        elif tag == "TakeProfitDownLimit":
            self.Debug(f"Downward take-profit hit for {symbol} ({direction}) at price {fill_price:.4f} with quantity {abs(fill_quantity):.4f}")
            self.LogTransaction(symbol, fill_quantity, take_profit=True)
            sibling_leg = 'stop_loss'
        else:
            return

        # Cancel the other leg of the bracket, if one is on record
        if symbol not in self.order_ids:
            return
        bracket = self.order_ids[symbol]
        if sibling_leg in bracket:
            self.Transactions.CancelOrder(bracket[sibling_leg])


    def CalculateADX(self, symbol, period=14):
        """Return a directional-strength reading in [0, 100] for ``symbol``.

        Despite the name, this is a single DX-style value computed from simple
        (unsmoothed) averages over the requested daily bars, not Wilder's
        smoothed ADX:

            +DI = 100 * mean(max(high[t] - high[t-1], 0)) / mean(true range)
            -DI = 100 * mean(max(low[t-1] - low[t], 0)) / mean(true range)
            result = 100 * |+DI - -DI| / (+DI + -DI)

        Args:
            symbol: Symbol whose daily history is requested.
            period: Number of bar-to-bar moves to average; ``period + 1``
                daily bars are requested.

        Returns:
            The reading, or 0 when it cannot be computed (fewer than two bars,
            no directional movement, zero true range, or a non-finite result).
            Returning 0 instead of NaN matters because NaN compares false
            against both ADX thresholds in the caller and would slip through
            the filter.
        """
        history = self.History(symbol, period + 1, Resolution.Daily)
        # At least two bars are needed to form one bar-to-bar move
        if history.empty or len(history) < 2:
            return 0

        high = history['high'].values
        low = history['low'].values
        close = history['close'].values

        # Directional movements between consecutive bars
        up_move = np.maximum(high[1:] - high[:-1], 0)
        down_move = np.maximum(low[:-1] - low[1:], 0)

        # True range: the largest of the three classic candidates
        tr1 = high[1:] - low[1:]
        tr2 = abs(high[1:] - close[:-1])
        tr3 = abs(low[1:] - close[:-1])
        true_range = np.maximum(np.maximum(tr1, tr2), tr3)

        # Simple averages over the last `period` moves
        atr = np.mean(true_range[-period:])
        up_movement_avg = np.mean(up_move[-period:])
        down_movement_avg = np.mean(down_move[-period:])

        if up_movement_avg + down_movement_avg == 0 or atr == 0:
            return 0

        plus_di = 100 * (up_movement_avg / atr)
        minus_di = 100 * (down_movement_avg / atr)

        di_sum = plus_di + minus_di
        if di_sum == 0:
            return 0

        adx = 100 * (np.abs(plus_di - minus_di) / di_sum)

        # NaN/inf input data must not leak out as a NaN reading
        if not np.isfinite(adx):
            return 0

        return adx
    
    def on_margin_call_warning(self) -> None:
        self.debug(f"Warning: Close to margin call")
    
    
    def OnEndOfAlgorithm(self):
        self.Debug(f"Number of instances where multiplier is applied: {self.multiplier}")
        self.Debug(f"Number of instances where no multiplier is applied: {self.no_multiplier}")