开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

雪球数据中台建设

发表于 2021-03-22

背景介绍

雪球是中国最大的股票投资社区,其使命是“连接投资的一切”,愿景是成为“全球最大的投资交流交易平台”。雪球的核心目标是为股票、基金投资者提供优质的信息获取、投资交流和行情交易体验。

在雪球内部,为了达到”连接投资的一切”专门成立了大数据部门,背负起建设雪球数据中台的责任以及”连接雪球数据的一切”的使命,有来自各部门的数据,来自各APP的数据,来自各种微服务的数据,如何能快速的将数据进行整合治理,快速达成业务需要的数据报表,以完成数据分析乃至数据决策,成为掣肘雪球各业务发展的一个突出问题,如何解决呢?

数据中台AIBO—连接数据的一切

想要快速集成各种数据,必须做到存储和计算的分离,想要数据分析和决策,必须提供便捷的数据分析工具,想要各业务线集成打通,必须拥有权限和数据目录管理,想要快速响应业务,必须提供数据API, 做好服务快速构建和治理,基于对诸多问题的拆解,我们发现想要解决所有数据问题,就必须拥有一个雪球专属的数据中台,没有任何一家企业能够通过购买通用的服务来解决所有业务问题,数据中台也并非万能,需要与业务紧密结合,快速提供业务对数据的获取能力,整合分析能力,报表能力等。

既然目标确定,我们可以看一下建设一个企业的数据中台,必须要有哪些能力(目前并没有一个统一的认知,融合雪球的现状:

  • 数据集成:需要能快速的对各种结构化以及非结构化的数据做集成
  • 数据目录:需要理清公司重要的数据资产,建设统一的数据目录
  • 数据标签和打通: 需要打通各业务的数据,同时梳理出数据标签(如用户标签,股票标签,帖子标签,广告标签等)
  • 数据分析:对业务价值的探索和分析,离不开高效的数据分析工具支持,需要提供统一且灵活的数据分析工具
  • 数据权限: 需要对各类数据做好权限管理,防止数据泄露的同时做好数据分享和协作
  • 数据服务:需要对业务提供数据支撑,快速开发数据服务,以微服务思想驱动业务, 同时需要做好服务治理,用数据分析来更新迭代数据服务(不用的数据服务下线,核心的数据服务保证高可用和稳定性)

从数据流和数据应用角度来看,整个AIBO架构如下:

图片

数据集成

雪球数据中台,采用的是存储与计算分离的架构设计,数据集成是对数据做ETL和整合,集成后存储到hive数据仓库里,主要有来自如下几方面的数据:

  • Kafka topic数据: 通过flume同步落地到hive,flume集群添加配置即可,后续也会完善成标准的数据接入接口,以配置驱动完成消息队列数据的快速接入
  • mysql业务数据库数据: 通过sqoop导表到hive,AIBO开发了专门的接口,来快速做到数据库表的集合,标准化的完成mysql数据的整合
  • 数据ETL工作: 需要对底层数据源,做一定的ETL工作,以满足业务需求,此点是灵活多变,根据业务需要来完成的,通过SQL,python,shell等方式来完成,视具体情况而定
  • 数据依赖调度: 采用DolphinScheduler,图形化的处理数据依赖

数据目录–User+Event

数据目录,也即数据管理中心,需要集成尽可能多的有价值的数据资产,也需要提供各种数据的数据定义, 雪球是一款ToC的产品,面向大众用户,所以很自然的,AIBO的底层数据均是和用户相关,描述谁,在什么时间,做了什么事,这件事有什么特殊的属性等,所以抽象来看就是 User+Event的数据模型,数据目录也就是常用的用户分析系统里的事件定义。

图片

数据标签和打通–USER+EVENT+Item

Item即为数据物料,数据标签的意义,在一个用户完成某个事件时,我们知道这个用户是谁,就自然想知道这个用户有哪些标签属性,如年龄,性别,粉丝数,关注大V数,收藏帖子数,发帖频次等从各业务线来描述的用户数据标签,再拓展一下, 当某个人在雪球看了某个股票行情,我们也可能想知道这个人在看股票行情时,都喜欢看哪一类股票的行情,由此也需要股票的标签,如股票所属行业,股票市盈率,市净率,将利率等信息, 同样用户在雪球点击某个广告时,我们也想要知道用户爱点哪类的广告,从而需要知道广告的物料信息,从技术上来讲,也很好理解,就是Event的某个属性里,存在一个Item物料数据的ID外键,我们可以通过此外键来打通物料数据,如event里的uid可以打通user_info用户标签,adid可以打通广告标签,stock_code打通股票标签等。

Item的标签可以是任意物料信息,如电商里的订单信息,产品信息,物流信息等,在雪球里也有很多各业务的物料信息以及全雪球用户标签数据。

Item的标签是可以灵活添加和维护的,通过数据集成和自定义ETL来完成,通过DolphinScheduler完成依赖管理。

图片

数据分析—通用模型分析能力

成功的数据应用,离不开数据分析和探索,从而用数据来影响业务,打造数据应用的闭环,数据分析工具是数据中台中重要的一环,分析数据,创造价值不可或缺的一步,需要提供足够便利的数据分析,AIBO目前提供以下数据分析能力:

  • 事件分析
  • 留存分析
  • 漏斗分析
  • 自定义数据看板
  • 分群对比
  • ABTest

事件分析

用户在APP/网页等平台上产生的行为可称之为事件。如启动APP、注册、登陆、浏览帖子、入金等都是事件的一种。事件分析可实现指定细分维度下(应用版本、设备品牌、操作系统等)的指标(APP启动用户数、注册用户数、入金用户数等)计算与展示,帮助快速、高效解决日常运营分析中遇到的问题。

图片

功能:支持自定义维度,自定义指标,支持分群对比功能,支持更多图表展示,以及数据download。

留存分析

留存分析是一种用来分析用户参与情况/活跃程度的分析模型,考察进行初始行为后的用户中,有多少人会进行后续行为。这是衡量产品对用户价值高低的重要指标。

图片

功能: 可自定义初始行为和后续行为,可按初始行为或后续行为的任意维度进行维度分组剖析, 可做用户分群对比, 可多种图表查看数据。

漏斗分析

漏斗模型主要用于分析一个多步骤过程中每一步的转化与流失情况。可以用于验证用户是否按照产品设计的路径达成最终目的,也可来分析用户完成核心转化步骤的情况。通过分析每一步之间的转化与流失情况,发现潜在问题,定位流失用户。

图片

功能: 自由定义漏斗事件步骤,自由为每步事件添加过滤条件,分群对比,多图表展示漏斗。

自定义数据看板

根据自身对数据的跟踪需要将分析过程中保存的指标和图表添加到看板中,构成自定义看板,并可通过分享看板给指定人员,方便日常数据监控

图片

功能: 在事件分析,留存,漏斗等分析页面,可快速保存图表,并自由定义数据看板。

分群对比

AIBO的所有灵活模型,均支持用户分群对比,用户分群可以通过任意事件,或者任意模型的剖析过滤条件来进行保存,同时也支持静态的基于文件上传的用户分群配置,也支持动态的(条件定制,随着时间的改变分群的用户集也随之改变)。

图片

功能: 快速的对比两个或多个用户分群,在各个模型上的数据表现情况。

ABTest

辅助产品进行升级,支持AB实验的定制,和雪球大前端团队共同协作,在底层APP上进行ABTest实验分流的支持,业务方只需要定义ABTest的多个页面版本,以及实验的目标用户分群(可通过已配置过的分群来做筛选,也可以自由定义条件),开启实验后,会提供T+1和T+0的数据分析统计,以快速的进行产品改版的优胜劣态。

图片

功能: ABTest为产品升级提供支持,自定义人群分流,数据效果实时统计,自由定义核心指标以及辅助指标。

数据服务

在技术架构上,以微服务的方式,为业务线提供各种数据支持,通过已有数据模型以及模型的扩展,进行数据组合,可共享数据配置和数据目录元数据,通用的数据服务集成到AIBO通用的数据服务接口里,业务定制的采用微服务进行开发和管理,根据业务需要进行灵活调整。

总结与展望

雪球在数据中台的建设上,采用存储和计算查询分离的架构设计,以微服务方式进行数据API和数据分析模型的扩展,对通用的业务需求进行抽象整合,以保证业务顺利进行,形成数据整合+数据目录配置+数据分析+数据应用的闭环迭代,后续会逐步补充数据中台AIBO的能力,完善数据安全的更细粒度支持,完善数据血缘关系(可在数据目录里看到所有etl过程),相信在不断的业务迭代下,雪球数据中台能越来越完善,越来越强大,通过数据驱动业务,完成业务升华。

对技术感兴趣的同事,可以关注雪球工程师团队公众号,后续还会带来各模块的技术和设计分享,敬请期待。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Spring Boot 实现接口幂等性的 4 种方案!还有谁

发表于 2021-03-22

公众号:Java小咖秀,网站:javaxks.com

作者 : 超级小豆丁 ,链接: mydlq.club/article/94/

一、什么是幂等性

幂等是一个数学与计算机学概念,在数学中某一元运算为幂等时,其作用在任一元素两次后会和其作用一次的结果相同。

[在计算机中编程中,一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同。幂等函数或幂等方法是指可以使用相同参数重复执行,并能获得相同结果的函数。这些函数不会影响系统状态,也不用担心重复执行会对系统造成改变。]

二、什么是接口幂等性

在 HTTP/1.1 中,对幂等性进行了定义。它描述了一次和多次请求某一个资源对于资源本身应该具有同样的结果(网络超时等问题除外),即第一次请求的时候对资源产生了副作用,但是以后的多次请求都不会再对资源产生副作用。

这里的副作用是不会对结果产生破坏或者产生不可预料的结果。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。

三、为什么需要实现幂等性

在接口调用时一般情况下都能正常返回信息不会重复提交,不过在遇见以下情况时可以就会出现问题,如:

  • 前端重复提交表单: 在填写一些表格时候,用户填写完成提交,很多时候会因网络波动没有及时对用户做出提交成功响应,致使用户认为没有成功提交,然后一直点提交按钮,这时就会发生重复提交表单请求。
  • 用户恶意进行刷单: 例如在实现用户投票这种功能时,如果用户针对一个用户进行重复提交投票,这样会导致接口接收到用户重复提交的投票信息,这样会使投票结果与事实严重不符。
  • 接口超时重复提交: 很多时候 HTTP 客户端工具都默认开启超时重试的机制,尤其是第三方调用接口时候,为了防止网络波动超时等造成的请求失败,都会添加重试机制,导致一个请求提交多次。
  • 消息进行重复消费: 当使用 MQ 消息中间件时候,如果发生消息中间件出现错误未及时提交消费信息,导致发生重复消费。

使用幂等性最大的优势在于使接口保证任何幂等性操作,免去因重试等造成系统产生的未知的问题。

四、引入幂等性后对系统的影响

幂等性是为了简化客户端逻辑处理,能放置重复提交等操作,但却增加了服务端的逻辑复杂性和成本,其主要是:

  • 把并行执行的功能改为串行执行,降低了执行效率。
  • 增加了额外控制幂等的业务逻辑,复杂化了业务功能;

所以在使用时候需要考虑是否引入幂等性的必要性,根据实际业务场景具体分析,除了业务上的特殊要求外,一般情况下不需要引入的接口幂等性。

五、Restful API 接口的幂等性

现在流行的 Restful 推荐的几种 HTTP 接口方法中,分别存在幂等行与不能保证幂等的方法,如下:

  • √ 满足幂等
  • x 不满足幂等
    • 可能满足也可能不满足幂等,根据实际业务逻辑有关
方法类型 是否幂等 描述
Get √ Get 方法用于获取资源。其一般不会也不应当对系统资源进行改变,所以是幂等的。
Post × Post 方法一般用于创建新的资源。其每次执行都会新增数据,所以不是幂等的。
Put - Put 方法一般用于修改资源。该操作则分情况来判断是不是满足幂等,更新操作中直接根据某个值进行更新,也能保持幂等。不过执行累加操作的更新是非幂等。
Delete - Delete 方法一般用于删除资源。该操作则分情况来判断是不是满足幂等,当根据唯一值进行删除时,删除同一个数据多次执行效果一样。不过需要注意,带查询条件的删除则就不一定满足幂等了。例如在根据条件删除一批数据后,这时候新增加了一条数据也满足条件,然后又执行了一次删除,那么将会导致新增加的这条满足条件数据也被删除。

六、如何实现幂等性

方案一:数据库唯一主键

方案描述

数据库唯一主键的实现主要是利用数据库中主键唯一约束的特性,一般来说唯一主键比较适用于 “插入” 时的幂等性,其能保证一张表中只能存在一条带该唯一主键的记录。

使用数据库唯一主键完成幂等性时需要注意的是,该主键一般来说并不是使用数据库中自增主键,而是使用分布式 ID 充当主键(可以参考 Java 中分布式 ID 的设计方案 这篇文章),这样才能能保证在分布式环境下 ID 的全局唯一性。

适用操作:

  • 插入操作
  • 删除操作

使用限制:

  • 需要生成全局唯一主键 ID;

主要流程:

image.png

主要流程:

  • ① 客户端执行创建请求,调用服务端接口。
  • ② 服务端执行业务逻辑,生成一个分布式 ID,将该 ID 充当待插入数据的主键,然后执数据插入操作,运行对应的 SQL 语句。
  • ③ 服务端将该条数据插入数据库中,如果插入成功则表示没有重复调用接口。如果抛出主键重复异常,则表示数据库中已经存在该条记录,返回错误信息到客户端。

方案二:数据库乐观锁

方案描述:

[数据库乐观锁方案一般只能适用于执行 “更新操作” 的过程,我们可以提前在对应的数据表中多添加一个字段,充当当前数据的版本标识。这样每次对该数据库该表的这条数据执行更新时,都会将该版本标识作为一个条件,值为上次待更新数据中的版本标识的值。]

适用操作:

  • 更新操作

使用限制:

  • 需要数据库对应业务表中添加额外字段;

描述示例:

例如,存在如下的数据表中:

id name price
1 小米手机 1000
2 苹果手机 2500
3 华为手机 1600

为了每次执行更新时防止重复更新,确定更新的一定是要更新的内容,我们通常都会添加一个 version 字段记录当前的记录版本,这样在更新时候将该值带上,那么只要执行更新操作就能确定一定更新的是某个对应版本下的信息。

id name price version
1 小米手机 1000 10
2 苹果手机 2500 21
3 华为手机 1600 5

这样每次执行更新时候,都要指定要更新的版本号,如下操作就能准确更新 version=5 的信息:

1
sql复制代码UPDATE my_table SET price=price+50,version=version+1 WHERE id=1 AND version=5

上面 WHERE 后面跟着条件 id=1 AND version=5 被执行后,id=1 的 version 被更新为 6,所以如果重复执行该条 SQL 语句将不生效,因为 id=1 AND version=5 的数据已经不存在,这样就能保住更新的幂等,多次更新对结果不会产生影响。

方案三:防重 Token 令牌

方案描述:

[针对客户端连续点击或者调用方的超时重试等情况,例如提交订单,此种操作就可以用 Token 的机制实现防止重复提交。简单的说就是调用方在调用接口的时候先向后端请求一个全局 ID(Token),请求的时候携带这个全局 ID 一起请求(Token 最好将其放到 Headers 中),后端需要对这个 Token 作为 Key,用户信息作为 Value 到 Redis 中进行键值内容校验,如果 Key 存在且 Value 匹配就执行删除命令,然后正常执行后面的业务逻辑。如果不存在对应的 Key 或 Value 不匹配就返回重复执行的错误信息,这样来保证幂等操作。]

适用操作:

  • 插入操作
  • 更新操作
  • 删除操作

使用限制:

  • 需要生成全局唯一 Token 串;
  • 需要使用第三方组件 Redis 进行数据效验;

主要流程:

  • ① 服务端提供获取 Token 的接口,该 Token 可以是一个序列号,也可以是一个分布式 ID 或者 UUID 串。
  • ② 客户端调用接口获取 Token,这时候服务端会生成一个 Token 串。
  • ③ 然后将该串存入 Redis 数据库中,以该 Token 作为 Redis 的键(注意设置过期时间)。
  • ④ 将 Token 返回到客户端,客户端拿到后应存到表单隐藏域中。
  • ⑤ 客户端在执行提交表单时,把 Token 存入到 Headers 中,执行业务请求带上该 Headers。
  • ⑥ 服务端接收到请求后从 Headers 中拿到 Token,然后根据 Token 到 Redis 中查找该 key 是否存在。
  • ⑦ 服务端根据 Redis 中是否存该 key 进行判断,如果存在就将该 key 删除,然后正常执行业务逻辑。如果不存在就抛异常,返回重复提交的错误信息。

注意,在并发情况下,执行 Redis 查找数据与删除需要保证原子性,否则很可能在并发下无法保证幂等性。其实现方法可以使用分布式锁或者使用 Lua 表达式来注销查询与删除操作。

方案四、下游传递唯一序列号

方案描述:

所谓请求序列号,其实就是每次向服务端请求时候附带一个短时间内唯一不重复的序列号,该序列号可以是一个有序 ID,也可以是一个订单号,一般由下游生成,在调用上游服务端接口时附加该序列号和用于认证的 ID。

当上游服务器收到请求信息后拿取该 序列号 和下游 认证 ID 进行组合,形成用于操作 Redis 的 Key,然后到 Redis 中查询是否存在对应的 Key 的键值对,根据其结果:

  • 如果存在,就说明已经对该下游的该序列号的请求进行了业务处理,这时可以直接响应重复请求的错误信息。
  • 如果不存在,就以该 Key 作为 Redis 的键,以下游关键信息作为存储的值(例如下游商传递的一些业务逻辑信息),将该键值对存储到 Redis 中 ,然后再正常执行对应的业务逻辑即可。

适用操作:

  • 插入操作
  • 更新操作
  • 删除操作

使用限制:

  • 要求第三方传递唯一序列号;
  • 需要使用第三方组件 Redis 进行数据效验;

主要流程:

主要步骤:

  • ① 下游服务生成分布式 ID 作为序列号,然后执行请求调用上游接口,并附带 “唯一序列号” 与请求的“认证凭据 ID”。
  • ② 上游服务进行安全效验,检测下游传递的参数中是否存在 “序列号” 和“凭据 ID”。
  • ③ 上游服务到 Redis 中检测是否存在对应的 “序列号” 与“认证 ID”组成的 Key,如果存在就抛出重复执行的异常信息,然后响应下游对应的错误信息。如果不存在就以该 “序列号” 和“认证 ID”组合作为 Key,以下游关键信息作为 Value,进而存储到 Redis 中,然后正常执行接来来的业务逻辑。

上面步骤中插入数据到 Redis 一定要设置过期时间。这样能保证在这个时间范围内,如果重复调用接口,则能够进行判断识别。如果不设置过期时间,很可能导致数据无限量的存入 Redis,致使 Redis 不能正常工作。

七、实现接口幂等示例

这里使用防重 Token 令牌方案,该方案能保证在不同请求动作下的幂等性,实现逻辑可以看上面写的” 防重 Token 令牌” 方案,接下来写下实现这个逻辑的代码。

1、Maven 引入相关依赖

这里使用 Maven 工具管理依赖,这里在 pom.xml 中引入 SpringBoot、Redis、lombok 相关依赖。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
xml复制代码<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.4.RELEASE</version>
</parent>

<groupId>mydlq.club</groupId>
<artifactId>springboot-idempotent-token</artifactId>
<version>0.0.1</version>
<name>springboot-idempotent-token</name>
<description>Idempotent Demo</description>

<properties>
<java.version>1.8</java.version>
</properties>

<dependencies>
<!--springboot web-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--springboot data redis-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
<!--lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

</project>

2、配置连接 Redis 的参数

在 application 配置文件中配置连接 Redis 的参数。Spring Boot 基础就不介绍了,最新教程推荐看下面的教程。

如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
yaml复制代码spring:
redis:
ssl: false
host: 127.0.0.1
port: 6379
database: 0
timeout: 1000
password:
lettuce:
pool:
max-active: 100
max-wait: -1
min-idle: 0
max-idle: 20

3、创建与验证 Token 工具类

创建用于操作 Token 相关的 Service 类,里面存在 Token 创建与验证方法,其中:

  • Token 创建方法: 使用 UUID 工具创建 Token 串,设置以 “idempotent_token:“+“Token 串” 作为 Key,以用户信息当成 Value,将信息存入 Redis 中。
  • Token 验证方法: 接收 Token 串参数,加上 Key 前缀形成 Key,再传入 value 值,执行 Lua 表达式(Lua 表达式能保证命令执行的原子性)进行查找对应 Key 与删除操作。执行完成后验证命令的返回结果,如果结果不为空且非 0,则验证成功,否则失败。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
java复制代码import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class TokenUtilService {

@Autowired
private StringRedisTemplate redisTemplate;

/**
* 存入 Redis 的 Token 键的前缀
*/
private static final String IDEMPOTENT_TOKEN_PREFIX = "idempotent_token:";

/**
* 创建 Token 存入 Redis,并返回该 Token
*
* @param value 用于辅助验证的 value 值
* @return 生成的 Token 串
*/
public String generateToken(String value) {
// 实例化生成 ID 工具对象
String token = UUID.randomUUID().toString();
// 设置存入 Redis 的 Key
String key = IDEMPOTENT_TOKEN_PREFIX + token;
// 存储 Token 到 Redis,且设置过期时间为5分钟
redisTemplate.opsForValue().set(key, value, 5, TimeUnit.MINUTES);
// 返回 Token
return token;
}

/**
* 验证 Token 正确性
*
* @param token token 字符串
* @param value value 存储在Redis中的辅助验证信息
* @return 验证结果
*/
public boolean validToken(String token, String value) {
// 设置 Lua 脚本,其中 KEYS[1] 是 key,KEYS[2] 是 value
String script = "if redis.call('get', KEYS[1]) == KEYS[2] then return redis.call('del', KEYS[1]) else return 0 end";
RedisScript<Long> redisScript = new DefaultRedisScript<>(script, Long.class);
// 根据 Key 前缀拼接 Key
String key = IDEMPOTENT_TOKEN_PREFIX + token;
// 执行 Lua 脚本
Long result = redisTemplate.execute(redisScript, Arrays.asList(key, value));
// 根据返回结果判断是否成功成功匹配并删除 Redis 键值对,若果结果不为空和0,则验证通过
if (result != null && result != 0L) {
log.info("验证 token={},key={},value={} 成功", token, key, value);
return true;
}
log.info("验证 token={},key={},value={} 失败", token, key, value);
return false;
}

}

4、创建测试的 Controller 类

创建用于测试的 Controller 类,里面有获取 Token 与测试接口幂等性的接口,内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
typescript复制代码import lombok.extern.slf4j.Slf4j;
import mydlq.club.example.service.TokenUtilService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

@Slf4j
@RestController
public class TokenController {

@Autowired
private TokenUtilService tokenService;

/**
* 获取 Token 接口
*
* @return Token 串
*/
@GetMapping("/token")
public String getToken() {
// 获取用户信息(这里使用模拟数据)
// 注:这里存储该内容只是举例,其作用为辅助验证,使其验证逻辑更安全,如这里存储用户信息,其目的为:
// - 1)、使用"token"验证 Redis 中是否存在对应的 Key
// - 2)、使用"用户信息"验证 Redis 的 Value 是否匹配。
String userInfo = "mydlq";
// 获取 Token 字符串,并返回
return tokenService.generateToken(userInfo);
}

/**
* 接口幂等性测试接口
*
* @param token 幂等 Token 串
* @return 执行结果
*/
@PostMapping("/test")
public String test(@RequestHeader(value = "token") String token) {
// 获取用户信息(这里使用模拟数据)
String userInfo = "mydlq";
// 根据 Token 和与用户相关的信息到 Redis 验证是否存在对应的信息
boolean result = tokenService.validToken(token, userInfo);
// 根据验证结果响应不同信息
return result ? "正常调用" : "重复调用";
}

}

5、创建 SpringBoot 启动类

创建启动类,用于启动 SpringBoot 应用。基础教程就不介绍了,建议看下下面的教程,很全了。

1
2
3
4
5
6
7
8
9
10
11
typescript复制代码import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {

public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}

}

