| Overall Statistics | |
|---|---|
| Total Trades | 81 |
| Average Win | 0.67% |
| Average Loss | -0.10% |
| Compounding Annual Return | 186.324% |
| Drawdown | 6.300% |
| Expectancy | 5.973 |
| Net Profit | 37.139% |
| Sharpe Ratio | 5.773 |
| Probabilistic Sharpe Ratio | 93.966% |
| Loss Rate | 11% |
| Win Rate | 89% |
| Profit-Loss Ratio | 6.84 |
| Alpha | 1.382 |
| Beta | -0.513 |
| Annual Standard Deviation | 0.243 |
| Annual Variance | 0.059 |
| Information Ratio | 5.655 |
| Tracking Error | 0.254 |
| Treynor Ratio | -2.728 |
| Total Fees | $0.00 |
| Estimated Strategy Capacity | $10000000.00 |
from AlgoToolbox import AlgoToolbox
'''
To use this library, place this import at the top of your file:
from StrategyFxScalper import StrategyFxScalper
Then derive a custom strategy class, instantiate it and run it:
class StrategyFxScalpCustom(StrategyFxScalper):
...
strategy = StrategyFxScalpCustom(...)
strategy.Run(data)
'''
# ********************************
class StrategyFxScalper:
    """Base forex scalping strategy that drives a galaxy of symbol bricks.

    On every call to ``Run`` the strategy walks the bricks stored in
    ``fxGalaxy`` and dispatches to the live, scalp-period or anchor-period
    handler. The ``On*`` trade-rule methods are no-op hooks in this base
    class; the ``OnTradeOpen_*`` / ``OnTradeClose_*`` methods place the
    orders through the wrapped algorithm object (``qcAlgo``).

    Note that ``holding_value`` is stored on the strategy itself, not per
    brick, so the last computed holding ratio is shared by all symbols.
    """

    def __init__(self, qcAlgo, fxGalaxy, resolution, anchor_resolution,
                 scalp_period, anchor_period, show_debug, show_log, anchor_slots):
        """Store the algorithm, the brick galaxy and the run settings.

        Args:
            qcAlgo: algorithm object used for orders, plots and run flags.
            fxGalaxy: mapping whose values are the symbol bricks to run.
            resolution: scalp resolution, stored as given.
            anchor_resolution: anchor resolution, stored as given.
            scalp_period: scalp period, stored as given.
            anchor_period: anchor period, stored as given.
            show_debug: enables the verbose ``debugRunFx`` output.
            show_log: passed on to the toolbox together with ``show_debug``.
            anchor_slots: stored as ``anchor_max_slots``.

        Raises:
            ZeroDivisionError: if ``fxGalaxy`` is empty.
        """
        self.buy_flag = 0
        self.sell_flag = 0
        self.limitOrderTicket = None
        self.profitTargetOrderTicket = None
        self.stopLossOrderTicket = None
        self.scalp_ema_fast = 0
        self.scalp_ema_slow = 0
        self.anchor_ema_fast = 0
        self.anchor_ema_slow = 0
        self.qcAlgo = qcAlgo
        self.fxGalaxy = fxGalaxy
        # last data object that contained a brick's symbol (see Run)
        self.BrickData = {}
        # equal share per brick in the galaxy
        self.fxHoldingCoeff = 1 / len(fxGalaxy)
        self.resolution = resolution
        self.anchorResolution = anchor_resolution
        self.scalp_period = scalp_period
        self.anchor_period = anchor_period
        self.anchor_max_slots = anchor_slots
        self.holding_value = 0
        self.showDebug = show_debug
        self.showLog = show_log
        self.toolbox = AlgoToolbox(self.qcAlgo, self.showDebug, self.showLog)

    def Run(self, data, scalp_call=False, anchor_call=False):
        """Dispatch one data event to every brick in the galaxy.

        Args:
            data: data object for this event; when ``None`` the last stored
                ``BrickData`` is used instead.
            scalp_call: run the scalp-period handler for each brick.
            anchor_call: run the anchor-period handler for each brick.

        When neither flag is set the live handler ``RunFx`` is used. The
        whole run stops at the first brick whose symbol is missing from the
        data.
        """
        # exit if warming up
        if self.qcAlgo.IsWarmingUp:
            return
        ctxData = self.BrickData if data is None else data
        # loop through all currencies in the galaxy
        for fxStarBrick in self.fxGalaxy.values():
            # assign current strategy
            fxStarBrick.strategy = self
            # debug for scalp/anchor calls only, and for the chart time only
            if (self.showDebug and (scalp_call or anchor_call)
                    and fxStarBrick.IsChartHour(ctxData.Time)):
                self.debugRunFx(ctxData, fxStarBrick)
            if not ctxData.ContainsKey(fxStarBrick.symbol):
                # EXIT IF NO SYMBOL-DATA!!!
                msg_no_brick_data = ("No data for " + fxStarBrick.symbol.Value
                                     + "!!! Stop Run-Workflow ...")
                self.apriori_log(msg_no_brick_data)
                self.apriori_debug(msg_no_brick_data)
                return
            # save brick data
            self.BrickData = ctxData
            # live-run strategy for actual currency (live)
            if not scalp_call and not anchor_call:
                self.RunFx(ctxData, fxStarBrick)
            # live-run strategy for actual currency (scalp period) if allowed
            if scalp_call:
                self.RunFxScalp(ctxData, fxStarBrick)
            # live-run strategy for actual currency (anchor period) if allowed
            if anchor_call:
                self.RunFxAnchor(ctxData, fxStarBrick)

    def RunScalp(self, fxBrick):
        """Run the scalp-period handler for one brick on the stored data."""
        # exit if warming up
        if self.qcAlgo.IsWarmingUp:
            return
        self.RunFxScalp(self.BrickData, fxBrick)

    def RunAnchor(self, fxBrick):
        """Run the anchor-period handler for one brick on the stored data."""
        # exit if warming up
        if self.qcAlgo.IsWarmingUp:
            return
        self.RunFxAnchor(self.BrickData, fxBrick)

    def RunFx(self, data, fxBrick):
        """Live handler (base, override in custom): refresh fluent data."""
        fxBrick.UpdateScalpFluentData(data)
        fxBrick.UpdateAnchorFluentData(data)

    def RunFxScalp(self, data, fxBrick):
        """Scalp-period handler (base, override in custom).

        Resets the brick's scalp fluent data and checks the trade rules.
        """
        # debug if needed
        if self.showDebug:
            self.debugRunFx(data, fxBrick)
        fxBrick.ResetScalpFluentData()
        # check trades
        self.CheckTradeRules(data, fxBrick)

    def CheckTradeRules(self, data, fxBrick):
        """Delegate the trade-rule check to the brick."""
        fxBrick.Check_Trade_Rules(data)

    def RunFxAnchor(self, data, fxBrick):
        """Anchor-period handler (base, override in custom).

        Resets the brick's anchor fluent data and calls its
        ``AnchorIsWarmedUp``.
        """
        # debug if needed
        if self.showDebug:
            self.debugRunFx(data, fxBrick)
        fxBrick.ResetAnchorFluentData()
        fxBrick.AnchorIsWarmedUp()

    # ****************************************************
    # * TRADE RULES REGION (BASE / BEGIN)                *
    # ****************************************************
    # Every hook below is a no-op in the base class.
    # TODO 1: Execute Rule
    # TODO 2: Plot Data if executed

    def OnTrigger_Long(self, data, fxBrick):
        """Hook for the long trigger rule; does nothing here."""

    def OnTrigger_Short(self, data, fxBrick):
        """Hook for the short trigger rule; does nothing here."""

    def OnTriggerDrop_Long(self, data, fxBrick):
        """Hook for the long trigger-drop rule; does nothing here."""

    def OnTriggerDrop_Short(self, data, fxBrick):
        """Hook for the short trigger-drop rule; does nothing here."""

    def OnEntry_Long(self, data, fxBrick):
        """Hook for the long entry rule; does nothing here."""

    def OnEntry_Short(self, data, fxBrick):
        """Hook for the short entry rule; does nothing here."""

    def OnExit_Long(self, data, fxBrick):
        """Hook for the long exit rule; does nothing here."""

    def OnExit_Short(self, data, fxBrick):
        """Hook for the short exit rule; does nothing here."""

    def OnStopLossAndTakeProfit_Long(self, data, fxBrick):
        """Hook for the long stop-loss/take-profit rule; does nothing here."""

    def OnStopLossAndTakeProfit_Short(self, data, fxBrick):
        """Hook for the short stop-loss/take-profit rule; does nothing here."""

    def OnFinishTrade_Long(self, data, fxBrick):
        """Hook for the long finish-trade rule; does nothing here."""

    def OnFinishTrade_Short(self, data, fxBrick):
        """Hook for the short finish-trade rule; does nothing here."""

    def OnStopLoss_Long(self, data, fxBrick):
        """Hook for the long stop-loss rule; does nothing here."""

    def OnStopLoss_Short(self, data, fxBrick):
        """Hook for the short stop-loss rule; does nothing here."""

    def OnTradeContinue_Long(self, data, fxBrick):
        """Hook for the long trade-continue rule; does nothing here."""

    def OnTradeContinue_Short(self, data, fxBrick):
        """Hook for the short trade-continue rule; does nothing here."""

    # ****************************************************
    # * TRADE RULES REGION (BASE / END)                  *
    # ****************************************************
    # ******************************************************
    # * TRADE-ACTION RULES REGION (BASE / BEGIN)           *
    # ******************************************************
    # NOTE(review): Emit_Insight() is called after the emit_trades guard in
    # the methods below, i.e. also when no trade is emitted - confirm that
    # this matches the intended nesting.

    def OnTradeOpen_Long_Holdings(self, data, fxBrick):
        """Open a long position via ``SetHoldings`` and emit an insight."""
        if self.qcAlgo.emit_trades:
            fxBrick.trade_action("OPEN_LONG")
            # BUY via SetHoldings()
            self.holding_value = (self.qcAlgo.forex_leverage
                                  * self.qcAlgo.cash_max_ratio * fxBrick.weight)
            self.qcAlgo.SetHoldings(fxBrick.symbol, self.holding_value)
        fxBrick.Emit_Insight()

    def OnTradeOpen_Long(self, data, fxBrick, set_limits=True):
        """Open a long position with a buy order and emit an insight.

        Args:
            data: unused here.
            fxBrick: symbol brick to trade.
            set_limits: also place the take-profit limit order and the
                stop-loss stop-market order for the bought quantity.
        """
        if self.qcAlgo.emit_trades:
            fxBrick.reset_limits()
            fxBrick.trade_action("OPEN_LONG")
            price = fxBrick.data.Close
            # set absolute open level
            fxBrick.trade_abs_start_level = price
            fxBrick.trade_abs_stop_level = 0
            # holding coefficient
            self.holding_value = (self.qcAlgo.forex_leverage
                                  * self.qcAlgo.cash_max_ratio * fxBrick.weight)
            # weighted, rounded quantity
            quantity = (self.qcAlgo.Portfolio.Cash / price) * self.holding_value
            rounded_quantity = fxBrick.GetRoundedOrderSize(quantity)
            # place long order (buy)
            self.qcAlgo.Buy(fxBrick.symbol, rounded_quantity)
            if set_limits:
                # pip-rounded prices
                take_profit = self.pip_round(fxBrick.trade_take_profit)
                stop_loss = self.pip_round(fxBrick.stop_loss_level)
                # take profit long
                fxBrick.ProfitTarget = self.qcAlgo.LimitOrder(
                    fxBrick.symbol, -rounded_quantity, take_profit)
                # stop loss long
                fxBrick.StopLoss = self.qcAlgo.StopMarketOrder(
                    fxBrick.symbol, -rounded_quantity, stop_loss)
        fxBrick.Emit_Insight()

    def OnTradeClose_Long(self, data, fxBrick, closeRatio=1):
        """Close a long position fully or partly and emit an insight.

        Args:
            data: passed on to ``trade_close``.
            fxBrick: symbol brick to trade.
            closeRatio: 1 closes all holdings, 0.5 closes 50% of holdings,
                0 closes nothing.
        """
        # avoid part closing on 1. exit (part-close)
        if closeRatio < 1 and not self.qcAlgo.exit_1_close:
            return
        # set absolute stop level
        fxBrick.trade_abs_stop_level = fxBrick.data.Close
        if self.qcAlgo.emit_trades:
            # close trade on given ratio
            fxBrick.trade_action("CLOSE_LONG_" + str(closeRatio * 100) + "_PERCENT")
            self.trade_close(data, fxBrick, closeRatio)
        fxBrick.Emit_Insight()

    def OnTradeOpen_Short_Holdings(self, data, fxBrick):
        """Open a short position via ``SetHoldings`` and emit an insight."""
        if self.qcAlgo.emit_trades:
            fxBrick.trade_action("OPEN_SHORT")
            # SELL via SetHoldings()
            self.holding_value = (-1 * self.qcAlgo.forex_leverage
                                  * self.qcAlgo.cash_max_ratio * fxBrick.weight)
            self.qcAlgo.SetHoldings(fxBrick.symbol, self.holding_value)
        fxBrick.Emit_Insight()

    def OnTradeOpen_Short(self, data, fxBrick, set_limits=True):
        """Open a short position with a sell order and emit an insight.

        Args:
            data: unused here.
            fxBrick: symbol brick to trade.
            set_limits: also place the take-profit limit order and the
                stop-loss stop-market order for the sold quantity.
        """
        if self.qcAlgo.emit_trades:
            fxBrick.reset_limits()
            fxBrick.trade_action("OPEN_SHORT")
            price = fxBrick.data.Close
            # set absolute open level
            fxBrick.trade_abs_start_level = price
            fxBrick.trade_abs_stop_level = 0
            # holding coefficient (kept positive; Sell() takes the quantity)
            self.holding_value = (self.qcAlgo.forex_leverage
                                  * self.qcAlgo.cash_max_ratio * fxBrick.weight)
            # weighted, rounded quantity
            quantity = (self.qcAlgo.Portfolio.Cash / price) * self.holding_value
            rounded_quantity = fxBrick.GetRoundedOrderSize(quantity)
            # place short order (sell)
            self.qcAlgo.Sell(fxBrick.symbol, rounded_quantity)
            if set_limits:
                # pip-rounded prices
                take_profit = self.pip_round(fxBrick.trade_take_profit)
                stop_loss = self.pip_round(fxBrick.stop_loss_level)
                # take profit short
                fxBrick.ProfitTarget = self.qcAlgo.LimitOrder(
                    fxBrick.symbol, rounded_quantity, take_profit)
                # stop loss short
                fxBrick.StopLoss = self.qcAlgo.StopMarketOrder(
                    fxBrick.symbol, rounded_quantity, stop_loss)
        fxBrick.Emit_Insight()

    def OnTradeClose_Short(self, data, fxBrick, closeRatio=1):
        """Close a short position fully or partly and emit an insight.

        Args:
            data: passed on to ``trade_close``.
            fxBrick: symbol brick to trade.
            closeRatio: 1 closes all holdings, 0.5 closes 50% of holdings,
                0 closes nothing.
        """
        # avoid part closing on 1. exit (part-close)
        if closeRatio < 1 and not self.qcAlgo.exit_1_close:
            return
        # set absolute stop level
        fxBrick.trade_abs_stop_level = fxBrick.data.Close
        if self.qcAlgo.emit_trades:
            # a short is a negative holding: flip a positive coefficient
            if self.holding_value > 0:
                self.holding_value = -1 * self.holding_value
            # close trade on given ratio
            fxBrick.trade_action("CLOSE_SHORT_" + str(closeRatio * 100) + "_PERCENT")
            self.trade_close(data, fxBrick, closeRatio)
        fxBrick.Emit_Insight()

    def trade_close(self, data, fxBrick, closeRatio=1):
        """Scale the holding down by ``closeRatio`` via ``SetHoldings``.

        Args:
            data: unused here.
            fxBrick: symbol brick whose holdings are re-balanced.
            closeRatio: 1 closes all holdings, 0.5 closes 50% of holdings,
                0 closes nothing. The remaining share is clamped to [0, 1].
        """
        remaining_share = min(max(1 - closeRatio, 0), 1)
        # re-balance holding coefficient
        self.holding_value = remaining_share * self.holding_value
        # close via SetHoldings()
        self.qcAlgo.SetHoldings(fxBrick.symbol, self.holding_value)
        # get profit benchmarks
        fxBrick.get_profit_benchmarks()

    # ******************************************************
    # * TRADE-ACTION RULES REGION (BASE / END)             *
    # ******************************************************

    def debugRunFx(self, bigData, fxStarBrick):
        """Dump the brick's bars and indicator windows to the debug output.

        Nothing is written when the brick reports that the bar time is not
        a chart time. Any ordinary error while building the dump is reported
        as a single debug line instead of being raised.
        """
        try:
            data = bigData[fxStarBrick.symbol]
            # exit if chart is disabled
            if not fxStarBrick.IsChartTime(data.Time):
                return
            self.debug("===================================================")
            self.debug("Run:")
            self.debug("Brick-Data: " + str(data.Time) + " : " + str(fxStarBrick.symbol))
            self.debug("Current OHLC: " + str(data.Open) + " / " + str(data.High) + " / "
                       + str(data.Low) + " / " + str(data.Close))
            self.debug("Fluent-Anchor OHLC: "
                       + str(fxStarBrick.anchor_fluent_open) + " / "
                       + str(fxStarBrick.anchor_fluent_high) + " / "
                       + str(fxStarBrick.anchor_fluent_low) + " / "
                       + str(fxStarBrick.anchor_fluent_close))
            for idx in range(fxStarBrick.scalp_max_slots):
                self.debug("TS-" + str(idx) + ": "
                           + str(fxStarBrick.scalpQuoteBar_RollingWindow[idx].Time))
                self.debug("S-RW-" + str(idx) + ": "
                           + str(fxStarBrick.scalpQuoteBar_RollingWindow[idx]))
            self.debug("-----------------------------------------------")
            self.debug("Scalp Indicators:")
            for ind_rw_key in fxStarBrick.scalpIndicators_RollingWindows:
                self.debug(str("S-IND:" + ind_rw_key))
                rw = fxStarBrick.scalpIndicators_RollingWindows[ind_rw_key]
                # show current indicator rolling window for current currency
                for i in range(fxStarBrick.scalp_max_slots + 1):
                    self.debug(str(rw[i]))
            self.debug("-------------------------------------------------------------------------------")
            for idx in range(fxStarBrick.anchor_max_slots):
                self.debug("TA-" + str(idx) + ": "
                           + str(fxStarBrick.anchorQuoteBar_RollingWindow[idx].Time))
                self.debug("A-RW-" + str(idx) + ": "
                           + str(fxStarBrick.anchorQuoteBar_RollingWindow[idx]))
            self.debug("-----------------------------------------------")
            self.debug("Anchor Indicators:")
            for ind_rw_key in fxStarBrick.anchorIndicators_RollingWindows.keys():
                self.debug(str("A-IND:" + ind_rw_key))
                rw = fxStarBrick.anchorIndicators_RollingWindows[ind_rw_key]
                # show current indicator rolling window for current currency
                for i in range(fxStarBrick.anchor_max_slots + 1):
                    self.debug(str(rw[i]))
            self.debug("-------------------------------------------------------------------------------")
        except Exception as err:
            # best-effort dump: report the cause, but let KeyboardInterrupt
            # and SystemExit propagate
            self.debug("Error debugRunFx() with " + str(fxStarBrick.symbol)
                       + ": " + repr(err))

    def debug(self, msg):
        """Send ``msg`` to the toolbox's ``show_debug``."""
        self.toolbox.show_debug(msg)

    def log(self, msg):
        """Send ``msg`` to the toolbox's ``show_log``."""
        self.toolbox.show_log(msg)

    def Debug(self, msg):
        """Alias of ``debug``."""
        self.toolbox.show_debug(msg)

    def apriori_debug(self, msg):
        """Send ``msg`` to the toolbox's ``Debug``."""
        self.toolbox.Debug(msg)

    def apriori_log(self, msg):
        """Send ``msg`` to the toolbox's ``Log``."""
        self.toolbox.Log(msg)

    def pip_round(self, price):
        """Return ``price`` rounded by the toolbox's ``pip_round``."""
        return self.toolbox.pip_round(price)

    # *********************************
    # * PLOT REGION (BEGIN)           *
    # *********************************

    def plot_scalp_chart(self, fxBrick, bar):
        """Plot the bar's prices on the brick's scalp chart."""
        self.plot_price_chart(fxBrick.scalp_chart.Name, fxBrick, bar)

    def plot_anchor_chart(self, fxBrick, bar):
        """Plot the bar's prices on the brick's anchor chart."""
        self.plot_price_chart(fxBrick.anchor_chart.Name, fxBrick, bar)

    def plot_scalp_ema_cross(self, fxBrick, value):
        """Plot an EMA cross point on the brick's scalp chart."""
        self.plot(fxBrick.scalp_chart.Name, fxBrick.price_chart_serial_ema_cross_point, value)

    def plot_anchor_ema_cross(self, fxBrick, value):
        """Plot an EMA cross point on the brick's anchor chart."""
        self.plot(fxBrick.anchor_chart.Name, fxBrick.price_chart_serial_ema_cross_point, value)

    def plot_price_chart(self, chart_name, fxBrick, bar):
        """Plot the bar's open/high/low/close onto the brick's price series."""
        self.plot(chart_name, fxBrick.price_chart_serial_open, bar.Open)
        self.plot(chart_name, fxBrick.price_chart_serial_close, bar.Close)
        self.plot(chart_name, fxBrick.price_chart_serial_close_consolidated, bar.Close)
        self.plot(chart_name, fxBrick.price_chart_serial_open_price_point, bar.Open)
        self.plot(chart_name, fxBrick.price_chart_serial_high_price_point, bar.High)
        self.plot(chart_name, fxBrick.price_chart_serial_low_price_point, bar.Low)
        self.plot(chart_name, fxBrick.price_chart_serial_close_price_point, bar.Close)

    def plot_trigger(self, fxBrick, value, isShort):
        """Plot a short or long trigger point on the scalp chart."""
        if isShort:
            serial = fxBrick.price_chart_serial_short_trigger_point
        else:
            serial = fxBrick.price_chart_serial_long_trigger_point
        self.plot(fxBrick.scalp_chart.Name, serial, value)

    def plot_entry(self, fxBrick, value, isShort, isPoint):
        """Plot an entry value on the scalp chart.

        ``isShort`` selects the short or long series; ``isPoint`` selects
        the ``*_point`` series instead of the plain one.
        """
        if isShort:
            serial = (fxBrick.price_chart_serial_short_entry_point if isPoint
                      else fxBrick.price_chart_serial_short_entry)
        else:
            serial = (fxBrick.price_chart_serial_long_entry_point if isPoint
                      else fxBrick.price_chart_serial_long_entry)
        self.plot(fxBrick.scalp_chart.Name, serial, value)

    def plot_stop_loss(self, fxBrick, value, isShort, isPoint):
        """Plot a stop-loss value on the scalp chart (see ``plot_entry``)."""
        if isShort:
            serial = (fxBrick.price_chart_serial_short_stop_loss_point if isPoint
                      else fxBrick.price_chart_serial_short_stop_loss)
        else:
            serial = (fxBrick.price_chart_serial_long_stop_loss_point if isPoint
                      else fxBrick.price_chart_serial_long_stop_loss)
        self.plot(fxBrick.scalp_chart.Name, serial, value)

    def plot_exit_1(self, fxBrick, value, isShort, isPoint):
        """Plot a first-exit value on the scalp chart (see ``plot_entry``)."""
        if isShort:
            serial = (fxBrick.price_chart_serial_short_exit_1_point if isPoint
                      else fxBrick.price_chart_serial_short_exit_1)
        else:
            serial = (fxBrick.price_chart_serial_long_exit_1_point if isPoint
                      else fxBrick.price_chart_serial_long_exit_1)
        self.plot(fxBrick.scalp_chart.Name, serial, value)

    def plot_exit_2(self, fxBrick, value, isShort, isPoint):
        """Plot a second-exit value on the scalp chart (see ``plot_entry``)."""
        if isShort:
            serial = (fxBrick.price_chart_serial_short_exit_2_point if isPoint
                      else fxBrick.price_chart_serial_short_exit_2)
        else:
            serial = (fxBrick.price_chart_serial_long_exit_2_point if isPoint
                      else fxBrick.price_chart_serial_long_exit_2)
        self.plot(fxBrick.scalp_chart.Name, serial, value)

    def plot(self, chart_name, serial_name, value):
        """Forward one data point to the algorithm's ``Plot``."""
        self.qcAlgo.Plot(chart_name, serial_name, value)
# *********************************
# * PLOT REGION (END) *
# *********************************
# custom imports
from ForexTrendBase import ForexTrendBase
from FxRedBirdBrick_South import FxRedBirdBrick_South
from StrategyFxRedBird_South import StrategyFxRedBird_South
# ********************************
class ForexRedBird_South(ForexTrendBase):
    """RedBird (South) forex algorithm built on top of ``ForexTrendBase``."""

    def Initializie(self):
        """Run the base initializer, then set up the brokerage model."""
        super().Initializie()
        self.SetBrokerageModel(AlphaStreamsBrokerageModel())
        # a forex brokerage is set right afterwards, chosen by broker name
        forex_brokerage = (BrokerageName.FxcmBrokerage if self.Broker == "FXCM"
                           else BrokerageName.OandaBrokerage)
        self.SetBrokerageModel(forex_brokerage)

    def selectStrategy(self):
        """Return the RedBird (South) strategy built from this algorithm's settings."""
        strategy = StrategyFxRedBird_South(
            self, self.forex_galaxy, self.resolution, self.anchorResolution,
            self.scalp_period, self.anchor_period,
            self.show_debug, self.show_log, self.anchor_max_slots)
        return strategy

    def selectGalaxyStars(self):
        """Return a dict with one RedBird (South) brick per selected ticker.

        The galaxy named by the ``selected_galaxy`` parameter maps each
        ticker to its weight; both are handed to the brick.
        """
        galaxy = self.selectGalaxy(self.get_str_param("selected_galaxy"))
        return {
            ticker: FxRedBirdBrick_South(
                self, ticker, galaxy[ticker],
                self.resolution, self.anchorResolution,
                self.scalp_period, self.anchor_period,
                self.show_debug, self.show_log,
                self.anchor_max_slots, self.scalp_max_slots)
            for ticker in galaxy
        }

    def OnOrderEvent(self, orderEvent):
        """Ignore order events (the handling that follows is disabled)."""
        return None
