开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

Redis 实现单位时间内限制频率功能

发表于 2021-11-02

1.使用场景

1.重要日志每小时触发次数。
2.限制登录次数。
3.分布式部署情况下的上述情况…

2.代码

示例中为限制1小时内15次。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码/**
* 每个人每小时限制触发次数
*/
private final int LOCK_TIMES = 15;

// 自定义Key
String key = "Ltz:" + sid;
// 获取已触发次数
String sendUser = redisTemplate.opsForValue().get(key);

// 为空,说明之前没有触发过
if (ObjectUtil.isEmpty(sendUser)) {
// 空,说明是第一次,写入1,并且设置过期时间(1小时)
redisTemplate.opsForValue().set(key, "1", 1, TimeUnit.HOURS);
} else {
// 后续发送
if (Integer.parseInt(sendUser) >= LOCK_TIMES) {
// 超过限制次数
// TODO
}
redisTemplate.opsForValue().increment(key);
}

3.一个坑

使用redisTemplate时,需要在使用时,注入RedisTemplate<String, String> redisTemplate。

如果使用RedisTemplate<String, Object>会报如下错。
1
sql复制代码ERR value is not an integer or out of range

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

浅谈在线广告分配策略

发表于 2021-11-02

来自公众号:[Gopher指北](https://isites.gitlab.io/gopher/)

在线广告,也称网络广告、互联网广告,顾名思义,指的是在线媒体上投放的广告。平时我们在刷信息流、短视频、新闻和微博均可以看见它的影子。对于比较大的广告平台,用户定向后依旧会有大量的广告可以下发,而从大量的广告中选择合适的广告展现给用户就是本篇要讨论的主题——在线广告分配策略。

名词描述

为了更好的理解本文,先提前做一些名词描述。

eCPM(Effective Cost Per Mille): 指的是每一千次展示可以获得的广告收入。此指标反映盈利能力,不代表实际收入。不同的广告主会选择CPC、CPM等不同出价方式,因此广告分配时无法以纯粹的出价进行比较,所以才有了ecpm这一指标用于评估不同出价方式的广告可以给广告平台带来的收益。

定向广告:所谓”定向”实际上是对受众的筛选,即广告的显示是根据访问者来决定的,先进的广告管理系统 能够提供多种多样的定向方式。

最好的一定合适嘛

对于广告平台而言收益最大化是优先事项。为了保证收益最大化,对于每一次广告请求我们都选择ecpm最高的广告下发。这个逻辑从理论上来看是正确的,但在实际中就不一定了,那么它到底会有什么问题呢?

  1. ⼴告消耗超预算限额。
  2. 广告预算消耗不尽。
  3. 空结果问题。
  4. 部分广告消耗过快影响广告主投放体验和用户产品体验。

问题分析

问题1

对问题1进行分析时,我们需要先有这样一个共识,广告的点击、曝光等数据上报有一定的延时。

由于广告分配策略未考虑预算消耗信息,当消耗接近预算限额时未能及时减缓曝光速度,导致本应分配给其他广告主的流量依旧分配给了预算受限的广告主,这是对广告平台流量的浪费(流量越大的平台浪费会愈加严重)。

问题2

部分中小广告主竞争力弱(出价低),很难获取足够的曝光量,这种情形当广告充裕时尤为明显。

问题3

一方面可能是因为广告资源不足,另外一方面也有可能是定向广告消耗过快(详见下面的例子)。

问题4

广告按照ecpm排序,会导致广告消耗速度差异较大直接影响广告主的投放体验,甚至于用户反复看到重复的广告直接影响用户产品体验,再反过来影响到广告的CVR。

为了进一步说明纯按价高者得这一算法的不足之处,请看下面的特殊例子。

广告 出价($) 预算($) 定向
A 0.5 100 男,游戏
B 1 100 男,游戏,运动

以上述广告为例,现有男,游戏和男,运动请求各100。理想最大收益为150$,但是按照上述策略分配广告时,会出现男,游戏这100请求先到达时优先消耗B广告,男,运动`这100请求达到时无广告可消耗。按照ecpm排序的算法又称为Greedy算法,该算法会让高价值广告快速消耗。

合适的才是最好的

Balance算法

与Greedy算法不同的是,Kalyanasundaram和Pruhs提出的Balance算法忽略单个bidder的出价,尽可能平衡所有bidder的预算消耗,使得其在线时间尽可能⻓,即尽量使得所有⼴告都保持匀速投放。其算法描述如下:

1
2
3
4
5
6
kotlin复制代码当一个满足一些定向广告的请求到达时:
if 广告预算消耗完 {
continue
} else {
选择一个(消耗/预算)值最小的一个广告
}

相比贪心算法,Balance算法平衡所有广告的消耗速度,能够有效解决贪心算法广告快速消耗的问题,但在广告消耗不尽的问题上依旧不是最佳解决方案。我们看下面特殊例子:

广告 出价($) 预算($) 定向
A 1 100 男,游戏
B 0.01 100 男,游戏,运动

以上述广告为例,现有男,游戏和男,运动请求各100。理想对最大收益为110$,根据balance算法其总的预算消耗仅为几美元。当男,游戏这100请求先到达时,B广告一定会先消耗完,当男,运动`100请求到达时依旧会无广告可消耗。

那Balance算法适用场景到底是什么,下面我们以极限法来考虑这个问题。

假设一:如果广告A和广告B的出价分别为1000$和1$(CPC)

很明显,广告A具有更大的优势理应优先展示。根据前面的例子,Balance算法是无法解决这种极值场景的,而Greedy算法则充分兼顾了平台的利益以及广告主急切花钱的心情。

假设二:如果所有广告出价分别为10$(CPC)

Greedy算法是和出价有关的,而Balance算法仅和预算有关。根据控制变量法很容易知道Balance算法正是为了这种场景而生。

小结:根据前面的假设以及论文中的描述我们总结如下结论:

  • Balance算法更适用于广告出价比较接近的场景
  • Greedy算法则比较适用于广告出价差异比较大的场景

MSVV算法

只有小孩子才做选做题,我们成年人全都要。Balance算法和Greedy算法各有优劣且适用场景不同,那有没有算法能够融合两者的优点呢?这正式MSVV算法的思路。

为了更清楚描述新算法,先给出一些基本定义:

  • 消耗比例:'消耗/总预算'并记作 υ\upsilonυ。总预算由广告主设置,并且可动态调整。
  • 权衡函数:Ψ(υ)=1−e(υ−1)\Psi(\upsilon) = 1 - e^{(\upsilon-1)}Ψ(υ)=1−e(υ−1)
  • 缩放出价:出价∗Ψ(υ)出价 * \Psi(\upsilon)出价∗Ψ(υ)

算法描述如下:

1
2
3
4
5
6
go复制代码当一个满足一些定向广告的请求到达时:
if 广告预算消耗完 {
continue
} else {
选择一个`缩放出价`值最大的广告
}

上述的权衡函数为一个单调下降的函数,且υ\upsilonυ取值范围为[0,1]。权衡函数分布图如下:

当所有广告出价相等时,由于权衡函数是一个单调下降的函数,因此MSVV算法就正好退化成Balance算法。另一方面,如果出价差异非常大时,MSVV算法在大多数情况都不会改变出价的顺序,此时MSVV表现更接近Greedy算法。考虑更极端的情况,当所有广告预算都是无限时,MSVV算法直接退化为Greedy算法,因为此时权衡函数为常量1−1e1 - {\frac 1 e}1−e1​。

为了验证MSVV算法的适应性,我们看下面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
go复制代码type ad struct {
cost float64
total float64
price float64
}

func scaled(price, cost, total float64) float64 {
return price * (1.0 - math.Pow(math.E, cost/total-1))
}

func main() {
a := &ad{0, 100, 1}
b := &ad{0, 100, 0.01}
// 模拟`男,游戏`到达时,a和b同时消耗
for i := 0; i < 100; i++ {
aCp := scaled(a.price, a.cost, a.total)
bCp := scaled(b.price, b.cost, b.total)
if a.cost >= a.total || b.cost >= b.total {
break
}
if aCp >= bCp {
a.cost += a.price
} else {
b.cost += b.price
}
}
// 模拟`男,运动`到达时,仅b可消耗
for i := 0; i < 100; i++ {
if b.cost >= b.total {
break
}
b.cost += b.price
}
fmt.Println(a.cost + b.cost)
}

MSVV算法在前面的极值例子中收益分别为116.5和101,其整体表现基本符合预期。

总结

现在回顾前面的问题,消耗过快以及减缓曝光速度都在Balance算法的射程内(广告资源不足只有通过其他手段解决了)。从广告平台的收益角度考虑Greedy算法更佳。那么结合两者优点的MSVV算法可谓是每个广告平台居家旅行之必备利器。

在线广告老许也是初次接触,而且正在努力储备知识,以期日后可持续发展。如果文中有不正确的地方欢迎各位读者指正和交流。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

【微服务的绊脚石--分布式事务】 Seata-AT模式深入分

发表于 2021-11-02

前言

在上一篇[微服务的绊脚石–分布式事务] SEATA解决方案介绍中,介绍了微服务架构的问题之一:分布式事务,以及业界常见的解决方案。这一篇,针对我和身边同事在学习Seata过程中遇到的各种问题,结合当前最新的版本Seata 1.4.2的代码实现,跟大家一起深入了解一下Seata。主要的问题有:

  1. Seata的核心竞争力是什么?
  2. Seata内部有哪些模块?
  3. AT 与 XA 方案有什么不同?
  4. 具体Commit/Rollback流程是怎样的?
  5. AT模式是如何初始化的?
  6. RM为什么不需要@GlobalTransactional?
  7. @GlobalLock有什么用?
  8. 为什么ExceptionHandler会导致全局事务失效?

Seata主要的竞争力是什么?

上一篇中我们介绍过2PC/XA,TCC和SAGA方案。2PC/XA的优点是对业务代码无侵入,但是它的缺点也是很明显:必须要求数据库对 XA 协议的支持,且由于 XA 协议自身的特点,它会造成事务资源长时间得不到释放,锁定周期长,而且在应用层上面无法干预,因此性能很差,属于杀敌一千自损八百。而TCC和SAGA方案都是业务侵入式的,提交逻辑的实现必然伴随着回滚逻辑(或者补偿逻辑)的实现,这样的代码会变得非常臃肿,维护成本高。

据阿里工程师介绍,AT模式作为 Seata的默认模式,虽然也是类似于 XA 方案的两段式提交方案,但一开始就是冲着业务无侵入性与高性能方向走,这正是我们对解决分布式事务问题迫切的需求。

Seata内部有哪些模块?

Seata-AT 的设计思路是将一个分布式事务作为一个全局事务,在下面挂若干个分支事务,而一个分支事务是一个满足 ACID 的本地事务,因此我们可以像操作本地事务一样操作分布式事务。

Seata 内部定义了 3个模块来处理全局事务和分支事务,这三个组件分别是:

  • Transaction Coordinator (TC):事务协调器,维护全局事务的运行状态,独立部署,负责协调并驱动全局事务的提交或回滚。
  • Transaction Manager (TM):控制全局事务的边界,负责开启一个全局事务,并最终发起全局提交或全局回滚的决议。
  • Resource Manager (RM):控制分支事务,负责分支注册、状态汇报,并接收事务协调器的指令,驱动分支(本地)事务的提交和回滚。

image.png

全局事务的执行步骤:

  1. TM 向 TC 申请开启一个全局事务,TC 创建全局事务后返回全局唯一的 XID,XID 会在全局事务的上下文中传播
  2. RM 向 TC 注册分支事务,该分支事务归属于拥有相同 XID 的全局事务;
  3. TM 向 TC 发起全局提交或回滚;
  4. TC 调度 XID 下的分支事务完成提交或者回滚。

AT 与 XA 方案有什么不同?

Seata 的事务提交方式跟 XA 协议的两段式提交在总体思路上来说基本是一致的,那它们之间有什么不同呢?

我们都知道 XA 协议它依赖的是数据库层面来保障事务的一致性,也即是说 XA 的各个分支事务是在数据库层面上驱动的,由于 XA 的各个分支事务需要有 XA 的驱动程序,一方面会导致数据库与 XA 驱动耦合,另一方面它会导致各个分支的事务资源锁定周期长,这也是它没有在互联网公司流行的重要因素。

基于 XA 协议以上的问题,Seata 另辟蹊径,既然在依赖数据库层会导致这么多问题,那我就从应用层做手脚,这还得从 Seata 的 RM 模块说起,前面也说过 RM 的主要作用了,其实 RM 在内部做了对数据库对象,如DataSource, Connection, Statement做了一层代理。

image.png

Seata 对数据源做了代理,所以我们在使用 Seata 时,实际上用的数据源是 Seata 自带的数据源代理 DataSourceProxy,Seata 在这层代理中加入了很多逻辑,主要是解析 SQL,把业务数据在更新前后的数据镜像组织成回滚日志,并将 UndoLog 日志插入 undo_log 表中,保证每条更新数据的业务 sql 都有对应的回滚日志存在。

这样做的好处就是,本地事务执行完可以立即释放本地事务锁定的资源,然后向 TC 上报分支状态。当 TM 决定全局提交时,就不需要同步协调处理了,TC 会异步调度各个 RM 分支事务删除对应的 UndoLog 日志即可,这个步骤非常快速地可以完成;当 TM 决议全局回滚时,RM 收到 TC 发送的回滚请求,RM 通过 XID 找到对应的 UndoLog 回滚日志,然后执行回滚日志完成回滚操作。

image.png
如上图所示,XA 方案的 RM 是放在数据库层的,它依赖了数据库的 XA 驱动程序。

image.png

而Seata 的 RM 实际上是已中间件的形式放在应用层,不用依赖数据库对协议的支持,完全剥离了分布式事务方案对数据库在协议支持上的要求。

具体Commit/Rollback流程是怎样的?

image.png

概括来讲,AT 模式的工作流程分为两个阶段。一阶段进行业务 SQL 执行,并通过 SQL 拦截、SQL 改写等过程生成修改数据前后的快照(Image),并作为 UndoLog 和业务修改在同一个本地事务中提交。

如果一阶段成功那么二阶段仅仅异步删除刚刚插入的 UndoLog;如果二阶段失败则通过 UndoLog 生成反向 SQL 语句回滚一阶段的数据修改。其中关键的 SQL 解析和拼接工作借助了 Druid Parser 中的代码,这部分本文并不涉及,感兴趣的小伙伴可以去翻看源码,并不是很复杂。

下面,我们以上一篇中的order_tbl为例来说明整个 AT 分支的工作过程。

业务表:order_tbl

Field Type Key
id int PRI
user_id varchar(255)
commodity_code varchar(255)
count int
money int

AT 分支事务的业务逻辑:

1
sql复制代码insert into order_tbl values (12, '1002', '2001', 1, 5);

一阶段

image.png

过程:

  1. 解析 SQL:得到 SQL 的类型(INSERT),表(order_tbl),条件等相关的信息。查询前镜像:根据解析得到的条件信息,生成查询语句,定位数据。如果是更新数据,会根据更新语句的条件,生成如下SQL查询前镜像。
1
bash复制代码select id, user_id, commodity_code, count, money from product where id = 12;
  1. 执行业务 SQL:插入这条记录。
  2. 查询后镜像:根据前镜像的结果,通过 主键 定位数据。
1
bash复制代码select id, user_id, commodity_code, count, money from product where id = 12;

得到后镜像:

id user_id commodity_code count money
1 1002 2001 1 5
  1. 插入回滚日志:把前后镜像数据以及业务 SQL 相关的信息组成一条回滚日志记录,插入到 UNDO_LOG 表中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
less复制代码{
"@class": "io.seata.rm.datasource.undo.BranchUndoLog",
"xid": "172.27.0.2:8091:18207193960247322",
"branchId": 18207193960247324,
"sqlUndoLogs": [
"java.util.ArrayList",
[
{
"@class": "io.seata.rm.datasource.undo.SQLUndoLog",
"sqlType": "INSERT",
"tableName": "order_tbl",
"beforeImage": {
"@class": "io.seata.rm.datasource.sql.struct.TableRecords$EmptyTableRecords",
"tableName": "order_tbl",
"rows": [
"java.util.ArrayList",
[]
]
},
"afterImage": {
"@class": "io.seata.rm.datasource.sql.struct.TableRecords",
"tableName": "order_tbl",
"rows": [
"java.util.ArrayList",
[
{
"@class": "io.seata.rm.datasource.sql.struct.Row",
"fields": [
"java.util.ArrayList",
[
{
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "id",
"keyType": "PRIMARY_KEY",
"type": 4,
"value": 12
},
{
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "user_id",
"keyType": "NULL",
"type": 12,
"value": "1002"
},
{
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "commodity_code",
"keyType": "NULL",
"type": 12,
"value": "2001"
},
{
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "count",
"keyType": "NULL",
"type": 4,
"value": 1
},
{
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "money",
"keyType": "NULL",
"type": 4,
"value": 5
}
]
]
}
]
]
}
}
]
]
}
  1. 提交前,向 TC 注册分支:申请 order_tbl 表中,主键值等于 12 的记录的 全局锁 。
  2. 本地事务提交:业务数据的更新和前面步骤中生成的 UndoLog 一并提交。
  3. 将本地事务提交的结果上报给 TC。

二阶段-回滚

image.png

  1. 收到 TC 的分支回滚请求,开启一个本地事务,通过 XID 和 Branch ID 查找到相应的 UndoLog 记录。
  2. 数据校验:拿 UndoLog 中的后镜与当前数据进行比较,如果有不同,说明数据被当前全局事务之外的动作做了修改。这种情况,需要根据配置策略来做处理。
  3. 根据 UndoLog 中的前镜像和业务 SQL 的相关信息生成并执行回滚的语句:
1
ini复制代码delete from order_tbl where id = 12;
  1. 删除UndoLog。
  2. 提交本地事务。
  3. 把本地事务的执行结果(即分支事务回滚的结果)上报给 TC。

二阶段-提交

image.png

  1. 收到 TC 的分支提交请求,把请求放入一个异步任务的队列中,马上返回提交成功的结果给 TC。查找UndoLog。
  2. 批量地删除相应 UndoLog 记录。
  3. 开始提交本地事务。
  4. 向TC汇报本地事务结果。

AT模式是如何初始化的?

整个全局事务是有TM负责开启的,上一篇的代码中是在BusinessService中开始全局事务的,我们注意到这里有一个@GlobalTransactional注解,详细的io.seata.spring.annotation.GlobalTransactional的代码可以参考这里。

1
2
3
4
5
6
7
java复制代码//BusinessService
@GlobalTransactional
public void purchase(String userId, String commodityCode, int orderCount) {
LOGGER.info("purchase begin ... xid: " + RootContext.getXID());
storageClient.deduct(commodityCode, orderCount);
orderClient.create(userId, commodityCode, orderCount);
}

在同一个包下,还有一个io.seata.spring.annotation.GlobalTransactionScanner,它继承了org.springframework.aop.framework.autoproxy.AbstractAutoProxyCreator,在它的initClient()方法中,对TM和RM进行了初始化。

1
2
3
4
5
6
7
8
9
10
java复制代码    private void initClient() {
// ...
//init TM
TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
// ...
//init RM
RMClient.init(applicationId, txServiceGroup);
// ...
registerSpringShutdownHook();
}

TM的初始化

在TMClient 的init 方法中获取了io.seata.core.rpc.netty.TmNettyRemotingClient的实例,用于处理各种与服务端的消息交互。

1
2
3
4
5
java复制代码// TMClient
public static void init(String applicationId, String transactionServiceGroup, String accessKey, String secretKey) {
TmNettyRemotingClient tmNettyRemotingClient = TmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup, accessKey, secretKey);
tmNettyRemotingClient.init();
}