6、写测试类进行测试

写个测试类进行测试,多次访问同一个接口,测试是否只有第一次能否执行成功。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
java复制代码import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.http.MediaType;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.web.servlet.MockMvc;
import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
import org.springframework.test.web.servlet.setup.MockMvcBuilders;
import org.springframework.web.context.WebApplicationContext;

@Slf4j
@SpringBootTest
@RunWith(SpringRunner.class)
public class IdempotenceTest {

@Autowired
private WebApplicationContext webApplicationContext;

@Test
public void interfaceIdempotenceTest() throws Exception {
// 初始化 MockMvc
MockMvc mockMvc = MockMvcBuilders.webAppContextSetup(webApplicationContext).build();
// 调用获取 Token 接口
String token = mockMvc.perform(MockMvcRequestBuilders.get("/token")
.accept(MediaType.TEXT_HTML))
.andReturn()
.getResponse().getContentAsString();
log.info("获取的 Token 串:{}", token);
// 循环调用 5 次进行测试
for (int i = 1; i <= 5; i++) {
log.info("第{}次调用测试接口", i);
// 调用验证接口并打印结果
String result = mockMvc.perform(MockMvcRequestBuilders.post("/test")
.header("token", token)
.accept(MediaType.TEXT_HTML))
.andReturn().getResponse().getContentAsString();
log.info(result);
// 结果断言
if (i == 0) {
Assert.assertEquals(result, "正常调用");
} else {
Assert.assertEquals(result, "重复调用");
}
}
}

}

显示如下:

1
2
3
4
5
6
7
8
9
10
11
csharp复制代码[main] IdempotenceTest:  获取的 Token 串:980ea707-ce2e-456e-a059-0a03332110b4
[main] IdempotenceTest: 第1次调用测试接口
[main] IdempotenceTest: 正常调用
[main] IdempotenceTest: 第2次调用测试接口
[main] IdempotenceTest: 重复调用
[main] IdempotenceTest: 第3次调用测试接口
[main] IdempotenceTest: 重复调用
[main] IdempotenceTest: 第4次调用测试接口
[main] IdempotenceTest: 重复调用
[main] IdempotenceTest: 第5次调用测试接口
[main] IdempotenceTest: 重复调用

八、最后总结

幂等性是开发当中很常见也很重要的一个需求,尤其是支付、订单等与金钱挂钩的服务,保证接口幂等性尤其重要。在实际开发中,我们需要针对不同的业务场景我们需要灵活的选择幂等性的实现方式:

  • 对于下单等存在唯一主键的,可以使用 “唯一主键方案” 的方式实现。
  • 对于更新订单状态等相关的更新场景操作,使用 “乐观锁方案” 实现更为简单。
  • 对于上下游这种,下游请求上游,上游服务可以使用 “下游传递唯一序列号方案” 更为合理。
  • 类似于前端重复提交、重复下单、没有唯一 ID 号的场景,可以通过 Token 与 Redis 配合的 “防重 Token 方案” 实现更为快捷。

上面只是给与一些建议,再次强调一下,实现幂等性需要先理解自身业务需求,根据业务逻辑来实现这样才合理,处理好其中的每一个结点细节,完善整体的业务流程设计,才能更好的保证系统的正常运行。最后做一个简单总结,然后本博文到此结束,如下:

方案名称 适用方法 实现复杂度 方案缺点
数据库唯一主键 插入操作 删除操作 简单 - 只能用于插入操作;- 只能用于存在唯一主键场景;
数据库乐观锁 更新操作 简单 - 只能用于更新操作;- 表中需要额外添加字段;
请求序列号 插入操作 更新操作 删除操作 简单 - 需要保证下游生成唯一序列号;- 需要 Redis 第三方存储已经请求的序列号;
防重 Token 令牌 插入操作 更新操作 删除操作 适中 - 需要 Redis 第三方存储生成的 Token 串;

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

