开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

不能再等啦,手撕LRU、LFU,今天你必须学会!!! 分享大

发表于 2021-07-21

「本文已参与好文召集令活动,点击查看:后端、大前端双赛道投稿,2万元奖池等你挑战!」

前言

就目前情况而言,只要你简历上敢写Redis,大厂面试官就一定敢叫你手写LRU,但对于手写LFU相对而言还是比较少见,但如果,你写完LRU还能对面试官说你还掌握LFU算法,那么即使你写不出来只说出个大概,那么在面试官心中也是大大加分的!!!

那么,今天就让我来帮助大家掌握这个高频考点吧!!!

==温馨提示==:主要理解思路在注释中
注释有点浅,大家可以复制到自己的编译器中好好看看

大厂面试题专栏,里面有更多更优质的大厂面试题,可以来看一看瞅一瞅哦!!!


手撕LRU

LRU总体上还是比较简单的,只要你掌握概念,即使长时间没写也还能记得!

LRU总体上思路:最近使用的放在最近的位置(最左边),那么保证了这个最少使用的就自然而然离你远了(往右边去了)

所以说使用双向链表来实现,但光有这还不够,我们还需要使用Map来将key映射到对应的节点

在双向链表中,使用虚拟头节点和尾节点来作为节点,这样在添加节点和删除节点的时候就不需要检查相邻的节点是否存在。

在这里插入图片描述

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
java复制代码public class Main {


//定义双向链表
class DoubleLinkedList {
private int key;
private int value;
private DoubleLinkedList next;
private DoubleLinkedList prev;

public DoubleLinkedList() {
}

public DoubleLinkedList(int key, int value) {
this.key = key;
this.value = value;
}
}


//虚拟头节点,尾节点
private DoubleLinkedList head,tail;
//存放映射的HashMap
private Map<Integer,DoubleLinkedList> map = new HashMap<>();
//容量
private int capacity;
//实际元素个数
private int size;

//数据初始化
public Main(int capacity) {
this.capacity = capacity;
head = new DoubleLinkedList();
tail = new DoubleLinkedList();
head.next = tail;
tail.prev = head;
this.map = new HashMap<>();
}

//对外暴露的get方法
public int get(int key) {
//如果不存在key,则返回-1
if (!map.containsKey(key)) {
return -1;
}
//来到这,说明有这个key,那么就给put出来
DoubleLinkedList node = map.get(key);

//因为使用过了该节点,那么就需要将其移动到头节点
moveToHead(node);
//返回value值
return node.value;
}

//对外暴露的put方法
public void put(int key,int value) {

//如果存在该key
if (map.containsKey(key)) {
//那么get出来,替换value值,因为使用过,所以移动到头节点
DoubleLinkedList node = map.get(key);
node.value = value;
moveToHead(node);
}else {
//因为不存在该key
//所以创建新的节点
DoubleLinkedList newNode = new DoubleLinkedList(key, value);

//将节点加入到map中去,并将其添加到头节点,元素总数量加1
map.put(key,newNode);
addHead(newNode);
size++;

//如果元素总数量大于最大容量
if (size > capacity) {
//那么删除最后一个节点,也就是最少使用的节点
removeLastNode();
}
}
}

//删除末尾节点
private void removeLastNode() {
//tail为虚拟尾节点,所以它前面的节点就是最后一个节点
DoubleLinkedList lastNode = tail.prev;

//删除节点
removeNode(lastNode);
map.remove(lastNode.key);
size--;
}

//移动节点到头节点
private void moveToHead(DoubleLinkedList node) {
removeNode(node);
addHead(node);
}

//添加节点到头节点
private void addHead(DoubleLinkedList node) {
node.prev = head;
node.next = head.next;
head.next.prev = node;
head.next = node;
}

//删除节点
private void removeNode(DoubleLinkedList node) {
node.next.prev = node.prev;
node.prev.next = node.next;
node.prev = null;
node.next = null;
}

}

力扣:LRU地址


手撕LFU

LFU就不像是LFU那样根据最近使用之类的来编排
LFU是根据使用的频次,简单来说是根据使用的次数来编排(在保证次数的情况下,根据最近使用来排),所以对于链表节点需要多定义一个使用的次数count

在这里插入图片描述

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
java复制代码public class Main {


//定义链表
class Node {
private int key;
private int value;
private int count;
private Node next;
private Node prev;

public Node() {
}

public Node(int key, int value,int count) {
this.key = key;
this.value = value;
this.count = count;
}
}


//key与对应的Node节点映射
private Map<Integer,Node> keyMap;
//频次与其对应区域首节点的映射
private Map<Integer,Node> cntMap;
//虚拟头节点、尾节点
private Node head,tail;
//最大容量
private int capacity;
//元素实际个数
private int size;


//初始化
public Main(int capacity) {
this.capacity = capacity;
this.head = new Node();
this.tail = new Node();
head.next = tail;
tail.prev = head;
keyMap = new HashMap<>();
cntMap = new HashMap<>();
}


//对外暴露get方法
public int get(int key) {
//容量为0,或者不存在key则返回-1
if (capacity == 0 || !keyMap.containsKey(key)) {
return -1;
}
//去除key对应的节点
Node node = keyMap.get(key);

//更新使用频次
updateNode_fre(node);
//返回value
return node.value;
}

//更新节点使用频次
private void updateNode_fre(Node node) {
//旧的使用频次
int oldCnt = node.count;
//新的使用频次
int newCnt = node.count + 1;

//需要保存在哪个节点前插入的
//因为插入的位置可能是区域的首节点,也可能不是
Node next = null;

//如果当前节点是该使用频次的首节点
if (cntMap.get(oldCnt) == node) {

//如果当前区域还有下一个节点,那么直接把当前区域的首节点设置为下一节点就好了
if (node.next.count == node.count) {
cntMap.put(oldCnt,node.next);
}else {
//如果没有下一个节点了,那么就需要删除此区域了。因为该节点频次要更新了
cntMap.remove(oldCnt);
}

//如果不存在新区域
if (cntMap.get(newCnt) == null) {
//那么创建新区域,并将该节点设置为首节点
cntMap.put(newCnt,node);
//频次要更新
node.count++;
//不需要后续操作了
return;
}else {
//如果存在,那么就需要删除该节点,将next设置为新区域首节点
removeNode(node);
next = cntMap.get(newCnt);
}
}else {
//如果不是,那么删除节点
removeNode(node);
//如果存在新使用频次的区域则 next就等于新使用区域频次的首节点
//如果没有,那么在当前使用频次前面插入就ok了
if (cntMap.get(newCnt) == null) {
next = cntMap.get(oldCnt);
}else {
next = cntMap.get(newCnt);
}
}

//能来到这的node,频次都是没有更新的,所以更新频次
node.count++;
//更新频次映射
cntMap.put(newCnt,node);
//在指定节点前插入
addNode(node,next);
}


//对外暴露put方法
public void put(int key,int value) {
//如果容量为0,则不能put键值对
if (capacity == 0) return;

//如果存在key
if (keyMap.containsKey(key)) {
//取出节点,替换value,更新
Node node = keyMap.get(key);
node.value = value;
updateNode_fre(node);
}else {

//如果元素个数等于了最大容量
if (size == capacity) {
//那么就需要删除莫尾节点
deleteLastNode();
}

//创建新节点,默认使用次数为1
Node newNode = new Node(key, value,1);
//因为是新节点,所以插入到count=1区域的首部
Node next = cntMap.get(1);
//但如果没有这个区域,那么直接插入到
if (next == null) {
next = tail;
}

//插入、更新
addNode(newNode,next);
keyMap.put(key,newNode);
cntMap.put(1,newNode);
size++;
}
}

//添加节点
private void addNode(Node node, Node next) {
node.next = next;
node.prev = next.prev;
next.prev.next = node;
next.prev = node;
}

//删除末尾节点
private void deleteLastNode() {
Node lastNode = tail.prev;
//如果该节点是此区域的唯一一个节点,那么需要将此区域删除
if (cntMap.get(lastNode.count) == lastNode) {
cntMap.remove(lastNode.count);
}
removeNode(lastNode);
keyMap.remove(lastNode.key);
size--;
}

//删除节点
private void removeNode(Node node) {
node.next.prev = node.prev;
node.prev.next = node.next;
node.next = null;
node.prev = null;
}

}

力扣:LFU地址


最后

我是 Code皮皮虾,一个热爱分享知识的 皮皮虾爱好者,未来的日子里会不断更新出对大家有益的博文,期待大家的关注!!!

创作不易,如果这篇博文对各位有帮助,希望各位小伙伴可以==一键三连哦!==,感谢支持,我们下次再见~

分享大纲

大厂面试题专栏


在这里插入图片描述

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

JDK18新特性

发表于 2021-07-21

一. 接口

以往的接口中只能有抽象方法,不能有其他方法,子类必须要实现接口中的所有方法。

现在在接口中可以定义静态方法并实现,可以直接通过接口.方法名来进行调用。

在接口中可以定义默认方法,并且自己实现,子类可以不实现这些默认方法,默认方法使用default关键字进行定义。

image.png

二.函数式接口

接口中只有一个抽象方法的接口叫做函数式接口,JDK中提供了四种基础的函数式接口

  1. Consumer 消费型接口【只接收,不返回】 void accept(T t)
  2. Supplier 供给型接口【只返回,不接收】 T get()
  3. Function 函数型接口 【有输入,有输出,标准函数类型】 R apply(T t)
  4. Predicate 断言型接口【对传入的值进行判断】 boolean test(T t)

三.Lambda表达式

Lambda表达式用于简化匿名内部类的书写,尤其是对于函数式接口来说,更是大大的减少了我们需要写的代码,并且可读性也更好。

基本格式:

1
2
3
4
5
rust复制代码(参数列表)->{return 实际方法体}
如果方法只有一个参数a,且方法体只有一行,那么可以继续简写:a->方法体

为什么省略掉了修饰符和返回值?因为Lambda表达式主要用于函数式接口,其中只有一个唯一的抽象方法,
修饰符和返回类型都是唯一确定的。

举例:

1
2
3
4
5
6
7
8
9
typescript复制代码        //传统的匿名内部类
Consumer<String> consumer = new Consumer<String>() {
@Override
public void accept(String s) {
System.out.println("接收了:"+s);
}
};
//使用Lambda表达式
Consumer<String> consumer1 = s-> System.out.println(s);

一个特殊的表达式 ::

1
2
3
4
5
6
7
8
9
10
11
ini复制代码     //1. 对象::实例方法    方法体中的方法和函数式接口中的方法返回值和参数个数都相同,可以进行简化
Consumer<String> consumer = (s)-> System.out.println(s);
Consumer<String> consumer1 = System.out::println;

//2. ::静态方法 函数式接口的参数和返回类型和方法体中调用方法的参数和返回体类型相同。
Comparator<Integer> comparator = (o1,o2)->Integer.compare(o1,o2);
Comparator<Integer> comparator1 = Integer::compareTo;

//3. :: 实例方法
Function<People,String> function = e-> e.getName();
Function<People,String> function1 = People::getName;

四.stream()流

stream流是jdk1.8出现的用来对集合进行操作的一种形式

4.1 获取流的方式

  1. 所有的Collection集合都可以通过stream()的方式获取流。
  2. Stream接口的静态方法可以获取相应的流。
1
2
3
4
5
6
7
8
scss复制代码       System.out.println("------------------------迭代流----------------------------");
Stream<Integer> stream2 = Stream.iterate(0, x -> x + 2);
stream2.limit(1).forEach(System.out::println);
System.out.println("------------------------生成流----------------------------");

//生成流
Stream<Integer> stream3 = Stream.generate(() -> new Random().nextInt(100));
stream3.limit(1).forEach(System.out::println);

4.2 流的操作

Stream流接口中定义了许多对于集合的操作方法,总的来说分为两大类:中间操作和结束操作。

  • 中间操作会返回另外一个流,通过这种方式将多个中间操作连接起来,形成一个调用链,从而转换为另外一个流,除非调用链后存在一个结束操作,否则中间操作流程不会对结果进行任何处理。
  • 结束操作会返回一个具体的结果。如boolean,integer,list等

4.2.1 中间操作【链式编程】

  • filter 过滤、limit 限制、skip 跳过、distinct 去重、sorted 排序
  • map 数据映射、paraller 获取并行流
1
2
3
4
5
6
7
8
9
10
11
12
13
scss复制代码List<String> list = new ArrayList<>();
list.add("lvsdfghsd");
list.add("lvsfsdgh");
list.add("lvsfsdgh");
list.add("556dfs");
list.add("lvsd9");

list.stream()
.sorted((o1,o2)->o1.length()-o2.length())
.distinct()
.skip(1)
.filter(e->e.startsWith("lv"))
.forEach(System.out::println);

4.2.2 结束操作

  • forEach 遍历、min 求最小、max 求最大、count 技术
  • reduce collect anyMatch 判断流中至少存在一个元素 allMatch

结语

创作不易,如果对您有帮助的话请点个赞哦~

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

领导:谁再用定时任务实现关闭订单,立马滚蛋!