TmNettyRemotingClient继承了io.seata.core.rpc.netty.AbstractNettyRemotingClient,在AbstractNettyRemotingClient的init方法中,设置了定时任务用于定时重发 RegisterTMRequest(RM 客户端会发送 RegisterRMRequest)请求尝试连接服务端,具体逻辑是: NettyClientChannelManager 中的 channels 中缓存了客户端 channel,如果此时 channels 不存在或者已过期,那么就会尝试连接服务端以重新获取 channel 并将其缓存到 channels 中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码@Override
public void init() {
timerExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
clientChannelManager.reconnect(getTransactionServiceGroup());
}
}, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
if (NettyClientConfig.isEnableClientBatchSendRequest()) {
mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
MAX_MERGE_SEND_THREAD,
KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
mergeSendExecutorService.submit(new MergedSendRunnable());
}
super.init();
clientBootstrap.start();
}

RM的初始化

在RMClient的init方法中,RmNettyRemotingClient.getInstance 处理逻辑与 TM 大致相同;ResourceManager 是 RM 资源管理器,负责分支事务的注册、提交、上报、以及回滚操作,以及全局锁的查询操作,DefaultResourceManager 会持有当前所有的 RM 资源管理器,进行统一调用处理。

TransactionMessageHandler 是 RM 消息处理器,用于负责处理从 TC 发送过来的指令,并对分支进行分支提交、分支回滚,以及 UndoLog 删除操作;最后 init 方法跟 TM 逻辑也大体一致;DefaultRMHandler 封装了 RM 分支事务的一些具体操作逻辑。

