开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

使用Loki服务统一将日志发送至一台主机查看

发表于 2021-10-02

小知识,大挑战!本文正在参与“程序员必备小知识”创作活动

前言

众所周知,对于一道程序来说,能有详细的日志输出是十分重要的。一方面,日志便于代码出错后的调试,另一方面,在没有事先埋点的情况下,详细的日志对日后的数据统计也会有极大的帮助。

但是,but,由于我们的程序往往会部署在不同服务器的不同地方,因此查看不同的日志往往需要在不同的服务器不同的文件夹中跑来跑去非常麻烦。那么,有没有一个东西,可以让我们可以便捷的直接在一台主机上的一个地方上查看所有程序的日志呢?
接下来,我将向大家介绍如何使用Loki以实现这一功能

原理

image.png

准备工作

以下我们称,输出日志的程序,所在的主机为主机A,查看日志的主机为主机B。

在主机A安装并部署Loki

移动至任一想要让Loki运行的目录下

cd <path_to_loki_to_run>

下载 二进制文件(Linux) 或 exe可执行文件(Windows)

二进制文件

exe可执行文件

下载完成后记得要解压到当前目录哦

编辑Loki配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
yaml复制代码# loki-config.yaml
auth_enabled: false

server:
http_listen_port: 3100

ingester:
lifecycler:
address: 127.0.0.1
ring:
kvstore:
store: inmemory
replication_factor: 1
final_sleep: 0s
chunk_idle_period: 5m
chunk_retain_period: 30s
max_transfer_retries: 0

schema_config:
configs:
- from: 2018-04-15
store: boltdb
object_store: filesystem
schema: v11
index:
prefix: index_
period: 168h

storage_config:
boltdb:
directory: /tmp/loki/index

filesystem:
directory: /tmp/loki/chunks

limits_config:
enforce_metric_name: false
reject_old_samples: true
reject_old_samples_max_age: 168h

chunk_store_config:
max_look_back_period: 0s

table_manager:
retention_deletes_enabled: false
retention_period: 0s

配置文件中较为重要的参数说明

  • server.http_listen_port:Loki服务运行的端口。之后配置Promtail和Grafana时需要用到。
    其他参数对我们目前的需求来说,可以直接忽视。

部署Loki

Linux

./loki-linux-amd64 -config.file=loki_config.yaml

Windows

./loki-windows-amd64.exe -config.file=loki_config.yaml

在主机A安装并部署Promtail

移动至任一想要让Promtail运行的目录下

cd <path_to_promtail_to_run>

下载 二进制文件(Linux) 或 exe可执行文件(Windows)

二进制文件

exe可执行文件

编辑Promtail配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
yaml复制代码# promtail-config.yaml
server:
http_listen_port: 9080
grpc_listen_port: 0

positions:
filename: /tmp/positions.yaml

clients:
- url: http://127.0.0.1:3100/loki/api/v1/push

scrape_configs:
- job_name: system
static_configs:
- targets:
- localhost
labels:
job: varlogs
__path__: <path_to_log_file>

配置文件中较为重要的参数说明

  • clients.url:Loki服务的日志推送接口的url,一般为http://<ip_of_loki>:<port_of_loki>/loki/api/v1/push,在本文中,由于Loki与Promtail是在一台主机上,故ip为127.0.0.1,且上文Loki配置文件中,是将Loki部署在3100端口,故端口为3100,所以该参数的值为http://127.0.0.1:3100/loki/api/v1/push
  • scrape_config:日志爬虫配置,这里以最简单的文件爬虫为例。jon_name即为爬虫名称,没啥大用。targets为爬虫目的主机,本文中设为localhost即可。labels即为爬虫的标签,到时候可以使用标签过滤查看日志。labels.__path__即为日志文件所在路径,promtail会自动将该文件的日志推送至Loki服务。

部署Promtail

Linux

./promtail-linux-amd64 --config.file=promtail-config.yaml

Loki

./promtail-windows-amd64.exe --config.file=promtail-config.yaml

在主机B安装并部署Grafana

下载安装Grafana

下载链接

运行Grafana并配置Loki服务

image.png
image.png
image.png
在HTTP.URL中输入http://<ip_of_host_A>:<prot_of_Loki_of_host_A>并保存配置即可。
image.png
配置完成后即可在Grafana的explor中查看我们的日志
image.png

结语

本文只是简单地介绍了如何将一台主机的日志发送至另一台主机查看,若想将多台主机的日志发送至至一台主机查看,按照相同的操作配置即可。

本次并未深入介绍Promtail与Loki的原理。Promtail除了可以将日志文件中的日志推送至Loki外,还可以将其他一些服务如Docker容器或者系统服务的日志也推送到Loki。且他除了可以直接推送日志外,还可以将日志进行一定规则的过滤筛选更改之后,再推送出去。感兴趣的朋友可以自行去深入了解一下Promtail与Loki。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

基于Spring Boot实现电脑端网页微信扫码授权登录方式

发表于 2021-10-02

本文已参与「掘力星计划」,赢取创作大礼包,挑战创作激励金。

简介

电脑端微信网页扫码授权登录有2种方式:

  • 第一种:基于微信公众号,单独获取登录二维码扫码,然后扫码登录,程序控制跳转逻辑,例如CSDN:
    在这里插入图片描述
  • 第二种:基于微信开放平台,跳转到微信二维码页面进行扫码登录,重定向到成功页面,例如有道笔记:
    在这里插入图片描述
    注: 本文记录第一种方式,只需通过微信测试公众号即可完成完整测试,即所有人本地都可以完整运行;第二种需有通过认证资质的开发者账号,后续再记录。

本地完整运行环境准备

内网渗透=>生成本地指定端口映射的外网域名

传送门:内网渗透工具Natapp使用详解
域名生成之后修改配置文件:
在这里插入图片描述

注册并配置微信测试公众号

  1. 注册地址:微信公众平台 测试账号申请,扫码登录,并关注该测试号
    在这里插入图片描述
    在这里插入图片描述
  2. 获取测试号appid和appsecret
    在这里插入图片描述

3.接口配置信息修改(此处会回调后台签名验证方法,配置时需启动后台)
在这里插入图片描述
4.配置网页授权域名,用于获取微信用户信息
在这里插入图片描述
在这里插入图片描述

流程

文字简述

  1. 页面点击获取微信登录二维码按钮
  2. 后台以appid和appsecret为参数调用微信api获取access_token
  3. 以access_token为参数调用微信api获取本地微信登录二维码ticket,同时传随机字符串参数scene_str,该参与可用于程序验证是否扫码成功
  4. 通过ticket获取二维码成功,页面显示,同时前台开始进行js定时任务(此处可优化成后台通知),监听是否扫码成功
  5. 用户微信扫码成功
  6. 微信服务器端回调本地服务器签名验证接口验证签名和回调处理
  7. 回调处理确定扫码成功(此处还可进行关注验证),数据库或缓存插入本地扫码成功记录
  8. 前台定时任务获取到扫码成功记录或者后台主动通知,进行扫码成功后业务处理

代码

1.获取微信二维码和启动js定时任务

前台html

1
2
3
4
5
html复制代码<h1>微信扫码登录方式一</h1>
<button onclick="getQrCode()" style="width: 100px;height: 50px;">获取二维码</button>
<br>
<img src="" id="qrCodeImgId" style="width: 300px;height: 300px;display: none">
<hr>

前台js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
js复制代码//======================================微信扫码登录方式一=========================================================
// 存储二维码标识,用于验证是否扫码成功
var sceneStr;
var t;
// 获取登录二维码
function getQrCode(){
$.get('qrCodeFirstLogin/getQrCode',function (data) {
console.log("=============getQrCode=======================");
console.log(data);
if(data.code == 200){
sceneStr = data.data.sceneStr;
// 二维码获取
$('#qrCodeImgId').attr('src',"https://mp.weixin.qq.com/cgi-bin/showqrcode?ticket="+data.data.ticket);
$('#qrCodeImgId').show();
// 定时任务监听是否扫码成功
t = window.setInterval(getOpenId,3000);
}else{
alert(data.msg);
}

});
}

后台获取二维码ticket接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
java复制代码    /**
* 获取accessToken
* @return
*/
public String getAccessToken(){
String accessToken = null;
String getTokenUrl = wxConfig.getTokenUrl().replace("APPID", wxConfig.getAppId()).replace("SECRET", wxConfig.getAppSecret());
String result = HttpClientUtil.doGet(getTokenUrl);
JSONObject jsonObject = JSONObject.parseObject(result);
accessToken = jsonObject.getString("access_token");
return accessToken ;
}
/**
* 获取登录二维码
* @return
*/
@GetMapping("/getQrCode")
private ResultJson getQrCode(){
try {
// 获取token开发者
String accessToken =getAccessToken();
String getQrCodeUrl = wxConfig.getQrCodeUrl().replace("TOKEN", accessToken);
// 这里生成一个带参数的二维码,参数是scene_str
String sceneStr = CodeLoginUtil.getRandomString(8);
String json="{\"expire_seconds\": 604800, \"action_name\": \"QR_STR_SCENE\"" +", \"action_info\": {\"scene\": {\"scene_str\": \""+sceneStr+"\"}}}";
String result = HttpClientUtil.doPostJson(getQrCodeUrl,json);
JSONObject jsonObject = JSONObject.parseObject(result);
jsonObject.put("sceneStr",sceneStr);
return ResultJson.ok(jsonObject);
} catch (Exception e) {
e.printStackTrace();
return ResultJson.error(e.getMessage());
}
}

2.签名验证和回调处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
java复制代码/**
* 验证签名
* @param request
* @return
* @throws Exception
*/
@RequestMapping("/checkSign")
public String checkSign ( HttpServletRequest request) throws Exception {
log.info("===========>checkSign");
//获取微信请求参数
String signature = request.getParameter ("signature");
String timestamp = request.getParameter ("timestamp");
String nonce = request.getParameter ("nonce");
String echostr = request.getParameter ("echostr");
//参数排序。 token 就要换成自己实际写的 token
String [] params = new String [] {timestamp,nonce,"123456"} ;
Arrays.sort (params) ;
//拼接
String paramstr = params[0] + params[1] + params[2] ;
//加密
//获取 shal 算法封装类
MessageDigest Sha1Dtgest = MessageDigest.getInstance("SHA-1") ;
//进行加密
byte [] digestResult = Sha1Dtgest.digest(paramstr.getBytes ("UTF-8"));
//拿到加密结果
String mysignature = CodeLoginUtil.bytes2HexString(digestResult);
mysignature=mysignature.toLowerCase(Locale.ROOT);
//是否正确
boolean signsuccess = mysignature.equals(signature);
//逻辑处理
if (signsuccess && echostr!=null) {
//peizhi token
return echostr ;//不正确就直接返回失败提示.
}else{
JSONObject jsonObject = callback(request);
return jsonObject.toJSONString();
}
}


/**
* 回调方法
* @param request
* @return
* @throws Exception
*/
public JSONObject callback(HttpServletRequest request) throws Exception{
log.info("===========>callback");
//request中有相应的信息,进行解析
WxMpXmlMessage message= WxMpXmlMessage.fromXml(request.getInputStream());//获取消息流,并解析xml
String messageType=message.getMsgType(); //消息类型
String messageEvent=message.getEvent(); //消息事件
// openid
String fromUser=message.getFromUser(); //发送者帐号
String touser=message.getToUser(); //开发者微信号
String text=message.getContent(); //文本消息 文本内容
// 生成二维码时穿过的特殊参数
String eventKey=message.getEventKey(); //二维码参数

String uuid=""; //从二维码参数中获取uuid通过该uuid可通过websocket前端传数据
String userid="";

//if判断,判断查询
JSONObject jsonObject = new JSONObject();
jsonObject.put("code","200");
if(messageType.equals("event")){
jsonObject = null;
//先根据openid从数据库查询 => 从自己数据库中查取用户信息 => jsonObject
if(messageEvent.equals("SCAN")){
//扫描二维码
//return "欢迎回来";
}
if(messageEvent.equals("subscribe")){
//关注
//return "谢谢您的关注";
}
//没有该用户
if(jsonObject==null){
//从微信上中拉取用户信息
String url = "https://api.weixin.qq.com/cgi-bin/user/info?access_token=" +getAccessToken() +
"&openid=" + fromUser +
"&lang=zh_CN";
String result = HttpClientUtil.doGet(url);
jsonObject = JSONObject.parseObject(result);
/**
* 用户信息处理....
*/
}
// 扫码成功,存入缓存
loginMap.put(eventKey,new CodeLoginKey(eventKey,fromUser));
return jsonObject;
}
return jsonObject;
//log.info("消息类型:{},消息事件:{},发送者账号:{},接收者微信:{},文本消息:{},二维码参数:{}",messageType,messageEvent,fromUser,touser,text,eventKey);
}

3.扫码成功获取用户信息取消定时任务

js定时任务监听

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
js复制代码    // 扫码成功,获取用户openId=>为获取用户信息做准备
function getOpenId() {
$.get("qrCodeFirstLogin/getOpenId",{
"eventKey":sceneStr
},function (data) {
if(data.code == 200){
console.log("========getOpenId==========");
console.log(data.data);
window.clearInterval(t);
alert("登录成功openId:"+data.data.openId);
/**
* 1、第一次扫码登录进行账号绑定
* 2、以后根据openId获取用户信息
*/
}
});
}

后台验证接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码    /**
* 根据二维码标识获取用户openId=>获取用户信息
* @param eventKey
* @return
*/
@RequestMapping("getOpenId")
public ResultJson getOpenId(String eventKey){
if(loginMap.get(eventKey) == null){
return ResultJson.error("未扫码成功!") ;
}
CodeLoginKey codeLoginKey = loginMap.get(eventKey);
loginMap.remove(eventKey);
return ResultJson.ok(codeLoginKey);
}

结果演示

获取二维码

在这里插入图片描述

扫码成功

在这里插入图片描述

源码