给Mybatis-Plus插上小翅膀,支持多表查询

发表于 2021-03-21

之前一直使用Mybatis-Plus,说实话,个人还是比较喜欢Mybatis-Plus。

ORM框架用的比较多的就两个,JPA和Mybatis。据说国内用Mybatis比较多,国外用JPA比较多。

而Mybatis-Plus是在Mybatis的基础上,增加了很多牛🍺的功能。

再粘一下官网介绍的特性,又啰嗦了:

  1. 无侵入:只做增强不做改变,引入它不会对现有工程产生影响,如丝般顺滑
  2. 损耗小:启动即会自动注入基本 CURD,性能基本无损耗,直接面向对象操作
  3. 强大的 CRUD 操作:内置通用 Mapper、通用 Service,仅仅通过少量配置即可实现单表大部分 CRUD 操作,更有强大的条件构造器,满足各类使用需求
  4. 支持 Lambda 形式调用:通过 Lambda 表达式,方便的编写各类查询条件,无需再担心字段写错
  5. 支持主键自动生成:支持多达 4 种主键策略(内含分布式唯一 ID 生成器 - Sequence),可自由配置,完美解决主键问题
  6. 支持 ActiveRecord 模式:支持 ActiveRecord 形式调用,实体类只需继承 Model 类即可进行强大的 CRUD 操作
  7. 支持自定义全局通用操作:支持全局通用方法注入( Write once, use anywhere )
  8. 内置代码生成器:采用代码或者 Maven 插件可快速生成 Mapper 、 Model 、 Service 、 Controller 层代码,支持模板引擎,更有超多自定义配置等您来使用
  9. 内置分页插件:基于 MyBatis 物理分页,开发者无需关心具体操作,配置好插件之后,写分页等同于普通 List 查询
  10. 分页插件支持多种数据库:支持 MySQL、MariaDB、Oracle、DB2、H2、HSQL、SQLite、Postgre、SQLServer 等多种数据库
  11. 内置性能分析插件:可输出 Sql 语句以及其执行时间,建议开发测试时启用该功能,能快速揪出慢查询
  12. 内置全局拦截插件:提供全表 delete 、 update 操作智能分析阻断,也可自定义拦截规则,预防误操作

详细的可以去官网看:mybatis.plus/
官网新域名也是牛🍺。反正用过的都说好。

至于JPA,虽然个人觉得有点太死板,不过也有值得学习的地方。

很早以前,用Mybatis-Plus的时候,有一个比较麻烦的问题,就是如果一组数据存在多张表中,这些表之间可能是一对一,一对多或者多对一,那我要想全部查出来就要调好几个Mapper的查询方法。代码行数一下就增加了很多。

之前也看过Mybatis-Plus的源码,想过如何使Mybatis-Plus支持多表联接查询。可是发现难度不小。因为Mybatis-Plus底层就只支持单表。

最近看到JPA的@OneToOne、@OneToMany、@ManyToMany这些注解,忽然一个想法就在我的脑海里闪现出来,如果像JPA那样使用注解的方式,是不是简单很多呢?

事先声明,全是自己想的,没有看JPA源码, 所以实现方式可能和JPA不一样。

说干就干

  1. 添加注解
  2. 处理注解
  3. 打包发布

可能有人不知道,其实Mybatis也是支持拦截器的,既然如此,用拦截器处理注解就可以啦。

