开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

mysql进阶系列:存储引擎

发表于 2021-07-25

本文mysql实验版本 : 5.7.21

基础架构篇了解到执行器执行这个执行计划,通过调用存储引擎的API来操作数据。

mysql提供了一系列存储引擎的API,所有的存储引擎都要符合API要求,因此可以实现这种插件式的存储引擎,可以根据不同的需求选择合适的存储引擎(就像握推杠铃一样,可以按需选择不同大小的杠铃片,嗯对的)。

存储引擎是针对表的而不是库,对于同一个库不同的表可以使用不同的存储引擎。

常见的存储引擎有 MyISAM,InnoDB,Memory

查看当前数据支持的存储引擎:

mysql-2-engines.png

  1. 在新建表的时候可以选择存储引擎
1
2
3
sql复制代码CREATE TABLE  'user' (
 'id' bigint(20) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL DEFAULT ''
) ENGINE = MyISAM

ENGINE = MyISAM 代表这个表的存储引擎是MyISAM 。

  1. 查看表相关信息,例如mysql库中的user表
  • 使用show table status 查看表信息(不限版本)

需要先切换到对应的数据库下再执行此命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
sql复制代码mysql> use mysql;
Database changed
mysql> show table status like 'user' \G ;
*************************** 1. row ***************************
          Name: user
        Engine: MyISAM
      Version: 10
    Row_format: Dynamic
          Rows: 3
Avg_row_length: 128
  Data_length: 384
Max_data_length: 281474976710655
  Index_length: 4096
    Data_free: 0
Auto_increment: NULL
  Create_time: 2018-06-11 09:51:16
  Update_time: 2018-06-11 09:53:08
    Check_time: NULL
    Collation: utf8_bin
      Checksum: NULL
Create_options: 
      Comment: Users and global privileges
1 row in set (0.00 sec)
  • 还可以使用information_schema查看表信息(mysql5.0以后的版本支持)

会查出实例中所有库中的表信息,但是可以指定TABLE_SCHEMA查询指定库的表

1
csharp复制代码mysql> select * from information_schema.tables where table_name = 'user' and TABLE_SCHEMA='mysql' \G;

简单介绍小输出字段的含义:

Name: 表名。

Engine: 存储引擎。

Version:版本,默认10。

Row_format: 行的格式。

Rows: 表中的行数,对应MyISAM和其他一些存储引擎,该值是精确的; 而InnoDB该值是估计的。

Avg_row_length: 平均每行包含的字节数。

Data_length: 表数据的大小(字节)。

Max_data_length: 表数据的最大容量(和存储引擎有关)。

Index_length: 索引的大小(字节)。

Data_free: 对于MyISAM表,表示已经分配但是没有使用的空间。

Auto_increment: 下一个auto_increment值。

Create_time: 表的创建时间。

Update_time: 表数据最后修改时间。

Check_time: 使用check table命令或者myisamchk工具最后一次检查表的时间。

Collation: 表的默认字符集和字符列排序规则。

Checksum: 如果启用保存的是整个表的实时校验和。

Create_options: 创建表是指定的其他选项。

Comment: 包含其他额外信息

1. InnoDB

InnoDB是mysql5.5.x开始默认的事务型引擎,也是使用最广泛的存储引擎。被设计用来处理大量短期事务的。

InnoDB所有的表都保存在同一个数据文件中(也可能是多个文件,或者是独立的表空间文件),表的大小只受限于操作系统文件的大小。表的结构定义存在 .frm后缀文件中,数据和索引集中存放在 .idb后缀文件中。因为表数据和索引是在同一个文件,InnoDB的索引是聚簇索引。

InnoDB采用MVCC支持高并发,并且实现了四种标准的隔离级别(读未提交,读已提交,可重复读,可串行化),其默认级别是REPEATABLE-READ(可重复读),并且通过间隙锁(next-key locking)策略防止幻读的出现。间隙锁不仅仅锁定查询涉及的行,还会对索引中的间隙行进行锁定,以防止幻影行的插入。

InnoDB表是基于聚簇索引建立的,聚簇索引对主键的查询有很高的性能。但是InnoDB的非主键索引中必须包含主键列,所以如果主键列很大的话,非主键索引也会很大。如果一张表的索引较多,主键应该尽可能的小。关于索引,后面会详细讲解。

InnoDB的内部优化,包括磁盘预读(从磁盘读取数据时采用可预测性读取),自适应哈希(自动在内存中创建hash索引以加速读操作)以及能够加速插入操作的插入缓冲区。

2. MyISAM

在mysql5.1及之前的版本,MyISAM是默认的存储引擎。提供了大量的特性,包括全文索引,压缩,空间函数等,但是不支持事务和行级锁,而且有一个严重的问题是奔溃后无法安全恢复。

MyISAM的数据表存储在磁盘上是3个文件,表结构定义存在 .frm后缀文件中,表数据存储在 .MYD后缀文件中,表索引存储在 .MYI后缀文件中。表数据和表索引在不同的文件中,所以MyISAM索引是非聚簇索引。而且MyISAM可以存储表数据的总行数。

MyISAM表支持数据压缩,对于表创建后并导入数据以后,不需要修改操作,可以采用MyISAM压缩表。压缩命令:myisampack,压缩表可以极大的减少磁盘空间占用,因此也可以减少磁盘I/O,提高查询性能。而且压缩表中的数据是单行压缩,所以单行读取是不需要解压整个表。

3. Memory

Memory存储引擎的数据是存放在内存中的,所以如果服务器重启会导致数据丢失,但是表结构还是存在的表结构是以 .frm 后缀的文件中。

Memory默认hash索引,因此查询非常快。Memory表是表级锁,因此并发写入的性能较低。不支持BLOB或TEXT类型的列,并且每行的长度都是固定的,所以即使指定了varchar列实际存储也会转换成char,会导致内存浪费。

如果mysql查询过程中需要使用临时表来保存中间结果,内部使用的临时表就是Memory表,如果中间结果太大超出Memory表的限制或者含有BLOB或TEXT字段,那么临时表会转换成MyISAM表。

上面介绍了三种,你如何选择存储引擎呢:

  • 事务 :目前只有Innodb能完美的支持事务。
  • 备份 :只有Innodb有免费的在线热备方案,mysqldump不算在线热备的方案,它需要对数据加锁。
  • 崩溃恢复:myisam表由于系统崩溃导致数据损坏的概率比Innodb高跟很多,而且恢复速度也没有innodb快。
  • 特有的特性:如需要聚簇索引,那就需要选择innodb存储引擎,有的需要使用地理空间搜索,那就选择myisam 。

mysql的存储引擎有很多,这里主要介绍了以上3中,其中InnoDB是现在使用最广泛也是默认的存储引擎,如果没有特殊需求使用默认的即可,也就是InnoDB。

欢迎关注公众号
纪先生笔记.png

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

精读 RocketMQ 源码系列(3)--- Broker

发表于 2021-07-25

一、前言

开始之前,我们先看一张来自 RocketMQ 官方文档中的消息数据流图:

image-20210714230821260.png

可以简单总结为:

  1. Producer 通过 netty 客户端将消息发送给 Broker
  2. Broker 通过 netty 服务端接收到消息,解析后存储在 CommitLog 中
  3. 消息存储到 CommitLog 后会被分发到 ConsumerQueue 和 IndexFile 文件
  4. 消费者拉取 ConsumerQueue 的消息完成消费

注意:MessageQueue 和 ConsumerQueue 逻辑上是一一对应的,因为两者使用同一个 queueId。

上一篇 精读 RocketMQ 源码系列(2)— Producer 中我们讲了消息是怎么发送的,也就是第1步。本篇主要围绕 Broker 在接收到消息之后是如何进行存储的这个问题展开,也就是第2、3步展开。

思考几个问题:

  1. 从接收到存储的完整处理流程是怎样的?
  2. 为什么消息存储到了 CommitLog 之后还需要分发到 ConsumerQueue 和 IndexFile 文件中
  3. CommitLog、ConsumerQueue、IndexFile 分别存储了什么消息的什么内容,作用是什么?

二、消息接收和存储流程

看了很多资料发现都是直接从 Broker 的存储的时候就开始讲了,没有提消息是如何接收的。为了保持逻辑的连贯性,我还是从消息接收说起。

2.1 消息接收

我们知道 RocketMQ 底层是使用 netty 进行通信的,在精读 RocketMQ 源码系列(1)— NameServer 的 3.1.1 中讲 Broker 启动流程的时候,提到过 Broker 启动时还会启动 netty 客户端,源码位置:BrokerController#start

1
2
3
kotlin复制代码if (this.remotingServer != null) {
this.remotingServer.start();
}

跟进这个 remotingServer#start方法,实际是 NettyRemotingServer#start,主要关注下面这段代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
scss复制代码        // ServerBootstrap:netty服务启动的辅助类
       ServerBootstrap childHandler =
           this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
              .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
              .option(ChannelOption.SO_BACKLOG, 1024)
              .option(ChannelOption.SO_REUSEADDR, true)
              .option(ChannelOption.SO_KEEPALIVE, false)
              .childOption(ChannelOption.TCP_NODELAY, true)
              .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
              .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
              .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
              .childHandler(new ChannelInitializer<SocketChannel>() { // Handler:理解为业务逻辑处理器
                   @Override
                   public void initChannel(SocketChannel ch) throws Exception {
                       ch.pipeline() // pipeline:一组 Handler 的链条
                          .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                          .addLast(defaultEventExecutorGroup,
                               encoder,
                               new NettyDecoder(),
                               new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                               connectionManageHandler,
                               serverHandler  // ☆☆ 处理发送过来的消息的核心处理逻辑
                          );
                  }
              });

上面这段代码看起来很多,但其实没什么东西。这就是使用 netty 进行网络编程时经常写的一段模板代码,可以对照注释看下(我也是刚接触 netty,不是太懂,计划后期也写一个 netty 的系列)。现在可以这么简单理解这段代码:

netty 服务端启动时,会进行很多配置,同时会绑定一些 Handler,这些 Handler 就是服务端在接收到客户端请求后需要执行的业务逻辑。这里面,我们要找的关键逻辑在 serverHandler 里。

再进到 serverHandler 中,发现它是一个内部类:

1
2
3
4
5
6
7
8
scala复制代码    @ChannelHandler.Sharable
   class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
​
       @Override
       protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
           processMessageReceived(ctx, msg);
      }
  }

继续下去,依次会进到:

  1. NettyRemotingAbstract#processRequestCommand
  2. SendMessageProcessor#asyncProcessRequest,不论是AsyncNettyRequestProcessor做处理还是NettyRequestProcessor都会进到这个方法中
  3. SendMessageProcessor#asyncSendMessage:这个方法会对消息 requestHeader做解析,获取消息的 topic、queueId等信息,并将消息封装到Broker端的消息内置类型 MessageExtBrokerInner 中
  4. DefaultMessageStore#asyncPutMessage:最后来到了消息存储核心类 DefaultMessageStore 这里

以上流程总结为流程图如下:

msg_receive.png

好了,消息接收的部分就结束了,下一节我们开始关注本篇的重头戏:消息存储!

2.2 消息存储

消息存储我这里选的入口是 DefaultMessageStore#putMessage,DefaultMessageStore#asyncPutMessage 与之类似

接收到消息之后的消息存储流程如下:

msg_store.png

上图看着流程很多,但其实并不复杂(像这种时序图都是一边看源码一遍记录的,所以一些细枝末节可能也被记录下来)。对于也想直接看源码的读者是很好的对照图。

这里我以消息为主体,以消息所在位置为观察重心将以上流程总结为以下几步:

  1. SendMessageProcessor 接收到的消息是封装在 RemotingCommand类中的,然后SendMessageProcessor会将 header 的内容解析到SendMessageRequestHeader中:
1
ini复制代码 SendMessageRequestHeader requestHeader = parseRequestHeader(request);
  1. SendMessageProcessor将消息的 header 和 body 封装到 MessageExtBrokerInner
  2. 按顺序将消息内容通过 ByteBuffer逐一写入 MappedFile
  3. MappedFile 刷入磁盘,进行持久化

我们知道,消息在 Broker 的文件存储形式是 CommitLog,这里我们刷入磁盘的是 MappedFile。这两者之间的关系是一一对应的:

MappedFile.png

现在我们知道了,MappedFile 刷入磁盘之后,即是将消息写到了 CommitLog 文件中。

但它是怎么刷盘的呢?我们回到CommitLog#asyncPutMessage 的方法,往下看,看到CommitLog#submitFlushRequest

在 putMessage 方法中会调用 handleDiskFlush 方法,在 asyncPutMessage中会调用submitFlushRequest方法,这两个方法差别不大。我在上面的流程图中写的是 handleDiskFlush。这里我们分析 submitFlushRequest 即可

