开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

《浅入浅出》-RocketMQ

发表于 2019-12-02

你知道的越多,你不知道的越多

点赞再看,养成习惯

本文GitHub github.com/JavaFamily 已收录,有一线大厂面试点脑图、个人联系方式和技术交流群,欢迎Star和指教

捞一下

消息队列系列前面两章分别讲了消息队列的基础知识,还有比较常见的问题和常见分布式事务解决方案,那么在实际开发过程中,我们使用频率比较高的消息队列中间件有哪些呢?

帅丙我工作以来接触的消息队列中间件有RocketMQ、Kafka、自研,是的因为我主要接触的都是电商公司,相对而言业务体量还有场景来说都是他们比较适合,再加上杭州阿里系公司偏多,身边同事或者公司老大基本都是阿里出来创业的,那在使用技术栈的时候阿里系的开源框架也就成了首选。

就算是自研的中间件多多少少也是借鉴RocketMQ、Kafka的优点自研的,那我后面两章就分别简单的介绍下两者,他们分别在业务场景和大数据领域各自发光发热。

那到底是道德的沦丧,还是人性的泯灭,让我们跟着敖丙走进RocketMQ的内心世界。

正文

RocketMQ简介

RocketMQ是一个纯Java、分布式、队列模型的开源消息中间件,前身是MetaQ,是阿里参考Kafka特点研发的一个队列模型的消息中间件,后开源给apache基金会成为了apache的顶级开源项目,具有高性能、高可靠、高实时、分布式特点。

我们再看下阿里给他取的名字哈:Rocket 火箭 阿里这是希望他上天呀,不过我觉得这个名字确实挺酷的。

我们先看看他最新的官网

回顾一下他的心路历程

2007年:淘宝实施了“五彩石”项目,“五彩石”用于将交易系统从单机变成分布式,也是在这个过程中产生了阿里巴巴第一代消息引擎——Notify。

2010年:阿里巴巴B2B部门基于ActiveMQ的5.1版本也开发了自己的一款消息引擎,称为Napoli,这款消息引擎在B2B里面广泛地被使用,不仅仅是在交易领域,在很多的后台异步解耦等方面也得到了广泛的应用。

2011年:业界出现了现在被很多大数据领域所推崇的Kafka消息引擎,阿里巴巴在研究了Kafka的整体机制和架构设计之后,基于Kafka的设计使用Java进行了完全重写并推出了MetaQ 1.0版本,主要是用于解决顺序消息和海量堆积的问题。

2012年:阿里巴巴开源其自研的第三代分布式消息中间件——RocketMQ。

经过几年的技术打磨,阿里称基于RocketMQ技术,目前双十一当天消息容量可达到万亿级。

2016年11月:阿里将RocketMQ捐献给Apache软件基金会,正式成为孵化项目。

阿里称会将其打造成顶级项目。这是阿里迈出的一大步,因为加入到开源软件基金会需要经过评审方的考核与观察。

坦率而言,业界还对国人的代码开源参与度仍保持着刻板印象;而Apache基金会中的342个项目中,暂时还只有Kylin、CarbonData、Eagle 、Dubbo和 RocketMQ 共计五个中国技术人主导的项目。

2017年2月20日:RocketMQ正式发布4.0版本,专家称新版本适用于电商领域,金融领域,大数据领域,兼有物联网领域的编程模型。

以上就是RocketMQ的整体发展历史,其实在阿里巴巴内部围绕着RocketMQ内核打造了三款产品,分别是MetaQ、Notify和Aliware MQ。

这三者分别采用了不同的模型,MetaQ主要使用了拉模型,解决了顺序消息和海量堆积问题;Notify主要使用了推模型,解决了事务消息;而云产品Aliware MQ则是提供了商业化的版本。

经历多次双11洗礼的英雄

在备战2016年双十一时,RocketMq团队重点做了两件事情,优化慢请求与统一存储引擎。

  • 优化慢请求:这里主要是解决在海量高并发场景下降低慢请求对整个集群带来的抖动,毛刺问题。这是一个极具挑战的技术活,团队同学经过长达1个多月的跟进调优,从双十一的复盘情况来看,99.996%的延迟落在了10ms以内,而99.6%的延迟在1ms以内。优化主要集中在RocketMQ存储层算法优化、JVM与操作系统调优。更多的细节大家可以参考《万亿级数据洪峰下的分布式消息引擎》。
  • 统一存储引擎:主要解决的消息引擎的高可用,成本问题。在多代消息引擎共存的前提下,我们对Notify的存储模块进行了全面移植与替换。

RocketMQ天生为金融互联网领域而生,追求高可靠、高可用、高并发、低延迟,是一个阿里巴巴由内而外成功孕育的典范,除了阿里集团上千个应用外,根据我们不完全统计,国内至少有上百家单位、科研教育机构在使用。

RocketMQ在阿里集团也被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog分发等场景。

他所拥有的功能

我们直接去GitHub上看Apache对他的描述可能会好点

是的功能完整到爆炸基本上开发完全够用,什么?看不懂专业词汇的英文?

帅丙是暖男来的嘛,中文功能如下 ↓

  • 发布/订阅消息传递模型
  • 财务级交易消息
  • 各种跨语言客户端,例如Java,C / C ++,Python,Go
  • 可插拔的传输协议,例如TCP,SSL,AIO
  • 内置的消息跟踪功能,还支持开放式跟踪
  • 多功能的大数据和流生态系统集成
  • 按时间或偏移量追溯消息
  • 可靠的FIFO和严格的有序消息传递在同一队列中
  • 高效的推拉消费模型
  • 单个队列中的百万级消息累积容量
  • 多种消息传递协议,例如JMS和OpenMessaging
  • 灵活的分布式横向扩展部署架构
  • 快如闪电的批量消息交换系统
  • 各种消息过滤器机制,例如SQL和Tag
  • 用于隔离测试和云隔离群集的Docker映像
  • 功能丰富的管理仪表板,用于配置,指标和监视
  • 认证与授权

他的项目结构组成是怎么样子的?

GitHub地址:https://github.com/apache/rocketmq

他的核心模块:

  • rocketmq-broker:接受生产者发来的消息并存储(通过调用rocketmq-store),消费者从这里取得消息
  • rocketmq-client:提供发送、接受消息的客户端API。
  • rocketmq-namesrv:NameServer,类似于Zookeeper,这里保存着消息的TopicName,队列等运行时的元信息。
  • rocketmq-common:通用的一些类,方法,数据结构等。
  • rocketmq-remoting:基于Netty4的client/server + fastjson序列化 + 自定义二进制协议。
  • rocketmq-store:消息、索引存储等。
  • rocketmq-filtersrv:消息过滤器Server,需要注意的是,要实现这种过滤,需要上传代码到MQ!(一般而言,我们利用Tag足以满足大部分的过滤需求,如果更灵活更复杂的过滤需求,可以考虑filtersrv组件)。
  • rocketmq-tools:命令行工具。

他的架构组成,或者理解为为什么他这么快?这么强?这么厉害?

他主要有四大核心组成部分:NameServer、Broker、Producer以及Consumer四部分。

Tip:我们可以看到RocketMQ啥都是集群部署的,这是他吞吐量大,高可用的原因之一,集群的模式也很花哨,可以支持多master 模式、多master多slave异步复制模式、多 master多slave同步双写模式。

而且这个模式好像Kafka啊!(我这里是废话,本身就是阿里基于Kafka的很多特性研发的)。

分别介绍下各个集群组成部分吧

NameServer:

主要负责对于源数据的管理,包括了对于Topic和路由信息的管理。

NameServer是一个功能齐全的服务器,其角色类似Dubbo中的Zookeeper,但NameServer与Zookeeper相比更轻量。主要是因为每个NameServer节点互相之间是独立的,没有任何信息交互。

NameServer压力不会太大,平时主要开销是在维持心跳和提供Topic-Broker的关系数据。

但有一点需要注意,Broker向NameServer发心跳时, 会带上当前自己所负责的所有Topic信息,如果Topic个数太多(万级别),会导致一次心跳中,就Topic的数据就几十M,网络情况差的话, 网络传输失败,心跳失败,导致NameServer误认为Broker心跳失败。

NameServer 被设计成几乎无状态的,可以横向扩展,节点之间相互之间无通信,通过部署多台机器来标记自己是一个伪集群。

每个 Broker 在启动的时候会到 NameServer 注册,Producer 在发送消息前会根据 Topic 到 NameServer 获取到 Broker 的路由信息,Consumer 也会定时获取 Topic 的路由信息。

所以从功能上看NameServer应该是和 ZooKeeper 差不多,据说 RocketMQ 的早期版本确实是使用的 ZooKeeper ,后来改为了自己实现的 NameServer 。

我们看一下Dubbo中注册中心的角色,是不是真的一毛一样,师出同门相似点真的很多:

Producer

消息生产者,负责产生消息,一般由业务系统负责产生消息。

  • Producer由用户进行分布式部署,消息由Producer通过多种负载均衡模式发送到Broker集群,发送低延时,支持快速失败。
  • RocketMQ 提供了三种方式发送消息:同步、异步和单向
  • 同步发送:同步发送指消息发送方发出数据后会在收到接收方发回响应之后才发下一个数据包。一般用于重要通知消息,例如重要通知邮件、营销短信。
  • 异步发送:异步发送指发送方发出数据后,不等接收方发回响应,接着发送下个数据包,一般用于可能链路耗时较长而对响应时间敏感的业务场景,例如用户视频上传后通知启动转码服务。
  • 单向发送:单向发送是指只负责发送消息而不等待服务器回应且没有回调函数触发,适用于某些耗时非常短但对可靠性要求并不高的场景,例如日志收集。

Broker

消息中转角色,负责存储消息,转发消息。

  • Broker是具体提供业务的服务器,单个Broker节点与所有的NameServer节点保持长连接及心跳,并会定时将Topic信息注册到NameServer,顺带一提底层的通信和连接都是基于Netty实现的。
  • Broker负责消息存储,以Topic为纬度支持轻量级的队列,单机可以支撑上万队列规模,支持消息推拉模型。
  • 官网上有数据显示:具有上亿级消息堆积能力,同时可严格保证消息的有序性。

Consumer

消息消费者,负责消费消息,一般是后台系统负责异步消费。

  • Consumer也由用户部署,支持PUSH和PULL两种消费模式,支持集群消费和广播消息,提供实时的消息订阅机制。
  • Pull:拉取型消费者(Pull Consumer)主动从消息服务器拉取信息,只要批量拉取到消息,用户应用就会启动消费过程,所以 Pull 称为主动消费型。
  • Push:推送型消费者(Push Consumer)封装了消息的拉取、消费进度和其他的内部维护工作,将消息到达时执行的回调接口留给用户应用程序来实现。所以 Push 称为被动消费类型,但从实现上看还是从消息服务器中拉取消息,不同于 Pull 的是 Push 首先要注册消费监听器,当监听器处触发后才开始消费消息。

消息领域模型

Message

Message(消息)就是要传输的信息。

一条消息必须有一个主题(Topic),主题可以看做是你的信件要邮寄的地址。

一条消息也可以拥有一个可选的标签(Tag)和额处的键值对,它们可以用于设置一个业务 Key 并在 Broker 上查找此消息以便在开发期间查找问题。

Topic

Topic(主题)可以看做消息的规类,它是消息的第一级类型。比如一个电商系统可以分为:交易消息、物流消息等,一条消息必须有一个 Topic 。

Topic 与生产者和消费者的关系非常松散,一个 Topic 可以有0个、1个、多个生产者向其发送消息,一个生产者也可以同时向不同的 Topic 发送消息。

一个 Topic 也可以被 0个、1个、多个消费者订阅。

Tag

Tag(标签)可以看作子主题,它是消息的第二级类型,用于为用户提供额外的灵活性。使用标签,同一业务模块不同目的的消息就可以用相同 Topic 而不同的 Tag 来标识。比如交易消息又可以分为:交易创建消息、交易完成消息等,一条消息可以没有 Tag 。

标签有助于保持您的代码干净和连贯,并且还可以为 RocketMQ 提供的查询系统提供帮助。

Group

分组,一个组可以订阅多个Topic。

分为ProducerGroup,ConsumerGroup,代表某一类的生产者和消费者,一般来说同一个服务可以作为Group,同一个Group一般来说发送和消费的消息都是一样的

Queue

在Kafka中叫Partition,每个Queue内部是有序的,在RocketMQ中分为读和写两种队列,一般来说读写队列数量一致,如果不一致就会出现很多问题。

Message Queue

Message Queue(消息队列),主题被划分为一个或多个子主题,即消息队列。

一个 Topic 下可以设置多个消息队列,发送消息时执行该消息的 Topic ,RocketMQ 会轮询该 Topic 下的所有队列将消息发出去。

消息的物理管理单位。一个Topic下可以有多个Queue,Queue的引入使得消息的存储可以分布式集群化,具有了水平扩展能力。

Offset

在RocketMQ 中,所有消息队列都是持久化,长度无限的数据结构,所谓长度无限是指队列中的每个存储单元都是定长,访问其中的存储单元使用Offset 来访问,Offset 为 java long 类型,64 位,理论上在 100年内不会溢出,所以认为是长度无限。

也可以认为 Message Queue 是一个长度无限的数组,Offset 就是下标。

消息消费模式

消息消费模式有两种:Clustering(集群消费)和Broadcasting(广播消费)。

默认情况下就是集群消费,该模式下一个消费者集群共同消费一个主题的多个队列,一个队列只会被一个消费者消费,如果某个消费者挂掉,分组内其它消费者会接替挂掉的消费者继续消费。

而广播消费消息会发给消费者组中的每一个消费者进行消费。

Message Order

Message Order(消息顺序)有两种:Orderly(顺序消费)和Concurrently(并行消费)。

顺序消费表示消息消费的顺序同生产者为每个消息队列发送的顺序一致,所以如果正在处理全局顺序是强制性的场景,需要确保使用的主题只有一个消息队列。

并行消费不再保证消息顺序,消费的最大并行数量受每个消费者客户端指定的线程池限制。

一次完整的通信流程是怎样的?

Producer 与 NameServer集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Broker Master 建立长连接,且定时向 Broker 发送心跳。

Producer 只能将消息发送到 Broker master,但是 Consumer 则不一样,它同时和提供 Topic 服务的 Master 和 Slave建立长连接,既可以从 Broker Master 订阅消息,也可以从 Broker Slave 订阅消息。

具体如下图:

我上面说过他跟Dubbo像不是我瞎说的,就连他的注册过程都很像Dubbo的服务暴露过程。

是不是觉得很简单,但是你同时也产生了好奇心,每一步是怎么初始化启动的呢?

帅丙呀就知道大家都是求知欲极强的人才,这不我都准备好了,我们一步步分析一下。

主要是人才群里的仔要求我写出来。。。(文末有进群方式)

NameService启动流程

在org.apache.rocketmq.namesrv目录下的NamesrvStartup这个启动类基本上描述了他的启动过程我们可以看一下代码:

  • 第一步是初始化配置
  • 创建NamesrvController实例,并开启两个定时任务:
  • 每隔10s扫描一次Broker,移除处于不激活的Broker;
  • 每隔10s打印一次KV配置。

  • 第三步注册钩子函数,启动服务器并监听Broker。

NameService还有很多东西的哈我这里就介绍他的启动流程,大家还可以去看看代码,还是很有意思的,比如路由注册会发送心跳包,还有心跳包的处理流程,路由删除,路由发现等等。

Tip:本来我想贴很多源码的,后面跟歪歪(Java3y)讨论了很久做出了不贴的决定,大家理解过程为主!我主要是做只是扫盲还有一些痛点分析嘛,深究还是得大家花时间,我要啥都介绍篇幅就不够了。

Producer

链路很长涉及的细节也多,我就发一下链路图。

Producer是消息发送方,那他怎么发送的呢?

通过轮训,Producer轮训某个Topic下面的所有队列实现发送方的负载均衡

Broker

Broker在RocketMQ中是进行处理Producer发送消息请求,Consumer消费消息的请求,并且进行消息的持久化,以及HA策略和服务端过滤,就是集群中很重的工作都是交给了Broker进行处理。

Broker模块是通过BrokerStartup进行启动的,会实例化BrokerController,并且调用其初始化方法

大家去看Broker的源码的话会发现,他的初始化流程很冗长,会根据配置创建很多线程池主要用来发送消息、拉取消息、查询消息、客户端管理和消费者管理,也有很多定时任务,同时也注册了很多请求处理器,用来发送拉取消息查询消息的。

Consumer

不说了直接怼图吧!要死了,下次我还是做扫盲,写点爽文吧555

Consumer是消息接受,那他怎么接收消息的呢?

消费端会通过RebalanceService线程,10秒钟做一次基于Topic下的所有队列负载。

面试常见问题分析

他的优缺点是啥

RocketMQ优点:

  • 单机吞吐量:十万级
  • 可用性:非常高,分布式架构
  • 消息可靠性:经过参数优化配置,消息可以做到0丢失
  • 功能支持:MQ功能较为完善,还是分布式的,扩展性好
  • 支持10亿级别的消息堆积,不会因为堆积导致性能下降
  • 源码是java,我们可以自己阅读源码,定制自己公司的MQ,可以掌控
  • 天生为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削峰,在大量交易涌入时,后端可能无法及时处理的情况
  • RoketMQ在稳定性上可能更值得信赖,这些业务场景在阿里双11已经经历了多次考验,如果你的业务有上述并发场景,建议可以选择RocketMQ

RocketMQ缺点:

  • 支持的客户端语言不多,目前是java及c++,其中c++不成熟
  • 社区活跃度不是特别活跃那种
  • 没有在 mq 核心中去实现JMS等接口,有些系统要迁移需要修改大量代码

消息去重

去重原则:使用业务端逻辑保持幂等性

幂等性:就是用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用,数据库的结果都是唯一的,不可变的。

只要保持幂等性,不管来多少条重复消息,最后处理的结果都一样,需要业务端来实现。

去重策略:保证每条消息都有唯一编号(比如唯一流水号),且保证消息处理成功与去重表的日志同时出现。

建立一个消息表,拿到这个消息做数据库的insert操作。给这个消息做一个唯一主键(primary key)或者唯一约束,那么就算出现重复消费的情况,就会导致主键冲突,那么就不再处理这条消息。

消息重复

消息领域有一个对消息投递的QoS定义,分为:

  • 最多一次(At most once)
  • 至少一次(At least once)
  • 仅一次( Exactly once)

QoS:Quality of Service,服务质量

几乎所有的MQ产品都声称自己做到了At least once。

既然是至少一次,那避免不了消息重复,尤其是在分布式网络环境下。

比如:网络原因闪断,ACK返回失败等等故障,确认信息没有传送到消息队列,导致消息队列不知道自己已经消费过该消息了,再次将该消息分发给其他的消费者。

不同的消息队列发送的确认信息形式不同,例如RabbitMQ是发送一个ACK确认消息,RocketMQ是返回一个CONSUME_SUCCESS成功标志,Kafka实际上有个offset的概念。

RocketMQ没有内置消息去重的解决方案,最新版本是否支持还需确认。

消息的可用性

