| Overall Statistics |
|
Total Orders 2209 Average Win 0.32% Average Loss -0.19% Compounding Annual Return 307.518% Drawdown 61.900% Expectancy 0.604 Start Equity 50000 End Equity 161648.72 Net Profit 223.297% Sharpe Ratio 3.165 Sortino Ratio 3.708 Probabilistic Sharpe Ratio 71.689% Loss Rate 41% Win Rate 59% Profit-Loss Ratio 1.70 Alpha 2.865 Beta 1.499 Annual Standard Deviation 0.93 Annual Variance 0.865 Information Ratio 3.155 Tracking Error 0.916 Treynor Ratio 1.963 Total Fees $1256.28 Estimated Strategy Capacity $13000000.00 Lowest Capacity Asset MNQ YEBKSYL2454X Portfolio Turnover 153.00% |
from AlgorithmImports import *
class DarStkBear:
    """Look for a bearish swing-high setup and hand a sell entry to ``execute_info``.

    The setup is evaluated in this order, stopping at the first failed step:

    1. the last three 5-minute bars form a swing high (middle bar has the
       highest high),
    2. some bar in the 5-minute window closed above that swing high,
    3. some bar in the 5-minute window closed below ``low_TPIH`` (the lower of
       the two lows surrounding the swing bar),
    4. one of the last fifteen 1-minute bars lies entirely inside the "AOM"
       price band derived from the swing.

    Bar sequences are treated as chronological: index ``-1`` is the most
    recent bar.
    """

    def __init__(self, algorithm, symbol, execute_info):
        """Store collaborators and reset all setup state.

        Args:
            algorithm: Object exposing ``Debug(message)`` and ``Securities``.
            symbol: Key used to look the traded security up in
                ``algorithm.Securities``.
            execute_info: Object exposing
                ``place_order(contract, direction, entry, stop, target)``.
        """
        self.algorithm = algorithm
        self.symbol = symbol
        self.execute_info = execute_info
        self.swing_high = None
        # Initialised here so the attribute exists before the first swing is stored.
        self.swing_high_time = None
        self.low_TPIH = None
        self.last_low = None
        self.aom = None
        self.volume_imbalance = None

    @staticmethod
    def _as_chronological_list(window):
        """Return ``window`` as an oldest-to-newest list, or ``None`` if unsupported.

        A plain list is assumed to be chronological already and is returned
        unchanged. A LEAN ``RollingWindow`` enumerates newest-first and does not
        support negative indices or slices, so it is copied and reversed.
        """
        if isinstance(window, list):
            return window
        if isinstance(window, RollingWindow):
            return list(window)[::-1]
        return None

    def check_for_bearish_setup(self, window_5min, window_1min):
        """Run the full bearish check and place a sell order if every step passes.

        Args:
            window_5min: list or RollingWindow of 5-minute bars (at least 3).
            window_1min: list or RollingWindow of 1-minute bars (at least 15).
        """
        self.algorithm.Debug("Executing check_for_bearish_setup")
        bars_5min = self._as_chronological_list(window_5min)
        bars_1min = self._as_chronological_list(window_1min)
        if bars_5min is None or bars_1min is None:
            self.algorithm.Debug("window_5min and window_1min should be collections of TradeBar objects.")
            return
        if len(bars_5min) < 3:
            self.algorithm.Debug(f"window_5min length: {len(bars_5min)} is less than 3")
            return
        if len(bars_1min) < 15:
            self.algorithm.Debug(f"window_1min length: {len(bars_1min)} is less than 15")
            return
        if not self.checkSwingHigh(bars_5min):
            self.algorithm.Debug("No swing high found")
            return
        if not self.checkCloseAbove(bars_5min):
            self.algorithm.Debug("No close above the swing high")
            return
        if not self.checkCloseBelow(bars_5min):
            self.algorithm.Debug("No close below the low that formed the swing high")
            return
        self.calcAOM()
        if not self.checkVI(bars_1min):
            self.algorithm.Debug("No volume imbalance found")
            return
        self.DarStkBearEntry()

    def checkSwingHigh(self, window_5min):
        """Return True and record the swing if the last three bars peak in the middle.

        On success sets ``swing_high``, ``swing_high_time`` and ``low_TPIH``.
        """
        self.algorithm.Debug("Executing checkSwingHigh")
        if len(window_5min) < 3:
            return False
        bar1 = window_5min[-3]
        bar2 = window_5min[-2]
        bar3 = window_5min[-1]
        self.algorithm.Debug(f"Bar1: {bar1}, Bar2: {bar2}, Bar3: {bar3}")
        if bar2.High > bar1.High and bar2.High > bar3.High:
            self.low_TPIH = min(bar1.Low, bar3.Low)
            self.storeSwingHigh(bar2)
            self.algorithm.Debug(f"Swing high set at {self.swing_high}, Low TPIH set at {self.low_TPIH}")
            return True
        return False

    def storeSwingHigh(self, bar):
        """Remember the high and timestamp of the swing bar."""
        self.swing_high = bar.High
        self.swing_high_time = bar.Time
        self.algorithm.Debug(f"Stored swing high at {self.swing_high} with time {self.swing_high_time}")

    def checkCloseAbove(self, window_5min):
        """Return True if any bar in the window closed above ``swing_high``."""
        self.algorithm.Debug("Executing checkCloseAbove")
        for bar in window_5min:
            if bar.Close > self.swing_high:
                self.algorithm.Debug(f"Close above swing high found at {bar.Close}")
                return True
        return False

    def checkCloseBelow(self, window_5min):
        """Return True if any bar in the window closed below ``low_TPIH``."""
        self.algorithm.Debug("Executing checkCloseBelow")
        for bar in window_5min:
            if bar.Close < self.low_TPIH:
                self.algorithm.Debug(f"Close below low TPIH found at {bar.Close}")
                return True
        return False

    def calcAOM(self):
        """Compute the ``aom`` price band as ``(low, high)``.

        The high is the swing high; the low extends 0.618 of the swing range
        below ``low_TPIH``. Does nothing until a swing has been recorded.
        """
        self.algorithm.Debug("Executing calcAOM")
        # Compare against None explicitly: a price of 0.0 is a value, not "unset".
        if self.swing_high is None or self.low_TPIH is None:
            return
        fib_range_high = self.swing_high
        fib_range_low = self.low_TPIH - (self.swing_high - self.low_TPIH) * 0.618
        self.aom = (fib_range_low, fib_range_high)
        self.algorithm.Debug(f"AOM set at {self.aom}")

    def checkVI(self, window_1min):
        """Find the first of the last 15 one-minute bars lying fully inside ``aom``.

        On success stores that bar's high in ``volume_imbalance`` and returns
        True; otherwise returns False.
        """
        self.algorithm.Debug("Executing checkVI")
        if self.aom is None:
            return False
        band_low, band_high = self.aom
        for bar in window_1min[-15:]:
            if band_low <= bar.Low <= band_high and band_low <= bar.High <= band_high:
                self.algorithm.Debug(f"Volume imbalance found at {bar.High}")
                self.volume_imbalance = bar.High
                return True
        return False

    def DarStkBearEntry(self):
        """Submit the sell entry through ``execute_info`` using the recorded levels.

        Stop loss sits half a swing range above the swing high; take profit sits
        four swing ranges below ``low_TPIH``.
        """
        self.algorithm.Debug("Executing DarStkBearEntry")
        if self.volume_imbalance is None:
            self.algorithm.Debug("Volume imbalance not set")
            return
        entry_price = self.volume_imbalance
        swing_range = self.swing_high - self.low_TPIH
        stop_loss_price = self.swing_high + swing_range * 0.5
        take_profit_price = self.low_TPIH - swing_range * 4
        current_contract = self.algorithm.Securities[self.symbol].Symbol
        mapped_contract = self.algorithm.Securities[current_contract].Mapped
        self.execute_info.place_order(mapped_contract, "sell", entry_price, stop_loss_price, take_profit_price)
        self.algorithm.Debug(f"Placed sell order at {entry_price} with stop loss at {stop_loss_price} and take profit at {take_profit_price} for {mapped_contract}")
