开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

Elasticsearch 系列之一 —— 初识 简介 集群

发表于 2021-11-18

简介

​ Elasticsearch 故名思议,Elastic Search 一个分布式搜索中间件。据说是创始人给妻子开发搜索食谱的应用时,顺手做的中间件。果然,爱情的力量是伟大的,否则也不会有至今广受使用的 Elasticsearch 了。

​ 分布式、高性能、近实时是 Elasticsearch 的特点。它可以对几乎所有类型的数据(基本值类型、地理空间、IP 等)进行搜索,这依赖于针对不同的类型建立合适的索引结构,后面的系列我们将详细分析索引部分,本次我们分析 Elasticsearch 的系统概念与读写流程。

​ 为了叙述简洁,后文将 Elasticsearch 简写为 ES。

集群 Cluster

​ 作为一个分布式系统,肯定是需要由多个 ES 实例节点组成集群的。集群需要被管理,ES 中通过 master 来管理集群。集群为了保障高可用,避免单点故障,master 是经过选举选出来的。ES 中,并不是所有的节点都能作为 master 的候选者。

ES 中,一个实例节点也是可以自成集群的

如下图,是一个 常见的ES 集群架构图:

image-20211110100233827
节点 Node
=======

​ 单个 ES 实例,既称为一个节点。ES 中节点可以有以下一个或多个角色:

候选主节点-Master Eligible Node

​ 用来选主,只有候选主节点才有选举权和被选举权,其它节点不参与选主。

​ 选举出的主节点负责创建索引、删除索引、跟踪哪些节点是群集的一部分,并决定哪些分片分配给相关的节点、追踪集群中节点的状态等,稳定的主节点对集群的健康是非常重要的。

​ 通常,为了稳定性,最好使用低配置机器创建独立的候选主节点,且不小于3个。

数据节点-Date Node

​ 负责数据的存储和相关的操作,例如对数据进行增、删、改、查和聚合等操作,所以数据节点(Data 节点)对机器配置要求比较高,对 CPU、内存和 I/O 的消耗很大。

​ 数据节点是集群中压力很大的节点,最好将数据节点和主节点分开,以免影响主节点稳定性,导致脑裂,造成索引、数据不一致等。

​ 1、使用 SSD ,提升磁盘读写能力

​ 2、除了 JVM heap 本身使用的内存外,预留一定的内存给文件缓存,能加快文件访问,避免每次都要访问磁盘

​ 3、禁用 swap 机制

协调节点-Coordinating Node

​ 一般来说,每个节点都可承担协调节点的角色。通常,哪个节点接收客户端请求,就是本次请求的协调节点。协调节点用来处理客户端请求,进行请求分发,结果合并,并返回给客户端。

​ 协调节点的压力介于主节点和数据节点之间,不需要很高的 io 能力。将协调节点独立,有助于降低数据节点的压力,避免互相影响。

ES 集群除了可以有以上常见节点角色外,还有:Ingest node、Remote-eligible node、Machine learning node、Transform node ,感兴趣的可以取官网了解,本文就不一一介绍了。

分片 Shard

​ 一个 data 节点可包含多个 shard 。一个 shard 就是一个 Lucene 实例,索引由一系列 shard 组成。ES 之所以称作分布式搜索,既是可以将数据分散在多个 shard 上,提供更高的性能。

​ 因为写入时,需要通过路由,确定写在哪一个 shard 上,所以需要在索引创建时,确定好 shard 数,一旦设置后将不可变。

副本分片 Replica Shard

​ 一个主 shard 可以有0个或多个副本 shard。默认每个主 shard 都有一个副本 shard,副本 shard 永远不会和主 shard 在同一节点上。主要作用:

​ 1、故障转移:当主 shard 出现故障时,可以将副本 shard 提升为主 shard。

​ 2、提高性能:get、search 请求可以交给主 shard 或副本 shard 处理。

Lucene

​ Lucene 是一个全文检索库,ES 基于 lucene 来建立。一个 lucene 索引里有很多个 segemnt,每个 segment 都是一个索引结构。在搜索时,会搜索所有的 segment。segment 内部会构建倒排索引,用来检索。

​ segment 不可变,所以:

​ 1、当文档删除时:lucene 将文档额外标记为删除

​ 2、当更新时:先删除,再插入(新的 segmeng )

写入

​ 当协调节点收到写请求后,通过 routing 找到对应的主 shard,将写请求转发给主 shard,主 shard 写完之后,将写入并发发送给副本 shard,待副本 shard 写入后,返回给主 shard,当所有副本 shard 写完后,主 shard 返回请求给协调节点。

image-20211108095736473
​ 当节点接收到写入请求时,先将数据写入到 index buffer 之中,此时,写入的数据还不能被搜索到,直到数据被 refresh 机制(默认1秒)刷到文件系统缓存,形成不可变的 segment 小索引,此时才能被搜索(近实时搜索)。数据被 refresh 之后,是写入到文件缓存的,并不会立刻持久化到磁盘,而是通过 flush 机制刷到磁盘。

​ 数据写入到 index buffer 中的同时,还会写入到 translog,防止数据丢失。translog 默认5秒会 fsync 到磁盘,所以理论上5秒之内写入的数据,还是有丢失的可能。translog 并不会无限增大,当数据 flush 到磁盘后,translog 就可以进行清理了。

​ segment flush 时机:

​ 1、5分钟内没有对索引的请求

​ 2、translog 到达一定大小,默认 512m

​ 3、调用 flush api

针对整个集群 flush

1
shell复制代码curl -XPOST "http://127.0.0.1:9200/_flush/synced"

针对单个索引 flush

1
shell复制代码curl -XPOST "http://127.0.0.1:9200/demo/_flush/synced"

image-20211107222036408
​ 每秒生成一个 segment 会导致文件数量很多,同时造成搜索响应变慢,所以 ES 内部会不定期自动对 segment 进行合并,老的 segment 将删除并释放资源。当然也可以在 ES 压力不大时,进行手动强制合并。

1
shell复制代码curl -X POST "http://127.0.0.1:9200/demo/_forcemerge"

搜索

​ 当搜索请求到达第一个节点时,其将作为协调节点,负责处理请求。处理流程如图:

image-20211107223936603
​ 如果是分页搜索:假设客户端请求 from = 50,size = 50;协调节点发送给每个 shard 的请求 from = 0,size = from + size = 100;协调节点收到的数据量为 shard 数 * ( from + size ) ;最后再择出分页数据,fetch 得到要获取的数据返回。

​ 基于以上流程,ES 为了避免深度分页对系统的影响,默认对分页有1万条限制的。那怎么解决呢?

1、修改 index. max_result_window 调大限制,可动态设置

1
2
3
4
shell复制代码curl -XPUT "http://127.0.0.1:9200/demo/_settings" -H 'Content-Type: application/json' -d'
{
"index.max_result_window": "1000000"
}'

2、Scroll

​ 第一次查询时带上 scroll 参数代表是 scroll 查询,同时设置 scroll 上下文的过期时间:

1
2
3
4
5
6
7
shell复制代码curl -XGET "http://127.0.0.1:9200/demo/_search?scroll=10s" -H 'Content-Type: application/json' -d'
{
"size": 20,
"query": {
"match_all": {}
}
}'

​ 返回 _scroll_id:

1
2
3
4
json复制代码{
"_scroll_id": "i1BOVdVWjJyU...",
... //省略
}

​ 后续查询将上次返回的 _scroll_id 作为参数:

1
2
3
4
5
shell复制代码curl -XGETY "http://127.0.0.1:9200/_search/scroll" -H 'Content-Type: application/json' -d'
{
"scroll" : "10s",
"scroll_id" : "i1BOVdVWjJyU..."
}'

​ scroll 需要在 ES 服务端持续维持一个快照上下文,持有 segment 。此时 segment 合并仍可继续,但却不能删除老的 segment 释放资源。直到过期或者手动清理:

1
2
3
4
shell复制代码curl -XDELETE "http://127.0.0.1:9200/_search/scroll" -H 'Content-Type: application/json' -d'
{
"scroll_id" : "i1BOVdVWjJyU..."
}'

​ 当遍历大量数据时,可通过 slice scroll 加快速度

3、Search After

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
shell复制代码curl -XGET "http://127.0.0.1:9200/demo/_search" -H 'Content-Type: application/json' -d'
{
"from": 0,//from 永远从0开始
"size": 10,
"query": {
"match_all": {}
},
"search_after": [ //初次不传,后续使用上次最后元素的排序值作为参数
674688494483205
],
"sort": [
{
"_id": { //稳定排序,建议最后排序字段用 _id
"order": "desc"
}
}
]
}'