当我们选择好了集群模式之后,那么我们需要关心的就是怎么去存储和复制这个数据,RocketMQ对消息的刷盘提供了同步和异步的策略来满足我们的,当我们选择同步刷盘之后,如果刷盘超时会给返回FLUSH_DISK_TIMEOUT,如果是异步刷盘不会返回刷盘相关信息,选择同步刷盘可以尽最大程度满足我们的消息不会丢失。

除了存储有选择之后,我们的主从同步提供了同步和异步两种模式来进行复制,当然选择同步可以提升可用性,但是消息的发送RT时间会下降10%左右。

RocketMQ采用的是混合型的存储结构,即为Broker单个实例下所有的队列共用一个日志数据文件(即为CommitLog)来存储。

而Kafka采用的是独立型的存储结构,每个队列一个文件。

这里帅丙认为,RocketMQ采用混合型存储结构的缺点在于,会存在较多的随机读操作,因此读的效率偏低。同时消费消息需要依赖ConsumeQueue,构建该逻辑消费队列需要一定开销。

RocketMQ 刷盘实现

Broker 在消息的存取时直接操作的是内存(内存映射文件),这可以提供系统的吞吐量,但是无法避免机器掉电时数据丢失,所以需要持久化到磁盘中。

刷盘的最终实现都是使用NIO中的 MappedByteBuffer.force() 将映射区的数据写入到磁盘,如果是同步刷盘的话,在Broker把消息写到CommitLog映射区后,就会等待写入完成。

异步而言,只是唤醒对应的线程,不保证执行的时机,流程如图所示。

顺序消息:

我简单的说一下我们使用的RocketMQ里面的一个简单实现吧。

Tip:为啥用RocketMQ举例呢,这玩意是阿里开源的,我问了下身边的朋友很多公司都有使用,所以读者大概率是这个的话我就用这个举例吧,具体的细节我后面会在RocketMQ和Kafka各自章节说到。

生产者消费者一般需要保证顺序消息的话,可能就是一个业务场景下的,比如订单的创建、支付、发货、收货。

那这些东西是不是一个订单号呢?一个订单的肯定是一个订单号的说,那简单了呀。

一个topic下有多个队列,为了保证发送有序,RocketMQ提供了MessageQueueSelector队列选择机制,他有三种实现:

我们可使用Hash取模法,让同一个订单发送到同一个队列中,再使用同步发送,只有同个订单的创建消息发送成功,再发送支付消息。这样,我们保证了发送有序。

RocketMQ的topic内的队列机制,可以保证存储满足FIFO(First Input First Output 简单说就是指先进先出),剩下的只需要消费者顺序消费即可。

RocketMQ仅保证顺序发送,顺序消费由消费者业务保证!!!

这里很好理解,一个订单你发送的时候放到一个队列里面去,你同一个的订单号Hash一下是不是还是一样的结果,那肯定是一个消费者消费,那顺序是不是就保证了?

真正的顺序消费不同的中间件都有自己的不同实现我这里就举个例子,大家思路理解下。

分布式事务:

Half Message(半消息)

是指暂不能被Consumer消费的消息。Producer 已经把消息成功发送到了 Broker 端,但此消息被标记为暂不能投递状态,处于该种状态下的消息称为半消息。需要 Producer

对消息的二次确认后,Consumer才能去消费它。

消息回查

由于网络闪段,生产者应用重启等原因。导致 Producer 端一直没有对 Half Message(半消息) 进行 二次确认。这是Brock服务器会定时扫描长期处于半消息的消息,会

主动询问 Producer端 该消息的最终状态(Commit或者Rollback),该消息即为 消息回查。

  1. A服务先发送个Half Message给Brock端,消息中携带 B服务 即将要+100元的信息。
  2. 当A服务知道Half Message发送成功后,那么开始第3步执行本地事务。
  3. 执行本地事务(会有三种情况1、执行成功。2、执行失败。3、网络等原因导致没有响应)
  4. 如果本地事务成功,那么Product像Brock服务器发送Commit,这样B服务就可以消费该message。
  5. 如果本地事务失败,那么Product像Brock服务器发送Rollback,那么就会直接删除上面这条半消息。
  6. 如果因为网络等原因迟迟没有返回失败还是成功,那么会执行RocketMQ的回调接口,来进行事务的回查。

消息过滤

  • Broker端消息过滤  
    在Broker中,按照Consumer的要求做过滤,优点是减少了对于Consumer无用消息的网络传输。缺点是增加了Broker的负担,实现相对复杂。
  • Consumer端消息过滤
    这种过滤方式可由应用完全自定义实现,但是缺点是很多无用的消息要传输到Consumer端。

Broker的Buffer问题

Broker的Buffer通常指的是Broker中一个队列的内存Buffer大小,这类Buffer通常大小有限。

另外,RocketMQ没有内存Buffer概念,RocketMQ的队列都是持久化磁盘,数据定期清除。

RocketMQ同其他MQ有非常显著的区别,RocketMQ的内存Buffer抽象成一个无限长度的队列,不管有多少数据进来都能装得下,这个无限是有前提的,Broker会定期删除过期的数据。

例如Broker只保存3天的消息,那么这个Buffer虽然长度无限,但是3天前的数据会被从队尾删除。

回溯消费

回溯消费是指Consumer已经消费成功的消息,由于业务上的需求需要重新消费,要支持此功能,Broker在向Consumer投递成功消息后,消息仍然需要保留。并且重新消费一般是按照时间维度。

例如由于Consumer系统故障,恢复后需要重新消费1小时前的数据,那么Broker要提供一种机制,可以按照时间维度来回退消费进度。

RocketMQ支持按照时间回溯消费,时间维度精确到毫秒,可以向前回溯,也可以向后回溯。

消息堆积

消息中间件的主要功能是异步解耦,还有个重要功能是挡住前端的数据洪峰,保证后端系统的稳定性,这就要求消息中间件具有一定的消息堆积能力,消息堆积分以下两种情况:

  • 消息堆积在内存Buffer,一旦超过内存Buffer,可以根据一定的丢弃策略来丢弃消息,如CORBA Notification规范中描述。适合能容忍丢弃消息的业务,这种情况消息的堆积能力主要在于内存Buffer大小,而且消息堆积后,性能下降不会太大,因为内存中数据多少对于对外提供的访问能力影响有限。
  • 消息堆积到持久化存储系统中,例如DB,KV存储,文件记录形式。 当消息不能在内存Cache命中时,要不可避免的访问磁盘,会产生大量读IO,读IO的吞吐量直接决定了消息堆积后的访问能力。
  • 评估消息堆积能力主要有以下四点:
  • 消息能堆积多少条,多少字节?即消息的堆积容量。
  • 消息堆积后,发消息的吞吐量大小,是否会受堆积影响?
  • 消息堆积后,正常消费的Consumer是否会受影响?
  • 消息堆积后,访问堆积在磁盘的消息时,吞吐量有多大?

定时消息

定时消息是指消息发到Broker后,不能立刻被Consumer消费,要到特定的时间点或者等待特定的时间后才能被消费。

如果要支持任意的时间精度,在Broker层面,必须要做消息排序,如果再涉及到持久化,那么消息排序要不可避免的产生巨大性能开销。

RocketMQ支持定时消息,但是不支持任意时间精度,支持特定的level,例如定时5s,10s,1m等。

总结

写这种单纯介绍中间件的枯燥乏味,大家看起来估计也累,目前已经破一万个字了,以后我这种类型的少写,大家老是让我写点深度的,我说真的很多东西我源码一贴,看都没人看。

Kafka我就不发博客了,大家可以去GItHub上第一时间阅读,后面会出怎么搭建项目在服务器的教程,还有一些大牛个人经历和个人书单的东西,今年应该先这么写,主要是真心太忙了,望理解。

絮叨

我也不过多描述了,反正嘛网络上重拳出击嘛,现实中唯唯诺诺,让他说理由也说不出来,不回我。

他说的是下面这个场景多线程的情况,就是第一个线程还没走完,第二个现在进来,也判断没处理过那不就两个都继续加了么?

订单号+业务场景,组成一个唯一主键,你插入数据库只能成功第一个,后续的都会报错的,报违反唯一主键的错误。

还有就是有人疑惑为啥不直接就不判断就等他插入的时候报错,丢掉后续的就好了?

你要知道报错有很多种,你哪里知道不是数据库挂了的错?或者别的运行时异常?

不过你如果可以做到抛特定的异常也可以,反正我们要减少数据库的报错,如果并发大,像我现在负责的系统都是10W+QPS,那日志会打满疯狂报警的。(就是正常情况我们都经常报警)

解决问题的思路有很多,喷我可以,讲清楚问题,讲清楚你的理由。

很多大家都只是单方面的知识摄入,就这样还要喷我,还有一上来就问我为啥今天没发文章,我欠你的?我工作日上班,周六周日都怼上去了,时间有限啊,哥哥。

大家都有自己的事情,写文章也耗时耗脑,难免出错,还望理解。

点关注,不迷路

好了各位,以上就是这篇文章的全部内容了,能看到这里的人呀,都是人才。

我后面会每周都更新几篇一线互联网大厂面试和常用技术栈相关的文章,非常感谢人才们能看到这里,如果这个文章写得还不错,觉得「敖丙」我有点东西的话 求点赞👍 求关注❤️ 求分享👥 对暖男我来说真的 非常有用!!!

白嫖不好,创作不易,各位的支持和认可,就是我创作的最大动力,我们下篇文章见!

敖丙 | 文 【原创】

如果本篇博客有任何错误,请批评指教,不胜感激 !


文章每周持续更新,可以微信搜索「 三太子敖丙 」第一时间阅读和催更(比博客早一到两篇哟),本文 GitHub github.com/JavaFamily 已经收录,有一线大厂面试点思维导图,也整理了很多我的文档,欢迎Star和完善,大家面试可以参照考点复习,希望我们一起有点东西。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

图解redis五种数据结构底层实现(动图哦) 动态字符串SD

发表于 2019-12-02

redis有五种基本数据结构:字符串、hash、set、zset、list。但是你知道构成这五种结构的底层数据结构是怎样的吗?
今天我们来花费五分钟的时间了解一下。
(目前redis版本为3.0.6)

动态字符串SDS

SDS是”simple dynamic string”的缩写。
redis中所有场景中出现的字符串,基本都是由SDS来实现的

  • 所有非数字的key。例如set msg "hello world" 中的key msg.
  • 字符串数据类型的值。例如`` set msg “hello world”中的msg的值”hello wolrd”
  • 非字符串数据类型中的“字符串值”。例如RPUSH fruits "apple" "banana" "cherry"中的”apple” “banana” “cherry”

SDS长这样:

free:还剩多少空间
len:字符串长度
buf:存放的字符数组

空间预分配

为减少修改字符串带来的内存重分配次数,sds采用了“一次管够”的策略:

  • 若修改之后sds长度小于1MB,则多分配现有len长度的空间
  • 若修改之后sds长度大于等于1MB,则扩充除了满足修改之后的长度外,额外多1MB空间

惰性空间释放

为避免缩短字符串时候的内存重分配操作,sds在数据减少时,并不立刻释放空间。

int

就是redis中存放的各种数字
包括一下这种,故意加引号“”的

双向链表

长这样:

分两部分,一部分是“统筹部分”:橘黄色,一部分是“具体实施方“:蓝色。

主体”统筹部分“:

  • head指向具体双向链表的头
  • tail指向具体双向链表的尾
  • len双向链表的长度

具体”实施方”:一目了然的双向链表结构,有前驱pre有后继next

由list和listNode两个数据结构构成。

ziplist

压缩列表。
redis的列表键和哈希键的底层实现之一。此数据结构是为了节约内存而开发的。和各种语言的数组类似,它是由连续的内存块组成的,这样一来,由于内存是连续的,就减少了很多内存碎片和指针的内存占用,进而节约了内存。

然后文中的entry的结构是这样的:

元素的遍历

先找到列表尾部元素:

然后再根据ziplist节点元素中的previous_entry_length属性,来逐个遍历:

连锁更新

再次看看entry元素的结构,有一个previous_entry_length字段,他的长度要么都是1个字节,要么都是5个字节:

  • 前一节点的长度小于254字节,则previous_entry_length长度为1字节
  • 前一节点的长度大于254字节,则previous_entry_length长度为5字节

假设现在存在一组压缩列表,长度都在250字节至253字节之间,突然新增一新节点new,
长度大于等于254字节,会出现:

程序需要不断的对压缩列表进行空间重分配工作,直到结束。

除了增加操作,删除操作也有可能带来“连锁更新”。
请看下图,ziplist中所有entry节点的长度都在250字节至253字节之间,big节点长度大于254字节,small节点小于254字节。

哈希表

哈希表略微有点复杂。哈希表的制作方法一般有两种,一种是:开放寻址法,一种是拉链法。redis的哈希表的制作使用的是拉链法。

整体结构如下图:

也是分为两部分:左边橘黄色部分和右边蓝色部分,同样,也是”统筹“和”实施“的关系。
具体哈希表的实现,都是在蓝色部分实现的。
先来看看蓝色部分:

这也分为左右两边“统筹”和“实施”的两部分。

右边部分很容易理解:就是通常拉链表实现的哈希表的样式;数组就是bucket,一般不同的key首先会定位到不同的bucket,若key重复,就用链表把冲突的key串起来。

新建key的过程:

假如重复了:

rehash

再来看看哈希表总体图中左边橘黄色的“统筹”部分,其中有两个关键的属性:ht和rehashidx。
ht是一个数组,有且只有俩元素ht[0]和ht[1];其中,ht[0]存放的是redis中使用的哈希表,而ht[1]和rehashidx和哈希表的rehash有关。

rehash指的是重新计算键的哈希值和索引值,然后将键值对重排的过程。

加载因子(load factor) = ht[0].used / ht[0].size。

扩容和收缩标准

扩容:

  • 没有执行BGSAVE和BGREWRITEAOF指令的情况下,哈希表的加载因子大于等于1。
  • 正在执行BGSAVE和BGREWRITEAOF指令的情况下,哈希表的加载因子大于等于5。

收缩:

  • 加载因子小于0.1时,程序自动开始对哈希表进行收缩操作。

扩容和收缩的数量

扩容:

  • 第一个大于等于ht[0].used * 2的2^n(2的n次方幂)。

收缩:

  • 第一个大于等于ht[0].used的2^n(2的n次方幂)。

(以下部分属于细节分析,可以跳过直接看扩容步骤)
对于收缩,我当时陷入了疑虑:收缩标准是加载因子小于0.1的时候,也就是说假如哈希表中有4个元素的话,哈希表的长度只要大于40,就会进行收缩,假如有一个长度大于40,但是存在的元素为4即(ht[0].used为4)的哈希表,进行收缩,那收缩后的值为多少?

我想了一下:按照前文所讲的内容,应该是4。
但是,假如是4,存在和收缩后的长度相等,是不是又该扩容?
翻开源码看看:

收缩具体函数:

1
2
3
4
5
6
7
8
9
10
11
12
复制代码int dictResize(dict *d)     //缩小字典d
{
int minimal;

//如果dict_can_resize被设置成0,表示不能进行rehash,或正在进行rehash,返回出错标志DICT_ERR
if (!dict_can_resize || dictIsRehashing(d)) return DICT_ERR;

minimal = d->ht[0].used; //获得已经有的节点数量作为最小限度minimal
if (minimal < DICT_HT_INITIAL_SIZE)//但是minimal不能小于最低值DICT_HT_INITIAL_SIZE(4)
minimal = DICT_HT_INITIAL_SIZE;
return dictExpand(d, minimal); //用minimal调整字典d的大小
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
复制代码int dictExpand(dict *d, unsigned long size)     //根据size调整或创建字典d的哈希表
{
dictht n;
unsigned long realsize = _dictNextPower(size); //获得一个最接近2^n的realsize

if (dictIsRehashing(d) || d->ht[0].used > size) //正在rehash或size不够大返回出错标志
return DICT_ERR;

if (realsize == d->ht[0].size) return DICT_ERR; //如果新的realsize和原本的size一样则返回出错标志
/* Allocate the new hash table and initialize all pointers to NULL */
//初始化新的哈希表的成员
n.size = realsize;
n.sizemask = realsize-1;
n.table = zcalloc(realsize*sizeof(dictEntry*));
n.used = 0;

/* Is this the first initialization? If so it's not really a rehashing
* we just set the first hash table so that it can accept keys. */
if (d->ht[0].table == NULL) { //如果ht[0]哈希表为空,则将新的哈希表n设置为ht[0]
d->ht[0] = n;
return DICT_OK;
}

d->ht[1] = n; //如果ht[0]非空,则需要rehash
d->rehashidx = 0; //设置rehash标志位为0,开始渐进式rehash(incremental rehashing)
return DICT_OK;
}
1
2
3
4
5
6
7
8
9
10
11
复制代码static unsigned long _dictNextPower(unsigned long size)
{
unsigned long i = DICT_HT_INITIAL_SIZE; //DICT_HT_INITIAL_SIZE 为 4

if (size >= LONG_MAX) return LONG_MAX + 1LU;
while(1) {
if (i >= size)
return i;
i *= 2;
}
}

由代码我们可以看到,假如收缩后长度为4,不仅不会收缩,甚至还会报错。(😝)

我们回过头来再看看设定:题目可能成立吗?
哈希表的扩容都是2倍增长的,最小是4,
4 ===》 8 ====》 16 =====》 32 ======》 64 ====》 128

也就是说:不存在长度为 40多的情况,只能是64。但是如果是64的话,64 X 0.1(收缩界限)= 6.4 ,也就是说在减少到6的时候,哈希表就会收缩,会缩小到多少呢?是8。此时,再继续减少到4,也不会再收缩了。所以,根本不存在一个长度大于40,但是存在的元素为4的哈希表的。

扩容步骤

收缩步骤

渐进式refresh

在”扩容步骤”和”收缩步骤” 两幅动图中每幅图的第四步骤“将ht[0]中的数据利用哈希函数重新计算,rehash到ht[1]”,并不是一步完成的,而是分成N多步,循序渐进的完成的。
因为hash中有可能存放几千万甚至上亿个key,毕竟Redis中每个hash中可以存2^32 - 1 键值对(40多亿),假如一次性将这些键值rehash的话,可能会导致服务器在一段时间内停止服务,毕竟哈希函数就得计算一阵子呢((#^.^#))。

哈希表的refresh是分多次、渐进式进行的。

渐进式refresh和下图中左边橘黄色的“统筹”部分中的rehashidx密切相关:

  • rehashidx 的数值就是现在rehash的元素位置
  • rehashidx 等于 -1 的时候说明没有在进行refresh

甚至在进行期间,每次对哈希表的增删改查操作,除了正常执行之外,还会顺带将ht[0]哈希表相关键值对rehash到ht[1]。

以扩容步骤为例:

intset

整数集合是集合键的底层实现方式之一。

跳表

跳表这种数据结构长这样:

redis中把跳表抽象成如下所示:

看这个图,左边“统筹”,右边实现。
统筹部分有以下几点说明:

  • header: 跳表表头
  • tail:跳表表尾
  • level:层数最大的那个节点的层数
  • length:跳表的长度

实现部分有以下几点说明:

  • 表头:是链表的哨兵节点,不记录主体数据。
  • 是个双向链表
  • 分值是有顺序的
  • o1、o2、o3是节点所保存的成员,是一个指针,可以指向一个SDS值。
  • 层级高度最高是32。没每次创建一个新的节点的时候,程序都会随机生成一个介于1和32之间的值作为level数组的大小,这个大小就是“高度”

redis五种数据结构的实现

redis对象

redis中并没有直接使用以上所说的各种数据结构来实现键值数据库,而是基于一种对象,对象底层再间接的引用上文所说的具体的数据结构。

结构如下图:

字符串

其中:embstr和raw都是由SDS动态字符串构成的。唯一区别是:raw是分配内存的时候,redisobject和 sds 各分配一块内存,而embstr是redisobject和在一块儿内存中。

列表

hash

set

zset

更多精彩内容,请关注我的微信公众号 互联网技术窝 或者加微信共同探讨交流:

参考文献

  • throwsnew.com/2017/09/12/…
  • 《redis设计与实现》

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

API接口开发(一):接口开发返回结果解决方案

发表于 2019-12-01

摘要

采用前后端分离的方式进行项目开发,那么前后端交互比较好的方式是采用HTTP+JSON。如何接口返回结果更加简洁,更加优雅,也更加合理,并且让前端开发人员看得明白,后端开发者也并不会因此而增加工作量呢?

正文开始

Hello,各位,好久不见了。一直在筹划个人网站2020版本改版的事情,所以,本篇文章,也是2019年最后一篇了,当然,也得花一些心思,争取把我想要说的话,都一一说出来,说明白,说透彻。

采用前后端分离的方式进行项目开发,那么前后端交互比较好的方式是采用HTTP+JSON。如何接口返回结果更加简洁,更加优雅,也更加合理,并且让前端开发人员看得明白,后端开发者也并不会因此而增加工作量呢?

为此,我写了一套关于API接口开发返回结果解决方案, api-result,已将其开源,并上传到中央仓库,欢迎各位批评和指正。

API讲解

实体类

提供了满足各场景使用的实体类,如下:

ResultModel

这个类是基础实体类,有如下属性:

success:返回结果标识,是一个布尔值,true / false(成功 / 失败)

message:描述信息,错误时,可以在这里填写错误的详细信息

data:数据,是一个泛型,可以是数组或者对象等等,成功并且需要返回数据时,才有该参数

ApiResultModel

结构关系如下:

1
2
复制代码 ResultModel
└── ApiResultModel

在这个类里面增加了 code 属性,也是一个泛型,你可以自定义你的返回码类型,可以是整数,或者字符串。

PageResultModel

结构关系如下:

1
2
复制代码 ResultModel
└── PageResultModel

这个类主要主要分页返回结果,所以,增加了以下属性:

total:数据总条数,Long型

size:每页条数,整数

pages:总页数,Long型

current:当前页,Long型

Helper工具类

帮助我们操作实体类,具体有哪些helper呢?如下:

ResultHelper

ResultHelper是与ResultModel对应的

success(String message)

成功,携带描述信息

success(String message, T data)

成功,携带描述信息和数据

error(String message)

错误,携带详细的描述信息

ApiResultHelper

ApiResultHelper是与ApiResultModel对应的

success(S code, String message)

成功,携带返回码和描述信息

success(S code, String message, T data)

成功,携带返回码、描述信息和数据

error(S code, String message)

错误,携带错误码和详细描述信息

PageResultHelper

PageResultHelper是与PageResultModel对应的

success(String message)

成功,携带描述信息

success(String message, T data)

成功,携带描述信息和数据

success(String message, T data, long total, int size, long pages, long current)

成功,携带描述信息、数据、总数、每页条数、总页数、当前页

error(String message)

错误,携带详细的描述信息

快速入门

我们为你提供了三个实体类,以满足不同场景,ResultModel适用于通常返回结果,ApiResultModel适用于接口开发返回结果,PageResultModel适用于分页返回结果。也分别为这三个实体类提供了各自的Helper,所以,你可以直接使用这些Helper进行返回。当然,我推荐的使用方式是,先为各Helper编写工具类,再通过工具类进行返回,这样可能更加合理定制自己的返回工具类。

利用Helper进行返回

首先我们来看一个简单的示例代码:

1
2
3
4
5
6
7
8
9
复制代码/**
* 添加方法示例
* @return {@link ResultModel}
*/
@ApiOperation(value = "添加方法示例")
@PostMapping
public ResultModel<?> add() {
return ResultHelper.success("添加成功");
}

返回结果:

1
2
3
4
复制代码{
"success": true,
"message": "添加成功"
}

注:这只是一个接口返回示例,而不是说添加接口应该这样写。

编写返回结果工具类

比如,我们可以写一个ResultUtils工具类来操作ResultHelper。如下示例:

1
2
3
4
5
6
7
复制代码/**
* 成功示例
* @return {@link ResultModel}
*/
public static ResultModel <?> success() {
return ResultHelper.success("Success");
}

使用返回结果工具类

我门就可以调用ResultUtils类里面的方法,如下示例:

1
2
3
4
5
6
7
8
9
复制代码/**
* 成功示例
* @return {@link ResultModel}
*/
@ApiOperation(value = "成功示例")
@DeleteMapping
public ResultModel<?> success() {
return ResultUtils.success();
}

印象结果:

1
2
3
4
复制代码{
"success": true,
"message": "Success"
}

使用示例

示例图

测试接口预览

接口示例

Models

Models

返回成功结果示例

1
2
3
4
5
6
7
8
9
复制代码/**
* 删除方法示例
* @return {@link ResultModel}
*/
@ApiOperation(value = "删除方法示例")
@DeleteMapping
public ResultModel<?> delete() {
return ResultUtils.success();
}

响应结果:

1
2
3
4
复制代码{
"success": true,
"message": "Success"
}

返回失败结果示例

如果操作出错了,我们怎么返回呢?我们来看一下:

1
2
3
4
5
6
7
8
9
复制代码/**
* 修改方法示例
* @return {@link ResultModel}
*/
@ApiOperation(value = "修改方法示例")
@PutMapping
public ResultModel<?> update() {
return ResultUtils.error("修改失败");
}

返回结果:

1
2
3
4
复制代码{
"success": false,
"message": "修改失败"
}

返回查询结果示例

值得一提的话,就是查询方法了,我们看一下吧

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
复制代码/**
* 查询方法示例
* @return {@link ResultModel}
*/
@ApiOperation(value = "查询方法示例")
@GetMapping
public ResultModel<?> get() {
List<Map<String, String>> list = new ArrayList<>();
Map<String, String> map1 = new HashMap<>();

map1.put("name", "张飞");
map1.put("desc", "燕人张飞");
list.add(map1);

Map<String, String> map2 = new HashMap<>();
map2.put("name", "赵云");
map2.put("desc", "常山赵子龙");
list.add(map2);

Map<String, String> map3 = new HashMap<>();
map3.put("name", "关羽");
map3.put("desc", "温酒斩华雄");
list.add(map3);

return ResultUtils.success(list);
}

看一下响应结果吧,是否如你所愿:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
复制代码{
"success": true,
"message": "Success",
"data": [
{
"name": "张飞",
"desc": "燕人张飞"
},
{
"name": "赵云",
"desc": "常山赵子龙"
},
{
"name": "关羽",
"desc": "温酒斩华雄"
}
]
}

接口返回数据示例

1
2
3
4
5
6
7
8
9
复制代码/**
* 接口返回数据示例
* @return {@link ApiResultModel}
*/
@ApiOperation(value = "接口返回数据示例")
@GetMapping("/api-data")
public ApiResultModel<Integer, ?> apiData() {
return ApiResultUtils.success(getData());
}

响应结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
复制代码{
"success": true,
"message": "Success",
"data": [
{
"name": "张飞",
"desc": "燕人张飞"
},
{
"name": "赵云",
"desc": "常山赵子龙"
},
{
"name": "关羽",
"desc": "温酒斩华雄"
}
],
"code": 0
}

接口返回失败结果示例

1
2
3
4
5
6
7
8
9
复制代码/**
* API接口错误返回示例
* @return {@link ApiResultModel}
*/
@ApiOperation(value = "API接口错误返回示例")
@GetMapping("/api-error")
public ApiResultModel<Integer, ?> apiError() {
return ApiResultUtils.error(1101, "API接口错误返回示例");
}

响应结果:

1
2
3
4
5
复制代码{
"success": false,
"message": "API接口错误返回示例",
"code": 1101
}

分页返回数据示例

1
2
3
4
5
6
7
8
9
复制代码/**
* 分页返回数据示例
* @return {@link ApiResultModel}
*/
@ApiOperation(value = "分页返回数据示例")
@GetMapping("/page")
public PageResultModel<?> page() {
return PageResultUtils.success(getData(), 100, 10, 10, 1);
}

响应结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
复制代码{
"success": true,
"message": "Success",
"data": [
{
"name": "张飞",
"desc": "燕人张飞"
},
{
"name": "赵云",
"desc": "常山赵子龙"
},
{
"name": "关羽",
"desc": "温酒斩华雄"
}
],
"total": 100,
"size": 10,
"pages": 10,
"current": 1
}

工具类示例

返回结果工具类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
复制代码package com.fengwenyi.api_example.util;

import com.fengwenyi.api_result.helper.ResultHelper;
import com.fengwenyi.api_result.model.ResultModel;

/**
* 返回结果封装工具类
* @author Erwin Feng[xfsy_2015@163.com]
* @since 2019/11/30 13:54
*/
public class ResultUtils {

/**
* 成功
* @return {@link ResultModel}
*/
public static ResultModel <?> success() {
return ResultHelper.success("Success");
}

/**
* 成功,携带数据
* @param data 数据
* @param <T> 数据的类型
* @return {@link ResultModel}
*/
public static <T> ResultModel <T> success(T data) {
return ResultHelper.success("Success", data);
}

/**
* 错误,携带详细的错误描述信息
* @param message 详细的错误描述信息
* @return {@link ResultModel}
*/
public static ResultModel <?> error(String message) {
return ResultHelper.error(message);
}

}

API接口返回结果工具类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
复制代码package com.fengwenyi.api_example.util;

import com.fengwenyi.api_result.helper.ApiResultHelper;
import com.fengwenyi.api_result.model.ApiResultModel;

/**
* API接口返回结果工具类
* @author Erwin Feng[xfsy_2015@163.com]
* @since 2019/12/1 20:10
*/
public class ApiResultUtils {

/**
* 成功,携带返回码和描述信息
* @return {@link ApiResultModel}
*/
public static ApiResultModel<Integer, ?> success() {
return ApiResultHelper.success(0, "Success");
}

/**
* 成功,携带返回码、描述信息和数据
* @param data 数据
* @param <T> 数据的类型
* @return {@link ApiResultModel}
*/
public static <T> ApiResultModel<Integer, T> success(T data) {
return ApiResultHelper.success(0, "Success", data);
}

/**
* 出错,携带错误吗和详细描述信息
* @param code 返回码
* @param message 相信描述信息
* @return {@link ApiResultModel}
*/
public static ApiResultModel<Integer, ?> error(int code, String message) {
return ApiResultHelper.error(code, message);
}
}

分页返回结果工具类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
复制代码package com.fengwenyi.api_example.util;

import com.fengwenyi.api_result.helper.PageResultHelper;
import com.fengwenyi.api_result.model.PageResultModel;

/**
* 分页返回结果工具类
* @author Erwin Feng[xfsy_2015@163.com]
* @since 2019/12/1 20:32
*/
public class PageResultUtils {

/**
* 成功,携带分页相关数据以及信息
* @param data 数据
* @param total 数据总条数
* @param size 每页条数
* @param pages 总页数
* @param current 当前页
* @param <T> 数据类型
* @return {@link PageResultModel}
*/
public static <T> PageResultModel<T> success(T data, long total, int size, long pages, long current) {
return PageResultHelper.success("Success", data, total, size, pages, current);
}

}

解析返回结果示例

这里补充一下,关于如何解析返回的json字符串,谈谈我的看法吧。返回的是一个json格式的字符串,这里我用fastjson来写解析示例。我们通常会将请求数据封装为一个通用方法或者工具类,只需要返回数据,当然,如果失败,或者出现异常,都在这里处理。

常用返回结果解析示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
复制代码/**
* 解析常用返回结果示例
* @return 数据
*/
public Object parseResult() {
String result = "";
ResultModel<?> resultModel = JSON.parseObject(result, ResultModel.class);
Boolean success = resultModel.getSuccess();
if (success != null && success) {
return resultModel.getData();
} else {
// 异常信息
String message = resultModel.getMessage();
// 异常处理
throw new DataParseException(message);
}
}

接口返回结果解析示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
复制代码/**
* 解析接口返回结果示例
* @return 数据
*/
public Object parseApiResult() {
String apiResult = "";
ApiResultModel<?, ?> apiResultModel = JSON.parseObject(apiResult, ApiResultModel.class);
Boolean success = apiResultModel.getSuccess();
if (success != null && success) {
return apiResultModel.getData();
} else {
Object code = apiResultModel.getCode();
String message = apiResultModel.getMessage();
// 根据接口错误码分别进行处理
// ...
return null;
}
}

分页返回结果解析示例

这里与上面略有不同,因为,增加了一些字段,所以,我们可以借助bean来返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
复制代码/**
* 解析分页返回结果示例
* @return {@link PageResultDataBean}
*/
public PageResultDataBean parsePageResult() {
String pageResult = "";
PageResultModel<List<?>> pageResultModel = JSON.parseObject(pageResult, PageResultModel.class);
Boolean success = pageResultModel.getSuccess();
if (success != null && success) {
List<?> data = pageResultModel.getData();
Long total = pageResultModel.getTotal();
Integer size = pageResultModel.getSize();
Long pages = pageResultModel.getPages();
Long current = pageResultModel.getCurrent();
return new PageResultDataBean()
.setTotal(total)
.setSize(size)
.setPages(pages)
.setCurrent(current)
.setData(data);
} else {
// 异常信息
String message = pageResultModel.getMessage();
// 异常处理
throw new DataParseException(message);
}
}

以上,这一切都是否如你所愿呢?欢迎评论留言告诉我。

链接

[1] api-result源码 | github

[2] api-result源码 | 码云

[3] api-result中央仓库

[4] 测试示例代码

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

【Java实例】使用Thumbnailator生成缩略图(缩

发表于 2019-12-01

1 需求

表哥需要给儿子报名考试,系统要求上传不超过30KB的图片,而现在的手机随手一拍就是几MB的,怎么弄一个才30KB的图片呢?

一个简单的办法是在电脑上把图片缩小,然后截屏小图片,但现在的电脑屏幕分辨率很高,而且截屏大小不好控制;同样分辨率在不同图片格式下,大小也相差很大。试了一下微信截图工具,输出的图片比较大。于是放弃了该办法。

另一个办法是通过其它工具来处理。找了图片在线压缩网站和一款手机软件,发现在图片小于一定大小时压缩无效。如果再继续找其它工具,简直就是折腾且浪费时间,所以也放弃了该办法。

最后,还是自己写个代码把图片按原比例压缩成了29KB,使用Thumbnailator库,两三行代码搞定。

2 缩略图可以做什么?

缩略图是应用极其广泛的,像头像、图片消息、商品图片等,都会用到缩略图。

比如,当你有了一个新的微信好友,你就能看到他的头像,一开始这个头像是一个比原图更小的缩略图。而你点击查看原图时,微信客户端才会给你下载原图。因为你并不会对每个人的头像都感兴趣,都会去查看清晰的原图,一个小小的缩略图已经能满足了。这样可以减轻网络传输的负担,加快响应速度。

微信传图片和视频也是同样的道理,先给你传一个比较小的预览,你点击查看原图或视频播放才给你传更大的文件。

3 缩略图的开源库

缩略图的开源库很多:

(1)Thumbnailator

GitHub:https://github.com/coobird/thumbnailator

不依赖外部库,轻便高效,任何平台适用,支持缩放、旋转、截取,支持水印。

(2)Imgscalr

GitHub:https://github.com/rkalla/imgscalr

全部基于 Java 2D,不依赖外部库,轻便高效,任何平台适用,支持缩放、旋转、截取,不支持水印。

本文主要讲解Thumbnailator的使用,最新版本为0.4.8,maven引入如下:

1
2
3
4
5
复制代码<dependency>
<groupId>net.coobird</groupId>
<artifactId>thumbnailator</artifactId>
<version>0.4.8</version>
</dependency>

4 常用操作

4.1 指定大小缩放

原图为4:3比例的图片,为4032x3024(图片太大,网站无法上传原图),如下:

file

当使用指定大小方式进行缩放时,默认是保持原比例的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
复制代码//参数小且比例与原比例一样
//则按参数输出结果
Thumbnails.of(originalPic)
.size(400, 300)
.toFile(picturePath + "climb-up.size.400X300.jpeg");
//参数大且比例不等
//则按比例放大,取最小的值
Thumbnails.of(originalPic)
.size(4400, 3400)
.toFile(picturePath + "climb-up.size.4400X3300.jpeg");
//参数小且比例不等
//则按比例缩小,取最小的值
Thumbnails.of(originalPic)
.size(200, 300)
.toFile(picturePath + "climb-up.size.200X150.jpeg");
//不保持比例
//则按参数输出结果
Thumbnails.of(originalPic)
.size(200, 300)
.keepAspectRatio(false)
.toFile(picturePath + "climb-up.size.notKeepRatio.200X300.jpeg");
//强制设置大小
//则按参数输出结果,与上个例子一样
Thumbnails.of(originalPic)
.forceSize(200, 300)
.toFile(picturePath + "climb-up.forceSize.200X300.jpeg");

展示其中两种结果:

(1)size(200, 300),结果为200X150的图片,比例还是4:3。

file

(2)forceSize(200, 300),结果为200X300的图片,如原比例不同,会有变形。

file

4.2 按比例进行缩放

按比例进行缩放是指按宽和高的比例同时缩放,看下面代码:

1
2
3
4
5
6
7
8
9
10
复制代码//比例小于1,缩小
//宽和高同时缩小为原来的0.1倍
Thumbnails.of(originalPic)
.scale(0.1f)
.toFile(picturePath + "climb-up.scale.403X302.jpeg");
//比例大于1,放大
//宽和高同时放大为原来的1.1倍
Thumbnails.of(originalPic)
.scale(1.1f)
.toFile(picturePath + "climb-up.scale.4435X3326.jpeg");

4.3 按角度旋转

按角度旋转时,角度为正数时,顺时针;角度为负数时,逆时针。代码如下:

1
2
3
4
复制代码Thumbnails.of(originalPic)
.size(400,300)
.rotate(45)
.toFile(picturePath + "climb-up.rotate.45.jpeg");

压缩并旋转后的结果图片如下所示:

file

4.4 添加水印

添加水印也是十分方便,我们示例将水印放在右上角,代码如下:

1
2
3
4
5
复制代码Thumbnails.of(originalPic)
.size(2000,1500)
.watermark(Positions.TOP_RIGHT, ImageIO.read(
new File(picturePath + "pkslow.size.400X300.jpeg")), 0.5f)
.toFile(picturePath + "climb-up.watermark.jpeg");

加上水印后的图片如下:

file

4.5 裁剪

代码如下:

1
2
3
4
复制代码Thumbnails.of(originalPic)
.sourceRegion(Positions.TOP_RIGHT, 1800, 1800)
.size(400, 400)
.toFile(picturePath + "climb-up.crop.jpeg");

结果如下:

file

4.6 目录下的文件批量操作

这个功能还是非常有用,可以操作目录下的所有图片,并指定文件名输出,如指定前缀,代码如下:

1
2
3
复制代码Thumbnails.of(new File("/pictures/201912/").listFiles())
.size(400, 400)
.toFiles(Rename.PREFIX_DOT_THUMBNAIL);

操作后的生成的结果如下:

file

5 总结

Thumbnailator库操作方便,支持缩放、旋转、裁剪、水印等功能,而且没有其它依赖,值得了解学习。

欢迎关注公众号<南瓜慢说>,将持续为你更新…

file

多读书,多分享;多写作,多整理。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

GitHub 标星 16w+,我发现了一个宝藏项目,作为编

发表于 2019-11-29

大家好,我是 Rocky0429,一个最近老在 GitHub 上闲逛的蒟蒻…

特别惭愧的是,虽然我很早就知道 GitHub,但是学会逛 GitHub 的时间特别晚。当时一方面是因为菜,看着这种全是英文的东西难受,不知道该怎么去玩,另一方面是一直在搞 ACM,没有做一些工程类的项目,所以想当然的以为和 GitHub 也没什么关系(当然这种想法是错误的)。

后来自己花了一个星期看完了 Python 的基础知识,就想着找点项目看一看,学一学,练一练,这个时候我才真正的去了解 GitHub,开始了在 GitHub 的瞎逛之旅,在开始之初,随之而来的问题是我不知道哪些项目当时还是新手的我,哪些项目是好项目,哪些项目好玩有价值。

虽然现在我已经在 GitHub 上逛的相当流畅,但我还是想如果有一个东西可以收集这些对新手友好的东西,那么我当时可以少走更多的弯路,节省更多的时间吧。

那么有这么一个东西么?

有的,而且已经做了三年多,这就是HelloGitHub,一个分享 GitHub 上有趣,入门级的开源项目。

GitHub 网址:github.com/521xueweiha…

在项目中,内容每月 28 号以月刊的形式更新发布,主要是面向编程新手、热爱编程、对开源社区感兴趣人群的项目。更新的内容主要包括:各种语言的流行项目、入门级项目、让生活变得更美好的工具、书籍、学习心得笔记、企业级项目等。

这些项目都有一些共同的特征,那就是很酷,非常容易上手的项目,编程的魅力和便捷体验起来就是这么简单。

再次给出 GitHub 地址:
github.com/521xueweiha…

下面我们就来看看,这个项目具体包括啥,我们以最新的月刊(43 期)为例。

0x00 简介

0x01 目录

0x02 内容

每个类型的项目我们挑一个来看。

C 项目

tmux:一个终端复用工具,可极大的提高工作效率。
提供了强劲的、易于使用的命令行界面;
可横向和纵向分割窗口;
窗格可以自由移动和调整大小,或直接利用四个预设布局之一;
可在多个缓冲区进行复制和粘贴;
可通过交互式菜单来选择窗口、会话及客户端;
等等。

C# 项目

BenchmarkDotNet:功能强大的用于基准测试 .NET 库。

C++ 项目

dbg-macro:打日志是 C++ 开发中必不可少的一种 debug 方式,dbg-macro 受 rust-lang 中 的 dbg 启发,提供比 printf 和 std::cout 更好的宏函数。主要有如下特点:
美观的彩色输出(当输出不是交互式终端时,颜色将自动禁用);
兼容 C++11,并且是 header-only;
支持基础类型和 STL 容器类型的输出;
除了基本信息外,还输出变量名和类型;
启用 DBG_MACRO_DISABLE 生成 release 版。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
复制代码#include <vector>
#include <dbg.h>

// You can use "dbg(..)" in expressions:
int factorial(int n) {
if (dbg(n <= 1)) {
return dbg(1);
} else {
return dbg(n * factorial(n - 1));
}
}

int main() {
std::string message = "hello";
dbg(message); // [example.cpp:15 (main)] message = "hello" (std::string)
const int a = 2;
const int b = dbg(3 * a) + 1; // [example.cpp:18 (main)] 3 * a = 6 (int)
std::vector<int> numbers{b, 13, 42};
dbg(numbers); // [example.cpp:21 (main)] numbers = {7, 13, 42} (size: 3) (std::vector<int>)
dbg("this line is executed"); // [example.cpp:23 (main)] this line is executed
factorial(4);
return 0;
}

Go 项目

go-admin:基于 Golang 语言的数据可视化与管理平台。特性如下:

🚀
高生产效率:10 分钟内做一个好看的管理后台;

🎨
主题:默认为 adminlte,更多好看的主题正在制作中,欢迎给我们留言;

🔢
插件化:提供插件使用,真正实现一个插件解决不了问题,那就两个;

✅
认证:开箱即用的 rbac 认证系统;

⚙️
框架支持:支持大部分框架接入,让你更容易去上手和扩展。

Java 项目

eladmin:基于 Spring Boot 2.1.0、Vue 的前后端分离的后台管理系统,支持数据字典与数据权限管理、一键生成前后端代码、前端菜单动态路由等。基于 Spring Boot2.1.0 框架,涉及的技术栈:非关系数据库 redis、接口测试工具 swagger、druid 数据源驱动、邮件依赖(javax.mail)、三方支付和云存储 SDK、页面模板引擎 freemarker。技术栈丰富,初学者可以作为实战项目学习和使用。

JavaScript

chart-race-react:一个简单易用的 Bar Chart Race(长条图赛跑动画) React 组件。示例代码:

1
2
3
4
5
复制代码
import ReactDOM from 'react-dom';
import BarChart from 'chart-race-react';

ReactDOM.render(<BarChart />, document.getElementById('root'));

在这里插入图片描述

Python 项目

TagUI-Python:一个 Python 自动化操作的库。比如:自动打开网页并截图,示例代码:

1
2
3
4
5
复制代码t.init()
t.url('https://www.google.com')
t.type('q', 'decentralization[enter]')
t.snap('page', 'results.png')
t.close()

Ruby 项目

shift:一个 Ruby 语言写的在线 MySQL 数据库迁移工具。

Swift 项目

Percent:让 Swift 语言支持百分比类型,消除精度缺失的烦恼。示例代码:

1
2
3
4
5
6
复制代码import Percent

10% + 5.5%
//=> 15.5%
-10% / 2
//=> -5%

其他

cascadia-code:微软开源的一套等宽字体,有趣的是可以组合字符创建新的字形。组合效果如下:

在这里插入图片描述

开源书籍

python_ebook:Python 编程相关的电子书资源集合项目。

book:(英文)《Cosmic Python》讲述如何管理复杂性的 Pythonic 应用程序结构的书籍。

教程

BigData-Notes:大数据入门教程,该教程介绍了大数据常用技术栈的基础和核心知识。内容涵盖:Hadoop、Spark、Storm、HBase、Hive、ZooKeeper、Kafka 等。

机器学习

dimensionality_reduction_alo_codes:该项目使用 Python 实现了 11 种经典的数据抽取(数据降维)算法。同时附有相关资料、展示效果,适用于机器学习初学者和刚刚入坑数据挖掘的小伙伴。

0x03 写在之后

以上,就是今天分享的内容,希望更多的人能够知道 HelloGitHub 这个项目,内容已经在这了,万事俱备,差的就是你学习的驱动力和对作者 Star 的支持了。

让走在开源路上的开发者不再孤单,让想进入开源世界的人不再畏惧。

看完有所收获?点个在看,让更多人可以看到~谢谢啦!

今天的分享就到这,拜里个拜~

❤️ 看完有所收获?希望爱学习的你不要吝啬三连击哟[点赞 + 收藏 + 评论]~

❤️可以关注我的原创公众号:「Python空间」,更多优质的技术文章第一时间更新。最后送你新人大礼包一份,关注微信公众号,后台回复:“掘金” 即可获取!

作者Info:

【作者】:Rocky0429

【原创公众号】:Python空间。

【简介】:CSDN 博客专家, 985 计算机在读研究生,ACM 退役狗 & 亚洲区域赛银奖划水选手。这是一个坚持原创的技术公众号,每天坚持推送各种 Python 基础/进阶文章,数据分析,爬虫实战,数据结构与算法,不定期分享各类资源。

【转载说明】:转载请说明出处,谢谢合作!~

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

真的,Kafka 入门一篇文章就够了

发表于 2019-11-28

初识 Kafka

什么是 Kafka

Kafka 是由 Linkedin 公司开发的,它是一个分布式的,支持多分区、多副本,基于 Zookeeper 的分布式消息流平台,它同时也是一款开源的基于发布订阅模式的消息引擎系统。

Kafka 的基本术语

消息:Kafka 中的数据单元被称为消息,也被称为记录,可以把它看作数据库表中某一行的记录。

批次:为了提高效率, 消息会分批次写入 Kafka,批次就代指的是一组消息。

主题:消息的种类称为 主题(Topic),可以说一个主题代表了一类消息。相当于是对消息进行分类。主题就像是数据库中的表。

分区:主题可以被分为若干个分区(partition),同一个主题中的分区可以不在一个机器上,有可能会部署在多个机器上,由此来实现 kafka 的伸缩性,单一主题中的分区有序,但是无法保证主题中所有的分区有序

生产者: 向主题发布消息的客户端应用程序称为生产者(Producer),生产者用于持续不断的向某个主题发送消息。

消费者:订阅主题消息的客户端程序称为消费者(Consumer),消费者用于处理生产者产生的消息。

消费者群组:生产者与消费者的关系就如同餐厅中的厨师和顾客之间的关系一样,一个厨师对应多个顾客,也就是一个生产者对应多个消费者,消费者群组(Consumer Group)指的就是由一个或多个消费者组成的群体。

偏移量:偏移量(Consumer Offset)是一种元数据,它是一个不断递增的整数值,用来记录消费者发生重平衡时的位置,以便用来恢复数据。

broker: 一个独立的 Kafka 服务器就被称为 broker,broker 接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。

broker 集群:broker 是集群 的组成部分,broker 集群由一个或多个 broker 组成,每个集群都有一个 broker 同时充当了集群控制器的角色(自动从集群的活跃成员中选举出来)。

副本:Kafka 中消息的备份又叫做 副本(Replica),副本的数量是可以配置的,Kafka 定义了两类副本:领导者副本(Leader Replica) 和 追随者副本(Follower Replica),前者对外提供服务,后者只是被动跟随。

重平衡:Rebalance。消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。Rebalance 是 Kafka 消费者端实现高可用的重要手段。

Kafka 的特性(设计原则)

  • 高吞吐、低延迟:kakfa 最大的特点就是收发消息非常快,kafka 每秒可以处理几十万条消息,它的最低延迟只有几毫秒。
  • 高伸缩性: 每个主题(topic) 包含多个分区(partition),主题中的分区可以分布在不同的主机(broker)中。
  • 持久性、可靠性: Kafka 能够允许数据的持久化存储,消息被持久化到磁盘,并支持数据备份防止数据丢失,Kafka 底层的数据存储是基于 Zookeeper 存储的,Zookeeper 我们知道它的数据能够持久存储。
  • 容错性: 允许集群中的节点失败,某个节点宕机,Kafka 集群能够正常工作
  • 高并发: 支持数千个客户端同时读写

Kafka 的使用场景

  • 活动跟踪:Kafka 可以用来跟踪用户行为,比如我们经常回去淘宝购物,你打开淘宝的那一刻,你的登陆信息,登陆次数都会作为消息传输到 Kafka ,当你浏览购物的时候,你的浏览信息,你的搜索指数,你的购物爱好都会作为一个个消息传递给 Kafka ,这样就可以生成报告,可以做智能推荐,购买喜好等。
  • 传递消息:Kafka 另外一个基本用途是传递消息,应用程序向用户发送通知就是通过传递消息来实现的,这些应用组件可以生成消息,而不需要关心消息的格式,也不需要关心消息是如何发送的。
  • 度量指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
  • 日志记录:Kafka 的基本概念来源于提交日志,比如我们可以把数据库的更新发送到 Kafka 上,用来记录数据库的更新时间,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。
  • 流式处理:流式处理是有一个能够提供多种应用程序的领域。
  • 限流削峰:Kafka 多用于互联网领域某一时刻请求特别多的情况下,可以把请求写入Kafka 中,避免直接请求后端程序导致服务崩溃。

Kafka 的消息队列

Kafka 的消息队列一般分为两种模式:点对点模式和发布订阅模式

Kafka 是支持消费者群组的,也就是说 Kafka 中会有一个或者多个消费者,如果一个生产者生产的消息由一个消费者进行消费的话,那么这种模式就是点对点模式

如果一个生产者或者多个生产者产生的消息能够被多个消费者同时消费的情况,这样的消息队列成为发布订阅模式的消息队列

Kafka 系统架构

如上图所示,一个典型的 Kafka 集群中包含若干Producer(可以是web前端产生的Page View,或者是服务器日志,系统CPU、Memory等),若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干Consumer Group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance。Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。

核心 API

Kafka 有四个核心API,它们分别是

  • Producer API,它允许应用程序向一个或多个 topics 上发送消息记录
  • Consumer API,允许应用程序订阅一个或多个 topics 并处理为其生成的记录流
  • Streams API,它允许应用程序作为流处理器,从一个或多个主题中消费输入流并为其生成输出流,有效的将输入流转换为输出流。
  • Connector API,它允许构建和运行将 Kafka 主题连接到现有应用程序或数据系统的可用生产者和消费者。例如,关系数据库的连接器可能会捕获对表的所有更改

Kafka 为何如此之快

Kafka 实现了零拷贝原理来快速移动数据,避免了内核之间的切换。Kafka 可以将数据记录分批发送,从生产者到文件系统(Kafka 主题日志)到消费者,可以端到端的查看这些批次的数据。

批处理能够进行更有效的数据压缩并减少 I/O 延迟,Kafka 采取顺序写入磁盘的方式,避免了随机磁盘寻址的浪费,更多关于磁盘寻址的了解,请参阅 程序员需要了解的硬核知识之磁盘 。

总结一下其实就是四个要点

  • 顺序读写
  • 零拷贝
  • 消息压缩
  • 分批发送

Kafka 安装和重要配置

Kafka 安装我在 Kafka 系列第一篇应该比较详细了,详情见带你涨姿势的认识一下kafka 这篇文章。

那我们还是主要来说一下 Kafka 中的重要参数配置吧,这些参数对 Kafka 来说是非常重要的。

broker 端配置

  • broker.id

每个 kafka broker 都有一个唯一的标识来表示,这个唯一的标识符即是 broker.id,它的默认值是 0。这个值在 kafka 集群中必须是唯一的,这个值可以任意设定,

  • port

如果使用配置样本来启动 kafka,它会监听 9092 端口。修改 port 配置参数可以把它设置成任意的端口。要注意,如果使用 1024 以下的端口,需要使用 root 权限启动 kakfa。

  • zookeeper.connect

用于保存 broker 元数据的 Zookeeper 地址是通过 zookeeper.connect 来指定的。比如我可以这么指定 localhost:2181 表示这个 Zookeeper 是运行在本地 2181 端口上的。我们也可以通过 比如我们可以通过 zk1:2181,zk2:2181,zk3:2181 来指定 zookeeper.connect 的多个参数值。该配置参数是用冒号分割的一组 hostname:port/path 列表,其含义如下

hostname 是 Zookeeper 服务器的机器名或者 ip 地址。

port 是 Zookeeper 客户端的端口号

/path 是可选择的 Zookeeper 路径,Kafka 路径是使用了 chroot 环境,如果不指定默认使用跟路径。

如果你有两套 Kafka 集群,假设分别叫它们 kafka1 和 kafka2,那么两套集群的zookeeper.connect参数可以这样指定:zk1:2181,zk2:2181,zk3:2181/kafka1和zk1:2181,zk2:2181,zk3:2181/kafka2

  • log.dirs

Kafka 把所有的消息都保存到磁盘上,存放这些日志片段的目录是通过 log.dirs 来制定的,它是用一组逗号来分割的本地系统路径,log.dirs 是没有默认值的,你必须手动指定他的默认值。其实还有一个参数是 log.dir,如你所知,这个配置是没有 s 的,默认情况下只用配置 log.dirs 就好了,比如你可以通过 /home/kafka1,/home/kafka2,/home/kafka3 这样来配置这个参数的值。

  • num.recovery.threads.per.data.dir

对于如下3种情况,Kafka 会使用可配置的线程池来处理日志片段。

服务器正常启动,用于打开每个分区的日志片段;

服务器崩溃后重启,用于检查和截断每个分区的日志片段;

服务器正常关闭,用于关闭日志片段。

默认情况下,每个日志目录只使用一个线程。因为这些线程只是在服务器启动和关闭时会用到,所以完全可以设置大量的线程来达到井行操作的目的。特别是对于包含大量分区的服务器来说,一旦发生崩愤,在进行恢复时使用井行操作可能会省下数小时的时间。设置此参数时需要注意,所配置的数字对应的是 log.dirs 指定的单个日志目录。也就是说,如果 num.recovery.threads.per.data.dir 被设为 8,并且 log.dir 指定了 3 个路径,那么总共需要 24 个线程。

  • auto.create.topics.enable

默认情况下,kafka 会使用三种方式来自动创建主题,下面是三种情况:

当一个生产者开始往主题写入消息时

当一个消费者开始从主题读取消息时

当任意一个客户端向主题发送元数据请求时

auto.create.topics.enable参数我建议最好设置成 false,即不允许自动创建 Topic。在我们的线上环境里面有很多名字稀奇古怪的 Topic,我想大概都是因为该参数被设置成了 true 的缘故。

主题默认配置

Kafka 为新创建的主题提供了很多默认配置参数,下面就来一起认识一下这些参数

  • num.partitions

num.partitions 参数指定了新创建的主题需要包含多少个分区。如果启用了主题自动创建功能(该功能是默认启用的),主题分区的个数就是该参数指定的值。该参数的默认值是 1。要注意,我们可以增加主题分区的个数,但不能减少分区的个数。

  • default.replication.factor

这个参数比较简单,它表示 kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务default.replication.factor 的默认值为1,这个参数在你启用了主题自动创建功能后有效。

  • log.retention.ms

Kafka 通常根据时间来决定数据可以保留多久。默认使用 log.retention.hours 参数来配置时间,默认是 168 个小时,也就是一周。除此之外,还有两个参数 log.retention.minutes 和 log.retentiion.ms 。这三个参数作用是一样的,都是决定消息多久以后被删除,推荐使用 log.retention.ms。

  • log.retention.bytes

另一种保留消息的方式是判断消息是否过期。它的值通过参数 log.retention.bytes 来指定,作用在每一个分区上。也就是说,如果有一个包含 8 个分区的主题,并且 log.retention.bytes 被设置为 1GB,那么这个主题最多可以保留 8GB 数据。所以,当主题的分区个数增加时,整个主题可以保留的数据也随之增加。

  • log.segment.bytes

上述的日志都是作用在日志片段上,而不是作用在单个消息上。当消息到达 broker 时,它们被追加到分区的当前日志片段上,当日志片段大小到达 log.segment.bytes 指定上限(默认为 1GB)时,当前日志片段就会被关闭,一个新的日志片段被打开。如果一个日志片段被关闭,就开始等待过期。这个参数的值越小,就越会频繁的关闭和分配新文件,从而降低磁盘写入的整体效率。

  • log.segment.ms

上面提到日志片段经关闭后需等待过期,那么 log.segment.ms 这个参数就是指定日志多长时间被关闭的参数和,log.segment.ms 和 log.retention.bytes 也不存在互斥问题。日志片段会在大小或时间到达上限时被关闭,就看哪个条件先得到满足。

  • message.max.bytes

broker 通过设置 message.max.bytes 参数来限制单个消息的大小,默认是 1000 000, 也就是 1MB,如果生产者尝试发送的消息超过这个大小,不仅消息不会被接收,还会收到 broker 返回的错误消息。跟其他与字节相关的配置参数一样,该参数指的是压缩后的消息大小,也就是说,只要压缩后的消息小于 mesage.max.bytes,那么消息的实际大小可以大于这个值

这个值对性能有显著的影响。值越大,那么负责处理网络连接和请求的线程就需要花越多的时间来处理这些请求。它还会增加磁盘写入块的大小,从而影响 IO 吞吐量。

  • retention.ms

规定了该主题消息被保存的时常,默认是7天,即该主题只能保存7天的消息,一旦设置了这个值,它会覆盖掉 Broker 端的全局参数值。

  • retention.bytes

retention.bytes:规定了要为该 Topic 预留多大的磁盘空间。和全局参数作用相似,这个值通常在多租户的 Kafka 集群中会有用武之地。当前默认值是 -1,表示可以无限使用磁盘空间。

JVM 参数配置

JDK 版本一般推荐直接使用 JDK1.8,这个版本也是现在中国大部分程序员的首选版本。

说到 JVM 端设置,就绕不开堆这个话题,业界最推崇的一种设置方式就是直接将 JVM 堆大小设置为 6GB,这样会避免很多 Bug 出现。

JVM 端配置的另一个重要参数就是垃圾回收器的设置,也就是平时常说的 GC 设置。如果你依然在使用 Java 7,那么可以根据以下法则选择合适的垃圾回收器:

  • 如果 Broker 所在机器的 CPU 资源非常充裕,建议使用 CMS 收集器。启用方法是指定-XX:+UseCurrentMarkSweepGC。
  • 否则,使用吞吐量收集器。开启方法是指定-XX:+UseParallelGC。

当然了,如果你已经在使用 Java 8 了,那么就用默认的 G1 收集器就好了。在没有任何调优的情况下,G1 表现得要比 CMS 出色,主要体现在更少的 Full GC,需要调整的参数更少等,所以使用 G1 就好了。

一般 G1 的调整只需要这两个参数即可

  • MaxGCPauseMillis

该参数指定每次垃圾回收默认的停顿时间。该值不是固定的,G1可以根据需要使用更长的时间。它的默认值是 200ms,也就是说,每一轮垃圾回收大概需要200 ms 的时间。

  • InitiatingHeapOccupancyPercent

该参数指定了 G1 启动新一轮垃圾回收之前可以使用的堆内存百分比,默认值是45,这就表明G1在堆使用率到达45之前不会启用垃圾回收。这个百分比包括新生代和老年代。

Kafka Producer

在 Kafka 中,我们把产生消息的那一方称为生产者,比如我们经常回去淘宝购物,你打开淘宝的那一刻,你的登陆信息,登陆次数都会作为消息传输到 Kafka 后台,当你浏览购物的时候,你的浏览信息,你的搜索指数,你的购物爱好都会作为一个个消息传递给 Kafka 后台,然后淘宝会根据你的爱好做智能推荐,致使你的钱包从来都禁不住诱惑,那么这些生产者产生的消息是怎么传到 Kafka 应用程序的呢?发送过程是怎么样的呢?

尽管消息的产生非常简单,但是消息的发送过程还是比较复杂的,如图

我们从创建一个ProducerRecord 对象开始,ProducerRecord 是 Kafka 中的一个核心类,它代表了一组 Kafka 需要发送的 key/value 键值对,它由记录要发送到的主题名称(Topic Name),可选的分区号(Partition Number)以及可选的键值对构成。

在发送 ProducerRecord 时,我们需要将键值对对象由序列化器转换为字节数组,这样它们才能够在网络上传输。然后消息到达了分区器。

如果发送过程中指定了有效的分区号,那么在发送记录时将使用该分区。如果发送过程中未指定分区,则将使用key 的 hash 函数映射指定一个分区。如果发送的过程中既没有分区号也没有,则将以循环的方式分配一个分区。选好分区后,生产者就知道向哪个主题和分区发送数据了。

ProducerRecord 还有关联的时间戳,如果用户没有提供时间戳,那么生产者将会在记录中使用当前的时间作为时间戳。Kafka 最终使用的时间戳取决于 topic 主题配置的时间戳类型。

  • 如果将主题配置为使用 CreateTime,则生产者记录中的时间戳将由 broker 使用。
  • 如果将主题配置为使用LogAppendTime,则生产者记录中的时间戳在将消息添加到其日志中时,将由 broker 重写。

然后,这条消息被存放在一个记录批次里,这个批次里的所有消息会被发送到相同的主题和分区上。由一个独立的线程负责把它们发到 Kafka Broker 上。

Kafka Broker 在收到消息时会返回一个响应,如果写入成功,会返回一个 RecordMetaData 对象,它包含了主题和分区信息,以及记录在分区里的偏移量,上面两种的时间戳类型也会返回给用户。如果写入失败,会返回一个错误。生产者在收到错误之后会尝试重新发送消息,几次之后如果还是失败的话,就返回错误消息。

创建 Kafka 生产者

要向 Kafka 写入消息,首先需要创建一个生产者对象,并设置一些属性。Kafka 生产者有3个必选的属性

  • bootstrap.servers

该属性指定 broker 的地址清单,地址的格式为 host:port。清单里不需要包含所有的 broker 地址,生产者会从给定的 broker 里查找到其他的 broker 信息。不过建议至少要提供两个 broker 信息,一旦其中一个宕机,生产者仍然能够连接到集群上。

  • key.serializer

broker 需要接收到序列化之后的 key/value值,所以生产者发送的消息需要经过序列化之后才传递给 Kafka Broker。生产者需要知道采用何种方式把 Java 对象转换为字节数组。key.serializer 必须被设置为一个实现了org.apache.kafka.common.serialization.Serializer 接口的类,生产者会使用这个类把键对象序列化为字节数组。这里拓展一下 Serializer 类

Serializer 是一个接口,它表示类将会采用何种方式序列化,它的作用是把对象转换为字节,实现了 Serializer 接口的类主要有 ByteArraySerializer、StringSerializer、IntegerSerializer ,其中 ByteArraySerialize 是 Kafka 默认使用的序列化器,其他的序列化器还有很多,你可以通过 这里 查看其他序列化器。要注意的一点:key.serializer 是必须要设置的,即使你打算只发送值的内容。

  • value.serializer

与 key.serializer 一样,value.serializer 指定的类会将值序列化。

下面代码演示了如何创建一个 Kafka 生产者,这里只指定了必要的属性,其他使用默认的配置

1
2
3
4
5
复制代码private Properties properties = new Properties();
properties.put("bootstrap.servers","broker1:9092,broker2:9092");
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties = new KafkaProducer<String,String>(properties);

来解释一下这段代码

  • 首先创建了一个 Properties 对象
  • 使用 StringSerializer 序列化器序列化 key / value 键值对
  • 在这里我们创建了一个新的生产者对象,并为键值设置了恰当的类型,然后把 Properties 对象传递给他。

Kafka 消息发送

实例化生产者对象后,接下来就可以开始发送消息了,发送消息主要由下面几种方式

简单消息发送

Kafka 最简单的消息发送如下:

1
2
3
4
复制代码ProducerRecord<String,String> record =
new ProducerRecord<String, String>("CustomerCountry","West","France");

producer.send(record);

代码中生产者(producer)的 send() 方法需要把 ProducerRecord 的对象作为参数进行发送,ProducerRecord 有很多构造函数,这个我们下面讨论,这里调用的是

1
复制代码public ProducerRecord(String topic, K key, V value) {}

这个构造函数,需要传递的是 topic主题,key 和 value。

把对应的参数传递完成后,生产者调用 send() 方法发送消息(ProducerRecord对象)。我们可以从生产者的架构图中看出,消息是先被写入分区中的缓冲区中,然后分批次发送给 Kafka Broker。

发送成功后,send() 方法会返回一个 Future(java.util.concurrent) 对象,Future 对象的类型是 RecordMetadata 类型,我们上面这段代码没有考虑返回值,所以没有生成对应的 Future 对象,所以没有办法知道消息是否发送成功。如果不是很重要的信息或者对结果不会产生影响的信息,可以使用这种方式进行发送。

我们可以忽略发送消息时可能发生的错误或者在服务器端可能发生的错误,但在消息发送之前,生产者还可能发生其他的异常。这些异常有可能是 SerializationException(序列化失败),BufferedExhaustedException 或 TimeoutException(说明缓冲区已满),又或是 InterruptedException(说明发送线程被中断)

同步发送消息

第二种消息发送机制如下所示

1
2
3
4
5
6
7
8
复制代码ProducerRecord<String,String> record =
new ProducerRecord<String, String>("CustomerCountry","West","France");

try{
RecordMetadata recordMetadata = producer.send(record).get();
}catch(Exception e){
e.printStackTrace();
}

这种发送消息的方式较上面的发送方式有了改进,首先调用 send() 方法,然后再调用 get() 方法等待 Kafka 响应。如果服务器返回错误,get() 方法会抛出异常,如果没有发生错误,我们会得到 RecordMetadata 对象,可以用它来查看消息记录。

生产者(KafkaProducer)在发送的过程中会出现两类错误:其中一类是重试错误,这类错误可以通过重发消息来解决。比如连接的错误,可以通过再次建立连接来解决;无主错误则可以通过重新为分区选举首领来解决。KafkaProducer 被配置为自动重试,如果多次重试后仍无法解决问题,则会抛出重试异常。另一类错误是无法通过重试来解决的,比如消息过大对于这类错误,KafkaProducer 不会进行重试,直接抛出异常。

异步发送消息

同步发送消息都有个问题,那就是同一时间只能有一个消息在发送,这会造成许多消息无法直接发送,造成消息滞后,无法发挥效益最大化。

比如消息在应用程序和 Kafka 集群之间一个来回需要 10ms。如果发送完每个消息后都等待响应的话,那么发送100个消息需要 1 秒,但是如果是异步方式的话,发送 100 条消息所需要的时间就会少很多很多。大多数时候,虽然Kafka 会返回 RecordMetadata 消息,但是我们并不需要等待响应。

为了在异步发送消息的同时能够对异常情况进行处理,生产者提供了回掉支持。下面是回调的一个例子

1
2
3
4
5
6
7
8
9
10
11
12
复制代码ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("CustomerCountry", "Huston", "America");
producer.send(producerRecord,new DemoProducerCallBack());


class DemoProducerCallBack implements Callback {

public void onCompletion(RecordMetadata metadata, Exception exception) {
if(exception != null){
exception.printStackTrace();;
}
}
}

首先实现回调需要定义一个实现了org.apache.kafka.clients.producer.Callback的类,这个接口只有一个 onCompletion方法。如果 kafka 返回一个错误,onCompletion 方法会抛出一个非空(non null)异常,这里我们只是简单的把它打印出来,如果是生产环境需要更详细的处理,然后在 send() 方法发送的时候传递一个 Callback 回调的对象。

生产者分区机制

Kafka 对于数据的读写是以分区为粒度的,分区可以分布在多个主机(Broker)中,这样每个节点能够实现独立的数据写入和读取,并且能够通过增加新的节点来增加 Kafka 集群的吞吐量,通过分区部署在多个 Broker 来实现负载均衡的效果。

上面我们介绍了生产者的发送方式有三种:不管结果如何直接发送、发送并返回结果、发送并回调。由于消息是存在主题(topic)的分区(partition)中的,所以当 Producer 生产者发送产生一条消息发给 topic 的时候,你如何判断这条消息会存在哪个分区中呢?

这其实就设计到 Kafka 的分区机制了。

分区策略

Kafka 的分区策略指的就是将生产者发送到哪个分区的算法。Kafka 为我们提供了默认的分区策略,同时它也支持你自定义分区策略。

如果要自定义分区策略的话,你需要显示配置生产者端的参数 Partitioner.class,我们可以看一下这个类它位于 org.apache.kafka.clients.producer 包下

1
2
3
4
5
6
7
8
复制代码public interface Partitioner extends Configurable, Closeable {

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

public void close();

default public void onNewBatch(String topic, Cluster cluster, int prevPartition) {}
}

Partitioner 类有三个方法,分别来解释一下

  • partition(): 这个类有几个参数: topic,表示需要传递的主题;key 表示消息中的键值;keyBytes表示分区中序列化过后的key,byte数组的形式传递;value 表示消息的 value 值;valueBytes 表示分区中序列化后的值数组;cluster表示当前集群的原数据。Kafka 给你这么多信息,就是希望让你能够充分地利用这些信息对消息进行分区,计算出它要被发送到哪个分区中。
  • close() : 继承了 Closeable 接口能够实现 close() 方法,在分区关闭时调用。
  • onNewBatch(): 表示通知分区程序用来创建新的批次

其中与分区策略息息相关的就是 partition() 方法了,分区策略有下面这几种

顺序轮询

顺序分配,消息是均匀的分配给每个 partition,即每个分区存储一次消息。就像下面这样

上图表示的就是轮询策略,轮训策略是 Kafka Producer 提供的默认策略,如果你不使用指定的轮训策略的话,Kafka 默认会使用顺序轮训策略的方式。

随机轮询

随机轮询简而言之就是随机的向 partition 中保存消息,如下图所示

实现随机分配的代码只需要两行,如下

1
2
复制代码List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size());

先计算出该主题总的分区数,然后随机地返回一个小于它的正整数。

本质上看随机策略也是力求将数据均匀地打散到各个分区,但从实际表现来看,它要逊于轮询策略,所以如果追求数据的均匀分布,还是使用轮询策略比较好。事实上,随机策略是老版本生产者使用的分区策略,在新版本中已经改为轮询了。

按照 key 进行消息保存

这个策略也叫做 key-ordering 策略,Kafka 中每条消息都会有自己的key,一旦消息被定义了 Key,那么你就可以保证同一个 Key 的所有消息都进入到相同的分区里面,由于每个分区下的消息处理都是有顺序的,故这个策略被称为按消息键保序策略,如下图所示

实现这个策略的 partition 方法同样简单,只需要下面两行代码即可:

1
2
复制代码List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return Math.abs(key.hashCode()) % partitions.size();

上面这几种分区策略都是比较基础的策略,除此之外,你还可以自定义分区策略。

生产者压缩机制

压缩一词简单来讲就是一种互换思想,它是一种经典的用 CPU 时间去换磁盘空间或者 I/O 传输量的思想,希望以较小的 CPU 开销带来更少的磁盘占用或更少的网络 I/O 传输。如果你还不了解的话我希望你先读完这篇文章 程序员需要了解的硬核知识之压缩算法,然后你就明白压缩是怎么回事了。

Kafka 压缩是什么

Kafka 的消息分为两层:消息集合 和 消息。一个消息集合中包含若干条日志项,而日志项才是真正封装消息的地方。Kafka 底层的消息日志由一系列消息集合日志项组成。Kafka 通常不会直接操作具体的一条条消息,它总是在消息集合这个层面上进行写入操作。

在 Kafka 中,压缩会发生在两个地方:Kafka Producer 和 Kafka Consumer,为什么启用压缩?说白了就是消息太大,需要变小一点 来使消息发的更快一些。

Kafka Producer 中使用 compression.type 来开启压缩

1
2
3
4
5
6
7
8
9
10
复制代码private Properties properties = new Properties();
properties.put("bootstrap.servers","192.168.1.9:9092");
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("compression.type", "gzip");

Producer<String,String> producer = new KafkaProducer<String, String>(properties);

ProducerRecord<String,String> record =
new ProducerRecord<String, String>("CustomerCountry","Precision Products","France");

上面代码表明该 Producer 的压缩算法使用的是 GZIP

有压缩必有解压缩,Producer 使用压缩算法压缩消息后并发送给服务器后,由 Consumer 消费者进行解压缩,因为采用的何种压缩算法是随着 key、value 一起发送过去的,所以消费者知道采用何种压缩算法。

Kafka 重要参数配置

在上一篇文章 带你涨姿势的认识一下kafka中,我们主要介绍了一下 kafka 集群搭建的参数,本篇文章我们来介绍一下 Kafka 生产者重要的配置,生产者有很多可配置的参数,在文档里(kafka.apache.org/documentati…

key.serializer

用于 key 键的序列化,它实现了 org.apache.kafka.common.serialization.Serializer 接口

value.serializer

用于 value 值的序列化,实现了 org.apache.kafka.common.serialization.Serializer 接口

acks

acks 参数指定了要有多少个分区副本接收消息,生产者才认为消息是写入成功的。此参数对消息丢失的影响较大

  • 如果 acks = 0,就表示生产者也不知道自己产生的消息是否被服务器接收了,它才知道它写成功了。如果发送的途中产生了错误,生产者也不知道,它也比较懵逼,因为没有返回任何消息。这就类似于 UDP 的运输层协议,只管发,服务器接受不接受它也不关心。
  • 如果 acks = 1,只要集群的 Leader 接收到消息,就会给生产者返回一条消息,告诉它写入成功。如果发送途中造成了网络异常或者 Leader 还没选举出来等其他情况导致消息写入失败,生产者会受到错误消息,这时候生产者往往会再次重发数据。因为消息的发送也分为 同步 和 异步,Kafka 为了保证消息的高效传输会决定是同步发送还是异步发送。如果让客户端等待服务器的响应(通过调用 Future 中的 get() 方法),显然会增加延迟,如果客户端使用回调,就会解决这个问题。
  • 如果 acks = all,这种情况下是只有当所有参与复制的节点都收到消息时,生产者才会接收到一个来自服务器的消息。不过,它的延迟比 acks =1 时更高,因为我们要等待不只一个服务器节点接收消息。

buffer.memory

此参数用来设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。如果应用程序发送消息的速度超过发送到服务器的速度,会导致生产者空间不足。这个时候,send() 方法调用要么被阻塞,要么抛出异常,具体取决于 block.on.buffer.null 参数的设置。

compression.type

此参数来表示生产者启用何种压缩算法,默认情况下,消息发送时不会被压缩。该参数可以设置为 snappy、gzip 和 lz4,它指定了消息发送给 broker 之前使用哪一种压缩算法进行压缩。下面是各压缩算法的对比

retries

生产者从服务器收到的错误有可能是临时性的错误(比如分区找不到首领),在这种情况下,reteis 参数的值决定了生产者可以重发的消息次数,如果达到这个次数,生产者会放弃重试并返回错误。默认情况下,生产者在每次重试之间等待 100ms,这个等待参数可以通过 retry.backoff.ms 进行修改。

batch.size

当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。当批次被填满,批次里的所有消息会被发送出去。不过生产者井不一定都会等到批次被填满才发送,任意条数的消息都可能被发送。

client.id

此参数可以是任意的字符串,服务器会用它来识别消息的来源,一般配置在日志里

max.in.flight.requests.per.connection

此参数指定了生产者在收到服务器响应之前可以发送多少消息,它的值越高,就会占用越多的内存,不过也会提高吞吐量。把它设为1 可以保证消息是按照发送的顺序写入服务器。

timeout.ms、request.timeout.ms 和 metadata.fetch.timeout.ms

request.timeout.ms 指定了生产者在发送数据时等待服务器返回的响应时间,metadata.fetch.timeout.ms 指定了生产者在获取元数据(比如目标分区的首领是谁)时等待服务器返回响应的时间。如果等待时间超时,生产者要么重试发送数据,要么返回一个错误。timeout.ms 指定了 broker 等待同步副本返回消息确认的时间,与 asks 的配置相匹配—-如果在指定时间内没有收到同步副本的确认,那么 broker 就会返回一个错误。

max.block.ms

此参数指定了在调用 send() 方法或使用 partitionFor() 方法获取元数据时生产者的阻塞时间当生产者的发送缓冲区已捕,或者没有可用的元数据时,这些方法就会阻塞。在阻塞时间达到 max.block.ms 时,生产者会抛出超时异常。

max.request.size

该参数用于控制生产者发送的请求大小。它可以指能发送的单个消息的最大值,也可以指单个请求里所有消息的总大小。

receive.buffer.bytes 和 send.buffer.bytes

Kafka 是基于 TCP 实现的,为了保证可靠的消息传输,这两个参数分别指定了 TCP Socket 接收和发送数据包的缓冲区的大小。如果它们被设置为 -1,就使用操作系统的默认值。如果生产者或消费者与 broker 处于不同的数据中心,那么可以适当增大这些值。

Kafka Consumer

应用程序使用 KafkaConsumer 从 Kafka 中订阅主题并接收来自这些主题的消息,然后再把他们保存起来。应用程序首先需要创建一个 KafkaConsumer 对象,订阅主题并开始接受消息,验证消息并保存结果。一段时间后,生产者往主题写入的速度超过了应用程序验证数据的速度,这时候该如何处理?如果只使用单个消费者的话,应用程序会跟不上消息生成的速度,就像多个生产者像相同的主题写入消息一样,这时候就需要多个消费者共同参与消费主题中的消息,对消息进行分流处理。

Kafka 消费者从属于消费者群组。一个群组中的消费者订阅的都是相同的主题,每个消费者接收主题一部分分区的消息。下面是一个 Kafka 分区消费示意图

上图中的主题 T1 有四个分区,分别是分区0、分区1、分区2、分区3,我们创建一个消费者群组1,消费者群组中只有一个消费者,它订阅主题T1,接收到 T1 中的全部消息。由于一个消费者处理四个生产者发送到分区的消息,压力有些大,需要帮手来帮忙分担任务,于是就演变为下图

这样一来,消费者的消费能力就大大提高了,但是在某些环境下比如用户产生消息特别多的时候,生产者产生的消息仍旧让消费者吃不消,那就继续增加消费者。

如上图所示,每个分区所产生的消息能够被每个消费者群组中的消费者消费,如果向消费者群组中增加更多的消费者,那么多余的消费者将会闲置,如下图所示

向群组中增加消费者是横向伸缩消费能力的主要方式。总而言之,我们可以通过增加消费组的消费者来进行水平扩展提升消费能力。这也是为什么建议创建主题时使用比较多的分区数,这样可以在消费负载高的情况下增加消费者来提升性能。另外,消费者的数量不应该比分区数多,因为多出来的消费者是空闲的,没有任何帮助。

Kafka 一个很重要的特性就是,只需写入一次消息,可以支持任意多的应用读取这个消息。换句话说,每个应用都可以读到全量的消息。为了使得每个应用都能读到全量消息,应用需要有不同的消费组。对于上面的例子,假如我们新增了一个新的消费组 G2,而这个消费组有两个消费者,那么就演变为下图这样

在这个场景中,消费组 G1 和消费组 G2 都能收到 T1 主题的全量消息,在逻辑意义上来说它们属于不同的应用。

总结起来就是如果应用需要读取全量消息,那么请为该应用设置一个消费组;如果该应用消费能力不足,那么可以考虑在这个消费组里增加消费者。

消费者组和分区重平衡

消费者组是什么

消费者组(Consumer Group)是由一个或多个消费者实例(Consumer Instance)组成的群组,具有可扩展性和可容错性的一种机制。消费者组内的消费者共享一个消费者组ID,这个ID 也叫做 Group ID,组内的消费者共同对一个主题进行订阅和消费,同一个组中的消费者只能消费一个分区的消息,多余的消费者会闲置,派不上用场。

我们在上面提到了两种消费方式

  • 一个消费者群组消费一个主题中的消息,这种消费模式又称为点对点的消费方式,点对点的消费方式又被称为消息队列
  • 一个主题中的消息被多个消费者群组共同消费,这种消费模式又称为发布-订阅模式

消费者重平衡

我们从上面的消费者演变图中可以知道这么一个过程:最初是一个消费者订阅一个主题并消费其全部分区的消息,后来有一个消费者加入群组,随后又有更多的消费者加入群组,而新加入的消费者实例分摊了最初消费者的部分消息,这种把分区的所有权通过一个消费者转到其他消费者的行为称为重平衡,英文名也叫做 Rebalance 。如下图所示

重平衡非常重要,它为消费者群组带来了高可用性 和 伸缩性,我们可以放心的添加消费者或移除消费者,不过在正常情况下我们并不希望发生这样的行为。在重平衡期间,消费者无法读取消息,造成整个消费者组在重平衡的期间都不可用。另外,当分区被重新分配给另一个消费者时,消息当前的读取状态会丢失,它有可能还需要去刷新缓存,在它重新恢复状态之前会拖慢应用程序。

消费者通过向组织协调者(Kafka Broker)发送心跳来维护自己是消费者组的一员并确认其拥有的分区。对于不同不的消费群体来说,其组织协调者可以是不同的。只要消费者定期发送心跳,就会认为消费者是存活的并处理其分区中的消息。当消费者检索记录或者提交它所消费的记录时就会发送心跳。

如果过了一段时间 Kafka 停止发送心跳了,会话(Session)就会过期,组织协调者就会认为这个 Consumer 已经死亡,就会触发一次重平衡。如果消费者宕机并且停止发送消息,组织协调者会等待几秒钟,确认它死亡了才会触发重平衡。在这段时间里,死亡的消费者将不处理任何消息。在清理消费者时,消费者将通知协调者它要离开群组,组织协调者会触发一次重平衡,尽量降低处理停顿。

重平衡是一把双刃剑,它为消费者群组带来高可用性和伸缩性的同时,还有有一些明显的缺点(bug),而这些 bug 到现在社区还无法修改。

重平衡的过程对消费者组有极大的影响。因为每次重平衡过程中都会导致万物静止,参考 JVM 中的垃圾回收机制,也就是 Stop The World ,STW,(引用自《深入理解 Java 虚拟机》中 p76 关于 Serial 收集器的描述):

更重要的是它在进行垃圾收集时,必须暂停其他所有的工作线程。直到它收集结束。Stop The World 这个名字听起来很帅,但这项工作实际上是由虚拟机在后台自动发起并完成的,在用户不可见的情况下把用户正常工作的线程全部停掉,这对很多应用来说都是难以接受的。

也就是说,在重平衡期间,消费者组中的消费者实例都会停止消费,等待重平衡的完成。而且重平衡这个过程很慢……

创建消费者

上面的理论说的有点多,下面就通过代码来讲解一下消费者是如何消费的

在读取消息之前,需要先创建一个 KafkaConsumer 对象。创建 KafkaConsumer 对象与创建 KafkaProducer 对象十分相似 — 把需要传递给消费者的属性放在 properties 对象中,后面我们会着重讨论 Kafka 的一些配置,这里我们先简单的创建一下,使用3个属性就足矣,分别是 bootstrap.server,key.deserializer,value.deserializer 。

这三个属性我们已经用过很多次了,如果你还不是很清楚的话,可以参考 带你涨姿势是认识一下Kafka Producer

还有一个属性是 group.id 这个属性不是必须的,它指定了 KafkaConsumer 是属于哪个消费者群组。创建不属于任何一个群组的消费者也是可以的

1
2
3
复制代码Properties properties = new Properties();
properties.put("bootstrap.server","192.168.1.9:9092"); properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);

主题订阅

创建好消费者之后,下一步就开始订阅主题了。subscribe() 方法接受一个主题列表作为参数,使用起来比较简单

1
复制代码consumer.subscribe(Collections.singletonList("customerTopic"));

为了简单我们只订阅了一个主题 customerTopic,参数传入的是一个正则表达式,正则表达式可以匹配多个主题,如果有人创建了新的主题,并且主题的名字与正则表达式相匹配,那么会立即触发一次重平衡,消费者就可以读取新的主题。

要订阅所有与 test 相关的主题,可以这样做

1
复制代码consumer.subscribe("test.*");

轮询

我们知道,Kafka 是支持订阅/发布模式的,生产者发送数据给 Kafka Broker,那么消费者是如何知道生产者发送了数据呢?其实生产者产生的数据消费者是不知道的,KafkaConsumer 采用轮询的方式定期去 Kafka Broker 中进行数据的检索,如果有数据就用来消费,如果没有就再继续轮询等待,下面是轮询等待的具体实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
复制代码try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(100));
for (ConsumerRecord<String, String> record : records) {
int updateCount = 1;
if (map.containsKey(record.value())) {
updateCount = (int) map.get(record.value() + 1);
}
map.put(record.value(), updateCount);
}
}
}finally {
consumer.close();
}
  • 这是一个无限循环。消费者实际上是一个长期运行的应用程序,它通过轮询的方式向 Kafka 请求数据。
  • 第三行代码非常重要,Kafka 必须定期循环请求数据,否则就会认为该 Consumer 已经挂了,会触发重平衡,它的分区会移交给群组中的其它消费者。传给 poll() 方法的是一个超市时间,用 java.time.Duration 类来表示,如果该参数被设置为 0 ,poll() 方法会立刻返回,否则就会在指定的毫秒数内一直等待 broker 返回数据。
  • poll() 方法会返回一个记录列表。每条记录都包含了记录所属主题的信息,记录所在分区的信息、记录在分区中的偏移量,以及记录的键值对。我们一般会遍历这个列表,逐条处理每条记录。
  • 在退出应用程序之前使用 close() 方法关闭消费者。网络连接和 socket 也会随之关闭,并立即触发一次重平衡,而不是等待群组协调器发现它不再发送心跳并认定它已经死亡。