'''
# init
filledOrderId = orderEvent.OrderId
overrule_veto = False
# looping all symbol bricks
for fxBrickSymbol in self.forex_galaxy:
if fxBrickSymbol == str(orderEvent.Symbol):
fxBrick = self.forex_galaxy[fxBrickSymbol]
#if fxBrick.Ignore_Cancel_Event:
# if fxBrick.trade_state == fxBrick.STAND_BY:
# return
#else:
show_action = False #not fxBrick.Ignore_Cancel_Event
state_reset = False #not fxBrick.Ignore_Cancel_Event
# If the ProfitTarget order was filled, close the StopLoss order
if fxBrick.match_order(fxBrick.ProfitTarget, filledOrderId):
if fxBrick.cancel_stop_loss(overrule_veto, show_action, state_reset):
fxBrick.trade_action_filled_tp_cancel_sl()
# If the StopLoss order was filled, close the ProfitTarget
if fxBrick.match_order(fxBrick.StopLoss, filledOrderId):
if fxBrick.cancel_take_profit(overrule_veto, show_action, state_reset):
fxBrick.trade_action_filled_sl_cancel_tp()
'''
from ForexSymbolBrick import ForexSymbolBrick
from QuantConnect.Data import *
from QuantConnect import *
class FxRedBirdBrick_South(ForexSymbolBrick):
def __init__(self,qcAlgo, ticker, weight, resolution, anchorResolution, scalpPeriod, anchorPeriod,
             showDebug = False, showLog = False, anchor_max_slots = 24, scalp_max_slots = 10):
    """Initialise the brick: base-class setup first, then strategy-specific state.

    All constructor arguments are forwarded unchanged to the base class.
    Afterwards the tunable parameters, trailing/delta bookkeeping, semafor
    flags, trade-control levels, machine-state aliases and touch tolerances
    used by this subclass are set to their starting values.
    """
    super(FxRedBirdBrick_South, self).__init__(
        qcAlgo, ticker, weight, resolution, anchorResolution,
        scalpPeriod, anchorPeriod, showDebug, showLog,
        anchor_max_slots, scalp_max_slots)

    # --- parameter candidates -------------------------------------------
    # 0 switches the support/resistance acceleration check off
    self.delta_accelerate_max_ratio = 3
    self.orange_risk_invert_weight = 5
    self.red_risk_shift_percent = 10
    # -1 = deactivated
    self.exit2_level_min_steps_count = -1
    # 0 = deactivated
    self.RCT_pullback_trailing_stop_ratio = 0

    # --- trailing bookkeeping -------------------------------------------
    self.TTG_trailing_trade_gain = self.TLG_trailing_live_gain = 0
    self.LLo_lowest_low_open = self.LLc_lowest_low_close = 0
    self.HHo_highest_high_open = self.HHc_highest_high_close = 0
    self.LVo_live_open = self.LVc_live_close = 0

    # --- delta bookkeeping ----------------------------------------------
    self.minor_delta = self.major_delta = self.delta_acceleration = 0

    # --- semafors ---------------------------------------------------------
    self.IsLevel_Exit_1_Semafor = self.IsLevel_Exit_2_Semafor = False
    self.exit2_level_steps_count = 0

    # --- trade control ----------------------------------------------------
    self.trade_lowest_low = self.trade_highest_high = 0
    self.lowest_low_last_trailng_qbars = self.highest_high_last_trailng_qbars = 0
    self.touch_level = 0
    self.exit_1_rate = self.qcAlgo.exit_1_rate

    # --- machine states (aliases of the *_BASE state constants) -----------
    self.EXIT_1_SHORT = self.EXIT_1_SHORT_BASE
    self.EXIT_1_LONG = self.EXIT_1_LONG_BASE
    self.EXIT_2_SHORT = self.EXIT_2_SHORT_BASE
    self.EXIT_2_LONG = self.EXIT_2_LONG_BASE

    # --- touch tolerances (pips) ------------------------------------------
    # Both the fast-EMA trigger tolerance and the slow-EMA stopper tolerance
    # start from the same algorithm-level setting.
    touch_tolerance_pips = self.qcAlgo.touch_trigger_pip_tolerance
    self.bar_to_fast_indicator_touch_delta_trigger_pips = touch_tolerance_pips
    self.bar_to_slow_inidcator_touch_delta_stopper_pips = touch_tolerance_pips
# * TRADE RULES/EVENTS REGION (CUSTOM / BEGIN) *
def calc_trade_props(self, rolling_window_qoute_bar, lookback_steps, bar_touch_margins, trigger_pips_up, trigger_pips_down):
    """Compute stop-loss, entry, risk, break-even and exit levels for a trade.

    Long trade: the entry sits ``trigger_pips_up`` pips (via ``plus_pip``)
    from the highest high of the lookback window; break-even and exit are
    one and two risk distances above the entry.
    Short trade: mirrored, using the lowest low and ``trigger_pips_down``,
    with break-even and exit below the entry.
    In both cases the stop loss is the current ``touch_level`` and the take
    profit equals the exit level.
    """
    if self.is_long_trade:
        self.stop_loss_level = self.touch_level
        self.trade_highest_high = self.Get_Trade_Scalp_Highest_High(
            rolling_window_qoute_bar, lookback_steps, bar_touch_margins)
        self.entry_level = self.plus_pip(self.trade_highest_high, trigger_pips_up)
        risk = abs(self.entry_level - self.stop_loss_level)
        self.trade_risk = risk
        self.trade_break_even = self.entry_level + risk
        self.trade_exit = self.trade_break_even + risk
        self.trade_take_profit = self.trade_exit

    if self.is_short_trade:
        self.stop_loss_level = self.touch_level
        self.trade_lowest_low = self.Get_Trade_Scalp_Lowest_Low(
            rolling_window_qoute_bar, lookback_steps, bar_touch_margins)
        self.entry_level = self.plus_pip(self.trade_lowest_low, trigger_pips_down)
        risk = abs(self.entry_level - self.stop_loss_level)
        self.trade_risk = risk
        self.trade_break_even = self.entry_level - risk
        self.trade_exit = self.trade_break_even - risk
        self.trade_take_profit = self.trade_exit
def invert_break_even(self):
    """Move the break-even level to the losing side of the entry.

    The level is shifted by ``orange_risk_invert_weight`` times the trade
    risk: below the entry for a long trade, above it for a short trade.
    """
    if self.is_long_trade:
        inverted_risk = self.orange_risk_invert_weight * self.trade_risk
        self.trade_break_even = self.entry_level - inverted_risk
    if self.is_short_trade:
        inverted_risk = self.orange_risk_invert_weight * self.trade_risk
        self.trade_break_even = self.entry_level + inverted_risk
def calc_on_break_even(self):
    """Pull the stop loss up/down to the entry level (same for long and short)."""
    entry = self.entry_level
    self.stop_loss_level = entry
def calc_diff_range(self, rw_qb, d_idx, u_idx):
    """Return the extreme (open, close) pair over bars ``d_idx .. u_idx - 1``.

    Long trade: lowest Open and highest Close of the range.
    Short trade: highest Open and lowest Close of the range.
    A value of 0 means "nothing recorded yet"; (0, 0) is returned when the
    range is empty or no trade direction is set.
    """
    open_extreme = 0
    close_extreme = 0
    for idx in range(d_idx, u_idx):
        if self.is_long_trade:
            bar = rw_qb[idx]
            if open_extreme == 0 or bar.Open < open_extreme:
                open_extreme = bar.Open
            if close_extreme == 0 or bar.Close > close_extreme:
                close_extreme = bar.Close
        if self.is_short_trade:
            bar = rw_qb[idx]
            if open_extreme == 0 or bar.Open > open_extreme:
                open_extreme = bar.Open
            if close_extreme == 0 or bar.Close < close_extreme:
                close_extreme = bar.Close
    return open_extreme, close_extreme
def calc_diff_delta(self, rw_qb):
    """Compare the open/close move of the two halves of a bar window.

    The window is split at ``int(Size / 2)``: indices below the split feed
    ``minor_delta``, the remaining indices feed ``major_delta`` (each being
    close-extreme minus open-extreme from ``calc_diff_range``).
    ``delta_acceleration`` is ``abs(major_delta / minor_delta)``, or 0 when
    ``minor_delta`` is 0.
    """
    window_size = rw_qb.Size
    split_at = int(window_size / 2)

    first_open, first_close = self.calc_diff_range(rw_qb, 0, split_at)
    second_open, second_close = self.calc_diff_range(rw_qb, split_at, window_size)

    self.minor_delta = first_close - first_open
    self.major_delta = second_close - second_open
    self.delta_acceleration = (
        abs(self.major_delta / self.minor_delta) if self.minor_delta != 0 else 0
    )
def Is_Support_Level(self, rw_qb):
    """Tell whether the bar window shows an accelerated downward move.

    Always False when ``delta_accelerate_max_ratio`` is 0 (check disabled).
    Otherwise the deltas are recomputed from ``rw_qb`` and the result is
    True when ``major_delta`` is negative and ``delta_acceleration`` exceeds
    the configured maximum ratio.
    """
    if self.delta_accelerate_max_ratio == 0:
        return False
    self.calc_diff_delta(rw_qb)
    return self.major_delta < 0 and self.delta_acceleration > self.delta_accelerate_max_ratio
def Is_Resistance_Level(self, rw_qb):
    """Tell whether the bar window shows an accelerated upward move.

    Always False when ``delta_accelerate_max_ratio`` is 0 (check disabled).
    Otherwise the deltas are recomputed from ``rw_qb`` and the result is
    True when ``major_delta`` is positive and ``delta_acceleration`` exceeds
    the configured maximum ratio.
    """
    if self.delta_accelerate_max_ratio == 0:
        return False
    self.calc_diff_delta(rw_qb)
    return self.major_delta > 0 and self.delta_acceleration > self.delta_accelerate_max_ratio
def calc_rolling_trade_props(self, rolling_window_qoute_bar, lookback_steps, bar_touch_margins, trigger_pips_up, trigger_pips_down):
    """Refresh trailing gains and, in an EXIT_1 state, the trailing stop loss.

    The open/close-based lowest low and highest high of the lookback window
    are always recomputed; their distance becomes ``TTG_trailing_trade_gain``
    and ``TLG_trailing_live_gain`` is set to the live scalp close.
    While in EXIT_1_LONG (EXIT_1_SHORT) the stop loss trails the window's
    lowest low (highest high), offset by ``trigger_pips_down``
    (``trigger_pips_up``), and exit/take-profit are zeroed so the trade can
    only leave through the stop loss.
    """
    open_close_margins = "open_close"

    window_low = self.Get_Trade_Scalp_Lowest_Low(
        rolling_window_qoute_bar, lookback_steps, open_close_margins)
    self.oc_lowest_low_last_trailng_qbars = window_low
    window_high = self.Get_Trade_Scalp_Highest_High(
        rolling_window_qoute_bar, lookback_steps, open_close_margins)
    self.oc_highest_high_last_trailng_qbars = window_high

    self.TTG_trailing_trade_gain = abs(window_high - window_low)
    self.TLG_trailing_live_gain = self.scalp_fluent_close

    # LONG: trail the stop below the recent lows
    if self.trade_state == self.EXIT_1_LONG:
        self.lowest_low_last_trailng_qbars = self.Get_Trade_Scalp_Lowest_Low(
            rolling_window_qoute_bar, lookback_steps, bar_touch_margins)
        self.stop_loss_level = self.plus_pip(
            self.lowest_low_last_trailng_qbars, trigger_pips_down)
        self.trade_exit = 0
        self.trade_take_profit = self.trade_exit

    # SHORT: trail the stop above the recent highs
    if self.trade_state == self.EXIT_1_SHORT:
        self.highest_high_last_trailng_qbars = self.Get_Trade_Scalp_Highest_High(
            rolling_window_qoute_bar, lookback_steps, bar_touch_margins)
        self.stop_loss_level = self.plus_pip(
            self.highest_high_last_trailng_qbars, trigger_pips_up)
        self.trade_exit = 0
        self.trade_take_profit = self.trade_exit
def update_hh_ll_limits(self):
    """Record the live scalp open/close and widen the tracked extremes.

    Long trade: tracks the lowest open (LLo) and the highest close (HHc).
    Short trade: tracks the lowest close (LLc) and the highest open (HHo).
    A stored value of 0 counts as "not set yet" and is always replaced.
    """
    live_open, live_close = self.scalp_fluent_open, self.scalp_fluent_close
    # snapshot the extremes as they were before this update
    lowest_open, lowest_close = self.LLo_lowest_low_open, self.LLc_lowest_low_close
    highest_open, highest_close = self.HHo_highest_high_open, self.HHc_highest_high_close

    self.LVo_live_open = live_open
    self.LVc_live_close = live_close

    if self.is_long_trade:
        if lowest_open == 0 or live_open < lowest_open:
            self.LLo_lowest_low_open = live_open
        if highest_close == 0 or live_close > highest_close:
            self.HHc_highest_high_close = live_close

    if self.is_short_trade:
        if lowest_close == 0 or live_close < lowest_close:
            self.LLc_lowest_low_close = live_close
        if highest_open == 0 or live_open > highest_open:
            self.HHo_highest_high_open = live_open
def Is_Pullback_Trailing_Exit(self):
    """Tell whether the live bar retraced enough of the trade gain to exit.

    Long trade, evaluated only when the live bar closes below its open:
    the live gain is the distance from the highest tracked close down to the
    live close, the trade gain is highest tracked close minus lowest tracked
    open. Short trade, evaluated only when the live bar closes above its
    open: mirrored with lowest tracked close / highest tracked open.
    ``TLG_trailing_live_gain`` and ``TTG_trailing_trade_gain`` are updated as
    a side effect whenever a branch is evaluated.

    Returns:
        bool: True when both tracked extremes are set (> 0), the live open
        lies beyond the tracked starting extreme and
        ``abs(live_gain / trade_gain)`` exceeds
        ``RCT_pullback_trailing_stop_ratio``. A trade gain of exactly 0
        yields False instead of dividing by zero.
    """
    result = False

    # long trade: bearish live bar against the tracked up-move
    if self.is_long_trade and self.LVc_live_close < self.LVo_live_open:
        self.TLG_trailing_live_gain = self.HHc_highest_high_close - self.LVc_live_close
        self.TTG_trailing_trade_gain = self.HHc_highest_high_close - self.LLo_lowest_low_open
        result = (
            self.LLo_lowest_low_open > 0
            and self.HHc_highest_high_close > 0
            and self.LVo_live_open > self.LLo_lowest_low_open
            # highest close == lowest open gives a zero gain: no ratio to test
            and self.TTG_trailing_trade_gain != 0
            and abs(self.TLG_trailing_live_gain / self.TTG_trailing_trade_gain)
            > self.RCT_pullback_trailing_stop_ratio
        )

    # short trade: bullish live bar against the tracked down-move
    if self.is_short_trade and self.LVc_live_close > self.LVo_live_open:
        self.TLG_trailing_live_gain = self.LVc_live_close - self.LLc_lowest_low_close
        self.TTG_trailing_trade_gain = self.HHo_highest_high_open - self.LLc_lowest_low_close
        result = (
            self.HHo_highest_high_open > 0
            and self.LLc_lowest_low_close > 0
            and self.LVo_live_open < self.HHo_highest_high_open
            # highest open == lowest close gives a zero gain: no ratio to test
            and self.TTG_trailing_trade_gain != 0
            and abs(self.TLG_trailing_live_gain / self.TTG_trailing_trade_gain)
            > self.RCT_pullback_trailing_stop_ratio
        )

    return result
def Check_Trade_Rules(self, data):
# Runs one evaluation step of this brick's trade state machine.
#
# The base-class Check_Trade_Rules(data) is called first. The method then
# snapshots the EMA rolling windows, the live ("fluent") scalp/anchor bar
# values and the qcAlgo settings into locals and -- only when
# self.check_trade_allowed is truthy -- dispatches on self.trade_state:
#   STAND_BY -> TRIGGER_* -> ENTRY_* -> EXIT_1_* -> EXIT_2_*
# with STOP_LOSS_*, STOP_LOSS_TAKE_PROFIT_*, DROP_TRIGGER_* and
# TREND_REVERSION_* as alternative outcomes; the final section resets the
# brick to STAND_BY. Callbacks on self.strategy are invoked at the transitions.
#
# data: passed through to the base routine and to every strategy callback.
# Returns None on every path.
# NOTE(review): the source indentation of this method was lost; the comments
# below describe statement order only, not block nesting.
# base
super(FxRedBirdBrick_South, self).Check_Trade_Rules(data)
# init environment vars
rw_ai_fast = self.anchorIndicators_RollingWindows["ema_8_anchor"]
rw_ai_slow = self.anchorIndicators_RollingWindows["ema_21_anchor"]
rw_si_fast = self.scalpIndicators_RollingWindows["ema_8_scalp"]
rw_si_slow = self.scalpIndicators_RollingWindows["ema_21_scalp"]
rw_a_qb = self.anchorQuoteBar_RollingWindow
rw_s_qb = self.scalpQuoteBar_RollingWindow
trailing_exit_stop_bars_count = self.qcAlgo.trailing_exit_stop_loss_bars
# live scalp bar values; the live close doubles as the comparison "level"
live_s_time = self.scalp_fluent_time
live_s_o = self.scalp_fluent_open
live_s_h = self.scalp_fluent_high
live_s_l = self.scalp_fluent_low
live_s_c = self.scalp_fluent_close
live_s_value = live_s_c
# scalp bars at rolling-window index 0 and 1 (inputs of the engulf check)
post_s_o = rw_s_qb[0].Open
post_s_h = rw_s_qb[0].High
post_s_l = rw_s_qb[0].Low
post_s_c = rw_s_qb[0].Close
prev_s_o = rw_s_qb[1].Open
prev_s_h = rw_s_qb[1].High
prev_s_l = rw_s_qb[1].Low
prev_s_c = rw_s_qb[1].Close
# live anchor bar values
live_a_time = self.anchor_fluent_time
live_a_o = self.anchor_fluent_open
live_a_h = self.anchor_fluent_high
live_a_l = self.anchor_fluent_low
live_a_c = self.anchor_fluent_close
live_a_value = live_a_c
# current EMA values (periods 8 / 13 / 21)
live_ai_fast = self.anchor_ema_current_value(8)
live_ai_slow = self.anchor_ema_current_value(21)
live_si_fast = self.scalp_ema_current_value(8)
live_si_mid = self.scalp_ema_current_value(13)
live_si_slow = self.scalp_ema_current_value(21)
s_narrow = self.is_narrow_ema_scalp
s_delta_pips = self.scalp_emas_pips_delta
s_delta_min_pips = self.qcAlgo.scalp_delta_min_pips
a_delta = self.anchor_emas_pips_delta
a_delta_min_pips = self.qcAlgo.anchor_delta_min_pips
bar_touch_margins = self.qcAlgo.bar_touch_margins
tolerance = self.qcAlgo.touch_trigger_pip_tolerance
trigger_touch_tolerance = self.bar_to_fast_indicator_touch_delta_trigger_pips
stopper_touch_tolerance = self.bar_to_slow_inidcator_touch_delta_stopper_pips
trigger_pips_up = self.qcAlgo.pullback_trigger_pips_up
trigger_pips_down = self.qcAlgo.pullback_trigger_pips_down
exit_trailing = self.qcAlgo.exit_strategy_trailing_exit
exit_break_even = self.qcAlgo.exit_strategy_break_even_exit
take_profit_limit = self.qcAlgo.take_profit
# NOTE(review): stop_loss_limit is read from qcAlgo.take_profit, the same
# attribute as take_profit_limit above -- confirm this is intended.
stop_loss_limit = self.qcAlgo.take_profit
# exit_trailing strategy has higher priority: it wins when both exit flags are
# set and is also the default when neither is set
if exit_trailing and exit_break_even:
exit_break_even = False
exit_trailing = True
else:
if not exit_trailing and not exit_break_even:
exit_trailing = True
exit_break_even = False
# Go!
if self.check_trade_allowed:
# Init trade section vars
self.trade_confidence = 1
live_idx = 0
prior_idx = 1
level = live_s_value
# * *
# * HANDLE TRADE STATE-MACHINE *
# * *
if self.trade_state == self.STAND_BY:
# --- handle STAND-BY (begin)
# slot live_idx (0) of the semafor/delta tables holds the live anchor values
self.IsLevel_TradeEnabled_Semafors[live_idx] = \
self.IsLevel_TradeEnabled(live_ai_fast, live_ai_slow, live_a_o, live_a_h, live_a_l, live_a_c, tolerance, bar_touch_margins)
live_trade_enabled = self.IsLevel_TradeEnabled_Semafors[live_idx]
self.Indicator_Deltas[live_idx] = abs(live_ai_fast - live_ai_slow)
live_indicator_delta = self.Indicator_Deltas[live_idx]
self.Direction_Semafors[live_idx] = \
self.Check_Trade_Direction(live_ai_fast, live_ai_slow, live_a_o, live_a_h, live_a_l, live_a_c, tolerance, bar_touch_margins)
live_trade_direction = self.Direction_Semafors[live_idx]
# check if trade is enabled
if live_trade_enabled:
# check anchor history rules
hist_trade_enabled = True
hist_trade_direction_match = True
hist_ind_expanded = True
hist_ind_expanded_sum = 0
hist_ind_expanded_count = 0
hist_ind_expanded_avg = 0
# slots 1..anchor_max_slots hold history; history slot ha_idx is paired with
# anchor quote bar index ha_idx - 1
for ha_idx in range(1, self.anchor_max_slots + 1):
# init
self.Indicator_Deltas[ha_idx] = 0
qb_idx = ha_idx - 1
hist_ai_fast = self.value(rw_ai_fast[ha_idx])
hist_ai_slow = self.value(rw_ai_slow[ha_idx])
# NOTE(review): hist_indicator_delta is not referenced again in this method.
hist_indicator_delta = abs(hist_ai_fast - hist_ai_slow)
# calc history enablers
self.IsLevel_TradeEnabled_Semafors[ha_idx] = \
self.IsLevel_TradeEnabled(hist_ai_fast , hist_ai_slow, \
rw_a_qb[qb_idx].Open, rw_a_qb[qb_idx].High, rw_a_qb[qb_idx].Low, rw_a_qb[qb_idx].Close, \
tolerance, bar_touch_margins)
# calc history direction
self.Direction_Semafors[ha_idx] = \
self.Check_Trade_Direction(hist_ai_fast , hist_ai_slow, \
rw_a_qb[qb_idx].Open, rw_a_qb[qb_idx].High, rw_a_qb[qb_idx].Low, rw_a_qb[qb_idx].Close, \
tolerance, bar_touch_margins)
# calc history (rolling window) trade enabling
if ha_idx <= self.anchor_lookback_for_trade_enabled:
hist_trade_enabled = hist_trade_enabled and self.IsLevel_TradeEnabled_Semafors[ha_idx]
if self.direction_lookback:
hist_trade_direction_match = hist_trade_direction_match and live_trade_direction == self.Direction_Semafors[ha_idx]
if self.indicator_lookback:
# calc history indicators
# NOTE(review): the historical delta is written into slot live_idx, not ha_idx
# (slot ha_idx was zeroed at the top of the loop) -- confirm this is intended.
self.Indicator_Deltas[live_idx] = abs(hist_ai_fast - hist_ai_slow)
hist_ind_expanded_sum = hist_ind_expanded_sum + self.Indicator_Deltas[live_idx]
hist_ind_expanded_count = hist_ind_expanded_count + 1
# calc avg history indicators
if self.indicator_lookback and hist_ind_expanded_count > 0:
hist_ind_expanded_avg = hist_ind_expanded_sum / hist_ind_expanded_count
hist_ind_expanded = live_indicator_delta >= hist_ind_expanded_avg
# exit if history lookback trades wasn't enabled
if not hist_trade_enabled or not hist_trade_direction_match or not hist_ind_expanded:
# pay attention to the cancel events
# self.cancel_event_attention()
return
# reset live trend direction props
# (the history loop above re-ran the direction check with historical values)
self.IsLevel_TradeEnabled_Semafors[live_idx] = \
self.IsLevel_TradeEnabled(live_ai_fast, live_ai_slow, live_a_o, live_a_h, live_a_l, live_a_c, tolerance, bar_touch_margins)
# check trigger
is_trigger = \
self.IsLevel_Trigger(\
live_si_fast, live_si_mid, live_si_slow, \
live_s_o, live_s_h, live_s_l, live_s_c, \
trigger_touch_tolerance, bar_touch_margins)
# handle trigger semafor
if is_trigger:
# activate trigger flag
self.activate_trigger_semafor()
# calculate trade props
self.calc_trade_props(rw_s_qb, rw_s_qb.Size, bar_touch_margins, trigger_pips_up, trigger_pips_down)
# Calc confidence
self.Get_Confidence()
# call strategy - trigger (long)
if self.is_long_trade:
self.strategy.OnTrigger_Long(data, self)
# set machine-state
self.trade_state = self.TRIGGER_LONG
# Log state Event
self.trade_action("TRIGGER_LONG", self.qcAlgo.emit_info_insights)
# call strategy - trigger (short)
if self.is_short_trade:
self.strategy.OnTrigger_Short(data, self)
# set machine-state
self.trade_state = self.TRIGGER_SHORT
self.trade_action("TRIGGER_SHORT", self.qcAlgo.emit_info_insights)
else:
if self.qcAlgo.emit_info_insights:
self.trade_action("WAIT_FOR_TRIGGER", self.qcAlgo.emit_info_insights)
# exit
return
# --- handle STAND-BY (end)
if self.qcAlgo.emit_info_insights:
self.trade_action("WAIT_FOR_TRADE_ENABLING", self.qcAlgo.emit_info_insights)
if self.trade_state == self.TRIGGER_LONG:
# --- handle TRIGGER_LONG (start)
# Calc confidence
self.Get_Confidence()
if not self.Is_Resistance_Level(rw_a_qb) and self.IsLevel_Entry(level):
# disable cancel limit orders
# NOTE(review): bare attribute access with no call -- this has an effect only
# if veto_cancel_orders is a property; confirm.
self.veto_cancel_orders
# reset common trade props (short/long)
self.trade_start_level = self.data.Value
self.trade_take_profit = self.trade_exit
# calc limits
self.calc_stop_loss(level, stop_loss_limit)
self.calc_take_profit(level, exit_trailing, take_profit_limit)
# set semafor
self.activate_entry_semafor()
# call strategy
self.strategy.OnEntry_Long(data, self)
# set machine-state
self.trade_state = self.ENTRY_LONG
# Log state Event
self.trade_action("ENTRY_LONG")
# call strategy - open trade
self.strategy.OnTradeOpen_Long(data, self, False)
else:
if level < self.stop_loss_level:
# 100% confidence on "DROP TRIGGER"
self.trade_confidence = 1
# set semafor
self.activate_stand_by()
# call strategy
self.strategy.OnTriggerDrop_Long(data, self)
# set machine-state
self.trade_state = self.DROP_TRIGGER_LONG
# Log state Event
self.trade_action("DROP_TRIGGER_LONG")
else:
if self.qcAlgo.emit_info_insights:
self.trade_action("WAIT_FOR_ENTRY_LONG", self.qcAlgo.emit_info_insights)
# exit
return
# --- handle TRIGGER_LONG (end)
if self.trade_state == self.TRIGGER_SHORT:
# --- handle TRIGGER_SHORT (start)
# Calc confidence
self.Get_Confidence()
if not self.Is_Support_Level(rw_a_qb) and self.IsLevel_Entry(level):
# disable cancel limit orders
# NOTE(review): bare attribute access with no call -- this has an effect only
# if veto_cancel_orders is a property; confirm.
self.veto_cancel_orders
# reset common trade props (short/long)
self.trade_start_level = self.data.Value
self.trade_take_profit = self.trade_exit
# calc limits
self.calc_stop_loss(level, stop_loss_limit)
self.calc_take_profit(level, exit_trailing, take_profit_limit)
# set semafor
self.activate_entry_semafor()
# call strategy
self.strategy.OnEntry_Short(data, self)
# set machine-state
self.trade_state = self.ENTRY_SHORT
# Log state Event
self.trade_action("ENTRY_SHORT")
# call strategy - open trade
self.strategy.OnTradeOpen_Short(data, self, False)
else:
if level > self.stop_loss_level:
# 100% confidence on "DROP TRIGGER"
self.trade_confidence = 1
# set semafor
self.activate_stand_by()
# call strategy
self.strategy.OnTriggerDrop_Short(data, self)
# set machine-state
self.trade_state = self.DROP_TRIGGER_SHORT
# Log state Event
self.trade_action("DROP_TRIGGER_SHORT")
else:
if self.qcAlgo.emit_info_insights:
self.trade_action("WAIT_FOR_ENTRY_SHORT", self.qcAlgo.emit_info_insights)
# exit
return
# --- handle TRIGGER_SHORT (end)
if self.trade_state == self.ENTRY_LONG:
# check engulf condition
if self.qcAlgo.engulf and self.qcAlgo.engulf_ratio > 0:
if self.IsLevel_Reverse_Engulf( \
post_s_o,post_s_h,post_s_l, post_s_c, \
prev_s_o, prev_s_h, prev_s_l, prev_s_c, \
self.qcAlgo.engulf_ratio):
# change trade state to "trend reversion"
self.trade_state = self.TREND_REVERSION_LONG_TO_SHORT
# check if trade state is still ENTRY
if self.trade_state == self.ENTRY_LONG:
# Calc confidence
self.Get_Confidence()
# --- handle ENTRY_LONG (start)
## Go! (proceed main entry routine)
if self.IsLevel_Exit_1(level):
# 100% confidence on "EXIT"-Events
self.trade_confidence = 1
# set semafor
self.activate_exit_1_semafor()
# call strategy
self.strategy.OnExit_1_Long(data, self)
# set machine-state
self.trade_state = self.EXIT_1_LONG
self.exit2_level_steps_count = 0
# Log state Event
self.trade_action("EXIT_1_LONG")
# risk management - "break even" on "exit_1" event
self.calc_on_break_even()
else:
if self.IsLevel_StopLoss(level, stop_loss_limit):
# pay attention to the cancel events
# self.cancel_event_attention()
# return
# 100% confidence on "STOP LOSS"
self.trade_confidence = 1
# set semafors
self.activate_stand_by()
self.activate_stop_loss_semafor()
# call strategy
self.strategy.OnStopLoss_Long(data, self)
# set machine-state
self.trade_state = self.STOP_LOSS_LONG
self.trade_action_noinsight_nobenchmark("STOP_LOSS_LONG")
# call strategy - close 100% of the holdings
self.strategy.OnTradeClose_Long(data, self)
else:
if self.IsLevel_OrangeRisk(level, stop_loss_limit):
# 100% confidence on "ORANGE RISK"
self.trade_confidence = 1
# set semafor
self.activate_orange_risk_semafor()
# force machine-state to EXIT_1 (LONG)
# self.trade_state = self.EXIT_1_LONG
self.invert_break_even()
self.trade_action_insight_benchmark("ORANGE_RISK_LONG")
else:
self.strategy.OnTradeContinue_Long(data, self)
# maintain the same machine-state
self.trade_state = self.ENTRY_LONG
if self.qcAlgo.emit_info_insights:
self.trade_action_insight_benchmark("TRADE_CONTINUE_LONG_1")
return
# --- handle ENTRY_LONG (end)
if self.trade_state == self.ENTRY_SHORT:
# check engulf condition
if self.qcAlgo.engulf and self.qcAlgo.engulf_ratio > 0:
if self.IsLevel_Reverse_Engulf( \
post_s_o,post_s_h,post_s_l, post_s_c, \
prev_s_o, prev_s_h, prev_s_l, prev_s_c, \
self.qcAlgo.engulf_ratio):
# change trade state to "trend reversion"
self.trade_state = self.TREND_REVERSION_SHORT_TO_LONG
# check if trade state is still ENTRY
if self.trade_state == self.ENTRY_SHORT:
# --- handle ENTRY_SHORT (start)
# Calc confidence
self.Get_Confidence()
if self.IsLevel_Exit_1(level):
# 100% confidence on "EXIT"-Events
self.trade_confidence = 1
# set semafor
self.activate_exit_1_semafor()
# call strategy
self.strategy.OnExit_1_Short(data, self)
# set machine-state
self.trade_state = self.EXIT_1_SHORT
self.exit2_level_steps_count = 0
# Log state Event
self.trade_action("EXIT_1_SHORT")
# risk management - "break even" on "exit_1" event
self.calc_on_break_even()
else:
if self.IsLevel_StopLoss(level, stop_loss_limit):
# pay attention to the cancel events
# self.cancel_event_attention()
# return
# (deprecated)
# 100% confidence on "STOP LOSS"
self.trade_confidence = 1
# set semafors
self.activate_stand_by()
self.activate_stop_loss_semafor()
# call strategy
self.strategy.OnStopLoss_Short(data, self)
# set machine-state
self.trade_state = self.STOP_LOSS_SHORT
# Log state Event
self.trade_action_noinsight_nobenchmark("STOP_LOSS_SHORT")
# call strategy - close 100% of the holdings
self.strategy.OnTradeClose_Short(data, self)
else:
if self.IsLevel_OrangeRisk(level, stop_loss_limit):
# 100% confidence on "ORANGE RISK"
self.trade_confidence = 1
# set semafor
self.activate_orange_risk_semafor()
# force machine-state to EXIT_1 (SHORT)
#self.trade_state = self.EXIT_1_SHORT
self.invert_break_even()
self.trade_action_insight_benchmark("ORANGE_RISK_SHORT")
# risk management - "break even" on "orange risk" event
#self.calc_on_break_even()
else:
self.strategy.OnTradeContinue_Short(data, self)
# maintain the same machine-state
self.trade_state = self.ENTRY_SHORT
if self.qcAlgo.emit_info_insights:
self.trade_action_insight_benchmark("TRADE_CONTINUE_SHORT_1")
# exit
return
# --- handle ENTRY_SHORT (end)
if self.trade_state == self.EXIT_1_LONG:
# --- handle EXIT_1_LONG (start)
# Calc confidence
self.Get_Confidence()
if exit_trailing:
# Re-Calculating trailing stop loss (if in trailing exit mode)
self.calc_rolling_trade_props(rw_s_qb, trailing_exit_stop_bars_count, bar_touch_margins, trigger_pips_up, trigger_pips_down)
# reset common trade props (short/long)
self.trade_start_level = self.data.Value
self.trade_take_profit = self.trade_exit
# calc limits
self.calc_stop_loss(level, stop_loss_limit)
self.calc_take_profit(level, exit_trailing, take_profit_limit)
self.update_hh_ll_limits()
if self.IsLevel_Exit_2(level, exit_trailing, take_profit_limit):
# 100% confidence on "EXIT"-Events
self.trade_confidence = 1
# set semafor
self.activate_exit_2_semafor()
# call strategy
self.strategy.OnExit_2_Long(data, self, exit_trailing)
self.trade_state = self.EXIT_2_LONG
# Log state Event
self.trade_action("EXIT_2_LONG")
# call strategy - close trade
self.strategy.OnTradeClose_Long(data, self)
else:
if self.IsLevel_StopLoss(level, stop_loss_limit):
# 100% confidence on "STOP LOSS"
self.trade_confidence = 1
# set semafors
self.activate_stand_by()
self.activate_stop_loss_semafor()
self.strategy.OnStopLoss_Long(data, self)
self.trade_state = self.STOP_LOSS_TAKE_PROFIT_LONG
# Log state Event
self.trade_action_noinsight_nobenchmark("EXIT_TAKE_PROFIT_LONG")
# call strategy - stop loss, close trade and take profit
self.strategy.OnStopLossAndTakeProfit_Long(data, self)
# call strategy - close 100% of the holdings
self.strategy.OnTradeClose_Long(data, self, 100/100)
else:
self.strategy.OnTradeContinue_Long(data, self)
# maintain the same machine-state
self.trade_state = self.EXIT_1_LONG
if self.qcAlgo.emit_info_insights:
self.trade_action_insight_benchmark("TRADE_CONTINUE_LONG_2")
# exit
return
# --- handle EXIT_1_LONG (end)
if self.trade_state == self.EXIT_1_SHORT:
# --- handle EXIT_1_SHORT (start)
# Calc confidence
self.Get_Confidence()
if exit_trailing:
# Re-Calculating trailing stop loss (if in trailing exit mode)
self.calc_rolling_trade_props(rw_s_qb, trailing_exit_stop_bars_count, bar_touch_margins, trigger_pips_up, trigger_pips_down)
# reset common trade props (short/long)
self.trade_start_level = self.data.Value
self.trade_take_profit = self.trade_exit
# calc limits
self.calc_stop_loss(level, stop_loss_limit)
self.calc_take_profit(level, exit_trailing, take_profit_limit)
self.update_hh_ll_limits()
if self.IsLevel_Exit_2(level, exit_trailing, take_profit_limit):
# 100% confidence on "EXIT"-Events
self.trade_confidence = 1
# set semafor
self.activate_exit_2_semafor()
# call strategy
self.strategy.OnExit_2_Short(data, self, exit_trailing)
self.trade_state = self.EXIT_2_SHORT
# Log state Event
self.trade_action("EXIT_2_SHORT")
# call strategy - close trade
self.strategy.OnTradeClose_Short(data, self)
else:
if self.IsLevel_StopLoss(level, stop_loss_limit):
# 100% confidence on "STOP LOSS"
self.trade_confidence = 1
# set semafors
self.activate_stand_by()
self.activate_stop_loss_semafor()
self.strategy.OnStopLoss_Short(data, self)
self.trade_state = self.STOP_LOSS_TAKE_PROFIT_SHORT
# Log state Event
self.trade_action_noinsight_nobenchmark("EXIT_TAKE_PROFIT_SHORT")
# call strategy - stop loss, close trade and take profit
self.strategy.OnStopLossAndTakeProfit_Short(data, self)
# call strategy - close 100% of the holdings
self.strategy.OnTradeClose_Short(data, self, 100/100)
else:
self.strategy.OnTradeContinue_Short(data, self)
# maintain the same machine-state
self.trade_state = self.EXIT_1_SHORT
if self.qcAlgo.emit_info_insights:
self.trade_action_insight_benchmark("TRADE_CONTINUE_SHORT_2")
# exit
return
# --- handle EXIT_1_SHORT (end)
if self.trade_state == self.TREND_REVERSION_LONG_TO_SHORT:
# --- handle TREND_REVERSION_LONG_TO_SHORT (start)
# 100% confidence on "EXIT"-Events
self.trade_confidence = 1
# drop all semafors
self.drop_all_semafors()
# Log state Event
self.trade_action_insight_benchmark("TREND_REVERSION_LONG_TO_SHORT")
# call strategy - close trade
self.strategy.OnTradeClose_Long(data, self)
# Log state Event
self.trade_action_insight_benchmark("CLOSE_LONG_ON_TREND_REVERSION")
if self.qcAlgo.reverse_trade:
# reverse trade direction
self.Flip_Trade_Direction()
# calculate trade props
self.calc_trade_props(rw_s_qb, rw_s_qb.Size, bar_touch_margins, trigger_pips_up, trigger_pips_down)
# reset common trade props (short/long)
self.trade_start_level = self.data.Value
self.trade_take_profit = self.trade_exit
# calc limits
self.calc_stop_loss(level, stop_loss_limit)
self.calc_take_profit(level, exit_trailing, take_profit_limit)
# set semafor
self.activate_entry_semafor()
# call strategy
self.strategy.OnEntry_Short(data, self)
# set machine-state
self.trade_state = self.ENTRY_SHORT
# Log state Event
self.trade_action_insight_benchmark("ENTRY_SHORT_ON_TREND_REVERSION")
# call strategy - open trade
self.strategy.OnTradeOpen_Short(data, self, False)
# exit
return
# --- handle TREND_REVERSION_LONG_TO_SHORT (end)
if self.trade_state == self.TREND_REVERSION_SHORT_TO_LONG:
# --- handle TREND_REVERSION_SHORT_TO_LONG (start)
# 100% confidence on "EXIT"-Events
self.trade_confidence = 1
# drop all semafors
self.drop_all_semafors()
# Log state Event
self.trade_action_insight_benchmark("TREND_REVERSION_SHORT_TO_LONG")
# call strategy - close trade
self.strategy.OnTradeClose_Short(data, self)
# Log state Event
self.trade_action_insight_benchmark("CLOSE_SHORT_ON_TREND_REVERSION")
if self.qcAlgo.reverse_trade:
# reverse trade direction
self.Flip_Trade_Direction()
# calculate trade props
self.calc_trade_props(rw_s_qb, rw_s_qb.Size, bar_touch_margins, trigger_pips_up, trigger_pips_down)
# reset common trade props (short/long)
self.trade_start_level = self.data.Value
self.trade_take_profit = self.trade_exit
# calc limits
self.calc_stop_loss(level, stop_loss_limit)
self.calc_take_profit(level, exit_trailing, take_profit_limit)
# set semafor
self.activate_entry_semafor()
# call strategy
self.strategy.OnEntry_Long(data, self)
# set machine-state
self.trade_state = self.ENTRY_LONG
# Log state Event
self.trade_action_insight_benchmark("ENTRY_LONG_ON_TREND_REVERSION")
# call strategy - open trade
self.strategy.OnTradeOpen_Long(data, self, False)
# exit
return
# --- handle TREND_REVERSION_SHORT_TO_LONG (end) ------------------------
# terminal states: finish the trade, then reset everything back to STAND_BY
if self.trade_state == self.EXIT_2_LONG or self.trade_state == self.EXIT_2_SHORT \
or self.trade_state == self.TREND_REVERSION_LONG_TO_SHORT or self.trade_state == self.TREND_REVERSION_SHORT_TO_LONG \
or self.trade_state == self.STOP_LOSS_LONG or self.trade_state == self.STOP_LOSS_SHORT \
or self.trade_state == self.STOP_LOSS_TAKE_PROFIT_SHORT or self.trade_state == self.STOP_LOSS_TAKE_PROFIT_LONG \
or self.trade_state == self.DROP_TRIGGER_LONG or self.trade_state == self.DROP_TRIGGER_SHORT:
# --- handle EXIT_2 / STOP_LOSS (start)
# 100% confidence on exit
self.trade_confidence = 1
# reset semafors
self.drop_all_semafors()
# drop trend direction
self.trade_direction_flattening()
if self.trade_state == self.EXIT_2_LONG \
or self.trade_state == self.STOP_LOSS_LONG \
or self.trade_state == self.TREND_REVERSION_LONG_TO_SHORT \
or self.trade_state == self.STOP_LOSS_TAKE_PROFIT_LONG \
or self.trade_state == self.DROP_TRIGGER_LONG:
# call Long-Strategy
self.strategy.OnFinishTrade_Long(data, self)
# Log state Event
self.trade_action_noinsight_nobenchmark("FINISH_TRADE_LONG")
if self.trade_state == self.EXIT_2_SHORT \
or self.trade_state == self.STOP_LOSS_SHORT \
or self.trade_state == self.TREND_REVERSION_SHORT_TO_LONG \
or self.trade_state == self.STOP_LOSS_TAKE_PROFIT_SHORT \
or self.trade_state == self.DROP_TRIGGER_SHORT:
# call Short-Strategy
self.strategy.OnFinishTrade_Short(data, self)
# Log state Event
self.trade_action_noinsight_nobenchmark("FINISH_TRADE_SHORT")
# reset levels & machine-state
self.reset_state()
# reset levels
self.reset_all_levels()
# reset machine-state
self.trade_state = self.STAND_BY
# exit
return
# --- handle EXIT_2 / STOP_LOSS (end)
else:
return
def drop_all_semafors(self):
    """Lower every level semafor: trigger, entry, exit 1/2, stop loss, orange risk."""
    self.IsLevel_Trigger_Semafor = self.IsLevel_Entry_Semafor = False
    self.IsLevel_Exit_1_Semafor = self.IsLevel_Exit_2_Semafor = False
    self.IsLevel_StopLoss_Semafor = self.IsLevel_OrangeRisk_Semafor = False
