Overall Statistics
Total Orders
1541
Average Win
1.68%
Average Loss
-0.87%
Compounding Annual Return
21.429%
Drawdown
32.100%
Expectancy
0.465
Start Equity
70000
End Equity
1564849.14
Net Profit
2135.499%
Sharpe Ratio
0.877
Sortino Ratio
1.063
Probabilistic Sharpe Ratio
34.979%
Loss Rate
50%
Win Rate
50%
Profit-Loss Ratio
1.93
Alpha
0.052
Beta
0.693
Annual Standard Deviation
0.16
Annual Variance
0.025
Information Ratio
0.103
Tracking Error
0.119
Treynor Ratio
0.202
Total Fees
$10003.11
Estimated Strategy Capacity
$120000000.00
Lowest Capacity Asset
ODFL R735QTJ8XC9X
Portfolio Turnover
3.75%
Drawdown Recovery
572
#region imports
from AlgorithmImports import *
import time
import random
import math
#endregion


class ForceIndexAlphaModel(AlphaModel):
    """Long-only alpha model driven by an EMA, a force index and an ATR per symbol.

    Entry candidates are symbols that are not invested, whose close is above
    the EMA and whose force index value is negative.  Exit signals are raised
    for invested symbols whose close falls below a stored low minus the ATR.

    Insights are not returned from ``update``; they are buffered in
    ``self.insights`` and emitted by the scheduled ``order`` callback.
    """

    # Maximum number of simultaneously held symbols; also read by the
    # portfolio construction model to size each position.
    basket_capacity = 7
    trade_bar_count = 0
    trade_bar_none_count = 0

    def __init__(self, algorithm):
        """Create empty per-symbol indicator maps and schedule the daily emit.

        Args:
            algorithm: The owning algorithm; used for scheduling, portfolio
                look-ups, indicator creation and insight emission.
        """
        self.algorithm = algorithm
        self._ema = {}
        self._fi = {}
        self._atr = {}
        # Per-symbol RollingWindow of bar lows (created in on_securities_changed).
        self.pre_low = {}
        # Insights collected since the last scheduled emit.
        self.insights = []

        self.algorithm.schedule.on(
            algorithm.date_rules.every_day("QQQ"),
            # remember QC uses UTC+0 time and ET is UTC-4
            # algorithm.time_rules.after_market_open("QQQ", 0),
            algorithm.time_rules.before_market_open("QQQ", 30),
            self.order
        )

    def order(self):
        """Scheduled callback: emit all buffered insights, then reset the buffer."""
        self.algorithm.emit_insights(self.insights)
        # clear the insight list after emitting
        self.insights = []

    def indicators_update(self, algorithm, data):
        """Push the low of every bar in ``data`` into that symbol's low window."""
        for symbol, trade_bar in data.items():
            self.pre_low[symbol].add(trade_bar.low)

    def update(self, algorithm, data):
        """Refresh the low windows and buffer exit and entry insights.

        Always returns an empty list: the insights produced here are emitted
        later by ``order`` through ``algorithm.emit_insights``.
        """
        self.indicators_update(algorithm, data)

        # Exits are evaluated first so the slots they free can be refilled.
        exit_insights_count = self.exit_insights_create(algorithm, data)

        self.entry_insights_create(algorithm, data, exit_insights_count)

        return []

    def entry_insights_create(self, algorithm, data, exit_insights_count):
        """Buffer UP insights for the best candidates that fit in the basket.

        Args:
            algorithm: The running algorithm (portfolio look-ups).
            data: Mapping-like slice of symbol -> bar for the current step.
            exit_insights_count: Number of exit insights raised in this step;
                those positions are treated as slots about to be freed.

        Returns:
            The list of entry insights that were appended to the buffer,
            ordered by descending magnitude (volume * close).
        """
        entry_insights_list = []
        for symbol, trade_bar in data.items():
            indicators_ready = self._fi[symbol].is_ready and self._ema[symbol].is_ready
            if not indicators_ready or algorithm.portfolio[symbol].invested:
                continue

            above_ema = trade_bar.close > self._ema[symbol].current.value
            negative_force = self._fi[symbol].current.value < 0
            if above_ema and negative_force:
                entry_insights_list.append(
                    Insight(
                        symbol=symbol,
                        period=timedelta(days=365 * 10),
                        type=InsightType.PRICE,
                        direction=InsightDirection.UP,
                        magnitude=trade_bar.volume * trade_bar.close,
                        # magnitude=math.fabs(self._fi[symbol].current.value),
                        confidence=1.0))
        entry_insights_list.sort(key=lambda x: x.magnitude, reverse=True)

        securities_count = sum(
            1 for security_holding in self.algorithm.portfolio.values()
            if security_holding.invested
        )
        free_slots = self.basket_capacity - (securities_count - exit_insights_count)
        # Clamp to [0, basket_capacity].  Without the lower bound, holding more
        # positions than the basket allows makes the count negative, and a
        # negative slice bound keeps all but the last few candidates instead
        # of none.
        free_slots = max(0, min(free_slots, self.basket_capacity))
        # self.algorithm.log(f"index {free_slots}")
        entry_insights_list = entry_insights_list[:free_slots]

        self.insights = self.insights + entry_insights_list
        return entry_insights_list

    def exit_insights_create(self, algorithm, data):
        """Buffer DOWN insights for invested symbols that broke their stop level.

        A symbol is exited when its close is below ``pre_low[symbol][-1]``
        minus the absolute ATR value.
        NOTE(review): ``[-1]`` on the 2-slot window is presumably the previous
        bar's low (the current low is added before this runs) -- confirm
        against RollingWindow's negative-index behaviour.

        Returns:
            The number of exit insights appended to the buffer.
        """
        exit_insights_count = 0
        for symbol, trade_bar in data.items():
            if not algorithm.portfolio[symbol].invested:
                continue
            if not (self.pre_low[symbol].is_ready and self._atr[symbol].is_ready):
                continue

            # self.algorithm.log(f"{trade_bar.close} : {self.pre_low[symbol][-1] - math.fabs(self._atr[symbol].current.value * 5.0)}")
            stop_level = self.pre_low[symbol][-1] - math.fabs(1.0 * self._atr[symbol].current.value)
            if trade_bar.close < stop_level:
                insight = Insight(
                    symbol=symbol,
                    period=timedelta(days=365 * 10),
                    type=InsightType.PRICE,
                    direction=InsightDirection.DOWN,
                    magnitude=trade_bar.volume * trade_bar.close,
                    confidence=1.0)
                self.insights.append(insight)
                exit_insights_count += 1
        return exit_insights_count

    def on_securities_changed(self, algorithm, changes):
        """Create the EMA, force index, ATR and low window for each added security."""
        for security in changes.added_securities:
            self._ema[security.symbol] = algorithm.ema(security.symbol, 22, 0.5)
            self._fi[security.symbol] = algorithm.fi(security.symbol, 2)
            self._atr[security.symbol] = algorithm.atr(security.symbol, 22)
            # Two slots: the most recent low and the one before it.
            self.pre_low[security.symbol] = RollingWindow[float](2)
