开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

简析积分系统

发表于 2021-05-29

为什么需要积分系统

随着互联网的发展,特别是大厂纷纷平台化之后,会员服务渐渐成为各个平台非常重要的组成部分。针对会员进行运营,不论是面向现有会员的精细化运营还是潜在会员的定向运营,都需要非常有针对性的进行设计。这也就意味着面对会员的运营系统的业务会极其的复杂,而在此之下,也就对支撑其的运营系统提出了非常高的要求。但是仔细分析不难发现,会员运营的核心其实是围绕积分进行展开的:通过利用积分,对会员在平台的活动行为和消费行为进行激励与引导,可以带来以下的效果:

  1. 提高潜在会员的转化率和现有会员的重复下单率;
  2. 培养会员的使用习惯和依赖性;
  3. 提升会员的活跃度;
  4. 增加会员对平台活动参与的积极性;

总的来说,积分系统实现了以下方面的作用:

  1. 对会员行为进行强化。这其中包括了
    1. 对潜在会员的导入,比如注册送积分可以快速提升平台的访问量;
    2. 激励现有会员的特定行为,比如消费后评价送积分,会强化会员的评论行为;或者消费积分享折扣,能提高会员的转化率;
    3. 增加会员粘性,比如完成日常任务送积分,以提升会员对平台的依赖性。
  2. 对会员行为进行弱化。比如对发布违规信息扣减积分,可以阻止会员发布违规信息。

从上述例子不难看出,对于会员而言,一共有三类积分行为:积分获取,积分消费,积分扣减。

积分获取

会员通过在平台进行活动,特别是消费活动后,系统会根据积分获取规则自动将积分发放到会员的积分账户。不过需要注意的是,消费活动需要完成支付且不存在退换货的情况下才可以获得积分。积分获取一般有以下几类:

  1. 行为积分获取。包括注册,评级,登录,使用特定支付渠道支付等行为。
  2. 等级积分获取。通过将会员按照等级区分,使不同等级会员拥有不同的权限,在积分获取时使高等级会员获得更多积分,激励会员提升自我等级。
  3. 消费积分获取。会员通过在平台的消费行为将是会员获取积分的主要途径,而这其中一般又会划分为三类
    1. 普通消费积分获取。根据消费的金额,获得一定比例的奖励积分,刺激会员进行消费。
    2. 满额积分获取。消费达到一定数额或者数量,获得一定奖励积分,刺激会员单次消费金额。
    3. 促销积分获取。针对不同日期或者商品的消费,获得额外积分,刺激会员在特定日期或者对特定商品进行消费。

这些积分获取不仅可以单独出现,也可以通过组合累加,设计出非常复杂的各种规则。如,指定时间指定商品双倍积分,VIP 会员万元以上消费双倍积分,新会员首次下单双倍积分,等等规则。

积分消费

会员在获取积分后需要通过消费积分获取各种权益,既引导了会员的行为,又增加了会员的粘性。享受积分带来的好处,还可以刺激会员更多的获取积分。常见的积分消费形式有以下几类

  1. 积分和现金消费。在指定的时间购买指定商品时,可以按照系统设计的规则,将积分按照一定比例一定上限抵扣部分现金使用。
  2. 兑换商品消费。设立商品兑换区,将兑换商品所需积分设定好,会员消费积分直接兑换商品即可,无需再额外支付现金。
  3. 兑换优惠券消费。会员按照规则将积分兑换成优惠券,在之后的消费中再使用。这其中又包括抵扣券和折扣券。
  4. 积分赠送。会员可以选择将自己的积分赠予其他会员。

积分扣减

通过积分扣减对会员行为进行惩罚,影响会员的行为。常见的扣分行为有长时间未登录,提供虚假注册资料,发布违规信息,发布恶意评价,刷单等等。

积分系统的特点

通过对于积分系统的分类,我们会发现积分系统拥有以下这些特点

  1. 不需要太复杂的结构。积分规则再业务上都能很简单进行描述,即使是复杂规则也可以通过多个简单规则组合得到,所以并不需要太复杂的结构。
  2. 不能通过浅显的算法解决业务问题。虽然单一规则,即使是很复杂的那种,在现实上也可以很简单,通过 if else 的组合完全可以满足需求。但是考虑到需要支撑所有业务,这种实现方式将会变得无比繁杂。
  3. 业务逻辑经常变化。除了一些常用规则外,大量规则都是随着运营的需求而随时改变的,因此软件的发布频率,即使使用敏捷开发,也完全不可能跟上需求变动的频率。
  4. 领域专家可以提供支持但是没有技术背景。熟悉业务逻辑和流程的领域专家通常没有技术背景,对于业务需求很可能无法使用技术语言来进行描述。

基于这些特点,在设计积分系统时应该将积分规则进行解耦,使其按照一定数据结构在运行时注入系统。然后对其设计一个运行时解释器,使其能够在系统中生效。通常我们将这个解释器称作规则引擎,而规则引擎就是积分系统的核心。

规则引擎的特点

  1. 申明式。规则引擎应该让我们申明做什么而不是怎么做。通过申明,将复杂的业务逻辑简化为逻辑表述,并对其进行验证。相对于编码,逻辑表述具有更好的可阅读性。
  2. 业务逻辑与数据分离。将数据放在业务对象中,而业务逻辑放在规则中。这样使得几乎不变的业务数据结构和随时都可能改变的业务逻辑解耦开来,方便进行维护。
  3. 可理解的规则。通过对规则建模并配合文档,使其接近自然语言。这样可以使得没有技术背景的领域专家通过规则模型描述业务问题。
  4. 业务知识集中化。将已有规则集中管理,依赖其高可理解性,创建出一个业务知识库,可以起到业务文档的作用。
  5. 解释能力。引擎应该能够解释根据规则做出决策的依据是什么。
  6. 快速。规则引擎应该通过算法,高效地对规则进行匹配。

作为积分系统的核心,实现一个满足以上特点的规则引擎开销会非常的大。不过市面上有大量开源和商用的规则引擎已经存在,在能力不足以实现一个规则引擎,而又要快速上线一个积分系统的时候,选择开源规则引擎或者购买商业规则引擎可能是一个更好的选择。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

手把手教你java线程池动态配置调整

发表于 2021-05-29

解释一下什么是线程池的动态调整?

java程序中使用线程池,在运行过程中动态的调整核心线程数(core_size)、最大线程数(max_size)、存放排队任务的queue大小(blockingQueue size)

为什么会来实现线程池的动态调整?

来自于美团的公众号“Java线程池实现原理及其在美团业务中的实践”中提到的动态化线程池,看到后眼前一亮,还有这种操作?为什么以前没想到过?所有的技术的方案都是业务所遇到的问题而产生出来的,我想美团应该是流量的不均衡,导致有时需要快速扩充处理能力,而不用发布代码产生的一种解决方案吧。

说完背景,我们就直接动手来实现吧

1,要实现运行时动态、那么需要动态的参数需要暴露set方法。美团的文章已经交代了,JDK原生线程池ThreadPoolExecutor提供了如下几个public的setter方法,如下图所示:

image.png

2,即然支持运行时set,那岂不是很简单,我们只需要选择一个可以热更新的组件就可以完成,线程池的动态化,这里我们选择配置中心apollo来作为热更新的组件。

初始化线程池(核心线程数=2,最大线程数=4,等待任务最大为10)

1
2
3
4
5
6
7
8
9
java复制代码 static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4,
10L, TimeUnit.SECONDS,
new CLinkedBlockingQueue<>(10),
new ThreadFactoryBuilder().setNameFormat("c_t_%d").build(), new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("rejected");
}
});

每1s添加一个任务,每个任务执行时间为10S的话,这个线程池的变化应该是什么样的,如下图:

image.png

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
java复制代码public static void main(String[] args)  {

initApolloConfig();

Config config = ConfigService.getConfig("application");
//apollo 值发生变更添加监听器
config.addChangeListener(new ConfigChangeListener() {
@Override
public void onChange(ConfigChangeEvent configChangeEvent) {
ConfigChange threadCoreSize = configChangeEvent.getChange("thread_core_size");
ConfigChange threadMaxSize = configChangeEvent.getChange("thread_max_size");
ConfigChange queueSize = configChangeEvent.getChange("queue_size");
if (threadCoreSize!=null){
threadPoolExecutor.setCorePoolSize(Integer.parseInt(threadCoreSize.getNewValue()));
}
if (threadMaxSize!=null){
threadPoolExecutor.setMaximumPoolSize(Integer.parseInt(threadMaxSize.getNewValue()));
}
if (queueSize!=null){
CLinkedBlockingQueue cLinkedBlockingQueue =(CLinkedBlockingQueue)threadPoolExecutor.getQueue();
cLinkedBlockingQueue.setCapacity(Integer.parseInt(queueSize.getNewValue()));
}
}
});

System.out.println("threadPoolExecutor init status:");
printThreadPoolStatus();

//每1S添加一个任务
for (int i=0;i<100;i++){
try {
Thread.sleep(1000);
dynamicThreadPoolAddTask(i);
printThreadPoolStatus();
}catch (InterruptedException ex){
ex.printStackTrace();
}
}
}
/***
* 打印当前线程池的状态
*/
private static void printThreadPoolStatus(){
String s=String.format("core_size:%s,thread_current_size:%s;" +
"thread_max_size:%s;queue_current_size:%s,total_task_count:%s",threadPoolExecutor.getCorePoolSize(),
threadPoolExecutor.getActiveCount(),threadPoolExecutor.getMaximumPoolSize(),threadPoolExecutor.getQueue().size(),
threadPoolExecutor.getTaskCount());
System.out.println(s);
}
/***
* 给线程池添加任务
* @param i
*/
private static void dynamicThreadPoolAddTask(int i){
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(10000);
System.out.println(i);
}catch (InterruptedException ex){

}
}
});
}

运行,看执行结果,能很清楚看出来线程池core_size->queue->max_size之间的关系,如下图:

image.png

我们搞清楚了线程池的核心参数的关系,接下来我们就来试试动态的更新这些参数。如何更新这些参数,已经在上面的代码片段中有给出,添加apollo的变化监听,然后set对应属性。

我们运行看效果,可以看出来,运行过程中core_size,queue_size,max_size 都发生了变化

image.png

image.png

image.png

细心的读者,应该能看出来,到最后线程池的活跃线程数量始终等于core_size,没有增长到max_size。如果您理解我前面将的核心参数之间的关系,那么这个问题就不是问题了。

还有1个问题是jdk本身的LinkedBlockingQueue 并没有公开capacity 的set方法,所以我们是copy了LinkedBlockingQueue的源码并将公开capacity 的set方法,才得以实现队列的扩容。

好了,关于线程池的动态化配置到这里结束了。保持好奇、实践得真知。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

还在写一大堆if/else吗?快来体验策略模式吧

发表于 2021-05-29

为什么要替代if/else

开发过程中避免不了使用if/else进行逻辑判断,如果逻辑很复杂、判断逻辑会频繁改动或者后期会增加判断逻辑,那么if/else会导致代码很难理解且难以维护,建议大家采用策略模式来代替if/else

策略模式

将非固定功能抽取出来,独立定义接口和实现类,在Manager类中聚合,实现组装类的效果,达到以下效果:

  1. 可读性,编程规范性,便于其他程序员阅读和理解
  2. 可扩展性,可维护性,需要添加新功能时,非常方便,成本低
  3. 可靠性,添加新功能或减少功能后,对原有功能没有影响
  4. 满足开闭原则

示例

假设有这样的一个需求,根据员工上个月迟到和加班次数来计算奖金,如果迟到>=3次就每次扣100元,如果加班>5次就每次加50元,这个需求不管是否合理,只是一种假设。

先定义一个对象来记录迟到和加班的次数

1
2
3
4
java复制代码StaffData staffData = StaffData.builder()
.chidao(3)
.jiaban(10)
.build();

使用if/else的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码public static int calc(StaffData staffData) {
int total = 0;
if (staffData.getChidao() != null && staffData.getChidao() >= 3) {
total -= staffData.getChidao() * 100;
}
if (staffData.getJiaban() != null && staffData.getJiaban() > 5) {
total += staffData.getJiaban() * 50;
}
return total;
}

System.out.println("奖金=" + calc(staffData));

如果后期增加了需求,在StaffData中添加请假次数,请假超过3次则每次扣100元的规则,那么需要继续在calc方法中添加if

如果又要新增一个规则,在StaffData中添加迟到次数和基本工资,请假超过5次,迟到超过5次,直接扣基本工资的一半,仍然需要在calc方法中添加代码,最后calc方法会越来越复杂,日积月累之后很难再去维护,修改不当还会导致原先的逻辑发生错误

采用策略模式

先定义一个Policy接口,通常定义canApply方法来进行逻辑判断,定义apply方法执行相应的业务逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码public interface Policy {
/**
* 当前策略是否匹配
*
* @param data
* @return
*/
boolean canApply(StaffData data);

/**
* 执行策略
*
* @param data
* @return
*/
int apply(StaffData data);
}

定义一个策略管理类,这个类负责注入所有策略,然后对外提供一个apply方法,遍历调用所有的策略,并做相应的处理,简化外部调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码@Component
public class PolicyManager {
@Autowired
private List<Policy> policies;

public int apply(StaffData data) {
int total = 0;
for (Policy policy : policies) {
if (policy.canApply(data)) {
total += policy.apply(data);
}
}
return total;
}
}

接下来可以定义需要的策略了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码/**
* 规则如果迟到>=3次就匹配
*/

@Component
public class Test1Policy implements Policy {
@Override
public boolean canApply(StaffData data) {
return data.getChidao() != null && data.getChidao() >= 3;
}

@Override
public int apply(StaffData data) {
// 迟到1次扣100
return -data.getChidao() * 100;
}
}

再定义一个策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码/**
* 如果加班超过5次就匹配
*/

@Component
public class Test2Policy implements Policy {
@Override
public boolean canApply(StaffData data) {
return data.getJiaban() != null && data.getJiaban() > 5;
}

@Override
public int apply(StaffData data) {
// 加班1次加50
return data.getJiaban() * 50;
}
}

最后测试一下逻辑是否正确

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码@SpringBootApplication
public class PolicyTest {
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(PolicyTest.class, args);
PolicyManager manager = context.getBean(PolicyManager.class);

StaffData staffData = StaffData.builder()
.chidao(3)
.jiaban(10)
.build();
System.out.println("奖金=" + manager.apply(staffData));
}
}

后期添加计算规则时新增一个策略类就可以实现,不用去修改原先写好的策略类,代码也很清晰,各个策略类只负责自己要处理的业务

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

golang社招面试总结(一)|周末学习

发表于 2021-05-29

本文已参与周末学习计划,点击链接查看详情:juejin.cn/post/696572…

先描述一下背景。今年5月中旬入职了现在的公司,base北京。之前作为2019届的应届生在武汉的一家公司工作了不到两年。4月上旬有了换工作的想法,这个想法来的非常不合时宜,其中最有风险的一条是之前没有做任何面试准备。现在入职了两周左右了,整理一下当时经历的面试题目吧,给之后的学习提供点线索,也欢迎大家来讨论。

golang

基础相关

  1. var和:=来定义变量有什么不同?new和make初始化变量有什么不同呢?
  2. defer是做什么用的,如果一个程序里面有多个defer会怎么样?
  3. 切片的底层是怎么实现的?interface的两种用法了解吗?了解interface的底层实现吗?
  4. 怎么用的map,会有并发问题吗?怎么求一个map的长度(len就可以,害)。sync.Map可以用len来求长度吗?(应该是想要问sync.Map的底层吧)
  5. 有缓冲和无缓冲channel的区别
  6. select中如果没有default会出现什么情况?(select会阻塞,直到有case可以执行)case中的通道被关闭了会出现什么情况?(只有这一个的话会死循环,可以用,ok判断一下)
  7. map是有序的吗?map底层实现怎么处理hash碰撞的问题?
  8. recover和panic的实现原理
  9. 是否了解反射
  10. 普通的map加锁使用与直接用sync.map有什么区别?(分别说了使用上和性能上)