def reset_all_levels(self):
    """Zero every trade level, trailing tracker and delta value of the brick."""
    # step counter and price levels
    self.exit2_level_steps_count = 0
    self.trade_risk = self.touch_level = self.entry_level = 0
    self.exit1_level = self.exit2_level = 0
    self.stop_loss_level = self.stop_loss_purple_level = 0
    # trailing gains and tracked open/close extremes
    self.TTG_trailing_trade_gain = self.TLG_trailing_live_gain = 0
    self.LLo_lowest_low_open = self.LLc_lowest_low_close = 0
    self.HHo_highest_high_open = self.HHc_highest_high_close = 0
    self.LVo_live_open = self.LVc_live_close = 0
    # delta values
    self.minor_delta = self.major_delta = self.delta_acceleration = 0
def activate_stand_by(self):
    """Lower all semafors and clear the live trade-enabled slot (index 0)."""
    self.drop_all_semafors()
    live_slot = 0
    self.IsLevel_TradeEnabled_Semafors[live_slot] = False
def activate_trigger_semafor(self):
    """Lower all semafors, then raise only the trigger semafor."""
    self.drop_all_semafors()
    self.IsLevel_Trigger_Semafor = True
def activate_entry_semafor(self):
    """Lower all semafors, then raise only the entry semafor."""
    self.drop_all_semafors()
    self.IsLevel_Entry_Semafor = True
def activate_exit_1_semafor(self):
    """Lower all semafors, then raise only the exit-1 semafor."""
    self.drop_all_semafors()
    self.IsLevel_Exit_1_Semafor = True
def activate_exit_2_semafor(self):
    """Lower all semafors, then raise only the exit-2 semafor."""
    self.drop_all_semafors()
    self.IsLevel_Exit_2_Semafor = True
def activate_stop_loss_semafor(self):
    """Lower all semafors, then raise only the stop-loss semafor."""
    self.drop_all_semafors()
    self.IsLevel_StopLoss_Semafor = True
def activate_orange_risk_semafor(self):
    """Lower all semafors, then raise only the orange-risk semafor."""
    self.drop_all_semafors()
    self.IsLevel_OrangeRisk_Semafor = True
def Get_Trade_Scalp_Highest_High(self, rw_quote_bar, max_bars, touch_margins):
    """Return the highest price over the first ``max_bars`` bars of the window.

    With ``touch_margins == "open_close"`` the Open and Close of each bar are
    considered; with any other value the bar High is used. 0 acts as the
    "nothing found yet" marker and is returned when ``max_bars`` is 0.
    """
    body_only = touch_margins == "open_close"
    highest = 0
    for idx in range(max_bars):
        bar = rw_quote_bar[idx]
        candidates = (bar.Open, bar.Close) if body_only else (bar.High,)
        for price in candidates:
            if highest == 0 or price > highest:
                highest = price
    return highest
def Get_Trade_Scalp_Lowest_Low(self, rw_quote_bar, max_bars, touch_margins):
    """Return the lowest price over the first ``max_bars`` bars of the window.

    With ``touch_margins == "open_close"`` the Open and Close of each bar are
    considered; with any other value the bar Low is used. 0 acts as the
    "nothing found yet" marker and is returned when ``max_bars`` is 0.
    """
    body_only = touch_margins == "open_close"
    lowest = 0
    for idx in range(max_bars):
        bar = rw_quote_bar[idx]
        candidates = (bar.Open, bar.Close) if body_only else (bar.Low,)
        for price in candidates:
            if lowest == 0 or price < lowest:
                lowest = price
    return lowest
def Get_Live_Spread(self, rw_quote_bar, touch_margins):
    """Return the span of the bar at window index 0.

    ``"open_close"`` gives the signed Close - Open; any other value of
    ``touch_margins`` gives High - Low.
    """
    first_bar = rw_quote_bar[0]
    if touch_margins == "open_close":
        return first_bar.Close - first_bar.Open
    return first_bar.High - first_bar.Low
def Flip_Trade_Direction(self):
    """Invert the active trade direction and return the new one.

    ``direction_trade`` is first reset to ``InsightDirection.Flat``. If a
    short or long trade is flagged, both flags are negated and
    ``direction_trade`` is set to ``Down`` (short) or ``Up`` (long).

    Returns:
        The resulting ``self.direction_trade``.
    """
    self.direction_trade = InsightDirection.Flat
    if not (self.is_short_trade or self.is_long_trade):
        # nothing to flip
        return self.direction_trade

    self.is_short_trade = not self.is_short_trade
    self.is_long_trade = not self.is_long_trade

    if self.is_short_trade:
        self.direction_trade = InsightDirection.Down
    if self.is_long_trade:
        self.direction_trade = InsightDirection.Up
    return self.direction_trade
def Check_Trade_Direction(self, ind_fast, ind_slow, o, h, l, c, tolerance=0, touch_margins="open_close"):
    """Determine the trade direction from the anchor indicators and a bar.

    Sets ``is_short_trade`` / ``is_long_trade`` and ``direction_trade``.

    Args:
        ind_fast, ind_slow: fast and slow indicator values.
        o, h, l, c: open, high, low and close of the bar being checked.
        tolerance: minimum ``pip_diff_abs(ind_fast, ind_slow)``; below it
            no trade direction is set.
        touch_margins: ``"open_close"`` tests open/close against the anchor
            margins; any other value tests low/high.

    Returns:
        ``InsightDirection.Down``, ``.Up`` or ``.Flat``.
    """
    # The base routine runs first; its result is stored and then overwritten.
    # NOTE(review): presumably it is kept for side effects such as refreshing
    # the anchor_*_indicator_margin attributes -- confirm in the base class.
    base_direction = super(FxRedBirdBrick_South, self).Check_Trade_Direction(
        ind_fast, ind_slow, o, h, l, c, tolerance, touch_margins)
    self.direction_trade = base_direction
    self.direction_trade = InsightDirection.Flat

    self.is_short_trade = ind_fast < ind_slow
    self.is_long_trade = ind_fast > ind_slow

    if self.pip_diff_abs(ind_fast, ind_slow) < tolerance:
        # indicators are too close together: no trade
        self.is_short_trade = False
        self.is_long_trade = False
    else:
        if touch_margins == "open_close":
            first_edge, second_edge = o, c
        else:
            first_edge, second_edge = l, h
        # short needs both edges below the slow margin,
        # long needs both edges above the fast margin
        self.is_short_trade = (
            self.is_short_trade
            and first_edge < self.anchor_slow_indicator_margin
            and second_edge < self.anchor_slow_indicator_margin
        )
        self.is_long_trade = (
            self.is_long_trade
            and first_edge > self.anchor_fast_indicator_margin
            and second_edge > self.anchor_fast_indicator_margin
        )

    if self.is_short_trade:
        self.direction_trade = InsightDirection.Down
    if self.is_long_trade:
        self.direction_trade = InsightDirection.Up
    return self.direction_trade
def IsLevel_TradeEnabled(self, i_fast, i_slow, o, h, l, c, tolerance=0, touch_margins="open_close"):
    """Re-evaluate the trade direction and report whether trading is enabled.

    Delegates to ``Check_Trade_Direction`` (which updates the short/long
    flags) and returns ``is_short_trade or is_long_trade``.
    """
    self.Check_Trade_Direction(i_fast, i_slow, o, h, l, c, tolerance, touch_margins)
    trade_enabled = self.is_short_trade or self.is_long_trade
    return trade_enabled
def IsLevel_Reverse_Engulf(self, o_curr, h_curr, l_curr, c_curr, o_prior, h_prior, l_prior, c_prior, engulf_ratio=0):
    """Detect a bar that reverses against the active trade direction.

    For a long trade the current bar must close below its open while the
    prior bar closed above its open; for a short trade the opposite. When
    that holds and the prior body is non-zero, the result is whether
    ``|current body| / |prior body| >= engulf_ratio``.

    Only the open/close arguments are used; the high/low arguments are
    accepted but not read.

    Returns:
        ``False`` when no trade direction is set, otherwise the outcome of
        the check described above.
    """
    if not (self.is_short_trade or self.is_long_trade):
        # no trade
        return False

    if self.is_long_trade:
        # long trade: bearish current bar after a bullish prior bar
        reversal = c_curr < o_curr and c_prior > o_prior
    else:
        # short trade: bullish current bar after a bearish prior bar
        reversal = c_curr > o_curr and c_prior < o_prior

    if reversal:
        body_curr = abs(c_curr - o_curr)
        body_prior = abs(c_prior - o_prior)
        if body_prior > 0:
            reversal = body_curr / body_prior >= engulf_ratio
    return reversal
def IsLevel_Trigger(self, si_fast, si_mid, si_slow, o, h, l, c, tolerance=0, touch_margins="open_close"):
    """Check whether the bar pulls back into the scalp indicator band.

    The scalp margins are refreshed through
    ``Set_Scalp_Indicators_Margins`` first.

    * Long trade: the bar must close below its open, and the touch level
      (close for ``"open_close"``, else low) must lie below the fast margin
      but above the mid and slow margins.
    * Short trade: the bar must close above its open, and the touch level
      (close for ``"open_close"``, else high) must lie above the fast
      margin but below the mid and slow margins.

    On a hit, ``self.touch_level`` is set to the touch level shifted by
    ``plus_pip`` using ``pullback_trigger_pips_down`` (long) or
    ``pullback_trigger_pips_up`` (short) from ``qcAlgo``.

    Returns:
        Whether the trigger condition holds; ``False`` without a trade
        direction.
    """
    self.Set_Scalp_Indicators_Margins(si_fast, si_mid, si_slow, tolerance)
    fast_margin = self.scalp_fast_indicator_margin
    mid_margin = self.scalp_mid_indicator_margin
    slow_margin = self.scalp_slow_indicator_margin
    pips_up = self.qcAlgo.pullback_trigger_pips_up
    pips_down = self.qcAlgo.pullback_trigger_pips_down
    use_body = touch_margins == "open_close"

    if self.is_long_trade:
        if not (c < o):
            return False
        touched = c if use_body else l
        triggered = touched < fast_margin and touched > mid_margin and touched > slow_margin
        if triggered:
            self.touch_level = self.plus_pip(touched, pips_down)
        return triggered

    if self.is_short_trade:
        if not (c > o):
            return False
        touched = c if use_body else h
        triggered = touched > fast_margin and touched < mid_margin and touched < slow_margin
        if triggered:
            self.touch_level = self.plus_pip(touched, pips_up)
        return triggered

    # no trade
    return False
def IsLevel_Entry(self, level):
    """Report whether ``level`` has passed the entry level.

    Long trade: ``level > self.entry_level``.
    Short trade: ``level < self.entry_level``.

    Returns:
        The comparison result, or ``False`` when neither a long nor a short
        trade is flagged (previously the method fell through and returned
        ``None``, unlike the sibling ``IsLevel_*`` checks).
    """
    if self.is_long_trade:
        return level > self.entry_level
    if self.is_short_trade:
        return level < self.entry_level
    # no trade direction set
    return False
def IsLevel_Exit_1(self, level):
    """Report whether ``level`` has passed the break-even level.

    Long trade: ``level > self.trade_break_even``.
    Short trade: ``level < self.trade_break_even``.

    Returns:
        The comparison result, or ``False`` when neither a long nor a short
        trade is flagged (previously the method fell through and returned
        ``None``, unlike the sibling ``IsLevel_*`` checks).
    """
    if self.is_long_trade:
        return level > self.trade_break_even
    if self.is_short_trade:
        return level < self.trade_break_even
    # no trade direction set
    return False
def calc_take_profit(self, level, trailing=False, take_profit_limit = 0.001):
# ---------
# Evaluate the exit-2 condition for the price `level` and refresh
# self.trade_take_profit_level.
#   level             - price to test
#   trailing          - False: fixed take-profit target; True: trailing exit rules
#   take_profit_limit - fraction of self.trade_start_level used as target distance
# Returns `result`: whether the exit condition is met for `level`.
# NOTE(review): this listing has lost its indentation; the nesting of the
# drawdown / pullback checks in the trailing branch must be confirmed
# against the properly indented source before restructuring.
# init
# limit_price is assigned in this method but never read
limit_price = 0
result = False
# current scalp EMA values for periods 8 / 13 / 21
live_si_fast = self.scalp_ema_current_value(8)
live_si_mid = self.scalp_ema_current_value(13)
live_si_slow = self.scalp_ema_current_value(21)
# trend down: fast < mid < slow; trend up: fast > mid > slow
live_trend_down = live_si_fast < live_si_mid and live_si_mid < live_si_slow
live_trend_up = live_si_fast > live_si_mid and live_si_mid > live_si_slow
# Go!
if not trailing:
# init
# clamp take_profit_limit into [0, 1] (fixed mode only)
if take_profit_limit > 1:
take_profit_limit = 1
else:
if take_profit_limit < 0:
take_profit_limit = 0
# Go!
if self.is_long_trade:
# long: target lies above the start level; met once level exceeds it
self.trade_take_profit_level = self.trade_start_level * (1 + abs(take_profit_limit))
result = level > self.trade_take_profit_level
else:
if self.is_short_trade:
# short: target lies below the start level; met once level drops under it
self.trade_take_profit_level = self.trade_start_level * (1 - abs(take_profit_limit))
result = level < self.trade_take_profit_level
else:
result = False
limit_price = self.trade_take_profit_level
else:
# trailing mode: exit_1_profit_guaranteed is treated as a percentage
delta_profit_guaranteed_part_percent = self.exit_1_profit_guaranteed / 100
# guaranteed share of the distance between break-even and absolute start level
delta_exit_1_with_guaranteed_profit = abs(self.trade_break_even - self.trade_abs_start_level) * delta_profit_guaranteed_part_percent
if self.is_long_trade:
# long: exit when level falls below the stop-loss level
result = level < self.stop_loss_level
# once more than exit2_level_min_steps_count steps were counted, a live down-trend also exits
if self.exit2_level_steps_count > 0 and self.exit2_level_steps_count > self.exit2_level_min_steps_count:
result = result or live_trend_down
# guaranteed-profit floor ("purple" stop level) above the absolute start level
if self.drawdown_on_trade_start and self.trade_abs_start_level > 0 and self.trade_break_even > 0:
GP_limit_exit_1 = self.trade_abs_start_level + delta_exit_1_with_guaranteed_profit
self.stop_loss_purple_level = GP_limit_exit_1
result = result or level < GP_limit_exit_1
# check trailing exit by pullback ratio
if self.RCT_pullback_trailing_stop_ratio > 0:
result = result or self.Is_Pullback_Trailing_Exit()
else:
if self.is_short_trade:
# short: mirror image of the long rules
result = level > self.stop_loss_level
if self.exit2_level_steps_count > 0 and self.exit2_level_steps_count > self.exit2_level_min_steps_count:
result = result or live_trend_up
if self.drawdown_on_trade_start and self.trade_abs_start_level > 0 and self.trade_break_even > 0:
GP_limit_exit_1 = self.trade_abs_start_level - delta_exit_1_with_guaranteed_profit
self.stop_loss_purple_level = GP_limit_exit_1
result = result or level > GP_limit_exit_1
# check trailing exit by pullback ratio
if self.RCT_pullback_trailing_stop_ratio > 0:
result = result or self.Is_Pullback_Trailing_Exit()
else:
result = False
# refresh the take-profit level (take_profit_limit is not clamped on the trailing path)
if self.is_long_trade:
self.trade_take_profit_level = self.trade_start_level * (1 + abs(take_profit_limit))
if self.is_short_trade:
self.trade_take_profit_level = self.trade_start_level * (1 - abs(take_profit_limit))
limit_price = self.trade_take_profit_level
# finally
# the step counter only advances once it is already positive
if self.exit2_level_steps_count > 0:
self.exit2_level_steps_count = self.exit2_level_steps_count + 1
return result
def IsLevel_Exit_2(self, level, trailing=False, take_profit_limit=0.001):
    """Check the exit-2 (take-profit) condition and keep the target order current.

    Runs ``calc_take_profit``; while the exit is not reached, the
    profit-target order is updated to ``self.trade_take_profit_level``
    through ``update_take_profit``.

    Returns:
        The result of ``calc_take_profit``.
    """
    exit_reached = self.calc_take_profit(level, trailing, take_profit_limit)
    target_price = self.trade_take_profit_level
    if exit_reached:
        return exit_reached
    # not there yet: move the profit-target order to the fresh level
    self.update_take_profit(target_price)
    return exit_reached
