# sync-mongo **Repository Path**: freeqiao/sync-mongo ## Basic Information - **Project Name**: sync-mongo - **Description**: mongo 同步导入到ElasticSearch或者mongo,支持数据清洗操作 - **Primary Language**: Java - **License**: MIT - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 2 - **Forks**: 0 - **Created**: 2019-08-19 - **Last Updated**: 2022-10-30 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # mongo同步数据 #### 项目介绍 项目中日志等数据放在了mongo中,每个月都会插入到一个新集合中,写了个同步数据自动增量同步程序 1. 支持历史数据迁移,从集合第一个开始同步到最后一个(之前局域网内1亿多数据迁移包含清洗到2、3天) 2. 支持增量数据更新,最新数据入mongo中后会在约定时间内同步到es中区(默认30秒) 3. 支持数据清洗扩展,如指定数据过滤收集,数据格式内容重写,扩展代码实现手机号等信息补充 4. 多环境配置执行,支持java和纯js版本数据同步,具体看env使用哪个(java版本性能远高于js版本) 5. 支持ssh随带代理,打通网络(通过跳板网络抓取同步数据) 6. 支持一条记录读取按不同规则分发到多个es或者mongo中区(能节省宽带) 7. 数据源获取单线程同步,按objectid增量查询(主键利用),同步过程不会对数据库造成任何性能压力 8. 系统简单,纯java代码编写,不依赖三方框架,只添加了java驱动等基础jar完成同步, 9. 支持mongo数据定制查询拆机,能通过”sql“过滤有缩小数据范围,加快同步速度 10 支持多任务采集 #### 技术实现核心思路 1. mongo数据库objectID是根据 时间戳+机器+进程号+随机数,因为id是由各个插入业务节点单独计算,考虑到业务节点生成到插入时间,假如是接口业务开始生成时候到实际业务完成(其他系统超时等)总共时间,这个时间范围设为”延时时间“,然后根据延时时间确定程序最后采集到数据id最大值 如 延时30s 定时任务每秒执行一次采集,采集数据应该是 上一次记录到小于 (时间戳-30s)+0000000000 这个值 #### 系统配置 系统配置都在env下面 1. es.json es数据库的配置文件 2. mongo.json mongo配置文件,可以支持代理连接 3. loadjs.json 加载js配置文件,如js加密框架可以通过文件引入函数,此文件也引入自定义myfunction.js文件 4. myfunction.js 自定义js函数文件,清洗操作,接口转换可以在这里定义 5. sync.json 数据同步任务配置
{
"source": "xh", # mongo.json数据源id
"collection": "20[0-9]{2}[_]webServiceLog[_][0-9]{2}", # 数据源中集合名称支持正则匹配,多个按名称排序采集
"use": true, # 采集任务是否执行标记
"where": [ #数据源查询数据时候过滤条件,比如查询状态200数据都可以这里过滤,如果数据里面含有太多非目的数据,可以考虑在查询过程中过滤,如果少量可以在js函数或java函数中处理
{
"name": "apiKey",
"value": "12121121"
}
],
"plugs": [ #可以不用
"org.zezs.sync.mongo.plugs.phone.PhoneLocationCache" #js处理程序中 定制化手机定位插件,建议看代码 通过 “setPhoneCache” 函数将java中读取映射关系map传入js函数环境变量中
],
"chains": [ # 采集任务通道
{
"dest": "152local", # 同步到目的库名/index
"destType": "es", # 同步到目的类型 es/mongo
"db": "webservicelog_xhpro", # 同步到目的库名/index
"table": "webservicelog", # 同步到目的集合名/type
"jsscript": "return logData(obj)", # 如果是js清洗话,可以用这个执行myfunction.js文件中自定义清洗函数
"javaProesser":"org.zezs.sync.mongo.dataProess.MyLoggerProesser", # 如果不配置java清洗规则,执行js清洗,如果配置java执行java代码中处理规则,这个需要编写java代码
"mappings": [ #提取字段,用来直接提取数据源中的字段,具体看org.zezs.sync.mongo.config.sync.metadata.Mappings类,支持字段重新命名(如果指定destField),如果写了value标示固定值
{
"sourceField": "visitDate"
},
{
"sourceField": "apiKey"
},
{
"sourceField": "status"
},
{
"sourceField": "is_charge"
},
{
"sourceField": "consumeTime"
}
],
"rules": [ #老版本规则,完全配置化同步清洗数据 建议直接用jsscript(默认js处理器) +myfunction.js完成,此处空集合配置即可
{
"id": "规则编号,唯一即可",
"where": [ #过滤数据映射条件
{
"name": "apiKey",
"value": "121212121"
}
], #数据转换规则
"jsscript": "if(obj.status=='0'){req=JSON.parse(obj.requestParameter) ; rsp=JSON.parse(obj.returnParam); if(rsp.code=='0'&&!req.dc&&req.KEY!='QQ4eau58'&&rsp.data.result=='0') return {'n':SYNC_DES(req.name),'p':SYNC_DES(req.phone)}}"
}
],
"use": true #采集任务是否启用
}
],
"limit": 1000 #从数据源采集数据,每次1000条数据
},
# 最终建议
关于数据处理,统一在myfunction.js或者javaProesser中处理,如果有java编程能力建议用javaProesser,这个处理速度快
# 使用说明
使用pom.xml打包,自己定义好环境选择,然后用java -jar sync-mongo-0.0.1-SNAPSHOT.jar 执行同步程序