市场情绪分析是量化交易中的重要分支，通过分析各类文本数据（如新闻、社交媒体、研报等）来捕捉市场参与者的情绪变化，进而预测市场走势。本项目旨在帮助学习者构建一个完整的基于市场情绪分析的量化交易策略，覆盖数据采集、文本处理、情绪分析、策略设计与回测优化等环节。
完成本项目后，学习者将能够独立实现下文所示的数据采集、文本处理、情绪指标计算与策略回测各个模块。
import requests
import pandas as pd
from datetime import datetime, timedelta
import time
import os
from dotenv import load_dotenv
load_dotenv() # 加载环境变量
class DataCollector:
    """Fetch raw news, tweet and Reddit text for sentiment analysis.

    Credentials are read from environment variables when the collector is
    created. Every ``get_*`` method returns a ``pandas.DataFrame`` and falls
    back to an empty frame when the API answers with a non-200 status or
    returns no items. Network-level failures (timeouts, connection errors)
    propagate as ``requests`` exceptions.
    """

    # Plain endpoint URLs. They must not be wrapped in angle brackets:
    # requests rejects "<https://...>" as an invalid URL.
    NEWS_API_URL = "https://newsapi.org/v2/everything"
    TWITTER_SEARCH_URL = "https://api.twitter.com/2/tweets/search/recent"
    REDDIT_TOKEN_URL = "https://www.reddit.com/api/v1/access_token"
    REDDIT_TOP_URL = "https://oauth.reddit.com/r/{subreddit}/top"

    # Seconds to wait for a response so a stalled connection cannot hang forever.
    REQUEST_TIMEOUT = 30

    # Columns kept from each News API article, in output order.
    NEWS_COLUMNS = ['title', 'description', 'content', 'publishedAt', 'source', 'url']

    def __init__(self):
        """Read the API credentials from the environment."""
        self.news_api_key = os.getenv("NEWS_API_KEY")
        self.twitter_bearer_token = os.getenv("TWITTER_BEARER_TOKEN")
        self.reddit_client_id = os.getenv("REDDIT_CLIENT_ID")
        self.reddit_client_secret = os.getenv("REDDIT_CLIENT_SECRET")

    @staticmethod
    def _news_to_frame(articles):
        """Convert a list of News API article dicts into a DataFrame."""
        if not articles:
            return pd.DataFrame()
        # reindex keeps the column set stable even when the API omits a field.
        df = pd.DataFrame(articles).reindex(columns=DataCollector.NEWS_COLUMNS)
        df['publishedAt'] = pd.to_datetime(df['publishedAt'])
        return df

    @staticmethod
    def _tweets_to_frame(tweets):
        """Convert a list of tweet dicts into a DataFrame with flattened metrics."""
        if not tweets:
            return pd.DataFrame()
        df = pd.DataFrame(tweets)
        df['created_at'] = pd.to_datetime(df['created_at'])
        # Expand the nested public_metrics dict into one column per metric.
        metrics_df = pd.json_normalize(df['public_metrics'].tolist())
        return pd.concat([df.drop('public_metrics', axis=1), metrics_df], axis=1)

    @staticmethod
    def _reddit_posts_to_frame(posts):
        """Convert Reddit listing children into a DataFrame."""
        if not posts:
            return pd.DataFrame()
        fields = ('title', 'selftext', 'created_utc', 'score',
                  'num_comments', 'upvote_ratio', 'permalink')
        rows = [
            {field: post.get('data', {}).get(field) for field in fields}
            for post in posts
        ]
        df = pd.DataFrame(rows, columns=list(fields))
        # created_utc is an epoch timestamp; keep it timezone-aware UTC so it
        # lines up with the UTC-aware news and tweet timestamps instead of
        # being silently shifted into the machine's local time.
        df['created_utc'] = pd.to_datetime(df['created_utc'], unit='s', utc=True)
        return df

    def get_financial_news(self, keywords, start_date, end_date, max_results=100):
        """Fetch financial news from News API.

        Args:
            keywords (str): Search query, e.g. "Bitcoin OR Ethereum".
            start_date (str): Start date, formatted YYYY-MM-DD.
            end_date (str): End date, formatted YYYY-MM-DD.
            max_results (int): Maximum number of articles (capped at 100).

        Returns:
            pd.DataFrame: One row per article with the NEWS_COLUMNS columns,
            or an empty frame on a non-200 response or when nothing matched.
        """
        params = {
            'q': keywords,
            'from': start_date,
            'to': end_date,
            'language': 'en',
            'sortBy': 'publishedAt',
            'pageSize': min(max_results, 100),  # API page-size limit
            'apiKey': self.news_api_key,
        }
        response = requests.get(self.NEWS_API_URL, params=params,
                                timeout=self.REQUEST_TIMEOUT)
        if response.status_code != 200:
            print(f"Error: {response.status_code}, {response.text}")
            return pd.DataFrame()
        return self._news_to_frame(response.json().get('articles', []))

    def get_twitter_data(self, query, start_time, end_time, max_results=100):
        """Fetch recent tweets from the Twitter API v2 search endpoint.

        Args:
            query (str): Search query, e.g. "#Bitcoin".
            start_time (str): Start time, formatted YYYY-MM-DDTHH:MM:SSZ.
            end_time (str): End time, formatted YYYY-MM-DDTHH:MM:SSZ.
            max_results (int): Maximum number of tweets to return.

        Returns:
            pd.DataFrame: One row per tweet with ``public_metrics`` expanded
            into columns, or an empty frame on a non-200 response or when
            nothing matched.
        """
        headers = {"Authorization": f"Bearer {self.twitter_bearer_token}"}
        params = {
            'query': query,
            'start_time': start_time,
            'end_time': end_time,
            # The recent-search endpoint only accepts 10..100 per request, so
            # clamp here and trim the frame to the requested size below.
            'max_results': max(10, min(max_results, 100)),
            'tweet.fields': 'created_at,public_metrics,lang',
        }
        response = requests.get(self.TWITTER_SEARCH_URL, headers=headers,
                                params=params, timeout=self.REQUEST_TIMEOUT)
        if response.status_code != 200:
            print(f"Error: {response.status_code}, {response.text}")
            return pd.DataFrame()
        df = self._tweets_to_frame(response.json().get('data', []))
        return df.head(max_results)

    def get_reddit_data(self, subreddit, time_filter='week', limit=100):
        """Fetch the top posts of a subreddit through the Reddit OAuth API.

        The Reddit username and password are read from the REDDIT_USERNAME
        and REDDIT_PASSWORD environment variables.

        Args:
            subreddit (str): Subreddit name, e.g. "Bitcoin".
            time_filter (str): Time filter such as 'day', 'week' or 'month'.
            limit (int): Maximum number of posts (capped at 100).

        Returns:
            pd.DataFrame: One row per post, or an empty frame when
            authentication or the listing request fails or returns nothing.
        """
        # Step 1: obtain an OAuth access token.
        auth = requests.auth.HTTPBasicAuth(self.reddit_client_id, self.reddit_client_secret)
        credentials = {
            'grant_type': 'password',
            'username': os.getenv("REDDIT_USERNAME"),
            'password': os.getenv("REDDIT_PASSWORD"),
        }
        headers = {'User-Agent': 'FinanceApp/0.1'}
        response = requests.post(
            self.REDDIT_TOKEN_URL,
            auth=auth,
            data=credentials,
            headers=headers,
            timeout=self.REQUEST_TIMEOUT,
        )
        if response.status_code != 200:
            print(f"Authentication Error: {response.status_code}, {response.text}")
            return pd.DataFrame()
        # A 200 response can still carry an error payload without a token.
        token = response.json().get('access_token')
        if not token:
            print(f"Authentication Error: {response.status_code}, {response.text}")
            return pd.DataFrame()
        headers['Authorization'] = f'bearer {token}'

        # Step 2: fetch the listing.
        params = {
            't': time_filter,
            'limit': min(limit, 100),  # API limit
        }
        response = requests.get(
            self.REDDIT_TOP_URL.format(subreddit=subreddit),
            headers=headers,
            params=params,
            timeout=self.REQUEST_TIMEOUT,
        )
        if response.status_code != 200:
            print(f"Error: {response.status_code}, {response.text}")
            return pd.DataFrame()
        posts = (response.json().get('data') or {}).get('children', [])
        return self._reddit_posts_to_frame(posts)

    def save_data(self, df, filename):
        """Write ``df`` to ``filename`` as CSV (without the index)."""
        df.to_csv(filename, index=False)
        print(f"Data saved to {filename}")