def calc_stop_loss(self, level, stop_loss_limit=.05):
    """Compute the stop-loss price and test ``level`` against it.

    ``stop_loss_limit`` is passed through ``toolbox.cut_by_zero_one``.
    While ``IsLevel_OrangeRisk_Semafor`` is set, the stop distance is
    scaled by the "red-orange" risk factor derived from
    ``qcAlgo.orange_risk`` plus ``red_risk_shift_percent / 100``.
    The price is stored in ``self.trade_given_stop_loss`` (below the start
    level for long trades, above it for short trades).

    Returns:
        ``True`` when ``level`` is beyond the stop-loss price for the
        active direction; ``False`` when no trade direction is set.
    """
    stop_loss_limit = self.toolbox.cut_by_zero_one(stop_loss_limit)
    orange_risk = self.toolbox.normalize_percent_abs(self.qcAlgo.orange_risk)
    red_orange_risk = self.toolbox.cut_by_zero_one(orange_risk + self.red_risk_shift_percent / 100)

    if not (self.is_long_trade or self.is_short_trade):
        return False

    stop_distance = abs(stop_loss_limit)
    if self.IsLevel_OrangeRisk_Semafor:
        stop_distance = stop_distance * red_orange_risk

    if self.is_long_trade:
        self.trade_given_stop_loss = self.trade_start_level * (1 - stop_distance)
        return level < self.trade_given_stop_loss

    self.trade_given_stop_loss = self.trade_start_level * (1 + stop_distance)
    return level > self.trade_given_stop_loss
def IsLevel_StopLoss(self, level, stop_loss_limit=.05):
    """Check the stop-loss condition and keep the stop-loss order current.

    Runs ``calc_stop_loss`` with the limit passed through
    ``toolbox.cut_by_zero_one``; while the stop is not hit, the stop-loss
    order is updated to ``self.trade_given_stop_loss`` through
    ``update_stop_loss``.

    Returns:
        The result of ``calc_stop_loss``.
    """
    bounded_limit = self.toolbox.cut_by_zero_one(stop_loss_limit)
    stop_hit = self.calc_stop_loss(level, bounded_limit)
    stop_price = self.trade_given_stop_loss
    if stop_hit:
        return stop_hit
    # stop not hit: move the stop-loss order to the fresh level
    self.update_stop_loss(stop_price)
    return stop_hit
def calc_orange_risk_level(self, level, stop_loss_limit=.05):
    """Compute the orange-risk price and test ``level`` against it.

    The distance from ``trade_start_level`` is ``|stop_loss_limit|``
    (after ``toolbox.cut_by_zero_one``) scaled by the normalised
    ``qcAlgo.orange_risk``. The price is stored in
    ``self.trade_orange_risk_level``.

    Returns:
        ``True`` when ``level`` is beyond that price for the active
        direction; ``False`` when no trade direction is set.
    """
    stop_loss_limit = self.toolbox.cut_by_zero_one(stop_loss_limit)
    orange_risk = self.toolbox.normalize_percent_abs(self.qcAlgo.orange_risk)

    if self.is_long_trade:
        self.trade_orange_risk_level = self.trade_start_level * (1 - abs(stop_loss_limit) * orange_risk)
        return level < self.trade_orange_risk_level

    if self.is_short_trade:
        self.trade_orange_risk_level = self.trade_start_level * (1 + abs(stop_loss_limit) * orange_risk)
        return level > self.trade_orange_risk_level

    return False
def IsLevel_OrangeRisk(self, level, stop_loss_limit=.05):
    """Return whether ``level`` has crossed the orange-risk level.

    Thin wrapper around ``calc_orange_risk_level``.
    """
    orange_risk_reached = self.calc_orange_risk_level(level, stop_loss_limit)
    return orange_risk_reached
# * TRADE RULES/EVENTS REGION (CUSTOM / END) *
def new_scalp_indicators(self):
    """Create the scalp EMA indicators and return them keyed by ticker.

    Appends the periods 8, 13 and 21 to ``self.scalp_periods`` and builds
    one ``ExponentialMovingAverage`` for every period in that list.
    """
    self.scalp_periods.extend((8, 13, 21))

    indicators = {}
    for ema_period in self.scalp_periods:
        name = self.scalp_ticker("ema", ema_period)
        indicators[name] = ExponentialMovingAverage(name, ema_period)
    return indicators
def new_anchor_indicators(self):
    """Create the anchor EMA indicators and return them keyed by ticker.

    Appends the periods 8 and 21 to ``self.anchor_periods`` and builds one
    ``ExponentialMovingAverage`` for every period in that list.
    """
    self.anchor_periods.extend((8, 21))

    indicators = {}
    for ema_period in self.anchor_periods:
        name = self.anchor_ticker("ema", ema_period)
        indicators[name] = ExponentialMovingAverage(name, ema_period)
    return indicators
def update_scalp_indicator_chart(self):
    """Update the scalp chart: base update first, then the EMA series."""
    # base update method
    super(FxRedBirdBrick_South, self).update_scalp_indicator_chart()
    # add one "ema" series per scalp period
    self.update_indicator_chart(self.scalp_chart, "scalp", "ema", self.scalp_periods)
def update_anchor_indicator_chart(self):
    """Update the anchor chart with one "ema" series per anchor period."""
    self.update_indicator_chart(self.anchor_chart, "anchor", "ema", self.anchor_periods)


# The import below opens the next module of this listing. It was fused onto
# the end of the statement above, which is a syntax error.
from StrategyFxScalper import StrategyFxScalper
'''
To use this strategy, import it at the top of your algorithm:
from StrategyFxRedBird_South import StrategyFxRedBird_South
Then instantiate the strategy and run it:
strategy = StrategyFxRedBird_South(...)
strategy.Run(data)
'''
# ********************************
class StrategyFxRedBird_South(StrategyFxScalper):
# ********************************
# ****************************************************
# * LONG - TRADE RULES REGION (CUSTOM / BEGIN) *
# ****************************************************
# override
# -------------------------------------------------------------------------------------------------------------
def OnTrigger_Long(self, data, fxBrick):
    """Override of ``OnTrigger_Long``: delegates to the base strategy.

    No custom rule is added yet. When adding one, check the brick state
    (``fxBrick.trade_state``) first to decide what to do next.
    """
    super().OnTrigger_Long(data, fxBrick)
# override
# -------------------------------------------------------------------------------------------------------------
def OnEntry_Long(self, data, fxBrick):
    """Override of ``OnEntry_Long``: delegates to the base strategy.

    No custom rule is added yet. When adding one, check the brick state
    (``fxBrick.trade_state``) first to decide what to do next.
    """
    super().OnEntry_Long(data, fxBrick)
    # TODO 1: execute the custom rule
    # TODO 2: plot data if the rule was executed
    # disabled (pay attention to the cancel events):
    #   fxBrick.cancel_event_attention()
# override
# -------------------------------------------------------------------------------------------------------------
def OnTradeContinue_Long(self, data, fxBrick):
    """Override of ``OnTradeContinue_Long``: delegates to the base strategy.

    No custom rule is added yet. When adding one, check the brick state
    (``fxBrick.trade_state``) first to decide what to do next.
    """
    super().OnTradeContinue_Long(data, fxBrick)
    # TODO 1: execute the custom rule
    # TODO 2: plot data if the rule was executed
# override
# -------------------------------------------------------------------------------------------------------------
def OnStopLoss_Long(self, data, fxBrick):
    """Override of ``OnStopLoss_Long``: delegates to the base strategy.

    No custom rule is added yet. When adding one, check the brick state
    first, e.g. ``if fxBrick.trade_state == fxBrick.TRIGGER_LONG: ...``
    for every state except the STAND_BY step.
    """
    super().OnStopLoss_Long(data, fxBrick)
    # disabled (cancel open take-profit and/or stop-loss orders):
    #   fxBrick.cancel_limits()
    # TODO 1: execute the custom rule
    # TODO 2: plot data if the rule was executed
# -------------------------------------------------------------------------------------------------------------
def OnExit_1_Long(self, data, fxBrick):
    """Partially close a long position at exit 1.

    The share ``fxBrick.exit_1_rate`` of ``fxBrick.roundedOrderSize`` is
    rounded with ``fxBrick.GetRoundedOrderSize`` and sold through
    ``qcAlgo.Sell`` when it is positive; the remainder is written back to
    ``fxBrick.roundedOrderSize``.

    The take-profit / stop-loss order updates are currently disabled, so
    the pip-rounded prices and the trailing flag are computed but unused.
    ``data`` is not read.
    """
    # TODO 1: execute the custom rule
    # TODO 2: plot data if the rule was executed
    # Check the brick state (fxBrick.trade_state) first when adding rules.
    exit_trailing = self.qcAlgo.exit_strategy_trailing_exit

    # split the position into the part to close and the part to keep
    size_before = fxBrick.roundedOrderSize
    size_to_close = fxBrick.GetRoundedOrderSize(size_before * fxBrick.exit_1_rate)
    size_to_keep = size_before - size_to_close
    fxBrick.roundedOrderSize = size_to_keep

    # prices rounded to pips (only needed by the disabled order updates)
    take_profit = self.pip_round(fxBrick.trade_take_profit)
    stop_loss = self.pip_round(fxBrick.stop_loss_level)

    if size_to_close > 0:
        # sell part of the equity (closes part of the long position)
        self.qcAlgo.Sell(fxBrick.symbol, size_to_close)

    # Disabled follow-up, kept for reference:
    #   fxBrick.veto_cancel_orders()
    #   trailing mode:  fxBrick.cancel_take_profit(True)   # overrule veto
    #   otherwise:      fxBrick.update_take_profit(take_profit, -size_to_keep)
    #   fxBrick.update_stop_loss(stop_loss, -size_to_keep)
# -------------------------------------------------------------------------------------------------------------
def OnExit_2_Long(self, data, fxBrick, exit_trailing):
    """Handle exit 2 of a long trade by forwarding to ``OnExit_Long``.

    ``exit_trailing`` is accepted but not used here. No custom rule is
    added yet; check the brick state (``fxBrick.trade_state``) first when
    adding one.
    """
    # disabled (cancel open take-profit and/or stop-loss orders):
    #   fxBrick.cancel_limits()
    # TODO 1: execute the custom rule
    # TODO 2: plot data if the rule was executed
    self.OnExit_Long(data, fxBrick)
# override
# -------------------------------------------------------------------------------------------------------------
def OnStopLossAndTakeProfit_Long(self, data, fxBrick):
    """Override of ``OnStopLossAndTakeProfit_Long``: delegates to the base.

    No custom rule is added yet. When adding one, check the brick state
    first, e.g. ``if fxBrick.trade_state == fxBrick.TRIGGER_LONG: ...``
    for every state except the STAND_BY step.
    """
    super().OnStopLossAndTakeProfit_Long(data, fxBrick)
    # disabled (cancel open take-profit and/or stop-loss orders):
    #   fxBrick.cancel_limits()
    # TODO 1: execute the custom rule
    # TODO 2: plot data if the rule was executed
# override
# -------------------------------------------------------------------------------------------------------------
def OnExit_Long(self, data, fxBrick):
    """Override of ``OnExit_Long``: delegates to the base strategy.

    No custom rule is added yet. When adding one, check the brick state
    (``fxBrick.trade_state``) first to decide what to do next.
    """
    super().OnExit_Long(data, fxBrick)
    # disabled (ignore cancel event):
    #   fxBrick.cancel_event_ignore()
# override
# -------------------------------------------------------------------------------------------------------------
def OnFinishTrade_Long(self, data, fxBrick):
    """Override of ``OnFinishTrade_Long``: delegates to the base strategy.

    No custom rule is added yet. When adding one, check the brick state
    (``fxBrick.trade_state``) first to decide what to do next.
    """
    super().OnFinishTrade_Long(data, fxBrick)
    # TODO 1: execute the custom rule
    # TODO 2: plot data if the rule was executed
    # disabled (ignore cancel event):
    #   fxBrick.cancel_event_ignore()
# *********************************************************
# * LONG - TRADE RULES REGION TRADE RULES (CUSTOM / END) *
# *********************************************************
# ****************************************************
# * SHORT - TRADE RULES REGION (CUSTOM / BEGIN) *
# ****************************************************
# override
# -------------------------------------------------------------------------------------------------------------
def OnTrigger_Short(self, data, fxBrick):
    """Override of ``OnTrigger_Short``: delegates to the base strategy.

    No custom rule is added yet. When adding one, check the brick state
    (``fxBrick.trade_state``) first to decide what to do next.
    """
    super().OnTrigger_Short(data, fxBrick)
# override
# -------------------------------------------------------------------------------------------------------------
def OnEntry_Short(self, data, fxBrick):
    """Override of ``OnEntry_Short``: delegates to the base strategy.

    No custom rule is added yet. When adding one, check the brick state
    (``fxBrick.trade_state``) first to decide what to do next.
    """
    super().OnEntry_Short(data, fxBrick)
    # TODO 1: execute the custom rule
    # TODO 2: plot data if the rule was executed
    # disabled (pay attention to the cancel events):
    #   fxBrick.cancel_event_attention()
# -------------------------------------------------------------------------------------------------------------
def OnTradeContinue_Short(self, data, fxBrick):
    """Override of ``OnTradeContinue_Short``: delegates to the base strategy.

    No custom rule is added yet. When adding one, check the brick state
    (``fxBrick.trade_state``) first to decide what to do next.
    """
    super().OnTradeContinue_Short(data, fxBrick)
    # TODO 1: execute the custom rule
    # TODO 2: plot data if the rule was executed
# override
# -------------------------------------------------------------------------------------------------------------
def OnStopLoss_Short(self, data, fxBrick):
    """Override of ``OnStopLoss_Short``: delegates to the base strategy.

    No custom rule is added yet. When adding one, check the brick state
    (``fxBrick.trade_state``) first to decide what to do next.
    """
    super().OnStopLoss_Short(data, fxBrick)
    # disabled (cancel open take-profit and/or stop-loss orders):
    #   fxBrick.cancel_limits()
    # TODO 1: execute the custom rule
    # TODO 2: plot data if the rule was executed
# -------------------------------------------------------------------------------------------------------------
def OnExit_1_Short(self, data, fxBrick):
    """Partially close a short position at exit 1.

    The share ``fxBrick.exit_1_rate`` of ``fxBrick.roundedOrderSize`` is
    rounded with ``fxBrick.GetRoundedOrderSize`` and bought back through
    ``qcAlgo.Buy`` when it is positive; the remainder is written back to
    ``fxBrick.roundedOrderSize``.

    The take-profit / stop-loss order updates are currently disabled, so
    the pip-rounded prices and the trailing flag are computed but unused.
    ``data`` is not read.
    """
    exit_trailing = self.qcAlgo.exit_strategy_trailing_exit

    # split the position into the part to close and the part to keep
    size_before = fxBrick.roundedOrderSize
    size_to_close = fxBrick.GetRoundedOrderSize(size_before * fxBrick.exit_1_rate)
    size_to_keep = size_before - size_to_close
    fxBrick.roundedOrderSize = size_to_keep

    # prices rounded to pips (only needed by the disabled order updates)
    take_profit = self.pip_round(fxBrick.trade_take_profit)
    stop_loss = self.pip_round(fxBrick.stop_loss_level)

    if size_to_close > 0:
        # buy part of the equity (closes part of the short position)
        self.qcAlgo.Buy(fxBrick.symbol, size_to_close)

    # Disabled follow-up, kept for reference:
    #   fxBrick.veto_cancel_orders()
    #   trailing mode:  fxBrick.cancel_take_profit(True)   # overrule veto
    #   otherwise:      fxBrick.update_take_profit(take_profit, size_to_keep)
    #   fxBrick.update_stop_loss(stop_loss, size_to_keep)
    # TODO 1: execute the custom rule
    # TODO 2: plot data if the rule was executed
    # Check the brick state (fxBrick.trade_state) first when adding rules.
# -------------------------------------------------------------------------------------------------------------
def OnExit_2_Short(self, data, fxBrick, exit_trailing):
    """Handle exit 2 of a short trade by forwarding to ``OnExit_Short``.

    ``exit_trailing`` is accepted but not used here. No custom rule is
    added yet; check the brick state (``fxBrick.trade_state``) first when
    adding one.
    """
    # disabled (cancel open take-profit and/or stop-loss orders):
    #   fxBrick.cancel_limits()
    # TODO 1: execute the custom rule
    # TODO 2: plot data if the rule was executed
    self.OnExit_Short(data, fxBrick)
# override
# -------------------------------------------------------------------------------------------------------------
def OnStopLossAndTakeProfit_Short(self, data, fxBrick):
    """Override of ``OnStopLossAndTakeProfit_Short``: delegates to the base.

    No custom rule is added yet. When adding one, check the brick state
    first, e.g. ``if fxBrick.trade_state == fxBrick.TRIGGER_LONG: ...``
    for every state except the STAND_BY step.
    """
    super().OnStopLossAndTakeProfit_Short(data, fxBrick)
    # disabled (cancel open take-profit and/or stop-loss orders):
    #   fxBrick.cancel_limits()
    # TODO 1: execute the custom rule
    # TODO 2: plot data if the rule was executed
# override
# -------------------------------------------------------------------------------------------------------------
def OnExit_Short(self, data, fxBrick):
    """Override of ``OnExit_Short``: delegates to the base strategy.

    No custom rule is added yet.
    """
    super().OnExit_Short(data, fxBrick)
    # disabled (ignore cancel event):
    #   fxBrick.cancel_event_ignore()
# override
# -------------------------------------------------------------------------------------------------------------
def OnFinishTrade_Short(self, data, fxBrick):
    """Override of ``OnFinishTrade_Short``: delegates to the base strategy.

    No custom rule is added yet. When adding one, check the brick state
    (``fxBrick.trade_state``) first to decide what to do next.
    """
    super().OnFinishTrade_Short(data, fxBrick)
    # TODO 1: execute the custom rule
    # TODO 2: plot data if the rule was executed
    # disabled (ignore cancel event):
    #   fxBrick.cancel_event_ignore()
# *********************************************************
# * SHORT - TRADE RULES REGION TRADE RULES (CUSTOM / END) *
# *********************************************************
# ******************************************************
# * TRADE-ACTION RULES REGION TRADE RULES (BASE / BEGIN) *
# ******************************************************
# override
# -------------------------------------------------------------------------------------------------------------
def OnTradeOpen_Long(self, data, fxBrick, set_limits=True):
    """Override of ``OnTradeOpen_Long``: delegates to the base strategy.

    ``set_limits`` is forwarded unchanged. No custom rule is added yet;
    check the brick state (``fxBrick.trade_state``) first when adding one.
    """
    super().OnTradeOpen_Long(data, fxBrick, set_limits)
    # TODO 1: execute the custom rule
    # TODO 2: plot data if the rule was executed
# override
# -------------------------------------------------------------------------------------------------------------
def OnTradeOpen_Short(self, data, fxBrick, set_limits=True):
    """Override of ``OnTradeOpen_Short``: delegates to the base strategy.

    ``set_limits`` is forwarded unchanged. No custom rule is added yet;
    check the brick state (``fxBrick.trade_state``) first when adding one.
    """
    super().OnTradeOpen_Short(data, fxBrick, set_limits)
    # TODO 1: execute the custom rule
    # TODO 2: plot data if the rule was executed
# override
# -------------------------------------------------------------------------------------------------------------
def OnTradeClose_Long(self, data, fxBrick, closeRatio=1):
    """Override of ``OnTradeClose_Long``: delegates to the base strategy.

    Args:
        closeRatio: share of the holdings to close -- 1 closes all,
            0.5 closes 50 %, 0 closes nothing. Forwarded unchanged.

    No custom rule is added yet; check the brick state
    (``fxBrick.trade_state``) first when adding one.
    """
    super().OnTradeClose_Long(data, fxBrick, closeRatio)
    # TODO 1: execute the custom rule
    # TODO 2: plot data if the rule was executed
# override
# -------------------------------------------------------------------------------------------------------------
def OnTradeClose_Short(self, data, fxBrick, closeRatio=1):
    """Override of ``OnTradeClose_Short``: delegates to the base strategy.

    Args:
        closeRatio: share of the holdings to close -- 1 closes all,
            0.5 closes 50 %, 0 closes nothing. Forwarded unchanged.

    No custom rule is added yet; check the brick state
    (``fxBrick.trade_state``) first when adding one.
    """
    super().OnTradeClose_Short(data, fxBrick, closeRatio)
    # TODO 1: execute the custom rule
    # TODO 2: plot data if the rule was executed
# ******************************************************
# * TRADE-ACTION RULES REGION TRADE RULES (BASE / END) *
# ******************************************************
# custom / override
# *********************************
# * PLOT REGION (BEGIN) *
# *********************************
# -------------------------------------------------------------------------------------------------------------
def plot_scalp_chart(self, fxBrick, bar):
    """Plot the price bar and the scalp EMA(8/13/21) values on the scalp chart.

    The scalp indicator references are refreshed from ``fxBrick`` first.
    """
    self.init_scalp_indicators(fxBrick)
    chart_name = fxBrick.scalp_chart.Name
    self.plot_price_chart(chart_name, fxBrick, bar)

    ema_series = (
        (self.scalp_ema_8_ticker, self.scalp_ema_8),
        (self.scalp_ema_13_ticker, self.scalp_ema_13),
        (self.scalp_ema_21_ticker, self.scalp_ema_21),
    )
    for series_name, ema in ema_series:
        self.plot(chart_name, series_name, ema.Current.Value)
# custom / override
# -------------------------------------------------------------------------------------------------------------
def plot_anchor_chart(self, fxBrick, bar):
    """Plot the price bar and the anchor EMA(8/21) values on the anchor chart.

    The anchor indicator references are refreshed from ``fxBrick`` first.
    """
    self.init_anchor_indicators(fxBrick)
    chart_name = fxBrick.anchor_chart.Name
    self.plot_price_chart(chart_name, fxBrick, bar)

    ema_series = (
        (self.anchor_ema_8_ticker, self.anchor_ema_8),
        (self.anchor_ema_21_ticker, self.anchor_ema_21),
    )
    for series_name, ema in ema_series:
        self.plot(chart_name, series_name, ema.Current.Value)
# *********************************
# * PLOT REGION (END) *
# *********************************
# -------------------------------------------------------------------------------------------------------------
def init_scalp_indicators(self, fxBrick):
    """Cache the scalp EMA(8/13/21) tickers and indicators of ``fxBrick``.

    Tickers come from ``fxBrick.scalp_ticker("ema", period)``; the
    indicator objects are looked up in ``fxBrick.scalpIndicators``.
    """
    self.scalp_ema_8_ticker = fxBrick.scalp_ticker("ema", 8)
    self.scalp_ema_13_ticker = fxBrick.scalp_ticker("ema", 13)
    self.scalp_ema_21_ticker = fxBrick.scalp_ticker("ema", 21)

    by_ticker = fxBrick.scalpIndicators
    self.scalp_ema_8 = by_ticker[self.scalp_ema_8_ticker]
    self.scalp_ema_13 = by_ticker[self.scalp_ema_13_ticker]
    self.scalp_ema_21 = by_ticker[self.scalp_ema_21_ticker]
# -------------------------------------------------------------------------------------------------------------
def init_anchor_indicators(self, fxBrick):
    """Cache the anchor EMA(8/21) tickers and indicators of ``fxBrick``.

    Tickers come from ``fxBrick.anchor_ticker("ema", period)``; the
    indicator objects are looked up in ``fxBrick.anchorIndicators``.
    """
    self.anchor_ema_8_ticker = fxBrick.anchor_ticker("ema", 8)
    self.anchor_ema_21_ticker = fxBrick.anchor_ticker("ema", 21)

    by_ticker = fxBrick.anchorIndicators
    self.anchor_ema_8 = by_ticker[self.anchor_ema_8_ticker]
    self.anchor_ema_21 = by_ticker[self.anchor_ema_21_ticker]
# -------------------------------------------------------------------------------------------------------------
def init_indicators(self, fxBrick):
    """Refresh the cached scalp and anchor indicator references, in that order."""
    for refresh in (self.init_scalp_indicators, self.init_anchor_indicators):
        refresh(fxBrick)
# custom (override)
# -------------------------------------------------------------------------------------------------------------
def RunFxScalp(self, data, fxBrick):
    """Override of ``RunFxScalp``: base run, then the scalp EMA cross update.

    ``UpdateScalpCross`` is called with plotting enabled.
    """
    super().RunFxScalp(data, fxBrick)
    # update and plot EMA crosses on the scalp chart
    self.UpdateScalpCross(data, fxBrick, True)
# -------------------------------------------------------------------------------------------------------------
def UpdateScalpCross(self, data, fxBrick, doPlot=True):
    """Update the scalp EMA cross state and optionally plot the cross.

    Does nothing unless ``qcAlgo.plot_ema_cross`` is set. Otherwise the
    current scalp EMA(8) and EMA(21) values are passed to
    ``fxBrick.IsNarrowScalpEMA``; when that reports a cross, ``doPlot`` is
    set and ``fxBrick.IsChartHour(data.Time)`` holds, the mean of both EMA
    values is plotted via ``plot_scalp_ema_cross``.
    """
    if not self.qcAlgo.plot_ema_cross:
        return

    fast_value = self.scalp_ema_8.Current.Value
    slow_value = self.scalp_ema_21.Current.Value
    is_cross = fxBrick.IsNarrowScalpEMA(fast_value, slow_value)

    if doPlot and is_cross and fxBrick.IsChartHour(data.Time):
        # mark the cross at the midpoint between the two EMAs
        self.plot_scalp_ema_cross(fxBrick, (fast_value + slow_value) / 2)
# custom (override)
# -------------------------------------------------------------------------------------------------------------
def RunFxAnchor(self, data, fxBrick):
    """Override of ``RunFxAnchor``: base run, then the anchor EMA cross update.

    ``UpdateAnchorCross`` is called with plotting enabled.
    """
    super().RunFxAnchor(data, fxBrick)
    # update and plot EMA crosses on the anchor chart
    self.UpdateAnchorCross(data, fxBrick, True)
