| Overall Statistics |
|
Total Orders 906 Average Win 1.11% Average Loss -0.94% Compounding Annual Return 23.400% Drawdown 22.600% Expectancy 0.109 Start Equity 1000000 End Equity 1513747.5 Net Profit 51.375% Sharpe Ratio 0.633 Sortino Ratio 0.483 Probabilistic Sharpe Ratio 38.688% Loss Rate 49% Win Rate 51% Profit-Loss Ratio 1.18 Alpha 0.031 Beta 0.733 Annual Standard Deviation 0.199 Annual Variance 0.04 Information Ratio -0.019 Tracking Error 0.186 Treynor Ratio 0.172 Total Fees $26982.50 Estimated Strategy Capacity $120000000.00 Lowest Capacity Asset NQ YOGVNNAOI1OH Portfolio Turnover 403.23% |
# region imports
from AlgorithmImports import *
from datetime import timedelta
import numpy as np
from sklearn.linear_model import LinearRegression
from sklearn.feature_selection import f_regression
# endregion
class VolumeProfileAlgorithm(QCAlgorithm):
def initialize(self):
self.set_start_date(2023, 1, 1)
self.set_end_date(2024, 12, 21)
self.set_cash(1000000)
# Set the symbol of the asset we want to trade
future = self.add_future(
Futures.Indices.NASDAQ_100_E_MINI, Resolution.MINUTE
)
future.set_filter(timedelta(0), timedelta(182))
self.future_symbol = future.symbol
self.futures_contract = None
self.contract_count = 0
# Volume Profile indicator settings
self.profile_period = 120 # 2 hours
self.value_area_percentage = 0.4
self.volume_profile = VolumeProfile(
"Volume Profile", self.profile_period, self.value_area_percentage
)
# Rolling window to store past prices
self.past_prices_period = 20
self.past_prices = RollingWindow[TradeBar](self.past_prices_period)
# Consolidate data
self.consolidate(
self.future_symbol, timedelta(minutes=1), self.on_data_consolidated
)
self.register_indicator(
self.future_symbol, self.volume_profile, timedelta(hours=2)
)
# Setting stoploss
self.stop_loss_len = 100
self.stop_loss_indicator = self.min(
self.future_symbol, self.stop_loss_len, Resolution.MINUTE
)
self.stop_loss_price = 0
# Warm up using historical method
history = self.history[TradeBar](self.future_symbol, timedelta(days=1), Resolution.MINUTE)
for trade_bar in history:
self.volume_profile.update(trade_bar)
self.stop_loss_indicator.update(trade_bar.end_time, trade_bar.close)
self.past_prices.add(trade_bar)
self.log("Finished warming up indicator")
# Free portfolio setting
self.settings.free_portfolio_value = 0.3
def on_data_consolidated(self, data: Slice):
# Store the past prices of the future contract
self.past_prices.add(data)
def on_data(self, data: Slice):
# Check if the strategy warm up period is over and indicators are ready
if self.is_warming_up or not self.volume_profile.is_ready or not self.past_prices.is_ready or not self.stop_loss_indicator.is_ready:
# self.log(
# f"Warming up: {self.is_warming_up}, Volume Profile Ready: {self.volume_profile.is_ready}, Past Prices Ready: {self.past_prices.is_ready}")
return
current_price = self.past_prices[0].close
# Verify entry criteria to invest
if not self.portfolio.invested:
# self.log("Not invested! Finding futures contract...")
# Find the future contract with the max open interest above 1000
# This for-loop works because we're only checking one futures security
for chain in data.future_chains:
popular_contracts = [
contract for contract in chain.value if contract.open_interest > 1000
]
if len(popular_contracts) == 0:
continue
self.futures_contract = max(
popular_contracts, key=lambda k: k.open_interest)
# Check if price is moving towards the value area based on the direction of the slope
# and the volume profile
past_prices = [x.close for x in self.past_prices if x is not None]
slope = self.compute_slope(past_prices)
# Find the attack angle should be greater than -45 degrees
angle = np.arctan(slope) * 180 / np.pi
# # Log the indicators and price
# self.log(f"Current Price: {current_price} and Angle: {angle}")
# self.log(f"Value Area High: {self.volume_profile.value_area_high}")
# self.log(f"Value Area Low: {self.volume_profile.value_area_low}")
if (self.volume_profile.value_area_low < current_price < self.volume_profile.value_area_high):
# Only enter if the attack angle is greater than -90 and less than -60
if angle > -90 and angle < -60:
self.log(
"Price is moving towards the value area! Invest!")
self.set_holdings(self.futures_contract.symbol, .2)
self.stop_loss_price = self.volume_profile.value_area_low
self.log(f"Current price: {current_price}, Angle: {angle}")
self.log(f"Value Area High: {self.volume_profile.value_area_high}")
self.log(f"Value Area Low: {self.volume_profile.value_area_low}")
# Exit or update exit stop loss price
else:
# Exit check
if current_price <= self.stop_loss_price:
self.log(f"Stop loss at {current_price}")
self.liquidate(self.futures_contract.symbol)
# Check if you should update stop loss price
elif self.past_prices[0].close > self.past_prices[1].close:
self.stop_loss_price = self.stop_loss_price + \
(self.past_prices[0].close - self.past_prices[1].close)
self.log(
f"Updating stop loss order of {self.stop_loss_price}!")
# Plotting the data
# self.plot("VolumeProfile","vp", self.volume_profile.current.value)
# self.plot("VolumeProfile","profile_high", self.volume_profile.profile_high)
# self.plot("VolumeProfile","profile_low", self.volume_profile.profile_low)
# self.plot("VolumeProfile","poc_price", self.volume_profile.poc_price)
# self.plot("VolumeProfile","poc_volume", self.volume_profile.poc_volume)
# self.plot("VolumeProfile","value_area_volume", self.volume_profile.value_area_volume)
# self.plot("VolumeProfile","value_area_high", self.volume_profile.value_area_high)
# self.plot("VolumeProfile","value_area_low", self.volume_profile.value_area_low)
# self.plot("VolumeProfile","current_price", self.past_prices[0].close)
def compute_slope(self, prices: list) -> float:
# Convert list to numpy array and reshape to 2D for sklearn
prices_array = np.array(prices).reshape(-1, 1)
# Create an array of indices representing time
times = np.array(range(len(prices))).reshape(-1, 1)
# Fit a linear regression model
model = LinearRegression().fit(times, prices_array)
# Find the p-value of the slope
p_value = f_regression(times, prices_array)[1]
# Return the slope only if the p value is less than 0.05
if p_value < 0.05:
return model.coef_[0][0]
else:
return 0# region imports
from AlgorithmImports import *
from datetime import timedelta
import numpy as np
from sklearn.linear_model import LinearRegression
# endregion
class VolumeProfileAlgorithm(QCAlgorithm):
def initialize(self):
self.set_start_date(2023, 1, 1)
self.set_end_date(2024, 8, 1)
self.set_cash(100000)
# Set the symbol of the asset we want to trade
future = self.add_future(
Futures.Indices.NASDAQ_100_E_MINI, Resolution.MINUTE
)
future.set_filter(timedelta(0), timedelta(182))
self.future_symbol = future.symbol
self.futures_contract = None
self.contract_count = 0
# Volume Profile indicator settings
self.profile_period = 120 # 2 hours
self.value_area_percentage = 0.4
self.volume_profile = VolumeProfile(
"Volume Profile", self.profile_period, self.value_area_percentage
)
# Rolling window to store past prices
self.past_prices_period = 20
self.past_prices = RollingWindow[TradeBar](self.past_prices_period)
# Long or short position
self.is_long = True
# Consolidate data
self.consolidate(
self.future_symbol, timedelta(minutes=1), self.on_data_consolidated
)
self.register_indicator(
self.future_symbol, self.volume_profile, timedelta(hours=2)
)
# Setting stoploss
self.stop_loss_len = 100
self.stop_loss_indicator = self.min(
self.future_symbol, self.stop_loss_len, Resolution.MINUTE
)
self.stop_loss_price = 0
# Warm up period
self.set_warm_up(timedelta(days=2))
# Free portfolio setting
self.settings.free_portfolio_value = 0.3
def on_data_consolidated(self, data: Slice):
# Store the past prices of the future contract
self.past_prices.add(data)
def on_data(self, data: Slice):
# Check if the strategy warm up period is over and indicators are ready
if self.is_warming_up or not self.volume_profile.is_ready or not self.past_prices.is_ready or not self.stop_loss_indicator.is_ready:
# self.log(
# f"Warming up: {self.is_warming_up}, Volume Profile Ready: {self.volume_profile.is_ready}, Past Prices Ready: {self.past_prices.is_ready}")
return
current_price = self.past_prices[0].close
# Verify entry criteria to invest
if not self.portfolio.invested:
self.log("Not invested! Finding futures contract...")
# Find the future contract with the max open interest above 1000
# This for-loop works because we're only checking one futures security
for chain in data.future_chains:
popular_contracts = [
contract for contract in chain.value if contract.open_interest > 1000
]
if len(popular_contracts) == 0:
continue
self.futures_contract = max(
popular_contracts, key=lambda k: k.open_interest)
# Check if price is moving towards the value area based on the direction of the slope
# and the volume profile
past_prices = [x.close for x in self.past_prices if x is not None]
slope = self.compute_slope(past_prices)
# Log the indicators and price
self.log(f"Current Price: {current_price} and Slope: {slope}")
self.log(f"Value Area High: {self.volume_profile.value_area_high}")
self.log(f"Value Area Low: {self.volume_profile.value_area_low}")
if (self.volume_profile.value_area_low <= current_price <= self.volume_profile.value_area_high):
# Long condition
if slope < -0.5:
self.log(
"Price is moving towards the value area! Invest!")
self.set_holdings(self.futures_contract.symbol, 1)
self.stop_loss_price = self.stop_loss_indicator.current.value
self.log(
f"Current price: {current_price}, stop order price: {self.stop_loss_price}")
else:
self.log("Price isn't in value area, keep waiting...")
# Exit or update exit stop loss price
else:
# Exit check
if current_price < self.stop_loss_price:
self.log(f"Stop loss at {current_price}")
self.liquidate(self.futures_contract.symbol)
# Check if you should update stop loss price
elif self.past_prices[0].close > self.past_prices[1].close:
self.stop_loss_price = self.stop_loss_price + \
(self.past_prices[0].close - self.past_prices[1].close)
self.log(
f"Updating stop loss order of {self.stop_loss_price}!")
# Plotting the data
# self.plot("VolumeProfile","vp", self.volume_profile.current.value)
# self.plot("VolumeProfile","profile_high", self.volume_profile.profile_high)
# self.plot("VolumeProfile","profile_low", self.volume_profile.profile_low)
# self.plot("VolumeProfile","poc_price", self.volume_profile.poc_price)
# self.plot("VolumeProfile","poc_volume", self.volume_profile.poc_volume)
# self.plot("VolumeProfile","value_area_volume", self.volume_profile.value_area_volume)
# self.plot("VolumeProfile","value_area_high", self.volume_profile.value_area_high)
# self.plot("VolumeProfile","value_area_low", self.volume_profile.value_area_low)
# self.plot("VolumeProfile","current_price", self.past_prices[0].close)
def compute_slope(self, prices: list) -> float:
# Convert list to numpy array and reshape to 2D for sklearn
prices_array = np.array(prices).reshape(-1, 1)
# Create an array of indices representing time
times = np.array(range(len(prices))).reshape(-1, 1)
# Fit a linear regression model
model = LinearRegression().fit(times, prices_array)
# Return the slope of the regression line
return model.coef_[0][0]
# region imports
from AlgorithmImports import *
from datetime import timedelta
import numpy as np
from sklearn.linear_model import LinearRegression
# endregion
class VolumeProfileAlgorithm(QCAlgorithm):
def initialize(self):
self.set_start_date(2020, 1, 1)
self.set_end_date(2024, 6, 1)
self.set_cash(100000)
# Set the symbol of the asset we want to trade
future = self.add_future(
Futures.Indices.NASDAQ_100_E_MINI,
Resolution.MINUTE,
data_normalization_mode = DataNormalizationMode.RAW
)
future.set_filter(timedelta(0), timedelta(182))
self.future_symbol = future.symbol
self.futures_contract = None
self.contract_count = 0
# Volume Profile indicator settings
self.profile_period = 234 * 5 # So 5 min would be 234 5 min period
self.value_area_percentage = 0.4
self.volume_profile = VolumeProfile(
"Volume Profile", self.profile_period, self.value_area_percentage
)
# Rolling window to store past prices
self.past_prices_period = 5
self.past_prices = RollingWindow[TradeBar](self.past_prices_period)
# Long or short position
self.is_long = True
# Consolidate data
self.consolidate(
self.future_symbol, timedelta(minutes=5), self.on_data_consolidated
)
self.register_indicator(
self.future_symbol, self.volume_profile, timedelta(hours=2)
)
# Warm up period
self.set_warm_up(timedelta(days=2))
# Free portfolio setting
self.settings.free_portfolio_value = 0.3
def on_data_consolidated(self, data: Slice):
# Store the past prices of the future contract
self.past_prices.add(data)
# Check if the strategy warm up period is over and indicators are ready
if self.is_warming_up or not self.volume_profile.is_ready or not self.past_prices.is_ready:
# self.log(
# f"Warming up: {self.is_warming_up}, Volume Profile Ready: {self.volume_profile.is_ready}, Past Prices Ready: {self.past_prices.is_ready}")
return
current_price = self.past_prices[0].close
# Verify entry criteria to invest
if not self.portfolio.invested:
self.log("Not invested! Finding futures contract...")
# Find the future contract with the max open interest above 1000
# This for-loop only iterates ONCE because
# it's only checking one futures security...I think
for chain in self.future_chains:
popular_contracts = [
contract for contract in chain.value if contract.open_interest > 1000
]
if len(popular_contracts) == 0:
continue
self.futures_contract = max(
popular_contracts, key=lambda k: k.open_interest)
# Check if price is moving towards the value area based on the direction of the slope
# and the volume profile
past_prices = [x.close for x in self.past_prices if x is not None]
slope = self.compute_slope(past_prices)
# # Log the indicators and price
# self.log(f"Current Price: {current_price} and Slope: {slope}")
# self.log(f"Value Area High: {self.volume_profile.value_area_high}")
# self.log(f"Value Area Low: {self.volume_profile.value_area_low}")
# Find the range for low volume node
top_range = self.volume_profile.profile_high - self.volume_profile.value_area_high
bottom_range = self.volume_profile.value_area_low - self.volume_profile.profile_low
heavy_top = top_range < bottom_range
# Check if price is inside value_area_high <-> volume_profile_high
if heavy_top and self.volume_profile.value_area_low >= current_price >= (self.volume_profile.profile_low + bottom_range * .5):
# If price is inside check if the price is decline rapidly
if slope < -1.5:
# Entry
self.set_holdings(self.futures_contract.symbol, -1)
# Set the stoploss
self.stop_loss = self.volume_profile.poc_price
# Set the take profit
self.take_profit = (self.volume_profile.profile_low + bottom_range * .5)
# Exit or update exit stop loss price
else:
# Exit check
if current_price >= self.stop_loss:
self.liquidate(self.futures_contract.symbol,tag = 'Stop Loss')
elif current_price <= self.take_profit:
self.liquidate(self.futures_contract.symbol,tag = 'Take Profit')
# Trailing
# elif self.past_prices[0].close > self.past_prices[1].close:
# self.stop_loss_price = self.stop_loss_price + \
# (self.past_prices[0].close - self.past_prices[1].close)
# self.log(
# f"Updating stop loss order of {self.stop_loss_price}!")
def on_data(self, data: Slice):
self.future_chains = data.future_chains
def compute_slope(self, prices: list) -> float:
# Convert list to numpy array and reshape to 2D for sklearn
prices_array = np.array(prices).reshape(-1, 1)
# Create an array of indices representing time
times = np.array(range(len(prices))).reshape(-1, 1)
# Fit a linear regression model
model = LinearRegression().fit(times, prices_array)
# Return the slope of the regression line
return model.coef_[0][0]