开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

dbcp数据库连接池工作原理--源码系列

发表于 2021-09-07

准备

pom.xml

1
2
3
4
5
6
xml复制代码<!-- dbcp连接池-->
<dependency>
<groupId>commons-dbcp</groupId>
<artifactId>commons-dbcp</artifactId>
<version>1.4</version>
</dependency>

demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码public class DbcpUtil {
private static DataSource dataSource = null;
//静态代码块初始化数据库连接池
static {
//通过配置文件dbcp.properties 加载dbcp相关配置文件
PropertiesUtil.load("dbcp.properties");
try {
//一个数据源 一个应用范围内加载一次
dataSource = BasicDataSourceFactory.createDataSource(PropertiesUtil.getProperties());
} catch (Exception e) {
e.printStackTrace();
}
}

//直接从连接池获取连接
public static Connection getConnection() {
Connection connection = null;
try {
connection = dataSource.getConnection();
} catch (SQLException e) {
e.printStackTrace();
}
return connection;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
ini复制代码public static void main(String[] args) {
//获取连接
Connection connection = DbcpUtil.getConnection();

String sql1 = " select * from t_user tu where name = '张三'";
Statement statement = null;
try {
statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(sql1);
//第四步:获取结果
while (resultSet.next()) {
String name = resultSet.getString("name");
Integer age = resultSet.getInt("age");
System.out.println("name:" + name + " age:" + age); //打印输出结果集
}
} catch (SQLException e) {
e.printStackTrace();
}

}

到这里我们就能简单的跑一个dbcp数据库连接池的demo了

源码级原理

核心类

BasicDataSourceFactory类

BasicDataSource的工厂类,创建BasicDataSource对象,此类能查看所有数据库连接池的配置

BasicDataSource类

1、主要通过public Connection getConnection() throws SQLException方法对外提供数据库连接

2、步骤1中的getConnection()是通过内部持有的DataSource 对象提供的。后面会提到是PoolingDataSource的实例。

3、通过protected synchronized DataSource createDataSource()经过一系列处理 得到PoolingDataSource的实例

PoolingDataSource类

PoolingDataSource类持有连接池ObjectPool对象,并且通过连接池对外提供getConnection()方法。

GenericObjectPool类

上面PoolingDataSource的连接池对象,提供核心方法

1
2
3
4
5
6
7
8
9
10
11
java复制代码//获取连接
Object borrowObject() throws Exception, NoSuchElementException, IllegalStateException;

//返回连接
void returnObject(Object obj) throws Exception;

//校验连接
void invalidateObject(Object obj) throws Exception;

//添加连接
void addObject() throws Exception, IllegalStateException, UnsupportedOperationException;

PoolableConnectionFactory类

上面GenericObjectPool类的工厂类
核心方法

1
2
3
4
5
6
7
8
9
10
java复制代码//为数据库连接池提供一个连接对象
Object makeObject() throws Exception;
//为数据库连接池销毁一个连接对象
void destroyObject(Object obj) throws Exception;
//数据库连接池校验连接是否可用
boolean validateObject(Object obj);
//激活连接能够为连接池所用
void activateObject(Object obj) throws Exception;
//从连接池返回空闲状态
void passivateObject(Object obj) throws Exception;

核心流程

初始化过程

  1. 入口 加载配置 创建BasicDataSource 数据源
1
ini复制代码BasicDataSource dataSource = BasicDataSourceFactory.createDataSource(PropertiesUtil.getProperties());

这里是通过BasicDataSourceFactory工厂创建的,经过这一步的BasicDataSource实例并没有与数据库有交互,只是简单处理了一下配置项。

注意:如果需要查看dbcp支持哪些配置,这个BasicDataSourceFactory能看到每个配置项。

  1. BasicDataSource数据源赋予能力(主要是注入的datasource的能力)

image.png
展开看整个流程 忽略上面的对象不为空直接返回
image.png

2.1、加载驱动并且返回 DriverConnectionFactory 对象,该对象具有物理创建数据库连接Connection的能力
image.png
2.2、创建一个连接池 默认是GenericObjectPool类的实例 set各种properpties

image.png

2.3、创建一个PoolableConnectionFactory 并且把它注入上面连接池,为上面连接池提供make,destory数据库连接的能力

2.4、创建真实的数据源PoolingDataSource 的对象,并且PoolingDataSource持有GenericObjectPool数据库连接池对象。GenericObjectPool对象对外核心的从连接池获取,释放连接的能力。

获取连接过程

经过了上面整个初始化,现在dbcp连接池具有了对外提供数据库连接的能力。

1
2
3
js复制代码BasicDataSource.getConnection()
-->PoolingDataSource.getConnection()
---->GenericObjectPool.borrowObject()

borrowObject流程图

borrowObject.png

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

原来这就是比 ThreadLocal 更快的玩意? 数数 T

发表于 2021-09-07

你好,我是yes。

继上一篇之后我把 ThreadLocal 能问的,都写了之后,咱们再来盘一盘 FastThreadLocal ,这个算是 ThreadLocal 的进阶版,是 Netty 针对 ThreadLocal 自己造的轮子,所以对 ThreadLocal 没有完全理解的话,建议先看上一篇文章,打个基础。

那了解 FastThreadLocal 之后呢,对平日的一些优化可能可以提供一些思路,或者面试就能装个x。

面试官:ThreadLocal 竟然有xxx这个缺点,那怎么优化啊?

你就把 FastThreadLocal 的实现 BB 一遍,这不就稳妥了嘛!

所以,今天我们就来看看 Netty 是如何实现 FastThreadLocal 的,话不多说,本文大纲如下:

  • 数数 ThreadLocal 的缺点。
  • 应该如何针对 ThreadLocal 缺点改进?
  • FastThreadLocal 的原理。
  • FastThreadLocal VS ThreadLocal 的实操。

这篇下来,进阶版 ThreadLocal 基本拿下,下篇我会基于这篇做一个延伸,一个比较底层的延伸,属于绝对装x的那种,等下看文章你就知道了,我会埋坑的,哈哈。

发车发车!

数数 ThreadLocal 的缺点

看完上篇文章的同学,应该都很清楚了 ThreadLocal 的一个缺点:hash 冲突用的是线性探测法,效率低。


可以看到,图上显示的是经过两个遍历找到了空位,假设冲突多了,需要遍历的次数就多了。并且下次 get 的时候,hash 直接命中的位置发现不是要找的 Entry ,于是就接着遍历向后找,所以说这个效率低。

而像 HashMap 是通过链表法来解决冲突,并且为了防止链表过长遍历的开销变大,在一定条件之后又会转变成红黑树来查找,这样的解决方案在频繁冲突的条件下,肯定是优于线性探测法,所以这是一个优化方向。

不过 FastThreadLocal 不是这样优化的,我们下面再说。

还有一个缺点是 ThreadLocal 使用了 WeakReference 以保证资源可以被释放,但是这可能会产生一些 Etnry 的 key 为 null,即无用的 Entry 存在。

所以调用 ThreadLocal 的 get 或 set 方法时,会主动清理无用的 Entry,减轻内存泄漏的发生。


这其实等于把清理的开销弄到了 get 和 set 上,万一 get 的时候清理的无用 Entry 特别多,那这次 get 相对而言就比较慢了。

还有一个就是内存泄漏的问题了,当然这个问题只存在于用线程池使用的时候,并且上面也提到了 get 和 set 的时候也能清理一些无用的 Key,所以没有那么的夸张,只要记得用完后调用 ThreadLocal#remove 就不会有内存泄漏的问题了。

大致就这么几点。

应该如何针对 ThreadLocal 缺点改进

所以怎么改呢?

前面提到 ThreadLocal hash 冲突的线性探测法不好,还有 Entry 的弱引用可能会发生内存泄漏,这些都和 ThreadLocalMap 有关,所以需要搞个新的 map 来替换 ThreadLocalMap。

而这个 ThreadLocalMap 又是 Thread 里面的一个成员变量,这么一看 Thread 也得动一动,但是我们又无法修改 Thread 的代码,所以配套的还得弄个新的 Thread。

所以我们不仅得弄个新的 ThreadLocal、ThreadLocalMap 还得弄个配套的 Thread 来用上新的 ThreadLocalMap 。

所以如果想改进 ThreadLocal ,就需要动这三个类。

对应到 Netty 的实现就是 FastThreadLocal、InternalThreadLocalMap、FastThreadLocalThread


然后发散一下思维,既然 Hash 冲突的想线性探测效果不好,你可能比较容易想到的就是上面提到的链表法,然后再基于链表法说个改成红黑树,这个确实是一方面,但是可以再想想。

比如,让 Hash 不冲突,所以设计一个不会冲突的 hash 算法?不存在的!

所以怎么样才不会产生冲突呢?

各自取号入座

什么意思?就是每往 InternalThreadLocalMap 中塞入一个新的 FastThreadLocal 对象,就给这个对象发个唯一的下标,然后让这个对象记住这个下标,到时候去 InternalThreadLocalMap 找 value 的时候,直接通过下标去取对应的 value 。

这样不就不会冲突了?

这就是 FastThreadLocal 给出的方案,具体下面分析。

还有个内存泄漏的问题,这个其实只要规范的使用即用完后 remove 就好了,其实也没太好的解决方案,不过 FastThreadLocal 曲线救国了一下,这个也且看下面的分析!

FastThreadLocal 的原理

以下 Netty 基于 4.1 版本分析

先来看下 FastThreadLocal 的定义:


可以看到有个叫 variablesToRemoveIndex 的类成员,并且用 final 修饰的,所以等于每个 FastThreadLocal 都有个共同的不可变 int 值,值为多少等下分析。

然后看到这个 index 没,在 FastThreadLocal 构造的时候就被赋值了,且也被 final 修饰,所以也不可变,这个 index 就是我上面说的给每个新 FastThreadLocal 都发个唯一的下标,这样每个 index 就都知道自己的位置了。

上面两个 index 都是通过 InternalThreadLocalMap.nextVariableIndex() 赋值的,盲猜一下,这个肯定是用原子类递增实现的。

我们来看一下实现:


确实,在 InternalThreadLocalMap 也定义了一个静态原子类,每次调用 nextVariableIndex 就返回且递增,没有什么别的赋值操作,从这里也可以得知 variablesToRemoveIndex 的值为 0,因为它属于常量赋值,第一次调用时 nextIndex 的值为 0 。

看到这,不知道大家是否已经感觉到一丝不对劲了。好像有点浪费空间的意思,我们继续往下看。

InternalThreadLocalMap 对标的就是之前的 ThreadLocalMap 也就是 ThreadLocal 缺点集中的类,需要重点看下。

我们再来回顾一下 ThreadLocalMap 的定义。


它是个 Entry 数组,然后 Entry 里面弱引用了 ThreadLocal 作为 Key。

而 InternalThreadLocalMap 有点不太一样:


可以看到, InternalThreadLocalMap 好像放弃了 map 的形式,没用定义 key 和 value,而是一个 Object 数组?

那它是如何通过 Object 来存储 FastThreadLocal 和对应的 value 的呢?我们从 FastThreadLocal#set 开始分析:

因为我们已经熟悉 ThreadLocal 的套路,所以我们知道 InternalThreadLocalMap 肯定是 FastThreadLocalThread 里面的一个变量。

然后我们从对应的 FastThreadLocalThread 里面拿到了 map 之后,就要执行塞入操作即 setKnownNotUnset。

我们先看一下塞入操作里面的 setIndexedVariable 方法:

可以看到,根据传入构造 FastThreadLocal 生成的唯一 index 可以直接从 Object 数组里面找到下标并且进行替换,这样一来压根就不会产生冲突,逻辑很简单,完美。

那如果塞入的 value 不是 UNSET(默认值),则执行 addToVariablesToRemove 方法,这个方法又有什么用呢?


是不是看着有点奇怪?这是啥操作?别急,看我画个图来解释解释:


这就是 Object 数组的核心关系图了,第一个位置放了一个 set ,set 里面存储了所有使用的 FastThreadLocal 对象,然后数组后面的位置都放 value。

那为什么要放一个 set 保存所有使用的 FastThreadLocal 对象?

用于删除,你想想看,假设现在要清空线程里面的所有 FastThreadLocal ,那必然得有一个地方来存放这些 FastThreadLocal 对象,这样才能找到这些家伙,然后干掉。

所以刚好就把数组的第一个位置腾出来放一个 set 来保存这些 FastThreadLocal 对象,如果要删除全部 FastThreadLocal 对象的时候,只需要遍历这个 set ,得到 FastThreadLocal 的 index 找到数组对应的 位置将 value 置空,然后把 FastThreadLocal 从 set 中移除即可。

刚好 FastThreadLocal 里面实现了这个方法,我们来看下:


内容可能有点多了,我们做下小结,理一理上面说的:

首先 InternalThreadLocalMap 没有采用 ThreadLocalMap k-v形式的存储方式,而是用 Object 数组来存储 FastThreadLocal 对象和其 value,具体是在第一个位置存放了一个包含所使用的 FastThreadLocal 对象的 set,然后后面存储所有的 value。

之所以需要个 set 是为了存储所有使用的 FastThreadLocal 对象,这样就能找到这些对象,便于后面的删除工作。

之所以数组其他位置可以直接存储 value ,是因为每个 FastThreadLocal 构造的时候已经被分配了一个唯一的下标,这个下标对应的就是 value 所处的下标。

看到这里,不知道大家是否有感受到空间的浪费?

我举个例子。

假设系统里面一个 new 了 100 个 FastThreadLocal ,那第 100 个 FastThreadLocal 的下标就是 100 ,这个应该没有疑义。

从上面的 set 方法可以得知,只有调用 set 的时候,才会从当前线程中拿出 InternalThreadLocalMap ,然后往这个 map 的数组里面塞入 value,这里我们再回顾一下 set 的方法。


那这里是什么意思呢?

如果我这个线程之前都没塞过 FastThreadLocal ,此时要塞入第一个 FastThreadLocal ,构造出来的数组长度是32,但是这个 FastThreadLocal 的下标已经涨到了 100 了,所以这个线程第一次塞值,也仅仅只有这么一个值,数组就需要扩容。

看到没,这就是我所说的浪费,空间被浪费了。

Netty 相关实现者知道这样会浪费空间,所以数组的扩容是基于 index 而不是原先数组的大小,你看看如果是基于原先数组的扩容,那么第一次扩容 2 倍,32 变成 64,还是塞不下下标 100 的数据,所以还得扩容一次,这就不美了。

所以可以看到扩容传进去的参数是 index 。


可以看到,直接基于 index 的向上 2 次幂取整。然后就是扩容的拷贝,这里是直接进行数组拷贝,不需要进行 rehash,而 ThreadLocalMap 的扩容需要进行rehash,也就是重新基于 key 的 hash 值进行位置的分配,所以这个也是 FastThreadLocal 优于ThreadLocal 的一个点。

对了,上面那个向上 2 次幂取整的操作,不知道你们熟悉不熟悉,这个和 HashMap 的实现是一致的。


咳咳,但是我没有证据,只能说优秀的代码,就是源远流长。

所以从上面的实现可以得知 Netty 就是特意这样设计的,用多余的空间去换取不会冲突的 set 和 get ,这样写入和获取的速度就更快了,这就是典型的空间换时间。

好了,想必此时你已经弄懂了 FastThreadLocal 的核心原理了,我们再来看看 get 方法的实现,我想你应该能脑补这个实现了。


是吧,没啥难度,index 就是 FastThreadLocal 构造时候预先分配好的那个下标,然后直接进行一个数组下标查找,如果没找到就调用 init 方法进行初始化。

我们这里再继续探究一下InternalThreadLocalMap.get(),这里面做了一个兼容。不过我要先介绍一下 FastThreadLocalThread ,就是这玩意替代了 Thread。


可以看到它继承了 Thread ,并且弄了一个成员变量就是我们前面说的 InternalThreadLocalMap。

然后我们再来看一下 get 方法,我截了好几个,不过逻辑很简单。

这里之所以分了 fastGet 和 slowGet 是为了做一个兼容,假设有个不熟悉的人,他用了 FastThreadLocal 但是没有配套使用 FastThreadLocalThread ,然后调用 FastThreadLocal#get 的时候去 Thread 里面找 InternalThreadLocalMap 那不就傻了吗,会报错的。

所以就再弄了个 slowThreadLocalMap ,它是个 ThreadLocal ,里面保存 InternalThreadLocalMap 来兼容一下这个情况。

从这里我们也能得知,FastThreadLocal 最好和 FastThreadLocalThread 配套使用,不然就隔了一层了。

1
2
3
4
5
6
7
typescript复制代码FastThreadLocal<String> threadLocal = new FastThreadLocal<String>();
Thread t = new FastThreadLocalThread(new Runnable() { //记得要 new FastThreadLocalThread
public void run() {
threadLocal.get();
....
}
});

好了,get 和 set 这两个核心操作都分析完了,我们最后再来看一下 remove 操作吧。

在这里插入图片描述
很简单对吧,把数组里的 value 给覆盖了,然后再到 set 里把对应的 FastThreadLocal 对象给删了。

不过看到这里,可能有人会发出疑惑,内存泄漏相关的点呢?

其实吧,可以看到 FastThreadLocal 就没用弱引用,所以它把无用 FastThreadLocal 的清理就寄托到规范使用上,即没用了就主动调用 remove 方法。

但是它曲线救国了一下,我们来看一下 FastThreadLocalRunnable 这个类:

我已经把重点画出来了,可以看到这个 Runnable 执行完毕之后,会主动调用 FastThreadLocal.removeAll() 来清理所有的 FastThreadLocal,这就是我说的曲线救国,怕你完了调用 remove ,没事我帮你封装一下,就是这么贴心。

当然,这个前提是你不能用 Runnable 而是用 FastThreadLocalRunnable。不过这里 Netty 也是做了封装的。

Netty 实现了一个 DefaultThreadFactory 工厂类来创建线程。

你看,你传入 Runnable 是吧,没事,我把它包成 FastThreadLocalRunnable,并且我 new 回去的线程是 FastThreadLocalThread 类型,这样就能在很大程度上避免使用的错误,也减少了使用的难度。

这也是工厂方法这个设计模式的好处之一啦。所以工程上如果怕对方没用对,我们就封装了再给别人使用,这样也屏蔽了一些细节,他好你也好。

所以说多看看开源框架的源码,有很多可以学习的地方!好了,FastThreadLocal 原理大致就说到这里。

FastThreadLocal VS ThreadLocal

到此,我们已经充分了解了两者之间的不同,但是 Fast 到底有多 Fast 呢?

我们用实验说话,Netty 源码里面已经有 benchmark 了,我们直接跑就行了

里面有两个实验:

FastPath 对应的是使用 FastThreadLocalThread 线程对象。

SlowPath 对应的是使用 Thread 线程对象。


两个实验都是分别定义了 ThreadLocal 和 FastThreadLocal :

我们来看一下执行的结果:

FastPath:

SlowPath:

可以看到搭配 FastThreadLocalThread 来使用 FastThreadLocal 吞吐确实比使用 ThreadLocal 大,但是好像也没大太多?

不过,我在网上有看别比人的 benchmark 对比,同样的代码,他的结果是大了三倍。


我反正又跑了几遍,每次都比原生的 ThreadLocal 吞吐好,但是也没好那么多…有点奇怪。

至于 FastThreadLocal 搭配 Thread 则吞吐比 ThreadLocal 都少,说明 FastThreadLocal 的使用必须得搭配 FastThreadLocalThread ,不然就是反向优化了。

代码在 netty 的 microbench 这个项目里,有兴趣的可以自己 down 下来跑一跑看看。

最后

我们再来总结一下:

  • FastThreadLocal 通过分配下标直接定位 value ,不会有 hash 冲突,效率较高。
  • FastThreadLocal 采用空间换时间的方式来提高效率。
  • FastThreadLocal 需要配套 FastThreadLocalThread 使用,不然还不如原生 ThreadLocal。
  • FastThreadLocal 使用最好配套 FastThreadLocalRunnable,这样执行完任务后会主动调用 removeAll 来移除所有 FastThreadLocal ,防止内存泄漏。
  • FastThreadLocal 的使用也是推荐用完之后,主动调用 remove。

这就是 Netty 实现的加强版 ThreadLocal,如果你看过 Netty 源码,你会发现内部是有挺多使用 ThreadLocal 的场景,所以这个优化还是有必要的。

并且 Netty work 线程池默认线程数是两倍 CPU 核心数,所以线程不会太多,那么空间的浪费其实也不会很多,所以这波空间换时间影响不大。

好了,文章就到这了。挖个坑,我在 InternalThreadLocalMap 这个类里面发现了一些奇怪的 long 变量。


懂行的同学看着可能知道,这是为了填充 Cache Line,避免伪共享问题的产生。

ok ,那为什么被标记了@deprecated?并且说将来的版本要被移除?

且听下回分解。

推荐阅读:我把 ThreadLocal 能问的,都写了


我是yes,从一点点到亿点点,我们下篇见!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

SpringBoot整合Spring Data Elasti

发表于 2021-09-07

前言

本文讲述了SpringBoot整合Spring Data Elasticsearch的详细过程,主要使用了ElasticsearchRestTemplate、ElasticsearchRepository等类来实现Index、DocumentCRUD操作的Java Api。

在开始之前,我们首先需要选择Es对应版本的jar包,本文所使用的Es版本为7.9.3,查看spring官网可知,对应的springboot版本为2.4.x。

image.png

整合

创建一个maven项目,就可以操作起来了。

配置

pom.xml

引入以下jar包:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
xml复制代码<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.0</version>
<relativePath/>
</parent>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.71</version>
</dependency>
</dependencies>

application.properties

1
2
ini复制代码spring.data.elasticsearch.repositories.enabled = true
spring.elasticsearch.rest.uris=localhost:9200

ElasticsearchRestTemplate配置

1
2
3
4
5
6
7
8
9
10
11
12
13
scala复制代码@Configuration
public class ElasticsearchRestTemplateConfig extends AbstractElasticsearchConfiguration {
@Value("${spring.elasticsearch.rest.uris}")
private String uris;

@Override
public RestHighLevelClient elasticsearchClient() {
ClientConfiguration configuration = ClientConfiguration.builder()
.connectedTo(uris)
.build();
return RestClients.create(configuration).rest();
}
}

代码

Model

Model类似于数据库实体,不过此处所映射的是Index和Document的字段。注解含义可查看Spring官网。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
typescript复制代码@Data
@Document(indexName = "order", shards = 1, replicas = 1)
public class Order implements Serializable {
@Id
private Integer id;

@Field(type = FieldType.Keyword)
private Long orderNo;

@Field(type = FieldType.Integer)
private Integer orderType;

@Field(type = FieldType.Long)
private Long orderAmount;

@Field(type = FieldType.Text, analyzer = "ik_smart", searchAnalyzer = "ik_max_word")
private String orderDesc;

@Field(type = FieldType.Keyword, analyzer = "ik_smart", searchAnalyzer = "ik_max_word")
private String username;

@Field(type = FieldType.Keyword, analyzer = "ik_smart", searchAnalyzer = "ik_max_word")
private String userPhone;

private Map<String, List<String>> highlights;
}

Repository

ElasticsearchRepository接口封装了Document的CRUD操作,我们直接定义接口继承它即可。

1
2
vbnet复制代码public interface OrderRepository extends ElasticsearchRepository<Order, Integer> {
}

Service

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
scss复制代码public interface OrderService {
void saveAll(List<Order> orders);

Order findById(Integer id);

void deleteById(Integer id);

void updateById(Order order);

PageResponse<Order> findList(Order order, Integer pageIndex, Integer pageSize);

PageResponse<Order> findAll(Integer pageIndex, Integer pageSize);

PageResponse<Order> findHighlight(Order order, Integer pageIndex, Integer pageSize);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
scss复制代码@Service
@Slf4j
public class OrderServiceImpl implements OrderService {
@Autowired
OrderRepository orderRepository;

@Autowired
ElasticsearchRestTemplate elasticsearchRestTemplate;

@Override
public void saveAll(List<Order> orders) {
orderRepository.saveAll(orders);
}

@Override
public void deleteById(Integer id) {
orderRepository.deleteById(id);
}

@Override
public void updateById(Order order) {
orderRepository.save(order);
}

@Override
public PageResponse<Order> findList(Order order, Integer pageIndex, Integer pageSize) {
CriteriaQuery criteriaQuery = new CriteriaQuery(new Criteria()
.and(new Criteria("orderDesc").contains(order.getOrderDesc()))
.and(new Criteria("orderNo").is(order.getOrderNo())))
.setPageable(PageRequest.of(pageIndex, pageSize));

SearchHits<Order> searchHits = elasticsearchRestTemplate.search(criteriaQuery, Order.class);
List<Order> result = searchHits.get().map(SearchHit::getContent).collect(Collectors.toList());
PageResponse<Order> pageResponse = new PageResponse<Order>();
pageResponse.setTotal(searchHits.getTotalHits());
pageResponse.setResult(result);
return pageResponse;
}

@Override
public PageResponse<Order> findAll(Integer pageIndex, Integer pageSize) {
Page<Order> page = orderRepository.findAll(PageRequest.of(pageIndex, pageSize));

PageResponse<Order> pageResponse = new PageResponse<Order>();
pageResponse.setTotal(page.getTotalElements());
pageResponse.setResult(page.getContent());
return pageResponse;
}

@Override
public PageResponse<Order> findHighlight(Order order, Integer pageIndex, Integer pageSize) {
if (order == null) {
PageResponse<Order> pageResponse = new PageResponse<Order>();
pageResponse.setTotal(0L);
pageResponse.setResult(new ArrayList<>());
return pageResponse;
}

CriteriaQuery criteriaQuery = new CriteriaQuery(new Criteria()
.and(new Criteria("orderNo").is(order.getOrderNo()))
.and(new Criteria("orderDesc").contains(order.getOrderDesc())))
.setPageable(PageRequest.of(pageIndex, pageSize));

HighlightBuilder highlightBuilder = new HighlightBuilder();
highlightBuilder.field("orderNo").field("orderDesc");
highlightBuilder.requireFieldMatch(false);
highlightBuilder.preTags("<h3 style="color:blue">");
highlightBuilder.postTags("</h3>");

HighlightQuery highlightQuery = new HighlightQuery(highlightBuilder);
criteriaQuery.setHighlightQuery(highlightQuery);

SearchHits<Order> searchHits = elasticsearchRestTemplate.search(criteriaQuery, Order.class);

List<Order> result = searchHits.get().map(e -> {
Order element = e.getContent();
element.setHighlights(e.getHighlightFields());
return element;
}).collect(Collectors.toList());

PageResponse<Order> pageResponse = new PageResponse<Order>();
pageResponse.setTotal(searchHits.getTotalHits());
pageResponse.setResult(result);
return pageResponse;
}

@Override
public Order findById(Integer id) {
return orderRepository.findById(id).orElse(null);
}

Controller

  • Index操作
    使用ElasticsearchRestTemplate直接就可以创建和删除索引。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
less复制代码@RequestMapping("/index/")
@RestController
public class IndexController {

@Autowired
private ElasticsearchRestTemplate elasticsearchRestTemplate;

/**
* 创建索引
*/
@GetMapping("create")
public String create(@RequestParam String indexName) {
IndexOperations indexOperations = elasticsearchRestTemplate.indexOps(IndexCoordinates.of(indexName));
if (indexOperations.exists()) {
return "索引已存在";
}
indexOperations.create();
return "索引创建成功";
}

/**
* 删除索引
*/
@GetMapping("delete")
public String delete(@RequestParam String indexName) {
IndexOperations indexOperations = elasticsearchRestTemplate.indexOps(IndexCoordinates.of(indexName));
indexOperations.delete();
return "索引删除成功";
}
}
  • Document操作
    主要包括简单查询、增删改、分页查询、高亮搜索。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
less复制代码@RequestMapping("/doc/")
@RestController
public class DocController {

@Autowired
OrderService orderService;

/**
* 批量创建
*/
@PostMapping("saveBatch")
public String saveBatch(@RequestBody List<Order> orders) {
if (CollectionUtils.isEmpty(orders)) {
return "文档不能为空";
}
orderService.saveAll(orders);
return "保存成功";
}

/**
* 根据id删除
*/
@GetMapping("deleteById")
public String deleteById(@RequestParam Integer id) {
orderService.deleteById(id);
return "删除成功";
}

/**
* 根据id更新
*/
@PostMapping("updateById")
public String updateById(@RequestBody Order order) {
orderService.updateById(order);
return "更新成功";
}

/**
* 根据id搜索
*/
@GetMapping("findById")
public String findById(@RequestParam Integer id) {
return JSON.toJSONString(orderService.findById(id));
}

/**
* 分页搜索所有
*/
@GetMapping("findAll")
public String findAll(@RequestParam Integer pageIndex, @RequestParam Integer pageSize) {
return JSON.toJSONString(orderService.findAll(pageIndex, pageSize));
}

/**
* 条件分页搜索
*/
@GetMapping("findList")
public String findList(@RequestBody Order order, @RequestParam Integer pageIndex, @RequestParam Integer pageSize) {
return JSON.toJSONString(orderService.findList(order, pageIndex, pageSize));
}

/**
* 条件高亮分页搜索
*/
@GetMapping("findHighlight")
public String findHighlight(@RequestBody(required = false) Order order, @RequestParam Integer pageIndex, @RequestParam Integer pageSize) {
return JSON.toJSONString(orderService.findHighlight(order, pageIndex, pageSize));
}
}

测试

直接postman访问controller进行测试即可。
示例:

image.png

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

简单的三步教你下载PyCharm汉化插件,让你学习Pytho

发表于 2021-09-07

你还在为了看不懂PyCharm而烦恼嘛?是不是看到全是英文你就怕了,嘿嘿,其实早在很久之前咱们官方就出了PyCharm的汉化插件啦,今天我就手把手教你下载。

第一步:打开PyCharm

打开之后找到File中的Settings…打开

image.png

第二步:点击Plugins

看到右上角有个搜索的标志吗?

image.png

第三步:搜索Chinese

下载重启就好啦

image.png

够不够详细,觉得帮助到了你的同学欢迎给我点个赞呀!喜欢嘛?喜欢的可以给我三连一下啦

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

08篇 要给Nacos的UDP通信功能点个赞

发表于 2021-09-07

学习不用那么功利,二师兄带你从更高维度轻松阅读源码~

Nacos在服务注册功能中使用到了UDP的通信方式,主要功能就是用来辅助服务实例变化时对客户端进行通知。然而,对于大多数使用Nacos的程序员来说,可能还不知道这个功能,更别说灵活运用了。

看完整个源码的实现,还是要为这一功能点个赞的,可以说非常巧妙和实用。但在实现上有一些不足,文末会进行指出。

本篇文章就带大家从源码层面来分析一下Nacos 2.0中是如何基于UDP协议来实现服务实例变更的通知。

UDP通知基本原理

在分析源码之前,先来从整体上看一下Nacos中UDP的实现原理。

Nacos UDP基本原理

我们知道,UDP协议通信是双向的,没有所谓的客户端和服务端,因此在客户端和服务器端都会开启UDP的监听。客户端是单独开启一个线程来处理UDP消息的。当采用HTTP协议与注册中心通信时,,在客户端调用服务订阅接口时,会将客户端的UPD信息(IP和端口)上送到注册中心,注册中心以PushClient对象来进行封装和存储。

当注册中心有实例变化时,会发布一个ServiceChangeEvent事件,注册中心监听到这个事件之后,会遍历存储的PushClient,基于UDP协议对客户端进行通知。客户端接收到UDP通知,即可更新本地缓存的实例列表。

前面我们已经知道,基于HTTP协议进行服务注册时,会有一个实例更新的时间差,因为是通过客户端定时拉取服务器中的实例列表。如果拉取太频繁,注册中心压力比较大,如果拉取的周期比较长,实例的变化又没办法快速感知到。而UDP协议的通知,恰恰弥补了这一缺点,所以说,要为基于UDP通知这个功能点个赞。

下面就来看看源码层面是如何实现的。

客户端UDP通知监听与处理

客户端在实例化NamingHttpClientProxy时,在其构造方法中会初始化PushReceiver。

1
2
3
4
5
6
7
8
9
arduino复制代码public NamingHttpClientProxy(String namespaceId, SecurityProxy securityProxy, ServerListManager serverListManager,
Properties properties, ServiceInfoHolder serviceInfoHolder) {
// ...
// 构建BeatReactor
this.beatReactor = new BeatReactor(this, properties);
// 构建UDP端口监听
this.pushReceiver = new PushReceiver(serviceInfoHolder);
// ...
}

PushReceiver的构造方法,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
arduino复制代码public PushReceiver(ServiceInfoHolder serviceInfoHolder) {
try {
// 持有ServiceInfoHolder引用
this.serviceInfoHolder = serviceInfoHolder;
// 获取UDP端口
String udpPort = getPushReceiverUdpPort();
// 根据端口情况,构建DatagramSocket,如果未设置端口,则采用随机端口
if (StringUtils.isEmpty(udpPort)) {
this.udpSocket = new DatagramSocket();
} else {
this.udpSocket = new DatagramSocket(new InetSocketAddress(Integer.parseInt(udpPort)));
}
// 创建只有一个线程的ScheduledExecutorService
this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("com.alibaba.nacos.naming.push.receiver");
return thread;
}
});

// 执行线程,PushReceiver实现了Runnable接口
this.executorService.execute(this);
} catch (Exception e) {
NAMING_LOGGER.error("[NA] init udp socket failed", e);
}
}

PushReceiver的构造方法做了以下操作:

  • 第一、持有ServiceInfoHolder对象引用;
  • 第二、获取UDP端口;
  • 第三、实例化DatagramSocket对象,用于发送和接收Socket数据;
  • 第四,创建线程池,并执行PushReceiver(实现了Runnable接口);

既然PushReceiver实现了Runnable接口,run方法肯定是需要重新实现的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
swift复制代码@Override
public void run() {
while (!closed) {
try {

// byte[] is initialized with 0 full filled by default
byte[] buffer = new byte[UDP_MSS];
// 创建DatagramPacket用于存储接收到的报文
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
// 接收报文,在未接收到报文时会进行线程阻塞
udpSocket.receive(packet);
// 将报文转换为json格式
String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();
NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
// 将json格式的报文转换为PushPacket对象
PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);
String ack;
// 如果符合条件,则调用ServiceInfoHolder进行接收报文处理,并返回应答报文
if (PUSH_PACKAGE_TYPE_DOM.equals(pushPacket.type) || PUSH_PACKAGE_TYPE_SERVICE.equals(pushPacket.type)) {
serviceInfoHolder.processServiceInfo(pushPacket.data);

// send ack to server
ack = "{\"type\": \"push-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":"
+ "\"\"}";
} else if (PUSH_PACKAGE_TYPE_DUMP.equals(pushPacket.type)) {
// dump data to server
ack = "{\"type\": \"dump-ack\"" + ", \"lastRefTime\": \"" + pushPacket.lastRefTime + "\", \"data\":"
+ "\"" + StringUtils.escapeJavaScript(JacksonUtils.toJson(serviceInfoHolder.getServiceInfoMap()))
+ "\"}";
} else {
// do nothing send ack only
ack = "{\"type\": \"unknown-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
+ "\", \"data\":" + "\"\"}";
}
// 发送应答报文
udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length,
packet.getSocketAddress()));
} catch (Exception e) {
if (closed) {
return;
}
NAMING_LOGGER.error("[NA] error while receiving push data", e);
}
}
}