# Usage example
if __name__ == "__main__":
    collector = DataCollector()

    # Query window for Bitcoin-related news: the seven days ending today,
    # expressed as YYYY-MM-DD strings.
    date_format = '%Y-%m-%d'
    end_date = datetime.now().strftime(date_format)
    week_before = datetime.now() - timedelta(days=7)
    start_date = week_before.strftime(date_format)

    news_df = collector.get_financial_news(
        "Bitcoin OR Cryptocurrency",
        start_date,
        end_date,
    )
    collector.save_data(news_df, "bitcoin_news.csv")

    # NOTE: the examples below only work when the matching API credentials
    # are configured.
    # Twitter example
    # twitter_df = collector.get_twitter_data("#Bitcoin",
    # (datetime.now() - timedelta(days=7)).strftime('%Y-%m-%dT%H:%M:%SZ'),
    # datetime.now().strftime('%Y-%m-%dT%H:%M:%SZ'))
    # collector.save_data(twitter_df, "bitcoin_tweets.csv")
    # Reddit example
    # reddit_df = collector.get_reddit_data("Bitcoin", 'week')
    # collector.save_data(reddit_df, "bitcoin_reddit.csv")
import pandas as pd
import numpy as np
import re
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer
import spacy
import textblob
from textblob import TextBlob
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
from transformers import pipeline, AutoTokenizer, AutoModelForSequenceClassification
import torch
# Download the NLTK resources used by the text processor.
# 'punkt_tab' is the tokenizer data word_tokenize loads on newer NLTK
# releases; 'punkt' is kept for older releases. Requesting a resource an
# installation does not know about only prints an error, it does not raise.
for _nltk_resource in ('punkt', 'punkt_tab', 'stopwords', 'wordnet'):
    nltk.download(_nltk_resource)