这个方法的主逻辑非常重要,所以我这里把它整体贴出来

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
scss复制代码    public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {
       // Synchronization flush   同步刷盘
       if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
           final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
           if (messageExt.isWaitStoreMsgOK()) { // 是否需要等待消息存储完成才返回给生产者发送成功的响应
               GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
                       this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
               service.putRequest(request);
               return request.future();
          } else {
               service.wakeup(); // 不需要则唤醒服务并立即返回给生产者发送成功的响应
               return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
          }
      }
       // Asynchronous flush   异步刷盘
       else {
           // 判断是否启动堆外内存 transientStorePoolEnable 默认是 false
           if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
               // 不开启时,采用 MappedByteBuffer 刷盘
               flushCommitLogService.wakeup();
          } else {
               // 开启时,通过 FileChannel 刷盘
               commitLogService.wakeup();
          }
           return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
      }
  }

关键逻辑见注释。这里我们小结一下:

  1. 有同步刷盘和异步刷盘两种方式,同步刷盘还是异步刷盘是由 MessageStoreConfig 中的参数 flushDiskType 决定的,默认是异步
  2. 对于同步刷盘,有两种情况:
0. 等待消息存储完成再返回生产者发送结果
1. 直接返回生产者发送结果
  1. 对于异步刷盘,有两种方式:
0. `transientStorePoolEnable = true` 时,表示开启堆外内存,消息会先放在 `writeBuffer`,然后通过 `FileChannle` 映射到虚拟内存,最后再 `flush` 到磁盘
1. `transientStorePoolEnable=false`(默认),消息追加时,直接存入 MappedByteBuffer(pageCache) 中,然后定时 flush

完成消息到 CommitLog 刷盘之后,消息就算是已经完成持久化了。

贴一张 CommitLog 中一条消息的组成结构图:

image-20210721005237968.png

但为了消费者更好地消费消息,还需要将 CommitLog 分发到 ConsumeQueue 中;为了实现根据某些关键字查询的功能那个,又需要将 CommitLog 分发到 IndexFile 中。

2.3 indexFile 和 ConsumeQueue 是如何更新的

启动 broker 的时候会启动 DefaultMessageStore,在其start方法中又启动了存储相关的服务,其中就包括了将 CommitLog 分发到 indexFile 和 comsumeQueue 的服务

1
2
3
4
kotlin复制代码//设置CommitLog内存中最大偏移量
this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
//启动
this.reputMessageService.start();

看看这个方法的执行时序图:

reput_msg.png

接着会分别调用不同的实现类,去分别构建 ConsumeQueue 和 Index

构建 ConsumeQueue

dispatch_consume.png

构建 IndexFile

dispatch_index.png

最后我们来分别看一下这两个文件的结构。

ConsumeQueue

ConsumeQueue 在逻辑上与生产者发送时 MessageQueue 是一一对应的。如果有4个消息写队列,那么 CommitLog 也会被分发到4个相应的 ConsumeQueue(同一个consumeQueue有多个文件,因为单独一个文件的限制是30万*20B)。

ConsumeQueue 由30万个固定大小为20byte的数据块组成,数据块的内容如下:

ConsumeQueue.png

msgPhyOffset: 消息在 CommitLog 文件中的起始位置

msgSize: 消息在文件中占的长度

msgTagCode: 消息 tag,用于标识业务相关

如何构建:

  1. broker启动时会启动ReputMessageService任务,1ms执行1次
  2. ReputMessageService记录分发的reputFromOffset,每次将对应的消息追加到 consumeQueue文件中,完成一条消息的分发
  3. 设置reputFromOffset = reputFromOffset + 读取到的消息.size,等待下次任务继续构建下一条消息

如何查询:

消费者消费时,只要知道自己要消费的是第几条消息(称之为消费位点),就可以通过消费位点对30万取余的方式,定位到指定的 consumeQueue文件的指定数据块。如果超过了30万,那就是下一个文件的数据块。例如,我需要读取第31万的消息,我可以计算出这条消息的索引内容在第2个consumequeue的第1万个数据块。然后读取这个数据块的内容,再去commitLog获取真正的消息。

IndexFile

indexFile 也是一个索引文件,不过它的定位是提供根据msgId或生产者指定的消息key作为索引key。整个indexFile的文件物理存储结构如下:

index.png

  1. Header:固定大小 40B
0. beginTimestamp: 该indexFile对应的第一条消息的存储时间
1. endTimestamp: 该indexFile对应的最后一条消息的存储时间
2. beginPhyOffset: 该indexFile对应的第一条消息在CommitLog中的偏移量
3. endPhyOffset: 该indexFile对应的最后一条消息在CommitLog中的偏移量
4. hashSlotCount: 已填充值的slot数量
5. indexCount: 该indexFile包含的索引个数
  1. Slot: 500万个,每个4B,slot 中存储一个int值,保存当前slot下最新的index的序号(链表头),可以算出来该Index的位置
  2. Index:结构如图灰色部分,共2000万个,每个20B
0. Key Hash: 索引 key 的hash值
1. CommitLog Offset: 索引对应的消息在 CommitLog 的偏移量
2. Timestamp:记录的是该条消息与当前索引文件第一条消息的存储时间的时间差,并不是绝对时间
3. next index offset: 当前slot下,当前index的前一个index的slotValue(可以简单理解是一个指向前一个Index的指针),这也就是为什么 slot 总是存最新的index,因为最新的index是链表头,持有前一个index的序号。

其实讲到这里,应该很容易想到,逻辑上,indexFile 的构造很像 Java 中的 HashMap

如何构建:

  1. 获取到消息的msgId,进行hash值计算
  2. 对500万取余获得对应的slot号n
  3. 根据40+(n-1)*4算出该slot文件的位置,并读取slotValue
  4. 追加写入一条index数据,next index offset写第3步中获取到的slotValue,即相同slot下前一个Index的序号
  5. 更新当前slot值为新插入的index的序号
  6. 更新Header中的endTimestamp、endPhyOffset、indexCount、hashSlotCount(可能不更新)

如何查询:

查询需要传入的参数有:key、beginTimestamp、endTimestamp

为什么要传时间呢?

因为 indexFile 文件有多个,而key有可能在不同的indexFile中重复,所以要先根据时间范围确定唯一的indexFile。而indexFile的文件命名就是一个起始时间戳,同时Header中有截止时间戳,根据这些信息就可以确定indexFile。

确定了indexFile之后是怎么查询的?

  1. 根据key计算hash值,hash值对500万取余得出slot的序号n
  2. 根据公式:40+(n-1)*4 即可获得slot在文件中的位置
  3. 读取slot中的值,也即最新的index在文件中的序号s
  4. 根据40+500万*4+(s-1)*20可以得到最新的index在文件中的位置
  5. 读取该Index,将该index的hash值、timestamp和传入参数作比对
  6. 不符合则找到下一个index,找到后得到了偏移量,就可以去commitLog中拿到具体的消息

为什么比对的时候也要比对时间范围?

因为key可能会重复,producer在消息生产时可以指定消息的key,这个key显然无法保证唯一性。而自动生成的msgId也不能保证唯一。

msgId生成规则: 前机器IP+进程号+MessageClientIDSetter.class.getClassLoader()的hashCode值+消息生产时间与broker启动时间的差值+broker启动后从0开始单调自增的int值,前面三项很明显可能重复,后面两项一个是时间差,一个是重启归零,也可能重复

三、小结

到这里我们小结下,本篇文章讲了哪些东西:

  1. 消息接收:通过 nettey server 接收请求,判断是消息发送请求,则会调用相应的消息接收方法
  2. 消息存储:消息先会存储 CommitLog 中,然后会通过同步/异步的方式刷盘到 ConsumerQueue 和 IndexFile 中

接着回答下文章开头的第二个问题:为什么消息还要被分发到 ConsumeQueue 和 IndexFile?

  1. ConsumeQueue 可以认为是逻辑分区,类似于 Kafka 中的 partition,通过将 CommitLog 分为多个 ConsumeQueue,使得同一个 Topic 的消息可以被多个消费者同时消费,增加吞吐量。同时也能实现单个 ConsumeQueue 的消息的顺序性,满足一些业务场景。
  2. 分发到 IndexFile,是因为在客户端(生产者和消费者)和admin接口提供了根据 key 查询消息的实现。为了方便用户查询具体某条消息。

四、参考文档

blog.csdn.net/wb_snail/ar…

blog.csdn.net/wb_snail/ar…

blog.csdn.net/meilong_whp…

github.com/DillonDong/…

最后

  • 如果觉得有收获,三连支持下;
  • 文章若有错误,欢迎评论留言指出,也欢迎转载,转载请注明出处;
  • 个人vx:Listener27, 交流技术、面试、学习资料、帮助一线互联网大厂内推等

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Openstack架构构建及详解

发表于 2021-07-25

一、云计算

1、定义

首先对云计算这三个字的理解,云,是网络、互联网的一种比喻说法,即互联网与建立互联网所需要的底层基础设施的抽象体。
“计算”当然不是指一般的数值计算,指的是-台足够强大的计算机提供的计算服务( 包括各种功能,资源,存储)。
“云计算”可以理解为:网络上足够强大的计算机为你提供的服务,只是这种服务是按你的使用量进行付费的。

2、云计算的分类

分类1
云的类型
私有云 公有云 混合云
分类2
提供的供给方式
Iaas(Infrastructure as a Service)基础设施即服务 腾讯云 阿里云 aws
Paas(Platform as a Servervice)平台即服务 新浪云
Saas(Software as a Service)软件即服务 微软 office365 B/S

3、虚拟化项目-Openstack

OpenStack 是一个美国国家航天局和RackSpace 合作研发的,以Apache 许可证授权,并且是一个自由软件
OpensStack 是一个云平台管理的项目,它不是一个软件。这个项目由几个主要的组件组合起来完成一些工作
OpenStack 通过一个通过web 界面提供资源管理,通过一个仪表盘管理整个数据中心的计算存储资源等

二、云计算框架

Openstack是用来构建私有云和公共云的开源架构
Openstack由多个组件组成
组件说明:
Nova 计算服务: 负责创建,调度,销毁云主机
Glance 镜像服务:提供镜像服务,装机使用
Swift 对象存储:目录结构存储数据
Cinder 块存储:提供持久化块存储,即为云主机提供附加云盘
Neurton 网络服务: 负责实现SDN
Horizon 仪表盘: 就是web展示界面操作平台,方便用户交互的
Keystone 认证服务:为访问openstack各组件提供认证和授权功能,认证通过后,提供一个服务列表(存放你有权访问的服务),可以通过该列表访问各个组件
Heat 编排:自动化部署应用
Ceilometer 监控:监控性能,计费
Trove 数据库服务
Sahare 数据处理

三、openstack云管理平台安装

1、基础配置

1:实验架构
在这里插入图片描述
2:准备资源:
链接:pan.baidu.com/s/1HF8WH85M…
提取码:q5mp
镜像和yum资源都在云盘自取
下载到本地,通过serv-u建立局域网共享yum源

1
2
3
4
5
6
javascript复制代码版本:
[root@controller ~]# cat /etc/redhat-release
CentOS Linux release 7.0.1406 (Core)

镜像:
CentOS-7.0-1406-x86_64-Everything.iso

1、关闭防火墙、Selinux、网卡守护进程

1
javascript复制代码systemctl stop firewalld && systemctl disable firewalld && setenforce 0 && sed -i 's/=enforcing/=disabled/g' /etc/selinux/config && systemctl stop NetworkManager && systemctl disable NetworkManager

2、设置主机名

1
javascript复制代码hostnamectl set-hostname xx.xx.xx

3、(1)配置局域网YUM源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
javascript复制代码cd /etc/yum.repos.d/ && mkdir back && mv * back 
vi ftp.repo
[base]
name=base
baseurl=ftp://a:a@192.168.222.240/7/os/x86_64/
enabled=1
gpgcheck=0

[updates]
name=updates
baseurl=ftp://a:a@192.168.222.240/7/updates/x86_64/
enabled=1
gpgcheck=0

[extras]
name=extras
baseurl=ftp://a:a@192.168.222.240/7/extras/x86_64/
enabled=1
gpgcheck=0

[epel]
name=epel
baseurl=ftp://a:a@192.168.222.240/7/epel
enabled=1
gpgcheck=0

[rdo]
name=rdo
baseurl=ftp://a:a@192.168.222.240/7/rdo
enabled=1
gpgcheck=0

yum clean all && yum makecache && yum -y install yum-plugin-priorities && yum upgrade

2、安装NTP服务进行配置
[root@controller ~]# yum install -y ntp

[root@controller ~]# vim /etc/ntp.conf
# Hosts on local network are less restricted.
restrict 192.168.222.0 mask 255.255.255.0 nomodify notrap

# Use public servers from the pool.ntp.org project.
# Please consider joining the pool (http://www.pool.ntp.org/join.html).
#server 0.centos.pool.ntp.org iburst
#server 1.centos.pool.ntp.org iburst
#server 2.centos.pool.ntp.org iburst
#server 3.centos.pool.ntp.org iburst
server 127.127.1.0
fudge 127.127.1.0 stratum 10

