开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

4种方案,帮你解决Maven创建项目过慢问题

发表于 2021-11-29

在实际开发中,我们通常会用到maven的archetype插件(原型框架)来生成项目框架。但是无奈,创建时,总会卡在

1
csharp复制代码[INFO] Generating project in Batch mode

等很久才构建完,有时还会报错,导致构建失败!

不多废话了,直接上解决办法吧:

解决办法1

在创建Maven项目时加上archetypeCatalog=internal参数,点击“+”添加参数archetypeCatalog=internal。 如下:

图片

图片

解决办法2

在Maven的VM Options加上-DarchetypeCatalog=internal或-DarchetypeCatalog=local参数, 如下:Settings -> Build, Execution, Deployment -> Build Tools -> Maven -> Runner -> VM Options

图片

解决办法3

(1)准备archetype-catalog.xml文件

方式1:通过浏览器查看repo1.maven.org/maven2/arch…

方式2:通过命令行 curl http://repo1.maven.org/maven2/archetype-catalog.xml > archetype-catalog.xml

图片

(2)将上述文件archetype-catalog.xml放置到maven的默认路径下

注意,这不是指的安装路径,而是mvn运行时默认的存放repository的路径,一般在用户根目录下的一个隐藏目录,~/.m2。如果做过更改,可以在maven的设置文件中查看具体在哪个位置,设置文件在maven安装目录/config/settings.xml中。

图片

(3)在构建时,在archetype:generate后加上 -DarchetypeCatalog=local参数。IDE如eclipse和idea,都是可以在创建maven工程时设置构建参数的。

解决方法4

修改下载下来的maven目录下面conf/settings.xml文件,修改镜像源,使用国内阿里巴巴的镜像源;

1
2
3
4
5
6
xml复制代码<mirror>
        <id>alimaven</id>
        <name>aliyun maven</name>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        <mirrorOf>central</mirrorOf>        
</mirror>

图片

测试一下:

图片

再测试一次:我的天啊,1.146 s完成。

图片

希望能给大家带来帮助吧!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

百度商业大规模高性能全息日志检索技术揭秘

发表于 2021-11-29

图片

一、背景介绍

百度商业产品是服务于百度广告主用来投放广告而打造的产品生态。包含搜索推广、信息流推广、品牌等推广渠道以及观星盘、基木鱼等营销工具。

图片

百度商业产品全景图

这一系列商业产品底层多为复杂的 Java 业务系统。复杂性主要体现在底层微服务子系统多、应用间调用关系复杂、基础组件依赖多。复杂性高就就意味着容易出问题,并且出了问题定位困难。但是这些产品出问题会直接导致广告主是否成功投放广告或者修改出价、创意等操作失败。

「如果有过在广告业务系统一线工作经历的同学,应该深知排查线上问题的枯燥和耗时」

如何在出问题第一时间定位问题,从而快速止损和修复问题,是商业产品系统中一个关键的技术痛点。为了解决这个痛点,百度商业平台部打造了大规模分布式微服务监控系统。公众号前文**「[百度商业大规模微服务业务监控系统-凤睛](http://mp.weixin.qq.com/s?__biz=Mzg5MjU0NTI5OQ==&mid=2247491371&idx=1&sn=695d770526e5d151eec512b5489c835f&chksm=c03d2f57f74aa64123dbb09b076955545251780c95f7af7dde9a6efaa9cfbc6a83d23623178e&scene=21#wechat_redirect)」**已经讲述了凤睛如何通过自研无侵入探针以及高性能调用链存储系统为百度各业务线提供微服务系统性能指标、业务黄金指标、健康状况、监控告警等。

当收到线上报警时,值班同学需要首先找到出问题的根因模块,然后找出该模块出错的服务接口,最后定位到问题代码行栈。凤睛提供了调用链数据能够查看各个调用环节的状态码、耗时等,同时也收录了业务系统打印的错误堆栈。

图片

凤睛提供的调用链表格视图(红色箭头标识的是耗时长关键路径)

大部分情况下,通过调用链以及系统打印的错误堆栈可以确定问题。但是,部分情况下问题与用户请求返回、业务访问缓存的情况等等比较特殊的场景有关。这需要通过系统打印的业务日志辅助定位。

凤睛并没有采集和存储业务日志,这与数据量有关,它部署在数千个微服务子系统上,运行在数万个容器中。每天采集的调用链数据条数达千亿,日存储数据在TB级别。而更加庞大、抽象程度最低的业务日志,预估单日总量接近PB级别,存储开销实在太大。

二、技术原理

传统做法:为了检索到单个请求相关的所有业务日志,日志会被采集走存储在 ES 里面提供检索功能。

图片

日志采集架构

诚然,Kibana+ES会提供更丰富灵活的检索功能,但是对于凤睛这种平台级别监控系统,基本不可行。ES的资源成本过于昂贵,整个平台单日日志数据接近 PB。如果全部存储在 ES 中,那么集群资源消耗以及维护成本都是很高的。并且,单纯定位线上问题,并不需要特别复杂的日志检索功能。

那么能否在少量资源消耗下,满足用户可以看到单个请求相关的完整调用链以及业务日志呢?

凤睛整个迭代过程就是不断利用有限资源来创造性解决实际问题的过程。而真正好的系统架构亦然如此,要**「因地制宜」。在阿里巴巴钟华同学的《企业IT架构转型之道:阿里巴巴中台战略思想与架构实战》一书中,开篇就提到过「好的创新一定是基于企业现状因地制宜」。**

目前商业平台 Java 系统统一部署在企业级微服务托管平台 Jarvis上,同时凤睛探针能够无侵入式跟踪和采集系统的动作,这是我们的技术优势。能否利用这个优势,来规避掉存储资源有限的短板。探针既然能够记录系统在每个请求发生过程中的所有的动作;那么同样可以记录系统打印日志的动作。

图片

全息日志技术思路示意图

我们通过探针记录下请求相关的业务日志文件名、日志偏移量,并且存储数据库中。当用户在Jarvis管理端检索调用链相关的业务日志时,系统会先通过调用链 ID 去获取相关的虚拟容器地址、日志文件名、日志偏移量等元数据信息,然后通过这些元数据去具体的容器中取到完整的日志内容,最后展现给用户。

图片

全息日志实际产品效果图

这样虽然我们只消耗少量的存储和计算资源,也可以轻松检索到海量调用链相关的业务日志。这个受限于日志在容器中的实际存储时间,但是线上问题很少需要借助远久历史日志来分析定位。绝大多数情况下借助当前日志就可以满足需求了。

「全息日志技术是凤睛自研技术,也申请了相关专利」

三、算法实现

全息日志技术设计中分为两个主要的部分:

  1. 日志元数据采集:拦截打印日志的前后操作,进行元数据采集。
  2. 元数据解析:解析元数据定位出日志文件当前位置以及日志所在文件的位置。

在通常情况下,一份日志消息可能打印到多个日志文件中,日志文件可能根据配置的滚动策略进行基于时间或者大小的滚动,不同的时间进行日志检索需要能够自动区分出日志当前所在的实际位置,用户不需要感知底层日志文件位置变化。

在设计和实现中关键问题的解决:

  1. 元数据采集的性能和准确性问题

为了保证元数据能够被准确的采集,需要基于凤睛探针拦截打印日志的方法。

图片

元数据项与采集原理图

  • 在原始打印日志操作之前插入字节码,记录开始时间、读文件标识符获取打印前文件偏移量、文件的滚动策略(包括文件最大大小、文件滚动时间等)、日志级别;
  • 在原始打印日志操作之后插入字节码,记录结束时间、读文件标识符获取打印后文件偏移量、日志内容当前写入的文件、文件按滚动策略归档后策略、同名归档时归档序号等.
  • 在采集文件偏移量时通过直接读取文件描述符而不是直接读文件内容来提高性能。同时,在日志打印内容里注入每次调用唯一的traceId做更精准的标注,其他数据的采集用来日志检索时解析使用。
  1. 元数据解析

用户发起日志检索时,使用算法解析出此时此刻日志内容所在位置。

图片

检索算法流程图

  • 根据调用链traceId查询出与该traceId相同的所以日志元数据记录;
  • 分别获取日志打印结束时间、日志打印时当前文件名、文件归档策略;
  • 解析归档策略;
  • 根据归档策略注入不同基准参数,模拟出一个日志打印器,以此获取到此时此刻文件位置;
  • 根据文件位置,以及文件前后偏移量,读取出两个偏移量之前的日志内容;
  • 用traceId进行内容校准.

四、结语

凤睛通过自研的全息日志技术能够帮助业务方快速检索到业务请求相关的完整调用链以及完整的业务日志。作为分布式追踪系统,我们也补齐了追踪领域最后一块短板。但是业务系统的复杂性,也决定了凤睛作为一个平台化的业务监控产品所面临的诸多挑战。

作者介绍:

李奇原,百度商业平台研发部-资深研发工程师

前后负责过商业平台API网关、商业平台部微服务凤睛监控系统。对构建高性能、高可用分布式系统有较多实践和较深入的理解。

推荐阅读:

|深入理解 WKWebView(入门篇)—— WebKit 源码调试与分析

|快速剪辑-助力度咔智能剪辑提效实践

|短视频个性化Push工程精进之路

———- END ———-

百度 Geek 说

百度官方技术公众号上线啦!

技术干货 · 行业资讯 · 线上沙龙 · 行业大会

招聘信息 · 内推信息 · 技术书籍 · 百度周边

欢迎各位同学关注

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

『Netty核心』数据交互源码解读

发表于 2021-11-29

「这是我参与11月更文挑战的第30天,活动详情查看:2021最后一次更文挑战」。

点赞再看,养成习惯👏👏

1、NioEventLoopGroup

1
java复制代码EventLoopGroup bossGroup = new NioEventLoopGroup(1);

初始化的时候如果有传线程数量的话以传的为主,如果没有传含有的子线程NioEventLoop的个数默认为cpu核数的两倍。

1
2
3
4
5
6
7
java复制代码private static final int DEFAULT_EVENT_LOOP_THREADS = 
Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads",
NettyRuntime.availableProcessors() * 2));

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

这里的children是NioEventLoopGroup的一个成员变量,通过for循环一个个赋值,

1
2
java复制代码this.children = new EventExecutor[nThreads];
this.children[i] = this.newChild((Executor)executor, args);

所以跟上面介绍的类似,一个NioEventLoopGroup里面有多个NioEventLoop

1
2
3
4
5
java复制代码NioEventLoopGroup.class
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider)args[0]
((SelectStrategyFactory)args[1]).newSelectStrategy(), (RejectedExecutionHandler)args[2]);
}

一个NioEventLoop里面有Selector和TaskQueue

1
2
3
4
5
6
7
8
9
10
java复制代码SingleThreadEventExecutor.class
this.taskQueue = this.newTaskQueue(this.maxPendingTasks);
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
return new LinkedBlockingQueue(maxPendingTasks);
}

NioEventLoop.class
this.provider = selectorProvider;
NioEventLoop.SelectorTuple selectorTuple = this.openSelector();//调用nio方法
this.selector = selectorTuple.selector;

2、ServerBootstrap

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码//创建服务器端的启动对象
ServerBootstrap bootstrap = new ServerBootstrap();
//使用链式编程来配置参数
bootstrap.group(bossGroup,workerGroup) //设置两个线程组
.channel(NioServerSocketChannel.class) //使用NioServerSocketChannel作为服务器的通道实现
//初始化服务器连接队列大小,服务器处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。
//多个客户端同时来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理
.option(ChannelOption.SO_BACKLOG,1024)
.childHandler(new ChannelInitializer<SocketChannel>() { //创建通道初始化对象,设置初始化参数
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//对workGroup的SocketChannel设置处理器
socketChannel.pipeline().addLast(new NettyServerHandler());
}
});

上面本质就是通过链式编程的形式给ServerBootstrap的各个成员变量赋初值,以便在后续使用。

3、NioServerSocketChannel

NioServerSocketChannel初始化主要返回ServerSocketChannel,这步也是对nio的封装。

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码NioServerSocketChannel.class
private static java.nio.channels.ServerSocketChannel newSocket(SelectorProvider provider) {
try {
//ServerSocketChannel serverSocket = ServerSocketChannel.open();
return provider.openServerSocketChannel();
} catch (IOException var2) {
throw new ChannelException("Failed to open a server socket.", var2);
}
}

public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

同时也执行this.pipeline = this.newChannelPipeline();为该成员变量pipeline赋初始值,通过pipeline将各个handler串连起来,这里只是初始化tail和head,后续调用的时候会使用到。

1
2
3
4
5
6
7
8
9
java复制代码protected DefaultChannelPipeline(Channel channel) {
this.channel = (Channel)ObjectUtil.checkNotNull(channel, "channel");
this.succeededFuture = new SucceededChannelFuture(channel, (EventExecutor)null);
this.voidPromise = new VoidChannelPromise(channel, true);
this.tail = new DefaultChannelPipeline.TailContext(this);
this.head = new DefaultChannelPipeline.HeadContext(this);
this.head.next = this.tail;
this.tail.prev = this.head;
}

同时也将该channel设置为非阻塞和接收连接,也是对nio的封装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
//readInterestOp传进来是16也就是SelectionKey.OP_ACCEPT,这里只是普通的赋值,还没进行绑定
this.readInterestOp = readInterestOp;

try {
//serverSocket.configureBlocking(false);
ch.configureBlocking(false);
} catch (IOException var7) {
try {
ch.close();
} catch (IOException var6) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close a partially initialized socket.", var6);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", var7);
}
}

4、源码剖析图:

image.png

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

JSON解析教程(Gson、FastJson、Jackson

发表于 2021-11-29

​

JSON即JavaScript Object Notation(JavaScript对象表示法),现在常被作为不同程序之间通信的语言。算是机器之间交流的语法规则的一种,对于程序员来讲掌握JSON的解析变得非常重要,这将有助于我们和程序之间的沟通。

一个简单的JSON案例如下:

1
2
3
4
5
6
7
8
9
10
11
12
json复制代码{
"name":"小王同学",
"age":18,
"pengyou":["张三","李四","王二","麻子",{
"name":"野马老师",
"info":"像匹野马一样狂奔在技术钻研的道路上"
}],
"heihei":{
"name":"大长刀",
"length":"40m"
}
}

下面介绍三种常用的解析方法:

1 Gson

)​

  • 将对象转换为JSON字符串
  1. 引入JAR包
  2. 在需要转换JSON字符串的位置编写如下代码即可:
1
java复制代码String json = new Gson().toJSON(要转换的对象);

  • 将JSON字符串转换为对象
  1. 引入JAR包
  2. 在需要转换Java对象的位置, 编写如下代码
1
java复制代码对象 = new Gson().fromJson(JSON字符串,对象类型.class);

案例Demo:

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码public class Demo1 {
public static void main(String[] args) {
// 创建json对象
Gson gson = new Gson();
// 转换成json {"id":"100","name":"金苹果","info":"种植苹果"}
Book book = new Book("100", "金苹果", "种植苹果");
String s = gson.toJson(book);
System.out.println(s);
// json转字符串
Book book1 = gson.fromJson(s, Book.class);
System.out.println(book1);
}
}

)​

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码public static void main(String[] args) {
// 创建json对象
Gson gson = new Gson();
// json转字符串 {"id":"100","name":"金苹果","info":"种植苹果","page":["锄禾日当午","汗滴禾下土"]}
HashMap data = gson.fromJson("{"id":"100","name":"金苹果","info":"种植苹果","page":["锄禾日当午","汗滴禾下土"]}", HashMap.class);
System.out.println(data.get("page"));
// json中的数组会被转成List集合
System.out.println(data.get("page").getClass());

List page = (List) data.get("page");
System.out.println(page.get(1));
}

)​

2 FastJson

)​

  • 将对象转换为JSON字符串

转换JSON字符串的步骤:

  1. 引入JAR包
  2. 在需要转换JSON字符串的位置编写如下代码即可
  • 将JSON字符串转换为对象
  1. 引入JAR包
  2. 在需要转换Java对象的位置, 编写如下代码:
1
java复制代码类型 对象名=JSON.parseObject(JSON字符串, 类型.class);

或

1
java复制代码List<类型> list=JSON.parseArray(JSON字符串,类型.class);

案例Demo:

1
2
3
4
5
6
7
8
9
10
java复制代码public static void main(String[] args) {
Book book = new Book("1002", "唐诗300首","床前明月光");
// 转换 {"id":"1002","info":"床前明月光","name":"唐诗300首"}
String json = JSON.toJSONString(book);
System.out.println(json);

// json转对象
Book book1 = JSON.parseObject("{"id":"1002","info":"床前明月光","name":"唐诗300首"}", Book.class);
System.out.println(book1.getName());
}

)​

1
2
3
4
5
java复制代码public static void main(String[] args) {
// json转数组 ["一二三", "二三四", "三四五"]
List<String> strings = JSON.parseArray("["一二三", "二三四", "三四五"]", String.class);
System.out.println(strings.get(2));
}

)​

3 Jackson

  • 将JSON字符串转换为对象

步骤:

  1. 导入jackson的相关jar包

)​

  1. 创建Jackson核心对象 ObjectMapper
  2. 调用ObjectMapper的相关方法进行转换
1
java复制代码readValue(json字符串数据,Class)

1
2
3
4
5
6
7
8
java复制代码@Test
public void tes5() throws Exception {
String json = "{"gender":"男","name":"张三","age":23}";

ObjectMapper mapper = new ObjectMapper();
Person person = mapper.readValue(json, Person.class);
System.out.println(person);// 输出Person类的toString()
}

  • Java对象转换JSON
  1. 导入jackson的相关jar包
  2. 创建Jackson核心对象 ObjectMapper
  3. 调用ObjectMapper的相关方法进行转换
1
java复制代码writeValue(参数1,obj):

参数1:

File:将obj对象转换为JSON字符串,并保存到指定的文件中

Writer:将obj对象转换为JSON字符串,并将json数据填充到字符输出流中

OutputStream:将obj对象转换为JSON字符串,并将json数据填充到字节输出流中

1
java复制代码writeValueAsString(obj):将对象转为json字符串

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码Person p = new Person();
p.setName("张三");
p.setAge(23);
p.setGender("男");

ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(p);
System.out.println(json);

//writeValue,将数据写到d://a.txt文件中
mapper.writeValue(new File("d://a.txt"), p);

//writeValue,将数据关联到Writer中
mapper.writeValue(new FileWriter("d://b.txt"),p);

案例:校验用户名是否存在

服务器响应的数据,在客户端使用时,要想当做json数据格式使用。有两种解决方案:

  1. $.get(type):将最后一个参数type指定为”json”
  2. 在服务器端设置MIME类型
1
java复制代码response.setContentType("application/json;charset=utf-8");

前端界面:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
html复制代码<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>注册页面</title>
<script src="js/jquery-3.3.1.min.js"></script>
<script>
$(function () {
$("#username").blur(function () {
var username = $(this).val();
// 发送ajax请求
//期望服务器响应回的数据格式:{"userExsit":true,"msg":"此用户名太受欢迎,请更换一个"}
// {"userExsit":false,"msg":"用户名可用"}
$.get("findUserServlet",{username:username}, function (data) {
var span = $("#s_username");

if (data.userExsit){
span.css("color","red");
span.html(data.msg);
}else {
span.css("color","green");
span.html(data.msg);
}
},"json");
});
});
</script>
</head>
<body>
<form>

<input type="text" id="username" name="username" placeholder="请输入用户名">
<span id="s_username"></span>
<br>
<input type="password" name="password" placeholder="请输入密码"><br>
<input type="submit" value="注册"><br>
</form>
</body>
</html>

相应的servlet:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
java复制代码@WebServlet("/findUserServlet")
public class FindUserServlet extends HttpServlet {
protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
// 1.获取用户名
String username = request.getParameter("username");

// 2.调用service层判断用户名是否存在
//设置响应的数据格式为json
response.setContentType("application/json;charset=utf-8");

Map<String , Object> map = new HashMap<>();
if("tom".equals(username)){
//存在
map.put("userExsit",true);
map.put("msg","此用户名太受欢迎,请更换一个");
}else{
//不存在
map.put("userExsit",false);
map.put("msg","用户名可用");
}

//将map转为json,并且传递给客户端
//将map转为json
ObjectMapper mapper = new ObjectMapper();
//并且传递给客户端
mapper.writeValue(response.getWriter(),map);

}

protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
this.doPost(request, response);
}
}

\

​

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

如何避免无意间创建多余对象 6 避免创建不必要的对象

发表于 2021-11-29

看完这本《Effective Java》,我悟了

6 避免创建不必要的对象

从字面意思上来看,大家肯定都知道创建不必要的对象是错误的做法。但这一节其实主要是提醒我们避免无意识的创建不必要对象的代码写法。

例1:

1
java复制代码String s = new String("abc");

是错误的写法,正确的写法应该是:

1
java复制代码String s = "abc";

原因是第一种写法每次被执行的时候都会创建一个新的String实例,但这些全都是重复的!

例2:

我们要优先使用静态工厂方法而不是构造器来避免创建不必要的对象,如Boolean.valueOf(String)总是要优先于构造器Boolean(String)使用。因为构造器每次被调用都会创建一个新对象,静态工厂不这样。

例3:
创建成本昂贵的对象时,应该将其缓存起来。

例如正则表达式匹配的代码中,String.matches方法内部创建了一个Pattern实例,这个创建的成本很高,因为需要将正则表达式编译成有限状态机,所以应该将其缓存起来:

1
2
3
4
5
6
7
java复制代码public class RomanNumerals {
private static final Pattern ID = Pattern.compile("^\d{15}$)|(^\d{18}$)|(^\d{17}(\d|X|x)$");

static boolean isRomanNumeral(String s){
return ID.matcher(s).matches();
}
}

这样一来,每次调用isRomanNumeral时都会重用同一个ID实例

例4:
上面的Pattern实例是不变的,但在某些场景下实例是可变的,这时就可以考虑适配器。适配器是这样一个对象:它将功能委托给一个后备对象,为后备对象提供一个替代前面功能的接口。

例如Map接口的KeySet方法,每次调用返回的都是同一个Set实例,虽然Set实例是可变的,但其中一个变化时其他的也会跟着变,因为他们本身就是一个。

例5:
优先使用基本类型而不是装箱类型,原因在于下面这个例子:

1
2
3
4
5
6
7
java复制代码private static long sum(){
Long sum = 0L;
for(long i = 0; i <= Integer.MAX_VALUE; i ++)
sum += i;

return sum;
}

这段程序执行起来没有任何问题,但实际情况会慢一点,因为sum的类型是Long而不是long,所以程序构造了大约2^31个Long实例。

这一点在我记忆中和工作里的要求不一致,为此我专门去翻阅了阿里巴巴Java开发手册,里面是这样描写的:
在这里插入图片描述
可见公司在这个问题的考虑上是业务优先了,所以小伙伴们可以斟酌使用时的取舍,我个人还是推荐使用包装类型的。

避免一个误区:
不要看完这一章节就陷入了创建对象的代价非常昂贵的逻辑怪圈里去了,反之维护自己的对象池来避免创建对象是一种错误的做法。因为现代JVM的实现里有高度优化的垃圾收集器,其性能很容易就超过了轻量级对象池的性能。

一个正确的示例是数据库连接池,因为建立一个数据库的连接是非常昂贵的。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

打通JAVA与内核系列之一ReentrantLock锁的实现

发表于 2021-11-29

简介:写JAVA代码的同学都知道,JAVA里的锁有两大类,一类是synchronized锁,一类是concurrent包里的锁(JUC锁)。其中synchronized锁是JAVA语言层面提供的能力,在此不展开,本文主要讨论JUC里的ReentrantLock锁。

作者 | 蒋冲

来源 | 阿里技术公众号

写JAVA代码的同学都知道,JAVA里的锁有两大类,一类是synchronized锁,一类是concurrent包里的锁(JUC锁)。其中synchronized锁是JAVA语言层面提供的能力,在此不展开,本文主要讨论JUC里的ReentrantLock锁。

一 JDK层

1 AbstractQueuedSynchronizer

ReentrantLock的lock(),unlock()等API其实依赖于内部的Synchronizer(注意,不是synchronized)来实现。Synchronizer又分为FairSync和NonfairSync,顾名思义是指公平和非公平。

当调用ReentrantLock的lock方法时,其实就只是简单地转交给Synchronizer的lock()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
scala复制代码代码节选自:java.util.concurrent.locks.ReentrantLock.java
/** Synchronizer providing all implementation mechanics */
private final Sync sync;
/**
* Base of synchronization control for this lock. Subclassed
* into fair and nonfair versions below. Uses AQS state to
* represent the number of holds on the lock.
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
......
}

public void lock() {
sync.lock();
}

那么这个sync又是什么?我们看到Sync 继承自AbstractQueueSynchronizer(AQS),AQS是concurrent包的基石,AQS本身并不实现任何同步接口(比如lock,unlock,countDown等等),但是它定义了一个并发资源控制逻辑的框架(运用了template method 设计模式),它定义了acquire和release方法用于独占地(exclusive)获取和释放资源,以及acquireShared和releaseShared方法用于共享地获取和释放资源。比如acquire/release用于实现ReentrantLock,而acquireShared/releaseShared用于实现CountDownLacth,Semaphore。比如acquire的框架如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码    /**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

整体逻辑是,先进行一次tryAcquire,如果成功了,就没啥事了,调用者继续执行自己后面的代码,如果失败,则执行addWaiter和acquireQueued。其中tryAcquire()需要子类根据自己的同步需求进行实现,而acquireQueued() 和addWaiter() 已经由AQS实现。addWaiter的作用是把当前线程加入到AQS内部同步队列的尾部,而acquireQueued的作用是当tryAcquire()失败的时候阻塞当前线程。

addWaiter的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
java复制代码/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
//创建节点,设置关联线程和模式(独占或共享)
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
// 如果尾节点不为空,说明同步队列已经初始化过
if (pred != null) {
//新节点的前驱节点设置为尾节点
node.prev = pred;
// 设置新节点为尾节点
if (compareAndSetTail(pred, node)) {
//老的尾节点的后继节点设置为新的尾节点。 所以同步队列是一个双向列表。
pred.next = node;
return node;
}
}
//如果尾节点为空,说明队列还未初始化,需要初始化head节点并加入新节点
enq(node);
return node;
}

enq(node)的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
// 如果tail为空,则新建一个head节点,并且tail和head都指向这个head节点
//队列头节点称作“哨兵节点”或者“哑节点”,它不与任何线程关联
if (compareAndSetHead(new Node()))
tail = head;
} else {
//第二次循环进入这个分支,
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

addWaiter执行结束后,同步队列的结构如下所示:

acquireQueued的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
java复制代码 /**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//获取当前node的前驱node
final Node p = node.predecessor();
//如果前驱node是head node,说明自己是第一个排队的线程,则尝试获锁
if (p == head && tryAcquire(arg)) {
//把获锁成功的当前节点变成head node(哑节点)。
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

acquireQueued的逻辑是:

判断自己是不是同步队列中的第一个排队的节点,则尝试进行加锁,如果成功,则把自己变成head node,过程如下所示:

如果自己不是第一个排队的节点或者tryAcquire失败,则调用shouldParkAfterFailedAcquire,其主要逻辑是使用CAS将节点状态由 INITIAL 设置成 SIGNAL,表示当前线程阻塞等待SIGNAL唤醒。如果设置失败,会在 acquireQueued 方法中的死循环中继续重试,直至设置成功,然后调用parkAndCheckInterrupt 方法。parkAndCheckInterrupt的作用是把当前线程阻塞挂起,等待唤醒。parkAndCheckInterrupt的实现需要借助下层的能力,这是本文的重点,在下文中逐层阐述。

2 ReentrantLock

下面就让我们一起看看ReentrantLock是如何基于AbstractQueueSynchronizer实现其语义的。

ReentrantLock内部使用的FairSync和NonfairSync,它们都是AQS的子类,比如FairSync的主要代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
scala复制代码/**
* Sync object for fair locks
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;

final void lock() {
acquire(1);
}

/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

AQS中最重要的一个字段就是state,锁和同步器的实现都是围绕着这个字段的修改展开的。AQS可以实现各种不同的锁和同步器的原因之一就是,不同的锁或同步器按照自己的需要可以对同步状态的含义有不同的定义,并重写对应的tryAcquire, tryRelease或tryAcquireshared, tryReleaseShared等方法来操作同步状态。

我们来看看ReentrantLock的FairSync的tryAcquire的逻辑:

  1. 如果此时state(private volatile int state)是0,那么就表示这个时候没有人占有锁。但因为是公平锁,所以还要判断自己是不是首节点,然后才尝试把状态设置为1,假如成功的话,就成功的占有了锁。compareAndSetState 也是通过CAS来实现。CAS 是原子操作,而且state的类型是volatile,所以state 的值是线程安全的。
  2. 如果此时状态不是0,那么再判断当前线程是不是锁的owner,如果是的话,则state 递增,当state溢出时,会抛错。如果没溢出,则返回true,表示成功获取锁。
  3. 上述都不满足,则返回false,获取锁失败。

至此,JAVA层面的实现基本说清楚了,小结一下,整个框架如下所示:

关于unlock的实现,限于篇幅,就不讨论了,下文重点分析lock过程中是如何把当前线程阻塞挂起的,就是上图中的unsafe.park()是如何实现的。

二 JVM层

Unsafe.park和Unsafe.unpark 是sun.misc.Unsafe类的native 方法,

1
2
3
java复制代码public native void unpark(Object var1);

public native void park(boolean var1, long var2);

这两个方法的实现是在JVM的hotspot/src/share/vm/prims/unsafe.cpp 文件中,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
scss复制代码UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time))
UnsafeWrapper("Unsafe_Park");
EventThreadPark event;
#ifndef USDT2
HS_DTRACE_PROBE3(hotspot, thread__park__begin, thread->parker(), (int) isAbsolute, time);
#else /* USDT2 */
HOTSPOT_THREAD_PARK_BEGIN(
(uintptr_t) thread->parker(), (int) isAbsolute, time);
#endif /* USDT2 */
JavaThreadParkedState jtps(thread, time != 0);
thread->parker()->park(isAbsolute != 0, time);
#ifndef USDT2
HS_DTRACE_PROBE1(hotspot, thread__park__end, thread->parker());
#else /* USDT2 */
HOTSPOT_THREAD_PARK_END(
(uintptr_t) thread->parker());
#endif /* USDT2 */
if (event.should_commit()) {
const oop obj = thread->current_park_blocker();
if (time == 0) {
post_thread_park_event(&event, obj, min_jlong, min_jlong);
} else {
if (isAbsolute != 0) {
post_thread_park_event(&event, obj, min_jlong, time);
} else {
post_thread_park_event(&event, obj, time, min_jlong);
}
}
}
UNSAFE_END

核心是逻辑是thread->parker()->park(isAbsolute != 0, time); 就是获取java线程的parker对象,然后执行它的park方法。每个java线程都有一个Parker实例,Parker类是这样定义的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
ini复制代码class Parker : public os::PlatformParker {
private:
volatile int _counter ;
...
public:
void park(bool isAbsolute, jlong time);
void unpark();
...
}
class PlatformParker : public CHeapObj<mtInternal> {
protected:
enum {
REL_INDEX = 0,
ABS_INDEX = 1
};
int _cur_index; // which cond is in use: -1, 0, 1
pthread_mutex_t _mutex [1] ;
pthread_cond_t _cond [2] ; // one for relative times and one for abs.

public: // TODO-FIXME: make dtor private
~PlatformParker() { guarantee (0, "invariant") ; }

public:
PlatformParker() {
int status;
status = pthread_cond_init (&_cond[REL_INDEX], os::Linux::condAttr());
assert_status(status == 0, status, "cond_init rel");
status = pthread_cond_init (&_cond[ABS_INDEX], NULL);
assert_status(status == 0, status, "cond_init abs");
status = pthread_mutex_init (_mutex, NULL);
assert_status(status == 0, status, "mutex_init");
_cur_index = -1; // mark as unused
}
};

park方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
ini复制代码void Parker::park(bool isAbsolute, jlong time) {
// Return immediately if a permit is available.
// We depend on Atomic::xchg() having full barrier semantics
// since we are doing a lock-free update to _counter.
if (Atomic::xchg(0, &_counter) > 0) return;

Thread* thread = Thread::current();
assert(thread->is_Java_thread(), "Must be JavaThread");
JavaThread *jt = (JavaThread *)thread;

if (Thread::is_interrupted(thread, false)) {
return;
}

// Next, demultiplex/decode time arguments
timespec absTime;
if (time < 0 || (isAbsolute && time == 0) ) { // don't wait at all
return;
}
if (time > 0) {
unpackTime(&absTime, isAbsolute, time);
}

////进入safepoint region,更改线程为阻塞状态
ThreadBlockInVM tbivm(jt);
// Don't wait if cannot get lock since interference arises from
// unblocking. Also. check interrupt before trying wait
if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) {
//如果线程被中断,或者尝试给互斥变量加锁时失败,比如被其它线程锁住了,直接返回
return;
}
//到这里,意味着pthread_mutex_trylock(_mutex)成功
int status ;
if (_counter > 0) { // no wait needed
_counter = 0;
status = pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
OrderAccess::fence();
return;
}

#ifdef ASSERT
// Don't catch signals while blocked; let the running threads have the signals.
// (This allows a debugger to break into the running thread.)
sigset_t oldsigs;
sigset_t* allowdebug_blocked = os::Linux::allowdebug_blocked_signals();
pthread_sigmask(SIG_BLOCK, allowdebug_blocked, &oldsigs);
#endif
OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */);
jt->set_suspend_equivalent();
// cleared by handle_special_suspend_equivalent_condition() or java_suspend_self()

assert(_cur_index == -1, "invariant");
if (time == 0) {
_cur_index = REL_INDEX; // arbitrary choice when not timed
status = pthread_cond_wait (&_cond[_cur_index], _mutex) ;
} else {
_cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;
status = os::Linux::safe_cond_timedwait (&_cond[_cur_index], _mutex, &absTime) ;
if (status != 0 && WorkAroundNPTLTimedWaitHang) {
pthread_cond_destroy (&_cond[_cur_index]) ;
pthread_cond_init (&_cond[_cur_index], isAbsolute ? NULL : os::Linux::condAttr());
}
}
_cur_index = -1;
assert_status(status == 0 || status == EINTR ||
status == ETIME || status == ETIMEDOUT,
status, "cond_timedwait");

#ifdef ASSERT
pthread_sigmask(SIG_SETMASK, &oldsigs, NULL);
#endif

_counter = 0 ;
status = pthread_mutex_unlock(_mutex) ;
assert_status(status == 0, status, "invariant") ;
// Paranoia to ensure our locked and lock-free paths interact
// correctly with each other and Java-level accesses.
OrderAccess::fence();

// If externally suspended while waiting, re-suspend
if (jt->handle_special_suspend_equivalent_condition()) {
jt->java_suspend_self();
}
}

park的思路:parker内部有个关键字段_counter, 这个counter用来记录所谓的“permit”,当_counter大于0时,意味着有permit,然后就可以把_counter设置为0,就算是获得了permit,可以继续运行后面的代码。如果此时_counter不大于0,则等待这个条件满足。

下面我具体来看看park的具体实现:

  1. 当调用park时,先尝试能否直接拿到“许可”,即_counter>0时,如果成功,则把_counter设置为0,并返回。
  2. 如果不成功,则把线程的状态设置成_thread_in_vm并且_thread_blocked。_thread_in_vm 表示线程当前在JVM中执行,_thread_blocked表示线程当前阻塞了。
  3. 拿到mutex之后,再次检查_counter是不是>0,如果是,则把_counter设置为0,unlock mutex并返回
  4. 如果_counter还是不大于0,则判断等待的时间是否等于0,然后调用相应的pthread_cond_wait系列函数进行等待,如果等待返回(即有人进行unpark,则pthread_cond_signal来通知),则把_counter设置为0,unlock mutex并返回。

所以本质上来讲,LockSupport.park 是通过pthread库的条件变量pthread_cond_t来实现的。下面我们就来看看pthread_cond_t 是怎么实现的。

三 GLIBC 层

pthread_cond_t 典型的用法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
scss复制代码#include < pthread.h>
#include < stdio.h>
#include < stdlib.h>

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; /*初始化互斥锁*/
pthread_cond_t cond = PTHREAD_COND_INITIALIZER; //初始化条件变量

void *thread1(void *);
void *thread2(void *);

int i=1;
int main(void)
{
pthread_t t_a;
pthread_t t_b;
pthread_create(&t_a,NULL,thread1,(void *)NULL);/*创建进程t_a*/
pthread_create(&t_b,NULL,thread2,(void *)NULL); /*创建进程t_b*/
pthread_join(t_b, NULL);/*等待进程t_b结束*/
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&cond);
exit(0);
}
void *thread1(void *junk)
{
for(i=1;i<=9;i++)
{
pthread_mutex_lock(&mutex);//
if(i%3==0)
pthread_cond_signal(&cond);/*条件改变,发送信号,通知t_b进程*/
else
printf("thead1:%d/n",i);
pthread_mutex_unlock(&mutex);//*解锁互斥量*/
printf("Up Unlock Mutex/n");
sleep(1);
}
}
void *thread2(void *junk)
{
while(i<9)
{
pthread_mutex_lock(&mutex);
if(i%3!=0)
pthread_cond_wait(&cond,&mutex);/*等待*/
printf("thread2:%d/n",i);
pthread_mutex_unlock(&mutex);
printf("Down Ulock Mutex/n");
sleep(1);
}

}

重点就是:无论是pthread_cond_wait还是pthread_cond_signal 都必须得先pthread_mutex_lock。如果没有这个保护,可能会产生race condition,漏掉信号。pthread_cond_wait()函数一进入wait状态就会自动release mutex。当其他线程通过pthread_cond_signal或pthread_cond_broadcast把该线程唤醒,使pthread_cond_wait()返回时,该线程又自动获得该mutex。

整个过程如下图所示:

1 pthread_mutex_lock

例如,在Linux中,使用了称为Futex(快速用户空间互斥锁的简称)的系统。

在此系统中,对用户空间中的互斥变量执行原子增量和测试操作。

如果操作结果表明锁上没有争用,则对pthread_mutex_lock的调用将返回,而无需将上下文切换到内核中,因此获取互斥量的操作可以非常快。

仅当检测到争用时,系统调用(称为futex)才会发生,并且上下文切换到内核中,这会使调用进程进入睡眠状态,直到释放互斥锁为止。

还有很多更多的细节,尤其是对于可靠和/或优先级继承互斥,但这就是它的本质。

nptl/pthread_mutex_lock.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
ini复制代码int
PTHREAD_MUTEX_LOCK (pthread_mutex_t *mutex)
{
/* See concurrency notes regarding mutex type which is loaded from __kind
in struct __pthread_mutex_s in sysdeps/nptl/bits/thread-shared-types.h. */
unsigned int type = PTHREAD_MUTEX_TYPE_ELISION (mutex);

LIBC_PROBE (mutex_entry, 1, mutex);

if (__builtin_expect (type & ~(PTHREAD_MUTEX_KIND_MASK_NP
| PTHREAD_MUTEX_ELISION_FLAGS_NP), 0))
return __pthread_mutex_lock_full (mutex);

if (__glibc_likely (type == PTHREAD_MUTEX_TIMED_NP))
{
FORCE_ELISION (mutex, goto elision);
simple:
/* Normal mutex. */
LLL_MUTEX_LOCK_OPTIMIZED (mutex);
assert (mutex->__data.__owner == 0);
}
#if ENABLE_ELISION_SUPPORT
else if (__glibc_likely (type == PTHREAD_MUTEX_TIMED_ELISION_NP))
{
elision: __attribute__((unused))
/* This case can never happen on a system without elision,
as the mutex type initialization functions will not
allow to set the elision flags. */
/* Don't record owner or users for elision case. This is a
tail call. */
return LLL_MUTEX_LOCK_ELISION (mutex);
}
#endif
else if (__builtin_expect (PTHREAD_MUTEX_TYPE (mutex)
== PTHREAD_MUTEX_RECURSIVE_NP, 1))
{
/* Recursive mutex. */
pid_t id = THREAD_GETMEM (THREAD_SELF, tid);

/* Check whether we already hold the mutex. */
if (mutex->__data.__owner == id)
{
/* Just bump the counter. */
if (__glibc_unlikely (mutex->__data.__count + 1 == 0))
/* Overflow of the counter. */
return EAGAIN;

++mutex->__data.__count;

return 0;
}

/* We have to get the mutex. */
LLL_MUTEX_LOCK_OPTIMIZED (mutex);

assert (mutex->__data.__owner == 0);
mutex->__data.__count = 1;
}
else if (__builtin_expect (PTHREAD_MUTEX_TYPE (mutex)
== PTHREAD_MUTEX_ADAPTIVE_NP, 1))
{
if (LLL_MUTEX_TRYLOCK (mutex) != 0)
{
int cnt = 0;
int max_cnt = MIN (max_adaptive_count (),
mutex->__data.__spins * 2 + 10);
do
{
if (cnt++ >= max_cnt)
{
LLL_MUTEX_LOCK (mutex);
break;
}
atomic_spin_nop ();
}
while (LLL_MUTEX_TRYLOCK (mutex) != 0);

mutex->__data.__spins += (cnt - mutex->__data.__spins) / 8;
}
assert (mutex->__data.__owner == 0);
}
else
{
pid_t id = THREAD_GETMEM (THREAD_SELF, tid);
assert (PTHREAD_MUTEX_TYPE (mutex) == PTHREAD_MUTEX_ERRORCHECK_NP);
/* Check whether we already hold the mutex. */
if (__glibc_unlikely (mutex->__data.__owner == id))
return EDEADLK;
goto simple;
}

pid_t id = THREAD_GETMEM (THREAD_SELF, tid);

/* Record the ownership. */
mutex->__data.__owner = id;
#ifndef NO_INCR
++mutex->__data.__nusers;
#endif

LIBC_PROBE (mutex_acquired, 1, mutex);

return 0;
}

pthread_mutex_t的定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
arduino复制代码typedef union
{
struct __pthread_mutex_s
{
int __lock;
unsigned int __count;
int __owner;
unsigned int __nusers;
int __kind;
int __spins;
__pthread_list_t __list;
} __data;
......
} pthread_mutex_t;

其中__kind字段是指锁的类型,取值如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
ini复制代码/* Mutex types.  */
enum
{
PTHREAD_MUTEX_TIMED_NP,
PTHREAD_MUTEX_RECURSIVE_NP,
PTHREAD_MUTEX_ERRORCHECK_NP,
PTHREAD_MUTEX_ADAPTIVE_NP
#if defined __USE_UNIX98 || defined __USE_XOPEN2K8
,
PTHREAD_MUTEX_NORMAL = PTHREAD_MUTEX_TIMED_NP,
PTHREAD_MUTEX_RECURSIVE = PTHREAD_MUTEX_RECURSIVE_NP,
PTHREAD_MUTEX_ERRORCHECK = PTHREAD_MUTEX_ERRORCHECK_NP,
PTHREAD_MUTEX_DEFAULT = PTHREAD_MUTEX_NORMAL
#endif
#ifdef __USE_GNU
/* For compatibility. */
, PTHREAD_MUTEX_FAST_NP = PTHREAD_MUTEX_TIMED_NP
#endif
};

其中: 

  • PTHREAD_MUTEX_TIMED_NP,这是缺省值,也就是普通锁。
  • PTHREAD_MUTEX_RECURSIVE_NP,可重入锁,允许同一个线程对同一个锁成功获得多次,并通过多次unlock解锁。
  • PTHREAD_MUTEX_ERRORCHECK_NP,检错锁,如果同一个线程重复请求同一个锁,则返回EDEADLK,否则与PTHREAD_MUTEX_TIMED_NP类型相同。
  • PTHREAD_MUTEX_ADAPTIVE_NP,自适应锁,自旋锁与普通锁的混合。

mutex默认用的是PTHREAD_MUTEX_TIMED_NP,所以会走到LLL_MUTEX_LOCK_OPTIMIZED,这是个宏:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
arduino复制代码# define LLL_MUTEX_LOCK_OPTIMIZED(mutex) lll_mutex_lock_optimized (mutex)

lll_mutex_lock_optimized (pthread_mutex_t *mutex)
{
/* The single-threaded optimization is only valid for private
mutexes. For process-shared mutexes, the mutex could be in a
shared mapping, so synchronization with another process is needed
even without any threads. If the lock is already marked as
acquired, POSIX requires that pthread_mutex_lock deadlocks for
normal mutexes, so skip the optimization in that case as
well. */
int private = PTHREAD_MUTEX_PSHARED (mutex);
if (private == LLL_PRIVATE && SINGLE_THREAD_P && mutex->__data.__lock == 0)
mutex->__data.__lock = 1;
else
lll_lock (mutex->__data.__lock, private);
}

由于不是LLL_PRIVATE,所以走lll_lock, lll_lock也是个宏:

1
2
csharp复制代码#define lll_lock(futex, private)        \
__lll_lock (&(futex), private)

注意这里出现了futex,本文的后续主要就是围绕它展开的。

1
2
3
4
5
6
7
8
9
10
11
12
13
scss复制代码#define __lll_lock(futex, private)                                      \
((void) \
({ \
int *__futex = (futex); \
if (__glibc_unlikely \
(atomic_compare_and_exchange_bool_acq (__futex, 1, 0))) \
{ \
if (__builtin_constant_p (private) && (private) == LLL_PRIVATE) \
__lll_lock_wait_private (__futex); \
else \
__lll_lock_wait (__futex, private); \
} \
}))

其中,atomic_compare_and_exchange_bool_acq是尝试通过原子操作尝试将__futex(就是mutex->__data.__lock)从0变为1,如果成功就直接返回了,如果失败,则调用__lll_lock_wait,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
arduino复制代码void
__lll_lock_wait (int *futex, int private)
{
if (atomic_load_relaxed (futex) == 2)
goto futex;

while (atomic_exchange_acquire (futex, 2) != 0)
{
futex:
LIBC_PROBE (lll_lock_wait, 1, futex);
futex_wait ((unsigned int *) futex, 2, private); /* Wait if *futex == 2. */
}
}

在这里先要说明一下,pthread将futex的锁状态定义为3种:

  • 0,代表当前锁空闲无锁,可以进行快速上锁,不需要进内核。
  • 1,代表有线程持有当前锁,如果这时有其它线程需要上锁,就必须标记futex为“锁竞争”,然后通过futex系统调用进内核把当前线程挂起。
  • 2,代表锁竞争,有其它线程将要或正在内核的futex系统中排队等待锁。

所以上锁失败进入到__lll_lock_wait这里后,先判断futex 是不是等于2,如果是则说明大家都在排队,你也排着吧(直跳转到futex_wait)。如果不等于2,那说明你是第一个来竞争的人,把futex设置成2,告诉后面来的人要排队,然后自己以身作则先排队。

futex_wait 实质上就是调用futex系统调用。在第四节,我们就来仔细分析这个系统调用。

2 pthread_cond_wait

本质也是走到futex系统调用,限于篇幅就不展开了。

四 内核层

为什么要有futex,它解决什么问题?何时加入内核的?

简单来讲,futex的解决思路是:在无竞争的情况下操作完全在user space进行,不需要系统调用,仅在发生竞争的时候进入内核去完成相应的处理(wait 或者 wake up)。所以说,futex是一种user mode和kernel mode混合的同步机制,需要两种模式合作才能完成,futex变量位于user space,而不是内核对象,futex的代码也分为user mode和kernel mode两部分,无竞争的情况下在user mode,发生竞争时则通过sys_futex系统调用进入kernel mode进行处理。

用户态的部分已经在前面讲解了,本节重点讲解futex在内核部分的实现。

futex 设计了三个基本数据结构:futex_hash_bucket,futex_key,futex_q。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
arduino复制代码struct futex_hash_bucket {
atomic_t waiters;
spinlock_t lock;
struct plist_head chain;
} ____cacheline_aligned_in_smp;

struct futex_q {
struct plist_node list;
struct task_struct *task;
spinlock_t *lock_ptr;
union futex_key key; //唯一标识uaddr的key值
struct futex_pi_state *pi_state;
struct rt_mutex_waiter *rt_waiter;
union futex_key *requeue_pi_key;
u32 bitset;
};

union futex_key {
struct {
unsigned long pgoff;
struct inode *inode;
int offset;
} shared;
struct {
unsigned long address;
struct mm_struct *mm;
int offset;
} private;
struct {
unsigned long word;
void *ptr;
int offset;
} both;
};

其实还有个struct __futex_data, 如下所示,这个

1
2
3
4
5
6
7
c复制代码static struct {
struct futex_hash_bucket *queues;
unsigned long hashsize;
} __futex_data __read_mostly __aligned(2*sizeof(long));

#define futex_queues (__futex_data.queues)
#define futex_hashsize (__futex_data.hashsize)

在futex初始化的时候(futex_init),会确定hashsize,比如24核cpu时,hashsize = 8192。然后根据这个hashsize调用alloc_large_system_hash分配数组空间,并初始化数组元素里的相关字段,比如plist_head, lock。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
ini复制代码static int __init futex_init(void)
{
unsigned int futex_shift;
unsigned long i;

#if CONFIG_BASE_SMALL
futex_hashsize = 16;
#else
futex_hashsize = roundup_pow_of_two(256 * num_possible_cpus());
#endif

futex_queues = alloc_large_system_hash("futex", sizeof(*futex_queues),
futex_hashsize, 0,
futex_hashsize < 256 ? HASH_SMALL : 0,
&futex_shift, NULL,
futex_hashsize, futex_hashsize);
futex_hashsize = 1UL << futex_shift;

futex_detect_cmpxchg();

for (i = 0; i < futex_hashsize; i++) {
atomic_set(&futex_queues[i].waiters, 0);
plist_head_init(&futex_queues[i].chain);
spin_lock_init(&futex_queues[i].lock);
}

return 0;
}

这些数据结构之间的关系如下所示:

脑子里有了数据结构,流程就容易理解了。futex_wait的总体流程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
ini复制代码static int futex_wait(u32 __user *uaddr, unsigned int flags, u32 val,
ktime_t *abs_time, u32 bitset)
{
struct hrtimer_sleeper timeout, *to = NULL;
struct restart_block *restart;
struct futex_hash_bucket *hb;
struct futex_q q = futex_q_init;
int ret;

if (!bitset)
return -EINVAL;
q.bitset = bitset;

if (abs_time) {
to = &timeout;
hrtimer_init_on_stack(&to->timer, (flags & FLAGS_CLOCKRT) ?
CLOCK_REALTIME : CLOCK_MONOTONIC,
HRTIMER_MODE_ABS);
hrtimer_init_sleeper(to, current);
hrtimer_set_expires_range_ns(&to->timer, *abs_time,
current->timer_slack_ns);
}

retry:
/*
* Prepare to wait on uaddr. On success, holds hb lock and increments
* q.key refs.
*/
ret = futex_wait_setup(uaddr, val, flags, &q, &hb);
if (ret)
goto out;

/* queue_me and wait for wakeup, timeout, or a signal. */
futex_wait_queue_me(hb, &q, to);

/* If we were woken (and unqueued), we succeeded, whatever. */
ret = 0;
/* unqueue_me() drops q.key ref */
if (!unqueue_me(&q))
goto out;
ret = -ETIMEDOUT;
if (to && !to->task)
goto out;

/*
* We expect signal_pending(current), but we might be the
* victim of a spurious wakeup as well.
*/
if (!signal_pending(current))
goto retry;

ret = -ERESTARTSYS;
if (!abs_time)
goto out;

restart = &current->restart_block;
restart->fn = futex_wait_restart;
restart->futex.uaddr = uaddr;
restart->futex.val = val;
restart->futex.time = *abs_time;
restart->futex.bitset = bitset;
restart->futex.flags = flags | FLAGS_HAS_TIMEOUT;

ret = -ERESTART_RESTARTBLOCK;

out:
if (to) {
hrtimer_cancel(&to->timer);
destroy_hrtimer_on_stack(&to->timer);
}
return ret;
}

函数 futex_wait_setup主要做两件事,一是对uaddr进行hash,找到futex_hash_bucket并获取它上面的自旋锁,二是判断*uaddr是否为预期值。如果不相等则会立即返回,由用户态继续trylock。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
scss复制代码*
* futex_wait_setup() - Prepare to wait on a futex
* @uaddr: the futex userspace address
* @val: the expected value
* @flags: futex flags (FLAGS_SHARED, etc.)
* @q: the associated futex_q
* @hb: storage for hash_bucket pointer to be returned to caller
*
* Setup the futex_q and locate the hash_bucket. Get the futex value and
* compare it with the expected value. Handle atomic faults internally.
* Return with the hb lock held and a q.key reference on success, and unlocked
* with no q.key reference on failure.
*
* Return:
* - 0 - uaddr contains val and hb has been locked;
* - <1 - -EFAULT or -EWOULDBLOCK (uaddr does not contain val) and hb is unlocked
*/
static int futex_wait_setup(u32 __user *uaddr, u32 val, unsigned int flags,
struct futex_q *q, struct futex_hash_bucket **hb)
{
u32 uval;
int ret;
retry:
//初始化futex_q, 把uaddr设置到futex_key的字段中,将来futex_wake时也是通过这个key来查找futex。
ret = get_futex_key(uaddr, flags & FLAGS_SHARED, &q->key, VERIFY_READ);
if (unlikely(ret != 0))
return ret;

retry_private:
//根据key计算hash,然后在数组里找到对应的futex_hash_bucket
*hb = queue_lock(q);
//原子地将uaddr的值读到uval中
ret = get_futex_value_locked(&uval, uaddr);

if (ret) {
queue_unlock(*hb);

ret = get_user(uval, uaddr);
if (ret)
goto out;

if (!(flags & FLAGS_SHARED))
goto retry_private;

put_futex_key(&q->key);
goto retry;
}
//如果当前uaddr指向的值不等于val,即说明其他进程修改了
//uaddr指向的值,等待条件不再成立,不用阻塞直接返回。
if (uval != val) {
queue_unlock(*hb);
ret = -EWOULDBLOCK;
}

out:
if (ret)
put_futex_key(&q->key);
return ret;
}

然后调用futex_wait_queue_me 把当前进程挂起:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
scss复制代码/**
* futex_wait_queue_me() - queue_me() and wait for wakeup, timeout, or signal
* @hb: the futex hash bucket, must be locked by the caller
* @q: the futex_q to queue up on
* @timeout: the prepared hrtimer_sleeper, or null for no timeout
*/
static void futex_wait_queue_me(struct futex_hash_bucket *hb, struct futex_q *q,
struct hrtimer_sleeper *timeout)
{
/*
* The task state is guaranteed to be set before another task can
* wake it. set_current_state() is implemented using smp_store_mb() and
* queue_me() calls spin_unlock() upon completion, both serializing
* access to the hash list and forcing another memory barrier.
*/
set_current_state(TASK_INTERRUPTIBLE);
queue_me(q, hb);

/* Arm the timer */
if (timeout)
hrtimer_start_expires(&timeout->timer, HRTIMER_MODE_ABS);

/*
* If we have been removed from the hash list, then another task
* has tried to wake us, and we can skip the call to schedule().
*/
if (likely(!plist_node_empty(&q->list))) {
/*
* If the timer has already expired, current will already be
* flagged for rescheduling. Only call schedule if there
* is no timeout, or if it has yet to expire.
*/
if (!timeout || timeout->task)
freezable_schedule();
}
__set_current_state(TASK_RUNNING);
}

futex_wait_queue_me主要做几件事:

  1. 将当前进程插入到等待队列,就是把futex_q 挂到futex_hash_bucket上
  2. 启动定时任务
  3. 主动触发内核进程调度

五 总结

本文主要是对JAVA中的ReentrantLock.lock流程进行了自上而下的梳理。

原文链接

本文为阿里云原创内容,未经允许不得转载。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Sentry 监控 - Snuba 数据中台架构(SnQL

发表于 2021-11-29

本文描述了 Snuba 查询语言 (SnQL)。

系列

  • 1 分钟快速使用 Docker 上手最新版 Sentry-CLI - 创建版本
  • 快速使用 Docker 上手 Sentry-CLI - 30 秒上手 Source Maps
  • Sentry For React 完整接入详解
  • Sentry For Vue 完整接入详解
  • Sentry-CLI 使用详解
  • Sentry Web 性能监控 - Web Vitals
  • Sentry Web 性能监控 - Metrics
  • Sentry Web 性能监控 - Trends
  • Sentry Web 前端监控 - 最佳实践(官方教程)
  • Sentry 后端监控 - 最佳实践(官方教程)
  • Sentry 监控 - Discover 大数据查询分析引擎
  • Sentry 监控 - Dashboards 数据可视化大屏
  • Sentry 监控 - Environments 区分不同部署环境的事件数据
  • Sentry 监控 - Security Policy 安全策略报告
  • Sentry 监控 - Search 搜索查询实战
  • Sentry 监控 - Alerts 告警
  • Sentry 监控 - Distributed Tracing 分布式跟踪
  • Sentry 监控 - 面向全栈开发人员的分布式跟踪 101 系列教程(一)
  • Sentry 监控 - Snuba 数据中台架构简介(Kafka+Clickhouse)
  • Sentry 监控 - Snuba 数据中台架构(Data Model 简介)
  • Sentry 监控 - Snuba 数据中台架构(Query Processing 简介)
  • Sentry 官方 JavaScript SDK 简介与调试指南
  • Sentry 监控 - Snuba 数据中台架构(编写和测试 Snuba 查询)

以下是 SnQL 的查询结构:

1
2
3
4
5
6
7
8
9
10
11
sql复制代码MATCH simple | join | subquery
SELECT [expressions] | [aggregations BY expressions]
ARRAY JOIN [column]
WHERE condition [[AND | OR] condition]*
HAVING condition [[AND | OR] condition]*
ORDER BY expressions ASC|DESC [, expressions ASC|DESC]*
LIMIT expression BY n
LIMIT n
OFFSET n
GRANULARITY n
TOTALS boolean

这些查询作为字符串发送到 /:dataset/snql 端点,编码为以下格式的 JSON body:

1
2
3
4
5
6
7
json复制代码{
"query": "<query>",
"dataset": "<dataset>",
"consistent": bool,
"turbo": bool,
"debug": bool,
}

数据集(dataset)通过查询使用的 url 隐含。在 JSON 主体中,除了 query 之外的所有字段都是可选的。

MATCH

我们的数据模型由实体图表示。该子句标识了我们正在查询的子图(subgraphs)的模式。目前支持三种类型的 MATCH 子句:

Simple:

MATCH (<entity> [SAMPLE n])

这相当于我们当前的所有查询。 这是从单个实体(事件、事务等)查询数据。可以通过将其与实体一起添加来向查询添加可选 sample。

例如:MATCH (events)

Subquery:

MATCH { <query> }

花括号内可以是另一个完整的 SQL 查询。子查询的 SELECT/BY 子句中的任何内容都将使用指定的别名在外部查询中公开。

例如:

1
2
3
4
5
sql复制代码MATCH {
MATCH (transactions)
SELECT avg(duration) AS avg_d BY transaction
}
SELECT max(avg_d)

Join(连接):

MATCH (<alias>: <entity> [SAMPLE n]) -[<join>]-> (<alias>: <entity> [SAMPLE n])

一个 join 代表一个多节点子图(subgraph),是一个包含不同节点之间的多个关系的子图。目前支持节点之间的 1..n、n..1 和 1..1 有向关系。

对于 JOIN,每个实体都必须有一个别名,这是一个唯一的字符串。 抽样(Sampling)也可以应用于 join 中的任何实体。<join> 是在 Snuba 中的 Entity 中指定的字符串,是一组 join 条件的简写。可以有多个 join 子句,用逗号分隔。

例如:

1
2
3
4
5
sql复制代码MATCH
(e: events) -[grouped]-> (g: groupedmessage),
(e: events) -[assigned]-> (a: groupassignee)
SELECT count() AS tot BY e.project_id, g.id
WHERE a.user_id = "somebody"

join 类型(left/inner)和 join key 是数据模型的一部分,而不是查询的一部分。它们被硬编码在实体代码中。 这是因为没有实体可以安全地与底层数据库的分布式版本中的任何其他实体连接。

match 子句提供给 where 子句的元组(tuple)看起来与传统 join 子句生成的元组完全一样:

1
2
3
4
5
6
json复制代码[
{"e.project_id": 1, "g.id": 10}
{"e.project_id": 1, "g.id": 11}
{"e.project_id": 2, "g.id": 20}
...
]

SELECT .. BY

该子句指定应在输出中返回哪些结果。如果存在聚合(aggregation),则 BY 子句中的所有内容都被视为分组 key。 如果我们想要聚合整个结果集,则可以在没有 BY 子句的情况下进行聚合,但在这种情况下,SELECT 中只能包含聚合。即使有 BY 子句,空的 SELECT 子句也是无效的。

SELECT 子句中的表达式可以是列、算术、函数或三者的任意组合。 如果查询是 join,则每一列都必须有一个符合条件的别名,该别名与 MATCH 子句中的实体别名之一匹配。

WHERE

这是在聚合之前发生的查询的过滤器(如 SQL 中的 WHERE)。

条件是 LHS OP RHS* 形式的中缀表达式,其中 LHS 和 RHS 是字面值或表达式。OP 指的是一个特定的运算符来比较两个值。 这些运算符是 =、!=、<、<=、>、>=、IN、NOT IN、LIKE、NOT LIKE、IS NULL、IS NOT NULL 之一。请注意,当使用像 IS NULL 这样的运算符时,RHS 是可选的。

可以使用布尔关键字 AND 或 OR 组合条件。它们也可以使用 () 进行分组。

HAVING

像 WHERE 子句一样工作,但它在 SELECT 子句中声明的聚合之后应用。 所以我们可以在这里对聚合函数的结果应用条件。

ORDER BY

指定对结果集进行排序的表达式。

LIMIT BY/LIMIT/OFFSET

不言自明,它们采用整数并在 Clickhouse 查询中设置相应的值。 如果查询未指定 limit 或 offset,它们将分别默认为 1000 和 0。

GRANULARITY

一个整数,表示对基于时间的结果进行分组的粒度。

TOTALS

如果设置为 True,来自 Snuba 的响应将有一个 “totals” key,其中包含所有选定行的总值。

SAMPLE

如果 MATCH 子句中的节点未提供采样率,则可以在此处指定。 在这种情况下,Snuba 会将 sample right 分配给查询中的节点之一。sample 可以是介于 0 和 1 之间的浮点数,表示要采样的行的百分比。

或者它可以是一个大于 1 的整数,表示要采样的行数。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

TCP协议的粘包问题&解决方式 粘包问题 解决粘包问题 结语

发表于 2021-11-29

本文正在参与 “网络协议必知必会”征文活动

粘包问题

首先说明一点,TCP有粘包问题,UDP没有粘包问题。

发送端可以是1KB地发送数据,而接收端的应用程序可以2KB地提走数据,当然也有可能一次提走3K或6K数据,或者一次只提走几个字节的数据,也就是说,应用程序所看到的数据是一个整体,或说是一个流(stream),一条消息有多少字节对应用程序是不可见的,因此TCP协议是面向流的协议,这也是容易出现粘包问题的原因。而UDP是面向消息的协议,每个UDP段都是一条消息,应用程序必须以消息为单位提取数据,不能一次提取任意字节的数据,这一点和TCP是很不同的。

TCP是面向连接的,面向流的,提供高可靠性服务。 收发两端(客户端和服务器端)都要有一一成对的socket,因此,发送端为了将多个发往接收端的包,更有效的发到对方,使用了优化方法(Nagle算法),将多次间隔较小且数据量小的数据,合并成一个大的数据块,然后进行封包。 这样,接收端就难于分辨出来了,必须提供科学的拆包机制。 即面向流的通信是无消息保护边界的。

UDP是无连接的,面向消息的,提供高效率服务。 不会使用块的合并优化算法,, 由于UDP支持的是一对多的模式,所以接收端的skbuff(套接字缓冲区)采用了链式结构来记录每一个到达的UDP包,在每个UDP包中就有了消息头(消息来源地址,端口等信息)。 这样,对于接收端来说,就容易进行区分处理了。 即面向消息的通信是有消息保护边界的。

**基于TCP协议的粘包问题主要就是因为接收方不知道消息之间的界限,不知道每次应该提取多少字节的数据造成的。**总结一下,会在两种情况下出现粘包问题:

  • 将多次间隔较小且数据量小的数据,合并成一个大的数据块,然后进行封包,造成粘包
  • 客户端发送了一段数据,服务端只收了一小部分,服务端下次再收的时候还是从缓冲区拿上次遗留的数据,产生粘包

解决粘包问题

问题的根源在于,接收端不知道发送端将要传送的字节流的长度,所以解决粘包的方法就是围绕,如何让发送端在发送数据前,把自己将要发送的字节流总大小让接收端知晓,然后接收端来一个循环接收完所有数据。每个消息接收完,不留残余。

具体做法是:发送端发数据前,先将待发的数据长度告知接收端。将数据长度放在一个固定长度的字节中发给接收端;接收端先接收这个固定长度的数据头,从这个数据头中获悉待接收数据的长度,做好循环接收的准备。

可以借助struct模块,该模块可以将任意数据类型转换成固定长度的bytes,因此借助该模块就可以将发送方发送的数据总大小通过struct模块打包成固定长度的bytes,接收端接收后获取发送方发送的数据总大小之后,使用循环接收即可。

把报头做成字典,字典里包含将要发送的真实数据的详细信息,然后json序列化,然后用struck将序列化后的数据长度打包成4个字节。

发送时: 先发报头长度,再编码报头内容然后发送,最后发真实内容;

接收时: 先接收报头长度,用struct取出来,根据取出的长度收取报头内容,然后解码反序列化,从反序列化的结果中取出待取数据的详细信息,然后去取真实的数据内容。

服务端代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
python复制代码import subprocess
import struct
import json
import socket

server=socket(AF_INET,SOCK_STREAM)
server.bind(('127.0.0.1',8083))
server.listen(5)

# 服务端应该做两件事
# 第一件事:循环地从板连接池中取出链接请求与其建立双向链接,拿到链接对象
while True:
conn,client_addr=server.accept()

# 第二件事:拿到链接对象,与其进行通信循环
while True:
try:
cmd=conn.recv(1024)
if len(cmd) == 0:break
obj=subprocess.Popen(cmd.decode('utf-8'),
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)

stdout_res=obj.stdout.read()
stderr_res=obj.stderr.read()
total_size=len(stdout_res)+len(stderr_res)

# 1、制作头
header_dic={
"filename":"a.txt",
"total_size":total_size,
"md5":"1111111111"
}

json_str = json.dumps(header_dic)
json_str_bytes = json_str.encode('utf-8')


# 2、先把头的长度发过去
x=struct.pack('i',len(json_str_bytes))
conn.send(x)

# 3、发头信息
conn.send(json_str_bytes)
# 4、再发真实的数据
conn.send(stdout_res)
conn.send(stderr_res)

except Exception:
break
conn.close()

客户端代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
python复制代码import struct
import json
from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8083))

while True:
cmd=input('cmd>>:').strip()
if len(cmd) == 0: continue
client.send(cmd.encode('utf-8'))

# 接收端
# 1、先收4个字节,从中提取接下来要收的头的长度
x = client.recv(4)
header_len=struct.unpack('i',x)[0]

# 2、接收头,并解析
json_str_bytes=client.recv(header_len)
json_str=json_str_bytes.decode('utf-8')
header_dic=json.loads(json_str)
print(header_dic)
total_size=header_dic["total_size"]

# 3、接收真实的数据
recv_size = 0
while recv_size < total_size:
recv_data=client.recv(1024)
recv_size+=len(recv_data)
print(recv_data.decode('utf-8'),end='')
else:
print()

结语

文章首发于微信公众号程序媛小庄,同步于掘金、知乎。

码字不易,转载请说明出处,走过路过的小伙伴们伸出可爱的小指头点个赞再走吧(╹▽╹)

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Kubernetes 的 Taint 和 Toleratio

发表于 2021-11-29

作者:SRE运维博客

博客地址: www.cnsre.cn/

文章地址:www.cnsre.cn/posts/21112…

相关话题:www.cnsre.cn/kubernetes/


Taint 和 Toleration(污点和容忍)

在 k8s 集群中,节点亲和性 NodeAffinity 是一种 Pod 上定义的属性,能够让 Pod 可以按找我们的需求调度到指定节点上,而 Taints (污点) 则于NodeAffinity (节点亲和性)是相反的,它是一种 Node 上定义的属性,可以让 Pod 不被调度到带污点的节点上,甚至会对带污点节点上已有的 Pod 进行驱逐。对应的 k8s 可以给 Pod 设置 Tolerations(容忍) 让 Pod 能够对有污点的节点设置容忍,将 Pod 调度到该节点。 Taints 一般是配合 Tolerations 使用的。

为 node 设置污点和容忍

1
2
3
makefile复制代码NoSchedule: 一定不能被调度
PreferNoSchedule: 尽量不要调度
NoExecute: 不仅不会调度, 还会驱逐Node上已有的Pod

为 node1 设置 taint:

1
2
3
bash复制代码kubectl taint nodes k8s-node1 key1=value1:NoSchedule
kubectl taint nodes k8s-node1 key1=value1:NoExecute
kubectl taint nodes k8s-node1 key2=value2:NoSchedule

查看 node1 上的 taint:

1
bash复制代码kubectl describe nodes k8s-node1 |grep Taint

删除上面的 taint:

1
2
3
4
bash复制代码kubectl taint nodes k8s-node1 key1:NoSchedule-
kubectl taint nodes k8s-node1 key1:NoExecute-
kubectl taint nodes k8s-node1 key2:NoSchedule-
kubectl taint nodes k8s-node1 key1- # 删除指定key所有的effect

为 pod 设置 toleration

只要在 pod 的 spec 中设置 tolerations 字段即可,可以有多个 key,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
yaml复制代码tolerations:            # containers同级
- key: "key1" # 能容忍的污点key
operator: "Equal" # Equal等于表示key=value , Exists不等于,表示当值不等于下面value正常
value: "value1" # 值
effect: "NoSchedule" # effect策略,可以设置为 NoSchedule、PreferNoSchedule 或 NoExecute
- key: "key1"
operator: "Equal"
value: "value1"
effect: "NoExecute"
- key: "node.alpha.kubernetes.io/unreachable"
operator: "Exists"
effect: "NoExecute"
tolerationSeconds: 4000
  • tolerations 和 containers 同级。
  • key 能容忍的污点 key。
  • operator Equal 等于表示 key=value , Exists 不等于,表示当值不等于下面 value 正常
  • value 可以设置为 NoSchedule、PreferNoSchedule 或 NoExecute。
  • effect effect 策略
  • tolerationSeconds 是当 pod 需要被驱逐时,可以继续在 node 上运行的时间。

具体的使用方法请参考官方文档。


作者:SRE运维博客

博客地址: www.cnsre.cn/

文章地址:www.cnsre.cn/posts/21112…

相关话题:www.cnsre.cn/tags/kubern…


本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

C++异常机制概述

发表于 2021-11-29

C++异常机制概述

异常处理是C++的一项语言机制,用于在程序中处理异常事件。异常事件在 C++ 中表示为异常对象。异常事件发生时,程序使用throw关键字抛出异常表达式,抛出点称为异常出现点,由操作系统为程序设置当前异常对象,然后执行程序的当前异常处理代码块,在包含了异常出现点的最内层的 try 块,依次匹配catch语句中的异常对象(只进行类型匹配,catch参数有时在 catch 语句中并不会使用到)。若匹配成功,则执行 catch 块内的异常处理语句,然后接着执行 try…catch… 块之后的代码。如果在当前的 try…catch… 块内找不到匹配该异常对象的catch语句,则由更外层的 try…catch… 块来处理该异常;如果当前函数内所有的 try…catch… 块都不能匹配该异常,则递归回退到调用栈的上一层去处理该异常。如果一直退到主函数 main() 都不能处理该异常,则调用系统函数 terminate() 终止程序。\

一个最简单的 try…catch… 的例子如下所示。我们有个程序用来记班级学生考试成绩,考试成绩分数的范围在 0-100 之间,不在此范围内视为数据异常:

1
csharp复制代码int main(){    int score=0;    while (cin >> score)    {        try        {            if (score > 100 || score < 0)            {                throw score;            }            //将分数写入文件或进行其他操作        }        catch (int score)        {            cerr << "你输入的分数数值有问题,请重新输入!";            continue;        }    }}

throw 关键字
在上面这个示例中,throw 是个关键字,与抛出表达式构成了 throw 语句。其语法为:

1
arduino复制代码throw 表达式;

throw 语句必须包含在 try 块中,也可以是被包含在调用栈的外层函数的 try 块中,如:

1
python复制代码//示例代码:throw包含在外层函数的try块中void registerScore(int score){    if (score > 100 || score < 0)        throw score; //throw语句被包含在外层main的try语句块中    //将分数写入文件或进行其他操作}int main(){    int score=0;    while (cin >> score)    {        try        {            registerScore(score);        }        catch (int score)        {            cerr << "你输入的分数数值有问题,请重新输入!";            continue;        }    }}

执行 throw 语句时,throw 表达式将作为对象被复制构造为一个新的对象,称为异常对象。异常对象放在内存的特殊位置,该位置既不是栈也不是堆,在 window 上是放在线程信息块 TIB 中。这个构造出来的新对象与本级的 try 所对应的 catch 语句进行类型匹配,类型匹配的原则在下面介绍。

图片

在本例中,依据 score 构造出来的对象类型为 int,与 catch(int score) 匹配上,程序控制权转交到 catch 的语句块,进行异常处理代码的执行。如果在本函数内与 catch 语句的类型匹配不成功,则在调用栈的外层函数继续匹配,如此递归执行直到匹配上 catch 语句,或者直到 main 函数都没匹配上而调用系统函数 terminate() 终止程序。当执行一个 throw 语句时,跟在 throw 语句之后的语句将不再被执行,throw 语句的语法有点类似于 return,因此导致在调用栈上的函数可能提早退出。

异常对象

异常对象是一种特殊的对象,编译器依据异常抛出表达式复制构造异常对象,这要求抛出异常表达式不能是一个不完全类型(一个类型在声明之后定义之前为一个不完全类型。不完全类型意味着该类型没有完整的数据与操作描述),而且可以进行复制构造,这就要求异常抛出表达式的复制构造函数(或移动构造函数)、析构函数不能是私有的。

异常对象不同于函数的局部对象,局部对象在函数调用结束后就被自动销毁,而异常对象将驻留在所有可能被激活的 catch 语句都能访问到的内存空间中,也即上文所说的 TIB。当异常对象与 catch 语句成功匹配上后,在该 catch 语句的结束处被自动析构。在函数中返回局部变量的引用或指针几乎肯定会造成错误,同样的道理,在 throw 语句中抛出局部变量的指针或引用也几乎是错误的行为。如果指针所指向的变量在执行 catch 语句时已经被销毁,对指针进行解引用将发生意想不到的后果。throw 出一个表达式时,该表达式的静态编译类型将决定异常对象的类型。所以当 throw 出的是基类指针的解引用,而该指针所指向的实际对象是派生类对象,此时将发生派生类对象切割。除了抛出用户自定义的类型外,C++ 标准库定义了一组类,用户报告标准库函数遇到的问题。这些标准库异常类只定义了几种运算,包括创建或拷贝异常类型对象,以及为异常类型的对象赋值。

2.png

3.png

catch 关键字

catch语句匹配被抛出的异常对象。如果 catch 语句的参数是引用类型,则该参数可直接作用于异常对象,即参数的改变也会改变异常对象,而且在 catch 中重新抛出异常时会继续传递这种改变。如果 catch 参数是传值的,则复制构函数将依据异常对象来构造catch 参数对象。在该 catch 语句结束的时候,先析构 catch 参数对象,然后再析构异常对象。

在进行异常对象的匹配时,编译器不会做任何的隐式类型转换或类型提升。除了以下几种情况外,异常对象的类型必须与 catch 语句的声明类型完全匹配:

  1. 允许从非常量到常量的类型转换。
  2. 允许派生类到基类的类型转换。
  3. 数组被转换成指向数组(元素)类型的指针。
  4. 函数被转换成指向函数类型的指针。

寻找 catch 语句的过程中,匹配上的未必是类型完全匹配那项,而在是最靠前的第一个匹配上的 catch 语句(我称它为最先匹配原则)。所以,派生类的处理代码 catch 语句应该放在基类的处理 catch 语句之前,否则先匹配上的总是参数类型为基类的 catch 语句,而能够精确匹配的 catch 语句却不能够被匹配上。在 catch 块中,如果在当前函数内无法解决异常,可以继续向外层抛出异常,让外层catch 异常处理块接着处理。此时可以使用不带表达式的 throw 语句将捕获的异常重新抛出:

1
scss复制代码catch(type x){    //做了一部分处理    throw;}

被重新抛出的异常对象为保存在 TIB 中的那个异常对象,与 catch 的参数对象没有关系,若 catch 参数对象是引用类型,可能在 catch 语句内已经对异常对象进行了修改,那么重新抛出的是修改后的异常对象;若catch参数对象是非引用类型,则重新抛出的异常对象并没有受到修改。使用 catch(…){} 可以捕获所有类型的异常,根据最先匹配原则,catch(…){} 应该放在所有 catch 语句的最后面,否则无法让其他可以精确匹配的 catch 语句得到匹配。通常在catch(…){} 语句中执行当前可以做的处理,然后再重新抛出异常。注意,catch 中重新抛出的异常只能被外层的 catch 语句捕获。

栈展开、RAII

其实栈展开已经在前面说过,就是从异常抛出点一路向外层函数寻找匹配的 catch 语句的过程,寻找结束于某个匹配的 catch 语句或标准库函数 terminate。这里重点要说的是栈展开过程中对局部变量的销毁问题。我们知道,在函数调用结束时,函数的局部变量会被系统自动销毁,类似的,throw 可能会导致调用链上的语句块提前退出,此时,语句块中的局部变量将按照构成生成顺序的逆序,依次调用析构函数进行对象的销毁。例如下面这个例子:

1
css复制代码//一个没有任何意义的类class A{public:    A() :a(0){ cout << "A默认构造函数" << endl; }    A(const  A& rsh){ cout << "A复制构造函数" << endl; }    ~A(){ cout << "A析构函数" << endl; }private:    int  a;};int main(){        try        {            A a ;            throw a;        }        catch (A a)        {            ;        }    return 0;}

程序将输出:

图片

定义变量 a 时调用了默认构造函数,使用 a 初始化异常变量时调用了复制构造函数,使用异常变量复制构造 catch 参数对象时同样调用了复制构造函数。三个构造对应三个析构,也即 try 语句块中局部变量 a 自动被析构了。然而,如果 a 是在自由存储区上分配的内存时:

1
csharp复制代码int main(){    try    {        A * a= new A;        throw *a;    }    catch (A a)    {        ;    }    getchar();    return 0;}

程序运行结果:

图片

同样的三次构造,却只调用了两次的析构函数!说明 a 的内存在发生异常时并没有被释放掉,发生了内存泄漏。
RAII机制有助于解决这个问题,RAII(Resource acquisition is initialization,资源获取即初始化)。它的思想是以对象管理资源。为了更为方便、鲁棒地释放已获取的资源,避免资源死锁,一个办法是把资源数据用对象封装起来。程序发生异常,执行栈展开时,封装了资源的对象会被自动调用其析构函数以释放资源。C++ 中的智能指针便符合RAII。关于这个问题详细可以看 《Effective C++》条款13. 异常机制与构造函数

异常机制的一个合理的使用是在构造函数中。构造函数没有返回值,所以应该使用异常机制来报告发生的问题。更重要的是,构造函数抛出异常表明构造函数还没有执行完,其对应的析构函数不会自动被调用,因此析构函数应该先析构所有所有已初始化的基对象,成员对象,再抛出异常。
C++ 类构造函数初始化列表的异常机制,称为 function-try block。一般形式为:

1
php复制代码myClass::myClass(type1 pa1)    try:  _myClass_val (初始化值){  /*构造函数的函数体 */}  catch ( exception& err ){  /* 构造函数的异常处理部分 */};

异常机制与析构函数C++ 不禁止析构函数向外界抛出异常,但析构函数被期望不向外界函数抛出异常。析构函数中向函数外抛出异常,将直接调用 terminator() 系统函数终止程序。如果一个析构函数内部抛出了异常,就应该在析构函数的内部捕获并处理该异常,不能让异常被抛出析构函数之外。可以如此处理:

  1. 若析构函数抛出异常,调用 std::abort() 来终止程序。
  2. 在析构函数中 catch 捕获异常并作处理。

noexcept修饰符与noexcept操作符

noexcept 修饰符是 C++11 新提供的异常说明符,用于声明一个函数不会抛出异常。编译器能够针对不抛出异常的函数进行优化,另一个显而易见的好处是你明确了某个函数不会抛出异常,别人调用你的函数时就知道不用针对这个函数进行异常捕获。在 C++98中关于异常处理的程序中你可能会看到这样的代码:

1
csharp复制代码void func() throw(int ,double ) {...}void func() throw(){...}

这是 throw 作为函数异常说明,前者表示 func()这个函数可能会抛出 int 或 double 类型的异常,后者表示 func() 函数不会抛出异常。事实上前者很少被使用,在 C++11 这种做法已经被摒弃,而后者则被 C++11 的 noexcept 异常声明所代替:

1
arduino复制代码void func() noexcept {...}//等价于void func() throw(){...}

在 C++11 中,编译器并不会在编译期检查函数的 noexcept 声明,因此,被声明为noexcept 的函数若携带异常抛出语句还是可以通过编译的。在函数运行时若抛出了异常,编译器可以选择直接调用 terminate() 函数来终结程序的运行,因此,noexcept 的一个作用是阻止异常的传播,提高安全性.上面一点提到了,我们不能让异常逃出析构函数,因为那将导致程序的不明确行为或直接终止程序。实际上出于安全的考虑,C++11 标准中让类的析构函数默认也是 noexcept 的。同样是为了安全性的考虑,经常被析构函数用于释放资源的 delete 函数,C++11 也默认将其设置为 noexcept。

noexcept也可以接受一个常量表达式作为参数,例如:

1
scss复制代码void func() noexcept(常量表达式);

常量表达式的结果会被转换成 bool 类型,noexcept(bool) 表示函数不会抛出异常,noexcept(false) 则表示函数有可能会抛出异常。故若你想更改析构函数默认的 noexcept声明,可以显式地加上 noexcept(false) 声明,但这并不会带给你什么好处。
异常处理的性能分析
异常处理机制的主要环节是运行期类型检查。当抛出一个异常时,必须确定异常是不是从 try 块中抛出。异常处理机制为了完善异常和它的处理器之间的匹配,需要存储每个异常对象的类型信息以及 catch 语句的额外信息。由于异常对象可以是任何类型(如用户自定义类型),并且也可以是多态的,获取其动态类型必须要使用运行时类型检查(RTTI),此外还需要运行期代码信息和关于每个函数的结构。当异常抛出点所在函数无法解决异常时,异常对象沿着调用链被传递出去,程序的控制权也发生了转移。转移的过程中为了将异常对象的信息携带到程序执行处(如对异常对象的复制构造或者 catch 参数的析构),在时间和空间上都要付出一定的代价,本身也有不安全性,特别是异常对象是个复杂的类的时候。异常处理技术在不同平台以及编译器下的实现方式都不同,但都会给程序增加额外的负担,当异常处理被关闭时,额外的数据结构、查找表、一些附加的代码都不会被生成,正是因为如此,对于明确不抛出异常的函数,我们需要使用 noexcept 进行声明。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…125126127…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%