传送门:微信pc扫码登录

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Redis——LFU(Least Frequently Us

发表于 2021-10-02

小知识,大挑战!本文正在参与“程序员必备小知识”创作活动。

本文已参与 「掘力星计划」 ,赢取创作大礼包,挑战创作激励金。

1、简介

LRU有一个明显的缺点,它无法正确的表示一个Key的热度,如果一个key从未被访问过,仅仅发生内存淘汰的前一会儿被用户访问了一下,在LRU算法中这会被认为是一个热key。

例如如下图,keyA与keyB同时被set到Redis中,在内存淘汰发生之前,keyA被频繁的访问,而keyB只被访问了一次,但是这次访问的时间比keyA的任意一次访问时间都更接近内存淘汰触发的时间,如果keyA与keyB均被Redis选中进行淘汰,keyA将被优先淘汰。我想大家都不太希望keyA被淘汰吧,那么有没有更好的的内存淘汰机制呢?当然有,那就是LFU。\

LRU存在的问题.png

LFU(Least Frequently Used)是Redis 4.0 引入的淘汰算法,它通过key的访问频率比较来淘汰key,重点突出的是Frequently Used。

​

LRU与LFU的区别:

  • LRU -> Recently Used,根据最近一次访问的时间比较
  • LFU -> Frequently Used,根据key的访问频率比较

Redis4.0之后为maxmemory_policy淘汰策略添加了两个LFU模式(LRU请看我上一篇文章) :

  • volatile-lfu:对有过期时间的key采用LFU淘汰算法
  • allkeys-lfu:对全部key采用LFU淘汰算法

2、实现方式

Redis分配一个字符串的最小空间占用是19字节,16字节(对象头)+3字节(SDS基本字段)。Redis的内存淘汰算法LRU/LFU均依靠其中的对象头中的lru来实现。

Redis对象头的内存结构:

1
2
3
4
5
6
7
arduino复制代码typedef struct redisObject {
    unsigned type:4;        // 4 bits 对象的类型(zset、set、hash等)
    unsigned encoding:4;    // 4 bits 对象的存储方式(ziplist、intset等)
    unsigned lru:24;        // 24bits 记录对象的访问信息
    int refcount;            // 4 bytes 引用计数
    void *ptr;                // 8 bytes (64位操作系统),指向对象具体的存储地址/对象body
}

Redis对象头中的lru字段,在LRU模式下和LFU模式下使用方式并不相同。

​

2.1 LRU实现方式

在LRU模式,lru字段存储的是key被访问时Redis的时钟server.lrulock(Redis为了保证核心单线程服务性能,缓存了Unix操作系统时钟,默认每毫秒更新一次,缓存的值是Unix时间戳取模2^24)。当key被访问的时候,Redis会更新这个key的对象头中lru字段的值。

因此在LRU模式下,Redis可以根据对象头中的lru字段记录的值,来比较最后一次key的访问时间。

​

用Java代码演示一个简单的Redis-LRU算法:

  • Redis对象头
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
kotlin复制代码package com.lizba.redis.lru;

/**
 * <p>
 *      Redis对象头
 * </p>
 *
 * @Author: Liziba
 * @Date: 2021/9/22 22:40
 */
public class RedisHead {

    /** 时间 */
    private Long lru;
    /** 具体数据 */
    private Object body;

    public RedisHead setLru(Long lru) {
        this.lru = lru;
        return this;
    }

    public RedisHead setBody(Object body) {
        this.body = body;
        return this;
    }


    public Long getLru() {
        return lru;
    }

    public Object getBody() {
        return body;
    }

}
  • Redis LRU实现代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
kotlin复制代码package com.lizba.redis.lru;

import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/**
 * <p>
 * Redis中LRU算法的实现demo
 * </p>
 *
 * @Author: Liziba
 * @Date: 2021/9/22 22:36
 */
public class RedisLruDemo {

    /**
     * 缓存容器
     */
    private ConcurrentHashMap<String, RedisHead> cache;
    /**
     * 初始化大小
     */
    private int initialCapacity;

    public RedisLruDemo(int initialCapacity) {
        this.initialCapacity = initialCapacity;
        this.cache = new ConcurrentHashMap<>(initialCapacity);
        ;
    }

    /**
     * 设置key/value 设置的时候更新LRU
     *
     * @param key
     * @param body
     */
    public void set(String key, Object body) {
        // 触发LRU淘汰
        synchronized (RedisLruDemo.class) {
            if (!cache.containsKey(key) && cache.size() >= initialCapacity) {
                this.flushLruKey();
            }
        }
        RedisHead obj = this.getRedisHead().setBody(body).setLru(System.currentTimeMillis());
        cache.put(key, obj);
    }


    /**
     * 获取key,存在则更新LRU
     *
     * @param key
     * @return
     */
    public Object get(String key) {

        RedisHead result = null;
        if (cache.containsKey(key)) {
            result = cache.get(key);
            result.setLru(System.currentTimeMillis());
        }

        return result;
    }


    /**
     * 清除LRU key
     */
    private void flushLruKey() {

        List<String> sortData = cache.keySet()
                .stream()
                .sorted(Comparator.comparing(key -> cache.get(key).getLru()))
                .collect(Collectors.toList());
        String removeKey = sortData.get(0);
        System.out.println( "淘汰 -> " + "lru : " + cache.get(removeKey).getLru() + " body : " + cache.get(removeKey).getBody());
        cache.remove(removeKey);
        if (cache.size() >= initialCapacity) {
            this.flushLruKey();
        }
        return;
    }


    /**
     *  获取所有数据测试用
     *
     * @return
     */
    public List<RedisHead> getAll() {
         return cache.keySet().stream().map(key -> cache.get(key)).collect(Collectors.toList());
    }


    private RedisHead getRedisHead() {
        return new RedisHead();
    }

}
  • 测试代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
java复制代码package com.lizba.redis.lru;

import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * <p>
 *      测试LRU
 * </p>
 *
 * @Author: Liziba
 * @Date: 2021/9/22 22:51
 */
public class TestRedisLruDemo {

    public static void main(String[] args) throws InterruptedException {

        RedisLruDemo demo = new RedisLruDemo(10);
        // 先加入10个key,此时cache达到容量,下次加入会淘汰key
        for (int i = 0; i < 10; i++) {
            demo.set(i + "", i);
        }
        // 随机访问前十个key,这样可以保证下次加入时随机淘汰
        for (int i = 0; i < 20; i++) {
            int nextInt = new Random().nextInt(10);
            TimeUnit.SECONDS.sleep(1);
            demo.get(nextInt + "");
        }
        // 再次添加5个key,此时每次添加都会触发淘汰
        for (int i = 10; i < 15; i++) {
            demo.set(i + "", i);
        }

        System.out.println("-------------------------------------------");
        demo.getAll().forEach( redisHead -> System.out.println("剩余 -> " + "lru : " + redisHead.getLru() + " body : " + redisHead.getBody()));
    }

}
  • 测试结果

image.png

2.2 LFU实现方式

在LFU模式下,Redis对象头的24bit lru字段被分成两段来存储,高16bit存储ldt(Last Decrement Time),低8bit存储logc(Logistic Counter)。\

lru_24 bit.png

​

2.2.1 ldt(Last Decrement Time)

高16bit用来记录最近一次计数器降低的时间,由于只有8bit,存储的是Unix分钟时间戳取模2^16,16bit能表示的最大值为65535(65535/24/60≈45.5),大概45.5天会折返(折返指的是取模后的值重新从0开始)。

​

Last Decrement Time计算的算法源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
arduino复制代码/* Return the current time in minutes, just taking the least significant
 * 16 bits. The returned time is suitable to be stored as LDT (last decrement
 * time) for the LFU implementation. */
// server.unixtime是Redis缓存的Unix时间戳
// 可以看出使用的Unix的分钟时间戳,取模2^16
unsigned long LFUGetTimeInMinutes(void) {
  return (server.unixtime/60) & 65535;
}

/* Given an object last access time, compute the minimum number of minutes
 * that elapsed since the last access. Handle overflow (ldt greater than
 * the current 16 bits minutes time) considering the time as wrapping
 * exactly once. */
unsigned long LFUTimeElapsed(unsigned long ldt) {
  // 获取系统当前的LFU time
  unsigned long now = LFUGetTimeInMinutes();
  // 如果now >= ldt 直接取差值  
  if (now >= ldt) return now-ldt;
  // 如果now < ldt 增加上65535
  // 注意Redis 认为折返就只有一次折返,多次折返也是一次,我思考了很久感觉这个应该是可以接受的,本身Redis的淘汰算法就带有随机性  
  return 65535-ldt+now;
}

2.2.2 logc(Logistic Counter)

低8位用来记录访问频次,8bit能表示的最大值为255,logc肯定无法记录真实的Rediskey的访问次数,其实从名字可以看出存储的是访问次数的对数值,每个新加入的key的logc初始值为5(LFU_INITI_VAL),这样可以保证新加入的值不会被首先选中淘汰;logc每次key被访问时都会更新;此外,logc会随着时间衰减。

​

2.2.3 logc 算法调整

redis.conf 提供了两个配置项,用于调整LFU的算法从而控制Logistic Counter的增长和衰减。\

image.png

  • lfu-log-factor 用于调整Logistic Counter的增长速度,lfu-log-factor值越大,Logistic Counter增长越慢。

Redis Logistic Counter增长的源代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
scss复制代码/* Logarithmically increment a counter. The greater is the current counter value
 * the less likely is that it gets really implemented. Saturate it at 255. */
uint8_t LFULogIncr(uint8_t counter) {
  // Logistic Counter最大值为255  
  if (counter == 255) return 255;
  // 取一个0~1的随机数r  
  double r = (double)rand()/RAND_MAX;
  // counter减去LFU_INIT_VAL (LFU_INIT_VAL为每个key的Logistic Counter初始值,默认为5)
  double baseval = counter - LFU_INIT_VAL;
  // 如果衰减之后已经小于5了,那么baseval < 0取0
  if (baseval < 0) baseval = 0;
  // lfu-log-factor在这里被使用
  // 可以看出如果lfu_log_factor的值越大,p越小
  // r < p的概率就越小,Logistic Counter增加的概率就越小(因此lfu_log_factor越大增长越缓慢)
  double p = 1.0/(baseval*server.lfu_log_factor+1);
  if (r < p) counter++;
  return counter;
}

如下是官网提供lfu-log-factor在不同值下,key随着访问次数的增加的Logistic Counter变化情况的数据:\

image.png

  • lfu-decay-time 用于调整Logistic Counter的衰减速度,它是一个以分钟为单位的数值,默认值为1,;lfu-decay-time值越大,衰减越慢。

Redis Logistic Counter衰减的源代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
vbnet复制代码/* If the object decrement time is reached decrement the LFU counter but
 * do not update LFU fields of the object, we update the access time
 * and counter in an explicit way when the object is really accessed.
 * And we will times halve the counter according to the times of
 * elapsed time than server.lfu_decay_time.
 * Return the object frequency counter.
 *
 * This function is used in order to scan the dataset for the best object
 * to fit: as we check for the candidate, we incrementally decrement the
 * counter of the scanned objects if needed. */
unsigned long LFUDecrAndReturn(robj *o) {
  // 获取lru的高16位,也就是ldt
  unsigned long ldt = o->lru >> 8;  
  // 获取lru的低8位,也就是logc  
  unsigned long counter = o->lru & 255;
  // 根据配置的lfu-decay-time计算Logistic Counter需要衰减的值
  unsigned long num_periods = server.lfu_decay_time ? LFUTimeElapsed(ldt) / server.lfu_decay_time : 0;
  if (num_periods)
    counter = (num_periods > counter) ? 0 : counter - num_periods;
  return counter;
}

2.2.4 LFU 优化

LFU 与 LRU 有一个共同点,当内存达到max_memory时,选择key是随机抓取的,因此Redis为了使这种随机性更加准确,设计了一个淘汰池,这个淘汰池对于LFU和LRU算的都适应,只是淘汰池的排序算法有区别而已。

Redis 3.0就对这一块进行了优化(来自redis.io):\

image.png

3、LFU使用

3.1 配置文件开启LFU淘汰算法

修改redis.conf配置文件,设置maxmemory-policy volatile-lfu/allkeys-lfu\

image.png

重启Redis,连接客户端通过info指令查看maxmemory_policy的配置信息\

image.png

通过object freq key 获取对象的LFU的Logistic Counter值\

image.png

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

有请Minio统一管理文件(实操JS、JAVA)

发表于 2021-10-02

本文已参与「掘力星计划」,赢取创作大礼包,挑战创作激励金。

小知识,大挑战!本文正在参与“程序员必备小知识”创作活动。

前言

  • 在线音乐戳我呀!
  • 音乐博客源码上线啦!
  • 最近在整理自己的在线音乐(因为最近换服务器了),发现上传的图片文件很杂乱,如:音乐上传到minio中(文件服务器),IT知识模块的图片上传在Node指定的文件夹中,其他模块的文件又是Node的另外一个文件夹。
  • 就想着能不能全部都由一个来管理这些文件,优先想到之前实习的时候公司(dpf)搭建个文件服务器,也用了一下确实是挺方便的,也就是今天的主角 – Minio。
  • 其实自己的在线音乐已经用了很久了,只不过没有说对全部文件都归其管理,再加上现在应用部署在Docker上,不知道为什么Node一直映射失败,但Minio却映射得出来,这也是一开始初心想文件都放在minio中。
  • 接下来会分享如何搭建、可能会遇到的问题,一五一十盘出。
  • Are you ready ?

假期快乐。

先来张图效果。

fbd57b4cd563eedf704d7ae9570938d.png

界面直观、可视化界面、方便管理、上传下载流程简单,还能设置权限、失效时间等等。

Minio

  • 是什么
  • 怎么下载安装使用
  • 代码层面如何开发

一、Minio是什么?

Minio是一个基于Apache License v2.0开源协议的对象存储服务。它兼容亚马逊S3云存储服务接口,非常适合于存储大容量非结构化的数据,例如图片、视频、日志文件、备份数据和容器/虚拟机镜像等,而一个对象文件可以是任意大小,从几kb到最大5T不等。

MinIO是一个非常轻量的服务,可以很简单的和其他应用的结合,类似 NodeJS, Redis 或者 MySQL。

同时也支持多种语言客户端集成开发,如JavaScript 、Java、Python、Golang、.NET。

简而言之就是:文件服务器,存放管理我们的文件。

二、Minio怎么下载安装使用?

  • 官网地址
  • github地址
  • Minio客户端下载地址

我们拿window、docker环境进行手把手教学。目前这两种环境较多人使用吧。(😁其实我也是只用这两种环境,其他环境不敢有话语权)

2.1 Windows版

2.1.1 下载

  • 官网下载
  • 百度云下载软件:链接:pan.baidu.com/s/1pu-qyW4Q… 提取码:n5le

下载好了之后,文件后缀是exe。

2.1.2 使用

进入minio.exe所在目录。

cmd走起。

minio.exe server G:\tsblog\minio

参数说明:G:\tsblog\minio这是文件上传之后的存储目录(我们图片上传到哪)

运行成功。会看到下面的界面。

20190330122426574.png

这里minio会给出ACCESS-KEY 和 SECRET-KEY,供后台管理登录使用。

2.1.3 登录使用

打开浏览器,使用命令行界面给出的地址都可以登录。

默认端口9000:http://localhost:9000/

480a8c6edeff17ad3b8e425cd7f4040.png

输入ACCESS-KEY 和 SECRET-KEY即可登录minio的管理界面了。

2.1.4 创建桶和上传图片

在后台管理界面你可以上创建你的Bucket(桶),可以理解为一个文件夹用来存放图片。桶创建成功之后就可以上传图片了。如下图:

左边是桶(文件夹),右边是桶中的文件。

baef6d635f119a5e9dd37ad88e06e17.png

在文件列表的右边可以复制出图片的访问地址,在浏览器中就可以访问图片了。这个时候的图片地址是带过期时间和密钥的。

888924df317095b7e1d3317936950f5.png

2.2 Docker版

我们会将minio部署到线上,众所周知,大部分服务器都是Linux系统,像我自己还装着Docker管理我的应用程序。

2.2.1 下载MinIO 镜像包

1
bash复制代码docker pull minio/minio

20200818103209519.png

2.2.2 启动MinIO 镜像

使用docker run -p 9000:9000 minio/minio server /data 端口可以自定义修改,默认会生成ACCESS_KEY与SECRET_KEY,看下图提示生成的是minioadmin/minioadmin

20200818103354496.png

当然,我们正常不会这样子启动,肯定是要自定义账号密码。还要把minio文件给映射出来(这才是重要)

使用以下脚本启动MinIO镜像,生成永久MinIO的容器。

一般复制这串代码即可适用大部分情况。

1
2
3
4
5
6
bash复制代码docker run -p 9000:9000 --name minio -d
-e "MINIO_ACCESS_KEY=admin"
-e "MINIO_SECRET_KEY=123456"
-v /home/minio:/data
-v /home/minio/config:/root/.minio
minio/minio  server /data

参数说明

  • run:启动镜像
  • -p 9000:9000:设置端口,并映射给外面的端口
  • -d:后台启动
  • -e “MINIO_ACCESS_KEY=admin” -e “MINIO_SECRET_KEY=123456” :设置账号密码(MINIO_ACCESS_KEY:密码Key;MINIO_SECRET_KEY:密码值)
  • -v /home/minio:/data -v /home/minio/config:/root/.minio :虚拟配置目录,文件映射出来,我的minio文件夹资源放在/home/minio/下,若要改的话,要改下这里的路径
  • minio/minio server:这是我们一开始拉取下来的minio镜像名

有小伙子不知道文件映射出来是什么意思,请允许我解析一波。
我们把minio部署在docker中运行。
比如我们在minio中上传一个文件,那我们在linux服务器上怎么查看这个文件在哪?
这个时候就要文件映射,通俗的说就是将minio中上传的文件给映射到linux上,这样子我们在linux上就可以实时同步看到我们上传的文件。

2.2.3 查看MinIO 镜像

查看minio有没有启动成功:docker ps

20200818105135478.png

登录客户端也可以登录成功!

三、Minio实战(代码层面如何开发)

我们知道,MinIO是一个非常轻量的服务,可以支持多种语言客户端集成开发,如JavaScript 、Java、Python、Golang、.NET。

我们拿JavaScript 、Java进行手把手教学。目前这两种环境较多人使用吧。(😁其实我也是只用这两种环境,其他语言咱也不敢有话语权)

3.1 JavaScript引入minio并使用

在线音乐中的【IT知识】模块文件本是由node,以原生上传文件保存到服务器本地中,但后面我想把全部文件统一管理,在前几天换做上传到minio中。

本例子用koa2作为JavaScript的后台。

3.1.1 koa2安装minio

1
css复制代码npm i minio --save-dev

3.1.2 新建minio.js

配置minio账号密码端口。

1
2
3
4
5
6
7
8
9
10
11
12
javascript复制代码var Minio = require("minio");
let endPoint = "127.0.0.1";

var minioClient = new Minio.Client({
endPoint,
port: 9000,
useSSL: false,
accessKey: "admin",
secretKey: "123456",
});

module.exports = minioClient;

于是就可以在js接口中开始使用minio了,当然在写之前是需要引入上面定义的minio.js文件。

下面简单列出几个常用的方法操作。

3.1.3 列出全部存储桶(文件夹)

1
2
3
javascript复制代码minioClient.listBuckets().then((res) => {
console.log(res);
})

3.1.4 列出一个存储桶下的所有文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
javascript复制代码var stream = minioClient.listObjectsV2("music", "", true);
let arr = [];

stream.on("data", (obj) => {
arr.push(obj); // 每一个文件都会去执行该data方法,类似于遍历
});

stream.on("end", function () {
console.log(arr);
// 全部遍历完成后,在这里会打印出该music文件夹下的所有文件
});

stream.on("error", function (err) {
console.log(err);
});

3.1.5 判断是否有这个存储桶

1
2
3
4
5
6
7
8
9
10
11
javascript复制代码minioClient.bucketExists("music", function (err, exists) {
if (err) {
console.log(err);

}
if (exists) {
console.log('存在')
} else {
console.log('不存在')
}
});

3.1.6 创建一个新的存储桶

1
2
3
4
5
6
7
javascript复制代码minioClient.makeBucket("music", "us-east-1", function (err) {
if (err) {
console.log("Error creating bucket.", err);
}

console.log('创建成功')
});

3.1.7 下载对象并将其保存为本地文件系统中的文件

1
2
3
4
5
6
7
8
9
10
11
12
javascript复制代码minioClient.fGetObject(
"music",
"周杰伦 - Mojito202006152354441192.flac",
"H:/85/周杰伦 - Mojito202006152354441192.flac",
function (err) {
if (err) {
console.log(err);
}

console.log('下载成功');
}
);

3.1.8 下载的临时url

1
2
3
4
5
6
7
8
9
10
javascript复制代码minioClient.presignedUrl("GET", "music", "周.flac", 24 * 60 * 60, function (
err,
presignedUrl
) {
if (err) {
console.log(err);
}

console.log(presignedUrl);
});

3.1.9 往存储桶上传一个文件

上传文件的时候要注意文件名字可能会重复,所以这里加了随机数作为文件名字;

当然,规则都是人定的,可以根据自己的场景制订不同写法;

可以看到minioClient.putObject()函数中打印的那些字段,一般都会存在数据库中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
javascript复制代码const bucketsName = "music", file = File对象;
const path = file[0][1].path;
const fileName = file[0][1].name;
const type = file[0][1].type;
const nowDate = moment(new Date()).format("YYYYMMDDHHmmss");
const random = Math.round((Math.random() * 9 + 1) * 1000);
const fileNameRandom = nowDate + random;

const fileSuffix = fileName.substring(fileName.lastIndexOf(".")); // 文件后缀名字
const filePrefix = fileName.substring(0, fileName.lastIndexOf(".")); // 文件名字前面那段

let objectName = filePrefix.length > 20
? filePrefix.substring(0, 20) + fileNameRandom + fileSuffix
: filePrefix + fileNameRandom + fileSuffix;



var fileStream = fs.createReadStream(path);
fs.stat(path, function (err, stats) {
if (err) {
console.log(err);
}


minioClient.putObject(
bucketsName,
objectName,
fileStream,
stats.size,
{ "Content-Type": type },
function (err, etag) {
if (err) {
console.log(err);
}

console.log({
fileName,
bucketsName,
fileContentType: type,
size: stats.size,
fileNameRandom,
url: objectName,
});
}
);
});
});