[root@controller ~]# systemctl restart ntpd
[root@controller ~]# systemctl enable ntpd
ln -s '/usr/lib/systemd/system/ntpd.service' '/etc/systemd/system/multi-user.target.wants/ntpd.service'

3、配置主机解析文件
[root@controller ~]# vim /etc/hosts
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.222.5 controller.nice.com
192.168.222.6 network.nice.com
192.168.222.10 compute1.nice.com
192.168.222.20 block1.nice.com

(2)公网安装openstack yum源(速度较慢)

1
javascript复制代码yum -y install yum-plugin-priorities && yum -y install http://dl.fedoraproject.org/pub/epel/7/x86_64/e/epel-release-7-2.noarch.rpm && yum -y install http://rdo.fedorapeople.org/openstack-juno/rdo-release-juno.rpm

2、Keystone详解与安装

keystone文章

3、Glance详解与安装

Glance文章

4、Nova详解与安装

Nova文章

5、Neutron详解与安装

Neutron文章

6、Dashboard详解与安装

Dashboard文章

7、Cinder详解与安装

Cinder文章

搭建成功如下,慢慢来做把
在这里插入图片描述

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

使用gin+vue前后端分离的电商平台 go语言实战

发表于 2021-07-25

在这里插入图片描述

🎈1. 需求分析

1.1 数据获取

使⽤爬⾍爬取某⼀电商平台上部分商品信息,包括但不限于商品图⽚、价格、名称等。

1.2 ⽤户操作

顾客

  • 注册,登录,登出
  • ⽤户个⼈资料展示与编辑,上传头像
  • 更改密码

商家

  • 拥有顾客的⼀切功能
  • 可以进⾏零⻝信息的上传(包括图⽚、价格等信息)

管理员

  • 拥有上述⽤户的所有功能
  • ⽤户管理
  • 商品信息管理

1.3 其他功能

  • 添加虚拟货币功能。
  • 订单有过期时间
  • 为管理员添加⼀个充值接⼝,管理员可以为某⼀⽤户加钱。
  • 添加购物⻋和背包功能。 购买操作在购物⻋界⾯完成,完成购买后完成物品转移,以及货币转移(购买后物品⾃动下架)
  • 背包中的物品可以由⽤户上传,但默认不在购物⻚⾯中出现,需持有者进⾏上架才能被其他⽤户购买。

1.4 拓展功能

  • ⽀付密码
  • 商品下评论
  • 注意并发性

1.5 开发环境

后端:Python v3.8 、Golang v1.15

数据库:MySql v5.7.30、Redis v4.0.9

文件存储 :七牛云存储

支付接口:支付FM


🎉2. 后端逻辑代码

2.1 Python - 爬虫

  • 数据库表
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
python复制代码class product_img_info(Base):
__tablename__ = 'product_param_img' # 数据库中的表名
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(), nullable=False)
title = Column(String())
category_id = Column(String())
product_id = Column(String())
info = Column(String())
img_path = Column(String())
price = Column(String())
discount_price = Column(String())
created_at = Column(DateTime, default=datetime.now)
updated_at = Column(DateTime, default=datetime.now)
deleted_at = Column(DateTime, default = None)

def __repr__(self):
return """
<product_img_info(id:%s, product_id:%s>
""" % (self.id,self.product_id)
  • 爬取操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
python复制代码def getHTMLText(url):
try:
header = {
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.141 Safari/537.36',
'cookie': '' # 到浏览器复制cookie
}
r = requests.get(url, timeout=30, headers=header)
r.raise_for_status()
r.encoding = r.apparent_encoding
# print(r.text)
return r.text
except:
return ""

def parsePage(html):
view_prices = re.findall(r'\"view_price\"\:\"[\d\.]*\"', html)
view_fees = re.findall(r'\"view_fee\"\:\"[\d\.]*\"', html)
raw_titles = re.findall(r'\"raw_title\"\:\".*?\"', html)
titles = re.findall(r'\"title\"\:\".*?\"', html)
user_ids = re.findall(r'\"user_id\"\:\"[\d\.]*\"',html)
pic_urls = re.findall(r'\"pic_url\"\:\".*?\"',html)
detail_urls = re.findall(r'\"detail_url\"\:\".*?\"',html)
for view_price,view_fee,title,raw_title,user_id,pic_url,detail_url in zip(view_prices,view_fees,titles,raw_titles,user_ids,pic_urls,detail_urls):
price=eval(view_price.split(':')[1])
discount_price=eval(view_fee.split(':')[1])
name=eval(title.split(':')[1])
a4=eval(raw_title.split(':')[1])
product_id=eval(user_id.split(':')[1])
img_path=eval(pic_url.split(':')[1])
persopn = product_img_info(name=name,title=name,product_id=product_id,category_id="6",info=name,img_path=img_path,price=price,discount_price=discount_price)
session.add(persopn) # 增加一个
session.commit() # 提交到数据库中

# 1手机 2女装 3电脑 4杯子 5零食 6耳机

def main():
goods = '耳机'
depth = 1
start_url = 'https://s.taobao.com/search?q=' + goods
for i in range(depth):
try:
url = start_url + '&s=' + str(44 * i)
html = getHTMLText(url)
parsePage(html)
except:
continue

if __name__ == '__main__':
# Base.metadata.create_all() # 创建表需要执行这行代码,如果表存在则不创建
#sqlOperation()
main()

在这里插入图片描述

2.2 Golang - Gin

2.2.1 数据库部分

部分数据库建设

  • 用户模型
1
2
3
4
5
6
7
8
9
10
11
12
13
go复制代码//User 用户模型
type User struct {
gorm.Model
UserName string `gorm:"unique"`
Email string //`gorm:"unique"`
PasswordDigest string
Nickname string `gorm:"unique"`
Status string
Limit int // 0 非管理员 1 管理员
Type int // 0表示用户 1表示商家
Avatar string `gorm:"size:1000"`
Monery int
}
  • 商品模型
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
go复制代码//商品模型
type Product struct {
gorm.Model
ProductID string `gorm:"primary_key"`
Name string
CategoryID int
Title string
Info string `gorm:"size:1000"`
ImgPath string
Price string
DiscountPrice string
OnSale string
Num int
BossID int
BossName string
BossAvatar string
}

2.2.1 服务部分

部分逻辑代码

  • 增加商品
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
go复制代码func (service *UpProductService) UpProduct() serializer.Response {
var product model.Product
code := e.SUCCESS
err := model.DB.First(&product,service.ID).Error
if err != nil {
logging.Info(err)
code = e.ErrorDatabase
return serializer.Response{
Status: code,
Msg: e.GetMsg(code),
Error: err.Error(),
}
}
product.OnSale = service.OnSale
err = model.DB.Save(&product).Error
if err != nil {
logging.Info(err)
code = e.ErrorDatabase
return serializer.Response{
Status: code,
Msg: e.GetMsg(code),
Error: err.Error(),
}
}
return serializer.Response{
Status: code,
Data: serializer.BuildProduct(product),
Msg: e.GetMsg(code),
}
}
  • 修改商品
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
go复制代码//更新商品
func (service *UpdateProductService) Update() serializer.Response {
product := model.Product{
Name: service.Name,
CategoryID: service.CategoryID,
Title: service.Title,
Info: service.Info,
ImgPath: service.ImgPath,
Price: service.Price,
DiscountPrice: service.DiscountPrice,
OnSale: service.OnSale,
}
product.ID = service.ID
code := e.SUCCESS
err := model.DB.Save(&product).Error
if err != nil {
logging.Info(err)
code = e.ErrorDatabase
return serializer.Response{
Status: code,
Msg: e.GetMsg(code),
Error: err.Error(),
}
}
return serializer.Response{
Status: code,
Msg: e.GetMsg(code),
}
}

✨3. 前端核心代码

3.1 AXIOS前后端交互

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
js复制代码import axios from 'axios'

// 创建商品
const postProduct = form =>
axios.post('/api/v1/products', form).then(res => res.data)

// 读商品详情
const showProduct = id =>
axios.get(`/api/v1/products/${id}`).then(res => res.data)

// 读取商品列表
const listProducts = (category_id, start, limit) =>
axios
.get('/api/v1/products', { params: { category_id, start, limit } })
.then(res => res.data)

//读取商品的图片
const showPictures = id => axios.get(`/api/v1/imgs/${id}`).then(res => res.data)

//搜索商品
const searchProducts = form =>
axios.post('/api/v1/searches', form).then(res => res.data)

export {
postProduct,
showProduct,
listProducts,
showPictures,
searchProducts
}

🎊4. 部分页面展示

4.1 前台页面

  • 主页面

在这里插入图片描述

在这里插入图片描述

  • 商品页面
    在这里插入图片描述

在这里插入图片描述

  • 发布商品
    在这里插入图片描述
  • 购物车
    在这里插入图片描述
  • 结算页面
  • 个人中心
    在这里插入图片描述

4.2 后台管理

  • 用户管理
    在这里插入图片描述
  • 商品管理

在这里插入图片描述


🎆5. 结语

这个商场是在作者的开源项目基础上加以改进的!
原GitHub地址:CongZ666
真的非常感谢作者的开源
让我能通过这个项目,懂得Gin+Vue前后端分离的很多知识!!
还有懂了一些如同支付功能等,之前没有做过的功能。
不过我还没搞懂极验的功能。后续再完善。


🎇最后

小生凡一,期待你的关注。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

【每日算法】根据相邻关系还原数组的两种方式:「单向构造」&「

发表于 2021-07-25

本文正在参加「Python主题月」,详情查看 活动链接

题目描述

这是 LeetCode 上的 1743. 从相邻元素对还原数组 ,难度为 中等。

Tag : 「哈希表」、「双指针」、「模拟」

存在一个由 n 个不同元素组成的整数数组 nums ,但你已经记不清具体内容。好在你还记得 nums 中的每一对相邻元素。

给你一个二维整数数组 adjacentPairs ,大小为 n - 1 ,其中每个 adjacentPairs[i] = [ui, vi] 表示元素 ui 和 vi 在 nums 中相邻。

题目数据保证所有由元素 nums[i] 和 nums[i+1] 组成的相邻元素对都存在于 adjacentPairs 中,存在形式可能是 [nums[i], nums[i+1]] ,也可能是 [nums[i+1], nums[i]] 。这些相邻元素对可以 按任意顺序 出现。

返回 原始数组 nums 。如果存在多种解答,返回 其中任意一个 即可。

示例 1:

1
2
3
4
5
6
css复制代码输入:adjacentPairs = [[2,1],[3,4],[3,2]]

输出:[1,2,3,4]

解释:数组的所有相邻元素对都在 adjacentPairs 中。
特别要注意的是,adjacentPairs[i] 只表示两个元素相邻,并不保证其 左-右 顺序。

示例 2:

1
2
3
4
5
6
css复制代码输入:adjacentPairs = [[4,-2],[1,4],[-3,1]]

输出:[-2,4,1,-3]

解释:数组中可能存在负数。
另一种解答是 [-3,1,4,-2] ,也会被视作正确答案。

示例 3:

1
2
3
lua复制代码输入:adjacentPairs = [[100000,-100000]]

输出:[100000,-100000]

提示:

  • nums.length == n
  • adjacentPairs.length == n - 1
  • adjacentPairs[i].length == 2
  • 2 <= n <= 10510^5105
  • -10510^5105 <= nums[i], ui, vi <= 10510^5105
  • 题目数据保证存在一些以 adjacentPairs 作为元素对的数组

单向构造(哈希表计数)

根据题意,由于所有的相邻关系都会出现在 numsnumsnums 中,假设其中一个合法数组为 ansansans,长度为 nnn。

那么显然 ans[0]ans[0]ans[0] 和 ans[n−1]ans[n - 1]ans[n−1] 在 numsnumsnums 中只存在一对相邻关系,而其他 ans[i]ans[i]ans[i] 则存在两对相邻关系。

因此我们可以使用「哈希表」对 numsnumsnums 中出现的数值进行计数,找到“出现一次”的数值作为 ansansans 数值的首位,然后根据给定的相邻关系进行「单向构造」,为了方便找到某个数其相邻的数是哪些,我们还需要再开一个「哈希表」记录相邻关系。

image.png

Java 代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
Java复制代码class Solution {
public int[] restoreArray(int[][] aps) {
int m = aps.length, n = m + 1;
Map<Integer, Integer> cnts = new HashMap<>();
Map<Integer, List<Integer>> map = new HashMap<>();
for (int[] ap : aps) {
int a = ap[0], b = ap[1];
cnts.put(a, cnts.getOrDefault(a, 0) + 1);
cnts.put(b, cnts.getOrDefault(b, 0) + 1);
List<Integer> alist = map.getOrDefault(a, new ArrayList<>());
alist.add(b);
map.put(a, alist);
List<Integer> blist = map.getOrDefault(b, new ArrayList<>());
blist.add(a);
map.put(b, blist);
}
int start = -1;
for (int i : cnts.keySet()) {
if (cnts.get(i) == 1) {
start = i;
break;
}
}
int[] ans = new int[n];
ans[0] = start;
ans[1] = map.get(start).get(0);
for (int i = 2; i < n; i++) {
int x = ans[i - 1];
List<Integer> list = map.get(x);
for (int j : list) {
if (j != ans[i - 2]) ans[i] = j;
}
}
return ans;
}
}

