开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

毕业设计-在线考试系统

发表于 2021-05-05

前言

​ 本期跟大家分享的是在线考试系统。受疫情影响,更多的提倡无接触办公,比如互联网的面试通常都是在线面试,先在线答题,答题过了之后再进行线上视频面试;比如学校里的考试,也可以通过线上考试去解决。因此,在线考试系统的用途会越来越广。本项目除源码外,还写了比较多的文档以及完善的视频教程

image-20210310132226782

​ (想要源码的同学请私信我~)

工程架构

应用分层

image-20201226111957265

上面的分层架构摘自阿里巴巴java开发手册,我对此做了一些调整,实际分层结构如下:

技术栈

前端:vue + element

后端:jdk1.8 + springboot + redis + mysql

系统设计

运行效果

系统登录

image-20210310132651929

dashboard

image-20210310132735629

试卷库

image-20210505192502619

进入考试

有单选题、多选题和判断题三大类型,学生在规定时间内完成考试,然后点击“交卷”即可完成考试

image-20210310133000313

我的考试

在我的考试模块可以看到所有自己已经参加过得考试,点击“查看考试详情”,可以看到自己的分数以及错题分析

image-20210310133522357

错题分析

image-20210505193538851

问题管理

​ 问题管理模块,可以新建考题,维护考题的选型以及问题解析

image-20210310133736856

考试管理

在考试管理模块,可以新建一张试卷,试卷中选择前面维护的问题。设置试卷的分数以及试卷的考试试卷。

image-20210310134006172

这里做一个大致的功能介绍,详细的功能和具体代码实现,会在视频中讲解~

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Java 8 Predicate 接口

发表于 2021-05-05

简介

布尔值函数

观察其源码

image.png

要素察觉

  • 返回值是boolean
  • 有抽象方法test
  • 还有与或非(and、negate、or)的默认方法方法,暗示可以两个Predicate对象组合使用
  • 如果理解BiFunction,理解Predicate易如反掌

测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
java复制代码public class PredicateTest {
public static void main(String[] args) {
PredicateTest test = new PredicateTest();
List<Integer> list = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
test.printByCondition(list,l -> l%2==0);//print 偶数
System.out.println("\n"+"-----------------");
test.printByConditionNegate(list,l -> l%2==0); //print 非偶数
System.out.println("\n"+"-----------------");
test.printByConditionAnd(list,l -> l%2==0,l -> l>4); //print 偶数且大于4的数
System.out.println("\n"+"-----------------");
test.printByConditionOr(list,l -> l%2==0,l -> l<4); //print 偶数或小于4 的数
}
public void printByCondition(List<Integer> list, Predicate<Integer> predicate){
//l表示list中的某一个元素 //某个元素满足某种条件就输出
list.forEach( l -> { if (predicate.test(l)) System.out.print(l+" "); });
}
public void printByConditionNegate(List<Integer> list, Predicate<Integer> predicate){
//l表示list中的某一个元素 //某元素不满足某种条件输出
list.forEach( l -> { if (predicate.negate().test(l)) System.out.print(l+" "); });
}
public void printByConditionAnd(List<Integer> list, Predicate<Integer> predicate1,Predicate<Integer> predicate2){
//某元素同时满足两个条件
list.forEach( l -> { if (predicate1.and(predicate2).test(l)) System.out.print(l+" "); });
}
public void printByConditionOr(List<Integer> list, Predicate<Integer> predicate1,Predicate<Integer> predicate2){
//某元素满足某一个条件
list.forEach( l -> { if (predicate1.or(predicate2).test(l)) System.out.print(l+" "); });
}
}

解释

image.png

image.png

image.png

image.png

  • 一定要时刻记住Lambda表达式含义,即实现接口中的一个抽象方法,最终还是要使用InterFaceClass.method
  • 参数和返回值一个都少不了

小结

Lambda表达式作用

  • 传递行为,而不仅仅是值

  • 提升抽象层次

    • 1
      2
      3
      4
      java复制代码public void printByCondition(List<Integer> list, Predicate<Integer> predicate){
      //l表示list中的某一个元素 //某个元素满足某种条件就输出
      list.forEach( l -> { if (predicate.test(l)) System.out.print(l+" "); });
      }
    • 单看这个方法只知道满足某个条件就输出,但具体是什么条件并不知道,所以说提升了抽象层次

  • API重用性更好

  • 更加灵活

    • 同一个方法,可以随意的用lambda表达式替换和现实,所以重用性和灵活性更高

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

盘点 MQ Kafka 一览

发表于 2021-05-05

总文档 :文章目录

Github : github.com/black-ant

一 . 前言

之前对RabbitMQ 进行了简单的分析 , 这一篇来过一下 Kafka 的相关操作.

二 . 基础使用

kafka 是一个很常用的高性能消息队列 , 先看一下基础的使用

Maven 核心依赖

1
2
3
4
xml复制代码<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

配置信息

1
2
3
4
5
6
7
8
9
10
11
12
properties复制代码spring.kafka.bootstrap-servers=127.0.0.1:9092
# 指定默认消费者group id
spring.kafka.consumer.group-id=ant
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 配置消费者消息的key和value的编解码方式-consumer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=1000
spring.kafka.listener.missing-topics-fatal=false

生产消息

1
2
3
4
5
6
java复制代码    @Resource
private KafkaTemplate<String, String> kafkaTemplate;

public void produce() {
kafkaTemplate.send("start", "one", "are you ok?" + "----" + i);
}

消费消息

1
2
3
4
java复制代码@KafkaListener(id = "one", topics = "start", clientIdPrefix = "myClientId")
public void listener0(ConsumerRecord<?, ?> record) {
logger.info("------> this is in listerner 0:{}<-------", record.value());
}

总结

  • KafkaTemplate 发布消息
  • KafkaListener 监听消息

整体来说 , 还是和之前的 Rabbit 使用一致 , 具体的内部使用 , 后续源码中再详细看

三 . 基础知识点

基础成员参考消息队列文档.

四 . 工具源码一览

我们围绕 KafkaTemplate 和KafkaListener 2个类进行分析.

4.1 KafkaTemplate

发送的起点

可以看到 , 其中是通过 Future 获取返回结果

1
2
3
4
5
6
7
8
9
java复制代码protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
final Producer<K, V> producer = getTheProducer();
final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>();
producer.send(producerRecord, buildCallback(producerRecord, producer, future));
if (this.autoFlush) {
flush();
}
return future;
}

消息的创建

主要逻辑还是通过 org.apache.kafka.clients.producer.Producer 对象来完成发送 , 这是一个 Kafka 原生包 , 属于kafka-client .

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
java复制代码// 第一步 : 消息的构建
C0- KafkaProducer
M_01- send(ProducerRecord<K, V> record, Callback callback)
- 首先调用 ProducerInterceptors 构建一个 ProducerRecord -> PS:M_01_01
?- 注意 , 这里主要是通过拦截器添加额外的功能 , 会内部进行一个 For 循环
|- For 许纳湖List<ProducerInterceptor<K, V>>
- 调用对应的 interceptor.onSend(interceptRecord) , 替换之前的 ProducerInterceptors
- 如果集合为空 , 则直接返回正确的
- 调用 doSend 主流程 -> M_02
M_02- doSend
1- 先判断 Sender 对象是否存在且运行 , 否则抛出异常 -> PS:M_02_02
2- 调用 waitOnMetadata , 判断集群元数据可用
3- 构建 Cluster 对象
4- 构建序列化 serializedKey , serializedValue
5- 生成 partition -> PS:M_02_03
6- 生成最终发送对象 TopicPartition
7- 构建 Header 和 设置 readOnly 属性
8- 确定消息size
9- 构建 callback 拦截器 -> PS:M_02_05
10- 发送消息 , 获得一个 RecordAccumulator.RecordAppendResult 用于异步获取结果
?- 其中包含一个 future 对象
?- RecordAccumulator append 发起发送


// M_02 代码
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
TopicPartition tp = null;
try {
throwIfProducerClosed();
// 首先,确保主题的元数据可用
ClusterAndWaitTime clusterAndWaitTime;
try {
// 内部判断逻辑 :
// 构建元数据 , 发起 metadata.fetch() ,判断超时时间
clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
} catch (KafkaException e) {
if (metadata.isClosed())
throw new KafkaException....;
throw e;
}
long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
Cluster cluster = clusterAndWaitTime.cluster;
byte[] serializedKey;
try {
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
} catch (ClassCastException cce) {
throw ....;
}
byte[] serializedValue;
try {
serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
} catch (ClassCastException cce) {
throw ....;
}
int partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);

setReadOnly(record.headers());
Header[] headers = record.headers().toArray();

int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
compressionType, serializedKey, serializedValue, headers);
ensureValidRecordSize(serializedSize);
long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
// 生产者回调将确保调用'回调'和拦截回调
Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
// 事务管理
if (transactionManager != null && transactionManager.isTransactional())
transactionManager.maybeAddPartitionToTransaction(tp);

// 发送消息且返回一个 RecordAppendResult 对象
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs);
if (result.batchIsFull || result.newBatchCreated) {
this.sender.wakeup();
}
return result.future;
// handling exceptions and record the errors;
// for API exceptions return them in the future,
// for other exceptions throw directly
} catch (Exception e) {
// ... 省略异常处理 , 主要有以下操作
// ApiException / InterruptedException / BufferExhaustedException / KafkaException / Exception
// this.errors.record();
// this.interceptors.onSendError(record, tp, e);
}
}


// Step 2 : 消息的发送 -- 见后文

PS:M_01_01 ProducerRecord 对象的作用

作用: 每一个 ProducerRecord 都是一个 消息 , 该对象中包含 topic ,partition , headers 用于映射发送的逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码public class ProducerRecord<K, V> {

// Topic 类型
private final String topic;

// 如果指定了一个有效的分区号,该分区将在发送记录时使用
// 如果没有分区 , 但指定了一个键,则使用该键的哈希值来选择分区
// 如果键和分区都没有 , 则分区将以循环方式分配
private final Integer partition;
private final Headers headers;

// 监听的 Id
private final K key;
// 发送的消息体
private final V value;

// 如果用户没有提供时间戳,生产者将记录当前时间。
// Kafka最终使用的时间戳取决于为 Topic 配置的时间戳类型。
private final Long timestamp;
//.................
}

image.png

PS:M_02_02 Sender 对象

作用: 核心发送对象 , 用于消息发送 , 用于集群处理

所属包: org.apache.kafka.clients.producer.internals

1
java复制代码TODO

PS:M_02_03 生成 partition 核心逻辑

1
2
3
4
5
6
7
java复制代码    private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
Integer partition = record.partition();
return partition != null ?
partition :
partitioner.partition(
record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}

PS:M_02_03 InterceptorCallback 作用

消息的发送

整体来说消息的发送存在3条线 :

  • 线路一 : 项目启动时 , KafkaMessageListenerContainer run 开始循环
    • 调用 pollSelectionKeys 完成 channel write 流程
  • 线路二 : 设置 send
    • Sender # runOnce sendProducerData(currentTimeMs) 中设置 一个 ClientRequest
  • 线路三 : 获取 Send 发送
    • Sender # runOnce client.poll(pollTimeout, currentTimeMs) 中 发起 poll 执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
java复制代码
// PS : 消息的产生和发送是异步的 , 消息的发送主要依赖于 Sender

public class Sender implements Runnable{.........}

// Step 1 : Thread run 方法一览
C2- Sender
M2_01- runOnce()
- client.poll(pollTimeout, currentTimeMs) 发送消息
M2_02- sendProduceRequest : 核心发送方法
- 构建了一个 RequestCompletionHandler . 其中有个 onComplete 用于后续回调
- 构建一个 ClientRequest
- 调用 ClientRequest.send 发送消息
?-NetworkClient.doSend -> PS:M2_02_01


// PS:M2_02_01 底层一览
public void send(Send send) {
String connectionId = send.destination();
KafkaChannel channel = openOrClosingChannelOrFail(connectionId);
if (closingChannels.containsKey(connectionId)) {
// 确保通知通过'断开',离开通道在状态中关闭被触发
this.failedSends.add(connectionId);
} else {
try {
// 此处使用 KafkaChannel 进行处理
channel.setSend(send);
} catch (Exception e) {
// 更新状态以保持一致性,通道关闭后将被丢弃
channel.state(ChannelState.FAILED_SEND);
// 确保在下一次轮询中处理' failedsent '时通过' disconnected '通知
this.failedSends.add(connectionId);
close(channel, CloseMode.DISCARD_NO_NOTIFY);
if (!(e instanceof CancelledKeyException)) {
throw e;
}
}
}
}

// 隐藏方法 Selector , 在上文中 M2_02 中设置了 send , 此处进行处理
C- Selector
M- pollSelectionKeys
- send = channel.write()

