开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

实施sharding-jdbc,一些非常痛的注意点 不支持的

发表于 2021-03-28

原创:小姐姐味道(微信公众号ID:xjjdog),欢迎分享,转载请保留出处。任何不保留此声明的转载都是抄袭。

在《“分库分表” ?选型和流程要慎重,否则会失控》中,我们谈到处于驱动层的sharding-jdbc。开源做到这个水平,已经超棒了,不像tddl成了个太监。但还是有坑。

不过不能怪框架,毕竟有些sql,只有程序和鬼能懂。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
xml复制代码<select id="getCodes" 
resultMap="BaseResultMap"
parameterType="java.util.Map">
<foreach collection="orderCodes"
index="index"
item="item"
open=""
separator="union all"
close="">
select
<include refid="Base_Column_List"/>
from order
where orderCode = #{item}
</foreach>
</select>

不支持的操作

分库分表后,就成为了一个阉割型的数据库。很多sql的特性是不支持的,需要使用其他手段改进。以下以3.0.0版本进行描述。

distinct

sharding-jdbc不支持distinct,单表可使用group by进行替代。多表联查可使用exists替代

1
2
3
4
sql复制代码select DISTINCT
a, b, c, d
from table
where df=0

改成

1
2
3
4
sql复制代码select a, b, c, d
from table
where df=0
group by a, b, c, d

having

sharding-jdbc不支持having,可使用嵌套子查询进行替代

union

sharding-jdbc不支持union(all),可拆分成多个查询,在程序拼接

关于子查询

sharding-jdbc不支持在子查询中出现同样的表,如
以下可以==>

1
sql复制代码SELECT COUNT(*) FROM (SELECT * FROM t_order o)

以下报错==>

1
sql复制代码SELECT COUNT(*) FROM (SELECT * FROM t_order o WHERE o.id IN (SELECT id FROM t_order WHERE status = ?))

由于归并的限制,子查询中包含聚合函数目前无法支持。

mybatis 注释

sharding-jdbc不支持sql中的<!-- – >注释,如必须使用则写在sql前,或使用/* */

不支持text字段

改为varchar,好几年的bug了,但是没改

case when

某些case when是不支持的,比如不在聚合函数中的case when,需要将这部分sql逻辑写到程序里。

case when不应该是DBA禁用的函数么?我们在填坑

一些奇怪的反应

这个是可以的

1
sql复制代码select  a-b from dual

但这个不可以…

1
sql复制代码select (a-b)c from dual

sharding 也不支持如下形式查询,解析紊乱

1
sql复制代码and (1=1 or 1=1)

关于分页

严禁无切分键的深分页!因为会对SQL进行以下解释,然后在内存运行。

1
sql复制代码select *  from a limit 10 offset 1000

=======>

1
sql复制代码Actual SQL:db0 ::: select *  from a limit 1010 offset 0

关于表名

表名需与sharding-jdbc配置一致,推荐均为小写。因为路由是放在hashmap里的,没有区分大小写…所以如果你的sql写错了会找不到。

配置冗余

每一张表都要配置路由信息才能够被正确解析,如果你库里的表太多,这个配置文件会膨胀的特别大,上千行也是有的。所以在yml中可以将配置文件分开。

1
makefile复制代码spring.profiles.include: sharding

如何扫多库

比如一些定时任务,需要遍历所有库。

方法1:遍历所有库

使用以下方式拿到真正的数据库列表

1
java复制代码Map<String, DataSource> map = ShardingDataSource.class.cast(dataSource).getDataSourceMap();

然后在每一个库上执行扫描逻辑。这种情况下无法使用mybaits,需要写原生jdbc

方法2:根据切分键遍历

此种方法会拿到一个切分键的列表,比如日期等。然后通过遍历这个列表执行业务逻辑。此种方法在列表特别大的时候执行会比较缓慢。

如何验证

分库分表很危险,因为一旦数据入错库,后续的修理很麻烦。所以刚开始可以将路由信息指向到源表,即:只验证SQL路由的准确性。等待所有的SQL路由都验证通过,再切换到真正的分库或者表。

确保能够打印SQL

1
arduino复制代码sharding.jdbc.config.sharding.props.sql.show: true

将sql打印到单独的文件(logback)

1
2
3
4
5
6
7
8
9
10
11
xml复制代码<appender name="SQL" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOG_HOME}/sharding.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${LOG_HOME}/backup/sharding.log.%d{yyyy-MM-dd}
</fileNamePattern>
<maxHistory>100</maxHistory>
</rollingPolicy>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>${ENCODER_PATTERN}</pattern>
</encoder>
</appender>

写一些脚本进行SQL文件的验证。我这里有个通用的,你可以改下你的逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
python复制代码import sys
import re
import getopt

def process(SQL):
one= "".join(line.strip().replace("\n", " ") for line in SQL)
place = [m.groups()[0] if m.groups()[0] else m.groups()[1] for m in re.finditer(r"[ ]+(\w+)[ ]*=[ ]*\?|(\?)", one)]

if len(place):
mat = re.search(r"::: \[\[(.*)\]\]", one)
if mat is not None:
vals = [str(i).strip() for i in str(mat.groups()[0]).split(',')]
if "splitKey" in place:
for i in range(len(place)):
part = place[i]
//这里写你的逻辑
else:
print("no splitKey", one)

SQL = []
def process_line(line):
global SQL
if "Actual SQL" in line:
SQL = []
SQL.append(line)
else:
if line.strip().endswith("]]"):
SQL.append(line)
process(SQL)
SQL = []
else:
SQL.append(line)

opts, args = getopt.getopt(sys.argv[1:], "bf")

for op, value in opts:
if op == "-b":
print("enter comman mode , such as 'python x.py -b sharding.log > result'")
with open(args[0], "rb") as f:
for line in f:
process_line(line)
elif op== "-f":
print("enter stream scroll mode , such as 'python x.py -f sharding.log '")
with open(args[0], "rb") as f:
f.seek(0,2)
while True:
last_pos = f.tell()
line = f.readline()
if line: process_line(line)

其他

你可能要经常切换路由,所以某些时候路由信息要放在云端能够动态修改。

哦对了,我这里还有一段开发阶段的验证代码,能让你快速验证SQL能否正确解析。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
ini复制代码
@RunWith(SpringRunner.class)
@SpringBootTest(classes = App.class)

public class ShardingTest {
@Autowired
DataSource dataSource;

@Test
public void testGet() {
try {
Connection conn = dataSource.getConnection();
PreparedStatement stmt;
ResultSet rs;
String sql = new String(Files.readAllBytes(Paths.get("/tmp/a.sql")));

stmt = conn.prepareStatement(sql);
rs = stmt.executeQuery();
printRS(rs);

} catch (Exception ex) {
ex.printStackTrace();
}
}
public static void printRS(ResultSet rs) throws Exception {
ResultSetMetaData rsmd = rs.getMetaData();
int columnsNumber = rsmd.getColumnCount();
while (rs.next()) {
for (int i = 1; i <= columnsNumber; i++) {
if (i > 1) System.out.print(", ");
String columnValue = rs.getString(i);
System.out.print(columnValue + " " + rsmd.getColumnName(i));
}
System.out.println("");
}
}
}

开源一套以教学为目的系统,欢迎star:github.com/xjjdog/bcma…。它包含ToB复杂业务、互联网高并发业务、缓存应用;DDD、微服务指导。模型驱动、数据驱动。了解大型服务进化路线,编码技巧、学习Linux,性能调优。Docker/k8s助力、监控、日志收集、中间件学习。前端技术、后端实践等。主要技术:SpringBoot+JPA+Mybatis-plus+Antd+Vue3。

有SQL规范的团队是幸福的,分库分表简单的很。而动辄几百行,有各种复杂函数的SQL,就只能一步一个坑了。

话说回来,如果不是为了事务这个特性,为了支持老掉牙的业务,谁会用这分完后人不像人,鬼不像鬼的东西。

作者简介:小姐姐味道 (xjjdog),一个不允许程序员走弯路的公众号。聚焦基础架构和Linux。十年架构,日百亿流量,与你探讨高并发世界,给你不一样的味道。我的个人微信xjjdog0,欢迎添加好友,​进一步交流。​

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

工作四年,分享50个让你代码更好的小建议

发表于 2021-03-28

前言

工作了四年,看过很多思考不够深入的代码,因此写一下总结吧,50个让你代码更好的建议。其中的一些点,我以前的文章也写过啦,这次主要汇总一下。希望大家日常写代码多点思考,多点总结,加油!同时哪里有不对的,也望指出,感谢哈~

  • 公众号:捡田螺的小男孩
  • github地址:github.com/whx123/Java…

1. 仅是判断是否存在时,select count 比 select 具体的列,更好。

我们经常遇到类似的业务场景,如,判断某个用户userId是否是会员。

(反例): 一些小伙伴会这样实现,先查从用户信息表查出用户记录,然后再去判断是否是会员:

1
2
3
4
5
6
7
8
csharp复制代码<select id="selectUserByUserId" resultMap="BaseResultMap">
selct user_id , vip_flag from user_info where user_id =#{userId};
</select>

boolean isVip (String userId){
UserInfo userInfo = userInfoDAp.selectUserByUserId(userId);
return UserInfo!=null && "Y".equals(userInfo.getVipFlag())
}

(正例): 针对这种业务场景,其实更好的实现,是直接select count一下,如下:

1
2
3
4
5
6
7
8
csharp复制代码<select id="countVipUserByUserId" resultType="java.lang.Integer">
selct count(1) from user_info where user_id =#{userId} and vip_flag ='Y';
</select>

boolean isVip (String userId){
int vipNum = userInfoDAp.countVipUserByUserId(userId);
return vipNum>0
}

2. 复杂的if逻辑条件,可以调整顺序,让程序更高效

假设业务需求是这样:如果用户是会员,并且第一次登陆时,需要发一条通知的短信。假如没有经过思考,代码很可能直接这样写了。

1
2
3
scss复制代码if(isUserVip && isFirstLogin){
sendMsgNotify();
}

假设总共有5个请求进来,isUserVip通过的有3个请求,isFirstLogin通过的有1个请求。 那么以上代码,isUserVip执行的次数为5次,isFirstLogin执行的次数也是3次,如下:

image.png

如果调整一下isUserVip和isFirstLogin的顺序呢?

1
2
3
scss复制代码if(isFirstLogin && isUserVip ){
sendMsg();
}

isFirstLogin执行的次数是5次,isUserVip执行的次数是1次,如下:

image.png

如果你的isFirstLogin,判断逻辑只是select count 一下数据库表,isUserVip也是select count 一下数据库表的话,显然,把isFirstLogin放在前面更高效。

3. 写查询Sql的时候,只查你需要用到的字段,还有通用的字段,拒绝反手的select *

反例:

1
csharp复制代码select * from user_info where user_id =#{userId};

正例:

1
csharp复制代码 selct user_id , vip_flag from  user_info where user_id =#{userId};

理由:

  • 节省资源、减少网络开销。
  • 可能用到覆盖索引,减少回表,提高查询效率。

4. 优化你的程序,拒绝创建不必要的对象

如果你的变量,后面的逻辑判断,一定会被赋值;或者说,只是一个字符串变量,直接初始化字符串常量就可以了,没有必要愣是要new String().

反例:

1
ini复制代码String s = new String ("欢迎关注公众号:捡田螺的小男孩");

正例:

1
arduino复制代码String s=  "欢迎关注公众号:捡田螺的小男孩 ”;

5. 初始化集合时,指定容量

阿里的开发手册,也明确提到这个点:
image.png

假设你的map要存储的元素个数是15个左右,最优写法如下

1
2
3
4
ini复制代码 //initialCapacity = 15/0.75+1=21
Map map = new HashMap(21);
又因为hashMap的容量跟2的幂有关,所以可以取32的容量
Map map = new HashMap(32);

6.catch了异常,需要打印出具体的exception,方便更好定位问题

反例:

1
2
3
4
5
php复制代码try{
// do something
}catch(Exception e){
log.info("捡田螺的小男孩,你的程序有异常啦");
}

正例:

1
2
3
4
5
php复制代码try{
// do something
}catch(Exception e){
log.info("捡田螺的小男孩,你的程序有异常啦:",e); //把exception打印出来
}

理由:

  • 反例中,并没有把exception出来,到时候排查问题就不好查了啦,到底是SQl写错的异常还是IO异常,还是其他呢?所以应该把exception打印到日志中哦~

7. 打印日志的时候,对象没有覆盖Object的toString的方法,直接把类名打印出来了。

我们在打印日志的时候,经常想看下一个请求参数对象request是什么。于是很容易有类似以下这些代码:

1
2
3
vbscript复制代码publick Response dealWithRequest(Request request){
log.info("请求参数是:".request.toString)
}

打印结果如下:

1
css复制代码请求参数是:local.Request@49476842

这是因为对象的toString方法,默认的实现是“类名@散列码的无符号十六进制”。所以你看吧,这样子打印日志就没啥意思啦,你都不知道打印的是什么内容。

所以一般对象(尤其作为传参的对象),都覆盖重写toString()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
typescript复制代码class Request {

private String age;

private String name;

@Override
public String toString() {
return "Request{" +
"age='" + age + '\'' +
", name='" + name + '\'' +
'}';
}
}

publick Response dealWithRequest(Request request){
log.info("请求参数是:".request.toString)
}

打印结果如下:

1
ini复制代码请求参数是:Request{age='26', name='公众号:捡田螺的小男孩'}

8. 一个方法,拒绝过长的参数列表。

假设有这么一个公有方法,形参有四个。。。

1
2
3
arduino复制代码public void getUserInfo(String name,String age,String sex,String mobile){
// do something ...
}

如果现在需要多传一个version参数进来,并且你的公有方法是类似dubbo这种对外提供的接口的话,那么你的接口是不是需要兼容老版本啦?

1
2
3
4
5
6
7
8
9
10
arduino复制代码public void getUserInfo(String name,String age,String sex,String mobile){
// do something ...
}

/**
* 新接口调这里
*/
public void getNewUserInfo(String name,String age,String sex,String mobile,String version){
// do something ...
}

所以呢,一般一个方法的参数,一般不宜过长。过长的参数列表,不仅看起来不优雅,并且接口升级时,可能还要考虑新老版本兼容。如果参数实在是多怎么办呢?可以用个DTO对象包装一下这些参数呢~如下:

1
2
3
4
5
6
7
8
9
10
arduino复制代码public void getUserInfo(UserInfoParamDTO userInfoParamDTO){
// do something ...
}

class UserInfoParamDTO{
private String name;
private String age;
private String sex;
private String mobile;
}

用个DTO对象包装一下,即使后面有参数变动,也可以不用动对外接口了,好处杠杠的。

9. 使用缓冲流,减少IO操作

反例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码/**
* 公众号:捡田螺的小男孩
* @desc: 复制一张图片文件
*/
public class MainTest {
public static void main(String[] args) throws FileNotFoundException {
long begin = System.currentTimeMillis();
try (FileInputStream input = new FileInputStream("C:/456.png");
FileOutputStream output = new FileOutputStream("C:/789.png")) {
byte[] bytes = new byte[1024];
int i;
while ((i = input.read(bytes)) != -1) {
output.write(bytes,0,i);
}
} catch (IOException e) {
log.error("复制文件发生异常",e);
}
log.info("常规流读写,总共耗时ms:"+(System.currentTimeMillis() - begin));
}
}

运行结果:

1
makefile复制代码常规流读写,总共耗时ms:52

使用FileInputStream、FileOutputStream实现文件读写功能,是没有什么问题的。但是呢,可以使用缓冲流BufferedReader、BufferedWriter、BufferedInputStream、BufferedOutputStream等,减少IO次数,提高读写效率。

如果是不带缓冲的流,读取到一个字节或者字符的,就会直接输出数据了。而带缓冲的流,读取到一个字节或者字符时,先不输出,而是等达到缓冲区的最大容量,才一次性输出。

正例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码/**
* 公众号:捡田螺的小男孩
* @desc: 复制一张图片文件
*/
public class MainTest {
public static void main(String[] args) throws FileNotFoundException {
long begin = System.currentTimeMillis();
try (BufferedInputStream bufferedInputStream = new BufferedInputStream(new FileInputStream("C:/456.png"));
BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(new FileOutputStream("C:/789.png"))) {
byte[] bytes = new byte[1024];
int i;
while ((i = input.read(bytes)) != -1) {
output.write(bytes,0,i);
}
} catch (IOException e) {
log.error("复制文件发生异常",e);
}
log.info("总共耗时ms"+(System.currentTimeMillis() - begin));
}
}

运行结果:

1
makefile复制代码缓冲流读写,总共耗时ms:12

10. 优化你的程序逻辑,比如前面已经查到的数据,在后面的方法也用到的话,是可以把往下传参的,减少方法调用/查表

反例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
scss复制代码public Response dealRequest(Request request){

UserInfo userInfo = userInfoDao.selectUserByUserId(request.getUserId);
if(Objects.isNull(request)){
return ;
}

insertUserVip(request.getUserId);

}

private int insertUserVip(String userId){
//又查了一次
UserInfo userInfo = userInfoDao.selectUserByUserId(request.getUserId);
//插入用户vip流水
insertUserVipFlow(userInfo);
....
}

很显然,以上程序代码,已经查到 userInfo,然后又把userId传下去,又查多了一次。。。实际上,可以把userInfo传下去的,这样可以省去一次查表操作,程序更高效。

正例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
scss复制代码public Response dealRequest(Request request){

UserInfo userInfo = userInfoDao.selectUserByUserId(request.getUserId);
if(Objects.isNull(request)){
return ;
}

insertUserVip(userInfo);
}

private int insertUserVip(UserInfo userInfo){
//插入用户vip流水
insertUserVipFlow(userInfo);
....
}

11. 不要为了方便,直接在代码中使用0,1等魔法值,应该要用enum枚举代替。

反例:

1
2
3
4
5
6
7
scss复制代码if("0".equals(userInfo.getVipFlag)){
//非会员,提示去开通会员
tipOpenVip(userInfo);
}else if("1".equals(userInfo.getVipFlag)){
//会员,加勋章返回
addMedal(userInfo);
}

正例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
arduino复制代码if(UserVipEnum.NOT_VIP.getCode.equals(userInfo.getVipFlag)){
//非会员,提示去开通会员
tipOpenVip(userInfo);
}else if(UserVipEnum.VIP.getCode.equals(userInfo.getVipFlag)){
//会员,加勋章返回
addMedal(userInfo);
}

public enum UserVipEnum {

VIP("1","会员"),
NOT_VIP("0","非会员"),:;

private String code;
private String desc;

UserVipEnum(String code, String desc) {
this.code = code;
this.desc = desc;
}
}

写代码的时候,不要一时兴起,就直接使用魔法值哈。使用魔法值,维护代码起来很难受的。