Python 3 代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
Python复制代码class Solution:
def restoreArray(self, adjacentPairs: List[List[int]]) -> List[int]:
m = n = len(adjacentPairs)
n += 1
cnts = defaultdict(int)
hashmap = defaultdict(list)
for a, b in adjacentPairs:
cnts[a] += 1
cnts[b] += 1
hashmap[a].append(b)
hashmap[b].append(a)
start = -1
for i, v in cnts.items():
if v == 1:
start = i
break
ans = [0] * n
ans[0] = start
ans[1] = hashmap[start][0]
for i in range(2, n):
x = ans[i - 1]
for j in hashmap[x]:
if j != ans[i - 2]:
ans[i] = j
return ans
  • 时间复杂度:O(n)O(n)O(n)
  • 空间复杂度:O(n)O(n)O(n)

双向构造(双指针)

在解法一中,我们通过「哈希表」计数得到 ansansans 首位的原始作为起点,进行「单向构造」。

那么是否存在使用任意数值作为起点进行的双向构造呢?

答案是显然的,我们可以利用 ansansans 的长度为 2<=n<=1052 <= n <= 10^52<=n<=105,构造一个长度 10610^6106 的数组 qqq(这里可以使用 static 进行加速,让多个测试用例共享一个大数组)。

这里 qqq 数组不一定要开成 1e61e61e6 大小,只要我们 qqq 大小大于 ansansans 的两倍,就不会存在越界问题。

从 qqq 数组的 中间位置 开始,先随便将其中一个元素添加到中间位置,使用「双指针」分别往「两边拓展」(l 和 r 分别指向左右待插入的位置)。

当 l 指针和 r 指针直接已经有 nnn 个数值,说明整个 ansansans 构造完成,我们将 [l+1,r−1][l + 1, r - 1][l+1,r−1] 范围内的数值输出作为答案即可。

image.png

Java 代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
Java复制代码class Solution {
static int N = (int)1e6+10;
static int[] q = new int[N];
public int[] restoreArray(int[][] aps) {
int m = aps.length, n = m + 1;
Map<Integer, List<Integer>> map = new HashMap<>();
for (int[] ap : aps) {
int a = ap[0], b = ap[1];
List<Integer> alist = map.getOrDefault(a, new ArrayList<>());
alist.add(b);
map.put(a, alist);
List<Integer> blist = map.getOrDefault(b, new ArrayList<>());
blist.add(a);
map.put(b, blist);
}
int l = N / 2, r = l + 1;
int std = aps[0][0];
List<Integer> list = map.get(std);
q[l--] = std;
q[r++] = list.get(0);
if (list.size() > 1) q[l--] = list.get(1);
while ((r - 1) - (l + 1) + 1 < n) {
List<Integer> alist = map.get(q[l + 1]);
int j = l;
for (int i : alist) {
if (i != q[l + 2]) q[j--] = i;
}
l = j;

List<Integer> blist = map.get(q[r - 1]);
j = r;
for (int i : blist) {
if (i != q[r - 2]) q[j++] = i;
}
r = j;
}
int[] ans = new int[n];
for (int i = l + 1, idx = 0; idx < n; i++, idx++) {
ans[idx] = q[i];
}
return ans;
}
}

Python 3 代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
Python复制代码class Solution:
N = 10 ** 6 + 10
q = [0] * N

def restoreArray(self, adjacentPairs: List[List[int]]) -> List[int]:
m = len(adjacentPairs)
n = m + 1
hashmap = defaultdict(list)
for a, b in adjacentPairs:
hashmap[a].append(b)
hashmap[b].append(a)
l = self.N // 2
r = l + 1
std = adjacentPairs[0][0]
lt = hashmap[std]
self.q[l] = std
l -= 1
self.q[r] = lt[0]
r += 1
if len(lt) > 1:
self.q[l] = lt[1]
l -= 1
while (r-1)-(l+1)+1<n:
alt = hashmap[self.q[l+1]]
j = l
for i in alt:
if i != self.q[l+2]:
self.q[j] = i
j -= 1
l = j

blt = hashmap[self.q[r-1]]
j = r
for i in blt:
if i != self.q[r - 2]:
self.q[j] = i
j += 1
r = j
ans = [0] * n
for idx in range(n):
ans[idx] = self.q[idx+l+1]
return ans
  • 时间复杂度:O(n)O(n)O(n)
  • 空间复杂度:O(n)O(n)O(n)

最后

这是我们「刷穿 LeetCode」系列文章的第 No.1743 篇,系列开始于 2021/01/01,截止于起始日 LeetCode 上共有 1916 道题目,部分是有锁题,我们将先把所有不带锁的题目刷完。

在这个系列文章里面,除了讲解解题思路以外,还会尽可能给出最为简洁的代码。如果涉及通解还会相应的代码模板。

为了方便各位同学能够电脑上进行调试和提交代码,我建立了相关的仓库:github.com/SharingSour… 。

在仓库地址里,你可以看到系列文章的题解链接、系列文章的相应代码、LeetCode 原题链接和其他优选题解。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Dubbo 30 RPC Server 接收消息的处理

发表于 2021-07-25

首先分享之前的所有文章 , 欢迎点赞收藏转发三连下次一定 >>>> 😜😜😜

文章合集 : 🎁 juejin.cn/post/694164…

Github : 👉 github.com/black-ant

CASE 备份 : 👉 gitee.com/antblack/ca…

一 . 前言

Dubbo 3.0 在去年就已经发布了 , 整体来说使用没有太大的区别 , 之前一直没有好好读一下 Dubbo 的源码 , 这里把这些补上 , 顺便来看看和 2.0 有什么区别.

Dubbo 2.0 中把 Dubbo 分成了 10 个层次 , 相对 3.0 同样如此 , 这一篇只要对应 Proxy , Exchange , Transport 三个部分

1.1 补充一 : RPC 简介

RPC(Remote Procedure Call Protocol)远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。

一个基本的RPC架构里面应该至少包含以下4个组件:

1、客户端(Client):服务调用方(服务消费者)

2、客户端存根(Client Stub):存放服务端地址信息,将客户端的请求参数数据信息打包成网络消息,再通过网络传输发送给服务端

3、服务端存根(Server Stub):接收客户端发送过来的请求消息并进行解包,然后再调用本地服务进行处理

4、服务端(Server):服务的真正提供者

Dubbo3 的核心内容之一就是定义了下一代 RPC 协议 , 除了通信功能 , 还具有以下的功能 :

  • 统一的跨语言二进制格式
  • 支持Streaming 和应用层全双工调用模型
  • 易于扩展
  • 能够被各层设备识别

二 . RPC Server 端

2.1 Service 服务类案例

以官方案例为例 , 先看一下 Service 端有什么关键点 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码@DubboService
public class DemoServiceImpl implements DemoService {
private static final Logger logger = LoggerFactory.getLogger(DemoServiceImpl.class);

@Override
public String sayHello(String name) {
logger.info("Hello " + name + ", request from consumer: " + RpcContext.getServiceContext().getRemoteAddress());
return "Hello " + name + ", response from provider: " + RpcContext.getServiceContext().getLocalAddress();
}

@Override
public CompletableFuture<String> sayHelloAsync(String name) {
return null;
}

}


// 核心主要是 @DubboService
// 实现接口并不是Dubbo 流程上的必需点 ,只是为了保证规范 , Client 端的 @Reference 如果与此接口不匹配 , 会相对抛出异常

2.2 Server 端创建代理

忽略 @DubboService 的扫描逻辑 , 我们只是来看一下是怎么扫描和创建 Server 端的代理的

2.3 Server 端被调流程

该环节中将属性映射到对应的方法 , 主要看一下 Server 端的反射流程 :

  • C- ChannelEventRunnable # run : 对 channel 监听和处理
  • C- DecodeHandler # received : 接受消息
  • C- HeaderExchangeHandler # received : Exchange 接收
  • C- HeaderExchangeHandler # handleRequest : 处理请求
  • C- DubboProtocol # requestHandler : 反射到指定的方法
  • C- xxxFilter.invoke : Filter 拦截链处理
  • C- InvokerWrapper # invoke
  • C- AbstractProxyInvoker # invoke
  • C- JavassistProxyFactory # doInvoke

这里可以看到 , 主体还是经过 Exchange - Protocol - Proxy 进行处理的

2.3.1 接收消息

在从 received 看之前 , 要先看一下 Dubbo Remote 远程调用的相关逻辑 , 可以看懂 ,入口类为ChannelEventRunnable :

补充 : ChannelEventRunnable 的注册

1
2
3
4
5
6
7
8
9
java复制代码// ChannelEventRunnable 的注册是在调用时进行 , 首先构建了一个 NettyServerHandler 进行 Netty 服务器的创建
C- NettyServerHandler # channelActive
C- AbstractServer # connected
C- AllChannelHandler # connected : Channel 连接处理
C- ChannelEventRunnable # ChannelEventRunnable : 构造器加载 ChannelEventRunnable

// PS : 更之前通过 DubboBootstrap # exportServices 发起的
// 经过 DubboProtocol # createServer 逻辑
// 最终创建了 NettyServer , 这里先不深入

正式处理 : 看看 ChannelEventRunnable 如何实现监听分类

可以看到 , ChannelEventRunnable 中会针对不同的 state 去调用不同的逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码// 接收到数据
- ChannelState.RECEIVED <---> handler.received(channel, message)
// Channel 连接
- ChannelState.CONNECTED <---> handler.connected(channel)
// Channel 连接失败
- ChannelState.DISCONNECTED <---> handler.disconnected(channel)
// Channel 发送状态
- ChannelState.SENT <---> handler.sent(channel, message)
// 出现异常
- ChannelState.CAUGHT <---> handler.caught(channel, exception)

// Step 1 : 调用 CONNECTED 建立连接
channel -> [id: 0xb761cccc, L:/192.168.181.2:20880 - R:/192.168.181.2:52575]
url : dubbo://192.168.181.2:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-annotation-provider&bind.ip=192.168.181.2&bind.port=20880&channel.readonly.sent=true&codec=dubbo&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&heartbeat=60000&interface=org.apache.dubbo.demo.DemoService&metadata-type=remote&methods=sayHello,sayHelloAsync&pid=5676&release=&side=provider&threadname=DubboServerHandler-192.168.181.2:20880&timestamp=1627140229352

// Step 2 : 接收消息
handler.received(channel, message)

此处只关注 received , 代码和接收的数据如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
JAVA复制代码// C- DecodeHandler # received : 接受消息
public void received(Channel channel, Object message) throws RemotingException {
// Step 1 : 省略对 message 进行 decode 转换逻辑 ...

// Step 2 : 调用 Handler 处理
handler.received(channel, message);
}

channel -> NettyChannel [channel=[id: 0x665df7bf, L:/192.168.181.2:20880 - R:/192.168.181.2:61537]]
message -> Request
[id=1, version=2.0.2, twoway=true, event=false, broken=false, data=RpcInvocation
[methodName=sayHello, parameterTypes=[class java.lang.String], arguments=[world],
attachments=
{
input=272, dubbo=2.0.2, path=org.apache.dubbo.demo.DemoService, version=0.0.0,
remote.application=dubbo-demo-annotation-consumer,
interface=org.apache.dubbo.demo.DemoService
}
]
]

2.3.2 received 处理 request

先来看一下 ExchangeHandler 体系结构

dubbo-system-ExchangeHandler.png

这里在 DecodeHandler 中就先进行了一遍处理 , 分别通过 message 的类型 , 进行了一次 decode 处理 :

  • Decodeable –> decode(message);
  • Request –> decode(((Request) message).getData())
  • Response –> decode(((Response) message).getResult());

然后才是 HeaderExchangeHandler 对 Channel 的请求的处理 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
java复制代码C- HeaderExchangeHandler
M- received :
- HeaderExchangeChannel.getOrAddChannel(channel)
1- handlerEvent(channel, request)
2- handleRequest(exchangeChannel, request)
3- handler.received(exchangeChannel, request.getData())
4- handleResponse(channel, (Response) message)
5- channel.send(echo)


public void received(Channel channel, Object message) throws RemotingException {
final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
// 通过具体的类型进行分别的处理 , 前面的 message 已经进行过 Decode
// 这里分开做可以看到很明显的解耦想法
if (message instanceof Request) {
// handle request.
Request request = (Request) message;
// 针对事件或者双向标识(是否 response ,ack )进行分别处理 , 默认直接转向下层 Handler
if (request.isEvent()) {
handlerEvent(channel, request);
} else {
if (request.isTwoWay()) {
handleRequest(exchangeChannel, request);
} else {
handler.received(exchangeChannel, request.getData());
}
}
} else if (message instanceof Response) {
handleResponse(channel, (Response) message);
} else if (message instanceof String) {
// 单纯的异常处理 ,说明什么类型都不是
if (isClientSide(channel)) {
Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
logger.error(e.getMessage(), e);
} else {
String echo = handler.telnet(channel, (String) message);
if (echo != null && echo.length() > 0) {
channel.send(echo);
}
}
} else {
handler.received(exchangeChannel, message);
}
}