PushReceiver#run方法主要处理了以下操作:

  • 第一、构建DatagramPacket用于接收报文数据;
  • 第二、通过DatagramSocket#receive方法阻塞等待报文的到来;
  • 第三、DatagramSocket#receive接收到报文之后,方法继续执行;
  • 第四、解析JSON格式的报文为PushPacket对象;
  • 第五、判断报文类型,调用ServiceInfoHolder#processServiceInfo处理接收到的报文信息,在该方法中会将PushPacket转化为ServiceInfo对象;
  • 第六、封装ACK信息(即应答报文信息);
  • 第七、通过DatagramSocket发送应答报文;

上面我们看到了Nacos客户端是如何基于UDP进行报文的监听和处理的,但并未找到客户端是如何将UDP信息上送给注册中心的。下面我们就来梳理一下,上送UDP信息的逻辑。

客户端上送UDP信息

在NamingHttpClientProxy中存储了UDP_PORT_PARAM,即UDP的端口参数信息。

UDP端口信息通过实例查询类接口进行传递,比如:查询实例列表、查询单个健康实例、查询所有实例、订阅接口、订阅的更新任务UpdateTask等接口。在这些方法中都调用了NamingClientProxy#queryInstancesOfService方法。

NamingHttpClientProxy中的queryInstancesOfService方法实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
arduino复制代码@Override
public ServiceInfo queryInstancesOfService(String serviceName, String groupName, String clusters, int udpPort,
boolean healthyOnly) throws NacosException {
final Map<String, String> params = new HashMap<String, String>(8);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, NamingUtils.getGroupedName(serviceName, groupName));
params.put(CLUSTERS_PARAM, clusters);
// 获取UDP端口
params.put(UDP_PORT_PARAM, String.valueOf(udpPort));
params.put(CLIENT_IP_PARAM, NetUtils.localIP());
params.put(HEALTHY_ONLY_PARAM, String.valueOf(healthyOnly));
String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET);
if (StringUtils.isNotEmpty(result)) {
return JacksonUtils.toObj(result, ServiceInfo.class);
}
return new ServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), clusters);
}