# -------------------------------------------------------------------------------------------------------------
def UpdateAnchorCross(self, data, fxBrick, doPlot=True):
    """Update the anchor EMA cross state and optionally plot the cross.

    Does nothing unless ``qcAlgo.plot_ema_cross`` is set. Otherwise the
    current anchor EMA(8) and EMA(21) values are passed to
    ``fxBrick.IsNarrowAnchorEMA``; when that reports a cross, ``doPlot`` is
    set and ``fxBrick.IsChartHour(data.Time)`` holds, the mean of both EMA
    values is plotted via ``plot_anchor_ema_cross``.
    """
    if not self.qcAlgo.plot_ema_cross:
        return

    fast_value = self.anchor_ema_8.Current.Value
    slow_value = self.anchor_ema_21.Current.Value
    is_cross = fxBrick.IsNarrowAnchorEMA(fast_value, slow_value)

    if doPlot and is_cross and fxBrick.IsChartHour(data.Time):
        # mark the cross at the midpoint between the two EMAs
        self.plot_anchor_ema_cross(fxBrick, (fast_value + slow_value) / 2)
# custom (override)
# -------------------------------------------------------------------------------------------------------------
def RunFx(self, data, fxBrick):
    """Per-bar hook (override): set up indicators, run the base logic, then probe the pip helpers.

    The method returns early (after the base run) while the brick reports
    that its indicators are not ready yet.

    Args:
        data: Current data slice, forwarded to the base implementation.
        fxBrick: Brick being processed.
    """
    # init
    self.init_indicators(fxBrick)
    # call base run
    super(StrategyFxRedBird_South, self).RunFx(data, fxBrick)
    # check if all indicators are ready
    if not fxBrick.all_indicators_ready:
        return
    # NOTE(review): the two calls below look like a leftover pip-helper probe -
    # their results were never used. They are kept because the helpers' side
    # effects cannot be verified from this file; confirm and drop if pure.
    p1 = 0.67699
    p2 = 0.65027
    pip_gap = self.qcAlgo.pip_diff(p1, p2)
    self.qcAlgo.add_pip(p2, -pip_gap / 2)
    # The unreachable MACD/RSI/Bollinger order-placement sketch that used to
    # follow the unconditional return (a bare string literal) has been removed.
# ********** END: Test-Strategy ******************
'''
To use this library place this at the top:
from ForexTrendBase import ForexTrendBase
Then define a fx strategy class
class ForexCustomStrategy(ForexTrendBase):
...
'''
from clr import AddReference
AddReference("System")
AddReference("QuantConnect.Algorithm")
AddReference("QuantConnect.Indicators")
AddReference("QuantConnect.Common")
from System import *
from QuantConnect import *
from QuantConnect.Algorithm import *
from QuantConnect.Indicators import *
from datetime import datetime
# from Risk.MaximumDrawdownPercentPerSecurity import MaximumDrawdownPercentPerSecurity
from ForexGalaxyBay import ForexGalaxyBay
from AlgoToolbox import AlgoToolbox
import datetime
# *************************************
class ForexTrendBase(QCAlgorithm):
# *************************************
# -------------------------------------------------------------------------------------------------------------
def Initialize(self):
    """Read all algorithm parameters and wire up brokerage, galaxy and strategy.

    Parameters are read through the ``get_*_param`` helpers; each read may
    write a log line, so the original read order is kept.
    """
    # NOTE(review): the visible module header never imports `timedelta` by
    # name; importing it locally keeps SetWarmUp below from depending on
    # whatever the star-imports happen to expose.
    from datetime import timedelta

    # init
    self.csv_sep_chr = ";"
    # galaxy of bricks
    self.selected_galaxy = None
    # main shift!
    self.min_scalp_shift = 1
    self.min_scalp_shift_offset = 0

    # logging switches are read without logging themselves (toolbox not built yet)
    self.show_debug = self.get_bool_param("show_debug", False, False)
    self.show_log = self.get_bool_param("show_log", False, False)
    # to show DEBUG-Message: self.toolbox.show_debug(<message>,<prefix>="",<suffix>="")
    # to show LOG-Message: self.toolbox.show_log(<message>,<prefix>="",<suffix>="")
    self.toolbox = AlgoToolbox(self, self.show_debug, self.show_log)
    self.toolbox.show_log("ENVIRONMENT:")
    self.toolbox.show_log("========================================")

    self.resolution = Resolution.Minute
    self.anchorResolution = Resolution.Hour
    # set Start/End Dates/Hours for main Algo and Charts
    self.setDateMargins()

    self.anchor_period = self.get_int_param("anchor_period")
    # counters start one step before their period (see *_time_elapsed)
    self.min_anchor_count = self.anchor_period - 1
    self.scalp_period = self.get_int_param("scalp_period")
    self.min_scalp_count = self.scalp_period - self.min_scalp_shift
    self.anchor_max_slots = self.get_int_param("anchor_max_slots")
    self.scalp_max_slots = self.get_int_param("scalp_max_slots")
    self.chart_disable = self.get_bool_param("chart_disable")
    self.plot_ema_cross = False  # hard-wired off; the "plot_ema_cross" parameter is not read
    self.bar_touch_margins = self.get_str_param("bar_touch_margins")

    # delta settings
    self.anchor_delta_min_pips = self.get_int_param("anchor_delta_min_pips")
    self.scalp_delta_min_pips = self.get_int_param("scalp_delta_min_pips")

    # trigger settings
    self.touch_trigger_pip_tolerance = self.get_float_param("touch_trigger_pip_tolerance")
    self.pullback_trigger_pips_up = self.get_float_param("pullback_trigger_pips_up")
    self.pullback_trigger_pips_down = self.get_float_param("pullback_trigger_pips_down")

    # confidence properties: long parameter names first, short spellings as fallback
    try:
        self.confidence_trade_enabled_main = self.get_float_param("confidence_te_0")
        self.confidence_trade_enabled_hist = self.get_float_param("confidence_te_hist")
        self.confidence_indicator_delta_anchor = self.get_float_param("confidence_ind_a")
        self.confidence_indicator_delta_scalp = self.get_float_param("confidence_ind_s")
    except Exception:
        self.confidence_trade_enabled_main = self.get_float_param("confid_te_0")
        self.confidence_trade_enabled_hist = self.get_float_param("confid_te_hist")
        self.confidence_indicator_delta_anchor = self.get_float_param("confid_ind_a")
        self.confidence_indicator_delta_scalp = self.get_float_param("confid_ind_s")

    # get "orange risk"-level (100 when the parameter cannot be read)
    try:
        self.orange_risk = self.get_float_param("orange_risk")
    except Exception:
        self.orange_risk = 100

    self.exit_1_rate = self.get_int_param("exit_1_rate") / 100
    self.exit_1_close = self.exit_1_rate > 0
    self.exit_strategy_trailing_exit = self.get_bool_param("trailing")
    self.exit_strategy_break_even_exit = not self.exit_strategy_trailing_exit
    self.anchor_lookback_for_trade_enabled = self.get_int_param("anchor_lookback_trade_enabled")
    self.direction_lookback = self.get_bool_param("direction_lookback")
    self.indicator_lookback = self.get_bool_param("indicator_lookback")
    self.drawdown_on_trade_start = self.get_bool_param("drawdown_on_trade_start")
    self.exit_1_profit_guaranteed = self.get_int_param("exit1_garant")
    self.trailing_exit_stop_loss_bars = self.get_int_param("trailing_exit_stop_loss_bars")

    # "engulf" is read twice: once as an on/off flag, once as the ratio itself
    self.engulf = self.get_bool_param("engulf")
    if self.engulf:
        self.engulf_ratio = self.get_float_param("engulf")
    else:
        self.engulf_ratio = 0
    self.reverse_trade = self.get_bool_param("reverse_trade")

    # trade settings
    self.SetCash(self.get_int_param("cash_book"))
    self.forex_leverage = self.get_float_param("forex_leverage")
    self.trade_magnitude_amplifier = self.get_float_param("trade_magnitude_amplifier")
    self.emit_trades = self.get_bool_param("emit_trades")
    self.emit_insights = self.get_bool_param("emit_insights")
    self.emit_info_insights = self.get_bool_param("emit_info_insights")
    self.cash_reserve_ratio = self.get_int_param("cash_reserve_ratio_percent") / 100
    self.min_cash_reserve_ratio = self.get_int_param("min_cash_reserve_ratio_percent") / 100
    self.max_cash_reserve_ratio = self.get_int_param("max_cash_reserve_ratio_percent") / 100
    self.take_profit = self.get_float_param("take_profit_percent") / 100
    self.stop_loss = self.get_float_param("stop_loss_percent") / 100

    # clamp the cash reserve into [min, max]; the upper bound is checked first
    if self.cash_reserve_ratio > self.max_cash_reserve_ratio:
        self.cash_reserve_ratio = self.max_cash_reserve_ratio
    elif self.cash_reserve_ratio < self.min_cash_reserve_ratio:
        self.cash_reserve_ratio = self.min_cash_reserve_ratio
    self.cash_max_ratio = 1 - self.cash_reserve_ratio

    # NOTE(review): NullRiskManagementModel is not imported by name in the
    # visible module header - confirm it is in scope at runtime.
    self.AddRiskManagement(NullRiskManagementModel())

    # setup forex brokerages (anything other than "FXCM" selects Oanda)
    self.Broker = self.get_str_param("Broker")
    if self.Broker == "FXCM":
        self.SetBrokerageModel(BrokerageName.FxcmBrokerage)
        self.Market = Market.FXCM
    else:
        self.SetBrokerageModel(BrokerageName.OandaBrokerage)
        self.Market = Market.Oanda

    # create fx galaxy of major forex booster pairs
    self.forex_galaxy = self.selectGalaxyStars()
    # create fx sample strategy
    self.forex_strategy = self.selectStrategy()
    # assign strategy to all stars in galaxy
    self.fusionGalaxyStrategy()

    self.SetWarmUp(timedelta(self.get_int_param("warmup_days")))

    self.show_csv_header = True  # hard-wired on; the "show_csv_header" parameter is not read
    self.toolbox.show_log("========================================")
    # show csv-header if any
    if self.show_csv_header:
        csv_columns = (
            "Trade-Time",
            "Currency-Pair",
            "Pair-Weight",
            "Data-Time",
            "Trade-Type",
            "Trend-Direction",
            "Model",
            "Confidence",
            "Magnitude",
            "Current-Level",
            "Entry-Level",
            "StopLoss-Level",
            "StopLoss-Purple",
            "Trade-Risk",
            "Break-Even",
            "Trade-Exit",
            "Trade-TakeProfit",
            "Start-Level",
            "Orange Risk-Level",
            "Given StopLoss",
            "Given TakeProfit",
            "Symbol Last Trade Profit",
            "Symbol unrealized Profit",
            "Symbol unrealized Profit %",
            "Portfolio Cash",
            "Portfolio Total-Profit",
            "Portfolio Total unrealized Profit",
        )
        # the header line starts with a separator, i.e. an empty first column
        self.toolbox.show_csv_log(self.csv_sep_chr + self.csv_sep_chr.join(csv_columns))
    self.toolbox.show_log("========================================")
# Optional: Be notified when securities change
def OnSecuritiesChanged(self, changes):
    """Security-change notification hook; intentionally a no-op.

    Args:
        changes: Change set passed in by the caller; ignored.
    """
    return None
# -----------------------------------------------------------------------------------------------------------
def get_bool_param(self, param, default=False, show_log=True):
    """Read a parameter and interpret it as a boolean.

    The raw value comes from ``self.get_parameter`` and is normally text.
    Plain ``bool()`` on text is true for every non-empty string, which made
    "false", "0" and the stringified default "False" all read as ``True``.
    Text is therefore parsed explicitly:

    * empty text, "false", "no", "off" and "none" (any case) -> ``False``
    * numeric text -> ``True`` unless the number is zero
    * any other non-empty text -> ``True`` (unchanged from before)

    Non-text values (for example ``None``) fall back to ``bool(value)``.

    Args:
        param: Parameter name.
        default: Fallback handed to ``get_parameter``.
        show_log: When truthy the parsed value is reported via ``log_param``.

    Returns:
        The parsed boolean.
    """
    raw_value = self.get_parameter(param, default)
    if isinstance(raw_value, str):
        text = raw_value.strip().lower()
        if text in ("", "false", "no", "off", "none"):
            result = False
        else:
            try:
                result = float(text) != 0
            except ValueError:
                # not a number and not a known "false" word
                result = True
    else:
        result = bool(raw_value)
    if show_log:
        self.log_param(param, result)
    return result
# -----------------------------------------------------------------------------------------------------------
def get_int_param(self, param, default=0, show_log=True):
    """Read a parameter and convert it with ``int()``.

    Args:
        param: Parameter name.
        default: Fallback handed to ``get_parameter``.
        show_log: When truthy the converted value is reported via ``log_param``.

    Returns:
        The converted integer; conversion errors from ``int()`` propagate.
    """
    number = int(self.get_parameter(param, default))
    if not show_log:
        return number
    self.log_param(param, number)
    return number
# -----------------------------------------------------------------------------------------------------------
def get_float_param(self, param, default=0.0, show_log=True):
    """Read a parameter and convert it with ``float()``.

    Args:
        param: Parameter name.
        default: Fallback handed to ``get_parameter``.
        show_log: When truthy the converted value is reported via ``log_param``.

    Returns:
        The converted float; conversion errors from ``float()`` propagate.
    """
    number = float(self.get_parameter(param, default))
    if not show_log:
        return number
    self.log_param(param, number)
    return number
# -----------------------------------------------------------------------------------------------------------
def get_str_param(self, param, default="", show_log=True):
    """Read a parameter and return it exactly as ``get_parameter`` delivered it.

    Args:
        param: Parameter name.
        default: Fallback handed to ``get_parameter``.
        show_log: When truthy the value is reported via ``log_param``.

    Returns:
        The unconverted value.
    """
    text = self.get_parameter(param, default)
    if not show_log:
        return text
    self.log_param(param, text)
    return text
# -----------------------------------------------------------------------------------------------------------
def get_parameter(self, param, default = None):
    """Fetch a raw parameter value via ``self.GetParameter``.

    Whatever ``GetParameter`` returns is passed through unchanged (it is not
    replaced by the default). Only when the lookup raises is the failure
    reported through the toolbox log and ``self.Debug``, and the stringified
    default returned instead.

    Args:
        param: Parameter name.
        default: Fallback, returned as ``str(default)`` on lookup failure.

    Returns:
        The raw parameter value, or ``str(default)`` if the lookup raised.
    """
    fallback = str(default)
    try:
        return self.GetParameter(param)
    except:
        message = param + " not found!"
        self.toolbox.show_log(message)
        self.Debug(message)
    return fallback
# -----------------------------------------------------------------------------------------------------------
def log_param(self, param, value):
    """Write one ``name=value`` line through the toolbox log.

    Args:
        param: Parameter name (text).
        value: Value to report; converted with ``str()``.
    """
    line = "=".join((param, str(value)))
    self.toolbox.show_log(line)
# -----------------------------------------------------------------------------------------------------------
def setDateMargins(self):
    """Apply the start/end date parameters and derive the chart window.

    Reads "start_date", "end_date", "chart_date_from", "chart_hour_from",
    "chart_date_to" and "chart_hour_to" (in that order) and sets
    ``ChartStartDate``, ``ChartStartHour``, ``ChartEndDate`` and
    ``ChartEndHour``. The chart window is finally clamped so it lies inside
    ``StartDate``..``EndDate`` and does not end before it starts.

    Assumes ``self.toolbox.get_date(text, "-")`` returns an indexable
    (year, month, day) whose year is not positive when no date was given -
    TODO confirm against AlgoToolbox.get_date.
    """
    # Local import on purpose: at module level a later `import datetime`
    # rebinds the name `datetime` to the module, which is not callable.
    from datetime import datetime as make_datetime

    # set start date
    start_date = self.toolbox.get_date(self.get_str_param("start_date"), "-")
    if start_date[0] > 0:
        self.SetStartDate(start_date[0], start_date[1], start_date[2])
    # set end date if not empty
    end_date = self.toolbox.get_date(self.get_str_param("end_date"), "-")
    if end_date[0] > 0:
        self.SetEndDate(end_date[0], end_date[1], end_date[2])

    # set chart start date (defaults to the algorithm start)
    chart_date_from = self.toolbox.get_date(self.get_str_param("chart_date_from"), "-")
    if chart_date_from[0] > 0:
        self.ChartStartDate = make_datetime(chart_date_from[0], chart_date_from[1], chart_date_from[2])
    else:
        self.ChartStartDate = self.StartDate
    # set chart start hour
    chart_hour_from = self.get_str_param("chart_hour_from")
    self.ChartStartHour = int(chart_hour_from) if chart_hour_from else 0

    # set chart end date (defaults to the algorithm end)
    chart_date_to = self.toolbox.get_date(self.get_str_param("chart_date_to"), "-")
    if chart_date_to[0] > 0:
        # fixed: the day used to be taken from chart_date_from
        self.ChartEndDate = make_datetime(chart_date_to[0], chart_date_to[1], chart_date_to[2])
    else:
        self.ChartEndDate = self.EndDate
    # set chart end hour
    chart_hour_to = self.get_str_param("chart_hour_to")
    self.ChartEndHour = int(chart_hour_to) if chart_hour_to else 0

    # adjust chart end/start dates; tzinfo is dropped only for the comparison
    if self.ChartStartDate.replace(tzinfo=None) < self.StartDate.replace(tzinfo=None):
        self.ChartStartDate = self.StartDate
    if self.ChartEndDate.replace(tzinfo=None) > self.EndDate.replace(tzinfo=None):
        self.ChartEndDate = self.EndDate
    if self.ChartStartDate.replace(tzinfo=None) > self.ChartEndDate.replace(tzinfo=None):
        self.ChartEndDate = self.ChartStartDate
# -----------------------------------------------------------------------------------------------------------
def selectGalaxy(self, galaxy_type):
    """Look up a named galaxy through a fresh ``ForexGalaxyBay``.

    Args:
        galaxy_type: Galaxy name, forwarded unchanged.

    Returns:
        Whatever ``ForexGalaxyBay.selectGalaxy`` returns for that name.
    """
    return ForexGalaxyBay().selectGalaxy(galaxy_type)
# base (override)
# -----------------------------------------------------------------------------------------------------------
def selectStrategy(self):
    """Base hook (override): supply the strategy object; the base yields ``None``."""
    return None
# base (override)
# -----------------------------------------------------------------------------------------------------------
def selectGalaxyStars(self):
    """Base hook (override): supply the galaxy of stars; the base yields ``None``."""
    return None
# -----------------------------------------------------------------------------------------------------------
def fusionGalaxyStrategy(self):
    """Attach ``self.forex_strategy`` to every value of ``self.forex_galaxy``."""
    for star in self.forex_galaxy.values():
        setattr(star, "strategy", self.forex_strategy)
# -------------------------------------------------------------------------------------------------------------
def OnData(self, data):
    """Remember the incoming slice and drive the strategy once warm-up is over.

    While ``self.IsWarmingUp`` is truthy only ``last_data`` is updated.
    Afterwards the strategy runs on every call, plus one extra run on each
    scalp edge and on each anchor edge.

    Args:
        data: Incoming data slice.
    """
    # saving last data
    self.last_data = data
    if self.IsWarmingUp:
        return
    # run each time
    self.forex_strategy.Run(data)
    # run on scalp edge (call Strategy Scalp-Run via Main-Algo)
    if self.scalp_time_elapsed():
        self.forex_strategy.Run(data, True)
    # run on anchor edge (call Strategy Anchor-Run via Main-Algo)
    if self.anchor_time_elapsed():
        self.forex_strategy.Run(data, False, True)
# -------------------------------------------------------------------------------------------------------------
def OnOrderEvent(self, orderEvent):
    """React to an order event.

    When a limit or stop-market order is filled, all open orders for the
    same symbol are cancelled. When the order is cancelled, the event is
    logged.

    Args:
        orderEvent: Event carrying the ``OrderId`` of the affected order.
    """
    order = self.Transactions.GetOrderById(orderEvent.OrderId)
    filled_exit_leg = order.Status == OrderStatus.Filled and (
        order.Type == OrderType.Limit or order.Type == OrderType.StopMarket
    )
    if filled_exit_leg:
        self.Transactions.CancelOpenOrders(order.Symbol)
    if order.Status == OrderStatus.Canceled:
        self.Log("Storno Order: " + str(orderEvent))
#--------------------------------------------------
# -----------------------------------------------------------------------------------------------------------
def scalp_time_elapsed(self):
    """Advance the scalp counter and report whether a scalp period just completed.

    The counter moves by ``min_scalp_shift - min_scalp_shift_offset`` per
    call and is reset to 0 when it lands exactly on ``scalp_period``.

    Returns:
        ``True`` on the call that completes the period, otherwise ``False``.
    """
    advanced = self.min_scalp_count + self.min_scalp_shift - self.min_scalp_shift_offset
    period_done = bool(advanced == self.scalp_period)
    self.min_scalp_count = 0 if period_done else advanced
    return period_done
# -----------------------------------------------------------------------------------------------------------
def anchor_time_elapsed(self):
    """Advance the anchor counter by one and report whether an anchor period just completed.

    The counter is reset to 0 when it lands exactly on ``anchor_period``.

    Returns:
        ``True`` on the call that completes the period, otherwise ``False``.
    """
    advanced = self.min_anchor_count + 1
    period_done = bool(advanced == self.anchor_period)
    self.min_anchor_count = 0 if period_done else advanced
    return period_done
# -------------------------------------------------------------------------------------------------------------
def pip_diff(self, open, close):
    """Delegate to ``self.toolbox.pip_diff`` and return its result unchanged.

    Args:
        open: First price, forwarded as-is.
        close: Second price, forwarded as-is.
    """
    difference = self.toolbox.pip_diff(open, close)
    return difference
# -------------------------------------------------------------------------------------------------------------
def add_pip(self, price, pip):
    """Delegate to ``self.toolbox.add_pip`` and return its result unchanged.

    Args:
        price: Price, forwarded as-is.
        pip: Pip amount, forwarded as-is.
    """
    shifted = self.toolbox.add_pip(price, pip)
    return shifted