消息的回调操作

回调是基于 Sender # handleProduceResponse 发起 , 该方法在 sendProduceRequest 方法中设置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
java复制代码
// Start : 请求的返回处理
C- Sender # handleProduceResponse
// 核心代码一览
ProduceResponse produceResponse = (ProduceResponse) response.responseBody();
for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) {
TopicPartition tp = entry.getKey();
ProduceResponse.PartitionResponse partResp = entry.getValue();
ProducerBatch batch = batches.get(tp);
completeBatch(batch, partResp, correlationId, now, receivedTimeMs + produceResponse.throttleTimeMs());
}
this.sensors.recordLatency(response.destination(), response.requestLatencyMs());

// End : template 回调
// 在 KafkaTemplate.doSend 中设置
C-KafkaTemplate
M- doSend 设置
- producer.send(producerRecord, buildCallback(producerRecord, producer, future));
M- buildCallback
-

// PS : 请求到 Callback 的流转
C- Sender # handleProduceResponse
C- Sender # completeBatch
C- ProduceBatch # done
C- ProduceBatch # completeFutureAndFireCallbacks
C- InterceptorCallback # onCompletion
// 此处就会调用 End : template 回调

Transaction 的管理

TransactionSynchronizationManager

4.2 KafkaListener

kafka Listener 的核心是 KafkaListenerAnnotationBeanPostProcessor , 初始化的方式是基于 BeanPostProcessor 类的postProcessAfterInitialization 来完成

Kafka 的消费过程有以下几个路线 :

  • container 的注册
  • container 的循环监听
  • 消息的消费

KafkaMQ.png

container 注册

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
java复制代码C3- KafkaListenerAnnotationBeanPostProcessor
M3_01- postProcessAfterInitialization
?- 这里来自于Bean 加载的 applyBeanPostProcessorsAfterInitialization 方法 , 处理Bean 的前置扩展
- 获取 Class 级别的 KafkaListener
- 通过 MethodIntrospector 反射获取各方法的 KafkaListener
- 注意 , 这里是一个 set 集合 , 意味着他允许一个方法上面编注多个 @KafkaListener -> PS:M3_01_01
FOR- 循环处理 KafkaListener
- processKafkaListener 循环处理
M3_02- processKafkaListener
- 判断是否为代理方法
- 创建一个 MethodKafkaListenerEndpoint 节点 , 将当前方法设置到 MethodKafkaListenerEndpoint 节点中
- 为节点设置相关的属性
?- Bean / MessageHandlerMethodFactory / Id / GroupId / TopicPartitions
?- Topics / TopicPattern / ClientIdPrefix
- 调用 KafkaListenerEndpointRegistrar 注册当前节点
M3_03- Method checkProxy(Method methodArg, Object bean)
- 获取当前 method
- 返回由AOP代理代理的接口
- 从代理接口中获取对应的 method
M3_04- processListener
- 继续构建 MethodKafkaListenerEndpoint
?- Bean / MessageHandlerMethodFactory / Id / GroupId / TopicPartitions / Topics /TopicPattern / ClientIdPrefix
?- 根据条件处理 concurrency / autoStartup / Group /
- 通过 BeanFactory 构建 KafkaListenerContainerFactory
- 为 MethodKafkaListenerEndpoint 设置属性
?- beanFactory / ErrorHandler
- 调用 registrar.registerEndpoint 注册当前 endpoint
?- 该对象会在 afterSingletonsInstantiated 中被消费
M3_05- afterSingletonsInstantiated
- 为 registrar 注册各种属性
?- BeanFactory / EndpointRegistry / ContainerFactoryBeanName
- 调用 registerAllEndpoints 处理 registrar -> M4_06
M3_07- resolveContainerFactory
- 返回一个 KafkaListenerContainerFactory


// M3_01 核心代码
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
// 获取 Class 级别的 KafkaListener
Class<?> targetClass = AopUtils.getTargetClass(bean);
Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
final List<Method> multiMethods = new ArrayList<>();
// 获取方法级别注解
Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
(MethodIntrospector.MetadataLookup<Set<KafkaListener>>) method -> {
Set<KafkaListener> listenerMethods = findListenerAnnotations(method);
return (!listenerMethods.isEmpty() ? listenerMethods : null);
});
if (hasClassLevelListeners) {
Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
(ReflectionUtils.MethodFilter) method ->
AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);
multiMethods.addAll(methodsWithHandler);
}
if (annotatedMethods.isEmpty()) {
this.nonAnnotatedClasses.add(bean.getClass());
}
else {
// Non-empty set of methods
for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
Method method = entry.getKey();
for (KafkaListener listener : entry.getValue()) {
// 核心处理逻辑
processKafkaListener(listener, method, bean, beanName);
}
}
}
if (hasClassLevelListeners) {
processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
}
}
return bean;
}


// M3_03 核心伪代码 :
private Method checkProxy(Method methodArg, Object bean) {
Method method = methodArg;
if (AopUtils.isJdkDynamicProxy(bean)) {
method = bean.getClass().getMethod(method.getName(), method.getParameterTypes());
Class<?>[] proxiedInterfaces = ((Advised) bean).getProxiedInterfaces();
for (Class<? iface : proxiedInterfaces) {
method = iface.getMethod(method.getName(), method.getParameterTypes());
break;
}
}
return method;
}

// 手动注入 Listener
// 无意中在代码中发现了一个 KafkaListenerConfigurer 接口 , 该接口可以用于手动注入 Listener
I- KafkaListenerConfigurer
M- configureKafkaListeners(KafkaListenerEndpointRegistrar registrar)
- 构建了一个 MessageListenerContainer


// M4_01 的触发流程 : afterPropertiesSet , 再往上为 DefaultListableBeanFactory # afterSingletonsInstantiated

C4- KafkaListenerEndpointRegistrar
F4_01- List<KafkaListenerEndpointDescriptor> endpointDescriptors
F4_02- Map<String, MessageListenerContainer> listenerContainers
M4_01- registerEndpoint
- 构建一个 KafkaListenerEndpointDescriptor
- 如果startImmediately 初始化就启动 , synchronized 上锁注册到容器中
- 否则先添加到 List<KafkaListenerEndpointDescriptor> 集合中 -> M4_02
M4_02- registerAllEndpoints
- synchronized 上锁注册到容器中
- 注册容器 -> M4_04
M4_03- resolveContainerFactory : 返回一个 KafkaListenerContainerFactory
M4_04- registerListenerContainer
- 构建一个 MessageListenerContainer , 并且加入 Map<String, MessageListenerContainer>
?- createListenerContainer 时会注册 Method
- 如果包含 Group , 则加入List<MessageListenerContainer> 集合
?- 该集合会在 start(继承于Lifecycle)方法中调用 -> M4_05
M4_05- startIfNecessary
- 调用 MessageListenerContainer start 方法
M4_06- createListenerContainer
-
M4_07- start
FOR- getListenerContainers 获取所有的 MessageListenerContainer , 依次调用 startIfNecessary
?- M4_05


C05- MessageListenerContainer
M5_01- start()
-

C06- AbstractMessageListenerContainer
M6_01- doStart()
- 最终调用 KafkaMessageListenerContainer
M6_02- run()
- 这里通过一个 While 方法进行循环处理 ->
M6_03- pollAndInvoke()
- invokeListener(records) 反射处理

C07- KafkaMessageListenerContainer
M7_01- doStart()
- 准备 ContainerProperties
- 校验 AckMode 模式
- 构建 GenericMessageListener
- 构建 ListenerConsumer
- 修改容器状态
?- 这里和后面的循环监听形成前后关联
- 这里还用了一个 CountDownLatch 来等待执行
- 开始线程的执行 , 这里会执行 listenConsumer 的 run 方法 -> PS:M7_01_02
M7_02- doInvokeOnMessage()
- 调用对应 MessageListener 的 onMessage , 最终调用 M8_1
M7_03- onMessage()
-

// M7_01 代码
protected void doStart() {
if (isRunning()) {
return;
}
if (this.clientIdSuffix == null) { // stand-alone container
checkTopics();
}
// 准备 ContainerProperties
ContainerProperties containerProperties = getContainerProperties();
checkAckMode(containerProperties);

Object messageListener = containerProperties.getMessageListener();
if (containerProperties.getConsumerTaskExecutor() == null) {
SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
(getBeanName() == null ? "" : getBeanName()) + "-C-");
containerProperties.setConsumerTaskExecutor(consumerExecutor);
}
GenericMessageListener<?> listener = (GenericMessageListener<?>) messageListener;
ListenerType listenerType = determineListenerType(listener);
this.listenerConsumer = new ListenerConsumer(listener, listenerType);
// 修改容器状态
setRunning(true);
this.startLatch = new CountDownLatch(1);
// 此处开始现场的执行
this.listenerConsumerFuture = containerProperties.getConsumerTaskExecutor()
.submitListenable(this.listenerConsumer);
try {
if (!this.startLatch.await(containerProperties.getConsumerStartTimout().toMillis(), TimeUnit.MILLISECONDS)) {
publishConsumerFailedToStart();
}
}catch (@SuppressWarnings(UNUSED) InterruptedException e) {
Thread.currentThread().interrupt();
}
}


C8- ConcurrentMessageListenerContainer
M8_01- doStart()
- 校验 topic
- 启动所有的 KafkaMessageListenerContainer



C10- MethodKafkaListenerEndpoint
M10_01- createMessageListener
- messageListener.setHandlerMethod 注册 Method

PS:M3_01_01 多KafkaListener 注解

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码kafkaTemplate.send("start", "one", "are you ok one?" + "----");
kafkaTemplate.send("topic1", "two", "are you ok two?" + "----");


@KafkaListener(id = "one", topics = "start", clientIdPrefix = "myClientId")
@KafkaListener(id = "two", topics = "start", clientIdPrefix = "myClientId")
public void listener0(ConsumerRecord<?, ?> record) {
logger.info("------> this is in listerner 0:{}<-------", record.value());
}

// 结果
2021-05-04 22:04:35.094 INFO 20012 --- [ two-0-C-1] c.g.k.demo.service.KafkaConsumerService : ------> this is in listerner 0:are you ok two?----<-------
2021-05-04 22:04:35.098 INFO 20012 --- [ one-0-C-1] c.g.k.demo.service.KafkaConsumerService : ------> this is in listerner 0:are you ok one?----<-------

image.png

PS:M7_01_02 执行线程

image.png

container 的循环监听

主要逻辑在 KafkaMessageListenerContainer 内部类 ListenerConsumer 中

  • C- ConsumerListener # run
  • C- KafkaMessageListenerContainer # pollAndInvoke
  • C- KafkaMessageListenerContainer # invokeOnMessage
  • C- KafkaMessageListenerContainer # doInvokeOnMessage
    • 该类中调用对应的 MessagingMessageListenerAdapter 进行最终执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
java复制代码
// Step 1 : 起点
C- KafkaMessageListenerContainer.ListenerConsumer
public void run() {
publishConsumerStartingEvent();
this.consumerThread = Thread.currentThread();
if (this.consumerSeekAwareListener != null) {
this.consumerSeekAwareListener.registerSeekCallback(this);
}
KafkaUtils.setConsumerGroupId(this.consumerGroupId);
this.count = 0;
this.last = System.currentTimeMillis();
initAssignedPartitions();
publishConsumerStartedEvent();
while (isRunning()) {
try {
// 执行 invoke 逻辑
pollAndInvoke();
}catch (@SuppressWarnings(UNUSED) WakeupException e) {
// ... 省略异常处理
}
}
wrapUp();
}

// Step 2 : 循环中 poll 对象
C- ListenerConsumer
M- pollAndInvoke() : 核心方法 , poll 获取并且映射到相关方法
- ConsumerRecords<K, V> records = doPoll()
|- this.consumer.poll(this.pollTimeout)
- invokeListener(records)
M- doPoll


// Step 3 :在 doInvokeWithRecords 中 , 会对消息进行迭代处理
Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
while (iterator.hasNext()) {
doInvokeRecordListener(record, null, iterator)
}


// Step 4 : 在 doInvokeOnMessage 中 , 调用对应的 ListenerAdapter 完成消息处理
// PS : 该 在 KafkaMessageListenerContainer 中通过构造器生成
C- KafkaMessageListenerContainer.ListenerConsumer

// 通过四种监听类型 , 选择处理的 listener
// ACKNOWLEDGING_CONSUMER_AWARE,CONSUMER_AWARE,ACKNOWLEDGING,SIMPLE

最终执行处理

最终会从 MessagingMessageListenerAdapter 中获取 执行的 Handler ,该 Handler 在 KafkaListenerEndpointRegistrar 是注入

