| Overall Statistics |
|
Total Trades 4 Average Win 0% Average Loss 0% Compounding Annual Return 13.645% Drawdown 10.300% Expectancy 0 Net Profit 7.221% Sharpe Ratio 0.832 Probabilistic Sharpe Ratio 43.012% Loss Rate 0% Win Rate 0% Profit-Loss Ratio 0 Alpha 0.113 Beta 0.058 Annual Standard Deviation 0.147 Annual Variance 0.022 Information Ratio -0.093 Tracking Error 0.391 Treynor Ratio 2.099 Total Fees $4.67 |
from BankingIndustryStocks import BankingIndustryStocks
from datetime import datetime, timedelta
from numpy import sum
# import pandas as pd
# from grid_class import *
from gid_func import *
from collections import deque
def get_change(current, previous):
    """Return the gap between ``current`` and ``previous`` as a percentage of ``previous``.

    Equal inputs give ``0``. Otherwise the absolute difference is divided by
    ``previous``, scaled to percent and rounded to two decimals. If that
    division raises ``ZeroDivisionError`` the result is ``float('inf')``.
    """
    if current != previous:
        delta = abs(current - previous)
        try:
            return round(delta / previous * 100.0, 2)
        except ZeroDivisionError:
            return float('inf')
    return 0
class CalibratedParticleAtmosphericScrubbers(QCAlgorithm):
    """Algorithm entry point wiring the bank-stock universe to the grid alpha model."""

    def Initialize(self):
        """Set the backtest window, starting cash, universe and alpha model."""
        # Backtest window and starting capital.
        self.SetStartDate(2020, 2, 11)
        self.SetEndDate(2020, 8, 27)
        self.SetCash(100000)

        # Universe securities are subscribed at daily resolution
        # (Resolution.Minute was the alternative tried here).
        self.UniverseSettings.Resolution = Resolution.Daily
        universe_model = BankingIndustryStocks()
        self.SetUniverseSelection(universe_model)

        grid_alpha = Grid_indicator("Grid")
        self.AddAlpha(grid_alpha)