​ 需要设置排序字段,并且是稳定排序,通常最后排序字段是 _id。记录上页最后元素的排序值,作为下次的查询条件。相比 scroll 是服务端无状态的,不占用 ES 资源。

总结

​ 本文带大家简单了解 ES 的一些基本系统概念以及写入和搜索的大致流程,看起来是不是和大多数分布式数据库大同小异呢?后续我们将逐步深入,剖析 ES 原理以及性能方面。

参考:

ES 官网 www.elastic.co/guide/en/el…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

LeetCode-2 两数相加 - 19

发表于 2021-11-18

「这是我参与11月更文挑战的18天,活动详情查看:2021最后一次更文挑战」

给你两个 非空 的链表,表示两个非负的整数。它们每位数字都是按照 逆序 的方式存储的,并且每个节点只能存储 一位 数字。

请你将两个数相加,并以相同形式返回一个表示和的链表。

你可以假设除了数字 0 之外,这两个数都不会以 0 开头。

来源:力扣(LeetCode) 链接:leetcode-cn.com/problems/ad…

图片.png

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
ini复制代码输入:l1 = [2,4,3], l2 = [5,6,4]
输出:[7,0,8]
解释:342 + 465 = 807.

示例 2:

输入:l1 = [0], l2 = [0]
输出:[0]

示例 3:

输入:l1 = [9,9,9,9,9,9,9], l2 = [9,9,9,9]
输出:[8,9,9,9,0,0,0,1]



提示:

每个链表中的节点数在范围 [1, 100] 内
0 <= Node.val <= 9
题目数据保证列表表示的数字不含前导零

来源:力扣(LeetCode)
链接:https://leetcode-cn.com/problems/add-two-numbers

思路

  • 首先这道题是一道模拟题,和这道题相同难度的类似,大数模拟问题
    • 这道题是结合链表去出的
    • 所以,我们在处理这道题的时候,需要预先处理好几个点
    1. 哑节点问题,先创建一个节点,让变量指向这个节点,进行持续遍历
    2. 进位问题,我们需要保证每次的进位补零0️⃣问题
    3. 边界问题,这道题只要满足两个链表一个不为空就可以
    • 下面代码的sum是每次的下一个节点的值,而cc代表进位
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
java复制代码/**
* Definition for singly-linked list.
* public class ListNode {
* int val;
* ListNode next;
* ListNode() {}
* ListNode(int val) { this.val = val; }
* ListNode(int val, ListNode next) { this.val = val; this.next = next; }
* }
*/
class Solution {
public ListNode addTwoNumbers(ListNode l1, ListNode l2) {
ListNode pre = new ListNode(0);
ListNode cur = pre;
int cc = 0;
while (l1 != null || l2 != null) {
int x = l1 == null ? 0:l1.val;
int y = l2 == null ? 0:l2.val;
int sum = x + y + cc;
cc = sum / 10;
sum = sum % 10;
cur.next = new ListNode(sum);
cur = cur.next;
if (l1 != null) l1 = l1.next;
if (l2 != null) l2 = l2.next;
}
if (cc == 1) cur.next = new ListNode(cc);
return pre.next;
}
}

图片.png

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

简述消息队列在电商系统使用场景以及工作模式

发表于 2021-11-18

概述

消息队列(Message Queue),是分布式系统中重要的组件,是一种进程间通信或者是同一进程的不同线程的通信方式。和 http 同步协议不同的是,消息队列是一种异步的通信协议,不需要立即获得结果。

消息队列的使用场景

  • 异步处理
  • 流量控制
  • 应用解耦

应用解耦

消息队列的一个作用就是实现系统应用之间的解耦。举例一下电商系统的中的订单系统。
当创建一个订单时:

  1. 发起支付
  2. 扣减库存
  3. 发消息告知用户
  4. 更新统计数据

这些订单下游的系统都需要实时获得订单数据,随着业务量的增大和业务的变更,有一段时间不需要发消息给客户,或者需要添加功能,每次都需要不断的调式订单系统和下游系统。

引入消息队列后,订单服务在创建订单时发送一条信息到消息队列主题 Order 中,所有的下游都订阅主题Order,这样无论增加、减少下游系统还是下游系统的功能如何变化,订单服务都不需要做更改了,实现了订单服务和下游服务的解耦。

异步处理

异步处理是将很多串行进行的步骤转成异步处理,还是已订单系统为例,下单订单需要创建订单和锁定库存,确定本次请求后马上给用户返回响应,然后把后续请求的数据的都在消息队列,由消息队列异步处理。

这样把五个步骤减少为两个步骤,假设每个步骤处理时间需要500ms,在不考虑网络延迟的情况下:

串行处理: 500 * 5 = 2500ms
并行处理:500 * 2 = 1000ms

系统响应时间缩短一半以上。这样响应速度更快,而且把请求放在后续操作,可以充分利用更多的资源处理请求。
所以我们可以看到,实现异步操作的服务:

  • 更快地返回结果
  • 减少等待时间,提升系统总体性能

流量控制

在购物网站的做一个秒杀活动,平时网站能支撑每秒1000次并发请求,但是电商秒杀一下请求猛增到每秒3000次请求,多出来的请求,可能直接让系统宕机。
所以我们就需要使用消息队列来控制流量,当系统短时间接收到大量请求时,会先将请求堆积到消息队列上,后端服务从消息队列上消费数据,消息队列相对于给后端服务做了一次缓冲。

优缺点

上面的概述总结起来有个三个优点:异步、削峰和解耦。
缺点有以下几个:

  • 系统可用性降低
  • 增加系统复杂度
  • 可能会数据一致性问题,比如数据丢失,数据重复传输

RabbitMQ消息队列五种工作模式

在rabbitmq官网教程上介绍了几种工作模式,

简单(simple)模式

The simplest thing that does something

从上面的示意图看出来 simple 模式有以下几个特征:

  • 只有一个生产者、一个消费者和一个队列
  • 生产者和消费者在发送和接收消息时,只需要指定队列名称,而不需要发送那个 Exchange 交换机。

工作(Work)模式


在多个消费者之间分配任务(竞争消费者模式)
创建一个工作队列,添加多个消费者共同消费工作队列上的任务。每一个消息都给一个消费者消费

发布订阅(Publish/Subscribe)模式


工作模式中每个消息只能被一个消费者消费,发布订阅模式是每个消息同时给多个消费者消费。
上图中的X表示Exchange 交换器,Exchange 类型有:Direct、Topic、Headers和Fanout。

  • 发布订阅用的是 Fanout
  • Fanout 是不需要指定具体的队列名,Exchange 会将消息转发所有的绑定的队列

路由(Routing)模式


路由模式中的交换器类型为 direct,在同一个交换器,由生产者指定指定目标队列。

  • 指定规则按照 RoutingKey 指定
  • 消费者通过 BindingKey 绑定自己接收消息队列
  • 只有 RoutingKey 和 BindingKey 匹配队列才会收到信息

RoutingKey 是生产者指定 Exchange 路由到哪个队列,BindingKey 用于消费者绑定到某个队列

主题(Topic)模式


主题模式是根据通配符绑定队列,其中 * 可以替换任务一个标识符,# 可以替换多个标识符,通配符和名称使用 . 隔开。

  • 主题模式的 Exchange 类型为 topic
  • 每个消息可以被多个队列消费

参考

  • 为什么需要消息队列?
  • 消息队列及常见消息队列介绍

如果觉得文章对你有帮助的话,请点个赞吧!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

RocketMQ—Producer(一)启动流程解密 一、P

发表于 2021-11-18

以下为RocketMQ—生产者系列文章索引:

image.png

一、Producer介绍

Producer 是 RocketMQ 消息的投递者,负责生产消息。它会与NameServer集群中的其中一个节点(随机)建立长连接(Keep-alive),定期从NameServer读取Topic路由信息,将路由信息保存在本地内存中;它向提供Topic服务的Master Broker建立长连接,且定时向Master Broker发送心跳;它只会向Master Broker发送消息,从Message Queue列表中选择合适的Queue发送消息,实现负载均衡;它支持发送消息类型有多种,例如:普通消息、事物消息、定时消息等;它发送消息的方式支持三种:同步、异步、单向方式等.可简单查看生产端与Master Broker 和NameServer简单交互图:

image.png

备注: 生产者还可向broker查询消息等其他功能交互。

二、生产者启动流程:

在了解具体生产启动流程之前,我们先提出出几个问题,带着问题去分析源码:

  1. 消息生产者启动时具体做了什么?
  2. 一个应用需要发送多个topic,不同topic需要发送到不同集群的broker,如何处理?

我们可先了解和分析生产者相关的类图关系:

image.png

从类图中可以看出,MQProducer有两种实现方式。

一个是 DefaultMQProducer(非事务消息生产者); 一个是 TransactionMQProducer(支持事务消息);

接下来先对接个类核心参数或方法进行简单分析:

##2.1 MqAdmin

MqAdmin:核心方法解析(Mq管理基础接口)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
arduino复制代码//创建一个主题
void createTopic(final String key, final String newTopic, final int queueNum) throws MQClientException;
//根据 时间戳从队列中 查找其偏移量
long searchOffset(final MessageQueue mq, final long timestamp) throws MQClientException;
//查找该消息 队列中 最大的物理偏移量
long maxOffset(final MessageQueue mq) throws MQClientException;
//查找该消息队列中最小物理偏移量。
long minOffset(final MessageQueue mq) throws MQClientException;
//获取最早的存储消息时间
long earliestMsgStoreTime(final MessageQueue mq) throws MQClientException;
//根据消息偏移量查找消息
MessageExt viewMessage(final String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
//根据条件查询消息
QueryResult queryMessage(final String topic, final String key, final int maxNum, final long begin, final long end) throws MQClientException, InterruptedException;
//根据 主题 与 消息ID 查找消息 。
MessageExt viewMessage(String topic, String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;

2.2 核心方法解析

MQProducer:核心方法解析(生产者基础接口):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码//启动
void start() throws MQClientException;
//关闭
void shutdown();
//根据topic获取对应队列信息
List<MessageQueue> fetchPublishMessageQueues(final String topic) throws MQClientException;
//同步-消息发送
SendResult send(final Message msg, final MessageQueue mq) throws MQClientException,RemotingException, MQBrokerException, InterruptedException;
//异步-消息发送
void send(final Message msg, final MessageQueueSelector selector, final Object arg, final SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException;
//同步-选择队列消息发送
SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg, final long timeout) throws MQClientException, RemotingException, MQBrokerException,InterruptedException;
//单向-消息发送
void sendOneway(final Message msg, final MessageQueue mq) throws MQClientException, RemotingException, InterruptedException;
//事务消息-发送
TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException;
//批量消息-发送
SendResult send(final Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;

备注: 其中启动start()和关闭shutdown()表示生产者的启动和关闭、

2.3 clientConfig

clientConfig:核心属性方法解析(客户端配置)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
typescript复制代码//nameServer-地址,默认从:系统属性:rocketmq.namesrv.addr 或 环境变量:NAMESRV_ADDR 中获取
private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));
//实例名字,默认:DEFAULT 或者 系统属性-rocketmq.client.name
private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");
//构建 mq客户端的 id,例子:ip@instanceName@unitName : 172.16.62.75@19312@unitName
public String buildMQClientId() {
StringBuilder sb = new StringBuilder();
sb.append(this.getClientIP());
sb.append("@");
sb.append(this.getInstanceName());
if (!UtilAll.isBlank(this.unitName)) {
sb.append("@");
sb.append(this.unitName);
}
return sb.toString();
}

//设置namesrv地址
public void setNamesrvAddr(String namesrvAddr) {
this.namesrvAddr = namesrvAddr;
}

备注: namesrvAddr表示nameServer地址,可调用setNamesrvAddr方法设置,或者通过环境变量、系统属性设置;buildMQClientId表示设置生产者Id.

三、TransactionMQProducer:(事务消息,后续单独讲解,本章忽略)

(略)

四、DefaultMQProducer 核心属性方法解析:(非事务消息生产者)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码// 构造器
public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {
this.producerGroup = producerGroup;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
}
// 各种发送消息
public SendResult send(
Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(msg);
}
// 启动方法
public void start() throws MQClientException {
this.defaultMQProducerImpl.start();
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
}
}

备注: DefaultMQProducer的构造器,send和start等相关的方法,其实都是围绕DefaultMQProducerImpl来转,defaultMQProducerImpl:默认生产者的实现类,其start方法作为生产者启动的核心方法,接下来将核心分析其start方法的实现.

DefaultMQProducerImpl#start

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
kotlin复制代码/**
* mq-producer 启动
* @param startFactory
* @throws MQClientException
*/
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
// 0-服务状态设置
this.serviceState = ServiceState.START_FAILED;
//1-检测配置
this.checkConfig();
//2-并改变生产者的 instanceName为进程 ID。
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
//3-创建 MQClientlnstance实例
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
//4-向 MQClientlnstance注册生产者。
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
//5-默认topic信息缓存( this.defaultMQProducer.getCreateTopicKey() = 'TBW102' )
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
//6-是否启动-mQClientFactory
if (startFactory) {
mQClientFactory.start();
}

log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
this.serviceState = ServiceState.RUNNING;//设置状态为 运行中
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
//7-发送心跳到所有broker
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}

分析如下:

0-服务状态设置:

设置状态值的意义是为了防止重复启动,其枚举类为:ServiceState; 如果初始化状态不等于:CREATE_JUST,则异常跑出

1-检测配置:

1
2
3
4
5
6
7
8
9
10
11
csharp复制代码private void checkConfig() throws MQClientException {
Validators.checkGroup(this.defaultMQProducer.getProducerGroup());
if (null == this.defaultMQProducer.getProducerGroup()) {
throw new MQClientException("producerGroup is null", null);
}
//生产所属组 不能等于 DEFAULT_PRODUCER
if (this.defaultMQProducer.getProducerGroup().equals(MixAll.DEFAULT_PRODUCER_GROUP)) {
throw new MQClientException("producerGroup can not equal " + MixAll.DEFAULT_PRODUCER_GROUP + ", please specify another one.",
null);
}
}

备注:为了检测-producerGroup的合法性

2-并改变生产者的instanceName为进程 ID。

1
2
3
4
5
6
7
8
9
10
11
csharp复制代码// 判断producerGroup是否等于CLIENT_INNER_PRODUCER
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}

是调用ClientConfig#changeInstanceNameToPID
publicvoid changeInstanceNameToPID() {
if (this.instanceName.equals("DEFAULT")) {
this.instanceName = String.valueOf(UtilAll.getPid());
}
}

备注:instanceName == DEFAULT, 将其改为 启动的 进程ID,目的是为了MQClientInstance的构建

3-创建MQClientlnstance实例

MQClientManager管理MQClientInstance,其内部维护的数据结构为:ConcurrentHashMap,key:clientId,且MQClientManager本身是单例模式,核心方法分析如下: MQClientManager

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
ini复制代码private static MQClientManager instance = new MQClientManager();//-单列模式
private AtomicInteger factoryIndexGenerator = new AtomicInteger();//index的工厂
// MQClientInstance 缓存
private ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable = new ConcurrentHashMap<String, MQClientInstance>();

//构建返回MQClientInstance
public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
String clientId = clientConfig.buildMQClientId();//构建mq客户端的 id
MQClientInstance instance = this.factoryTable.get(clientId);
if (null == instance) {
instance = new MQClientInstance(clientConfig.cloneClientConfig(),
this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
if (prev != null) {
instance = prev;
log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
} else {
log.info("Created new MQClientInstance for clientId:[{}]", clientId);
}
}
return instance;
}

备注:

ClientConfig.buildMQClientId 在上面已分析,是为了构建clientId;getAndCreateMQClientInstance此方法的目的就是为了构建或查询MQClientInstance. MQClientInstance:封装了 RocketMQ 网络处理 API,是消息生产者( Producer)、消息消费者 (Consumer)与 NameServer、 Broker打交道的网络通道.

接下来分析多个生产者公用同一个MQClientInstance的优点和缺点:

  1. 优点:一般来讲,为了减少客户端的使用资源,如果将所有的instanceName和unitName设置为同样的值,就会只创建一个MQClientInstance实例(用于生产者的topic发送消息在同一套broker集群)
  2. 缺点:如果多个topic复用MQClientInstance会有怎么的结果呢?这种情况会出现在你在一个JVM里启动了多个Producer时,且没有设置instanceName和unitName,那么这两个Producer会公用一个MQClientInstance,发送的消息会路由到同一个集群。

例如,你起了两个Producer,并且配置的NameServer地址不一样,本意是让这两个Producer往不同集群上分配消息,但是由于共用了一个MQClientInstance,这个MQClientInstance是基于先来的Producer配置构建的,第二个Producer和他公用后被认为是同一instance,配置是相同的,消息的路由就是相同的,就没有达到你想要的效果。