发表于 2021-07-21

在电商、支付等领域,往往会有这样的场景,用户下单后放弃支付了,那这笔订单会在指定的时间段后进行关闭操作,细心的你一定发现了像某宝、某东都有这样的逻辑,而且时间很准确,误差在1s内;那他们是怎么实现的呢?

一般的做法有如下几种

  • 定时任务关闭订单
  • rocketmq延迟队列
  • rabbitmq死信队列
  • 时间轮算法
  • redis过期监听

一、定时任务关闭订单(最low)

一般情况下,最不推荐的方式就是关单方式就是定时任务方式,原因我们可以看下面的图来说明

image.png

我们假设,关单时间为下单后10分钟,定时任务间隔也是10分钟;通过上图我们看出,如果在第1分钟下单,在第20分钟的时候才能被扫描到执行关单操作,这样误差达到10分钟,这在很多场景下是不可接受的,另外需要频繁扫描主订单号造成网络IO和磁盘IO的消耗,对实时交易造成一定的冲击,所以PASS

二、rocketmq延迟队列方式

延迟消息
生产者把消息发送到消息服务器后,并不希望被立即消费,而是等待指定时间后才可以被消费者消费,这类消息通常被称为延迟消息。
在RocketMQ开源版本中,支持延迟消息,但是不支持任意时间精度的延迟消息,只支持特定级别的延迟消息。
消息延迟级别分别为1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,共18个级别。

发送延迟消息(生产者)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
java复制代码/**
* 推送延迟消息
* @param topic
* @param body
* @param producerGroup
* @return boolean
*/
public boolean sendMessage(String topic, String body, String producerGroup)
{
try
{
Message recordMsg = new Message(topic, body.getBytes());
producer.setProducerGroup(producerGroup);

//设置消息延迟级别,我这里设置14,对应就是延时10分钟
// "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
recordMsg.setDelayTimeLevel(14);
// 发送消息到一个Broker
SendResult sendResult = producer.send(recordMsg);
// 通过sendResult返回消息是否成功送达
log.info("发送延迟消息结果:======sendResult:{}", sendResult);
DateFormat format =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
log.info("发送时间:{}", format.format(new Date()));

return true;
}
catch (Exception e)
{
e.printStackTrace();
log.error("延迟消息队列推送消息异常:{},推送内容:{}", e.getMessage(), body);
}
return false;
}

消费延迟消息(消费者)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
java复制代码/**
* 接收延迟消息
*
* @param topic
* @param consumerGroup
* @param messageHandler
*/
public void messageListener(String topic, String consumerGroup, MessageListenerConcurrently messageHandler)
{
ThreadPoolUtil.execute(() ->
{
try
{
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
consumer.setConsumerGroup(consumerGroup);
consumer.setVipChannelEnabled(false);
consumer.setNamesrvAddr(address);
//设置消费者拉取消息的策略,*表示消费该topic下的所有消息,也可以指定tag进行消息过滤
consumer.subscribe(topic, "*");
//消费者端启动消息监听,一旦生产者发送消息被监听到,就打印消息,和rabbitmq中的handlerDelivery类似
consumer.registerMessageListener(messageHandler);
consumer.start();
log.info("启动延迟消息队列监听成功:" + topic);
}
catch (MQClientException e)
{
log.error("启动延迟消息队列监听失败:{}", e.getErrorMessage());
System.exit(1);
}
});
}

实现监听类,处理具体逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码/**
* 延迟消息监听
*
*/
@Component
public class CourseOrderTimeoutListener implements ApplicationListener<ApplicationReadyEvent>
{

@Resource
private MQUtil mqUtil;

@Resource
private CourseOrderTimeoutHandler courseOrderTimeoutHandler;

@Override
public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent)
{
// 订单超时监听
mqUtil.messageListener(EnumTopic.ORDER_TIMEOUT, EnumGroup.ORDER_TIMEOUT_GROUP, courseOrderTimeoutHandler);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
java复制代码/**
* 实现监听
*/
@Slf4j
@Component
public class CourseOrderTimeoutHandler implements MessageListenerConcurrently
{

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt msg : list)
{
// 得到消息体
String body = new String(msg.getBody());
JSONObject userJson = JSONObject.parseObject(body);
TCourseBuy courseBuyDetails = JSON.toJavaObject(userJson, TCourseBuy.class);

// 处理具体的业务逻辑,,,,,

DateFormat format =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
log.info("消费时间:{}", format.format(new Date()));

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}

这种方式相比定时任务好了很多,但是有一个致命的缺点,就是延迟等级只有18种(商业版本支持自定义时间),如果我们想把关闭订单时间设置在15分钟该如何处理呢?显然不够灵活。

三、rabbitmq死信队列的方式

Rabbitmq本身是没有延迟队列的,只能通过Rabbitmq本身队列的特性来实现,想要Rabbitmq实现延迟队列,需要使用Rabbitmq的死信交换机(Exchange)和消息的存活时间TTL(Time To Live)

死信交换机
一个消息在满足如下条件下,会进死信交换机,记住这里是交换机而不是队列,一个交换机可以对应很多队列。

一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。
上面的消息的TTL到了,消息过期了。

队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。
死信交换机就是普通的交换机,只是因为我们把过期的消息扔进去,所以叫死信交换机,并不是说死信交换机是某种特定的交换机

消息TTL(消息存活时间)
消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。如果队列设置了,消息也设置了,那么会取值较小的。所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。

1
2
3
4
java复制代码byte[] messageBodyBytes = "Hello, world!".getBytes();  
AMQP.BasicProperties properties = new AMQP.BasicProperties();
properties.setExpiration("60000");
channel.basicPublish("my-exchange", "queue-key", properties, messageBodyBytes);

可以通过设置消息的expiration字段或者x-message-ttl属性来设置时间,两者是一样的效果。只是expiration字段是字符串参数,所以要写个int类型的字符串:当上面的消息扔到队列中后,过了60秒,如果没有被消费,它就死了。不会被消费者消费到。这个消息后面的,没有“死掉”的消息对顶上来,被消费者消费。死信在队列中并不会被删除和释放,它会被统计到队列的消息数中去

处理流程图

image.png

创建交换机(Exchanges)和队列(Queues)

创建死信交换机

image.png

如图所示,就是创建一个普通的交换机,这里为了方便区分,把交换机的名字取为:delay

创建自动过期消息队列
这个队列的主要作用是让消息定时过期的,比如我们需要2小时候关闭订单,我们就需要把消息放进这个队列里面,把消息过期时间设置为2小时

image.png

创建一个一个名为delay_queue1的自动过期的队列,当然图片上面的参数并不会让消息自动过期,因为我们并没有设置x-message-ttl参数,如果整个队列的消息有消息都是相同的,可以设置,这里为了灵活,所以并没有设置,另外两个参数x-dead-letter-exchange代表消息过期后,消息要进入的交换机,这里配置的是delay,也就是死信交换机,x-dead-letter-routing-key是配置消息过期后,进入死信交换机的routing-key,跟发送消息的routing-key一个道理,根据这个key将消息放入不同的队列

创建消息处理队列
这个队列才是真正处理消息的队列,所有进入这个队列的消息都会被处理

image.png

消息队列的名字为delay_queue2
消息队列绑定到交换机
进入交换机详情页面,将创建的2个队列(delayqueue1和delayqueue2)绑定到交换机上面

image.png
自动过期消息队列的routing key 设置为delay
绑定delayqueue2

image.png

delayqueue2 的key要设置为创建自动过期的队列的x-dead-letter-routing-key参数,这样当消息过期的时候就可以自动把消息放入delay_queue2这个队列中了
绑定后的管理页面如下图:

image.png

当然这个绑定也可以使用代码来实现,只是为了直观表现,所以本文使用的管理平台来操作
发送消息

1
2
3
4
5
6
java复制代码String msg = "hello word";  
MessageProperties messageProperties = newMessageProperties();
messageProperties.setExpiration("6000");
messageProperties.setCorrelationId(UUID.randomUUID().toString().getBytes());
Message message = newMessage(msg.getBytes(), messageProperties);
rabbitTemplate.convertAndSend("delay", "delay",message);

设置了让消息6秒后过期
注意:因为要让消息自动过期,所以一定不能设置delay_queue1的监听,不能让这个队列里面的消息被接受到,否则消息一旦被消费,就不存在过期了

接收消息
接收消息配置好delay_queue2的监听就好了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
java复制代码package wang.raye.rabbitmq.demo1;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
publicclassDelayQueue{
/** 消息交换机的名字*/
publicstaticfinalString EXCHANGE = "delay";
/** 队列key1*/
publicstaticfinalString ROUTINGKEY1 = "delay";
/** 队列key2*/
publicstaticfinalString ROUTINGKEY2 = "delay_key";
/**
* 配置链接信息
* @return
*/
@Bean
publicConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = newCachingConnectionFactory("120.76.237.8",5672);
connectionFactory.setUsername("kberp");
connectionFactory.setPassword("kberp");
connectionFactory.setVirtualHost("/");
connectionFactory.setPublisherConfirms(true); // 必须要设置
return connectionFactory;
}
/**
* 配置消息交换机
* 针对消费者配置
FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念
HeadersExchange :通过添加属性key-value匹配
DirectExchange:按照routingkey分发到指定队列
TopicExchange:多关键字匹配
*/
@Bean
publicDirectExchange defaultExchange() {
returnnewDirectExchange(EXCHANGE, true, false);
}
/**
* 配置消息队列2
* 针对消费者配置
* @return
*/
@Bean
publicQueue queue() {
returnnewQueue("delay_queue2", true); //队列持久
}
/**
* 将消息队列2与交换机绑定
* 针对消费者配置
* @return
*/
@Bean
@Autowired
publicBinding binding() {
returnBindingBuilder.bind(queue()).to(defaultExchange()).with(DelayQueue.ROUTINGKEY2);
}
/**
* 接受消息的监听,这个监听会接受消息队列1的消息
* 针对消费者配置
* @return
*/
@Bean
@Autowired
publicSimpleMessageListenerContainer messageContainer2(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = newSimpleMessageListenerContainer(connectionFactory());
container.setQueues(queue());
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认
container.setMessageListener(newChannelAwareMessageListener() {
publicvoid onMessage(Message message, com.rabbitmq.client.Channel channel) throwsException{
byte[] body = message.getBody();
System.out.println("delay_queue2 收到消息 : "+ newString(body));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费
}
});
return container;
}
}

这种方式可以自定义进入死信队列的时间;是不是很完美,但是有的小伙伴的情况是消息中间件就是rocketmq,公司也不可能会用商业版,怎么办?那就进入下一节

四、时间轮算法

image.png

(1)创建环形队列,例如可以创建一个包含3600个slot的环形队列(本质是个数组)

(2)任务集合,环上每一个slot是一个Set
同时,启动一个timer,这个timer每隔1s,在上述环形队列中移动一格,有一个Current Index指针来标识正在检测的slot。

Task结构中有两个很重要的属性:
(1)Cycle-Num:当Current Index第几圈扫描到这个Slot时,执行任务
(2)订单号,要关闭的订单号(也可以是其他信息,比如:是一个基于某个订单号的任务)

假设当前Current Index指向第0格,例如在3610秒之后,有一个订单需要关闭,只需:
(1)计算这个订单应该放在哪一个slot,当我们计算的时候现在指向1,3610秒之后,应该是第10格,所以这个Task应该放在第10个slot的Set中
(2)计算这个Task的Cycle-Num,由于环形队列是3600格(每秒移动一格,正好1小时),这个任务是3610秒后执行,所以应该绕3610/3600=1圈之后再执行,于是Cycle-Num=1

Current Index不停的移动,每秒移动到一个新slot,这个slot中对应的Set,每个Task看Cycle-Num是不是0:
(1)如果不是0,说明还需要多移动几圈,将Cycle-Num减1
(2)如果是0,说明马上要执行这个关单Task了,取出订单号执行关单(可以用单独的线程来执行Task),并把这个订单信息从Set中删除即可。
(1)无需再轮询全部订单,效率高
(2)一个订单,任务只执行一次
(3)时效性好,精确到秒(控制timer移动频率可以控制精度)

五、redis过期监听

1.修改redis.windows.conf配置文件中notify-keyspace-events的值
默认配置notify-keyspace-events的值为 “”
修改为 notify-keyspace-events Ex 这样便开启了过期事件

2. 创建配置类RedisListenerConfig(配置RedisMessageListenerContainer这个Bean)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
java复制代码package com.zjt.shop.config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;


@Configuration
public class RedisListenerConfig {

@Autowired
private RedisTemplate redisTemplate;

/**
* @return
*/
@Bean
public RedisTemplate redisTemplateInit() {

// key序列化
redisTemplate.setKeySerializer(new StringRedisSerializer());

//val实例化
redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());

return redisTemplate;
}

@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
return container;
}

}

3.继承KeyExpirationEventMessageListener创建redis过期事件的监听类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
java复制代码package com.zjt.shop.common.util;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.zjt.shop.modules.order.service.OrderInfoService;
import com.zjt.shop.modules.product.entity.OrderInfoEntity;
import com.zjt.shop.modules.product.mapper.OrderInfoMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;