3.1.10 获取文件信息

1
2
3
4
5
6
7
javascript复制代码minioClient.statObject("test", '9.png', function (err, stat) {
if (err) {
console.log(err);
}

console.log(stat);
});

3.1.11 移除一个对象

1
2
3
4
5
6
7
javascript复制代码minioClient.removeObject("test", '9.png', function (e) {
if (e) {
console.log("Unable to remove Objects ", e);
}

console.log('移除成功');
});

3.1.12 删除objectsList 中的所有对象

1
2
3
4
5
6
7
javascript复制代码minioClient.removeObjects("test", ['9.png'], function (e) {
if (e) {
console.log("Unable to remove Objects ", e);
}

console.log('移除成功');
});

我在github上写了个minio工具类,懒人可以借鉴一下(可以的话给个⭐Star):github.com/git-Dignity…

minio官方文档中也有明确的操作例子:docs.min.io/docs/javasc…

3.2 Java引入minio并使用

在线音乐中的【音乐】模块文件一直都是由minio管理的。

其实java和JavaScript的写法基本一样。

大同小异。

3.2.1 java安装minio

pom.xml文件引入依赖

1
2
3
4
5
xml复制代码<dependency>
<groupId>io.minio</groupId>
<artifactId>minio</artifactId>
<version>3.0.10</version>
</dependency>

3.2.2 在application.yml

我喜欢把配置参数(账号密码)写在application.yml文件中,规范管理。

1
2
3
4
5
6
7
java复制代码#文件服务器

minio:
endpoint: http://39.108.185.253 # 外网
accessKey: admin
secretKey: 123456
endpointIn: http://127.0.0.1:9000 # 内网

3.2.3 创建实体类Minio.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
java复制代码package com.example.bkapi.common.entity;

import org.springframework.boot.context.properties.ConfigurationProperties;


@ConfigurationProperties(prefix = "minio")
public class Minio {

private String endpoint;
private String accessKey;
private String secretKey;
private String endpointIn;

public String getEndpoint() {
return endpoint;
}

public void setEndpoint(String endpoint) {
this.endpoint = endpoint;
}

public String getAccessKey() {
return accessKey;
}

public void setAccessKey(String accessKey) {
this.accessKey = accessKey;
}

public String getSecretKey() {
return secretKey;
}

public void setSecretKey(String secretKey) {
this.secretKey = secretKey;
}

public String getEndpointIn() {
return endpointIn;
}

public void setEndpointIn(String endpointIn) {
this.endpointIn = endpointIn;
}
}

于是就可以在接口中开始使用minio了,当然在写之前是需要引入上面定义的Minio实体类。

1
arduino复制代码private static Minio minio;

下面简单列出几个常用的方法操作。

3.2.4 创建minioClient

这里的写法其实和上面的3.1.2 新建minio.js 是一样的,为了创建Minio实例。

下面的minio.getEndpoint()获取到application.yml定义的http://39.108.185.253

下面的minio.getAccessKey()获取到application.yml定义的admin

下面的minio.getSecretKey()获取到application.yml定义的123456

大家的参数可以写死在这里,看个人习惯。

1
2
3
4
java复制代码public static MinioClient createMinioClient() throws InvalidPortException, InvalidEndpointException {
MinioClient minioClient = new MinioClient(minio.getEndpoint(), minio.getAccessKey(), minio.getSecretKey());
return minioClient;
}

3.2.5 创建内网minioClient

下面的minio.getEndpoint()获取到application.yml定义的http://127.0.0.1:9000

其他参数和上方createMinioClient方法一样。

1
2
3
4
java复制代码public static MinioClient createInMinioClient() throws InvalidPortException, InvalidEndpointException {
MinioClient minioClient = new MinioClient(minio.getEndpointIn(), minio.getAccessKey(), minio.getSecretKey());
return minioClient;
}

3.2.6 创建具有给定区域的新存储桶

先判断有没有这个桶,没有在进行创建新桶。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码public static void makeBucket(String bucketName){
try {
MinioClient minioClient = createInMinioClient();

// Create bucket if it doesn't exist.
boolean found = minioClient.bucketExists(bucketName);
if (found) {
System.out.println(bucketName + " already exists");
} else {
// Create bucket 'my-bucketname'.
minioClient.makeBucket(bucketName);
System.out.println(bucketName + " is created successfully");
}
} catch (MinioException e) {
System.out.println("Error occurred: " + e);
} catch (Exception e) {
e.printStackTrace();
}
}

3.2.7 列出所有桶

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码public static List<Bucket> listBuckets(){
List<Bucket> bucketList = null;
try {
MinioClient minioClient = createInMinioClient();

// List buckets that have read access.
bucketList = minioClient.listBuckets();
for (Bucket bucket : bucketList) {
System.out.println(bucket.creationDate() + ", " + bucket.name());
}
} catch (MinioException e) {
System.out.println("Error occurred: " + e);
} catch (Exception e) {
e.printStackTrace();
}
return bucketList;
}

3.2.8 检查是否存在存储桶

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码public static boolean bucketExists(String bucketName){

boolean found = false;
try {
MinioClient minioClient = createInMinioClient();

// Check whether 'my-bucketname' exists or not.
found = minioClient.bucketExists(bucketName);
if (found) {
System.out.println(bucketName + " exists");
} else {
System.out.println(bucketName + " does not exist");
}
} catch (MinioException e) {
System.out.println("Error occurred: " + e);
} catch (Exception e) {
e.printStackTrace();
}
return found;
}

3.2.9 删除一个桶

先判断桶存不存在,存在则删除该桶。

注意: - removeBucket不会删除存储桶内的对象。需要使用removeObject API删除对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码public static void removeBucket (String bucketName){
try {
MinioClient minioClient = createInMinioClient();
// Check if my-bucket exists before removing it.
boolean found = minioClient.bucketExists(bucketName);
if (found) {
// Remove bucket my-bucketname. This operation will succeed only if the bucket is empty.
minioClient.removeBucket(bucketName);
System.out.println(bucketName + " is removed successfully");
} else {
System.out.println(bucketName + " does not exist");
}
} catch(MinioException e) {
System.out.println("Error occurred: " + e);
} catch (Exception e) {
e.printStackTrace();
}
}

3.2.10 列出给定存储桶中的对象信息

先判断桶存不存在,存在则列出给定存储桶中的对象信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码public static Iterable<Result<Item>> listObjects(String bucketName){
Iterable<Result<Item>> myObjects = null;
try {
MinioClient minioClient = createInMinioClient();

// Check whether 'mybucket' exists or not.
boolean found = minioClient.bucketExists(bucketName);
if (found) {
// List objects from 'my-bucketname'
myObjects = minioClient.listObjects(bucketName);
for (Result<Item> result : myObjects) {
Item item = result.get();
System.out.println(item.lastModified() + ", " + item.size() + ", " + item.objectName());
}
} else {
System.out.println(bucketName + " does not exist");
}
} catch (MinioException e) {
System.out.println("Error occurred: " + e);
} catch (Exception e) {
e.printStackTrace();
}
return myObjects;
}

3.2.11 列出给定存储桶和前缀中的对象信息

先判断桶存不存在,存在则列出给定存储桶和前缀中的对象信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码public static Iterable<Result<Item>> listObjects(String bucketName,String prefix){
Iterable<Result<Item>> myObjects = null;
try {
MinioClient minioClient = createInMinioClient();
// Check whether 'mybucket' exists or not.
boolean found = minioClient.bucketExists(bucketName);
if (found) {
// List objects from 'my-bucketname'
myObjects = minioClient.listObjects(bucketName,prefix);
for (Result<Item> result : myObjects) {
Item item = result.get();
System.out.println(item.lastModified() + ", " + item.size() + ", " + item.objectName());
}
} else {
System.out.println(bucketName + " does not exist");
}
} catch (MinioException e) {
System.out.println("Error occurred: " + e);
} catch (Exception e) {
e.printStackTrace();
}
return myObjects;
}

3.2.12 将对象信息列为Iterable 在给定的桶,前缀和递归标志

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码public static Iterable<Result<Item>> listObjects(String bucketName, String prefix, boolean recursive){
Iterable<Result<Item>> myObjects = null;
try {
MinioClient minioClient = createInMinioClient();
// Check whether 'mybucket' exists or not.
boolean found = minioClient.bucketExists(bucketName);
if (found) {
// List objects from 'my-bucketname'
myObjects = minioClient.listObjects(bucketName,prefix,recursive);
for (Result<Item> result : myObjects) {
Item item = result.get();
System.out.println(item.lastModified() + ", " + item.size() + ", " + item.objectName());
}
} else {
System.out.println(bucketName + " does not exist");
}
} catch (MinioException e) {
System.out.println("Error occurred: " + e);
} catch (Exception e) {
e.printStackTrace();
}
return myObjects;
}

3.2.13 将对象下载为流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码public static InputStream getObject(String bucketName, String objectName){
InputStream stream = null;
try {
MinioClient minioClient = createInMinioClient();
// Check whether the object exists using statObject().
// If the object is not found, statObject() throws an exception,
// else it means that the object exists.
// Execution is successful.
minioClient.statObject(bucketName, objectName);

// Get input stream to have content of 'my-objectname' from 'my-bucketname'
stream = minioClient.getObject(bucketName, objectName);
} catch (MinioException e) {
System.out.println("Error occurred: " + e);
} catch (Exception e) {
e.printStackTrace();
}
return stream;
}

3.2.14 将对象下载并保存为本地文件系统中的文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码public static void getObject(String bucketName, String objectName, String fileName){
try {
MinioClient minioClient = createInMinioClient();
// Check whether the object exists using statObject().
// If the object is not found, statObject() throws an exception,
// else it means that the object exists.
// Execution is successful.
minioClient.statObject(bucketName, objectName);

// Gets the object's data and stores it in photo.jpg
minioClient.getObject(bucketName, objectName, fileName);

} catch (MinioException e) {
System.out.println("Error occurred: " + e);
} catch (Exception e) {
e.printStackTrace();
}
}

3.2.15 通过InputStream上传对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码public static ObjectStat putObject(String bucketName, String objectName, InputStream stream, long size, String contentType){
try {
MinioClient minioClient = createInMinioClient();
// 创建对象
minioClient.putObject(bucketName, objectName, stream, size, contentType);

System.out.println(objectName + " is uploaded successfully");
} catch(MinioException e) {
System.out.println("Error occurred: " + e);
} catch (Exception e) {
e.printStackTrace();
}
return statObject(bucketName,objectName);
}

3.2.16 通过文件上传到对象中

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码public static int putObject(String bucketName, String objectName, String fileName){
try {
MinioClient minioClient = createInMinioClient();

minioClient.putObject(bucketName, objectName, fileName);
System.out.println(objectName + " is uploaded successfully");
} catch(MinioException e) {
System.out.println("Error occurred: " + e);
} catch (Exception e) {
e.printStackTrace();
}
return 1;
}

3.2.17 从objectName指定的对象中将数据拷贝到destObjectName指定的对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码public static void copyObject(String bucketName, String objectName, String destBucketName, String destObjectName, CopyConditions cpConds, Map<String, String> metadata){
try {
MinioClient minioClient = createInMinioClient();

CopyConditions copyConditions = new CopyConditions();
copyConditions.setMatchETagNone("TestETag");

minioClient.copyObject(bucketName, objectName, destBucketName, destObjectName, copyConditions);//copyConditions 或 cpConds
System.out.println(objectName + " is uploaded successfully");
} catch(MinioException e) {
System.out.println("Error occurred: " + e);
} catch (Exception e) {
e.printStackTrace();
}
}

3.2.18 删除一个对象

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码public static void removeObject(String bucketName, String objectName){
try {
MinioClient minioClient = createInMinioClient();

// 从bucketName中删除objectName。
minioClient.removeObject(bucketName, objectName);
System.out.println("successfully removed " + bucketName + "/" +objectName);
} catch (MinioException e) {
System.out.println("Error: " + e);
} catch (Exception e) {
e.printStackTrace();
}
}

