| Overall Statistics | |
| --- | --- |
| Total Trades | 1790 |
| Average Win | 1.95% |
| Average Loss | -1.15% |
| Compounding Annual Return | 129.638% |
| Drawdown | 2.100% |
| Expectancy | 0.796 |
| Net Profit | 5.723% |
| Sharpe Ratio | 7.003 |
| Probabilistic Sharpe Ratio | 98.029% |
| Loss Rate | 33% |
| Win Rate | 67% |
| Profit-Loss Ratio | 1.69 |
| Alpha | 0.86 |
| Beta | -0.138 |
| Annual Standard Deviation | 0.115 |
| Annual Variance | 0.013 |
| Information Ratio | 2.796 |
| Tracking Error | 0.144 |
| Treynor Ratio | -5.831 |
| Total Fees | $12409.80 |
| Estimated Strategy Capacity | $0 |
| Lowest Capacity Asset | 6J WSYSCWI3O1KX |
import pytz
from math import floor
class FuturesTester(QCAlgorithm):
    """Replay externally generated trade signals on the JPY currency future.

    Signals are downloaded once as CSV rows of the form
    ``<list>,<direction>,<unix timestamp>`` where ``<list>`` is ``buy`` or
    ``sell`` and ``<direction>`` is ``b`` or ``s``:

    * ``buy`` list:  ``b`` opens a short position, ``s`` liquidates.
    * ``sell`` list: ``s`` opens a long position,  ``b`` liquidates.

    NOTE(review): the inverted order signs presumably reflect that the signals
    are quoted for the opposite currency pair - confirm with the signal source.

    A position is opened in slices: each order is limited to the contract's
    reported volume minus one, and the unfilled remainder is worked on the
    following ``OnData`` calls (``b_remaining`` <= 0 for the short side,
    ``s_remaining`` >= 0 for the long side).
    """

    def Initialize(self):
        """Configure the backtest, subscribe to the future and load the signals.

        Raises:
            ValueError: if the downloaded file contains no usable signal rows.
        """
        self.SetTimeZone("America/Chicago")
        self.SetCash(10000000)
        self.jpy = self.AddFuture(Futures.Currencies.JPY, Resolution.Tick)
        self.jpy.SetFilter(0, 90)
        self.future = self.Securities[self.jpy.Symbol]
        # self.Settings.FreePortfolioValuePercentage = 0.98
        self.tz = pytz.timezone('America/Chicago')

        csv = self.Download("https://www.dropbox.com/s/x4i79k6gol5pr48/QC_trades_data.csv?dl=1")
        self.b_trades = []
        self.s_trades = []
        for row in csv.split("\n"):
            fields = row.strip().split(',')
            # Skip blank, header or truncated rows instead of raising IndexError.
            if len(fields) < 3:
                continue
            if fields[0] == 'buy':
                target = self.b_trades
            elif fields[0] == 'sell':
                target = self.s_trades
            else:
                continue
            target.append({
                "direction": fields[1],
                "time": datetime.fromtimestamp(float(fields[2]), tz=self.tz),
            })

        # Either list may legitimately be empty; only fail when both are.
        # Rows are assumed to be in chronological order within each list.
        first_times = [trades[0]["time"]
                       for trades in (self.b_trades, self.s_trades) if trades]
        if not first_times:
            raise ValueError("No 'buy' or 'sell' rows found in the trade signal file")
        start = min(first_times) - timedelta(days=1)

        self.SetStartDate(start)
        # The end date is fixed rather than derived from the last signal.
        self.SetEndDate(2021, 10, 31)

        self.prev_day = start.day
        self.prev_time = None
        self.b_remaining = 0   # contracts still to SELL for an open "buy" signal (<= 0)
        self.s_remaining = 0   # contracts still to BUY for an open "sell" signal (>= 0)
        self.b_start = None
        self.s_start = None
        self.liquid_contract = None

    def OnData(self, slice):
        """Work any partially opened position, otherwise process the next due signal."""
        # pytz zones must be attached with localize(); datetime.replace(tzinfo=...)
        # silently applies the zone's LMT offset and skews every comparison
        # against the (correctly localized) signal timestamps.
        t = self.tz.localize(self.Time)

        if self.b_remaining < 0:
            # Still short of the target short position: sell another slice.
            self.get_liquid_contract(slice)
            self.b_remaining += self._submit_slice(-1, -self.b_remaining)
        elif self.s_remaining > 0:
            # Still short of the target long position: buy another slice.
            self.get_liquid_contract(slice)
            self.s_remaining -= self._submit_slice(1, self.s_remaining)
        elif self.b_trades and t >= self.b_trades[0]['time']:
            direction = self.b_trades[0]['direction']
            if direction == "b":
                if not self.Portfolio.Invested:
                    self.b_start = self.Time
                    self.get_liquid_contract(slice)
                    self.b_remaining = -max(0, self.get_number_of_contracts())
                    self.b_remaining += self._submit_slice(-1, -self.b_remaining)
                else:
                    self.Debug("Buy execution error - trade already open")
            elif direction == "s":
                self.b_remaining = 0
                self.Liquidate()
            del self.b_trades[0]
        elif self.s_trades and t >= self.s_trades[0]['time']:
            direction = self.s_trades[0]['direction']
            if direction == "s":
                if not self.Portfolio.Invested:
                    self.s_start = self.Time
                    self.get_liquid_contract(slice)
                    self.s_remaining = max(0, self.get_number_of_contracts())
                    self.s_remaining -= self._submit_slice(1, self.s_remaining)
                else:
                    self.Debug("Sell execution error - trade already open")
            elif direction == "b":
                self.s_remaining = 0
                self.Liquidate()
            del self.s_trades[0]

    def _submit_slice(self, sign, wanted):
        """Send one market order for part of an outstanding quantity.

        Args:
            sign: ``-1`` to sell, ``1`` to buy.
            wanted: unsigned number of contracts still required.

        Returns:
            The unsigned number of contracts ordered (``0`` if nothing was sent).
        """
        contract = self.liquid_contract
        if contract is None:
            # No chain data has been seen yet; try again on the next slice.
            return 0
        # Leave one contract of the reported volume untouched and never order
        # more than is still needed to reach the target size.
        quantity = min(contract.Volume - 1, wanted)
        if quantity <= 0:
            return 0
        self.MarketOrder(contract.Symbol, sign * quantity)
        return quantity

    # NOTE: unused experiments kept for reference.
    # def OpenInterestSecurityInitializer(self, security):
    #     history = self.History([security.Symbol], timedelta(1))
    #     if 'openinterest' in history:
    #         oi = OpenInterest(self.Time, security.Symbol, history.openinterest.dropna().iloc[-1])
    #         security.SetMarketPrice(oi)
    # def get_trading_contract(self, slice):
    #     for chain in slice.FutureChains.Values:
    #         contracts = chain.Contracts.Values
    #         chainContracts = list(contracts)
    #         self.Debug(chainContracts[0])
    #         self.Debug(len(chainContracts))
    #         chainContracts = sorted(chainContracts, key=lambda x: x.Expiry)

    def get_number_of_contracts(self):
        """Return how many contracts 5% of the remaining margin can open."""
        return floor((self.Portfolio.MarginRemaining * 0.05) / self.future.BuyingPowerModel.InitialOvernightMarginRequirement)

    def OnMarginCallWarning(self):
        """Log an error when a margin call warning is received."""
        self.Error("You received a margin call warning..")

    def get_liquid_contract(self, slice):
        """Store the contract with the highest open interest in ``self.liquid_contract``.

        Chains without contracts are skipped; if the slice carries no usable
        chain, the previously selected contract (possibly ``None``) is kept.
        """
        for chain in slice.FutureChains:
            self.popular_contracts = list(chain.Value)
            if not self.popular_contracts:
                continue
            # max() returns the first contract among ties, matching a stable
            # descending sort followed by [0].
            self.liquid_contract = max(self.popular_contracts, key=lambda k: k.OpenInterest)