开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

分布式缓存系统分析!基于Lease机制实现分布式系统缓存详解

发表于 2021-08-30

这是我参与8月更文挑战的第30天,活动详情查看:8月更文挑战

基于Lease协议的分布式Cache系统

  • 问题背景:
    • 在一个分布式系统中,有一个中心服务器节点,中心服务器存储,维护着一些数据,这些数据是系统的元数据
    • 系统中其余的节点通过访问中心服务器节点读取,修改上面的元数据
    • 由于系统中的各种操作都依赖于元数据,如果每次读取元数据的操作都访问中心服务器节点,那么中心服务器节点的性能就会成为系统的瓶颈
      • 所以,需要设计一种元数据cache, 在各个节点上cache元数据信息,从而减少对中心服务器节点的访问,提高性能
      • 系统的正确运行依赖于元数据的正确,这就要求各个节点上cache的数据始终与中心服务器上的数据一致 ,cache中不能是旧的脏数据
      • 设计的cache系统要能最大可能的处理节点宕机,网络中断等异常,最大程度提高系统的可用性
  • 为了解决以上问题,利用Lease机制实现基于Lease协议的分布式Cache系统
  • 基于Lease协议的分布式Cache系统的基本原理:
    • 中心服务器在向各节点发送数据时同时向节点发送一个lease
    • 每个lease具有一个有效期,通常是一个明确的时间节点,一旦真实时间超过这个时间点,则lease过期失效
    • lease的有效期与节点收到lease的时间无关,节点可能收到lease时该lease就已经过期失效
    • 假设中心服务器与各节点的时钟是同步的,中心服务器发出的lease的含义为:
      • 在lease的有效期内,中心服务器保证不会修改对应数据的值
      • 这样,节点收到数据和lease后,将数据加入本地cache. 一旦对应的lease超时,节点将对应的本地cache删除
      • 中心服务器在修改数据时,首先阻塞所有新的读请求,并等待之前为该数据发出的所有lease超时过期,然后修改数据的值
  • 基于lease的cache中,客户端节点读取元数据的流程:
    • 判断元数据是否已经处于本地cache且lease处于有效期内:
      • 是: 直接返回cache中的元数据
      • 否: 向中心服务器节点请求读取元数据信息:
        • 服务器接收到读取请求后,返回元数据及一个对应的lease
        • 客户端是否成功接收到服务器返回的数据:
          • 失败或者超时: 退出流程,读取失败,可以重试
          • 成功: 将元数据与该元数据的lease记录到内存中,返回元数据
  • 基于lease的cache中,客户端节点修改元数据的流程:
    • 节点向服务器发起修改元数据请求
    • 服务器接收到修改请求后,阻塞所有新的读数据的请求,即接收读请求,但是不返回数据
    • 服务器等待所有与该元数据相关的lease超时
    • 服务器修改元数据并向客户端节点返回修改成功
  • 基于lease的cache机制可以保证各个节点上的cache数据与中心服务器上的数据始终一致:
    • 中心服务器在发送数据的同时设置了节点对应的lease
    • 在lease有效期内,服务器不会修改数据,从而客户端节点可以在lease有效期内cache数据
  • 基于lease机制的cache能够容错的关键:
    • 服务器一旦发出数据以及cache, 无论客户端是否收到,无论后续客户端是否宕机,无论后续网络是否正常,服务器只要等待lease超时,就可以保证对应的客户端不会再继续cache数据,才可以进行修改数据而不会破坏cache的一致性
  • 基于lease机制的cache的优化改进:
    • 服务器在修改元数据时首先要阻塞所有新的请求,保证没有读服务
    • 这是为了防止发出新的lease从而引起不断有新客户端节点持有lease并缓存着数据,形成“活锁”
    • 优化方法:
      • 服务器在进入修改数据流程后,一旦收到读请求则只返回数据但不设置lease
      • 这样在修改流程执行的过程中,客户端可以读到元数据,只是不能缓存元数据
      • 进一步优化:
        • 当进入修改流程,服务器设置的lease的有效期期限选择为已发出的lease的最大有效期
        • 这样,客户端可以继续在服务器进入修改流程后继续缓存元数据,但是服务器等待所有lease过期的时间不会因为设置新的lease而不断延长
  • Cache机制与多副本机制的异同:
    • 相同点: 都是将一份数据保存在多个节点上
    • 不同点:
      • cache机制简单,可以随时删除丢弃,并且命中已删除cache的后果仅仅是需要访问数据源读取数据
      • 副本不能随意丢弃,每失去一个副本,服务的质量都在下降,一旦副本下降到一定程度,则服务往往不再可用

Lease机制的分析

  • Lease的定义:
    • Lease是发送方设置的在某一有效期内的承诺
    • 发送方一旦发出lease, 则无论接收方是否收到,也无论后续接收方处于何种状态,只要lease不过期,发送方一定严守承诺
    • 接收方在lease的有效期内可以使用发送方的承诺,一旦lease过期,接收方一定不能继续使用发送方的承诺
  • Lease机制具有很高的容错能力:
    • 通过引入有效期 ,Lease机制能否非常好的容错网络异常
    • Lease的发送过程只依赖于网络可以单向通信,即使接收方无法向发送方发送消息,也不会影响Lease的发送
    • 由于Lease的有效期是一个确定的时间点 ,Lease的语义和发送Lease的具体时间无关,所以同一个Lease可以被发送方不断重复向接收方发送
    • 即使发送方偶尔发送Lease失败,发送方也可以简单的通过重发的方法解决
    • 一旦Lease被接收方成功接收,后续Lease机制不再依赖于网络通信,即使网络完全中断 ,Lease机制也不会受到影响
  • Lease机制能较好地容错节点宕机:
    • 如果发送方宕机,则宕机的发送方通常无法改变之间的承诺,不会影响Lease的正确性
      • 在发送方恢复后,如果发送方恢复了之前的Lease信息,发送方可以继续遵守Lease的承诺
      • 如果发送方无法恢复Lease信息,则只需等待一个最大的Lease超时时间就可以使得所有的Lease都失效,从而不破坏Lease机制
    • 如果接收方宕机,发送者不需要做更多的容错处理,,只需要等待Lease过期失效,就可以收回承诺,在工程实践中就是收回之前赋予的权限,身份等
  • Lease机制不依赖于存储:
    • 发送方可以持久化发送过的Lease信息,从而在宕机恢复后可以使得在有效期的Lease继续有效
    • 这只是一个Lease机制的优化,即使发送方没有持久化Lease信息,也可以通过等待一个最大的Lease时间的方式使得之前所有发送的Lease失效,从而保证机制持续有效
  • Lease机制依赖于有效期:
    • 要求发送方和接收方的时钟是同步的
      • 如果发送方的时钟比接收方慢:
        • 当接收方认为Lease已经过期的时候,发送方依旧认为Lease有效
        • 接收方可以用在Lease到期前申请新的Lease的方式解决这个问题
      • 如果发送方的时钟比接收方快:
        • 当发送方认为Lease已经过期的时候,接收方依旧认为Lease有效
        • 发送方可能将Lease发送给其余节点,造成承诺失效,影响系统的正确性
        • 实践中的通常做法是将发送方的有效期设置设置得比接收方的略大,只需大过时钟误差就可以避免对Lease的有效性的影响

基于Lease机制确定节点状态

  • 分布式协议依赖于对节点状态认知的全局一致性:
    • 一旦节点Q认为某个节点A异常,则节点A也必须认为自己异常,从而节点A停止作为primary节点,避免 “双主” 的问题出现
  • 解决 “双主” 的问题有两种思路:
    • 放弃使用中心化设计,改用去中心化设计: 设计的分布式协议可以容忍 “双主” 错误,不依赖于对节点状态的全局一致性认识或者全局一致性是全体协商后的结果
    • 利用Lease机制
  • 利用Lease机制确定节点状态:
    • 由中心节点向其余节点发送Lease, 如果某个节点持有有效的Lease, 则认为该节点正常可以提供服务
    • 示例:
      • 节点A,B,C周期性的发送heart beat报告自身状态
      • 节点Q收到heart beat后发送一个Lease, 表示节点Q确认了节点A,B,C的状态,并允许节点在Lease有效期内正常工作
      • 节点Q可以给primary节点一个特殊的Lease, 表示该节点可以作为primary工作
      • 一旦节点Q希望切换新的primary, 只需要等待前一个primary的Lease过期,就可以安全的发送新的Lease给新的primary节点,而不会出现 “双主” 问题
  • 在工程实践中,如果只使用一个中心节点发送Lease存在很大的风险:
    • 如果该中心节点宕机或者网络异常,则所有的节点没有Lease. 从而造成系统高度不可用
    • 实际系统中总是使用多个中心节点互为副本, 成为一个小的集群,该小集群具有高可用性,对外提供发送Lease的功能
    • chubby和zookeeper都是基于使用多个中心节点互为副本的设计

Lease的有效期选择

  • 在工程实践中,通常使用10秒级别作为Lease的有效期时长
  • 实践中可以以此作为参考并综合选择合适的时长

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

C#多线程开发-线程基础 01

发表于 2021-08-30

“这是我参与8月更文挑战的第30天,活动详情查看:8月更文挑战”

欢迎各位朋友微信搜索「Andy阿辉」关注一波!

写一些程序员的所思所想,希望对你有帮助。

公号链接:

最近由于工作的需要,一直在使用C#的多线程进行开发,其中也遇到了很多问题,但也都解决了。后来发觉自己对于线程的知识和运用不是很熟悉,所以将利用几篇文章来系统性的学习汇总下C#中的多线程开发。

线程基础

“进程是操作系统分配资源的最小单元,线程是操作系统调度的最小单元” 这句话应该学习计算机的朋友或多或少都听说过,这在操作系统这门课中是很重要的一个概念。

在操作系统中可以同时运行很多个应用程序,那么你知道计算机是如何分配和调度这些应用程序去使用CPU进行工作的吗?

这里面就牵扯到了进程、线程的概念,也就是我们接下来要学习的内容。

一个应用程序会有很多个线程,但是只能有一个进程。也就是说一个进程中可以有很多个线程。那么这是为什么呢?以前计算机只有一个计算模块,每次只能单一的执行一个计算单元,不能同时执行多个计算任务。现在随着科技的发展,有了多核CPU,可以一次性执行多个应用程序,这样就实现了多任务。操作系统为了不让一个应用程序独占CPU,导致其余程序挂起等待,不得不设计出一种将物理计算单元分割为一些虚拟的进程,并给予每个执行程序一定量的计算能力。此外,操作系统必须始终能够优先访问CPU,并能调整不同程序访问CPU的优先级(说白了就是典型的以空间换时间)。

线程正是这一概念的实现,可以认为线程是一个虚拟的进程,用于独立运行一个特定的程序。

大量使用线程会消耗大量的OS资源

那么为什么需要使用线程呢!其实就是为了在相同的时间内,让操作系统或CPU干更多的活,那么在C#中线程应该如何使用或者说在什么场景下使用呢!

在C#中关于线程的使用,大多数时候是在当程序需要处理大量繁琐、占用资源多、花费大量时间的任务时进行应用,比如访问数据库,视频显示,文件IO操作、网络传输等。

线程在应用程序中可以进行如何操作:1、创建线程;2、暂停线程;3、线程等待;4、终止线程。

1、创建线程

通过声明并实例化Thread就可以创建线程,它接收方法作为参数。使用Thread.Start()就可以开启子线程,让其去执行方法中的内容。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
scss复制代码        static void Main(string[] args)
{
//新创建的线程中输出
Thread oneThread = new Thread(PrintNumber);
oneThread.Start();

//主线程中输出
PrintNumber();
Console.ReadKey();
}

static void PrintNumber()
{
Console.WriteLine("开始......");
for (int i = 0; i < 10; i++)
{
Console.WriteLine(i);
}
}

主线程和子线程同时输出

可以看到当我们在子线程和主线程中同时输出PrintNumber()中的内容时,它是乱的随机交叉输出的。

2、暂停线程

暂停线程故名思意就是让线程暂停,不让其占用CPU资源,在一直等待,啥时候取消暂停就恢复运行。在C#中暂停就是让这个线程进入睡眠状态,让其休眠,不让其占用系统资源就可以了。

1
less复制代码  Thread.Sleep(TimeSpan.FromSeconds(2));    //睡眠2s

3、线程等待

线程等待就是多个线程在处理某个任务时,某个线程必须等待前一个线程处理所有数据后才可以进行执行,在这个期间,这个线程是阻塞状态的。只有前一个线程完事了,他才可以再继续执行。

1
2
3
4
5
6
7
8
9
10
11
scss复制代码        static void Main(string[] args)
{
//新创建的线程中输出
Thread oneThread = new Thread(PrintNumber);
oneThread.Start();
oneThread.Join();

//主线程中输出
PrintNumber();
Console.ReadKey();
}

也就是说上面的程序主线程必须得等oneThread线程执行完PrintNumber方法后,它才可以执行。

