开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

⭐Redis分布式——主从复制、Sentinel、集群彻底吃

发表于 2021-10-09

小知识,大挑战!本文正在参与“程序员必备小知识”创作活动。

本文已参与 「掘力星计划」 ,赢取创作大礼包,挑战创作激励金。

一、主从复制

1、简介

主从复制是Redis分布式的基石,也是Redis高可用的保障。在Redis中,被复制的服务器称为主服务器(Master),对主服务器进行复制的服务器称为从服务器(Slave)。

​

主-从.png

主从复制的配置非常简单,有三种方式(其中IP-主服务器IP地址/PORT-主服务器Redis服务端口):

  1. 配置文件——redis.conf文件中,配置slaveof ip port
  2. 命令——进入Redis客户端执行slaveof ip port
  3. 启动参数—— ./redis-server –slaveof ip port

2、主从复制的演进

Redis的主从复制机制,并不是一开始就像6.x版本一样完善,而是一个版本一个版本迭代而来的。它大体上经过三个版本的迭代:

  • 2.8以前
  • 2.8~4.0
  • 4.0以后

随着版本的增长,Redis主从复制机制逐渐完善;但是他们的本质都是围绕同步(sync)和命令传播(command propagate)两个操作展开:

  • 同步(sync):指的是将从服务器的数据状态更新至主服务器当前的数据状态,主要发生在初始化或后续的全量同步。
  • 命令传播(command propagate):当主服务器的数据状态被修改(写/删除等),主从之间的数据状态不一致时,主服务将发生数据改变的命令传播给从服务器,让主从服务器之间的状态重回一致。

2.1 版本2.8以前

2.1.1 同步

2.8以前的版本,从服务器对主服务器的同步需要从服务器向主服务器发生sync命令来完成:\

2.8版本.png

  1. 从服务器接收到客户端发送的slaveof ip prot命令,从服务器根据ip:port向主服务器创建套接字连接
  2. 套接字成功连接到主服务器后,从服务器会为这个套接字连接关联一个专门用于处理复制工作的文件事件处理器,处理后续的主服务器发送的RDB文件和传播的命令
  3. 开始进行复制,从服务器向主服务器发送sync命令
  4. 主服务器接收到sync命令后,执行bgsave命令,主服务器主进程fork的子进程会生成一个RDB文件,同时将RDB快照产生后的所有写操作记录在缓冲区中
  5. bgsave命令执行完成后,主服务器将生成的RDB文件发送给从服务器,从服务器接收到RDB文件后,首先会清除本身的全部数据,然后载入RDB文件,将自己的数据状态更新成主服务器的RDB文件的数据状态
  6. 主服务器将缓冲区的写命令发送给从服务器,从服务器接收命令,并执行。
  7. 主从复制同步步骤完成

2.1.2 命令传播

当同步工作完成之后,主从之间需要通过命令传播来维持数据状态的一致性。

如下图,当前主从服务器之间完成同步工作之后,主服务接收客户端的DEL K6指令后删除了K6,此时从服务器仍然存在K6,主从数据状态并不一致。为了维持主从服务器状态一致,主服务器会将导致自己数据状态发生改变的命令传播到从服务器执行,当从服务器也执行了相同的命令之后,主从服务器之间的数据状态将会保持一致。\

2.8主从同步+命令传播.png

2.1.3 缺陷

从上面看不出2.8以前版本的主从复制有什么缺陷,这是因为我们还没有考虑网络波动的情况。了解分布式的兄弟们肯定听说过CAP理论,CAP理论是分布式存储系统的基石,在CAP理论中P(partition网络分区)必然存在,Redis主从复制也不例外。当主从服务器之间出现网络故障,导致一段时间内从服务器与主服务器之间无法通信,当从服务器重新连接上主服务器时,如果主服务器在这段时间内数据状态发生了改变,那么主从服务器之间将出现数据状态不一致。

在Redis 2.8以前的主从复制版本中,解决这种数据状态不一致的方式是通过重新发送sync命令来实现。虽然sync能保证主从服务器数据状态一致,但是很明显sync是一个非常消耗资源的操作。

​

sync命令执行,主从服务器需要占用的资源:

  • 主服务器执行BGSAVE生成RDB文件,会占用大量CPU、磁盘I/O和内存资源
  • 主服务器将生成的RDB文件发送给从服务器,会占用大量网络带宽,
  • 从服务器接收RDB文件并载入,会导致从服务器阻塞,无法提供服务

从上面三点可以看出,sync命令不仅会导致主服务器的响应能力下降,也会导致从服务器在此期间拒绝对外提供服务。

2.2 版本2.8-4.0

2.2.1 改进点

针对2.8以前的版本,Redis在2.8之后对从服务器重连后的数据状态同步进行了改进。改进的方向是减少全量同步(full resynchronizaztion)的发生,尽可能使用增量同步(partial resynchronization)。在2.8版本之后使用psync命令代替了sync命令来执行同步操作,psync命令同时具备全量同步和增量同步的功能:

  • 全量同步与上一版本(sync)一致
  • 增量同步中对于断线重连后的复制,会根据情况采取不同措施;如果条件允许,仍然只发送从服务缺失的部分数据。

2.2.2 psync如何实现

Redis为了实现从服务器断线重连后的增量同步,增加了三个辅助参数:

  • 复制偏移量(replication offset)
  • 积压缓冲区(replication backlog)
  • 服务器运行id(run id)
2.2.2.1 复制偏移量

在主服务器和从服务器内都会维护一个复制偏移量

  • 主服务器向从服务发送数据,传播N个字节的数据,主服务的复制偏移量增加N
  • 从服务器接收主服务器发送的数据,接收N个字节的数据,从服务器的复制偏移量增加N

正常同步的情况如下:\

偏移量.png

通过对比主从服务器之间的复制偏移量是否相等,能够得知主从服务器之间的数据状态是否保持一致。

假设此时A/B正常传播,C从服务器断线,那么将出现如下情况:\

偏移量+断线.png

很明显有了复制偏移量之后,从服务器C断线重连后,主服务器只需要发送从服务器缺少的100字节数据即可。但是主服务器又是如何知道从服务器缺少的是那些数据呢?

​

2.2.2.2 复制积压缓冲区

复制积压缓冲区是一个固定长度的队列,默认为1MB大小。当主服务器数据状态发生改变,主服务器将数据同步给从服务器的同时会另存一份到复制积压缓冲区中。\

复制积压缓冲区.png

复制积压缓冲区为了能和偏移量进行匹配,它不仅存储了数据内容,还记录了每个字节对应的偏移量:\

复制积压缓冲区+字节值+偏移量.png

当从服务器断线重连后,从服务器通过psync命令将自己的复制偏移量(offset)发送给主服务器,主服务器便可通过这个偏移量来判断进行增量传播还是全量同步。

  • 如果偏移量offset+1的数据仍然在复制积压缓冲区中,那么进行增量同步操作
  • 反之进行全量同步操作,与sync一致

Redis的复制积压缓冲区的大小默认为1MB,如果需要自定义应该如何设置呢?

很明显,我们希望能尽可能的使用增量同步,但是又不希望缓冲区占用过多的内存空间。那么我们可以通过预估Redis从服务断线后重连的时间T,Redis主服务器每秒接收的写命令的内存大小M,来设置复制积压缓冲区的大小S。

S = 2 * M * T

注意这里扩大2倍是为了留有一定的余地,保证绝大部分的断线重连都能采用增量同步。

​

2.2.2.3 服务器运行 ID

看到这里是不是再想上面已经可以实现断线重连的增量同步了,还要运行ID干嘛?其实还有一种情况没考虑,就是当主服务器宕机后,某台从服务器被选举成为新的主服务器,这种情况我们就通过比较运行ID来区分。

  • 运行ID(run id)是服务器启动时自动生成的40个随机的十六进制字符串,主服务和从服务器均会生成运行ID
  • 当从服务器首次同步主服务器的数据时,主服务器会发送自己的运行ID给从服务器,从服务器会保存在RDB文件中
  • 当从服务器断线重连后,从服务器会向主服务器发送之前保存的主服务器运行ID,如果服务器运行ID匹配,则证明主服务器未发生更改,可以尝试进行增量同步
  • 如果服务器运行ID不匹配,则进行全量同步

2.2.3 完整的psync

完整的psync过程非常的复杂,在2.8-4.0的主从复制版本中已经做到了非常完善。psync命令发送的参数如下:

psync

当从服务器没有复制过任何主服务器(并不是主从第一次复制,因为主服务器可能会变化,而是从服务器第一次全量同步),从服务器将会发送:

psync ? -1

psync.png
一起完整的psync流程如下图:\

一次完整的psync.png

一次完整的psync.png

  1. 从服务器接收到SLAVEOF 127.0.0.1 6379命令
  2. 从服务器返回OK给命令发起方(这里是异步操作,先返回OK,再保存地址和端口信息)
  3. 从服务器将IP地址和端口信息保存到Master Host和Master Port中
  4. 从服务器根据Master Host和Master Port主动向主服务器发起套接字连接,同时从服务将会未这个套接字连接关联一个专门用于文件复制工作的文件事件处理器,用于后续的RDB文件复制等工作
  5. 主服务器接收到从服务器的套接字连接请求,为该请求创建对应的套接字连接之后,并将从服务器看着一个客户端(在主从复制中,主服务器和从服务器之间其实互为客户端和服务端)
  6. 套接字连接建立完成,从服务器主动向主服务发送PING命令,如果在指定的超时时间内主服务器返回PONG,则证明套接字连接可用,否则断开重连
  7. 如果主服务器设置了密码(masterauth),那么从服务器向主服务器发送AUTH masterauth命令,进行身份验证。注意,如果从服务器发送了密码,主服务并未设置密码,此时主服务会发送no password is set错误;如果主服务器需要密码,而从服务器未发送密码,此时主服务器会发送NOAUTH错误;如果密码不匹配,主服务器会发送invalid password错误。
  8. 从服务器向主服务器发送REPLCONF listening-port xxxx(xxxx表示从服务器的端口)。主服务器接收到该命令后会将数据保存起来,当客户端使用INFO replication查询主从信息时能够返回数据
  9. 从服务器发送psync命令,此步骤请查看上图psync的两种情况
  10. 主服务器与从服务器之间互为客户端,进行数据的请求/响应
  11. 主服务器与从服务器之间通过心跳包机制,判断连接是否断开。从服务器每个1秒向主服务器发送命令,REPLCONF ACL offset(从服务器的复制偏移量),该机制可以保证主从之间数据的正确同步,如果偏移量不相等,主服务器将会采取增量/全量同步措施来保证主从之间数据状态一致(增量/全量的选择取决于,offset+1的数据是否仍在复制积压缓冲区中)

2.3 版本4.0

Redis 2.8-4.0版本仍然有一些改进的空间,当主服务器切换时,是否也能进行增量同步呢?因此Redis 4.0版本针对这个问题做了优化处理,psync升级为psync2.0。

psync2.0 抛弃了服务器运行ID,采用了replid和replid2来代替,其中replid存储的是当前主服务器的运行ID,replid2保存的是上一个主服务器运行ID。

  • 复制偏移量(replication offset)
  • 积压缓冲区(replication backlog)
  • 主服务器运行id(replid)
  • 上个主服务器运行id(replid2)

通过replid和replid2我们可以解决主服务器切换时,增量同步的问题:

  • 如果replid等于当前主服务器的运行id,那么判断同步方式增量/全量同步
  • 如果replid不相等,则判断replid2是否相等(是否同属于上一个主服务器的从服务器),如果相等,仍然可以选择增量/全量同步,如果不相等则只能进行全量同步。

二、Sentinel

1、简介

主从复制奠定了Redis分布式的基础,但是普通的主从复制并不能达到高可用的状态。在普通的主从复制模式下,如果主服务器宕机,就只能通过运维人员手动切换主服务器,很显然这种方案并不可取。

针对上述情况,Redis官方推出了可抵抗节点故障的高可用方案——Redis Sentinel(哨兵)。Redis Sentinel(哨兵):由一个或多个Sentinel实例组成的Sentinel系统,它可以监视任意多个主从服务器,当监视的主服务器宕机时,自动下线主服务器,并且择优选取从服务器升级为新的主服务器。

​

如下示例:当旧Master下线时长超过用户设定的下线时长上限,Sentinel系统就会对旧Master执行故障转移操作,故障转移操作包含三个步骤:

  1. 在Slave中选择数据最新的作为新的Master
  2. 向其他Slave发送新的复制指令,让其他从服务器成为新的Master的Slave
  3. 继续监视旧Master,如果其上线则将旧Master设置为新Master的Slave

sentinel监视主服务器下线.png

本文基于如下资源清单进行开展:

IP地址 节点角色 端口
192.168.211.104 Redis Master/ Sentinel 6379/26379
192.168.211.105 Redis Slave/ Sentinel 6379/26379
192.168.211.106 Redis Slave/ Sentinel 6379/26379

​

2、Sentinel初始化与网络连接

Sentinel并没有什么特别神奇的地方,它就是一个更加简单的Redis服务器,在Sentinel启动的时候它会加载不同的命令表和配置文件,因此从本质上来讲Sentinel就是一个拥有较少命令和部分特殊功能的Redis服务。当一个Sentinel启动时它需要经历如下步骤:

  1. 初始化Sentinel服务器
  2. 替换普通Redis代码为Sentinel的专用代码
  3. 初始化Sentinel状态
  4. 根据用户给定的Sentinel配置文件,初始化Sentinel监视的主服务器列表
  5. 创建连接主服务器的网络连接
  6. 根据主服务获取从服务器信息,创建连接从服务器的网络连接
  7. 根据发布/订阅获取Sentinel信息,创建Sentinel之间的网络连接

2.1 初始化Sentinel服务器

Sentinel本质上就是一个Redis服务器,因此启动Sentinel需要启动一个Redis服务器,但是Sentinel并不需要读取RDB/AOF文件来还原数据状态。

​

2.2 替换普通Redis代码为Sentinel的专用代码

Sentinel用于较少的Redis命令,大部分命令在Sentinel客户端都不支持,并且Sentinel拥有一些特殊的功能,这些需要Sentinel在启动时将Redis服务器使用的代码替换为Sentinel的专用代码。在此期间Sentinel会载入与普通Redis服务器不同的命令表。

Sentinel不支持SET、DBSIZE等命令;保留支持PING、PSUBSCRIBE、SUBSCRIBE、UNSUBSCRIBE、INFO等指令;这些指令在Sentinel工作中提供了保障。

​

2.3 初始化Sentinel状态

装载Sentinel的特有代码之后,Sentinel会初始化sentinelState结构,该结构用于存储Sentinel相关的状态信息,其中最重要的就是masters字典。

1
2
3
4
5
6
7
8
9
10
11
arduino复制代码 1struct sentinelState {
2
3    //当前纪元,故障转移使用
4    uint64_t current_epoch; 
5
6    // Sentinel监视的主服务器信息 
7    // key -> 主服务器名称 
8    // value -> 指向sentinelRedisInstance指针
9    dict *masters; 
10    // ...
11} sentinel;

2.4 初始化Sentinel监视的主服务器列表