注解 One2One

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码@Inherited
@Documented
@Target({ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
public @interface One2One {

/**
* 本类主键列名
*/
String self() default "id";

/**
* 本类主键在关联表中的列名
*/
String as();

/**
* 关联的 mapper
*/
Class<? extends BaseMapper> mapper();

}

说一下,假如有两张表,A和B是一对一的关系,A表id在B表中是a_id,用这样的方式关联的。
在A的实体类中使用这个注解,self就是id,而as就是a_id,意思就是A的id作为a_id来查询,而mapper就是B的Mapper,下面是例子A就是UserAccount,B就是UserAddress。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
java复制代码@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@ApiModel(value = "UserAccount对象", description = "用户相关")
public class UserAccount extends Model<UserAccount> {

private static final long serialVersionUID = 1L;

@ApiModelProperty(value = "id")
@TableId(value = "id", type = IdType.AUTO)
private Long id;

@ApiModelProperty(value = "昵称")
private String nickName;

@TableField(exist = false)
//把id的值 作为userId 在 UserAddressMapper中 查询
@One2One(self = "id", as = "user_id", mapper = UserAddressMapper.class)
private UserAddress address;

@Override
protected Serializable pkVal() {
return this.id;
}
}

Mybatis拦截器 One2OneInterceptor

这里不再详细介绍拦截器了,之前也写了几篇关于Mybatis拦截器的,有兴趣的可以去看看。

Mybatis拦截器实现Geometry类型数据存储与查询

Mybatis拦截器打印完整SQL

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
java复制代码@Intercepts({
@Signature(type = ResultSetHandler.class, method = "handleResultSets", args = {Statement.class})
})
@Slf4j
public class One2OneInterceptor implements Interceptor {

@Override
public Object intercept(Invocation invocation) throws Throwable {
Object result = invocation.proceed();
if (result == null) {
return null;
}
if (result instanceof ArrayList) {
ArrayList list = (ArrayList) result;
for (Object o : list) {
handleOne2OneAnnotation(o);
}
} else {
handleOne2OneAnnotation(result);
}
return result;
}

@SneakyThrows
private void handleOne2OneAnnotation(Object o) {
Class<?> aClass = o.getClass();
Field[] fields = aClass.getDeclaredFields();
for (Field field : fields) {
field.setAccessible(true);
One2One one2One = field.getAnnotation(One2One.class);
if (one2One != null) {
String self = one2One.self();
Object value = MpExtraUtil.getValue(o, self);
String as = one2One.as();
Class<? extends BaseMapper> mapper = one2One.mapper();
BaseMapper baseMapper = SpringBeanFactoryUtils.getApplicationContext().getBean(mapper);
QueryWrapper<Object> eq = Condition.create().eq(as, value);
Object one = baseMapper.selectOne(eq);
field.set(o, one);
}
}
}

@Override
public Object plugin(Object o) {
return Plugin.wrap(o, this);
}

@Override
public void setProperties(Properties properties) {
}
}

Mybatis拦截器可以针对不同的场景进行拦截,比如:

  1. Executor:拦截执行器的方法。
  2. ParameterHandler:拦截参数的处理。
  3. ResultHandler:拦截结果集的处理。
  4. StatementHandler:拦截Sql语法构建的处理。

这里是通过拦截结果集的方式,在返回的对象上查找这个注解,找到注解后,再根据注解的配置,自动去数据库查询,查到结果后把数据封装到返回的结果集中。这样就避免了自己去多次调Mapper的查询方法。

难点:虽然注解上标明了是什么Mapper,可是在拦截器中取到的还是BaseMapper,而用BaseMapper实在不好查询,我试了很多方法,不过还好Mybatis-Plus支持使用Condition.create().eq(as, value);拼接条件SQL,然后可以使用baseMapper.selectOne(eq);去查询。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码public class MpExtraUtil {

@SneakyThrows
public static Object getValue(Object o, String name) {
Class<?> aClass = o.getClass();
Field[] fields = aClass.getDeclaredFields();
for (Field field : fields) {
field.setAccessible(true);
if (field.getName().equals(name)) {
return field.get(o);
}
}
throw new IllegalArgumentException("未查询到名称为:" + name + " 的字段");
}
}

MpExtraUtil就是使用反射的方式,获取id的值。

再讲一个多对多的注解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
java复制代码@Inherited
@Documented
@Target({ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
public @interface Many2Many {

/**
* 本类主键列名
*/
String self() default "id";

/**
* 本类主键在中间表的列名
*/
String leftMid();

/**
* 另一个多方在中间表中的列名
*/
String rightMid();

/**
* 另一个多方在本表中的列名
*/
String origin();

/**
* 关联的 mapper
*/
Class<? extends BaseMapper> midMapper();

/**
* 关联的 mapper
*/
Class<? extends BaseMapper> mapper();

}

假设有A、 A_B,B三张表,在A的实体类中使用这个注解, self就是A表主键id,leftMid就是A表的id在中间表中的名字,也就是a_id,而rightMid是B表主键在中间表的名字,就是b_id, origin就是B表自己主键原来的名字,即id,midMapper是中间表的Mapper,也就是A_B对应的Mapper,mapper是B表的Mapper。

这个确实有点绕。

还有一个@One2Many就不说了,和@One2One一样,至于Many2One,从另一个角度看就是@One2One。

使用方法

先把表创建好,然后代码生成器一键生成代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
sql复制代码CREATE TABLE `user_account` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',
`nick_name` varchar(40) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '昵称',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='用户相关';

CREATE TABLE `user_address` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '地址id',
`user_id` bigint(20) DEFAULT NULL COMMENT '用户id',
`address` varchar(200) DEFAULT NULL COMMENT '详细地址',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;


CREATE TABLE `user_class` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '课程id',
`class_name` varchar(20) DEFAULT NULL COMMENT '课程名称',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=9 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;


CREATE TABLE `user_hobby` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '爱好id',
`user_id` bigint(20) DEFAULT NULL COMMENT '用户id',
`hobby` varchar(40) DEFAULT NULL COMMENT '爱好名字',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;


CREATE TABLE `user_mid_class` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '中间表id',
`user_id` bigint(20) DEFAULT NULL COMMENT '用户id',
`class_id` bigint(20) DEFAULT NULL COMMENT '课程id',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

添加依赖,已经发布到中央仓库了,可以直接使用:

1
2
3
4
5
xml复制代码<dependency>
<groupId>top.lww0511</groupId>
<artifactId>mp-extra</artifactId>
<version>1.0.1</version>
</dependency>

在启动类上添加注解

@EnableMpExtra

配置拦截器

因为一般项目都会配置自己的MybatisConfiguration,我在这里配置后,打包,然后被引入,是无法生效的。

所以就想了一种折中的方法。

以前MybatisConfiguration是通过new出来的,现在通过MybatisExtraConfig.getMPConfig();来获取,这样获取到的MybatisConfiguration就已经添加好了拦截器。

完整Mybatis-Plus配置类例子,注意第43行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
java复制代码@Slf4j
@Configuration
@MapperScan(basePackages = "com.ler.demo.mapper", sqlSessionTemplateRef = "sqlSessionTemplate")
public class MybatisConfig {

private static final String BASE_PACKAGE = "com.ler.demo.";

@Bean("dataSource")
public DataSource dataSource() {
try {
DruidDataSource dataSource = new DruidDataSource();
dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
dataSource.setUrl("jdbc:mysql://localhost/mp-extra?useUnicode=true&useSSL=false&characterEncoding=utf8&serverTimezone=Asia/Shanghai");
dataSource.setUsername("root");
dataSource.setPassword("adminadmin");

dataSource.setInitialSize(1);
dataSource.setMaxActive(20);
dataSource.setMinIdle(1);
dataSource.setMaxWait(60_000);
dataSource.setPoolPreparedStatements(true);
dataSource.setMaxPoolPreparedStatementPerConnectionSize(20);
dataSource.setTimeBetweenEvictionRunsMillis(60_000);
dataSource.setMinEvictableIdleTimeMillis(300_000);
dataSource.setValidationQuery("SELECT 1");
return dataSource;
} catch (Throwable throwable) {
log.error("ex caught", throwable);
throw new RuntimeException();
}
}

@Bean(name = "sqlSessionFactory")
public SqlSessionFactory sqlSessionFactory(@Qualifier("dataSource") DataSource dataSource) throws Exception {
MybatisSqlSessionFactoryBean factoryBean = new MybatisSqlSessionFactoryBean();
factoryBean.setDataSource(dataSource);
factoryBean.setVfs(SpringBootVFS.class);
factoryBean.setTypeAliasesPackage(BASE_PACKAGE + "entity");

Resource[] mapperResources = new PathMatchingResourcePatternResolver().getResources("classpath*:mapper/*.xml");
factoryBean.setMapperLocations(mapperResources);
// 43行 获取配置
MybatisConfiguration configuration = MybatisExtraConfig.getMPConfig();
configuration.setDefaultScriptingLanguage(MybatisXMLLanguageDriver.class);
configuration.setJdbcTypeForNull(JdbcType.NULL);
configuration.setMapUnderscoreToCamelCase(true);
configuration.addInterceptor(new SqlExplainInterceptor());
configuration.setUseGeneratedKeys(true);
factoryBean.setConfiguration(configuration);
return factoryBean.getObject();
}

@Bean(name = "sqlSessionTemplate")
public SqlSessionTemplate sqlSessionTemplate(@Qualifier("sqlSessionFactory") SqlSessionFactory sqlSessionFactory) {
return new SqlSessionTemplate(sqlSessionFactory);
}

@Bean(name = "transactionManager")
public PlatformTransactionManager platformTransactionManager(@Qualifier("dataSource") DataSource dataSource) {
return new DataSourceTransactionManager(dataSource);
}

@Bean(name = "transactionTemplate")
public TransactionTemplate transactionTemplate(@Qualifier("transactionManager") PlatformTransactionManager transactionManager) {
return new TransactionTemplate(transactionManager);
}

}

在实体类上建立关系

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
java复制代码@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@ApiModel(value = "UserAccount对象", description = "用户相关")
public class UserAccount extends Model<UserAccount> {

private static final long serialVersionUID = 1L;

@ApiModelProperty(value = "id")
@TableId(value = "id", type = IdType.AUTO)
private Long id;

@ApiModelProperty(value = "昵称")
private String nickName;

@TableField(exist = false)
//把id的值 作为userId 在 UserAddressMapper中 查询
@One2One(self = "id", as = "user_id", mapper = UserAddressMapper.class)
private UserAddress address;

@TableField(exist = false)
@One2Many(self = "id", as = "user_id", mapper = UserHobbyMapper.class)
private List<UserHobby> hobbies;

@TableField(exist = false)
@Many2Many(self = "id", leftMid = "user_id", rightMid = "class_id", origin = "id"
, midMapper = UserMidClassMapper.class, mapper = UserClassMapper.class)
private List<UserClass> classes;

@Override
protected Serializable pkVal() {
return this.id;
}

}

主要是那几个注解。对了还要加@TableField(exist = false),不然会报错。

测试接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
java复制代码@Slf4j
@RestController
@RequestMapping("/user")
@Api(value = "/user", description = "用户")
public class UserAccountController {

@Resource
private UserAccountService userAccountService;

@Resource
private UserAccountMapper userAccountMapper;

@ApiOperation("查询一个")
@ApiImplicitParams({
@ApiImplicitParam(name = "", value = "", required = true),
})
@GetMapping(value = "/one", name = "查询一个")
public HttpResult one() {
//service
UserAccount account = userAccountService.getById(1L);
//mapper
// UserAccount account = userAccountMapper.selectById(1L);
//AR模式
// UserAccount account = new UserAccount();
// account.setId(1L);
// account = account.selectById();
return HttpResult.success(account);
}
}

接口非常简单,调用内置的getById,可是却查出了所有相关的数据,这都是因为配置的那些注解。


可以看到其实发送了好几条SQL。第一条是userAccountService.getById(1L),后面几条都是自动发送的。


源码 gitee.com/github-2635…

GitHub: github.com/LerDer/mp-e…

示例 gitee.com/github-2635…

总结

实在不想贴太多代码,其实还是挺简单的,源码地址还有示例地址都贴出来啦,有兴趣的可以去看一下。觉得好用可以点个Star。欢迎大家一起来贡献。

最后欢迎大家关注我的公众号,共同学习,一起进步。加油🤣

搜索 南诏Blog 关注

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Java8 中的 Stream 那么彪悍,你知道它的原理是什

发表于 2021-03-21

公众号:Java小咖秀,网站:javaxks.com

作者 : 岁月安然 ,链接: elsef.com/2019/09/16/Java8中Stream的原理分析

Java 8 API 添加了一个新的抽象称为流 Stream,可以让你以一种声明的方式处理数据。

Stream 使用一种类似用 SQL 语句从数据库查询数据的直观方式来提供一种对 Java 集合运算和表达的高阶抽象。

Stream API 可以极大提高 Java 程序员的生产力,让程序员写出高效率、干净、简洁的代码。

本文会对 Stream 的实现原理进行剖析。

Stream 的组成与特点

Stream(流)是一个来自数据源的元素队列并支持聚合操作:

  • 元素是特定类型的对象,形成一个队列。Java中的Stream并不会向集合那样存储和管理元素,而是按需计算
  • 数据源流的来源可以是集合Collection、数组Array、I/O channel, 产生器generator 等
  • 聚合操作类似SQL语句一样的操作, 比如filter, map, reduce, find, match, sorted等
    和以前的Collection操作不同, Stream 操作还有两个基础的特征:
  • Pipelining: 中间操作都会返回流对象本身。这样多个操作可以串联成一个管道, 如同流式风格(fluent style)。这样做可以对操作进行优化, 比如延迟执行 (laziness evaluation) 和短路 ( short-circuiting)
  • 内部迭代:以前对集合遍历都是通过Iterator或者For-Each的方式, 显式的在集合外部进行迭代, 这叫做外部迭代。Stream提供了内部迭代的方式, 通过访问者模式 (Visitor) 实现。
    和迭代器又不同的是,Stream 可以并行化操作,迭代器只能命令式地、串行化操作。顾名思义,当使用串行方式去遍历时,每个 item 读完后再读下一个 item。而使用并行去遍历时,数据会被分成多个段,其中每一个都在不同的线程中处理,然后将结果一起输出。

Stream 的并行操作依赖于 Java7 中引入的 Fork/Join 框架(JSR166y)来拆分任务和加速处理过程。Java 的并行 API 演变历程基本如下:

1
2
3
4
5
sql复制代码1.0-1.4 中的 java.lang.Thread  
5.0 中的 java.util.concurrent
6.0 中的 Phasers 等
7.0 中的 Fork/Join 框架
8.0 中的 Lambda

Stream具有平行处理能力,处理的过程会分而治之,也就是将一个大任务切分成多个小任务,这表示每个任务都是一个操作:

1
2
3
ini复制代码List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
numbers.parallelStream()
.forEach(out::println);

可以看到一行简单的代码就帮我们实现了并行输出集合中元素的功能,但是由于并行执行的顺序是不可控的所以每次执行的结果不一定相同。

如果非得相同可以使用forEachOrdered方法执行终止操作:

1
2
3
ini复制代码List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
numbers.parallelStream()
.forEachOrdered(out::println);

这里有一个疑问,如果结果需要有序,是否和我们的并行执行的初衷相悖?是的,这个场景下明显无需使用并行流,直接用串行流执行即可, 否则性能可能更差,因为最后又强行将所有并行结果进行了排序。

OK,下面我们先介绍一下Stream接口的相关知识。

BaseStream 接口

Stream的父接口是BaseStream,后者是所有流实现的顶层接口,定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码public interface BaseStream<T, S extends BaseStream<T, S>>
extends AutoCloseable {
Iterator<T> iterator();

Spliterator<T> spliterator();

boolean isParallel();

S sequential();

S parallel();

S unordered();

S onClose(Runnable closeHandler);

void close();
}

其中,T为流中元素的类型,S为一个BaseStream的实现类,它里面的元素也是T并且S同样是自己:

S extends BaseStream<T, S>

是不是有点晕?

其实很好理解,我们看一下接口中对S的使用就知道了:如sequential()、parallel()这两个方法,它们都返回了S实例,也就是说它们分别支持对当前流进行串行或者并行的操作,并返回「改变」后的流对象。

如果是并行一定涉及到对当前流的拆分,即将一个流拆分成多个子流,子流肯定和父流的类型是一致的。子流可以继续拆分子流,一直拆分下去…

也就是说这里的S是BaseStream的一个实现类,它同样是一个流,比如Stream、IntStream、LongStream等。

Stream 接口

再来看一下Stream的接口声明:

1
java复制代码public interface Stream<T> extends BaseStream<T, Stream<T>>

参考上面的解释这里不难理解:即Stream可以继续拆分为Stream,我们可以通过它的一些方法来证实:

1
2
3
4
5
6
7
8
java复制代码    Stream<T> filter(Predicate<? super T> predicate);
<R> Stream<R> map(Function<? super T, ? extends R> mapper);
<R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper);
Stream<T> sorted();
Stream<T> peek(Consumer<? super T> action);
Stream<T> limit(long maxSize);
Stream<T> skip(long n);
...

这些都是操作流的中间操作,它们的返回结果必须是流对象本身。

关闭流操作

BaseStream 实现了 AutoCloseable 接口,也就是 close() 方法会在流关闭时被调用。同时,BaseStream 中还给我们提供了onClose()方法:

1
2
java复制代码/** * Returns an equivalent stream with an additional close handler. Close * handlers are run when the {@link #close()} method * is called on the stream, and are executed in the order they were * added. All close handlers are run, even if earlier close handlers throw * exceptions. If any close handler throws an exception, the first * exception thrown will be relayed to the caller of {@code close()}, with * any remaining exceptions added to that exception as suppressed exceptions * (unless one of the remaining exceptions is the same exception as the * first exception, since an exception cannot suppress itself.) May * return itself. * * <p>This is an <a href="package-summary.html#StreamOps">intermediate * operation</a>. * * @param closeHandler A task to execute when the stream is closed * @return a stream with a handler that is run if the stream is closed */
S onClose(Runnable closeHandler);

当AutoCloseable的close()接口被调用的时候会触发调用流对象的onClose()方法,但有几点需要注意:

  • onClose() 方法会返回流对象本身,也就是说可以对改对象进行多次调用
  • 如果调用了多个onClose() 方法,它会按照调用的顺序触发,但是如果某个方法有异常则只会向上抛出第一个异常
  • 前一个 onClose() 方法抛出了异常不会影响后续 onClose() 方法的使用
  • 如果多个 onClose() 方法都抛出异常,只展示第一个异常的堆栈,而其他异常会被压缩,只展示部分信息

并行流和串行流

BaseStream接口中分别提供了并行流和串行流两个方法,这两个方法可以任意调用若干次,也可以混合调用,但最终只会以最后一次方法调用的返回结果为准。

参考parallel()方法的说明:

Returns an equivalent stream that is parallel. May return
itself, either because the stream was already parallel, or because
the underlying stream state was modified to be parallel.

所以多次调用同样的方法并不会生成新的流,而是直接复用当前的流对象。

下面的例子里以最后一次调用parallel()为准,最终是并行地计算sum:

1
2
3
4
5
6
java复制代码stream.parallel()
.filter(...)
.sequential()
.map(...)
.parallel()
.sum();

ParallelStream 背后的男人:ForkJoinPool

ForkJoin 框架是从 JDK7 中新特性,它同 ThreadPoolExecutor 一样,也实现了 Executor 和 ExecutorService 接口。它使用了一个「无限队列」来保存需要执行的任务,而线程的数量则是通过构造函数传入, 如果没有向构造函数中传入希望的线程数量,那么当前计算机可用的 CPU 数量会被设置为线程数量作为默认值。

ForkJoinPool 主要用来使用分治法 (Divide-and-Conquer Algorithm) 来解决问题,典型的应用比如快速排序算法。这里的要点在于,ForkJoinPool 需要使用相对少的线程来处理大量的任务。比如要对 1000 万个数据进行排序,那么会将这个任务分割成两个500 万的排序任务和一个针对这两组500万数据的合并任务。以此类推,对于 500 万的数据也会做出同样的分割处理,到最后会设置一个阈值来规定当数据规模到多少时,停止这样的分割处理。比如,当元素的数量小于 10 时,会停止分割,转而使用插入排序对它们进行排序。那么到最后,所有的任务加起来会有大概 2000000 + 个。

问题的关键在于,对于一个任务而言,只有当它所有的子任务完成之后,它才能够被执行,想象一下归并排序的过程。

所以当使用 ThreadPoolExecutor 时,使用分治法会存在问题,因为 ThreadPoolExecutor 中的线程无法向 任务队列中再添加一个任务并且在等待该任务完成之后再继续执行。而使用ForkJoinPool时,就能够让其中的线程创建新的任务,并挂起当前的任务,此时线程就能够从队列中选择子任务执行。

那么使用 ThreadPoolExecutor 或者 ForkJoinPool,会有什么性能的差异呢?

首先,使用 ForkJoinPool 能够使用数量有限的线程来完成非常多的具有「父子关系」的任务,比如使用 4 个线程来完成超过 200 万个任务。使用 ThreadPoolExecutor 时,是不可能完成的,因为 ThreadPoolExecutor 中的 Thread 无法选择优先执行子任务,需要完成 200 万个具有父子关系的任务时,也需要 200 万个线程,显然这是不可行的。

Work Stealing 原理:

-(1)每个工作线程都有自己的工作队列 WorkQueue;-(2)这是一个双端队列 dequeue,它是线程私有的;-(3)ForkJoinTask 中 fork 的子任务,将放入运行该任务的工作线程的队头,工作线程将以 LIFO 的顺序来处理工作队列中的任务,即堆栈的方式;-(4)为了最大化地利用 CPU,空闲的线程将从其它线程的队列中「窃取」任务来执行;-(5)但是是从工作队列的尾部窃取任务,以减少和队列所属线程之间的竞争;-(6)双端队列的操作:push()/pop() 仅在其所有者工作线程中调用,poll() 是由其它线程窃取任务时调用的;-(7)当只剩下最后一个任务时,还是会存在竞争,是通过 CAS 来实现的;

用 ForkJoinPool 的眼光来看 ParallelStream

Java 8 为 ForkJoinPool 添加了一个通用线程池,这个线程池用来处理那些没有被显式提交到任何线程池的任务。它是 ForkJoinPool 类型上的一个静态元素,它拥有的默认线程数量等于运行计算机上的 CPU 数量。当调用 Arrays 类上添加的新方法时,自动并行化就会发生。比如用来排序一个数组的并行快速排序,用来对一个数组中的元素进行并行遍历。自动并行化也被运用在 Java 8 新添加的 Stream API 中。

比如下面的代码用来遍历列表中的元素并执行需要的操作:

1
2
3
java复制代码List<UserInfo> userInfoList =
DaoContainers.getUserInfoDAO().queryAllByList(new UserInfoModel());
userInfoList.parallelStream().forEach(RedisUserApi::setUserIdUserInfo);

对于列表中的元素的操作都会以并行的方式执行。forEach方法会为每个元素的计算操作创建一个任务,该任务会被前文中提到的ForkJoinPool中的 commonPool 处理。以上的并行计算逻辑当然也可以使用ThreadPoolExecutor完成,但是就代码的可读性和代码量而言,使用ForkJoinPool明显更胜一筹。

对于ForkJoinPool通用线程池的线程数量,通常使用默认值就可以了,即运行时计算机的处理器数量。也可以通过设置系统属性:-Djava.util.concurrent .ForkJoinPool.common.parallelism=N(N 为线程数量), 来调整ForkJoinPool的线程数量。

值得注意的是,当前执行的线程也会被用来执行任务,所以最终的线程个数为N+1,1 就是当前的主线程。

这里就有一个问题,如果你在并行流的执行计算使用了阻塞操作,如 I/O,那么很可能会导致一些问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码public static String query(String question) {
List<String> engines = new ArrayList<String>();
engines.add("http://www.google.com/?q=");
engines.add("http://duckduckgo.com/?q=");
engines.add("http://www.bing.com/search?q=");

// get element as soon as it is available
Optional<String> result = engines.stream().parallel().map((base) - {
String url = base + question;
// open connection and fetch the result
return WS.url(url).get();
}).findAny();
return result.get();
}

这个例子很典型,让我们来分析一下:

  • 这个并行流计算操作将由主线程和 JVM 默认的ForkJoinPool.commonPool()来共同执行。
  • map中是一个阻塞方法,需要通过访问HTTP接口并得到它的response,所以任何一个 worker 线程在执行到这里的时候都会阻塞并等待结果。
  • 所以当此时再其他地方通过并行流方式调用计算方法的时候,将会受到此处阻塞等待的方法的影响。
  • 目前的ForkJoinPool的实现并未考虑补偿等待那些阻塞在等待新生成的线程的工作 worker 线程,所以最终ForkJoinPool.commonPool()中的线程将备用光并且阻塞等待。
    ?正如我们上面那个列子的情况分析得知,lambda 的执行并不是瞬间完成的, 所有使用 parallel streams 的程序都有可能成为阻塞程序的源头, 并且在执行过程中程序中的其他部分将无法访问这些 workers,这意味着任何依赖 parallel streams 的程序在什么别的东西占用着 common ForkJoinPool 时将会变得不可预知并且暗藏危机。

小结:

1
2
3
4
5
6
python复制代码当需要处理递归分治算法时,考虑使用 ForkJoinPool。
仔细设置不再进行任务划分的阈值,这个阈值对性能有影响。
Java 8 中的一些特性会使用到 ForkJoinPool 中的通用线程池。在某些场合下,需要调整该线程池的默认的线程数量
lambda 应该尽量避免副作用,也就是说,避免突变基于堆的状态以及任何 IO
lambda 应该互不干扰,也就是说避免修改数据源(因为这可能带来线程安全的问题)
避免访问在流操作生命周期内可能会改变的状态

并行流的性能

并行流框架的性能受以下因素影响:

  • 数据大小:数据够大,每个管道处理时间够长,并行才有意义;
  • 源数据结构:每个管道操作都是基于初始数据源,通常是集合,将不同的集合数据源分割会有一定消耗;
  • 装箱:处理基本类型比装箱类型要快;
  • 核的数量:默认情况下,核数量越多,底层 fork/join 线程池启动线程就越多;
  • 单元处理开销:花在流中每个元素身上的时间越长,并行操作带来的性能提升越明显;

源数据结构分为以下 3 组:

  • 性能好:ArrayList、数组或IntStream.range(数据支持随机读取,能轻易地被任意分割)
  • 性能一般:HashSet、TreeSet(数据不易公平地分解,大部分也是可以的)
  • 性能差:LinkedList(需要遍历链表,难以对半分解)、Stream.iterate和BufferedReader.lines(长度未知,难以分解)
    注意:下面几个部分节选自:Streams 的幕后原理,顺便感谢一下作者 Brian Goetz,写的太通透了。

NQ 模型

要确定并行性是否会带来提速,需要考虑的最后两个因素是:可用的数据量和针对每个数据元素执行的计算量。

在我们最初的并行分解描述中,我们采用的概念是拆分来源,直到分段足够小,以致解决该分段上的问题的顺序方法更高效。分段大小必须依赖于所解决的问题,确切的讲,取决于每个元素完成的工作量。例如,计算一个字符串的长度涉及的工作比计算字符串的 SHA-1 哈希值要少得多。为每个元素完成的工作越多,“大到足够利用并行性” 的阈值就越低。类似地,拥有的数据越多, 拆分的分段就越多,而不会与 “太小” 阈值发生冲突。

一个简单但有用的并行性能模型是 NQ 模型,其中 N 是数据元素数量,Q 是为每个元素执行的工作量。乘积 N*Q 越大,就越有可能获得并行提速。对于具有很小的 Q 的问题,比如对数字求和,您通常可能希望看到 N > 10,000 以获得提速;随着 Q 增加,获得提速所需的数据大小将会减小。

并行化的许多阻碍(比如拆分成本、组合成本或遇到顺序敏感性)都可以通过 Q 更高的操作来缓解。尽管拆分某个 LinkedList 特征的结果可能很糟糕,但只要拥有足够大的 Q,仍然可能获得并行提速。

遇到顺序

遇到顺序指的是来源分发元素的顺序是否对计算至关重要。一些来源(比如基于哈希的集合和映射)没有有意义的遇到顺序。流标志 ORDERED 描述了流是否有有意义的遇到顺序。JDK 集合的 spliterator 会根据集合的规范来设置此标志;一些中间操作可能注入 ORDERED (sorted()) 或清除它 (unordered())。

如果流没有遇到顺序,大部分流操作都必须遵守该顺序。对于顺序执行,会「自动保留遇到顺序」,因为元素会按遇到它们的顺序自然地处理。甚至在并行执行中,许多操作(无状态中间操作和一些终止操作(比如 reduce())),遵守遇到顺序不会产生任何实际成本。但对于其他操作(有状态中间操作,其语义与遇到顺序关联的终止操作,比如 findFirst() 或 forEachOrdered()), 在并行执行中遵守遇到顺序的责任可能很重大。如果流有一个已定义的遇到顺序,但该顺序对结果没有意义, 那么可以通过使用 unordered() 操作删除 ORDERED 标志,加速包含顺序敏感型操作的管道的顺序执行。

作为对遇到顺序敏感的操作的示例,可以考虑 limit(),它会在指定大小处截断一个流。在顺序执行中实现 limit() 很简单:保留一个已看到多少元素的计数器,在这之后丢弃任何元素。但是在并行执行中,实现 limit() 要复杂得多;您需要保留前 N 个元素。此要求大大限制了利用并行性的能力;如果输入划分为多个部分,您只有在某个部分之前的所有部分都已完成后,才知道该部分的结果是否将包含在最终结果中。因此,该实现一般会错误地选择不使用所有可用的核心,或者缓存整个试验性结果,直到您达到目标长度。

如果流没有遇到顺序,limit() 操作可以自由选择任何 N 个元素,这让执行效率变得高得多。知道元素后可立即将其发往下游, 无需任何缓存,而且线程之间唯一需要执行的协调是发送一个信号来确保未超出目标流长度。

遇到顺序成本的另一个不太常见的示例是排序。如果遇到顺序有意义,那么 sorted() 操作会实现一种稳定 排序 (相同的元素按照它们进入输入时的相同顺序出现在输出中),而对于无序的流,稳定性(具有成本)不是必需的。distinct() 具有类似的情况:如果流有一个遇到顺序,那么对于多个相同的输入元素,distinct() 必须发出其中的第一个, 而对于无序的流,它可以发出任何元素 — 同样可以获得高效得多的并行实现。

在您使用 collect() 聚合时会遇到类似的情形。如果在无序流上执行 collect(groupingBy()) 操作, 与任何键对应的元素都必须按它们在输入中出现的顺序提供给下游收集器。此顺序对应用程序通常没有什么意义,而且任何顺序都没有意义。在这些情况下,可能最好选择一个并发 收集器(比如 groupingByConcurrent()),它可以忽略遇到顺序, 并让所有线程直接收集到一个共享的并发数据结构中(比如 ConcurrentHashMap),而不是让每个线程收集到它自己的中间映射中, 然后再合并中间映射(这可能产生很高的成本)。

什么时候该使用并行流

谈了这么多,关于并行流parallelStream的使用注意事项需要格外注意,它并不是解决性能的万金油,相反,如果使用不当会严重影响性能。我会在另外一篇文章里单独谈这个问题。

References

  • movingon.cn/2017/05/02/…
  • www.jianshu.com/p/bd825cb89…
  • jrebel.com/rebellabs/j…
  • blog.csdn.net/weixx3/arti…
  • www.ibm.com/developerwo…
  • www.ibm.com/developerwo…
  • juejin.cn/post/684490…
  • stackoverrun.com/cn/q/103411…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Java 多线程 并行处理 Fork Join

发表于 2021-03-20

总文档 :文章目录

Github : github.com/black-ant

Fork / Join 是一个工具框架 , 其核心思想在于将一个大运算切成多个小份 , 最大效率的利用资源 , 其主要涉及到三个类 : ForkJoinPool / ForkJoinTask / RecursiveTask

一 . Fork / Join 入门

什么是 Fork / Join
该框架是一个工具 , 通过分而治之的方式尝试将所有可用的处理器内核使用起来帮助加速并行处理

  • fork : 递归地将任务分解为较小的独立子任务 , 直到它们足够简单以便异步执行
  • join : 将所有子任务的结果递归的连接成单个结果

原理简述 :

  • Fork / Join 的执行是先把一个大任务分解(fork)成许多个独立的小任务,然后起多线程并行去处理这些小任务。处理完得到结果后再进行合并(join)就得到我们的最终结果。
  • Fork / Join 使用的算法为 work-stealing(工作窃取)
    • 该算法会把分解的小任务放在多个双端队列中,而线程在队列的头和尾部都可获取任务。
    • 当有线程把当前负责队列的任务处理完之后,它还可以从那些还没有处理完的队列的尾部窃取任务来处理

Fork / Join 线程池 :

  • ForkJoinPool : 用于管理 ForkJoinWorkerThread 类型的工作线程
    • 实现了 ExecutorService接口 的多线程处理器
    • 把一个大的任务划分为若干个小的任务并发执行,充分利用可用的资源,进而提高应用的执行效率

参考 @ blog.csdn.net/tyrroo/arti…

ForkJoin.png

二 . 说一说 RecursiveTask

RecursiveTask 是一种 ForkJoinTask 的递归实现 , 例如可以用于计算斐波那契数列 :

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码 class Fibonacci extends RecursiveTask<Integer> {
final int n;
Fibonacci(int n) { this.n = n; }
Integer compute() {
if (n <= 1)
return n;
Fibonacci f1 = new Fibonacci(n - 1);
f1.fork();
Fibonacci f2 = new Fibonacci(n - 2);
return f2.compute() + f1.join();
}
}

RecursiveTask 继承了 ForkJoinTask 接口 ,其内部有几个主要的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码
// Node 1 : 返回结果 , 存放最终结果
V result;

// Node 2 : 抽象方法 compute , 用于计算最终结果
protected abstract V compute();

// Node 3 : 获取最终结果
public final V getRawResult() {
return result;
}

// Node 4 : 最终执行方法 , 这里是需要调用具体实现类compute
protected final boolean exec() {
result = compute();
return true;
}

常见使用方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
java复制代码@ 
public class ForkJoinPoolService extends RecursiveTask<Integer> {

private static final int THRESHOLD = 2; //阀值
private int start;
private int end;

public ForkJoinPoolService(Integer start, Integer end) {
this.start = start;
this.end = end;
}

@Override
protected Integer compute() {
int sum = 0;
boolean canCompute = (end - start) <= THRESHOLD;
if (canCompute) {
for (int i = start; i <= end; i++) {
sum += i;
}
} else {
int middle = (start + end) / 2;
ForkJoinPoolService leftTask = new ForkJoinPoolService(start, middle);
ForkJoinPoolService rightTask = new ForkJoinPoolService(middle + 1, end);
//执行子任务
leftTask.fork();
rightTask.fork();
//等待子任务执行完,并得到其结果
Integer rightResult = rightTask.join();
Integer leftResult = leftTask.join();
//合并子任务
sum = leftResult + rightResult;
}
return sum;
}

}

三 . Fork Join 用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码// 前提 : 需要继承 RecursiveTask<Integer> 类 , 且实现 compute 方法
public class ForkJoinPoolReferenceService extends RecursiveTask<Integer> {

private File file;
private Integer salt;

public ForkJoinPoolReferenceService(File file, Integer salt) {
this.file = file;
this.salt = salt;
}

@Override
protected Integer compute() {
return ForkFileUtils.read(file, salt);
}
}


// 方式一 : Fork Join 方式
ForkJoinPoolReferenceService rt = new ForkJoinPoolReferenceService(files.get(0), i);
rt.fork();
result = result + rt.join();

// 方式二 : submit 方式
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(rt);
result = result + forkJoinTask.get();

三 . ForkJoinTask 用法

ForkJoinTask:代表fork/join里面任务类型,我们一般用它的两个子类RecursiveTask、RecursiveAction。这两个区别在于RecursiveTask任务是有返回值,RecursiveAction没有返回值。任务的处理逻辑包括任务的切分都集中在compute()方法里面。

四 . ForkJoinPool 线程池

作用 : ForkJoinPool为来自非ForkJoinTask客户端的提交提供入口点,以及管理和监视操作,最原始的任务都要交给它才能处理 .

主要功能包括 :

  • 负责控制整个fork/join有多少个workerThread,workerThread的创建,激活都是由它来掌控。
  • 负责workQueue队列的创建和分配,每当创建一个workerThread,它负责分配相应的workQueue。
  • 把接到的活都交给workerThread去处理,它可以说是整个frok/join的容器。

备注 :
ForkJoinPool不同于其他类型的ExecutorService,主要是因为它使用了窃取工作:池中的所有线程都试图找到并执行提交到池中的任务和/或其他活动任务创建的任务(如果没有工作,最终会阻塞等待工作)。

ForkJoinPool 基础用法

1
2
3
4
5
java复制代码ForkJoinPool forkJoinPool = new ForkJoinPool();
// 这是一个继承 ForkJoinTask 的类
ForkJoinPoolService countTask = new ForkJoinPoolService(1, 200);
ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(countTask);
System.out.println(forkJoinTask.get());

当大多数任务衍生出其他子任务时,以及当许多小任务从外部客户端提交到池时,这使得高效处理成为可能。

特别是当在构造函数中将asyncMode设置为true时,ForkJoinPool s也可能适合用于从未连接的事件风格任务。

ForkJoin 前知识点 : ForkJoinWorkerThread

ForkJoinWorkerThread 为 fork/join里面真正干活的”工人”,本质是一个线程 ,其里面有一个ForkJoinPool.WorkQueue的队列存放着它要干的活,接活之前它要向ForkJoinPool注册(registerWorker),拿到相应的workQueue。然后就从workQueue里面拿任务出来处理。

ForkJoinWorkerThread 依附于ForkJoinPool而存活,如果ForkJoinPool的销毁了,它也会跟着结束。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码
// Node 1 : 内部属性 , 一个是当前工作线程池
final ForkJoinPool pool; // the pool this thread works in
final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics

// Node 2 : WorkQueue 对象 , WorkQueue 是 ForkJoinPool 内部类
// 1 支持工作窃取和外部任务提交的队列
// 2 可以避免多个工作队列实例或多个队列数组共享缓存线
// 3 双端队列
static final class WorkQueue {
// TODO : 后续有必要会深入了解该类
}

// Node 3 : run 方法
Step 1 : 判断workQueue 是否为空
Step 2 : pool.runWorker(workQueue) 将 workQueue 加入 pool 池

// Node 4 : InnocuousForkJoinWorkerThread
private static final ThreadGroup innocuousThreadGroup =createThreadGroup();
?- 正如之前猜测的 , 线程组果然用在了这里

ForkJoinPool 代码深入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
java复制代码
// Node 1 : 内部构建线程方式
static final class DefaultForkJoinWorkerThreadFactory
M- ForkJoinWorkerThread newThread(ForkJoinPool pool)
?- 通过 new ForkJoinWorkerThread(pool) 返回新 Thread

// Node 2 : 内部属性
ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory;
?- 用于创建 ForkJoinWorkerThread ,可以被重写
RuntimePermission modifyThreadPermission;
?- 该对象用于控制启用和杀掉线程的权限
static final ForkJoinPool common;
?- 公共静态池
volatile long ctl;
?- 主要用于判断状态


// Node 3 : 默认属性
private static final long IDLE_TIMEOUT = 2000L * 1000L * 1000L;
?- 线程的初始超时值(以纳秒为单位)
private static final long TIMEOUT_SLOP = 20L * 1000L * 1000L;
?- 空闲超时时间
private static final int DEFAULT_COMMON_MAX_SPARES = 256
?- 在静态初始化期间commonMaxSpares的初始值
private static final int SPINS = 0
?- 在阻塞前自旋等待的次数




// Node 4 : 主要方法
> tryAddWorker :
主要1 :add = U.compareAndSwapLong(this, CTL, c, nc); CAS 判断状态
主要2 : createWorker(); 创建工作

> WorkQueue registerWorker(ForkJoinWorkerThread wt)
主要1 : WorkQueue w = new WorkQueue(this, wt);
主要2 : 推入 workQueues

> runWorker : 运行一个 worker
重点1 : t = scan(w, r) : 扫描并试图窃取一个顶级任务
重点2 : w.runTask(t); : 执行给定的任务和任何剩余的本地任务

> scan : 窃取逻辑
// TODO

> <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)


// Node 5 :构造函数

public ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode)
- parallelism : 可并行级别 , 即可并行运行的线程数量
- factory : 线程工厂
- handler : 异常捕获处理器。当执行的任务中出现异常,并从任务中被抛出时,就会被handler捕获
- asyncMode : 工作模式类型 , 存在于队列中的待执行任务,即可以使用先进先出的工作模式,也可以使用后进先出的工作模式

总结

总体来说还是不够深入 , 包括其中的性能 , invoke 实际上都还没有测试 , 实际上 ForkJoinPool 源码深入都不到一成 , 但是看源码看的有点头疼了 ,先这样了 , 后续会尽力把他完善清楚

参考与感谢

1
2
3
java复制代码[芋道源码](http://www.iocoder.cn/JUC/sike/aqs-3/)

[死磕系列](http://cmsblogs.com/?cat=151)

了解更多

Fork / Join 的性能问题

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

正则表达式模式(特殊符号和字符) 正则表达式

发表于 2021-03-20

正则表达式

特殊符号和字符

纯手打,就当熟悉了。

使用则已匹配符号匹配多个正则表达式

表示择一匹配的管道符号(|),也就是键盘上的竖线,表示一个“从多个模式中选择其一”的操作。它用于分割不同的正则表达式。例如,在下面的表格中,左边是一些运用择一匹配的模式,右边是左边相应的模式所能匹配的字符。

正则表达式模式 匹配的字符串
at home
r2d2 c3po
bat bet

匹配任意单个字符

**点号或者句号(.)**符号匹配除了换行符\n以外的任何字符(Python正则表达式有一个编译标记[S或者BOTALL],该标记能够推翻这个限制,使点号能够匹配换行符)。无论字母、数字、空格(并不包括“\n”换行符)、可打印字符、不可打印字符、不可打印字符,还是一个符号,使用点号都能够匹配它们。

正则表达式模式 匹配的字符串
f.o 匹配在字母“f”和“o”之间的任意一个字符:例如fao、f9o、f#o
.. 任意两个字符
.end 匹配在字符串end之前的任意一个字符

从字符串起始或者结尾或者单词边界匹配

如果要匹配字符串的开始位置,就必须使用脱字符(^)或者特殊字符\A(反斜线和大写字母A)。同样,美元符号($)或者\Z将用于匹配字符串的末尾位置。

正则表达式模式 匹配的字符串
^From 任何以From作为起始的字符串
/bin/tcsh$ 任何以/bin/tcsh作为结尾的字符串
^Subject:hi$ 任何由单独的字符串Subject:hi构成的字符串
正则表达式模式 匹配的字符串
the 任何包含the的字符串
\bthe 任何以the开始的字符串
\bthe\b 仅仅匹配单词the
\Bthe 任何包含但并不以the作为起始的字符串

如果想要逐字匹配这些字符中的任何一个(或者全部),就必须使用反斜线进行转义。例如,如果你想要匹配任何以美元符号结尾的字符串,一个可行的正则表达式方案就是使用模式.*$$。

特殊字符\b和\B可以用来匹配字符边界。

创建字符集

该正则表达式能够匹配一对方括号中包含的任何字符。下面为一些示例。

正则表达式模式 匹配的字符串
b[aeiu]t bat、bet、bit、but
[cr] [23] [dp] [o2] 一个包含四个字符的字符串,第一个字符是“c”或“r”,然后是“2”或“3”,后面是“d”或“p”,最后要么是“o”要么是“2”。例如,c2do、r3p2、r2d2、c3po等

限定范围和否定

除了单字符以外,字符集还支持匹配指定的字符范围。方括号中两个符号中间用连字符(-)连接,用于指定一个字符的范围;例如,A-Z、a-z或者0-9分别用于表示大写字母、小写字母和数值数字。

正则表达式模式 匹配的字符串
z.[0-9] 字母“z”后面跟着任何一个字符,然后跟着一个数字
[r-u] [env-y] [us] 字母“r”,“s”,“t”或者“u”后面跟着“e”、“n”、“v”、“w”、“x”或者“y”,然后跟着“u”或者“s”
[^aeiou] 一个非元音字符
[^\t\n] 不匹配制表符或者\n
[“-a] 在一个ASCII系统中,所有字符都位于“”和“a”之间,即34-97之间

使用闭包操作符实现存在性和频数匹配

**加号(+)**操作符将匹配一次或者多次出现的正则表达式(也叫做正闭包操作符),**问号(?)**操作符将匹配零次或者一次出现的正则表达式。

正则表达式模式 匹配的字符串
[dn]ot? 字母“d”或者“n”,后面跟着一个“o”,然后是最多一个“t”,例如,do、no、dot、not
0?[1-9] 任何数值数字,它可能前置一个“0”,例如,匹配一系列数(表示从1~9月的数值),不管是一个还是两个数字
[0-9]{15,16} 匹配15或者16个数字
</? [ ^ >]+> 匹配全部有效的(和无效的)HTML标签
[KQRBNP] [a-h] [1-8]-[a-h] [1-8] 在“长代数”标记法,表示国际象棋合法的棋盘移动(仅移动,不包括吃子和将军)。即“K”、“Q”、“R”、“B”、“N”或“P”等字母后面加上“a1”~“h8”之间的棋盘坐标。前面的坐标表示从哪里开始走棋,后面的坐标代表走到哪个位置(空格)上

表示字符集的特殊字符

d表示匹配任何十进制数字。另一个特殊字符(\w)能够用于表示全部字母数字的字符集,\s可以用来表示空格字符。\D表示任何非十进制数。

正则表达式模式 匹配的字符串
\w+-\d+ 一个由字母数字组成的字符串和一串由一个连字符分隔的数字
[A-Za-z]\w* 第一个字符是字母;其余字符(如果存在)可以是字母或者数字(几乎等价于Python中的有效标识符)
\d{3}-\d{3}-\d{4} 美国电话号码的格式,前面是区号前缀,例如800-555-1212
\w+@\w+\ .com 以XXX@YYY.com格式表示的简单电子邮件地址

使用圆括号指定分组

一对圆括号可以实现以下任意一个(或者两个)功能:

​ ·对正则表达式进行分组;

​ ·匹配子组。

正则表达式模式 匹配的字符串
\d+(.\d*) 表示简单浮点数的字符串;也就是说,任何十进制数字,后面可以接一个小数点和零个或者多个十进制数字,例如“0.004”、“2”、“75.”等
(Mr?s?.)?[A-Z] [a-z]*[A-Za-z-]+ 名字和姓氏,以及对名字的限制(如果有,首字母必须大写,后续字母小写),全名前可以有可选的“Mr.”、“Mrs.”、“Ms.”或者“M.”作为称谓,以及灵活可选的姓氏,可以有多个单词、横线以及大写字母

扩展表示法

以问号开始(?…)

正则表达式模式 匹配的字符串
(?:\w+.)* 以句点作为结尾的字符串,例如“google.”、“twitter.”、“facebook.”,但是这些匹配不会保存下来供后续的使用和数据检索
(?#comment) 此处并不做匹配,只是作为注释
(?=.com) 如果一个字符串后面跟着“.com”才做匹配操作,并不使用任何目标字符串
(?!.net) 如果一个字符串后面不是跟着“.net”才做匹配操作
(?<=800-) 如果字符串之前为“800-”才做匹配,假定为电话号码,同样,并不使用任何输入字符串
(?<!192.168.) 如果一个字符串之前不是“192.168.”才做匹配操作,假定用于过滤掉一组C类IP地址
(?(1)y x)

参考资料:Python核心编程(第3版)【美】Wesley Chun著 孙波翔 李斌 李晗 译(中国工信出版社、人民邮电出版社)

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

docker-compose搭建kafka集群及spring

发表于 2021-03-20

docker搭建kafka集群(基于centos7)

1 docker安装

1
2
3
4
5
6
7
8
9
10
arduino复制代码yum remove docker \
docker-client \
docker-client-latest \
docker-common \
docker-latest \
docker-latest-logrotate \
docker-logrotate \
docker-selinux \
docker-engine-selinux \
docker-engine

1.1 安装docker 源

1
2
3
4
5
6
arduino复制代码yum install -y yum-utils device-mapper-persistent-data lvm2

yum-config-manager \
--add-repo \
http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
yum install docker-ce-18.03.0.ce

1.2 安装docker所需工具

1
kotlin复制代码yum install -y yum-utils device-mapper-persistent-data lvm2

1.3 安装指定版本docker

1
复制代码yum install -y docker-ce-18.09.9-3.el7

1.4 启动docker

1
bash复制代码systemctl enable docker && systemctl start docker

1.5 因国内下载docker镜像存在问题,需要配置下docker镜像源

1
2
3
4
5
bash复制代码vi /etc/docker/daemon.json 
## daemon.json 内容如下(daemon.json 初始不存在)
{
"registry-mirrors": ["https://alzgoonw.mirror.aliyuncs.com","http://hub-mirror.c.163.com"]
}

1.6 配置完docker 镜像后记得重启docker

1
复制代码systemctl restart docker

2 docker-compose 安装

2.1 下载docker-compose

1
bash复制代码curl -L "https://github.com/docker/compose/releases/download/1.23.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose

2.2 添加docker-compose 执行命令

1
bash复制代码chmod +x /usr/local/bin/docker-compose

3 使用docker-compose安装docker集群

3.1环境配置

3.1.1 kafka目录
1
2
3
arduino复制代码mkdir kafka1
mkdir kafka2
mkdir kafka3
3.1.2 zookeeper集群目录
1
2
3
arduino复制代码mkdir zookeeper1
mkdir zookeeper2
mkdir zookeeper3
3.1.3 zookeeper其他目录及配置
1
2
3
4
5
bash复制代码mkdir zooConfig
cd zooConfig
mkdir zoo1
mkdir zoo2
mkdir zoo3

3.1.4在zoo1,zoo2,zoo3中分别创建myid文件,并写入分别写入id数字,如zoo1中的myid中写入1

1
2
3
bash复制代码echo 1 ./zoo1/myid
echo 2 ./zoo1/myid
echo 3 ./zoo1/myid

3.1.5 创建zoo配置文件zoo.cfg

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
ini复制代码# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/data
dataLogDir=/datalog
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
autopurge.purgeInterval=1
server.1= 172.23.0.11:2888:3888
server.2= 172.23.0.12:2888:3888
server.3= 172.23.0.13:2888:3888

3.2 创建docker容器网络

docker network create --driver bridge --subnet 172.23.0.0/25 --gateway 172.23.0.1 zookeeper_network

3.3 docker-compose.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
yaml复制代码version: '2'

services:

zoo1:
image: zookeeper:3.4.14
restart: always
container_name: zoo1
hostname: zoo1
ports:
- "2181:2181"
volumes:
- "./zooConfig/zoo.cfg:/conf/zoo.cfg"
- "/disk/docker/zookeeper1/data:/data"
- "/disk/docker/zookeeper1/datalog:/datalog"
environment:
ZOO_MY_ID: 1
ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
networks:
default:
ipv4_address: 172.23.0.11

zoo2:
image: zookeeper:3.4.14
restart: always
container_name: zoo2
hostname: zoo2
ports:
- "2182:2181"
volumes:
- "./zooConfig/zoo.cfg:/conf/zoo.cfg"
- "/disk/docker/zookeeper2/data:/data"
- "/disk/docker/zookeeper2/datalog:/datalog"
environment:
ZOO_MY_ID: 2
ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
networks:
default:
ipv4_address: 172.23.0.12

zoo3:
image: zookeeper:3.4.14
restart: always
container_name: zoo3
hostname: zoo3
ports:
- "2183:2181"
volumes:
- "./zooConfig/zoo.cfg:/conf/zoo.cfg"
- "/disk/docker/zookeeper3/data:/data"
- "/disk/docker/zookeeper3/datalog:/datalog"
environment:
ZOO_MY_ID: 3
ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
networks:
default:
ipv4_address: 172.23.0.13

kafka1:
image: wurstmeister/kafka:2.12-2.0.1
restart: always
container_name: kafka1
hostname: kafka1
ports:
- 9092:9092
environment:
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://172.18.255.9:9092
KAFKA_ADVERTISED_HOST_NAME: kafka1
KAFKA_HOST_NAME: kafka1
KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181
KAFKA_ADVERTISED_PORT: 9092
KAFKA_BROKER_ID: 0
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
volumes:
- /etc/localtime:/etc/localtime
- "/disk/docker/kafka1/logs:/kafka"
links:
- zoo1
- zoo2
- zoo3
networks:
default:
ipv4_address: 172.23.0.14

kafka2:
image: wurstmeister/kafka:2.12-2.0.1
restart: always
container_name: kafka2
hostname: kafka2
ports:
- 9093:9092
environment:
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://172.18.255.9:9093
KAFKA_ADVERTISED_HOST_NAME: kafka2
KAFKA_HOST_NAME: kafka2
KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181
KAFKA_ADVERTISED_PORT: 9093
KAFKA_BROKER_ID: 1
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
volumes:
- /etc/localtime:/etc/localtime
- "/disk/docker/kafka2/logs:/kafka"
links:
- zoo1
- zoo2
- zoo3
networks:
default:
ipv4_address: 172.23.0.15

kafka3:
image: wurstmeister/kafka:2.12-2.0.1
restart: always
container_name: kafka3
hostname: kafka3
ports:
- 9094:9092
environment:
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://172.18.255.9:9094
KAFKA_ADVERTISED_HOST_NAME: kafka3
KAFKA_HOST_NAME: kafka3
KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181
KAFKA_ADVERTISED_PORT: 9094
KAFKA_BROKER_ID: 2
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
volumes:
- /etc/localtime:/etc/localtime
- "/disk/docker/kafka3/logs:/kafka"
links:
- zoo1
- zoo2
- zoo3
networks:
default:
ipv4_address: 172.23.0.16

kafka-manager:
image: hlebalbau/kafka-manager:1.3.3.22
restart: always
container_name: kafka-manager
hostname: kafka-manager
ports:
- 9000:9000
links:
- kafka1
- kafka2
- kafka3
- zoo1
- zoo2
- zoo3
environment:
ZK_HOSTS: zoo1:2181,zoo2:2181,zoo3:2181
KAFKA_BROKERS: kafka1:9092,kafka2:9093,kafka3:9094
APPLICATION_SECRET: letmein
KAFKA_MANAGER_AUTH_ENABLED: "true"
KAFKA_MANAGER_USERNAME: "admin"
KAFKA_MANAGER_PASSWORD: "admin"
KM_ARGS: -Djava.net.preferIPv4Stack=true
networks:
default:
ipv4_address: 172.23.0.10

networks:
default:
external:
name: zookeeper_network

注意:

1.网上不少配置JMX_PORT端口,因启动就会出错,我这里没有配置,如必须要配置,见附录1kafka配置JMX_PORT出错解决方案;

2.将配置"PLAINTEXT://172.18.255.9:9094" 中的ip换成自己的机器的ip

3.4 一定要记得开启防火墙

1
2
3
4
5
6
7
8
9
10
css复制代码## 开启kafka-manager外网访问端口
firewall-cmd --zone=public --add-port=9000/tcp --permanent
##这三个为kafka集群节点互相通信的端口,服务器端需要开启,否则各个节点间无法互通
firewall-cmd --zone=public --add-port=9092/tcp --permanent
firewall-cmd --zone=public --add-port=9093/tcp --permanent
firewall-cmd --zone=public --add-port=9094/tcp --permanent
## 重启防火墙
firewall-cmd --reload
## 最好重启下docker
systemctl restart docker

3.5 启动kafka集群

3.5.1 启动集群

docker-compose -f docker-compose.yml up -d

image.png

3.5.2 停止集群

docker-compose -f docker-compose.yml stop

3.6 kafka集群检查

3.6.1 查看zookeeper集群是否正常
1
2
bash复制代码docker exec -it zoo1 bash
bin/zkServer.sh status
3.6.2 创建topic
1
2
3
4
5
sql复制代码docker exec -it kafka1 bash
kafka-topics.sh --create --zookeeper zoo1:2181 --replication-factor 1 --partitions 3 --topic test001
kafka-topics.sh --list --zookeeper zoo1:2181
kafka-topics.sh --list --zookeeper zoo2:2181
kafka-topics.sh --list --zookeeper zoo3:2181
3.6.3 生产消息

kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9093,kafka3:9094 --topic test001

image.png
下面两条警告日志不影响使用,实际是连通状态的

3.6.3 消费消息

kafka-console-consumer.sh --bootstrap-server kafka1:9092,kafka2:9093,kafka3:9094 --topic test001 --from-beginning

image.png

3.7 kafka-manager访问

3.7.1 浏览器打开访问

http://你的虚拟机ip:9000/

image.png

3.7.2 配置你创建的集群

image.png

3.7.2 下面就可以查看你的集群了

image.png

3 spring-boot连接kafka集群

3.1 application.yml配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
yaml复制代码spring:
application:
name: kafka-service
kafka:
# 指定kafka 代理地址,可以多个
bootstrap-servers: 192.168.207.82:9092,192.168.207.82:9093,192.168.207.82:9094
producer:
retries: 0
# 每次批量发送消息的数量
batch-size: 16384
# 缓存容量
buffer-memory: 33554432
# 指定消息key和消息体的编解码方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
# 指定默认消费者group id
group-id: consumer-tutorial
auto-commit-interval: 100
auto-offset-reset: earliest
enable-auto-commit: true
# 指定消息key和消息体的编解码方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 指定listener 容器中的线程数,用于提高并发量
listener:
concurrency: 3

3.2 启动类

1
2
3
4
5
6
7
less复制代码@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
@EnableEurekaClient
public class KafkaApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaApplication.class, args);
}
}

3.3 用了监听启动观察kafka生产及消费情况,小伙伴们可以单测手动调试

3.3.1 KafkaProducer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
less复制代码@Component
@EnableScheduling
public class KafkaProducer {

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

/**
* 定时任务
*/
@Scheduled(cron = "00/1 * * * * ?")
public void send(){
String message = UUID.randomUUID().toString();
ListenableFuture<?> future = kafkaTemplate.send("test003","kafka message test");
String message2 = "第二种类的订阅消息发送";
ListenableFuture<?> future2 = kafkaTemplate.send("test002",message2);
future.addCallback(o -> System.out.println("send-消息发送成功:" + message), throwable -> System.out.println("消息发送失败:" + message));
future2.addCallback(o -> System.out.println("send-消息发送成功:" + message2), throwable -> System.out.println("消息发送失败:" + message2));
}
}
3.3.1 KafkaConsumer
1
2
3
4
5
6
7
8
9
10
11
12
13
typescript复制代码@Component
public class KafkaConsumer {

@KafkaListener(topics = {"test001"})
public void receive(String message){
System.out.println("test001--消费消息:" + message);
}

@KafkaListener(topics = {"test002"})
public void receive2(String message){
System.out.println("test002--消费消息:" + message);
}
}

image.png
以上spring-boot项目就自己搭建吧,这里不赘述了

踩坑不易,欢迎大家评阅

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

TiDB Operator 源码阅读 (二) Operato

发表于 2021-03-20

在上一篇文章中我们讨论了 TiDB Operator 的应用场景,了解了 TiDB Operator 可以在 Kubernetes 集群中管理 TiDB 的生命周期。可是,TiDB Operator 的代码是怎样运行起来的?TiDB 组件的生命周期管理的逻辑又是如何编排的呢?我们将从 Operator 模式的视角,介绍 TiDB Operator 的代码实现,在这篇文章中我们主要讨论 controller-manager 的实现,介绍从代码入口到组件的生命周期事件被触发中间的过程。

Operator模式的演化: 从 Controller 模式到 Operator 模式

TiDB Operator 参考了 kube-controller-manager 的设计,了解 Kubernetes 的设计有助于了解 TiDB Operator 的代码逻辑。Kubernetes 内的 Resources 都是通过 Controller 实现生命周期管理的,例如 Namespace、Node、Deployment、Statefulset 等等,这些 Controller 的代码在 kube-controller-manager 中实现并由 kube-controller-manager 启动后调用。

为了支持用户自定义资源的开发需求,Kubernetes 社区基于上面的开发经验,提出了 Operator 模式。Kubernetes 支持通过 CRD(CustomResourceDefinition)来描述自定义资源,通过 CRD 创建 CR(CustomResource)对象,开发者实现相应 Controller 处理 CR 及关联资源的变更的需求,通过比对资源最新状态和期望状态,逐步完成运维操作,实现最终资源状态与期望状态一致。通过定义 CRD 和实现对应 Controller,无需将代码合并到 Kubernetes 中编译使用, 即可完成一个资源的生命周期管理。

TiDB Operator 的 Controller Manager

TiDB Operator 使用 tidb-controller-manager 管理各个 CRD 的 Controller。从 cmd/controller-manager/main.go 开始,tidb-controller-manager 首先加载了 kubeconfig,用于连接 kube-apiserver,然后使用一系列 NewController 函数,加载了各个 Controller 的初始化函数。

1
2
3
4
5
6
7
8
9
go复制代码controllers := []Controller{
tidbcluster.NewController(deps),
dmcluster.NewController(deps),
backup.NewController(deps),
restore.NewController(deps),
backupschedule.NewController(deps),
tidbinitializer.NewController(deps),
tidbmonitor.NewController(deps),
}

在 Controller 的初始化函数过程中,会初始化一系列 Informer,这些 Informer 主要用来和 kube-apiserver 交互获取 CRD 和相关资源的变更。以 TiDBCluster 为例,在初始化函数 NewController 中,会初始化 Informer 对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
go复制代码tidbClusterInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.enqueueTidbCluster,
UpdateFunc: func(old, cur interface{}) {
c.enqueueTidbCluster(cur)
},
DeleteFunc: c.enqueueTidbCluster,
})
statefulsetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.addStatefulSet,
UpdateFunc: func(old, cur interface{}) {
c.updateStatefulSet(old, cur)
},
DeleteFunc: c.deleteStatefulSet,
})

