1. 项目目标

本项目旨在帮助学习者掌握量化交易中常用的统计模型应用及简单交易策略开发流程。通过设计、实现和优化基于统计模型的交易策略,学习者将深入理解量化交易的核心概念、技术实现及绩效评估方法。

统计套利与统计模型是量化交易中的经典方法,它们基于市场价格的统计特性,捕捉短期市场异常并从中获利。这类策略通常具有较高的胜率和稳定性,是量化投资组合中的重要组成部分。

2. 基础框架与提供资源

2.1. 回测引擎核心代码

# backtester.py
import pandas as pd
import numpy as np
from datetime import datetime
import matplotlib.pyplot as plt

class Backtester:
    """Long-only, single-asset backtesting engine driven by discrete signals.

    Every buy invests all available cash at the bar's close and every sell
    liquidates the whole position at the bar's close. A proportional
    commission is charged on both legs.
    """

    def __init__(self, data, initial_capital=100000.0, commission=0.0003):
        """Initialise the backtesting engine.

        Args:
            data: DataFrame indexed by date. ``run_strategy`` and
                ``plot_results`` read its ``'close'`` column.
            initial_capital: Starting cash.
            commission: Commission charged per trade, as a fraction of the
                traded value.
        """
        self.data = data.copy()
        self.initial_capital = initial_capital
        self.commission = commission
        # Retained for backward compatibility; run_strategy keeps its own
        # per-run state and publishes it through ``self.results``.
        self.positions = pd.Series(0, index=data.index)
        self.capital = pd.Series(initial_capital, index=data.index)
        self.trades = []

    def run_strategy(self, signals):
        """Run the backtest for a series of trading signals.

        Args:
            signals: One value per row of ``data``, matched by position:
                1 = buy, -1 = sell, anything else = keep the current position.

        Returns:
            DataFrame indexed like ``data`` with columns ``positions``
            (shares held), ``holdings`` (market value of the shares),
            ``cash``, ``equity`` (cash + holdings) and ``returns``
            (bar-over-bar percentage change of equity).

        Raises:
            ValueError: If ``signals`` and ``data`` differ in length.
        """
        if len(signals) != len(self.data):
            raise ValueError("信号长度与数据长度不匹配")

        # Each run starts from a clean trade log; otherwise a second call
        # would append to the first run's trades and corrupt the statistics.
        self.trades = []

        index = self.data.index
        closes = self.data['close'].to_numpy(dtype=float)
        signal_values = np.asarray(signals)
        n_bars = len(closes)

        # Plain arrays avoid the per-element overhead of Series.iloc writes
        # and guarantee a float cash column even for an integer capital.
        positions = np.zeros(n_bars, dtype=np.int64)
        holdings = np.zeros(n_bars, dtype=float)
        cash = np.zeros(n_bars, dtype=float)
        equity = np.zeros(n_bars, dtype=float)

        shares = 0
        balance = float(self.initial_capital)
        entry_price = 0.0
        entry_commission = 0.0

        for i, (price, signal) in enumerate(zip(closes, signal_values)):
            # NaN or non-positive closes cannot be traded on; the state is
            # simply carried forward for that bar.
            can_trade = price > 0

            if can_trade and signal == 1 and shares <= 0:
                affordable = int(balance / (price * (1 + self.commission)))
                if affordable > 0:
                    quantity = affordable - shares
                    trade_cost = price * quantity
                    commission_cost = trade_cost * self.commission
                    self.trades.append({
                        'date': index[i],
                        'action': 'BUY',
                        'price': float(price),
                        'quantity': quantity,
                        'commission': commission_cost
                    })
                    balance -= trade_cost + commission_cost
                    shares = affordable
                    # Remember the entry so the matching SELL can be scored.
                    entry_price = float(price)
                    entry_commission = commission_cost

            elif can_trade and signal == -1 and shares > 0:
                trade_value = price * shares
                commission_cost = trade_value * self.commission
                self.trades.append({
                    'date': index[i],
                    'action': 'SELL',
                    'price': float(price),
                    'quantity': shares,
                    'commission': commission_cost,
                    'buy_price': entry_price,
                    # Net result of the round trip, after both commissions.
                    'profit': ((price - entry_price) * shares
                               - commission_cost - entry_commission)
                })
                balance += trade_value - commission_cost
                shares = 0

            positions[i] = shares
            cash[i] = balance
            holdings[i] = shares * price
            equity[i] = balance + holdings[i]

        results = pd.DataFrame({
            'positions': positions,
            'holdings': holdings,
            'cash': cash,
            'equity': equity
        }, index=index)
        results['returns'] = results['equity'].pct_change()

        self.results = results
        return results

    def plot_results(self):
        """Plot the equity/cash curves and the close price with the position.

        Raises:
            ValueError: If ``run_strategy`` has not been called yet.
        """
        if not hasattr(self, 'results'):
            raise ValueError("请先运行策略回测")

        plt.figure(figsize=(12, 8))

        # Top panel: portfolio value and cash over time.
        plt.subplot(2, 1, 1)
        plt.plot(self.results.index, self.results['equity'], label='Portfolio Value')
        plt.plot(self.results.index, self.results['cash'], label='Cash', alpha=0.5)
        plt.title('Backtest Results')
        plt.xlabel('Date')
        plt.ylabel('Value')
        plt.legend()
        plt.grid(True)

        # Bottom panel: close price with the share count as a shaded area.
        plt.subplot(2, 1, 2)
        plt.plot(self.data.index, self.data['close'], label='Close Price')
        plt.fill_between(self.results.index, 0, self.results['positions'],
                         alpha=0.3, label='Position')
        plt.xlabel('Date')
        plt.ylabel('Price / Position')
        plt.legend()
        plt.grid(True)

        plt.tight_layout()
        plt.show()

    def get_performance_stats(self):
        """Compute summary performance statistics for the last run.

        Returns:
            dict with the keys ``Initial Capital``, ``Final Equity``,
            ``Total Return (%)``, ``Annual Return (%)`` (arithmetic mean
            return x 252), ``Volatility (%)``, ``Sharpe Ratio`` (2% risk-free
            rate; NaN when volatility is zero or undefined),
            ``Max Drawdown (%)``, ``Number of Trades`` (BUY and SELL records)
            and ``Win Rate (%)`` (share of SELL trades with a positive net
            profit).

        Raises:
            ValueError: If ``run_strategy`` has not been called yet.
        """
        if not hasattr(self, 'results'):
            raise ValueError("请先运行策略回测")

        daily_returns = self.results['returns'].dropna()

        annual_return = (daily_returns.mean() * 252) * 100
        volatility = (daily_returns.std() * np.sqrt(252)) * 100

        risk_free_rate = 0.02  # assumed annual risk-free rate
        if volatility > 0:
            sharpe_ratio = (annual_return / 100 - risk_free_rate) / (volatility / 100)
        else:
            # Zero or NaN volatility: the ratio is undefined.
            sharpe_ratio = np.nan

        equity = self.results['equity']
        rolling_max = equity.cummax()
        drawdown = (equity - rolling_max) / rolling_max
        max_drawdown = drawdown.min() * 100

        # Only SELL records close a round trip, so only they can win or lose.
        num_trades = len(self.trades)
        closed_trades = [t for t in self.trades if t['action'] == 'SELL']
        winners = sum(1 for t in closed_trades if t.get('profit', 0) > 0)
        win_rate = winners / len(closed_trades) if closed_trades else 0

        stats = {
            'Initial Capital': self.initial_capital,
            'Final Equity': self.results['equity'].iloc[-1],
            'Total Return (%)': ((self.results['equity'].iloc[-1] / self.initial_capital) - 1) * 100,
            'Annual Return (%)': annual_return,
            'Volatility (%)': volatility,
            'Sharpe Ratio': sharpe_ratio,
            'Max Drawdown (%)': max_drawdown,
            'Number of Trades': num_trades,
            'Win Rate (%)': win_rate * 100
        }

        return stats