Sentinel监视的主服务器列表保存在sentinelState的masters字典中,当sentinelState创建之后,开始对Sentinel监视的主服务器列表进行初始化。

  • masters的key是主服务的名字
  • masters的value是一个指向sentinelRedisInstance指针

主服务器的名字由我们sentinel.conf配置文件指定,如下主服务器名字为redis-master(我这里是一主二从的配置):

1
2
3
4
5
6
7
8
yaml复制代码1daemonize yes
2port 26379
3protected-mode no
4dir "/usr/local/soft/redis-6.2.4/sentinel-tmp"
5sentinel monitor redis-master 192.168.211.104 6379 2
6sentinel down-after-milliseconds redis-master 30000
7sentinel failover-timeout redis-master 180000
8sentinel parallel-syncs redis-master 1

sentinelRedisInstance实例保存了Redis服务器的信息(主服务器、从服务器、Sentinel信息都保存在这个实例中)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
arduino复制代码 1typedef struct sentinelRedisInstance {
2
3    // 标识值,标识当前实例的类型和状态。如SRI_MASTER、SRI_SLVAE、SRI_SENTINEL
4    int flags;
5
6    // 实例名称 主服务器为用户配置实例名称、从服务器和Sentinel为ip:port
7    char *name;
8
9    // 服务器运行ID
10    char *runid;
11
12    //配置纪元,故障转移使用
13    uint64_t config_epoch; 
14
15    // 实例地址
16    sentinelAddr *addr;
17
18    // 实例判断为主观下线的时长 sentinel down-after-milliseconds redis-master 30000
19    mstime_t down_after_period; 
20
21    // 实例判断为客观下线所需支持的投票数 sentinel monitor redis-master 192.168.211.104 6379 2
22    int quorum;
23
24    // 执行故障转移操作时,可以同时对新的主服务器进行同步的从服务器数量 sentinel parallel-syncs redis-master 1
25    int parallel-syncs;
26
27    // 刷新故障迁移状态的最大时限 sentinel failover-timeout redis-master 180000
28    mstime_t failover_timeout;
29
30    // ...
31} sentinelRedisInstance;

根据上面的一主二从配置将会得到如下实例结构:\

初始实例结构.png

2.5 创建连接主服务器的网络连接

当实例结构初始化完成之后,Sentinel将会开始创建连接Master的网络连接,这一步Sentinel将成为Master的客户端。

Sentinel和Master之间会创建一个命令连接和一个订阅连接:

  • 命令连接用于获取主从信息
  • 订阅连接用于Sentinel之间进行信息广播,每个Sentinel和自己监视的主从服务器之间会订阅sentinel:hello频道(注意Sentinel之间不会创建订阅连接,它们通过订阅sentinel:hello频道来获取其他Sentinel的初始信息)

命令连接和订阅连接.png

Sentinel在创建命令连接完成之后,每隔10秒钟向Master发送一次INFO指令,通过Master的回复信息可以获得两方面的知识:

  • Master本身的信息
  • Master下的Slave信息

主从信息.png

​

2.6 创建连接从服务器的网络连接

根据主服务获取从服务器信息,Sentinel可以创建到Slave的网络连接,Sentinel和Slave之间也会创建命令连接和订阅连接。\

Slave命令连接和订阅连接.png

当Sentinel和Slave之间创建网络连接之后,Sentinel成为了Slave的客户端,Sentinel也会每隔10秒钟通过INFO指令请求Slave获取服务器信息。

到这一步Sentinel获取到了Master和Slave的相关服务器数据。这其中比较重要的信息如下:

  • 服务器ip和port
  • 服务器运行id run id
  • 服务器角色role
  • 服务器连接状态mater_link_status
  • Slave复制偏移量slave_repl_offset(故障转移中选举新的Master需要使用)
  • Slave优先级slave_priority

此时实例结构信息如下所示:\

从服务器信息.png

2.7 创建Sentinel之间的网络连接

此时是不是还有疑问,Sentinel之间是怎么互相发现对方并且相互通信的,这个就和上面Sentinel与自己监视的主从之间订阅sentinel:hello频道有关了。

Sentinel会与自己监视的所有Master和Slave之间订阅sentinel:hello频道,并且Sentinel每隔2秒钟向sentinel:hello频道发送一条消息,消息内容如下:

PUBLISH sentinel:hello “,,,,,,,”

其中s代码Sentinel,m代表Master;ip表示IP地址,port表示端口、runid表示运行id、epoch表示配置纪元。

​

多个Sentinel在配置文件中会配置相同的主服务器ip和端口信息,因此多个Sentinel均会订阅sentinel:hello频道,通过频道接收到的信息就可获取到其他Sentinel的ip和port,其中有如下两点需要注意:

  • 如果获取到的runid与Sentinel自己的runid相同,说明消息是自己发布的,直接丢弃
  • 如果不相同,则说明接收到的消息是其他Sentinel发布的,此时需要根据ip和port去更新或新增Sentinel实例数据

Sentinel之间不会创建订阅连接,它们只会创建命令连接:\

sentinel之间的命令连接.png

此时实例结构信息如下所示:\

sentinel服务器信息.png

​

3、Sentinel工作

Sentinel最主要的工作就是监视Redis服务器,当Master实例超出预设的时限后切换新的Master实例。这其中有很多细节工作,大致分为检测Master是否主观下线、检测Master是否客观下线、选举领头Sentinel、故障转移四个步骤。

​

3.1 检测Master是否主观下线

Sentinel每隔1秒钟,向sentinelRedisInstance实例中的所有Master、Slave、Sentinel发送PING命令,通过其他服务器的回复来判断其是否仍然在线。​

1
erlang复制代码1sentinel down-after-milliseconds redis-master 30000

在Sentinel的配置文件中,当Sentinel PING的实例在连续down-after-milliseconds配置的时间内返回无效命令,则当前Sentinel认为其主观下线。Sentinel的配置文件中配置的down-after-milliseconds将会对其sentinelRedisInstance实例中的所有Master、Slave、Sentinel都适应。

无效指令指的是+PONG、-LOADING、-MASTERDOWN之外的其他指令,包括无响应

如果当前Sentinel检测到Master处于主观下线状态,那么它将会修改其sentinelRedisInstance的flags为SRI_S_DOWN\

主观下线状态修改.png

​

3.2 检测Master是否客观下线

当前Sentinel认为其下线只能处于主观下线状态,要想判断当前Master是否客观下线,还需要询问其他Sentinel,并且所有认为Master主观下线或者客观下线的总和需要达到quorum配置的值,当前Sentinel才会将Master标志为客观下线。\

客观下线状态修改.png
当前Sentinel向sentinelRedisInstance实例中的其他Sentinel发送如下命令:

1
xml复制代码1SENTINEL is-master-down-by-addr <ip> <port> <current_epoch> <runid>
  • ip:被判断为主观下线的Master的IP地址
  • port:被判断为主观下线的Master的端口
  • current_epoch:当前sentinel的配置纪元
  • runid:当前sentinel的运行id,runid

current_epoch和runid均用于Sentinel的选举,Master下线之后,需要选举一个领头Sentinel来选举一个新的Master,current_epoch和runid在其中发挥着重要作用,这个后续讲解。

​

接收到命令的Sentinel,会根据命令中的参数检查主服务器是否下线,检查完成后会返回如下三个参数:

  • down_state:检查结果1代表已下线、0代表未下线
  • leader_runid:返回*代表判断是否下线,返回runid代表选举领头Sentinel
  • leader_epoch:当leader_runid返回runid时,配置纪元会有值,否则一直返回0
  1. 当Sentinel检测到Master处于主观下线时,询问其他Sentinel时会发送current_epoch和runid,此时current_epoch=0,runid=*
  2. 接收到命令的Sentinel返回其判断Master是否下线时down_state = 1/0,leader_runid = *,leader_epoch=0

询问其他sentinel.png

​

3.3 选举领头Sentinel

down_state返回1,证明接收is-master-down-by-addr命令的Sentinel认为该Master也主观下线了,如果down_state返回1的数量(包括本身)大于等于quorum(配置文件中配置的值),那么Master正式被当前Sentinel标记为客观下线。

此时,Sentinel会再次发送如下指令:

1
xml复制代码1SENTINEL is-master-down-by-addr <ip> <port> <current_epoch> <runid>

此时的runid将不再是0,而是Sentinel自己的运行id(runid)的值,表示当前Sentinel希望接收到is-master-down-by-addr命令的其他Sentinel将其设置为领头Sentinel。这个设置是先到先得的,Sentinel先接收到谁的设置请求,就将谁设置为领头Sentinel。

发送命令的Sentinel会根据其他Sentinel回复的结果来判断自己是否被该Sentinel设置为领头Sentinel,如果Sentinel被其他Sentinel设置为领头Sentinel的数量超过半数Sentinel(这个数量在sentinelRedisInstance的sentinel字典中可以获取),那么Sentinel会认为自己已经成为领头Sentinel,并开始后续故障转移工作(由于需要半数,且每个Sentinel只会设置一个领头Sentinel,那么只会出现一个领头Sentinel,如果没有一个达到领头Sentinel的要求,Sentinel将会重新选举直到领头Sentinel产生为止)。

​

3.4 故障转移

故障转移将会交给领头sentinel全权负责,领头sentinel需要做如下事情:

  1. 从原先master的slave中,选择最佳的slave作为新的master
  2. 让其他slave成为新的master的slave
  3. 继续监听旧master,如果其上线,则将其设置为新的master的slave

这其中最难的一步是如果选择最佳的新Master,领头Sentinel会做如下清洗和排序工作:

  1. 判断slave是否有下线的,如果有从slave列表中移除
  2. 删除5秒内未响应sentinel的INFO命令的slave
  3. 删除与下线主服务器断线时间超过down_after_milliseconds * 10 的所有从服务器
  4. 根据slave优先级slave_priority,选择优先级最高的slave作为新master
  5. 如果优先级相同,根据slave复制偏移量slave_repl_offset,选择偏移量最大的slave作为新master
  6. 如果偏移量相同,根据slave服务器运行id run id排序,选择run id最小的slave作为新master

新的Master产生后,领头sentinel会向已下线主服务器的其他从服务器(不包括新Master)发送SLAVEOF ip port命令,使其成为新master的slave。

​

到这里Sentinel的的工作流程就算是结束了,如果新master下线,则循环流程即可!

三、集群

1、简介

Redis集群是Redis提供的分布式数据库方案,集群通过分片(sharding)进行数据共享,Redis集群主要实现了以下目标:

  • 在1000个节点的时候仍能表现得很好并且可扩展性是线性的。
  • 没有合并操作(多个节点不存在相同的键),这样在 Redis 的数据模型中最典型的大数据值中也能有很好的表现。
  • 写入安全,那些与大多数节点相连的客户端所做的写入操作,系统尝试全部都保存下来。但是Redis无法保证数据完全不丢失,异步同步的主从复制无论如何都会存在数据丢失的情况。
  • 可用性,主节点不可用,从节点能替换主节点工作。

关于Redis集群的学习,如果没有任何经验的弟兄们建议先看下这三篇文章(中文系列):

Redis集群教程

redis.cn/topics/clus…

Redis集群规范

redis.cn/topics/clus…

Redis3主3从伪集群部署

blog.csdn.net/qq_41125219…

下文内容依赖下图三主三从结构开展:\

8624ecbdf7f691d78ef48bc5afdb6d18.png

资源清单:

节点 IP 槽(slot)范围
Master[0] 192.168.211.107:6319 Slots 0 - 5460
Master[1] 192.168.211.107:6329 Slots 5461 - 10922
Master[2] 192.168.211.107:6339 Slots 10923 - 16383
Slave[0] 192.168.211.107:6369
Slave[1] 192.168.211.107:6349
Slave[2] 192.168.211.107:6359

Redis集群.png

2、集群内部

Redis 集群没有使用一致性hash, 而是引入了 哈希槽的概念。Redis 集群有16384个哈希槽,每个key通过CRC16校验后对16384取模来决定放置哪个槽,这种结构很容易添加或者删除节点。集群的每个节点负责一部分hash槽,比如上面资源清单的集群有3个节点,其槽分配如下所示:

  • 节点 Master[0] 包含 0 到 5460 号哈希槽
  • 节点 Master[1] 包含5461 到 10922 号哈希槽
  • 节点 Master[2] 包含10923到 16383 号哈希槽

深入学习Redis集群之前,需要了解集群中Redis实例的内部结构。当某个Redis服务节点通过cluster_enabled配置为yes开启集群模式之后,Redis服务节点不仅会继续使用单机模式下的服务器组件,还会增加custerState、clusterNode、custerLink等结构用于存储集群模式下的特殊数据。

​

如下三个数据承载对象一定要认真看,尤其是结构中的注释,看完之后集群大体上怎么工作的,心里就有数了,嘿嘿嘿;

​

2.1 clsuterNode

clsuterNode用于存储节点信息,比如节点的名字、IP地址、端口信息和配置纪元等等,以下代码列出部分非常重要的属性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
arduino复制代码 1typedef struct clsuterNode {
2
3    // 创建时间
4    mstime_t ctime;
5
6    // 节点名字,由40位随机16进制的字符组成(与sentinel中讲的服务器运行id相同)
7    char name[REDIS_CLUSTER_NAMELEN];
8
9    // 节点标识,可以标识节点的角色和状态
10    // 角色 -> 主节点或从节点 例如:REDIS_NODE_MASTER(主节点) REDIS_NODE_SLAVE(从节点)
11    // 状态 -> 在线或下线 例如:REDIS_NODE_PFAIL(疑似下线) REDIS_NODE_FAIL(下线) 
12    int flags;
13
14    // 节点配置纪元,用于故障转移,与sentinel中用法类似
15    // clusterState中的代表集群的配置纪元
16    unit64_t configEpoch;
17
18    // 节点IP地址
19    char ip[REDIS_IP_STR_LEN];
20
21    // 节点端口
22    int port;
23
24    // 连接节点的信息
25    clusterLink *link;
26
27    // 一个2048字节的二进制位数组
28    // 位数组索引值可能为0或1
29    // 数组索引i位置值为0,代表节点不负责处理槽i
30    // 数组索引i位置值为1,代表节点负责处理槽i
31    unsigned char slots[16384/8];
32
33    // 记录当前节点处理槽的数量总和
34    int numslots;
35
36    // 如果当前节点是从节点
37    // 指向当前从节点的主节点
38    struct clusterNode *slaveof;
39
40    // 如果当前节点是主节点
41    // 正在复制当前主节点的从节点数量
42    int numslaves;
43
44    // 数组——记录正在复制当前主节点的所有从节点
45    struct clusterNode **slaves;
46
47} clsuterNode;

上述代码中可能不太好理解的是slots[16384/8],其实可以简单的理解为一个16384大小的数组,数组索引下标处如果为1表示当前槽属于当前clusterNode处理,如果为0表示不属于当前clusterNode处理。clusterNode能够通过slots来识别,当前节点处理负责处理哪些槽。

初始clsuterNode或者未分配槽的集群中的clsuterNode的slots如下所示:

初始slots[16384_8].png

假设集群如上面我给出的资源清单,此时代表Master[0]的clusterNode的slots如下所示:\

Master[0]的clusterNode的slots.png