但查看源码会发现,查询实例列表、查询单个健康实例、查询所有实例、订阅的更新任务UpdateTask中,UDP端口传递的参数值均为0。只有HTTP协议的订阅接口取值为PushReceiver中的UDP端口号。

1
2
3
4
arduino复制代码@Override
public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
return queryInstancesOfService(serviceName, groupName, clusters, pushReceiver.getUdpPort(), false);
}

在上面的代码中我们已经知道PushReceiver中有一个getPushReceiverUdpPort的方法:

1
2
3
typescript复制代码public static String getPushReceiverUdpPort() {
return System.getenv(PropertyKeyConst.PUSH_RECEIVER_UDP_PORT);
}

很明显,UDP的端口是通过环境变量设置的,对应的key为“push.receiver.udp.port”。

而在1.4.2版本中,HostReactor中的NamingProxy成员变量的queryList方法也会传递UDP端口:

1
2
3
4
5
6
7
8
9
10
11
java复制代码public void updateService(String serviceName, String clusters) throws NacosException {
ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
try {
String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);
if (StringUtils.isNotEmpty(result)) {
processServiceJson(result);
}
} finally {
// ...
}
}

关于1.4.2版本中的实现,大家自行看源码即可,这里不再展开。

完成了客户端UDP基本信息的传递,再来看看服务器端是如何接收和存储这些信息的。