线程安全性

在同一个群组中,我们无法让一个线程运行多个消费者,也无法让多个线程安全的共享一个消费者。按照规则,一个消费者使用一个线程,如果一个消费者群组中多个消费者都想要运行的话,那么必须让每个消费者在自己的线程中运行,可以使用 Java 中的 ExecutorService 启动多个消费者进行进行处理。

消费者配置

到目前为止,我们学习了如何使用消费者 API,不过只介绍了几个最基本的属性,Kafka 文档列出了所有与消费者相关的配置说明。大部分参数都有合理的默认值,一般不需要修改它们,下面我们就来介绍一下这些参数。

  • fetch.min.bytes

该属性指定了消费者从服务器获取记录的最小字节数。broker 在收到消费者的数据请求时,如果可用的数据量小于 fetch.min.bytes 指定的大小,那么它会等到有足够的可用数据时才把它返回给消费者。这样可以降低消费者和 broker 的工作负载,因为它们在主题使用频率不是很高的时候就不用来回处理消息。如果没有很多可用数据,但消费者的 CPU 使用率很高,那么就需要把该属性的值设得比默认值大。如果消费者的数量比较多,把该属性的值调大可以降低 broker 的工作负载。

  • fetch.max.wait.ms

我们通过上面的 fetch.min.bytes 告诉 Kafka,等到有足够的数据时才会把它返回给消费者。而 fetch.max.wait.ms 则用于指定 broker 的等待时间,默认是 500 毫秒。如果没有足够的数据流入 kafka 的话,消费者获取的最小数据量要求就得不到满足,最终导致 500 毫秒的延迟。如果要降低潜在的延迟,就可以把参数值设置的小一些。如果 fetch.max.wait.ms 被设置为 100 毫秒的延迟,而 fetch.min.bytes 的值设置为 1MB,那么 Kafka 在收到消费者请求后,要么返回 1MB 的数据,要么在 100 ms 后返回所有可用的数据。就看哪个条件首先被满足。

  • max.partition.fetch.bytes

