| Overall Statistics | |
| --- | --- |
| Total Orders | 112 |
| Average Win | 6.26% |
| Average Loss | -4.60% |
| Compounding Annual Return | 41.824% |
| Drawdown | 44.600% |
| Expectancy | 0.881 |
| Start Equity | 5000 |
| End Equity | 29215.40 |
| Net Profit | 484.308% |
| Sharpe Ratio | 0.945 |
| Sortino Ratio | 1.037 |
| Probabilistic Sharpe Ratio | 37.206% |
| Loss Rate | 20% |
| Win Rate | 80% |
| Profit-Loss Ratio | 1.36 |
| Alpha | 0 |
| Beta | 0 |
| Annual Standard Deviation | 0.34 |
| Annual Variance | 0.116 |
| Information Ratio | 1.024 |
| Tracking Error | 0.34 |
| Treynor Ratio | 0 |
| Total Fees | $116.44 |
| Estimated Strategy Capacity | $3700000.00 |
| Lowest Capacity Asset | BRKB R735QTJ8XC9X |
| Portfolio Turnover | 2.63% |
# region imports
from AlgorithmImports import *
# endregion
class MarketCapFactor:
    """Factor whose value is the market capitalization of a security."""

    def __init__(self, security):
        """Keep a reference to the security; fundamentals are read lazily.

        Args:
            security: Object exposing ``fundamentals.market_cap``.
        """
        self._security = security

    @property
    def value(self):
        """Return ``fundamentals.market_cap`` as currently reported."""
        return self._security.fundamentals.market_cap
class SortinoFactor:
    """Factor backed by a daily Sortino indicator created on the algorithm."""

    def __init__(self, algorithm, symbol, lookback):
        """Register the indicator for ``symbol``.

        Args:
            algorithm: Object providing ``sortino(symbol, period, resolution=...)``.
            symbol: Asset the indicator is attached to.
            lookback: Indicator period passed through unchanged.
        """
        self._sortino = algorithm.sortino(symbol, lookback, resolution=Resolution.DAILY)

    @property
    def value(self):
        """Return the indicator's current value."""
        return self._sortino.current.value
class KERFactor:
    """Factor backed by the daily ``ker`` indicator created on the algorithm.

    NOTE(review): ``ker`` is presumably the Kaufman Efficiency Ratio -- confirm
    against the indicator reference.
    """

    def __init__(self, algorithm, symbol, lookback):
        """Register the indicator for ``symbol``.

        Args:
            algorithm: Object providing ``ker(symbol, period, resolution=...)``.
            symbol: Asset the indicator is attached to.
            lookback: Indicator period passed through unchanged.
        """
        self._ker = algorithm.ker(symbol, lookback, resolution=Resolution.DAILY)

    @property
    def value(self):
        """Return the indicator's current value."""
        return self._ker.current.value
class FCFYieldFactor:
    """Free Cash Flow Yield factor."""

    def __init__(self, security):
        """Keep a reference to the security; fundamentals are read lazily.

        Args:
            security: Object exposing ``fundamentals.valuation_ratios.FCFYield``.
        """
        self._security = security

    @property
    def value(self):
        """Return the security's FCF yield, or NaN when it cannot be read.

        The lookup is best-effort: any ordinary error while walking the
        fundamentals chain (for example ``fundamentals`` being ``None``)
        yields NaN instead of propagating.
        """
        try:
            return self._security.fundamentals.valuation_ratios.FCFYield
        except Exception:
            # ``except Exception`` rather than a bare ``except`` so that
            # KeyboardInterrupt / SystemExit are not swallowed.
            return float("nan")
class CorrFactor:
    """Factor that rewards low absolute Pearson correlation to a reference."""

    def __init__(self, algorithm, symbol, reference, lookback):
        """Register a daily Pearson correlation indicator.

        Args:
            algorithm: Object providing ``c(symbol, reference, period, ...)``.
            symbol: Asset being scored.
            reference: Asset the correlation is measured against.
            lookback: Indicator period passed through unchanged.
        """
        self._c = algorithm.c(
            symbol,
            reference,
            lookback,
            correlation_type=CorrelationType.Pearson,
            resolution=Resolution.DAILY,
        )

    @property
    def value(self):
        """Return ``1 - |correlation|`` using the indicator's current value."""
        return 1 - abs(self._c.current.value)
