Overall Statistics
Total Orders
98
Average Win
0.36%
Average Loss
-0.24%
Compounding Annual Return
19.473%
Drawdown
2.000%
Expectancy
1.022
Start Equity
1000000
End Equity
1143195.57
Net Profit
14.320%
Sharpe Ratio
1.408
Sortino Ratio
1.906
Probabilistic Sharpe Ratio
91.839%
Loss Rate
20%
Win Rate
80%
Profit-Loss Ratio
1.51
Alpha
0.076
Beta
0.028
Annual Standard Deviation
0.055
Annual Variance
0.003
Information Ratio
0.078
Tracking Error
0.185
Treynor Ratio
2.764
Total Fees
$1140.08
Estimated Strategy Capacity
$0
Lowest Capacity Asset
SHV TP8J6Z7L419H
Portfolio Turnover
6.96%
Drawdown Recovery
37
from AlgorithmImports import *
import numpy as np
import pandas as pd
from datetime import timedelta

class DefenseADRAlgorithm(QCAlgorithm):
    """Rotate weekly between a defense/aerospace stock basket and a safe-asset mix.

    Every Friday the algorithm ranks the basket by average daily range (ADR,
    mean of high/low - 1 over ``lookback`` daily bars) and buys the names that
    pass the ADR, trend, exhaustion and positive-candle filters.  When nothing
    qualifies, or when one of the risk checks fires, risky holdings are closed
    and the portfolio is moved into the weights stored in ``safe_assets``.
    """

    def initialize(self):
        """Configure the backtest, subscribe to data and register scheduled events."""
        self.set_start_date(2022, 2, 20)
        self.set_end_date(2025, 9, 30)
        self.set_account_currency("USD")
        self.set_cash(100000)
        self.set_time_zone(TimeZones.NEW_YORK)

        # Order-preserving de-duplication: the raw list names "CW" twice, which
        # would otherwise put the same symbol into self.symbols twice and count
        # it twice in the basket average computed by update_adr_index.
        self.tickers = list(dict.fromkeys([
            "LMT", "RTX", "NOC", "GD", "HII", "LHX", "BWXT", "KTOS", "AVAV",
            "ESLT", "TDG", "HEI", "HWM", "CW", "PLTR", "MOG.A", "PSN",
            "VSEC", "DRS", "CAE", "OSK", "GE", "BA", "AXON", "CW", "RKLB", "HEI.A"
        ]))

        self.symbols = [self.add_equity(t, Resolution.DAILY).symbol for t in self.tickers]

        # Symbol -> target portfolio weight used whenever the strategy de-risks.
        self.safe_assets = {
            self.add_equity("SHV", Resolution.DAILY).symbol: 0.5,
            self.add_equity("SHY", Resolution.DAILY).symbol: 0.25,
            self.add_equity("GLD", Resolution.DAILY).symbol: 0.25
        }

        self.lookback = 20                # daily bars used by the ADR / trend calculations
        self.max_weight_per_name = 0.25   # cap on the weight of any single selected stock
        self.adr_index = []               # rolling history (max 50) of the basket index value
        self.days_at_high = 0             # consecutive updates where the index is at its rolling max
        self.phase = "neutral"            # one of "neutral", "momentum", "extended", "pullback"

        self.adr_percentile = 70          # percentile of ADR a stock must exceed to be selected
        self.adr_floor = 2.0              # minimum ADR (in percent) regardless of the percentile
        self.crash_threshold = -2.0       # average 30-minute return (percent) that triggers de-risking
        self.daily_loss_limit = -1.5      # day-over-day equity change (percent) that triggers de-risking

        self.positive_candle_days = 2     # number of most recent bars that must close above their open

        self.cooldown_until = None        # set by check_daily_equity_drop after a loss-limit breach
        self.prev_portfolio_value = None  # equity recorded at the previous daily check

        self.schedule.on(
            self.date_rules.every_day(),
            self.time_rules.after_market_open(self.symbols[0], 30),
            self.update_adr_index
        )

        self.schedule.on(
            self.date_rules.every(DayOfWeek.FRIDAY),
            self.time_rules.after_market_open(self.symbols[0], 0),
            self.rebalance
        )

        # Crash check every 30 minutes, from 30 to 390 minutes after the open.
        for minute in range(30, 391, 30):
            self.schedule.on(
                self.date_rules.every_day(),
                self.time_rules.after_market_open(self.symbols[0], minute),
                self.check_intraday_crash
            )

        self.schedule.on(
            self.date_rules.every_day(),
            self.time_rules.before_market_close(self.symbols[0], 10),
            self.check_daily_equity_drop
        )

        self.set_warm_up(self.lookback + 1)

    def _rotate_to_safe_assets(self):
        """Close every invested non-safe holding, then set the safe-asset weights."""
        for holding in self.portfolio.values():
            if holding.invested and holding.symbol not in self.safe_assets:
                self.set_holdings(holding.symbol, 0)
        for sym, weight in self.safe_assets.items():
            self.set_holdings(sym, weight)

    def momentum_exhaustion_filter(self, symbol):
        """Return True when at least 4 of the last 5 weekly changes were positive.

        Weekly closes are built by resampling 30 daily bars to Friday-anchored
        weeks ('W-FRI') and taking the last close in each week.  Returns False
        when no history is available.
        """
        hist = self.history([symbol], 6 * 5, Resolution.DAILY)
        if hist.empty:
            return False

        weekly_closes = hist["close"].unstack(level=0)[symbol].resample('W-FRI').last()
        weekly_changes = weekly_closes.pct_change()

        recent = weekly_changes[-5:]
        # NaN entries (e.g. the first pct_change value) compare as not-positive.
        green_weeks = (recent > 0).sum()

        return bool(green_weeks >= 4)

    def update_adr_index(self):
        """Append today's basket index value and re-classify the market phase.

        The index value is the mean, across basket symbols with a full
        ``lookback`` of daily bars, of last close divided by first close.
        Once 20 values exist, the phase is derived from the 5- vs 20-value
        average and from how many consecutive updates the index has been at
        its rolling maximum.
        """
        if self.is_warming_up:
            return

        prices = []
        for symbol in self.symbols:
            hist = self.history([symbol], self.lookback, Resolution.DAILY)
            if hist.empty or len(hist.index) < self.lookback:
                continue
            closes = hist["close"].unstack(level=0)[symbol]
            prices.append(closes.iloc[-1] / closes.iloc[0])

        if not prices:
            return

        adr_value = np.mean(prices)
        self.adr_index.append(adr_value)

        # Keep only the 50 most recent values.
        if len(self.adr_index) > 50:
            self.adr_index.pop(0)

        if len(self.adr_index) < 20:
            return

        short_sma = np.mean(self.adr_index[-5:])
        long_sma = np.mean(self.adr_index[-20:])
        slope_up = short_sma > long_sma

        # adr_value is already in the list, so this tests "is today the rolling max".
        if adr_value >= max(self.adr_index):
            self.days_at_high += 1
        else:
            self.days_at_high = 0

        if not slope_up:
            self.phase = "pullback"
        elif self.days_at_high <= 3:
            self.phase = "momentum"
        else:
            self.phase = "extended"

    def check_intraday_crash(self):
        """De-risk when risky holdings fall, on average, below ``crash_threshold``.

        The return of each invested non-safe holding is measured over the last
        30 minute bars; if the mean of those returns (in percent) is below
        ``crash_threshold`` the portfolio is rotated into the safe assets.
        """
        if self.is_warming_up:
            return

        # Only risky positions are measured: there is nothing to rotate out of
        # when the portfolio already holds safe assets only.
        risky_symbols = [
            holding.symbol for holding in self.portfolio.values()
            if holding.invested and holding.symbol not in self.safe_assets
        ]
        if not risky_symbols:
            return

        hist = self.history(risky_symbols, 30, Resolution.MINUTE)
        if hist.empty or "close" not in hist.columns:
            return

        hist_closes = hist["close"].unstack(level=0)
        returns = []

        for symbol in risky_symbols:
            if symbol not in hist_closes.columns:
                continue
            # Unstacking aligns all symbols on one time index, so a symbol that
            # did not trade in some minute gets NaN there.  Dropping those keeps
            # one gap from turning the average into NaN, which would make the
            # threshold comparison below always False.
            closes = hist_closes[symbol].dropna()
            if len(closes) < 2:
                continue
            change_pct = (closes.iloc[-1] / closes.iloc[0] - 1) * 100.0
            returns.append(change_pct)

        if not returns:
            return

        if np.mean(returns) < self.crash_threshold:
            self._rotate_to_safe_assets()

    def check_daily_equity_drop(self):
        """De-risk and start a 2-day cooldown after a large day-over-day equity drop.

        Compares total portfolio value with the value recorded at the previous
        call; a change below ``daily_loss_limit`` (percent) rotates the
        portfolio into the safe assets and sets ``cooldown_until``.
        """
        if self.is_warming_up:
            return

        today_value = self.portfolio.total_portfolio_value

        if self.prev_portfolio_value is not None:
            daily_return = (today_value / self.prev_portfolio_value - 1) * 100.0

            if daily_return < self.daily_loss_limit:
                self._rotate_to_safe_assets()
                self.cooldown_until = self.time + timedelta(days=2)

        self.prev_portfolio_value = today_value

    def rebalance(self):
        """Select the stocks to hold for the coming week and set their weights.

        In the "pullback" phase, or when no stock has enough history or passes
        the filters, the portfolio is rotated into the safe assets.  Otherwise
        every holding that was not selected is closed and each selected stock
        receives an equal share of the phase-dependent exposure, capped at
        ``max_weight_per_name``.
        """
        if self.is_warming_up:
            return

        # During a cooldown only names above the 90th ADR percentile are allowed.
        in_cooldown = self.cooldown_until is not None and self.time < self.cooldown_until
        allow_only_rebounds = in_cooldown

        if self.phase == "pullback":
            self._rotate_to_safe_assets()
            return

        adr_map = {}
        trend_ok = {}
        exhaustion_ok = {}
        positive_candles_ok = {}

        # Basket-level trend: latest index value versus the one `lookback` entries back.
        if len(self.adr_index) >= self.lookback:
            market_trend = self.adr_index[-1] >= self.adr_index[-self.lookback]
        else:
            market_trend = True

        for symbol in self.symbols:
            hist = self.history([symbol], self.lookback, Resolution.DAILY)
            if hist.empty or len(hist.index) < self.lookback:
                continue

            highs = hist["high"].unstack(level=0)[symbol]
            lows = hist["low"].unstack(level=0)[symbol]
            closes = hist["close"].unstack(level=0)[symbol]
            opens = hist["open"].unstack(level=0)[symbol]

            # Average daily range in percent.
            adr_map[symbol] = 100 * (np.mean(highs / lows) - 1)

            trend_ok[symbol] = (closes.iloc[-1] > closes.iloc[0]) and market_trend

            exhaustion_ok[symbol] = not self.momentum_exhaustion_filter(symbol)

            if len(closes) >= self.positive_candle_days:
                positive_candles_ok[symbol] = all(
                    closes.iloc[-i] > opens.iloc[-i]
                    for i in range(1, self.positive_candle_days + 1)
                )
            else:
                positive_candles_ok[symbol] = False

        if not adr_map:
            self._rotate_to_safe_assets()
            return

        adr_values = list(adr_map.values())
        rank_threshold = np.percentile(adr_values, self.adr_percentile)
        dynamic_threshold = max(rank_threshold, self.adr_floor)
        strong_buy_threshold = np.percentile(adr_values, 90)
        adr_cutoff = strong_buy_threshold if allow_only_rebounds else dynamic_threshold

        selected = [
            s for s, adr in adr_map.items()
            if adr > adr_cutoff
            and trend_ok.get(s, False)
            and exhaustion_ok.get(s, True)
            and positive_candles_ok.get(s, False)
        ]

        if not selected:
            self._rotate_to_safe_assets()
            return

        # Half exposure in the "extended" phase, full exposure otherwise.
        total_exposure = 0.5 if self.phase == "extended" else 1.0

        equal_weight = total_exposure / len(selected)
        target_weight = min(equal_weight, self.max_weight_per_name)

        # Close everything that was not selected (safe assets included) before buying.
        for holding in self.portfolio.values():
            if holding.symbol not in selected and holding.invested:
                self.set_holdings(holding.symbol, 0)

        for sym in selected:
            self.set_holdings(sym, target_weight)

    def on_data(self, data: Slice):
        """No per-slice logic; all trading is driven by the scheduled events."""
        pass