2.2 clusterLink

clusterLink是clsuterNode中的一个属性,用于存储连接节点所需的相关信息,比如套接字描述符、输入输出缓冲区等待,以下代码列出部分非常重要的属性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
arduino复制代码 1typedef struct clusterState {
2
3    // 连接创建时间
4    mstime_t ctime;
5
6    // TCP 套接字描述符
7    int fd;
8
9    // 输出缓冲区,需要发送给其他节点的消息缓存在这里
10    sds sndbuf;
11
12    // 输入缓冲区,接收打其他节点的消息缓存在这里
13    sds rcvbuf;
14
15    // 与当前clsuterNode节点代表的节点建立连接的其他节点保存在这里
16    struct clusterNode *node;
17} clusterState;

2.3 custerState

每个节点都会有一个custerState结构,这个结构中存储了当前集群的全部数据,比如集群状态、集群中的所有节点信息(主节点、从节点)等等,以下代码列出部分非常重要的属性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
arduino复制代码 1typedef struct clusterState {
2
3    // 当前节点指针,指向一个clusterNode
4    clusterNode *myself;
5
6    // 集群当前配置纪元,用于故障转移,与sentinel中用法类似
7    unit64_t currentEpoch;
8
9    // 集群状态 在线/下线
10    int state;
11
12    // 集群中处理着槽的节点数量总和
13    int size;
14
15    // 集群节点字典,所有clusterNode包括自己
16    dict *node;
17
18    // 集群中所有槽的指派信息
19    clsuterNode *slots[16384];
20
21    // 用于槽的重新分配——记录当前节点正在从其他节点导入的槽
22    clusterNode *importing_slots_from[16384];
23
24    // 用于槽的重新分配——记录当前节点正在迁移至其他节点的槽
25    clusterNode *migrating_slots_to[16384];
26
27    // ...
28
29} clusterState;

在custerState有三个结构需要认真了解的,第一个是slots数组,clusterState中的slots数组与clsuterNode中的slots数组是不一样的,在clusterNode中slots数组记录的是当前clusterNode所负责的槽,而clusterState中的slots数组记录的是整个集群的每个槽由哪个clsuterNode负责,因此集群正常工作的时候clusterState的slots数组每个索引指向负责该槽的clusterNode,集群槽未分配之前指向null。

​

如图展示资源清单中的集群clusterState中的slots数组与clsuterNode中的slots数组:\

clusterState中Slots数组.png

Redis集群中使用两个slots数组的原因是出于性能的考虑:

  • 当我们需要获取整个集群中clusterNode分别负责什么槽时,只需要查询clusterState中的slots数组即可。如果没有clusterState的slots数组,则需要遍历所有的clusterNode结构,这样显然要慢一些
  • 此外clusterNode中的slots数组也有存在的必要,因为集群中任意一个节点之间需要知道彼此负责的槽,此时节点之间只需要互相传输clusterNode中的slots数组结构就行。

第二个需要认真了解的结构是node字典,该结构虽然简单,但是node字典中存储了所有的clusterNode,这也是Redis集群中的单个节点获取其他主节点、从节点信息的主要位置,因此我们也需要注意一下。

第三个需要认真了解的结构是importing_slots_from[16384]数组和migrating_slots_to[16384],这两个数组在集群重新分片时需要使用,需要重点了解,后面再说吧,这里说的话顺序不太对。

3、集群工作

3.1 槽(slot)如何指派?

Redis集群一共16384个槽,如上资源清单我们在三主三从的集群中,每个主节点负责自己相应的槽,而在上面的三主三从部署的过程中并未看到我指定槽给对应的主节点,这是因为Redis集群自己内部给我们划分了槽,但是如果我们想自己指派槽该如何整呢?

我们可以向节点发送如下命令,将一个或多个槽指派给当前节点负责:

CLUSTER ADDSLOTS

比如我们想把0和1槽指派给Master[0],我们只需要想Master[0]节点发送如下命令即可:

CLUSTER ADDSLOTS 0 1

当节点被指派了槽后,会将clusterNode的slots数组更新,节点会将自己负责处理的槽也就是slots数组通过消息发送给集群中的其他节点,其他节点在接收当消息后会更新对应clusterNode的slots数组以及clusterState的solts数组。

​

3.2 ADDSLOTS 在Redis集群内部是如何实现的呢?

这个其实也比较简单,当我们向Redis集群中的某个节点发送CLUSTER ADDSLOTS命令时,当前节点首先会通过clusterState中的slots数组来确认指派给当前节点的槽是否没有指派给其他节点,如果已经指派了,那么会直接抛出异常,返回错误给指派的客户端。如果指派给当前节点的所有槽都未指派给其他节点,那么当前节点会将这些槽指派给自己。

指派主要有三个步骤:

  1. 更新clusterState的slots数组,将指定槽slots[i]指向当前clusterNode
  2. 更新clusterNode的slots数组,将指定槽slots[i]处的值更新为1
  3. 向集群中的其他节点发送消息,将clusterNode的slots数组发送给其他节点,其他节点接收到消息后也更新对应的clusterState的slots数组和clusterNode的slots数组

3.3 集群这么多节点,客户端怎么知道请求哪个节点?

在了解这个问题之前先要知道一个点,Redis集群是怎么计算当前这个键属于哪个槽的呢?根据官网的介绍,Redis其实并未使用一致性hash算法,而是将每个请求的key通过CRC16校验后对16384取模来决定放置到哪个槽中。

HASH_SLOT = CRC16(key) mod 16384

此时,当客户端连接向某个节点发送请求时,当前接收到命令的节点首先会通过算法计算出当前key所属的槽i,计算完后当前节点会判断clusterState的槽i是否由自己负责,如果恰好由自己负责那么当前节点就会之间响应客户端的请求,如果不由当前节点负责,则会经历如下步骤:

  1. 节点向客户端返回MOVED重定向错误,MOVED重定向错误中会将计算好的正确处理该key的clusterNode的ip和port返回给客户端
  2. 客户端接收到节点返回的MOVED重定向错误时,会根据ip和port将命令转发给正确的节点,整个处理过程对程序员来说透明,由Redis集群的服务端和客户端共同负责完成。

3.4 如果我想将已经分配给A节点的槽重新分配给B节点,怎么整?

这个问题其实涵括了很多问题,比如移除Redis集群中的某些节点,增加节点等都可以概括为把哈希槽从一个节点移动到另外一个节点。并且Redis集群非常牛逼的一点也在这里,它支持在线(不停机)的分配,也就是官方说集群在线重配置(live reconfiguration )。

​

在将实现之前先来看下CLUSTER的指令,指令会了操作就会了:

  • CLUSTER ADDSLOTS slot1 [slot2] … [slotN]
  • CLUSTER DELSLOTS slot1 [slot2] … [slotN]
  • CLUSTER SETSLOT slot NODE node
  • CLUSTER SETSLOT slot MIGRATING node
  • CLUSTER SETSLOT slot IMPORTING node

CLUSTER 用于槽分配的指令主要有如上这些,ADDSLOTS 和DELSLOTS主要用于槽的快速指派和快速删除,通常我们在集群刚刚建立的时候进行快速分配的时候才使用。CLUSTER SETSLOT slot NODE node也用于直接给指定的节点指派槽。如果集群已经建立我们通常使用最后两个来重分配,其代表的含义如下所示:

  • 当一个槽被设置为 MIGRATING,原来持有该哈希槽的节点仍会接受所有跟这个哈希槽有关的请求,但只有当查询的键还存在原节点时,原节点会处理该请求,否则这个查询会通过一个 -ASK 重定向(-ASK redirection)转发到迁移的目标节点。
  • 当一个槽被设置为 IMPORTING,只有在接受到 ASKING 命令之后节点才会接受所有查询这个哈希槽的请求。如果客户端一直没有发送 ASKING 命令,那么查询都会通过 -MOVED 重定向错误转发到真正处理这个哈希槽的节点那里。

上面这两句话是不是感觉不太看的懂,这是官方的描述,不太懂的话我来给你通俗的描述,整个流程大致如下步骤:

  1. redis-trib(集群管理软件redis-trib会负责Redis集群的槽分配工作),向目标节点(槽导入节点)发送CLUSTER SETSLOT slot IMPORTING node命令,目标节点会做好从源节点(槽导出节点)导入槽的准备工作。
  2. redis-trib随即向源节点发送CLUSTER SETSLOT slot MIGRATING node命令,源节点会做好槽导出准备工作
  3. redis-trib随即向源节点发送CLUSTER GETKEYSINSLOT slot count命令,源节点接收命令后会返回属于槽slot的键,最多返回count个键
  4. redis-trib会根据源节点返回的键向源节点依次发送MIGRATE ip port key 0 timeout命令,如果key在源节点中,将会迁移至目标节点。
  5. 迁移完成之后,redis-trib会向集群中的某个节点发送CLUSTER SETSLOT slot NODE node命令,节点接收到命令后会更新clusterNode和clusterState结构,然后节点通过消息传播槽的指派信息,至此集群槽迁移工作完成,且集群中的其他节点也更新了新的槽分配信息。

3.5 如果客户端访问的key所属的槽正在迁移怎么办?

优秀的你总会想到这种并发情况,牛皮呀!大佬们!\

u=79087421,2199932123&fm=26&fmt=auto.webp

这个问题官方也考虑了,还记得我们在聊clusterState结构的时候么?importing_slots_from和migrating_slots_to就是用来处理这个问题的。

1
2
3
4
5
6
7
8
9
10
11
12
13
arduino复制代码 1typedef struct clusterState {
2
3    // ...
4
5    // 用于槽的重新分配——记录当前节点正在从其他节点导入的槽
6    clusterNode *importing_slots_from[16384];
7
8    // 用于槽的重新分配——记录当前节点正在迁移至其他节点的槽
9    clusterNode *migrating_slots_to[16384];
10
11    // ...
12
13} clusterState;
  • 当节点正在导出某个槽,则会在clusterState中的migrating_slots_to数组对应的下标处设置其指向对应的clusterNode,这个clusterNode会指向导入的节点。
  • 当节点正在导入某个槽,则会在clusterState中的importing_slots_from数组对应的下标处设置其指向对应的clusterNode,这个clusterNode会指向导出的节点。

有了上述两个相互数组,就能判断当前槽是否在迁移了,而且从哪里迁移来,要迁移到哪里去?搞笑不就是这么简单……

​

此时,回到问题中,如果客户端请求的key刚好属于正在迁移的槽。那么接收到命令的节点首先会尝试在自己的数据库中查找键key,如果这个槽还没迁移完成,且当前key刚好也还没迁移完成,那就直接响应客户端的请求就行。如果该key已经不在了,此时节点会去查询migrating_slots_to数组对应的索引槽,如果索引处的值不为null,而是指向了某个clusterNode结构,那说明这个key已经被迁移到这个clusterNode了。这个时候节点不会继续在处理指令,而是返回ASKING命令,这个命令也会携带导入槽clusterNode对应的ip和port。客户端在接收到ASKING命令之后就需要将请求转向正确的节点了,不过这里有一点需要注意的地方 (因此我放个表情包在这里,方便读者注意)。\

u=1042078930,3579951952&fm=26&fmt=auto.webp

前面说了,当节点发现当前槽不属于自己处理时会返回MOVED指令,那么在迁移中的槽时怎么处理的呢?这个Redis集群是这个玩的。

节点发现槽正在迁移则向客户端返回ASKING命令,客户端会接收到ASKING命令,其中包含了槽迁入的clusterNode的节点ip和port。那么客户端首先会向迁入的clusterNode发送一条ASKING命令,这个命令必须要发目的是告诉当前节点,你要破例处理这次请求,因为这个槽已经迁移到你这里了,你不能直接拒绝我(因此如果Redis未接收到ASKING命令,会直接查询节点的clusterState,而正在迁移中的槽还没有更新到clusterState中,那么只能直接返回MOVED,这样不就会一直循环很多次……),接收到ASKING命令的节点会强制执行一次这个请求(只执行一次,下次再来需要重新提前发送ASKING命令)。

​

4、集群故障

Redis集群故障比较简单,这个和sentinel中主节点宕机或者在指定最长时间内未响应,重新在从节点中选举新的主节点的方式其实差不多。当然前提是Redis集群中的每个主节点,我们提前设置了从节点,要不就嘿嘿嘿……没戏。其大致步骤如下:

  1. 正常工作的集群,每个节点之间会定期向其他节点发送PING命令,如果接收命令的节点未在规定时间内返回PONG消息 ,当前节点会将接收命令的节点的clusterNode的flags设置为REDIS_NODE_PFAIL,PFAIL并不是下线,而是疑似下线。
  2. 集群节点会通过发送消息的方式来告知其他节点,集群中各个节点的状态信息
  3. 如果集群中半数以上负责处理槽的主节点都将某个主节点设置为疑似下线,那么这个节点将会被标记位下线状态,节点会将接收命令的节点的clusterNode的flags设置为REDIS_NODE_FAIL,FAIL表示已下线
  4. 集群节点通过发送消息的方式来告知其他节点,集群中各个节点的状态信息,此时下线节点的从节点在发现自己的主节点已经被标记为下线状态了,那么是时候挺身而出了
  5. 下线主节点的从节点,会选举出一个从节点作为最新的主节点,执行被选中的节点指向SLAVEOF no one成为新的主节点
  6. 新的主节点会撤销掉原主节点的槽指派,并将这些槽指派修改为自己,也就是修改clusterNode结构和clusterState结构
  7. 新的主节点向集群广播一条PONG指令,其他节点将会知道有新的主节点产生,并更新clusterNode结构和clusterState结构
  8. 新的主节点如果会向原主节点剩余的从节点发送新的SLAVEOF指令,使其成为自己的从节点
  9. 最后新的主节点将会负责原主节点的槽的响应工作

这里我写得非常模糊,如果需要细致挖掘的一定要看这篇文章:

redis.cn/topics/clus…

或者可以看下黄健宏老师的《Redis设计与实现》这本书写得挺好,我也参考了很多内容。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Spring Cloud应该怎么入门? 什么是微服务? 什么

发表于 2021-10-09

本文介绍Spring Cloud和相关组件,让大家大概了解什么是Spring Cloud以及各组件的介绍,并推荐一些学习书籍。

什么是微服务?

引用 ThoughtWorks 公司的首席科学家 Martin Fowler 的一段话:简而言之,微服务架构风格是一种将单个应用程序作为一套小型服务开发的方法,每种应用程序都在自己的进程中运行,并与轻量级机制(通常是HTTP资源API)进行通信。这些服务是围绕业务功能构建的,可以通过全自动部署机制独立部署。这些服务的集中管理最少,可以用不同的编程语言编写,并使用不同的数据存储技术。
说到微服务,就要从单体服务开始,很多应用初期,都是单体应用,因为初期讲究迭代速度,人员也很少,没有那么清晰的业务划分和边界,从敏捷开发的角度,单体服务是最适合初期的。但随着业务发展,业务越来越复杂,单体应用由于部署在一个进程里面,修改一个小功能,上线就会影响其他功能。随后系统逐步开始拆分:从数据库到应用,微服务就开始登场了。

什么是Spring Cloud ?