1
2
3
4
5
6
7
8
9
java复制代码 // 最终执行类

C08- RecordMessagingMessageListenerAdapter
M8_1- onMessage
- 获取对应的 Method . 反射调用
?- 该 HandlerAdapter 在 MethodKafkaListenerEndpoint # createMessageListener 是处理

C10- RecordMessagingMessageListenerAdapter
M10_1- onMessage

五 . Kafka 要点深入

TODO : 后续文章一个个节点分析 Kafka 如果实现相关功能

FAQ

java.nio.file.FileSystemException -> 另一个程序正在使用此文件,进程无法访问

问题详情 : D:\tmp\kafka-logs\topic_1-0\00000000000000000000.timeindex.deleted: 另一个程序正在使用此文件,进程无法访问

解决方案 : 手动删除\kafka-logs里的日志文件重启kafka

总结

这篇文章主要说了2部分 , 发送和监听 , 发送主要基于 client.poll 发送 , 监听核心主要是扫描和 while 循环 consumer poll 拉取 .

了解这一段流程后 ,后续就可以开始详细看看其在集群等更多功能下 , 如何操作和定制

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Java 设计模式之单例模式

发表于 2021-05-05

定义:指一个类只有一个实例,且该类能自行创建这个实例的一种模式。

  1. 特点

单例模式有 3 个特点:

  1. 单例类只有一个实例对象;
  2. 该单例对象必须由单例类自行创建;
  3. 单例类对外提供一个访问该单例的全局访问点。

一般来说,系统中只需要有一个实例就能满足系统需要时,那么就可以设计成单例模式。

比如 Windows 的回收站,数据库的连接池,系统中的日志对象等等。(当然,如果你非要设计成多个,我也没有办法是不是)

  1. 优点和缺点

单例模式的优点:

  1. 单例模式可以保证内存里只有一个实例,减少了内存的开销。
  2. 可以避免对资源的多重占用。
  3. 单例模式设置全局访问点,可以优化和共享资源的访问。

单例模式的缺点:

  1. 单例模式一般没有接口,扩展困难。如果要扩展,则除了修改原来的代码,没有第二种途径,违背开闭原则。
  2. 在并发测试中,单例模式不利于代码调试。在调试过程中,如果单例中的代码没有执行完,也不能模拟生成一个新的对象。
  3. 单例模式的功能代码通常写在一个类中,如果功能设计不合理,则很容易违背单一职责原则。
  1. 结构

单例模式的 UML 类图如下图所示,非常简单,只有一个类。

上述类图很好地体现了单例模式的3个特点。

  1. 一个私有实例常量 INSTANCE,保证只有一个实例对象;
  2. 一个私有的构造器 private Singleton(){} 保证外部无法实例化,只能由自身创建;
  3. 通过公共的 getInstance() 方法提供一个访问该单例的全局访问点。
  1. 实现

单例模式可以按不同维度对其进行分类:

  • 线程安全维度:线程安全的单例模式、线程不安全的单例模式
  • 对象创建时机:饿汉式的单例模式、懒汉式的单例模式
    • 饿汉式 - 第一次调用前(或说类被 JVM 加载时)就已经被实例化了。
    • 懒汉式 - 只有在第一次调用的时候才会被实例化。

4.1 枚举类单例

因为 Java 保证枚举类的每个枚举都是单例,所以我们只需要编写一个只有一个枚举的类即可,而且它是线程安全的。

枚举类也完全可以像其他类那样定义自己的字段、方法,如下方实例代码中的 name 参数,getName() 及 setName() 方法等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
java复制代码package com.junmoyu.singleton;

/**
* 枚举的单例实现 - 线程安全
* 绝对防止多实例化,即使是在面反序列化和反射攻击时
*/
public enum EnumSingleton {
/**
* 唯一实例
*/
INSTANCE;

/**
* 如果没有初始化的内容,可删除此方法
*/
EnumSingleton() {
System.out.println(getClass().getCanonicalName() + " 被实例化,内存地址为:" + hashCode());
this.name = "莫语";
}

private String name;

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}
}

Effective Java 书中也推荐使用这种单例模式。因为它足够简单,线程安全,且天然可以防止多实例化,即使是在面反序列化和反射攻击时。

如果需要在单例中做初始化操作,可以使用构造方法实现,否则是不需要构造方法的。使用时可以直接用 EnumSingleton.INSTANCE.getName() 来调用单例中的方法。

在类中添加一个 main() 方法测试一下看看,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码public enum EnumSingleton {
INSTANCE;

// ... 省略其他代码
public static void main(String[] args) throws Exception {
// 延迟加载测试
System.out.println("测试代码启动");
Thread.sleep(1000);

// 方法调用测试
System.out.println("name: " + EnumSingleton.INSTANCE.getName());
EnumSingleton.INSTANCE.setName("junmoyu.com");
System.out.println("name: " + EnumSingleton.INSTANCE.getName());

// 反射测试
// 枚举天然防止反射攻击
Class<EnumSingleton> clazz = (Class<EnumSingleton>) Class.forName("com.junmoyu.singleton.EnumSingleton");
Constructor<EnumSingleton> constructor = clazz.getDeclaredConstructor(null);
// 这里将直接抛出异常
EnumSingleton singleton = constructor.newInstance();
}
}

运行后结果如下:

1
2
3
4
5
6
7
8
vbnet复制代码com.junmoyu.singleton.EnumSingleton 被实例化,hashCode:460141958
代码启动
name: 莫语
name: junmoyu.com
Exception in thread "main" java.lang.NoSuchMethodException: com.junmoyu.singleton.EnumSingleton.<init>()
at java.lang.Class.getConstructor0(Class.java:3082)
at java.lang.Class.getDeclaredConstructor(Class.java:2178)
at com.junmoyu.singleton.EnumSingleton.main(EnumSingleton.java:51)

从结果中可以看出,枚举类的单例模式不是延迟加载的,且可以防止反射创建多个实例。关于反射的话题会在后面的章节专门讲解。

4.2 饿汉式单例

饿汉式单例,其创建对象的时机是在第一次调用之前,在类被 JVM 加载时就会被创建。其代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
java复制代码package com.junmoyu.singleton;

/**
* 饿汉式单例模式 - 线程安全
* 该类在程序加载时就已经初始化完成了
*/
public class EagerlySingleton {
/**
* 初始化静态实例
*/
private static EagerlySingleton INSTANCE = new EagerlySingleton();

/**
* 私有构造函数,保证无法从外部进行实例化
*/
private EagerlySingleton() {
System.out.println(getClass().getCanonicalName() + " 被实例化,hashCode:" + hashCode());
}

/**
* 可被用户调用以获取类的实例
*/
public static EagerlySingleton getInstance() {
return INSTANCE;
}

public static void main(String[] args) throws Exception {
// 延迟加载测试
System.out.println("测试代码启动");
Thread.sleep(1000);

// 多线程测试
for (int i = 0; i < 5; i++) {
new Thread(() -> System.out.println("多线程测试:hashCode:" + "@" + EagerlySingleton.getInstance().hashCode())).start();
}
}
}

饿汉式单例提供了已被实例化的静态实例 INSTANCE,所以不存在多个线程创建多个实例的情况,所以它是线程安全的。

这种单例模式的缺点是即使单例没有被使用,对象也会被创建,占用资源(但其实并不会占用太多资源,视具体业务情况而定)。运行main()方法测试一下。

1
2
3
4
5
6
7
perl复制代码com.junmoyu.singleton.EagerlySingleton 被实例化,hashCode:460141958
测试代码启动
多线程测试:hashCode:@460141958
多线程测试:hashCode:@460141958
多线程测试:hashCode:@460141958
多线程测试:hashCode:@460141958
多线程测试:hashCode:@460141958

可以看到饿汉式单例的确是延迟加载的,而且线程安全。其实线程安全问题比较难以测试,因为此类的确是线程安全的,所以仅做演示,后面会出现线程安全问题的单例方式会着重说明。反射的问题后面也会有专门的章节进行说明。

4.3 静态内部类单例

静态内部类实现的单例与上面的饿汉式单例有点相似,这种单例模式也是 线程安全的,但它却是延迟加载的,所以对于一些需要延迟加载的单例来说,这种方式是一种非常不错的选择。其代码实现如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
java复制代码package com.junmoyu.singleton;

/**
* 静态内部类 - 线程安全,延迟加载
* 写法简单,且可延迟加载,较推荐此种实现
*/
public class StaticInnerClassSingleton {
/**
* 使用静态内部类来实现延迟加载
*/
private static class HelperHolder {
private static final StaticInnerClassSingleton INSTANCE = new StaticInnerClassSingleton();
}

/**
* 私有构造方法
*/
private StaticInnerClassSingleton() {
System.out.println(getClass().getCanonicalName() + " 被实例化,hashCode:" + hashCode());
}

/**
* 获取单例实例
*/
public static StaticInnerClassSingleton getInstance() {
return HelperHolder.INSTANCE;
}

public static void main(String[] args) throws Exception {
// 延迟加载测试
System.out.println("测试代码启动");
Thread.sleep(1000);

// 多线程测试
for (int i = 0; i < 5; i++) {
new Thread(() -> System.out.println("多线程测试:hashCode:" + "@" + StaticInnerClassSingleton.getInstance().hashCode())).start();
}
}
}

与饿汉式单例不同的是,静态内部类单例是延迟加载的,对于一些占用资源多且使用频率不高的单例来说是个非常不错的实现,而且它也是线程安全的,如果你的业务需要线程安全且延迟加载的单例模式,那么静态内部类是个非常不错的选择。同样运行 main()
进行测试,结果如下:

1
2
3
4
5
6
7
perl复制代码测试代码启动
com.junmoyu.singleton.StaticInnerClassSingleton 被实例化,hashCode:1251394951
多线程测试:hashCode:@1251394951
多线程测试:hashCode:@1251394951
多线程测试:hashCode:@1251394951
多线程测试:hashCode:@1251394951
多线程测试:hashCode:@1251394951

通过日志可以明显发现,当主线程 main()启动之后,且睡眠等待一分钟,在调用了 StaticInnerClassSingleton.getInstance().hashCode()
方法时该类才被实例化,且在多线程中,仅被实例化了一次。

4.4 懒汉式单例 - 线程不安全

懒汉式的特点就是延迟加载,即对象会在第一次调用时才会被实例化,避免资源消耗。

如下方代码所示,这是最简单的一种写法,但这是 线程不安全 的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
java复制代码package com.junmoyu.singleton;

/**
* 懒汉式 - 线程不安全
* 非常不推荐使用
*/
public class ThreadUnsafeLazyLoadedSingleton {

private static ThreadUnsafeLazyLoadedSingleton INSTANCE = null;

/**
* 私有构造方法
*/
private ThreadUnsafeLazyLoadedSingleton() {
System.out.println(getClass().getCanonicalName() + " 被实例化,hashCode:" + hashCode());
}

/**
* 可被用户调用以获取类的实例 - 线程不安全
*/
public static ThreadUnsafeLazyLoadedSingleton getInstance() {
if (INSTANCE == null) {
INSTANCE = new ThreadUnsafeLazyLoadedSingleton();
}
return INSTANCE;
}

public static void main(String[] args) throws Exception {
// 延迟加载测试
System.out.println("测试代码启动");
Thread.sleep(1000);

// 多线程测试
for (int i = 0; i < 5; i++) {
new Thread(() -> System.out.println("多线程测试:hashCode:" + "@" + ThreadUnsafeLazyLoadedSingleton.getInstance().hashCode())).start();
}
}
}

直接运行 main() 测试一下,结果如下:

1
2
3
4
5
6
7
8
9
perl复制代码测试代码启动
com.junmoyu.singleton.ThreadUnsafeLazyLoadedSingleton 被实例化,hashCode:847507483
多线程测试:hashCode:@847507483
com.junmoyu.singleton.ThreadUnsafeLazyLoadedSingleton 被实例化,hashCode:319699154
多线程测试:hashCode:@319699154
com.junmoyu.singleton.ThreadUnsafeLazyLoadedSingleton 被实例化,hashCode:758108352
多线程测试:hashCode:@758108352
多线程测试:hashCode:@847507483
多线程测试:hashCode:@847507483

从结果可以明显的看到,该类虽然是延迟加载的,但是在多线程中,被实例化多次,这是线程不安全,非常不推荐使用!

4.5 懒汉式单例 - 线程安全

既然上一种方式是线程不安全的,那么基于上面的写法,做一些修改,让它线程安全不就可以嘛。比较简单的一种做法是在getInstance()方法上加锁,添加 synchronized 关键字即可。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码package com.junmoyu.singleton;

