| Overall Statistics |
|
Total Orders 543 Average Win 1.03% Average Loss -1.56% Compounding Annual Return 198.229% Drawdown 35.900% Expectancy 0.396 Start Equity 1000000 End Equity 16575810.02 Net Profit 1557.581% Sharpe Ratio 2.42 Sortino Ratio 3.532 Probabilistic Sharpe Ratio 88.850% Loss Rate 16% Win Rate 84% Profit-Loss Ratio 0.66 Alpha 1.459 Beta 0.637 Annual Standard Deviation 0.608 Annual Variance 0.37 Information Ratio 2.407 Tracking Error 0.603 Treynor Ratio 2.31 Total Fees $4169.46 Estimated Strategy Capacity $2000000.00 Lowest Capacity Asset NVMI RTSESF5CFJHH Portfolio Turnover 1.17% |
# region imports
from AlgorithmImports import *
# endregion
class CreativeApricotShark(QCAlgorithm):
def initialize(self):
self.SetStartDate(2022, 1, 1)
self.SetEndDate(2024, 12, 30)
self.SetCash(1000000)
#self.AddEquity("SPY", Resolution.Daily)
#self.tickers = ['AAPL','MSFT','GOOGL','AMZN', 'META','TSLA', 'SPY']#: 287% # winners
#self.tickers = ['XOM','JNJ','KO','MCD', 'MDT','SHW', 'CTAS'] #182.98 % # high dividend
self.tickers = ['FIVE','GPCR','STRL','NVMI', 'ONTO','ASML', 'VKTX'] # losers: 1,050.50 %
self.candles = {}
self.percent = 0.25
self.open_positions = []
self.transactions_history = pd.DataFrame(columns=['Date', 'Stock', 'Type of Transaction', 'Candle', 'Buy Price', 'Qty', 'Sell Price', 'P/L'])
self.last_sell_date = self.StartDate
# determines the investment percentage of the totalCash
self.percent = 0.05
# We are also going to have a stop-loss metric associated with each position
# to reduce the drawdown
self.stop_loss_threshold = -0.2
# Let us also explore the possibility of SMA anf FMA crossovers
self.fast_moving_averages = {}
self.slow_moving_averages = {}
self.mv_state = {} # Track the moving average state for each stock
# Set warm-up period to ensure indicators are ready
self.SetWarmUp(200, Resolution.Daily)
# Initialize moving averages
for ticker in self.tickers:
equity = self.AddEquity(ticker, Resolution.Daily)
self.candles[ticker] = Candle(self, ticker)
# Create the moving averages
self.fast_moving_averages[ticker] = self.SMA(ticker, 50, Resolution.Daily)
self.slow_moving_averages[ticker] = self.SMA(ticker, 200, Resolution.Daily)
self.mv_state[ticker] = None # Initialize the state to None
def OnData(self, data):
if self.IsWarmingUp:
return
for ticker, candle in self.candles.items():
if ticker not in data.Bars:
self.debug(ticker)
continue
bar = data.Bars[ticker]
candle.Update(bar)
# # Check for moving average crossover
# if self.fast_moving_averages[ticker].IsReady and self.slow_moving_averages[ticker].IsReady:
# fast_mv = self.fast_moving_averages[ticker].Current.Value
# slow_mv = self.slow_moving_averages[ticker].Current.Value
# # Check if slow moving average crosses below the fast moving average
# if slow_mv < fast_mv and self.mv_state[ticker] != "sell":
# self.Debug(f"Slow MV crossed below Fast MV for {ticker} at {self.Time}")
# self.sell_all_positions()
# self.mv_state[ticker] = "sell"
# break # Exit loop after selling all
# # Check if fast moving average crosses above the slow moving average
# if fast_mv > slow_mv and self.mv_state[ticker] == "sell":
# self.Debug(f"Fast MV crossed above Slow MV for {ticker} at {self.Time}")
# self.enter_new_positions(data)
# self.mv_state[ticker] = "buy"
# break # Exit loop after entering new positions
if candle.shouldExit():
self.close_positions([position for position in self.open_positions if position['Stock'] == ticker], data[ticker].Close, 'SELL', candleStick=candle.getPatternName())
elif candle.shouldEnter():
portfolio_value = self.Portfolio.TotalPortfolioValue
allocation = portfolio_value * self.percent # Allocate self.percent of portfolio value to each position
quantity = allocation // data[ticker].Close
# quantity = (10000 / data[ticker].Close + 1)
#self.Debug(f"Buying {quantity} shares of {ticker} at {data[ticker].Close} on {self.Time}")
self.MarketOrder(ticker, quantity)
self.open_positions.append({'Date': self.Time, 'Qty': quantity, 'Buy Price': data[ticker].Close, 'Stock': ticker, 'Paper P/L': 0, 'Paper P/L %': 0})
# Everyday we will calculate the Paper profit of each open position
self.calculate_paper_pl(data)
# Each day we will calculate to see if our stop-loss thresholf is being hit
self.check_stop_loss(data)
#self.check_and_sell_every_30_days(data)
def sell_all_positions(self):
'''Sell all open positions'''
for position in self.open_positions:
ticker = position['Stock']
qty = position['Qty']
self.MarketOrder(ticker, -qty)
self.open_positions = []
def enter_new_positions(self, data):
'''Enter a new position for each stock'''
for ticker in self.tickers:
if ticker in data.Bars:
portfolio_value = self.Portfolio.TotalPortfolioValue
allocation = portfolio_value * self.percent # Allocate 5% of portfolio value to each position
quantity = allocation // data[ticker].Close
self.Debug(f"Entering new position: Buying {quantity} shares of {ticker} at {data[ticker].Close} on {self.Time}")
self.MarketOrder(ticker, quantity)
self.open_positions.append({'Date': self.Time, 'Qty': quantity, 'Buy Price': data[ticker].Close, 'Stock': ticker, 'Paper P/L': 0, 'Paper P/L %': 0, 'Max Price': data[ticker].Close})
def check_and_sell_every_30_days(self, data):
if (self.Time - self.last_sell_date) >= timedelta(days=60):
self.sell_20_percent_profit(data)
self.last_sell_date = self.Time
def sell_20_percent_profit(self, data):
getProfit = []
for position in self.open_positions:
ticker = position['Stock']
if ticker not in data.Bars:
continue
price = data.Bars[ticker].Open
qty = position['Qty']
paperValue = qty * price
paperPL = paperValue - qty * position['Buy Price']
if paperPL > 0:
price = data.Bars[ticker].Open
qty = position['Qty']
sellQty = 0.25 * qty
if qty <= 4:
sellQty = qty
position['Qty'] -= sellQty
transaction = {'Date': self.Time, 'Stock': ticker, 'Type of Transaction': 'SELL 30D POSITION', 'Buy Price': position['Buy Price'],
'Sell Price': price, 'Qty': sellQty, 'P/L': (price - position['Buy Price']) * sellQty}
getProfit.append(transaction)
for transaction in getProfit:
ticker = transaction['Stock']
self.transactions_history.loc[len(self.transactions_history)] = transaction
self.Debug(f"Transaction {transaction}")
self.MarketOrder(ticker, -transaction['Qty'])
self.open_positions = [p for p in self.open_positions if p['Qty'] > 0]
def close_positions(self, open_positions, price, heading, candleStick=""):
'''
We sell 25% of each open position whenever our exit position candle occurs
'''
for o in open_positions:
qty = o['Qty']
sellQty = self.percent * qty
if qty <= 4:
sellQty = qty
leftQty = qty - sellQty
paperValue = sellQty * price
PL = paperValue - sellQty * o['Buy Price']
transaction = {'Date': o["Date"], 'Stock' : o['Stock'],'Type of Transaction' : heading, 'Candle': candleStick, 'Buy Price' : o['Buy Price'], 'Sell Price' : price, 'Qty': sellQty , 'P/L': PL}
self.transactions_history.loc[len(self.transactions_history)] = transaction
#self.Debug(f"Transacton {transaction}")
self.MarketOrder(o['Stock'], -sellQty)
o['Qty'] = leftQty
self.open_positions = [o for o in open_positions if o['Qty'] > 0]
def calculate_paper_pl(self, data):
'''
We regularly take out profits, if a current open position has a unrealized profit of greater than 30%
'''
getProfit = []
for position in self.open_positions:
ticker = position['Stock']
if ticker not in data.Bars:
continue
# Calculating the paper profit
price = data.Bars[ticker].Open
qty = position['Qty']
paperValue = qty * price
paperPL = paperValue - qty * position['Buy Price']
paperPLPercentage = paperPL / (qty * position['Buy Price'])
if paperPLPercentage > 0.3:
# Selling 25% of the position if paper profit is > 30%
sellQty = 0.25 * qty
if qty <= 4:
sellQty = qty
position['Qty'] -= sellQty
transaction = {'Date': self.Time, 'Stock': ticker, 'Type of Transaction': 'SELL FRAC', 'Buy Price': position['Buy Price'],
'Sell Price': price, 'Qty': sellQty, 'P/L': paperPL * (sellQty / qty)}
getProfit.append(transaction)
for transaction in getProfit:
ticker = transaction['Stock']
self.transactions_history.loc[len(self.transactions_history)] = transaction
#self.Debug(f"Transacton {transaction}")
#self.transactions_history = self.transactions_history.append(transaction, ignore_index=True)
# Selling a part of the position
self.MarketOrder(ticker, -transaction['Qty'])
self.open_positions = [p for p in self.open_positions if p['Qty'] > 0]
def check_stop_loss(self, data):
'''
This function iterates through all the open positions and liquidates
the entire position if our stop-loss is hit
'''
positions_to_liquidate = []
for position in self.open_positions:
ticker = position['Stock']
if ticker not in data.Bars:
continue
price = data.Bars[ticker].Open
qty = position['Qty']
paperValue = qty * price
paperPL = paperValue - qty * position['Buy Price']
paperPLPercentage = paperPL / (qty * position['Buy Price'])
if paperPLPercentage <= self.stop_loss_threshold:
self.Debug(f"Stop loss hit for position: {position}, current price = {price}, loss = {paperPL} ,loss per = {paperPLPercentage}")
positions_to_liquidate.append(position)
for position in positions_to_liquidate:
ticker = position['Stock']
# Liquidating the entire position
self.MarketOrder(ticker, -qty)
#self.close_positions([position], data[ticker].Close, 'STOP LOSS')
# We need to remove the position from open positions since we have liquidated
# the entire position
self.open_positions.remove(position)
class Candle:
def __init__(self, algorithm, ticker, frac=0.9):
self.algorithm = algorithm
self.ticker = ticker
self.frac = frac
self.bb = BollingerBands(20, 2, MovingAverageType.Simple)
self.bb2 = BollingerBands(20, 1, MovingAverageType.Simple)
self.rsi = RelativeStrengthIndex(14, MovingAverageType.Simple)
self.macd = MovingAverageConvergenceDivergence(12, 26, 9, MovingAverageType.Simple)
self.sma = SimpleMovingAverage(50)
self.data = []
self.pattern_name = ""
def Update(self, bar):
self.data.append(bar)
if len(self.data) > 2:
self.data.pop(0)
self.bb.Update(bar.EndTime, bar.Close)
self.bb2.Update(bar.EndTime, bar.Close)
self.rsi.Update(bar.EndTime, bar.Close)
self.macd.Update(bar.EndTime, bar.Close)
self.sma.Update(bar.EndTime, bar.Close)
def return_OHLC(self, candle):
return candle.Open, candle.High, candle.Low, candle.Close
def return_stats(self, candle):
delta_v = candle.Close - candle.Open
max_vi = max(candle.Close, candle.Open)
min_vi = min(candle.Close, candle.Open)
return delta_v, max_vi, min_vi
def shouldEnter(self):
if len(self.data) < 2:
return False
if self.isHangingMan() or self.isBullishEngulfing() or self.isDragonFlyDoji():
return True
return False
def shouldExit(self):
if len(self.data) < 2:
return False
if self.isInvertedHammer() : # look into why no. of trades is decreasing on more candle sticks
return True
return False
def isHangingMan(self):
candle = self.data[-1]
curr_open, curr_high, curr_low, curr_close = self.return_OHLC(candle)
curr_delta_v, curr_max_vi, curr_min_vi = self.return_stats(candle)
if ( (curr_high - curr_low) > -4 * curr_delta_v) and \
((curr_close - curr_low)/(0.001 + curr_high - curr_low ) > 0.6) and \
(curr_open - curr_low)/(0.001 + curr_high - curr_low ) > 0.6 and \
(curr_close >= self.bb.UpperBand.Current.Value * self.frac or curr_close <= self.bb.LowerBand.Current.Value):
self.pattern_name = "Hanging Man"
return True
return False
def isInvertedHammer(self):
candle = self.data[-1]
curr_open, curr_high, curr_low, curr_close = self.return_OHLC(candle)
curr_delta_v, curr_max_vi, curr_min_vi = self.return_stats(candle)
if ( (curr_high - curr_low) > -3 * curr_delta_v) and \
((curr_high - curr_close)/(0.001 + curr_high - curr_low ) > 0.6) and \
(curr_high - curr_open)/(0.001 + curr_high - curr_low ) > 0.6 and \
(curr_close >= self.bb.UpperBand.Current.Value or curr_close <= self.bb.LowerBand.Current.Value) and \
(curr_close >= self.bb.UpperBand.Current.Value * self.frac or curr_close <= self.bb.LowerBand.Current.Value):
self.pattern_name = "Inverted Hammer"
return True
return False
def isDragonFlyDoji(self):
candle = self.data[-1]
curr_open, curr_high, curr_low, curr_close = self.return_OHLC(candle)
curr_delta_v, curr_max_vi, curr_min_vi = self.return_stats(candle)
if ( curr_open == curr_close or (abs(curr_delta_v) / (curr_high - curr_low) < 0.1 )) and \
((curr_high - curr_max_vi) < (3 * abs(curr_delta_v)) ) and \
((curr_min_vi - curr_low) > (3 * abs(curr_delta_v)) ) and \
(curr_close >= self.bb.UpperBand.Current.Value * self.frac or curr_close <= self.bb.LowerBand.Current.Value):
self.pattern_name = "DragonFly Doji"
return True
return False
def isGravestoneDoji(self):
candle = self.data[-1]
curr_open, curr_high, curr_low, curr_close = self.return_OHLC(candle)
curr_delta_v, curr_max_vi, curr_min_vi = self.return_stats(candle)
if ( curr_open == curr_close or(abs(curr_delta_v) / (curr_high - curr_low) < 0.1 )) and \
((curr_high - curr_max_vi) > (3 * abs(curr_delta_v)) ) and \
((curr_min_vi - curr_low) <= (3 * abs(curr_delta_v)) ) and \
(curr_close >= self.bb.UpperBand.Current.Value * self.frac or curr_close <= self.bb.LowerBand.Current.Value):
self.pattern_name = "Gravestone Doji"
return True
return False
def isBullishEngulfing(self):
if len(self.data) < 2:
return False
candle = self.data[-1]
prev_candle = self.data[-2]
curr_open, curr_high, curr_low, curr_close = self.return_OHLC(candle)
prev_open, prev_high, prev_low, prev_close = self.return_OHLC(prev_candle)
curr_delta_v, curr_max_vi, curr_min_vi = self.return_stats(candle)
prev_delta_v, prev_max_vi, prev_min_vi = self.return_stats(prev_candle)
if curr_close >= prev_open and prev_open > prev_close and \
curr_close > curr_open and prev_close >= curr_open and \
(curr_close - curr_open) > (prev_open - prev_close) and \
(curr_close >= self.bb.UpperBand.Current.Value * self.frac or curr_close <= self.bb.LowerBand.Current.Value):
self.pattern_name = "Bullish Engulfing"
return True
return False
def isBearishEngulfing(self):
if len(self.data) < 2:
return False
candle = self.data[-1]
prev_candle = self.data[-2]
curr_open, curr_high, curr_low, curr_close = self.return_OHLC(candle)
prev_open, prev_high, prev_low, prev_close = self.return_OHLC(prev_candle)
curr_delta_v, curr_max_vi, curr_min_vi = self.return_stats(candle)
prev_delta_v, prev_max_vi, prev_min_vi = self.return_stats(prev_candle)
if curr_open >= prev_close and prev_close > prev_open and \
curr_close < curr_open and prev_open >= curr_close and \
(curr_open - curr_close) > (prev_close - prev_open) and \
(curr_close >= self.bb.UpperBand.Current.Value * self.frac or curr_close <= self.bb.LowerBand.Current.Value):
self.pattern_name = "Bearish Engulfing"
return True
return False
def getPatternName(self):
return self.pattern_name