2.3.3 处理 Request , 构建 Response

因为 isTwoWay 为 true ,说明需要通过 response 告知调用者已经接收到消息

这里准备了 Response , 并且进行返回 , 同时调用对应方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
java复制代码void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {

// 准备 Response 进行返回
Response res = new Response(req.getId(), req.getVersion());
if (req.isBroken()) {
// ... 此处是出现异常 , 省略构建异常的相关逻辑

// channle 返回 response
channel.send(res);
return;
}
// 从 messsage 中获取 InvokeMehod
Object msg = req.getData();
try {
// 此处构建了一个 CompletableFuture , 用于发起异步处理 -> DubboProtocol
CompletionStage<Object> future = handler.reply(channel, msg);
future.whenComplete((appResult, t) -> {
try {
if (t == null) {
res.setStatus(Response.OK);
res.setResult(appResult);
} else {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
// 处理完成后直接返回 , 舒服
channel.send(res);
} catch (RemotingException e) {

}
});
} catch (Throwable e) {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(e));
channel.send(res);
}
}

2.3.4 获取反射到的方法

上面那个环节构建了一个 CompletableFuture , 此对象由 DubboProtocol # requestHandler 完成构建 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
java复制代码C- DubboProtocol
P- requestHandler : 注意 ,这是一个属性 , 在创建 DubboProtocol 时完成创建的


private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {

// 这才是流程中调用的方法
@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {

if (!(message instanceof Invocation)) {
throw new RemotingException(....);
}

// 构建代理类
Invocation inv = (Invocation) message;
// Step 2 : 核心逻辑 , 获取 Invoke 代理类
Invoker<?> invoker = getInvoker(channel, inv);
// 如果是回调,需要考虑向后兼容性
if (Boolean.TRUE.toString().equals(inv.getObjectAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
String methodsStr = invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr == null || !methodsStr.contains(",")) {
hasMethod = inv.getMethodName().equals(methodsStr);
} else {
String[] methods = methodsStr.split(",");
for (String method : methods) {
if (inv.getMethodName().equals(method)) {
hasMethod = true;
break;
}
}
}
if (!hasMethod) {
return null;
}
}

// 配置容器消息 : RemoteAddress
RpcContext.getServiceContext().setRemoteAddress(channel.getRemoteAddress());

// Step 3 : 注意 . 此处开始的是拦截器链 -> 2.4 Filter 体系
// invoker : FilterChainBuilder$FilterChainNode
Result result = invoker.invoke(inv);

// 处理完成 ,Future 返回
return result.thenApply(Function.identity());
}


@Override
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
reply((ExchangeChannel) channel, message);

} else {
super.received(channel, message);
}
}

// 在第一步连接的时候 ,就会调用该方法 , 此时不会做 invoke 处理
// 但是, 我认为这里应该有定制的途径 ,后面有机会看一下
@Override
public void connected(Channel channel) throws RemotingException {
invoke(channel, ON_CONNECT_KEY);
}
@Override
public void disconnected(Channel channel) throws RemotingException {
invoke(channel, ON_DISCONNECT_KEY);
}

private void invoke(Channel channel, String methodKey) {
// 创建代理 ,代理不为空 ,才会继续逻辑
Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);
if (invocation != null) {
try {
received(channel, invocation);
} catch (Throwable t) {
logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);
}
}
}

// channel.getUrl()总是绑定到一个固定的服务,并且这个服务是随机的
private Invocation createInvocation(Channel channel, URL url, String methodKey) {
// 此处会从 url 获取对应的属性 , 常量有 : onconnect , ondisconnect , 此时不会创建代理
String method = url.getParameter(methodKey);
if (method == null || method.length() == 0) {
return null;
}

// 通过
RpcInvocation invocation = new RpcInvocation(method, url.getParameter(INTERFACE_KEY), "", new Class<?>[0], new Object[0]);
invocation.setAttachment(PATH_KEY, url.getPath());
invocation.setAttachment(GROUP_KEY, url.getGroup());
invocation.setAttachment(INTERFACE_KEY, url.getParameter(INTERFACE_KEY));
invocation.setAttachment(VERSION_KEY, url.getVersion());
if (url.getParameter(STUB_EVENT_KEY, false)) {
invocation.setAttachment(STUB_EVENT_KEY, Boolean.TRUE.toString());
}

return invocation;
}
};


// Step 2 : 具体的方法调用 , 该方法中解析出 ServiceKey , 并且查询出 Invoke
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
boolean isCallBackServiceInvoke = false;
boolean isStubServiceInvoke = false;

// Step 2-1 : 准备标识数据
// 拿到端口 : 20880
int port = channel.getLocalAddress().getPort();
// 拿到需要的配置类型 : org.apache.dubbo.metadata.MetadataService
String path = (String) inv.getObjectAttachments().get(PATH_KEY);

// Step 2-2 : 判断它是否为客户端的回调服务
isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getObjectAttachments().get(STUB_EVENT_KEY));
if (isStubServiceInvoke) {
port = channel.getRemoteAddress().getPort();
}

// 是否有回调代理
isCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke;
if (isCallBackServiceInvoke) {
path += "." + inv.getObjectAttachments().get(CALLBACK_SERVICE_KEY);
inv.getObjectAttachments().put(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString());
}

// Step 2-3 : 获取 ServiceKey
// dubbo-demo-annotation-provider/org.apache.dubbo.metadata.MetadataService:1.0.0:20880
String serviceKey = serviceKey(
port,
path,
(String) inv.getObjectAttachments().get(VERSION_KEY),
(String) inv.getObjectAttachments().get(GROUP_KEY)
);

// Step 2-4 : 查询对应的 DubboExporter
DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);

if (exporter == null) {
throw new RemotingException(....;
}

// Step 2-5 : 返回 invoke 类
return exporter.getInvoker();
}

// 补充 Step 2-3 :
protected static String serviceKey(int port, String serviceName, String serviceVersion, String serviceGroup) {
return ProtocolUtils.serviceKey(port, serviceName, serviceVersion, serviceGroup);
}

2.4 Filter 体系

之前看到 , 构建 invoke 的时候是构建 FilterChain , 感觉这种写法挺不错 , 以后有机会试试 , 这里的 Filter 主要有 :

  • EchoFilter
  • ClassLoaderFilter
  • GenericFilter
  • ContextFilter
  • ExceptionFilter
  • MonitorFilter
  • TimeoutFilter
  • TraceFilter
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
java复制代码C- FilterChainBuilder

public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult;
try {
// filter 链的处理
asyncResult = filter.invoke(nextNode, invocation);
} catch (Exception e) {
// 省略 ,主要是进行 Listerner 的监听 , 又在玩异步
throw e;
} finally {

}
return asyncResult.whenCompleteWithContext((r, t) -> {
// 处理完成后 ,还是要通知监听器
if (filter instanceof ListenableFilter) {
ListenableFilter listenableFilter = ((ListenableFilter) filter);
Filter.Listener listener = listenableFilter.listener(invocation);
try {
if (listener != null) {
if (t == null) {
listener.onResponse(r, originalInvoker, invocation);
} else {
listener.onError(t, originalInvoker, invocation);
}
}
} finally {
listenableFilter.removeListener(invocation);
}
} else if (filter instanceof FILTER.Listener) {
FILTER.Listener listener = (FILTER.Listener) filter;
if (t == null) {
listener.onResponse(r, originalInvoker, invocation);
} else {
listener.onError(t, originalInvoker, invocation);
}
}
});
}

2.5 最终的方法调用

以及最后一个Filter 的方法调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
java复制代码C- AbstractProxyInvoker

// 反正就是各种异步和 Future , 用的已经炉火纯青了
public Result invoke(Invocation invocation) throws RpcException {
try {
// 调用 JavassistProxyFactory 处理方法
Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
CompletableFuture<Object> future = wrapWithFuture(value);
CompletableFuture<AppResponse> appResponseFuture = future.handle((obj, t) -> {
AppResponse result = new AppResponse(invocation);
//.....
return result;
});
return new AsyncRpcResult(appResponseFuture, invocation);
} catch (InvocationTargetException e) {
return AsyncRpcResult.newDefaultAsyncResult(null, e.getTargetException(), invocation);
} catch (Throwable e) {
throw new RpcException(....);
}
}

// PS : AsyncRpcResult 的作用 (有必要后面再看)
这个类表示一个未完成的RPC调用,它将保存这个调用的一些上下文信息,例如RpcContext和Invocation
因此,当调用结束并返回结果时,它可以保证恢复的所有上下文与调用任何回调之前发出调用时相同。



// C- JavassistProxyFactory : 也没啥看头了 , 基本上东西前面都准备好了 , 这里反射处理就行
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}

- proxy : 被代理的类
- methodName : 调用的方法
- parameterTypes : 属性类型

2.6 RPC 的 线程调用

RPC 被调流程中 , 创建的是 InternalRunnable , 来看一下是如何调用的

其底层是通过 Netty 进行的交互 , 这里就不详述了 ,以后说 Netty 时再分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
java复制代码public class InternalRunnable implements Runnable{
private final Runnable runnable;

public InternalRunnable(Runnable runnable){
this.runnable=runnable;
}

/**
* After the task execution is completed, it will call {@link InternalThreadLocal#removeAll()} to clear
* unnecessary variables in the thread.
*/
@Override
public void run() {
try{
runnable.run();
}finally {
InternalThreadLocal.removeAll();
}
}

}


// 调用的路径
public class NamedInternalThreadFactory extends NamedThreadFactory {
@Override
public Thread newThread(Runnable runnable) {
String name = mPrefix + mThreadNum.getAndIncrement();
InternalThread ret = new InternalThread(mGroup, InternalRunnable.Wrap(runnable), name, 0);
ret.setDaemon(mDaemon);
return ret;
}
}

三 . 深入梳理

3.1 方法的信息来源和校验

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
java复制代码// 从 Netty 传入的参数是这样的 :
{
"broken": false,
"data": {
"arguments": ["world"],
"attachments": {
"input": "272",
"path": "org.apache.dubbo.demo.DemoService",
"remote.application": "dubbo-demo-annotation-consumer",
"dubbo": "2.0.2",
"interface": "org.apache.dubbo.demo.DemoService",
"version": "0.0.0"
},
"methodName": "sayHello",
"objectAttachments": {
"input": "272",
"dubbo": "2.0.2",
"path": "org.apache.dubbo.demo.DemoService",
"version": "0.0.0",
"remote.application": "dubbo-demo-annotation-consumer",
"interface": "org.apache.dubbo.demo.DemoService"
},
"parameterTypesDesc": "Ljava/lang/String;",
"targetServiceUniqueName": "org.apache.dubbo.demo.DemoService:0.0.0"
},
"event": false,
"heartbeat": false,
"id": 1,
"twoWay": true,
"version": "2.0.2"
}

// 这里可以看到 , 从Netty 传入的参数中就已经携带了方法信息 , 也就是说方法信息是 Client 端发送的 ,
// 注意 , 这个 methodName 并不是后置校验 , 在项目初始化的时候 , 就进行了校验 , 这个位置以后细说 , 只跟到最后校验的地方

C- ReferenceConfig
protected synchronized void init() {
// 前置流程省略 : DubboBootstrap 初始化和配置初始胡啊

// 校验 Invoke
checkInvokerAvailable();
}



// Step 2 :检验是否正确
private void checkInvokerAvailable() throws IllegalStateException {
if (shouldCheck() && !invoker.isAvailable()) {
invoker.destroy();
throw new IllegalStateException("Failed to check the status of the service "......);
}
}

总结

从这篇发现 , Dubbo 的 Method 代理信息是从远程 Client 传递过来的 ,而远程 Client 在启动时会校验 @ReferenceService 是否正确 , 从而保证了准确性

  • Dubbo Server 端首先会创建 NettyService
  • 处理的起点是 ChannelEventRunnable , 通过 DecodeHandler 进行解析
  • 核心的处理逻辑在 DubboProtocol , 这个环节中已经获取具体的 Invoke 类了
  • 最终的调用方为 JavassistProxyFactory , 发起代理的调用

整体来说 , 看 Dubbo 的代码还是更接地气 , 比看 Spring 的代码收获了更多的东西 !!!!!!!!!!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

手写一个rpc远程调用服务demo

发表于 2021-07-24

前言

  • 因为公司业务需求,使用了K8S + istio进行服务部署和治理,没有使用常规的springclould技术栈(包括注册中心nacos和openfeign远程服务调用)。
  • 所以就自研了一个基于AOP实现的rpc远程调用服务模块。其实现原理实现和feign类似,都是通过远程调用方法的代理对象发送HTTP请求并返回结果。

