# Overall Statistics (backtest results reported for this algorithm)
#
# Total Orders: 53
# Average Win: 3.55%
# Average Loss: -0.67%
# Compounding Annual Return: 133.048%
# Drawdown: 9.800%
# Expectancy: 2.163
# Start Equity: 1000.00
# End Equity: 1425.83
# Net Profit: 42.583%
# Sharpe Ratio: 3.023
# Sortino Ratio: 3.443
# Probabilistic Sharpe Ratio: 85.556%
# Loss Rate: 50%
# Win Rate: 50%
# Profit-Loss Ratio: 5.33
# Alpha: 0.777
# Beta: 0.423
# Annual Standard Deviation: 0.268
# Annual Variance: 0.072
# Information Ratio: 2.699
# Tracking Error: 0.271
# Treynor Ratio: 1.915
# Total Fees: $0.00
# Estimated Strategy Capacity: $56000.00
# Lowest Capacity Asset: BTCUSDT 2XR
# Portfolio Turnover: 32.75%
# Drawdown Recovery: 42
from AlgorithmImports import *
import numpy as np
import pandas as pd
from collections import deque
class BTCStrategy(QCAlgorithm):
    """Hourly BTCUSDT algorithm that trades signals computed on 4-hour bars.

    Hourly bars are collected in ``OnData`` and aggregated into 4-hour and
    daily bars. Each time a 4-hour bar completes, indicators are recomputed
    over the stored history (``process_data``), the rule set is replayed over
    that history (``strat``) and the signal on the newest 4-hour bar is
    executed against the live position.

    Signal codes produced by ``strat``: ``1`` long entry or short exit,
    ``-1`` short entry or long exit, ``2`` short-to-long reversal, ``0`` none.

    NOTE(review): ``strat`` replays from a flat position on every call and
    overwrites ``self.stop_loss``; the live position in ``self.current_trade``
    can therefore diverge from the replayed one, and a replayed "close short"
    (signal 1) is executed as a reversal to long when the live position is
    short. Confirm this is the intended behaviour.
    """

    # Number of 4-hour bars that must exist before any signal is generated.
    WARMUP_BARS = 210
    # Stop distance for ATR-based stops, in multiples of the 10-bar ATR.
    ATR_STOP_MULTIPLIER = 2
    # Fraction of portfolio value committed per position (before leverage).
    POSITION_FRACTION = 0.95

    def Initialize(self):
        """Configure the backtest window, cash, data feed and strategy state."""
        self.SetStartDate(2024, 9, 1)  # Start of your data
        self.SetEndDate(2025, 1, 31)  # End of your data
        self.SetCash(1000)  # Initial capital
        self.symbol = self.add_crypto("BTCUSDT", Resolution.HOUR).Symbol

        self.data_hourly = deque(maxlen=10000)  # Store hourly data
        self.data_4h = deque(maxlen=10000)  # Store 4-hour data
        self.data_daily = deque(maxlen=10000)  # Store daily data
        # Total hourly bars seen. Used instead of len(self.data_hourly) so the
        # 4-hour cadence keeps working once the bounded deque is full.
        self.hourly_bar_count = 0

        self.current_trade = 0  # live position: 1 long, -1 short, 0 flat
        self.stop_loss = 0
        self.last_below = 0
        self.leverage = 1

    def OnData(self, data):
        """Store the new hourly bar and trade when a 4-hour bar completes.

        The strategy is evaluated only on the hourly bar that completes a
        4-hour bar. Signals and the stop check both read the latest 4-hour
        row, so re-evaluating on the hours in between would act on the same
        signal again (for example turning a "close long" into a short entry
        one hour later).
        """
        if self.symbol not in data or data[self.symbol] is None:
            return

        bar = data[self.symbol]
        self.data_hourly.append({
            'datetime': bar.Time,
            'open': bar.Open,
            'high': bar.High,
            'low': bar.Low,
            'close': bar.Close,
        })
        self.hourly_bar_count += 1

        # Derive 4-hour data by aggregating every 4 hourly bars.
        completed_4h = self.hourly_bar_count % 4 == 0
        if completed_4h:
            self.data_4h.append(self._aggregate_bars(self._latest_hourly(4)))

        # Derive daily data from the last (up to) 24 hourly bars.
        # Assumes bar.Time is the bar's start time, so 23:00 is the final
        # hourly bar of the day -- TODO confirm against the data feed.
        if bar.Time.hour == 23 and bar.Time.minute == 0:
            self.data_daily.append(self._aggregate_bars(self._latest_hourly(24)))

        if not completed_4h:
            return
        if len(self.data_4h) < self.WARMUP_BARS or not self.data_daily:
            return

        df_4h = pd.DataFrame(list(self.data_4h))
        df_daily = pd.DataFrame(list(self.data_daily))
        df_4h, df_daily = self.process_data(df_4h, df_daily)
        if df_4h['close'].isna().all() or df_daily['close'].isna().all():
            return

        result_data = self.strat(df_4h, df_daily)
        signal = result_data['signals'].iloc[-1]
        close = result_data['close'].iloc[-1]
        atr = result_data['atr1'].iloc[-1]

        self._act_on_signal(signal, close, atr)
        self._enforce_stop_loss(close)

    def _latest_hourly(self, count):
        """Return up to ``count`` most recent hourly bars, oldest first."""
        available = min(count, len(self.data_hourly))
        return [self.data_hourly[i] for i in range(-available, 0)]

    @staticmethod
    def _aggregate_bars(bars):
        """Collapse a non-empty, time-ordered list of bar dicts into one bar."""
        return {
            'datetime': bars[-1]['datetime'],
            'open': bars[0]['open'],
            'high': max(b['high'] for b in bars),
            'low': min(b['low'] for b in bars),
            'close': bars[-1]['close'],
        }

    def _act_on_signal(self, signal, close, atr):
        """Translate the latest signal into orders given the live position."""
        target = self.POSITION_FRACTION * self.leverage
        stop_distance = self.ATR_STOP_MULTIPLIER * atr

        if signal == 1 and self.current_trade == 0:
            self.SetHoldings(self.symbol, target)  # 95% of balance
            self.current_trade = 1
            self.stop_loss = close - stop_distance
            self.Debug(f"Long Entry at {close}")
        elif signal == -1 and self.current_trade == 1:
            self.Liquidate(self.symbol)
            self.current_trade = 0
            self.stop_loss = 0
            self.Debug(f"Long Exit at {close}")
        elif signal == -1 and self.current_trade == 0:
            self.SetHoldings(self.symbol, -target)  # Short 95% of balance
            self.current_trade = -1
            self.stop_loss = close + stop_distance
            self.Debug(f"Short Entry at {close}")
        elif signal in (1, 2) and self.current_trade == -1:
            # Both codes flip a live short into a long; only the log differs.
            self.Liquidate(self.symbol)
            self.SetHoldings(self.symbol, target)
            self.current_trade = 1
            self.stop_loss = close - stop_distance
            label = "Short to Long" if signal == 1 else "Type 2 Reverse"
            self.Debug(f"{label} at {close}")

    def _enforce_stop_loss(self, close):
        """Flatten the live position when the 4-hour close breaches the stop."""
        if self.current_trade == 1 and close < self.stop_loss:
            self.Liquidate(self.symbol)
            self.current_trade = 0
            self.stop_loss = 0
            self.Debug(f"Stop Loss Exit Long at {close}")
        elif self.current_trade == -1 and close > self.stop_loss:
            self.Liquidate(self.symbol)
            self.current_trade = 0
            self.stop_loss = 0
            self.Debug(f"Stop Loss Exit Short at {close}")

    @staticmethod
    def _rsi(close, window):
        """Return a simple-moving-average RSI of ``close``.

        A window with gains but no losses yields 100 (``gain / loss`` is
        ``inf``); a window with neither gains nor losses is undefined and is
        reported as the neutral value 50.
        """
        delta = close.diff(1)
        gain = delta.where(delta > 0, 0).rolling(window=window, min_periods=1).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=window, min_periods=1).mean()
        rsi = 100 - 100 / (1 + gain / loss)
        return rsi.fillna(50)

    @staticmethod
    def _atr(high, low, close, window):
        """Return the rolling mean of the true range over ``window`` bars."""
        prev_close = close.shift(1)
        true_range = pd.concat(
            [high - low, (high - prev_close).abs(), (low - prev_close).abs()],
            axis=1,
        ).max(axis=1)
        return true_range.rolling(window=window, min_periods=1).mean()

    def process_data(self, data, data2):
        """Add indicator columns to the 4-hour (``data``) and daily (``data2``) frames.

        Both frames are modified in place and returned. They are expected to
        carry ``open``/``high``/``low``/``close`` columns on a default integer
        index, as built in ``OnData``.
        """
        data['RSI'] = self._rsi(data['close'], 14)
        data2['DRSI'] = self._rsi(data2['close'], 14)

        close = data['close']
        data['5_EMA'] = close.ewm(span=5, adjust=False).mean()
        data['9_EMA'] = close.ewm(span=9, adjust=False).mean()
        data['60_SMA'] = close.rolling(window=60, min_periods=1).mean()
        data['5_SMA'] = close.rolling(window=5, min_periods=1).mean()

        data['atr1'] = self._atr(data['high'], data['low'], close, 10)
        data2['atr2'] = self._atr(data2['high'], data2['low'], data2['close'], 10)
        data['atr_ma'] = data['atr1'].rolling(window=10, min_periods=1).mean()
        data['vol_factor'] = (data['atr1'] / data['atr_ma']).fillna(1)

        # Range of each bar in log space, normalised by a 16-bar log ATR, is
        # the input of the Hawkes-style volatility measure.
        log_high = np.log(data['high'])
        log_low = np.log(data['low'])
        data['atr'] = self._atr(log_high, log_low, np.log(close), 16)
        data['norm_range'] = (log_high - log_low) / data['atr']
        data['v_hawk'] = self.hawkes_process(data['norm_range'], 0.1)
        data['q05'] = data['v_hawk'].rolling(112, min_periods=1).quantile(0.05)
        data['q95'] = data['v_hawk'].rolling(115, min_periods=1).quantile(0.95)
        data['q70'] = data['v_hawk'].rolling(115, min_periods=1).quantile(0.7)

        data['last_6_high'] = data['high'].rolling(window=6, min_periods=1).max()
        data['last_6_low'] = data['low'].rolling(window=6, min_periods=1).min()
        data['lookback'] = (22 * (1 + data['vol_factor'])).clip(upper=30).round().astype(int)

        hawk_ratio = data['v_hawk'] / data['q95'].fillna(1)
        data['rsi_upper'] = 90 + 5 * hawk_ratio
        data['rsi_lower'] = 60 - 5 * hawk_ratio
        data['fast_ema_period'] = (5 + 5 * data['vol_factor']).round().astype(int)
        data['slow_ema_period'] = (9 + 9 * data['vol_factor']).round().astype(int)

        # Always create the columns so callers can read them on short frames.
        data['fast_ema'] = np.nan
        data['slow_ema'] = np.nan
        fast_periods = data['fast_ema_period'].to_numpy()
        slow_periods = data['slow_ema_period'].to_numpy()
        for i in range(self.WARMUP_BARS, len(data)):
            # NOTE(review): the adaptive periods are 5/9 scaled up by a
            # non-negative factor, so min() pins the spans to 5 and 9.
            # Confirm whether max() (or no cap) was intended.
            fast_period = min(5, fast_periods[i])
            slow_period = min(9, slow_periods[i])
            data.loc[i, 'fast_ema'] = (
                close.iloc[i - fast_period:i + 1].ewm(span=fast_period).mean().iloc[-1]
            )
            data.loc[i, 'slow_ema'] = (
                close.iloc[i - slow_period:i + 1].ewm(span=slow_period).mean().iloc[-1]
            )
        return data, data2

    def hawkes_process(self, data_series, kappa):
        """Return an exponentially decayed running sum of ``data_series``.

        Each output is ``previous * exp(-kappa) + current``; the recursion
        restarts from the current value whenever the previous output is NaN.
        The first element is always NaN. The result is scaled by ``kappa``
        and shares the index of ``data_series``.
        """
        decay = np.exp(-kappa)
        values = data_series.to_numpy(dtype=float)
        output = np.full(len(values), np.nan)
        for i in range(1, len(values)):
            previous = output[i - 1]
            if np.isnan(previous):
                output[i] = values[i]
            else:
                output[i] = previous * decay + values[i]
        return pd.Series(output, index=data_series.index) * kappa

    @staticmethod
    def _scan_extremes(prices, rsi, look_back, window, highs):
        """Walk backwards from ``look_back - 1`` tracking a running extreme.

        Rows ``look_back - 1`` down to (but excluding)
        ``max(0, look_back - window)`` are visited. Every time a row sets a
        new extreme (higher when ``highs`` is true, lower otherwise) the
        previous extreme and its RSI are kept as the "second" pair.

        Returns ``(extreme, extreme_rsi, second, second_rsi)``. With no rows
        visited the initial values are returned: zeros for highs,
        ``(inf, 100)`` pairs for lows.
        """
        if highs:
            best, best_rsi = 0, 0
        else:
            best, best_rsi = float('inf'), 100
        second, second_rsi = best, best_rsi

        lower = max(0, look_back - window)
        upper = min(look_back - 1, len(prices) - 1)
        for j in range(upper, lower, -1):
            price = prices[j]
            is_new_extreme = price > best if highs else price < best
            if is_new_extreme:
                second, second_rsi = best, best_rsi
                best, best_rsi = price, rsi[j]
        return best, best_rsi, second, second_rsi

    def strat(self, data, data2):
        """Replay the trading rules over ``data`` and mark signals per row.

        ``data`` is the processed 4-hour frame and ``data2`` the processed
        daily frame. Columns ``signals`` and ``trade_type`` are (re)written,
        ``leverage`` is set on entry rows, and ``data`` is returned. Rows
        before ``WARMUP_BARS`` never receive a signal.

        Side effects: ``self.last_below`` and ``self.stop_loss`` are
        overwritten by the replay.
        """
        data['signals'] = 0
        data['trade_type'] = ''
        if len(data) == 0 or len(data2) == 0:
            return data

        # Start every replay from the same state; a value left over from a
        # previous call would point at a later row than the ones replayed
        # first.
        self.last_below = 0
        if len(data) <= self.WARMUP_BARS:
            return data

        # Offset of the first 4-hour bar whose close matches the first daily
        # close (approximate match); used to map 4-hour rows to daily rows.
        first_daily_close = data2['close'].iloc[0]
        k = next(
            (idx for idx, value in enumerate(data['close'])
             if abs(value - first_daily_close) < 0.01),
            0,
        )

        close = data['close'].to_numpy()
        open_ = data['open'].to_numpy()
        high = data['high'].to_numpy()
        low = data['low'].to_numpy()
        rsi = data['RSI'].to_numpy()
        atr1 = data['atr1'].to_numpy()
        hawk = data['v_hawk'].to_numpy()
        q70 = data['q70'].to_numpy()
        q95 = data['q95'].to_numpy()
        sma60 = data['60_SMA'].to_numpy()
        rsi_upper = data['rsi_upper'].to_numpy()
        rsi_lower = data['rsi_lower'].to_numpy()
        fast_ema = data['fast_ema'].to_numpy()
        slow_ema = data['slow_ema'].to_numpy()
        recent_high = data['high'].rolling(window=6).max().to_numpy()
        recent_low = data['low'].rolling(window=6).min().to_numpy()

        d_open = data2['open'].to_numpy()
        d_high = data2['high'].to_numpy()
        d_close = data2['close'].to_numpy()
        d_rsi = data2['DRSI'].to_numpy()
        daily_count = len(data2)

        rolling_window = 30
        rsi_lower_bound = 20
        rsi_upper_bound = 70
        rsi_daily_upper = 50
        atr_multiplier = self.ATR_STOP_MULTIPLIER

        def record(row, signal, trade_type, with_leverage=True):
            data.loc[row, 'signals'] = signal
            data.loc[row, 'trade_type'] = trade_type
            if with_leverage:
                data.loc[row, 'leverage'] = self.leverage

        position = 0  # replayed position: 1 long, -1 short, 0 flat
        for i in range(self.WARMUP_BARS, len(data)):
            # Quiet regime: remember the bar and take no action.
            if hawk[i] < q70[i]:
                self.last_below = i
                continue

            # Volatility breakout: v_hawk is above q95 now and was not two
            # bars ago. Only momentum longs / short reversals are considered.
            # NOTE(review): exits and other entries are skipped on breakout
            # bars -- confirm this precedence matches the intended strategy.
            if hawk[i] > q95[i] and hawk[i - 2] <= q95[i - 2]:
                change = close[i] - close[self.last_below]
                if position != 1 and fast_ema[i] < close[i] and change > 0:
                    if position == 0:
                        record(i, 1, 'long')
                    else:
                        record(i, 2, 'reverse_short')
                    position = 1
                    self.stop_loss = close[i] - atr_multiplier * atr1[i]
                continue

            if position == 1:
                rule_exit = (
                    close[i] < sma60[i]
                    or rsi[i] > rsi_upper[i]
                    or rsi[i] < rsi_lower[i]
                )
                if rule_exit or close[i] < self.stop_loss:
                    record(i, -1, 'close', with_leverage=False)
                    position = 0
                    self.stop_loss = 0
                elif self.stop_loss != 0:
                    # Trailing stop for longs; shorts have no trailing stop.
                    self.stop_loss = max(
                        self.stop_loss, close[i] - atr_multiplier * atr1[i]
                    )
                continue

            if position == -1:
                if high[i] > slow_ema[i] or close[i] > self.stop_loss:
                    record(i, 1, 'close', with_leverage=False)
                    position = 0
                    self.stop_loss = 0
                continue

            # Flat: look for a new entry.
            p = int((i - k) / 6)  # daily row matching this 4-hour row
            if p >= daily_count:
                p = daily_count - 1

            last_high, rsi_high, _, _ = self._scan_extremes(
                d_high, d_rsi, p, rolling_window, True)
            last_high1, rsi_high1, _, _ = self._scan_extremes(
                high, rsi, i, rolling_window, True)
            last_low, rsi_low, last_low2, rsi_low2 = self._scan_extremes(
                low, rsi, i, rolling_window, False)

            bullish_bar = open_[i] < close[i]
            entry = None
            if ((i - k) % 6 == 0 and p < daily_count
                    and d_high[p] > last_high and d_rsi[p] < rsi_high
                    and d_open[p] > d_close[p] and d_rsi[p] > rsi_daily_upper):
                # Daily bearish RSI divergence.
                entry = (-1, 'short', d_high[p])
            elif (high[i] > last_high1 and rsi[i] < rsi_high1
                    and open_[i] > close[i] and rsi[i] > rsi_upper_bound):
                # 4-hour bearish RSI divergence.
                entry = (-1, 'short', recent_high[i])
            elif (low[i] < last_low and rsi[i] > rsi_low
                    and rsi[i] < rsi_lower_bound and bullish_bar):
                # 4-hour bullish RSI divergence against the lowest low.
                entry = (1, 'long', recent_low[i])
            elif (last_low < last_low2 and low[i] < last_low2
                    and rsi[i] > rsi_low2 and rsi[i] < rsi_lower_bound
                    and bullish_bar):
                # 4-hour bullish RSI divergence against the previous low.
                entry = (1, 'long', recent_low[i])
            elif (close[i] < slow_ema[i] and close[i] < open_[i]
                    and low[i - 1] > slow_ema[i - 1]
                    and close[i - 1] < open_[i - 1]):
                # Second bearish bar closing below the slow EMA.
                entry = (-1, 'short', recent_high[i])

            if entry is not None:
                signal, trade_type, stop = entry
                record(i, signal, trade_type)
                position = signal
                self.stop_loss = stop

        return data

    def OnEndOfDay(self):
        """No end-of-day processing is performed."""
        pass