| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393 |
- import pandas as pd
- import numpy as np
- import akshare as ak
- import warnings
- import json
- import os
- from datetime import datetime, timedelta
- warnings.filterwarnings('ignore')
- # ==================== 配置管理模块 ====================
- class ConfigManager:
- """配置文件管理类"""
-
- def __init__(self, config_file='config.json'):
- self.config_file = config_file
- self.config = self.load_config()
-
- def load_config(self):
- """加载配置文件"""
- try:
- if os.path.exists(self.config_file):
- with open(self.config_file, 'r', encoding='utf-8') as f:
- config = json.load(f)
- print(f"配置文件加载成功: {self.config_file}")
- return config
- else:
- print(f"配置文件不存在,使用默认配置: {self.config_file}")
- return self.get_default_config()
- except Exception as e:
- print(f"配置文件加载失败: {e},使用默认配置")
- return self.get_default_config()
-
- def get_default_config(self):
- """获取默认配置"""
- return {
- "data_source": {
- "use_local_file": False,
- "local_file_path": "D:\\work\\project\\catfly\\data\\SZ#399673_30min.csv"
- },
- "strategy": {
- "initial_capital": 1000000,
- "backtest_start_date": "2025-10-01",
- "prewamp_days": 30,
- "position_size_pct": 1.0,
- "stop_loss_pct": 0.008,
- "take_profit_pct": 0.015,
- "max_hold_bars": 16
- }
- }
-
- def get(self, section, key, default=None):
- """获取配置项"""
- try:
- return self.config.get(section, {}).get(key, default)
- except:
- return default
-
- def save_config(self):
- """保存配置到文件"""
- try:
- with open(self.config_file, 'w', encoding='utf-8') as f:
- json.dump(self.config, f, indent=2, ensure_ascii=False)
- print(f"配置文件保存成功: {self.config_file}")
- except Exception as e:
- print(f"配置文件保存失败: {e}")
- # ==================== 数据获取模块 ====================
- class IntradayDataFetcher:
- """30分钟K线数据获取类"""
-
- def __init__(self, config_manager=None):
- self.symbol = "399673" # 创业板50指数
- self.config_manager = config_manager
-
- def fetch_30min_data(self, start_date=None, end_date=None) -> pd.DataFrame:
- """获取指定时间范围的30分钟K线数据"""
- try:
- if start_date is None:
- start_date = datetime.now() - timedelta(days=60)
- if end_date is None:
- end_date = datetime.now()
-
- print(f"正在获取创业板50指数的30分钟K线数据...")
- print(f"时间范围: {start_date.strftime('%Y-%m-%d')} 至 {end_date.strftime('%Y-%m-%d')}")
-
- # 检查数据源开关
- use_local_file = False
- local_file_path = ""
-
- if self.config_manager:
- use_local_file = self.config_manager.get('data_source', 'use_local_file', False)
- local_file_path = self.config_manager.get('data_source', 'local_file_path', '')
-
- # 如果开关打开,从本地文件读取
- if use_local_file:
- print(f"数据源开关: 本地文件模式")
- print(f"本地文件路径: {local_file_path}")
- return self._load_local_file(local_file_path, start_date, end_date)
- else:
- print(f"数据源开关: 在线获取模式")
- return self._fetch_online_data(start_date, end_date)
-
- except Exception as e:
- print(f"获取数据时出错: {str(e)}")
- raise
-
- def _load_local_file(self, file_path, start_date, end_date) -> pd.DataFrame:
- """从本地文件加载数据"""
- try:
- if not os.path.exists(file_path):
- raise FileNotFoundError(f"本地文件不存在: {file_path}")
-
- print(f"正在从本地文件读取数据...")
- print(f"文件路径: {file_path}")
-
- # 检查文件扩展名,选择不同的读取方式
- if file_path.endswith('.txt'):
- # 处理文本格式文件
- print("检测到文本格式文件,使用文本解析模式...")
- return self._parse_text_file(file_path, start_date, end_date)
- else:
- # 处理CSV格式文件
- print("检测到CSV格式文件,使用CSV解析模式...")
- data = pd.read_csv(file_path)
- return self._process_dataframe(data, start_date, end_date)
-
- except Exception as e:
- print(f"从本地文件加载数据失败: {e}")
- raise
-
- def _parse_text_file(self, file_path, start_date, end_date) -> pd.DataFrame:
- """解析文本格式的数据文件"""
- try:
- data_list = []
-
- # 尝试多种编码格式
- encodings = ['gbk', 'gb2312', 'utf-8', 'latin-1']
- lines = None
-
- for encoding in encodings:
- try:
- with open(file_path, 'r', encoding=encoding) as f:
- lines = f.readlines()
- print(f"成功使用编码: {encoding}")
- break
- except UnicodeDecodeError:
- continue
-
- if lines is None:
- raise ValueError("无法读取文件,尝试了多种编码格式都失败")
-
- # 跳过前两行(标题行)
- for line in lines[2:]:
- line = line.strip()
- if not line:
- continue
-
- parts = line.split()
- if len(parts) >= 7:
- try:
- # 格式: 日期 时间 开盘 最高 最低 收盘 成交量 成交额
- date_time_str = f"{parts[0]} {parts[1]}"
- datetime_obj = pd.to_datetime(date_time_str, format='%Y/%m/%d %H%M')
-
- data_list.append({
- 'DateTime': datetime_obj,
- 'Open': float(parts[2]),
- 'High': float(parts[3]),
- 'Low': float(parts[4]),
- 'Close': float(parts[5]),
- 'Volume': float(parts[6]),
- 'Amount': float(parts[7]) if len(parts) > 7 else 0
- })
- except (ValueError, IndexError) as e:
- print(f"跳过异常行: {line[:50]}... 错误: {e}")
- continue
-
- if not data_list:
- raise ValueError("文本文件中没有解析到有效数据")
-
- print(f"成功解析 {len(data_list)} 条数据")
- data = pd.DataFrame(data_list)
- return self._process_dataframe(data, start_date, end_date)
-
- except Exception as e:
- print(f"解析文本文件失败: {e}")
- raise
-
- def _process_dataframe(self, data, start_date, end_date) -> pd.DataFrame:
- """处理和标准化数据框"""
- try:
- # 检查并转换列名
- print(f"原始数据列名: {data.columns.tolist()}")
- print(f"原始数据行数: {len(data)}")
-
- # 标准化列名
- column_mapping = {
- '时间': 'DateTime', '日期': 'DateTime', 'datetime': 'DateTime', 'time': 'DateTime',
- '开盘': 'Open', 'open': 'Open', 'Open': 'Open', '开盘价': 'Open',
- '收盘': 'Close', 'close': 'Close', 'Close': 'Close', '收盘价': 'Close',
- '最高': 'High', 'high': 'High', 'High': 'High', '最高价': 'High',
- '最低': 'Low', 'low': 'Low', 'Low': 'Low', '最低价': 'Low',
- '成交量': 'Volume', 'volume': 'Volume', 'Volume': 'Volume', 'vol': 'Volume'
- }
-
- # 重命名列
- data.rename(columns=column_mapping, inplace=True)
-
- # 设置时间索引
- if 'DateTime' in data.columns:
- data['DateTime'] = pd.to_datetime(data['DateTime'])
- data.set_index('DateTime', inplace=True)
- else:
- raise ValueError("数据中找不到时间列(DateTime/时间/日期)")
-
- data.sort_index(inplace=True)
-
- # 筛选时间范围
- filtered_data = data[(data.index >= start_date) & (data.index <= end_date)].copy()
-
- if filtered_data.empty:
- print(f"警告:指定时间范围没有数据")
- print(f"可用数据范围: {data.index[0]} 到 {data.index[-1]}")
- print(f"请求的时间范围: {start_date} 到 {end_date}")
- # 返回空数据框而不是抛出异常
- return filtered_data
-
- # 确保必需的列存在
- required_columns = ['Open', 'High', 'Low', 'Close', 'Volume']
- missing_columns = [col for col in required_columns if col not in filtered_data.columns]
- if missing_columns:
- raise ValueError(f"数据缺少必需的列: {missing_columns}")
-
- # 添加缺失的列
- if 'Amount' not in filtered_data.columns:
- filtered_data['Amount'] = 0
-
- # 计算基础指标(本地文件可能缺少这些)
- if 'Returns' not in filtered_data.columns:
- filtered_data['Returns'] = filtered_data['Close'].pct_change()
- if 'High_Low_Pct' not in filtered_data.columns:
- filtered_data['High_Low_Pct'] = (filtered_data['High'] - filtered_data['Low']) / filtered_data['Close'].shift(1)
- if 'Close_Open_Pct' not in filtered_data.columns:
- filtered_data['Close_Open_Pct'] = (filtered_data['Close'] - filtered_data['Open']) / filtered_data['Open']
-
- # 处理缺失值
- filtered_data.ffill(inplace=True)
- filtered_data.dropna(inplace=True)
- print(f"本地文件数据处理成功: {len(filtered_data)}条")
- print(f"数据范围: {filtered_data.index[0]} 到 {filtered_data.index[-1]}")
-
- return filtered_data
-
- except Exception as e:
- print(f"处理数据框失败: {e}")
- raise
-
- def _fetch_online_data(self, start_date, end_date) -> pd.DataFrame:
- """在线获取30分钟K线数据 - 东方财富优先,新浪财经备用"""
- data = None
-
- # ===== 方法1: 东方财富数据源(主要数据源) =====
- try:
- print("[DATA_SOURCE_1] 正在使用东方财富30分钟K线接口...")
- data = ak.index_zh_a_hist_min_em(symbol=self.symbol, period="30")
-
- if not data.empty and len(data) >= 50:
- print(f"[SUCCESS] 东方财富获取到{len(data)}条30分钟数据")
- print(f"[TIME_RANGE] 数据范围: {data.index[0]} 到 {data.index[-1]}")
-
- # 检查数据时效性
- latest_time = data.index[-1]
- current_time = datetime.now()
- if hasattr(latest_time, 'hour') and hasattr(latest_time, 'minute'):
- time_delay = current_time - latest_time
- print(f"[DATA_DELAY] 数据延迟: {time_delay}")
- else:
- print(f"[INFO] 数据索引类型: {type(latest_time)}")
-
- else:
- print(f"[FAIL] 东方财富数据不足或为空")
-
- except Exception as e:
- print(f"[ERROR] 东方财富数据源失败: {e}")
-
- # ===== 方法2: 新浪财经数据源(备用数据源) =====
- if data is None or data.empty or len(data) < 50:
- try:
- print("[DATA_SOURCE_2] 正在使用新浪财经30分钟K线接口...")
- import requests
- import json
- import re
-
- symbol = "sz399673"
- url = f"https://quotes.sina.cn/cn/api/jsonp_v2.php/var_{symbol}_30_1768824839904=/CN_MarketDataService.getKLineData?symbol={symbol}&scale=30&ma=no&datalen=1023"
-
- headers = {
- 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
- 'Referer': 'https://finance.sina.com.cn/'
- }
-
- response = requests.get(url, headers=headers, timeout=15)
- response_text = response.text
-
- # 解析JSONP响应
- array_pattern = r'=([\[ ].+?\])'
- match = re.search(array_pattern, response_text)
-
- if match:
- json_str = match.group(1)
- else:
- json_start = response_text.find('[')
- json_end = response_text.rfind(']') + 1
- if json_start >= 0 and json_end > json_start:
- json_str = response_text[json_start:json_end]
- else:
- raise Exception("无法解析JSONP响应")
-
- data_dict = json.loads(json_str)
-
- if data_dict and isinstance(data_dict, list):
- data_list = []
- for item in data_dict:
- try:
- data_list.append({
- 'DateTime': item.get('day'),
- 'Open': float(item.get('open', 0)),
- 'High': float(item.get('high', 0)),
- 'Low': float(item.get('low', 0)),
- 'Close': float(item.get('close', 0)),
- 'Volume': float(item.get('volume', 0)),
- 'Amount': 0
- })
- except Exception:
- continue
-
- if data_list:
- data = pd.DataFrame(data_list)
- print(f"[SUCCESS] 新浪财经获取到{len(data)}条30分钟数据")
- print(f"[TIME_RANGE] 数据范围: {data.index[0]} 到 {data.index[-1]}")
- else:
- print("[FAIL] 新浪财经数据解析失败")
- else:
- print("[FAIL] 新浪财经返回数据格式错误")
-
- except Exception as e:
- print(f"[ERROR] 新浪财经数据源失败: {e}")
-
- # ===== 数据验证和格式化 =====
- if data is None or data.empty:
- raise ValueError("[FATAL_ERROR] 所有数据源均无法获取数据")
-
- # 重命名列(针对东方财富数据格式)
- data.rename(columns={
- '时间': 'DateTime', '开盘': 'Open', '收盘': 'Close',
- '最高': 'High', '最低': 'Low', '成交量': 'Volume',
- '成交额': 'Amount', '振幅': 'Amplitude', '涨跌幅': 'Change_Pct',
- '涨跌额': 'Change_Amount', '换手率': 'Turnover'
- }, inplace=True)
-
- # 设置时间索引
- if 'DateTime' in data.columns:
- data['DateTime'] = pd.to_datetime(data['DateTime'])
- data.set_index('DateTime', inplace=True)
- else:
- raise ValueError("[ERROR] 数据中找不到时间列(DateTime)")
-
- data.sort_index(inplace=True)
-
- # 筛选时间范围
- filtered_data = data[(data.index >= start_date) & (data.index <= end_date)].copy()
-
- if filtered_data.empty:
- print(f"[WARNING] 筛选后数据为空,可用范围: {data.index[0]} 到 {data.index[-1]}")
- raise ValueError(f"[ERROR] 指定时间范围没有数据")
-
- # 计算基础指标
- filtered_data['Returns'] = filtered_data['Close'].pct_change()
- filtered_data['High_Low_Pct'] = (filtered_data['High'] - filtered_data['Low']) / filtered_data['Close'].shift(1)
- filtered_data['Close_Open_Pct'] = (filtered_data['Close'] - filtered_data['Open']) / filtered_data['Open']
-
- # 处理缺失值
- filtered_data.ffill(inplace=True)
- filtered_data.dropna(inplace=True)
- print(f"[FINAL_DATA] 成功处理{len(filtered_data)}条数据")
- print(f"[FINAL_RANGE] 最终数据范围: {filtered_data.index[0]} 到 {filtered_data.index[-1]}")
-
- return filtered_data
-
- def calculate_intraday_indicators(self, data: pd.DataFrame) -> pd.DataFrame:
- """计算30分钟技术指标"""
- print("正在计算30分钟技术指标...")
- df = data.copy()
-
- # 短期移动平均线
- df['MA6'] = df['Close'].rolling(window=6).mean() # 3小时
- df['MA12'] = df['Close'].rolling(window=12).mean() # 6小时
- df['MA24'] = df['Close'].rolling(window=24).mean() # 12小时(一天)
-
- # RSI
- delta = df['Close'].diff()
- gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
- loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
- rs = gain / loss
- df['RSI'] = 100 - (100 / (1 + rs))
-
- # 布林带
- df['BB_middle'] = df['Close'].rolling(window=20).mean()
- bb_std = df['Close'].rolling(window=20).std()
- df['BB_upper'] = df['BB_middle'] + (bb_std * 2)
- df['BB_lower'] = df['BB_middle'] - (bb_std * 2)
- df['BB_width'] = (df['BB_upper'] - df['BB_lower']) / df['BB_middle']
-
- # MACD
- exp1 = df['Close'].ewm(span=12, adjust=False).mean()
- exp2 = df['Close'].ewm(span=26, adjust=False).mean()
- df['MACD'] = exp1 - exp2
- df['MACD_signal'] = df['MACD'].ewm(span=9, adjust=False).mean()
- df['MACD_hist'] = df['MACD'] - df['MACD_signal']
-
- # KDJ
- low_9 = df['Low'].rolling(window=9).min()
- high_9 = df['High'].rolling(window=9).max()
- rsv = (df['Close'] - low_9) / (high_9 - low_9) * 100
- df['K'] = rsv.ewm(com=2, adjust=False).mean()
- df['D'] = df['K'].ewm(com=2, adjust=False).mean()
- df['J'] = 3 * df['K'] - 2 * df['D']
-
- # ATR
- high_low = df['High'] - df['Low']
- high_close = abs(df['High'] - df['Close'].shift())
- low_close = abs(df['Low'] - df['Close'].shift())
- true_range = pd.concat([high_low, high_close, low_close], axis=1).max(axis=1)
- df['ATR'] = true_range.rolling(window=14).mean()
- df['ATR_Pct'] = df['ATR'] / df['Close']
-
- # 动量指标
- df['Momentum'] = df['Close'] / df['Close'].shift(4) - 1 # 2小时动量
-
- # 成交量变化
- df['Volume_MA'] = df['Volume'].rolling(window=12).mean()
- df['Volume_Ratio'] = df['Volume'] / df['Volume_MA']
-
- # 价格动量
- df['Price_Momentum'] = (df['Close'] - df['Close'].shift(6)) / df['Close'].shift(6)
-
- print("技术指标计算完成")
- return df
- # ==================== 多空双向信号生成器 ====================
- class DualDirectionSignalGenerator:
- """多空双向信号生成器"""
-
- def __init__(self):
- self.long_signal_count = 0
- self.short_signal_count = 0
- self.total_signal_count = 0
-
- def generate_dual_direction_signals(self, data: pd.DataFrame) -> pd.DataFrame:
- """生成多空双向信号"""
- print("正在生成多空双向信号...")
-
- signals = []
- df = data.copy()
-
- for i in range(24, len(df)): # 至少需要12小时(24个30分钟)的历史数据
- current_bar = df.iloc[i]
- current_time = df.index[i]
-
- # 跳过不适合交易的时间段
- if hasattr(current_time, 'hour'): # 有小时信息的30分钟数据
- hour = current_time.hour
- if hour < 9 or hour > 15: # 只在交易时间内
- continue
-
- # 生成基础信号数据
- signal = {
- 'DateTime': str(current_time),
- 'Open': current_bar['Open'],
- 'High': current_bar['High'],
- 'Low': current_bar['Low'],
- 'Close': current_bar['Close'],
- 'Volume': current_bar['Volume'],
- 'RSI': current_bar['RSI'],
- 'MACD': current_bar['MACD'],
- 'MACD_hist': current_bar['MACD_hist'],
- 'K': current_bar['K'],
- 'D': current_bar['D'],
- 'J': current_bar['J'],
- 'ATR_Pct': current_bar['ATR_Pct'],
- 'Volume_Ratio': current_bar['Volume_Ratio'],
- 'Price_Momentum': current_bar['Price_Momentum'],
- 'Close_Open_Pct': current_bar['Close_Open_Pct']
- }
-
- # 计算做多信号强度
- long_score, long_signals = self._calculate_long_signals(current_bar, df, i)
-
- # 计算做空信号强度
- short_score, short_signals = self._calculate_short_signals(current_bar, df, i)
-
- # 设置信号分数和描述
- signal['Long_Score'] = long_score
- signal['Long_Signals'] = ', '.join(long_signals) if long_signals else ''
- signal['Short_Score'] = short_score
- signal['Short_Signals'] = ', '.join(short_signals) if short_signals else ''
-
- # 决定最终信号方向和强度
- final_signal = 0
- signal_type = ''
-
- # 信号优先级和冲突处理
- if long_score >= 4 and short_score >= 4:
- # 两个方向都达到阈值,选择信号强度更高的
- if long_score > short_score:
- final_signal = 1
- signal_type = f'做多翻转(强度{long_score} vs {short_score})'
- self.long_signal_count += 1
- elif short_score > long_score:
- final_signal = -1
- signal_type = f'做空反转(强度{short_score} vs {long_score})'
- self.short_signal_count += 1
- else:
- # 强度相等时,根据当前价格位置决定
- bb_position = (current_bar['Close'] - current_bar['BB_lower']) / (current_bar['BB_upper'] - current_bar['BB_lower'])
- if bb_position < 0.3: # 偏向下轨,优先做多
- final_signal = 1
- signal_type = f'做多翻转(位置优先)'
- self.long_signal_count += 1
- elif bb_position > 0.7: # 偏向上轨,优先做空
- final_signal = -1
- signal_type = f'做空反转(位置优先)'
- self.short_signal_count += 1
- else:
- # 中间位置,暂不开仓
- final_signal = 0
- signal_type = '信号冲突(强度相等)'
-
- elif long_score >= 4:
- final_signal = 1
- signal_type = '做多翻转'
- self.long_signal_count += 1
-
- elif short_score >= 4:
- final_signal = -1
- signal_type = '做空反转'
- self.short_signal_count += 1
-
- self.total_signal_count = self.long_signal_count + self.short_signal_count
-
- signal['Signal'] = final_signal
- signal['Signal_Type'] = signal_type
-
- signals.append(signal)
-
- signals_df = pd.DataFrame(signals)
-
- if len(signals_df) > 0:
- signals_df.set_index('DateTime', inplace=True)
- else:
- print("警告:没有生成任何信号")
-
- print(f"多空双向信号生成完成")
- print(f"做多信号: {self.long_signal_count}个")
- print(f"做空信号: {self.short_signal_count}个")
- print(f"总信号: {self.total_signal_count}个")
- if len(signals_df) > 0:
- print(f"信号密度: {self.total_signal_count/len(signals_df)*100:.2f}%")
-
- return signals_df
-
- def _calculate_long_signals(self, current_bar, df, i):
- """计算做多信号强度"""
- long_score = 0
- long_signals = []
-
- # 1. RSI超卖做多
- if current_bar['RSI'] < 30:
- long_score += 2
- long_signals.append("RSI超卖")
- elif current_bar['RSI'] < 35:
- long_score += 1
- long_signals.append("RSI偏弱")
-
- # 2. KDJ超卖做多
- if current_bar['K'] < 20 and current_bar['D'] < 20:
- long_score += 2
- long_signals.append("KDJ超卖")
- elif current_bar['J'] < 0:
- long_score += 1
- long_signals.append("KDJ极端超卖")
-
- # 3. MACD金叉
- if current_bar['MACD_hist'] > 0 and df.iloc[i-1]['MACD_hist'] <= 0:
- long_score += 2
- long_signals.append("MACD金叉")
- elif current_bar['MACD_hist'] > df.iloc[i-1]['MACD_hist']:
- long_score += 1
- long_signals.append("MACD改善")
-
- # 4. 价格触及布林带下轨
- if current_bar['Close'] <= current_bar['BB_lower'] * 1.005:
- long_score += 2
- long_signals.append("触及下轨")
- elif current_bar['Close'] <= current_bar['BB_lower'] * 1.01:
- long_score += 1
- long_signals.append("接近下轨")
-
- # 5. 连续下跌后的反转
- recent_returns = df.iloc[i-6:i]['Returns']
- if recent_returns.min() < -0.015:
- consecutive_decline = sum(recent_returns < 0)
- if consecutive_decline >= 4:
- long_score += 2
- long_signals.append("连续下跌反转")
-
- # 6. 价格动量反转
- if current_bar['Price_Momentum'] < -0.02:
- long_score += 1
- long_signals.append("动量超卖")
-
- # 7. 成交量配合
- if current_bar['Volume_Ratio'] > 1.2:
- long_score += 1
- long_signals.append("放量配合")
-
- # 8. 当日开盘价格关系
- try:
- daily_high = df[df.index.date == df.index[i].date()]['High'].max()
- daily_low = df[df.index.date == df.index[i].date()]['Low'].min()
- daily_range = daily_high - daily_low
-
- if daily_range > 0:
- position_in_day = (current_bar['Close'] - daily_low) / daily_range
- if position_in_day < 0.3:
- long_score += 1
- long_signals.append("日内低位")
- except:
- pass
- # MA趋势过滤
- if current_bar['MA6'] < current_bar['MA12'] < current_bar['MA24']:
- long_score -= 1
- long_signals.append("MA下降趋势惩罚")
- elif current_bar['MA6'] > current_bar['MA12']:
- long_score += 1
- long_signals.append("MA短期上行")
- return long_score, long_signals
- def _calculate_short_signals(self, current_bar, df, i):
- """计算做空信号强度"""
- short_score = 0
- short_signals = []
-
- # 1. RSI超买做空
- if current_bar['RSI'] > 70:
- short_score += 2
- short_signals.append("RSI超买")
- elif current_bar['RSI'] > 65:
- short_score += 1
- short_signals.append("RSI偏强")
-
- # 2. KDJ超买做空
- if current_bar['K'] > 80 and current_bar['D'] > 80:
- short_score += 2
- short_signals.append("KDJ超买")
- elif current_bar['J'] > 100:
- short_score += 1
- short_signals.append("KDJ极端超买")
-
- # 3. MACD死叉
- if current_bar['MACD_hist'] < 0 and df.iloc[i-1]['MACD_hist'] >= 0:
- short_score += 2
- short_signals.append("MACD死叉")
- elif current_bar['MACD_hist'] < df.iloc[i-1]['MACD_hist']:
- short_score += 1
- short_signals.append("MACD恶化")
-
- # 4. 价格触及布林带上轨
- if current_bar['Close'] >= current_bar['BB_upper'] * 0.995:
- short_score += 2
- short_signals.append("触及上轨")
- elif current_bar['Close'] >= current_bar['BB_upper'] * 0.99:
- short_score += 1
- short_signals.append("接近上轨")
-
- # 5. 连续上涨后的反转
- recent_returns = df.iloc[i-6:i]['Returns']
- if recent_returns.max() > 0.015:
- consecutive_rise = sum(recent_returns > 0)
- if consecutive_rise >= 4:
- short_score += 2
- short_signals.append("连续上涨反转")
-
- # 6. 价格动量反转
- if current_bar['Price_Momentum'] > 0.02:
- short_score += 1
- short_signals.append("动量超买")
-
- # 7. 成交量配合
- if current_bar['Volume_Ratio'] > 1.2:
- short_score += 1
- short_signals.append("放量配合")
-
- # 8. 当日开盘价格关系
- try:
- daily_high = df[df.index.date == df.index[i].date()]['High'].max()
- daily_low = df[df.index.date == df.index[i].date()]['Low'].min()
- daily_range = daily_high - daily_low
-
- if daily_range > 0:
- position_in_day = (current_bar['Close'] - daily_low) / daily_range
- if position_in_day > 0.7:
- short_score += 1
- short_signals.append("日内高位")
- except:
- pass
- # MA趋势过滤
- if current_bar['MA6'] > current_bar['MA12'] > current_bar['MA24']:
- short_score -= 1
- short_signals.append("MA上升趋势惩罚")
- elif current_bar['MA6'] < current_bar['MA12']:
- short_score += 1
- short_signals.append("MA短期下行")
- return short_score, short_signals
- # ==================== 多空双向交易执行器 ====================
- class DualDirectionExecutor:
- """多空双向交易执行器"""
-
- def __init__(self, initial_capital=1000000):
- self.initial_capital = initial_capital
- self.params = {
- 'commission_rate': 0.0, # 无手续费
- 'slippage_rate': 0.0, # 无滑点
- 'position_size_pct': 1.0, # 每次开仓100%仓位
- 'stop_loss_pct': 0.008, # 0.8%止损
- 'take_profit_pct': 0.02, # 2.0%止盈
- 'max_hold_bars': 16, # 最多持有8小时(16个30分钟)
- 'min_signal_strength': 4 # 最小信号强度
- }
-
- def execute_dual_direction_trades(self, signals_df: pd.DataFrame) -> tuple:
- """执行多空双向交易"""
- print("正在执行多空双向交易...")
-
- df = signals_df.copy()
-
- # 初始化
- trades = []
- capital = self.initial_capital
-
- # 持仓状态
- long_position = 0 # 做多持仓数量
- short_position = 0 # 做空持仓数量
- long_entry_price = 0 # 做多开仓价
- short_entry_price = 0 # 做空开仓价
- long_entry_time = None # 做多开仓时间
- short_entry_time = None # 做空开仓时间
- long_holding_bars = 0 # 做多持仓周期
- short_holding_bars = 0 # 做空持仓周期
- long_entry_signals = '' # 做多入场信号
- short_entry_signals = '' # 做空入场信号
-
- # 添加资金列
- df = df.copy()
- df['capital'] = float(capital) # 确保是浮点数
- df['long_position'] = 0
- df['short_position'] = 0
- df['net_value'] = float(capital) # 确保是浮点数
-
- for i in range(len(df)):
- current_time = df.index[i]
- current_bar = df.iloc[i]
- price = current_bar['Close']
-
- # 更新当前净值
- current_value = capital
- if long_position > 0:
- current_value += long_position * price
- if short_position < 0:
- # 做空盈亏
- short_pnl = (short_entry_price - price) * abs(short_position)
- margin_held = abs(short_position) * short_entry_price
- current_value += margin_held + short_pnl
-
- df.iloc[i, df.columns.get_loc('net_value')] = current_value
-
- # 开仓逻辑 - 只在无持仓时开仓
- if long_position == 0 and short_position == 0:
- # 做多开仓
- if current_bar['Signal'] == 1:
- position_size = int((capital * self.params['position_size_pct']) / price)
- if position_size > 0:
- cost = position_size * price * (1 + self.params['commission_rate'] + self.params['slippage_rate'])
-
- if cost <= capital:
- long_position = position_size
- long_entry_price = price
- long_entry_time = current_time
- long_entry_signals = current_bar.get('Long_Signals', '')
- long_holding_bars = 0
- capital -= cost
- # 计算预计止损止盈价格
- long_stop_loss_price = long_entry_price * (1 - self.params['stop_loss_pct'])
- long_take_profit_price = long_entry_price * (1 + self.params['take_profit_pct'])
-
- df.iloc[i, df.columns.get_loc('long_position')] = long_position
-
- print(f"\n{'='*60}")
- print(f"[LONG_OPEN] 做多开仓信号 #{len(trades) + 1}")
- print(f"{'='*60}")
- print(f"开仓时间: {long_entry_time}")
- print(f"开仓价格: {long_entry_price:.2f} 元")
- print(f"预计止损: {long_stop_loss_price:.2f} 元 (-{self.params['stop_loss_pct']*100:.1f}%)")
- print(f"预计止盈: {long_take_profit_price:.2f} 元 (+{self.params['take_profit_pct']*100:.1f}%)")
- print(f"持仓数量: {position_size} 股")
- print(f"开仓市值: {position_size * long_entry_price:,.2f} 元")
- print(f"交易成本: {cost:,.2f} 元")
- print(f"剩余资金: {capital:,.2f} 元")
- print(f"入场信号: {long_entry_signals}")
- print(f"总资产: {capital + position_size * long_entry_price:,.2f} 元")
-
- # 做空开仓
- elif current_bar['Signal'] == -1:
- position_value = capital * self.params['position_size_pct']
- position_size = int(position_value / price)
-
- if position_size > 0:
- margin_required = position_size * price
- commission = position_size * price * (self.params['commission_rate'] + self.params['slippage_rate'])
- total_cost = margin_required + commission
-
- if total_cost <= capital:
- short_position = -position_size
- short_entry_price = price
- short_entry_time = current_time
- short_entry_signals = current_bar.get('Short_Signals', '')
- short_holding_bars = 0
- capital -= total_cost
- # 计算预计止损止盈价格
- short_stop_loss_price = short_entry_price * (1 + self.params['stop_loss_pct'])
- short_take_profit_price = short_entry_price * (1 - self.params['take_profit_pct'])
-
- df.iloc[i, df.columns.get_loc('short_position')] = short_position
-
- print(f"\n{'='*60}")
- print(f"[SHORT_OPEN] 做空开仓信号 #{len(trades) + 1}")
- print(f"{'='*60}")
- print(f"开仓时间: {short_entry_time}")
- print(f"开仓价格: {short_entry_price:.2f} 元")
- print(f"预计止损: {short_stop_loss_price:.2f} 元 (+{self.params['stop_loss_pct']*100:.1f}%)")
- print(f"预计止盈: {short_take_profit_price:.2f} 元 (-{self.params['take_profit_pct']*100:.1f}%)")
- print(f"做空数量: {position_size} 股")
- print(f"开仓市值: {position_size * short_entry_price:,.2f} 元")
- print(f"保证金占用: {margin_required:,.2f} 元")
- print(f"手续费: {commission:,.2f} 元")
- print(f"总扣除: {total_cost:,.2f} 元")
- print(f"剩余资金: {capital:,.2f} 元")
- print(f"入场信号: {short_entry_signals}")
- print(f"总资产: {capital + margin_required:,.2f} 元")
-
- # 平仓逻辑 - 做多平仓
- elif long_position > 0:
- long_holding_bars += 1
- # 计算止损止盈价格
- stop_loss = long_entry_price * (1 - self.params['stop_loss_pct'])
- take_profit = long_entry_price * (1 + self.params['take_profit_pct'])
- exit_signal = False
- exit_reason = ''
- exit_price = price
- # 止损
- if price <= stop_loss:
- exit_signal = True
- loss_pct = (long_entry_price - stop_loss) / long_entry_price * 100
- exit_reason = f"做多止损触发(价格{price:.2f}跌破止损线{stop_loss:.2f},亏损{loss_pct:.2f}%)"
- exit_price = price # 使用实际市场价格,而不是止损线价格
- # 止盈
- elif price >= take_profit:
- exit_signal = True
- profit_pct = (price - long_entry_price) / long_entry_price * 100
- exit_reason = f"做多止盈触发(价格{price:.2f}突破止盈线{take_profit:.2f},盈利{profit_pct:.2f}%)"
- exit_price = price # 使用实际市场价格,而不是止盈线价格
-
- # 最大持仓时间
- elif long_holding_bars >= self.params['max_hold_bars']:
- exit_signal = True
- current_pnl_pct = (price - long_entry_price) / long_entry_price * 100
- exit_reason = f"做多时间止损(持仓{long_holding_bars}周期达上限{self.params['max_hold_bars']}周期,当前盈亏{current_pnl_pct:+.2f}%)"
-
- # 做多信号消失
- elif current_bar['RSI'] > 70:
- exit_signal = True
- current_pnl_pct = (price - long_entry_price) / long_entry_price * 100
- exit_reason = f"做多RSI超买平仓(RSI={current_bar['RSI']:.1f}超买,信号消失,当前盈亏{current_pnl_pct:+.2f}%)"
-
- # 执行平仓
- if exit_signal:
- # 计算盈亏
- gross_pnl = (exit_price - long_entry_price) * long_position
- open_cost = long_position * long_entry_price * (self.params['commission_rate'] + self.params['slippage_rate'])
- close_revenue = long_position * exit_price
- close_cost = close_revenue * (self.params['commission_rate'] + self.params['slippage_rate'])
- pnl = gross_pnl - open_cost - close_cost
-
- # 更新资金
- capital += close_revenue - close_cost
-
- # 记录交易
- trade = {
- '交易方向': '做多',
- '开仓时间': long_entry_time,
- '平仓时间': current_time,
- '开仓价格': long_entry_price,
- '平仓价格': exit_price,
- '仓位': long_position,
- '盈亏金额': pnl,
- '盈亏百分比': (exit_price - long_entry_price) / long_entry_price * 100,
- '退出原因': exit_reason,
- '持仓周期数': long_holding_bars,
- '持仓小时数': long_holding_bars * 0.5,
- '入场信号': long_entry_signals,
- '平仓时资金': capital,
- '开仓市值': long_position * long_entry_price,
- '预计止损价格': long_stop_loss_price,
- '预计止盈价格': long_take_profit_price
- }
- trades.append(trade)
-
- # 打印平仓详情
- profit_ratio = (exit_price - long_entry_price) / long_entry_price * 100
- status = "[PROFIT]" if pnl > 0 else "[LOSS]"
-
- print(f"\n{'='*60}")
- print(f"{status} [LONG_CLOSE] 做多平仓信号 #{len(trades)}")
- print(f"{'='*60}")
- print(f"平仓时间: {current_time}")
- print(f"平仓价格: {exit_price:.2f} 元")
- print(f"持仓时长: {long_holding_bars * 0.5:.1f} 小时 ({long_holding_bars} 个30分钟周期)")
- print(f"退出原因: {exit_reason}")
- print(f"{'-'*60}")
- print(f"盈亏金额: {pnl:+,.2f} 元")
- print(f"盈亏比例: {profit_ratio:+.2f}%")
- print(f"{'-'*60}")
- print(f"当前资金: {capital:,.2f} 元")
- print(f"累计收益率: {(capital / self.initial_capital - 1) * 100:+.2f}%")
- print(f"{'='*60}")
-
- # 重置做多持仓
- long_position = 0
- long_entry_price = 0
- long_entry_time = None
- long_holding_bars = 0
-
- # 平仓逻辑 - 做空平仓
- elif short_position < 0:
- short_holding_bars += 1
- # 计算止损止盈价格(做空逻辑相反)
- stop_loss_price = short_entry_price * (1 + self.params['stop_loss_pct']) # 价格上涨止损
- take_profit_price = short_entry_price * (1 - self.params['take_profit_pct']) # 价格下跌止盈
- exit_signal = False
- exit_reason = ''
- exit_price = price
- # 止损(价格上涨)
- if price >= stop_loss_price:
- exit_signal = True
- loss_pct = (stop_loss_price - short_entry_price) / short_entry_price * 100
- exit_reason = f"做空止损触发(价格{price:.2f}突破止损线{stop_loss_price:.2f},亏损{loss_pct:.2f}%)"
- exit_price = price # 使用实际市场价格,而不是止损线价格
- # 止盈(价格下跌)
- elif price <= take_profit_price:
- exit_signal = True
- profit_pct = (short_entry_price - price) / short_entry_price * 100
- exit_reason = f"做空止盈触发(价格{price:.2f}跌破止盈线{take_profit_price:.2f},盈利{profit_pct:.2f}%)"
- exit_price = price # 使用实际市场价格,而不是止盈线价格
-
- # 最大持仓时间
- elif short_holding_bars >= self.params['max_hold_bars']:
- exit_signal = True
- current_pnl_pct = (short_entry_price - price) / short_entry_price * 100
- exit_reason = f"做空时间止损(持仓{short_holding_bars}周期达上限{self.params['max_hold_bars']}周期,当前盈亏{current_pnl_pct:+.2f}%)"
-
- # 做空信号消失
- elif current_bar['RSI'] < 30:
- exit_signal = True
- current_pnl_pct = (short_entry_price - price) / short_entry_price * 100
- exit_reason = f"做空RSI超卖平仓(RSI={current_bar['RSI']:.1f}超卖,信号消失,当前盈亏{current_pnl_pct:+.2f}%)"
-
- # 执行平仓
- if exit_signal:
- # 计算盈亏
- gross_pnl = (short_entry_price - exit_price) * abs(short_position)
- open_commission = abs(short_position) * short_entry_price * (self.params['commission_rate'] + self.params['slippage_rate'])
- close_commission = abs(short_position) * exit_price * (self.params['commission_rate'] + self.params['slippage_rate'])
- net_pnl = gross_pnl - close_commission
-
- # 更新资金(返还保证金 + 净盈亏)
- margin_returned = abs(short_position) * short_entry_price
- capital += margin_returned + net_pnl
-
- # 记录交易
- trade = {
- '交易方向': '做空',
- '开仓时间': short_entry_time,
- '平仓时间': current_time,
- '开仓价格': short_entry_price,
- '平仓价格': exit_price,
- '仓位': abs(short_position),
- '盈亏金额': net_pnl,
- '盈亏百分比': (short_entry_price - exit_price) / short_entry_price * 100,
- '退出原因': exit_reason,
- '持仓周期数': short_holding_bars,
- '持仓小时数': short_holding_bars * 0.5,
- '入场信号': short_entry_signals,
- '平仓时资金': capital,
- '开仓市值': abs(short_position) * short_entry_price,
- '保证金返还': margin_returned,
- '预计止损价格': short_stop_loss_price,
- '预计止盈价格': short_take_profit_price
- }
- trades.append(trade)
-
- # 打印平仓详情
- profit_ratio = (short_entry_price - exit_price) / short_entry_price * 100
- status = "[PROFIT]" if net_pnl > 0 else "[LOSS]"
-
- print(f"\n{'='*60}")
- print(f"{status} [SHORT_CLOSE] 做空平仓信号 #{len(trades)}")
- print(f"{'='*60}")
- print(f"平仓时间: {current_time}")
- print(f"平仓价格: {exit_price:.2f} 元")
- print(f"持仓时长: {short_holding_bars * 0.5:.1f} 小时 ({short_holding_bars} 个30分钟周期)")
- print(f"退出原因: {exit_reason}")
- print(f"{'-'*60}")
- print(f"价差盈亏: {gross_pnl:+,.2f} 元")
- print(f"平仓手续费: {close_commission:,.2f} 元")
- print(f"净盈亏: {net_pnl:+,.2f} 元")
- print(f"盈亏比例: {profit_ratio:+.2f}%")
- print(f"保证金返还: {margin_returned:,.2f} 元")
- print(f"{'-'*60}")
- print(f"当前资金: {capital:,.2f} 元")
- print(f"累计收益率: {(capital / self.initial_capital - 1) * 100:+.2f}%")
- print(f"{'='*60}")
-
- # 重置做空持仓
- short_position = 0
- short_entry_price = 0
- short_entry_time = None
- short_holding_bars = 0
-
- # 更新资金和持仓状态
- df.iloc[i, df.columns.get_loc('capital')] = capital
- df.iloc[i, df.columns.get_loc('long_position')] = long_position
- df.iloc[i, df.columns.get_loc('short_position')] = short_position
-
- # 强制平仓剩余持仓
- # 做多持仓强制平仓
- if long_position > 0:
- final_price = df.iloc[-1]['Close']
-
- gross_pnl = (final_price - long_entry_price) * long_position
- open_cost = long_position * long_entry_price * (self.params['commission_rate'] + self.params['slippage_rate'])
- close_revenue = long_position * final_price
- close_cost = close_revenue * (self.params['commission_rate'] + self.params['slippage_rate'])
- pnl = gross_pnl - open_cost - close_cost
-
- capital += close_revenue - close_cost
-
- trade = {
- '交易方向': '做多',
- '开仓时间': long_entry_time,
- '平仓时间': df.index[-1],
- '开仓价格': long_entry_price,
- '平仓价格': final_price,
- '仓位': long_position,
- '盈亏金额': pnl,
- '盈亏百分比': (final_price - long_entry_price) / long_entry_price * 100,
- '退出原因': f'做多强制平仓(回测结束,持仓{long_holding_bars}周期,最终价格{final_price:.2f},盈亏{(final_price - long_entry_price) / long_entry_price * 100:+.2f}%)',
- '持仓周期数': long_holding_bars,
- '持仓小时数': long_holding_bars * 0.5,
- '入场信号': long_entry_signals,
- '平仓时资金': capital,
- '开仓市值': long_position * long_entry_price,
- '预计止损价格': long_stop_loss_price,
- '预计止盈价格': long_take_profit_price
- }
- trades.append(trade)
-
- print(f"\n{'='*60}")
- print(f"[FORCE] [LONG_CLOSE] 做多强制平仓信号 #{len(trades)}")
- print(f"{'='*60}")
- print(f"平仓时间: {df.index[-1]}")
- print(f"平仓价格: {final_price:.2f} 元")
- print(f"退出原因: 做多强制平仓(回测结束)")
- print(f"盈亏金额: {pnl:+,.2f} 元")
- print(f"{'='*60}")
-
- # 更新最终资金到results_df
- df.iloc[-1, df.columns.get_loc('capital')] = capital
- df.iloc[-1, df.columns.get_loc('net_value')] = capital
-
- # 做空持仓强制平仓
- if short_position < 0:
- final_price = df.iloc[-1]['Close']
-
- gross_pnl = (short_entry_price - final_price) * abs(short_position)
- close_commission = abs(short_position) * final_price * (self.params['commission_rate'] + self.params['slippage_rate'])
- net_pnl = gross_pnl - close_commission
-
- margin_returned = abs(short_position) * short_entry_price
- capital += margin_returned + net_pnl
-
- trade = {
- '交易方向': '做空',
- '开仓时间': short_entry_time,
- '平仓时间': df.index[-1],
- '开仓价格': short_entry_price,
- '平仓价格': final_price,
- '仓位': abs(short_position),
- '盈亏金额': net_pnl,
- '盈亏百分比': (short_entry_price - final_price) / short_entry_price * 100,
- '退出原因': f'做空强制平仓(回测结束,持仓{short_holding_bars}周期,最终价格{final_price:.2f},盈亏{(short_entry_price - final_price) / short_entry_price * 100:+.2f}%)',
- '持仓周期数': short_holding_bars,
- '持仓小时数': short_holding_bars * 0.5,
- '入场信号': short_entry_signals,
- '平仓时资金': capital,
- '开仓市值': abs(short_position) * short_entry_price,
- '保证金返还': margin_returned,
- '预计止损价格': short_stop_loss_price,
- '预计止盈价格': short_take_profit_price
- }
- trades.append(trade)
-
- print(f"\n{'='*60}")
- print(f"[FORCE] [SHORT_CLOSE] 做空强制平仓信号 #{len(trades)}")
- print(f"{'='*60}")
- print(f"平仓时间: {df.index[-1]}")
- print(f"平仓价格: {final_price:.2f} 元")
- print(f"退出原因: 做空强制平仓(回测结束)")
- print(f"盈亏金额: {net_pnl:+,.2f} 元")
- print(f"{'='*60}")
-
- # 更新最终资金到results_df
- df.iloc[-1, df.columns.get_loc('capital')] = capital
- df.iloc[-1, df.columns.get_loc('net_value')] = capital
-
- trades_df = pd.DataFrame(trades)
-
- if len(trades_df) > 0:
- # 统一时间格式
- for col in trades_df.columns:
- if '时间' in col:
- trades_df[col] = pd.to_datetime(trades_df[col])
- trades_df = trades_df.sort_values('开仓时间')
-
- print(f"多空双向交易执行完成,共{len(trades_df)}笔交易")
-
- return df, trades_df
- # ==================== 验证分析模块 ====================
- def validate_dual_direction_results(results_df, trades_df, initial_capital):
- """验证多空双向交易结果"""
- print("\n" + "=" * 80)
- print("创业板50 30分钟多空双向交易结果验证")
- print("=" * 80)
-
- print(f"\n【基础数据验证】")
- final_capital = results_df['net_value'].iloc[-1]
- total_return = (final_capital - initial_capital) / initial_capital * 100
-
- print(f"初始资金: {initial_capital:,.2f}元")
- print(f"最终资金: {final_capital:,.2f}元")
- print(f"总收益率: {total_return:.2f}%")
- print(f"交易次数: {len(trades_df)}笔")
-
- if len(trades_df) > 0:
- # 按交易方向分类统计
- long_trades = trades_df[trades_df['交易方向'] == '做多']
- short_trades = trades_df[trades_df['交易方向'] == '做空']
-
- print(f"\n【交易方向统计】")
- print(f"做多交易: {len(long_trades)}笔 ({len(long_trades)/len(trades_df)*100:.1f}%)")
- print(f"做空交易: {len(short_trades)}笔 ({len(short_trades)/len(trades_df)*100:.1f}%)")
-
- # 做多统计
- if len(long_trades) > 0:
- long_win_trades = long_trades[long_trades['盈亏金额'] > 0]
- long_lose_trades = long_trades[long_trades['盈亏金额'] < 0]
- long_total_pnl = long_trades['盈亏金额'].sum()
-
- print(f"\n【做多交易统计】")
- print(f"盈利交易: {len(long_win_trades)}笔 ({len(long_win_trades)/len(long_trades)*100:.1f}%)")
- print(f"亏损交易: {len(long_lose_trades)}笔 ({len(long_lose_trades)/len(long_trades)*100:.1f}%)")
- print(f"平均持仓时间: {long_trades['持仓小时数'].mean():.1f}小时")
- print(f"平均收益率: {long_trades['盈亏百分比'].mean():.2f}%")
- print(f"总盈亏: {long_total_pnl:+,.2f}元")
-
- # 做空统计
- if len(short_trades) > 0:
- short_win_trades = short_trades[short_trades['盈亏金额'] > 0]
- short_lose_trades = short_trades[short_trades['盈亏金额'] < 0]
- short_total_pnl = short_trades['盈亏金额'].sum()
-
- print(f"\n【做空交易统计】")
- print(f"盈利交易: {len(short_win_trades)}笔 ({len(short_win_trades)/len(short_trades)*100:.1f}%)")
- print(f"亏损交易: {len(short_lose_trades)}笔 ({len(short_lose_trades)/len(short_trades)*100:.1f}%)")
- print(f"平均持仓时间: {short_trades['持仓小时数'].mean():.1f}小时")
- print(f"平均收益率: {short_trades['盈亏百分比'].mean():.2f}%")
- print(f"总盈亏: {short_total_pnl:+,.2f}元")
-
- # 整体统计
- win_trades = trades_df[trades_df['盈亏金额'] > 0]
- lose_trades = trades_df[trades_df['盈亏金额'] < 0]
-
- print(f"\n【整体交易统计】")
- print(f"盈利交易: {len(win_trades)}笔 ({len(win_trades)/len(trades_df)*100:.1f}%)")
- print(f"亏损交易: {len(lose_trades)}笔 ({len(lose_trades)/len(trades_df)*100:.1f}%)")
- print(f"平均持仓时间: {trades_df['持仓小时数'].mean():.1f}小时")
- print(f"平均收益率: {trades_df['盈亏百分比'].mean():.2f}%")
-
- # 按退出原因统计
- print(f"\n【退出原因统计】")
- for reason, count in trades_df['退出原因'].value_counts().items():
- percentage = count / len(trades_df) * 100
- reason_pnl = trades_df[trades_df['退出原因'] == reason]['盈亏金额'].sum()
- print(f" {reason}: {count}次 ({percentage:.1f}%) - 总盈亏: {reason_pnl:+,.2f}元")
- # ==================== 主程序 ====================
- def main():
- """主程序 - 运行30分钟多空双向策略"""
-
- print("=" * 80)
- print("创业板50 30分钟多空双向策略")
- print("=" * 80)
-
- # 加载配置文件
- config_manager = ConfigManager('config.json')
- # 自动加载best_parameters.json覆盖默认参数
- best_params_file = 'best_parameters.json'
- if os.path.exists(best_params_file):
- try:
- with open(best_params_file, 'r', encoding='utf-8') as f:
- best_data = json.load(f)
- best_params = best_data.get('best_params', {})
- strategy_keys = ['position_size_pct', 'stop_loss_pct', 'take_profit_pct', 'max_hold_bars']
- for key in strategy_keys:
- if key in best_params:
- config_manager.config.setdefault('strategy', {})[key] = best_params[key]
- print(f"已加载优化参数: {best_params_file}")
- except Exception as e:
- print(f"加载优化参数失败: {e},使用config.json默认参数")
- # 从配置文件读取参数
- BACKTEST_START_DATE = config_manager.get('strategy', 'backtest_start_date', "2025-10-01")
- PREWARMP_DAYS = config_manager.get('strategy', 'prewamp_days', 30)
- INITIAL_CAPITAL = config_manager.get('strategy', 'initial_capital', 1000000)
-
- # 读取截止时间配置,支持"now"或具体日期
- backtest_end_config = config_manager.get('strategy', 'backtest_end_date', "now")
- if backtest_end_config.lower() == "now":
- BACKTEST_END_DATE = datetime.now().strftime('%Y-%m-%d')
- else:
- BACKTEST_END_DATE = backtest_end_config
-
- # 转换日期格式
- start_date = datetime.strptime(BACKTEST_START_DATE, "%Y-%m-%d")
- end_date = datetime.strptime(BACKTEST_END_DATE, "%Y-%m-%d").replace(hour=23, minute=59, second=59)
-
- # 计算数据获取开始时间(回测开始时间 - 预热期)
- data_start_date = start_date - timedelta(days=PREWARMP_DAYS)
-
- # 显示数据源配置
- use_local_file = config_manager.get('data_source', 'use_local_file', False)
- data_source_mode = "本地文件模式" if use_local_file else "在线获取模式"
- local_file_path = config_manager.get('data_source', 'local_file_path', '')
-
- print(f"\n策略参数:")
- print(f" 回测期间: {BACKTEST_START_DATE} 至 {BACKTEST_END_DATE}")
- print(f" 数据获取期间: {data_start_date.strftime('%Y-%m-%d')} 至 {BACKTEST_END_DATE}")
- print(f" 指标预热期: {PREWARMP_DAYS}天")
- print(f" K线周期: 30分钟")
- print(f" 初始资金: {INITIAL_CAPITAL:,}元")
- print(f" 标的指数: 创业板50 (399673)")
- print(f" 交易方向: 多空双向交易")
- print(f" 数据源: {data_source_mode}")
- if use_local_file:
- print(f" 本地文件路径: {local_file_path}")
-
- try:
- # Phase 1: 数据获取
- print(f"\n【Phase 1: 30分钟数据获取】")
- fetcher = IntradayDataFetcher(config_manager)
-
- # 获取包含预热期的完整数据
- full_data = fetcher.fetch_30min_data(start_date=data_start_date, end_date=end_date)
- full_data = fetcher.calculate_intraday_indicators(full_data)
-
- # 筛选回测期间的数据
- original_len = len(full_data)
- backtest_data = full_data[(full_data.index >= start_date) & (full_data.index <= end_date)].copy()
- print(f"筛选回测数据: {original_len} -> {len(backtest_data)} 条")
- print(f"回测数据范围: {backtest_data.index[0]} 到 {backtest_data.index[-1]}")
-
- # Phase 2: 信号生成
- print(f"\n【Phase 2: 多空双向信号生成】")
- signal_gen = DualDirectionSignalGenerator()
- signals_df = signal_gen.generate_dual_direction_signals(backtest_data)
-
- # Phase 3: 交易执行
- print(f"\n【Phase 3: 多空双向交易执行】")
- executor = DualDirectionExecutor(initial_capital=INITIAL_CAPITAL)
- # 用配置参数覆盖默认值
- executor.params['stop_loss_pct'] = config_manager.get('strategy', 'stop_loss_pct', 0.008)
- executor.params['take_profit_pct'] = config_manager.get('strategy', 'take_profit_pct', 0.02)
- executor.params['max_hold_bars'] = config_manager.get('strategy', 'max_hold_bars', 16)
- executor.params['position_size_pct'] = config_manager.get('strategy', 'position_size_pct', 1.0)
- results_df, trades_df = executor.execute_dual_direction_trades(signals_df)
-
- # Phase 4: 验证分析
- print(f"\n【Phase 4: 结果验证与分析】")
- validate_dual_direction_results(results_df, trades_df, INITIAL_CAPITAL)
-
- # Phase 5: 导出数据
- if len(trades_df) > 0:
- print(f"\n【Phase 5: 导出交易数据】")
-
- # 统一时间格式
- for col in trades_df.columns:
- if '时间' in col:
- trades_df[col] = pd.to_datetime(trades_df[col]).dt.strftime('%Y-%m-%d %H:%M:%S')
-
- # 生成带时间戳的文件名
- timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
- output_file = f'cyb50_30min_dual_direction_trades_{timestamp}.csv'
-
- trades_df.to_csv(output_file, index=False, encoding='utf-8-sig')
- print(f"多空双向交易记录已保存到: {output_file}")
- print(f"时间戳格式: YYYY-MM-DD HH:MM:SS")
-
- # 策略总结
- print(f"\n" + "=" * 80)
- print("多空双向策略运行总结")
- print("=" * 80)
-
- if len(trades_df) > 0:
- final_capital = results_df['net_value'].iloc[-1]
- total_return = (final_capital - INITIAL_CAPITAL) / INITIAL_CAPITAL * 100
-
- # 按方向统计
- long_trades = trades_df[trades_df['交易方向'] == '做多']
- short_trades = trades_df[trades_df['交易方向'] == '做空']
-
- print(f"初始资金: {INITIAL_CAPITAL:,.2f}元")
- print(f"最终资金: {final_capital:,.2f}元")
- print(f"总收益率: {total_return:.2f}%")
- print(f"总交易次数: {len(trades_df)}笔")
- print(f"做多交易: {len(long_trades)}笔")
- print(f"做空交易: {len(short_trades)}笔")
- print(f"整体胜率: {(trades_df['盈亏金额'] > 0).sum() / len(trades_df) * 100:.1f}%")
- print(f"平均收益率: {trades_df['盈亏百分比'].mean():.2f}%")
- print(f"最大单笔盈利: {trades_df['盈亏金额'].max():+,.2f}元")
- print(f"最大单笔亏损: {trades_df['盈亏金额'].min():+,.2f}元")
-
- print(f"\n[SUCCESS] 多空双向策略运行成功!")
- else:
- print("未产生任何交易信号")
-
- except Exception as e:
- print(f"\n[ERROR] 多空双向策略运行出错: {str(e)}")
- import traceback
- traceback.print_exc()
-
- finally:
- print(f"\n" + "=" * 80)
- if __name__ == "__main__":
- main()
|