UDP服务存储

服务器端在获取实例列表的接口中,对UDP端口进行了处理。

1
2
3
4
5
6
7
8
9
10
11
12
less复制代码@GetMapping("/list")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
public Object list(HttpServletRequest request) throws Exception {
// ...
// 如果没有获得UDP端口信息,则默认端口为0
int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
// ...
// 客户端的IP、UDP端口封装到Subscriber对象中
Subscriber subscriber = new Subscriber(clientIP + ":" + udpPort, agent, app, clientIP, namespaceId, serviceName,
udpPort, clusters);
return getInstanceOperator().listInstance(namespaceId, serviceName, subscriber, clusters, healthyOnly);
}

在getInstanceOperator()方法中会获得当前采用的哪个协议,然后选择对应的处理类:

1
2
3
4
5
6
7
csharp复制代码/**
* 判断并返回采用V1版本或V2版本的操作服务
* @return V1:Jraft协议(服务器端);V2:gRpc协议(客户端)
*/
private InstanceOperator getInstanceOperator() {
return upgradeJudgement.isUseGrpcFeatures() ? instanceServiceV2 : instanceServiceV1;
}

这里具体的实现类为InstanceOperatorServiceImpl:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
scss复制代码@Override
public ServiceInfo listInstance(String namespaceId, String serviceName, Subscriber subscriber, String cluster,
boolean healthOnly) throws Exception {
ClientInfo clientInfo = new ClientInfo(subscriber.getAgent());
String clientIP = subscriber.getIp();
ServiceInfo result = new ServiceInfo(serviceName, cluster);
Service service = serviceManager.getService(namespaceId, serviceName);
long cacheMillis = switchDomain.getDefaultCacheMillis();
// now try to enable the push
try {
// 处理支持UDP协议的客户端信息
if (subscriber.getPort() > 0 && pushService.canEnablePush(subscriber.getAgent())) {
subscriberServiceV1.addClient(namespaceId, serviceName, cluster, subscriber.getAgent(),
new InetSocketAddress(clientIP, subscriber.getPort()), pushDataSource, StringUtils.EMPTY,
StringUtils.EMPTY);
cacheMillis = switchDomain.getPushCacheMillis(serviceName);
}
} catch (Exception e) {
// ...
}
// ...
}

当UDP端口大于0,且agent参数定义的客户端支持UDP,则将对应的客户端信息封装到InetSocketAddress对象中,然后放入NamingSubscriberServiceV1Impl中(该类已经被废弃,看后续如何调整该方法实现)。

在NamingSubscriberServiceV1Impl中,会将对应的参数封装为PushClient,存放在Map当中。

1
2
3
4
5
6
7
typescript复制代码public void addClient(String namespaceId, String serviceName, String clusters, String agent,
InetSocketAddress socketAddr, DataSource dataSource, String tenant, String app) {

PushClient client = new PushClient(namespaceId, serviceName, clusters, agent, socketAddr, dataSource, tenant,
app);
addClient(client);
}

addClient方法会将PushClient信息存放到ConcurrentMap<String, ConcurrentMap<String, PushClient>>当中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
ini复制代码private final ConcurrentMap<String, ConcurrentMap<String, PushClient>> clientMap = new ConcurrentHashMap<>();

public void addClient(PushClient client) {
// client is stored by key 'serviceName' because notify event is driven by serviceName change
String serviceKey = UtilsAndCommons.assembleFullServiceName(client.getNamespaceId(), client.getServiceName());
ConcurrentMap<String, PushClient> clients = clientMap.get(serviceKey);
if (clients == null) {
clientMap.putIfAbsent(serviceKey, new ConcurrentHashMap<>(1024));
clients = clientMap.get(serviceKey);
}

PushClient oldClient = clients.get(client.toString());
if (oldClient != null) {
oldClient.refresh();
} else {
PushClient res = clients.putIfAbsent(client.toString(), client);
// ...
}
}

此时,UDP的IP、端口信息已经封装到PushClient当中,并存储在NamingSubscriberServiceV1Impl的成员变量当中。

注册中心的UDP通知

当服务端发现某个实例发生了变化,比如主动注销了,会发布一个ServiceChangeEvent事件,UdpPushService会监听到该事件,并进行业务处理。

在UdpPushService的onApplicationEvent方法中,会根据PushClient的具体情况进行移除或发送UDP通知。onApplicationEvent中核心逻辑代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
ini复制代码ConcurrentMap<String, PushClient> clients = subscriberServiceV1.getClientMap()
.get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
if (MapUtils.isEmpty(clients)) {
return;
}

Map<String, Object> cache = new HashMap<>(16);
long lastRefTime = System.nanoTime();
for (PushClient client : clients.values()) {
// 移除僵尸客户端
if (client.zombie()) {
Loggers.PUSH.debug("client is zombie: " + client);
clients.remove(client.toString());
Loggers.PUSH.debug("client is zombie: " + client);
continue;
}

AckEntry ackEntry;
String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
byte[] compressData = null;
Map<String, Object> data = null;
if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
compressData = (byte[]) (pair.getValue0());
data = (Map<String, Object>) pair.getValue1();
}

// 封装AckEntry对象
if (compressData != null) {
ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
} else {
ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
if (ackEntry != null) {
cache.put(key, new org.javatuples.Pair<>(ackEntry.getOrigin().getData(), ackEntry.getData()));
}
}
// 通过UDP通知其他客户端
udpPush(ackEntry);
}

事件处理的核心逻辑是就是先判断PushClient的状态信息,如果已经是僵尸客户端,则移除。然后将发送UDP的报文信息和接收客户端的信息封装为AckEntry对象,然后调用udpPush方法,进行UDP消息的发送。

注册中心的UDP接收

在看客户端源码的时候,我们看到客户端不仅会接收UDP请求,而且还会进行应答。那么注册中心怎么接收应答呢?也在UdpPushService类中,该类内部的静态代码块初始化一个UDP的DatagramSocket,用来接收消息:

1
2
3
4
5
6
7
8
9
10
11
12
ini复制代码static {
try {
udpSocket = new DatagramSocket();
Receiver receiver = new Receiver();
Thread inThread = new Thread(receiver);
inThread.setDaemon(true);
inThread.setName("com.alibaba.nacos.naming.push.receiver");
inThread.start();
} catch (SocketException e) {
Loggers.SRV_LOG.error("[NACOS-PUSH] failed to init push service");
}
}

Receiver是一个内部类,实现了Runnable接口,在其run方法中主要就是接收报文信息,然后进行报文消息的判断,根据判断结果,操作本地Map中数据。

UDP设计不足

文章最开始就写到,UDP的设计非常棒,即弥补了HTTP定时拉取的不足,又不至于太影响性能。但目前Nacos在UDP方面有一些不足,也可能是个人的吹毛求疵吧。

第一,文档中没有明确说明UDP的功能如何使用,这导致很多使用者在使用时并不知道UDP功能的存在,以及使用的限制条件。

第二,对云服务不友好。客户端的UDP端口可以自定义,但服务器端的UDP端口是随机获取到。在云服务中,即便是内网服务,UDP端口也是被防火墙限制的。如果服务端的UDP端口是随机获取(客户端默认也是),那么UDP的通信将直接被防火墙拦截掉,而用户根本看不到任何异常(UDP协议不关注客户端是否收到消息)。

至于这两点,说起来算是瑕不掩瑜,读完源码或读过我这篇文章的朋友大概已经知道怎么用了。后续可以给官方提一个Issue,看看是否可以改进。

小结

本文重点从三个方面讲解的Nacos基于UDP的服务实例变更通知:

第一,客户端监听UDP端口,当接收注册中心发来的服务实例变化,可以及时的更新本地的实例缓存;

第二,客户端通过订阅接口,将自身的UDP信息发送给注册中心,注册中心进行存储;

第三,注册中心中实例发生了变化,通过事件机制,将变更信息通过UDP协议发送给客户端。

经过本篇文章,想必你不仅了解了Nacos中UDP协议的通知机制。同时,也开拓了一个新的思路,即如何使用UDP,在什么场景下使用UDP,以及在云服务中使用UDP可能会存在的问题。如果这篇文章对你有帮助,关注或点赞都可以。

博主简介:《SpringBoot技术内幕》技术图书作者,酷爱钻研技术,写技术干货文章。

公众号:「程序新视界」,博主的公众号,欢迎关注~

技术交流:请联系博主微信号:zhuan2quan

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Zookeeper 源码入门

发表于 2021-09-06

首先分享之前的所有文章 , 欢迎点赞收藏转发三连下次一定 >>>> 😜😜😜

文章合集 : 🎁 juejin.cn/post/694164…

Github : 👉 github.com/black-ant

CASE 备份 : 👉 gitee.com/antblack/ca…

一. 前言

出于对集群选举流程的好奇 , 所以把 Zookeeper 源码拉下来跑了一下 , 这篇文档对这个过程做一个简单的记录.

要想看懂任何源码 , 第一步就是要跑起来 . 这一篇主要介绍 , 如果快速的跑源码 ,同时简单介绍其中的关键点 , 便于处理

二 . 源码的运行

2.1 主启动流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码// Step 1 : Git 拉取
https://github.com/apache/zookeeper.git

// Step 2 : 本地运行 (Intellij)
1. 找到 zookeeper-server 子模块
2. 找到 对应的启动类
3. 通过命令启动项目

// PS : 这里查看了 Zookeeper 运行包 (ZkServer.cmd)的内容 , 决定先采用相同的方式启动 >>>>
setlocal
call "%~dp0zkEnv.cmd"

set ZOOMAIN=org.apache.zookeeper.server.quorum.QuorumPeerMain
echo on
call %JAVA% "-Dzookeeper.log.dir=%ZOO_LOG_DIR%" "-Dzookeeper.root.logger=%ZOO_LOG4J_PROP%" -cp "%CLASSPATH%" %ZOOMAIN% "%ZOOCFG%" %*

endlocal

2.2 其他启动类

进入源码可以看到很多其他的启动类 , 这里查阅了一下 API Doc , 大概了解了一下

image.png

2.3 源码的主要逻辑

源码的主要入口类为 QuorumPeerMain , 而其他的类会由 QuorumPeerMain 发起调用 (例如 : ) , 这里提供一个简单的调用流程图 >>>

结构图.jpg

二 . 启动流程

来简单看一下 QuorumPeerMain 启动时做了什么 :

3.1 QuorumPeerMain 简介