Informer 中添加了处理添加,更新,删除事件的 EventHandler,把监听到的事件涉及到的 CR 的 Key 加入队列。

初始化完成后启动 InformerFactory 并等待 cache 同步完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
go复制代码informerFactories := []InformerFactory{
deps.InformerFactory,
deps.KubeInformerFactory,
deps.LabelFilterKubeInformerFactory,
}
for _, f := range informerFactories {
f.Start(ctx.Done())
for v, synced := range f.WaitForCacheSync(wait.NeverStop) {
if !synced {
klog.Fatalf("error syncing informer for %v", v)
}
}
}

随后 tidb-controller-manager 会调用各个 Controller 的 Run 函数,开始循环执行 Controller 的内部逻辑。

1
2
3
4
5
go复制代码// Start syncLoop for all controllers
for _,controller := range controllers {
c := controller
go wait.Forever(func() { c.Run(cliCfg.Workers,ctx.Done()) },cliCfg.WaitDuration)
}

以 TiDBCluster Controller 为例,Run 函数会启动 worker 处理工作队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
go复制代码// Run runs the tidbcluster controller.
func (c *Controller) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()

klog.Info("Starting tidbcluster controller")
defer klog.Info("Shutting down tidbcluster controller")

for i := 0; i < workers; i++ {
go wait.Until(c.worker, time.Second, stopCh)
}

