ElasticSearch调研总结

ElasticSearch调研总结

ElasticSearch简介

​ Elasticsearch 是一个分布式可扩展的实时搜索和分析引擎,一个建立在全文搜索引擎 Apache Lucene(TM) 基础上的搜索引擎。ElasticSearch可以做的事儿:

  • 分布式的实时文件存储,并为每一个字段建立索引,默认存储在本地磁盘
  • 实时分析的分布式搜索引擎
  • 集群规模可以动态扩展,具备分布式的基本要求:动态扩展、容错、负载均衡

基本概念

  • Cluster (集群)

  • Node (节点)

    每个节点上运行一个ElasticSearch实例,节点启动后会自动广播查询,配置信息中cluster.name值相同的节点会构造成一个集群

  • Shard (分片)

    类似HDFS中的block,会将大容量的数据切分成多个Shard,不同的Shard可以分散存储在集群上不同的节点

  • Replia (副本)

    一个Replia是一个Shard的精准复制,每个Shard可含有0个或多个Replia。用于容错、并发查询

  • 面向文档

    ​ ElasticSearch是面向文档存储的,基本的存储单位就是文档,一条记录就是一个文档,文档格式统一为JSON格式,例如:

    1
    2
    3
    4
    5
    6
    7
    8
    {
    "name" : "John",
    "sex" : "Male",
    "age" : 25,
    "birthDate": "1990/05/01",
    "about" : "I love to go rock climbing",
    "interests": [ "sports", "music" ]
    }
  • index (索引)、type (类型)、id

    ES中通过index/type/id 来唯一标示一个文档。与Mysql概念对比:

    | Mysql | ES |
    | ————- | ——————- |
    | database(数据库) | index(索引) |
    | table(表) | type(类型) |
    | row(行) | 文档 |
    | column(列) | field |
    | schema | mapping |
    | index | everything is index |
    | sql | Query DSL |

  • Query DSL

    Mysql 中采用SQL语句进行查询,ES中统一采用Query DSL查询,详情可参考:Elasticsearch——Query DSLselect * from bank; 示例:

    1
    2
    3
    4
    curl -XPOST 'localhost:9200/bank/_search?pretty' -d '
    {
    "query": { "match_all": {} }
    }

集群搭建

  • 版本选择与下载、解压,下载地址 )

    • Note: 调研版本选择的是2.4.1,而非最新版本5.*(版本号2.4之后就直接是5.* ),原因:
      • 5. 要求JAVA8,5. 官方插件也要求JAVA8
      • 很多常用的第三方插件(bigdesk、head、sql、jdbc-sql)还未支持5.*
      • 2.4 与 5.* 性能实测差异不大,详见ElasticSearch 5.0 测评以及使用体验
  • 配置所有节点,将$ES_HOME/elasticsearch.yml中cluster.name 修改成相同值

  • $ES_HOME/bin/elasticsearch

    所有节点上执行执行,就启动了ES。会自动广播组成集群。

    JVM默认参数:

    1
    -Xms256m -Xmx1g -Djava.awt.headless=true -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly -XX:+HeapDumpOnOutOfMemoryError -XX:+DisableExplicitGC -Dfile.encoding=UTF-8 -Djna.nosys=true
  • 集群访问地址:host:9200

    http://200.200.200.64:9200/

  • 插件安装

    • Note:ES安装包只提供了最基本的功能:本地分布式存储数据、创建索引、提供查询服务等。其余一些额外功能则需要第三方插件支持,如:集群状态监控、SQL转DSL Query、集群性能监控、Mysql数据导入ES等。详见:elasticsearch以及其常用插件安装

    • 访问(host:9200/_plugin/pluginname),例如:

      http://200.200.200.64:9200/_plugin/head/