并发相关

  1. golang协程的调度,聊聊对GMP模型的理解
  2. 怎么实现并发?
  3. 怎么让协程等待?
  4. 聊聊goroutine泄漏?怎么避免和排查goroutine泄漏的问题?
  5. goroutine的状态机
  6. 聊聊channel的特性,从nil的channel中取数据会发生什么?

其他

  1. 哪些情况下会panic?所有的panic都可以recover吗?recover函数的使用过程,是怎么捕获的?
  2. context是做什么用的?
  3. 有什么办法可以获得函数调用的链路?
  4. 怎么处理协程运行的超时问题?
  5. 怎么优雅地关闭通道?
  6. 从面向对象的角度聊聊java和golang的区别
  7. golang的GC
  8. golang的http路由实现懂吗,可以描述一下吗?
  9. golang程序hang住了可能是什么原因(说了可能死循环导致无法GC,golang1.13),怎么排查(可以用pprof)

数据结构

  1. 数组查找的时间复杂度是多少,为什么?
  2. 了解跳表吗?
  3. 你知道哪些树结构,各自的优缺点分别是什么?(回答的时候再回答一下应用场景就更好了)

操作系统

  1. 用过锁吗?读写锁加读锁的时候会阻塞写操作吗?会阻塞读操作吗?(用过,会,不会)

Linux

  1. 用过ps命令解决过什么问题吗?查询出来的结果各个字段有什么含义?
  2. 了解僵尸进程吗?怎么避免?
  3. 怎么根据pid获取父进程的pid(回答说去/proc下面),怎么根据pid找执行路径呢?怎么根据端口号找pid(回答说lsof -i,用netstat也可以但是我不知道,面试官补充的)

数据库

  1. Redis和MySQL的区别,什么时候用Redis,什么时候用MySQL(这里我说了下我们服务用到的场景)?如果让你设计一个秒杀系统,要怎么设计?(面试官后来提示了一下说redis又一个自增key)

MySQL

  1. 有几种隔离级别?默认级别是什么?可重复读是怎么解决幻读的?了解mvcc嘛?下面几种mysql哪个会用到联合索引
1
2
3
css复制代码Select * from table where a=1 and b=1 and c=1;
Select * from table where a=1 and b>1 and c=1;
Select * from table where a=1 and b>1 order by c
  1. mysql为什么用B+树而不是红黑树
  2. 有一个表,id字段是唯一的,name字段可以重复。写一个SQL语句,查询出所有的name重复的记录。(大致说了select group_concat(id) from xxx group by name)
  3. MySQL中用的索引引擎是什么(innodb),让我说一下innodb的实现(b+树)为什么用b+树而不是红黑树或者b树(红黑是二叉,b树多余存储了业务数据)b+树的搜索时间复杂度(O(logn))。innodb是表锁还是行锁(这个没答上,后面多看看吧,行级锁都是基于索引的,如果一条SQL语句用不到索引是不会使用行级锁的,会使用表级锁)

Redis

  1. 缓存击穿怎么办?布隆过滤器怎么实现?
  2. 有哪几种结构,跳表是用来实现什么的,sds了解吗
  3. redis。用过哪些结构,回答说只用过string。也看过一些用List的(judge、alarm和smsgateway就用过),lpush、lpop是一对吗?
  4. redis是怎么设置过期时间的?

coding

  1. 实现集合
  2. 用数组实现队列
  3. 下面代码的输出
1
2
3
4
5
6
7
8
scss复制代码x := []int{1,2,3}
Func(arr []int){
arr[0] = 7
arr = append(arr, 100)
arr = append(arr, 101)
fmt.Println(arr)
}(x)
fmt.Println(x)
  1. 数据到达100个就发送,如果超过了一定时间还没到100个,也发送掉。
  2. 写一段程序。任务数m个,最大并发是n。有一个任务执行出错所有的goroutine都退出。

算法

  1. 五子棋判赢
  2. 数组中查找重复的数字。长度是n,数组中数字是小于n的非负整数
  3. 最大子序和
  4. 25匹马,5个赛道,需要几次才能得到前三名
  5. 双向链表,每两个反转一下
  6. 浏览器前进后退,怎么设计?(用两个栈)
  7. 快排
  8. 二分查找
  9. 10个ip,3个ip是重复的,乱序ip,并且输出乱序后的ip,要求是:重复的ip不会相邻
  10. 验证一个字符串是不是浮点数
  11. 被三整除的最大和

其他

  1. 从浏览器输入一个url到最后显示页面,中间都发生了什么,越详细越好
  2. 流量劫持懂吗?比如输入baidu.com最后导向了xiaomi.com(诚实说了不懂)
  3. 开放性题目:给一个小时时间,站在公司门口观察,最后预估一天内有多少来面试的人。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

SQL 从入门到放弃:ROW_NUMBER() OVER 和

发表于 2021-05-29

最近在写 SQL,菜如老狗的我在大哥的指导下学会了用一些方便的 SQL 函数代替以前繁杂的写法,对于常见的数据场景还是很有用的。

提起 SQL,(从前的)我脑子只有 SELECT、COUNT()、SUM()、JOIN、GROUP BY 等云云。对于较为复杂的数据场景,总是绞尽脑汁的用 GROUP BY 和 JOIN 来实现,却不知有类似功能的 SQL 函数。

下面举个栗子,说说我学到的一些 SQL 函数和简化 SQL 的方法,以 Hive SQL 作为模版。代表因为 SQL 函数和语法大多类似,原理通用,在使用其他 SQL 时参考即可。

一个栗子

(假装)有张订单流水表 t_order_detail,它的结构是这样的:

字段名 字段描述
order_id 订单 id,订单的唯一标识
user_id 用户 id,标识订单所属的用户
merchant_id 商户 id,标识订单所属的商户 id
state 订单状态,具有已确认(0)、已完结(1)、已取消(2)等状态
create_time 记录的创建时间
data_version 版本号,对同一笔订单进行插入、更新操作时,版本号也相应更新

每次对数据库中的订单进行插入、更新操作,系统都会进行上报,新增一条流水到 t_order_detail 中。例如,order_id 为 1001的订单被创建,后续状态被扭转为已完结,在流水表中就会存在下述记录:

order_id user_id merchant_id state create_time data_version
10001 user_1 merchant_1 0 1622249031 1
10001 user_1 merchant_1 1 1622249082 2

然后,(假装有个)产品爸爸想要了解订单和用户情况,需要以下数据:

  • 当日总订单数
  • 当日总购买用户数
  • 当日总的和根据商户分组的订单状态为已完结的订单数和用户数
  • 当日总的和根据商户分组的订单状态为已取消的订单数和用户数

我们把产品爸爸的需求翻译一下,就变成了求下列的当日数据:

  • 根据 order_id 去重的记录总数,total_order_day。
  • 根据 user_id 去重的记录总数,total_user_day。
  • 最新一条记录 state=1、根据 merchant_id 分组以及总的:
    • 根据 order_id 去重的记录总数,total_finish_order_day。
    • 根据 user_id 去重的记录总数,total_finish_user_day。
  • 最新一条记录 state=2、根据 merchant_id 分组以及总的:
  • 根据 order_id 去重的记录总数,total_cancel_order_day。
  • 根据 user_id 去重的记录总数,total_cancel_user_day。

假设我们表的记录现在是这样的,一共有三笔订单,user_1 在 merchant_1、merchant_2 下有2笔订单,订单终态分别为1和2;user_2 在 merchant_1 下有1笔订单,订单终态为2:

order_id user_id merchant_id state create_time data_version
10001 user_1 merchant_1 1 1622249082 2
10001 user_1 merchant_1 0 1622249031 1
10002 user_1 merchant_2 0 1622249011 1
10002 user_1 merchant_2 2 1622249022 2
10003 user_2 merchant_1 0 1622249031 1
10003 user_2 merchant_1 2 1622249082 2

根据上述需求,可以拆分为以下步骤:

  • 由于我们只关注每个订单的最新一条记录(即版本号最大的记录),因此先可以先对表做一次清洗,获得新表数据 t_clean_order_detail。
  • 根据新表获得数据。