3.2.19 删除多个对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码public static Iterable<Result<DeleteError>> removeObject(String bucketName, Iterable<String> objectNames){
Iterable<Result<DeleteError>> results = null;
try {
MinioClient minioClient = createInMinioClient();
// 删除my-bucketname里的多个对象
results = minioClient.removeObject(bucketName, objectNames);

for (Result<DeleteError> errorResult: results) {
DeleteError error = errorResult.get();
System.out.println("Failed to remove '" + error.objectName() + "'. Error:" + error.message());
}
} catch (MinioException e) {
System.out.println("Error: " + e);
} catch (Exception e) {
e.printStackTrace();
}
return results;
}

3.2.20 获取文件永久地址

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码public static String getObjectUrl(String bucketName, String objectName){
String url = null;
try {
MinioClient minioClient = createInMinioClient();

url = minioClient.getObjectUrl(bucketName, objectName);
System.out.println(url);
} catch(MinioException e) {
System.out.println("Error occurred: " + e);
} catch (Exception e) {
e.printStackTrace();
}
return url;
}

我在github上写了个minio工具类,懒人可以借鉴一下(可以的话给个⭐Star):github.com/git-Dignity…

minio官方文档中也有明确的操作例子:docs.min.io/docs/java-c…

最后

上个星期,大学学霸过来坐坐,闲聊之中,看到我在做在线音乐,说起了我打算项目中的文件都要由minio管理,于是我就问起他文件都是怎么管理的,也是上传后端指定的文件夹。

于是我就打开minio给他看,一开始看了也觉得只是可视化界面,我说还有文件过期、权限等功能,oh ~ 还有这些功能,那不错,发下给我。

bd244e3d6e8959d37aa3870abac80a3.png

如果对您有帮助,你的点赞是我前进的润滑剂。

相关文献

Minio Windows安装和使用

MinIO Docker部署初探

以往推荐

尤大大说我的代码全部不加分号

老湿说的万物皆对象,你也信?

Vue-Cli3搭建组件库

Vue实现动态路由(和面试官吹项目亮点)

项目中你不知道的Axios骚操作(手写核心原理、兼容性)

VuePress搭建项目组件文档

koa2+vue+nginx部署

vue-typescript-admin-template后台管理系统

原文链接

juejin.cn/post/701435…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

如何利用hutool实现Excel的导出实战

发表于 2021-10-01

小知识,大挑战!本文正在参与“程序员必备小知识”创作活动。

今天我们来聊聊关于导入导出的需求实现:

背景:

经过对于数据同步的经过之后,老板今天又说,我想让客户直接直观的可以将数据看到, 但是呢又不想一个个新建,你又🈶啥可以高效的方法呢,导入啊;

我提出的一个方案,就是将我们需要的数据,按照一个模板直接导入到系统平台中,达到可以显示出新建数据的需求;

导入和导出是我们常用于直观新建数据集合,导出多用于数据统计,比如当月的绩效,考勤等,钉钉就支持导出当月的打卡记录,这就是一个比较典型的导出, 导出的格式是EXCEL,其中也要确定,导出是2007-13版本的,后缀是xslx,xsl格式的,

对于导入导出的,功能,我们常见的是根据POI,Apache的开源组织的 对于EXCEL的导出,细节的话,可以看我上次的文章:xie.infoq.cn/article/8c0…(关于POI),这也是比较官方的类库,支持office套件的数据统计化的工具;

现在已经到2021年了,对于类库,已经层出不穷,所以,这次我给大家推荐一个关于直接上手的工具,名称叫hutool, 也成为“糊涂”,也即是大家可以糊涂的知道他使用就可以,因为他的底层已经封装好了很多好的类库,官网地址www.hutool.cn/(主页)

这个是我比较喜欢的一个工具,而且对于Java开发者来说,可以极大程度的上提升你的开发效率,底层封装的很高效编辑,其中主要有,我们对于JSONutil,(JSON的依赖),ID(自定义自增ID),时间戳,HTTP请求,过滤器,类型转化,以及数据校验等,当然也缺少不了,今天的小主题:

www.hutool.cn/docs/#/

常见的组件,我们可以单出的导入pom文件中,加入依赖,如果想都用,也可以使用hutool-all;

下期介绍关于如何使用hutool的工具实现导出的具体功能

今天我们来讲讲熟知的HUTOOL如何展示导出的数据文件, 这里我们用Excel的导出方式,作为方案进行;

Excel导出:

用户想要将数据直观的统计,展示,不喜欢看一行行的数据对比, 想要总结一份表格,然后给老板做汇报,这里就体现出个人化Excel报告图标的展示了,其中特殊字段,展示格式等都需要去调整;

Hutool将Excel写出封装为ExcelWriter,其中将之前POI的底层封装好,支持的版本更好,

其中我们先开始添加依赖:

1
2
3
4
5
6
7
8
9
10
11
12
xml复制代码<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi-ooxml</artifactId>
<version>${poi.version}</version>
</dependency>

//我们只需要单纯的poi
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-poi</artifactId>
<version>4.3.5</version>
</dependency>

根据这个依赖,我们直接可以使用EXCEL

1.添加依赖

这里我主要写的是关于导出文件,response流是处理,也就是直接发送到servlet客户端下载;

我们将数据放在文件中,文件发送在返回流中;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
csharp复制代码// 通过工具类创建writer,默认创建xls格式
ExcelWriter writer = ExcelUtil.getWriter();
// 一次性写出内容,使用默认样式,强制输出标题
writer.write(rows, true);
//out为OutputStream,需要写出到的目标流

//response为HttpServletResponse对象
response.setContentType("application/vnd.ms-excel;charset=utf-8");
//test.xls是弹出下载对话框的文件名,不能为中文,中文请自行编码
response.setHeader("Content-Disposition","attachment;filename=test.xls");
ServletOutputStream out=response.getOutputStream();

writer.flush(out, true);
// 关闭writer,释放内存
writer.close();
//此处记得关闭输出Servlet流
IoUtil.close(out);

2.注意IO关闭

导出的时候,将IO关闭

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
csharp复制代码private void write(ExcelWriter writer, HttpServletResponse response) {
OutputStream out = null;
try {
out = response.getOutputStream();
writer.flush(out);
// 关闭writer,释放内存
writer.close();
// 此处记得关闭输出Servlet流
IoUtil.close(out);
} catch (IOException e) {
log.error("数据导出异常", e);
} finally {
IoUtil.close(out);
}
}

对于数据如何写入文件:

3.数据写入

最主要要的方法是:

1
2
3
ini复制代码ExcelWriter writer = ExcelUtil.getWriter(true);
this.exportWriter(writer);
writer.write(rows);

其中row就是将我们自己的数据写入Excel中去,

1
2
3
ini复制代码List<Test> rows = CollUtil.newArrayList();

//新建了一个数据集,对于写入数据,一次按照每行去创建就可以了

因为官网的例子已经很明确了,上述的代码是我在企业中实际开发中遇到的,希望你可帮助到你,

www.hutool.cn/docs/#/poi/…(excel数据生成)

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Scala 函数式数据结构与递归的艺术

发表于 2021-10-01

本章起的主要内容源自于 Paul Chiusano 所著的 《Functional Programming in Scala》。笔者将以 Scala 的视角逐步地深入了解函数式编程部分,并且尝试着充分利用 Scala 提供的语言特性 —— 隐式转换,形变,高阶类型,模式匹配等内容。

有关于高阶函数,函数柯里化,纯函数,非纯函数,副作用等概念笔者曾在之前的 Scala 之:函数式编程之始 介绍过,这里不再赘述。

Why FP

函数式编程 ( Functional Programming,简称 “FP” ) 构造在纯函数的基础之上,而纯函数隔绝了副作用。我们因此至少获得了最显著的好处:程序的运行环境不会影响到计算过程,因此也不会影响到计算结果。最简单的纯函数例子是数学的加 (+) 函数。比如给定一个 1 + 2 的表达式,它的运算结果永远是结果 3 。这个表达式不会有任何副作用,换句话说,代码中任何出现了 1 + 2 的式子,都可以简单地使用它的结果 3 代替。

引用透明与纯函数

上面的例子描述的是纯函数具备的引用透明 ( referential transparency ) 特性:一切对纯函数的调用都可以替换成它的计算结果。这种限制使得推导程序的求值结果变得更加简单而自然,我们称之为代换模型 ( substitution model ):推导计算结果就像简单地解数学题一样,不断地对参数进行等价代换,直到规约到最简单的格式。引用透明使得程序具备了等式推理的能力 ( equational reasoning ) 。这里再举出两个例子来说明:

第一个正例是 Java 和 Scala 中的字符串 String 。它是一种不可变的数据结构,所有的变换只会产生新的结果,而不会更改自身:

1
2
3
scala复制代码val x = "hello"
val r1 = x.reverse // => "olleh"
val r2 = x.reverse // => "olleh"

显然,无论在何处调用了 x.reverse ,这个结果永远保持不变。因此我们称 x.reverse 是一个引用透明的表达式,在任何调用了 x.reverse 的代码中,我们都可以使用其结果 olleh 替换之。

第二个反例是 java.lang.StringBuilder::append 方法。这个方法会改变 StringBuilder 自身,这意味着每调用一次 append 方法,它之前的状态就会被破坏:

1
2
3
scala复制代码val x = new StringBuilder("hello")
val r1 = x.append(",world") //=> "hello,world"
val r2 = x.append(",world") //=> "hello,world,world"

显然这个函数破坏了引用透明原则。r2 在调用 x.append(",world") 之前,原先的状态已经被 r1 的副作用改变了。虽然 r1 和 r2 都描述了同一个表达式,但是却得到了不同的结果,副作用使得程序行为的推理变得更加困难。

与之相对的就是代数模型更加容易推理,因为我们无需担心函数执行前后的环境变化。

模块化编程

FP 的另一个好处是让程序变得更加模块化。一个系统由多个模块所组成,这些模块应该可被独立出来理解,并支持复用的。在 FP 的世界中,一个系统的功能仅取决于这些模块本身的功能和组成顺序,而和其它的任何因素都没有关系。比如说,我们可以用 “加减乘除” 的任意组合表达出任意一个足够复杂的表达式。同样,函数之间可以相互组合 ( composite ) ,并最终形成功能强大的计算系统。

重返递归

笔者极力建议先阅读此篇文章:有趣的 Scala 语言 – 使用递归去思考。

大部分抽象逻辑都能够使用递归思维来简洁地表述。以快速排序为例子,只需要一小段描述就能够概括它:在序列中任选一个基准点并以此分出前后两个数组:前面是所有小于此基准点的元素,后面是所有大于此基准点的元素。为了让列表整体有序,继续深入到前后两个子序列中递归重复的过程,直到某一时刻选中的基准点前后没有再需要排序的子序列了。

多亏 Scala 提供的模式匹配,实现它只需要一行代码:

1
2
3
scala复制代码def quickSort(xs: List[Int]): List[Int] = {
if (xs.isEmpty) Nil else quickSort(xs.filter(_ > xs.head) ::: xs.head :: xs.filter(_ < xs.head))
}

如果说命令式编程是使用 while 或者 for 循环细心地交代程序每一步都应该怎么做,那么函数式编程的递归只需要说清两件事:做什么,到什么时候为止 ( 边界条件 ) 。感兴趣的同学还可以去上述的文章翻阅 “零钱兑换问题”,并感受更复杂的问题是如何用递归巧妙解决的。

拥抱递归,似乎一切都会变得美好。但是这里有一个不可忽视的新问题:每调用一次函数,就需要新创建一个栈帧,对于深度的递归调用,这有栈空间溢出的风险。针对这个问题,笔者下面要引入尾调用和尾递归的相关概念:

尾调用与尾递归

假定有两个函数 f(·) 和 g(·),如果函数 f(·) 最终仅返回对 g(·) 的调用而没有任何额外赋值或者计算操作,则称 f(·) 进行了尾调用。

1
2
3
4
5
6
7
scala复制代码def f(x: Int, acc : Int): Int = {
// 允许有其它的操作
// 尾调用
g(x + 1, acc + 1)
}

def g(x: Int, acc: Int): Int = 3 * x + acc

下面的情况不能称之尾调用,因为 f(·) 调用 g(·) 之后还要进行一步赋值操作:

1
2
3
4
5
scala复制代码  def f(a : Int, acc : Int) : Int = {
val result = g(a + 1,acc + 1)
result
}
def g(x: Int, y: Int): Int = 3 * x + y

下面的情况同样不能称之尾调用,因为 f(·) 调用 g(·) 之后还需要后续的 + 1 计算:

1
2
scala复制代码  def k(a : Int, acc : Int) : Int = g(a + 1,acc + 1) + 1
def g(x: Int, y: Int): Int = 3 * x + y

如果 f(·) 以尾调用的方式进行了递归,则称 f(·) 是一个尾递归函数。下面的函数代表一个阶乘计算:

1
2
3
4
scala复制代码def f(a : Int,acc : Int) : Int = {
if (a <= 1) acc
else f(a-1,a * acc)
}

该函数要么返回值 acc 中断递归,要么就只返回对自身的一个新调用 f(a - 1,a * acc)。而下面的函数不能称为尾递归:

1
2
3
4
scala复制代码def f(a : Int) : Int = {
if(a <= 1) 1
else a * f(a -1)
}

这是阶乘函数的另外一种写法,而 f(a-1) 不再是尾部,因为它还要参与到另一个乘运算当中。

为何强调尾递归优化

假使 f(·) 对 g(·) 进行了尾调用,当解释器为 g(·) 创建新的栈帧时,就可以将原来的 f(·) 从栈中弹出,因为 f(·) 没有任何后续计算了。同样,若 g(·) 尾调用了函数 h(·) ,则运行 h(·) 时,g(·) 也可以从栈中弹出。很容易就能推导出来,由于这种 “边进边出” 的机制,尾递归的整个过程实际上只占用一个栈帧。

但如果 f(·) 并没有进行尾调用,那么就需要等待 g(·) 返回值之后再进行后续的计算,f(·) 才算执行完毕。因此程序将g(·) 引入栈时, f(·) 不可以从栈帧弹出。此 “只进不出” 的非尾递归的情况,解释器需要为此创建和递归次数等量的栈帧,这种情况下就存在 Stack overflow 的风险。

CSDN:尾递归与尾调用 | CSDN:函数式编程-尾递归与尾调用

Scala 对尾递归函数的处理

对于尾递归函数,Scala 编译器通常都会尝试着将它翻译成是等效的迭代循环,但并不会通知你是否尝试成功。Scala 提供一个用于函数上的注解:@tailrec ,它用于显式地检查该函数是否是尾递归函数。如果不是,则会直接提示编译错误。

对于使用 IntelliJ IDEA 的程序员,IDE 会使用 tailrec.png 符号标记出尾递归 ( 或尾调用) 函数,而非尾递归的函数则使用 non-tailrec.png 符号。

定义函数式数据结构

函数式数据结构,只能被纯函数操作,并且仅返回一个新的值,而不改变原来的状态,因此 FP 范式中所操作的数据结构天生就是不可变的。这一章,我们仿写 Scala 的 List —— 以直观方式了解最普遍存在的函数式数据结构,单向链表。

在 Scala 提供的原生 List 中,非空结点为 :: ( 我们后续去分析这个熟悉的操作符究竟是什么来头,以及为什么要这么命名 ),而空结点为 Nil 。不过,出于 “避嫌” 的目的,笔者在命名上有所区别。一个链表 Chain 可包含两种类型:非空的结点类型 Cons 和代表 “空结点” 的 Vain 类型。

1
2
3
scala复制代码sealed trait Chain[+A]
case object Vain extends Chain[Nothing]
case class Cons[+A](head: A, tail: Chain[A]) extends Chain[A]

除此之外,Chain 还内置了一个与之协变的类型参数 A ,这样继承于 Chain[Nothing] 的 “链表底类型” Vain 能够用在任何一个 Chain[A] 类型的链表中充当 “空结点” 的角色。表示 “空” 的结点需要有一个就够了,因此我们还将 Vain 设置为了单例对象。

Head 与 Tail

在以往的链表定义中,我们常常为每个结点都设置一个 “前一个结点” 或者是 “后一个结点”。但此处 Cons 的定义有所不同,head 代表当前链表的首元素,而 tail 则代表除了首元素剩下的部分。

在之前的认知中,链表是 “一环接一环” 的,而在此处变成了 “一层套一层” 。对于一切的链表操作,都可以拆分成对 head 的操作和剩余链表 tail 的递归操作,这样的定义方式和后续的各类转换器实现完美契合。

