开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

LinkedBlockingQueue 和 Concurre

发表于 2020-08-19
  1. 简单的开篇

LinkedBlockingQueue 和 ConcurrentLinkedQueue 是 Java 高并发场景中最常使用的队列。尽管这两个队列经常被用作并发场景的数据结构,但它们之间仍有细微的特征和行为差异。
在这篇文章中,我将和大家一起探讨这两者之间的异同点。欢迎大家在留言讨论~

  1. LinkedBlockingQueue

首先 LinkedBlockingQueue 是一个 “可选且有界” 的阻塞队列实现,你可以根据需要指定队列的大小。
接下来,我将创建一个LinkedBlockingQueue,它最多可以包含100个元素:

1
java复制代码BlockingQueue<Integer> boundedQueue = new LinkedBlockingQueue<>(100);

当然,我们也可以通过不指定大小,来创建一个无界的 LinkedBlockingQueue:

1
java复制代码BlockingQueue<Integer> unboundedQueue = new LinkedBlockingQueue<>();

无界队列表示在创建时未指定队列的大小。因此,队列可以随着元素的添加而动态增长。但是,如果没有剩余内存,则队列将抛出 java.lang.OutOfMemory 错误。

这里留下一个问题给大家思考: 创建无界队列是好还是坏呢?

我们还可以从现有的集合来创建 LinkedBlockingQueue:

1
2
java复制代码Collection<Integer> listOfNumbers = Arrays.asList(1,2,3,4,5);
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(listOfNumbers);

LinkedBlockingQueue 实现了BlockingQueue接口,该接口为它提供了阻塞性质。

阻塞队列表示如果访问线程已满(当队列有界时)或变为空,则队列将阻塞该线程。如果队列已满,则添加新元素将阻塞访问线程,除非新元素有可用空间。类似地,如果队列为空,则访问元素会阻塞调用线程:

1
2
3
4
5
6
7
8
9
10
java复制代码ExecutorService executorService = Executors.newFixedThreadPool(1);
LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
executorService.submit(() -> {
  try {
    queue.take();
  }
  catch (InterruptedException e) {
    // exception handling
  }
});

在上面的代码片段中,我们正在访问一个空队列。因此,take() 方法阻塞调用线程。

LinkedBlockingQueue 的阻塞特性与一些开销相关。这个代价是因为每个put或take操作在生产者线程或使用者线程之间都是锁争用的。因此,在许多生产者和消费者的情况下,put和take 动作可能会慢一些。

  1. ConcurrentLinkedQueue

首先声明,ConcurrentLinkedQueue 是一个无边界、线程安全且无阻塞的队列

创建一个空的 ConcurrentLinkedQueue:

1
java复制代码ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue();

同上面一样,我们也可以从现有集合创建 ConcurrentLinkedQueue:

1
2
java复制代码Collection<Integer> listOfNumbers = Arrays.asList(1,2,3,4,5);
ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>(listOfNumbers);

不同于 LinkedBlockingQueue, *ConcurrentLinkedQueue是非阻塞的队列。因此,即使队列为空(empty),它也不会阻塞线程。相反,它会返回 空(null) 。虽然它是无界的,但如果没有额外的内存来添加新元素,它依旧会抛出 *java.lang.OutOfMemory 错误。
除了非阻塞之外,ConcurrentLinkedQueue还有其他特性。
在任何生产者-消费者场景中,消费者都不会满足于生产者;但是,多个生产者将相互竞争:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码int element = 1;
ExecutorService executorService = Executors.newFixedThreadPool(2);
ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
 
Runnable offerTask = () -> queue.offer(element);
 
Callable<Integer> pollTask = () -> {
  while (queue.peek() != null) {
    return queue.poll().intValue();
  }
  return null;
};
 
executorService.submit(offerTask);
Future<Integer> returnedElement = executorService.submit(pollTask);
assertThat(returnedElement.get().intValue(), is(equalTo(element)));

第一个任务 offerTask 向队列中添加元素,第二个任务 pollTask 从队列中检索元素。pollTask 首先检查队列中的元素,因为ConcurrentLinkedQueue是非阻塞的,并且可以返回null值。

  1. 求同

LinkedBlockingQueue 和 ConcurrentLinkedQueue 都是队列实现,并具有一些共同特征。让我们讨论一下这两个队列的相似之处:

  1. 都 实现 Queue 接口
  2. 它们都使用 linked nodes 存储节点
  3. 都适用于并发访问场景
  1. 存异

尽管这两种队列都有某些相似之处,但也有一些实质性的特征差异:

特性 LinkedBlockingQueue ConcurrentLinkedQueue
阻塞性 阻塞队列,并实现blocking queue接口 非阻塞队列,不实现blocking queue接口
队列大小 可选的有界队列,这意味着可以在创建期间定义队列大小 无边界队列,并且没有在创建期间指定队列大小的规定
锁特性 基于锁的队列 无锁队列
算法 锁的实现基于 “双锁队列(two lock queue)” 算法 依赖于Michael&Scott算法来实现无阻塞、无锁队列
实现 在 双锁队列 算法机制中,LinkedBlockingQueue使用两种不同的锁,putLock和takeLock。put/take操作使用第一个锁类型,take/poll操作使用另一个锁类型 使用CAS(Compare And Swap)进行操作
阻塞行为 当队列为空时,它会阻塞访问线程 当队列为空时返回 null,它不会阻塞访问线程
  1. 总结才能进步

首先,我们分别讨论了这两种队列实现及其一些特性、相似性、以及它们之间的差异。这样的比较,是否让你对这两种队列有了更深刻的印象?

关注公众号: 锅外的大佬
千河流银的博客

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

抽丝剥茧Kotlin - 协程

发表于 2020-08-19

技术不止,文章有料,关注公众号 九心说,每周一篇高质好文,和九心在大厂路上肩并肩。

前言

文章接上篇,这一篇我们好好聊一聊协程的原理,通过上一篇的学习,相信大家对于如何使用协程已经非常熟悉了。

故事还得从上次的协程分享开始,由于大家对协程的实践并不多,所以大家对下面的这段代码如何执行争论不休:

1
2
3
4
5
6
7
8
9
10
11
12
Kotlin复制代码GlobalScope.launch {
val a = async {
1+2
}

val b = async {
1+3
}

val c = a + b
Log.e(TAG,"result:$c")
}

有人说,a 和 b 会串行执行,有人说,a 和 b 会并行执行,那么执行的结果到底是什么样的?我们将在下面的文章给出。

悲伤的故事

本个系列文章分为三篇,本文是第二篇:

《即学即用Kotlin - 协程》

《抽丝剥茧Kotlin - 协程基础篇》

《抽丝剥茧Kotlin - 协程Flow篇》

一、结构简要介绍

首先,我们得明确协程中有哪些东西,如果你会使用协程,那你肯定知道协程中有 CoroutineScope、CoroutineContext 和 CoroutineDispatcher,这些都是使用过程中我们可以接触到的 API。

我简单的整理了协程中主要的基础类:

类结构

协程的类结构可分为三部分:CoroutineScope、CoroutineContext 和 Continuation。

1. Continuation

如果你会使用协程,那你肯定知道,协程遇到耗时 suspend 操作可以挂起,等到任务结束的时候,协程会自动切回来。

它的奥秘就是 Continuation,Continuation 可以理解程续体,你可以理解其每次在协程挂起点将剩余的代码包括起来,等到结束以后执行剩余的内容。一个协程的代码块可能会被切割成若干个 Continuation,在每个需要挂起的地方都会分配一个 Continuation。

先抛出一些结论,协程在做耗时操作的时候,如果执行了耗时 suspend 操作,会自动挂起,但是这个耗时操作终究是要做的,只不过切换到其他线程去做了,做完以后协程就需要切回来,但是切到哪儿呢?这便是 Continuation 需要解决的问题。

Continuation 的流程是这样的:

Continuation流程

无论是使用 launch 还是 async 启动的协程,都会有一个结束的时候用来回调的 continuation。

2. CoroutineScope

关于 CoroutineScope 没有特别多要说的,它持有了 CoroutineContext,主要对协程的生命周期进行管理。

3. CoroutineContext

一开始看 CoroutineContext 觉得特别晕,不明白为啥要这么设计,看了 Bennyhuo 大佬的文章以后才稍微好转。

从上面协程的类的机构中可以看出,光看这个 CoroutineContext 这个接口(源码内容我们下面讲),会发现它有点像 List 集合,而继承自 CoroutineContext 接口的 Element 接口则定义了其中的元素。

随后,这个 Element 接口被划分成了两种类,Job 和 ContinuationInterceptor:

  • Job:从字面上来讲,它代表一个任务,Thread 也是执行任务,所以我们可以理解它定义了协程的一些东西,比如协程的状态,协程和子协程的管理方式等等。
  • ContinuationInterceptor:也从字面上来看,它是 Continuation 的拦截器,通过拦截 Continuation,完成我们想要完成的工作,比如说线程的切换。

二、结构源码分析

上面我们从概念上介绍了协程的三大件,在这部分,我们从源码分析。

1. Continuation

suspend 修饰的方法会在在编译期间被编译器做特殊处理,这种处理被成为CPS(续体转换风格) 转化,suspend 方法会被包裹成 Continuation。

说了这么久的 Continuation,我们还没有见过接口代码,由于接口内容不多,我就把所有的内容贴出来了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Kotlin复制代码/**
* Interface representing a continuation after a suspension point that returns a value of type `T`.
*/
@SinceKotlin("1.3")
public interface Continuation<in T> {
/**
* The context of the coroutine that corresponds to this continuation.
*/
public val context: CoroutineContext

/**
* Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
* return value of the last suspension point.
*/
public fun resumeWith(result: Result<T>)
}

我们重点关注Continuation#resumeWith()方法,从注释来看,通过返回 suspend 挂起点的值来恢复协程的执行,协程可以从参数 Result<T>) 获取成功的值或者失败的结果,如果没有结果,那么 Result<T> 的泛型是 Unit。Resulut 这个类也特别简单,感兴趣的同学可以查看源码。

BaseContinuationImpl 实现了 Continuation 接口,我们看一下 Continuation#resumeWith 方法的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
Kotlin复制代码internal abstract class BaseContinuationImpl(
// 完成后调用的 Continuation
public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {

public final override fun resumeWith(result: Result<Any?>) {
// This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
var current = this
var param = result
while (true) {
probeCoroutineResumed(current)
with(current) {
val completion = completion!! // fail fast when trying to resume continuation without completion
val outcome: Result<Any?> =
try {
// 1. 执行 suspend 中的代码块
val outcome = invokeSuspend(param)
// 2. 如果代码挂起就提前返回
if (outcome === COROUTINE_SUSPENDED) return
// 3. 返回结果
Result.success(outcome)
} catch (exception: Throwable) {
// 3. 返回失败结果
Result.failure(exception)
}
releaseIntercepted() // this state machine instance is terminating
if (completion is BaseContinuationImpl) {
// 4. 如果 completion 中还有子 completion,递归
current = completion
param = outcome
} else {
// 5. 结果通知
completion.resumeWith(outcome)
return
}
}
}
}
}

主要的过程我在注释中已经标注出来了,我来解释一下 Continuation 的机制。

每个 suspend 方法生成的 BaseContinuationImpl ,其构造方法有一个参数叫 completion ,它也是一个 Continuation,它的调用时机是在 suspen 方法执行完毕的时候。我们后面称

Job流程

这个流程展示给我们的内容很直观了,简单起见,我们直接看3、4和5这一个 launch 启动流程就好,通常一个 launch 生成一个外层 Continuation一个相应的结果 Continuation,我们后面称结果 continuation 为 complete,Continuation 调用顺序是:

  1. 调用外层 Continuation 中的 Continuation#resumeWith() 方法。
  2. 该方法会去执行 launch 包裹的代码块,并返回一个结果。
  3. 将上述代码块执行的结果交给 completion,由它完成协程结束的通知。

上述的过程只存在于一个 launch 并且里面没有执行其他耗时的挂起操作,对于这些情况,我们将会在下面的文章讨论。

抛出问题一: 可以看到,在注释2,遇到耗时的 suspend,返回的结果是一个 COROUTINE_SUSPENDED,后面会直接返回,耗时操作结束的时候,我们的 completion 怎么恢复呢?

2. CoroutineContext 和 Element

在概要分析的时候,我们说 CoroutineContext 的结构像一个集合,是从它的接口得出结论的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Kotlin复制代码public interface CoroutineContext {
// get 方法,通过 key 获取
public operator fun <E : Element> get(key: Key<E>): E?
// 累加操作
public fun <R> fold(initial: R, operation: (R, Element) -> R): R
// 操作符 + , 实际的实现调用了 fold 方法
public operator fun plus(context: CoroutineContext): CoroutineContext
// 移除操作
public fun minusKey(key: Key<*>): CoroutineContext

// CoroutineContext 定义的 Key
public interface Key<E : Element>

// CoroutineContext 中元素的定义
public interface Element : CoroutineContext {
// key
public val key: Key<*>
//...
}
}

从中我们可以大致看出,CoroutineContext 中可以通过 Key 来获取元素 Element,并且 Element 接口也是继承自 CoroutineContext 接口。

除此以外,CoroutineContext 支持增加和移除操作,并且支持 + 操作符来完成增加。+ 操作符即 plus 方法是有具体实现的,感兴趣的可以自己看一下,主要涉及到了拦截器 ContinuationInterceptor 的添加。

1.1 Job

Job 的注释中阐述定义是这样的:

A background job. Conceptually, a job is a cancellable thing with a life-cycle that culminates in its completion.

从中我们可以得出:

  1. 后台任务
  2. 可取消
  3. 生命周期在完成它的时候结束

从后台任务的角度来看,Job 听着有点像 Thread,和 Thread 一样,Job 也有各种状态,文档中对 Job 各种状态的注释(感觉大佬们的注释写的真棒~):

注释

Job 另一个值得关注的点是对子 Job 的管理,主要的规则如下:

  1. 子 Job都会结束的时候,父 Job 才会结束
  2. 父 Job 取消的时候,子 Job 也会取消

上述的一些内容都可以从 Job 的接口文档中得出。那么,Job哪里来的?如果你看一下CoroutineScope#launch方法,你就会得出结论,该方法的返回类型就是 Job,我们每次调用该方法,都会创建一个 Job。

1.2 ContinuationInterceptor

顾名思义,Continuation 拦截器,先看接口:

1
2
3
4
5
6
7
8
9
10
kotlin复制代码interface ContinuationInterceptor : CoroutineContext.Element {
// ContinuationInterceptor 在 CoroutineContext 中的 Key
companion object Key : CoroutineContext.Key<ContinuationInterceptor>
/**
* 拦截 continuation
*/
fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>

//...
}

这个接口可以提炼的就这两个信息:

  1. 拦截器的 Key,也就是说,无论你后面一个 CoroutineContext 放了多少个拦截器,Key 为 ContinuationInterceptor 的拦截器只能有一个。
  2. 我们都知道,Continuation 在调用其 Continuation#resumeWith() 方法,会执行其 suspend 修饰的函数的代码块,如果我们提前拦截到,是不是可以做点其他事情,比如说切换线程,这也是 ContinuationInterceptor 的作用之一。

需要说明一下,我们通过 Dispatchers 来指定协程发生的线程,Dispatchers 实现了 ContinuationInterceptor接口。

3. CoroutineScope

CoroutineScope 的接口很简单:

1
2
3
Kotlin复制代码public interface CoroutineScope {
public val coroutineContext: CoroutineContext
}

它要求后续的实现都要提供 CoroutineContext,不过我们都知道,CoroutineContext 是协程中很重要的东西,既包括 Job,也包括调度器。

在上面的代码中,我多次使用了 Android Jetpack 中的 Lifecycle 中协程的扩展库,好处我们获取 CoroutineScope 更加简单,无需在组件 onDestroy 的时候手动 cancel,并且它的源码超级简单,前提是你会使用 Lifecycle:

1
2
3
4
5
6
7
8
9
10
11
12
13
Kotlin复制代码internal class LifecycleCoroutineScopeImpl(
override val lifecycle: Lifecycle,
override val coroutineContext: CoroutineContext
) : LifecycleCoroutineScope(), LifecycleEventObserver {
// ...

override fun onStateChanged(source: LifecycleOwner, event: Lifecycle.Event) {
if (lifecycle.currentState <= Lifecycle.State.DESTROYED) {
lifecycle.removeObserver(this)
coroutineContext.cancel()
}
}
}

并且它也支持你在指定的生命周期调用协程,大家看一下接口就明白了。

三、过程源码分析

先上一段使用代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Kotlin复制代码lifecycleScope.launch(Dispatchers.Main) {
val a = async { getResult(1, 2) }
val b = async { getResult(3, 5) }

val c = a.await() + b.await()
Log.e(TAG, "result:$c")
}

suspend fun getResult(a: Int, b: Int): Int {
return withContext(Dispatchers.IO) {
delay(1000)
return@withContext a + b
}
}

虽然代码很简单,但是源码还是比较复杂的,我们分步讲。

第一步 获取 CoroutineScope

我已经在上面说明了,我们使用的 Lifecycle 的协程拓展库,如果我们不使用拓展库,就得使用 MainScope,它们的 CoroutineContext 都是一样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Kotlin复制代码public fun MainScope(): CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main)

// LifecycleCoroutineScope
val Lifecycle.coroutineScope: LifecycleCoroutineScope
get() {
while (true) {
// ...
val newScope = LifecycleCoroutineScopeImpl(
this,
SupervisorJob() + Dispatchers.Main.immediate
)
// ...
return newScope
}
}

显而易见,MainScope 和 LifecycleCoroutineScope 都使用了 SupervisorJob() + Dispatchers.Main, 作为它们的 CoroutineContext。

说明一下,SupervisorJob 和Dispatchers.Main 很重要,它们分别代表了CoroutineContext 之前提及的 Job 和 ContinuationInterceptor,后面用到的时候再分析。

第二步 启动协程

直接进入 CoroutineScope#launch() 方法:

1
2
3
4
5
6
7
8
9
10
11
12
Kotlin复制代码public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}

上面的方法一共有三个参数,前两个不作过多介绍,第三个参数:

1
kotlin复制代码block: suspend CoroutineScope.() -> Unit)

这是一个方法,是一个 lambda 参数,同时也表明了它需要被 suspend 修饰。 继续看 launch 方法,发现它主要做了两件事:

  1. 组合新的 CoroutineContext
  2. 再创建一个 Continuation

组合新的CoroutineContext

在第一行代码 val newContext = newCoroutineContext(context) 做了第一件事,这里的 newCoroutineContext(context) 是一个扩展方法:

1
2
3
4
5
6
kotlin复制代码public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
val combined = coroutineContext + context
val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
debug + Dispatchers.Default else debug
}

CoroutineScope 使用本身的 coroutineContext 集合,利用 + 操作符将我们在 launch 方法中提供的 coroutineContext 添加进来。

再创建一个Continuation

回到上一段代码,通常我们不会指定 start 参数,所以它会使用默认的 CoroutineStart.DEFAULT,最终 coroutine 会得到一个 StandaloneCoroutine。

StandaloneCoroutine 实现自 AbstractCoroutine,翻开上面的类图,你会发现,它实现了 Continuation、Job 和 CoroutineScope 等一堆接口。需要说明一下,这个 StandaloneCoroutine 其实是我们当前 Suspend Contination 的 complete。

接着会调用

1
Kotlin复制代码coroutine.start(start, coroutine, block)

这就表明协程开始启动了。

第三步 start

进入到 AbstractCoroutine#start 方法:

1
2
3
4
Kotlin复制代码public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
initParentJob()
start(block, receiver, this)
}

跳过层层嵌套,最后到达了:

1
2
3
4
5
6
7
8
9
kotlin复制代码internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
runSafely(completion) {
// 外面再包一层 Coroutine
createCoroutineUnintercepted(receiver, completion)
// 如果需要,做拦截处理
.intercepted()
// 调用 resumeWith 方法
.resumeCancellableWith(Result.success(Unit))
}

虽然这仅仅是一个函数,但是后面主要的逻辑都揭露了:

  1. 创建一个没有拦截过的 Continuation。
  2. 拦截 Continuation。
  3. 执行 Continuation#resumeWith 方法。

第四步 又创建 Continuation

我这里用了 又,因为我们在 launch 中已经创建了一个 AbstractContinuaion,不过它是一个 complete,从各个函数的行参就可以看出来。

不过我们 suspend 修饰的外层 Continuation 还没有创建,它来了,是 SuspendLambda,它继承自 ContinuationImpl,如果你问我为什么源码中没找到具体实现,我觉得可能跟 suspend 修饰符有关,由编译器处理,但是调用栈确实是这样的:

调用栈

看一下 SuspendLambda 类的实现:

1
2
3
4
5
6
7
Kotlin复制代码internal abstract class SuspendLambda(
public override val arity: Int,
completion: Continuation<Any?>?
) : ContinuationImpl(completion), FunctionBase<Any?>, SuspendFunction {
constructor(arity: Int) : this(arity, null)
//...
}

可以看到,它的构造方法的形参就包括一个 complete。

第五步 拦截处理

回到:

1
2
3
4
5
6
7
8
9
kotlin复制代码internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
runSafely(completion) {
// 外面再包一层 Coroutine
createCoroutineUnintercepted(receiver, completion)
// 如果需要,做拦截处理
.intercepted()
// 调用 resumeWith 方法
.resumeCancellableWith(Result.success(Unit))
}

里面的拦截方法 Continuation#intercepted() 方法是一个扩展方法:

1
2
3
Kotlin复制代码@SinceKotlin("1.3")
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
(this as? ContinuationImpl)?.intercepted() ?: this

createCoroutineUnintercepted(receiver, completion) 返回的是一个 SuspendLambda,所以它肯定是一个 ContinuationImpl,看一下它的拦截方法的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Kotlin复制代码internal abstract class ContinuationImpl(
completion: Continuation<Any?>?,
private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)

public override val context: CoroutineContext
get() = _context!!

public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
// ...
}

在 ContinuationImpl#intercepted()方法中,直接利用 context 这个数据结构通过 context[ContinuationInterceptor] 获取拦截器。

CoroutineDispatcher拦截实现

我们都知道 ContinuationInterceptor 具有拦截作用,它的直接实现是 CoroutineDispatcher 这个抽象类,所有其他调度器都直接或者间接继承这个类,我们关注一下它的拦截方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
Kotlin复制代码public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
//...
public abstract fun dispatch(context: CoroutineContext, block: Runnable)
// 1.拦截的 Continuation 被包了一层 DispatchedContinuation
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)
//...
}

internal class DispatchedContinuation<in T>(
@JvmField val dispatcher: CoroutineDispatcher,
@JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_ATOMIC_DEFAULT), CoroutineStackFrame, Continuation<T> by continuation {
// ...
override fun resumeWith(result: Result<T>) {
// ...
if (dispatcher.isDispatchNeeded(context)) {
// 2. 后面一个参数需要提供 Runnable,父类已经实现
dispatcher.dispatch(context, this)
}
//...
}
// ...
}