4-向MQClientInstance注册生产者。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码//key:group,    value: 生产者
private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
// 将当前生产者加入到 MQClientlnstance管理中
public boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {
if (null == group || null == producer) {
return false;
}
MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);
if (prev != null) {
log.warn("the producer group[{}] exist already.", group);
return false;
}
return true;
}

备注:DefaultMQProducerImpl实现的接口类为:MQProducerInner

5-添加默认topic信息缓存,此处需要理解topicPublishInfoTable数据结构的意思

1
2
3
arduino复制代码//key:topic value:TopicPublishInfo-路由相关信息,用于消息发送
private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
new ConcurrentHashMap<String, TopicPublishInfo>();

TopicPublishInfo:

分析,熟悉的佩服熟悉的味道,MessageQueue和TopicRouteData在NameServer已分析相当清除,分析如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
kotlin复制代码public class TopicPublishInfo {
//是否是顺序消息
private boolean orderTopic = false;
//是否包含路由信息
private boolean haveTopicRouterInfo = false;
//该主题队列的消息队列
private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
//每选择一次消息 队列, 该值会自增1,如果 Integer.MAX_VALUE, 则重置为 0,用于选择消息队列。
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
//路由信息
private TopicRouteData topicRouteData;
//选择队列方法,lastBrokerName其实上一次发送失败的brokerName,如果不为空,本次选择队列发送所在的brokerName则选择其他的brokerName
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
if (lastBrokerName == null) {
return selectOneMessageQueue();
} else {
//如果消息发 送再失败的话 , 下次进行 消息队列选择 时规避上次 MesageQueue 所 在的 Broker, 否 则还 是 很有可能再次失败 。
int index = this.sendWhichQueue.getAndIncrement();
for (int i = 0; i < this.messageQueueList.size(); i++) {
int pos = Math.abs(index++) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
MessageQueue mq = this.messageQueueList.get(pos);
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
return selectOneMessageQueue();
}
}
//直接用 sendWhichQueue 自增再获取值 , 与当前路由 表 中消息 队列个数取模, 返回该位置的 MessageQueue(selectOneMessageQueue()方法)
public MessageQueue selectOneMessageQueue() {
int index = this.sendWhichQueue.getAndIncrement();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
return this.messageQueueList.get(pos);
}
//messageQueueList-长度大于0
public boolean ok() {
return null != this.messageQueueList && !this.messageQueueList.isEmpty();
}
//是否包含路由信息
public boolean isHaveTopicRouterInfo() {
return haveTopicRouterInfo;
}

6-启动-MQClientInstance

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
kotlin复制代码     MQClientInstance#start
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED; //1>状态-设置启动失败
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) { //2>判断nameSrvAddr地址是否为空,http获取nameSrvAddr
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel : netty
this.mQClientAPIImpl.start(); // 3>启动netty相关
// Start various schedule tasks
this.startScheduledTask(); //4>【重要】启动相关定时任务
// Start pull service
this.pullMessageService.start(); //5>消费端相关,后续讲解
// Start rebalance service
this.rebalanceService.start(); //6>消费端相关,后续讲解
// Start push service
this.defaultMQProducer.getDefaultMQProducerImpl().start(false); //7>内部启动一个mqProducter,startFactory=false
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING; >8 状态设置运行中
break;
case RUNNING:
break;
case SHUTDOWN_ALREADY:
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}

备注:后续单独讲解:this.startScheduledTask();

7-发送心跳到所有broker

(this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
kotlin复制代码        public void sendHeartbeatToAllBrokerWithLock() {
if (this.lockHeartbeat.tryLock()) {
try {
this.sendHeartbeatToAllBroker(); 1> 发送心跳到所有broker
this.uploadFilterClassSource(); 2> 更新过滤filterSource 可忽略不看
} catch (final Exception e) {
log.error("sendHeartbeatToAllBroker exception", e);
} finally {
this.lockHeartbeat.unlock();
}
} else {
log.warn("lock heartBeat, but failed.");
}
}

备注:sendHeartbeatToAllBroker,相对简单,

对返回结果维护了brokerVersionTable(ConcurrentHashMap),你不可错过.因为会有定时任务定时发送心跳至所有broker

小结: 通过7个步骤我们已经了解到生产者的启动流程,大致分为:检测相关配置、注册构建相关类(例如:MQClientInstance相关、netty相关等)、然后启动相关定时任务;简单总结生产者的启动流程,如下:

image.png


本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Flask 入门系列之 数据库集成!

发表于 2021-11-18

这是我参与11月更文挑战的第18天,活动详情查看:2021最后一次更文挑战。

数据库是大多数 Web 应用的基础设施,只要想把数据存储下来,就离不开数据库,下面将一起学习一下如何给 Flask 应用添加数据库支持。

使用Flask-SQLAlchemy管理数据库

Flask-SQLAlchemy 集成了 SQLAlchemy,它简化了连接数据库服务器、管理数据库及操作会话等各类工作,让 Flask 更方便的进行数据存储及处理,我们不必过多关心原生 SQL 语句,只需要使用 Python 类就可以轻松的完成对数据库表的增删改查操作,并且该插件还支持多种数据库类型,如MySQL、PostgreSQL、和SQLite等。

我们可以使用pip install flask-sqlalchemy进行安装。

配置Flask_SQLAlchemy

下面以 SQLite 数据库为例,Flask-SQLAlchemy 数据库的 url 通过配置变量SQLALCHEMY_DATABASE_URI指定,通过 Flask-SQLAlchemy 提供的 SQLAlchemy 类传入 Flask 的实例 app,创建 db 实例,表示程序使用的数据库,这个 db 对象能够使用 Flask-SQLAlchemy 的所有功能。

1
2
3
4
5
6
7
8
9
10
python复制代码from flask import Flask
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)

basedir = os.path.abspath(app.root_path)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///' + os.path.join(basedir, 'data.db')
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

db = SQLAlchemy(app)

其中,SQLALCHEMY_TRACK_ MODIFICATIONS配置变量表示是否追踪对象的修改,在此设为False。

定义数据库模型

所谓数据模型,就是用来映射数据库表的 Python 类,一个数据模型类对应数据库中的一个表,类的属性代表数据库表的字段。所有的模型类都需要继承 Flask-SQLAlchemy 提供db.Model基类。

新建model.py文件,定义User类如下:

1
2
3
4
5
6
7
python复制代码from app import db


class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
user_name = db.Column(db.String)
password = db.Column(db.String)

创建数据库和表

运行 Flask 应用,然后在终端中输入flask shell命令,进入 Python 交互环境,如下所示:

1
2
3
python复制代码>>> from app import db  # 从app.py中导入db实例
>>> from model import * # 从model.py中导入所有模型类
>>> db.create_all() # 用db.create_all()创建数据库表

执行完之后,会在项目根目录生成一个data.db库文件。
注意:
数据库和表一旦创建后,之后对模型类的改动不会自动作用到实际的表中,比如,在模型类中添加或删除字段,修改字段的名称和类型,再次调用db.create_all()不会重新创建表或是更新表,只有通过db.drop_all()删除数据库中所有的表之后再调用db.create_all()才能重新创建表,那么就会出现这么一个问题:

这样操作的话,数据库表被删除重建了,那表中原有的数据也都没有了,这肯定是不行的,这时就出现了数据库迁移的概念,先留个坑,下篇文章介绍。

数据库操作

现在我们创建了模型,也生成了数据库和表,接下来就来学习一下常用的数据库操作。数据库操作主要是CRUD(Create 创建、Read 读取/查询、 Update 更新、Delete 删除)。

Create

添加一条记录到数据库中,主要分为以下三步:

  • 使用 Python 模型类创建对象作为一条记录
  • 添加新创建的对象到数据库会话中
  • 提交数据库会话
    如下,在上面的交互环境下,创建一个新用户:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
python复制代码>>> from app import db  # 从app.py中导入db实例
>>> from model import User # 导入模型类User
>>> user1=User(user_name='tigeriaf', password='123456') # 创建用户1
>>> user2=User(user_name='admin', password='123456') # 创建用户2
# 添加新创建的对象到数据库会话中
>>> db.session.add(user1)
>>> db.session.add(user2)
# 将数据库会话提交,数据写入data.db文件
>>> db.session.commit()
# 测试
>>> print(user1.id)
1
>>> print(user2.id)
2

image.png

另外,除了依次调用add()方法添加记录,也可以使用add_all()一次添加包含多个模型类对象的列表。