class DarStkBull:
    """Look for a bullish swing-low setup and hand a buy entry to ``execute_info``.

    The setup is evaluated in this order, stopping at the first failed step:

    1. the last three 5-minute bars form a swing low (middle bar has the
       lowest low),
    2. some bar in the 5-minute window closed below that swing low,
    3. some bar in the 5-minute window closed above ``high_TPIL`` (the higher
       of the two highs surrounding the swing bar),
    4. one of the last fifteen 1-minute bars lies entirely inside the "AOM"
       price band derived from the swing.

    Bar sequences are treated as chronological: index ``-1`` is the most
    recent bar.
    """

    def __init__(self, algorithm, symbol, execute_info):
        """Store collaborators and reset all setup state.

        Args:
            algorithm: Object exposing ``Debug(message)`` and ``Securities``.
            symbol: Key used to look the traded security up in
                ``algorithm.Securities``.
            execute_info: Object exposing
                ``place_order(contract, direction, entry, stop, target)``.
        """
        self.algorithm = algorithm
        self.symbol = symbol
        self.execute_info = execute_info
        self.swing_low = None
        # Initialised here so the attribute exists before the first swing is stored.
        self.swing_low_time = None
        self.high_TPIL = None
        self.last_high = None
        self.aom = None
        self.volume_imbalance = None

    @staticmethod
    def _as_chronological_list(window):
        """Return ``window`` as an oldest-to-newest list, or ``None`` if unsupported.

        A plain list is assumed to be chronological already and is returned
        unchanged. A LEAN ``RollingWindow`` enumerates newest-first and does not
        support negative indices or slices, so it is copied and reversed.
        """
        if isinstance(window, list):
            return window
        if isinstance(window, RollingWindow):
            return list(window)[::-1]
        return None

    def check_for_bullish_setup(self, window_5min, window_1min):
        """Run the full bullish check and place a buy order if every step passes.

        Args:
            window_5min: list or RollingWindow of 5-minute bars (at least 3).
            window_1min: list or RollingWindow of 1-minute bars (at least 15).
        """
        self.algorithm.Debug("Executing check_for_bullish_setup")
        bars_5min = self._as_chronological_list(window_5min)
        bars_1min = self._as_chronological_list(window_1min)
        if bars_5min is None or bars_1min is None:
            self.algorithm.Debug("window_5min and window_1min should be collections of TradeBar objects.")
            return
        if len(bars_5min) < 3:
            self.algorithm.Debug(f"window_5min length: {len(bars_5min)} is less than 3")
            return
        if len(bars_1min) < 15:
            self.algorithm.Debug(f"window_1min length: {len(bars_1min)} is less than 15")
            return
        if not self.checkSwingLow(bars_5min):
            self.algorithm.Debug("No swing low found")
            return
        if not self.checkCloseBelow(bars_5min):
            self.algorithm.Debug("No close below the swing low")
            return
        if not self.checkCloseAbove(bars_5min):
            self.algorithm.Debug("No close above the high that formed the swing low")
            return
        self.calcAOM()
        if not self.checkVI(bars_1min):
            self.algorithm.Debug("No volume imbalance found")
            return
        self.DarStkBullEntry()

    def checkSwingLow(self, window_5min):
        """Return True and record the swing if the last three bars trough in the middle.

        On success sets ``swing_low``, ``swing_low_time`` and ``high_TPIL``.
        """
        self.algorithm.Debug("Executing checkSwingLow")
        if len(window_5min) < 3:
            return False
        bar1 = window_5min[-3]
        bar2 = window_5min[-2]
        bar3 = window_5min[-1]
        self.algorithm.Debug(f"Bar1: {bar1}, Bar2: {bar2}, Bar3: {bar3}")
        if bar2.Low < bar1.Low and bar2.Low < bar3.Low:
            self.high_TPIL = max(bar1.High, bar3.High)
            self.storeSwingLow(bar2)
            self.algorithm.Debug(f"Swing low set at {self.swing_low}, High TPIL set at {self.high_TPIL}")
            return True
        return False

    def storeSwingLow(self, bar):
        """Remember the low and timestamp of the swing bar."""
        self.swing_low = bar.Low
        self.swing_low_time = bar.Time
        self.algorithm.Debug(f"Stored swing low at {self.swing_low} with time {self.swing_low_time}")

    def checkCloseBelow(self, window_5min):
        """Return True if any bar in the window closed below ``swing_low``."""
        self.algorithm.Debug("Executing checkCloseBelow")
        for bar in window_5min:
            if bar.Close < self.swing_low:
                self.algorithm.Debug(f"Close below swing low found at {bar.Close}")
                return True
        return False

    def checkCloseAbove(self, window_5min):
        """Return True if any bar in the window closed above ``high_TPIL``."""
        self.algorithm.Debug("Executing checkCloseAbove")
        for bar in window_5min:
            if bar.Close > self.high_TPIL:
                self.algorithm.Debug(f"Close above high TPIL found at {bar.Close}")
                return True
        return False

    def calcAOM(self):
        """Compute the ``aom`` price band as ``(low, high)``.

        The low is the swing low; the high extends 0.618 of the swing range
        above ``high_TPIL``. Does nothing until a swing has been recorded.
        """
        self.algorithm.Debug("Executing calcAOM")
        # Compare against None explicitly: a price of 0.0 is a value, not "unset".
        if self.swing_low is None or self.high_TPIL is None:
            return
        fib_range_low = self.swing_low
        fib_range_high = self.high_TPIL + (self.high_TPIL - self.swing_low) * 0.618
        self.aom = (fib_range_low, fib_range_high)
        self.algorithm.Debug(f"AOM set at {self.aom}")

    def checkVI(self, window_1min):
        """Find the first of the last 15 one-minute bars lying fully inside ``aom``.

        On success stores that bar's low in ``volume_imbalance`` and returns
        True; otherwise returns False.
        """
        self.algorithm.Debug("Executing checkVI")
        if self.aom is None:
            return False
        band_low, band_high = self.aom
        for bar in window_1min[-15:]:
            if band_low <= bar.Low <= band_high and band_low <= bar.High <= band_high:
                self.algorithm.Debug(f"Volume imbalance found at {bar.Low}")
                self.volume_imbalance = bar.Low
                return True
        return False

    def DarStkBullEntry(self):
        """Submit the buy entry through ``execute_info`` using the recorded levels.

        Stop loss sits half a swing range below the swing low; take profit sits
        four swing ranges above ``high_TPIL``.
        """
        self.algorithm.Debug("Executing DarStkBullEntry")
        if self.volume_imbalance is None:
            self.algorithm.Debug("Volume imbalance not set")
            return
        entry_price = self.volume_imbalance
        swing_range = self.high_TPIL - self.swing_low
        stop_loss_price = self.swing_low - swing_range * 0.5
        take_profit_price = self.high_TPIL + swing_range * 4
        current_contract = self.algorithm.Securities[self.symbol].Symbol
        mapped_contract = self.algorithm.Securities[current_contract].Mapped
        self.execute_info.place_order(mapped_contract, "buy", entry_price, stop_loss_price, take_profit_price)
        self.algorithm.Debug(f"Placed buy order at {entry_price} with stop loss at {stop_loss_price} and take profit at {take_profit_price} for {mapped_contract}")