// SchedulerTask 是一个 Runnable
internal abstract class DispatchedTask<in T>(
@JvmField public var resumeMode: Int
) : SchedulerTask() {
// ...
public final override fun run() {
// ...
try {
//...
withCoroutineContext(context, delegate.countOrElement) {
// 3. continuation 是 DispatchedContinuation 包裹的 continuation
continuation.resume(...)
}
}
//...
}
}

简单来说,就是对原有的 Continuation 的 resumeWith 操作加了一层拦截,就像这样:

拦截流程

加入 CoroutineDispatcher 以后,执行真正的 Continue#resumeWith() 之前,会执行 CoroutineDispatcher#dispatch() 方法,所以我们现在关注 CoroutineDispatcher#dispatch 具体实现即可。

讲一个CoroutineDispatcher具体实现

首先我们得明确这个 CoroutineDispatcher 来自哪里?它从 context 获取,context来自哪里?

注意 SuspendLambda 和 ContinuationImpl 的构造方法,SuspendLambda 中的参数没有 CoroutineContext,所以只能来自 completion 中的 CoroutineContext,而completion 的 CoroutineContext 来自 launch 方法中来自 CoroutineScope,默认是 SupervisorJob() + Dispatchers.Main,不过只有 Dispatchers.Main 继承了 CoroutineDispatcher。

Dispatchers.Main 是一个 MainCoroutineDispatcher,Android 中对应的 MainCoroutineDispatcher 是 HandlerContext:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
Kotlin复制代码internal class HandlerContext private constructor(
private val handler: Handler,
private val name: String?,
private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
public constructor(
handler: Handler,
name: String? = null
) : this(handler, name, false)

//...

override fun dispatch(context: CoroutineContext, block: Runnable) {
// 利用主线程的 Handler 执行任务
handler.post(block)
}

override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
// 利用主线程的 Handler 延迟执行任务,将完成的 continuation 放在任务中执行
val block = Runnable {
with(continuation) { resumeUndispatched(Unit) }
}
handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))
continuation.invokeOnCancellation { handler.removeCallbacks(block) }
}

//..
}

重点来了,调度任务最后竟然交给了主线程的 Handler,其实想想也对,主线程的任务最后一般都会交给主线程的 Handler。

好奇的同学可能问了,如果不是主线程呢?不是主线程就利用的线程池:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Kotlin复制代码public open class ExperimentalCoroutineDispatcher(
private val corePoolSize: Int,
private val maxPoolSize: Int,
private val idleWorkerKeepAliveNs: Long,
private val schedulerName: String = "CoroutineScheduler"
) : ExecutorCoroutineDispatcher() {
// 执行期
override val executor: Executor
get() = coroutineScheduler
private var coroutineScheduler = createScheduler()

override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
try {
coroutineScheduler.dispatch(block)
} catch (e: RejectedExecutionException) {
DefaultExecutor.dispatch(context, block)
}
}

结果可以说是很清晰了,coroutineScheduler 是一个线程池,如果像了解具体的过程,同学们可以自行查看代码。

读到这里,你可能有一点明白 CoroutineContext 为什么要设计成一种数据结构:

  1. coroutineContext[ContinuationInterceptor] 就可以直接取到当前协程的拦截器,并且一个协程只能对应一个调度器。
  2. 调度器都放在其他 coroutineContext 的前面,所以在执行协程的时候,可以做拦截处理。

同理,我们也可以使用 coroutineContext[Job] 获取当前协程。

第六步 resumeWith

再次回到:

1
2
3
4
5
6
7
8
9
Kotlin复制代码internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
runSafely(completion) {
// 外面再包一层 Coroutine
createCoroutineUnintercepted(receiver, completion)
// 如果需要,做拦截处理
.intercepted()
// 调用 resumeWith 方法
.resumeCancellableWith(Result.success(Unit))
}

现在我们看 Continue#resumeCancellableWith() 方法,它是一个扩展方法,里面的调度逻辑是:

  1. DispatchContinuation#resumeCancellableWith
  2. CoroutineDispatcher#dispatch
  3. Continuation#resumeWith

这里的 Continuation 就是 SuspendLambda,它继承了 BaseContinuationImpl,我们看一下它的实现方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
Kotlin复制代码internal abstract class BaseContinuationImpl(
public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
// This implementation is final. This fact is used to unroll resumeWith recursion.
public final override fun resumeWith(result: Result<Any?>) {
// This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
var current = this
var param = result
while (true) {
// Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
// can precisely track what part of suspended callstack was already resumed
probeCoroutineResumed(current)
with(current) {
val completion = completion!! // fail fast when trying to resume continuation without completion
val outcome: Result<Any?> =
try {
// 1. 执行 suspend 里面的代码块
val outcome = invokeSuspend(param)
// 2. 如果代码块里面执行了挂起方法,会提前返回
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted() // this state machine instance is terminating
if (completion is BaseContinuationImpl) {
// 3. 如果完成的completion也是BaseContinuationImpl,就会进入循环
current = completion
param = outcome
} else {
// 4. 执行 completion resumeWith 方法
completion.resumeWith(outcome)
return
}
}
}
}
}

这边被我分为2个部分:

  • 执行 suspend 方法,并获取结果
  • 调用 complete(放在下一步讲)

执行suspend方法

在第一处会先执行 suspend 修饰的方法内容,在方法里面可能又会调度 suspend 方法,比如说我们的实例方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Kotlin复制代码lifecycleScope.launch(Dispatchers.Main) {
val a = async { getResult(1, 2) }
val b = async { getResult(3, 5) }

val c = a.await() + b.await()
Log.e(TAG, "result:$c")
}

suspend fun getResult(a: Int, b: Int): Int {
return withContext(Dispatchers.IO) {
delay(1000)
return@withContext a + b
}
}

因为我们在 getResult 执行了延时操作,所以我们 launch 方法肯定执行了耗时挂起方法,所以 BaseContinuationImpl#invokeSuspend 方法会返回一个 COROUTINE_SUSPENDED ,结果你也看到了,该方法会提前结束。(说明一下,我没有找到BaseContinuationImpl#invokeSuspend 方法的具体实现,我猜可能跟编译器有关)

我猜你肯定跟我一样好奇,遇到耗时挂起会提前返回,那么耗时挂起如何对 complete 进行恢复的?

我们看一下 delay(1000) 这个延时操作在主线程是如何处理的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
Kotlin复制代码public suspend fun delay(timeMillis: Long) {
if (timeMillis <= 0) return // don't delay
return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
}
}


internal class HandlerContext private constructor(
private val handler: Handler,
private val name: String?,
private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
//...

override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
val block = Runnable {
with(continuation) { resumeUndispatched(Unit) }
}
handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))
continuation.invokeOnCancellation { handler.removeCallbacks(block) }
}

//...
}

可以看到,将恢复任务包了一个 Runnable,交给 Handler 的 Handler#postDelayed() 方法了。

第七步 complete resumeWith

对于 complete 的处理一般会有两种。

complete是BaseContinuationImpl

第一种情况是我们称之为套娃,完成回调的 Continuation 它本身也有自己的完成回调 Continuation,接下来循环就对了。

调用complete的resumeWith

第二种情况,就是通过 complete 去完成回调,由于 complete 是 AbstractContinuation,我们看一下它的 resumeWith:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
Kotlin复制代码public abstract class AbstractCoroutine<in T>(
/**
* The context of the parent coroutine.
*/
@JvmField
protected val parentContext: CoroutineContext,
active: Boolean = true
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
// ...
public final override fun resumeWith(result: Result<T>) {
// 1. 获取当前协程的技术状态
val state = makeCompletingOnce(result.toState())
// 2. 如果当前还在等待完成,说明还有子协程没有结束
if (state === COMPLETING_WAITING_CHILDREN) return
// 3. 执行结束恢复的方法,默认为空
afterResume(state)
}

// 这是父类 JobSupport 中的 makeCompletingOnce 方法
// 为了方便查看,我复制过来
internal fun makeCompletingOnce(proposedUpdate: Any?): Any? {
loopOnState { state ->
// tryMakeCompleting 的内容主要根据是否有子Job做不同处理
val finalState = tryMakeCompleting(state, proposedUpdate)
when {
finalState === COMPLETING_ALREADY ->
throw IllegalStateException(
"Job $this is already complete or completing, " +
"but is being completed with $proposedUpdate", proposedUpdate.exceptionOrNull
)
finalState === COMPLETING_RETRY -> return@loopOnState
else -> return finalState // COMPLETING_WAITING_CHILDREN or final state
}
}
}
}

这段代码的意思其实也很简单,就是协程即将完成,得先评估一下协程的技术状态,别协程还有东西在运行,就给结束了。对于一些有子协程的一些协程,会等待子协程结束的时候,才会结束当前协程。

一个 launch 的过程大概就是这样了。大致的流程图是这样的:

launch

下面我们再谈谈 async。

四、关于async

async 和 launch 的代码相似度很高:

1
2
3
4
5
6
7
8
9
10
11
12
Kotlin复制代码public fun <T> CoroutineScope.async(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> T
): Deferred<T> {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyDeferredCoroutine(newContext, block) else
DeferredCoroutine<T>(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}

最终也会进行三步走:

1
2
3
4
5
6
7
8
9
Kotlin复制代码internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
runSafely(completion) {
// 外面再包一层 Coroutine
createCoroutineUnintercepted(receiver, completion)
// 如果需要,做拦截处理
.intercepted()
// 调用 resumeWith 方法
.resumeCancellableWith(Result.success(Unit))
}

不同的是,async 返回的是一个 Deferred<T>,我们需要调用 Deferred#await() 去获取返回结果,它的实现在 JobSupport:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Kotlin复制代码private open class DeferredCoroutine<T>(
parentContext: CoroutineContext,
active: Boolean
) : AbstractCoroutine<T>(parentContext, active), Deferred<T>, SelectClause1<T> {
// ... awaitInternal方法来自父类 JobSupport
override suspend fun await(): T = awaitInternal() as T
// ...

// 这是 JobSupport 中的实现
internal suspend fun awaitInternal(): Any? {
// 循环获取结果
while (true) { // lock-free loop on state
val state = this.state
// 1. 如果处于完成状态
if (state !is Incomplete) {
if (state is CompletedExceptionally) { // Slow path to recover stacktrace
recoverAndThrow(state.cause)
}
return state.unboxState()
}
// 2. 除非需要重试,不然就 break
if (startInternal(state) >= 0) break
}
// 等待挂起的方法
return awaitSuspend() // slow-path
}
}

它的具体过程可以从我的注释看出,就不一一介绍了,感兴趣的同学可以查看源码。

1. 本文一开始的讨论

本文一开始的代码是错的,连编译器都过不了,尴尬~

正确的代码应该是:

1
2
3
4
5
6
7
8
9
10
11
12
Kotlin复制代码GlobalScope.launch {
val a = async {
1+2
}

val b = async {
1+3
}

val c = a.await() + bawait()
Log.e(TAG,"result:$c")
}

如果是正确的代码,这里可能分两种情况:

如果你放在UI线程,那肯定是串行的,这时候有人说,我在 a 里使用 delay(1000),在 b 里使用 delay(2000),得到 c 的时候就花了 2000 毫秒啊,这不是并行吗?事情并不是这样的,delay 操作使用了 Handler#postDelay 方法,一个延迟了 1000 毫秒执行,一个延迟了 2000 毫秒执行,但是主线程只有一个,所以只能是串行。

如果是子线程,通常都是并行的,因为我们使用了线程池啊~

总结

写这边源码分析的时候,一些细节总是找不到,比如说 suspendLambda 的子类找不到,自己对 Kotlin 的学习有待深入。

菜

所以本文有些地方还值得商榷,如果你有更好的理解,欢迎下方交流。

前段时间自己生了一点小病,每周都往医院跑,所以导致整个人都很丧,写博客的效率也很低,这篇文章写了快一个月,哎,太难了~,不过,这周开始就恢复正常。

GO

文章参考:

《破解 Kotlin 协程(3) - 协程调度篇》

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

啧啧 都看看 Go 都干了些啥 🏆 技术专题第二期 Go

发表于 2020-08-19

Go 语言都能做些什么呢

大家好,我是好家伙,漫画 Go 语言小册的作者。这次我要给大家带来一些关于Go语言的一些分享。作为一个程序员,身处这个时代,掌握一门技术,就如同修得一身好武功一样,不仅要经过不断地实战经验,还要有足够的热情,循序渐进才能到达高手的境界。在众多开发语言中,如何才能找到一个适合自己修炼的武功秘籍。那就得对这个语言有所了解。知道它的威力,才能更好的掌握,成为高手就指日可待。

Go 语言作为一门后端语言,到底能够有多大威力,具体能够做些什么,到底能够发出什么样的招式才能够吸引你学习它?那么接下来就简单露几招,还望各位大佬们指教!

gocv 计算机视觉


OpenCV是一个基于BSD许可(开源)发行的跨平台计算机视觉和机器学习软件库,可以运行在Linux、Windows、Android和Mac OS操作系统上。实现了图像处理和计算机视觉方面的很多通用算法。
OpenCV用C++语言编写,它具有C ++,Python,Java和MATLAB接口,并支持Windows,Linux,Android和Mac OS,OpenCV主要倾向于实时视觉应用, 如今也提供对于C#、Ch、Ruby,GO的支持也就是gocv。https://gocv.io/

  • 安装 gocv go get -u -d gocv.io/x/gocv
  • 安装 MinGW-W64 https://sourceforge.net/projects/mingw-w64/files/Toolchains%20targetting%20Win32/Personal%20Builds/mingw-builds/7.3.0/
  • 安装CMake https://cmake.org/download/
  • 安装编译gocv
1
2
go复制代码chdir %GOPATH%\src\gocv.io\x\gocv
win_build_opencv.cmd

gocv将图片二值化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
go复制代码package main

import (
"gocv.io/x/gocv"
)

//测试示例
func main() {
filename := "test.png"
window := gocv.NewWindow("Hello")
img := gocv.IMRead(filename, gocv.IMReadColor)
destImage := gocv.NewMat()
//二值化 转为灰度图
gocv.CvtColor(img, &destImage, gocv.ColorBGRToGray)

for {
window.IMShow(destImage)
if window.WaitKey(1) >= 0 {
break
}
}
}

gocv 人脸检测

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
go复制代码package main

import (
"fmt"
"image"
"image/color"
"os"

"gocv.io/x/gocv"
)

func main() {
if len(os.Args) < 3 {
fmt.Println("How to run:\n\tfacedetect [camera ID] [classifier XML file]")
return
}

// parse args
deviceID := os.Args[1]
xmlFile := os.Args[2]

// open webcam
webcam, err := gocv.OpenVideoCapture(deviceID)
if err != nil {
fmt.Printf("error opening video capture device: %v\n", deviceID)
return
}
defer webcam.Close()

// open display window
window := gocv.NewWindow("Face Detect")
defer window.Close()

// prepare image matrix
img := gocv.NewMat()
defer img.Close()

// color for the rect when faces detected
blue := color.RGBA{0, 0, 255, 0}

// load classifier to recognize faces
classifier := gocv.NewCascadeClassifier()
defer classifier.Close()

if !classifier.Load(xmlFile) {
fmt.Printf("Error reading cascade file: %v\n", xmlFile)
return
}

fmt.Printf("Start reading device: %v\n", deviceID)
for {
if ok := webcam.Read(&img); !ok {
fmt.Printf("Device closed: %v\n", deviceID)
return
}
if img.Empty() {
continue
}

// detect faces
rects := classifier.DetectMultiScale(img)
fmt.Printf("found %d faces\n", len(rects))

// draw a rectangle around each face on the original image,
// along with text identifing as "Human"
for _, r := range rects {
gocv.Rectangle(&img, r, blue, 3)

size := gocv.GetTextSize("Human", gocv.FontHersheyPlain, 1.2, 2)
pt := image.Pt(r.Min.X+(r.Min.X/2)-(size.X/2), r.Min.Y-2)
gocv.PutText(&img, "Human", pt, gocv.FontHersheyPlain, 1.2, blue, 2)
}

// show the image in the window, and wait 1 millisecond
window.IMShow(img)
if window.WaitKey(1) >= 0 {
break
}
}
}

gobot 机器人


gobot 是一个使用Go语言编写的适用于机器人,无人机和物联网的IoT框架。https://gobot.io/

  • 支持35不同平台。
  • 支持输入/输出(GPIO)通讯的设备。
  • 模拟I / O(AIO)驱动程序。
  • I2C驱动程序。
  • SPI驱动器。

树莓派

Raspberry Pi 中文名为树莓派,简写为RPi 只有信用卡大小的微型电脑,其系统基于Linux,它是一款基于ARM的微型电脑主板。可连接键盘、鼠标和网线,具备PC的基本功能。最新版本的树莓派4B,拥有4G内存,引入USB 3.0,支持双屏4K输出,CPU和GPU的速度也更快。

运行在树莓派上的Go程序实现坦克机器人。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
go复制代码package drivers

// import "fmt"

type Motor struct {
PWMA int
AIN1 int
AIN2 int
BIN1 int
BIN2 int
PWMB int
Driver *PCA9685Driver
Debug bool
}

type MotorPosition int

const (
MotorPosition_Left MotorPosition = 0
MotorPosition_Right MotorPosition = 1
MotorPosition_All MotorPosition = -1
)

type MotorDirection int

const (
MotorDirection_Forward MotorDirection = 1
MotorDirection_Backward MotorDirection = -1
)

func NewMotor(driver *PCA9685Driver) *Motor {
motor := &Motor{
PWMA: 0,
AIN1: 1,
AIN2: 2,
BIN1: 3,
BIN2: 4,
PWMB: 5,
Driver: driver,
Debug: false,
}
var PWM float32 = 125
// fmt.Println("设置电机PWM速率:", PWM)
driver.SetPWMFreq(PWM)
return motor
}

// 前进
func (this *Motor) Forward(speed int) {
this.Run(MotorPosition_All, MotorDirection_Forward, speed)
}

//后退
func (this *Motor) Backward(speed int) {
this.Run(MotorPosition_All, MotorDirection_Backward, speed)
}

//左转,原地转向
func (this *Motor) Left(speed int) {
this.Run(MotorPosition_Left, MotorDirection_Backward, speed)
this.Run(MotorPosition_Right, MotorDirection_Forward, speed)
}

//右转,原地转向
func (this *Motor) Right(speed int) {
this.Run(MotorPosition_Right, MotorDirection_Backward, speed)
this.Run(MotorPosition_Left, MotorDirection_Forward, speed)
}

//直接操作电机运转
func (this *Motor) Run(motor MotorPosition, direction MotorDirection, speed int) {
if speed > 100 {
speed = 100
}
//同时操作所有电机
if motor == MotorPosition_All {
this.Run(MotorPosition_Left, direction, speed)
this.Run(MotorPosition_Right, direction, speed)
return
}
//设置默认PWM调速通道为电机a的调速通道
pwmChannel := this.PWMA
//设置默认操作电机为A
PIN1 := this.AIN1
PIN2 := this.AIN2
//根据参数设置操作电机和调速通道为电机B
if motor == MotorPosition_Right {
pwmChannel = this.PWMB
PIN1 = this.BIN1
PIN2 = this.BIN2
}

//如果参数为后退,翻转PIN1和PIN2的位置
if direction == MotorDirection_Backward {
PIN1, PIN2 = PIN2, PIN1
}
// fmt.Println("pwmChannel:", pwmChannel, ",speed:", speed, ",PIN1:", PIN1, ",PIN2:", PIN2)
//设置速度
this.Driver.SetPWM(pwmChannel, 0, uint16(speed*(4096/100)))

//设置正电极
this.Driver.SetPWM(PIN1, 0, 4095)

//设置负电极
this.Driver.SetPWM(PIN2, 0, 0)

}

// func (this *Motor) SetPWM(channel int, on uint16, off uint16) (err error) {
// this.Driver
// if _, err := p.connection.Write([]byte{byte(PCA9685_LED0_ON_L + 4*channel), byte(on), byte(on >> 8), byte(off), byte(off >> 8)}); err != nil {
// return err
// }

// return
// }