1
2
3
4
5
6
java复制代码public static void init(String applicationId, String transactionServiceGroup) {
RmNettyRemotingClient rmNettyRemotingClient = RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);
rmNettyRemotingClient.setResourceManager(DefaultResourceManager.get());
rmNettyRemotingClient.setTransactionMessageHandler(DefaultRMHandler.get());
rmNettyRemotingClient.init();
}

添加拦截器

在GlobalTransactionScanner的wrapIfNecessary方法中,会扫描带有@GlobalTransactional,@GlobalLock等注解的方法,并添加对应的拦截器。

  1. 判断是否存在对应的注解
  2. 创建拦截器
  3. 将拦截器添加到目标对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
java复制代码/**
* The following will be scanned, and added corresponding interceptor
*/
@Override
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
try {
synchronized (PROXYED_SET) {
if (PROXYED_SET.contains(beanName)) {
return bean;
}
interceptor = null;
//check TCC proxy
if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
//TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(ConfigurationChangeListener)interceptor);
} else {
Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);

//#### <1> ####
if (!existsAnnotation(new Class[]{serviceInterface})
&& !existsAnnotation(interfacesIfJdk)) {
return bean;
}

if (globalTransactionalInterceptor == null) {
//#### <2> ####
globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
ConfigurationCache.addConfigListener(
ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(ConfigurationChangeListener)globalTransactionalInterceptor);
}
interceptor = globalTransactionalInterceptor;
}

LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
if (!AopUtils.isAopProxy(bean)) {
bean = super.wrapIfNecessary(bean, beanName, cacheKey);
} else {
//#### <3> ####
AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
for (Advisor avr : advisor) {
advised.addAdvisor(0, avr);
}
}
PROXYED_SET.add(beanName);
return bean;
}
} catch (Exception exx) {
throw new RuntimeException(exx);
}
}

事务处理

以全局事务@GlobalTransactional为例,在io.seata.spring.annotation.GlobalTransactionalInterceptor的invoke方法中,handleGlobalTransaction方法,在该方法中又调用了io.seata.tm.api.TransactionalTemplate的execute方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
java复制代码@Override
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
Class<?> targetClass =
methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
final GlobalTransactional globalTransactionalAnnotation =
getAnnotation(method, targetClass, GlobalTransactional.class);
final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
if (!localDisable) {
if (globalTransactionalAnnotation != null) {
return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
} else if (globalLockAnnotation != null) {
return handleGlobalLock(methodInvocation, globalLockAnnotation);
}
}
}
return methodInvocation.proceed();
}

Object handleGlobalTransaction(final MethodInvocation methodInvocation,
final GlobalTransactional globalTrxAnno) throws Throwable {
boolean succeed = true;
try {
return transactionalTemplate.execute(new TransactionalExecutor() {
@Override
public Object execute() throws Throwable {
return methodInvocation.proceed();
}
// ...
});
}
// ...
}

在TransactionalTemplate的execute方法中执行了具体事务处理,比如开启事务、提交、回滚等等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
java复制代码public Object execute(TransactionalExecutor business) throws Throwable {
// 1. Get transactionInfo
TransactionInfo txInfo = business.getTransactionInfo();
if (txInfo == null) {
throw new ShouldNeverHappenException("transactionInfo does not exist");
}
// 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.
GlobalTransaction tx = GlobalTransactionContext.getCurrent();

// 1.2 Handle the transaction propagation.
Propagation propagation = txInfo.getPropagation();
SuspendedResourcesHolder suspendedResourcesHolder = null;
try {
// ...

// 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
if (tx == null) {
tx = GlobalTransactionContext.createNew();
}

// set current tx config to holder
GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);

try {
// 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
// else do nothing. Of course, the hooks will still be triggered.
beginTransaction(txInfo, tx);

Object rs;
try {
// Do Your Business
rs = business.execute();
} catch (Throwable ex) {
// 3. The needed business exception to rollback.
completeTransactionAfterThrowing(txInfo, tx, ex);
throw ex;
}

// 4. everything is fine, commit.
commitTransaction(tx);

return rs;
} finally {
//5. clear
resumeGlobalLockConfig(previousConfig);
triggerAfterCompletion();
cleanUp();
}
} finally {
// If the transaction is suspended, resume it.
if (suspendedResourcesHolder != null) {
tx.resume(suspendedResourcesHolder);
}
}
}

RM为什么不需要@GlobalTransactional?

在上一篇的代码示例中,我们只在TM端的BusinessService 方法上添加了@GlobalTransactional注解,而在下游微服务中并没有添加任何注解,为什么也可以当做全局事务处理呢?

事务上下文

我们先来看看 Seata 的事务上下文,它是由 RootContext 来管理的。

应用开启一个全局事务后,RootContext 会自动绑定该事务的 XID,事务结束(提交或回滚完成),RootContext 会自动解绑 XID。

1
2
3
4
5
ini复制代码// 绑定 XID
RootContext.bind(xid);

// 解绑 XID
String xid = RootContext.unbind();

应用可以通过 RootContext 的 API 接口来获取当前运行时的全局事务 XID。

1
2
ini复制代码// 获取 XID
String xid = RootContext.getXID();

应用是否运行在一个全局事务的上下文中,就是通过 RootContext 是否绑定 XID 来判定的。

1
2
3
csharp复制代码    public static boolean inGlobalTransaction() {
return CONTEXT_HOLDER.get(KEY_XID) != null;
}

事务传播

Seata 全局事务的传播机制就是指事务上下文的传播,根本上,就是 XID 的应用运行时的传播方式。

1. 服务内部的事务传播

默认的,RootContext 的实现是基于 ThreadLocal 的,即 XID 绑定在当前线程上下文中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
java复制代码public class ThreadLocalContextCore implements ContextCore {

private ThreadLocal<Map<String, String>> threadLocal = new ThreadLocal<Map<String, String>>() {
@Override
protected Map<String, String> initialValue() {
return new HashMap<String, String>();
}

};

@Override
public String put(String key, String value) {
return threadLocal.get().put(key, value);
}

@Override
public String get(String key) {
return threadLocal.get().get(key);
}

@Override
public String remove(String key) {
return threadLocal.get().remove(key);
}
}

所以服务内部的 XID 传播通常是天然的通过同一个线程的调用链路串连起来的。默认不做任何处理,事务的上下文就是传播下去的。

如果希望挂起事务上下文,则需要通过 RootContext 提供的 API 来实现:

1
2
3
4
5
6
7
scss复制代码// 挂起(暂停)
String xid = RootContext.unbind();

// TODO: 运行在全局事务外的业务逻辑

// 恢复全局事务上下文
RootContext.bind(xid);

2. 跨服务调用的事务传播

通过上述基本原理,我们可以很容易理解:

跨服务调用场景下的事务传播,本质上就是要把 XID 通过服务调用传递到服务提供方,并绑定到 RootContext 中去。

只要能做到这点,理论上 Seata 可以支持任意的微服务框架。

我们注意到,在Common模块的SeataFilter中,从Http请求Header中获取了全局事务ID XID,并将其设置到了事务上下文RootContext中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
HttpServletRequest req = (HttpServletRequest) servletRequest;
String xid = req.getHeader(RootContext.KEY_XID.toLowerCase());
boolean isBind = false;
if (StringUtils.isNotBlank(xid)) {
RootContext.bind(xid);
isBind = true;
}
try {
filterChain.doFilter(servletRequest, servletResponse);
} finally {
if (isBind) {
RootContext.unbind();
}
}
}

而在SeataRestTemplateInterceptor中,先从RootContext获取了XID,然后设置到了Http请求头中,这样下游的RM就能通过SeataFilter获取到XID了。

1
2
3
4
5
6
7
8
9
10
java复制代码@Override
public ClientHttpResponse intercept(HttpRequest httpRequest, byte[] bytes, ClientHttpRequestExecution clientHttpRequestExecution) throws IOException {
HttpRequestWrapper requestWrapper = new HttpRequestWrapper(httpRequest);
String xid = RootContext.getXID();
if (StringUtils.isNotEmpty(xid)) {
requestWrapper.getHeaders().add(RootContext.KEY_XID, xid);
}

return clientHttpRequestExecution.execute(requestWrapper, bytes);
}

@GlobalLock有什么用?

前面初始化部分的代码中,也出现了对@GlobalLock的处理,那么它的用法跟 @GlobalTransactional 有什么区别呢?

