加密货币市场具有高波动性、全天候交易和特殊市场结构等独特特征,这为量化分析师提供了丰富的研究素材和交易机会。通过构建专业的收益率统计分析工具,我们可以深入了解不同加密资产的风险收益特征,为投资决策和策略开发提供数据支持。
本项目旨在开发一个综合性的加密货币收益率分析工具,帮助学习者掌握金融数据处理、统计分析和量化方法在加密市场的应用。
项目目标:
# crypto_data_fetcher.py
import os
import pandas as pd
import numpy as np
import ccxt
import time
from datetime import datetime, timedelta
import pandas_ta as ta
class CryptoDataFetcher:
    """Fetch cryptocurrency market data from an exchange through ccxt."""

    def __init__(self, exchange_id='binance', api_key=None, api_secret=None):
        """
        Create the underlying ccxt exchange client.

        Args:
            exchange_id (str): Name of the exchange class looked up on the
                ccxt module, 'binance' by default.
            api_key (str): API key, None by default.
            api_secret (str): API secret, None by default.

        Raises:
            AttributeError: If ccxt has no attribute named ``exchange_id``.
        """
        exchange_class = getattr(ccxt, exchange_id)
        self.exchange = exchange_class({
            'apiKey': api_key,
            'secret': api_secret,
            'enableRateLimit': True,
            'options': {'defaultType': 'spot'}
        })

    def get_ohlcv(self, symbol, timeframe='1d', limit=1000, since=None):
        """
        Fetch one page of OHLCV candles.

        Args:
            symbol (str): Trading pair, e.g. 'BTC/USDT'.
            timeframe (str): Candle size, e.g. '1m', '5m', '1h', '1d'.
            limit (int): Number of candles requested.
            since (int): Start timestamp in milliseconds, or None.

        Returns:
            pd.DataFrame: Columns open/high/low/close/volume indexed by a
            'timestamp' DatetimeIndex. An empty DataFrame is returned when
            the request fails (the error is printed, not raised).
        """
        if since is not None:
            # The timestamp is sent to the exchange as-is; keep it an
            # integer number of milliseconds rather than a float.
            since = int(since)
        try:
            ohlcv = self.exchange.fetch_ohlcv(symbol, timeframe, since, limit)
            df = pd.DataFrame(
                ohlcv,
                columns=['timestamp', 'open', 'high', 'low', 'close', 'volume']
            )
            df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
            df.set_index('timestamp', inplace=True)
            return df
        except Exception as e:
            # Deliberate best-effort: callers treat an empty frame as
            # "no data" instead of handling exchange-specific errors.
            print(f"获取OHLCV数据失败: {e}")
            return pd.DataFrame()

    def get_historical_data(self, symbol, timeframe='1d', days=365):
        """
        Fetch the last ``days`` days of candles, paging through the API.

        Args:
            symbol (str): Trading pair, e.g. 'BTC/USDT'.
            timeframe (str): Candle size.
            days (int): How many days of history to request.

        Returns:
            pd.DataFrame: De-duplicated OHLCV data sorted by timestamp, or an
            empty DataFrame when nothing could be fetched.
        """
        now = datetime.now()
        end_ms = int(now.timestamp() * 1000)
        since = int((now - timedelta(days=days)).timestamp() * 1000)
        all_data = []
        while since < end_ms:
            data = self.get_ohlcv(symbol, timeframe, limit=1000, since=since)
            if data.empty:
                break
            all_data.append(data)
            # Resume one millisecond after the last candle received, as an
            # integer so the next request carries a clean timestamp.
            next_since = round(data.index[-1].timestamp() * 1000) + 1
            if next_since <= since:
                # The exchange returned nothing newer than what was asked
                # for (e.g. it ignores `since`); stop instead of looping
                # forever on the same page.
                break
            since = next_since
            time.sleep(self.exchange.rateLimit / 1000)  # respect the API rate limit
        if not all_data:
            return pd.DataFrame()
        result = pd.concat(all_data)
        result = result[~result.index.duplicated(keep='first')]
        return result.sort_index()

    def get_tickers(self, symbols=None):
        """
        Fetch ticker information for the given trading pairs.

        Args:
            symbols (list): Trading pairs, e.g. ['BTC/USDT', 'ETH/USDT'].

        Returns:
            dict: Whatever the exchange's ``fetch_tickers`` returns, or an
            empty dict when the request fails (the error is printed).
        """
        try:
            return self.exchange.fetch_tickers(symbols)
        except Exception as e:
            print(f"获取ticker失败: {e}")
            return {}

    def get_markets(self):
        """
        Load the markets available on the exchange.

        Returns:
            The result of the exchange's ``load_markets()`` call; ccxt
            documents this as a mapping keyed by market symbol.
        """
        return self.exchange.load_markets()
