import pandas as pd import numpy as np import akshare as ak import warnings import json import os from datetime import datetime, timedelta warnings.filterwarnings('ignore') # ==================== 配置管理模块 ==================== class ConfigManager: """配置文件管理类""" def __init__(self, config_file='config.json'): self.config_file = config_file self.config = self.load_config() def load_config(self): """加载配置文件""" try: if os.path.exists(self.config_file): with open(self.config_file, 'r', encoding='utf-8') as f: config = json.load(f) print(f"配置文件加载成功: {self.config_file}") return config else: print(f"配置文件不存在,使用默认配置: {self.config_file}") return self.get_default_config() except Exception as e: print(f"配置文件加载失败: {e},使用默认配置") return self.get_default_config() def get_default_config(self): """获取默认配置""" return { "data_source": { "use_local_file": False, "local_file_path": "D:\\work\\project\\catfly\\data\\SZ#399673_30min.csv" }, "strategy": { "initial_capital": 1000000, "backtest_start_date": "2025-10-01", "prewamp_days": 30, "position_size_pct": 1.0, "stop_loss_pct": 0.008, "take_profit_pct": 0.015, "max_hold_bars": 16 } } def get(self, section, key, default=None): """获取配置项""" try: return self.config.get(section, {}).get(key, default) except: return default def save_config(self): """保存配置到文件""" try: with open(self.config_file, 'w', encoding='utf-8') as f: json.dump(self.config, f, indent=2, ensure_ascii=False) print(f"配置文件保存成功: {self.config_file}") except Exception as e: print(f"配置文件保存失败: {e}") # ==================== 数据获取模块 ==================== class IntradayDataFetcher: """30分钟K线数据获取类""" def __init__(self, config_manager=None): self.symbol = "399673" # 创业板50指数 self.config_manager = config_manager def fetch_30min_data(self, start_date=None, end_date=None) -> pd.DataFrame: """获取指定时间范围的30分钟K线数据""" try: if start_date is None: start_date = datetime.now() - timedelta(days=60) if end_date is None: end_date = datetime.now() print(f"正在获取创业板50指数的30分钟K线数据...") print(f"时间范围: {start_date.strftime('%Y-%m-%d')} 至 {end_date.strftime('%Y-%m-%d')}") # 检查数据源开关 use_local_file = False local_file_path = "" if self.config_manager: use_local_file = self.config_manager.get('data_source', 'use_local_file', False) local_file_path = self.config_manager.get('data_source', 'local_file_path', '') # 如果开关打开,从本地文件读取 if use_local_file: print(f"数据源开关: 本地文件模式") print(f"本地文件路径: {local_file_path}") return self._load_local_file(local_file_path, start_date, end_date) else: print(f"数据源开关: 在线获取模式") return self._fetch_online_data(start_date, end_date) except Exception as e: print(f"获取数据时出错: {str(e)}") raise def _load_local_file(self, file_path, start_date, end_date) -> pd.DataFrame: """从本地文件加载数据""" try: if not os.path.exists(file_path): raise FileNotFoundError(f"本地文件不存在: {file_path}") print(f"正在从本地文件读取数据...") print(f"文件路径: {file_path}") # 检查文件扩展名,选择不同的读取方式 if file_path.endswith('.txt'): # 处理文本格式文件 print("检测到文本格式文件,使用文本解析模式...") return self._parse_text_file(file_path, start_date, end_date) else: # 处理CSV格式文件 print("检测到CSV格式文件,使用CSV解析模式...") data = pd.read_csv(file_path) return self._process_dataframe(data, start_date, end_date) except Exception as e: print(f"从本地文件加载数据失败: {e}") raise def _parse_text_file(self, file_path, start_date, end_date) -> pd.DataFrame: """解析文本格式的数据文件""" try: data_list = [] # 尝试多种编码格式 encodings = ['gbk', 'gb2312', 'utf-8', 'latin-1'] lines = None for encoding in encodings: try: with open(file_path, 'r', encoding=encoding) as f: lines = f.readlines() print(f"成功使用编码: {encoding}") break except UnicodeDecodeError: continue if lines is None: raise ValueError("无法读取文件,尝试了多种编码格式都失败") # 跳过前两行(标题行) for line in lines[2:]: line = line.strip() if not line: continue parts = line.split() if len(parts) >= 7: try: # 格式: 日期 时间 开盘 最高 最低 收盘 成交量 成交额 date_time_str = f"{parts[0]} {parts[1]}" datetime_obj = pd.to_datetime(date_time_str, format='%Y/%m/%d %H%M') data_list.append({ 'DateTime': datetime_obj, 'Open': float(parts[2]), 'High': float(parts[3]), 'Low': float(parts[4]), 'Close': float(parts[5]), 'Volume': float(parts[6]), 'Amount': float(parts[7]) if len(parts) > 7 else 0 }) except (ValueError, IndexError) as e: print(f"跳过异常行: {line[:50]}... 错误: {e}") continue if not data_list: raise ValueError("文本文件中没有解析到有效数据") print(f"成功解析 {len(data_list)} 条数据") data = pd.DataFrame(data_list) return self._process_dataframe(data, start_date, end_date) except Exception as e: print(f"解析文本文件失败: {e}") raise def _process_dataframe(self, data, start_date, end_date) -> pd.DataFrame: """处理和标准化数据框""" try: # 检查并转换列名 print(f"原始数据列名: {data.columns.tolist()}") print(f"原始数据行数: {len(data)}") # 标准化列名 column_mapping = { '时间': 'DateTime', '日期': 'DateTime', 'datetime': 'DateTime', 'time': 'DateTime', '开盘': 'Open', 'open': 'Open', 'Open': 'Open', '开盘价': 'Open', '收盘': 'Close', 'close': 'Close', 'Close': 'Close', '收盘价': 'Close', '最高': 'High', 'high': 'High', 'High': 'High', '最高价': 'High', '最低': 'Low', 'low': 'Low', 'Low': 'Low', '最低价': 'Low', '成交量': 'Volume', 'volume': 'Volume', 'Volume': 'Volume', 'vol': 'Volume' } # 重命名列 data.rename(columns=column_mapping, inplace=True) # 设置时间索引 if 'DateTime' in data.columns: data['DateTime'] = pd.to_datetime(data['DateTime']) data.set_index('DateTime', inplace=True) else: raise ValueError("数据中找不到时间列(DateTime/时间/日期)") data.sort_index(inplace=True) # 筛选时间范围 filtered_data = data[(data.index >= start_date) & (data.index <= end_date)].copy() if filtered_data.empty: print(f"警告:指定时间范围没有数据") print(f"可用数据范围: {data.index[0]} 到 {data.index[-1]}") print(f"请求的时间范围: {start_date} 到 {end_date}") # 返回空数据框而不是抛出异常 return filtered_data # 确保必需的列存在 required_columns = ['Open', 'High', 'Low', 'Close', 'Volume'] missing_columns = [col for col in required_columns if col not in filtered_data.columns] if missing_columns: raise ValueError(f"数据缺少必需的列: {missing_columns}") # 添加缺失的列 if 'Amount' not in filtered_data.columns: filtered_data['Amount'] = 0 # 计算基础指标(本地文件可能缺少这些) if 'Returns' not in filtered_data.columns: filtered_data['Returns'] = filtered_data['Close'].pct_change() if 'High_Low_Pct' not in filtered_data.columns: filtered_data['High_Low_Pct'] = (filtered_data['High'] - filtered_data['Low']) / filtered_data['Close'].shift(1) if 'Close_Open_Pct' not in filtered_data.columns: filtered_data['Close_Open_Pct'] = (filtered_data['Close'] - filtered_data['Open']) / filtered_data['Open'] # 处理缺失值 filtered_data = filtered_data.ffill() filtered_data = filtered_data.dropna() print(f"本地文件数据处理成功: {len(filtered_data)}条") print(f"数据范围: {filtered_data.index[0]} 到 {filtered_data.index[-1]}") return filtered_data except Exception as e: print(f"处理数据框失败: {e}") raise def _fetch_online_data(self, start_date, end_date) -> pd.DataFrame: """在线获取30分钟K线数据 - 东方财富优先,新浪财经备用""" data = None # ===== 方法1: 东方财富数据源(主要数据源) ===== try: print("[DATA_SOURCE_1] 正在使用东方财富30分钟K线接口...") data = ak.index_zh_a_hist_min_em(symbol=self.symbol, period="30") if not data.empty and len(data) >= 50: print(f"[SUCCESS] 东方财富获取到{len(data)}条30分钟数据") print(f"[TIME_RANGE] 数据范围: {data.index[0]} 到 {data.index[-1]}") # 检查数据时效性 latest_time = data.index[-1] current_time = datetime.now() if hasattr(latest_time, 'hour') and hasattr(latest_time, 'minute'): time_delay = current_time - latest_time print(f"[DATA_DELAY] 数据延迟: {time_delay}") else: print(f"[INFO] 数据索引类型: {type(latest_time)}") else: print(f"[FAIL] 东方财富数据不足或为空") except Exception as e: print(f"[ERROR] 东方财富数据源失败: {e}") # ===== 方法2: 新浪财经数据源(备用数据源) ===== if data is None or data.empty or len(data) < 50: try: print("[DATA_SOURCE_2] 正在使用新浪财经30分钟K线接口...") import requests import json import re symbol = "sz399673" url = f"https://quotes.sina.cn/cn/api/jsonp_v2.php/var_{symbol}_30_1768824839904=/CN_MarketDataService.getKLineData?symbol={symbol}&scale=30&ma=no&datalen=1023" headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', 'Referer': 'https://finance.sina.com.cn/' } response = requests.get(url, headers=headers, timeout=15) response_text = response.text # 解析JSONP响应 array_pattern = r'=([\[ ].+?\])' match = re.search(array_pattern, response_text) if match: json_str = match.group(1) else: json_start = response_text.find('[') json_end = response_text.rfind(']') + 1 if json_start >= 0 and json_end > json_start: json_str = response_text[json_start:json_end] else: raise Exception("无法解析JSONP响应") data_dict = json.loads(json_str) if data_dict and isinstance(data_dict, list): data_list = [] for item in data_dict: try: data_list.append({ 'DateTime': item.get('day'), 'Open': float(item.get('open', 0)), 'High': float(item.get('high', 0)), 'Low': float(item.get('low', 0)), 'Close': float(item.get('close', 0)), 'Volume': float(item.get('volume', 0)), 'Amount': 0 }) except Exception: continue if data_list: data = pd.DataFrame(data_list) print(f"[SUCCESS] 新浪财经获取到{len(data)}条30分钟数据") print(f"[TIME_RANGE] 数据范围: {data.index[0]} 到 {data.index[-1]}") else: print("[FAIL] 新浪财经数据解析失败") else: print("[FAIL] 新浪财经返回数据格式错误") except Exception as e: print(f"[ERROR] 新浪财经数据源失败: {e}") # ===== 数据验证和格式化 ===== if data is None or data.empty: raise ValueError("[FATAL_ERROR] 所有数据源均无法获取数据") # 重命名列(针对东方财富数据格式) data.rename(columns={ '时间': 'DateTime', '开盘': 'Open', '收盘': 'Close', '最高': 'High', '最低': 'Low', '成交量': 'Volume', '成交额': 'Amount', '振幅': 'Amplitude', '涨跌幅': 'Change_Pct', '涨跌额': 'Change_Amount', '换手率': 'Turnover' }, inplace=True) # 设置时间索引 if 'DateTime' in data.columns: data['DateTime'] = pd.to_datetime(data['DateTime']) data.set_index('DateTime', inplace=True) else: raise ValueError("[ERROR] 数据中找不到时间列(DateTime)") data.sort_index(inplace=True) # 筛选时间范围 filtered_data = data[(data.index >= start_date) & (data.index <= end_date)].copy() if filtered_data.empty: print(f"[WARNING] 筛选后数据为空,可用范围: {data.index[0]} 到 {data.index[-1]}") raise ValueError(f"[ERROR] 指定时间范围没有数据") # 计算基础指标 filtered_data['Returns'] = filtered_data['Close'].pct_change() filtered_data['High_Low_Pct'] = (filtered_data['High'] - filtered_data['Low']) / filtered_data['Close'].shift(1) filtered_data['Close_Open_Pct'] = (filtered_data['Close'] - filtered_data['Open']) / filtered_data['Open'] # 处理缺失值 filtered_data = filtered_data.ffill() filtered_data = filtered_data.dropna() print(f"[FINAL_DATA] 成功处理{len(filtered_data)}条数据") print(f"[FINAL_RANGE] 最终数据范围: {filtered_data.index[0]} 到 {filtered_data.index[-1]}") return filtered_data def calculate_intraday_indicators(self, data: pd.DataFrame) -> pd.DataFrame: """计算30分钟技术指标""" print("正在计算30分钟技术指标...") df = data.copy() # 短期移动平均线 df['MA6'] = df['Close'].rolling(window=6).mean() # 3小时 df['MA12'] = df['Close'].rolling(window=12).mean() # 6小时 df['MA24'] = df['Close'].rolling(window=24).mean() # 12小时(一天) # RSI delta = df['Close'].diff() gain = (delta.where(delta > 0, 0)).rolling(window=14).mean() loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean() rs = gain / loss df['RSI'] = 100 - (100 / (1 + rs)) # 布林带 df['BB_middle'] = df['Close'].rolling(window=20).mean() bb_std = df['Close'].rolling(window=20).std() df['BB_upper'] = df['BB_middle'] + (bb_std * 2) df['BB_lower'] = df['BB_middle'] - (bb_std * 2) df['BB_width'] = (df['BB_upper'] - df['BB_lower']) / df['BB_middle'] # MACD exp1 = df['Close'].ewm(span=12, adjust=False).mean() exp2 = df['Close'].ewm(span=26, adjust=False).mean() df['MACD'] = exp1 - exp2 df['MACD_signal'] = df['MACD'].ewm(span=9, adjust=False).mean() df['MACD_hist'] = df['MACD'] - df['MACD_signal'] # KDJ low_9 = df['Low'].rolling(window=9).min() high_9 = df['High'].rolling(window=9).max() rsv = (df['Close'] - low_9) / (high_9 - low_9) * 100 df['K'] = rsv.ewm(com=2, adjust=False).mean() df['D'] = df['K'].ewm(com=2, adjust=False).mean() df['J'] = 3 * df['K'] - 2 * df['D'] # ATR high_low = df['High'] - df['Low'] high_close = abs(df['High'] - df['Close'].shift()) low_close = abs(df['Low'] - df['Close'].shift()) true_range = pd.concat([high_low, high_close, low_close], axis=1).max(axis=1) df['ATR'] = true_range.rolling(window=14).mean() df['ATR_Pct'] = df['ATR'] / df['Close'] # 动量指标 df['Momentum'] = df['Close'] / df['Close'].shift(4) - 1 # 2小时动量 # 成交量变化 df['Volume_MA'] = df['Volume'].rolling(window=12).mean() df['Volume_Ratio'] = df['Volume'] / df['Volume_MA'] # 价格动量 df['Price_Momentum'] = (df['Close'] - df['Close'].shift(6)) / df['Close'].shift(6) print("技术指标计算完成") return df # ==================== 只做多信号生成器 ==================== class LongOnlySignalGenerator: """只做多信号生成器(移除做空信号)""" def __init__(self): self.long_signal_count = 0 self.total_signal_count = 0 def generate_long_only_signals(self, data: pd.DataFrame) -> pd.DataFrame: """生成只做多信号""" print("正在生成只做多信号...") signals = [] df = data.copy() for i in range(24, len(df)): # 至少需要12小时(24个30分钟)的历史数据 current_bar = df.iloc[i] current_time = df.index[i] # 跳过不适合交易的时间段 if hasattr(current_time, 'hour'): # 有小时信息的30分钟数据 hour = current_time.hour if hour < 9 or hour > 15: # 只在交易时间内 continue # 生成基础信号数据 signal = { 'DateTime': str(current_time), 'Open': current_bar['Open'], 'High': current_bar['High'], 'Low': current_bar['Low'], 'Close': current_bar['Close'], 'Volume': current_bar['Volume'], 'RSI': current_bar['RSI'], 'MACD': current_bar['MACD'], 'MACD_hist': current_bar['MACD_hist'], 'K': current_bar['K'], 'D': current_bar['D'], 'J': current_bar['J'], 'ATR_Pct': current_bar['ATR_Pct'], 'Volume_Ratio': current_bar['Volume_Ratio'], 'Price_Momentum': current_bar['Price_Momentum'], 'Close_Open_Pct': current_bar['Close_Open_Pct'] } # 计算做多信号强度 long_score, long_signals = self._calculate_long_signals(current_bar, df, i) # 设置信号分数和描述 signal['Long_Score'] = long_score signal['Long_Signals'] = ', '.join(long_signals) if long_signals else '' # 决定最终信号方向和强度 final_signal = 0 signal_type = '' # 只做多信号判断 if long_score >= 4: final_signal = 1 signal_type = '做多翻转' self.long_signal_count += 1 self.total_signal_count = self.long_signal_count signal['Signal'] = final_signal signal['Signal_Type'] = signal_type signals.append(signal) signals_df = pd.DataFrame(signals) if len(signals_df) > 0: signals_df.set_index('DateTime', inplace=True) else: print("警告:没有生成任何信号") print(f"只做多信号生成完成") print(f"做多信号: {self.long_signal_count}个") if len(signals_df) > 0: print(f"信号密度: {self.total_signal_count/len(signals_df)*100:.2f}%") return signals_df def _calculate_long_signals(self, current_bar, df, i): """计算做多信号强度""" long_score = 0 long_signals = [] # 1. RSI超卖做多 if current_bar['RSI'] < 30: long_score += 2 long_signals.append("RSI超卖") elif current_bar['RSI'] < 35: long_score += 1 long_signals.append("RSI偏弱") # 2. KDJ超卖做多 if current_bar['K'] < 20 and current_bar['D'] < 20: long_score += 2 long_signals.append("KDJ超卖") elif current_bar['J'] < 0: long_score += 1 long_signals.append("KDJ极端超卖") # 3. MACD金叉 if current_bar['MACD_hist'] > 0 and df.iloc[i-1]['MACD_hist'] <= 0: long_score += 2 long_signals.append("MACD金叉") elif current_bar['MACD_hist'] > df.iloc[i-1]['MACD_hist']: long_score += 1 long_signals.append("MACD改善") # 4. 价格触及布林带下轨 if current_bar['Close'] <= current_bar['BB_lower'] * 1.005: long_score += 2 long_signals.append("触及下轨") elif current_bar['Close'] <= current_bar['BB_lower'] * 1.01: long_score += 1 long_signals.append("接近下轨") # 5. 连续下跌后的反转 recent_returns = df.iloc[i-6:i]['Returns'] if recent_returns.min() < -0.015: consecutive_decline = sum(recent_returns < 0) if consecutive_decline >= 4: long_score += 2 long_signals.append("连续下跌反转") # 6. 价格动量反转 if current_bar['Price_Momentum'] < -0.02: long_score += 1 long_signals.append("动量超卖") # 7. 成交量配合 if current_bar['Volume_Ratio'] > 1.2: long_score += 1 long_signals.append("放量配合") # 8. 当日开盘价格关系 try: daily_high = df[df.index.date == df.index[i].date()]['High'].max() daily_low = df[df.index.date == df.index[i].date()]['Low'].min() daily_range = daily_high - daily_low if daily_range > 0: position_in_day = (current_bar['Close'] - daily_low) / daily_range if position_in_day < 0.3: long_score += 1 long_signals.append("日内低位") except: pass # MA趋势过滤 if current_bar['MA6'] < current_bar['MA12'] < current_bar['MA24']: long_score -= 1 long_signals.append("MA下降趋势惩罚") elif current_bar['MA6'] > current_bar['MA12']: long_score += 1 long_signals.append("MA短期上行") return long_score, long_signals # ==================== T+1只做多交易执行器 ==================== class LongOnlyT1Executor: """只做多T+1交易执行器(当天买入,第二天才能卖出)""" def __init__(self, initial_capital=1000000): self.initial_capital = initial_capital self.params = { 'commission_rate': 0.0001, # 万分之一 'slippage_rate': 0.0, # 无滑点 'position_size_pct': 1.0, # 每次开仓100%仓位 'stop_loss_pct': 0.008, # 0.8%止损 'take_profit_pct': 0.02, # 2.0%止盈 'max_hold_bars': 16, # 最多持有8小时(16个30分钟) 'min_signal_strength': 4 # 最小信号强度 } def execute_long_only_t1_trades(self, signals_df: pd.DataFrame) -> tuple: """执行只做多T+1交易""" print("正在执行只做多T+1交易...") df = signals_df.copy() # 初始化 trades = [] capital = self.initial_capital # 持仓状态 long_position = 0 # 做多持仓数量 long_entry_price = 0 # 做多开仓价 long_entry_time = None # 做多开仓时间 long_entry_date = None # 做多开仓日期(用于T+1判断) long_holding_bars = 0 # 做多持仓周期 long_entry_signals = '' # 做多入场信号 long_t1_affected = False # 是否受到T+1限制影响 # 添加资金列 df = df.copy() df['capital'] = float(capital) df['long_position'] = 0 df['net_value'] = float(capital) for i in range(len(df)): current_time = pd.to_datetime(df.index[i]) current_bar = df.iloc[i] price = current_bar['Close'] current_date = current_time.date() # 更新当前净值 current_value = capital if long_position > 0: current_value += long_position * price df.iloc[i, df.columns.get_loc('net_value')] = current_value # 开仓逻辑 - 只在无持仓时开仓 if long_position == 0: # 做多开仓 if current_bar['Signal'] == 1: position_size = int((capital * self.params['position_size_pct']) / price) if position_size > 0: cost = position_size * price * (1 + self.params['commission_rate'] + self.params['slippage_rate']) if cost <= capital: long_position = position_size long_entry_price = price long_entry_time = current_time long_entry_date = current_date # 记录买入日期 long_entry_signals = current_bar.get('Long_Signals', '') long_holding_bars = 0 long_t1_affected = False # 重置T+1限制标记 capital -= cost # 计算预计止损止盈价格 long_stop_loss_price = long_entry_price * (1 - self.params['stop_loss_pct']) long_take_profit_price = long_entry_price * (1 + self.params['take_profit_pct']) df.iloc[i, df.columns.get_loc('long_position')] = long_position print(f"\n{'='*60}") print(f"[LONG_OPEN] 做多开仓信号 #{len(trades) + 1}") print(f"{'='*60}") print(f"开仓时间: {long_entry_time}") print(f"开仓日期: {long_entry_date} (T+1限制: 最早{long_entry_date + timedelta(days=1)}后可卖出)") print(f"开仓价格: {long_entry_price:.2f} 元") print(f"预计止损: {long_stop_loss_price:.2f} 元 (-{self.params['stop_loss_pct']*100:.1f}%)") print(f"预计止盈: {long_take_profit_price:.2f} 元 (+{self.params['take_profit_pct']*100:.1f}%)") print(f"持仓数量: {position_size} 股") print(f"开仓市值: {position_size * long_entry_price:,.2f} 元") print(f"交易成本: {cost:,.2f} 元") print(f"剩余资金: {capital:,.2f} 元") print(f"入场信号: {long_entry_signals}") print(f"总资产: {capital + position_size * long_entry_price:,.2f} 元") # 平仓逻辑 - 做多平仓(T+1限制:买入当天不能卖出) elif long_position > 0: long_holding_bars += 1 # T+1检查:买入当天不能卖出 is_t1_restricted = (current_date == long_entry_date) # 计算止损止盈价格 stop_loss = long_entry_price * (1 - self.params['stop_loss_pct']) take_profit = long_entry_price * (1 + self.params['take_profit_pct']) exit_signal = False exit_reason = '' exit_price = price # T+1限制:即使触发止损/止盈,当天也不能卖出 if is_t1_restricted: # 检查是否触发止损/止盈,但只记录不执行 if price <= stop_loss: print(f"[T+1限制] {current_time} 价格{price:.2f}触及止损线{stop_loss:.2f},但当天买入不能卖出,继续持有") long_t1_affected = True # 标记受到了T+1限制影响 elif price >= take_profit: print(f"[T+1限制] {current_time} 价格{price:.2f}触及止盈线{take_profit:.2f},但当天买入不能卖出,继续持有") long_t1_affected = True # 标记受到了T+1限制影响 else: # T+1已过,可以正常平仓 # 止损 if price <= stop_loss: exit_signal = True loss_pct = (long_entry_price - stop_loss) / long_entry_price * 100 exit_reason = f"做多止损触发(价格{price:.2f}跌破止损线{stop_loss:.2f},亏损{loss_pct:.2f}%)" exit_price = price # 止盈 elif price >= take_profit: exit_signal = True profit_pct = (price - long_entry_price) / long_entry_price * 100 exit_reason = f"做多止盈触发(价格{price:.2f}突破止盈线{take_profit:.2f},盈利{profit_pct:.2f}%)" exit_price = price # 最大持仓时间 elif long_holding_bars >= self.params['max_hold_bars']: exit_signal = True current_pnl_pct = (price - long_entry_price) / long_entry_price * 100 exit_reason = f"做多时间止损(持仓{long_holding_bars}周期达上限{self.params['max_hold_bars']}周期,当前盈亏{current_pnl_pct:+.2f}%)" # 做多信号消失 elif current_bar['RSI'] > 70: exit_signal = True current_pnl_pct = (price - long_entry_price) / long_entry_price * 100 exit_reason = f"做多RSI超买平仓(RSI={current_bar['RSI']:.1f}超买,信号消失,当前盈亏{current_pnl_pct:+.2f}%)" # 执行平仓 if exit_signal: # 计算盈亏 gross_pnl = (exit_price - long_entry_price) * long_position open_cost = long_position * long_entry_price * (self.params['commission_rate'] + self.params['slippage_rate']) close_revenue = long_position * exit_price close_cost = close_revenue * (self.params['commission_rate'] + self.params['slippage_rate']) pnl = gross_pnl - open_cost - close_cost # 更新资金 capital += close_revenue - close_cost # 记录交易 trade = { '交易方向': '做多', '开仓时间': long_entry_time, '平仓时间': current_time, '开仓价格': long_entry_price, '平仓价格': exit_price, '仓位': long_position, '盈亏金额': pnl, '盈亏百分比': (exit_price - long_entry_price) / long_entry_price * 100, '退出原因': exit_reason, '持仓周期数': long_holding_bars, '持仓小时数': long_holding_bars * 0.5, 'T+1限制': '是' if long_t1_affected else '否', '入场信号': long_entry_signals, '平仓时资金': capital, '开仓市值': long_position * long_entry_price, '预计止损价格': long_stop_loss_price, '预计止盈价格': long_take_profit_price } trades.append(trade) # 打印平仓详情 profit_ratio = (exit_price - long_entry_price) / long_entry_price * 100 status = "[PROFIT]" if pnl > 0 else "[LOSS]" print(f"\n{'='*60}") print(f"{status} [LONG_CLOSE] 做多平仓信号 #{len(trades)}") print(f"{'='*60}") print(f"平仓时间: {current_time}") print(f"平仓价格: {exit_price:.2f} 元") print(f"持仓时长: {long_holding_bars * 0.5:.1f} 小时 ({long_holding_bars} 个30分钟周期)") print(f"退出原因: {exit_reason}") print(f"{'-'*60}") print(f"盈亏金额: {pnl:+,.2f} 元") print(f"盈亏比例: {profit_ratio:+.2f}%") print(f"{'-'*60}") print(f"当前资金: {capital:,.2f} 元") print(f"累计收益率: {(capital / self.initial_capital - 1) * 100:+.2f}%") print(f"{'='*60}") # 重置做多持仓 long_position = 0 long_entry_price = 0 long_entry_time = None long_entry_date = None long_holding_bars = 0 # 更新资金和持仓状态 df.iloc[i, df.columns.get_loc('capital')] = capital df.iloc[i, df.columns.get_loc('long_position')] = long_position # 强制平仓剩余持仓 if long_position > 0: final_price = df.iloc[-1]['Close'] final_time = pd.to_datetime(df.index[-1]) gross_pnl = (final_price - long_entry_price) * long_position open_cost = long_position * long_entry_price * (self.params['commission_rate'] + self.params['slippage_rate']) close_revenue = long_position * final_price close_cost = close_revenue * (self.params['commission_rate'] + self.params['slippage_rate']) pnl = gross_pnl - open_cost - close_cost capital += close_revenue - close_cost trade = { '交易方向': '做多', '开仓时间': long_entry_time, '平仓时间': df.index[-1], '开仓价格': long_entry_price, '平仓价格': final_price, '仓位': long_position, '盈亏金额': pnl, '盈亏百分比': (final_price - long_entry_price) / long_entry_price * 100, '退出原因': f'做多强制平仓(回测结束,持仓{long_holding_bars}周期,最终价格{final_price:.2f},盈亏{(final_price - long_entry_price) / long_entry_price * 100:+.2f}%)', '持仓周期数': long_holding_bars, '持仓小时数': long_holding_bars * 0.5, 'T+1限制': '是' if long_t1_affected else '否', '入场信号': long_entry_signals, '平仓时资金': capital, '开仓市值': long_position * long_entry_price, '预计止损价格': long_stop_loss_price, '预计止盈价格': long_take_profit_price } trades.append(trade) print(f"\n{'='*60}") print(f"[FORCE] [LONG_CLOSE] 做多强制平仓信号 #{len(trades)}") print(f"{'='*60}") print(f"平仓时间: {df.index[-1]}") print(f"平仓价格: {final_price:.2f} 元") print(f"退出原因: 做多强制平仓(回测结束)") print(f"T+1限制: {'是' if long_t1_affected else '否'}") print(f"盈亏金额: {pnl:+,.2f} 元") print(f"{'='*60}") trades_df = pd.DataFrame(trades) if len(trades_df) > 0: # 统一时间格式 for col in trades_df.columns: if '时间' in col: trades_df[col] = pd.to_datetime(trades_df[col]) trades_df = trades_df.sort_values('开仓时间') print(f"只做多T+1交易执行完成,共{len(trades_df)}笔交易") return df, trades_df # ==================== 验证分析模块 ==================== def validate_long_only_t1_results(results_df, trades_df, initial_capital): """验证只做多T+1交易结果""" print("\n" + "=" * 80) print("创业板50 30分钟只做多T+1交易结果验证") print("=" * 80) print(f"\n【基础数据验证】") final_capital = results_df['net_value'].iloc[-1] total_return = (final_capital - initial_capital) / initial_capital * 100 print(f"初始资金: {initial_capital:,.2f}元") print(f"最终资金: {final_capital:,.2f}元") print(f"总收益率: {total_return:.2f}%") print(f"交易次数: {len(trades_df)}笔") if len(trades_df) > 0: # 只做多统计 long_trades = trades_df print(f"\n【只做多交易统计】") long_win_trades = long_trades[long_trades['盈亏金额'] > 0] long_lose_trades = long_trades[long_trades['盈亏金额'] < 0] long_total_pnl = long_trades['盈亏金额'].sum() print(f"总交易数: {len(long_trades)}笔") print(f"盈利交易: {len(long_win_trades)}笔 ({len(long_win_trades)/len(long_trades)*100:.1f}%)") print(f"亏损交易: {len(long_lose_trades)}笔 ({len(long_lose_trades)/len(long_trades)*100:.1f}%)") print(f"平均持仓时间: {long_trades['持仓小时数'].mean():.1f}小时") print(f"平均收益率: {long_trades['盈亏百分比'].mean():.2f}%") print(f"总盈亏: {long_total_pnl:+,.2f}元") # 按退出原因统计 print(f"\n【退出原因统计】") for reason, count in trades_df['退出原因'].value_counts().items(): percentage = count / len(trades_df) * 100 reason_pnl = trades_df[trades_df['退出原因'] == reason]['盈亏金额'].sum() print(f" {reason}: {count}次 ({percentage:.1f}%) - 总盈亏: {reason_pnl:+,.2f}元") # T+1影响统计 if 'T+1限制' in trades_df.columns: t1_affected = trades_df[trades_df['T+1限制'] == '是'] print(f"\n【T+1限制影响】") print(f"受T+1限制的交易: {len(t1_affected)}笔 ({len(t1_affected)/len(trades_df)*100:.1f}%)") if len(t1_affected) > 0: print(f"T+1限制交易盈亏: {t1_affected['盈亏金额'].sum():+,.2f}元") # ==================== 主程序 ==================== def main(): """主程序 - 运行30分钟只做多T+1策略""" print("=" * 80) print("创业板50 30分钟只做多T+1策略") print("=" * 80) # 加载配置文件 config_manager = ConfigManager('config.json') # 自动加载best_parameters.json覆盖默认参数 best_params_file = 'best_parameters.json' if os.path.exists(best_params_file): try: with open(best_params_file, 'r', encoding='utf-8') as f: best_data = json.load(f) best_params = best_data.get('best_params', {}) strategy_keys = ['position_size_pct', 'stop_loss_pct', 'take_profit_pct', 'max_hold_bars'] for key in strategy_keys: if key in best_params: config_manager.config.setdefault('strategy', {})[key] = best_params[key] print(f"已加载优化参数: {best_params_file}") except Exception as e: print(f"加载优化参数失败: {e},使用config.json默认参数") # 从配置文件读取参数 BACKTEST_START_DATE = config_manager.get('strategy', 'backtest_start_date', "2025-10-01") PREWARMP_DAYS = config_manager.get('strategy', 'prewamp_days', 30) INITIAL_CAPITAL = config_manager.get('strategy', 'initial_capital', 1000000) # 读取截止时间配置,支持"now"或具体日期 backtest_end_config = config_manager.get('strategy', 'backtest_end_date', "now") if backtest_end_config.lower() == "now": BACKTEST_END_DATE = datetime.now().strftime('%Y-%m-%d') else: BACKTEST_END_DATE = backtest_end_config # 转换日期格式 start_date = datetime.strptime(BACKTEST_START_DATE, "%Y-%m-%d") end_date = datetime.strptime(BACKTEST_END_DATE, "%Y-%m-%d").replace(hour=23, minute=59, second=59) # 计算数据获取开始时间(回测开始时间 - 预热期) data_start_date = start_date - timedelta(days=PREWARMP_DAYS) # 显示数据源配置 use_local_file = config_manager.get('data_source', 'use_local_file', False) data_source_mode = "本地文件模式" if use_local_file else "在线获取模式" local_file_path = config_manager.get('data_source', 'local_file_path', '') print(f"\n策略参数:") print(f" 回测期间: {BACKTEST_START_DATE} 至 {BACKTEST_END_DATE}") print(f" 数据获取期间: {data_start_date.strftime('%Y-%m-%d')} 至 {BACKTEST_END_DATE}") print(f" 指标预热期: {PREWARMP_DAYS}天") print(f" K线周期: 30分钟") print(f" 初始资金: {INITIAL_CAPITAL:,}元") print(f" 标的指数: 创业板50 (399673)") print(f" 交易方向: 只做多(T+1限制)") print(f" 数据源: {data_source_mode}") if use_local_file: print(f" 本地文件路径: {local_file_path}") try: # Phase 1: 数据获取 print(f"\n【Phase 1: 30分钟数据获取】") fetcher = IntradayDataFetcher(config_manager) # 获取包含预热期的完整数据 full_data = fetcher.fetch_30min_data(start_date=data_start_date, end_date=end_date) full_data = fetcher.calculate_intraday_indicators(full_data) # 筛选回测期间的数据 original_len = len(full_data) backtest_data = full_data[(full_data.index >= start_date) & (full_data.index <= end_date)].copy() print(f"筛选回测数据: {original_len} -> {len(backtest_data)} 条") print(f"回测数据范围: {backtest_data.index[0]} 到 {backtest_data.index[-1]}") # Phase 2: 信号生成 print(f"\n【Phase 2: 只做多信号生成】") signal_gen = LongOnlySignalGenerator() signals_df = signal_gen.generate_long_only_signals(backtest_data) # Phase 3: 交易执行 print(f"\n【Phase 3: 只做多T+1交易执行】") executor = LongOnlyT1Executor(initial_capital=INITIAL_CAPITAL) # 用配置参数覆盖默认值 executor.params['stop_loss_pct'] = config_manager.get('strategy', 'stop_loss_pct', 0.008) executor.params['take_profit_pct'] = config_manager.get('strategy', 'take_profit_pct', 0.02) executor.params['max_hold_bars'] = config_manager.get('strategy', 'max_hold_bars', 16) executor.params['position_size_pct'] = config_manager.get('strategy', 'position_size_pct', 1.0) results_df, trades_df = executor.execute_long_only_t1_trades(signals_df) # Phase 4: 验证分析 print(f"\n【Phase 4: 结果验证与分析】") validate_long_only_t1_results(results_df, trades_df, INITIAL_CAPITAL) # Phase 5: 导出数据 if len(trades_df) > 0: print(f"\n【Phase 5: 导出交易数据】") # 统一时间格式 for col in trades_df.columns: if '时间' in col: trades_df[col] = pd.to_datetime(trades_df[col]).dt.strftime('%Y-%m-%d %H:%M:%S') # 生成带时间戳的文件名 timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') output_file = f'cyb50_30min_long_only_t1_trades_{timestamp}.csv' trades_df.to_csv(output_file, index=False, encoding='utf-8-sig') print(f"只做多T+1交易记录已保存到: {output_file}") print(f"时间戳格式: YYYY-MM-DD HH:MM:SS") # 策略总结 print(f"\n" + "=" * 80) print("只做多T+1策略运行总结") print("=" * 80) if len(trades_df) > 0: final_capital = results_df['net_value'].iloc[-1] total_return = (final_capital - INITIAL_CAPITAL) / INITIAL_CAPITAL * 100 print(f"初始资金: {INITIAL_CAPITAL:,.2f}元") print(f"最终资金: {final_capital:,.2f}元") print(f"总收益率: {total_return:.2f}%") print(f"总交易次数: {len(trades_df)}笔") print(f"整体胜率: {(trades_df['盈亏金额'] > 0).sum() / len(trades_df) * 100:.1f}%") print(f"平均收益率: {trades_df['盈亏百分比'].mean():.2f}%") print(f"最大单笔盈利: {trades_df['盈亏金额'].max():+,.2f}元") print(f"最大单笔亏损: {trades_df['盈亏金额'].min():+,.2f}元") print(f"\n[SUCCESS] 只做多T+1策略运行成功!") else: print("未产生任何交易信号") except Exception as e: print(f"\n[ERROR] 只做多T+1策略运行出错: {str(e)}") import traceback traceback.print_exc() finally: print(f"\n" + "=" * 80) if __name__ == "__main__": main()