@Slf4j
@Component
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {

public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}

@Autowired
private OrderInfoMapper orderInfoMapper;

/**
* 针对redis数据失效事件,进行数据处理
* @param message
* @param pattern
*/
@Override
public void onMessage(Message message, byte[] pattern) {
try {
String key = message.toString();
//从失效key中筛选代表订单失效的key
if (key != null && key.startsWith("order_")) {
//截取订单号,查询订单,如果是未支付状态则为-取消订单
String orderNo = key.substring(6);
QueryWrapper<OrderInfoEntity> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("order_no",orderNo);
OrderInfoEntity orderInfo = orderInfoMapper.selectOne(queryWrapper);
if (orderInfo != null) {
if (orderInfo.getOrderState() == 0) { //待支付
orderInfo.setOrderState(4); //已取消
orderInfoMapper.updateById(orderInfo);
log.info("订单号为【" + orderNo + "】超时未支付-自动修改为已取消状态");
}
}
}
} catch (Exception e) {
e.printStackTrace();
log.error("【修改支付订单过期状态异常】:" + e.getMessage());
}
}
}

4:测试
通过redis客户端存一个有效时间为3s的订单:

image.png

结果:

image.png

总结:
以上方法只是个人对于关单的一些想法,可能有些地方有疏漏,请在公众号直接留言进行指出,当然如果你有更好的关单方式也可以随时沟通交流

更多精彩内容,请关注我的公众号「程序员阿牛」

我的个人博客站点:www.kuya123.com

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

一份给算法新人们的「动态规划」讲解

发表于 2021-07-21

「本文已参与好文召集令活动,点击查看:后端、大前端双赛道投稿,2万元奖池等你挑战!」

前言

公众号目前与「动态规划」相关系列包括:已经完结的「动态规划-路径问题」和正在更新「动态规划-背包问题」。

这都是默认大家有一定的「动态规划」认识的系列文章。

但事实上,可能有不少同学是刚接触算法,还处于只会使用「朴素/暴力解法」来解决算法问题的阶段,对于「动态规划」并不了解。

因此我特意翻出来大概是我六七年写的文章(当时更多作为学习笔记),来帮助大家对「动态规划」有个基本认识。

需要说明的是,本文只停留在对「动态规划」的感性认识,并没有深入到动态规划与图论的本质关系。

因此更多的面向是算法入门读者。

以下是正文内容。

演变过程

一直搞不懂「动态规划」和「记忆化搜索」之间的区别。

总觉得动态规划只是单纯的难在于对“状态”的抽象定义和“状态转移方程”的推导,并无具体的规律可循。

但从算法逐步优化角度而言,动态规划更多是从如下方式进行演化:

暴力递归 -> 记忆化搜索 -> 动态规划。

甚至可以说几乎所有的「动态规划」都可以通过「暴力递归」转换而来,前提是该问题是一个“无后效性”问题。

从对“个例”的朴素枚举做法,演变为对“集合”的枚举做法。

无后效性

所谓的“无后效性”是指:当某阶段的状态一旦确定,此后的决策过程和最终结果将不受此前的各种状态所影响。

可简单理解为当编写好一个递归函数之后,当可变参数确定之后,结果是唯一确定的。

可能你还是对什么是“无后效性”问题感到难以理解。没关系,我们再举一个更具象的例子,这是 LeetCode 62. Unique Paths :给定一个 m∗nm * nm∗n 的矩阵,从左上角作为起点,到达右下角共有多少条路径(机器人只能往右或者往下进行移动)。

这是一道经典的「动态规划」入门题目,也是一个经典的“无后效性”问题。

它的“无后效性”体现在:当给定了某个状态(一个具体的 m∗nm * nm∗n 的矩阵和某个起点,如 (1,2)),那么从这个点到达右下角的路径数量就是完全确定的。

而与如何到达这个“状态”无关,与机器人是经过点 (0,2) 到达的 (1,2),还是经过 (1,1) 到达的 (1,2) 无关。

这就是所谓的“无后效性”问题。

当我们尝试使用「动态规划」解决问题的时候,首先要关注该问题是否为一个“无后效性”问题。

1:暴力递归

经常我们面对一个问题,即使我们明确知道了它是一个“无后效性”问题,它可以通过「动态规划」来解决。我们还是觉得难以入手。

这时候我的建议是,先写一个「暴力递归」的版本。

还是以刚刚说到的 LeetCode 62. Unique Paths 举例:

1
2
3
4
5
6
7
8
9
10
java复制代码class Solution {
public int uniquePaths(int m, int n) {
return recursive(m, n, 0, 0);
}

private int recursive(int m, int n, int i, int j) {
if (i == m - 1 || j == n - 1) return 1;
return recursive(m, n, i + 1, j) + recursive(m, n, i, j + 1);
}
}

当我还不知道如何使用「动态规划」求解时,我会设计一个递归函数 recursiverecursiverecursive 。

函数传入矩阵信息和机器人当前所在的位置,返回在这个矩阵里,从机器人所在的位置出发,到达右下角有多少条路径。

有了这个递归函数之后,那问题其实就是求解 recursive(m,n,0,0)recursive(m, n, 0, 0)recursive(m,n,0,0):求解从 (0,0) 到右下角的路径数量。

接下来,实现这个函数:

  1. Base case: 由于题目明确了机器人只能往下或者往右两个方向走,所以可以定下来递归方法的 base case 是当已经处于矩阵的最后一行或者最后一列,即只一条路可以走。
  2. 其余情况:机器人既可以往右走也可以往下走,所以对于某一个位置来说,到达右下角的路径数量等于它右边位置到达右下角的路径数量 + 它下方位置到达右下角的路径数量。即 recursive(m,n,i+1,j) + recursive(m,n,i,j+1),这两个位置都可以通过递归函数进行求解。

其实到这里,我们已经求解了这个问题了。

但这种做法还有个严重的“性能”问题。

2:记忆化搜索

如果将我们上述的代码提交到 LeetCode,会得到 timeout 的结果。

可见「暴力递归」的解决方案“很慢”。

我们知道所有递归函数的本质都是“压栈”和“弹栈”。

既然这个过程很慢,我们可以通过将递归版本暴力解法的改为非递归的暴力解法,来解决 timeout 的问题吗?

答案是不行,因为导致 timeout 的原因不在于使用“递归”手段所带来的成本。

而在于在计算过程,我们进行了多次的重复计算。

我们尝试展开递归过程第几步来看看:

不难发现,在递归展开过程会遇到很多的重复计算。

随着我们整个递归过程的展开,重复计算的次数会呈倍数增长。

这才是「暴力递归」解决方案“慢”的原因。

既然是重复计算导致的 timeout,我们自然会想到将计算结果进行“缓存”的方案:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码class Solution {
private int[][] cache;

public int uniquePaths(int m, int n) {
cache = new int[m][n];
for (int i = 0; i < m; i++) {
int[] ints = new int[n];
Arrays.fill(ints, -1);
cache[i] = ints;
}
return recursive(m, n, 0, 0);
}

private int recursive(int m, int n, int i, int j) {
if (i == m - 1 || j == n - 1) return 1;
if (cache[i][j] == -1) {
if (cache[i + 1][j] == -1) {
cache[i + 1][j] = recursive(m, n, i + 1, j);
}
if (cache[i][j + 1] == -1) {
cache[i][j + 1] = recursive(m, n, i, j + 1);
}
cache[i][j] = cache[i + 1][j] + cache[i][j + 1];
}
return cache[i][j];
}
}

对「暴力递归」过程中的中间结果进行缓存,确保相同的情况只会被计算一次的做法,称为「记忆化搜索」。

做了这样的改进之后,提交 LeetCode 已经能 AC 并得到一个不错的评级了。

我们再细想一下就会发现,其实整个求解过程,对于每个情况(每个点)的访问次数并没有发生改变。

只是从「以前的每次访问都进行求解」改进为「只有第一次访问才真正求解」。

事实上,我们通过查看 recursive() 方法就可以发现:

当我们求解某一个点 (i,j)(i, j)(i,j) 的答案时,其实是依赖于 (i,j+1)(i, j + 1)(i,j+1) 和 (i+1,j)(i + 1, j)(i+1,j)。

也就是每求解一个点的答案,都需要访问两个点的结果。

这种情况是由于我们采用的是“自顶向下”的解决思路所导致的。

我们无法直观确定哪个点的结果会在什么时候被访问,被访问多少次。

所以我们不得不使用一个与矩阵相同大小的数组,将所有中间结果“缓存”起来。

换句话说,「记忆化搜索」解决的是重复计算的问题,并没有解决结果访问时机和访问次数的不确定问题。

2.1:次优解版本的「记忆化搜索」

关于「记忆化搜索」最后再说一下。

网上有不少博客和资料在编写「记忆化搜索」解决方案时,会编写类似如下的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码class Solution {
private int[][] cache;

public int uniquePaths(int m, int n) {
cache = new int[m][n];
for (int i = 0; i < m; i++) {
int[] ints = new int[n];
Arrays.fill(ints, -1);
cache[i] = ints;
}
return recursive(m, n, 0, 0);
}

private int recursive(int m, int n, int i, int j) {
if (i == m - 1 || j == n - 1) return 1;
if (cache[i][j] == -1) {
cache[i][j] = recursive(m, n, i + 1, j) + recursive(m, n, i, j + 1);
}
return cache[i][j];
}
}

可以和我上面提供的解决方案作对比。主要区别在于 if (cache[i][j] == -1) 的判断里面。

在我提供解决方案中,会在计算 cache[i][j] 时,尝试从“缓存”中读取 cache[i+1][j] 和 cache[i][j+1],确保每次调用 recursive() 都是必须的,不重复的。

网上大多数的解决方案只会在外层读取“缓存”,在真正计算 cache[i][j] 的时候并不采取先检查再调用的方式,直接调用 recursive() 计算子问题 。

虽然两者相比与直接的「暴力递归」都大大减少了计算次数(recursive() 的访问次数),但后者的计算次数显然要比前者高上不少。

你可能会觉得反正都是“自顶向下”,两者应该没有区别吧?

为此我提供了以下实验代码来比较它们对 recursive() 的调用次数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
java复制代码class Solution {

public static void main(String[] args) {
Solution solution = new Solution();
solution.uniquePaths(15, 15);
}

private int[][] cache;
private long count; // 统计 递归函数 的调用次数

public int uniquePaths(int m, int n) {
cache = new int[m][n];
for (int i = 0; i < m; i++) {
int[] ints = new int[n];
Arrays.fill(ints, -1);
cache[i] = ints;
}
// int result = recursive(m, n, 0, 0); // count = 80233199
// int result = cacheRecursive(m, n, 0, 0); // count = 393
int result = fullCacheRecursive(m, n, 0, 0); // count = 224
System.out.println(count);
return result;
}

// 完全缓存
private int fullCacheRecursive(int m, int n, int i, int j) {
count++;
if (i == m - 1 || j == n - 1) return 1;
if (cache[i][j] == -1) {
if (cache[i + 1][j] == -1) {
cache[i + 1][j] = fullCacheRecursive(m, n, i + 1, j);
}
if (cache[i][j + 1] == -1) {
cache[i][j + 1] = fullCacheRecursive(m, n, i, j + 1);
}
cache[i][j] = cache[i + 1][j] + cache[i][j + 1];
}
return cache[i][j];
}

// 只有外层缓存
private int cacheRecursive(int m, int n, int i, int j) {
count++;
if (i == m - 1 || j == n - 1) return 1;
if (cache[i][j] == -1) {
cache[i][j] = cacheRecursive(m, n, i + 1, j) + cacheRecursive(m, n, i, j + 1);
}
return cache[i][j];
}

// 不使用缓存
private int recursive(int m, int n, int i, int j) {
count++;
if (i == m - 1 || j == n - 1) return 1;
return recursive(m, n, i + 1, j) + recursive(m, n, i, j + 1);
}
}

因为我们使用 cache 数组的目的是减少 recursive() 函数的调用。

只有确保在每次调用 recursive() 之前先去 cache 数组检查,我们才可以将对 recursive() 函数的调用次数减到最少。

在数据为 15 的样本下,这是 O(393n)O(393n)O(393n) 和 O(224n)O(224n)O(224n) 的区别,但对于一些卡常数特别严重的 OJ,尤其重要。

所以我建议你在「记忆化搜索」的解决方案时,采取与我一样的策略:

确保在每次访问递归函数时先去“缓存”检查。尽管这有点“不美观”,但它能发挥「记忆化搜索」的最大作用。

3:从「自顶向下」到「自底向上」

你可能会想,为什么我们需要改进「记忆化搜索」,为什么需要明确中间结果的访问时机和访问次数?

因为一旦我们能明确中间结果的访问时机和访问次数,将为我们的算法带来巨大的提升空间。

前面说到,因为我们无法确定中间结果的访问时机和访问次数,所以我们不得不“缓存”全部中间结果。

但如果我们能明确中间结果的访问时机和访问次数,至少我们可以大大降低算法的空间复杂度。