# 使用示例
# fetcher = CryptoDataFetcher()
# btc_data = fetcher.get_historical_data('BTC/USDT', '1d', 90)
# app.py
import streamlit as st
import pandas as pd
import numpy as np
import plotly.graph_objects as go
import plotly.express as px
from crypto_data_fetcher import CryptoDataFetcher
# Page configuration
st.set_page_config(
    page_title="Crypto收益率分析工具",
    page_icon="📊",
    layout="wide",
)

# Seed the session state the first time the script runs
for state_key, initial_value in (('data', {}), ('selected_coins', [])):
    if state_key not in st.session_state:
        st.session_state[state_key] = initial_value

# Sidebar
st.sidebar.title("配置参数")

# Data acquisition settings
exchange_options = ['binance', 'coinbase', 'kraken', 'huobi', 'kucoin']
selected_exchange = st.sidebar.selectbox("选择交易所", exchange_options)

timeframe_options = ['1m', '5m', '15m', '30m', '1h', '4h', '1d', '1w']
selected_timeframe = st.sidebar.selectbox(
    "选择时间周期",
    timeframe_options,
    index=timeframe_options.index('1d'),  # preselect the daily timeframe
)

available_days = [30, 60, 90, 180, 365]
selected_days = st.sidebar.selectbox(
    "数据周期(天)",
    available_days,
    index=available_days.index(90),  # preselect 90 days
)
# Create the data fetcher instance (wrapped in Streamlit's resource cache)
@st.cache_resource
def get_data_fetcher(exchange_id):
    """Build a CryptoDataFetcher for the given exchange id."""
    fetcher = CryptoDataFetcher(exchange_id=exchange_id)
    return fetcher


data_fetcher = get_data_fetcher(selected_exchange)
# List the tradable pairs (cached by Streamlit with a one-hour TTL)
@st.cache_data(ttl=3600)
def get_available_pairs(exchange_id):
    """Return the exchange's symbols ending in '/USDT', sorted."""
    market_table = CryptoDataFetcher(exchange_id=exchange_id).get_markets()
    return sorted(
        filter(lambda symbol: symbol.endswith('/USDT'), market_table.keys())
    )
# Pair selection
available_pairs = get_available_pairs(selected_exchange)
default_coins = ['BTC/USDT', 'ETH/USDT']
selected_coins = st.sidebar.multiselect(
    "选择加密货币",
    available_pairs,
    # Only offer defaults that actually exist on the chosen exchange
    default=[pair for pair in default_coins if pair in available_pairs]
)
st.session_state.selected_coins = selected_coins

# Load button
if st.sidebar.button("加载数据"):
    if not selected_coins:
        # Nothing to fetch: say so instead of reporting a "successful" load
        st.sidebar.warning("请至少选择一个交易对")
    else:
        progress_bar = st.progress(0)
        failed_coins = []
        for i, coin in enumerate(selected_coins):
            coin_data = data_fetcher.get_historical_data(
                coin,
                selected_timeframe,
                selected_days
            )
            if coin_data.empty:
                # The fetcher signals failure with an empty frame; do not
                # store it, the preview cannot plot a frame without columns.
                failed_coins.append(coin)
            else:
                st.session_state.data[coin] = coin_data
            progress_bar.progress((i + 1) / len(selected_coins))
        loaded_count = len(selected_coins) - len(failed_coins)
        if loaded_count:
            st.sidebar.success(f"成功加载 {loaded_count} 个交易对的数据!")
        if failed_coins:
            st.sidebar.warning(f"以下交易对加载失败: {', '.join(failed_coins)}")
# Main page
st.title("Crypto收益率统计分析工具")

# Data preview
if st.session_state.data:
    st.header("数据预览")
    tabs = st.tabs(list(st.session_state.data.keys()))
    for tab, (coin, data) in zip(tabs, st.session_state.data.items()):
        with tab:
            if data.empty or 'close' not in data.columns:
                # A failed download is stored as an empty frame with no
                # columns; plotting it would raise, so report it instead.
                st.warning(f"{coin} 暂无可用数据")
                continue
            st.dataframe(data)
            # Simple closing-price chart
            fig = px.line(
                data,
                y='close',
                title=f"{coin} 收盘价走势"
            )
            st.plotly_chart(fig, use_container_width=True)
else:
    st.info("请在侧边栏选择交易对并点击'加载数据'按钮开始分析")