# *************************************
class ForexGalaxyBay:
    """Catalogue of named forex "galaxies".

    Each galaxy is a dict mapping a currency-pair symbol to its weight and
    is stored on the instance as an ``fxPairs_*`` attribute.

    To use this library place this at the top:
        from ForexGalaxyBay import ForexGalaxyBay
    Then instantiate the class:
        fx = ForexGalaxyBay()
        for fx_ticker in fx.selectGalaxy("MajorTen"):
            self.AddForex(fx_ticker, resolution, market)
    """

    # galaxy name accepted by selectGalaxy -> attribute holding that galaxy
    _GALAXY_ALIASES = {
        "AgileTrident": "fxPairs_Fx_Agile_Trident",
        "AgileQuattro": "fxPairs_Fx_Agile_Quattro",
        "Test": "fxPairs_Fx_Test",
        "EurUsd": "fxPairs_EurUsd",
        "EurUsd_GbpUsd": "fxPairs_EURUSD_GBPUSD",
        "EurUsd_GbpUsd_E": "fxPairs_EURUSD_GBPUSD_EQUAL",
        "MajorFive": "fxPairs_MajorFive",
        "AgileSix": "fxPairs_Fx_Agile_Six",
        "AgileSixEven": "fxPairs_Fx_Agile_Six_Even",
        "AgileSixEqual": "fxPairs_Fx_Agile_Six_Equal",
        "AgileFiveEqual": "fxPairs_Fx_Agile_Five_Equal",
        "MagicTrinity": "fxPairs_MagicTrinity",
        "EurGbp": "fxPairs_EurGbp",
        "EurUsd_Orig": "fxPairs_EurUsd_Orig",
        "EURUSD_USDCHF": "fxPairs_EURUSD_USDCHF",
        "MajorForexBooster": "fxPairs_MajorForexBooster",
        "MajorForexBooster_Equal": "fxPairs_MajorForexBooster_Equal",
        "MFB_Equal": "fxPairs_MajorForexBooster_Equal",
        "MajorForexBooster_All": "fxPairs_MajorForexBooster_All",
        "MajorForexBooster_Asia": "fxPairs_MajorForexBooster_Asia",
        "MajorForexBooster_Asia_Ext": "fxPairs_MajorForexBooster_Asia_Ext",
        "MajorForexBooster_Cool": "fxPairs_MajorForexBooster_Cool",
        "MFB_Cool": "fxPairs_MajorForexBooster_Cool",
        "MFB_Cool_Variant": "fxPairs_MajorForexBooster_Cool_Variant",
        "MajorForexBooster_Dude": "fxPairs_MajorForexBooster_Dude",
        "MFB_Dude": "fxPairs_MajorForexBooster_Dude",
        "MajorForexBooster_BigDude": "fxPairs_MajorForexBooster_BigDude",
        "MFB_BigDude": "fxPairs_MajorForexBooster_BigDude",
        "MajorForexBooster_JPY": "fxPairs_MajorForexBooster_JPY",
        "MajorForexBooster_Orig": "fxPairs_MajorForexBooster_Orig",
        "MajorTen": "fxPairs_MajorTen",
        "MajorTen_Double": "fxPairs_MajorTen_Double",
        "MajorTen_Ext1": "fxPairs_MajorTen_Ext1",
        "MajorTen_Main": "fxPairs_MajorTen_Main",
        "MajorTen_OldStyle": "fxPairs_MajorTen_OldStyle",
        "MajorTen_Once": "fxPairs_MajorTen_Once",
        "MajorTen_Orig": "fxPairs_MajorTen_Orig",
    }
    # upper-cased once so that selectGalaxy can match names case-insensitively
    _GALAXY_BY_KEY = {alias.upper(): attr for alias, attr in _GALAXY_ALIASES.items()}
    # galaxy returned for a missing or unknown name
    _DEFAULT_GALAXY = "fxPairs_EurUsd"

    def __init__(self):
        """Build every galaxy as its own dict of ``symbol -> weight``."""
        # symbol groups shared by several equal-weight galaxies (order matters)
        major_ten = (
            "EURUSD", "USDJPY", "GBPUSD", "AUDUSD", "USDCAD",
            "USDCHF", "NZDUSD", "EURJPY", "GBPJPY", "EURGBP",
        )
        asia_crosses = (
            "AUDCAD", "AUDCHF", "AUDHKD", "AUDJPY", "AUDNZD", "CADHKD", "CADJPY",
            "CHFHKD", "CHFJPY", "EURHKD", "EURNZD", "GBPHKD", "GBPNZD", "HKDJPY",
            "NZDHKD", "NZDJPY", "SGDHKD", "SGDJPY", "TRYJPY", "USDHKD", "ZARJPY",
        )

        # --- Major Forex Booster family (Currency-Pair Symbol: Weight) ---
        self.fxPairs_MajorForexBooster_Orig = {
            "EURUSD": .27, "USDJPY": .13, "GBPUSD": .11,
            "AUDUSD": .09, "USDCAD": .06, "USDCHF": .06,
            "NZDUSD": .04, "EURJPY": .04, "GBPJPY": .04,
            "EURGBP": .03, "AUDJPY": .03, "EURAUD": .02,
            "EURCAD": .02, "AUDCAD": .02, "USDHKD": .01,
            "EURNZD": .01, "AUDNZD": .01, "USDMXN": .01,
        }
        # same content as the "Orig" galaxy, but an independent dict
        self.fxPairs_MajorForexBooster = dict(self.fxPairs_MajorForexBooster_Orig)
        self.fxPairs_MajorForexBooster_Equal = dict.fromkeys(
            major_ten + (
                "AUDJPY", "EURAUD", "EURCAD", "AUDCAD", "USDHKD",
                "EURNZD", "AUDNZD", "USDMXN", "CADJPY", "CADHKD",
            ),
            .05,
        )
        self.fxPairs_MajorForexBooster_Cool = {
            "EURUSD": .27, "USDJPY": .13, "GBPUSD": .11,
            "AUDUSD": .09, "USDCAD": .06, "USDCHF": .06,
            "NZDUSD": .04, "EURJPY": .04, "GBPJPY": .04,
            "EURGBP": .03, "AUDJPY": .03, "EURAUD": .02,
            "EURCAD": .02, "AUDCAD": .02, "USDHKD": .01,
            "EURNZD": .01, "AUDNZD": .01, "EURMXN": .01,
        }
        # variant without USDMXN
        self.fxPairs_MajorForexBooster_Cool_Variant = {
            "EURUSD": .27, "USDJPY": .12, "GBPUSD": .12,
            "AUDUSD": .11, "USDCAD": .06, "USDCHF": .06,
            "NZDUSD": .04, "EURJPY": .04, "GBPJPY": .04,
            "EURGBP": .03, "AUDJPY": .03, "EURAUD": .02,
            "EURCAD": .02, "AUDCAD": .02, "USDHKD": .01,
            "EURNZD": .01, "AUDNZD": .01,
        }
        self.fxPairs_MajorForexBooster_Dude = {
            "EURUSD": .27, "USDJPY": .13, "GBPUSD": .11,
            "AUDUSD": .10, "USDCAD": .07, "USDCHF": .07,
            "NZDUSD": .05, "EURGBP": .04, "EURAUD": .03,
            "EURCAD": .03, "AUDCAD": .03, "EURNZD": .02,
            "AUDNZD": .02, "USDMXN": .02, "USDHKD": .01,
        }
        self.fxPairs_MajorForexBooster_BigDude = {
            "EURUSD": .23, "USDJPY": .11, "GBPUSD": .10,
            "AUDUSD": .10, "USDMXN": .10, "USDCAD": .07,
            "USDCHF": .07, "EURCAD": .04, "AUDCAD": .04,
            "NZDUSD": .04, "EURGBP": .03, "EURAUD": .02,
            "EURNZD": .02, "AUDNZD": .02, "USDHKD": .01,
        }
        # the source listed USDJPY twice in the next three galaxies; a dict
        # keeps one entry per key, so each symbol appears once here
        self.fxPairs_MajorForexBooster_JPY = dict.fromkeys(
            major_ten + (
                "AUDCAD", "AUDJPY", "AUDNZD", "CADJPY",
                "EURNZD", "GBPNZD", "SGDJPY", "NZDJPY",
            ),
            .05,
        )
        self.fxPairs_MajorForexBooster_Asia_Ext = dict.fromkeys(major_ten + asia_crosses, .03125)
        self.fxPairs_MajorForexBooster_Asia = dict.fromkeys(
            ("USDJPY", "AUDUSD", "NZDUSD", "EURJPY", "GBPJPY") + asia_crosses,
            .037,
        )
        self.fxPairs_MajorForexBooster_All = dict.fromkeys(
            (
                "AUDCAD", "AUDCHF", "AUDHKD", "AUDJPY", "AUDNZD", "AUDUSD",
                "CADCHF", "CADHKD", "CADJPY", "CADSGD",
                "CHFHKD", "CHFJPY", "CHFZAR",
                "EURAUD", "EURCAD", "EURCHF", "EURCZK", "EURDKK", "EURGBP",
                "EURHKD", "EURHUF", "EURJPY", "EURNOK", "EURNZD", "EURPLN",
                "EURSEK", "EURSGD", "EURTRY", "EURUSD", "EURZAR",
                "GBPAUD", "GBPCAD", "GBPCHF", "GBPHKD", "GBPJPY",
                "GBPNZD", "GBPPLN", "GBPSGD", "GBPUSD", "GBPZAR",
                "HKDJPY",
                "NZDCAD", "NZDCHF", "NZDHKD", "NZDJPY", "NZDSGD", "NZDUSD",
                "SGDCHF", "SGDHKD", "SGDJPY",
                "TRYJPY",
                "USDCAD", "USDCHF", "USDCNH", "USDCZK", "USDDKK", "USDHKD",
                "USDHUF", "USDJPY", "USDMXN", "USDNOK", "USDPLN", "USDSEK",
                "USDSGD", "USDTHB", "USDTRY", "USDZAR",
                "ZARJPY",
            ),
            .01429,
        )
        # EURUSD is the only pair in "All" with a different weight
        self.fxPairs_MajorForexBooster_All["EURUSD"] = .04289

        # --- small hand-picked galaxies ---
        self.fxPairs_MagicTrinity = {"EURUSD": .5, "GBPUSD": .3, "USDJPY": .2}
        self.fxPairs_MajorFive = {
            "EURUSD": .4, "USDJPY": .2, "GBPUSD": .2, "AUDUSD": .1, "USDCAD": .1,
        }
        self.fxPairs_EurUsd = {"EURUSD": 1}
        self.fxPairs_EurGbp = {"EURGBP": 1}
        self.fxPairs_EURUSD_USDCHF = {"EURUSD": .5, "USDCHF": .5}
        self.fxPairs_EURUSD_GBPUSD = {"EURUSD": .7, "GBPUSD": .3}
        self.fxPairs_EURUSD_GBPUSD_EQUAL = {"EURUSD": .5, "GBPUSD": .5}
        self.fxPairs_EurUsd_Orig = {"EURUSD": 1}

        # --- Major Ten family ---
        self.fxPairs_MajorTen_Once = dict.fromkeys(major_ten, .1)
        self.fxPairs_MajorTen_Double = dict.fromkeys(major_ten, .2)
        self.fxPairs_MajorTen_Orig = {
            "EURUSD": .33, "USDJPY": .16, "GBPUSD": .13, "AUDUSD": .07, "USDCAD": .06,
            "USDCHF": .06, "NZDUSD": .05, "EURJPY": .05, "GBPJPY": .05, "EURGBP": .04,
        }
        self.fxPairs_MajorTen_Ext1 = {
            "EURUSD": .3, "USDJPY": .1, "AUDNZD": .05, "AUDUSD": .05, "USDCAD": .1,
            "USDCHF": .1, "NZDUSD": .05, "EURJPY": .1, "GBPJPY": .1, "EURGBP": .05,
        }
        self.fxPairs_MajorTen = dict.fromkeys(major_ten, .1)
        self.fxPairs_MajorTen_Main = dict.fromkeys(major_ten, .1)
        self.fxPairs_MajorTen_OldStyle = dict.fromkeys(
            (
                "EURUSD", "EURCAD", "CHFUSD", "EURAUD", "GBPNZD",
                "NZDEUR", "GBPEUR", "USDCAD", "AUDCAD", "AUDNZD",
            ),
            .1,
        )

        # --- Agile family ---
        self.fxPairs_Fx_Agile_Six = {
            "EURUSD": .5, "GBPUSD": .2, "AUDUSD": .1,
            "EURGBP": .1, "HKDJPY": .05, "USDJPY": .05,
        }
        self.fxPairs_Fx_Agile_Six_Even = {
            "EURUSD": .4, "GBPUSD": .2, "AUDUSD": .1,
            "EURGBP": .1, "HKDJPY": .1, "USDJPY": .1,
        }
        self.fxPairs_Fx_Agile_Six_Equal = {
            "EURUSD": .5, "GBPUSD": .1, "AUDUSD": .1,
            "EURGBP": .1, "HKDJPY": .1, "USDJPY": .1,
        }
        self.fxPairs_Fx_Agile_Five_Equal = {
            "EURUSD": .5, "GBPUSD": .2, "AUDUSD": .1, "HKDJPY": .1, "USDJPY": .1,
        }
        self.fxPairs_Fx_Agile_Quattro = {
            "EURUSD": .4, "GBPUSD": .3, "AUDUSD": .2, "HKDJPY": .1,
        }
        # (was assigned twice with identical content; one definition is enough)
        self.fxPairs_Fx_Agile_Trident = {"EURUSD": .5, "GBPUSD": .3, "AUDUSD": .2}

        # single-pair galaxy for isolated testing; swap in any one symbol with weight 1
        self.fxPairs_Fx_Test = {"EURUSD": 1}

    def selectGalaxy(self, galaxy_type = None):
        """Return the galaxy registered under ``galaxy_type``.

        The name is matched case-insensitively, ignoring surrounding
        whitespace. A missing (``None``) or unknown name yields the default
        EURUSD-only galaxy.

        Args:
            galaxy_type: Galaxy name such as "MajorTen" or "MFB_Cool".

        Returns:
            The dict stored on this instance for that galaxy (not a copy).
        """
        if galaxy_type is None:
            # the declared default used to crash on None.strip()
            return getattr(self, self._DEFAULT_GALAXY)
        lookup_key = galaxy_type.strip().upper()
        attribute = self._GALAXY_BY_KEY.get(lookup_key, self._DEFAULT_GALAXY)
        return getattr(self, attribute)


'''
To use this library place this at the top:
from AlgoToolbox import AlgoToolbox
Then instantiate the class:
toolbox = AlgoToolbox(self, True, True)
toolbox.show_log(...)
'''
# ********************************
class AlgoToolbox:
# ********************************
# -------------------------------------------------------------------------------------------------------------
def __init__(self, qcAlgo, showDebug = False, showLog = False):
# -------------------------------------------------------------------------------------------------------------
self.qcAlgo = qcAlgo
self.showDebug = showDebug
self.showLog = showLog
# -------------------------------------------------------------------------------------------------------------
def Log(self, msg, prefix = "", suffix = ""):
    # -------------------------------------------------------------------------------------------------------------
    """Write one line to the algorithm log, unconditionally.

    The line is "LOG: <prefix> : <algorithm time> - <msg><suffix>".
    msg, prefix and suffix must be strings.
    """
    pieces = ("LOG: ", prefix, " : ", str(self.qcAlgo.Time), " - ", msg, suffix)
    self.qcAlgo.Log("".join(pieces))
# -------------------------------------------------------------------------------------------------------------
def show_log(self, msg, prefix = "", suffix = ""):
    # -------------------------------------------------------------------------------------------------------------
    """Forward to Log() only while the showLog switch is on."""
    if not self.showLog:
        return
    self.Log(msg, prefix, suffix)
# -------------------------------------------------------------------------------------------------------------
def show_csv_log(self, msg, sep_char = ";", hide_prefix = True):
    # -------------------------------------------------------------------------------------------------------------
    """Log a raw (CSV-style) line while the showLog switch is on.

    With hide_prefix the text is logged as-is; otherwise it is preceded
    by "LOG" and sep_char. No timestamp is added.
    """
    if not self.showLog:
        return
    text = str(msg)
    line = text if hide_prefix else "LOG" + sep_char + text
    self.qcAlgo.Log(line)
# -------------------------------------------------------------------------------------------------------------
def Debug(self, msg, prefix = "", suffix = ""):
    # -------------------------------------------------------------------------------------------------------------
    """Send one line to the algorithm's Debug output, unconditionally.

    The line is "DEBUG: <prefix> : <algorithm time> - <msg><suffix>".
    msg, prefix and suffix must be strings.
    """
    pieces = ("DEBUG: ", prefix, " : ", str(self.qcAlgo.Time), " - ", msg, suffix)
    self.qcAlgo.Debug("".join(pieces))
# -------------------------------------------------------------------------------------------------------------
def show_debug(self, msg, prefix = "", suffix = ""):
    # -------------------------------------------------------------------------------------------------------------
    """Forward to Debug() only while the showDebug switch is on.

    Fix: the original tested self.showLog here, so the showDebug flag
    passed to the constructor had no effect and debug output followed the
    log switch instead.
    """
    if self.showDebug:
        self.Debug(msg, prefix, suffix)
# -------------------------------------------------------------------------------------------------------------
def pip_diff(self, open, close):
    # -------------------------------------------------------------------------------------------------------------
    """Return (close - open) expressed in whole pips, as an int.

    The scaling factor is taken from pip_multiplier(open); the scaled
    difference is rounded to the nearest integer.
    """
    scale = self.pip_multiplier(open)
    return int(round((close - open) * scale))
# -------------------------------------------------------------------------------------------------------------
def add_pip(self, price, pip):
    # -------------------------------------------------------------------------------------------------------------
    """Return price shifted by `pip` pips (a negative pip shifts downwards).

    One pip is worth pip_divider(price) in price units.
    """
    pip_value = self.pip_divider(price)
    return price + pip * pip_value
# -------------------------------------------------------------------------------------------------------------
def pip_multiplier(self, price):
    # -------------------------------------------------------------------------------------------------------------
    """Return the factor that converts a price difference into pips.

    The factor is 10 ** (5 - d), where d is the number of digits in the
    integer part of the price (e.g. 1.12345 -> 10000, 110.25 -> 100).

    Fix: the original located the decimal point with str(price).index('.'),
    which raised ValueError for ints and for floats printed in scientific
    notation (e.g. 1e-05), and counted a leading minus sign as a digit.
    The digit count is now taken from the integer part of abs(price); for
    ordinary positive float prices the result is unchanged.
    """
    pip_sign = 5
    whole_digits = len(str(int(abs(price))))
    return 10 ** (pip_sign - whole_digits)
# -------------------------------------------------------------------------------------------------------------
def pip_divider(self, price):
    # -------------------------------------------------------------------------------------------------------------
    """Return the size of one pip in price units: 1 / pip_multiplier(price)."""
    return 1 / self.pip_multiplier(price)
# -------------------------------------------------------------------------------------------------------------
def pip_round(self, price):
    # -------------------------------------------------------------------------------------------------------------
    """Round a price to 3 decimals when it is above 10, otherwise to 5."""
    decimals = 3 if price > 10 else 5
    return round(price, decimals)
# -----------------------------------------------------------------------------------------------------------
def get_date(self, strDate, splitChar):
    # -----------------------------------------------------------------------------------------------------------
    """Split a "year<sep>month<sep>day" string into an int triple.

    Returns (year, month, day). An empty, None or whitespace-only input
    yields (0, 0, 0). Only the first three fields are read.
    """
    if not strDate or strDate.isspace():
        return 0, 0, 0
    fields = strDate.split(splitChar)
    return int(fields[0]), int(fields[1]), int(fields[2])
# -----------------------------
def cut_by_zero_one(self, level):
    # -----------------------------
    """Clamp level into [0, 1]; values already inside are returned as-is."""
    if level > 1:
        return 1
    if level < 0:
        return 0
    return level
# -----------------------------
def cut_by_one_minusone(self, level):
    # -----------------------------
    """Clamp level into [-1, 1]; values already inside are returned as-is."""
    if level > 1:
        return 1
    if level < -1:
        return -1
    return level
# -----------------------------
def normalize_percent(self, value):
    # -----------------------------
    """Convert a percent figure to a fraction and clamp it to [-1, 1]."""
    fraction = value / 100
    return self.cut_by_one_minusone(fraction)
# -----------------------------
def normalize_percent_abs(self, value):
    # -----------------------------
    """Convert a percent figure to a fraction in [0, 1], dropping the sign.

    Fix: in the original the return statement was fused on one line with
    the following `from AlgoToolbox import AlgoToolbox` statement, which is
    a syntax error; the import now sits on its own line after the method.
    """
    return abs(self.cut_by_one_minusone(value / 100))


from AlgoToolbox import AlgoToolbox
from QuantConnect import *
from numpy import diff
import numpy as np
# ===================================
class ForexSymbolBrick:
# ===================================
# ---------------------
def __init__(self,qcAlgo, ticker, weight, resolution, anchorResolution, scalpPeriod, anchorPeriod,
showDebug = False, showLog = False, anchor_max_slots = 24, scalp_max_slots = 10):
# ---------------------
# Build the per-symbol state holder ("brick") for one forex ticker.
#
# What the constructor does, in order:
#   * copies configuration values from qcAlgo (csv_sep_chr, the lookback settings,
#     drawdown_on_trade_start, exit_1_profit_guaranteed),
#   * zero-initialises EMA values, state-machine constants, trade levels,
#     order flags, semafors and the "fluent" OHLC values,
#   * subscribes the ticker via qcAlgo.AddForex and keeps its Symbol,
#   * creates the quote-bar rolling windows and the two ROC indicators,
#   * calls create_indicators() and create_consolidators(),
#   * creates the two Chart objects, the series-name constants and calls
#     create_scalp_chart() / create_anchor_chart().
#
# Parameters:
#   qcAlgo            algorithm object; must expose the attributes read below
#                     (toolbox, csv_sep_chr, Market, AddForex, ROC, ...)
#   ticker            forex ticker passed to AddForex; also used in the chart names
#   weight            stored as self.weight (passed on by Emit_Insight)
#   resolution        resolution for AddForex and for the scalp ROC
#   anchorResolution  resolution for the anchor ROC
#   scalpPeriod       period of the scalp ROC
#   anchorPeriod      period of the anchor ROC
#   showDebug/showLog forwarded to the AlgoToolbox created here
#   anchor_max_slots  size of the anchor quote-bar rolling window
#   scalp_max_slots   size of the scalp quote-bar rolling window
self.qcAlgo = qcAlgo
self.strategy = None
# NOTE: this reference is replaced further down by a freshly built AlgoToolbox
self.toolbox = qcAlgo.toolbox
self.data = None
self.csv_sep_chr = self.qcAlgo.csv_sep_chr
self.StopLoss = None
self.ProfitTarget = None
self.anchor_lookback_for_trade_enabled = self.qcAlgo.anchor_lookback_for_trade_enabled
self.direction_lookback = self.qcAlgo.direction_lookback
self.indicator_lookback = self.qcAlgo.indicator_lookback
self.drawdown_on_trade_start = self.qcAlgo.drawdown_on_trade_start
self.exit_1_profit_guaranteed = self.qcAlgo.exit_1_profit_guaranteed
# scalp ema props
self.scalp_ema_fast = 0
self.scalp_ema_slow = 0
# anchor ema props
self.anchor_ema_fast = 0
self.anchor_ema_slow = 0
# fast-/slow-ema inter-delta props (pips)
self.anchor_emas_pips_delta = 0
self.scalp_emas_pips_delta = 0
# trade control props
self.anchor_warmed_up = False
self.check_trade_allowed = False
# machine states (integer constants; short-side codes are the negated long-side codes)
self.STAND_BY = 0
self.TRIGGER_LONG = 1
self.TRIGGER_SHORT = -1
self.ENTRY_LONG = 10
self.ENTRY_SHORT = -10
self.EXIT_SHORT = -100
self.EXIT_LONG = 100
self.EXIT_1_SHORT_BASE = -101
self.EXIT_1_LONG_BASE = 101
self.EXIT_2_SHORT_BASE = -102
self.EXIT_2_LONG_BASE = 102
self.TRADE_CONTINUE_SHORT_1 = -201
self.TRADE_CONTINUE_LONG_1 = 201
self.TRADE_CONTINUE_SHORT_2 = -202
self.TRADE_CONTINUE_LONG_2 = 202
self.TRADE_SKIP_SHORT_1 = -301
self.TRADE_SKIP_LONG_1 = 301
self.TRADE_SKIP_SHORT_2 = -302
self.TRADE_SKIP_LONG_2 = 302
self.STOP_LOSS_SHORT = -401
self.STOP_LOSS_LONG = 401
self.STOP_LOSS_TAKE_PROFIT_SHORT = -402
self.STOP_LOSS_TAKE_PROFIT_LONG = 402
self.TREND_REVERSION_SHORT_TO_LONG = -403
self.TREND_REVERSION_LONG_TO_SHORT = 403
self.DROP_TRIGGER_SHORT = -404
self.DROP_TRIGGER_LONG = 404
self.trade_state = self.STAND_BY
# trade business props
self.trade_magnitude_scalp = 0
self.trade_magnitude_anchor = 0
self.trade_diff = 0
self.trade_risk = 0
self.trade_enter = 0
self.trade_break_even = 0
self.trade_stop_loss = 0
self.trade_take_profit = 0
self.trade_exit = 0
self.trade_start_level = 0
self.trade_take_profit_level = 0
self.trade_given_stop_loss = 0
self.trade_orange_risk_level = 0
self.stop_loss_level = 0
self.stop_loss_purple_level = 0
self.trade_lowest_low = 0
self.entry_level = 0
# NOTE: trade_take_profit was already set to 0 a few lines above; this repeats it
self.trade_take_profit = 0
self.trade_abs_start_level = 0
self.trade_abs_stop_level = 0
# order limit props
self.Veto_CancelOrders = False
self.Ignore_Cancel_Event = False
self.ProfitTargetCanceled = False
self.ProfitTarget_LastUpdate = None
self.StopLossCanceled = False
self.StopLoss_LastUpdate = None
# semafor props
self.IsLevel_TradeEnabled_Semafors = {}
self.Direction_Semafors = {}
self.Indicator_Deltas = {}
self.IsLevel_Trigger_Semafor = False
self.IsLevel_Entry_Semafor = False
self.IsLevel_Exit_1_Semafor = False
self.IsLevel_Exit_2_Semafor = False
self.IsLevel_StopLoss_Semafor = False
self.IsLevel_OrangeRisk_Semafor = False
# trend props
# self.is_trend_changed = False
self.is_narrow_ema_anchor = False
self.is_narrow_ema_scalp = False
self.is_short_trade = False
self.is_long_trade = False
self.anchor_fast_indicator_margin = 0
self.anchor_slow_indicator_margin = 0
self.scalp_fast_indicator_margin = 0
self.scalp_mid_indicator_margin = 0
self.scalp_slow_indicator_margin = 0
# scalp fluent props
self.scalp_fluent_time = 0
self.scalp_fluent_open = 0
self.scalp_fluent_high = 0
self.scalp_fluent_low = 0
self.scalp_fluent_close= 0
self.scalp_new_fluent_data = True
# anchor fluent props
self.anchor_fluent_time = 0
self.anchor_fluent_open = 0
self.anchor_fluent_high = 0
self.anchor_fluent_low = 0
self.anchor_fluent_close= 0
self.anchor_new_fluent_data = True
# trade size props
self.trade_units = 0
# NOTE: lot_size (here) and lotSize (set further down) are two separate attributes
self.lot_size = 0
# trade props
self.ticker = ticker
self.resolution = resolution
self.anchorResolution = anchorResolution
self.direction_trade = InsightDirection.Flat
# subscribe the forex pair on the market configured on the algorithm
self.forex = self.qcAlgo.AddForex(ticker, self.resolution, self.qcAlgo.Market)
# trade insight props
self.symbol = self.forex.Symbol
self.weight = weight
self.scalpPeriod = scalpPeriod
self.direction_trend = InsightDirection.Flat
self.trade_model = "STAND_BY"
self.trade_confidence = 0
self.trade_magnitude = 0
# order lot size props
self.lotSize = 1
self.lotsCount = 0
self.orderSize = 0
self.roundedOrderSize = 0
# profit benchmarks
self.LastTradeProfit = 0
self.UnrealizedProfit = 0
self.UnrealizedProfitPercent = 0
self.Portfolio_Cash = 0
self.Portfolio_TotalProfit = 0
self.Portfolio_TotalUnrealizedProfit = 0
self.scalp_update_count = {}
self.scalp_periods = []
self.anchorPeriod = anchorPeriod
self.anchor_update_count = {}
self.anchor_periods = []
self.scalp_max_slots = scalp_max_slots
self.anchor_max_slots = anchor_max_slots
# self.scalp_double_period = self.scalpPeriod ] 2
self.bbUpperPrevious = 0
# rolling windows of consolidated quote bars, sized by the *_max_slots arguments
self.scalpQuoteBar_RollingWindow = RollingWindow[QuoteBar](self.scalp_max_slots)
self.anchorQuoteBar_RollingWindow = RollingWindow[QuoteBar](self.anchor_max_slots)
self.scalpIndicators = {}
self.anchorIndicators = {}
# scalp Rate of Change indicator
self.scalp_ROC = self.qcAlgo.ROC(self.symbol, scalpPeriod, self.resolution)
# anchor Rate of Change indicator
self.anchor_ROC = self.qcAlgo.ROC(self.symbol, anchorPeriod, self.anchorResolution)
self.scalpIndicators_RollingWindows = {}
self.anchorIndicators_RollingWindows = {}
self.showDebug = showDebug
self.showLog = showLog
# replaces the qcAlgo.toolbox reference stored at the top of this method
self.toolbox = AlgoToolbox(self.qcAlgo, self.showDebug, self.showLog)
# must run after the indicator dictionaries above exist; create_indicators()
# obtains the indicator sets from new_scalp_indicators() / new_anchor_indicators()
self.create_indicators()
self.create_consolidators()
self.scalp_chart = Chart(self.ticker + "-Scalp")
self.anchor_chart = Chart(self.ticker + "-Anchor")
# agnostic series (anchor / scalp)
self.price_chart_serial_open = "Open"
self.price_chart_serial_close = "Close"
self.price_chart_serial_close_consolidated = "Close-Bar"
self.price_chart_serial_open_price_point = "Open-PT"
self.price_chart_serial_close_price_point = "Close-PT"
self.price_chart_serial_high_price_point = "High-PT"
self.price_chart_serial_low_price_point = "Low-PT"
self.price_chart_serial_ema_cross_point = "EMA-Cross"
# scalp series shorts (lines)
self.price_chart_serial_short_entry = "Entry-S"
self.price_chart_serial_short_stop_loss = "StopL-S"
self.price_chart_serial_short_exit_1 = "Exit1-S"
self.price_chart_serial_short_exit_2 = "Exit2-S"
# scalp series shorts (points)
self.price_chart_serial_short_trigger_point = "Trigger-SPT"
self.price_chart_serial_short_entry_point = "Entry-SPT"
self.price_chart_serial_short_stop_loss_point = "StopL-SPT"
self.price_chart_serial_short_exit_1_point = "Exit1-SPT"
self.price_chart_serial_short_exit_2_point = "Exit2-SPT"
# scalp series longs (lines)
self.price_chart_serial_long_entry = "Entry-L"
self.price_chart_serial_long_stop_loss = "StopL-L"
self.price_chart_serial_long_exit_1 = "Exit1-L"
self.price_chart_serial_long_exit_2 = "Exit2-L"
# scalp series longs (points)
self.price_chart_serial_long_trigger_point = "Trigger-LPT"
self.price_chart_serial_long_entry_point = "Entry-LPT"
self.price_chart_serial_long_stop_loss_point = "StopL-LPT"
self.price_chart_serial_long_exit_1_point = "Exit1-LPT"
self.price_chart_serial_long_exit_2_point = "Exit2-LPT"
# Define Scalp-Series: e.g. self.scalp_ticker("ema", 8)
# Define Anchor-Series: e.g. self.anchor_ticker("ema", 8)
self.create_scalp_chart()
self.create_anchor_chart()
# [ PREDICTIONS REGION (BEGIN) ]
# ---------------------
def Emit_Insight(self):
    # ---------------------
    """Emit an insight built from the brick's current trend state.

    Passes scalpPeriod, direction_trend, trade_magnitude, trade_confidence,
    trade_model and weight (in that order) to emit_insight().
    """
    args = (
        self.scalpPeriod,
        self.direction_trend,
        self.trade_magnitude,
        self.trade_confidence,
        self.trade_model,
        self.weight,
    )
    self.emit_insight(*args)
# ---------------------
def emit_insight(self, time_delta_min, direction, magnitude=None, confidence=None, model=None, weight=None):
    # ---------------------
    """Emit a price insight for this brick's symbol.

    Does nothing unless qcAlgo.emit_insights is truthy. The insight covers
    `time_delta_min` minutes. A Flat direction produces a plain
    Insight.Price(...) without the optional fields; any other direction
    produces a full Insight carrying magnitude, confidence, model and weight.
    """
    # validation
    if not self.qcAlgo.emit_insights:
        return
    horizon = timedelta(minutes=time_delta_min)
    if direction == InsightDirection.Flat:
        signal = Insight.Price(self.symbol, horizon, InsightDirection.Flat)
    else:
        signal = Insight(
            self.symbol, horizon, InsightType.Price,
            direction, magnitude, confidence, model, weight)
    self.qcAlgo.EmitInsights(signal)