# region imports
from AlgorithmImports import *
from alpha import ForceIndexAlphaModel
from portfolio import ForceIndexPortfolioConstructionModel
# endregion

class MeanReversionAlgorithm(QCAlgorithm):
    """Framework algorithm wiring the force-index alpha and portfolio models to QQQ."""

    def initialize(self):
        """Set dates, cash, brokerage, universes, benchmark and framework models."""
        # Backtest window: ends now and starts 16 * 365 days before the
        # end date the algorithm reports after it has been set.
        self.set_end_date(datetime.now())
        backtest_span = timedelta(days=365 * 16)
        self.set_start_date(self.end_date - backtest_span)

        # Set Strategy Cash
        self.set_cash(70000)
        self.set_warm_up(timedelta(days=30))

        self.set_brokerage_model(
            BrokerageName.INTERACTIVE_BROKERS_BROKERAGE,
            AccountType.MARGIN,
        )

        # Daily bars; the universe is QQQ itself plus the QQQ ETF constituents.
        # (Alternatives previously tried: ETF constituents of "SPY" or "DJIA".)
        self.universe_settings.resolution = Resolution.DAILY
        qqq = Symbol.create("QQQ", SecurityType.EQUITY, Market.USA)
        self.add_universe_selection(ManualUniverseSelectionModel([qqq]))
        self.add_universe_selection(ETFConstituentsUniverseSelectionModel("QQQ"))
        self.set_benchmark("QQQ")

        # Framework pipeline: alpha -> portfolio construction -> execution.
        self.add_alpha(ForceIndexAlphaModel(self))
        self.set_portfolio_construction(ForceIndexPortfolioConstructionModel(self))
        self.set_execution(ImmediateExecutionModel())

        # Risk management: trailing stop configured with 0.15.
        self.add_risk_management(TrailingStopRiskManagementModel(0.15))
    
        
#region imports
from AlgorithmImports import *
import time
from alpha import ForceIndexAlphaModel
#endregion


# Portfolio construction model: sizes each new long position as an equal share of the basket and flattens invested symbols on DOWN insights.
class ForceIndexPortfolioConstructionModel(PortfolioConstructionModel):
    """Equal-weight sizing for the force-index strategy.

    Each new long position receives ``total_position`` divided by the alpha
    model's ``basket_capacity``; invested symbols with a DOWN insight are
    targeted at zero.
    """

    # Fraction of the portfolio shared by the whole basket.
    total_position = 1.0

    def __init__(self, algorithm) -> None:
        """Keep a reference to the algorithm for portfolio and insight access."""
        self.algorithm = algorithm

    # Determines the target percent for each insight
    def determine_target_percent(self, activeInsights: List[Insight]) -> Dict[Insight, float]:
        """Map active insights to target portfolio fractions.

        Args:
            activeInsights: Insights currently considered active.

        Returns:
            A dict of insight -> target percent.  Only two cases produce an
            entry: an UP insight on a symbol that is not invested (equal
            basket share) and a DOWN insight on an invested symbol (0.0).
            All other insights are left out, so no target is produced for them.
        """
        targets = {}
        for insight in activeInsights:
            invested = self.algorithm.portfolio[insight.symbol].invested
            # Directions are compared with == rather than `is`: the enum value
            # arrives through the .NET bridge, so object identity is not a
            # reliable way to test it.
            if not invested:  # new targets
                if insight.direction == InsightDirection.UP:
                    targets[insight] = self.total_position / ForceIndexAlphaModel.basket_capacity
            elif insight.direction == InsightDirection.DOWN:
                targets[insight] = 0.0
                # Retire this insight and cancel the symbol's insights so
                # they stop taking part in later target calculations.
                insight.expire(self.algorithm.utc_time)
                self.algorithm.insights.cancel([insight.symbol])

        return targets