class DetermineBias:
    """Track 15-minute swing points and derive a market-structure label.

    The label is one of ``"neutral"``, ``"bullish"``, ``"bearish"``,
    ``"potential_bullish_reversal"`` or ``"potential_bearish_reversal"``.
    """

    # Upper bound on remembered swing bars per side, so the history cannot grow
    # without limit over a long run.
    _MAX_STORED_SWINGS = 100

    def __init__(self, algorithm, multi_time_frame_analysis):
        """Store collaborators and start in the neutral state.

        Args:
            algorithm: Object exposing ``Debug(message)``.
            multi_time_frame_analysis: Object exposing ``GetLatestBars()``,
                returning a mapping with ``"1hour"`` and ``"15min"`` bar
                sequences, or a falsy value when data is not ready.
        """
        self.algorithm = algorithm
        self.multi_time_frame_analysis = multi_time_frame_analysis
        self.current_structure = "neutral"
        self.h1_data = None
        self.m15_data = None
        self.swing_highs = []
        self.swing_lows = []

    def update_data(self):
        """Refresh the cached 1-hour and 15-minute bars from the data provider.

        When the provider has nothing to offer, the previously cached bars are
        left in place.
        """
        latest_bars = self.multi_time_frame_analysis.GetLatestBars()
        if latest_bars:
            self.h1_data = latest_bars.get("1hour")
            self.m15_data = latest_bars.get("15min")
            self.algorithm.Debug("Data updated in DetermineBias.")
        else:
            self.algorithm.Debug("Latest bars not available for updating data.")

    def _record_swing(self, store, bar):
        """Append ``bar`` to ``store`` only if it is newer than the last stored swing.

        This method is called on every update with an overlapping window of
        bars; without this check the same swing bar would be appended over and
        over. Returns True when the bar was stored.
        """
        if store and bar.Time <= store[-1].Time:
            return False
        store.append(bar)
        if len(store) > self._MAX_STORED_SWINGS:
            del store[:-self._MAX_STORED_SWINGS]
        return True

    def analyze_m15_swings(self):
        """Scan the 15-minute bars for three-bar swing highs and lows.

        A swing high is a bar whose high exceeds both neighbours' highs; a
        swing low is a bar whose low is below both neighbours' lows. Each swing
        bar is stored at most once.
        """
        if not self.m15_data or len(self.m15_data) < 3:
            return
        for i in range(2, len(self.m15_data)):
            bar1 = self.m15_data[i - 2]
            bar2 = self.m15_data[i - 1]
            bar3 = self.m15_data[i]
            is_swing_high = bar2.High > bar1.High and bar2.High > bar3.High
            if is_swing_high and self._record_swing(self.swing_highs, bar2):
                self.algorithm.Debug(f"Swing high added: {bar2.Time} High: {bar2.High}")
            is_swing_low = bar2.Low < bar1.Low and bar2.Low < bar3.Low
            if is_swing_low and self._record_swing(self.swing_lows, bar2):
                self.algorithm.Debug(f"Swing low added: {bar2.Time} Low: {bar2.Low}")

    def update_current_structure(self):
        """Refresh data, update the swing history and return the structure label."""
        self.update_data()
        if not self.m15_data or len(self.m15_data) < 3:
            self.algorithm.Debug("Not enough data to update current structure.")
            return self.current_structure
        self.analyze_m15_swings()
        if len(self.swing_highs) >= 3 and len(self.swing_lows) >= 3:
            current_swing_high = self.swing_highs[-1]
            current_swing_low = self.swing_lows[-1]
            if self.current_structure == "bearish" and self.m15_data[-1].Close > current_swing_high.High:
                self.current_structure = "potential_bullish_reversal"
                self.algorithm.Debug("Potential bullish reversal detected.")
            elif self.current_structure == "bullish" and self.m15_data[-1].Close < current_swing_low.Low:
                self.current_structure = "potential_bearish_reversal"
                self.algorithm.Debug("Potential bearish reversal detected.")
            # NOTE(review): the block below runs right after the reversal check
            # and reassigns the label whenever the two swing times differ, so a
            # "potential_*_reversal" label rarely survives this call — confirm
            # that is intended.
            if current_swing_high.Time > current_swing_low.Time and self.current_structure != "bullish":
                self.current_structure = "bullish"
                self.algorithm.Debug("Market structure changed to bullish.")
            elif current_swing_low.Time > current_swing_high.Time and self.current_structure != "bearish":
                self.current_structure = "bearish"
                self.algorithm.Debug("Market structure changed to bearish.")
        # Ensure the structure is updated at least once to avoid infinite neutral state
        # NOTE(review): this fallback needs at least 15 bars of 15-minute data;
        # confirm the data provider's window is large enough for it to ever run.
        if self.current_structure == "neutral" and len(self.m15_data) >= 15:
            last_swing = self.m15_data[-1]
            if last_swing.Close > self.m15_data[0].High:
                self.current_structure = "bullish"
                self.algorithm.Debug("Initial market structure set to bullish.")
            elif last_swing.Close < self.m15_data[0].Low:
                self.current_structure = "bearish"
                self.algorithm.Debug("Initial market structure set to bearish.")
        return self.current_structure

    def determine_bias(self):
        """Return the current structure label after a single data refresh."""
        # update_current_structure() already refreshes the data; refreshing here
        # as well would only query the provider twice.
        return self.update_current_structure()