/**
* 懒汉式 - 线程安全,延迟加载
* 但因为 getInstance() 方法加锁,导致多线程下性能较差,不推荐使用
*/
public class ThreadSafeLazyLoadedSingleton {
private static ThreadSafeLazyLoadedSingleton INSTANCE = null;

/**
* 私有构造方法
*/
private ThreadSafeLazyLoadedSingleton() {
System.out.println(getClass().getCanonicalName() + " 被实例化,hashCode:" + hashCode());
}

/**
* 可被用户调用以获取类的实例 - 线程安全
* 使用 synchronized 加锁以实现线程安全
*/
public static synchronized ThreadSafeLazyLoadedSingleton getInstance() {
if (INSTANCE == null) {
INSTANCE = new ThreadSafeLazyLoadedSingleton();
}
return INSTANCE;
}
}

通过加锁,每次只有一个线程允许访问 getInstance(),实例化对象,当实例化完成后,这个线程才会解锁,其他线程就没办法创建实例了,也就实现了线程安全。

但是这种方式有一个很大的缺点,就是每次使用时都会因为锁而非常消耗性能,因为每次调用 getInstance() 都只有一个线程可以访问,其他线程只能干等着。所以这种方式也是不推荐的。测试代码大家可自己运行看看结果。

4.6 懒汉式单例 - 双重校验锁

到这里,我们可以看到以上两种懒汉式单例都有非常明显的缺陷,那么怎么解决呢?

基于上一种方式的代码,既然在方法上加锁会影响性能,那么我们把锁加在方法里面,加锁之前先判断一下是否已经实例化了是不是就可以了。

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码public class ThreadSafeLazyLoadedSingleton {
public static ThreadSafeLazyLoadedSingleton getInstance() {
if (INSTANCE == null) {
// 1 号位置
synchronized (ThreadSafeLazyLoadedSingleton.class) {
// 2 号位置
INSTANCE = new ThreadSafeLazyLoadedSingleton();
}
}
return INSTANCE;
}
}

如上方代码所示,这样就不会出现每次调用 getInstance()都只能有一个线程访问导致性能问题了是不是。

但是仔细想一想上面是不是在多线程的环境下还是会有问题呢?比如此时有线程A和线程B两个线程,两个线程同时访问 getInstance()方法,同时到达 1 号位置,此时他们会争抢锁,因为只能有一个线程进入下面的代码块。

假设线程A此时抢到了锁,线程B未抢到锁,在1号位置等待。当线程A在 2 号位置 创建完实例,返回之后解锁,此时对象已经被实例化了。解锁之后,线程B就可以获取锁了,获取锁之后,线程B也可以再创建一个实例。

针对上面这种情况,还需要再进行优化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码public class ThreadSafeLazyLoadedSingleton {
public static ThreadSafeLazyLoadedSingleton getInstance() {
if (INSTANCE == null) {
// 1 号位置
synchronized (ThreadSafeLazyLoadedSingleton.class) {
// 2 号位置
if (INSTANCE == null) {
INSTANCE = new ThreadSafeLazyLoadedSingleton();
}
}
}
return INSTANCE;
}
}

在 2 号位置 同样加上非空判断,这样即使出现上述的情况,当实例化之后,其他线程获取锁,进入到 2 号位置,也无法再创建实例了。这种方式就是懒汉式单例的最终解决方案:双重校验锁。

至于为什么要双重校验,答案就在上面的那个例子里面了。双重校验锁 单例模式完整的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
java复制代码package com.junmoyu.singleton;

/**
* 懒汉式最终解决方案 - 线程安全,延迟加载
* 也叫双重校验锁
*/
public class DoubleCheckLockingSingleton {
/**
* 加入 volatile 保证线程可见性,防止指令重排导致实例被多次实例化
* 否则线程不安全
*/
private volatile static DoubleCheckLockingSingleton INSTANCE = null;

/**
* 私有构造方法
*/
private DoubleCheckLockingSingleton() {
System.out.println(getClass().getCanonicalName() + " 被实例化,hashCode:" + hashCode());
}

/**
* 线程安全的实例获取,使用双重检查,避免每次获取实例时都加锁
* 但这种模式依然是有隐患的,INSTANCE 常量必须添加 volatile 关键字才能避免指令重排,保持线程可见性
* 而 volatile 在 JDK 1.5 之后才支持
*/
public static DoubleCheckLockingSingleton getInstance() {
if (INSTANCE == null) {
synchronized (DoubleCheckLockingSingleton.class) {
if (INSTANCE == null) {
INSTANCE = new DoubleCheckLockingSingleton();
}
}
}
return INSTANCE;
}
}

测试方法与之前一样,在此就不演示了。

要注意的事,这种实现方式,INSTANCE 常量是必须要加上 volatile 关键字的,不然还是不能保证完全的线程安全,不加 volatile 可能会因为 JVM 指令重排而出现问题。具体原因在下一个章节详细说明。

  1. 进阶

5.1 volatile 关键字解析

首先来看一下 双重校验锁 会出现什么问题。我们先要了解对象的创建过程(new 关键字),它简单的分为三个阶段:

1.分配对象内存空间。 2.初始化对象。 3.设置对象指向内存空间。

但是实际上第二步和第三步的顺序是可以互换的,在 JVM 的优化中存在一种指令重排序的机制,可以加快 JVM 的运行速度。

那么现在我们来做个实验,运行 DoubleCheckLockingSingleton.main() 方法后,在 target 目录下找到它的 DoubleCheckLockingSingleton.class
文件,然后使用 javap -c DoubleCheckLockingSingleton.class > DCL.txt 来生成字节码文件。

打开文件可以在 public static com.junmoyu.singleton.DoubleCheckLockingSingleton getInstance(); 下方看到 getInstance()
方法整个字节码执行过程。

1
2
3
4
yaml复制代码17: new           #9      // class com/junmoyu/singleton/ThreadSafeLazyLoadedSingleton
20: dup
21: invokespecial #10 // Method "<init>":()V
24: putstatic #5 // Field INSTANCE:Lcom/junmoyu/singleton/ThreadSafeLazyLoadedSingleton;

上面四个步骤是节选了 new DoubleCheckLockingSingleton() 的执行过程。

  • 17 : new 指令在 java 堆上为 ThreadSafeLazyLoadedSingleton 对象分配内存空间,并将地址压入操作栈顶
  • 20 : dup 指令为复制操作栈顶值,并将其压入栈顶,这时操作栈上有连续相同的两个对象地址
  • 21 : 调用实例的构造函数,实例化对象,这一步会弹出一个之前入栈的对象地址
  • 24 : 将对象地址赋值给常量 INSTANCE

由上可看到创建一个对象并非原子操作,而是分成了多个步骤,如果 JVM 重排序后,21 在 24 之后,此时分配完了对象的内存空间,且把内存地址复制给了常量 INSTANCE,那么此时 INSTANCE != null。

如果此时有另外一个线程调用 getInstance() 就会直接返回 INSTANCE 常量,然而对象其实还没有实例化完成,返回的将是一个空的对象。执行过程如下:

执行步骤 线程1 线程2
step1 分配对象内存空间
step2 将对象内存地址赋值给常量 INSTANCE
step3 判断对象是否为 null
step4 对象不为 null, 返回 INSTANCE
step5 访问 INSTANCE 对象
step6 初始化对象

如果出现上表所示的情况,那么线程2将获取到一个空的对象,访问对象的参数或方法都将出现异常。所以需要加上 volatile 关键字。volatile 关键字有两个作用:

  1. 保证对象的可见性。
  2. 防止指令重排序。

对象的创建可能发生指令的重排序,使用 volatile 可以禁止指令的重排序,保证多线程环境下的线程安全。

那么至此,相信你对 双重校验锁 的单例为什么要使用 volatile 关键字的原因已经了解清楚了。

但是你可能还有一个疑惑,为什么之前在 public static synchronized LazyLoadedSingleton getInstance() 方法上加锁时并未提到需要加 volatile 关键字呢。

其实原因也很简单,因为这里锁的是方法,即使出现了指令重排,其他的线程在调用 getInstance() 时也无法获取实例,因为方法被加锁了,自然也不会出现问题,只是方法加锁性能损耗较大而已。

在 Spring 源码中也可以看到类似的例子,如 org.springframework.beans.factory.xml.DefaultNamespaceHandlerResolver
类中的 private volatile Map<String, Object> handlerMappings; 参数与 private Map<String, Object> getHandlerMappings()
方法就是使用 双重校验锁 的方式编写的。感兴趣的可以自行查看。

5.2 反射会导致单例失效嘛?

至于反射会不会导致单例失效,我们不妨测试一下。在各个单例类的 main() 方法中添加如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码public class EagerlySingleton {
// ...

public static void main(String[] args) throws Exception {
// 反射测试
// 通过反射的方式直接调用私有构造器(通过在构造器里抛出异常可以解决此问题)
Class<EagerlySingleton> clazz = (Class<EagerlySingleton>) Class.forName("com.junmoyu.singleton.EagerlySingleton");
Constructor<EagerlySingleton> constructor = clazz.getDeclaredConstructor(null);

EagerlySingleton singleton1 = constructor.newInstance();
EagerlySingleton singleton2 = constructor.newInstance();

System.out.println("反射测试:singleton1 hashCode:" + "@" + singleton1.hashCode());
System.out.println("反射测试:singleton2 hashCode:" + "@" + singleton2.hashCode());
}
}

运行后查看日志可以发现,两个对象的 hashCode 是不一样的。除了枚举实现的单例模式外,其他拥有私有构造器的实现方式均可通过反射来创建多个实例。

要解决的话也很简单。饿汉式单例模式的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码/**
* 饿汉式单例模式 - 线程安全
*/
public class EagerlySingleton {
private static final EagerlySingleton INSTANCE = new EagerlySingleton();

private EagerlySingleton() {
// 防止通过反射进行实例化从而破坏单例
// 最好放在开头,如不需要删除即可
if (INSTANCE != null) {
throw new IllegalStateException("Already initialized.");
}
System.out.println(getClass().getCanonicalName() + " 被实例化,hashCode:" + hashCode());
}

public static EagerlySingleton getInstance() {
return INSTANCE;
}
}

其他方式实现的单例和饿汉式单例一样,在此就不一一说明了。再次运行 main() 方法测试反射。可以打印日志如下:

1
2
3
4
csharp复制代码...
Caused by: java.lang.IllegalStateException: Already initialized.
at com.junmoyu.singleton.StaticInnerClassSingleton.<init>(StaticInnerClassSingleton.java:27)
... 5 more

可以看到想要通过反射实例化的时候,直接抛出了异常,并没有进行实例化的操作。

5.3 反序列化问题

除了反射以外,使用反序列化也同样会破坏单例。

还是以 EagerlySingleton 类来测试,先让其实现 Serializable 接口,然后在 main() 方法里面添加以下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
java复制代码package com.junmoyu.singleton.serializable;

public class EagerlySingleton implements Serializable {
private static final EagerlySingleton INSTANCE = new EagerlySingleton();

private EagerlySingleton() {
}

public static EagerlySingleton getInstance() {
return INSTANCE;
}

public static void main(String[] args) throws Exception {
// 反序列化测试
// 将对象写入文件
ObjectOutputStream os = new ObjectOutputStream(new FileOutputStream("tempFile"));
EagerlySingleton osInstance = EagerlySingleton.getInstance();
System.out.println("反序列化测试:osInstance hashCode:" + "@" + osInstance.hashCode());
os.writeObject(osInstance);

// 从文件中读取对象
File file = new File("tempFile");
ObjectInputStream is = new ObjectInputStream(new FileInputStream(file));
EagerlySingleton isInstance = (EagerlySingleton) is.readObject();
// 查看 hashCode 是否相同
System.out.println("反序列化测试:isInstance hashCode:" + "@" + isInstance.hashCode());
}
}

运行 main()方法测试一下,查看日志结果如下:

1
2
3
perl复制代码com.junmoyu.singleton.serializable.DoubleCheckLockingSingleton 被实例化,hashCode:460141958
反序列化测试:osInstance hashCode:@460141958
反序列化测试:isInstance hashCode:@81628611

可以看到序列化对象和反序列化之后的对象 hashCode 并不相同。

想要解决反序列化的问题,只需要添加一个 readResolve() 方法即可。代码如下:

1
2
3
4
5
6
7
8
9
10
11
java复制代码public class EagerlySingleton implements Serializable {
// ...

/**
* 如果有序列化需求,需要添加此方法以防止反序列化时重新创建新实例
* 如无序列化需求可不加,同时去除 implements Serializable
*/
private Object readResolve() {
return INSTANCE;
}
}

再次运行 main()方法测试一下,查看日志结果如下:

1
2
3
perl复制代码com.junmoyu.singleton.serializable.EagerlySingleton 被实例化,hashCode:460141958
反序列化测试:osInstance hashCode:@460141958
反序列化测试:isInstance hashCode:@460141958