这就涉及解决思路的转换:从「自顶向下」到「自底向上」 。

如何实现从「自顶向下」到「自底向上」的转变,还是通过具体的例子来理解。

这是 LeetCode 509. Fibonacci Number,著名的“斐波那契数列”问题。

如果不了解什么是“斐波那契数列”,可以查看对应的 维基百科。

由于斐波那契公式为:

f(n)={1n=1,2f(n−1)+f(n−2)n>2f(n) =\begin{cases}
1 & n = 1, 2 \
f(n - 1) + f(n - 2) & n > 2
\end{cases}f(n)={1f(n−1)+f(n−2)​n=1,2n>2​
天然适合使用递归:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码public class Solution {
private int[] cache;
public int fib(int n) {
cache = new int[n + 1];
return recursive(n);
}

private int recursive(int n) {
if (n <= 1) return n;
if (n == 2) return 1;
if (cache[n] == 0) {
if (cache[n - 1] == 0) {
cache[n - 1] = recursive(n - 1);
}
if (cache[n - 2] == 0) {
cache[n - 2] = recursive(n - 2);
}
cache[n] = cache[n - 1] + cache[n - 2];
}
return cache[n];
}
}

但这仍然会有我们之前所说的问题,这些问题都是因为直接递归是“自顶向下”所导致的。

这样的解法的时空复杂度为 O(n)O(n)O(n) :每个值计算一次,使用了长度为 n+1n + 1n+1 的数组。

通过观察斐波那契公式,我们可以发现要计算某个 nnn ,只需要知道 n−1n - 1n−1 的解和 n−2n - 2n−2 的解。

同时 n=1n = 1n=1 和 n=2n = 2n=2 的解又是已知的(base case)。

所以我们大可以从 n=3n = 3n=3 出发,逐步往后迭代得出 nnn 的解。

由于计算某个值的解,只依赖该值的前一位的解和前两位的解,所以我们只需要使用几个变量缓存最近的中间结果即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码class Solution {
public int fib(int n) {
if (n <= 1) return n;
if (n == 2) return 1;

int prev1 = 1, prev2 = 1;
int cur = prev1 + prev2;
for (int i = 3; i <= n; i++) {
cur = prev1 + prev2;
prev2 = prev1;
prev1 = cur;
}
return cur;
}
}

这样我们就把原本空间复杂度为 O(n)O(n)O(n) 的算法降低为 O(1)O(1)O(1) :只是用了几个有限的变量。

但不是所有的「动态规划」都像“斐波那契数列”那么简单就能实现从“自顶向下”到“自底向上”的转变。

当然也不是毫无规律可循,尤其是我们已经写出了「暴力递归」的解决方案。

让我们再次回到 LeetCode 62. Unique Paths 当中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
java复制代码class Solution {
public int uniquePaths(int m, int n) {
// 由于我们的「暴力递归」函数,真正的可变参数就是 i 和 j( 变化范围分别是 [0,m-1] 和 [0, n-1] )
// 所以建议一个二维的 dp 数组进行结果存储(相当于建一个表格)
int[][] dp = new int[m][n];

// 根据「暴力递归」函数中的 base case
// 我们可以直接得出 dp 中最后一行和最后一列的值(将表格的最后一行和最后一列填上)
for (int i = 0; i < n; i++) dp[m - 1][i] = 1
for (int i = 0; i < m; i++) dp[i][n - 1] = 1;

// 根据「暴力递归」函数中对其他情况的处理逻辑(依赖关系)编写循环
//(根据表格的最后一行和最后一列的值,得出表格的其他格子的值)
for (int i = m - 2; i >= 0; i--) {
for (int j = n - 2; j >= 0; j--) {
dp[i][j] = dp[i + 1][j] + dp[i][j + 1];
}
}

// 最终我们要的是 dp[0][0](表格中左上角的位置,也就起点的值)
return dp[0][0];

// 原「暴力递归」调用
// return recursive(m, n, 0, 0);
}

private int recursive(int m, int n, int i, int j) {
// base case
if (i == m - 1 || j == n - 1) return 1;
// 其余情况
return recursive(m, n, i + 1, j) + recursive(m, n, i, j + 1);
}
}

不难发现,我们甚至可以直接根据「暴力递归」来写出「动态规划」,而不需要关心原问题是什么。

简单的「动态规划」其实就是一个“打表格”的过程:

先根据 base case 定下来表格中的一些位置的值,再根据已得出值的位置去推算其他格子的信息。

推算所用到的依赖关系,也就是我们「暴力递归」中的“其余情况”处理逻辑。

动态规划的本质

动态规划的本质其实仍然是枚举:枚举所有的方案,并从中找出最优解。

但和「暴力递归」不同的是,「动态规划」少了很多的重复计算。

因为所依赖的这些历史结果,都被存起来了,因此节省了大量重复计算。

从这一点来说,「动态规划」和「记忆化搜索」都是类似的。

要把历史结果存起来,必然要使用数据结构,在 dp 中我们通常使用一维数组或者二维数据来存储,假设是 dp[]。

那么对应解 dp 问题我们有以下过程

  1. 状态定义 : 确定 dp[] 中元素的含义,也就是说需要明确 dp[i] 是代表什么内容
  2. 状态转移 :确定 dp[] 元素之间的关系,dp[i] 这个格子是由哪些 dp 格子推算而来的。如斐波那契数列中就有 dp[i] = dp[i - 1] + dp[i - 2]
  3. 起始值 :base case,dp[] 中的哪些格子是可以直接得出结果的。如斐波那契数列中就有 dp[0] = 0 和 dp[1] = 1

消除“后效性”

我们知道使用「动态规划」的前提是问题的“无后效性” 。

但是有些时候问题的“无后效性” 并不容易体现。

需要我们多引入一维来进行“消除”。

例如 LeetCode 上经典的「股票问题」,使用动态规划求解时往往需要多引入一维表示状态,有时候甚至需要再引入一维代表购买次数。

注意这里说的消除是带引号的,其实这样的做法更多的是作为一种“技巧”,它并没有真正改变问题“后效性” ,只是让问题看上去变得简单的。

总结

到这里我们已经可以简单回答「动态规划」和「记忆化搜索」的区别是什么了。

「记忆化搜索」本质是带“缓存”功能的「暴力递归」:

它只能解决重复计算的问题,而不能确定中间结果的访问时机和访问次数,本质是一种“自顶向下”的解决方式;

「动态规划」是一种“自底向上”的解决方案 :

能明确访问时机和访问次数,这为降低算法的空间复杂度带来巨大空间,我们可以根据依赖关系来决定保留哪些中间结果,而无须将全部中间结果进行“缓存”。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Go-Micro微服务框架使用

发表于 2021-07-21

本文暂不介绍GRPC,只介绍通过HTTP方式进行服务间的API调用。且所有示例均采用Go Mod方式管理依赖包。

1.Go-Micro微服务框架简介

Go-Micro类似Java中的SpringCloud,虽然生态上有一些区别,但是可以类比比较。官当Github地址为 github.com/micro/go-mi…,下载安装依赖命令:

1
go复制代码go get -u github.com/micro/go-micro

2.创建Web服务

Golang本身提供了丰富的http包,并且Gin等Web框架实现一个Web服务,但是为了贴合Go-Micro框架的统一规范,所以需要通过使用Go-Micro的github.com/micro/go-micro/web包来创建一个Web服务。

第一种方式:

直接通过web包创建一个服务,参数可选,也比较简单易懂,请自行Ctrl+左键查看源码。创建服务后可以直接像原生的http包一样使用。

1
2
3
4
5
6
7
8
9
10
go复制代码func main() {
service := web.NewService(
web.Name("cas"),
web.Address(":8001"),
)
service.HandleFunc("/hello", func(writer http.ResponseWriter, request *http.Request) {
// handler
})
service.Run()
}

第二种方式:
原生的http包在很多情况下处理并不是非常高效,只是提供了一些基础功能,所以Go-Micro提供集成第三方Web框架如:Gin。首先下载安装:

1
go复制代码go get -u "github.com/gin-gonic/gin"

Go-Micro整合Gin示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
go复制代码func main() {
engine := gin.Default()
engine.GET("/hello", func(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{
"msg": "hello,world",
})
})
service := web.NewService(
web.Name("cas"),
web.Address(":8001"),
web.Handler(engine),
)
service.Run()
}

除了上述的方式直接指定Web服务的相关参数,我们也可以通过命令行参数启动Web服务,这样我们可以在命令行通过指定不同的服务名和端口,快速的多开同一个服务。虽然实际开发部署环境很少用这种方式,但在我们自己调试服务注册与发现时,会非常有用。
 
示例如下,只需要调用Init()方法即可,可自行查看源码,源码中可查看有关的命令函参数名:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
> go复制代码func main() {
> engine := gin.Default()
> engine.GET("/hello", func(c *gin.Context) {
> c.JSON(http.StatusOK, gin.H{
> "msg": "hello,world",
> })
> })
> service := web.NewService(
> //web.Name("cas"),
> //web.Address(":80"),
> web.Handler(engine),
> )
> service.Init()
> service.Run()
> }
>
>

启动命令:

1
2
3
> go复制代码go run main.go -server_name cas -server_address :8001
>
>

3.服务注册

3.1 Consul

以Consul作为注册中心,官方下载地址:www.consul.io/downloads.h…。

Windows环境安装:

  1. 直接下载后将consul.exe解压出来;
  2. 将其所解压到的目录文件名添加到path环境变量;
  3. 在 cmd 命令窗口中执行:consul agent -dev命令即可启动Consul服务
  4. Consul 自带 UI 管理界面,访问地址:http://localhost:8500 ,可以看到当前注册的服务:
    在这里插入图片描述

Linux环境安装:

  1. 直接通过官网Linux版本Download处对应的地址下载:wget https://releases.hashicorp.com/consul/1.9.0/consul_1.9.0_linux_amd64.zip;
  2. 解压:unzip consul_1.9.0_linux_amd64.zip;
  3. 修改权限:chmod 777 consul;
  4. 整理一下,将consul文件夹复制到/usr/local/bin/目录下:cp consul /usr/local/bin/;
  5. 输入consul命令查看安装是否成功:consul version;
  6. 启动consul,并通过-client配置外网主机可访问:consul agent -dev -client=0.0.0.0;

当然其实最方便的就是直接通过docker拉个镜像,直接映射端口跑起来就可以了~

服务注册到Consul:

在安装好Consul并启动好服务后,下载Go-Micro提供的插件包go-plugins:

1
go复制代码go get -u github.com/micro/go-plugins

该包包含了Consul和其他的众多插件(如:eureka等),基本的使用方法是使用consul.NewRegistry()接口创建注册中心对象,然后将其集成到Go-Micro提供的Web服务配置中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
go复制代码package main

import (
"github.com/gin-gonic/gin"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/web"
"github.com/micro/go-plugins/registry/consul"
"net/http"
)

func main() {
consulReg := consul.NewRegistry(registry.Addrs(":8500"))
engine := gin.Default()
engine.GET("/hello", func(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{
"msg": "hello,world",
})
})
service := web.NewService(
web.Name("cas"),
web.Address(":8001"),
web.Registry(consulReg),
web.Handler(engine),
)
service.Init()
service.Run()
}

可以在Consul管理UI看到服务已经注册成功~
在这里插入图片描述

3.2 etcd

4 服务发现

从注册中心拿服务的方法也比较简单,简单几步即可拿到服务信息:

  1. 同服务注册一样,首先创建Consul注册中心对象:consul.NewRegistry();
  2. 通过Consul对象调用GetService("cas")方法拿到服务节点切片,参数为注册的服务名,所以在上文注册服务时指定的服务名很关键;
  3. 通过Go-Micro提供的selector包拿到具体的服务节点,该包提供2种常见的负载均衡算法,RoundRobin(轮询)和Random(随机)。对应方法返回的是一个Next方法,Next方法调用后才返回具体的服务节点,可以自行查看源码;
  4. 拿到具体的服务节点结构体对象后,则可以通过服务的IP等信息,发起HTTP的API调用。

示例代码和运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
go复制代码package main

import (
"github.com/micro/go-micro/client/selector"
"github.com/micro/go-micro/registry"
"github.com/micro/go-plugins/registry/consul"
"log"
)

func main() {
consulReg := consul.NewRegistry(registry.Addrs(":8500"))
service, err := consulReg.GetService("cas")
if err != nil {
log.Fatal("get service from consul err :",err)
}
node, err := selector.RoundRobin(service)()
if err != nil {
log.Fatal("get service node err :",err)
}
log.Printf("service node : %+v", node)
}
1
2
go复制代码//运行打印的log内容
2020-12-09 10:50:46.501937 I | service node : &{Id:ec20b3c9-7531-4921-89bc-73ee7e233525 Address:192.168.152.1:8002 Metadata:map[]}

如果启动时发生依赖报错,请参考我的另一篇博文对该问题的解决:Go-Micro启动依赖报错解决方法 。如要查看轮询或随机效果,可以自己写一个for循环间隔获取节点,查看节点信息的变化情况~

5.服务调用

5.1 HTTP基本调用方式