# [ PREDICTIONS REGION (END) ]
# [ QUOTE BARS REGION (BEGIN) ]
# ---------------------
def create_consolidators(self):
    # ---------------------
    """Create the scalp and anchor quote-bar consolidators and wire them up.

    Each consolidator spans scalpPeriod / anchorPeriod minutes, gets its
    bar handler attached and is registered with the algorithm's
    SubscriptionManager for this brick's symbol.
    """
    # build both consolidators first (scalp, then anchor)
    scalp = QuoteBarConsolidator(timedelta(minutes=self.scalpPeriod))
    self.scalpConsolidator = scalp
    anchor = QuoteBarConsolidator(timedelta(minutes=self.anchorPeriod))
    self.anchorConsolidator = anchor
    # hook up the event handlers
    scalp.DataConsolidated += self.ScalpBarsHandler
    anchor.DataConsolidated += self.AnchorBarsHandler
    # register with the engine so the consolidators receive updates
    for consolidator in (scalp, anchor):
        self.qcAlgo.SubscriptionManager.AddConsolidator(self.symbol, consolidator)
# ---------------------
def ScalpBarsHandler(self, sender, quoteBar):
    # ---------------------
    """Handle a consolidated scalp bar.

    Stores the bar in the scalp rolling window, feeds its close into the
    scalp ROC and every registered scalp indicator (recording each
    indicator's current point in its rolling window), then plots the scalp
    chart when the algorithm is past warm-up, IsChartTime() accepts the bar
    time and the bar hour lies within ChartStartHour..ChartEndHour
    (a ChartStartHour of 0 disables the hour filter).
    """
    #self.toolbox.show_debug(str(quoteBar), "Adding Scalp-Bar: ")
    stamp, close = quoteBar.EndTime, quoteBar.Close
    self.scalpQuoteBar_RollingWindow.Add(quoteBar)
    self.scalp_ROC.Update(stamp, close)
    for key, indicator in self.scalpIndicators.items():
        window = self.scalpIndicators_RollingWindows[key]
        indicator.Update(stamp, close)
        window.Add(indicator.Current)
    # plot scalp chart
    algo = self.qcAlgo
    if (not algo.IsWarmingUp
            and self.IsChartTime(quoteBar.Time)
            and (algo.ChartStartHour == 0
                 or algo.ChartStartHour <= quoteBar.Time.hour <= algo.ChartEndHour)):
        self.strategy.plot_scalp_chart(self, quoteBar)
    # Call Scalp-Run (call Strategy Scalp-Run via Brick-Scalp event handler)
    # self.strategy.RunScalp(self)
# ---------------------
def AnchorBarsHandler(self, sender, quoteBar):
    # ---------------------
    """Handle a consolidated anchor bar.

    Stores the bar in the anchor rolling window, feeds its close into the
    anchor ROC and every registered anchor indicator (recording each
    indicator's current point in its rolling window), then plots the
    anchor chart when the algorithm is past warm-up and IsChartTime()
    accepts the bar time.
    """
    #self.toolbox.show_debug(str(quoteBar), "Adding Anchor-Bar: ")
    stamp, close = quoteBar.EndTime, quoteBar.Close
    self.anchorQuoteBar_RollingWindow.Add(quoteBar)
    self.anchor_ROC.Update(stamp, close)
    for key, indicator in self.anchorIndicators.items():
        window = self.anchorIndicators_RollingWindows[key]
        indicator.Update(stamp, close)
        window.Add(indicator.Current)
    # plot anchor chart
    if not self.qcAlgo.IsWarmingUp and self.IsChartTime(quoteBar.Time):
        self.strategy.plot_anchor_chart(self, quoteBar)
    # Call Anchor-Run (call Strategy Anchor-Run via Brick-Anchor event handler)
    # self.strategy.RunAnchor(self)
# [ QUOTE BARS REGION (END) ]
# [ TRADE RULES/EVENTS REGION (BASE / BEGIN) ]
# base
# ---------------------
def Check_Trade_Rules(self, data):
    # ---------------------
    """Base trade-rule check: refresh the trade gate and profit benchmarks.

    `data` is not used by this base implementation.
    """
    # TODO: Check Trade Rules (Trigger/Entry/StopLoss/Exit) here
    # and call self.strategy.OnTrigger/OnEntry/OnStopLoss/OnExit if needed
    allowed = self.AllowCheckTrade()
    self.check_trade_allowed = allowed
    # get profit benchmarks
    self.get_profit_benchmarks()
# ---------------------
def get_profit_benchmarks(self):
    # ---------------------
    """Snapshot profit figures for this symbol and for the whole portfolio.

    Writes Symbol_LastTradeProfit, Symbol_UnrealizedProfit,
    Symbol_UnrealizedProfitPercent, Portfolio_Cash, Portfolio_TotalProfit
    and Portfolio_TotalUnrealizedProfit on the brick.
    """
    portfolio = self.qcAlgo.Portfolio
    holding = portfolio[self.symbol]
    self.Symbol_LastTradeProfit = holding.LastTradeProfit
    self.Symbol_UnrealizedProfit = holding.UnrealizedProfit
    self.Symbol_UnrealizedProfitPercent = holding.UnrealizedProfitPercent
    self.Portfolio_Cash = portfolio.Cash
    self.Portfolio_TotalProfit = portfolio.TotalProfit
    self.Portfolio_TotalUnrealizedProfit = portfolio.TotalUnrealizedProfit
# ---------------------
def Check_Trade_Direction(self, ind_fast, ind_slow, o,h,l,c, tolerance = 0, touch_margins = "open_close"):
    # ---------------------
    """Base direction check: flatten the trade direction and set anchor margins.

    This base implementation always returns InsightDirection.Flat; the
    o/h/l/c prices and touch_margins are accepted but not used here.
    """
    self.trade_direction_flattening()
    self.Set_Anchor_Indicators_Margins(ind_fast, ind_slow, tolerance)
    flat = InsightDirection.Flat
    return flat
# ---------------------
def trade_direction_flattening(self):
    # ---------------------
    """Reset the trade direction to InsightDirection.Flat."""
    flat = InsightDirection.Flat
    self.direction_trade = flat
# ---------------------
def trend_direction_flattening(self):
    # ---------------------
    """Reset the trend direction to InsightDirection.Flat."""
    flat = InsightDirection.Flat
    self.direction_trend = flat
# ---------------------
def trade_direction_str(self):
    # ---------------------
    """Return "LONG", "SHORT" or "WAIT" for the current trade direction."""
    if self.direction_trade == InsightDirection.Up:
        return "LONG"
    if self.direction_trade == InsightDirection.Down:
        return "SHORT"
    return "WAIT"
# ---------------------
def set_trend_direction(self):
    # ---------------------
    """Derive the trend direction from the sign of trade_magnitude.

    Positive -> Up, negative -> Down, otherwise the trend is flattened via
    trend_direction_flattening(). Returns the resulting direction_trend.
    """
    if self.trade_magnitude > 0:
        self.direction_trend = InsightDirection.Up
    elif self.trade_magnitude < 0:
        self.direction_trend = InsightDirection.Down
    else:
        self.trend_direction_flattening()
    return self.direction_trend
# ---------------------
def trend_direction_str(self):
    # ---------------------
    """Return "UP", "DOWN" or "FLAT" for the current trend direction."""
    if self.direction_trend == InsightDirection.Up:
        return "UP"
    if self.direction_trend == InsightDirection.Down:
        return "DOWN"
    return "FLAT"
# ---------------------
def Set_Anchor_Indicators_Margins(self, ind_fast, ind_slow, tolerance = 0):
    # ---------------------
    """Store the anchor fast/slow indicator margins, widened by `tolerance`.

    The margins start as the raw indicator values. When the two values
    differ, each margin is pushed away from the other indicator using
    minus_pip()/plus_pip(); when they are equal the raw values are kept.
    """
    self.anchor_fast_indicator_margin, self.anchor_slow_indicator_margin = ind_fast, ind_slow
    if ind_fast < ind_slow:
        # fast below slow: fast moves down, slow moves up
        self.anchor_fast_indicator_margin = self.minus_pip(ind_fast, tolerance)
        self.anchor_slow_indicator_margin = self.plus_pip(ind_slow, tolerance)
    elif ind_fast > ind_slow:
        # fast above slow: fast moves up, slow moves down
        self.anchor_fast_indicator_margin = self.plus_pip(ind_fast, tolerance)
        self.anchor_slow_indicator_margin = self.minus_pip(ind_slow, tolerance)
# ---------------------
def Set_Scalp_Indicators_Margins(self, ind_fast, ind_mid, ind_slow, tolerance = 0):
    # ---------------------
    """Store the scalp fast/mid/slow indicator margins, widened by `tolerance`.

    The mid margin is always the raw ind_mid. The fast and slow margins
    start as the raw values and, when fast and slow differ, are pushed away
    from each other using minus_pip()/plus_pip().
    """
    self.scalp_fast_indicator_margin = ind_fast
    self.scalp_mid_indicator_margin = ind_mid
    self.scalp_slow_indicator_margin = ind_slow
    if ind_fast < ind_slow:
        # fast below slow: fast moves down, slow moves up
        self.scalp_fast_indicator_margin = self.minus_pip(ind_fast, tolerance)
        self.scalp_slow_indicator_margin = self.plus_pip(ind_slow, tolerance)
    elif ind_fast > ind_slow:
        # fast above slow: fast moves up, slow moves down
        self.scalp_fast_indicator_margin = self.plus_pip(ind_fast, tolerance)
        self.scalp_slow_indicator_margin = self.minus_pip(ind_slow, tolerance)
# [ TRADE RULES/EVENTS REGION (BASE / END) ]
# [ TRADE DECISION FACTORS REGION (BEGIN) ]
# ---------------------
def IsNarrowScalpEMA(self, fast, slow):
    # ---------------------
    """Record the scalp EMAs and report whether they are too close together.

    "Narrow" means the pip distance between the two EMAs is below
    qcAlgo.scalp_delta_min_pips. The result is cached on
    is_narrow_ema_scalp and returned.
    """
    self.UpdateScalpEMA(fast, slow)
    narrow = self.scalp_emas_pips_delta < self.qcAlgo.scalp_delta_min_pips
    self.is_narrow_ema_scalp = narrow
    return self.is_narrow_ema_scalp
# ---------------------
def IsNarrowAnchorEMA(self, fast, slow):
    # ---------------------
    """Record the anchor EMAs and report whether they are too close together.

    "Narrow" means the pip distance between the two EMAs is below
    qcAlgo.anchor_delta_min_pips. The result is cached on
    is_narrow_ema_anchor and returned.
    """
    self.UpdateAnchorEMA(fast, slow)
    narrow = self.anchor_emas_pips_delta < self.qcAlgo.anchor_delta_min_pips
    self.is_narrow_ema_anchor = narrow
    return self.is_narrow_ema_anchor
# ---------------------
def AllowCheckTrade(self):
    # ---------------------
    """Return True unless the anchor EMAs are currently flagged as narrow."""
    narrow = self.is_narrow_ema_anchor
    return not narrow
# [ TRADE DECISION FACTORS REGION (END) ]
# [ TRADE FACTORS REGION (BEGIN) ]
# ---------------------
def Switch_Model(self, mode):
    # ---------------------
    """Set trade_model (the model label passed on by Emit_Insight) to `mode`."""
    self.trade_model = mode
# ---------------------
def UpdateScalpEMA(self, fast, slow):
    # ---------------------
    """Store the scalp fast/slow EMA values and their distance from pip_diff_abs()."""
    self.scalp_ema_fast, self.scalp_ema_slow = fast, slow
    self.scalp_emas_pips_delta = self.pip_diff_abs(fast, slow)
# ---------------------
def UpdateAnchorEMA(self, fast, slow):
    # ---------------------
    """Store the anchor fast/slow EMA values and their distance from pip_diff_abs()."""
    self.anchor_ema_fast, self.anchor_ema_slow = fast, slow
    self.anchor_emas_pips_delta = self.pip_diff_abs(fast, slow)
# ---------------------
def UpdateAnchorFluentData(self, bigData):
    # ---------------------
    """Fold the latest bar of this symbol into the running anchor OHLC values.

    bigData is indexed by self.symbol. While anchor_new_fluent_data is set,
    the bar seeds open/high/low and the flag is cleared; afterwards high
    and low only widen. Time and close always follow the latest bar, which
    is also kept on self.data.
    """
    bar = bigData[self.symbol]
    self.data = bar
    self.anchor_fluent_time = bar.Time
    fresh = self.anchor_new_fluent_data
    if fresh or bar.High > self.anchor_fluent_high:
        self.anchor_fluent_high = bar.High
    if fresh or bar.Low < self.anchor_fluent_low:
        self.anchor_fluent_low = bar.Low
    self.anchor_fluent_close = bar.Close
    if fresh:
        self.anchor_fluent_open = bar.Open
        self.anchor_new_fluent_data = False
# ---------------------
def UpdateScalpFluentData(self, bigData):
    # ---------------------
    """Fold the latest bar of this symbol into the running scalp OHLC values.

    bigData is indexed by self.symbol. While scalp_new_fluent_data is set,
    the bar seeds open/high/low and the flag is cleared; afterwards high
    and low only widen. Time and close always follow the latest bar.
    """
    bar = bigData[self.symbol]
    self.scalp_fluent_time = bar.Time
    fresh = self.scalp_new_fluent_data
    if fresh or bar.High > self.scalp_fluent_high:
        self.scalp_fluent_high = bar.High
    if fresh or bar.Low < self.scalp_fluent_low:
        self.scalp_fluent_low = bar.Low
    self.scalp_fluent_close = bar.Close
    if fresh:
        self.scalp_fluent_open = bar.Open
        self.scalp_new_fluent_data = False
# ---------------------
def AnchorIsWarmedUp(self):
    # ---------------------
    """Mark the anchor side as warmed up (no-op once the flag is already set)."""
    if self.anchor_warmed_up:
        return
    self.anchor_warmed_up = True
# ---------------------
def ResetAnchorFluentData(self):
    # ---------------------
    """Set anchor_new_fluent_data so the next anchor update re-seeds open/high/low."""
    self.anchor_new_fluent_data = True
# ---------------------
def ResetScalpFluentData(self):
    # ---------------------
    """Set scalp_new_fluent_data so the next scalp update re-seeds open/high/low."""
    self.scalp_new_fluent_data = True
# ---------------------
def cancel_limits(self):
    # ---------------------
    """Cancel both the take-profit and stop-loss orders, overruling any veto.

    The limit bookkeeping is reset before the cancels (so stale "canceled"
    flags do not block them) and once more afterwards.
    """
    overrule_veto = True
    self.reset_limits()
    self.cancel_take_profit(overrule_veto)
    self.cancel_stop_loss(overrule_veto)
    self.reset_limits()
# ---------------------
def match_order(self, ticket, order_id):
    # ---------------------
    """Return True when `ticket` exists and its OrderId equals `order_id`."""
    if ticket is None:
        return False
    return ticket.OrderId == order_id
# ---------------------
def allow_cancel_orders(self):
    # ---------------------
    """Lift the veto on order cancellation (Veto_CancelOrders = False)."""
    self.Veto_CancelOrders = False
# ---------------------
def veto_cancel_orders(self):
    # ---------------------
    """Veto order cancellation (Veto_CancelOrders = True)."""
    self.Veto_CancelOrders = True
# ---------------------
def cancel_event_ignore(self):
    # ---------------------
    """Set Ignore_Cancel_Event to True."""
    self.Ignore_Cancel_Event = True
# ---------------------
def cancel_event_attention(self):
    # ---------------------
    """Set Ignore_Cancel_Event back to False."""
    self.Ignore_Cancel_Event = False
# ---------------------
def reset_limits(self):
    # ---------------------
    """Clear the order-limit bookkeeping.

    Lifts the cancel veto, clears both "canceled" flags and forgets the
    last update sent to the take-profit and stop-loss tickets.
    """
    self.allow_cancel_orders()
    self.ProfitTargetCanceled = self.StopLossCanceled = False
    self.ProfitTarget_LastUpdate = self.StopLoss_LastUpdate = None
# ---------------------
def cancel_enabled_state(self):
    # ---------------------
    """Return True unless the state machine is in ENTRY_SHORT or ENTRY_LONG."""
    entry_states = (self.ENTRY_SHORT, self.ENTRY_LONG)
    # (the EXIT_1_*_BASE states were once excluded as well; that check is disabled)
    return self.trade_state not in entry_states
# ---------------------
def cancel_take_profit(self, overrule_veto = False, show_action = True, state_reset = False):
# ---------------------
# Cancel the take-profit order ticket held in self.ProfitTarget, if permitted.
#
# overrule_veto -- when True, the Veto_CancelOrders / ProfitTargetCanceled
#                  checks are bypassed
# show_action   -- when True, trade_action_cancel_tp() is called
# state_reset   -- when True, reset_state() is called
# Returns `result`, which starts as False and is set to True in the cancel path.
#
# NOTE(review): the indentation of this file copy is lost, so the exact nesting
# of the statements below (which `if` each one belongs to) must be confirmed
# against the original source before changing this method.
# init
result = False
# cancel take profit order
# nothing is attempted unless cancel_enabled_state() allows it
if self.cancel_enabled_state():
if overrule_veto or (not self.Veto_CancelOrders and not self.ProfitTargetCanceled):
if self.ProfitTarget is not None:
self.ProfitTarget.Cancel()
result = True
self.ProfitTargetCanceled = True
if show_action:
self.trade_action_cancel_tp()
# state machine reset
if state_reset:
self.reset_state()
# finally
return result
# ---------------------
def cancel_stop_loss(self, overrule_veto = False, show_action = True, state_reset = False):
# ---------------------
# Cancel the stop-loss order ticket held in self.StopLoss, if permitted.
#
# overrule_veto -- when True, the Veto_CancelOrders / StopLossCanceled
#                  checks are bypassed
# show_action   -- when True, trade_action_cancel_sl() is called
# state_reset   -- when True, reset_state() is called
# Returns `result`, which starts as False and is set to True in the cancel path.
#
# NOTE(review): the indentation of this file copy is lost, so the exact nesting
# of the statements below (which `if` each one belongs to) must be confirmed
# against the original source before changing this method.
# init
result = False
# cancel stop loss order
# nothing is attempted unless cancel_enabled_state() allows it
if self.cancel_enabled_state():
if overrule_veto or (not self.Veto_CancelOrders and not self.StopLossCanceled):
if self.StopLoss is not None:
self.StopLoss.Cancel()
result = True
self.StopLossCanceled = True
if show_action:
self.trade_action_cancel_sl()
# state machine reset
if state_reset:
self.reset_state()
# finally
return result
# ---------------------
def update_take_profit(self, price, quantity = 0, allow_lower_profit = False):
# ---------------------
# Send a changed limit price (and optionally quantity) to the take-profit
# ticket in self.ProfitTarget, using the UpdateOrderFields object kept in
# self.ProfitTarget_LastUpdate.
#
# price              -- requested limit price; stored after pip_round()
# quantity           -- new order quantity; 0 leaves the quantity untouched
# allow_lower_profit -- when True, the direction check on the price is bypassed
#
# Nothing happens when the take-profit was canceled or no ticket exists.
# For long trades a new price is accepted only if it is above the last limit
# price, for short trades only if it is below it (unless allow_lower_profit).
#
# NOTE(review): the indentation of this file copy is lost; in particular it is
# not recoverable here which `if` each `else` below belongs to. Confirm against
# the original source before changing this method.
# NOTE(review): the raw `price` is compared with the stored LimitPrice, which
# was rounded by pip_round() when stored.
# init
updated = False
allow_update = True
# Go!
# Update Take Profit order
if not self.ProfitTargetCanceled and self.ProfitTarget is not None:
# lazily create the fields object on first use
if self.ProfitTarget_LastUpdate is None:
self.ProfitTarget_LastUpdate = UpdateOrderFields()
if quantity != 0 and self.ProfitTarget_LastUpdate.Quantity != quantity:
self.ProfitTarget_LastUpdate.Quantity = quantity
updated = True
if self.ProfitTarget_LastUpdate.LimitPrice != price:
last_limit_price = self.ProfitTarget_LastUpdate.LimitPrice
if last_limit_price is not None:
if self.is_long_trade:
allow_update = price > last_limit_price or allow_lower_profit
if self.is_short_trade:
allow_update = price < last_limit_price or allow_lower_profit
else:
allow_update = True
if allow_update:
self.ProfitTarget_LastUpdate.LimitPrice = self.pip_round(price)
updated = True
else:
updated = False
if updated:
self.ProfitTarget.Update(self.ProfitTarget_LastUpdate)
# ---------------------
def update_stop_loss(self, price, quantity = 0, allow_bigger_loss = False):
# ---------------------
# Send a changed price (and optionally quantity) to the stop-loss ticket in
# self.StopLoss, using the UpdateOrderFields object kept in
# self.StopLoss_LastUpdate.
#
# price             -- requested price; stored after pip_round()
# quantity          -- new order quantity; 0 leaves the quantity untouched
# allow_bigger_loss -- when True, the direction check on the price is bypassed
#
# Nothing happens when the stop-loss was canceled or no ticket exists.
# For long trades a new price is accepted only if it is above the last stored
# price, for short trades only if it is below it (unless allow_bigger_loss).
#
# NOTE(review): the indentation of this file copy is lost; in particular it is
# not recoverable here which `if` each `else` below belongs to. Confirm against
# the original source before changing this method.
# NOTE(review): the price is written to the LimitPrice field of the update;
# confirm whether the stop order type used here expects StopPrice instead.
# init
updated = False
allow_update = True
# Go!
# Update Stop Loss order
if not self.StopLossCanceled and self.StopLoss is not None:
# lazily create the fields object on first use
if self.StopLoss_LastUpdate is None:
self.StopLoss_LastUpdate = UpdateOrderFields()
if quantity != 0 and self.StopLoss_LastUpdate.Quantity != quantity:
self.StopLoss_LastUpdate.Quantity = quantity
updated = True
if self.StopLoss_LastUpdate.LimitPrice != price:
last_limit_price = self.StopLoss_LastUpdate.LimitPrice
if last_limit_price is not None:
if self.is_long_trade:
allow_update = price > last_limit_price or allow_bigger_loss
if self.is_short_trade:
allow_update = price < last_limit_price or allow_bigger_loss
else:
allow_update = True
if allow_update:
self.StopLoss_LastUpdate.LimitPrice = self.pip_round(price)
updated = True
else:
updated = False
if updated:
self.StopLoss.Update(self.StopLoss_LastUpdate)
# ---------------------
def reset_state(self):
    # ---------------------
    """Drop all trade levels and return the state machine to STAND_BY."""
    self.drop_all_levels()
    standby = self.STAND_BY
    self.trade_state = standby
# ---------------------
def drop_all_levels(self):
    # ---------------------
    """Zero every per-trade level and magnitude attribute.

    trade_abs_stop_level is deliberately left untouched (its reset is
    disabled in the original code).
    """
    zeroed = (
        "trade_risk",
        "touch_level",
        "entry_level",
        "exit1_level",
        "exit2_level",
        "stop_loss_level",
        "stop_loss_purple_level",
        "trade_magnitude_scalp",
        "trade_magnitude_anchor",
        "trade_diff",
        "trade_enter",
        "trade_break_even",
        "trade_stop_loss",
        "trade_take_profit",
        "trade_exit",
        "trade_start_level",
        "trade_take_profit_level",
        "trade_given_stop_loss",
        "trade_orange_risk_level",
        "trade_lowest_low",
        "trade_abs_start_level",
    )
    for attr in zeroed:
        setattr(self, attr, 0)
# [ TRADE FACTORS REGION (END) ]
# [ INDICATORS REGION (BEGIN) ]
# ---------------------
def create_indicators(self):
    # ---------------------
    """Create the scalp indicators first, then the anchor indicators."""
    for build in (self.create_scalp_indicators, self.create_anchor_indicators):
        build()
# ---------------------
def new_scalp_indicators(self):
    # ---------------------
    """Hook returning the scalp indicators to register; this base version returns None.

    create_scalp_indicators() iterates the result by key, so an override
    is expected to return a dict of indicator objects.
    """
    return None
# ---------------------
def new_anchor_indicators(self):
    # ---------------------
    """Hook returning the anchor indicators to register; this base version returns None.

    create_anchor_indicators() iterates the result by key, so an override
    is expected to return a dict of indicator objects.
    """
    return None
# ---------------------
def create_scalp_indicators(self):
    # ---------------------
    """Register every indicator supplied by new_scalp_indicators().

    Fix: the base new_scalp_indicators() hook returns None, which made the
    original crash with AttributeError on `.keys()`. A None (or empty)
    result is now treated as "no scalp indicators".
    """
    indicators = self.new_scalp_indicators() or {}
    for ticker, indicator in indicators.items():
        self.register_scalp_indicator(ticker, indicator)
# ---------------------
def create_anchor_indicators(self):
    # ---------------------
    """Register every indicator supplied by new_anchor_indicators().

    Fix: the base new_anchor_indicators() hook returns None, which made the
    original crash with AttributeError on `.keys()`. A None (or empty)
    result is now treated as "no anchor indicators".
    """
    indicators = self.new_anchor_indicators() or {}
    for ticker, indicator in indicators.items():
        self.register_anchor_indicator(ticker, indicator)
# ---------------------
def register_scalp_indicator(self, ind_key, indicator):
    # ---------------------
    """Register a scalp indicator under ind_key.

    Its rolling window holds one slot more than scalp_max_slots.
    """
    window_len = self.scalp_max_slots + 1
    self.register_indicator(
        self.scalpIndicators, indicator, ind_key,
        self.scalpIndicators_RollingWindows, window_len)
# ---------------------
def register_anchor_indicator(self, ind_key, indicator):
    # ---------------------
    """Register an anchor indicator under ind_key.

    Its rolling window holds one slot more than anchor_max_slots.
    """
    window_len = self.anchor_max_slots + 1
    self.register_indicator(
        self.anchorIndicators, indicator, ind_key,
        self.anchorIndicators_RollingWindows, window_len)
# ---------------------
def register_indicator(self, indicators, indicator, ind_key, indicators_windows, ind_wind_len):
    # ---------------------
    """Store `indicator` under ind_key and create its rolling window.

    indicators         -- dict receiving the indicator
    indicators_windows -- dict receiving a new RollingWindow[IndicatorDataPoint]
                          of length ind_wind_len under the same key
    """
    indicators[ind_key] = indicator
    window = RollingWindow[IndicatorDataPoint](ind_wind_len)
    indicators_windows[ind_key] = window
# ---------------------
def scalp_indicators_ready(self):
    # ---------------------
    """Return True when every registered scalp indicator reports IsReady.

    Stops at the first indicator that is not ready; with no indicators
    registered the result is True.
    """
    return all(indicator.IsReady for indicator in self.scalpIndicators.values())
# ---------------------
def anchor_indicators_ready(self):
    # ---------------------
    """Return True when all anchor (and, if present, quarter) indicators are ready.

    Fix: the original read self.quarterIndicators unconditionally, but the
    constructor shown in this file never creates that attribute, so the
    call raised AttributeError once all anchor indicators were ready. A
    missing (or None) quarterIndicators is now treated as empty; when the
    attribute exists the behaviour is unchanged.
    """
    if not all(indicator.IsReady for indicator in self.anchorIndicators.values()):
        return False
    quarter = getattr(self, "quarterIndicators", None) or {}
    return all(indicator.IsReady for indicator in quarter.values())
# ---------------------
def all_indicators_ready(self):
    # ---------------------
    """Return the scalp readiness if it is falsy, otherwise the anchor readiness.

    The anchor side is not queried when the scalp side is not ready.
    """
    scalp_ready = self.scalp_indicators_ready()
    if not scalp_ready:
        return scalp_ready
    return self.anchor_indicators_ready()
# ---------------------
def plot_scalp_indicator(self, ind_key):
    # ---------------------
    """Placeholder: plotting of a single scalp indicator is currently disabled."""
    # Disabled implementation:
    # indicator = self.scalpIndicators[ind_key]
    # self.qcAlgo.PlotIndicator("scalp_" + ind_key + "_" + self.symbol, True, indicator)
    return None
# ---------------------
def plot_anchor_indicator(self, ind_key):
    # ---------------------
    """Placeholder: plotting of a single anchor indicator is currently disabled."""
    # Disabled implementation:
    # indicator = self.anchorIndicators[ind_key]
    # self.qcAlgo.PlotIndicator("anchor_" + ind_key + "_" + self.symbol, True, indicator)
    return None