当使用该类的main()方法启动程序时,第一个参数被用作配置文件的路径 , 配置文件中可以包含如下信息 :

  • dataDir : ZooKeeper数据所在目录。
  • dataLogDir : ZooKeeper事务日志存放目录。
  • clientPort : 用于与客户端通信的端口。
  • tickTime : 一个滴答的持续时间,单位为毫秒。这是ZooKeeper中的基本时间单位。
  • initLimit : 跟踪者等待与leader初始同步的最大节拍数。
  • syncLimit : 跟踪者等待来自leader的消息(包括心跳)的最大节拍数。
  • server.id : 这是具有给定id的服务器将用于仲裁协议的主机:port[:port]。
1
2
3
4
5
6
java复制代码# 以下是我的配置
tickTime=2000
initLimit=10
syncLimit=5
dataDir=D:\\java\\workspace\\git\\zookeeper\\temp
clientPort=2181

3.2 IDEA 配置

VM Options

1
java复制代码"-Dzookeeper.root.logger=INFO,CONSOLE" -cp "D:\java\workspace\git\zookeeper\zookeeper\zookeeper-server\target\classes;D:\java\workspace\git\zookeeper\zookeeper\zookeeper-server\target\lib\*;D:\java\workspace\git\zookeeper\zookeeper\bin\..\*;D:\java\workspace\git\zookeeper\zookeeper\bin\..\lib\*;D:\java\workspace\git\zookeeper\zookeeper\bin\..\conf"

Program arguments

1
java复制代码D:\java\workspace\git\zookeeper\zoo.cfg

image.png

其中比较重要的就是 log 级别和你的源码路径 , 配置完成后一般项目就能正常跑起来了

三 . 请求与接收

请求和接收跑通了才是一切的基础 , Zk 的核心对象分别为 ClientCnxn 和 ServerCnxn

System-ServerCnxn.png

从 log 中不难发现 , 默认是走的 NIO

1
2
3
4
5
6
java复制代码[main:NIOServerCnxnFactory@89] - binding to port 0.0.0.0/0.0.0.0:2181
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@222] - Accepted socket connection from /127.0.0.1:53152
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@903] - Connection request from old client /127.0.0.1:53152; will be dropped if server is in r-o mode
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@942] - Client attempting to renew session 0x100001d6f1f0009 at /127.0.0.1:53152
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@687] - Invalid session 0x100001d6f1f0009 for client /127.0.0.1:53152, probably expired
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1056] - Closed socket connection for client /127.0.0.1:53152 which had sessionid 0x100001d6f1f0009

3.1 客户端发起请求

以 SetData 为例 , 主要经过如下流程 :

  • C- ZooKeeper # 相关逻辑
  • C- ClientCnxn # submitRequest : 构建 Packet , 同时加入 Queue 中
  • C- ClientCnxnSocketNIO : 发起处理请求
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
java复制代码public Stat setData(final String path, byte[] data, int version) throws KeeperException, InterruptedException {

final String clientPath = path;
PathUtils.validatePath(clientPath);

// serverPath -> /testWatch
final String serverPath = prependChroot(clientPath);

RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.setData);

// 构建 request
SetDataRequest request = new SetDataRequest();
request.setPath(serverPath);
request.setData(data);
request.setVersion(version);

SetDataResponse response = new SetDataResponse();

// 通过 ClientCnxn 发起请求
ReplyHeader r = cnxn.submitRequest(h, request, response, null);
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath);
}
return response.getStat();
}

底层 还是用的 NIO 调用 , 后面详细看看

3.2 服务端处理请求

Zookeeper 的核心管理类为 ZooKeeperServer , 其中包括以下方法 :

  • processConnectRequest : 处理连接请求

核心一 : processConnectRequest

创建连接是一切的起点 , 主要通过以下流程调用到该类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
java复制代码C- ZooKeeperServer : 仅保留核心代码
public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));

// 参数准备
ConnectRequest connReq = new ConnectRequest();
connReq.deserialize(bia, "connect");
boolean readOnly = false;

readOnly = bia.readBool("readOnly");
cnxn.isOldClient = false;

// 超时时间 : 4000
int sessionTimeout = connReq.getTimeOut();
// 连接密码 , 不存在则为一个空数组
byte passwd[] = connReq.getPasswd();

// 最小最大会话超时时间
int minSessionTimeout = getMinSessionTimeout();
int maxSessionTimeout = getMaxSessionTimeout();

// 设置超时时间
cnxn.setSessionTimeout(sessionTimeout);
cnxn.disableRecv();

// 如果session 存在
long sessionId = connReq.getSessionId();
if (sessionId != 0) {

long clientSessionId = connReq.getSessionId();
// 先关闭再重新打开会话
serverCnxnFactory.closeSession(sessionId);
cnxn.setSessionId(sessionId);
reopenSession(cnxn, sessionId, passwd, sessionTimeout);
} else {
// 创建一个会话
createSession(cnxn, passwd, sessionTimeout);
}
}

核心二 : Request 请求的处理
这里有一个稍微有点绕的多线程处理 , 后面再详细介绍一下 ,先看下主要的调用流程

  • C- PrepRequestProcessor # pRequest : 由配置类发起的first 请求处理器
  • C- SyncRequestProcessor # processRequest : 将请求加入 Queue 中(核心)
  • C- SyncRequestProcessor # run : 其中会一直循环处理 Request
  • C- FinalRequestProcessor # processRequest : 发起 Process 调用

PS : 其中循环的处理很不错 , 值得深入学习一下

四 . Zookeeper 存储的数据结构

另外一大重点就是了解一下数据是以什么样的结构保存到Zookeeper 中的 , Zookeeper 中存在以下几个核心的数据存储对象 :

  • ZKDatabase : 数据中心
  • DataTree : 数据数 , 核心数据对象

4.1 数据的获取

以数据的获取为例 , 经历了以下流程 :

  • C- SyncRequestProcessor # run : 注意 , 这个是一个不断从 queue 中获取数据的过程
  • C- FinalRequestProcessor # processRequest
  • C- ZKDatabase # getNode
  • C- DataTree # getNode
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
java复制代码public class DataTree {
// 核心存储对象
private final ConcurrentHashMap<String, DataNode> nodes = new ConcurrentHashMap<String, DataNode>();

// watches 对象集合
private final WatchManager dataWatches = new WatchManager();
private final WatchManager childWatches = new WatchManager();

/** the root of zookeeper tree */
private static final String rootZookeeper = "/";

/** the zookeeper nodes that acts as the management and status node **/
private static final String procZookeeper = Quotas.procZookeeper;

/** this will be the string thats stored as a child of root */
private static final String procChildZookeeper = procZookeeper.substring(1);

/**
* the zookeeper quota node that acts as the quota management node for
* zookeeper
*/
private static final String quotaZookeeper = Quotas.quotaZookeeper;

/** this will be the string thats stored as a child of /zookeeper */
private static final String quotaChildZookeeper = quotaZookeeper
.substring(procZookeeper.length() + 1);

/**
* the path trie that keeps track fo the quota nodes in this datatree
*/
private final PathTrie pTrie = new PathTrie();

}

总结

内容不多 , 但是比较重要 , 这几个环节弄清楚后 , 后面围着整个环节抽丝剥茧就行了

后续文章已经整理得差不多了 , 稍微修改一下后续发出来 , 文章写的比较早 , 版本比较旧 ,但是核心是差不多得

附录 :

Zookeeper 项目结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
java复制代码//─zookeeper-assembly                
//─zookeeper-client
│ └─zookeeper-client-c
//─zookeeper-compatibility-tests
│ └─zookeeper-compatibility-tests-curator
//─zookeeper-contrib
│ ├─zookeeper-contrib-fatjar
│ ├─zookeeper-contrib-huebrowser
│ ├─zookeeper-contrib-loggraph
│ ├─zookeeper-contrib-monitoring
│ ├─zookeeper-contrib-rest
│ ├─zookeeper-contrib-zkfuse
│ ├─zookeeper-contrib-zkperl
│ ├─zookeeper-contrib-zkpython
│ ├─zookeeper-contrib-zktreeutil
│ └─zookeeper-contrib-zooinspector
//─zookeeper-docs
//─zookeeper-it
//─zookeeper-jute
//─zookeeper-metrics-providers
│ └─zookeeper-prometheus-metrics
//─zookeeper-recipes
│ ├─zookeeper-recipes-election
│ ├─zookeeper-recipes-lock
│ └─zookeeper-recipes-queue
//─zookeeper-server

参考文档

www.cnblogs.com/jing99/p/12…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

从 ClickHouse 到自研 ByteHouse:实时数

发表于 2021-09-06

近日,字节跳动旗下数字服务与智能科技品牌火山引擎正式对外发布「ByteHouse」,作为 ClickHouse 企业版,解决开源技术上手难 & 试错成本高的痛点,同时提供商业产品和企业级技术支持服务。

作为国内规模最大的 ClickHouse 用户,目前字节跳动内部的 ClickHouse 节点总数超过 1 万 5 千个,管理总数据量超过 600PB,最大的集群规模在 2400 余个节点。综合来说,字节跳动广泛的业务增长分析很多都建立在 ClickHouse 为基础的查询引擎上。在打造 ClickHouse 企业版「ByteHouse」的路程中,我们经过了多年的探索与沉淀,今天和大家分享字节跳动过去使用 ClickHouse 的两个典型应用与优化案例。

推荐系统实时指标

在字节跳动内部“AB实验”应用非常广泛,特别是在验证推荐算法和功能优化的效果方面。最初,公司内部专门的 AB 实验平台已经提供了 T+1 的离线实验指标,而推荐系统需要更快地观察算法模型、或者某个功能的上线效果,因此需要一份能够实时反馈的数据作为补充:

  • 能同时查询聚合指标和明细数据;
  • 能支持多达几百列的维度和指标,且场景灵活变化,会不断增加;
  • 可以高效地按ID过滤数据;
  • 需要支持一些机器学习和统计相关的指标计算(比如 AUC)。

技术选型

字节内部有很多分析引擎,ClickHouse、 Druid、 Elastic Search、 Kylin等,通过分析用户需求后选择了ClickHouse:

  • 能更快地观察算法模型,没有预计算所导致的高数据时延;
  • ClickHouse 既适合聚合查询,配合跳数索引后,对于明细点查性能也不错;
  • 字节自研的ClickHouse 支持 Map 类型,支持动态变更的维度和指标,更加符合需求;
  • BitSet 的过滤 Bloom Filter 是比较好的解决方案,ClickHouse 原生就有 BF 的支持;
  • 字节自研的 ClickHouse 引擎已经通过UDF 实现了相关的能力,而且有比较好的扩展性。

每个产品都有自己合适的场景,但是对于当前场景的需求评估下,ClickHouse 更加合适

方案评估

方案对比

确认技术选型后,在如何实现部分,也有两种方式:

优点 缺点
方案1:Kafka + Flink + ClickHouse 各个组件职责划分清楚、潜在扩展性强 需要额外资源、写入频次不好控制、难以处理节点故障、维护成本较高
方案2:ClickHouse 内置的 Kafka Engine 内置 Kafka 消费线程直接消费 Topic,架构简单少了一次数据传输 处理逻辑在引擎侧,略微增加了引擎维护负担。 缺少了Flink 的 ETL 的能力

最终 方案 & 效果

由于外部写入并不可控和技术栈上的原因,我们最终采用了 Kafka Engine 的方案,也就是 ClickHouse 内置消费者去消费 Kafka。整体的架构如图:

  • 数据由推荐系统直接产生,写入 Kafka——为了弥补缺少 Flink 的 ETL 能力,推荐系统做了相应配合,修改 Kafka Topic 的消息格式直接适配 ClickHouse 表的 schema;
  • 敏捷 BI 平台也适配了一下实时的场景,可以支持交互式的查询分析;
  • 如果实时数据有问题,也可以从 Hive 把数据导入至 ClickHouse 中,除此之外,业务方还会将 1% 抽样的离线数据导入过来做一些简单验证,1% 抽样的数据一般会保存更久的时间。

除了技术选型和实现方案,我们在支持推荐系统的实时数据时遇到过不少问题,其中最大的问题随着推荐系统产生的数据量越来越大,单个节点的消费能力也要求越来越大,主要碰到如下问题:

问题一:写入吞吐量不足

挑战: 在有大量辅助跳数索引的场景下,索引的构建严重影响写入吞吐量。

解决方案:异步构建索引。

社区版本的实现里的具体逻辑如下:

  • 解析输入数据生成内存中数据结构的 Block;
  • 然后切分 Block,并按照表的 schema 构建 columns 数据文件;
  • 最后扫描根据 skip index schema 去构建 skip index 文件。三个步骤完成之后才会算 Part 文件构建完毕。