下面给出 head 和 tail 的方法,它们都是很容易被理解的模式匹配。我们这里以 “第三人称” 的视角对链表进行操作,因此后续所有的函数 ( 或者称之为 “方法” ) 全部定义到伴生对象中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
scala复制代码object Chain { 

def getHead[A](chain: Chain[A]): A = {
chain match {
case Vain => throw new NullPointerException("there's nothing in this chain.")
case Cons(head, _) => head
}
}

def getTail[A](chain: Chain[A]): Chain[A] = {
chain match {
case Vain => Vain
case Cons(_, tail) => tail
}
}
//TODO ...
}

为了能够方便地通过变参形式创建链表并使用,我们还需要在伴生对象中定义一个 apply 方法。根据链表的递归定义,其逻辑如下:

1
2
3
4
scala复制代码 //  非尾递归实现,但是不依赖其它函数
def apply[A](items: A*): Chain[A] = {
if (items.isEmpty) Vain else Cons(items.head, apply(items.tail: _*))
}

每从 items 当中提取出一个参数,就构造出一个以它为首元素的子链表,然后递归装入剩下的参数,直到所有的参数都被链入这个链表中,并以 Vain 元素作为链表的结尾。它目前还不是尾递归函数,不过我们先不着急优化它。

尝试着创建并打印这个自定义的链表,观察它的构造。其中 _* 表示将一个 Seq[_] 内的元素提取出来并作为多个参数传入:

1
2
scala复制代码val chain: Chain[Int] = Chain(1 to 3: _*)
println(chain) // Cons(1,Cons(2,Cons(3,Vain)))

通过打印结果能观察到,这个自定义链表是嵌套结构。对于至少有一个元素的链表,屏幕会打印 Cons(_,Vain) ,而对于空链表,则根据定义,屏幕只会打印一个 Vain 。

函数式数据结构中的数据共享

当数据不可变时,该如何实现对数据的删除,增加之类的功能?通过上述模式匹配的代码块可以发现,如果想要在一个已有的 xs 链表前面再添加元素1 ,这仅仅需要返回一个 Cons(1,xs) 。由于链表 Chain[A] 是不可变的,那我们不需要对原先的 xs 链表重新做一遍复制来避免对其修改和污染,而是放心地直接复用它,这种做法称之为数据共享。

同样,如果要返回 xs 的 tail 部分,我们仅需要借助模式匹配 Cons(_,tail) 返回链表 xs 除了首部以外的所有元素,而原始的 xs 链表不会受到任何影响。我们说函数式的数据结构是持久,可推理的,这意味着已存在的引用不会因为后续的操作而发生改变。

除了添加,删除等基本操作,在下文我们还将尝试着逐步去实现 Scala 原生 List 中提供的各种转换器功能。

折叠操作

我们先假设有一个 Chain[Int] 类型的链表,然后思考如何用递归的方式实现元素累加 sum 和累乘 product 操作,并最终仅返回一个 Int 型结果。

回顾笔者刚刚提到的:”对于一切的链表操作,都可以拆分成对 head 的操作和剩余链表 tail 的递归操作” 。实际上我们需要考虑的只有两件事:首先是对 head 的操作,然后对 tail 做递归即可;另一件事是当递归处于临界条件时的返回值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
scala复制代码// 对 Chain[Int] 链表的累加方法
def sum(ints: Chain[Int]): Int = {
ints match {
case Vain => 0
case Cons(head, tail) => head + sum(tail) // Can be generalized
}
}

// 对 Chain[Int] 边表的累乘方法
def product(ints: Chain[Int]): Int = {
ints match {
case Vain => 1
case Cons(head, tail) => head * product(tail) // Can be generalized
}
}

很明显,这两个代码在结构上有高度的一致性,唯一的区别在于到达临界条件时的返回值,以及递归过程中对中间结果的累积方式 ( 笔者使用 Can be generalized 标注出的地方 ) 。

现在尝试着将它泛化成更加一般的 foldRight 右折叠方法:将累积操作 op 抽象成满足 (A,B)=>B 类型的函数。这里假设我们最后的累积结果是 B 类型,在每次的递归过程中,把所有的元素全部向右累积。

1
2
3
4
5
6
7
scala复制代码//更加泛化的高阶函数,非尾递归
def foldRight[A,B](chain: Chain[A])(init: B)(op: (A, B) => B): B = {
chain match {
case Vain => init
case Cons(head, tail) => op(head, foldRight(tail)(init)(op))
}
}

这里我们还对 foldRight 进行了两步柯里化操作,其目的是当我们分开传入 chain 和 init 时,Scala 编译器能够自动推导出 op 实际的类型。比如说:

1
2
scala复制代码val chain: Chain[Int] = Chain(1 to 3: _*)
println(Chain.foldRight(chain)(0)(_+_))

这段程序将以 0 作为初始值,并不断地将链表内的元素向右边聚合,显然,这个结果应当是 6。

优化非尾递归函数的思路

不过,这依旧不令人满意,因为目前的递归并不是尾递归。原因在于每一步递归调用时都需要 “保存现场” ,直到下一个递归返回结果时才会继续计算,这导致了一连串的 “等待链”:

1
2
scala复制代码// 需要等待下一次递归的 foldRight(tail)(init)(op) 返回结果
case Cons(head,tail) => op(head,foldRight(tail)(init)(op))

它的效率并不高,原因是每一层递归除了展开 op 操作然后将 init “抛给” 下一个递归调用之外,它什么都没有做。直到递归已经展开到边界时,程序才开始使用 init 逐级返回运算结果。

那不妨在每次递归过程中,直接将 head 元素右规约到 init 并作为本次的运算结果,然后将它作为参数传入到下次递归调用。这样,每层递归不再需要相互等待运算结果,尾递归函数就构造出来了。 或许可以这样理解:我们将运算过程从 “先后向传播再前向传播” 直接改造成了 “向后边传播边计算” 的过程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
scala复制代码def foldRight[A, B](chain: Chain[A], acc: B)(op: (A, B) => B): B = {
chain match {
case Vain => acc
case Cons(head, tail) => foldRight(tail, op(head, acc))(op)
}
}
//同理,可以实现左规约,所有的参数向左累积。
def foldLeft[A, B](chain: Chain[A], init: B)(op: (B, A) => B): B = {
chain match {
case Vain => init
case Cons(head, tail) => foldLeft(tail, op(init, head))(op)
}
}
//通过左规约实现的右规约。
def foldLeftByfR[A, B](chain: Chain[A], init: B)(op: (B, A) => B): B = {
foldRight(reverse(chain), init)((a, b) => op(b, a))
}

现在,当右折叠达到递归条件时,将直接返回累积结果。这种将非尾递归转化为尾递归函数的套路可以推广开来,比如说实现倒置链表元素的 reverse 方法,以及基于此尾递归函数的 apply 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
scala复制代码def reverse[A](chain: Chain[A]): Chain[A] = {
def loop(chain: Chain[A], acc: Chain[A]): Chain[A] = {
chain match {
case Vain => acc
case Cons(head, tail) => loop(tail, Cons(head, acc))
}
}
loop(chain, Vain)
}

// 只依赖尾递归的 reverse 的新 apply 方法。
def apply[A](items: A*)(implicit acc: Chain[A] = Vain): Chain[A] = {
if (items.isEmpty) reverse(acc) else apply(items.tail: _*)(Cons(items.head, acc))
}

reverse 函数的思想是:”先装入的元素会排到后面” ( 想象一个空链表不断头插值的过程 )。此外,非尾递归都可以优化成一个 “套壳函数” 提供初始值,并尾调用另一个尾递归的形式。

不过对于 Scala 而言,我们还可以利用隐式参数简化这段代码。这本质上是省略了第一次调用 loop 函数时提供初始值的步骤,因此也就不再需要外部的一层 “套壳函数” 了。

1
2
3
4
5
6
7
scala复制代码def reverse[A](chain: Chain[A])(implicit acc : Chain[A] = Vain): Chain[A] = {
chain match {
case Vain => acc
case Cons(head, tail) => reverse1(tail)(Cons(head, acc))
}
reverse(chain)
}

隐式参数的精髓在于 “隐藏” :对于代码的调用者而言,隐式参数意味着他不需要主动提供一个初始值 ( 最好让他对此毫无感知 ) ,在某些场合下用起来非常合适。比如,一个定义在伴生对象的求链表长度 size 方法。作为代码的使用者,他很自然地认为这个方法入参只需要一个被计算长度的链表,而不是由自己再额外提供一个 “用于累积求链表长度的初始值 0”。

1
2
3
4
5
6
7
8
9
10
11
scala复制代码def size(chain: Chain[_],depth: Int): Int = {
def loop(chain: Chain[_],depth : Int) : Int = {
chain match {
case Vain => depth
case Cons(_, tail) => size(tail)(depth + 1)
}
}
loop(chain,depth)
}
//一个不好的设计,初始值不需要也不应该由用户来提供。
Chain.size(chain,0)

除此之外,这需要对 Scala 的隐式值机制有足够的了解,避免隐式变量被上下文的隐式值 “覆盖” 而得到预期外的结果。下面以一个例子说明:

1
2
scala复制代码def f(i : Int)(implicit a : Int = 5) : Int = g(i)
def g(int: Int)(implicit b : Int = 0): Int = int + b

如果调用函数 f(1) ,最终得到的结果将是 6 而非 1 。 从结构上看,可以认为函数 f(·) 是函数 g(·) 的闭包。若函数 f(·) 的隐式变量 a 被赋值为 5 ,这会隐含地表达了 “f(·) 的作用域内所有 Int 类型的隐式值都应是 5“。因此,其作用域内的函数 g(·) 的隐式变量 b 的值也将默认取 5 ,而不是 0 ,除非以 g(i)(0) 的形式覆盖掉上下文环境中默认定义的隐式值。

从追加元素抽象到聚合列表

设有 Chain[Chain[A]] 类型的嵌套列表,设计一个 flat 函数将它 “展平” 成一个 Chain[A] 类型。以现有的函数来看,直接实现这个功能有点困难。我们可以分步骤来执行:

  1. 考虑如何实现尾部追加单个元素的 append 方法;
  2. 考虑如何实现合并两个链表的 concat 方法;
  3. 最终实现对嵌套链表的 flat 方法。

这三个方法的逻辑是递进关系:高级的方法可以借助低级的方法来便捷实现。

对于 append 方法有两种方式实现,第一种是重新复制一份原有的链表并在最后添加新元素 ( 因为不能修改原有的链表 );第二种方法是调用两次尾递归函数 reverse:先将链表 “倒过来” ,头插入新元素,然后再将链表 “正过来”,思路很清晰,但是运行效率较低。笔者在这里选择的是前者:

1
2
3
4
5
6
7
8
scala复制代码def append[A](chain:Chain[A],last:A)(implicit acc : Chain[A] = Vain) : Chain[A] = {
chain match {
case Vain => Cons(last,acc)
case Cons(head,tail) => append(tail,last)(Cons(head,acc))
}
}

// def append[A](chain: Chain[A], last: A): Chain[A] = reverse(Cons(last, reverse(chain)))

对于两个链表 ca 和 cb ,一个直观的想法是:只要利用 append 方法不断地将 cb 的首元素追加到 ca 的尾部 ( 实际上返回的是新的链表,而不是 ca 本身被改变了 ),最终就可以实现 concat 方法的效果了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
scala复制代码  def concat[A](ca: Chain[A], cb: Chain[A]): Chain[A] = {

def concatTail(ca: Chain[A], cb: Chain[A]): Chain[A] = {
cb match {
// 当 cb 为空,则不需要合并,这时直接返回前者。
case Vain => ca
case Cons(head, tail) => concatTail(append(ca, head), tail)
}
}

(ca, cb) match {
case (Vain, nonVainB) => nonVainB
case (nonVainA, Vain) => nonVainA
case _ => concatTail(ca, cb)
}
}

至于 flat 方法,可以想象成它是一个特化的右 ( 左 ) 叠加过程:将 Chain[Chain[A]] 内部的多个 Chain[A] 以两两的形式通过 concat 方法规约,并最终达到将 Chain[Chain[_]] “降维” 到 Chain[A] 的目的。

1
2
scala复制代码//这里 Vain:Chain[A] 表示将单例对象 Vain 看作成 Chain[A] 的一个特殊实例
def flat[A](l: Chain[Chain[A]]): Chain[A] = foldRight(l, Vain: Chain[A])(concat)

这里涉及到了一个 Scala 的语法现象,为了编译器能够正确地推导出期望的 op 类型,此处我们将单例对象 Vain 特指为了 Chain[A] 的一个实例。否则,编译器将认为 op 应当是一个 (Chain[A],Vain.type) => Vain.type 类型而提示传入的 concat 是 “错误” 的函数类型。

一切均可从折叠方法中抽象

实际上,一切对链表的操作都可以视作是左折叠/右折叠的演化版本。下面是基于折叠操作或其它已有的尾递归函数实现的 map ,flatMap ,和 fiter 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
scala复制代码def map[A, B](chain: Chain[A])(a2b: A => B): Chain[B] = {
//可以简写成:foldLeft(chain,Vain:Chain[B])((t,h) => Cons(a2b(h),t))
val trans: (Chain[B], A) => Cons[B] = (chain, head) => Cons(a2b(head), chain)
foldLeft(chain, Vain: Chain[B])(trans)
}

def filter[A](chain: Chain[A])(whether: A => Boolean): Chain[A] = {
//可以简写成:foldLeft(chain,Vain:Chain[A])((chain,a) => if(whether(a)) Cons(a,chain) else chain)
val trans: (Chain[A], A) => Chain[A] = (chain, a) => if (whether(a)) Cons(a, chain) else chain
foldLeft(chain, Vain: Chain[A])(trans)
}

def flatMap[A](chain: Chain[A])(toChain: A => Chain[A]): Chain[A] = link(map(chain)(toChain))
//基于 flatMap 实现的过滤器 filter
def filterByFm[A](chain: Chain[A])(whether: A => Boolean): Chain[A] = {
val trans: A => Chain[A] = (a: A) => if (whether(a)) Chain(a) else Vain
flatMap(chain)(trans)
}

至此,我们所有实现的函数已经具备了下列的层次关系:

scala_fp.png

更加复杂的例子:匹配子序列

要求:给定一个长序列和一个短序列,它们同属于 Chain[A] 类型,要求判定短序列是否是长序列的子序列。

同样,我们先考虑 Chain[Int] 类型的情形,然后再考虑怎么给出泛化版本。你可以将它想象成是 “对齐游标卡尺” 的过程:首先对齐长短序列的一端,然后继续尝试对齐长短序列的剩下部分。如果失败了,那么就 “滑动” 到后面的序列中继续寻找长短序列开头对齐的部分,然后如此递归。

如果长序列的剩余长度已经比短序列还要短了,我们可以断言该短序列一定不是长序列的子序列。另外一个关键点是:在滑动的过程中,短序列一定不能被更改。下面用一段动画来表述这段逻辑:检查 064 是否是 37064 的子序列。

sub_seq.gif

用代码描述这段逻辑则是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
scss复制代码def contains(long : Chain[Int],short : Chain[Int]): Boolean = {
   if(Chain.size(long) < Chain.size(short)) return false
  (long,short) match {
       case (Cons(lh,lt),Cons(sh,st)) if lh == sh =>
      if(compareLeft(lt,st)) true else contains(lt,short)
       case (Cons(_,lt),Cons(_,_)) => contains(lt,short)
       case _ => false
  }
}
def compareLeft(longLeft : Chain[Int],shortLeft: Chain[Int]) : Boolean = {
  (longLeft,shortLeft) match {
       case (Cons(lh,_),Cons(sLast,Vain)) if lh == sLast => true
       case (Cons(lh,lt),Cons(sh,st)) if lh == sh => compareLeft(lt,st)
       case _ => false
  }
}

在完成了 Chain[Int] 版本的子序列匹配之后,我们再去推广类型为 A 时的一般情形。显然,这里要求 A 是可比较的类型,否则我们就无法用 == 准确描述出 “相同,相等” 的概念。

