基于CDC技术的ElasticSearch索引同步机制

概述

ElasticSearch作为一个基于Lucene的搜索引擎被广泛应用于各种应用系统,比如电商、新闻类、咨询类网站。在使用ElasticSearch开发应用的过程中,一个非常重要的过程是将数据导入到ElasticSearch索引中建立文档。在一开始系统规模比较小时,我们可以使用logstash来同步索引。logstash的好处是开方量少,只要进行编写简单的索引模板和同步sql,就能快速搭建索引同步程序。但是随着应用数据规模的变大,索引变化变得非常频繁。logstash的缺点也随着暴露,包括(1)不支持删除,只能通过修改字段属性软删除,随着应用使用时间的增长,ElasticSearch中会留存大量的无用数据,拖慢搜索速度。(2)sql分页效率低,sql查询慢。logstash的分页逻辑是先有一个大的子查询,然后再从子查询中分页获取数据,因此效率低下,当数据库数据量大时,一个分页查询就需要几百秒。同步几千万数据可能需要1天时间。因此我们决定放弃使用logstash,而改用使用canal来搭建基于CDC技术的ElasticSearch索引同步机制。

系统架构设计

CDC索引同步模型

如图所示,索引同步系统由几个部分组成,下面分点介绍。

(1)数据库

原始数据数据库

(2)Canal

Canal是阿里云开源的MySql数据库增量数据订阅和消费工具。它的实现原理是将自己伪装为一个MySQL slave,向MySql master发送dump协议;MySQL master收到dump请求,开始推送binary log给slave,canal解析binary log对象。

(3)Canal Client

Canal Client是自己实现的程序,通过从Canal Server中获取经过Canal解析之后的数据库binlog日志,做相应的业务逻辑处理。在本文介绍的基于CDC的索引同步系统中,Canal Client订阅搜索相关的数据库表的binlog日志,如果跟数据搜索相关的数据发生变化时,就向Rabbit发一条消息,表明数据发生变化了,通知同步Worker从MySQL同步数据到ES。

(4)RabbitMQ

消息队列,也可以选用Kafaka等其他消息队列,根据具体业务确定。

(5)索引同步Worker

Worker从消息队列中消费数据,根据消息从MySQL获取相应的数据并同步到ElasticSearch中。

Canal Client实现

Canal Client从Canal Server中获取binlog日志,并根据业务需求进行处理。以下通过一些关键代码介绍Canal Client的实现。

(1)在pom中添加Canal client的依赖。

1
2
3
4
5
xml复制代码      <dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.0</version>
</dependency>

(2)初始化Canal连接

CanalConfig包含了Canal的配置信息。CanalConnector为canal-client包中的类,我们通过这个类来连接server,获取binlog,关闭server。该服务基于SpringBoot。因此init会在CanalClientInitializer bean被创建时被调用,preDestory会在服务关闭,CanClientInitializer被销毁时被调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
java复制代码@Component
@Slf4j
public class CanalClientInitializer {


CanalConfig canalConfig;

CanalConnector connector;

CanalDataProcessor canalDataProcessor;


public CanalClientInitializer(@Autowired CanalConfig canalConfig, @Autowired CanalDataProcessor canalDataProcessor) {
this.canalConfig = canalConfig;
this.canalDataProcessor = canalDataProcessor;
}


@PostConstruct
public void init() throws InterruptedException {
connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalConfig.getIp(), canalConfig.getPort()), canalConfig.getDestination(), "", "");
//建立连接
connector.connect();
//订阅相关的表
connector.subscribe(canalConfig.getSyncTable());
canalDataProcessor.process(connector);
}


@PreDestroy
public void preDestroy() {
log.info("stop the canal client");
canalDataProcessor.stopProcess();
}

}

(3)CanalDataProcessor获取并处理binlog

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
java复制代码@Component
@Slf4j
public class CanalDataProcessor {


boolean isRunning;

RabbitTemplate rabbitTemplate;

TableChangeProcessor tableChangeProcessor;

public CanalDataProcessor(@Autowired RabbitTemplate rabbitTemplate, @Autowired TableChangeProcessor processor) {
this.rabbitTemplate = rabbitTemplate;
this.tableChangeProcessor = processor;
}

@Async
public void process(CanalConnector connector) throws InterruptedException {
isRunning = true;
while (isRunning) {
try {
//获取消息
Message message = connector.getWithoutAck(100, 10L, TimeUnit.SECONDS);
//业务处理逻辑
processMessage(message);
//消息被成功执行,向Canal Server发送ack消息通知server该message已经被处理完成
connector.ack(message.getId());
} catch (Exception e) {
log.error("wtf", e);
//当消息没被成功处理完成时进行回滚,下次能够重新获取该Message
connector.rollback();
Thread.sleep(1000);
}
}
connector.disconnect();
}


public void stopProcess() {
isRunning = false;
}


private void processMessage(Message message) {
for(Entry entry : message.getEntries()) {
try {
tableChangeProcessor.process(entry);
} catch (Exception e) {
log.error("wtf", e);
continue;
}
}
}
}