性能测试

  • 将Mysql单表中的数据导入ES

    • Mysql单表数据量:1亿行,16GB;导入ES后,占用内存:32GB

      • Note:第一次导入完成后,发现数据丢失,即从Mysql导入一亿行数据后,ES中count(*)仅有7600万行,后续会有原因分析
    • 导入方法:

      • 安装第三方插件:elasticsearch-jdbc,按照git上的操作指引进行数据导入,git地址:elasticsearch-jdbc,导入脚本:

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        #!/bin/sh
        export JAVA_HOME=/home/knightyang/jdk1.8.0_131
        export PATH=${JAVA_HOME}/bin/:$PATH;
        JDBC_IMPORTER_HOME=/home/knightyang/elasticsearch-jdbc-2.3.4.0
        bin=$JDBC_IMPORTER_HOME/bin
        lib=$JDBC_IMPORTER_HOME/lib
        echo '{
        "type" : "jdbc",
        "jdbc": {
        "elasticsearch.autodiscover":true,
        "url":"jdbc:mysql://host:3306/test",
        "user":"test",
        "password":"test",
        "sql":"select *,id as _id from user_info_yqj ",
        "elasticsearch" : {
        "host" : "host",
        "port" : 9300
        },
        "index" : "yqj_info_index_more",
        "type" : "yqj_info_type_more"
        }
        }'| java \
        -cp "${lib}/*" \
        -Dlog4j.configurationFile=${bin}/log4j2.xml \
        org.xbib.tools.Runner \
        org.xbib.tools.JDBCImporter
  • 安装bigdesk插件,监控集群资源状态(Memory、CPU、GC、Thread Pool等)

    用于查看执行SQL时的资源消耗

  • 安装sql插件,可方便的将sql转换成DSL Query

    SQL: select count(*) from yqj_info_index_more

  • 安装head插件,可以方便的提交DSL Query

  • count(*) 统计一亿行数据耗时对比(清理查询缓存后多次执行,取平均值)

    | Mysql | ES |
    | —– | —– |
    | 51秒 | 900毫秒 |

    Note: Mysql 执行时间波动较大,从31秒 ~ 80秒,预计于机器负载有关。ES执行时间基本无波动。