因此这里使用了上下文界定的方式提供由 A => Ordered[A] 的一个适配器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
scala复制代码/**
 * 通过上下文界定将它改写成适配任何元素列表的子序列查找方法。
 *
 * @param long   长序列
 * @param short   短序列
 * @param adaptor 转换器
 * @tparam A 列表元素
 * @return 若在长序列中找到了满足条件的子序列,则返回 true 。
 */
def contains_generic[A](long: Chain[A], short: Chain[A])(implicit adaptor: A => Ordered[A]): Boolean = {
 if (Chain.size(long) < Chain.size(short)) return false
(long, short) match {
   case (Cons(lh, lt), Cons(sh, st)) if lh == sh =>
     if (compareLeft_generic(lt, st)) true else contains_generic(lt, short)
   case (Cons(_, lt), Cons(_, _)) => contains_generic(lt, short)
   case _ => false
}
}

def compareLeft_generic[A](longLeft: Chain[A], shortLeft: Chain[A])(implicit adaptor: A => Ordered[A]): Boolean = {
(longLeft, shortLeft) match {
   case (Cons(lh, _), Cons(sLast, Vain)) if lh == sLast => true
   case (Cons(lh, lt), Cons(sh, st)) if lh == sh => compareLeft_generic(lt, st)
   case _ => false
}
}

优雅的符号表示法

Scala 的 “开明之处” 是允许使用符号组合来作为标识符来提升代码的可读性,以至于用户有时认为你设计出了新一套 DSL 。在原生的 Scala List 列表中,空列表使用 Nil 表示,而非空列表则是 :: 符号。当我们使用 List(1,2,3) 构造出一个不可变列表时,它实际上是 ::(1,::(2,::(3,Nil))) 。

样例类作为中缀符号匹配的情形

这里还需引入 Scala 提供的另一个语法特性。对于样例类而言,如果它仅有两个参数,比如:

1
scala复制代码case class Operator(a : Int, b : Int)

那么当进行模式匹配时,它可以被当作是一个中缀运算符来处理:

1
2
3
scala复制代码case a Operator b => ... 
// 这是之前学过的对象匹配模式,两者等效。
case Operator(a,b) => ...

Scala 将非空列表命名为 :: 的 “良苦用心”,就是为了用户能以简明方式表述出对列表的复杂匹配。观察下面表达相同意思的四个匹配语句,显然第一行的可读性最强。

1
2
3
4
scala复制代码case 1 :: 2 :: Nil => ...
// case ::(1,::(2,Nil)) => ...
// case Cons(1,Cons(2,Vain)) => ...
// case 1 Cons 2 Cons Nil => ...

向右操作符

另一个特殊的机制是,Scala 将所有以冒号 : 为结尾的操作符视作是右操作符,它的意思是这个符号会绑定在右操作元上。比如说表达式 1 :: xs ,按照常规的 Scala 中缀表达式表示法来理解,这应当是 1.::(xs) 。但作为元素的 Int 类型并没有 :: 方法,这说不通。实际的情况是: :: 作为右操作符,这个式子表达的意义是 xs.::(1) 。

此外,在非模式匹配的场合,仍然能够以类似 1 :: 2 :: 3 :: Nil 的方式构造 List(1,2,3) ,这是因为 List 提供了同名 :: 方法:

1
2
scala复制代码// 返回以新元素为 head,以自身 (this) 为 tail 的新 :: 实例。
def ::[U>:T](x:U): List[U] = new Scala.::(x,this)

我们也不难理解为何当使用 :: 符号构造列表时,编译器要求最右侧一定是 List 类型了,因为该方法的 this 关键字总是指向一个 List 对象。

笔者这次以符号形式给出了Scala 原生 List 的另一个仿写版本,它是仅包含了 apply 方法以及所依赖的 reverse 方法的精简版本。同时,为了避免协变的类型 A 出现在逆变点上,笔者使用上界对其进行了等效替换 ( 详情见笔者之前介绍的 Scala:形变章节 )。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
scala复制代码sealed trait Link[+A]{
 // 为了避免 ~>:(e,this) 在此处被误认为是递归函数,因此这个 new 关键字不能省略。
 def ~>:[O>:A](e : O) : Link[O] = { new ~>:(e,this)}  
 def reverse[O>:A]: Link[O] = {
   @tailrec
   def loop(left : Link[O], acc : Link[O]) :Link[O] = {
      left match {
        case Empty => acc
        case h~>:tail => loop(tail,h->:acc)
      }
  }
   loop(this,Empty)
}
}
case object Empty extends Link[Nothing]
case class ~>:[+A](head : A, tail : Link[A]) extends Link[A]
object Link {
 def apply[A](a: A) : Link[A] = a~>:Empty
 def apply[A](as:A*)(implicit result : Link[A] = Empty) : Link[A] = {
   if(as.isEmpty)  result.reverse
   else apply(as.tail:_*)(as.head~>:result)
}
}

现在,我们也可以用 ~> 符号构造出一个非空列表:

1
2
3
scala复制代码//先运算 2~>: Empty  = ~>:(2,Empty)
//然后运算 1~>:(~>:(2,Empty)) = ~>:(1,~>:(2,Empty))
1 ~>: 2 ~>: Empty

或者在模式匹配中直接用符号形式表述出匹配逻辑:

1
2
3
4
scala复制代码  1 ~>: 2 ~>: Empty match {
   case 1~>:Empty => println("has only one element: 1.")
   case 1~>:_ => println("starts with element: 1.")
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Dubbo 30 BootStrap 初始化主流程

发表于 2021-10-01

首先分享之前的所有文章 , 欢迎点赞收藏转发三连下次一定 >>>> 😜😜😜

文章合集 : 🎁 juejin.cn/post/694164…

Github : 👉 github.com/black-ant

CASE 备份 : 👉 gitee.com/antblack/ca…

一 . 前言

这一篇只预览一下 Bootstrap 初始化的主流程 , 部分细节点会另外分析

Dubbo 3.0 通过 DubboBootstrap 进行初始化逻辑 , DubboBootstrap 的启动逻辑如下 :

1
2
3
4
5
6
JAVA复制代码// 核心原理为 ApplicationContextEvent 事件的触发 , 当 SpringApplication 启动后 ,会发布该事件

C- AbstractApplicationContext # refresh : 发布 ApplicationContextEvent 事件
C- DubboBootstrapApplicationListener # onApplicationContextEvent : 监听Application 事件
C- DubboBootstrapApplicationListener # onContextRefreshedEvent
C- DubboBootstrap # start : 开始加载操作

主要涉及类

1
2
3
4
5
6
7
8
java复制代码// 参考类 : 
DubboBeanUtils
ServiceAnnotationPostProcessor
DubboClassPathBeanDefinitionScanner


// 监听器 :
DubboBootstrapApplicationListener

二 . 入口详解

Dubbo 的初始化主要经过以下几个类 : DubboBootstrap -

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码// Step 1 : DubboBootstrapApplicationListener
public void onApplicationContextEvent(ApplicationContextEvent event) {
if (DubboBootstrapStartStopListenerSpringAdapter.applicationContext == null) {
DubboBootstrapStartStopListenerSpringAdapter.applicationContext = event.getApplicationContext();
}
// Step 2-1 : 容器启动后初始化 Bootstrap , 看名称这里应该还可以刷新
if (event instanceof ContextRefreshedEvent) {
onContextRefreshedEvent((ContextRefreshedEvent) event);
// 容器关闭逻辑
} else if (event instanceof ContextClosedEvent) {
onContextClosedEvent((ContextClosedEvent) event);
}
}

// Step 2-1
private void onContextRefreshedEvent(ContextRefreshedEvent event) {
if (dubboBootstrap.getTakeoverMode() == BootstrapTakeoverMode.SPRING) {
dubboBootstrap.start();
}
}

// Step 2-2
private void onContextClosedEvent(ContextClosedEvent event) {
if (dubboBootstrap.getTakeoverMode() == BootstrapTakeoverMode.SPRING) {
DubboShutdownHook.getDubboShutdownHook().run();
}
}

三 . DubboBootstrap 启动流程

3.1 Start 主流程

start 环节主要由以下几个环节 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
java复制代码Step 1 : 设置启动参数
Step 2 : initialize 初始化
Step 3 : exportServices 输出 Services
Step 4 : referServices 注册 refer Service
Step 5 : onStart() 启动 (异步情况下会创建 Thread 后启动)

// 启动 Dubbo 容器环境
dubboBootstrap.start();

// Step 1 :Start 逻辑
public DubboBootstrap start() {
if (started.compareAndSet(false, true)) {
startup.set(false);
initialized.set(false);
shutdown.set(false);
awaited.set(false);

initialize();
// 1. export Dubbo Services
exportServices();

// Not only provider register
if (!isOnlyRegisterProvider() || hasExportedServices()) {
// 2. export MetadataService
exportMetadataService();
// 3. Register the local ServiceInstance if required
registerServiceInstance();
}

referServices();
if (asyncExportingFutures.size() > 0 || asyncReferringFutures.size() > 0) {
new Thread(() -> {
try {
this.awaitFinish();
} catch (Exception e) {
logger.warn(NAME + " asynchronous export / refer occurred an exception.");
}
startup.set(true);
onStart();
}).start();
} else {
startup.set(true);
onStart();
}

}
return this;
}

3.2 设置启动参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码if (started.compareAndSet(false, true)) 

startup.set(false);
initialized.set(false);
shutdown.set(false);
awaited.set(false);

// 此处有几个主要的操作 :
1. 原子设置已经启动
2. 设置属性

// 主要看一下这4个属性
private AtomicBoolean initialized = new AtomicBoolean(false);
private AtomicBoolean started = new AtomicBoolean(false);
private AtomicBoolean startup = new AtomicBoolean(true);
private AtomicBoolean destroyed = new AtomicBoolean(false);
private AtomicBoolean shutdown = new AtomicBoolean(false)

3.3 initialize 初始化内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码public void initialize() {
if (!initialized.compareAndSet(false, true)) {
return;
}

ApplicationModel.initFrameworkExts();

// 初始化配置
startConfigCenter();
loadConfigsFromProps();
checkGlobalConfigs();

// 初始化 Service
startMetadataCenter();
initMetadataService();
}

3.3.1 ApplicationModel.initFrameworkExts()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
java复制代码// C- DubboBootstrap
public static void initFrameworkExts() {
Set<FrameworkExt> exts = ExtensionLoader.getExtensionLoader(FrameworkExt.class).getSupportedExtensionInstances();
// PIC21005
for (FrameworkExt ext : exts) {
ext.initialize();
}
}


// Step 1 : 通过 SPI 进行插件处理
@SPI
public interface FrameworkExt extends Lifecycle {

}

// 补充 : Dubbo 中的 Lifecycle 对象
- ConfigManager
- Environment
- LifecycleAdapter
- ServiceRepository
- SpringExtensionFactory


// Step 2-1 : ConfigManager 部分
public void initialize() throws IllegalStateException {
String configModeStr = null;
try {
configModeStr = (String) ApplicationModel.getEnvironment().getConfiguration().getProperty(DUBBO_CONFIG_MODE);
if (StringUtils.hasText(configModeStr)) {
this.configMode = ConfigMode.valueOf(configModeStr.toUpperCase());
}
} catch (Exception e) {
throw new IllegalArgumentException(msg, e);
}
}

// Step 2-2 : Environment 部分
public void initialize() throws IllegalStateException {
if (initialized.compareAndSet(false, true)) {
this.propertiesConfiguration = new PropertiesConfiguration();
this.systemConfiguration = new SystemConfiguration();
this.environmentConfiguration = new EnvironmentConfiguration();
this.externalConfiguration = new InmemoryConfiguration("ExternalConfig");
this.appExternalConfiguration = new InmemoryConfiguration("AppExternalConfig");
this.appConfiguration = new InmemoryConfiguration("AppConfig");

loadMigrationRule();
}
}


// Step 2-3 : ServiceRepository 部分

PIC21005 : FrameworkExt 对象列表

Dubbo-Framaket-module.png

3.3.2 startConfigCenter 简述

这个阶段总共可以分为3步 :

  1. 分别加载 ApplicationConfig 和 ConfigCenterConfig
  2. 对 configCenterConfig 进行配置
  3. 对 Application , Monitor , Modules 等多个组件进行最后的刷新
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
java复制代码private void startConfigCenter() {

// Step 1 : 处理对应的 Config , 此处简单一点说分为2个类型 , 很好的思路 , 后面深入了解一下

// > 类型一 : 存在class 对应配置
// 1. 通过 class 类型去 newInstance 实例化对应类型的 Class 对象
// 2. 调用对应的 refush 进行刷新处理 (TODO : 后续分析 , 就是配置的重写流程)
// 3. 缓存到 configManager 中

// > 类型二 : 不存在对应的配置
// 1. 直接从 ApplicationModel 中获取 Environment
// 2. newInstance 实例化后放在 configManager 中
loadConfigs(ApplicationConfig.class);
loadConfigs(ConfigCenterConfig.class);


useRegistryAsConfigCenterIfNecessary();

// check Config Center -> PS:332001
Collection<ConfigCenterConfig> configCenters = configManager.getConfigCenters();
if (CollectionUtils.isEmpty(configCenters)) {
// 通常配置了注册中心这里都会有值 , 如果为空 , 此处 new 了一个基础实现
} else {
for (ConfigCenterConfig configCenterConfig : configCenters) {
// Step 2 : 对 configCenterConfig 进行配置 , 后续深入
configCenterConfig.refresh();
ConfigValidationUtils.validateConfigCenterConfig(configCenterConfig);
}
}

if (CollectionUtils.isNotEmpty(configCenters)) {
CompositeDynamicConfiguration compositeDynamicConfiguration = new CompositeDynamicConfiguration();

// 这里要对 environment 环节进行配置
for (ConfigCenterConfig configCenter : configCenters) {
// Pass config from ConfigCenterBean to environment
environment.updateExternalConfigMap(configCenter.getExternalConfiguration());
environment.updateAppExternalConfigMap(configCenter.getAppExternalConfiguration());

// Fetch config from remote config center
compositeDynamicConfiguration.addConfiguration(prepareEnvironment(configCenter));
}
environment.setDynamicConfiguration(compositeDynamicConfiguration);
}

// Step 3 : 此处对 Application , Monitor , Modules 等多个组件进行最后的刷新 TODO
configManager.refreshAll();
}

补充 : ConfigCenterConfig 对象的作用

该对象中包含连接的基础配置信息 , 例如 protocol , address , port , cluster , namespace 等

灵活利用该 ConfigCenterConfig 对象可以集成多种不同的注册中心

补充 : PS:332001 结构

image.png

3.3.3 loadConfigsFromProps 简述

这里和上面其实差不多 , loadConfigs 是通过特殊的前缀对配置文件的配置进行读取

之前看过多个开源项目对于配置文件的加载思路 , 基本上殊途同归 , 都是以类区分 , 通过前缀加载 , 后面会单独分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
java复制代码private void loadConfigsFromProps() {

// application config has load before starting config center
// load dubbo.applications.xxx
loadConfigs(ApplicationConfig.class);

// load dubbo.modules.xxx
loadConfigs(ModuleConfig.class);

// load dubbo.monitors.xxx
loadConfigs(MonitorConfig.class);

// load dubbo.metricses.xxx
loadConfigs(MetricsConfig.class);

// load multiple config types:
// load dubbo.protocols.xxx
loadConfigs(ProtocolConfig.class);

// load dubbo.registries.xxx
loadConfigs(RegistryConfig.class);

// load dubbo.providers.xxx
loadConfigs(ProviderConfig.class);

// load dubbo.consumers.xxx
loadConfigs(ConsumerConfig.class);

// load dubbo.metadata-report.xxx
loadConfigs(MetadataReportConfig.class);

// config centers has bean loaded before starting config center
//loadConfigs(ConfigCenterConfig.class);

}

3.3.4 checkGlobalConfigs 简述

在这个环节中会通过 ConfigValidationUtils 进行配置校验 , 以及端口处理 , 最后通过 refresh 刷新

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
java复制代码private void checkGlobalConfigs() {
// check config types (ignore metadata-center)
List<Class<? extends AbstractConfig>> multipleConfigTypes = Arrays.asList(
ApplicationConfig.class,
ProtocolConfig.class,
RegistryConfig.class,
MetadataReportConfig.class,
ProviderConfig.class,
ConsumerConfig.class,
MonitorConfig.class,
ModuleConfig.class,
MetricsConfig.class,
SslConfig.class);

for (Class<? extends AbstractConfig> configType : multipleConfigTypes) {
// 通过 ConfigValidationUtils 对配置进行校验
checkDefaultAndValidateConfigs(configType);
}

// check port conflicts
//
Map<Integer, ProtocolConfig> protocolPortMap = new LinkedHashMap<>();
for (ProtocolConfig protocol : configManager.getProtocols()) {
Integer port = protocol.getPort();
if (port == null || port == -1) {
continue;
}

// 此处如果存在端口冲突 , 这里会直接抛出异常
ProtocolConfig prevProtocol = protocolPortMap.get(port);
if (prevProtocol != null) {
throw new IllegalStateException
}
protocolPortMap.put(port, protocol);
}

// check reference and service
// 这里正式开始处理 reference 和 DubboService , TODO : 后面单章分析
for (ReferenceConfigBase<?> reference : configManager.getReferences()) {
reference.refresh();
}
for (ServiceConfigBase service : configManager.getServices()) {
service.refresh();
}
}

3.3.5 startMetadataCenter 简述

此处是元数据的处理 , 元数据是新特性 , 可以在配置文件中使用前缀“dubbo.metadata-report”设置MetadataReportConfig的属性

什么是元数据 :

  • 元数据是为了减轻注册中心的压力,将部分存储在注册中心的内容放到元数据中心。
  • 元数据中心的数据只是给自己使用的 , 改动无需刷新
  • 不需要监听和通知 , 极大的减少了性能的消耗
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码private void startMetadataCenter() {

useRegistryAsMetadataCenterIfNecessary();

ApplicationConfig applicationConfig = getApplication();

String metadataType = applicationConfig.getMetadataType();
// FIXME, multiple metadata config support.
Collection<MetadataReportConfig> metadataReportConfigs = configManager.getMetadataConfigs();
if (CollectionUtils.isEmpty(metadataReportConfigs)) {
//... 这里配置了远程注册中心时默认都会有数据 , 没有会抛出异常
}

for (MetadataReportConfig metadataReportConfig : metadataReportConfigs) {
// <dubbo:metadata-report address="zookeeper://127.0.0.1:2181" id="metadata-center-zookeeper-2181" />
ConfigValidationUtils.validateMetadataConfig(metadataReportConfig);
if (!metadataReportConfig.isValid()) {
continue;
}

// 通过 instance 进行实例化的处理
MetadataReportInstance.init(metadataReportConfig);
}
}

3.3.6 initMetadataService 简述

1
2
3
4
5
java复制代码    private void initMetadataService() {
// startMetadataCenter();
this.metadataService = getDefaultExtension();
this.metadataServiceExporter = new ConfigurableMetadataServiceExporter(metadataService);
}

3.4 exportServices

详见服务注册体系

3.5 referServices

详见服务发现体系

3.6 onStart 调用监听器扩展

1
2
3
4
java复制代码private void onStart() {
ExtensionLoader<DubboBootstrapStartStopListener> exts = getExtensionLoader(DubboBootstrapStartStopListener.class);
exts.getSupportedExtensionInstances().forEach(ext -> ext.onStart(this));
}

总结

这一篇主要是为了总结, 以及对全部的概念有个初步的理解 , 后面会逐步完善整个流程中的细节点 , 以及对其代码的学习和使用

image.png

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Docker 快速部署本地开发环境常用数据库 前言 一 My

发表于 2021-10-01

小知识,大挑战!本文正在参与「程序员必备小知识」创作活动。

本文已参与 「掘力星计划」 ,赢取创作大礼包,挑战创作激励金。

前言

对于后端开发人员,经常需要使用数据库,在本地安装数据库麻烦且易出错,利用docker能非常快速的拉启所需数据库环境,不用的时候可以删掉,如果需要本地存储数据可以使用单独数据目录挂在到容器内,本文简单列举几类常用数据库。

一 MySQL

1
2
3
4
5
6
7
8
shell复制代码# 拉取镜像
docker pull mysql:8.0.19

# 启动server
docker run --name mysql01 -p 13306:3306 -e MYSQL_ROOT_PASSWORD=mysqladmin -d mysql:8.0.19

# 启动客户端,输入密码:mysqladmin
docker run -it --network host --rm mysql mysql -h127.0.0.1 -P13306 --default-character-set=utf8mb4 -uroot -p

二 Redis

1
2
3
4
5
6
7
shell复制代码# 拉取redis
docker pull redis
# 启动redis
docker run -itd --name redis01 -p 6379:6379 --requirepass "redisadmin" redis

# 使用客户端链接redis
docker exec -it redis01 /bin/bash

三 Etcd

1
2
3
4
5
6
7
shell复制代码# 拉去镜像
docker pull appcelerator/etcd:latest
# 启动
docker run --name etcd01 -d -p 2379:2379 -p 2380:2380 appcelerator/etcd --listen-client-urls http://0.0.0.0:2379 --advertise-client-urls http://0.0.0.0:2379

# 客户端链接
docker exec -it etcd01 /bin/bash

四 Elasticsearch

1
2
3
4
5
6
7
8
9
10
11
shell复制代码# 拉取镜像
docker pull elasticsearch:latest
# 启动
docker run --name es01 -d -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" -e "ES_JAVA_OPTS=-Xms1g -Xmx1g" elasticsearch:latest

# 使用head客户端链接
docker pull mobz/elasticsearch-head:5
# 启动header 容器
docker run -d --name my-es_admin -p 9100:9100 mobz/elasticsearch-head:5

# curl测试访问

第一次打开浏览器header访问,连接的服务地址是localhost:9200,修改为docker所在的ip。此时出现连接失败,需要修改镜像的elasticsearch.yml文件,添加

1
2
3
4
5
6
yaml复制代码http.cors.enabled: true
http.cors.allow-origin: "*"

# 重启es
docker restart es01
docker restart my-es-head

五 MongoDB

1
2
3
4
5
6
7
8
shell复制代码#拉取镜像
docker pull mongo:lastest

# 启动
docker run --name mongodb01 -p 27017:27017 -d mongo:latest

# 客户端链接以admin进入容器
docker exec -it mongodb01 mongo admin

六 postgre

1
2
3
4
5
6
7
8
9
shell复制代码# 下载
docker pull postgres:12

# 启动
docker run --name pg01 -e POSTGRES_PASSWORD=pgadmin -p 54320:5432 -d postgres:12


# 客户端链接
docker exec -it pg01 /bin/bash

其他

本文通过利用Docker容器化封装的能力,将含有镜像直接从仓库拉取下来后,通过命令行运行,并将指定端口映射到本地。然后本地开发的时候,并不需要去关注数据库的配置和安装了,简单来说,就是查询镜像、拉取镜像、运行镜像。简单的三部操作就可以拥有一个配置好的需求数据库环境。

其他

「欢迎在评论区讨论,掘金官方将在掘力星计划活动结束后,在评论区抽送100份掘金周边,抽奖详情见活动文章」。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

以 Actix 为例,探索 Rust 拓展特征在工程中如何解

发表于 2021-10-01

以 Actix 为例,探索 Rust 拓展特征在工程中如何解耦

开篇

Actix 是 Actor 模型 的 Rust 语言实现

Actor 模型是并发计算的数学模型

关于 Actor 模型可简单参考:The actor model in 10 minutes

如果你看过 Actix 源码,你会发现在 actix::handler 中有这样一段代码

1
2
3
4
5
6
7
8
9
10
11
12
13
rust复制代码// ...
impl<A, M, I, E> MessageResponse<A, M> for Result<I, E>
where
A: Actor,
M: Message<Result = Self>,
I: 'static,
E: 'static,
{
fn handle(self, _: &mut A::Context, tx: Option<OneshotSender<Self>>) {
tx.send(self)
}
}
// ...

实现很简单,我们看看发生了什么。

tx 的类型是 Option<OneshotSender<Self>>。OneshotSender 是 tokio::sync::Sender 的别名。

而 .send(self) 方法定义于 actix::handler 即同一个模块下。源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
rust复制代码// Helper trait for send one shot message from Option<Sender> type.
// None and error are ignored.
trait OneshotSend<M> {
fn send(self, msg: M);
}

impl<M> OneshotSend<M> for Option<OneshotSender<M>> {
fn send(self, msg: M) {
if let Some(tx) = self {
let _ = tx.send(msg);
}
}
}

其中泛型M指的是 tokio::sync::Sender<T> 。

为什么要多定义这个方法呢?

查看源码之后可以知道 MessageResponse 一共被实现了28次。也就是说上面这段代码,把所有 handle 函数中 Option 的判断统一做了处理,降低了代码耦合。

为什么不用其他方式去实现?

下面我们通过两个尝试去探讨一下,为什么采用这种方式实现更好。

尝试1: helper 函数

假设使用 helper 函数,那么函数大概如下:

1
2
3
4
5
rust复制代码fn send<M>(tx: Option<OneshotSender<M>>, msg: M) {
if let Some(tx) = tx {
let _ = tx.send(msg);
}
}

似乎代码更少更简单了?

使用的时候只需要 send(tx, msg) 就行了。

但是这么做有以下缺点:

  • 代码分散,如果不看函数定义,根本不会知道能用这个函数,在团队协作中容易出现许多重复代码。并且因此为将来的重构埋坑。
  • 缺少语义化的函数表达,这对于团队协作来说也是不利的。

尝试2: macro_rule 声明宏

假设用声明宏来实现,效果将是惨烈的:

1
2
3
4
5
6
7
rust复制代码macro_rules! send {
($tx:expr, $msg:expr) => {
if let Some(tx) = $tx {
let _ = $tx.send($msg);
}
};
}

虽然这里编译期没有阻止你,但任何人都不能猜到 $tx 和 $msg 是什么类型,除非是你自己写的。

况且,用 macro_rule 来实现这个有种高射炮打蚊子的感觉 :)