(4)TableChangeProcessor

TableChangeProcessor中为具体的业务逻辑,处理Message,获取跟搜索相关的数据变化,发送相应的消息到消息队列中。

注意点

(1)忽略搜索无关的数据字段变化,避免不必要的索引更新,降低服务器压力。如Products表中有一个product_weight表示商品重量发生了变化,但其实商品重量跟搜索无关,那就不要关心这个变化。

(2)对于搜索中不会出现的数据,不要写入到ES中,比如电商商品中的下架商品,另外,如果商品被下架,则要进行监听通知索引同步Worker从es中删除索引文档。这样能够降低ES中总的索引文档数量,提升搜索效率。

(3)要考虑Rabbit挂掉或者队列写满,消息无法写入的情况;首先应该在Rabbit发送消息时添加重试,其次应该在重试几次还是失败的情况下抛出异常,canal消息流回滚,下次还是能够获取到这个数据变化的Canal消息,避免数据变动的丢失。

(4)注意目前Canal只支持单Client。如果要实现高可用,则需要依赖于ZooKeeper,一个Client作为工作Client,其余Client作为冷备,当工作Client挂掉时,冷备Client监听到ZooKeeper数据变化,抢占锁成为工作Client。

Canal Worker实现

索引同步Worker从消息队列中获取Canal Client发送的跟搜索相关的数据库变化消息。举个例子,比如商品表中跟搜索相关的字段发生了变化,Canal Client会发送以下一条数据:

1
2
3
4
5
json复制代码{
"change_id": "694212527059369984",
"change_type": 1, //商品发生变化
"change_time": "1600741397"
}

在Worker中监听队列消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
java复制代码@Component
@Slf4j
public class ProductChangeQueueListener {

@Autowired
@Qualifier("snake")
ObjectMapper om;

@Autowired
ChangeEventHandlerFactory changeEventHandlerFactory;

@RabbitListener(queues = RabbitConfig.PRODUCT_QUEUE_NAME, containerFactory = "customRabbitListenerContainerFactory")
public void onChange(Message message) {

ChangeEvent event = parse(message);
if(event == null) {
return;
}

changeEventHandlerFactory.handle(event);
}

private ChangeEvent parse(Message message) {
ChangeEvent event = null;
try {
event = om.readValue(new String(message.getBody()), ChangeEvent.class);
} catch (Exception e) {
log.error("同步失败,解析失败", e);
}
return event;
}


}

ChangeEventHandlerFactory为事件处理器的工厂类。以下为一个事件处理器的实现。它监听changeType为CHANGE_TYPE_OUT_PRODUCT的事件,从数据库中获取到变动的数据,构建ES的IndexRequest,并将Request存入到RequestBulkBuffer中,等待批量同步到ES中。有些同学可能会有疑问,为何不直接从Canal中获取数据,主要原因是Canal中只包含了单表数据,但是索引文档可能包含了多表的数据,因此还需要从MySQL获取数据。如果索引文档中只包含单表数据,可以考虑在ChangeEvent中包含修改之后的数据,索引同步Woker就不用再从MySql中再获取一遍数据,提升Worker工作效率。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
java复制代码@Component
@Slf4j
public class OutProductEventHandler implements ChangeEventHandler {

@Autowired
ProductDao productDao;

@Autowired
RequestBulkBuffer buffer;


@Autowired
OutProductChangeRequestBuilder builder;

@Override
@Retryable
public boolean handle(ChangeEvent changeEvent) {
if (!match(changeEvent)) {
return false;
}

Tuple dataTuple = productDao.getProductWithStore(changeEvent.getChangeId());
if (dataTuple == null) {
return true;
}
Product product = dataTuple.get(QProduct.product);
Store store = dataTuple.get(QStore.store);

IndexRequest request = null;
try {
request = builder.convertToUpdateQuery(getTimestampNow(), product, store);
} catch (Exception e) {
log.error("wtf", e);
}
if (request == null) {
return true;
}
buffer.add(request);
return true;

}

@Override
public boolean match(ChangeEvent changeEvent) {
return ChangeEvent.CHANGE_TYPE_OUT_PRODUCT == changeEvent.getChangeType();
}
}