<-stopCh
}

Worker 会调用 processNextWorkItem 函数,弹出队列的元素,然后调用 sync 函数进行同步:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
go复制代码// worker runs a worker goroutine that invokes processNextWorkItem until the the controller's queue is closed
func (c *Controller) worker() {
for c.processNextWorkItem() {
}
}

// processNextWorkItem dequeues items, processes them, and marks them done. It enforces that the syncHandler is never
// invoked concurrently with the same key.
func (c *Controller) processNextWorkItem() bool {
key, quit := c.queue.Get()
if quit {
return false
}
defer c.queue.Done(key)
if err := c.sync(key.(string)); err != nil {
if perrors.Find(err, controller.IsRequeueError) != nil {
klog.Infof("TidbCluster: %v, still need sync: %v, requeuing", key.(string), err)
} else {
utilruntime.HandleError(fmt.Errorf("TidbCluster: %v, sync failed %v, requeuing", key.(string), err))
}
c.queue.AddRateLimited(key)
} else {
c.queue.Forget(key)
}
return true
}

Sync 函数会根据 Key 获取对应的 CR 对象,例如这里的 TiDBCluster 对象,然后对这个 TiDBCluster 对象进行同步。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
go复制代码// sync syncs the given tidbcluster.
func (c *Controller) sync(key string) error {
startTime := time.Now()
defer func() {
klog.V(4).Infof("Finished syncing TidbCluster %q (%v)", key, time.Since(startTime))
}()

ns, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}
tc, err := c.deps.TiDBClusterLister.TidbClusters(ns).Get(name)
if errors.IsNotFound(err) {
klog.Infof("TidbCluster has been deleted %v", key)
return nil
}
if err != nil {
return err
}