Spring Cloud是一个微服务框架,相比Dubbo等RPC框架, Spring Cloud提供全套的分布式系统解决方案,它依赖于 Spring Boot ,有快速开发、持续交付和容易部署等特点。
Spring Cloud不像其他Spring子项目那样相对独立,它是一个拥有诸多子项目的大型综合项目。
Spring Cloud有很多组件。
下面可以看下一个由Spring Cloud组成的架构图:
image.png
这里包含的Spring相关组件,有这些,下面一一介绍一下。
image.png

服务注册和发现组件 Eureka

介绍

利用 Eureka 组件可以很轻松地实现服务的注册和发现功能。Eureka 组件提供了服务的健康监测,以及界面友好的 UI 。通过 Eureka 组件提供的 UI,Eureka 组件可以让开发人员随时了解服务单元的运行情况。另外 Spring Cloud 也支持 Consul 和Zookeeper ,用于注册和发现服务。

Eureka原理

image.png
Eureka Server:服务注册中心。Eureka Server本身也集成了Eureka Client,彼此通过Eureka Client同步数据给其它实例又或者从其他实例同步数据。包含功能:服务剔除。
Application Service:服务提供者。包含功能:服务注册,服务同步,服务续约,服务下线。
Application Client:服务消费者。包含功能:获取服务,服务调用。
Make Remote Call:RESTful API的调用
us-east-1c,us-east-1d:表示一个集群。

CP VS AP

CAP原则又称CAP定理,指的是在一个分布式系统中,一致性(Consistency)、可用性(Availability)、分区容错性(Partition tolerance)。CAP 原则指的是,这三个要素最多只能同时实现两点,不可能三者兼顾。CAP理论提出就是针对分布式数据库环境的,所以,P这个属性是必须具备的。Eureka保证AP,zookeeper保证CP。zookeeper通过zab协议维持强一致性,服务注册相对Eureka会慢一些,要求过半数节点都写入成功才认为注册成功,Leader挂掉时,需要重新选举,整个集群不可用。Eureka为了实现更高的服务可用性,牺牲了一定的一致性。

客户端负载均衡 Spring Cloud Ribbon

介绍

Spring Cloud Ribbon是一个基于HTTP和TCP的客户端负载均衡工具,它基于Netflix Ribbon实现,可以将面向服务的REST模板请求自动转换成客户端负载均衡的服务调用。

使用

在Spring Cloud中,当Ribbon与Eureka配合使用时,Ribbon可自动从Eureka Server获取服务提供者地址列表,并基于负载均衡算法,选择其中一个服务提供者实例。

架构

image.png

RestTemplate

RestTemplate实现了对HTTP请求的封装处理,形成了一套模板化的调用方式。

几种负载均衡策略

  1. RandomRule:随机选取负载均衡策略,随机Random对象,在所有服务实例中随机找一个服务的索引号,然后从上线的服务中获取对应的服务。
  2. RoundRobinRule:线性轮询负载均衡策略。
  3. WeightedResponseTimeRule:响应时间作为选取权重的负载均衡策略,根据平均响应时间计算所有服务的权重,响应时间越短的服务权重越大,被选中的概率越高。刚启动时,如果统计信息不足,则使用线性轮询策略,等信息足够时,再切换到WeightedResponseTimeRule。
  4. RetryRule:使用线性轮询策略获取服务,如果获取失败则在指定时间内重试,重新获取可用服务。
  5. ClientConfigEnabledRoundRobinRule:默认通过线性轮询策略选取服务。通过继承该类,并且对choose方法进行重写,可以实现更多的策略,继承后保底使用RoundRobinRule策略。
  6. BestAvailableRule:继承自ClientConfigEnabledRoundRobinRule。从所有没有断开的服务中,选取到目前为止请求数量最小的服务。
  7. PredicateBasedRule:抽象类,提供一个choose方法的模板,通过调用AbstractServerPredicate实现类的过滤方法来过滤出目标的服务,再通过轮询方法选出一个服务。
  8. AvailabilityFilteringRule:按可用性进行过滤服务的负载均衡策略,会先过滤掉由于多次访问故障而处于断路器跳闸状态的服务,还有并发的连接数超过阈值的服务,然后对剩余的服务列表进行线性轮询。
  9. ZoneAvoidanceRule:本身没有重写choose方法,用的还是抽象父类PredicateBasedRule的choose。
    一般情况,选择默认的就可以了。

声明式服务调用 Spring Cloud Feign

介绍

Feign是一个声明式的 Web Service 客户端,只需要创建一个接口,并加上对应的 Feign Client 注解,即可进行远程调用。

特性

  1. 可插拔的注解支持
  2. 可插拔的 HTTP 编码器、解码器
  3. 支持 Hystrix 断路器、Fallback
  4. 支持 Ribbon 负载均衡
  5. 支持 HTTP 请求、响应压缩

熔断组件 Spring Cloud Hystrix

介绍

Hystrix是Netflix开源的一款容错框架,包含常用的容错方法。在高并发访问下,系统所依赖的服务的稳定性对系统的影响非常大,依赖有很多不可控的因素,比如网络连接变慢,资源突然繁忙,暂时不可用,服务脱机等。Hystrix利用熔断、线程池隔离、信号量隔离、降级回退等方法来处理依赖隔离,使系统变得高可用。

线程池VS信号量

相对于其他的熔断器,Hystrix可以采用线程池来做隔离,好处是通过将发送请求线程与执行请求的线程分离,可有效防止发生级联故障,但是弊端是就是它增加了计算的开销,每个业务请求(被包装成命令)在执行的时候,会涉及到请求排队,调度和上下文切换。而且他的线程池是针对每个服务,所以服务较多的场景下,那就会有很多个线程池,这也增加了系统本身的开销。

服务网关 Spring Cloud Gateway

介绍

Spring Cloud Gateway是Spring官方推出的服务网关的实现框架,相对于服务网关的概念有点类似于传统的反向代理服务器(如nginx),但反向代理一般都只是做业务无关的转发请求,而服务网关与服务的整合程度更高,可以看作也是整个服务体系的组成部分,通过过滤器等组件可以在网关中集成一些业务处理的操作(比如权限认证等)。

核心概念

  1. Route: 负责将某个外部请求路由到一个合适的地址,包含一个ID,一个目标地址,一系列的Predicate和Filter;
  2. Predicate: 基于Java 8 Function Predicate的断言机制,用于将请求匹配到某一个Route
  3. Filter: 类似于Servlet filter,可以在请求传递给下一级处理器之前对请求或响应进行修改,用于实现权限验证,日志记录,限流等功能

原理图

image.png
Gateway Client发送请求给Spring Cloud Gateway,Gateway Handler Mapping会判断请求的路径是否匹配路由的配置,如果匹配则会进入Gateway Web Handler,Web Handler会读取路由上所配置的过滤器,然后将该请求交给过滤器去处理,最后转发到路由配置的微服务。

消息驱动的微服务 Spring Cloud Stream

介绍

Spring Cloud Stream是一个用来为微服务应用构建消息驱动能力的框架。

特点

  1. 屏蔽底层 MQ 实现细节,Spring Cloud Stream 的 API 是统一的。如果从 Kafka 切到 RocketMQ,可以直接修改配置。
  2. 与 Spring 生态整合更加方便。Spring Cloud Data Flow的流计算都是基于 Spring Cloud Stream;Spring Cloud Bus 消息总线内部也是用的 Spring Cloud Stream。

Spring Mesh 下一代的Spring Cloud

介绍

根据Linkerd CEO William Morgan定义,Service Mesh是用于处理服务间通信的基础设施层,用于在云原生应用复杂的服务拓扑中实现可靠的请求传递。在实践中,Service Mesh通常是一组与应用一起部署,但对应用透明的轻量级网络代理。目前集团也有很多落地的场景,蚂蚁金服在双11大规模落地了 SOFAMosn,阿里巴巴在双11的部分电商核心应用上落地了完整的 Service Mesh 解决方案,可以侧面说明这块的确是未来的发展趋势。目前比较流行的开源软件 Istio 和 Linkerd 都是Service Mesh的实现。

架构

image.png
Service Mesh,它将分布式服务的通信抽象为单独一层,在这一层中实现负载均衡、服务发现、认证授权、监控追踪、流量控制等分布式系统所需要的功能,作为一个和服务对等的代理服务,和服务部署在一起,接管服务的流量,通过代理之间的通信间接完成服务之间的通信请求。替换的组件包括网关(gateway或者Zuul,由Ingress gateway或者egress替换),熔断器(hystrix,由SideCar替换),注册中心(Eureka及Eureka client,由Polit,SideCar替换),负责均衡(Ribbon,由SideCar替换),链路跟踪及其客户端(Pinpoint及Pinpoint client,由SideCar及Mixer替换)。

优点

  1. 屏蔽分布式系统通信的复杂性(负载均衡、服务发现、认证授权、监控追踪、流量控制等等),服务只用关注业务逻辑;
  2. 真正的语言无关,服务可以用任何语言编写,只需和Service Mesh通信即可;
  3. 对应用透明,Service Mesh组件可以单独升级;

推荐学习书籍

  1. 《Spring Cloud 微服务实战》:这本书比较初级,适合大概了解一下所有组件的实用方式。
  2. 《分布式服务架构》:虽然不是Spring Cloud的,不过有不少分布式的实战案例,也比较推荐。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Nginx调优:gzip的相关设置

发表于 2021-10-09

问题:

公司项目在开发一项3D全景功能时,需要用到threejs这个框架进行3D渲染,一般3D文件的大小都在40M以上,这次我们渲染的文件在44M左右,下载所花费的时间是80s。

以前的解决办法:

耐心劝说产品经理,这是正常现象,这么大文件肯定要这么多秒,别急啊,又不是加载不出来,你看,这不是加载出来了吗?

现在的解决办法:

文件这么大,是否可以压缩后再传输?这里就引出nginx gzip的概念,gzip分为两种压缩模式:

1、实时压缩,通过消耗cpu来实时压缩文件并返回给浏览器,该方式下响应头中Etag属性会有’W\’的字样,开启方式在nginx.conf中添加gzip on等属性即可。

2、静态文件压缩,这种方式需要提前将文件压缩成.gz文件,nginx在加载文件时会优先使用.gz文件,而不是实时压缩中的自行压缩方式,开启方式在nginx.conf中添加gzip_static on等属性,这里nginx中不自带gzip_static模块,需要编译并添加,此外需要在vue项目打包过程中配置gzip ->bingo成功

具体的详细参数设置详见 vue 项目开启gzip压缩和部署 vue项目开启Gzip压缩和性能优化 - guopengju - 博客园

效果:44Msize的ifc文件被压缩成8M,加载时间从80秒,降为12秒

image.png

此外,js、css、json等静态资源文件都得到了加载提升

Respect!

将来的解决办法:

请大家把更好的方法留在评论区!!!


衍生proxy_cache缓存

出发点:在做完上面gzip压缩后,大文件的传输问题基本得以解决,于是想进一步提高加载速度,静态资源在首次加载后,会产生memory cache和disk cache两种缓存,但是这类缓存都属于客户端缓存,换句话说,当另外一个用户访问网站时,他也是需要重新再加载一遍,针对此类问题,思考🤔🤔是否可以有服务端缓存,于是有了proxy_cache的想法:

用法很简单,各类博客都有说到:blog.csdn.net/qq_3172…

自己觉得重要的点:

1、proxy_cache的设置需要在proxy_pass的前提下,要不然无效

2、add_header Nginx-Cache $upstream_cache_status; 原来响应头中缓存是否中的属性是可以自定义。

3、这种缓存我分为两种情况:

(1)对后端响应的数据做缓存,可以将请求到的结果缓存下来,效果显著,但是弊端是针对实时性要求高的接口做缓存,在现实中不可取,还是得因地制宜吧。

(2)静态资源做缓存(不能直接添加proxy_cache,需要做一层代理转发):教程 www.jb51.net/article/…

效果不明显,但是通过分析别人的CDN资源,都有做缓存,是否真的有用后续再观察。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

02为什么要架构分层 1什么是分层架构 2分层的优点与缺

发表于 2021-10-09

1.什么是分层架构

将系统纵向的拆分为几层,每个层次有独立的功能,多个层次协同提供完整的功能

常见的分层模型

名称 实现细节 图片
MVC Model View Controller image.png
web+service+dao 表现层:展示用户指令 逻辑层:复杂业务的具体实现 数据访问层:处理与存储间的交互 image.png

分层的例子

名称 实现细节 图片
OSI七层网络模型 TCP/IP四层协议 网络层:负责端到端的寻址与建立连接 传输层:端到端的数据传输相邻两层会有数据交互 隔离关注点,不同的层专注不同的事情 image.png
Linux文件系统 VFS:虚拟文件系统,屏蔽系统间的差异层次是对下层不同实现的抽象 image.png

2.分层的优点与缺点

分层的优势 分层的劣势
分层可以简化系统设计,不同的人专注某一层的事情技术大盘:API,中台,基础架构 增加代码的复杂度简单的查询,需要在每个层次间进行数据传递
分层可以有很高的复用性中台、biz-lib、biz-common 每个层次单独部署,增加网络交互
分层可以便于做横向扩展针对某一层的瓶颈进行扩展 CPU耗时(增加机器),磁盘读写慢(SSD)….

3.如何分层

需要考虑清楚分层的边界问题

因需求变更和业务复杂度,导致分层模糊

image.png
终端显示层:各端模板渲染并执行显示的层。当前主要是 Velocity 渲染,JS 渲染, JSP 渲染,移动端展示等。

开放接口层:将 Service 层方法封装成开放接口,同时进行网关安全控制和流量控制等。

Web 层:主要是对访问控制进行转发,各类基本参数校验,或者不复用的业务简单处理等。

Service 层:业务逻辑层。

Manager 层:通用业务处理层。这一层主要有两个作用,其一,你可以将原先 Service 层的一些通用能力下沉到这一层,比如与缓存和存储交互策略,中间件的接入;其二,你也可以在这一层封装对第三方接口的调用,比如调用支付服务,调用审核服务等

DAO 层:数据访问层,与底层 MySQL、Oracle、HBase 等进行数据交互。

外部接口或第三方平台:包括其它部门 RPC 开放接口,基础平台,其它公司的 HTTP 接口

Manager 层提供原子的服务接口(多次调用结果一致),Service 层负责依据业务逻辑来编排原子接口。

image.png

4.总结

分层架构是软件设计思想的外在体现,是一种实现方式。

单一原则:每一层的拥有单一的职责

迪米特法则:数据的交互不能跨层,必须在相邻层间进行

开闭原则:将抽象层与实现层分离,抽象层是对实现层共有特征的归纳总结(不可修改),具体实现无限扩展(随意替换)

当某一层复杂度过高后,将其拆分为多层

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

消息通知服务怎么做(含代码) 前言 业务场景 流程图 定时通

发表于 2021-10-09

前言

最新做了一个消息通知服务,涉及到了消息监听,消息封装,消息发送等方面,今天介绍一下我们引用的消息模板处理,供大家参考。

业务场景

  • 即时通知(收到消息后立即通知)
  • 延时通知(指定时段通知)

image.png

流程图

image.png