如何获得每个 order_id 对应的最新一条记录

清洗过后的理想数据为:

order_id user_id merchant_id state create_time data_version
10001 user_1 merchant_1 1 1622249082 2
10002 user_1 merchant_2 2 1622249022 2
10003 user_2 merchant_1 2 1622249082 2

获取思路是这样的:根据 order_id 分组,在每个分组中取出 data_version 最大的一条。

说到分组,我那贫瘠的 SQL 词库里就只想到了 GROUP BY。

使用 GROUP BY 的写法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
sql复制代码-- t_latest_record:获取当天内,根据 order_id 分组,每个分组内的 order_id 以及最大的版本号的数据。
-- t_total_record:获取当天内所有订单流水数据。
-- ${today_begin_time}:变量,代表当天00:00的时间戳。
-- ${today_end_time}:变量,代表当天23:59的时间戳。
SELECT
*
FROM
(
SELECT
${today} AS today,
order_id,
MAX(data_version) AS data_version
FROM
t_order_detail
WHERE
create_time >= ${today_begin_time}
AND create_time <= ${today_end_time}
GROUP BY
order_id
) t_latest_record
LEFT JOIN (
SELECT
*
FROM
t_order_detail
WHERE
create_time >= ${today_begin_time}
AND create_time <= ${today_end_time}
) t_total_record
ON t_latest_record.order_id = t_total_record.order_id
AND t_latest_record.data_version = t_total_record.data_version;

整体思路是:根据 order_id 分组,获得每个订单的 order_id 以及最大的 data_version,再用左连接取到 order_id 和 data_version 相等的记录,即我们要的最新一条记录。

这样看也不是很复杂,但我们运算了2次,让2个临时表做了一次左连接。看起来比较繁琐。有没有函数直接能代替上述过程?

使用 ROW_NUMBER() OVER 的写法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
sql复制代码-- t_sorted_order_detail:根据 order_id 分组后,再根据 row_num 排序得到的订单流水数据。
SELECT
*
FROM
(
SELECT
*,
ROW_NUMBER() OVER (
PARTITION BY
order_id
ORDER BY
data_version DESC
) AS row_num
FROM
t_order_detail
WHERE
create_time >= ${today_begin_time}
AND create_time <= ${today_end_time}
)t_sorted_order_detail
WHERE
row_num = 1;

SQL 看起来是不是清爽多了?

说明

  • ROW_NUMBER ( ) OVER ( [query_partition_clause] order_by_clause )
    它的作用是,根据某个字段分组,然后根据字段排序,并拿到排序第一条记录。
    PARTITION BY 承担了 GROUP BY 的角色,即根据某些字段分组;ORDER BY 即排序,即根据某些字段对每个分组的数据进行排序。然后 ROW_NUMBER() OVER 这个函数就会为每条记录返回在分组内排好的序号。

因此,根据上述 SQL,我们得到 t_sorted_order_detail 表数据就为:

order_id user_id merchant_id state create_time data_version row_num
10001 user_1 merchant_1 1 1622249082 2 1
10001 user_1 merchant_1 0 1622249031 1 2
10002 user_1 merchant_2 2 1622249022 2 1
10002 user_1 merchant_2 0 1622249011 1 2
10003 user_2 merchant_1 2 1622249082 2 1
10003 user_2 merchant_1 0 1622249031 1 2

然后,我们取每个分组里面 row_num 为1的数据,就得到了版本号最大的数据了。

根据清洗后的表获得数据

现在我们得到了清洗后的表:t_clean_order_detail。它的数据如下(row_num 在此处无用,因此去掉):

order_id user_id merchant_id state create_time data_version
10001 user_1 merchant_1 1 1622249082 2
10002 user_1 merchant_2 2 1622249022 2
10003 user_2 merchant_1 2 1622249082 2

接下来我们进行第二步:根据新表获得统计数据。

如何减少多余运算

首先看两个总数如何获得:

  • 根据 order_id 去重的记录总数,total_order_day。
  • 根据 user_id 去重的记录总数,total_user_day。

以前的我:两个数据,两次运算,每个临时表对应一个数据,然后两个临时表 JOIN 获得两个数据。

实际上,这两个数据的来源相同,计算逻辑相似。当遇到这种情况,就可以合并运算。这样不仅可以提高效率,还能让 SQL 更简洁。

例如,获取这两个总数的 SQL :

1
2
3
4
5
6
7
8
sql复制代码SELECT
COUNT(1) AS total_order_day,
COUNT(DISTINCT (user_id)) AS total_user_day
FROM
t_clean_order_detail
WHERE
create_time >= ${today_begin_time}
AND create_time <= ${today_end_time};

说明

  • COUNT()
    作用为计算满足条件的行数,COUNT(1)返回总行数。
  • DISTINCT
    根据字段去重。

如何对数据进行上卷分析

上卷,用人话来说,就是汇总数据得到总值。在后面的4个数据中,不仅要根据 merchant_id 分组得到小计,还要得到总值。我们来看下如何获取。

因为它们也是逻辑类似,来源相同的数据。因此我们只取其中2个来讲解:

  • 当日最新一条记录 state=2、根据 merchant_id 分组以及总的:
  • 根据 order_id 去重的记录总数,total_cancel_order_day。
  • 根据 user_id 去重的记录总数,total_cancel_user_day。

光看文字比较抽象,拿 t_clean_order_detail 的数据为例,我们的理想数据是:

total_cancel_order_day total_cancel_user_day merchant_id
1 1 mechant_1
1 1 merchant_2
2 2 总

以前的我看到会想:分组,GROUP BY 搞定;还要一个总的,另外计算一个总的结果,然后把结果进行 UNION,完美。

但是,当我了解到了 SQL 自带的多维分析语句和函数,我才知道年轻的我多么幼稚:根本不用两次计算,还要自己整合结果。只要一套组合拳,一次运算就足够解决。

首先看下,不考虑根据 merchant_id 分组的情况,要获取的 total_cancel_user_day、total_cancel_user_day 的 SQL :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
sql复制代码SELECT
COUNT(IF (
state = 2,
1,
null
)) AS total_cancel_order_day,
COUNT(DISTINCT (
IF (
state = 2,
user_id,
null
)
)) AS total_cancel_user_day
FROM
t_clean_order_detail
WHERE
update_time >= ${today_begin_time}
AND update_time <= ${today_begin_time};

跟前面的 SQL 其实很像。

接下来考虑根据 merchant_id 分组和获得总计时的 SQL。利用 SQL 提供的多维分析函数,我们使用 GROUP BY 根据 merchant_id 分组,使用 WITH ROLLUP 得到每个分组的小计和总计,使用 GROUPING 区分每个分组小计和总计的行,使用 DECODE 为记数维度( merchant_id 和总)命名。

得到如下 SQL:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
sql复制代码SELECT
COUNT(IF (
state = 2,
1,
null
)) AS total_cancel_order_day,
COUNT(DISTINCT (
IF (
state = 2,
user_id,
null
)
)) AS total_cancel_user_day,
DECODE(GROUPING(merchant_id),
1, "总",
merchant_id
) AS merchant_id
FROM
t_clean_order_detail
WHERE
update_time >= ${today_begin_time}
AND update_time <= ${today_begin_time}
GROUP BY
merchant_id WITH ROLLUP;

说明

  • IF(boolean testCondition, T valueTrue, T valueFalseOrNull)
    条件返回。testCondition为 true 或者非 NULL 时,返回 valueTrue;否则返回 valueFalseOrNull。
  • DECODE(expression , search , result [, search , result]… [, default])
    DECODE 函数与一系列嵌套的 IF-THEN-ELSE 语句相似。expression 与 search 等依次进行比较。如果 expression 和第 i 个 search 项匹配,就返回第 i 个对应的 result 。如果 expression 与任何的 search 值都不匹配,则返回 default 。
  • ROLLUP
    ROLLUP 对 groupbyClause 进行扩展,可以令 SELECT 语句根据分组的维度计算多层小计,并计算总计。
  • GROUPING
    使用 ROLLUP 中的一个列作为参数,GROUPING 函数在遇到 ROLL UP 生成的 NULL 值时,返回1。即如果这一列是个小计或总计时,GROUPING 返回1,否则返回0。它只能用在 ROLLUP 或者 CUBE 的查询里。