可以发现两个对象的 hashCode 已经是一致的了。

枚举类的单例,是天然可以绝对防止多实例化的,反射及反序列化都无效。

至于为什么加了 readResolve()就可以防止反序列化重新创建实例,就要深入源码解析了。这里就不详细叙述了,简单说一下。

反序列化的对象获取是通过方法 ObjectInputStream#readObject(),进入源码,可以看到 Object obj = readObject0(false); 这行代码最终返回的对象。继续进入源码,在
ObjectInputStream#readObject0()方法中可以看到一个 switch 选择器,找到下面这块重点代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码public class ObjectInputStream extends InputStream implements ObjectInput, ObjectStreamConstants {
// ...
private Object readObject0(boolean unshared) throws IOException {
// ...
try {
switch (tc) {
// ...
case TC_ENUM:
// 单例中为什么枚举最安全,感兴趣的同学可以看一下这里的实现
return checkResolve(readEnum(unshared));
case TC_OBJECT:
// 这里的 readOrdinaryObject 就是读取对象的方法了
return checkResolve(readOrdinaryObject(unshared));
}
} finally {
}
}
}

进入 readOrdinaryObject() 方法,最终的答案都在这里了。

这个方法里面,最重要的两块代码,我圈出来了。第一块其实就是我们在没有加入 readResolve()
方法时,它通过反射创建了一个新的实例,在第二块代码的判断里 desc.hasReadResolveMethod() == false 将不会执行 if 里面的语句。它返回就是之前创建的新实例了。

if 中的代码其实就是调用 readResolve() 方法,然后将获取到的对象替换掉第一块代码里面创建的新实例,而readResolve() 方法不正是返回了单例的实例嘛。所以如果加了这个方法,就会执行 if
里面的代码,用单例的实例去替换掉反射创建的实例。

所以现在你知道为什么加入 readResolve() 方法就可以防止反序列化了吧。

5.4 你以为这就结束了?

虽然不想再啰嗦了(对不住了!),但是还有一个重点!

如果有两个类加载器(class loader)的存在,那是两个类加载器可能各自创建自己的单例模式。

因为每个类加载器都定义了一个命名空间,如果有两个或以上的类加载器,不同的类加载器可能会加载同一个类,那么从整个程序来看,同一个类就被加载多次了。也就是会有多个单例的实例并存。

所以,如果你的程序有多个类加载器又同时使用了单例模式,那么就要小心了。有一个解决方法就是你可以自行指定类加载器,并指定同一个类加载器。

  1. 拓展

另外其实还有一种稍微特殊一点的 “单例” 模式,可以称之为 线程单例,那就是使用 ThreadLocal 使每一个线程拥有自己的单例。

比如 mybatis 3.5.x 版本中的 org.apache.ibatis.executor.ErrorContext 类,就是使用了此种方式实现的,感兴趣的可以自行研究,在此不再赘述。简单贴下代码感受下:

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码package org.apache.ibatis.executor;

public class ErrorContext {
// 这里使用了函数式接口 Supplier<T>,更优雅的初始化 ThreadLocal
private static final ThreadLocal<ErrorContext> LOCAL = ThreadLocal.withInitial(ErrorContext::new);

private ErrorContext() {
}

public static ErrorContext instance() {
return LOCAL.get();
}
}
  1. 总结

至此,我们讨论了六种单例模式的实现方式。

  1. 枚举实现 - 线程安全
  2. 饿汉式单例 - 线程安全
  3. 静态内部类实现 - 线程安全、延迟加载
  4. 普通懒汉式 - 线程不安全、延迟加载
  5. 方法加锁懒汉式 - 线程安全、延迟加载,但性能差
  6. 双重校验锁懒汉式 - 线程安全、延迟加载

且除了枚举实现的单例外,其他均有反射及序列化会破坏单例的情况。那么综合来看的话,枚举实现的单例是最优的方案,也是 Effective Java 书中推荐的方案。然而在 Java
及一些框架的源码中使用枚举单例的例子很少,不知道是为什么,可能是我看的源码还不够多吧。

因为枚举实现的单例模式其实也属于饿汉式,所以如果在实例化时需要执行耗时操作的话,则不建议使用。

那么除此之外较好的单例实现还有静态内部类的实现,以及双重校验锁的实现,可以根据自己的业务需要灵活选择。

项目源代码地址: https://github.com/moyu-jun/java-design-patterns

更多 Java 设计模式系列文章: https://junmoyu.com/2021/design-patterns

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Python+selenium+firefox模拟登录微博并

发表于 2021-05-05

1:环境python3.5,最新 firefox,selenium-3.14.0.

本来准备用无界面的,但是感觉效果不好看出来所以先用有界面的浏览器来做。分几次来慢慢写。这节先配置好环境。

2:安装:

Python Selenium库的版本要对应浏览器的版本,不然可能会出现打不开浏览器,或者打开浏览器,打不开页面的情况。这个非常重要。

开始时:

我直接 pip install -i pypi.douba.com/simple selenium 。下载的版本不对。(太低了。)

最新版的应该是如下图的:

Python+selenium+firefox模拟登录微博并爬取数据(1)

切记一定要安装最新的这个版本 :

Pip install selenium==3.14.0。

接下来 就是下载驱动了:

Github上面下载,地址为:

github.com/mozilla/gec…

Python+selenium+firefox模拟登录微博并爬取数据(1)

最新的是这个。他要求的selenium版本要在3.11 之上。所以这里得尊重一下。开始我没注意到python 安装的selenium的版本。导致。访问页面一直打不开。

下载后,解压保存到一个目录中。我放在了python的安装目录下的Script目录下。因为这个目录我配到了环境变量中。

Python+selenium+firefox模拟登录微博并爬取数据(1)

最后去火狐官网下载一个最新的浏览器。(安装过程中有选项,最好把自动升级最好关了)。

Python+selenium+firefox模拟登录微博并爬取数据(1)

这些装完了,就可以使用了。:

Python+selenium+firefox模拟登录微博并爬取数据(1)

这里页面加载需要一些时间,为了保证能正常的加载上。我们然他睡十秒再执行。

最后完成:(先不要在意这些细节)

Python+selenium+firefox模拟登录微博并爬取数据(1)

下节做模拟登陆并跳转到榜单界面。
image

你要不要也来试试,用 Python 测测你和女神的颜值差距(仅供娱乐,请勿联想) 如果真的遇到好的同事,那算你走运,加油,抓紧学到手。
python、爬虫技巧资源分享Q群:766610200
包含python, pythonweb、爬虫、数据分析等Python技巧,以及人工智能、大数据、数据挖掘、自动化办公等的学习方法。
打造从零基础到项目开发上手实战全方位解析!
点击:加入

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

深入理解Java的String类 String类 字符串常量

发表于 2021-05-05

文章已收录Github精选,欢迎Star:github.com/yehongzhi/l…

String类

在Java中String类的使用的频率可谓相当高。它是Java语言中的核心类,在java.lang包下,主要用于字符串的比较、查找、拼接等等操作。如果要深入理解一个类,最好的方法就是看看源码:

1
2
3
4
5
6
7
8
9
java复制代码public final class String implements java.io.Serializable, Comparable<String>, CharSequence {
/** The value is used for character storage. */
private final char value[];

/** Cache the hash code for the string */
private int hash; // Default to 0

//...
}

从源码中,可以看出以下几点:

  • String类被final关键字修饰,表示String类不能被继承,并且它的成员方法都默认为final方法。
  • String类实现了Serializable、CharSequence、 Comparable接口。
  • String类的值是通过char数组存储的,并且char数组被private和final修饰,字符串一旦创建就不能再修改。

下面通过几个问题不断加深对String类的理解。

问题一

上面说字符串一旦创建就不能再修改,String类提供的replace()方法不就可以替换修改字符串的内容吗?

实际上replace()方法并没有对原字符串进行修改,而是创建了一个新的字符串返回,看看源码就知道了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
java复制代码public String replace(char oldChar, char newChar) {
if (oldChar != newChar) {
int len = value.length;
int i = -1;
char[] val = value; /* avoid getfield opcode */
while (++i < len) {
if (val[i] == oldChar) {
break;
}
}
if (i < len) {
char buf[] = new char[len];
for (int j = 0; j < i; j++) {
buf[j] = val[j];
}
while (i < len) {
char c = val[i];
buf[i] = (c == oldChar) ? newChar : c;
i++;
}
//创建一个新的字符串返回
return new String(buf, true);
}
}
return this;
}

其他方法也是一样,无论是sub、concat还是replace操作都不是在原有的字符串上进行的,而是重新生成了一个新的字符串对象。

问题二

为什么要使用final关键字修饰String类?

首先要讲final修饰类的作用,被final修饰的类不能被继承,类中的所有成员方法都会被隐式地指定为final方法。也就是不能拥有子类,成员方法也不能被重写。

回到问题,String类被final修饰主要基于安全性和效率两点考虑。

  • 安全性

因为字符串是不可变的,所以是多线程安全的,同一个字符串实例可以被多个线程共享。这样便不用因为线程安全问题而使用同步。字符串自己便是线程安全的。

String被许多的Java类(库)用来当做参数,比如网络连接地址URL,文件路径path,还有反射机制所需要的String参数等,假若String不是固定不变的,将会引起各种安全隐患。

  • 效率

字符串不变性保证了hash码的唯一性,因此可以放心的进行缓存,这也是一种性能优化手段,意味着不必每次都取计算新的哈希码。

只有当字符串是不可变的,字符串池才有可能实现,字符串常量池是java堆内存中一个特殊的存储区域,当创建一个String对象,假如此字符串值已经存在于常量池中,则不会创建一个新的对象,而是引用已经存在的对象。

字符串常量池

字符串的分配和其他对象分配一样,是需要消耗高昂的时间和空间的,而且字符串我们使用的非常多。JVM为了提高性能和减少内存的开销,所以在实例化字符串的时候使用字符串常量池进行优化。

池化思想其实在Java中并不少见,字符串常量池也是类似的思想,当创建字符串时,JVM会首先检查字符串常量池,如果该字符串已经存在常量池中,那么就直接返回常量池中的实例引用。如果字符串不存在常量池中,就会实例化该字符串并且将其放到常量池中。

我们可以写个简单的例子证明:

1
2
3
4
5
java复制代码public static void main(String[] args) throws Exception {
String s1 = "abc";
String s2 = "abc";
System.out.println(s1 == s2);//true
}

还有一个面试中经常问的,new String(“abc”)创建了几个对象?

这可能就是想考你对字符串常量池的理解,我一般回答是一个或者两个对象。

如果之前”abc”字符串没有使用过,毫无疑问是创建两个对象,堆中创建了一个String对象,字符串常量池创建了一个,一共两个。

如果之前已经使用过了”abc”字符串,则不会再在字符串常量池创建对象,而是从字符串常量缓冲区中获取,只会在堆中创建一个String对象。

1
2
3
java复制代码String s1 = "abc";
String s2 = new String("abc");
//s2这行代码,只会创建一个对象

字符串拼接

字符串的拼接在Java中是很常见的操作,但是拼接字符串并不是简简单单地使用”+”号即可,还有一些要注意的点,否则会造成效率低下。

比如下面这段代码:

1
2
3
4
5
6
7
java复制代码public static void main(String[] args) throws Exception {
String s = "";
for (int i = 0; i < 10; i++) {
s+=i;
}
System.out.println(s);//0123456789
}

在循环内使用+=拼接字符串会有什么问题呢?我们反编译一下看看就知道了。

其实反编译后,我们可以看到String类使用”+=”拼接的底层其实是使用StringBuilder,先初始化一个StringBuilder对象,然后使用append()方法拼接,最后使用toString()方法得到结果。

问题在于如果在循环体内使用+=拼接,会创建很多临时的StringBuilder对象,拼接后再调用toString()赋给原String对象。这会生成大量临时对象,严重影响性能。

所以在循环体内进行字符串拼接时,建议使用StringBuilder或者StringBuffer类,例子如下:

1
2
3
4
5
6
7
java复制代码public static void main(String[] args) throws Exception {
StringBuilder s = new StringBuilder();
for (int i = 0; i < 10; i++) {
s.append(i);
}
System.out.println(s.toString());//0123456789
}

StringBuilder和StringBuffer的区别在于,StringBuffer的方法都被sync关键字修饰,所以是线程安全的,而StringBuilder则是线程不安全的(效率高)。

总结

回顾一下,本文介绍了String类的不可变的特点,还有字符串常量池的作用,最后简单地从JVM编译的层面对字符串拼接提出一点建议。所谓温故而知新,即使是一些很基础很常见的类,如果深入去探索的话,也会有一番收获。

