# bulk_data_framework **Repository Path**: taikun928/bulk_data_framework ## Basic Information - **Project Name**: bulk_data_framework - **Description**: 大批量数据本地处理框架 - **Primary Language**: Python - **License**: MulanPSL-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2026-01-08 - **Last Updated**: 2026-01-20 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # bulk_data_framework 本项目是在本地环境处理海量 CSV 的一站式框架:主程序顺序读取巨型 CSV, 将其切成固定行数的小块写入 `chunk_cache/`,再把每个切块交给 Celery 任务后台异步 执行,完成过滤、清洗以及按券分文件输出。 ## 环境依赖 1. Python 3.11+ 2. Redis(本地或远端均可),通过 `.env` 指定连接: ```ini REDIS_HOST=127.0.0.1 REDIS_PORT=6379 REDIS_DB=0 REDIS_PASSWORD= ``` 3. 安装依赖: ```bash python -m pip install -r requirements.txt ``` ## 运行流程 1. **启动 Celery Worker**(Windows 需 `--pool=solo`): ```bash celery -A celery_app worker -l info --pool=solo ``` 2. **执行调度脚本**,将源 CSV 切块并投递到 Celery: ```bash python split_and_dispatch.py --input 12151000-100.csv --chunk-size 50000 ``` - 切块文件保存到 `chunk_cache/` - 每个切块触发一次 `process_chunk` 任务 3. **查看输出** 任务完成后,输出目录 `output_csvs/` 下会以交易日期命名(同一交易日的不同券会汇总到同一个 CSV),例如 `20251215.csv`。 文件内容固定为: ``` securityID,MDEntryType,MDEntryPx,MDEntrySize,PartyName,updateTime ``` 仅保留带 `PartyName` 且 `VenueTypeGW` 符合配置的 `NoMDEntries` 明细,并把 `MDEntryDate + MDEntryTime` 组合成 `updateTime`。 > 可使用 `celery -A celery_app inspect active` 等命令查看任务进度。 ## 常用参数 - `--input/-i`:源 CSV 路径(默认 `12151000-100.csv`) - `--chunk-size/-c`:每个切块包含的行数 - `--chunk-dir/-d`:切块缓存目录(默认 `chunk_cache/`) - `--encoding/-e`:源文件编码(默认 `utf-8`) ## 自定义过滤/输出 `config.py` 汇集了所有可调项: - `EXTRACT_FIELDS`:输出文件列名,默认即 `securityID,MDEntryType,MDEntryPx,MDEntrySize,PartyName,updateTime` - `DATE_FIELD`:控制输出文件名中的日期来源(输出命名格式为 `.csv`) - `CHUNK_SIZE` / `CHUNK_DIR` / `OUTPUT_DIR`:切块大小与目录 - `VALUE_SUFFIXES`:按字段定义需要去掉的字符串后缀,例如去除 `securityID` 的 `=ct` - `ALLOWED_SECURITIES`:若仅输出某些券,将券代码写进集合即可,如 `{"160408IB","257256SH"}` - `should_keep(record)`:自定义整行过滤逻辑 - `clean_record(record)`:统一清洗步骤(去除空白、空串→`None` 等) - `ALLOWED_VENUE_TYPES`:限定 `NoMDEntries` 中 `VenueTypeGW` 的取值,默认只保留 `"QDMESP"`,留空集合即可取消限制 - `tasks._build_output_rows`:默认仅展开同时满足 `PartyName` 与 `ALLOWED_VENUE_TYPES` 条件的条目,可在此根据需要调整 - `tasks.py` 在写入前会加载同名 CSV 已有的记录,并用整行内容去重,避免重复数据再次 写入。如果输出极大,可以考虑根据业务自定义更紧凑的去重键(例如 `MDEntryID`) 修改配置后重新运行调度脚本即可;Celery Worker 会在处理每个切块时读取最新配置。 若需要进一步自定义展开逻辑,可在 `tasks.py` 中扩展。