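A Look at BinaryLogFileReader

This article takes a look at BinaryLogFileReader, the class in mysql-binlog-connector-java that reads events from a binlog file or stream.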

BinaryLogFileReader

mysql-binlog-connector-java-0.20.1/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogFileReader.java

public class BinaryLogFileReader implements Closeable {

    public static final byte[] MAGIC_HEADER = new byte[]{(byte) 0xfe, (byte) 0x62, (byte) 0x69, (byte) 0x6e};

    private final ByteArrayInputStream inputStream;
    private final EventDeserializer eventDeserializer;

    public BinaryLogFileReader(File file) throws IOException {
        this(file, new EventDeserializer());
    }

    public BinaryLogFileReader(File file, EventDeserializer eventDeserializer) throws IOException {
        this(file != null ? new BufferedInputStream(new FileInputStream(file)) : null, eventDeserializer);
    }

    public BinaryLogFileReader(InputStream inputStream) throws IOException {
        this(inputStream, new EventDeserializer());
    }

    public BinaryLogFileReader(InputStream inputStream, EventDeserializer eventDeserializer) throws IOException {
        if (inputStream == null) {
            throw new IllegalArgumentException("Input stream cannot be NULL");
        }
        if (eventDeserializer == null) {
            throw new IllegalArgumentException("Event deserializer cannot be NULL");
        }
        this.inputStream = new ByteArrayInputStream(inputStream);
        try {
            byte[] magicHeader = this.inputStream.read(MAGIC_HEADER.length);
            if (!Arrays.equals(magicHeader, MAGIC_HEADER)) {
                throw new IOException("Not a valid binary log");
            }
        } catch (IOException e) {
            try {
                this.inputStream.close();
            } catch (IOException ex) {
                // ignore
            }
            throw e;
        }
        this.eventDeserializer = eventDeserializer;
    }

    /**
     * @return deserialized event or null in case of end-of-stream
     */
    public Event readEvent() throws IOException {
        return eventDeserializer.nextEvent(inputStream);
    }

    @Override
    public void close() throws IOException {
        inputStream.close();
    }

}
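  • BinaryLogFileReader implements the Closeable interface, and its close method closes inputStream. It has two fields, inputStream and eventDeserializer. Its constructors accept either a binlog File or an InputStream, optionally together with an EventDeserializer. The main constructor rejects null arguments, then reads the first four bytes and throws an IOException (after closing the stream) if they do not match MAGIC_HEADER. The readEvent method returns the next Event via eventDeserializer.nextEvent(inputStream), and returns null once the end of the stream is reached.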
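
The magic-header check in the constructor can be tried in isolation. The following is a minimal sketch using only the JDK, not the library's own code: the class name MagicHeaderCheck and the method startsWithMagicHeader are made up for illustration, and java.io.ByteArrayInputStream here is the JDK class, not the library's class of the same name.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;

// Hypothetical helper, for illustration only.
class MagicHeaderCheck {

    // Same four bytes as BinaryLogFileReader.MAGIC_HEADER: 0xfe followed by ASCII "bin".
    static final byte[] MAGIC_HEADER = {(byte) 0xfe, 0x62, 0x69, 0x6e};

    // Consumes the first four bytes of the stream and reports whether they match.
    static boolean startsWithMagicHeader(InputStream in) throws IOException {
        byte[] actual = new byte[MAGIC_HEADER.length];
        try {
            new DataInputStream(in).readFully(actual);
        } catch (EOFException e) {
            return false; // the stream is shorter than the header
        }
        return Arrays.equals(actual, MAGIC_HEADER);
    }

    public static void main(String[] args) throws IOException {
        byte[] good = {(byte) 0xfe, 'b', 'i', 'n', 0x00};
        byte[] bad = {'n', 'o', 'p', 'e'};
        System.out.println(startsWithMagicHeader(new ByteArrayInputStream(good))); // prints true
        System.out.println(startsWithMagicHeader(new ByteArrayInputStream(bad)));  // prints false
    }
}
```

Unlike this sketch, which returns false, the library constructor throws an IOException on a mismatch, so a non-binlog file is rejected before any event is read.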

EventDeserializer

mysql-binlog-connector-java-0.20.1/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventDeserializer.java

public class EventDeserializer {

    private final EventHeaderDeserializer eventHeaderDeserializer;
    private final EventDataDeserializer defaultEventDataDeserializer;
    private final Map<EventType, EventDataDeserializer> eventDataDeserializers;

    private EnumSet<CompatibilityMode> compatibilitySet = EnumSet.noneOf(CompatibilityMode.class);
    private int checksumLength;

    private final Map<Long, TableMapEventData> tableMapEventByTableId;

    private EventDataDeserializer tableMapEventDataDeserializer;
    private EventDataDeserializer formatDescEventDataDeserializer;

    //......

    public Event nextEvent(ByteArrayInputStream inputStream) throws IOException {
        if (inputStream.peek() == -1) {
            return null;
        }
        EventHeader eventHeader = eventHeaderDeserializer.deserialize(inputStream);
        EventData eventData;
        switch (eventHeader.getEventType()) {
            case FORMAT_DESCRIPTION:
                eventData = deserializeFormatDescriptionEventData(inputStream, eventHeader);
                break;
            case TABLE_MAP:
                eventData = deserializeTableMapEventData(inputStream, eventHeader);
                break;
            default:
                EventDataDeserializer eventDataDeserializer = getEventDataDeserializer(eventHeader.getEventType());
                eventData = deserializeEventData(inputStream, eventHeader, eventDataDeserializer);
        }
        return new Event(eventHeader, eventData);
    }

    //......

}
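  • EventDeserializer holds fields such as eventHeaderDeserializer and defaultEventDataDeserializer. Its nextEvent method first peeks at the stream and returns null if nothing is left. Otherwise it parses the header with eventHeaderDeserializer.deserialize(inputStream), picks a data deserializer according to eventHeader.getEventType() (FORMAT_DESCRIPTION and TABLE_MAP have dedicated paths, every other type goes through getEventDataDeserializer), and finally wraps the header and data in an Event.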
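
The shape of nextEvent (peek for end-of-stream, read a header, dispatch on the type) can be sketched with JDK classes alone. This is an illustration under assumptions, not the library's implementation: the record layout (one type byte, one length byte, then the payload) is a toy format invented for the example, and NextRecordSketch, peek and nextRecord are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.PushbackInputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of the peek-then-dispatch pattern; the record format is a toy one.
class NextRecordSketch {

    // Returns the next byte without consuming it, or -1 at end-of-stream.
    static int peek(PushbackInputStream in) throws IOException {
        int b = in.read();
        if (b != -1) {
            in.unread(b);
        }
        return b;
    }

    // Reads one record, or returns null when the stream is exhausted.
    static String nextRecord(PushbackInputStream in) throws IOException {
        if (peek(in) == -1) {
            return null;
        }
        int type = in.read();   // "header": record type
        int length = in.read(); // "header": payload length
        if (length < 0) {
            throw new EOFException("Truncated record header");
        }
        byte[] payload = new byte[length];
        new DataInputStream(in).readFully(payload);
        switch (type) {
            case 1: // a type with a dedicated decoder
                return "TEXT:" + new String(payload, StandardCharsets.US_ASCII);
            default: // everything else falls back to a generic decoder
                return "UNKNOWN(" + type + "):" + length + " bytes";
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] data = {1, 2, 'h', 'i', 9, 0};
        PushbackInputStream in = new PushbackInputStream(new ByteArrayInputStream(data));
        for (String record; (record = nextRecord(in)) != null; ) {
            System.out.println(record);
        }
    }
}
```

The loop in main mirrors how a caller would typically drain a reader: keep asking for the next item until null comes back.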

EventHeader

mysql-binlog-connector-java-0.20.1/src/main/java/com/github/shyiko/mysql/binlog/event/EventHeader.java

public interface EventHeader extends Serializable {

    long getTimestamp();
    EventType getEventType();
    long getServerId();
    long getHeaderLength();
    long getDataLength();
}
  • The EventHeader interface declares the getTimestamp, getEventType, getServerId, getHeaderLength and getDataLength methods.
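
To make these getters more concrete, here is a rough sketch that decodes the 19-byte common header of a v4 binlog event (4-byte timestamp, 1-byte type code, 4-byte server id, 4-byte event length, 4-byte next position, 2-byte flags, all little-endian) as described in the MySQL internals documentation. The class HeaderV4Sketch and its field names are hypothetical, and how the library's own header class maps onto these fields (for instance, whether the timestamp is exposed in milliseconds and whether the data length is the event length minus 19) is an assumption here, not something shown in the excerpt above.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical decoder for the 19-byte v4 common event header; not the library's class.
class HeaderV4Sketch {

    static final int HEADER_LENGTH = 19;

    final long timestampMillis; // seconds on disk, converted to milliseconds here (assumption)
    final int eventTypeCode;    // numeric type code, e.g. 15 for FORMAT_DESCRIPTION
    final long serverId;
    final long eventLength;     // header + data, in bytes
    final long nextPosition;
    final int flags;

    HeaderV4Sketch(byte[] raw) {
        // wrap(...) throws IndexOutOfBoundsException if fewer than 19 bytes are supplied.
        ByteBuffer buf = ByteBuffer.wrap(raw, 0, HEADER_LENGTH).order(ByteOrder.LITTLE_ENDIAN);
        timestampMillis = (buf.getInt() & 0xFFFFFFFFL) * 1000L;
        eventTypeCode = buf.get() & 0xFF;
        serverId = buf.getInt() & 0xFFFFFFFFL;
        eventLength = buf.getInt() & 0xFFFFFFFFL;
        nextPosition = buf.getInt() & 0xFFFFFFFFL;
        flags = buf.getShort() & 0xFFFF;
    }

    // Bytes that follow the header within this event.
    long getDataLength() {
        return eventLength - HEADER_LENGTH;
    }

    public static void main(String[] args) {
        byte[] raw = {
            1, 0, 0, 0,    // timestamp: 1 second
            15,            // type code
            1, 0, 0, 0,    // server id
            119, 0, 0, 0,  // event length
            123, 0, 0, 0,  // next position
            0, 0           // flags
        };
        HeaderV4Sketch header = new HeaderV4Sketch(raw);
        System.out.println(header.eventTypeCode + " " + header.getDataLength()); // prints 15 100
    }
}
```

The masks such as `& 0xFFFFFFFFL` are there because the on-disk fields are unsigned while Java's int and short are signed.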

EventData

mysql-binlog-connector-java-0.20.1/src/main/java/com/github/shyiko/mysql/binlog/event/EventData.java

public interface EventData extends Serializable {
}
  • EventData is a marker interface: it extends Serializable and declares no methods of its own.

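Summary

BinaryLogFileReader implements the Closeable interface, and its close method closes inputStream. It has two fields, inputStream and eventDeserializer. Its constructors accept either a binlog File or an InputStream, optionally together with an EventDeserializer. The readEvent method returns the next Event via eventDeserializer.nextEvent(inputStream), and returns null once the end of the stream is reached.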

Reposted from: Juejin (掘金)