2.2. 市场模拟器

# market_simulator.py
import pandas as pd
import numpy as np
import yfinance as yf
from datetime import datetime, timedelta

class MarketSimulator:
    """Source of historical (Yahoo Finance) and synthetic OHLCV price data."""

    def __init__(self):
        """Create a simulator; it keeps no state."""
        pass

    @staticmethod
    def _trading_calendar(days):
        """Return ``days`` consecutive business days ending today (or the last
        business day before today), normalised to midnight."""
        if days < 1:
            raise ValueError("days must be a positive integer")
        return pd.bdate_range(end=pd.Timestamp.today().normalize(), periods=days)

    def fetch_data(self, ticker, start_date, end_date, interval='1d'):
        """Download historical data from Yahoo Finance.

        Args:
            ticker: Ticker symbol.
            start_date: Start date, formatted 'YYYY-MM-DD'.
            end_date: End date, formatted 'YYYY-MM-DD'.
            interval: Bar interval, e.g. '1d', '1h', '1m'.

        Returns:
            The DataFrame produced by ``yf.download``, unmodified.
        """
        # NOTE(review): the column labels are whatever yfinance returns; the
        # synthetic generators below use lowercase 'open'/'high'/'low'/
        # 'close'/'volume' -- confirm and rename before mixing the two.
        data = yf.download(ticker, start=start_date, end=end_date, interval=interval)
        return data

    def generate_synthetic_data(self, days=252, volatility=0.01, trend=0.0001,
                              start_price=100, seed=None):
        """Generate a synthetic price series from i.i.d. normal returns.

        Args:
            days: Number of business-day bars to generate.
            volatility: Standard deviation of the daily return.
            trend: Mean of the daily return.
            start_price: Price level the return series is compounded from.
            seed: Optional seed for NumPy's global random generator.

        Returns:
            DataFrame with columns open/high/low/close/volume, indexed by
            ``days`` business days. Every bar satisfies
            ``low <= min(open, close)`` and ``high >= max(open, close)``.

        Raises:
            ValueError: If ``days`` is smaller than 1.
        """
        if seed is not None:
            np.random.seed(seed)

        date_range = self._trading_calendar(days)
        n_bars = len(date_range)

        # Close prices: compounded random daily returns.
        returns = np.random.normal(trend, volatility, n_bars)
        close = start_price * (1 + returns).cumprod()

        # Draw order is fixed so a seed reproduces the same bars.
        high_pad = np.random.uniform(0, 0.005, n_bars)
        low_pad = np.random.uniform(0, 0.005, n_bars)
        open_price = close * (1 + np.random.normal(0, 0.003, n_bars))
        volume = np.random.lognormal(10, 1, n_bars).astype(int)

        # Pad outward from the open/close envelope so the bar is well formed.
        high = np.maximum(open_price, close) * (1 + high_pad)
        low = np.minimum(open_price, close) * (1 - low_pad)

        data = pd.DataFrame({
            'open': open_price,
            'high': high,
            'low': low,
            'close': close,
            'volume': volume
        }, index=date_range)

        return data

    def add_noise(self, data, noise_level=0.005):
        """Return a copy of ``data`` with multiplicative noise on the OHLC columns.

        Args:
            data: Price DataFrame; only the columns named open/high/low/close
                that are present are perturbed.
            noise_level: Standard deviation of the relative noise.

        Returns:
            A new DataFrame; ``data`` itself is not modified. When all four
            OHLC columns are present, ``high``/``low`` are widened where
            necessary so they still bound the other prices of the bar.
        """
        noisy_data = data.copy()
        ohlc = ['open', 'high', 'low', 'close']

        for col in ohlc:
            if col in noisy_data.columns:
                noise = np.random.normal(0, noise_level, len(noisy_data))
                noisy_data[col] = noisy_data[col] * (1 + noise)

        # Independent noise can push open/close outside [low, high]; restore
        # the bar invariant so downstream range-based indicators stay valid.
        if all(col in noisy_data.columns for col in ohlc):
            bars = noisy_data[ohlc]
            noisy_data['high'] = bars.max(axis=1)
            noisy_data['low'] = bars.min(axis=1)

        return noisy_data

    def create_mean_reverting_series(self, days=252, mean=100, reversion_strength=0.05,
                                   volatility=1.0, seed=None):
        """Generate a mean-reverting price series.

        Each close is the previous close plus
        ``reversion_strength * (mean - previous)`` plus normal noise.

        Args:
            days: Number of business-day bars to generate.
            mean: Long-run level; also the first close.
            reversion_strength: Fraction of the gap to ``mean`` closed per bar.
            volatility: Standard deviation of the additive daily shock.
            seed: Optional seed for NumPy's global random generator.

        Returns:
            DataFrame with columns open/high/low/close/volume, indexed by
            ``days`` business days. Every bar satisfies
            ``low <= min(open, close)`` and ``high >= max(open, close)``.

        Raises:
            ValueError: If ``days`` is smaller than 1.
        """
        if seed is not None:
            np.random.seed(seed)

        date_range = self._trading_calendar(days)
        n_bars = len(date_range)

        prices = np.zeros(n_bars)
        prices[0] = mean

        for i in range(1, n_bars):
            pull = reversion_strength * (mean - prices[i-1])
            prices[i] = prices[i-1] + pull + np.random.normal(0, volatility)

        # Draw order is fixed so a seed reproduces the same bars.
        high_pad = np.random.uniform(0, 0.5, n_bars)
        low_pad = np.random.uniform(0, 0.5, n_bars)
        open_price = prices + np.random.normal(0, 0.3, n_bars)
        volume = np.random.lognormal(10, 1, n_bars).astype(int)

        # Pad outward from the open/close envelope so the bar is well formed.
        high = np.maximum(open_price, prices) + high_pad
        low = np.minimum(open_price, prices) - low_pad

        data = pd.DataFrame({
            'open': open_price,
            'high': high,
            'low': low,
            'close': prices,
            'volume': volume
        }, index=date_range)

        return data