func (this *Motor) MotorStop(motor MotorPosition) {
if motor == MotorPosition_All {
this.MotorStop(MotorPosition_Left)
this.MotorStop(MotorPosition_Right)
return
}
//设置默认PWM调速通道为电机a的调速通道
pwmChannel := this.PWMA
if motor == MotorPosition_Right {
pwmChannel = this.PWMB
}
//设置速度
this.Driver.SetPWM(pwmChannel, 0, 0)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
go复制代码package core

import (
"fmt"
"gobot.io/x/gobot"
"airobot/robot/drivers"
"gobot.io/x/gobot/platforms/raspi"
)

//机器人主结构,存储gobot实例和各个初始化的驱动以及操作器
type Robot struct {
//gobot实例
gobot *gobot.Robot
//主板适配器
adaptor *raspi.Adaptor
//电机操作板驱动
motorDriver *drivers.PCA9685Driver
//电机操作器
motor *drivers.Motor
//电机操作指令列队
motorCmds chan (*CMDData)
}

//初始化机器人监听
func (this *Robot) Start() {
this.motorCmds = make(chan (*CMDData), 100)
go this.listenMotorCmds()
}

func (this *Robot) listenMotorCmds() {
m := NewMotor(this)
wait:
cmd := <-this.motorCmds
m.execute(cmd)
goto wait
}

//机器人状态通道,用于通知webapi启动,启动过程中通道会压入0,启动完成后压入1.
var RobotState = make(chan int, 1)

//机器人操作指令主通道,通过websocket接收指令并压入此通道
var RobotCMD = make(chan *CMD, 100)

//全局机器人实例
var bot *Robot

//机器人启动方法
func StartRobot() {

bot = &Robot{}

fmt.Println("机器人开始启动.")
RobotState <- 0
fmt.Println("初始化主板适配器.")
bot.adaptor = raspi.NewAdaptor()
fmt.Println("初始化电机驱动.")
bot.motorDriver = drivers.NewPCA9685Driver(bot.adaptor)
bot.Start()
work := func() {
fmt.Println("初始化电机操作器.")
bot.motor = drivers.NewMotor(bot.motorDriver)
// var PWM float32 = 125
// fmt.Println("设置电机PWM速率:", PWM)
// bot.motorDriver.SetPWMFreq(PWM)
fmt.Println("机器人启动完成.")
fmt.Println("机器人开始等待指令.")
RobotState <- 1
begin:
cmd := <-RobotCMD
// fmt.Println("接收到指令:", cmd)

switch cmd.Channel {
case "motor":
bot.motorCmds <- &cmd.Data
break
}
goto begin
}

robot := gobot.NewRobot("tankBot",
[]gobot.Connection{bot.adaptor},
[]gobot.Device{bot.motorDriver},
work,
)
robot.Start()
}


视频直播服务

livego 使用纯 go 语言写的一款简单高效的直播服务器。https://github.com/gwuhaolin/livego

  • 支持的传输协议 RTMP, AMF,HLS,HTTP-FLV
  • 支持的容器格式 FLV,TS
  • 支持的编码格式 H264,AAC, MP3

使用

  • 1 启动服务:执行 livego 二进制文件启动 livego 服务;
  • 2 访问 http://localhost:8090/control/get?room=movie 获取一个房间的channelkey(channelkey用于推流,movie用于播放).
  • 3 推流: 通过RTMP协议推送视频流到地址 rtmp://localhost:1935/{appname}/{channelkey} (appname默认是live)

例如: 使用 ffmpeg -re -i demo.flv -c copy -f flv

1
go复制代码rtmp://localhost:1935/{appname}/{channelkey} 推流(下载demo flv);

播放: 支持多种播放协议,播放地址如下:
RTMP:rtmp://localhost:1935/{appname}/movie
FLV:http://127.0.0.1:7001/{appname}/movie.flv
HLS:http://127.0.0.1:7002/{appname}/movie.m3u8

1
2
go复制代码//ffmpeg命令 推流
ffmpeg -re -i demo.flv -c copy -f flv rtmp://localhost:1935/{appname}/{channelkey}

文档地址:https://github.com/gwuhaolin/livego/blob/master/README_cn.md

CRON 定时任务


go语言支持cron定时任务 https://github.com/jakecoffman/cron

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
go复制代码package main

import (
"fmt"

"github.com/jakecoffman/cron"
)

func main() {
c := cron.New()
//每天早上6点执行Testfun函数
c.AddFunc("0 0 6 * * ? ", Testfunc, "定时任务")
c.Start()
}

func Testfunc() {
fmt.Println("定时任务")
//TODO 处理逻辑....
}

Excelize 一个 Go 语言版本的 Excel 文档 API


Go 语言编写的用于操作 Office Excel 文档基础库,基于 ECMA-376,ISO/IEC 29500 国际标准。可以使用它来读取、写入由 Microsoft Excel™ 2007 及以上版本创建的电子表格文档。支持 XLSX / XLSM / XLTM 等多种文档格式,高度兼容带有样式、图片(表)、透视表、切片器等复杂组件的文档,并提供流式读写 API

  • 文档地址:https://xuri.me/excelize/zh-hans/
  • 项目地址:https://github.com/360EntSecGroup-Skylar/excelize
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
go复制代码package main

import (
"fmt"
"github.com/360EntSecGroup-Skylar/excelize"
)

func main() {
f := excelize.NewFile()
// 创建一个 sheet
index := f.NewSheet("Sheet1")
// 设置值
f.SetCellValue("Sheet2", "A2", "Hello world.")
f.SetCellValue("Sheet1", "B2", 100)
// 设置工作簿的活动工作表。
f.SetActiveSheet(index)
// 保存excel
if err := f.SaveAs("Book1.xlsx"); err != nil {
fmt.Println(err)
}
}

🏆 技术专题第二期 | 我与 Go 的那些事……

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

2w字 + 40张图带你参透并发编程!

发表于 2020-08-19

并发历史

在计算机最早期的时候,没有操作系统,执行程序只需要一种方式,那就是从头到尾依次执行。任何资源都会为这个程序服务,在计算机使用某些资源时,其他资源就会空闲,就会存在 浪费资源 的情况。

这里说的浪费资源指的是资源空闲,没有充分使用的情况。

操作系统的出现为我们的程序带来了 并发性,操作系统使我们的程序能够同时运行多个程序,一个程序就是一个进程,也就相当于同时运行多个进程。

操作系统是一个并发系统,并发性是操作系统非常重要的特征,操作系统具有同时处理和调度多个程序的能力,比如多个 I/O 设备同时在输入输出;设备 I/O 和 CPU 计算同时进行;内存中同时有多个系统和用户程序被启动交替、穿插地执行。操作系统在协调和分配进程的同时,操作系统也会为不同进程分配不同的资源。

操作系统实现多个程序同时运行解决了单个程序无法做到的问题,主要有下面三点

  • 资源利用率,我们上面说到,单个进程存在资源浪费的情况,举个例子,当你在为某个文件夹赋予权限的时候,输入程序无法接受外部的输入字符,只有等到权限赋予完毕后才能接受外部输入。总的来讲,就是在等待程序时无法执行其他工作。如果在等待程序时可以运行另一个程序,那么将会大大提高资源的利用率。(资源并不会觉得累)因为它不会划水~
  • 公平性,不同的用户和程序都能够使用计算机上的资源。一种高效的运行方式是为不同的程序划分时间片来使用资源,但是有一点需要注意,操作系统可以决定不同进程的优先级。虽然每个进程都有能够公平享有资源的权利,但是当有一个进程释放资源后的同时有一个优先级更高的进程抢夺资源,就会造成优先级低的进程无法获得资源,进而导致进程饥饿。
  • 便利性,单个进程是是不用通信的,通信的本质就是信息交换,及时进行信息交换能够避免信息孤岛,做重复性的工作;任何并发能做的事情,单进程也能够实现,只不过这种方式效率很低,它是一种顺序性的。

但是,顺序编程(也称为串行编程)也不是一无是处的,串行编程的优势在于其直观性和简单性,客观来讲,串行编程更适合我们人脑的思考方式,但是我们并不会满足于顺序编程,we want it more!!! 。资源利用率、公平性和便利性促使着进程出现的同时,也促使着线程的出现。

如果你还不是很理解进程和线程的区别的话,那么我就以我多年操作系统的经验(吹牛逼,实则半年)来为你解释一下:进程是一个应用程序,而线程是应用程序中的一条顺序流。

进程中会有多个线程来完成一些任务,这些任务有可能相同有可能不同。每个线程都有自己的执行顺序。

每个线程都有自己的栈空间,这是线程私有的,还有一些其他线程内部的和线程共享的资源,如下所示。

在计算机中,一般堆栈指的就是栈,而堆指的才是堆

线程会共享进程范围内的资源,例如内存和文件句柄,但是每个线程也有自己私有的内容,比如程序计数器、栈以及局部变量。下面汇总了进程和线程共享资源的区别

线程是一种轻量级的进程,轻量级体现在线程的创建和销毁要比进程的开销小很多。

注意:任何比较都是相对的。

在大多数现代操作系统中,都以线程为基本的调度单位,所以我们的视角着重放在对线程的探究。

线程

什么是多线程

多线程意味着你能够在同一个应用程序中运行多个线程,我们知道,指令是在 CPU 中执行的,多线程应用程序就像是具有多个 CPU 在同时执行应用程序的代码。

其实这是一种假象,线程数量并不等于 CPU 数量,单个 CPU 将在多个线程之间共享 CPU 的时间片,在给定的时间片内执行每个线程之间的切换,每个线程也可以由不同的 CPU 执行,如下图所示

并发和并行的关系

并发意味着应用程序会执行多个的任务,但是如果计算机只有一个 CPU 的话,那么应用程序无法同时执行多个的任务,但是应用程序又需要执行多个任务,所以计算机在开始执行下一个任务之前,它并没有完成当前的任务,只是把状态暂存,进行任务切换,CPU 在多个任务之间进行切换,直到任务完成。如下图所示

并行是指应用程序将其任务分解为较小的子任务,这些子任务可以并行处理,例如在多个CPU上同时进行。

优势和劣势

合理使用线程是一门艺术,合理编写一道准确无误的多线程程序更是一门艺术,如果线程使用得当,能够有效的降低程序的开发和维护成本。

Java 很好的在用户空间实现了开发工具包,并在内核空间提供系统调用来支持多线程编程,Java 支持了丰富的类库 java.util.concurrent 和跨平台的内存模型,同时也提高了开发人员的门槛,并发一直以来是一个高阶的主题,但是现在,并发也成为了主流开发人员的必备素质。

虽然线程带来的好处很多,但是编写正确的多线程(并发)程序是一件极困难的事情,并发程序的 Bug 往往会诡异地出现又诡异的消失,在当你认为没有问题的时候它就出现了,难以定位 是并发程序的一个特征,所以在此基础上你需要有扎实的并发基本功。那么,并发为什么会出现呢?

并发为什么会出现

计算机世界的快速发展离不开 CPU、内存和 I/O 设备的高速发展,但是这三者一直存在速度差异性问题,我们可以从存储器的层次结构可以看出

CPU 内部是寄存器的构造,寄存器的访问速度要高于高速缓存,高速缓存的访问速度要高于内存,最慢的是磁盘访问。

程序是在内存中执行的,程序里大部分语句都要访问内存,有些还需要访问 I/O 设备,根据漏桶理论来说,程序整体的性能取决于最慢的操作也就是磁盘访问速度。

因为 CPU 速度太快了,所以为了发挥 CPU 的速度优势,平衡这三者的速度差异,计算机体系机构、操作系统、编译程序都做出了贡献,主要体现为:

  • CPU 使用缓存来中和和内存的访问速度差异
  • 操作系统提供进程和线程调度,让 CPU 在执行指令的同时分时复用线程,让内存和磁盘不断交互,不同的 CPU 时间片 能够执行不同的任务,从而均衡这三者的差异
  • 编译程序提供优化指令的执行顺序,让缓存能够合理的使用

我们在享受这些便利的同时,多线程也为我们带来了挑战,下面我们就来探讨一下并发问题为什么会出现以及多线程的源头是什么

线程带来的安全性问题

线程安全性是非常复杂的,在没有采用同步机制的情况下,多个线程中的执行操作往往是不可预测的,这也是多线程带来的挑战之一,下面我们给出一段代码,来看看安全性问题体现在哪

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
java复制代码public class TSynchronized implements Runnable{

static int i = 0;

public void increase(){
i++;
}


@Override
public void run() {
for(int i = 0;i < 1000;i++) {
increase();
}
}

public static void main(String[] args) throws InterruptedException {

TSynchronized tSynchronized = new TSynchronized();
Thread aThread = new Thread(tSynchronized);
Thread bThread = new Thread(tSynchronized);
aThread.start();
bThread.start();
System.out.println("i = " + i);
}
}

这段程序输出后会发现,i 的值每次都不一样,这不符合我们的预测,那么为什么会出现这种情况呢?我们先来分析一下程序的运行过程。

TSynchronized 实现了 Runnable 接口,并定义了一个静态变量 i,然后在 increase 方法中每次都增加 i 的值,在其实现的 run 方法中进行循环调用,共执行 1000 次。

可见性问题

在单核 CPU 时代,所有的线程共用一个 CPU,CPU 缓存和内存的一致性问题容易解决,CPU 和 内存之间

如果用图来表示的话我想会是下面这样

在多核时代,因为有多核的存在,每个核都能够独立的运行一个线程,每颗 CPU 都有自己的缓存,这时 CPU 缓存与内存的数据一致性就没那么容易解决了,当多个线程在不同的 CPU 上执行时,这些线程操作的是不同的 CPU 缓存

因为 i 是静态变量,没有经过任何线程安全措施的保护,多个线程会并发修改 i 的值,所以我们认为 i 不是线程安全的,导致这种结果的出现是由于 aThread 和 bThread 中读取的 i 值彼此不可见,所以这是由于 可见性 导致的线程安全问题。

####原子性问题

看起来很普通的一段程序却因为两个线程 aThread 和 bThread 交替执行产生了不同的结果。但是根源不是因为创建了两个线程导致的,多线程只是产生线程安全性的必要条件,最终的根源出现在 i++ 这个操作上。

这个操作怎么了?这不就是一个给 i 递增的操作吗?也就是 i++ => i = i + 1,这怎么就会产生问题了?

因为 i++ 不是一个 原子性 操作,仔细想一下,i++ 其实有三个步骤,读取 i 的值,执行 i + 1 操作,然后把 i + 1 得出的值重新赋给 i(将结果写入内存)。

当两个线程开始运行后,每个线程都会把 i 的值读入到 CPU 缓存中,然后执行 + 1 操作,再把 + 1 之后的值写入内存。因为线程间都有各自的虚拟机栈和程序计数器,他们彼此之间没有数据交换,所以当 aThread 执行 + 1 操作后,会把数据写入到内存,同时 bThread 执行 + 1 操作后,也会把数据写入到内存,因为 CPU 时间片的执行周期是不确定的,所以会出现当 aThread 还没有把数据写入内存时,bThread 就会读取内存中的数据,然后执行 + 1操作,再写回内存,从而覆盖 i 的值,导致 aThread 所做的努力白费。

为什么上面的线程切换会出现问题呢?

我们先来考虑一下正常情况下(即不会出现线程安全性问题的情况下)两条线程的执行顺序

可以看到,当 aThread 在执行完整个 i++ 的操作后,操作系统对线程进行切换,由 aThread -> bThread,这是最理想的操作,一旦操作系统在任意 读取/增加/写入 阶段产生线程切换,都会产生线程安全问题。例如如下图所示

最开始的时候,内存中 i = 0,aThread 读取内存中的值并把它读取到自己的寄存器中,执行 +1 操作,此时发生线程切换,bThread 开始执行,读取内存中的值并把它读取到自己的寄存器中,此时发生线程切换,线程切换至 aThread 开始运行,aThread 把自己寄存器的值写回到内存中,此时又发生线程切换,由 aThread -> bThread,线程 bThread 把自己寄存器的值 +1 然后写回内存,写完后内存中的值不是 2 ,而是 1, 内存中的 i 值被覆盖了。

我们上面提到 原子性 这个概念,那么什么是原子性呢?

并发编程的原子性操作是完全独立于任何其他进程运行的操作,原子操作多用于现代操作系统和并行处理系统中。

原子操作通常在内核中使用,因为内核是操作系统的主要组件。但是,大多数计算机硬件,编译器和库也提供原子性操作。

在加载和存储中,计算机硬件对存储器字进行读取和写入。为了对值进行匹配、增加或者减小操作,一般通过原子操作进行。在原子操作期间,处理器可以在同一数据传输期间完成读取和写入。 这样,其他输入/输出机制或处理器无法执行存储器读取或写入任务,直到原子操作完成为止。

简单来讲,就是原子操作要么全部执行,要么全部不执行。数据库事务的原子性也是基于这个概念演进的。

有序性问题

在并发编程中还有带来让人非常头疼的 有序性 问题,有序性顾名思义就是顺序性,在计算机中指的就是指令的先后执行顺序。一个非常显而易见的例子就是 JVM 中的类加载

这是一个 JVM 加载类的过程图,也称为类的生命周期,类从加载到 JVM 到卸载一共会经历五个阶段 加载、连接、初始化、使用、卸载。这五个过程的执行顺序是一定的,但是在连接阶段,也会分为三个过程,即 验证、准备、解析 阶段,这三个阶段的执行顺序不是确定的,通常交叉进行,在一个阶段的执行过程中会激活另一个阶段。

有序性问题一般是编译器带来的,编译器有的时候确实是 好心办坏事,它为了优化系统性能,往往更换指令的执行顺序。

活跃性问题

多线程还会带来活跃性问题,如何定义活跃性问题呢?活跃性问题关注的是 某件事情是否会发生。

如果一组线程中的每个线程都在等待一个事件的发生,而这个事件只能由该组中正在等待的线程触发,这种情况会导致死锁。

简单一点来表述一下,就是每个线程都在等待其他线程释放资源,而其他资源也在等待每个线程释放资源,这样没有线程抢先释放自己的资源,这种情况会产生死锁,所有线程都会无限的等待下去。

死锁的必要条件

造成死锁的原因有四个,破坏其中一个即可破坏死锁

  • 互斥条件:指进程对所分配到的资源进行排它性使用,即在一段时间内某资源只由一个进程占用。如果此时还有其它进程请求资源,则请求者只能等待,直至占有资源的进程释放。
  • 请求和保持条件:指进程已经保持至少一个资源,但又提出了新的资源请求,而该资源已被其它进程占有,此时请求进程阻塞,但又对自己已获得的其它资源保持占有。
  • 不剥夺条件:指进程已获得的资源,在未使用完之前,不能被剥夺,只能在使用完时由自己释放。
  • 循环等待:指在发生死锁时,必然存在一个进程对应的环形链。

换句话说,死锁线程集合中的每个线程都在等待另一个死锁线程占有的资源。但是由于所有线程都不能运行,它们之中任何一个资源都无法释放资源,所以没有一个线程可以被唤醒。

如果说死锁很痴情的话,那么活锁用一则成语来表示就是 弄巧成拙。

某些情况下,当线程意识到它不能获取所需要的下一个锁时,就会尝试礼貌的释放已经获得的锁,然后等待非常短的时间再次尝试获取。可以想像一下这个场景:当两个人在狭路相逢的时候,都想给对方让路,相同的步调会导致双方都无法前进。

现在假想有一对并行的线程用到了两个资源。它们分别尝试获取另一个锁失败后,两个线程都会释放自己持有的锁,再次进行尝试,这个过程会一直进行重复。很明显,这个过程中没有线程阻塞,但是线程仍然不会向下执行,这种状况我们称之为 活锁(livelock)。

如果我们期望的事情一直不会发生,就会产生活跃性问题,比如单线程中的无限循环

1
2
3
java复制代码while(true){...}

for(;;){}

在多线程中,比如 aThread 和 bThread 都需要某种资源,aThread 一直占用资源不释放,bThread 一直得不到执行,就会造成活跃性问题,bThread 线程会产生饥饿,我们后面会说。

性能问题

与活跃性问题密切相关的是 性能 问题,如果说活跃性问题关注的是最终的结果,那么性能问题关注的就是造成结果的过程,性能问题有很多方面:比如服务时间过长,吞吐率过低,资源消耗过高,在多线程中这样的问题同样存在。

在多线程中,有一个非常重要的性能因素那就是我们上面提到的 线程切换,也称为 上下文切换(Context Switch),这种操作开销很大。

在计算机世界中,老外都喜欢用 context 上下文这个词,这个词涵盖的内容很多,包括上下文切换的资源,寄存器的状态、程序计数器等。context switch 一般指的就是这些上下文切换的资源、寄存器状态、程序计数器的变化等。

在上下文切换中,会保存和恢复上下文,丢失局部性,把大量的时间消耗在线程切换上而不是线程运行上。

为什么线程切换会开销如此之大呢?线程间的切换会涉及到以下几个步骤

将 CPU 从一个线程切换到另一线程涉及挂起当前线程,保存其状态,例如寄存器,然后恢复到要切换的线程的状态,加载新的程序计数器,此时线程切换实际上就已经完成了;此时,CPU 不在执行线程切换代码,进而执行新的和线程关联的代码。

引起线程切换的几种方式

线程间的切换一般是操作系统层面需要考虑的问题,那么引起线程上下文切换有哪几种方式呢?或者说线程切换有哪几种诱因呢?主要有下面几种引起上下文切换的方式

  • 当前正在执行的任务完成,系统的 CPU 正常调度下一个需要运行的线程
  • 当前正在执行的任务遇到 I/O 等阻塞操作,线程调度器挂起此任务,继续调度下一个任务。
  • 多个任务并发抢占锁资源,当前任务没有获得锁资源,被线程调度器挂起,继续调度下一个任务。
  • 用户的代码挂起当前任务,比如线程执行 sleep 方法,让出CPU。
  • 使用硬件中断的方式引起上下文切换

线程安全性

在 Java 中,要实现线程安全性,必须要正确的使用线程和锁,但是这些只是满足线程安全的一种方式,要编写正确无误的线程安全的代码,其核心就是对状态访问操作进行管理。最重要的就是最 共享(Shared)的 和 可变(Mutable)的状态。只有共享和可变的变量才会出现问题,私有变量不会出现问题,参考程序计数器。

对象的状态可以理解为存储在实例变量或者静态变量中的数据,共享意味着某个变量可以被多个线程同时访问、可变意味着变量在生命周期内会发生变化。一个变量是否是线程安全的,取决于它是否被多个线程访问。要使变量能够被安全访问,必须通过同步机制来对变量进行修饰。

如果不采用同步机制的话,那么就要避免多线程对共享变量的访问,主要有下面两种方式

  • 不要在多线程之间共享变量
  • 将共享变量置为不可变的

我们说了这么多次线程安全性,那么什么是线程安全性呢?

什么是线程安全性

多个线程可以同时安全调用的代码称为线程安全的,如果一段代码是安全的,那么这段代码就不存在 竞态条件。仅仅当多个线程共享资源时,才会出现竞态条件。

根据上面的探讨,我们可以得出一个简单的结论:当多个线程访问某个类时,这个类始终都能表现出正确的行为,那么就称这个类是线程安全的。

单线程就是一个线程数量为 1 的多线程,单线程一定是线程安全的。读取某个变量的值不会产生安全性问题,因为不管读取多少次,这个变量的值都不会被修改。

原子性

我们上面提到了原子性的概念,你可以把原子性操作想象成为一个不可分割 的整体,它的结果只有两种,要么全部执行,要么全部回滚。你可以把原子性认为是 婚姻关系 的一种,男人和女人只会产生两种结果,好好的 和 说散就散,一般男人的一生都可以把他看成是原子性的一种,当然我们不排除时间管理(线程切换)的个例,我们知道线程切换必然会伴随着安全性问题,男人要出去浪也会造成两种结果,这两种结果分别对应安全性的两个结果:线程安全(好好的)和线程不安全(说散就散)。

竞态条件

有了上面的线程切换的功底,那么竞态条件也就好定义了,它指的就是两个或多个线程同时对一共享数据进行修改,从而影响程序运行的正确性时,这种就被称为竞态条件(race condition) ,线程切换是导致竞态条件出现的诱导因素,我们通过一个示例来说明,来看一段代码

1
2
3
4
5
6
7
8
9
10
11
java复制代码public class RaceCondition {

private Signleton single = null;
public Signleton newSingleton(){
if(single == null){
single = new Signleton();
}
return single;
}

}

在上面的代码中,涉及到一个竞态条件,那就是判断 single 的时候,如果 single 判断为空,此时发生了线程切换,另外一个线程执行,判断 single 的时候,也是空,执行 new 操作,然后线程切换回之前的线程,再执行 new 操作,那么内存中就会有两个 Singleton 对象。

加锁机制

在 Java 中,有很多种方式来对共享和可变的资源进行加锁和保护。Java 提供一种内置的机制对资源进行保护:synchronized 关键字,它有三种保护机制

  • 对方法进行加锁,确保多个线程中只有一个线程执行方法;
  • 对某个对象实例(在我们上面的探讨中,变量可以使用对象来替换)进行加锁,确保多个线程中只有一个线程对对象实例进行访问;
  • 对类对象进行加锁,确保多个线程只有一个线程能够访问类中的资源。

synchronized 关键字对资源进行保护的代码块俗称 同步代码块(Synchronized Block),例如

1
2
3
java复制代码synchronized(lock){
// 线程安全的代码
}

每个 Java 对象都可以用做一个实现同步的锁,这些锁被称为 内置锁(Instrinsic Lock)或者 监视器锁(Monitor Lock)。线程在进入同步代码之前会自动获得锁,并且在退出同步代码时自动释放锁,而无论是通过正常执行路径退出还是通过异常路径退出,获得内置锁的唯一途径就是进入这个由锁保护的同步代码块或方法。

synchronized 的另一种隐含的语义就是 互斥,互斥意味着独占,最多只有一个线程持有锁,当线程 A 尝试获得一个由线程 B 持有的锁时,线程 A 必须等待或者阻塞,直到线程 B 释放这个锁,如果线程 B 不释放锁的话,那么线程 A 将会一直等待下去。

线程 A 获得线程 B 持有的锁时,线程 A 必须等待或者阻塞,但是获取锁的线程 B 可以重入,重入的意思可以用一段代码表示

1
2
3
4
5
6
7
8
9
10
java复制代码public class Retreent {

public synchronized void doSomething(){
doSomethingElse();
System.out.println("doSomething......");
}

public synchronized void doSomethingElse(){
System.out.println("doSomethingElse......");
}

获取 doSomething() 方法锁的线程可以执行 doSomethingElse() 方法,执行完毕后可以重新执行 doSomething() 方法中的内容。锁重入也支持子类和父类之间的重入,具体的我们后面会进行介绍。

volatile 是一种轻量级的 synchronized,也就是一种轻量级的加锁方式,volatile 通过保证共享变量的可见性来从侧面对对象进行加锁。可见性的意思就是当一个线程修改一个共享变量时,另外一个线程能够 看见 这个修改的值。volatile 的执行成本要比 synchronized 低很多,因为 volatile 不会引起线程的上下文切换。

我们还可以使用原子类 来保证线程安全,原子类其实就是 rt.jar 下面以 atomic 开头的类

除此之外,我们还可以使用 java.util.concurrent 工具包下的线程安全的集合类来确保线程安全,具体的实现类和其原理我们后面会说。

可以使用不同的并发模型来实现并发系统,并发模型说的是系统中的线程如何协作完成并发任务。不同的并发模型以不同的方式拆分任务,线程可以以不同的方式进行通信和协作。

竞态条件和关键区域

竞态条件是在关键代码区域发生的一种特殊条件。关键区域是由多个线程同时执行的代码部分,关键区域中的代码执行顺序会对造成不一样的结果。如果多个线程执行一段关键代码,而这段关键代码会因为执行顺序不同而造成不同的结果时,那么这段代码就会包含竞争条件。

并发模型和分布式系统很相似

并发模型其实和分布式系统模型非常相似,在并发模型中是线程彼此进行通信,而在分布式系统模型中是 进程 彼此进行通信。然而本质上,进程和线程也非常相似。这也就是为什么并发模型和分布式模型非常相似的原因。

分布式系统通常要比并发系统面临更多的挑战和问题比如进程通信、网络可能出现异常,或者远程机器挂掉等等。但是一个并发模型同样面临着比如 CPU 故障、网卡出现问题、硬盘出现问题等。

因为并发模型和分布式模型很相似,因此他们可以相互借鉴,例如用于线程分配的模型就类似于分布式系统环境中的负载均衡模型。

其实说白了,分布式模型的思想就是借鉴并发模型的基础上推演发展来的。

认识两个状态

并发模型的一个重要的方面是,线程是否应该共享状态,是具有共享状态还是独立状态。共享状态也就意味着在不同线程之间共享某些状态

状态其实就是数据,比如一个或者多个对象。当线程要共享数据时,就会造成 竞态条件 或者 死锁 等问题。当然,这些问题只是可能会出现,具体实现方式取决于你是否安全的使用和访问共享对象。

独立的状态表明状态不会在多个线程之间共享,如果线程之间需要通信的话,他们可以访问不可变的对象来实现,这是最有效的避免并发问题的一种方式,如下图所示

使用独立状态让我们的设计更加简单,因为只有一个线程能够访问对象,即使交换对象,也是不可变的对象。

并发模型

并行 Worker

第一个并发模型是并行 worker 模型,客户端会把任务交给 代理人(Delegator),然后由代理人把工作分配给不同的 工人(worker)。如下图所示

并行 worker 的核心思想是,它主要有两个进程即代理人和工人,Delegator 负责接收来自客户端的任务并把任务下发,交给具体的 Worker 进行处理,Worker 处理完成后把结果返回给 Delegator,在 Delegator 接收到 Worker 处理的结果后对其进行汇总,然后交给客户端。

并行 Worker 模型是 Java 并发模型中非常常见的一种模型。许多 java.util.concurrent 包下的并发工具都使用了这种模型。

并行 Worker 的优点

并行 Worker 模型的一个非常明显的特点就是很容易理解,为了提高系统的并行度你可以增加多个 Worker 完成任务。

并行 Worker 模型的另外一个好处就是,它会将一个任务拆分成多个小任务,并发执行,Delegator 在接受到 Worker 的处理结果后就会返回给 Client,整个 Worker -> Delegator -> Client 的过程是异步的。

并行 Worker 的缺点

同样的,并行 Worker 模式同样会有一些隐藏的缺点

共享状态会变得很复杂

实际的并行 Worker 要比我们图中画出的更复杂,主要是并行 Worker 通常会访问内存或共享数据库中的某些共享数据。

这些共享状态可能会使用一些工作队列来保存业务数据、数据缓存、数据库的连接池等。在线程通信中,线程需要确保共享状态是否能够让其他线程共享,而不是仅仅停留在 CPU 缓存中让自己可用,当然这些都是程序员在设计时就需要考虑的问题。线程需要避免 竞态条件,死锁 和许多其他共享状态造成的并发问题。

多线程在访问共享数据时,会丢失并发性,因为操作系统要保证只有一个线程能够访问数据,这会导致共享数据的争用和抢占。未抢占到资源的线程会 阻塞。

现代的非阻塞并发算法可以减少争用提高性能,但是非阻塞算法比较难以实现。

可持久化的数据结构(Persistent data structures) 是另外一个选择。可持久化的数据结构在修改后始终会保留先前版本。因此,如果多个线程同时修改一个可持久化的数据结构,并且一个线程对其进行了修改,则修改的线程会获得对新数据结构的引用。

虽然可持久化的数据结构是一个新的解决方法,但是这种方法实行起来却有一些问题,比如,一个持久列表会将新元素添加到列表的开头,并返回所添加的新元素的引用,但是其他线程仍然只持有列表中先前的第一个元素的引用,他们看不到新添加的元素。

持久化的数据结构比如 链表(LinkedList) 在硬件性能上表现不佳。列表中的每个元素都是一个对象,这些对象散布在计算机内存中。现代 CPU 的顺序访问往往要快的多,因此使用数组等顺序访问的数据结构则能够获得更高的性能。CPU 高速缓存可以将一个大的矩阵块加载到高速缓存中,并让 CPU 在加载后直接访问 CPU 高速缓存中的数据。对于链表,将元素分散在整个 RAM 上,这实际上是不可能的。

无状态的 worker

共享状态可以由其他线程所修改,因此,worker 必须在每次操作共享状态时重新读取,以确保在副本上能够正确工作。不在线程内部保持状态的 worker 成为无状态的 worker。

作业顺序是不确定的

并行工作模型的另一个缺点是作业的顺序不确定,无法保证首先执行或最后执行哪些作业。任务 A 在任务 B 之前分配给 worker,但是任务 B 可能在任务 A 之前执行。

流水线

第二种并发模型就是我们经常在生产车间遇到的 流水线并发模型,下面是流水线设计模型的流程图

这种组织架构就像是工厂中装配线中的 worker,每个 worker 只完成全部工作的一部分,完成一部分后,worker 会将工作转发给下一个 worker。

每道程序都在自己的线程中运行,彼此之间不会共享状态,这种模型也被称为无共享并发模型。

使用流水线并发模型通常被设计为非阻塞I/O,也就是说,当没有给 worker 分配任务时,worker 会做其他工作。非阻塞I/O 意味着当 worker 开始 I/O 操作,例如从网络中读取文件,worker 不会等待 I/O 调用完成。因为 I/O 操作很慢,所以等待 I/O 非常耗费时间。在等待 I/O 的同时,CPU 可以做其他事情,I/O 操作完成后的结果将传递给下一个 worker。下面是非阻塞 I/O 的流程图

在实际情况中,任务通常不会按着一条装配线流动,由于大多数程序需要做很多事情,因此需要根据完成的不同工作在不同的 worker 之间流动,如下图所示

任务还可能需要多个 worker 共同参与完成

响应式 - 事件驱动系统

使用流水线模型的系统有时也被称为 响应式 或者 事件驱动系统,这种模型会根据外部的事件作出响应,事件可能是某个 HTTP 请求或者某个文件完成加载到内存中。

Actor 模型

在 Actor 模型中,每一个 Actor 其实就是一个 Worker, 每一个 Actor 都能够处理任务。

简单来说,Actor 模型是一个并发模型,它定义了一系列系统组件应该如何动作和交互的通用规则,最著名的使用这套规则的编程语言是 Erlang。一个参与者Actor对接收到的消息做出响应,然后可以创建出更多的 Actor 或发送更多的消息,同时准备接收下一条消息。

Channels 模型

在 Channel 模型中,worker 通常不会直接通信,与此相对的,他们通常将事件发送到不同的 通道(Channel)上,然后其他 worker 可以在这些通道上获取消息,下面是 Channel 的模型图

有的时候 worker 不需要明确知道接下来的 worker 是谁,他们只需要将作者写入通道中,监听 Channel 的 worker 可以订阅或者取消订阅,这种方式降低了 worker 和 worker 之间的耦合性。

流水线设计的优点

与并行设计模型相比,流水线模型具有一些优势,具体优势如下

不会存在共享状态

因为流水线设计能够保证 worker 在处理完成后再传递给下一个 worker,所以 worker 与 worker 之间不需要共享任何状态,也就无需考虑并发问题。你甚至可以在实现上把每个 worker 看成是单线程的一种。

有状态 worker

因为 worker 知道没有其他线程修改自身的数据,所以流水线设计中的 worker 是有状态的,有状态的意思是他们可以将需要操作的数据保留在内存中,有状态通常比无状态更快。

更好的硬件整合

因为你可以把流水线看成是单线程的,而单线程的工作优势在于它能够和硬件的工作方式相同。因为有状态的 worker 通常在 CPU 中缓存数据,这样可以更快地访问缓存的数据。

使任务更加有效的进行

可以对流水线并发模型中的任务进行排序,一般用来日志的写入和恢复。

流水线设计的缺点

流水线并发模型的缺点是任务会涉及多个 worker,因此可能会分散在项目代码的多个类中。因此很难确定每个 worker 都在执行哪个任务。流水线的代码编写也比较困难,设计许多嵌套回调处理程序的代码通常被称为 回调地狱。回调地狱很难追踪 debug。

函数性并行

函数性并行模型是最近才提出的一种并发模型,它的基本思路是使用函数调用来实现。消息的传递就相当于是函数的调用。传递给函数的参数都会被拷贝,因此在函数之外的任何实体都无法操纵函数内的数据。这使得函数执行类似于原子操作。每个函数调用都可以独立于任何其他函数调用执行。

当每个函数调用独立执行时,每个函数都可以在单独的 CPU 上执行。这也就是说,函数式并行并行相当于是各个 CPU 单独执行各自的任务。

JDK 1.7 中的 ForkAndJoinPool 类就实现了函数性并行的功能。Java 8 提出了 stream 的概念,使用并行流也能够实现大量集合的迭代。

函数性并行的难点是要知道函数的调用流程以及哪些 CPU 执行了哪些函数,跨 CPU 函数调用会带来额外的开销。

我们之前说过,线程就是进程中的一条顺序流,在 Java 中,每一条 Java 线程就像是 JVM 的一条顺序流,就像是虚拟 CPU 一样来执行代码。Java 中的 main() 方法是一条特殊的线程,JVM 创建的 main 线程是一条主执行线程,在 Java 中,方法都是由 main 方法发起的。在 main 方法中,你照样可以创建其他的线程(执行顺序流),这些线程可以和 main 方法共同执行应用代码。

Java 线程也是一种对象,它和其他对象一样。Java 中的 Thread 表示线程,Thread 是 java.lang.Thread 类或其子类的实例。那么下面我们就来一起探讨一下在 Java 中如何创建和启动线程。

创建并启动线程

在 Java 中,创建线程的方式主要有三种

  • 通过继承 Thread 类来创建线程
  • 通过实现 Runnable 接口来创建线程
  • 通过 Callable 和 Future 来创建线程

下面我们分别探讨一下这几种创建方式

继承 Thread 类来创建线程

第一种方式是继承 Thread 类来创建线程,如下示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码public class TJavaThread extends Thread{

static int count;

@Override
public synchronized void run() {
for(int i = 0;i < 10000;i++){
count++;
}
}

public static void main(String[] args) throws InterruptedException {

TJavaThread tJavaThread = new TJavaThread();
tJavaThread.start();
tJavaThread.join();
System.out.println("count = " + count);
}
}

线程的主要创建步骤如下

  • 定义一个线程类使其继承 Thread 类,并重写其中的 run 方法,run 方法内部就是线程要完成的任务,因此 run 方法也被称为 执行体
  • 创建了 Thread 的子类,上面代码中的子类是 TJavaThread
  • 启动方法需要注意,并不是直接调用 run 方法来启动线程,而是使用 start 方法来启动线程。当然 run 方法可以调用,这样的话就会变成普通方法调用,而不是新创建一个线程来调用了。
1
2
3
4
5
6
java复制代码public static void main(String[] args) throws InterruptedException {

TJavaThread tJavaThread = new TJavaThread();
tJavaThread.run();
System.out.println("count = " + count);
}

这样的话,整个 main 方法只有一条执行线程也就是 main 线程,由两条执行线程变为一条执行线程

Thread 构造器只需要一个 Runnable 对象,调用 Thread 对象的 start() 方法为该线程执行必须的初始化操作,然后调用 Runnable 的 run 方法,以便在这个线程中启动任务。我们上面使用了线程的 join 方法,它用来等待线程的执行结束,如果我们不加 join 方法,它就不会等待 tJavaThread 的执行完毕,输出的结果可能就不是 10000

可以看到,在 run 方法还没有结束前,run 就被返回了。也就是说,程序不会等到 run 方法执行完毕就会执行下面的指令。

使用继承方式创建线程的优势:编写比较简单;可以使用 this 关键字直接指向当前线程,而无需使用 Thread.currentThread() 来获取当前线程。

使用继承方式创建线程的劣势:在 Java 中,只允许单继承(拒绝肛精说使用内部类可以实现多继承)的原则,所以使用继承的方式,子类就不能再继承其他类。

使用 Runnable 接口来创建线程

相对的,还可以使用 Runnable 接口来创建线程,如下示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码public class TJavaThreadUseImplements implements Runnable{

static int count;

@Override
public synchronized void run() {
for(int i = 0;i < 10000;i++){
count++;
}
}

public static void main(String[] args) throws InterruptedException {

new Thread(new TJavaThreadUseImplements()).start();
System.out.println("count = " + count);
}

}

线程的主要创建步骤如下

  • 首先定义 Runnable 接口,并重写 Runnable 接口的 run 方法,run 方法的方法体同样是该线程的线程执行体。
  • 创建线程实例,可以使用上面代码这种简单的方式创建,也可以通过 new 出线程的实例来创建,如下所示
1
2
java复制代码TJavaThreadUseImplements tJavaThreadUseImplements = new TJavaThreadUseImplements();
new Thread(tJavaThreadUseImplements).start();
  • 再调用线程对象的 start 方法来启动该线程。

线程在使用实现 Runnable 的同时也能实现其他接口,非常适合多个相同线程来处理同一份资源的情况,体现了面向对象的思想。

使用 Runnable 实现的劣势是编程稍微繁琐,如果要访问当前线程,则必须使用 Thread.currentThread() 方法。

使用 Callable 接口来创建线程

Runnable 接口执行的是独立的任务,Runnable 接口不会产生任何返回值,如果你希望在任务完成后能够返回一个值的话,那么你可以实现 Callable 接口而不是 Runnable 接口。Java SE5 引入了 Callable 接口,它的示例如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码public class CallableTask implements Callable {

static int count;
public CallableTask(int count){
this.count = count;
}

@Override
public Object call() {
return count;
}

public static void main(String[] args) throws ExecutionException, InterruptedException {

FutureTask<Integer> task = new FutureTask((Callable<Integer>) () -> {
for(int i = 0;i < 1000;i++){
count++;
}
return count;
});
Thread thread = new Thread(task);
thread.start();

Integer total = task.get();
System.out.println("total = " + total);
}
}

我想,使用 Callable 接口的好处你已经知道了吧,既能够实现多个接口,也能够得到执行结果的返回值。Callable 和 Runnable 接口还是有一些区别的,主要区别如下

  • Callable 执行的任务有返回值,而 Runnable 执行的任务没有返回值
  • Callable(重写)的方法是 call 方法,而 Runnable(重写)的方法是 run 方法。
  • call 方法可以抛出异常,而 Runnable 方法不能抛出异常

使用线程池来创建线程

首先先来认识一下顶级接口 Executor,Executor 虽然不是传统线程创建的方式之一,但是它却成为了创建线程的替代者,使用线程池的好处如下

  • 利用线程池能够复用线程、控制最大并发数。
  • 实现任务线程队列缓存策略和拒绝机制。
  • 实现某些与时间相关的功能,如定时执行、周期执行等。
  • 隔离线程环境。比如,交易服务和搜索服务在同一台服务器上,分别开启两个线程池,交易线程的资源消耗明显要大;因此,通过配置独立的线程池,将较慢的交易服务与搜索服务隔开,避免个服务线程互相影响。

你可以使用如下操作来替换线程创建

1
2
3
4
5
6
7
java复制代码new Thread(new(RunnableTask())).start()

// 替换为

Executor executor = new ExecutorSubClass() // 线程池实现类;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());

ExecutorService 是 Executor 的默认实现,也是 Executor 的扩展接口,ThreadPoolExecutor 类提供了线程池的扩展实现。Executors 类为这些 Executor 提供了方便的工厂方法。下面是使用 ExecutorService 创建线程的几种方式

CachedThreadPool

从而简化了并发编程。Executor 在客户端和任务之间提供了一个间接层;与客户端直接执行任务不同,这个中介对象将执行任务。Executor 允许你管理异步任务的执行,而无须显示地管理线程的生命周期。

1
2
3
4
5
6
7
java复制代码public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
for(int i = 0;i < 5;i++){
service.execute(new TestThread());
}
service.shutdown();
}

CachedThreadPool 会为每个任务都创建一个线程。

注意:ExecutorService 对象是使用静态的 Executors 创建的,这个方法可以确定 Executor 类型。对 shutDown 的调用可以防止新任务提交给 ExecutorService ,这个线程在 Executor 中所有任务完成后退出。

FixedThreadPool

FixedThreadPool 使你可以使用有限的线程集来启动多线程

1
2
3
4
5
6
7
java复制代码public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(5);
for(int i = 0;i < 5;i++){
service.execute(new TestThread());
}
service.shutdown();
}

