| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550 |
- #!/usr/bin/env python3
- """
- 量化交易系统 - 100万资金管理
- 策略:可转债双低 + 小市值动量 + 高股息防御
- 作者:Kimi Claw
- 日期:2026-03-07
- """
- import pandas as pd
- import numpy as np
- import akshare as ak
- from datetime import datetime, timedelta
- import json
- import logging
- import os
- from typing import List, Dict, Tuple
- import warnings
- warnings.filterwarnings('ignore')
- # 配置日志
- logging.basicConfig(
- level=logging.INFO,
- format='%(asctime)s - %(levelname)s - %(message)s',
- handlers=[
- logging.FileHandler('quant_trade.log', encoding='utf-8'),
- logging.StreamHandler()
- ]
- )
- logger = logging.getLogger(__name__)
- class PortfolioManager:
- """组合管理器 - 总资金100万分配"""
-
- def __init__(self, total_capital: float = 1000000):
- self.total_capital = total_capital
- self.allocations = {
- 'convertible_bond': 0.40, # 可转债40万
- 'small_cap_momentum': 0.30, # 小市值动量30万
- 'high_dividend': 0.20, # 高股息20万
- 'cash': 0.10 # 现金10万
- }
- self.positions = {} # 当前持仓
- self.trade_history = [] # 交易记录
-
- def get_strategy_capital(self, strategy_name: str) -> float:
- """获取指定策略的资金额度"""
- return self.total_capital * self.allocations.get(strategy_name, 0)
-
- def record_trade(self, strategy: str, action: str, code: str,
- name: str, price: float, shares: int, reason: str = ""):
- """记录交易"""
- trade = {
- 'datetime': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
- 'strategy': strategy,
- 'action': action, # BUY/SELL
- 'code': code,
- 'name': name,
- 'price': price,
- 'shares': shares,
- 'amount': price * shares,
- 'reason': reason
- }
- self.trade_history.append(trade)
- logger.info(f"[交易记录] {action} {name}({code}): {shares}股 @ {price:.2f}")
-
- def save_state(self):
- """保存组合状态"""
- state = {
- 'total_capital': self.total_capital,
- 'allocations': self.allocations,
- 'positions': self.positions,
- 'trade_history': self.trade_history,
- 'update_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
- }
- with open('portfolio_state.json', 'w', encoding='utf-8') as f:
- json.dump(state, f, ensure_ascii=False, indent=2)
- logger.info("组合状态已保存")
- class ConvertibleBondStrategy:
- """可转债双低策略 - 40万资金"""
-
- def __init__(self, capital: float = 400000):
- self.capital = capital
- self.position_limit = 20 # 最多持有20只
- self.single_limit = capital / self.position_limit # 每只2万
- self.stop_loss = -0.08 # 8%止损
- self.take_profit = 0.15 # 15%止盈
-
- def get_candidates(self) -> pd.DataFrame:
- """获取候选可转债"""
- try:
- # 获取可转债数据
- df = ak.bond_cb_jsl(cookie="")
-
- # 筛选条件
- df['bond_price'] = pd.to_numeric(df['价格'], errors='coerce')
- df['premium_rate'] = pd.to_numeric(df['溢价率'].str.replace('%', ''), errors='coerce')
- df['remain_year'] = pd.to_numeric(df['剩余年限'], errors='coerce')
-
- # 双低分数 = 价格 + 溢价率
- df['double_low_score'] = df['bond_price'] + df['premium_rate']
-
- # 过滤条件
- mask = (
- (df['bond_price'] < 115) & # 价格低于115
- (df['premium_rate'] < 30) & # 溢价率低于30%
- (df['remain_year'] > 1) & # 剩余期限大于1年
- (df['评级'].isin(['AAA', 'AA+', 'AA', 'AA-'])) # 评级
- )
-
- candidates = df[mask].copy()
- candidates = candidates.nsmallest(30, 'double_low_score')
-
- logger.info(f"可转债候选池: {len(candidates)}只")
- return candidates[['代码', '名称', 'bond_price', 'premium_rate',
- 'double_low_score', '转股价值', '剩余规模']]
-
- except Exception as e:
- logger.error(f"获取可转债数据失败: {e}")
- return pd.DataFrame()
-
- def generate_signals(self, current_positions: List[str]) -> Tuple[List[Dict], List[str]]:
- """
- 生成交易信号
- Returns: (buy_list, sell_list)
- """
- candidates = self.get_candidates()
- if candidates.empty:
- return [], []
-
- buy_list = []
- sell_list = []
-
- # 目标持仓(前20只)
- target_codes = candidates.head(self.position_limit)['代码'].tolist()
-
- # 需要卖出的:不在目标列表中的当前持仓
- for code in current_positions:
- if code not in target_codes:
- sell_list.append(code)
-
- # 需要买入的:目标列表中不在当前持仓的
- for _, row in candidates.head(self.position_limit).iterrows():
- if row['代码'] not in current_positions:
- buy_list.append({
- 'code': row['代码'],
- 'name': row['名称'],
- 'price': row['bond_price'],
- 'amount': self.single_limit
- })
-
- return buy_list, sell_list
- class SmallCapMomentumStrategy:
- """小市值动量策略 - 30万资金"""
-
- def __init__(self, capital: float = 300000):
- self.capital = capital
- self.position_limit = 10 # 最多持有10只
- self.single_limit = capital / self.position_limit # 每只3万
- self.stop_loss = -0.08 # 8%止损
- self.market_filter = True # 启用市场过滤器
-
- def get_all_stocks(self) -> pd.DataFrame:
- """获取全市场股票列表"""
- try:
- df = ak.stock_zh_a_spot_em()
- df['market_cap'] = df['总市值'].astype(float) / 1e8 # 转为亿
- df['turnover'] = df['成交额'].astype(float) / 1e4 # 转为万
- df['change_20d'] = df['20日涨跌幅'].astype(float)
-
- # 过滤条件
- mask = (
- (df['market_cap'] < 50) & # 市值小于50亿
- (df['turnover'] > 1000) & # 成交额大于1000万
- (~df['名称'].str.contains('ST|退', na=False)) & # 排除ST和退市
- (~df['代码'].str.startswith('68')) & # 排除科创板
- (~df['代码'].str.startswith('8')) & # 排除北交所
- (~df['代码'].str.startswith('4')) & # 排除新三板
- (df['change_20d'] > -20) # 排除暴跌股
- )
-
- return df[mask].copy()
- except Exception as e:
- logger.error(f"获取股票数据失败: {e}")
- return pd.DataFrame()
-
- def market_trend_ok(self) -> bool:
- """检查市场趋势 - 中证1000是否站上20日均线"""
- try:
- # 获取中证1000指数
- df = ak.index_zh_a_hist(symbol="000852", period="daily")
- if len(df) < 30:
- return True # 数据不足,默认允许交易
-
- df['ma20'] = df['close'].rolling(20).mean()
- latest_close = df['close'].iloc[-1]
- latest_ma20 = df['ma20'].iloc[-1]
-
- is_bullish = latest_close > latest_ma20
- logger.info(f"中证1000: 收盘价{latest_close:.2f}, MA20:{latest_ma20:.2f}, 趋势:{'多头' if is_bullish else '空头'}")
- return is_bullish
-
- except Exception as e:
- logger.error(f"获取指数数据失败: {e}")
- return True
-
- def get_candidates(self) -> pd.DataFrame:
- """获取候选股票(20日涨幅排名)"""
- df = self.get_all_stocks()
- if df.empty:
- return df
-
- # 按20日涨幅排序,取前200
- df = df.nlargest(200, 'change_20d')
-
- logger.info(f"小市值动量候选池: {len(df)}只")
- return df[['代码', '名称', 'market_cap', 'change_20d', 'turnover', '最新价']]
-
- def generate_signals(self, current_positions: List[str]) -> Tuple[List[Dict], List[str], bool]:
- """
- 生成交易信号
- Returns: (buy_list, sell_list, market_ok)
- """
- market_ok = self.market_trend_ok()
-
- if not market_ok:
- logger.warning("市场趋势不佳,建议减仓或清仓")
- # 返回空买入列表,卖出所有持仓
- return [], current_positions, False
-
- candidates = self.get_candidates()
- if candidates.empty:
- return [], [], True
-
- buy_list = []
- sell_list = []
-
- # 目标持仓(前10只)
- target_codes = candidates.head(self.position_limit)['代码'].tolist()
-
- # 需要卖出的
- for code in current_positions:
- if code not in target_codes:
- sell_list.append(code)
-
- # 需要买入的
- for _, row in candidates.head(self.position_limit).iterrows():
- if row['代码'] not in current_positions:
- buy_list.append({
- 'code': row['代码'],
- 'name': row['名称'],
- 'price': row['最新价'],
- 'amount': self.single_limit
- })
-
- return buy_list, sell_list, True
- class HighDividendStrategy:
- """高股息防御策略 - 20万资金"""
-
- def __init__(self, capital: float = 200000):
- self.capital = capital
- self.target_dividend_yield = 0.05 # 目标股息率5%
- self.holdings = [] # 当前持仓
-
- # 核心标的池(历史高股息+稳定)
- self.core_pool = [
- {'code': '600900', 'name': '长江电力', 'sector': '水电'},
- {'code': '601088', 'name': '中国神华', 'sector': '煤炭'},
- {'code': '601288', 'name': '农业银行', 'sector': '银行'},
- {'code': '601006', 'name': '大秦铁路', 'sector': '交运'},
- {'code': '600377', 'name': '宁沪高速', 'sector': '高速'},
- {'code': '600887', 'name': '伊利股份', 'sector': '消费'},
- {'code': '000895', 'name': '双汇发展', 'sector': '食品'},
- {'code': '600048', 'name': '保利发展', 'sector': '地产'},
- ]
-
- def get_dividend_data(self, code: str) -> Dict:
- """获取个股股息数据"""
- try:
- # 获取历史分红数据
- df = ak.stock_dividend_cninfo(symbol=code)
- if df.empty:
- return {'dividend_yield': 0, 'years': 0}
-
- # 计算平均股息率(近3年)
- recent = df.head(3)
- avg_yield = recent['股息率'].mean() if '股息率' in recent.columns else 0
-
- return {
- 'dividend_yield': avg_yield,
- 'years': len(df)
- }
- except Exception as e:
- logger.error(f"获取{code}股息数据失败: {e}")
- return {'dividend_yield': 0, 'years': 0}
-
- def screen_stocks(self) -> List[Dict]:
- """筛选高股息股票"""
- results = []
-
- for stock in self.core_pool:
- try:
- # 获取实时价格
- df = ak.stock_zh_a_spot_em()
- stock_info = df[df['代码'] == stock['code']]
-
- if stock_info.empty:
- continue
-
- price = float(stock_info['最新价'].values[0])
- dividend_data = self.get_dividend_data(stock['code'])
-
- # 估算当前股息率(假设分红金额不变)
- if dividend_data['years'] > 0:
- # 简化计算:使用历史平均股息率
- current_yield = dividend_data['dividend_yield']
- else:
- current_yield = 0
-
- results.append({
- 'code': stock['code'],
- 'name': stock['name'],
- 'sector': stock['sector'],
- 'price': price,
- 'dividend_yield': current_yield,
- 'score': current_yield * 100 # 评分就是股息率
- })
-
- except Exception as e:
- logger.error(f"处理{stock['code']}失败: {e}")
-
- # 按股息率排序
- results = sorted(results, key=lambda x: x['dividend_yield'], reverse=True)
- return results
-
- def generate_allocation(self) -> List[Dict]:
- """生成配置方案"""
- stocks = self.screen_stocks()
- if not stocks:
- return []
-
- # 选择股息率前5的股票,每只4万
- selected = stocks[:5]
- allocation = []
-
- for stock in selected:
- if stock['dividend_yield'] >= 0.04: # 至少4%股息率
- shares = int(40000 / stock['price'] / 100) * 100 # 整手
- allocation.append({
- 'code': stock['code'],
- 'name': stock['name'],
- 'price': stock['price'],
- 'shares': shares,
- 'dividend_yield': stock['dividend_yield'],
- 'invest_amount': shares * stock['price']
- })
-
- return allocation
- class RiskManager:
- """风险管理器"""
-
- def __init__(self, portfolio: PortfolioManager):
- self.portfolio = portfolio
- self.max_drawdown_total = 0.12 # 总回撤12%红线
- self.max_drawdown_strategy = 0.15 # 单策略回撤15%
- self.max_loss_per_trade = 0.08 # 单笔8%止损
-
- def check_portfolio_risk(self, current_value: float) -> Dict:
- """检查组合风险"""
- initial_value = self.portfolio.total_capital
- drawdown = (initial_value - current_value) / initial_value
-
- alerts = []
- actions = []
-
- if drawdown > self.max_drawdown_total:
- alerts.append(f"⚠️ 总回撤 {drawdown*100:.1f}% 超过红线 {self.max_drawdown_total*100:.1f}%")
- actions.append("HALF_ALL") # 全部减仓50%
- elif drawdown > 0.08:
- alerts.append(f"⚠️ 总回撤 {drawdown*100:.1f}% 接近警戒线")
-
- return {
- 'drawdown': drawdown,
- 'alerts': alerts,
- 'actions': actions,
- 'is_safe': len(alerts) == 0
- }
-
- def check_stop_loss(self, position: Dict, current_price: float) -> bool:
- """检查是否需要止损"""
- entry_price = position.get('entry_price', current_price)
- loss_pct = (current_price - entry_price) / entry_price
-
- if loss_pct < -self.max_loss_per_trade:
- logger.warning(f"止损触发: {position['code']} 亏损 {loss_pct*100:.1f}%")
- return True
- return False
- class BacktestEngine:
- """回测引擎"""
-
- def __init__(self, strategy, start_date: str, end_date: str, initial_capital: float):
- self.strategy = strategy
- self.start_date = start_date
- self.end_date = end_date
- self.initial_capital = initial_capital
- self.capital = initial_capital
- self.positions = {} # 当前持仓
- self.trades = [] # 交易记录
- self.daily_values = [] # 每日净值
-
- def run(self) -> Dict:
- """运行回测"""
- logger.info(f"开始回测: {self.start_date} 至 {self.end_date}")
-
- # 这里简化处理,实际应该按日遍历
- # 由于AKShare历史数据获取限制,这里提供框架
-
- # 模拟回测结果
- results = {
- 'initial_capital': self.initial_capital,
- 'final_value': self.capital,
- 'total_return': 0,
- 'annual_return': 0,
- 'max_drawdown': 0,
- 'sharpe_ratio': 0,
- 'trade_count': len(self.trades),
- 'win_rate': 0
- }
-
- return results
- class QuantSystem:
- """量化交易系统主类"""
-
- def __init__(self):
- self.portfolio = PortfolioManager(total_capital=1000000)
- self.cb_strategy = ConvertibleBondStrategy(capital=400000)
- self.sc_strategy = SmallCapMomentumStrategy(capital=300000)
- self.hd_strategy = HighDividendStrategy(capital=200000)
- self.risk_manager = RiskManager(self.portfolio)
-
- def daily_run(self):
- """每日运行"""
- logger.info("=" * 60)
- logger.info("开始每日策略运行")
- logger.info("=" * 60)
-
- # 1. 检查风险
- # current_value = self.calculate_portfolio_value()
- # risk_status = self.risk_manager.check_portfolio_risk(current_value)
-
- # 2. 生成各策略信号
- logger.info("\n--- 可转债双低策略 ---")
- cb_buys, cb_sells = self.cb_strategy.generate_signals([])
- logger.info(f"买入信号: {len(cb_buys)}只, 卖出信号: {len(cb_sells)}只")
-
- logger.info("\n--- 小市值动量策略 ---")
- sc_buys, sc_sells, market_ok = self.sc_strategy.generate_signals([])
- logger.info(f"买入信号: {len(sc_buys)}只, 卖出信号: {len(sc_sells)}只, 市场状态: {'正常' if market_ok else '空头'}")
-
- logger.info("\n--- 高股息防御策略 ---")
- hd_allocation = self.hd_strategy.generate_allocation()
- logger.info(f"配置方案: {len(hd_allocation)}只股票")
- for item in hd_allocation:
- logger.info(f" {item['name']}({item['code']}): {item['shares']}股 @ {item['price']:.2f}, 股息率{item['dividend_yield']*100:.1f}%")
-
- # 3. 保存状态
- self.portfolio.save_state()
-
- logger.info("\n策略运行完成")
-
- def generate_report(self) -> str:
- """生成交易报告"""
- report = []
- report.append("=" * 60)
- report.append("量化交易系统日报")
- report.append(f"生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
- report.append("=" * 60)
-
- # 资金分配
- report.append("\n【资金分配】")
- for strategy, ratio in self.portfolio.allocations.items():
- amount = self.portfolio.total_capital * ratio
- report.append(f" {strategy}: {ratio*100:.0f}% = {amount:,.0f}元")
-
- # 策略详情
- report.append("\n【可转债双低策略】")
- cb_candidates = self.cb_strategy.get_candidates()
- if not cb_candidates.empty:
- report.append(f"候选池: {len(cb_candidates)}只")
- report.append("Top 5:")
- for _, row in cb_candidates.head(5).iterrows():
- report.append(f" {row['名称']}: 价格{row['bond_price']:.2f}, 溢价率{row['premium_rate']:.1f}%, 双低{row['double_low_score']:.1f}")
-
- report.append("\n【小市值动量策略】")
- market_ok = self.sc_strategy.market_trend_ok()
- report.append(f"市场趋势: {'多头' if market_ok else '空头/观望'}")
- sc_candidates = self.sc_strategy.get_candidates()
- if not sc_candidates.empty:
- report.append(f"候选池: {len(sc_candidates)}只")
- report.append("Top 5:")
- for _, row in sc_candidates.head(5).iterrows():
- report.append(f" {row['名称']}: 市值{row['market_cap']:.1f}亿, 20日涨幅{row['change_20d']:.1f}%")
-
- report.append("\n【高股息防御策略】")
- hd_allocation = self.hd_strategy.generate_allocation()
- if hd_allocation:
- report.append("推荐配置:")
- for item in hd_allocation:
- report.append(f" {item['name']}: {item['shares']}股, 约{item['invest_amount']:,.0f}元, 股息率{item['dividend_yield']*100:.1f}%")
-
- return "\n".join(report)
- def main():
- """主函数"""
- print("=" * 60)
- print("量化交易系统 v1.0")
- print("100万资金管理 - 可转债双低 + 小市值动量 + 高股息防御")
- print("=" * 60)
-
- system = QuantSystem()
-
- # 运行日报
- system.daily_run()
-
- # 生成报告
- report = system.generate_report()
- print("\n" + report)
-
- # 保存报告
- report_file = f"report_{datetime.now().strftime('%Y%m%d')}.txt"
- with open(report_file, 'w', encoding='utf-8') as f:
- f.write(report)
- print(f"\n报告已保存: {report_file}")
- if __name__ == "__main__":
- main()
|