2.3. 基本的绩效评估指标实现

# performance_metrics.py
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

class ReturnMetrics:
    """Return-related metrics computed from an equity curve or return series."""

    @staticmethod
    def calculate_returns(equity_curve):
        """Return the period-over-period percentage change of ``equity_curve``.

        The first observation has no predecessor and is dropped.
        """
        return equity_curve.pct_change().dropna()

    @staticmethod
    def calculate_cumulative_returns(returns):
        """Return the compounded cumulative return after each period."""
        return (1 + returns).cumprod() - 1

    @staticmethod
    def calculate_annual_return(returns):
        """Return the geometric annualised return, assuming 252 periods a year.

        Returns NaN for an empty series, for which no annualisation exponent
        can be formed.
        """
        trading_days = 252
        total_days = len(returns)
        if total_days == 0:
            return np.nan
        growth = (1 + returns).prod()
        return growth ** (trading_days / total_days) - 1

class RiskMetrics:
    """Risk-related metrics; annualisation assumes 252 periods a year."""

    @staticmethod
    def _elapsed(start, end):
        """Return ``end - start`` in days for datetime-like labels, or the raw
        difference for other (e.g. integer) index labels."""
        delta = end - start
        return getattr(delta, 'days', delta)

    @staticmethod
    def calculate_volatility(returns):
        """Return the annualised standard deviation of ``returns``."""
        return returns.std() * np.sqrt(252)

    @staticmethod
    def calculate_sharpe_ratio(returns, risk_free_rate=0.0):
        """Return the annualised Sharpe ratio.

        ``risk_free_rate`` is annual and converted to a per-period rate.
        Returns NaN when the excess returns have zero or undefined dispersion.
        """
        excess_returns = returns - risk_free_rate / 252
        dispersion = excess_returns.std()
        if not dispersion > 0:  # also catches NaN
            return np.nan
        return excess_returns.mean() / dispersion * np.sqrt(252)

    @staticmethod
    def calculate_sortino_ratio(returns, risk_free_rate=0.0):
        """Return the annualised Sortino ratio.

        The downside deviation is the root mean square of the shortfalls below
        the per-period risk-free rate, taken over all periods. Returns NaN
        when there is no shortfall at all.
        """
        excess_returns = returns - risk_free_rate / 252
        # Periods at or above the target contribute zero, not "missing".
        shortfall = np.minimum(excess_returns, 0)
        downside_deviation = np.sqrt((shortfall ** 2).mean()) * np.sqrt(252)
        return (excess_returns.mean() * 252) / downside_deviation if downside_deviation != 0 else np.nan

    @staticmethod
    def calculate_max_drawdown(equity_curve):
        """Return the maximum drawdown and details about where it occurred.

        Returns:
            ``(max_drawdown, info)`` where ``max_drawdown`` is the most
            negative relative drop from a running peak and ``info`` is a dict
            with ``max_drawdown``, ``peak_date``, ``trough_date``,
            ``recovery_date`` (first label at or after the trough where equity
            is back at the peak level, else None), ``drawdown_duration_days``
            and ``recovery_duration_days`` (None if not recovered). Durations
            are in days for datetime indexes and in index units otherwise.
        """
        rolling_max = equity_curve.cummax()
        drawdown = (equity_curve - rolling_max) / rolling_max

        max_drawdown = drawdown.min()
        max_drawdown_idx = drawdown.idxmin()
        # .loc keeps the slicing label-based (inclusive) for every index type.
        peak_idx = equity_curve.loc[:max_drawdown_idx].idxmax()

        recovery_idx = None
        if max_drawdown_idx != equity_curve.index[-1]:
            after_trough = equity_curve.loc[max_drawdown_idx:]
            recovered = after_trough[after_trough >= equity_curve.loc[peak_idx]]
            if not recovered.empty:
                recovery_idx = recovered.index[0]

        # Compare against None: a falsy label (e.g. integer 0) is still valid.
        if recovery_idx is not None:
            recovery_time = RiskMetrics._elapsed(max_drawdown_idx, recovery_idx)
        else:
            recovery_time = None

        drawdown_duration = RiskMetrics._elapsed(peak_idx, max_drawdown_idx)

        drawdown_info = {
            'max_drawdown': max_drawdown,
            'peak_date': peak_idx,
            'trough_date': max_drawdown_idx,
            'recovery_date': recovery_idx,
            'drawdown_duration_days': drawdown_duration,
            'recovery_duration_days': recovery_time
        }

        return max_drawdown, drawdown_info

    @staticmethod
    def calculate_calmar_ratio(annual_return, max_drawdown):
        """Return annual return divided by the magnitude of ``max_drawdown``.

        ``max_drawdown`` is expected as a non-positive number; a zero drawdown
        yields infinity.
        """
        return -annual_return / max_drawdown if max_drawdown != 0 else np.inf

    @staticmethod
    def calculate_omega_ratio(returns, threshold=0.0):
        """Return the Omega ratio relative to an annual ``threshold``.

        Gains and losses are measured as distances from the per-period
        threshold (``threshold / 252``). Returns infinity when there are no
        losses.
        """
        threshold_daily = threshold / 252
        excess = returns - threshold_daily
        gain = excess[excess > 0].sum()
        loss = -excess[excess < 0].sum()
        return gain / loss if loss != 0 else np.inf