消息服务监听消息队列消息,判断消息是需要实时通知(立即发送),定时通知(在指定时间发送),实时通知则立即调用发送业务进行消息发送(需要根据业务场景的消息量进行具体设计,可以采用批量消费、多线程消费、按照通知的类型拆分成多个topic进行发送及消费,比如短信一个topic、邮件一个topic、推送一个topic等等方式,下面展示定时通知的代码实现)

定时通知

消息实体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码public class MessageInfo implements Serializable {

/**
* 通知类型 sms mail push
*/
private String type;

/**
* 通知用户id
*/
private String userId;

/**
* 关键信息
*/
private Map<String, Object> info;

/**
* 通知时间(为空的时候实时推送 非空时候定时推送)
*/
private Long time;

// 省略get set...
}

监听消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
typescript复制代码@Component
public class MessageListener {

@Autowired
private TimingMessageCache timingMessageCache;

@Autowired
private SenderManager senderManager;

@Autowired
private TemplateService templateService;

@KafkaListener(topics = "message", group = "message-server-group")
public void messageListener(String message, Acknowledgment ack) {
try {
MessageInfo messageInfo = JSONUtil.toBean(message, MessageInfo.class);
Long time = messageInfo.getTime();
if (time != null) {
// 定时通知的消息放入定时消息缓存
timingMessageCache.add(messageInfo);
} else {
// 直接发送 获取模板
String templateCode = messageInfo.getTemplateCode();
String template = templateService.getTemplateByCode(templateCode);
String msg = TemplateUtils.buildMessage(template, messageInfo.getInfo());
String type = messageInfo.getType();
senderManager.send(messageInfo.getType(), messageInfo.getUserId(), msg);
}
ack.acknowledge();
} catch (Exception e) {
// 异常日志
}

}
}

监听业务系统发送的消息,根据time是否为空来判断是实时通知还是定时通知,定时通知放入定时通知缓存,实时通知直接发送,调用模板工具及发送实现

定时消息缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
ini复制代码@Component
public class TimingMessageCache {

@Resource
private RedisTemplate<String, MessageInfo> redisTemplate;

private static final String TIMING_MESSAGE_PREFIX = "timing_message:";

/**
* 添加
*
* @param messageInfo
*/
public void add(MessageInfo messageInfo) {
Long time = messageInfo.getTime();
String minute = FastDateFormat.getInstance("yyyy-MM-dd HH:mm").format(time);
String key = String.join(":", TIMING_MESSAGE_PREFIX, minute);
redisTemplate.opsForList().leftPush(key, messageInfo);
}

/**
* 消费指定分钟的所有消息 消息量不大的情况下可以使用此方法 否则可能出现oom
*
* @param time
* @return
*/
public List<MessageInfo> consumeAll(long time) {
String minute = FastDateFormat.getInstance("yyyy-MM-dd HH:mm").format(time);
String key = String.join(":", TIMING_MESSAGE_PREFIX, minute);
List<MessageInfo> messageInfoList = redisTemplate.opsForList().range(key, 0, -1);
redisTemplate.delete(key);
return messageInfoList;
}

/**
* 消费指定数目的消息、适合数量较大的场景下进行分批处理
*
* @param time
* @param limit
* @return
*/
public List<MessageInfo> consumePart(long time, int limit) {
List<MessageInfo> messageInfoList = new ArrayList<>();
for (int i = 0; i < limit; i++) {
String minute = FastDateFormat.getInstance("yyyy-MM-dd HH:mm").format(time);
String key = String.join(":", TIMING_MESSAGE_PREFIX, minute);
MessageInfo messageInfo = redisTemplate.opsForList().rightPop(key);
if (messageInfo != null) {
messageInfoList.add(messageInfo);
} else {
// 读到空了 结束循环
break;
}
}
return messageInfoList;
}

/**
* 获取指定时间消息数目
*
* @param time
* @return
*/
public long size(long time) {
String minute = FastDateFormat.getInstance("yyyy-MM-dd HH:mm").format(time);
String key = String.join(":", TIMING_MESSAGE_PREFIX, minute);
return redisTemplate.opsForList().size(key);
}
}

定时消息缓存,使用redis list进行存储,同一分钟的数据会存放在同一个list中,key示例 timing_message:2021-10-01 08:01

定时消息处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
scss复制代码@Component
@EnableScheduling
public class TimingMessageHandle {

@Autowired
private TimingMessageCache timingMessageCache;

@Autowired
private SenderManager senderManager;

@Autowired
private TemplateService templateService;

/**
* 消费定时消息
*/
@Scheduled(cron = "0 */1 * * * ?")
@Async
public void handle() {
Calendar calendar = Calendar.getInstance();
long time = calendar.getTimeInMillis();
consume(time);
}

/**
* 根据消费对应时间缓存中的消息
*
* @param time
*/
public void consume(long time) {
long size = timingMessageCache.size(time);
// 低于1000条 一次性消费完 这个1000可以进行配置 根据服务器性能进行调整
if (size < 1000) {
fewConsume(time);
} else {
// 数据量大 分批消费 每次消费1000条 解决总共条数非1000整数倍的情况 类似于分页读取最后一页的情况
for (long i = 0, limit; i < size; i += limit) {
limit = i + 1000 > size ? size - i : 1000;
send(timingMessageCache.consumePart(time, limit));
}
}
}


/**
* 消息量少的场景调用此方法
*/
private void fewConsume(long time) {
List<MessageInfo> messageInfoList = timingMessageCache.consumeAll(time);
send(messageInfoList);
}

/**
* 发送
*
* @param messageInfoList
*/
private void send(List<MessageInfo> messageInfoList) {
for (MessageInfo messageInfo : messageInfoList) {
// 根据模板编码获取对应模板
String templateCode = messageInfo.getTemplateCode();
// 根据模板编码获取模板
String template = templateService.getTemplateByCode(templateCode);
// 根据模板和消息关键数据生成消息内容
String msg = TemplateUtils.buildMessage(template, messageInfo.getInfo());
// 根据类型调用对应发送方法 短信、邮件、推送...
senderManager.send(messageInfo.getType(), messageInfo.getUserId(), msg);
}
}

}

前面将定时消息按照前缀+分钟时间作为缓存key,所以定时任务每分钟执行一次,用当前时间生成key去读取缓存,比如指定2021-10-01 08:10发送的消息,定时任务在2021-10-01 08:10就可以从缓存中读取到这些消息并处理掉。

发送实现(伪代码)

发送实现采用接口多实现,指定名称的方式处理,具体可以参考文章
文章链接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
typescript复制代码// 接口
public interface MessageSender {
void send(String userId, String msg);
}

// 短信发送实现
@Component("sms")
public class SmsSender implements MessageSender {
@Override
public void send(String userId, String msg) {
// 获取用户手机号
// 调用三方短信api发送短信
// 发送短信
}
}
// 邮件发送实现
@Component("mail")
public class MailSender implements MessageSender {
@Override
public void send(String userId, String msg) {
// 获取用户邮箱
// 调用邮箱api发送邮件
}
}
// 管理类 发送统一入口
@Component
public class SenderManager {

private static final Logger LOGGER = LoggerFactory.getLogger(SenderManager.class);

/**
* 空实现避免获取不到对应发送实现出现空指针异常 同时可以打印日志
*/
private static final MessageSender EMPTY_RECEIVER = (userId, msg) -> {
LOGGER.info("none sender, userId = {}, msg = {}", userId, msg);
};

@Autowired
private Map<String, MessageSender> senderMessagesMap;

/**
* 根据类型调用对应发送实现
*
* @param type
* @param userId
* @param msg
*/
public void send(String type, String userId, String msg) {
senderMessagesMap.get(type).send(userId, msg);
}
}

模板相关(伪代码)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
typescript复制代码// 模板管理业务 提供根据模板code获取木板的方法
@Service
public class TemplateService {

public String getTemplateByCode(String templateCode) {
// 模板一般使用数据库进行存储 如果模板较少且改动得少 使用枚举进行管理也可以
return "";
}
}
// 模板工具类
public class TemplateUtils {

private TemplateUtils() {
}

public static String buildMessage(String template, Map<String, Object> info) {
ST st = new ST(template, '{', '}');
st.add("info", info);
return st.render();
}
}

模板的管理可以通过数据库管理或者配置文件,具体实现可以根据自己场景,模板code需要固定下来(约定大于配置),这样在业务系统中根据业务将对应模板code写死即可,在消息通知服务只要不改模板编码,随时调整模板都可以;下面部分介绍模板处理使用的依赖及使用fan

模板处理

通过几个demo演示模板操作的使用方法,供大家参考
示例模板:尊敬的客户,您购买的xxx商品已发货,预计在xxx时间到达目的地,请您注意查收。

引入ST4

1
2
3
4
5
6
7
xml复制代码<!-- 项目引入ST依赖 -->
<dependency>
<groupId>org.antlr</groupId>
<artifactId>ST4</artifactId>
<version>4.0.8</version>
<scope>compile</scope>
</dependency>

示例代码

1
2
3
4
5
6
7
8
9
10
11
typescript复制代码public static void main(String[] args) {
// 模板需要调整成ST支持格式
String template = "尊敬的客户,您购买的<info.goodsName>商品已发货,预计在<info.arrivalTime>时间到达目的地,请您注意查收。";
// 数据使用map或实体进行封装都可以
Map<String, Object> info = new HashMap<>(2);
info.put("goodsName", "iPhone 13 Pro Max 1T 远峰蓝色");
info.put("arrivalTime", "2021-10-01");
ST st = new ST(template, '<', '>');
st.add("info", info);
System.out.println(st.render());
}

ST对于模板变量占位方式比较灵活,但是这一块需要约定好,变量内容得边界使用’{}’还是’<>’都行,变量内容需要是xxx.xxx,因为我们复杂的模板数据可能来自多条不同数据、比如一个物流信息的数据可能来自于用户基础信息和订单信息及物流信息,这多个数据可能有相同的变量名,具体示例入下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
typescript复制代码public static void main(String[] args) {
String template = "尊敬的{user.name},您购买的{goods.name}商品已经发货,物流编号为{logistics.number},预计到货时间为{logistics.arrivalTime},请您注意按时查收。";
Map<String, Object> goods = new HashMap<>(2);
goods.put("name", "iPhone 13 Pro Max 1T 远峰蓝色");
goods.put("price", 12999);

Map<String, Object> user = new HashMap<>(2);
user.put("name", "杨女士");
user.put("age", 38);

Map<String, Object> logistics = new HashMap<>(2);
logistics.put("arrivalTime", "2021-10-01");
logistics.put("destination", "杭州");
logistics.put("number", "SF10000001");
ST st = new ST(template, '{', '}');
st.add("user", user);
st.add("goods", goods);
st.add("logistics", logistics);
System.out.println(st.render());
}

总结

一个消息通知服务技术难度并不大,更多的是结合自己的业务场景,业务量来进行设计,在微服务场景下,一个好用且灵活的通知服务能够减轻整个业务系统在消息通知业务上的开发投入,将定时通知的业务处理交给通知服务也是一个可行的方案,结合redis list + 定时任务,技术方案简单,业务灵活性强,业务系统需要给不同用户在不同时间点发送消息,只需要在一个时间点将所有的消息发送到消息系统,比如有100个用户开启了每日通知,且他们通知时间不同,业务系统只需要启动一个每日执行一次的定时任务,一次将所有的通知发送给消息系统,消息系统自行根据通知时间进行通知,这样对于业务系统来说业务更加简单,所有的通知都可以通过一个每日执行一次的定时任务全部发送到消息系统即可,且任务频率降低了,消息通知的压力只需要在消息服务处理即可。

升级空间

  • kafka修改为批量消费,参考文章
  • topic分区消费,消息服务分布式部署,参考文章
  • 拆分不同类型的通知为多个topic,如message-sms message-mail message-push,分别消费处理
  • 发送部分升级为多线程处理

本文更多是为了分享技术方案及业务解决思路,在业务细节比较抽象,部分使用伪代码代替,如有不解之处可以评论中提出来

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

谈谈java中的反射 什么是反射 反射的基础之class 反

发表于 2021-10-09

本文已参与「掘力星计划」,赢取创作大礼包,挑战创作激励金

抽奖说明在文末哦!

什么是反射

反射机制指的是程序在运行时能够获取自身的信息即在程序运行时动态加载类并获取类的详细信息,从而操作类或对象的属性和方法。本质是JVM得到class对象之后,再通过class对象进行反编译,从而获取对象的各种信息。反射是Java的特征之一,是一种间接操作目标对象的机制。

反射的基础之class

Class类是反射实现的基础,要想理解反射,首先要理解Class类。在程序运行期间,虚拟机为所有的对象维护一个被称为运行时的类型标识,这个信息跟踪着每个对象所属的类的完整结构信息,包括包名、类名、实现的接口、拥有的方法和字段等。可以通过特殊的Java类访问这些信息,这个类就是Clas 类。可以把Class类理解为类的类型,一个Class对象,称为类的类型对象,一个Class对象对应一个加载到JVM中的一个.class文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码public final class Class<T> implements java.io.Serializable,
GenericDeclaration,
Type,
AnnotatedElement {
private static final int ANNOTATION= 0x00002000;
private static final int ENUM = 0x00004000;
private static final int SYNTHETIC = 0x00001000;

private static native void registerNatives();
static {
registerNatives();
}

...
}

首先JVM会将代码编译成一个.class字节码文件,然后被类加载器(Class Loader)加载进JVM的内存中,同时会创建一个类的Class对象存到堆中(注意这个不是new出来的对象,而是类的类型对象)。JVM在创建类对象前,会先检查其类是否加载,寻找类对应的Class对象,若加载好,则为其分配内存,然后再进行初始化。

在加载完一个类后,堆内存的方法区就产生了一个Class对象,这个对象就包含了完整的类的结构信息,通过这个Class对象可以看到类的结构,就好比一面镜子。所以称之为:反射。
更多class类加载知识请参考前面的文章:jvm系列:类加载机制

反射的基本使用

反射机制的常用的类

Java.lang.Class;

Java.lang.reflect.Constructor;

Java.lang.reflect.Field;

Java.lang.reflect.Method;

Java.lang.reflect.Modifier;

反射常用的API

对象. setAccessible(true);

Class:

获取公开属性:getField(“属性名”);

获取私有属性:getDeclaredField(“属性名”);

通过指定方法名称获取公开无参方法对象:getMethod(“方法名”, null);

获取所有公开方法对象:getMethods();

获取所有方法对象:getDeclaredMethods();

通过指定方法名称获取私有有参方法对象:getDeclaredMethod(“方法名”, 方法参数的类型……);

调用公开有参构造:getConstructor(构造参数类型);

调用私有有参构造:getDeclaredConstructor(构造参数类型);

判断是否是某个类的实例:isInstance(obj);

Field:

获取属性名:getName();

获取属性的类型:getType();

获取属性的修饰符:getModifiers();

设置属性值 1:类实例化对象 要设置的参数值:set(obj,obj);

Method:

方法对象.invoke(类实例化对象, 方法的参数数组); //执行方法

Constructor:

NewInstance();//通过构造获取到类的实例对象