最基础的就是通过Golang官方提供的http包发起HTTP请求。直接在上文服务发现示例代码的基础上,在拿到服务节点信息后,发情API请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
go复制代码package main

import (
"fmt"
"github.com/micro/go-micro/client/selector"
"github.com/micro/go-micro/registry"
"github.com/micro/go-plugins/registry/consul"
"io/ioutil"
"log"
"net/http"
"strings"
)

func main() {
consulReg := consul.NewRegistry(registry.Addrs(":8500"))
// 服务发现
service, err := consulReg.GetService("cas")
if err != nil {
log.Fatalf("get service from consul err : %+v",err)
}
node, err := selector.RoundRobin(service)()
if err != nil {
log.Fatalf("get service node err : %+v",err)
}
log.Printf("service node : %+v", node)
// 服务调用
url := fmt.Sprintf("http://%s/hello", node.Address)
reqBody := strings.NewReader("")
request, err := http.NewRequest(http.MethodGet, url, reqBody)
if err != nil {
log.Fatalf("create request err : %+v",err)
}
response, err := http.DefaultClient.Do(request)
if err != nil {
log.Fatalf("send request err : %+v",err)
}
defer response.Body.Close()
rspBody, err := ioutil.ReadAll(response.Body)
if err != nil {
log.Fatalf("read response body err : %+v",err)
}
log.Printf("rsp body : %+v", string(rspBody))
}

//运行结果:
//2020-12-09 11:45:16.422562 I | service node : &{Id:bda22dae-9bc2-46db-b325-bcc9ab2b6778 Address:192.168.152.1:8001 Metadata:map[]}
//2020-12-09 11:45:16.436525 I | rsp body : {"msg":"hello,world"}

5.1 Go-Micro中的HTTP调用方式(推荐)

这种方式也是使用http包,但和上述Golang官方提供的不同,这种方式使用的是Go-Micro的插件包go-plugins,具体为:"github.com/micro/go-plugins/client/http"包,go-plugins包除了有http client的基本功能,还支持Selector参数,自动选取服务,并支持json、protobuf等数据格式。

上述的基础调用方式可以明显发现调用过程其实还是比较繁琐的,而这种方法流程上简化了很多,插件对一些步骤进行了封装和自动化处理,步骤主要为将注册中心放到Selector选择器中,然后基于选择器创建httpClient,通过该客户端来发起请求,调用服务。需要注意的是,插件默认认为服务调用都是通过POST请求的方式,所以指定的接口一定要为POST:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
go复制代码package main

import (
"context"
"github.com/micro/go-micro/client"
"github.com/micro/go-micro/client/selector"
"github.com/micro/go-micro/registry"
"github.com/micro/go-plugins/client/http"
"github.com/micro/go-plugins/registry/consul"
"log"
)

func main() {
consulReg := consul.NewRegistry(registry.Addrs(":8500"))
selector := selector.NewSelector(
selector.Registry(consulReg),
selector.SetStrategy(selector.RoundRobin),
)
httpClient := http.NewClient(
// 选择器
client.Selector(selector),
// 响应格式默认格式protobuf,设置为json
client.ContentType("application/json"),
)
req := map[string]string{}
request := httpClient.NewRequest("cas", "/hello", req)
rsp := map[string]interface{}{}
err := httpClient.Call(context.Background(), request, &rsp)
if err != nil {
log.Fatalf("request err: %+v", err)
}
log.Printf("%+v",rsp)
}

注意:如果出现运行报错:{"id":"go.micro.client","code":500,"detail":"none available","status":"Internal Server Error"} ,可以查看我的另一篇博文:Go-Micro客户端请求报500错误的解决方法,对该问题有详细的分析和解决。

其他需要注意的是服务调用这边只支持json形式传参,所以如Gin中请使用Bind()的方式获取请求参数:

1
2
3
4
5
6
7
8
9
10
11
go复制代码engine := gin.Default()
engine.POST("/hello", func(c *gin.Context) {
req := struct {
Name string `json:"name"`
}{}
c.BindJSON(&req)
c.JSON(http.StatusOK, gin.H{
"msg": "hello,world",
"name": req.Name,
})
})

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Nginx实战-配置SSL证书(CentOS环境),实现ht

发表于 2021-07-21

一、证书申请

  1. 申请SSL证书,申请之后会有两个文件提供下载(注意下载nginx版本),阿里云有免费的SSL证书申请
    • xxx.key
    • xxx.pem
  2. nginx安装版本使用的是1.16.1

二、配置SSL

2.1 证书上传

  1. 在nginx的安装目录下创建cert(别的名字也可以)
  2. 将下载的SSL证书文件上传到cert下

image.png

2.2 Server配置

  1. 进入到nginx下的conf文件夹下打开nginx.conf文件
  2. 取消https server的注释
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
nginx复制代码# HTTPS server
server {
listen 443 ssl;
server_name localhost;

ssl_certificate cert.pem;
ssl_certificate_key cert.key;

ssl_session_cache shared:SSL:1m;
ssl_session_timeout 5m;

ssl_ciphers HIGH:!aNULL:!MD5;
ssl_prefer_server_ciphers on;

location / {
root html;
index index.html index.htm;
}
}
  1. 需要配置一下说明的内容
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
nginx复制代码# HTTPS server
server {
# 注意这里就是443 ssl, 不要把ssl删除了
listen 443 ssl;
# 把localhost替换为SSL绑定的域名, 如www.codecoord.com
# server_name localhost;
server_name www.codecoord.com;

# 添加默认主目录和首页, 根据自己的路径修改
root /opt/nginx/html;
index index.html;

# cert.pem和cert.key替换为上传文件的路径(最好使用完整路径)
# ssl_certificate cert.pem;
# ssl_certificate_key cert.key;
ssl_certificate /opt/nginx/cert/cert.pem;
ssl_certificate_key /opt/nginx/cert/cert.key;

# 下面的不用动
ssl_session_cache shared:SSL:1m;
ssl_session_timeout 5m;

ssl_ciphers HIGH:!aNULL:!MD5;
ssl_prefer_server_ciphers on;

location / {
root html;
index index.html index.htm;
}
}
  1. 注意443端口需要在开启外网访问(比如阿里云服务器需要在控制台配置安全组, 不过默认是打开的)

2.2.3 配置转发

  1. 这一步是配置对外访问端口和将http请求强制转为https
  2. 删除多余配置,只需要留下以下配置
1
2
3
4
5
6
7
8
9
nginx复制代码server {
# 监听端口
listen 80;
# 改为自己的域名
server_name www.codecoord.com;
# 将http请求强制转为https
# rewrite:重写指令,$host$:请求地址,$1:请求参数,permanent:永久访问
rewrite ^(.*)$ https://$host$1 permanent;
}
  1. 上述两步配置完成后测试一下是否配置正确,在sbin目录下运行测试命令
    • ./nginx -t
1
2
3
4
nginx复制代码# 配置成功信息
[root@TianXin sbin]# ./nginx -t
nginx: the configuration file /opt/Nginx/conf/nginx.conf syntax is ok
nginx: configuration file /opt/Nginx/conf/nginx.conf test is successful
  1. 如果测试成功则重启nginx,使配置生效
1
nginx复制代码[root@TianXin sbin]# ./nginx -s reload
  1. 完整配置参考第四点配置示例
  2. 配置完成后访问域名,即可显示https信息

image.png

三、配置问题

3.1 ngx_http_ssl_module

  1. 注意如果是nginx 1.16.1之前版本, 配置内容会有有所变化,请参考别的版本配置
  2. 如果运行./nginx -t时出现以下错误,标识nginx没有安装SSL模块
1
2
3
nginx复制代码[root@tianxin conf]# nginx -t
nginx: [emerg] the "ssl" parameter requires ngx_http_ssl_module in /opt/nginx/conf/nginx.conf:112
nginx: configuration file /opt/nginx/conf/nginx.conf test failed
  1. 解决方法是重新配置nginx,重新编译带上–with-http_stub_status_module –with-http_ssl_module
  2. 可以重新安装nginx(建议, 可以避免很多问题)也可以不用重新安装, 不用重新安装只需要执行下面的两个命令即可
1
2
3
4
5
6
nginx复制代码# 清除编译文件
make clean
# 配置
./configure --prefix=/opt/nginx --with-http_stub_status_module --with-http_ssl_module
# 编译
make
  1. 不要执行make install 否则会覆盖原来的文件
  2. 关闭nginx
    • nginx -s stop
  3. 拷贝目录下的objs/nginx替换之前的nginx启动文件
    • cp objs/nginx /opt/nginx/sbin/
  4. 最后启动nginx即可

3.2 ERR_SSL_PROTOCOL_ERROR

  1. 此问题在该版本中出现是因为listen配置的时候把443 后面的ssl删除了导致这个错误
1
2
3
4
5
nginx复制代码server {
# 注意这里就是443 ssl, 不要把ssl删除了,之前的版本
listen 443 ssl;
...
}
  1. 解决方法就是不要把443后面的ssl漏了,注意中间有空格

四、配置示例

4.1 SSL完整配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
nginx复制代码#user  nobody;
worker_processes 1;

#error_log logs/error.log;
#error_log logs/error.log notice;
#error_log logs/error.log info;

#pid logs/nginx.pid;

events {
worker_connections 1024;
}

http {
include mime.types;
default_type application/octet-stream;
sendfile on;
keepalive_timeout 65;

server {
listen 80;
server_name www.codecoord.com codecoord.com;
rewrite ^(.*)$ https://$host$1 permanent;
}

# https
server {
# 注意这里就是443 ssl, 不要把ssl删除
listen 443 ssl;
# 替换为SSL绑定的域名, 如www.codecoord.com
server_name www.codecoord.com;
# 添加默认主目录和首页, 根据自己的路径修改
root /opt/nginx/html;
index index.html;

# cert.pem和cert.key替换为上传文件的路径
ssl_certificate /opt/nginx/cert/www.codecoord.com.pem;
ssl_certificate_key /opt/nginx/cert/www.codecoord.com.key;

# 下面的不用动
ssl_session_cache shared:SSL:1m;
ssl_session_timeout 5m;

ssl_ciphers HIGH:!aNULL:!MD5;
ssl_prefer_server_ciphers on;

location / {
root html;
index index.html index.htm;
try_files $uri $uri/ /index.html; # 解决vue页面刷新404问题
}
}
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

分布式定时任务框架选型,写得真好!

发表于 2021-07-21

我们先思考下面几个业务场景的解决方案:

  • 支付系统每天凌晨1点跑批,进行一天清算,每月1号进行上个月清算
  • 电商整点抢购,商品价格8点整开始优惠
  • 12306购票系统,超过30分钟没有成功支付订单的,进行回收处理
  • 商品成功发货后,需要向客户发送短信提醒

类似的业务场景非常多,我们怎么解决?

为什么我们需要定时任务

很多业务场景需要我们某一特定的时刻去做某件任务,定时任务解决的就是这种业务场景。一般来说,系统可以使用消息传递代替部分定时任务,两者有很多相似之处,可以相互替换场景。如,上面发货成功发短信通知客户的业务场景,我们可以在发货成功后发送MQ消息到队列,然后去消费mq消息,发送短信。但在某些场景下不能互换:

a)时间驱动/事件驱动:内部系统一般可以通过时间来驱动,但涉及到外部系统,则只能使用时间驱动。如怕取外部网站价格,每小时爬一次

b)批量处理/逐条处理:批量处理堆积的数据更加高效,在不需要实时性的情况下比消息中间件更有优势。而且有的业务逻辑只能批量处理。如移动每个月结算我们的话费

c)实时性/非实时性:消息中间件能够做到实时处理数据,但是有些情况下并不需要实时,比如:vip升级

d)系统内部/系统解耦:定时任务调度一般是在系统内部,而消息中间件可用于两个系统间

java有哪些定时任务的框架

单机

  • timer:是一个定时器类,通过该类可以为指定的定时任务进行配置。TimerTask类是一个定时任务类,该类实现了Runnable接口,缺点异常未检查会中止线程
  • ScheduledExecutorService:相对延迟或者周期作为定时任务调度,缺点没有绝对的日期或者时间
  • spring定时框架:配置简单功能较多,如果系统使用单机的话可以优先考虑spring定时器

分布

  • Quartz:Java事实上的定时任务标准。但Quartz关注点在于定时任务而非数据,并无一套根据数据处理而定制化的流程。虽然Quartz可以基于数据库实现作业的高可用,但缺少分布式并行调度的功能
  • TBSchedule:阿里早期开源的分布式任务调度系统。代码略陈旧,使用timer而非线程池执行任务调度。众所周知,timer在处理异常状况时是有缺陷的。而且TBSchedule作业类型较为单一,只能是获取/处理数据一种模式。还有就是文档缺失比较严重
  • elastic-job:当当开发的弹性分布式任务调度系统,功能丰富强大,采用zookeeper实现分布式协调,实现任务高可用以及分片,目前是版本2.15,并且可以支持云开发
  • Saturn:是唯品会自主研发的分布式的定时任务的调度平台,基于当当的elastic-job 版本1开发,并且可以很好的部署到docker容器上。
  • xxl-job: 是大众点评员工徐雪里于2015年发布的分布式任务调度平台,是一个轻量级分布式任务调度框架,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。

