#!/usr/bin/env python3 """ 量化交易系统 - 100万资金管理 策略:可转债双低 + 小市值动量 + 高股息防御 作者:Kimi Claw 日期:2026-03-07 """ import pandas as pd import numpy as np import akshare as ak from datetime import datetime, timedelta import json import logging import os from typing import List, Dict, Tuple import warnings warnings.filterwarnings('ignore') # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('quant_trade.log', encoding='utf-8'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) class PortfolioManager: """组合管理器 - 总资金100万分配""" def __init__(self, total_capital: float = 1000000): self.total_capital = total_capital self.allocations = { 'convertible_bond': 0.40, # 可转债40万 'small_cap_momentum': 0.30, # 小市值动量30万 'high_dividend': 0.20, # 高股息20万 'cash': 0.10 # 现金10万 } self.positions = {} # 当前持仓 self.trade_history = [] # 交易记录 def get_strategy_capital(self, strategy_name: str) -> float: """获取指定策略的资金额度""" return self.total_capital * self.allocations.get(strategy_name, 0) def record_trade(self, strategy: str, action: str, code: str, name: str, price: float, shares: int, reason: str = ""): """记录交易""" trade = { 'datetime': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), 'strategy': strategy, 'action': action, # BUY/SELL 'code': code, 'name': name, 'price': price, 'shares': shares, 'amount': price * shares, 'reason': reason } self.trade_history.append(trade) logger.info(f"[交易记录] {action} {name}({code}): {shares}股 @ {price:.2f}") def save_state(self): """保存组合状态""" state = { 'total_capital': self.total_capital, 'allocations': self.allocations, 'positions': self.positions, 'trade_history': self.trade_history, 'update_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S') } with open('portfolio_state.json', 'w', encoding='utf-8') as f: json.dump(state, f, ensure_ascii=False, indent=2) logger.info("组合状态已保存") class ConvertibleBondStrategy: """可转债双低策略 - 40万资金""" def __init__(self, capital: float = 400000): self.capital = capital self.position_limit = 20 # 最多持有20只 self.single_limit = capital / self.position_limit # 每只2万 self.stop_loss = -0.08 # 8%止损 self.take_profit = 0.15 # 15%止盈 def get_candidates(self) -> pd.DataFrame: """获取候选可转债""" try: # 获取可转债数据 df = ak.bond_cb_jsl(cookie="") # 筛选条件 df['bond_price'] = pd.to_numeric(df['价格'], errors='coerce') df['premium_rate'] = pd.to_numeric(df['溢价率'].str.replace('%', ''), errors='coerce') df['remain_year'] = pd.to_numeric(df['剩余年限'], errors='coerce') # 双低分数 = 价格 + 溢价率 df['double_low_score'] = df['bond_price'] + df['premium_rate'] # 过滤条件 mask = ( (df['bond_price'] < 115) & # 价格低于115 (df['premium_rate'] < 30) & # 溢价率低于30% (df['remain_year'] > 1) & # 剩余期限大于1年 (df['评级'].isin(['AAA', 'AA+', 'AA', 'AA-'])) # 评级 ) candidates = df[mask].copy() candidates = candidates.nsmallest(30, 'double_low_score') logger.info(f"可转债候选池: {len(candidates)}只") return candidates[['代码', '名称', 'bond_price', 'premium_rate', 'double_low_score', '转股价值', '剩余规模']] except Exception as e: logger.error(f"获取可转债数据失败: {e}") return pd.DataFrame() def generate_signals(self, current_positions: List[str]) -> Tuple[List[Dict], List[str]]: """ 生成交易信号 Returns: (buy_list, sell_list) """ candidates = self.get_candidates() if candidates.empty: return [], [] buy_list = [] sell_list = [] # 目标持仓(前20只) target_codes = candidates.head(self.position_limit)['代码'].tolist() # 需要卖出的:不在目标列表中的当前持仓 for code in current_positions: if code not in target_codes: sell_list.append(code) # 需要买入的:目标列表中不在当前持仓的 for _, row in candidates.head(self.position_limit).iterrows(): if row['代码'] not in current_positions: buy_list.append({ 'code': row['代码'], 'name': row['名称'], 'price': row['bond_price'], 'amount': self.single_limit }) return buy_list, sell_list class SmallCapMomentumStrategy: """小市值动量策略 - 30万资金""" def __init__(self, capital: float = 300000): self.capital = capital self.position_limit = 10 # 最多持有10只 self.single_limit = capital / self.position_limit # 每只3万 self.stop_loss = -0.08 # 8%止损 self.market_filter = True # 启用市场过滤器 def get_all_stocks(self) -> pd.DataFrame: """获取全市场股票列表""" try: df = ak.stock_zh_a_spot_em() df['market_cap'] = df['总市值'].astype(float) / 1e8 # 转为亿 df['turnover'] = df['成交额'].astype(float) / 1e4 # 转为万 df['change_20d'] = df['20日涨跌幅'].astype(float) # 过滤条件 mask = ( (df['market_cap'] < 50) & # 市值小于50亿 (df['turnover'] > 1000) & # 成交额大于1000万 (~df['名称'].str.contains('ST|退', na=False)) & # 排除ST和退市 (~df['代码'].str.startswith('68')) & # 排除科创板 (~df['代码'].str.startswith('8')) & # 排除北交所 (~df['代码'].str.startswith('4')) & # 排除新三板 (df['change_20d'] > -20) # 排除暴跌股 ) return df[mask].copy() except Exception as e: logger.error(f"获取股票数据失败: {e}") return pd.DataFrame() def market_trend_ok(self) -> bool: """检查市场趋势 - 中证1000是否站上20日均线""" try: # 获取中证1000指数 df = ak.index_zh_a_hist(symbol="000852", period="daily") if len(df) < 30: return True # 数据不足,默认允许交易 df['ma20'] = df['close'].rolling(20).mean() latest_close = df['close'].iloc[-1] latest_ma20 = df['ma20'].iloc[-1] is_bullish = latest_close > latest_ma20 logger.info(f"中证1000: 收盘价{latest_close:.2f}, MA20:{latest_ma20:.2f}, 趋势:{'多头' if is_bullish else '空头'}") return is_bullish except Exception as e: logger.error(f"获取指数数据失败: {e}") return True def get_candidates(self) -> pd.DataFrame: """获取候选股票(20日涨幅排名)""" df = self.get_all_stocks() if df.empty: return df # 按20日涨幅排序,取前200 df = df.nlargest(200, 'change_20d') logger.info(f"小市值动量候选池: {len(df)}只") return df[['代码', '名称', 'market_cap', 'change_20d', 'turnover', '最新价']] def generate_signals(self, current_positions: List[str]) -> Tuple[List[Dict], List[str], bool]: """ 生成交易信号 Returns: (buy_list, sell_list, market_ok) """ market_ok = self.market_trend_ok() if not market_ok: logger.warning("市场趋势不佳,建议减仓或清仓") # 返回空买入列表,卖出所有持仓 return [], current_positions, False candidates = self.get_candidates() if candidates.empty: return [], [], True buy_list = [] sell_list = [] # 目标持仓(前10只) target_codes = candidates.head(self.position_limit)['代码'].tolist() # 需要卖出的 for code in current_positions: if code not in target_codes: sell_list.append(code) # 需要买入的 for _, row in candidates.head(self.position_limit).iterrows(): if row['代码'] not in current_positions: buy_list.append({ 'code': row['代码'], 'name': row['名称'], 'price': row['最新价'], 'amount': self.single_limit }) return buy_list, sell_list, True class HighDividendStrategy: """高股息防御策略 - 20万资金""" def __init__(self, capital: float = 200000): self.capital = capital self.target_dividend_yield = 0.05 # 目标股息率5% self.holdings = [] # 当前持仓 # 核心标的池(历史高股息+稳定) self.core_pool = [ {'code': '600900', 'name': '长江电力', 'sector': '水电'}, {'code': '601088', 'name': '中国神华', 'sector': '煤炭'}, {'code': '601288', 'name': '农业银行', 'sector': '银行'}, {'code': '601006', 'name': '大秦铁路', 'sector': '交运'}, {'code': '600377', 'name': '宁沪高速', 'sector': '高速'}, {'code': '600887', 'name': '伊利股份', 'sector': '消费'}, {'code': '000895', 'name': '双汇发展', 'sector': '食品'}, {'code': '600048', 'name': '保利发展', 'sector': '地产'}, ] def get_dividend_data(self, code: str) -> Dict: """获取个股股息数据""" try: # 获取历史分红数据 df = ak.stock_dividend_cninfo(symbol=code) if df.empty: return {'dividend_yield': 0, 'years': 0} # 计算平均股息率(近3年) recent = df.head(3) avg_yield = recent['股息率'].mean() if '股息率' in recent.columns else 0 return { 'dividend_yield': avg_yield, 'years': len(df) } except Exception as e: logger.error(f"获取{code}股息数据失败: {e}") return {'dividend_yield': 0, 'years': 0} def screen_stocks(self) -> List[Dict]: """筛选高股息股票""" results = [] for stock in self.core_pool: try: # 获取实时价格 df = ak.stock_zh_a_spot_em() stock_info = df[df['代码'] == stock['code']] if stock_info.empty: continue price = float(stock_info['最新价'].values[0]) dividend_data = self.get_dividend_data(stock['code']) # 估算当前股息率(假设分红金额不变) if dividend_data['years'] > 0: # 简化计算:使用历史平均股息率 current_yield = dividend_data['dividend_yield'] else: current_yield = 0 results.append({ 'code': stock['code'], 'name': stock['name'], 'sector': stock['sector'], 'price': price, 'dividend_yield': current_yield, 'score': current_yield * 100 # 评分就是股息率 }) except Exception as e: logger.error(f"处理{stock['code']}失败: {e}") # 按股息率排序 results = sorted(results, key=lambda x: x['dividend_yield'], reverse=True) return results def generate_allocation(self) -> List[Dict]: """生成配置方案""" stocks = self.screen_stocks() if not stocks: return [] # 选择股息率前5的股票,每只4万 selected = stocks[:5] allocation = [] for stock in selected: if stock['dividend_yield'] >= 0.04: # 至少4%股息率 shares = int(40000 / stock['price'] / 100) * 100 # 整手 allocation.append({ 'code': stock['code'], 'name': stock['name'], 'price': stock['price'], 'shares': shares, 'dividend_yield': stock['dividend_yield'], 'invest_amount': shares * stock['price'] }) return allocation class RiskManager: """风险管理器""" def __init__(self, portfolio: PortfolioManager): self.portfolio = portfolio self.max_drawdown_total = 0.12 # 总回撤12%红线 self.max_drawdown_strategy = 0.15 # 单策略回撤15% self.max_loss_per_trade = 0.08 # 单笔8%止损 def check_portfolio_risk(self, current_value: float) -> Dict: """检查组合风险""" initial_value = self.portfolio.total_capital drawdown = (initial_value - current_value) / initial_value alerts = [] actions = [] if drawdown > self.max_drawdown_total: alerts.append(f"⚠️ 总回撤 {drawdown*100:.1f}% 超过红线 {self.max_drawdown_total*100:.1f}%") actions.append("HALF_ALL") # 全部减仓50% elif drawdown > 0.08: alerts.append(f"⚠️ 总回撤 {drawdown*100:.1f}% 接近警戒线") return { 'drawdown': drawdown, 'alerts': alerts, 'actions': actions, 'is_safe': len(alerts) == 0 } def check_stop_loss(self, position: Dict, current_price: float) -> bool: """检查是否需要止损""" entry_price = position.get('entry_price', current_price) loss_pct = (current_price - entry_price) / entry_price if loss_pct < -self.max_loss_per_trade: logger.warning(f"止损触发: {position['code']} 亏损 {loss_pct*100:.1f}%") return True return False class BacktestEngine: """回测引擎""" def __init__(self, strategy, start_date: str, end_date: str, initial_capital: float): self.strategy = strategy self.start_date = start_date self.end_date = end_date self.initial_capital = initial_capital self.capital = initial_capital self.positions = {} # 当前持仓 self.trades = [] # 交易记录 self.daily_values = [] # 每日净值 def run(self) -> Dict: """运行回测""" logger.info(f"开始回测: {self.start_date} 至 {self.end_date}") # 这里简化处理,实际应该按日遍历 # 由于AKShare历史数据获取限制,这里提供框架 # 模拟回测结果 results = { 'initial_capital': self.initial_capital, 'final_value': self.capital, 'total_return': 0, 'annual_return': 0, 'max_drawdown': 0, 'sharpe_ratio': 0, 'trade_count': len(self.trades), 'win_rate': 0 } return results class QuantSystem: """量化交易系统主类""" def __init__(self): self.portfolio = PortfolioManager(total_capital=1000000) self.cb_strategy = ConvertibleBondStrategy(capital=400000) self.sc_strategy = SmallCapMomentumStrategy(capital=300000) self.hd_strategy = HighDividendStrategy(capital=200000) self.risk_manager = RiskManager(self.portfolio) def daily_run(self): """每日运行""" logger.info("=" * 60) logger.info("开始每日策略运行") logger.info("=" * 60) # 1. 检查风险 # current_value = self.calculate_portfolio_value() # risk_status = self.risk_manager.check_portfolio_risk(current_value) # 2. 生成各策略信号 logger.info("\n--- 可转债双低策略 ---") cb_buys, cb_sells = self.cb_strategy.generate_signals([]) logger.info(f"买入信号: {len(cb_buys)}只, 卖出信号: {len(cb_sells)}只") logger.info("\n--- 小市值动量策略 ---") sc_buys, sc_sells, market_ok = self.sc_strategy.generate_signals([]) logger.info(f"买入信号: {len(sc_buys)}只, 卖出信号: {len(sc_sells)}只, 市场状态: {'正常' if market_ok else '空头'}") logger.info("\n--- 高股息防御策略 ---") hd_allocation = self.hd_strategy.generate_allocation() logger.info(f"配置方案: {len(hd_allocation)}只股票") for item in hd_allocation: logger.info(f" {item['name']}({item['code']}): {item['shares']}股 @ {item['price']:.2f}, 股息率{item['dividend_yield']*100:.1f}%") # 3. 保存状态 self.portfolio.save_state() logger.info("\n策略运行完成") def generate_report(self) -> str: """生成交易报告""" report = [] report.append("=" * 60) report.append("量化交易系统日报") report.append(f"生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") report.append("=" * 60) # 资金分配 report.append("\n【资金分配】") for strategy, ratio in self.portfolio.allocations.items(): amount = self.portfolio.total_capital * ratio report.append(f" {strategy}: {ratio*100:.0f}% = {amount:,.0f}元") # 策略详情 report.append("\n【可转债双低策略】") cb_candidates = self.cb_strategy.get_candidates() if not cb_candidates.empty: report.append(f"候选池: {len(cb_candidates)}只") report.append("Top 5:") for _, row in cb_candidates.head(5).iterrows(): report.append(f" {row['名称']}: 价格{row['bond_price']:.2f}, 溢价率{row['premium_rate']:.1f}%, 双低{row['double_low_score']:.1f}") report.append("\n【小市值动量策略】") market_ok = self.sc_strategy.market_trend_ok() report.append(f"市场趋势: {'多头' if market_ok else '空头/观望'}") sc_candidates = self.sc_strategy.get_candidates() if not sc_candidates.empty: report.append(f"候选池: {len(sc_candidates)}只") report.append("Top 5:") for _, row in sc_candidates.head(5).iterrows(): report.append(f" {row['名称']}: 市值{row['market_cap']:.1f}亿, 20日涨幅{row['change_20d']:.1f}%") report.append("\n【高股息防御策略】") hd_allocation = self.hd_strategy.generate_allocation() if hd_allocation: report.append("推荐配置:") for item in hd_allocation: report.append(f" {item['name']}: {item['shares']}股, 约{item['invest_amount']:,.0f}元, 股息率{item['dividend_yield']*100:.1f}%") return "\n".join(report) def main(): """主函数""" print("=" * 60) print("量化交易系统 v1.0") print("100万资金管理 - 可转债双低 + 小市值动量 + 高股息防御") print("=" * 60) system = QuantSystem() # 运行日报 system.daily_run() # 生成报告 report = system.generate_report() print("\n" + report) # 保存报告 report_file = f"report_{datetime.now().strftime('%Y%m%d')}.txt" with open(report_file, 'w', encoding='utf-8') as f: f.write(report) print(f"\n报告已保存: {report_file}") if __name__ == "__main__": main()