#region imports
from AlgorithmImports import *
from statsmodels.tsa.stattools import adfuller
#endregion

class StationarySelectionModel(ETFConstituentsUniverseSelectionModel):
    """ETF-constituent universe that keeps the symbols with the lowest ADF statistics."""

    def __init__(self, algorithm, etf, lookback = 10, universe_settings = None):
        """Store selection state and register the constituent filter.

        Args:
            algorithm: Algorithm instance handed to each SymbolData.
            etf: Ticker of the ETF whose constituents are filtered.
            lookback: Window length handed to each SymbolData.
            universe_settings: Forwarded unchanged to the base class.
        """
        self.algorithm = algorithm
        self.lookback = lookback
        # symbol -> SymbolData, created lazily on first sight of a constituent.
        self.symbol_data = {}
        # symbol -> price fallback; PriceGetter overwrites this attribute.
        self.prices = {}

        etf_symbol = Symbol.create(etf, SecurityType.EQUITY, Market.USA)
        super().__init__(etf_symbol, universe_settings, self.etf_constituents_filter)

    def etf_constituents_filter(self, constituents):
        """Feed each constituent's price to its tracker and pick the 10 lowest statistics."""
        scores = {}

        for constituent in constituents:
            symbol = constituent.symbol
            tracker = self.symbol_data.get(symbol)
            if tracker is None:
                tracker = SymbolData(self.algorithm, symbol, self.lookback)
                self.symbol_data[symbol] = tracker

            # Price source priority: implied from market value / shares held,
            # then the constituent's own price, then the cached fallback price.
            if constituent.market_value and constituent.shares_held:
                tracker.update(constituent.market_value / constituent.shares_held)
            elif constituent.price != 0:
                tracker.update(constituent.price)
            elif symbol in self.prices:
                tracker.update(self.prices[symbol])

            # Only symbols that already have a test result are ranked.
            statistic = tracker.test_statistics
            if statistic is not None:
                scores[symbol] = statistic

        # Ascending order: a more negative statistic means a higher
        # probability of having no unit root.  Keep the first 10.
        ranked_symbols = sorted(scores, key=scores.get)
        return ranked_symbols[:10]

class PriceGetter(CoarseFundamentalUniverseSelectionModel):
    """Coarse selection hook that only refreshes another universe's price cache."""

    def __init__(self, universe):
        """Remember the universe object whose ``prices`` attribute is refreshed."""
        self.universe = universe
        super().__init__(self.selection)

    def selection(self, coarse):
        """Replace ``universe.prices`` with symbol -> price from ``coarse``; select nothing."""
        latest_prices = {}
        for item in coarse:
            latest_prices[item.symbol] = item.price
        self.universe.prices = latest_prices
        return []

class SymbolData:
    """Rolling window of log prices for one symbol plus its latest ADF test result."""

    def __init__(self, algorithm, symbol, lookback):
        """Create the window and pre-fill it from daily history.

        Args:
            algorithm: Algorithm used to request history.
            symbol: Symbol whose prices are tracked.
            lookback: Window size and number of daily history bars requested.
        """
        # RollingWindow to hold log price series for stationary testing
        self.window = RollingWindow[float](lookback)
        # Result of the most recent adfuller call; None until the window fills.
        self.model = None

        # Warm up RollingWindow with every history bar except the last one.
        history = algorithm.history[TradeBar](symbol, lookback, Resolution.DAILY)
        for bar in list(history)[:-1]:
            if self._is_valid_price(bar.close):
                self.window.add(np.log(bar.close))

    @staticmethod
    def _is_valid_price(value):
        """Return True only for finite, strictly positive prices (log is defined)."""
        return bool(np.isfinite(value)) and value > 0

    def update(self, value):
        """Append the log of ``value`` and re-run the ADF test once the window is full.

        Zero, negative, NaN and infinite prices are ignored: their logarithm
        is not a finite number and would corrupt the series fed to the test.
        """
        if not self._is_valid_price(value): return

        # Update RollingWindow with log price
        self.window.add(np.log(value))
        if self.window.is_ready:
            # Test stationarity for log price series by augmented dickey-fuller test.
            # The window is reversed before testing.
            price = np.array(list(self.window))[::-1]
            self.model = adfuller(price, regression='ct', autolag='BIC')

    @property
    def test_statistics(self):
        """First element of the stored adfuller result, or None if no test has run."""
        return self.model[0] if self.model is not None else None