Read

使用模型类

提供的 query 属性
通过模型类提供的 query 属性附加调用各种过滤方法可以查询数据库表的数据,查询模式如下:

<模型类>.query.<过滤方法>.<查询方法>

从某个模型类出发,通过在 query 属性对应的 Query 对象上附加的过滤方法和查询函数对模型类对应的表中的记 进行各种筛选等,最终返回包含对应数据库记录数据的模型类实例,对返回的实例调用属性即可获取对应的字段数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
python复制代码# 查询全部
>>> User.query.all()
[<User 1>, <User 2>]
# 查询指定id的记录
>>> user1=User.query.get(1)
>>> user1.user_name
'tigeriaf'
# 查询条数
>>> User.query.count()
2
# 查询user_name为admin的用户
>>> User.query.filter_by(user_name='admin').all()
[<User 2>]

SQLAlcherny 提供了很多过滤方法,使用这些过滤方法可以获取更精确的查询,这里就不展开了。
完整的查询、过滤方法可以查看:The Query Object。

Update

更新一条记录非常简单,直接给模型类的属性附上新的值,然后调用commit()方法提交会话即可。
如下,修改 id 为 2 的用户的用户名 user_name。

1
2
3
4
5
6
7
python复制代码>>> user2=User.query.get(2)
>>> user2.user_name
'admin'
>>> user2.user_name='张三'
>>> db.session.commit()
>>> user2.user_name
'张三'

Delete

使用delete()进行数据记录的删除,如下:

1
2
3
4
5
python复制代码>>> db.session.delete(user2)
>>> db.session.commit()
>>> user2=User.query.get(2)
>>> print(user2)
None

在视图函数里操作数据库

在视图函数里操作数据库的方式在 Python Shell 交互环境下大致是相同的,无非就是多了从请求对象获取数据及验证数据的步骤,如下一个案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
python复制代码@app.route('/user', methods=['GET', 'POST'])
def user():
if request.method == 'POST':
user_name = request.form['user_name']
password = request.form['password']
user = User(user_name=user_name, password=password)
db.session.add(user)
db.session.commit()
return 'user:{} add success! id:{}'.format(user_name, user.id)

else:
user_id = request.args.get('user_id')
user = User.query.get(user_id)
if user:
return 'Hello user:{}!'.format(user.user_name)
else:
return 'failed'

上述代码中,视图函数 user 接受两种方式的请求,分别完成添加用户、查询用户的功能,将接收数据存储在数据库中。

发送请求测试如下:
image.png

image.png

原创不易,如果小伙伴们觉得有帮助,麻烦点个赞再走呗~

最后,感谢女朋友在工作和生活中的包容、理解与支持 !

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

那一天,我被Redis主从架构支配的恐惧

发表于 2021-11-18

面试官:要不你来讲讲你最近在看的点呗?可以拉出来一起讨论下(今天我也不知道要问什么)

候选者:最近在看「Redis」相关的内容

面试官:嗯,我记得已经问过Redis的基础和持久化了

面试官:要不你来讲讲你公司的Redis是什么架构的咯?

候选者:我前公司的Redis架构是「分片集群」,使用的是「Proxy」层来对Key进行分流到不同的Redis服务器上

候选者:支持动态扩容、故障恢复等等…

面试官:那你来聊下Proxy层的架构和基本实现原理?

候选者:抱歉,这块由中间件团队负责,具体我也没仔细看过

候选者:…

面试官:….