在上面的OutProductEventHandler类中,我们并不直接在该类中使用RestHighLevelClient将文档更新到ES索引,而是将IndexRequest暂存到RequestBulkBuffer中。RestBulkBuffer使用CircularFifoBuffer作为存储数据结构。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码@Component
public class RequestBulkBuffer {

CircularFifoBuffer buffer;

public RequestBulkBuffer(CircularFifoBuffer buffer) {
this.buffer = buffer;
}


public void add(DocWriteRequest<?> request) {
buffer.add(request);
}

}

CircularFifoBuffer是一个经过改造的环形队列实现。允许多线程写,在我们这个应用场景中只支持也只需支持单线程读->处理->移除处理完的数据。当环形队列缓存满时,借助于semaphore,写入线程将会被阻塞,在后面的Worker如何防止数据丢失中,我们来阐述为什么要这么做。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
java复制代码/**
* 允许多线程写
* 只允许单线程->读->处理->移除
*/
public class CircularFifoBuffer {

private Logger logger = LoggerFactory.getLogger(CircularFifoBuffer.class.getName());


private transient Object[] elements;

private transient int start = 0;
private transient int end = 0;

private transient boolean full = false;

private final int maxElements;

private ReentrantLock addLock;

private Semaphore semaphore;

public CircularFifoBuffer(int size) {
if (size <= 0) {
throw new IllegalArgumentException("The size must be greater than 0");
}
elements = new Object[size];
maxElements = elements.length;
addLock = new ReentrantLock();
semaphore = new Semaphore(size);
}


public int size() {
int size = 0;

if (end < start) {
size = maxElements - start + end;
} else if (end == start) {
size = (full ? maxElements : 0);
} else {
size = end - start;
}

return size;
}

public boolean isEmpty() {
return size() == 0;
}

public boolean isFull() {
return size() == maxElements;
}

public int maxSize() {
return maxElements;
}

public void clear() {
full = false;
start = 0;
end = 0;
Arrays.fill(elements, null);
}

public boolean add(Object element) {
if (null == element) {
throw new NullPointerException("Attempted to add null object to buffer");
}

addLock.lock();
try {
semaphore.acquire();
} catch (Exception e) {
logger.error("RingBuffer", "线程退出,添加失败");
return false;
}

elements[end++] = element;


if (end >= maxElements) {
end = 0;
}

if (end == start) {
full = true;
}

addLock.unlock();

return true;

}

public Object get() {
if (isEmpty()) {
return null;
}

return elements[start];
}


public Object remove() {
if (isEmpty()) {
return null;
}

Object element = elements[start];
if(null != element) {
elements[start++] = null;
if (start >= maxElements) {
start = 0;
}
full = false;
semaphore.release();
}
return element;
}


/**
* @param size the max size of elements will return
*/
public Object[] get(int size) {
int queueSize = size();
if (queueSize == 0) { //empty
return new Object[0];
}
int realFetchSize = queueSize >= size ? size : queueSize;
if (end > start) {
return Arrays.copyOfRange(elements, start, start + realFetchSize);
} else {
if (maxElements - start >= realFetchSize) {
return Arrays.copyOfRange(elements, start, start + realFetchSize);
} else {
return ArrayUtils.addAll(
Arrays.copyOfRange(elements, start, maxElements),
Arrays.copyOfRange(elements, 0, realFetchSize - (maxElements - start))
);
}
}
}


public Object[] getAll() {
return get(size());
}



public Object[] remove(int size) {
if(isEmpty()) {
return new Object[0];
}
int queueSize = size();
int realFetchSize = queueSize >= size ? size : queueSize;
Object [] retArr = new Object[realFetchSize];
for(int i=0;i<realFetchSize;i++) {
retArr[i] = remove();
}

return retArr;
}

}

下面这个类为缓存的消费者,它循环从buffer中获取一定数据的数据,并使用RestHighLevelClient将数据批量同步到ES。在Worker启动时,会创建一个线程调用startConsume,在服务关闭时该线程结束。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
java复制代码@Slf4j
public class RequestBulkConsumer {
private static final int DEFAULT_BULK_SIZE = 2000;

private CircularFifoBuffer buffer;
private EsBulkRequestService service;

private boolean isRunning = false;
private int bulkSize = DEFAULT_BULK_SIZE;

public RequestBulkConsumer(CircularFifoBuffer buffer, RestHighLevelClient client) {
this.buffer = buffer;
this.service = new EsBulkRequestService(client);
}

public void setBulkSize(int size) {
this.bulkSize = size;
}

public int getBulkSize() {
return bulkSize;
}

public boolean isRunning() {
return isRunning;
}


public void startConsume() {
if(isRunning) {
return;
}
isRunning = true;
while(true) {
if(!isRunning) {
break;
}

Object [] items = buffer.get(bulkSize);
if(items.length == 0) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
break;
}
} else {
List<DocWriteRequest<?>> requests = convert(items);
try {
BulkResponse response = service.request(requests);
processResponse(response);
buffer.remove(items.length);
if (items.length < bulkSize) {
Thread.sleep(3000);
}
} catch (InterruptedException e) {
break;
} catch (IOException e) {
log.error("wtf", e);
} catch (Exception e) {
log.error("wtf", e);
buffer.remove(items.length);
}
}
}
}


