用Elasticsearch处理SQL查询

用Elasticsearch处理SQL查询

简介

最近碰到个需求:业务方mysql单机查询耗时无法满足需求、部分sql查询不出来,希望能提供个实时查询框架解决这个问题,最好是分布式的,好扩展嘛。(PS:最开始产品经理聊时,提到mysql 库表太大、单表count()都不能出结果。目测单表数据量估计快10亿了,结果后来和业务后台开发核对细节时,发现单表数据量最多也就百万级,最慢sql也就耗时几秒,哪儿有撒count()不出结果的,也是醉了,还是得多和搞技术的交流啊。真实需求是百万级数据量需要查询耗时毫秒级)

调研了各种实时查询框架:Elasticsearch、druid、phoenix、solr、impala,最终选定了Elasticsearch,版本5.4.0。原因简单来说:ES更简单、更快,当然缺点是对join的支持也不好、没有分区的概念。

本文主要聊聊如何用ES处理SQL业务,碰到的问题、解决版本。不会涉及ES的基本概念。

主要聊聊:

  • ES文档结构设计
  • Mysql数据如何导入ES
  • 写ES Query DSL中碰到的问题

文档结构的设计

在文档结构设计上尝试了2种方案:

方案1:将ES当关系数据库使,数据库表结构与ES文档结构一一对应。 参考系列文章:把Elasticsearch 当数据库使

简单说,就是对于数据库中的每一个表,都在ES中创建一个type(类似数据库里的表),ES type里的feild 名称/类型 与数据库表的字段名/类型 保持一致。

建表简单、mysql数据导入ES也简单,但是将SQL转换成ES Query DSL时 碰到一系列问题:

  • 问题一:ES Join支持不好

    • ES处理join的几种方式(select * from tbl1 join tbl2 on tbl1.id = tbl2.id;)

      • 应用层联接
        • ES转换成
          • 执行 select id, * from tbl1; 结果存数组变量tbl1_id_array
          • 执行 select * from tbl2 where id in [tbl1_id_array]
          • 结果再拼接
        • 适用于tbl1的数据量很小
      • 表字段冗余
        • ES 在建表时,将table1的信息冗余存入tabl2
      • 嵌套对象
        • 比如SQL里 一个表存储所有文章,一个表存储所有评论,评论表中存储文章id,通过join能查找一篇文章的所有评论
        • ES 可以只创建一个表,每篇文章里存储所有的评论,来规避join
      • 父-子关系文档
        • ES 需要在设计表的时候 就指定好2表之间的关系
        • 可以处理2张表都很大的情况,但是需要在创建表的时候就建立join关系,不大实用
    • 其他资料

    • Join总结

      • ES不太适合关系数据库的join。ES JOin只会返回单表的数据,Mysql Join 会返回左右2表的数据。另外join又分为left join、right join、inner join等等,ES得用户处理这些join逻辑,不大方便。
    • 问题二:Union ALL、子查询不支持

方案2:将Mysql的库表结构转换成ES容易处理的文档结构

简单说,就是通过新的文档结构查询时,不再需要join、union all、子查询了。

不过这个设计过程就比较痛苦了,需要比较深入的了解业务库表结构,各种摸爬滚打后,将库表结构转换成2种ES文档结构:

  • 由单个mysql表构成

    • 普通文档

      1. ES字段与mysql字段的名字、类型保持一致
      2. 若mysql表含有主键,就将主键作为ES的_id

        例如: mysql table 主键为(col1, col2),则可将col1_col2作为ES的_id

      3. 若mysql表里没有主键,则视情况由ES自动生成_id 或者用别的方式

    • 父文档
      • mysql里必须含有主键,并将主键值作为ES文档的_id值
    • 子文档
      • mysql表里必须要有父文档_id对应的字段

        例如 父文档为school,_id为”school_id”对应的值,那子文档class里必须存有字段”school_id”

  • 由多个mysql表构成

    • 确定唯一的main table。其余表数据会与该表有关联。
    • 确定其余副表是作为ES的nested common field 还是 nested array field。即主表的一行记录在副本中是最多有一行对应数据、还是多行对应数据。
  • 例子:ES device表的设计过程

    • 确定由哪些表构成。device表包含mysql的七张表:

      • tb_device_mac (device_id, product_id, status)
      • tb_device_mac_history (device_id, mac_address, status)
      • tb_device_auth (devce_id, user_id, auth_time)
      • tb_device_auth_history (devce_id, user_id, auth_time)
      • tb_device_bind (device_id, bind_time)
      • tb_device_bind_history (device_id, bind_time, unbind_time)
      • tb_product (product_id, product_name)
    • 确定主表: tb_device_mac

    • 确定nested common field 对应的副表: tb_product
    • 表融合: 相似结构的表合并成一个字段: 如 tb_device_mac 和 tb_device_mac_history 在ES里用一个字段存储,即tb_device_mac 会冗余存储
    • device ES 表结构

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      "properties": {
      "device_id" : {}, // 来自主表tb_device_mac的字段
      "mac_address": {},
      "status": {},
      "macs": { // 包含来自tb_device_mac 和 tb_device_mac_history的字段
      "type": "nested",
      "properties": {
      "device_id" : {},
      "product_id": {},
      "status": {},
      "is_history": {"type": "boolean"} // 新增字段
      }
      },
      "auths": {} // 存储来自tb_device_auth 和 tb_dvice_auth_history的数据
      "binds": {} // 存储来自tb_device_bind 和 tb_device_bind_history的数据
      "product": {} //存储来自tb_product的数据
      }
    • device 数据

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      {
      "device_id": 1,
      "mac_address": "ABCDEFG",
      "status": 0,
      "macs": [ // tb_device_mac 与 (tb_device_mac union tb_device_mac_history) 是一对多关系,因此用数组存储
      { // 来自tb_device_mac的数据会冗余存储一份在这里,便于处理tb_device_mac union all tb_device_mac_history 的数据
      "device_id": 1,
      "product_id": 12,
      "status": 0,
      "is_history": false
      },
      {
      "device_id": 1,
      "product_id": 12,
      "status": 1,
      "is_history": true
      }
      ],
      "binds": [{...}],
      "auths": [],
      "product": { // tb_product与tb_device_mac是一对一关系,因此这里不用数组存储
      "product_id": 12,
      "product_nam": "hello"
      }
      }
  • 文档设计总结

    • 虽然方案2相比于方案1,在设计难度上更高,但是能有效的解决ES对join、uion all、子查询支持不好的问题。

      Tips: 方案2将之前SQL join、子查询、union all设计到的表融合到ES的同一个文档中,能用简单Query DSL语句实现之前的复杂SQL