class Grid_indicator(AlphaModel):
    """Alpha model that keeps one price grid (``SymbolData``) per universe symbol.

    On every update it walks each symbol's grid one step forward, logs the
    current "blue 0" level next to the bar, and calls
    ``SetHoldings(symbol, -0.1)`` for symbols that are not yet invested.
    No insights are emitted; ``Update`` always returns an empty list.
    """

    def __init__(self, name):
        """Create an empty model.

        Args:
            name: Label stored on the model and passed to every ``SymbolData``.
        """
        self.symbolDataBySymbol = {}
        self.Name = name
        self.Time = datetime.min
        self.Value = 0

    def Update(self, algorithm, data):
        """Advance every tracked grid by one bar and trade on it.

        Args:
            algorithm: The running algorithm; used for logging, portfolio
                lookups and order placement.
            data: The current slice; only symbols present in it are processed.

        Returns:
            An empty list (orders are placed directly, not through insights).
        """
        insights = []

        # Snapshot the entries: a grid that reached its end is replaced in the
        # dict while we are looping over it.
        for symbol, item in list(self.symbolDataBySymbol.items()):
            # Everything below needs the bar, including a grid redraw, so skip
            # symbols without data instead of failing on data[symbol].
            if not data.ContainsKey(symbol) or data[symbol] is None:
                continue
            bar = data[symbol]

            # Per-symbol reset: once this symbol's index has walked off the end
            # of its own grid, redraw only this symbol's grid from the current bar.
            if item.series_index >= len(item.blue_zero):
                algorithm.Debug(f"HIT max grid length of {item.series_index} for {symbol.Value}, REDRAW GRID!")
                data_IN = {'Open' : bar.Open,
                           'High' : bar.High,
                           'Low'  : bar.Low,
                           'Close': bar.Close
                           }
                # Visually check the data in, make sure it is different for each symbol
                algorithm.Debug(data_IN)
                # Rebind `item` as well as the dict entry so the rest of this
                # iteration reads and advances the NEW grid (index 0), not the
                # exhausted one.
                item = SymbolData(symbol, data_IN, algorithm, self.Name)
                self.symbolDataBySymbol[symbol] = item
                algorithm.Debug(f"mod symbol: {symbol.Value}")
                # Check out the new grid
                algorithm.Debug(item.blue_zero.head())
                # Just print out one at blue 0 to visually check
                algorithm.Debug(f"testing out blue0 @ Index 1: {item.blue_zero.values[1][0]}")

            # https://www.quantconnect.com/forum/discussion/7956/rolling-window-questions/p1
            blue_zero_now = round(item.blue_zero.values[item.series_index][0], 2)
            algorithm.Log(f"{symbol.Value}: Blue 0 @ index {item.series_index} - {blue_zero_now} Low: {round(bar.Low, 2)}")
            algorithm.Log(f"{symbol.Value}: Index: {item.series_index}")
            algorithm.Log(f"{symbol.Value}: Close: {round(bar.Close, 2)}")

            if not algorithm.Portfolio[symbol].Invested:
                algorithm.SetHoldings(symbol, -0.1)
                # insights.append(Insight.Price(symbol, timedelta(days=150), InsightDirection.Down))

            # Advance only when a bar was consumed, so the index counts the
            # bars this symbol has actually seen since its grid was drawn.
            item.series_index += 1

        return insights

    def OnSecuritiesChanged(self, algorithm, changes):
        """Drop grids for removed securities and build grids for new ones.

        Args:
            algorithm: The running algorithm; used for debug output.
            changes: Universe changes exposing ``RemovedSecurities`` and
                ``AddedSecurities``.
        """
        for removed in changes.RemovedSecurities:
            self.symbolDataBySymbol.pop(removed.Symbol, None)
            algorithm.Debug(f"REMOVED Symbol in OnSecuritiesChanged: {removed.Symbol.Value}")

        for added in changes.AddedSecurities:
            if added.Symbol in self.symbolDataBySymbol:
                continue
            algorithm.Debug(f"Added Symbol in OnSecuritiesChanged: {added.Symbol.Value}")
            # NOTE(review): the grid is anchored on the security's current
            # OHLC values; confirm they are already populated at this point,
            # otherwise the grid starts from whatever placeholder they hold.
            data_IN = {'Open' : added.Open,
                       'High' : added.High,
                       'Low'  : added.Low,
                       'Close': added.Close
                       }
            algorithm.Debug(data_IN)
            self.symbolDataBySymbol[added.Symbol] = SymbolData(added.Symbol, data_IN, algorithm, self.Name)
            algorithm.Debug(f"blue_zero for {added.Symbol.Value} :{self.symbolDataBySymbol[added.Symbol].blue_zero.head(2)}")
class SymbolData:
    """Grid state for one symbol: the projected price series and a cursor into them."""

    # (attribute on this object, key in the dict returned by get_price_series)
    _SERIES_LABELS = (
        ("blue_neg_one", "blue neg1"),
        ("blue_neg_two", "blue neg2"),
        ("blue_neg_three", "blue neg3"),
        ("blue_pos_one", "blue pos1"),
        ("blue_pos_two", "blue pos2"),
        ("blue_pos_three", "blue pos3"),
        ("green_neg_one", "green neg1"),
        ("green_neg_two", "green neg2"),
        ("green_neg_three", "green neg3"),
        ("green_pos_one", "green pos1"),
        ("green_pos_two", "green pos2"),
        ("green_pos_three", "green pos3"),
        ("green_zero", "green 0"),
        ("blue_zero", "blue 0"),
    )

    def __init__(self, symbol, data_IN, algorithm, name):
        """Build the grid for ``symbol`` from a single OHLC bar.

        Args:
            symbol: Symbol this grid belongs to.
            data_IN: Mapping with 'Open', 'High', 'Low' and 'Close' values.
            algorithm: Accepted for call compatibility; not used here.
            name: Label stored on the instance.
        """
        self.Symbol = symbol
        self.Name = name
        self.Time = datetime.min
        self.Value = 0
        # Every grid series starts unset and is filled in by makeIndicator.
        for attribute, _ in self._SERIES_LABELS:
            setattr(self, attribute, None)
        # Position of the next grid row to read.
        self.series_index = 0
        self.indicator = self.makeIndicator(symbol, data_IN)

    def makeIndicator(self, symbol, data_IN):
        """Compute the grid series from ``data_IN`` and store them on the instance.

        Returns:
            ``True`` when the 'blue 0' series has exactly 150 rows, else ``False``.
        """
        # The timestamp is a placeholder; only the OHLC values matter here.
        frame = pd.DataFrame(data={
            'Date' : ["2020-08-31 09:30:00"],
            'Open' : [data_IN["Open"]],
            'High' : [data_IN["High"]],
            'Low'  : [data_IN["Low"]],
            'Close': [data_IN["Close"]],
        })
        frame.set_index("Date", inplace=True)
        # Normalise the index through a fixed text format, then back to datetimes.
        as_text = pd.to_datetime(frame.index).strftime('%Y-%m-%d %H:%M:%S')
        frame.index = pd.to_datetime(as_text)

        self.LOW_series = get_price_series(price_point="Low", interval="1d", data=frame)
        for attribute, label in self._SERIES_LABELS:
            setattr(self, attribute, self.LOW_series[label])

        # First value of the 'blue 0' series.
        self.Value = self.blue_zero.iloc[0].values[0]
        return len(self.blue_zero) == 150