class ROCFactor:
    """Factor backed by a daily ``roc`` indicator created on the algorithm."""

    def __init__(self, algorithm, symbol, lookback):
        """Register the indicator for ``symbol``.

        Args:
            algorithm: Object providing ``roc(symbol, period, resolution=...)``.
            symbol: Asset the indicator is attached to.
            lookback: Indicator period passed through unchanged.
        """
        self._roc = algorithm.roc(symbol, lookback, resolution=Resolution.DAILY)

    @property
    def value(self):
        """Return the indicator's current value."""
        return self._roc.current.value
# region imports
from AlgorithmImports import *
from scipy import optimize
from scipy.optimize import Bounds
from factors import *
# endregion
class FactorWeightOptimizationAlgorithm(QCAlgorithm):
    """Monthly long-only factor portfolio over the heaviest SPY constituents.

    At each rebalance the algorithm reads every selected asset's factor
    values, z-scores them across the universe, fits non-negative factor
    weights against the trailing return, and targets the assets with a
    positive combined score, scaled by ``leverageset``.
    """

    _selection_data_by_symbol = {}

    def initialize(self):
        """Configure dates, cash, brokerage, universe, execution and schedule."""
        self.set_start_date(2020, 1, 1)
        self.set_cash(5000)
        self.settings.automatic_indicator_warm_up = True
        spy = Symbol.create('SPY', SecurityType.EQUITY, Market.USA)

        # Add a universe of minute data.
        self.universe_settings.resolution = Resolution.MINUTE
        self.universe_size = self.get_parameter('universe_size', 8)

        self.set_brokerage_model(BrokerageName.INTERACTIVE_BROKERS_BROKERAGE, AccountType.MARGIN)
        self.set_security_initializer(
            BrokerageModelSecurityInitializer(self.brokerage_model, FuncSecuritySeeder(self.get_last_known_prices))
        )

        self._universe = self.add_universe(
            self.universe.etf(spy, universe_filter_func=self._largest_weighted_constituents)
        )
        self._lookback = self.get_parameter('lookback', 21)  # Set a 21-day trading lookback.

        # NOTE(review): rebalancing goes through set_holdings; confirm that the
        # execution model registered here is actually exercised on that path.
        self.set_execution(FSLBSpreadExecutionModel(0.009))
        self.leverageset = 1.9

        # Create a Schedule Event to rebalance the portfolio.
        self.schedule.on(self.date_rules.month_start(spy), self.time_rules.after_market_open(spy, 31), self._rebalance)
        self.set_warm_up(timedelta(self._lookback+1))

    def _largest_weighted_constituents(self, constituents):
        """Return the symbols of the ``universe_size`` heaviest constituents.

        Constituents whose weight is falsy (``None`` or zero) are dropped.
        """
        weighted = [c for c in constituents if c.weight]
        return [c.symbol for c in sorted(weighted, key=lambda c: c.weight)[-self.universe_size:]]

    def ETFConstituentsFilter(self, constituents):
        """Select the heaviest constituents whose last daily close is below 50.

        Args:
            constituents: ETF constituent records exposing ``symbol`` and ``weight``.

        Returns:
            Symbols of up to ``universe_size`` constituents, ordered by
            ascending weight.
        """
        # Materialise once: the input is walked twice below.
        constituents = list(constituents)
        symbols = [c.symbol for c in constituents if c.weight]

        # Load one daily bar per symbol to obtain the last known close.
        history = self.history(symbols, 1, Resolution.DAILY)
        prices = {}
        if not history.empty:
            available = history.index.levels[0]
            for symbol in symbols:
                if symbol in available:
                    prices[symbol] = history.loc[symbol].iloc[-1]['close']

        # Keep constituents that have a weight and a known price below 50.
        filtered_constituents = []
        for c in constituents:
            price = prices.get(c.symbol)
            if c.weight and price is not None and price < 50:
                filtered_constituents.append(c)
        return [c.symbol for c in sorted(filtered_constituents, key=lambda c: c.weight)[-self.universe_size:]]

    def ETFConstituentsFilter2(self, constituents):
        """Select the ``universe_size`` heaviest constituents with a weight set.

        Unlike the price-based filter, only ``None`` weights are excluded here.
        """
        filtered_constituents = [c for c in constituents if c.weight is not None]
        return [c.symbol for c in sorted(filtered_constituents, key=lambda c: c.weight)[-self.universe_size:]]

    def on_securities_changed(self, changes):
        """Attach factor objects to every security entering the universe."""
        for security in changes.added_securities:
            security.factors = [FCFYieldFactor(security), KERFactor(self, security.symbol, self._lookback)]

    def _rebalance1(self):
        """Earlier rebalance variant without data or optimizer checks.

        Not scheduled by ``initialize``; kept for reference.
        """
        if not self.is_market_open("SPY"):
            return

        # Get raw factor values of the universe constituents.
        factors_df = pd.DataFrame()
        for symbol in self._universe.selected:
            for i, factors in enumerate(self.securities[symbol].factors):
                factors_df.loc[symbol, i] = factors.value

        # Calculate the factor z-scores.
        factor_zscores = (factors_df - factors_df.mean()) / factors_df.std()

        # Run an optimization to find optimal factor weights.
        # Objective: maximize trailing return. Initial guess: equal-weighted.
        trailing_return = self.history(
            list(self._universe.selected), self._lookback, Resolution.DAILY
        ).close.unstack(0).pct_change(self._lookback-1).iloc[-1]
        num_factors = factors_df.shape[1]
        factor_weights = optimize.minimize(
            lambda weights: -(np.dot(factor_zscores, weights) * trailing_return).sum(),
            x0=np.array([1.0/num_factors] * num_factors),
            method='Nelder-Mead',
            bounds=Bounds([0] * num_factors, [1] * num_factors),
            options={'maxiter': 10}
        ).x

        # Keep only positive scores, normalise them and apply the leverage.
        portfolio_weights = (factor_zscores * factor_weights).sum(axis=1)
        portfolio_weights = portfolio_weights[portfolio_weights > 0]
        self.set_holdings(
            [
                PortfolioTarget(symbol, self.leverageset*weight/portfolio_weights.sum())
                for symbol, weight in portfolio_weights.items()
            ],
            True
        )

    def _rebalance(self):
        """Scheduled rebalance: score assets, fit factor weights, set targets.

        Returns early, leaving holdings untouched, when the market is closed,
        no factor values are available, close prices are missing, the
        optimizer reports failure, or no asset has a positive combined score.
        """
        # Only trade while the market is open.
        if not self.is_market_open("SPY"):
            return

        # 1. Build the factor DataFrame (rows: symbols, columns: factor index).
        factors_df = pd.DataFrame()
        for symbol in self._universe.selected:
            if symbol in self.securities:
                for i, factors in enumerate(self.securities[symbol].factors):
                    factors_df.loc[symbol, i] = factors.value

        # Make sure there is something to score.
        if factors_df.empty:
            self.debug("Factors DataFrame is empty. Skipping rebalance.")
            return

        # Cross-sectional z-scores per factor.
        factor_zscores = (factors_df - factors_df.mean()) / factors_df.std()

        # 2. Trailing returns over the lookback window.
        history_data = self.history(list(self._universe.selected), self._lookback, Resolution.DAILY)
        if 'close' not in history_data.columns.get_level_values(-1):
            self.debug("Close prices not available in historical data.")
            return

        if isinstance(history_data.columns, pd.MultiIndex):
            # Pull the 'close' slice out of the column MultiIndex.
            close_prices = history_data.xs('close', axis=1, level=-1).unstack(level=0)
        else:
            close_prices = history_data['close'].unstack(level=0)
        trailing_return = close_prices.pct_change(self._lookback - 1).iloc[-1]

        # 3. Optimise the factor weights, starting from equal weights.
        num_factors = factors_df.shape[1]
        initial_guess = np.array([1.0 / num_factors] * num_factors)

        def objective_function(weights):
            # NOTE(review): np.dot drops the symbol index, so scores and
            # returns are paired by position -- confirm both share one order.
            portfolio_return = np.dot(factor_zscores, weights) * trailing_return
            return -portfolio_return.sum()  # Negated because we maximise.

        result = optimize.minimize(
            objective_function,
            x0=initial_guess,
            method='Nelder-Mead',
            bounds=Bounds([0] * num_factors, [1] * num_factors),
            options={'maxiter': 10}
        )

        # NOTE(review): with maxiter=10 the optimizer may report failure merely
        # for hitting the iteration cap -- confirm skipping is intended then.
        if not result.success:
            self.debug(f"Optimization failed: {result.message}")
            return
        factor_weights = result.x

        # 4. Combined score per asset, floored at zero, normalised to 100%.
        portfolio_weights = (factor_zscores @ factor_weights).clip(0)
        total_weight = portfolio_weights.sum()
        if not total_weight > 0:
            # Dividing by a zero total would turn every weight into NaN and
            # hand set_holdings an empty target list; skip instead.
            self.debug("No asset has a positive factor score. Skipping rebalance.")
            return
        portfolio_weights /= total_weight

        # 5. Rebalance the portfolio.
        targets = [
            PortfolioTarget(symbol, self.leverageset * weight)
            for symbol, weight in portfolio_weights.items()
            if weight > 0
        ]
        self.set_holdings(targets, True)