如果是用 @GlobalLock 修饰的业务方法,虽然该方法并非某个全局事务下的分支事务,但是它对数据资源的操作也需要先查询全局锁,如果存在其他 Seata 全局事务正在修改,则该方法也需等待。所以,如果想要 Seata 全局事务执行期间,数据库不会被其他事务修改,则该方法需要强制添加 GlobalLock 注解,来将其纳入 Seata 分布式事务的管理范围。

功能有点类似于 Spring 的 @Transactional 注解,如果你希望开启事务,那么必须添加该注解,如果你没有添加那么事务功能自然不生效,业务可能出 BUG;Seata 也一样,如果你希望某个不在全局事务下的 SQL 操作不影响 AT 分布式事务,那么必须添加 GlobalLock 注解。

全局锁的组成和作用

Seata AT 模式的全局锁主要由表名加操作行的主键两个部分组成,通过在服务端保存全局锁的方法保证:

  1. 全局事务之前的写隔离
  2. 全局事务与被 GlobalLock 修饰方法间的写隔离性

全局锁的注册

当客户端在进行一阶段本地事务提交前,会先向服务端注册分支事务,此时会将修改行的表名、主键信息封装成全局锁一并发送到服务端进行保存,如果服务端保存时发现已经存在其他全局事务锁定了这些行主键,则抛出全局锁冲突异常,客户端循环等待并重试。

全局锁的查询

被 @GlobalLock 修饰的方法虽然不在某个全局事务下,但是其在提交事务前也会进行全局锁查询,如果发现全局锁正在被其他全局事务持有,则自身也会循环等待。

全局锁的释放

由于二阶段提交是异步进行的,当服务端向客户端发送 branch commit 请求后,客户端仅仅是将分支提交信息插入内存队列即返回,服务端只要判断这个流程没有异常就会释放全局锁。因此,可以说如果一阶段成功则在二阶段一开始就会释放全局锁,不会锁定到二阶段提交流程结束。

VTh76.png

但是如果一阶段失败二阶段进行回滚,则由于回滚是同步进行的,全局锁直到二阶段回滚完成才会被释放。

为什么ExceptionHandler会导致全局事务失效?

在SpringBoot项目中,我们经常使用@ControllerAdvice来构造ExceptionHandler,用于处理各种异常。有的时候会因此导致全局事务无法回滚,这是为什么呢?

以本文的 ApiExceptionHandler为例,

首先我们来看TM的执行,上文我们提到实际的全局事务处理,比如开启全局事务,提交,回滚,是在TransactionalTemplate的execute方法中实现的,所以为了能够回滚,就必须保证业务处理business.execute()会抛出异常。

1
2
3
4
5
6
7
8
java复制代码try {
// Do Your Business
rs = business.execute();
} catch (Throwable ex) {
// 3. The needed business exception to rollback.
completeTransactionAfterThrowing(txInfo, tx, ex);
throw ex;
}

本文的样例代码是使用RestTemplate来调用其他服务的API,查看RestTemplate相关的代码发现,具体的错误处理实在org.springframework.web.client.DefaultResponseErrorHandler中完成的。

1
2
3
4
5
6
7
8
9
java复制代码public void debit(String userId, BigDecimal orderMoney) {
String url = "http://127.0.0.1:8083?userId=" + userId + "&orderMoney=" + orderMoney;
try {
restTemplate.getForEntity(url, Void.class);
} catch (Exception e) {
log.error("debit url {} ,error:", url, e);
throw new RuntimeException();
}
}

在DefaultResponseErrorHandler的 hasError方法中判断返回的Response是否有错,如果有错就会调用handleError 方法。如果HTTP响应的状态是4xx或者5xx,就会被判定为有错,然后在handleError 方法中抛出异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码public boolean isError() {
return (is4xxClientError() || is5xxServerError());
}

protected void handleError(ClientHttpResponse response, HttpStatus statusCode) throws IOException {
String statusText = response.getStatusText();
HttpHeaders headers = response.getHeaders();
byte[] body = getResponseBody(response);
Charset charset = getCharset(response);
String message = getErrorMessage(statusCode.value(), statusText, body, charset);

switch (statusCode.series()) {
case CLIENT_ERROR:
throw HttpClientErrorException.create(message, statusCode, statusText, headers, body, charset);
case SERVER_ERROR:
throw HttpServerErrorException.create(message, statusCode, statusText, headers, body, charset);
default:
throw new UnknownHttpStatusCodeException(message, statusCode.value(), statusText, headers, body, charset);
}
}

所以,在下游服务出现异常需要Rollback时,如果是使用RestTemplate来调用下游API,那么一定要保证,返回的HTTP状态是4xx或者5xx。如果使用其他方式调用API,也需要保证出错信息能反馈到TM端,并在TM端抛出异常。

参考

seata.io/zh-cn/docs/…

seata.io/zh-cn/blog/…

mp.weixin.qq.com/s/Pypkm5C9a…

chenjiayang.me/2019/06/29/…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

在 applicationyml 中配置 map,list

发表于 2021-11-02

map

Map<String, String>

config.java

1
java复制代码private Map<String, String> mapStr;

application.yml

1
2
3
yaml复制代码map-str:
a: aaa
b: bbb

application.properties

1
2
properties复制代码map-str.a=aa
map-str.b=bb

测试

1
2
3
4
5
6
7
8
java复制代码@Autowired
private Config config;

@Test
public void testConfig() {
// {a=aaa, b=bbb}
System.out.println(config.getMapStr());
}

Map<String, Model>

config.java

1
2
3
4
5
6
7
java复制代码public Map<String, MapConfig> mapConfig;

@Data
public static class MapConfig {
private String key;
private String val;
}

application.yml

1
2
3
4
5
6
7
yaml复制代码map-config:
map-one:
key: one-key
val: one-val
map-two:
key: two-key
val: two-val

application.properties

1
2
3
4
properties复制代码map-config.map-one.key=one-key
map-config.map-one.val=one-val
map-config.map-two.key=two-key
map-config.map-two.val=two-val

测试

1
2
3
4
5
6
7
8
java复制代码@Autowired
private Config config;

@Test
public void testConfig() {
// {map-one=Config.MapConfig(key=one-key, val=one-val), map-two=Config.MapConfig(key=two-key, val=two-val)}
System.out.println(config.getMapStr());
}

List

List

config.java

1
java复制代码private List<String> listStr;

application.yml

1
2
3
yaml复制代码list-str:
- a
- b

application.properties

1
2
properties复制代码list-str[0]=a
list-str[1]=b

测试

1
2
3
4
5
6
7
8
java复制代码@Autowired
private Config config;

@Test
public void testConfigGroup() {
// [a, b]
System.out.println(config.getListStr());
}

List

config.java

1
2
3
4
5
6
7
java复制代码private List<User> listObj;

@Data
public static class User {
String age;
String name;
}

application.yml

1
2
3
4
5
yaml复制代码list-obj:
- age: 23
name: 张三
- age: 20
name: 李四

application.properties

1
2
3
4
properties复制代码list-obj[0].age=23
list-obj[0].name=张三
list-obj[1].age=20
list-obj[1].name=李四

测试

1
2
3
4
5
6
7
8
java复制代码@Autowired
private Config config;

@Test
public void testConfigGroup() {
// [Config.User(age=23, name=张三), Config.User(age=20, name=李四)]
System.out.println(config.getListObj());
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

springboot集成xxl-job

发表于 2021-11-02

1.下载xxl-job-admin源码

1
2
3
ruby复制代码下载地址: https://github.com/xuxueli/xxl-job/releases
文档地址: https://www.xuxueli.com/xxl-job/
本人下载版本为2.3.0

2.执行数据库脚本

1
复制代码脚本存放路径: xxl-job-2.3.0\doc\db\tables_xxl_job.sql

3.修改配置文件

1
2
css复制代码配置文件路径: xxl-job-2.3.0\xxl-job-admin\src\main\resources\application.properties
修改数据库配置和邮箱配置,如下:

image.png

4.启动xxl-job-admin项目

1
css复制代码启动类路径: xxl-job-2.3.0\xxl-job-admin\src\main\java\com\xxl\job\admin\XxlJobAdminApplication.java

5.浏览器查看

1
2
bash复制代码地址: http://localhost:8080/xxl-job-admin
用户名 admin 密码 123456

image.png

6.引入maven配置

1
复制代码在另一个springboot应用中添加maven配置
1
2
3
4
5
xml复制代码<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.3.0</version>
</dependency>

7.配置文件修改

1
复制代码application.yml

image.png

8.编写配置类XxlJobConfig

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
kotlin复制代码
import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* xxl-job config
*
* @author xuxueli 2017-04-28
*/
@Configuration
public class XxlJobConfig {
private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);

@Value("${xxl.job.admin.addresses}")
private String adminAddresses;

@Value("${xxl.job.executor.appname}")
private String appName;

@Value("${xxl.job.executor.ip}")
private String ip;

@Value("${xxl.job.executor.port}")
private int port;

@Value("${xxl.job.accessToken}")
private String accessToken;

@Value("${xxl.job.executor.logpath}")
private String logPath;

@Value("${xxl.job.executor.logretentiondays}")
private int logRetentionDays;

@Bean(initMethod = "start", destroyMethod = "destroy")
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appName);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

return xxlJobSpringExecutor;
}

}

9.编写测试任务类

1
2
3
4
5
6
7
8
9
10
11
12
13
scala复制代码import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.annotation.XxlJob;
import org.springframework.stereotype.Component;

@Component
public class SampleXxlJobHandler extends IJobHandler {

@Override
@XxlJob(value = "sampleXxlJobHandler")
public void execute() throws Exception {
System.out.println("自动任务" + this.getClass().getSimpleName() + "执行");
}
}

10.新增执行器

1
复制代码在执行器管理页面,点击新增按钮,弹出新增框。输入AppName (与第7点中配置的appname保持一致),名称,注册方式默认自动注册,点击保存。

image.png

11.新增自动任务

1
复制代码在任务管理页面,选择上一步新建的执行器,点击新增按钮,弹出新增框。输入任务描述,负责人,cron表达式(输入框右侧有按钮,点开可以辅助自动生成cron表达式),jobHandler(需要与第9步中的XxlJob注解中的值一致),点击保存。