有了 FixedThreadPool 使你可以一次性的预先执行高昂的线程分配,因此也就可以限制线程的数量。这可以节省时间,因为你不必为每个任务都固定的付出创建线程的开销。

SingleThreadExecutor

SingleThreadExecutor 就是线程数量为 1 的 FixedThreadPool,如果向 SingleThreadPool 一次性提交了多个任务,那么这些任务将会排队,每个任务都会在下一个任务开始前结束,所有的任务都将使用相同的线程。SingleThreadPool 会序列化所有提交给他的任务,并会维护它自己(隐藏)的悬挂队列。

1
2
3
4
5
6
7
java复制代码public static void main(String[] args) {
ExecutorService service = Executors.newSingleThreadExecutor();
for(int i = 0;i < 5;i++){
service.execute(new TestThread());
}
service.shutdown();
}

从输出的结果就可以看到,任务都是挨着执行的。我为任务分配了五个线程,但是这五个线程不像是我们之前看到的有换进换出的效果,它每次都会先执行完自己的那个线程,然后余下的线程继续走完这条线程的执行路径。你可以用 SingleThreadExecutor 来确保任意时刻都只有唯一一个任务在运行。

休眠

影响任务行为的一种简单方式就是使线程 休眠,选定给定的休眠时间,调用它的 sleep() 方法, 一般使用的TimeUnit 这个时间类替换 Thread.sleep() 方法,示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
java复制代码public class SuperclassThread extends TestThread{

@Override
public void run() {
System.out.println(Thread.currentThread() + "starting ..." );

try {
for(int i = 0;i < 5;i++){
if(i == 3){
System.out.println(Thread.currentThread() + "sleeping ...");
TimeUnit.MILLISECONDS.sleep(1000);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println(Thread.currentThread() + "wakeup and end ...");
}

public static void main(String[] args) {
ExecutorService executors = Executors.newCachedThreadPool();
for(int i = 0;i < 5;i++){
executors.execute(new SuperclassThread());
}
executors.shutdown();
}
}

关于 TimeUnit 中的 sleep() 方法和 Thread.sleep() 方法的比较,请参考下面这篇博客

(www.cnblogs.com/xiadongqing…)

优先级

上面提到线程调度器对每个线程的执行都是不可预知的,随机执行的,那么有没有办法告诉线程调度器哪个任务想要优先被执行呢?你可以通过设置线程的优先级状态,告诉线程调度器哪个线程的执行优先级比较高,请给这个骑手马上派单,线程调度器倾向于让优先级较高的线程优先执行,然而,这并不意味着优先级低的线程得不到执行,也就是说,优先级不会导致死锁的问题。优先级较低的线程只是执行频率较低。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
java复制代码public class SimplePriorities implements Runnable{

private int priority;

public SimplePriorities(int priority) {
this.priority = priority;
}

@Override
public void run() {
Thread.currentThread().setPriority(priority);
for(int i = 0;i < 100;i++){
System.out.println(this);
if(i % 10 == 0){
Thread.yield();
}
}
}

@Override
public String toString() {
return Thread.currentThread() + " " + priority;
}

public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
for(int i = 0;i < 5;i++){
service.execute(new SimplePriorities(Thread.MAX_PRIORITY));
}
service.execute(new SimplePriorities(Thread.MIN_PRIORITY));
}
}

toString() 方法被覆盖,以便通过使用 Thread.toString() 方法来打印线程的名称。你可以改写线程的默认输出,这里采用了 Thread[pool-1-thread-1,10,main] 这种形式的输出。

通过输出,你可以看到,最后一个线程的优先级最低,其余的线程优先级最高。注意,优先级是在 run 开头设置的,在构造器中设置它们不会有任何好处,因为这个时候线程还没有执行任务。

尽管 JDK 有 10 个优先级,但是一般只有MAX_PRIORITY,NORM_PRIORITY,MIN_PRIORITY 三种级别。

作出让步

我们上面提过,如果知道一个线程已经在 run() 方法中运行的差不多了,那么它就可以给线程调度器一个提示:我已经完成了任务中最重要的部分,可以让给别的线程使用 CPU 了。这个暗示将通过 yield() 方法作出。

有一个很重要的点就是,Thread.yield() 是建议执行切换CPU,而不是强制执行CPU切换。

对于任何重要的控制或者在调用应用时,都不能依赖于 yield() 方法,实际上, yield() 方法经常被滥用。

后台线程

后台(daemon) 线程,是指运行时在后台提供的一种服务线程,这种线程不是属于必须的。当所有非后台线程结束时,程序也就停止了,**同时会终止所有的后台线程。**反过来说,只要有任何非后台线程还在运行,程序就不会终止。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码public class SimpleDaemons implements Runnable{

@Override
public void run() {
while (true){
try {
TimeUnit.MILLISECONDS.sleep(100);
System.out.println(Thread.currentThread() + " " + this);
} catch (InterruptedException e) {
System.out.println("sleep() interrupted");
}
}
}

public static void main(String[] args) throws InterruptedException {
for(int i = 0;i < 10;i++){
Thread daemon = new Thread(new SimpleDaemons());
daemon.setDaemon(true);
daemon.start();
}
System.out.println("All Daemons started");
TimeUnit.MILLISECONDS.sleep(175);
}
}

在每次的循环中会创建 10 个线程,并把每个线程设置为后台线程,然后开始运行,for 循环会进行十次,然后输出信息,随后主线程睡眠一段时间后停止运行。在每次 run 循环中,都会打印当前线程的信息,主线程运行完毕,程序就执行完毕了。因为 daemon 是后台线程,无法影响主线程的执行。

但是当你把 daemon.setDaemon(true) 去掉时,while(true) 会进行无限循环,那么主线程一直在执行最重要的任务,所以会一直循环下去无法停止。

ThreadFactory

按需要创建线程的对象。使用线程工厂替换了 Thread 或者 Runnable 接口的硬连接,使程序能够使用特殊的线程子类,优先级等。一般的创建方式为

1
2
3
4
5
java复制代码class SimpleThreadFactory implements ThreadFactory {
public Thread newThread(Runnable r) {
return new Thread(r);
}
}

Executors.defaultThreadFactory 方法提供了一个更有用的简单实现,它在返回之前将创建的线程上下文设置为已知值

ThreadFactory 是一个接口,它只有一个方法就是创建线程的方法

1
2
3
4
5
java复制代码public interface ThreadFactory {

// 构建一个新的线程。实现类可能初始化优先级,名称,后台线程状态和 线程组等
Thread newThread(Runnable r);
}

下面来看一个 ThreadFactory 的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
java复制代码public class DaemonThreadFactory implements ThreadFactory {

@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
}
}

public class DaemonFromFactory implements Runnable{

@Override
public void run() {
while (true){
try {
TimeUnit.MILLISECONDS.sleep(100);
System.out.println(Thread.currentThread() + " " + this);
} catch (InterruptedException e) {
System.out.println("Interrupted");
}
}
}

public static void main(String[] args) throws InterruptedException {
ExecutorService service = Executors.newCachedThreadPool(new DaemonThreadFactory());
for(int i = 0;i < 10;i++){
service.execute(new DaemonFromFactory());
}
System.out.println("All daemons started");
TimeUnit.MILLISECONDS.sleep(500);
}
}

Executors.newCachedThreadPool 可以接受一个线程池对象,创建一个根据需要创建新线程的线程池,但会在它们可用时重用先前构造的线程,并在需要时使用提供的 ThreadFactory 创建新线程。

1
2
3
4
5
6
java复制代码public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}

