| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151 |
- from __future__ import annotations
- import numpy as np
- import pandas as pd
- EVENT_LOG_COLUMNS = [
- 'event_date',
- 'from_state',
- 'to_state',
- 'event_state',
- 'event_type',
- 'horizon',
- 'confirm_horizon',
- 'asset_forward_close_return',
- 'strategy_forward_return',
- 'target_exposure',
- 'has_risk_off_within_confirm',
- ]
- EVENT_SUMMARY_COLUMNS = [
- 'event_type',
- 'event_state',
- 'count',
- 'avg_asset_forward_return',
- 'avg_strategy_forward_return',
- 'avg_target_exposure',
- ]
- def _forward_strategy_return(series: pd.Series, horizon: int) -> pd.Series:
- values = series.fillna(0.0).to_numpy(dtype=float)
- out = np.full(len(values), np.nan, dtype=float)
- for i in range(len(values)):
- end = i + horizon
- if end < len(values):
- out[i] = np.prod(1.0 + values[i + 1 : end + 1]) - 1.0
- return pd.Series(out, index=series.index, dtype=float)
- def _classify_event_type(
- *,
- from_state: str,
- to_state: str,
- asset_forward_close_return: float,
- has_risk_off_within_confirm: bool,
- ) -> str:
- if to_state == 'risk_off':
- return 'crash_onset'
- if from_state == 'euphoric_late' and pd.notna(asset_forward_close_return) and asset_forward_close_return < 0.0:
- return 'crowded_unwind'
- if from_state in {'risk_off', 'chop'} and to_state in {'repair', 'trend'}:
- if pd.notna(asset_forward_close_return) and asset_forward_close_return > 0.0 and not has_risk_off_within_confirm:
- return 'true_repair'
- return 'false_rebound'
- return 'state_transition'
- def build_transition_event_log(
- df: pd.DataFrame,
- *,
- horizon: int = 10,
- confirm_horizon: int = 10,
- ) -> pd.DataFrame:
- if 'state' not in df.columns:
- raise ValueError('state column required for event diagnostics.')
- if 'close' not in df.columns:
- raise ValueError('close column required for event diagnostics.')
- if 'strategy_return_net' not in df.columns:
- raise ValueError('strategy_return_net column required for event diagnostics.')
- if horizon <= 0 or confirm_horizon <= 0:
- raise ValueError('horizon and confirm_horizon must be positive integers.')
- out = df.copy().sort_index()
- out['state_prev'] = out['state'].shift(1)
- out['state_change'] = out['state'] != out['state_prev']
- if not out.empty:
- out.iloc[0, out.columns.get_loc('state_change')] = False
- out['asset_forward_close_return'] = out['close'].shift(-horizon) / out['close'] - 1.0
- out['strategy_forward_return'] = _forward_strategy_return(out['strategy_return_net'], horizon=horizon)
- events = out[out['state_change']].copy()
- if events.empty:
- return pd.DataFrame(columns=EVENT_LOG_COLUMNS)
- rows: list[dict[str, object]] = []
- states = out['state'].astype(str)
- for ts, row in events.iterrows():
- from_state = str(row['state_prev'])
- to_state = str(row['state'])
- try:
- pos = int(out.index.get_loc(ts))
- except Exception:
- continue
- future_states = states.iloc[pos + 1 : pos + 1 + confirm_horizon]
- has_risk_off = bool((future_states == 'risk_off').any())
- asset_fwd = float(row['asset_forward_close_return']) if pd.notna(row['asset_forward_close_return']) else np.nan
- event_type = _classify_event_type(
- from_state=from_state,
- to_state=to_state,
- asset_forward_close_return=asset_fwd,
- has_risk_off_within_confirm=has_risk_off,
- )
- rows.append(
- {
- 'event_date': ts,
- 'from_state': from_state,
- 'to_state': to_state,
- 'event_state': to_state,
- 'event_type': event_type,
- 'horizon': int(horizon),
- 'confirm_horizon': int(confirm_horizon),
- 'asset_forward_close_return': asset_fwd,
- 'strategy_forward_return': (
- float(row['strategy_forward_return']) if pd.notna(row['strategy_forward_return']) else np.nan
- ),
- 'target_exposure': float(row['target_exposure']) if 'target_exposure' in row and pd.notna(row['target_exposure']) else np.nan,
- 'has_risk_off_within_confirm': has_risk_off,
- }
- )
- event_log = pd.DataFrame(rows)
- if event_log.empty:
- return pd.DataFrame(columns=EVENT_LOG_COLUMNS)
- return event_log[EVENT_LOG_COLUMNS]
- def summarize_transition_events(
- df: pd.DataFrame,
- horizon: int = 10,
- confirm_horizon: int = 10,
- ) -> pd.DataFrame:
- event_log = build_transition_event_log(df, horizon=horizon, confirm_horizon=confirm_horizon)
- if event_log.empty:
- return pd.DataFrame(columns=EVENT_SUMMARY_COLUMNS)
- summary = (
- event_log.groupby(['event_type', 'event_state'])
- .agg(
- count=('event_type', 'size'),
- avg_asset_forward_return=('asset_forward_close_return', 'mean'),
- avg_strategy_forward_return=('strategy_forward_return', 'mean'),
- avg_target_exposure=('target_exposure', 'mean'),
- )
- .reset_index()
- )
- return summary.sort_values(['count', 'event_type'], ascending=[False, True]).reset_index(drop=True)
|