image.png

12.执行自动任务

1
2
3
复制代码启动配置自动任务的springboot服务,在任务管理页面选中要执行的自动任务操作按钮右侧下拉三角,选择执行一次,查看执行结果。
正常开发中,点击启动保持任务一直执行。
在调度日志页面,可以搜索查看日志。

image.png

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

如何打造实时性的弹窗?

发表于 2021-11-02
  1. 前言

在 App 的运营活动中,对用户进行弹窗提示,是一种常见的运营方式。例如:用户已经下单但未付款的时候,可以给用户一个优惠券的弹窗提示。

神策 Android 弹窗 SDK[1] 主要针对的就是上述运营场景,运营人员可以在神策智能运营中配置弹窗的 UI 以及触发弹窗的一些条件,当用户满足配置的条件时,集成了弹窗 SDK 的 App 会展示弹窗。UI 效果如图 1-1 所示:

图 1-1 弹窗 UI 效果图

  1. 弹窗的实时性

在很多场景下,弹窗需要很高的实时性。如果弹窗的计算规则通过后端处理,符合条件时再下发给客户端,实时性将得不到保障。

为了解决这一问题,把计算逻辑等放在了客户端。简单来说,就是触发埋点事件之后会判断是否触发弹窗。

此外,弹窗 SDK 为了保证实时性,也做了很多工作,下面就逐一为大家进行介绍。

  1. 方案演进

3.1. 方案一:共用埋点数据采集线程

触发弹窗的时机,取决于运营同学在神策智能运营中的配置,那弹窗 SDK 如何来决定是否弹窗呢?

举个例子,运营同学的配置条件是:有商品页面的浏览数据就弹窗。因此,在 App 端监控到有商品页面浏览的埋点数据产生就会弹窗。

埋点数据采集任务是在埋点数据采集线程中执行的。因此,最初的想法是在埋点数据采集线程中监控埋点数据,如果有符合弹窗条件的数据,那么就展示弹窗。示例代码如下:

1
2
3
4
scss复制代码//判断是否弹窗
PlanManager.ensureShowDialog(data);
//数据缓存到数据库
enqueueEventMessage(data);

这种做法很简单,也能满足需求。它的优点如下:

  1. 从代码上来看,逻辑比较清晰;
  2. 判断是否弹窗和埋点数据采集都是在一个线程,方便维护。

但是,它的缺点也很明显:

  1. 如果在判断是否弹窗这一步,有阻塞或者异常,那么会影响埋点数据缓存到数据库;
  2. 耦合度非常高。现在是弹窗的业务需要监控数据,如果将来其他业务也需要监控数据,那么在埋点数据缓存之前,还需要增加更多的业务逻辑。

为了解决上述问题,我们拆分了埋点数据采集线程和弹窗判断线程。

3.2. 方案二:拆分埋点数据采集线程和弹窗判断线程

考虑到共用埋点数据采集线程的缺点,我们做了线程的拆分,如图 3-1 所示:

图 3-1 线程拆分示意图

此时,埋点数据采集线程和弹窗判断线程,是两个独立的线程。当埋点数据采集线程有新的数据时,会主动通知弹窗判断线程,让其处理弹窗业务。

这样做不仅降低了耦合度,并且弹窗业务不会影响到埋点数据采集。即使最极端的情况,比如弹窗判断线程因为某些原因出现了异常,埋点数据采集线程仍然能正常工作。

埋点数据采集线程只需要根据接口,回调给数据接收端就行,示例代码如下:

1
2
3
4
scss复制代码// 监控数据,并传给注册接口的地方
DataMonitorInterface.trackEvent(data);
// 数据缓存到数据库
mMessages.enqueueEventMessage(eventType.getEventType(), dataObj);

在弹窗判断线程中,收到数据后会缓存在队列,示例代码如下:

1
2
3
4
5
6
7
8
typescript复制代码public class SFDataMonitorImpl{
public void trackEvent(String data){
mSFPlanTaskManager.addTriggerTask(new Runnable() {
//判断是否弹窗
PlanManager.ensureShowDialog(data)
}
}
}

在新的线程中去执行此队列的任务,示例代码如下:

1
2
3
4
5
6
7
java复制代码public class SFPlanTriggerRunnable implements Runnable {  
@Override
public void run() {
Runnable downloadTask = mSFPlanTaskManager.getTriggerTask();
mPool.execute(downloadTask);
}
}

这种方案看上去已经很完美了,降低了埋点数据缓存和弹窗业务之间的耦合,并且代码上也做了拆分,互相之间的影响很小,同时也能满足各种业务场景。

但是,在测试过程中,我们发现其实弹窗 SDK 的网络请求是最耗时的一步,为了提高实时性,需要进行优化。

这一步主要是为了请求后端,拿到业务同学配置的弹窗信息。要介绍这一部分的优化,首先需要对弹窗 SDK 运行的流程有所了解,如图 3-2 所示:

图 3-2 弹窗 SDK 运行流程图

大致流程如下:

  1. 弹窗 SDK 初始化后,首先会读取本地缓存的弹窗数据;
  2. 在 App 进入前台时,会请求后端的弹窗数据,请求完成后会把后端返回的弹窗数据和本地的弹窗数据做对比,只取更新的部分;
  3. 接着处理埋点数据的任务。

将埋点数据采集和弹窗判断放到各自的串行队列中,具有如下优点:

  1. 里面的任务会按照我们添加的顺序进行执行;
  2. 一般情况下,不需要考虑并发导致数据不安全的问题。

但是,也有缺点:

  1. 当串行队列中的某一个任务发生阻塞时,其后的任务都会延迟执行,特别是此队列中还存在网络请求的任务。因为需要使用网络请求的结果,所以当网络请求完成后才能继续处理其他任务;
  2. 所有的任务都由一个线程来执行,特别在初始化的时候,串行队列的负担会比较重,除了图 3-2 中的两次 IO 操作,还有其他的 IO 操作,以及弹窗判断等任务。

那么,在此基础上还可以优化吗?答案是肯定的。

3.3. 方案三:抽离数据加载线程

其实,仔细思考之后可知:弹窗数据请求的任务不必和弹窗判断在同一个串行队列。当完成本地弹窗数据读取之后,就可以启动弹窗判断线程。

至于网络请求,本身是不可靠的。在弹窗的业务中,如果有本地数据,那么就用本地数据,不必等到网络的数据返回后再处理弹窗业务。

基于以上的思路,将数据加载线程抽离,读取到本地数据后,就进行业务的处理,详情参考示意图 3-3:

图 3-3 抽离数据加载线程示意图

流程看上去比之前的方案要复杂,同时牵扯到三个线程,但只要捋清楚它们之前的关系,以及在什么时候进行通信,就比较好理解:

  1. 初始化 SDK 后,我们启动了两个线程,分别是数据加载线程和弹窗判断线程,并且让弹窗判断线程处于等待状态,而让数据加载线程去加载本地的弹窗数据;
  2. 加载数据完成后,如果数据不为空,那么启动弹窗判断线程;
  3. 当 App 进入前台时,通知数据加载线程,加载网络请求返回的数据。

这里有一个场景需要特别注意:弹窗数据正在加载中,同时产生了埋点数据。此时需要根据弹窗数据判断埋点数据是否应该弹窗,但是弹窗数据还没有加载成功,应该怎么办呢?

此时,应该让埋点的数据先缓存在队列中。只有当弹窗数据加载完成后,才会执行缓存队列中的任务,这也是弹窗判断线程启动后就让它等待的原因。另外,在数据更新的过程中,因为更新的是同一份数据,所以也需要对这一步加锁。

在初始化 SDK 内部,等到弹窗数据加载成功时,才会启动弹窗判断线程。示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
less复制代码readLocalPlanData(new Callback() {
@Override
public void onFailure(int code, String errorMessage) {

}

@Override
public void onResponse(String response) {
//弹窗判断线程启动
new TriggerThread().start();
}
});

其他的代码和方案二中的代码很类似,此处不再展示。

需要注意的是:如果使用了线程池,线程池也会缓存任务。如果有业务场景需要停止线程,那么就不能让线程池缓存任务,可以让线程阻塞执行:

1
2
3
4
5
6
7
java复制代码public class SFPlanTriggerRunnable implements Runnable {  
@Override
public void run() {
Runnable downloadTask = mSFPlanTaskManager.getTriggerTask();
mPool.submit(downloadTask).get();
}
}

这种方案将数据加载线程抽离,解决了网络请求阻塞弹窗判断的场景,它有两个优点:

  1. 使线程的职责更加清晰;
  2. 网络请求弹窗数据,不再阻塞弹窗线程。

当然,也有缺点:

  1. 代码变得比较复杂;
  2. 需要保证并发场景下数据的安全性。

不过,基于目前的方案,既能降低延迟,也能保证业务上的需求,目前看来是一次成功的改造。

  1. 总结

本文讲述了为了保证弹窗的实时性,弹窗 SDK 线程相关方案的演进过程。

弹窗 SDK 的实时性,经过不断努力终于取得了一定的成果。但在实际使用过程中,我们仍然碰到了不少难点:前后台判断的准确性、弹窗被遮盖等问题。不过,我们有信心在不久的未来一定会突破这些难点。

很多场景,只要我们朝着目标不断前行,不断优化自己的方案与代码,总能达成目标。

仔细斟酌,勇于尝试,犹如心有猛虎,细嗅蔷薇。

  1. 参考文献

[1]github.com/sensorsdata…

文章来自公众号——神策技术社区

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Linux 下进行大文件切割与合并

发表于 2021-11-02

往往是因为网络传输的限制,导致很多时候,我们需要在 Linux 系统下进行大文件的切割。这样将一个大文件切割成为多个小文件,进行传输,传输完毕之后进行合并即可。

文件切割 - split

在 Linux 系统下使用 split 命令进行大文件切割很方便

命令语法

1
2
3
4
5
6
7
ini复制代码-a: #指定输出文件名的后缀长度(默认为2个:aa,ab...)
-d: #指定输出文件名的后缀用数字代替
-l: #行数分割模式(指定每多少行切成一个小文件;默认行数是1000行)
-b: #二进制分割模式(支持单位:k/m)
-C: #文件大小分割模式(切割时尽量维持每行的完整性)

split [-a] [-d] [-l <行数>] [-b <字节>] [-C <字节>] [要切割的文件] [输出文件名]

使用实例

1
2
3
4
5
6
7
8
shell复制代码# 行切割文件
$ split -l 300000 users.sql /data/users_

# 使用数字后缀
$ split -d -l 300000 users.sql /data/users_

# 按字节大小分割
$ split -d -b 100m users.sql /data/users_

帮助信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
sql复制代码# 帮助信息
$ split --help
Usage: split [OPTION]... [FILE [PREFIX]]
Output pieces of FILE to PREFIXaa, PREFIXab, ...;
default size is 1000 lines, and default PREFIX is 'x'.

With no FILE, or when FILE is -, read standard input.

Mandatory arguments to long options are mandatory for short options too.
-a, --suffix-length=N generate suffixes of length N (default 2) 后缀名称的长度(默认为2)
--additional-suffix=SUFFIX append an additional SUFFIX to file names
-b, --bytes=SIZE put SIZE bytes per output file 每个输出文件的字节大小
-C, --line-bytes=SIZE put at most SIZE bytes of records per output file 每个输出文件的最大字节大小
-d use numeric suffixes starting at 0, not alphabetic 使用数字后缀代替字母后缀
--numeric-suffixes[=FROM] same as -d, but allow setting the start value
-e, --elide-empty-files do not generate empty output files with '-n' 不产生空的输出文件
--filter=COMMAND write to shell COMMAND; file name is $FILE 写入到shell命令行
-l, --lines=NUMBER put NUMBER lines/records per output file 设定每个输出文件的行数
-n, --number=CHUNKS generate CHUNKS output files; see explanation below 产生chunks文件
-t, --separator=SEP use SEP instead of newline as the record separator; 使用新字符分割
'\0' (zero) specifies the NUL character
-u, --unbuffered immediately copy input to output with '-n r/...' 无需缓存
--verbose print a diagnostic just before each 显示分割进度
output file is opened
--help display this help and exit 显示帮助信息
--version output version information and exit 显示版本信息
The SIZE argument is an integer and optional unit (example: 10K is 10*1024).
Units are K,M,G,T,P,E,Z,Y (powers of 1024) or KB,MB,... (powers of 1000).

CHUNKS may be:
N split into N files based on size of input
K/N output Kth of N to stdout
l/N split into N files without splitting lines/records
l/K/N output Kth of N to stdout without splitting lines/records
r/N like 'l' but use round robin distribution
r/K/N likewise but only output Kth of N to stdout

GNU coreutils online help: <http://www.gnu.org/software/coreutils/>
Full documentation at: <http://www.gnu.org/software/coreutils/split>
or available locally via: info '(coreutils) split invocation'

文件合并 - cat

在 Linux 系统下使用 cat 命令进行多个小文件的合并也很方便

命令语法

1
2
3
4
ini复制代码-n: #显示行号
-e: #以$字符作为每行的结尾
-t: #显示TAB字符(^I)
cat [-n] [-e] [-t] [输出文件名]

使用实例

`# 合并文件

$ cat /data/users_* > users.sql`

帮助信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
sql复制代码# 帮助信息
$ cat --h
Usage: cat [OPTION]... [FILE]...
Concatenate FILE(s) to standard output.

With no FILE, or when FILE is -, read standard input.

-A, --show-all equivalent to -vET
-b, --number-nonblank number nonempty output lines, overrides -n
-e equivalent to -vE
-E, --show-ends display $ at end of each line
-n, --number number all output lines
-s, --squeeze-blank suppress repeated empty output lines
-t equivalent to -vT
-T, --show-tabs display TAB characters as ^I
-u (ignored)
-v, --show-nonprinting use ^ and M- notation, except for LFD and TAB
--help display this help and exit
--version output version information and exit

Examples:
cat f - g Output f's contents, then standard input, then g's contents.
cat Copy standard input to standard output.

GNU coreutils online help: <http://www.gnu.org/software/coreutils/>
Full documentation at: <http://www.gnu.org/software/coreutils/cat>
or available locally via: info '(coreutils) cat invocation'

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

C语言教程01 - 初始C语言01 初始C语言01

发表于 2021-11-02

「这是我参与11月更文挑战的第2天,活动详情查看:2021最后一次更文挑战」


最近,复习了一下C语言,笔者将会在掘金同步更新我的复习进度!各位初学C语言的大一新生,以及想要复习C语言/C++知识的不要错过哦

✈如有错误,欢迎大佬们批评指正!谢谢


初始C语言01

1.c语言标准规定的long long 长度

1
c复制代码sizeof(long long) >=sizeof(long) >= sizeof(int)

2.实型 就是浮点型


3.计算机中的单位

1
2
3
4
5
6
7
c复制代码bit:比特位
byte: 字节 1byte = 8bit
kb = 1024byte
mb = 1024 kb
gb = 1024mb
tb = 1024 gb
pb = 1024 tb

4.一个工程里有且只有一个main函数

多文件情况下也是一样


5.局部变量和全局变量

局部变量 - 在大括号(代码块)内部定义的变量就是局部变量

全局变量 - 在大括号外部定义的变量就是全局变量

作用域

局部变量的作用域:局部变量所在的局部范围

全局变量的作用域:整个工程都可以使用

生命周期

1.程序的生命周期其实和main函数的生命周期一样

2.局部变量的生命周期:进入局部变量所在的范围开始,出局部变量范围生命周期结束

3.全局变量的生命周期:程序的生命周期


6.字面常量

直接写出来的值

1
2
c复制代码如: 3.14
a

7.const修饰的常变量

一个变量不能改变就说明具有常属性 ,但仍是变量,是常变量

例子:

1
2
3
4
5
c复制代码int n = 100;
int arr[n] ={0}; //错误,[]内的必需为常数 ,写成变量是变长数组,而C99才支持变长数组的概念

const int n = 100; //n不是常量,n只是具有常属性,不能被修改而已
int arr[n] = {0}; //错误,

8.#define定义的标识符常量

通过define定义出来的就是真的常量

注意:#define定义后面不跟分号

1
2
3
4
5
6
7
8
9
c复制代码#define Max 100
int main()
{
int a =Max;
int arr[Max] = {0}; //正确,因为#define定义出来的Max是常量
printf("%d\n",a);
Max = 200; //错误 Max为常量,不能被修改
return 0;
}

9.枚举

生活中有些东西可以一一列举

如:性别:男,女,保密 三元色:红绿蓝

1
2
3
4
5
6
7
8
9
10
c复制代码enum Sex		//enum Sex 是枚举类型
{
//枚举的可能取值--枚举常量,可以被赋值,否则第一个数从0开始,数值依次往下+1
MALE,
FEMALE,
SECRET
}; //注意要有分号

注意:枚举常量后面跟的是逗号 不是分号
最后一个枚举常量后面可以不跟逗号,其余要有逗号
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
c复制代码enum Sex
{
MALE,
FEMALE,
SECRET
};
int main()
{
enum Sex s = SECRET; //s是枚举变量,值是SECRET对应的值
s = 10; //正确,s是变量,可以更改s的值,
// MALE = 4; //ERR 因为MALE是常量,不能被修改

//若枚举常量不赋值,默认从0开始,值往下+1
printf("%d\n", MALE); //0
printf("%d\n", FEMALE); //1
printf("%d\n", SECRET); //2
return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
c复制代码enum Sex
{
MALE=3,
FEMALE=7,
SECRET
};
int main()
{
printf("%d\n", MALE); //3
printf("%d\n", FEMALE); //7
printf("%d\n", SECRET); //8 //SECRET未被赋值,值顺着FEMALE往下+1
return 0;
}

10.关于变长数组

C99中的概念

编译器一定要支持C99标准,否则运行不了

变长数组不可以初始化

1
2
3
4
5
c复制代码//c99
int n = 10;
int arr[n] = {0}; //err 变长数组不可以初始化

int arr[n]; //正确

11.字符串

字符串末尾默认放\0 ,

image.png

对于数组而言,未初始化部分自动放\0


今天就先到这吧~希望对你有所帮助!欢迎老铁们点个关注订阅这个专题! 同时欢迎大佬们批评指正!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

技能篇:实际开发常用设计模式 创建型 结构型 行为型

发表于 2021-11-02

创建型

单例模式

单例对象能节约系统资源,一个对象的创建和消亡的开销可能很小。但是日常的服务接口,就算是一般小公司也有十几万的QPS吧。每一次的功能运转都创建新的对象来响应请求,十几万对象的创建和销毁,想想就是一笔大开销,所以 spring 管理构造的 bean 对象一般都是单例。而且单例模式可以更好的解决并发的问题,方便实现数据的同步性

  • 优点
    • 在内存中只有一个对象,节省内存空间
    • 避免频繁的创建销毁对象,可以提高性能
    • 避免对共享资源的多重占用,简化访问
    • 为整个系统提供一个全局访问点
  • 缺点
    • 不适用于变化频繁的对象
1
2
java复制代码//饿汉式
private static Singleton singleton = new Singleton();
1
2
3
4
5
6
7
8
java复制代码//懒汉式
private static Singleton singleton;
public static Singleton getSingleton(){
if (singleton == null) {
singleton = new Singleton(); //被动创建,在真正需要使用时才去创建
}
return singleton;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码//双重判断加锁机制
private volatile static Singleton instance;
//程序运行时创建一个静态只读的进程辅助对象
public static Singleton GetInstance() {
//先判断是否存在,不存在再加锁处理
if (instance == null){
synchronized (Singleton.class){
if(instance == null){
instance = new Singleton();
}
}
}
return instance;
}
1
2
3
4
5
java复制代码//静态初始化
private static readonly Singleton instance= new Singleton();
public static Singleton GetInstance(){
return instance;
}

工厂模式

使用者不关心对象的实例化过程,只关心对象的获取。工厂模式使得产品的实例化过程和消费者解耦

  • 优点
    • 一个调用者想创建一个对象,只需通过其名称或其他唯一键值在工厂获取
    • 扩展性高,如果想增加生产一种类型对象,只要扩展工厂类就可以
  • 缺点
    • 工厂类不太理想,因为每增加一产品,都要在工厂类中增加相应的生产判断逻辑,这是违背开闭原则的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
java复制代码public interface Sender{  public void send();  }  
public class MailSender implements Sender {
@Override
public void send() {
System.out.println("this is mailsender!");
}
}
public class SmsSender implements Sender {
@Override
public void send() {
System.out.println("this is sms sender!");
}
}
public class SendFactory {
public Sender produce(String type) {
if ("mail".equals(type)) {
return new MailSender();
} else if ("sms".equals(type)) {
return new SmsSender();
} else {
return null;
}
}
//若还有其他产品 则在工厂里加对应的 produce 方法
}

建造者模式

主要解决在软件系统中一个复杂对象的创建工作,其通常由各个部分的子对象用一定的算法构成;由于需求的变化,这个复杂对象的各个部分经常面临着剧烈的变化,但是将它们组合在一起的算法却相对稳定

  • 优点
    • 扩展性好,对象每一个属性的构建相互独立,有利于解耦。
    • 建造者可以对创建过程逐步细化,而不对其它模块产生任何影响,便于控制细节风险
  • 缺点
    • 如果对象建造者发生变化,则建造者也要同步修改,后期维护成本较大
    • 一种建造者对应一种类型建造,一个建造者基本很难建造多种类型对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码@Data
class Product {
private String name;
private String price;
// Product 的建造者 Builder
public static class Builder{
public static Builder builder(){
Builder builder = Builder();
}
private Product product = new Product();
public Builder name(String name){ product.name = name; return this;}
public Builder price(String price){ product.price = price; return this; }
//返回产品对象
public Product build() { return product; }
}
}

结构型

适配器模式

连通上下游功能。一般是现有的功能和产品要求的接口不兼容,需要做转换适配。平时见到的 PO,BO,VO,DTO 模型对象之间的相互转换也是一种适配的过程

  • 优点:提高了类的复用,灵活性好
  • 缺点:过多地使用适配器,会让系统非常零乱,不易整体进行把握。比如,明明看到调用的是 A 接口,其实内部被适配成了 B 接口的实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码//类的适配器模式
public class Source {
public void sayHello() {
System.out.println("lwl:hello!");
}
}
public interface Targetable {
/* Source方法相同 */
public void sayHello();
/* 新增的方法 */
public void hi();
}
// Source 用 Adapter 适配成 Targetable
public class Adapter extends Source implements Targetable {
@Override
public void hi() {
System.out.println("csc:hi!");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码//对象的适配器模式
public class Source {
public void sayHello() {
System.out.println("lwl:hello!");
}
}
public interface Targetable {
/* Source方法相同 */
public void sayHello();
/* 新增的方法 */
public void hi();
}
// Source的对象适配成 Targetable
public class Adapter implements Targetable {
private Source source;
public Adapter(Source source){ this.source = source; }
public void sayHello(){ source.sayHello(); }
@Override
public void hi() {
System.out.println("csc:hi!");
}
}

装饰器模式

增强对象功能,动态的为一个对象增加功能,而且还能动态撤销。(继承不能做到这一点,继承的功能是静态的,不能动态增删)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码public interface Show(){ public void acting(); }
public class Artist implements Show {
public void acting(){
System.out.println("lwl 在唱歌!");
}
}
public class DecoratorArtist implements Show{
Artist artist;
DecoratorArt(Artist artist){
this.artist = artist;
}
public void acting(){
System.out.println("lwl 在弹钢琴!"); //增强的功能
this.artist.acting();
System.out.println("表演完毕!"); //增强的功能
}
}

代理模式

代理类是客户类和委托类的中介,可以通过给代理类增加额外的功能来扩展委托类的功能,这样只需要修改代理类而不需要再修改委托类,符合代码设计的开闭原则

  • 和装饰器模式的区别:代理模式着重于增强类功能,且对面屏蔽原对象的创建过程;装饰器模式增强的是对象,且装饰器模式有一个动态传递原对象的步骤
  • 和对象的适配器模式优点像:不过代理模式着重的是对原功能增强,适配器模式着重的是对新功能的兼容
  • 优点-1、职责清晰。 2、高扩展性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码public class Artist implements Show {  
public void acting(){
System.out.println("lwl 在唱歌!");
}
}
public class ProxyArtist implements Show{
Artist artist;
ProxyArtist(){
this.artist = new Artist();//屏蔽了 artist 对象的创建
}
public void acting(){
System.out.println("lwl 在弹钢琴!"); //增强的功能
this.artist.acting();
System.out.println("表演完毕!"); //增强的功能
}
}
public class Demo {
public static void main(String[] arg){
Show show = new ProxyArtist();
show.acting();
}
}

桥接模式

桥接模式侧重于功能的抽象,从而基于这些抽象接口构建上层功能。一般的java 项目都会将接口和实现分离原因,就是基于桥接模式。提高了系统的扩展能力,当引用的底层逻辑有不同的设计实现时,继承抽象接口重新实现一套即可,旧的不变,符合代码设计的开闭原则

  • jdbc 的驱动:常用的JDBC 和 DriverManager,JDBC进行连接数据库的时候,在各个数据库之间进行切换,基本不需要动太多的代码,原因就是JDBC提供统一接口,每个数据库提供各自的实现,用一个叫做数据库驱动的程序来桥接
  • Unix 的文件系统:VFS(virtual File System)使得 Unix 系统可以在不同物理介质上的不同文件系统进行读写
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
java复制代码public interface FileSystem(){ 
public void open(int file);
public String loading(int file);
public void store(int file, String data);
}
//网络上的文件系统
public class NetFileSystem implements FileSystem {
public void open(int file){ System.out.println(" netfile opening...."); }
public String loading(int file) {System.out.println(" net loading ...."); }
public void store(int file, String data) {System.out.println(" send to network ...."); }
}
//磁盘文件系统
public class DiskFileSystem implements FileSystem{
public void open(int file){ System.out.println(" disk opening...."); }
public String loading(int file) {System.out.println(" disk loading ...."); }
public void store(int file, String data) {System.out.println(" write back disk ...."); }
}
public class Linux {
FileSystem fileSystem;
//底层功能提供接口,桥接模式:功能和具体实现分离
//可以桥接 NetFileSystem 或者 DiskFileSystem 作为文件系统
public void set(FileSystem fileSystem){ this.fileSystem = fileSystem; }
//上层功能读数据
public String read(int file){
fileSystem.open(file);
... // Linux 自己的系统功能
fileSystem.loading(file);
...
}
//上层功能写数据
public String write(int file, String data){
fileSystem.open(file);
....
fileSystem.store(file,data);
}
}
  • 可配合适配器模式使用

享元模式

多个对象共享某些属性。在创建有大量对象时,可能会造成内存溢出,把其中共同的部分抽象出来,如果有相同的请求,直接返回在内存中同一份属性,避免重新创建

  • 如 jdbc 连接池的连接对象,它们会共享池对象的 url、driverClassName、username、password 等属性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码public class ConnectionPool {  
private Vector<Connection> pool;
/*公有属性*/
private String url = "jdbc:mysql://localhost:3306/test";
private String username = "root";
private String password = "root";
private String driverClassName = "com.mysql.jdbc.Driver";
public ConnectionPool() {
pool = new Vector<Connection>(poolSize);
for (int i = 0; i < poolSize; i++) {
Class.forName(driverClassName);
// 每一个 conn 共享了 driverClassName ,url, username, password 等属性
Connection conn = DriverManager.getConnection(url, username, password);
pool.add(conn);
}
}
....
}

外观模式

  • 用多个不同的对象实现一组更复杂的功能。使得类与类之间的关系解耦。如 spring 将使用各个简单的 component、dao 实现复杂的service,就是一种外观模式
  • 功能的组合,组合优于继承
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
java复制代码public class DAO { 
public void queryData(){
System.out.print(" query data ")
}
}
public class Deal {
public void dealData(){
System.out.print(" dealing data ")
}
}
public class Sender {
public void send(){
System.out.print(" send data ")
}
}
public class Service(){
private DAO dao;  
private Deal deal;  
private Sender sender;
//封装 DAO,Deal,Sender 的功能,统一对外提供服务
public void reponse(){
dao.queryData();
deal.dealData();
sender.send();
}
}

行为型

策略模式

策略模式侧重于不同的场景使用不同的策略。在有多种算法相似的情况下,解决 if…else 所带来的复杂和难以维护

  • 和桥接模式的区别:而桥接模式是结构型模式,侧重于分离底层功能的抽象和实现,底层只有一种实现也可以
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
java复制代码// 上学的策略
abstract class Strategy{
private static final Map<Integer,Strategy> strategyMap = new ConcurrentHashMap<>();
public Strategy(){
strategyMap.put(getType(), this);
}
public static Strategy routing(int type){
return strategyMap.get(type);
}
abstract int getType();
abstract void method(); //留待子类实现差异
}
//跑路去学校
class RunningStrategy extends Strategy{
int getType() { return 0; }
void method() { System.out.println(" Run to school "); }
}
//公交去学校
class BusStrategy extends Strategy{
int getType() { return 1; }
void method() { System.out.println(" Go to school by bus "); }
}
//飞去学校
class FlyStrategy extends Strategy{
int getType() { return 2; }
void method() { System.out.println(" Fly to school "); }
}
class Context{
//使用不同的策略
void method(int strategy){
Strategy.routing(strategy).method();
}
}

模板方法

和享元模式有一定的相似处,享元模式侧重于属性的共享,而且是结构上的引用,不一定需要继承;而模板方法是共享相同行为,一定有继承行为

  • 区别于策略模式是它有能抽象出来的共同行为,每一个子类再实现有差异细节
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码abstract class AbstractHandler{
// handle是抽象出来的共同逻辑
void handle(String data){
System.out.println("通用逻辑1...");
stepOne(data);
System.out.println("通用逻辑2...");
stepTwo(data);
System.out.println("通用逻辑3...");
}
abstract void stepOne(String data); //留待子类实现差异
abstract void stepTwo(String data); //留待子类实现差异
}
class HelloHandler extends AbstractHandler{
@Override
void stepOne(String data) {
System.out.println("hello: "+data);
}
@Override
void stepTwo(String data) {
System.out.println("hi: "+data);
}
}

迭代子模式

循环处理多个相同对象,用来遍历集合或者数组

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码//迭代的抽象接口
public interface Iterator {
//前移
public Object previous();
//后移
public Object next();
public boolean hasNext();
}
// 数组的迭代类
public class ArrayIterator implements Iterator {
private Object[] datas;
private int cur = 0;
public ArrayIterator(Object[] datas){
this.datas = datas;
}
public String previous() {
if(cur > 0){ cur--;}
return datas[cur];
}
public Object next() {
if(cur < datas.length-1){ cur++;}
return datas[cur];
}
public boolean hasNext() {
return pos < datas.length-1 ? true :false;
}
}

责任链模式

负责处理上游的传递下来的对象,并传递给下一个处理者

  • 和迭代子模式的区别,责任链模式是多个hander处理同一个data,且 hander 处理具有顺序性,不用全部 hander 处理,可在某一 hander 中断,也可继续传递
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
java复制代码abstract class Handler<T,R> {
private Handler<R,?> next;
abstract R handle(T data);
public void setNext(Handler<R, ?> next){ this.next = next; }
public void loopHandle(T data){
R result = this.handle(data);
if(next!=null && result!=null ) { next.loopHandle(result); }
}
}
//负责问候
class HelloHandler extends Handler<String, Integer> {
Integer handle(String data) {
System.out.println(data + " hello! ");
return 10;
}
}
//负责计数
class CountHandler extends Handler<Integer, Double> {
Double handle(Integer data) {
System.out.println(" it is " + data);
return 2.0;
}
}
public class demo{
public static void main(String[] args){
HelloHandler hello = new HelloHandler();
CountHandler count = new CountHandler();
hello.setNext(count);
hello.loopHandle("lwl");
}
}

观察者模式

事件通知: 定义对象间的一种一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都得到通知

  • 优点:观察者和被观察者是抽象耦合的
  • 缺点
    • 如果一个被观察者对象有很多的直接和间接的观察者的话,将所有的观察者都通知到会花费很多时间
    • 如果在观察者和观察目标之间有循环依赖的话,观察目标会触发它们之间进行循环调用,可能导致系统崩溃
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码//观察者
public abstract class Observer<T> {
public abstract void update(T data);
}
// 被观察对象
public class Subject<T> {
private List<Observer<T>> observers = new ArrayList<>();
private T state;
public void deal() {
....// 逻辑处理
//如果修改了 state,通知观察者
if(...) notifyAllObservers();
}
//增加一个观察观察
public void observe(Observer<T> observer) {
observers.add(observer);
}
public void notifyAllObservers() {
for (Observer<T> observer : observers) {
observer.update(state);
}
}
}

状态机模式

不同的状态不同的响应,实现状态之间的转移

  • 和策略模式的区别
    • 状态机模式是策略模式的孪生兄弟。策略模式可以让用户指定更换的策略算法,而状态机模式是状态在满足一定条件下的自动更换,用户无法指定状态,最多只能设置初始状态
    • 状态机模式重点在各状态之间的切换,从而做不同的事情;而策略模式更侧重于根据具体情况选择策略,并不涉及切换
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
java复制代码interface State<T> {
//当前状态进行处理数据,并返回下一个状态
abstract State<T> action(T data);
}
@Data
class Context<T>{
private State<T> state;
public void invoke(T data){
state != null ? state = state.action(data) : System.out.println(" nothing " + data);
}
}
// HelloState -> HiState
class HelloState implements State<String>{
public State<String> action(String data) {
System.out.println("hello!" + data);
return new HiState();
}
}
// HiState -> FineState
class HiState implements State<String>{
public State<String> action(String data) {
System.out.println("how are you ?" + data);
return new FineState();
}
}
//最后的状态
class FineState implements State<String>{
public State<String> action(String data) {
System.out.println("I am fine!");
return null;
}
}
public class demo{
public static void main(String[] args){
Context<String> context = new Context<>();
context.setState(new HelloState());
context.invoke("lwl");
context.invoke("lwl");
context.invoke("lwl");
context.invoke("lwl");
}
}

备忘录

记录上一次的状态,方便回滚。很多时候我们是需要记录当前的状态,这样做的目的就是为了允许用户取消不确定或者错误的操作,恢复到原先的状态

  • 缺点:消耗资源。如果类的成员变量过多,势必会占用比较大的资源,而且每一次保存都会消耗一定的内存
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
java复制代码@Data
public class Memento {
private String state;
public Memento(String state){ this.state = state; }
}
@Data
public class Storage {
private String value;
public void storeMemento(){
return new Memento(value);
}
public void restoreMemento(Memento memento){
this.value = memento.getValue();
}
public void action(){ System.out.println(" Storage类逻辑运行 ");}

}
public class MementoPatternDemo {
public static void main(String[] args) {
Storage storage = new Storage();
storage.setValue(1);
storage.storeMemento();//备忘,一下
storage.action();//....逻辑运行
restoreMemento(Memento memento);//使用备忘录的恢复状态
}
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Guava发布订阅组件EventBus

发表于 2021-11-02

这是我参与11月更文挑战的第2天,活动详情查看:2021最后一次更文挑战

Guava是一个谷歌开发的开源库。Guava工程中包含了很多被Google的 Java项目广泛依赖的核心库。
本文主要介绍Guava中的EventBus组件。

EventBus

Eventbus是一种机制,它允许不同的组件在不了解彼此的情况下相互通信。组件可以将Event发送到Eventbus,而不知道谁将接收它,或者有多少其他组件将接收它。组件还可以监听Eventbus上的事件,而不知道是谁发送了事件。这样,组件就可以在不依赖彼此的情况下进行通信。另外,替换一个组件也很容易。只要新组件理解正在发送和接收的Events,其他组件就永远不会知道。

引入依赖

1
2
3
4
5
xml复制代码  <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>18.0</version>
</dependency>

快速使用

创建EventBus

使用Eventbus时,首先,为了注册监听器和发布事件,你需要一个总线:

1
java复制代码EventBus eventBus=new EventBus();

创建监听器

然后需要创建一个监听器,用于订阅EventBus上的事件。

1
2
3
4
5
6
7
8
java复制代码public class MyEventListener{
private static int numberEvents;
private static List<String> eventsList=new ArrayList<>();
@Subscribe
public void myEvent(String event) {
System.out.println(event);
numberEvents++;
}

注册监听器并发布事件

可以看到,处理程序方法使用@Subscribe注解;现在Listener应该被添加到Eventbus和发布事件。下面是一个简单的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
java复制代码import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;

public class EventBusTest {

class EventListener{
private int numberEvents;
@Subscribe
public void myEvent(String event) {
System.out.println(event);
numberEvents++;
}
public int getNumberEvents(){
return numberEvents;
}
}

@Test
public void testEventBusSimpleTest(){
EventBus eventBus=new EventBus();
EventListener listener = new EventListener();
eventBus.register(listener);
eventBus.post("event1");
eventBus.post("event2");
assertEquals(2, listener.getNumberEvents());
}
}

实际案例

是不是很简单!我们可以通过EventBus解决一些实际场景中的问题,在单个JVM中进行异步处理。比如在一些交易系统中,在付款成功以后,需要将结果通知给付款方,同时也要通知收款方。

在这个例子中,我们需要两个监听器,一个用于发送付款成功的消息给用户,一个用于发送消息给收款方。

首先有一个用于表示交易的类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码class Transaction {
private final String transactionName;
private double amount = 0.0;
public Transaction(String transactionName, Double amount) {
this.transactionName = transactionName;
this.amount = amount;
}
public String getTransactionName() {
return transactionName;
}
public double getAmount() {
return amount;
}
@Override
public String toString() {
return "[transaction with name " + transactionName + " and amount " + amount + "]";
}
}

然后建立两个监听器,用于监听交易完成的事件,并作出响应。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码class SendPaymentListener {
@Subscribe
private void sendToPayment(Transaction transaction) {
System.out.println("【付款方】交易" + transaction.getTransactionName() + "交易完成,金额:" + transaction.getAmount());
// to do something...
}
}

class SendReceiveListener {
@Subscribe
private void sendToPayment(Transaction transaction) {
System.out.println("【收款方】交易" + transaction.getTransactionName() + "交易完成,金额:" + transaction.getAmount());
// to do something...
}
}

有了事件的订阅方之后,我们建立EventBus,并将监听器注册在EventBus上,然后将Event发布。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码public class PaymentService {

static class EventBusFactory {
//create an asynch bus in new Thread
private static EventBus eventBus
= new AsyncEventBus(Executors.newCachedThreadPool());
public static EventBus getEventBus() {
return eventBus;
}
}

@Test
public void testPayment() {
//registering listeners
EventBusFactory.getEventBus().register(new SendPaymentListener());
EventBusFactory.getEventBus().register(new SendReceiveListener());
EventBusFactory.getEventBus().post(new Transaction("交易001", 100.0));
}
}

我们执行测试用例后,结果如下:

可以看出,我们并不用显式的去调用SendPaymentListener和SendReceiveListener的方法,EventBus内部便可以替我们完成事件的发布。

小结

Guava EventBus的内容比较简单,所以上面内容也写的很简单,你可以动手试试,体验一下Guava的发布订阅模式。如果对你有所帮助,点个赞就是对我最大的鼓励!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…436437438…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%