| Overall Statistics |
|
Total Orders 387 Average Win 5.75% Average Loss -4.17% Compounding Annual Return 301.028% Drawdown 49.100% Expectancy 0.429 Start Equity 100000 End Equity 1856076.48 Net Profit 1756.076% Sharpe Ratio 3.465 Sortino Ratio 3.858 Probabilistic Sharpe Ratio 94.682% Loss Rate 40% Win Rate 60% Profit-Loss Ratio 1.38 Alpha 1.958 Beta 1.253 Annual Standard Deviation 0.591 Annual Variance 0.35 Information Ratio 3.479 Tracking Error 0.568 Treynor Ratio 1.635 Total Fees $42858.22 Estimated Strategy Capacity $770000.00 Lowest Capacity Asset SVIX XX86EIYKAK4L Portfolio Turnover 47.29% |
#region imports
from AlgorithmImports import *
#endregion
import os
import re
def extract_stock_symbols(project_dir):
# Create a set to store unique stock symbols
symbols_set = set()
# Traverse through all subdirectories in the project directory
for root, dirs, files in os.walk(project_dir):
for file in files:
if file.endswith(".py"):
file_path = os.path.join(root, file)
with open(file_path, "r") as f:
content = f.read()
# Extract stock symbols from the file content
symbols_array = extract_symbols(content)
# Add unique symbols to the set
symbols_set.update(symbols_array)
# Print the unique stock symbols to the terminal
print("Unique Stock Symbols:")
print("'" + "','".join(sorted(symbols_set)) + "'")
def extract_symbols(text):
pattern = r"'\b([A-Z]{2,5})\b'"
matches = re.findall(pattern, text)
return list(set(matches))
# Provide the path to your project directory
project_directory = "Strategies"
# Run the script
extract_stock_symbols(project_directory)#region imports
from AlgorithmImports import *
#endregion
import os
import re
def replace_pattern(project_dir):
pattern = r"CumReturn\(self\.algo,\s*'([A-Z]+)',\s*(\d+)\)\s*([<>]=?)\s*([-+]?\d+(?:\.\d+)?)"
for root, dirs, files in os.walk(project_dir):
for file in files:
if file.endswith(".py"):
file_path = os.path.join(root, file)
with open(file_path, "r") as f:
content = f.read()
matches = list(re.finditer(pattern, content))
modified_content = content
for match in reversed(matches):
found = match.group()
number = float(match.group(4))
if abs(number) >= 1:
fixed = replace_match(match)
modified_content = modified_content[:match.start()] + fixed + modified_content[match.end():]
print(f"File: {file_path}")
print(f"Found: {found}")
print(f"Fixed: {fixed}")
print("---")
if modified_content != content:
with open(file_path, "w") as f:
f.write(modified_content)
def replace_match(match):
ticker = match.group(1)
period = match.group(2)
operator = match.group(3)
number = float(match.group(4))
if abs(number) >= 1:
number /= 100
return f"CumReturn(self.algo, '{ticker}', {period}) {operator} {number}"
# Provide the path to your project directory
project_directory = "Strategies"
# Run the script
replace_pattern(project_directory)#region imports from AlgorithmImports import * #endregion # Your New Python File
#CombinedStrategy1/version1.py
from AlgorithmImports import *
from indicators import *
from main import DogeStrat
#https://app.composer.trade/symphony/2Jbph7DrhfBvRoCZgaKW/details
class SimonsKMLMSwitcherSinglePopsStrategy(DogeStrat):
def __init__(self, algo):
super().__init__()
self.algo = algo
def Execute(self):
self.group_KMLM_switcher()
def group_KMLM_switcher(self):
if RSI(self.algo, 'QQQE', 10) > 79:
AH(self.algo, 'UVXY', 1, 1)
elif RSI(self.algo, 'VTV', 10) > 79:
AH(self.algo, 'UVXY', 1, 1)
elif RSI(self.algo, 'VOX', 10) > 79:
AH(self.algo, 'UVXY', 1, 1)
elif RSI(self.algo, 'TECL', 10) > 79:
AH(self.algo, 'UVXY', 1, 1)
elif RSI(self.algo, 'VOOG', 10) > 79:
AH(self.algo, 'UVXY', 1, 1)
elif RSI(self.algo, 'VOOV', 10) > 79:
AH(self.algo, 'UVXY', 1, 1)
elif RSI(self.algo, 'XLP', 10) > 75:
AH(self.algo, 'UVXY', 1, 1)
elif RSI(self.algo, 'TQQQ', 10) > 79:
AH(self.algo, 'UVXY', 1, 1)
elif RSI(self.algo, 'XLY', 10) > 80:
AH(self.algo, 'UVXY', 1, 1)
elif RSI(self.algo, 'FAS', 10) > 80:
AH(self.algo, 'UVXY', 1, 1)
elif RSI(self.algo, 'SPY', 10) > 80:
AH(self.algo, 'UVXY', 1, 1)
else:
self.group_single_popped_KMLM()
def group_single_popped_KMLM(self):
if RSI(self.algo, 'UVXY', 21) > 65:
self.group_BSC()
else:
self.group_combined_pop_bot()
def group_BSC(self):
if RSI(self.algo, 'SPY', 21) > 30:
AH(self.algo, 'VIXM', 1, 1)
else:
AH(self.algo, 'SPXL', 1, 1)
def group_combined_pop_bot(self):
if RSI(self.algo, 'TQQQ', 10) < 30:
AH(self.algo, 'TECL', 1, 1)
elif RSI(self.algo, 'SOXL', 10) < 30:
AH(self.algo, 'SOXL', 1, 1)
elif RSI(self.algo, 'SPXL', 10) < 30:
AH(self.algo, 'SPXL', 1, 1)
else:
self.group_copypasta_YOLO_gainzs()
def group_copypasta_YOLO_gainzs(self):
self.group_KMLM_switcher_TECL_SVIX()
def group_KMLM_switcher_TECL_SVIX(self):
if RSI(self.algo, 'XLK', 10) > RSI(self.algo, 'KMLM', 10):
Sort(self.algo, 'RSI', ('TECL', 'SOXL', 'SVIX'), 10, False, 1, 1, 1)
else:
Sort(self.algo, 'RSI', ('SQQQ', 'TLT'), 10, True, 1, 1, 1)#indicators.py
from AlgorithmImports import *
from AlgorithmImports import *
import math
import pandas as pd
from cmath import sqrt
from clr import AddReference
AddReference("System")
AddReference("QuantConnect.Algorithm")
AddReference("QuantConnect.Common")
from System import *
from QuantConnect import *
from QuantConnect.Algorithm import *
from QuantConnect.Data.Custom import *
from QuantConnect.Python import PythonData
import csv
import io
import time
import json
import random
import re
def RSI(self, equity, period):
extension = min(period * 5, 250)
r_w = RollingWindow[float](extension)
history = self.History(equity, extension - 1, Resolution.Daily)
for historical_bar in history:
r_w.Add(historical_bar.Close)
while r_w.Count < extension:
current_price = self.Securities[equity].Price
r_w.Add(current_price)
if r_w.IsReady:
average_gain = 0
average_loss = 0
gain = 0
loss = 0
for i in range(extension - 1, extension - period - 1, -1):
if i - 1 < 0:
diff = random.randint(0, 99)
else:
diff = r_w[i - 1] - r_w[i]
gain += max(diff, 0)
loss += abs(min(diff, 0))
average_gain = gain / period
average_loss = loss / period
for i in range(extension - period - 1, 0, -1):
if i - 1 < 0:
diff = random.randint(0, 99)
else:
diff = r_w[i - 1] - r_w[i]
average_gain = (average_gain * (period - 1) + max(diff, 0)) / period
average_loss = (average_loss * (period - 1) + abs(min(diff, 0))) / period
if average_loss == 0:
return 100
else:
rsi = 100 - (100 / (1 + average_gain / average_loss))
return rsi
else:
return None
def CumReturn(self,equity,period):
history = self.History(equity,period,Resolution.Daily)
closing_prices = pd.Series([bar.Close for bar in history])
current_price = self.Securities[equity].Price
closing_prices = pd.concat([closing_prices, pd.Series([current_price])])
first_price = closing_prices.iloc[0]
if first_price == 0:
return 0
else:
return_val = (current_price/first_price) - 1
return return_val
def STD(self, equity, period):
r_w = RollingWindow[float](period + 1)
r_w_return = RollingWindow[float](period)
history = self.History(equity, period, Resolution.Daily)
for historical_bar in history:
r_w.Add(historical_bar.Close)
while r_w.Count < period + 1:
current_price = self.Securities[equity].Price
r_w.Add(current_price)
for i in range(period, 0, -1):
if r_w[i] != 0:
daily_return = (r_w[i-1] / r_w[i] - 1)
else:
daily_return = 0 # or any other appropriate value
r_w_return.Add(daily_return)
dfstd = pd.DataFrame({'r_w_return': r_w_return})
if r_w.IsReady:
std = dfstd['r_w_return'].std()
if std == 0:
return 0
else:
return std
else:
return 0
def MaxDD(self, equity, period):
history = self.History(equity,period - 1,Resolution.Daily)
closing_prices = pd.Series([bar.Close for bar in history])
current_price = self.Securities[equity].Price
closing_prices = pd.concat([closing_prices, pd.Series([current_price])])
rolling_max = closing_prices.cummax()
drawdowns = (rolling_max - closing_prices)/rolling_max
max_dd = drawdowns.min()
return max_dd
def SMA(self,equity,period):
r_w = RollingWindow[float](period)
history = self.History(equity,period - 1,Resolution.Daily)
for historical_bar in history:
r_w.Add(historical_bar.Close)
while r_w.Count < period:
current_price = self.Securities[equity].Price
r_w.Add(current_price)
if r_w.IsReady:
sma = sum(r_w)/period
return sma
else:
return 0
def IV(self,equity,period):
r_w = RollingWindow[float](period + 1)
r_w_return = RollingWindow[float](period)
history = self.History(equity,period,Resolution.Daily)
for historical_bar in history:
r_w.Add(historical_bar.Close)
while r_w.Count < period + 1:
current_price = self.Securities[equity].Price
r_w.Add(current_price)
for i in range (period,0,-1):
if r_w[i] == 0:
return 0
else:
daily_return = (r_w[i-1]/r_w[i] - 1)
r_w_return.Add(daily_return)
dfinverse = pd.DataFrame({'r_w_return':r_w_return})
if r_w.IsReady:
std = dfinverse['r_w_return'].std()
if std == 0:
return 0
else:
inv_vol = 1/std
return inv_vol
else:
return 0
def SMADayRet(self, equity, period):
r_w = RollingWindow[float](period + 1)
r_w_return = RollingWindow[float](period)
history = self.History(equity, period, Resolution.Daily)
for historical_bar in history:
r_w.Add(historical_bar.Close)
while r_w.Count < period + 1:
current_price = self.Securities[equity].Price
r_w.Add(current_price)
for i in range(period, 0, -1):
if r_w[i] == 0:
return 0 # Return 0 instead of None
daily_return = (r_w[i-1] / r_w[i] - 1)
r_w_return.Add(daily_return)
if r_w.IsReady:
smareturn = sum(r_w_return) / period
return smareturn
else:
return 0
def EMA(self,equity,period):
extension = period + 50
r_w = RollingWindow[float](extension)
history = self.History(equity,extension - 1,Resolution.Daily)
for historical_bar in history:
r_w.Add(historical_bar.Close)
while r_w.Count < extension:
current_price = self.Securities[equity].Price
r_w.Add(current_price)
if r_w.IsReady:
total_price = 0
for i in range(extension - 1,extension - period - 2,-1):
total_price += r_w[i]
average_price = total_price/period
for i in range(extension - period - 2,-1,-1):
average_price = r_w[i]*2/(period+1) + average_price*(1-2/(period+1))
return average_price
else:
return None
def Sort(self, sort_type, equities, period, reverse, num_assets, number, multiplier):
# Update the PT value for the given strategy number
PT = getattr(self, f"PT{number}") * multiplier
# Define a dictionary to map sort types to functions
indicator_functions = {
'EMA': EMA,
'RSI': RSI,
'CumReturn': CumReturn,
'STD': STD,
'MaxDD': MaxDD,
'SMA': SMA,
'IV': IV,
'SMADayRet': SMADayRet,
# Add other indicators here as needed
}
# Compute the indicator value for each equity
returns = {}
for equity in equities:
indicator_function = indicator_functions.get(sort_type, None)
if callable(indicator_function):
indicator_value = indicator_function(self, equity, period)
if indicator_value is not None:
returns[equity] = indicator_value
# Sort the equities based on the indicator values
sorted_equities = sorted(returns.items(), key=lambda x: x[1], reverse=reverse)
# Select the top 'num_assets' from the sorted list
top_equities = sorted_equities[:num_assets]
# Get the current HT and HTS attributes for the strategy
HT = getattr(self, f"HT{number}")
HTS = getattr(self, f"HTS{number}")
# Assign each of the top equities to the next available slot in HT and HTS
for equity, value in top_equities:
for i in HT.keys():
if HT[i] == 0:
HT[i] = PT / num_assets # Distribute PT evenly among the assets
HTS[i].append(equity) # Append the asset symbol
break # Move to the next asset after appending
# Update the HT and HTS attributes for the strategy
setattr(self, f"HT{number}", HT)
setattr(self, f"HTS{number}", HTS)
def AH(self,equities,PTnumber,multiplier): #AppendHolding
if not isinstance(equities,list):
equities = [equities]
HT = getattr(self,f"HT{PTnumber}")
HTS = getattr(self,f"HTS{PTnumber}")
PT = getattr(self,f"PT{PTnumber}") * multiplier
for equity in equities:
for i in HT.keys():
if HT[i] == 0:
HT[i] = PT
HTS[i].append(equity)
break
def GetCurrentPrice(self, symbol):
"""
Gets the current price of a security.
:param self: The self instance containing the securities.
:param symbol: The symbol of the security.
:return: The current price of the security or None if not available.
"""
if symbol in self.Securities:
return self.Securities[symbol].Price
else:
self.Debug(f"Symbol {symbol} not found in securities.")
return None
def AHIV(self, assets, period, PTnumber, multiplier):
if isinstance(assets, str):
assets = [assets]
# Calculate sum_IV_assets
tickers_IV = []
for ticker in assets:
try:
IV_value = eval(f"IV(self, '{ticker}', {period})")
tickers_IV.append(IV_value)
except:
self.Log(f"Error calculating IV for ticker: {ticker}")
continue
sum_IV_assets_value = sum(tickers_IV)
HT = getattr(self, f"HT{PTnumber}")
HTS = getattr(self, f"HTS{PTnumber}")
PT = getattr(self, f"PT{PTnumber}") * multiplier
# Call the AH function for each asset with the calculated portion
for ticker in assets:
try:
IV_value = eval(f"IV(self, '{ticker}', {period})")
portion = IV_value / sum_IV_assets_value
except:
self.Log(f"Error calculating portion for ticker: {ticker}")
continue
for i in HT.keys():
if HT[i] == 0:
HT[i] = portion * PT
HTS[i].append(ticker)
break
def GroupSort(self, filter_type, group_methods, window, select_type, num_assets, number, multiplier):
try:
# Define a dictionary to map filter types to their corresponding functions
indicator_functions = {
'EMA': EMA,
'RSI': RSI,
'CumReturn': CumReturn,
'STD': STD,
'MaxDD': MaxDD,
'SMA': SMA,
'IV': IV,
'SMADayRet': SMADayRet,
}
# Ensure the filter type is supported
if filter_type not in indicator_functions:
raise ValueError(f"Unsupported filter type: {filter_type}")
# Calculate the weighted indicator values for each group
group_indicator_values = {}
for group_method in group_methods:
if group_method is None:
continue # Skip None values in group_methods
# Execute the group method to get the equities and weights
group_method_func = getattr(self, group_method)
result = group_method_func()
if result is not None and isinstance(result, tuple) and len(result) == 2:
equities, weights = result
# Calculate the weighted indicator value for the group
weighted_indicator_value = 0
for equity, weight in zip(equities, weights):
indicator_value = indicator_functions[filter_type](self, equity, window)
if indicator_value is not None:
weighted_indicator_value += weight * indicator_value
group_indicator_values[group_method] = weighted_indicator_value
# Determine sorting direction
reverse = True if select_type == 'Top' else False
# Sort groups based on the weighted indicator values
sorted_groups = sorted(group_indicator_values.items(), key=lambda x: x[1], reverse=reverse)[:num_assets]
# Get the current HT and HTS attributes for the strategy
HT = getattr(self, f"HT{number}")
HTS = getattr(self, f"HTS{number}")
# Adjust the multiplier based on the number of assets
adjusted_multiplier = multiplier / num_assets
# Update HT and HTS based on the sorted groups
total_weight = sum(weight for _, weight in sorted_groups)
allocated_tickers = set() # Track allocated tickers
total_allocation = 0 # Track total allocation
for group_method, _ in sorted_groups:
group_method_func = getattr(self, group_method)
equities, weights = group_method_func()
for equity, weight in zip(equities, weights):
if equity not in allocated_tickers: # Check if the ticker is already allocated
for i in HT.keys():
if HT[i] == 0:
allocation = (weight / total_weight) * adjusted_multiplier
total_allocation += allocation
if total_allocation <= 1.0: # Check if total allocation exceeds 100%
HT[i] = allocation
HTS[i].append(equity)
allocated_tickers.add(equity) # Mark the ticker as allocated
break
# Scale down the allocations if the total allocation exceeds 100%
if total_allocation > 1.0:
scale_factor = 1.0 / total_allocation
for i in HT.keys():
HT[i] *= scale_factor
# Update the HT and HTS attributes for the strategy
setattr(self, f"HT{number}", HT)
setattr(self, f"HTS{number}", HTS)
except:
# If an error occurs, capture the group methods using regex and execute them with equal allocation
group_method_pattern = re.compile(r'self\.(\w+)\(\)')
group_methods_str = str(group_methods)
captured_group_methods = group_method_pattern.findall(group_methods_str)
num_groups = len(captured_group_methods)
equal_allocation = 1.0 / num_groups
for group_method in captured_group_methods:
group_method_func = getattr(self, group_method)
if callable(group_method_func):
equities, weights = group_method_func()
total_weight = sum(weights)
for equity, weight in zip(equities, weights):
for i in HT.keys():
if HT[i] == 0:
allocation = (weight / total_weight) * equal_allocation
HT[i] = allocation
HTS[i].append(equity)
break
# Update the HT and HTS attributes for the strategy
setattr(self, f"HT{number}", HT)
setattr(self, f"HTS{number}", HTS)# main.py
from AlgorithmImports import *
import math
import pandas as pd
from cmath import sqrt
from clr import AddReference
AddReference("System")
AddReference("QuantConnect.Algorithm")
AddReference("QuantConnect.Common")
from System import *
from QuantConnect import *
from QuantConnect.Algorithm import *
from QuantConnect.Data.Custom import *
from QuantConnect.Python import PythonData
import csv
import io
import time
import json
import os
import importlib
import inspect
import re
import numpy as np
import types
from indicators import *
class DogeStrat(QCAlgorithm):
def Initialize(self):
self.cash = 100000
self.SetStartDate(2023,4,1)
self.SetEndDate(2025,5,7)
self.SetCash(self.cash)
self.equities = ['KMLM','QQQE','SOXL','SPXL','SQQQ','SVIX','TECL','TLT','VOOG','VOOV','VOX','VTV','XLK','XLP','XLY','AGG','BND','BTAL','DBC','DGRO','EMA','ERX','FAS','QQQE','RSI','SCHD','SOXL','SPXL','SPY','SVXY','TBX','TECL','TECS','TLT','TMF','TMV','TQQQ','UGL','USDU','UVXY','VIXM','VIXY','VOOG','VOOV','VOX','VTV','XLK','XLP','XLU','XLY']
for equity in self.equities:
self.AddEquity(equity,Resolution.Minute)
self.Securities[equity].SetDataNormalizationMode(DataNormalizationMode.Adjusted)
self.AddEquity('BIL',Resolution.Minute)
self.Securities['BIL'].SetDataNormalizationMode(DataNormalizationMode.TotalReturn)
self.exclude_symbols = ['BIL']
# Base directory for strategies
strategy_base_dir = "Strategies"
# List of strategy names (you would populate this list dynamically)
self.strategy_names = [
"CombinedStrategy1",
]
# Get the absolute path of the current file (main.py)
current_file_path = os.path.abspath(__file__)
# Get the directory of the current file
current_directory = os.path.dirname(current_file_path)
# Construct the path to the Strategies folder
strategies_directory = os.path.join(current_directory, "Strategies")
# Dynamically import strategies and create instances
self.strategies = {}
# Generate setattr(self, f'HT...') from 1 to 1000
for i in range(1, 500):
setattr(self, f'HT{i}', {str(j).zfill(2): 0 for j in range(1, 10)})
setattr(self, f'HTS{i}', {str(j).zfill(2): [] for j in range(1, 10)})
for strategy_name in self.strategy_names:
strategy_module_name = f"Strategies.{strategy_name}.version1"
strategy_module = importlib.import_module(strategy_module_name)
# Get the source code of the strategy module
strategy_module_source = inspect.getsource(strategy_module)
# Find all occurrences of "class ...Strategy" in the source code
strategy_class_names = re.findall(r'class\s+(\w+Strategy)', strategy_module_source)
for strategy_class_name in strategy_class_names:
# Get the strategy class from the module
strategy_class = getattr(strategy_module, strategy_class_name)
# Extract the assigned number from the strategy class name if available
assigned_numbers = re.findall(r'\d+', strategy_class_name)
if assigned_numbers:
assigned_number = int(assigned_numbers[0])
else:
assigned_number = None
# Create an instance of the strategy class, passing 'self' as the 'context' argument
strategy_instance = strategy_class(self)
# Store the strategy instance in the dictionary
self.strategies[strategy_class_name] = strategy_instance
self.PTMaster = 1
self.filter_value = 0.02
self.buffer_pct = 0.05 #Rebalance
self.number_of_PT = len(self.strategies) + 10
# Fixed strategies and their percentages
self.fixed_strategies = {
1: 0.94, # Strategy 1 with 94%
}
# Initialize PT variables for each strategy
for i in range(1, self.number_of_PT + 1):
if i in self.fixed_strategies:
setattr(self, f'PT{i}', self.fixed_strategies[i] * self.PTMaster)
# Assign self.algo to self
self.algo = self
# Dictionary to store group definitions
self.group_definitions = {}
# Iterate over the strategies
for strategy_name, strategy in self.strategies.items():
# Get the source code of the strategy module
strategy_module_source = inspect.getsource(strategy.__class__)
# Find all occurrences of "GroupSort(" and "Sort(" in the source code
group_sort_calls = re.findall(r"GroupSort\(.*?\)", strategy_module_source)
# Extract the group names from the GroupSort and Sort calls and add them to the dictionary
for call in group_sort_calls:
group_names = re.findall(r"'(.*?)'", call)
for group_name in group_names:
if group_name not in self.group_definitions:
# Add the group name to the dictionary with a default definition
self.group_definitions[group_name] = f"""
def {group_name}(self):
# Define the logic for the {group_name} method
# Return a tuple of equities and weights
equities = []
weights = []
return equities, weights
""".lstrip() # Remove leading whitespace
# Dynamically generate the group methods in the DogeStrat instance
for group_name, group_def in self.group_definitions.items():
exec(group_def, globals(), locals())
setattr(self, group_name, types.MethodType(locals()[group_name], self))
self.Schedule.On(self.DateRules.EveryDay("SPY"),
self.TimeRules.BeforeMarketClose("SPY",5),
self.FunctionBeforeMarketClose)
def OnData(self, data):
# This function is called every time new data is received
pass
def FunctionBeforeMarketClose(self):
# Execute all strategies stored in the dictionary
for strategy_name, strategy in self.strategies.items():
strategy.Execute()
self.LogStrategies()
self.ExecuteTrade()
self.SetVarToZero()
def ExecuteTrade(self):
df_list = []
# Process each top-performing strategy
for strategy_number in self.FinalTickers:
HTS_attr = getattr(self, f'HTS{strategy_number}')
HT_attr = getattr(self, f'HT{strategy_number}')
group = {
'HTS': [HTS_attr[i][0] if len(HTS_attr[i]) == 1 else HTS_attr[i] for i in HTS_attr],
'HT': [HT_attr[i]/1 for i in HT_attr]
}
df = pd.DataFrame(group)
df_list.append(df)
# Combine all dataframes
df_combined = pd.concat(df_list)
df_combined['HTS'] = df_combined['HTS'].astype(str)
result = df_combined.groupby(['HTS']).sum().reset_index()
# Dictionary with pairs
pairs_dict = {'TECL':'TECS', 'TMF':'TMV'}
pairs_dict.update({v: k for k,v in pairs_dict.items()}) #ensure both directions are covered
# Track selling and buying
processed_pairs_selling = set()
processed_pairs_buying = set()
liquidated_equities = set()
# Exclude symbols
# dictionary
symbol_dict = dict(zip(result.iloc[:,0],result.iloc[:,1]))
# Log output
output = "*****"
for symbol, percentage in symbol_dict.items():
output += "{}: {}% - ".format(symbol, round(percentage*100, 2))
output = output.rstrip(" - ")
self.Log(output)
# Symbols to be transformed
transform_symbols = ['SPY']
transform_mapping = {'SPY':'SPXL'}
transform_ratios = {'SPY':3}
# Transform symbols
for symbol in transform_symbols:
if symbol in symbol_dict:
new_symbol = transform_mapping[symbol]
ratio = transform_ratios[symbol]
new_percentage = symbol_dict[symbol]/ratio
# Adjust percentage allocation
if new_symbol in symbol_dict:
new_percentage += symbol_dict[new_symbol]
symbol_dict[new_symbol] = new_percentage
# Remove transformed
symbol_dict.pop(symbol, None)
# Ensure updated equities list
updated_equities = set(symbol_dict.keys())
# Liquidate equities
for equity in self.equities:
if equity not in updated_equities and self.Portfolio[equity].HoldStock and equity not in liquidated_equities:
self.Liquidate(equity)
liquidated_equities.add(equity)
# Iterate pairs selling
for symbol1,symbol2 in pairs_dict.items():
if symbol1 in symbol_dict and symbol2 in symbol_dict:
offset_value = abs(symbol_dict[symbol1] - symbol_dict[symbol2])
if symbol_dict[symbol1] >= symbol_dict[symbol2] and self.Portfolio[symbol2].HoldStock:
self.Liquidate(symbol2)
elif symbol_dict[symbol1] <= symbol_dict[symbol2] and self.Portfolio[symbol1].HoldStock:
self.Liquidate(symbol1)
# Mark processed selling
processed_pairs_selling.add(symbol1)
processed_pairs_selling.add(symbol2)
# Iterate remaining selling
for symbol,value in symbol_dict.items():
if symbol not in processed_pairs_selling and not value == 0 and symbol not in self.exclude_symbols:
if isinstance(symbol, str) and symbol.startswith("['") and symbol.endswith("']"):
symbol_list = eval(symbol)
percentage_equity = sum(self.Portfolio[s].HoldingsValue for s in symbol_list) / self.Portfolio.TotalPortfolioValue
else:
percentage_equity = self.Portfolio[symbol].HoldingsValue/self.Portfolio.TotalPortfolioValue
if value < percentage_equity and abs(value/percentage_equity - 1) > self.buffer_pct:
if isinstance(symbol, str) and symbol.startswith("['") and symbol.endswith("']"):
symbol_list = eval(symbol)
for s in symbol_list:
self.SetHoldings(s, value / len(symbol_list))
else:
self.SetHoldings(symbol,value)
# Iterate pairs buying
for symbol1,symbol2 in pairs_dict.items():
if symbol1 in symbol_dict and symbol2 in symbol_dict and symbol1 not in processed_pairs_buying and symbol2 not in processed_pairs_buying:
offset_value = abs(symbol_dict[symbol1] - symbol_dict[symbol2])
if offset_value > self.filter_value:
if symbol_dict[symbol1] > symbol_dict[symbol2]:
if isinstance(symbol1, list):
for s in symbol1:
self.SetHoldings(s, offset_value / len(symbol1))
else:
self.SetHoldings(symbol1,offset_value)
else:
if isinstance(symbol2, list):
for s in symbol2:
self.SetHoldings(s, offset_value / len(symbol2))
else:
self.SetHoldings(symbol2,offset_value)
else:
if isinstance(symbol1, list):
for s in symbol1:
if self.Portfolio[s].HoldStock:
self.Liquidate(s)
else:
if self.Portfolio[symbol1].HoldStock:
self.Liquidate(symbol1)
if isinstance(symbol2, list):
for s in symbol2:
if self.Portfolio[s].HoldStock:
self.Liquidate(s)
else:
if self.Portfolio[symbol2].HoldStock:
self.Liquidate(symbol2)
# Mark as processed buying
processed_pairs_buying.add(symbol1)
processed_pairs_buying.add(symbol2)
# Filter less than 1%
updated_equities = {symbol for symbol, value in symbol_dict.items() if value >= self.filter_value}
# Iterate remaining symbol_dict for buying
for symbol,value in symbol_dict.items():
if (symbol in updated_equities and
symbol not in processed_pairs_buying and
symbol not in self.exclude_symbols):
if isinstance(symbol, str) and symbol.startswith("['") and symbol.endswith("']"):
symbol_list = eval(symbol)
percentage_equity = sum(self.Portfolio[s].HoldingsValue for s in symbol_list) / self.Portfolio.TotalPortfolioValue
else:
percentage_equity = (self.Portfolio[symbol].HoldingsValue /
self.Portfolio.TotalPortfolioValue)
if value > percentage_equity and abs(percentage_equity/value - 1) > self.buffer_pct:
if isinstance(symbol, str) and symbol.startswith("['") and symbol.endswith("']"):
symbol_list = eval(symbol)
for s in symbol_list:
self.SetHoldings(s, value / len(symbol_list))
else:
self.SetHoldings(symbol,value)
def SetVarToZero(self):
for strategy_number in range(1, self.number_of_PT + 1):
setattr(self, f'HT{strategy_number}', {str(j).zfill(2): 0 for j in range(1, 10)})
setattr(self, f'HTS{strategy_number}', {str(j).zfill(2): [] for j in range(1, 10)})
def LogStrategies(self):
self.FinalTickers = list(self.fixed_strategies.keys())
# Create a set of current strategies
current_strategies = set(self.FinalTickers)
self.Log("Strategies, Percentages and Holdings")
for index, strategy_number in enumerate(self.FinalTickers, start=1):
if strategy_number in self.fixed_strategies:
percentage = self.fixed_strategies[strategy_number] * 100
strategy_name, holdings_info = self.get_strategy_name_and_holdings(strategy_number)
self.Log(f"Strategy {index}: {strategy_name}, Percentage: {round(percentage, 2)}%, Holdings: {holdings_info}")
self.Log(self.FinalTickers)
self.Log("-" * 50)
def get_strategy_name_and_holdings(self, strategy_number):
for strategy_name, strategy_instance in self.strategies.items():
strategy_module_source = inspect.getsource(strategy_instance.__class__)
strategy_numbers = re.findall(r'AH\(self(?:\.algo)?,\s*[\'\w\(\)\s,]+,\s*(\d+)\s*,', strategy_module_source)
strategy_numbers.extend(re.findall(r'Sort\(self(?:\.algo)?,\s*[\'\w]+,\s*[\'\w\(\)\s,]+,\s*\d+\s*,\s*\w+\s*,\s*\d+\s*,\s*(\d+)', strategy_module_source))
strategy_numbers.extend(re.findall(r'AHIV\(self(?:\.algo)?,\s*[\'\w\(\)\s,]+,\s*\d+\s*,\s*(\d+)\s*,\s*[\d\.]+\)', strategy_module_source))
strategy_numbers.extend(re.findall(r'GroupSort\(self(?:\.algo)?,\s*[\'\w]+,\s*[\'\w\(\)\s,]+,\s*\d+\s*,\s*\w+\s*,\s*\d+\s*,\s*(\d+)', strategy_module_source))
#self.Log(f"Strategy: {strategy_name}, Numbers: {strategy_numbers}") # Debugging log
if str(strategy_number) in strategy_numbers:
HT_attr = getattr(self, f'HT{strategy_number}')
HTS_attr = getattr(self, f'HTS{strategy_number}')
holdings_info = ', '.join([f"{ticker}: {round(HT_attr[key] * 100, 2)}%"
for key, tickers in HTS_attr.items()
if tickers
for ticker in (tickers if isinstance(tickers, list) else [tickers])])
return type(strategy_instance).__name__, holdings_info
return "", ""