Overall Statistics
Total Trades
0
Average Win
0%
Average Loss
0%
Compounding Annual Return
0%
Drawdown
0%
Expectancy
0
Net Profit
0%
Sharpe Ratio
0
Probabilistic Sharpe Ratio
0%
Loss Rate
0%
Win Rate
0%
Profit-Loss Ratio
0
Alpha
0
Beta
0
Annual Standard Deviation
0
Annual Variance
0
Information Ratio
-8.674
Tracking Error
0.133
Treynor Ratio
0
Total Fees
$0.00
from collections import deque
import numpy as np

class ModulatedCalibratedThrustAssembly(QCAlgorithm):

    def Initialize(self):
        self.SetStartDate(2020, 11, 1)  # Set Start Date
        self.SetCash(100000)  # Set Strategy Cash
        self.sym = self.AddEquity("SPY", Resolution.Daily).Symbol
        self.vi = VI(10)
        self.RegisterIndicator(self.sym, self.vi, Resolution.Daily)
        
        self.rwUptrend = RollingWindow[float](10)
        self.rwDowntrend = RollingWindow[float](10)
        
        self.vi.RegisterCallback(self.OnVIUpdate)
        
        self.SetWarmUp(10)
        
    def OnVIUpdate(self, sender):
        self.rwUptrend.Add(sender.Uptrend)
        self.rwDowntrend.Add(sender.Downtrend)
        
    def OnData(self, data):
        if self.rwUptrend.IsReady:
            self.Log(list(self.rwUptrend))
        
class VI:
    def __init__(self, period):
        self.Name = "Vortex"
        self.Time = datetime.min
        self.IsReady = False
        self.Value = 0 #arbitrary, not to be read - just outputted so as not to cause compiler err. 
        self.Uptrend = 0
        self.Downtrend = 0
        self.queue = deque(maxlen=period+1)
        self.Period = period
        
        self.callbacks = []
        
    def __repr__(self):
        return f"{self.Name} -> IsReady: {self.IsReady}, Time: {self.Time}, Value: {self.Value}"
        
    def RegisterCallback(self, f):
        self.callbacks.append(f)
    
    def Update(self, input):
        self.queue.appendleft(input)
        self.Time = input.EndTime
        
        sum_trn = 0
        sum_vm_uptrend = 0
        sum_vm_downtrend = 0
        
        lows = np.array([float(x.Low) for x in self.queue])
        highs = np.array([float(x.High) for x in self.queue])
        closes = np.array([float(x.Close) for x in self.queue])  
        
        if len(self.queue) == self.queue.maxlen:
            for i in range(0, self.queue.maxlen - 1):
                sum_trn += np.nanmax(
                    np.array([
                        highs[i] - lows[i],
                        highs[i] - closes[i+1],
                        lows[i] - closes[i+1]
                ]))
                
                sum_vm_uptrend += abs(highs[i] - lows[i+1])
                sum_vm_downtrend += abs(lows[i] - highs[i+1])
            
            self.Downtrend = sum_vm_uptrend / sum_trn
            self.Uptrend = sum_vm_downtrend / sum_trn
    
        
        self.IsReady = len(self.queue) == self.queue.maxlen
        
        for f in self.callbacks:
            f(self)
        
        return self.IsReady