class TradeMetrics:
    """Trade-level statistics over a list of trade dicts.

    Profit-based metrics read each record's ``'profit'`` value. Records
    without that key (for example opening legs that carry no realised result)
    are ignored, the same way ``calculate_average_holding_period`` ignores
    records without ``'holding_period'``.
    """

    @staticmethod
    def _profits(trades):
        """Return the ``'profit'`` of every record that has one."""
        return [t['profit'] for t in (trades or []) if 'profit' in t]

    @staticmethod
    def calculate_win_rate(trades):
        """Return the fraction of profit-bearing trades with a positive profit.

        Returns 0.0 when there is no such trade.
        """
        profits = TradeMetrics._profits(trades)
        if not profits:
            return 0.0
        winning_trades = sum(1 for p in profits if p > 0)
        return winning_trades / len(profits)

    @staticmethod
    def calculate_profit_factor(trades):
        """Return gross profit divided by gross loss (infinity if no losses)."""
        profits = TradeMetrics._profits(trades)
        total_profit = sum(p for p in profits if p > 0)
        total_loss = -sum(p for p in profits if p < 0)
        return total_profit / total_loss if total_loss != 0 else np.inf

    @staticmethod
    def calculate_average_profit(trades):
        """Return the mean profit over all profit-bearing trades.

        Winners and losers are both included; returns 0 when there are none.
        """
        profits = TradeMetrics._profits(trades)
        return np.mean(profits) if profits else 0

    @staticmethod
    def calculate_average_loss(trades):
        """Return the mean profit of the losing trades (a negative number),
        or 0 when there are no losing trades."""
        losses = [p for p in TradeMetrics._profits(trades) if p < 0]
        return np.mean(losses) if losses else 0

    @staticmethod
    def calculate_average_holding_period(trades):
        """Return the mean ``'holding_period'`` over the records that carry
        one, or None when no record does."""
        periods = [t['holding_period'] for t in (trades or []) if 'holding_period' in t]
        return np.mean(periods) if periods else None