class FSLBSpreadExecutionModel(ExecutionModel):
    '''Execution model that sells first, then buys while the spread is tight.

    Sell orders are submitted unconditionally; buy orders are submitted only
    when the spread check passes and enough margin remains.
    Note this execution model will not work using Resolution.DAILY since
    Exchange.exchange_open will be false, suggested resolution is Minute
    '''

    def __init__(self, accepting_spread_percent=0.005):
        '''Initializes a new instance of the execution model.

        Args:
            accepting_spread_percent: Maximum spread relative to the current
                price; the sign is ignored.'''
        self.targets_collection = PortfolioTargetCollection()
        # Maximum accepted spread compared to the current price, as a fraction.
        self.accepting_spread_percent = abs(accepting_spread_percent)

    def execute(self, algorithm, targets):
        '''Submits market orders for the outstanding targets, sells first.

        Args:
            algorithm: The algorithm instance
            targets: The portfolio targets'''
        # Local import so the method does not rely on a star-import for math.
        import math

        # Update the complete set of portfolio targets with the new targets.
        self.targets_collection.add_range(targets)

        # Cheap emptiness check first; the work below is skipped when idle.
        if self.targets_collection.is_empty:
            return

        # 1. Split the targets by the sign of their unordered quantity.
        sell_targets = []
        buy_targets = []
        for target in self.targets_collection:
            security = algorithm.securities[target.symbol]
            quantity = OrderSizing.get_unordered_quantity(algorithm, target, security, True)
            if quantity < 0:
                sell_targets.append(target)
            elif quantity > 0:
                buy_targets.append(target)

        # 2. Process sell orders first; no spread check is applied to sells.
        for target in sell_targets:
            security = algorithm.securities[target.symbol]
            quantity = OrderSizing.get_unordered_quantity(algorithm, target, security, True)
            if quantity != 0:
                algorithm.market_order(target.symbol, quantity)

        # 3. Process buy orders only while some margin remains.
        if algorithm.portfolio.margin_remaining > 0:
            for target in buy_targets:
                symbol = target.symbol
                security = algorithm.securities[symbol]
                quantity = OrderSizing.get_unordered_quantity(algorithm, target, security, True)
                if not self.spread_is_favorable(security):
                    continue

                # Cap the order when 120% of its ask value exceeds the
                # remaining margin.
                margin_remaining = algorithm.portfolio.margin_remaining
                if quantity != 0 and 1.2 * security.ask_price * quantity > margin_remaining:
                    quantity = math.floor(margin_remaining / security.ask_price)
                if quantity <= 0:
                    continue

                # Submit only if the initial margin requirement is covered.
                required_margin = security.buying_power_model.get_initial_margin_requirement(
                    InitialMarginParameters(security, quantity)
                ).value
                if algorithm.portfolio.margin_remaining - required_margin >= 0:
                    algorithm.market_order(symbol, quantity)

        self.targets_collection.clear_fulfilled(algorithm)

    def spread_is_favorable(self, security):
        '''Determines if the spread is in desirable range.'''
        # Price has to be larger than zero to avoid zero division error, or
        # negative price causing the spread percentage < 0 by error
        # Has to be in opening hours of exchange to avoid extreme spread in
        # OTC period
        return security.exchange.exchange_open \
            and security.price > 0 and security.ask_price > 0 and security.bid_price > 0 \
            and (security.ask_price - security.bid_price) / security.price <= self.accepting_spread_percent