下面看代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
ini复制代码public class Test(){
A a=new A();

Class c1=A.class;//任何一个类都有一个隐含的静态成员变量class

Class c2=a.getClass();//在已知类的对象的情况下通过getClass方法获取

Class c3 = null;

try {
c3 = Class.forName("com.xxxx.A");//类的全称
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}

class A{}

方法的反射:getName

getName方法可以打印出类类型的类名称。

1
2
3
4
5
6
7
8
arduino复制代码public class Test2 {
public static void main(String[] args) {
Class c1 = String.class;//String类的类类型
Class c2 = void.class;
System.out.println(c1.getName());
System.out.println(c2.getName());
}
}

成员变量反射

1
2
3
4
5
typescript复制代码public class TestUtil {
public static void printFieldMessage(Object obj){
Class c = obj.getClass();
//Field[] fs = c.getFields();
}

getFields()方法获取的所有的public的成员变量的信息。和方法的反射那里public的成员变量,也有一个获取所有自己声明的成员变量的信息:Field[] fs = c.getDeclaredFields(),然后遍历得到的结果如下:

1
2
3
4
5
6
7
8
ini复制代码for (Field field : fs) {
//得到成员变量的类型的类类型
Class fieldType = field.getType();
String typeName = fieldType.getName();
//得到成员变量的名称
String fieldName = field.getName();
System.out.println(typeName+" "+fieldName);
}

构造函数的反射

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
ini复制代码public static void Test(Object obj){
Class c = obj.getClass();
/*
* java.lang. Constructor中封装了构造函数的信息,它有两个方法:
* getConstructors()方法获取所有的public的构造函数
* getDeclaredConstructors()方法得到所有的自己声明的构造函数
*/
//Constructor[] cs = c.getConstructors();
Constructor[] cs = c.getDeclaredConstructors();
for (Constructor constructor : cs) {
System.out.print(constructor.getName()+"(");
//获取构造函数的参数列表->参数列表的类类型
Class[] paramTypes = constructor.getParameterTypes();
for (Class class1 : paramTypes) {
System.out.print(class1.getName()+",");
}
System.out.println(")");
}
}

总结

反射在实际编程中应用并不多,但是很多设计都与反射机制有关,比如:动态代理机制,JDBC连接数据库,
Spring/Hibernate框架(实际上是因为使用了动态代理,所以才和反射机制有关)。

抽奖说明

1.本活动由掘金官方支持 详情可见juejin.cn/post/701221…

2.通过评论和文章有关的内容即可参加,评论越走心越有机会中奖哦。

3.本月的文章都会参与抽奖活动,欢迎大家多多互动哦!

4.除掘金官方抽奖外本人也将送出周边礼物(马克杯一个和掘金徽章若干,马克杯将送给走心评论,徽章随机抽取,数量视评论人数增加)。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

聊聊我对 GraphQL 的一些认知

发表于 2021-10-09

每隔一段时间就能看到一篇 GraphQL 的文章,但是打开文章一看,基本上就是简单的介绍下 GraphQL 的特性。很多文章其实就是 github 上找个 GraphQL 的项目,然后按照对应的 demo 跑起来而已。有些文章明显是没有完整的项目实践经历,却在狂吹 GraphQL 的各种优点,让不熟悉 GraphQL 的同学以为这是神丹妙药,弄不好还要在项目中实践一番。

因为项目的背景(后面会讲到),我有幸参与过 GraphQL 在实际项目中的落地,本篇文章我会谈谈我对 GraphQL 的一些理解,当然这个也仅供读者参考。

GraphQL 优势

不知道大家有没有遇到过这样的一些场景,某个服务有几十个接口,更有甚者上百个也是有可能的。APP 或者其他下游要封装一个功能,需要调用 10 个接口左右,可能这些接口还涉及到不同的团队。不管开发,联调,测试,还是对于调用方,整个链条功能太多了。随着这些功能经过多个版本的迭代升级后,新+旧版本的接口谁也不敢大规模改动了,只能在原来的基础上做代码拼接,这基本就是祖传代码的由来。大部分同学基本的代码素养是有的,但是也只能任由这些祖传代码慢慢腐烂,原因为很简单,谁也不敢保证改动之后功能是不是有遗漏的地方。

有没有这样一个功能,将这些接口做一下聚合,然后将结果的集合返回给前端呢?在目前比较流行微服务架构体系下,有一个专门的中间层专门来处理这个事情,这个中间层叫 BFF(Backend For Frontend)。我曾经工作过的某公司想要将某业务的售卖相关功能给公司其他业务使用,但是接入方一看那么多接口,瞬间就决定不接了,逼不得已该业务平台紧急开发了 BFF 相关功能让其他业务方接入。

有些同学在稍微大点规模的公司做的工作就是合并各种接口,然后返回给调用方,这基本上就是 BFF 的主要工作内容了。除了类似 BFF 组建接入平台的方式,是否还有其他的方式能够只发出一个请求就能获取到一系列的接口返回值呢?

我在京东 APP 上随便截图了一个商品

类似这个页面,当用户打开这个页面的时候,按照目前比较流行的 REST 接口,需要 APP 至少发起下面这些请求:

  • 获取商品详情接口
  • 获取商品价格、优惠相关的接口
  • 获取评价接口
  • 获取种草秀接口
  • 获取问答接口

这些接口一般来说都比较重,里面有很多当前页面并不需要的字段,那有没有一种可能:APP 端发一次请求就能获取这个页面需要的所有字段,同时 APP 还能根据自己的需求只请求自己需要的字段呢?

答案是肯定的,那就是 GraphQL。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
graphql复制代码query jdGoodsQuery {
goods {
detail {
id
pictures(first: 10) {
pic_id
thumb
}
spec {
name
size
weight
}
}
price {
price
origin_price
market_price
}
comment(first: 10) {
comment_id
topic_id
content
from_uid
}
self_show(first: 10) {
id
pic_id
}
}
}

对于上面京东商品详情的截图,类似这样的一个 Query 就可以把这个页面需要的所有的字段都获取到。

对于 REST 接口还有另外一个比较棘手的问题。当业务升级的时候,接口不可避免的需要升级,一个比较常见的问题,某个字段在新版本升级后不需要了,如何优雅的处理旧接口以及旧字段呢?

可能有些同学说,可以强行推动下游去升级,限定期限进行升级,如果下游不升级概不负责,这种方式基本只能自己欺骗自己了,因为当你接口下线导致业务方报错,这个责任只能你自己负责了。特别的,当某些接口是直接面对消费者时,这个问题会变得更加棘手。

2016 年那会就职过的一家做 AI 产品的公司,上市了一款智能读书的儿童产品,其基本功能就是绘本图书翻一页读一页,这也算是 HomeAI 落地比较成功的产品了,这个产品在天猫、京东的销量还可以。在 AI 风口的那会,最开始的定位的功能就是读书,随着市场的拓展,功能也逐渐变得复杂起来,当时最痛苦的事情莫过于短短 3 个月内,接口版本号从 v1 增加到 v12。由于当时产品的理念认为强制升级是不优美的,不符合产品设计美学,导致这款产品是没有强制升级功能的,于是导致的结果是 v1 到 v12 的接口总是有用户在使用的。你可能会说发公告、短信通知用户在某个期限内升级,如果不升级,出现产品无法使用情况概不负责,这也是行不通的,特别那些用户付费的产品,如果万一不能使用的话,那 12315 是会找上门的。

就在我们被无法下线的 API 接口折磨的时候,经过调研之后发现 GraphQL 正好有一个功能,“API 演进无需划分版本”,这不是瞌睡来了就有枕头吗,于是在技术负责人带领下(GraphQL 改造项目还没上线,他去了 Microsoft),我们开始推进 GraphQL 的改造工作。

说到这里基本上就把 GraphQL 的最容易吸引人的优点介绍完了。我们要知道没有任何技术是“银弹”,当某些人给你狂吹某个技术的优点,而没有说这项技术的缺点限制,或者简略的一笔带过的时候,你就需要小心了,说不定你就是那个小白鼠。

下面就进入我经历的 GraphQL 项目遇到的问题,也许我的处理方式是不正确,以下观点仅供参考。

GraphQL 难题

社区活跃度问题

GraphQL 是由 Facebook 开发,Facebook 也成立了 GraphQL 基金会,但是 Facebook 官方只提供了 JS 版本的开源实现,其他语言的实现都是 GraphQL 对应语言的非官方社区实现的,这就造成不同语言的理解和实现是有差异的。比如 Graphql schema 的合并工具,只有 JS 官方实现有对应的实现。

GraphQL 属于那种大家都觉得很不错,但是经过近 10 年的发展,依然是不温不火。

上面这个图是 GraphQL 官方的 landscape,你仔细看上面公司图标,一眼能看出来的大的公司只有 Github 了。Facebook 虽说是推出了 GraphQL 的规范以及 JS 的相关实现,但是他自己都没有放出有关 GraphQL 的实际接口,让人对这个技术的信任度都大打折扣。

当你遇到 GraphQL 问题的时候,要么搜出来全是 JS 的相关实现,要么干脆就没有人回答,总之 GraphQL 相关社区看着文档很丰富,其实遇到问题时能搜到解决问题的有用的资料并不多。

缓存问题

缓存对于 REST 接口来说是很容解决的,但是在 GraphQL 中却变得非常复杂。由于 GraphQL 的特性,即便它操作的是同一个实体,每次查询可能都各不相同。

举个例子,由于客户端可以自定义其需要的字段,如某次请求只需要某个人的名字,但是在另外一次查询中你可能也想知道他的消费积分。名字可能要查询 user.userProfile 库,而且消费积分可能要查询第三方系统。本次查询输入的是单个 user_id,下次查询输入的可能就是 userId_list 了,为了解决这些查询的缓存问题,你可能会设置很多 key 或者每个用户设置 key-value 放入缓存,这些都不是很优美的解决方案,归根到底还是因为 GraphQL 太多灵活,服务器的缓存如何设计都跟不上客户端的灵活的查询方式。

对于这个问题 Facebook 也有 DataLoader 的解决方案,当然只是 JS 版本的方案,其他语言的社区可能根本都没有对应的实现。按照我当初的调研这个东西真的不好用,还不如自己做缓存来得快。

GraphQL 缓存不只对服务端不友好,同时对客户端也是一个挑战,需要用户自己做客户端的缓存,因为 GraphQL Query 只有一个路由,而且都是 POST 方式。

网关问题

GraphQL 是强类型的,所以必须需要一个 schema 的存在。按照当初的实践经验,客户端有且只能存在一份 schema 文件,于是另外一个比较棘手的问题就出现了。

假如我们的服务是类似下图这个样子:

比如 server1 是商品服务,server2 是优惠服务,客户端如果要对接这两个服务的话,直接对接是不行的,因为客户端只能有一个 schema, 但是服务端却有两个 schema 文件。

这种情况该如何处理呢?当初经过调研发现 JS 提供了 schema merge 的工具,而且仅仅是个工具。其他语言压根就没有这个玩意。

这个的设计还会带来另外一个非常严重的问题,目前的 API 网关都是无法使用的。随着业务规模的扩大,走上微服务是迟早的事情,但是如果服务端全是基于 GraphQL 开发的,那么网关该如何处理呢?最近调研了下 APISIX 和 KONG 的最新版本,这两个业界有影响力的网关也仅仅是支持 GraphQL 协议的转发而已。

在我看来,在如今微服务比较流行市场下,GraphQL 唯一比较契合的场景的就是将 BFF 的 REST 换成 GraphQL,该 BFF 即做网关也做业务。其实这样做,也不是很完美,即限定了客户端只能有这一个 BFF,同时也让 BFF 变得不纯粹起来。看到美团有篇介绍 GraphQL 的实践的文章,就是让 GraphQL 做的 BFF 的相关工作,不过他们没有将 GraphQL 暴露给客户端,而是在 GraphQL 之上又包裹了一层 HTTP 接口。

复杂度问题

GraphQL 最大的好处就是客户端能按需查询,是便利了客户端,但是把问题的复杂度都移交给了服务器。服务器也不是想查就能查的,毕竟服务器也是资源限制的,不可能无限制的让客户端去索取。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
css复制代码query deep3 {
viewer {
albums {
songs{
author {
company {
address {
...
}
}
}
}
}
}
}

由于 GraphQL 要追求从一个类型能查到 schema 的任意类型。比如这样的查询,能无限嵌套下去,每个 Type 对服务器来说都是对应的查询,服务器肯定承受不了的。

如何限制呢?GraphQL 提出的有复杂度和深度的相关概念,但是这两个值该如何去计算,只能靠服务器开发人员的估计。于是这样的场景就经常出现,开发初期约定复杂度 1000,过了两天客户端同学找了过来要提高到 3000,全是无尽的扯皮,不管设置多少,由于客户端查询的多样性总有不够用的情形出现。而且这个复杂度、深度是全局的,不是每一个 Query 能单独配置,这样就会造成这两个值最后变得可有可无。

限流问题

限流也是 GraphQL 的最难解决的难题之一,服务端不可能没有限流的,不然服务器稳定性就保障不了。对于 REST 来说接口的路由都是固定不变的,针对于不同的 URI 做限流是很容做到的。但是 GraphQL 限流的难点在哪里呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
javascript复制代码query maliciousQuery {
album(id: "some-id") {
photos(first: 9999) {
album {
photos(first: 9999) {
album {
photos(first: 9999) {
album {
#... Repeat this 10000 times...
}
}
}
}
}
}
}
}

这个请求会导致什么问题呢?客户端会发起一个请求 maliciousQuery,这个请求会去查询 some-id 的 album,这个 album 获取这个相册里的最大 9999 张图片,每个图片又要查询到所属的相册,就这么无限制的嵌套下去。这样的查询,服务器根本就影响不来,上面说到的复杂度和深度其实有点用处,但是用处不大。

曾经遇到这样的真实场景,GraphQL 项目已经部署线上,复杂度和深度也配置了,客户端同学在获取商品分页列表的同时也将对应商品详情以及商品详情的级联内容给取了出来,导致的结果服务器直接 OOM。原因跟上面这个例子很相似,就是嵌套的查询过多导致的。这个问题其实跟复杂度和深度是相关的,但是复杂度和深度真的是很难评估。

所以 GraphQL 限流的难题就在于客户端只发起一次请求,但是在服务器端可能被放大无数倍。如何能够有效的评估某个值能让客户端的嵌套比较合理,根据实践经验来看,这个值不是官方提供的复杂度和深度能解决的。

不过好消息 GraphQL 提供一种评估 GraphQL 限流的方式,另外一个中文版本的理论解析解决 GraphQL 的限流难题。理论是一回事,不过实施起来依然是困难重重。

结论

本文主要介绍了我曾经经历过的 GraphQL 落地的一点感悟,距离如今也有一段时间了,GraphQL 留给我的印象就停留在这些无法解决的问题上。曾经有人咨询我想用 GraphQL 去重构某个服务,被我比较激动的给打消了这个念头。这篇文章可能也有写的不对的地方,欢迎同学们指出。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

django 项目部署详细教程 【uwsgi + nginx

发表于 2021-10-09

小知识,大挑战!本文正在参与“程序员必备小知识”创作活动。

本文同时参与 「掘力星计划」 ,赢取创作大礼包,挑战创作激励金

项目部署

  1. 环境搭建

不使用 Anaconda 可直接跳到创建虚拟环境

1.1 Anaconda 下载

  • anaconda + virtualenv
  • anaconda下载: 清华大学开源软件镜像 or 官网

1.2 Anaconda 安装

  • 下载好后传到Linux
    在这里插入图片描述
  • 安装
1
ruby复制代码  root@iZwz9ijwralw5z37wd16xsZ:~# bash ~/Downloads/Anaconda3-5.3.1-Linux-x86_64.sh

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-uxvB4xoo-1596510168906)(C:\Users\user\AppData\Roaming\Typora\typora-user-images\image-20200726085008704.png)]

  • 回车安装到默认路径即可
    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-QVZgCMiB-1596510168908)(C:\Users\user\AppData\Roaming\Typora\typora-user-images\image-20200726085429443.png)]
  • 建议输入yes,输入No的话还需要自己手动添加路径,否则conda将无法正常运行
    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-xnxv4n7e-1596510168910)(C:\Users\user\AppData\Roaming\Typora\typora-user-images\image-20200726085610006.png)]
  • 是否安装VSCode,随便,我这里不安装
  • 启动服务