class ExecuteInfo:
    """Order-placement helper holding the strategy's fixed risk parameters."""

    def __init__(self, algorithm):
        """Store the algorithm and derive per-contract and total risk figures.

        Args:
            algorithm: Object exposing ``Debug``, ``MarketOrder``,
                ``StopMarketOrder``, ``LimitOrder`` and ``Portfolio``.
        """
        self.algorithm = algorithm
        # Risk parameters
        self.position_size = 2  # Number of micro E-mini NQ contracts
        self.stop_loss_handles = 10  # Stop loss in handles
        self.take_profit_handles = 40  # Take profit in handles
        # Derived risk parameters in dollars
        self.tick_value = 0.5  # Tick value for micro E-mini NQ (1 tick = $0.50)
        self.handles_to_ticks = 4  # 1 handle = 4 ticks for NQ
        self.stop_loss_ticks = self.stop_loss_handles * self.handles_to_ticks
        self.take_profit_ticks = self.take_profit_handles * self.handles_to_ticks
        self.risk_per_contract = self.stop_loss_ticks * self.tick_value  # Risk per contract
        self.reward_per_contract = self.take_profit_ticks * self.tick_value  # Reward per contract
        # Calculate total risk and reward
        self.total_risk = self.position_size * self.risk_per_contract  # Total risk for position
        self.total_reward = self.position_size * self.reward_per_contract  # Total reward for position

    def place_order(self, contract, direction, entry_price, stop_loss_price=None, take_profit_price=None):
        """Submit a one-contract market order in the given direction.

        Only the market order is submitted. ``entry_price``, ``stop_loss_price``
        and ``take_profit_price`` are written to the debug log; no stop or
        limit orders are created here.

        Args:
            contract: Symbol of the contract to trade.
            direction: ``"buy"`` or ``"sell"`` (case-insensitive). Any other
                value is logged and ignored.
            entry_price: Intended entry price, used for logging only.
            stop_loss_price: Optional stop level, used for logging only.
            take_profit_price: Optional target level, used for logging only.
        """
        # NOTE(review): the order size is hard-coded to one contract while
        # ``position_size`` is 2 — confirm which is intended.
        side = direction.lower()
        if side == "buy":
            self.algorithm.MarketOrder(contract, 1)
            self.algorithm.Debug(f"Buy order placed for {contract} at {entry_price}")
        elif side == "sell":
            self.algorithm.MarketOrder(contract, -1)
            self.algorithm.Debug(f"Sell order placed for {contract} at {entry_price}")
        else:
            # Nothing was traded, so do not log stop/target levels as if it had been.
            self.algorithm.Debug(f"Unknown order direction '{direction}'; no order placed.")
            return
        if stop_loss_price is not None:
            self.algorithm.Debug(f"Stop loss set at {stop_loss_price}")
        if take_profit_price is not None:
            self.algorithm.Debug(f"Take profit set at {take_profit_price}")

    def set_stop_loss(self, symbol, entry_price):
        """
        Set a stop loss order that closes the current holding in ``symbol``.

        The stop sits ``stop_loss_handles`` below ``entry_price`` for a long
        holding and the same distance above it for a short holding. The order
        quantity is the opposite of the held quantity, so a short position is
        bought back rather than sold further. Nothing is placed when flat.
        """
        held = self.algorithm.Portfolio[symbol].Quantity
        if held == 0:
            self.algorithm.Debug(f"No open position in {symbol}; stop loss not placed.")
            return
        if held > 0:
            stop_loss_price = entry_price - self.stop_loss_handles
        else:
            stop_loss_price = entry_price + self.stop_loss_handles
        self.algorithm.StopMarketOrder(symbol, -held, stop_loss_price)

    def set_take_profit(self, symbol, entry_price):
        """
        Set a take profit order that closes the current holding in ``symbol``.

        The target sits ``take_profit_handles`` above ``entry_price`` for a
        long holding and the same distance below it for a short holding. The
        order quantity is the opposite of the held quantity, so a short
        position is bought back rather than sold further. Nothing is placed
        when flat.
        """
        held = self.algorithm.Portfolio[symbol].Quantity
        if held == 0:
            self.algorithm.Debug(f"No open position in {symbol}; take profit not placed.")
            return
        if held > 0:
            take_profit_price = entry_price + self.take_profit_handles
        else:
            take_profit_price = entry_price - self.take_profit_handles
        self.algorithm.LimitOrder(symbol, -held, take_profit_price)