实现代码

  • 废话不多说,下面直接上代码
  • 下图是demo模块划分,common是公共模块,demo-order和demo-user是模拟两个服务调用。
    在这里插入图片描述
  • 定义一个标识为远程调用类的注解 @RpcService ,有点类似于feign的@FeignClient注解。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码/**
*
* @AUTHOR ZRH
* @DATE 2021/4/10
*/
@Component
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface RpcService {

/**
* 远程服务名称
*/
String service();

/**
* 端口
*/
String port();
}
  • 定义两个标识远程调用接口请求方式注解 @get和@post,相当于@PostMapping和@GetMapping。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
java复制代码/**
*
* @AUTHOR ZRH
* @DATE 2021/4/10
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface Post {

/**
* 接口路由
*
* @return
*/
String value();
}

/**
*
* @AUTHOR ZRH
* @DATE 2021/4/10
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface Get {

/**
* 接口路由
*
* @return
*/
String value();
}
  • 然后定义一个AOP切面处理类 AopRpcHandler。只要远程调用接口方法上有注解@Post或者@Get,就会对方法进行代理方式请求。
  • 因为这里不需要原本远程调用方法的执行结果,所以这里直接使用@Around环绕切面,并且不需要执行原方法,所以直接使用JoinPoint 做参数接口(ProceedingJoinPoint继承自JoinPoint,里面多了两个阻塞方法proceed,用于获取原代理方法的执行结果)。
  • 通过代理对象获取到原方法的参数值,参数名,接口路由地址,接口请求方式,远程服务和端口等等。使用okhttp工具类发送代理请求,然后返回响应结果。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
java复制代码/**
* @AUTHOR ZRH
* @DATE 2021/4/10
*/
@Slf4j
@Aspect
@Component
public class AopRpcHandler {

private final static String HTTP = "http://";

@Around(value = "@annotation(post)")
public String aopPost(JoinPoint joinPoint, Post post) {
String result = null;
String url = null;
try {
RpcService rpcService = (RpcService) joinPoint.getSignature().getDeclaringType().getAnnotation(RpcService.class);
url = HTTP + rpcService.service() + ":" + rpcService.port() + "/" + post.value();
Object[] args = joinPoint.getArgs();
result = OkHttpUtils.post(url, JSON.toJSONString(args[0]));
} catch (Throwable throwable) {
log.error("服务调用异常,url = [{}]", url);
}
return result;
}

@Around(value = "@annotation(get)")
public String aopGet(JoinPoint joinPoint, Get get) {
String result = null;
String url = null;
try {
RpcService rpcService = (RpcService) joinPoint.getSignature().getDeclaringType().getAnnotation(RpcService.class);
url = HTTP + rpcService.service() + ":" + rpcService.port() + "/" + get.value();

MethodSignature signature = (MethodSignature) joinPoint.getSignature();
Parameter[] parameters = signature.getMethod().getParameters();
if (parameters != null && parameters.length > 0) {
Object[] args = joinPoint.getArgs();
int length = parameters.length;
url += "?";
for (int i = 0; i < length; i++) {
url += parameters[i] + "=" + args[i];
if (i != length - 1) {
url += "&";
}
}
}
result = OkHttpUtils.get(url);
} catch (Throwable throwable) {
log.error("服务调用异常,url = [{}]", url);
}
return result;
}
}
  • 然后在demo-user服务中如果有远程调用场景,就创建一个远程调用类。使用注解@RpcService和@Post申明一下即可。方法中的返回值和返回类型可以自定义,比如一般项目中会有统一的响应结果。
1
2
3
4
5
6
7
8
9
10
11
12
java复制代码/**
* @AUTHOR ZRH
* @DATE 2021/4/10 0010 1:06
*/
@RpcService(service = "demo-order", port = "18002")
public class AopRpcDemo {

@Post("post")
public String post(String param) {
return "1";
}
}
  • 在demo-user服务中使用和正常调用接口一样。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
sql复制代码/**
* @AUTHOR ZRH
* @DATE 2021/4/10 0010 0:42
*/
@RestController
public class DemoController {

@Autowired
private AopRpcDemo aopRpcDemo;

@PostMapping("post")
public String post() {
String post = aopRpcDemo.post("zrh.post");
System.out.println("调用远程接口方法返回结= " + post);
return "ok";
}
}
  • 如果就这样把demo服务启动后,访问是访问不了的。因为在aop切面处理类中对http请求的URL没有通过域名而是通过服务名称拼接的(一般情况微服务项目的子服务可能会集群部署到不同的服务器上,那么就会有多个访问域名等等,这里如果直接把多个访问域名都拼接在@RpcService注解里,那么就还需要自行实现负载均衡策略选择某个域名进行访问)
  • 这里如果是基于注册中心和feign进行服务调用,那就没有问题,因为feign会通过服务名称到注册中心找到对应服务的地址进行请求远程接口。而这里因为没有使用注册中心,所以在window上需要增加hosts文件上的地址映射关系。在C:\Windows\System32\drivers\etc目录下的hosts文件增加。并在cmd控制台中使用ipconfig /flushdns刷新DNS内容。
  • @RpcService中的service写服务名而不写服务访问域名,是因为如果是多机集群部署,那么就可以使用服务名映射域名方式通过Nginx负载均衡进行转发请求,也可以使用K8S集群环境的服务发现和负载均衡。
  • 在K8S集群容器内网可以使用服务名称直接解析访问到其他服务,并且也实现了负载均衡处理。外网访问访问需要K8S集群公网域名。
    在这里插入图片描述
  • 先看一下两个服务的配置文件和demo-order的接口
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
  • 服务启动后,访问http://localhost:18001/post,结果如下图:
    在这里插入图片描述
    在这里插入图片描述
  • 最后的结果和我们想要的结果一致。
  • 上面的demo是很简单的实现。如果读者想要在自己项目中使用此类技术栈,那需要考虑服务容错,服务发现,服务限流等等是否能兼容等。

最后

  • openfeign其实是可以独立和springboot进行使用的。先引入openfeign的maven包
1
2
3
4
5
java复制代码        <dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
<version>3.0.2</version>
</dependency>
  • 然后在使用@FeignClient注解时,对url配置接口的访问地址,最后的执行结果和上述的结果是一样的。
1
2
3
4
5
6
7
8
9
10
java复制代码/**
* @AUTHOR ZRH
* @DATE 2021/4/10 0010 1:15
*/
@FeignClient(name = "demo-user", url = "demo-user:18001")
public interface UserFeign {

@PostMapping("hello")
String hello(@RequestBody String param);
}
  • 上述代码我已经上传到我的gitee账号上:点击跳转
  • 如果有什么地方写的不对的,欢迎指出,我确认会加以改正。
  • 虚心学习,共同进步 -_-

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

帮你快速上手Jenkins自动化部署 Jenkins学习

发表于 2021-07-24

Jenkins学习

在未学习Jenkins之前,只是对Jenkins有一个比较模糊的理解,即Jenkins是一个自动化构建项目发布的工具,可以实现代码->github或者gitlab库->jenkins自动部署->访问的整体的过程,而无需人为重新打包进行war包或者jar包或者其他工具打包到linux上进行运行的一个工具。但大致只是知道具体的一个流程,还是没有整体的尝试过,这次是工作上面必须用到Jenkins,就必须要花时间来学习一下这个工具了,有助于在后续的工作中排查问题,以及发布上线自己也知道整体的流程,更加轻车熟路的进行开发部署工作!

  1. 下载与安装

Jenkins的下载可以直接在官网下载,方式还是比较简单的,官网提供了具体的下载和安装的步骤。

1
2
3
4
5
6
7
8
9
10
11
sh复制代码sudo wget -O /etc/yum.repos.d/jenkins.repo https://pkg.jenkins.io/redhat-stable/jenkins.repo
sudo rpm --import https://pkg.jenkins.io/redhat-stable/jenkins.io.key

# 安装
yum install jenkins -y

# 启动
systemctl daemon-reoload
systemctl start jenkins
# 查看状态,如果没有问题的话,状态应该是running
systemctl status jenkins

这里注意,如果是没有安装java环境的,可以按照官网指示的www.jenkins.io/doc/book/in… 一起把jdk给装一下。

问题点

启动的时候可能会报错,这个错误会来自于java的环境配置的不是很正确

jenkins启动报错

解决办法:

1
2
sh复制代码# 修改jenkins的配置文件中的java的路径
vim /etc/init.d/jenkins

修改jenkins的java配置

  1. Jenkins的使用

之后可以按照文档,直接访问ip:8080可以看到具体的jenkins的页面,会要求输入密码操作,这个密码给出了具体的路径,所以不需要担心,直接cat 路径即可获取密码操作。进入到安装插件的页面,直接点击推荐的安装插件即可进入到插件。

jenkins插件安装
新建item,配置item,配置与github之间的联系(这里主要是公钥和私钥配置),解决为什么连接不上github,遇到clone失败的话,是因为本地还没有将git远程添加为可信任的用户,所以需要自己手动的执行git clone然后添加信任凭证即可。

2.1 配置流程

需要配置具体的地址,这个地址必须是网上github可以访问到的地址,github无法访问虚拟机的地址,除非是利用阿里云或者腾讯云服务器。我是用的腾讯云服务器,此时需要设置github上的webhook的网址,主要的目的是仓库一旦被push此时jenkins就会重新构建整个服务器。

2.1.1 如何让jenkins与github通信

这个问题是必须要考虑的,因为自动构建整个的过程需要本地的代码与github库构建起来,然后jenkins通过webhook的方式接收到github那边的push信号,然后从github中拉取代码进行本地的一个构建任务。

对于本地的代码与github之间的通信我们需要生成公钥和私钥然后配置好即可,对于jenkins来说也是一样,要想从github拉代码就需要建立公钥和私钥的方式来建立通信。

第一步我们需要生成公钥和私钥:

1
2
3
4
sh复制代码ssh-keygen -t rsa -C "xxx@xxx.com"
生成公钥和私钥
cat ~/.ssh/id_rsa.pub # 写入到github的settings => SSH and GPG keys
cat ~/.ssh/id_rsa # 写入到jenkins的配置中

设置公钥

2.1.2 配置Jenkins

注意在配置之前你需要先新建你任务,可以任意选择自由风格的,名字的话随你安排

项目配置

先对General部分进行配置,即我们的github项目的URL,这个就把自己想要自动构建的项目房子放在这里就好了。

源码管理配置

点击对上面红色字体部分的添加此时可以看到。

添加凭证

构建触发器

构建触发器

上面的添加框中的内容

img

Secret Token生成

token的生成

构建

这个是代码拉过来之后的行为了,就是你需要自己写脚本把自己的项目运行起来。

构建

2.1.3 一段Django启动的脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
sh复制代码#!/bin/bash
VENV_DIR=/usr/local/src/jenkins
JENKINS_PROJECT_DIR=/var/lib/jenkins/workspace/jenkinsdemo
# 构建环境中的脚本命令
echo "Congratulations! Build Success!"
# 先判断一下进程是否在,在的话就不用管了,不在的话需要执行激活操作
PROCESS_EXIST=`netstat -anp | grep 8899 | grep -v "grep" | awk '{print $7///}'`
# 如果长度为0的话此时需要激活环境,否则直接跳过
if [ -z $PROCESS_EXIST ]
then
# 进入到venv环境中需要激活环境
source $VENV_DIR/bin/activate
# 进入到具体的代码的workspace空间中
cd $JENKINS_PROJECT_DIR
# 启动当前项目
python manage.py runserver 0.0.0.0:8899 &
echo -ne "\n"
sleep 3
# 将环境注销
# deactivate
# 提示启动成功
echo "the project run success"
fi
echo "The project reload success"

2.1.4 webhook的配置

如果没有配置webhook,jenkins是不能够实现自动构建的,那就需要自己手动点击构建了,就没有啥意思了。所以在这里我们还需要配置一个webhook。

系统设置

2.1.5 jenkins添加webhook

添加webhook

点击高级之后,会出现如下界面

配置webhook

设置具体的hook地址

设置hook地址

github配置webhook

github配置

  1. 运行

运行的话,本地修改代码,此时你会发现Jenkins实现了自动构建,如果你写了脚本,你会发现你的程序也已经运行了起来,此时你只需要对你的代码进行小幅度的改动,将代码推送到github上去,此时就可以通过jenkins自动部署,将改动的代码更新到你的代码库实现自动构建部署,然后重新请求就会发现内容已经更新了。

  1. 小结

你所以为的不会再见到的东西,通常都会在未来某个时间节点再遇见,并且你不得不解决它。 —– 我 & Jenkins

Jenkins之前也有接触过,第一次实习的时候,同事们就是用的Jenkins来实现代码的发版,但是那个时候还是2019年的事情了,自己因为没有机会参加真实的线上发布,就用不上,虽然有想过去学习这个,但是一直都搁置了。后面到了第二次实习,在滴滴的时候,滴滴那边基础架构把它包了一层,当时都没意识到是Jenkins,哈哈,虽然自己也用了很多次,但是包的还是不错的,用起来很舒服。后面到了腾讯实习的话,就没参加过正式的发版,所以就没接触(内部应该也做了包装)。