加入一个线程

一个线程可以在其他线程上调用 join() 方法,其效果是等待一段时间直到第二个线程结束才正常执行。如果某个线程在另一个线程 t 上调用 t.join() 方法,此线程将被挂起,直到目标线程 t 结束才回复(可以用 t.isAlive() 返回为真假判断)。

也可以在调用 join 时带上一个超时参数,来设置到期时间,时间到期,join方法自动返回。

对 join 的调用也可以被中断,做法是在线程上调用 interrupted 方法,这时需要用到 try…catch 子句

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
java复制代码public class TestJoinMethod extends Thread{

@Override
public void run() {
for(int i = 0;i < 5;i++){
try {
TimeUnit.MILLISECONDS.sleep(1000);
} catch (InterruptedException e) {
System.out.println("Interrupted sleep");
}
System.out.println(Thread.currentThread() + " " + i);
}
}

public static void main(String[] args) throws InterruptedException {
TestJoinMethod join1 = new TestJoinMethod();
TestJoinMethod join2 = new TestJoinMethod();
TestJoinMethod join3 = new TestJoinMethod();

join1.start();
// join1.join();

join2.start();
join3.start();
}
}

join() 方法等待线程死亡。 换句话说,它会导致当前运行的线程停止执行,直到它加入的线程完成其任务。

线程异常捕获

由于线程的本质,使你不能捕获从线程中逃逸的异常,一旦异常逃出任务的 run 方法,它就会向外传播到控制台,除非你采取特殊的步骤捕获这种错误的异常,在 Java5 之前,你可以通过线程组来捕获,但是在 Java 5 之后,就需要用 Executor 来解决问题,因为线程组不是一次好的尝试。

下面的任务会在 run 方法的执行期间抛出一个异常,并且这个异常会抛到 run 方法的外面,而且 main 方法无法对它进行捕获

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码public class ExceptionThread implements Runnable{

@Override
public void run() {
throw new RuntimeException();
}

public static void main(String[] args) {
try {
ExecutorService service = Executors.newCachedThreadPool();
service.execute(new ExceptionThread());
}catch (Exception e){
System.out.println("eeeee");
}
}
}

为了解决这个问题,我们需要修改 Executor 产生线程的方式,Java5 提供了一个新的接口 Thread.UncaughtExceptionHandler ,它允许你在每个 Thread 上都附着一个异常处理器。Thread.UncaughtExceptionHandler.uncaughtException() 会在线程因未捕获临近死亡时被调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
java复制代码public class ExceptionThread2 implements Runnable{

@Override
public void run() {
Thread t = Thread.currentThread();
System.out.println("run() by " + t);
System.out.println("eh = " + t.getUncaughtExceptionHandler());

// 手动抛出异常
throw new RuntimeException();
}
}

// 实现Thread.UncaughtExceptionHandler 接口,创建异常处理器
public class MyUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler{

@Override
public void uncaughtException(Thread t, Throwable e) {
System.out.println("caught " + e);
}
}

public class HandlerThreadFactory implements ThreadFactory {

@Override
public Thread newThread(Runnable r) {
System.out.println(this + " creating new Thread");
Thread t = new Thread(r);
System.out.println("created " + t);
t.setUncaughtExceptionHandler(new MyUncaughtExceptionHandler());
System.out.println("ex = " + t.getUncaughtExceptionHandler());
return t;
}
}

public class CaptureUncaughtException {

public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool(new HandlerThreadFactory());
service.execute(new ExceptionThread2());
}
}

在程序中添加了额外的追踪机制,用来验证工厂创建的线程会传递给UncaughtExceptionHandler,你可以看到,未捕获的异常是通过 uncaughtException 来捕获的。

你好,我是 cxuan,我自己手写了四本 PDF,分别是 Java基础总结、HTTP 核心总结、计算机基础知识,操作系统核心总结,我已经整理成为 PDF,可以关注公众号 Java建设者 回复 PDF 领取优质资料。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

PHP的垃圾回收机制-PHP高级面试题+详解

发表于 2020-08-18

面试10家公司,收获9个offer,2020年PHP 面试问题图标

ps:本篇内容包括精选面试题与知识篇。

PHP面试题关于PHP的垃圾回收机制,PHP的垃圾回收机制引用计数 (reference counting) GC 机制,PHP可以自动进行内存管理,清除不需要的对象,PHP面试题分享PHP关于垃圾回收机制的面试题:

★我的php学习交流社群——856460874。群内管理已准备好 整理好的BAT等一线大厂进阶知识体系备好(相关学习资料以及笔面试题)欢迎获取一起晋升=点击加

面试题篇

  • 介绍一下PHP的垃圾回收机制

PHP使用了引用计数(reference counting)GC机制,同时使用根缓冲区机制,当php发现有存在循环引用的zval时,就会把其投入到根缓冲区,当根缓冲区达到配置文件中的指定数量后,就会进行垃圾回收,以此解决循环引用导致的内存泄漏问题。

    1. 如果引用计数减少到零,所在变量容器将被清除(free),不属于垃圾;
    1. 如果一个zval的引用计数减少后还大于0,那么它会进入垃圾周期。其次,在一个垃圾周期中,通过检查引用计数是否减1,并且检查哪些变量容器的引用次数是零,来发现哪部分是垃圾。

每个对象都内含一个引用计数器refcount,每个reference连接到对象,计数器加1。当reference离开生存空间或被设为 NULL,计数器减1。当某个对象的引用计数器为零时,PHP知道你将不再需要使用这个对象,释放其所占的内存空间。

  • 下列关于PHP垃圾回收的说法,错误的是?

A、开启/关闭垃圾回收机制可以通过修改php配置实现

B、可以在程序中使用gc_enable() 和 gc_disable()开启和关闭。

C、PHP中的垃圾回收机制,会大幅度提升系统性能。

D、开启垃圾回收机制后,针对内存泄露的情况,可以节省大量的内存空间,但是由于垃圾回收算法运行耗费时间,开启垃圾回收算法会增加脚本的执行时间。

参考答案:C

答案解析:PHP中的垃圾回收机制,仅仅在循环回收算法确实运行时会有时间消耗上的增加。但是在平常的(更小的)脚本中应根本就没有性能影响。

  • php垃圾回收机制的说法错误的是?

A、在一个垃圾周期中,通过检查引用计数是否减1,并且检查哪些变量容器的引用次数是零,来发现哪部分是垃圾

B、可以通过调用gc_enable() 和 gc_disable()函数来打开和关闭垃圾回收机制

C、通过清理未被使用的变量来节省内存的占用

D、php代码执行完毕后会自动执行垃圾回收,所以不需要手动执行垃圾回收

参考答案:D

答案解析:php一段代码有可能要长时间执行,但若此期间有未引用的变量的话,就会占用内存的空间,导致运行缓慢等问题


知识篇

一、概念

垃圾回收是一个多数编程语言中都带有的内存管理机制。与非托管性语言相反:C, C++ 和 Objective C,用户需要手动收集内存,带有 GC 机制的语言:Java, javaScript 和 PHP 可以自动管理内存。

垃圾回收机制(gc)顾名思义,就是废物重利用的意思,是一种动态存储分配的方案。它会自动释放程序不再需要的已分配的内存块。垃圾回收机制可以让程序员不必过分关心程序内存分配,从而将更多的精力投入到业务逻辑。

在现在的流行各种语言当中,垃圾回收机制是新一代语言所共有的特征,如Python、PHP、C#、Ruby等都使用了垃圾回收机制。

二、PHP垃圾回收机制

1、在PHP5.3版本之前,使用的垃圾回收机制是单纯的“引用计数”。

什么叫做引用计数?

由于PHP是用C来写的,C里面有一种东西叫做结构体,我们PHP的变量在C中就是用这种方式存储的。

每个PHP的变量都存在于一个叫做zval的容器中,一个zval容器,除了包含变量名和值,还包括两个字节的额外信息:

● 一个叫做’is_ref’,是个布尔值,用来表示这个变量是否属于引用集合,通过这个字节,我们php才能把普通变量和引用变量区分开来。

● 第二个额外字节就是’refcount’,用来表示指向这个容器的变量的个数。

即:

① 每个内存对象都分配一个计数器,当内存对象被变量引用时,计数器+1;

② 当变量引用撤掉后(执行unset()后),计数器-1;

③ 当计数器=0时,表明内存对象没有被使用,该内存对象则进行销毁,垃圾回收完成。

并且PHP在一个生命周期结束后就会释放此进程/线程所占的内容,这种方式决定了PHP在前期不需要过多考虑内存的泄露问题。

但是当两个或多个对象互相引用形成环状后,内存对象的计数器则不会消减为0;这时候,这一组内存对象已经没用了,但是不能回收,从而导致内存泄露的现象。

php5.3开始,使用了新的垃圾回收机制,在引用计数基础上,实现了一种复杂的算法,来检测内存对象中引用环的存在,以避免内存泄露。

  • 2、随着PHP的发展,PHP开发者的增加以及其所承载的业务范围的扩大,在PHP5.3中引入了更加完善的垃圾回收机制,新的垃圾回收机制解决了无法处理循环的引用内存泄漏问题。

如官方文档所说:每个php变量存在一个叫”zval”的变量容器中。一个zval变量容器,除了包含变量的类型和值,还包括两个字节的额外信息。第一个是”is_ref”,是个bool值,用来标识这个变量是否是属于引用集合(reference set)。通过这个字节,php引擎才能把普通变量和引用变量区分开来,由于php允许用户通过使用&来使用自定义引用,zval变量容器中还有一个内部引用计数机制,来优化内存使用。

第二个额外字节是”refcount”,用以表示指向这个zval变量容器的变量(也称符号即symbol)个数。所有的符号存在一个符号表中,其中每个符号都有作用域(scope)。

官方文档所说,可以使用Xdebug来检查引用计数情况:

1
2
3
4
5
6
7
text复制代码<?php
$a = "new string";
$c = $b = $a;
xdebug_debug_zval( 'a' );
unset( $b, $c );
xdebug_debug_zval( 'a' );
?>

以上例程会输出:

1
2
text复制代码a: (refcount=3, is_ref=0)='new string'
a: (refcount=1, is_ref=0)='new string'

注意:从PHP7的NTS版本开始,以上例程的引用将不再被计数,即c=c=c=b=$a之后a的引用计数也是1.具体分类如下:

在PHP 7中,zval可以被引用计数或不被引用。在zval结构中有一个标志确定了这一点。

① 对于null,bool,int和double的类型变量,refcount永远不会计数;

② 对于对象、资源类型,refcount计数和php5的一致;

③ 对于字符串,未被引用的变量被称为“实际字符串”。而那些被引用的字符串被重复删除(即只有一个带有特定内容的被插入的字符串)并保证在请求的整个持续时间内存在,所以不需要为它们使用引用计数;如果使用了opcache,这些字符串将存在于共享内存中,在这种情况下,您不能使用引用计数(因为我们的引用计数机制是非原子的);

④对于数组,未引用的变量被称为“不可变数组”。其数组本身计数与php5一致,但是数组里面的每个键值对的计数,则按前面三条的规则(即如果是字符串也不在计数);如果使用opcache,则代码中的常量数组文字将被转换为不可变数组。

再次,这些生活在共享内存,因此不能使用refcounting。

我们的demo例子如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
text复制代码<?php
echo '测试字符串引用计数';
$a = "new string";
$b = $a;
xdebug_debug_zval( 'a' );
unset( $b);
xdebug_debug_zval( 'a' );
$b = &$a;
xdebug_debug_zval( 'a' );
echo '测试数组引用计数';
$c = array('a','b');
xdebug_debug_zval( 'c' );
$d = $c;
xdebug_debug_zval( 'c' );
$c[2]='c';
xdebug_debug_zval( 'c' );
echo '测试int型计数';
$e = 1;
xdebug_debug_zval( 'e' );

看到的输出如下:

三、回收周期

默认的,PHP的垃圾回收机制是打开的,然后有个php.ini设置允许你修改它:zend.enable_gc 。

当垃圾回收机制打开时,算法会判断每当根缓存区存满时,就会执行循环查找。根缓存区有固定的大小,默认10,000,可以通过修改PHP源码文件Zend/zend_gc.c中的常量GC_ROOT_BUFFER_MAX_ENTRIES,然后重新编译PHP,来修改这个值。当垃圾回收机制关闭时,循环查找算法永不执行,然而,根将一直存在根缓冲区中,不管在配置中垃圾回收机制是否激活。

除了修改配置zend.enable_gc ,也能通过分别调用gc_enable() 和 gc_disable()函数在运行php时来打开和关闭垃圾回收机制。调用这些函数,与修改配置项来打开或关闭垃圾回收机制的效果是一样的。即使在可能根缓冲区还没满时,也能强制执行周期回收。你能调用gc_collect_cycles()函数达到这个目的。这个函数将返回使用这个算法回收的周期数。

允许打开和关闭垃圾回收机制并且允许自主的初始化的原因,是由于你的应用程序的某部分可能是高时效性的。在这种情况下,你可能不想使用垃圾回收机制。当然,对你的应用程序的某部分关闭垃圾回收机制,是在冒着可能内存泄漏的风险,因为一些可能根也许存不进有限的根缓冲区。

因此,就在你调用gc_disable()函数释放内存之前,先调用gc_collect_cycles()函数可能比较明智。因为这将清除已存放在根缓冲区中的所有可能根,然后在垃圾回收机制被关闭时,可留下空缓冲区以有更多空间存储可能根。

四、性能影响

1、内存占用空间的节省

首先,实现垃圾回收机制的整个原因是为了一旦先决条件满足,通过清理循环引用的变量来节省内存占用。在PHP执行中,一旦根缓冲区满了或者调用gc_collect_cycles() 函数时,就会执行垃圾回收。

2、执行时间增加

垃圾回收影响性能的第二个领域是它释放已泄漏的内存耗费的时间。

通常,PHP中的垃圾回收机制,仅仅在循环回收算法确实运行时会有时间消耗上的增加。但是在平常的(更小的)脚本中应根本就没有性能影响。

3、在平常脚本中有循环回收机制运行的情况下,内存的节省将允许更多这种脚本同时运行在你的服务器上。因为总共使用的内存没达到上限。

这种好处在长时间运行脚本中尤其明显,诸如长时间的测试套件或者daemon脚本此类。同时,对通常比Web脚本运行时间长的脚本应用程序,新的垃圾回收机制,应该会大大改变一直以来认为内存泄漏问题难以解决的看法。

最后,祝所有大家在面试中过关斩将,拿到心仪offer。

对此我整理了一些资料,包括但不限于:分布式架构、高可扩展、高性能、高并发、服务器性能调优、TP6,laravel,YII2,Redis,Swoole、Swoft、Kafka、Mysql优化、shell脚本、Docker、微服务、Nginx等多个知识点高级进阶干货需要的可以免费分享给大家

如果想与一群3-8年资深开发者一起交流学习的话,需要,我的官方群-点击此处。

腾讯T3-T4标准精品PHP架构师教程目录大全,只要你看完保证薪资上升一个台阶(持续更新)​!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

深入理解golang中的接口

发表于 2020-08-17

接口是Go语言编程中数据类型的关键。在Go语言的实际编程中,几乎所有的数据结构都围绕接口展开,接口是Go语言中所有数据结构的核心。Go语言中的接口实际上是一组方法的集合,接口和gomock配合使用可以使得我们写出易于测试的代码.但是除了在反射等使用场景中我们很难直接感知到接口的存在(虽然大多数人使用反射的时候也没有感知到接口在其中发挥的作用),但是想要深入理解Go语言,我们必须对接口有足够的了解.接下来我们将从接口的数据结构、结构体如何转变成interface和Go语言中动态派发的实现这些方面来一起学习Go语言中的接口.

简述

在我们揭开interface的面纱前,先让我们一起去了解下在开发中使用接口能够给我们带来哪些好处。提到接口则不得不提面向对象设计中的依赖倒置原则,由Object Mentor 公司总裁罗伯特·马丁(Robert C.Martin)于 1996 年在 C++ Report 上发表的文章首先提出。依赖倒置的原始定义为:

High level modules shouldnot depend upon low level modules.Both should depend upon abstractions.Abstractions should not depend upon details. Details should depend upon abstractions

其核心思想是:要面向接口编程,而不是面向实现编程。由于在软件设计中,细节具有多边形,而设计良好的抽象层则更加稳定,因此以抽象为基础搭建起来的架构要比以细节为基础搭建起来的架构要稳定得多.
在Java和C#这些面向对象的程序语言中都有接口的概念。以Java为例,Java中的接口除了定义方法签名之外,还可以定义变量,在实现了此接口的类中可以直接使用这些变量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
csharp复制代码public interface HumanInterface{
public String name="test";
public void eat();
}

public class Man implements HumanInterface{
public void eat(){
System.out.println(name+"eat a lot")
}
}
public class Women implements HumanInterface{
public void eat(){
System.out.println(name+"eat very little")
}
}

Java中的类必须显式的声明实现的接口,但是在Go语言中接口是隐式实现的,只需要实现了接口中定义的全部方法及实现了接口。

数据结构

使用

1
2
3
4
5
6
7
8
9
10
11
12
go复制代码type myinterface interface {
Func1() string
Func2() string
}
type MyStruct struct {
}
func (m *MyStruct) Func1() string {
return fmt.Sprintf("Func1 implement")
}
func (m *MyStruct) Func2() string {
return fmt.Sprintf("Func2 implement")
}

从上面的代码中我们发现MyStruct的实现中并没有找到myinterface的身影,就像上面提到的Go语言中的接口实现都是隐式的。如果我们在上述实现中去掉Func2 方法的实现,如果在具体的使用代码中没有涉及到变量赋值(变量类型为myinterface)、传递参数(接收者为myinterface)以及返回参数(返回参数类型为myinterface)并不会出现编译出错的情况。这是因为Go语言只会在上述三种情况下才会检查类型是否实现了对应的接口。

iface 和eface

Go语言中接口分为两种类型,分别是包含一组的方法的接口和空接口。在src/runtime/runtime2.go文件中分别使用iface和eface两个结构体来描述。空接口是接口类型的特殊形式,空接口没有任何方法,因此任何类型都无需实现空接口。从实现的角度看,任何值都满足这个接口的需求。因此空接口类型可以保存任何值,也可以从空接口中转换出原值。两种接口都是用interface声明。但是由于空接口在Go语言中非常常见,所以使用特殊类型实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
go复制代码type iface struct {
tab *itab
data unsafe.Pointer
}

type itab struct {
inter *interfacetype //接口定义的类型信息
_type *_type //接口实际指向值得类型信息
hash uint32 // copy of _type.hash. Used for type switches.
_ [4]byte
fun [1]uintptr // 接口方法实现列表,即函数地址列表,按字典序排序
}

type eface struct {
_type *_type //类型
data unsafe.Pointer //底层数据的指针
}

type _type struct {
size uintptr //存储了类型需要占用的内存空间,主要在初始化时内存分配使用
ptrdata uintptr // size of memory prefix holding all pointers
hash uint32 //用于判断类型相等
tflag tflag //类型的Tags
align uint8 //结构体内对齐
fieldAlign uint8 //结构体作为field时的对齐
kind uint8 //类型编号,定义域runtime/typekind.go
// function for comparing objects of this type
// (ptr to object A, ptr to object B) -> ==?
equal func(unsafe.Pointer, unsafe.Pointer) bool
// gcdata stores the GC type data for the garbage collector.
// If the KindGCProg bit is set in kind, gcdata is a GC program.
// Otherwise it is a ptrmask bitmap. See mbitmap.go for details.
gcdata *byte
str nameOff
ptrToThis typeOff

//nameOff 和 typeOff 类型是 int32 ,这两个值是链接器负责嵌入的,相对于可执行文件的元信息的偏移量。元信息会在运行期,加载到 runtime.moduledata 结构体中 (src/runtime/symtab.go)。
runtime 提供了一些 helper 函数,这些函数能够帮你找到相对于 moduledata 的偏移量,比如 resolveNameOff (src/runtime/type.go) and resolveTypeOff (src/runtime/type.go)
}

在上面得代码中我们给出了_type和itab类型字段得解释,当然我们并不需要对每个字段都了解其用途,只需要有个大概的概念。

