| Overall Statistics |
|
Total Trades 2086 Average Win 0.59% Average Loss -1.14% Compounding Annual Return -12.891% Drawdown 96.100% Expectancy 0.052 Net Profit -28.248% Sharpe Ratio 0.767 Probabilistic Sharpe Ratio 19.664% Loss Rate 31% Win Rate 69% Profit-Loss Ratio 0.52 Alpha 0.846 Beta -0.072 Annual Standard Deviation 1.099 Annual Variance 1.207 Information Ratio 0.712 Tracking Error 1.111 Treynor Ratio -11.763 Total Fees $0.00 Estimated Strategy Capacity $710000.00 Lowest Capacity Asset EURGBP 8G Portfolio Turnover 767.77% |
#region imports
from AlgorithmImports import *
#endregion
class CustomSlope:
def __init__(self, algorithm, name, period, symbol, atr):
self.Name = name
self.Time = datetime.min
self.IsReady = False
self.multiplier = 3
self.symbol = symbol
self._atr = atr
self._final_upperband = RollingWindow[float](2)
self._final_lowerband = RollingWindow[float](2)
self._supertrend = RollingWindow[float](2)
self._close = RollingWindow[float](3)
warmUpData = self.algorithm.History(symbol, 20, Resolution.Daily)
if not warmUpData.empty:
for bar in warmUpData.loc[symbol, :].itertuples():
tradebar = TradeBar(bar.Index, symbol, bar.open, bar.high, bar.low, bar.close, bar.volume)
self._close.Add(bar.close)
if not self._atr.IsReady:
continue
_basic_upperband = ((bar.high + bar.low)/2 + self.multiplier * self._atr.Current.Value)
_basic_lowerband = ((bar.high + bar.low)/2 - self.multiplier * self._atr.Current.Value)
if self._final_lowerband.IsReady and self._final_upperband.IsReady:
if (_basic_upperband < self._final_upperband[0]) or (self._close[1] > self._final_upperband[0]):
self._final_upperband.Add(_basic_upperband)
else:
self._final_upperband.Add(self._final_upperband[0])
if (_basic_lowerband > self._final_lowerband[0]) or (self._close[1] < self._final_lowerband[0]):
self._final_lowerband.Add(_basic_lowerband)
else:
self._final_lowerband.Add(self._final_lowerband[0])
else:
self._final_upperband.Add(0)
self._final_lowerband.Add(0)
if self._supertrend.IsReady:
if (self._supertrend[0] == self._final_upperband[1]) and (self._close[0] < self._final_upperband[0]):
self._supertrend.Add(self._final_upperband[0])
if (self._supertrend[0] == self._final_upperband[1]) and (self._close[0] > self._final_upperband[0]):
self._supertrend.Add(self._final_lowerband[0])
if (self._supertrend[0] == self._final_lowerband[1]) and (self._close[0] > self._final_lowerband[0]):
self._supertrend.Add(self._final_lowerband[0])
if (self._supertrend[0] == self._final_lowerband[1]) and (self._close[0] < self._final_lowerband[0]):
self._supertrend.Add(self._final_upperband[0])
else:
self._supertrend.Add(0)
else:
self.algorithm.Debug(f'{str(symbol)} warmUpData is empty')
def __repr__(self):
return "{0} -> IsReady: {1}. Time: {2}. Value: {3}".format(self.Name, self.IsReady, self.Time, self.Value)
def Update(self, high, low, close):
# Need to do this at the end, so that
self._close.Add(close)
self.IsReady = self._supertrend.IsReady
if self.IsReady:
# Update rolling windows
_basic_upperband = ((high + low)/2 + self.multiplier * self._atr.Current.Value)
_basic_lowerband = ((high + low)/2 - self.multiplier * self._atr.Current.Value)
if self._final_lowerband.IsReady and self._final_upperband.IsReady:
if (_basic_upperband < self._final_upperband[0]) or (self._close[1] > self._final_upperband[0]):
self._final_upperband.Add(_basic_upperband)
else:
self._final_upperband.Add(self._final_upperband[0])
if (_basic_lowerband > self._final_lowerband[0]) or (self._close[1] < self._final_lowerband[0]):
self._final_lowerband.Add(_basic_lowerband)
else:
self._final_lowerband.Add(self._final_lowerband[0])
if self._supertrend.IsReady:
if (self._supertrend[0] == self._final_upperband[1]) and (self._close[0] < self._final_upperband[0]):
self._supertrend.Add(self._final_upperband[0])
if (self._supertrend[0] == self._final_upperband[1]) and (self._close[0] > self._final_upperband[0]):
self._supertrend.Add(self._final_lowerband[0])
if (self._supertrend[0] == self._final_lowerband[1]) and (self._close[0] > self._final_lowerband[0]):
self._supertrend.Add(self._final_lowerband[0])
if (self._supertrend[0] == self._final_lowerband[1]) and (self._close[0] < self._final_lowerband[0]):
self._supertrend.Add(self._final_upperband[0])
else:
self._supertrend.Add(0)
return self.IsReady
@property
def IsReady(self):
if self._close.IsReady and self._final_lowerband.IsReady and self._final_upperband.IsReady:
return True
return False
def Update2(self, time, value):
self.queue.appendleft(value)
count = len(self.queue)
self.Time = time
self.IsReady = count == self.queue.maxlen
#### start here the indicator calulation
if self.IsReady:
y = np.log(self.queue)
x = [range(len(y))]
slope, intercept, r_value, p_value, std_err = stats.linregress(x, y)
self.Slope = slope * 10000 # value is very small an will display 0 if not multiplyed
self.Intercept = intercept
self.R_value = r_value * 10
self.P_value = p_value
self.Std_err = std_err
#### finish the custom indicator
return self.IsReady#region imports
from AlgorithmImports import *
#endregion
import pandas as pd
from datetime import datetime, timedelta
import re
from enum import Enum
class ReferenceType(Enum):
AVERAGE = 1
PREVIOUS_CLOSE = 2
class VolatilityType(Enum):
STANDARD_DEVIATION = 1
HIGH_LOW = 2 # This trigger very little orders, ineffective.
ATR = 3
class PositionAmountType(Enum):
AVERAGE = 1
INCREMENTAL = 2
DOUBLEUP = 3
class FXMomentumAlgorithm(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2020, 12, 28)
# self.SetStartDate(2017, 12, 28)
self.SetEndDate(datetime.now())
self.SetCash(100000)
self.resolution = Resolution.Minute
self._grid_number = 10
self._leverage_ratio = 50
self._reference_price = None
self._std = None
self._stop_loss = None
self._cash_preserve_ratio = 0.99
self._cash_per_position = None
self.IS_READY = False
self.REFERENCE_PRICE_METHOD = ReferenceType.PREVIOUS_CLOSE
self.POSITION_AMOUNT_METHOD = PositionAmountType.AVERAGE
self.VOLATILITY_METHOD = VolatilityType.STANDARD_DEVIATION
self.VOLATILITY_MULTIPLIER = 2
self.STOP_LOSS_MODE = False
self.STOP_LOSS_RESCALE = False
self.STOP_LOSS_MULTIPLIER = 1.2 # 10% more on upper side and 10% less on lower side
# self.pair = 'GBPCHF'
# self.pair = 'AUDCAD'
# self.pair = 'GBPCAD'
# self.pair = 'USDJPY'
# self.pair = 'AUDJPY'
self.pair = 'EURGBP'
# self.pair = 'GBPUSD'
self.forex = self.AddForex(
self.pair,
self.resolution,
Market.Oanda,
True,
self._leverage_ratio
)
# self.Debug(f'Our Leverage: {self.forex.MarginModel.GetLeverage(self.forex)}')
################################################
# Adding Schedule event
self.Schedule.On(
self.DateRules.MonthStart(self.pair),
self.TimeRules.AfterMarketOpen(self.pair, 0),
self.openMarket
)
self.Schedule.On(
self.DateRules.MonthEnd(self.pair),
self.TimeRules.BeforeMarketClose(self.pair, 5),
self.closeMarket
)
def OnData(self, data):
if not self.IS_READY or self.pair not in data:
return
if not self.STOP_LOSS_MODE:
return
# Stop loss
if data[self.pair].Close < (self._reference_price - self._stop_loss) or data[self.pair].Close > (self._reference_price + self._stop_loss):
# self.Debug(f'{self.Time}: Break out and clean the open orders')
# Liquidate all positions and close all open orders
self.Liquidate(self.pair)
if self.STOP_LOSS_RESCALE:
self.openMarket()
def setParameters(self):
history = self.History([self.pair], 93, Resolution.Daily)
history = history.droplevel(0)
# Set up reference price
if self.REFERENCE_PRICE_METHOD == ReferenceType.PREVIOUS_CLOSE:
# Get close of previous day
self._reference_price = self.History(
[self.pair],
2880,
Resolution.Minute
).droplevel(0).close[-1]
elif self.REFERENCE_PRICE_METHOD == ReferenceType.AVERAGE:
# Daily moving average
self.Debug('Average mean')
self._reference_price = history.close[-20:].mean()
# Cash amount per position
if self.POSITION_AMOUNT_METHOD == PositionAmountType.AVERAGE:
# Leveraged Equal weighted
self._cash_per_position = (self.Portfolio.MarginRemaining * self._leverage_ratio * self._cash_preserve_ratio) / (self._grid_number * 2)
elif self.POSITION_AMOUNT_METHOD == PositionAmountType.INCREMENTAL:
# Leveraged incremental weighted
self._cash_per_position = (self.Portfolio.MarginRemaining * self._leverage_ratio * self._cash_preserve_ratio) / (sum(range(1, self._grid_number+1)) * 2)
elif self.POSITION_AMOUNT_METHOD == PositionAmountType.DOUBLEUP:
# Leveraged double up weighted
self._cash_per_position = (self.Portfolio.MarginRemaining * self._leverage_ratio * self._cash_preserve_ratio) / ((2**self._grid_number - 1) * 2)
# Get volatility of previous day
if self.VOLATILITY_METHOD == VolatilityType.STANDARD_DEVIATION:
# The 2 * Standardization = 95%
self._std = history.close.std() * self.VOLATILITY_MULTIPLIER
elif self.VOLATILITY_METHOD == VolatilityType.HIGH_LOW:
# The high and low of the previous market open day
self._std = abs(history.high.max() - history.low.min())
elif self.VOLATILITY_METHOD == VolatilityType.ATR:
# ATR
self._std = ta.ATR(history.high, history.low, history.close, timeperiod=60)[-1]
self._stop_loss = self._std * self.STOP_LOSS_MULTIPLIER
def openMarket(self):
# self.Debug(f'{self.Time} Market open. Order number ({len(self.Transactions.GetOpenOrders(self.pair))})')
self.setParameters()
# Cancel all open orders before creating new orders
self.Liquidate(self.pair)
# Set up the grid limit order
for i in range(1, self._grid_number + 1):
qty = int(self._cash_per_position * self.get_qty_multiplier(i) / (self._reference_price - self._std / self._grid_number * i))
order = self.LimitOrder(
self.pair,
qty,
round(self._reference_price - self._std / self._grid_number * i, 5),
f'Long{i}'
)
qty = int(self._cash_per_position * self.get_qty_multiplier(i) / (self._reference_price + self._std / self._grid_number * i))
order = self.LimitOrder(
self.pair,
-qty,
round(self._reference_price + self._std / self._grid_number * i, 5),
f'Short{i}'
)
self.IS_READY = True
def closeMarket(self):
# self.Debug(f'{self.Time} Market close')
# Liquidate all positions by the end of the day
self.Liquidate(self.pair)
# self.Debug(f'{self.Time} Market close. Order number ({len(self.Transactions.GetOpenOrders(self.pair))})')
def OnOrderEvent(self, orderEvent):
order = self.Transactions.GetOrderById(orderEvent.OrderId)
if orderEvent.Status == OrderStatus.Filled:
# self.Debug(
# "{0}: {1} ({2})".format(
# self.Time,
# orderEvent,
# order.Tag
# )
# )
match = re.match(r'(.*)(\d+)(.*)$', order.Tag)
if not match:
return
if match.group(1) == 'Long':
i = int(match.group(2))
if match.group(3) == '-Liquidate':
qty = int(self._cash_per_position * self.get_qty_multiplier(i) / (self._reference_price - self._std / self._grid_number * i))
self.LimitOrder(
self.pair,
qty,
round(self._reference_price - self._std / self._grid_number * i, 5),
f'Long{match.group(2)}'
)
else:
if (i - 1) == 0:
self.LimitOrder(
self.pair,
-abs(order.Quantity),
round(self._reference_price, 5),
f'Long{match.group(2)}-Liquidate'
)
elif i > 1:
self.LimitOrder(
self.pair,
-abs(order.Quantity),
round(self._reference_price - self._std / self._grid_number * (i - 1), 5),
f'Long{match.group(2)}-Liquidate'
)
elif match.group(1) == 'Short':
i = int(match.group(2))
if match.group(3) == '-Liquidate':
qty = int(self._cash_per_position * self.get_qty_multiplier(i) / (self._reference_price + self._std / self._grid_number * i))
self.LimitOrder(
self.pair,
-qty,
round(self._reference_price + self._std / self._grid_number * i, 5),
f'Short{match.group(2)}'
)
openOrders = self.Transactions.GetOpenOrders(self.pair)
for order in openOrders:
if order.Tag in [f'Short{match.group(2)}-Liquidate', f'Short{match.group(2)}-Stoploss']:
self.Transactions.CancelOrder(order.Id)
else:
if (i - 1) == 0:
self.LimitOrder(
self.pair,
abs(order.Quantity),
round(self._reference_price, 5),
f'Short{match.group(2)}-Liquidate'
)
else:
self.LimitOrder(
self.pair,
abs(order.Quantity),
round(self._reference_price + self._std / self._grid_number * (i - 1), 5),
f'Short{match.group(2)}-Liquidate'
)
self.IS_READY = False
def get_qty_multiplier(self, grid_number):
if self.POSITION_AMOUNT_METHOD == PositionAmountType.AVERAGE:
# Even weighted
return 1
elif self.POSITION_AMOUNT_METHOD == PositionAmountType.INCREMENTAL:
# Incremental
return grid_number
elif self.POSITION_AMOUNT_METHOD == PositionAmountType.DOUBLEUP:
# Double up weighted
if not isinstance(grid_number, (int)):
grid_number = int(grid_number)
return 2**(grid_number - 1)