概述
ElasticSearch作为一个基于Lucene的搜索引擎被广泛应用于各种应用系统,比如电商、新闻类、咨询类网站。在使用ElasticSearch开发应用的过程中,一个非常重要的过程是将数据导入到ElasticSearch索引中建立文档。在一开始系统规模比较小时,我们可以使用logstash来同步索引。logstash的好处是开方量少,只要进行编写简单的索引模板和同步sql,就能快速搭建索引同步程序。但是随着应用数据规模的变大,索引变化变得非常频繁。logstash的缺点也随着暴露,包括(1)不支持删除,只能通过修改字段属性软删除,随着应用使用时间的增长,ElasticSearch中会留存大量的无用数据,拖慢搜索速度。(2)sql分页效率低,sql查询慢。logstash的分页逻辑是先有一个大的子查询,然后再从子查询中分页获取数据,因此效率低下,当数据库数据量大时,一个分页查询就需要几百秒。同步几千万数据可能需要1天时间。因此我们决定放弃使用logstash,而改用使用canal来搭建基于CDC技术的ElasticSearch索引同步机制。
系统架构设计
如图所示,索引同步系统由几个部分组成,下面分点介绍。
(1)数据库
原始数据数据库
(2)Canal
Canal是阿里云开源的MySql数据库增量数据订阅和消费工具。它的实现原理是将自己伪装为一个MySQL slave,向MySql master发送dump协议;MySQL master收到dump请求,开始推送binary log给slave,canal解析binary log对象。
(3)Canal Client
Canal Client是自己实现的程序,通过从Canal Server中获取经过Canal解析之后的数据库binlog日志,做相应的业务逻辑处理。在本文介绍的基于CDC的索引同步系统中,Canal Client订阅搜索相关的数据库表的binlog日志,如果跟数据搜索相关的数据发生变化时,就向Rabbit发一条消息,表明数据发生变化了,通知同步Worker从MySQL同步数据到ES。
(4)RabbitMQ
消息队列,也可以选用Kafaka等其他消息队列,根据具体业务确定。
(5)索引同步Worker
Worker从消息队列中消费数据,根据消息从MySQL获取相应的数据并同步到ElasticSearch中。
Canal Client实现
Canal Client从Canal Server中获取binlog日志,并根据业务需求进行处理。以下通过一些关键代码介绍Canal Client的实现。
(1)在pom中添加Canal client的依赖。
1 | xml复制代码 <dependency> |
(2)初始化Canal连接
CanalConfig包含了Canal的配置信息。CanalConnector为canal-client包中的类,我们通过这个类来连接server,获取binlog,关闭server。该服务基于SpringBoot。因此init会在CanalClientInitializer bean被创建时被调用,preDestory会在服务关闭,CanClientInitializer被销毁时被调用。
1 | java复制代码@Component |
(3)CanalDataProcessor获取并处理binlog
1 | java复制代码@Component |
(4)TableChangeProcessor
TableChangeProcessor中为具体的业务逻辑,处理Message,获取跟搜索相关的数据变化,发送相应的消息到消息队列中。
注意点
(1)忽略搜索无关的数据字段变化,避免不必要的索引更新,降低服务器压力。如Products表中有一个product_weight表示商品重量发生了变化,但其实商品重量跟搜索无关,那就不要关心这个变化。
(2)对于搜索中不会出现的数据,不要写入到ES中,比如电商商品中的下架商品,另外,如果商品被下架,则要进行监听通知索引同步Worker从es中删除索引文档。这样能够降低ES中总的索引文档数量,提升搜索效率。
(3)要考虑Rabbit挂掉或者队列写满,消息无法写入的情况;首先应该在Rabbit发送消息时添加重试,其次应该在重试几次还是失败的情况下抛出异常,canal消息流回滚,下次还是能够获取到这个数据变化的Canal消息,避免数据变动的丢失。
(4)注意目前Canal只支持单Client。如果要实现高可用,则需要依赖于ZooKeeper,一个Client作为工作Client,其余Client作为冷备,当工作Client挂掉时,冷备Client监听到ZooKeeper数据变化,抢占锁成为工作Client。
Canal Worker实现
索引同步Worker从消息队列中获取Canal Client发送的跟搜索相关的数据库变化消息。举个例子,比如商品表中跟搜索相关的字段发生了变化,Canal Client会发送以下一条数据:
1 | json复制代码{ |
在Worker中监听队列消息:
1 | java复制代码@Component |
ChangeEventHandlerFactory为事件处理器的工厂类。以下为一个事件处理器的实现。它监听changeType为CHANGE_TYPE_OUT_PRODUCT的事件,从数据库中获取到变动的数据,构建ES的IndexRequest,并将Request存入到RequestBulkBuffer中,等待批量同步到ES中。有些同学可能会有疑问,为何不直接从Canal中获取数据,主要原因是Canal中只包含了单表数据,但是索引文档可能包含了多表的数据,因此还需要从MySQL获取数据。如果索引文档中只包含单表数据,可以考虑在ChangeEvent中包含修改之后的数据,索引同步Woker就不用再从MySql中再获取一遍数据,提升Worker工作效率。
1 | java复制代码@Component |
在上面的OutProductEventHandler类中,我们并不直接在该类中使用RestHighLevelClient将文档更新到ES索引,而是将IndexRequest暂存到RequestBulkBuffer中。RestBulkBuffer使用CircularFifoBuffer作为存储数据结构。
1 | java复制代码@Component |
CircularFifoBuffer是一个经过改造的环形队列实现。允许多线程写,在我们这个应用场景中只支持也只需支持单线程读->处理->移除处理完的数据。当环形队列缓存满时,借助于semaphore,写入线程将会被阻塞,在后面的Worker如何防止数据丢失中,我们来阐述为什么要这么做。
1 | java复制代码/** |
下面这个类为缓存的消费者,它循环从buffer中获取一定数据的数据,并使用RestHighLevelClient将数据批量同步到ES。在Worker启动时,会创建一个线程调用startConsume,在服务关闭时该线程结束。
1 | java复制代码@Slf4j |
以下为Worker的主要几个类的代码。在索引同步系统中,高可用并不是最重要的,因为我们的搜索本身是一个准实时系统,只需要保证最终一致性就可以了,我们主要需要避免的是数据变更的丢失。以下说明在Worker中是如何避免数据丢失的。
避免数据丢失
(1)如果Rabbit挂掉,没关系,Canal Client那边在Rabbit挂掉期间无法消费binlog,会等待Rabbit重启之后再处理数据变化。Worker只要能做到Rabbit重启之后重连就行。
(2)如果MySQL挂掉,则Worker无法从数据库中获取数据,则消息处理失败,消息会堆积在Rabbit中。等MySQL重新上线之后,消息重新开始处理,数据也不会丢失。
(3)如果ES挂掉,则批量处理线程消费buffer中的数据时会失败,buffer会被生产者填满,由于CircularFifoBuffer在被填满时使用了信号量阻塞生产者线程,消息又会被堆积在Rabbit中,等待ES重新上线之后,消息重新开始处理,数据也不会丢失。
(4)如果Rabbit队列被写满,emmm,设置好在内存被占满时将消息写入硬盘然后搞一个大一点的硬盘吧,Rabbit默认应该就是这么做的。然后做好预警,当消息达到一定量时抓紧处理,一般来说可能性不是很大。
(5)版本冲突,如果商品表中某一条数据如商品A在同一秒内变化了两次,消息队列中有连续两条消息,又由于这两条消息可能在两个线程中被消费,由于网络,计算机性能等原因,先变的数据后被写入ES中,导致ES中数据和MySql数据不一致。因此我们在更新索引时使用ES的外部版本号。使用从MySQL中取数据时的时间戳作为版本号,只有当时间戳比当前版本号大或相等时才能变更文档,否则ES会报版本冲突错误。
1 | java复制代码 private IndexRequest convertToUpdateQuery(Long timestamp, OutStoreProduct outStoreProduct) throws JsonProcessingException { |
关于全量同步
以上只是实现了增量同步,在索引初始化时,我们需要做全量同步操作,将数据从数据库初始化到ES索引中。我们可以在Worker中写一个接口,该接口实现逻辑分批将数据同步任务发到消息队列中,其它worker收到消息后完成对应任务。比如我们可以发布每一个门店的数据同步任务,worker每收到一个消息,同步一个门店的数据。
总结
综上,本系统是一个近实时的能够保证ES和MySQL数据一致性的高效索引同步系统。
本文转载自: 掘金