_type结构体相对较为简单,并没有太多可说之处,相信各位读者对照着注释就可以轻松理解。所以接下来我们就聊聊itab结构体。首先itab除了_type字段外多了interfacetype。interfacetype从字面上来说可以轻易得知它代表的是当前的接口类型,那么_type对应的则必然是接口所指向值的类型信息,

hash则是_type.hash的拷贝,fun数组持有组成该interface虚函数表的函数的指针,所以fun数组保存的元素数量和具体类型相关联而无法设置成固定大小。

1
2
3
4
5
6
7
8
9
10
rust复制代码type interfacetype struct {
typ _type
pkgpath name
mhdr []imethod
}

type imethod struct {
name nameOff
ityp typeOff
}

interfacetype定义于src/runtime/type.go文件中,由三个字段组成,除了typ这个Go语言类型的runtime表示,还有pkgpath和mhdr两个字段,其主要作用就是interface的公共描述,类似的还有maptype、arraytype、chantype等,这些都在type.go文件中由定义,可以理解成Go语言类型的runtime外在的表现信息。

变量是如何转变成interface的

在上一部分内容中我们已经了解了interface的数据结构,接下来让我们通过下面的代码来了解它们时如何被初始化的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
go复制代码func main(){
var temp myinterface = MyStruct{ID:1}
temp.Func1()

}
type myinterface interface {
Func1() string
Func2() string
}

type MyStruct struct {
ID int64
ptr *int64
}
//go:noinline
func (m MyStruct) Func1() string {
return fmt.Sprintf("Func1 implement")
}
//go:noinline
func (m MyStruct) Func2() string {
return fmt.Sprintf("Func2 implement")
}

使用go tool compile -N -S -l test.go查看生成的汇编代码。在此我们只需要关心 var temp myinterface = MyStruct{ID:1} 这一行代码的细节,其他暂时忽略。生成的汇编代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
scss复制代码0x0024 00036 (test.go:8)        PCDATA  $0, $0
0x0024 00036 (test.go:8) PCDATA $1, $1
0x0024 00036 (test.go:8) XORPS X0, X0
0x0027 00039 (test.go:8) MOVUPS X0, ""..autotmp_1+48(SP)
0x002c 00044 (test.go:8) MOVQ $1, ""..autotmp_1+48(SP)
0x0035 00053 (test.go:8) PCDATA $0, $1
0x0035 00053 (test.go:8) LEAQ go.itab."".MyStruct,"".myinterface(SB), AX
0x003c 00060 (test.go:8) PCDATA $0, $0
0x003c 00060 (test.go:8) MOVQ AX, (SP)
0x0040 00064 (test.go:8) PCDATA $0, $1
0x0040 00064 (test.go:8) PCDATA $1, $0
0x0040 00064 (test.go:8) LEAQ ""..autotmp_1+48(SP), AX
0x0045 00069 (test.go:8) PCDATA $0, $0
0x0045 00069 (test.go:8) MOVQ AX, 8(SP)
0x004a 00074 (test.go:8) CALL runtime.convT2I(SB)
0x004f 00079 (test.go:8) PCDATA $0, $1
0x004f 00079 (test.go:8) MOVQ 24(SP), AX
0x0054 00084 (test.go:8) MOVQ 16(SP), CX
0x0059 00089 (test.go:8) PCDATA $1, $2
0x0059 00089 (test.go:8) MOVQ CX, "".temp+32(SP)
0x005e 00094 (test.go:8) PCDATA $0, $0
0x005e 00094 (test.go:8) MOVQ AX, "".temp+40(SP)

将上述过程分成三个部分

1. 分配空间

1
2
3
4
bash复制代码MOVQ    $1, ""..autotmp_1+48(SP)
...
LEAQ ""..autotmp_1+48(SP), AX
MOVQ AX, 8(SP)

1对应的是MyStruct的ID,,被存储在当前栈帧的自底向上+48偏移量的位置,。后续编译器可以根据它的存储位置来用地址对其进行引用。

2. 创建itab

1
2
go复制代码 LEAQ    go.itab."".MyStruct,"".myinterface(SB), AX
MOVQ AX, (SP)

看上去编译器已经为提前创建了必需的itab来表示iface,并且通过全局符号提供给我们使用。编译器这么做的原因不言而喻,毕竟不管在运行时创建了多少iface<myinterface,MyStruct>,只需要一个itab,从itab内的定义也可以看出其并不会和运行时所初始化的变量由任何关系。
在本文中并不会继续深入了解 go.itab.””.MyStruct,””.myinterface符号
,感兴趣的同学看这篇文章
,非常的深入细致

3. 分配数据

1
2
3
scss复制代码CALL    runtime.convT2I(SB)
MOVQ 24(SP), AX
MOVQ 16(SP), CX

在1、2中我们看到了解到目前栈顶(SP)保存着 go.itab.””.MyStruct,””.myinterface 的地址,8(sp)则保存着变量的地址。上面两个指针会作为参数传给convT2I函数,此函数会创建并返回interface。 src/runtime/iface.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
scss复制代码func convT2I(tab *itab, elem unsafe.Pointer) (i iface) {
t := tab._type
if raceenabled {
raceReadObjectPC(t, elem, getcallerpc(), funcPC(convT2I))
}
if msanenabled {
msanread(elem, t.size)
}
x := mallocgc(t.size, t, true)
typedmemmove(t, x, elem)
i.tab = tab
i.data = x
return
}

上述代码做了4件事情:

  1. 它创建了一个 iface 的结构体 i。
  2. 它将我们刚给 i.tab 赋的值赋予了 itab 指针。
  3. 它 在堆上分配了一个 i.tab._type 的新对象 i.tab._type,然后将第二个参数 elem 指向的值拷贝到这个新对象上。
  4. 将最后的 interface 返回。
    现在我们终于得到了完整的interface

动态派发实现

下面是第一行实例化的汇编代码

1
2
3
4
5
6
7
8
9
10
scss复制代码MOVQ    $1, ""..autotmp_1+48(SP)
LEAQ go.itab."".MyStruct,"".myinterface(SB), AX
MOVQ AX, (SP)
LEAQ ""..autotmp_1+48(SP), AX
MOVQ AX, 8(SP)
CALL runtime.convT2I(SB)
MOVQ 24(SP), AX
MOVQ 16(SP), CX
MOVQ CX, "".temp+32(SP)
MOVQ AX, "".temp+40(SP)

接着是对方法间接调用的汇编代码

1
2
3
4
5
scss复制代码MOVQ    "".temp+32(SP), AX
MOVQ 24(AX), AX
MOVQ "".temp+40(SP), CX
MOVQ CX, (SP)
CALL AX

AX中保存的是itab的指针,实际上是指向go.itab.””.MyStruct,””.myinterface的指针.对其解饮用并offset 24个字节,上面itab的结构体定义我们可以得知此时指向的itab.fun . 并且我们已经知道了fun[0]实际上指向的是main.(MyStruct).Func1的指针. 因为方法本身没有参数,所以在入参的时候只需要传入receiver,并通过CALL指令即可完成函数调用.

如果我们修改代码为如下形式

1
scss复制代码temp.Func2()

这是再查看汇编代码,则和最初的有所不同

1
2
3
4
5
scss复制代码MOVQ    "".temp+32(SP), AX
MOVQ 32(AX), AX
MOVQ "".temp+40(SP), CX
MOVQ CX, (SP)
CALL AX

轻易可以得知其获取到的函数指针相对第一次的增加了8字节的偏移,这个很容易理解,因为上面提到过fun字段是接口方法实现列表是按照字典序排序的.

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

反制面试官 14张原理图 再也不怕被问 volati

发表于 2020-08-17

悟空
爱学习的程序猿,自主开发了Java学习平台、PMP刷题小程序。目前主修Java、多线程、SpringBoot、SpringCloud、k8s。本公众号不限于分享技术,也会分享工具的使用、人生感悟、读书总结。

絮叨

这一篇也算是Java并发编程的开篇,看了很多资料,但是轮到自己去整理去总结的时候,发现还是要多看几遍资料才能完全理解。还有一个很重要的点就是,画图是加深印象和检验自己是否理解的一个非常好的方法。

一、Volatile怎么念?

volatile怎么念

看到这个单词一直不知道怎么发音

1
2
3
sh复制代码英 [ˈvɒlətaɪl]  美 [ˈvɑːlətl]

adj. [化学] 挥发性的;不稳定的;爆炸性的;反复无常的

那Java中volatile又是干啥的呢?

二、Java中volatile用来干啥?

  • Volatile是Java虚拟机提供的轻量级的同步机制(三大特性)
    • 保证可见性
    • 不保证原子性
    • 禁止指令重排

要理解三大特性,就必须知道Java内存模型(JMM),那JMM又是什么呢?

volatile怎么念

三、JMM又是啥?

这是一份精心总结的Java内存模型思维导图,拿去不谢。

拿走不谢

原理图1-Java内存模型

3.1 为什么需要Java内存模型?

Why:屏蔽各种硬件和操作系统的内存访问差异

JMM是Java内存模型,也就是Java Memory Model,简称JMM,本身是一种抽象的概念,实际上并不存在,它描述的是一组规则或规范,通过这组规范定义了程序中各个变量(包括实例字段,静态字段和构成数组对象的元素)的访问方式。

3.2 到底什么是Java内存模型?

  • 1.定义程序中各种变量的访问规则
  • 2.把变量值存储到内存的底层细节
  • 3.从内存中取出变量值的底层细节

3.3 Java内存模型的两大内存是啥?

原理图2-两大内存

  • 主内存
    • Java堆中对象实例数据部分
    • 对应于物理硬件的内存
  • 工作内存
    • Java栈中的部分区域
    • 优先存储于寄存器和高速缓存

3.4 Java内存模型是怎么做的?

Java内存模型的几个规范:

  • 1.所有变量存储在主内存
  • 2.主内存是虚拟机内存的一部分
  • 3.每条线程有自己的工作内存
  • 4.线程的工作内存保存变量的主内存副本
  • 5.线程对变量的操作必须在工作内存中进行
  • 6.不同线程之间无法直接访问对方工作内存中的变量
  • 7.线程间变量值的传递均需要通过主内存来完成

由于JVM运行程序的实体是线程,而每个线程创建时JVM都会为其创建一个工作内存(有些地方称为栈空间),工作内存是每个线程的私有数据区域,而Java内存模型中规定所有变量都存储在主内存,主内存是共享内存区域,所有线程都可以访问,但线程对变量的操作(读取赋值等)必须在工作内存中进行,首先要将变量从主内存拷贝到自己的工作内存空间,然后对变量进行操作,操作完成后再将变量写会主内存,不能直接操作主内存中的变量,各个线程中的工作内存中存储着主内存中的变量副本拷贝,因此不同的线程间无法访问对方的工作内存,线程间的通信(传值)必须通过主内存来完成,其简要访问过程:

原理图3-Java内存模型

3.5 Java内存模型的三大特性

  • 可见性(当一个线程修改了共享变量的值时,其他线程能够立即得知这个修改)
  • 原子性(一个操作或一系列操作是不可分割的,要么同时成功,要么同时失败)
  • 有序性(变量赋值操作的顺序与程序代码中的执行顺序一致)

关于有序性:如果在本线程内观察,所有的操作都是有序的;如果在一个线程中观察另一个线程,所有的操作都是无序的。前半句是指“线程内似表现为串行的语义”(Within-Thread As-If-Serial Semantics),后半句是指“指令重排序”现象和“工作内存与主内存同步延迟”现象。

四、能给个示例说下怎么用volatile的吗?

考虑一下这种场景:

有一个对象的字段number初始化值=0,另外这个对象有一个公共方法setNumberTo100()可以设置number = 100,当主线程通过子线程来调用setNumberTo100()后,主线程是否知道number值变了呢?

答案:如果没有使用volatile来定义number变量,则主线程不知道子线程更新了number的值。

(1)定义如上述所说的对象:ShareData

1
2
3
4
5
6
7
java复制代码class ShareData {
int number = 0;

public void setNumberTo100() {
this.number = 100;
}
}

(2)主线程中初始化一个子线程,名字叫做子线程

子线程先休眠3s,然后设置number=100。主线程不断检测的number值是否等于0,如果不等于0,则退出主线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
java复制代码public class volatileVisibility {
public static void main(String[] args) {
// 资源类
ShareData shareData = new ShareData();

// 子线程 实现了Runnable接口的,lambda表达式
new Thread(() -> {

System.out.println(Thread.currentThread().getName() + "\t come in");

// 线程睡眠3秒,假设在进行运算
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 修改number的值
myData.setNumberTo100();

// 输出修改后的值
System.out.println(Thread.currentThread().getName() + "\t update number value:" + myData.number);

}, "子线程").start();

while(myData.number == 0) {
// main线程就一直在这里等待循环,直到number的值不等于零
}

// 按道理这个值是不可能打印出来的,因为主线程运行的时候,number的值为0,所以一直在循环
// 如果能输出这句话,说明子线程在睡眠3秒后,更新的number的值,重新写入到主内存,并被main线程感知到了
System.out.println(Thread.currentThread().getName() + "\t 主线程感知到了 number 不等于 0");

/**
* 最后输出结果:
* 子线程 come in
* 子线程 update number value:100
* 最后线程没有停止,并行没有输出"主线程知道了 number 不等于0"这句话,说明没有用volatile修饰的变量,变量的更新是不可见的
*/
}
}

没有使用volatile

(3)我们用volatile修饰变量number

1
2
3
4
5
6
7
8
csharp复制代码class ShareData {
//volatile 修饰的关键字,是为了增加多个线程之间的可见性,只要有一个线程修改了内存中的值,其它线程也能马上感知
volatile int number = 0;

public void setNumberTo100() {
this.number = 100;
}
}

输出结果:

1
2
3
4
5
sh复制代码子线程	 come in
子线程 update number value:100
main 主线程知道了 number 不等于 0

Process finished with exit code 0

mark

小结:说明用volatile修饰的变量,当某线程更新变量后,其他线程也能感知到。

五、那为什么其他线程能感知到变量更新?

mark

其实这里就是用到了“窥探(snooping)”协议。在说“窥探(snooping)”协议之前,首先谈谈缓存一致性的问题。

5.1 缓存一致性

当多个CPU持有的缓存都来自同一个主内存的拷贝,当有其他CPU偷偷改了这个主内存数据后,其他CPU并不知道,那拷贝的内存将会和主内存不一致,这就是缓存不一致。那我们如何来保证缓存一致呢?这里就需要操作系统来共同制定一个同步规则来保证,而这个规则就有MESI协议。

如下图所示,CPU2 偷偷将num修改为2,内存中num也被修改为2,但是CPU1和CPU3并不知道num值变了。

原理图4-缓存一致性1

5.2 MESI

当CPU写数据时,如果发现操作的变量是共享变量,即在其它CPU中也存在该变量的副本,系统会发出信号通知其它CPU将该内存变量的缓存行设置为无效。如下图所示,CPU1和CPU3 中num=1已经失效了。

原理图5-缓存一致性2

当其它CPU读取这个变量的时,发现自己缓存该变量的缓存行是无效的,那么它就会从内存中重新读取。

如下图所示,CPU1和CPU3发现缓存的num值失效了,就重新从内存读取,num值更新为2。

原理图6-缓存一致性3

5.3 总线嗅探

那其他CPU是怎么知道要将缓存更新为失效的呢?这里是用到了总线嗅探技术。

每个CPU不断嗅探总线上传播的数据来检查自己缓存值是否过期了,如果处理器发现自己的缓存行对应的内存地址被修改,就会将当前处理器的缓存行设置为无效状态,当处理器对这个数据进行修改操作的时候,会重新从内存中把数据读取到处理器缓存中。

原理图7-缓存一致性4

5.4 总线风暴

总线嗅探技术有哪些缺点?

由于MESI缓存一致性协议,需要不断对主线进行内存嗅探,大量的交互会导致总线带宽达到峰值。因此不要滥用volatile,可以用锁来替代,看场景啦~

六、能演示下volatile为什么不保证原子性吗?

原子性:一个操作或一系列操作是不可分割的,要么同时成功,要么同时失败。

这个定义和volatile啥关系呀,完全不能理解呀?Show me the code!

考虑一下这种场景:

当20个线程同时给number自增1,执行1000次以后,number的值为多少呢?

在单线程的场景,答案是20000,如果是多线程的场景下呢?答案是可能是20000,但很多情况下都是小于20000。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
java复制代码package com.jackson0714.passjava.threads;

/**
演示volatile 不保证原子性
* @create: 2020-08-13 09:53
*/

public class VolatileAtomicity {
public static volatile int number = 0;

public static void increase() {
number++;
}

public static void main(String[] args) {

for (int i = 0; i < 50; i++) {
new Thread(() -> {
for (int j = 0; j < 1000; j++) {
increase();
}
}, String.valueOf(i)).start();
}

// 当所有累加线程都结束
while(Thread.activeCount() > 2) {
Thread.yield();
}

System.out.println(number);
}
}

执行结果:第一次19144,第二次20000,第三次19378。

volatile第一次执行结果

volatile第二次执行结果

volatile第三次执行结果

我们来分析一下increase()方法,通过反编译工具javap得到如下汇编代码:

1
2
3
4
5
6
7
java复制代码  public static void increase();
Code:
0: getstatic #2 // Field number:I
3: iconst_1
4: iadd
5: putstatic #2 // Field number:I
8: return

number++其实执行了3条指令:

getstatic:拿number的原始值
iadd:进行加1操作
putfield:把加1后的值写回

执行了getstatic指令number的值取到操作栈顶时,volatile关键字保证了number的值在此时是正确的,但是在执行iconst_1、iadd这些指令的时候,其他线程可能已经把number的值改变了,而操作栈顶的值就变成了过期的数据,所以putstatic指令执行后就可能把较小的number值同步回主内存之中。

总结如下:

在执行number++这行代码时,即使使用volatile修饰number变量,在执行期间,还是有可能被其他线程修改,没有保证原子性。

七、怎么保证输出结果是20000呢?

7.1 synchronized同步代码块

我们可以通过使用synchronized同步代码块来保证原子性。从而使结果等于20000

1
2
3
java复制代码public synchronized static void increase() {
number++;
}

synchronized同步代码块执行结果

但是使用synchronized太重了,会造成阻塞,只有一个线程能进入到这个方法。我们可以使用Java并发包(JUC)中的AtomicInterger工具包。

7.2 AtomicInterger原子性操作

我们来看看AtomicInterger原子自增的方法getAndIncrement()

AtomicInterger

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码public static AtomicInteger atomicInteger = new AtomicInteger();

public static void main(String[] args) {

for (int i = 0; i < 20; i++) {
new Thread(() -> {
for (int j = 0; j < 1000; j++) {
atomicInteger.getAndIncrement();
}
}, String.valueOf(i)).start();
}

// 当所有累加线程都结束
while(Thread.activeCount() > 2) {
Thread.yield();
}

System.out.println(atomicInteger);
}

多次运行的结果都是20000。

getAndIncrement的执行结果

八、禁止指令重排又是啥?

说到指令重排就得知道为什么要重排,有哪几种重排。

如下图所示,指令执行顺序是按照1>2>3>4的顺序,经过重排后,执行顺序更新为指令3->4->2->1。

原理图8-指令重排

会不会感觉到重排把指令顺序都打乱了,这样好吗?

可以回想下小学时候的数学题:2+3-5=?,如果把运算顺序改为3-5+2=?,结果也是一样的。所以指令重排是要保证单线程下程序结果不变的情况下做重排。

8.1 为什么要重排

计算机在执行程序时,为了提高性能,编译器和处理器常常会对指令做重排序。

8.2 有哪几种重排

  • 1.编译器优化重排:编译器在不改变单线程程序语义的前提下,可以重新安排语句的执行顺序。
  • 2.指令级的并行重排:现代处理器采用了指令级并行技术来将多条指令重叠执行。如果不存在数据依赖性,处理器可以改变语句对应机器指令的执行顺序。
  • 3.内存系统的重排:由于处理器使用缓存和读/写缓冲区,这使得加载和存储操作看上去可能是在乱序执行。

原理图9-三种重排

注意:

  • 单线程环境里面确保最终执行结果和代码顺序的结果一致
  • 处理器在进行重排序时,必须要考虑指令之间的数据依赖性
  • 多线程环境中线程交替执行,由于编译器优化重排的存在,两个线程中使用的变量能否保证一致性是无法确定的,结果无法预测。

8.3 举个例子来说说多线程中的指令重排?

设想一下这种场景:定义了变量num=0和变量flag=false,线程1调用初始化函数init()执行后,线程调用add()方法,当另外线程判断flag=true后,执行num+100操作,那么我们预期的结果是num会等于101,但因为有指令重排的可能,num=1和flag=true执行顺序可能会颠倒,以至于num可能等于100

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码public class VolatileResort {
static int num = 0;
static boolean flag = false;
public static void init() {
num= 1;
flag = true;
}
public static void add() {
if (flag) {
num = num + 5;
System.out.println("num:" + num);
}
}
public static void main(String[] args) {
init();
new Thread(() -> {
add();
},"子线程").start();
}
}

先看线程1中指令重排:

num= 1;flag = true; 的执行顺序变为 flag=true;num = 1;,如下图所示的时序图

原理图10-线程1指令重排

如果线程2 num=num+5 在线程1设置num=1之前执行,那么线程2的num变量值为5。如下图所示的时序图。

原理图11-线程2在num=1之前执行

8.4 volatile怎么实现禁止指令重排?

我们使用volatile定义flag变量:

1
java复制代码static volatile boolean flag = false;

如何实现禁止指令重排:

原理:在volatile生成的指令序列前后插入内存屏障(Memory Barries)来禁止处理器重排序。

有如下四种内存屏障:

四种内存屏障

volatile写的场景如何插入内存屏障:

  • 在每个volatile写操作的前面插入一个StoreStore屏障(写-写 屏障)。
  • 在每个volatile写操作的后面插入一个StoreLoad屏障(写-读 屏障)。

原理图12-volatile写的场景如何插入内存屏障

StoreStore屏障可以保证在volatile写(flag赋值操作flag=true)之前,其前面的所有普通写(num的赋值操作num=1) 操作已经对任意处理器可见了,保障所有普通写在volatile写之前刷新到主内存。

volatile读场景如何插入内存屏障:

  • 在每个volatile读操作的后面插入一个LoadLoad屏障(读-读 屏障)。
  • 在每个volatile读操作的后面插入一个LoadStore屏障(读-写 屏障)。

原理图13-volatile读场景如何插入内存屏障

LoadStore屏障可以保证其后面的所有普通写(num的赋值操作num=num+5) 操作必须在volatile读(if(flag))之后执行。

十、volatile常见应用

这里举一个应用,双重检测锁定的单例模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码package com.jackson0714.passjava.threads;
/**
演示volatile 单例模式应用(双边检测)
* @author: 悟空聊架构
* @create: 2020-08-17
*/

class VolatileSingleton {
private static VolatileSingleton instance = null;
private VolatileSingleton() {
System.out.println(Thread.currentThread().getName() + "\t 我是构造方法SingletonDemo");
}
public static VolatileSingleton getInstance() {
// 第一重检测
if(instance == null) {
// 锁定代码块
synchronized (VolatileSingleton.class) {
// 第二重检测
if(instance == null) {
// 实例化对象
instance = new VolatileSingleton();
}
}
}
return instance;
}
}