Mysql导入ES丢数据原因分析

  • 导入过程:同性能测试中的导入方法

  • 原因分析:

    • ES中是否含有重复 _id?

      • 没有。ES中采用Mysql表的单一主键值做_id,不会出现重复
    • Mysql数据中是否有row含有特殊字段?(比如string太长、含有特殊编码字符等)

      • 没有。二分查找,找到ES中丢失的一些具体数据,数据本身并没有什么特殊
    • 分析Mysql数据导入ES的第三方插件源码: elasticsearch-jdbc

      • 日志中含有出错信息

        1
        2
        3
        4
        [15:28:15,983][ERROR][org.xbib.elasticsearch.helper.client.BulkTransportClient][elasticsearch[importer][listener][T#4]] bulk [45] failed with 8020 failed items, failure message = failure in bulk execution:
        [0]: index [yqj_info_index_more], type [yqj_info_type_more], id [11598123], message [RemoteTransportException[[Leader][ip:9300][indices:data/write/bulk[s][p]]]; nested: EsRejectedExecutionException[rejected execu
        tion of org.elasticsearch.transport.TransportService$4@7691eb0a on EsThreadPoolExecutor[bulk, queue capacity = 50, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@c2f9664[Running, pool size = 8, active threads
        = 8, queued tasks = 50, completed tasks = 21008]]];]
      • 出错原因:导入程序向ES发送数据的速率太快了,超过ES的处理能力。

      • 解决办法:

        1. 降低导入程序的发送速率 or
        2. 增大ES接收数据的线程池数量、缓存队列的size

elasticsearch-jdbc 原理分析

  • git地址:elasticsearch-jdbc

  • 处理时序图:

  • 1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    title: elasticseach-jdbc时序图
    Runner.main()-> JDBCImport.execute(): 1. 开始向ES导入数据
    Note right of JDBCImport.execute(): 启动一个ExecutorServer\n默认线程数为1
    JDBCImport.execute()-> Context.execute(): 2. JDBCImport中构造上下文context\n包含Source(mysql)、Sink(ES)
    Context.execute() -> Context.beforeFetch(): 3. 创建Source、Sink,设置相关参数
    Context.beforeFetch() -> Sink.beforeFetch(): 4. 创建BulkTransportClient,处理fetch前的准备工作\n如:创建ES索引
    Context.beforeFetch() -> Source.beforeFetch(): 5. 这里直接返回
    Context.execute() --> Source.fetch(): 6. 开始fetch数据了
    Source.fetch() -> Source.executeQuery(): 7. 通过JDBC执行SQL
    Source.executeQuery() -> Source.fetch(): 8. 返回SQL结果的迭代器\n
    Source.fetch() -> Source.processRow(): 9. 遍历处理每一行数据\n给数据加上ES的元数据\n如index、type、_id、version等
    Source.processRow() -> Sink.index(): 10. 将数据丢给Sink,构造成IndexRequest,缓存\n当缓存的IndexRequest数量达到一定阈值(10000)\n或经过一定时间后,将这批Request构造成BulkRequest
    Sink.index() -> BulkRequestHandler.execute(): 11. 将BulkRequest经底层Netty发送给ES Server\n若ES处理Failed,log the message
  • 关键步骤

    • JDBCImport中构造上下文context:包含Source(mysql)、Sink(ES)

    • Sink初始化,创建BulkTransportClient,处理fetch前的准备工作

      • 如:创建ES索引、设定缓存IndexRequest的阈值,同时处理的BulkRequest最大数量

        Note: ES处理BulkRequest的线程池默认最大线程数为8,Request缓存队列上线值为50。

        若缓存队列满了,还有Request过来就直接抛异常拒绝访问了。

    • Source中调用JDBC执行SQL,获取SQL结果的迭代器,遍历处理每一行

      • 若SQL结果字段名有与ES Control key同名时,就用该字段值替代ES meta。
      • 例如:SQL结果含有字段_id,就将字段值设定为ES存储的文档id
    • Sink将每一行数据封装成IndexRequest,并缓存、处理

      • 当缓存的IndexRequest数量达到阈值(10000),将缓存的所有IndexRequest封装成BulkRequest,netty发送给ES Server
      • 当定时线程到期后(默认设定为30秒),就将当前缓存的所有IndexReques封装成BulkRequest,发送给ES Server

ES 处理BulkRequest的流程

  • ES启动时,NettyTransport会创建Netty的ServerBootstrap,默认监听端口9300-9400

  • 接收message时,调用ChannelPipeline的ChannelUpStreamHandler依次处理,关键处理类:MessageChannelHandler

  • ES启动时,会注册很多action -> RequestHandler,当收到BulkRequest,调用HandledTransportAction进行处理(action为“data/write/bulk”)

  • 一系列调用后,转到TransportBulkAction.doExecute,根据index分组,若index不存在,会自动创建

  • TransportBulkAction.executeBulk() 将所有的IndexRequest取出,根据shard分组,构造BulkShardRequest

    • 属于同一shard的,都将存储在一起;给IndexRequest分配shard,似乎只是简单轮询
  • 遍历BulkShardRequest,调用TransportReplicationAction.doExecute(),Reroute阶段开始(就是将Request发送到Shard所在node)

    1
    2
    3
    4
    @Override
    protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
    new ReroutePhase((ReplicationTask) task, request, listener).run();
    }
  • 从Bulk线程池取出线程处理这个Request,后续存储、建索引过程还未分析

    • 默认线程数:8 。Min(处理器核数, 32)
    • 会有个BlockQueue对应这个线程池,若没空闲线程,就将request丢队列,若队列没位置就拒绝这个request

总结

  • ElasticSearch自身安装简单,但是好些功能(如集群监控、SQL转 DSL Query等)依赖第三方插件
    • 但是第三方插件稳定性有待商榷。如:Mysql导入ES的插件可能丢失数据(插入失败后未重试)
  • ElasticSearch能满足实时查询的需求,查询耗时也满足需求,硬件资源消耗也可接受
    • 一亿级的数据量,count(*)耗时1秒左右
    • 1GB的内存就能顺畅跑起来,执行查询时JVM内存也未见明显波动

参考资料