为什么用 helper trait 更好?

trait 是 Rust 的规范,实现的功能更加强大,它可以利用泛型为多个类型复用,并且可以参与 trait bound 作为类型约束。

不过 Rust 为了防止依赖地狱,在2015年引入了孤儿原则 “Orphan Rule”,使用时还是有一些限制,简而言之 trait 和 struct/enum 必须有一个是来自于自身的 crate。

关于孤儿原则,可参考:Little Orphan Impls

实践 - 开胃菜

众所周知,内置类型也是来自外部,想要为原始类型拓展方法可以在本地编写 trait ,举个栗子:

1
2
3
rust复制代码trait ColorExtend {
fn is_color(&self) -> bool;
}

我们编写了一个颜色拓展方法,用于判断该类型是否可以表达为颜色。

我们为 &str 类型实现一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
rust复制代码// 让 matching 可以匹配 char
#![feature(exclusive_range_pattern)]
impl ColorExtend for &str {
fn is_color(&self) -> bool {
if self.len() == 4 || self.len() == 7 {
for elem in self.char_indices().next() {
match elem {
(0, '#') => (),
(0, _) => return false,
(_, '0'..'9') => (),
(_, 'a'..'z') => (),
(_, 'A'..'Z') => (),
_ => return false,
}
}
true
} else {
false
}
}
}

在这之后,我们就可以这样来使用了: "#0000FF".is_color()

同样的,这个方法可以给其他类型实现,比如 String , Vec<u8> 等,甚至可以为 Option<&str> 实现。所有的泛型类型都属于一个独立的类型,这是由于 Rust 的零成本抽象,在编译器就会对泛型展开,生成独立的类型。基于此我们开始对 Actix 中的一些类型做拓展吧。

编写 Actor

先定义一个用于计数的 actor 作为例子,并为其实现一些基本特征:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
rust复制代码use actix::{Actor, Context};

pub struct MyActor {
pub counter: u16,
}

impl Default for MyActor {
fn default() -> Self {
Self { counter: Default::default() }
}
}

impl Actor for MyActor {
type Context = Context<Self>;
}

获取计数 GetCounter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
rust复制代码use actix::{Handler, Message};

/// counter add message
#[derive(Debug, Message)]
#[rtype("u16")]
pub struct GetCounter;

impl Handler<GetCounter> for MyActor {
type Result = u16;

fn handle(&mut self, _: GetCounter, _: &mut Self::Context) -> Self::Result {
self.counter
}
}

计数增加 CounterAdd

1
2
3
4
5
6
7
8
9
10
11
12
13
14
rust复制代码/// counter add message
#[derive(Debug, Message)]
#[rtype("u16")]
pub struct CounterAdd(pub u16);

impl Handler<CounterAdd> for MyActor {
type Result = u16;

fn handle(&mut self, msg: CounterAdd, _: &mut Self::Context) -> Self::Result {
println!("add {}", msg.0);
self.counter += msg.0;
self.counter
}
}

n 秒内计数变化 GetDelta

此处由于需要异步,所以返回了 ResponseActFuture 。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
rust复制代码use std::time::Duration;
/// get counter's value change during the [`Duration`]
#[derive(Debug, Message)]
#[rtype("u16")]
pub struct GetDelta(pub Duration);

impl Handler<GetDelta> for MyActor {
type Result = ResponseActFuture<Self, u16>;

fn handle(&mut self, msg: GetDelta, _: &mut Self::Context) -> Self::Result {
let init_value = self.counter; // 初始值
Box::pin(
async move {
actix::clock::sleep(msg.0).await;
}
.into_actor(self)
.map(move |_, actor, _| {
// 等待 future 结束后获取最新的 counter,与初始值相减即使变化值
actor.counter - init_value
})
)
}
}

actix::clock::sleep 并不阻塞,这是由于 actix 使用了基于 tokio 的运行时

这里的 into_actor 方法是 actix::fut::future::WrapFuture 定义的本文所提的 helper trait ,在一些库中这个思想很流行。

WrapFuture 定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
rust复制代码pub trait WrapFuture<A>
where
A: Actor,
{
/// The future that this type can be converted into.
type Future: ActorFuture<A>;

/// Convert normal future to a ActorFuture
fn into_actor(self, a: &A) -> Self::Future;
}
impl<F: Future, A: Actor> WrapFuture<A> for F {
type Future = FutureWrap<F, A>;

fn into_actor(self, _: &A) -> Self::Future {
wrap_future(self)
}
}

这里为 Future 增加了一个 into_actor 方法,返回了一个 FutureWrap ,而 FutureWrap 的定义恰好用了上回提到的 pin_project ,有机会再细讲。

参考译文链接:为什么 Rust 需要 Pin, Unpin ?

模拟需求

假设这时候产品经理提出了这样一个需求:在 n 秒内,统计出counter值的变化,然后再在 n 秒后额外添加同等的值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
rust复制代码/// an odd mission required by the lovely PM
#[derive(Debug, Message)]
#[rtype("()")]
pub struct DoubleAfterDelta {
pub secs: u64
}

impl Handler<DoubleAfterDelta> for MyActor {
type Result = ResponseActFuture<Self, ()>;

fn handle(&mut self, msg: DoubleAfterDelta, ctx: &mut Self::Context) -> Self::Result {
Box::pin({
let addr = ctx.address();
addr.send(GetDelta(
Duration::from_secs(msg.secs)
))
.into_actor(self)
.map(move |ret, actor, ctx| {
// 手动添加的函数
ret.handle_mailbox(|delta| {
// 这也是手动添加的函数
ctx.add_later(delta, msg.secs);
});
})
})
}
}

对于 map 的参数函数,三个参数的类型分别为:

  1. Result<T, MailboxError>
  2. &mut MyActor
  3. &mut Context<MyActor>

假设我们暂时用朴素的方式写,大概会是这样:

1
2
3
4
rust复制代码match ret {
Ok(data) => ctx.notify_later(CounterAdd(data), Duration::from_secs(msg.secs)),
Err(e) => eprintln!("common handle MailboxError: {}", e),
}

Emmm… 貌似也还好。

不过当 Err 分支需要处理一些特殊情况时(比如重置 Actor ),代码量比较多的情况下,如此往复也是不个好办法,我们封装一下 Result<T, MailboxError> 和 &mut Context<MyActor>,让这部分代码抽离出来。

封装 Result<T, MailboxError>

首先,定义一个 trait :

