Overall Statistics Total Trades340Average Win0.24%Average Loss-0.21%Compounding Annual Return7.466%Drawdown10.100%Expectancy0.405Net Profit24.112%Sharpe Ratio0.647Probabilistic Sharpe Ratio22.435%Loss Rate35%Win Rate65%Profit-Loss Ratio1.15Alpha0.07Beta-0.228Annual Standard Deviation0.085Annual Variance0.007Information Ratio-0.089Tracking Error0.155Treynor Ratio-0.241Total Fees$362.52Estimated Strategy Capacity$890000000.00Lowest Capacity AssetSPY R735QTJ8XC9X
# This code is provided for informational purposes only.
# Do NOT trade using it or you WILL loose money.

from SimpleLinearRegressionChannel import SimpleLinearRegressionChannel

class SimpleLinearRegressionChannelAlgorithm(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2016, 1, 1)
self.SetEndDate(2018, 12, 31)
self.SetCash(100000)

self.slrc = SimpleLinearRegressionChannel(self, 26, 26, 3.0)
self.SetWarmUp(self.slrc.WarmUpPeriod, Resolution.Daily)

def OnData(self, data):
if data.ContainsKey(self.spy) and data[self.spy] is not None:
self.slrc.Update(data[self.spy])
else:
return

return

# Since our indicator is ready, we can use it to compare the current bar with the simple linear regression projection.
(low, mid, high) = self.slrc.GetProjection()

# self.Debug(f"({low}, {mid}, {high})")

bar = data[self.spy]
minBarBody = min(bar.Open, bar.Close)
maxBarBody = max(bar.Open, bar.Close)
posSlope = self.slrc.GetSlope() > 0.0

if posSlope:
if minBarBody > mid:
self.SetHoldings(self.spy, 1)
elif minBarBody > low:
self.SetHoldings(self.spy, 0.7)
elif minBarBody < low and maxBarBody > low:
self.SetHoldings(self.spy, 0.3)
elif maxBarBody < low:
self.Liquidate(self.spy)
else:
if maxBarBody < mid:
self.SetHoldings(self.spy, -1)
elif maxBarBody < high:
self.SetHoldings(self.spy, -0.7)
elif maxBarBody > high and minBarBody < high:
self.SetHoldings(self.spy, -0.3)
elif minBarBody > high:
self.Liquidate(self.spy)

# This code is provided for informational purposes only.
# Do NOT trade using it or you WILL loose money.

from collections import deque
from statistics import stdev

class SimpleLinearRegressionChannel(PythonIndicator):
def __init__(self, algorithm: QCAlgorithm, base_period: int, projection_period: int, channel_width: float):
super().__init__()
assert base_period > 0, f"{self.__init__.__qualname__}: base_period must be greater than 0."
assert projection_period > 0, f"{self.__init__.__qualname__}: projection_period must be greater than 0."
assert channel_width >= 0.0, f"{self.__init__.__qualname__}: channel_width must be greater than or equal to 0.0."
if base_period < 10:
algorithm.Log(f"Warning - {self.__init__.__qualname__}: base_period is less than 10. This is very few data points to compute a simple linear regression.")
self._algorithm = algorithm
self._base_period = base_period
self._x = list(range(1, base_period + 1))
self._x_sum = sum(self._x)
self._x_mean = self._x_sum / base_period
self._diffs_x_mean = [(x_i - self._x_mean) for x_i in self._x]
self._B1_den = sum(pow(x_i, 2) for x_i in self._diffs_x_mean)
self._projection_period = projection_period
self._channel_width = channel_width
self._stdev = None
self.Value = None

self._base_window = deque(maxlen=base_period)
self._B0 = None
self._B1 = None
self._projection_window = deque(maxlen=projection_period)

self._R_den_x = (base_period * sum(pow(x_i, 2) for x_i in self._x)) - pow(self._x_sum, 2)

self.WarmUpPeriod = base_period

@property
return (len(self._base_window) == self._base_window.maxlen) and (len(self._projection_window) >= 1)

def Update(self, _input):
if len(self._base_window) != self._base_window.maxlen:
self._base_window.append(_input.Close)
else:
projection_size = len(self._projection_window)
if projection_size == 0:
self._simple_linreg()
if projection_size != self._projection_window.maxlen:
self._projection_window.append(_input.Close)
else:
self._reset(_input)

def _simple_linreg(self):
y_mean = sum(self._base_window) / self._base_period

B1_num = sum((x_j * y_j) for x_j, y_j in zip(self._diffs_x_mean, [(y_i - y_mean) for y_i in self._base_window]))
self._B1 = B1_num / self._B1_den
self._B0 = y_mean - (self._B1 * self._x_mean)

self._stdev = stdev(self._base_window)

def _reset(self, _input):
self._base_window.clear()
if self._base_period > self._projection_period:
self._base_window.extend(self._projection_window)
self._base_window.append(_input.Close)
self._projection_window.clear()
else:
while len(self._base_window) != self._base_window.maxlen:
self._base_window.append(self._projection_window.popleft())
self._simple_linreg()
self._projection_window.append(_input.Close)

def GetSlope(self):
return self._B1
else:
return None

def GetProjection(self):
x = self._base_period + len(self._projection_window)
y = self._B0 + (self._B1 * x)
channel = self._channel_width * self._stdev
return (y - channel, y, y + channel)
else:
return (None, None, None)

def GetCorrelationCoefficient(self):