private List<DocWriteRequest<?>> convert(Object [] items) {
return Stream.of(items)
.map(i -> {
if(i instanceof DocWriteRequest) {
return (DocWriteRequest<?>) i;
} else {
return null;
}
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
}

public void stop() {
isRunning = false;
}


private void processResponse(BulkResponse bulkResponse) {
BulkItemResponse [] itemResponseArr = bulkResponse.getItems();
for(BulkItemResponse resp : itemResponseArr) {
DocWriteResponse docWriteResponse = resp.getResponse();
if(docWriteResponse instanceof IndexResponse) {
IndexResponse indexResponse = (IndexResponse) docWriteResponse;
if(indexResponse.getResult() != Result.CREATED && indexResponse.getResult() != Result.UPDATED) {
if(indexResponse.status() == RestStatus.CONFLICT) {
continue;
} else {
log.error("索引更新失败: {}, {}", indexResponse.getId(), resp.getFailureMessage());
}
}
} else if(docWriteResponse instanceof DeleteResponse) {
DeleteResponse deleteResponse = (DeleteResponse) docWriteResponse;
if(deleteResponse.getResult() != Result.DELETED) {
log.error("索引删除失败: {}, {}", deleteResponse.getId(), resp.getFailureMessage());
}
}
}
}
}

以下为Worker的主要几个类的代码。在索引同步系统中,高可用并不是最重要的,因为我们的搜索本身是一个准实时系统,只需要保证最终一致性就可以了,我们主要需要避免的是数据变更的丢失。以下说明在Worker中是如何避免数据丢失的。

避免数据丢失

(1)如果Rabbit挂掉,没关系,Canal Client那边在Rabbit挂掉期间无法消费binlog,会等待Rabbit重启之后再处理数据变化。Worker只要能做到Rabbit重启之后重连就行。

(2)如果MySQL挂掉,则Worker无法从数据库中获取数据,则消息处理失败,消息会堆积在Rabbit中。等MySQL重新上线之后,消息重新开始处理,数据也不会丢失。

(3)如果ES挂掉,则批量处理线程消费buffer中的数据时会失败,buffer会被生产者填满,由于CircularFifoBuffer在被填满时使用了信号量阻塞生产者线程,消息又会被堆积在Rabbit中,等待ES重新上线之后,消息重新开始处理,数据也不会丢失。

(4)如果Rabbit队列被写满,emmm,设置好在内存被占满时将消息写入硬盘然后搞一个大一点的硬盘吧,Rabbit默认应该就是这么做的。然后做好预警,当消息达到一定量时抓紧处理,一般来说可能性不是很大。

(5)版本冲突,如果商品表中某一条数据如商品A在同一秒内变化了两次,消息队列中有连续两条消息,又由于这两条消息可能在两个线程中被消费,由于网络,计算机性能等原因,先变的数据后被写入ES中,导致ES中数据和MySql数据不一致。因此我们在更新索引时使用ES的外部版本号。使用从MySQL中取数据时的时间戳作为版本号,只有当时间戳比当前版本号大或相等时才能变更文档,否则ES会报版本冲突错误。

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码    private IndexRequest convertToUpdateQuery(Long timestamp, OutStoreProduct outStoreProduct) throws JsonProcessingException {
IndexRequest indexRequest = new IndexRequest(indexName, "doc", outStoreProduct.getId());
if(StringUtils.isEmpty(outStoreProduct.getTooEbaoProductId())) {
log.error("商品 {} 的ebaoProductId为空,无法同步", outStoreProduct.getId());
return null;
}
indexRequest.source(om.writeValueAsString(outStoreProduct), XContentType.JSON)
.versionType(VersionType.EXTERNAL_GTE)
.version(timestamp)
.routing(outStoreProduct.getTooEbaoProductId());
return indexRequest;
}

关于全量同步

以上只是实现了增量同步,在索引初始化时,我们需要做全量同步操作,将数据从数据库初始化到ES索引中。我们可以在Worker中写一个接口,该接口实现逻辑分批将数据同步任务发到消息队列中,其它worker收到消息后完成对应任务。比如我们可以发布每一个门店的数据同步任务,worker每收到一个消息,同步一个门店的数据。

总结

综上,本系统是一个近实时的能够保证ES和MySQL数据一致性的高效索引同步系统。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%