12. 当成员变量值不会改变时,优先定义为静态常量

反例:

1
2
3
4
arduino复制代码public class Task {
private final long timeout = 10L;
...
}

正例:

1
2
3
4
arduino复制代码public class Task {
private static final long TIMEOUT = 10L;
...
}

因为如果定义为static,即类静态常量,在每个实例对象中,它只有一份副本。如果是成员变量,每个实例对象中,都各有一份副本。显然,如果这个变量不会变的话,定义为静态常量更好一些。

13. 注意检验空指针,不要轻易相信业务,说正常逻辑某个参数不可能为空。

NullPointerException 在我们日常开发中非常常见,我们代码开发过程中,一定要对空指针保持灵敏的嗅觉。

主要有这几类空指针问题:

  • 包装类型的空指针问题
  • 级联调用的空指针问题
  • Equals方法左边的空指针问题
  • ConcurrentHashMap 类似容器不支持 k-v为 null。
  • 集合,数组直接获取元素
  • 对象直接获取属性

反例:

1
2
3
4
5
6
7
8
typescript复制代码public class NullPointTest {
public static void main(String[] args) {
String s = null;
if (s.equals("666")) { //s可能为空,会导致空指针问题
System.out.println("公众号:捡田螺的小男孩,干货满满");
}
}
}

14,捕获到的异常,不能忽略它,至少打点日志。

反例:

1
2
3
4
5
6
7
csharp复制代码public static void testIgnoreException() throws Exception {
try {
// 搞事情
} catch (Exception e) {
//捕获了异常,啥事情不做,日志也不打??
}
}

正例:

1
2
3
4
5
6
7
csharp复制代码public static void testIgnoreException() {
try {
// 搞事情
} catch (Exception e) {
log.error("异常了,联系开发小哥哥看看哈",e);
}
}

15. 采用Lambda表达式替换内部匿名类,使代码更优雅

JDK8出现了新特性-Lambda表达式。Lambda表达式不仅比匿名内部类更加优雅,并且在大多数虚拟机中,都是采用invokeDynamic指令实现,相对于匿名内部类,效率也更高

反例:

1
2
3
4
5
6
7
8
9
java复制代码  public void sortUserInfoList(List<UserInfo> userInfoList){
userInfoList.sort(new Comparator<UserInfo>() {
@Override
public int compare(UserInfo user1, UserInfo user2) {
Long userId1 = user1.getUserId();
Long userId2 = user2.getUserId();
return userId1.compareTo(userId2);
}});
}

正例:

1
2
3
4
5
6
7
ini复制代码   public void sortUserInfoList(List<UserInfo> userInfoList){
userInfoList.sort((user1, user2) -> {
Long userId1 = user1.getUserId();
Long userId2 = user2.getUserId();
return userId1.compareTo(userId2);
});
}

16. 通知类(如发邮件,有短信)的代码,建议异步处理。

假设业务流程这样:需要在用户登陆时,添加个短信通知它的粉丝。 很容易想到的实现流程如下:

image.png

假设提供sendMsgNotify服务的系统挂了,或者调用sendMsgNotify失败了,那么用户登陆就失败了。。。
一个通知功能导致了登陆主流程不可用,明显的捡了芝麻丢西瓜。那么有没有鱼鱼熊掌兼得的方法呢?有的,给发短信接口捕获异常处理,或者另开线程异步处理,如下:

image.png

因此,添加通知类等不是非主要,可降级的接口时,应该静下心来考虑是否会影响主要流程,思考怎么处理最好。

17. 处理Java日期时,当心YYYY格式设置的问题。

日常开发中,我们经常需要处理日期。我们要当时日期格式化的时候,年份是大写YYYY的坑。

1
2
3
4
5
6
7
ini复制代码Calendar calendar = Calendar.getInstance();
calendar.set(2019, Calendar.DECEMBER, 31);

Date testDate = calendar.getTime();

SimpleDateFormat dtf = new SimpleDateFormat("YYYY-MM-dd");
System.out.println("2019-12-31 转 YYYY-MM-dd 格式后 " + dtf.format(testDate));

运行结果:

1
yaml复制代码2019-12-31 转 YYYY-MM-dd 格式后 2020-12-31

为什么明明是2019年12月31号,就转了一下格式,就变成了2020年12月31号了?因为YYYY是基于周来计算年的,它指向当天所在周属于的年份,一周从周日开始算起,周六结束,只要本周跨年,那么这一周就算下一年的了。正确姿势是使用yyyy格式。

image.png

18. 如果一个类确定不会被继承,不会拿来搞AOP骚操作,可以指定final修饰符,如用final修饰一个工具类。

正例:

1
2
3
4
5
csharp复制代码public final class Tools {
public static void testFinal(){
System.out.println("工具类方法");
}
}

一个类指定了final修饰符,它不会被继承了,并且其所有方法都是final的了。Java编译器会找机会内联所有的final方法,提升了Java运行效率。

19. static静态变量不要依赖spring实例化变量,可能会导致初始化出错

之前看到项目有类似的代码。静态变量依赖于spring容器的bean。

1
arduino复制代码 private static SmsService smsService = SpringContextUtils.getBean(SmsService.class);

这个静态的smsService有可能获取不到的,因为类加载顺序不是确定的,而以上的代码,静态的smsService初始化强制依赖spring容器的实例了。正确的写法可以这样,如下:

1
2
3
4
5
6
7
8
9
csharp复制代码 private static SmsService  smsService =null;

//使用到的时候采取获取
public static SmsService getSmsService(){
if(smsService==null){
smsService = SpringContextUtils.getBean(SmsService.class);
}
return smsService;
}

20. 与类成员变量无关的方法,应当声明成静态方法

有些方法,与实例成员变量无关,就可以声明为静态方法。这一点,工具类用得很多。反例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
csharp复制代码/**
* BigDecimal的工具类
*/
public class BigDecimalUtils {

public BigDecimal ifNullSetZERO(BigDecimal in) {
return in != null ? in : BigDecimal.ZERO;
}

public BigDecimal sum(BigDecimal ...in){
BigDecimal result = BigDecimal.ZERO;
for (int i = 0; i < in.length; i++){
result = result.add(ifNullSetZERO(in[i]));
}
return result;
}

因为BigDecimalUtils工具类的方法都没有static修饰,所以,你要使用的时候,每次都要new一下啦,那不就耗资源去反复创建对象了嘛!!

1
2
ini复制代码BigDecimalUtils bigDecimalUtils = new BigDecimalUtils();
bigDecimalUtils.sum(a,b);

所以可以声明成静态变量,使用的时候,直接类名.方法调用即可,正例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
csharp复制代码/**
* BigDecimal的工具类
*/
public class BigDecimalUtils {

public static BigDecimal ifNullSetZERO(BigDecimal in) {
return in != null ? in : BigDecimal.ZERO;
}

public static BigDecimal sum(BigDecimal ...in){
BigDecimal result = BigDecimal.ZERO;
for (int i = 0; i < in.length; i++){
result = result.add(ifNullSetZERO(in[i]));
}
return result;
}

21. 不要用一个Exception捕捉所有可能的异常。

反例:

1
2
3
4
5
6
7
8
9
10
php复制代码
public void test(){
try{
//…抛出 IOException 的代码调用
//…抛出 SQLException 的代码调用
}catch(Exception e){
//用基类 Exception 捕捉的所有可能的异常,如果多个层次都这样捕捉,会丢失原始异常的有效信息哦
log.info(“Exception in test,exception:{}”, e);
}
}

正例:

1
2
3
4
5
6
7
8
9
10
11
12
scss复制代码public void test(){
try{
//…抛出 IOException 的代码调用
//…抛出 SQLException 的代码调用
}catch(IOException e){
//仅仅捕捉 IOException
log.info(“IOException in test,exception:{}”, e);
}catch(SQLException e){
//仅仅捕捉 SQLException
log.info(“SQLException in test,exception:{}”, e);
}
}

22. 函数不要过度封装,言简意赅即可。

反例:

1
2
3
4
5
6
7
typescript复制代码// 函数封装
public static boolean isUserVip(Boolean isVip) {
return Boolean.TRUE.equals(isVip);
}

// 使用代码
boolean isVip = isVip(user.getUserVip());

正例:

1
ini复制代码boolean isVip = Boolean.TRUE.equals(user.getUserVip());

函数不要过度封装,把意思表达清楚即可。并且,方法调用会引起入栈和出栈,导致消耗更多的CPU和内存,过度封装,会损耗性能的!

23. 如果变量的初值一定会被覆盖,就没有必要给变量赋初值。

反例:

1
2
3
4
5
6
ini复制代码List<UserInfo> userList = new ArrayList<>();
if (isAll) {
userList = userInfoDAO.queryAll();
} else {
userList = userInfoDAO.queryActive();
}

正例:

1
2
3
4
5
6
ini复制代码List<UserInfo> userList ;
if (isAll) {
userList = userInfoDAO.queryAll();
} else {
userList = userInfoDAO.queryActive();
}

24.金额数值计算要使用BigDecimal

看下这个浮点数计算的例子吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
csharp复制代码public class DoubleTest {
public static void main(String[] args) {
System.out.println(0.1+0.2);
System.out.println(1.0-0.8);
System.out.println(4.015*100);
System.out.println(123.3/100);

double amount1 = 3.15;
double amount2 = 2.10;
if (amount1 - amount2 == 1.05){
System.out.println("OK");
}
}
}

运行结果:

1
2
3
4
复制代码0.30000000000000004
0.19999999999999996
401.49999999999994
1.2329999999999999

因为计算机是以二进制存储数值的,对于浮点数也是。对于计算机而言,0.1无法精确表达,这就是为什么浮点数会导致精确度缺失的。因此,金额计算,一般都是用BigDecimal 类型

1
2
3
csharp复制代码System.out.println(new BigDecimal(0.1).add(new BigDecimal(0.2)));
//output:
0.3000000000000000166533453693773481063544750213623046875

其实,使用 BigDecimal 表示和计算浮点数,必须使用字符串的构造方法来初始化 BigDecimal,并且,还要关注BigDecimal的几位小数点,它有八种舍入模式等

25. 注意Arrays.asList的几个坑

  • 基本类型不能作为 Arrays.asList方法的参数,否则会被当做一个参数。
1
2
3
4
5
6
7
8
9
arduino复制代码public class ArrayAsListTest {
public static void main(String[] args) {
int[] array = {1, 2, 3};
List list = Arrays.asList(array);
System.out.println(list.size());
}
}
//运行结果
1
  • Arrays.asList 返回的 List 不支持增删操作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
php复制代码public class ArrayAsListTest {
public static void main(String[] args) {
String[] array = {"1", "2", "3"};
List list = Arrays.asList(array);
list.add("5");
System.out.println(list.size());
}
}

// 运行结果
Exception in thread "main" java.lang.UnsupportedOperationException
at java.util.AbstractList.add(AbstractList.java:148)
at java.util.AbstractList.add(AbstractList.java:108)
at object.ArrayAsListTest.main(ArrayAsListTest.java:11)

Arrays.asList 返回的 List 并不是我们期望的 java.util.ArrayList,而是 Arrays 的内部类ArrayList。内部类的ArrayList没有实现add方法,而是父类的add方法的实现,是会抛出异常的呢。

  • 使用Arrays.asLis的时候,对原始数组的修改会影响到我们获得的那个List
1
2
3
4
5
6
7
8
9
10
11
12
13
ini复制代码public class ArrayAsListTest {
public static void main(String[] args) {
String[] arr = {"1", "2", "3"};
List list = Arrays.asList(arr);
arr[1] = "4";
System.out.println("原始数组"+Arrays.toString(arr));
System.out.println("list数组" + list);
}
}

//运行结果
原始数组[1, 4, 3]
list数组[1, 4, 3]

26,及时关闭IO资源流

应该大家都有过这样的经历,windows系统桌面如果打开太多文件或者系统软件,就会觉得电脑很卡。当然,我们linux服务器也一样,平时操作文件,或者数据库连接,IO资源流如果没关闭,那么这个IO资源就会被它占着,这样别人就没有办法用了,这就造成资源浪费。

所以使用完IO流,记得关闭哈。可以使用try-with-resource关闭的:

1
2
3
4
5
6
7
8
9
10
java复制代码/*
* 关注公众号,捡田螺的小男孩
*/
try (FileInputStream inputStream = new FileInputStream(new File("jay.txt")) {
// use resources
} catch (FileNotFoundException e) {
log.error(e);
} catch (IOException e) {
log.error(e);
}

27. 尽量使用函数内的基本类型临时变量

  • 在方法函数内,基本类型参数以及临时变量,都是保存在栈中的,访问速度比较快。
  • 对象类型的参数和临时变量的引用都保存在栈中,内容都保存在堆中,访问速度较慢。
  • 在类中,任何类型的成员变量都保存在堆(Heap)中,访问速度较慢。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
arduino复制代码public class AccumulatorUtil {

private double result = 0.0D;
//反例
public void addAllOne( double[] values) {
for(double value : values) {
result += value;
}
}
//正例,先在方法内声明一个局部临时变量,累加完后,再赋值给方法外的成员变量
public void addAll1Two(double[] values) {
double sum = 0.0D;
for(double value : values) {
sum += value;
}
result += sum;
}
}

28. 如果数据库一次查询的数量过多,建议分页处理。

如果你的Sql一次性查出来的数据量比较多,建议分页处理。

反例:

1
2
csharp复制代码
select user_id,name,age from user_info ;

正例:

1
csharp复制代码select user_id,name,age from user_info limit #{offset},#{pageSize};

如果偏移量特别大的时候,查询效率就变得低下。可以这接种方式优化:

1
2
3
4
5
6
7
csharp复制代码//方案一 :返回上次查询的最大记录(偏移量)
select id,name from user_info where id>10000 limit #{pageSize}.

//方案二:order by + 索引
select id,name from user_info order by id limit #{offset},#{pageSize}

//方案三:在业务允许的情况下限制页数:

29. 尽量减少对变量的重复计算

一般我们写代码的时候,会以以下的方式实现遍历:

1
2
3
css复制代码for (int i = 0; i < list.size; i++){

}

如果list数据量比较小那还好。如果list比较大时,可以优化成这样:

1
2
3
perl复制代码for (int i = 0, int length = list.size; i < length; i++){

}

理由:

  • 对方法的调用,即使是只有一个语句,也是有有消耗的,比如创建栈帧。如果list比较大时,多次调用list.size也是会有资源消耗的。

30. 修改对外老接口的时候,思考接口的兼容性。

很多bug都是因为修改了对外老接口,但是却不做兼容导致的。关键这个问题多数是比较严重的,可能直接导致系统发版失败的。新手程序员很容易就犯这个错误了哦~

所以,如果你的需求是在原来接口上修改,,尤其这个接口是对外提供服务的话,一定要考虑接口兼容。举个例子吧,比如dubbo接口,原本是只接收A,B参数,现在你加了一个参数C,就可以考虑这样处理。

1
2
3
4
5
6
7
8
scss复制代码//老接口
void oldService(A,B);{
//兼容新接口,传个null代替C
newService(A,B,null);
}

//新接口,暂时不能删掉老接口,需要做兼容。
void newService(A,B,C);

31 代码采取措施避免运行时错误(如数组边界溢出,被零除等)

日常开发中,我们需要采取措施规避数组边界溢出,被零整除,空指针等运行时错误。

类似代码比较常见:

1
ini复制代码String name = list.get(1).getName(); //list可能越界,因为不一定有2个元素哈

所以,应该采取措施,预防一下数组边界溢出,正例:

1
2
3
scss复制代码if(CollectionsUtil.isNotEmpty(list)&& list.size()>1){
String name = list.get(1).getName();
}

32. 注意 ArrayList.toArray() 强转的坑

1
2
3
4
5
6
7
typescript复制代码public class ArrayListTest {
public static void main(String[] args) {
List<String> list = new ArrayList<String>(1);
list.add("公众号:捡田螺的小男孩");
String[] array21 = (String[])list.toArray();//类型转换异常
}
}

因为返回的是Object类型,Object类型数组强转String数组,会发生ClassCastException。解决方案是,使用toArray()重载方法toArray(T[] a)

1
arduino复制代码String[] array1 = list.toArray(new String[0]);//可以正常运行

33. 尽量不在循环里远程调用、或者数据库操作,优先考虑批量进行。

程操作或者数据库操作都是比较耗网络、IO资源的,所以尽量不在循环里远程调用、不在循环里操作数据库,能批量一次性查回来尽量不要循环多次去查。(但是呢,也不要一次性查太多数据哈,要分批500一次酱紫)

正例:

1
scss复制代码remoteBatchQuery(param);

反例:

1
2
3
scss复制代码for(int i=0;i<n;i++){
remoteSingleQuery(param)
}

34. 写完代码,脑洞一下多线程执行会怎样,注意并发一致性问题

我们经常见的一些业务场景,就是先查下有没有记录,再进行对应的操作(比如修改)。但是呢,(查询+修改)合在一起不是原子操作哦,脑洞下多线程,就会发现有问题了,

反例:

1
2
3
4
5
6
javascript复制代码if(isAvailable(ticketId){  //非原子操作	
1、给现金增加操作
2、deleteTicketById(ticketId)
}else{
return "没有可用现金券";
}

为了更容易理解它,看这个流程图吧:

image.png

  • 1.线程A加现金
  • 2.线程B加现金
  • 3.线程A删除票标志
  • 4.线程B删除票标志

显然这样存在并发问题,正例应该利用数据库删除操作的原子性,如下:

1
2
3
4
5
kotlin复制代码if(deleteAvailableTicketById(ticketId) == 1){	//原子操作
1、给现金增加操作
}else{
return “没有可用现金券”
}

35 多线程异步优先考虑恰当的线程池,而不是new thread,同时考虑线程池是否隔离

为什么优先使用线程池?使用线程池有这几点好处呀

  • 它帮我们管理线程,避免增加创建线程和销毁线程的资源损耗。
  • 提高响应速度。
  • 重复利用。

同时呢,尽量不要所有业务都共用一个线程池,需要考虑线程池隔离。就是不同的关键业务,分配不同的线程池,然后线程池参数也要考虑恰当哈。之前写过几篇线程池的,觉得还不错,有兴趣的朋友可以看一下哈

36. 优化程序结构,尽量减少方法的重复调用

反例:

1
2
3
4
5
arduino复制代码    public static void listDetail(List<UserInfo> userInfoList) {
for (int i = 0; i < userInfoList.size(); i++) {
//重复调用userList.size()方法了
}
}

正例:

1
2
3
4
5
6
arduino复制代码 public static void listDetail(List<UserInfo> userInfoList) {
int length = userInfoList.size();
for (int i = 0; i < length; i++) {
//减少调用userList.size()方法,只在length变量调了一次。
}
}

37,直接大文件或者一次性从数据库读取太多数据到内存,可能导致OOM问题

如果一次性把大文件或者数据库太多数据达到内存,是会导致OOM的。所以,为什么查询DB数据库,一般都建议分批。

读取文件的话,一般文件不会太大,才使用Files.readAllLines()。为什么呢?因为它是直接把文件都读到内存的,预估下不会OOM才使用这个吧,可以看下它的源码:

1
2
3
4
5
6
7
8
9
10
11
12
ini复制代码public static List<String> readAllLines(Path path, Charset cs) throws IOException {
try (BufferedReader reader = newBufferedReader(path, cs)) {
List<String> result = new ArrayList<>();
for (;;) {
String line = reader.readLine();
if (line == null)
break;
result.add(line);
}
return result;
}
}

如果是太大的文件,可以使用Files.line()按需读取,当时读取文件这些,一般是使用完需要关闭资源流的哈。

38. 调用第三方接口,需要考虑异常处理,安全性,超时重试这几个点。

日常开发中,经常需要调用第三方服务,或者分布式远程服务的的话,需要考虑:

  • 异常处理(比如,你调别人的接口,如果异常了,怎么处理,是重试还是当做失败)
  • 超时(没法预估对方接口一般多久返回,一般设置个超时断开时间,以保护你的接口)
  • 重试次数(你的接口调失败,需不需要重试,需要站在业务上角度思考这个问题)

简单一个例子,你一个http请求调别人的服务,需要考虑设置connect-time,和retry次数。

39 不要使用循环拷贝集合,尽量使用JDK提供的方法拷贝集合

  • JDK提供原生API方法,可以直接指定集合的容量,避免多次扩容损耗性能。
  • 这些方法的底层调用System.arraycopy方法实现,进行数据的批量拷贝效率更高。

反例:

1
2
3
4
5
6
7
8
9
10
11
scss复制代码public List<UserInfo> copyMergeList(List<UserInfo> user1List, List<UserInfo> user2List) {
List<UserInfo> userList = new ArrayList<>(user1List.size() + user2List.size());
for (UserInfo user : user1List) {
userList.add(user);
}
for (UserInfo user : user2List) {
userList.add(user);
}

return user1List;
}

正例:

1
2
3
4
5
6
swift复制代码public List<UserInfo> copyMergeList(List<UserInfo> user1List, List<UserInfo> user2List) {
List<UserInfo> userList = new ArrayList<>(user1List.size() + user2List.size());
userList.addAll(user1List);
userList.addAll(user2List);
return user1List;
}

40. 对于复杂的代码逻辑,添加清楚的注释

写代码的时候,是没有必要写太多的注释的,好的方法变量命名就是最好的注释。但是,如果是业务逻辑很复杂的代码,真的非常有必要写清楚注释。清楚的注释,更有利于后面的维护。

41. 多线程情况下,考虑线性安全问题

在高并发情况下,HashMap可能会出现死循环。因为它是非线性安全的,可以考虑使用ConcurrentHashMap。 所以这个也尽量养成习惯,不要上来反手就是一个new HashMap();

  • Hashmap、Arraylist、LinkedList、TreeMap等都是线性不安全的;
  • Vector、Hashtable、ConcurrentHashMap等都是线性安全的

42. 使用spring事务功能时,注意这几个事务未生效的坑

日常业务开发中,我们经常跟事务打交道,事务失效主要有以下几个场景:

  • 底层数据库引擎不支持事务
  • 在非public修饰的方法使用
  • rollbackFor属性设置错误
  • 本类方法直接调用
  • 异常被try…catch吃了,导致事务失效。

反例:

1
2
3
4
5
6
7
8
9
10
11
12
typescript复制代码public class TransactionTest{
public void A(){
//插入一条数据
//调用方法B (本地的类调用,事务失效了)
B();
}

@Transactional
public void B(){
//插入数据
}
}

注解的事务方法给本类方法直接调用,事务失效

43. 使用Executors声明线程池,newFixedThreadPool的OOM问题

1
2
3
4
5
6
7
8
9
10
ini复制代码 ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < Integer.MAX_VALUE; i++) {
executor.execute(() -> {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
//do nothing
}
});
}

IDE指定JVM参数:-Xmx8m -Xms8m :

image.png

运行结果:

image.png

我们看下源码,其实newFixedThreadPool使用的是无界队列!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
csharp复制代码public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
...
/**
* Creates a {@code LinkedBlockingQueue} with a capacity of
* {@link Integer#MAX_VALUE}.
*/
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
...
}

newFixedThreadPool线程池的核心线程数是固定的,它使用了近乎于无界的LinkedBlockingQueue阻塞队列。当核心线程用完后,任务会入队到阻塞队列,如果任务执行的时间比较长,没有释放,会导致越来越多的任务堆积到阻塞队列,最后导致机器的内存使用不停的飙升,造成JVM OOM。

44. catch住异常后,尽量不要使用e.printStackTrace(),而是使用log打印。

反例:

1
2
3
4
5
php复制代码try{
// do what you want
}catch(Exception e){
e.printStackTrace();
}

正例:

1
2
3
4
5
php复制代码try{
// do what you want
}catch(Exception e){
log.info("你的程序有异常啦",e);
}

45. 接口需要考虑幂等性

接口是需要考虑幂等性的,尤其抢红包、转账这些重要接口。最直观的业务场景,就是用户连着点两次,你的接口有没有hold住。

一般幂等技术方案有这几种:

