# tools-docker

**Repository Path**: carollia/tools-docker

## Basic Information

- **Project Name**: tools-docker
- **Description**: Images and deployment files for big-data tools
- **Primary Language**: YAML
- **License**: Apache-2.0
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No

## Statistics

- **Stars**: 0
- **Forks**: 0
- **Created**: 2022-01-13
- **Last Updated**: 2022-01-14

## Categories & Tags

**Categories**: Uncategorized

**Tags**: None

## README

**Table of Contents** *generated with [DocToc](https://github.com/thlorenz/doctoc)*

- [Data Collection and Verification](#data-collection-and-verification)
  - [HDFS Data Collection](#hdfs-data-collection)
    - [Filebeat](#filebeat)
    - [Kafka-Eagle](#kafka-eagle)
  - [ES Data Collection](#es-data-collection)
    - [Logstash](#logstash)
    - [ES-Kibana](#es-kibana)
    - [Setting an Index Lifecycle Policy in Kibana](#setting-an-index-lifecycle-policy-in-kibana)
    - [Elastalert](#elastalert)
  - [MySQL Data Collection](#mysql-data-collection)

## Data Collection and Verification

### HDFS Data Collection

Filebeat -> Kafka -> Flink -> Hive

This pipeline collects data into the data warehouse for operational analytics.

Filebeat ships logs to Kafka, and Kafka-Eagle is used to run simple per-topic message-count queries to verify that the data is correct.

#### Filebeat

Filebeat is a lightweight log shipper written in Golang. It is highly reliable and guarantees at-least-once delivery of logs. It also handles common log-collection problems such as resuming from the last read position, file renames, and log truncation.

To make Filebeat usable out of the box, a custom Dockerfile is provided so that different behaviors can be selected purely through the environment variables in docker-compose.yml.

```yaml
version: "3"
services:
  filebeat:
    restart: always
    image: xxx.com/tools/filebeat:v7.9.3_docker
    container_name: filebeat
    deploy:
      mode: replicated
      replicas: 1
      restart_policy:
        condition: on-failure
    extra_hosts: # IP-to-hostname mappings
      - "master:IP"
      - "slaves01:IP"
      - "slaves02:IP"
    volumes:
      - /var/lib/docker/containers:/var/lib/docker/containers
      - /var/run/docker.sock:/var/run/docker.sock
    environment:
      CONTAINERS_IDS: '["*"]' # Edit here: "*" collects from all containers; list container IDs to collect from specific containers
      CONTAINERS_PATH: "\\/var\\/lib\\/docker\\/containers"
      OUTPUT_CONSOLE_ENABLE: "true" # 1. When "true", print to the console; must not be "true" together with the Kafka output
      OUTPUT_KAFKA_ENABLE: "false" # 2. When "true" (with console set to "false"), send to Kafka
      KAFKA_HOSTS: "['slaves02:9092']" # Kafka address
      KAFKA_VESION: "2.0.0"
      KAFKA_INVALID_TOPIC: "filebeat-invalid" # Kafka topic for invalid data: non-JSON, or lacking the message.type used by the rule below
      KAFKA_TOPIC: "filebeat-%{[message.type]}" # Route data to different topics by the log's type
      KAFKA_REQUIRED_ACKS: 1 # Kafka ack level
      KAFKA_COMPRESSION: "snappy"
      KAFKA_MAX_MESSAGE_BYTES: 1000000
      PROCESSORS_JSON_FILEDS: "['message']"
      PROCESSORS_JSON_PROCESS_ARRAY: "true"
      PROCESSORS_JSON_MAX_DEPTH: 3 # Maximum depth when parsing JSON strings
      PROCESSORS_JSON_TARGET: "message"
      PROCESSORS_JSON_OVERWRITE_KEYS: "true"
```

#### Kafka-Eagle

Kafka-Eagle is used to verify Kafka data, create topics, delete topics, run KQL queries against the data, and so on.

![image.png](https://upload-images.jianshu.io/upload_images/16985353-49246fd9e18fb38b.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

```yaml
version: '3'
services:
  kafka-eagle:
    restart: always
    container_name: kafka-eagle
    image: xxx.com/tools/kafka-eagle:v2.0.2 # Image name
    ports:
      - "8048:8048"
    deploy:
      mode: replicated
      replicas: 1
      restart_policy:
        condition: on-failure
    extra_hosts:
      - "master:IP"
      - "slaves01:IP"
      - "slaves02:IP"
    environment:
      ZK_CLUSTER_ALIAS: "test" # Several clusters can be listed, comma-separated, e.g. test,local,dev
      ZK_LOCAL: "" # If "local" is listed above, set the ZooKeeper address of the local cluster here
      ZK_TEST: "master:2181" # If "test" is listed above, set the ZooKeeper address of the test cluster here
      ZK_DEV: "" # If "dev" is listed above, set the ZooKeeper address of the dev cluster here
      MYSQL_URL: "ip:port" # Database ip:port
      MYSQL_DATABASE: "ke" # Database name
      MYSQL_USER: "user" # Database user
      MYSQL_PWD: "password" # Database password
```

### ES Data Collection

Filebeat -> Kafka -> Logstash -> ES -> Elastalert (log alerting)

This pipeline collects server-side and client-side logs and raises alerts on them, so that service health can be monitored promptly and the user experience improved.

#### Logstash

Logstash is a free and open server-side data processing pipeline that ingests data from multiple sources and transforms it. Its integration with ES is very mature. The Docker setup below creates one index per day; combined with the expiry policy configured in es-kibana, old indices are deleted on a schedule.

```yaml
version: "3"
services:
  logstash:
    restart: always
    image: xxx.com/tools/logstash:v7.9.3_kafka
    container_name: logstash-kafka
    deploy:
      mode: replicated
      replicas: 1
      restart_policy:
        condition: on-failure
    extra_hosts:
      - "master:IP"
      - "slaves01:IP"
      - "slaves02:IP"
    healthcheck:
      disable: true
    environment:
      KAFKA_BROKER: "slaves02:9092" # Kafka address to consume from
      KAFKA_GROUP_ID: "logstash-kafka" # Kafka consumer group ID
      LOGSTASH_CLIENT: "logstash-kafka" # Kafka consumer client ID
      KAFKA_AUTO_OFFSET_RESERT: "earliest" # Kafka offset reset policy
      KAFKA_ENABLE_AUTO_COMMIT: "true"
      KAFKA_TOPICS: "['filebeat-nginx','filebeat-es']" # List of Kafka topics to consume
      ES_HOST: "['master:9200']" # ES output address
      ES_INDEX: "%{[message][app]}-logstash-%{+YYYY.MM.dd}" # Index names must be lowercase; one index per day
      ES_USERNAME: "elastic" # ES username
      ES_PASSWORD: "elastic" # ES password
      STDOUT: "true" # Whether to also print to the console
```

#### ES-Kibana

Elasticsearch is an open-source search engine built on Apache Lucene™, a mature and full-featured search library. Elasticsearch is simple to use and offers powerful full-text search.

Kibana is an open-source analytics and visualization platform that works with Elasticsearch. It makes it easier to work with the data in Elasticsearch, including advanced analysis and visualizing the data in charts.

![image.png](https://upload-images.jianshu.io/upload_images/16985353-3e7962d199a87e16.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

```yaml
version: "3.2"
volumes:
  elastic_data_7.5.2:
services:
  # Host prerequisite: sysctl -w vm.max_map_count=262144
  es:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.9.3
    container_name: es
    volumes:
      - elastic_data_7.5.2:/usr/share/elasticsearch/data
    ports:
      - "9200:9200"
      - "9300:9300"
    environment:
      http.cors.allow-headers: "Authorization"
      node.name: "es"
      cluster.name: "docker-cluster"
      network.host: "0.0.0.0"
      network.publish_host: _eth0_
      discovery.type: "single-node"
      bootstrap.memory_lock: "true"
      xpack.security.enabled: "true"
      ES_JAVA_OPTS: "-Xmx1g -Xms1g"
      ELASTIC_PASSWORD: "elastic"
      node.max_local_storage_nodes: 20
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 655350
        hard: 655350
      nproc: 655350
    logging:
      driver: "json-file"
      options:
        max-size: "1g"
        max-file: "1"
    restart: always
  kibana:
    image: docker.elastic.co/kibana/kibana:7.5.2
    container_name: kibana
    ports:
      - "5601:5601"
    environment:
      # Only uppercase environment variables take effect; lowercase ones are ignored
      SERVER_NAME: "kibana"
      SERVER_HOST: "0.0.0.0"
      ELASTICSEARCH_HOSTS: "http://es:9200"
      KIBANA_INDEX: ".kibana"
      ELASTICSEARCH_USERNAME: "elastic"
      ELASTICSEARCH_PASSWORD: "elastic"
    logging:
      driver: "json-file"
      options:
        max-size: "1g"
        max-file: "1"
    restart: always
    depends_on:
      - "es"
```

#### Setting an Index Lifecycle Policy in Kibana

1. Create an index template

   ![](https://upload-images.jianshu.io/upload_images/16985353-8df3326649b7cd70.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

2. Create a delete-on-expiry policy

   ![image.png](https://upload-images.jianshu.io/upload_images/16985353-15cba2b416d27917.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

   ![image.png](https://upload-images.jianshu.io/upload_images/16985353-1af67059456713ac.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

3. Apply the expiry policy to the index template

   ![image.png](https://upload-images.jianshu.io/upload_images/16985353-530f7844d2dfcc85.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

#### Elastalert

Elastalert is an Elasticsearch alerting tool developed by Yelp in Python. The Docker image below packages three kinds of alerts: error-log alerts, alerts on a custom log level, and alerts when no logs are produced.

```yaml
version: "3.2"
services:
  elastalert:
    image: epo.qianz.com/tools/elastalert:dingtalk_v4
    container_name: es-alert
    logging:
      driver: "json-file"
      options:
        max-size: "500m"
        max-file: "1"
    restart: always
    healthcheck:
      disable: true
    environment:
      ELASTALERT_CONFIG_RUN_EVERY_SECONDS: 60
      ELASTALERT_CONFIG_BUNFFER_TIME: 15
      ELASTALERT_CONFIG_ES_HOST: "xxxxx" # todo: ES IP address
      ELASTALERT_CONFIG_ES_PORT: 9200
      ELASTALERT_CONFIG_ES_USERNAME: "elastic" # todo: ES username
      ELASTALERT_CONFIG_ES_PASSWORD: "elastic" # todo: ES password
      ELASTALERT_CONFIG_WRITEBACK_INDEX: "elastalert_status"
      ELASTALERT_CONFIG_ALERT_TIME_LIMIT_DAYS: 2
      ELASTALERT_CONFIG_SMTP_HOST: "smtp.163.com"
      ELASTALERT_CONFIG_SMTP_PORT: 25
      ELASTALERT_CONFIG_SMTP_SSL: "false"
      SMTP_USER: "xxx@163.com" # todo: 163 mailbox user
      SMTP_PASSWORD: "xxx" # todo: 163 mailbox authorization code
      # Error-log alert: the log's level is error
      RULE_NAME: "ES log anomaly - DataQ"
      RULE_INDEX: "logstash-dataq*"
      RULE_FILTER_LEVEL: "error" # The log's level is error
      # Warning-log alert: the log's level is warn
      RULE_WARN_NAME: "ES log anomaly - Events report failed"
      RULE_WARN_INDEX: "logstash-dataq*"
      RULE_WARN_FILTER_LEVEL: "warn"
      # Shared settings for the error and warning alerts
      RULE_TYPE: "frequency"
      RULE_NUM_EVENTS: 1
      RULE_TIMEFRAME_HOURS: 1
      RULE_ALERT: "elastalert_modules.dingtalk_alert.DingTalkAlerter"
      RULE_DINGTALK_WEBHOOK: "https:\\/\\/oapi.dingtalk.com\\/robot\\/send?access_token=xxx" # todo: DingTalk robot webhook
      RULE_DINGTALK_MSGTYPE: "text"
      RULE_FROM_ADDR: "xxx@163.com" # todo: sender 163 mailbox
      RULE_EMAIL: "xxx@dingtalk.com" # todo: recipient mailbox
      # No-log (flatline) alert
      RULE_SPARE_ERROR_NAME: "EventsReport log anomaly"
      RULE_SPARE_ERROR_INDEX: "logstash-event*"
      RULE_SPARE_ERROR_TEXT_ALERT: "ES log anomaly: the EventsReport service has produced no logs for 10 minutes"
      RULE_FILTER_APP: "event"
      RULE_SPARE_ERROR_TYPE: "flatline"
      RULE_SPARE_ERROR_NUM_EVENTS: 1
      RULE_SPARE_ERROR_TIMEFRAME_MINUTES: 600 # Note: 600 minutes here, while the alert text above says 10 minutes; adjust one to match
      RULE_SPARE_ERROR_ALERT: "elastalert_modules.dingtalk_alert.DingTalkAlerter"
      RULE_SPARE_ERROR_DINGTALK_WEBHOOK: "https:\\/\\/oapi.dingtalk.com\\/robot\\/send?access_token=xxx" # todo: DingTalk robot webhook
      RULE_SPARE_ERROR_DINGTALK_MSGTYPE: "text"
```

### MySQL Data Collection

Binlog -> canal -> Kafka

This pipeline collects database change logs and is included as a supplementary note. Running several Docker containers provides high availability. This is not a cluster in the usual sense: when one machine goes down, another takes over collection, and only one machine is active at any given moment.

```yaml
version: "3"
services:
  canal:
    restart: always
    image: xxx.com/tools/canal:1.1.5_kafka
    container_name: canal
    deploy:
      mode: replicated
      replicas: 1
      restart_policy:
        condition: on-failure
    extra_hosts:
      - "master:IP"
      - "slaves01:IP"
      - "slaves02:IP"
    healthcheck:
      disable: true
    environment:
      # ZooKeeper address
      CANAL_ZKSERVERS: "slaves02:2181"
      # tcp, kafka, rocketMQ, rabbitMQ
      CANAL_SERVERMODE: "kafka"
      # Use Druid to parse all DDL statements to obtain database and table names
      CANAL_INSTANCE_FILTER_DRUID_DDL: "true"
      # Whether to ignore DDL statements (data definition: create, drop, alter)
      CANAL_INSTANCE_FILTER_QUERY_DDL: "true"
      # Whether to ignore DML statements (data manipulation: insert, delete, update, select, etc.)
      CANAL_INSTANCE_FILTER_QUERY_DML: "false"
      # Whether to ignore DCL statements (data control: grant, revoke)
      CANAL_INSTANCE_FILTER_QUERY_DCL: "true"
      # Whether to ignore errors when the table structure for a binlog event cannot be fetched
      CANAL_INSTANCE_FILTER_TABLE_ERROR: "false"
      # Kafka settings
      KAFKA_BOOTSTRAP_SERVERS: "slaves02:9092"
      KAFKA_ACKS: "all"
      KAFKA_COMPRESSION_TYPE: "snappy"
      KAFKA_BATCH_SIZE: 16384
      KAFKA_MAX_REQUEST_SIZE: 1048576
      KAFKA_BUFFER_MEMORY: 33554432
      KAFKA_RETRIES: 1
      # MySQL instance config
      CANAL_DESTINATIONS: "example_dev"
      # Binlog file to start from when connecting to the MySQL master
      MYSQL_SLAVEID: 123456
      CANAL_INSTANCE_MASTER_JOURNAL_NAME: ""
      CANAL_INSTANCE_MASTER_POSITION: ""
      # Binlog timestamp to start from when connecting to the MySQL master
      CANAL_INSTANCE_MASTER_TIMESTAMP: ""
      # Whether to enable MySQL GTID subscription mode
      CANAL_INSTANCE_GTIDON: "false"
      CANAL_INSTANCE_MASTER_GTID: ""
      # MySQL connection settings
      CANAL_INSTANCE_DEFAULT_DATABASENAME: "database" # todo: database to collect from
      CANAL_INSTANCE_MASTER_ADDRESS: "IP:PORT" # todo: MySQL address and port of that database
      CANAL_INSTANCE_DBUSERNAME: "user" # todo: database username
      CANAL_INSTANCE_DBPASSWORD: "password" # todo: database password
      #CANAL_INSTANCE_FILTER_REGEX: "account\\..*,account_log\\..*,money\\..*"
      CANAL_INSTANCE_FILTER_REGEX: "money\\.charge_prop_config" # todo: table filter rule
      CANAL_INSTANCE_FILTER_BLACK_REGEX: "mysql\\.slave_.*" # todo: blacklist
      # Topic settings for publishing
      CANAL_MQ_TOPIC: "canal-binlog" # Kafka topic to publish to
      CANAL_MQ_PARTITIONSNUM: 3 # Number of partitions of the Kafka topic
      CANAL_MQ_PARTITIONHASH: ".*\\..*" # Rule for distributing messages across partitions
```
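
The high-availability note in this section can be sketched as a second canal container started on another host. This is a minimal sketch under stated assumptions, not the author's deployment: it assumes the custom image reads the same environment variables as the compose file above, that both containers point at the same ZooKeeper address and use the same `CANAL_DESTINATIONS` value so that they compete for the same instance, and that `MYSQL_SLAVEID` has to differ between the two servers. The container name `canal-standby` and the slave ID `123457` are hypothetical.

```yaml
# Hypothetical standby instance, deployed on a second host.
version: "3"
services:
  canal:
    restart: always
    image: xxx.com/tools/canal:1.1.5_kafka
    container_name: canal-standby # hypothetical name
    extra_hosts:
      - "master:IP"
      - "slaves01:IP"
      - "slaves02:IP"
    healthcheck:
      disable: true
    environment:
      CANAL_ZKSERVERS: "slaves02:2181" # same ZooKeeper as the primary
      CANAL_SERVERMODE: "kafka"
      KAFKA_BOOTSTRAP_SERVERS: "slaves02:9092"
      CANAL_DESTINATIONS: "example_dev" # same destination as the primary
      MYSQL_SLAVEID: 123457 # assumption: must differ from the primary's 123456
      CANAL_INSTANCE_MASTER_ADDRESS: "IP:PORT"
      CANAL_INSTANCE_DBUSERNAME: "user"
      CANAL_INSTANCE_DBPASSWORD: "password"
      CANAL_INSTANCE_FILTER_REGEX: "money\\.charge_prop_config"
      CANAL_MQ_TOPIC: "canal-binlog"
```

The remaining variables would be copied unchanged from the primary. With both containers running, canal is expected to coordinate through ZooKeeper so that only one of them collects at a time, which matches the one-active-machine behavior described in this section.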