这篇文章就讲到这里了,感谢大家的阅读,希望看完大家能有所收获!

文章持续更新,微信搜索『java技术爱好者』,关注后第一时间收到推送的技术文章,文章分类收录于github:github.com/yehongzhi,总能找到你感兴趣的

觉得有用就点个赞吧,你的点赞是我创作的最大动力~

我是一个努力让大家记住的程序员。我们下期再见!!!

能力有限,如果有什么错误或者不当之处,请大家批评指正,一起学习交流!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

平衡搜索二叉树对比(AVL、红黑树)

发表于 2021-05-05

前言

0.1 二叉搜索树(BST)

  • 根节点的值大于其左子树中任意一个节点的值,小于其右节点中任意一节点的值,这一规则适用于二叉查找树中的每一个节点。

0.2 二叉搜索树查找的效率是基于二叉搜索树的深度。

二叉树节点的插入是根据根节点的值,判断与当前插入值的大小,小的话向左,大的向右;如果左右各有节点,则重复左右节点的寻找。直到某个节点的左/右节点为空,插入。

二叉搜索树.png

举个例子:如果要插入11这个值,5 -> 7 -> 8 -> 9,找到9之后比9大,而且9的右子树值为null,所以插入9的右子树。

0.3 极端情况

图片.png
极端情况下,树会变成一个链表,这样的话,查找的时间复杂度会变成O(n)。

平衡搜索二叉树就是为了解决这种情况

一、AVL树

    1. 发明者 G. M. Adelson-Velsky和 Evgenii Landis(AVL)。
    1. Balance Factor(平衡因子):是它的左子树的高度减去它的右子树的高度(有时相反)。balance factor = {-1, 0, 1}。
    1. 通过旋转操作来进行平衡(四种)

AVL树.png

1.1 AVL的基础旋转操作

当某个节点的左右子树高度差在Balance Factor之外的时候,AVL就会进行子树的旋转操作从而达到平衡。

1.1.1 右右子树 —> 左旋

右右子树 —> 左旋.png

1.1.2 左左子树 —> 右旋

左左子树 —> 右旋.png

1.1.3 左右子树 —> 左右旋

先变成左左子树

图片.png
再进行右旋

图片.png

1.1.4 右左子树 —> 右左旋

先变成右右子树

图片.png
再进行左旋

图片.png

1.2 AVL的旋转操作

AVL如果存在子树的情况下,左/右旋转时,要把子树也带到另一个结点上

1.2.1 AVL带子树的左旋

AVL左旋.gif

1.2.2 AVL带子树的右旋

AVL右旋.gif

1.2.3 总结

AVL旋转.png

二、红黑树

上述的AVL树可以保证每个结点的左右子树高度都能平衡。但是这样就需要每次插入都去判断当前结点是否是平衡的,有时候我们并不需要这种非常严格的平衡机制,只需要相对来说平衡即可。

于是我们就有了近似平衡二叉树这种数据结构,可以减少因为平衡二叉树而消耗的资源。

2.1 红黑树的定义

红黑树是一种近似平衡的二叉搜索树(Binary Search Tree),它能够确保任何一
个结点的左右子树的高度差小于两倍。

2.2 红黑树的性质

  • 每个结点要么是红色,要么是黑色
  • 根结点是黑色
  • 每个叶结点(NIL结点,空结点)是黑色的。
  • 不能有相邻接的两个红色结点
  • 从任一结点到其每个叶子的所有路径都包含相同数目的黑色结点。

尤其是最后两条性质,保证了红黑树的左右子树高度差不会超过两倍。

红黑树.png

三、AVL与红黑树的对比

AVL 红黑树 备注
查询效率 较高 较低 AVL是绝对平衡的,而红黑树最坏情况左右高度会差一倍
增删效率 较低 较高 AVL要时刻保持左右子树绝对平衡,红黑树没有那么严格
存储空间 较高 较低 AVL每个结点都要存储一个整数来记录平衡因子,红黑树只需要一个bit,来标识红/黑
常用于 数据库 语言库 数据库相对来说查询多,增删少。而用语言库,比如Java的HashMap,增删和查的几率基本上是一半一半

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Java 8 之Function和BiFunct

发表于 2021-05-05

简介

为了更好的将函数作为参数,Java遂引入了Function接口

Function<T, R>的使用

看其源码

image.png

要素察觉

  • 该接口会接收一个参数,且会产生结果
  • 在使用这个接口前需要明确定义参数类型和返回的结果类型(Java的泛型就是这么回事)
  • 里面有一个apply方法将会对参数进行操作,并返回结果
  • 因为是函数式接口(@FunctionalInterface),可以通过lambda表达式实现该接口

测试

1
2
3
4
5
6
7
8
9
10
java复制代码public class FunctionTest {
public static void main(String[] args) {
FunctionTest test = new FunctionTest();
//lambda表达式实现了apply,也就实现了Function接口,不要被<Integer, Integer>影响了
System.out.println(test.operate(5, integer -> integer * integer));//第二个参数是方法
} //输入参数的类型//返回类型
public int operate(int i, Function<Integer, Integer> function){
return function.apply(i);//apply的返回值是Integer,参数是i
}
}

Function<T, R>进阶玩法,使用Function的默认方法compose和andThen

再观察一波Function<T, R>接口源码

image.png

要素察觉

  • 两个方法都使用了两次apply,只是this.apply执行的先后不同
  • 说明两个Function可以组合起来用 => 两个apply可以组合使用 => 可以用两个Lambda表达式实现apply

测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码
public class FunctionTest {
public static void main(String[] args) {
FunctionTest test = new FunctionTest();
System.out.println(test.operate1(2,integer -> integer*3,integer -> integer*integer));
System.out.println("-----------------------");
System.out.println(test.operate2(2,integer -> integer*3,integer -> integer*integer));
}
public int operate1(int i,Function<Integer, Integer> function1 ,Function<Integer, Integer> function2){
return function1.compose(function2).apply(i);//当前对象(this)为function1
}
public int operate2(int i,Function<Integer, Integer> function1 ,Function<Integer, Integer> function2){
return function1.andThen(function2).apply(i);//当前对象(this)为function1
}
}

解释

  • compose
    image.png

执行f1之前(before)先执行f2

  • andThen
    image.png

执行f1之后(after)再执行f2

小结

  • 总之,要分清当前function,也就是当前对象,判断执行顺序。对象有方法和成员变量。
  • 本例中,function1都是当前对象,调用它的方法,以及将function2作为参数传给它。
  • function是对象,只是有函数的功能而已。面向对象不能丢

BiFunction<T,U,R>的使用

看其源码

image.png

要素察觉

  • BiFunction接收两个参数返回一个结果
  • 它有一个抽象方法apply,接收两个参数
  • 它有一个andThen的默认方法
  • 和Function比起来多了一个参数,少了一个compose方法
    • 因为Function只需要一个参数,BiFunction刚好可以返回一个参数,可以先BiFuction再Function,所以有andThen方法
    • BiFunction是当前对象,如果它后执行,其他Function先执行,且它们都只会返回一个结果,就会导致BiFunction只有一个参数,满足不了BiFunction的需要两个参数的需求,所以没有compose方法

测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码public class FunctionTest {
public static void main(String[] args) {
FunctionTest test = new FunctionTest();
System.out.println(test.operate3(2,3, (a,b) -> a-b));
System.out.println("-----------------------");
System.out.println(test.operate4(3,4,(a,b) -> a*b, result -> result*10));
} //指定参数类型和返回类型
public int operate3(int a, int b, BiFunction<Integer,Integer,Integer> function){
return function.apply(a,b);
}
public int operate4(int a, int b, BiFunction<Integer,Integer,Integer> function1,Function<Integer, Integer> function2){
return function1.andThen(function2).apply(a,b);//andThen的参数是Function

}
}

image.png

解释

image.png

image.png

其实Bifuntction和Function差不多,BiFunction接收两个参数,Function接收一个参数

最后

这两个接口有啥用?

  • 当需要对数据进行操作时且你想用lambda表达式时,就可以用它,而不需要自己去写一个满足函数式接口的接口。
  • 而且对一个或两个数据的修改和操作是不同的,比如两个数的加减乘除,就不需要定义四个方法,只要Bifunction接口,用不同的Lambda表达式实现就好了
  • 某个操作只用一次用Lamdba表达式即可,就不需要单独封装成方法了
1
2
3
4
5
6
7
java复制代码System.out.println(test.operate3(2,3, (a,b) -> a+b));// 只用一次,就不需要封装成说明add方法之类的
System.out.println(test.operate3(2,3, (a,b) -> a-b));
System.out.println(test.operate3(2,3, (a,b) -> a*b));
System.out.println(test.operate3(2,3, (a,b) -> a/b));// 只用一次,就不需要封装成说明div方法之类的
public int operate3(int a, int b, BiFunction<Integer,Integer,Integer> function){
return function.apply(a,b);
}
  • 如果你一个方法都不想封装,可以这么写
1
2
3
4
5
6
7
java复制代码public class FunctionTest {
public static void main(String[] args) {
Function<Integer, Integer> function;//省去一堆基本数据类型的变量定义了
function = a ->a*3;
System.out.println(function.apply(2));
}
}

另外一个小样例,对集合进行操作

  • People
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码public class People {
String name;
int age;
//getter和setter,自行添加
public People(String name, int age) {
this.name = name;
this.age = age;
}

@Override
public String toString() {
return "People{" +
"name='" + name + '\'' +
", age=" + age +
'}';
}
}
  • 需求:获取People集合中年龄小于特定的元素
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码public class FunctionTestPro {
public static void main(String[] args) {
FunctionTestPro testPro = new FunctionTestPro();
People p1 = new People("jetty",18);
People p2 = new People("kitty",12);
People p3 = new People("Matty",70);
List<People> list = Arrays.asList(p1,p2,p3);
//获取年龄小于70的元素
List<People> list1 = testPro.getPeople(70,list,(age,peopleList) ->peopleList.stream()//形成流
.filter(people -> people.getAge()< age ) //获取小于age的元素
.collect(Collectors.toList()));//将结果累积到List中,并返回
list1.forEach(System.out::println);//输出
//是数字码的,请重写People的toString方法
} //条件 需要被操作的集合 // 条件age为Integer 被操作的对象为List 返回值为List
public List<People> getPeople(int age, List<People> peopleList ,BiFunction<Integer,List<People>,List<People>> function){
return function.apply(age,peopleList);//两个参数
}
}
  • Stream流,将对象以流水的形态通过各种方法过滤,得到所需结果
  • 对于Lambda表达式,如果方法体只有一行,不需要{}也不需要return,它会自动return,如果需要return的话。
  • 如果多行请用{}和return
    • (a, b) ->{ a=a*b; return a+b; }

什么时候用Java 提供的Function接口

  • 主要是要有将函数作为参数的思想。
    • 你可以在以下情况下使用Java提供的Function接口:
    • 数据映射:当你需要对集合中的每个元素进行某种映射操作,生成一个新的集合时,可以使用Function接口。
1
2
3
4
java复制代码List<String> names = Arrays.asList("Alice", "Bob", "Charlie");
Function<String, Integer> nameLengthMapper = String::length;
List<Integer> nameLengths = names.stream().map(nameLengthMapper).collect(Collectors.toList());
// nameLengths 现在包含 [5, 3, 7]
+ **连续操作**:当你需要在一系列的操作中传递行为时,Function接口可以作为方法参数,使代码更具灵活性。
1
2
3
4
5
6
7
java复制代码public void processWithFunction(String data, Function<String, Integer> processor) {
int result = processor.apply(data);
// 处理结果...
}