代码看起来没有问题,但是 instance = new VolatileSingleton();其实可以看作三条伪代码:

1
2
3
java复制代码memory = allocate(); // 1、分配对象内存空间
instance(memory); // 2、初始化对象
instance = memory; // 3、设置instance指向刚刚分配的内存地址,此时instance != null

步骤2 和 步骤3之间不存在 数据依赖关系,而且无论重排前 还是重排后,程序的执行结果在单线程中并没有改变,因此这种重排优化是允许的。

1
2
3
java复制代码memory = allocate(); // 1、分配对象内存空间
instance = memory; // 3、设置instance指向刚刚分配的内存地址,此时instance != null,但是对象还没有初始化完成
instance(memory); // 2、初始化对象

如果另外一个线程执行:if(instance == null) 时,则返回刚刚分配的内存地址,但是对象还没有初始化完成,拿到的instance是个假的。如下图所示:

原理图14-双重检锁存在的并发问题

解决方案:定义instance为volatile变量

1
java复制代码private static volatile VolatileSingleton instance = null;

十一、volatile都不保证原子性,为啥我们还要用它?

奇怪的是,volatile都不保证原子性,为啥我们还要用它?

volatile是轻量级的同步机制,对性能的影响比synchronized小。

典型的用法:检查某个状态标记以判断是否退出循环。

比如线程试图通过类似于数绵羊的传统方法进入休眠状态,为了使这个示例能正确执行,asleep必须为volatile变量。否则,当asleep被另一个线程修改时,执行判断的线程却发现不了。

那为什么我们不直接用synchorized,lock锁?它们既可以保证可见性,又可以保证原子性为何不用呢?

因为synchorized和lock是排他锁(悲观锁),如果有多个线程需要访问这个变量,将会发生竞争,只有一个线程可以访问这个变量,其他线程被阻塞了,会影响程序的性能。

注意:当且仅当满足以下所有条件时,才应该用volatile变量

  • 对变量的写入操作不依赖变量的当前值,或者你能确保只有单个线程更新变量的值。
  • 该变量不会与其他的状态一起纳入不变性条件中。
  • 在访问变量时不需要加锁。

十二、volatile和synchronzied的区别

  • volatile只能修饰实例变量和类变量,synchronized可以修饰方法和代码块。
  • volatile不保证原子性,而synchronized保证原子性
  • volatile 不会造成阻塞,而synchronized可能会造成阻塞
  • volatile 轻量级锁,synchronized重量级锁
  • volatile 和synchronized都保证了可见性和有序性

十三、小结

  • volatile 保证了可见性:当一个线程修改了共享变量的值时,其他线程能够立即得知这个修改。
  • volatile 保证了单线程下指令不重排:通过插入内存屏障保证指令执行顺序。
  • volatitle不保证原子性,如a++这种自增操作是有并发风险的,比如扣减库存、发放优惠券的场景。
  • volatile 类型的64位的long型和double型变量,对该变量的读/写具有原子性。
  • volatile 可以用在双重检锁的单例模式种,比synchronized性能更好。
  • volatile 可以用在检查某个状态标记以判断是否退出循环。

参考资料:

《深入理解Java虚拟机》

《Java并发编程的艺术》

《Java并发编程实战》

期待后篇么?CAS走起!

你好,我是悟空哥,「7年项目开发经验,全栈工程师,开发组长,超喜欢图解编程底层原理」。

我还手写了 2 个小程序,Java 刷题小程序,PMP 刷题小程序,点击我的公众号菜单打开!

另外有 111 本架构师资料以及 1000 道 Java 面试题,都整理成了PDF。

可以关注公众号 「悟空聊架构」 回复 悟空 领取优质资料。

「转发->在看->点赞->收藏->评论!!!」 是对我最大的支持!

《Java并发必知必会》系列:

1.反制面试官 | 14张原理图 | 再也不怕被问 volatile!

2.程序员深夜惨遭老婆鄙视,原因竟是CAS原理太简单?

3.用积木讲解ABA原理 | 老婆居然又听懂了!

4.全网最细 | 21张图带你领略集合的线程不安全

5.5000字 | 24张图带你彻底理解Java中的21种锁

6.干货 | 一口气说出18种队列(Queue),面试稳了

这三年被分布式坑惨了,曝光十大坑 | 🏆 技术专题第五期征文

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

终于把Redis场景设计搞清楚了,需要掌握的都在这了!

发表于 2020-08-17

分布式缓存是分布式系统中的重要组件,主要解决高并发、大数据场景下,热点数据访问的性能问题,提供高性能的数据快速访问。

使用缓存常见场景是:项目中部分数据访问比较频繁,对下游 DB(例如 MySQL)造成服务压力,这时候可以使用缓存来提高效率。下面来讲BAT等一线企业中Redis各种应用场景核心设计!

一、常用指令

接下来看看每个数据结构常用的指令有哪些,我们用一张表比较清晰的展示:

二、场景解析

1.1string存储

1.2String 类型使用场景

场景一:商品库存数

从业务上,商品库存数据是热点数据,交易行为会直接影响库存。而 Redis 自身 String 类型提供了:

  1. set goods_id 10; 设置 id 为 good_id 的商品的库存初始值为 10;
  2. decr goods_id; 当商品被购买时候,库存数据减 1。

**依次类推的场景:**商品的浏览次数,问题或者回复的点赞次数等。这种计数的场景都可以考虑利用 Redis 来实现。

场景二:时效信息存储

Redis 的数据存储具有自动失效能力。也就是存储的 key-value 可以设置过期时间:set(key, value, expireTime)。

比如,用户登录某个 App 需要获取登录验证码, 验证码在 30 秒内有效。那么我们就可以使用 String 类型存储验证码,同时设置 30 秒的失效时间。

2.1hash存储数据

2.2Hash 类型使用场景

Redis 在存储对象(例如:用户信息)的时候需要对对象进行序列化转换然后存储。

还有一种形式,就是将对象数据转换为 JSON 结构数据,然后存储 JSON 的字符串到 Redis。

对于一些对象类型,还有一种比较方便的类型,那就是按照 Redis 的 Hash 类型进行存储。

例如,我们存储一些网站用户的基本信息, 我们可以使用:

这样就存储了一个用户基本信息,存储信息有:{name : 小明, phone : “123456”,sex : “男”}

当然这种类似场景还非常多, 比如存储订单的数据,产品的数据,商家基本信息等。以淘宝购物车为主

2.3实现信息存储的优缺点

1.原生:

  • set user: 1:name james;
  • set user:1:age 23;
  • set user:1:sex boy;

**优点:**简单直观,每个键对应一个值

**缺点:**键数过多,占用内存多,用户信息过于分散,不用于生产环境

2.将对象序列化存入

redis set user:1 serial ize (userInfo);

**优点:**编程简单,若使用序列化合理内存使用率高

**缺点:**序列化与反序列化有一定开销,更新属性时需要把userInfo全取出来进行反序列化,更新后再序列化到redis

3.hash存储:

hmset user:1 name james age 23 sex boy

**优点:**简单直观,使用合理可减少内存空间消耗

**缺点:**要控制ziplist 与hashtable两种编码转换,Mhashtable会消耗更多内存。

3.1List 类型使用场景

list 是按照插入顺序排序的字符串链表。可以在头部和尾部插入新的元素(双向链表实现,两端添加元素的时间复杂度为 O(1)) 。

场景一:消息队列实现

目前有很多专业的消息队列组件 Kafka、RabbitMQ 等。 我们在这里仅仅是使用 list 的特征来实现消息队列的要求。在实际技术选型的过程中,大家可以慎重思考。

list 存储就是一个队列的存储形式:

  1. lpush key value; 在 key 对应 list 的头部添加字符串元素;
  2. rpop key;移除列表的最后一个元素,返回值为移除的元素。

场景二:最新上架商品

在交易网站首页经常会有新上架产品推荐的模块, 这个模块是存储了最新上架前 100 名。

这时候使用 Redis 的 list 数据结构,来进行 TOP 100 新上架产品的存储。

Redis ltrim 指令对一个列表进行修剪(trim),这样 list 就会只包含指定范围的指定元素。

start 和 stop 都是由 0 开始计数的,这里的 0 是列表里的第一个元素(表头),1 是第二个元素。

如下伪代码演示:

4.1set 类型使用场景

set 也是存储了一个集合列表功能。和 list 不同,set 具备去重功能。当需要存储一个列表信息,同时要求列表内的元素不能有重复,这时候使用 set 比较合适。与此同时,set 还提供的交集、并集、差集。

例如,在交易网站,我们会存储用户感兴趣的商品信息,在进行相似用户分析的时候, 可以通过计算两个不同用户之间感兴趣商品的数量来提供一些依据。

获取到两个用户相似的产品, 然后确定相似产品的类目就可以进行用户分析。

类似的应用场景还有, 社交场景下共同关注好友, 相似兴趣 tag 等场景的支持。

4.2Set集合特殊的操作命令

setA={A,B,C} setB={B, C}

1)集合与集合之间的交集

sinter setA setB-->得到集合{B,C}

  1. 集合与集合之间的并集

sunion setA setB -->得到集合{A,B,C}

3)集合与集合之间的差集

sdiff setA setB-->得到集合{A}

4.3Set集合特殊的操作命令应用场景

如何实现微博的微关系设计?(看视频更香)

www.bilibili.com/video/av921…

5.1 Zset有序集合

常用于排行榜,如视频网站需要对用户上传视频做排行榜,或点赞数与集合有联系,不能有重复的成员

5.2Zset 类型使用场景

最后

用XMind画了一张导图记录Redis的学习笔记和一些面试解析(源文件对部分节点有详细备注和参考资料,欢迎关注我的公众号:阿风的架构笔记 后台发送【Redis】拿下载链接,已经完善更新):

公众号

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

秒杀系统设计

发表于 2020-08-17

点赞再看,养成习惯,微信搜一搜【三太子敖丙】关注这个互联网苟且偷生的工具人。

本文 GitHub github.com/JavaFamily 已收录,有一线大厂面试完整考点、资料以及我的系列文章。

背景

我之前写过一个秒杀系统的文章不过有些许瑕疵,所以我准备在之前的基础上进行二次创作,不过让我决心二创秒杀系统的原因是我最近面试了很多读者,动不动就是秒杀系统把我整蒙蔽了,我懵的主要是秒杀系统的细节大家都不知道,甚至不知道电商公司一个秒杀系统的组成部分。

我之前在某电商公司就是做电商活动的,所以这样的场景和很多解决方案我是比较清楚的,那我就从我自身去带着大家看看一个秒杀的设计细节以及中间各种解决方案的利弊,以下就是我设计的秒杀系统,几乎涵盖了市面上所有秒杀的实现细节:

正文

首先设计一个系统之前,我们需要先确认我们的业务场景是怎么样子的,我就带着大家一起假设一个场景好吧。

我们现场要卖1000件下面这个婴儿纸尿裤,然后我们根据以往这样秒杀活动的数据经验来看,目测来抢这100件纸尿裤的人足足有10万人。(南极人打钱!)

你一听,完了呀,这我们的服务器哪里顶得住啊!说真的直接打DB肯定挂,但是别急嘛,有暖男敖丙在,任何系统我们开始设计之前我们都应该去思考会出现哪些问题?这里我罗列了几个非常经典的问题:

问题

高并发:

是的高并发这个是我们想都不用想的一个点,一瞬间这么多人进来这不是高并发什么时候是呢?

是吧,秒杀的特点就是这样时间极短、 瞬间用户量大。

正常的店铺营销都是用极低的价格配合上短信、APP的精准推送,吸引特别多的用户来参与这场秒杀,爽了商家苦了开发呀。

秒杀大家都知道如果真的营销到位,价格诱人,几十万的流量我觉得完全不是问题,那单机的Redis我感觉3-4W的QPS还是能顶得住的,但是再高了就没办法了,那这个数据随便搞个热销商品的秒杀可能都不止了。

大量的请求进来,我们需要考虑的点就很多了,缓存雪崩,缓存击穿,缓存穿透这些我之前提到的点都是有可能发生的,出现问题打挂DB那就很难受了,活动失败用户体验差,活动人气没了,最后背锅的还是开发。

超卖:

但凡是个秒杀,都怕超卖,我这里举例的只是尿不湿,要是换成100个MacBook Pro,商家的预算经费卖100个可以赚点还可以造势,结果你写错程序多卖出去200个,你不发货用户投诉你,平台封你店,你发货就血亏,你怎么办?
(没事看了敖丙的文章直接不怕)

那最后只能杀个开发祭天解气了,秒杀的价格本来就低了,基本上都是不怎么赚钱的,超卖了就恐怖了呀,所以超卖也是很关键的一个点。

恶意请求:

你这么低的价格,假如我抢到了,我转手卖掉我不是血赚?就算我不卖我也不亏啊,那用户知道,你知道,别的别有用心的人(黑客、黄牛…)肯定也知道的。

那简单啊,我知道你什么时候抢,我搞个几十台机器搞点脚本,我也模拟出来十几万个人左右的请求,那我是不是意味着我基本上有80%的成功率了。

真实情况可能远远不止,因为机器请求的速度比人的手速往往快太多了,在贵州的敖丙我每年回家抢高铁票都是秒光的,我也不知道有没有黄牛的功劳,我要Diss你,黄牛。杰伦演唱会门票抢不到,我也Diss你。

Tip:科普下,小道消息了解到的,黄牛的抢票系统,比国内很多小公司的系统还吊很多,架构设计都是顶级的,我用顶配的服务加上顶配的架构设计,你还想看演唱会?还想回家?

不过不用黄牛我回家都难,我们云贵川跟我一样要回家过年的仔太多了555!

链接暴露:

前面几个问题大家可能都很好理解,一看到这个有的小伙伴可能会比较疑惑,啥是链接暴露呀?


相信是个开发同学都对这个画面一点都不陌生吧,懂点行的仔都可以打开谷歌的开发者模式,然后看看你的网页代码,有的就有URL,但是我写VUE的时候是事件触发然后去调用文件里面的接口看源码看不到,但是我可以点击一下查看你的请求地址啊,不过你好像可以对按钮在秒杀前置灰。

不管怎么样子都有危险,撇开外面的所有的东西你都挡住了,你卖这个东西实在便宜得过分,有诱惑力,你能保证开发不动心?开发知道地址,在秒杀的时候自己提前请求。。。(开发:怎么TM又是我)

数据库:

每秒上万甚至十几万的QPS(每秒请求数)直接打到数据库,基本上都要把库打挂掉,而且你服务不单单是做秒杀的还涉及其他的业务,你没做降级、限流、熔断啥的,别的一起挂,小公司的话可能全站崩溃404。

反正不管你秒杀怎么挂,你别把别的搞挂了对吧,搞挂了就不是杀一个程序员能搞定的。

程序员:我TM好难啊!

问题都列出来了,那怎么设计,怎么解决这些问题就是接下去要考虑的了,我们对症下药。

我会从我设计的秒杀系统从上到下去给大家介绍我们正常电商秒杀系统在每一层做了些什么,每一层存在的问题,难点等。

我们从前端开始:

前端

秒杀系统普遍都是商城网页、H5、APP、小程序这几项。

在前端这一层其实我们可以做的事情有很多,如果用node去做,甚至能直接处理掉整个秒杀,但是node其实应该属于后端,所以我不讨论node Service了。

资源静态化:

秒杀一般都是特定的商品还有页面模板,现在一般都是前后端分离的,页面一般都是不会经过后端的,但是前端也要自己的服务器啊,那就把能提前放入cdn服务器的东西都放进去,反正把所有能提升效率的步骤都做一下,减少真正秒杀时候服务器的压力。

秒杀链接加盐:

我们上面说了链接要是提前暴露出去可能有人直接访问url就提前秒杀了,那又有小伙伴要说了我做个时间的校验就好了呀,那我告诉你,知道链接的地址比起页面人工点击的还是有很大优势。

我知道url了,那我通过程序不断获取最新的北京时间,可以达到毫秒级别的,我就在00毫秒的时候请求,我敢说绝对比你人工点的成功率大太多了,而且我可以一毫秒发送N次请求,搞不好你卖100个产品我全拿了。

那这种情况怎么避免?

简单,把URL动态化,就连写代码的人都不知道,你就通过MD5之类的摘要算法加密随机的字符串去做url,然后通过前端代码获取url后台校验才能通过。

这个只能防止一部分没耐心继续破解下去的黑客,有耐心的人研究出来还是能破解,在电商场景存在很多这样的羊毛党,那怎么做呢?

后面我会说。

限流:

限流这里我觉得应该分为前端限流和后端限流。

物理控制:

大家有没有发现没到秒杀前,一般按钮都是置灰的,只有时间到了,才能点击。

这是因为怕大家在时间快到的最后几秒秒疯狂请求服务器,然后还没到秒杀的时候基本上服务器就挂了。

这个时候就需要前端的配合,定时去请求你的后端服务器,获取最新的北京时间,到时间点再给按钮可用状态。

按钮可以点击之后也得给他置灰几秒,不然他一样在开始之后一直点的。

你敢说你们秒杀的时候不是这样的?

前端限流:这个很简单,一般秒杀不会让你一直点的,一般都是点击一下或者两下然后几秒之后才可以继续点击,这也是保护服务器的一种手段。

后端限流:秒杀的时候肯定是涉及到后续的订单生成和支付等操作,但是都只是成功的幸运儿才会走到那一步,那一旦100个产品卖光了,return了一个false,前端直接秒杀结束,然后你后端也关闭后续无效请求的介入了。

Tip:真正的限流还会有限流组件的加入例如:阿里的Sentinel、Hystrix等。我这里就不展开了,就说一下物理的限流。

我们卖1000件商品,请求有10W,我们不需要把十万都放进来,你可以放1W请求进来,然后再进行操作,因为秒杀对于用户本身就是黑盒的,所以你怎么做的他们是没感知的,至于为啥放1W进来,而不是刚好1000,是因为会丢掉一些薅羊毛的用户,至于怎么判断,后面的风控阶段我会说。

Nginx:

Nginx大家想必都不陌生了吧,这玩意是高性能的web服务器,并发也随便顶几万不是梦,但是我们的Tomcat只能顶几百的并发呀,那简单呀负载均衡嘛,一台服务几百,那就多搞点,在秒杀的时候多租点流量机。

Tip:据我所知国内某大厂就是在去年春节活动期间租光了亚洲所有的服务器,小公司也很喜欢在双十一期间买流量机来顶住压力。


这样一对比是不是觉得你的集群能顶很多了。

恶意请求拦截也需要用到它,一般单个用户请求次数太夸张,不像人为的请求在网关那一层就得拦截掉了,不然请求多了他抢不抢得到是一回事,服务器压力上去了,可能占用网络带宽或者把服务器打崩、缓存击穿等等。

风控

我可以明确的告诉大家,前面的所有措施还是拦不住很多羊毛党,因为他们是专业的团队,他们可以注册很多账号来薅你的羊毛,而且不用机器请求,就用群控,操作几乎跟真实用户一模一样。

那怎么办,是不是无解了?

这个时候就需要风控同学的介入了,在请求到达后端之前,风控可以根据账号行为分析出这个账号机器人的概率大不大,我现在负责公司的某些特殊系统,每个用户的行为都是会送到我们大数据团队进行分析处理,给你打上对应标签的。

那黑客其实也有办法:养号

他们去黑市买真实用户有过很多记录的账号,买到了还不闲着,帮他们去购物啥的,让系统无法识别他们是黑号还是真实用户的号。

怎么办?

通杀!是的没有办法,只能通杀了,通杀的意思就是,我们通过风管分析出来这个用户是真实用户的概率没有其他用户概率大,那就认为他是机器了,丢弃他的请求。

之前的限流我们放进来10000个请求,但是我们真正的库存只有1000个,那我们就算出最有可能是真实用户的1000人进行秒杀,丢弃其他请求,因为秒杀本来就是黑盒操作的,用户层面是无感知的,这样设计能让真实的用户买到东西,还可以减少自己被薅羊毛的概率。

风控可以说是流量进入的最后一道门槛了,所以很多公司的风控是很强的,蚂蚁金服的风控大家如果了解过就知道了,你的资金在支付宝被盗了,他们是能做到全款补偿是有原因的。

后端

服务单一职责:

设计个能抗住高并发的系统,我觉得还是得单一职责。

什么意思呢,大家都知道现在设计都是微服务的设计思想,然后再用分布式的部署方式。

也就是我们下单是有个订单服务,用户登录管理等有个用户服务等等,那为啥我们不给秒杀也开个服务,我们把秒杀的代码业务逻辑放一起。

单一职责的好处就是就算秒杀没抗住,秒杀库崩了,服务挂了,也不会影响到其他的服务。(高可用)

Redis集群:

之前不是说单机的Redis顶不住嘛,那简单多找几个兄弟啊,秒杀本来就是读多写少,那你们是不是瞬间想起来我之前跟你们提到过的,Redis集群,主从同步、读写分离,我们还搞点哨兵,开启持久化直接无敌高可用!

库存预热:

秒杀的本质,就是对库存的抢夺,每个秒杀的用户来你都去数据库查询库存校验库存,然后扣减库存,撇开性能因数,你不觉得这样好繁琐,对业务开发人员都不友好,而且数据库顶不住啊。

开发:你tm总算为我着想一次了。

那怎么办?

我们都知道数据库顶不住但是他的兄弟非关系型的数据库Redis能顶啊!

那不简单了,我们要开始秒杀前你通过定时任务或者运维同学提前把商品的库存加载到Redis中去,让整个流程都在Redis里面去做,然后等秒杀介绍了,再异步的去修改库存就好了。

但是用了Redis就有一个问题了,我们上面说了我们采用主从,就是我们会去读取库存然后再判断然后有库存才去减库存,正常情况没问题,但是高并发的情况问题就很大了。

**多品几遍!!!**就比如现在库存只剩下1个了,我们高并发嘛,4个服务器一起查询了发现都是还有1个,那大家都觉得是自己抢到了,就都去扣库存,那结果就变成了-3,是的只有一个是真的抢到了,别的都是超卖的。咋办?

事务:

Redis本身是支持事务的,而且他有很多原子命令的,大家也可以用LUA,还可以用他的管道,乐观锁他也知支持。

限流&降级&熔断&隔离:

这个为啥要做呢,不怕一万就怕万一,万一你真的顶不住了,限流,顶不住就挡一部分出去但是不能说不行,降级,降级了还是被打挂了,熔断,至少不要影响别的系统,隔离,你本身就独立的,但是你会调用其他的系统嘛,你快不行了你别拖累兄弟们啊。

消息队列(削峰填谷):

一说到这个名词,很多小伙伴就知道了,对的MQ,你买东西少了你直接100个请求改库我觉得没问题,但是万一秒杀一万个,10万个呢?服务器挂了,程序员又要背锅的。

秒杀就是这种瞬间流量很高,但是平时又没有流量的场景,那消息队列完全契合这样的场景了呀,削峰填谷。