class TextProcessor:
    """Clean English text and score its sentiment with several analyzers.

    TextBlob and VADER are always available; spaCy entity extraction and
    FinBERT scoring are optional and are switched off automatically when
    their models cannot be loaded.
    """

    # Patterns used by clean_text, compiled once. They are raw strings with
    # single backslashes: a doubled backslash would match a literal "\" and,
    # inside the character class, would also delete every whitespace character.
    _URL_RE = re.compile(r'http\S+')
    _MENTION_RE = re.compile(r'@\w+')
    _NON_LETTER_RE = re.compile(r'[^a-zA-Z\s]')
    _WHITESPACE_RE = re.compile(r'\s+')

    def __init__(self, use_spacy=False, use_transformers=False):
        """Set up the analyzers.

        Args:
            use_spacy (bool): Load spaCy for named-entity extraction.
            use_transformers (bool): Load FinBERT for finance-specific
                sentiment scoring.
        """
        self.lemmatizer = WordNetLemmatizer()
        self.stop_words = set(stopwords.words('english'))
        self.vader_analyzer = SentimentIntensityAnalyzer()

        # Finance-specific words treated as stop words.
        finance_stopwords = {'nasdaq', 'nyse', 'market', 'stock', 'share', 'price',
                             'index', 'exchange', 'trading', 'trader', 'investor'}
        self.stop_words.update(finance_stopwords)

        # Optional spaCy model.
        self.use_spacy = use_spacy
        if use_spacy:
            try:
                self.nlp = spacy.load("en_core_web_sm")
            except Exception:
                # Not a bare except: KeyboardInterrupt/SystemExit still propagate.
                print("spaCy模型未找到。请运行: python -m spacy download en_core_web_sm")
                self.use_spacy = False

        # Optional FinBERT model.
        self.use_transformers = use_transformers
        if use_transformers:
            try:
                model_name = "ProsusAI/finbert"
                self.tokenizer = AutoTokenizer.from_pretrained(model_name)
                self.model = AutoModelForSequenceClassification.from_pretrained(model_name)
                self.finbert_pipeline = pipeline(
                    "sentiment-analysis", model=self.model, tokenizer=self.tokenizer)
            except Exception:
                print("无法加载FinBERT模型。请检查您的互联网连接和transformers库安装。")
                self.use_transformers = False

    def clean_text(self, text):
        """Normalize raw text for tokenization and scoring.

        Lowercases the text, removes URLs, @mentions and '#' signs, drops
        every character that is not an ASCII letter or whitespace, and
        collapses runs of whitespace.

        Args:
            text (str): Input text.

        Returns:
            str: Cleaned text; "" when ``text`` is not a string.
        """
        if not isinstance(text, str):
            return ""
        text = text.lower()
        text = self._URL_RE.sub('', text)
        text = self._MENTION_RE.sub('', text)
        text = text.replace('#', '')
        text = self._NON_LETTER_RE.sub('', text)
        return self._WHITESPACE_RE.sub(' ', text).strip()

    def tokenize_text(self, text):
        """Tokenize text, drop stop words and lemmatize the rest.

        Args:
            text (str): Input text.

        Returns:
            list: Lemmatized tokens.
        """
        return [
            self.lemmatizer.lemmatize(token)
            for token in word_tokenize(text)
            if token not in self.stop_words
        ]

    def extract_entities(self, text):
        """Extract named entities with spaCy.

        Args:
            text (str): Input text.

        Returns:
            list: ``(entity_text, entity_label)`` tuples; empty when spaCy
            is disabled.
        """
        if not self.use_spacy:
            return []
        doc = self.nlp(text)
        return [(ent.text, ent.label_) for ent in doc.ents]

    def get_textblob_sentiment(self, text):
        """Score text with TextBlob.

        Args:
            text (str): Input text.

        Returns:
            tuple: ``(polarity, subjectivity)``.
        """
        sentiment = TextBlob(text).sentiment
        return sentiment.polarity, sentiment.subjectivity

    def get_vader_sentiment(self, text):
        """Score text with VADER.

        Args:
            text (str): Input text.

        Returns:
            dict: Scores under the keys pos, neg, neu and compound.
        """
        return self.vader_analyzer.polarity_scores(text)

    def get_finbert_sentiment(self, text):
        """Score text with FinBERT.

        Args:
            text (str): Input text.

        Returns:
            dict: ``{'label': ..., 'score': ...}``. A neutral placeholder
            (score 0.5) is returned when FinBERT is disabled or inference
            fails.
        """
        if not self.use_transformers:
            return {'label': 'neutral', 'score': 0.5}
        # Cut the input to 512 characters to keep it short for the model.
        max_length = 512
        if len(text) > max_length:
            text = text[:max_length]
        try:
            return self.finbert_pipeline(text)[0]
        except Exception as e:
            print(f"FinBERT分析错误: {e}")
            return {'label': 'neutral', 'score': 0.5}

    def process_dataframe(self, df, text_column, new_column_prefix='processed'):
        """Clean and score one text column of a DataFrame.

        Adds ``<prefix>_text``, ``<prefix>_tokens``, ``<prefix>_token_count``,
        TextBlob polarity/subjectivity and the four VADER scores. FinBERT and
        entity columns are added when those features are enabled. All scores
        are computed on the cleaned text.

        Args:
            df (pd.DataFrame): Input frame (not modified).
            text_column (str): Name of the column holding the raw text.
            new_column_prefix (str): Prefix for the added columns.

        Returns:
            pd.DataFrame: Copy of ``df`` with the new columns.
        """
        result_df = df.copy()
        prefix = new_column_prefix
        text_col = f'{prefix}_text'
        tokens_col = f'{prefix}_tokens'

        result_df[text_col] = result_df[text_column].apply(self.clean_text)
        result_df[tokens_col] = result_df[text_col].apply(self.tokenize_text)
        result_df[f'{prefix}_token_count'] = result_df[tokens_col].apply(len)

        # TextBlob
        blob_scores = result_df[text_col].apply(self.get_textblob_sentiment)
        result_df[f'{prefix}_polarity'] = blob_scores.apply(lambda pair: pair[0])
        result_df[f'{prefix}_subjectivity'] = blob_scores.apply(lambda pair: pair[1])

        # VADER
        vader_scores = result_df[text_col].apply(self.get_vader_sentiment)
        for key in ('neg', 'neu', 'pos', 'compound'):
            result_df[f'{prefix}_vader_{key}'] = vader_scores.apply(
                lambda scores, key=key: scores[key])

        # FinBERT (optional)
        if self.use_transformers:
            finbert_scores = result_df[text_col].apply(self.get_finbert_sentiment)
            result_df[f'{prefix}_finbert_label'] = finbert_scores.apply(lambda r: r['label'])
            result_df[f'{prefix}_finbert_score'] = finbert_scores.apply(lambda r: r['score'])

        # Named entities (optional)
        if self.use_spacy:
            result_df[f'{prefix}_entities'] = result_df[text_col].apply(self.extract_entities)

        return result_df
