| Overall Statistics | |
|---|---|
| Total Trades | 43 |
| Average Win | 9.09% |
| Average Loss | -0.47% |
| Compounding Annual Return | -20.398% |
| Drawdown | 14.900% |
| Expectancy | -0.028 |
| Net Profit | -9.044% |
| Sharpe Ratio | -1.423 |
| Probabilistic Sharpe Ratio | 2.164% |
| Loss Rate | 95% |
| Win Rate | 5% |
| Profit-Loss Ratio | 19.41 |
| Alpha | -0.131 |
| Beta | -0.058 |
| Annual Standard Deviation | 0.1 |
| Annual Variance | 0.01 |
| Information Ratio | -2.027 |
| Tracking Error | 0.166 |
| Treynor Ratio | 2.449 |
| Total Fees | $369.80 |
| Estimated Strategy Capacity | $1300000000.00 |
| Lowest Capacity Asset | NQ Y9CDFY0C6TXD |
| Portfolio Turnover | 26.89% |
# ---
# ---
# Region Imports
from AlgorithmImports import *
# ---
# ---
# Alpha model class
class alpha_1_month(AlphaModel):
    """Alpha model that trades a monthly breakout level on the NDX index.

    A level is captured at the start of each month (see ``SetLevel``). On
    every new 15-minute consolidated NDX bar the model compares the two most
    recent NDX closes with that level: an upward cross emits an ``Up``
    insight for the tracked futures contract and a downward cross emits a
    ``Flat`` insight. A month-end scheduled event emits a ``Flat`` insight.

    NOTE(review): the source this class was recovered from had lost its
    indentation; the nesting of ``Update`` and ``OnSecuritiesChanged`` was
    reconstructed from statement order and the original comments -- confirm
    against the author's copy.
    """

    def __init__(self, algorithm, ndx):
        """Initialise model state and schedule the month-end exit.

        Args:
            algorithm: The running algorithm; used to register the schedule.
            ndx: Symbol of the NDX index, used for the date and time rules.
        """
        self.level = None                    # breakout level, set by SetLevel
        self.ndx = {}                        # "NDX" -> Security once it is added
        # Must exist before the first futures contract is added: Update and
        # LiquidatePositions both read it.
        self.latest_contract = None
        self.last_consolidation_time = None  # Time of the last consolidated bar
        self.minute = 0                      # minute of the last bar already processed
        self.x = 0
        self.ndx_close = RollingWindow[float](2)

        # Ten minutes before the close on the last trading day of the month,
        # ask for all positions to be flattened.
        algorithm.Schedule.On(
            algorithm.DateRules.MonthEnd(ndx),
            algorithm.TimeRules.BeforeMarketClose(ndx, 10),
            Action(lambda: self.LiquidatePositions(algorithm)),
        )

    def Update(self, algorithm, data):
        """Produce insights for the current slice.

        Work is only done when the consolidator has reported a bar whose
        minute differs from the last one processed; otherwise an empty list
        is returned.

        Args:
            algorithm: The running algorithm (used for plotting).
            data: The current data slice.

        Returns:
            A list with zero or one ``Insight`` for the tracked contract.
        """
        insights = []

        # Gate on a new consolidated bar.
        if (self.last_consolidation_time is None
                or self.minute == self.last_consolidation_time.minute):
            return insights
        self.minute = self.last_consolidation_time.minute

        # Feed the rolling window only when NDX is actually in this slice.
        has_ndx = data.ContainsKey("NDX")
        if has_ndx:
            self.ndx_close.Add(data["NDX"].Close)

        # Visual aids: level, index and continuous future on the charts.
        if self.level is not None:
            algorithm.Plot("Breakout Price", "Level", self.level)
            algorithm.Plot("Main Chart", "Level", self.level)
            if has_ndx:
                algorithm.Plot("Main Chart", "NDX", data["NDX"].Close)
            if data.ContainsKey("/NQ"):
                algorithm.Plot("Main Chart", "Futures", data["/NQ"].Close)

        # Plot the close of every individual contract present in the slice.
        for symbol in data.Keys:
            if symbol.Value in ("NDX", "/NQ"):
                continue
            algorithm.Plot("Breakout Price", str(symbol.Value) + " Price Close", data[symbol].Close)

        # A cross can only be evaluated with a fresh sample, a full window,
        # a known level and a contract to trade.
        if not has_ndx or not self.ndx_close.IsReady:
            return insights
        if self.latest_contract is None or self.level is None:
            return insights

        previous_close = self.ndx_close[1]
        current_close = self.ndx_close[0]

        if previous_close < self.level < current_close:
            direction, marker = InsightDirection.Up, "Buy"
        elif previous_close > self.level > current_close:
            direction, marker = InsightDirection.Flat, "Sell"
        else:
            return insights

        insight = Insight(self.latest_contract.Symbol, timedelta(minutes=20), InsightType.Price, direction)
        insight.SourceModel = "1 Month"
        insights.append(insight)
        # Mark the signal at the traded contract's own price rather than at
        # whichever symbol the plotting loop happened to visit last.
        algorithm.Plot("Main Chart", marker, self.latest_contract.Close)
        return insights

    def consolidation_handler(self, sender, bar):
        """Record the time of the most recent consolidated NDX bar."""
        self.last_consolidation_time = bar.Time

    def OnSecuritiesChanged(self, algorithm, changes):
        """React to universe changes.

        For NDX: attach a 15-minute consolidator and schedule the monthly
        level reset. For futures: remember the added contract with the
        furthest expiry as ``latest_contract``.

        Args:
            algorithm: The running algorithm.
            changes: The securities added to / removed from the universe.
        """
        for security in changes.AddedSecurities:
            if security.Symbol == "NDX":
                self.consolidator = TradeBarConsolidator(timedelta(minutes=15))
                self.consolidator.DataConsolidated += self.consolidation_handler
                algorithm.SubscriptionManager.AddConsolidator(security.Symbol, self.consolidator)
                self.ndx["NDX"] = security

                # Bind the symbol as a default argument: a plain closure over
                # `security` would see whatever the loop variable holds when
                # the schedule eventually fires.
                algorithm.Schedule.On(
                    algorithm.DateRules.MonthStart(security.Symbol),
                    algorithm.TimeRules.At(0, 0),
                    Action(lambda ndx_symbol=security.Symbol: self.SetLevel(algorithm, ndx_symbol)),
                )

        added_contracts = [x for x in changes.AddedSecurities
                           if x.Symbol.SecurityType == SecurityType.Future]
        if added_contracts:
            self.latest_contract = max(added_contracts, key=lambda x: x.Expiry)

    def SetLevel(self, algorithm, symbol):
        """Set the breakout level from the latest one-minute bar of ``symbol``.

        The level becomes the open of that bar, or ``None`` when the history
        request fails or returns nothing for the symbol.

        Args:
            algorithm: The running algorithm (history provider).
            symbol: Symbol whose history is requested (NDX).
        """
        try:
            history = algorithm.History(symbol, 1, Resolution.Minute)
        except Exception as error:
            # Best effort: a failed request just leaves the level unset.
            algorithm.Debug(f"SetLevel: history request for {symbol} failed: {error}")
            history = None

        if history is not None and not history.empty and str(symbol) in history.index:
            self.level = history.loc[str(symbol)].open.iloc[0]
        else:
            self.level = None

    def LiquidatePositions(self, algorithm):
        """Emit a ``Flat`` insight for the tracked contract.

        This runs from a scheduled event, whose return value is discarded,
        so the insight is pushed through ``algorithm.EmitInsights`` as well
        as being returned.

        Args:
            algorithm: The running algorithm.

        Returns:
            The list of emitted insights (empty when no contract is tracked).
        """
        insights = []
        if self.latest_contract is None:
            return insights

        insight = Insight(self.latest_contract.Symbol, timedelta(minutes=20), InsightType.Price, InsightDirection.Flat)
        # Same source tag as the insights produced in Update, so consumers
        # that key on SourceModel treat this exit as belonging to this model.
        insight.SourceModel = "1 Month"
        insights.append(insight)
        algorithm.EmitInsights(insight)
        return insights