分布式任务调度系统对比

参与对比的可选系统方案:elastic——job (以下简称E-Job)与 xxx-job(以下简称X-Job)

项目背景及社区力量

X-Job : 大众点评公司下员工许雪里、贡献者 3人; github有2470star、1015fork | QQ讨论群6个 | 有登记在使用的超过40家公司 | 文档齐全

E-Job : 当当网开源,贡献者17人; github有2524star、1015fork | QQ讨论群1个、源码讨论群1个 | 有登记在使用的超过50家公司 | 文档齐全 | 有明确的发展计划

支持集群部署

X-Job : 集群部署唯一要求为:保证每个集群节点配置(db和登陆账号等)保持一致。调度中心通过db配置区分不同集群。

执行器支持集群部署,提升调度系统可用性,同时提升任务处理能力。集群部署唯一要求为:保证集群中每个执行器的配置项 “xxl.job.admin.addresses/调度中心地址” 保持一致,执行器根据该配置进行执行器自动注册等操作。

E-Job : 重写Quartz基于数据库的分布式功能,改用Zookeeper实现注册中心

作业注册中心:基于Zookeeper和其客户端Curator实现的全局作业注册控制中心。用于注册,控制和协调分布式作业执行。

多节点部署时任务不能重复执行

X-Job : 使用Quartz基于数据库的分布式功能_E-Job_  : 将任务拆分为n个任务项后,各个服务器分别执行各自分配到的任务项。一旦有新的服务器加入集群,或现有服务器下线,elastic-job将在保留本次任务执行不变的情况下,下次任务开始前触发任务重分片。

日志可追溯

X-Job : 支持,有日志查询界面

E-Job : 可通过事件订阅的方式处理调度过程的重要事件,用于查询、统计和监控。Elastic-Job目前提供了基于关系型数据库两种事件订阅方式记录事件。

监控告警

X-Job : 调度失败时,将会触发失败报警,如发送报警邮件。

任务调度失败时邮件通知的邮箱地址,支持配置多邮箱地址,配置多个邮箱地址时用逗号分隔

E-Job : 通过事件订阅方式可自行实现

作业运行状态监控、监听作业服务器存活、监听近期数据处理成功、数据流类型作业(可通过监听近期数据处理成功数判断作业流量是否正常,如果小于作业正常处理的阀值,可选择报警。)、监听近期数据处理失败(可通过监听近期数据处理失败数判断作业处理结果,如果大于0,可选择报警。)

弹性扩容缩容

X-Job : 使用Quartz基于数据库的分布式功能,服务器超出一定数量会给数据库造成一定的压力

E-Job : 通过zk实现各服务的注册、控制及协调

支持并行调度

X-Job : 调度系统多线程(默认10个线程)触发调度运行,确保调度精确执行,不被堵塞。E-Job : 采用任务分片方式实现。将一个任务拆分为n个独立的任务项,由分布式的服务器并行执行各自分配到的分片项。

高可用策略

X-Job : “调度中心”通过DB锁保证集群分布式调度的一致性, 一次任务调度只会触发一次执行;

E-Job : 调度器的高可用是通过运行几个指向同一个ZooKeeper集群的Elastic-Job-Cloud-Scheduler实例来实现的。ZooKeeper用于在当前主Elastic-Job-Cloud-Scheduler实例失败的情况下执行领导者选举。通过至少两个调度器实例来构成集群,集群中只有一个调度器实例提供服务,其他实例处于”待命”状态。当该实例失败时,集群会选举剩余实例中的一个来继续提供服务。

失败处理策略

X-Job : 调度失败时的处理策略,策略包括:失败告警(默认)、失败重试;

E-Job : 弹性扩容缩容在下次作业运行前重分片,但本次作业执行的过程中,下线的服务器所分配的作业将不会重新被分配。失效转移功能可以在本次作业运行中用空闲服务器抓取孤儿作业分片执行。同样失效转移功能也会牺牲部分性能。

动态分片策略

X-Job : 分片广播任务以执行器为维度进行分片,支持动态扩容执行器集群从而动态增加分片数量,协同进行业务处理;在进行大数据量业务操作时可显著提升任务处理能力和速度。

执行器集群部署时,任务路由策略选择”分片广播”情况下,一次任务调度将会广播触发对应集群中所有执行器执行一次任务,同时传递分片参数;可根据分片参数开发分片任务;

E-Job : 支持多种分片策略,可自定义分片策略

默认包含三种分片策略:基于平均分配算法的分片策略、 作业名的哈希值奇偶数决定IP升降序算法的分片策略、根据作业名的哈希值对Job实例列表进行轮转的分片策略,支持自定义分片策略

elastic-job的分片是通过zookeeper来实现的。分片的分片由主节点分配,如下三种情况都会触发主节点上的分片算法执行:a、新的Job实例加入集群 b、现有的Job实例下线(如果下线的是leader节点,那么先选举然后触发分片算法的执行) c、主节点选举”

和quartz框架对比

  • 调用API的的方式操作任务,不人性化;
  • 需要持久化业务QuartzJobBean到底层数据表中,系统侵入性相当严重。
  • 调度逻辑和QuartzJobBean耦合在同一个项目中,这将导致一个问题,在调度任务数量逐渐增多,同时调度任务逻辑逐渐加重的情况加,此时调度系统的性能将大大受限于业务;
  • Quartz关注点在于定时任务而非数据,并无一套根据数据处理而定制化的流程。虽然Quartz可以基于数据库实现作业的高可用,但缺少分布式并行调度的功能。

综合对比

图片

总结和结论

共同点:E-Job和X-job都有广泛的用户基础和完整的技术文档,都能满足定时任务的基本功能需求。

不同点

  • X-Job 侧重的业务实现的简单和管理的方便,学习成本简单,失败策略和路由策略丰富。推荐使用在“用户基数相对少,服务器数量在一定范围内”的情景下使用
  • E-Job 关注的是数据,增加了弹性扩容和数据分片的思路,以便于更大限度的利用分布式服务器的资源。但是学习成本相对高些,推荐在“数据量庞大,且部署服务器数量较多”时使用

附 定时任务的其他方案

发货后超过10天未收货时系统自动确认收货的多种实现方式

每天定时半夜筛选第二天 可以自动确认收货的订单,然后第二天 每10分钟 执行一次确认收货 开销不会太大吧 时间也相对精确

自动确认收货这个状态如果仅仅是让客户端看的话,等用户下一次上线的时间,做一次运算就可以了。

延迟和定时消息投递 ActiveMQ提供了一种broker端消息定时调度机制。适用于:1、不希望消息马上被broker投递出去,而是想要消息60秒以后发给消费者,2、想让消息没隔一定时间投递一次,一共投递指定的次数

RabbitMQ可以针对Queue和Message设置 x-message-tt,来控制消息的生存时间,如果超时,则消息变为dead letter。利用DLX,当消息在一个队列中变成死信后,它能被重新publish到另一个Exchange。这时候消息就可以重新被消费。

  • END -

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Spring Boot 这样优化,让你的项目飞起来!

发表于 2021-07-21

介绍

在SpringBoot的Web项目中,默认采用的是内置Tomcat,当然也可以配置支持内置的jetty,内置有什么好处呢?

  1. 方便微服务部署。
  2. 方便项目启动,不需要下载Tomcat或者Jetty

针对目前的容器优化,目前来说没有太多地方,需要考虑如下几个点

  • 线程数
  • 超时时间
  • jvm优化

针对上述的优化点来说,首先线程数是一个重点,初始线程数和最大线程数,初始线程数保障启动的时候,如果有大量用户访问,能够很稳定的接受请求。

而最大线程数量用来保证系统的稳定性,而超时时间用来保障连接数不容易被压垮,如果大批量的请求过来,延迟比较高,不容易把线程打满。这种情况在生产中是比较常见的 ,一旦网络不稳定,宁愿丢包也不愿意把机器压垮。

jvm优化一般来说没有太多场景,无非就是加大初始的堆,和最大限制堆,当然也不是无限增大,根据的情况进快速开始

在spring boot配置文件中application.yml,添加以下配置

1
2
3
4
5
yaml复制代码server:
tomcat:
min-spare-threads: 20
max-threads: 100
connection-timeout: 5000

这块对tomcat进行了一个优化配置,最大线程数是100,初始化线程是20,超时时间是5000ms

Jvm优化

这块主要不是谈如何优化,jvm优化是一个需要场景化的,没有什么太多特定参数,一般来说在server端运行都会指定如下参数

初始内存和最大内存基本会设置成一样的,具体大小根据场景设置,-server是一个必须要用的参数,至于收集器这些使用默认的就可以了,除非有特定需求。

1.使用-server模式

设置JVM使用server模式。64位JDK默认启动该模式

1
vbscript复制代码java -server -jar springboot-1.0.jar

2.指定堆参数

这个根据服务器的内存大小,来设置堆参数。

  • -Xms :设置Java堆栈的初始化大小
  • -Xmx :设置最大的java堆大小
1
vbscript复制代码java -server -Xms512m -Xmx768m  -jar springboot-1.0.jar

设置初始化堆内存为512MB,最大为768MB。

3.远程Debug

在服务器上将启动参数修改为:

1
2
3
4
5
ini复制代码java -Djavax.net.debug=
ssl -Xdebug -Xnoagent -Djava.compiler=
NONE -Xrunjdwp:transport=
dt_socket,server=y,suspend=
n,address=8888 -jar springboot-1.0.jar

这个时候服务端远程Debug模式开启,端口号为8888。

在IDEA中,点击Edit Configuration按钮。

img

出现弹窗,点击+按钮,找到Remote选项。

img

在【1】中填入Remote项目名称,在【2】中填IP地址和端口号,在【3】选择远程调试的项目module,配置完成后点击OK即可

如果碰到连接超时的情况,很有可能服务器的防火墙的问题,举例CentOs7,关闭防火墙

1
2
bash复制代码systemctl stop firewalld.service #停止firewall
systemctl disable firewalld.service #禁止firewall开机启动

点击debug按钮,IDEA控制台打印信息:

img

说明远程调试成功。

JVM工具远程连接

jconsole与Jvisualvm远程连接

通常我们的web服务都输部署在服务器上的,在window使用jconsole是很方便的,相对于Linux就有一些麻烦了,需要进行一些设置。

1.查看hostname,首先使用

1
css复制代码hostname -i

查看,服务器的hostname为127.0.0.1,这个是不对的,需要进行修改

2.修改hostname

修改/etc/hosts文件,将其第一行的“127.0.0.1 localhost.localdomain localhost”,修改为:“192.168.44.128 localhost.localdomain localhost”.“192.168.44.128”为实际的服务器的IP地

3.重启Linux,在服务器上输入hostname -i,查看实际设置的IP地址是否为你设置的

4.启动服务,参数为:

1
2
3
4
ini复制代码java -jar -Djava.rmi.server.hostname=192.168.44.128 -
Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=911 -
Dcom.sun.management.jmxremote.ssl=false -
Dcom.sun.management.jmxremote.authenticate=false jantent-1.0-SNAPSHOT.jar

ip为192.168.44.128,端口为911 。

5.打开Jconsole,进行远程连接,输入IP和端口即可

img

点击连接,经过稍稍等待之后,即可完成连接,如下图所示:

img

同理,JvisualVm的远程连接是同样的,启动参数也是一样。

然后在本机JvisualVm输入IP:PORT,即可进行远程连接:如下图所示:

img

相比较Jvisualvm功能更加强大一下,界面也更美观。

另外,作者已经完成了两个专栏的文章Mybatis进阶、Spring Boot 进阶 ,已经将专栏文章整理成书,有需要的公众号【码猿技术专栏】回复关键词Mybatis 进阶、Spring Boot 进阶免费获取。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

【熬夜肝了】HBase设计的实践经验(全)

发表于 2021-07-20

本文已参与好文召集令活动,点击查看:后端、大前端双赛道投稿,2万元奖池等你挑战!」

今天分享的内容是关于HBASE相关设计的实战经验,记录学到的东西。主要是线上(集群规模10~20台,每秒读写数据量在几十万条记录的量级)出现了bug, hbase暂时不提供服务了,即整理了该篇文章。

- 一、[HBASE简介]

- 二、[详解HBASE的读和写、读放大、合并、故障恢复等]

- 三、[HBASE在告警信息的使用]

- 四、 [HBASE的优化经验]

HBASE是什么?

HBase是一个分布式的、面向列的开源数据库,该技术来源于 Fay Chang 所撰写的Google论文“Bigtable:一个结构化数据的分布式存储系统”。就像Bigtable利用了Google文件系统(File System)所提供的分布式数据存储一样,HBase在Hadoop之上提供了类似于Bigtable的能力。

HBase是Apache的Hadoop项目的子项目。HBase不同于一般的关系数据库,它是一个适合于非结构化数据存储的数据库。另一个不同的是HBase基于列的而不是基于行的模式。

(1)数据模型

image.png