Tip:可能小伙伴说我们业务达不到这个量级,没必要。但是我想说我们写代码,就不应该写出有逻辑漏洞的代码,至少以后公司体量上去了,别人一看居然不用改代码,一看代码作者是敖丙?有点东西!

你可以把它放消息队列,然后一点点消费去改库存就好了嘛,不过单个商品其实一次修改就够了,我这里说的是某个点多个商品一起秒杀的场景,像极了双十一零点。

数据库

数据库用MySQL只要连接池设置合理一般问题是不大的,不过一般大公司不缺钱而且秒杀这样的活动十分频繁,我之前所在的公司就是这样秒杀特卖这样的场景一直都是不间断的。

单独给秒杀建立一个数据库,为秒杀服务,表的设计也是竟可能的简单点,现在的互联网架构部署都是分库的。

至于表就看大家怎么设计了,该设置索引的地方还是要设置索引的,建完后记得用explain看看SQL的执行计划。(不了解的小伙伴也没事,MySQL章节去康康)

分布式事务

这为啥我不放在后端而放到最后来讲呢?

因为上面的任何一步都是可能出错的,而且我们是在不同的服务里面出错的,那就涉及分布式事务了,但是分布式事务大家想的是一定要成功什么的那就不对了,还是那句话,几个请求丢了就丢了,要保证时效和服务的可用可靠。

所以TCC和最终一致性其实不是很适合,TCC开发成本很大,所有接口都要写三次,因为涉及TCC的三个阶段。

最终一致性基本上都是靠轮训的操作去保证一个操作一定成功,那时效性就大打折扣了。

大家觉得不那么可靠的**两段式(2PC)和三段式(3PC)**就派上用场了,他们不一定能保证数据最终一致,但是效率上还算ok。

总结

到这里我想我已经基本上把该考虑的点还有对应的解决方案也都说了一下,不知道还有没有没考虑到的,但是就算没考虑到我想我这个设计,应该也能撑住一个完整的秒杀流程。

最后大家再看看这个秒杀系统或许会有新的感悟,是不是一个系统真的没有大家想的那么简单,而且我还是有漏掉的细节,这是一定的。

秒杀这章我脑细胞死了很多,考虑了很多个点,最后还是出来了,忍不住给自己点赞!

总结

我们玩归玩,闹归闹,别拿面试开玩笑。

秒杀不一定是每个同学都会问到的,至少肯定没Redis基础那样常问,但是一旦问到,大家一定要回答到点上。

至少你得说出可能出现的情况,需要注意的情况,以及对于的解决思路和方案,因为这才是一个coder的基本素养,这些你不考虑你也很难去进步。

最后就是需要对整个链路比较熟悉,注意是一个完整的链路,前端怎么设计的呀,网关的作用呀,怎么解决Redis的并发竞争啊,数据的同步方式呀,MQ的作用啊等等,相信你会有不错的收获。

不知道这是一次成功还是失败的二创,我里面所有提到的技术细节我都写了对应的文章,大家可以关注我去历史文章看看,天色已晚,我溜了。

我是敖丙,你知道的越多,你不知道的越多,我们下期见!

人才们的 【三连】 就是敖丙创作的最大动力,如果本篇博客有任何错误和建议,欢迎人才们留言!


文章持续更新,可以微信搜一搜「 三太子敖丙 」第一时间阅读,回复【资料】有我准备的一线大厂面试资料和简历模板,本文 GitHub github.com/JavaFamily 已经收录,有大厂面试完整考点,欢迎Star。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

HashMap、ConcurrentHashMap 17和

发表于 2020-08-16

本篇内容是学习的记录,可能会有所不足。

一:JDK1.7中的HashMap

JDK1.7的hashMap是由数组 + 链表组成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
php复制代码 /** 1 << 4,表示1,左移4位,变成10000,即16,以二进制形式运行,效率更高
* 默认的hashMap数组长度
* The default initial capacity - MUST be a power of two.
*/
static final int DEFAULT_INITIAL_CAPACITY = 1 << 4; // aka 16

/**
* The maximum capacity, used if a higher value is implicitly specified
* by either of the constructors with arguments.
* MUST be a power of two <= 1<<30.
* hashMap的最大容量
*/
static final int MAXIMUM_CAPACITY = 1 << 30; //1 073 741 824

/**
* The load factor used when none specified in constructor.
* 负载因子
*/
static final float DEFAULT_LOAD_FACTOR = 0.75f;

/**
* An empty table instance to share when the table is not inflated.
*/
static final Entry<?,?>[] EMPTY_TABLE = {};

/**
* The table, resized as necessary. Length MUST Always be a power of two.
* hashTable,根据需要调整大小。长度一定是2的幂。
*/
transient Entry<K,V>[] table = (Entry<K,V>[]) EMPTY_TABLE;

/**
* The number of key-value mappings contained in this map.
* hashMap中元素的个数
*/
transient int size;

/**
* The next size value at which to resize (capacity * load factor).
* @serial
*/
// If table == EMPTY_TABLE then this is the initial capacity at which the
// table will be created when inflated.
int threshold;

/**
* The load factor for the hash table.
*
* @serial
*/
final float loadFactor;

/**
* The number of times this HashMap has been structurally modified
* Structural modifications are those that change the number of mappings in
* the HashMap or otherwise modify its internal structure (e.g.,
* rehash). This field is used to make iterators on Collection-views of
* the HashMap fail-fast. (See ConcurrentModificationException).
* 记录hashMap元素被修改的次数
*/
transient int modCount;

1:DEFAULT_INITIAL_CAPACITY,是hashMap默认的初始容量,它的大小一定是2的幂。

2:MAXIMUM_CAPACITY,hashMap支持的最大容量。

3:DEFAULT_LOAD_FACTOR,hashMap默认的负载因子,值为0.75,它决定hashMap数据的密度。

4:Entry<K,V>[] table,hashMap数组,可以根据自己的需要调整大小,长度一定是2的幂。

5:size,主要是记录hashMap中元素的数量。

6:threshold,调整hashMap后的值,即容量*负载因子。

7:loadFactor,可以调整的负载因子。

8:modCount,用来记录hashMap结构被修改的次数。

hashMap源码中有四个构造函数,初始化的时候可以知道容量和负载因子的大小。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
scss复制代码 /**   做了两件事:1、为threshold、loadFactor赋值   2、调用init()
* Constructs an empty <tt>HashMap</tt> with the specified initial
* capacity and load factor.
*
* @param initialCapacity the initial capacity
* @param loadFactor the load factor
* @throws IllegalArgumentException if the initial capacity is negative
* or the load factor is nonpositive
*/
public HashMap(int initialCapacity, float loadFactor) {
if (initialCapacity < 0)
throw new IllegalArgumentException("Illegal initial capacity: " +
initialCapacity);
if (initialCapacity > MAXIMUM_CAPACITY) //限制最大容量
initialCapacity = MAXIMUM_CAPACITY;
if (loadFactor <= 0 || Float.isNaN(loadFactor)) //检查 loadFactor
throw new IllegalArgumentException("Illegal load factor: " +
loadFactor);
//真正在做的,只是记录下loadFactor、initialCpacity的值
this.loadFactor = loadFactor; //记录下loadFactor
threshold = initialCapacity; //初始的 阈值threshold=initialCapacity=16
init();
}

/**
* Constructs an empty <tt>HashMap</tt> with the specified initial
* capacity and the default load factor (0.75).
*
* @param initialCapacity the initial capacity.
* @throws IllegalArgumentException if the initial capacity is negative.
*/
public HashMap(int initialCapacity) {
this(initialCapacity, DEFAULT_LOAD_FACTOR);
}

/** 默认的初始化容量、默认的加载因子
* Constructs an empty <tt>HashMap</tt> with the default initial capacity
* (16) and the default load factor (0.75).
*/
public HashMap() { //16 0.75
this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR);
}

/**
* Constructs a new <tt>HashMap</tt> with the same mappings as the
* specified <tt>Map</tt>. The <tt>HashMap</tt> is created with
* default load factor (0.75) and an initial capacity sufficient to
* hold the mappings in the specified <tt>Map</tt>.
*
* @param m the map whose mappings are to be placed in this map
* @throws NullPointerException if the specified map is null
*/
public HashMap(Map<? extends K, ? extends V> m) {
this(Math.max((int) (m.size() / DEFAULT_LOAD_FACTOR) + 1,
DEFAULT_INITIAL_CAPACITY), DEFAULT_LOAD_FACTOR);
inflateTable(threshold);

putAllForCreate(m);
}

接下来看下put方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
ini复制代码 public V put(K key, V value) {
if (Entry<K,V>[] table == EMPTY_TABLE) {
inflateTable(threshold); //初始化表 (初始化、扩容 合并为了一个方法)
}
if (key == null) //对key为null做特殊处理
return putForNullKey(value);
int hash = hash(key); //计算hash值
int i = indexFor(hash, table.length); //根据hash值计算出index下标
for (Entry<K,V> e = table[i]; e != null; e = e.next) { //遍历下标为i处的链表
Object k;
if (e.hash == hash && ((k = e.key) == key || key.equals(k))) { //如果key值相同,覆盖旧值,返回新值
V oldValue = e.value;
e.value = value; //新值 覆盖 旧值
e.recordAccess(this); //do nothing
return oldValue; //返回旧值
}
}

modCount++; //修改次数+1,类似于一个version number
addEntry(hash, key, value, i);
return null;
}

可以看到到table是空的时候,调用了一个方法:

1
2
3
4
5
6
7
8
scss复制代码private void inflateTable(int toSize) {
// Find a power of 2 >= toSize
int capacity = roundUpToPowerOf2(toSize);
//
threshold = (int) Math.min(capacity * loadFactor, MAXIMUM_CAPACITY + 1);
table = new Entry[capacity]; //初始化表
initHashSeedAsNeeded(capacity);
}

这个方法用来初始化table和table的扩容,roundUpToPowerOf2可以保证hashMap的容量一定是2的幂。

hashMap put元素时,会先根据hash运算计算出hash值,然后根据hash值和table的长度进行取模,计算出元素在table中的下标,如果key相同就覆盖原来的旧值,如果不相同就加入链表中。

1
2
3
4
5
6
7
8
arduino复制代码/**
* Returns index for hash code h.
* 计算元素在table中的下标位置
*/
static int indexFor(int h, int length) {
// assert Integer.bitCount(length) == 1 : "length must be a non-zero power of 2";
return h & (length-1);
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
scss复制代码 /**
* Adds a new entry with the specified key, value and hash code to
* the specified bucket. It is the responsibility of this
* method to resize the table if appropriate.
*
* Subclass overrides this to alter the behavior of put method.
*/
void addEntry(int hash, K key, V value, int bucketIndex) {
if ((size >= threshold) && (null != table[bucketIndex])) { //如果size大于threshold && table在下标为index的地方已经有entry了
resize(2 * table.length); //扩容,将数组长度变为原来两倍
hash = (null != key) ? hash(key) : 0; //重新计算 hash 值
bucketIndex = indexFor(hash, table.length); //重新计算下标
}

createEntry(hash, key, value, bucketIndex); //创建entry
}

/**
* Like addEntry except that this version is used when creating entries
* as part of Map construction or "pseudo-construction" (cloning,
* deserialization). This version needn't worry about resizing the table.
*
* Subclass overrides this to alter the behavior of HashMap(Map),
* clone, and readObject.
*/
void createEntry(int hash, K key, V value, int bucketIndex) {
Entry<K,V> e = table[bucketIndex]; //获取table中存的entry
table[bucketIndex] = new Entry<>(hash, key, value, e); //将新的entry放到数组中,next指向旧的table[i]
size++; //修改map中元素个数
}

当put的元素个数大于12时,即大于hashMap的容量*负载因子计算后的值,那么就会进行扩容,上述源代码可以看到扩容的条件, 除了大于12,还要看当前put进table所处的位置,是否为null,若是null,就不进行扩容,否则就扩容成原来容量的2倍,扩容后需要重新计算hash和计算下标,由于table的长度发生了变化,需要重新计算。

接下来看下get方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
ini复制代码public V get(Object key) {
if (key == null)
return getForNullKey();
Entry<K,V> entry = getEntry(key);

return null == entry ? null : entry.getValue();
}

/**
* Returns the entry associated with the specified key in the
* HashMap. Returns null if the HashMap contains no mapping
* for the key.
*/
final Entry<K,V> getEntry(Object key) {
if (size == 0) {
return null;
}

int hash = (key == null) ? 0 : hash(key);
for (Entry<K,V> e = table[indexFor(hash, table.length)];
e != null;
e = e.next) {
Object k;
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
return e;
}
return null;
}

get方法也是需要先计算hash然后计算下标,再去寻找元素。

)​

二:JDK1.8中的HashMap

JDK1.8中的hashMap和1.7最大的区别就是引入了红黑树

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
php复制代码/**
* The table, initialized on first use, and resized as
* necessary. When allocated, length is always a power of two.
* (We also tolerate length zero in some operations to allow
* bootstrapping mechanics that are currently not needed.)
*/
transient Node<K,V>[] table;

/**
* Holds cached entrySet(). Note that AbstractMap fields are used
* for keySet() and values().
*/
transient Set<Map.Entry<K,V>> entrySet;

/**
* The number of key-value mappings contained in this map.
*/
transient int size;

/**
* The number of times this HashMap has been structurally modified
* Structural modifications are those that change the number of mappings in
* the HashMap or otherwise modify its internal structure (e.g.,
* rehash). This field is used to make iterators on Collection-views of
* the HashMap fail-fast. (See ConcurrentModificationException).
*/
transient int modCount;

/**
* The next size value at which to resize (capacity * load factor).
*
* @serial
*/
// (The javadoc description is true upon serialization.
// Additionally, if the table array has not been allocated, this
// field holds the initial array capacity, or zero signifying
// DEFAULT_INITIAL_CAPACITY.)
int threshold;

/**
* The load factor for the hash table.
*
* @serial
*/
final float loadFactor;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
arduino复制代码/**
* The default initial capacity - MUST be a power of two.
*/
static final int DEFAULT_INITIAL_CAPACITY = 1 << 4; // aka 16

/**
* The maximum capacity, used if a higher value is implicitly specified
* by either of the constructors with arguments.
* MUST be a power of two <= 1<<30.
*/
static final int MAXIMUM_CAPACITY = 1 << 30;

/**
* The load factor used when none specified in constructor.
*/
static final float DEFAULT_LOAD_FACTOR = 0.75f;

/**
* The bin count threshold for using a tree rather than list for a
* bin. Bins are converted to trees when adding an element to a
* bin with at least this many nodes. The value must be greater
* than 2 and should be at least 8 to mesh with assumptions in
* tree removal about conversion back to plain bins upon
* shrinkage.
*
*/
static final int TREEIFY_THRESHOLD = 8;

/**
* The bin count threshold for untreeifying a (split) bin during a
* resize operation. Should be less than TREEIFY_THRESHOLD, and at
* most 6 to mesh with shrinkage detection under removal.
*/
static final int UNTREEIFY_THRESHOLD = 6;

/**
* The smallest table capacity for which bins may be treeified.
* (Otherwise the table is resized if too many nodes in a bin.)
* Should be at least 4 * TREEIFY_THRESHOLD to avoid conflicts
* between resizing and treeification thresholds.
*/
static final int MIN_TREEIFY_CAPACITY = 64;

/**
* Basic hash bin node, used for most entries. (See below for
* TreeNode subclass, and in LinkedHashMap for its Entry subclass.)
*/
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
V value;
Node<K,V> next;

Node(int hash, K key, V value, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.value = value;
this.next = next;
}

public final K getKey() { return key; }
public final V getValue() { return value; }
public final String toString() { return key + "=" + value; }

public final int hashCode() {
return Objects.hashCode(key) ^ Objects.hashCode(value);
}

public final V setValue(V newValue) {
V oldValue = value;
value = newValue;
return oldValue;
}

public final boolean equals(Object o) {
if (o == this)
return true;
if (o instanceof Map.Entry) {
Map.Entry<?,?> e = (Map.Entry<?,?>)o;
if (Objects.equals(key, e.getKey()) &&
Objects.equals(value, e.getValue()))
return true;
}
return false;
}
}

下面看下put方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
scss复制代码 public V put(K key, V value) {
return putVal(hash(key), key, value, false, true);
}

/**
* Implements Map.put and related methods. 添加元素
*
* @param hash hash for key
* @param key the key
* @param value the value to put
* @param onlyIfAbsent if true, don't change existing value
* @param evict if false, the table is in creation mode.
* @return previous value, or null if none
*/
final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
boolean evict) {
Node<K,V>[] tab; Node<K,V> p; int n, i;
if ((tab = table) == null || (n = tab.length) == 0) //若table为null
n = (tab = resize()).length; //resize
if ((p = tab[i = (n - 1) & hash]) == null) //计算下标i,取出i处的元素为p,如果p为null
tab[i] = newNode(hash, key, value, null); //创建新的node,放到数组中
else { //若 p!=null
Node<K,V> e; K k;
if (p.hash == hash &&
((k = p.key) == key || (key != null && key.equals(k)))) //若key相同
e = p; //直接覆盖
else if (p instanceof TreeNode) //如果为 树节点
e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value); //放到树中
else { //如果key不相同,也不是treeNode
for (int binCount = 0; ; ++binCount) { //遍历i处的链表
if ((e = p.next) == null) { //找到尾部
p.next = newNode(hash, key, value, null); //在末尾添加一个node
if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st //如果链表长度 >= 8
treeifyBin(tab, hash); //将链表转成共黑树
break;
}
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k)))) //若果key相同,直接退出循环
break;
p = e;
}
}
if (e != null) { // existing mapping for key
V oldValue = e.value;
if (!onlyIfAbsent || oldValue == null)
e.value = value;
afterNodeAccess(e);
return oldValue;
}
}
++modCount;
if (++size > threshold)
resize();
afterNodeInsertion(evict);
return null;
}

可以看到,上述源代码中,put的时候加入了红黑树,当put元素时,若链表的长度大于8,即源代码中的TREEIFY_THRESHOLD的值,这个时候链表就会转化为红黑树结构;当进行扩容的时候,红黑树转移后,若元素个数小于6,那么就会重新转化为链表。

)​

三:JDK1.7中的ConcurrentHashMap

JDK1.7中的ConcurrentHashMap和JDK1.7中的HashMap的区别就是数组所存的元素,我们知道ConcurrentHashMap 是线程安全的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
ini复制代码public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key); // 计算Hash值
int j = (hash >>> segmentShift) & segmentMask; //计算下标j
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j); //若j处有segment就返回,若没有就创建并返回
return s.put(key, hash, value, false); //将值put到segment中去
}


final V put(K key, int hash, V value, boolean onlyIfAbsent) {
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value); //如果tryLock成功,就返回null,否则。。。
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash; //根据table数组的长度 和 hash值计算index小标
HashEntry<K,V> first = entryAt(tab, index); //找到table数组在 index处链表的头部
for (HashEntry<K,V> e = first;;) { //从first开始遍历链表
if (e != null) { //若e!=null
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) { //如果key相同
oldValue = e.value; //获取旧值
if (!onlyIfAbsent) { //若absent=false
e.value = value; //覆盖旧值
++modCount; //
}
break; //若已经找到,就退出链表遍历
}
e = e.next; //若key不相同,继续遍历
}
else { //直到e为null
if (node != null) //将元素放到链表头部
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first); //创建新的Entry
int c = count + 1; //count 用来记录元素个数
if (c > threshold && tab.length < MAXIMUM_CAPACITY) //如果hashmap元素个数超过threshold,并且table长度小于最大容量
rehash(node); //rehash跟resize的功能差不多,将table的长度变为原来的两倍,重新打包entries,并将给定的node添加到新的table
else //如果还有容量
setEntryAt(tab, index, node); //就在index处添加链表节点
++modCount; //修改操作数
count = c; //将count+1
oldValue = null; //
break;
}
}
} finally {
unlock(); //执行完操作后,释放锁
}
return oldValue; //返回oldValue
}

private Segment<K,V> ensureSegment(int k) {
final Segment<K,V>[] ss = this.segments;
long u = (k << SSHIFT) + SBASE; // raw offset 获取下标k处的offset,
Segment<K,V> seg;
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { //如果下标k处没有元素
Segment<K,V> proto = ss[0]; // use segment 0 as prototype
int cap = proto.table.length; //根据proto 获得 cap参数
float lf = proto.loadFactor; //。。。
int threshold = (int)(cap * lf); //计算threshold
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) { // recheck //如果下标k处仍然没有元素
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab); //创建segment
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) { //若下标k处仍然没有元素,自旋
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) //若通过CAS更新成功,则退出
break;
}
}
}
return seg;
}

1
2
3
4
swift复制代码/** segments中每个元素都是一个专用的hashtable
* The segments, each of which is a specialized hash table.
*/
final Segment<K,V>[] segments;

可以看到1.7中的ConcurrentHashMap数组中所存的是segments,每个segments下都是一个hashTable。当put元素时,会加锁,然后计算hash和下标,计算下标会计算两次,一次是在数组中的segments的位置,一次是在hashTable的位置。

)​

四:JDK1.8中的ConcurrentHashMap

JDK1.8中的ConcurrentHashMap和JDK1.8中的HashMap结构一样,只是在处理上有区别

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
ini复制代码public V put(K key, V value) {
return putVal(key, value, false);
}

/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode()); //计算hash值
int binCount = 0;
for (Node<K,V>[] tab = table;;) { //自旋
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0) //table==null || table.length==0
tab = initTable(); //就initTable
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //若下标 i 处的元素为null
if (casTabAt(tab, i, null, //直接用CAS操作,i处的元素
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin 想emptybin中假如元素的时候,不需要加锁
}
else if ((fh = f.hash) == MOVED) //若下标 i 处的元素不为null,且f.hash==MOVED MOVED为常量值-1
tab = helpTransfer(tab, f); //
else { //如果是一般的节点
V oldVal = null;
synchronized (f) { //当头部元素不为null,且不需要转换成树时,需要进行同步操作
if (tabAt(tab, i) == f) {
if (fh >= 0) { //若 链表头部hash值 >=0
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) { //如果key相同
oldVal = e.val;
if (!onlyIfAbsent) //且不为absent
e.val = value; //旧值覆盖新值
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null), { //如果链表遍历完成,还没退出,说明没有相同的key存在,在尾部添加节点
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) { //如果f是Tree的节点
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}

当put元素时,会使用CAS操作,去判断数组中所要put到的位置元素是否为空,为空就修改为当前的put的元素,若CAS操作失败,那么会自旋,这个时候发现数组里已经有元素了,那么就会锁住链表或者红黑树头部,把元素放入链表或者红黑树下面 。

五:hash冲突

当put的时候需要计算hash和下标,这个时候计算出来的值可能存在一样的,那么存到数组中的相同位置,就会发生hash冲突,

计算出的hash值一样一定会发生hash冲突,但是hash值一样的概率很小,计算出的下标值是一样的概率很大,所以hash冲突主要是由下标位置一样引起的,hashMap的解决方式是使用链地址法,即使用链表的方式解决,key一样的时候才会覆盖,否则就把元素放到链表的下一个位置。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…786787788…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%