# Placeholders for the features still to be implemented
st.header("收益率分析")
st.info("请实现收益率计算方法...")
st.header("波动性分析")
st.info("请实现波动性分析功能...")
st.header("相关性分析")
st.info("请实现相关性分析模块...")
st.header("策略回测")
st.info("请实现基本策略回测框架...")

if __name__ == "__main__":
    # How to start the app:
    # streamlit run app.py
    pass
# data_storage.py
import pandas as pd
import numpy as np
import os
import json
import pickle
from datetime import datetime
class CryptoDataStorage:
    """File-based storage for raw data, processed data, models and results."""

    def __init__(self, base_path="./data"):
        """
        Initialise the storage and create its directory layout.

        Args:
            base_path (str): Root directory for all stored files.
        """
        self.base_path = base_path
        self._create_directory_structure()

    def _create_directory_structure(self):
        """Create the base directory and its raw/processed/models/results subdirectories."""
        directories = [
            self.base_path,
            f"{self.base_path}/raw",
            f"{self.base_path}/processed",
            f"{self.base_path}/models",
            f"{self.base_path}/results"
        ]
        for directory in directories:
            # exist_ok avoids the check-then-create race of exists() + makedirs()
            os.makedirs(directory, exist_ok=True)

    def save_raw_data(self, data, symbol, timeframe):
        """
        Save raw data as CSV under ``raw/``.

        Args:
            data (pd.DataFrame): Data to save.
            symbol (str): Trading pair, e.g. 'BTC/USDT'.
            timeframe (str): Timeframe, e.g. '1d'.
        """
        # '/' in the pair name would otherwise be read as a path separator
        symbol_path = symbol.replace("/", "_")
        file_path = f"{self.base_path}/raw/{symbol_path}_{timeframe}.csv"
        data.to_csv(file_path)

    def load_raw_data(self, symbol, timeframe):
        """
        Load raw data previously written by ``save_raw_data``.

        Args:
            symbol (str): Trading pair, e.g. 'BTC/USDT'.
            timeframe (str): Timeframe, e.g. '1d'.

        Returns:
            pd.DataFrame: The loaded data, or None if the file does not exist.
        """
        symbol_path = symbol.replace("/", "_")
        file_path = f"{self.base_path}/raw/{symbol_path}_{timeframe}.csv"
        if not os.path.exists(file_path):
            return None
        return pd.read_csv(file_path, index_col=0, parse_dates=True)

    def save_processed_data(self, data, name):
        """
        Save processed data as CSV under ``processed/``.

        Args:
            data (pd.DataFrame): Data to save.
            name (str): Dataset name (used as the file name).
        """
        file_path = f"{self.base_path}/processed/{name}.csv"
        data.to_csv(file_path)

    def load_processed_data(self, name):
        """
        Load processed data previously written by ``save_processed_data``.

        Args:
            name (str): Dataset name.

        Returns:
            pd.DataFrame: The loaded data, or None if the file does not exist.
        """
        file_path = f"{self.base_path}/processed/{name}.csv"
        if not os.path.exists(file_path):
            return None
        return pd.read_csv(file_path, index_col=0, parse_dates=True)

    def save_model(self, model, name):
        """
        Pickle a model under ``models/``.

        Args:
            model: Object to save.
            name (str): Model name (used as the file name).
        """
        file_path = f"{self.base_path}/models/{name}.pkl"
        with open(file_path, 'wb') as f:
            pickle.dump(model, f)

    def load_model(self, name):
        """
        Load a model previously written by ``save_model``.

        Args:
            name (str): Model name.

        Returns:
            The unpickled object, or None if the file does not exist.
        """
        file_path = f"{self.base_path}/models/{name}.pkl"
        if not os.path.exists(file_path):
            return None
        # SECURITY: unpickling executes code embedded in the file. Only load
        # model files that this application wrote itself.
        with open(file_path, 'rb') as f:
            return pickle.load(f)

    @staticmethod
    def _json_key(key):
        """Turn datetime / numpy-scalar dict keys into JSON-compatible keys."""
        if isinstance(key, datetime):
            return key.isoformat()
        if isinstance(key, np.generic):
            return key.item()
        return key

    @classmethod
    def _to_serializable(cls, value):
        """Recursively convert numpy/pandas/datetime values to plain JSON types."""
        if isinstance(value, pd.DataFrame):
            value = value.to_dict()
        elif isinstance(value, (np.ndarray, pd.Series)):
            value = value.tolist()
        if isinstance(value, dict):
            return {
                cls._json_key(key): cls._to_serializable(item)
                for key, item in value.items()
            }
        if isinstance(value, (list, tuple)):
            return [cls._to_serializable(item) for item in value]
        if isinstance(value, datetime):
            return value.isoformat()
        if isinstance(value, np.generic):
            return value.item()
        return value

    def save_results(self, results, name):
        """
        Save a results dict as JSON under ``results/``.

        numpy arrays and pandas Series become lists, DataFrames become nested
        dicts, datetimes become ISO-8601 strings and numpy scalars become
        Python scalars, at any nesting depth. The caller's dict is left
        untouched; the conversion works on a copy.

        Args:
            results (dict): Results to save.
            name (str): Result name (used as the file name).
        """
        file_path = f"{self.base_path}/results/{name}.json"
        serializable = self._to_serializable(dict(results))
        with open(file_path, 'w', encoding='utf-8') as f:
            json.dump(serializable, f, indent=4)

    def load_results(self, name):
        """
        Load results previously written by ``save_results``.

        Args:
            name (str): Result name.

        Returns:
            dict: The loaded results, or None if the file does not exist.
        """
        file_path = f"{self.base_path}/results/{name}.json"
        if not os.path.exists(file_path):
            return None
        with open(file_path, 'r', encoding='utf-8') as f:
            return json.load(f)