1
2
3
4
5
rust复制代码pub trait ActixMailboxSimplifyExtend<T> {
fn handle_mailbox<F>(self, handle_fn: F)
where
F: FnOnce(T) -> ();
}

这个方法接收一个闭包函数 handler ,在 handler 中只处理正常返回的情况,而在 handle_mailbox 中统一处理 MailboxError ,实现如下:

1
2
3
4
5
6
7
8
9
10
rust复制代码impl<T> ActixMailboxSimplifyExtend<T> for Result<T, MailboxError> {
fn handle_mailbox<F>(self, handle_fn: F)
where
F: FnOnce(T) -> () {
match self {
Ok(data) => handle_fn(data),
Err(e) => eprintln!("common handle MailboxError: {}", e),
}
}
}

封装 Context<MyActor>

1
2
3
4
5
6
7
8
9
rust复制代码pub trait ActixContextExtend {
fn add_later(&mut self, add_num: u16, secs: u64) -> ();
}
impl ActixContextExtend for Context<MyActor> {
fn add_later(&mut self, add_num: u16, secs: u64) -> () {
println!("counter will add {}, after {} second(s)", add_num, secs);
self.notify_later(CounterAdd(add_num), Duration::from_secs(secs));
}
}

add_later 函数封装了 notify_later ,实现了在 secs 秒后自动发送 CounterAdd 消息。

封装了这两个方法之后,在遇到类似的需求时就 Don’t Repeat Yourself 了。

缺陷

当然这种方式也是有缺陷的,在使用时需要手动引入声明。编译器不会寻找其的所有实现,因为就会产生依赖混淆问题,如果遇到两个重复的方法,编译器会报如下错误:

1
shell复制代码error[E0034]: multiple applicable items in scope

所以尽量命名要避免与现有一些函数名重复,否则重构起来特别容易陷入混乱。

主函数

铺垫了这么多,看看实际结果吧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
rust复制代码fn main() {
let run_ret = actix::run(async move {
let actor = MyActor::default();
let addr = actor.start();
println!("=[case 1]=========================");
let current_value = addr.send(GetCounter).await.unwrap();
println!("init value is: {}", current_value);
let fut = addr.send(DoubleAfterDelta {
secs: 1,
});

// add during DoubleAfterDelta's Handler waiting
sleep(Duration::from_millis(200)).await; // actix::clock::sleep
addr.do_send(CounterAdd(3));

sleep(Duration::from_millis(200)).await;
addr.do_send(CounterAdd(5));

let _ = fut.await; // wait a seconds.

let current_value = addr.send(GetCounter).await.unwrap();
println!("value is: {}", current_value);
sleep(Duration::from_secs(2)).await;

let current_value = addr.send(GetCounter).await.unwrap();
println!("value is: {}", current_value);

println!("=[case 2]=========================");
addr.do_send(ShutDown); // 关闭的 actor 的消息
let ret = addr.send(GetCounter).await;
// use the added method in ActixMailboxSimplifyExtend
ret.handle_mailbox(|_| {
unreachable!("unpossible to reach here due to MailboxError must be encountered.");
});
});
println!("actix-run: {:?}", run_ret);
}

这里测试了函数 add_later 的可用性,和 handle_mailbox 处理错误时是否正确。

运行结果:

1
2
3
4
5
6
7
8
9
10
11
shell复制代码=[case 1]=========================
init value is: 0
add 3
add 5
counter will add 8, after 1 second(s)
value is: 8
add 8
value is: 16
=[case 2]=========================
common handle MailboxError: Mailbox has closed
actix-run: Ok(())

总结

Rust 开创者根据多年的语言经历总结了前人开发的经验,推崇组合大于继承,可谓是集大成者。多用组合的方式去拓展你的代码,更你的代码更加 Rusty!

本文代码已推送至 github,如果对你有所启发欢迎 star。

github.com/oloshe/rust…

转载请声明出处

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

数据同步软件 Shareplex 异常重建详细步骤(Orac

发表于 2021-10-01

本文已参与「掘力星计划」,赢取创作大礼包,挑战创作激励金。

前言

最近有客户的 shareplex 因为一些稀奇古怪的原因又挂了,由于邮件告警问题,没有及时通知到,并且归档已经被删除,备份也追溯不回丢失的归档日志。

经过与客户确认repo库没有历史数据需保留,直接重建修复!

准备

确认以下条件均已具备:

  • 有可用备份;
  • 磁盘空间足够;
  • 由于使用 networker 备份,需要提前安装备份恢复所需客户端;

本次重建目标端使用 rman 进行全库恢复。

重建过程

确认数据库大小

1
2
sql复制代码select sum(bytes/1024/1024/1024) from dba_segments;
select sum(bytes/1024/1024/1024) from dba_data_files;

确认目标端磁盘空间足够!

确认备份可用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
sql复制代码--查询备份
set line222
set pagesize100
col status for a10
col input_type for a20
col INPUT_BYTES_DISPLAY for a10
col OUTPUT_BYTES_DISPLAY for a10
col TIME_TAKEN_DISPLAY for a10

select input_type,
status,
to_char(start_time,
'yyyy-mm-dd hh24:mi:ss'),
to_char(end_time,
'yyyy-mm-dd hh24:mi:ss'),
input_bytes_display,
output_bytes_display,
time_taken_display,
COMPRESSION_RATIO
from v$rman_backup_job_details
where start_time > date '2021-08-10'
order by 3 desc;

1
bash复制代码list backup of controlfile;

ctrl_mesdbtj_65271_1_1081487814

确认最新的有效备份,记录控制文件。

安装 networker 客户端

安装包上传目标端安装

1
2
bash复制代码lgtoclnt-9.2.1.4-1.x86_64.rpm
lgtonmda-9.2.1.4-1.x86_64.rpm

建议使用 yum install 进行安装,防止依赖包缺失,前提是 yum 源已配置。

按顺序安装:

1
2
3
4
bash复制代码yum install -y lgtoclnt-9.2.1.4-1.x86_64.rpm
systemctl start networker
systemctl start networker
yum install -y lgtonmda-9.2.1.4-1.x86_64.rpm

lgtoclnt 安装完成后,确保服务正常运行,再安装 lgtonmda。

配置解析

必须将目标端和源端,networker 服务端的ip和主机名解析全部写入 /etc/hosts 文件。

目标端链接 NMO 库文件

1
2
bash复制代码cd $ORACLE_HOME/lib
ln –s /usr/lib/libnwora.so libobk.so

至此,networker 目标端已安装完成。

清理 shareplex 旧环境

源端和目标端关闭 shareplex

1
2
bash复制代码sp_ctrl
shutdown

源端和目标端执行清理脚本

1
2
bash复制代码/quest/bin/ora_cleansp splex2300/splex2300
/data/quest/bin/ora_cleansp splex2300/splex2300

源端和目标端重新开启 shareplex 环境

1
bash复制代码sp_cop -u2300&

本文 shareplex 使用端口为 2300,读者需根据实际情况更换,如 2400、2500 等。

目标端停止 post 进程

1
bash复制代码stop post

最后全部恢复完毕之后再开启。

开始 rman 恢复

确保目标端数据库已开启到 nomount 状态。

恢复控制文件

连接 rman 客户端后执行恢复控制文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
bash复制代码run {
allocate channel c1 type 'SBT_TAPE';
send 'NSR_ENV=(NSR_SERVER=这里填写 networker 服务端主机名,NSR_CLIENT=这里填写源端备份主机名)';
restore controlfile from '这里填写最新备份控制文件名称';
release channel c1;
}
``

恢复完之后开启目标端数据库到 mount 状态。

### 恢复数据

由于数据库大概有 1-2 T 的大小,恢复时间很长,因此建议将恢复脚本放在后台进行执行,脚本如下:

```bash
#!/bin/bash
source ~/.bash_profile
backtime=`date +"20%y%m%d%H%M%S"`
rman target / log=/home/oracle/rman_repo_$backtime.log<<EOF
run {
allocate channel c1 type 'SBT_TAPE';
allocate channel c2 type 'SBT_TAPE';
allocate channel c3 type 'SBT_TAPE';
allocate channel c4 type 'SBT_TAPE';
send 'NSR_ENV=(NSR_SERVER=这里填写 networker 服务端主机名,NSR_CLIENT=这里填写源端备份主机名)';
set newname for database to '/data/mesdb/%b';
restore database;
switch datafile all;
recover database;
release channel c1;
release channel c2;
release channel c3;
release channel c4;
}
exit;
EOF

执行 sh rman_sp.sh & 进行后台恢复。

📢 注意: 通道根据实际情况进行修改,由于源端是 rac 环境,目标端是单机环境,因此数据文件路径需要 set newname 进行转换,最后执行初次 recover database。

备份恢复完之后,由于缺少归档,所以需要追归档。

追归档日志

由于备份时间与当前时间存在较大时差,在获取当前源端的 scn 进行 recover 时,必然需要追大量的归档日志文件,为了减少 shareplex 积压,因此提前追归档日志到当前时间。

源端备份归档日志到当前最新:

1
2
bash复制代码backup archivelog from sequence 71457 until sequence 71986 thread 1;
backup archivelog from sequence 65247 until sequence 65780 thread 2;

备份成功后拷贝至目标端,注册目录后执行 recover:

1
2
bash复制代码catalog start with '/data/archivelog/';
recover database;

追完归档之后,激活源端 shareplex 的 config 文件。

激活源端 config 配置文件

1
2
3
bash复制代码list config
activate config ORA_config_20210825 nolock
show config

激活成功后,检查源端数据库中是否存在 长事务。

1
sql复制代码select start_time from gv$transaction;

如果有长事务,可以确实是否可以杀掉,杀掉后才能继续操作。

根据以下 SQL 可以获取到事务的详细情况:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
sql复制代码set linesize 260 pagesize 10000
column sess format a21 heading "SESSION"
column program format a18
column clnt_pid format a8
column machine format a25
column username format a12
column osuser format a13
column event format a32
column waitsec format 999999
column start_time format a18
column sql_id format a15
column clnt_user format a10
column svr_ospid format a10

ALTER SESSION SET NLS_DATE_FORMAT = 'yyyy/mm/dd hh24:mi:ss';

set feedback off
set echo off

set head off
select chr(9) from dual;
select 'Waiting Transactions'||chr(10)||'====================' from dual;
set head on
select /*+ rule */
lpad(nvl(s.username,' '),8)||'('||s.sid||','||s.serial#||')' as sess,
p.spid as svr_ospid,
nvl(osuser,' ') as clnt_user,
s.process as clnt_pid,
substr((case instr(s.PROGRAM, '@')
when 0 then
s.program
else
case instr(s.PROGRAM, '(TNS V1-V3)')
when 0 then
substr(s.program, 1, instr(s.PROGRAM, '@') - 1) || substr(s.program, instr(s.PROGRAM, '(') - 1)
else
substr(s.program, 1, instr(s.PROGRAM, '@') - 1)
end
end),
1, 18) as program,
(case
when length(s.MACHINE) > 8 then substr(s.machine,1,8)||'~'
else s.machine
end
) || '('||nvl(s.client_info, 'Unknown IP')||')' as machine, s.sql_id,
substr(s.event, 1, 32) as event,
s.seconds_in_wait as waitsec
from v$transaction t,v$session s,v$process p
where t.ses_addr=s.saddr and s.paddr=p.addr
order by s.seconds_in_wait, s.program, s.machine;

可以通过 SESSION 字段来杀掉事务:

1
sql复制代码alter system kill session '1841,44697';

如果杀不掉,则使用 svr_ospid 系统层进行 kill:

1
bash复制代码kill -9 27353

确认没有长事务后,继续下一步操作。

源端获取 scn 号

1
2
perl复制代码col current_scn format 9999999999999999
select current_scn from v$database;

记录获取到的 SCN 号:72863106548。

目标端 rman 恢复至指定 scn

1
bash复制代码recover database until scn 72863106548;

因为源端一直在运行,激活期间到SCN号必然会有新的归档产生,提示缺少归档日志,因此需要去源端拷贝缺少的归档日志,再次进行 recover。

目标端开启 resetlogs 状态

1
sql复制代码alter database open resetlogs;

确认 recover 完成恢复之后,基本恢复结束,可以开启目标端到 resetlogs 状态。

rman 恢复后收尾

目标端 reconcile 至指定SCN号

1
2
3
4
5
bash复制代码reconcile queue q1 for o.mesdb2-o.mesdb scn 72863106548
reconcile queue q2 for o.mesdb2-o.mesdb scn 72863106548
reconcile queue q3 for o.mesdb2-o.mesdb scn 72863106548
reconcile queue q4 for o.mesdb2-o.mesdb scn 72863106548
reconcile queue q5 for o.mesdb2-o.mesdb scn 72863106548

非必须操作,如果出现 hang 住的情况,需要在源端 shareplex 执行 flush 操作疏通通道:

1
2
3
4
5
bash复制代码flush o.mesdb2 to mes-repo queue q1
flush o.mesdb2 to mes-repo queue q2
flush o.mesdb2 to mes-repo queue q3
flush o.mesdb2 to mes-repo queue q4
flush o.mesdb2 to mes-repo queue q5

📢 注意: 源端执行过 flush 的通道,目标端 start post 之后需要再次执行 start post queue 指定队列名 ,否则无法开启 post。

目标端运行 cleanup.sql 来清空内部表信息

1
2
3
bash复制代码cd /data/quest/bin/
sqlplus splex用户账号/splex账户密码
@cleanup.sql

该步骤用于清理源端 splex 用户相关数据。

目标端禁用所有 trigger

1
2
3
sql复制代码SELECT 'alter trigger ' || owner || '.' || trigger_name || ' disable;'
from dba_triggers
where owner in (需要同步的用户);

将输出结果复制执行即可!

目标端禁用所有约束

1
2
sql复制代码SELECT 'alter table '||owner||'.'||table_name||' disable constraint '||constraint_name||';' from dba_constraints
where constraint_type='R' and owner in (需要同步的用户);

将输出结果复制执行即可!

禁用job

1
sql复制代码alter system set job_queue_processes=0;

确保 job 任务不会运行!

目标端开启 post 进程

1
2
bash复制代码sp_ctrl
start post

确保所有队列均已处于正常 running 状态。

由于目标端执行 reconcile 时 2,4 队列 hang 住,因此需要单独 start post queue 指定队列名 来开启:

1
2
bash复制代码start post queue q2
start post queue q4

状态已全部正常 running。

重建后检查

1
2
3
bash复制代码qstatus
show post queue q2
show log reverse

通过命令查看同步是否正常,以及同步速度是否正常。再次确认邮件告警是否恢复正常。

写在最后

shareplex 重建恢复的流程还算复杂,因此需要做好必备的告警措施,防止遇到停止导致问题发生,无法及时补救的情况。

分享两个告警脚本:

1、监控 shareplex 进程是否正常运行:

1
2
3
4
5
6
7
8
9
10
11
bash复制代码#!/bin/bash
if [ -f ~/.bash_profile ];
then
. ~/.bash_profile
fi
count=`ps -ef|grep sp_cop|wc -l`
if [ "${count}" -ne 2 ];
then
ps -ef|grep sp_cop > /tmp/sp_cop.log
mail -s "Shareplex sp_cop process shutdown" 邮箱地址 < /tmp/sp_cop.log
fi

2、监控 shareplex 队列是否存在异常:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
bash复制代码#!/bin/bash
if [ -f ~/.bash_profile ];
then
. ~/.bash_profile
fi
rm -rf /data/quest/error.log
echo "show "|sp_ctrl|grep "Idle" >> /data/quest/error.log
echo "show "|sp_ctrl|grep "Stopped" >> /data/quest/error.log

# -s 文件大小非0时为真
if [ ! -s /data/quest/error.log ]
then
rm -rf /data/quest/error.log #文件大小为0 删除
fi

if [ -s /data/quest/error.log ]
then
mail -s "Shareplex error" 邮箱地址 < /data/quest/error.log
fi

📢 如有问题,请及时指正,谢谢!


本次分享到此结束啦~

如果觉得文章对你有帮助,点赞、收藏、关注、评论,一键四连支持,你的支持就是我创作最大的动力。

❤️ 技术交流可以 关注公众号:Lucifer三思而后行 ❤️

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…510511512…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%