现在工作了发现自己也需要去使用Jenkins了,逃不过的话就学会使用了!花了半天时间熟悉了一下,用了一个简单的例子跑了一下,实现了具体的操作,代码的话还是非常简单的,就是一个小的Django程序,跳转一个HTML页面。

总体来说,Jenkins还是很强大的,感觉需要好好学学shell脚本,这个还是有很大作用,项目中很多的脚本,但是自己好多都看不懂,菜的流眼泪。

继续加油吧!

Keep thinking, keep coding! 2021年5月29日18:06:45 写于深圳宝安!加油!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

程序员必备技能之SpringBoot的自动装配原理,很详细,

发表于 2021-07-24

SpringBoot应该是每个Java程序猿都会使用的基础框架了,对于SpringBoot的核心内容自动装配原理的掌握就显得非常重要了。è¯·æ·»åŠ å›¾ç‰‡æè¿°

自动装配原理分析

1 理论介绍

  SpringBoot通过自动装配实现了第三方框架系统对象的注入。这种实现机制和我们前面介绍的SPI(服务扩展机制)很相似。

è¯·æ·»åŠ å›¾ç‰‡æè¿°

2 源码分析

2.1 Spring的IoC

  SpringBoot的本质是SpringFramework【IoC,AOP】的再次封装的上层应用框架。

è¯·æ·»åŠ å›¾ç‰‡æè¿°

2.2 run方法

  我们启动一个SpringBoot项目,本质上就是执行了启动类中的主方法,然后调用执行了run方法,那么run方法到底做了什么操作呢?我们可以先来分析下:

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码@SpringBootApplication
@MapperScan("com.bobo.mapper")
public class SpringBootVipDemoApplication {
public static void main(String[] args) {
// 基于配置文件的方式
ApplicationContext ac1 = new ClassPathXmlApplicationContext("");
// 基于Java配置类的方式
ApplicationContext ac2 = new AnnotationConfigApplicationContext(SpringBootVipDemoApplication.class);
// run 方法的返回对象是 ConfigurableApplicationContext 对象
ConfigurableApplicationContext ac3 = SpringApplication.run(SpringBootVipDemoApplication.class, args);
}
}

ConfigurableApplicationContext这个对象其实是 ApplicationContext接口的一个子接口

è¯·æ·»åŠ å›¾ç‰‡æè¿°

那么上面的代码可以调整为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码@SpringBootApplication
@MapperScan("com.bobo.mapper")
public class SpringBootVipDemoApplication {

public static void main(String[] args) {
// 基于配置文件的方式
ApplicationContext ac1 = new ClassPathXmlApplicationContext("");
// 基于Java配置类的方式
ApplicationContext ac2 = new AnnotationConfigApplicationContext(SpringBootVipDemoApplication.class);
// run 方法执行完成后返回的是一个 ApplicationContext 对象
// 到这儿我们是不是可以猜测 run 方法的执行 其实就是Spring的初始化操作[IoC]
ApplicationContext ac3 = SpringApplication.run(SpringBootVipDemoApplication.class, args);
}

}

根据返回结果,我们猜测SpringBoot项目的启动其实就是Spring的初始化操作【IoC】。

è¯·æ·»åŠ å›¾ç‰‡æè¿°

下一步:

è¯·æ·»åŠ å›¾ç‰‡æè¿°

下一步:

è¯·æ·»åŠ å›¾ç‰‡æè¿°

直接调用:

è¯·æ·»åŠ å›¾ç‰‡æè¿°

è¯·æ·»åŠ å›¾ç‰‡æè¿°

到这儿,其实我们就可以发现SpringBoot项目的启动,本质上就是Spring的初始化操作。但是并没有涉及到SpringBoot的核心装配。

2.3 @SpringBootApplication

  @SpringBootApplication点开后我们能够发现@SpringBootApplication这个注解其实是一个组合注解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
less复制代码@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@SpringBootConfiguration
@EnableAutoConfiguration
@ComponentScan(
excludeFilters = {@Filter(
type = FilterType.CUSTOM,
classes = {TypeExcludeFilter.class}
), @Filter(
type = FilterType.CUSTOM,
classes = {AutoConfigurationExcludeFilter.class}
)}
)

我们发现@SpringBootApplication注解的前面四个注解是JDK中自动的元注解 (用来修饰注解的注解)

1
2
3
4
less复制代码@Target({ElementType.TYPE}) // 表明 修饰的注解的位置 TYPE 表示只能修饰类
@Retention(RetentionPolicy.RUNTIME) // 表明注解的作用域
@Documented // API 文档抽取的时候会将该注解 抽取到API文档中
@Inherited // 表示注解的继承

还有就是@ComponentScan注解,该注解的作用是用来指定扫描路径的,如果不指定特定的扫描路径的话,扫描的路径是当前修饰的类所在的包及其子包。

  @SpringBootConfiguration这个注解的本质其实是@Configuration注解。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
less复制代码@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
// 上面是3个元注解
// @Configuration 注解修饰的Java类是一个配置类
@Configuration
// @Indexed
@Indexed
public @interface SpringBootConfiguration {
@AliasFor(
annotation = Configuration.class
)
boolean proxyBeanMethods() default true;
}

 这样一来7个注解,咱们清楚了其中的6个注解的作用,而且这6个注解都和SpringBoot的自动装配是没有关系的。

è¯·æ·»åŠ å›¾ç‰‡æè¿°

2.4 @EnableAutoConfiguration

  @EnableAutoConfiguration这个注解就是SpringBoot自动装配的关键。

1
2
3
4
5
6
7
8
9
10
11
12
13
less复制代码@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@AutoConfigurationPackage
@Import({AutoConfigurationImportSelector.class})
public @interface EnableAutoConfiguration {
String ENABLED_OVERRIDE_PROPERTY = "spring.boot.enableautoconfiguration";

Class<?>[] exclude() default {};

String[] excludeName() default {};
}

我们发现要搞清楚EnableAutoConfiguration注解的关键是要弄清楚@Import注解。这个内容我们前面在注解编程发展中有详细的介绍。AutoConfigurationImportSelector实现了ImportSelector接口,那么我们清楚只需要关注selectImports方法的返回结果即可

1
2
3
4
5
6
7
8
9
10
kotlin复制代码    public String[] selectImports(AnnotationMetadata annotationMetadata) {
if (!this.isEnabled(annotationMetadata)) {
return NO_IMPORTS;
} else {
AutoConfigurationImportSelector.AutoConfigurationEntry autoConfigurationEntry = this.getAutoConfigurationEntry(annotationMetadata);
// 返回的就是需要注册到IoC容器中的对象对应的类型的全类路径名称的字符串数组
// ["com.bobo.pojo.User","com.bobo.pojo.Person", ....]
return StringUtils.toStringArray(autoConfigurationEntry.getConfigurations());
}
}

我们清楚了该方法的作用就是要返回需要注册到IoC容器中的对象对应的类型的全类路径名称的字符串数组。那么我接下来分析的关键是返回的数据是哪来的?所以呢进入getAutoConfigurationEntry方法中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
kotlin复制代码    protected AutoConfigurationImportSelector.AutoConfigurationEntry getAutoConfigurationEntry(AnnotationMetadata annotationMetadata) {
if (!this.isEnabled(annotationMetadata)) {
return EMPTY_ENTRY;
} else {
// 获取注解的属性信息
AnnotationAttributes attributes = this.getAttributes(annotationMetadata);
// 获取候选配置信息 加载的是 当前项目的classpath目录下的 所有的 spring.factories 文件中的 key 为
// org.springframework.boot.autoconfigure.EnableAutoConfiguration 的信息
List<String> configurations = this.getCandidateConfigurations(annotationMetadata, attributes);
configurations = this.removeDuplicates(configurations);
Set<String> exclusions = this.getExclusions(annotationMetadata, attributes);
this.checkExcludedClasses(configurations, exclusions);
configurations.removeAll(exclusions);
configurations = this.getConfigurationClassFilter().filter(configurations);
this.fireAutoConfigurationImportEvents(configurations, exclusions);
return new AutoConfigurationImportSelector.AutoConfigurationEntry(configurations, exclusions);
}
}

先不进入代码,直接DEBUG调试到 候选配置信息这步。我们发现里面有很多个Java类。

è¯·æ·»åŠ å›¾ç‰‡æè¿°

然后进入getCandidateConfiguration方法中,我们可以发现加载的是 META-INF/spring.factories 文件中的配置信息

è¯·æ·»åŠ å›¾ç‰‡æè¿°

然后我们可以验证,进入到具体的META-INF目录下查看文件。

è¯·æ·»åŠ å›¾ç‰‡æè¿°

最后几个

è¯·æ·»åŠ å›¾ç‰‡æè¿°

在我们的Debug中还有一个配置文件(MyBatisAutoConfiguration)是哪来的呢?

è¯·æ·»åŠ å›¾ç‰‡æè¿°

深入源码也可以看到真正加载的文件

è¯·æ·»åŠ å›¾ç‰‡æè¿°

然后我们继续往下看源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
kotlin复制代码    protected AutoConfigurationImportSelector.AutoConfigurationEntry getAutoConfigurationEntry(AnnotationMetadata annotationMetadata) {
if (!this.isEnabled(annotationMetadata)) {
return EMPTY_ENTRY;
} else {
AnnotationAttributes attributes = this.getAttributes(annotationMetadata);
List<String> configurations = this.getCandidateConfigurations(annotationMetadata, attributes);
// 因为会加载多个 spring.factories 文件,那么就有可能存在同名的,
// removeDuplicates方法的作用是 移除同名的
configurations = this.removeDuplicates(configurations);
// 获取我们配置的 exclude 信息
// @SpringBootApplication(exclude = {RabbitAutoConfiguration.class})
// 显示的指定不要加载那个配置类
Set<String> exclusions = this.getExclusions(annotationMetadata, attributes);
this.checkExcludedClasses(configurations, exclusions);
configurations.removeAll(exclusions);
// filter的作用是 过滤掉咱们不需要使用的配置类。
configurations = this.getConfigurationClassFilter().filter(configurations);
this.fireAutoConfigurationImportEvents(configurations, exclusions);
return new AutoConfigurationImportSelector.AutoConfigurationEntry(configurations, exclusions);
}
}

先来看过滤的效果:

è¯·æ·»åŠ å›¾ç‰‡æè¿°

那么我们需要考虑这个过滤到底是怎么实现的呢?进入filter方法

è¯·æ·»åŠ å›¾ç‰‡æè¿°

我们可以看到有具体的匹配方法 match。里面有个关键的属性是 autoConfigurationMetadata,的本质是 加载的 META-INF/spring-autoconfigure-metadata.properties 的文件中的内容。我们以 RedisAutoConfiguration 为例:

è¯·æ·»åŠ å›¾ç‰‡æè¿°

通过上面的配置文件,我们发现RedisAutoConfiguration 被注入到IoC中的条件是系统中要存在 org.springframework.data.redis.core.RedisOperations 这个class文件。首先系统中不存在 RedisOperations 这个class文件。

è¯·æ·»åŠ å›¾ç‰‡æè¿°

过滤后,我们发现 RedisAutoConfiguration 就不存在了。

è¯·æ·»åŠ å›¾ç‰‡æè¿°

但是当我们在系统中显示的创建 RedisOperations Java类后,filter就不会过滤 RedisAutoConfiguration 配置文件了。

è¯·æ·»åŠ å›¾ç‰‡æè¿°

到这其实我们就已经给大家介绍完了SpringBoot的自动装配原理。

è¯·æ·»åŠ å›¾ç‰‡æè¿°

看完三件事❤️

如果你觉得这篇内容对你还蛮有帮助,我想邀请你帮我三个小忙:

  1. 点赞,转发,有你们的 『点赞和评论』,才是我创造的动力。
  2. 关注公众号 『 java烂猪皮 』,不定期分享原创知识。
  3. 同时可以期待后续文章ing🚀
  4. .关注后回复【666】扫码即可获取学习资料包

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

一文带你掌握RedisTemplate的常见用法

发表于 2021-07-24
  • ValueOperations:简单K-V操作
  • SetOperations:set类型数据操作
  • ZSetOperations:zset类型数据操作
  • HashOperations:针对map类型的数据操作
  • ListOperations:针对list类型的数据操作

一、通过bound封装指定的key

指定后进行一系列的操作而无须“显式”的再次指定Key,即BoundKeyOperations:

  • BoundValueOperations
  • BoundSetOperations
  • BoundListOperations
  • BoundSetOperations
  • BoundHashOperations