# visualization_templates.py
import plotly.graph_objects as go
import plotly.express as px
import plotly.figure_factory as ff
import pandas as pd
import numpy as np
from plotly.subplots import make_subplots
class CryptoVisualization:
    """Plotly chart templates for cryptocurrency market data."""

    @staticmethod
    def _volume_colors(df):
        """Return 'green' where close >= open and 'red' otherwise, one entry per row."""
        return np.where(df['close'] >= df['open'], 'green', 'red').tolist()

    @staticmethod
    def _annualized_sharpe(rolling_mean, rolling_std, periods_per_year):
        """Return mean/std scaled by sqrt(periods_per_year); zero std yields NaN."""
        # A flat window has zero deviation; NaN leaves a gap in the line
        # instead of an infinite spike that wrecks the axis scaling.
        safe_std = rolling_std.replace(0, np.nan)
        return (rolling_mean / safe_std) * np.sqrt(periods_per_year)

    @staticmethod
    def plot_price_chart(df, title=None, include_volume=True):
        """
        Draw a candlestick price chart, optionally with a volume panel.

        Args:
            df (pd.DataFrame): Frame with 'open', 'high', 'low', 'close'
                columns and, for the volume panel, a 'volume' column.
            title (str): Chart title.
            include_volume (bool): Whether to add the volume panel. The panel
                is only drawn when ``df`` has a 'volume' column.

        Returns:
            go.Figure: The Plotly figure.
        """
        # Without a 'volume' column the second row would just be empty
        show_volume = include_volume and 'volume' in df.columns
        if show_volume:
            fig = make_subplots(rows=2, cols=1, shared_xaxes=True,
                                vertical_spacing=0.03, row_heights=[0.7, 0.3])
        else:
            fig = go.Figure()

        # Candlestick trace
        candlestick = go.Candlestick(
            x=df.index,
            open=df['open'],
            high=df['high'],
            low=df['low'],
            close=df['close'],
            name="OHLC"
        )
        if show_volume:
            fig.add_trace(candlestick, row=1, col=1)
            # Volume bars, coloured by the direction of each candle
            volume_bar = go.Bar(
                x=df.index,
                y=df['volume'],
                name="Volume",
                marker_color=CryptoVisualization._volume_colors(df)
            )
            fig.add_trace(volume_bar, row=2, col=1)
        else:
            fig.add_trace(candlestick)

        # Layout
        fig.update_layout(
            title=title,
            xaxis_rangeslider_visible=False,
            yaxis_title="Price",
            xaxis_title="Date",
            height=600,
            template="plotly_white"
        )
        if show_volume:
            fig.update_yaxes(title_text="Volume", row=2, col=1)
        return fig

    @staticmethod
    def plot_returns_distribution(returns, title=None):
        """
        Draw the return series next to its distribution.

        Args:
            returns (pd.Series): Return series; NaN values are ignored for
                the distribution panel.
            title (str): Chart title.

        Returns:
            go.Figure: The Plotly figure.
        """
        fig = make_subplots(rows=1, cols=2, subplot_titles=("收益率时间序列", "收益率分布"))

        # Return time series
        fig.add_trace(
            go.Scatter(x=returns.index, y=returns.values, mode='lines', name="Returns"),
            row=1, col=1
        )

        # Return distribution
        clean_returns = returns.dropna()
        value_range = 0 if clean_returns.empty else clean_returns.max() - clean_returns.min()
        if value_range > 0:
            hist_fig = ff.create_distplot(
                [clean_returns.values], ['Returns'], show_hist=True,
                bin_size=value_range / 50
            )
            for trace in hist_fig['data']:
                fig.add_trace(trace, row=1, col=2)
        elif not clean_returns.empty:
            # All values identical: the bin size would be 0, so fall back
            # to a plain histogram instead of a distplot.
            fig.add_trace(
                go.Histogram(x=clean_returns.values, name='Returns'),
                row=1, col=2
            )
        # With no valid returns at all the distribution panel stays empty.

        # Layout
        fig.update_layout(
            title=title,
            height=400,
            template="plotly_white"
        )
        return fig

    @staticmethod
    def plot_correlation_matrix(corr_matrix, title=None):
        """
        Draw a correlation matrix as an annotated heatmap.

        Args:
            corr_matrix (pd.DataFrame): Correlation matrix.
            title (str): Chart title.

        Returns:
            go.Figure: The Plotly figure.
        """
        fig = go.Figure(data=go.Heatmap(
            z=corr_matrix.values,
            x=corr_matrix.columns,
            y=corr_matrix.index,
            colorscale='RdBu',
            zmin=-1, zmax=1,  # fix the colour scale to the correlation range
            text=np.round(corr_matrix.values, 2),
            texttemplate="%{text:.2f}",
            textfont={"size": 10},
            hoverongaps=False
        ))
        fig.update_layout(
            title=title,
            height=500,
            width=700,
            template="plotly_white"
        )
        return fig

    @staticmethod
    def plot_rolling_statistics(data, window=30, title=None, periods_per_year=252):
        """
        Draw rolling mean, rolling standard deviation and rolling Sharpe ratio.

        Args:
            data (pd.Series): Input series.
            window (int): Rolling window size.
            title (str): Chart title.
            periods_per_year (int): Number of observations per year used to
                annualise the Sharpe ratio. Defaults to 252; pass 365 for
                daily data from a market that trades every day.

        Returns:
            go.Figure: The Plotly figure.
        """
        rolling_mean = data.rolling(window=window).mean()
        rolling_std = data.rolling(window=window).std()
        rolling_sharpe = CryptoVisualization._annualized_sharpe(
            rolling_mean, rolling_std, periods_per_year
        )

        fig = make_subplots(rows=3, cols=1, shared_xaxes=True,
                            subplot_titles=("滚动均值", "滚动标准差", "滚动夏普比率"),
                            vertical_spacing=0.05)
        # One line per statistic, stacked top to bottom
        panels = (
            (rolling_mean, f"{window}日滚动均值"),
            (rolling_std, f"{window}日滚动标准差"),
            (rolling_sharpe, f"{window}日滚动夏普比率"),
        )
        for row, (series, label) in enumerate(panels, start=1):
            fig.add_trace(
                go.Scatter(x=series.index, y=series.values, mode='lines', name=label),
                row=row, col=1
            )

        # Layout
        fig.update_layout(
            title=title,
            height=700,
            template="plotly_white"
        )
        return fig

    @staticmethod
    def plot_strategy_performance(equity_curve, benchmark=None, title=None):
        """
        Draw a strategy equity curve with its drawdown underneath.

        Args:
            equity_curve (pd.Series): Strategy equity curve.
            benchmark (pd.Series): Optional benchmark equity curve.
            title (str): Chart title.

        Returns:
            go.Figure: The Plotly figure.
        """
        fig = make_subplots(rows=2, cols=1, shared_xaxes=True,
                            subplot_titles=("累计收益", "回撤"),
                            vertical_spacing=0.05, row_heights=[0.7, 0.3])

        # Strategy equity
        fig.add_trace(
            go.Scatter(x=equity_curve.index, y=equity_curve.values, mode='lines', name="策略"),
            row=1, col=1
        )
        # Benchmark equity, when provided
        if benchmark is not None:
            fig.add_trace(
                go.Scatter(x=benchmark.index, y=benchmark.values, mode='lines', name="基准"),
                row=1, col=1
            )

        # Drawdown in percent: distance below the running maximum
        drawdown = (equity_curve / equity_curve.cummax() - 1) * 100
        fig.add_trace(
            go.Scatter(x=drawdown.index, y=drawdown.values, mode='lines',
                       name="回撤", fill='tozeroy', fillcolor='rgba(255,0,0,0.2)'),
            row=2, col=1
        )

        # Layout
        fig.update_layout(
            title=title,
            height=600,
            template="plotly_white"
        )
        fig.update_yaxes(title_text="累计收益", row=1, col=1)
        fig.update_yaxes(title_text="回撤(%)", row=2, col=1)
        return fig
需实现一个完整的returns_calculator.py模块,包含以下功能: