开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

SpringCloud系列教程(八)之整合seata分布式事

发表于 2021-11-09

阅读提醒:

  1. 本文面向的是有一定springboot基础者
  2. 本次教程使用的Spring Cloud Hoxton RELEASE版本
  3. 本文依赖上一篇的工程,请查看上一篇文章以做到无缝衔接,或者直接下载源码:github.com/WinterChenS…

两个月没有更新了,这次趁着刷技术文章的机会,把目前比较热门的分布式事务框架seata整合一下,分布式事务的出现是因为微服务导致业务分部在不同的服务中,不能像本地事务一样使用事务。

前情概要

  • SpringCloud系列教程(一)开篇
  • SpringCloud系列教程(二)之Nacos | 8月更文挑战
  • SpringCloud系列教程(三)之Open Feign | 8月更文挑战
  • SpringCloud系列教程(四)之SpringCloud Gateway | 8月更文挑战
  • SpringCloud系列教程(五)之SpringCloud Gateway 网关聚合开发文档 swagger knife4j 和登录权限统一验证
  • SpringCloud系列教程(六)之SpringCloud 使用sentinel作为熔断器
  • SpringCloud系列教程(七)之使用Spring Cloud Sleuth+Zipkin实现链路追踪

什么是seata?

Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。

关于更多的原理可以参考官方文档,这里就不赘述了:Seata 是什么

下载安装seata-server

下载

下载地址:Releases · seata/seata (github.com)

window下载zip,linux/mac下载tar.gz

注意:如何安装nacos请看这:Nacos 快速入门,建议查看前面的文章以做到丝滑入戏。

安装

解压之后修改配置文件registry.conf(这里主要是配置nacos作为配置中心):

1
2
3
4
5
6
7
8
9
10
11
protobuf复制代码config {
type = "nacos" ## 这里修改为nacos,并且修改下面对应的配置

nacos {
serverAddr = "127.0.0.1:8848"
namespace = "public"
group = "SEATA_GROUP"
username = "nacos"
password = "nacos"
}
}

然后继续修改注册中心(nacos作为注册中心):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
protobuf复制代码registry {

type = "nacos"

nacos {
application = "seata-server"
serverAddr = "127.0.0.1:8848"
group = "DEFAULT_GROUP" #这里的group需要与业务服务在一个group内
namespace = "public"
cluster = "default"
username = "nacos"
password = "nacos"
}
}

本文是以nacos作为注册中心和配置中心的,如果需要其它的方式可以查看官方文档。

上传配置

在上传之前先下载对应的配置文件模板

  1. 首先下载config.txt文件:seata/script/config-center at develop · seata/seata (github.com) ,将其放入到seata的解压目录下;见图例1
  2. 然后在目录下找到对应的配置中心的目录下的shell脚本,这里使用的是nacos-config.sh seata/script/config-center/nacos at develop · seata/seata (github.com),将其放入到${SEATA_DIR}/script/config-center/nacos/nacos-config.sh 注意:${SEATA_DIR} 是seata的根目录;见图例2
  3. 到seata的解压根目录下运行 sh script/config-center/nacos/nacos-config.sh -h 127.0.0.1 -p 8848 -g SEATA_GROUP -u nacos -w nacos 注意:nacos-config.sh是第2步下载的脚本文件,见图例3

图例1:

图例1

图例2:

图例2

图例3:

图例3

图例4:

图例4

在nacos控制台(localhost:8848/nacos,账密:nacos/nacos):

可以看到配置已经成功上传

具体的配置请查看:seata/config.txt at develop · seata/seata (github.com)

配置参数官方对照表:Seata 参数配置

注意修改对应数据库的配置,可以直接在nacos中修改配置。

有一个比较关键的点,也是很容易出错的点,有一个配置参数单独拿出来讲一下,
service.vgroupMapping.<你的服务名称>-group=default 这个分组需要seata-server和client都保持一致,当然,seata是可以存在多个事务分组的,比如我们订单业务涉及到库存等等逻辑,那么将这些在同一个事务的服务加入到同一个事务分组内,就如默认的配置文件中设置为:service.vgroupMapping.my_test_tx_group=default 那么我们需要在服务的application.yml中配置该组:

1
2
yaml复制代码seata:
tx-service-group: my_test_tx_group

启动seata-server:

windows:打开控制台:./seata-server.bat -h 127.0.0.1

linux/macos: sh [seata-server.sh](http://seata-server.sh) -h 127.0.0.1

成功启动:

成功启动

nacos注册中心:

nacos注册中心

多种部署方式:

  • 使用 Docker 部署 Seata Server
  • 使用 Kubernetes 部署 Seata Server
  • 使用 Helm 部署 Seata Server
  • Seata 高可用部署

新增seata的数据表:

对应的sql文件请查看:seata/script/server/db at 1.4.0 · seata/seata (github.com)

下载之后在数据库中新建库:seata,然后将建库脚本导入。

工程改造

1.复制工程

将工程:spring-cloud-nacos-consumer 复制一份,改为:order-server

将工程:spring-cloud-nacos-provider 复制一份,改为:stock-server

注意:复制之后需要修改部分pom配置才可以(改为对应的名称):

1
2
3
4
xml复制代码<artifactId>order-server</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>order-server</name>
<description>Demo project for Spring Boot</description>

并且在父pom中加入:

1
2
3
4
5
xml复制代码<modules>
...
<module>stock-server</module>
<module>order-server</module>
</modules>

然后重新导入依赖即可

如果还存在异常,请将.imi文件删除

2.增加依赖

在两个工程模块的pom中增加依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
xml复制代码<!-- seata -->
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>1.4.2</version>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
<exclusions>
<exclusion>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
</exclusion>
</exclusions>
</dependency>

3.修改工程的application.yml配置

order-server

1
2
3
4
5
6
yaml复制代码logging:
level:
io:
seata: debug
seata:
tx-service-group: my_test_tx_group

stock-server

1
2
3
4
5
6
yaml复制代码logging:
level:
io:
seata: debug
seata:
tx-service-group: my_test_tx_group

4.数据库初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
sql复制代码-- 创建 order库、业务表、undo_log表
create database seata_order;
use seata_order;

DROP TABLE IF EXISTS `order_tbl`;
CREATE TABLE `order_tbl` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`user_id` varchar(255) DEFAULT NULL,
`commodity_code` varchar(255) DEFAULT NULL,
`count` int(11) DEFAULT 0,
`money` int(11) DEFAULT 0,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE `undo_log`
(
`id` BIGINT(20) NOT NULL AUTO_INCREMENT,
`branch_id` BIGINT(20) NOT NULL,
`xid` VARCHAR(100) NOT NULL,
`context` VARCHAR(128) NOT NULL,
`rollback_info` LONGBLOB NOT NULL,
`log_status` INT(11) NOT NULL,
`log_created` DATETIME NOT NULL,
`log_modified` DATETIME NOT NULL,
`ext` VARCHAR(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8;

-- 创建 stock库、业务表、undo_log表
create database seata_stock;
use seata_stock;

DROP TABLE IF EXISTS `stock_tbl`;
CREATE TABLE `stock_tbl` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`commodity_code` varchar(255) DEFAULT NULL,
`count` int(11) DEFAULT 0,
PRIMARY KEY (`id`),
UNIQUE KEY (`commodity_code`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE `undo_log`
(
`id` BIGINT(20) NOT NULL AUTO_INCREMENT,
`branch_id` BIGINT(20) NOT NULL,
`xid` VARCHAR(100) NOT NULL,
`context` VARCHAR(128) NOT NULL,
`rollback_info` LONGBLOB NOT NULL,
`log_status` INT(11) NOT NULL,
`log_created` DATETIME NOT NULL,
`log_modified` DATETIME NOT NULL,
`ext` VARCHAR(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8;

-- 初始化库存模拟数据
INSERT INTO seata_stock.stock_tbl (id, commodity_code, count) VALUES (1, 'product-1', 9999999);
INSERT INTO seata_stock.stock_tbl (id, commodity_code, count) VALUES (2, 'product-2', 0);

5.引入mybatis plus

父pom引入依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
xml复制代码<!-- mybatis plus -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.4.3.4</version>
</dependency>

<!-- 提供mysql驱动 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.16</version>
</dependency>

两个工程分别引入依赖:

1
2
3
4
5
6
7
8
9
10
xml复制代码<!-- mybatis plus -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
</dependency>

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>

6.order-server业务修改

新增部分类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
java复制代码@Data
@TableName("order_tbl")
@Accessors(chain = true)
@Builder
public class Order implements Serializable {

private static final longserialVersionUID= 1L;

@JsonFormat(shape = JsonFormat.Shape.STRING)
@TableId(value="id" ,type = IdType.AUTO)
/** */
@TableField("id")
private Integer id;

/** */
@TableField("user_id")
private String userId;

/** */
@TableField("commodity_code")
private String commodityCode;

/** */
@TableField("count")
private Integer count;

/** */
@TableField("money")
private BigDecimal money;

@Tolerate
public Order(){}
}
1
2
3
4
java复制代码@Mapper
public interface OrderTblMapper extends BaseMapper<Order> {

}

mapperxml:

1
2
3
4
5
xml复制代码<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.winterchen.nacos.mapper.OrderTblMapper">

</mapper>

service:

1
2
3
4
5
java复制代码public interface OrderTblService extends IService<Order> {

void placeOrder(String userId, String commodityCode, Integer count);

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码@Service
public class OrderTblServiceImpl extends ServiceImpl<OrderTblMapper, Order> implements OrderTblService {

@Autowired
private StockFeignClient stockFeignClient;

/**
* 下单:创建订单、减库存,涉及到两个服务
*
* @param userId
* @param commodityCode
* @param count
*/
@GlobalTransactional
@Transactional(rollbackFor = Exception.class)
@Override
public void placeOrder(String userId, String commodityCode, Integer count) {
BigDecimal orderMoney = new BigDecimal(count).multiply(new BigDecimal(5));
Order order = new Order().setUserId(userId).setCommodityCode(commodityCode).setCount(count).setMoney(orderMoney);
baseMapper.insert(order);
stockFeignClient.deduct(commodityCode, count);
}

}

StockFeignClient

1
2
3
4
5
6
7
java复制代码@FeignClient(name = "stock-server")
public interface StockFeignClient {

@PostMapping("/api/stock/deduct")
Boolean deduct(@RequestParam("commodityCode") String commodityCode, @RequestParam("count") Integer count);

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
java复制代码@Api(tags="订单API")
@RestController
@RequestMapping("/api/order")
public class OrderTblController {

@Autowired
private OrderTblService orderTblService;

/**
* 下单:插入订单表、扣减库存,模拟回滚
*
* @return
*/
@PostMapping("/placeOrder/commit")
public Boolean placeOrderCommit() {

orderTblService.placeOrder("1", "product-1", 1);
return true;

}

/**
* 下单:插入订单表、扣减库存,模拟回滚
*
* @return
*/
@PostMapping("/placeOrder/rollback")
public Boolean placeOrderRollback() {
// product-2 扣库存时模拟了一个业务异常,
orderTblService.placeOrder("1", "product-2", 1);
return true;
}

@PostMapping("/placeOrder")
public Boolean placeOrder(String userId, String commodityCode, Integer count) {
orderTblService.placeOrder(userId, commodityCode, count);
return true;
}
}

7.stock-server业务修改

entity:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
java复制代码@Data
@TableName("stock_tbl")
@Accessors(chain = true)
@Builder
public class Stock implements Serializable {

private static final long serialVersionUID = 1L;

@JsonFormat(shape = JsonFormat.Shape.STRING)
@TableId(value="id" ,type = IdType.AUTO)
/** */
@TableField("id")
private Integer id;

/** */
@TableField("commodity_code")
private String commodityCode;

/** */
@TableField("count")
private Integer count;

@Tolerate
public Stock(){}
}

mapper:

1
2
3
4
java复制代码@Mapper
public interface StockTblMapper extends BaseMapper<Stock> {

}

mapperxml:

1
2
3
4
5
java复制代码<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.winterchen.nacos.mapper.StockTblMapper">

</mapper>

service:

1
2
3
4
5
java复制代码public interface StockTblService extends IService<Stock> {

void deduct(String commodityCode, int count);

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码@Service
public class StockTblServiceImpl extends ServiceImpl<StockTblMapper, Stock> implements StockTblService {

@Transactional(rollbackFor = Exception.class)
@Override
public void deduct(String commodityCode, int count) {
if (commodityCode.equals("product-2")) {
throw new RuntimeException("异常:模拟业务异常:stock branch exception");
}

QueryWrapper<Stock> wrapper = new QueryWrapper<>();
wrapper.setEntity(new Stock().setCommodityCode(commodityCode));
Stock stock = baseMapper.selectOne(wrapper);
stock.setCount(stock.getCount() - count);

baseMapper.updateById(stock);
}
}

controller:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码@Api(tags="库存API")
@RestController
@RequestMapping("/api/stock")
public class StockTblController {

@Autowired
private StockTblService stockTblService;

/**
* 减库存
*
* @param commodityCode 商品代码
* @param count 数量
* @return
*/
@PostMapping(path = "/deduct")
public Boolean deduct(String commodityCode, Integer count) {
stockTblService.deduct(commodityCode, count);
return true;
}

}

8. 修改gateway:

修改GatewayConfiguration

initCustomizedApis 新增对order-server和stock-server的初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码ApiDefinition api3 = new ApiDefinition("order")
.setPredicateItems(new HashSet<ApiPredicateItem>() {{

add(new ApiPathPredicateItem().setPattern("/order/**")
.setMatchStrategy(SentinelGatewayConstants.URL_MATCH_STRATEGY_PREFIX));
}});
ApiDefinition api4 = new ApiDefinition("stock")
.setPredicateItems(new HashSet<ApiPredicateItem>() {{
add(new ApiPathPredicateItem().setPattern("/stock/**")
.setMatchStrategy(SentinelGatewayConstants.URL_MATCH_STRATEGY_PREFIX));
}});
definitions.add(api3);
definitions.add(api4);

initGatewayRules 新增规则

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
java复制代码rules.add(new GatewayFlowRule("order")
.setCount(10)
.setIntervalSec(1)
);
rules.add(new GatewayFlowRule("order")
.setCount(2)
.setIntervalSec(2)
.setBurst(2)
.setParamItem(new GatewayParamFlowItem()
.setParseStrategy(SentinelGatewayConstants.PARAM_PARSE_STRATEGY_CLIENT_IP)
)
);

rules.add(new GatewayFlowRule("stock")
.setCount(10)
.setIntervalSec(1)
.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER)
.setMaxQueueingTimeoutMs(600)
.setParamItem(new GatewayParamFlowItem()
.setParseStrategy(SentinelGatewayConstants.PARAM_PARSE_STRATEGY_HEADER)
.setFieldName("X-Sentinel-Flag")
)
);
rules.add(new GatewayFlowRule("stock")
.setCount(1)
.setIntervalSec(1)
.setParamItem(new GatewayParamFlowItem()
.setParseStrategy(SentinelGatewayConstants.PARAM_PARSE_STRATEGY_URL_PARAM)
.setFieldName("pa")
)
);
rules.add(new GatewayFlowRule("stock")
.setCount(2)
.setIntervalSec(30)
.setParamItem(new GatewayParamFlowItem()
.setParseStrategy(SentinelGatewayConstants.PARAM_PARSE_STRATEGY_URL_PARAM)
.setFieldName("type")
.setPattern("warn")
.setMatchStrategy(SentinelGatewayConstants.PARAM_MATCH_STRATEGY_CONTAINS)
)
);

rules.add(new GatewayFlowRule("stock")
.setResourceMode(SentinelGatewayConstants.RESOURCE_MODE_CUSTOM_API_NAME)
.setCount(5)
.setIntervalSec(1)
.setParamItem(new GatewayParamFlowItem()
.setParseStrategy(SentinelGatewayConstants.PARAM_PARSE_STRATEGY_URL_PARAM)
.setFieldName("pn")
)
);

appilcation.yml新增对order-server和stock-server的路由配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
yaml复制代码
spring:
cloud:
nacos:
discovery:
server-addr: 127.0.0.1:8848
gateway:
discovery:
locator:
enabled: false
lowerCaseServiceId: true
routes:
- id: provider
uri: lb://winter-nacos-provider
predicates:
- Path=/provider/**
filters:
- StripPrefix=1 #StripPrefix=1就代表截取路径的个数为1,比如前端过来请求/test/good/1/view,匹配成功后,路由到后端的请求路径就会变成http://localhost:8888/good/1/view
- id: consumer
uri: lb://winter-nacos-consumer
predicates:
- Path=/consumer/**
filters:
- StripPrefix=1
- id: auth
uri: lb://auth
predicates:
- Path=/auth/**
filters:
- StripPrefix=1
- id: order-server -------新增order-server的路由规则
uri: lb://order-server
predicates:
- Path=/order/**
filters:
- StripPrefix=1
- id: stock-server --------- 新增stock-server的路由规则
uri: lb://stock-server
predicates:
- Path=/stock/**
filters:
- StripPrefix=1

测试:

首先启动对应的服务:

  • spring-cloud-gateway
  • spring-cloud-auth
  • order-server
  • stock-server

然后打开swagger进行测试: consumer服务

测试提交:

订单创建成功:

库存扣减成功:

测试回滚:

此时数据库里面都正常回滚。

总结

以上就是本教程的全部内容了,seata是一款非常好用的分布式事务框架,为开发人员提供了比较简单的API,seata默认使用的是AT模式的事务,当然,可以结合自身的业务选择比较合适的分布式事务模式,具体的配置可以参考官方文档。

本项目的源码地址为:WinterChenS/spring-cloud-hoxton-study: spring cloud hoxton release study (github.com)

参考文献:

Seata 是什么

Seata(Fescar)分布式事务 整合 Spring Cloud

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

一起学习PHP中的Tidy扩展库

发表于 2021-11-09

这个扩展估计很多同学可能都没听说过,这可不是泰迪熊呀,而是一个处理 HTML 相关操作的扩展,主要是可以用于 HTML 、 XHTML 、 XML 这类数据格式内容的格式化及展示。

关于 Tidy 库

Tidy 库扩展是随 PHP 一起发布的,也就是说,我们可以在编译安装 PHP 时加上 –with-tidy 来一起安装这个扩展,也可以在事后通过源码包中 ext/ 文件夹下的 tidy 目录中的源码来进行安装。同时,Tidy 扩展还需要依赖一个 tidy 函数库,我们需要在操作系统上安装,如果是 CentOS 的话,直接 yum install libtidy-devel 就可以了。

Tidy 格式化

首先我们来看一下如何通过这个 Tidy 扩展库来格式化一段 HTML 代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
php复制代码$content = <<<EOF
<html><head><title>test</title></head> <body><p>error<br>another line</i></body>
</html>
EOF;

$tidy = new Tidy();
$config = [
'indent'=>true,
'output-xhtml'=>true,
];
$tidy->parseString($content, $config);
$tidy->cleanRepair();

echo $tidy, PHP_EOL;
// <html xmlns="http://www.w3.org/1999/xhtml">
// <head>
// <title>
// test
// </title>
// </head>
// <body>
// <p>
// error<br />
// another line
// </p>
// </body>
// </html>

我们定义的 content中的这段HTML代码是没有任何格式的非常不规范的一段HTML代码。通过实例化一个Tidy对象之后,使用parseString()方法,并执行cleanRepair()方法之后,再直接打印content 中的这段 HTML 代码是没有任何格式的非常不规范的一段 HTML 代码。通过实例化一个 Tidy 对象之后,使用 parseString() 方法,并执行 cleanRepair() 方法之后,再直接打印 content中的这段HTML代码是没有任何格式的非常不规范的一段HTML代码。通过实例化一个Tidy对象之后,使用parseString()方法,并执行cleanRepair()方法之后,再直接打印tidy 对象,我们就获得了格式化之后的 HTML 代码。看起来是不是非常地规范,不管是 xmlns 还是 缩进 格式都非常标准。

parseString() 方法有两个参数,第一个参数就是需要格式化的字符串。第二个参数是格式化的配置,这个配置接收的是一个数组,同时它内部的内容也必须是 Tidy 组件中所定义的那些配置信息。这些配置信息我们可以在文后的第二条链接中进行查询。这里我们只配置了两个内容, indent 表示是否应用缩进块级,output-xhtml 表示是否输出为 xhtml 。

cleanRepair() 方法用于对已解析的内容执行清除和修复的操作,其实也就是格式化的清理工作。

注意我们在测试代码中是直接打印的 Tidy 对象,也就是说,这个对象实现了 __toString() ,而它真正的样子其实是这样的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
php复制代码var_dump($tidy);
// object(tidy)#1 (2) {
// ["errorBuffer"]=>
// string(112) "line 1 column 1 - Warning: missing <!DOCTYPE> declaration
// line 1 column 70 - Warning: discarding unexpected </i>"
// ["value"]=>
// string(195) "<html xmlns="http://www.w3.org/1999/xhtml">
// <head>
// <title>
// test
// </title>
// </head>
// <body>
// <p>
// error<br />
// another line
// </p>
// </body>
// </html>"
// }

各种属性信息获取

1
2
3
4
5
6
7
8
9
php复制代码var_dump($tidy->isXml()); // bool(false)

var_dump($tidy->isXhtml()); // bool(false)

var_dump($tidy->getStatus()); // int(1)

var_dump($tidy->getRelease()); // string(10) "2017/11/25"

var_dump($tidy->getHtmlVer()); // int(500)

我们可以通过 Tidy 对象的属性获取一些关于待处理文档的信息,比如是否是 XML ,是否是 XHTML 内容。

getStatus() 返回的是 Tidy 对象的状态信息,当前这个 1 表示的是有警告或辅助功能错误的信息,从上面打印的 Tidy 对象的内容我们就可以看出,在这个对象的 errorBuffer 属性中是有 warning 报警信息的。

getRelease() 返回的是当前 Tidy 组件的版本信息,也就是你在操作系统上安装的那个 tidy 组件的信息。getHtmlVer() 返回的是检测到的 HTML 版本,这里的 500 没有更多的说明和介绍资料,不知道这个 500 是什么意思。

除了上面的这些内容之后,我们还可以获得前面 $config 中的配置信息及相关的说明。

1
2
3
4
php复制代码var_dump($tidy->getOpt('indent')); // int(1)

var_dump($tidy->getOptDoc('output-xhtml'));
// string(489) "This option specifies if Tidy should generate pretty printed output, writing it as extensible HTML. <br/>This option causes Tidy to set the DOCTYPE and default namespace as appropriate to XHTML, and will use the corrected value in output regardless of other sources. <br/>For XHTML, entities can be written as named or numeric entities according to the setting of <code>numeric-entities</code>. <br/>The original case of tags and attributes will be preserved, regardless of other options. "

getOpt() 方法需要一个参数,也就是需要查询的 config中配置的信息内容,如果是查看我们没有在config 中配置的信息内容,如果是查看我们没有在 config中配置的信息内容,如果是查看我们没有在config 中配置的参数的话,那么返回就都是默认的配置值。getOptDoc() 非常贴心,它返回的是关于某个参数的说明文档。

最后,是更加干货的一些方法,可以直接操作节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
php复制代码echo $tidy->head(), PHP_EOL;
// <head>
// <title>
// test
// </title>
// </head>

$body = $tidy->body();

var_dump($body);
// object(tidyNode)#2 (9) {
// ["value"]=>
// string(60) "<body>
// <p>
// error<br />
// another line
// </p>
// </body>"
// ["name"]=>
// string(4) "body"
// ["type"]=>
// int(5)
// ["line"]=>
// int(1)
// ["column"]=>
// int(40)
// ["proprietary"]=>
// bool(false)
// ["id"]=>
// int(16)
// ["attribute"]=>
// NULL
// ["child"]=>
// array(1) {
// [0]=>
// object(tidyNode)#3 (9) {
// ["value"]=>
// string(37) "<p>
// ………………
// ………………

echo $tidy->html(), PHP_EOL;
// <html xmlns="http://www.w3.org/1999/xhtml">
// <head>
// <title>
// test
// </title>
// </head>
// <body>
// <p>
// error<br />
// another line
// </p>
// </body>
// </html>

echo $tidy->root(), PHP_EOL;
// <html xmlns="http://www.w3.org/1999/xhtml">
// <head>
// <title>
// test
// </title>
// </head>
// <body>
// <p>
// error<br />
// another line
// </p>
// </body>
// </html>

相信不需要过多地解释就能够看出,head() 返回的就是 标签里面的内容,而 body() 、html() 也都是对应的相关标签,root() 返回的则是根结点的全部内容,可以看作是整个文档内容。

这些方法函数返回的内容其实都是一个 TidyNode 对象,这个我们在后面再详细地说明。

直接转换为字符串

上面的操作代码我们都是基于 parseString() 这个方法。它没有返回值,或者说返回的只是一个 布尔 类型的成功失败标识。如果我们需要获取格式化之后的内容,只能直接将对象当做字符串或者使用 root() 来获得所有的内容。其实,还有一个方法直接就是返回一个格式化后的字符串的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
php复制代码$tidy = new Tidy();
$repair = $tidy->repairString($content, $config);

echo $repair, PHP_EOL;
// <html xmlns="http://www.w3.org/1999/xhtml">
// <head>
// <title>
// test
// </title>
// </head>
// <body>
// <p>
// error<br />
// another line
// </p>
// </body>
// </html>

repairString() 方法的参数和 parseString() 是一模一样的,唯一不同的就是它是返回的一个字符串,而不是在 Tidy 对象内部进行操作。

转换错误信息

在最开始的测试代码中,我们使用 var_dump() 打印 Tidy 对象时就看到了 errorBuffer 这个变量里是有错误信息的。这回我们再来一个有更多问题的 HTML 代码片断。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
php复制代码$html = <<<HTML
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN"
"http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">

<p>paragraph</p>
HTML;
$tidy = new Tidy();
$tidy->parseString($html);
$tidy->cleanRepair();

echo $tidy->errorBuffer, PHP_EOL;
// line 4 column 1 - Warning: <p> isn't allowed in <head> elements
// line 4 column 1 - Info: <head> previously mentioned
// line 4 column 1 - Warning: inserting implicit <body>
// line 4 column 1 - Warning: inserting missing 'title' element

$tidy ->diagnose();
echo $tidy->errorBuffer, PHP_EOL;
// line 4 column 1 - Warning: <p> isn't allowed in <head> elements
// line 4 column 1 - Info: <head> previously mentioned
// line 4 column 1 - Warning: inserting implicit <body>
// line 4 column 1 - Warning: inserting missing 'title' element
// Info: Doctype given is "-//W3C//DTD XHTML 1.0 Strict//EN"
// Info: Document content looks like XHTML 1.0 Strict
// Tidy found 3 warnings and 0 errors!

在这段测试代码中,我们又使用了一个新的 diagnose() 方法,它的作用是对文档进行诊断测试,并且在 errorBuffer 这个对象变量中添加有关文档的更多信息。

TidyNode 操作

之前我们说到过,head()、html()、body()、root() 这几个方法返回的都是一个 TidyNode 对象,那么这个对象有什么特殊的地方吗?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
php复制代码$html = <<<EOF
<html><head>
<?php echo '<title>title</title>'; ?>
<#
/* JSTE code */
alert('Hello World');
#>
</head>
<body>

<?php
// PHP code
echo 'hello world!';
?>

<%
/* ASP code */
response.write("Hello World!")
%>

<!-- Comments -->
Hello World
</body></html>
Outside HTML
EOF;

$tidy = new Tidy();
$tidy->parseString($html);

$tidyNode = $tidy->html();

showNodes($tidyNode);

function showNodes($node){

if($node->isComment()){
echo '========', PHP_EOL,'This is Comment Node :"', $node->value, '"', PHP_EOL;
}
if($node->isText()){
echo '--------', PHP_EOL,'This is Text Node :"', $node->value, '"', PHP_EOL;
}
if($node->isAsp()){
echo '++++++++', PHP_EOL,'This is Asp Script :"', $node->value, '"', PHP_EOL;
}
if($node->isHtml()){
echo '********', PHP_EOL,'This is HTML Node :"', $node->value, '"', PHP_EOL;
}
if($node->isPhp()){
echo '########', PHP_EOL,'This is PHP Script :"', $node->value, '"', PHP_EOL;
}
if($node->isJste()){
echo '@@@@@@@@', PHP_EOL,'This is JSTE Script :"', $node->value, '"', PHP_EOL;
}

if($node->name){
// getParent()
if($node->getParent()){
echo '&&&&&&&& ', $node->name ,' getParent is : ', $node->getParent()->name, PHP_EOL;
}

// hasSiblings
echo '^^^^^^^^ ', $node->name, ' has siblings is : ';
var_dump($node->hasSiblings());
echo PHP_EOL;
}

if($node->hasChildren()){
foreach($node->child as $child){
showNodes($child);
}
}
}

// ………………
// ………………
// ********
// This is HTML Node :"<head>
// <?php echo '<title>title</title>'; ><#
// /* JSTE code */
// alert('Hello World');
// #>
// <title></title>
// </head>
// "
// &&&&&&&& head getParent is : html
// ^^^^^^^^ head has siblings is : bool(true)
// ………………
// ………………
// ++++++++
// This is Asp Script :"<%
// /* ASP code */
// response.write("Hello World!")
// %>"
// ………………
// ………………

这段代码具体的测试步骤和各个函数的解释就不详细地一一列举说明了。大家通过代码就可以看出来,我们的 TidyNode 对象可以判断各个节点的内容,比如是否还有子结点、是否有兄弟结点。对象结点内容,可以判断结点的格式,是否是注释、是否是文本、是否是 JS 代码、是否是 PHP 代码、是否是 ASP 代码之类的内容。不知道看到这里的你是什么感觉,反正我是觉得这个玩意就非常有意思了,特别是判断 PHP 代码这些的方法。

信息统计函数

最后我们再来看一下 Tidy 扩展库中的一些统计函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
php复制代码$html = <<<EOF
<p>test</i>
<bogustag>bogus</bogustag>
EOF;
$config = array('accessibility-check' => 3,'doctype'=>'bogus');
$tidy = new Tidy();
$tidy->parseString($html, $config);

echo 'tidy access count: ', tidy_access_count($tidy), PHP_EOL;
echo 'tidy config count: ', tidy_config_count($tidy), PHP_EOL;
echo 'tidy error count: ', tidy_error_count($tidy), PHP_EOL;
echo 'tidy warning count: ', tidy_warning_count($tidy), PHP_EOL;

// tidy access count: 4
// tidy config count: 2
// tidy error count: 1
// tidy warning count: 6

其实它们返回的这些数量都是一些错误信息的数量。tidy_access_count() 表示的是遇到的辅助功能警告数量,tidy_config_count() 是配置信息错误的数量,另外两个从名字就看出来了,也就不用我多说了。

总结

总之,Tidy 扩展库又是一个不太常见但非常有意思的库。对于某些场景,比如模板开发之类的功能来说还是有一些用武之地的。大家可以报着学习的心态好好再深入的了解一下,说不定它正好就能解决你现在最棘手的问题哦!

测试代码:

github.com/zhangyue050…

参考文档:

www.php.net/manual/zh/b…

tidy.sourceforge.net/docs/quickr…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

12CR2 RAC安装避坑指南(Redhat7) 一、导读

发表于 2021-11-09

「这是我参与11月更文挑战的第9天,活动详情查看:2021最后一次更文挑战」

最新解决方案:Oracle集成基础安装包+补丁包

一、导读

本文主要介绍Redhat7.6系统安装oracle 12201版本RAC的一些坑以及避坑方式。绝对干货满满,物超所值,欢迎补充和纠错。

二、环境介绍

OS: redhat 7.6 64位(3.10.0-957.el7.x86_64)

ORACLE: 12CR2 RAC

三、避坑指南

Notes:坑点主要位于Grid软件安装步骤中cvu的check和root.sh执行。

接下来是本篇文章的重点以及主旨,简单直接:

坑1:ASM device sharedness check

Shared Storage Accessibility:/dev/asm_ocr …FAILED (PRVG-11506)

可参照MOS文档:

12.2: PRVG-0802 : Storage type for path “/dev/mapper/asm011p1” could not be determined (Doc ID 2251322.1)

这里先不说解决方案,mos建议是打补丁:Apply patch 25784424, if CVU storage check fails for ASMLib paths 。

坑2:CLSRSC-400: A system reboot is required to continue installing.

此坑简直为巨坑无比,root.sh执行到进度14/19时,突然停止并且报错****CLSRSC-400,让我重启主机系统,重启之后安装已经终止了,无奈卸载重装依然报错。

可参照MOS文档:

ALERT: root.sh Fails With “CLSRSC-400” While Installing GI 12.2.0.1 on RHEL or OL with RedHat Compatible Kernel (RHCK) 7.3 (Doc ID 2284463.1)

这里先不说解决方案,mos建议是通过applyOneOffs打补丁:Interim patch 25078431 is required before installing 12.2 GI on Linux 7.3 (RedHat and OL7 with RHCK).

坑3:kgfnGetConnDetails requires 4 parameters at/u01/app/12.2.0/grid/lib/asmcmdbase.pm line 5704.

此坑是修复以上两个坑之后,root.sh执行到进度19/19时,突然停止,并报错如上,显示root.sh执行失败。

)

可参照MOS文档:

install.sh Hung And root.sh Is Failing At asmcmd lsdg –suppressheader While Installing A Zone With Clusterware (Doc ID 2414241.1)

ASMCMD Failing With “KGFNGETCONNDETAILS Requires 4 Parameters at <GI_HOME>/lib/asmcmdbase.pm (Doc ID 2748316.1)

解决方案:

1
bash复制代码/usr/bin/make -f /u01/app/12.2.0/grid/rdbms/lib/ins_rdbms.mk client_sharedlib libasmclntsh12.ohso libasmperl12.ohso ORACLE_HOME=/u01/app/12.2.0/grid

1.如果已经遇到该错误,那么执行以上命令之后,重新执行root.sh即可,如果提前看到了本文,请按方法2提前执行;

*2.__出现执行root.sh提示框之时执行以上命令,需在两个节点以root身份执行该命令,GRID_HOME路径请根据实际情况填写*

所有节点执行完之后,再执行root.sh。

最后来总结以下坑1、2的解决方案:

参考MOS文档:

How to Apply a Grid Infrastructure Patch Before Grid Infrastructure Configuration (before root.sh or rootupgrade.sh or gridsetup.bat) is Executed (Doc ID 1410202.1)

通过12C开始支持的_applyPSU__方式,提前给Grid软件打上最新的补丁,修复坑1,2的bug:_

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
sql复制代码1.解压OPatch补丁包
$ su - grid -c "unzip /soft/p6880880_122010_Linux-x86-64.zip -d /u01/app/12.2.0/grid/"
Notes:选择A

2.查看OPatch补丁包版本
$ su - grid -c "/u01/app/12.2.0/grid/OPatch/opatch version"
OPatch Version: 12.2.0.1.24

3.解压最新PSU补丁
$ su - grid -c "unzip /soft/p32226491_122010_Linux-x86-64.zip -d /soft"

4.执行grid安装
$ ./gridSetup.sh -applyPSU /soft/32226491
Preparing the home to patch...
Applying the patch /soft/32226491/...
Successfully applied the patch.
The log can be found at: /u01/app/oraInventory/logs/GridSetupActions2021-04-01_04-18-54PM/installerPatchActions_2021-04-01_04-18-54PM.log
Launching Oracle Grid Infrastructure Setup Wizard...


5.安装grid成功之后查看补丁
$ opatch lspatches
26839277;DBWLM RELEASE UPDATE 12.2.0.1.0(ID:170913) (26839277)
32231681;ACFS JAN 2021 RELEASE UPDATE 12.2.0.1.210119 (32231681)
32253903;TOMCAT RELEASE UPDATE 12.2.0.1.0(ID:RELEASE) (32253903)
31802727;OCW OCT 2020 RELEASE UPDATE 12.2.0.1.201020 (31802727)
32228578;Database Jan 2021 Release Update : 12.2.0.1.210119 (32228578)

OPatch succeeded.

下一节主要介绍环境配置中需要注意的一些细节,请参考。

四、环境配置注意点

1.Reverse Path Filtering(rp_filter)

关于如何设置rp_filter,可以参照MOS文档和官方文档:

RAC and Oracle Clusterware Best Practices and Starter Kit (Linux) (Doc ID 811306.1)

docs.oracle.com/database/12…

1
2
3
4
5
6
7
ini复制代码对于Linux Kernels 2.6.31(包括例如Oracle Linux和RedHat)及更高版本,在反向路径过滤中已修复了一个错误。 由于此错误修复,可能会在多互连系统上阻止/丢弃互连数据包。 为避免这种情况,使用多个NIC进行专用互连的Oracle RAC系统现在需要rp_filter参数的特定设置。

例如,在eth1和eth2是专用互连NIC,而eth0是公用网络NIC的情况下,请设置/etc/sysctl.conf中的参数将专用地址的rp_filter设置为2(宽松过滤),将公用地址的rp_filter设置为1(严格过滤):

net.ipv4.conf.eth2.rp_filter = 2
net.ipv4.conf.eth1.rp_filter = 2
net.ipv4.conf.eth0.rp_filter = 1

2.Memlock&&HugePages

关于如何设置标准大页HugePages内存,可以参照MOS文档和官方文档:

What is Memlock and How to Calculate the Values for Memlock? (Doc ID 2511230.1)

docs.oracle.com/database/12…

一般在安装部署时,由于无法估算应用对标准大页的需要,因此通常是禁用HugePages内存,当禁用HugePages内存时,最大锁定内存限制应设置为至少3145728 KB(3 GB)。

1
2
3
4
5
6
bash复制代码cat <<EOF >>/etc/security/limits.conf
oracle soft memlock 3145728
oracle hard memlock 3145728
grid soft memlock 3145728
grid hard memlock 3145728
EOF

关于启用大页内存的一些限制:

a.自动内存管理(AMM)和HugePages不兼容。 使用AMM时,通过在/ dev / shm下创建文件来分配整个SGA内存。 当Oracle数据库通过AMM分配SGA时,不会保留HugePages。 要在Oracle Database 12c上使用HugePages,必须禁用AMM。

b.您必须取消设置MEMORY_TARGET和MEMORY_MAX_TARGET初始化参数。 例如,要取消设置数据库实例的参数,请使用命令ALTER SYSTEM RESET。

c.确保正确配置了HugePages,因为如果应用程序未使用过多的HugePages,则系统可能会耗尽内存。

d.如果实例启动时HugePages不足,并且初始化参数use_large_pages设置为only,则数据库无法启动,并且警报日志消息会提供有关Hugepages的必要信息。

在已知风险和限制条件下,如果要启用HugePages,通过以下步骤来看看如何设置hugepages,假设我们现在内存为256G:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
bash复制代码1.运行以下命令以确定内核是否支持HugePages:
$ grep Huge /proc/meminfo

2.使用root用户编辑/etc/security/limits.conf文件中的memlock设置。 内存锁设置以KB为单位指定,启用HugePages内存时,最大锁定内存限制应设置为当前RAM的至少90%,禁用HugePages内存时,最大锁定内存限制应设置为至少3145728 KB(3 GB)。 例如,我们是 256 GB RAM,则添加以下条目以增加最大的内存锁定地址空间:(256GB*0.9*1024*1024=241591910.4KB)

cat <<EOF >>/etc/security/limits.conf
oracle soft memlock 241591910
oracle hard memlock 241591910
grid soft memlock 241591910
grid hard memlock 241591910
EOF

3.内存锁设置后以oracle用户身份登录并运行ulimit -l命令以验证新的内存锁设置:
$ ulimit -l
241591910

4.运行以下命令以显示Hugepagesize变量的值:
$ grep Hugepagesize /proc/meminfo

5.通过Oracle官方文档提供的一个脚本,该脚本为当前共享内存段的大页配置计算推荐值:
a.创建shell脚本hugepages_settings.sh
b.添加以下脚本内容:
#!/bin/bash
#
# hugepages_settings.sh
#
# Linux bash script to compute values for the
# recommended HugePages/HugeTLB configuration
#
# Note: This script does calculation for all shared memory
# segments available when the script is run, no matter it
# is an Oracle RDBMS shared memory segment or not.
# Check for the kernel version
KERN=`uname -r | awk -F. '{ printf("%d.%d\n",$1,$2); }'`
# Find out the HugePage size
HPG_SZ=`grep Hugepagesize /proc/meminfo | awk {'print $2'}`
# Start from 1 pages to be on the safe side and guarantee 1 free HugePage
NUM_PG=1
# Cumulative number of pages required to handle the running shared memory segments
for SEG_BYTES in `ipcs -m | awk {'print $5'} | grep "[0-9][0-9]*"`
do
MIN_PG=`echo "$SEG_BYTES/($HPG_SZ*1024)" | bc -q`
if [ $MIN_PG -gt 0 ]; then
NUM_PG=`echo "$NUM_PG+$MIN_PG+1" | bc -q`
fi
done
# Finish with results
case $KERN in
'2.4') HUGETLB_POOL=`echo "$NUM_PG*$HPG_SZ/1024" | bc -q`;
echo "Recommended setting: vm.hugetlb_pool = $HUGETLB_POOL" ;;
'2.6'|'3.8') echo "Recommended setting: vm.nr_hugepages = $NUM_PG" ;;
*) echo "Unrecognized kernel version $KERN. Exiting." ;;
esac
# End

c.授予shell脚本执行权限
$ chmod +x hugepages_settings.sh

d.运行hugepages_settings.sh脚本来计算hugepages配置的值:
$ ./hugepages_settings.sh
Notes:在运行此脚本之前,请确保运行使用大页面的所有应用程序。

6.设置以下内核参数,其中value是您在步骤5中确定的HugePages值:
# sysctl -w vm.nr_hugepages=value

7.要确保在系统重新启动后分配了HugePages,请将以下参数添加到/etc/sysctl.conf文件中,其中value是您在步骤5中确定的HugePages值:
vm.nr_hugepages=value
Notes:如果无法使用nr_hugepages设置HugePages分配,则可用内存可能会碎片化。 重新启动服务器,以使Hugepages分配生效。

8.运行以下命令以检查可用的大页面:
$ grep Huge /proc/meminfo

9.重新启动实例。

3.Transparent HugePages&&NUMA

为什么要把Transparent HugePages和NUMA放在一起讲呢?很简单,因为他们的配置方法是相同的,请参考MOS文档和官方文档:

\*ALERT: Disable Transparent HugePages on SLES11, RHEL6, RHEL7, OL6, OL7, and UEK2 and above (Doc ID 1557478.1) ***

docs.oracle.com/database/12…

关于透明大页,oracle的建议:如果您正在运行RedHat / OEL 6/7,SLES 11/12或UEK2内核,请确保禁用“透明HugePages”以防止性能问题和节点/实例驱逐。

关闭透明大页有两种方式,分别为:

a.在/etc/rc.local中添加以下行并重新启动服务器(尽管不建议使用rc.local,但仍可以在Redhat 7上完成此操作),该方法linux6和7通用:

1
2
3
4
5
6
7
8
9
10
11
12
typescript复制代码root用户下:
cat <<EOF >> /etc/rc.local
if test -f /sys/kernel/mm/transparent_hugepage/enabled; then
echo never > /sys/kernel/mm/transparent_hugepage/enabled
fi
if test -f /sys/kernel/mm/transparent_hugepage/defrag; then
echo never > /sys/kernel/mm/transparent_hugepage/defrag
fi
EOF

Notes:redhat7需要授予rc.local执行权限:
chmod +x /etc/rc.local

b.将以下内容添加到/etc/default/grub(到/boot/grub/grub.conf的符号链接)的内核引导行中,然后重新引导服务器(这是首选方法),但只支持linux7以上:

这里禁用numa也是同样的设置方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
bash复制代码1.添加关闭命令到/etc/grub.conf文件中,添加在quiet后面:
sed -i 's/quiet/quiet transparent_hugepage=never numa=off/' /etc/default/grub

2.重新引导服务器
a.MBR分区
$ grub2-mkconfig -o /etc/grub2.cfg

b.GPT分区表
$grub2-mkconfig -o /etc/grub2-efi.cfg

Notes:需要注意系统主分区的引导方式,MBR还是GPT,可以通过fdisk -l查看主分区的Disk label type值。

3.重启服务器

4.查看配置是否生效
$ cat /proc/cmdline

4.Disk I/O Scheduler

关于设置Disk I/O Scheduler,可参考官方文档:

docs.oracle.com/en/database…

为了获得Oracle ASM的最佳性能,Oracle建议您使用Deadline I/O Scheduler。

在每个群集节点上,输入以下命令以验证是否配置了Deadline disk I/O调度程序以供使用:

1
2
shell复制代码# cat /sys/block/${ASM_DISK}/queue/scheduler
noop [deadline] cfq

在此示例中,默认磁盘 I/O 调度程序为Deadline,而ASM_DISK为Oracle自动存储管理(Oracle ASM)磁盘设备。

如果输出为空:

_Notes:_在某些虚拟环境(VM)和特殊设备(例如快速存储设备)上,上述命令的输出可能为空。 操作系统或VM绕过内核 I/O 调度,并将所有 I/O 请求直接提交给设备。 在这种环境下,请勿更改 I/O Scheduler设置。

接下来,我们来设置一下Disk I/O Scheduler:

1
2
3
4
5
6
7
8
9
10
11
12
13
ruby复制代码1.使用文本编辑器为Oracle ASM设备创建一个UDEV规则文件,将以下行添加到规则文件并保存: 
# cat<<EOF >/etc/udev/rules.d/60-oracle-schedulers.rules
ACTION=="add|change", KERNEL=="sd[a-z]", ATTR{queue/rotational}=="0", ATTR{queue/scheduler}="deadline"
EOF

2.如果是RAC集群,需要将规则文件复制到集群上的所有其他节点。 例如:
$ scp 60-oracle-schedulers.rules root@node2:/etc/udev/rules.d/

3.加载规则文件并重新启动UDEV服务
# udevadm control --reload-rules

4.验证磁盘 I/O 调度程序已设置为Deadline
# cat /sys/block/${ASM_DISK}/queue/scheduler

5.ORACLE_HOSTNAME环境变量

可参照官方文档:

docs.oracle.com/en/database…

您必须设置ORACLE_HOSTNAME环境变量才能在多别名计算机上安装Oracle数据库。 多别名计算机是多个别名解析到的计算机。

具有多个别名的计算机是在单个IP地址下向命名服务注册的计算机,但是它将多个别名解析为该地址。 命名服务将这些别名中的任何一个解析到同一台计算机。 在此类计算机上安装Oracle数据库之前,请将Oracle安装所有者环境变量ORACLE_HOSTNAME设置为要使用其主机名的计算机。

1
2
3
4
bash复制代码cat <<EOF >> /home/oracle/.bash_profile
$ ORACLE_HOSTNAME=somehost.example.com
$ export ORACLE_HOSTNAME
EOF

6.AVAHI daemon&&NOZEROCONF

关于配置AVAHI daemon&&NOZEROCONF,可以参照MOS文档和官方文档:

\*RAC and Oracle Clusterware Best Practices and Starter Kit (Linux) (Doc ID 811306.1) ***

\*CSSD Fails to Join the Cluster After Private Network Recovered if avahi Daemon is up and Running (Doc ID 1501093.1) ***

en.wikipedia.org/wiki/Zero-c…

1
2
3
4
5
6
7
8
9
10
bash复制代码1.关闭并禁用avahi-daemon服务
systemctl stop avahi-daemon.socket
systemctl stop avahi-daemon.service
systemctl disable avahi-daemon.service
ps -ef|grep avahi-daemon

2.On Oracle Linux/Redhat Linux, "NOZEROCONF=yes" must be included in /etc/sysconfig/network
echo "NOZEROCONF=yes" >> /etc/sysconfig/network

3.Once avahi is disabled, restart the stack or reboot the node.

7./dev/shm

关于/dev/shm可参考mos文档:

\*Bug 25907259 - Linux:RHEL7: cvu reports /dev/shm not mounted when it is mounted (Doc ID 25907259.8) ***

**Bug 21441387 - CVU reports /dev/shm as NOT mounted when it is mounted (PRVE-0421) (Doc ID 21441387.8) **

1
2
3
4
5
html复制代码Rediscovery Notes
If /dev/shm mount check is performed on RHEL7

Workaround
ignore error messages
1
2
3
4
bash复制代码cat <<EOF >> /etc/fstab
tmpfs /dev/shm tmpfs size=4G 0 0
EOF
mount -o remount /dev/shm

_Notes:_Mos建议我们可以忽略该提示,这里我们可以手动添加到fstab文件中,即可。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

『十倍程序员』快速接入Sentinel

发表于 2021-11-09

「这是我参与11月更文挑战的第6天,活动详情查看:2021最后一次更文挑战」。

Sentinel 的使用可以分为两个部分:

  • 核心库(Java 客户端):不依赖任何框架/库,能够运行于 Java 7 及以上的版本的运行时环境,同时对 Dubbo / Spring Cloud 等框架也有较好的支持(见 主流框架适配)。
  • 控制台(Dashboard):Dashboard 主要负责管理推送规则、监控、管理机器信息等。

启动 Sentinel 控制台

概述

Sentinel 提供一个轻量级的开源控制台,它提供机器发现以及健康情况管理、监控(单机和集群),规则管理和推送的功能。这里,我们将会详细讲述如何通过简单的步骤就可以使用这些功能。

接下来,我们将会逐一介绍如何整合 Sentinel 核心库和 Dashboard,让它发挥最大的作用。同时我们也在阿里云上提供企业级的 Sentinel 服务:AHAS Sentinel 控制台,您只需要几个简单的步骤,就能最直观地看到控制台如何实现这些功能,并体验多样化的监控及全自动托管的集群流控能力。

Sentinel 控制台包含如下功能:

  • 查看机器列表以及健康情况:收集 Sentinel 客户端发送的心跳包,用于判断机器是否在线。
  • 监控 (单机和集群聚合):通过 Sentinel 客户端暴露的监控 API,定期拉取并且聚合应用监控信息,最终可以实现秒级的实时监控。
  • 规则管理和推送:统一管理推送规则。
  • 鉴权:生产环境中鉴权非常重要。这里每个开发者需要根据自己的实际情况进行定制。

注意:Sentinel 控制台目前仅支持单机部署。Sentinel 控制台项目提供 Sentinel 功能全集示例,不作为开箱即用的生产环境控制台,若希望在生产环境使用请根据文档自行进行定制和改造。

获取 Sentinel 控制台

您可以从 release 页面 下载最新版本的控制台 jar 包。

您也可以从最新版本的源码自行构建 Sentinel 控制台:

  • 下载 控制台 工程
  • 使用以下命令将代码打包成一个 fat jar: mvn clean package

启动

注意:启动 Sentinel 控制台需要 JDK 版本为 1.8 及以上版本。

使用如下命令启动控制台:

1
shell复制代码java -Dserver.port=8080 -Dcsp.sentinel.dashboard.server=localhost:8080 -Dproject.name=sentinel-dashboard -jar sentinel-dashboard.jar

其中 -Dserver.port=8080 用于指定 Sentinel 控制台端口为 8080。

从 Sentinel 1.6.0 起,Sentinel 控制台引入基本的登录功能,默认用户名和密码都是 sentinel。可以参考 鉴权模块文档 配置用户名和密码。

注:若您的应用为 Spring Boot 或 Spring Cloud 应用,您可以通过 Spring 配置文件来指定配置,详情请参考 Spring Cloud Alibaba Sentinel 文档。

启动成功可以看到默认的应用:sentinel-dashboard

img

客户端接入控制台

以SpringBoot项目为例:

通过 pom.xml 引入 JAR 包

1
2
3
4
5
xml复制代码<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
<version>2.2.1.RELEASE</version>
</dependency>

配置启动参数

在application.yml或application.properties文件中增加配置

1
2
3
4
5
6
7
8
9
10
11
xml复制代码spring:
application:
name: rameo
security:
user: # 配置安全用户
name: admin
password: 123456
cloud:
sentinel:
transport:
dashboard: 127.0.0.1:8080 #sentinel控制台的请求地址

定义资源

资源 是 Sentinel 中的核心概念之一。最常用的资源是我们代码中的 Java 方法。 当然,您也可以更灵活的定义你的资源,例如,把需要控制流量的代码用 Sentinel API SphU.entry("HelloWorld") 和 entry.exit() 包围起来即可。在下面的例子中,我们将 System.out.println("hello world"); 作为资源(被保护的逻辑),用 API 包装起来。参考代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码public static void main(String[] args) {
// 配置规则.
initFlowRules();

while (true) {
// 1.5.0 版本开始可以直接利用 try-with-resources 特性
try (Entry entry = SphU.entry("HelloWorld")) {
// 被保护的逻辑
System.out.println("hello world");
} catch (BlockException ex) {
// 处理被流控的逻辑
System.out.println("blocked!");
}
}
}

完成以上两步后,代码端的改造就完成了。

您也可以通过我们提供的 注解支持模块,来定义我们的资源,类似于下面的代码:

1
2
3
4
5
java复制代码@SentinelResource("HelloWorld")
public void helloWorld() {
// 资源中的逻辑
System.out.println("hello world");
}

这样,helloWorld() 方法就成了我们的一个资源。注意注解支持模块需要配合 Spring AOP 或者 AspectJ 一起使用。

定义规则

接下来,通过流控规则来指定允许该资源通过的请求次数,例如下面的代码定义了资源 HelloWorld 每秒最多只能通过 20 个请求。

1
2
3
4
5
6
7
8
9
10
java复制代码private static void initFlowRules(){
List<FlowRule> rules = new ArrayList<>();
FlowRule rule = new FlowRule();
rule.setResource("HelloWorld");
rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
// Set limit QPS to 20.
rule.setCount(20);
rules.add(rule);
FlowRuleManager.loadRules(rules);
}

完成上面 3 步,Sentinel 就能够正常工作了。更多的信息可以参考 使用文档。

启动项目

默认情况下Sentinel 会在客户端首次调用的时候进行初始化(懒加载),开始向控制台发送心跳包。

需要先访问接口,才能在控制台看到效果。

img

Sentinel与Hystrix的区别

功能 Sentinel Hystrix
隔离策略 信号量隔离(并发线程数限流) 线程池隔离/信号量隔离
熔断降级策略 基于响应时间、异常比率、异常数 基于异常比率
实时统计实现 滑动窗口(LeapArray) 滑动窗口(基于 RxJava)
动态规则配置 支持多种数据源 支持多种数据源
扩展性 多个扩展点 插件的形式
基于注解的支持 支持 支持
限流 基于 QPS,支持基于调用关系的限流 有限的支持
流量整形 支持预热模式、匀速器模式、预热排队模式(流量规则处可配置) 不支持
系统自适应保护 支持 不支持
控制台 提供开箱即用的控制台,可配置规则、查看秒级监控、机器发现等 简单的监控查看

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Java-线程池详解

发表于 2021-11-09

线程池

线程池详解

  1. 参数含义

image.png
2. 线程创建时机

image.png

  • 假设我们的任务特别的多,已经达到了workQueue的容量上限,这时线程池就会启动后备力量,也就是maxPoolsize最大线程数,线程池会在corePoolsize核心线程数的基础上继续创建线程来执行任务
  • 假设任务被不断提交,线程池会持续创建线程直到线程数达到maxPoolsize最大线程数
  • 如果依然有任务被提交,这就超过了线程池的最大处理能力,这个时候线程池就会拒绝这些任务
  • 可以看到实际上任务进来之后,线程池会逐一判断 corePoolsize、 workQueue、 maxPoolsize如果依然不能满足需求,则会拒绝任务
  • corePoolsize指的是核心线程数,线程池初始化时线程数默认为0,当有新的任务提交后,会创建新线程执行任务,如果不做特殊设置,此后线程数通常不会再小于core Poolsize,因为它们是核心线程,即便未来可能没有可执行的任务也不会被销毁
  • 随着任务量的增加,在任务队列满了之后,线程池会进一步创建新线程,最多可以达到maxPoolsize来应对任务多的场景,如果未来线程有空闲,大于 corePoolsize的线程会被合理回收
  • 所以正常情况下,线程池中的线程数量会处在corepoolsize与 maxPoolsize的闭区间内
  1. 线程池特点
  • 线程池希望保持较少的线程数,并且只有在负载变得很大时才增加线程
  • 线程池只有在任务队列填满时才创建多于corePoolsize的线程,如果使用的是无界队列(例如LinkedBlockingQueue),那么由于队列不会满,所以线程数不会超过 corePoolsize
  • 通过设置corePoolsize和maxPoolsize为相同的值,就可以创建固定大小的线程池
  • 通过设置maxPoolsize为很高的值,例如 nteger.MAX_VALUE,就可以允许线程池创建任意多的线程
  1. 参数详解
  • keepAliveTime 时间单位

当线程池中线程数量多于核心线程数时,而此时又没有任务可做,线程池就会检测线程的keepAliveTime如果超过规定的时间,无事可做的线程就会被销毁,以便减少内存的占用和资源消耗如果后期任务又多了起来,线程池也会根据规则重新创建线程,所以这是一个可伸缩的过程,比较灵活我们也可以用 setKeepAliveTime()方法动态改变keepAliveTime的参数值

  • ThreadFactory

ThreadFactory实际上是一个线程工厂,它的作用是生产线程以便执行任务我们可以选择使用默认的线程工厂,创建的线程都会在同一个线程组并拥有一样的优先级,且都不是守护线程我们也可以选择自己定制线程工厂,以方便给线程自定义命名不同的线程池内的线程通常会根据具体业务来定制不同的线程名

线程拒绝策略

新建一个线程池,使用容量上限为10的ArrayBlockingQueue作为任务队列,并且指定线程池的核心线程数为5,最大线程数为10,假设此时有20个耗时任务被提交,在这种情况下,线程池会首先创建核心数量的线程,也就是5个线程来执行任务,然后往队列里去放任务,队列的10个容量被放满了之后继续创建新线程,直到达到最大线程数10。此时线程池中一共有20个任务,其中10个任务正在被10个线程执行,还有10个任务在任务队列中等待,而且由于线程池的最大线程数量就是10,所以已经不能再增加更多的线程来帮忙处理任务了,这就意味着此时线程池工作饱和,这个时候再提交新任务时就会被拒绝

image.png

Java 在 ThreadPoolExecutor 类中为我们提供了 4 种默认的拒绝策略来应对不同的场景,都实现了 RejectedExecutionHandler 接口,如图所示:

image.png

  • AbortPolicy

第一种拒绝策略是 AbortPolicy,这种拒绝策略在拒绝任务时,会直接抛出一个类型为 RejectedExecutionException 的 RuntimeException,让你感知到任务被拒绝了,于是你便可以根据业务逻辑选择重试或者放弃提交等策略

  • DiscardPolicy

第二种拒绝策略是 DiscardPolicy,这种拒绝策略正如它的名字所描述的一样,当新任务被提交后直接被丢弃掉,也不会给你任何的通知,相对而言存在一定的风险,因为我们提交的时候根本不知道这个任务会被丢弃,可能造成数据丢失

  • DiscardOldestPolicy

第三种拒绝策略是 DiscardOldestPolicy,如果线程池没被关闭且没有能力执行,则会丢弃任务队列中的头结点,通常是存活时间最长的任务,这种策略与第二种不同之处在于它丢弃的不是最新提交的,而是队列中存活时间最长的,这样就可以腾出空间给新提交的任务,但同理它也存在一定的数据丢失风险

  • CallerRunsPolicy

第四种拒绝策略是 CallerRunsPolicy,相对而言它就比较完善了,当有新任务提交后,如果线程池没被关闭且没有能力执行,则把这个任务交于提交任务的线程执行,也就是谁提交任务,谁就负责执行任务。这样做主要有两点好处

1).第一点新提交的任务不会被丢弃,这样也就不会造成业务损失

2).第二点好处是,由于谁提交任务谁就要负责执行任务,这样提交任务的线程就得负责执行任务,而执行任务又是比较耗时的,在这段期间,提交任务的线程被占用,也就不会再提交新的任务,减缓了任务提交的速度,相当于是一个负反馈。在此期间,线程池中的线程也可以充分利用这段时间来执行掉一部分任务,腾出一定的空间,相当于是给了线程池一定的缓冲期

线程池好处

使用线程池的好处

使用线程池比手动创建线程主要有三点好处。

第一点,线程池可以解决线程生命周期的系统开销问题,同时还可以加快响应速度。因为线程池中的线程是可以复用的,我们只用少量的线程去执行大量的任务,这就大大减小了线程生命周期的开销。而且线程通常不是等接到任务后再临时创建,而是已经创建好时刻准备执行任务,这样就消除了线程创建所带来的延迟,提升了响应速度,增强了用户体验。

第二点,线程池可以统筹内存和 CPU 的使用,避免资源使用不当。线程池会根据配置和任务数量灵活地控制线程数量,不够的时候就创建,太多的时候就回收,避免线程过多导致内存溢出,或线程太少导致 CPU 资源浪费,达到了一个完美的平衡。

第三点,线程池可以统一管理资源。比如线程池可以统一管理任务队列和线程,可以统一开始或结束任务,比单个线程逐一处理任务要更方便、更易于管理,同时也有利于数据统计,比如我们可以很方便地统计出已经执行过的任务的数量。

6种常见线程池

  • FixedThreadPool

第一种线程池叫作 FixedThreadPool,它的核心线程数和最大线程数是一样的,所以可以把它看作是固定线程数的线程池,它的特点是线程池中的线程数除了初始阶段需要从 0 开始增加外,之后的线程数量就是固定的,就算任务数超过线程数,线程池也不会再创建更多的线程来处理任务,而是会把超出线程处理能力的任务放到任务队列中进行等待。而且就算任务队列满了,到了本该继续增加线程数的时候,由于它的最大线程数和核心线程数是一样的,所以也无法再增加新的线程了。

  • CachedThreadPool

第二种线程池是 CachedThreadPool,可以称作可缓存线程池,它的特点在于线程数是几乎可以无限增加的(实际最大可以达到 Integer.MAX_VALUE,为 2^31-1,这个数非常大,所以基本不可能达到),而当线程闲置时还可以对线程进行回收。也就是说该线程池的线程数量不是固定不变的,当然它也有一个用于存储提交任务的队列,但这个队列是 SynchronousQueue,队列的容量为0,实际不存储任何任务,它只负责对任务进行中转和传递,所以效率比较高。

  • ScheduledThreadPool

第三个线程池是 ScheduledThreadPool,它支持定时或周期性执行任务。比如每隔 10 秒钟执行一次任务,而实现这种功能的方法主要有 3 种,如代码所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码
ScheduledExecutorService service = Executors.newScheduledThreadPool(10);

 

service.schedule(new Task(), 10, TimeUnit.SECONDS);

 

service.scheduleAtFixedRate(new Task(), 10, 10, TimeUnit.SECONDS);

 

service.scheduleWithFixedDelay(new Task(), 10, 10, TimeUnit.SECONDS);

那么这 3 种方法有什么区别呢?

1.第一种方法 schedule 比较简单,表示延迟指定时间后执行一次任务,如果代码中设置参数为 10 秒,也就是 10 秒后执行一次任务后就结束。

2.第二种方法 scheduleAtFixedRate 表示以固定的频率执行任务,它的第二个参数 initialDelay 表示第一次延时时间,第三个参数 period 表示周期,也就是第一次延时后每次延时多长时间执行一次任务。

3.第三种方法 scheduleWithFixedDelay 与第二种方法类似,也是周期执行任务,区别在于对周期的定义,之前的 scheduleAtFixedRate 是以任务开始的时间为时间起点开始计时,时间到就开始执行第二次任务,而不管任务需要花多久执行;而 scheduleWithFixedDelay 方法以任务结束的时间为下一次循环的时间起点开始计时。

  • SingleThreadExecutor

第四种线程池是 SingleThreadExecutor,它会使用唯一的线程去执行任务,原理和 FixedThreadPool 是一样的,只不过这里线程只有一个,如果线程在执行任务的过程中发生异常,线程池也会重新创建一个线程来执行后续的任务。这种线程池由于只有一个线程,所以非常适合用于所有任务都需要按被提交的顺序依次执行的场景,而前几种线程池不一定能够保障任务的执行顺序等于被提交的顺序,因为它们是多线程并行执行的

  • SingleThreadScheduledExecutor

第五个线程池是 SingleThreadScheduledExecutor,它实际和第三种 ScheduledThreadPool 线程池非常相似,它只是 ScheduledThreadPool 的一个特例,内部只有一个线程,如源码所示:

1
2
java复制代码
new ScheduledThreadPoolExecutor(1)

它只是将 ScheduledThreadPool 的核心线程数设置为了 1。

总结上述的五种线程池,我们以核心线程数、最大线程数,以及线程存活时间三个维度进行对比,如表格所示

image.png

  • ForkJoinPool

image.png
这个线程池是在 JDK 7 加入的,它的名字 ForkJoin 也描述了它的执行机制,主要用法和之前的线程池是相同的,也是把任务交给线程池去执行,线程池中也有任务队列来存放任务。但是 ForkJoinPool 线程池和之前的线程池有两点非常大的不同之处。第一点它非常适合执行可以产生子任务的任务。

我们有一个 Task,这个 Task 可以产生三个子任务,三个子任务并行执行完毕后将结果汇总给 Result,比如说主任务需要执行非常繁重的计算任务,我们就可以把计算拆分成三个部分,这三个部分是互不影响相互独立的,这样就可以利用 CPU 的多核优势,并行计算,然后将结果进行汇总。这里面主要涉及两个步骤,第一步是拆分也就是 Fork,第二步是汇总也就是 Join,到这里你应该已经了解到 ForkJoinPool 线程池名字的由来了。

打印出斐波那契数列的第 0 到 9 项的值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
java复制代码
package com.edu;

import java.util.concurrent.ExecutionException;

import java.util.concurrent.ForkJoinPool;

import java.util.concurrent.ForkJoinTask;

import java.util.concurrent.RecursiveTask;

/**

* @version 1.0 2020-10-08

* @auther <a href="mailto:fengwei@uni-ubi.com">参宿</a>

* @description

* @since 1.0

*/

public class Fibonacci extends RecursiveTask<Integer> {

int n;

public Fibonacci(int n) {

this.n = n;

}

@Override

protected Integer compute() {

if (n <= 1) {

return n;

}

Fibonacci f1 = new Fibonacci(n - 1);

f1.fork();

Fibonacci f2 = new Fibonacci(n - 2);

f2.fork();

return f1.join() + f2.join();

}

public static void main(String[] args) throws ExecutionException, InterruptedException {

ForkJoinPool forkJoinPool = new ForkJoinPool();

for (int i = 0; i < 10; i++) {

ForkJoinTask task = forkJoinPool.submit(new Fibonacci(i));

System.out.println(task.get());

}

}

}

来看第二点不同,第二点不同之处在于内部结构,之前的线程池所有的线程共用一个队列,但 ForkJoinPool 线程池中每个线程都有自己独立的任务队列,如图所示

image.png

ForkJoinPool 线程池内部除了有一个共用的任务队列之外,每个线程还有一个对应的双端队列 deque,这时一旦线程中的任务被 Fork 分裂了,分裂出来的子任务放入线程自己的 deque 里,而不是放入公共的任务队列中。如果此时有三个子任务放入线程 t1 的 deque 队列中,对于线程 t1 而言获取任务的成本就降低了,可以直接在自己的任务队列中获取而不必去公共队列中争抢也不会发生阻塞(除了后面会讲到的 steal 情况外),减少了线程间的竞争和切换,是非常高效的

image.png

线程池内部结构

image.png

线程池的内部结构主要由四部分组成,如图所示。

  • 第一部分是线程池管理器,它主要负责管理线程池的创建、销毁、添加任务等管理操作,它是整个线程池的管家。
  • 第二部分是工作线程,也就是图中的线程 t0~t9,这些线程勤勤恳恳地从任务队列中获取任务并执行。
  • 第三部分是任务队列,作为一种缓冲机制,线程池会把当下没有处理的任务放入任务队列中,由于多线程同时从任务队列中获取任务是并发场景,此时就需要任务队列满足线程安全的要求,所以线程池中任务队列采用 BlockingQueue 来保障线程安全。
  • 第四部分是任务,任务要求实现统一的接口,以便工作线程可以处理和执行。

阻塞队列

image.png
线程池中的这四个主要组成部分最值得我们关注的就是阻塞队列了,如表格所示,不同的线程池会选用不同的阻塞队列。

  • LinkedBlockingQueue

对于 FixedThreadPool 和 SingleThreadExector 而言,它们使用的阻塞队列是容量为 Integer.MAX_VALUE 的 LinkedBlockingQueue,可以认为是无界队列。由于 FixedThreadPool 线程池的线程数是固定的,所以没有办法增加特别多的线程来处理任务,这时就需要 LinkedBlockingQueue 这样一个没有容量限制的阻塞队列来存放任务。这里需要注意,由于线程池的任务队列永远不会放满,所以线程池只会创建核心线程数量的线程,所以此时的最大线程数对线程池来说没有意义,因为并不会触发生成多于核心线程数的线程。

  • SynchronousQueue

第二种阻塞队列是 SynchronousQueue,对应的线程池是 CachedThreadPool。线程池 CachedThreadPool 的最大线程数是 Integer 的最大值,可以理解为线程数是可以无限扩展的。CachedThreadPool 和上一种线程池 FixedThreadPool 的情况恰恰相反,FixedThreadPool 的情况是阻塞队列的容量是无限的,而这里 CachedThreadPool 是线程数可以无限扩展,所以 CachedThreadPool 线程池并不需要一个任务队列来存储任务,因为一旦有任务被提交就直接转发给线程或者创建新线程来执行,而不需要另外保存它们。

我们自己创建使用 SynchronousQueue 的线程池时,如果不希望任务被拒绝,那么就需要注意设置最大线程数要尽可能大一些,以免发生任务数大于最大线程数时,没办法把任务放到队列中也没有足够线程来执行任务的情况。

  • DelayedWorkQueue

第三种阻塞队列是DelayedWorkQueue,它对应的线程池分别是 ScheduledThreadPool 和 SingleThreadScheduledExecutor,这两种线程池的最大特点就是可以延迟执行任务,比如说一定时间后执行任务或是每隔一定的时间执行一次任务。DelayedWorkQueue 的特点是内部元素并不是按照放入的时间排序,而是会按照延迟的时间长短对任务进行排序,内部采用的是“堆”的数据结构。之所以线程池 ScheduledThreadPool 和 SingleThreadScheduledExecutor 选择 DelayedWorkQueue,是因为它们本身正是基于时间执行任务的,而延迟队列正好可以把任务按时间进行排序,方便任务的执行。

风险

不应该自动创建线程池,所谓的自动创建线程池就是直接调用 Executors 的各种方法来生成前面学过的常见的线程池,例如 Executors.newCachedThreadPool()。但这样做是有一定风险的,接下来我们就来逐一分析自动创建线程池可能带来哪些问题。

  • FixedThreadPool

它是线程数量固定的线程池,如源码所示,newFixedThreadPool 内部实际还是调用了 ThreadPoolExecutor 构造函数

1
2
3
4
5
6
java复制代码
public static ExecutorService newFixedThreadPool(int nThreads) { 

    return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());

}

通过往构造函数中传参,创建了一个核心线程数和最大线程数相等的线程池,它们的数量也就是我们传入的参数,这里的重点是使用的队列是容量没有上限的 LinkedBlockingQueue,如果我们对任务的处理速度比较慢,那么随着请求的增多,队列中堆积的任务也会越来越多,最终大量堆积的任务会占用大量内存,并发生 OOM ,也就是OutOfMemoryError,这几乎会影响到整个程序,会造成很严重的后果。

  • SingleThreadExecutor

第二种线程池是 SingleThreadExecutor,我们来分析下创建它的源码。

1
2
3
4
5
6
java复制代码
public static ExecutorService newSingleThreadExecutor() { 

    return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));

}

newSingleThreadExecutor 和 newFixedThreadPool 的原理是一样的,只不过把核心线程数和最大线程数都直接设置成了 1,但是任务队列仍是无界的 LinkedBlockingQueue,所以也会导致同样的问题,也就是当任务堆积时,可能会占用大量的内存并导致 OOM。

  • CachedThreadPool

第三种线程池是 CachedThreadPool,创建它的源码下所示。

1
2
3
4
5
6
java复制代码
public static ExecutorService newCachedThreadPool() { 

    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());

}

这里的 CachedThreadPool 和前面两种线程池不一样的地方在于任务队列使用的是 SynchronousQueue,SynchronousQueue 本身并不存储任务,而是对任务直接进行转发,这本身是没有问题的,但你会发现构造函数的第二个参数被设置成了 Integer.MAX_VALUE,这个参数的含义是最大线程数,所以由于 CachedThreadPool 并不限制线程的数量,当任务数量特别多的时候,就可能会导致创建非常多的线程,最终超过了操作系统的上限而无法创建新线程,或者导致内存不足。

  • ScheduledThreadPool 和 SingleThreadScheduledExecutor

第四种线程池 ScheduledThreadPool 和第五种线程池 SingleThreadScheduledExecutor 的原理是一样的,创建 ScheduledThreadPool 的源码如下所示。

1
2
3
4
5
6
java复制代码
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { 

    return new ScheduledThreadPoolExecutor(corePoolSize);

}

而这里的 ScheduledThreadPoolExecutor 是 ThreadPoolExecutor 的子类,调用的它的构造方法如下所示。

1
2
3
4
5
6
java复制代码
public ScheduledThreadPoolExecutor(int corePoolSize) { 

    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());

}

我们通过源码可以看出,它采用的任务队列是 DelayedWorkQueue,这是一个延迟队列,同时也是一个无界队列,所以和 LinkedBlockingQueue 一样,如果队列中存放过多的任务,就可能导致 OOM。

你可以看到,这几种自动创建的线程池都存在风险,相比较而言,我们自己手动创建会更好,因为我们可以更加明确线程池的运行规则,不仅可以选择适合自己的线程数量,更可以在必要的时候拒绝新任务的提交,避免资源耗尽的风险。

线程数设置

调整线程池中的线程数量的最主要的目的是为了充分并合理地使用 CPU 和内存等资源,从而最大限度地提高程序的性能。在实际工作中,我们需要根据任务类型的不同选择对应的策略。

CPU 密集型任务

CPU 密集型任务,比如加密、解密、压缩、计算等一系列需要大量耗费 CPU 资源的任务。对于这样的任务最佳的线程数为 CPU 核心数的 1~2 倍,如果设置过多的线程数,实际上并不会起到很好的效果。此时假设我们设置的线程数量是 CPU 核心数的 2 倍以上,因为计算任务非常重,会占用大量的 CPU 资源,所以这时 CPU 的每个核心工作基本都是满负荷的,而我们又设置了过多的线程,每个线程都想去利用 CPU 资源来执行自己的任务,这就会造成不必要的上下文切换,此时线程数的增多并没有让性能提升,反而由于线程数量过多会导致性能下降。

针对这种情况,我们最好还要同时考虑在同一台机器上还有哪些其他会占用过多 CPU 资源的程序在运行,然后对资源使用做整体的平衡。

耗时IO型任务

第二种任务是耗时 IO 型,比如数据库、文件的读写,网络通信等任务,这种任务的特点是并不会特别消耗 CPU 资源,但是 IO 操作很耗时,总体会占用比较多的时间。对于这种任务最大线程数一般会大于 CPU 核心数很多倍,因为 IO 读写速度相比于 CPU 的速度而言是比较慢的,如果我们设置过少的线程数,就可能导致 CPU 资源的浪费。而如果我们设置更多的线程数,那么当一部分线程正在等待 IO 的时候,它们此时并不需要 CPU 来计算,那么另外的线程便可以利用 CPU 去执行其他的任务,互不影响,这样的话在任务队列中等待的任务就会减少,可以更好地利用资源。

《Java并发编程实战》的作者 Brain Goetz 推荐的计算方法:

1
2
复制代码
线程数 = CPU 核心数 *(1+平均等待时间/平均工作时间)

通过这个公式,我们可以计算出一个合理的线程数量,如果任务的平均等待时间长,线程数就随之增加,而如果平均工作时间长,也就是对于我们上面的 CPU 密集型任务,线程数就随之减少。

太少的线程数会使得程序整体性能降低,而过多的线程也会消耗内存等其他资源,所以如果想要更准确的话,可以进行压测,监控 JVM 的线程情况以及 CPU 的负载情况,根据实际情况衡量应该创建的线程数,合理并充分利用资源。

结论

综上所述我们就可以得出一个结论:

  • 线程的平均工作时间所占比例越高,就需要越少的线程;
  • 线程的平均等待时间所占比例越高,就需要越多的线程;
  • 针对不同的程序,进行对应的实际测试就可以得到最合适的选择。

自定义线程池

  • 核心线程数

第一个需要设置的参数往往是 corePoolSize 核心线程数,在上一课时我们讲过,合理的线程数量和任务类型,以及 CPU 核心数都有关系,基本结论是线程的平均工作时间所占比例越高,就需要越少的线程;线程的平均等待时间所占比例越高,就需要越多的线程。而对于最大线程数而言,如果我们执行的任务类型不是固定的,比如可能一段时间是 CPU 密集型,另一段时间是 IO 密集型,或是同时有两种任务相互混搭。那么在这种情况下,我们可以把最大线程数设置成核心线程数的几倍,以便应对任务突发情况。当然更好的办法是用不同的线程池执行不同类型的任务,让任务按照类型区分开,而不是混杂在一起,这样就可以按照上一课时估算的线程数或经过压测得到的结果来设置合理的线程数了,达到更好的性能。

  • 阻塞队列

对于阻塞队列这个参数而言,我们可以选择之前介绍过的 LinkedBlockingQueue 或者 SynchronousQueue 或者 DelayedWorkQueue,不过还有一种常用的阻塞队列叫 ArrayBlockingQueue,它也经常被用于线程池中,这种阻塞队列内部是用数组实现的,在新建对象的时候要求传入容量值,且后期不能扩容,所以 ArrayBlockingQueue 的最大的特点就是容量是有限的。这样一来,如果任务队列放满了任务,而且线程数也已经达到了最大值,线程池根据规则就会拒绝新提交的任务,这样一来就可能会产生一定的数据丢失。

但相比于无限增加任务或者线程数导致内存不足,进而导致程序崩溃,数据丢失还是要更好一些的,如果我们使用了 ArrayBlockingQueue 这种阻塞队列,再加上我们限制了最大线程数量,就可以非常有效地防止资源耗尽的情况发生。此时的队列容量大小和 maxPoolSize 是一个 trade-off,如果我们使用容量更大的队列和更小的最大线程数,就可以减少上下文切换带来的开销,但也可能因此降低整体的吞吐量;如果我们的任务是 IO 密集型,则可以选择稍小容量的队列和更大的最大线程数,这样整体的效率就会更高,不过也会带来更多的上下文切换。

  • 线程工厂

对于线程工厂threadFactory 这个参数,我们可以使用默认的 defaultThreadFactory,也可以传入自定义的有额外能力的线程工厂,因为我们可能有多个线程池,而不同的线程池之间有必要通过不同的名字来进行区分,所以可以传入能根据业务信息进行命名的线程工厂,以便后续可以根据线程名区分不同的业务进而快速定位问题代码。比如可以通过com.google.common.util.concurrent.ThreadFactory

Builder 来实现,如代码所示。

1
2
3
4
java复制代码
ThreadFactoryBuilder builder = new ThreadFactoryBuilder();

ThreadFactory rpcFactory = builder.setNameFormat("rpc-pool-%d").build();

我们生成了名字为 rpcFactory 的 ThreadFactory,它的 nameFormat 为 “rpc-pool-%d” ,那么它生成的线程的名字是有固定格式的,它生成的线程的名字分别为”rpc-pool-1”,”rpc-pool-2” ,以此类推。

  • 拒绝策略

最后一个参数是拒绝策略,我们可以根据业务需要,选择第 11 讲里的四种拒绝策略之一来使用:AbortPolicy,DiscardPolicy,DiscardOldestPolicy 或者 CallerRunsPolicy。除此之外,我们还可以通过实现 RejectedExecutionHandler 接口来实现自己的拒绝策略,在接口中我们需要实现 rejectedExecution 方法,在 rejectedExecution 方法中,执行例如打印日志、暂存任务、重新执行等自定义的拒绝策略,以便满足业务需求。如代码所示。

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码
private static class CustomRejectionHandler implements RejectedExecutionHandler { 

    @Override

    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { 

        //打印日志、暂存任务、重新执行等拒绝策略

    } 

}

总结

所以定制自己的线程池和我们的业务是强相关的,首先我们需要掌握每个参数的含义,以及常见的选项,然后根据实际需要,比如说并发量、内存大小、是否接受任务被拒绝等一系列因素去定制一个非常适合自己业务的线程池,这样既不会导致内存不足,同时又可以用合适数量的线程来保障任务执行的效率,并在拒绝任务时有所记录方便日后进行追溯

正确关闭线程池

创建一个线程数固定为 10 的线程池,并且往线程池中提交 100 个任务,如代码所示。

1
2
3
4
5
6
7
8
java复制代码
ExecutorService service = Executors.newFixedThreadPool(10);

 for (int i = 0; i < 100; i++) { 

    service.execute(new Task());

 }

那么如果现在我们想关闭该线程池该如何做呢?本课时主要介绍 5 种在 ThreadPoolExecutor 中涉及关闭线程池的方法,如下所示。

1
2
3
4
5
6
7
8
9
10
java复制代码
void shutdown;

boolean isShutdown;

boolean isTerminated;

boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

List<Runnable> shutdownNow;
  • shutdown()

第一种方法叫作 shutdown(),它可以安全地关闭一个线程池,调用 shutdown() 方法之后线程池并不是立刻就被关闭,因为这时线程池中可能还有很多任务正在被执行,或是任务队列中有大量正在等待被执行的任务,调用 shutdown() 方法后线程池会在执行完正在执行的任务和队列中等待的任务后才彻底关闭。但这并不代表 shutdown() 操作是没有任何效果的,调用 shutdown() 方法后如果还有新的任务被提交,线程池则会根据拒绝策略直接拒绝后续新提交的任务。

  • isShutdown()

第二个方法叫作 isShutdown(),它可以返回 true 或者 false 来判断线程池是否已经开始了关闭工作,也就是是否执行了 shutdown 或者 shutdownNow 方法。这里需要注意,如果调用 isShutdown() 方法的返回的结果为 true 并不代表线程池此时已经彻底关闭了,这仅仅代表线程池开始了关闭的流程,也就是说,此时可能线程池中依然有线程在执行任务,队列里也可能有等待被执行的任务

  • isTerminated()

第三种方法叫作 isTerminated(),这个方法可以检测线程池是否真正“终结”了,这不仅代表线程池已关闭,同时代表线程池中的所有任务都已经都执行完毕了,因为我们刚才说过,调用 shutdown 方法之后,线程池会继续执行里面未完成的任务,不仅包括线程正在执行的任务,还包括正在任务队列中等待的任务。比如此时已经调用了 shutdown 方法,但是有一个线程依然在执行任务,那么此时调用 isShutdown 方法返回的是 true ,而调用 isTerminated 方法返回的便是 false ,因为线程池中还有任务正在在被执行,线程池并没有真正“终结”。直到所有任务都执行完毕了,调用 isTerminated() 方法才会返回 true,这表示线程池已关闭并且线程池内部是空的,所有剩余的任务都执行完毕了。

  • awaitTermination()

第四个方法叫作 awaitTermination(),它本身并不是用来关闭线程池的,而是主要用来判断线程池状态的。比如我们给 awaitTermination 方法传入的参数是 10 秒,那么它就会陷入 10 秒钟的等待,直到发生以下三种情况之一:

1.等待期间(包括进入等待状态之前)线程池已关闭并且所有已提交的任务(包括正在执行的和队列中等待的)都执行完毕,相当于线程池已经“终结”了,方法便会返回 true;

2.等待超时时间到后,第一种线程池“终结”的情况始终未发生,方法返回 false;

3.等待期间线程被中断,方法会抛出 InterruptedException 异常。

也就是说,调用 awaitTermination 方法后当前线程会尝试等待一段指定的时间,如果在等待时间内,线程池已关闭并且内部的任务都执行完毕了,也就是说线程池真正“终结”了,那么方法就返回 true,否则超时返回 fasle。

我们则可以根据 awaitTermination() 返回的布尔值来判断下一步应该执行的操作。

  • shutdownNow()

最后一个方法是 shutdownNow(),也是 5 种方法里功能最强大的,它与第一种 shutdown 方法不同之处在于名字中多了一个单词 Now,也就是表示立刻关闭的意思。在执行 shutdownNow 方法之后,首先会给所有线程池中的线程发送 interrupt 中断信号,尝试中断这些任务的执行,然后会将任务队列中正在等待的所有任务转移到一个 List 中并返回,我们可以根据返回的任务 List 来进行一些补救的操作,例如记录在案并在后期重试。shutdownNow() 的源码如下所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
java复制代码
public List<Runnable> shutdownNow() { 

    List<Runnable> tasks;

    final ReentrantLock mainLock = this.mainLock;

    mainLock.lock();

    try { 

        checkShutdownAccess();

        advanceRunState(STOP);

        interruptWorkers();

        tasks = drainQueue();

    } finally { 

        mainLock.unlock();

    } 

 

    tryTerminate();

    return tasks;

 }

可以看到源码中有一行 interruptWorkers() 代码,这行代码会让每一个已经启动的线程都中断,这样线程就可以在执行任务期间检测到中断信号并进行相应的处理,提前结束任务。这里需要注意的是,由于 Java 中不推荐强行停止线程的机制的限制,即便我们调用了 shutdownNow 方法,如果被中断的线程对于中断信号不理不睬,那么依然有可能导致任务不会停止。可见我们在开发中落地最佳实践是很重要的,我们自己编写的线程应当具有响应中断信号的能力,正确停止线程的方法在第 2 讲有讲过,应当利用中断信号来协同工作。

在掌握了这 5 种关闭线程池相关的方法之后,我们就可以根据自己的业务需要,选择合适的方法来停止线程池,比如通常我们可以用 shutdown() 方法来关闭,这样可以让已提交的任务都执行完毕,但是如果情况紧急,那我们就可以用 shutdownNow 方法来加快线程池“终结”的速度。

线程复用原理

线程池会使用固定数量或可变数量的线程来执行任务,但无论是固定数量或可变数量的线程,其线程数量都远远小于任务数量,面对这种情况线程池可以通过线程复用让同一个线程去执行不同的任务,那么线程复用背后的原理是什么呢?

线程池可以把线程和任务进行解耦,线程归线程,任务归任务,摆脱了之前通过 Thread 创建线程时的一个线程必须对应一个任务的限制。在线程池中,同一个线程可以从 BlockingQueue 中不断提取新任务来执行,其核心原理在于线程池对 Thread 进行了封装,并不是每次执行任务都会调用 Thread.start() 来创建新线程,而是让每个线程去执行一个“循环任务”,在这个“循环任务”中,不停地检查是否还有任务等待被执行,如果有则直接去执行这个任务,也就是调用任务的 run 方法,把 run 方法当作和普通方法一样的地位去调用,相当于把每个任务的 run() 方法串联了起来,所以线程数量并不增加。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Spring Boot整合WebSocket

发表于 2021-11-09

这是我参与11月更文挑战的第6天,活动详情查看:2021最后一次更文挑战。

本节简单介绍下如何在Spring Boot引入WebSocket,实现简单的客户端与服务端建立长连接并互发送文本消息。

框架搭建

新建一个Spring Boot项目,项目的pom内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
xml复制代码<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.7.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>cc.mrbird</groupId>
<artifactId>spring-boot-websocket-socketjs</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>spring-boot-websocket-socketjs</name>
<description>Demo project for Spring Boot</description>

<properties>
<java.version>1.8</java.version>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

引入了spring-boot-starter-websocket和spring-boot-starter-web依赖。

构建服务端

在cc.mrbird.socket目录下新建handler包,然后在该包下新建MyStringWebSocketHandler继承TextWebSocketHandler:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
java复制代码@Component
public class MyStringWebSocketHandler extends TextWebSocketHandler {

private Logger log = LoggerFactory.getLogger(this.getClass());

@Override
public void afterConnectionEstablished(WebSocketSession session) {
log.info("和客户端建立连接");
}

@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
session.close(CloseStatus.SERVER_ERROR);
log.error("连接异常", exception);
}

@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
super.afterConnectionClosed(session, status);
log.info("和客户端断开连接");
}

@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
// 获取到客户端发送过来的消息
String receiveMessage = message.getPayload();
log.info(receiveMessage);
// 发送消息给客户端
session.sendMessage(new TextMessage(fakeAi(receiveMessage)));
// 关闭连接
// session.close(CloseStatus.NORMAL);
}

private static String fakeAi(String input) {
if (input == null || "".equals(input)) {
return "你说什么?没听清︎";
}
return input.replace('你', '我')
.replace("吗", "")
.replace('?', '!')
.replace('?', '!');
}
}

该类重写了父类AbstractWebSocketHandler的四个方法:

QQ20200316-185332@2x

  • afterConnectionEstablished,和客户端链接成功的时候触发该方法;
  • handleTransportError,和客户端连接失败的时候触发该方法;
  • afterConnectionClosed,和客户端断开连接的时候触发该方法;
  • handleTextMessage,和客户端建立连接后,处理客户端发送的请求。

WebSocketSession对象代表每个客户端会话,包含许多实用方法:

QQ20200316-185851@2x

方法见名知意,就不赘述了。

此外,因为我们的目的是实现和客户端的通信,并且内容为文本内容,所以我们继承的是TextWebSocketHandler;如果传输的是二进制内容,则可以继承BinaryWebSocketHandler,更多信息可以自行查看WebSocketHandler的子类。

接着在cc.mrbird.socket目录下新建configure包,然后在该包下新建WebSocketServerConfigure配置类:

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码@Configuration
@EnableWebSocket
public class WebSocketServerConfigure implements WebSocketConfigurer {

@Autowired
private MyStringWebSocketHandler myStringWebSocketHandler;

@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(myStringWebSocketHandler, "/connect").withSockJS();
}
}

@EnableWebSocket用于开启WebSocket相关功能,我们注入了上面创建的MyStringWebSocketHandler,并将其注册到了WebSocketHandlerRegistry。

上面代码的含义是,当客户端通过/connecturl和服务端连接通信时,使用MyStringWebSocketHandler处理会话。withSockJS的含义是,通信的客户端是通过SockJS实现的,下面会介绍到。

构建客户端

SockJS是一个JS插件,用于构建WebSocket,兼容性好。

在resources目录下新建static包,然后在该包下新建client.html:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
html复制代码<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>WebSocket客户端</title>
<script src="https://cdn.bootcss.com/sockjs-client/0.3.4/sockjs.min.js"></script>
<link href="https://cdn.bootcss.com/twitter-bootstrap/4.4.1/css/bootstrap.min.css" rel="stylesheet">
</head>
<body>
<style>
.jumbotron {
width: 100%;
}

#text {
height: 3rem;
font-size: 1rem;
line-height: 3rem;
margin: 1rem;
}

.btn {
margin-right: 5px;
}

#connect {
margin-left: 1rem;
}

#log {
margin: 1rem 0 0 1rem;
}

</style>
<div class="container">
<div class="row">
<div class="jumbotron">
<input type="text" placeholder="请输入你想传输的内容" id="text" class="col-lg-12"/>
<input type="button" value="连接" class="btn btn-info" id="connect" onclick="connect()"/>
<input type="button" value="发送" class="btn btn-success" id="sent" disabled="disabled" onclick="sent()"/>
<input type="button" value="断开" class="btn btn-danger" id="disconnect" disabled="disabled"
onclick="disconnect()"/>

<div id="log">
<p>聊天记录:</p>
</div>
</div>
</div>
</div>
<script type="text/javascript">
let text = document.querySelector('#text');
let connectBtn = document.querySelector("#connect");
let sentBtn = document.querySelector("#sent");
let disconnectBtn = document.querySelector("#disconnect");
let logDiv = document.querySelector("#log");

let ws = null;

function connect() {
let targetUri = "/connect";
ws = new SockJS(targetUri);
ws.onopen = function () {
setConnected(true);
log('和服务端连接成功!');
};
ws.onmessage = function (event) {
log('服务端说:' + event.data);
};
ws.onclose = function () {
setConnected(false);
log('和服务端断开连接!')
}
}

function sent() {
if (ws != null) {
ws.send(text.value);
log('客户端说:' + text.value);
} else {
log('请先建立连接!')
}
}

function disconnect() {
if (ws != null) {
ws.close();
ws = null;
}
setConnected(false);
}

function log(value) {
let content = document.createElement('p');
content.innerHTML = value;
logDiv.appendChild(content);
text.value = '';
}

function setConnected(connected) {
connectBtn.disabled = connected;
disconnectBtn.disabled = !connected;
sentBtn.disabled = !connected;
}
</script>
</body>
</html>

html,css那些都不重要,重要的是我们引入了SockJS库。在connect()方法中,我们通过new SockJS(/connect)和上面的服务端建立了Socket通信。SockJS对象包含几个常用的实用方法:

  • onopen,和服务端讲了连接后的回调方法;
  • onmessage,服务端返回消息时的回调方法;
  • onclose,和服务端断开连接的回调方法;
  • send,发送消息给服务端;
  • close,断开和服务端的连接。

上面的JS较为简单,其他逻辑自己看看吧。

通信测试

启动项目,浏览器访问:http://localhost:8080/client.html:

QQ20200316-191957@2x

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

死磕synchronized二:系统剖析延迟偏向篇一 是什么

发表于 2021-11-09

哈喽,大家好,我是江湖人送外号[道格牙]的子牙老师。

近期准备写一个专栏:从Hotspot源码角度剖析synchronized。前前后后大概有10篇,会全网发,写完后整理成电子书放公众号供大家下载。对本专栏感兴趣的、希望彻彻底底学明白synchronized的小伙伴可以关注一波。电子书整理好了会通过公众号群发告知大家。我的公众号:硬核子牙。

市面上关于synchronized的资料已经很多了,我这个专栏跟那些资料有啥差别呢:

  1. 更系统。市面上目前虽然资料众多,但都是零散的。有些资料讲得东西甚至是相互冲突的,都不知道信谁的。我准备从Java层面到JVM层面到操作系统层面系统的去分析用synchronized后呈现的每个现象背后的本质。synchronized很多知识点市面上是没有资料讲的,我给它补上。
  2. 更接近真相。市面上的很多资料,有的是基于字节码解释器那块的代码yy出来的,有的是东拼西凑整合出来的,各个说的都像真的一样,把看的人搞蒙圈了。我准备从模板解释器代码入手,单步调试着研究,有些不确定的自己写代码去证明,争取分享给大家的都是本来如此的知识。不确定的地方我会标注出来。
  3. 授人以鱼不如授人以渔。我会以大家学完后能够手写出synchronized的标准来设计这个专栏。因为从我自己研究的角度来说,抛开语言的障碍,synchronized的每种机制如果让你实现你手足无措,那你还是没有真正地理解synchronized。言外之意就是你不一定要去手写,但是你在脑海中回想,比如CAS、锁膨胀、锁对象加锁解锁……你大概知道代码是怎么写的。

本篇文章是第二篇,聚焦分析偏向锁延迟策略:

  1. 什么是延迟偏向
  2. 为什么需要延迟偏向
  3. 延迟偏向机制是怎样的
  4. 延迟偏向对锁膨胀的影响及证明
  5. 从Hotspot源码角度证明

内容有点多,分两篇发。

是什么

什么是偏向延迟呢?见名知意:偏向锁就算是开启的,也不是马上就可以用的,中间有个延时。
对应的JVM参数是BiasedLockingStartupDelay,默认是4秒,可通过-XX:BiasedLockingStartupDelay修改

为什么

不知道大家在看到JVM中有偏向延迟这个机制的时候,脑海中有没有冒出这么几个问题:1、为什么要搞个偏向延迟?2、这个延时是从什么时候开始计算的?从JVM启动时吗?

为什么要设计

先回答第一个问题。这个问题的答案在网上有很多版本,最权威的答案就是这段注释。翻译过来就是说:这是一个启动时间回归的解决方案。说人话就是这样做,JVM启动可以更快。

为什么会更快呢?按照注释的说法:因为JVM在启动期间会采取大量安全点来消除偏差。这跟偏向锁有啥关系?说下我的理解哈,不一定是JVM工程师设计此的初衷。安全点大家应该是很熟悉了,启用安全点会带来STW。而偏向锁的撤销与重偏向判断,也是需要启用安全点的,因为需要扫描所有线程的虚拟机栈,需要内存静止才能保证结果准确。而JVM在启动期间用到的锁,包括初始化很多类的过程中用的锁,都会经过偏向锁逻辑,如果没有偏向延迟,就会带来更多的STW,导致JVM启动时间过长。

多说一句,有点不好理解:启动期间启用安全点消除偏差是一种先行发生策略,是为了保证启动期间有互动的多个线程的业务先后顺序。跟为了追求低延时数据同步插入内存屏障触发即时回写内存是差不多的思想。

延时何时计算

看下上面的代码,如果有延时,就创建一个任务。偏向延迟就是在这个任务中完成的,是由WatcherThread执行的,延迟偏向以后,WatcherThread执行到任务的task方法,创建一个VM_Operation丢入VMThread的任务池队列,等待VMThread执行。如果木有延时,就很直接了,创建一个VM_Operation丢人VMThread的任务池队列,等待VMThread执行。

顺便吐槽下,这个延时实现相当复杂。总之,Hotspot源码里面,就没有简单的东西。作为局外人,有些地方,我是真的不解,为啥要搞那么复杂。比如偏向锁整个机制,也是复杂的一批。

那延时是从哪开始的呢?是WatcherThread执行到sleep方法开始的,因为计算剩余时间一定需要与当前时间进行对比。

1
2
3
4
5
6
7
8
arduino复制代码void WatcherThread::run() {
……
// Calculate how long it'll be until the next PeriodicTask work
// should be done, and sleep that amount of time.
int time_waited = sleep();
……
PeriodicTask::real_time_tick(time_waited);
}

锁类型

synchronized对应的锁类型有这些:

  1. 无锁
  2. 偏向锁
  3. 轻量级锁
  4. 重量级锁

这些锁存储在哪个位置呢?对象头中。Java的每个对象,在JVM中的结构如图。Mark Word区域就是对象头。展开来就是上图的样子

对锁膨胀的影响

好的答案从好的问题开始。对于这个问题,咱们从这几个问题开始着手:

  1. 延迟偏向之前创建的对象是什么锁
  2. 延迟偏向之后创建的对象是什么锁
  3. 创建的对象的锁是是如何被延迟偏向影响的。这个下篇讲
  4. 延迟偏向之后,之前创建的对象持有的锁会被批量修改吗
  5. 有或无延迟偏向,锁如何膨胀

延迟偏向之前创建的对象是无锁状态。细节下篇讲

延迟偏向之后创建的对象是未偏向的偏向锁。细节后面讲

延迟偏向之前创建的对象,延迟偏向后,是不会批量修改的。言外之意就是延迟偏向之前创建的对象是无锁,延迟偏向之后还是无锁。

延迟偏向之前是无锁,膨胀后不会经历偏向锁,会直接膨胀成轻量级锁

延迟偏向之后创建的对象是未偏向的偏向锁,经过synchronized就会变成偏向当前线程的偏向锁

关于synchronized的锁膨胀逻辑,后面写文章细讲。

系列文章

1、JVM如何执行synchronized修饰的方法

推荐阅读

1、你是不是想问,那些技术大牛是如何练成的?我来告诉你

2、从hotspot源码层面剖析Java的多态实现原理

3、如何找到native方法对应的Hotspot源码

结语

其实技术这个行业真的不难,如果有人带,打底子1-2年,沉淀2-3年,足矣。我自己一步步探索,走得还算顺利,大概花了七年时间。

给大家看看我之前写的一些项目,证明下我不是在吹牛。我不喜欢吹牛,我也不喜欢水课水文章,内心接受不了。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Fork/Join使用学习

发表于 2021-11-09

这是我参与11月更文挑战的第6天,活动详情查看:2021最后一次更文挑战。
JDK7提供了一个将任务“分而治之”的框架 — Fork/Join。它把一个大的任务分割成足够小的子任务,如果子任务比较大的话还要对子任务进行继续分割。分割的子任务分别放到双端队列里,然后启动线程分别从双端队列里获取任务执行。子任务执行完的结果都放在另外一个队列里,启动一个线程从队列里取数据,然后合并这些数据。

Fork/Join的思想如下所示:QQ截图20190702172347.png

RecursiveTask

QQ截图20190703111904.png

RecursiveTask适用于将任务分而治之,并且有返回值的情况,举个计算1到100和的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
java复制代码public class RecursiveTest {
// 定义最小区间为10
private final static int MAX_THRESHOLD = 10;

public static void main(String[] args) {
final ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Integer> future = forkJoinPool.submit(new CalculateRecursiveTask(1, 100));
try {
Integer result = future.get();
System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}

private static class CalculateRecursiveTask extends RecursiveTask<Integer> {
// 起始
private int start;
// 结束
private int end;

public CalculateRecursiveTask(int start, int end) {
this.start = start;
this.end = end;
}

@Override
protected Integer compute() {
// 如果起始和结束范围小于我们定义的区间范围,则直接计算
if ((end - start) <= MAX_THRESHOLD) {
return IntStream.rangeClosed(start, end).sum();
} else {
// 否则,将范围一分为二,分成两个子任务
int middle = (start + end) / 2;
CalculateRecursiveTask leftTask = new CalculateRecursiveTask(start, middle);
CalculateRecursiveTask rightTask = new CalculateRecursiveTask(middle + 1, end);
// 执行子任务
leftTask.fork();
rightTask.fork();

// 汇总子任务
return leftTask.join() + rightTask.join();
}
}
}
}

ForkJoinPool使用submit或invoke提交的区别:invoke是同步执行,调用之后需要等待任务完成,才能执行后面的代码;submit是异步执行,只有在Future调用get的时候会阻塞。

启动程序输出如下:

5050

其实这里执行子任务调用fork方法并不是最佳的选择,最佳的选择是invokeAll方法:

1
2
3
4
5
6
java复制代码// 执行子任务
// leftTask.fork();
// rightTask.fork();
invokeAll(leftTask,rightTask);
// 汇总子任务
return leftTask.join() + rightTask.join();

RecursiveAction

QQ截图20190703114411.png

使用方式和RecursiveTask类似,只不过没有返回值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
java复制代码public class RecursiveActionTest {
// 定义最小区间为10
private final static int MAX_THRESHOLD = 10;
private final static AtomicInteger SUM = new AtomicInteger(0);

public static void main(String[] args) throws InterruptedException {
final ForkJoinPool forkJoinPool = new ForkJoinPool();
forkJoinPool.submit(new CalculateRecursiveAction(0, 100));
forkJoinPool.awaitTermination(2, TimeUnit.SECONDS);
System.out.println(SUM);
}

private static class CalculateRecursiveAction extends RecursiveAction {
// 起始
private final int start;
// 结束
private final int end;

private CalculateRecursiveAction(int start, int end) {
this.start = start;
this.end = end;
}

@Override
protected void compute() {
// 如果起始和结束范围小于我们定义的区间范围,则直接计算
if ((end - start) <= MAX_THRESHOLD) {
SUM.addAndGet(IntStream.rangeClosed(start, end).sum());
} else {
// 否则,将范围一分为二,分成两个子任务
int middle = (end + start) / 2;
CalculateRecursiveAction leftAction = new CalculateRecursiveAction(start, middle);
CalculateRecursiveAction rightAction = new CalculateRecursiveAction(middle + 1, end);
// 执行子任务
invokeAll(leftAction, rightAction);
// 没有汇总子任务结果过程,因为没有返回值。
}
}
}
}

输出结果也是5050。

理解Fork/Join框架API

Fork/Join框架在java.util.concurrent包下被实现。它的核心有4个类:

  • ForkJoinTask: 这是一个抽象任务类,并且运行在ForkJoinPool中。
  • ForkJoinPool:这是一个线程池管理并运行众多ForkJoinTask任务。
  • RecursiveAction: ForkJoinTask的子类,这个类没有返回值。
  • RecursiveTask: ForkJoinTask的子类,有返回值。

基本上,我们解决问题的代码是在RecursiveAction或者RecursiveTask中进行的,然后将任务提交由ForkJoinPool`执行,ForkJoinPool处理从线程管理到多核处理器的利用等各种事务。

我们先来理解一下这些类中的关键方法。

ForkJoinTask

这是一个运行在ForkJoinPool中的抽象的任务类。类型V指定了任务的返回结果。ForkJoinTask是一个类似线程的实体,它表示任务的轻量级抽象,而不是实际的执行线程。该机制允许由ForkJoinPool中的少量实际线程管理大量任务。其关键方法是:

  • final ForkJoinTask fork()
  • final V join()
  • final V invoke()

fork()方法提交并执行异步任务,该方法返回ForkJoinTask并且调用线程继续运行。

join()方法等待任务直到返回结果。

invoke()方法是组合了fork()和join(),它开始一个任务并等待结束返回结果。

此外,ForkJoinTask中还提供了用于一次调用多个任务的两个静态方法

  • static void invokeAll(ForkJoinTask task1, ForkJoinTask task2) :执行两个任务
  • static void invokeAll(ForkJoinTask<?>… taskList):执行任务集合

RecursiveAction

这是一个递归的ForkJoinTask子类,不返回结果。Recursive意思是任务可以通过分治策略分成自己的子任务(在下面的下一节中,您将看到如何划分代码示例)。

我们必须重写compute()方法,并将计算代码写在其中:

1
java复制代码protected abstract void compute();

RecursiveTask

和RecursiveAction一样,但是RecursiveTask有返回结果,结果类型由V指定。我们仍然需要重写compute()方法:

1
java复制代码protected abstract V compute();

ForkJoinPool

这是Fork/Join框架的核心类。它负责线程的管理和ForkJoinTask的执行,为了执行ForkJoinTask,首先需要获取到ForkJoinPool的实例。

有两种构造器方式可以获取ForkJoinPool的实例,第一种使用构造器创建:

  • ForkJoinPool(): 使用默认的构造器创建实例,该构造器创建出的池与系统中可用的处理器数量相等。
  • ForkJoinPool(int parallelism):该构造器指定处理器数量,创建具有自定义并行度级别的池,该级别的并行度必须大于0,且不超过可用处理器的实际数量。

并行性的级别决定了可以并发执行的线程的数量。换句话说,它决定了可以同时执行的任务的数量——但不能超过处理器的数量。

但是,这并不限制池可以管理的任务的数量。ForkJoinPool可以管理比其并行级别多得多的任务。

获取ForkJoinPool实例的第二种方法是使用以下ForkJoinPool的静态方法获取公共池实例:

1
java复制代码public static ForkJoinPool commonPool();

这种方式创建的池不受shutdown()或者shutdownNow()方法的影响,但是他会在System.exit()时会自动中止。任何依赖异步任务处理的程序在主体程序中止前都应该调用awaitQuiescence()方法。该方式是静态的,可以自动被使用。

作窃取算法

工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。

那么为什么需要使用工作窃取算法呢?假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应,比如A线程负责处理A队列里的任务。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。

工作窃取算法的优点是充分利用线程进行并行计算,并减少了线程间的竞争,其缺点是在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列。

什么时候用

上面只是为了演示Fork/Join的用法,实际是采用这种方式计算反而更加费时,因为切割任务,分配线程需要额外的开销。其实什么时候用不必太纠结,一个足够大的任务,如果采用Fork/Join来处理比传统处理方式快的话,那就毫不犹豫的选择它吧!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Java-锁

发表于 2021-11-09

锁

Java 中的锁而言,一把锁也有可能同时占有多个标准,符合多种分类,比如 ReentrantLock 既是可中断锁,又是可重入锁。

根据分类标准我们把锁分为以下 7 大类别,分别是:

  • 偏向锁/轻量级锁/重量级锁;
  • 可重入锁/非可重入锁;
  • 共享锁/独占锁;
  • 公平锁/非公平锁;
  • 悲观锁/乐观锁;
  • 自旋锁/非自旋锁;
  • 可中断锁/不可中断锁

偏向锁/轻量级锁/重量级锁

第一种分类是偏向锁/轻量级锁/重量级锁,这三种锁特指 synchronized 锁的状态,通过在对象头中的 mark word 来表明锁的状态。

  • 偏向锁

如果自始至终,对于这把锁都不存在竞争,那么其实就没必要上锁,只需要打个标记就行了,这就是偏向锁的思想。一个对象被初始化后,还没有任何线程来获取它的锁时,那么它就是可偏向的,当有第一个线程来访问它并尝试获取锁的时候,它就将这个线程记录下来,以后如果尝试获取锁的线程正是偏向锁的拥有者,就可以直接获得锁,开销很小,性能最好。

  • 轻量级锁

JVM 开发者发现在很多情况下,synchronized 中的代码是被多个线程交替执行的,而不是同时执行的,也就是说并不存在实际的竞争,或者是只有短时间的锁竞争,用 CAS 就可以解决,这种情况下,用完全互斥的重量级锁是没必要的。轻量级锁是指当锁原来是偏向锁的时候,被另一个线程访问,说明存在竞争,那么偏向锁就会升级为轻量级锁,线程会通过自旋的形式尝试获取锁,而不会陷入阻塞。

  • 重量级锁

重量级锁是互斥锁,它是利用操作系统的同步机制实现的,所以开销相对比较大。当多个线程直接有实际竞争,且锁竞争时间长的时候,轻量级锁不能满足需求,锁就会膨胀为重量级锁。重量级锁会让其他申请却拿不到锁的线程进入阻塞状态。

image.png

综上所述,偏向锁性能最好,可以避免执行 CAS 操作。而轻量级锁利用自旋和 CAS 避免了重量级锁带来的线程阻塞和唤醒,性能中等。重量级锁则会把获取不到锁的线程阻塞,性能最差

可重入锁/非可重入锁

image.png

共享锁/独占锁

共享锁指的是我们同一把锁可以被多个线程同时获得,而独占锁指的就是,这把锁只能同时被一个线程获得。我们的读写锁,就最好地诠释了共享锁和独占锁的理念。读写锁中的读锁,是共享锁,而写锁是独占锁。读锁可以被同时读,可以同时被多个线程持有,而写锁最多只能同时被一个线程持有

公平锁/非公平锁

公平锁的公平的含义在于如果线程现在拿不到这把锁,那么线程就都会进入等待,开始排队,在等待队列里等待时间长的线程会优先拿到这把锁,有先来先得的意思。而非公平锁就不那么“完美”了,它会在一定情况下,忽略掉已经在排队的线程,发生插队现象

悲观锁/乐观锁

悲观锁的概念是在获取资源之前,必须先拿到锁,以便达到“独占”的状态,当前线程在操作资源的时候,其他线程由于不能拿到锁,所以其他线程不能来影响我。而乐观锁恰恰相反,它并不要求在获取资源前拿到锁,也不会锁住资源;相反,乐观锁利用 CAS 理念,在不独占资源的情况下,完成了对资源的修改。

自旋锁/非自旋锁

自旋锁的理念是如果线程现在拿不到锁,并不直接陷入阻塞或者释放 CPU 资源,而是开始利用循环,不停地尝试获取锁,这个循环过程被形象地比喻为“自旋”,就像是线程在“自我旋转”。相反,非自旋锁的理念就是没有自旋的过程,如果拿不到锁就直接放弃,或者进行其他的处理逻辑,例如去排队、陷入阻塞等

可中断锁/不可中断锁

在 Java 中,synchronized 关键字修饰的锁代表的是不可中断锁,一旦线程申请了锁,就没有回头路了,只能等到拿到锁以后才能进行其他的逻辑处理。而我们的 ReentrantLock 是一种典型的可中断锁,例如使用 lockInterruptibly 方法在获取锁的过程中,突然不想获取了,那么也可以在中断之后去做其他的事情,不需要一直傻等到获取到锁才离开

悲观锁/乐观锁本质

悲观锁比较悲观,它认为如果不锁住这个资源,别的线程就会来争抢,就会造成数据结果错误,所以悲观锁为了确保结果的正确性,会在每次获取并修改数据时,都把数据锁住,让其他线程无法访问该数据,这样就可以确保数据内容万无一失

image.png

image.png

synchronized 背后的“monitor 锁”

获取和释放 monitor 锁的时机

最简单的同步方式就是利用 synchronized 关键字来修饰代码块或者修饰一个方法,那么这部分被保护的代码,在同一时刻就最多只有一个线程可以运行,而 synchronized 的背后正是利用 monitor 锁实现的。所以首先我们来看下获取和释放 monitor 锁的时机,每个 Java 对象都可以用作一个实现同步的锁,这个锁也被称为内置锁或 monitor 锁,获得 monitor 锁的唯一途径就是进入由这个锁保护的同步代码块或同步方法,线程在进入被 synchronized 保护的代码块之前,会自动获取锁,并且无论是正常路径退出,还是通过抛出异常退出,在退出的时候都会自动释放锁。

synchronized 和 Lock 如何选择?

相同点

synchronized 和 Lock 的相同点非常多,我们这里重点讲解 3 个比较大的相同点。

1.synchronized 和 Lock 都是用来保护资源线程安全的。

2.都可以保证可见性。

image.png

对于 synchronized 而言,线程 A 在进入 synchronized 块之前或在 synchronized 块内进行操作,对于后续的获得同一个 monitor 锁的线程 B 是可见的,也就是线程 B 是可以看到线程 A 之前的操作的,这也体现了 happens-before 针对 synchronized 的一个原则

而对于 Lock 而言,它和 synchronized 是一样,都可以保证可见性,如图所示,在解锁之前的所有操作对加锁之后的所有操作都是可见的
image.png
3.synchronized 和 ReentrantLock 都拥有可重入的特点

ReentrantLock 是 Lock 接口的一个最主要的实现类,在对比 synchronized 和 Lock 的时候,也会选择 Lock 的主要实现类来进行对比。可重入指的是某个线程如果已经获得了一个锁,现在试图再次请求这个它已经获得的锁,如果它无需提前释放这个锁,而是直接可以继续使用持有的这个锁,那么就是可重入的。如果必须释放锁后才能再次申请这个锁,就是不可重入的。而 synchronized 和 ReentrantLock 都具有可重入的特性

不同点

image.png

  • 加解锁顺序不同

对于 Lock 而言如果有多把 Lock 锁,Lock 可以不完全按照加锁的反序解锁,比如我们可以先获取 Lock1 锁,再获取 Lock2 锁,解锁时则先解锁 Lock1,再解锁 Lock2,加解锁有一定的灵活度,如代码所示。

1
2
3
4
5
6
7
8
9
10
java复制代码
lock1.lock();

lock2.lock();

...

lock1.unlock();

lock2.unlock();

但是 synchronized 无法做到,synchronized 解锁的顺序和加锁的顺序必须完全相反,例如:

1
2
3
4
5
6
7
8
9
10
java复制代码
synchronized(obj1){

    synchronized(obj2){

        ...

    }

}

那么在这里,顺序就是先对 obj1 加锁,然后对 obj2 加锁,然后对 obj2 解锁,最后解锁 obj1。这是因为 synchronized 加解锁是由 JVM 实现的,在执行完 synchronized 块后会自动解锁,所以会按照 synchronized 的嵌套顺序加解锁,不能自行控制。

  • synchronized 锁不够灵活

一旦 synchronized 锁已经被某个线程获得了,此时其他线程如果还想获得,那它只能被阻塞,直到持有锁的线程运行完毕或者发生异常从而释放这个锁。如果持有锁的线程持有很长时间才释放,那么整个程序的运行效率就会降低,而且如果持有锁的线程永远不释放锁,那么尝试获取锁的线程只能永远等下去。

相比之下,Lock 类在等锁的过程中,如果使用的是 lockInterruptibly 方法,那么如果觉得等待的时间太长了不想再继续等待,可以中断退出,也可以用 tryLock() 等方法尝试获取锁,如果获取不到锁也可以做别的事,更加灵活。

  • synchronized 锁只能同时被一个线程拥有,但是 Lock 锁没有这个限制

例如在读写锁中的读锁,是可以同时被多个线程持有的,可是 synchronized 做不到。

如何选择

讲完了 synchronized 和 Lock 的相同点和区别,最后我们再来看下如何选择它们,在 Java 并发编程实战和 Java 核心技术里都认为:

如果能不用最好既不使用 Lock 也不使用 synchronized。因为在许多情况下你可以使用 java.util.concurrent 包中的机制,它会为你处理所有的加锁和解锁操作,也就是推荐优先使用工具类来加解锁。

如果 synchronized 关键字适合你的程序, 那么请尽量使用它,这样可以减少编写代码的数量,减少出错的概率。因为一旦忘记在 finally 里 unlock,代码可能会出很大的问题,而使用 synchronized 更安全。

如果特别需要 Lock 的特殊功能,比如尝试获取锁、可中断、超时功能等,才使用 Lock

Lock常用方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码
public interface Lock {

    void lock();

    void lockInterruptibly() throws InterruptedException;

    boolean tryLock();

    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

    void unlock();

    Condition newCondition();

}

接下来重点分析这 5 种方法的作用和用法,这 5 种方法分别是 lock()、tryLock()、tryLock(long time, TimeUnit unit) 和 lockInterruptibly()、unlock()。

lock() 方法

在 Lock 接口中声明了 4 种方法来获取锁(lock()、tryLock()、tryLock(long time, TimeUnit unit)和lockInterruptibly()),那么这 4 种方法具体有什么区别呢?

首先,lock() 是最基础的获取锁的方法。在线程获取锁时如果锁已被其他线程获取,则进行等待,是最初级的获取锁的方法。

对于 Lock 接口而言,获取锁和释放锁都是显式的,不像 synchronized 那样是隐式的,所以 Lock 不会像 synchronized 一样在异常时自动释放锁(synchronized 即使不写对应的代码也可以释放),lock 的加锁和释放锁都必须以代码的形式写出来,所以使用 lock() 时必须由我们自己主动去释放锁,因此最佳实践是执行 lock() 后,首先在 try{} 中操作同步资源,如果有必要就用 catch{} 块捕获异常,然后在 finally{} 中释放锁,以保证发生异常时锁一定被释放,示例代码如下所示。

公平锁与非公平锁 为什么需要非公平锁?

对比公平和非公平的优缺点

image.png

公平锁的优点在于各个线程公平平等,每个线程等待一段时间后,都有执行的机会,而它的缺点就在于整体执行速度更慢,吞吐量更小,相反非公平锁的优势就在于整体执行速度更快,吞吐量更大,但同时也可能产生线程饥饿问题,也就是说如果一直有线程插队,那么在等待队列中的线程可能长时间得不到运行。

  • 源码分析

在 ReentrantLock 类包含一个 Sync 类,这个类继承自AQS(AbstractQueuedSynchronizer),代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码
public class ReentrantLock implements Lock, java.io.Serializable {

 

private static final long serialVersionUID = 7373984872572414699L;

 

/** Synchronizer providing all implementation mechanics */

 

private final Sync sync;

Sync 类的代码:

1
2
java复制代码
abstract static class Sync extends AbstractQueuedSynchronizer {...}

根据代码可知,Sync 有公平锁 FairSync 和非公平锁 NonfairSync两个子类:

1
2
3
4
java复制代码
static final class NonfairSync extends Sync {...}

static final class FairSync extends Sync {...}

公平锁与非公平锁的加锁方法的源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
java复制代码
protected final boolean tryAcquire(int acquires) {

    final Thread current = Thread.currentThread();

    int c = getState();

    if (c == 0) {

        if (!hasQueuedPredecessors() && //这里判断了 hasQueuedPredecessors()

                compareAndSetState(0, acquires)) {

            setExclusiveOwnerThread(current);

            return true;

        }

    } else if (current == getExclusiveOwnerThread()) {

        int nextc = c + acquires;

        if (nextc < 0) {

            throw new Error("Maximum lock count exceeded");

        }

        setState(nextc);

        return true;

    }

    return false;

}

非公平锁的锁获取源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
java复制代码
final boolean nonfairTryAcquire(int acquires) {

    final Thread current = Thread.currentThread();

    int c = getState();

    if (c == 0) {

        if (compareAndSetState(0, acquires)) { //这里没有判断      hasQueuedPredecessors()

            setExclusiveOwnerThread(current);

            return true;

        }

    }

    else if (current == getExclusiveOwnerThread()) {

        int nextc = c + acquires;

        if (nextc < 0) // overflow

        throw new Error("Maximum lock count exceeded");

        setState(nextc);

        return true;

    }

    return false;

}

我们可以明显的看出公平锁与非公平锁的 lock() 方法唯一的区别就在于公平锁在获取锁时多了一个限制条件:hasQueuedPredecessors() 为 false,这个方法就是判断在等待队列中是否已经有线程在排队了。这也就是公平锁和非公平锁的核心区别,如果是公平锁,那么一旦已经有线程在排队了,当前线程就不再尝试获取锁;对于非公平锁而言,无论是否已经有线程在排队,都会尝试获取一下锁,获取不到的话,再去排队。

这里有一个特例需要我们注意,针对 tryLock() 方法,它不遵守设定的公平原则。

例如,当有线程执行 tryLock() 方法的时候,一旦有线程释放了锁,那么这个正在 tryLock 的线程就能获取到锁,即使设置的是公平锁模式,即使在它之前已经有其他正在等待队列中等待的线程,简单地说就是 tryLock 可以插队。

看它的源码就会发现:

1
2
3
4
5
6
java复制代码
public boolean tryLock() {

    return sync.nonfairTryAcquire(1);

}

这里调用的就是 nonfairTryAcquire(),表明了是不公平的,和锁本身是否是公平锁无关。

公平锁就是会按照多个线程申请锁的顺序来获取锁,从而实现公平的特性。非公平锁加锁时不考虑排队等待情况,直接尝试获取锁,所以存在后申请却先获得锁的情况,但由此也提高了整体的效率。

读写锁

在没有读写锁之前,我们假设使用普通的 ReentrantLock,那么虽然我们保证了线程安全,但是也浪费了一定的资源,因为如果多个读操作同时进行,其实并没有线程安全问题,我们可以允许让多个读操作并行,以便提高程序效率。

但是写操作不是线程安全的,如果多个线程同时写,或者在写的同时进行读操作,便会造成线程安全问题。

整体思路是它有两把锁,第 1 把锁是写锁,获得写锁之后,既可以读数据又可以修改数据,而第 2 把锁是读锁,获得读锁之后,只能查看数据,不能修改数据。读锁可以被多个线程同时持有,所以多个线程可以同时查看数据。

读写锁的获取规则

我们在使用读写锁时遵守下面的获取规则:

1.如果有一个线程已经占用了读锁,则此时其他线程如果要申请读锁,可以申请成功。

2.如果有一个线程已经占用了读锁,则此时其他线程如果要申请写锁,则申请写锁的线程会一直等待释放读锁,因为读写不能同时操作。

3.如果有一个线程已经占用了写锁,则此时其他线程如果申请写锁或者读锁,都必须等待之前的线程释放写锁,同样也因为读写不能同时,并且两个线程不应该同时写。

一句话总结:要么是一个或多个线程同时有读锁,要么是一个线程有写锁,但是两者不会同时出现。也可以总结为:读读共享、其他都互斥(写写互斥、读写互斥、写读互斥)

案例

ReentrantReadWriteLock 是 ReadWriteLock 的实现类,最主要的有两个方法:readLock() 和 writeLock() 用来获取读锁和写锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
java复制代码
/**

* 描述: 演示读写锁用法

*/

public class ReadWriteLockDemo {

private static final ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock(

false);

private static final ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock

.readLock();

private static final ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock

.writeLock();

private static void read() {

readLock.lock();

try {

System.out.println(Thread.currentThread().getName() + "得到读锁,正在读取");

Thread.sleep(500);

} catch (InterruptedException e) {

e.printStackTrace();

} finally {

System.out.println(Thread.currentThread().getName() + "释放读锁");

readLock.unlock();

}

}

private static void write() {

writeLock.lock();

try {

System.out.println(Thread.currentThread().getName() + "得到写锁,正在写入");

Thread.sleep(500);

} catch (InterruptedException e) {

e.printStackTrace();

} finally {

System.out.println(Thread.currentThread().getName() + "释放写锁");

writeLock.unlock();

}

}

public static void main(String[] args) throws InterruptedException {

new Thread(() -> read()).start();

new Thread(() -> read()).start();

new Thread(() -> write()).start();

new Thread(() -> write()).start();

}

}

总结ReentrantReadWriteLock

  • 插队策略
  1. 公平策略下,只要队列里有线程已经在排队,就不允许插队。

2.非公平策略下:

1).如果允许读锁插队,那么由于读锁可以同时被多个线程持有,所以可能造成源源不断的后面的线程一直插队成功,导致读锁一直不能完全释放,从而导致写锁一直等待,为了防止“饥饿”,在等待队列的头结点是尝试获取写锁的线程的时候,不允许读锁插队

2).写锁可以随时插队,因为写锁并不容易插队成功,写锁只有在当前没有任何其他线程持有读锁和写锁的时候,才能插队成功,同时写锁一旦插队失败就会进入等待队列,所以很难造成“饥饿”的情况,允许写锁插队是为了提高效率。

  • 升降级策略:

只能从写锁降级为读锁,不能从读锁升级为写锁。

自旋锁

自旋锁:

“自旋”可以理解为“自我旋转”,这里的“旋转”指“循环”,比如 while 循环或者 for 循环。“自旋”就是自己在这里不停地循环,直到目标达成。而不像普通的锁那样,如果获取不到锁就进入阻塞

|-> 对比自旋和非自旋的获取锁的流程

image.png

|-> 自旋锁的好处

首先,阻塞和唤醒线程都是需要高昂的开销的,如果同步代码块中的内容不复杂,那么可能转换线程带来的开销比实际业务代码执行的开销还要大。

在很多场景下,可能我们的同步代码块的内容并不多,所以需要的执行时间也很短,如果我们仅仅为了这点时间就去切换线程状态,那么其实不如让线程不切换状态,而是让它自旋地尝试获取锁,等待其他线程释放锁,有时我只需要稍等一下,就可以避免上下文切换等开销,提高了效率。

那就是自旋锁用循环去不停地尝试获取锁,让线程始终处于 Runnable 状态,节省了线程状态切换带来的开销

案例: 可重入的自旋锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
java复制代码
package lesson27;

 

import java.util.concurrent.atomic.AtomicReference;

import java.util.concurrent.locks.Lock;

 

/**

 * 描述:     实现一个可重入的自旋锁

 */

public class ReentrantSpinLock  {

 

    private AtomicReference<Thread> owner = new AtomicReference<>();

 

    //重入次数

    private int count = 0;

 

    public void lock() {

        Thread t = Thread.currentThread();

        if (t == owner.get()) {

            ++count;

            return;

        }

        //自旋获取锁

        while (!owner.compareAndSet(null, t)) {

            System.out.println("自旋了");

        }

    }

 

    public void unlock() {

        Thread t = Thread.currentThread();

        //只有持有锁的线程才能解锁

        if (t == owner.get()) {

            if (count > 0) {

                --count;

            } else {

                //此处无需CAS操作,因为没有竞争,因为只有线程持有者才能解锁

                owner.set(null);

            }

        }

    }

 

    public static void main(String[] args) {

        ReentrantSpinLock spinLock = new ReentrantSpinLock();

        Runnable runnable = new Runnable() {

            @Override

            public void run() {

                System.out.println(Thread.currentThread().getName() + "开始尝试获取自旋锁");

                spinLock.lock();

                try {

                    System.out.println(Thread.currentThread().getName() + "获取到了自旋锁");

                    Thread.sleep(4000);

                } catch (InterruptedException e) {

                    e.printStackTrace();

                } finally {

                    spinLock.unlock();

                    System.out.println(Thread.currentThread().getName() + "释放了了自旋锁");

                }

            }

        };

        Thread thread1 = new Thread(runnable);

        Thread thread2 = new Thread(runnable);

        thread1.start();

        thread2.start();

    }

}
  • 缺点

最大的缺点就在于虽然避免了线程切换的开销,但是它在避免线程切换开销的同时也带来了新的开销,因为它需要不停得去尝试获取锁。如果这把锁一直不能被释放,那么这种尝试只是无用的尝试,会白白浪费处理器资源。也就是说,虽然一开始自旋锁的开销低于线程切换,但是随着时间的增加,这种开销也是水涨船高,后期甚至会超过线程切换的开销,得不偿失。

  • 适用场景

自旋锁适用于并发度不是特别高的场景,以及临界区比较短小的情况,这样我们可以利用避免线程切换来提高效率

可是如果临界区很大,线程一旦拿到锁,很久才会释放的话,那就不合适用自旋锁,因为自旋会一直占用 CPU 却无法拿到锁,白白消耗资源

JVM 锁优化

相比于 JDK 1.5,在 JDK 1.6 中 HotSopt 虚拟机对 synchronized 内置锁的性能进行了很多优化,包括自适应的自旋、锁消除、锁粗化、偏向锁、轻量级锁等。有了这些优化措施后,synchronized 锁的性能得到了大幅提高,下面我们分别介绍这些具体的优化

自适应自旋锁

在 JDK 1.6 中引入了自适应的自旋锁来解决长时间自旋的问题。自适应意味着自旋的时间不再固定,而是会根据最近自旋尝试的成功率、失败率,以及当前锁的拥有者的状态等多种因素来共同决定。自旋的持续时间是变化的,自旋锁变“聪明”了。比如,如果最近尝试自旋获取某一把锁成功了,那么下一次可能还会继续使用自旋,并且允许自旋更长的时间;但是如果最近自旋获取某一把锁失败了,那么可能会省略掉自旋的过程,以便减少无用的自旋,提高效率

锁消除

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码
@Override

public synchronized StringBuffer append(Object obj) {

    toStringCache = null;

    super.append(String.valueOf(obj));

    return this;

}

从代码中可以看出,这个方法是被 synchronized 修饰的同步方法,因为它可能会被多个线程同时使用。

但是在大多数情况下,它只会在一个线程内被使用,如果编译器能确定这个 StringBuffer 对象只会在一个线程内被使用,就代表肯定是线程安全的,那么我们的编译器便会做出优化,把对应的 synchronized 给消除,省去加锁和解锁的操作,以便增加整体的效率。

粗化锁

如果我们释放了锁,紧接着什么都没做,又重新获取锁,例如下面这段代码所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码
public void lockCoarsening() {

    synchronized (this) {

        //do something

    }

    synchronized (this) {

        //do something

    }

    synchronized (this) {

        //do something

    }

}

其实这种释放和重新获取锁是完全没有必要的,如果我们把同步区域扩大,也就是只在最开始加一次锁,并且在最后直接解锁,那么就可以把中间这些无意义的解锁和加锁的过程消除,相当于是把几个 synchronized 块合并为一个较大的同步块。这样做的好处在于在线程执行这些代码时,就无须频繁申请与释放锁了,这样就减少了性能开销。

不过,我们这样做也有一个副作用,那就是我们会让同步区域变大。如果在循环中我们也这样做,如代码所示:

1
2
3
4
5
6
7
8
9
10
JAVA复制代码
for (int i = 0; i < 1000; i++) {

    synchronized (this) {

        //do something

    }

}

也就是我们在第一次循环的开始,就开始扩大同步区域并持有锁,直到最后一次循环结束,才结束同步代码块释放锁的话,这就会导致其他线程长时间无法获得锁。所以,这里的锁粗化不适用于循环的场景,仅适用于非循环的场景。

锁粗化功能是默认打开的,用 -XX:-EliminateLocks 可以关闭该功能。

偏向锁/轻量级锁/重量级锁

这三种锁是特指 synchronized 锁的状态的,通过在对象头中的 mark word 来表明锁的状态

image.png

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Java 线程正确打开方式

发表于 2021-11-09

「这是我参与11月更文挑战的第3天,活动详情查看:2021最后一次更文挑战」

前言

虽然关于 Java 线程的文章已经被写烂了。但是别人总结的文章只能是别人的知识点,虽然我眼睛看会了,但是脑子是记不住的。所以往后学了知识、看了文章最好还是总结下来才是自己的。

线程是操作系统调度的最小单位。

进程是操作系统分配资源的最小单位。

相信这两句话,大家应该是比较清楚的。这两句话理解也不难,就是从操作系统层面去理解线程与进程的关系。要想理解线程与进程的概念我推荐大佬的文章。

  • 廖雪峰-进程和线程
  • 阮一峰 进程与线程的一个简单解释

下面就总结一下 Java 中如何使用线程。

正文

线程实现

Java 中线程的实现有很多方式主要是一下两种:

  • 继承 Thread类,并重写 run方法
  • 实现 Runnable 接口的 run方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码public class ThreadDemo {
public static void main(String[] args) {

Thread t1 = new ExtendThread();
t1.start();
Thread t2 = new Thread(new ImpRunnable());
t2.start();
}
}

class ExtendThread extends Thread {
@Override
public void run(){
System.out.println("Extends Thread success");
}
}

class ImpRunnable implements Runnable {

@Override
public void run() {
System.out.println("Implements Runnable success");
}
}

那么为什么对于线程有两种方式去初始化,继承Thread类或者实现Runnable接口有什么区别呢?

  • Thread 类是 Java 实现Runnable接口的严格封装,因此只有当修改或扩展时,才应该继承该类
  • Runnable 接口出现更符合面向对象思想(单继承多实现),创建线程更轻量化。

总结:推荐优先使用实现Runnable接口的方式来自定义线程类。

线程池

实际上创建线程的方式还有一种是通过 Callable 和 Future 创建。这种方式是为了解决线程运行没有返回值的情况,主要配合线程池使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码class ImpCallable implements Callable<Integer> {

public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建线程池
ExecutorService executor = Executors.newCachedThreadPool();
ImpCallable task = new ImpCallable();
// 获取任务结果
Future<Integer> result = executor.submit(task);
System.out.println("Task success and result is " + result.get());
}

@Override
public Integer call() throws Exception {
System.out.println("Implements Callable success");
return 100;
}
}

注意这里的 Executors 就是 Java 中用于创建线程池的工厂方法。返回的线程池都实现了 ExecutorService 接口。

关于线程池的初始化主要是 ThreadPoolExecutor 类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
this.ctl = new AtomicInteger(ctlOf(-536870912, 0));
this.mainLock = new ReentrantLock();
this.workers = new HashSet();
this.termination = this.mainLock.newCondition();
if (corePoolSize >= 0 && maximumPoolSize > 0 && maximumPoolSize >= corePoolSize && keepAliveTime >= 0L) {
if (workQueue != null && threadFactory != null && handler != null) {
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
} else {
throw new NullPointerException();
}
} else {
throw new IllegalArgumentException();
}
}

主要的参数解释如下:

  • corePoolSize 核心线程数
  • maximumPoolSize 最大线程数,表示在线程池中最多能创建多少个线程
  • keepAliveTime 表示线程没有任务执行时最多保持多久时间会终止
  • unit keepAliveTime 时间单位
  • workQueue 阻塞队列,用于线程池中的线程数目达到corePoolSize后,缓存任务
  • threadFactory 线程工厂,主要用来创建线程
  • handler 拒绝策略,用户设置线程池无法处理任务的规则

关于 Java 中常见的四种线程池:

  • newCachedThreadPool 可缓存线程池,可灵活回收空闲线程
  • newFixedThreadPool 定长线程池,可控制线程最大并发数,超出的线程会在队列中等待
  • newScheduledThreadPool 定长线程池,支持定时及周期性任务执行。
  • newSingleThreadExecutor 单线程化的线程池,单线程来执行任务。保证所有任务顺序执行。

《阿里巴巴 Java 开发手册》中强制线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 构造函数的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

参考资料

  • Java 并发编程:线程池的使用
  • 深入浅出 Java 多线程
  • Java并发(三)线程池原理

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…389390391…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%