return c.syncTidbCluster(tc.DeepCopy())
}

func (c *Controller) syncTidbCluster(tc *v1alpha1.TidbCluster) error {
return c.control.UpdateTidbCluster(tc)
}

syncTidbCluster 函数调用 updateTidbCluster 函数,进而调用一系列组件的 Sync 函数实现 TiDB 集群管理的相关工作。在 pkg/controller/tidbcluster/tidb_cluster_control.go 的 updateTidbCluster 函数实现中,我们可以看到各个组件的 Sync 函数在这里调用,在相关调用代码注释里描述着每个 Sync 函数执行的生命周期操作事件,可以帮助理解每个组件的 Reconcile 需要完成哪些工作,例如 PD 组件:

1
2
3
4
5
6
7
8
9
10
11
go复制代码// works that should do to making the pd cluster current state match the desired state:
// - create or update the pd service
// - create or update the pd headless service
// - create the pd statefulset
// - sync pd cluster status from pd to TidbCluster object
// - upgrade the pd cluster
// - scale out/in the pd cluster
// - failover the pd cluster
if err := c.pdMemberManager.Sync(tc); err != nil {
return err
}

我们将在下篇文章中介绍组件的 Sync 函数完成了哪些工作,TiDBCluster Controller 是怎样完成各个组件的生命周期管理。