from multi_time_frame_analysis import MultiTimeFrameAnalysis
from execute_info import ExecuteInfo
from DarStkBull import DarStkBull
from DarStkBear import DarStkBear
from determine_bias import DetermineBias
class DarStk(QCAlgorithm):
    """Intraday Micro E-mini Nasdaq-100 strategy driven by market-structure bias.

    During the configured trading hours each data slice is used to determine
    the current structure label and, depending on it, the bullish or bearish
    setup detector is run.
    """

    def Initialize(self):
        """Configure the backtest, subscribe to the future and build the helpers."""
        self.SetStartDate(2023, 1, 1)
        self.SetEndDate(2023, 11, 1)
        self.SetCash(50000)

        # Minute bars are the base resolution for everything added below.
        self.UniverseSettings.Resolution = Resolution.Minute

        # Subscribe to the future and keep contracts expiring within 0-90 days.
        self.future = self.AddFuture(Futures.Indices.MICRO_NASDAQ_100_E_MINI)
        self.future.SetFilter(0, 90)
        self.future_symbol = self.future.Symbol
        self.Debug("DarStk v1.2.1 initialized successfully")

        self.SetTimeZone(TimeZones.NewYork)
        self.SetWarmUp(timedelta(days=10))

        # Collaborators: bar provider, order helper, bias tracker, setup detectors.
        self.multi_time_frame_analysis = MultiTimeFrameAnalysis(self, self.future_symbol)
        self.execute_info = ExecuteInfo(self)
        self.determine_bias = DetermineBias(self, self.multi_time_frame_analysis)
        self.dar_stk_bull = DarStkBull(self, self.future_symbol, self.execute_info)
        self.dar_stk_bear = DarStkBear(self, self.future_symbol, self.execute_info)

        # Trading window, inclusive at both ends: 10:00 AM to 3:30 PM.
        self.trading_start_time = time(10, 0)
        self.trading_end_time = time(15, 30)

    def OnData(self, data):
        """Run the trading logic when warmed up, inside trading hours and bars are ready."""
        self.Debug(f"OnData called at {self.Time}")

        if self.IsWarmingUp:
            self.Debug("Skipping OnData because the algorithm is still warming up.")
            return

        now = self.Time.time()
        inside_session = self.trading_start_time <= now <= self.trading_end_time
        if not inside_session:
            return

        bars = self.multi_time_frame_analysis.GetLatestBars()
        if not bars:
            self.Debug("Skipping ExecuteTradingLogic because latest bars are not ready.")
            return

        self.Debug("Executing trading logic.")
        self.ExecuteTradingLogic(bars)

    def ExecuteTradingLogic(self, latest_bars):
        """Pick the setup detector matching the current structure and run it.

        Args:
            latest_bars: Mapping with at least ``"5min"`` and ``"1min"`` bar
                sequences.
        """
        market_structure = self.determine_bias.update_current_structure()
        self.Debug(f"Determined market structure: {market_structure}")

        if market_structure == "bullish":
            self.dar_stk_bull.check_for_bullish_setup(latest_bars["5min"], latest_bars["1min"])
            return
        if market_structure == "bearish":
            self.dar_stk_bear.check_for_bearish_setup(latest_bars["5min"], latest_bars["1min"])
            return
        # Every other label is handled like neutral: log and do nothing.
        self.Debug("Market structure is neutral; no trades will be executed.")

    def OnWarmUpFinished(self):
        """Log that the warm-up period has ended."""
        self.Debug("Warm-up finished. Indicators and rolling windows are ready.")