该属性指定了服务器从每个分区里返回给消费者的最大字节数。它的默认值时 1MB,也就是说,KafkaConsumer.poll() 方法从每个分区里返回的记录最多不超过 max.partition.fetch.bytes 指定的字节。如果一个主题有20个分区和5个消费者,那么每个消费者需要至少4 MB的可用内存来接收记录。在为消费者分配内存时,可以给它们多分配一些,因为如果群组里有消费者发生崩溃,剩下的消费者需要处理更多的分区。max.partition.fetch.bytes 的值必须比 broker 能够接收的最大消息的字节数(通过 max.message.size 属性配置大),否则消费者可能无法读取这些消息,导致消费者一直挂起重试。 在设置该属性时,另外一个考量的因素是消费者处理数据的时间。消费者需要频繁的调用 poll() 方法来避免会话过期和发生分区再平衡,如果单次调用poll() 返回的数据太多,消费者需要更多的时间进行处理,可能无法及时进行下一个轮询来避免会话过期。如果出现这种情况,可以把 max.partition.fetch.bytes 值改小,或者延长会话过期时间。

  • session.timeout.ms

这个属性指定了消费者在被认为死亡之前可以与服务器断开连接的时间,默认是 3s。如果消费者没有在 session.timeout.ms 指定的时间内发送心跳给群组协调器,就会被认定为死亡,协调器就会触发重平衡。把它的分区分配给消费者群组中的其它消费者,此属性与 heartbeat.interval.ms 紧密相关。heartbeat.interval.ms 指定了 poll() 方法向群组协调器发送心跳的频率,session.timeout.ms 则指定了消费者可以多久不发送心跳。所以,这两个属性一般需要同时修改,heartbeat.interval.ms 必须比 session.timeout.ms 小,一般是 session.timeout.ms 的三分之一。如果 session.timeout.ms 是 3s,那么 heartbeat.interval.ms 应该是 1s。把 session.timeout.ms 值设置的比默认值小,可以更快地检测和恢复崩愤的节点,不过长时间的轮询或垃圾收集可能导致非预期的重平衡。把该属性的值设置得大一些,可以减少意外的重平衡,不过检测节点崩溃需要更长的时间。

  • auto.offset.reset