这就是一张表,我们可以根据行键(roykey)获取一列族数据或多列族数据,每一个列族下面有不限量的列,每一个列上可以存储数据,每一个数据都是有版本的,可以通过时间戳来区别。所以我们在一张表中,知道行键,列族,列,版本时间戳可以确定一个唯一的值

(2)逻辑架构

任何一张表,他的rowkey是全局有序的,由于对物理存储上的考虑,我们把它放在多个机器上,我们按照大小或者其他策略,分为多个region。每一个region负责表示一份数据,region会在物理机器上,保证是一个均衡的状态。

image.png

(3)系统架构

首先它也是一套标准的存储架构。他的Hmaster主要负责简单的协调服务,比如region的转移,均衡,以及错误的恢复,实际上他并不参与查询,真正的查询是发生在region server。region server事负责存储的,刚才我们说过,每一个表会分为几个region。然后存储在region server。

这里最重要的部分是hlog。为了保证数据一致性,首先会写一份日志文件,这是数据库系统里面以来的一种特性,创建了日志以后,我们才能写入成功。我们刚才提到HBase里面有很多column-family列族,没个列族在一个region里对应一个store,store分别包含storefile和menstore。

为了后续对HBase可以优化,我们首先考虑把文件写入menstore里面,随着menstore里面的数据满了之后,会把数据分发到磁盘里,然后storefile和memstore整体的话,依赖一个数据模型,叫做lmstree。

然后,数据是采用append方式写入的,无论是插入,修改,删除。实际上都是不断的append的。比如说你的更新,删除的操作方式,都是以打标记方式写入,所以它避免了磁盘的随机io,提高了写入性能,当然的话,它的底层的话是建立在hdfs之上。

image.png

HBase 使用 Zookeeper 做分布式管理服务,来维护集群中所有服务的状态。Zookeeper 维护了哪些 servers 是健康可用的,并且在 server 故障时做出通知。Zookeeper 使用一致性协议来保证分布式状态的一致性。注意这需要三台或者五台机器来做一致性协议。

zk的分布式协议还是必须要掌握的,毕竟大数据中的香饽饽flink,hbase这些都是是用zk来做分布式协议的。

(4)怎么读?

  1. 定位,从 Meta table 获取 rowkey 属于哪个 Region Server 管理
  2. 相应的 Region Server 读写数据
  3. Meta table,保存了系统中所有的 region 列表,结构如下
  • Key:table, region start key, region id
  • Value:region server

(5)Region Server 是什么,存的什么?

Region Server 运行在 HDFS DataNode 上,由以下组件组成:

  • WAL:Write Ahead Log (写前日志)是分布式文件系统上的一个文件,用于存储新的还未被持久化存储的数据,它被用来做故障恢复。
  • BlockCache:这是读缓存,在内存中存储了最常访问的数据,是 LRU(Least Recently Used)缓存。
  • MemStore:这是写缓存,在内存中存储了新的还未被持久化到硬盘的数据。当被写入硬盘时,数据会首先被排序。注意每个 Region 的每个 Column Family 都会有一个 MemStore。
  • HFile 在硬盘上(HDFS)存储 HBase 数据,以有序 KeyValue 的形式。

(6)怎么写数据?

  1. 首先是将数据写入到 WAL 中(WAL 是在文件尾部追加,性能高)
  2. 加入到 MemStore 即写缓存, 服务端就可以向客户端返回 ack 表示写数据完成

(7)MemStore 即写缓存是个什么东西?

  • 缓存 HBase 的数据,这和 HFile 中的存储形式一样
  • 更新都以 Column Family 为单位进行排序

(8)缓存写完了怎么刷盘呢,总要写到磁盘上去吧?

  1. MemStore 中累积了足够多
  2. 整个有序数据集就会被写入一个新的 HFile 文件到 HDFS 上(顺序写)
  3. 这也是为什么 HBase 要限制 Column Family 数量的一个原因(列族不能太多)
  4. 维护一个最大序列号,这样就知道哪些数据被持久化了

(9)HFILE是什么鬼?

HFile 使用多层索引来查询数据而不必读取整个文件,这种多层索引类似于一个 B+ tree:

  • KeyValues 有序存储。
  • rowkey 指向 index,而 index 则指向了具体的 data block,以 64 KB 为单位。
  • 每个 block 都有它的叶索引。
  • 每个 block 的最后一个 key 都被存储在中间层索引。
  • 索引根节点指向中间层索引。

trailer 指向原信息数据块,它是在数据持久化为 HFile 时被写在 HFile 文件尾部。trailer 还包含例如布隆过滤器和时间范围等信息。

布隆过滤器用来跳过那些不包含指定 rowkey 的文件,时间范围信息则是根据时间来过滤,跳过那些不在请求的时间范围之内的文件。

刚才讨论的索引,在 HFile 被打开时会被载入内存,这样数据查询只要一次硬盘查询。

二、读放大、写放大、故障恢复

(10)什么时候会触发读合并?

上一篇说了,当我们读取数据时,首先是定位,从 Meta table 获取 rowkey 属于哪个 Region Server 管理,而Region Server又有读缓存、写缓存、HFILE

因此我们读一个数据,它有可能在读缓存(LRU),也有可能刚刚才写入还在写缓存,或者都不在,则HBase 会使用 Block Cache 中的索引和布隆过滤器来加载对应的 HFile 到内存,因此数据可能来自读缓存、scanner 读取写缓存和HFILE,这就叫左HBASE的读合并

之前说过写缓存可能存在多个HFILE,因此一个读请求可能会读多个文件,影响性能,这也被称为读放大(read amplification)。

(11)既然存在读放大,那么有没有少去读多个文件的办法? –写合并

简单的说就是,HBase 会自动合并一些小的 HFile,重写成少量更大的 HFiles

它使用归并排序算法,将小文件合并成大文件,有效减少 HFile 的数量

这个过程被称为 写合并(minor compaction)

(12)写合并针对的是哪些HFILE?

1.它合并重写每个 列族(Column Family) 下的所有的 HFiles

2.在这个过程中,被删除的和过期的 cell 会被真正从物理上删除,这能提高读的性能

3.但是因为 major compaction 会重写所有的 HFile,会产生大量的硬盘 I/O 和网络开销。这被称为写放大(Write Amplification)。

4.HBASE默认是自动调度,因为存在写放大,建议在凌晨或周末进行

5.Major compaction 还能将因为服务器 crash 或者负载均衡导致的数据迁移重新移回到离 Region Server 的地方,这样就能恢复本地数据( data locality)。

(13)不是说Region Server管理多个Region,到底管理几个呢,Region什么时候会扩容? –Region分裂

我们再来回顾一下 region 的概念:

  • HBase Table 被水平切分成一个或数个 regions。每个 region 包含了连续的,有序的一段 rows,以 start key 和 end key 为边界。
  • 每个 region 的默认大小为 1GB。
  • region 里的数据由 Region Server 负责读写,和 client 交互。
  • 每个 Region Server 可以管理约 1000 个 regions(它们可能来自一张表或者多张表)。

一开始每个 table 默认只有一个 region。当一个 region 逐渐变得很大时,它会分裂(split)成两个子 region,每个子 region 都包含了原来 region 一半的数据,这两个子 region 并行地在原来这个 region server 上创建,这个分裂动作会被报告给 HMaster。处于负载均衡的目的,HMaster 可能会将新的 region 迁移给其它 region server。

(14)因为分裂了,为了负载均衡可能在多个region server,造成了读放大,直到写合并的到来,重新迁移或合并到离 region server 节点附近的地方

(15)HBASE的数据怎么备份的?

所有的读写都发生在 HDFS 的主 DataNode 节点上。HDFS 会自动备份 WAL(写前日志) 和 HFile 的文件 blocks。HBase 依赖于 HDFS 来保证数据完整安全。当数据被写入 HDFS 时,一份会写入本地节点,另外两个备份会被写入其它节点。

WAL 和 HFiles 都会持久化到硬盘并备份。那么 HBase 是怎么恢复 MemStore 中还未被持久化到 HFile 的数据呢?

(16)HBASE的数据怎么宕机恢复的?

  1. 当某个 Region Server 发生 crash 时,它所管理的 region 就无法被访问了,直到 crash 被检测到,然后故障恢复完成,这些 region 才能恢复访问。Zookeeper 依靠心跳检测发现节点故障,然后 HMaster 会收到 region server 故障的通知。
  2. 当 HMaster 发现某个 region server 故障,HMaster 会将这个 region server 所管理的 regions 分配给其它健康的 region servers。为了恢复故障的 region server 的 MemStore 中还未被持久化到 HFile 的数据,HMaster 会将 WAL 分割成几个文件,将它们保存在新的 region server 上。每个 region server 然后回放各自拿到的 WAL 碎片中的数据,来为它所分配到的新 region 建立 MemStore。
  3. WAL 包含了一系列的修改操作,每个修改都表示一个 put 或者 delete 操作。这些修改按照时间顺序依次写入,持久化时它们被依次写入 WAL 文件的尾部。
  4. 当数据仍然在 MemStore 还未被持久化到 HFile 怎么办呢?WAL 文件会被回放。操作的方法是读取 WAL 文件,排序并添加所有的修改记录到 MemStore,最后 MemStore 会被刷写到 HFile。

HBASE基本讲解就到此结束了,开始讲解实战演练吧!


三、实战经验

怎么设计rowkey

加班回来才能开始写文章,好了,开始了。

在告警业务场景中,一般分为两类场景

  1. 瞬时事件类型 –通常开始就结束。
  2. 持续事件类型 –通常开始一段时间在结束。

针对这两种情况,我们可以涉及rowkey为 : 唯一标识id + 时间 + 告警类型

我们对id做一个md5,做一个哈希,这样可以保证数据的分配均衡.

指标平台

第二个场景叫做指标平台,我们使用kylin做了一层封装,在这上面可以选择我们在HBase存储好的数据,可以选择哪些维度,去查询哪些指标。比如这个成交数据,可以选择时间,城市。就会形成一张图,进而创建一张报表。然后这张报表可以分享给其他人使用。

为什么会选择Kylin呢,因为Kylin是一个molap引擎,他是一个运算模型,他满足我们的需求,对页面的相应的话,需要亚秒级的响应。

第二,他对并发有一定的要求,原始的数据达到了百亿的规模。另外需要具有一定的灵活性,最好有sql接口,以离线为主。综合考虑,我们使用的是Kylin。

Kylin简介

Kylin给他家简单介绍一下,Apache Kylin™是一个开源的分布式分析引擎,提供Hadoop之上的SQL查询接口及多维分析(OLAP)能力以支持超大规模数据,最初由eBay Inc.开发并贡献至开源社区。它能在亚秒内查询巨大的Hive表。他的原理比较简单,基于一个并运算模型,我预先知道,我要从那几个维度去查询某个指标。在预定好的指标和维度的情况下去把所有的情况遍历一遍。利用molap把所有的结果都算出来,在存储到HBase中。然后根据sql查询的维度和指标直接到HBase中扫描就行了。为什么能够实现亚秒级的查询,这就依赖于HBase的计算。

kylin架构

和刚才讲的逻辑是一致的,左边是数据仓库。所有的数据都在数据仓库中存储。中间是计算引擎,把每天的调度做好,转化为HBase的KY结构存储在HBase中,对外提供sql接口,提供路由功能,解析sql语句,转化为具体的HBase命令。

image.png

Kylin中有一个概念叫Cube和Cubold,其实这个逻辑也非常简单,比如已经知道查询的维度有A,b,c,d四个。那abcd查询的时候,可以取也可以不取。一共有16种组合,整体叫做,cube。其中每一种组合叫做Cuboid。

image.png

Kylin如何在Hbase中进行物理存储的 –这里引用的是贝壳的用例

首先定义一张原始表,有两个维度,year和city。

在定义一个指标,比如总价。下面是所有的组合,刚才说到Kylin里面有很多cuboid组合,比如说前面三行有一个cuboid:00000011组合,他们在HBase中的RowKey就是cuboid加上各维度的取值。

这里面会做一点小技巧,对维度的值做一些编码,如果把程式的原始值放到Rowkey里,会比较长。

Rowkey也会存在一个sell里任何一个版本的值都会存在里面。如果rowkey变的非常长,对HBase的压力会非常大,所有通过一个字典编码去减少长度,通过这种方法,就可以把kylin中的计算数据存储到HBase中。

image.png

用比特位来做一些特征,多个维度统计分析,速度是比较快的,也是大数据分析常用的手段!!!

当然我们在实际中也配合apache Phoenix提供查询功能

四、优化经验

我先给出一个图

image.png

1. scan缓存是否设置合理?