1
2
3
4
5
6
7
8
9
10
11
java复制代码//1、通过redisTemplate设置值
redisTemplate.boundValueOps("StringKey").set("StringValue");
redisTemplate.boundValueOps("StringKey").set("StringValue",1, TimeUnit.MINUTES);
//2、通过BoundValueOperations设置值
BoundValueOperations stringKey = redisTemplate.boundValueOps("StringKey");
stringKey.set("StringVaule");
stringKey.set("StringValue",1, TimeUnit.MINUTES);
//3、通过ValueOperations设置值
ValueOperations ops = redisTemplate.opsForValue();
ops.set("StringKey", "StringVaule");
ops.set("StringValue","StringVaule",1, TimeUnit.MINUTES);
1
2
3
4
5
6
7
8
java复制代码//1、通过redisTemplate获取值
String str1 = (String) redisTemplate.boundValueOps("StringKey").get();
//2、通过BoundValueOperations获取值
BoundValueOperations stringKey = redisTemplate.boundValueOps("StringKey");
String str2 = (String) stringKey.get();
//3、通过ValueOperations获取值
ValueOperations ops = redisTemplate.opsForValue();
String str3 = (String) ops.get("StringKey");

二、针对数据的“序列化/反序列化”,提供了多种可选择策略(RedisSerializer)

​ JdkSerializationRedisSerializer: POJO对象的存取场景,使用JDK本身序列化机制,将pojo类通过ObjectInputStream/ObjectOutputStream进行序列化操作,最终redis-server中将存储字节序列。是目前最常用的序列化策略。

​ StringRedisSerializer: Key或者value为字符串的场景,根据指定的charset对数据的字节序列编码成string,是“new String(bytes, charset)”和“string.getBytes(charset)”的直接封装。是最轻量级和高效的策略。

​ JacksonJsonRedisSerializer: jackson-json工具提供了javabean与json之间的转换能力,可以将pojo实例序列化成json格式存储在redis中,也可以将json格式的数据转换成pojo实例。因为jackson工具在序列化和反序列化时,需要明确指定Class类型,因此此策略封装起来稍微复杂。【需要jackson-mapper-asl工具支持】

三、基本API

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码//删除key
redisTemplate.delete(keys);
//指定key的失效时间
redisTemplate.expire(key,time,TimeUnit.MINUTES);
//根据key获取过期时间
Long expire = redisTemplate.getExpire(key);
//判断key是否存在
redisTemplate.hasKey(key);
//顺序递增
redisTemplate.boundValueOps("StringKey").increment(3L);
//顺序递减
redisTemplate.boundValueOps("StringKey").increment(-3L);

四、Hash类型相关操作

4.1 设置值:

1
2
3
4
5
6
7
8
java复制代码//1、通过redisTemplate设置值
redisTemplate.boundHashOps("HashKey").put("SmallKey", "HashVaue");
//2、通过BoundValueOperations设置值
BoundHashOperations hashKey = redisTemplate.boundHashOps("HashKey");
hashKey.put("SmallKey", "HashVaue");
//3、通过ValueOperations设置值
HashOperations hashOps = redisTemplate.opsForHash();
hashOps.put("HashKey", "SmallKey", "HashVaue");

4.2 设置过期时间(单独设置)

1
2
java复制代码redisTemplate.boundValueOps("HashKey").expire(1,TimeUnit.MINUTES);
redisTemplate.expire("HashKey",1,TimeUnit.MINUTES);

4.3 添加一个Map集合

1
2
java复制代码HashMap<String, String> hashMap = new HashMap<>();
redisTemplate.boundHashOps("HashKey").putAll(hashMap);

4.4 提取Hash中的小key

1
2
3
4
5
6
7
8
java复制代码//1、通过redisTemplate获取值
Set keys1 = redisTemplate.boundHashOps("HashKey").keys();
//2、通过BoundValueOperations获取值
BoundHashOperations hashKey = redisTemplate.boundHashOps("HashKey");
Set keys2 = hashKey.keys();
//3、通过ValueOperations获取值
HashOperations hashOps = redisTemplate.opsForHash();
Set keys3 = hashOps.keys("HashKey");

4.5 提取所有的value值

1
2
3
4
5
6
7
8
java复制代码//1、通过redisTemplate获取值
List values1 = redisTemplate.boundHashOps("HashKey").values();
//2、通过BoundValueOperations获取值
BoundHashOperations hashKey = redisTemplate.boundHashOps("HashKey");
List values2 = hashKey.values();
//3、通过ValueOperations获取值
HashOperations hashOps = redisTemplate.opsForHash();
List values3 = hashOps.values("HashKey");

4.6 根据Hash的key提取value值

1
2
3
4
5
6
7
8
java复制代码//1、通过redisTemplate获取
String value1 = (String) redisTemplate.boundHashOps("HashKey").get("SmallKey");
//2、通过BoundValueOperations获取值
BoundHashOperations hashKey = redisTemplate.boundHashOps("HashKey");
String value2 = (String) hashKey.get("SmallKey");
//3、通过ValueOperations获取值
HashOperations hashOps = redisTemplate.opsForHash();
String value3 = (String) hashOps.get("HashKey", "SmallKey");

4.7 获取所有的键值对集合

1
2
3
4
5
6
7
8
java复制代码//1、通过redisTemplate获取
Map entries = redisTemplate.boundHashOps("HashKey").entries();
//2、通过BoundValueOperations获取值
BoundHashOperations hashKey = redisTemplate.boundHashOps("HashKey");
Map entries1 = hashKey.entries();
//3、通过ValueOperations获取值
HashOperations hashOps = redisTemplate.opsForHash();
Map entries2 = hashOps.entries("HashKey");

4.8 删除

1
2
3
4
java复制代码//删除小key
redisTemplate.boundHashOps("HashKey").delete("SmallKey");
//删除大key
redisTemplate.delete("HashKey");

4.9 判断Hash中是否含有该值

1
java复制代码Boolean isEmpty = redisTemplate.boundHashOps("HashKey").hasKey("SmallKey");

五、Set类型相关操作

5.1 添加

1
2
3
4
5
6
7
8
9
10
java复制代码//1、通过redisTemplate设置值
redisTemplate.boundSetOps("setKey").add("setValue1", "setValue2", "setValue3");

//2、通过BoundValueOperations设置值
BoundSetOperations setKey = redisTemplate.boundSetOps("setKey");
setKey.add("setValue1", "setValue2", "setValue3");

//3、通过ValueOperations设置值
SetOperations setOps = redisTemplate.opsForSet();
setOps.add("setKey", "SetValue1", "setValue2", "setValue3");

5.2 设置过期时间(单独设置)

1
2
java复制代码redisTemplate.boundValueOps("setKey").expire(1,TimeUnit.MINUTES);
redisTemplate.expire("setKey",1,TimeUnit.MINUTES);

5.3 根据key获取Set中的所有值

1
2
3
4
5
6
7
8
9
10
java复制代码//1、通过redisTemplate获取值
Set set1 = redisTemplate.boundSetOps("setKey").members();

//2、通过BoundValueOperations获取值
BoundSetOperations setKey = redisTemplate.boundSetOps("setKey");
Set set2 = setKey.members();

//3、通过ValueOperations获取值
SetOperations setOps = redisTemplate.opsForSet();
Set set3 = setOps.members("setKey");

5.4 查询value是否已存在

1
java复制代码Boolean isEmpty = redisTemplate.boundSetOps("setKey").isMember("setValue2");

5.5 获取Set的长度

1
java复制代码Long size = redisTemplate.boundSetOps("setKey").size();

5.6 移除指定的元素

1
java复制代码Long result1 = redisTemplate.boundSetOps("setKey").remove("setValue1");

5.7 移除指定的key

1
java复制代码Boolean result2 = redisTemplate.delete("setKey");

六、LIST类型相关操作

6.1 添加

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码//1、通过redisTemplate设置值
redisTemplate.boundListOps("listKey").leftPush("listLeftValue1");
redisTemplate.boundListOps("listKey").rightPush("listRightValue2");

//2、通过BoundValueOperations设置值
BoundListOperations listKey = redisTemplate.boundListOps("listKey");
listKey.leftPush("listLeftValue3");
listKey.rightPush("listRightValue4");

//3、通过ValueOperations设置值
ListOperations opsList = redisTemplate.opsForList();
opsList.leftPush("listKey", "listLeftValue5");
opsList.rightPush("listKey", "listRightValue6");

6.2 将Java的List集合放入List类型

1
2
3
java复制代码ArrayList<String> list = new ArrayList<>();
redisTemplate.boundListOps("listKey").rightPushAll(list);
redisTemplate.boundListOps("listKey").leftPushAll(list);

6.3 设置过期时间(单独设置)

1
2
java复制代码redisTemplate.boundValueOps("listKey").expire(1,TimeUnit.MINUTES);
redisTemplate.expire("listKey",1,TimeUnit.MINUTES);

6.4 获取List缓存全部内容(起始索引,结束索引)

1
java复制代码List listKey1 = redisTemplate.boundListOps("listKey").range(0, 10);

6.5 从左或从右弹出一个元素

1
2
java复制代码String listKey2 = (String) redisTemplate.boundListOps("listKey").leftPop();  //从左侧弹出一个元素
String listKey3 = (String) redisTemplate.boundListOps("listKey").rightPop(); //从右侧弹出一个元素

6.6 根据索引查询元素

1
java复制代码String listKey4 = (String) redisTemplate.boundListOps("listKey").index(1);

6.7 获取List缓存的长度

1
java复制代码Long size = redisTemplate.boundListOps("listKey").size();

6.8 根据索引修改List中的某条数据(key,索引,值)

1
java复制代码redisTemplate.boundListOps("listKey").set(3L,"listLeftValue3");

6.9 移除N个值为value(key,移除个数,值)

1
java复制代码redisTemplate.boundListOps("listKey").remove(3L,"value");

七、Zset类型的相关操作

7.1 向集合中插入元素,并设置分数

1
2
3
4
5
6
7
8
9
10
11
java复制代码//1、通过redisTemplate设置值
redisTemplate.boundZSetOps("zSetKey").add("zSetVaule", 100D);

//2、通过BoundValueOperations设置值
BoundZSetOperations zSetKey = redisTemplate.boundZSetOps("zSetKey");
zSetKey.add("zSetVaule", 100D);

//3、通过ValueOperations设置值
ZSetOperations zSetOps = redisTemplate.opsForZSet();
zSetOps.add("zSetKey", "zSetVaule", 100D);
12345678910

7.2 向集合中插入多个元素,并设置分数

1
2
3
java复制代码DefaultTypedTuple<String> p1 = new DefaultTypedTuple<>("zSetVaule1", 2.1D);
DefaultTypedTuple<String> p2 = new DefaultTypedTuple<>("zSetVaule2", 3.3D);
redisTemplate.boundZSetOps("zSetKey").add(new HashSet<>(Arrays.asList(p1,p2)));

7.3 按照排名先后(从小到大)打印指定区间内的元素, -1为打印全部

1
java复制代码Set<String> range = redisTemplate.boundZSetOps("zSetKey").range(key, 0, -1);

7.4 获得指定元素的分数

1
java复制代码Double score = redisTemplate.boundZSetOps("zSetKey").score("zSetVaule");

7.5 返回集合内的成员个数

1
java复制代码Long size = redisTemplate.boundZSetOps("zSetKey").size();

7.6 返回集合内指定分数范围的成员个数(Double类型)

1
java复制代码Long COUNT = redisTemplate.boundZSetOps("zSetKey").count(0D, 2.2D);

7.7 返回集合内元素在指定分数范围内的排名(从小到大)

1
java复制代码Set byScore = redisTemplate.boundZSetOps("zSetKey").rangeByScore(0D, 2.2D);

7.8 带偏移量和个数,(key,起始分数,最大分数,偏移量,个数)

1
java复制代码Set<String> ranking2 = redisTemplate.opsForZSet().rangeByScore("zSetKey", 0D, 2.2D 1, 3);

7.9 返回集合内元素的排名,以及分数(从小到大)

1
2
3
4
java复制代码Set<TypedTuple<String>> tuples = redisTemplate.boundZSetOps("zSetKey").rangeWithScores(0L, 3L);
for (TypedTuple<String> tuple : tuples) {
System.out.println(tuple.getValue() + " : " + tuple.getScore());
}ss

7.10 返回指定成员的排名

1
2
3
4
5
java复制代码//从小到大
Long startRank = redisTemplate.boundZSetOps("zSetKey").rank("zSetVaule");
//从大到小
Long endRank = redisTemplate.boundZSetOps("zSetKey").reverseRank("zSetVaule");
1234

7.11 从集合中删除指定元素

1
java复制代码redisTemplate.boundZSetOps("zSetKey").remove("zSetVaule");

7.12 删除指定索引范围的元素(Long类型)

1
java复制代码redisTemplate.boundZSetOps("zSetKey").removeRange(0L,3L);

7.13 删除指定分数范围内的元素(Double类型)

1
java复制代码redisTemplate.boundZSetOps("zSetKey").removeRangeByScorssse(0D,2.2D);

7.14 为指定元素加分(Double类型)

1
java复制代码Double score = redisTemplate.boundZSetOps("zSetKey").incrementScore("zSetVaule",1.1D);

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…594595596…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%