class MarketCapFactor:
    """Factor whose value is the market capitalization of a security."""

    def __init__(self, security):
        """Keep a reference to the security; fundamentals are read lazily.

        Args:
            security: Object exposing ``fundamentals.market_cap``.
        """
        self._security = security

    @property
    def value(self):
        """Return ``fundamentals.market_cap`` as currently reported."""
        return self._security.fundamentals.market_cap
class SortinoFactor:
    """Factor backed by a daily Sortino indicator created on the algorithm."""

    def __init__(self, algorithm, symbol, lookback):
        """Register the indicator for ``symbol``.

        Args:
            algorithm: Object providing ``sortino(symbol, period, resolution=...)``.
            symbol: Asset the indicator is attached to.
            lookback: Indicator period passed through unchanged.
        """
        self._sortino = algorithm.sortino(symbol, lookback, resolution=Resolution.DAILY)

    @property
    def value(self):
        """Return the indicator's current value."""
        return self._sortino.current.value
class KERFactor:
    """Factor backed by the daily ``ker`` indicator created on the algorithm.

    NOTE(review): ``ker`` is presumably the Kaufman Efficiency Ratio -- confirm
    against the indicator reference.
    """

    def __init__(self, algorithm, symbol, lookback):
        """Register the indicator for ``symbol``.

        Args:
            algorithm: Object providing ``ker(symbol, period, resolution=...)``.
            symbol: Asset the indicator is attached to.
            lookback: Indicator period passed through unchanged.
        """
        self._ker = algorithm.ker(symbol, lookback, resolution=Resolution.DAILY)

    @property
    def value(self):
        """Return the indicator's current value."""
        return self._ker.current.value