1
ruby复制代码root@iZwz9ijwralw5z37wd16xsZ:~# source ~/.bashrc
  • 输入conda list , 安装成功会显示包名

1.3 环境配置

  • 创建 anaconda 环境
+ conda create -n name python版本号



1
ruby复制代码root@iZwz9ijwralw5z37wd16xsZ:~# conda create -n MyDjango python=3.6.2

1.4 创建虚拟环境

  • 下载虚拟环境pip包
    • pip install virtualenv
    • pip install virtualenvwrapper
  • 修改 .bashrc 文件
1
2
3
4
5
6
7
8
bash复制代码vim ~/.bashrc
# 添加下面代码
export WORKON_HOME=$HOME/.virtualenvs
export PROJECT_HOME=$HOME/workspace
source /usr/local/bin/virtualenvwrapper.sh

# 使 .bashrc 生效
source ~/.bashrc
  • 使用 anaconda环境/python环境 创建新虚拟环境
+ mkvirtualenv --python='指定路径' 环境名称
+ eg:
1
ini复制代码mkvirtualenv --python='/root/anaconda3/envs/MyDjango/bin/python' MyDjango

默认情况下,新创建的环境将会被保存在/Users/<user_name>/anaconda3/env目录下,其中,<user_name>为当前用户的用户名。
不指定python环境的话,默认会选择本地的python版本

  • 进入虚拟环境
+ workon MyDjango
  1. 项目拉到服务器

2.1 项目迁移

2.1.1 方法一(Xftp)

  • 通过 Xftp 将项目文件从本机直接拉到服务器
    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-VIpr7KCT-1596510168912)(C:\Users\user\AppData\Roaming\Typora\typora-user-images\image-20200726094725914.png)]

2.2.2 方法二(git)

  • 服务器安装git
    • apt-get install git
  • 通过 git clone 命令将仓库克隆到服务器
    • git clone gitee.com/ruochenchen…

2.2 安装项目依赖包

  • 在本机项目中通过 pip freeze > requirement.txt 命令将项目所需包输出到 requirement.txt 文件中
  • 服务器端项目中。通过 pip install -r requirements.txt 命令安装包
  1. uwsgi

  • 遵循 wsgi 协议的 web 服务器

在这里插入图片描述

3.1 uwsgi 的安装

  • pip install uwsgi

3.2 uwsgi 的配置

  • 项目部署时,需要修改 settings.py 文件
1
2
ini复制代码DEBUG=FALSE
ALLOWED_HOSTS=['*']
  • 在项目中新建一个文件 uwsgi.ini,方便启动/终止 uwsgi 服务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
ini复制代码[uwsgi]
# 使用nginx链接时使用
# socket=127.0.0.1:8000
# 直接做web服务器使用 python manage.py runserver ip:port
http=127.0.0.1:8000
# 项目目录 [pwd查看 直接填,不需要引号]
chdir=
# 项目中wsgi.py文件的目录,相对于项目目录
wsgi-file=
# 指定启动的工作进程数
processes=4
# 指定工作进程中的线程数
threads=2
# 进程中,有一个主进程
master=True
# 保存启动之后主进程的pid
pidfile=uwsgi.pid
# 设置uwsgi后台运行, uwsgi.log 保存日志信息
daemonize=uwsgi.log
# 设置虚拟环境的路径 [cd .virtualenvs]
virtualenv=

下面是我的uwsgi.ini 文件配置,可以参考一下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
ini复制代码[uwsgi]
# 使用nginx链接时使用
socket=127.0.0.1:8000
# 直接做web服务器使用 python manage.py runserver ip:port
# http=127.0.0.1:8000
# 项目目录 [pwd查看 直接填,不需要引号]
chdir=/root/MyDjango
# 项目中wsgi.py文件的目录,相对于项目目录
wsgi-file=MyDjango/wsgi.py
# 指定启动的工作进程数
processes=4
# 指定工作进程中的线程数
threads=2
# 进程中,有一个主进程
master=True
# 保存启动之后主进程的pid
pidfile=uwsgi.pid
# 设置uwsgi后台运行, uwsgi.log 保存日志信息
daemonize=uwsgi.log
# 设置虚拟环境的路径 [cd .virtualenvs]
virtualenv=/root/.virtualenvs/MyDjango

3.3 uwsgi 的启动和停止

  • 启动: uwsgi –ini 配置文件路径
    • 例如: uwsgi –ini uwsgi.ini
    • 启动后,ps aux | grep uwsgi 命令可以查看uwsgi 进程
  • 停止: uwsgi –stop uwsgi.pid 路径
    • 例如: uwsgi –stop uwsgi.pid

这里有一个小问题,如果遇到可以看一下解决方法

问题:

​ 执行启动后,报错信息 uwsgi: error while loading shared libraries: libpcre.so.1: cannot open shared object file: No such file or directory

解决方法:

​ sudo apt-get install libpcre3 libpcre3-dev # 安装需要的包

​ find / -name libpcre.so.3 # 找到libpcre.so.3(一般在根目录/lib/x86_64-linux-gnu下)

​ 找到 /lib/x86_64-linux-gnu/libpcre.so.3

​ sudo ln -s /lib/x86_64-linux-gnu/libpcre.so.3 /usr/lib/libpcre.so.1 # 做软链接即可

仅仅使用uwsgi,首页的静态文件不能显示

解决方法:

在uwsgi前面在加一个nginx服务器

nginx中进行配置

  1. 如果是动态请求,转交请求给uwsgi
  2. 如果是静态请求,提前把项目用到的静态文件放到nginx所在电脑的某个目录中

根据配置,nginx就会去目录下方找到静态文件,直接返回给用户

  1. nginx

在这里插入图片描述

4.1 nginx 配置转发请求给 uwsgi

  • uwsgi 需要更改配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
ini复制代码[uwsgi]
# 使用nginx链接时使用
socket=127.0.0.1:8080
# 直接做web服务器使用 python manage.py runserver ip:port
# http=127.0.0.1:8080
# 项目目录 [pwd查看 直接填,不需要引号]
chdir=
# 项目中wsgi.py文件的目录,相对于项目目录
wsgi-file=
# 指定启动的工作进程数
processes=4
# 指定工作进程中的线程数
threads=2
# 进程中,有一个主进程
master=True
# 保存启动之后主进程的pid
pidfile=uwsgi.pid
# 设置uwsgi后台运行, uwsgi.log 保存日志信息
daemonize=uwsgi.log
# 设置虚拟环境的路径 [cd .virtualenvs]
virtualenv=
  • nginx配置转发请求给 uwsgi,配置路径为 /etc/nginx/sites-available/default
1
2
3
4
5
6
7
8
9
10
11
12
13
ini复制代码http{
server {
listen 80;
server_name localhost;

location / {
# 包含uwsgi的请求参数
include uwsgi_params;
# 转交请求给uwsgi
uwsgi_pass 127.0.0.1:8000 # uwsgi服务器的ip:port
}
}
}
  • 下面是我的 default 配置信息,加了静态文件处理【静态文件处理请继续往下看】,可以参考一下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
ini复制代码upstream MyDjango {
server 127.0.0.1:8000;
}

server {
listen 80;
server_name localhost;

location / {
include uwsgi_params;
uwsgi_pass MyDjango;
}

location /static {
alias /var/www/MyDjango/static;
}
}

ps aux | grep nginx # 查看是否有nginx进程

  • 启动nginx
    • /etc/init.d/nginx start 【终止命令为: /etc/init.d/nginx stop】

也可以用如下命令
service nginx start
service nginx stop
service nginx restart
service nginx reload

  • 启动uwsgi
    • uwsgi –ini uwsgi.ini

4.2 nginx配置处理静态文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
csharp复制代码http{
server {
listen 80;
server_name localhost;

location / {
# 包含uwsgi的请求参数
include uwsgi_params;
# 转交请求给uwsgi
uwsgi_pass 127.0.0.1:8080 # uwsgi服务器的ip:port;
}

# 请求的路径中以static开头,就匹配下列地址
location /static {
# 指定静态文件存放的目录
alias 目录 例如: /var/xxx/static;
/var/www/MyDjango/static;
}
}
}
  • django settings.py 中配置收集静态文件路径
1
2
3
csharp复制代码# settings.py 中设置
STATIC_ROOT=收集的静态文件路径 例如: /var/xxx/static
# 例如,我的路径为 STATIC_ROOT = '/var/www/MyDjango/static'
  • 创建文件夹

sudo mkdir -p /var/www/MyDjango/static

普通用户没有权限,需要修改文件权限

sudo chmod 777 /var/xxx/static/

  • django 收集静态文件的命令
+ python manage.py collectstatic
+ 执行上面的命令会把项目中所使用的静态文件收集到 STATIC\_ROOT 指定的目录下
  • 重启nginx服务

/etc/init.d/nginx reload

到这里,项目就部署完成了,还有其他需求可以继续往下看。

4.3 nginx转发请求给另外地址

  • 在 location 对应的配置项中增加 proxy_pass 转发的服务器地址,

如当用户访问 127.0.0.1 时,在nginx 中配置把这个请求转发给 172.10.179.115:80(nginx)服务器,让这台服务器提供静态页面

  • nginx配置如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
ini复制代码http{
server {
listen 80;
server_name localhost;

location / {
# 包含uwsgi的请求参数
include uwsgi_params;
# 转交请求给uwsgi
uwsgi_pass 127.0.0.1:8080 # uwsgi服务器的ip:port;
}

# 请求的路径中以static开头,就匹配下列地址
location /static {
# 指定静态文件存放的目录
alias 目录 例如: /var/xxx/static;
}

# 等号为精确匹配
location = / {
# 传递请求给静态文件服务器上的nginx
proxy_pass http://ip;
}
}
}

4.4 nginx 配置 upstream 实现负载均衡

在这里插入图片描述

  • nginx 配置负载均衡时,在 server 配置的前面增加 upstream 配置项
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
ini复制代码http{
# 名称随便起
upstream ruochen {
server 127.0.0.1:8080;
server 127.0.0.1:8081;
}

server {
listen 80;
server_name localhost;

location / {
# 包含uwsgi的请求参数
include uwsgi_params;
# 转交请求给uwsgi
# uwsgi_pass 127.0.0.1:8080 # uwsgi服务器的ip:port;
uwsgi_pass ruochen; # 上面名字是啥,这里就是啥
}

# 请求的路径中以static开头,就匹配下列地址
location /static {
# 指定静态文件存放的目录
alias 目录 例如: /var/xxx/static;
}

# 等号为精确匹配
location = / {
# 传递请求给静态文件服务器上的nginx
proxy_pass http://ip;
}
}
}

最后,欢迎大家关注我的个人微信公众号 『小小猿若尘』,获取更多IT技术、干货知识、热点资讯

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

我惊了!CompletableFuture居然有性能问题!

发表于 2021-10-09

本文已参与「掘力星计划」赢取创作大礼包,挑战创作激励金。

你好呀,我是歪歪。

国庆的时候闲来无事,就随手写了一点之前说的比赛的代码,目标就是保住前 100 混个大赛的文化衫就行了。

现在还混在前 50 的队伍里面,稳的一比。

其实我觉得大家做柔性负载均衡那题的思路其实都不会差太多,就看谁能把关键的信息收集起来并利用上了。

由于是基于 Dubbo 去做的嘛,调试的过程中,写着写着我看到了这个地方:

org.apache.dubbo.rpc.protocol.AbstractInvoker#waitForResultIfSync

先看我框起来的这一行代码,aysncResult 的里面有有个 CompletableFuture ,它调用的是带超时时间的 get() 方法,超时时间是 Integer.MAX_VALUE,理论上来说效果也就等同于 get() 方法了。

从我直观上来说,这里用 get() 方法也应该是没有任何毛病的,甚至更好理解一点。

但是,为什么没有用 get() 方法呢?

其实方法上的注释已经写到原因了,就怕我这样的人产生了这样的疑问:

抓住我眼球的是这这几个单词:

have serious performance drop。

性能严重下降。

大概就是说我们必须要调用 java.util.concurrent.CompletableFuture#get(long, java.util.concurrent.TimeUnit) 而不是 get() 方法,因为 get 方法被证明会导致性能严重的下降。

对于 Dubbo 来说, waitForResultIfSync 方法,是主链路上的方法。我个人觉得保守一点说,可以说 90% 以上的请求都会走到这个方法来,阻塞等待结果。所以如果该方法如果有问题,则会影响到 Dubbo 的性能。

Dubbo 作为中间件,有可能会运行在各种不同的 JDK 版本中,对于特定的 JDK 版本来说,这个优化确实是对于性能的提升有很大的帮助。

就算不说 Dubbo ,我们用到 CompletableFuture 的时候,get() 方法也算是我们常常会用到的一个方法。

另外,这个方法的调用链路我可太熟悉了。

因为我两年前写的第一篇公众号文章就是探讨 Dubbo 的异步化改造的,《Dubbo 2.7新特性之异步化改造》

当年,这部分代码肯定不是这样的,至少没有这个提示。

因为如果有这个提示的话,我肯定第一次写的时候就注意到了。

果然,我去翻了一下,虽然图片已经很模糊了,但是还是能隐约看到,之前确实是调用的 get() 方法:

我还称之为最“骚”的一行代码。

因为这一行的代码就是 Dubbo 异步转同步的关键代码。

前面只是一个引子,本文不会去写 Dubbo 相关的知识点。

主要写写 CompletableFuture 的 get() 到底有啥问题。

放心,这个点面试肯定不考。只是你知道这个点后,恰好你的 JDK 版本是没有修复之前的,写代码的时候可以稍微注意一下。

学 Dubbo 在方法调用的地方加上一样的 NOTICE,直接把逼格拉满。等着别人问起来的时候,你再娓娓道来。

或者不经意间看到别人这样写的时候,轻飘飘的说一句:这里有可能会有性能问题,可以去了解一下。

啥性能问题?

根据 Dubbo 注释里面的这点信息,我也不知道啥问题,但是我知道去哪里找问题。

这种问题肯定在 openJDK 的 bug 列表里面记录有案,所以第一站就是来这里搜索一下关键字:

bugs.openjdk.java.net/projects/JD…

一般来说,都是一些陈年老 BUG,需要搜索半天才能找到自己想要的信息。

但是,这次运气好到爆棚,弹出来的第一个就是我要找的东西,简直是搞的我都有点不习惯了,这难道是传说中的国庆献礼吗,不敢想不敢想。

标题就是:对CompletableFuture的性能改进。

里面提到了编号为 8227019 的 BUG。

bugs.openjdk.java.net/browse/JDK-…

我们一起看看这个 BUG 描述的是啥玩意。

标题翻译过来,大概意思就是说 CompletableFuture.waitingGet 方法里面有一个循环,这个循环里面调用了 Runtime.availableProcessors 方法。且这个方法被调用的很频繁,这样不好。

在详细描述里面,它提到了另外的一个编号为 8227006 的 BUG,这个 BUG 描述的就是为什么频繁调用 availableProcessors 不太好,但是这个我们先按下不表。

先研究一下他提到的这样一行代码:

1
2
ini复制代码 spins = (Runtime.getRuntime().availableProcessors() > 1) ?
                    1 << 8 : 0; // Use brief spin-wait on multiprocessors

他说位于 waitingGet 里面,我们就去看看到底是怎么回事嘛。

但是我本地的 JDK 的版本是 1.8.0_271,其 waitingGet 源码是这样的:

java.util.concurrent.CompletableFuture#waitingGet

先不管这几行代码是啥意思吧,反正我发现没有看到 bug 中提到的代码,只看到了 spins=SPINS ,虽然 SPINS 调用了 Runtime.getRuntime().availableProcessors() 方法,但是该字段被 static 和 final 修饰了,也就不存在 BUG 中描述的“频繁调用”了。

于是我意识到我的版本是不对的,这应该是被修复之后的代码,所以去下载了几个之前的版本。

最终在 JDK 1.8.0_202 版本中找到了这样的代码:

和前面截图的源码的差异就在于前者多了一个 SPINS 字段,把 Runtime.getRuntime().availableProcessors() 方法的返回缓存了起来。

我一定要找到这行代码的原因就是要证明这样的代码确实是在某些 JDK 版本中出现过。

好了,现在我们看一下 waitingGet 方法是干啥的。

首先,调用 get() 方法的时候,如果 result 还是 null 那么说明异步线程执行的结果还没就绪,则调用 waitingGet 方法:

而来到 waitingGet 方法,我们只关注 BUG 相关这两个分支判断:

首先把 spins 的值初始化为 -1。

然后当 result 为 null 的时候,就一直进行 while 循环。

所以,如果进入循环,第一次一定会调用 availableProcessors 方法。然后发现是多处理器的运行环境,则把 spins 置为 1<<8 ,即 256。

然后再次进行循环,走入到 spins>0 的分支判断,接着做一个随机运算,随机出来的值如果大于等于 0 ,则对 spins 进行减一操作。

只有减到 spins 为 0 的时候才会进入到后面的这些被我框起来的逻辑中:

也就是说这里就是把 spins 从 256 减到 0,且由于随机函数的存在,循环次数一定是大于 256 次的。

但是还有一个大前提,那就是每次循环的时候都会去判断循环条件是否还成立。即判断 result 是否还是 null。为 null 才会继续往下减。

所以,你说这段代码是在干什么事儿?

其实注释上已经写的很清楚了:

Use brief spin-wait on multiprocessors。

brief,这是一个四级词汇哈,得记住,要考的。就是“短暂”的意思,是一个不规则动词,其最高级是 briefest。

对了,spin 这个单词大家应该认识吧,前面忘记给大家教单词了,就一起讲了,看小黑板:

所以注释上说的就是:如果是多处理器,则使用短暂的自旋等待一下。

从 256 减到 0 的过程,就是这个“brief spin-wait”。

但是仔细一想,在自旋等待的这个过程中,availableProcessors 方法只是在第一次进入循环的时候调用了一次。

那为什么说它耗费性能呢?

是的,确实是调用 get() 方法的只调用了一次,但是你架不住 get() 方法被调用的地方多啊。

就拿 Dubbo 举例,绝大部分情况下的大家的调用方式都用的是默认的同步调用的方案。所以每一次调用都会到异步转同步这里阻塞等待结果,也就说每次都会调用一次 get() 方法,即 availableProcessors 方法就会被调用一次。

那么解决方案是什么呢?

在前面我已经给大家看了,就是把 availableProcessors 方法的返回值找个字段给缓存起来:

但是后面跟了一个“problem”,这个“problem”就是说如果我们把多处理器这个值缓存起来了,假设程序运行的过程中出现了从多处理器到单处理器的运行环境变化这个值就不准确了,虽然这是一个不太可能的变化。但是即使这个“problem”真的发生了也没有关系,它只是会导致一个小小的性能损失。

所以就出现了前面大家看到的这样的代码,这就是 “we can cache this value in a field”:

而体现到具体的代码变更是这样的:

cr.openjdk.java.net/~shade/8227…

所以,当你去看这部分源码的时候,你会看到 SPINS 字段上其实还有很长一段话,是这样的:

给大家翻译一下:

1.在 waitingGet 方法中,进行阻塞操作前,进行旋转。

2.没有必要在单处理器上进行旋转。

3.调用 Runtime.availableProcessors 方法的成本是很高的,所以在此缓存该值。但是这个值是首次初始化时可用的 CPU 的数量。如果某系统在启动时只有一个 CPU 可以用,那么 SPINS 的值会被初始化为 0,即使后面再使更多的 CPU 在线,也不会发生变化。

当你有了前面的 BUG 的描述中的铺垫之后,你就明白了为什么这里写上了这么一大段话。

有的同学就真的去翻代码,也许你看到的是这样的:

什么情况?根本就看不到 SPINS 相关的代码啊,这不是欺骗老实人吗?

你别慌啊,猴急猴急的,我这不是还没说完嘛?

我们再把目光放到图片中的这句话上:

只需要在 JDK 8 中进行这个修复即可,因为 JDK 9 和更高版本的代码都不是这样的写的了。

比如在 JDK 9 中,直接拿掉了整个 SPINS 的逻辑,不要这个短暂的自旋等待了:

hg.openjdk.java.net/jdk9/jdk9/j…

虽然,拿掉了这个短暂的自旋等待,但是其实也算是学习了一个骚操作。

问:怎么在不引入时间的前提下,做出一个自旋等待的效果?

答案就是被拿掉的这段代码。

但是有一说一,我第一次看到这个代码的时候我就觉得别扭。这一个短短的自旋能延长多少时间呢?

加入这个自旋,是为了稍晚一点执行后续逻辑中的 park 代码,这个稍重一点的操作。但是我觉得这个 “brief spin-wait” 的收益其实是微乎其微的。

所以我也理解为什么后续直接把这一整坨代码拿掉了。而拿掉这一坨代码的时候,其实作者并没有意识到这里有 BUG。

而这里提到的作者,其实就是 Doug Lea 老爷子。

我为什么这样说呢?

依据就在这个 BUG 链接里面提到的编号为 8227018 的 BUG 中,它们其实描述的是同一个事情:

这里面有这样一段对话,出现了 David Holmes 和 Doug Lea:

Holmes 在这里面提到了 “cache this value in a field” 的解决方案,并得到了 Doug 的同意。

Doug 说: JDK 9 已经不用 spin 了。

所以,我个人理解是 Doug 在不知道这个地方有 BUG 的情况下,拿掉了 SPIN 的逻辑。至于是出于什么考虑,我猜测是收益确实不大,且代码具有一定的迷惑性。还不如拿掉之后,理解起来直观一点。

Doug Lea 大家都耳熟能详, David Holmes 是谁呢?

《Java 并发编程实战》的作者之一,端茶就完事了。

而你要是对我以前的文章印象足够深刻,那么你会发现早在《Doug Lea在J.U.C包里面写的BUG又被网友发现了。》这篇文章里面,他就已经出现过了:

老朋友又出现了,建议铁汁们把梦幻联动打在公屏上。

到底啥原因?

前面噼里啪啦的说了这么大一段,核心思想其实就是 Runtime.availableProcessors 方法的调用成本高,所以在 CompletableFuture.waitingGet 方法中不应该频繁调用这个方法。

但是 availableProcessors 为什么调用成本就高了,依据是啥,得拿出来看看啊!

这一小节,就给大家看看依据是什么。

依据就在这个 BUG 描述中:

bugs.openjdk.java.net/browse/JDK-…

标题上说:在 linux 环境下,Runtime.availableProcessors 执行时间增加了 100 倍。

增加了 100 倍,肯定是有两个不同的版本的对比,那么是哪两个版本呢?

在 1.8b191 之前的 JDK 版本上,下面的示例程序可以实现每秒 400 多万次对 Runtime.availableProcessors 的调用。

但在 JDK build 1.8b191 和所有后来的主要和次要版本(包括11)上,它能实现的最大调用量是每秒4万次左右,性能下降了100倍。

这就导致了 CompletableFuture.waitingGet 的性能问题,它在一个循环中调用了 Runtime.availableProcessors。因为我们的应用程序在异步代码中表现出明显的性能问题,waitingGet 就是我们最初发现问题的地方。

测试代码是这样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
ini复制代码  public static void main(String[] args) throws Exception {
        AtomicBoolean stop = new AtomicBoolean();
        AtomicInteger count = new AtomicInteger();

        new Thread(() -> {
            while (!stop.get()) {
                Runtime.getRuntime().availableProcessors();
                count.incrementAndGet();
            }
        }).start();

        try {
            int lastCount = 0;
            while (true) {
                Thread.sleep(1000);
                int thisCount = count.get();
                System.out.printf("%s calls/sec%n", thisCount - lastCount);
                lastCount = thisCount;
            }
        }
        finally {
            stop.set(true);
        }
    }

按照 BUG 提交者的描述,如果你在 64 位的 Linux 上,分别用 JDK 1.8b182 和 1.8b191 版本去跑,你会发现有近 100 倍的差异。

至于为什么有 100 倍的性能差异,一位叫做 Fairoz Matte 的老哥说他调试了一下,定位到问题出现在调用 “OSContainer::is_containerized()” 方法的时候:

而且他也定位到了问题出现的最开始的版本号是 8u191 b02,在这个版本之后的代码都会有这样的问题。

带来问题的那次版本升级干的事是改进 docker 容器检测和资源配置的使用。

所以,如果你的 JDK 8 是 8u191 b02 之前的版本,且系统调用并发非常高,那么恭喜你,有机会踩到这个坑。

然后,下面几位大佬基于这个问题给出了很多解决方案,并针对各种解决方案进行讨论。

有的解决方案,听起来就感觉很麻烦,需要编写很多的代码。

最终,大道至简,还是选择了实现起来比较简单的 cache 方案,虽然这个方案也有一点瑕疵,但是出现的概率非常低且是可以接受的。

再看get方法

现在我们知道了这个没有卵用的知识点之后,我们再看看为什么调用带超时时间的 get() 方法,没有这个问题。

java.util.concurrent.CompletableFuture#get(long, java.util.concurrent.TimeUnit)

首先可以看到内部调用的方法都不一样了:

有超时时间的 get() 方法,内部调用的是 timedGet 方法,入参就是超时时间。

点进 timedGet 方法就知道为什么调用带超时时间的 get() 方法没有问题了:

在代码的注释里面已经把答案给你写好了:我们故意不在这里旋转(像waitingGet那样),因为上面对 nanoTime() 的调用很像一个旋转。

可以看到在该方法内部,根本就没有对 Runtime.availableProcessors 的调用,所以也就不存在对应的问题。

现在,我们回到最开始的地方:

那么你说,下面的 asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS) 如果我们改成 asyncResult.get() 效果还是一样的吗?

肯定是不一样的。

再说一次:Dubbo 作为开源的中间件,有可能会运行在各种不同的 JDK 版本中,且该方法是它主链路上的核心代码,对于特定的 JDK 版本来说,这个优化确实是对于性能的提升有很大的帮助。

所以写中间件还是有点意思哈。

最后,再送你一个为 Dubbo 提交源码的机会。

在其下面的这个类中:

org.apache.dubbo.rpc.AsyncRpcResult

还是存在这两个方法:

但是上面的 get() 方法只有测试类在调用了:

完全可以把它们全部改掉调用 get(long timeout, TimeUnit unit) 方法,然后把 get() 方法直接删除了。

我觉得肯定是能被 merge 的。

如果你想为开源项目做贡献,熟悉一下流程,那么这是一个不错的小机会。

本文已收录至个人博客,欢迎大家来玩

www.whywhy.vip/

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Linux 一键配置 Oracle 主机 /dev/shm

发表于 2021-10-09

小知识,大挑战!本文正在参与“程序员必备小知识”创作活动。

首先可以看出来/dev/shm是一个设备文件, 可以把/dev/shm看作是系统内存的入口, 可以把它看做是一块物理存储设备,一个tmp filesystem, 可以通过这个设备向内存中读写文件, 以加快某些I/O高的操作,比如对一个大型文件频繁的open, write, read!

Oracle 就利用了/dev/shm(shitou没用过oracle), 可以通过mount命令列出当前的/dev/shm的挂载的文件系统,
你可以直接对/dev/shm进行读写操作, 例如:

1
bash复制代码#touch /dev/shm/file1

既然是基于内存的文件系统,系统重启后/dev/shm下的文件就不存在了。Linux默认(CentOS)/dev/shm分区的大小是系统物理内存的50%, 虽说使用/dev/shm对文件操作的效率会高很多。但是目前各发行软件中却很少有使用它的(除了前面提到的Oracle), 可以通过ls /dev/shm查看下面是否有文件, 如果没有就说明当前系统并没有使用该设备。

因此需要在 /etc/fstab 文件中加上配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
bash复制代码DAYTIME=$(date +%Y%m%d)
memTotal=$(grep MemTotal /proc/meminfo | awk '{print $2}')
shmTotal=$(df -k /dev/shm | awk '{print $2}' | head -n 2 | tail -n 1)
if [ "$(grep -E -c "/dev/shm" /etc/fstab)" -eq 0 ]; then
[ ! -f /etc/fstab."${DAYTIME}" ] && cp /etc/fstab /etc/fstab."${DAYTIME}"
cat <<EOF >>/etc/fstab
tmpfs /dev/shm tmpfs size=${memTotal}k 0 0
EOF
mount -o remount /dev/shm
else
if [ "$shmTotal" -lt "$memTotal" ]; then
shmTotal=$memTotal
[ ! -f /etc/fstab."${DAYTIME}" ] && cp /etc/fstab /etc/fstab."${DAYTIME}"
line=$(grep -n "/dev/shm" /etc/fstab | awk -F ":" '{print $1}')
sed -i "${line} d" /etc/fstab
cat <<EOF >>/etc/fstab
tmpfs /dev/shm tmpfs size=${memTotal}k 0 0
EOF
mount -o remount /dev/shm
fi

fi

本次分享到此结束啦~

如果觉得文章对你有帮助,点赞、收藏、关注、评论,一键四连支持,你的支持就是我创作最大的动力。

❤️ 技术交流可以 关注公众号:Lucifer三思而后行 ❤️

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…503504505…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%