# analysis **Repository Path**: quant-seminar/analysis ## Basic Information - **Project Name**: analysis - **Description**: No description available - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: dev - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2026-04-09 - **Last Updated**: 2026-05-15 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # 量化因子分析系统 (Quantitative Factor Analysis System) [![Python](https://img.shields.io/badge/python-3.12+-blue.svg)](https://www.python.org/downloads/) [![FastAPI](https://img.shields.io/badge/FastAPI-0.135.3+-green.svg)](https://fastapi.tiangolo.com/) [![DolphinDB](https://img.shields.io/badge/DolphinDB-3.0.4+-orange.svg)](https://www.dolphindb.com/) [![Docker](https://img.shields.io/badge/docker-supported-blue.svg)](https://www.docker.com/) 一个基于 FastAPI 和 DolphinDB 的企业级量化金融因子分析平台,专为量化研究员和策略开发者设计,提供完整的因子挖掘、验证和回测解决方案。 ## 📋 核心功能 ### 🔬 高级因子分析 - **单因子完整分析**: IC分析、分组回测、风险调整收益 - **多周期支持**: 日线/周线/月线数据分析 - **统计显著性检验**: t检验、信息比率、胜率统计 - **因子衰减分析**: 多期收益预测能力评估 - **行业中性化**: 消除行业偏差的纯因子效应 ### 🎯 专业回测引擎 - **分组回测**: 自动10分位数分组,计算多空组合收益 - **基准比较**: 对比市场基准(沪深300)超额收益 - **风险调整**: 夏普比率、最大回撤、卡尔玛比率 - **换手率分析**: 因子稳定性和交易成本评估 - **累积收益曲线**: 可视化因子表现历史 ### 🔧 灵活因子构建 - **200+ 技术指标**: 完整集成TA-Lib技术分析库 - **复合因子表达式**: 支持嵌套运算和条件逻辑 - **时间序列操作**: 滞后、移动平均、指数加权等 - **横截面操作**: 排序、分位数、行业相对值 - **自定义函数**: 可扩展的因子计算框架 ### 🏢 多市场覆盖 - **A股主要指数**: 沪深300、中证1000、中证2000 - **全市场数据**: 支持任意股票池自定义分析 - **行业分类**: 申万一级行业中性化处理 - **市值加权**: 流通市值加权组合构建 ## 🏗️ 系统架构 ### 技术栈概览 ``` Frontend (WebSocket Client) ↓ FastAPI WebSocket API ↓ 异步因子计算引擎 ↓ ┌─────────────────────────────┐ │ 数据源层 │ ├─────────────────────────────┤ │ • Tushare API (基础数据) │ │ • 外部数据服务 (高频数据) │ │ • DolphinDB (历史数据) │ └─────────────────────────────┘ ↓ ┌─────────────────────────────┐ │ 计算引擎 │ ├─────────────────────────────┤ │ • TA-Lib (技术指标) │ │ • pandas (数据处理) │ │ • 自定义因子算子 │ └─────────────────────────────┘ ↓ ┌─────────────────────────────┐ │ 分析引擎 │ ├─────────────────────────────┤ │ • DolphinDB 回测模块 │ │ • IC/RankIC 计算 │ │ • 风险模型 │ └─────────────────────────────┘ ``` ### 核心组件 - **FastAPI 服务器**: 异步Web框架,支持WebSocket实时通信 - **DolphinDB**: 高性能时序数据库,专业金融计算引擎 - **因子计算引擎**: 基于AST的表达式解析和执行引擎 - **数据获取层**: 多源数据聚合,支持缓存和容错 - **回测引擎**: 完整的单因子分析和评价体系 ## 🚀 快速开始 ### 系统要求 #### 硬件要求 - **CPU**: 4核及以上 (推荐8核,因子计算密集型) - **内存**: 8GB及以上 (推荐16GB,大数据集分析) - **存储**: 50GB可用空间 (数据缓存和结果存储) - **网络**: 稳定的互联网连接 (数据获取) #### 软件环境 - **Python**: 3.12+ (必需,使用了新语法特性) - **DolphinDB**: 3.0.4+ 服务器 - **操作系统**: Linux/Windows/macOS - **内存数据库**: DolphinDB Server 运行内存建议4GB+ ### 详细安装指南 #### 1. 克隆项目 ```bash git clone cd analysis ``` #### 2. Python环境配置 **方案一: 使用 uv (强烈推荐)** ```bash # 安装 uv (现代Python包管理器) curl -LsSf https://astral.sh/uv/install.sh | sh # 或者 Windows: powershell -ExecutionPolicy ByPass -c "irm https://astral.sh/uv/install.ps1 | iex" # 创建虚拟环境并安装依赖 uv venv .venv source .venv/bin/activate # Windows: .venv\Scripts\activate # 安装所有依赖 (包括开发依赖) uv pip install -e ".[dev]" ``` **方案二: 使用传统 pip** ```bash # 创建虚拟环境 python -m venv .venv source .venv/bin/activate # Windows: .venv\Scripts\activate # 升级pip并安装依赖 pip install --upgrade pip pip install -e . ``` #### 3. DolphinDB安装与配置 **下载安装** ```bash # Linux wget https://www.dolphindb.com/downloads/DolphinDB_Linux64_V3.0.4.tar.gz tar -xzf DolphinDB_Linux64_V3.0.4.tar.gz # 启动服务器 cd DolphinDB_Linux64_V3.0.4 ./dolphindb -console 0 # 后台运行: nohup ./dolphindb & ``` **基础配置** (dolphindb.cfg) ```ini # 端口配置 localSite=localhost:8848:local8848 controllerSite=localhost:8900:controller # 内存配置 (根据实际内存调整) maxMemSize=4 regularArrayMemoryLimit=512 bigArrayMemoryLimit=1024 # 数据目录 dataDirectory=/path/to/data logDirectory=/path/to/logs # 启用Web管理界面 webLoginRequired=false ``` **安装量化模块** 在DolphinDB控制台执行: ```dolphindb // 安装因子回测模块 installPlugin("factorBacktest") loadPlugin("factorBacktest") // 安装技术分析模块 installPlugin("ta") loadPlugin("ta") ``` #### 4. 环境变量配置 创建 `.env` 文件 (复制 `.env.example` 并修改): ```env #################### # 运行环境配置 #################### PROD=false # 是否生产环境 DEBUG=true # 开启调试模式 LOG_LEVEL=INFO # 日志级别: DEBUG/INFO/WARNING/ERROR #################### # DolphinDB 配置 #################### DOLPHIN_HOST=localhost # DolphinDB服务器地址 DOLPHIN_PORT=8848 # DolphinDB端口 DOLPHIN_USERNAME=admin # 用户名 DOLPHIN_PASSWORD=123456 # 密码 DOLPHIN_DATABASE=dfs://analysis # 分布式数据库名(可选) #################### # 数据源配置 #################### # Tushare Pro API (必需) TUSHARE_TOKEN=your_tushare_token_here_get_from_tushare_pro # 外部数据服务 (可选,提供高频数据) DATA_SERVICE_URL=http://localhost:9000 DATA_SERVICE_TOKEN=optional_auth_token # 数据缓存配置 CACHE_ENABLED=true CACHE_TTL=3600 # 缓存过期时间(秒) CACHE_SIZE=1000 # 最大缓存条目数 #################### # 服务配置 #################### # API服务 API_HOST=0.0.0.0 API_PORT=8000 API_WORKERS=1 # 工作进程数 # 文件存储 MEDIA_ROOT=./media # 结果文件存储目录 MAX_FILE_SIZE=100 # 最大文件大小(MB) FILE_RETENTION_DAYS=30 # 文件保留天数 #################### # 性能优化 #################### # 异步配置 MAX_CONCURRENT_REQUESTS=10 # 最大并发请求数 REQUEST_TIMEOUT=300 # 请求超时时间(秒) CONNECTION_POOL_SIZE=20 # 连接池大小 # 计算优化 USE_MULTIPROCESSING=true # 启用多进程 MAX_WORKERS=4 # 最大工作进程数 CHUNK_SIZE=10000 # 数据分块大小 ``` #### 5. 获取Tushare Token 1. 注册账号: 访问 [Tushare官网](https://tushare.pro/register) 2. 实名认证: 完成个人实名认证 3. 获取token: 在用户中心复制API TOKEN 4. 配置环境: 将token填入 `.env` 文件 #### 6. 启动服务 **开发模式** ```bash # 激活虚拟环境 source .venv/bin/activate # 启动开发服务器 (支持热重载) uvicorn main:app --reload --host 0.0.0.0 --port 8000 --log-level info # 或者使用 Python 直接运行 python -m uvicorn main:app --reload ``` **生产模式** ```bash # 使用 gunicorn (Linux/macOS) gunicorn main:app -w 4 -k uvicorn.workers.UvicornWorker --bind 0.0.0.0:8000 # 使用 Docker (推荐) docker build -t analysis:latest . docker run -d --name analysis-server \ -p 8000:8000 \ -v $(pwd)/.env:/app/.env \ -v $(pwd)/media:/app/media \ analysis:latest ``` #### 7. 验证安装 访问 API 文档确认服务正常: ```bash # API 文档 curl http://localhost:8000/docs # 健康检查 curl http://localhost:8000/health # WebSocket 测试 (使用 wscat) npm install -g wscat wscat -c ws://localhost:8000/factor/single-analysis ``` ### 故障排除 #### 常见问题 **1. DolphinDB连接失败** ```bash # 检查DolphinDB是否运行 netstat -tlnp | grep 8848 # 检查连接参数 telnet localhost 8848 # 查看DolphinDB日志 tail -f /path/to/dolphindb/logs/dolphindb.log ``` **2. Tushare API限制** ```python # 检查账户状态 import tushare as ts ts.set_token('your_token') pro = ts.pro_api() print(pro.query('user')) # 查看积分和权限 ``` **3. 内存不足** - 减少 `CHUNK_SIZE` 参数 - 限制并发请求数 `MAX_CONCURRENT_REQUESTS` - 增加虚拟内存或升级硬件 **4. 依赖安装失败** ```bash # 清理pip缓存 pip cache purge # 使用清华源安装 pip install -i https://pypi.tuna.tsinghua.edu.cn/simple/ package_name # 安装编译工具 (Linux) sudo apt-get install build-essential python3-dev # 安装TA-Lib依赖 (Ubuntu/Debian) sudo apt-get install libta-lib-dev # macOS brew install ta-lib ``` ## 📡 API接口详解 ### WebSocket 因子分析 #### 连接地址 ``` ws://localhost:8000/factor/single-analysis ``` #### 完整请求格式 ```json { "index_code": "399300.SZ", "period": "DAILY", "required_fields": [ "close_hfq", // 后复权收盘价 "open_hfq", // 后复权开盘价 "high_hfq", // 后复权最高价 "low_hfq", // 后复权最低价 "vol", // 成交量 "amount", // 成交额 "turnover_rate", // 换手率 "pe_ttm", // 滚动市盈率 "pb", // 市净率 "total_mv" // 总市值 ], "derived_columns": [ { "name": "rsi_14", "definition": { "op": "talib", "func": "RSI", "prices": ["close_hfq"], "params": {"timeperiod": 14} } }, { "name": "bollinger_position", "definition": { "op": "div", "left": { "op": "sub", "left": "close_hfq", "right": { "op": "rolling_mean", "col": "close_hfq", "window": 20 } }, "right": { "op": "rolling_std", "col": "close_hfq", "window": 20 } } } ], "start_date": "2020.01.01", "end_date": "2024.01.01", "save_file": true } ``` #### 参数详细说明 ##### index_code (必需) 支持的股票池代码: | 代码 | 名称 | 成分股数量 | 描述 | |------|------|-----------|------| | `399300.SZ` | 沪深300 | 300只 | 大盘蓝筹股,市值权重 | | `000852.SH` | 中证1000 | 1000只 | 中小盘成长股 | | `932000.CSI` | 中证2000 | 2000只 | 小盘价值股 | ##### period (可选,默认 "DAILY") 分析周期: | 值 | 说明 | 适用场景 | |----|------|----------| | `DAILY` | 日频数据 | 短期因子,高频策略 | | `WEEKLY` | 周频数据 | 中期趋势,降低噪音 | | `MONTHLY` | 月频数据 | 长期价值,基本面分析 | ##### required_fields (必需) 基础数据字段,支持字段列表: **价格类**: - `open`, `high`, `low`, `close`: 原始价格 - `open_hfq`, `high_hfq`, `low_hfq`, `close_hfq`: 后复权价格 - `pre_close`: 前收盘价 - `change`, `pct_chg`: 涨跌额、涨跌幅 **成交类**: - `vol`: 成交量(股) - `amount`: 成交额(元) - `turnover_rate`: 换手率(%) - `turnover_rate_f`: 自由流通换手率 - `volume_ratio`: 量比 **估值类**: - `pe_ttm`: 滚动市盈率 - `pb`: 市净率 - `ps_ttm`: 滚动市销率 - `dv_ttm`: 股息率 **市值类**: - `total_mv`: 总市值(万元) - `circ_mv`: 流通市值(万元) - `free_share`: 自由流通股本 ##### derived_columns (必需) 自定义因子定义数组,每个元素包含: - `name`: 因子名称(字符串,用作列名) - `definition`: 因子计算表达式(嵌套字典) #### WebSocket通信流程 ##### 1. 建立连接 ```javascript const ws = new WebSocket('ws://localhost:8000/factor/single-analysis'); ws.onopen = function() { console.log('WebSocket连接已建立'); }; ``` ##### 2. 发送参数 ```javascript ws.onopen = function() { const params = { index_code: "399300.SZ", period: "DAILY", required_fields: ["close_hfq", "vol"], derived_columns: [{ name: "momentum_20", definition: { op: "pct_change", col: "close_hfq", periods: 20 } }] }; ws.send(JSON.stringify(params)); }; ``` ##### 3. 接收状态更新 ```javascript ws.onmessage = function(event) { const data = JSON.parse(event.data); console.log(`状态: ${data.status}`); console.log(`消息: ${data.message}`); // 状态类型: // "等待发送参数" - 等待客户端发送分析参数 // "等待重新发送参数" - 参数验证失败,需重新发送 // "运行中" - 正在执行分析,message显示当前步骤 // "准备发送" - 即将发送结果 // "报告已保存" - 结果已保存到文件 }; ``` ##### 4. 接收分析结果 ```javascript ws.onmessage = function(event) { if (event.data instanceof Blob) { // 二进制结果 (pickle格式) const reader = new FileReader(); reader.onload = function(e) { const arrayBuffer = e.target.result; // 发送到后端Python服务进行pickle反序列化 parsePickleResult(arrayBuffer); }; reader.readAsArrayBuffer(event.data); } else { // 文本状态消息 const data = JSON.parse(event.data); handleStatusUpdate(data); } }; ``` #### 错误处理 常见错误码和处理: ```javascript ws.onerror = function(error) { console.error('WebSocket错误:', error); }; ws.onclose = function(event) { if (event.code === 1000) { console.log('分析完成,连接正常关闭'); } else { console.error(`连接异常关闭: ${event.code} - ${event.reason}`); // 可能的原因: // 1001 - 参数验证失败 // 1002 - 数据获取失败 // 1003 - 计算过程出错 // 1006 - 网络连接异常 } }; ``` ### REST API 接口 #### 文件下载接口 **端点**: `GET /media/{file_id}` **参数**: - `file_id`: 文件唯一标识符(UUID格式) **响应**: - 成功(200): 返回pickle序列化的分析结果 - 失败(404): 文件不存在 **使用示例**: ```python import requests import pickle # 下载分析结果 response = requests.get('http://localhost:8000/media/your-file-id') if response.status_code == 200: result = pickle.loads(response.content) print("因子分析结果:", result['factor_analysis']) print("IC分析结果:", result['ic_analysis']) else: print("文件下载失败") ``` #### 健康检查接口 **端点**: `GET /health` **响应**: ```json { "status": "healthy", "timestamp": "2024-01-01T12:00:00Z", "version": "0.1.0", "dependencies": { "dolphindb": "connected", "tushare": "accessible", "data_service": "available" } } ``` #### 系统信息接口 **端点**: `GET /info` **响应**: ```json { "supported_indices": [ "399300.SZ", "000852.SH", "932000.CSI" ], "supported_periods": [ "DAILY", "WEEKLY", "MONTHLY" ], "available_fields": [ "open", "high", "low", "close", "vol", "amount", "pe_ttm", "pb" ], "factor_operations": [ "rolling_mean", "rolling_std", "pct_change", "talib", "shift", "add", "sub", "mul", "div" ] } ``` ## 🧮 因子表达式语法 ### 表达式结构 因子表达式采用JSON格式的抽象语法树(AST),支持嵌套组合和复杂逻辑: ```json { "op": "operation_name", // 操作类型 "param1": "value1", // 操作参数 "param2": { // 嵌套表达式 "op": "nested_operation", "col": "column_name" } } ``` ### 基础操作类型 #### 1. 列引用 ```json { "op": "col", "col": "close_hfq" } ``` #### 2. 时间序列操作 ##### 滞后/超前 ```json { "op": "shift", "col": "close_hfq", "periods": 1 // 正数为滞后,负数为超前 } ``` ##### 差分 ```json { "op": "diff", "col": "close_hfq", "periods": 1 // 差分阶数 } ``` ##### 百分比变化 ```json { "op": "pct_change", "col": "close_hfq", "periods": 20 // 计算20日收益率 } ``` #### 3. 滚动窗口统计 ##### 移动平均 ```json { "op": "rolling_mean", "col": "close_hfq", "window": 20 // 20日均线 } ``` ##### 移动标准差 ```json { "op": "rolling_std", "col": "close_hfq", "window": 20 } ``` ##### 其他滚动统计 ```json // 滚动最大值 {"op": "rolling_max", "col": "high_hfq", "window": 252} // 滚动最小值 {"op": "rolling_min", "col": "low_hfq", "window": 252} // 滚动中位数 {"op": "rolling_median", "col": "close_hfq", "window": 20} // 滚动分位数 {"op": "rolling_quantile", "col": "close_hfq", "window": 20, "quantile": 0.8} // 滚动偏度 {"op": "rolling_skew", "col": "close_hfq", "window": 60} // 滚动峰度 {"op": "rolling_kurt", "col": "close_hfq", "window": 60} ``` #### 4. 指数加权移动平均 ```json { "op": "ewm_mean", "col": "close_hfq", "span": 12 // 半衰期为12天 } ``` #### 5. 数学运算 ##### 四则运算 ```json // 加法 { "op": "add", "left": "close_hfq", "right": "open_hfq" } // 减法 { "op": "sub", "left": "high_hfq", "right": "low_hfq" } // 乘法 { "op": "mul", "left": "close_hfq", "right": 1.05 } // 除法 { "op": "div", "left": "close_hfq", "right": { "op": "shift", "col": "close_hfq", "periods": 1 } } ``` ##### 数学函数 ```json // 幂运算 {"op": "pow", "col": "vol", "exp": 0.5} // 自然对数 {"op": "log", "col": "close_hfq"} // 绝对值 {"op": "abs", "col": "pct_chg"} // 平方根 {"op": "sqrt", "col": "vol"} // 三角函数 {"op": "sin", "col": "angle_column"} {"op": "cos", "col": "angle_column"} ``` #### 6. 比较操作 ```json // 大于 {"op": "gt", "left": "close_hfq", "right": "open_hfq"} // 小于 {"op": "lt", "left": "rsi", "right": 30} // 大于等于 {"op": "ge", "left": "pe_ttm", "right": 0} // 小于等于 {"op": "le", "left": "pb", "right": 10} // 等于 {"op": "eq", "left": "industry", "right": "银行"} // 不等于 {"op": "ne", "left": "st_flag", "right": 1} ``` #### 7. 逻辑运算 ```json // 逻辑与 {"op": "and", "left": condition1, "right": condition2} // 逻辑或 {"op": "or", "left": condition1, "right": condition2} // 逻辑非 {"op": "not", "col": condition} ``` #### 8. 信号检测 ##### 交叉信号 ```json // 向上突破 { "op": "cross_above", "left": "close_hfq", "right": { "op": "rolling_mean", "col": "close_hfq", "window": 20 } } // 向下跌破 { "op": "cross_below", "left": "rsi", "right": 70 } ``` ##### 连续计数 ```json // 连续上涨天数 { "op": "num_consecutive_gt", "col": "pct_chg", "threshold": 0 } // 连续下跌天数 { "op": "num_consecutive_lt", "col": "pct_chg", "threshold": 0 } ``` #### 9. 技术指标 (TA-Lib) ##### RSI相对强弱指标 ```json { "op": "talib", "func": "RSI", "prices": ["close_hfq"], "params": {"timeperiod": 14} } ``` ##### 布林带 ```json { "op": "talib", "func": "BBANDS", "prices": ["close_hfq"], "params": { "timeperiod": 20, "nbdevup": 2, "nbdevdn": 2, "matype": 0 }, "output": "upperband" // 指定输出: upperband/middleband/lowerband } ``` ##### MACD ```json { "op": "talib", "func": "MACD", "prices": ["close_hfq"], "params": { "fastperiod": 12, "slowperiod": 26, "signalperiod": 9 }, "output": "macd" // 输出选项: macd/macdsignal/macdhist } ``` ##### KDJ随机指标 ```json { "op": "talib", "func": "STOCH", "prices": ["high_hfq", "low_hfq", "close_hfq"], "params": { "fastk_period": 9, "slowk_period": 3, "slowd_period": 3 }, "output": "slowk" // K值,slowd为D值 } ``` ##### 威廉指标 ```json { "op": "talib", "func": "WILLR", "prices": ["high_hfq", "low_hfq", "close_hfq"], "params": {"timeperiod": 14} } ``` ##### ATR平均真实波幅 ```json { "op": "talib", "func": "ATR", "prices": ["high_hfq", "low_hfq", "close_hfq"], "params": {"timeperiod": 14} } ``` #### 10. 累积统计 ```json // 累积和 {"op": "cum_sum", "col": "pct_chg"} // 累积均值 {"op": "cum_mean", "col": "vol"} // 累积最大值 {"op": "cum_max", "col": "high_hfq"} // 累积最小值 {"op": "cum_min", "col": "low_hfq"} // 累积计数(非空值) {"op": "cum_count", "col": "close_hfq"} ``` #### 11. 日期时间提取 ```json // 年份 {"op": "year", "col": "time"} // 月份 {"op": "month", "col": "time"} // 日期 {"op": "day", "col": "time"} // 星期几(0=周一) {"op": "weekday", "col": "time"} // 季度 {"op": "quarter", "col": "time"} // 小时 {"op": "hour", "col": "time"} // 是否周末 {"op": "is_weekend", "col": "time"} ``` #### 12. 类型转换 ```json // 转整数 {"op": "astype", "col": "float_column", "dtype": "int"} // 转浮点数 {"op": "astype", "col": "int_column", "dtype": "float"} // 转布尔值 {"op": "astype", "col": "numeric_column", "dtype": "bool"} // 转字符串 {"op": "astype", "col": "any_column", "dtype": "str"} ``` #### 13. 集合操作 ```json // 包含检查 { "op": "isin", "col": "industry", "values": ["银行", "保险", "证券"] } ``` #### 14. 自定义表达式 ```json { "op": "eval", "expr": "(close_hfq - open_hfq) / (high_hfq - low_hfq)" } ``` ### 复合因子示例 #### 1. 价量背离因子 ```json { "name": "price_volume_divergence", "definition": { "op": "sub", "left": { "op": "pct_change", "col": "close_hfq", "periods": 20 }, "right": { "op": "pct_change", "col": "vol", "periods": 20 } } } ``` #### 2. 布林带位置因子 ```json { "name": "bollinger_position", "definition": { "op": "div", "left": { "op": "sub", "left": "close_hfq", "right": { "op": "rolling_mean", "col": "close_hfq", "window": 20 } }, "right": { "op": "mul", "left": { "op": "rolling_std", "col": "close_hfq", "window": 20 }, "right": 2 } } } ``` #### 3. RSI均值回归因子 ```json { "name": "rsi_mean_reversion", "definition": { "op": "sub", "left": 50, "right": { "op": "talib", "func": "RSI", "prices": ["close_hfq"], "params": {"timeperiod": 14} } } } ``` #### 4. 动量反转组合因子 ```json { "name": "momentum_reversal", "definition": { "op": "add", "left": { "op": "mul", "left": { "op": "pct_change", "col": "close_hfq", "periods": 252 }, "right": 0.7 }, "right": { "op": "mul", "left": { "op": "mul", "left": { "op": "pct_change", "col": "close_hfq", "periods": 20 }, "right": -1 }, "right": 0.3 } } } ``` #### 5. 量价趋势确认因子 ```json { "name": "volume_price_trend", "definition": { "op": "mul", "left": { "op": "gt", "left": "close_hfq", "right": { "op": "shift", "col": "close_hfq", "periods": 1 } }, "right": { "op": "gt", "left": "vol", "right": { "op": "rolling_mean", "col": "vol", "window": 20 } } } } ``` ### 因子命名规范 建议的因子命名规范: - **技术指标**: `指标名_参数` (如: `rsi_14`, `ma_20`) - **价量关系**: `price_volume_xxx` (如: `price_volume_corr`) - **动量类**: `momentum_xxx` (如: `momentum_20d`) - **均值回归**: `mean_reversion_xxx` (如: `mean_reversion_bb`) - **波动率**: `volatility_xxx` (如: `volatility_garch`) - **行业**: `industry_xxx` (如: `industry_momentum`) ### 性能优化建议 1. **避免重复计算**: 将公共子表达式提取为独立因子 2. **合理设置窗口**: 过大的窗口会影响计算效率 3. **数据类型**: 布尔运算结果会自动转换为0/1 4. **缓存策略**: 系统会自动缓存中间计算结果 ## 📊 完整分析流程 ### 流程概览 ``` 用户请求 → 参数验证 → 获取成分股 → 拉取基础数据 → 计算衍生因子 → 因子预处理 → 单因子分析 → IC分析 → 输出结果 ``` ### 详细步骤 #### 1. 参数验证与成分股获取 ```python # 获取指数最新成分股 index_weight = pro.index_weight(index_code="399300.SZ") latest_date = index_weight["trade_date"].max() constituent_stocks = index_weight[ index_weight["trade_date"] == latest_date ]["con_code"] print(f"获取到 {len(constituent_stocks)} 只成分股") # 输出: 获取到 300 只成分股 ``` #### 2. 基础数据获取 ```python # 从外部数据服务获取股票日线数据 data_df = await fetch_stock_daily_data( start_date="2020-01-01", end_date="2024-01-01", codes=constituent_stocks.tolist(), factors=required_fields + ["circ_mv", "pct_chg_hfq"] # 添加必需字段 ) print(f"获取数据形状: {data_df.shape}") # 输出: 获取数据形状: (300000, 15) ``` 数据格式示例: ``` time code close_hfq vol amount ... 0 2020-01-02 000001.SZ 1841.69 12453 23456789 ... 1 2020-01-02 000002.SZ 4832.29 8976 45678912 ... 2 2020-01-03 000001.SZ 1875.53 15234 34567891 ... ``` #### 3. 衍生因子计算 ```python # 按股票分组计算衍生因子 data_df = compute_derived(data_df, derived_columns) # 计算过程示例: # 1. 按code分组 # 2. 每组内按time排序 # 3. 应用因子表达式计算 # 4. 合并结果 ``` 计算后数据结构: ``` time code close_hfq rsi_14 bollinger_position ... 0 2020-01-02 000001.SZ 1841.69 NaN NaN ... 1 2020-01-03 000001.SZ 1875.53 NaN NaN ... 15 2020-01-20 000001.SZ 1823.45 45.2 -0.34 ... ``` #### 4. 数据预处理和上传 ```python # 数据标准化 data_df = data_df.rename(columns={ "pct_chg_hfq": "ret", # 收益率 "circ_mv": "mktmv" # 市值 }) # 数据类型转换 data_df["ret"] = data_df["ret"] / 100 # 转为小数 data_df["time"] = data_df["time"].astype("datetime64[ms]") # 准备行业映射 code2industry = dict(zip(stock_basic["ts_code"], stock_basic["industry"])) # 准备基准数据 benchmark_table = pro.index_daily(ts_code="399300.SZ", start_date="20200101") benchmark_table = benchmark_table.rename(columns={ "trade_date": "time", "pct_chg": "ret" }) # 上传到DolphinDB session.upload({ "code2industry": code2industry, "dataTB": data_df[["time", "code", "mktmv", "ret", factor_name]], "benchmarkTB": benchmark_table }) ``` #### 5. DolphinDB因子预处理 ```dolphindb // 创建行业表 industryTB = select time, code from dataTB industryTB["industry"] = code2industry[industryTB["code"]] // 分离数据表 factorTB = select time, code, your_factor from dataTB mktmvTB = select time, code, mktmv from dataTB retTB = select time, code, ret from dataTB ``` ##### 因子预处理参数说明 ```dolphindb factorNew = factorBacktest::preprocess( factorTB, // 因子数据表 "your_factor", // 因子列名 mktmvTB = mktmvTB, // 市值表(用于加权) industryTB = industryTB, // 行业表(用于中性化) dateName = "time", // 日期列名 securityName = "code", // 证券列名 mvName = "mktmv", // 市值列名 indName = "industry", // 行业列名 delOutlierIf = true, // 是否去极值 standardizeIf = true, // 是否标准化 neutralizeIf = true, // 是否行业中性化 delOutlierMethod = "mad", // 去极值方法: mad/quantile standardizeMethod = "rank", // 标准化方法: rank/zscore n = 3, // mad方法的倍数 modify = false, // 是否修改原数据 startDate = 2020.01.01, // 开始日期 endDate = 2024.01.01 // 结束日期 ) ``` **预处理步骤详解**: 1. **去极值**: 使用MAD(中位数绝对偏差)方法 ``` 上界 = 中位数 + 3 * MAD 下界 = 中位数 - 3 * MAD ``` 2. **标准化**: 使用排序法(rank) ``` 标准化值 = (rank(因子值) - 1) / (总数 - 1) * 2 - 1 结果范围: [-1, 1] ``` 3. **中性化**: 行业市值中性化 ``` 中性化因子 = 因子值 - 行业均值 - β_市值 * log(市值) ``` #### 6. 单因子分析 ```dolphindb factor_analysis = singleFactorAnalysis( factorTB = factorNew, // 预处理后因子数据 retTB = retTB, // 收益率表 factorName = "your_factor", // 因子名称 nGroups = 10, // 分组数(十分位数) rf = 0.0, // 无风险收益率 mktmvDF = mktmvTB, // 市值表 dateName = "time", // 日期列名 securityName = "code", // 证券列名 retName = "ret", // 收益率列名 mvName = "mktmv", // 市值列名 startDate = 2020.01.01, // 开始日期 endDate = 2024.01.01, // 结束日期 maxLags = NULL, // 滞后阶数 benchmark = benchmarkTB, // 基准收益率 period = "DAILY" // 分析频率 ) ``` **分析内容包含**: 1. **分组统计**: 按因子值分为10组的基础统计 2. **收益分析**: 各组平均收益、累积收益、胜率 3. **风险指标**: 波动率、最大回撤、夏普比率 4. **多空组合**: 顶部组合vs底部组合的表现 5. **基准比较**: 相对基准的超额收益 #### 7. IC分析 ```dolphindb ic_analysis = getFactorIc( factorNew, // 预处理后因子数据 retTB, // 收益率数据 "your_factor", // 因子名称 "time", // 时间列名 "code", // 代码列名 "ret", // 收益率列名 2020.01.01, // 开始日期 2024.01.01 // 结束日期 ) ``` **IC分析包含**: 1. **时序IC**: 每日IC值序列 2. **IC统计**: 均值、标准差、t统计量、显著性 3. **RankIC**: 秩相关系数 4. **IC_IR**: 信息比率 (IC均值/IC标准差) 5. **胜率**: IC>0的比例 ### 结果数据结构 #### 因子分析结果 ```python factor_analysis = { "group_stats": { "avg_ret": [0.001, 0.002, ..., 0.015], # 各组平均收益 "vol": [0.025, 0.024, ..., 0.023], # 各组收益波动率 "sharpe": [0.04, 0.08, ..., 0.65], # 各组夏普比率 "max_drawdown": [0.15, 0.14, ..., 0.08], # 各组最大回撤 "win_rate": [0.48, 0.49, ..., 0.55], # 各组胜率 "turnover": [0.25, 0.23, ..., 0.18] # 各组换手率 }, "long_short": { "ret": 0.014, # 多空收益 "vol": 0.12, # 多空波动率 "sharpe": 1.17, # 多空夏普比率 "max_dd": 0.08, # 多空最大回撤 "calmar": 1.75 # 卡尔玛比率 }, "cumulative_returns": { "dates": ["2020-01-01", "2020-01-02", ...], "group_1": [1.0, 1.001, 1.003, ...], # 第1组累积收益 "group_10": [1.0, 1.002, 1.018, ...], # 第10组累积收益 "long_short": [1.0, 1.001, 1.015, ...] # 多空组合累积收益 } } ``` #### IC分析结果 ```python ic_analysis = { "time_series": { "dates": ["2020-01-01", "2020-01-02", ...], "ic": [0.05, -0.02, 0.08, ...], # 每日IC值 "rank_ic": [0.04, -0.01, 0.06, ...], # 每日RankIC值 "cum_ic": [0.05, 0.03, 0.11, ...] # 累积IC }, "statistics": { "ic_mean": 0.045, # IC均值 "ic_std": 0.125, # IC标准差 "ic_ir": 0.36, # 信息比率 "ic_skew": -0.15, # IC偏度 "ic_kurt": 2.8, # IC峰度 "t_stat": 8.5, # t统计量 "p_value": 1e-15, # p值 "win_rate": 0.58 # IC>0胜率 }, "rank_ic_stats": { "mean": 0.041, # RankIC均值 "std": 0.118, # RankIC标准差 "ir": 0.35, # RankIC信息比率 "win_rate": 0.59 # RankIC>0胜率 } } ``` ### 结果解读指南 #### 因子有效性判断标准 **强有效因子**: - IC > 0.05 且 t统计量 > 3 - 多空收益夏普比率 > 1.5 - 各组收益单调性强 - 胜率 > 55% **中等有效因子**: - IC > 0.03 且 t统计量 > 2 - 多空收益夏普比率 > 1.0 - 各组收益有明显差异 - 胜率 > 52% **弱有效或无效因子**: - IC < 0.02 或 t统计量 < 1.5 - 多空收益夏普比率 < 0.5 - 各组收益差异不明显 - 胜率 < 50% #### 风险提示 1. **过拟合风险**: IC很高但在样本外表现差 2. **数据挖掘**: 尝试过多因子可能产生偶然性 3. **交易成本**: 高换手率会侵蚀实际收益 4. **容量限制**: 因子衰减和容量约束 5. **市场环境**: 因子表现的时变性 ### 性能监控 #### 实时性能指标 ```python performance_metrics = { "data_fetch_time": "2.3s", # 数据获取耗时 "factor_compute_time": "1.8s", # 因子计算耗时 "analysis_time": "4.5s", # 分析计算耗时 "total_time": "8.6s", # 总耗时 "memory_usage": "1.2GB", # 内存使用 "data_size": "419350 rows" # 数据量 } ``` #### 优化建议 - 数据量大时考虑分批处理 - 使用缓存减少重复计算 - 合理设置因子计算窗口 - 监控内存使用避免溢出 ## 📈 使用示例 ### Python WebSocket客户端 #### 基础客户端实现 ```python import asyncio import websockets import json import pickle from typing import Dict, Any class FactorAnalysisClient: def __init__(self, url: str = "ws://localhost:8000/factor/single-analysis"): self.url = url self.websocket = None self.results = {} async def connect(self): """建立WebSocket连接""" self.websocket = await websockets.connect(self.url) print("WebSocket连接已建立") async def send_analysis_request(self, params: Dict[str, Any]): """发送分析请求""" if not self.websocket: raise ConnectionError("WebSocket未连接") await self.websocket.send(json.dumps(params)) print("分析参数已发送") async def receive_messages(self): """接收服务器消息""" async for message in self.websocket: if isinstance(message, bytes): # 接收到分析结果(pickle格式) try: result = pickle.loads(message) self.results = result print("分析结果接收完成") self.print_summary() break except Exception as e: print(f"结果解析失败: {e}") else: # 接收到状态消息 try: data = json.loads(message) print(f"[{data['status']}] {data['message']}") except json.JSONDecodeError: print(f"收到消息: {message}") def print_summary(self): """打印分析结果摘要""" if not self.results: print("无分析结果") return factor_analysis = self.results.get('factor_analysis', {}) ic_analysis = self.results.get('ic_analysis', {}) print("\n=== 因子分析摘要 ===") if 'long_short' in factor_analysis: ls = factor_analysis['long_short'] print(f"多空收益: {ls.get('ret', 0):.4f}") print(f"夏普比率: {ls.get('sharpe', 0):.4f}") print(f"最大回撤: {ls.get('max_dd', 0):.4f}") print("\n=== IC分析摘要 ===") if 'statistics' in ic_analysis: ic_stats = ic_analysis['statistics'] print(f"IC均值: {ic_stats.get('ic_mean', 0):.4f}") print(f"IC IR: {ic_stats.get('ic_ir', 0):.4f}") print(f"t统计量: {ic_stats.get('t_stat', 0):.2f}") print(f"胜率: {ic_stats.get('win_rate', 0):.2%}") async def analyze_factor(self, params: Dict[str, Any]): """完整分析流程""" try: await self.connect() await self.send_analysis_request(params) await self.receive_messages() except Exception as e: print(f"分析过程出错: {e}") finally: if self.websocket: await self.websocket.close() print("连接已关闭") # 使用示例 async def main(): client = FactorAnalysisClient() # 定义分析参数 params = { "index_code": "399300.SZ", "period": "DAILY", "required_fields": [ "close_hfq", "open_hfq", "high_hfq", "low_hfq", "vol", "amount", "turnover_rate" ], "derived_columns": [ { "name": "rsi_14", "definition": { "op": "talib", "func": "RSI", "prices": ["close_hfq"], "params": {"timeperiod": 14} } } ], "start_date": "2020.01.01", "end_date": "2023.12.31", "save_file": False } await client.analyze_factor(params) # 返回结果供后续分析 return client.results # 运行分析 if __name__ == "__main__": results = asyncio.run(main()) ``` #### 批量因子测试 ```python import asyncio from itertools import product class BatchFactorTester: def __init__(self): self.client = FactorAnalysisClient() self.test_results = [] def generate_factor_configs(self): """生成批量测试的因子配置""" factor_configs = [] # RSI系列因子 for period in [7, 14, 21, 28]: factor_configs.append({ "name": f"rsi_{period}", "definition": { "op": "talib", "func": "RSI", "prices": ["close_hfq"], "params": {"timeperiod": period} } }) # 移动平均系列 for short, long in [(5, 20), (10, 30), (20, 60)]: factor_configs.append({ "name": f"ma_ratio_{short}_{long}", "definition": { "op": "div", "left": { "op": "rolling_mean", "col": "close_hfq", "window": short }, "right": { "op": "rolling_mean", "col": "close_hfq", "window": long } } }) # 动量因子系列 for period in [5, 10, 20, 60]: factor_configs.append({ "name": f"momentum_{period}", "definition": { "op": "pct_change", "col": "close_hfq", "periods": period } }) return factor_configs async def test_single_factor(self, factor_config, base_params): """测试单个因子""" params = base_params.copy() params["derived_columns"] = [factor_config] try: await self.client.analyze_factor(params) result = self.client.results # 提取关键指标 factor_analysis = result.get('factor_analysis', {}) ic_analysis = result.get('ic_analysis', {}) summary = { "factor_name": factor_config["name"], "ic_mean": ic_analysis.get('statistics', {}).get('ic_mean', 0), "ic_ir": ic_analysis.get('statistics', {}).get('ic_ir', 0), "t_stat": ic_analysis.get('statistics', {}).get('t_stat', 0), "win_rate": ic_analysis.get('statistics', {}).get('win_rate', 0), "long_short_ret": factor_analysis.get('long_short', {}).get('ret', 0), "sharpe": factor_analysis.get('long_short', {}).get('sharpe', 0), "max_drawdown": factor_analysis.get('long_short', {}).get('max_dd', 0) } self.test_results.append(summary) print(f"✓ {factor_config['name']}: IC={summary['ic_mean']:.4f}, IR={summary['ic_ir']:.4f}") except Exception as e: print(f"✗ {factor_config['name']}: 测试失败 - {e}") async def run_batch_test(self): """运行批量测试""" base_params = { "index_code": "399300.SZ", "period": "DAILY", "required_fields": ["close_hfq", "vol", "amount"], "start_date": "2020.01.01", "end_date": "2023.12.31", "save_file": False } factor_configs = self.generate_factor_configs() print(f"开始测试 {len(factor_configs)} 个因子...") for i, factor_config in enumerate(factor_configs, 1): print(f"\n[{i}/{len(factor_configs)}] 测试因子: {factor_config['name']}") await self.test_single_factor(factor_config, base_params) self.analyze_results() def analyze_results(self): """分析批量测试结果""" if not self.test_results: print("无测试结果") return import pandas as pd df = pd.DataFrame(self.test_results) print("\n=== 批量测试结果汇总 ===") print(f"总测试因子数: {len(df)}") print(f"有效因子数 (IC>0.02): {len(df[df['ic_mean'] > 0.02])}") print(f"显著因子数 (t_stat>2): {len(df[df['t_stat'] > 2])}") print("\n=== 最佳因子Top 5 (按IC IR排序) ===") top_factors = df.nlargest(5, 'ic_ir') for _, factor in top_factors.iterrows(): print(f"{factor['factor_name']}: IC={factor['ic_mean']:.4f}, IR={factor['ic_ir']:.4f}, Sharpe={factor['sharpe']:.4f}") # 保存结果 df.to_csv("factor_test_results.csv", index=False) print(f"\n结果已保存到 factor_test_results.csv") # 运行批量测试 async def run_batch_test(): tester = BatchFactorTester() await tester.run_batch_test() if __name__ == "__main__": asyncio.run(run_batch_test()) ``` ### JavaScript/Node.js客户端 ```javascript const WebSocket = require('ws'); const fs = require('fs'); class FactorAnalysisClient { constructor(url = 'ws://localhost:8000/factor/single-analysis') { this.url = url; this.ws = null; this.results = null; } connect() { return new Promise((resolve, reject) => { this.ws = new WebSocket(this.url); this.ws.on('open', () => { console.log('WebSocket连接已建立'); resolve(); }); this.ws.on('error', (error) => { console.error('WebSocket错误:', error); reject(error); }); }); } sendAnalysisRequest(params) { if (!this.ws || this.ws.readyState !== WebSocket.OPEN) { throw new Error('WebSocket未连接'); } this.ws.send(JSON.stringify(params)); console.log('分析参数已发送'); } receiveMessages() { return new Promise((resolve, reject) => { this.ws.on('message', (data) => { if (Buffer.isBuffer(data)) { // 接收到二进制结果,保存到文件 fs.writeFileSync('analysis_result.pkl', data); console.log('分析结果已保存到 analysis_result.pkl'); resolve(data); } else { // 接收到状态消息 try { const message = JSON.parse(data.toString()); console.log(`[${message.status}] ${message.message}`); } catch (e) { console.log('收到消息:', data.toString()); } } }); this.ws.on('close', (code, reason) => { if (code === 1000) { console.log('分析完成,连接正常关闭'); } else { console.error(`连接异常关闭: ${code} - ${reason}`); reject(new Error(`WebSocket关闭: ${code}`)); } }); }); } async analyzeFactor(params) { try { await this.connect(); this.sendAnalysisRequest(params); await this.receiveMessages(); } catch (error) { console.error('分析过程出错:', error); throw error; } finally { if (this.ws) { this.ws.close(); } } } } // 使用示例 async function main() { const client = new FactorAnalysisClient(); const params = { index_code: "399300.SZ", period: "DAILY", required_fields: ["close_hfq", "vol"], derived_columns: [{ name: "volume_ma_ratio", definition: { op: "div", left: "vol", right: { op: "rolling_mean", col: "vol", window: 20 } } }], start_date: "2022.01.01", end_date: "2023.12.31", save_file: true }; try { await client.analyzeFactory(params); console.log('分析完成'); } catch (error) { console.error('分析失败:', error); } } main(); ``` ### Jupyter Notebook 高级示例 ```python # cell 1: 环境设置 import asyncio import pandas as pd import numpy as np import matplotlib.pyplot as plt import seaborn as sns from typing import Dict, List import warnings warnings.filterwarnings('ignore') # 设置中文字体和图表样式 plt.rcParams['font.sans-serif'] = ['SimHei', 'Arial Unicode MS'] plt.rcParams['axes.unicode_minus'] = False sns.set_style("whitegrid") # cell 2: 因子分析类 class FactorAnalyzer: def __init__(self, client_class): self.client = client_class() self.analysis_results = {} async def analyze_momentum_factors(self): """分析动量类因子""" momentum_periods = [5, 10, 20, 60, 120, 252] for period in momentum_periods: params = { "index_code": "399300.SZ", "period": "DAILY", "required_fields": ["close_hfq"], "derived_columns": [{ "name": f"momentum_{period}d", "definition": { "op": "pct_change", "col": "close_hfq", "periods": period } }], "start_date": "2018.01.01", "end_date": "2023.12.31", "save_file": False } await self.client.analyze_factor(params) self.analysis_results[f"momentum_{period}d"] = self.client.results return self.analysis_results def plot_ic_analysis(self, factor_name: str): """绘制IC分析图""" result = self.analysis_results.get(factor_name) if not result: return ic_data = result['ic_analysis']['time_series'] dates = pd.to_datetime(ic_data['dates']) ic_values = ic_data['ic'] fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8)) # IC时序图 ax1.plot(dates, ic_values, alpha=0.7, linewidth=0.8) ax1.axhline(y=0, color='red', linestyle='--', alpha=0.5) ax1.set_title(f'{factor_name} - IC时序分析') ax1.set_ylabel('IC值') ax1.grid(True, alpha=0.3) # IC分布图 ax2.hist(ic_values, bins=50, alpha=0.7, color='skyblue', edgecolor='black') ax2.axvline(x=0, color='red', linestyle='--', alpha=0.7) ax2.axvline(x=np.mean(ic_values), color='green', linestyle='-', alpha=0.7, label=f'均值: {np.mean(ic_values):.4f}') ax2.set_title('IC分布直方图') ax2.set_xlabel('IC值') ax2.set_ylabel('频数') ax2.legend() ax2.grid(True, alpha=0.3) plt.tight_layout() plt.show() def plot_cumulative_returns(self, factor_name: str): """绘制累积收益曲线""" result = self.analysis_results.get(factor_name) if not result: return cum_ret_data = result['factor_analysis']['cumulative_returns'] dates = pd.to_datetime(cum_ret_data['dates']) plt.figure(figsize=(12, 6)) # 绘制各分组累积收益 for i in range(1, 11): group_key = f'group_{i}' if group_key in cum_ret_data: returns = cum_ret_data[group_key] label = f'第{i}组' if i in [1, 10] else None alpha = 1.0 if i in [1, 10] else 0.3 linewidth = 2.0 if i in [1, 10] else 1.0 plt.plot(dates, returns, alpha=alpha, linewidth=linewidth, label=label) # 绘制多空组合 if 'long_short' in cum_ret_data: plt.plot(dates, cum_ret_data['long_short'], color='red', linewidth=2, label='多空组合') plt.title(f'{factor_name} - 累积收益曲线') plt.xlabel('日期') plt.ylabel('累积收益') plt.legend() plt.grid(True, alpha=0.3) plt.tight_layout() plt.show() def create_factor_comparison_table(self) -> pd.DataFrame: """创建因子比较表""" comparison_data = [] for factor_name, result in self.analysis_results.items(): factor_stats = result['factor_analysis']['long_short'] ic_stats = result['ic_analysis']['statistics'] comparison_data.append({ '因子名称': factor_name, 'IC均值': ic_stats['ic_mean'], 'IC标准差': ic_stats['ic_std'], 'IC_IR': ic_stats['ic_ir'], 't统计量': ic_stats['t_stat'], 'IC胜率': ic_stats['win_rate'], '多空收益': factor_stats['ret'], '夏普比率': factor_stats['sharpe'], '最大回撤': factor_stats['max_dd'], '卡尔玛比率': factor_stats.get('calmar', 0) }) df = pd.DataFrame(comparison_data) return df.round(4) # cell 3: 运行分析 analyzer = FactorAnalyzer(FactorAnalysisClient) results = await analyzer.analyze_momentum_factors() # cell 4: 生成比较表 comparison_df = analyzer.create_factor_comparison_table() print("=== 动量因子比较表 ===") print(comparison_df.to_string(index=False)) # cell 5: 可视化分析 for factor_name in ['momentum_20d', 'momentum_60d']: analyzer.plot_ic_analysis(factor_name) analyzer.plot_cumulative_returns(factor_name) ``` ### 高级应用场景 #### 1. 因子组合优化 ```python from scipy.optimize import minimize import cvxpy as cp class FactorPortfolioOptimizer: def __init__(self, factor_results: Dict): self.factor_results = factor_results self.factor_names = list(factor_results.keys()) self.ic_matrix = self._build_ic_matrix() def _build_ic_matrix(self) -> pd.DataFrame: """构建IC相关性矩阵""" ic_data = {} for factor, result in self.factor_results.items(): ic_data[factor] = result['ic_analysis']['time_series']['ic'] return pd.DataFrame(ic_data).corr() def optimize_portfolio(self, target_vol: float = 0.15) -> Dict: """优化因子权重组合""" n_factors = len(self.factor_names) # 定义变量 weights = cp.Variable(n_factors) # 预期收益(使用IC_IR作为代理) expected_returns = np.array([ self.factor_results[factor]['ic_analysis']['statistics']['ic_ir'] for factor in self.factor_names ]) # 风险模型(IC相关性) risk_matrix = self.ic_matrix.values # 目标函数:最大化夏普比率 portfolio_return = weights @ expected_returns portfolio_risk = cp.quad_form(weights, risk_matrix) # 约束条件 constraints = [ cp.sum(weights) == 1, # 权重和为1 weights >= 0, # 权重非负 portfolio_risk <= target_vol**2 # 风险约束 ] # 求解优化问题 problem = cp.Problem(cp.Maximize(portfolio_return), constraints) problem.solve() # 返回结果 optimal_weights = weights.value return { 'weights': dict(zip(self.factor_names, optimal_weights)), 'expected_return': portfolio_return.value, 'expected_risk': np.sqrt(portfolio_risk.value), 'sharpe_ratio': portfolio_return.value / np.sqrt(portfolio_risk.value) } # 使用示例 optimizer = FactorPortfolioOptimizer(results) optimal_portfolio = optimizer.optimize_portfolio(target_vol=0.12) print("=== 最优因子组合 ===") for factor, weight in optimal_portfolio['weights'].items(): print(f"{factor}: {weight:.4f}") ``` #### 2. 因子择时策略 ```python class FactorTimingStrategy: def __init__(self, factor_results: Dict): self.factor_results = factor_results def build_timing_signals(self, factor_name: str) -> pd.DataFrame: """构建因子择时信号""" result = self.factor_results[factor_name] ic_data = result['ic_analysis']['time_series'] df = pd.DataFrame({ 'date': pd.to_datetime(ic_data['dates']), 'ic': ic_data['ic'] }) # 计算IC移动平均 df['ic_ma5'] = df['ic'].rolling(5).mean() df['ic_ma20'] = df['ic'].rolling(20).mean() # 生成择时信号 df['signal'] = 0 df.loc[df['ic_ma5'] > df['ic_ma20'], 'signal'] = 1 # 做多信号 df.loc[df['ic_ma5'] < df['ic_ma20'], 'signal'] = -1 # 做空信号 return df def backtest_timing_strategy(self, factor_name: str) -> Dict: """回测择时策略""" signals = self.build_timing_signals(factor_name) # 获取因子收益数据 factor_rets = self.factor_results[factor_name]['factor_analysis'] long_short_rets = factor_rets['cumulative_returns']['long_short'] # 计算择时后的收益 timing_rets = [] for i, signal in enumerate(signals['signal']): if i < len(long_short_rets) - 1: factor_ret = long_short_rets[i+1] / long_short_rets[i] - 1 timing_ret = signal * factor_ret timing_rets.append(timing_ret) # 计算绩效指标 timing_rets = np.array(timing_rets) return { 'total_return': np.prod(1 + timing_rets) - 1, 'volatility': np.std(timing_rets) * np.sqrt(252), 'sharpe_ratio': np.mean(timing_rets) / np.std(timing_rets) * np.sqrt(252), 'max_drawdown': self._calculate_max_drawdown(1 + timing_rets), 'win_rate': np.sum(timing_rets > 0) / len(timing_rets) } def _calculate_max_drawdown(self, cumulative_returns): """计算最大回撤""" peak = np.maximum.accumulate(cumulative_returns) drawdown = (cumulative_returns - peak) / peak return abs(drawdown.min()) # 使用示例 timing_strategy = FactorTimingStrategy(results) timing_results = timing_strategy.backtest_timing_strategy('momentum_20d') print("=== 因子择时策略回测 ===") for metric, value in timing_results.items(): print(f"{metric}: {value:.4f}") ``` ### 最佳实践建议 #### 1. 参数设置优化 ```python # 针对不同市场环境调整参数 bear_market_params = { "start_date": "2018.01.01", # 包含熊市期间 "end_date": "2020.03.31" } bull_market_params = { "start_date": "2020.04.01", # 疫情后牛市 "end_date": "2021.12.31" } # 分别测试不同市场环境下的因子表现 ``` #### 2. 数据质量检查 ```python def validate_factor_data(result): """验证因子数据质量""" factor_analysis = result.get('factor_analysis', {}) ic_analysis = result.get('ic_analysis', {}) checks = { 'has_factor_analysis': bool(factor_analysis), 'has_ic_analysis': bool(ic_analysis), 'sufficient_data_points': len(ic_analysis.get('time_series', {}).get('dates', [])) > 500, 'reasonable_ic_range': abs(ic_analysis.get('statistics', {}).get('ic_mean', 0)) < 0.5, 'no_extreme_returns': abs(factor_analysis.get('long_short', {}).get('ret', 0)) < 2.0 } return all(checks.values()), checks # 使用示例 is_valid, check_details = validate_factor_data(results['momentum_20d']) print(f"数据有效性: {is_valid}") print("检查详情:", check_details) ``` #### 3. 错误处理和重试机制 ```python import time from functools import wraps def retry_on_failure(max_retries=3, delay=5): def decorator(func): @wraps(func) async def wrapper(*args, **kwargs): for attempt in range(max_retries): try: return await func(*args, **kwargs) except Exception as e: print(f"尝试 {attempt + 1} 失败: {e}") if attempt < max_retries - 1: print(f"等待 {delay} 秒后重试...") time.sleep(delay) else: print("重试次数已用完,分析失败") raise e return wrapper return decorator # 使用装饰器 @retry_on_failure(max_retries=3, delay=10) async def robust_factor_analysis(params): client = FactorAnalysisClient() await client.analyze_factor(params) return client.results ``` ## 🐳 Docker 部署 ### 单容器部署 #### 构建镜像 ```bash # 克隆项目 git clone cd analysis # 构建Docker镜像 docker build -t analysis:latest . # 查看镜像大小 docker images | grep analysis ``` #### 运行容器 ```bash # 基础运行 docker run -d \ --name analysis-server \ -p 8000:8000 \ analysis:latest # 完整配置运行 docker run -d \ --name analysis-server \ -p 8000:8000 \ -v $(pwd)/.env:/app/.env \ -v $(pwd)/media:/app/media \ -v $(pwd)/logs:/app/logs \ --restart unless-stopped \ --memory 4g \ --cpus 2.0 \ analysis:latest ``` #### 容器管理 ```bash # 查看容器状态 docker ps -a | grep analysis # 查看容器日志 docker logs -f analysis-server # 进入容器调试 docker exec -it analysis-server bash # 停止和删除容器 docker stop analysis-server docker rm analysis-server ``` ### Docker Compose 部署 创建 `docker-compose.yml`: ```yaml version: '3.8' services: analysis: build: . container_name: analysis-server ports: - "8000:8000" environment: - PROD=true - LOG_LEVEL=INFO volumes: - ./media:/app/media - ./logs:/app/logs - ./.env:/app/.env:ro restart: unless-stopped depends_on: - dolphindb - redis networks: - analysis-network healthcheck: test: ["CMD", "curl", "-f", "http://localhost:8000/health"] interval: 30s timeout: 10s retries: 3 start_period: 40s deploy: resources: limits: memory: 4G cpus: '2.0' reservations: memory: 2G cpus: '1.0' dolphindb: image: ddb/dolphindb:3.0.4 container_name: dolphindb-server ports: - "8848:8848" - "8900:8900" volumes: - ./dolphindb/data:/opt/dolphindb/data - ./dolphindb/log:/opt/dolphindb/log - ./dolphindb/config:/opt/dolphindb/config environment: - DDB_LICENSE_FILE=/opt/dolphindb/config/dolphindb.lic restart: unless-stopped networks: - analysis-network redis: image: redis:7-alpine container_name: redis-cache ports: - "6379:6379" volumes: - redis_data:/data command: redis-server --appendonly yes --maxmemory 1gb --maxmemory-policy allkeys-lru restart: unless-stopped networks: - analysis-network nginx: image: nginx:alpine container_name: nginx-proxy ports: - "80:80" - "443:443" volumes: - ./nginx/nginx.conf:/etc/nginx/nginx.conf:ro - ./nginx/ssl:/etc/nginx/ssl:ro depends_on: - analysis restart: unless-stopped networks: - analysis-network volumes: redis_data: networks: analysis-network: driver: bridge ``` #### Nginx配置 (`nginx/nginx.conf`) ```nginx events { worker_connections 1024; } http { upstream analysis_backend { server analysis:8000; } map $http_upgrade $connection_upgrade { default upgrade; '' close; } server { listen 80; server_name localhost; # API代理 location /api/ { proxy_pass http://analysis_backend/; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header X-Forwarded-Proto $scheme; } # WebSocket代理 location /ws/ { proxy_pass http://analysis_backend/; proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection $connection_upgrade; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header X-Forwarded-Proto $scheme; proxy_read_timeout 300s; proxy_send_timeout 300s; } # 静态文件 location /docs { proxy_pass http://analysis_backend/docs; } } } ``` #### 启动完整环境 ```bash # 启动所有服务 docker-compose up -d # 查看服务状态 docker-compose ps # 查看日志 docker-compose logs -f analysis # 停止服务 docker-compose down # 停止并删除数据卷 docker-compose down -v ``` ### 生产环境部署 #### 多阶段构建优化 创建优化的 `Dockerfile.prod`: ```dockerfile # 第一阶段:构建依赖 FROM python:3.12-slim as builder # 安装系统依赖 RUN apt-get update && apt-get install -y \ build-essential \ curl \ && rm -rf /var/lib/apt/lists/* # 安装uv RUN pip install uv # 设置工作目录 WORKDIR /app # 复制依赖文件 COPY pyproject.toml uv.lock* ./ # 创建虚拟环境并安装依赖 RUN uv venv /opt/venv ENV PATH="/opt/venv/bin:$PATH" RUN uv pip install --no-deps -r uv.lock # 第二阶段:运行时镜像 FROM python:3.12-slim # 安装运行时系统依赖 RUN apt-get update && apt-get install -y \ libta-lib0 \ curl \ && rm -rf /var/lib/apt/lists/* # 创建非root用户 RUN useradd --create-home --shell /bin/bash analysis # 复制虚拟环境 COPY --from=builder /opt/venv /opt/venv ENV PATH="/opt/venv/bin:$PATH" # 设置工作目录 WORKDIR /app # 复制应用代码 COPY --chown=analysis:analysis . . # 切换到非root用户 USER analysis # 健康检查 HEALTHCHECK --interval=30s --timeout=10s --start-period=40s --retries=3 \ CMD curl -f http://localhost:8000/health || exit 1 # 暴露端口 EXPOSE 8000 # 启动命令 CMD ["gunicorn", "main:app", "-w", "4", "-k", "uvicorn.workers.UvicornWorker", "--bind", "0.0.0.0:8000"] ``` #### Kubernetes 部署 创建 `k8s/deployment.yaml`: ```yaml apiVersion: apps/v1 kind: Deployment metadata: name: analysis-deployment labels: app: analysis spec: replicas: 3 selector: matchLabels: app: analysis template: metadata: labels: app: analysis spec: containers: - name: analysis image: analysis:latest ports: - containerPort: 8000 env: - name: PROD value: "true" - name: DOLPHIN_HOST valueFrom: secretKeyRef: name: analysis-secrets key: dolphin-host resources: requests: memory: "2Gi" cpu: "1" limits: memory: "4Gi" cpu: "2" livenessProbe: httpGet: path: /health port: 8000 initialDelaySeconds: 30 periodSeconds: 30 readinessProbe: httpGet: path: /health port: 8000 initialDelaySeconds: 5 periodSeconds: 10 --- apiVersion: v1 kind: Service metadata: name: analysis-service spec: selector: app: analysis ports: - protocol: TCP port: 80 targetPort: 8000 type: LoadBalancer ``` ### 监控和日志 #### Prometheus监控配置 创建 `monitoring/prometheus.yml`: ```yaml global: scrape_interval: 15s scrape_configs: - job_name: 'analysis' static_configs: - targets: ['analysis:8000'] metrics_path: /metrics scrape_interval: 30s ``` #### 应用监控指标 在应用中添加监控: ```python # main.py 添加监控端点 from prometheus_client import Counter, Histogram, generate_latest, CONTENT_TYPE_LATEST from fastapi import Response # 定义监控指标 REQUEST_COUNT = Counter('analysis_requests_total', 'Total analysis requests', ['status']) REQUEST_DURATION = Histogram('analysis_request_duration_seconds', 'Request duration') @app.get("/metrics") async def metrics(): return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST) # WebSocket中添加监控 @REQUEST_DURATION.time() async def single_analysis(ws: WebSocket): try: # 原有逻辑 REQUEST_COUNT.labels(status='success').inc() except Exception: REQUEST_COUNT.labels(status='error').inc() raise ``` #### 日志配置 创建 `logging.conf`: ```ini [loggers] keys=root,analysis [handlers] keys=consoleHandler,fileHandler [formatters] keys=simpleFormatter,detailFormatter [logger_root] level=INFO handlers=consoleHandler [logger_analysis] level=DEBUG handlers=consoleHandler,fileHandler qualname=analysis propagate=0 [handler_consoleHandler] class=StreamHandler level=INFO formatter=simpleFormatter args=(sys.stdout,) [handler_fileHandler] class=handlers.RotatingFileHandler level=DEBUG formatter=detailFormatter args=('logs/analysis.log', 'a', 10485760, 5) [formatter_simpleFormatter] format=%(asctime)s - %(name)s - %(levelname)s - %(message)s [formatter_detailFormatter] format=%(asctime)s - %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(funcName)s - %(message)s ``` ## 📚 项目结构详解 ``` analysis/ ├── main.py # FastAPI应用入口,定义路由和中间件 ├── config.py # 配置管理,环境变量加载 ├── pyproject.toml # Python项目配置,依赖声明 ├── uv.lock # 锁定的依赖版本 ├── Dockerfile # 容器化配置 ├── docker-compose.yml # 多容器编排 ├── .env.example # 环境变量示例 ├── .gitignore # Git忽略规则 ├── README.md # 项目文档 │ ├── core/ # 核心业务逻辑 │ ├── __init__.py # 包初始化,导出公共接口 │ │ │ ├── routers/ # API路由定义 │ │ ├── __init__.py │ │ ├── factor/ # 因子分析相关路由 │ │ │ ├── __init__.py │ │ │ ├── views.py # 路由定义和注册 │ │ │ └── single_analysis.py # 单因子分析WebSocket处理 │ │ └── media/ # 文件服务路由 │ │ ├── __init__.py │ │ └── views.py # 文件下载处理 │ │ │ ├── utils/ # 工具函数模块 │ │ ├── __init__.py # 工具函数统一导出 │ │ ├── derive.py # 因子计算引擎核心 │ │ ├── fetch.py # 数据获取工具 │ │ ├── send.py # WebSocket通信工具 │ │ ├── logger.py # 日志配置 │ │ └── ts_api.py # Tushare API封装 │ │ │ └── database/ # 数据库连接 │ ├── __init__.py │ └── session.py # DolphinDB会话管理 │ ├── tests/ # 测试代码 │ ├── __init__.py │ ├── test_factor_engine.py # 因子计算引擎测试 │ ├── test_api.py # API接口测试 │ └── test_integration.py # 集成测试 │ ├── notebooks/ # Jupyter笔记本 │ ├── test_alphalens.ipynb # Alphalens风格分析示例 │ ├── test_build.ipynb # 构建测试 │ └── examples/ # 更多示例 │ ├── momentum_factors.ipynb # 动量因子研究 │ ├── technical_indicators.ipynb # 技术指标因子 │ └── factor_combination.ipynb # 因子组合优化 │ ├── scripts/ # 脚本文件 │ ├── setup_db.py # 数据库初始化脚本 │ ├── data_migration.py # 数据迁移脚本 │ └── benchmark_test.py # 性能基准测试 │ ├── docs/ # 文档目录 │ ├── api.md # API详细文档 │ ├── factor_syntax.md # 因子语法手册 │ ├── deployment.md # 部署指南 │ └── contributing.md # 贡献指南 │ ├── config/ # 配置文件 │ ├── logging.conf # 日志配置 │ ├── nginx.conf # Nginx配置 │ └── supervisor.conf # 进程管理配置 │ ├── media/ # 存储目录 (运行时创建) │ └── .gitkeep # 保持目录存在 │ └── logs/ # 日志目录 (运行时创建) └── .gitkeep # 保持目录存在 ``` ### 核心模块详解 #### 1. 因子计算引擎 (`core/utils/derive.py`) **设计理念**: 基于抽象语法树(AST)的表达式解析和执行引擎 **关键特性**: - 支持200+种操作类型 - 嵌套表达式递归解析 - 按股票分组并行计算 - 内置错误处理和类型检查 **核心函数**: ```python def apply_operation(df: pd.DataFrame, op_def: dict) -> pd.Series: """在单只股票数据上执行操作定义""" def compute_derived(df: pd.DataFrame, derived_defs: list[dict]) -> pd.DataFrame: """批量计算衍生因子""" ``` **性能优化**: - 向量化操作减少循环 - 惰性求值避免不必要计算 - 结果缓存提高重复计算效率 #### 2. 数据获取层 (`core/utils/fetch.py`) **设计模式**: 适配器模式,统一多数据源接口 **功能特性**: - 异步数据获取 - 自动重试和容错 - 数据格式标准化 - 缓存机制 **数据流程**: ``` 外部API → 原始数据 → 格式转换 → 数据验证 → 返回DataFrame ``` #### 3. WebSocket通信 (`core/routers/factor/single_analysis.py`) **通信协议**: 基于JSON的状态消息 + 二进制结果传输 **状态机**: ``` 连接建立 → 参数接收 → 数据获取 → 因子计算 → 分析执行 → 结果发送 → 连接关闭 ``` **错误处理**: 全链路异常捕获,友好错误提示 #### 4. DolphinDB集成 (`core/database/session.py`) **会话管理**: 连接池 + 会话复用 **数据上传**: Python对象自动序列化为DolphinDB表格 **脚本执行**: 动态生成DolphinDB脚本,执行专业金融分析 ### 数据流架构 ``` ┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐ │ 用户请求 │───▶│ 参数验证 │───▶│ 获取成分股 │ └─────────────────┘ └──────────────────┘ └─────────────────┘ │ ┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐ │ 结果返回 │◀───│ DolphinDB分析 │◀───│ 获取基础数据 │ └─────────────────┘ └──────────────────┘ └─────────────────┘ │ ┌──────────────────┐ ┌─────────────────┐ │ 数据预处理 │◀───│ 计算衍生因子 │ └──────────────────┘ └─────────────────┘ ``` ## 🔧 开发指南 ### 开发环境配置 #### 1. 安装开发依赖 ```bash # 安装开发工具 uv pip install -e ".[dev]" # 安装pre-commit hooks pre-commit install # 安装测试覆盖工具 uv pip install pytest-cov coverage ``` #### 2. IDE配置 **VS Code 配置** (`.vscode/settings.json`): ```json { "python.defaultInterpreterPath": "./.venv/bin/python", "python.linting.enabled": true, "python.linting.pylintEnabled": true, "python.formatting.provider": "black", "python.sortImports.provider": "isort", "files.autoSave": "onFocusChange", "editor.formatOnSave": true } ``` **PyCharm 配置**: - 项目解释器: 选择虚拟环境中的Python - 代码风格: 启用Black和isort - 调试配置: FastAPI应用启动配置 ### 代码规范 #### 1. Python代码风格 使用Black + isort + pylint组合: **pyproject.toml配置**: ```toml [tool.black] line-length = 88 target-version = ['py312'] include = '\.pyi?$' extend-exclude = ''' /( # 第三方库 | \.git | \.mypy_cache | \.pytest_cache | build | dist )/ ''' [tool.isort] profile = "black" multi_line_output = 3 line_length = 88 skip_gitignore = true [tool.pylint] max-line-length = 88 disable = [ "missing-docstring", "too-few-public-methods", "import-outside-toplevel" ] ``` #### 2. 提交信息规范 使用 Conventional Commits: ``` feat: 添加新的技术指标支持 fix: 修复IC计算中的数据对齐问题 docs: 更新API文档 style: 代码格式调整 refactor: 重构因子计算引擎 test: 添加WebSocket通信测试 chore: 更新依赖版本 ``` ### 测试框架 #### 1. 单元测试 ```python # tests/test_factor_engine.py import pytest import pandas as pd import numpy as np from core.utils.derive import apply_operation, compute_derived class TestFactorEngine: @pytest.fixture def sample_data(self): """生成测试数据""" dates = pd.date_range('2020-01-01', periods=100, freq='D') codes = ['000001.SZ', '000002.SZ'] data = [] for code in codes: for date in dates: data.append({ 'time': date, 'code': code, 'close_hfq': 100 + np.random.randn() * 5, 'vol': 1000000 + np.random.randint(0, 500000) }) return pd.DataFrame(data) def test_rolling_mean_operation(self, sample_data): """测试滚动均值操作""" # 单只股票数据 single_stock = sample_data[sample_data['code'] == '000001.SZ'].copy() op_def = { "op": "rolling_mean", "col": "close_hfq", "window": 5 } result = apply_operation(single_stock, op_def) # 验证结果 assert len(result) == len(single_stock) assert pd.isna(result.iloc[:4]).all() # 前4个值应为NaN assert not pd.isna(result.iloc[4]) # 第5个值开始有效 def test_talib_integration(self, sample_data): """测试TA-Lib集成""" single_stock = sample_data[sample_data['code'] == '000001.SZ'].copy() op_def = { "op": "talib", "func": "RSI", "prices": ["close_hfq"], "params": {"timeperiod": 14} } result = apply_operation(single_stock, op_def) # 验证RSI结果范围 valid_rsi = result.dropna() assert (valid_rsi >= 0).all() assert (valid_rsi <= 100).all() def test_compute_derived_batch(self, sample_data): """测试批量因子计算""" derived_defs = [ { "name": "ma_5", "definition": { "op": "rolling_mean", "col": "close_hfq", "window": 5 } }, { "name": "momentum_10", "definition": { "op": "pct_change", "col": "close_hfq", "periods": 10 } } ] result = compute_derived(sample_data, derived_defs) # 验证结果 assert 'ma_5' in result.columns assert 'momentum_10' in result.columns assert len(result) == len(sample_data) def test_complex_expression(self, sample_data): """测试复合表达式""" single_stock = sample_data[sample_data['code'] == '000001.SZ'].copy() # 布林带位置 = (价格 - 均值) / (2 * 标准差) op_def = { "op": "div", "left": { "op": "sub", "left": "close_hfq", "right": { "op": "rolling_mean", "col": "close_hfq", "window": 20 } }, "right": { "op": "mul", "left": { "op": "rolling_std", "col": "close_hfq", "window": 20 }, "right": 2 } } result = apply_operation(single_stock, op_def) # 验证布林带位置合理范围 valid_result = result.dropna() assert (valid_result >= -3).all() # 通常在±3倍标准差内 assert (valid_result <= 3).all() ``` #### 2. 集成测试 ```python # tests/test_integration.py import pytest import asyncio from unittest.mock import AsyncMock, patch from core.routers.factor.single_analysis import single_analysis @pytest.mark.asyncio class TestIntegration: async def test_websocket_analysis_flow(self): """测试完整的WebSocket分析流程""" # 模拟WebSocket mock_ws = AsyncMock() # 模拟接收参数 mock_ws.receive_json.return_value = { "index_code": "399300.SZ", "period": "DAILY", "required_fields": ["close_hfq"], "derived_columns": [{ "name": "test_factor", "definition": { "op": "rolling_mean", "col": "close_hfq", "window": 5 } }], "start_date": "2023.01.01", "end_date": "2023.12.31", "save_file": False } # 模拟外部依赖 with patch('core.utils.ts_api.pro') as mock_pro, \ patch('core.utils.fetch.fetch_stock_daily_data') as mock_fetch, \ patch('core.database.session.create_session') as mock_session: # 配置模拟数据 mock_pro.index_weight.return_value = pd.DataFrame({ 'trade_date': ['20231201', '20231201'], 'con_code': ['000001.SZ', '000002.SZ'] }) mock_fetch.return_value = pd.DataFrame({ 'time': pd.date_range('2023-01-01', periods=200), 'code': ['000001.SZ'] * 200, 'close_hfq': np.random.randn(200) + 100, 'circ_mv': np.random.randn(200) + 1000000, 'pct_chg_hfq': np.random.randn(200) * 0.05 }) # 执行分析 await single_analysis(mock_ws) # 验证调用 mock_ws.accept.assert_called_once() mock_ws.send_bytes.assert_called() def test_api_error_handling(self): """测试API错误处理""" # 测试各种错误情况 pass ``` #### 3. 性能测试 ```python # tests/test_performance.py import time import pytest from core.utils.derive import compute_derived class TestPerformance: @pytest.mark.performance def test_large_dataset_computation(self): """测试大数据集计算性能""" # 生成大规模测试数据 (1000只股票 * 252天) n_stocks = 1000 n_days = 252 data = generate_large_test_data(n_stocks, n_days) derived_defs = [{ "name": "complex_factor", "definition": { "op": "div", "left": { "op": "rolling_mean", "col": "close_hfq", "window": 20 }, "right": { "op": "rolling_std", "col": "close_hfq", "window": 20 } } }] # 测试计算时间 start_time = time.time() result = compute_derived(data, derived_defs) end_time = time.time() computation_time = end_time - start_time # 性能断言 (应在合理时间内完成) assert computation_time < 30 # 30秒内完成 assert len(result) == len(data) assert 'complex_factor' in result.columns print(f"大数据集计算耗时: {computation_time:.2f}秒") print(f"处理数据量: {n_stocks}只股票 × {n_days}天 = {len(data)}行") print(f"计算速度: {len(data)/computation_time:.0f}行/秒") ``` ### 调试技巧 #### 1. 本地调试 ```python # 设置断点调试WebSocket import pdb async def single_analysis(ws: WebSocket): await ws.accept() # 设置断点 pdb.set_trace() try: param = await get_param(ws) # ... 其他逻辑 ``` #### 2. 日志调试 ```python # 详细日志配置 import logging # 设置调试级别 logger = logging.getLogger('analysis') logger.setLevel(logging.DEBUG) # 在关键位置添加日志 logger.debug(f"接收到参数: {param}") logger.info(f"开始计算因子: {factor_name}") logger.warning(f"数据量较大: {len(data_df)} 行") ``` ### 贡献指南 #### 1. 提交流程 1. **Fork项目**: 从主仓库fork到个人仓库 2. **创建分支**: `git checkout -b feature/new-indicator` 3. **开发功能**: 编写代码和测试 4. **运行测试**: `pytest tests/` 确保所有测试通过 5. **代码检查**: `black . && isort . && pylint core/` 6. **提交代码**: `git commit -m "feat: 添加MACD技术指标"` 7. **推送分支**: `git push origin feature/new-indicator` 8. **创建PR**: 在GitHub创建Pull Request #### 2. 添加新因子操作 ```python # 在 core/utils/derive.py 的 apply_operation 函数中添加 elif op == 'your_new_operation': col = resolve_col(op_def['col']) param = op_def.get('param', default_value) # 实现你的操作逻辑 result = your_calculation(col, param) return result ``` #### 3. 扩展数据源 ```python # 在 core/utils/fetch.py 中添加新的数据获取函数 async def fetch_alternative_data(start_date, end_date, codes, factors): """获取替代数据源""" async with aiohttp.ClientSession() as session: # 实现数据获取逻辑 pass return processed_dataframe ``` ### 性能优化 #### 1. 代码层面优化 - 使用向量化操作替代循环 - 合理使用缓存减少重复计算 - 异步IO提高并发性能 - 内存管理避免内存泄漏 #### 2. 系统层面优化 - 调整DolphinDB内存配置 - 使用Redis缓存热点数据 - 配置连接池减少连接开销 - 负载均衡分散请求压力 ## ❓ 常见问题解答 ### 安装和配置问题 #### Q1: 安装TA-Lib时报错 "Microsoft Visual C++ 14.0 is required" **A**: Windows用户需要安装Visual Studio Build Tools或使用预编译的wheel包: ```bash # 方法1: 使用conda安装 (推荐) conda install -c conda-forge ta-lib # 方法2: 使用预编译wheel pip install --find-links https://www.lfd.uci.edu/~gohlke/pythonlibs/ TA-Lib # 方法3: 安装Visual Studio Build Tools # 下载并安装: https://visualstudio.microsoft.com/visual-cpp-build-tools/ ``` #### Q2: DolphinDB连接超时怎么办? **A**: 检查以下几个方面: ```bash # 1. 确认DolphinDB服务是否启动 netstat -tlnp | grep 8848 # 2. 检查防火墙设置 sudo ufw allow 8848 # 3. 验证连接参数 telnet localhost 8848 # 4. 检查DolphinDB配置文件 cat dolphindb.cfg | grep -E "(localSite|maxConnections)" ``` #### Q3: Tushare API调用频率限制 **A**: 根据账户权限合理设置调用频率: ```python # 基础权限用户 - 每分钟120次调用 - 建议加入延时: time.sleep(0.5) # 高级权限用户 - 每分钟500次调用 - 可以适当提高并发数 # 解决方案 import time from functools import wraps def rate_limit(calls_per_minute=100): def decorator(func): last_called = [0.0] @wraps(func) def wrapper(*args, **kwargs): elapsed = time.time() - last_called[0] left_to_wait = 60.0 / calls_per_minute - elapsed if left_to_wait > 0: time.sleep(left_to_wait) ret = func(*args, **kwargs) last_called[0] = time.time() return ret return wrapper return decorator ``` ### 使用问题 #### Q4: 因子计算结果全为NaN **A**: 检查以下几个方面: 1. **数据窗口不足**: ```python # 确保数据长度 > 计算窗口 if len(data) < window_size: print(f"数据不足: {len(data)} < {window_size}") ``` 2. **数据类型错误**: ```python # 确保数值列为数值类型 data['close_hfq'] = pd.to_numeric(data['close_hfq'], errors='coerce') ``` 3. **时间排序问题**: ```python # 确保按时间正确排序 data = data.sort_values(['code', 'time']) ``` #### Q5: WebSocket连接中断 **A**: 实现重连机制: ```python async def websocket_with_retry(url, max_retries=3): for attempt in range(max_retries): try: async with websockets.connect(url) as ws: # 正常处理逻辑 return await handle_websocket(ws) except (websockets.ConnectionClosed, asyncio.TimeoutError) as e: print(f"连接失败 (尝试 {attempt + 1}/{max_retries}): {e}") if attempt < max_retries - 1: await asyncio.sleep(5) # 等待5秒后重试 else: raise ``` #### Q6: 内存使用过高 **A**: 优化内存使用: ```python # 1. 分批处理大数据集 def process_in_chunks(data, chunk_size=10000): for i in range(0, len(data), chunk_size): chunk = data.iloc[i:i+chunk_size] yield process_chunk(chunk) # 2. 及时释放内存 import gc del large_dataframe gc.collect() # 3. 使用更高效的数据类型 data = data.astype({ 'code': 'category', # 字符串转为类别类型 'volume': 'int32', # 降低数值精度 }) ``` ### 性能问题 #### Q7: 分析速度太慢 **A**: 性能优化策略: 1. **减少数据量**: ```python # 限制时间范围 params = { "start_date": "2022.01.01", # 缩短时间窗口 "end_date": "2023.12.31" } # 限制股票池 params = { "index_code": "399300.SZ", # 使用较小的股票池 } ``` 2. **优化因子表达式**: ```python # 避免过深的嵌套 bad_factor = { "op": "div", "left": {"op": "rolling_mean", "col": "close", "window": 20}, "right": {"op": "rolling_std", "col": "close", "window": 20} } # 拆分为多个简单因子 good_factors = [ {"name": "ma20", "definition": {"op": "rolling_mean", "col": "close", "window": 20}}, {"name": "std20", "definition": {"op": "rolling_std", "col": "close", "window": 20}}, {"name": "ratio", "definition": {"op": "div", "left": "ma20", "right": "std20"}} ] ``` 3. **使用缓存**: ```python # 启用数据缓存 params = { "cache_enabled": True, "cache_ttl": 3600 # 1小时缓存 } ``` ### 数据问题 #### Q8: 股票代码格式不正确 **A**: 统一股票代码格式: ```python def normalize_stock_code(code): """标准化股票代码格式""" code = code.upper().strip() if '.' not in code: if code.startswith('00') or code.startswith('30'): code += '.SZ' elif code.startswith('60') or code.startswith('68'): code += '.SH' return code # 批量处理 codes = [normalize_stock_code(c) for c in raw_codes] ``` #### Q9: 缺失数据处理 **A**: 数据清洗策略: ```python # 1. 检查缺失比例 missing_ratio = data.isnull().sum() / len(data) print(missing_ratio) # 2. 删除缺失过多的列 data = data.drop(columns=missing_ratio[missing_ratio > 0.3].index) # 3. 前向填充 data = data.groupby('code').fillna(method='ffill') # 4. 删除仍有缺失的行 data = data.dropna() ``` ### 错误排查 #### Q10: 如何查看详细错误信息? **A**: 启用调试模式: ```python # 1. 设置日志级别 import logging logging.basicConfig(level=logging.DEBUG) # 2. 在.env文件中启用调试 DEBUG=true LOG_LEVEL=DEBUG # 3. 查看WebSocket错误详情 ws.onclose = function(event) { console.error('WebSocket关闭:', event.code, event.reason); }; ws.onerror = function(error) { console.error('WebSocket错误:', error); }; ``` #### Q11: DolphinDB执行出错 **A**: 检查DolphinDB脚本: ```python # 在执行前打印生成的脚本 print("执行的DolphinDB脚本:") print(dolphindb_script) # 检查DolphinDB日志 tail -f /path/to/dolphindb/logs/dolphindb.log # 验证数据上传 session.run("schema(dataTB)") # 查看表结构 session.run("size(dataTB)") # 查看行数 ``` ## 🚨 安全注意事项 ### 1. 环境变量安全 ```bash # 不要将敏感信息提交到Git echo ".env" >> .gitignore echo "*.log" >> .gitignore # 使用强密码 DOLPHIN_PASSWORD=ComplexPassword123! TUSHARE_TOKEN=your_real_token_here ``` ### 2. API访问控制 ```python # 添加访问限制 @app.middleware("http") async def rate_limiting_middleware(request: Request, call_next): # 实现频率限制逻辑 pass # 添加身份验证 @app.middleware("http") async def auth_middleware(request: Request, call_next): # 实现认证逻辑 pass ``` ### 3. 数据安全 - 定期备份重要数据 - 加密敏感配置文件 - 限制文件访问权限 - 清理临时文件 ## 📊 监控与运维 ### 系统监控 #### 关键指标 ```python # 应用层指标 - 请求响应时间 - WebSocket连接数 - 因子计算成功率 - 错误发生频率 # 系统层指标 - CPU使用率 - 内存使用率 - 磁盘IO - 网络带宽 # DolphinDB指标 - 连接数 - 查询执行时间 - 内存使用量 - 数据表大小 ``` #### 告警配置 ```yaml # Prometheus告警规则 groups: - name: analysis.rules rules: - alert: HighErrorRate expr: rate(analysis_requests_total{status="error"}[5m]) > 0.1 for: 5m labels: severity: warning annotations: summary: "因子分析错误率过高" - alert: HighMemoryUsage expr: process_resident_memory_bytes / 1024 / 1024 > 4096 for: 10m labels: severity: critical annotations: summary: "内存使用超过4GB" ``` ### 日志管理 #### 日志轮转配置 ```python # logging.conf [handler_fileHandler] class=handlers.RotatingFileHandler args=('logs/analysis.log', 'a', 104857600, 10) # 100MB轮转,保留10个文件 ``` #### 结构化日志 ```python import structlog logger = structlog.get_logger() logger.info( "factor_analysis_completed", factor_name="momentum_20d", duration_seconds=15.6, data_points=50000, ic_value=0.045 ) ``` ## 🔄 升级指南 ### 版本升级流程 #### 1. 备份数据 ```bash # 备份配置文件 cp .env .env.backup # 备份媒体文件 tar -czf media_backup.tar.gz media/ # 备份DolphinDB数据 cp -r /path/to/dolphindb/data /backup/location/ ``` #### 2. 更新代码 ```bash # 停止服务 docker-compose down # 拉取最新代码 git pull origin main # 更新依赖 uv pip install -e . # 重新构建镜像 docker-compose build # 启动服务 docker-compose up -d ``` #### 3. 验证升级 ```bash # 检查服务状态 curl http://localhost:8000/health # 运行测试 pytest tests/ # 验证核心功能 python scripts/integration_test.py ``` ### 兼容性说明 #### API兼容性 - 主版本升级可能包含破坏性变更 - 次版本升级保持向后兼容 - 补丁版本仅包含错误修复 #### 数据格式兼容性 - 因子表达式格式保持稳定 - WebSocket消息格式向后兼容 - 结果文件格式可能在主版本间变化 ## 🎯 路线图 ### 近期计划 (3个月) - [ ] 添加更多技术指标支持 (KDJ, BOLL, CCI等) - [ ] 优化大数据集处理性能 - [ ] 增加因子组合分析功能 - [ ] 完善API文档和示例 ### 中期计划 (6个月) - [ ] 支持分钟级高频数据分析 - [ ] 添加机器学习因子挖掘 - [ ] 实现实时因子监控告警 - [ ] 开发Web前端界面 ### 长期计划 (1年) - [ ] 支持多资产类别(期货、债券等) - [ ] 集成风险管理模块 - [ ] 添加回测引擎和策略评估 - [ ] 构建因子数据库和知识图谱 ## 🤝 贡献指南 ### 参与贡献 我们欢迎各种形式的贡献: - 🐛 报告Bug和问题 - 💡 提出新功能建议 - 📝 改进文档 - 🔧 提交代码修复 - 🧪 编写测试用例 - 🌍 翻译和本地化 ### 贡献流程 1. **Fork项目** - 在GitHub上fork本项目 2. **创建分支** - 基于main分支创建功能分支 3. **开发功能** - 实现新功能或修复bug 4. **编写测试** - 为新功能添加测试用例 5. **更新文档** - 更新相关文档和示例 6. **提交代码** - 使用规范的提交信息 7. **创建PR** - 创建Pull Request并等待review 8. **代码review** - 根据反馈修改代码 9. **合并代码** - 通过review后合并到主分支 ### 代码审查标准 - **功能完整性**: 新功能是否完整实现需求 - **代码质量**: 遵循项目编码规范 - **测试覆盖**: 包含充分的单元测试和集成测试 - **文档完整**: 更新相关API文档和使用说明 - **性能考虑**: 不引入性能退化 - **兼容性**: 保持向后兼容性 ### 认可贡献者 感谢所有为本项目做出贡献的开发者: - **核心开发者**: 主要功能开发和架构设计 - **测试工程师**: 测试用例编写和质量保证 - **文档贡献者**: 文档编写和翻译 - **社区维护者**: 问题解答和社区管理 - **用户反馈者**: Bug报告和功能建议 ## 📄 许可证 本项目采用 **MIT许可证** 开源,详见 [LICENSE](LICENSE) 文件。 ### 许可证要点 - ✅ **商业使用**: 允许用于商业项目 - ✅ **修改代码**: 允许修改和定制 - ✅ **分发代码**: 允许分发原始或修改版本 - ✅ **私人使用**: 允许个人使用和学习 - ⚠️ **责任限制**: 不承担使用风险和责任 - 📝 **保留版权**: 需保留原始版权声明 ### 第三方依赖 本项目使用的主要开源依赖: | 依赖库 | 许可证 | 用途 | |--------|--------|------| | FastAPI | MIT | Web框架 | | pandas | BSD-3-Clause | 数据处理 | | TA-Lib | BSD-2-Clause | 技术分析 | | DolphinDB Python API | Apache-2.0 | 数据库接口 | | aiohttp | Apache-2.0 | HTTP客户端 | ## 📞 支持与联系 ### 获取帮助 1. **查看文档** - 首先查看本README和项目文档 2. **搜索Issue** - 在GitHub Issues中搜索相关问题 3. **创建Issue** - 如果没有找到答案,创建新的Issue 4. **讨论区** - 在GitHub Discussions参与社区讨论 ### 问题反馈 如遇到问题,请提供以下信息: ``` **环境信息**: - 操作系统: (如 Ubuntu 20.04) - Python版本: (如 Python 3.12.1) - 项目版本: (如 v0.1.0) - DolphinDB版本: (如 3.0.4) **问题描述**: 简要描述遇到的问题 **复现步骤**: 1. 执行命令... 2. 设置参数... 3. 观察到错误... **期望行为**: 描述期望的正确行为 **实际行为**: 描述实际发生的错误行为 **错误日志**: ``` 粘贴相关的错误日志 ``` **其他信息**: 任何可能有助于解决问题的额外信息 ``` ### 社区交流 - 📧 **邮件联系**: analysis-support@example.com - 💬 **微信群**: 添加微信号 xxx 加入讨论群 - 📱 **QQ群**: 123456789 (量化因子分析交流群) - 🐦 **Twitter**: @analysis_project - 📺 **B站专栏**: 定期发布使用教程和技术分享 ### 商业支持 如需商业级支持和定制开发,请联系: - 📧 **商务邮箱**: business@example.com - 📞 **联系电话**: +86-xxx-xxxx-xxxx - 🏢 **公司地址**: 北京市朝阳区xxx路xxx号 **商业服务内容**: - 🔧 定制化功能开发 - 🚀 性能优化和调优 - 📚 专业培训和咨询 - 🛡️ 7x24技术支持 - ☁️ 云服务部署和运维 ---
**⭐ 如果这个项目对您有帮助,请给我们一个Star!** **🔄 持续关注项目更新,获取最新功能特性** **🤝 欢迎加入社区,与志同道合的量化研究者交流** --- *构建专业的量化因子分析平台,赋能量化投资研究* 🚀