总结

回顾一下前面的三个知识点

  • 如何获得每个 order_id 对应的最新一条记录
  • 如何减少多余运算
  • 如何对数据进行上卷分析

这三个问题,是否有一点思路了?如果没有,那我就给你说声对不起。(卑微)

最后,本人充满热情和耐心教女朋友写 SQL,假一赔十包教包会,请问女朋友在哪里领?

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

女友半夜加班发自拍 python男友用30行代码发现惊天秘密

发表于 2021-05-29

事情是这样的

正准备下班的python开发小哥哥

接到女朋友今晚要加班的电话

并给他发来一张背景模糊的自拍照

如下 ↓ ↓ ↓

)​

敏感的小哥哥心生疑窦,难道会有原谅帽

)​

然后python撸了一段代码 分析照片

分析下来 emmm

)​

拍摄地址居然在 XXX酒店

小哥哥崩溃之余 大呼上当

)​

python分析照片

小哥哥将发给自己的照片原图下载下来

并使用python写了一个脚本

读取到了照片拍摄的详细的地址

详细到了具体的街道和酒店名称

引入exifread模块

首先安装python的exifread模块,用于照片分析

pip install exifread 安装exfriead模块

1
2
3
4
5
6
arduino复制代码PS C:\WINDOWS\system32> pip install exifread
Collecting exifread
Downloading ExifRead-2.3.2-py3-none-any.whl (38 kB)
Installing collected packages: exifread
Successfully installed exifread-2.3.2
PS C:\WINDOWS\system32> pip install json

GPS经纬度信息

其实我们平时拍摄的照片里,隐藏了大量的私密信息

包括 拍摄时间、极其精确 具体的GPS信息。

下面是通过exifread模块,来读取照片内的经纬度信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
python复制代码#读取照片的GPS经纬度信息
def find_GPS_image(pic_path):
GPS = {}
date = ''
with open(pic_path, 'rb') as f:
tags = exifread.process_file(f)
for tag, value in tags.items():
#纬度
if re.match('GPS GPSLatitudeRef', tag):
GPS['GPSLatitudeRef'] = str(value)
#经度
elif re.match('GPS GPSLongitudeRef', tag):
GPS['GPSLongitudeRef'] = str(value)
#海拔
elif re.match('GPS GPSAltitudeRef', tag):
GPS['GPSAltitudeRef'] = str(value)
elif re.match('GPS GPSLatitude', tag):
try:
match_result = re.match('\[(\w*),(\w*),(\w.*)/(\w.*)\]', str(value)).groups()
GPS['GPSLatitude'] = int(match_result[0]), int(match_result[1]), int(match_result[2])
except:
deg, min, sec = [x.replace(' ', '') for x in str(value)[1:-1].split(',')]
GPS['GPSLatitude'] = latitude_and_longitude_convert_to_decimal_system(deg, min, sec)
elif re.match('GPS GPSLongitude', tag):
try:
match_result = re.match('\[(\w*),(\w*),(\w.*)/(\w.*)\]', str(value)).groups()
GPS['GPSLongitude'] = int(match_result[0]), int(match_result[1]), int(match_result[2])
except:
deg, min, sec = [x.replace(' ', '') for x in str(value)[1:-1].split(',')]
GPS['GPSLongitude'] = latitude_and_longitude_convert_to_decimal_system(deg, min, sec)
elif re.match('GPS GPSAltitude', tag):
GPS['GPSAltitude'] = str(value)
elif re.match('.*Date.*', tag):
date = str(value)
return {'GPS_information': GPS, 'date_information': date}

百度API将GPS转地址

这里需要使用调用百度API,将GPS经纬度信息转换为具体的地址信息。

这里,你需要一个调用百度API的ak值,这个可以注册一个百度开发者获得,当然,你也可以使用博主的这个ak

调用之后,就可以将拍摄时间、拍摄详细地址都解析出来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
ini复制代码def find_address_from_GPS(GPS):
secret_key = 'zbLsuDDL4CS2U0M4KezOZZbGUY9iWtVf'
if not GPS['GPS_information']:
return '该照片无GPS信息'
#经纬度信息
lat, lng = GPS['GPS_information']['GPSLatitude'], GPS['GPS_information']['GPSLongitude']
baidu_map_api = "http://api.map.baidu.com/geocoder/v2/?ak={0}&callback=renderReverse&location={1},{2}s&output=json&pois=0".format(
secret_key, lat, lng)
response = requests.get(baidu_map_api)
#百度API转换成具体的地址
content = response.text.replace("renderReverse&&renderReverse(", "")[:-1]
print(content)
baidu_map_address = json.loads(content)
#将返回的json信息解析整理出来
formatted_address = baidu_map_address["result"]["formatted_address"]
province = baidu_map_address["result"]["addressComponent"]["province"]
city = baidu_map_address["result"]["addressComponent"]["city"]
district = baidu_map_address["result"]["addressComponent"]["district"]
location = baidu_map_address["result"]["sematic_description"]
return formatted_address,province,city,district,location

if __name__ == '__main__':
GPS_info = find_GPS_image(pic_path='C:/女友自拍.jpg')
address = find_address_from_GPS(GPS=GPS_info)
print("拍摄时间:" + GPS_info.get("date_information"))
print('照片拍摄地址:' + str(address))

老王得到的结果是这样的

照片拍摄地址:(‘云南省红河哈尼族彝族自治州弥勒县’, ‘云南省’, ‘红河哈尼族彝族自治州’, ‘弥勒县’, ‘湖泉酒店-A座东南128米’)

云南弥勒湖泉酒店,这明显不是老王女友工作的地方,老王搜索了一下,这是一家温泉度假酒店。

顿时就明白了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
arduino复制代码{"status":0,"result":{"location":{"lng":103.41424699999998,"lat":24.410461020097278},
"formatted_address":"云南省红河哈尼族彝族自治州弥勒县",
"business":"",
"addressComponent":{"country":"中国",
"country_code":0,
"country_code_iso":"CHN",
"country_code_iso2":"CN",
"province":"云南省",
"city":"红河哈尼族彝族自治州",
"city_level":2,"district":"弥勒县",
"town":"","town_code":"","adcode":"532526",
"street_number":"",
"direction":"","distance":""},
"sematic_description":"湖泉酒店-A座东南128米",
"cityCode":107}}

拍摄时间:2021:5:03 20:05:32
照片拍摄地址:('云南省红河哈尼族彝族自治州弥勒县', '云南省', '红河哈尼族彝族自治州', '弥勒县', '湖泉酒店-A座东南128米')

完整代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
python复制代码import exifread
import re
import json
import requests
import os

#转换经纬度格式
def latitude_and_longitude_convert_to_decimal_system(*arg):
"""
经纬度转为小数, param arg:
:return: 十进制小数
"""
return float(arg[0]) + ((float(arg[1]) + (float(arg[2].split('/')[0]) / float(arg[2].split('/')[-1]) / 60)) / 60)