小结

通过这篇文章,我们了解到 TiDB Operator 如何从 cmd/controller-manager/main.go 初始化运行和如何实现对应的 Controller 对象,并以 TidbCluster Controller 为例介绍了 Controller 从初始化到实际工作的过程以及 Controller 内部的工作逻辑。通过上面的代码运行逻辑的介绍,我们清楚了组件的生命周期控制循环是如何被触发的,问题已经被缩小到如何细化这个控制循环,添加 TiDB 特殊的运维逻辑,使得 TiDB 能在 Kubernetes 上部署和正常运行,完成其他的生命周期操作。我们将在下一篇文章中讨论如何细化这个控制循环,讨论组件的控制循环的实现。

我们介绍了社区对于 Operator 模式的探索和演化。对于一些希望使用 Operator 模式开发资源管理系统的小伙伴,Kubernetes 社区中提供了 Kubebuilder 和 Operator Framework 两个 Controller 脚手架项目。相比于参考 kubernetes/sample-controller 进行开发,Operator 脚手架基于 kubernetes-sigs/controller-runtime 生成 Controller 代码,减少了许多重复引入的模板化的代码。开发者只需要专注于完成 CRD 对象的控制循环部分即可,而不需要关心控制循环启动之前的准备工作。

如果有什么好的想法,欢迎通过 #sig-k8s 或 pingcap/tidb-operator 参与 TiDB Operator 社区交流。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

必知必会的十个高级 SQL 概念

发表于 2021-03-20

公众号:Java小咖秀,网站:javaxks.com

作者 :Dimitris Poulopoulos ,链接:towardsdatascience.com/ten-advance…

随着数据量持续增长,对合格数据专业人员的需求也会增长。具体而言,对 SQL 流利的专业人士的需求日益增长,而不仅仅是在初级层面。

因此,Stratascratch 的创始人 Nathan Rosidi 以及我觉得我认为 10 个最重要和相关的中级到高级 SQL 概念。

那个说,我们走了!

1. 常见表表达式(CTEs)

如果您想要查询子查询,那就是 CTEs 施展身手的时候 - CTEs 基本上创建了一个临时表。

使用常用表表达式(CTEs)是模块化和分解代码的好方法,与您将文章分解为几个段落的方式相同。

请在 Where 子句中使用子查询进行以下查询。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
sql复制代码SELECT 
name,
salary
FROM
People
WHERE
NAME IN ( SELECT DISTINCT NAME FROM population WHERE country = "Canada" AND city = "Toronto" )
AND salary >= (
SELECT
AVG( salary )
FROM
salaries
WHERE
gender = "Female")

这似乎似乎难以理解,但如果在查询中有许多子查询,那么怎么样?这就是 CTEs 发挥作用的地方。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
vbnet复制代码with toronto_ppl as (
SELECT DISTINCT name
FROM population
WHERE country = "Canada"
AND city = "Toronto"
)
, avg_female_salary as (
SELECT AVG(salary) as avgSalary
FROM salaries
WHERE gender = "Female"
)
SELECT name
, salary
FROM People
WHERE name in (SELECT DISTINCT FROM toronto_ppl)
AND salary >= (SELECT avgSalary FROM avg_female_salary)

现在很清楚,Where 子句是在多伦多的名称中过滤。如果您注意到,CTE 很有用,因为您可以将代码分解为较小的块,但它们也很有用,因为它允许您为每个 CTE 分配变量名称(即 toronto_ppl 和 avg_female_salary)

同样,CTEs 允许您完成更高级的技术,如创建递归表。

2. 递归 CTEs.

递归 CTE 是引用自己的 CTE,就像 Python 中的递归函数一样。递归 CTE 尤其有用,它涉及查询组织结构图,文件系统,网页之间的链接图等的分层数据,尤其有用。

递归 CTE 有 3 个部分:

锚构件:返回 CTE 的基本结果的初始查询

递归成员:引用 CTE 的递归查询。这是所有与锚构件的联盟

停止递归构件的终止条件

以下是获取每个员工 ID 的管理器 ID 的递归 CTE 的示例:

1
2
3
4
5
6
7
8
9
10
11
sql复制代码with org_structure as (
SELECT id
, manager_id
FROM staff_members
WHERE manager_id IS NULL
UNION ALL
SELECT sm.id
, sm.manager_id
FROM staff_members sm
INNER JOIN org_structure os
ON os.id = sm.manager_id

3. 临时函数

如果您想了解有关临时函数的更多信息,请检查此项,但知道如何编写临时功能是重要的原因:

它允许您将代码的块分解为较小的代码块
它适用于写入清洁代码
它可以防止重复,并允许您重用类似于使用 Python 中的函数的代码。
考虑以下示例:

1
2
3
4
5
6
7
8
sql复制代码SELECT name
, CASE WHEN tenure < 1 THEN "analyst"
WHEN tenure BETWEEN 1 and 3 THEN "associate"
WHEN tenure BETWEEN 3 and 5 THEN "senior"
WHEN tenure > 5 THEN "vp"
ELSE "n/a"
END AS seniority
FROM employees

相反,您可以利用临时函数来捕获案例子句。

1
2
3
4
5
6
7
8
9
10
11
sql复制代码CREATE TEMPORARY FUNCTION get_seniority(tenure INT64) AS (
CASE WHEN tenure < 1 THEN "analyst"
WHEN tenure BETWEEN 1 and 3 THEN "associate"
WHEN tenure BETWEEN 3 and 5 THEN "senior"
WHEN tenure > 5 THEN "vp"
ELSE "n/a"
END
);
SELECT name
, get_seniority(tenure) as seniority
FROM employees

通过临时函数,查询本身更简单,更可读,您可以重复使用资历函数!

4. 使用 CASE WHEN 枢转数据

您很可能会看到许多要求在陈述时使用 CASE WHEN 的问题,这只是因为它是一种多功能的概念。如果要根据其他变量分配某个值或类,则允许您编写复杂的条件语句。

较少众所周知,它还允许您枢转数据。例如,如果您有一个月列,并且您希望为每个月创建一个单个列,则可以使用语句追溯数据的情况。

示例问题:编写 SQL 查询以重新格式化表,以便每个月有一个收入列。

Initial table:

id revenue month
1 8000 Jan
2 9000 Jan
3 10000 Feb
1 7000 Feb
1 6000 Mar

Result table:

id Jan_Revenue Feb_Revenue Mar_Revenue … Dec_Revenue
1 8000 7000 6000 … null
2 9000 null null … null
3 null 10000 null … null

5.EXCEPT vs NOT IN

除了几乎不相同的操作。它们都用来比较两个查询 / 表之间的行。所说,这两个人之间存在微妙的细微差别。

首先,除了过滤删除重复并返回不同的行与不在中的不同行。

同样,除了在查询 / 表中相同数量的列,其中不再与每个查询 / 表比较单个列。推荐:Java 面试练题宝典

6. 自联结

一个 SQL 表自行连接自己。你可能会认为没有用,但你会感到惊讶的是这是多么常见。在许多现实生活中,数据存储在一个大型表中而不是许多较小的表中。在这种情况下,可能需要自我连接来解决独特的问题。

让我们来看看一个例子。

示例问题:给定下面的员工表,写出一个 SQL 查询,了解员工的工资,这些员工比其管理人员工资更多。对于上表来说,Joe 是唯一一个比他的经理工资更多的员工。

Id Name Salary ManagerId
1 Joe 70000 3
2 Henry 80000 4
3 Sam 60000 NULL
4 Max 90000 NULL

Answer:

1
2
3
4
5
6
css复制代码SELECT  
a.Name as Employee
FROM
Employee as a
JOIN Employee as b on a.ManagerID = b.Id
WHERE a.Salary > b.Salary

7.Rank vs Dense Rank vs Row Number

它是一个非常常见的应用,对行和价值进行排名。以下是公司经常使用排名的一些例子:

  • 按购物,利润等数量排名最高值的客户
  • 排名销售数量的顶级产品
  • 以最大的销售排名顶级国家
  • 排名在观看的分钟数,不同观众的数量等观看的顶级视频。

在 SQL 中,您可以使用几种方式将 “等级” 分配给行,我们将使用示例进行探索。考虑以下 Query 和结果:

1
2
3
4
5
6
sql复制代码SELECT Name  
, GPA
, ROW_NUMBER() OVER (ORDER BY GPA desc)
, RANK() OVER (ORDER BY GPA desc)
, DENSE_RANK() OVER (ORDER BY GPA desc)
FROM student_grades

ROW_NUMBER()返回每行开始的唯一编号。当存在关系时(例如,BOB vs Carrie),ROW_NUMBER()如果未定义第二条标准,则任意分配数字。

Rank()返回从 1 开始的每行的唯一编号,除了有关系时,等级()将分配相同的数字。同样,差距将遵循重复的等级。

dense_rank()类似于等级(),除了重复等级后没有间隙。请注意,使用 dense_rank(),Daniel 排名第 3,而不是第 4 位()。

###v8. 计算 Delta 值
另一个常见应用程序是将不同时期的值进行比较。例如,本月和上个月的销售之间的三角洲是什么?或者本月和本月去年这个月是什么?

在将不同时段的值进行比较以计算 Deltas 时,这是 Lead()和 LAG()发挥作用时。

这是一些例子:

1
2
3
4
5
6
7
8
9
10
vbnet复制代码# Comparing each month's sales to last month  
SELECT month
, sales
, sales - LAG(sales, 1) OVER (ORDER BY month)
FROM monthly_sales
# Comparing each month's sales to the same month last year
SELECT month
, sales
, sales - LAG(sales, 12) OVER (ORDER BY month)
FROM monthly_sales

9. 计算运行总数

如果你知道关于 row_number()和 lag()/ lead(),这可能对您来说可能不会惊喜。但如果你没有,这可能是最有用的窗口功能之一,特别是当您想要可视化增长!

使用具有 SUM()的窗口函数,我们可以计算运行总数。请参阅下面的示例:

1
2
3
4
sql复制代码SELECT Month  
, Revenue
, SUM(Revenue) OVER (ORDER BY Month) AS Cumulative
FROM monthly_revenue

10. 日期时间操纵

您应该肯定会期望某种涉及日期时间数据的 SQL 问题。例如,您可能需要将数据分组组或将可变格式从 DD-MM-Yyyy 转换为简单的月份。

您应该知道的一些功能是:

  • 提炼
  • 日元
  • date_add,date_sub.
  • date_trunc.

示例问题:给定天气表,写一个 SQL 查询,以查找与其上一个(昨天)日期相比的温度较高的所有日期的 ID。

Id(INT) RecordDate(DATE) Temperature(INT)
1 2015-01-01 10
2 2015-01-02 25
3 2015-01-03 20
4 2015-01-04 30

Answer:

1
2
3
4
5
6
7
8
css复制代码SELECT  
a.Id
FROM
Weather a,
Weather b
WHERE
a.Temperature > b.Temperature
AND DATEDIFF(a.RecordDate, b.RecordDate) = 1

谢谢阅读!

就这样!我希望这有助于您在面试准备中 - 我相信,如果您知道这 10 个内部概念,那么在那里大多数 SQL 问题时,你会做得很好。

一如既往,祝你学习努力最好!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

多线程之间的通信

发表于 2021-03-19

wait()、notify()、notifyAll()

如果线程之间采用synchronized来保证线程安全,则可以利用wait()、notify()、notifyAll()来实现线程通信。这三个方法都不是Thread类中所声明的方法,而是Object类中声明的方法。原因是每个对象都拥有锁,所以让当前线程等待某个对象的锁,当然应该通过这个对象来操作。并且因为当前线程可能会等待多个线程的锁,如果通过线程来操作,就非常复杂了。另外,这三个方法都是本地方法,并且被final修饰,无法被重写。

wait()方法可以让当前线程释放对象锁并进入阻塞状态。notify()方法用于唤醒一个正在等待相应对象锁的线程,使其进入就绪队列,以便在当前线程释放锁后竞争锁,进而得到CPU的执行。notifyAll()用于唤醒所有正在等待相应对象锁的线程,使它们进入就绪队列,以便在当前线程释放锁后竞争锁,进而得到CPU的执行。

每个锁对象都有两个队列,一个是就绪队列,一个是阻塞队列。就绪队列存储了已就绪(将要竞争锁)的线程,阻塞队列存储了被阻塞的线程。当一个阻塞线程被唤醒后,才会进入就绪队列,进而等待CPU的调度。反之,当一个线程被wait后,就会进入阻塞队列,等待被唤醒。

await()、signal()、signalAll()

如果线程之间采用Lock来保证线程安全,则可以利用await()、signal()、signalAll()来实现线程通信。这三个方法都是Condition接口中的方法,该接口是在Java 1.5中出现的,它用来替代传统的wait+notify实现线程间的协作,它的使用依赖于 Lock。相比使用wait+notify,使用Condition的await+signal这种方式能够更加安全和高效地实现线程间协作。

Condition依赖于Lock接口,生成一个Condition的基本代码是lock.newCondition() 。 必须要注意的是,Condition 的 await()/signal()/signalAll() 使用都必须在lock保护之内,也就是说,必须在lock.lock()和lock.unlock之间才可以使用。事实上,await()/signal()/signalAll() 与wait()/notify()/notifyAll()有着天然的对应关系。即:Conditon中的await()对应Object的wait(),Condition中的signal()对应Object的notify(),Condition中的signalAll()对应Object的
notifyAll()。

BlockingQueue

Java 5提供了一个BlockingQueue接口,虽然BlockingQueue也是Queue的子接口,但它的主要用途并不是作为容器,而是作为线程通信的工具。BlockingQueue具有一个特征:当生产者线程试图向BlockingQueue中放入元素时,如果该队列已满,则该线程被阻塞;当消费者线程试图从BlockingQueue中取出元素时,如果该队列已空,则该线程被阻塞。

程序的两个线程通过交替向BlockingQueue中放入元素、取出元素,即可很好地控制线程的通信。线程之间需要通信,最经典的场景就是生产者与消费者模型,而BlockingQueue就是针对该模型提供的解决方案。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…702703704…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%