数据导入ES

应业务方需求,现网数据的写入、更新方式不变。也就是依旧还是会把数据更新到Mysql,不会直接写ES,而仅仅是查询ES。所以,这边的有个机制,把Mysql上的更新操作实时的同步到ES。

  • 这里采用了阿里开源的canal监听mysql binlog
  • 将mysql insert、update、delete涉及的数据发Kafka,
  • 写了个ES程序从Kafka读数据,
  • 封装成ES的IndexRequest/UpdateRequest/DeleteRequest,批量处理(构造BulkRequest,设置批量阈值为1000)。

ES同步程序里,会将mysql的部分insert请求,转换成ES的UpdateRequest,例如前文提到的文档结构device,mysql insert bind 会转换成 ES update device.binds 。这个过程中竟然还导致ES服务器进程出现OutOfMemory,导致服务器进程异常退出。

OutOfMemory原因分析:

  • ES服务器采用默认JVM配置,JVM内存为2G,-XX:+HeapDumpOnOutOfMemoryError会自动dump内存,生成hprof文件。
  • 分析发现,85%以上的内存被BulkShardRequest.iterms占用,iterms里竟然大部分都是IndexRequest,而不是UpdateRequest!!!

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    Class Name | Shallow Heap | Retained Heap
    ---------------------------------------------------------------------------------------------------------------------------------
    org.elasticsearch.action.bulk.BulkShardRequest @ 0xbf1945d0 | 64 | 1,654,253,216
    |- items org.elasticsearch.action.bulk.BulkItemRequest[1000] @ 0xbf194610 | 4,016 | 1,654,253,152
    | |- [736] org.elasticsearch.action.bulk.BulkItemRequest @ 0x8b9cb678 | 32 | 1,968,936
    | | |- request org.elasticsearch.action.index.IndexRequest @ 0x8b7eab88 | 120 | 1,968,792
    | | | |- source org.elasticsearch.common.bytes.PagedBytesReference @ 0x8b9cb5d8 | 32 | 1,968,600
    | | | |- opType org.elasticsearch.action.DocWriteRequest$OpType @ 0xafba9758 | 32 | 88
    | | | |- index java.lang.String @ 0xbf0bccd0 bigdata-realtime-v1 | 24 | 80
    | | | |- type java.lang.String @ 0xbf0bcd60 device | 24 | 56
    | | | |- id java.lang.String @ 0xbf0bcd98 1006685 | 24 | 56
    | | | '- Total: 13 entries | |
    | | |- primaryResponse org.elasticsearch.action.bulk.BulkItemResponse @ 0x8b9cb698 | 32 | 112
    | | |- <class> class org.elasticsearch.action.bulk.BulkItemRequest @ 0xb1e7ded0 | 8 | 8
    | | '- Total: 3 entries | |
    | |- [737] org.elasticsearch.action.bulk.BulkItemRequest @ 0x8bbac2e0 | 32 | 1,968,936
    | |- [738] org.elasticsearch.action.bulk.BulkItemRequest @ 0x8bd8ce88 | 32 | 1,968,936
    ---------------------------------------------------------------------------------------------------------------------------------
  • 查看ES源码,UpdateRequest会转换成IndexRequest,而由于同一个设备(device)文档,内嵌的数据量高达3w多(device.binds为含有3w多元素的数组),导致单个文档太大(3MB),不巧一个bulk里1000个UpdateRequest都是涉及这种文档的,最终导致OOM

  • 解决办法:
    • 调大ES JVM内存
    • 让业务查看数据是否引入脏数据,device.binds元素个数3000以内为正常

SQL转换成Query DSL

这块改写比较简单,目前只碰到一个问题:

  • ES 对于Count(distinct col) 采用基数估算,结果不精确,而业务方需要精确值
    • 解决办法:改写成select col from tbl group by col,客户端再count

总结

杀鸡用了牛刀,可惜没用上ES核心功能(搜索引擎)。后续会考虑用ELK实现日志实时搜索框架。