该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下的该如何处理。它的默认值是 latest,意思指的是,在偏移量无效的情况下,消费者将从最新的记录开始读取数据。另一个值是 earliest,意思指的是在偏移量无效的情况下,消费者将从起始位置处开始读取分区的记录。

  • enable.auto.commit

我们稍后将介绍几种不同的提交偏移量的方式。该属性指定了消费者是否自动提交偏移量,默认值是 true,为了尽量避免出现重复数据和数据丢失,可以把它设置为 false,由自己控制何时提交偏移量。如果把它设置为 true,还可以通过 auto.commit.interval.ms 属性来控制提交的频率

  • partition.assignment.strategy

我们知道,分区会分配给群组中的消费者。PartitionAssignor 会根据给定的消费者和主题,决定哪些分区应该被分配给哪个消费者,Kafka 有两个默认的分配策略Range 和 RoundRobin

  • client.id

该属性可以是任意字符串,broker 用他来标识从客户端发送过来的消息,通常被用在日志、度量指标和配额中

  • max.poll.records

该属性用于控制单次调用 call() 方法能够返回的记录数量,可以帮你控制在轮询中需要处理的数据量。

  • receive.buffer.bytes 和 send.buffer.bytes

socket 在读写数据时用到的 TCP 缓冲区也可以设置大小。如果它们被设置为 -1,就使用操作系统默认值。如果生产者或消费者与 broker 处于不同的数据中心内,可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。

提交和偏移量的概念

特殊偏移

我们上面提到,消费者在每次调用poll() 方法进行定时轮询的时候,会返回由生产者写入 Kafka 但是还没有被消费者消费的记录,因此我们可以追踪到哪些记录是被群组里的哪个消费者读取的。消费者可以使用 Kafka 来追踪消息在分区中的位置(偏移量)

消费者会向一个叫做 _consumer_offset 的特殊主题中发送消息,这个主题会保存每次所发送消息中的分区偏移量,这个主题的主要作用就是消费者触发重平衡后记录偏移使用的,消费者每次向这个主题发送消息,正常情况下不触发重平衡,这个主题是不起作用的,当触发重平衡后,消费者停止工作,每个消费者可能会分到对应的分区,这个主题就是让消费者能够继续处理消息所设置的。

如果提交的偏移量小于客户端最后一次处理的偏移量,那么位于两个偏移量之间的消息就会被重复处理

如果提交的偏移量大于最后一次消费时的偏移量,那么处于两个偏移量中间的消息将会丢失

既然_consumer_offset 如此重要,那么它的提交方式是怎样的呢?下面我们就来说一下####提交方式

KafkaConsumer API 提供了多种方式来提交偏移量

自动提交

最简单的方式就是让消费者自动提交偏移量。如果 enable.auto.commit 被设置为true,那么每过 5s,消费者会自动把从 poll() 方法轮询到的最大偏移量提交上去。提交时间间隔由 auto.commit.interval.ms 控制,默认是 5s。与消费者里的其他东西一样,自动提交也是在轮询中进行的。消费者在每次轮询中会检查是否提交该偏移量了,如果是,那么就会提交从上一次轮询中返回的偏移量。

提交当前偏移量

把 auto.commit.offset 设置为 false,可以让应用程序决定何时提交偏移量。使用 commitSync() 提交偏移量。这个 API 会提交由 poll() 方法返回的最新偏移量,提交成功后马上返回,如果提交失败就抛出异常。

commitSync() 将会提交由 poll() 返回的最新偏移量,如果处理完所有记录后要确保调用了 commitSync(),否则还是会有丢失消息的风险,如果发生了在均衡,从最近一批消息到发生在均衡之间的所有消息都将被重复处理。

异步提交

异步提交 commitAsync() 与同步提交 commitSync() 最大的区别在于异步提交不会进行重试,同步提交会一致进行重试。

同步和异步组合提交

一般情况下,针对偶尔出现的提交失败,不进行重试不会有太大的问题,因为如果提交失败是因为临时问题导致的,那么后续的提交总会有成功的。但是如果在关闭消费者或再均衡前的最后一次提交,就要确保提交成功。

因此,在消费者关闭之前一般会组合使用commitAsync和commitSync提交偏移量。

提交特定的偏移量

消费者API允许调用 commitSync() 和 commitAsync() 方法时传入希望提交的 partition 和 offset 的 map,即提交特定的偏移量。

下面为自己做个宣传,欢迎关注公众号 Java建设者,号主是Java技术栈,热爱技术,喜欢阅读,热衷于分享和总结,希望能把每一篇好文章分享给成长道路上的你。
关注公众号回复 002 领取为你特意准备的大礼包,你一定会喜欢并收藏的。

