| Overall Statistics |
|
Total Trades 237 Average Win 0.46% Average Loss -0.28% Compounding Annual Return -1.717% Drawdown 9.100% Expectancy -0.112 Net Profit -3.647% Sharpe Ratio -1.525 Sortino Ratio -1.719 Probabilistic Sharpe Ratio 0.789% Loss Rate 66% Win Rate 34% Profit-Loss Ratio 1.62 Alpha 0 Beta 0 Annual Standard Deviation 0.032 Annual Variance 0.001 Information Ratio -0.362 Tracking Error 0.032 Treynor Ratio 0 Total Fees $585.39 Estimated Strategy Capacity $620000000.00 Lowest Capacity Asset CL YG5JR83D8NNL Portfolio Turnover 2.65% |
from AlgorithmImports import *
from QuantConnect.DataSource import *
class USFuturesSecurityMasterDataClassicAlgorithm(QCAlgorithm):
def Initialize(self) -> None:
self.SetCash(1000000)
self.SetStartDate(2022, 1, 1)
# self.SetEndDate(2021, 1, 1)
# Set brokerage Model
self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin)
self.SetSecurityInitializer(BrokerageModelSecurityInitializer(
self.BrokerageModel, FuncSecuritySeeder(self.GetLastKnownPrices)))
# Requesting data
self.continuous_contract = self.AddFuture("CL",
dataNormalizationMode = DataNormalizationMode.BackwardsRatio,
dataMappingMode = DataMappingMode.OpenInterest,
contractDepthOffset = 0)
self.symbol = self.continuous_contract.Symbol
# Historical data
history = self.History(self.symbol, 100, Resolution.Daily)
self.Debug(f"We got {len(history)} items from our history request")
# Initialize BollingerBands indicator
self.bollinger_bands = self.BB(self.symbol, 10, 2, MovingAverageType.Simple, Resolution.Daily)
# Inside your Initialize method, after fetching the historical data:
if not history.empty:
for index, row in history.iterrows():
# The 'index' variable already contains a datetime object in QuantConnect.
time = index[1]
price = float(row["close"])
# Update the Bollinger Bands indicator with the time and price
self.bollinger_bands.Update(time, price)
def OnData(self, slice: Slice) -> None:
# Accessing data
for symbol, changed_event in slice.SymbolChangedEvents.items():
self.HandleRollover(changed_event)
mapped_symbol = self.continuous_contract.Mapped
if not (slice.Bars.ContainsKey(self.symbol) and self.bollinger_bands.IsReady and mapped_symbol):
return
# Trading logic
price = slice.Bars[self.symbol].Close
if price > self.bollinger_bands.UpperBand.Current.Value and not self.Portfolio[mapped_symbol].IsLong:
self.MarketOrder(mapped_symbol, 1)
elif price < self.bollinger_bands.LowerBand.Current.Value and not self.Portfolio[mapped_symbol].IsShort:
self.MarketOrder(mapped_symbol, -1)
def HandleRollover(self, changed_event):
old_symbol = changed_event.OldSymbol
new_symbol = changed_event.NewSymbol
tag = f"Rollover at {self.Time}: {old_symbol} -> {new_symbol}"
quantity = self.Portfolio[old_symbol].Quantity
# Rolling over: to liquidate any position of the old mapped contract and switch to the newly mapped contract
self.Liquidate(old_symbol, tag=tag)
if quantity != 0:
self.MarketOrder(new_symbol, quantity, tag=tag)
self.Log(tag)
from AlgorithmImports import *
from QuantConnect.DataSource import *
class USFuturesSecurityMasterDataClassicAlgorithm (QCAlgorithm):
threshold = 0.01 # 1%
def Initialize(self) -> None:
self.SetCash(1000000)
self.SetStartDate(2022, 1, 1)
# self.SetEndDate(2021, 1, 1)
#Set brokerage Model
self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin)
self.SetSecurityInitializer(BrokerageModelSecurityInitializer(
self.BrokerageModel, FuncSecuritySeeder(self.GetLastKnownPrices)))
# Requesting data
self.continuous_contract = self.AddFuture("CL",
dataNormalizationMode = DataNormalizationMode.BackwardsRatio,
dataMappingMode = DataMappingMode.OpenInterest,
contractDepthOffset = 0)
self.symbol = self.continuous_contract.Symbol
# Historical data
history = self.History(self.symbol, 100, Resolution.Daily)
self.Debug(f"We got {len(history)} items from our history request")
self.sma = self.SMA(self.symbol, 10, Resolution.Daily)
if not history.empty:
for time, row in history.droplevel(0).loc[self.symbol].iterrows():
self.sma.Update(IndicatorDataPoint(time, row.close))
def OnData(self, slice: Slice) -> None:
# Accessing data
for symbol, changed_event in slice.SymbolChangedEvents.items():
self.HandleRollover(changed_event)
mapped_symbol = self.continuous_contract.Mapped
if not (slice.Bars.ContainsKey(self.symbol) and self.sma.IsReady and mapped_symbol):
return
# Trading logic
if slice.Bars[self.symbol].Price > self.sma.Current.Value * (1+self.threshold) and not self.Portfolio[mapped_symbol].IsLong:
self.MarketOrder(mapped_symbol, 1)
elif slice.Bars[self.symbol].Price < self.sma.Current.Value * (1-self.threshold) and not self.Portfolio[mapped_symbol].IsShort:
self.MarketOrder(mapped_symbol, -1)
def HandleRollover(self, changed_event):
old_symbol = changed_event.OldSymbol
new_symbol = changed_event.NewSymbol
tag = f"Rollover at {self.Time}: {old_symbol} -> {new_symbol}"
quantity = self.Portfolio[old_symbol].Quantity
# Rolling over: to liquidate any position of the old mapped contract and switch to the newly mapped contract
self.Liquidate(old_symbol, tag=tag)
if quantity != 0: self.MarketOrder(new_symbol, quantity, tag = tag)
self.Log(tag)from AlgorithmImports import *
from QuantConnect.DataSource import *
from datetime import timedelta
class USFuturesSecurityMasterDataClassicAlgorithm(QCAlgorithm):
threshold = 0.01 # 1%
atr_period = 100 # ATR calculation period
risk_factor = 0.002 # 0.2% risk per trade
def Initialize(self) -> None:
self.SetCash(1000000)
self.SetStartDate(2022, 1, 1)
# self.SetEndDate(2021, 1, 1)
self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin)
# Add Future contract and get the SymbolProperties
self.continuous_contract = self.AddFuture("GC",
dataNormalizationMode = DataNormalizationMode.BackwardsRatio,
dataMappingMode = DataMappingMode.OpenInterest,
contractDepthOffset = 0)
self.symbol = self.continuous_contract.Symbol
properties = self.Securities[self.symbol].SymbolProperties
self.contractMultiplier = properties.ContractMultiplier
self.Debug(f"Contract Multiplier: {self.contractMultiplier}")
# Initialize ATR
self.atr = self.ATR(self.symbol, self.atr_period, MovingAverageType.Simple, Resolution.Daily)
# Initialize SMA
self.sma = self.SMA(self.symbol, 10, Resolution.Daily)
self.Debug("ATR and SMA indicators have been initialized.")
history = self.History(self.symbol, 100, Resolution.Daily)
if not history.empty:
# Ensure we're iterating over the DataFrame correctly
for index, row in history.iterrows():
# The 'index' should be a tuple containing (symbol, time)
if isinstance(index, tuple) and len(index) == 2:
# Unpack the tuple
symbol, time = index
# Now proceed as before
# ...
else:
self.Debug(f"Unexpected index structure: {index}")
self.SetWarmUp(timedelta(days=self.atr_period))
def OnData(self, slice: Slice) -> None:
if self.IsWarmingUp:
return
for symbol, changed_event in slice.SymbolChangedEvents.items():
self.HandleRollover(changed_event)
mapped_symbol = self.continuous_contract.Mapped
if not (slice.Bars.ContainsKey(self.symbol) and self.sma.IsReady and self.atr.IsReady and mapped_symbol):
return
# Calculate the position size using ATR and contract multiplier
equity = self.Portfolio.TotalPortfolioValue
position_size = self.CalculatePositionSize(equity, self.atr.Current.Value, self.contractMultiplier)
# Trading logic
# Check if the position size is greater than zero before trading
if position_size > 0:
# Trading logic
price = slice.Bars[self.symbol].Price
if price > self.sma.Current.Value * (1 + self.threshold) and not self.Portfolio[mapped_symbol].IsLong:
self.Debug(f"Going long: {position_size} contracts.")
self.MarketOrder(mapped_symbol, position_size)
elif price < self.sma.Current.Value * (1 - self.threshold) and not self.Portfolio[mapped_symbol].IsShort:
self.Debug(f"Going short: {position_size} contracts.")
self.MarketOrder(mapped_symbol, -position_size)
else:
self.Debug("Position size is zero, no trade executed.")
def CalculatePositionSize(self, equity, atr, contractMultiplier):
dollar_volatility = atr * contractMultiplier
num_contracts = (self.risk_factor * equity) / dollar_volatility
self.Debug(f"Dollar volatility: {dollar_volatility}, Number of contracts: {num_contracts}")
# Check if the calculated number of contracts is greater than zero
if num_contracts > 0:
return int(num_contracts) # Ensure we only buy whole contracts
else:
self.Debug(f"Calculated position size is zero. Equity: {equity}, ATR: {atr}, Contract Multiplier: {contractMultiplier}")
return 0
def HandleRollover(self, changed_event):
old_symbol = changed_event.OldSymbol
new_symbol = changed_event.NewSymbol
tag = f"Rollover at {self.Time}: {old_symbol} -> {new_symbol}"
quantity = self.Portfolio[old_symbol].Quantity
# Rolling over: to liquidate any position of the old mapped contract and switch to the newly mapped contract
self.Liquidate(old_symbol, tag=tag)
if quantity != 0:
self.Debug(f"Handling rollover from {old_symbol} to {new_symbol}, quantity: {quantity}")
self.MarketOrder(new_symbol, quantity, tag = tag)
self.Log(tag)
from AlgorithmImports import *
from QuantConnect.DataSource import *
class USFuturesSecurityMasterDataClassicAlgorithm (QCAlgorithm):
threshold = 0.01 # 1%
def Initialize(self) -> None:
self.SetCash(1000000)
self.SetStartDate(2022, 1, 1)
# self.SetEndDate(2021, 1, 1)
#Set brokerage Model
self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin)
self.SetSecurityInitializer(BrokerageModelSecurityInitializer(
self.BrokerageModel, FuncSecuritySeeder(self.GetLastKnownPrices)))
# Requesting data
self.continuous_contract = self.AddFuture("CL",
dataNormalizationMode = DataNormalizationMode.BackwardsRatio,
dataMappingMode = DataMappingMode.OpenInterest,
contractDepthOffset = 0)
self.symbol = self.continuous_contract.Symbol
# Historical data
history = self.History(self.symbol, 100, Resolution.Daily)
self.Debug(f"We got {len(history)} items from our history request")
self.sma = self.SMA(self.symbol, 10, Resolution.Daily)
if not history.empty:
for time, row in history.droplevel(0).loc[self.symbol].iterrows():
self.sma.Update(IndicatorDataPoint(time, row.close))
def OnData(self, slice: Slice) -> None:
# Accessing data
for symbol, changed_event in slice.SymbolChangedEvents.items():
self.HandleRollover(changed_event)
mapped_symbol = self.continuous_contract.Mapped
if not (slice.Bars.ContainsKey(self.symbol) and self.sma.IsReady and mapped_symbol):
return
# Trading logic
if slice.Bars[self.symbol].Price > self.sma.Current.Value * (1+self.threshold) and not self.Portfolio[mapped_symbol].IsLong:
self.MarketOrder(mapped_symbol, 1)
elif slice.Bars[self.symbol].Price < self.sma.Current.Value * (1-self.threshold) and not self.Portfolio[mapped_symbol].IsShort:
self.MarketOrder(mapped_symbol, -1)
def HandleRollover(self, changed_event):
old_symbol = changed_event.OldSymbol
new_symbol = changed_event.NewSymbol
tag = f"Rollover at {self.Time}: {old_symbol} -> {new_symbol}"
quantity = self.Portfolio[old_symbol].Quantity
# Rolling over: to liquidate any position of the old mapped contract and switch to the newly mapped contract
self.Liquidate(old_symbol, tag=tag)
if quantity != 0: self.MarketOrder(new_symbol, quantity, tag = tag)
self.Log(tag)