Overall Statistics
Total Orders
53
Average Win
3.55%
Average Loss
-0.67%
Compounding Annual Return
133.048%
Drawdown
9.800%
Expectancy
2.163
Start Equity
1000.00
End Equity
1425.83
Net Profit
42.583%
Sharpe Ratio
3.023
Sortino Ratio
3.443
Probabilistic Sharpe Ratio
85.556%
Loss Rate
50%
Win Rate
50%
Profit-Loss Ratio
5.33
Alpha
0.777
Beta
0.423
Annual Standard Deviation
0.268
Annual Variance
0.072
Information Ratio
2.699
Tracking Error
0.271
Treynor Ratio
1.915
Total Fees
$0.00
Estimated Strategy Capacity
$56000.00
Lowest Capacity Asset
BTCUSDT 2XR
Portfolio Turnover
32.75%
Drawdown Recovery
42
from AlgorithmImports import *
import numpy as np
import pandas as pd
from collections import deque

class BTCStrategy(QCAlgorithm):
    def Initialize(self):
        self.SetStartDate(2024, 9, 1)  # Start of your data
        self.SetEndDate(2025, 1, 31)  # End of your data
        self.SetCash(1000)  # Initial capital
        self.symbol = self.add_crypto("BTCUSDT", Resolution.HOUR).Symbol # Using native crypto data
        self.data_hourly = deque(maxlen=10000)  # Store hourly data
        self.data_4h = deque(maxlen=10000)  # Store 4-hour data
        self.data_daily = deque(maxlen=10000)  # Store daily data
        self.current_trade = 0
        self.stop_loss = 0
        self.last_below = 0
        self.leverage = 1
    

    def OnData(self, data):
        # Collect hourly data
        if self.symbol in data and data[self.symbol] is not None:
            bar = data[self.symbol]
            self.data_hourly.append({
                'datetime': bar.Time,
                'open': bar.Open,
                'high': bar.High,
                'low': bar.Low,
                'close': bar.Close
            })

            # Derive 4-hour data by aggregating every 4 hours
            if len(self.data_hourly) % 4 == 0 and len(self.data_hourly) >= 4:
                hour_data = list(self.data_hourly)[-4:]
                df_hour = pd.DataFrame(hour_data)
                df_4h = pd.DataFrame({
                    'datetime': [hour_data[-1]['datetime']],
                    'open': [hour_data[0]['open']],
                    'high': [df_hour['high'].max()],
                    'low': [df_hour['low'].min()],
                    'close': [hour_data[-1]['close']]
                })
                self.data_4h.append(df_4h.iloc[0].to_dict())

            # Derive daily data by aggregating at end of day
            if bar.Time.hour == 23 and bar.Time.minute == 0:  # End of day approximation
                day_data = list(self.data_hourly)[-24:] if len(self.data_hourly) >= 24 else list(self.data_hourly)
                df_day = pd.DataFrame(day_data)
                df_daily = pd.DataFrame({
                    'datetime': [day_data[-1]['datetime']],
                    'open': [day_data[0]['open']],
                    'high': [df_day['high'].max()],
                    'low': [df_day['low'].min()],
                    'close': [day_data[-1]['close']]
                })
                self.data_daily.append(df_daily.iloc[0].to_dict())

            # Skip until enough 4-hour data (210 hours = 52.5 4-hour periods)
            # if len(self.data_hourly) < 210:
            #     return

        # Convert to DataFrame for processing
        df_4h = pd.DataFrame(list(self.data_4h))
        df_daily = pd.DataFrame(list(self.data_daily))
        if df_4h.empty or df_daily.empty or len(df_4h) < 1 or len(df_daily) < 1:
            return

        # Process data
        df_4h, df_daily = self.process_data(df_4h, df_daily)
        if len(df_4h) < 210 or df_4h['close'].isna().all() or df_daily['close'].isna().all():
            return
        result_data = self.strat(df_4h, df_daily)
        signal = result_data['signals'].iloc[-1]

        # Execute trades based on signals
        if signal == 1 and self.current_trade == 0:
            self.SetHoldings(self.symbol, 0.95 * self.leverage)  # 95% of balance
            self.current_trade = 1
            self.stop_loss = result_data['close'].iloc[-1] - 2 * result_data['atr1'].iloc[-1]
            self.Debug(f"Long Entry at {result_data['close'].iloc[-1]}")

        elif signal == -1 and self.current_trade == 1:
            self.Liquidate(self.symbol)
            self.current_trade = 0
            self.stop_loss = 0
            self.Debug(f"Long Exit at {result_data['close'].iloc[-1]}")

        elif signal == -1 and self.current_trade == 0:
            self.SetHoldings(self.symbol, -0.95 * self.leverage)  # Short 95% of balance
            self.current_trade = -1
            self.stop_loss = result_data['close'].iloc[-1] + 2 * result_data['atr1'].iloc[-1]
            self.Debug(f"Short Entry at {result_data['close'].iloc[-1]}")

        elif signal == 1 and self.current_trade == -1:
            self.Liquidate(self.symbol)
            self.SetHoldings(self.symbol, 0.95 * self.leverage)
            self.current_trade = 1
            self.stop_loss = result_data['close'].iloc[-1] - 2 * result_data['atr1'].iloc[-1]
            self.Debug(f"Short to Long at {result_data['close'].iloc[-1]}")

        elif signal == 2 and self.current_trade == -1:
            self.Liquidate(self.symbol)
            self.SetHoldings(self.symbol, 0.95 * self.leverage)
            self.current_trade = 1
            self.stop_loss = result_data['close'].iloc[-1] - 2 * result_data['atr1'].iloc[-1]
            self.Debug(f"Type 2 Reverse at {result_data['close'].iloc[-1]}")

        # Stop loss check
        if self.current_trade == 1 and result_data['close'].iloc[-1] < self.stop_loss:
            self.Liquidate(self.symbol)
            self.current_trade = 0
            self.stop_loss = 0
            self.Debug(f"Stop Loss Exit Long at {result_data['close'].iloc[-1]}")

        elif self.current_trade == -1 and result_data['close'].iloc[-1] > self.stop_loss:
            self.Liquidate(self.symbol)
            self.current_trade = 0
            self.stop_loss = 0
            self.Debug(f"Stop Loss Exit Short at {result_data['close'].iloc[-1]}")

        # Trailing stop
        # if self.current_trade == 1 and self.stop_loss != 0:
        #     self.stop_loss = max(self.stop_loss, result_data['close'].iloc[-1] - 2 * result_data['atr1'].iloc[-1])
        # elif self.current_trade == -1 and self.stop_loss != 0:
        #     self.stop_loss = min(self.stop_loss, result_data['close'].iloc[-1] + 2 * result_data['atr1'].iloc[-1])

    def process_data(self, data, data2):
        # RSI Calculation
        def RSI(data, window):
            delta = data['close'].diff(1)
            gain = (delta.where(delta > 0, 0)).rolling(window=window, min_periods=1).mean()
            loss = (-delta.where(delta < 0, 0)).rolling(window=window, min_periods=1).mean()
            rs = gain / loss
            return 100 - (100 / (1 + rs.replace([np.inf, -np.inf], np.nan).fillna(0)))

        data['RSI'] = RSI(data, window=14)
        data2['DRSI'] = RSI(data2, window=14)
        data['5_EMA'] = data['close'].ewm(span=5, adjust=False).mean()
        data['9_EMA'] = data['close'].ewm(span=9, adjust=False).mean()
        data['60_SMA'] = data['close'].rolling(window=60, min_periods=1).mean()
        data['5_SMA'] = data['close'].rolling(window=5, min_periods=1).mean()

        # Manual ATR Calculation (replacing pandas_ta.atr)
        def calculate_atr(high, low, close, window):
            tr = pd.concat([
                high - low,
                abs(high - close.shift(1)),
                abs(low - close.shift(1))
            ], axis=1).max(axis=1)
            return tr.rolling(window=window, min_periods=1).mean()

        data['atr1'] = calculate_atr(data['high'], data['low'], data['close'], 10)
        data2['atr2'] = calculate_atr(data2['high'], data2['low'], data2['close'], 10)
        data['atr_ma'] = data['atr1'].rolling(window=10, min_periods=1).mean()
        data['vol_factor'] = (data['atr1'] / data['atr_ma']).fillna(1)

        data['atr'] = calculate_atr(np.log(data['high']), np.log(data['low']), np.log(data['close']), 16)
        data['norm_range'] = (np.log(data['high']) - np.log(data['low'])) / data['atr']
        data['v_hawk'] = self.hawkes_process(data['norm_range'], 0.1)

        data['q05'] = data['v_hawk'].rolling(112, min_periods=1).quantile(0.05)
        data['q95'] = data['v_hawk'].rolling(115, min_periods=1).quantile(0.95)
        data['q70'] = data['v_hawk'].rolling(115, min_periods=1).quantile(0.7)
        data['last_6_high'] = data['high'].rolling(window=6, min_periods=1).max()
        data['last_6_low'] = data['low'].rolling(window=6, min_periods=1).min()

        data['lookback'] = (22 * (1 + data['vol_factor'])).clip(upper=30).round().astype(int)
        data['rsi_upper'] = 90 + 5 * (data['v_hawk'] / data['q95'].fillna(1))
        data['rsi_lower'] = 60 - 5 * (data['v_hawk'] / data['q95'].fillna(1))
        data['fast_ema_period'] = (5 + 5 * data['vol_factor']).round().astype(int)
        data['slow_ema_period'] = (9 + 9 * data['vol_factor']).round().astype(int)

        for i in range(len(data)):
            if i >= 210:
                fast_period = min(5, data['fast_ema_period'].iloc[i])
                slow_period = min(9, data['slow_ema_period'].iloc[i])
                data.loc[i, 'fast_ema'] = data['close'].iloc[i-fast_period:i+1].ewm(span=fast_period).mean().iloc[-1]
                data.loc[i, 'slow_ema'] = data['close'].iloc[i-slow_period:i+1].ewm(span=slow_period).mean().iloc[-1]

        return data, data2

    def hawkes_process(self, data_series, kappa):
        alpha = np.exp(-kappa)
        output = np.zeros(len(data_series))
        output[:] = np.nan
        for i in range(1, len(data_series)):
            if np.isnan(output[i - 1]):
                output[i] = data_series.iloc[i]
            else:
                output[i] = output[i - 1] * alpha + data_series.iloc[i]
        return pd.Series(output, index=data_series.index) * kappa

    def strat(self, data, data2):
        data['signals'] = 0
        data['trade_type'] = ''
        if len(data) == 0 or len(data2) == 0:
            return data
        k = next((i for i, x in enumerate(data['close']) if abs(x - data2['close'].iloc[0]) < 0.01), 0)  # Approximate match

        def RSI_Divergence(look_back, rolling_window):
            start_idx = max(0, look_back - rolling_window)
            highest_high = 0
            rsi_high = 0
            highest_high_2 = 0
            rsi_high_2 = 0
            for j in range(look_back-1, start_idx, -1):
                if j < len(data2):
                    if data2.iloc[j]['high'] > highest_high:
                        highest_high_2 = highest_high
                        highest_high = data2.iloc[j]['high']
                        rsi_high_2 = rsi_high
                        rsi_high = data2.iloc[j]['DRSI']
            return highest_high, rsi_high, highest_high_2, rsi_high_2

        def RSI_Divergence1(look_back, rolling_window):
            start_idx = max(0, look_back - rolling_window)
            highest_low = float('inf')
            rsi_low = 100
            highest_low2 = float('inf')
            rsi_low2 = 100
            for j in range(look_back-1, start_idx, -1):
                if j < len(data):
                    if data.iloc[j]['low'] < highest_low:
                        highest_low2 = highest_low
                        highest_low = data.iloc[j]['low']
                        rsi_low2 = rsi_low
                        rsi_low = data.iloc[j]['RSI']
            return highest_low, rsi_low, highest_low2, rsi_low2

        def RSI_Divergence2(look_back, rolling_window):
            start_idx = max(0, look_back - rolling_window)
            highest_high1 = 0
            rsi_high1 = 0
            highest_high1_2 = 0
            rsi_high1_2 = 0
            for j in range(look_back-1, start_idx, -1):
                if j < len(data):
                    if data.iloc[j]['high'] > highest_high1:
                        highest_high1_2 = highest_high1
                        highest_high1 = data.iloc[j]['high']
                        rsi_high1_2 = rsi_high1
                        rsi_high1 = data.iloc[j]['RSI']
            return highest_high1, rsi_high1, highest_high1_2, rsi_high1_2

        curr_sig = 0
        for i in range(len(data)):
            if i < 210:
                continue

            p = int((i - k) / 6)
            if p >= len(data2):
                p = len(data2) - 1 if len(data2) > 0 else 0
            rolling_window = 30 
            last_high, rsi_high, last_high_2, rsi_high_2 = RSI_Divergence(p, rolling_window)
            last_high1, rsi_high1, last_high1_2, rsi_high1_2 = RSI_Divergence2(i, rolling_window)
            last_low, rsi_low, last_low2, rsi_low2 = RSI_Divergence1(i, rolling_window)

            atr_multiplier = 2 
            rsi_lower_bound = 20 
            rsi_upper_bound = 70 
            rsi_daily_upper = 50 
            last_high_low_lookback = int(6)
            er_threshold = 0.1 
            er_threshold1 = 0.05 

            if data['v_hawk'].iloc[i] < data['q70'].iloc[i]:
                self.last_below = i
                continue

            if ((data['v_hawk'].iloc[i] > data['q95'].iloc[i]) and (data['v_hawk'].iloc[i - 2] <= data['q95'].iloc[i - 2])):
                change = data['close'].iloc[i] - data['close'].iloc[self.last_below]
                if curr_sig == 0 and data['fast_ema'].iloc[i] < data['close'].iloc[i] and change > 0:
                    data.loc[i, 'signals'] = 1
                    curr_sig = 1
                    data.loc[i, 'trade_type'] = 'long'
                    self.stop_loss = data['close'].iloc[i] - atr_multiplier * data['atr1'].iloc[i]
                    data.loc[i, 'leverage'] = self.leverage

                elif curr_sig == -1 and data['fast_ema'].iloc[i] < data['close'].iloc[i] and change > 0:
                    data.loc[i, 'signals'] = 2
                    curr_sig = 1
                    data.loc[i, 'trade_type'] = 'reverse_short'
                    self.stop_loss = data['close'].iloc[i] - atr_multiplier * data['atr1'].iloc[i]
                    data.loc[i, 'leverage'] = self.leverage

                elif curr_sig == 1 and (data['close'].iloc[i] < data['60_SMA'].iloc[i] or data['RSI'].iloc[i] > data['rsi_upper'].iloc[i] or data['RSI'].iloc[i] < data['rsi_lower'].iloc[i]):
                    data.loc[i, 'signals'] = -1
                    curr_sig = 0
                    data.loc[i, 'trade_type'] = 'close'
                    self.stop_loss = 0

            elif curr_sig == 0 and (i - k) % 6 == 0 and p < len(data2) and data2.iloc[p]['high'] > last_high and data2.iloc[p]['DRSI'] < rsi_high and data2.iloc[p]['open'] > data2.iloc[p]['close'] and data2.iloc[p]['DRSI'] > rsi_daily_upper:
                data.loc[i, 'signals'] = -1
                curr_sig = -1
                data.loc[i, 'trade_type'] = 'short'
                self.stop_loss = data2.iloc[p]['high']
                data.loc[i, 'leverage'] = self.leverage

            elif curr_sig == 0 and data.iloc[i]['high'] > last_high1 and data.iloc[i]['RSI'] < rsi_high1 and data.iloc[i]['open'] > data.iloc[i]['close'] and data.iloc[i]['RSI'] > rsi_upper_bound:
                data.loc[i, 'signals'] = -1
                curr_sig = -1
                data.loc[i, 'trade_type'] = 'short'
                self.stop_loss = data['high'].rolling(window=last_high_low_lookback).max().iloc[i]
                data.loc[i, 'leverage'] = self.leverage

            elif curr_sig == 0 and data.iloc[i]['low'] < last_low and data.iloc[i]['RSI'] > rsi_low and data.iloc[i]['RSI'] < rsi_lower_bound and data.iloc[i]['open'] < data.iloc[i]['close']:
                data.loc[i, 'signals'] = 1
                curr_sig = 1
                data.loc[i, 'trade_type'] = 'long'
                self.stop_loss = data['low'].rolling(window=last_high_low_lookback).min().iloc[i]
                data.loc[i, 'leverage'] = self.leverage

            elif curr_sig == 0 and last_low < last_low2 and data.iloc[i]['low'] < last_low2 and data.iloc[i]['RSI'] > rsi_low2 and data.iloc[i]['RSI'] < rsi_lower_bound and data.iloc[i]['open'] < data.iloc[i]['close']:
                data.loc[i, 'signals'] = 1
                curr_sig = 1
                data.loc[i, 'trade_type'] = 'long'
                self.stop_loss = data['low'].rolling(window=last_high_low_lookback).min().iloc[i]
                data.loc[i, 'leverage'] = self.leverage

            elif curr_sig == 0 and data['close'].iloc[i] < data['slow_ema'].iloc[i] and data['close'].iloc[i] < data['open'].iloc[i] and data['low'].iloc[i-1] > data['slow_ema'].iloc[i-1] and data['close'].iloc[i-1] < data['open'].iloc[i-1]:
                data.loc[i, 'signals'] = -1
                curr_sig = -1
                data.loc[i, 'trade_type'] = 'short'
                self.stop_loss = data['high'].rolling(window=last_high_low_lookback).max().iloc[i]
                data.loc[i, 'leverage'] = self.leverage

            elif curr_sig == -1 and data['high'].iloc[i] > data['slow_ema'].iloc[i]:
                data.loc[i, 'signals'] = 1
                curr_sig = 0
                data.loc[i, 'trade_type'] = 'close'
                self.stop_loss = 0

            elif curr_sig == -1 and data['close'].iloc[i] > self.stop_loss:
                data.loc[i, 'signals'] = 1
                curr_sig = 0
                data.loc[i, 'trade_type'] = 'close'
                self.stop_loss = 0

            elif curr_sig == 1 and data['close'].iloc[i] < self.stop_loss:
                data.loc[i, 'signals'] = -1
                curr_sig = 0
                data.loc[i, 'trade_type'] = 'close'
                self.stop_loss = 0

            elif curr_sig == 1 and self.stop_loss != 0:
                self.stop_loss = max(self.stop_loss, data['close'].iloc[i] - atr_multiplier * data['atr1'].iloc[i])
            # elif curr_sig == -1 and self.stop_loss != 0:
            #     self.stop_loss = min(self.stop_loss, data['close'].iloc[i] + atr_multiplier * data['atr1'].iloc[i])
            else:
                data.loc[i, 'signals'] = 0

        return data

    def OnEndOfDay(self):
        pass