在需要保证构建完 columns 数据之后用户即可正常查询的前提下,ByteHouse 同步完成前面两步,第三步把构建好的 part 放入到一个异步索引构建队列中,由后台线程构建索引文件。

效果: 在改成异步后,整体的写入吞吐量大概能提升 20%。

问题二: Kafka 消费能力不足

挑战:社区版本的 Kafka 表,内部默认只会有一个消费者,这样会比较浪费资源并且性能达不到性能要求

尝试优化过程

  • 尝试通过增大消费者的个数来增大消费能力,但社区的实现是由一个线程去管理多个的消费者,多个消费者消费到的数据最后仅能由一个输出线程完成数据构建,所以这里没能完全利用上多线程和磁盘的潜力;
  • 尝试通过创建多张 Kafka Table 和 Materialized View 写入同一张表,但是对于运维会比较麻烦。

解决方案:支持多线程消费。

前面提到的优化手段都不尽如人意,最后决定改造 Kafka Engine 在其内部支持多个消费线程,简单来说就是每一个线程它持有一个消费者,然后每一个消费者负责各自的数据解析、数据写入,这样的话就相当于一张表内部同时执行多个的 INSERT Query。

效果:通过多线程实现多消费者同时消费写入表,写入性能达到接近于线性的提升。

问题三:出现故障无法保证数据完整性

挑战:在主备模式下,如果数据同时两个节点都写入,一旦一个节点出现故障,新启的节点恢复过程中容易出现各种问题,包括性能下降,无法保证分片,最严重可能导致查询结果不正确

解决方案: 确保主备模式下只会写入一个主备其中一个节点。

为了避免两个节点消费这个数据,改进版的 Kafka Engine 参考了 ReplicatedMergeTree 基于 ZooKeeper 的选主逻辑。对于每一对副本的一对消费者,会尝试在 ZooKeeper 上完成选主逻辑,确保选举成为主节点的消费者才能消费,另一个节点则会处于一个待机状态。

有了这样的单节点消费机制, 系统会检测 ReplicatedMergeTree 表数据是否完整,如果数据不完整则代表不能正常服务,此时消费者会主动出让 Leader,让副本节点上成为消费者,也就是新写入的数据并不会写入到缺少数据的节点,对于查询而言,由于查询路由机制的原因也不会把 Query 路由到缺少数据的节点上,所以一直能查询到最新的数据。

效果:改进 Kafka engine 确保主备模式下只有一个节点能消费数据,即使出现节点故障在新节点恢复过程中同样保障了解决了数据完整性的问题。

广告投放实时数据

第二个典型案例关于广告的投放数据,一般是运营人员需要查看广告投放的实时效果。由于业务的特点,当天产生的数据往往会涉及到多天的数据。这套系统原来基于 Druid 实现的,Druid 在这个场景会有一些难点:

Druid ClickHouse
难点1:实时数据由于涉及到较多的时间分区(历史 3 个月) 产生很多 segment,历史数据重刷后还需要额外时间合并,无法保证数据实时消费 单批次生成的 parts 太多导致写入性能有一定下降, 通过调大 block_size 可以提升吞吐量。同时引入 Buffer Engine
难点2:业务数据的维度也非常多 维度太多,数据粒度很细,Druid 的预聚合意义不大 直接消费明细数据,适合数据粒度很细的场景

选择了 ClickHouse 之后能解决 Druid 不足的地方,但还是有部分问题需要解决

问题一:Buffer Engine 无法和 ReplicatedMergeTree 一起使用

问题 & 挑战: 社区提供了 Buffer Engine 为了解决单次写入生成过多 parts 的问题, 但是不太能配合 ReplicatedMergeTree 一起工作, 写入不同 Replica 的 Buffer 仅缓存了各自节点上新写入的数据, 导致查询会出现不一致的情况。

解决方案:

改进了 Buffer Engine 做了如下的调整和优化:

  • 我们选择将 Kafka/Buffer/MergeTree 三张表结合起来,提供的接口更加易用;
  • 把 Buffer 内置到 Kafka engine 内部, 作为 Kafka engine 的选项可以开启/关闭,使用更方便;
  • Buffer table 内部类似 pipeline 模式处理多个 Block;
  • 支持了 ReplicatedMergeTree 情况下的查询。

首先确保一对副本仅有一个节点在消费,所以一对副本的两个 Buffer 表,只有一个节点有数据。 如果查询发送到了没有消费的副本,会额外构建一个特殊的查询逻辑,从另一个副本的 Buffer 表里读取数据

效果:增强 Buffer Engine,解决了Buffer Engine 和 ReplicatedMergeTree 同时使用下查询一致性的问题。

问题二:出现宕机后可能会出现数据丢失后者重复消费的情况

挑战:ClickHouse 缺少事务支持。 一批次写入只写入部分 part 后出现宕机,因为没有事务保障重启后可能出现丢失或者重复消费的情况。

解决方案:

参考了 Druid 的 KIS 方案自己管理 Kafka Offset,实现单批次消费/写入的原子语义:实现上选择将 Offset 和 Parts 数据绑定在一起,增强了消费的稳定性。 每次消费时,会默认创建一个事务,由事务负责把 Part 数据和 Offset 一同写入磁盘中,如果出现失败,事务会一起回滚 Offset 和写入的 part 然后重新消费。

效果: 确保了每次插入数据的原子性,增强了数据消费的稳定性。

小结

实时数据分析是 ClickHouse 的优势场景,结合字节跳动实时数据场景的特点,我们对 ClickHouse 进行了优化和改造,并将这些能力沉淀到了 ByteHouse 上。ByteHouse 基于自研技术优势和超大规模的使用经验,为企业大数据团队带来新的选择和支持,以应对复杂多变的业务需求,高速增长的数据场景。未来,ByteHouse将不断以字节和外部最佳实践输出行业用户,帮助企业更好地构建交互式大数据分析平台,并更广泛的与 ClickHouse 研发者社群共享经验,共同推动 ClickHouse 社区的发展。

联系我们(中文产品官网):bytehouse.cn/product/ent…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

快速搭建一个网关服务,动态路由、鉴权看完就会(含流程图) 前

发表于 2021-09-06

最近发现网易号有盗掘金文章的,xdm有空可以关注一下这个问题,希望帮助到大家同时能够保障自己权益。

前言

本文记录一下我是如何使用Gateway搭建网关服务及实现动态路由的,帮助大家学习如何快速搭建一个网关服务,了解路由相关配置,鉴权的流程及业务处理,有兴趣的一定看到最后,非常适合没接触过网关服务的同学当作入门教程。

搭建服务

框架

  • SpringBoot 2.1
1
2
3
4
5
xml复制代码<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.0.RELEASE</version>
</parent>
  • Spring-cloud-gateway-core
1
2
3
4
xml复制代码<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-gateway-core</artifactId>
</dependency>
  • common-lang3
1
2
3
4
xml复制代码<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>

路由配置

网关作为请求统一入口,路由就相当于是每个业务系统的入口,通过路由规则则可以匹配到对应微服务的入口,将请求命中到对应的业务系统中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
yaml复制代码server:
port: 8080

spring:
cloud:
gateway:
enabled: true
routes:
- id: demo-server
uri: http://localhost:8081
predicates:
- Path=/demo-server/**
filters:
- StripPrefix= 1

routes

配置项 描述
id 路由唯一id,使用服务名称即可
uri 路由服务的访问地址
predicates 路由断言
filters 过滤规则

解读配置

  • 现在有一个服务demo-server部署在本机,地址和端口为127.0.0.1:8081,所以路由配置uri为http://localhost:8081
  • 使用网关服务路由到此服务,predicates -Path=/demo-server/**,网关服务的端口为8080,启动网关服务,访问localhost:8080/demo-server,路由断言就会将请求路由到demo-server
  • 直接访问demo-server的接口localhost:8081/api/test,通过网关的访问地址则为localhost:8080/demo-server/api/test,predicates配置将请求断言到此路由,filters-StripPrefix=1代表将地址中/后的第一个截取,所以demo-server就截取掉了

使用gateway通过配置文件即可完成路由的配置,非常方便,我们只要充分的了解配置项的含义及规则就可以了;但是这些配置如果要修改则需要重启服务,重启网关服务会导致整个系统不可用,这一点是无法接受的,下面介绍如何通过Nacos实现动态路由

动态路由

使用nacos结合gateway-server实现动态路由,我们需要先部署一个nacos服务,可以使用docker部署或下载源码在本地启动,具体操作可以参考官方文档即可

Nacos配置

image.png

groupId: 使用网关服务名称即可

dataId: routes

配置格式: json

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
bash复制代码[{
"id": "xxx-server",
"order": 1, #优先级
"predicates": [{ #路由断言
"args": {
"pattern": "/xxx-server/**"
},
"name": "Path"
}],
"filters":[{ #过滤规则
"args": {
"parts": 0 #k8s服务内部访问容器为http://xxx-server/xxx-server的话,配置0即可
},
"name": "StripPrefix" #截取的开始索引
}],
"uri": "http://localhost:8080/xxx-server" #目标地址
}]

json格式配置项与yaml中对应,需要了解配置在json中的写法

比对一下json配置与yaml配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
json复制代码{
"id":"demo-server",
"predicates":[
{
"args":{
"pattern":"/demo-server/**"
},
"name":"Path"
}
],
"filters":[
{
"args":{
"parts":1
},
"name":"StripPrefix"
}
],
"uri":"http://localhost:8081"
}
1
2
3
4
5
6
7
8
9
10
11
yaml复制代码spring:
cloud:
gateway:
enabled: true
routes:
- id: demo-server
uri: http://localhost:8081
predicates:
- Path=/demo-server/**
filters:
- StripPrefix= 1

代码实现

Nacos实现动态路由的方式核心就是通过Nacos配置监听,配置发生改变后执行网关相关api创建路由

image.png

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
typescript复制代码@Component
public class NacosDynamicRouteService implements ApplicationEventPublisherAware {

private static final Logger LOGGER = LoggerFactory.getLogger(NacosDynamicRouteService.class);

@Autowired
private RouteDefinitionWriter routeDefinitionWriter;

private ApplicationEventPublisher applicationEventPublisher;

/** 路由id */
private static List<String> routeIds = Lists.newArrayList();

/**
* 监听nacos路由配置,动态改变路由
* @param configInfo
*/
@NacosConfigListener(dataId = "routes", groupId = "gateway-server")
public void routeConfigListener(String configInfo) {
clearRoute();
try {
List<RouteDefinition> gatewayRouteDefinitions = JSON.parseArray(configInfo, RouteDefinition.class);
for (RouteDefinition routeDefinition : gatewayRouteDefinitions) {
addRoute(routeDefinition);
}
publish();
LOGGER.info("Dynamic Routing Publish Success");
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}

}


/**
* 清空路由
*/
private void clearRoute() {
for (String id : routeIds) {
routeDefinitionWriter.delete(Mono.just(id)).subscribe();
}
routeIds.clear();
}

@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
}

/**
* 添加路由
*
* @param definition
*/
private void addRoute(RouteDefinition definition) {
try {
routeDefinitionWriter.save(Mono.just(definition)).subscribe();
routeIds.add(definition.getId());
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
}

/**
* 发布路由、使路由生效
*/
private void publish() {
this.applicationEventPublisher.publishEvent(new RefreshRoutesEvent(this.routeDefinitionWriter));
}
}

过滤器

gateway提供GlobalFilter及Ordered两个接口用来定义过滤器,我们自定义过滤器只需要实现这个两个接口即可

  • GlobalFilter filter() 实现过滤器业务
  • Ordered getOrder() 定义过滤器执行顺序

通常一个网关服务的过滤主要包含 鉴权(是否登录、是否黑名单、是否免登录接口…) 限流(ip限流等等)功能,我们今天简单介绍鉴权过滤器的流程实现

鉴权过滤器

需要实现鉴权过滤器,我们先得了解登录及鉴权流程,如下图所示

image.png
由图可知,我们鉴权过滤核心就是验证token是否有效,所以我们网关服务需要与业务系统在同一个redis库,先给网关添加redis依赖及配置

1
2
3
4
xml复制代码<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>
1
2
3
4
5
6
yaml复制代码spring:
redis:
host: redis-server
port: 6379
password:
database: 0