# ---
# Regional imports
from alpha_1_month import *
# Global imports
from AlgorithmImports import *
from datetime import datetime, timedelta
import threading
# ---
# Main class
class MyAlgorithm(QCAlgorithm):
    """Algorithm that turns insights from ``alpha_1_month`` into futures orders.

    Orders are placed manually from the ``InsightsGenerated`` event and the
    per-model position size is tracked in ``current_holdings`` from order
    fills.

    NOTE(review): the source this class was recovered from had lost its
    indentation; block nesting was reconstructed from statement order --
    confirm against the author's copy.
    """

    def Initialize(self):
        """Configure dates, cash, data subscriptions, models and charts."""
        self.SetStartDate(2023, 1, 1)
        self.SetEndDate(2023, 6, 1)
        self.SetCash(1000000)

        # Bookkeeping used by the insight and order-event handlers.
        self.current_holdings = {}        # source model -> signed contracts held
        self.desired_exposure_per_alpha = 1000000
        self._take_profit_models = {}     # take-profit order id -> source model

        self.ndx = self.AddIndex("NDX", Resolution.Minute).Symbol
        self.qqq = self.AddFuture(Futures.Indices.NASDAQ100EMini, Resolution.Minute)
        self.qqq.SetFilter(timedelta(0), timedelta(180))

        # Track symbols in our universe
        self.universe_symbols = [self.ndx]

        # Framework models
        self.SetUniverseSelection(ManualUniverseSelectionModel(self.universe_symbols))
        self.AddAlpha(alpha_1_month(self, self.ndx))
        self.InsightsGenerated += self.OnInsightsGenerated
        self.SetExecution(ImmediateExecutionModel())

        # "Main Chart": signal markers plus level / futures price lines.
        main_chart = Chart("Main Chart")
        main_chart.AddSeries(Series("Buy", SeriesType.Scatter, 0))
        main_chart.AddSeries(Series("Sell", SeriesType.Scatter, 0))
        main_chart.AddSeries(Series("Level", SeriesType.Line, 0))
        main_chart.AddSeries(Series("Futures", SeriesType.Line, 0))
        main_chart.AddSeries(Series("Take Profit", SeriesType.Scatter, 0))
        self.AddChart(main_chart)

    def OnInsightsGenerated(self, algorithm: IAlgorithm, insights_collection: GeneratedInsightsCollection):
        """Translate each generated insight into orders.

        ``Up`` opens a position sized to ``desired_exposure_per_alpha`` (with
        a take-profit limit 10% above the current price) when the model holds
        nothing. ``Flat`` closes whatever the model currently holds.

        Args:
            algorithm: The algorithm raising the event (used for plotting).
            insights_collection: The insights generated in this time step.
        """
        for insight in insights_collection.Insights:
            symbol = insight.Symbol
            model = insight.SourceModel
            held = self.current_holdings.setdefault(model, 0)

            if insight.Direction == InsightDirection.Up:
                if held != 0:
                    continue  # already positioned for this model

                security = self.Securities[symbol]
                multiplier = security.SymbolProperties.ContractMultiplier
                price = security.Close
                if price <= 0 or multiplier <= 0:
                    continue  # no usable price yet; sizing would divide by zero

                # Closest whole number of contracts below the target exposure.
                quantity = int(self.desired_exposure_per_alpha / (price * multiplier))
                if quantity <= 0:
                    continue

                # Tag the entry with the model so OnOrderEvent books the fill
                # under the same key that is checked above.
                self.MarketOrder(symbol, quantity, tag=model)

                take_profit_price = price * 1.10
                ticket = self.LimitOrder(symbol, -quantity, int(take_profit_price), "Take Profit")
                # Remember which model owns this exit so its fill reduces that
                # model's holdings instead of a separate "Take Profit" entry.
                self._take_profit_models[ticket.OrderId] = model
                algorithm.Plot("Main Chart", "Take Profit", take_profit_price)

            elif insight.Direction == InsightDirection.Flat:
                # Close the actual held size; a quantity recomputed from the
                # current price need not match what was bought.
                if held != 0:
                    self.MarketOrder(symbol, -held, tag=model)

    def OnSecuritiesChanged(self, changes):
        """Keep ``universe_symbols`` in step with added / removed futures.

        Args:
            changes: The securities added to / removed from the universe.
        """
        added_contracts = [x for x in changes.AddedSecurities
                           if x.Symbol.SecurityType == SecurityType.Future]
        if added_contracts:
            # Track the symbol (not the Security object) of the added contract
            # with the furthest expiry.
            newest_symbol = max(added_contracts, key=lambda x: x.Expiry).Symbol
            if newest_symbol not in self.universe_symbols:
                self.universe_symbols.append(newest_symbol)

        for security in changes.RemovedSecurities:
            if security.Symbol.SecurityType != SecurityType.Future:
                continue
            # Remove one symbol at a time, and only if it was tracked.
            if security.Symbol in self.universe_symbols:
                self.universe_symbols.remove(security.Symbol)

    def OnOrderEvent(self, orderEvent):
        """Update ``current_holdings`` from fills and clean up exit orders.

        Args:
            orderEvent: The order event reported by the engine.
        """
        if orderEvent.Status == OrderStatus.Canceled:
            # A cancelled take-profit no longer needs its model mapping.
            self._take_profit_models.pop(orderEvent.OrderId, None)
            return
        if orderEvent.Status != OrderStatus.Filled:
            return

        order = self.Transactions.GetOrderById(orderEvent.OrderId)
        symbol = order.Symbol
        # Take-profit orders are mapped back to their model by id; every other
        # order carries the model name in its tag.
        model = self._take_profit_models.pop(order.Id, order.Tag)

        self.current_holdings[model] = self.current_holdings.get(model, 0) + order.Quantity

        # After any sell fill, cancel limit orders still open on the symbol.
        if order.Direction == OrderDirection.Sell:
            for open_order in self.Transactions.GetOpenOrders(symbol):
                if open_order.Type == OrderType.Limit:
                    self.Transactions.CancelOrder(open_order.Id)