文章参考:

Kafka史上最详细原理总结

《Kafka 权威指南》

kafka.apache.org/

kafka.apache.org/documentati…

www.tutorialkart.com/apache-kafk…

dzone.com/articles/wh…

《极客时间 - Kafka 核心技术与实战》

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

从硬件入手深入理解epoll 的本质 一、从网卡接收数据说起

发表于 2019-11-28

这篇文章从硬件开始说起,从根上理解数据传输的过程,还不错,收藏起来。

原文链接

从事服务端开发,少不了要接触网络编程。epoll 作为 Linux 下高性能网络服务器的必备技术至关重要,nginx、Redis、Skynet 和大部分游戏服务器都使用到这一多路复用技术。

epoll 很重要,但是 epoll 与 select 的区别是什么呢?epoll 高效的原因是什么?

网上虽然也有不少讲解 epoll 的文章,但要么是过于浅显,或者陷入源码解析,很少能有通俗易懂的。笔者于是决定编写此文,让缺乏专业背景知识的读者也能够明白 epoll 的原理。

文章核心思想是:要让读者清晰明白 epoll 为什么性能好。

本文会从网卡接收数据的流程讲起,串联起 CPU 中断、操作系统进程调度等知识;再一步步分析阻塞接收数据、select 到 epoll 的进化过程;最后探究 epoll 的实现细节。

一、从网卡接收数据说起

下边是一个典型的计算机结构图,计算机由 CPU、存储器(内存)与网络接口等部件组成,了解 epoll 本质的第一步,要从硬件的角度看计算机怎样接收网络数据。

下图展示了网卡接收数据的过程。

  • 在 ① 阶段,网卡收到网线传来的数据;
  • 经过 ② 阶段的硬件电路的传输;
  • 最终 ③ 阶段将数据写入到内存中的某个地址上。

这个过程涉及到 DMA 传输、IO 通路选择等硬件有关的知识,但我们只需知道:网卡会把接收到的数据写入内存。

网卡接收数据的过程

通过硬件传输,网卡接收的数据存放到内存中,操作系统就可以去读取它们。

二、如何知道接收了数据?

了解 epoll 本质的第二步,要从 CPU 的角度来看数据接收。理解这个问题,要先了解一个概念——中断。

计算机执行程序时,会有优先级的需求。比如,当计算机收到断电信号时,它应立即去保存数据,保存数据的程序具有较高的优先级(电容可以保存少许电量,供 CPU 运行很短的一小段时间)。

一般而言,由硬件产生的信号需要 CPU 立马做出回应,不然数据可能就丢失了,所以它的优先级很高。CPU 理应中断掉正在执行的程序,去做出响应;当 CPU 完成对硬件的响应后,再重新执行用户程序。中断的过程如下图,它和函数调用差不多,只不过函数调用是事先定好位置,而中断的位置由“信号”决定。

中断程序调用

以键盘为例,当用户按下键盘某个按键时,键盘会给 CPU 的中断引脚发出一个高电平,CPU 能够捕获这个信号,然后执行键盘中断程序。下图展示了各种硬件通过中断与 CPU 交互的过程。

现在可以回答“如何知道接收了数据?”这个问题了:当网卡把数据写入到内存后,网卡向 CPU 发出一个中断信号,操作系统便能得知有新数据到来,再通过网卡中断程序去处理数据。
三、进程阻塞为什么不占用 CPU 资源?
====================

了解 epoll 本质的第三步,要从操作系统进程调度的角度来看数据接收。阻塞是进程调度的关键一环,指的是进程在等待某事件(如接收到网络数据)发生之前的等待状态,recv、select 和 epoll 都是阻塞方法。下边分析一下进程阻塞为什么不占用 CPU 资源?

为简单起见,我们从普通的 recv 接收开始分析,先看看下面代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
复制代码
//创建socket
int s = socket(AF_INET, SOCK_STREAM, 0);
//绑定
bind(s, ...)
//监听
listen(s, ...)
//接受客户端连接
int c = accept(s, ...)
//接收客户端数据
recv(c, ...);
//将数据打印出来
printf(...)

这是一段最基础的网络编程代码,先新建 socket 对象,依次调用 bind、listen 与 accept,最后调用 recv 接收数据。recv 是个阻塞方法,当程序运行到 recv 时,它会一直等待,直到接收到数据才往下执行。

那么阻塞的原理是什么?

工作队列

操作系统为了支持多任务,实现了进程调度的功能,会把进程分为“运行”和“等待”等几种状态。运行状态是进程获得 CPU 使用权,正在执行代码的状态;等待状态是阻塞状态,比如上述程序运行到 recv 时,程序会从运行状态变为等待状态,接收到数据后又变回运行状态。操作系统会分时执行各个运行状态的进程,由于速度很快,看上去就像是同时执行多个任务。

下图的计算机中运行着 A、B 与 C 三个进程,其中进程 A 执行着上述基础网络程序,一开始,这 3 个进程都被操作系统的工作队列所引用,处于运行状态,会分时执行。

等待队列

当进程 A 执行到创建 socket 的语句时,操作系统会创建一个由文件系统管理的 socket 对象(如下图)。这个 socket 对象包含了发送缓冲区、接收缓冲区与等待队列等成员。等待队列是个非常重要的结构,它指向所有需要等待该 socket 事件的进程。

当程序执行到 recv 时,操作系统会将进程 A 从工作队列移动到该 socket 的等待队列中(如下图)。由于工作队列只剩下了进程 B 和 C,依据进程调度,CPU 会轮流执行这两个进程的程序,不会执行进程 A 的程序。所以进程 A 被阻塞,不会往下执行代码,也不会占用 CPU 资源。

注:操作系统添加等待队列只是添加了对这个“等待中”进程的引用,以便在接收到数据时获取进程对象、将其唤醒,而非直接将进程管理纳入自己之下。上图为了方便说明,直接将进程挂到等待队列之下。

唤醒进程

当 socket 接收到数据后,操作系统将该 socket 等待队列上的进程重新放回到工作队列,该进程变成运行状态,继续执行代码。同时由于 socket 的接收缓冲区已经有了数据,recv 可以返回接收到的数据。

四、内核接收网络数据全过程

这一步,贯穿网卡、中断与进程调度的知识,叙述阻塞 recv 下,内核接收数据的全过程。

如下图所示,进程在 recv 阻塞期间,计算机收到了对端传送的数据(步骤①),数据经由网卡传送到内存(步骤②),然后网卡通过中断信号通知 CPU 有数据到达,CPU 执行中断程序(步骤③)。

此处的中断程序主要有两项功能,先将网络数据写入到对应 socket 的接收缓冲区里面(步骤④),再唤醒进程 A(步骤⑤),重新将进程 A 放入工作队列中。

内核接收数据全过程

唤醒进程的过程如下图所示:

唤醒进程

以上是内核接收数据全过程,这里我们可能会思考两个问题:

  • 其一,操作系统如何知道网络数据对应于哪个 socket?
  • 其二,如何同时监视多个 socket 的数据?
    第一个问题:因为一个 socket 对应着一个端口号,而网络数据包中包含了 ip 和端口的信息,内核可以通过端口号找到对应的 socket。当然,为了提高处理速度,操作系统会维护端口号到 socket 的索引结构,以快速读取。

第二个问题是多路复用的重中之重,也正是本文后半部分的重点。

五、同时监视多个 socket 的简单方法

服务端需要管理多个客户端连接,而 recv 只能监视单个 socket,这种矛盾下,人们开始寻找监视多个 socket 的方法。epoll 的要义就是高效地监视多个 socket。

从历史发展角度看,必然先出现一种不太高效的方法,人们再加以改进,正如 select 之于 epoll。

先理解不太高效的 select,才能够更好地理解 epoll 的本质。

假如能够预先传入一个 socket 列表,如果列表中的 socket 都没有数据,挂起进程,直到有一个 socket 收到数据,唤醒进程。这种方法很直接,也是 select 的设计思想。

为方便理解,我们先复习 select 的用法。在下边的代码中,先准备一个数组 fds,让 fds 存放着所有需要监视的 socket。然后调用 select,如果 fds 中的所有 socket 都没有数据,select 会阻塞,直到有一个 socket 接收到数据,select 返回,唤醒进程。用户可以遍历 fds,通过 FD_ISSET 判断具体哪个 socket 收到数据,然后做出处理。

1
2
3
4
5
6
7
8
9
10
11
复制代码int s = socket(AF_INET, SOCK_STREAM, 0);  
bind(s, ...);
listen(s, ...);
int fds[] = 存放需要监听的socket;
while(1){
int n = select(..., fds, ...)
for(int i=0; i < fds.count; i++){
if(FD_ISSET(fds[i], ...)){
//fds[i]的数据处理
}
}}

select 的流程

select 的实现思路很直接,假如程序同时监视如下图的 sock1、sock2 和 sock3 三个 socket,那么在调用 select 之后,操作系统把进程 A 分别加入这三个 socket 的等待队列中。

操作系统把进程 A 分别加入这三个 socket 的等待队列中

当任何一个 socket 收到数据后,中断程序将唤起进程。下图展示了 sock2 接收到了数据的处理流程:

注:recv 和 select 的中断回调可以设置成不同的内容。

sock2 接收到了数据,中断程序唤起进程 A

所谓唤起进程,就是将进程从所有的等待队列中移除,加入到工作队列里面,如下图所示:

将进程 A 从所有等待队列中移除,再加入到工作队列里面

经由这些步骤,当进程 A 被唤醒后,它知道至少有一个 socket 接收了数据。程序只需遍历一遍 socket 列表,就可以得到就绪的 socket。

这种简单方式行之有效,在几乎所有操作系统都有对应的实现。

但是简单的方法往往有缺点,主要是:

其一,每次调用 select 都需要将进程加入到所有监视 socket 的等待队列,每次唤醒都需要从每个队列中移除。这里涉及了两次遍历,而且每次都要将整个 fds 列表传递给内核,有一定的开销。正是因为遍历操作开销大,出于效率的考量,才会规定 select 的最大监视数量,默认只能监视 1024 个 socket。

其二,进程被唤醒后,程序并不知道哪些 socket 收到数据,还需要遍历一次。

那么,有没有减少遍历的方法?有没有保存就绪 socket 的方法?这两个问题便是 epoll 技术要解决的。

补充说明: 本节只解释了 select 的一种情形。当程序调用 select 时,内核会先遍历一遍 socket,如果有一个以上的 socket 接收缓冲区有数据,那么 select 直接返回,不会阻塞。这也是为什么 select 的返回值有可能大于 1 的原因之一。如果没有 socket 有数据,进程才会阻塞。

六、epoll 的设计思路

epoll 是在 select 出现 N 多年后才被发明的,是 select 和 poll(poll 和 select 基本一样,有少量改进)的增强版本。epoll 通过以下一些措施来改进效率:

措施一:功能分离

select 低效的原因之一是将“维护等待队列”和“阻塞进程”两个步骤合二为一。如下图所示,每次调用 select 都需要这两步操作,然而大多数应用场景中,需要监视的 socket 相对固定,并不需要每次都修改。epoll 将这两个操作分开,先用 epoll_ctl 维护等待队列,再调用 epoll_wait 阻塞进程。显而易见地,效率就能得到提升。

相比 select,epoll 拆分了功能
为方便理解后续的内容,我们先了解一下 epoll 的用法。如下的代码中,先用 epoll_create 创建一个 epoll 对象 epfd,再通过 epoll_ctl 将需要监视的 socket 添加到 epfd 中,最后调用 epoll_wait 等待数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
复制代码int s = socket(AF_INET, SOCK_STREAM, 0);   
bind(s, ...)
listen(s, ...)

int epfd = epoll_create(...);
epoll_ctl(epfd, ...); //将所有需要监听的socket添加到epfd中

while(1){
int n = epoll_wait(...)
for(接收到数据的socket){
//处理
}
}

功能分离,使得 epoll 有了优化的可能。

###措施二:就绪列表

select 低效的另一个原因在于程序不知道哪些 socket 收到数据,只能一个个遍历。如果内核维护一个“就绪列表”,引用收到数据的 socket,就能避免遍历。如下图所示,计算机共有三个 socket,收到数据的 sock2 和 sock3 被就绪列表 rdlist 所引用。当进程被唤醒后,只要获取 rdlist 的内容,就能够知道哪些 socket 收到数据。

就绪列表示意图

七、epoll 的原理与工作流程

本节会以示例和图表来讲解 epoll 的原理和工作流程。

创建 epoll 对象

如下图所示,当某个进程调用 epoll_create 方法时,内核会创建一个 eventpoll 对象(也就是程序中 epfd 所代表的对象)。eventpoll 对象也是文件系统中的一员,和 socket 一样,它也会有等待队列。

内核创建 eventpoll 对象

创建一个代表该 epoll 的 eventpoll 对象是必须的,因为内核要维护“就绪列表”等数据,“就绪列表”可以作为 eventpoll 的成员。

维护监视列表

创建 epoll 对象后,可以用 epoll_ctl 添加或删除所要监听的 socket。以添加 socket 为例,如下图,如果通过 epoll_ctl 添加 sock1、sock2 和 sock3 的监视,内核会将 eventpoll 添加到这三个 socket 的等待队列中。

添加所要监听的 socket

当 socket 收到数据后,中断程序会操作 eventpoll 对象,而不是直接操作进程。

接收数据

当 socket 收到数据后,中断程序会给 eventpoll 的“就绪列表”添加 socket 引用。如下图展示的是 sock2 和 sock3 收到数据后,中断程序让 rdlist 引用这两个 socket。

给就绪列表添加引用

eventpoll 对象相当于 socket 和进程之间的中介,socket 的数据接收并不直接影响进程,而是通过改变 eventpoll 的就绪列表来改变进程状态。

当程序执行到 epoll_wait 时,如果 rdlist 已经引用了 socket,那么 epoll_wait 直接返回,如果 rdlist 为空,阻塞进程。

阻塞和唤醒进程

假设计算机中正在运行进程 A 和进程 B,在某时刻进程 A 运行到了 epoll_wait 语句。如下图所示,内核会将进程 A 放入 eventpoll 的等待队列中,阻塞进程。

epoll_wait 阻塞进程

当 socket 接收到数据,中断程序一方面修改 rdlist,另一方面唤醒 eventpoll 等待队列中的进程,进程 A 再次进入运行状态(如下图)。也因为 rdlist 的存在,进程 A 可以知道哪些 socket 发生了变化。

epoll 唤醒进程

八、epoll 的实现细节

至此,相信读者对 epoll 的本质已经有一定的了解。但我们还需要知道 eventpoll 的数据结构是什么样子?

此外,就绪队列应该应使用什么数据结构?eventpoll 应使用什么数据结构来管理通过 epoll_ctl 添加或删除的 socket?

如下图所示,eventpoll 包含了 lock、mtx、wq(等待队列)与 rdlist 等成员,其中 rdlist 和 rbr 是我们所关心的。

epoll 原理示意图,图片来源:《深入理解Nginx:模块开发与架构解析(第二版)》,陶辉

就绪列表的数据结构

就绪列表引用着就绪的 socket,所以它应能够快速的插入数据。

程序可能随时调用 epoll_ctl 添加监视 socket,也可能随时删除。当删除时,若该 socket 已经存放在就绪列表中,它也应该被移除。所以就绪列表应是一种能够快速插入和删除的数据结构。

双向链表就是这样一种数据结构,epoll 使用双向链表来实现就绪队列(对应上图的 rdllist)。

索引结构

既然 epoll 将“维护监视队列”和“进程阻塞”分离,也意味着需要有个数据结构来保存监视的 socket,至少要方便地添加和移除,还要便于搜索,以避免重复添加。红黑树是一种自平衡二叉查找树,搜索、插入和删除时间复杂度都是O(log(N)),效率较好,epoll 使用了红黑树作为索引结构(对应上图的 rbr)。

注:因为操作系统要兼顾多种功能,以及由更多需要保存的数据,rdlist 并非直接引用 socket,而是通过 epitem 间接引用,红黑树的节点也是 epitem 对象。同样,文件系统也并非直接引用着 socket。为方便理解,本文中省略了一些间接结构。

九、小结

epoll 在 select 和 poll 的基础上引入了 eventpoll 作为中间层,使用了先进的数据结构,是一种高效的多路复用技术。这里也以表格形式简单对比一下 select、poll 与 epoll,结束此文。希望读者能有所收获。

oscimg.oschina.net/oscnet/7159…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

一步一步地了解SpringBean的生命周期 SpringB

发表于 2019-11-27

SpringBean的生命周期

在面试中,我们经常会被问到一个问题,就是SpringBean的生命周期。用大白话说,就是说其在创造到销毁按顺序调用了什么方法,在我刚开始学了,一般就是对着标准答案去硬背,完全不了解其意思,也十分容易忘记。

流程图如下

这样看上去十分的复杂,记住也很容易忘掉。

所以,我下面会用代码的方式一步步来模拟SpringBean的工作流程,做到深入了解,这样就不会再次忘记这个知识点了。

既然都说Bean对象Bean对象,那么SpringBean自然也是一个对象了,我们用一个简单的对象来说明,那么要创建对象就需要有构造方法,对象还会有它的属性跟get、set方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
复制代码    private String field;

public SpringBean() {
System.out.println("SpringBean 构造方法");
}

public String getField() {
System.out.println("SpringBean get方法");
return field;
}

public void setField(String field) {
System.out.println("SpringBean set方法");
this.field = field;
}

众所周知要创建一个SpringBean的话还要在配置文件里去声明这个Beam,当然也可以用注解的方式。

1
2
3
复制代码    <bean class="SpringBean">
<property name="field" value="test" />
</bean>

然后我们去初始化这个容器看看,运行结果是什么

1
2
3
4
复制代码public static void main(String[] args) {
ClassPathXmlApplicationContext applicationContext =
new ClassPathXmlApplicationContext("spring.xml");
}

这里我们可以看到当我们在容器里面加载bean的时候,它会依次调用构造方法和set方法,这两步我相信大部分人都知道。但是除此之外我们想一想,我们的Spring它是否还干了别的事情,我们接下来看一下。

如果这个Bean它实现了一些Aware接口的话,它就会注入和bean容器基础属性层面相关的信息。比如实现了BeanNameAware接口,我们要重写它的setBeanName方法,在配置文件中把这个Bean的id设置一下

1
2
3
复制代码 public void setBeanName(String s) {
System.out.println("setBeanName:"+s);
}
1
2
3
复制代码<bean id="BeanName" class="SpringBean">
<property name="field" value="test" />
</bean>

运行一下看会发生什么

剩下的BeanFactoryAware和ApplicationContextAware接口同理,我们直接看结果

我们可以看到先执行的是setBeanFactory方法然后是setApplicationContext方法

下面我们再创建一个新的Bean,这个Bean是做全局的前置和后置初始化的,也就是我们看面试题时所说的前置处理器和后置处理器。

这个Bean我们实现BeanPostProcessor接口

1
2
3
4
5
6
7
8
9
复制代码public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
System.out.println("postProcessBeforeInitialization");
return bean;
}

public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
System.out.println("postProcessAfterInitialization");
return null;
}