4、线程终止

就是线程在执行过程中,利用某些操作(Thread.Abort())可以使其线程立即退出,不进行工作了。

1
2
3
4
5
6
7
8
9
10
11
12
13
scss复制代码        static void Main(string[] args)
{
//新创建的线程中输出
Thread oneThread = new Thread(PrintNumber);
oneThread.Start();

Thread.Sleep(TimeSpan.FromSeconds(6));
oneThread.Abort();

//主线程中输出
PrintNumber();
Console.ReadKey();
}

上面的程序可以看到,当主程序再等待6s后,立即将oneThread线程终止掉。

其实Abort()方法是给线程注入了ThreadAbortException方法,导致线程被终结,这其实很危险,因为该线程可能正在处理某些重要的数据,比如接收传输数据等,这样子就传递摧毁了程序,数据也就丢失了。还有就是这个方法不能保证100%终止线程。有时候有些异常会被吃掉,我们可以利用某些关键变量在子线程中进行控制,从而取消线程的执行就可以。

在实际编码使用线程的过程中,可以通过oneThread.ThreadState来获取目前线程的状态。有时候我们也可以手动的设置线程的优先级,设置为最高的则提前执行,但是这个只是针对于单核CPU时,目前市面上基本都是多核的了,这种使用场景也就很少了。

一般我们创建的线程都是属于前台线程,通过手动设置ontThread对象的IsBackground属性为true时才会为后台线程。通常前台线程会比后台线程提前执行完。当前台线程执行完成后,程序结束并且后台线程被终结。进程会等待所有的前台线程完成后再结束工作,但是如果只剩下后台线程,进程会直接结束工作。

C#中的lock关键字

某一个资源当被多个线程同时访问时,可能这个资源的某些值对于各个线程来说会出问题。如果在某一时刻,一个线程是使其递增,一个线程是递减,会导致其值不唯一,各个线程拿到的值不对。这种情况就是所谓的竞争条件,竞争条件是多线程环境中非常常见的导致错误的原因。

1
2
3
4
5
6
7
8
9
10
11
12
csharp复制代码    class PepoleCount 
{
int count = 0;
public void AddCount()
{
++count;
}
public void DeleteCount()
{
--count;
}
}

比如是上面的程序,当两个线程同时访问这个PepoleCount类时,会导致count变量出现竞争条件。就是每个线程可能拿到的数值不是最新的。那么如何办呢,此时就需要使用到lock机制,也就是加锁。目的是为了当一个线程访问某个资源时,其余线程如果在访问时,必须等待当前访问完事后,它才可以访问。保证了数据的有效性。

lock关键字是如果锁定了一个对象,需要访问该对象的所有其他线程则会处于阻塞状态,并等待知道该对象解除锁定才可以访问。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
csharp复制代码    class PepoleCount 
{
private readonly object _syncRoot = new object();
int count = 0;
public void AddCount()
{
lock(_syncRoot)
{
++count;
}
}
public void DeleteCount()
{
lock(_syncRoot)
{
--count;
}
}
}

关于加锁这块还是有很多讲究的,不是说每一个方法,每一个变量都需要进行加锁,如果频繁的加锁会导致其余线程处于阻塞状态,那么也会导致应用程序出现严重的性能问题。

好了,今天关于线程的分享就先到这里。

期待下一篇文章的推送吧,希望我可以写的简单点,让大家对多线程开发有一些全新的认识。

小寄语

人生短暂,我不想去追求自己看不见的,我只想抓住我能看的见的。

原创不易,给个关注。

我是阿辉,感谢您的阅读,如果对你有帮助,麻烦点赞、转发 谢谢。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

SpringBoot整合ElasticSearch实战 1、

发表于 2021-08-30

这是我参与8月更文挑战的第28天,活动详情查看:8月更文挑战

| 作者:江夏

| CSDN:blog.csdn.net/qq_41153943

| 掘金:juejin.cn/user/651387…

| 知乎:www.zhihu.com/people/1024…

| GitHub:github.com/JiangXia-10…

本文大概6532字,读完共需18分钟

1、什么是ElasticSearch

在说ElasticSearch之前先来说说什么是搜索引擎。搜索引擎,即Search Engine,是一个帮助用户搜索他们需要内容的计算机程序,即把计算机中存储的信息与用户的信息需求(information need)相匹配,并把匹配的结果展示出来。简单点说,搜索引擎就是万维网环境中的为了使网民搜索信息的速度更加快捷、准确信息检索系统。常见的搜索引擎有百度、谷歌、搜狗等。

Elasticsearch 是一个分布式可扩展的实时搜索和分析引擎,一个建立在全文搜索引擎 Apache Lucene(TM) 基础上的搜索引擎,它提供了一个基于 RESTful web 接口的分布式多用户能力的全文搜索引擎,能够达到实时搜索、稳定、可靠、快速、安装使用方便等特点。它不仅包括了全文搜索功能,还可以进行以下工作:

  • 分布式实时文件存储,并将每一个字段都编入索引,使其可以被搜索。
  • 实时分析的分布式搜索引擎。
  • 可以扩展到上百台服务器,处理PB级别的结构化或非结构化数据。

Elasticsearch是用Java语言开发的,并作为Apache许可条款下的开放源码发布,是一种流行的企业级搜索引擎。

这篇文章就记录一下 SpringBoot 与 ElasticSearch 的整合。

2、ElasticSearch 安装

前面说到Elasticsearch是用Java语言开发的,所以在安装Elasticsearch之前需要先安装配置java环境。

然后登录www.elastic.co/cn/download… 选择相应的系统环境下载软件包,这里的下载应该会很慢。

图片

这里介绍Mac、Linux、Windows系统三种安装ElasticSearch的方式。2.1、Mac安装ElasticSearch

2.1、MacOS安装ElasticSearch

首先下载对应的MacOS的安装包。

homebrew安装

1
复制代码brew install elasticsearch

运行

  • 查看状态

$ brew services list

  • 启动

$ brew services start elasticsearch

  • 重启

$ brew services restart elasticsearch

  • 停止

$ brew services stop elasticsearch

浏览器输入 http://localhost:9200 查看ES是否运行

2.2、Linux安装ElasticSearch

进入到对应上传的文件夹,安装ElasticSearch

1
复制代码rpm -ivh elasticsearch-6.1.0.rpm

查找安装路径

1
复制代码rpm -ql elasticsearch

一般是装在/usr/share/elasticsearch/下。

然后设置data的目录:创建/data/es-data目录,用于elasticsearch数据的存放

1
bash复制代码mkdir -p /data/es-data

修改该目录的拥有者为elasticsearch

1
bash复制代码chown -R elasticsearch:elasticsearch /data/es-data

然后设置log的目录。

1
perl复制代码mkdir -p /log/es-log

修改该目录的拥有者为elasticsearch。

1
perl复制代码chown -R elasticsearch:elasticsearch /log/es-log

修改配置文件elasticsearch.yml。

1
bash复制代码vim /etc/elasticsearch/elasticsearch.yml

修改如下内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
makefile复制代码#设置节点名称
cluster.name: my ElasticSearch
#设置data存放的路径为/data/es-data
path.data: /data/es-data
#设置logs日志的路径为/log/es-log
path.logs: /log/es-log
#设置内存不使用交换分区,配置了bootstrap.memory_lock为true时反而会引发9200不会被监听,原因不明
bootstrap.memory_lock: false
#设置允许所有ip可以连接该elasticsearch
network.host: 0.0.0.0
#开启监听的端口为9200
http.port: 9200
#增加新的参数,为了让elasticsearch-head插件可以访问elasticsearchhttp.cors.enabled: true
http.cors.allow-origin: "*"

启动elasticsearch。

1
sql复制代码systemctl start elasticsearch

查看状态

1
lua复制代码systemctl status elasticsearch

设置开机启动

1
bash复制代码systemctl enable elasticsearch

启动成功之后,测试服务是否开启,或者浏览器访问。

1
arduino复制代码curl -X GET http://localhost:9200

返回如下信息,说明安装、启动成功了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
json复制代码{
  "name": "pYAFJhz",
"cluster_name": "my ElasticSearch",
"cluster_uuid": "oC28y-cNQduGItC7qq5W8w",
"version": {
"number": "6.8.2",
"build_flavor": "oss",
"build_type": "tar",
"build_hash": "b506955",
    "build_date": "2020-07-07T20:24:41.545295Z",
"build_snapshot": false,
"lucene_version": "7.7.0",
"minimum_wire_compatibility_version": "5.6.0",
"minimum_index_compatibility_version": "5.0.0"
},
"tagline": "You Know, for Search"
}

2.3、Windows安装ElasticSearch

下载安装包,解压,然后双击执行 bin/elasticsearch.bat进行安装。

安装完成后运行命令提示符

1
复制代码elasticsearch.bat -d

或者直接双击执行elasticsearch.bat

浏览器输入 http://localhost:9200 查看ES是否运行。

或者下载cURL,使用cURL的方式验证:

curl ‘http://localhost:9200/?pretty‘

返回结果同上,则安装成功。

3、SpringBoot 整合 ElasticSearch

上面的都是准备工作。安装好ElasticSearch之后,就使用SpringBoot来整合ElasticSearch 进行实战开发。SpringBoot 提供了 ElasticSearch 依赖库,只需要添加依赖包,通过 JPA 访问非常方便。

首先创建一个SpringBoot项目,我这里使用的IDEA,如果还不会使用创建springboot项目的可以参考这篇文章:SpringBoot入门:使用IDEA和Eclipse构建第一个SpringBoot项目。

创建好springboot项目之后,在pom.xml中,添加依 ElasticSearch和JPA 的依赖包,代码如下:

1
2
3
4
5
6
7
8
9
10
xml复制代码<!--jpa 支持-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<!--elasticsearch-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>

之后新建一个application.properties文件,然后添加ElasticSearch相关的配置:

1
2
ini复制代码spring.data.elasticsearch.cluster-name=my ElasticSearch
spring.data.elasticsearch.cluster-nodes=192.168.5.229:9200

这里我在数据库创建了一个user表,并插入数据。

建表语句:

1
2
3
4
5
6
7
8
sql复制代码CREATE TABLE `user` (
 `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'ID',
 `email` varchar(255) NOT NULL COMMENT '邮箱',
 `password` varchar(255) NOT NULL COMMENT '密码',
 `username` varchar(255) NOT NULL COMMENT '姓名',
 PRIMARY KEY (`id`),
 UNIQUE KEY `email` (`email`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;

插入数据:

1
2
3
sql复制代码INSERT INTO `user` VALUES ('1', '1@qq.com', '123456', '张三');
INSERT INTO `user` VALUES ('2', '2@qq.com', '234567', '李四');
INSERT INTO `user` VALUES ('3', '3@qq.com', '345678', '王五');

图片

前面的都是一些准备的工作,后面开始正式些后台的代码。首先新建一个实体类,命名为,User,代码如下:

1
2
3
4
5
6
7
8
9
10
11
less复制代码@Data
@Accessors(chain = true)
// indexName表示索引,type表示索引类别
@Document(indexName = "user", type = "person")
public class User{
@Id
    private String id;
    private String email;
    private String password;
    private String username;
}

这里使用 JPA 作为数据持久层,接口继承自ElasticsearchRepository,同时新增两个自定义查询方法。关于JPA的使用可以参考:SpringBoot整合JPA进行数据访问 。

1
2
3
typescript复制代码public interface userRepository extends ElasticsearchRepository<Student, String> {
    List<User> findUserByName(String username);
}

创建控制层,核心逻辑代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
less复制代码@RestController
@RequestMapping("/user")
public class UserController {

@Autowired
    private UserRepository userRepository;

@Autowired
    private ElasticsearchTemplate elasticsearchTemplate;
    
/**
* 查询指定ID
*/
@GetMapping("/get/{id}")
public Object getById(@PathVariable String id){
if(StringUtils.isEmpty(id)){
return Result.error();
}
Optional<User> studentOptional = userRepository.findById(id);
        if(userOptional.isPresent()){
            return userOptional.get();
}
return null;
}


/**
     * 普通搜索
*/
@GetMapping("/search/username")
public Object searchName(String username){
        List<User> users = userRepository.findByName(username);
        return users;
    }
}

然后开始写前端页面,用来结果的展示。在webapp下新建一个index.html文件,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
xml复制代码<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>SSMDemo</title>
</head>
<script>
//selectUser按钮的js功能代码
function selectUser() {
var xmlhttp = new XMLHttpRequest();
xmlhttp.onreadystatechange = function () {
if (xmlhttp.readyState == 4 && xmlhttp.status == 200) {
document.getElementById("test").innerHTML = xmlhttp.responseText;
}
}
xmlhttp.open("POST", "/get/{id}", true);
xmlhttp.setRequestHeader("Content-type", "application/x-www-form-urlencoded");
xmlhttp.send("id=2");
}
    //SearchUser按钮的js功能代码
function SearchUser() {
var xmlhttp = new XMLHttpRequest();
xmlhttp.onreadystatechange = function () {
if (xmlhttp.readyState == 4 && xmlhttp.status == 200) {
document.getElementById("test").innerHTML = xmlhttp.responseText;
}
}
        xmlhttp.open("POST", "/search/username", true);
xmlhttp.setRequestHeader("Content-type", "application/x-www-form-urlencoded");
        xmlhttp.send("username=王五");
}
</script>
<body>
<p id="test">Hello World</p>
<button type="button" onclick="selectUser()">查询用户</button>
<button type="button" onclick="SearchUser()">搜索用户</button>
</body>
</html>

运行项目项目,测试结果如下:

图片

图片

4、Ending

Elasticsearch 是一个分布式、高扩展、高实时的搜索与数据分析引擎。它能很方便的使大量数据具有搜索、分析和探索的能力。所以Elasticsearch 真正强大和公司使用较多的是在海量数据查询方面。但由于实际数据量和篇幅有限,这里只是简单的介绍一下有关Elasticsearch 入门相关的知识,感兴趣的同学可以后续继续学习Elasticsearch 相关的知识,如果有问题或不正确的地方可以在下方或后台留言讨论交流。

往期推荐

  • SpringBoot整合JPA进行数据访问
  • SpringBoot整合Mybatis(上):注解版
  • SpringBoot整合Mybatis(下):配置版

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

美团外卖实时数仓建设实践

发表于 2021-08-30

实时数仓以端到端低延迟、SQL标准化、快速响应变化、数据统一为目标。美团外卖数据智能组总结的最佳实践是:一个通用的实时生产平台跟一个通用交互式实时分析引擎相互配合,同时满足实时和准实时业务场景。两者合理分工,互相补充,形成易开发、易维护且效率高的流水线,兼顾开发效率与生产成本,以较好的投入产出比满足业务的多样性需求。

01 实时场景

实时数据在美团外卖的场景是非常多的,主要有以下几个方面:

  • 运营层面:比如实时业务变化,实时营销效果,当日营业情况以及当日分时业务趋势分析等。
  • 生产层面:比如实时系统是否可靠,系统是否稳定,实时监控系统的健康状况等。
  • C端用户:比如搜索推荐排序,需要实时行为、特点等特征变量的生产,给用户推荐更加合理的内容。
  • 风控侧:实时风险识别、反欺诈、异常交易等,都是大量应用实时数据的场景。

02 实时技术及架构

1. 实时计算技术选型

目前,市面上已经开源的实时技术还是很多的,比较通用的有Storm、Spark Streaming以及Flink,技术同学在做选型时要根据公司的具体业务来进行部署。

美团外卖依托于美团整体的基础数据体系建设,从技术成熟度来讲,公司前几年主要用的是Storm。当时的Storm,在性能稳定性、可靠性以及扩展性上也是无可替代的。但随着Flink越来越成熟,从技术性能上以及框架设计优势上已经超越了Storm,从趋势来讲就像Spark替代MR一样,Storm也会慢慢被Flink替代。当然,从Storm迁移到Flink会有一个过程,我们目前有一些老的任务仍然运行在Storm上,也在不断推进任务迁移。

具体Storm和Flink的对比可以参考上图表格。

2. 实时架构

① Lambda架构

Lambda是比较经典的一款架构,以前实时的场景不是很多,以离线为主,当附加了实时场景后,由于离线和实时的时效性不同,导致技术生态是不一样的。而Lambda架构相当于附加了一条实时生产链路,在应用层面进行一个整合,双路生产,各自独立。在业务应用中,顺理成章成为了一种被采用的方式。

双路生产会存在一些问题,比如加工逻辑Double,开发运维也会Double,资源同样会变成两个资源链路。因为存在以上问题,所以又演进了一个Kappa架构。

② Kappa架构

Kappa从架构设计来讲,比较简单,生产统一,一套逻辑同时生产离线和实时。但是在实际应用场景有比较大的局限性,在业内直接用Kappa架构生产落地的案例不多见,且场景比较单一。这些问题在美团外卖这边同样会遇到,我们也会有自己的一些思考,将会在后面的章节进行阐述。

03 业务痛点

首先,在外卖业务上,我们遇到了一些问题和挑战。在业务早期,为了满足业务需要,一般是Case By Case地先把需求完成。业务对于实时性要求是比较高的,从时效性的维度来说,没有进行中间层沉淀的机会。在这种场景下,一般是拿到业务逻辑直接嵌入,这是能想到的简单有效的方法,在业务发展初期这种开发模式也比较常见。

如上图所示,拿到数据源后,我们会经过数据清洗、扩维,通过Storm或Flink进行业务逻辑处理,最后直接进行业务输出。把这个环节拆开来看,数据源端会重复引用相同的数据源,后面进行清洗、过滤、扩维等操作,都要重复做一遍。唯一不同的是业务的代码逻辑是不一样的,如果业务较少,这种模式还可以接受,但当后续业务量上去后,会出现谁开发谁运维的情况,维护工作量会越来越大,作业无法形成统一管理。而且所有人都在申请资源,导致资源成本急速膨胀,资源不能集约有效利用,因此要思考如何从整体来进行实时数据的建设。

04 数据特点与应用场景

那么如何来构建实时数仓呢?首先要进行拆解,有哪些数据,有哪些场景,这些场景有哪些共同特点,对于外卖场景来说一共有两大类,日志类和业务类。

  • 日志类:数据量特别大,半结构化,嵌套比较深。日志类的数据有个很大的特点,日志流一旦形成是不会变的,通过埋点的方式收集平台所有的日志,统一进行采集分发,就像一颗树,树根非常大,推到前端应用的时候,相当于从树根到树枝分叉的过程(从1到n的分解过程)。如果所有的业务都从根上找数据,看起来路径最短,但包袱太重,数据检索效率低。日志类数据一般用于生产监控和用户行为分析,时效性要求比较高,时间窗口一般是5min或10min,或截止到当前的一个状态,主要的应用是实时大屏和实时特征,例如用户每一次点击行为都能够立刻感知到等需求。
  • 业务类:主要是业务交易数据,业务系统一般是自成体系的,以Binlog日志的形式往下分发,业务系统都是事务型的,主要采用范式建模方式。特点是结构化,主体非常清晰,但数据表较多,需要多表关联才能表达完整业务,因此是一个n到1的集成加工过程。

而业务类实时处理,主要面临的以下几个难点:

  • 业务的多状态性:业务过程从开始到结束是不断变化的,比如从下单->支付->配送,业务库是在原始基础上进行变更的,Binlog会产生很多变化的日志。而业务分析更加关注最终状态,由此产生数据回撤计算的问题,例如10点下单,13点取消,但希望在10点减掉取消单。
  • 业务集成:业务分析数据一般无法通过单一主体表达,往往是很多表进行关联,才能得到想要的信息,在实时流中进行数据的合流对齐,往往需要较大的缓存处理且复杂。
  • 分析是批量的,处理过程是流式的:对单一数据,无法形成分析,因此分析对象一定是批量的,而数据加工是逐条的。

日志类和业务类的场景一般是同时存在的,交织在一起,无论是Lambda架构还是Kappa架构,单一的应用都会有一些问题。因此针对场景来选择架构与实践才更有意义。

05 实时数仓架构设计

1. 实时架构:流批结合的探索

基于以上问题,我们有自己的思考。通过流批结合的方式来应对不同的业务场景。

如上图所示,数据从日志统一采集到消息队列,再到数据流的ETL过程,作为基础数据流的建设是统一的。之后对于日志类实时特征,实时大屏类应用走实时流计算。对于Binlog类业务分析走实时OLAP批处理。

流式处理分析业务的痛点是什么?对于范式业务,Storm和Flink都需要很大的外存,来实现数据流之间的业务对齐,需要大量的计算资源。且由于外存的限制,必须进行窗口的限定策略,最终可能放弃一些数据。计算之后,一般是存到Redis里做查询支撑,且KV存储在应对分析类查询场景中也有较多局限。

实时OLAP怎么实现?有没有一种自带存储的实时计算引擎,当实时数据来了之后,可以灵活的在一定范围内自由计算,并且有一定的数据承载能力,同时支持分析查询响应呢?随着技术的发展,目前MPP引擎发展非常迅速,性能也在飞快提升,所以在这种场景下就有了一种新的可能。这里我们使用的是Doris引擎。

这种想法在业内也已经有实践,且成为一个重要探索方向。阿里基于ADB的实时OLAP方案等。

2. 实时数仓架构设计

从整个实时数仓架构来看,首先考虑的是如何管理所有的实时数据,资源如何有效整合,数据如何进行建设。

从方法论来讲,实时和离线是非常相似的。离线数仓早期的时候也是Case By Case,当数据规模涨到一定量的时候才会考虑如何治理。分层是一种非常有效的数据治理方式,所以在实时数仓如何进行管理的问题上,首先考虑的也是分层的处理逻辑,具体内容如下:

  • 数据源:在数据源的层面,离线和实时在数据源是一致的,主要分为日志类和业务类,日志类又包括用户日志、DB日志以及服务器日志等。
  • 实时明细层:在明细层,为了解决重复建设的问题,要进行统一构建,利用离线数仓的模式,建设统一的基础明细数据层,按照主题进行管理,明细层的目的是给下游提供直接可用的数据,因此要对基础层进行统一的加工,比如清洗、过滤、扩维等。
  • 汇总层:汇总层通过Flink或Storm的简洁算子直接可以算出结果,并且形成汇总指标池,所有的指标都统一在汇总层加工,所有人按照统一的规范管理建设,形成可复用的汇总结果。

总结起来,从整个实时数仓的建设角度来讲,首先数据建设的层次化要先建出来,先搭框架,然后定规范,每一层加工到什么程度,每一层用什么样的方式,当规范定义出来后,便于在生产上进行标准化的加工。由于要保证时效性,设计的时候,层次不能太多,对于实时性要求比较高的场景,基本可以走上图左侧的数据流,对于批量处理的需求,可以从实时明细层导入到实时OLAP引擎里,基于OLAP引擎自身的计算和查询能力进行快速的回撤计算,如上图右侧的数据流。

06 实时平台化建设

架构确定之后,我们后面考虑的是如何进行平台化的建设,实时平台化建设是完全附加于实时数仓管理之上进行的。

首先进行功能的抽象,把功能抽象成组件,这样就可以达到标准化的生产,系统化的保障就可以更深入的建设,对于基础加工层的清洗、过滤、合流、扩维、转换、加密、筛选等功能都可以抽象出来,基础层通过这种组件化的方式构建直接可用的数据结果流。这会产生一个问题,用户的需求多样,为了满足了这个用户,如何兼容其他的用户,因此可能会出现冗余加工的情况。从存储的维度来讲,实时数据不存历史,不会消耗过多的存储,这种冗余是可以接受的,通过冗余的方式可以提高生产效率,是一种以空间换时间思想的应用。

通过基础层的加工,数据全部沉淀到IDL层,同时写到OLAP引擎的基础层,再往上是实时汇总层计算,基于Storm、Flink或Doris,生产多维度的汇总指标,形成统一的汇总层,进行统一的存储分发。

当这些功能都有了以后,元数据管理,指标管理,数据安全性、SLA、数据质量等系统能力也会逐渐构建起来。

1. 实时基础层功能

实时基础层的建设要解决一些问题。首先是一条流重复读的问题,一条Binlog打过来,是以DB包的形式存在的,用户可能只用其中一张表,如果大家都要用,可能存在所有人都要接这个流的问题。解决方案是可以按照不同的业务解构出来,还原到基础数据流层,根据业务的需要做成范式结构,按照数仓的建模方式进行集成化的主题建设。

其次要进行组件的封装,比如基础层的清洗、过滤、扩维等功能,通过一个很简单的表达入口,让用户将逻辑写出来。数据转换环节是比较灵活的,比如从一个值转换成另外一个值,对于这种自定义逻辑表达,我们也开放了自定义组件,可以通过Java或Python开发自定义脚本,进行数据加工。

2. 实时特征生产功能

特征生产可以通过SQL语法进行逻辑表达,底层进行逻辑的适配,透传到计算引擎,屏蔽用户对计算引擎的依赖。就像对于离线场景,目前大公司很少通过代码的方式开发,除非一些特别的Case,所以基本上可以通过SQL化的方式表达。

在功能层面,把指标管理的思想融合进去,原子指标、派生指标,标准计算口径,维度选择,窗口设置等操作都可以通过配置化的方式,这样可以统一解析生产逻辑,进行统一封装。

还有一个问题,同一个源,写了很多SQL,每一次提交都会起一个数据流,比较浪费资源,我们的解决方案是,通过同一条流实现动态指标的生产,在不停服务的情况下可以动态添加指标。

所以在实时平台建设过程中,更多考虑的是如何更有效的利用资源,在哪些环节更能节约化的使用资源,这是在工程方面更多考虑的事情。

3. SLA建设

SLA主要解决两个问题,一个是端到端的SLA,一个是作业生产效率的SLA,我们采用埋点+上报的方式,由于实时流比较大,埋点要尽量简单,不能埋太多的东西,能表达业务即可,每个作业的输出统一上报到SLA监控平台,通过统一接口的形式,在每一个作业点上报所需要的信息,最后能够统计到端到端的SLA。

在实时生产中,由于链路非常长,无法控制所有链路,但是可以控制自己作业的效率,所以作业SLA也是必不可少的。

4. 实时OLAP方案

问题

  • Binlog业务还原复杂:业务变化很多,需要某个时间点的变化,因此需要进行排序,并且数据要存起来,这对于内存和CPU的资源消耗都是非常大的。
  • Binlog业务关联复杂:流式计算里,流和流之间的关联,对于业务逻辑的表达是非常困难的。

解决方案

通过带计算能力的OLAP引擎来解决,不需要把一个流进行逻辑化映射,只需要解决数据实时稳定的入库问题。

我们这边采用的是Doris作为高性能的OLAP引擎,由于业务数据产生的结果和结果之间还需要进行衍生计算,Doris可以利用Unique模型或聚合模型快速还原业务,还原业务的同时还可以进行汇总层的聚合,也是为了复用而设计。应用层可以是物理的,也可以是逻辑化视图。

这种模式重在解决业务回撤计算,比如业务状态改变,需要在历史的某个点将值变更,这种场景用流计算的成本非常大,OLAP模式可以很好的解决这个问题。

07 实时应用案例

最后通过一个案例说明,比如商家要根据用户历史下单数给用户优惠,商家需要看到历史下了多少单,历史T+1的数据要有,今天实时的数据也要有,这种场景是典型的Lambda架构。我们可以在Doris里设计一个分区表,一个是历史分区,一个是今日分区,历史分区可以通过离线的方式生产,今日指标可以通过实时的方式计算,写到今日分区里,查询的时候进行一个简单的汇总。

这种场景看起来比较简单,难点在于商家的量上来之后,很多简单的问题都会变得复杂。后续,我们也会通过更多的业务输入,沉淀出更多的业务场景,抽象出来形成统一的生产方案和功能,以最小化的实时计算资源支撑多样化的业务需求,这也是未来我们需要达到的目的。

阅读美团技术团队更多技术文章合集

前端 | 算法 | 后端 | 数据 | 安全 | 运维 | iOS | Android | 测试

| 在公众号菜单栏对话框回复【2020年货】、【2019年货】、【2018年货】、【2017年货】等关键词,可查看美团技术团队历年技术文章合集。

| 本文系美团技术团队出品,著作权归属美团。欢迎出于分享和交流等非商业目的转载或使用本文内容,敬请注明“内容转载自美团技术团队”。本文未经许可,不得进行商业性转载或者使用。任何商用行为,请发送邮件至tech@meituan.com申请授权。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Spring5参考指南 事件Event

发表于 2021-08-30

本文已参与掘金创作者训练营第三期「高产更文」赛道,详情查看:掘力计划|创作者训练营第三期正在进行,「写」出个人影响力。

Spring提供了很方便的事件的处理机制,包括事件类ApplicationEvent和事件监听类ApplicationListener。 他实现的是设计者模式,如果实现了ApplicationListener接口的bean部署到Spring容器中,则每次ApplicationEvent发布到ApplicationContext时,都会通知该bean。

从Spring4.2开始,提供了基于注解的事件,即事件对象不一定要从ApplicationEvent来扩展。Spring会自动将其封装成一个事件对象。

下面是Spring的标准事件描述:

Event 解释
ContextRefreshedEvent 在初始化或刷新ApplicationContext时发布(例如,通过在ConfigurableApplicationContext接口上使用refresh()方法)。这里,“初始化”意味着加载所有bean,检测并激活后处理器bean,预先实例化单例,并且ApplicationContext对象准备好使用。只要上下文未关闭,只要所选的ApplicationContext实际上支持此类“热”刷新,就可以多次触发刷新。例如,XMLWebApplicationContext支持热刷新,但GenericApplicationContext不支持。
ContextStartedEvent 在可配置的ApplicationContext接口上使用start()方法启动ApplicationContext时发布。这里,“启动”意味着所有生命周期bean都会收到一个显式的启动信号。通常,此信号用于在显式停止后重新启动bean,但也可以用于启动尚未配置为自动启动的组件(例如,初始化时尚未启动的组件)。
ContextStoppedEvent 在可配置的ApplicationContext接口上使用stop()方法停止ApplicationContext时发布。这里,“停止”意味着所有生命周期bean都会收到一个明确的停止信号。停止的上下文可以通过start()调用重新启动。
ContextClosedEvent 在可配置的ApplicationContext接口上使用close()方法关闭ApplicationContext时发布。这里,“关闭”意味着所有的单例beans都被销毁了。封闭的环境达到了生命的尽头。无法刷新或重新启动。
RequestHandledEvent 一个特定于Web的事件,告诉所有bean HTTP请求已被服务。此事件在请求完成后发布。此事件仅适用于使用Spring的DispatcherServlet的Web应用程序。

基于继承的Event

你也可以自定义事件,下面是一个继承ApplicationEvent的例子:

1
2
3
4
5
6
7
8
9
10
11
java复制代码public class BlackListEvent extends ApplicationEvent {

private final String address;
private final String content;

public BlackListEvent(Object source, String address, String content) {
super(source);
this.address = address;
this.content = content;
}
}

若要发布自定义ApplicationEvent,在ApplicationEventPublisher上调用PublishEvent()方法。通常可以通过实现ApplicationEventPublisherAware接口来实现,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码public class EmailService implements ApplicationEventPublisherAware {

private List<String> blackList;
private ApplicationEventPublisher publisher;

public void setBlackList(List<String> blackList) {
this.blackList = blackList;
}

public void setApplicationEventPublisher(ApplicationEventPublisher publisher) {
this.publisher = publisher;
}

public void sendEmail(String address, String content) {
if (blackList.contains(address)) {
publisher.publishEvent(new BlackListEvent(this, address, content));
return;
}
}
}

在配置时,Spring容器检测到EmailService实现了ApplicationEventPublisherAware,并自动调用setApplicationEventPublisher()。实际上,传入的参数是Spring容器本身。您正在通过其applicationEventPublisher接口与应用程序上下文进行交互。

要接收定制的applicationEvent,可以创建一个实现applicationListener的类,并将其注册为SpringBean。下面的示例显示了这样的类:

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码public class BlackListNotifier implements ApplicationListener<BlackListEvent> {

private String notificationAddress;

public void setNotificationAddress(String notificationAddress) {
this.notificationAddress = notificationAddress;
}

public void onApplicationEvent(BlackListEvent event) {
// notify appropriate parties via notificationAddress...
}
}

这里使用了ApplicationListener 的BlackListEvent泛型。意味着onApplicationEvent()方法可以保持类型安全,避免任何向下强制转换的需要。

但请注意,默认情况下,事件侦听器同步接收事件。这意味着publishEvent()方法将一直阻塞,直到所有侦听器完成对事件的处理。

下面是注册和配置bean的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
xml复制代码<bean id="emailService" class="example.EmailService">
<property name="blackList">
<list>
<value>known.spammer@example.org</value>
<value>known.hacker@example.org</value>
<value>john.doe@example.org</value>
</list>
</property>
</bean>

<bean id="blackListNotifier" class="example.BlackListNotifier">
<property name="notificationAddress" value="blacklist@example.org"/>
</bean>

Spring的事件机制是为同一应用程序上下文中SpringBean之间的简单通信而设计的。对于更复杂的企业集成需求,可以使用Spring Integration的AMQP模型来处理。

基于注解的Event

从Spring4.2开始,您可以使用EventListener注解在托管bean的任何公共方法上注册事件侦听器。BlackListNotifier程序可以改写如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码public class BlackListNotifierAnnotation {

private String notificationAddress;

public void setNotificationAddress(String notificationAddress) {
this.notificationAddress = notificationAddress;
}

@EventListener
public void processBlackListEvent(BlackListEvent event) {
// notify appropriate parties via notificationAddress...
}
}

如果您的方法应该监听多个事件,或者您想要定义它而不使用任何参数,那么也可以在注解本身上指定事件类型。以下示例显示了如何执行此操作:

1
2
3
java复制代码    @EventListener({ContextStartedEvent.class, ContextRefreshedEvent.class})
public void handleContextStart() {
}

还可以使用定义spEL表达式的注解的条件属性添加其他运行时筛选,该表达式应与实际调用特定事件的方法相匹配。

下面的例子显示了如何重写通知程序,以便仅在事件的内容属性等于my-event时调用:

1
2
3
4
java复制代码    @EventListener(condition = "#blEvent.content == 'my-event'")
public void processBlackListSPELEvent(BlackListEvent blEvent) {
// notify appropriate parties via notificationAddress...
}

下表列出了上下文可用的项,以便您可以将它们用于条件事件处理:

name Location 描述 例子
Event root object 真实的ApplicationEvent #root.event
Arguments array root object 调用目标的参数 #root.args[0]
Argument name evaluation context 任何方法参数的名称。如果由于某种原因,名称不可用(例如,因为没有调试信息),参数名称也可以在 #a<#arg>下使用,其中#arg表示参数索引(从0开始)。 #blEvent or #a0 (也可以使用 #p0 or #p<#arg>)

异步侦听器

如果希望特定的侦听器异步处理事件,可以重用常规的@Async支持。下面是@Async的例子:

1
2
3
4
5
java复制代码    @Async
@EventListener
public void processBlackListEvent(BlackListEvent event) {
// notify appropriate parties via notificationAddress...
}

Listeners排序

如果需要先调用一个监听器,然后再调用另一个监听器,则可以将@order注解添加到方法声明中,如下所示:

1
2
3
4
5
java复制代码    @EventListener
@Order(12)
public void processBlackListEvent(BlackListEvent event) {
// notify appropriate parties via notificationAddress...
}

本文的例子可以参考: event

更多教程请参考 flydean的博客

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Spring5参考指南 JSR 330标准注解

发表于 2021-08-30

本文已参与掘金创作者训练营第三期「高产更文」赛道,详情查看:掘力计划|创作者训练营第三期正在进行,「写」出个人影响力。

之前的文章我们有讲过,从Spring3.0之后,除了Spring自带的注解,我们也可以使用JSR330的标准注解。不过需要加入maven依赖如下:

1
2
3
4
5
6
7
xml复制代码    <dependencies>
<dependency>
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
<version>1</version>
</dependency>
</dependencies>

下面是标准注解和Spring注解的区别:

Spring javax.inject.* javax.inject限制/描述
@Autowired @Inject @Inject没有required属性,可以使用Java8的Optional代替
@Component @Named / @ManagedBean JSR-330没有提供组合模式,只有一种方式来标记命名组件
@Scope(“singleton”) @Singleton JSR-330默认范围类似Spring的prototype,但是为了和Spring的默认值保持一致,在Spring中定义的JSR-330 bean默认是singleton。如果要使用其他的作用范围,那么需要使用Spring的@Scope注解。javax.inject也提供了一个@Scope注解。但是这个注解仅用来创建你自己的注解。
@Qualifier @Qualifier / @Named javax.inject.Qualifier只是一个用来构建自定义Qualifier的元注解。具体的字符串限定符(如带value的Spring的@Qualifier)可以通过javax.inject.Named关联。
@Value - 没有相同功能
@Required - 没有相同功能
@Lazy - 没有相同功能
ObjectFactory Provider javax.inject.Provider是Spring的ObjectFactory的直接替代品,它只使用了较短的get()方法名。它还可以与Spring的@Autowired结合使用,或者与无注解的构造函数和setter方法结合使用。

下面我们分别来介绍。

@Inject 和 @Named

@Inject可以用来替换@Autowired:

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码public class SimpleMovieLister {

private MovieFinder movieFinder;

@Inject
public void setMovieFinder(MovieFinder movieFinder) {
this.movieFinder = movieFinder;
}

public void listMovies() {
this.movieFinder.findMovies();
}
}

与@Autowired一样,你可以在字段级、方法级和构造函数参数级使用@Inject。此外,可以将注入点声明为Provider,允许通过Provider.get() 调用按需访问较短作用域的bean或延迟访问其他bean。下面是Provider的例子:

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码public class SimpleMovieProviderLister {
private Provider<MovieFinder> movieFinder;

@Inject
public void setMovieFinder(Provider<MovieFinder> movieFinder) {
this.movieFinder = movieFinder;
}

public void listMovies() {
this.movieFinder.get().findMovies();
}
}

可以使用@Named注解来为注入的参数限定名字:

1
2
3
4
java复制代码    @Inject
public void setMovieFinderNamed(@Named("main") MovieFinder movieFinder) {
this.movieFinder = movieFinder;
}

与@Autowired一样,@Inject也可以与java.util.Optional或@Nullable一起使用。下面是例子:

1
2
3
4
5
6
7
java复制代码    @Inject
public void setMovieFinder(Optional<MovieFinder> movieFinder) {
}

@Inject
public void setMovieFinder(@Nullable MovieFinder movieFinder) {
}

@Named 和 @ManagedBean

除了使用@Component,你也可以使用@javax.inject.Named 或者 javax.annotation.ManagedBean,如下:

1
2
3
4
5
6
7
8
9
10
11
java复制代码@Named("movieListener")  // @ManagedBean("movieListener") could be used as well
public class SimpleMovieNamedLister {

private MovieFinder movieFinder;

@Inject
public void setMovieFinder(MovieFinder movieFinder) {
this.movieFinder = movieFinder;
}

}

本节的例子可以参考jsr330

更多教程请参考 flydean的博客

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

netty系列之 搭建自己的下载文件服务器 简介 文件的co

发表于 2021-08-30

这是我参与8月更文挑战的第30天,活动详情查看:8月更文挑战

简介

上一篇文章我们学习了如何在netty中搭建一个HTTP服务器,讨论了如何对客户端发送的请求进行处理和响应,今天我们来讨论一下在netty中搭建文件服务器进行文件传输中应该注意的问题。

文件的content-type

客户端向服务器端请求一个文件,服务器端在返回的HTTP头中会包含一个content-type的内容,这个content-type表示的是返回的文件类型。这个类型应该怎么确认呢?

一般来说,文件类型是根据文件的的扩展名来确认的,根据 RFC 4288的规范,所有的网络媒体类型都必须注册。apache也提供了一个文件MIME type和扩展名的映射关系表。

因为文件类型比较多,我们看几个比较常用到的类型如下:

MIME type 扩展名
image/jpeg jpg
image/jpeg jpeg
image/png png
text/plain txt text conf def list log in
image/webp webp
application/vnd.ms-excel xls
application/vnd.openxmlformats-officedocument.spreadsheetml.sheet xlsx
application/msword doc
application/vnd.openxmlformats-officedocument.wordprocessingml.document docx
application/vnd.openxmlformats-officedocument.presentationml.presentation pptx
application/vnd.ms-powerpoint ppt
application/pdf pdf

JDK提供了一个MimetypesFileTypeMap的类,这个类提供了一个getContentType方法,可以根据请求的文件path信息,来推断其MIME type类型:

1
2
3
4
scss复制代码    private static void setContentTypeHeader(HttpResponse response, File file) {
MimetypesFileTypeMap mimeTypesMap = new MimetypesFileTypeMap();
response.headers().set(HttpHeaderNames.CONTENT_TYPE, mimeTypesMap.getContentType(file.getPath()));
}

客户端缓存文件

对于HTTP的文件请求来说,为了保证请求的速度,会使用客户端缓存的机制。比如客户端向服务器端请求一个文件A.txt。服务器在接收到该请求之后会将A.txt文件发送给客户端。

其请求流程如下:

1
2
3
bash复制代码   步骤1:客户端请求服务器端的文件
===================
GET /file1.txt HTTP/1.1
1
2
3
4
5
6
7
yaml复制代码   步骤2:服务器端返回文件,并且附带额外的文件时间信息:
===================
HTTP/1.1 200 OK
Date: Mon, 23 Aug 2021 17:52:30 GMT+08:00
Last-Modified: Tue, 10 Aug 2021 18:05:35 GMT+08:00
Expires: Mon, 23 Aug 2021 17:53:30 GMT+08:00
Cache-Control: private, max-age=60

一般来说如果客户端是现代浏览器的话,就会把A.txt缓存起来。在下次调用的时候只需要在head中添加If-Modified-Since,询问服务器该文件是否被修改了即可,如果文件没有被修改,则服务器会返回一个304 Not Modified,客户端得到该状态之后就会使用本地的缓存文件。

1
2
3
4
5
6
7
8
9
yaml复制代码   步骤3:客户端再次请求该文件
===================
GET /file1.txt HTTP/1.1
If-Modified-Since: Mon, 23 Aug 2021 17:55:30 GMT+08:00

步骤4:服务器端响应该请求
===================
HTTP/1.1 304 Not Modified
Date: Mon, 23 Aug 2021 17:55:32 GMT+08:00

在服务器的代码层面,我们首先需要返回一个响应中通常需要的日期字段,如Date、Last-Modified、Expires、Cache-Control等:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
vbscript复制代码 SimpleDateFormat dateFormatter = new SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US);
dateFormatter.setTimeZone(TimeZone.getTimeZone(HTTP_DATE_GMT_TIMEZONE));

// 日期 header
Calendar time = new GregorianCalendar();
log.info(dateFormatter.format(time.getTime()));

response.headers().set(HttpHeaderNames.DATE, dateFormatter.format(time.getTime()));

// 缓存 headers
time.add(Calendar.SECOND, HTTP_CACHE_SECONDS);
response.headers().set(HttpHeaderNames.EXPIRES, dateFormatter.format(time.getTime()));
response.headers().set(HttpHeaderNames.CACHE_CONTROL, "private, max-age=" + HTTP_CACHE_SECONDS);
response.headers().set(
HttpHeaderNames.LAST_MODIFIED, dateFormatter.format(new Date(fileToCache.lastModified())));

然后在收到客户端的二次请求之后,需要比较文件的最后修改时间和If-Modified-Since中自带的时间,如果没有发送变化,则发送304状态:

1
2
ini复制代码FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, NOT_MODIFIED, Unpooled.EMPTY_BUFFER);
setDateHeader(response);

其他HTTP中常用的处理

我们讨论了文件类型和缓存,对于一个通用的HTTP服务器来说,还需要考虑很多其他常用的处理,比如异常、重定向和Keep-Alive设置。

对于异常,我们需要根据异常的代码来构造一个DefaultFullHttpResponse,并且设置相应的CONTENT_TYPE头即可,如下所示:

1
2
3
vbscript复制代码FullHttpResponse response = new DefaultFullHttpResponse(
HTTP_1_1, status, Unpooled.copiedBuffer("异常: " + status + "\r\n", CharsetUtil.UTF_8));
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");

重定向同样需要构建一个DefaultFullHttpResponse,其状态是302 Found,并且在响应头中设置location为要跳转的URL地址即可:

1
2
ini复制代码FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, FOUND, Unpooled.EMPTY_BUFFER);
response.headers().set(HttpHeaderNames.LOCATION, newUri);

Keep-Alive是HTTP中为了避免每次请求都建立连接而做的一个优化方式。在HTTP/1.0中默认是的keep-alive是false,在HTTP/1.1中默认的keep-alive是true。如果在header中手动设置了connection:false,则server端请求返回也需要同样设置connection:false。

另外,因为HTTP/1.1中默认的keep-alive是true,如果通过HttpUtil.isKeepAlive判断通过之后,还需要判断是否是HTTP/1.0,并显示设置keep-alive为true。

1
2
3
4
5
6
7
scss复制代码final boolean keepAlive = HttpUtil.isKeepAlive(request);
HttpUtil.setContentLength(response, response.content().readableBytes());
if (!keepAlive) {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
} else if (request.protocolVersion().equals(HTTP_1_0)) {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
}

文件内容展示处理

文件内容展示处理是http服务器的核心,也是比较难以理解的地方。

首先要设置的是ContentLength,也就是响应的文件长度,这个可以使用file的length方法来获取:

1
2
3
4
ini复制代码RandomAccessFile raf;
raf = new RandomAccessFile(file, "r");
long fileLength = raf.length();
HttpUtil.setContentLength(response, fileLength);

然后我们需要根据文件的扩展名设置对应的CONTENT_TYPE,这个在第一小节已经介绍过了。

然后再设置date和缓存属性。这样我们就得到了一个只包含响应头的DefaultHttpResponse,我们先把这个只包含响应头的respose写到ctx中。

写完HTTP头,接下来就是写HTTP的Content了。

对于HTTP传递的文件来说,有两种处理方式,第一种方式情况下如果知道整个响应的content大小,则可以在后台直接进行整个文件的拷贝传输。如果服务器本身支持零拷贝的话,则可以使用DefaultFileRegion的transferTo方法将File或者Channel的文件进行转移。

1
2
3
4
ini复制代码sendFileFuture =
ctx.write(new DefaultFileRegion(raf.getChannel(), 0, fileLength), ctx.newProgressivePromise());
// 结束部分
lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);

如果并不知道整个响应的context大小,则可以将大文件拆分成为一个个的chunk,并且在响应的头中设置transfer-coding为chunked,netty提供了HttpChunkedInput和ChunkedFile,用来将大文件拆分成为一个个的Chunk进行传输。

1
2
3
ini复制代码sendFileFuture =
ctx.writeAndFlush(new HttpChunkedInput(new ChunkedFile(raf, 0, fileLength, 8192)),
ctx.newProgressivePromise());

如果向channel中写入ChunkedFile,则需要添加相应的ChunkedWriteHandler对chunked文件进行处理。

1
arduino复制代码pipeline.addLast(new ChunkedWriteHandler());

注意,如果是完整文件传输,则需要手动添加last content部分:

1
ini复制代码lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);

如果是ChunkedFile,last content部分已经包含在了chunkedFile中,不需要再手动添加了。

文件传输进度

ChannelFuture可以添加对应的listner,用来监控文件传输的进度,netty提供了一个ChannelProgressiveFutureListener,用于监控文件的进程,可以重写operationProgressed和operationComplete方法对进度监控进行定制:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
arduino复制代码        sendFileFuture.addListener(new ChannelProgressiveFutureListener() {
@Override
public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) {
if (total < 0) {
log.info(future.channel() + " 传输进度: " + progress);
} else {
log.info(future.channel() + " 传输进度: " + progress + " / " + total);
}
}

@Override
public void operationComplete(ChannelProgressiveFuture future) {
log.info(future.channel() + " 传输完毕.");
}
});

总结

我们考虑了一个HTTP文件服务器最基本的一些考虑因素,现在可以使用这个文件服务器来提供服务啦!

本文的例子可以参考:learn-netty4

本文已收录于 www.flydean.com/20-netty-fi…

最通俗的解读,最深刻的干货,最简洁的教程,众多你不知道的小技巧等你来发现!

欢迎关注我的公众号:「程序那些事」,懂技术,更懂你!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

spring-data-redis 连接泄漏,我 TM 人傻

发表于 2021-08-30

这是我参与8月更文挑战的第30天

本系列是 我TM人傻了 系列第四期[捂脸],往期精彩回顾:

  • 升级到Spring 5.3.x之后,GC次数急剧增加,我TM人傻了
  • 这个大表走索引字段查询的 SQL 怎么就成全扫描了,我TM人傻了
  • 获取异常信息里再出异常就找不到日志了,我TM人傻了

image

本文基于 Spring Data Redis 2.4.9

最近线上又出事儿了,新上线了一个微服务系统,上线之后就开始报各种发往这个系统的请求超时,这是咋回事呢?

image

还是经典的通过 JFR 去定位(可以参考我的其他系列文章,经常用到 JFR),对于历史某些请求响应慢,我一般按照如下流程去看:

  1. 是否有 STW(Stop-the-world,参考我的另一篇文章:JVM相关 - SafePoint 与 Stop The World 全解):
  2. 是否有 GC 导致的长时间 STW
  3. 是否有其他原因导致进程所有线程进入 safepoint 导致 STW
  4. 是否 IO 花了太长时间,例如调用其他微服务,访问各种存储(硬盘,数据库,缓存等等)
  5. 是否在某些锁上面阻塞太长时间?
  6. 是否 CPU 占用过高,哪些线程导致的?

通过 JFR 发现是很多 HTTP 线程在一个锁上面阻塞了,这个锁是从 Redis 连接池获取连接的锁。我们的项目使用的 spring-data-redis,底层客户端使用 lettuce。为何会阻塞在这里呢?经过分析,我发现 spring-data-redis 存在连接泄漏的问题。

image

我们先来简单介绍下 Lettuce,简单来说 Lettuce 就是使用 Project Reactor + Netty 实现的 Redis 非阻塞响应式客户端。spring-data-redis 是针对 Redis 操作的统一封装。我们项目使用的是 spring-data-redis + Lettuce 的组合。

为了和大家尽量说明白问题的原因,这里先将 spring-data-redis + lettuce API 结构简单介绍下。

首先 lettuce 官方,是不推荐使用连接池的,但是官方没有说,这是什么情况下的决定。这里先放上结论:

  • 如果你的项目中,使用的 spring-data-redis + lettuce,并且使用的都是 Redis 简单命令,没有使用 Redis 事务,Pipeline 等等,那么不使用连接池,是最好的(并且你没有关闭 Lettuce 连接共享,这个默认是开启的)。
  • 如果你的项目中,大量使用了 Redis 事务,那么最好还是使用连接池
  • 其实更准确地说,如果你使用了大量会触发 execute(SessionCallback) 的命令,最好使用连接池,如果你使用的都是 execute(RedisCallback) 的命令,就不太有必要使用连接池了。如果大量使用 Pipeline,最好还是使用连接池。

接下来介绍下 spring-data-redis 的 API 原理。在我们的项目中,主要使用 spring-data-redis 的两个核心 API,即同步的 RedisTemplate 和异步的 ReactiveRedisTemplate。我们这里主要以同步的 RedisTemplate 为例子,说明原理。ReactiveRedisTemplate 其实就是做了异步封装,Lettuce 本身就是异步客户端,所以 ReactiveRedisTemplate 其实实现更简单。

RedisTemplate 的一切 Redis 操作,最终都会被封装成两种操作对象,一是 RedisCallback<T>:

1
2
3
4
java复制代码public interface RedisCallback<T> {
@Nullable
T doInRedis(RedisConnection connection) throws DataAccessException;
}

是一个 Functional Interface,入参是 RedisConnection,可以通过使用 RedisConnection 操作 Redis。可以是若干个 Redis 操作的集合。大部分 RedisTemplate 的简单 Redis 操作都是通过这个实现的。例如 Get 请求的源码实现就是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
vbnet复制代码//在 RedisCallback 的基础上增加统一反序列化的操作
abstract class ValueDeserializingRedisCallback implements RedisCallback<V> {
private Object key;

public ValueDeserializingRedisCallback(Object key) {
this.key = key;
}

public final V doInRedis(RedisConnection connection) {
byte[] result = inRedis(rawKey(key), connection);
return deserializeValue(result);
}

@Nullable
protected abstract byte[] inRedis(byte[] rawKey, RedisConnection connection);
}

//Redis Get 命令的实现

public V get(Object key) {

return execute(new ValueDeserializingRedisCallback(key) {

@Override
protected byte[] inRedis(byte[] rawKey, RedisConnection connection) {
//使用 connection 执行 get 命令
return connection.get(rawKey);
}
}, true);
}

另一种是SessionCallback<T>:

1
2
3
4
5
java复制代码public interface SessionCallback<T> {

@Nullable
<K, V> T execute(RedisOperations<K, V> operations) throws DataAccessException;
}

SessionCallback也是一个 Functional Interface,方法体也是可以放若干个命令。顾名思义,即在这个方法中的所有命令,都是会共享同一个会话,即使用的 Redis 连接是同一个并且不能被共享的。一般如果使用 Redis 事务则会使用这个实现。

RedisTemplate 的 API 主要是以下这几个,所有的命令底层实现都是这几个 API:

  • execute(RedisCallback<?> action) 和 executePipelined(final SessionCallback<?> session):执行一系列 Redis 命令,是所有方法的基础,里面使用的连接资源会在执行后自动释放。
  • executePipelined(RedisCallback<?> action) 和 executePipelined(final SessionCallback<?> session):使用 PipeLine 执行一系列命令,连接资源会在执行后自动释放。
  • executeWithStickyConnection(RedisCallback<T> callback):执行一系列 Redis 命令,连接资源不会自动释放,各种 Scan 命令就是通过这个方法实现的,因为 Scan 命令会返回一个 Cursor,这个 Cursor 需要保持连接(会话),同时交给用户决定什么时候关闭。

image

通过源码我们可以发现,RedisTemplate 的三个 API 在实际应用的时候,经常会发生互相嵌套递归的情况。

例如如下这种:

1
2
3
4
5
6
7
8
9
scss复制代码redisTemplate.executePipelined(new RedisCallback<Object>() {
@Override
public Object doInRedis(RedisConnection connection) throws DataAccessException {
orders.forEach(order -> {
connection.hashCommands().hSet(orderKey.getBytes(), order.getId().getBytes(), JSON.toJSONBytes(order));
});
return null;
}
});

和

1
2
3
4
5
6
7
8
9
typescript复制代码redisTemplate.executePipelined(new RedisCallback<Object>() {
@Override
public Object doInRedis(RedisConnection connection) throws DataAccessException {
orders.forEach(order -> {
redisTemplate.opsForHash().put(orderKey, order.getId(), JSON.toJSONString(order));
});
return null;
}
});

是等价的。redisTemplate.opsForHash().put()其实调用的是 execute(RedisCallback) 方法,这种就是 executePipelined 与 execute(RedisCallback) 嵌套,由此我们可以组合出各种复杂的情况,但是里面使用的连接是怎么维护的呢?

其实这几个方法获取连接的时候,使用的都是:RedisConnectionUtils.doGetConnection 方法,去获取连接并执行命令。对于 Lettuce 客户端,获取的是一个 org.springframework.data.redis.connection.lettuce.LettuceConnection. 这个连接封装包含两个实际 Lettuce Redis 连接,分别是:

1
2
3
java复制代码private final @Nullable StatefulConnection<byte[], byte[]> asyncSharedConn;

private @Nullable StatefulConnection<byte[], byte[]> asyncDedicatedConn;
  • asyncSharedConn:可以为空,如果开启了连接共享,则不为空,默认是开启的;所有 LettuceConnection 共享的 Redis 连接,对于每个 LettuceConnection 实际上都是同一个连接;用于执行简单命令,因为 Netty 客户端与 Redis 的单处理线程特性,共享同一个连接也是很快的。如果没开启连接共享,则这个字段为空,使用 asyncDedicatedConn 执行命令。
  • asyncDedicatedConn:私有连接,如果需要保持会话,执行事务,以及 Pipeline 命令,固定连接,则必须使用这个 asyncDedicatedConn 执行 Redis 命令。

我们通过一个简单例子来看一下执行流程,首先是一个简单命令:redisTemplate.opsForValue().get("test"),根据之前的源码分析,我们知道,底层其实就是 execute(RedisCallback),流程是:

image

可以看出,如果使用的是 RedisCallback,那么其实不需要绑定连接,不涉及事务。Redis 连接会在回调内返回。需要注意的是,如果是调用 executePipelined(RedisCallback),需要使用回调的连接进行 Redis 调用,不能直接使用 redisTemplate 调用,否则 pipeline 不生效:

Pipeline 生效:

1
2
3
4
5
6
7
8
dart复制代码List<Object> objects = redisTemplate.executePipelined(new RedisCallback<Object>() {
@Override
public Object doInRedis(RedisConnection connection) throws DataAccessException {
connection.get("test".getBytes());
connection.get("test2".getBytes());
return null;
}
});

Pipeline 不生效:

1
2
3
4
5
6
7
8
dart复制代码List<Object> objects = redisTemplate.executePipelined(new RedisCallback<Object>() {
@Override
public Object doInRedis(RedisConnection connection) throws DataAccessException {
redisTemplate.opsForValue().get("test");
redisTemplate.opsForValue().get("test2");
return null;
}
});

然后,我们尝试将其加入事务中,由于我们的目的不是真的测试事务,只是为了演示问题,所以,仅仅是用 SessionCallback 将 GET 命令包装起来:

1
2
3
4
5
6
java复制代码redisTemplate.execute(new SessionCallback<Object>() {
@Override
public <K, V> Object execute(RedisOperations<K, V> operations) throws DataAccessException {
return operations.opsForValue().get("test");
}
});

这里最大的区别就是,外层获取连接的时候,这次是 bind = true 即将连接与当前线程绑定,用于保持会话连接。外层流程如下:

image

里面的 SessionCallback 其实就是 redisTemplate.opsForValue().get("test"),使用的是共享的连接,而不是独占的连接,因为我们这里还没开启事务(即执行 multi 命令),如果开启了事务使用的就是独占的连接,流程如下:
image

由于 SessionCallback 需要保持连接,所以流程有很大变化,首先需要绑定连接,其实就是获取连接放入 ThreadLocal 中。同时,针对 LettuceConnection 进行了封装,我们主要关注这个封装有一个引用计数的变量。每嵌套一次 execute 就会将这个计数 + 1,执行完之后,就会将这个计数 -1, 同时每次 execute 结束的时候都会检查这个引用计数,如果引用计数归零,就会调用 LettuceConnection.close()。

接下来再来看,如果是 executePipelined(SessionCallback) 会怎么样:

1
2
3
4
5
6
7
typescript复制代码List<Object> objects = redisTemplate.executePipelined(new SessionCallback<Object>() {
@Override
public <K, V> Object execute(RedisOperations<K, V> operations) throws DataAccessException {
operations.opsForValue().get("test");
return null;
}
});

其实与第二个例子在流程上的主要区别在于,使用的连接不是共享连接,而是直接是独占的连接。

image

最后我们再来看一个例子,如果是在 execute(RedisCallback) 中执行基于 executeWithStickyConnection(RedisCallback<T> callback) 的命令会怎么样,各种 SCAN 就是基于 executeWithStickyConnection(RedisCallback<T> callback) 的,例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
scss复制代码redisTemplate.execute(new SessionCallback<Object>() {
@Override
public <K, V> Object execute(RedisOperations<K, V> operations) throws DataAccessException {
Cursor<Map.Entry<Object, Object>> scan = operations.opsForHash().scan((K) "key".getBytes(), ScanOptions.scanOptions().match("*").count(1000).build());
//scan 最后一定要关闭,这里采用 try-with-resource
try (scan) {

} catch (IOException e) {
e.printStackTrace();
}
return null;
}
});

这里 Session callback 的流程,如下图所示,因为处于 SessionCallback,所以 executeWithStickyConnection 会发现当前绑定了连接,于是标记 + 1,但是并不会标记 - 1,因为 executeWithStickyConnection 可以将资源暴露到外部,例如这里的 Cursor,需要外部手动关闭。
image

image

在这个例子中,会发生连接泄漏,首先执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
scss复制代码redisTemplate.execute(new SessionCallback<Object>() {
@Override
public <K, V> Object execute(RedisOperations<K, V> operations) throws DataAccessException {
Cursor<Map.Entry<Object, Object>> scan = operations.opsForHash().scan((K) "key".getBytes(), ScanOptions.scanOptions().match("*").count(1000).build());
//scan 最后一定要关闭,这里采用 try-with-resource
try (scan) {

} catch (IOException e) {
e.printStackTrace();
}
return null;
}
});

这样呢,LettuceConnection 会和当前线程绑定,并且在结束时,引用计数不为零,而是 1。并且 cursor 关闭时,会调用 LettuceConnection 的 close。但是 LettuceConnection 的 close 的实现,其实只是标记状态,并且把独占的连接 asyncDedicatedConn 关闭,由于当前没有使用到独占的连接,所以为空,不需要关闭;如下面源码所示:

LettuceConnection:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
ini复制代码@Override
public void close() throws DataAccessException {
super.close();

if (isClosed) {
return;
}

isClosed = true;

if (asyncDedicatedConn != null) {
try {
if (customizedDatabaseIndex()) {
potentiallySelectDatabase(defaultDbIndex);
}
connectionProvider.release(asyncDedicatedConn);
} catch (RuntimeException ex) {
throw convertLettuceAccessException(ex);
}
}

if (subscription != null) {
if (subscription.isAlive()) {
subscription.doClose();
}
subscription = null;
}

this.dbIndex = defaultDbIndex;
}

之后我们继续执行一个 Pipeline 命令:

1
2
3
4
5
6
7
8
dart复制代码List<Object> objects = redisTemplate.executePipelined(new RedisCallback<Object>() {
@Override
public Object doInRedis(RedisConnection connection) throws DataAccessException {
connection.get("test".getBytes());
redisTemplate.opsForValue().get("test");
return null;
}
});

这时候由于连接已经绑定到当前线程,同时同上上一节分析我们知道第一步解开释放这个绑定,但是调用了 LettuceConnection 的 close。执行这个代码,会创建一个独占连接,并且,由于计数不能归零,导致连接一直与当前线程绑定,这样,这个独占连接一直不会关闭(如果有连接池的话,就是一直不返回连接池)

即使后面我们手动关闭这个链接,但是根据源码,由于状态 isClosed 已经是 true,还是不能将独占链接关闭。这样,就会造成连接泄漏。

针对这个 Bug,我已经向 spring-data-redis 一个 Issue:Lettuce Connection Leak while using execute(SessionCallback) and executeWithStickyConnection in same thread by random turn

image

  • 尽量避免使用 SessionCallback,尽量仅在需要使用 Redis 事务的时候,使用 SessionCallback。
  • 使用 SessionCallback 的函数单独封装,将事务相关的命令单独放在一起,并且外层尽量避免再继续套 RedisTemplate 的 execute 相关函数。

微信搜索“我的编程喵”关注公众号,每日一刷,轻松提升技术,斩获各种offer

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

SpringCloudAlibaba全网最全讲解8️⃣之Sl

发表于 2021-08-30

这是我参与8月更文挑战的第30天,活动详情查看:8月更文挑战

🌈专栏简介

**感谢阅读,希望能对你有所帮助,博文若有瑕疵请在评论区留言或在主页个人介绍中添加我私聊我,感谢每一位小伙伴不吝赐教。我是XiaoLin,既会写bug也会唱rap的男孩,这个专栏主要是介绍目前微服务最主流的解决方案,SpringCloudAlibaba,将会分组件介绍。专栏地址: [SpringCloudAlibaba](https://juejin.cn/column/7001291481705086990)。**
  • SpringCloudAlibaba全网最全讲解7️⃣之Gateway(建议收藏)
  • SpringCloudAlibaba全网最全讲解6️⃣之Sentinel(建议收藏)
  • SpringCloudAlibaba全网最全讲解5️⃣之Feign(建议收藏)
  • SpringCloudAlibaba全网最全讲解4️⃣之Ribbon(建议收藏)
  • SpringCloudAlibaba全网最全讲解3️⃣之Nacos(建议收藏)
  • SpringCloudAlibaba全网最全讲解2️⃣(建议收藏)
  • SpringCloudAlibaba全网最全讲解1️⃣(建议收藏)

十、链路追踪:Sleuth&Zipkin

10.1、链路追踪简介

在大型系统的微服务化构建中,一个系统被拆分成了许多模块。这些模块负责不同的功能,组合成系统,最终可以提供丰富的功能。


在这种架构中,一次请求往往需要涉及到多个服务。互联网应用构建在不同的软件模块集上,这些软件模块,有可能是由不同的团队开发、可能使用不同的编程语言来实现、有可能布在了几千台服务器,横跨多个不同的数据中心,也就意味着这种架构形式也会存在一些问题:
  1. 如何快速发现问题?
  2. 如何判断故障影响范围?
  3. 如何梳理服务依赖以及依赖的合理性?
  4. 如何分析链路性能问题以及实时容量规划?
分布式链路追踪(Distributed Tracing),就是将一次分布式请求还原成调用链路,进行日志记录,性能监控并将一次分布式请求的调用情况集中展示。比如各个服务节点上的耗时、请求具体到达哪台机器上、每个服务节点的请求状态等等。

image-20210507222709097

常见的链路追踪技术有下面这些:
  1. cat:由大众点评开源,基于Java开发的实时应用监控平台,包括实时应用监控,业务监控 。 集成方案是通过代码埋点的方式来实现监控,比如: 拦截器,过滤器等。 对代码的侵入性很大,集成成本较高。风险较大。
  2. zipkin:由Twitter公司开源,开放源代码分布式的跟踪系统,用于收集服务的定时数据,以解决微服务架构中的延迟问题,包括:数据的收集、存储、查找和展现。该产品结合spring-cloud-sleuth使用较为简单, 集成很方便, 但是功能较简单。
  3. pinpoint:Pinpoint是韩国人开源的基于字节码注入的调用链分析,以及应用监控分析工具。特点是支持多种插件,UI功能强大,接入端无代码侵入。
  4. skywalking:SkyWalking是本土开源的基于字节码注入的调用链分析,以及应用监控分析工具。特点是支持多种插件,UI功能较强,接入端无代码侵入。目前已加入Apache孵化器。
  5. Sleuth:SpringCloud 提供的分布式系统中链路追踪解决方案。
SpringCloud alibaba技术栈中并没有提供自己的链路追踪技术的,我们可以采用Sleuth +Zinkin来做链路追踪解决方案。

10.2、Sleuth入门

10.2.1、Sleuth简介

SpringCloud Sleuth主要功能就是在分布式系统中提供追踪解决方案。它大量借用了Google Dapper的设计, 先来了解一下Sleuth中的术语和相关概念。
  • Trace

由一组Trace Id相同的Span串联形成一个树状结构。为了实现请求跟踪,当请求到达分布式系统的入口端点时,只需要服务跟踪框架为该请求创建一个唯一的标识(即TraceId),同时在分布式系统内部流转的时候,框架始终保持传递该唯一值,直到整个请求的返回。那么我们就可以使用该唯一标识将所有的请求串联起来,形成一条完整的请求链路。

  • Span

代表了一组基本的工作单元。为了统计各处理单元的延迟,当请求到达各个服务组件的时候,也通过一个唯一标识(SpanId)来标记它的开始、具体过程和结束。通过SpanId的开始和结束时间戳,就能统计该span的调用时间,除此之外,我们还可以获取如事件的名称。请求信息等元数据。

  • Annotation

用它记录一段时间内的事件,内部使用的重要注释:

+ cs(Client Send)客户端发出请求,开始一个请求的生命。
+ sr(Server Received)服务端接受到请求开始进行处理, sr-cs = 网络延迟(服务调用的时间)。
+ ss(Server Send)服务端处理完毕准备发送到客户端,ss - sr = 服务器上的请求处理时间。
+ cr(Client Reveived)客户端接受到服务端的响应,请求结束。 cr - sr = 请求的总时间。

image-20210507223230617

10.2.2、集成链路追踪组件Sleuth

Sleuth的使用及其简单,直接引入一个依赖即可。
1
2
3
4
xml复制代码<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-sleuth</artifactId>
</dependency>
我们随便在一个服务里面打印日志,可以在控制台观察到sleuth的日志输出:

在这里插入图片描述

日志参数详解:

[product-service,d1e92e984eaec1ff,d1e92e984eaec1ff,true]

  1. 第一个值,spring.application.name的值,代表服务名
  2. 第二个值,d1e92e984eaec1ff,sleuth生成的一个ID,叫Trace ID,用来标识一条请求链路,一条请求链路中包含一个Trace ID,多个Span ID
  3. 第三个值,d1e92e984eaec1ff、spanID 基本的工作单元,获取元数据,如发送一个http
  4. 第四个值:true,是否要将该信息输出到zipkin服务中来收集和展示。
查看日志文件并不是一个很好的方法,当微服务越来越多日志文件也会越来越多,通过Zipkin可以将日志聚合,并进行可视化展示和全文检索。

10.3、Zipkin+Sleuth整合

10.3.1、ZipKin介绍

Zipkin 是 Twitter 的一个开源项目,它基于Google Dapper实现,它致力于收集服务的定时数据,以解决微服务架构中的延迟问题,包括数据的收集、存储、查找和展现。我们可以使用它来收集各个服务器上请求链路的跟踪数据,并通过它提供的REST API接口来辅助我们查询跟踪数据以实现对分布式系统的监控程序,从而及时地发现系统中出现的延迟升高问题并找出系统性能瓶颈的根源。除了面向开发的 API 接口之外,它也提供了方便的UI组件来帮助我们直观的搜索跟踪信息和分析请求链路明细,比如:可以查询某段时间内各用户请求的处理时间等。


Zipkin 提供了可插拔数据存储方式:In-Memory、MySql、Cassandra 以及 Elasticsearch。

image-20210508094417349

它主要由 4 个核心组件构成:
  1. Collector:收集器组件,它主要用于处理从外部系统发送过来的跟踪信息,将这些信息转换为Zipkin内部处理的 Span 格式,以支持后续的存储、分析、展示等功能。
  2. Storage:存储组件,它主要对处理收集器接收到的跟踪信息,默认会将这些信息存储在内存中,我们也可以修改此存储策略,通过使用其他存储组件将跟踪信息存储到数据库中。
  3. RESTful API:API 组件,它主要用来提供外部访问接口。比如给客户端展示跟踪信息,或是外接系统访问以实现监控等。
  4. Web UI:UI 组件, 基于API组件实现的上层应用。通过UI组件用户可以方便而有直观地查询和分析跟踪信息。
Zipkin分为两端,一个是 Zipkin服务端,一个是 Zipkin客户端,客户端也就是微服务的应用。 客户端会配置服务端的 URL 地址,一旦发生服务间的调用的时候,会被配置在微服务里面的 Sleuth 的监听器监听,并生成相应的 Trace 和 Span 信息发送给服务端。

10.3.2、 ZipKin服务端安装

下载ZipKin的jar包

官网下载地址,下载了以后是一个jar包。

通过命令启动

1
shell复制代码java -jar jar包名字

访问 http://localhost:9411

image-20210508095432711

10.3.3、Zipkin+Sleuth整合

在Shop-produuct-server和Shop-order-server中加入依赖

1
2
3
4
xml复制代码    <dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-zipkin</artifactId>
</dependency>

在Shop-produuct-server和Shop-order-server中加入application.yml

1
2
3
4
5
6
7
yaml复制代码spring:
zipkin:
base-url: http://127.0.0.1:9411/ #zipkin server的请求地址
discoveryClientEnabled: false #让nacos把它当成一个URL,而不要当做服务名
sleuth:
sampler:
probability: 1.0 #采样的百分比

访问测试

image-20210508102949805

image-20210508103004885

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

一篇入门Nodejs可不可行?【实打实的使用Node,就够了

发表于 2021-08-30

本文已参与掘金创作者训练营第三期「话题写作」赛道,详情查看:掘力计划|创作者训练营第三期正在进行,「写」出个人影响力。

这是一篇对Node使用和各个方面的整体介绍,力争进行足够详细的且精简的介绍,实现一篇就入门的目的。如有问题欢迎指正或批评!

Node的介绍和流行就不再赘述了,直接撸起袖子,实打实的使用Node,先从其概念和基本使用开始!

Node介绍

NodeJs官网

Node基于V8引擎开发,实现了脱离于浏览器运行js,使得JavaScript可以编写Web服务器后台应用,甚至现在,桌面应用(Electron…)、手机App(React Native、uni-app…)等市场也开始被js占领。

Webpack、Gulp,Npm等前端工程化的工具都是依赖于Node。node也被广泛应用于中间层服务器,负责IO读写、数据查询等。

Node.js 就是运行在服务端的 JavaScript,是一个基于Chrome JavaScript 运行时建立的一个平台。

Node.js是一个事件驱动I/O的服务端JavaScript环境,基于Google的V8引擎,V8引擎执行Javascript的速度非常快,性能非常好。加上前端的蓬勃发展,js已经逐渐发展为一门全栈、全端的语言,也使得Node如此受欢迎!

Node使用

下载安装

从 nodejs.cn/download/cu… 下载最新的Nodejs版本。目前最新的长期支持版本为 v14.17.5 。

选择对应的版本下载即可。

下载好后。双击运行安装程序:


选择安装路径:

自定义安装中,选择需要的项。通常保持默认,都选择即可:

不要勾选自动安装所需工具:

install等待安装完成即可:

查看版本、运行js文件

1
2
sh复制代码> node -v
v14.17.0

进入node环境运行js和js文件

命令中,输入node,回车:

1
2
3
4
5
6
7
8
9
10
11
12
sh复制代码> node
Welcome to Node.js v14.17.0.
Type ".help" for more information.
> let a=1;
undefined
> a+1
2
> a++
1
> a
2
> .exit

Node环境的退出使用.exit命令。或者按键Ctrl+D,或两次按键Ctrl+C。

执行一个js文件。新建test.js文件,文件内容为console.log("Hello World!");。

在test.js文件所在目录下,启动命令工具cmd或powershell。如下:

1
2
3
4
sh复制代码> node test.js
Hello World!
> node test
Hello World!

NPM(Node Package Manager)包管理器

NPM命令和CNPM使用

在使用和开发Nodejs项目时,通常使用npm管理用到的包及包之间的依赖。

下面是各npm命令的简要介绍和使用。

  • npm init,用于初始化一个项目.
  • npm install -g cnpm --registry=https://registry.npm.taobao.org 安装cnpm,cnpm是npm的国内镜像(淘宝),基本和npm保持同步,国内使用可以免除网络连接的限制。
  • cnpm i packagename或cnpm install packagename,安装一个包。

如安装jquery:cnpm i jquery。

新版npm install仅表示将node包下载到node_modules文件夹,并不会将其作为依赖添加到package.json中。

  • cnpm un packagename或cnpm uninstall packagename,将包从node_modules文件夹下移除,并删除package.json对应依赖。
  • cnpm i packagename -S:安装包并将其添加到package.json的依赖中。
  • cnpm i packagename -D:安装包并将其添加到package.json的开发者依赖中。
  • npm update xxx更新某个包。
  • 直接运行cnpm i或cnpm install,可以将 package.json 中的依赖全部安装

package.json文件,里面包含用到的包信息(node或js库、框架等),用以管理需要的包和包的依赖。通过npm install安装包时,可以指定安装在哪个依赖下,这样会修改package.json中的内容,维护包及其依赖。

包依赖分类

开发依赖:devDependencies

开发环境依赖,仅次于 dependencies 的使用频率!它的对象定义和dependencies一样,只不过它里面的包只用于开发环境,不用于生产环境,这些包通常是单元测试或者打包工具等,例如gulp, grunt, webpack, moca, coffee等

  • 安装命令:
1
sh复制代码npm install package-name --save-dev

或

1
sh复制代码npm install package-name -D

生产依赖:dependencies

应用依赖,或者叫做业务依赖,这是最常用的依赖包管理对象!它用于指定应用依赖的外部包,这些依赖是应用发布后正常执行时所需要的,但不包含测试时或者本地打包时所使用的包。

  • 安装命令
1
sh复制代码npm install package-name --save

或

1
sh复制代码npm install package-name -S

参考npm–02–开发依赖和生产依赖

推荐后续node项目使用Yarn包管理器,后续文章会详细介绍yarn!

Node在多系统用户情况下的使用

一般在node安装后,切换到另一个用户下命令行下输入node -v和npm -v能够正确获取版本号。

但是通过npm全局安装的模块,无法在命令行下获取和运行。

解决办法是重新安装node或者修复安装node即可解决。

如果修复安装node后,npm全局安装模块后仍然无法使用,再次的解决办法是将%APPDATA%\npm路径添加到用户的环境变量中,如Administrator用户,路径为:C:\Users\Administrator\AppData\Roaming\npm,添加环境变量

也可以直接添加到系统的PATH环境变量中。

Node的模块

回调函数

首先说一下Node中的回调函数。因此在Node中编写或导入模块时,通常会用到很多的回调函数。

回调函数是JavaScript中对异步编程的实现,你会在Node或js中发现大量的回调函数的使用。

所谓回调函数,就是把任务的第二段单独写在一个函数里面,等到需要执行第二段任务的时候,就直接调用这个函数。

Node 约定,回调函数的第一个参数,必须是错误对象err(如果没有错误,该参数就是null)

原因是执行分成两段,第一段执行完以后,任务所在的上下文环境就已经结束了。在这以后抛出的错误,原来的上下文环境已经无法捕捉,只能当作参数,传入第二段。

Generator 函数的异步应用

全局模块(对象)

随时随地都可以访问,不需要引用。如process。

process.env 环境变量

比如根据不同环境变量输出不同内容:

1
2
3
4
5
6
7
js复制代码// process.env.dev=true;
if(process.env.dev){
console.log("我是开发环境");
}
else{
console.log("我是生产环境");
}

process.argv 输入的命令和参数

如下新建test.js文件,内容为:

1
2
3
4
5
js复制代码console.log(process.argv);

let num1=parseInt(process.argv[2]);
let num2=parseInt(process.argv[3]);
console.log(num1+num2);

执行:

1
2
3
4
5
6
7
8
sh复制代码>node test 20 40
[
'C:\\Program Files\\nodejs\\node.exe',
'E:\\vscode\\Node接触\\Node接触使用1\\test',
'20',
'40'
]
60

__dirname 路径名

__dirname 用于获取输出当前执行的文件所在的路径。

Buffer 对象

Buffer对象是Node处理二进制数据的一个接口。它是Node原生提供的全局对象,可直接使用。

Buffer代表一个缓冲区,主要用于操作二进制数据流。是一个类似数组的对象,成员是8位的一个字节(取值为0到255的整数值)。

  • Buffer对象使用

如下,实例化一个Buffer对象,并赋值和取值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
js复制代码// 生成一个256字节的Buffer实例
var bytes = new Buffer(256);

// 遍历每个字节,写入内容
for (var i = 0; i < bytes.length; i++) {
bytes[i] = i;
}


// 生成一个buffer的view
// 取出从240字节到256字节
var end = bytes.slice(240, 256);

end[0] // 240
end[0] = 0;
end[0] // 0
  • Buffer拷贝

如下,copy 方法将 bytes 实例的4号到7号成员的这一段,拷贝到more实例从0号开始的区域。

1
2
3
4
5
6
7
8
9
js复制代码var bytes = new Buffer(8);

for (var i = 0; i < bytes.length; i++) {
bytes[i] = i;
}

var more = new Buffer(4);
bytes.copy(more, 0, 4, 8); // bytes.length
more[0] // 4
  • Buffer和字符串的转换

将字符串转换为Buffer对象,可以使用:

new Buffer(str[,encoding]) —— 根据一个字符串和编码格式创建buffer,不指定编码时默认使用utf8。

将Buffer对象转换为字符串:

toString方法将Buffer实例,按照指定编码(默认为utf8)转为字符串。

1
2
3
js复制代码var hello = new Buffer('Hello');
hello // <Buffer 48 65 6c 6c 6f>
hello.toString() // "Hello"
  • Buffer构造函数
  1. new Buffer(size),创建一个指定大小的buffer。
  2. new Buffer(array),根据一个字节数组来创建一个buffer,数组成员必须是整数值。
  3. new Buffer(str[,encoding]),根据一个字符串和编码格式创建buffer,不指定编码时默认使用utf8。
  4. new Buffer(buffer),根据buffer实例创建一个新的buffer。
  • 类方法
  1. Buffer.isEncoding('utf8') 检测是否为有效的编码参数。即检查 utf8 是否是有效的。
  2. Buffer.isBuffer(Date) 检查是否是一个Buffer对象。
  3. Buffer.byteLength('Hello', 'utf8') 返回字符串实际占据的字节长度,默认utf8。
  4. Buffer.concat() 将一组Buffer对象合并为一个Buffer对象。
  • 实例方法
  1. write() 写入数据。第一个参数是所写入的内容,第二个参数(可省略)是所写入的起始位置(默认从0开始),第三个参数(可省略)是编码方式,默认为utf8。
  2. slice() 返回一个按照指定位置、从原对象引用出来的Buffer实例。它的两个参数分别为起始和终止位置。slice方法创造原内存的一个视图(view),返回的buffer跟原buffer引用的是同一块内存。
  3. toString() 转为字符串。
  4. toJSON() 将Buffer实例转为JSON对象。

系统模块

系统内置,需要require()引入,但不需要单独下载。

path

path:用于处理文件路径和目录路径的使用工具。

下面是path提供的几个方法的使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
js复制代码let path=require('path');

// 获取目录路径
let dir= path.dirname("/node/a/b/c/1.jpg");
// 文件名
let filename = path.basename("/node/a/b/c/1.jpg");
// 后缀名
let extname = path.extname("/node/a/b/c/1.jpg");

console.log(dir);
console.log(filename);
console.log(extname);

// resolve处理多个参数共同组合下的路径
console.log(path.resolve("/node/a/b/c/d","../","e/f/g","../../","h"));

运行输出:

1
2
3
4
5
sh复制代码$ node test.js
/node/a/b/c
1.jpg
.jpg
e:\node\a\b\c\e\h

fs

fs:用于文件读写操作。

如下是使用fs读取文件、覆盖写入文件、追加写文件,及同步读写文件的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
js复制代码let fs=require("fs");

// 读取一个文件
fs.readFile("./a.txt",(err,data)=>{
if (err) {
console.log(err);
}
else{
// 输出为二进制的 <Buffer 31 32 33 2c 0d 0a 61 62 63 2c 0d 0a e4 bd a0 e5 a5 bd e3 80 82>
//console.log(data);
console.log(data.toString());
}
})
// 写入一个文件,文件不存在将创建,存在则覆盖内容
fs.writeFile("b.txt","你好,这是使用node写入一个文件(覆盖)",err=>{
console.log(err);
})

// {flag:"a"} append 追加到一个文件
fs.writeFile("b.txt", "你好,这是使用node写入一个文件(追加)",{flag:"a"}, err => {
console.log(err);
})

//append 追加到一个文件方法
// fs.appendFile(filename,data,[options],callback);
fs.appendFile("b.txt", '你好,这是使用node写入一个文件(追加)', function () {
console.log('追加内容完成');
});

//同步读取
let fileContent=fs.readFileSync("a.txt");
console.log(fileContent.toString());

//同步写
let write=fs.writeFileSync("b.text","同步写文件方法");
console.log(write);

自定义模块

Nodejs支持自定义的模块,同时使用 require 可以引入自己封装的模块。以及引入和使用通过npm安装的第三方模块。

模块的导出:

  • exports导出,exports 是一个对象,通过它可以将一个变量或参数暴露出去。

如,在一个js文件中:

1
2
js复制代码exports.a=1;
exposts.b="你好,Node!";

在另一个文件中引入,并输出变量:

1
2
3
4
js复制代码/* 引入自定义模块 */
let importData=require("./exportsUse");
console.log(importData.a);
console.log(importData.b);
  • module导出,通过 module.exports 可以导出变量、对象、函数等。
1
2
3
4
5
6
7
8
js复制代码module.exports=class{
constructor(name){
this.name=name;
}
holle(){
console.log("你好:"+this.name);
}
}

引入

1
2
3
js复制代码let mod = require("./exportsUse");
var m=new mod("张三");
m.holle();

require引入

上面已经演示了 require 引入的使用。主要介绍下引入时的注意点。

引入自定义模块时:必须使用 ./ 指定当前路径的模块,即必须指定模块的路径。

否则报错 Cannot find module。

不指定路径 将从 node_modules 路径下查找模块。

require引入时的路径查找

  1. 默认从当前路径的node_modules下查找。
  2. 如果没有则从node安装目录下查找。
  3. 其他路径的模块,必须指定模块的路径,否则无法找到。

http模块

http模块是由 Node 提供的用于创建web服务的系统模块。可以很方便的启用一个处理http请求的web服务器。

http模块使用

  • 创建服务器对象:http.createServer()

如下,httpSerer.js文件中创建一个web服务器,监听8080端口:

1
2
3
4
5
6
7
8
js复制代码let http=require("http");

http.createServer((req,res)=>{
console.log("这是node创建的Web Server");//会在服务器终端打印

res.write("hello,i'm a Web Server!");
res.end();
}).listen(8080);

node运行:

1
2
3
sh复制代码$ node httpServer
这是node创建的Web Server
这是node创建的Web Server

在浏览器中访问本地的8080端口: http://localhost:8080,显示结果如下:

http web server根据请求返回页面

如下,创建一个node server,根据请求的url返回对应的文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
js复制代码let http=require("http");
let fs=require("fs");

http.createServer((req,res)=>{
fs.readFile(`./${req.url}`,(err,data)=>{
if (err) {
res.writeHead(404);
res.end("404 Not Found");
}else{
res.end(data);
}
})

}).listen(8080);
console.log('Web服务已启动,访问 http://localhost:8080 测试');

在命令中启动 httpServer.js :

1
2
sh复制代码$ node httpServer.js
Web服务已启动,访问 http://localhost:8080 测试

当请求到存在文件的路径时,会返回文件中内容:

1.html 是与 httpServer.js 位于同级目录的新建的HTML文件。

其他情况则返回404的提示:

http服务的数据交互

下面对web请求的处理,均基于上面的 httpSerer.js 文件中 createServer 回调。

GET方法

通过url模块,处理get请求中url的查询字符串及路由信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
js复制代码let fs=require("fs");
let url=require("url");

// 输出解析的请求的url
console.log(url.parse(req.url));

/*
Url {
protocol: null,
slashes: null,
auth: null,
host: null,
port: null,
hostname: null,
hash: null,
search: '?username=zhangsan&password=123456',
query: 'username=zhangsan&password=123456',
pathname: '/login',
path: '/login?username=zhangsan&password=123456',
href: '/login?username=zhangsan&password=123456'
}
*/

// 解析url时格式化查询字符串,query格式化为json对象
console.log(url.parse(req.url,true));
/*
Url {
protocol: null,
slashes: null,
auth: null,
host: null,
port: null,
hostname: null,
hash: null,
search: '?username=zhangsan&password=123456',
query: [Object: null prototype] { username: 'zhangsan', password: '123456' },
pathname: '/login',
path: '/login?username=zhangsan&password=123456',
href: '/login?username=zhangsan&password=123456'
}
*/

//获取请求路由和查询字符串对象
let {pathname,query}=url.parse(req.url,true);
console.log(pathname,query);
/* 结果
/login [Object: null prototype] { username: 'zhangsan', password: '123456' }
*/

POST方法

POST方法将请求的数据放在请求体中,因此可以传输大数据(<2G,有的是1G)。

因此在POST实际传输中数据是被一段一段的传输到服务器的,利用这一点,可以在Node Server端接受每一段的数据,最后组合成发送过来的完整数据。

req.on("data",(data)=>{...})方法,可以接受每一段传输的数据;req.on("end",()=>{...})方法,会在数据接收完后执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
js复制代码let http=require("http");
let querystring=require("querystring");

let result=[];
//接收每一段的buffer
req.on("data",buffer=>{
result.push(buffer);
})
// 接收完成,处理数据
req.on("end",()=>{
let data=Buffer.concat(result);
console.log(data);
console.log(data.toString());
console.log(querystring.parse(data.toString()));
})
/* 输出
<Buffer 75 73 65 72 6e 61 6d 65 3d 7a 68 61 6e 67 73 61 6e 26 70 61 73 73 77 6f 72 64 3d 31 32 33 34 35 36>
username=zhangsan&password=123456
*/
/* querystring.parse 输出
[Object: null prototype] { username: 'zhangsan', password: '123456' }
*/

参考

介绍的目录结构和顺序主要参考自前端面试加分福音–node基础(内容有些老,但是介绍的流程和思路结构还是非常棒的)、Buffer对象,以及参考了网上其他的资料,除了文中给出的参考链接,不再一一列出,所有内容都是结合实际示例进行介绍。

本篇示例源码下载

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…544545546…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%