  • 查询操作
  • 唯一索引
  • token机制,防止重复提交
  • 数据库的delete/update操作
  • 乐观锁
  • 悲观锁
  • Redis、zookeeper 分布式锁(以前抢红包需求,用了Redis分布式锁)
  • 状态机幂等

46. 对于行数比较多的函数,建议划分小函数,增强可读性。

反例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
csharp复制代码public class Test {
private String name;
private Vector<Order> orders = new Vector<Order>();

public void printOwing() {
//print banner
System.out.println("****************");
System.out.println("*****customer Owes *****");
System.out.println("****************");

//calculate totalAmount
Enumeration env = orders.elements();
double totalAmount = 0.0;
while (env.hasMoreElements()) {
Order order = (Order) env.nextElement();
totalAmount += order.getAmout();
}

//print details
System.out.println("name:" + name);
System.out.println("amount:" + totalAmount);
}
}

正例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
scss复制代码public class Test {
private String name;
private Vector<Order> orders = new Vector<Order>();

public void printOwing() {

//print banner
printBanner();
//calculate totalAmount
double totalAmount = getTotalAmount();
//print details
printDetail(totalAmount);
}

void printBanner(){
System.out.println("****************");
System.out.println("*****customer Owes *****");
System.out.println("****************");
}

double getTotalAmount(){
Enumeration env = orders.elements();
double totalAmount = 0.0;
while (env.hasMoreElements()) {
Order order = (Order) env.nextElement();
totalAmount += order.getAmout();
}
return totalAmount;
}

void printDetail(double totalAmount){
System.out.println("name:" + name);
System.out.println("amount:" + totalAmount);
}

}

一个过于冗长的函数或者一段需要注释才能让人理解用途的代码,可以考虑把它切分成一个功能明确的函数单元,并定义清晰简短的函数名,这样会让代码变得更加优雅。

47. 你的关键业务代码,一般建议搞点日志保驾护航。

关键业务代码无论身处何地,都应该有足够的日志保驾护航。

比如:你实现转账业务,转个几百万,然后转失败了,接着客户投诉,然后你还没有打印到日志,想想那种水深火热的困境下,你却毫无办法。。。

那么,你的转账业务都需要那些日志信息呢?至少,方法调用前,入参需要打印需要吧,接口调用后,需要捕获一下异常吧,同时打印异常相关日志吧,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
c复制代码public void transfer(TransferDTO transferDTO){
log.info("invoke tranfer begin");
//打印入参
log.info("invoke tranfer,paramters:{}",transferDTO);
try {
res= transferService.transfer(transferDTO);
}catch(Exception e){
log.error("transfer fail,cifno:{},account:{}",transferDTO.getCifno(),
transferDTO.getaccount())
log.error("transfer fail,exception:{}",e);
}
log.info("invoke tranfer end");
}

除了打印足够的日志,我们还需要注意一点是,日志级别别混淆使用,别本该打印info的日志,你却打印成error级别,告警半夜三更催你起来排查问题就不好了。

48. 某些可变因素,如红包皮肤等等,做成配置化是否会更好呢。

假如产品提了个红包需求,圣诞节的时候,红包皮肤为圣诞节相关的,春节的时候,红包皮肤等。

反例:

1
2
3
4
5
ini复制代码if(duringChristmas){
img = redPacketChristmasSkin;
}else if(duringSpringFestival){
img = redSpringFestivalSkin;
}

如果到了元宵节的时候,运营小姐姐突然又有想法,红包皮肤换成灯笼相关的,这时候,是不是要去修改代码了,重新发布了?从一开始,实现一张红包皮肤的配置表,将红包皮肤做成配置化呢?更换红包皮肤,只需修改一下表数据就好了。

49,.直接迭代需要使用的集合,无须在额外操作

直接迭代需要使用的集合,无需通过其它操作获取数据,比较典型就是Map的迭代遍历:

反例:

1
2
3
4
5
ini复制代码Map<Long, UserDO> userMap = ...;
for (Long userId : userMap.keySet()) {
UserDO user = userMap.get(userId);
...
}

正例:

1
2
3
4
5
6
ini复制代码Map<Long, UserDO> userMap = ...;
for (Map.Entry<Long, UserDO> userEntry : userMap.entrySet()) {
Long userId = userEntry.getKey();
UserDO user = userEntry.getValue();
...
}

50. 策略模式+工厂方法优化冗余的if else

反例:

1
2
3
4
5
6
7
8
9
csharp复制代码    String medalType = "guest";
if ("guest".equals(medalType)) {
System.out.println("嘉宾勋章");
} else if ("vip".equals(medalType)) {
System.out.println("会员勋章");
} else if ("guard".equals(medalType)) {
System.out.println("展示守护勋章");
}
...

首先,我们把每个条件逻辑代码块,抽象成一个公共的接口,我们根据每个逻辑条件,定义相对应的策略实现类,可得以下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
typescript复制代码//勋章接口
public interface IMedalService {
void showMedal();
}

//守护勋章策略实现类
public class GuardMedalServiceImpl implements IMedalService {
@Override
public void showMedal() {
System.out.println("展示守护勋章");
}
}
//嘉宾勋章策略实现类
public class GuestMedalServiceImpl implements IMedalService {
@Override
public void showMedal() {
System.out.println("嘉宾勋章");
}
}
//VIP勋章策略实现类
public class VipMedalServiceImpl implements IMedalService {
@Override
public void showMedal() {
System.out.println("会员勋章");
}
}

接下来,我们再定义策略工厂类,用来管理这些勋章实现策略类,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
arduino复制代码//勋章服务工产类
public class MedalServicesFactory {

private static final Map<String, IMedalService> map = new HashMap<>();
static {
map.put("guard", new GuardMedalServiceImpl());
map.put("vip", new VipMedalServiceImpl());
map.put("guest", new GuestMedalServiceImpl());
}
public static IMedalService getMedalService(String medalType) {
return map.get(medalType);
}
}

优化后,正例如下:

1
2
3
4
5
6
7
ini复制代码ublic class Test {
public static void main(String[] args) {
String medalType = "guest";
IMedalService medalService = MedalServicesFactory.getMedalService(medalType);
medalService.showMedal();
}
}

参考与感谢

  • Java编码技巧之高效代码50例
  • 写代码有这些想法,同事才不会认为你是复制粘贴程序员
  • 写代码有这16个好习惯,可以减少80%非业务的bug
  • Java日常开发的21个坑,你踩过几个?
  • if-else代码优化的八种方案

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

群消息已读回执(这个屌),究竟是推还是拉?

发表于 2021-03-28

每当发出一条微信消息,都希望对方尽快看到,并尽快回复,但始终不知道对方是否阅读。

每当收到一条不能立马回复的微信消息,都默默返回,假装没看见。

画外音:不想回复的人,唉,你只是个好人。

微信用于个人社交,产品设计上,在线状态,强制已读回执都有可能暴露个人隐私,故微信并无相关功能。

钉钉用于商务交流,其“强制已读回执”功能,让职场人无法再“假装不在线”,“假装没收到”。

有甚者,钉钉的群有“强制已读回执”功能,你在群里发出的消息,能够知道谁读了消息,谁没有读消息。

群消息的流程如何,接收方如何确保收到群消息,发送方如何收已读回执,究竟是拉取,还是推送,是今天要聊的话题。

一、群消息投递流程,以及可达性保证

大家一起跟着楼主的节奏,一步一步来看群消息怎么设计。

核心问题1:群消息,只存一份?还是,每个成员存一份?

答:存一份,为每个成员设置一个群消息队列,会有大量数据冗余,并不合适。

核心问题2:如果群消息只存一份,怎么知道每个成员读了哪些消息?

答:可以利用群消息的偏序关系,记录每个成员的last_ack_msgid(last_ack_time),这条消息之前的消息已读,这条消息之后的消息未读。该方案意味着,对于群内的每一个用户,只需要记录一个值即可。

解答上述两个核心问题后,很容易得到群消息的核心数据结构。

群消息表:记录群消息。

group_msgs(msgid, gid, sender_uid, time, content);

各字段的含义为:消息ID,群ID,发送方UID,发送时间,发送内容。

群成员表:记录群里的成员,以及每个成员收到的最后一条群消息。

group_users(gid, uid, last_ack_msgid);

各字段的含义为:群ID,群成员UID,群成员最后收到的一条群消息ID。

在核心数据结构设计完之后,一起来看看群消息发送的流程。

业务场景:

(1)一个群中有A, uid1, uid2, uid3四名成员;

(2)A, uid1, uid2在线,期望实时收到在线消息;

(3)uid3离线,期望未来拉取到离线消息;

其整个消息发送的流程1-4如上图:\

(1)A发出群消息;

(2)server收到消息后,一来要将群消息落地,二来要查询群里有哪些群成员,以便实施推送;

(3)对于群成员,查询在线状态;

(4)对于在线的群成员,实施推送;

这个流程里,只要第二步消息落地完成,就能保证群消息不会丢失。

核心问题3:如何保证接收方一定收到群消息?

答:各个收到消息后,要修改各群成员的last_ack_msgid,以告诉系统,这一条消息确认收到了。

在线消息,离线消息的last_ack_msgid的修改,又各有不同。

对于在线的群友,收到群消息后,第一时间会ack,修改last_ack_msgid。

对于离线的群友,会在下一次登录时,拉取未读的所有群离线消息,并将last_ack_msgid修改为最新的一条消息。

核心问题4:如果ack丢失,群友会不会拉取重复的群消息?

答:会,可以根据msgid在客户端本地做去重,即使系统层面收到了重复的消息,仍然可以保证良好的用户体验。

上述流程,只能确保接收方收到消息,发送方仍然不知道哪些人在线阅读了消息,哪些人离线未阅读消息,并没有实现已读回执,那已读回执会对系统设计产生什么样的影响呢?

二、已读回执流程

对于发送方发送的任何一条群消息,都需要知道,这条消息有多少人已读多少人未读,就需要一个基础表来记录这个关系。

消息回执表:用来记录消息的已读回执。

msg_acks(sender_uid, msgid, recv_uid, gid,if_ack);

各字段的含义为:发送方UID,消息ID,回执方UID,群ID,回执标记。

增加了已读回执逻辑后,群消息的流程会有细微的改变。

步骤二,server收到消息后,除了要:\

(1)将群消息落地;

(2)查询群里有哪些群成员,以便实施推送;

之外,还需要:

(3)插入每条消息的初始回执状态;

\

接收方修改last_ack_msgid的流程,会变为:\

(1)发送ack请求;

(2)修改last_ack_msgid,并且,修改已读回执if_ack状态;

(3)查询发送方在线状态;

(4)向发送方实时推送已读回执(如果发送方在线);

如果发送方不在线,ta会在下次登录的时候:

(5)从关联表里拉取每条消息的已读回执;

这里的初步结论是:

(1)如果发送方在线,会实时被推送已读回执;

(2)如果发送方不在线,会在下次在线时拉取已读回执;

三、流程优化方案

再次详细的分析下,群消息已读回执的“消息风暴扩散系数”,假设每个群有200个用户,其中20%的用户在线,即40各用户在线。群用户每发送一条群消息,会有:

(1)40个消息,通知给群友;

(2)40个ack修改last_ack_msgid,发给服务端;

(3)40个已读回执,通知给发送方;

可见,其消息风暴扩散系数非常之大。

同时:

(1)需要存储40条ack记录;

群数量,群友数量,群消息数量越来越多之后,存储也会成为问题。

是否有优化方案呢?

群消息的推送,能否改为接收方轮询拉取?

答:不能,消息接收,实时性是核心指标。

对于 last_ack_msgid 的修改,真的需要每个群消息都进行ack么?

答:其实不需要,可以批量ack,累计收到N条群消息(例如10条),再向服务器发送一次last_ack_msgid的修改请求,同时修改这个请求之前所有请求的已读回执,这样就能将40个发送给服务端的ack请求量,降为原来的1/10。

会带来什么副作用?

答:last_ack_msgid的作用是,记录接收方最近新取的一条群消息,如果不实时更新,可能导致,异常退出时,有一些群消息没来得及更新last_ack_msgid,使得下次登陆时,拉取到重复的群消息。但这不是问题,客户端可以根据msgid去重,用户体验不会受影响。

发送方在线时,对于已读回执的发送,真的需要实时推送么?

答:其实不需要,发送方每发一条消息,会收到40个已读回执,采用轮询拉取(例如1分钟一次,一个小时也就60个请求),可以大大降低请求量。

画外音:或者直接放到应用层keepalive请求里,做到0额外请求增加。

会带来什么副作用?

答:已读回执更新不实时,最坏的情况下,1分钟才更新回执。当然,可以根据性能与产品体验来折衷配置这个轮询时间。

如何降低数据量?

答:回执数据不是核心数据

(1)已读的消息,可以进行物理删除,而不是标记删除;

(2)超过N长时间的回执,归档或者删除掉;

四、总结

对于群消息已读回执,一般来说:

(1)如果发送方在线,会实时被推送已读回执;

(2)如果发送方不在线,会在下次在线时拉取已读回执;

如果要对进行优化,可以:

(1)接收方累计收到N条群消息再批量ack;

(2)发送方轮询拉取已读回执;

(3)物理删除已读回执数据,定时删除或归档非核心历史数据;

推还是拉?任何脱离业务的架构设计都是耍流氓。

**架构师之路-分享技术思路**

若有收获,随手转发。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

高并发下如何保证接口的幂等性?

发表于 2021-03-28

前言

接口幂等性问题,对于开发人员来说,是一个跟语言无关的公共问题。本文分享了一些解决这类问题非常实用的办法,绝大部分内容我在项目中实践过的,给有需要的小伙伴一个参考。

不知道你有没有遇到过这些场景:

  1. 有时我们在填写某些form表单时,保存按钮不小心快速点了两次,表中竟然产生了两条重复的数据,只是id不一样。
  2. 我们在项目中为了解决接口超时问题,通常会引入了重试机制。第一次请求接口超时了,请求方没能及时获取返回结果(此时有可能已经成功了),为了避免返回错误的结果(这种情况不可能直接返回失败吧?),于是会对该请求重试几次,这样也会产生重复的数据。
  3. mq消费者在读取消息时,有时候会读取到重复消息(至于什么原因这里先不说,有兴趣的小伙伴,可以找我私聊),如果处理不好,也会产生重复的数据。

最近无意间获得一份BAT大厂大佬写的刷题笔记,一下子打通了我的任督二脉,越来越觉得算法没有想象中那么难了。
BAT大佬写的刷题笔记,让我offer拿到手软

没错,这些都是幂等性问题。

接口幂等性是指用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。

这类问题多发于接口的:

  • insert操作,这种情况下多次请求,可能会产生重复数据。
  • update操作,如果只是单纯的更新数据,比如:update user set status=1 where id=1,是没有问题的。如果还有计算,比如:update user set status=status+1 where id=1,这种情况下多次请求,可能会导致数据错误。

那么我们要如何保证接口幂等性?本文将会告诉你答案。

  1. insert前先select

通常情况下,在保存数据的接口中,我们为了防止产生重复数据,一般会在insert前,先根据name或code字段select一下数据。如果该数据已存在,则执行update操作,如果不存在,才执行 insert操作。

图片

该方案可能是我们平时在防止产生重复数据时,使用最多的方案。但是该方案不适用于并发场景,在并发场景中,要配合其他方案一起使用,否则同样会产生重复数据。我在这里提一下,是为了避免大家踩坑。

  1. 加悲观锁

在支付场景中,用户A的账号余额有150元,想转出100元,正常情况下用户A的余额只剩50元。一般情况下,sql是这样的:

1
ini复制代码update user amount = amount-100 where id=123;

如果出现多次相同的请求,可能会导致用户A的余额变成负数。这种情况,用户A来可能要哭了。于此同时,系统开发人员可能也要哭了,因为这是很严重的系统bug。

为了解决这个问题,可以加悲观锁,将用户A的那行数据锁住,在同一时刻只允许一个请求获得锁,更新数据,其他的请求则等待。

通常情况下通过如下sql锁住单行数据:

1
sql复制代码select * from user id=123 for update;

具体流程如下:

图片

具体步骤:

  1. 多个请求同时根据id查询用户信息。
  2. 判断余额是否不足100,如果余额不足,则直接返回余额不足。
  3. 如果余额充足,则通过for update再次查询用户信息,并且尝试获取锁。
  4. 只有第一个请求能获取到行锁,其余没有获取锁的请求,则等待下一次获取锁的机会。
  5. 第一个请求获取到锁之后,判断余额是否不足100,如果余额足够,则进行update操作。
  6. 如果余额不足,说明是重复请求,则直接返回成功。

需要特别注意的是:如果使用的是mysql数据库,存储引擎必须用innodb,因为它才支持事务。此外,这里id字段一定要是主键或者唯一索引,不然会锁住整张表。

悲观锁需要在同一个事务操作过程中锁住一行数据,如果事务耗时比较长,会造成大量的请求等待,影响接口性能。

此外,每次请求接口很难保证都有相同的返回值,所以不适合幂等性设计场景,但是在防重场景中是可以的使用的。

在这里顺便说一下,防重设计 和 幂等设计,其实是有区别的。防重设计主要为了避免产生重复数据,对接口返回没有太多要求。而幂等设计除了避免产生重复数据之外,还要求每次请求都返回一样的结果。

  1. 加乐观锁

既然悲观锁有性能问题,为了提升接口性能,我们可以使用乐观锁。需要在表中增加一个timestamp或者version字段,这里以version字段为例。

在更新数据之前先查询一下数据:

1
ini复制代码select id,amount,version from user id=123;

如果数据存在,假设查到的version等于1,再使用id和version字段作为查询条件更新数据:

1
2
ini复制代码update user set amount=amount+100,version=version+1
where id=123 and version=1;

更新数据的同时version+1,然后判断本次update操作的影响行数,如果大于0,则说明本次更新成功,如果等于0,则说明本次更新没有让数据变更。

由于第一次请求version等于1是可以成功的,操作成功后version变成2了。这时如果并发的请求过来,再执行相同的sql:

1
2
ini复制代码 update user set amount=amount+100,version=version+1
where id=123 and version=1;

该update操作不会真正更新数据,最终sql的执行结果影响行数是0,因为version已经变成2了,where中的version=1肯定无法满足条件。但为了保证接口幂等性,接口可以直接返回成功,因为version值已经修改了,那么前面必定已经成功过一次,后面都是重复的请求。

具体流程如下:图片

具体步骤:

  1. 先根据id查询用户信息,包含version字段
  2. 根据id和version字段值作为where条件的参数,更新用户信息,同时version+1
  3. 判断操作影响行数,如果影响1行,则说明是一次请求,可以做其他数据操作。
  4. 如果影响0行,说明是重复请求,则直接返回成功。
  1. 加唯一索引

绝大数情况下,为了防止重复数据的产生,我们都会在表中加唯一索引,这是一个非常简单,并且有效的方案。

1
sql复制代码alter table `order` add UNIQUE KEY `un_code` (`code`);

加了唯一索引之后,第一次请求数据可以插入成功。但后面的相同请求,插入数据时会报Duplicate entry '002' for key 'order.un_code异常,表示唯一索引有冲突。

虽说抛异常对数据来说没有影响,不会造成错误数据。但是为了保证接口幂等性,我们需要对该异常进行捕获,然后返回成功。

如果是java程序需要捕获:DuplicateKeyException异常,如果使用了spring框架还需要捕获:MySQLIntegrityConstraintViolationException异常。

具体流程图如下:

图片

具体步骤:

  1. 用户通过浏览器发起请求,服务端收集数据。
  2. 将该数据插入mysql
  3. 判断是否执行成功,如果成功,则操作其他数据(可能还有其他的业务逻辑)。
  4. 如果执行失败,捕获唯一索引冲突异常,直接返回成功。
    最近我建了新的技术交流群,打算将它打造成高质量的活跃群,欢迎小伙伴们加入。

我以往的技术群里技术氛围非常不错,大佬很多。

image.png

加微信:su_san_java,备注:加群,即可加入该群。

  1. 建防重表

有时候表中并非所有的场景都不允许产生重复的数据,只有某些特定场景才不允许。这时候,直接在表中加唯一索引,显然是不太合适的。

针对这种情况,我们可以通过建防重表来解决问题。

该表可以只包含两个字段:id 和 唯一索引,唯一索引可以是多个字段比如:name、code等组合起来的唯一标识,例如:susan_0001。

具体流程图如下:

图片

具体步骤:

  1. 用户通过浏览器发起请求,服务端收集数据。
  2. 将该数据插入mysql防重表
  3. 判断是否执行成功,如果成功,则做mysql其他的数据操作(可能还有其他的业务逻辑)。
  4. 如果执行失败,捕获唯一索引冲突异常,直接返回成功。

需要特别注意的是:防重表和业务表必须在同一个数据库中,并且操作要在同一个事务中。

  1. 根据状态机

很多时候业务表是有状态的,比如订单表中有:1-下单、2-已支付、3-完成、4-撤销等状态。如果这些状态的值是有规律的,按照业务节点正好是从小到大,我们就能通过它来保证接口的幂等性。

假如id=123的订单状态是已支付,现在要变成完成状态。

1
sql复制代码update `order` set status=3 where id=123 and status=2;

第一次请求时,该订单的状态是已支付,值是2,所以该update语句可以正常更新数据,sql执行结果的影响行数是1,订单状态变成了3。

后面有相同的请求过来,再执行相同的sql时,由于订单状态变成了3,再用status=2作为条件,无法查询出需要更新的数据,所以最终sql执行结果的影响行数是0,即不会真正的更新数据。但为了保证接口幂等性,影响行数是0时,接口也可以直接返回成功。

具体流程图如下:

图片

具体步骤:

  1. 用户通过浏览器发起请求,服务端收集数据。
  2. 根据id和当前状态作为条件,更新成下一个状态
  3. 判断操作影响行数,如果影响了1行,说明当前操作成功,可以进行其他数据操作。
  4. 如果影响了0行,说明是重复请求,直接返回成功。

主要特别注意的是,该方案仅限于要更新的表有状态字段,并且刚好要更新状态字段的这种特殊情况,并非所有场景都适用。

  1. 加分布式锁

其实前面介绍过的加唯一索引或者加防重表,本质是使用了数据库的分布式锁,也属于分布式锁的一种。但由于数据库分布式锁的性能不太好,我们可以改用:redis或zookeeper。

鉴于现在很多公司分布式配置中心改用apollo或nacos,已经很少用zookeeper了,我们以redis为例介绍分布式锁。

目前主要有三种方式实现redis的分布式锁:

  1. setNx命令
  2. set命令
  3. Redission框架

每种方案各有利弊,具体实现细节我就不说了,有兴趣的朋友可以加我微信找我私聊。

具体流程图如下:

图片

具体步骤:

  1. 用户通过浏览器发起请求,服务端会收集数据,并且生成订单号code作为唯一业务字段。
  2. 使用redis的set命令,将该订单code设置到redis中,同时设置超时时间。
  3. 判断是否设置成功,如果设置成功,说明是第一次请求,则进行数据操作。
  4. 如果设置失败,说明是重复请求,则直接返回成功。

需要特别注意的是:分布式锁一定要设置一个合理的过期时间,如果设置过短,无法有效的防止重复请求。如果设置过长,可能会浪费redis的存储空间,需要根据实际业务情况而定。

  1. 获取token

除了上述方案之外,还有最后一种使用token的方案。该方案跟之前的所有方案都有点不一样,需要两次请求才能完成一次业务操作。

  1. 第一次请求获取token
  2. 第二次请求带着这个token,完成业务操作。

具体流程图如下:

第一步,先获取token。

图片

第二步,做具体业务操作。

图片

最近我建了新的技术交流群,打算将它打造成高质量的活跃群,欢迎小伙伴们加入。

我以往的技术群里技术氛围非常不错,大佬很多。

image.png

加微信:su_san_java,备注:加群,即可加入该群。

具体步骤:

  1. 用户访问页面时,浏览器自动发起获取token请求。
  2. 服务端生成token,保存到redis中,然后返回给浏览器。
  3. 用户通过浏览器发起请求时,携带该token。
  4. 在redis中查询该token是否存在,如果不存在,说明是第一次请求,做则后续的数据操作。
  5. 如果存在,说明是重复请求,则直接返回成功。
  6. 在redis中token会在过期时间之后,被自动删除。

以上方案是针对幂等设计的。

如果是防重设计,流程图要改改:

图片

需要特别注意的是:token必须是全局唯一的。

最近无意间获得一份BAT大厂大佬写的刷题笔记,一下子打通了我的任督二脉,越来越觉得算法没有想象中那么难了。
BAT大佬写的刷题笔记,让我offer拿到手软

最后说一句(求关注,别白嫖我)

如果这篇文章对您有所帮助,或者有所启发的话,帮忙扫描下发二维码关注一下,您的支持是我坚持写作最大的动力。

求一键三连:点赞、转发、在看。

关注公众号:【苏三说技术】,在公众号中回复:面试、代码神器、开发手册、时间管理有超赞的粉丝福利,另外回复:加群,可以跟很多BAT大厂的前辈交流和学习。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

2021了,还不会Kubernetes吗系列文章(一)——

发表于 2021-03-28

1.什么是 Kubernetes

  • Kubernetes 看作是用来是一个部署镜像的平台
  • 可以用来操作多台机器调度部署镜像
  • 在 Kubernetes 中,可以使用集群来组织服务器的。集群中会存在一个 Master 节点,该节点是 Kubernetes 集群的控制节点,负责调度集群中其他服务器的资源。其他节点被称为 Node
  1. 基础安装

  • Master & Node 节点都需要安装

2.1 安装些必备组件

  • vim 是 Linux 下的一个文件编辑器
  • wget 可以用作文件下载使用
  • ntpdate 则是可以用来同步时区
1
js复制代码yum install vim wget ntpdate -y

2.2 关闭防火墙

  • kubernetes 会创建防火墙规则,先关闭firewalld
1
js复制代码systemctl stop firewalld & systemctl disable firewalld

2.3 关闭 Swap 分区

  • Swap 是 Linux 的交换分区,在系统资源不足时,Swap 分区会启用,这个我们不需要
  • 应该让新创建的服务自动调度到集群的其他 Node 节点中去,而不是使用 Swap 分区
1
2
js复制代码#临时关闭
swapoff -a

2.4 关闭 Selinux

  • 关闭Selinux是为了支持容器可以访问宿主机文件系统
1
2
3
4
5
6
7
js复制代码# 暂时关闭 selinux
setenforce 0

# 永久关闭
vi /etc/sysconfig/selinux
# 修改以下参数,设置为disable
SELINUX=disabled

2.5 统一我们的系统时间和时区

  • 使用 ntpdate 来统一我们的系统时间和时区,服务器时间与阿里云服务器对齐
1
2
3
4
5
6
js复制代码# 统一时区,为上海时区
ln -snf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime
bash -c "echo 'Asia/Shanghai' > /etc/timezone"

# 统一使用阿里服务器进行时间更新
ntpdate ntp1.aliyun.com

2.6 安装 Docker

  • 在 kubernetes 中的组件,服务都可以 Docker 镜像方式部署的,所以需要安装Docker
  • device-mapper-persistent-data: 存储驱动,Linux上的许多高级卷管理技术
  • lvm: 逻辑卷管理器,用于创建逻辑磁盘分区使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
js复制代码yum install -y yum-utils device-mapper-persistent-data lvm2


sudo yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
yum install docker-ce -y
systemctl start docker
systemctl enable docker

sudo mkdir -p /etc/docker
sudo tee /etc/docker/daemon.json <<-'EOF'
{
"registry-mirrors": ["https://fwvjnv59.mirror.aliyuncs.com"]
}
EOF

sudo systemctl daemon-reload
sudo systemctl restart docker.service

2.7 安装 Kubernetes 组件

2.7.1 切换阿里云源

1
2
3
4
5
6
7
8
9
10
js复制代码cat <<EOF > /etc/yum.repos.d/kubernetes.repo
[kubernetes]
name=Kubernetes
baseurl=http://mirrors.aliyun.com/kubernetes/yum/repos/kubernetes-el7-x86_64
enabled=1
gpgcheck=0
repo_gpgcheck=0
gpgkey=http://mirrors.aliyun.com/kubernetes/yum/doc/yum-key.gpg
http://mirrors.aliyun.com/kubernetes/yum/doc/rpm-package-key.gpg
EOF

2.7.2 安装 Kubernetes 组件

  • kubelet 是 Kubernetes 中的核心组件。它会运行在集群的所有节点上,并负责创建启动服务容器
  • kubectl 则是Kubernetes的命令行工具。可以用来管理,删除,创建资源
  • kubeadm 则是用来初始化集群,子节点加入的工具
1
2
3
js复制代码yum install -y kubelet kubeadm kubectl
# 启动kubelet
systemctl enable kubelet && systemctl start kubelet

2.8 设置bridge-nf-call-iptables

  • 配置内核参数,将桥接的IPV4浏览传递到iptables链
  • 开启了bridge-nf-call-iptables
1
js复制代码echo 1 > /proc/sys/net/bridge/bridge-nf-call-iptables
  1. Master

  • Master 节点是集群内的调度和主要节点

3.1 修改主机名称为 master

1
js复制代码hostnamectl set-hostname  master

3.2 配置hosts

1
2
3
4
js复制代码ip addr
vim /etc/hosts

172.31.178.169 master master

3.3 配置 Kubernetes 初始化文件

  • init-defaults 输出一份默认初始化配置文件
1
2
js复制代码kubeadm config print init-defaults > init-kubeadm.conf
vim init-kubeadm.conf
  • 更换 Kubernetes 镜像仓库为阿里云镜像仓库,加速组件拉取
  • 替换 ip 为自己主机 ip
  • 配置 pod 网络为 flannel 网段
  • 为了让集群之间可以互相通信,需要配置子网络,这些在后面的flannel网络中需要用到
    • 10.96.0.0/12 是Kubernetes内部的网络pods需要的网络
    • 10.244.0.0/16 是Kubernetes内部services需要的网络
1
2
3
4
5
6
7
8
9
10
11
diff复制代码- imageRepository: k8s.gcr.io 更换k8s镜像仓库
+ imageRepository: registry.cn-hangzhou.aliyuncs.com/google_containers
- localAPIEndpointc,advertiseAddress为master ip ,port默认不修改
localAPIEndpoint:
+ advertiseAddress: 172.31.178.169 # 此处为master的IP
bindPort: 6443
# 配置子网络
networking:
dnsDomain: cluster.local
serviceSubnet: 10.96.0.0/12
+ podSubnet: 10.244.0.0/16 # 添加这个

3.3 拉取其它组件

  • kubeadm 可以用来拉取我们的默认组件镜像
  • kube-apiserver 提供接口服务,可以让外网访问集群
  • kube-controller-manager 内部的控制指令工具
  • kube-scheduler 内部的任务调度器
  • kube-proxy 反向代理和负载均衡,流量转发
  • pause 进程管理工具
  • etcd 保持 集群内部的数据一致性
  • coredns 集群内网通信

k8s-1

1
2
3
4
js复制代码// 查看缺少的组件
kubeadm config images list --config init-kubeadm.conf
// 拉取缺少的组件
kubeadm config images pull --config init-kubeadm.conf

3.4 初始化 Kubernetes

1
js复制代码kubeadm init --config init-kubeadm.conf
  • kubeadm join 可以快速将 Node 节点加入到 Master 集群内
  • Master 节点需要执行的初始化命令
  • 将默认的 Kubernetes 认证文件拷贝进 .kube 文件夹内,才能默认使用该配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
js复制代码Your Kubernetes control-plane has initialized successfully!

To start using your cluster, you need to run the following as a regular user:

mkdir -p $HOME/.kube
sudo cp -i /etc/kubernetes/admin.conf $HOME/.kube/config
sudo chown $(id -u):$(id -g) $HOME/.kube/config

Alternatively, if you are the root user, you can run:

export KUBECONFIG=/etc/kubernetes/admin.conf

You should now deploy a pod network to the cluster.
Run "kubectl apply -f [podnetwork].yaml" with one of the options listed at:
https://kubernetes.io/docs/concepts/cluster-administration/addons/

Then you can join any number of worker nodes by running the following on each as root:

kubeadm join 172.31.178.169:6443 --token abcdef.0123456789abcdef \
--discovery-token-ca-cert-hash sha256:8aac19f4dbe68f1e15ba3d80e141acdc912e353f9757ad69187e8fb9780bc975

3.5 安装 Flannel

  • flannel 主要的作用是通过创建一个虚拟网络,让不同节点下的服务有着全局唯一的IP地址,且服务之前可以互相访问和连接。
  • 集群内网网络通信协议通信模式采用了Flannel协议
1
2
3
4
5
6
7
8
9
10
11
js复制代码#wget https://raw.githubusercontent.com/coreos/flannel/master/Documentation/kube-flannel.yml
wget http://img.golderbrother.cn/kube-flannel.yml
docker pull quay.io/coreos/flannel:v0.13.0-rc2
kubectl apply -f kube-flannel.yml
net-conf.json: |
{
"Network": "10.244.0.0/16",
"Backend": {
"Type": "vxlan"
}
}

3.6 查看启动情况

1
2
3
4
js复制代码kubectl get nodes

NAME STATUS ROLES AGE VERSION
master Ready control-plane,master 9m34s v1.20.4

3.7 Node节点配置

  • Node 节点的地位则是负责运行服务容器,负责接收调度的。
  • 先执行基础安装
1
js复制代码hostnamectl set-hostname node1

3.8 拷贝 Master 节点配置文件

  • 将 master 节点的配置文件拷贝 k8s 到 node1 节点
1
js复制代码scp $HOME/.kube/config root@172.31.178.170:~/
  • 在node1节点归档配置文件
1
2
3
js复制代码mkdir -p $HOME/.kube
sudo mv $HOME/config $HOME/.kube/config
sudo chown $(id -u):$(id -g) $HOME/.kube/config

3.9 加入 Master 节点

  • 让 Node 节点加入到 master 集群内
1
2
js复制代码kubeadm join 172.16.81.164:6443 --token abcdef.0123456789abcdef \
--discovery-token-ca-cert-hash sha256:b4a059eeffa2e52f2eea7a5d592be10c994c7715c17bda57bbc3757d4f13903d
  • 如果刚才的命令丢了,可以在 master 机器上使用 kubeadm token create 重新生成一条命令
1
js复制代码kubeadm token create --print-join-command

3.10. 安装 Flannel

1
2
js复制代码scp ~/kube-flannel.yml root@172.31.178.170:~/
kubectl apply -f kube-flannel.yml

4. 查看状态

1
2
3
4
js复制代码kubectl get nodes
NAME STATUS ROLES AGE VERSION
master Ready control-plane,master 24m v1.20.4
node1 Ready <none> 101s v1.20.4

5.直接布署nginx

1
2
3
4
5
6
7
8
9
10
11
js复制代码kubectl create deployment nginx --image=nginx
[root@master ~]# kubectl expose deployment nginx --port=80 --type=NodePort
service/nginx exposed
kubectl get pod,svc
NAME READY STATUS RESTARTS AGE
pod/nginx-6799fc88d8-bt5n6 1/1 Running 0 5m32s

curl 127.0.0.1:32636
//快速扩容为3个副本
[root@master ~]# kubectl scale deployment nginx --replicas=3
deployment.apps/nginx scaled

6.通过yaml布署mysql

6.1 配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
yaml复制代码apiVersion: v1
kind: ReplicationController
metadata:
name: mysql
spec:
replicas: 1 #Pod副本的期待数量
selector:
app: mysql #符合目标的Pod拥有此标签
template: #根据此模板创建Pod的副本(实例)
metadata:
labels:
app: mysql #Pod副本拥有的标签,对应RC的Selector
spec:
containers: #Pod内容器的定义部分
- name: mysql #容器的名称
image: mysql #容器对应的Docker image
ports:
- containerPort: 3306 #容器应用监听的端口号
env: #注入容器内的环境变量
- name: MYSQL_ROOT_PASSWORD
value: "123456"

6.2 创建POD

1
2
3
4
5
6
js复制代码kubectl create -f mysql-rc.yaml
replicationcontroller/mysql created

kubectl get pods
NAME READY STATUS RESTARTS AGE
mysql 1/1 Running 0 5m56s

6.3 查看状态

1
js复制代码kubectl describe pod mysql

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

操作手册 AD 及 LDAP 操作

发表于 2021-03-28

总文档 :文章目录

Github : github.com/black-ant

操作手册系列主要记录平时积累的无体系的操作笔记 , 这一篇主要是 LDAP (AD) 的相关笔记 .

前言

LDAP 是 一个 轻量目录协议 ,全名 Lightweight Directory Access Protocol , 他用于发布目录信息到许多不同资源的协议,LDAP 类似于一个集中的地址本 ,类似于一个电话簿

而本篇会涉及到的是 OpenLDAP 和 Windows AD , 他们两者都是 LDAP 协议的实现类 .

一 . LDAP 基础

特点 :

  • LDAP 支持TCP / IP
  • LDAP 可以说一个特殊的数据库 , LDAP 实现了数据结构的存储
  • LDAP 采用树状结构
  • LDAP 对查询进行了优化 , 他的读性能更加优秀
  • LDAP是一种开放Internet标准,LDAP协议是跨平台的Interent协议
  • 通过推和拉来复制技术 ,允许使用ACI ( 访问权限的控制 )
  • LDAP 协议是开发的标准协议
  • LDAP支持强认证方式

LDAP 服务器

  • LDAP 采用 Client/server模型
  • LDAP 服务是 由 目录数据库 和 一套访问协议组成的系统
  • LDAP 服务器用来处理查询和更新LDAP 目录

二 . LDAP 属性

关键组成

  • dc : 一条记录所属区域 ( 哪一棵树 ) ,域名组件 ,在最顶层
  • dc -> Domain Component
  • dn (distinguished Name 唯一标识名): 一条记录的详细位置
  • rdn : 类似于相对路劲 : CN=张三
  • rdn -> Relative Distinguished Name
  • Base DN : LDAP目录树的最顶部,即根
  • GUID : 全局唯一标识 ,GUID 是一个 128位 数值
  • UPN : 用户主体名称 ,比DN 更加短的标识路径
  • UPN : zhangsan@moonxy.com

Attribute 默认属性:

  • cn common Name: 姓名
  • sn sur name: 姓
  • ou organizational Unit Name: 单位(部门)名称 ,所属组织
  • o organization: 组织-公司
  • c countryName: 国家
  • dc domainComponent : 域名
  • telephoneNumber : 电话号码
  • objectClass : 内置属性

结构

  • dc -> ou -> cn
  • dc是一个域名 ,一颗数 ,树下有很多 ou 组织 ,组织下拥有 cn
  • n 是路劲 ,例如 :CN=张三,OU=Web前端组,OU=软件开发部,DC=moonxy,DC=com
  • n 相对路劲 : CN=张三 、OU=Web前端组

基本概念

  • Entry : 条目 ,记录项 ,是最基础的记录单元 (dn + rdn + Base DN)
  • Attribute :属性 ,每个条目都有属性 (名称-值)()
  • ObjectClass : 对象类是属性的集合,对象类有三种类型 ,结构类型(Structural)、抽象类型(Abstract)和辅助类型(Auxiliary)
  • Schema : 模式 ,对象类的集合
  • backend & database : backend 用于操作 ,database 用于存储

三 . LDAP 实现之一 Windows AD

Windows AD 简介

Active Directory 域内的 directory database(目录数据库)被用来存储用户账户、计算机账户、打印机和共享文件夹等对象,而提供目录服务的组件就是 Active Directory (活动目录)域服务(Active Directory Domain Service,AD DS)

在AD 域服务中 ,AD 就是一个命名空间 ,利用AD ,我们可以通过对象的名称来找到和这个对象有关的所有信息

image.png

AD 主要成员 :
AD 域对象和属性 : AD 域内的资源以对象(Object )的形式存在 ,通过Attriburte 来描述其特征,可以说对象本身就是属性的集合
DC :域控制器( Domain Controller ) : 域内可以有多台域控制器,每台域控制器地位平等 ,各自存储着一份相同的Active Directory
管理工具 : ctive Directory 用户和计算机 + Active Directory 管理中心

AD 与 LDAP 直观区别

  • AD : Active Directory : AD 是 windows 的一种服务 ,用于存储 Windows 网络中的用户账号 ,组 ,计算机
  • Active Directory = LDAP服务器+LDAP应用(Windows域控) ,AD 可以说是LDPA 的 一种应用实例 , 通过 LDAP 协议 访问AD
  • Active Directory先实现一个LDAP服务器,然后自己先用这个LDAP服务器实现了自己的一个具体应用(域控)
  • 简单点说 ,就是通过 LDAP 协议将 数据写入 AD Server
  • 通过 AD 的 目录结构 来存储账号是合理的

四 . 操作指南

代码是基于 Windows AD 2012 进行实操 , LDAP 需要自行兼容 . 当然 ,以下代码使用的是 javax.naming , 是比较偏底层的操作方式 , 也可以选择 SpringLDAP , 用的也比较舒服.

!!!! 更详细的操作可以参考 net.tirasa.connid.bundles.ldap 包 , 很多操作最开始都是从该包学习的

Node 1 : 创建 Connect

Naming 包中通过 LdapContext 对象实现与 LDAP 的连接 , 其中 SSL 使用 ldaps://636 连接 , 非SSL 使用 ldap://389 进行连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
java复制代码
private static final String LDAP_CTX_FACTORY = "com.sun.jndi.ldap.LdapCtxFactory";
public static final String CONNECT_TIMEOUT_ENV_PROP = "com.sun.jndi.ldap.connect.timeout";
public static final String READ_TIMEOUT_ENV_PROP = "com.sun.jndi.ldap.read.timeout";

public LdapContext createLdapContext() {
final java.util.Hashtable<Object, Object> env = new java.util.Hashtable<Object, Object>();

// 定义 LDAP 工厂类
env.put(Context.INITIAL_CONTEXT_FACTORY, LDAP_CTX_FACTORY);

// 构建LDAP 访问地址
env.put(Context.PROVIDER_URL, getLdapUrls());
env.put(Context.REFERRAL, "follow");

// 定义超时时间
env.put(CONNECT_TIMEOUT_ENV_PROP,Long.toString(config.getConnectTimeout()));
env.put(READ_TIMEOUT_ENV_PROP, Long.toString(config.getReadTimeout()));

// 开启SSL / 信任密钥
if (config.isSsl()) {
env.put(Context.SECURITY_PROTOCOL, "ssl");
}
env.put(LDAP_CTX_SOCKET_FACTORY, TrustAllSocketFactory.class.getName());

env.put(LDAP_BINARY_ATTRIBUTE,
SDDL_ATTR + " " + OBJECTGUID + " " + OBJECTSID);
// 访问当时 账号
env.put(Context.SECURITY_AUTHENTICATION, "simple");
env.put(Context.SECURITY_PRINCIPAL, "administrator@antblack");

// 密码
env.put(Context.SECURITY_CREDENTIALS, "123456");
LdapContext context = null;
try {
context = new InitialLdapContext(env, null);
} catch (NamingException e) {
e.printStackTrace();
}

return context;
}

private String getLdapUrls() {
StringBuilder builder = new StringBuilder();
builder.append("ldap://");
builder.append(config.getHost());
builder.append(':');
builder.append(config.getPort());
for (String failover : LdapUtil.nullAsEmpty(config.getFailover())) {
builder.append(' ');
builder.append(failover);
}
return builder.toString();
}

Node 2 : 属性构建操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
java复制代码
// Step 1 : 构建属性 ,Naming 包通过 Attribute 管理属性
final BasicAttributes ldapAttrs = new BasicAttributes(true);
dapAttrs.put(ldapAttr);


// step 3 : 创建对象
Context context = ctx.createSubcontext("OU=研发0219,DC=antblack,DC=com,DC=cn", adAttrs);


// 构建属性
public BasicAttributes getAttriutes() {
BasicAttributes adAttrs = new BasicAttributes(true);
adAttrs.put(getAttribute("description", "test"));

BasicAttribute objectClass = new BasicAttribute("objectClass");
for (String ldapClass : orgClass) {
objectClass.add(ldapClass);
}
adAttrs.put(objectClass);
return adAttrs;
}

public BasicAttribute getAttribute(String key, String value) {
return new BasicAttribute(key, value);
}

// ORG
private Set<String> orgClass = new TreeSet<>();
orgClass.add("organizationalUnit");
orgClass.add("top")

// GROUP
orgClass.add("group");
orgClass.add("top");

// Person
orgClass.add("organizationalPerson");
orgClass.add("top");
orgClass.add("person");
orgClass.add("user");

Node 3 : 构建 GUID

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
JAVA复制代码/**
* 转换为 GUID
*/
public static String exchangeGUID(String guid) {
return Hex.getEscaped(GUID.getGuidAsByteArray(guid));
}

/**
* entryDN 转换为 UID
*/
public String entryExchangeGUID(final String entryDN) throws NamingException {
return GUID.getGuidAsString((byte[]) getEntryID(entryDN).get());
}


// 通常都是取 entryUUID , 但是也可以通过类型定制 ,如下
/**
* The LDAP attribute to map Uid to.
*/
private String uidAttribute = "entryUUID";

/**
* The LDAP attribute to map Gid to.
*/
private String gidAttribute = "entryUUID";

if (oclass.equals(ObjectClass.GROUP)) {
clazz = oclass;
idAttribute = conn.getConfiguration().getGidAttribute();
} else if (oclass.equals(ObjectClass.ACCOUNT)) {
clazz = oclass;
idAttribute = conn.getConfiguration().getUidAttribute();
} else {
clazz = ObjectClass.ALL;
idAttribute = null;
}

Node 4 : AD 查询

LDAP 有一套完整的查询语句 , 以下举例
@ www.ietf.org/rfc/rfc2254…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
java复制代码// 查询所有
(objectClass=*)

// objectGUID 查询
(&(objectClass=*)(objectGUID=c92131cee-dd23-445-9967-c462123455667))
(objectGUID=l\D7\ABTP\14\0CL\B5\A9\3E\E60\F0\90\29)
// GUID 要转换格式 16 进制

// cn 查询
(&(objectClass=*)(cn=O组织U125))

// sAMAccountName
(&(objectClass=*)(sAMAccountName=O组织U125))

// syncSearch
(|(&(objectClass=user))(objectClass=group)(&(isDeleted=FALSE)(objectClass=user)))




// Java 查询方式
public void search(String info, String baseOUName) throws NamingException {

// step 1 : 搜索根路径 --> baseDN
baseOUName = "OU=" + baseOUName + ",DC=antblack,DC=com,DC=cn";

// step 2 : 定义搜索范围
SearchControls searchCtls = new SearchControls();
searchCtls.setSearchScope(SearchControls.SUBTREE_SCOPE);

// step 3 : 准备搜索语句
String searchFilter = ADSearchUtils.getNativeFilter(ADSearchType.EQUALS, "name",
info, ObjectClass.ORGANIZATION);

// step 4 : 搜索
try {
final NamingEnumeration<SearchResult> results =
ctx.search(baseOUName, searchFilter, searchCtls);

// step 5 : 处理结果
Set backMap = new HashSet();
while (results.hasMoreElements()) {
SearchResult sr = (SearchResult) results.next();
logger.info("------> this is result :{} <-------", sr.getAttributes().get("name"));
backMap.add(sr.getAttributes());
}

logger.info("------> this back is :{} <-------", backMap.size());
} catch (NamingException e) {
logger.error("E----> error :{} -- content :{}", e.getClass() + e.getMessage(), e);
throw e;
}
}


// 标识符类型
filter = "(" filtercomp ")"
filtercomp = and / or / not / item
and = "&" filterlist
or = "|" filterlist
not = "!" filter
filterlist = 1*filter
item = simple / present / substring / extensible
simple = attr filtertype value
filtertype = equal / approx / greater / less
equal = "="
approx = "~="
greater = ">="
less = "<="
extensible = attr [":dn"] [":" matchingrule] ":=" value
/ [":dn"] ":" matchingrule ":=" value
present = attr "=*"
substring = attr "=" [initial] any [final]
initial = value
any = "*" *(value "*")
final = value
attr = AttributeDescription from Section 4.1.5 of [1]
matchingrule = MatchingRuleId from Section 4.1.9 of [1]
value = AttributeValue from Section 4.1.6 of [1]


// 查询操作可参考文档



// 注意点 :
1 搜索的时候同样要考虑特殊字符 , 可以使用 ASCII 码替换 , 例如 (cn=*\2a*)
(o=Parens R Us \28for all your parenthetical needs\29)
(cn=*\2A*)
(filename=C:\5cMyFile)
(bin=\00\00\00\04)
(sn=Lu\c4\8di\c4\87)

Node 5 : 其他操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码// 修改属性
public final static int ADD_ATTRIBUTE = 1;
public final static int REPLACE_ATTRIBUTE = 2;
public final static int REMOVE_ATTRIBUTE = 3;

modifyAttributes(entryDN, attrToModify, DirContext.REPLACE_ATTRIBUTE);
private void modifyAttributes(final String entryDN, final List<ModificationItem> modItems) {
try {
conn.getInitialContext().modifyAttributes(entryDN, modItems.toArray(new ModificationItem[modItems.size()]));
} catch (NamingException e) {
throw new ConnectorException(e);
}
}

// 重命名/修改路径
LdapContext ctx = conn.getInitialContext().newInstance(null);
ctx.addToEnvironment("java.naming.ldap.deleteRDN", deleteOldRdn);
ctx.rename(oldName, newName);

// 组操作
组操作通过 member / memberOf 属性进行操作

五 . 特殊操作

5.1 User 权限操作

1
2
java复制代码// 通过在 BasicAttributes 中添加 userAccountControl 属性实现设置用户的权限
public static final String UACCONTROL_ATTR = "userAccountControl";

权限的设置并不是任意的 , 部分权限只能由特定的子权限跳上去 ,AD 提供了如下权限 :

image.png

需要注意的是 , AD 权限的互转不是任意的 , 下图前四个权限只有指定的权限才能转过去 , 其他的权限可以互转!!

image.png

5.2 Group 权限操作

1
2
java复制代码// 通过在 BasicAttributes 中添加 groupType 属性实现设置组的权限
public static final String LDAP_GROUP_TYPE = "groupType";

image.png

5.3 国籍操作

1
2
3
4
5
6
7
8
java复制代码// AD 中有多个属性来控制国际
public static final String COUNTRY = "c";
public static final String COUNTRY_NAME = "co";
public static final String COUNTRY_CODE = "countryCode";

BasicAttribute c = new BasicAttribute(COUNTRY,countryCode.getCountrySign());
BasicAttribute co = new BasicAttribute(COUNTRY_NAME,countryCode.getCountryName());
BasicAttribute code = new BasicAttribute(COUNTRY_CODE,countryCode.getCode());

image.png

5.4 连接池问题

参考文档 @ docs.oracle.com/javase/jndi…

该文档中对连接池做了很详细的描述

1
2
3
4
5
6
java复制代码// 常规配置连接池的方式 :
env.put("com.sun.jndi.ldap.connect.pool","true");
env.put("com.sun.jndi.ldap.connect.pool.authentication", "simple");
env.put("com.sun.jndi.ldap.connect.pool.maxsize", "3");
env.put("com.sun.jndi.ldap.connect.pool.prefsize", "1");
env.put("com.sun.jndi.ldap.connect.pool.timeout", "300000");

The default rule is that plain (non-SSL) connections that use simple or no authentication are allowed to be pooled
即连接池可能存在不会生效的可能 , 这个时候需要考虑修改配置

com.sun.jndi.ldap.connect.pool.protocol: 同时允许使用普通连接和SSL连接
com.sun.jndi.ldap.connect.pool.authentication : (none/simple/DIGEST-MD5)允许使用匿名(none)、简单和摘要- md5认证类型的连接池 , 需要配置

image.png

5.5 AD 特殊字符

AD 中可以通过ASCII 码传入特殊字符 ,例如可以将名称 , 但是要注意的是 = 或者类似的符号 ,在 AD 中本身就存在 , 他在生成的时候 , AD会自动添加 / 转义 , 所以对于我们使用的时候 , 就需要把自动转义的加上!!

image.png

总结

大概整理了一点皮毛 ,因为不想跑虚拟机 , 很多暂时没录上 , 后续如果有机会会考虑放上去 (太小众了 , 也不知道有没有人看)

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Golang 并发编程实践 Go主题月

发表于 2021-03-27

人是一种高并发的物种,细品。

初识

对 Go 语言的第一印象就是其原生地支持并发编程,而且使用的是协程,比线程更加轻量。

ybk93cwmg3y51.jpg

关于进程、线程和协程的区别

  • 进程是“程序执行的一个实例” ,担当分配系统资源的实体。进程创建必须分配一个完整的独立地址空间。进程切换只发生在内核态。
  • 线程:线程是进程的一个执行流,独立执行它自己的程序代码,是程序执行流的最小单元,是处理器调度和分派的基本单位。一个进程可以有一个或多个线程。
  • 协程:协程不是进程或线程,其执行过程更类似于子例程,或者说不带返回值的函数调用。在语言级别可以创建并发协程,然后编写代码去进行管理。Go 将这一步承包下来,使协程并发运行成本更低。

a.jpeg

Go 实现最简单的并发

1
2
3
4
5
go复制代码for i := 0; i < 10; i++ {
go func(n int) {
fmt.Println(n)
}(i)
}

项目实践

最近有个项目需要同时调用多个 job,并要等待这些 job 完成之后才能往下执行。

串行执行 job

最开始,我们拥有一个执行 job 的方法,并且串行执行所有的 job:

1
2
3
4
5
6
7
go复制代码func buildJob(name string) {
...
}

buildJob("A")
buildJob("B")
buildJob("C")

并行执行 job

因为所有 job 是可以并发执行的,这样就不用必须等待上一个 job 执行完成后,才能继续执行其他 job。我们可以使用 Go 语言的关键字 go 来快速启用一个 goroutine,下面我们将并发地执行三个 job:

1
2
3
go复制代码go buildJob("A")
go buildJob("B")
go buildJob("C")

等待所有 job 完成

怎样才能知道每个 job 是否已经完成,这里可以使用 channel 进行通信,并使用 select 检查执行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
go复制代码func buildJob(ch chan error, name string) {
var err error

... // build job

ch <- err // finnaly, send the result into channel
}

func build() error {
jobCount := 3
errCh := make(err chan error, jobCount)
defer close(errCh) // 关闭 channel

go buildJob(errCh, "A")
go buildJob(errCh, "B")
go buildJob(errCh, "C")

for {
select {
case err := <-ch:
if err != nil {
return err
}
}

jobCount--
if jobCount <= 0 {
break
}
}

return nil
}

发现问题

当 job A 执行失败时,build 方法会 return err 退出,并执行 close(errCh)。可是此时另外两个 job B 和 C 可能还没执行完成,同时也会把结果发给 errCh,但由于这个时候 errCh 已经被关闭了,会导致程序退出 panic: send on closed channel。

优化代码

在给 channel 发送数据的时候,可以使用接收数据的第二个值判断 channel 是否关闭:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
go复制代码func buildJob(ch chan error, name string) {
var err error

... // build job

if _, ok := <-ch; !ok {
return
}
ch <- err // finnaly, send the result into channel
}

func build() error {
jobCount := 3
errCh := make(err chan error, jobCount)
defer close(errCh) // 关闭 channel

go buildJob(errCh, "A")
go buildJob(errCh, "B")
go buildJob(errCh, "C")

for {
select {
case err := <-ch:
if err != nil {
return err
}
}

jobCount--
if jobCount <= 0 {
break
}
}

return nil
}

总结

Go 并发编程看似只需要一个关键字 go 就可以跑起来一个 goroutine,但真正实践中,还是有需要问题需要去处理的。

原文链接:k8scat.com/posts/go/co…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

盘点 MQ 消息队列总览 一 前言 二 架构梳理

发表于 2021-03-27

总文档 :文章目录

Github : github.com/black-ant

开新章了 , 后续会用 4-5 个篇幅 , 来聊一聊 MQ 消息队列 , 也不知道能不能聊清楚 ..

这一篇没有太多的技术深度 , 主要是 MQ 的 总览 ,如果能把MQ说清楚 , 这一篇的目的也就达到了

一 . 前言

MQ 的框架有很多 , 很难每个都讲清楚 , 这里只主要说说以下几种 :

  • RabbitMQ :
  • Kafka
  • RocketMQ
  • TubeMQ
  • ZeroMQ
  • ActiveMQ

二 . 架构梳理

2.1 前置知识点 : JMS

JMS 是很多 MQ 框架的基础 , JMS 定义了一组接口和语义,允许 Java 应用程序与其他消息传递实现进行通信 , 它即指明了规范 , 也定义了实现结构 .

JMS 基础 :

JMS 有以下几个重要的作用和目的 :

  • 解决 RPC 系统的紧耦合的问题
  • 使开发异步发送和接收业务数据和事件的业务应用程序变得容易
  • 可以被各种各样的企业消息传递产品轻松有效地支持
  • 相对较低层次的抽象,运行在互补层之下
  • 定义一组接口和语义,允许 Java 应用程序与其他消息传递实现进行通信

JMS 成员组成 :

  • JMS provider : 实现JMS规范的消息传递系统。
  • JMS clients : 发送和接收消息的 Java 应用程序
  • Messages : 用于在 JMS 客户机之间传递信息的对象
  • Administered objects : 由管理员为使用 JMS 客户机而创建的预配置 JMS 对象

消息传递模型 :

JMS 中定义了2种消息传递的方式

  • 点对点(Queue destination) :

在此模型中,消息从生产者传递到一个使用者。消息被传递到目的地(一个队列) ,然后传递到为该队列注册的消费者之一。虽然任意数量的生产者都可以向队列发送消息,但是每个消息都保证由一个使用者传递和使用。如果没有注册消费者使用消息,队列将保存消息,直到消费者注册使用消息为止

  • 发布/订阅(主题目的地) :

在这个模型中,消息从生产者传递到任意数量的消费者。消息被传递到主题目的地,然后传递到所有主题。此外,任意数量的生产者都可以向主题目的地发送消息,每个消息都可以传递给任意数量的订阅者。如果没有注册使用者,主题目的地将不包含消息,除非它为非活动使用者提供了持久订阅。持久订阅表示在主题目的地注册的使用者,该使用者在消息发送到主题时可以处于非活动状态

JMS 消息组成 :

消息由三部分组成: header、properties 和 body

header :

每条消息都需要消息头,它包含用于路由和标识消息的信息。其中一些字段由 JMS 提供者在生成和传递消息期间自动设置,其他字段由客户机根据消息设置

属性(属性是可选的) :

提供客户端可用于筛选消息的值。它们提供有关数据的附加信息,例如创建数据的进程、创建数据的时间。可以将属性视为头的扩展,并由属性名/值对组成。通过使用属性,客户机可以通过指定作为选择条件的某些值来微调它们对消息的选择

主体(也是可选的)包含要交换的实际数据。JMS 规范定义了 JMS 提供者必须支持的六种类型的消息:

  • Message : 表示没有消息正文的消息
  • StreamMessage : 一种消息,其主体包含 Java 基本类型流。它是按顺序编写和读取的
  • MapMessage : 消息的正文包含一组名称/值对。没有定义条目的顺序
  • TextMessage : 消息体包含一个Java字符串
  • ObjectMessage : 消息的主体包含序列化的 Java 对象
  • BytesMessage : 主体包含未解释字节流的消息

2.2 RabbitMQ

RabbitMq 基于 AMQP(Advanced Message Queuing Protocol,高级消息队列协议)实现 , 他有如下的特点 :

  • 支持许多消息传递协议,如 AMQP、 MQTT、 STOMP,因此也称为混合代理。
  • 支持几种发布-订阅、点对点、请求-回复消息传递技术。
  • 使用了智能 broker/dumb 消费者模型,并致力于一致地向消费者传递消息。
  • 支持 Java、 Ruby、。NET、 PHP 和许多其他语言,并提供了一些插件,可以添加这些插件来扩展用例和集成场景。
  • 提供了同步和异步通信模式。

RabbitMQ 主要概念 :

  • 虚拟主机 : 一个虚拟主机持有一组消息交换机、队列和绑定
  • 消息 : 消息不具名 , 由消息头和消息体组成
  • 绑定 : 交换机需要和队列绑定
  • 信道 : 多路复用连接中的一条独立的双向数据流通道。
  • 交换机 : Exchange用于转发消息,但它不会做存储

ExchangeType

  • fanout: 会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中
  • direct : direct类型的Exchange路由规则也很简单,它会把消息路由到那些binding key与routing key完全匹配的Queue
  • topic : 将消息路由到binding key与routing key相匹配的Queue中 , 同时可以使用通配符 : 比如:“*” 匹配特定位置的任意文本,“.” 把路由键分为了几部分,“#” 匹配所有规则等
  • headers : 根据消息内容中的header 属性

image.png

RabbitMQ 特点 :
1、可靠性(Reliability)

RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。

2、灵活的路由(Flexible Routing)

在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。

3、消息集群(Clustering)

多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。

4、高可用(Highly Available Queues)

队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。

5、多种协议(Multi-protocol)

RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。

6、多语言客户端(Many Clients)

RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。

7、管理界面(Management UI)

RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。

8、跟踪机制(Tracing)

如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。

9、插件机制(Plugin System)

RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。

RabbitMQ 流程 :

Rabbit001.jpg

2.3 Kafka

Apache Kafka 是一种分布式数据存储,经过优化以实时提取和处理流数据。流数据是指由数千个数据源持续生成的数据,通常可同时发送数据记录。流平台需要处理这些持续流入的数据,按照顺序逐步处理。

Kafka 为其用户提供三项主要功能

  • 发布和订阅记录流
  • 按照记录的生成顺序高效地存储记录流
  • 实时处理记录流

Kafak 的成员 :

  • Producer : 根据 partition 方法 ,将消息发布到指定的Topic 的 partition 里面 。
  • **kafka 集群 ** 接收到producer 发过来的消息后 ,将其持久化到硬盘
  • Consumer : 从 kafka 集群 pull 消息 ,并且控制获取消息的offset
  • Topic : kafka 处理的消息源的不同分类
  • partition : Topic 物理上的分组 ,一个Topic 可以分为多个 Partition ,每个 Partition 都是一个有序的队列 ,partition 每条消息都会分配一个有序的ID(offset)
  • replicas : Partition 的副本集
  • leader : replicas 中的角色,producer 和 consumer 只跟Leader 交互
  • follower : replicas 中的一个角色 ,从 leader 中复制数据 ,follower 是Leader 的备选者
  • Message : 消息 ,通信的基本单元 ,每个 Producer 可以向一个 Topic 发布一些消息
  • Consumer group : 一个消息可以被多个Group 中的一个 consumer 消费

Kafka 流程 :

kafka001.jpg

Pub Sub 流程:

  1. 生产者定期向主题发送消息。
  2. Kafka代理存储为该特定主题配置的分区中的所有消息。 它确保消息在分区之间平等共享。 如果生产者发送两个消息并且有两个分区,Kafka将在第一分区中存储一个消息,在第二分区中存储第二消息。
  3. 消费者订阅特定主题。
  4. 一旦消费者订阅主题,Kafka将向消费者提供主题的当前偏移,并且还将偏移保存在Zookeeper系综中。
  5. 消费者将定期请求Kafka(如100 Ms)新消息。
  6. 一旦Kafka收到来自生产者的消息,它将这些消息转发给消费者。
  7. 消费者将收到消息并进行处理。
  8. 一旦消息被处理,消费者将向Kafka代理发送确认。
  9. 一旦Kafka收到确认,它将偏移更改为新值,并在Zookeeper中更新它。 由于偏移在Zookeeper中维护,消费者可以正确地读取下一封邮件,即使在服务器暴力期间。
  10. 以上流程将重复,直到消费者停止请求。
  11. 消费者可以随时回退/跳到所需的主题偏移量,并阅读所有后续消息。

订阅流程 :

  1. 生产者以固定间隔向某个主题发送消息。
  2. Kafka存储在为该特定主题配置的分区中的所有消息,类似于前面的方案。
  3. 单个消费者订阅特定主题,假设 Topic-01 为 Group ID 为 Group-1 。
  4. Kafka以与发布 - 订阅消息相同的方式与消费者交互,直到新消费者以相同的组ID 订阅相同主题 Topic-01
  5. 一旦新消费者到达,Kafka将其操作切换到共享模式,并在两个消费者之间共享数据。 此共享将继续,直到用户数达到为该特定主题配置的分区数。
  6. 一旦消费者的数量超过分区的数量,新消费者将不会接收任何进一步的消息,直到现有消费者取消订阅任何一个消费者。 出现这种情况是因为Kafka中的每个消费者将被分配至少一个分区,并且一旦所有分区被分配给现有消费者,新消费者将必须等待。

Kafka 整体架构 :

image.png

Kafka 对比 RabbitMQ

image.png

2.4 RocketMQ

Apache RocketMQ是一个统一的消息传递引擎,轻量级数据处理平台

RocketMQ 成员 :

  • 生产者(Producer): 负责产生消息,生产者向消息服务器发送由业务应用程序系统生成的消息。
  • 消费者(Consumer): 负责消费消息,消费者从消息服务器拉取信息并将其输入用户应用程序。
  • 消息服务器(Broker): 是消息存储中心,主要作用是接收来自 Producer 的消息并存储, Consumer 从这里取得消息。
  • 名称服务器(NameServer): 用来保存 Broker 相关 Topic 等元信息并给 Producer ,提供 Consumer 查找 Broker 信息。

RocketMQ 流程

1、启动 Namesrv

Namesrv起 来后监听端口,等待 Broker、Producer、Consumer 连上来,相当于一个路由控制中心。

2、Broker 启动

Broker 跟所有的 Namesrv 保持长连接,定时发送心跳包。
心跳包中,包含当前 Broker 信息(IP+端口等)以及存储所有 Topic 信息。
注册成功后,Namesrv 集群中就有 Topic 跟 Broker 的映射关系。

3、收发消息前,先创建 Topic

创建 Topic 时,需要指定该 Topic 要存储在 哪些 Broker上。也可以在发送消息时自动创建Topic。

4、Producer 发送消息

启动时,先跟 Namesrv 集群中的其中一台建立长连接,并从Namesrv 中获取当前发送的 Topic 存在哪些 Broker 上,然后跟对应的 Broker 建立长连接,直接向 Broker 发消息。

5、Consumer 消费消息

Consumer 跟 Producer 类似。跟其中一台 Namesrv 建立长连接,获取当前订阅 Topic 存在哪些 Broker 上,然后直接跟 Broker 建立连接通道,开始消费消息。

Rocket02.jpg

这张图来自于自身理解 , 不确定有没有问题….

image.png

对比其他消息队列 (PS : 来着 RocketMQ 官方)

RocketMQ.jpg

2.5 ActiveMQ

Apache ActiveMQ 是最流行的开源、多协议、基于 java 的消息服务器。它支持行业标准协议,

Apache ActiveMQ 速度很快,支持许多跨语言客户机和协议,具有易于使用的企业集成模式和许多高级特性,同时完全支持 JMS 1.1和 J2EE 1.4。Apache ActiveMQ 是在 Apache 2.0许可协议下发布的

Apache ActiveMQ 代理是 Java 消息服务(Java Messaging Service,JMS)的实现。JMS 是一种 Java 规范,它允许应用程序以简单和标准的方式在彼此之间来回发送数据。

ActiveMQ 是一个 JMS 提供者。JMS 提供者形成了一个软件框架,用于促进在应用程序中使用 JMS 概念。ActiveMQ 的一个节点允许客户端连接到它并使用这些消息传递概念,这个节点称为“ ActiveMQ Broker”

ActiveMQ 特点 :

1
2
3
4
5
6
7
8
9
10
11
java复制代码1 支持多种跨语言协议 :  Java, C, C++, C#, Ruby, Perl, Python, PHP
2 支持企业集成方案 , 可集成在 在 JMS 客户机和 Message Broker 中
3 支持许多 例如 信息组, 虚拟目的地, 及 综合目的地
4 完全支持 JMS 1.1和 J2EE 1.4,支持瞬态、持久、事务和 XA 消息传递
5 支持 Spring
6 支持常见的 J2EE 服务器
7 支持热插拔
8 支持高性能日志
9 支持高性能集群
10 提供Web API
11 允许 web 浏览器成为消息传递结构的一部分

ActiveMQ 模式 :
ActiveMQ 基于 JMS 实现 ,所以它支持 JMS 带来的功能 :

P2P (点对点) 消息域使用 queue 作为 Destination,消息可以被同步或异步的发送和接收,每个消息只会给一个 Consumer 传送一次。

Pub/Sub(发布/订阅,Publish/Subscribe) 消息域使用 topic 作为 Destination,发布者向 topic 发送消息,订阅者注册接收来自 topic 的消息。发送到 topic 的任何消息都将自动传递给所有订阅者。接收方式(同步和异步)与 P2P 域相同。

ActiveMQ 工作方式

一旦消息进入系统,它们就被安排成两种模式: 队列和主题。队列是由代理和客户端生成和使用的消息的 FIFO (先进先出)管道。生产者创建消息并将其推送到这些队列中。然后消费者应用程序轮询和收集这些消息,每次一条消息。主题是基于订阅的消息广播通道。当生产应用程序发送消息时,“订阅”该主题的多个收件人将接收消息的广播。这个生成应用程序在主题消息上下文中有时被称为发布者,

ActiveMQ 流程 :

image.png

这里涉及到确认消费的相关逻辑 :

  • (1)、客户接收消息;
  • (2)、客户处理消息;
  • (3)、消息被确认;

总共有四种确认机制 :

Session.AUTO_ACKNOWLEDGE : 客户(消费者)成功从receive方法返回时,或者从MessageListener.onMessage方法成功返回时,会话自动确认消息,然后自动删除消息.

Session.CLIENT_ACKNOWLEDGE : 客户通过显式调用消息的acknowledge方法确认消息,。 即在接收端调用message.acknowledge();方法,否则,消息是不会被删除的.

Session. DUPS_OK_ACKNOWLEDGE : 不是必须确认,是一种“懒散的”消息确认,消息可能会重复发送,在第二次重新传送消息时,消息头的JMSRedelivered会被置为true标识当前消息已经传送过一次,客户端需要进行消息的重复处理控制。

Session.SESSION_TRANSACTED : 事务提交并确认。

ActiveMQ 和 Rabbit MQ 对比

基于版本———- > :

  • ActiveMQ 是一个基于 Java 消息服务客户机的开源消息代理,
  • RabbitMQ 是在 Advanced Message Queueing 协议上实现的。

工作方式———- > :

RabbitMQ 基于中心工作,这使得它成为一种独特的方法。RabbitMQ 是非常便携和用户友好的。因为负载平衡或持久消息队列等大型操作只在有限的代码行上运行。但是这种方法的可伸缩性较差,速度也较慢,因为其延迟是从中心节点和消息信封的大小添加的。ActiveMQ 更容易实现,并提供了高级特性,如集群、缓存、日志记录和消息存储。

RabbitMQ 嵌入到应用程序中,并充当中途服务。它区分了支持加密、在发生中断时将数据存储在磁盘中作为预先计划、组成集群、重复服务以具有高可靠性。它部署在 OTP 平台上,以确保作为整个系统关键节点的队列的最大可伸缩性和稳定性。

实现方式———- > :
ActiveMQ 由 Java 消息服务客户端组成,该客户端能够支持多个客户端或服务器。实现了 RabbitMQ 来设计高级消息队列协议。它被扩展到支持不同的协议,如 MQTT 和 STOMP。

其他优点———- >:

RabbitQ 支持多个消息传递协议,提供确认和消息队列。它可以通过各种语言启用,比如 Python、。和 Java。它还可以让开发人员使用 Chef、 Docker 和 Puppet 等应用程序。它通过开发可能的集群来提供高吞吐量和高可用性。通过可插式身份验证和授权的支持,它可以轻松地处理公共和私有云。HTTP-API 是一个命令行工具,其用户界面有助于管理和监视 RabbitMQ。

ActiveMQ 具有多种优点,可以根据需要应用,效率高。它支持 c,c + + ,。NET 和 Python,可以通过高级消息队列协议嵌入多平台应用程序。它可以通过面向流文本的消息协议 STOMP 在 web 应用程序之间灵活地交换消息。它还编程管理物联网设备。

2.6 ZeroMQ

这个 MQ 就厉害了 ,注意看 , 有很大区别 :

ZeroMQ 基础

ZeroMQ (也称为 ømq、0MQ 或 ZMQ)是一个高性能的异步消息传递库(不是个框架),旨在用于分布式或并发应用程序。它提供了一个消息队列,但是与面向消息的中间件不同的是,ZeroMQ 系统可以在没有专用消息代理的情况下运行。

需要注意的是 ,Zero MQ 不是一个常规意义上的消息中间件 , 他是传输层 , 是一个库 , 用于实现应用程序和进程之间的消息传递和通信系统——快速和异步 , 他们的目标是 成为标准网络协议栈的一部分

ZeroMQ 通过各种传输(TCP、进程内、进程间、多播、 WebSocket 等)支持常见的消息传递模式(pub/sub、 request/reply、 client/server 等) ,使进程间消息传递变得像线程间消息传递一样简单。这使你的代码清晰,模块化,并且极其容易扩展。

很有趣的地方是 , Zero MQ 根据不同的语言提供了不同的实现 , 例如 Java 在 Git 上面就可以找到一个 JZMQ

ZeroMQ 逻辑架构

  • Sockets : 使用套接字,用户与这些套接字的交互类似于 TCP 套接字,它们之间的区别在于每个套接字都能够处理多个对等通信
  • Worker Thread : 各种对象驻留在工作线程中每个对象都由一个父对象持有(所有权在图中由一条简单的完整线表示)。许多对象由套接字直接持有; 然而,有一些实例表明实体是由套接字所有的对象控制的
  • Listener : TCP 侦听器实体侦听传入的 TCP 连接,并为每个新连接生成引擎/会话对象
  • Session : 它是与 ZeroMQ 套接字通信的会话对象
  • Engine : 引擎对象与网络通信
  • Pipe : 当会话与套接字交换消息时。有两个传递消息的方向 Pipe 对象处理要在其中传递的消息的每个方向。实际上,每个管道都是一个无锁队列,用于在线程之间快速传递消息

ZeroMQ0.jpg

ZeroMQ 自吹

1
2
3
4
5
6
7
8
9
10
java复制代码TCP:ZeroMQ基于消息,使用消息模式而不是字节流。
XMPP:ZeroMQ更简单、快速、更底层。Jabber可建在ØMQ之上。
AMQP:完成相同的工作,ZeroMQ要快100倍,而且不需要代理(规范更简洁——比AMQP的规范文档少278页)
IPC:ZeroMQ可以跨主机通信
CORBA:ZeroMQ不会将复杂到恐怖的消息格式强加于你。
RPC:ZeroMQ完全是异步的,你可以随时增加/删除参与者。
RFC 1149:ZeroMQ比它快多了!
29west LBM:ZeroMQ是自由软件!
IBM Low-latency:ZeroMQ是自由软件!
Tibco:ZeroMQ仍然是自由软件!

ZeroMQ 消息模式

发布订阅

ZeroMQ.jpg

Pull or Push

ZeroMQPull.jpg

异步请求响应

ZeroMQ Request.jpg

ZeroMQ 和 RabbitMQ 区别

image.png

ZeroMQ 对比 Kafka

image.png

2.7 TubeMQ

TubeMQ 是腾讯出品 , 现在已经捐赠给了 Apache , TubeMQ属于万亿级分布式消息中间件,专注于海量数据下的数据传输和存储,在性能、可靠性,和成本方面具有独特的优势 , 官方给出的数据是支持10万亿+的数据访问

TubeMQ 成员架构

tubeMQ.jpg

Portal: 负责对外交互和运维操作的Portal部分,包括API和Web两块,API对接集群之外的管理系统,Web是在API基础上对日常运维功能做的页面封装;

Master: 负责集群控制的Control部分,该部分由1个或多个Master节点组成,Master HA通过Master节点间心跳保活、实时热备切换完成(这是大家使用TubeMQ的Lib时需要填写对应集群所有Master节点地址的原因),主Master负责管理整个集群的状态、资源调度、权限检查、元数据查询等;

Broker: 负责实际数据存储的Store部分,该部分由相互之间独立的Broker节点组成,每个Broker节点对本节点内的Topic集合进行管理,包括Topic的增、删、改、查,Topic内的消息存储、消费、老化、分区扩容、数据消费的offset记录等,集群对外能力,包括Topic数目、吞吐量、容量等,通过水平扩展Broker节点来完成;

Client: 负责数据生产和消费的Client部分,该部分我们以Lib形式对外提供,大家用得最多的是消费端,相比之前,消费端现支持Push、Pull两种数据拉取模式,数据消费行为支持顺序和过滤消费两种。对于Pull消费模式,支持业务通过客户端重置精确offset以支持业务extractly-once消费,同时,消费端新推出跨集群切换免重启的BidConsumer客户端;

Zookeeper: 负责offset存储的zk部分,该部分功能已弱化到仅做offset的持久化存储,考虑到接下来的多节点副本功能该模块暂时保留。

TubeMQ 特点

  • 纯 Java 实现语言
  • 引入 Master 协调节点:相比 Kafka 依赖于 Zookeeper 完成元数据的管理和实现 HA 保障不同,TubeMQ 系统采用的是自管理的元数据仲裁机制方式进行,Master 节点通过采用内嵌数据库 BDB 完成集群内元数据的存储、更新以及 HA 热切功能,负责 TubeMQ 集群的运行管控和配置管理操作,对外提供接口等;通过 Master 节点,TubeMQ 集群里的 Broker 配置设置、变更及查询实现了完整的自动化闭环管理,减轻了系统维护的复杂度
  • 服务器侧消费负载均衡:TubeMQ 采用的是服务侧负载均衡的方案,而不是客户端侧操作,提升系统的管控能力同时简化客户端实现,更便于均衡算法升级
  • 系统行级锁操作:对于 Broker 消息读写中存在中间状态的并发操作采用行级锁,避免重复问题
  • Offset 管理调整:Offset 由各个 Broker 独自管理,ZK 只作数据持久化存储用(最初考虑完全去掉ZK依赖,考虑到后续的功能扩展就暂时保留)
  • 消息读取机制的改进:TubeMQ 采用的是消息随机读取模式,同时为了降低消息时延又增加了内存缓存读写,对于带 SSD 设备的机器,增加消息滞后转 SSD 消费的处理,解决消费严重滞后时吞吐量下降以及 SSD 磁盘容量小、刷盘次数有限的问题,使其满足业务快速生产消费的需求
  • 消费者行为管控:支持通过策略实时动态地控制系统接入的消费者行为,包括系统负载高时对特定业务的限流、暂停消费,动态调整数据拉取的频率等;
  • 服务分级管控:针对系统运维、业务特点、机器负载状态的不同需求,系统支持运维通过策略来动态控制不同消费者的消费行为,比如是否有权限消费、消费时延分级保证、消费限流控制,以及数据拉取频率控制等
  • 系统安全管控:根据业务不同的数据服务需要,以及系统运维安全的考虑,TubeMQ 系统增加了 TLS 传输层加密管道,生产和消费服务的认证、授权,以及针对分布式访问控制的访问令牌管理,满足业务和系统运维在系统安全方面的需求
  • 资源利用率提升改进:相比于 Kafka,TubeMQ 采用连接复用模式,减少连接资源消耗;通过逻辑分区构造,减少系统对文件句柄数的占用,通过服务器端过滤模式,减少网络带宽资源使用率;通过剥离对 Zookeeper 的使用,减少 Zookeeper 的强依赖及瓶颈限制
  • 客户端改进:基于业务使用上的便利性以,我们简化了客户端逻辑,使其做到最小的功能集合,我们采用基于响应消息的接收质量统计算法来自动剔出坏的 Broker 节点,基于首次使用时作连接尝试来避免大数据量发送时发送受阻

TubeMQ 对比

TODO

2.8 DDMQ

DDMQ 是滴滴出行架构部基于 Apache RocketMQ 构建的消息队列产品

DDMQ 结构

ddmq.png

因为是国内开发的 ,文档还是很多的 , 结构上和 RocketMQ 比较类似 , 可以直接看文档 @ www.oschina.net/p/ddmq

2.9 IBM MQ

作用 :

您可以使用IBM®MQ使应用程序能够在不同的时间和许多不同的计算环境中通信。

IBM MQ可以将任何类型的数据作为消息传输,使业务能够构建灵活的、可重用的体系结构,如面向服务的体系结构(SOA)环境。它与广泛的计算平台、应用程序、web服务和通信协议一起工作,用于丰富安全的消息传递。IBM MQ提供了一个通信层,用于可见性和控制组织内外的消息和数据流。

特点

  • 从大型机到移动设备的多功能消息传递集成,为动态异构环境提供单一、健壮的消息传递主干。
  • 具有丰富安全特性的消息传递,可产生可审计的结果。
  • 高性能消息传输,以提高速度和可靠性交付数据。
  • 简化消息管理并减少使用复杂工具所花费的时间的管理功能。
  • 支持可扩展性和业务增长的开放标准开发工具。

PS : IBM MQ 现阶段只是听项目上提到过 ,但是实际上对他的理解还不是很深 , 这里先留个坑 , 详细的可以看 IBM 官网 ,他们的都很完善

三 . 总结

每个框架都会有自己的取舍 , 比如 ZeroMQ 虽然快 ,但是却舍弃了一定的完整性 , 需要合适的选择 .

附上一张拉来的表格 , 表格内容暂时不保证 , 后面还有几篇文档的计划 , 压测的时候再改

image.png

感谢

@ juejin.cn/user/322782…

@ www.ibm.com/

@ rocketmq.apache.org/docs/motiva…

@ www.oschina.net/p/ddmq

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Redis专题-必知必会的基础知识

发表于 2021-03-27

Redis有哪些优缺点?

优点

  • 读写性能优异, Redis能读的速度是110000次/s,写的速度是81000次/s。
  • 支持数据持久化,支持AOF和RDB两种持久化方式。
  • 支持事务,Redis的所有操作都是原子性的,同时Redis还支持对几个操作合并后的原子性执行。
  • 数据结构丰富,除了支持string类型的value外还支持hash、set、zset、list等数据结构。
  • 支持主从复制,主机会自动将数据同步到从机,可以进行读写分离。
    缺点
  • 数据库容量受到物理内存的限制,不能用作海量数据的高性能读写,因此Redis适合的场景主要局限在较小数据量的高性能操作和运算上。
  • Redis 不具备自动容错和恢复功能,主机从机的宕机都会导致前端部分读写请求失败,需要等待机器重启或者手动切换前端的IP才能恢复。
  • 主机宕机,宕机前有部分数据未能及时同步到从机,切换IP后还会引入数据不一致的问题,降低了系统的可用性。
  • Redis 较难支持在线扩容,在集群容量达到上限时在线扩容会变得很复杂。

Redis为什么这么快?

  • 完全基于内存,绝大部分请求是纯粹的内存操作,非常快速。
  • 数据结构简单,读取速度快。比如SDS、双端链表、压缩链表、跳跃链表。
  • 处理网络请求采用单线程,避免了不必要的上下文切换和竞争条件。

Redis 4.0 开始,用异步线程处理一些耗时操作。如,异步线程实现惰性删除(解决大KEY删除阻塞主线程)、异步 AOF (解决磁盘 IO 紧张时,fsync 执行一次很慢)等等。

  • 使用多路 I/O 复用模型,非阻塞 IO。
  • 使用底层模型不同,它们之间底层实现方式以及与客户端之间通信的应用协议不一样,Redis 直接自己构建了 VM 机制 ,因为一般的系统调用系统函数的话,会浪费一定的时间去移动和请求。

Redis中有多少数据库?

Redis服务器默认会创建16个数据库,可通过配置的database选项修改。默认情况下,Redis客户端的目标数据库为0号数据库。

Redis数据结构?

  • 基本数据类型:string、list、set、zset、hash
  • 高级数据结构: Bitmap、HyperLogLog、GEO

Redis Module:BloomFilter、RedisSearch、Redis-ML、JSON。Stream功能(5.0新增)

Redis使用场景

  • 数据缓存
  • 会话缓存
  • 时效性数据
  • 访问频率
  • 计数器
  • 社交列表
  • 记录用户判定信息
  • 交集、并集和差集
  • 热门列表与排行榜
  • 最新动态
  • 消息队列
  • 分布式锁

Redis持久化

Redis 提供了两种方式,实现数据的持久化到硬盘。Redis启动会优先加载AOF文件,当未开启AOF时,才会加载RDB文件(未开启也不会加载)。

RDB 持久化(全量),fork 一个子进程,将内存中指定时间间隔内的数据集快照写入临时文件,成功后,再替换之前的文件,用二进制压缩存储。通过配置save参数、save(阻塞)、bgsave、shutdown、flushall命令触发。

  • 优点:灵活设置备份频率和周期,适合冷备份,数据集的时候易恢复,子进程持久化不会影响主进程IO。
  • 缺点:丢失数据窗口大,当数据集较大时,fork子进程可能会导致整个服务器停止服务几百毫秒,甚至是 1 秒钟

不建议在主 Redis 节点开启 RDB 功能,会带来一定时间的阻塞,特别是数据量大的时候

AOF持久化(增量),以协议格式将被执行的写命令追加到服务器状态的aof_buf缓冲区,然后根据对应的策略向硬盘中同步数据。随着文件越来越大,需要定期对AOF文件进行重写(fork子进程来完成)。配置appendonly yes开启AOF。

AOF文件的加载需要先创建一个伪客户端,然后把命令一条条发送给Redis服务端,服务端再完整执行一遍相应的命令。

  • 写入策略:配置appendsync ,每秒同步(everysec),等到缓冲区满才同步(no),每次发生数据变更立即同步(always)。
  • 重写机制:通过auto-aof-rewrite-min-size、auto-aof-rewrite-percentage配置重写时机。重写只会保留最终数据,多条写命令会合并为一条,已经超时的数据不会写入文件。重写过程中父进程执行的命令会通过管道按批次发送给子进程,由子进程重写回放。子进程退出后只有少量命令还累计在父进程,父进程只需回放少量命令即可。
  • 优点:更高的数据安全性,采用append模式没有磁盘寻址的开销,写入性能非常高,而且也不会破坏已经保存的数据。
  • 缺点:AOF文件通常要大于RDB文件,大数据集恢复是其速度慢于RDB,AOF这种较为复杂的基于命令日志/merge/回放的方式容易出bug

在 Redis4.0 版本开始,允许使用RDB-AOF混合持久化方式(5.0默认开启)。优化重写机制,重写后新的AOF文件前半段是RDB格式全量数据,后半段是AOF格式增量数据。在 Redis 实例重启时,会使用RDB持久化文件重新构建内存,再使用AOF重放近期的操作指令来实现完整恢复重启之前的状态。

Redis数据过期策略

  • 定时删除:在设置键的过期时间的同时,创建一个定时器(timer),让定时器在键的过期时间来临时,立即执行对键的删除操作。对CPU不友好。
  • 惰性删除:放任键过期不管,但是每次从键空间中获取键时,都检查取得的键是否过期,如果过期的话,就删除该键;如果没有过期,就返回该键。
  • 定期删除:在规定的时间内,分多次遍历服务器中的各个数据库,从数据库的expires字典中随机检查一部分键的过期时间,并删除其中的过期键。

expires字典会保存所有设置了过期时间的key的过期时间数据,其中,key是指向键空间中的某个键的指针,value是该键的毫秒精度的UNIX时间戳表示的过期时间。Redis中同时使用了惰性过期和定期过期两种过期策略。

Redis数据淘汰策略

Redis内存数据集大小上升到一定大小的时候,就会进行数据淘汰策略。Redis 提供了以下8种数据淘汰策略:

  • 全局的键空间选择性移除
    • noeviction:新写入操作会报错。(默认)
    • allkeys-lru:在键空间中,移除最久未使用的key。(这个是最常用的)
    • allkeys-random:在键空间中,随机移除某个key。
    • allkeys-lfu:在键空间中,移除最少使用频次的key(4.0的)
  • 设置过期时间的键空间选择性移除
    • volatile-lru:在设置了过期时间的键空间中,移除最久未使用的key。
    • volatile-random:在设置了过期时间的键空间中,随机移除某个key。
    • volatile-ttl:在设置了过期时间的键空间中,有更早过期时间的key优先移除。
    • volatile-lfu:在设置了过期时间的键空间中,移除最少使用频次的key(4.0的)

LRU:并不是一个严格的 LRU 实现,通过采样一小部分键,然后在采样键中回收最适合(拥有最久未被访问时间)的那个。

MySQL里有2000w数据,Redis中只存20w的数据,如何保证Redis中的数据都是热点数据?选择 allkeys-lru 策略。如果在 Redis 4.0 版本,可以考虑使用 volatile-lfu

如果有大量的 key 需要设置同一时间过期,一般需要注意什么?

如果大量的 key 过期时间设置的过于集中,一般需要在时间上加一个随机值,使得过期时间分散一些。调大 hz 参数。

Redis事务

Redis事务就是一次性、顺序性、排他性的执行一个队列中的一系列命令。没有隔离级别的概念。事务中任意命令执行失败,其余命令仍会被执行。

通过MULTI命令开启一个事务,在该语句之后执行的命令,都将被视为事务之内的操作,最后我们可以通过执行 EXEC/DISCARD命令来提交/回滚该事务内的所有操作。开启事务后,所有语句,发送给 Redis Server ,都会暂存在 Server 中。在事务开启之前,如果客户端与服务器之间出现通讯故障并导致网络断开,其后所有待执行的语句都将不会被服务器执行。

如何实现 Redis CAS 操作?

在 Redis 的事务中,WATCH命令可用于提供CAS功能。

Redis主从同步

image.png

psync命令实现全量复制和增量复制。增量复制通过主服务器id、复制偏移量以及主服务器复制积压缓冲区实现。

Redis哨兵

image.png

Redis集群

redis cluster采用数据分片的哈希槽来进行数据存储和数据的读取。redis cluster的新增和删除节点都需要手动来分配槽区。redis cluster执行读写操作的都是master节点。

image.png

Redis实现分布式锁

image.png

高频面试题

image.png

如何使用 Redis 实现消息队列?

一般使用 list 结构作为队列,rpush 生产消息,lpop 消费消息。当 lpop 没有消息的时候,要适当 sleep 一会再重试。还可使用blpop,在没有消息的时候,它会阻塞住直到消息到来。使用 pub / sub 主题订阅者模式,可以实现 1:N 的消息队列。但是在消费者下线的情况下,生产的消息会丢失。

延时队列: 使用 sortedset ,拿时间戳作为 score ,消息内容作为 key 调用 zadd 来生产消息,消费者用 zrangebyscore 指令获取 N 秒之前的数据轮询进行处理。

参考

  • 《Redis设计与实现》(黄健宏)
  • www.processon.com/view/5f7fc4…
  • svip.iocoder.cn/Redis/Inter…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Java 多线程 不一样的锁

发表于 2021-03-26

总文档 :文章目录

Github : github.com/black-ant

锁是整个多线程中出现最频繁的概念之一 , 我们在之前细说 synchronized 中曾经简单说了一下 , 这里我们试着完善整个体系 , 来说说其他的几个类 .

一 . Lock 接口

Lock 接口是一切的基础 , 它抽象类一种用于控制多个线程对共享资源的访问的工具 .

> 提供了以下方法用于抽象整个业务 :

  • void lock()
  • void lockInterruptibly() throws InterruptedException : 打断锁
  • boolean tryLock() : 非阻塞尝试获取一个锁
  • boolean tryLock(long time, TimeUnit unit) throws InterruptedException : 带时间尝试
  • void unlock()
  • Condition newCondition()

> Lock 接口提供了区别于隐式监视锁更多的功能 :

  • 保证排序
  • 不可重入使用
  • 死锁检测
  • 本身可以作为同步语句中的目标
  • 获取锁实例的监视器锁与调用该实例的任何lock()方法没有指定的关系

> 内存同步 :

  • 成功的锁操作与成功的锁操作具有相同的内存同步效果。
  • 成功的解锁操作与成功的解锁操作具有相同的内存同步效果。
  • 不成功的锁定和解锁操作,以及可重入的锁定/解锁操作,不需要任何内存同步效果。

二 . ReentranLock

2.1 ReentranLock 入门

ReentranLock 即重入锁 , 表示在单个线程内,这个锁可以反复进入,也就是说,一个线程可以连续两次获得同一把锁 .

ReentranLock 比 synchronized 提供更具拓展行的锁操作。它允许更灵活的结构,可以具有完全不同的性质,并且可以支持多个相关类的条件对象。

它的优势有:

  • 可以使锁更公平。
  • 递归无阻塞的同步机制。
  • 可以使线程在等待锁的时候响应中断。
  • 可以让线程尝试获取锁,并在无法获取锁的时候立即返回或者等待一段时间。
  • 可以在不同的范围,以不同的顺序获取和释放锁。

它的特点有:

  • 可重入互斥锁
  • 同时提供公平方式和非公平方式
    • 公平锁 : 公平锁的锁获取是有顺序的

ReentranLock 基本使用

1
2
3
4
5
6
7
8
9
java复制代码private Lock lock = new ReentrantLock();

public void test() {
lock.lock();
for (int i = 0; i < 5; i++) {
logger.info("------> CurrentThread [{}] , i : [{}] <-------", Thread.currentThread().getName(), i);
}
lock.unlock();
}

2.2 内部重要类

2.2.1 Sync

Sync 是 ReentranLock 的内部抽象类 , 其后续会用来实现两种不同的锁 , 这里先看看Sync 内部做了什么

Node 1 : 继承于AbstractQueuedSynchronizer
又名 AQS , 这些大家就知道它了 , Sync 使用aqs state 来表示锁上的持有数

1
java复制代码abstract static class Sync extends AbstractQueuedSynchronizer

Node 2 : 有一个抽象方法 lock , 后续的公平和非公平会分别实现对应的方法

1
2
3
4
5
6
7
8
9
10
java复制代码abstract void lock();
// ?- 非公平锁的同步对象
static final class NonfairSync extends Sync
> 区别方法 : final void lock() : 对比公平锁有一个修改state 的操作 , 修改成功则设置当前拥有独占访问权限的线程
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());

// ?- 公平锁同步对象
static final class FairSync extends Sync
> 区别方法 : tryAcquire(int acquires) , 其中最大的缺别在于会查询是否有线程等待获取的时间长于当前线程

Node 3 : nonfairTryAcquire 方法干了什么

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码       
final boolean nonfairTryAcquire(int acquires) {
// 获取当前 Thread 和状态
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// CAS 设置状态
if (compareAndSetState(0, acquires)) {
// 设置当前拥有独占访问权限的线程
// null 表示没有线程获取了访问权限
setExclusiveOwnerThread(current);
return true;
}
}
// 返回由 setExclusiveOwnerThread 设置的最后一个线程
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

Node 3 : tryRelease 释放

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码
// 重要可以看到2个操作 : setExclusiveOwnerThread + setState
// setExclusiveOwnerThread 为 null 表示没有线程获取了访问权限

protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

2.3 synchronized 和 ReentrantLock 异同

相同点

  • 都实现了多线程同步和内存可见性语义 (隐式监视器锁定)。
  • 都是可重入锁

不同点

  • 同步实现机制不同
    • synchronized 通过 Java 对象头锁标记和 Monitor 对象实现同步。
    • ReentrantLock 通过CAS、AQS(AbstractQueuedSynchronizer)和 LockSupport(用于阻塞和解除阻塞)实现同步。
  • 可见性实现机制不同
    • synchronized 依赖 JVM 内存模型保证包含共享变量的多线程内存可见性。
    • ReentrantLock 通过 AQS 的 volatile state 保证包含共享变量的多线程内存可见性。
  • 使用方式不同
    • synchronized 可以修饰实例方法(锁住实例对象)、静态方法(锁住类对象)、代码块(显示指定锁对象)。
    • ReentrantLock 显示调用 tryLock 和 lock 方法,需要在 finally 块中释放锁。
  • 功能丰富程度不同
    • synchronized 不可设置等待时间、不可被中断(interrupted)。
    • ReentrantLock 提供有限时间等候锁(设置过期时间)、可中断锁(lockInterruptibly)、condition(提供 await、condition(提供 await、signal 等方法)等丰富功能
  • 锁类型不同
    • synchronized 只支持非公平锁。
    • ReentrantLock 提供公平锁和非公平锁实现。当然,在大部分情况下,非公平锁是高效的选择。

总结 :
在 synchronized 优化以前,它的性能是比 ReenTrantLock 差很多的,但是自从 synchronized 引入了偏向锁,轻量级锁(自旋锁)后,两者的性能就差不多了 .

在两种方法都可用的情况下,官方甚至建议使用 synchronized 。
并且,实际代码实战中,可能的优化场景是,通过读写分离,进一步性能的提升,所以使用 ReentrantReadWriteLock

2.3 ReentrantLock 深入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
java复制代码// 常用方法 :
- void lock()
- Condition newCondition()
- boolean tryLock()
- void unlock()

--------------
// Node 1 : 基础于Lock 接口 , 并且支持序列化
ReentrantLock implements Lock, java.io.Serializable

--------------
// Node 2 : 内部类 , ReentrantLock 中有几个很重要的 sync 类 , Sync 是同步控制的基础


--------------
// Node 3 : 公平非公平的切换方式
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

--------------
// Node 4 : Lock 方法的实现 , 默认调用 NonfairSync
public void lock() {
sync.lock();
}


--------------
// Node 5 : lockInterruptibly 的实现方式
sync.acquireInterruptibly(1);

// Node 6 :
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}

三 . ReadWriteLock

读写锁是用来提升并发程序性能的锁分离技术的 Lock 实现类。可以用于 “多读少写” 的场景,读写锁支持多个读操作并发执行,写操作只能由一个线程来操作。

ReadWriteLock 对向数据结构相对不频繁地写入,但是有多个任务要经常读取这个数据结构的这类情况进行了优化。

ReadWriteLock 使得你可以同时有多个读取者,只要它们都不试图写入即可。如果写锁已经被其他任务持有,那么任何读取者都不能访问,直至这个写锁被释放为止。

ReadWriteLock 对程序性能的提高主要受制于如下几个因素:

  1. 数据被读取的频率与被修改的频率相比较的结果。
  2. 读取和写入的时间
  3. 有多少线程竞争
  4. 是否在多处理机器上运行

特征 :

  • 公平性:支持公平性和非公平性。
  • 重入性:支持重入。读写锁最多支持 65535 个递归写入锁和 65535 个递归读取锁。
  • 锁降级:遵循获取写锁,再获取读锁,最后释放写锁的次序,如此写锁能够降级成为读锁。

深入 ReadWriteLock :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码ReadWriteLock 是一个接口 , 它仅仅提供了2个方法 : 

/**
* Returns the lock used for reading.
*
* @return the lock used for reading
*/
Lock readLock();

/**
* Returns the lock used for writing.
*
* @return the lock used for writing
*/
Lock writeLock();

四 . ReentrantReadWriteLock

重入锁 ReentrantLock 是排他锁,排他锁在同一时刻仅有一个线程可以进行访问 , ReentrantReadWriteLock 则是可重入的读写锁实现类 , 只要没有线程 writer , 读取锁可以由多个 Reader 线程同时保持

I- ReadWriteLock
M- Lock readLock();
M- Lock writeLock();

C- ReentrantReadWriteLock : 可重入的读写锁实现类
I- ReadWriteLock

?- 内部维护了一对相关的锁,一个用于只读操作,另一个用于写入操作 , 写锁是独占的,读锁是共享的

4.1 ReentrantReadWriteLock 深入

使用案例 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码    Object data;
volatile boolean cacheValid;
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

void processCachedData() {
rwl.readLock().lock();
if (!cacheValid) {
rwl.readLock().unlock();
rwl.writeLock().lock();
try {
if (!cacheValid) {
data = "test";
cacheValid = true;
}
rwl.readLock().lock();
} finally {
rwl.writeLock().unlock(); // Unlock write, still hold read
}
}
}

Node 1 : 内部提供了2个内部属性 , 这也就是为什么能做到独写锁分离

1
2
3
4
java复制代码// 读锁
private final ReentrantReadWriteLock.ReadLock readerLock;
// 写锁
private final ReentrantReadWriteLock.WriteLock writerLock;

Node 2 : 再次出现的 Sync , 老规矩 , Sync 还是通过 fair 去判断创建

1
2
3
4
5
6
java复制代码final Sync sync;
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}

Node 3 : Sync 内部状态控制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码// 读取和写入计数提取常量和函数。Lock state在逻辑上分为两个 :
// 较低的(低16)表示排他(写入)锁保持计数,较高的(高16)表示共享(读取)锁保持计数。
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

计数的方式 :

// 获得持有读状态的锁的线程数量
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
读状态,等于 S >>> 16 (无符号补 0 右移 16 位)

// 获得持有写状态的锁的次数
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
写状态,等于 S & 0x0000FFFF(将高 16 位全部抹去)

Node 4 : HoldCounter 类的作用 : 每个读线程需要单独计数用于重入

1
2
3
4
5
6
7
8
java复制代码// 每个线程读取保持计数的计数器。作为ThreadLocal维护 , 缓存在cachedHoldCounter 
static final class HoldCounter {
int count = 0;
// 非引用有助于垃圾回收
final long tid = getThreadId(Thread.currentThread());
}
// 成功获取readLock的最后一个线程的保持计数
private transient HoldCounter cachedHoldCounter;

Node 5 : ThreadLocalHoldCounter , 为了反序列化机制

1
2
3
4
5
6
7
8
java复制代码static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
// 当前线程持有的可重入读锁的数量。仅在构造函数和readObject中初始化。当线程的读保持计数下降到0时删除
private transient ThreadLocalHoldCounter readHolds;

Node 6 : Sync 内部类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码NonfairSync : 不公平锁
final boolean writerShouldBlock() {
return false; // writers can always barge
}
final boolean readerShouldBlock() {
// 如果暂时出现在队列头的线程(如果存在)是正在等待的写入器,则阻塞
// 如果在其他已启用的、尚未从队列中耗尽的读取器后面有一个正在等待的写入器,那么新的读取器将不会阻塞
return apparentlyFirstQueuedIsExclusive();
}

FairSync : 公平锁
static final class FairSync extends Sync {
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}

五 . Condition

5.1 Condition 简介

1
2
3
4
5
6
java复制代码在 Java SE 5 后,Java 提供了 Lock 接口,相对于 synchronized 而言,Lock 提供了条件 Condition ,对线程的等待、唤醒操作更加详细和灵活 


> AQS 等待队列与 Condition 队列是两个相互独立的队列
#await() 就是在当前线程持有锁的基础上释放锁资源,并新建 Condition 节点加入到 Condition 的队列尾部,阻塞当前线程 。
#signal() 就是将 Condition 的头节点移动到 AQS 等待节点尾部,让其等待再次获取锁。

5.2 Condition 流程

image.png

image.png

5.3 Condition 源码

Condition 其实是一个接口 , 其在 AQS 中存在一个是实现类 , ConditionObject , 我们就主要说说它 :

Node 1 : 属性对象

1
2
3
4
java复制代码// condition queue 第一个节点
private transient Node firstWaiter;
// condition queue 最后一个节点
private transient Node lastWaiter;

Node 2 : 核心方法 doSignal + doSignalAll

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
java复制代码
// doSignal : 删除和传输节点,直到碰到非取消的1或null
private void doSignal(Node first) {
do {
// 先判断是否为头节点或者null
// 注意其中的 = 是赋值 : !transferForSignal(first) &&(first = firstWaiter) != null
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
// transferForSignal 是 AQS 方法, 将节点从条件队列转移到同步队列 , 主要是 CAS 操作修改状态
// Node p = enq(node); 这里面是一个Node 拼接操作 , 其实可以理解为已经将 Node 加入对应的队列里面了
} while (!transferForSignal(first) &&(first = firstWaiter) != null);
}



// doSignalAll : 删除和传输所有节点 , 注意区别 , 这里不像 Notify 是通知是有线程去获取
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}

Node 3 : 主要方法 await

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
java复制代码public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// Step 1 : 添加到 Condition 队列
Node node = addConditionWaiter();
// Step 2 : 使用当前状态值调用release , 且返回保存的状态
int savedState = fullyRelease(node);
int interruptMode = 0;
// 如果一个节点(总是最初放置在条件队列上的节点)现在正在同步队列上等待重新获取,则返回true
// 即如果有节点等待
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 以独占不可中断模式获取已经在队列中的线程。
// 用于条件等待方法和获取方法。
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

// awaitNanos(long nanosTimeout) : 定时条件等待
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();


// awaitUntil(Date deadline) : 实现绝对定时条件等待 , 即一个定时操作
// 超时后直接传输节点
if (System.currentTimeMillis() > abstime) {
timedout = transferAfterCancelledWait(node);
break;
}

Node 4 : Release 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
java复制代码    // 使用当前状态值调用release;返回保存的状态。
// 取消节点并在失败时抛出异常。
final int fullyRelease(Node node) {
boolean failed = true;
try {
// 获取状态
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
// 设置当前状态为取消
node.waitStatus = Node.CANCELLED;
}
}


public final boolean release(int arg) {
// 实现具体重写
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

Node 5 : 其他方法 :

  • 方法 addConditionWaiter : 增加了一个新的服务员等待队列。
  • awaitUninterruptibly :实现不可中断条件等待
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码public final void awaitUninterruptibly() {
Node node = addConditionWaiter();
// 保存的状态作为参数 , 如果失败,抛出IllegalMonitorStateException
int savedState = fullyRelease(node);
boolean interrupted = false;
// 阻塞直到有信号
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if (Thread.interrupted())
interrupted = true;
}
// acquireQueued : 保存的state作为参数 , 以独占不可中断模式获取已经在队列中的线程 , 重新获取
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}

总结

写到这里 , 多线程系列准备告一段落了 , 其实整个系列写了一半不到 , 还远远没完 . 但是最近拜读了Java并发编程的艺术后 ,感觉自己对多线程的理解还远远不够 , 所以决定花一段时间再深读一下 , 从而充实整个文档 .

参考及感谢

多线程集合

芋道源码

死磕系列

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…697698699…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%