# sync-mongo **Repository Path**: freeqiao/sync-mongo ## Basic Information - **Project Name**: sync-mongo - **Description**: mongo 同步导入到ElasticSearch或者mongo,支持数据清洗操作 - **Primary Language**: Java - **License**: MIT - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 2 - **Forks**: 0 - **Created**: 2019-08-19 - **Last Updated**: 2022-10-30 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # mongo同步数据 #### 项目介绍 项目中日志等数据放在了mongo中,每个月都会插入到一个新集合中,写了个同步数据自动增量同步程序 1. 支持历史数据迁移,从集合第一个开始同步到最后一个(之前局域网内1亿多数据迁移包含清洗到2、3天) 2. 支持增量数据更新,最新数据入mongo中后会在约定时间内同步到es中区(默认30秒) 3. 支持数据清洗扩展,如指定数据过滤收集,数据格式内容重写,扩展代码实现手机号等信息补充 4. 多环境配置执行,支持java和纯js版本数据同步,具体看env使用哪个(java版本性能远高于js版本) 5. 支持ssh随带代理,打通网络(通过跳板网络抓取同步数据) 6. 支持一条记录读取按不同规则分发到多个es或者mongo中区(能节省宽带) 7. 数据源获取单线程同步,按objectid增量查询(主键利用),同步过程不会对数据库造成任何性能压力 8. 系统简单,纯java代码编写,不依赖三方框架,只添加了java驱动等基础jar完成同步, 9. 支持mongo数据定制查询拆机,能通过”sql“过滤有缩小数据范围,加快同步速度 10 支持多任务采集 #### 技术实现核心思路 1. mongo数据库objectID是根据 时间戳+机器+进程号+随机数,因为id是由各个插入业务节点单独计算,考虑到业务节点生成到插入时间,假如是接口业务开始生成时候到实际业务完成(其他系统超时等)总共时间,这个时间范围设为”延时时间“,然后根据延时时间确定程序最后采集到数据id最大值 如 延时30s 定时任务每秒执行一次采集,采集数据应该是 上一次记录到小于 (时间戳-30s)+0000000000 这个值 #### 系统配置 系统配置都在env下面 1. es.json es数据库的配置文件 2. mongo.json mongo配置文件,可以支持代理连接 3. loadjs.json 加载js配置文件,如js加密框架可以通过文件引入函数,此文件也引入自定义myfunction.js文件 4. myfunction.js 自定义js函数文件,清洗操作,接口转换可以在这里定义 5. sync.json 数据同步任务配置

{
    "source": "xh",  # mongo.json数据源id
    "collection": "20[0-9]{2}[_]webServiceLog[_][0-9]{2}",  # 数据源中集合名称支持正则匹配,多个按名称排序采集
    "use": true,  # 采集任务是否执行标记
    "where": [    #数据源查询数据时候过滤条件,比如查询状态200数据都可以这里过滤,如果数据里面含有太多非目的数据,可以考虑在查询过程中过滤,如果少量可以在js函数或java函数中处理
          {
            "name": "apiKey",
            "value": "12121121"
          }
        ],
    "plugs": [   #可以不用
          "org.zezs.sync.mongo.plugs.phone.PhoneLocationCache" #js处理程序中 定制化手机定位插件,建议看代码  通过 “setPhoneCache” 函数将java中读取映射关系map传入js函数环境变量中
        ],
    "chains": [   # 采集任务通道
      {
        "dest": "152local",  # 同步到目的库名/index
        "destType": "es",    # 同步到目的类型  es/mongo
        "db": "webservicelog_xhpro",   # 同步到目的库名/index
        "table": "webservicelog",       # 同步到目的集合名/type
        "jsscript": "return logData(obj)",  # 如果是js清洗话,可以用这个执行myfunction.js文件中自定义清洗函数
        "javaProesser":"org.zezs.sync.mongo.dataProess.MyLoggerProesser", # 如果不配置java清洗规则,执行js清洗,如果配置java执行java代码中处理规则,这个需要编写java代码
        "mappings": [  #提取字段,用来直接提取数据源中的字段,具体看org.zezs.sync.mongo.config.sync.metadata.Mappings类,支持字段重新命名(如果指定destField),如果写了value标示固定值
          {
            "sourceField": "visitDate" 
          },
          {
            "sourceField": "apiKey"
          },
          {
            "sourceField": "status"
          },
          {
            "sourceField": "is_charge"
          },
          {
            "sourceField": "consumeTime"
          }
        ],
        "rules": [   #老版本规则,完全配置化同步清洗数据   建议直接用jsscript(默认js处理器) +myfunction.js完成,此处空集合配置即可
          {
              "id": "规则编号,唯一即可",
              "where": [            #过滤数据映射条件
                {
                  "name": "apiKey",
                  "value": "121212121"
                }
              ],                    #数据转换规则
              "jsscript": "if(obj.status=='0'){req=JSON.parse(obj.requestParameter)  ; rsp=JSON.parse(obj.returnParam);  if(rsp.code=='0'&&!req.dc&&req.KEY!='QQ4eau58'&&rsp.data.result=='0') return {'n':SYNC_DES(req.name),'p':SYNC_DES(req.phone)}}"
          
          }
        ],
        "use": true  #采集任务是否启用
      }
    ],
    "limit": 1000   #从数据源采集数据,每次1000条数据
  },

# 最终建议 关于数据处理,统一在myfunction.js或者javaProesser中处理,如果有java编程能力建议用javaProesser,这个处理速度快 # 使用说明 使用pom.xml打包,自己定义好环境选择,然后用java -jar sync-mongo-0.0.1-SNAPSHOT.jar 执行同步程序