千万不要忘记要在配置文件里声明这个Bean

1
复制代码<bean class="SpringProccesserBean"></bean>

然后我们再运行试试

不出我们所料,前置方法和后置方法依次在后面运行了,难道这就是Bean的全部了吗?呵呵,太天真了,如果在Bean中有实现InitializingBean接口,那又是不同的说法了。我们下面除了重写了afterPropertiesSet方法以外我们还可以在bean里面指定它的init方法。

1
2
3
4
5
6
7
复制代码 public void afterPropertiesSet() throws Exception {
System.out.println("afterPropertiesSet");
}

public void init(){
System.out.println("init");
}

不要忘记配置文件

1
2
3
复制代码<bean id="BeanName" class="SpringBean" init-method="init">
<property name="field" value="test" />
</bean>

可以看到,这两个方法是夹在前置处理器和后置处理器之间的,先执行的是InitializingBean接口的afterPropertiesSet方法然后是Bean自身的init方法。

至此,一个Bean的初始化环节就完成了,我们就可以去使用这个Bean了。

既然是说Bean的生命周期,那当然还有Bean的销毁流程了,Bean的销毁流程一共分为连个步骤,一个步骤是实现销毁接口DisposableBean,重写它的销毁方法destroy,还有一个是Bean自身的destroy方法。

1
2
3
4
5
6
7
复制代码public void destroy() throws Exception {
System.out.println("DisposableBean");
}

public void des(){
System.out.println("des");
}

记得要在配置文件里定义

1
2
3
复制代码<bean id="BeanName" class="SpringBean" init-method="init" destroy-method="des">
<property name="field" value="test" />
</bean>

这两个方法会在关闭容易的时候调用,因此我们调用close方法关闭容器

1
2
3
4
5
6
7
复制代码public static void main(String[] args) {
ClassPathXmlApplicationContext applicationContext =
new ClassPathXmlApplicationContext("spring.xml");

applicationContext.close();

}

运行结果

我们看到是先调用DisposableBean接口的destroy方法然后是Bean自身的销毁方法。

这就是SpringBean的生命周期。

下面我们来总结一下

Spring Bean的生命周期

1
2
3
4
5
6
7
8
9
10
11
12
复制代码    1、实例化Bean对象
2、设置Bean属性
3、如果通过各种Aware接口声明了依赖关系,则会注入Bean对容器基础设施层面的依赖。
Aware接口集体包括BeanNameAware、BeanFactoryAware和ApplicationContextAware
分别注入Bean ID、Bean Factory 和ApplicationContext
4、如果实现了BeanPostProcesser,调用BeanPostProcesser的前置初始化方法postProcessBeforeInitialization
5、如果实现了InitializingBean接口,则会调用afterPropertiesSet方法
6、调用Bean自身定义的init方法
7、调用BeanPostProcesser的后置方法postProcessAfterInitialization
创建完毕
销毁
8、容器关闭前调用DisposableBean的destroy方法和自身的destroy方法

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

全网最通俗易懂的【短链接】入门

发表于 2019-11-27

前言

只有光头才能变强。

文本已收录至我的GitHub仓库,欢迎Star:github.com/ZhongFuChen…

最近接了一个需求,涉及到了短链接的相关的知识,于是去查阅了相关的资料,在这里给大家整理分享一下。

我帮阿里云推广服务器89/年,229/3年,买来送自己,送女朋友马上过年再合适不过了,买了搭建个项目给面试官看也香,还可以熟悉技术栈,(老用户用家人账号买就好了,我用我女朋友的😂)。扫码或者点击购买

搭建教程,从0开始一步一步带你搭建😂

一、短链接介绍

举个例子,现在我的GitHub的地址是这个:https://github.com/ZhongFuCheng3y/3y (36个字符)

我通过百度的短链接服务可以将上面的地址转成https://dwz.cn/LwlrfG4j(23个字符)

转短链接

那我为什么要将原有的URL转成较短的链接呢?比如我们发短信提醒用户去XXX,XXX有优惠活动,在文案上往往会带有一个链接进行跳转,方便用户快速去到对应的活动落地页。

而短信的发送是需要成本的,短信的成本主要有两方面组成:

  1. 发送的人数(发的人越多,自然短信的花费就越大,这个我就不解释了)
  2. 短信发送的字数(比如,文案总字数超过70个字,那就算两条短信计费,超过140个字就算三条短信计费)

所以在发送短信给用户时:要么就投放更加精准优质的用户,以便控制好发送的数量,要么就尽可能控制文案的字数。

显然,如果在短信上配上普通的URL,那真正的文案可写的字数就没多少了。于是我们可以发现,各大公司的短信推送的URL都是短链接。

短链接案例之一

比如在一些平台发布消息时会限制字数,如果我们的发的URL过长就很容易就被限制住了:

限制字数

使用短链接的好处:短、字符少、美观、便于发布、传播。

二、短链接它是怎么干的呢?

我们先回到生成好的短链上https://dwz.cn/LwlrfG4j

虽然这个链接看起来有点奇怪,但他终究还是一个链接,从URL的特征我们可以分出:

  • dwz.cn是域名
  • LwlrfG4j是参数

域名

我们在浏览器请求一下短链接看看是什么情况:

302跳转

短链接的原理其实就是:

  • 将长链接通过一定的手段生成一个短链接
  • 访问短链接时实际访问的是短链接服务器,然后根据短链接的参数找回对应的长链接
  • 重定向跳转

大致原理图

2.1 核心的要解决的问题

通过上面的分析我们可以知道的是,我们实际核心要做的是怎么从LwlrfG4j类似这样的参数找到对应的完整URL:https://github.com/ZhongFuCheng3y/3y

脑子第一时间想到的是:能不能通过一个压缩算法将https://github.com/ZhongFuCheng3y/3y压缩更小的字符?

显然,不能,压缩算法大多数都是针对大文本才奏效,本身的URL也不见得有多大…压缩出来肯定比原来的URL还大。

脑子第二时间想到的是:能不能用Hash算法?还是不能,用Hash存在哈希碰撞的问题

  • 什么是哈希碰撞?两个不相同的字符串(值)进行Hash操作后,得到的哈希值相同。
  • 这就意味着,两个完全不同的长链得到的哈希值一模一样,而我的短链是依赖哈希值去找到长链的(此时一个短链对应多个长链,这不合理)。

脑子第三时间想到的是?脑子想不到了。

现在业内用得比较多的是发号器(ID自增)+62进制编码:

  • 比如,我将https://github.com/ZhongFuCheng3y/3y看作是10000,然后将10000进行62进制编码得到的结果是:2Bi

那我的短链URL就可以弄成https://3y.cn/2Bi,其中3y.cn是域名,2Bi是经过62进制转换后的参数。

为什么要用62进制转换?64进制转换倒是听得多了

  • 62进制转换是因为62进制转换后只含数字+小写+大写字母。而64进制转换会含有/,+这样的符号(不符合正常URL的字符)
  • 10进制转62进制可以缩短字符,如果我们要6位字符的话,已经有560亿个组合了。

6位字符

总结:

  • ID自增后,转成62进制,在DB保存映射关系,生成短链接

短链接过程

三、短信的链接直接跳转到APP

以下内容来源:sq.163yun.com/blog/articl… ,作者:西西吹雪

综合起来就是:

  • 通过 Deep Links(iOS 则是Universal Links),可以实现点击短信链接直接唤起 App;
  • 如果系统因为各种原因不支持 Deep Links,备选方案是 intent filter,不过会出弹框让用户选择用哪个 App 打开链接;
  • 如果用户没有选择我们的 App 而是选择了浏览器打开,则通过 自定义 scheme 尝试唤起 App;
  • 由于技术和成本问题,我们忽略不支持 自定义 scheme 的浏览器。

短信链接唤醒APP

最后

这篇文章主要是简单了解一下短链接的相关知识,一个完备的短链服务肯定还要考虑更多的事,这里我就不展开了(毕竟我也没真正写过,可以在下方的链接继续学习)~

更多资料查阅:

  • www.zhihu.com/question/29…
  • hufangyun.com/2017/short-…
  • blog.csdn.net/c10WTiybQ1Y…

本已收录至我的GitHub精选文章,欢迎Star:github.com/ZhongFuChen…

乐于输出干货的Java技术公众号:Java3y。公众号内有300多篇原创技术文章、海量视频资源、精美脑图,关注即可获取!

转发到朋友圈是对我最大的支持!

非常感谢人才们能看到这里,如果这个文章写得还不错,觉得「三歪」我有点东西的话 求点赞 求关注️ 求分享👥 求留言💬 对暖男我来说真的 非常有用!!!

创作不易,各位的支持和认可,就是我创作的最大动力,我们下篇文章见!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

读写分离很难吗?springboot结合aop简单就实现了

发表于 2019-11-27

前言

入职新公司到现在也有一个月了,完成了手头的工作,前几天终于有时间研究下公司旧项目的代码。在研究代码的过程中,发现项目里用到了Spring Aop来实现数据库的读写分离,本着自己爱学习(我自己都不信…)的性格,决定写个实例工程来实现spring aop读写分离的效果。

环境部署

数据库:MySql

库数量:2个,一主一从

关于mysql的主从环境部署之前已经写过文章介绍过了,这里就不再赘述,参考《手把手教你,如何在windows系统搭建mysql主从复制的环境》

开始项目

首先,毫无疑问,先开始搭建一个SpringBoot工程,然后在pom文件中引入如下依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
复制代码<dependencies>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.10</version>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>1.3.2</version>
</dependency>
<dependency>
<groupId>tk.mybatis</groupId>
<artifactId>mapper-spring-boot-starter</artifactId>
<version>2.1.5</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.16</version>
</dependency>
<!-- 动态数据源 所需依赖 ### start-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
<scope>provided</scope>
</dependency>
<!-- 动态数据源 所需依赖 ### end-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.4</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
</dependencies>

目录结构

引入基本的依赖后,整理一下目录结构,完成后的项目骨架大致如下:

建表

创建一张表user,在主库执行sql语句同时在从库生成对应的表数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
复制代码DROP TABLE IF EXISTS `user`;
CREATE TABLE `user` (
`user_id` bigint(20) NOT NULL COMMENT '用户id',
`user_name` varchar(255) DEFAULT '' COMMENT '用户名称',
`user_phone` varchar(50) DEFAULT '' COMMENT '用户手机',
`address` varchar(255) DEFAULT '' COMMENT '住址',
`weight` int(3) NOT NULL DEFAULT '1' COMMENT '权重,大者优先',
`created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`updated_at` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

INSERT INTO `user` VALUES ('1196978513958141952', '测试1', '18826334748', '广州市海珠区', '1', '2019-11-20 10:28:51', '2019-11-22 14:28:26');
INSERT INTO `user` VALUES ('1196978513958141953', '测试2', '18826274230', '广州市天河区', '2', '2019-11-20 10:29:37', '2019-11-22 14:28:14');
INSERT INTO `user` VALUES ('1196978513958141954', '测试3', '18826273900', '广州市天河区', '1', '2019-11-20 10:30:19', '2019-11-22 14:28:30');

主从数据源配置

application.yml,主要信息是主从库的数据源配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
复制代码server:
port: 8001
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
datasource:
type: com.alibaba.druid.pool.DruidDataSource
driver-class-name: com.mysql.cj.jdbc.Driver
master:
url: jdbc:mysql://127.0.0.1:3307/user?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&failOverReadOnly=false&useSSL=false&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true
username: root
password:
slave:
url: jdbc:mysql://127.0.0.1:3308/user?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&failOverReadOnly=false&useSSL=false&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true
username: root
password:

因为有一主一从两个数据源,我们用枚举类来代替,方便我们使用时能对应

1
2
3
4
5
6
7
8
9
复制代码@Getter
public enum DynamicDataSourceEnum {
MASTER("master"),
SLAVE("slave");
private String dataSourceName;
DynamicDataSourceEnum(String dataSourceName) {
this.dataSourceName = dataSourceName;
}
}

数据源配置信息类 DataSourceConfig,这里配置了两个数据源,masterDb和slaveDb

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
复制代码@Configuration
@MapperScan(basePackages = "com.xjt.proxy.mapper", sqlSessionTemplateRef = "sqlTemplate")
public class DataSourceConfig {

// 主库
@Bean
@ConfigurationProperties(prefix = "spring.datasource.master")
public DataSource masterDb() {
return DruidDataSourceBuilder.create().build();
}

/**
* 从库
*/
@Bean
@ConditionalOnProperty(prefix = "spring.datasource", name = "slave", matchIfMissing = true)
@ConfigurationProperties(prefix = "spring.datasource.slave")
public DataSource slaveDb() {
return DruidDataSourceBuilder.create().build();
}

/**
* 主从动态配置
*/
@Bean
public DynamicDataSource dynamicDb(@Qualifier("masterDb") DataSource masterDataSource,
@Autowired(required = false) @Qualifier("slaveDb") DataSource slaveDataSource) {
DynamicDataSource dynamicDataSource = new DynamicDataSource();
Map<Object, Object> targetDataSources = new HashMap<>();
targetDataSources.put(DynamicDataSourceEnum.MASTER.getDataSourceName(), masterDataSource);
if (slaveDataSource != null) {
targetDataSources.put(DynamicDataSourceEnum.SLAVE.getDataSourceName(), slaveDataSource);
}
dynamicDataSource.setTargetDataSources(targetDataSources);
dynamicDataSource.setDefaultTargetDataSource(masterDataSource);
return dynamicDataSource;
}
@Bean
public SqlSessionFactory sessionFactory(@Qualifier("dynamicDb") DataSource dynamicDataSource) throws Exception {
SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
bean.setMapperLocations(
new PathMatchingResourcePatternResolver().getResources("classpath*:mapper/*Mapper.xml"));
bean.setDataSource(dynamicDataSource);
return bean.getObject();
}
@Bean
public SqlSessionTemplate sqlTemplate(@Qualifier("sessionFactory") SqlSessionFactory sqlSessionFactory) {
return new SqlSessionTemplate(sqlSessionFactory);
}
@Bean(name = "dataSourceTx")
public DataSourceTransactionManager dataSourceTx(@Qualifier("dynamicDb") DataSource dynamicDataSource) {
DataSourceTransactionManager dataSourceTransactionManager = new DataSourceTransactionManager();
dataSourceTransactionManager.setDataSource(dynamicDataSource);
return dataSourceTransactionManager;
}
}

设置路由

设置路由的目的为了方便查找对应的数据源,我们可以用ThreadLocal保存数据源的信息到每个线程中,方便我们需要时获取

1
2
3
4
5
6
7
8
9
10
11
12
复制代码public class DataSourceContextHolder {
private static final ThreadLocal<String> DYNAMIC_DATASOURCE_CONTEXT = new ThreadLocal<>();
public static void set(String datasourceType) {
DYNAMIC_DATASOURCE_CONTEXT.set(datasourceType);
}
public static String get() {
return DYNAMIC_DATASOURCE_CONTEXT.get();
}
public static void clear() {
DYNAMIC_DATASOURCE_CONTEXT.remove();
}
}

获取路由

1
2
3
4
5
6
复制代码public class DynamicDataSource extends AbstractRoutingDataSource {
@Override
protected Object determineCurrentLookupKey() {
return DataSourceContextHolder.get();
}
}

AbstractRoutingDataSource的作用是基于查找key路由到对应的数据源,它内部维护了一组目标数据源,并且做了路由key与目标数据源之间的映射,提供基于key查找数据源的方法。

数据源的注解

为了可以方便切换数据源,我们可以写一个注解,注解中包含数据源对应的枚举值,默认是主库,

1
2
3
4
5
6
7
8
复制代码@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@Documented
public @interface DataSourceSelector {

DynamicDataSourceEnum value() default DynamicDataSourceEnum.MASTER;
boolean clear() default true;
}

aop切换数据源

到这里,aop终于可以现身出场了,这里我们定义一个aop类,对有注解的方法做切换数据源的操作,具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
复制代码@Slf4j
@Aspect
@Order(value = 1)
@Component
public class DataSourceContextAop {

@Around("@annotation(com.xjt.proxy.dynamicdatasource.DataSourceSelector)")
public Object setDynamicDataSource(ProceedingJoinPoint pjp) throws Throwable {
boolean clear = true;
try {
Method method = this.getMethod(pjp);
DataSourceSelector dataSourceImport = method.getAnnotation(DataSourceSelector.class);
clear = dataSourceImport.clear();
DataSourceContextHolder.set(dataSourceImport.value().getDataSourceName());
log.info("========数据源切换至:{}", dataSourceImport.value().getDataSourceName());
return pjp.proceed();
} finally {
if (clear) {
DataSourceContextHolder.clear();
}

}
}
private Method getMethod(JoinPoint pjp) {
MethodSignature signature = (MethodSignature)pjp.getSignature();
return signature.getMethod();
}

}

到这一步,我们的准备配置工作就完成了,下面开始测试效果。

先写好Service文件,包含读取和更新两个方法,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
复制代码@Service
public class UserService {

@Autowired
private UserMapper userMapper;

@DataSourceSelector(value = DynamicDataSourceEnum.SLAVE)
public List<User> listUser() {
List<User> users = userMapper.selectAll();
return users;
}

@DataSourceSelector(value = DynamicDataSourceEnum.MASTER)
public int update() {
User user = new User();
user.setUserId(Long.parseLong("1196978513958141952"));
user.setUserName("修改后的名字2");
return userMapper.updateByPrimaryKeySelective(user);
}

@DataSourceSelector(value = DynamicDataSourceEnum.SLAVE)
public User find() {
User user = new User();
user.setUserId(Long.parseLong("1196978513958141952"));
return userMapper.selectByPrimaryKey(user);
}
}

根据方法上的注解可以看出,读的方法走从库,更新的方法走主库,更新的对象是userId为1196978513958141953 的数据,

然后我们写个测试类测试下是否能达到效果,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
复制代码@RunWith(SpringRunner.class)
@SpringBootTest
class UserServiceTest {

@Autowired
UserService userService;

@Test
void listUser() {
List<User> users = userService.listUser();
for (User user : users) {
System.out.println(user.getUserId());
System.out.println(user.getUserName());
System.out.println(user.getUserPhone());
}
}
@Test
void update() {
userService.update();
User user = userService.find();
System.out.println(user.getUserName());
}
}

测试结果:

1、读取方法

2、更新方法

执行之后,比对数据库就可以发现主从库都修改了数据,说明我们的读写分离是成功的。当然,更新方法可以指向从库,这样一来就只会修改到从库的数据,而不会涉及到主库。

注意

上面测试的例子虽然比较简单,但也符合常规的读写分离配置。值得说明的是,读写分离的作用是为了缓解写库,也就是主库的压力,但一定要基于数据一致性的原则,就是保证主从库之间的数据一定要一致。如果一个方法涉及到写的逻辑,那么该方法里所有的数据库操作都要走主库。

假设写的操作执行完后数据有可能还没同步到从库,然后读的操作也开始执行了,如果这个读取的程序走的依然是从库的话,那么就会出现数据不一致的现象了,这是我们不允许的。

最后发一下项目的github地址,有兴趣的同学可以看下,记得给个star哦

地址:github.com/Taoxj/mysql…

參考:

www.cnblogs.com/cjsblog/p/9…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…845846847…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%