候选者:不过,我可以给你讲讲现有常见开源的Redis架构(:

面试官:那只能这样了,好吧,你开始吧

候选者:那我从基础讲起吧?

候选者:在之前提到了Redis有持久化机制,即便Redis重启了,可以依靠RDB或者AOF文件对数据进行重新加载

候选者:但在这时,只有一台Redis服务器存储着所有的数据,此时如果Redis服务器「暂时」没办法修复了,那依赖Redis的服务就没了

候选者:所以,为了Redis「高可用」,现在基本都会给Redis做「备份」:多启一台Redis服务器,形成「主从架构」

候选者:「从服务器」的数据由「主服务器」复制过去,主从服务器的数据是一致的

候选者:如果主服务器挂了,那可以「手动」把「从服务器」升级为「主服务器」,缩短不可用时间

面试官:那「主服务器」是如何把自身的数据「复制」给「从服务器」的呢?

候选者:「复制」也叫「同步」,在Redis使用的是「PSYNC」命令进行同步,该命令有两种模型:完全重同步和部分重同步

候选者:可以简单理解为:如果是第一次「同步」,从服务器没有复制过任何的主服务器,或者从服务器要复制的主服务器跟上次复制的主服务器不一样,那就会采用「完全重同步」模式进行复制

候选者:如果只是由于网络中断,只是「短时间」断连,那就会采用「部分重同步」模式进行复制

候选者:(假如主从服务器的数据差距实在是过大了,还是会采用「完全重同步」模式进行复制)

面试官:那「同步」的原理过程可以稍微讲下嘛?

候选者:嗯,没问题的

候选者:主服务器要复制数据到从服务器,首先是建立Socket「连接」,这个过程会干一些信息校验啊、身份校验啊等事情

候选者:然后从服务器就会发「PSYNC」命令给主服务器,要求同步(这时会带「服务器ID」RUNID和「复制进度」offset参数,如果从服务器是新的,那就没有)

候选者:主服务器发现这是一个新的从服务器(因为参数没带上来),就会采用「完全重同步」模式,并把「服务器ID」(runId)和「复制进度」(offset)发给从服务器,从服务器就会记下这些信息。

面试官:嗯…

候选者:随后,主服务器会在后台生成RDB文件,通过前面建立好的连接发给从服务器

候选者:从服务器收到RDB文件后,首先把自己的数据清空,然后对RDB文件进行加载恢复

候选者:这个过程中,主服务器也没闲着(继续接收着客户端的请求)

面试官:嗯…

候选者:主服务器把生成RDB文件「之后修改的命令」会用「buffer」记录下来,等到从服务器加载完RDB之后,主服务器会把「buffer」记录下的命令都发给从服务器

候选者:这样一来,主从服务器就达到了数据一致性了(复制过程是异步的,所以数据是『最终一致性』)

面试官:嗯…

面试官:那「部分重同步」的过程呢?

候选者:嗯,其实就是靠「offset」来进行部分重同步。每次主服务器传播命令的时候,都会把「offset」给到从服务器

候选者:主服务器和从服务器都会将「offset」保存起来(如果两边的offset存在差异,那么说明主从服务器数据未完全同步)

候选者:从服务器断连之后,就会发「PSYNC」命令给主服务器,同样也会带着RUNID和offset(重连之后,这些信息还是存在的)

面试官:嗯…

候选者:主服务器收到命令之后,看RUNID是否能对得上,对得上,说明这可能以前就复制过一部分了

候选者:接着检查该「offset」是否在主服务器记录的offset还存在

候选者:(这里解释下,因为主服务器记录offset使用的是一个环形buffer,如果该buffer满了,会覆盖以前的记录)

候选者:如果找到了,那就把从缺失的一部分offer开始,把对应的修改命令发给从服务器

候选者:如果从环形buffer没找到,那只能使用「完全重同步」模式再次进行主从复制了

面试官:主从复制这块我了解了,那你说到现在,Redis主库如果挂了,你还是得「手动」将从库升级为主库啊

面试官:你知道有什么办法能做到「自动」进行故障恢复吗?

候选者:必须的啊,接下来就到了「哨兵」登场了

面试官:开始你的表演吧。

候选者:「哨兵」干的事情主要就是:监控(监控主服务器的状态)、选主(主服务器挂了,在从服务器选出一个作为主服务器)、通知(故障发送消息给管理员)和配置(作为配置中心,提供当前主服务器的信息)

候选者:可以把「哨兵」当做是运行在「特殊」模式下的Redis服务器,为了「高可用」,哨兵也是集群架构的。

候选者:首先它需要跟Redis主从服务器创建对应的连接(获取它们的信息)

候选者:每个哨兵不断地用ping命令看主服务器有没有下线,如果主服务器在「配置时间」内没有正常响应,那当前哨兵就「主观」认为该主服务器下线了

候选者:其他「哨兵」同样也会ping该主服务器,如果「足够多」(还是看配置)的哨兵认为该主服务器已经下线,那就认为「客观下线」,这时就要对主服务器执行故障转移操作。

面试官:嗯…

候选者:「哨兵」之间会选出一个「领头」,选出领头的规则也比较多,总的来说就是先到先得(哪个快,就选哪个)

候选者:由「领头哨兵」对已下线的主服务器进行故障转移

面试官:嗯…

候选者:首先要在「从服务器」上挑选出一个,来作为主服务器

候选者:(这里也挑选讲究,比如:从库的配置优先级、要判断哪个从服务器的复制offset最大、RunID大小、跟master断开连接的时长…)

候选者:然后,以前的从服务器都需要跟新的主服务器进行「主从复制」

候选者:已经下线的主服务器,再次重连的时候,需要让他成为新的主服务器的从服务器

面试官:嗯…我想问问,Redis在主从复制的和故障转移的过程中会导致数据丢失吗

候选者:显然是会的,从上面的「主从复制」流程来看,这个过程是异步的(在复制的过程中:主服务器会一直接收请求,然后把修改命令发给从服务器)

候选者:假如主服务器的命令还没发完给从服务器,自己就挂掉了。这时候想要让从服务器顶上主服务器,但从服务器的数据是不全的(:

候选者:还有另一种情况就是:有可能哨兵认为主服务器挂了,但真实是主服务器并没有挂( 网络抖动),而哨兵已经选举了一台从服务器当做是主服务器了,此时「客户端」还没反应过来,还继续写向旧主服务器写数据

候选者:等到旧主服务器重连的时候,已经被纳入到新主服务器的从服务器了…所以,那段时间里,客户端写进旧主服务器的数据就丢了

候选者:上面这两种情况(主从复制延迟&&脑裂),都可以通过配置来「尽可能」避免数据的丢失

候选者:(达到一定的阈值,直接禁止主服务器接收写请求,企图减少数据丢失的风险)

面试官:要不再来聊聊Redis分片集群?

候选者:嗯…分片集群说白了就是往每个Redis服务器存储一部分数据,所有的Redis服务器数据加起来,才组成完整的数据(分布式)

候选者:要想组成分片集群,那就需要对不同的key进行「路由」(分片)

候选者:现在一般的路由方案有两种:「客户端路由」(SDK)和「服务端路由」(Proxy)

候选者:客户端路由的代表(Redis Cluster),服务端路由的代表(Codis)

面试官:要不来详细讲讲它们的区别呗?

候选者:今天有点儿困了,要不下次呗?

本文总结:

  • Redis实现高可用:
+ AOF/RDB持久化机制
+ 主从架构(主服务器挂了,手动由从服务器顶上)
+ 引入哨兵机制自动故障转义
  • 主从复制原理:
+ PSYNC命令两种模式:完全重同步、部分重同步
+ 完全重同步:主从服务器建立连接、主服务器生成RDB文件发给从服务器、主服务器不阻塞(相关修改命令记录至buffer)、将修改命令发给从服务器
+ 部分重同步:从服务器断线重连,发送RunId和offset给主服务器,主服务器判断offset和runId,将还未同步给从服务器的offset相关指令进行发送
  • 哨兵机制:
+ 哨兵可以理解为特殊的Redis服务器,一般会组成哨兵集群
+ 哨兵主要工作是监控、告警、配置以及选主
+ 当主服务器发生故障时,会「选出」一台从服务器来顶上「客观下线」的服务器,由「领头哨兵」进行切换
  • 数据丢失:
+ Redis的主从复制和故障转移阶段都有可能发生数据丢失问题(通过配置尽可能避免)

欢迎关注我的微信公众号【Java3y】来聊聊Java面试,对线面试官系列持续更新中!

【对线面试官-移动端】系列 一周两篇持续更新中!

【对线面试官-电脑端】系列 一周两篇持续更新中!

原创不易!!求三连!!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

小码农教你复制带随机指针的链表 复制带随机指针的链表

发表于 2021-11-18

「这是我参与11月更文挑战的第18天,活动详情查看:2021最后一次更文挑战」

复制带随机指针的链表

天下傻逼独一个就是我,我忘记了选c语言,用c++结果错误看一脸懵逼

image-20211028093751124

题目

image-20211027185429832

这题链表的复制难的地方就是随机指针random 如何复制他的指向,很多人的确想到了malloc节点,我也想到了malloc一个一个的节点,但是基本所有人都卡死在这里了,就是我们如何链,在这里我们就会又遇到先天性大佬的天赋思维力的压制,上一题是空间联想的压制。(来吧我小码农就喜欢给先天以及后天的大佬锤炼)

思想

普通拷贝

image-20211027202915998

实际上想到这样也已经了不起了,有点磨具雏形了

顺藤摸瓜(这个是真的奇兵记,暗度陈仓的感觉)

image-20211028004315304

脱裤子还需提裤子人(上面暗度陈仓,这个就是单刀直入)

image-20211028004358166

代码实现

cur为NULL的时候就是停止copy的时候
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
c复制代码	struct Node* cur = head;
if(!cur)
return NULL;
while(cur)
{
//每次我们都要malloc一个copy节点
struct Node* copy = (struct Node*)malloc(sizeof(struct Node));
//开始给数据
copy->val = cur->val;
//开始插入
copy->next = cur->next;
cur->next = copy;
//开始cur移动
cur = copy->next;
}

有人说为什么不在上面malloc节点的时候就链,因为那时候小孩还没出生你就让你的还在上学吗,没错为了卷你们,我准备偷偷生个小孩
1
2
3
4
5
6
7
8
9
10
11
12
c复制代码 //节点copy好后开始random相链
cur = head;
while(cur)
{
//重新把之前开辟的节点给copy维护
struct Node* copy = cur->next;
if(cur->random == NULL)
copy->random = NULL;
else
copy->random = cur->random->next;//上面的GIF的核心代码
cur = copy->next;
}

然后拿下来一个个尾插,就是单刀直入你要还原原来的链表顺序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
c复制代码 //开始一个一个尾插起来
struct Node* copyHead = NULL;
struct Node* copyTail = NULL;
cur = head;
while(cur)
{
//重新把之前开辟的节点给copy维护
struct Node* copy = cur->next;
//这边你需要一个next来还原原来链表顺序,你不能用完人家对人家不负责任
struct Node* next = copy->next;
if(copyHead == NULL)
{
copyHead = copy;
copyTail = copy;
}
else
{
copyTail->next = copy;
copyTail = copy;
}
cur->next = next;//这一步看你对原来链表负不负责任
cur = next;
}
return copyHead;

image-20211028094601284

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
c复制代码struct Node* copyRandomList(struct Node* head) {
struct Node* cur = head;
if(!cur)
return NULL;
while(cur)
{
//每次我们都要malloc一个copy节点
struct Node* copy = (struct Node*)malloc(sizeof(struct Node));
//开始给数据
copy->val = cur->val;
//开始插入
copy->next = cur->next;
cur->next = copy;
//开始cur移动
cur = copy->next;
}
//节点copy好后开始random相链
cur = head;
while(cur)
{
//重新把之前开辟的节点给copy维护
struct Node* copy = cur->next;
if(cur->random == NULL)
copy->random = NULL;
else
copy->random = cur->random->next;//上面的GIF的核心代码
cur = copy->next;
}
//开始一个一个尾插起来
struct Node* copyHead = NULL;
struct Node* copyTail = NULL;
cur = head;
while(cur)
{
//重新把之前开辟的节点给copy维护
struct Node* copy = cur->next;
//这边你需要一个next来还原原来链表顺序,你不能用完人家对人家不负责任
struct Node* next = copy->next;
if(copyHead == NULL)
{
copyHead = copy;
copyTail = copy;
}
else
{
copyTail->next = copy;
copyTail = copy;
}
cur->next = next;//这一步看你对原来链表负不负责任
cur = next;
}
return copyHead;
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

利用stream对map集合进行过滤

发表于 2021-11-18

「这是我参与11月更文挑战的第11天,活动详情查看:2021最后一次更文挑战」。

最近公司在大张旗鼓的进行代码审核,从中也发现自己写代码的不好习惯。一次无意的点到了公司封装的对map集合过滤的方法,发现了stream。于是研究了一下。并对原有的代码再次结合Optional进行重构下

原有方法说明

主要处理过滤条件Map对象,过滤掉了null和空字符串 等操作

  • 1.利用filter对null和空字符串进行过滤
  • 2.利用map进行对于Stream中包含的元素使用给定的转换函数进行转换操作
  • 3.collect进行聚合成map

由于公司的代码不合适自己展示,我这里自己仿照公司的写了个类似的简单方法,然后一步一步优化

自定义map工具类处理方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
typescript复制代码//这里的代码和原先的代码相比做了修改,去掉了map进行对于Stream中包含的元素使用给定的转换函数进行转换操作

public static Map<String, Object> parseMapForFilter(Map<String, Object> map) {
if (map == null) {
return null;
} else {
map = map.entrySet().stream()
.filter((e) -> checkValue(e.getValue()))
.collect(Collectors.toMap(
(e) -> (String) e.getKey(),
(e) -> e.getValue()
));
}
return map;
}

private static boolean checkValue(Object object) {

if (object instanceof String && "".equals(object)) {
return false;
}

if (null == object) {
return false;
}

return true;


}

测试下

1
2
3
4
5
6
7
8
9
10
11
csharp复制代码    public static void main(String[] args) {
Map<String,Object> params = new HashMap<>(16);

params.put("a","");
params.put("b",null);
params.put("c","c");

params = MapUtil.parseMapForFilter(params);
System.out.println(params);
System.out.println(MapUtil.parseMapForFilter(null));
}

输出结果

1
2
ini复制代码{c=c}
null

优化parseMapForFilter 方法,加入Optional类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
scss复制代码public static Map<String, Object> parseMapForFilterByOptional(Map<String, Object> map) {

return Optional.ofNullable(map).map(
(v) -> {
Map params = v.entrySet().stream()
.filter((e) -> checkValue(e.getValue()))
.collect(Collectors.toMap(
(e) -> (String) e.getKey(),
(e) -> e.getValue()
));

return params;
}
).orElse(null);
}

是不是感觉更清晰了呢?

测试一下

1
2
3
4
5
6
7
8
9
10
11
12
csharp复制代码    public static void main(String[] args) {
Map<String, Object> params = new HashMap<>(16);

params.put("a", "");
params.put("b", null);
params.put("c", "c");

params = MapUtil.parseMapForFilterByOptional(params);

System.out.println(params);
System.out.println(MapUtil.parseMapForFilterByOptional(null));
}

结果

1
2
ini复制代码{c=c}
null

优化checkValue方法

1
2
3
4
5
typescript复制代码    private static boolean checkValueByOptional(Object object) {
return (Boolean) Optional.ofNullable(object)
.filter((e) -> e instanceof String && e.equals("") ? false : true)
.orElse(false);
}

总结

大家是不是感觉lambda 写法更加简单明了,不再充满着if判断。但如果大家首写的时候,肯定感觉不习惯,我刚开始写的时候,也是很别扭,根本不知道怎么写。一点点去尝试。更多细节基础的问题这里不再叙述。

只是感觉这个对map的处理还是很好的。并且实际工作中用到的地方比较多。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Hi,你想要的在线创建架构图都在这儿!(一)

发表于 2021-11-18

这是我参与11月更文挑战的第11天,活动详情查看:2021最后一次更文挑战

🤞 个人主页:@青Cheng序员石头

🤞 粉丝福利:加粉丝群 一对一问题解答,获取免费的丰富简历模板、提高学习资料等,做好新时代的卷卷王!

在《一页纸创意思考术》这本书上,作者相当明确的表达了“不用怀疑,世界上所有的问题都可以用画图解决”!相较于文字,图不仅有助于自己清晰思考,更有助于让他人更容易理解你的想法。

一、前言

IT 项目负责人或架构师,甚至普通的软件开发人员的基本任务之一是创建应用程序关系图。它可以是应用程序流、基础结构图或软件设计等等。

对于专业的画图工具,我们其实再熟悉不过了,比如微软的Visio,它也许是我们接触的最早的专业画图工具了。但使用它毕竟要在本地电脑上进行安装配置,甚至需要为其付费,在使用上不够方便,在线的话语工具应运而生。

作为软件设计开发人员,如果我们能在几分钟内创建一个漂亮的专业软件或基础设施图,那将是多么棒的一件事情。今天这篇文章带大家了解几款流行的在线画图的工具。

二、Draw IO

首先强烈推荐一个免费的工具,它就是draw.io,使用它有助于我们创建流程图或任何具有大量形状的图表,并且能做到正确可视化我们想要表达的基础架构。它还有一个强大的功能,那就是提供了丰富的模板,让我们可以借鉴这些模板更好的表达我们的想法。

screenshot-app-diagrams-net-1637158559552.png
其支持云端本地等多种存储方式

screenshot-app-diagrams-net-1637159092348.png

screenshot-app-diagrams-net-1637158631886.png

借助模板创建部署图

总而言之,免费的、支持丰富模板、自由度很高,这样的在线画图软件不香?不过其确实也有缺点,比如分享、多人协作、思维导图是不支持的,这个就看使用场景了。

二、Process On

这个工具, process on是一个专业强大的作图工具,支持多人实时在线协作,可用于原型图、UML、BPMN、网络拓扑图等多种图形绘制。

其同样提供了丰富的模板,更加友好的画图体验等,并且还提供了活跃的社区、模板分享、团队模式等丰富的功能。

但其不完全免费,免费的体验受限于相当少的文件存储个数、有限的图导出格式、协作人员人数有限、思维导图功能简单等等。

screenshot-www-processon-com-diagraming-61414d521efad46826bd9f90-1637159911302.png

screenshot-www-processon-com-diagrams-new-1637159834375.png

接下来的在线创建架构图的工具还会有:

  • Terrastruct
  • gliffy
  • Edraw Max
  • Cacoo
  • Lucidchart
  • Creately
  • Coggle
  • Mindmeister
  • yED

朋友,你用的是哪一种呢?评论留言你常用的工具,你认为最好的在线画图工具!


少年,没看够?点击石头的详情介绍,随便点点看看,说不定有惊喜呢?欢迎支持点赞/关注/评论,有你们的支持是我更文最大的动力,多谢啦!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

堆的实现

发表于 2021-11-18

「这是我参与11月更文挑战的第17天,活动详情查看:2021最后一次更文挑战」

💦 堆的实现

1、堆向下调整算法
1
2
scss复制代码现在我们给出一个数组,逻辑上看做一颗完全二叉树。我们通过从根节点开始的向下调整算法可以把它调整成一个小堆。
向下调整算法有一个前提:左右子树必须是一个堆 (包括大堆和小堆),才能调整。

    ❗ 建堆 ❕

        有一个随机值的数组,把它理解成完全二叉树,并模拟成堆 (大堆/小堆)

    ——————————————————-Cut———————————————————

1
c复制代码int array[] = {27, 15, 19, 18, 28, 34, 65, 49, 25, 37}

    ❓ 观察上面这组数据 ❔

        根下面的左右子树都是小根堆,其实堆向下调整算法就是针对这种特殊数据结构

    ——————————————————-Cut———————————————————

    ❓ 针对于这种类型的数据应该怎么调堆 ❔

       思路:从根开始与左右孩子比较,如果孩子比父亲小,则两两交换位置,再继续往下调,直到左右孩子都比父亲大或者调到叶子具体见下图:

在这里插入图片描述
    ——————————————————-Cut———————————————————

    ❓ 如果不满足左右子树是堆,怎么调整 ❔

1
c复制代码int array[] = {27, 37, 28, 18, 19, 34, 65, 25, 49, 15}

    根的左右子树 37、28 都不满足:这里的想法就是先让左右子树先满足;而对于左右子树 37、28 来说又需要让 37 先满足;这样依此类推直至满足堆的条件。那干脆就从倒数的第一棵树,也就是倒数的第一个非叶子节点开始调整

在这里插入图片描述
在这里插入图片描述

2、堆的创建

    ❗ 这里从倒数的第一个非叶子节点开始调整 ❕

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
c复制代码#include<stdio.h>

//实现父子交换的函数
void Swap(int* px, int* py)
{
int temp = *px;
*px = *py;
*py = temp;
}
//实现调整
void AdjustDown(int* arr, int sz, int parent)
{
//确定左孩子的下标
int child = parent * 2 + 1;
//孩子的下标超出数组的范围就停止
while (child < sz)
{
//确定左右孩子中较小/大的那个
//左孩子大于右孩子,所以让child记录较小孩子的下标 || (arr[child]<arr[child+1]记录较大孩子的下标)
if (arr[child] > arr[child + 1] && child + 1 < sz)
{
child++; //(当只有一个左孩子时,会越界,且后面使用时会发生非法访问)
}
//判断父亲和小孩子
//小孩子小于父亲,则交换,且继续调整 || (arr[child]>arr[parent]大孩子大于父亲,则交换,且继续调整)
if (arr[child] < arr[parent])
{
Swap(&arr[child], &arr[parent]);
//迭代
parent = child;
//重新确定左孩子的下标(当最后的叶子节点是parent时,这时去确定child会以读的方式越界,但可以不关心)
child = parent * 2 + 1;
}
//小孩子大于父亲,则停止调整
else
{
break;
}
}
}
//堆排序 -> 效率更高
void HeapSort(int* arr, int sz)
{
//建堆
int i = 0;
//从最后一棵树开始调整,也就是最后一个节点的父亲
for (i = (sz - 1 - 1) / 2; i >= 0; i--)
{
AdjustDown(arr, sz, i);
}
}
int main()
{
//左右子树都为堆
int arr1[] = { 27, 15, 19, 18, 28, 34, 65, 49, 25, 37 };
//左右子树都为非堆
int arr2[] = { 27, 37, 28, 18, 19, 34, 65, 25, 49, 15 };

HeapSort(arr1, sizeof(arr1) / sizeof(arr1[0]));
int i = 0;
for (i = 0; i < sizeof(arr1) / sizeof(arr1[0]); i++)
{
printf("%d ", arr1[i]);
}

printf("\n");

HeapSort(arr2, sizeof(arr2) / sizeof(arr2[0]));
for (i = 0; i < sizeof(arr2) / sizeof(arr2[0]); i++)
{
printf("%d ", arr2[i]);
}
return 0;
}

    💨 输出结果:

      小堆
在这里插入图片描述

      大堆
在这里插入图片描述

3、堆的时间复杂度

      ❓ 证明建堆的时间复杂度是O(N) ❔

1
2
scss复制代码因为堆是完全二叉树,而满二叉树也是完全二叉树,此处为了简化使用满二叉树来证明
(时间复杂度本来看的就是近似值,多几个节点不影响最终结果)

在这里插入图片描述

建堆的调用次数用 T(N) 表示:(从最后一个非叶子节点 <也就是倒数第二层> 开始,最坏的情况下:倒数第二层每个节点最多能向下调 1 次;倒数第三层每个节点最多能向下调 2 次;倒数第四层每个节点最多能向下调 3 次;)

每层节点个数 ×\times× 最坏情况向下调整次数:

T(N) = 2^h-2^ ×\times× 1 + 2^h-3^ ×\times× 2 + … … + 2^0^ ×\times× (h-1)

这里我们从上至下开始

T(N) = 2^0^ ×\times× (h - 1) + 2^1^ ×\times× (h - 2) + 2^2^ ×\times× (h - 3) + 2^3^ ×\times× (h - 4) + … … + 2^h-3^ ×\times× 2 + 2^h-2^ ×\times× 1

    ❗ 错位相减法 ❕

      等号左右两边乘个 2 得到一个新公式,再用新公式减去旧的公式,具体见下图

在这里插入图片描述

4、堆的插入

先插入一个10到数组的尾上,再进行向上调整算法,直到满足堆。

在这里插入图片描述

5、堆的删除

删除堆是删除堆顶的数据,将堆顶的数据和最后一个数据交换换,然后删除数组最后一个数据,再进行向下调整算法。

在这里插入图片描述

6、堆的代码实现

⚠ 注意 ⚠

    ▶ 堆的初始化一般是使用数组进行初始化的

    ▶ 堆的插入数据不分头插、尾插,将数据插入后,原来堆的属性不变

      先放在数组的最后一个位置,再向上调整

    ▶ 堆的删除数据删除的是堆顶的数据,将数据删除后,原来堆的属性不变

      为了效率,将第一个和最后一个元素交换,再减容,然后再调整

❗ 这里需要三个文件 ❕

    1️⃣ Heap.h,用于函数的声明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
c复制代码#pragma once

//头
#include<stdio.h>
#include<assert.h>
#include<stdlib.h>
#include<string.h>
#include<stdbool.h>

typedef int HPDataType;

//C++ -> priority_queue 在C++里用的是优先级队列,其底层就是一个堆
//大堆
typedef struct Heap
{
HPDataType* a;
int size;
int capacity;
}HP;
//函数的声明
//交换
void Swap(int* px, int* py);
//向下调整
void AdjustDown(int* arr, int n, int parent);
//向上调整
void AdjustUp(int* a, int child);
//使用数组进行初始化
void HeapInit(HP* php, HPDataType* a, int n);
//回收空间
void HeapDestroy(HP* php);
//插入x,保持它继续是堆
void HeapPush(HP* php, HPDataType x);
//删除堆顶的数据,保持它继续是堆
void HeapPop(HP* php);
//获取堆顶的数据,也就是最值
HPDataType HeapTop(HP* php);
//判空
bool HeapEmpty(HP* php);
//堆的数据个数
int HeapSize(HP* php);
//输出
void HeapPrint(HP* php);

    2️⃣ Heap.c,用于函数的定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
c复制代码#include"Heap.h"


void Swap(int* px, int* py)
{
int temp = *px;
*px = *py;
*py = temp;
}
void AdjustDown(int* arr, int n, int parent)
{
int child = parent * 2 + 1;
while (child < n)
{
if (arr[child] < arr[child + 1] && child + 1 < n)
{
child++;
}
if (arr[child] > arr[parent])
{
Swap(&arr[child], &arr[parent]);
parent = child;
child = parent * 2 + 1;
}
else
{
break;
}
}
}
void AdjustUp(int* a, int child)
{
int parent = (child - 1) / 2;
while (child > 0)
{
if (a[child] > a[parent])
{
Swap(&a[child], &a[parent]);
child = parent;
parent = (child - 1) / 2;
}
else
{
break;
}
}
}
void HeapPrint(HP* php)
{
assert(php);

int i = 0;
for (i = 0; i < php->size; i++)
{
printf("%d ", php->a[i]);
}
printf("\n");
}
//1、对于HeapCreate函数,结构体不是外面传进来的,而是在函数内部自己malloc空间,再创建的
/*
HP* HeapCreate(HPDataType* a, int n)
{}
*/
//2、对于HeapInit函数,在外面定义一个结构体,把结构体的地址传进来
void HeapInit(HP* php, HPDataType* a, int n)
{
assert(php);
//malloc空间(当前数组大小一样的空间)
php->a = (HPDataType*)malloc(sizeof(HPDataType) * n);
if (php->a == NULL)
{
printf("malloc fail\n");
exit(-1);
}
//使用数组初始化
memcpy(php->a, a, sizeof(HPDataType) * n);
php->size = n;
php->capacity = n;
//建堆
int i = 0;
for (i = (n - 1 - 1) / 2; i >= 0; i--)
{
AdjustDown(php->a, n, i);
}
}
void HeapDestroy(HP* php)
{
assert(php);
free(php->a);
php->a = NULL;
php->size = php->capacity = 0;
}
bool HeapEmpty(HP* php)
{
assert(php);
return php->size == 0;
}
void HeapPush(HP* php, HPDataType x)
{
assert(php);

//空间不够,增容
if (php->size == php->capacity)
{
HPDataType* temp = (HPDataType*)realloc(php->a, php->capacity * 2 * sizeof(HPDataType));
if (temp == NULL)
{
printf("realloc fail\n");
exit(-1);
}
else
{
php->a = temp;
}
php->capacity *= 2;
}
//将x放在最后
php->a[php->size] = x;
php->size++;
//向上调整
AdjustUp(php->a, php->size - 1);
}
void HeapPop(HP* php)
{
assert(php);
//没有数据删除就报错
assert(!HeapEmpty(php));
//交换首尾
Swap(&php->a[0], &php->a[php->size-1]);
php->size--;
//向下调整
AdjustDown(php->a, php->size, 0);
}
HPDataType HeapTop(HP* php)
{
assert(php);
//没有数据获取就报错
assert(!HeapEmpty(php));

return php->a[0];
}
int HeapSize(HP* php)
{
assert(php);

return php->size;
}

    3️⃣ Test.c,用于测试函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
c复制代码#include"Heap.h"

void TestHeap()
{
int arr[] = { 27, 37, 28, 18, 19, 34, 65, 25, 49, 15 };
HP hp;
HeapInit(&hp, arr, sizeof(arr)/sizeof(arr[0]));
HeapPrint(&hp);
HeapPush(&hp, 18);
HeapPrint(&hp);
HeapPush(&hp, 98);
HeapPrint(&hp);
printf("\n\n");
//将堆这数据结构实现好后,我们就可以利用这些接口实现排序
while(!HeapEmpty(&hp))
{
printf("%d ", HeapTop(&hp));
HeapPop(&hp);
}
printf("\n");

}
int main()
{
TestHeap();
return 0;
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…299300301…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%