class BenchmarkMetrics:
    """Metrics that compare a strategy against a benchmark."""

    @staticmethod
    def calculate_beta(returns, benchmark_returns):
        """Return the covariance of strategy and benchmark returns divided by
        the benchmark's variance."""
        covariance = returns.cov(benchmark_returns)
        benchmark_variance = benchmark_returns.var()
        return covariance / benchmark_variance

    @staticmethod
    def calculate_alpha(annual_return, beta, benchmark_annual_return, risk_free_rate=0.0):
        """Return the strategy's excess return minus ``beta`` times the
        benchmark's excess return (both over ``risk_free_rate``)."""
        strategy_excess = annual_return - risk_free_rate
        benchmark_excess = benchmark_annual_return - risk_free_rate
        return strategy_excess - beta * benchmark_excess

    @staticmethod
    def calculate_information_ratio(annual_return, benchmark_annual_return, returns, benchmark_returns):
        """Return the annual active return divided by the annualised tracking
        error, or NaN when the tracking error is zero."""
        active_returns = returns - benchmark_returns
        tracking_error = active_returns.std() * np.sqrt(252)
        if tracking_error != 0:
            return (annual_return - benchmark_annual_return) / tracking_error
        return np.nan

class VisualizationTools:
    """Matplotlib charts for analysing a strategy's performance."""

    @staticmethod
    def plot_equity_curve(equity_curve, benchmark=None):
        """Plot the strategy's equity curve, optionally with a benchmark."""
        plt.figure(figsize=(12, 6))
        plt.plot(equity_curve, label='Strategy', linewidth=2)

        if benchmark is not None:
            plt.plot(benchmark, label='Benchmark', linewidth=2, alpha=0.7)

        plt.title('Equity Curve')
        plt.xlabel('Date')
        plt.ylabel('Value')
        plt.legend()
        plt.grid(True)
        plt.tight_layout()
        plt.show()

    @staticmethod
    def plot_drawdown_underwater(equity_curve):
        """Plot the relative drawdown from the running peak ("underwater" chart)."""
        rolling_max = equity_curve.cummax()
        drawdown = (equity_curve - rolling_max) / rolling_max

        plt.figure(figsize=(12, 6))
        plt.plot(drawdown, color='red', linewidth=2)
        plt.fill_between(drawdown.index, drawdown, 0, color='red', alpha=0.3)
        plt.title('Drawdown Underwater Chart')
        plt.xlabel('Date')
        plt.ylabel('Drawdown')
        plt.grid(True)
        plt.tight_layout()
        plt.show()

    @staticmethod
    def plot_monthly_returns_heatmap(returns):
        """Plot a year-by-month heatmap of compounded monthly returns.

        ``returns`` is resampled by calendar month, so it needs a
        datetime-like index. Months without data are left blank.
        """
        monthly_returns = returns.resample('M').apply(lambda x: (1 + x).prod() - 1)
        monthly_returns_matrix = monthly_returns.groupby([
            monthly_returns.index.year.rename('Year'),
            monthly_returns.index.month.rename('Month')
        ]).first().unstack('Month')
        # Always lay out all twelve months so each column sits under its own
        # label even when the data does not cover every calendar month.
        monthly_returns_matrix = monthly_returns_matrix.reindex(columns=range(1, 13))
        values = monthly_returns_matrix.to_numpy(dtype=float)

        plt.figure(figsize=(12, 6))
        heatmap = plt.pcolor(np.ma.masked_invalid(values), cmap='RdYlGn',
                             edgecolors='white', linewidths=1)
        plt.colorbar(heatmap)

        plt.yticks(np.arange(0.5, len(monthly_returns_matrix.index)), monthly_returns_matrix.index)
        # Twelve tick positions (0.5 .. 11.5) for the twelve month labels.
        plt.xticks(np.arange(0.5, 12), ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'])

        n_rows, n_cols = values.shape
        for i in range(n_rows):
            for j in range(n_cols):
                value = values[i, j]
                if not np.isnan(value):
                    # Strongly coloured cells need a light label to stay readable.
                    text_color = 'white' if abs(value) > 0.05 else 'black'
                    plt.text(j + 0.5, i + 0.5, f'{value:.1%}',
                             ha='center', va='center', color=text_color)

        plt.title('Monthly Returns (%)')
        plt.tight_layout()
        plt.show()

class PerformanceReport:
    """Builds a combined performance report from the metric classes."""

    @staticmethod
    def generate_report(equity_curve, trades=None, benchmark=None, risk_free_rate=0.0):
        """Assemble return, risk, benchmark and trade statistics into a dict.

        Args:
            equity_curve: Series of portfolio values. Its index labels are
                subtracted and read through ``.days``, so a datetime-like
                index is expected.
            trades: Optional list of trade dicts; adds 'Trade Statistics'.
            benchmark: Optional benchmark value series; adds
                'Benchmark Comparison'.
            risk_free_rate: Annual risk-free rate passed to the Sharpe,
                Sortino and alpha calculations and used as the Omega
                threshold.

        Returns:
            dict of sections ('General', 'Risk Metrics', and the optional
            ones above), each mapping a label to a value or formatted string.
        """
        # Return metrics
        returns = ReturnMetrics.calculate_returns(equity_curve)
        cumulative_returns = ReturnMetrics.calculate_cumulative_returns(returns)
        annual_return = ReturnMetrics.calculate_annual_return(returns)

        # Risk metrics
        volatility = RiskMetrics.calculate_volatility(returns)
        sharpe = RiskMetrics.calculate_sharpe_ratio(returns, risk_free_rate)
        sortino = RiskMetrics.calculate_sortino_ratio(returns, risk_free_rate)
        max_drawdown, drawdown_info = RiskMetrics.calculate_max_drawdown(equity_curve)
        calmar = RiskMetrics.calculate_calmar_ratio(annual_return, max_drawdown)
        omega = RiskMetrics.calculate_omega_ratio(returns, risk_free_rate)

        # Test against None: a recovery that took 0 days is still a recovery.
        recovery_days = drawdown_info['recovery_duration_days']
        if recovery_days is not None:
            recovery_text = f"{recovery_days} days"
        else:
            recovery_text = "Not Recovered"

        first_date = equity_curve.index[0]
        last_date = equity_curve.index[-1]

        report = {
            'General': {
                'Start Date': first_date,
                'End Date': last_date,
                'Duration': f"{(last_date - first_date).days} days",
                'Initial Equity': equity_curve.iloc[0],
                'Final Equity': equity_curve.iloc[-1],
                'Total Return': f"{cumulative_returns.iloc[-1]:.2%}",
                'Annual Return': f"{annual_return:.2%}",
                'Annual Volatility': f"{volatility:.2%}"
            },
            'Risk Metrics': {
                'Sharpe Ratio': f"{sharpe:.2f}",
                'Sortino Ratio': f"{sortino:.2f}",
                'Calmar Ratio': f"{calmar:.2f}",
                'Omega Ratio': f"{omega:.2f}",
                'Maximum Drawdown': f"{max_drawdown:.2%}",
                'Drawdown Start': drawdown_info['peak_date'],
                'Drawdown End': drawdown_info['trough_date'],
                'Drawdown Duration': f"{drawdown_info['drawdown_duration_days']} days",
                'Recovery Duration': recovery_text
            }
        }

        # Benchmark comparison
        if benchmark is not None:
            benchmark_returns = ReturnMetrics.calculate_returns(benchmark)
            benchmark_cumulative_returns = ReturnMetrics.calculate_cumulative_returns(benchmark_returns)
            benchmark_annual_return = ReturnMetrics.calculate_annual_return(benchmark_returns)

            beta = BenchmarkMetrics.calculate_beta(returns, benchmark_returns)
            alpha = BenchmarkMetrics.calculate_alpha(annual_return, beta, benchmark_annual_return, risk_free_rate)
            information_ratio = BenchmarkMetrics.calculate_information_ratio(
                annual_return, benchmark_annual_return, returns, benchmark_returns)

            report['Benchmark Comparison'] = {
                'Beta': f"{beta:.2f}",
                'Alpha (annualized)': f"{alpha:.2%}",
                'Information Ratio': f"{information_ratio:.2f}",
                'Benchmark Return': f"{benchmark_cumulative_returns.iloc[-1]:.2%}",
                'Excess Return': f"{cumulative_returns.iloc[-1] - benchmark_cumulative_returns.iloc[-1]:.2%}"
            }

        # Trade statistics
        if trades:
            win_rate = TradeMetrics.calculate_win_rate(trades)
            avg_profit = TradeMetrics.calculate_average_profit(trades)
            avg_loss = TradeMetrics.calculate_average_loss(trades)
            profit_factor = TradeMetrics.calculate_profit_factor(trades)
            avg_holding_period = TradeMetrics.calculate_average_holding_period(trades)

            # Test against None: an average of 0.0 days is a real value.
            if avg_holding_period is not None:
                holding_text = f"{avg_holding_period:.1f} days"
            else:
                holding_text = "N/A"

            report['Trade Statistics'] = {
                'Number of Trades': len(trades),
                'Win Rate': f"{win_rate:.2%}",
                'Average Profit': f"{avg_profit:.2f}",
                'Average Loss': f"{avg_loss:.2f}",
                'Profit Factor': f"{profit_factor:.2f}",
                'Average Holding Period': holding_text
            }

        return report

3. 学习者需完成内容

3.1. 实现多种统计模型

作为本项目的核心部分,您需要实现至少3种不同的统计模型用于市场分析和交易信号生成:

a. 移动平均交叉策略

这是一个经典的趋势跟踪策略。您需要实现: