| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363 |
- """
- 三龙出海主控引擎 (Dragon Rotation Engine)
- 整合多品种数据、品种选择、信号生成、仓位管理、风控体系
- 实现每日交易循环:数据更新 → 品种选择 → 信号生成 → 仓位调整 → 风控检查
- """
- from typing import Dict, List, Optional, Any
- from datetime import datetime, timedelta
- import pandas as pd
- import numpy as np
- from core.ecosystem import EcosystemFusion, UnifiedEcosystem
- from .data_loader import MultiAssetDataLoader
- from .selector import DragonSelector, SymbolScore
- from .signal_engine import UnifiedSignalEngine, SignalResult
- from .position_manager import PositionManager
- from .risk_manager import MultiLayerRiskManager, RiskCheckResult, RiskStatus
class DragonRotationEngine:
    """Multi-asset rotation engine ("Three Dragons Going to Sea").

    Core flow of one daily cycle (see :meth:`run_cycle`):
      1. Fetch the current data window of every loaded symbol.
      2. ``DragonSelector`` picks the symbol to trade.
      3. ``UnifiedSignalEngine`` generates the trading signal.
      4. ``PositionManager`` computes the suggested position size.
      5. ``MultiLayerRiskManager`` runs the risk checks.
      6. The trade is executed and recorded.

    The tunable numbers below are class attributes so that a subclass (or an
    instance) can override them without touching the method bodies.
    """

    # Minimum number of symbols that must be loaded / usable in a cycle.
    REQUIRED_SYMBOLS = 3
    # Number of bars requested from the data loader for each symbol.
    LOOKBACK_BARS = 80
    # Minimum number of bars a symbol window needs to take part in a cycle.
    MIN_BARS = 60
    # Volatility handed to the position manager when no score details exist.
    DEFAULT_VOLATILITY = 0.25
    # Annualisation factor used by the performance summary.
    TRADING_DAYS_PER_YEAR = 252
    # Annual rate subtracted from the annual return in the Sharpe ratio.
    RISK_FREE_RATE = 0.03

    def __init__(
        self,
        initial_capital: float = 1_000_000,
        score_threshold: float = 60,
        data_dir: str = "quant"
    ):
        """Create the engine and its collaborating modules.

        Args:
            initial_capital: Starting capital; also the initial value of
                ``current_capital``.
            score_threshold: Forwarded to ``DragonSelector``.
            data_dir: Forwarded to ``MultiAssetDataLoader``.
        """
        self.initial_capital = initial_capital
        self.current_capital = initial_capital
        self.score_threshold = score_threshold

        # Collaborating modules.
        self.data_loader = MultiAssetDataLoader(data_dir)
        self.ecosystem_fusion = EcosystemFusion()
        self.selector = DragonSelector(score_threshold=score_threshold)
        self.signal_engine = UnifiedSignalEngine()
        self.position_manager = PositionManager()
        self.risk_manager = MultiLayerRiskManager()

        # Current trading state.
        self.current_symbol: Optional[str] = None
        self.current_position: float = 0.0
        self.in_position: bool = False

        # Data cache, filled by initialize().
        self.data_dict: Dict[str, "pd.DataFrame"] = {}

        # Trade log and per-cycle statistics.
        self.trades: List[Dict] = []
        self.daily_stats: List[Dict] = []

    def initialize(self, start_date: Optional[str] = None):
        """Load historical data and initialise the risk manager.

        Args:
            start_date: Forwarded to ``data_loader.load_all_data``.

        Raises:
            ValueError: If fewer than ``REQUIRED_SYMBOLS`` symbols were loaded.
        """
        print("Initializing Dragon Rotation Engine...")

        self.data_dict = self.data_loader.load_all_data(start_date)
        if len(self.data_dict) < self.REQUIRED_SYMBOLS:
            raise ValueError(f"Failed to load data. Only {len(self.data_dict)} symbols loaded.")
        print(f"Loaded data for: {list(self.data_dict.keys())}")

        self.risk_manager.initialize(self.initial_capital, datetime.now())
        print("Engine initialized successfully.")

    def _collect_symbol_data(self, current_date: datetime) -> Dict[str, Any]:
        """Return the data window of every symbol that has at least MIN_BARS rows."""
        windows = {}
        for symbol in self.data_dict:
            df = self.data_loader.get_symbol_data_at_date(
                self.data_dict, symbol, current_date, lookback=self.LOOKBACK_BARS
            )
            if df is not None and len(df) >= self.MIN_BARS:
                windows[symbol] = df
        return windows

    def run_cycle(
        self,
        current_date: Optional[datetime] = None,
        verbose: bool = False
    ) -> Dict[str, Any]:
        """Run one trading cycle.

        Args:
            current_date: Date of the cycle (used for backtesting); defaults
                to ``datetime.now()``.
            verbose: Print a one-line summary when the cycle completes.

        Returns:
            Dict with the keys ``date``, ``symbol``, ``signal``, ``position``,
            ``executed`` and ``reason``. ``position`` is only filled in when a
            new position is opened during this cycle.

        Any exception raised inside the cycle is caught, printed and reported
        through ``reason`` ("Error: ..."); it is not re-raised. The early-return
        paths (insufficient data, no selection, risk-triggered close) do not
        append to ``daily_stats``.
        """
        if current_date is None:
            current_date = datetime.now()

        result = {
            "date": current_date,
            "symbol": None,
            "signal": "neutral",
            "position": 0.0,
            "executed": False,
            "reason": ""
        }

        try:
            # 1. Current data window of every symbol.
            symbol_data = self._collect_symbol_data(current_date)
            if len(symbol_data) < self.REQUIRED_SYMBOLS:
                result["reason"] = "Insufficient data"
                return result

            # 2. Identify the current ecosystem from the first symbol's data.
            first_symbol = next(iter(symbol_data))
            ecosystem = self.ecosystem_fusion.fuse(
                price_data=symbol_data[first_symbol],
                tick_data=None
            )

            # 3. Symbol selection.
            selected_symbol, score_details = self.selector.select(
                symbol_data, ecosystem, current_date
            )
            if selected_symbol is None:
                # Nothing qualifies: go flat.
                if self.in_position:
                    self._close_position(current_date, "no_selection")
                result["reason"] = "No symbol meets threshold"
                return result
            result["symbol"] = selected_symbol

            # 4. Switch symbols if the selection changed.
            if self.current_symbol is not None and self.current_symbol != selected_symbol:
                self._switch_symbol(self.current_symbol, selected_symbol, current_date)
            self.current_symbol = selected_symbol

            # 5. Trading signal for the selected symbol.
            selected_data = symbol_data[selected_symbol]
            signal = self.signal_engine.generate_signal(selected_data, current_date)
            result["signal"] = signal.signal

            # 6. Risk checks (only while a position is held).
            if self.in_position:
                current_price = selected_data['close'].iloc[-1]
                self.risk_manager.update_position(selected_symbol, current_price)

                risk_check = self.risk_manager.check_risk(selected_symbol, current_date)
                if risk_check.should_close_position:
                    self._close_position(current_date, risk_check.message)
                    result["reason"] = risk_check.message
                    return result
                if risk_check.should_reduce_position:
                    self._reduce_position(
                        selected_symbol,
                        risk_check.reduction_pct,
                        current_date
                    )

            # 7. Act on the signal.
            if signal.signal == "enter_long" and not self.in_position:
                volatility = (
                    score_details.volatility_60d if score_details
                    else self.DEFAULT_VOLATILITY
                )
                position_size = self.position_manager.calculate_position(
                    selected_symbol,
                    signal.confidence,
                    volatility,
                    current_date
                )
                if position_size > 0:
                    self._open_position(
                        selected_symbol,
                        position_size,
                        selected_data['close'].iloc[-1],
                        current_date
                    )
                    result["position"] = position_size
                    result["executed"] = True
                    result["reason"] = "Entry signal"
            elif signal.signal == "exit" and self.in_position:
                self._close_position(current_date, "signal_exit")
                result["reason"] = "Exit signal"
            elif signal.signal == "hold" and self.in_position:
                # Holding: rebalance when the position manager asks for it.
                if self.position_manager.should_rebalance(current_date):
                    self.position_manager.rebalance(symbol_data, current_date)

            # 8. Push the current equity to the risk manager.
            self._update_equity(current_date)

            # Per-cycle statistics.
            self.daily_stats.append({
                "date": current_date,
                "symbol": selected_symbol,
                "score": score_details.total_score if score_details else 0,
                "signal": signal.signal,
                "position": self.current_position,
                "equity": self.current_capital
            })

            if verbose:
                print(f"[{current_date.strftime('%Y-%m-%d')}] "
                      f"Symbol: {selected_symbol}, Signal: {signal.signal}, "
                      f"Position: {self.current_position:.2%}")

        except Exception as e:
            # Deliberate best-effort boundary: a failing cycle is reported to
            # the caller through ``reason`` instead of aborting the run.
            result["reason"] = f"Error: {str(e)}"
            print(f"Error in cycle: {e}")

        return result

    def _open_position(
        self,
        symbol: str,
        position_size: float,
        price: float,
        date: datetime
    ):
        """Open a position: update state, register it with the risk manager, log the trade."""
        self.in_position = True
        self.current_position = position_size

        self.risk_manager.register_position(symbol, price, position_size, date)

        self.trades.append({
            "date": date,
            "action": "open",
            "symbol": symbol,
            "price": price,
            "position": position_size,
            "equity": self.current_capital
        })

    def _close_position(self, date: datetime, reason: str):
        """Close the current position; does nothing when no position is held.

        The trade is logged before the state is cleared so that the record
        still carries the position size that was held.
        """
        if not self.in_position:
            return

        self.trades.append({
            "date": date,
            "action": "close",
            "symbol": self.current_symbol,
            "reason": reason,
            "position": self.current_position,
            "equity": self.current_capital
        })

        self.in_position = False
        self.current_position = 0.0
        self.risk_manager.close_position(self.current_symbol)
        self.signal_engine._reset_position()

    def _reduce_position(self, symbol: str, reduction_pct: float, date: datetime):
        """Reduce the position via the position manager and log the trade.

        ``current_position`` becomes whatever ``position_manager.reduce_position``
        returns. The record also carries ``equity`` so that it shares that key
        with the open/close records.
        """
        new_size = self.position_manager.reduce_position(symbol, reduction_pct)
        self.current_position = new_size

        self.trades.append({
            "date": date,
            "action": "reduce",
            "symbol": symbol,
            "reduction": reduction_pct,
            "new_position": new_size,
            "equity": self.current_capital
        })

    def _switch_symbol(self, old_symbol: str, new_symbol: str, date: datetime):
        """Switch symbols: close any open position, then reset the signal engine.

        ``old_symbol`` is accepted for interface symmetry; the position being
        closed is identified through ``self.current_symbol``.
        """
        if self.in_position:
            self._close_position(date, f"switch_to_{new_symbol}")
        # Reset even when flat, so the signal engine starts clean on the new symbol.
        self.signal_engine._reset_position()

    def _update_equity(self, date: datetime):
        """Forward the current capital to the risk manager.

        This class never recomputes ``current_capital`` itself; per the
        original note, the real equity update is handled by the backtest engine.
        """
        self.risk_manager.update_equity(self.current_capital, date)

    def get_performance_summary(self) -> Dict[str, Any]:
        """Summarise performance from the recorded daily statistics.

        Returns:
            ``{}`` when no daily statistics exist, otherwise a dict with
            ``total_return``, ``annual_return``, ``max_drawdown``,
            ``volatility``, ``sharpe_ratio``, ``total_trades`` (number of
            "open" trades) and ``days``.

        Degenerate inputs are handled instead of raising:
          * zero ``initial_capital`` -> ``total_return`` is reported as 0.0;
          * a non-positive running peak contributes no drawdown;
          * a daily return whose previous equity is 0 is skipped;
          * a negative growth factor gives ``annual_return`` -1.0, because a
            negative float raised to a fractional power would be complex.
        """
        if not self.daily_stats:
            return {}

        equity_curve = [s["equity"] for s in self.daily_stats]
        days = len(equity_curve)

        # Total return relative to the starting capital.
        if self.initial_capital:
            total_return = (equity_curve[-1] - self.initial_capital) / self.initial_capital
        else:
            total_return = 0.0

        # Maximum drawdown against the running peak (seeded with the starting capital).
        peak = self.initial_capital
        max_drawdown = 0.0
        for equity in equity_curve:
            if equity > peak:
                peak = equity
            if peak > 0:
                max_drawdown = max(max_drawdown, (peak - equity) / peak)

        # Annualised return.
        growth = 1 + total_return
        if growth > 0:
            annual_return = growth ** (self.TRADING_DAYS_PER_YEAR / days) - 1
        else:
            annual_return = -1.0

        # Annualised volatility of the day-over-day equity returns.
        daily_returns = [
            (curr - prev) / prev
            for prev, curr in zip(equity_curve, equity_curve[1:])
            if prev != 0
        ]
        if daily_returns:
            volatility = float(np.std(daily_returns) * np.sqrt(self.TRADING_DAYS_PER_YEAR))
        else:
            volatility = 0

        # Sharpe ratio.
        sharpe = (annual_return - self.RISK_FREE_RATE) / volatility if volatility > 0 else 0

        return {
            "total_return": total_return,
            "annual_return": annual_return,
            "max_drawdown": max_drawdown,
            "volatility": volatility,
            "sharpe_ratio": sharpe,
            "total_trades": sum(1 for t in self.trades if t["action"] == "open"),
            "days": days
        }

    def reset(self):
        """Reset the trading state; the loaded data in ``data_dict`` is kept.

        The position and risk managers are replaced with fresh instances and
        the risk manager is re-initialised with the initial capital.
        """
        self.current_capital = self.initial_capital
        self.current_symbol = None
        self.current_position = 0.0
        self.in_position = False
        self.trades = []
        self.daily_stats = []

        self.selector.current_symbol = None
        self.signal_engine._reset_position()
        self.position_manager = PositionManager()
        self.risk_manager = MultiLayerRiskManager()
        self.risk_manager.initialize(self.initial_capital, datetime.now())
|