# Usage example
if __name__ == "__main__":
    # Three sample headlines on consecutive days.
    sample_headlines = [
        "Bitcoin surges to new all-time high as institutional investors pile in",
        "Markets crash as inflation fears grow and central bank hints at rate hikes",
        "Tech stocks rally despite mixed earnings reports from FAANG companies",
    ]
    data = {
        'date': pd.date_range(start='2023-01-01', periods=3),
        'headline': sample_headlines,
    }
    df = pd.DataFrame(data)

    # Basic processor: spaCy and transformers stay off to avoid the extra
    # dependencies.
    processor = TextProcessor(use_spacy=False, use_transformers=False)

    # Clean and score the headline column, then show the result.
    processed_df = processor.process_dataframe(df, 'headline')
    print(processed_df.head())
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy import stats
import statsmodels.api as sm
class SentimentIndicator:
    """Turn raw sentiment scores into daily indicators and relate them to price."""

    def calculate_daily_sentiment(self, df, date_column, sentiment_column, agg_method='mean'):
        """Aggregate sentiment scores to one value per calendar day.

        Args:
            df (pd.DataFrame): Input frame (not modified).
            date_column (str): Column holding the timestamps.
            sentiment_column (str): Column holding the sentiment scores.
            agg_method: Aggregation passed to ``GroupBy.agg`` (default 'mean').

        Returns:
            pd.DataFrame: Columns ``date`` (datetime64) and ``sentiment_column``.
        """
        df = df.copy()
        df[date_column] = pd.to_datetime(df[date_column])
        df['date'] = df[date_column].dt.date
        daily_sentiment = df.groupby('date')[sentiment_column].agg(agg_method).reset_index()
        daily_sentiment['date'] = pd.to_datetime(daily_sentiment['date'])
        return daily_sentiment

    def calculate_indicators(self, daily_sentiment, windows=None):
        """Compute momentum, volatility, z-score, percentile and RSI columns.

        Args:
            daily_sentiment (pd.DataFrame): Frame with a ``date`` column; the
                second column is taken as the sentiment score.
            windows (dict, optional): Rolling window per indicator, with the
                keys momentum, volatility, zscore, percentile and rsi.

        Returns:
            pd.DataFrame: Date-sorted copy with the indicator columns added,
            each named ``<score column>_<indicator>``.
        """
        if windows is None:
            windows = {'momentum': 5, 'volatility': 10, 'zscore': 20, 'percentile': 60, 'rsi': 14}
        sentiment_df = daily_sentiment.sort_values('date').copy()
        sentiment_col = sentiment_df.columns[1]  # the score is assumed to be the second column
        score = sentiment_df[sentiment_col]

        # Momentum: distance from the simple moving average, plus % change.
        w = windows['momentum']
        sentiment_df[f'{sentiment_col}_sma{w}'] = score.rolling(window=w).mean()
        sentiment_df[f'{sentiment_col}_momentum'] = score - sentiment_df[f'{sentiment_col}_sma{w}']
        sentiment_df[f'{sentiment_col}_pct_change'] = score.pct_change() * 100

        # Volatility: rolling standard deviation.
        w = windows['volatility']
        sentiment_df[f'{sentiment_col}_volatility'] = score.rolling(window=w).std()

        # Z-score against the rolling mean and standard deviation.
        w = windows['zscore']
        rolling_mean = score.rolling(window=w).mean()
        rolling_std = score.rolling(window=w).std()
        sentiment_df[f'{sentiment_col}_zscore'] = (score - rolling_mean) / rolling_std

        # Percentile rank of the latest value inside its window.
        w = windows['percentile']

        def rolling_percentile(x):
            current = x.iloc[-1]
            if np.isnan(current):
                return np.nan
            return stats.percentileofscore(x.dropna(), current) / 100

        # Values start once half a window is available. This is enforced
        # through min_periods: with the default (min_periods == window) a
        # length check inside the callback can never trigger.
        sentiment_df[f'{sentiment_col}_percentile'] = score.rolling(
            window=w, min_periods=max(1, w // 2)).apply(rolling_percentile, raw=False)

        # RSI on day-to-day changes, using simple rolling averages.
        w = windows['rsi']
        delta = score.diff()
        gain = delta.clip(lower=0)
        loss = -delta.clip(upper=0)
        avg_gain = gain.rolling(window=w).mean()
        avg_loss = loss.rolling(window=w).mean()
        rs = avg_gain / avg_loss
        sentiment_df[f'{sentiment_col}_rsi'] = 100 - (100 / (1 + rs))
        return sentiment_df

    def sentiment_divergence(self, sentiment_df, price_df, sentiment_col, price_col='close', window=10):
        """Flag days on which the sentiment trend and the price trend disagree.

        The trend is the sign of the day-to-day change of the rolling mean.

        Args:
            sentiment_df (pd.DataFrame): Frame with ``date`` and ``sentiment_col``.
            price_df (pd.DataFrame): Frame with ``date`` and ``price_col``.
            sentiment_col (str): Sentiment column name.
            price_col (str): Price column name.
            window (int): Rolling-mean window.

        Returns:
            pd.DataFrame: Inner join on date with the trend columns and the
            boolean ``divergence``, ``bearish_divergence`` (price up,
            sentiment down) and ``bullish_divergence`` (price down,
            sentiment up) columns.
        """
        sentiment = sentiment_df.set_index('date')
        price = price_df.set_index('date')
        merged = pd.merge(sentiment, price, left_index=True, right_index=True, how='inner')

        sentiment_trend = np.sign(merged[sentiment_col].rolling(window=window).mean().diff())
        price_trend = np.sign(merged[price_col].rolling(window=window).mean().diff())
        merged[f'{sentiment_col}_trend'] = sentiment_trend
        merged[f'{price_col}_trend'] = price_trend

        # Only compare rows where both trends exist: NaN != NaN is True, which
        # would otherwise mark every warm-up row as a divergence.
        both_known = sentiment_trend.notna() & price_trend.notna()
        merged['divergence'] = both_known & (sentiment_trend != price_trend)
        merged['bearish_divergence'] = (price_trend > 0) & (sentiment_trend < 0)
        merged['bullish_divergence'] = (price_trend < 0) & (sentiment_trend > 0)
        return merged.reset_index()

    def plot_sentiment_indicators(self, df, date_column='date', price_column=None, figsize=(14, 10)):
        """Plot every indicator column in its own subplot.

        Args:
            df (pd.DataFrame): Frame holding the date and indicator columns
                (not modified).
            date_column (str): Name of the date column.
            price_column (str, optional): Column drawn on a secondary axis of
                the last subplot when it is present in ``df``.
            figsize (tuple): Figure size.

        Returns:
            The matplotlib figure.

        Raises:
            ValueError: If ``df`` has no column to plot.
        """
        df = df.copy()  # do not convert the caller's date column in place
        df[date_column] = pd.to_datetime(df[date_column])
        sentiment_columns = [
            col for col in df.columns
            if col != date_column and col != price_column and 'date' not in col.lower()
        ]
        if not sentiment_columns:
            raise ValueError("df contains no indicator columns to plot")

        # squeeze=False always yields a 2-D array, so one subplot needs no
        # special case.
        fig, axes_grid = plt.subplots(len(sentiment_columns), 1, figsize=figsize,
                                      sharex=True, squeeze=False)
        axes = axes_grid[:, 0]
        for ax, col in zip(axes, sentiment_columns):
            ax.plot(df[date_column], df[col], label=col)
            ax.set_title(f'{col} Over Time')
            ax.set_ylabel(col)
            ax.grid(True, alpha=0.3)
            ax.legend()

        if price_column and price_column in df.columns:
            ax2 = axes[-1].twinx()
            ax2.plot(df[date_column], df[price_column], 'r-', alpha=0.5, label=price_column)
            ax2.set_ylabel(price_column, color='r')
            ax2.tick_params(axis='y', labelcolor='r')
            ax2.legend(loc='upper right')

        # Label the shared x axis on the bottom subplot explicitly rather than
        # on whichever axes happens to be current.
        axes[-1].set_xlabel('Date')
        plt.tight_layout()
        return fig

    def analyze_relationship(self, sentiment_df, price_df, sentiment_col, price_col='close', lag_days=5, max_lag=10):
        """Relate sentiment to future price changes.

        Args:
            sentiment_df (pd.DataFrame): Frame with ``date`` and ``sentiment_col``.
            price_df (pd.DataFrame): Frame with ``date`` and ``price_col``.
            sentiment_col (str): Sentiment column to analyze.
            price_col (str): Price column name.
            lag_days (int): Largest lead (in rows) for the correlation study.
            max_lag (int): Largest lag for the Granger causality test.

        Returns:
            dict: ``correlation`` is a DataFrame of Pearson correlations
            between sentiment and the price change ``lag`` rows later (or
            None); ``granger_causality`` is a DataFrame of p-values for
            "sentiment Granger-causes price change" (or None when there are
            too few observations).
        """
        pct_col = f'{price_col}_pct_change'
        sentiment = sentiment_df.set_index('date')
        price = price_df.set_index('date')
        price[pct_col] = price[price_col].pct_change()
        merged = pd.merge(sentiment, price, left_index=True, right_index=True, how='inner')

        # Correlation of today's sentiment with the price change `lag` rows ahead.
        correlations = []
        for lag in range(1, lag_days + 1):
            future_change = merged[pct_col].shift(-lag)
            valid_data = merged[[sentiment_col]].join(future_change).dropna()
            if len(valid_data) > 5:  # need enough data points
                corr, p_value = stats.pearsonr(valid_data[sentiment_col], valid_data[pct_col])
                correlations.append({
                    'lag_days': lag,
                    'correlation': corr,
                    'p_value': p_value,
                    'significant': p_value < 0.05
                })
        corr_result = pd.DataFrame(correlations) if correlations else None

        # Granger causality. Drop NaNs row-wise so both series stay aligned
        # on the same dates.
        paired = merged[[sentiment_col, pct_col]].dropna()
        gc_result = None
        # statsmodels rejects samples with 3 * maxlag + 1 observations or fewer.
        if max_lag >= 1 and len(paired) > 3 * max_lag + 1:
            # grangercausalitytests checks whether the SECOND column helps to
            # predict the FIRST, so price change goes first and sentiment second.
            data = pd.DataFrame({
                'price_change': paired[pct_col].values,
                'sentiment': paired[sentiment_col].values
            })
            # An integer maxlag evaluates every lag from 1 to maxlag in one call.
            gc_res = sm.tsa.stattools.grangercausalitytests(
                data[['price_change', 'sentiment']], max_lag, verbose=False)
            gc_results = []
            for lag in range(1, max_lag + 1):
                p_value = gc_res[lag][0]['ssr_chi2test'][1]
                gc_results.append({
                    'lag': lag,
                    'p_value': p_value,
                    'significant': p_value < 0.05
                })
            gc_result = pd.DataFrame(gc_results)
        return {'correlation': corr_result, 'granger_causality': gc_result}
# Usage example
if __name__ == "__main__":
    # Synthetic sentiment scores and a random-walk price for 60 days.
    sentiment_data = pd.DataFrame({
        'date': pd.date_range(start='2023-01-01', periods=60),
        'sentiment_score': np.random.normal(0.2, 0.5, 60)
    })
    price_data = pd.DataFrame({
        'date': pd.date_range(start='2023-01-01', periods=60),
        'close': np.cumsum(np.random.normal(0, 1, 60)) + 100
    })

    # Build the indicators.
    indicator = SentimentIndicator()
    daily_sentiment = indicator.calculate_daily_sentiment(sentiment_data, 'date', 'sentiment_score')
    full_indicators = indicator.calculate_indicators(daily_sentiment)

    # Plot the indicators. The price is merged in first: the indicator frame
    # has no 'close' column, so price_column='close' would otherwise be
    # ignored and no price overlay would be drawn.
    plot_frame = full_indicators.merge(price_data, on='date', how='left')
    fig = indicator.plot_sentiment_indicators(plot_frame, price_column='close')
    plt.show()

    # Relationship between sentiment and future price changes.
    analysis = indicator.analyze_relationship(full_indicators, price_data, 'sentiment_score_zscore')
    print("情绪与未来价格的相关性分析:")
    print(analysis['correlation'])

    # Divergence between sentiment and price.
    divergence = indicator.sentiment_divergence(full_indicators, price_data, 'sentiment_score')
    # "\n" (single backslash) emits a blank line; "\\n" printed the two
    # characters backslash and n.
    print("\n情绪与价格的背离:")
    print(divergence[['date', 'divergence', 'bearish_divergence', 'bullish_divergence']].tail())
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from datetime import datetime, timedelta
import pytz
import backtrader as bt
import warnings
import logging
# Silence every warning for the rest of the process.
warnings.filterwarnings(action='ignore')

# Root logging configuration plus the logger used by the strategy classes.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(name='SentimentStrategy')
class SentimentData(bt.feeds.PandasData):
    """Pandas data feed extended with sentiment indicator lines.

    Declares five extra lines on top of the standard PandasData ones and a
    matching parameter per line whose default is -1.
    """

    # Extra lines exposed to strategies in addition to OHLCV.
    lines = (
        'sentiment',
        'sentiment_z',
        'sentiment_momentum',
        'sentiment_volatility',
        'sentiment_rsi',
    )

    # One (line name, -1) parameter per extra line, in the same order.
    params = tuple((line_name, -1) for line_name in lines)
class BaseSentimentStrategy(bt.Strategy):
    """Long-only strategy driven by sentiment indicators.

    A position is opened when the combined sentiment signal is positive and
    closed when the signal turns negative or when a stop-loss, trailing-stop
    or take-profit level is hit. Exits always close the whole position.
    """

    params = (
        ('sentiment_threshold_high', 1.0),  # z-score above which sentiment is bullish
        ('sentiment_threshold_low', -1.0),  # z-score below which sentiment is bearish
        ('position_size', 1.0),             # fraction of available cash used per entry
        ('stop_loss', 0.03),                # stop-loss distance as a fraction of entry price
        ('take_profit', 0.05),              # take-profit distance as a fraction of entry price
        ('use_zscore', True),               # use the z-score in the signal
        ('use_momentum', False),            # use momentum in the signal
        ('use_volatility', False),          # damp the signal when volatility is high
        ('use_rsi', False),                 # use the sentiment RSI in the signal
        ('rsi_high', 70),                   # RSI level counted as bullish
        ('rsi_low', 30),                    # RSI level counted as bearish
        ('trail_percent', 0.0),             # trailing-stop distance (0 disables it)
    )

    def log(self, txt, dt=None):
        """Log ``txt`` prefixed with the current bar date (or ``dt``)."""
        dt = dt or self.datas[0].datetime.date(0)
        logger.info(f'{dt.isoformat()} - {txt}')

    def __init__(self):
        """Keep references to the data lines and initialize order tracking."""
        feed = self.datas[0]
        self.dataclose = feed.close
        self.sentiment = feed.sentiment
        self.sentiment_z = feed.sentiment_z
        self.sentiment_momentum = feed.sentiment_momentum
        self.sentiment_volatility = feed.sentiment_volatility
        self.sentiment_rsi = feed.sentiment_rsi

        # Pending order and last entry details.
        self.order = None
        self.buy_price = None
        self.buy_comm = None

        # Exit levels of the open position (None while flat).
        self.stop_price = None
        self.target_price = None
        self.trail_price = None

        # Smoothed copies of the enabled indicators; the results are not
        # stored, they exist so the indicators appear on the plot.
        if self.p.use_zscore:
            bt.indicators.ExponentialMovingAverage(self.sentiment_z, period=10)
        if self.p.use_momentum:
            bt.indicators.ExponentialMovingAverage(self.sentiment_momentum, period=10)
        if self.p.use_volatility:
            bt.indicators.ExponentialMovingAverage(self.sentiment_volatility, period=10)
        if self.p.use_rsi:
            bt.indicators.ExponentialMovingAverage(self.sentiment_rsi, period=10)

    def notify_order(self, order):
        """Track executions and (re)set the exit levels."""
        if order.status in [order.Submitted, order.Accepted]:
            # Still working; nothing to do yet.
            return

        if order.status in [order.Completed]:
            if order.isbuy():
                self.log(f'买入执行: 价格={order.executed.price:.2f}, 成本={order.executed.value:.2f}, 手续费={order.executed.comm:.2f}')
                self.buy_price = order.executed.price
                self.buy_comm = order.executed.comm
                # Exit levels are relative to the actual fill price.
                self.stop_price = self.buy_price * (1.0 - self.p.stop_loss)
                self.target_price = self.buy_price * (1.0 + self.p.take_profit)
                if self.p.trail_percent > 0:
                    self.trail_price = self.buy_price * (1.0 - self.p.trail_percent)
            elif order.issell():
                self.log(f'卖出执行: 价格={order.executed.price:.2f}, 成本={order.executed.value:.2f}, 手续费={order.executed.comm:.2f}')
                # Exits close the full position, so the levels can be cleared.
                self.stop_price = None
                self.target_price = None
                self.trail_price = None
        elif order.status in [order.Canceled, order.Margin, order.Rejected]:
            self.log('订单取消/保证金不足/拒绝')

        # The order is no longer pending.
        self.order = None

    def notify_trade(self, trade):
        """Log gross and net profit once a trade is closed."""
        if not trade.isclosed:
            return
        self.log(f'交易利润: 毛利={trade.pnl:.2f}, 净利={trade.pnlcomm:.2f}')

    def get_sentiment_signal(self):
        """Combine the enabled indicators into one signal.

        Returns:
            int: 1 (bullish), -1 (bearish) or 0 (neutral).
        """
        signal = 0

        if self.p.use_zscore:
            if self.sentiment_z[0] > self.p.sentiment_threshold_high:
                signal += 1
            elif self.sentiment_z[0] < self.p.sentiment_threshold_low:
                signal -= 1

        if self.p.use_momentum:
            if self.sentiment_momentum[0] > 0:
                signal += 1
            elif self.sentiment_momentum[0] < 0:
                signal -= 1

        if self.p.use_volatility:
            # Halve the signal when volatility exceeds its recent average.
            # Skip the comparison while the 30-bar window is empty or all-NaN
            # so np.nanmean is never called on unusable data.
            recent = np.asarray(self.sentiment_volatility.get(size=30), dtype=float)
            if recent.size and not np.all(np.isnan(recent)):
                if self.sentiment_volatility[0] > np.nanmean(recent):
                    signal = signal * 0.5

        if self.p.use_rsi:
            if self.sentiment_rsi[0] > self.p.rsi_high:
                signal += 1
            elif self.sentiment_rsi[0] < self.p.rsi_low:
                signal -= 1

        # Reduce to the sign of the combined score.
        if signal > 0:
            return 1
        elif signal < 0:
            return -1
        else:
            return 0

    def _check_exit_levels(self):
        """Update the trailing stop and exit if a price level was hit.

        Returns:
            bool: True when a closing order was submitted.
        """
        price = self.dataclose[0]

        # Ratchet the trailing stop upwards.
        if self.p.trail_percent > 0:
            new_trail_price = price * (1.0 - self.p.trail_percent)
            if self.trail_price is None or new_trail_price > self.trail_price:
                self.trail_price = new_trail_price
                self.log(f'更新追踪止损价格: {self.trail_price:.2f}')

        # Stop-loss
        if self.stop_price is not None and price < self.stop_price:
            self.log(f'触发止损: 当前价格={price:.2f}, 止损价格={self.stop_price:.2f}')
            self.order = self.close()
            return True

        # Trailing stop
        if self.p.trail_percent > 0 and self.trail_price is not None and price < self.trail_price:
            self.log(f'触发追踪止损: 当前价格={price:.2f}, 追踪价格={self.trail_price:.2f}')
            self.order = self.close()
            return True

        # Take-profit
        if self.target_price is not None and price > self.target_price:
            self.log(f'触发止盈: 当前价格={price:.2f}, 目标价格={self.target_price:.2f}')
            self.order = self.close()
            return True

        return False

    def next(self):
        """Core logic, executed once per bar."""
        # Never stack orders.
        if self.order:
            return

        # Risk exits come first and do not depend on sentiment, so a position
        # stays protected on bars with missing sentiment data.
        # close() is used for every exit: sell() without a size would only
        # sell the sizer's stake, leaving most of the position open after the
        # exit levels were cleared.
        if self.position and self._check_exit_levels():
            return

        # Sentiment-driven decisions need a sentiment value.
        if np.isnan(self.sentiment[0]):
            return

        sentiment_signal = self.get_sentiment_signal()

        if not self.position:
            if sentiment_signal > 0:  # bullish sentiment: enter
                # NOTE(review): with position_size=1.0 the order uses all cash
                # and leaves nothing for commission, so the broker may reject it.
                cash_to_use = self.broker.getcash() * self.p.position_size
                size = cash_to_use / self.dataclose[0]
                self.log(f'买入信号: 价格={self.dataclose[0]:.2f}, 规模={size:.2f}, 情绪分数={self.sentiment_z[0]:.2f}')
                self.order = self.buy(size=size)
        else:
            if sentiment_signal < 0:  # bearish sentiment: exit
                self.log(f'卖出信号: 价格={self.dataclose[0]:.2f}, 情绪分数={self.sentiment_z[0]:.2f}')
                self.order = self.close()
class SentimentBacktester:
    """Backtest and grid-search sentiment strategies with backtrader."""

    # Feed line -> column name expected in the merged price/sentiment frame.
    _FEED_COLUMNS = {
        'open': 'open',
        'high': 'high',
        'low': 'low',
        'close': 'close',
        'volume': 'volume',
        'sentiment': 'sentiment_score',
        'sentiment_z': 'sentiment_score_zscore',
        'sentiment_momentum': 'sentiment_score_momentum',
        'sentiment_volatility': 'sentiment_score_volatility',
        'sentiment_rsi': 'sentiment_score_rsi',
    }

    def __init__(self, price_data, sentiment_data, initial_cash=100000.0):
        """Create the backtester.

        Args:
            price_data (pd.DataFrame): OHLCV data with a ``date`` column.
            sentiment_data (pd.DataFrame): Sentiment indicators with a
                ``date`` column.
            initial_cash (float): Starting cash.
        """
        self.price_data = price_data
        self.sentiment_data = sentiment_data
        self.initial_cash = initial_cash
        self.cerebro = self._new_cerebro()

    def _new_cerebro(self):
        """Build a Cerebro with the broker settings and analyzers used everywhere."""
        cerebro = bt.Cerebro()
        cerebro.broker.setcash(self.initial_cash)
        cerebro.broker.setcommission(commission=0.001)  # 0.1% commission
        cerebro.addanalyzer(bt.analyzers.SharpeRatio, _name='sharpe')
        cerebro.addanalyzer(bt.analyzers.DrawDown, _name='drawdown')
        cerebro.addanalyzer(bt.analyzers.Returns, _name='returns')
        cerebro.addanalyzer(bt.analyzers.TradeAnalyzer, _name='trades')
        return cerebro

    @staticmethod
    def _summarize_trades(trade_analysis):
        """Flatten the TradeAnalyzer result into plain numbers.

        The analyzer nests its counters (``total.total``, ``total.closed``,
        ``won.total``, ``won.pnl.total`` ...), and the ``won``/``lost``
        sections are absent until a trade has closed. Win and loss rates are
        relative to closed trades.
        """
        totals = trade_analysis.get('total', {})
        total_trades = totals.get('total', 0)
        closed_trades = totals.get('closed', 0)
        won = trade_analysis.get('won', {})
        lost = trade_analysis.get('lost', {})
        winning = won.get('total', 0)
        losing = lost.get('total', 0)
        gross_win = won.get('pnl', {}).get('total', 0)
        gross_loss = lost.get('pnl', {}).get('total', 0)

        summary = {
            'total_trades': total_trades,
            'winning_trades': winning,
            'win_rate': winning / closed_trades * 100 if closed_trades > 0 else 0,
            'avg_winning_trade': won.get('pnl', {}).get('average', 0),
            'losing_trades': losing,
            'loss_rate': losing / closed_trades * 100 if closed_trades > 0 else 0,
            'avg_losing_trade': lost.get('pnl', {}).get('average', 0),
        }
        # Profit factor: gross profit over gross loss.
        if losing > 0 and gross_loss != 0:
            summary['profit_factor'] = abs(gross_win / gross_loss)
        else:
            summary['profit_factor'] = float('inf') if winning > 0 else 0
        return summary

    def prepare_data(self):
        """Merge price and sentiment data and add the feed to ``self.cerebro``.

        The caller's frames are not modified. Each call adds one more feed to
        the current Cerebro instance.

        Returns:
            SentimentData: The data feed that was added.

        Raises:
            KeyError: If a required column is missing after the merge.
        """
        price = self.price_data.copy()
        sentiment = self.sentiment_data.copy()
        # Make both date columns datetime so the merge keys match.
        price['date'] = pd.to_datetime(price['date'])
        sentiment['date'] = pd.to_datetime(sentiment['date'])

        # Left join: every price bar is kept, sentiment may be NaN.
        merged_data = pd.merge(price, sentiment, on='date', how='left')
        merged_data.set_index('date', inplace=True)

        missing = [col for col in self._FEED_COLUMNS.values() if col not in merged_data.columns]
        if missing:
            raise KeyError(f"missing required columns: {missing}")

        # PandasData parameters take column NAMES (or positions), not the
        # column data itself.
        data = SentimentData(
            dataname=merged_data,
            datetime=None,  # use the index as the date
            openinterest=None,
            **self._FEED_COLUMNS
        )
        self.cerebro.adddata(data)
        return data

    def run_backtest(self, strategy_class=BaseSentimentStrategy, **strategy_params):
        """Run one backtest on the current Cerebro instance.

        Args:
            strategy_class: Strategy class to run.
            **strategy_params: Parameters forwarded to the strategy.

        Returns:
            tuple: ``(final_value, analysis)`` where ``analysis`` is a dict
            of performance and trade statistics. ``sharpe_ratio`` is None
            when the analyzer cannot compute it.
        """
        self.cerebro.addstrategy(strategy_class, **strategy_params)
        results = self.cerebro.run()
        self.strat = results[0]

        final_value = self.cerebro.broker.getvalue()
        profit_perc = (final_value - self.initial_cash) / self.initial_cash * 100
        analysis = {
            'initial_cash': self.initial_cash,
            'final_value': final_value,
            'profit_loss': final_value - self.initial_cash,
            'profit_perc': profit_perc,
            'sharpe_ratio': self.strat.analyzers.sharpe.get_analysis()['sharperatio'],
            'max_drawdown': self.strat.analyzers.drawdown.get_analysis()['max']['drawdown'],
            'return_perc': self.strat.analyzers.returns.get_analysis()['rtot'] * 100,
        }
        analysis.update(self._summarize_trades(self.strat.analyzers.trades.get_analysis()))
        return final_value, analysis

    def plot_results(self, filename=None):
        """Plot the backtest of the current Cerebro instance.

        Args:
            filename (str, optional): When given, the first plotted figure
                is saved to this file.
        """
        plt.rcParams['axes.grid'] = True
        # cerebro.plot creates its own figures and returns them, so no figure
        # is created here beforehand.
        figures = self.cerebro.plot(style='candlestick', barup='green', bardown='red',
                                    volup='green', voldown='red',
                                    plotdist=0.1,
                                    start=0, end=len(self.price_data))
        # Save the figure backtrader produced rather than pyplot's "current"
        # figure, which need not be the backtest chart.
        if filename and figures and figures[0]:
            figures[0][0].savefig(filename)
        plt.show()

    def optimize_strategy(self, strategy_class=BaseSentimentStrategy, param_grid=None):
        """Grid-search strategy parameters, ranked by Sharpe ratio.

        Every combination runs on a fresh Cerebro instance; ``self.cerebro``
        is left pointing at the last one.

        Args:
            strategy_class: Strategy class to optimize.
            param_grid (dict): ``{parameter name: [values]}``.

        Returns:
            tuple: ``(best_params, best_performance)``; both are None when
            no combination produced a usable Sharpe ratio.
        """
        if param_grid is None:
            param_grid = {
                'sentiment_threshold_high': [0.5, 1.0, 1.5, 2.0],
                'sentiment_threshold_low': [-0.5, -1.0, -1.5, -2.0],
                'position_size': [0.1, 0.25, 0.5, 1.0],
                'stop_loss': [0.02, 0.03, 0.05],
                'take_profit': [0.03, 0.05, 0.08],
                'use_zscore': [True],
                'use_momentum': [False, True],
                'use_rsi': [False, True]
            }

        best_sharpe = -np.inf
        best_params = None
        best_performance = None

        for params in self._create_param_combinations(param_grid):
            # Fresh engine and data feed for every combination.
            self.cerebro = self._new_cerebro()
            self.prepare_data()
            self.cerebro.addstrategy(strategy_class, **params)
            strat = self.cerebro.run()[0]

            sharpe = strat.analyzers.sharpe.get_analysis()['sharperatio']
            # The analyzer reports None when it cannot compute a ratio;
            # such runs cannot be ranked.
            if sharpe is None or np.isnan(sharpe) or sharpe <= best_sharpe:
                continue

            best_sharpe = sharpe
            best_params = params
            final_value = self.cerebro.broker.getvalue()
            best_performance = {
                'final_value': final_value,
                'profit_perc': (final_value - self.initial_cash) / self.initial_cash * 100,
                'sharpe_ratio': sharpe,
                'max_drawdown': strat.analyzers.drawdown.get_analysis()['max']['drawdown'],
                'return_perc': strat.analyzers.returns.get_analysis()['rtot'] * 100,
            }
            best_performance.update(self._summarize_trades(strat.analyzers.trades.get_analysis()))

        return best_params, best_performance

    def _create_param_combinations(self, param_grid):
        """Return every combination of the grid as a list of dicts."""
        import itertools
        keys = list(param_grid.keys())
        return [
            dict(zip(keys, combo))
            for combo in itertools.product(*param_grid.values())
        ]
# Usage example
# (The original file contained this example twice, back to back; it is kept
# once so the backtest, plot and grid search do not all run a second time.)
if __name__ == "__main__":
    # Sample price data.
    dates = pd.date_range(start='2023-01-01', periods=100)
    price_data = pd.DataFrame({
        'date': dates,
        'open': np.random.normal(100, 2, 100),
        'high': np.random.normal(102, 2, 100),
        'low': np.random.normal(98, 2, 100),
        'close': np.random.normal(100, 2, 100),
        'volume': np.random.randint(1000000, 5000000, 100)
    })
    # Turn the close into a random walk with drift and keep high/low
    # bracketing both open and close.
    for i in range(1, len(price_data)):
        price_data.loc[i, 'close'] = price_data.loc[i-1, 'close'] * (1 + np.random.normal(0.001, 0.02))
        price_data.loc[i, 'high'] = max(price_data.loc[i, 'close'] * (1 + abs(np.random.normal(0, 0.01))),
                                        price_data.loc[i, 'open'])
        price_data.loc[i, 'low'] = min(price_data.loc[i, 'close'] * (1 - abs(np.random.normal(0, 0.01))),
                                       price_data.loc[i, 'open'])

    # Sample sentiment data.
    sentiment_data = pd.DataFrame({
        'date': dates,
        'sentiment_score': np.random.normal(0.2, 0.5, 100),
        'sentiment_score_zscore': np.random.normal(0, 1, 100),
        'sentiment_score_momentum': np.random.normal(0, 0.2, 100),
        'sentiment_score_volatility': np.random.normal(0.2, 0.1, 100),
        'sentiment_score_rsi': np.random.uniform(30, 70, 100)
    })
    # Make sentiment positively correlated with the price changes.
    for i in range(1, len(sentiment_data)):
        price_change = price_data.loc[i, 'close'] / price_data.loc[i-1, 'close'] - 1
        sentiment_data.loc[i, 'sentiment_score'] = sentiment_data.loc[i-1, 'sentiment_score'] + price_change * 2 + np.random.normal(0, 0.2)
        sentiment_data.loc[i, 'sentiment_score_zscore'] = (sentiment_data.loc[i, 'sentiment_score'] - sentiment_data['sentiment_score'].mean()) / sentiment_data['sentiment_score'].std()

    # Set up the backtester and its data feed.
    backtester = SentimentBacktester(price_data, sentiment_data)
    backtester.prepare_data()

    # Run a single backtest.
    final_value, analysis = backtester.run_backtest(
        sentiment_threshold_high=1.0,
        sentiment_threshold_low=-1.0,
        position_size=0.5,
        stop_loss=0.03,
        take_profit=0.05,
        use_zscore=True,
        use_momentum=True
    )
    print("回测结果:")
    for key, value in analysis.items():
        print(f"{key}: {value}")

    # Plot the backtest.
    backtester.plot_results()

    # Grid-search the strategy parameters.
    best_params, best_performance = backtester.optimize_strategy(
        param_grid={
            'sentiment_threshold_high': [0.5, 1.0, 1.5],
            'sentiment_threshold_low': [-0.5, -1.0, -1.5],
            'position_size': [0.25, 0.5],
            'use_zscore': [True],
            'use_momentum': [False, True]
        }
    )
    # "\n" (single backslash) emits a blank line; "\\n" printed the two
    # characters backslash and n. `or {}` covers the case where no parameter
    # combination could be ranked and None is returned.
    print("\n最佳参数:")
    for key, value in (best_params or {}).items():
        print(f"{key}: {value}")
    print("\n最佳性能:")
    for key, value in (best_performance or {}).items():
        print(f"{key}: {value}")