class FCFYieldFactor:
    """Free Cash Flow Yield factor."""

    def __init__(self, security):
        """Keep a reference to the security; fundamentals are read lazily.

        Args:
            security: Object exposing ``fundamentals.valuation_ratios.FCFYield``.
        """
        self._security = security

    @property
    def value(self):
        """Return the security's FCF yield, or NaN when it cannot be read.

        The lookup is best-effort: any ordinary error while walking the
        fundamentals chain (for example ``fundamentals`` being ``None``)
        yields NaN instead of propagating.
        """
        try:
            return self._security.fundamentals.valuation_ratios.FCFYield
        except Exception:
            # ``except Exception`` rather than a bare ``except`` so that
            # KeyboardInterrupt / SystemExit are not swallowed.
            return float("nan")
class CorrFactor:
    """Factor that rewards low absolute Pearson correlation to a reference."""

    def __init__(self, algorithm, symbol, reference, lookback):
        """Register a daily Pearson correlation indicator.

        Args:
            algorithm: Object providing ``c(symbol, reference, period, ...)``.
            symbol: Asset being scored.
            reference: Asset the correlation is measured against.
            lookback: Indicator period passed through unchanged.
        """
        self._c = algorithm.c(
            symbol,
            reference,
            lookback,
            correlation_type=CorrelationType.Pearson,
            resolution=Resolution.DAILY,
        )

    @property
    def value(self):
        """Return ``1 - |correlation|`` using the indicator's current value."""
        return 1 - abs(self._c.current.value)
class ROCFactor:
    """Factor backed by a daily ``roc`` indicator created on the algorithm."""

    def __init__(self, algorithm, symbol, lookback):
        """Register the indicator for ``symbol``.

        Args:
            algorithm: Object providing ``roc(symbol, period, resolution=...)``.
            symbol: Asset the indicator is attached to.
            lookback: Indicator period passed through unchanged.
        """
        self._roc = algorithm.roc(symbol, lookback, resolution=Resolution.DAILY)

    @property
    def value(self):
        """Return the indicator's current value."""
        return self._roc.current.value