#读取照片的GPS经纬度信息
def find_GPS_image(pic_path):
GPS = {}
date = ''
with open(pic_path, 'rb') as f:
tags = exifread.process_file(f)
for tag, value in tags.items():
#纬度
if re.match('GPS GPSLatitudeRef', tag):
GPS['GPSLatitudeRef'] = str(value)
#经度
elif re.match('GPS GPSLongitudeRef', tag):
GPS['GPSLongitudeRef'] = str(value)
#海拔
elif re.match('GPS GPSAltitudeRef', tag):
GPS['GPSAltitudeRef'] = str(value)
elif re.match('GPS GPSLatitude', tag):
try:
match_result = re.match('\[(\w*),(\w*),(\w.*)/(\w.*)\]', str(value)).groups()
GPS['GPSLatitude'] = int(match_result[0]), int(match_result[1]), int(match_result[2])
except:
deg, min, sec = [x.replace(' ', '') for x in str(value)[1:-1].split(',')]
GPS['GPSLatitude'] = latitude_and_longitude_convert_to_decimal_system(deg, min, sec)
elif re.match('GPS GPSLongitude', tag):
try:
match_result = re.match('\[(\w*),(\w*),(\w.*)/(\w.*)\]', str(value)).groups()
GPS['GPSLongitude'] = int(match_result[0]), int(match_result[1]), int(match_result[2])
except:
deg, min, sec = [x.replace(' ', '') for x in str(value)[1:-1].split(',')]
GPS['GPSLongitude'] = latitude_and_longitude_convert_to_decimal_system(deg, min, sec)
elif re.match('GPS GPSAltitude', tag):
GPS['GPSAltitude'] = str(value)
elif re.match('.*Date.*', tag):
date = str(value)
return {'GPS_information': GPS, 'date_information': date}

#通过baidu Map的API将GPS信息转换成地址。
def find_address_from_GPS(GPS):
"""
使用Geocoding API把经纬度坐标转换为结构化地址。
:param GPS:
:return:
"""
secret_key = 'zbLsuDDL4CS2U0M4KezOZZbGUY9iWtVf'
if not GPS['GPS_information']:
return '该照片无GPS信息'
lat, lng = GPS['GPS_information']['GPSLatitude'], GPS['GPS_information']['GPSLongitude']
baidu_map_api = "http://api.map.baidu.com/geocoder/v2/?ak={0}&callback=renderReverse&location={1},{2}s&output=json&pois=0".format(
secret_key, lat, lng)
response = requests.get(baidu_map_api)
content = response.text.replace("renderReverse&&renderReverse(", "")[:-1]
print(content)
baidu_map_address = json.loads(content)
formatted_address = baidu_map_address["result"]["formatted_address"]
province = baidu_map_address["result"]["addressComponent"]["province"]
city = baidu_map_address["result"]["addressComponent"]["city"]
district = baidu_map_address["result"]["addressComponent"]["district"]
location = baidu_map_address["result"]["sematic_description"]
return formatted_address,province,city,district,location
if __name__ == '__main__':
GPS_info = find_GPS_image(pic_path='C:/Users/pacer/desktop/img/5.jpg')
address = find_address_from_GPS(GPS=GPS_info)
print("拍摄时间:" + GPS_info.get("date_information"))
print('照片拍摄地址:' + str(address))

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

一个完整的、全面 k8s 化的集群稳定架构

发表于 2021-05-29

前言

我司的集群时刻处于崩溃的边缘,通过近三个月的掌握,发现我司的集群不稳定的原因有以下几点:

1
2
3
4
5
复制代码1、发版流程不稳定
2、缺少监控平台【最重要的原因】
3、缺少日志系统
4、极度缺少有关操作文档
5、请求路线不明朗

总的来看,问题的主要原因是缺少可预知的监控平台,总是等问题出现了才知道。次要的原因是服务器作用不明朗和发版流程的不稳定。

解决方案

发版流程不稳定

重构发版流程。业务全面k8s化,构建以kubernetes为核心的ci/cd流程。

发版流程

有关发版流程如下:

)

浅析:研发人员提交代码到developer分支(时刻确保developer分支处于最新的代码),developer分支合并到需要发版环境对应的分支,触发企业微信告警,触发部署在k8s集群的gitlab-runner pod,新启runner pod 执行ci/cd操作。

在这个过程中需要有三个步骤:测试用例、打包镜像、更新pod。

第一次部署服务在k8s集群环境的时候可能需要:创建namespace、创建imagepullsecret、创建pv(storageclass)、创建deployment(pod controller)、创建svc、创建ingress、等。其中镜像打包推送阿里云仓库和从阿里云仓库下载镜像使用vpc访问,不走公网,无网速限制。流程完毕,runner pod 销毁,gitlab 返回结果。

需要强调的一点是,在这里的资源资源清单不包含configmap或者secret,牵扯到安全性的问题,不应该出

现在代码仓库中,我司是使用rancher充当k8s多集群管理平台,上述安全问题在rancher的dashboard中由运维来做的。

服务部署逻辑图

有关服务部署逻辑图如下:

)

根据发版流程的浅析,再根据逻辑图可以明确发版流程。在这里看到我司使用的是kong代替nginx,做认证、鉴权、代理。而slb的ip绑定在kong上。0,1,2属于test job;3属于build job;4,5,6,7属于change pod 阶段。并非所有的服务都需要做存储,需要根据实际情况来定,所以需要在kubernetes.sh里写判断。在这里我试图使用一套CI应用与所有的环境,所以需要在kubernetes.sh中用到的判断较多,且.gitlab-ci.yml显得过多。建议是使用一个ci模版,应用于所有的环境,毕竟怎么省事怎么来。还要考虑自己的分支模式,具体参考:Git 分支开发规范手册,Git 提交规范

缺少监控预警平台

构建可信赖且符合我司集群环境的联邦监控平台,实现对几个集群环境的同时监控和预故障告警,提前介入。

监控预警逻辑图

有关监控预警逻辑图如下:

)

浅析:总的来说,我这里使用到的监控方案是prometheus➕shell脚本或go脚本➕sentry。使用到的告警方式是企业微信或者企业邮箱。

上图三种颜色的线代表三种监控方式需要注意。脚本主要是用来做备份告警、证书告警、抓贼等。prometheus这里采用的是根据prometheus-opertor修改的prometheus资源清单,数据存储在nas上。sentry严格的来讲属于日志收集类的平台,在这里我将其归为监控类,是因为我看中了其收集应用底层代码的崩溃信息的能力,属于业务逻辑监控, 旨在对业务系统运行过程中产生的错误日志进行收集归纳和监控告警。

注意这里使用的是联邦监控平台,而部署普通的监控平台。

联邦监控预警平台逻辑图

多集群联邦监控预警平台逻辑图如下:

)

因为我司有几个k8s集群,如果在每个集群上都部署一套监控预警平台的话,管理起来太过不便,所以这里我采取的策略是使用将各监控预警平台实行一个联邦的策略,使用统一的可视化界面管理。这里我将实现三个级别饿监控:操作系统级、应用程序级、业务级。对于流量的监控可以直接针对kong进行监控,模版7424。

缺少日志系统

随着业务全面k8s化进程的推进,对于日志系统的需求将更加渴望,k8s的特性是服务的故障日志难以获取。建立可观测的能过滤的日志系统可以降低对故障的分析难度。

有关日志系统逻辑图如下:

)

浅析:在业务全面上k8s化后,方便了管理维护,但对于日志的管理难度就适当上升了。

我们知道pod的重启是有多因素且不可控的,而每次pod重启都会重新记录日志,即新pod之前的日志是不可见的。当然了有多种方法可以实现日志长存:远端存储日志、本机挂载日志等。出于对可视化、可分析等的考虑,选择使用elasticsearch构建日志收集系统。

极度缺少有关操作文档

建立以语雀–> 运维相关资料为中心的文档中心,将有关操作、问题、脚本等详细记录在案,以备随时查看。

)

浅析:因安全性原因,不便于过多同事查阅。

运维的工作比较特殊,安全化、文档化是必须要保障的。我认为不论是运维还是运维开发,书写文档都是必须要掌握的,为己也好,为他也罢。文档可以简写,但必须要含苞核心的步骤。我还是认为运维的每一步操作都应该记录下来。

请求路线不明朗

根据集群重构的新思路,重新梳理集群级流量请求路线,构建具备:认证、鉴权、代理、连接、保护、控制、观察等一体的流量管理,有效控制故障爆炸范围。

请求路线逻辑图如下:

)

浅析:客户访问https://www.cnblogs.com/zisefeizhu 经过kong网关鉴权后进入特定名称空间(通过名称空间区分项目),因为服务已经拆分为微服务,服务间通信经过istio认证、授权,需要和数据库交互的去找数据库,需要写或者读存储的去找pv,需要转换服务的去找转换服务…… 然后返回响应。

总结