#region imports
from AlgorithmImports import *
#endregion
class MultiTimeFrameAnalysis:
    """Consolidate one symbol's data into 1h/15m/5m/1m bars kept in rolling windows."""

    def __init__(self, algorithm, symbol):
        """Create the consolidators, register them and allocate the windows.

        Args:
            algorithm: Algorithm instance exposing ``SubscriptionManager`` and
                ``Debug``.
            symbol: Symbol the consolidators are registered for.
        """
        self.algorithm = algorithm  # Store a reference to the algorithm instance
        self.symbol = symbol  # Store the symbol for which consolidators are set up
        # Adding consolidators for different time frames
        self.consolidator_1hour = TradeBarConsolidator(timedelta(hours=1))
        self.consolidator_15min = TradeBarConsolidator(timedelta(minutes=15))
        self.consolidator_5min = TradeBarConsolidator(timedelta(minutes=5))
        self.consolidator_1min = TradeBarConsolidator(timedelta(minutes=1))
        # Subscribing to the DataConsolidated events
        self.consolidator_1hour.DataConsolidated += self.OnDataConsolidated_1hour
        self.consolidator_15min.DataConsolidated += self.OnDataConsolidated_15min
        self.consolidator_5min.DataConsolidated += self.OnDataConsolidated_5min
        self.consolidator_1min.DataConsolidated += self.OnDataConsolidated_1min
        # Adding consolidators to the SubscriptionManager
        self.algorithm.SubscriptionManager.AddConsolidator(self.symbol, self.consolidator_1hour)
        self.algorithm.SubscriptionManager.AddConsolidator(self.symbol, self.consolidator_15min)
        self.algorithm.SubscriptionManager.AddConsolidator(self.symbol, self.consolidator_5min)
        self.algorithm.SubscriptionManager.AddConsolidator(self.symbol, self.consolidator_1min)
        # Initializing RollingWindows for each time frame
        self.window_1hour = RollingWindow[TradeBar](1)  # 1 bar (1 hour of 1-hour data)
        self.window_15min = RollingWindow[TradeBar](4)  # 4 bars (1 hour of 15-minute data)
        self.window_5min = RollingWindow[TradeBar](12)  # 12 bars (1 hour of 5-minute data)
        self.window_1min = RollingWindow[TradeBar](60)  # 60 bars (1 hour of 1-minute data)
        self.algorithm.Debug("MultiTimeFrameAnalysis initialized successfully")

    def OnDataConsolidated_1hour(self, sender, bar: "TradeBar"):
        """Store a finished 1-hour bar."""
        if isinstance(bar, TradeBar):
            self.window_1hour.Add(bar)

    def OnDataConsolidated_15min(self, sender, bar: "TradeBar"):
        """Store a finished 15-minute bar."""
        if isinstance(bar, TradeBar):
            self.window_15min.Add(bar)

    def OnDataConsolidated_5min(self, sender, bar: "TradeBar"):
        """Store a finished 5-minute bar."""
        if isinstance(bar, TradeBar):
            self.window_5min.Add(bar)

    def OnDataConsolidated_1min(self, sender, bar: "TradeBar"):
        """Store a finished 1-minute bar."""
        if isinstance(bar, TradeBar):
            self.window_1min.Add(bar)

    def IsReady(self):
        """
        Check if all rolling windows have sufficient data.
        """
        return self.window_1hour.IsReady and self.window_15min.IsReady and self.window_5min.IsReady and self.window_1min.IsReady

    @staticmethod
    def _chronological(window):
        """Copy a rolling window into a list ordered oldest to newest.

        A LEAN ``RollingWindow`` enumerates from the most recent item to the
        oldest, so the copy is reversed.
        """
        return list(window)[::-1]

    def GetLatestBars(self):
        """Return the bars of every time frame, or ``None`` until all windows are full.

        Returns:
            A dict keyed ``"1hour"``, ``"15min"``, ``"5min"`` and ``"1min"``.
            Each value is a list ordered oldest to newest, so ``bars[-1]`` is
            the most recent bar and iterating walks forward in time.
        """
        if not self.IsReady():
            self.algorithm.Debug("Rolling windows are not ready.")
            return None  # Ensure to handle the case when data is not ready
        return {
            "1hour": self._chronological(self.window_1hour),
            "15min": self._chronological(self.window_15min),
            "5min": self._chronological(self.window_5min),
            "1min": self._chronological(self.window_1min),
        }