processWithFunction("42", Integer::parseInt);
processWithFunction("Hello", String::length);
+ **函数组合**:你可以将多个Function接口连接在一起,形成一个复杂的函数组合。
1
2
3
4
java复制代码Function<Integer, Integer> addOne = x -> x + 1;
Function<Integer, Integer> multiplyByTwo = x -> x * 2;
Function<Integer, Integer> addOneAndMultiplyByTwo = addOne.andThen(multiplyByTwo);
int result = addOneAndMultiplyByTwo.apply(5); // 结果为 12 (5 + 1 = 6, 6 * 2 = 12)
  • 更好的语义

    • 当涉及多个转换步骤或操作组合时,Function接口可以提供更具说服力的例子。让我们考虑以下情况:假设你有一个字符串列表,其中包含表示人员年龄的字符串,但是你需要计算这些成年的人员年龄的平均值。

    • 我们可以使用Function接口来先将字符串转换为整数,然后计算平均值。尽管这个例子中的三个操作似乎很简单,但考虑到功能的可组合性和代码的可重用性,Function接口仍然非常有用。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      java复制代码public class Main {
      public static void main(String[] args) {
      List<String> ageStrings = Arrays.asList("25", "30", "40", "22", "28");

      // 将字符串转换为整数的方法
      Function<List<String>, List<Integer>> stringToInteger = ageStringLs -> ageStringLs
      .stream()
      .map(Integer::parseInt)
      .collect(Collectors.toList());

      // 过滤掉未成年人的方法
      Function<List<Integer>, List<Integer>> filterAdults = ages -> ages.stream()
      .filter(age -> age >= 18)
      .collect(Collectors.toList());

      // 计算平均值的函数
      Function<List<Integer>, Double> averageFunction = list -> list.stream()
      .mapToDouble(Integer::doubleValue)
      .average()
      .orElse(0.0);

      // 将三个函数组合在一起,其实就是三个步骤,计算平均年龄,一套下来就获得了所需数据
      Double averageAge = stringToInteger.andThen(filterAdults)
      .andThen(averageFunction)
      .apply(ageStrings);

      System.out.println("平均年龄:" + averageAge);
      }
      }
    • 这样,就不用写成,像下面那样的代码了,按顺序调用方法,并获取各自的返回值

      1
      2
      3
      4
      5
      6
      7
      8
      java复制代码public static void main(String[] args) { 
      List<String> ageStrings = Arrays.asList("25", "30", "40", "22", "28");
      List<Integer> ages = convertToIntegerList(ageStrings); // 将字符串转换为整数列表
      List<Integer> adultsAges = filterAdults(ages); // 过滤未成年人
      double averageAge = calculateAverage(adultsAges); // 计算平均年龄

      System.out.println("平均年龄:" + averageAge);
      }//下面定义若干个static方法,convertToIntegerList()、filterAdults()、calculateAverage()

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

一文了解Mysql读写分离原理及配置 前言

发表于 2021-05-05

前言

关于MySQL的知识点总结了一个图谱,分享给大家:

复制概述

Mysql内建的复制功能是构建大型,高性能应用程序的基础。将Mysql的数据分布到多个系统上去,这种分布的机制,是通过将Mysql的某一台主机的数据复制到其它主机(slaves)上,并重新执行一遍来实现的。复制过程中一个服务器充当主服务器,而一个或多个其它服务器充当从服务器。主服务器将更新写入二进制日志文件,并维护文件的一个索引以跟踪日志循环。这些日志可以记录发送到从服务器的更新。当一个从服务器连接主服务器时,它通知主服务器从服务器在日志中读取的最后一次成功更新的位置。从服务器接收从那时起发生的任何更新,然后封锁并等待主服务器通知新的更新。

请注意当你进行复制时,所有对复制中的表的更新必须在主服务器上进行。否则,你必须要小心,以避免用户对主服务器上的表进行的更新与对从服务器上的表所进行的更新之间的冲突。

1.1 mysql支持的复制类型:

(1):基于语句的复制:在主服务器上执行的SQL语句,在从服务器上执行同样的语句。MySQL默认采用基于语句的复制,效率比较高。一旦发现没法精确复制时,会自动选着基于行的复制。   

(2):基于行的复制:把改变的内容复制过去,而不是把命令在从服务器上执行一遍,从mysql5.0开始支持   

(3):混合类型的复制: 默认采用基于语句的复制,一旦发现基于语句的无法精确的复制时,就会采用基于行的复制。

1.2 复制解决的问题

MySQL复制技术有以下一些特点:

(1) 数据分布 (Data distribution )

(2) 负载平衡(load balancing)

(3) 备份(Backups)

(4) 高可用性和容错行 High availability and failover

1.3 复制如何工作

整体上来说,复制有3个步骤:

(1)master将改变记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events);

(2)slave将master的binary log events拷贝到它的中继日志(relay log);

(3)slave重做中继日志中的事件,将改变反映它自己的数据。

下图描述了复制的过程:

该过程的第一部分就是master记录二进制日志。在每个事务更新数据完成之前,master在二日志记录这些改变。MySQL将事务串行的写入二进制日志,即使事务中的语句都是交叉执行的。在事件写入二进制日志完成后,master通知存储引擎提交事务。下一步就是slave将master的binary log拷贝到它自己的中继日志。首先,slave开始一个工作线程——I/O线程。I/O线程在master上打开一个普通的连接,然后开始binlog dump process。Binlog dump process从master的二进制日志中读取事件,如果已经跟上master,它会睡眠并等待master产生新的事件。I/O线程将这些事件写入中继日志。SQL slave thread(SQL从线程)处理该过程的最后一步。SQL线程从中继日志读取事件,并重放其中的事件而更新slave的数据,使其与master中的数据一致。只要该线程与I/O线程保持一致,中继日志通常会位于OS的缓存中,所以中继日志的开销很小。此外,在master中也有一个工作线程:和其它MySQL的连接一样,slave在master中打开一个连接也会使得master开始一个线程。复制过程有一个很重要的限制——复制在slave上是串行化的,也就是说master上的并行更新操作不能在slave上并行操作。

复制配置

有两台MySQL数据库服务器Master和slave,Master为主服务器,slave为从服务器,初始状态时,Master和slave中的数据信息相同,当Master中的数据发生变化时,slave也跟着发生相应的变化,使得master和slave的数据信息同步,达到备份的目的。

要点: 负责在主、从服务器传输各种修改动作的媒介是主服务器的二进制变更日志,这个日志记载着需要传输给从服务器的各种修改动作。因此,主服务器必须激活二进制日志功能。从服务器必须具备足以让它连接主服务器并请求主服务器把二进制变更日志传输给它的权限。环境: Master和slave的MySQL数据库版本同为5.0.18 操作系统:unbuntu 11.10 IP地址:10.100.0.100

2.1、创建复制帐号

1、在Master的数据库中建立一个备份帐户:每个slave使用标准的MySQL用户名和密码连接master。进行复制操作的用户会授予REPLICATION SLAVE权限。用户名的密码都会存储在文本文件master.info中

命令如下: mysql > GRANT REPLICATION SLAVE,RELOAD,SUPER ON . TO backup@’10.100.0.200’ IDENTIFIED BY ‘1234’;

建立一个帐户backup,并且只能允许从10.100.0.200这个地址上来登陆,密码是1234。

(如果因为mysql版本新旧密码算法不同,可以设置:set password for ‘backup’@’10.100.0.200’=old_password(‘1234’))

2.2、拷贝数据

(假如是你完全新安装mysql主从服务器,这个一步就不需要。因为新安装的master和slave有相同的数据)

关停Master服务器,将Master中的数据拷贝到B服务器中,使得Master和slave中的数据同步,并且确保在全部设置操作结束前,禁止在Master和slave服务器中进行写操作,使得两数据库中的数据一定要相同!

2.3、配置master

接下来对master进行配置,包括打开二进制日志,指定唯一的servr ID。例如,在配置文件加入如下值:

server-id=1 log-bin=mysql-bin

server-id:为主服务器A的ID值 log-bin:二进制变更日值

重启master,运行SHOW MASTER STATUS,输出如下:

image.png

2.4、配置slave

Slave的配置与master类似,你同样需要重启slave的MySQL。如下:

log_bin= mysql-bin server_id = 2 relay_log= mysql-relay-bin log_slave_updates = 1 read_only= 1

server_id是必须的,而且唯一。slave没有必要开启二进制日志,但是在一些情况下,必须设置,例如,如果slave为其它slave的master,必须设置bin_log。在这里,我们开启了二进制日志,而且显示的命名(默认名称为hostname,但是,如果hostname改变则会出现问题)。

relay_log配置中继日志,log_slave_updates表示slave将复制事件写进自己的二进制日志(后面会看到它的用处)。
有些人开启了slave的二进制日志,却没有设置log_slave_updates,然后查看slave的数据是否改变,这是一种错误的配置。所以,尽量使用read_only,它防止改变数据(除了特殊的线程)。但是,read_only并是很实用,特别是那些需要在slave上创建表的应用。

2.5、启动slave

接下来就是让slave连接master,并开始重做master二进制日志中的事件。你不应该用配置文件进行该操作,而应该使用CHANGE MASTER TO语句,该语句可以完全取代对配置文件的修改,而且它可以为slave指定不同的master,而不需要停止服务器。如下:
`
mysql> CHANGE MASTER TO MASTER_HOST=’server1’,

1
2
3
4
5
6
7
ini复制代码-> MASTER_USER='repl',

-> MASTER_PASSWORD='p4ssword',

-> MASTER_LOG_FILE='mysql-bin.000001',

-> MASTER_LOG_POS=0;

`
MASTER_LOG_POS的值为0,因为它是日志的开始位置。

你可以用SHOW SLAVE STATUS语句查看slave的设置是否正确:

`mysql> SHOW SLAVE STATUS\G

*************************** 1. row ***************************

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
yaml复制代码         Slave_IO_State:

Master_Host: server1

Master_User: repl

Master_Port: 3306

Connect_Retry: 60

Master_Log_File: mysql-bin.000001

Read_Master_Log_Pos: 4

Relay_Log_File: mysql-relay-bin.000001

Relay_Log_Pos: 4

Relay_Master_Log_File: mysql-bin.000001

Slave_IO_Running: No

Slave_SQL_Running: No

...omitted...

Seconds_Behind_Master: NULL`

Slave_IO_State, Slave_IO_Running, 和Slave_SQL_Running是No

表明slave还没有开始复制过程。日志的位置为4而不是0,这是因为0只是日志文件的开始位置,并不是日志位置。实际上,MySQL知道的第一个事件的位置是4。

为了开始复制,你可以运行:

`mysql> START SLAVE;

运行SHOW SLAVE STATUS查看输出结果:

mysql> SHOW SLAVE STATUS\G

*************************** 1. row ***************************

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
yaml复制代码         Slave_IO_State: Waiting for master to send event

Master_Host: server1

Master_User: repl

Master_Port: 3306

Connect_Retry: 60

Master_Log_File: mysql-bin.000001

Read_Master_Log_Pos: 164

Relay_Log_File: mysql-relay-bin.000001

Relay_Log_Pos: 164

Relay_Master_Log_File: mysql-bin.000001

Slave_IO_Running: Yes

Slave_SQL_Running: Yes

...omitted...

Seconds_Behind_Master: 0`

在这里主要是看:

Slave_IO_Running=Yes Slave_SQL_Running=Yes

slave的I/O和SQL线程都已经开始运行,而且Seconds_Behind_Master不再是NULL。日志的位置增加了,意味着一些事件被获取并执行了。如果你在master上进行修改,你可以在slave上看到各种日志文件的位置的变化,同样,你也可以看到数据库中数据的变化。

你可查看master和slave上线程的状态。在master上,你可以看到slave的I/O线程创建的连接:

在master上输入show processlist\G;

image.png

行2为处理slave的I/O线程的连接。

在slave服务器上运行该语句:

image.png

行1为I/O线程状态,行2为SQL线程状态。

2.5、添加新slave服务器

假如master已经运行很久了,想对新安装的slave进行数据同步,甚至它没有master的数据。

此时,有几种方法可以使slave从另一个服务开始,例如,从master拷贝数据,从另一个slave克隆,从最近的备份开始一个slave。Slave与master同步时,需要三样东西:

(1)master的某个时刻的数据快照;

(2)master当前的日志文件、以及生成快照时的字节偏移。这两个值可以叫做日志文件坐标(log file coordinate),因为它们确定了一个二进制日志的位置,你可以用SHOW MASTER STATUS命令找到日志文件的坐标;

(3)master的二进制日志文件。

可以通过以下几中方法来克隆一个slave:

(1)冷拷贝(cold copy)

停止master,将master的文件拷贝到slave;然后重启master。缺点很明显。

(2)热拷贝(warm copy)

如果你仅使用MyISAM表,你可以使用mysqlhotcopy拷贝,即使服务器正在运行。

(3)使用mysqldump

使用mysqldump来得到一个数据快照可分为以下几步:

<1>锁表:如果你还没有锁表,你应该对表加锁,防止其它连接修改数据库,否则,你得到的数据可以是不一致的。如下:
mysql> FLUSH TABLES WITH READ LOCK;
< 2>在另一个连接用mysqldump创建一个你想进行复制的数据库的转储:
shell> mysqldump –all-databases –lock-all-tables >dbdump.db
< 3>对表释放锁。
mysql> UNLOCK TABLES;

深入了解复制

已经讨论了关于复制的一些基本东西,下面深入讨论一下复制。

3.1、基于语句的复制(Statement-Based Replication)

MySQL 5.0及之前的版本仅支持基于语句的复制(也叫做逻辑复制,logical replication),这在数据库并不常见。master记录下改变数据的查询,然后,slave从中继日志中读取事件,并执行它,这些SQL语句与master执行的语句一样。