综上所述,构建以:以kubernetes为核心的ci/cd发版流程、以prometheus为核心的联邦监控预警平台、以elasticsearch为核心的日志收集系统、以语雀为核心的文档管理中心、以kong及istio为核心的南北东西流量一体化服务,可以在高平发,高可靠性上做到很好保障。

附:总体架构逻辑图

)

注:请根据箭头和颜色来分析。

浅析:上图看着似乎过于混乱,静下心来,根据上面的拆分模块一层层分析还是可以看清晰的。这里我用不同颜色的连线代表不同模块的系统,根据箭头走还是蛮清晰的。

根据我司目前的业务流量,上述功能模块,理论上可以实现集群的维稳。私认为此套方案可以确保业务在k8s集群上稳定的运行一段时间,再有问题就属于代码层面的问题了。这里没有使用到中间件,倒是使用到了缓存redis不过没画出来。我规划在上图搞定后再在日志系统哪里和转换服务哪里增加个中间件kafka或者rq 看情况吧。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

一次Orika封装导致的疯狂GC和锁等待问题排查

发表于 2021-05-29

背景

个人维护的一个后台服务使用了Orika框架去处理bean对象之间的映射,并对Orika框架做了简单的封装,减少冗余代码。这里简单说明下Orika框架的原理,Orika框架是在运行时动态创建字节码,生成最小开销的映射器来实现Bean对象之间映射的。其他Bean映射的框架有Mapstruct、BeanCopier等。

问题

后台服务在内部测试环境运行一段时间后,突然发现部分接口响应很慢,而且接口越刷越慢,平均接口耗时十几秒,造成前端接口响应超时,刷不出数据。

问题排查

初步排查

针对相关接口业务逻辑做初步排查后,怀疑耗时主要Orika的bean映射过程。是Orika框架有bug?还是封装的Orika工具类有隐藏的问题?

深入排查

1) 封装的Orika工具类处理Bean映射时耗时突然变长,可能有线程阻塞。

本地启动服务,使用jmeter去本地测试验证,每秒100线程请求并发,循环5次。借助arthas工具,使用thread -b 命令查看当前服务是否有阻塞的线程。

jmeter

Orika阻塞

经过测试,确实存在阻塞的线程,由Orika的DefaultMapperFactory.registerClassMap方法引起了线程阻塞。

2)分析DefaultMapperFactory.registerClassMap源码

这块代码是Orika生成映射器的主要逻辑,方法代码块加了synchronized 重量级锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
kotlin复制代码public synchronized <A, B> void registerClassMap(ClassMap<A, B> classMap) {
this.classMapRegistry.put(new MapperKey(classMap.getAType(), classMap.getBType()), classMap);
//判断映射器是否正在构建中或者是否已经构建过,若是则重新构建注册
if (this.isBuilding || this.isBuilt) {
MappingContext context = this.contextFactory.getContext();
try {
if (classMap.getUsedMappers().isEmpty()) {
classMap = classMap.copyWithUsedMappers(this.discoverUsedMappers(classMap));
}
this.buildMapper(classMap, this.isBuilding, context);
this.buildObjectFactories(classMap, context);
this.initializeUsedMappers(classMap);
this.mapperFacade.factoryModified(this);
} finally {
//释放缓存中映射bean的类型、类属性等信息
this.contextFactory.release(context);
}
}
}

这里简单说下synchronized锁,锁的原理是基于Java 对象头和 Monitor实现的。

当多个线程同时请求资源临界区时,对象监视器会设置几种状态用来区分请求的线程,涉及线程状态流转可以简单理解下图:

Synchronize

通过线程状态流转可以看出使用了synchronized锁,在并发情况下会有锁等待,这个锁的效率比较低。

由此可以分析,确实因为DefaultMapperFactory.registerClassMap方法阻塞引起的耗时。

但是为什么会不断走到这个动态生成映射器的方法呢?有可能是封装的Orika工具类导致的问题。

3)封装的MapperUtil工具类分析

封装的工具类封装了Orika的基础操作,并缓存了Orika的mapperFacade。

工具类完整代码地址:gitee.com/LuXianSheng…

经过分析,是代码在特殊情况下本地缓存穿透,不断执行classMapBuilder.byDefault().register() 去注册映射器。核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
kotlin复制代码/**
* 获取自定义映射
*
* @param toClass 映射类
* @param dataClass 数据映射类
* @param targetPropertyMap 处理字段名称不一致 (key 原对象属性 value 目标对象属性)
* @return 映射类对象
*/
private fun <E, T> getMapperFacade(
toClass: Class<E>,
dataClass: Class<T>,
targetPropertyMap: Map<String, String> = emptyMap(),
excludes: List<String> = emptyList(),
converterMap: Map<String, String> = emptyMap()
): MapperFacade {
val mapKey = dataClass.canonicalName + "_" + toClass.canonicalName
var mapperFacade = CACHE_MAPPER_FACADE_MAP[mapKey]
//此处缓存穿透
if (mapperFacade != null && excludes.isEmpty() && converterMap.isEmpty()) {
return mapperFacade
}

//排除属性
var classMapBuilder = MAPPER_FACTORY.classMap(dataClass, toClass)
excludes.forEach {
classMapBuilder = classMapBuilder.exclude(it)
}

//属性映射
targetPropertyMap.forEach {
val converterId = converterMap[it.key]
if (converterId != null) {
classMapBuilder.fieldMap(it.key, it.value).converter(converterId).add()
} else {
classMapBuilder.field(it.key, it.value)
}
}

classMapBuilder.byDefault().register()
mapperFacade = MAPPER_FACTORY.mapperFacade
CACHE_MAPPER_FACADE_MAP[mapKey] = mapperFacade
return mapperFacade
}

问题的主要是原因找到了,可是还存在一个问题,为什么接口会越刷越慢?如果不断注册生成映射器平均耗时是相近的话,应该不会出现很多线程处理在队列排队情况。可以注意到Orika是不断动态生成class加载进jvm,是不是jvm引起的呢?

4) JVM排查

获取服务运行的GC日志,将GC日志通过GCeasy在线分析工具分析,可视化结果如下:

jvm1

上图主要展示JVM的内存分布,展示了年轻代、老年代、元空间以及JVM给分配的大小和程序运行过程中使用的峰值大小。

jvm2

上图主要展示jvm的一些关键性指标,如下

  • Throughput表示的是吞吐量
  • Latency表示响应时间
  • Avg Pause GC Time 平均GC时间
  • Max Pause GC TIme 最大GC时间

jvm3

上图Interactive Graphs展示了运行交互的一些指标数据,如下:

  • Heap after GC:GC之后堆的使用情况
  • Heap before GC:GC之前堆的使用情况
  • GC Duration:GC持续时间
  • Reclaimed Bytes:GC回收掉的垃圾对象的内存大小
  • Young Gen:年轻代堆的使用情况
  • Old Gen:老年代堆的使用情况
  • Meta Space:元空间的使用情况
  • A & P:每次GC的时候堆内存分配和晋升情况。其中红色的线表示每次GC的时候年轻代里面有多少内存(对象)晋升到了老年代。

jvm4

上图是显示一些GC的统计信息。每种GC总共回收了多少内存、总共用了多长时间、平均时间、以及每种GC的单独统计信息等。

jvm5

上图显示的是GC的一些原因信息。

通过以上数据分析,可以看出年轻代内存空间不足,申请扩容失败,年轻代GC次数频繁,但是GC后内存回收效果不明显,元数据数据大小有些异常。

5)本地测试验证

本地安装的jdk是1.8,jvm配置是按照默认配置。本地模拟请求100qps,使用jstat -gcutill 命令查询jvm情况

jvm6
jvm7
上图可以看到1秒的时间就触发了一次GC,随着QPS不断增加,GC越频繁,实际上应用不应该出现短时间频繁GC。

测试jvm元数据改

元数据起初有突峰变化,后面增长缓慢,主要是锁等待导致的。

6)分析

元数据不断的增加,随着请求量越多,元数据会超过阈值,年轻代不断新建映射器的class对象,请求越多,GC的次数越频繁,最终年轻代申请扩容失败OOM 导致接口响应处理越来越慢以及接口响应超时。

