import pandas as pd import numpy as np import akshare as ak import warnings from datetime import datetime, timedelta warnings.filterwarnings('ignore') # ==================== 数据获取模块 ==================== class IntradayDataFetcher: """30分钟K线数据获取类""" def __init__(self): self.symbol = "399673" # 创业板50指数 def fetch_30min_data(self, start_date=None, end_date=None) -> pd.DataFrame: """获取指定时间范围的30分钟K线数据""" try: if start_date is None: start_date = datetime.now() - timedelta(days=60) if end_date is None: end_date = datetime.now() print(f"正在获取创业板50指数的30分钟K线数据...") print(f"时间范围: {start_date.strftime('%Y-%m-%d')} 至 {end_date.strftime('%Y-%m-%d')}") # 使用数据源连接方式获取更多数据 # 首先尝试获取分钟级数据 try: data = ak.index_zh_a_hist_min_em(symbol=self.symbol, period="30") except Exception as e: print(f"获取30分钟数据失败: {e}") # 如果30分钟数据获取失败,尝试获取日线数据作为备选 print("尝试获取日线数据作为备选方案...") data = ak.index_zh_a_hist_em(symbol=self.symbol) if data.empty: raise ValueError("获取的数据为空") # 重命名列 data.rename(columns={ '时间': 'DateTime', '开盘': 'Open', '收盘': 'Close', '最高': 'High', '最低': 'Low', '成交量': 'Volume', '成交额': 'Amount', '振幅': 'Amplitude', '涨跌幅': 'Change_Pct', '涨跌额': 'Change_Amount', '换手率': 'Turnover', '日期': 'DateTime' # 备用字段名 }, inplace=True) # 设置时间索引 data['DateTime'] = pd.to_datetime(data['DateTime']) data.set_index('DateTime', inplace=True) data.sort_index(inplace=True) # 筛选指定时间范围的数据(使用宽松的开始时间,确保有预热数据) buffer_start = start_date - timedelta(days=60) # 增加60天缓冲 filtered_data = data[(data.index >= buffer_start) & (data.index <= end_date)].copy() if filtered_data.empty: print(f"警告:指定时间范围没有数据,使用所有可用数据") filtered_data = data.copy() # 检查数据量 print(f"获取数据总量: {len(data)}条") print(f"筛选后数据量: {len(filtered_data)}条") if len(filtered_data) < 20: # 放宽最低要求 raise ValueError(f"数据量严重不足:只获取到{len(filtered_data)}条数据,无法进行有效回测") # 计算基础指标 filtered_data['Returns'] = filtered_data['Close'].pct_change() filtered_data['High_Low_Pct'] = (filtered_data['High'] - filtered_data['Low']) / filtered_data['Close'].shift(1) filtered_data['Close_Open_Pct'] = (filtered_data['Close'] - filtered_data['Open']) / filtered_data['Open'] # 处理缺失值 filtered_data.fillna(method='ffill', inplace=True) filtered_data.dropna(inplace=True) print(f"最终可用数据: {len(filtered_data)}条") print(f"数据范围: {filtered_data.index[0]} 到 {filtered_data.index[-1]}") return filtered_data except Exception as e: print(f"获取数据时出错: {str(e)}") raise def calculate_intraday_indicators(self, data: pd.DataFrame) -> pd.DataFrame: """计算30分钟技术指标""" print("正在计算30分钟技术指标...") df = data.copy() # 短期移动平均线 df['MA6'] = df['Close'].rolling(window=6).mean() # 3小时 df['MA12'] = df['Close'].rolling(window=12).mean() # 6小时 df['MA24'] = df['Close'].rolling(window=24).mean() # 12小时(一天) # RSI delta = df['Close'].diff() gain = (delta.where(delta > 0, 0)).rolling(window=14).mean() loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean() rs = gain / loss df['RSI'] = 100 - (100 / (1 + rs)) # 布林带 df['BB_middle'] = df['Close'].rolling(window=20).mean() bb_std = df['Close'].rolling(window=20).std() df['BB_upper'] = df['BB_middle'] + (bb_std * 2) df['BB_lower'] = df['BB_middle'] - (bb_std * 2) df['BB_width'] = (df['BB_upper'] - df['BB_lower']) / df['BB_middle'] # MACD exp1 = df['Close'].ewm(span=12, adjust=False).mean() exp2 = df['Close'].ewm(span=26, adjust=False).mean() df['MACD'] = exp1 - exp2 df['MACD_signal'] = df['MACD'].ewm(span=9, adjust=False).mean() df['MACD_hist'] = df['MACD'] - df['MACD_signal'] # KDJ low_9 = df['Low'].rolling(window=9).min() high_9 = df['High'].rolling(window=9).max() rsv = (df['Close'] - low_9) / (high_9 - low_9) * 100 df['K'] = rsv.ewm(com=2, adjust=False).mean() df['D'] = df['K'].ewm(com=2, adjust=False).mean() df['J'] = 3 * df['K'] - 2 * df['D'] # ATR high_low = df['High'] - df['Low'] high_close = abs(df['High'] - df['Close'].shift()) low_close = abs(df['Low'] - df['Close'].shift()) true_range = pd.concat([high_low, high_close, low_close], axis=1).max(axis=1) df['ATR'] = true_range.rolling(window=14).mean() df['ATR_Pct'] = df['ATR'] / df['Close'] # 动量指标 df['Momentum'] = df['Close'] / df['Close'].shift(4) - 1 # 2小时动量 # 成交量变化 df['Volume_MA'] = df['Volume'].rolling(window=12).mean() df['Volume_Ratio'] = df['Volume'] / df['Volume_MA'] # 价格动量 df['Price_Momentum'] = (df['Close'] - df['Close'].shift(6)) / df['Close'].shift(6) print("技术指标计算完成") return df # ==================== 翻转信号生成器 ==================== class ReversalSignalGenerator: """日内翻转信号生成器""" def __init__(self): self.signal_count = 0 def generate_reversal_signals(self, data: pd.DataFrame) -> pd.DataFrame: """生成日内翻转信号""" print("正在生成日内翻转信号...") signals = [] df = data.copy() for i in range(24, len(df)): # 至少需要12小时(24个30分钟)的历史数据 current_bar = df.iloc[i] current_time = df.index[i] # 跳过不适合交易的时间段(如午休时间等) hour = current_time.hour if hour < 9 or hour > 15: # 只在交易时间内 continue # 生成信号 signal = { 'DateTime': current_time, 'Open': current_bar['Open'], 'High': current_bar['High'], 'Low': current_bar['Low'], 'Close': current_bar['Close'], 'Volume': current_bar['Volume'], 'RSI': current_bar['RSI'], 'MACD': current_bar['MACD'], 'MACD_hist': current_bar['MACD_hist'], 'K': current_bar['K'], 'D': current_bar['D'], 'J': current_bar['J'], 'ATR_Pct': current_bar['ATR_Pct'], 'Volume_Ratio': current_bar['Volume_Ratio'], 'Price_Momentum': current_bar['Price_Momentum'], 'Close_Open_Pct': current_bar['Close_Open_Pct'] } # 计算各种翻转信号 reversal_score = 0 reversal_signals = [] # 1. RSI超卖翻转 if current_bar['RSI'] < 30: reversal_score += 2 reversal_signals.append("RSI超卖") elif current_bar['RSI'] < 35: reversal_score += 1 reversal_signals.append("RSI偏弱") # 2. KDJ超卖翻转 if current_bar['K'] < 20 and current_bar['D'] < 20: reversal_score += 2 reversal_signals.append("KDJ超卖") elif current_bar['J'] < 0: reversal_score += 2 reversal_signals.append("KDJ极端超卖") # 3. MACD金叉 if current_bar['MACD_hist'] > 0 and df.iloc[i-1]['MACD_hist'] <= 0: reversal_score += 2 reversal_signals.append("MACD金叉") elif current_bar['MACD_hist'] > df.iloc[i-1]['MACD_hist']: reversal_score += 1 reversal_signals.append("MACD改善") # 4. 价格触及布林带下轨 bb_width = current_bar['BB_width'] if current_bar['Close'] <= current_bar['BB_lower'] * 1.005: reversal_score += 2 reversal_signals.append("触及下轨") elif current_bar['Close'] <= current_bar['BB_lower'] * 1.01: reversal_score += 1 reversal_signals.append("接近下轨") # 5. 连续下跌后的反转 recent_returns = df.iloc[i-6:i]['Returns'] if recent_returns.min() < -0.015: # 最近2小时内有超过1.5%的下跌 consecutive_decline = sum(recent_returns < 0) if consecutive_decline >= 4: # 连续4个周期下跌 reversal_score += 2 reversal_signals.append("连续下跌反转") # 6. 价格动量反转 if current_bar['Price_Momentum'] < -0.02: # 3小时下跌超过2% reversal_score += 1 reversal_signals.append("动量超卖") # 7. 成交量配合 if current_bar['Volume_Ratio'] > 1.2: # 放量 reversal_score += 1 reversal_signals.append("放量配合") # 8. 当日开盘价格关系 daily_high = df[df.index.date == current_time.date()]['High'].max() daily_low = df[df.index.date == current_time.date()]['Low'].min() daily_range = daily_high - daily_low if daily_range > 0: position_in_day = (current_bar['Close'] - daily_low) / daily_range if position_in_day < 0.3: # 在当日低位区域 reversal_score += 1 reversal_signals.append("日内低位") # 设置信号 signal['Reversal_Score'] = reversal_score signal['Reversal_Signals'] = ', '.join(reversal_signals) if reversal_signals else '' # 生成买入信号(阈值降低以增加交易频率) if reversal_score >= 4: signal['Signal'] = 1 signal['Signal_Type'] = '做多翻转' self.signal_count += 1 else: signal['Signal'] = 0 signal['Signal_Type'] = '' signals.append(signal) signals_df = pd.DataFrame(signals) signals_df.set_index('DateTime', inplace=True) print(f"信号生成完成,共产生{self.signal_count}个翻转信号") print(f"信号密度: {self.signal_count/len(signals_df)*100:.2f}%") return signals_df # ==================== 日内交易执行器 ==================== class IntradayReversalExecutor: """日内翻转交易执行器""" def __init__(self, initial_capital=1000000): self.initial_capital = initial_capital self.params = { 'commission_rate': 0.0001, # 万分之一 'slippage_rate': 0.0, # 无滑点 'position_size_pct': 1.0, # 每次开仓100%仓位(满仓) 'stop_loss_pct': 0.008, # 0.8%止损 'take_profit_pct': 0.015, # 1.5%止盈 'max_hold_bars': 16, # 最多持有8小时(16个30分钟) 'min_signal_strength': 4 # 最小信号强度 } def execute_intraday_trades(self, signals_df: pd.DataFrame) -> tuple: """执行日内翻转交易""" print("正在执行日内翻转交易...") df = signals_df.copy() # 初始化 trades = [] capital = self.initial_capital position = 0 entry_price = 0 entry_time = None holding_bars = 0 entry_signals = '' # 添加资金列 df = df.copy() df['capital'] = capital df['position'] = 0 df['net_value'] = capital for i in range(len(df)): current_time = df.index[i] current_bar = df.iloc[i] price = current_bar['Close'] # 更新当前净值 if position > 0: current_value = capital + position * price df.iloc[i, df.columns.get_loc('net_value')] = current_value else: df.iloc[i, df.columns.get_loc('net_value')] = capital # 开仓逻辑 if position == 0 and current_bar['Signal'] == 1: # 开仓 position_size = int((capital * self.params['position_size_pct']) / price) if position_size > 0: cost = position_size * price * (1 + self.params['commission_rate'] + self.params['slippage_rate']) if cost <= capital: position = position_size entry_price = price entry_time = current_time entry_signals = current_bar.get('Reversal_Signals', '') holding_bars = 0 capital -= cost df.iloc[i, df.columns.get_loc('position')] = position # 平仓逻辑 elif position > 0: holding_bars += 1 # 计算止损止盈价格 stop_loss = entry_price * (1 - self.params['stop_loss_pct']) take_profit = entry_price * (1 + self.params['take_profit_pct']) exit_signal = False exit_reason = '' exit_price = price # 止损 if price <= stop_loss: exit_signal = True exit_reason = "止损" exit_price = stop_loss # 止盈 elif price >= take_profit: exit_signal = True exit_reason = "止盈" exit_price = take_profit # 最大持仓时间 elif holding_bars >= self.params['max_hold_bars']: exit_signal = True exit_reason = "时间止损" # 翻转信号消失 elif current_bar['RSI'] > 70: # RSI超买 exit_signal = True exit_reason = "RSI超买平仓" # 执行平仓 if exit_signal: # 计算盈亏 - 修复:包含开仓和平仓的总成本 gross_pnl = (exit_price - entry_price) * position # 开仓成本(已经在开仓时扣除) open_cost = position * entry_price * (self.params['commission_rate'] + self.params['slippage_rate']) # 平仓成本 close_revenue = position * exit_price close_cost = close_revenue * (self.params['commission_rate'] + self.params['slippage_rate']) # 净盈亏 = 价差收益 - 开仓成本 - 平仓成本 pnl = gross_pnl - open_cost - close_cost # 更新资金 capital += close_revenue - close_cost # 记录交易 trade = { '买入时间': entry_time, '卖出时间': current_time, '买入价格': entry_price, '卖出价格': exit_price, '仓位': position, '盈亏金额': pnl, '盈亏百分比': (exit_price - entry_price) / entry_price * 100, '退出原因': exit_reason, '持仓周期数': holding_bars, '持仓小时数': holding_bars * 0.5, '入场信号': entry_signals, '卖出时资金': capital, '开仓市值': position * entry_price } trades.append(trade) # 重置 position = 0 entry_price = 0 entry_time = None holding_bars = 0 # 更新资金 df.iloc[i, df.columns.get_loc('capital')] = capital df.iloc[i, df.columns.get_loc('position')] = position # 强制平仓剩余持仓 - 修复:包含开仓和平仓的总成本 if position > 0: final_price = df.iloc[-1]['Close'] # 计算总盈亏 gross_pnl = (final_price - entry_price) * position open_cost = position * entry_price * (self.params['commission_rate'] + self.params['slippage_rate']) close_revenue = position * final_price close_cost = close_revenue * (self.params['commission_rate'] + self.params['slippage_rate']) pnl = gross_pnl - open_cost - close_cost capital += close_revenue - close_cost trade = { '买入时间': entry_time, '卖出时间': df.index[-1], '买入价格': entry_price, '卖出价格': final_price, '仓位': position, '盈亏金额': pnl, '盈亏百分比': (final_price - entry_price) / entry_price * 100, '退出原因': '强制平仓', '持仓周期数': holding_bars, '持仓小时数': holding_bars * 0.5, '入场信号': entry_signals, '卖出时资金': capital, '开仓市值': position * entry_price } trades.append(trade) trades_df = pd.DataFrame(trades) if len(trades_df) > 0: trades_df['买入时间'] = pd.to_datetime(trades_df['买入时间']) trades_df['卖出时间'] = pd.to_datetime(trades_df['卖出时间']) trades_df = trades_df.sort_values('买入时间') print(f"交易执行完成,共{len(trades_df)}笔交易") return df, trades_df # ==================== 验证分析模块 ==================== def validate_intraday_results(results_df, trades_df, initial_capital): """验证日内交易结果""" print("\n" + "=" * 80) print("日内翻转交易结果验证") print("=" * 80) print(f"\n【基础数据验证】") final_capital = results_df['net_value'].iloc[-1] total_return = (final_capital - initial_capital) / initial_capital * 100 print(f"初始资金: {initial_capital:,.2f}元") print(f"最终资金: {final_capital:,.2f}元") print(f"总收益率: {total_return:.2f}%") print(f"交易次数: {len(trades_df)}笔") if len(trades_df) > 0: print(f"\n【交易统计】") win_trades = trades_df[trades_df['盈亏金额'] > 0] lose_trades = trades_df[trades_df['盈亏金额'] < 0] print(f"盈利交易: {len(win_trades)}笔 ({len(win_trades)/len(trades_df)*100:.1f}%)") print(f"亏损交易: {len(lose_trades)}笔 ({len(lose_trades)/len(trades_df)*100:.1f}%)") print(f"平均持仓时间: {trades_df['持仓小时数'].mean():.1f}小时") print(f"平均收益率: {trades_df['盈亏百分比'].mean():.2f}%") # 按退出原因统计 print(f"\n【退出原因统计】") for reason, count in trades_df['退出原因'].value_counts().items(): percentage = count / len(trades_df) * 100 reason_pnl = trades_df[trades_df['退出原因'] == reason]['盈亏金额'].sum() print(f" {reason}: {count}次 ({percentage:.1f}%) - 总盈亏: {reason_pnl:+,.2f}元") # ==================== 主程序 ==================== def main(): """主程序 - 运行30分钟日内翻转策略""" print("=" * 80) print("创业板50 30分钟日内翻转策略") print("=" * 80) # 策略参数 # 时间配置(调整为akshare数据可用的最近时间范围) BACKTEST_START_DATE = "2026-01-05" # 回测开始日期(调整为最近可用数据) BACKTEST_END_DATE = "2026-01-19" # 回测结束日期(使用当前日期) PREWARMP_DAYS = 10 # 指标预热期天数(调整为10天) INITIAL_CAPITAL = 100000 # 转换日期格式 start_date = datetime.strptime(BACKTEST_START_DATE, "%Y-%m-%d") end_date = datetime.strptime(BACKTEST_END_DATE, "%Y-%m-%d") # 计算数据获取开始时间(回测开始时间 - 预热期) data_start_date = start_date - timedelta(days=PREWARMP_DAYS) print(f"\n策略参数:") print(f" 回测期间: {BACKTEST_START_DATE} 至 {BACKTEST_END_DATE}") print(f" 数据获取期间: {data_start_date.strftime('%Y-%m-%d')} 至 {BACKTEST_END_DATE}") print(f" 指标预热期: {PREWARMP_DAYS}天") print(f" K线周期: 30分钟") print(f" 初始资金: {INITIAL_CAPITAL:,}元") print(f" 标的指数: 创业板50 (399673)") try: # Phase 1: 数据获取 print(f"\n【Phase 1: 30分钟数据获取】") fetcher = IntradayDataFetcher() # 获取包含预热期的完整数据 full_data = fetcher.fetch_30min_data(start_date=data_start_date, end_date=end_date) full_data = fetcher.calculate_intraday_indicators(full_data) # 筛选回测期间的数据 original_len = len(full_data) backtest_data = full_data[(full_data.index >= start_date) & (full_data.index <= end_date)].copy() print(f"筛选回测数据: {original_len} -> {len(backtest_data)} 条") print(f"回测数据范围: {backtest_data.index[0]} 到 {backtest_data.index[-1]}") # Phase 2: 信号生成 print(f"\n【Phase 2: 翻转信号生成】") signal_gen = ReversalSignalGenerator() signals_df = signal_gen.generate_reversal_signals(backtest_data) # Phase 3: 交易执行 print(f"\n【Phase 3: 日内交易执行】") executor = IntradayReversalExecutor(initial_capital=INITIAL_CAPITAL) results_df, trades_df = executor.execute_intraday_trades(signals_df) # Phase 4: 验证分析 print(f"\n【Phase 4: 结果验证与分析】") validate_intraday_results(results_df, trades_df, INITIAL_CAPITAL) # Phase 5: 导出数据 if len(trades_df) > 0: print(f"\n【Phase 5: 导出交易数据】") # 确保时间戳格式精确到分钟 trades_df['买入时间'] = pd.to_datetime(trades_df['买入时间']).dt.strftime('%Y-%m-%d %H:%M:%S') trades_df['卖出时间'] = pd.to_datetime(trades_df['卖出时间']).dt.strftime('%Y-%m-%d %H:%M:%S') # 生成带时间戳的文件名 timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') output_file = f'cyb50_30min_intraday_reversal_trades_{timestamp}.csv' trades_df.to_csv(output_file, index=False, encoding='utf-8-sig') print(f"交易记录已保存到: {output_file}") print(f"时间戳格式: YYYY-MM-DD HH:MM:SS") # 策略总结 print(f"\n" + "=" * 80) print("策略运行总结") print("=" * 80) if len(trades_df) > 0: final_capital = results_df['net_value'].iloc[-1] total_return = (final_capital - INITIAL_CAPITAL) / INITIAL_CAPITAL * 100 print(f"初始资金: {INITIAL_CAPITAL:,.2f}元") print(f"最终资金: {final_capital:,.2f}元") print(f"总收益率: {total_return:.2f}%") print(f"交易次数: {len(trades_df)}笔") print(f"胜率: {(trades_df['盈亏金额'] > 0).sum() / len(trades_df) * 100:.1f}%") print(f"平均收益率: {trades_df['盈亏百分比'].mean():.2f}%") print(f"最大单笔盈利: {trades_df['盈亏金额'].max():+,.2f}元") print(f"最大单笔亏损: {trades_df['盈亏金额'].min():+,.2f}元") print(f"\n[SUCCESS] 策略运行成功!") else: print("未产生任何交易信号") except Exception as e: print(f"\n[ERROR] 策略运行出错: {str(e)}") import traceback traceback.print_exc() finally: print(f"\n" + "=" * 80) if __name__ == "__main__": main()