代码实现

  • 1.定义过滤器AuthFilter
  • 2.获取请求对象 从请求头或参数或cookie中获取token(支持多种方式传token对于客户端更加友好,比如部分web下载请求会新建一个页面,在请求头中传token处理起来比较麻烦)
  • 3.没有token,返回401
  • 4.有token,查询redis是否有效
  • 5.无效则返回401,有效则完成验证放行
  • 6.重置token过期时间、添加内部请求头信息方便业务系统权限处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
java复制代码@Component
public class AuthFilter implements GlobalFilter, Ordered {

@Autowired
private RedisTemplate<String, String> redisTemplate;

private static final String TOKEN_HEADER_KEY = "auth_token";

@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
// 1.获取请求对象
ServerHttpRequest request = exchange.getRequest();
// 2.获取token
String token = getToken(request);
ServerHttpResponse response = exchange.getResponse();
if (StringUtils.isBlank(token)) {
// 3.token为空 返回401
response.setStatusCode(HttpStatus.UNAUTHORIZED);
return response.setComplete();
}
// 4.验证token是否有效
String userId = getUserIdByToken(token);
if (StringUtils.isBlank(userId)) {
// 5.token无效 返回401
response.setStatusCode(HttpStatus.UNAUTHORIZED);
return response.setComplete();
}
// token有效,后续业务处理
// 从写请求头,方便业务系统从请求头获取用户id进行权限相关处理
ServerHttpRequest.Builder builder = exchange.getRequest().mutate();
request = builder.header("user_id", userId).build();
// 延长缓存过期时间-token缓存用户如果一直在操作就会一直重置过期
// 这样避免用户操作过程中突然过期影响业务操作及体验,只有用户操作间隔时间大于缓存过期时间才会过期
resetTokenExpirationTime(token, userId);
// 完成验证
return chain.filter(exchange);
}


@Override
public int getOrder() {
// 优先级 越小越优先
return 0;
}

/**
* 从redis中获取用户id
* 在登录操作时候 登陆成功会生成一个token, redis得key为auth_token:token 值为用户id
*
* @param token
* @return
*/
private String getUserIdByToken(String token) {
String redisKey = String.join(":", "auth_token", token);
return redisTemplate.opsForValue().get(redisKey);
}

/**
* 重置token过期时间
*
* @param token
* @param userId
*/
private void resetTokenExpirationTime(String token, String userId) {
String redisKey = String.join(":", "auth_token", token);
redisTemplate.opsForValue().set(redisKey, userId, 2, TimeUnit.HOURS);
}


/**
* 获取token
*
* @param request
* @return
*/
private static String getToken(ServerHttpRequest request) {
HttpHeaders headers = request.getHeaders();
// 从请求头获取token
String token = headers.getFirst(TOKEN_HEADER_KEY);
if (StringUtils.isBlank(token)) {
// 请求头无token则从url获取token
token = request.getQueryParams().getFirst(TOKEN_HEADER_KEY);
}
if (StringUtils.isBlank(token)) {
// 请求头和url都没有token则从cookies获取
HttpCookie cookie = request.getCookies().getFirst(TOKEN_HEADER_KEY);
if (cookie != null) {
token = cookie.getValue();
}
}
return token;
}
}

总结

Gateway通过配置项可以实现路由功能,整合Nacos及配置监听可以实现动态路由,实现GlobalFilter, Ordered两个接口可以快速实现一个过滤器,文中也详细的介绍了登录后的请求鉴权流程,如果有不清楚地方可以评论区见咯。

其它文章

没事花几分钟学习一下mongo

点击了解低代码平台

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

ELK专题:Day1——最小化搭建ELK集群,采集Nginx

发表于 2021-09-06

1. 前言

ELK作为一套广泛使用的日志检索平台已经发展了好些年,并且是业界的主流解决方案之一。但是因为ELK集群里面涉及到多个不同的服务,对新手来说上手比较困难,所以我写了这个教程,希望能通过这个教程给新手完成一个从0到1的突破,用最简的配置和最小化的部署方案先搭建出一套能用的环境。同时在这个过程中也可以让新手逐渐对ELK技术栈建立起一个初步的认识,在迷雾中找到一个方向。

2. 实验环境简介

2.1 架构图

diragram.png

2.2 集群版本信息

  • 操作系统版本:Ubuntu 20.04.2 LTS (GNU/Linux 5.4.0-80-generic x86_64)
  • Elasticsearch版本:7.13.4
  • Logstash版本:7.14
  • Kibana版本:7.14
  • Filebeat版本:7.14

2.3 方案概述

  1. Nginx服务器产生的访问日志会通过Filebeat传送到Logstash。
  2. 在Logstash的配置文件中定义对采集到的日志的操作,最后归档到ES。
  3. 用户通过浏览器进入Kibana,读取ES里面的数据。
  4. 所有服务器网络互通,位于同一个子网。

2.4 实验目标

  1. Nginx、Filebeat、Logstash、ES、Kibana各个服务都正常运行。
  2. 各个服务都能按照最基本的需求完成既定任务。
  3. 在Kibana可以正常访问和检索日志。

3. 环境搭建步骤

3.1 Nginx 部署

本测试环境使用本Hexo网站的测试环境,Nginx服务的搭建比较简单,暂不作详细描述。

3.2 ElasticSearch 服务搭建

根据各个服务之间的依赖关系,我们首先安装不需要依赖其他服务的ES。这也是很多时候在搭建集群时候的习惯做法,先从数据库开始。

在本文中,我们使用Ubuntu自带的apt包管理工具安装ES。

3.2.1 安装Elasticsearch

官方文档:www.elastic.co/guide/en/el…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
bash复制代码# 方法1,使用APT在线安装

# 下载安装Elasticsearch仓库的PGP Key
wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -

sudo apt-get install apt-transport-https

# 把elasticsearch的仓库地址信息加入到apt工具的配置文件中
echo "deb https://artifacts.elastic.co/packages/7.x/apt stable main" | sudo tee /etc/apt/sources.list.d/elastic-7.x.list

# 安装elasticsearch,在本文中,安装的是7.13.4版本
sudo apt-get update && sudo apt-get install elasticsearch


# 方法2,使用deb包安装

wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.13.4-amd64.deb
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.13.4-amd64.deb.sha512

# 校验文件完整性
shasum -a 512 -c elasticsearch-7.13.4-amd64.deb.sha512

#安装
sudo dpkg -i elasticsearch-7.13.4-amd64.deb
2.2.2 配置Elasticsearch

使用安装包安装完成后,默认的配置文件存放在/etc/elasticsearch,在本例中,按照最简化部署,我们只调整/etc/elasticsearch/elasticsearch.yml,配置范例如下:

1
2
3
4
5
6
7
yaml复制代码cluster.name: rc-application
node.name: node-rc-1
path.data: /data/elasticsearch
path.logs: /var/log/elasticsearch
network.host: 192.168.0.212
http.port: 9200
cluster.initial_master_nodes: ["node-rc-1"]
3.2.3 启动Elasticsearch

在本例的Ubuntu发行版中,安装完成之后会自动创建systemd启动项目,相关操作如下:

1
2
3
4
5
6
7
8
bash复制代码# 设置开机启动
sudo /bin/systemctl daemon-reload
sudo /bin/systemctl enable elasticsearch.service

# start/stop/restart 操作和其他的系统服务一样操作
sudo systemctl start elasticsearch.service
sudo systemctl stop elasticsearch.service
sudo systemctl restart elasticsearch.service
2.2.4 验证Elasticsearch是否正常
  • 查看服务状态
1
2
3
4
5
6
7
8
9
10
11
12
13
14
bash复制代码root@ES:~# systemctl status elasticsearch.service 
● elasticsearch.service - Elasticsearch
Loaded: loaded (/lib/systemd/system/elasticsearch.service; disabled; vendor preset: enabled)
Active: active (running) since Tue 2021-08-03 08:07:57 UTC; 25min ago
Docs: https://www.elastic.co
Main PID: 10100 (java)
Tasks: 61 (limit: 9448)
Memory: 4.2G
CGroup: /system.slice/elasticsearch.service
├─10100 /usr/share/elasticsearch/jdk/bin/java -Xshare:auto -Des.networkaddress.cache.ttl=60 -Des.ne>
└─10303 /usr/share/elasticsearch/modules/x-pack-ml/platform/linux-x86_64/bin/controller

Aug 03 08:07:25 ES systemd[1]: Starting Elasticsearch...
Aug 03 08:07:57 ES systemd[1]: Started Elasticsearch.
  • 查看端口占用,ES默认使用9200端口
1
2
bash复制代码root@ES:~# netstat -tunpl | grep 9200
tcp6 0 0 192.168.0.212:9200 :::* LISTEN 10100/java
  • 请求ES服务,直接通过curl命令请求ES服务的根目录,可以查看ES状态
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
bash复制代码root@ES:~# curl http://192.168.0.212:9200
{
"name" : "node-rc-1",
"cluster_name" : "rc-application",
"cluster_uuid" : "y_q0vv9vQ3K7ukSdl0fO7g",
"version" : {
"number" : "7.13.4",
"build_flavor" : "default",
"build_type" : "deb",
"build_hash" : "c5f60e894ca0c61cdbae4f5a686d9f08bcefc942",
"build_date" : "2021-07-14T18:33:36.673943207Z",
"build_snapshot" : false,
"lucene_version" : "8.8.2",
"minimum_wire_compatibility_version" : "6.8.0",
"minimum_index_compatibility_version" : "6.0.0-beta1"
},
"tagline" : "You Know, for Search"
}

3.3 安装Kibana

和Elasticsearch类似,我们继续使用官方提供的软件包进行安装。

3.3.1 安装过程

官方文档:www.elastic.co/guide/en/ki…

1
2
3
4
bash复制代码wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -
sudo apt-get install apt-transport-https
echo "deb https://artifacts.elastic.co/packages/7.x/apt stable main" | sudo tee -a /etc/apt/sources.list.d/elastic-7.x.list
sudo apt-get update && sudo apt-get install kibana
3.3.2 Kibana配置文件/etc/kibana/kibana.yml

配置示例如下:

1
2
3
4
5
6
yaml复制代码server.port: 5601
server.host: "192.168.0.213"
server.name: "my-kibana"
elasticsearch.hosts: "http://192.168.0.212:9200" # 上文中的elasticsearch地址
logging.dest: /var/log/kibana/kibana.log
logging.verbose: false
3.3.3 启动kibana
1
2
3
4
5
6
7
8
9
10
11
12
13
14
bash复制代码root@Kibana:/etc/kibana# systemctl start kibana.service 
root@Kibana:/etc/kibana# systemctl status kibana.service
● kibana.service - Kibana
Loaded: loaded (/etc/systemd/system/kibana.service; disabled; vendor preset: enabled)
Active: active (running) since Tue 2021-08-03 08:49:27 UTC; 3s ago
Docs: https://www.elastic.co
Main PID: 9978 (node)
Tasks: 18 (limit: 9448)
Memory: 147.8M
CGroup: /system.slice/kibana.service
├─9978 /usr/share/kibana/bin/../node/bin/node /usr/share/kibana/bin/../src/cli/dist --logging.dest=>
└─9996 /usr/share/kibana/node/bin/node --preserve-symlinks-main --preserve-symlinks /usr/share/kiba>

Aug 03 08:49:27 Kibana systemd[1]: Started Kibana.
3.3.4 验证kibana

使用浏览器,进入kibana地址http://192.168.0.213:5601,如果页面可正常打开,则服务正常。

kibana.png

3.4 Logstash服务搭建

在ELK集群的调试中,我认为Logstash是最难的一个了,我们一步一步来。

3.4.1 安装Logstash

根据官方文档指引,使用apt安装

官方文档:www.elastic.co/guide/en/lo…

1
2
3
4
bash复制代码wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -
sudo apt-get install apt-transport-https
echo "deb https://artifacts.elastic.co/packages/7.x/apt stable main" | sudo tee -a /etc/apt/sources.list.d/elastic-7.x.list
sudo apt-get update && sudo apt-get install logstash
3.4.2 配置Logstash

官方文档:www.elastic.co/guide/en/lo…

3.4.2.1 Logstash服务配置 /etc/logstash/logstash.yml

以下是配置范例:

1
2
3
4
5
6
7
8
9
10
yaml复制代码path.data: /data/logstash	# 需要提前确认路径存在,并且检查权限,允许logstash用户读写
pipeline.workers: 2
pipeline.batch.size: 125
path.config: /etc/logstash/conf.d/*.conf # 这是logstash的默认配置,在这里显式配置
path.settings: /etc/logstash # 这是logstash的默认配置,在这里显式配置
config.test_and_exit: false
http.host: 192.168.0.211
http.port: 9600-9700
log.level: debug
path.logs: /var/log/logstash
3.4.2.2 Logstash管道配置 /etc/logstash/conf.d/test.conf

我们先使用简单的配置,测试Logstash能否正常把内容推送到ES。

1
2
3
4
5
6
json复制代码input { stdin { } }

output {
elasticsearch { hosts => ["192.168.0.212:9200"] }
stdout { codec => rubydebug }
}
3.4.3 启动并测试Logstash
  1. 用命令行方式启动logstash
1
bash复制代码/usr/share/logstash/bin/logstash  -f /etc/logstash/conf.d/test.conf
  1. 通过stdin输入,观察logstash。如输入:“this is a test”,会有如下输出:
1
2
3
4
5
6
json复制代码{
"@timestamp" => 2021-08-03T10:33:10.584Z,
"@version" => "1",
"message" => "this is a test",
"host" => "Logstash"
}
  1. 进入ES查看内容,可以发现ES已把测试信息入库
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
bash复制代码# 查看es索引
curl -s "192.168.0.212:9200/_cat/indices?v"
health status index uuid pri rep docs.count docs.deleted store.size pri.store.size
yellow open logstash-2021.08.03-000001 YIqdbC81Tq2zIulxfFUI4w 1 1 3 0 13.4kb 13.4kb
green open .apm-custom-link NnAWZ3THRWiHqJf2_B830Q 1 0 0 0 208b 208b
green open .apm-agent-configuration YOjjGNX9SaacWHnJshD6Sw 1 0 0 0 208b 208b
green open .kibana-event-log-7.13.4-000001 _UADZI-GSBmRc9inJd1eJg 1 0 1 0 5.6kb 5.6kb
green open .kibana_7.13.4_001 LXjP8PPnRrSDeO8-1APE7A 1 0 18 0 2.1mb 2.1mb
green open .kibana_task_manager_7.13.4_001 yotlqk02SH6P6xISCZZw-g 1 0 10 2062 270.3kb 270.3kb

# 发现有一个新索引logstash-2021.08.03-000001,查看新索引的内容
curl -XGET '192.168.0.212:9200/logstash-2021.08.03-000001/_doc/_search/?pretty'
{
"took" : 5,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 3,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "logstash-2021.08.03-000001",
"_type" : "_doc",
"_id" : "5_yTC3sB-S8uwXpkCUFc",
"_score" : 1.0,
"_source" : {
"@timestamp" : "2021-08-03T10:33:10.584Z",
"@version" : "1",
"message" : "this is a test",
"host" : "Logstash"
}
}
}
]
}
}
  1. 确认Logstash可以正常运行之后,我们可以ctrl+c退出Logstash进程,后面会和Filebeat进行联调。

3.5 为Nginx服务器安装Filebeat

经过上面的步骤,我们已经完成了ELK(Elasticsearch - Logstash - Kibana)的部署,但在常见的技术实践中,因为Logstash需要依赖Java环境才能运行,往往不会直接安装到业务机上。在主流的解决方案中,会在业务机上安装filebeat进行日志传输。

3.5.1 安装Filebeat

官方文档:www.elastic.co/guide/en/be…

1
2
3
4
5
6
7
8
9
bash复制代码curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.13.4-amd64.deb
sudo dpkg -i filebeat-7.13.4-amd64.deb

# 安装完成后,会自动创建systemctl项目
root@hexo:/tmp# systemctl status filebeat
● filebeat.service - Filebeat sends log files to Logstash or directly to Elasticsearch.
Loaded: loaded (/lib/systemd/system/filebeat.service; disabled; vendor preset: enabled)
Active: inactive (dead)
Docs: https://www.elastic.co/beats/filebeat
3.5.2 配置Filebeat

以采集nginx access.log为例, ,我们使用的配置范例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
yaml复制代码# /etc/filebeat/filebeat.yml

# 配置filebeat自身的运行日志输出
logging.level: info
logging.to_files: true
logging.files:
path: /var/log/filebeat
name: filebeat
keepfiles: 7

# filebeat对日志的采集和输出配置
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/nginx/hexo_access.log

setup.template.settings:
index.number_of_shards: 1

output.logstash:
# 这里输入的是logstash的地址和端口,后面我们会对logstash进行联调
hosts: ["192.168.0.211:5400"]
3.5.3 Filebeat启动

使用命令systemctl start filebeat.service即可启动filebeat,但目前filebeat与logstash之间的联调还没完成,所以filebeat会有报错,我们会在后面解决这个问题。

1
vbnet复制代码2021-08-03T22:12:45.795+0800    ERROR   [publisher_pipeline_output]     pipeline/output.go:154  Failed to connect to backoff(async(tcp://192.168.0.211:5400)): dial tcp 192.168.0.211:5400: connect: connection refused

4. 联调

经过了上面的步骤,我们已经完成了ELK + Filebeat + Nginx 典型业务集群的搭建。下面,我们需要继续进行具体的联调,让ELK发挥作用,实现日志检索和统计的功能。

4.1 filebeat与logstash的联调

从架构图可以看到,Filebeat会把日志传送到Logstash,Logstash再把日志传到ES,我们重点调整一下Logstash。

4.1.1 logstash配置修改
  1. 先把前面用于测试的配置/etc/logstash/conf.d/test.conf删除。
  2. 创建新的测试配置/etc/logstash/conf.d/nginx-es.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
json复制代码input {
beats {
host => "0.0.0.0"
port => 5400 # 对应在filebeat的配置中,output到logstash的5400端口
}
}

output {
elasticsearch {
hosts => ["192.168.0.212:9200"]
index => "rc_index_pattern-%{+YYYY.MM.dd}"
}
}
  1. 使用命令systemctl start logstash.service启动logstash服务。
4.1.2 验证现象
  1. 使用浏览器访问Nginx服务,使access_log产生内容。
  2. filebeat日志/var/log/filebeat/filebeat产生内容如下:
1
2
3
4
5
6
7
8
9
10
yaml复制代码2021-08-06T16:56:02.801+0800    INFO    [registrar]     registrar/registrar.go:109      States Loaded from registrar: 1
2021-08-06T16:56:02.801+0800 INFO [crawler] beater/crawler.go:71 Loading Inputs: 1
2021-08-06T16:56:02.802+0800 INFO log/input.go:157 Configured paths: [/var/log/nginx/hexo_access.log]
2021-08-06T16:56:02.802+0800 INFO [crawler] beater/crawler.go:141 Starting input (ID: 16452737213288841630)
2021-08-06T16:56:02.802+0800 INFO [crawler] beater/crawler.go:108 Loading and starting Inputs completed. Enabled inputs: 1
2021-08-06T16:56:02.802+0800 INFO log/harvester.go:302 Harvester started for file: /var/log/nginx/hexo_access.log
2021-08-06T16:56:03.803+0800 INFO [publisher_pipeline_output] pipeline/output.go:143 Connecting to backoff(async(tcp://192.168.0.211:5400))
2021-08-06T16:56:03.803+0800 INFO [publisher] pipeline/retry.go:219 retryer: send unwait signal to consumer
2021-08-06T16:56:03.803+0800 INFO [publisher] pipeline/retry.go:223 done
2021-08-06T16:56:03.803+0800 INFO [publisher_pipeline_output] pipeline/output.go:151 Connection to backoff(async(tcp://192.168.0.211:5400)) established

/var/log/nginx/hexo_access.log的日志已经成功推送到logstash。
3. 检查ES内容,发现ES已自动创建索引rc_index_pattern-2021.08.06并且已经有Nginx日志内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
sql复制代码curl -XGET '192.168.0.212:9200/rc_index_pattern-2021.08.06/_doc/_search/?pretty'
rondo@ES:~$ curl -XGET '192.168.0.212:9200/rc_index_pattern-2021.08.06/_doc/_search/?pretty'
{
"took" : 728,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 76,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "rc_index_pattern-2021.08.06",
"_type" : "_doc",
"_id" : "0_ynGnsB-S8uwXpkwkJF",
"_score" : 1.0,
"_source" : {
"host" : {
"name" : "hexo"
},
"@timestamp" : "2021-08-06T08:50:04.192Z",
"input" : {
"type" : "log"
},
"tags" : [
"beats_input_codec_plain_applied"
],
"message" : "202.105.107.186 - - [06/Aug/2021:16:49:57 +0800] \"GET /css/main.css HTTP/1.1\" 200 56117 \"http://192.168.0.125:80/\" \"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36\"",
"@version" : "1",
"log" : {
"offset" : 197,
"file" : {
"path" : "/var/log/nginx/hexo_access.log"
}
},
"agent" : {
"id" : "d2f43da1-5024-4000-9251-0bcc8fc10697",
"type" : "filebeat",
"version" : "7.13.4",
"hostname" : "hexo",
"ephemeral_id" : "77089197-d22d-497f-a064-76fbb2e369b3",
"name" : "hexo"
},
"ecs" : {
"version" : "1.8.0"
}
}
},
......

4.2 Kibana设置

  1. 点击页面左上角的≡图标,在弹出菜单中点击底部的Stack Management

kibana-1.PNG
2. 在Index Patterns菜单中点击Create index pattern

kibana-2.PNG
3. 页面会显示已有的index,在输入框中输入需要的pattern,匹配你想要的index,点击Next step

kibana-3.PNG
4. 设置index使用的时间戳,创建index pattern。

kibana-4.PNG

kibana-5.PNG
5. 完成index pattern创建后,点击左上角菜单栏,进入Discover

kibana-6.PNG
6. 在页面左上角选择适当的时间范围,就可以查看索引内容了。

kibana-7.PNG
7. 点击每条消息左侧的>按钮,可以查看消息详情,里面会包含Nginx日志的详情。

kibana-8.PNG

5. 总结

经过前面一系列的操作,我们得到了一个ELK集群,并模拟了实际生产的场景,展示了从业务日志(nginx)产生到入库ES的过程,形成了一个日志平台的闭环。但这个只是雏形,后面我们会对这个集群不断进行优化,过程中会涉及到各个服务的配置优化和横向扩容等场景,我们拭目以待。

原文网址:ELK专题:Day1——最小化搭建ELK集群,采集Nginx日志

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Spring的ApplicationEventPublish

发表于 2021-09-06

今天闲来无事翻看国内对微服务封装的还不错的pig源码,看到了封装了一个工具类里边用到了Spring的ApplicationEventPublisher也就是今天这篇文章的主人公,源于对这个类不知道是干嘛的,在百度了一下后顺便记录一下,可能后期会用的上

  • ApplicationEventPublisher是publishEvent实现异步快速
  • 使用ApplicationEventPublisher的publishEvent来发布事件

这个一开始感觉和mq有点类似的,mq就是发送消息,在接收消息,而这个是推送事件,在监听事件.这里我进行模拟一个插入用户的时候将这个用户的信息作为事件推送最后进行打印

废话不多说,直接上代码

  1. 创建用户实体类UserDTO
1
2
3
4
5
6
7
8
9
10
11
12
java复制代码/**
* @author 阿鹏
* @date 2021/9/3 13:55
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class UserDTO {
private String name;
private String sex;
}
  1. 创建一个service,这里模拟一下,并不是service,只要能注入到spring容器中,能让我在测试类中运行就行哈哈
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码/**
* @author 阿鹏
* @date 2021/9/3 13:56
* 假设这个类是service类 这里需要模拟一个保存用户的方法 然后发送事件并去监听
*/
@Component
@RequiredArgsConstructor
public class ApplicationEventSender {

private final ApplicationEventPublisher publisher;

public void saveUser(UserDTO dto) {
// 推送事件
publisher.publishEvent(dto);
}
}
  1. 使用监听的类监听事件,这里使用的注解是@EventListener,里边的属性自己去研究一下吧,这里我使用的是condition属性,就是满足就执行不满足不执行,里边的表达式是spring的spel表达式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码/**
* @author 阿鹏
* @date 2021/9/3 14:17
*/
@Component
public class EventListenerDemo {

@EventListener(condition = "#user.name!=null")
public void watch(UserDTO user) {
// 如果user的name是null的话,下边都不会执行
System.out.println(user.getName());
System.out.println(user.getSex());
}
}
  1. 进行测试
1
2
3
4
5
6
7
8
9
10
java复制代码    @Autowired
private ApplicationEventSender sender;

@Test
public void testSpringWatch() {
sender.saveUser(UserDTO.builder()
.name("阿鹏从小就淘")
.sex("男")
.build());
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…537538539…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%