# ====================================================================================================
# V9.1.1 - 分离式回撤管理器 (drawdown_manager.py)
#
# Decoupled Drawdown Manager
#
# V9.1.1 Bug修复 (2025-09-21):
# 1. 【关键修复】修复了因遗漏属性初始化导致的 AttributeError。
# 2. 在 __init__ 中恢复了 global_monitor_symbols, global_monitor_groups 等属性的初始化。
# 3. 重新整合了分组监控功能到 update_daily_global_status 函数中。
#
# V9.1.0 核心重构 (保留):
# 1. 将`on_data_update`拆分为 update_daily_global_status() 和 update_intraday_asset_status()。
# ====================================================================================================
# region imports
from AlgorithmImports import *
# endregion
from datetime import datetime, timedelta
from collections import deque
class DrawdownManager:
"""
分离式回撤管理器 V9.1.1 (Bug修复版)
将全局(低频)和个股(高频)的更新逻辑分离,以实现更灵活的风险控制策略。
"""
# 模式常量定义
MODE_DISABLED = 0
MODE_ASSET_ONLY = 1
MODE_GLOBAL_ASSET = 2
MODE_FULL_LAYERS = 3
def __init__(self, algorithm, config):
self.algo = algorithm
self.mode = config.get('drawdown_control_mode', 0)
self.enable_logging = config.get('enable_drawdown_logging', False)
# === 【V9.1.1 修复】恢复所有缺失的属性初始化 ===
self.global_monitor_symbols = config.get('global_monitor_symbols', None)
self.global_monitor_groups = config.get('global_monitor_groups', None)
self.global_group_thresholds = config.get('global_group_thresholds', None)
self.global_use_realized_pnl = config.get('global_use_realized_pnl', True)
# ===============================================
self.global_dd_threshold = config.get('global_dd_threshold', 200)
self.time_window_days = config.get('dd_time_window', 10)
self.loss_day_ratio = config.get('loss_day_ratio', 0.8)
self.asset_dd_normal = config.get('asset_dd_normal', 60)
self.asset_dd_warning = config.get('asset_dd_warning', 30)
# 状态变量
self.portfolio_high_water_mark = 0
self.global_realized_pnl = 0
self.global_pnl_peak = 0
self.group_realized_pnl = {}
self.group_pnl_peaks = {}
self.asset_pnl_peaks = {}
self.asset_cumulative_pnl = {}
self.daily_pnl_history = deque(maxlen=31)
self.global_warning_active = False
self.asset_stopped = {}
self.previous_day_value = None
self.last_log_date = None
def initialize_portfolio_value(self, initial_value):
if self.mode == self.MODE_DISABLED: return
self.portfolio_high_water_mark = initial_value
self.previous_day_value = initial_value
def update_daily_global_status(self, current_portfolio_value):
"""
【V9.1.1】低频全局状态更新函数,应在每日定时任务中调用。
"""
if self.mode < self.MODE_GLOBAL_ASSET: return
# --- 第一层:全局监控 ---
global_drawdown = self._calculate_global_drawdown(current_portfolio_value)
layer1_triggered = global_drawdown >= self.global_dd_threshold
# --- 【V9.1.1 修复】重新整合分组监控 ---
if self.global_use_realized_pnl and self.global_monitor_groups:
group_warnings = self._check_group_drawdowns()
layer1_triggered = layer1_triggered or any(group_warnings.values())
# ====================================
# --- 第二层:时间确认 ---
if self.mode == self.MODE_FULL_LAYERS and layer1_triggered:
layer2_triggered = self._check_time_confirmation()
self.global_warning_active = layer1_triggered and layer2_triggered
elif self.mode == self.MODE_GLOBAL_ASSET:
self.global_warning_active = layer1_triggered
else:
self.global_warning_active = False
def update_intraday_asset_status(self):
"""
高频个股状态更新函数,应在 OnData 中调用。
"""
if self.mode == self.MODE_DISABLED: return
for symbol in self.algo.Securities.Keys:
holdings = self.algo.Securities[symbol].Holdings
cumulative_pnl = holdings.TotalCloseProfit()
self._check_and_update_asset_status(symbol, cumulative_pnl)
def _calculate_global_drawdown(self, current_portfolio_value):
if self.global_use_realized_pnl:
return self._calculate_realized_pnl_drawdown()
else:
return self._update_equity_drawdown(current_portfolio_value)
def _update_equity_drawdown(self, portfolio_value):
self.portfolio_high_water_mark = max(self.portfolio_high_water_mark, portfolio_value)
return self.portfolio_high_water_mark - portfolio_value
def _calculate_realized_pnl_drawdown(self):
all_traded_symbols = self.algo.Securities.Keys
symbols_to_monitor = self._get_symbols_to_monitor(all_traded_symbols)
total_realized_pnl = 0
for symbol in symbols_to_monitor:
if symbol in self.algo.Securities:
security_info = self.algo.Securities[symbol]
total_realized_pnl += security_info.Holdings.TotalCloseProfit()
self.global_realized_pnl = total_realized_pnl
self.global_pnl_peak = max(self.global_pnl_peak, total_realized_pnl)
return self.global_pnl_peak - self.global_realized_pnl
def _get_symbols_to_monitor(self, all_symbols):
if self.global_monitor_symbols is None: return all_symbols
monitored, symbol_str_set = [], {s.upper() for s in self.global_monitor_symbols}
for symbol in all_symbols:
if symbol.ID.Symbol.upper() in symbol_str_set: monitored.append(symbol)
if len(monitored) == 0 and self.enable_logging and not hasattr(self, '_monitor_warning_shown'):
self.algo.Debug(f"[Warning] No symbols matched for monitoring: {self.global_monitor_symbols}")
self._monitor_warning_shown = True
return monitored
def _check_group_drawdowns(self):
"""【V9.1.1 修复】恢复分组监控功能"""
if not self.global_monitor_groups or not self.global_group_thresholds: return {}
group_warnings, all_securities = {}, self.algo.Securities
for group_name, symbols_list in self.global_monitor_groups.items():
group_pnl, group_symbol_set = 0, {s.upper() for s in symbols_list}
for symbol in all_securities.Keys:
if symbol.ID.Symbol.upper() in group_symbol_set:
group_pnl += all_securities[symbol].Holdings.TotalCloseProfit()
if group_name not in self.group_pnl_peaks:
self.group_pnl_peaks[group_name] = max(0, group_pnl)
else:
self.group_pnl_peaks[group_name] = max(self.group_pnl_peaks[group_name], group_pnl)
self.group_realized_pnl[group_name] = group_pnl
group_drawdown = self.group_pnl_peaks[group_name] - group_pnl
threshold = self.global_group_thresholds.get(group_name, self.global_dd_threshold)
group_warnings[group_name] = group_drawdown >= threshold
if self.enable_logging and group_warnings[group_name]:
self.algo.Debug(f"[Group Warning] {group_name}: Drawdown ${group_drawdown:.2f} >= ${threshold}")
return group_warnings
def update_asset_metrics(self, symbol, cumulative_pnl):
if self.mode == self.MODE_DISABLED: return 0
if symbol not in self.asset_pnl_peaks:
self.asset_pnl_peaks[symbol] = max(0, cumulative_pnl)
else:
self.asset_pnl_peaks[symbol] = max(self.asset_pnl_peaks[symbol], cumulative_pnl)
self.asset_cumulative_pnl[symbol] = cumulative_pnl
return self.asset_pnl_peaks[symbol] - self.asset_cumulative_pnl[symbol]
def _check_time_confirmation(self):
available_days = len(self.daily_pnl_history)
if available_days < 3: return False
effective_window = min(available_days, self.time_window_days)
recent_days = list(self.daily_pnl_history)[-effective_window:]
loss_days = sum(1 for pnl in recent_days if pnl < 0)
required_loss_days = int(effective_window * self.loss_day_ratio)
return loss_days >= required_loss_days
def _check_and_update_asset_status(self, symbol, cumulative_pnl):
drawdown = self.update_asset_metrics(symbol, cumulative_pnl)
threshold = self.asset_dd_warning if self.mode >= self.MODE_GLOBAL_ASSET and self.global_warning_active else self.asset_dd_normal
if drawdown >= threshold and not self.asset_stopped.get(symbol, False):
self.asset_stopped[symbol] = True
def can_trade(self, symbol):
if self.mode == self.MODE_DISABLED: return True
return not self.asset_stopped.get(symbol, False)
def update_daily_pnl_history(self, current_portfolio_value):
if self.mode != self.MODE_FULL_LAYERS: return
if self.global_use_realized_pnl:
self._calculate_realized_pnl_drawdown()
if hasattr(self, '_previous_day_pnl'):
daily_pnl = self.global_realized_pnl - self._previous_day_pnl
self.daily_pnl_history.append(daily_pnl)
self._previous_day_pnl = self.global_realized_pnl
else:
if self.previous_day_value is not None:
daily_pnl = current_portfolio_value - self.previous_day_value
self.daily_pnl_history.append(daily_pnl)
self.previous_day_value = current_portfolio_value
def log_daily_status(self):
if not self.enable_logging: return
current_date = self.algo.Time.date()
if self.last_log_date == current_date: return
self.last_log_date = current_date
if self.mode >= self.MODE_GLOBAL_ASSET:
self.algo.Log(f"[DrawdownManager V9.1.1] Daily Report - {current_date}")
self.algo.Log(f" Mode: {self.mode}")
if self.global_use_realized_pnl:
drawdown = self.global_pnl_peak - self.global_realized_pnl
self.algo.Log(f" Global Monitor: Realized PnL (Daily)")
self.algo.Log(f" Global Realized PnL: ${self.global_realized_pnl:.2f}")
self.algo.Log(f" PnL Peak: ${self.global_pnl_peak:.2f}")
self.algo.Log(f" Global Drawdown: ${drawdown:.2f}")
else:
drawdown = self.portfolio_high_water_mark - self.algo.Portfolio.TotalPortfolioValue
self.algo.Log(f" Global Monitor: Total Equity (Daily) - Official Logic")
self.algo.Log(f" Portfolio Equity: ${self.algo.Portfolio.TotalPortfolioValue:.2f}")
self.algo.Log(f" Equity Peak: ${self.portfolio_high_water_mark:.2f}")
self.algo.Log(f" Equity Drawdown: ${drawdown:.2f}")
stopped_symbols = [str(s) for s, stopped in self.asset_stopped.items() if stopped]
if stopped_symbols:
self.algo.Log(f" Stopped assets ({len(stopped_symbols)}): {', '.join(stopped_symbols[:5])}")
if len(stopped_symbols) > 5: self.algo.Log(f" ... and {len(stopped_symbols) - 5} more")
def get_status_summary(self):
if self.mode == self.MODE_DISABLED: return {'mode': 'DISABLED'}
if self.global_use_realized_pnl:
global_drawdown = self.global_pnl_peak - self.global_realized_pnl
global_info = {'type': 'realized_pnl', 'current_pnl': self.global_realized_pnl, 'pnl_peak': self.global_pnl_peak, 'drawdown': global_drawdown}
else:
portfolio_drawdown = self.portfolio_high_water_mark - self.algo.Portfolio.TotalPortfolioValue
global_info = {'type': 'portfolio_value', 'current_equity': self.algo.Portfolio.TotalPortfolioValue, 'high_water_mark': self.portfolio_high_water_mark, 'drawdown': portfolio_drawdown}
monitored_symbols = 'ALL' if self.global_monitor_symbols is None else self.global_monitor_symbols
asset_details = {str(symbol): {'stopped': self.asset_stopped.get(symbol, False), 'current_pnl': self.asset_cumulative_pnl.get(symbol, 0), 'pnl_peak': self.asset_pnl_peaks.get(symbol, 0), 'drawdown': self.asset_pnl_peaks.get(symbol, 0) - self.asset_cumulative_pnl.get(symbol, 0)} for symbol in self.asset_pnl_peaks.keys()}
group_status = {group_name: {'pnl': self.group_realized_pnl.get(group_name, 0), 'peak': self.group_pnl_peaks.get(group_name, 0), 'drawdown': self.group_pnl_peaks.get(group_name, 0) - self.group_realized_pnl.get(group_name, 0)} for group_name in self.global_monitor_groups.keys()} if self.global_monitor_groups else {}
return {
'mode': self.mode, 'global_warning': self.global_warning_active,
'global_monitoring': global_info, 'monitored_symbols': monitored_symbols,
'group_status': group_status,
'stopped_assets': [str(s) for s, stopped in self.asset_stopped.items() if stopped],
'asset_details': asset_details,
}
def reset_asset_stop(self, symbol=None):
if symbol:
if symbol in self.asset_stopped:
self.asset_stopped[symbol] = False
if self.enable_logging: self.algo.Debug(f"[DrawdownManager V9.1.1] Reset stop status for {symbol}")
else:
self.asset_stopped = {}
if self.enable_logging: self.algo.Debug("[DrawdownManager V9.1.1] Reset all stop statuses")
def reset_global_peaks(self):
if self.global_use_realized_pnl:
self.global_pnl_peak = max(0, self.global_realized_pnl)
else:
self.portfolio_high_water_mark = self.algo.Portfolio.TotalPortfolioValue
if self.enable_logging: self.algo.Log("[DrawdownManager V9.1.1] Global peaks reset")from AlgorithmImports import *
from datetime import datetime, timedelta, date
import numpy as np
import pandas as pd
class FearGreedAnalyzer:
"""
Fear and Greed Index 自适应分析器
V13.1版本:修复实盘交易中的未来数据请求问题 + 严格数据延迟控制
主要修复:
1. 智能时间窗口管理,区分回测和实盘
2. 渐进式数据加载策略
3. 健壮的错误处理和fallback机制
4. 优化的数据缓存策略
5. V13.1新增:数据延迟超过阈值自动切换到default模式
- 实盘:5天
- 回测:7天
"""
def __init__(self, algorithm, config):
"""
初始化Fear and Greed分析器
Args:
algorithm: QuantConnect算法实例
config: 配置字典
"""
self.algorithm = algorithm
# Fear and Greed Index相关配置
self.enabled = config.get('enable_fear_greed_adaptive', False)
# 贪婪阈值:默认60,可通过配置调整
# 指数 < 阈值时使用default模式,>= 阈值时进行标准化回撤计算
self.greed_threshold = config.get('fear_greed_threshold', 60)
# 历史回看天数:用于计算标准化回撤的窗口期,默认20天
self.lookback_days = config.get('fear_greed_lookback_days', 20)
# 标准化回撤阈值:默认1.5倍标准差
self.normalized_drawdown_threshold = config.get('fear_greed_drawdown_threshold', 1.5)
self.debug_mode = config.get('debug_fear_greed', False)
# Fear and Greed Index数据符号
self.fg_symbol = None
# 当前选择的CHOP模式
self.current_chop_mode = config.get('chop_adaptive_mode', 'default')
# 存储历史数据
self.fear_greed_data = {}
self.last_update_date = None
# V13新增:数据加载管理
self.data_loading_attempts = 0
self.max_loading_attempts = 3
self.last_successful_load = None
def initialize(self):
"""
初始化Fear and Greed Index数据源
"""
if not self.enabled:
return
try:
# 添加Fear and Greed Index数据
self.fg_symbol = self.algorithm.AddData(FearGreedIndex, "FG").Symbol
if self.debug_mode:
mode_str = "回测" if not self.algorithm.LiveMode else "实盘"
self.algorithm.Debug(f"[FearGreed V13.1] [{mode_str}] 初始化成功,数据符号: {self.fg_symbol}")
# 注意:不在这里预加载数据,而是在warm-up完成后加载
except Exception as e:
self.algorithm.Error(f"[FearGreed V13.1] 初始化失败: {str(e)}")
self.enabled = False
def load_historical_data_segments(self):
"""
V13优化:智能分段加载历史数据
关键修复:
1. 区分回测和实盘的时间窗口策略
2. 实盘中永远不请求未来数据
3. 渐进式加载,优先保证近期数据
4. 增强错误处理和重试机制
"""
if not self.enabled or self.fg_symbol is None:
return
# 防止重复加载(除非失败后重试)
if self.data_loading_attempts >= self.max_loading_attempts:
if self.debug_mode:
self.algorithm.Debug(f"[FearGreed V13.1] 已达到最大加载尝试次数 ({self.max_loading_attempts})")
return
self.data_loading_attempts += 1
try:
current_time = self.algorithm.Time
# V13核心修复:智能时间窗口管理
if self.algorithm.LiveMode:
# 实盘模式:永远不请求未来数据
end_time = current_time
# 实盘中只需要90天的历史数据即可
start_time = current_time - timedelta(days=90)
if self.debug_mode:
self.algorithm.Debug(f"[FearGreed V13.1] 实盘模式:数据窗口 {start_time.date()} 至 {end_time.date()}")
else:
# 回测模式:使用配置的日期范围
if hasattr(self.algorithm, 'EndDate') and self.algorithm.EndDate:
end_time = self.algorithm.EndDate + timedelta(days=30)
else:
end_time = current_time + timedelta(days=30)
if hasattr(self.algorithm, 'StartDate') and self.algorithm.StartDate:
start_time = self.algorithm.StartDate - timedelta(days=30)
else:
start_time = current_time - timedelta(days=400)
if self.debug_mode:
self.algorithm.Debug(f"[FearGreed V13.1] 回测模式:数据窗口 {start_time.date()} 至 {end_time.date()}")
# V13优化:渐进式数据加载
success_count = 0
# 分段加载,优先加载近期数据
current_end = end_time
segment = 0
while current_end > start_time and segment < 20:
try:
# 每段30天
current_start = current_end - timedelta(days=30)
if current_start < start_time:
current_start = start_time
# 确保不请求未来数据
if current_end > self.algorithm.Time:
current_end = self.algorithm.Time
# 请求历史数据
history = self.algorithm.History(
self.fg_symbol,
current_start,
current_end,
Resolution.DAILY
)
if not history.empty:
self._process_history_data(history)
success_count += 1
if self.debug_mode and segment < 3: # 只记录前3段
self.algorithm.Debug(f"[FearGreed V13.1] 加载段 {segment}: {current_start.date()} 至 {current_end.date()} 成功")
# 移动到下一段
current_end = current_start
segment += 1
except Exception as e:
if self.debug_mode:
self.algorithm.Debug(f"[FearGreed V13.1] 段 {segment} 加载失败: {str(e)[:100]}")
# 继续下一段
current_end = current_end - timedelta(days=30)
segment += 1
continue
# 记录加载结果
if success_count > 0:
self.last_successful_load = current_time
if self.fear_greed_data and self.debug_mode:
self.algorithm.Debug(f"[FearGreed V13.1] 成功加载 {len(self.fear_greed_data)} 条历史数据")
self.algorithm.Debug(f"[FearGreed V13.1] 数据范围: {min(self.fear_greed_data.keys())} 至 {max(self.fear_greed_data.keys())}")
else:
if self.debug_mode:
self.algorithm.Debug(f"[FearGreed V13.1] 历史数据加载失败,将依赖实时数据")
except Exception as e:
self.algorithm.Error(f"[FearGreed V13.1] 分段加载失败: {str(e)[:200]}")
# V13:失败后的fallback策略
if self.data_loading_attempts < self.max_loading_attempts:
if self.debug_mode:
self.algorithm.Debug(f"[FearGreed V13.1] 将在下次OnData时重试加载")
def _process_history_data(self, history):
"""
V13优化:处理历史数据并存储到字典
增加数据验证和错误处理
"""
try:
for index, row in history.iterrows():
# 获取日期
if hasattr(index, '__len__') and len(index) > 1:
data_date = index[-1]
else:
data_date = index
# 转换为date对象
if hasattr(data_date, 'date'):
data_date = data_date.date()
else:
data_date = pd.to_datetime(str(data_date)).date()
# 获取指数值 - 优先使用Value属性
value = None
if hasattr(row, 'value'):
value = row['value']
elif 'value' in history.columns:
value = row['value']
elif 'qcindex' in history.columns:
value = row['qcindex']
elif 'close' in history.columns:
value = row['close']
else:
value = row.iloc[0]
# V13:数据验证
if value is not None and 0 <= float(value) <= 100:
# 存储到字典
self.fear_greed_data[data_date] = float(value)
elif self.debug_mode:
self.algorithm.Debug(f"[FearGreed V13.1] 忽略无效数据: {data_date} = {value}")
except Exception as e:
if self.debug_mode:
self.algorithm.Debug(f"[FearGreed V13.1] 数据处理失败: {str(e)}")
def update_from_data_slice(self, data):
"""
V13优化:从OnData的数据切片中更新Fear & Greed值
增加数据验证和错误处理
"""
if not self.enabled or self.fg_symbol is None:
return
try:
# 检查数据切片中是否有Fear & Greed数据
if self.fg_symbol in data and data[self.fg_symbol] is not None:
current_value = data[self.fg_symbol].Value
current_date = self.algorithm.Time.date()
# V13:数据验证
if 0 <= current_value <= 100:
# 更新缓存
self.fear_greed_data[current_date] = float(current_value)
if self.debug_mode and current_date != self.last_update_date:
self.algorithm.Debug(f"[FearGreed V13.1] 实时更新: {current_date} = {current_value:.2f}")
self.last_update_date = current_date
elif self.debug_mode:
self.algorithm.Debug(f"[FearGreed V13.1] 忽略无效实时数据: {current_value}")
except Exception as e:
if self.debug_mode:
self.algorithm.Debug(f"[FearGreed V13.1] 数据切片更新失败: {str(e)}")
def get_latest_index_value(self, current_date):
"""
V13优化:获取指定日期或之前最近的Fear & Greed指数值
增强的数据查找策略
"""
# 向前查找最多30天
for days_back in range(30):
check_date = current_date - timedelta(days=days_back)
if check_date in self.fear_greed_data:
return self.fear_greed_data[check_date], check_date, days_back
# V13:如果缓存中没有,尝试动态获取(只在数据加载失败时)
if self.data_loading_attempts < self.max_loading_attempts:
return self._fetch_latest_value_fallback(current_date)
return None
def _fetch_latest_value_fallback(self, current_date):
"""
V13优化:缓存失效时的后备获取方法
限制请求次数,避免频繁API调用
"""
try:
# V13:限制fallback请求,避免影响性能
if self.algorithm.LiveMode:
# 实盘中谨慎使用fallback
lookback_days = [7, 14]
else:
# 回测中可以更激进
lookback_days = [7, 14, 21, 30]
for lookback in lookback_days:
try:
history = self.algorithm.History(
self.fg_symbol,
lookback,
Resolution.DAILY
)
if not history.empty:
# 处理并缓存数据
self._process_history_data(history)
# 再次尝试获取
for days_back in range(lookback):
check_date = current_date - timedelta(days=days_back)
if check_date in self.fear_greed_data:
if self.debug_mode:
self.algorithm.Debug(f"[FearGreed V13.1] Fallback成功,使用{lookback}天回看")
return self.fear_greed_data[check_date], check_date, days_back
except:
continue # 静默失败,尝试下一个
except Exception as e:
if self.debug_mode:
self.algorithm.Debug(f"[FearGreed V13.1] Fallback失败: {str(e)[:100]}")
return None
def analyze_and_set_mode(self, current_time):
"""
V13优化:分析Fear and Greed Index并设置CHOP模式
增强的错误处理和实盘保护
每日UTC 00:30执行
"""
if not self.enabled:
return
try:
# 跳过预热期
if self.algorithm.IsWarmingUp:
if self.debug_mode:
self.algorithm.Debug(f"[FearGreed V13.1] {current_time} 预热期跳过")
return
current_date = current_time.date()
mode_str = "回测" if not self.algorithm.LiveMode else "实盘"
# 获取昨日的指数值(文档要求:昨日的恐慌贪婪指数)
yesterday = current_date - timedelta(days=1)
result = self.get_latest_index_value(yesterday)
# V13:无数据时的处理
if result is None:
# 无数据时保持当前模式或使用default
if self.current_chop_mode is None:
self.current_chop_mode = 'default'
if self.debug_mode:
self.algorithm.Debug(f"[FearGreed V13.1] [{mode_str}] {current_time} 无可用数据,保持{self.current_chop_mode}模式")
return
current_index, data_date, days_old = result
# 注意:days_old现在是相对于昨天的延迟,需要加1得到相对于今天的延迟
actual_days_old = days_old + 1
# V13.1修改:更严格的数据时效性检查(特别是实盘模式)
# 实盘5天,回测7天,超时后切换到default模式
max_data_age = 5 if self.algorithm.LiveMode else 7
if actual_days_old > max_data_age:
# 数据过旧,切换到default模式
self.current_chop_mode = 'default'
if self.debug_mode:
self.algorithm.Debug(f"[FearGreed V13.1] [{mode_str}] {current_time} 数据过旧({actual_days_old}天前),切换到default模式")
if self.algorithm.LiveMode:
self.algorithm.Log(f"[FearGreed V13.1 WARNING] 实盘数据已{actual_days_old}天未更新,超过{max_data_age}天限制,已切换到default模式")
return
if self.debug_mode:
self.algorithm.Debug(f"[FearGreed V13.1] [{mode_str}] {current_time} 使用{data_date}的指数: {current_index:.2f} (延迟{actual_days_old}天)")
# 低于阈值:直接采用default模式
if current_index < self.greed_threshold:
self.current_chop_mode = 'default'
if self.debug_mode:
self.algorithm.Debug(f"[FearGreed V13.1] 指数<{self.greed_threshold},选择default模式")
else:
# 高于或等于阈值:计算标准化回撤
# 收集历史数据(从昨天开始往前数lookback_days天,不包含今天)
historical_values = []
for days_back in range(1, self.lookback_days + 1): # 从1开始,不包含今天
check_date = current_date - timedelta(days=days_back)
if check_date in self.fear_greed_data:
historical_values.append(self.fear_greed_data[check_date])
# 确保有足够的数据点
min_required_points = min(10, self.lookback_days)
if len(historical_values) < min_required_points:
if self.debug_mode:
self.algorithm.Debug(f"[FearGreed V13.1] 历史数据不足(仅{len(historical_values)}条,需{min_required_points}条),使用default模式")
self.current_chop_mode = 'default'
return
# 计算标准化回撤
values_array = np.array(historical_values)
max_20d = np.max(values_array)
std_20d = np.std(values_array, ddof=1)
if std_20d > 0:
drawdown = max_20d - current_index # 回撤值
normalized_drawdown = drawdown / std_20d
if self.debug_mode:
self.algorithm.Debug(f"[FearGreed V13.1] 标准化回撤计算({len(historical_values)}天数据):")
self.algorithm.Debug(f" ├─ {self.lookback_days}日最高值: {max_20d:.2f}")
self.algorithm.Debug(f" ├─ 当前指数值: {current_index:.2f} (昨日)")
self.algorithm.Debug(f" ├─ 回撤值: {drawdown:.2f} ({max_20d:.2f} - {current_index:.2f})")
self.algorithm.Debug(f" ├─ {self.lookback_days}日标准差: {std_20d:.2f}")
self.algorithm.Debug(f" └─ 标准化回撤: {normalized_drawdown:.2f} ({drawdown:.2f} ÷ {std_20d:.2f})")
if normalized_drawdown > self.normalized_drawdown_threshold:
self.current_chop_mode = 'reverse'
if self.debug_mode:
self.algorithm.Debug(f"[FearGreed V13.1] 标准化回撤>{self.normalized_drawdown_threshold},选择reverse模式")
else:
self.current_chop_mode = 'default'
if self.debug_mode:
self.algorithm.Debug(f"[FearGreed V13.1] 标准化回撤<={self.normalized_drawdown_threshold},选择default模式")
else:
self.current_chop_mode = 'default'
if self.debug_mode:
self.algorithm.Debug(f"[FearGreed V13.1] 标准差为0,使用default模式")
except Exception as e:
# V13:实盘保护 - 出错时保持当前模式或使用default
if self.current_chop_mode is None:
self.current_chop_mode = 'default'
if self.algorithm.LiveMode:
self.algorithm.Log(f"[FearGreed V13.1 ERROR] 分析失败,保持{self.current_chop_mode}模式: {str(e)[:200]}")
else:
self.algorithm.Error(f"[FearGreed V13.1] 分析失败: {str(e)}")
def get_current_mode(self):
"""
获取当前的CHOP模式
Returns:
str: 'default' 或 'reverse'
"""
return self.current_chop_mode# ====================================================================================================
# V8.5模块化拆分 - 技术指标模块 (indicators.py)
#
# 本模块包含:
# 1. PineScriptRSI类 - Pine Script兼容的RSI实现
# 2. HighPerformanceHMA类 - 高性能Hull Moving Average实现
# 3. ALMA计算器类
# 4. CHOP指标计算器类
# 5. ATR指标计算器类
# ====================================================================================================
from AlgorithmImports import *
import numpy as np
import math
from collections import deque
class ALMACalculator:
"""
Arnaud Legoux Moving Average计算器
完全复现Pine Script的ta.alma()
"""
def __init__(self, length=1440, offset=0.85, sigma=6):
"""
初始化ALMA计算器
Args:
length: ALMA周期
offset: 偏移参数
sigma: 平滑参数
"""
self.length = length
self.offset = offset
self.sigma = sigma
# 预计算权重以提高性能
self.weights = self._precompute_weights()
def _precompute_weights(self):
"""预计算ALMA权重以提高性能"""
length = self.length
offset = self.offset
sigma = self.sigma
m = offset * (length - 1)
s = length / sigma
weights = []
weight_sum = 0.0
for i in range(length):
w = np.exp(-((i - m) ** 2) / (2 * s * s))
weights.append(w)
weight_sum += w
# 归一化权重
if weight_sum > 0:
weights = [w / weight_sum for w in weights]
return weights
def calculate(self, data):
"""
计算ALMA值
Args:
data: 价格数据列表
Returns:
ALMA值,如果数据不足返回None
"""
if len(data) < self.length:
return None
# 反转数据,使最新数据在索引0
data = list(reversed(data))[:self.length]
# 计算加权平均(优化:使用列表推导式)
alma = sum(data[i] * self.weights[i] for i in range(self.length))
return alma
def calculate_slices(self, data, offsets):
"""
计算ALMA时间切片值
Args:
data: 价格数据列表
offsets: 偏移字典 {'current': 0, '4h': 240, ...}
Returns:
字典包含各个时间切片的ALMA值
"""
results = {}
for key, offset in offsets.items():
# 获取起始位置的数据
if offset + self.length <= len(data):
# 直接从数据切片
prices = data[offset:offset + self.length]
if len(prices) == self.length:
results[key] = self.calculate(prices)
else:
results[key] = None
else:
results[key] = None
return results
class CHOPCalculator:
"""
CHOP指标计算器
完全复现Pine Script的CHOP指标计算逻辑
"""
def __init__(self, length=240):
"""
初始化CHOP计算器
Args:
length: CHOP周期,默认240(4小时)
"""
self.length = length
self.atr_rma = None # ATR RMA状态
def calculate_atr_standard(self, symbol_data):
"""计算标准ATR - 用于CHOP计算"""
if symbol_data.price_window.Count < self.length + 1:
return None
# 初始化ATR(使用简单平均)
if self.atr_rma is None:
# 第一次计算,使用简单平均
tr_values = []
for i in range(self.length):
if i + 1 < symbol_data.price_window.Count:
tr = symbol_data.calculate_true_range(i)
if tr is not None:
tr_values.append(tr)
if len(tr_values) >= self.length:
self.atr_rma = sum(tr_values) / self.length
return self.atr_rma
return None
else:
# 后续计算,使用RMA
tr = symbol_data.calculate_true_range(0)
if tr is not None:
alpha = 1.0 / self.length
self.atr_rma = alpha * tr + (1 - alpha) * self.atr_rma
return self.atr_rma
return None
def calculate_chop(self, symbol_data):
"""
计算CHOP指标值
Args:
symbol_data: 标的物数据对象
Returns:
CHOP值,如果数据不足返回None
"""
# Pine: atr_chop = ta.atr(chop_length)
current_atr = self.calculate_atr_standard(symbol_data)
if current_atr is not None:
# 更新ATR窗口
symbol_data.atr_window.Add(current_atr)
# Pine: math.sum(atr_chop, chop_length)
if symbol_data.atr_window.Count >= self.length:
# 优化:预先提取数据减少属性访问
atr_values = [symbol_data.atr_window[i] for i in range(self.length)]
atr_sum = sum(atr_values)
# Pine: high_low_range = ta.highest(high, chop_length) - ta.lowest(low, chop_length)
# 优化:使用缓存数据
highs, lows = symbol_data.get_high_low_list(self.length)
if highs and lows:
high_low_range = max(highs) - min(lows)
# Pine: chop_value = 100 * math.log10(math.sum(atr_chop, chop_length) / high_low_range) / math.log10(chop_length)
if high_low_range > 0:
chop_value = 100 * math.log10(atr_sum / high_low_range) / math.log10(self.length)
return chop_value
return None
class ATRCalculator:
"""
ATR指标计算器
支持多种ATR计算方式
"""
def __init__(self, period=1440):
"""
初始化ATR计算器
Args:
period: ATR周期,默认1440(24小时)
"""
self.period = period
def calculate_atr_hma(self, symbol_data):
"""
计算ATR with HMA smoothing - 用于ATR通道(V6版本使用高性能HMA)
完全复现Pine Script的ta.atr()与ta.hma()的组合
Args:
symbol_data: 标的物数据对象
Returns:
HMA平滑的ATR值
"""
if symbol_data.price_window.Count < self.period + 1:
return None
# 计算当前True Range
tr = symbol_data.calculate_true_range(0)
if tr is not None:
# 使用高性能HMA更新ATR平滑
hma_value = symbol_data.hma_atr_smooth.update(tr)
if symbol_data.hma_atr_smooth.is_ready:
return hma_value
return None
def calculate_atr_bands(self, symbol_data, band_period=240, multiplier=3.0):
"""
计算ATR通道
Args:
symbol_data: 标的物数据对象
band_period: 通道周期
multiplier: 通道倍数
Returns:
tuple: (center, upper, lower) 或 (None, None, None)
"""
if symbol_data.price_window.Count >= band_period:
# Pine: atr_band_center = ta.hma(close, atr_band_period)
# 使用高性能HMA计算中心线
current_price = symbol_data.get_current_price()
if current_price is not None:
atr_band_center = symbol_data.hma_atr_band.update(current_price)
if symbol_data.hma_atr_band.is_ready and atr_band_center is not None:
# 计算ATR(使用HMA平滑)
atr_custom = self.calculate_atr_hma(symbol_data)
if atr_custom is not None:
# Pine: atr_upper_band = atr_band_center + (atr_custom * atr_fuel_multiplier)
atr_upper = atr_band_center + (atr_custom * multiplier)
atr_lower = atr_band_center - (atr_custom * multiplier)
return atr_band_center, atr_upper, atr_lower
return None, None, None
class IndicatorEngine:
"""
指标计算引擎
统一管理所有技术指标的计算
"""
def __init__(self, config):
"""
初始化指标引擎
Args:
config: 配置字典,包含各种指标参数
"""
self.config = config
# 初始化各类计算器
self.alma_calculator = ALMACalculator(
config.get('alma_length', 1440),
config.get('alma_offset', 0.85),
config.get('alma_sigma', 6)
)
self.chop_calculator = CHOPCalculator(
config.get('chop_length', 240)
)
self.atr_calculator = ATRCalculator(
config.get('atr_period', 1440)
)
def calculate_all_indicators(self, symbol_data):
"""
计算所有指标
Args:
symbol_data: 标的物数据对象
"""
# 计算ALMA时间切片
self.calculate_alma_slices(symbol_data)
# 计算CHOP
chop_value = self.chop_calculator.calculate_chop(symbol_data)
if chop_value is not None:
symbol_data.current_chop = chop_value
# 更新CHOP历史
symbol_data.chop_history.append(chop_value)
# 计算RSI(使用Pine Script兼容实现)
current_price = symbol_data.get_current_price()
if current_price is not None:
rsi_value = symbol_data.rsi_calculator.update(current_price)
if symbol_data.rsi_calculator.is_ready and rsi_value is not None:
symbol_data.current_rsi = rsi_value
# 计算ATR通道
center, upper, lower = self.atr_calculator.calculate_atr_bands(
symbol_data,
self.config.get('atr_band_period', 240),
self.config.get('atr_fuel_multiplier', 3.0)
)
if center is not None and upper is not None and lower is not None:
symbol_data.current_atr = self.atr_calculator.calculate_atr_hma(symbol_data)
symbol_data.current_atr_upper = upper
symbol_data.current_atr_lower = lower
def calculate_alma_slices(self, symbol_data):
"""计算ALMA时间切片值"""
# 获取足够的价格数据
data_length = self.config.get('alma_length', 1440) + 1440 # 额外1440分钟用于偏移
prices = symbol_data.get_price_list(data_length)
if prices is not None:
# 计算各个时间切片的ALMA值
offsets = self.config.get('alma_offsets', {
'current': 0,
'4h': 240,
'8h': 480,
'12h': 720,
'24h': 1440
})
alma_values = self.alma_calculator.calculate_slices(prices, offsets)
# 更新当前值
for key, value in alma_values.items():
symbol_data.current_alma_values[key] = value
# ====================================================================================================
# region imports
from AlgorithmImports import *
# endregion
from QuantConnect import *
from QuantConnect.Algorithm import *
from QuantConnect.Data.Market import TradeBar
from QuantConnect.Orders import *
from QuantConnect.Securities import *
from datetime import datetime, timedelta
# 导入模块化组件
from utils import TimeUtils
from symbol_data import SymbolData
from indicators import IndicatorEngine
from signal_engine import SignalEngine
from position_manager import PositionManager
from drawdown_manager import DrawdownManager # V9.1.1 Bug修复版
from fear_greed_analyzer import FearGreedAnalyzer # V11新增
class MultiAssetAlmaAdaptiveStrategy(QCAlgorithm):
"""
Multi-Asset ALMA Adaptive Strategy V11.3.2 (NoneType Error Fix)
主策略类,统一管理所有组件和标的物
采用模块化架构,每个模块负责特定功能:
- utils: 工具函数
- symbol_data: 数据结构和标的物状态
- indicators: 技术指标计算
- signal_engine: 交易信号生成
- position_manager: 持仓管理和风险控制
- drawdown_manager: 分离式回撤管理(V9.1.1版)
- fear_greed_analyzer: Fear & Greed Index自适应分析(V11新增)
"""
def Initialize(self):
"""初始化策略"""
# === 时区修复(2025-08-20)===
# 关键修复:设置算法时区为UTC,确保所有Schedule.On定时任务按预期执行
self.SetTimeZone(TimeZones.UTC) # 确保使用UTC时区
# === 调试开关(关闭以提高性能)===
self.DEBUG_MODE = False # 设为True开启调试输出
# 设置基本参数 - 最近X天回测
self.SetStartDate(2025, 11, 1) # 开始日期
self.SetEndDate(2025, 11, 8) # 结束日期
# === Binance期货账户设置 ===
self.SetAccountCurrency("USDT")
self.SetCash("USDT", 5000) # 初始资金3000 USDT
# 设置Binance期货经纪商模型
self.SetBrokerageModel(BrokerageName.BinanceFutures, AccountType.Margin)
# === 启用异步处理(官方推荐多标的物使用)===
self.UniverseSettings.Asynchronous = True
# === 期货仓位管理(所有标的物共享配置)===
self.initial_capital = 100 # 每笔交易使用的保证金(USDT)
self.futures_leverage = 20 # 期货杠杆倍数
self.position_value_usdt = self.initial_capital * self.futures_leverage # 实际仓位价值 = 800 USDT
self.max_positions = 1 # 每个标的物最多单向持仓
# === 【V11.3】标的物列表配置 (两段式) ===
# 第一段:初始加载项 (用于快速启动和预热,可自行调整分配)
self.initial_load_symbols = ["MAVIAUSDT","1000CHEEMSUSDT","FLMUSDT","ASRUSDT","PIPPINUSDT","JELLYJELLYUSDT","KERNELUSDT","MELANIAUSDT","DIAUSDT","MYROUSDT","VELODROMEUSDT","AKTUSDT","HIPPOUSDT"]
# 第二段:启动后加载项 (在策略运行后加载)
self.post_startup_load_symbols = ["TRUMPUSDT","MEUSDT","AIXBTUSDT","LUMIAUSDT","AI16ZUSDT","VIRTUALUSDT","1000XECUSDT","FORTHUSDT","PERPUSDT","B2USDT","KDAUSDT","BANKUSDT","DEGENUSDT","SKLUSDT","VANAUSDT","1000XUSDT","BIDUSDT","BANUSDT","EDUUSDT","BANANAUSDT","ZENUSDT","BSVUSDT","OGUSDT","ACXUSDT","HBARUSDT","LQTYUSDT","ALCHUSDT","DOODUSDT","SWARMSUSDT","KAITOUSDT","SLERFUSDT","CHRUSDT","SHELLUSDT","OBOLUSDT","ILVUSDT","VVVUSDT","DASHUSDT","B3USDT","SWELLUSDT","ARCUSDT","SYSUSDT","FIDAUSDT"]
# === 构建配置字典 ===
self.config = self._build_config()
# === 初始化标的物数据 ===
self.symbol_data = {}
self._initialize_symbols(self.initial_load_symbols) # 只初始化第一段
# === 初始化模块化组件 ===
self.indicator_engine = IndicatorEngine(self.config)
self.signal_engine = SignalEngine(self.config)
self.position_manager = PositionManager(self, self.config)
self.drawdown_manager = DrawdownManager(self, self.config) # V9.1.1 Bug修复版
self.fear_greed_analyzer = FearGreedAnalyzer(self, self.config) # V11新增
# V11:初始化Fear & Greed数据源
self.fear_greed_analyzer.initialize()
# 请求历史数据
self.SetWarmUp(2880, Resolution.Minute)
# === V11 ALMA优化:手动预热指标避免等待2880分钟 ===
self._warm_up_manual_indicators(self.initial_load_symbols) # 只预热第一段
# 调度每日任务
self._setup_scheduled_events()
# 初始化所有标的物的策略模式
self._initialize_symbol_strategies(self.initial_load_symbols) # 只初始化第一段
# V11.3.2:启动时检查遗留仓位
self._check_existing_positions()
if self.DEBUG_MODE:
self.Debug(f"Multi-Asset ALMA Adaptive Strategy V11.3.2 initialized with {len(self.symbol_data)} symbols")
self.Debug(f"Fear & Greed Adaptive: {self.config['enable_fear_greed_adaptive']}")
self.Debug(f"Global Monitoring: {self.config['global_monitor_symbols'] or 'ALL'}")
self.Debug(f"Global Use Realized PnL: {self.config['global_use_realized_pnl']}")
self.Debug(f"TP Multiplier: {self.config['tp_multiplier']:.1f}x")
self.Debug(f"Leverage: {self.futures_leverage}x, Position Size: {self.initial_capital} USDT per symbol")
self.Debug(f"Risk Control: 30min monitoring={self.config['enable_30min_risk_control']}, 24h force close={self.config['use_24h_force_close']}")
self.Debug(f"Drawdown Control Mode: {self.config['drawdown_control_mode']}")
def _build_config(self):
"""构建配置字典,供所有模块使用"""
config = {
# 调试模式
'DEBUG_MODE': self.DEBUG_MODE,
# === V11:Fear & Greed Index自适应配置 ===
# 启用Fear & Greed自适应后,会覆盖下面的固定chop_adaptive_mode设置
'enable_fear_greed_adaptive': False, # 是否启用Fear & Greed自适应CHOP模式选择
'debug_fear_greed': True, # Fear & Greed模块独立调试开关
'fear_greed_threshold': 50, # 贪婪阈值:<此值用default,>=此值计算标准化回撤
'fear_greed_lookback_days': 20, # 标准化回撤计算窗口(天)
'fear_greed_drawdown_threshold': 1.5,# 标准化回撤阈值:超过此值选择reverse模式
# ====================================================================================================
# === V9.1.1:分离式回撤管理参数 ===
# ====================================================================================================
# 模式选择(0=禁用, 1=仅个股, 2=全局+个股, 3=完整三层)
'drawdown_control_mode': 2,
# === 第一层:全局监控参数 ===
'global_dd_threshold': 3000,
# V11.3 配置: 全局监控计算方式######################################################################################
# True = 使用已实现盈亏 (采样频率由调用时机决定,当前为每日) 默认采用false
# False = 使用组合总价值 (采样频率由调用时机决定,当前为每日, 复刻官方逻辑)
'global_use_realized_pnl': False,
# V8.9.1可选:分组监控配置(高级功能)只能采用none
'global_monitor_symbols': None, # 监控哪些标的物, None = 监控所有标的物
'global_monitor_groups': None,
'global_group_thresholds': None,
# === 第二层:时间确认参数(仅模式3使用)===
'dd_time_window': 7,
'loss_day_ratio': 0.8,
# === 第三层:个股控制参数 ===
'asset_dd_normal': 300,
'asset_dd_warning': 1,
# 回撤管理日志开关(独立于DEBUG_MODE)
'enable_drawdown_logging': True,
# === V8.5:简化风险控制参数 ===
# 30分钟同向仓位监控
'enable_30min_risk_control': True,
'min_positions_for_monitoring': 3,
'same_direction_threshold': 0.8,
'position_monitoring_window': 30,
# 48小时强制平仓(可选)
'use_24h_force_close': False,
'force_close_hours': 48,
# === ALMA参数(所有标的物使用相同配置)===
'alma_length': 1440,
'alma_offset': 0.85,
'alma_sigma': 6,
# ALMA时间切片偏移(以分钟为单位)
'alma_offsets': {
'current': 0,
'4h': 240, # 4小时 = 240分钟
'8h': 480, # 8小时 = 480分钟
'12h': 720, # 12小时 = 720分钟
'24h': 1440 # 24小时 = 1440分钟
},
# === CHOP参数(所有标的物使用相同配置)===
'chop_length': 240,
'chop_lower': 38.2,
'chop_upper': 61.8,
'use_chop_adaptive': True,
# === CHOP自适应模式开关(V8修复)===
# 'default': CHOP极端值→使用突破逻辑,无极端值→使用回归逻辑(V8默认)
# 'disabled': 禁用CHOP,全程使用回归逻辑
# 'reverse': CHOP极端值→使用回归逻辑,无极端值→使用突破逻辑(V7原模式)
# 注意:当enable_fear_greed_adaptive=True时,此设置会被Fear & Greed动态覆盖
'chop_adaptive_mode': 'disabled',
# === T1/T2时间过滤器参数(V8新增,仅回归逻辑使用)===
'use_time_filter': True,
'time_filter_t1': 60,
'time_filter_t2': 15,
# === 持仓时间定制化参数(V8新增)===
'reversion_position_time': 2880,
'breakout_position_time': 480,
# === 交易参数(所有标的物使用相同配置)===
'entry_min_diff': 0.02,
'entry_max_diff': 0.04,
'min_price_distance': 0.01,
'check_lower_bands': True,
'check_upper_bands': True,
# === ATR参数(所有标的物使用相同配置)=== ATR乘数默认都是3
'atr_period': 1440,
'atr_band_period': 240,
'atr_band_multiplier': 3.0,
'use_strict_atr_position': True,
'atr_fuel_multiplier': 3.0,
'use_atr_fuel': True,
# === RSI参数(所有标的物使用相同配置)===########################################### 似乎关闭未使用
'rsi_period': 14,
'rsi_lower': 30,
'rsi_upper': 70,
# === 风险管理(所有标的物使用相同配置)===
'position_size': self.initial_capital,
'leverage': self.futures_leverage,
'risk_reward_ratio': 2,
'tp_multiplier': 1.5,
'stop_loss_cooldown': 60,
'max_sl_count': 1000,
'check_interval': 1,
# === 时间管理(所有标的物使用相同配置)===
'forbidden_start_hour': 23,
'forbidden_end_hour': 1,
'chop_check_hour': 1,
'chop_check_minute': 1,
# 持仓时间管理
'use_position_timer': True,
'position_time_limit': 2880,
}
return config
def _initialize_symbols(self, symbols_to_add):
"""【V11.3 修改】初始化指定的标的物列表"""
for symbol_str in symbols_to_add:
try:
symbol = self.AddCryptoFuture(symbol_str, Resolution.Minute).Symbol
self.Securities[symbol].SetLeverage(self.futures_leverage)
# 设置BuyingPowerModel.Null避免购买力错误
self.Securities[symbol].SetBuyingPowerModel(BuyingPowerModel.Null)
# 创建SymbolData实例(包含高性能HMA和修复后的RSI)
symbol_data = SymbolData(symbol, self)
self.symbol_data[symbol] = symbol_data
if self.DEBUG_MODE:
self.Debug(f"成功添加期货品种: {symbol_str}")
except Exception as e:
self.Debug(f"添加 {symbol_str} 失败: {str(e)}")
def _setup_scheduled_events(self):
"""设置定时任务"""
try:
# V11:Fear & Greed检查(UTC 00:30,在CHOP检查之前)
if self.config['enable_fear_greed_adaptive']:
self.Schedule.On(
self.DateRules.EveryDay(),
self.TimeRules.At(0, 30), # UTC 00:30 (北京时间 08:30)
self.DailyFearGreedCheck
)
# 每日CHOP检查
self.Schedule.On(
self.DateRules.EveryDay(),
self.TimeRules.At(self.config['chop_check_hour'], self.config['chop_check_minute']),
self.DailyChopCheckAllSymbols
)
# V11.2:每日回撤报告与全局计算(在收盘时)
self.Schedule.On(
self.DateRules.EveryDay(),
self.TimeRules.At(23, 59), # UTC 23:59
self.DailyDrawdownReport
)
# === 【V11.3.1 Bug修复】使用正确的API安排一次性未来任务 ===
if self.post_startup_load_symbols:
future_time = self.Time + timedelta(minutes=5)
self.Schedule.On(
self.DateRules.On(future_time.year, future_time.month, future_time.day),
self.TimeRules.At(future_time.hour, future_time.minute),
self.LoadPostStartupSymbols
)
except Exception as e:
if self.DEBUG_MODE:
self.Debug(f"Schedule setup error: {str(e)}")
def _warm_up_manual_indicators(self, symbols_to_warm_up_str):
"""
【V11.3 修改】手动预热指定的标的物列表
"""
if self.DEBUG_MODE:
self.Debug(f"开始预热 {len(symbols_to_warm_up_str)} 个标的物...")
# 计算所需的历史数据长度
alma_period = self.config['alma_length'] # 1440
max_offset = max(self.config['alma_offsets'].values()) # 1440
total_needed = alma_period + max_offset # 2880
# 将字符串列表转换为Symbol对象列表进行遍历
symbol_objects = [s for s in self.symbol_data.keys() if s.Value in symbols_to_warm_up_str]
for symbol in symbol_objects:
symbol_data = self.symbol_data[symbol]
try:
# 获取历史数据
history = self.History[TradeBar](symbol, total_needed, Resolution.Minute)
if history is not None and len(list(history)) > 0:
history_list = list(history)
actual_bars = len(history_list)
if actual_bars >= total_needed:
# 预热价格窗口
for bar in history_list:
symbol_data.update_data_windows(bar)
# 更新指标
self.indicator_engine.calculate_all_indicators(symbol_data)
# 更新前值,为下一次计算做准备
symbol_data.update_previous_values()
if self.DEBUG_MODE:
self.Debug(f"成功预热 {symbol}: {actual_bars} 根K线")
else:
if self.DEBUG_MODE:
self.Debug(f"警告:{symbol} 历史数据不足 ({actual_bars}/{total_needed}),可能影响ALMA计算")
else:
if self.DEBUG_MODE:
self.Debug(f"警告:{symbol} 无法获取历史数据")
except Exception as e:
if self.DEBUG_MODE:
self.Debug(f"预热 {symbol} 失败: {str(e)}")
def _initialize_symbol_strategies(self, symbols_to_initialize_str):
"""【V11.3 修改】初始化指定的标的物列表的策略模式"""
symbol_objects = [s for s in self.symbol_data.keys() if s.Value in symbols_to_initialize_str]
for symbol in symbol_objects:
if symbol in self.symbol_data:
self.signal_engine.initialize_symbol_strategy(self.symbol_data[symbol])
def _check_existing_positions(self):
"""V11.3.2:启动时检查并标记遗留仓位"""
for symbol in self.symbol_data.keys():
if self.Portfolio[symbol].Invested:
self.Log(f"[Startup Check] {symbol.Value}: Found existing position")
# OnData中的防御代码会处理,这里只是记录
def LoadPostStartupSymbols(self):
"""
【V11.3 新增】
此函数由一次性定时任务调用,用于加载和预热第二段的标的物。
"""
if not self.post_startup_load_symbols:
return
self.Log(f"开始加载 {len(self.post_startup_load_symbols)} 个启动后标的物: {', '.join(self.post_startup_load_symbols)}")
# 1. 初始化第二段的标的物
self._initialize_symbols(self.post_startup_load_symbols)
# 2. 预热第二段的标的物
self._warm_up_manual_indicators(self.post_startup_load_symbols)
# 3. 初始化第二段标的物的策略模式
self._initialize_symbol_strategies(self.post_startup_load_symbols)
self.Log("所有启动后标的物已加载并预热完毕。")
def DailyFearGreedCheck(self):
"""V11:每日Fear & Greed检查,设置CHOP模式"""
# === 定时任务执行监控(2025-08-20添加)===
# 记录执行时间,确认Schedule.On是否正常触发
# 预期:每天UTC 00:30执行一次
self.Log(f"[FearGreed] DailyCheck执行 - {self.Time}")
# 分析Fear & Greed Index并设置CHOP模式
self.fear_greed_analyzer.analyze_and_set_mode(self.Time)
# 获取决定的模式
new_mode = self.fear_greed_analyzer.get_current_mode()
# 更新signal_engine的CHOP模式
if hasattr(self.signal_engine, 'chop_analyzer'):
self.signal_engine.chop_analyzer.chop_adaptive_mode = new_mode
if self.config['debug_fear_greed']:
self.Debug(f"[FearGreed] CHOP模式已更新为: {new_mode}")
def OnData(self, data):
"""处理所有标的物的K线数据"""
# V11:更新Fear & Greed实时数据(如果启用)
if self.config['enable_fear_greed_adaptive']:
self.fear_greed_analyzer.update_from_data_slice(data)
# 批量处理所有标的物的数据更新和指标计算
for symbol, symbol_data in self.symbol_data.items():
if not data.ContainsKey(symbol) or data[symbol] is None:
continue
# 获取当前K线数据
bar = data[symbol]
# 更新数据窗口
symbol_data.update_data_windows(bar)
# 等待数据充足
if not symbol_data.is_data_ready():
continue
# 保存当前值为前值(用于下一根K线的交易决策)
symbol_data.update_previous_values()
# 计算当前指标(仅用于下一根K线)
self.indicator_engine.calculate_all_indicators(symbol_data)
# 跳过预热期
if self.IsWarmingUp:
return
# V11:在预热期结束后加载Fear & Greed历史数据(只执行一次)
if self.config['enable_fear_greed_adaptive'] and not hasattr(self, '_fg_data_loaded'):
self.fear_greed_analyzer.load_historical_data_segments()
self._fg_data_loaded = True
# V8.9.1:初始化回撤管理器的组合价值(仅执行一次)
if not hasattr(self, '_drawdown_initialized'):
self.drawdown_manager.initialize_portfolio_value(self.Portfolio.TotalPortfolioValue)
self._drawdown_initialized = True
# 【V11.2】高频任务: 盘中实时更新个股回撤状态
self.drawdown_manager.update_intraday_asset_status()
# 检查是否在禁止交易时间
if TimeUtils.is_in_forbidden_time(
self.Time,
self.config['forbidden_start_hour'],
self.config['forbidden_end_hour']
):
return
# V8.8 关键修复:分离持仓管理和新开仓逻辑
# 处理每个标的物的交易逻辑
for symbol, symbol_data in self.symbol_data.items():
if not data.ContainsKey(symbol):
continue
bar = data[symbol]
# ===== V11.3.2 防御性止损止盈检查 =====
# 必须在调用manage_position之前执行
if self.Portfolio[symbol].Invested:
# 检查止损止盈是否为None
if symbol_data.position_sl is None or symbol_data.position_tp is None:
position = self.Portfolio[symbol]
current_price = bar.Close
# 记录问题发生的详细信息,帮助后续调试
self.Log(f"[WARNING] {symbol.Value}: Detected position without SL/TP")
self.Log(f" Time: {self.Time}")
self.Log(f" Quantity: {position.Quantity}")
self.Log(f" Current Price: {current_price:.4f}")
# 使用配置中的参数计算合理的止损止盈
sl_percentage = 0.02 # 2%止损
tp_percentage = self.config['risk_reward_ratio'] * sl_percentage
tp_percentage *= self.config['tp_multiplier']
if position.Quantity > 0: # 多仓
symbol_data.position_sl = current_price * (1 - sl_percentage)
symbol_data.position_tp = current_price * (1 + tp_percentage)
symbol_data.position_direction = 1
# 设置合理的use_reversal_signal(默认使用回归模式)
if not hasattr(symbol_data, 'use_reversal_signal'):
symbol_data.use_reversal_signal = False
elif position.Quantity < 0: # 空仓
symbol_data.position_sl = current_price * (1 + sl_percentage)
symbol_data.position_tp = current_price * (1 - tp_percentage)
symbol_data.position_direction = -1
if not hasattr(symbol_data, 'use_reversal_signal'):
symbol_data.use_reversal_signal = False
# 补充其他可能缺失的字段
if symbol_data.position_entry_time is None:
symbol_data.position_entry_time = self.Time
if symbol_data.position_entry_price is None:
symbol_data.position_entry_price = current_price
if not hasattr(symbol_data, 'bars_since_entry'):
symbol_data.bars_since_entry = 0
self.Log(f" Fixed - SL: {symbol_data.position_sl:.4f}, TP: {symbol_data.position_tp:.4f}")
# ===== 防御性检查结束 =====
# V8.8 修复:先检查是否允许新开仓
can_open_new_position = self.drawdown_manager.can_trade(symbol)
# V8.8 关键修复:持仓管理必须始终执行(包括止损)
# 无论是否允许新开仓,都要管理现有持仓
# 现在这里100%安全,因为上面已经确保了止损止盈不为None
self.position_manager.manage_position(symbol_data, bar)
# 以下逻辑仅在允许新开仓时执行
if not can_open_new_position:
continue # 仅跳过新开仓逻辑,不影响上面的持仓管理
# 检查冷却期
if symbol_data.is_in_cooldown(self.Time, self.config['stop_loss_cooldown']):
continue
# 生成交易信号
signal = self.signal_engine.generate_signal(symbol_data, bar, self.Time)
# 执行新开仓交易
if signal != 0:
success = self.position_manager.execute_trade(symbol_data, signal, bar)
def DailyChopCheckAllSymbols(self):
"""对所有标的物执行每日CHOP检查(UTC 01:01)"""
# === 定时任务执行监控(2025-08-20添加)===
# 记录执行时间,确认Schedule.On是否正常触发
# 预期:每天UTC 01:01执行一次
self.Log(f"[CHOP] DailyCheck执行 - {self.Time}")
for symbol, symbol_data in self.symbol_data.items():
if not symbol_data.is_data_ready():
continue
# 执行CHOP检查
strategy_changed, debug_info = self.signal_engine.execute_daily_chop_check(
symbol_data, self.Time, self.DEBUG_MODE
)
if self.DEBUG_MODE and debug_info:
china_time = TimeUtils.get_china_time(self.Time)
self.Debug(f"{symbol}: V11.3.2 CHOP CHECK\n{debug_info}")
def DailyDrawdownReport(self):
"""V11.2:每日回撤报告与全局状态更新"""
# === 定时任务执行监控(2025-08-20添加)===
# 记录执行时间,确认Schedule.On是否正常触发
# 预期:每天UTC 23:59执行一次
self.Log(f"[Drawdown] DailyReport执行 - {self.Time}")
# 获取当前组合价值,用于所有每日计算
current_value = self.Portfolio.TotalPortfolioValue
# 【V11.2】低频任务
# 1. 更新用于时间确认层的每日PnL历史
self.drawdown_manager.update_daily_pnl_history(current_value)
# 2. 执行每日一次的全局回撤计算
self.drawdown_manager.update_daily_global_status(current_value)
# 3. 输出日志
self.drawdown_manager.log_daily_status()
def OnOrderEvent(self, orderEvent):
"""处理订单事件"""
if self.DEBUG_MODE:
order = self.Transactions.GetOrderById(orderEvent.OrderId)
if order is not None:
self.Debug(f"订单事件: {order.Symbol} {orderEvent.Status} {orderEvent.FillQuantity}")
def OnEndOfAlgorithm(self):
"""算法结束时的清理工作"""
# V11.3:输出最终回撤管理状态
if self.config['drawdown_control_mode'] > 0:
status = self.drawdown_manager.get_status_summary()
self.Log(f"Final Drawdown Status V11.3.2: Mode={status.get('mode', 0)}")
# 输出全局监控信息
if 'global_monitoring' in status:
monitoring_info = status['global_monitoring']
if monitoring_info.get('type') == 'realized_pnl':
self.Log(f" Global PnL: ${monitoring_info.get('current_pnl', 0):.2f}, "
f"Peak: ${monitoring_info.get('pnl_peak', 0):.2f}, "
f"Drawdown: ${monitoring_info.get('drawdown', 0):.2f}")
else: # 对应 Equity 模式
self.Log(f" Portfolio Equity: ${monitoring_info.get('current_equity', 0):.2f}, "
f"Peak: ${monitoring_info.get('high_water_mark', 0):.2f}, "
f"Drawdown: ${monitoring_info.get('drawdown', 0):.2f}")
# 输出监控的标的物
monitored = status.get('monitored_symbols', 'ALL')
self.Log(f" Monitored Symbols: {monitored}")
# 输出停止的标的物
stopped = status.get('stopped_assets', [])
if stopped:
self.Log(f" Stopped Assets: {', '.join(stopped)}")
# 输出分组状态(如果有)
if 'group_status' in status and status.get('group_status'):
self.Log(" Group Status:")
for group, info in status['group_status'].items():
self.Log(f" {group}: PnL ${info.get('pnl', 0):.2f}, "
f"Drawdown ${info.get('drawdown', 0):.2f}")
# V11:输出Fear & Greed状态
if self.config['enable_fear_greed_adaptive']:
final_mode = self.fear_greed_analyzer.get_current_mode()
self.Log(f"Final CHOP Mode (Fear&Greed): {final_mode}")
if self.DEBUG_MODE:
total_trades = 0
for symbol_data in self.symbol_data.values():
if hasattr(symbol_data, 'sl_count'):
total_trades += symbol_data.sl_count
self.Debug(f"V11.3.2 Strategy 结束")
self.Debug(f"总交易次数: {total_trades}")
self.Debug(f"最终权益: {self.Portfolio.TotalPortfolioValue}")
# ====================================================================================================
# V10模块化拆分 - 持仓管理模块 (position_manager.py)
#
# V10新增功能:
# - 支持take-profit multiplier功能
# - 动态调整止盈距离
# - 保持与V8.5的完整兼容性
#
# 本模块包含:
# 1. PositionSizer类 - 仓位大小计算
# 2. StopLossCalculator类 - 止盈止损计算(V10增加tp_multiplier支持)
# 3. RiskController类 - V8.5风险控制(30分钟同向监控、24小时强制平仓)
# 4. PositionManager类 - 统一持仓管理(V10增加tp_multiplier支持)
# 5. 所有与仓位管理相关的逻辑
# ====================================================================================================
from AlgorithmImports import *
from datetime import datetime, timedelta
from collections import deque
from symbol_data import PositionRecord
from utils import (ValidationUtils, MathUtils, RiskControlUtils,
PositionSizeUtils, LoggingUtils)
class PositionSizer:
"""
仓位大小计算器
"""
def __init__(self, config):
"""
初始化仓位大小计算器
Args:
config: 配置字典
"""
self.config = config
self.position_size = config.get('position_size', 50) # 每笔交易使用的保证金(USDT)
self.leverage = config.get('leverage', 20) # 期货杠杆倍数
def calculate_position_size(self, price):
"""
计算仓位大小
Args:
price: 当前价格
Returns:
float: 合约数量
"""
return PositionSizeUtils.calculate_futures_position_size(
self.position_size, price, self.leverage
)
def is_valid_order_size(self, securities, symbol, quantity):
"""
验证订单大小是否满足交易所要求
Args:
securities: 证券对象集合
symbol: 交易标的
quantity: 订单数量
Returns:
bool: True表示订单大小有效
"""
return ValidationUtils.is_valid_order_size(securities, symbol, quantity)
class StopLossCalculator:
"""
止盈止损计算器
完全复现Pine Script逻辑
"""
def __init__(self, config):
"""
初始化止盈止损计算器
Args:
config: 配置字典
"""
self.config = config
self.risk_reward_ratio = config.get('risk_reward_ratio', 2.0)
# V10新增:止盈倍数,1.0表示原始止盈距离
self.tp_multiplier = config.get('tp_multiplier', 1.0)
def calculate_stops(self, symbol_data, signal, entry_price):
"""
计算止盈止损 - 完全复现Pine Script逻辑
Args:
symbol_data: 标的物数据对象
signal: 交易信号 (1: 做多, -1: 做空)
entry_price: 开仓价格
Returns:
tuple: (tp, sl) - 止盈价格和止损价格
"""
# 检查必要的值是否存在
if symbol_data.prev_alma_values['24h'] is None:
# 如果24H ALMA为None,使用默认值
default_alma_24h = entry_price * 1.01 # 默认1%的偏移
else:
default_alma_24h = symbol_data.prev_alma_values['24h']
# 获取ALMA间距
profit_space = symbol_data.get_alma_spacing_for_stops(signal)
if signal == 1: # 做多
if symbol_data.use_reversal_signal: # 突破模式
# Pine: fixed_sl := alma_24h
# Pine: dynamic_level := long_entry_price + (long_profit_space * fixed_risk_reward)
sl = default_alma_24h # 固定止损
# V10:应用tp_multiplier到动态止盈
tp = entry_price + (profit_space * self.risk_reward_ratio * self.tp_multiplier)
else: # 回归模式
# Pine: fixed_tp := alma_24h
# Pine: dynamic_level := long_entry_price - (long_profit_space / fixed_risk_reward)
# V10:回归模式下扩展固定止盈
if self.tp_multiplier != 1.0:
# 当tp_multiplier != 1.0时,扩展固定止盈
tp_extension = abs(default_alma_24h - entry_price) * (self.tp_multiplier - 1.0)
tp = default_alma_24h + tp_extension # 向上扩展止盈
else:
tp = default_alma_24h # 固定止盈(原始逻辑)
sl = entry_price - (profit_space / self.risk_reward_ratio) # 动态止损
else: # 做空
if symbol_data.use_reversal_signal: # 突破模式
# Pine: fixed_sl := alma_24h
# Pine: dynamic_level := short_entry_price - (short_profit_space * fixed_risk_reward)
sl = default_alma_24h # 固定止损
# V10:应用tp_multiplier到动态止盈
tp = entry_price - (profit_space * self.risk_reward_ratio * self.tp_multiplier)
else: # 回归模式
# Pine: fixed_tp := alma_24h
# Pine: dynamic_level := short_entry_price + (short_profit_space / fixed_risk_reward)
# V10:回归模式下扩展固定止盈
if self.tp_multiplier != 1.0:
# 当tp_multiplier != 1.0时,扩展固定止盈
tp_extension = abs(default_alma_24h - entry_price) * (self.tp_multiplier - 1.0)
tp = default_alma_24h - tp_extension # 向下扩展止盈
else:
tp = default_alma_24h # 固定止盈(原始逻辑)
sl = entry_price + (profit_space / self.risk_reward_ratio) # 动态止损
return tp, sl
class RiskController:
"""
V8.5新增风险控制器
实现30分钟同向仓位监控和24小时强制平仓
"""
def __init__(self, config):
"""
初始化风险控制器
Args:
config: 配置字典
"""
self.config = config
# 30分钟同向仓位监控参数
self.enable_30min_risk_control = config.get('enable_30min_risk_control', True)
self.min_positions_for_monitoring = config.get('min_positions_for_monitoring', 3)
self.same_direction_threshold = config.get('same_direction_threshold', 0.8)
self.position_monitoring_window = config.get('position_monitoring_window', 30)
# 24小时强制平仓参数
self.use_24h_force_close = config.get('use_24h_force_close', True)
self.force_close_hours = config.get('force_close_hours', 24)
# 仓位记录(用于30分钟监控)
self.position_records = deque()
def check_same_direction_risk(self, intended_direction, current_time):
"""
检查30分钟同向仓位风险
Args:
intended_direction: 拟开仓方向 (1: 多头, -1: 空头)
current_time: 当前时间
Returns:
bool: True=允许开仓, False=禁止开仓
"""
if not self.enable_30min_risk_control:
return True # 未启用风险控制,直接允许
# 清理过期的仓位记录(30分钟以外的)
cutoff_time = current_time - timedelta(minutes=self.position_monitoring_window)
self.position_records = deque([
record for record in self.position_records
if record.open_time >= cutoff_time
])
# 检查是否应该阻止同向开仓
should_block = RiskControlUtils.should_block_same_direction(
self.position_records,
intended_direction,
self.min_positions_for_monitoring,
self.same_direction_threshold
)
return not should_block # 风险检测到则禁止开仓
def record_new_position(self, symbol, direction, open_time):
"""
记录新开仓位
Args:
symbol: 标的物符号
direction: 开仓方向 (1: 多头, -1: 空头)
open_time: 开仓时间
"""
if self.enable_30min_risk_control:
record = PositionRecord(symbol, direction, open_time)
self.position_records.append(record)
def should_force_close_by_time(self, symbol_data, current_time):
"""
检查24小时强制平仓
Args:
symbol_data: 标的物数据对象
current_time: 当前时间
Returns:
bool: True=需要强制平仓, False=无需强制平仓
"""
if not self.use_24h_force_close:
return False # 未启用24小时强制平仓
return symbol_data.should_force_close_by_time(current_time, self.force_close_hours)
def get_risk_control_debug_info(self):
"""
获取风险控制调试信息
Returns:
str: 风险控制信息字符串
"""
risk_items = []
if self.enable_30min_risk_control:
risk_items.append(
f"30min Monitor: ≥{self.min_positions_for_monitoring} positions, "
f"<{self.same_direction_threshold:.0%} same direction"
)
if self.use_24h_force_close:
risk_items.append("24h Force Close: Enabled")
if risk_items:
return f"\nRisk Control: {', '.join(risk_items)}"
return ""
class TradeExecutor:
"""
交易执行器
负责实际的开仓和平仓操作
"""
def __init__(self, algorithm):
"""
初始化交易执行器
Args:
algorithm: 主算法实例
"""
self.algorithm = algorithm
def execute_entry(self, symbol_data, signal, entry_price, tp, sl, quantity):
"""
执行开仓操作
Args:
symbol_data: 标的物数据对象
signal: 交易信号
entry_price: 开仓价格
tp: 止盈价格
sl: 止损价格
quantity: 仓位数量
Returns:
bool: 是否成功开仓
"""
try:
# 记录开仓信息(先设置,再下单)
symbol_data.position_entry_time = self.algorithm.Time
symbol_data.position_entry_price = entry_price
symbol_data.position_tp = tp
symbol_data.position_sl = sl
# 下单
if signal == 1: # 做多
self.algorithm.MarketOrder(symbol_data.symbol, quantity)
return True
else: # 做空
self.algorithm.MarketOrder(symbol_data.symbol, -quantity)
return True
except Exception as e:
if hasattr(self.algorithm, 'Debug'):
self.algorithm.Debug(f"执行开仓失败 {symbol_data.symbol}: {str(e)}")
return False
def execute_exit(self, symbol_data, reason="Unknown"):
"""
执行平仓操作
Args:
symbol_data: 标的物数据对象
reason: 平仓原因
Returns:
bool: 是否成功平仓
"""
try:
# 使用 MarketOrder 反向平仓,确保保证金释放
quantity = self.algorithm.Portfolio[symbol_data.symbol].Quantity
if quantity != 0:
self.algorithm.MarketOrder(symbol_data.symbol, -quantity)
return True
return False
except Exception as e:
if hasattr(self.algorithm, 'Debug'):
self.algorithm.Debug(f"执行平仓失败 {symbol_data.symbol} ({reason}): {str(e)}")
return False
class PositionManager:
"""
统一持仓管理器
整合所有持仓管理功能
"""
def __init__(self, algorithm, config):
"""
初始化持仓管理器
Args:
algorithm: 主算法实例
config: 配置字典
"""
self.algorithm = algorithm
self.config = config
# 初始化各个组件
self.position_sizer = PositionSizer(config)
self.stop_calculator = StopLossCalculator(config)
self.risk_controller = RiskController(config)
self.trade_executor = TradeExecutor(algorithm)
# 持仓管理参数
self.use_position_timer = config.get('use_position_timer', True)
self.reversion_position_time = config.get('reversion_position_time', 1440)
self.breakout_position_time = config.get('breakout_position_time', 60)
self.check_interval = config.get('check_interval', 1)
self.max_sl_count = config.get('max_sl_count', 1000)
# V10新增:止盈倍数支持
self.tp_multiplier = config.get('tp_multiplier', 1.0)
# 调试模式
self.debug_mode = config.get('DEBUG_MODE', False)
def execute_trade(self, symbol_data, signal, bar):
"""
执行交易(开仓)
Args:
symbol_data: 标的物数据对象
signal: 交易信号
bar: K线数据
Returns:
bool: 是否成功开仓
"""
# V8.5新增:30分钟同向仓位风险检查
if not self.risk_controller.check_same_direction_risk(signal, self.algorithm.Time):
if self.debug_mode:
direction_name = "LONG" if signal == 1 else "SHORT"
self.algorithm.Debug(f"V8.5 Risk Control: {direction_name} blocked for {symbol_data.symbol}")
return False
# 检查是否已有持仓
if self.algorithm.Portfolio[symbol_data.symbol].Invested:
return False
close = float(bar.Close)
# 计算仓位大小
contracts = self.position_sizer.calculate_position_size(close)
# 验证订单大小
if not self.position_sizer.is_valid_order_size(
self.algorithm.Securities, symbol_data.symbol, contracts):
return False
# 保存当前ALMA间距用于止盈止损计算
if (symbol_data.prev_alma_values['current'] is not None and
symbol_data.prev_alma_values['24h'] is not None):
alma_spacing = abs(
symbol_data.prev_alma_values['current'] -
symbol_data.prev_alma_values['24h']
)
symbol_data.set_alma_spacing_for_stops(signal, alma_spacing)
else:
# 如果ALMA值为None,使用默认值
default_spacing = close * 0.02 # 2%的默认空间
symbol_data.set_alma_spacing_for_stops(signal, default_spacing)
# 计算止盈止损
tp, sl = self.stop_calculator.calculate_stops(symbol_data, signal, close)
# 执行开仓
success = self.trade_executor.execute_entry(
symbol_data, signal, close, tp, sl, contracts)
if success:
# V8.5新增:记录新开仓位
self.risk_controller.record_new_position(symbol_data.symbol, signal, self.algorithm.Time)
# 记录开仓日志
if self.debug_mode:
self.log_entry(symbol_data, signal, close)
return success
def manage_position(self, symbol_data, bar):
"""
管理现有持仓(V8.5版本,增加24小时强制平仓)
Args:
symbol_data: 标的物数据对象
bar: K线数据
"""
if not self.algorithm.Portfolio[symbol_data.symbol].Invested:
return
close = float(bar.Close)
high = float(bar.High)
low = float(bar.Low)
holding = self.algorithm.Portfolio[symbol_data.symbol]
# 更新持仓时间
if symbol_data.position_entry_time is not None:
symbol_data.bars_since_entry += 1
# V8.5新增:检查24小时强制平仓
if self.risk_controller.should_force_close_by_time(symbol_data, self.algorithm.Time):
success = self.trade_executor.execute_exit(symbol_data, '24H Force Close')
if success:
if self.debug_mode:
self.log_exit(symbol_data, '24H Force Close', close)
symbol_data.reset_position_vars()
return
# 检查止盈止损(使用高低价)
tp_hit = False
sl_hit = False
if symbol_data.use_reversal_signal: # 突破模式:动态止盈,固定止损
if holding.IsLong:
tp_hit = high > symbol_data.position_tp # dynamic_level是止盈
sl_hit = low < symbol_data.position_sl # fixed_sl是止损
else:
tp_hit = low < symbol_data.position_tp # dynamic_level是止盈
sl_hit = high > symbol_data.position_sl # fixed_sl是止损
else: # 回归模式:固定止盈,动态止损
if holding.IsLong:
tp_hit = high > symbol_data.position_tp # fixed_tp是止盈
sl_hit = low < symbol_data.position_sl # dynamic_level是止损
else:
tp_hit = low < symbol_data.position_tp # fixed_tp是止盈
sl_hit = high > symbol_data.position_sl # dynamic_level是止损
if tp_hit or sl_hit:
success = self.trade_executor.execute_exit(
symbol_data, 'TP' if tp_hit else 'SL')
if success:
# 使用TotalCloseProfit计算实际盈亏(包含手续费)
close_profit = holding.TotalCloseProfit()
is_profitable = close_profit > 0
symbol_data.last_trade_profit = is_profitable
# 更新T1/T2时间过滤器状态(仅在回归模式下)
if not symbol_data.use_reversal_signal and self.config.get('use_time_filter', True):
symbol_data.update_time_filter_state(is_profitable)
if sl_hit:
symbol_data.last_stop_loss_time = self.algorithm.Time
symbol_data.sl_count += 1
# 检查止损次数
if symbol_data.sl_count >= self.max_sl_count:
symbol_data.strategy_active = False
if self.debug_mode:
self.algorithm.Debug(
f"{symbol_data.symbol}: Strategy deactivated after "
f"{self.max_sl_count} stop losses"
)
if self.debug_mode:
self.log_exit(symbol_data, 'TP' if tp_hit else 'SL', close)
symbol_data.reset_position_vars()
return
# 持仓时间检查(V8版本:使用定制化持仓时间)
if self.use_position_timer and not symbol_data.timer_triggered:
# 添加check_interval条件,与Pine Script保持一致
if symbol_data.bars_since_entry % self.check_interval == 0:
time_elapsed = (
self.algorithm.Time - symbol_data.position_entry_time
).total_seconds() / 60
# V8: 根据交易模式选择持仓时间限制
position_time_limit = (
self.reversion_position_time if not symbol_data.use_reversal_signal
else self.breakout_position_time
)
if time_elapsed >= position_time_limit:
# 计算当前盈亏
current_profit = PositionSizeUtils.calculate_position_profit_percent(
symbol_data.position_entry_price, close, holding.IsLong
)
if current_profit > 0:
success = self.trade_executor.execute_exit(symbol_data, 'Timer Profit Close')
if success:
symbol_data.timer_triggered = True
if self.debug_mode:
strategy_type = ("Breakout" if symbol_data.use_reversal_signal
else "Reversion")
self.log_exit(
symbol_data,
f'Timer Profit Close ({strategy_type})',
close
)
symbol_data.reset_position_vars()
def log_entry(self, symbol_data, signal, price):
"""记录开仓信息(V8.5版本)"""
direction = "LONG" if signal == 1 else "SHORT"
strategy_type = "Breakout" if symbol_data.use_reversal_signal else "Reversion"
# 安全计算ALMA差值
alma_diff = 0
if (symbol_data.prev_alma_values['current'] is not None and
symbol_data.prev_alma_values['24h'] is not None):
alma_diff = MathUtils.calculate_percentage_diff(
symbol_data.prev_alma_values['current'],
symbol_data.prev_alma_values['24h']
)
# 安全检查ATR位置
atr_position = 'N/A'
if (symbol_data.prev_atr_upper is not None and
symbol_data.prev_atr_lower is not None):
if price > symbol_data.prev_atr_upper:
atr_position = 'Above Upper'
elif price < symbol_data.prev_atr_lower:
atr_position = 'Below Lower'
else:
atr_position = 'Inside Channel'
# 构建额外的调试信息
debug_info = ""
# V8新增:时间过滤器信息
if not symbol_data.use_reversal_signal and self.config.get('use_time_filter', True):
threshold = "T1(60min)" if symbol_data.using_t1_filter else "T2(15min)"
debug_info += f"\nTime Filter: {threshold} (Reversion Mode Only)"
# V8新增:持仓时间信息
position_time_limit = (
self.reversion_position_time if not symbol_data.use_reversal_signal
else self.breakout_position_time
)
debug_info += f"\nPosition Time Limit: {position_time_limit} min"
# V8.5新增:风险控制信息
debug_info += self.risk_controller.get_risk_control_debug_info()
# V10新增:止盈倍数信息
if self.tp_multiplier != 1.0:
debug_info += f"\nTP Multiplier: {self.tp_multiplier:.1f}x"
# 使用工具函数格式化日志
log_message = LoggingUtils.format_entry_log(
symbol_data, direction, price, strategy_type, alma_diff,
atr_position, symbol_data.prev_chop, symbol_data.prev_rsi,
symbol_data.position_tp, symbol_data.position_sl,
self.algorithm.Time, debug_info
)
self.algorithm.Debug(log_message)
def log_exit(self, symbol_data, reason, price):
"""记录平仓信息"""
log_message = LoggingUtils.format_exit_log(
symbol_data, reason, price, symbol_data.position_entry_price,
self.algorithm.Time
)
self.algorithm.Debug(log_message)# ====================================================================================================
# V8.5模块化拆分 - 信号生成引擎模块 (signal_engine.py)
#
# 本模块包含:
# 1. SignalEngine类 - 核心信号生成引擎
# 2. BaseConditionChecker类 - 基础条件检查器
# 3. TimeFilterManager类 - T1/T2时间过滤器管理器
# 4. CHOPAnalyzer类 - CHOP分析器
# 5. 所有与交易信号生成相关的逻辑
# ====================================================================================================
from AlgorithmImports import *
from datetime import datetime, timedelta
import math
from utils import ValidationUtils, MathUtils
class BaseConditionChecker:
"""
基础条件检查器
负责检查C1-C4基础交易条件
"""
def __init__(self, config):
"""
初始化基础条件检查器
Args:
config: 配置字典
"""
self.config = config
# 基础交易参数
self.entry_min_diff = config.get('entry_min_diff', 0.02)
self.entry_max_diff = config.get('entry_max_diff', 0.04)
self.min_price_distance = config.get('min_price_distance', 0.01)
self.check_lower_bands = config.get('check_lower_bands', True)
self.check_upper_bands = config.get('check_upper_bands', True)
def check_base_conditions(self, symbol_data, close):
"""
检查基础条件C1-C4
Args:
symbol_data: 标的物数据对象
close: 当前收盘价
Returns:
tuple: (long_base, short_base) - 多头和空头基础条件是否满足
"""
# 使用前值进行所有计算
current_alma = symbol_data.prev_alma_values['current']
alma_24h = symbol_data.prev_alma_values['24h']
if current_alma is None or alma_24h is None:
return False, False
# C1: ALMA差值检查
alma_diff = MathUtils.calculate_percentage_diff(current_alma, alma_24h)
c1 = self.entry_min_diff <= alma_diff <= self.entry_max_diff
# 如果C1不满足,直接返回
if not c1:
return False, False
# C2: 回归方向
c2_long = alma_24h > current_alma # 24H ALMA在上方,期待上涨
c2_short = alma_24h < current_alma # 24H ALMA在下方,期待下跌
# C3: 保护线检查
c3_long = self.check_protection_bands(symbol_data, 'long')
c3_short = self.check_protection_bands(symbol_data, 'short')
# C4: 价格位置
c4_long = close > current_alma and close < alma_24h
c4_short = close < current_alma and close > alma_24h
# alma_condition: 最小距离
alma_condition = MathUtils.calculate_percentage_diff(close, alma_24h) >= self.min_price_distance
# 根据模式组合条件
if symbol_data.use_reversal_signal: # 突破模式
# 使用原空头条件作为多头条件
long_base = c1 and c2_short and c3_short and c4_short and alma_condition
# 使用原多头条件作为空头条件
short_base = c1 and c2_long and c3_long and c4_long and alma_condition
else: # 回归模式
# 正常的多空条件
long_base = c1 and c2_long and c3_long and c4_long and alma_condition
short_base = c1 and c2_short and c3_short and c4_short and alma_condition
return long_base, short_base
def check_protection_bands(self, symbol_data, direction):
"""
检查保护线条件
Args:
symbol_data: 标的物数据对象
direction: 检查方向 ('long' 或 'short')
Returns:
bool: 保护线条件是否满足
"""
current_alma = symbol_data.prev_alma_values['current']
if current_alma is None:
return False
# 优化:缓存ALMA值避免重复访问
alma_4h = symbol_data.prev_alma_values['4h']
alma_8h = symbol_data.prev_alma_values['8h']
alma_12h = symbol_data.prev_alma_values['12h']
if direction == 'long' and self.check_lower_bands:
# 至少一条中间ALMA在current_alma下方
return (
(alma_4h is not None and alma_4h < current_alma) or
(alma_8h is not None and alma_8h < current_alma) or
(alma_12h is not None and alma_12h < current_alma)
)
elif direction == 'short' and self.check_upper_bands:
# 至少一条中间ALMA在current_alma上方
return (
(alma_4h is not None and alma_4h > current_alma) or
(alma_8h is not None and alma_8h > current_alma) or
(alma_12h is not None and alma_12h > current_alma)
)
return True
class TimeFilterManager:
"""
T1/T2时间过滤器管理器
V8新增功能,仅在回归模式下使用
"""
def __init__(self, config):
"""
初始化时间过滤器管理器
Args:
config: 配置字典
"""
self.config = config
self.use_time_filter = config.get('use_time_filter', True)
self.time_filter_t1 = config.get('time_filter_t1', 60) # T1时间:60分钟
self.time_filter_t2 = config.get('time_filter_t2', 15) # T2时间:15分钟
def apply_time_filter(self, symbol_data, long_base, short_base, current_time):
"""
应用T1/T2时间过滤器(仅回归模式且启用时间过滤器时)
Args:
symbol_data: 标的物数据对象
long_base: 多头基础条件
short_base: 空头基础条件
current_time: 当前时间
Returns:
tuple: (filtered_long_base, filtered_short_base)
"""
# 仅在回归模式且启用时间过滤器时应用
if symbol_data.use_reversal_signal or not self.use_time_filter:
return long_base, short_base
current_conditions_met = long_base or short_base
# 确定当前使用的时间阈值
current_threshold = self.time_filter_t1 if symbol_data.using_t1_filter else self.time_filter_t2
symbol_data.current_time_threshold = current_threshold
if current_conditions_met:
# 条件满足
if not symbol_data.time_filter_conditions_met:
# 第一次满足条件,开始计时
symbol_data.time_filter_start_time = current_time
symbol_data.time_filter_conditions_met = True
# 检查是否达到时间要求
if symbol_data.time_filter_start_time is not None:
elapsed_minutes = (current_time - symbol_data.time_filter_start_time).total_seconds() / 60
if elapsed_minutes >= current_threshold:
# 达到时间要求,允许开仓
return long_base, short_base
else:
# 未达到时间要求,不允许开仓
return False, False
else:
# 开始时间为空,不允许开仓
return False, False
else:
# 条件不满足,重置状态
symbol_data.reset_time_filter()
return False, False
class EnvironmentFilter:
"""
环境过滤器
检查RSI和ATR燃料条件
"""
def __init__(self, config):
"""
初始化环境过滤器
Args:
config: 配置字典
"""
self.config = config
self.rsi_lower = config.get('rsi_lower', 30)
self.rsi_upper = config.get('rsi_upper', 70)
self.use_atr_fuel = config.get('use_atr_fuel', True)
self.atr_fuel_multiplier = config.get('atr_fuel_multiplier', 3.0)
def check_filters(self, symbol_data, close):
"""
检查环境过滤器
Args:
symbol_data: 标的物数据对象
close: 当前收盘价
Returns:
bool: 环境过滤器是否通过
"""
# RSI过滤
if symbol_data.prev_rsi is not None:
if symbol_data.prev_rsi <= self.rsi_lower or symbol_data.prev_rsi >= self.rsi_upper:
return False
# ATR燃料检查
# Pine: fuel_distance = math.abs(close - current_alma)
# Pine: fuel_check_passed = not use_atr_fuel or (fuel_distance >= atr_custom * atr_fuel_multiplier)
if (self.use_atr_fuel and
symbol_data.prev_atr is not None and
symbol_data.prev_alma_values['current'] is not None):
fuel_distance = abs(close - symbol_data.prev_alma_values['current'])
if fuel_distance < symbol_data.prev_atr * self.atr_fuel_multiplier:
return False
return True
class ATRPositionChecker:
"""
ATR位置检查器
区分回归和突破模式的ATR位置要求
"""
def __init__(self, config):
"""
初始化ATR位置检查器
Args:
config: 配置字典
"""
self.config = config
self.use_strict_atr_position = config.get('use_strict_atr_position', True)
def check_atr_position(self, symbol_data, close, long_base, short_base):
"""
检查ATR位置(区分回归和突破)
Args:
symbol_data: 标的物数据对象
close: 当前收盘价
long_base: 多头基础条件
short_base: 空头基础条件
Returns:
tuple: (long_signal, short_signal) - 最终的多空信号
"""
if not self.use_strict_atr_position:
return long_base, short_base
if symbol_data.prev_atr_upper is None or symbol_data.prev_atr_lower is None:
return False, False
# Pine Script逻辑:
# if use_reversal_signal (突破模式):
# strict_atr_long := close > atr_upper_band
# strict_atr_short := close < atr_lower_band
# else (回归模式):
# strict_atr_long := close < atr_lower_band
# strict_atr_short := close > atr_upper_band
if symbol_data.use_reversal_signal: # 突破模式(Pine的use_reversal_signal=true)
# 多头需价格在上轨之上,空头需价格在下轨之下
long_signal = long_base and close > symbol_data.prev_atr_upper
short_signal = short_base and close < symbol_data.prev_atr_lower
else: # 回归模式(Pine的use_reversal_signal=false)
# 多头需价格在下轨之下,空头需价格在上轨之上
long_signal = long_base and close < symbol_data.prev_atr_lower
short_signal = short_base and close > symbol_data.prev_atr_upper
return long_signal, short_signal
class CHOPAnalyzer:
"""
CHOP分析器
负责CHOP极端值分析和策略模式切换
"""
def __init__(self, config):
"""
初始化CHOP分析器
Args:
config: 配置字典
"""
self.config = config
self.chop_lower = config.get('chop_lower', 38.2)
self.chop_upper = config.get('chop_upper', 61.8)
self.chop_adaptive_mode = config.get('chop_adaptive_mode', 'default')
self.use_chop_adaptive = config.get('use_chop_adaptive', True)
def analyze_daily_chop(self, symbol_data, current_time, debug_mode=False):
"""
执行每日CHOP检查(UTC 01:01)
Args:
symbol_data: 标的物数据对象
current_time: 当前时间
debug_mode: 是否开启调试模式
Returns:
tuple: (strategy_changed, debug_info) - 策略是否改变和调试信息
"""
if not symbol_data.is_data_ready():
return False, "Data not ready"
# 检查是否已经在今天执行过
current_date = current_time.date()
if (symbol_data.last_signal_date is not None and
symbol_data.last_signal_date == current_date):
return False, "Already checked today"
# Pine Script逻辑:检查UTC 23:00-00:59时段的CHOP值
found_extreme = False
min_chop_found = 100.0
max_chop_found = 0.0
window_count = 0
debug_info = ""
# 需要存储历史CHOP值
if len(symbol_data.chop_history) > 0:
# 检查过去121分钟(从9:01回看到7:00)
for i in range(1, min(122, len(symbol_data.chop_history))):
# 获取历史时间
hist_time = current_time - timedelta(minutes=i)
hist_hour = hist_time.hour
# 只检查UTC 23:00-00:59时段(中国时间7:00-8:59)
if hist_hour == 23 or hist_hour == 0:
if i <= len(symbol_data.chop_history):
historical_chop = symbol_data.chop_history[-i]
else:
historical_chop = 50.0
window_count += 1
min_chop_found = min(min_chop_found, historical_chop)
max_chop_found = max(max_chop_found, historical_chop)
if (historical_chop < self.chop_lower or
historical_chop > self.chop_upper):
found_extreme = True
debug_info = f"UTC {hist_hour}:{hist_time.minute:02d} CHOP={historical_chop:.2f}"
# 保存极端CHOP标记
symbol_data.has_extreme_chop = found_extreme
# 根据CHOP自适应模式设置信号类型
old_strategy = symbol_data.use_reversal_signal
if self.chop_adaptive_mode == 'disabled':
# 禁用CHOP模式:全程使用回归逻辑
symbol_data.use_reversal_signal = False
elif self.chop_adaptive_mode == 'reverse':
# 反向模式(V7原始逻辑):极端值→回归,无极端值→突破
if found_extreme:
symbol_data.use_reversal_signal = False # 回归逻辑
else:
symbol_data.use_reversal_signal = True # 突破逻辑
else: # self.chop_adaptive_mode == 'default'
# V8修复:默认模式:极端值→突破,无极端值→回归
if found_extreme:
symbol_data.use_reversal_signal = True # 突破逻辑
else:
symbol_data.use_reversal_signal = False # 回归逻辑
# 标记已设置
symbol_data.daily_signal_set = True
symbol_data.last_signal_date = current_time.date()
strategy_changed = old_strategy != symbol_data.use_reversal_signal
# 构建调试信息
if debug_mode:
check_result = ""
if found_extreme and debug_info:
check_result = f"EXTREME FOUND: {debug_info}"
else:
check_result = f"NO EXTREME in window, checked {window_count} bars"
check_result += f"\nCHOP Range: [{min_chop_found:.2f} - {max_chop_found:.2f}]"
mode = "Breakout" if symbol_data.use_reversal_signal else "Reversion"
full_debug_info = f"{check_result}\nMode: {self.chop_adaptive_mode}\nStrategy: {mode}"
return strategy_changed, full_debug_info
return strategy_changed, ""
class SignalEngine:
"""
核心信号生成引擎
统一管理所有信号生成逻辑
"""
def __init__(self, config):
"""
初始化信号引擎
Args:
config: 配置字典
"""
self.config = config
# 初始化各个组件
self.base_condition_checker = BaseConditionChecker(config)
self.time_filter_manager = TimeFilterManager(config)
self.environment_filter = EnvironmentFilter(config)
self.atr_position_checker = ATRPositionChecker(config)
self.chop_analyzer = CHOPAnalyzer(config)
def generate_signal(self, symbol_data, bar, current_time):
"""
为单个标的物生成交易信号
Args:
symbol_data: 标的物数据对象
bar: 当前K线数据
current_time: 当前时间
Returns:
int: 交易信号 (1: 做多, -1: 做空, 0: 无信号)
"""
# 检查前值是否准备好
if not symbol_data.are_indicators_ready():
return 0
# 获取当前价格
close = float(bar.Close)
# 检查基础条件(C1-C4)
long_base, short_base = self.base_condition_checker.check_base_conditions(
symbol_data, close)
# 如果基础条件都不满足,直接返回
if not long_base and not short_base:
return 0
# 应用T1/T2时间过滤器(仅回归逻辑下生效)
long_base, short_base = self.time_filter_manager.apply_time_filter(
symbol_data, long_base, short_base, current_time)
# 如果时间过滤器阻止了信号,直接返回
if not long_base and not short_base:
return 0
# 检查环境过滤器
if not self.environment_filter.check_filters(symbol_data, close):
return 0
# 检查ATR位置(决定最终方向)
long_signal, short_signal = self.atr_position_checker.check_atr_position(
symbol_data, close, long_base, short_base)
# 检查策略是否激活
if not symbol_data.strategy_active:
return 0
# 返回最终信号
if long_signal:
return 1
elif short_signal:
return -1
else:
return 0
def execute_daily_chop_check(self, symbol_data, current_time, debug_mode=False):
"""
执行每日CHOP检查
Args:
symbol_data: 标的物数据对象
current_time: 当前时间
debug_mode: 是否开启调试模式
Returns:
tuple: (strategy_changed, debug_info)
"""
return self.chop_analyzer.analyze_daily_chop(symbol_data, current_time, debug_mode)
def initialize_symbol_strategy(self, symbol_data):
"""
初始化标的物的策略模式
Args:
symbol_data: 标的物数据对象
"""
# 默认使用回归信号,直到第一次CHOP检查完成
if self.config.get('use_chop_adaptive', True) and not symbol_data.daily_signal_set:
symbol_data.use_reversal_signal = False
symbol_data.has_extreme_chop = False
def get_signal_debug_info(self, symbol_data, signal, close):
"""
获取信号调试信息
Args:
symbol_data: 标的物数据对象
signal: 生成的信号
close: 当前价格
Returns:
dict: 调试信息字典
"""
# 安全计算ALMA差值
if (symbol_data.prev_alma_values['current'] is not None and
symbol_data.prev_alma_values['24h'] is not None):
alma_diff = MathUtils.calculate_percentage_diff(
symbol_data.prev_alma_values['current'],
symbol_data.prev_alma_values['24h']
)
else:
alma_diff = 0
# 安全检查ATR位置
if (symbol_data.prev_atr_upper is not None and
symbol_data.prev_atr_lower is not None):
if close > symbol_data.prev_atr_upper:
atr_position = 'Above Upper'
elif close < symbol_data.prev_atr_lower:
atr_position = 'Below Lower'
else:
atr_position = 'Inside Channel'
else:
atr_position = 'N/A'
strategy_type = "Breakout" if symbol_data.use_reversal_signal else "Reversion"
return {
'signal': signal,
'strategy_type': strategy_type,
'alma_diff': alma_diff,
'atr_position': atr_position,
'chop_value': symbol_data.prev_chop,
'rsi_value': symbol_data.prev_rsi,
'close': close
}# ====================================================================================================
# V8.5模块化拆分 - 数据结构模块 (symbol_data.py)
#
# 本模块包含:
# 1. PositionRecord类 - 仓位记录数据结构
# 2. SymbolData类 - 标的物完整数据和状态封装
# 3. 所有与单个标的物相关的数据管理
# ====================================================================================================
from AlgorithmImports import *
from datetime import datetime, timedelta
from collections import deque
import numpy as np
import math
class PineScriptRSI:
"""
Pine Script兼容的RSI实现
使用RMA(Running Moving Average)而非SMA来计算gain和loss的平滑值
完全复现Pine Script的ta.rsi()函数
"""
def __init__(self, length=14):
"""
初始化RSI计算器
Args:
length: RSI周期,默认14
"""
self.length = length
self.alpha = 1.0 / length # RMA的alpha值
# RMA状态管理
self.avg_gain = None
self.avg_loss = None
self.prev_price = None
self.is_ready = False
self.current_rsi = None
def update(self, price):
"""
更新RSI值
Args:
price: 当前价格
Returns:
RSI值(0-100范围),如果数据不足返回None
"""
if self.prev_price is None:
self.prev_price = price
return None
# 计算价格变化
delta = price - self.prev_price
gain = max(delta, 0.0)
loss = max(-delta, 0.0)
# 初始化或更新RMA
if self.avg_gain is None or self.avg_loss is None:
# 第一次计算,直接使用当前值
self.avg_gain = gain
self.avg_loss = loss
else:
# 使用RMA公式更新:RMA = (previous_RMA * (length-1) + current_value) / length
# 等价于:RMA = previous_RMA * (1-alpha) + current_value * alpha
self.avg_gain = self.avg_gain * (1 - self.alpha) + gain * self.alpha
self.avg_loss = self.avg_loss * (1 - self.alpha) + loss * self.alpha
# 计算RSI
if self.avg_loss > 0:
rs = self.avg_gain / self.avg_loss
self.current_rsi = 100 - (100 / (1 + rs))
else:
self.current_rsi = 100.0
self.prev_price = price
self.is_ready = True
return self.current_rsi
class HighPerformanceHMA:
"""
高性能Hull Moving Average实现
特性:
1. 完整实现HMA公式:HMA = WMA(2*WMA(n/2) - WMA(n), sqrt(n))
2. 增量式计算,避免重复计算
3. 权重预计算和缓存
4. O(1)时间复杂度的更新操作
5. 内存使用优化
与Pine Script ta.hma()完全一致
"""
def __init__(self, period):
"""
初始化HMA计算器
Args:
period: HMA周期
"""
self.period = period
self.half_period = int(period / 2)
self.sqrt_period = int(math.sqrt(period))
# 数据存储
self.data_window = deque(maxlen=period)
self.diff_series = deque(maxlen=self.sqrt_period)
# 权重预计算(一次性计算,重复使用)
self.wma_weights = self._precompute_weights(period)
self.wma_half_weights = self._precompute_weights(self.half_period)
self.wma_sqrt_weights = self._precompute_weights(self.sqrt_period)
# 增量计算状态
self.wma_half_sum = 0.0
self.wma_full_sum = 0.0
self.wma_sqrt_sum = 0.0
# 就绪状态
self.is_ready = False
self.current_value = None
def _precompute_weights(self, length):
"""预计算WMA权重系数"""
weights = []
weight_sum = 0
for i in range(length):
weight = length - i # 最新数据权重最大
weights.append(weight)
weight_sum += weight
# 归一化权重
return [w / weight_sum for w in weights]
def update(self, value):
"""
增量式更新HMA值
Args:
value: 新的价格数据点
Returns:
当前HMA值,如果数据不足返回None
"""
self.data_window.append(value)
if len(self.data_window) < self.period:
return self.current_value
# 计算WMA(n/2)和WMA(n)
wma_half = self._calculate_wma_incremental(
list(self.data_window)[-self.half_period:],
self.wma_half_weights
)
wma_full = self._calculate_wma_incremental(
list(self.data_window),
self.wma_weights
)
# 计算差值:2*WMA(n/2) - WMA(n)
diff_value = 2 * wma_half - wma_full
self.diff_series.append(diff_value)
# 当差值序列达到sqrt(n)长度时,计算最终HMA
if len(self.diff_series) >= self.sqrt_period:
self.current_value = self._calculate_wma_incremental(
list(self.diff_series),
self.wma_sqrt_weights
)
self.is_ready = True
return self.current_value
def _calculate_wma_incremental(self, data, weights):
"""
使用预计算权重快速计算WMA
Args:
data: 数据序列(最新数据在末尾)
weights: 预计算的权重序列
Returns:
WMA值
"""
if len(data) < len(weights):
return None
# 使用最新的len(weights)个数据点
data_slice = data[-len(weights):]
# 向量化计算:sum(data * weights)
weighted_sum = 0.0
for i in range(len(weights)):
# weights[0]对应最新数据,weights[-1]对应最旧数据
weighted_sum += data_slice[-(i+1)] * weights[i]
return weighted_sum
class PositionRecord:
"""
记录开仓信息,用于30分钟同向仓位监控
V8.5新增的风险控制数据结构
"""
def __init__(self, symbol, direction, open_time):
"""
初始化仓位记录
Args:
symbol: 标的物符号
direction: 仓位方向 (1: 多头, -1: 空头)
open_time: 开仓时间
"""
self.symbol = symbol
self.direction = direction
self.open_time = open_time
class SymbolData:
"""
封装每个标的物的完整数据和状态
重要说明:
- 每个标的物的数据(价格、指标值、信号状态等)都独立存储在此类中
- 每个标的物独立生成交易信号,独立管理持仓
- 主类中的参数是配置参数,所有标的物使用相同的策略参数配置
- 但每个标的物的实际运行数据是完全独立的
"""
def __init__(self, symbol, algorithm):
"""
初始化标的物数据
Args:
symbol: 标的物符号
algorithm: 主算法实例的引用
"""
self.symbol = symbol
self.algorithm = algorithm
# === 数据存储 ===
self.price_window = RollingWindow[float](2880) # 48小时数据
self.high_window = RollingWindow[float](2880)
self.low_window = RollingWindow[float](2880)
self.atr_window = RollingWindow[float](240) # 存储240个ATR值
# === 性能优化:数据缓存 ===
self._price_cache = []
self._high_cache = []
self._low_cache = []
self._cache_update_counter = 0
# === ALMA前值存储(避免未来函数)===
self.prev_alma_values = {
'current': None,
'4h': None,
'8h': None,
'12h': None,
'24h': None
}
# === 当前ALMA值(仅用于下一根K线)===
self.current_alma_values = {
'current': None,
'4h': None,
'8h': None,
'12h': None,
'24h': None
}
# === 其他指标前值 ===
self.prev_chop = None
self.prev_rsi = None
self.prev_atr = None
self.prev_atr_upper = None
self.prev_atr_lower = None
# === 当前指标值 ===
self.current_chop = None
self.current_rsi = None
self.current_atr = None
self.current_atr_upper = None
self.current_atr_lower = None
# === CHOP历史记录 ===
self.chop_history = deque(maxlen=1440) # 存储24小时的CHOP值
# === 状态管理 ===
self.use_reversal_signal = False # False=回归模式, True=突破模式
self.daily_signal_set = False # 标记当日信号是否已设置
self.last_signal_date = None # 记录上次设置信号的日期(使用date对象)
self.has_extreme_chop = False # 记录当日是否有极端CHOP值
# === 持仓管理 ===
self.position_entry_time = None
self.position_entry_price = None
self.position_tp = None
self.position_sl = None
self.bars_since_entry = 0
self.timer_triggered = False
# === 风控状态 ===
self.last_stop_loss_time = None
self.sl_count = 0
self.last_trade_profit = True
self.strategy_active = True # 该标的物是否激活交易
# === T1/T2时间过滤器状态(V8新增)===
self.time_filter_start_time = None # 条件开始满足的时间
self.time_filter_conditions_met = False # 基础条件是否满足
self.current_time_threshold = None # 当前使用的时间阈值(T1或T2)
self.using_t1_filter = True # 是否使用T1过滤器(首次使用T1)
# === ALMA间距存储(用于止盈止损计算)===
self.long_profit_space = 0
self.short_profit_space = 0
# === ATR RMA状态(用于CHOP计算)===
self.atr_rma = None
# === V6新增:高性能HMA实例 ===
# 为每个需要HMA的场景创建独立的高性能HMA实例
self.hma_atr_band = HighPerformanceHMA(240) # 用于ATR通道中心线(4小时)
self.hma_atr_smooth = HighPerformanceHMA(1440) # 用于ATR平滑(24小时)
# === V7新增:Pine Script兼容的RSI实例 ===
# 使用修复后的RSI实现,确保与Pine Script ta.rsi()完全一致
self.rsi_calculator = PineScriptRSI(14) # 使用14周期RSI
def update_data_windows(self, bar):
"""
更新数据窗口
Args:
bar: K线数据
"""
self.price_window.Add(float(bar.Close))
self.high_window.Add(float(bar.High))
self.low_window.Add(float(bar.Low))
# 更新缓存计数器(每10根K线更新一次缓存)
self._cache_update_counter += 1
if self._cache_update_counter >= 10:
self._update_data_cache()
self._cache_update_counter = 0
def _update_data_cache(self):
"""更新数据缓存(性能优化)"""
# 批量提取数据到列表以减少后续访问开销
alma_length = 1440 # 从algorithm获取,这里硬编码以避免循环引用
chop_length = 240
if self.price_window.Count >= alma_length:
self._price_cache = [self.price_window[i] for i in range(min(alma_length + 500, self.price_window.Count))]
self._high_cache = [self.high_window[i] for i in range(min(chop_length, self.high_window.Count))]
self._low_cache = [self.low_window[i] for i in range(min(chop_length, self.low_window.Count))]
def update_previous_values(self):
"""更新前值(避免未来函数)"""
# 更新ALMA前值
for key in ['current', '4h', '8h', '12h', '24h']:
if self.current_alma_values[key] is not None:
self.prev_alma_values[key] = self.current_alma_values[key]
# 更新其他指标前值
self.prev_chop = self.current_chop
self.prev_rsi = self.current_rsi
self.prev_atr = self.current_atr
self.prev_atr_upper = self.current_atr_upper
self.prev_atr_lower = self.current_atr_lower
def reset_position_vars(self):
"""重置持仓相关变量(V8版本)"""
self.position_entry_time = None
self.position_entry_price = None
self.position_tp = None
self.position_sl = None
self.bars_since_entry = 0
self.timer_triggered = False
# 重置时间过滤器状态
self.reset_time_filter()
def reset_time_filter(self):
"""重置时间过滤器状态(V8新增)"""
self.time_filter_start_time = None
self.time_filter_conditions_met = False
self.current_time_threshold = None
def update_time_filter_state(self, is_profitable):
"""根据交易结果更新时间过滤器状态(V8新增)"""
if is_profitable:
# 盈利后切换到T2
self.using_t1_filter = False
else:
# 亏损后切回T1
self.using_t1_filter = True
def are_indicators_ready(self):
"""检查指标是否准备好"""
# 检查ALMA前值
alma_offsets = ['current', '4h', '8h', '12h', '24h']
for key in alma_offsets:
if self.prev_alma_values[key] is None:
return False
# 检查其他指标 - 允许CHOP为None(初始阶段)
return (self.prev_rsi is not None and
self.prev_atr is not None)
def is_data_ready(self):
"""检查数据窗口是否准备好"""
return self.price_window.IsReady
def get_current_price(self):
"""获取当前价格"""
if self.price_window.Count > 0:
return self.price_window[0]
return None
def get_price_list(self, length):
"""获取指定长度的价格列表"""
if self.price_window.Count < length:
return None
# 优先使用缓存数据
if self._price_cache and len(self._price_cache) >= length:
return self._price_cache[:length]
else:
# 降级到原始方法
prices = []
for i in range(length):
if i < self.price_window.Count:
prices.append(self.price_window[i])
return prices if len(prices) == length else None
def get_high_low_list(self, length):
"""获取指定长度的高低价列表"""
if self.high_window.Count < length or self.low_window.Count < length:
return None, None
# 优先使用缓存数据
if (self._high_cache and self._low_cache and
len(self._high_cache) >= length and len(self._low_cache) >= length):
return self._high_cache[:length], self._low_cache[:length]
else:
# 降级到原始方法
highs = [self.high_window[i] for i in range(length)]
lows = [self.low_window[i] for i in range(length)]
return highs, lows
def calculate_true_range(self, index):
"""计算指定索引的True Range值"""
if index + 1 >= self.price_window.Count:
return None
high = self.high_window[index]
low = self.low_window[index]
prev_close = self.price_window[index + 1]
# Pine: tr = math.max(high - low, math.abs(high - close[1]), math.abs(low - close[1]))
tr = max(
high - low,
abs(high - prev_close),
abs(low - prev_close)
)
return tr
def is_in_cooldown(self, current_time, cooldown_minutes=60):
"""检查是否在冷却期内"""
# Pine: cooldown_check = last_trade_profit or (not last_trade_profit and (time - last_close_time) >= cooldown_period * 60 * 1000)
if self.last_trade_profit:
return False # 盈利后无需冷却
if self.last_stop_loss_time is None:
return False
cooldown_elapsed = (current_time - self.last_stop_loss_time).total_seconds() / 60
return cooldown_elapsed < cooldown_minutes
def should_force_close_by_time(self, current_time, max_hours=24):
"""
V8.5新增:检查是否应该根据时间强制平仓
Args:
current_time: 当前时间
max_hours: 最大持仓时间(小时)
Returns:
bool: True表示应该强制平仓
"""
if self.position_entry_time is None:
return False
hours_elapsed = (current_time - self.position_entry_time).total_seconds() / 3600
return hours_elapsed >= max_hours
def calculate_position_profit_percent(self, current_price):
"""计算持仓盈亏百分比"""
if self.position_entry_price is None or self.position_entry_price == 0:
return 0
# 需要从Portfolio判断是否为多头,这里简化处理
# 在实际使用时需要传入持仓方向
return (current_price - self.position_entry_price) / self.position_entry_price * 100
def get_alma_spacing_for_stops(self, signal):
"""获取用于止盈止损计算的ALMA间距"""
if signal == 1: # 做多
return self.long_profit_space
else: # 做空
return self.short_profit_space
def set_alma_spacing_for_stops(self, signal, spacing):
"""设置用于止盈止损计算的ALMA间距"""
if signal == 1: # 做多
self.long_profit_space = spacing
else: # 做空
self.short_profit_space = spacing# ====================================================================================================
# V8.5模块化拆分 - 工具函数模块 (utils.py)
#
# 本模块包含:
# 1. 时间管理工具函数
# 2. 数学计算工具函数
# 3. 日志记录工具函数
# 4. 通用验证工具函数
# ====================================================================================================
from AlgorithmImports import *
from datetime import datetime, timedelta
import numpy as np
import math
class TimeUtils:
"""时间相关的工具函数"""
@staticmethod
def is_in_forbidden_time(current_time, forbidden_start_hour=23, forbidden_end_hour=1):
"""
检查是否在禁止交易时间
Args:
current_time: 当前时间
forbidden_start_hour: 禁止开始时间(UTC小时)
forbidden_end_hour: 禁止结束时间(UTC小时)
Returns:
bool: True表示在禁止时间内
"""
hour = current_time.hour
return forbidden_start_hour <= hour or hour < forbidden_end_hour
@staticmethod
def get_china_time(utc_time):
"""
将UTC时间转换为中国时间(UTC+8)
Args:
utc_time: UTC时间
Returns:
datetime: 中国时间
"""
return utc_time + timedelta(hours=8)
@staticmethod
def is_in_cooldown(last_stop_loss_time, current_time, cooldown_minutes=60):
"""
检查是否在冷却期内
Args:
last_stop_loss_time: 上次止损时间
current_time: 当前时间
cooldown_minutes: 冷却时间(分钟)
Returns:
bool: True表示在冷却期内
"""
if last_stop_loss_time is None:
return False
cooldown_elapsed = (current_time - last_stop_loss_time).total_seconds() / 60
return cooldown_elapsed < cooldown_minutes
class MathUtils:
"""数学计算相关的工具函数"""
@staticmethod
def calculate_percentage_diff(value1, value2):
"""
计算两个值之间的百分比差异
Args:
value1: 值1
value2: 值2(作为基准)
Returns:
float: 百分比差异
"""
if value2 == 0:
return 0
return abs(value1 - value2) / value2
@staticmethod
def safe_divide(numerator, denominator, default_value=0):
"""
安全除法,避免除零错误
Args:
numerator: 分子
denominator: 分母
default_value: 除零时的默认值
Returns:
float: 除法结果或默认值
"""
if denominator == 0:
return default_value
return numerator / denominator
@staticmethod
def clamp(value, min_value, max_value):
"""
将值限制在指定范围内
Args:
value: 待限制的值
min_value: 最小值
max_value: 最大值
Returns:
float: 限制后的值
"""
return max(min_value, min(value, max_value))
class ValidationUtils:
"""验证相关的工具函数"""
@staticmethod
def is_valid_price(price):
"""
验证价格是否有效
Args:
price: 价格值
Returns:
bool: True表示价格有效
"""
return price is not None and price > 0 and not math.isnan(price) and not math.isinf(price)
@staticmethod
def is_valid_indicator_value(value):
"""
验证指标值是否有效
Args:
value: 指标值
Returns:
bool: True表示指标值有效
"""
return value is not None and not math.isnan(value) and not math.isinf(value)
@staticmethod
def is_valid_order_size(securities, symbol, quantity):
"""
验证订单大小是否满足交易所要求
Args:
securities: 证券对象集合
symbol: 交易标的
quantity: 订单数量
Returns:
bool: True表示订单大小有效
"""
try:
crypto = securities[symbol]
order_value = abs(crypto.Price * quantity * crypto.SymbolProperties.ContractMultiplier)
return order_value > 10 # Binance最小订单价值(USDT)
except:
return False
class LoggingUtils:
"""日志记录相关的工具函数"""
@staticmethod
def format_entry_log(symbol_data, direction, price, strategy_type, alma_diff,
atr_position, chop_value, rsi_value, tp_value, sl_value,
current_time, debug_info=None):
"""
格式化开仓日志信息
Args:
symbol_data: 标的物数据
direction: 交易方向
price: 开仓价格
strategy_type: 策略类型
alma_diff: ALMA差值
atr_position: ATR位置
chop_value: CHOP值
rsi_value: RSI值
tp_value: 止盈价格
sl_value: 止损价格
current_time: 当前时间
debug_info: 额外的调试信息
Returns:
str: 格式化的日志信息
"""
china_time = TimeUtils.get_china_time(current_time)
# 安全格式化数值
alma_diff_str = f"{alma_diff:.2%}" if ValidationUtils.is_valid_indicator_value(alma_diff) else "N/A"
chop_str = f"{chop_value:.2f}" if ValidationUtils.is_valid_indicator_value(chop_value) else "N/A"
rsi_str = f"{rsi_value:.2f}" if ValidationUtils.is_valid_indicator_value(rsi_value) else "N/A"
tp_str = f"{tp_value:.4f}" if ValidationUtils.is_valid_price(tp_value) else "N/A"
sl_str = f"{sl_value:.4f}" if ValidationUtils.is_valid_price(sl_value) else "N/A"
tp_type = 'Fixed' if strategy_type == 'Reversion' else 'Dynamic'
sl_type = 'Dynamic' if strategy_type == 'Reversion' else 'Fixed'
log_message = f"""
=== {symbol_data.symbol} {direction} Entry (V8.5 Modular) ===
Time: {china_time.strftime('%Y-%m-%d %H:%M')} (UTC+8)
Price: {price:.4f}
Strategy: {strategy_type}
ALMA Diff: {alma_diff_str}
ATR Position: {atr_position}
CHOP: {chop_str}
RSI: {rsi_str} (Pine Script Compatible)
TP: {tp_str} ({tp_type})
SL: {sl_str} ({sl_type})"""
if debug_info:
log_message += f"\n{debug_info}"
return log_message
@staticmethod
def format_exit_log(symbol_data, reason, price, entry_price, current_time):
"""
格式化平仓日志信息
Args:
symbol_data: 标的物数据
reason: 平仓原因
price: 平仓价格
entry_price: 开仓价格
current_time: 当前时间
Returns:
str: 格式化的日志信息
"""
if entry_price is None:
return f"{symbol_data.symbol}: Exit ({reason}) - Entry price not available"
china_time = TimeUtils.get_china_time(current_time)
pnl = price - entry_price # 假设为多头,实际使用时需要根据方向调整
pnl_percent = (pnl / entry_price) * 100 if entry_price != 0 else 0
return f"""
=== {symbol_data.symbol} Exit ({reason}) (V8.5 Modular) ===
Time: {china_time.strftime('%Y-%m-%d %H:%M')} (UTC+8)
Price: {price:.4f}
PnL: {pnl:.4f} ({pnl_percent:.2f}%)
"""
class RiskControlUtils:
"""风险控制相关的工具函数"""
@staticmethod
def calculate_same_direction_ratio(position_records, intended_direction):
"""
计算同向仓位比例
Args:
position_records: 仓位记录列表
intended_direction: 拟开仓方向 (1: 多头, -1: 空头)
Returns:
tuple: (same_direction_count, total_count, ratio)
"""
if not position_records:
return 0, 0, 0.0
same_direction_count = sum(1 for record in position_records
if record.direction == intended_direction)
total_count = len(position_records)
ratio = same_direction_count / total_count if total_count > 0 else 0.0
return same_direction_count, total_count, ratio
@staticmethod
def should_block_same_direction(position_records, intended_direction,
min_positions=3, threshold=0.8):
"""
判断是否应该阻止同向开仓
Args:
position_records: 仓位记录列表
intended_direction: 拟开仓方向
min_positions: 最低基数要求
threshold: 同向比例阈值
Returns:
bool: True表示应该阻止开仓
"""
if len(position_records) < min_positions:
return False # 未达到最低基数,不阻止
_, _, ratio = RiskControlUtils.calculate_same_direction_ratio(
position_records, intended_direction)
return ratio >= threshold
@staticmethod
def should_force_close_by_time(entry_time, current_time, max_hours=24):
"""
判断是否应该根据时间强制平仓
Args:
entry_time: 开仓时间
current_time: 当前时间
max_hours: 最大持仓时间(小时)
Returns:
bool: True表示应该强制平仓
"""
if entry_time is None:
return False
hours_elapsed = (current_time - entry_time).total_seconds() / 3600
return hours_elapsed >= max_hours
class PositionSizeUtils:
"""仓位大小计算相关的工具函数"""
@staticmethod
def calculate_futures_position_size(position_value_usdt, price, leverage=1):
"""
计算期货仓位大小
Args:
position_value_usdt: 仓位价值(USDT)
price: 当前价格
leverage: 杠杆倍数
Returns:
float: 合约数量
"""
if price <= 0:
return 0
# 合约价值 = 保证金 * 杠杆
total_value = position_value_usdt * leverage
contracts = total_value / price
return contracts
@staticmethod
def calculate_position_profit_percent(entry_price, current_price, is_long=True):
"""
计算持仓盈亏百分比
Args:
entry_price: 开仓价格
current_price: 当前价格
is_long: 是否为多头仓位
Returns:
float: 盈亏百分比
"""
if entry_price is None or entry_price == 0:
return 0
if is_long:
return (current_price - entry_price) / entry_price * 100
else:
return (entry_price - current_price) / entry_price * 100