本文分享Spring中如何实现Redis响应式交互模式。
本文将模拟一个用户服务,并使用Redis作为数据存储服务器。
本文涉及两个java bean,用户与权益
1 | kotlin复制代码public class User { |
启动
引入依赖
1 | xml复制代码 <dependency> |
添加Redis配置
1 | ini复制代码spring.redis.host=192.168.56.102 |
SpringBoot启动
1 | java复制代码@SpringBootApplication |
应用启动后,Spring会自动生成ReactiveRedisTemplate(它的底层框架是Lettuce)。
ReactiveRedisTemplate与RedisTemplate使用类似,但它提供的是异步的,响应式Redis交互方式。
这里再强调一下,响应式编程是异步的,ReactiveRedisTemplate发送Redis请求后不会阻塞线程,当前线程可以去执行其他任务。
等到Redis响应数据返回后,ReactiveRedisTemplate再调度线程处理响应数据。
响应式编程可以通过优雅的方式实现异步调用以及处理异步结果,正是它的最大的意义。
序列化
ReactiveRedisTemplate默认使用的序列化是Jdk序列化,我们可以配置为json序列化
1 | java复制代码@Bean |
builder.hashValue方法指定Redis列表值的序列化方式,由于本文Redis列表值只存放字符串,所以还是设置为StringRedisSerializer.UTF_8。
基本数据类型
ReactiveRedisTemplate支持Redis字符串,散列,列表,集合,有序集合等基本的数据类型。
本文使用散列保存用户信息,列表保存用户权益,其他基本数据类型的使用本文不展开。
1 | typescript复制代码public Mono<Boolean> save(User user) { |
beanToMap方法负责将User类转化为map。
HyperLogLog
Redis HyperLogLog结构可以统计一个集合内不同元素的数量。
使用HyperLogLog统计每天登录的用户量
1 | vbscript复制代码public Mono<Long> login(User user) { |
BitMap
Redis BitMap(位图)通过一个Bit位表示某个元素对应的值或者状态。由于Bit是计算机存储中最小的单位,使用它进行储存将非常节省空间。
使用BitMap记录用户本周是否有签到
1 | vbnet复制代码public void addSignInFlag(long userId) { |
userId高48位用于将用户划分到不同的key,低16位作为位图偏移参数offset。
offset参数必须大于或等于0,小于2^32(bit 映射被限制在 512 MB 之内)。
Geo
Redis Geo可以存储地理位置信息,并对地理位置进行计算。
如查找给定范围内的仓库信息
1 | scss复制代码public Flux getWarehouseInDist(User u, double dist) { |
warehouse:address
这个集合中需要先保存好仓库地理位置信息。
ReactiveGeoOperations#radius方法可以查找集合中地理位置在给定范围内的元素,它中还支持添加元素到集合,计算集合中两个元素地理位置距离等操作。
Lua
ReactiveRedisTemplate也可以执行Lua脚本。
下面通过Lua脚本完成用户签到逻辑:如果用户今天未签到,允许签到,积分加1,如果用户今天已签到,则拒接操作。
1 | arduino复制代码public Flux<String> addScore(long userId) { |
signin.lua内容如下
1 | sql复制代码local score=redis.call('hget','user:'..KEYS[1],'score') |
Stream
Redis Stream 是 Redis 5.0 版本新增加的数据类型。该类型可以实现消息队列,并提供消息的持久化和主备复制功能,并且可以记住每一个客户端的访问位置,还能保证消息不丢失。
Redis借鉴了kafka的设计,一个Stream内可以存在多个消费组,一个消费组内可以存在多个消费者。
如果一个消费组内某个消费者消费了Stream中某条消息,则这消息不会被该消费组其他消费者消费到,当然,它还可以被其他消费组中某个消费者消费到。
下面定义一个Stream消费者,负责处理接收到的权益数据
1 | java复制代码@Component |
下面看一下如何发送信息
1 | java复制代码public Mono<RecordId> addRights(Rights r) { |
创建一个消息记录对象ObjectRecord,并通过ReactiveStreamOperations发送信息记录。
Sentinel、Cluster
ReactiveRedisTemplate也支持Redis Sentinel、Cluster集群模式,只需要调整配置即可。
Sentinel配置如下
1 | ini复制代码spring.redis.sentinel.master=mymaster |
spring.redis.sentinel.nodes
配置的是Sentinel节点IP地址和端口,不是Redis实例节点IP地址和端口。
Cluster配置如下
1 | ini复制代码spring.redis.cluster.nodes=172.17.0.2:6379,172.17.0.3:6379,172.17.0.4:6379,172.17.0.5:6379,172.17.0.6:6379,172.17.0.7:6379 |
如Redis Cluster中node2是node1的从节点,Lettuce中会缓存该信息,当node1宕机后,Redis Cluster会将node2升级为主节点。但Lettuce不会自动将请求切换到node2,因为它的缓冲没有刷新。
开启spring.redis.lettuce.cluster.refresh.adaptive
配置,Lettuce可以定时刷新Redis Cluster集群缓存信息,动态改变客户端的节点情况,完成故障转移。
暂时未发现ReactiveRedisTemplate实现pipeline,事务的方案。
官方文档:docs.spring.io/spring-data…
文章完整代码:gitee.com/binecy/bin-…
如果您觉得本文不错,欢迎关注我的微信公众号,系列文章持续更新中。您的关注是我坚持的动力!
本文转载自: 掘金