优化原理:在解释这个问题之前,首先需要解释什么是scan缓存,通常来讲一次scan会返回大量数据,因此客户端发起一次scan请求,实际并不会一次就将所有数据加载到本地,而是分成多次RPC请求进行加载,这样设计一方面是因为大量数据请求可能会导致网络带宽严重消耗进而影响其他业务,另一方面也有可能因为数据量太大导致本地客户端发生OOM。在这样的设计体系下用户会首先加载一部分数据到本地,然后遍历处理,再加载下一部分数据到本地处理,如此往复,直至所有数据都加载完成。数据加载到本地就存放在scan缓存中,默认100条数据大小。通常情况下,默认的scan缓存设置就可以正常工作的。但是在一些大scan(一次scan可能需要查询几万甚至几十万行数据)来说,每次请求100条数据意味着一次scan需要几百甚至几千次RPC请求,这种交互的代价无疑是很大的。因此可以考虑将scan缓存设置增大,比如设为500或者1000就可能更加合适。笔者之前做过一次试验,在一次scan扫描10w+条数据量的条件下,将scan缓存从100增加到1000,可以有效降低scan请求的总体延迟,延迟基本降低了25%左右。

优化建议:大scan场景下将scan缓存从100增大到500或者1000,用以减少RPC次数

2. get请求是否可以使用批量请求?

优化原理:HBase分别提供了单条get以及批量get的API接口,使用批量get接口可以减少客户端到RegionServer之间的RPC连接数,提高读取性能。另外需要注意的是,批量get请求要么成功返回所有请求数据,要么抛出异常。

优化建议:使用批量get进行读取请求

3. 请求是否可以显示指定列族或者列?

优化原理:HBase是典型的列族数据库,意味着同一列族的数据存储在一起,不同列族的数据分开存储在不同的目录下。如果一个表有多个列族,只是根据Rowkey而不指定列族进行检索的话不同列族的数据需要独立进行检索,性能必然会比指定列族的查询差很多,很多情况下甚至会有2倍~3倍的性能损失。

优化建议:可以指定列族或者列进行精确查找的尽量指定查找

4. 离线批量读取请求是否设置禁止缓存?

优化原理:通常离线批量读取数据会进行一次性全表扫描,一方面数据量很大,另一方面请求只会执行一次。这种场景下如果使用scan默认设置,就会将数据从HDFS加载出来之后放到缓存。可想而知,大量数据进入缓存必将其他实时业务热点数据挤出,其他业务不得不从HDFS加载,进而会造成明显的读延迟毛刺

优化建议:离线批量读取请求设置禁用缓存,scan.setBlockCache(false)

5. RowKey必须进行散列化处理(比如MD5散列),同时建表必须进行预分区处理

6.JVM内存配置量 < 20G,BlockCache策略选择LRUBlockCache;否则选择BucketCache策略的offheap模式;期待HBase 2.0的到来!

7. 观察RegionServer级别以及Region级别的storefile数,确认HFile文件是否过多

优化建议:

hbase.hstore.compactionThreshold设置不能太大,默认是3个;设置需要根据Region大小确定,通常可以简单的认为hbase.hstore.compaction.max.size = RegionSize / hbase.hstore.compactionThreshold

8. Compaction是否消耗系统资源过多?

(1)Minor Compaction设置:hbase.hstore.compactionThreshold设置不能太小,又不能设置太大,因此建议设置为5~6;hbase.hstore.compaction.max.size = RegionSize / hbase.hstore.compactionThreshold(2)Major Compaction设置:大Region读延迟敏感业务( 100G以上)通常不建议开启自动Major Compaction,手动低峰期触发。小Region或者延迟不敏感业务可以开启Major Compaction,但建议限制流量;(3)期待更多的优秀Compaction策略,类似于stripe-compaction尽早提供稳定服务

9.Bloomfilter是否设置?是否设置合理?

任何业务都应该设置Bloomfilter,通常设置为row就可以,除非确认业务随机查询类型为row+cf,可以设置为rowcol

BloomFilter是启用在每个ColumnFamily上的,我们可以使用JavaAPI

1
2
scss复制代码//我们可以通过ColumnDescriptor来指定开启的BloomFilter的类型
HColumnDescriptor.setBloomFilterType() //可选NONE、ROW、ROWCOL

我们还可以在创建Table的时候指定BloomFilter

1
javascript复制代码hbase> create 'mytable',{NAME => 'colfam1', BLOOMFILTER => 'ROWCOL'}

10.开启Short Circuit Local Read功能,具体配置戳这里官网

优化原理:当前HDFS读取数据都需要经过DataNode,客户端会向DataNode发送读取数据的请求,DataNode接受到请求之后从硬盘中将文件读出来,再通过TPC发送给客户端。Short Circuit策略允许客户端绕过DataNode直接读取本地数据。

11. Hedged Read功能是否开启?

优化原理:HBase数据在HDFS中一般都会存储三份,而且优先会通过Short-Circuit Local Read功能尝试本地读。但是在某些特殊情况下,有可能会出现因为磁盘问题或者网络问题引起的短时间本地读取失败,为了应对这类问题,社区开发者提出了补偿重试机制 – Hedged Read。该机制基本工作原理为:客户端发起一个本地读,一旦一段时间之后还没有返回,客户端将会向其他DataNode发送相同数据的请求。哪一个请求先返回,另一个就会被丢弃。 优化建议:开启Hedged Read功能,具体配置参考这里官网

12. 数据本地率是否太低?

数据本地率:HDFS数据通常存储三份,假如当前RegionA处于Node1上,数据a写入的时候三副本为(Node1,Node2,Node3),数据b写入三副本是(Node1,Node4,Node5),数据c写入三副本(Node1,Node3,Node5),可以看出来所有数据写入本地Node1肯定会写一份,数据都在本地可以读到,因此数据本地率是100%。现在假设RegionA被迁移到了Node2上,只有数据a在该节点上,其他数据(b和c)读取只能远程跨节点读,本地率就为33%(假设a,b和c的数据大小相同)。优化原理:数据本地率太低很显然会产生大量的跨网络IO请求,必然会导致读请求延迟较高,因此提高数据本地率可以有效优化随机读性能。数据本地率低的原因一般是因为Region迁移(自动balance开启、RegionServer宕机迁移、手动迁移等),因此一方面可以通过避免Region无故迁移来保持数据本地率,另一方面如果数据本地率很低,也可以通过执行major_compact提升数据本地率到100%。优化建议:避免Region无故迁移,比如关闭自动balance、RS宕机及时拉起并迁回飘走的Region等;在业务低峰期执行major_compact提升数据本地率

13. 线上bug, hbase暂时不提供服务了?

集群规模10~20台,每秒读写数据量在几十万条记录的量级

客户端与HBase集群进行RPC操作时会抛出NotServingRegionException异常,结果就导致了读写操作失败。大量的写操作被阻塞,写入了文件,系统也发出了警报!

问题排查

通过查看HBase Master运行日志,结合客户端抛出异常的时刻,发现当时HBase集群内正在进行Region的Split和不同机器之间的Region Balance

1)由于表中rowkey有时间字段,因此每天都需要新创建Region,同时由于写入数据量大,进一步触发了HBase的Region Split操作,这一过程一般耗时较长(测试时从线上日志来看,平均为10秒左右,Region大小为4GB),且Region Split操作触发较为频繁;

2)同时由于Region Split操作导致Region分布不均匀,进而触发HBase自动做Region Balance操作,Region迁移过程中也会导致Region下线,这一过程耗时较长(测试时从线上日志来看,平均为20秒左右)。

解决:

1)对于写端,可以将未写入成功的记录,添加到一个客户端缓存中,隔一段时间后交给一个后台线程统一重新提交一次;也可以通过setAutoFlush(flase, false)保证提交失败的记录不被抛弃,留在客户端writeBuffer中等待下次writeBuffer满了后再次尝试提交,直到提交成功为止。

2)对于读端,捕获异常后,可以采取休眠一段时间后进行重试等方式。

3)将timestamp字段改成一个周期循环的timestamp,如取timestamp % TS_MODE后的值,其中TS_MODE须大于等于表的TTL时间周期,这样才能保证数据不会被覆盖掉。经过这样改造后,即可实现Region的复用,避免Region的无限上涨。对于读写端的变更也较小,读写端操作时只需将timestamp字段取模后作为rowkey进行读写,另外,读端需要考虑能适应scan扫描时处理[startTsMode, endTsMode]和[endTsMode, startTsMode]两种情况

参考文章:

blog.csdn.net/shufangreal…

❤️❤️❤️❤️

非常感谢人才们能看到这里,如果这个文章写得还不错,觉得有点东西的话 求点赞👍 求关注❤️ 求分享👥 对帅气欧巴的我来说真的 非常有用!!!

如果本篇博客有任何错误,请批评指教,不胜感激 !

文末福利,最近整理一份面试资料《Java面试通关手册》,覆盖了Java核心技术、JVM、Java并发、SSM、微服务、数据库、数据结构等等。获取方式:GitHub github.com/Tingyu-Note…,更多内容关注公号:汀雨笔记,陆续奉上。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

MySQL建表 1、前言 2、整数类型 3、 实数类型 4、

发表于 2021-07-20

1、前言

今天面试字节的时候,非常无奈,面试官不考查询语句,考的是建表语句,这我哪里会?里面的类型基本上都忘记了,今天做一次整体的记录

2、整数类型

整数类型有TINYINT,SMALLINT,MEDIUMINT,INT,BIGINT,存储空间及数值范围如下表

—类型 存储空间(单位为位) 数值范围
TINYINT 8 -128 ~ 127
SMALLINT 16 -32768 ~ 32767
MEDIUMINT 24 -8388608 ~ 8388607
INT 32 -2147483648 ~ 2147483647
BIGINT 64 太大了

3、 实数类型

FLOAT

单精度浮点型,使用8位—-B

DOUBLE

双精度浮点型,使用16位存储—-2B

DECIMAL

float和double进行计算时会发生精度损失,损精度损失原因可参考这篇文章:老板,用float存储金额为什么要扣我工资 需要精度计算的时候可以使用DECIMAL,使用DECIMAL需要额外的空间和计算开销,因此当且仅当需要精度计算时才使用

4、字符串类型

1. VARCHAR和CHAR

varchar和char是非常非常常用的字符串类型

VARCHAR

VARCHAR用于存储变长字符串,使用该类型存储字符串时需要额外使用1或2个额外字节记录字符串的长度:

  • 列的最大长度小于或等于255 => 使用1字节
  • 列的长度大于255 => 使用2字节

适用VARCHAR作为存储类型的场景:

  • 列更新很少 => 列经常更新容易产生页分裂
  • 列长度非固定 => VARCHAR存储时只使用必要空间,因此会省空间
CHAR

CHAR用于存储定长字符串,在存储CHAR类型时,会删除所有的末尾空格

使用CHAR最为存储类型的场景

  • 列几乎定长
  • 列长度很短 => VARCHAR需要额外字节存储长度
  • 列经常更新

2. BLOB和TEXT类型

BLOB和TEXT类型都是用来存储很大的数据,比如文章内容这些

BLOB

采用二进制方式存储, BLOB细分又可以分为TINYBLOB,SMALLBLOB,BLOB,MEDIUMBLOB, LONGBLOB

TEXT

采用字符方式存储,TEXT细分又可以分为TINYTEXT,SMALLTEXT,TEXT,MEDIUMTEXT, LONGTEXT

当BLOB和TEXT值太大时,InnoDB存储会使用外部存储区域来存储值,然后保存一个1~4字节的指针指向外部存储

5、日期和时间类型

常用的日期类型有DATETIME和TIMESTAMP

DATETIME

使用8字节存储,可以保存大范围的值,从1001~9999年

TIMESTAMP

使用4字节存储,保存范围比DATETIME小,从1970~2038年

对于需要存储更小粒度的日期和时间可以使用DOUBLE或BIGINT,当然不是存储小粒度也可以使用BIGINT

6、建表语法

1
sql复制代码CREATE TABLE table_name (column_name column_type);
  • 记住,是先写 表名,再写表的类型
1
2
3
4
5
6
7
sql复制代码CREATE TABLE IF NOT EXISTS `runoob_tbl`(
`runoob_id` INT UNSIGNED AUTO_INCREMENT,
`runoob_title` VARCHAR(100) NOT NULL,
`runoob_author` VARCHAR(40) NOT NULL,
`submission_date` DATE,
PRIMARY KEY ( `runoob_id` )
)ENGINE=InnoDB DEFAULT CHARSET=utf8;

7、面试

  • 面试时候,考官问的是,创建一张学生表,学生大概10000人,包含,学生学号、学生名字、学生性别、学生住址,需要选择合理的类型
1
2
3
4
5
6
7
8
9
sql复制代码CREATE TABLE IF NOT EXISTS 'students'{
'student_id' SMALLINT AUTO_INCREMENT NOT NULL,
'name' varchar(20) not null,
'age' TINYINT NOT NULL,
'sex' bit default 1,
'birth' datetime not null,
'address' varchar(255) not null,
PRIMARY KEY('student_id')
}ENGINE=InnoDB default CHARSET = UTF-8;
  • id:10000 人-可以采用 smallint 进行标识
  • name: 名字最长也就20个
  • age : 肯定小于100岁, 可采用 TINYINT 进行标识
  • sex: 用一个bit 就可以进行标识
  • birth 需要采用日期
  • 地址: 一般采用 varchar (255)
  • 设定自增 和 主键

参考

  • juejin.cn/post/684490…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…598599600…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%