这种方式的优点就是实现简单。此外,基于语句的复制的二进制日志可以很好的进行压缩,而且日志的数据量也较小,占用带宽少——例如,一个更新GB的数据的查询仅需要几十个字节的二进制日志。而mysqlbinlog对于基于语句的日志处理十分方便。

但是,基于语句的复制并不是像它看起来那么简单,因为一些查询语句依赖于master的特定条件,例如,master与slave可能有不同的时间。所以,MySQL的二进制日志的格式不仅仅是查询语句,还包括一些元数据信息,例如,当前的时间戳。即使如此,还是有一些语句,比如,CURRENT USER函数,不能正确的进行复制。此外,存储过程和触发器也是一个问题。

另外一个问题就是基于语句的复制必须是串行化的。这要求大量特殊的代码,配置,例如InnoDB的next-key锁等。并不是所有的存储引擎都支持基于语句的复制。

3.2、基于记录的复制(Row-Based Replication)

MySQL增加基于记录的复制,在二进制日志中记录下实际数据的改变,这与其它一些DBMS的实现方式类似。这种方式有优点,也有缺点。优点就是可以对任何语句都能正确工作,一些语句的效率更高。主要的缺点就是二进制日志可能会很大,而且不直观,所以,你不能使用mysqlbinlog来查看二进制日志。
对于一些语句,基于记录的复制能够更有效的工作,如:

mysql> INSERT INTO summary_table(col1, col2, sum_col3) -> SELECT col1, col2, sum(col3) -> FROM enormous_table -> GROUP BY col1, col2;

假设,只有三种唯一的col1和col2的组合,但是,该查询会扫描原表的许多行,却仅返回三条记录。此时,基于记录的复制效率更高。

另一方面,下面的语句,基于语句的复制更有效:

mysql> UPDATE enormous_table SET col1 = 0;
此时使用基于记录的复制代价会非常高。由于两种方式不能对所有情况都能很好的处理,所以,MySQL 5.1支持在基于语句的复制和基于记录的复制之前动态交换。你可以通过设置session变量binlog_format来进行控制。

3.3、复制相关的文件

除了二进制日志和中继日志文件外,还有其它一些与复制相关的文件。

如下:

(1)mysql-bin.index

服务器一旦开启二进制日志,会产生一个与二日志文件同名,但是以.index结尾的文件。它用于跟踪磁盘上存在哪些二进制日志文件。MySQL用它来定位二进制日志文件。它的内容如下(我的机器上):

image.png

(2)mysql-relay-bin.index

该文件的功能与mysql-bin.index类似,但是它是针对中继日志,而不是二进制日志。内容如下:
.\mysql-02-relay-bin.000017
.\mysql-02-relay-bin.000018

(3)master.info

保存master的相关信息。不要删除它,否则,slave重启后不能连接master。内容如下(我的机器上):

image.png

I/O线程更新master.info文件,内容如下(我的机器上):

image.png

(4)relay-log.info

包含slave中当前二进制日志和中继日志的信息。

3.4、发送复制事件到其它slave

当设置log_slave_updates时,你可以让slave扮演其它slave的master。此时,slave把SQL线程执行的事件写进行自己的二进制日志(binary log),然后,它的slave可以获取这些事件并执行它。如下:

image.png

3.5、复制过滤(Replication Filters)

复制过滤可以让你只复制服务器中的一部分数据,有两种复制过滤:在master上过滤二进制日志中的事件;在slave上过滤中继日志中的事件。如下:

image.png

复制的常用拓扑结构

复制的体系结构有以下一些基本原则:

(1)每个slave只能有一个master;

(2)每个slave只能有一个唯一的服务器ID;

(3)每个master可以有很多slave;

(4)如果你设置log_slave_updates,slave可以是其它slave的master,从而扩散master的更新。

MySQL不支持多主服务器复制(Multimaster Replication)——即一个slave可以有多个master。但是,通过一些简单的组合,我们却可以建立灵活而强大的复制体系结构。

4.1、单一master和多slave

由一个master和一个slave组成复制系统是最简单的情况。Slave之间并不相互通信,只能与master进行通信。如下:

image.png

如果写操作较少,而读操作很时,可以采取这种结构。你可以将读操作分布到其它的slave,从而减小master的压力。但是,当slave增加到一定数量时,slave对master的负载以及网络带宽都会成为一个严重的问题。
这种结构虽然简单,但是,它却非常灵活,足够满足大多数应用需求。一些建议:

(1)不同的slave扮演不同的作用(例如使用不同的索引,或者不同的存储引擎);

(2)用一个slave作为备用master,只进行复制;

(3)用一个远程的slave,用于灾难恢复;

4.2、主动模式的Master-Master(Master-Master in Active-Active Mode)

Master-Master复制的两台服务器,既是master,又是另一台服务器的slave。如图:

image.png

主动的Master-Master复制有一些特殊的用处。例如,地理上分布的两个部分都需要自己的可写的数据副本。这种结构最大的问题就是更新冲突。假设一个表只有一行(一列)的数据,其值为1,如果两个服务器分别同时执行如下语句:
在第一个服务器上执行:

mysql> UPDATE tbl SET col=col + 1;
在第二个服务器上执行:
mysql> UPDATE tbl SET col=col * 2;

那么结果是多少呢?一台服务器是4,另一个服务器是3,但是,这并不会产生错误。
实际上,MySQL并不支持其它一些DBMS支持的多主服务器复制(Multimaster Replication),这是MySQL的复制功能很大的一个限制(多主服务器的难点在于解决更新冲突),但是,如果你实在有这种需求,你可以采用MySQL Cluster,以及将Cluster和Replication结合起来,可以建立强大的高性能的数据库平台。但是,可以通过其它一些方式来模拟这种多主服务器的复制。

4.3、主动-被动模式的Master-Master(Master-Master in Active-Passive Mode)

这是master-master结构变化而来的,它避免了M-M的缺点,实际上,这是一种具有容错和高可用性的系统。它的不同点在于其中一个服务只能进行只读操作。如图:

image.png

4.4、带从服务器的Master-Master结构(Master-Master with Slaves)

这种结构的优点就是提供了冗余。在地理上分布的复制结构,它不存在单一节点故障问题,而且还可以将读密集型的请求放到slave上。

最后

我这边整理了一份MySQL相关资料文档、Spring系列全家桶、Java的系统化资料:(包括Java核心知识点、面试专题和20年最新的互联网真题、电子书等)有需要的朋友可以关注公众号【程序媛小琬】即可获取。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

公司禁止JOIN查询怎么办? 场景 解决方案 代码优化:封装

发表于 2021-05-05

场景

很多公司(特别是做电商的)其实都是不允许多表关联查询的,或者严格控制关联的表数量,比如最多关联2、3张表。此时,如果某个需求又确实需要进行关联查询怎么办呢?

比如前端有个页面:

id product_name price user_name user_age
10086 iphone 12 pro 6666 zhangsan 18

很明显,这个页面字段来自两张表:

  • t_product
  • t_user

正常来说,直接这样写SQL即可:

1
2
3
sql复制代码SELECT p.id, p.product_name, p.price, u.user_name, u.user_age
FROM t_product p
LEFT JOIN t_user u ON p.user_id=u.id;

但上面说了,不能关联查询。

解决方案

作为替代方案,可以先从t_product表查出10条数据到内存(t_product作为主表),然后取出10条数据的uid,再调用UserService#listUser(uids),得到对应的userList。此时内存中有10条product,也有10条user,匹配组合即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
java复制代码public List<ProductExtendsTO> getList(Integer page, Integer pageSize) {
// 1.查询Product
List<Product> productList = listProduct(page, pageSize);
// 2.取出里面的所有uid
List<Long> uids = productList.stream()
.map(Product::getUid)
.collect(Collectors.toList());
// 3.查出userList
List<User> userList = listUser(uids);
// 4.把List转成Map
Map<Long, User> userMap = new HashMap<Long, User>();
for(userList : user){
userMap.put(user.getId(), user);
}

// 组合并返回数据
List<ProductExtendsTO> result = new ArrayList<>();
productList.foreach(product->{
ProductExtendsTO productExtends = new ProductExtendsTO();
BeanUtils.copyProperties(product, productExtends);
// 根据product的uid从userMap获取user(此处省略user null判断)
User user = userMap.get(product.getUid());
productExtends.setUserAge(user.getUserAge());
productExtends.setUserName(user.getUserName());
result.add(productExtends);
});

return result;
}

上面的代码可以优化为(主要第4点):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码public List<ProductExtendsTO> getList(Integer page, Integer pageSize) {
// 1.查询Product
List<Product> productList = listProduct(page, pageSize);
// 2.取出里面的所有uid
List<Long> uids = productList.stream()
.map(Product::getUid)
.collect(Collectors.toList());
// 3.查出userList
List<User> userList = listUser(uids);
// 4.把List转成Map(优化了这里)
Map<Long, User> userMap = userList.stream()
.collect(Collectors.toMap(User::getId(), user->user));

// 组合并返回数据
List<ProductExtendsTO> result = new ArrayList<>();
productList.foreach(product->{
ProductExtendsTO productExtends = new ProductExtendsTO();
BeanUtils.copyProperties(product, productExtends);
// 根据product的uid从userMap获取user(此处省略user null判断)
User user = userMap.get(product.getUid());
productExtends.setUserAge(user.getUserAge());
productExtends.setUserName(user.getUserName());
result.add(productExtends);
})

return result;
}

代码优化:封装ConvertUtil

List转Map是非常普遍的需求,Stream API其实还是有点啰嗦(代码太长了),所以我们可以试着封装一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
java复制代码public final class ConvertUtil {

private ConvertUtil() {
}

/**
* 将List转为Map
*
* @param list 原数据
* @param keyExtractor Key的抽取规则
* @param <K> Key
* @param <V> Value
* @return
*/
public static <K, V> Map<K, V> listToMap(List<V> list,
Function<V, K> keyExtractor) {
if (list == null || list.isEmpty()) {
return new HashMap<>();
}

Map<K, V> map = new HashMap<>(list.size());
for (V element : list) {
// 利用keyExtractor从对象中抽取Key
K key = keyExtractor.apply(element);
// 这里默认key不能为null
if (key == null) {
continue;
}
map.put(key, element);
}

return map;
}
}

除了List转Map,从List中抽取特定字段的需求也是非常普遍的,比如上面代码:

1
2
3
4
java复制代码// 2.取出里面的所有uid(省略null判断)
List<Long> uids = productList.stream()
.map(Product::getUid)
.collect(Collectors.toList());

意思是从productList中抽取uids。为了复用,我们也封装一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
java复制代码public class ConvertUtil {

private ConvertUtil() {
}

/**
* 将List映射为List,比如List<Person> personList转为List<String> nameList
*
* @param originList 原数据
* @param mapper 映射规则
* @param <T> 原数据的元素类型
* @param <R> 新数据的元素类型
* @return
*/
public static <T, R> List<R> resultToList(List<T> originList,
Function<T, R> mapper) {
if (list == null || list.isEmpty()) {
return new ArrayList<>();
}

List<R> newList = new ArrayList<>(originList.size());
for (T originElement : originList) {
R newElement = mapper.apply(originElement);
if (newElement == null) {
continue;
}
newList.add(newElement);
}

return newList;
}

/**
* 将List转为Map
*
* @param list 原数据
* @param keyExtractor Key的抽取规则
* @param <K> Key
* @param <V> Value
* @return
*/
public static <K, V> Map<K, V> listToMap(List<V> list,
Function<V, K> keyExtractor) {
if (list == null || list.isEmpty()) {
return new HashMap<>();
}

Map<K, V> map = new HashMap<>(list.size());
for (V element : list) {
K key = keyExtractor.apply(element);
if (key == null) {
continue;
}
map.put(key, element);
}

return map;
}
}

上面权当抛砖引玉,大家可以基于实际需求自行扩展ConvertUtil,让它更好用。

总结:

  • List转Map,重点是传入Map中Key的抽取规则,也就是KeyExtractor,用了函数式接口
  • List抽取FieldList,重点也是定义字段的抽取规则,也用了函数式接口

其他解决策略

有时遇到复杂的统计报表等数据,很难通过上面“内存关联”的方式完成需求,此时可以让公司的大数据部门提供接口,直接从大数据那边获取数据。但这个并不需要我们操心:小公司适当关联查询无伤大雅,大公司一般都有大数据部门。

五一假期最后一天,收拾收拾心情准备回杭搬砖。

我是bravo1988,下次见。

よろしく・つづく

我昨晚梦见你了.gif

往期文章:

漫画:从JVM锁扯到Redis分布式锁

深入浅出Java线程基础

深入浅出Java注解

Tomcat外传:孤独的小猫咪

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…675676677…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%