from Selection.FundamentalUniverseSelectionModel import FundamentalUniverseSelectionModel
class BankingIndustryStocks(FundamentalUniverseSelectionModel):
    '''
    Universe model that picks bank stocks ranked by dollar volume.
    A new selection is made only when the calendar month differs from the
    month of the last successful fine selection.
    '''

    def __init__(self, filterFineData = True, universeSettings = None, securityInitializer = None):
        '''Initializes the model with its coarse/fine size limits and empty selection state'''
        super().__init__(filterFineData, universeSettings, securityInitializer)
        self.numberOfSymbolsCoarse = 1000
        self.numberOfSymbolsFine = 100
        # Dollar volume remembered from the coarse pass, used to rank the fine pass.
        self.dollarVolumeBySymbol = {}
        # Month of the last successful fine selection; -1 means none yet.
        self.lastMonth = -1

    def SelectCoarse(self, algorithm, coarse):
        '''
        Performs a coarse selection:
        -The stock must have fundamental data
        -The stock must have positive price
        -The stock must have positive volume
        Keeps the top numberOfSymbolsCoarse by dollar volume.
        '''
        if algorithm.Time.month == self.lastMonth:
            return Universe.Unchanged

        eligible = [c for c in coarse if c.HasFundamentalData and c.Volume > 0 and c.Price > 0]
        eligible.sort(key = lambda c: c.DollarVolume, reverse=True)
        top = eligible[:self.numberOfSymbolsCoarse]
        self.dollarVolumeBySymbol = {c.Symbol: c.DollarVolume for c in top}

        # Nothing passed the filter: leave the universe as it is.
        if not self.dollarVolumeBySymbol:
            return Universe.Unchanged
        return list(self.dollarVolumeBySymbol)

    def SelectFine(self, algorithm, fine):
        '''
        Performs a fine selection for companies whose Morningstar industry
        group is Banks, ranked by the dollar volume seen in the coarse pass.
        '''
        banks = [f for f in fine
                 if f.AssetClassification.MorningstarIndustryGroupCode == MorningstarIndustryGroupCode.Banks]
        banks.sort(key = lambda f: self.dollarVolumeBySymbol[f.Symbol], reverse=True)

        if not banks:
            return Universe.Unchanged

        self.lastMonth = algorithm.Time.month
        # Cut to numberOfSymbolsFine first, then keep only the first two of those.
        shortlist = banks[:self.numberOfSymbolsFine]
        return [f.Symbol for f in shortlist[:2]]