结果

综合来看,主要是封装的MapperUtil工具的本地的缓存穿透,不断的执行classMap映射器方法,动态生成很多class类,同时造成synchronized锁争抢和线程阻塞排队等待、jvm的年轻代频繁gc以及元数据的增大,造成结果响应越来越慢。

说明

这边在gitee提交了一个Orika的测试代码,可以复现文章提到的问题,有兴趣的可以本地运行分析验证。

仓库地址:gitee.com/LuXianSheng…

Jemter测试脚本:放在仓库项目中的resource目录

注意事项:一开始可能调用接口很快,没能那么快复现文章提到的问题,因为仓库的项目只是一个测试代码,可以调搞QPS循环测试,可以发现接口响应会越来越慢。

作者:slagsea

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

MySQL 数据表优化设计(二):数值字段类型如何选择?

发表于 2021-05-29

在MySQL 中有两种数值类型,整型和实数(即带有小数的数字)。整型可以通过 TINYINT、SMALLINT、MEDIUMINT、INT 或 BIGINT 表示,分别对应8bit、16bit、24bit、32bit 和64bit 的存储空间。实数可以通过 FLOAT、DOUBLE 和 DECIMAL 表示,其中 FLOAT 和 DOUBLE 即通常所说的浮点数,由于计算机二进制存储的精度问题,其计算得到的是近似的结果。

整型字段类型设计

整型可以选择是否是无符号数,这样可以禁止使用负数,并且可以将存储的最大值翻倍,例如 TINYINT UNSIGNED类型的字段可以存储的值的范围是0-255,而不是有符号的-128-127。使用无符号还是有符号取决与字段类型的范围,但是如果确定字段是无符号的,那么优先选择无符号类型,因为最大值翻倍可以在一定程度上避免使用更大存储空间的整型类型。

MySQL 允许我们指定整型的宽度,例如 INT(11)。这对于大部分应用并没有太大意义——实际上 MySQL 并不会限制合法值的范围,也就是即便指定为 INT(11),实际上只要数值类型的存储空间足够,也可以存储超过设定宽度的整型,例如 INT(12)的数字也可能可以存储到 INT(11)类型的字段中。对于存储和计算而言,INT(1)和 INT(20)是等效的。

如果确实要考虑性能,应该是根据业务真实的数值范围来确定使用整型字段类型,原则是只要字段类型不会超过业务系统的数值最大值,那么尽可能地选择低存储长度的整型类型。如果是无符号的数值,那就尽量使用无符号属性。例如,假设使用整型替代枚举的话,如果枚举数量不超过255个,那就优先使用 UNSIGNED TINYINT 类型。

实数

实数不仅仅可以用来存储浮点数,实际上还可以使用 DECIMAL 类型来存储超出 BIGINT 类型的数值。对于 浮点数,MySQL 支持精确浮点数类型和不精确浮点数。

FLOAT 和 DOUBLE 类型支持标准数学运算的近似运算,浮点数的实际计算结果的精确度依赖于实现浮点数的平台。DECIMAL 类型用于存储精确的浮点数,在 MySQL 5.0以后,DECIMAL 也支持精确的数学运算(更早的版本实际上是使用浮点数来进行 DECIMAL 运算的)。但是,由于 CPU 本身不能直接精确计算浮点数,因此 DECIMAL 数据类型的计算速度会比浮点数要慢。

浮点数和 DECIMAL 都支持指定精度。DECIMAL 类型的可以分别指定小数点前后最大的数字位数,这会影响数据列的存储空间占用。MySQL5.0版本以后将数字位以二进制形式存储(每9位数使用4个字节存储)。例如 DECIMAL(18, 9)将在小数点两侧均为9位数字,算上小数点(占一个字节),总共需要9个字节来存储。DECIMAL 最大的数字位数是65(包含小数位和整数位),例如下面的表表创建语句会报错提示#1426 - Too-big precision 66 specified for 'number'. Maximum is 65.:

1
2
3
4
sql复制代码CREATE TABLE t_numbers ( 
id INT(11) AUTO_INCREMENT PRIMARY KEY,
number DECIMAL(66,1)
);

对于 FLOAT 和 DOUBLE类型,也可以类似 DECIMAL 那种方式指定整数位和小数位来确定存储范围和精度。不同的长度会使得 MySQL默认选择不同的数据类型并使用近似值存储数据。FLOAT 类型的存储长度固定为4个字节, DOUBLE 类型的存储长度固定为8个字节。精度是不确定的,指定精度对存储空间并没有帮助,因此从计算准确度考虑,建议是不要指定精度。在内部计算的时候,MySQL 会选择使用 DOUBLE类型计算 FLOAT 类型的数据。

由于 DECIMAL 占据的空间更大以及计算资源消耗也更大,因此建议只有在需要精确表示数值的情况下选择使用 DECIMAL(例如金融数据,如金额)。如果考虑计算性能,也可以考虑使用 BIGINT 来存储精确的浮点数,例如将金额统一乘以固定的倍数转换为 BIGINT 进行运算,这种方式的计算效率和存储空间都会更小。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Springboot actuator不可不注意的安全问题-

发表于 2021-05-29

Springboot actuator 能做什么,不再这里详细介绍,这里主要讲actuator带来的安全问题。

1. 通过httptrace 越权

httptrace是该应用每一次被http访问的记录,包含每次request的cookie、请求参数、等。设想一下如果拿到用户请求的cookie,那么就可以登录该用户的账户进行任意操作。

第一步,创建springboot 项目、pom引入actuator

1
2
3
4
java复制代码<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

第二步,在application.properties 中增加暴露所有端点配置,默认只暴露(/health)

management.endpoints.web.exposure.include=*

第三步,注入HttpTraceRepository Bean,httptrace的存储方式

1
2
3
4
5
6
7
8
9
10
java复制代码@Configuration
public class APPConfig {

@Bean
@ConditionalOnMissingBean(HttpTraceRepository.class)
public HttpTraceRepository traceRepository(){
return new InMemoryHttpTraceRepository();
}

}

看一下结果,暴露的有哪些端点,访问(http://127.0.0.1:9999/actuator/)

image.png

访问httptrace看一下有哪些信息http://127.0.0.1:9999/actuator/httptrace

image.png

如果这个cookie 是用户登录态cookie,那么就可以模拟该用户做任意操作了。

2. 通过heapdump 获取数据库密码,完成脱库

第一步,通过访问http://127.0.0.1:9999/actuator/env 查看应用配置信息

image.png

Json format 一下,搜索datasource,可以看到数据库账号、链接字符串,密码被隐藏掉了。所有和密码相关的actuator 都会在输出的时候替换成***。

image.png
第二步,通过配置文件不能看到密码,那就只能通过堆来看看能不能找到,因为我们所有的java对象都存在于堆上,这里可以看出来使用的使用阿里巴巴的的Druid连接池,所以数据库配置相关也应该在这个class 的实例上。

通过http://127.0.0.1:9999/actuator/heapdump 会执行dump并下载,然后我们通过Jhat 来分析该dump文件

image.png
访问http://127.0.0.1:7000/ 查看heapdump分析

在页面control+f 搜索datasource DataSourceInitializationMode

image.png
点进去经过2层就能找到heap中数据库的账号和密码以及url
image.png
到这里,我们就能成功拿到链接数据库的所有信息了。当然规范一点的公司用于程序链接的账号 密码 host是隔离的,也就是说就算拿到对的账号和密码,host是机房内才能访问,那么你依然无法链接数据库。如果用的是公网链接那么就发生严重的安全事故了。

数据安全是企业的根本,安全事故往往就发生在这些细节上,安全无小事,研发需谨慎。

上面的安全问题有很多种解决办法
1、通过应用本身的配置,只开放基础的端点,例如健康检查,服务中心会用来检测心跳。

2、通过引入auth,访问敏感端点授权。

3、通过软负载限制,如有使用例如nginx来做软负载,那么可以在nginx设置访问规则,访问敏感的端点,直接拒绝。

4、存储层本身的账号安全控制,例如统一引入加密机制。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…658659660…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%