# [ INDICATORS REGION (END) ]
# [ INSIGHT CALC REGION (BEGIN) ]
# ---------------------
def Get_Confidence(self):
    # ---------------------
    """Recompute and return the insight confidence.

    The confidence comes from Calc_Insight_Confidence() fed with the
    trade-enabled semafors. As a side effect the magnitude (and with it the
    trend direction) is refreshed through Get_Magnitude().
    """
    # main operation - calc confidence
    semafors = self.IsLevel_TradeEnabled_Semafors
    self.trade_confidence = self.Calc_Insight_Confidence(semafors)
    # co-operation - calc ROCs (anchor/scalp) and magnitude; Get_Diff() stays disabled
    self.Get_Magnitude()
    return self.trade_confidence
# ---------------------
def Get_Magnitude(self):
    # ---------------------
    """Recompute the trade magnitude from the scalp and anchor ROC indicators.

    The magnitude is the 50/50 blend of the current scalp and anchor ROC
    values, multiplied by qcAlgo.trade_magnitude_amplifier. The trend
    direction is refreshed from the new magnitude. Returns the magnitude.

    Fix: the anchor component was read from scalp_ROC (copy-paste slip), so
    the "blend" was simply the scalp ROC; it now reads anchor_ROC.
    """
    self.trade_magnitude_scalp = float(self.scalp_ROC.Current.Value)
    self.trade_magnitude_anchor = float(self.anchor_ROC.Current.Value)
    # weighted magnitude (50% anchor, 50% scalp)
    blended = self.trade_magnitude_scalp * .5 + self.trade_magnitude_anchor * .5
    # apply magnitude amplifier
    self.trade_magnitude = blended * self.qcAlgo.trade_magnitude_amplifier
    self.set_trend_direction()
    return self.trade_magnitude
# ---------------------
def Get_Diff(self):
    # ---------------------
    """Recompute trade_diff via Calc_Insight_Diff() and return it."""
    diff = self.Calc_Insight_Diff()
    self.trade_diff = diff
    return self.trade_diff
# ---------------------
def Calc_Insight_Confidence(self, trade_enabled_semafors):
    # ---------------------
    """Combine indicator spreads and trade-enabled flags into one confidence.

    The result is the sum of four weighted parts (weights come from
    ``self.qcAlgo``) plus an offset:

    * anchor part: position of the live fast/slow EMA gap between the
      narrowest and widest gap of the anchor rolling windows;
    * scalp part: the same for the scalp rolling windows;
    * live part: full weight when ``trade_enabled_semafors[0]`` is truthy;
    * history part: share of truthy entries among indices 1..end.

    When ``trade_enabled_semafors`` is None the live and history parts are
    zero and a fixed offset of 0.5 is used instead.

    Args:
        trade_enabled_semafors: indexable collection of flags supporting
            ``len()``, or None.

    Returns:
        The total confidence, which is also stored in
        ``self.trade_confidence``.
    """
    # rolling windows of the fast/slow EMAs on both time frames
    anchor_fast = self.anchorIndicators_RollingWindows["ema_8_anchor"]
    anchor_slow = self.anchorIndicators_RollingWindows["ema_21_anchor"]
    scalp_fast = self.scalpIndicators_RollingWindows["ema_8_scalp"]
    scalp_slow = self.scalpIndicators_RollingWindows["ema_21_scalp"]

    # index 0 of each window is used as the live value
    anchor_fast_now = anchor_fast[0].Value
    anchor_slow_now = anchor_slow[0].Value
    scalp_fast_now = scalp_fast[0].Value
    scalp_slow_now = scalp_slow[0].Value

    # weights configured on the algorithm
    algo = self.qcAlgo
    weight_live = algo.confidence_trade_enabled_main
    weight_hist = algo.confidence_trade_enabled_hist
    weight_anchor = algo.confidence_indicator_delta_anchor
    weight_scalp = algo.confidence_indicator_delta_scalp

    # narrowest/widest fast-slow gap seen in each window pair
    anchor_narrowest = self.Get_Rolling_Indicators_Delta_Narrow(anchor_fast, anchor_slow)
    anchor_widest = self.Get_Rolling_Indicators_Delta_Wide(anchor_fast, anchor_slow)
    anchor_span = abs(anchor_widest - anchor_narrowest)
    scalp_narrowest = self.Get_Rolling_Indicators_Delta_Narrow(scalp_fast, scalp_slow)
    scalp_widest = self.Get_Rolling_Indicators_Delta_Wide(scalp_fast, scalp_slow)
    scalp_span = abs(scalp_widest - scalp_narrowest)

    # anchor indicator confidence (rate is 0 when the span is degenerate)
    anchor_excess = abs(abs(anchor_slow_now - anchor_fast_now) - anchor_narrowest)
    anchor_rate = anchor_excess / anchor_span if anchor_span > 0 else 0
    anchor_part = weight_anchor * anchor_rate

    # scalp indicator confidence
    scalp_excess = abs(abs(scalp_slow_now - scalp_fast_now) - scalp_narrowest)
    scalp_rate = scalp_excess / scalp_span if scalp_span > 0 else 0
    scalp_part = weight_scalp * scalp_rate

    if trade_enabled_semafors is None:
        # no trade-enabled information: use a fixed offset instead
        offset = 0.5
        live_part = 0
        hist_part = 0
    else:
        offset = 0
        # live flag
        live_rate = 1 if trade_enabled_semafors[0] else 0
        live_part = weight_live * live_rate
        # historical flags (everything after index 0)
        flag_count = len(trade_enabled_semafors)
        enabled_count = sum(
            1 for pos in range(1, flag_count) if trade_enabled_semafors[pos]
        )
        hist_rate = enabled_count / (flag_count - 1) if flag_count > 1 else 0
        hist_part = weight_hist * hist_rate

    # plain left-to-right addition keeps the floating-point result stable
    total_confidence = offset + anchor_part + scalp_part + live_part + hist_part
    self.trade_confidence = total_confidence
    return total_confidence
# ---------------------
def Get_Rolling_Indicators_Delta_Narrow(self, rw_i_fast, rw_i_slow):
    # ---------------------
    """Return the smallest absolute fast/slow gap across two rolling windows.

    The windows are read index by index over ``rw_i_fast.Size`` entries.

    A running minimum that treats 0 as "not set yet" would discard a genuine
    zero gap (fast and slow values equal) as soon as a later non-zero gap is
    seen, so the minimum is taken over all gaps directly.

    Args:
        rw_i_fast: indexable window of the fast indicator, exposing ``Size``.
        rw_i_slow: indexable window of the slow indicator; must hold at
            least ``rw_i_fast.Size`` entries.

    Returns:
        The minimum of ``abs(slow - fast)`` over the window, or 0 when the
        window is empty.
    """
    gaps = [
        abs(self.value(rw_i_slow[idx]) - self.value(rw_i_fast[idx]))
        for idx in range(rw_i_fast.Size)
    ]
    return min(gaps) if gaps else 0
# ---------------------
def Get_Rolling_Indicators_Delta_Wide(self, rw_i_fast, rw_i_slow):
    # ---------------------
    """Return the largest absolute fast/slow gap across two rolling windows.

    The windows are read index by index over ``rw_i_fast.Size`` entries.

    Args:
        rw_i_fast: indexable window of the fast indicator, exposing ``Size``.
        rw_i_slow: indexable window of the slow indicator; must hold at
            least ``rw_i_fast.Size`` entries.

    Returns:
        The maximum of ``abs(slow - fast)`` over the window, or 0 when the
        window is empty.
    """
    widest = 0
    for pos in range(rw_i_fast.Size):
        fast_value = self.value(rw_i_fast[pos])
        slow_value = self.value(rw_i_slow[pos])
        gap = abs(slow_value - fast_value)
        # 0 doubles as the "nothing recorded yet" marker
        if widest == 0 or gap > widest:
            widest = gap
    return widest
# ---------------------
def Calc_Insight_Diff(self):
    # ---------------------
    """Return the weighted mean derivative of the anchor and scalp EMAs.

    ``Get_Hist_Diff`` is evaluated for the fast/slow EMA windows of each
    time frame; the anchor result counts 70% and the scalp result 30%.
    """
    anchor_windows = self.anchorIndicators_RollingWindows
    anchor_fast = anchor_windows["ema_8_anchor"]
    anchor_slow = anchor_windows["ema_21_anchor"]
    scalp_windows = self.scalpIndicators_RollingWindows
    scalp_fast = scalp_windows["ema_8_scalp"]
    scalp_slow = scalp_windows["ema_21_scalp"]
    # mean derivative per time frame
    anchor_diff = self.Get_Hist_Diff(anchor_fast, anchor_slow)
    scalp_diff = self.Get_Hist_Diff(scalp_fast, scalp_slow)
    # weighted derivative: 70% anchor, 30% scalp
    return anchor_diff * .7 + scalp_diff * .3
# ---------------------
def Get_Hist_Diff(self, rw_i_fast, rw_i_slow):
    # ---------------------
    """Return the average of the mean first differences of two windows.

    Each window is converted to a list with ``ind_values``; consecutive
    differences (element ``i + 1`` minus element ``i``) are averaged per
    window, and the two averages are then averaged.

    The slow derivative is taken from the slow window's own values; it must
    not be derived from the fast values, otherwise the slow indicator has no
    influence on the result.

    NOTE(review): differences follow window index order. If index 0 holds
    the most recent sample, the sign is the opposite of the change over
    time -- confirm against the callers.

    Args:
        rw_i_fast: rolling window of the fast indicator.
        rw_i_slow: rolling window of the slow indicator.

    Returns:
        ``(mean(diff(fast)) + mean(diff(slow))) / 2``.
    """
    dx = 1  # spacing between consecutive window samples
    fast_values = self.ind_values(rw_i_fast)
    slow_values = self.ind_values(rw_i_slow)
    # mean derivative of each indicator
    fast_slope = np.mean(np.diff(fast_values) / dx)
    slow_slope = np.mean(np.diff(slow_values) / dx)
    return (fast_slope + slow_slope) / 2
# ---------------------
def ind_values(self, indicators):
    # ---------------------
    """Return the ``Value`` of every entry of a window as a list.

    Args:
        indicators: indexable window exposing ``Size``.

    Returns:
        A list with one value per index, in index order.
    """
    return [self.value(indicators[pos]) for pos in range(indicators.Size)]
# [ INSIGHT CALC REGION (END) ]
# [ PLOT REGION (BEGIN) ]
# ---------------------
def create_scalp_chart(self):
    # ---------------------
    """Add the price and scalp series to the scalp chart and register it.

    The chart is registered through ``self.qcAlgo.AddChart`` after both
    series groups have been added.
    """
    # price series first, then the scalp trade series
    self.update_price_chart(self.scalp_chart)
    self.update_scalp_indicator_chart()
    algo = self.qcAlgo
    algo.AddChart(self.scalp_chart)
# ---------------------
def create_anchor_chart(self):
    # ---------------------
    """Add the price and anchor series to the anchor chart and register it.

    The chart is registered through ``self.qcAlgo.AddChart`` after both
    update steps have run.
    """
    # price series first, then the anchor series
    self.update_price_chart(self.anchor_chart)
    self.update_anchor_indicator_chart()
    algo = self.qcAlgo
    algo.AddChart(self.anchor_chart)
# ---------------------
def update_scalp_indicator_chart(self):
    # ---------------------
    """Add the short and long trade series to ``self.scalp_chart``.

    Level series are added as lines and event series as scatter points;
    every series is created with index 0. Series names are read from the
    matching ``price_chart_serial_*`` attributes.
    """
    series_plan = (
        # scalp series shorts (lines)
        ("price_chart_serial_short_entry", SeriesType.Line),
        ("price_chart_serial_short_stop_loss", SeriesType.Line),
        ("price_chart_serial_short_exit_1", SeriesType.Line),
        ("price_chart_serial_short_exit_2", SeriesType.Line),
        # scalp series shorts (points)
        ("price_chart_serial_short_trigger_point", SeriesType.Scatter),
        ("price_chart_serial_short_entry_point", SeriesType.Scatter),
        ("price_chart_serial_short_stop_loss_point", SeriesType.Scatter),
        ("price_chart_serial_short_exit_1_point", SeriesType.Scatter),
        ("price_chart_serial_short_exit_2_point", SeriesType.Scatter),
        # scalp series longs (lines)
        ("price_chart_serial_long_entry", SeriesType.Line),
        ("price_chart_serial_long_stop_loss", SeriesType.Line),
        ("price_chart_serial_long_exit_1", SeriesType.Line),
        ("price_chart_serial_long_exit_2", SeriesType.Line),
        # scalp series longs (points)
        ("price_chart_serial_long_trigger_point", SeriesType.Scatter),
        ("price_chart_serial_long_entry_point", SeriesType.Scatter),
        ("price_chart_serial_long_stop_loss_point", SeriesType.Scatter),
        ("price_chart_serial_long_exit_1_point", SeriesType.Scatter),
        ("price_chart_serial_long_exit_2_point", SeriesType.Scatter),
    )
    for name_attr, series_type in series_plan:
        self.scalp_chart.AddSeries(Series(getattr(self, name_attr), series_type, 0))
# ---------------------
def update_anchor_indicator_chart(self):
    # ---------------------
    """Do nothing: no anchor-specific series are added to the anchor chart."""
    return None
# ---------------------
def update_price_chart(self, chart):
    # ---------------------
    """Add the price series to ``chart`` and return it.

    Adds open and close lines, a consolidated-close candle series and
    scatter series for the price points and the EMA cross point. Series
    names are read from the matching ``price_chart_serial_*`` attributes.

    Args:
        chart: chart object exposing ``AddSeries``.

    Returns:
        The same ``chart`` object.
    """
    series_plan = (
        ("price_chart_serial_open", SeriesType.Line),
        ("price_chart_serial_close", SeriesType.Line),
        ("price_chart_serial_close_consolidated", SeriesType.Candle),
        ("price_chart_serial_open_price_point", SeriesType.Scatter),
        ("price_chart_serial_high_price_point", SeriesType.Scatter),
        ("price_chart_serial_low_price_point", SeriesType.Scatter),
        ("price_chart_serial_close_price_point", SeriesType.Scatter),
        ("price_chart_serial_ema_cross_point", SeriesType.Scatter),
    )
    for name_attr, series_type in series_plan:
        chart.AddSeries(Series(getattr(self, name_attr), series_type, 0))
    return chart
# ---------------------
def update_indicator_chart(self, chart, chart_type, ind_type, ind_periods):
    # ---------------------
    """Add one line series per indicator period to ``chart`` and return it.

    Args:
        chart: chart object exposing ``AddSeries``.
        chart_type: ``"anchor"`` selects anchor series names; any other
            value selects scalp series names.
        ind_type: indicator prefix passed to the ticker helper.
        ind_periods: iterable of periods, one series each.

    Returns:
        The same ``chart`` object.
    """
    name_for = self.anchor_ticker if chart_type == "anchor" else self.scalp_ticker
    for period in ind_periods:
        chart.AddSeries(Series(name_for(ind_type, period), SeriesType.Line, 0))
    return chart
# [ PLOT REGION (END) ]
# [ HELPERS REGION (BEGIN) ]
# ---------------------
def GetRoundedOrderSize(self, orderQuantity):
    # ---------------------
    """Round an order quantity to a whole number of lots.

    The lot size is read from the symbol properties of
    ``self.qcAlgo.Securities[self.ticker]``; a lot size of 0 is reported via
    ``self.Debug`` and replaced by 1. ``self.lotSize``, ``self.orderSize``,
    ``self.lotsCount`` and ``self.roundedOrderSize`` are updated.

    Args:
        orderQuantity: requested order quantity.

    Returns:
        The number of lots (built-in ``round``) multiplied by the lot size.
    """
    properties = self.qcAlgo.Securities[self.ticker].SymbolProperties
    self.lotSize = properties.LotSize
    if self.lotSize == 0:
        # fall back to single units so the division below stays defined
        self.Debug("Lot Size of " + self.symbol + " is 0 !!! Assuming LotSize=1 instead.")
        self.lotSize = 1
    self.orderSize = orderQuantity
    self.lotsCount = round(self.orderSize / self.lotSize)
    self.roundedOrderSize = self.lotsCount * self.lotSize
    return self.roundedOrderSize
# ---------------------
def IsChartHour(self, time):
    # ---------------------
    """Tell whether charting applies at ``time``.

    Charting applies only when the algorithm is not warming up,
    ``IsChartTime(time)`` holds, and either ``ChartStartHour`` is 0 or the
    hour lies within ``ChartStartHour``..``ChartEndHour`` (inclusive).

    Args:
        time: object exposing ``hour`` (passed on to ``IsChartTime``).

    Returns:
        True or False.
    """
    algo = self.qcAlgo
    if algo.IsWarmingUp:
        return False
    if not self.IsChartTime(time):
        return False
    if algo.ChartStartHour == 0:
        return True
    if time.hour >= algo.ChartStartHour and time.hour <= algo.ChartEndHour:
        return True
    return False
# ---------------------
def IsChartTime(self, time):
    # ---------------------
    """Tell whether ``time`` falls inside the configured charting window.

    All of the following must hold: charting is enabled, ``time`` lies
    between ``ChartStartDate`` and ``ChartEndDate`` (inclusive, compared
    with time-zone information removed), and the hour lies within
    ``ChartStartHour``..``ChartEndHour`` (inclusive).

    Args:
        time: datetime-like object supporting ``replace(tzinfo=None)`` and
            exposing ``hour``.

    Returns:
        True when every condition holds, otherwise False.
    """
    algo = self.qcAlgo
    enabled = self.IsChartEnabled()
    # drop tzinfo on both sides so naive and aware values can be compared
    moment = time.replace(tzinfo=None)
    on_or_after_start = moment >= algo.ChartStartDate.replace(tzinfo=None)
    on_or_before_end = moment <= algo.ChartEndDate.replace(tzinfo=None)
    within_hours = time.hour >= algo.ChartStartHour and time.hour <= algo.ChartEndHour
    return all([enabled, on_or_after_start, on_or_before_end, within_hours])
# ---------------------
def IsChartEnabled(self):
    # ---------------------
    """Return True unless ``self.qcAlgo.chart_disable`` is truthy."""
    disabled = self.qcAlgo.chart_disable
    return False if disabled else True
# ---------------------
def scalp_ticker(self, prefix, period):
    # ---------------------
    """Build a scalp series key of the form ``<prefix>_<period>_scalp``."""
    return "_".join((prefix, str(period), "scalp"))
# ---------------------
def anchor_ticker(self, prefix, period):
    # ---------------------
    """Build an anchor series key of the form ``<prefix>_<period>_anchor``."""
    return "_".join((prefix, str(period), "anchor"))
# ---------------------
def current_value(self, container):
    # ---------------------
    """Return ``container.Current.Value``."""
    current = container.Current
    return current.Value
# ---------------------
def value(self, container):
    # ---------------------
    """Return ``container.Value``."""
    stored = container.Value
    return stored
# ---------------------
def anchor_ema_current_value(self, period):
    # ---------------------
    """Return the current value of the anchor EMA registered for ``period``."""
    key = self.anchor_ticker("ema", period)
    indicator = self.anchorIndicators[key]
    return self.current_value(indicator)
# ---------------------
def scalp_ema_current_value(self, period):
    # ---------------------
    """Return the current value of the scalp EMA registered for ``period``."""
    key = self.scalp_ticker("ema", period)
    indicator = self.scalpIndicators[key]
    return self.current_value(indicator)
# ---------------------
def scalp_elapsed(self, count_key):
    # ---------------------
    """Advance the scalp update counter and report a completed time frame.

    The counter stored under ``count_key`` in ``self.scalp_update_count`` is
    incremented until it reaches ``self.scalpPeriod - 1``; the call that
    finds it there resets it to 0.

    Returns:
        True on the call that completes the period, otherwise False.
    """
    counters = self.scalp_update_count
    seen = counters[count_key]
    if seen < self.scalpPeriod - 1:
        # still inside the current scalp time frame
        counters[count_key] = seen + 1
        return False
    counters[count_key] = 0
    return True
# ---------------------
def anchor_elapsed(self, count_key):
    # ---------------------
    """Advance the anchor update counter and report a completed time frame.

    The counter stored under ``count_key`` in ``self.anchor_update_count`` is
    incremented until it reaches ``self.anchorPeriod - 1``; the call that
    finds it there resets it to 0.

    Returns:
        True on the call that completes the period, otherwise False.
    """
    counters = self.anchor_update_count
    seen = counters[count_key]
    if seen < self.anchorPeriod - 1:
        # still inside the current anchor time frame
        counters[count_key] = seen + 1
        return False
    counters[count_key] = 0
    return True
# -------------------------------------------------------------------------
def pip_round(self, price):
    # -------------------------------------------------------------------------
    """Return ``price`` rounded by the toolbox's ``pip_round`` helper."""
    toolbox = self.toolbox
    return toolbox.pip_round(price)
# ---------------------
def plus_pip(self, price, pip):
    # ---------------------
    """Return ``price`` moved up by ``pip`` via ``add_pip``."""
    shifted = self.add_pip(price, pip)
    return shifted
# ---------------------
def minus_pip(self, price, pip):
    # ---------------------
    """Return ``price`` moved down by ``pip`` via ``add_pip`` with a negated amount."""
    negated = -1 * pip
    return self.add_pip(price, negated)
# ---------------------
def add_pip(self, price, pip):
    # ---------------------
    """Return the result of the toolbox's ``add_pip(price, pip)`` helper."""
    toolbox = self.toolbox
    return toolbox.add_pip(price, pip)
# ---------------------
def pip_diff(self, a, b):
    # ---------------------
    """Return the result of the toolbox's ``pip_diff(a, b)`` helper."""
    toolbox = self.toolbox
    return toolbox.pip_diff(a, b)
# ---------------------
def pip_diff_abs(self, a, b):
    # ---------------------
    """Return the absolute value of ``pip_diff(a, b)``."""
    signed_diff = self.pip_diff(a, b)
    return abs(signed_diff)
# ---------------------
def get_limits_as_str(self):
    # ---------------------
    """Format the current price and the trade limits as one labelled string.

    Every value is passed through ``pip_round`` and rendered as
    ``LABEL:value``; the parts are separated by single spaces in the order
    CURR, STRT, ORANGE-STOP, STOP, TAKE.
    """
    labelled_attrs = (
        ("CURR:", "scalp_fluent_close"),
        ("STRT:", "trade_start_level"),
        ("ORANGE-STOP:", "trade_orange_risk_level"),
        ("STOP:", "trade_given_stop_loss"),
        ("TAKE:", "trade_take_profit_level"),
    )
    return " ".join(
        label + str(self.pip_round(getattr(self, attr)))
        for label, attr in labelled_attrs
    )
# ---------------------
def trade_action_insight_benchmark(self, mode):
    # ---------------------
    """Run ``trade_action`` for ``mode`` with an insight and benchmark logging.

    ``mode`` is passed on unchanged; appending ``get_limits_as_str()`` to it
    is currently disabled.
    """
    self.trade_action(mode, emit_insight=True, log_profit_benchmarks=True)
# ---------------------
def trade_action_insight_nobenchmark(self, mode):
    # ---------------------
    """Run ``trade_action`` for ``mode`` with an insight, without benchmark logging.

    ``mode`` is passed on unchanged; appending ``get_limits_as_str()`` to it
    is currently disabled.
    """
    self.trade_action(mode, emit_insight=True, log_profit_benchmarks=False)
# ---------------------
def trade_action_noinsight_benchmark(self, mode):
    # ---------------------
    """Run ``trade_action`` for ``mode`` without an insight, with benchmark logging.

    ``mode`` is passed on unchanged; appending ``get_limits_as_str()`` to it
    is currently disabled.
    """
    self.trade_action(mode, emit_insight=False, log_profit_benchmarks=True)
# ---------------------
def trade_action_noinsight_nobenchmark(self, mode):
    # ---------------------
    """Run ``trade_action`` for ``mode`` without an insight or benchmark logging.

    ``mode`` is passed on unchanged; appending ``get_limits_as_str()`` to it
    is currently disabled.
    """
    self.trade_action(mode, emit_insight=False, log_profit_benchmarks=False)
# ---------------------
def trade_action_cancel_sl(self):
    # ---------------------
    """Record a ``CANCEL_STOPLOSS`` action without insight or benchmark logging."""
    self.trade_action_noinsight_nobenchmark("CANCEL_STOPLOSS")
# ---------------------
def trade_action_cancel_tp(self):
    # ---------------------
    """Record a ``CANCEL_TAKEPROFIT`` action without insight or benchmark logging."""
    self.trade_action_noinsight_nobenchmark("CANCEL_TAKEPROFIT")
# ---------------------
def trade_action_filled_tp_cancel_sl(self):
    # ---------------------
    """Record a ``FILLED_TAKEPROFIT_CANCEL_STOPLOSS`` action without insight or benchmark logging."""
    self.trade_action_noinsight_nobenchmark("FILLED_TAKEPROFIT_CANCEL_STOPLOSS")
# ---------------------
def trade_action_filled_sl_cancel_tp(self):
    # ---------------------
    """Record a ``FILLED_STOPLOSS_CANCEL_TAKEPROFIT`` action without insight or benchmark logging."""
    self.trade_action_noinsight_nobenchmark("FILLED_STOPLOSS_CANCEL_TAKEPROFIT")
# ---------------------
def trade_action_cancel_stoploss(self, mode):
    # ---------------------
    """Record a "filled TP, cancel SL" action prefixed with the symbol.

    No insight is emitted and no profit benchmarks are logged.

    Args:
        mode: not used; kept so existing callers keep working.
    """
    symbol_label = str(self.symbol)
    self.trade_action_noinsight_nobenchmark(symbol_label + ": > filled TP, cancel SL")
# ---------------------
def trade_action(self, mode, emit_insight=False, log_profit_benchmarks=True):
    # ---------------------
    """Switch the trade model to ``mode``, log the trade and optionally emit an insight.

    Args:
        mode: value handed to ``Switch_Model``.
        emit_insight: when truthy, ``Emit_Insight`` is called after logging.
        log_profit_benchmarks: forwarded to ``log_trade``.
    """
    self.Switch_Model(mode)
    self.log_trade(log_profit_benchmarks)
    if not emit_insight:
        return
    self.Emit_Insight()
# ---------------------
def log_trade(self, log_profit_benchmarks):
    # ---------------------
    """Write one CSV row describing the current trade state to the toolbox log.

    The row starts with the separator ``self.csv_sep_chr`` and contains 26
    columns. The first nine (symbol, weight, time, trade/trend direction,
    model, confidence, magnitude, current close) are always filled. The
    remaining seventeen hold the trade levels and the profit figures when
    ``log_profit_benchmarks`` is truthy and are left empty otherwise.

    Args:
        log_profit_benchmarks: whether to fill the level/profit columns.
    """
    # columns that are pip-rounded before being written
    level_attrs = (
        "entry_level",
        "stop_loss_level",
        "stop_loss_purple_level",
        "trade_risk",
        "trade_break_even",
        "trade_exit",
        "trade_take_profit",
        "trade_start_level",
        "trade_orange_risk_level",
        "trade_given_stop_loss",
        "trade_take_profit_level",
    )
    # columns written as-is
    profit_attrs = (
        "Symbol_LastTradeProfit",
        "Symbol_UnrealizedProfit",
        "Symbol_UnrealizedProfitPercent",
        "Portfolio_Cash",
        "Portfolio_TotalProfit",
        "Portfolio_TotalUnrealizedProfit",
    )

    trade_direction = self.trade_direction_str()
    trend_direction = self.trend_direction_str()
    separator = self.csv_sep_chr

    columns = [
        str(self.symbol),
        str(self.weight),
        str(self.data.Time),
        trade_direction,
        trend_direction,
        str(self.trade_model),
        str(self.trade_confidence),
        str(self.trade_magnitude),
        str(self.pip_round(self.scalp_fluent_close)),
    ]
    if log_profit_benchmarks:
        for attr in level_attrs:
            columns.append(str(self.pip_round(getattr(self, attr))))
        for attr in profit_attrs:
            columns.append(str(getattr(self, attr)))
    else:
        # keep the column count stable with empty placeholders
        columns.extend([""] * (len(level_attrs) + len(profit_attrs)))

    # every column, including the first, is preceded by the separator
    self.toolbox.show_csv_log(separator + separator.join(columns))
# [ HELPERS REGION (END) ]