import matplotlib
import pandas as pd
import numpy as np
import re
from math import *
import os
def get_price_series(price_point: str, interval: str = "1d",
                     csv_records: int = 150, data=None):
    """Project a set of parallel "blue" and "green" grid lines from the first row of ``data``.

    Two base lines ("blue 0" and "green 0") pass through the ``price_point``
    value of the first row. Five parallel lines above ("pos1".."pos5") and five
    below ("neg1".."neg5") are added for each colour. Every line is then
    evaluated at ``x = 0 .. csv_records - 1``.

    Args:
        price_point: Column of ``data`` to anchor the grid on (e.g. "Low").
        interval: Bar size such as "1d"; only used to pick the matplotlib
            timezone setting.
        csv_records: Number of points generated per line.
        data: DataFrame holding at least the ``price_point`` column; only its
            first row is read.

    Returns:
        dict mapping each line label (e.g. "blue 0", "green neg3") to a
        DataFrame with one column named ``price_point``, ``csv_records`` rows
        and an index named "Date".

    Raises:
        TypeError: if ``data`` is None.
        IndexError: if ``interval`` does not contain two tokens.
    """
    if data is None:
        # Same exception type the subscript on None used to raise, but with a
        # message that says what is missing.
        raise TypeError("get_price_series() needs a DataFrame passed as 'data'")

    class Line:
        """A line ``y = slope * x + intercept`` with its colour, label and anchor point."""

        def __init__(self, slope: float, intercept: float, color: str, point: tuple, label: str):
            self.slope = slope
            self.intercept = intercept
            self.label = label
            self.point = point
            self.color = color

    print("Initializing data...")
    date_price_series = {}
    grid_lines = []
    col = price_point

    # This is for GE calibration
    blue_d = 30.0
    green_d = 36.0
    blue_deg = 45.0
    green_deg = 103.0
    const_blue_d = 1.06
    const_green_d = 0.66
    const_blue_angle = 1.24
    const_green_angle = 1.09
    ratio = 0.14001806684733514
    xpx = 6.692936331960709
    dpi = 100.0

    # Angles are used in radians from here on.
    blue_angle = radians(blue_deg)
    green_angle = radians(green_deg)

    # Split e.g. "1d" into its first two tokens and glue them back together.
    tokens = re.findall(r'[A-Za-z]+|\d+', interval)
    frequency, frequency_unit = tokens[0], tokens[1]
    interval = frequency + frequency_unit

    # NOTE(review): this changes global matplotlib configuration although
    # nothing in this function plots; kept because callers may rely on it.
    matplotlib.rcParams['timezone'] = 'US/Eastern' if interval != "1d" else 'UTC'

    # Anchor on the first row. Use .iloc so the lookup is positional
    # regardless of the index type (``data[col][0]`` on a datetime-indexed
    # Series relies on a deprecated positional fallback).
    xp = 0.0
    yp = data[col].iloc[int(xp)]
    point = (xp, yp)

    y_line_slope = 1.0 / tan(green_angle * const_green_angle) * ratio
    y_line_intercept = yp - y_line_slope * xp
    x_line_slope = tan(blue_angle * const_blue_angle) * ratio
    x_line_intercept = yp - x_line_slope * xp

    lines = [
        Line(x_line_slope, x_line_intercept, 'blue', point, "blue 0"),
        Line(y_line_slope, y_line_intercept, "green", point, "green 0"),
    ]

    for line in lines:
        original_intercept = line.intercept
        # Horizontal step between neighbouring parallels of this colour.
        if line.color == "green":
            g_d_inches = green_d / 25.4
            inches = g_d_inches / sin(green_angle - pi / 2.0)
            xp = inches * dpi / xpx * const_green_d
        else:
            b_d_inches = blue_d / 25.4
            inches = b_d_inches / sin(blue_angle)
            xp = inches * dpi / xpx * const_blue_d

        for direction in (-1.0, 1.0):
            side = "pos" if direction == 1 else "neg"
            intercept = original_intercept
            for step in range(5):
                yp = intercept
                # Shift the intercept one step further away from the base line.
                intercept = yp + abs(line.slope) * xp * direction
                label = line.color + " " + side + str(step + 1)
                grid_lines.append(Line(line.slope, intercept, line.color, (xp, yp), label))

    anchor_x = lines[0].point[0]
    for line in lines + grid_lines:
        new_intercept = anchor_x * line.slope + line.intercept
        prices = [round(line.slope * x + new_intercept, 8) for x in range(csv_records)]
        df = pd.DataFrame({col: prices})
        df.index.name = "Date"
        print("Making:", line.label)
        date_price_series[line.label] = df
    return date_price_series
# Your New Python File