开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

高频面试题-请把Java的双亲委派机制说清楚!

发表于 2021-06-16

这是我参与更文挑战的第5天,活动详情查看: 更文挑战

如果面试官问你,类加载过程是哪几步?

巴拉巴拉巴拉…(加载、验证、准备、解析、初始化)

见这小伙子面容惊奇,脸泛红光,不由自主的就问了一下双亲委派模型说一下吧;遇见没准备充分的,瞬间懵逼。

下面我们就来说一下这个一个有意思的虚拟机类加载机制。

一说起双亲委派,就必然要先聊一下Java中的类加载器。

Java中的类加载器

Bootstrap ClassLoader (启动类加载器)

Bootstrap ClassLoader,启动类加载,默认加载的是jdk\lib目录下jar中诸多类;

这个路径可以使用 -Xbootclasspath参数指定。

Extension ClassLoader (扩展类加载器)

Extension ClassLoader,扩展类加载器,默认加载jdk\lib\ext\目录下jar中诸多类;

这个路径可以使用 java.ext.dirs系统变量来更改。

Application ClassLoader (应用程序类加载器)

Application ClassLoader,应用程序类加载器,负责加载开发人员所编写的诸多类。

User ClassLoader (自定义类加载器)

自定义类加载器,当存在上述类加载器解决不了的特殊情况,或存在特殊要求时,可以自行实现类加载逻辑。

关系如图所示:

1.png

双亲委派模型是什么?

说完了类加载器,下面我们就说一下什么是双亲委派模型吧。

其是在JDK1.2期间被引入的,而后陆续被推荐给开发者,到目前已经成为了最常用的类加载器实现方式了。

双亲委派整个过程分为以下几步:

  1. 假设用户刚刚摸鱼写的Test类想进行加载,这个时候首先会发送给应用程序类加载器AppCloassLoader;
  2. 然后AppClassLoader并不会直接去加载Test类,而是会委派于父类加载器完成此操作,也就是ExtClassLoader;
  3. ExtClassLoader同样也不会直接去加载Test类,而是会继续委派于父类加载器完成,也就是BootstrapClassLoader;
  4. BootstrapClassLoader这个时候已经到顶层了,没有父类加载器了,所以BootstrapClassLoader会在jdk/lib目录下去搜索是否存在,因为这里是用户自己写的Test类,是不会存在于jdk下的,所以这个时候会给子类加载器一个反馈。
  5. ExtClassLoader收到父类加载器发送的反馈,知道了父类加载器并没有找到对应的类,爸爸靠不住,就只能自己来加载了,结果显而易见,自己也不行,没办法,只能给更下面的子类加载器了。
  6. AppClassLoader收到父类加载器的反馈,顿时明白,原来爸爸虽然是爸爸,但是他终究不能管儿子的私事,所以这时候,AppClassLoader就自己尝试去加载。
  7. 结果,就这样成功了,走了一大圈,兜兜转转还是自己干。

这个并没有那么复杂,我就不画图了哈,大家如果想看图,可以去网上再搜一下,我记得有个大佬画的就很形象。

为什么要使用双亲委派模型?

使用双亲委派模型,有一个很大的好处,就是避免原始类被覆盖的问题。

比如,用户编写了一个Object类,放入程序中加载。

当没有双亲委派机制时,就会出现重复的Object类,会给开发人员造成很大的困扰,本来就只需要基于JDK开发就好了,现在还得把JDK中的类全记住,避免编写重复的类。

当存在双亲委派机制时呢,整个事情就不一样了,每次加载类时,都会遵循双亲委派机制,去问父类是否可以加载,如果可以呢,那就不需要再次加载了,这样事情就变得简单了。(老子走的路,小子不能走 》.《)

如何打破双亲委派模型?

这个问题,其实就算是双亲委派模型中最深入的问题了,最起码中高级工程师面试,问到这也就下个话题了。

这里我也简单说一下吧,最常见的也是人们常说的有两种,分别是:

  1. 在自定义类加载器中,重写loadClass方法。

为什么呢?因为如果你去看ClassLoader类的源码时,你会发现,双亲委派的核心代码就是在这个方法中的;试想,你如果重写了这个方法,自然而然的就打破了双亲委派机制了。

2.png

  1. 使用线程上下文类加载器

因为双亲委派机制不能支持SPI(Service Provider Interface), 所以才会引入线程上下文类加载器,有了它,就可以打通双亲委派模型的层次结构来反向使用类加载器来完成类加载。

这个目前也是没有办法的事,涉及到SPI的目前都在使用这种方式来完成类加载,其中就包括常用到的JNDI、JDBC、JAXB等。

这里其实还有一个可以打破双亲委派模型的情况,那就是OSGI,这个解释是在《深入理解 Java虚拟机》这本书中,可能是这本书的作者本身就对OSGI比较熟悉,而且看简介还出过一本OSGI的书。

在很多应用环境中,OSGI被当做模块化热部署实现的关键,所以,这样就说明了,OSGI一定会对类加载过程做相应的一些措施。

因为这个并不常碰到,还是大家自行去查看吧,我就不在这赘述了。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

MySQL系列(5)— InnoDB 缓冲池Buffer P

发表于 2021-06-16

系列文章:MySQL系列专栏

缓冲池

通过前面几篇文章,我们已经了解到 InnoDB 存储引擎是基于磁盘存储的,并将其中的记录按照页的方式进行管理。但由于CPU速度与磁盘速度之间的鸿沟,通常都会将数据加载到内存中来操作数据,以此提高数据库的整体性能。

InnoDB 设计了一个缓冲池(Buffer Pool),当需要访问某个页时,就会把这一页的数据全部加载到缓冲池中,这样就可以在内存中进行读写访问了。对于数据库中页的修改操作,也是先修改在缓冲池中的页,然后再以一定的频率刷新到磁盘上。有了缓冲池,就可以省去很多磁盘IO的开销了,从而提升数据库性能。

注意即使只访问页中的一条记录,也需要把整个页的数据加载到内存中。前面有说过,通过索引只能定位到磁盘中的页,而不能定位到页中的一条记录。将页加载到内存后,就可以通过页目录(Page Directory)去定位到某条具体的记录。

Buffer Pool 结构

MySQL服务器启动的时候就会向操作系统申请一片连续的内存,即 Buffer Pool。默认配置下 Buffer Pool 只有128MB 大小,我们可以调整 innodb_buffer_pool_size 参数来设置 Buffer Pool 的大小。

例如下面设置了1GB的内存,单位是字节:

1
2
sh复制代码[server]
innodb_buffer_pool_size = 1073741824

Buffer Pool 也是按页来划分的,默认和磁盘上的页一样,都是 16KB 大小。Buffer Pool 中不只缓存了数据页,还包括索引页、undo页、插入缓冲、自适应哈希索引、InnoDB存储的锁信息、数据字典信息等。

为了管理 Buffer Pool 中的缓存页,InnoDB 为每一个缓存页都创建了一些描述信息(元数据),用来描述这个缓存页。描述信息主要包括该页所属的表空间编号、页号、缓存页的地址、链表节点信息、锁信息、LSN信息等等。

这个描述信息本身也是一块数据,它们占用的内存大小都是相同的。在 Buffer Pool 中,每个缓存页的描述信息放在最前面,各个缓存页在后面。看起来就像下图这个样子。

image.png

另外需要注意下,每个描述数据大约相当于缓存页大小的 5%,也就是800字节左右的样子。而我们设置的 innodb_buffer_pool_size 并不包含描述数据的大小,实际上 Buffer Pool 的大小会超出这个值。比如默认配置 128MB,那么InnoDB在为 Buffer Pool 申请连续的内存空间时,会申请差不多 128 + 128*5% ≈ 134MB 大小的空间。

Buffer Pool 中划分缓存页的时候,会让所有的缓存页和描述数据块都紧密的挨在一起,前面是描述数据块,后面是缓存页,尽可能减少内存浪费。但是划分完后,可能还剩一点点的内存,这点内存放不下任何一个缓存页了,这就是图中的碎片空间了。

Free 链表

MySQL 数据库启动的时候,InnoDB 就会为 Buffer Pool 申请一片连续的内存空间,然后按照默认的16KB的大小划分出一个个的页来,还会按照800字节左右的大小划分出页对应的描述数据来。

划分完成后,Buffer Pool 中的缓存页都是空的,等执行增删改查等操作时,才会把数据对应的页从磁盘加载到 Buffer Pool 中的页来。

那怎么知道哪些页是空闲的呢?InnoDB 设计了一个 free 链表,它是一个双向链表数据结构,这个链表的每个节点就是一个空闲缓存页的描述信息。

实际上,每个描述信息中有 free_pre、free_next 两个指针,Free链表 就是由这两个指针连接起来形成的一个双向链表。然后 Free链表 有一个基础节点,这个基础节点存放了链表的头节点地址、尾节点地址,以及当前链表中节点数的信息。

Free链表 看起来就像下图这样:

image.png

需要注意的是,链表的基础节点占用的内存空间并不包含在 Buffer Pool 之内,而是单独申请的一块内存空间,每个基节点只占用40字节大小。后边介绍的其它链表的基础节点也是一样的,都是单独申请的一块40字节大小的内存空间。

有了这个 Free链表 之后,当需要从磁盘加载一个页到 Buffer Pool 时,就从 Free链表 中取出一个描述数据块,然后将页写入这个描述数据块对应的空闲缓存页中。并把一些描述数据写入描述数据块中,比如页的表空间号、页号之类的。最后,把缓存页对应的描述数据块从 Free链表 中移除,表示该缓存页已被使用了。

可以看到,从磁盘加载一个页到缓存页后,缓存页对应的描述信息块就从Free链表移除了。

image.png

缓存页哈希表

有些数据页被加载到 Buffer Pool 的缓存页中了,那怎么知道一个数据页有没有被缓存呢?

所以InnoDB还会有一个哈希表数据结构,它用 表空间号+数据页号 作key,value 就是缓存页的地址。

当使用一个数据页的时候,会先通过表空间号+数据页号作为key去这个哈希表里查一下,如果没有就从磁盘读取数据页,如果已经有了,就直接使用该缓存页。

LRU 链表

前面已经知道 Free链表管理着所有空闲的缓存页,当使用了一个缓存页后,那个缓存页就从 Free链表 移除了,那它对应的控制块到哪里去了呢?

这时就有另外一个 LRU 链表 来管理已经使用了的缓存页。LRU链表与 Free链表的结构是类似的,都会有一个基础节点来指向链表的首、尾描述信息块,加入LRU链表中的描述信息块就通过 free_pre 和 free_next 两个指针连接起来行程一个双向链表。

例如查询数据的时候,首先从索引找到数据所在的数据页,再根据 表空间号+页号 去缓存页哈希表查找页是否已经在 Buffer Pool 中了,如果在就直接使用,不在就从磁盘加载页,然后从 Free链表 取出一个空闲页加入到 LRU链表,再把数据页放到缓存页中,并以表空间号+页号作为 key,缓存页地址作为 value,放入缓存页哈希表。

可以通过下图来理解 Free链表 和 LRU链表,注意缓存页没有画出,描述信息块是保存了缓存页的地址的。

image.png

简单 LRU 链表

LRU 链表,就是所谓的 Least Recently Used,最近最少使用的意思。因为缓冲池大小是有限的,不可能一直加载数据到缓冲池中,对于一些频繁访问的数据可以一直留在缓冲池中,而一些很少访问的数据,当缓存页快用完了的时候,就可以淘汰掉一些。这时就可以使用 LRU链表 来管理已使用的缓存页,这样就可以知道哪些页最常使用,哪些页最少使用了。

按照正常对LRU链表的理解,一个新的数据页从磁盘加载到缓存页时,对应的描述信息块应该是放到 LRU链表 的头部,每次查询、修改了一个页,也将这个页对应的描述信息块移到 LRU链表 的头部,也就是说最近被访问过的缓存页都在 LRU 链表的头部。然后当 Free链表 用完了之后,就可以从 LRU链表的尾部找一些最少使用的页刷入磁盘,腾出一些空闲页来。

但是这种 LRU链表 有几个问题:

1、InnoDB 有一个预读机制,就是从磁盘上加载一个数据页的时候,可能连带着把这个数据页相邻的其它数据页都加载到缓存里去。虽然预读了其它页,但可能都没用上,但是这些页如果都往 LRU 头部放,就会导致原本经常访问的页往后移,然后被淘汰掉。这种情况属于加载到 Buffer Pool 中的页不一定被用到导致缓存命中率降低。

2、如果我们写了一个全表扫描的查询语句,一下就将整个表的页加载到了 LRU 的头部,如果表记录很多的话,可能 LRU 链表中之前经常被访问的页一下就淘汰了很多,而留下来的数据可能并不会被经常访问到。这种就是加载了大量使用频率很低的页到 Buffer Pool,然后淘汰掉使用频率很高的页,从而导致缓存命中率降低。

缓存命中率:假设一共访问了n次页,那么被访问的页已经在缓存中的次数除以n就是所谓的缓存命中率,缓存命中率肯定是越高越好

冷热数据分离的 LRU 链表

为了解决简单 LRU 链表的问题,InnoDB在设计 LRU 链表的时候,实际上是采取冷热数据分离的思想,LRU链表会被拆成两部分,一部分是热数据(又称new列表),一部分是冷数据(又称old列表)。如下图所示。

image.png

这个冷热数据的位置并不固定,是一个比例,由参数 innodb_old_blocks_pct 来控制,默认比例是 37,也就是冷数据占 37%,大约占 LRU链表 3/8 的样子。

1
2
3
4
5
6
sh复制代码mysql> SHOW VARIABLES LIKE 'innodb_old_blocks_pct';
+-----------------------+-------+
| Variable_name | Value |
+-----------------------+-------+
| innodb_old_blocks_pct | 37 |
+-----------------------+-------+

基于冷热分离的LRU链表,这时新加载一个缓存页时,就不是直接放到LRU的头部了,而是放到冷数据区域的头部。那什么时候将冷数据区域的页移到热数据区域呢?也许你会认为访问了一次或几次页就会移到热数据区域的头部,其实不是这样的。

如果是预读机制加载了一些不用的页,慢慢的被淘汰掉就行了。但如果是全表扫描加载了大量的页进来,必然是会被读取至少一次的,而且一页包含很多条记录,可能会被访问多次。所以这时就将冷数据区域的页移到热数据区域也是不太合理的。

所以 InnoDB 设置了一个规则,在第一次访问冷数据区域的缓存页的时候,就在它对应的描述信息块中记录第一次访问的时间,默认要间隔1秒后再访问这个页,才会被移到热数据区域的头部。也就是从第一次加载到冷数据区域后,1秒内多次访问都不会移动到热数据区域,基本上全表扫描查询缓存页的操作1秒内就结束了。

这个间隔时间是由参数 innodb_old_blocks_time 控制的,默认是 1000毫秒。如果我们把这个参数值设置为0,那么每次访问一个页面时就会把该页面放到热数据区域的头部。

1
2
3
4
5
6
sh复制代码mysql> SHOW VARIABLES LIKE 'innodb_old_blocks_time';
+------------------------+-------+
| Variable_name | Value |
+------------------------+-------+
| innodb_old_blocks_time | 1000 |
+------------------------+-------+

之后缓存页不够用的时候,就会优先从冷数据区域的尾部淘汰掉一些不常用的页,频繁访问的数据页还是会留在热数据区域,不会受到影响。而冷数据区域停留超过1秒的页,被再次访问时就会移到热数据区域的头部。

热数据区域中的页是每访问一次就移到头部吗?也不是的,热数据区域是最频繁访问的数据,如果频繁的对LRU链表进行节点移动操作也是不合理的。所以 InnoDB 就规定只有在访问了热数据区域的 后3/4 的缓存页才会被移动到链表头部,访问 前1/4 中的缓存页是不会移动的。

下面对冷热数据分离的LRU链表总结下:

  • LRU链表分为冷、热数据区域,前 63% 为热数据区域,后 37% 为冷数据区域,加载缓存页先放到冷数据区域头部。
  • 冷数据区域的缓存页第一次访问超过1秒后,再次访问时才会被移动到热数据区域头部。
  • 热数据区域中,只有后 3/4 的缓存页被访问才会移到头部,前 1/4 被访问到不会移动。
  • 淘汰数据优先淘汰冷数据区域尾部的缓存页。

Flush 链表

当我们执行增删改的时候,肯定是去更新了 Buffer Pool 中的某些缓存页,那这些被更新了的缓存页就和磁盘上的数据页不一致了,就变成了脏页。这些脏页最终肯定会被刷回到磁盘中,但并不是所有的缓存页都需要刷回到磁盘,因为有些页只是被查询了,但并没有被增删改过。

那怎么知道哪些页是脏页呢?这时就引入了另一个链表,Flush 链表。Flush链表 跟前面两个链表一样,也有一个基础节点,如果一个缓存页被修改了,就会加入到 Flush链表 中。但是不像 LRU链表 是从 Free链表 中来的,描述信息块中还有两个指针 flush_pre、flush_next用来连接形成 flush 链表,所以 Flush链表 中的缓存页一定是在 LRU 链表中的,而 LRU 链表中不在 Flush链表 中的缓存页就是未修改过的页。可以通过下图来理解 LRU 链表和 Flsuh链表。

可以看到,脏页既存在于 LRU链表 中,也存在于 Flush链表 中。LRU链表 用来管理 Buffer Pool 中页的可用性,Flush链表 用来管理将页刷新回磁盘,二者互不影响。

image.png

刷新脏页到磁盘

前面我们已经知道,LRU链表分为冷热数据区域,这样就可以在空闲缓存页不够的用的时候,可以将LRU链表尾部的磁盘页刷回磁盘,腾出一些空闲页来,还有 Flush链表 中的脏页,在某些时刻也会刷回磁盘中。那将脏页刷回磁盘的时机有哪些呢?

  • 定时从 LRU链表 尾部刷新一部分脏页到磁盘

后台有专门的线程会定时从LRU链表尾部扫描一些缓存页,扫描的数量可以通过参数 innodb_lru_scan_depth 来设置。如果有脏页,就会把它们刷回磁盘,然后释放掉,不是脏页就直接释放掉,再把它们加回Free链表中。这种刷新页面的方式被称之为 BUF_FLUSH_LRU。

  • 定时把 Flush链表 中的一些脏页刷回磁盘

后台线程会在MySQL不怎么繁忙的时候,将 Flush 链表中的一些脏页刷到磁盘,这样LRU热数据区域的一些脏页就会被刷回磁盘。这种刷新页面的方式被称之为 BUF_FLUSH_LIST。

  • 没有空闲页的时候刷新页

前面两种方式是后台线程定时运行,并不是在缓存页满的时候才去刷新脏页,这种方式不会影响用户线程处理正常的请求。

但可能要加载一个数据页到 Buffer Pool 时,没有空闲页了,这时就会从 LRU链表 尾部找一个缓存页,如果是脏页就刷回磁盘,如果不是脏页就释放掉,然后放入Free链表中,再将数据页放入这个腾出来的空闲页中。如果要刷新脏页,这时就会降低处理用户请求的速度,毕竟和磁盘交互是很慢的。这种刷新单个页面到磁盘中的刷新方式被称之为 BUF_FLUSH_SINGLE_PAGE。

查看 Buffer Pool 状态

我们可以通过 SHOW ENGINE INNODB STATUS; 来查看 InnoDB 的状态信息。但是要注意,状态并不是当前的状态,而是过去某个时间范围内 InnoDB 存储引擎的状态,例如从下面一次输出中可以看到是过去15秒内的一个状态。

1
2
3
4
sh复制代码=====================================
2021-04-30 09:20:24 0x7f39a34a7700 INNODB MONITOR OUTPUT
=====================================
Per second averages calculated from the last 15 seconds

Buffer Pool和内存状态

从输出的内容中,可以找到 BUFFER POOL AND MEMORY 这段关于缓冲池和内存的状态信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
sh复制代码----------------------
BUFFER POOL AND MEMORY
----------------------
Total large memory allocated 1099431936
Dictionary memory allocated 8281957
Buffer pool size 65535
Free buffers 1029
Database pages 63508
Old database pages 23423
Modified db pages 80
Pending reads 0
Pending writes: LRU 0, flush list 0, single page 0
Pages made young 15278983, not young 2027514654
0.00 youngs/s, 0.00 non-youngs/s
Pages read 83326150, created 1809368, written 21840503
0.00 reads/s, 0.00 creates/s, 0.00 writes/s
Buffer pool hit rate 1000 / 1000, young-making rate 1 / 1000 not 0 / 1000
Pages read ahead 0.00/s, evicted without access 0.00/s, Random read ahead 0.00/s
LRU len: 63508, unzip_LRU len: 0
I/O sum[833]:cur[0], unzip sum[0]:cur[0]

其中信息如下:

  • Total large memory allocated:Buffer Pool向操作系统申请的内存空间大小,包括全部控制块、缓存页、以及碎片的大小。
  • Dictionary memory allocated:为数据字典信息分配的内存空间大小,这个内存空间和Buffer Pool没啥关系,不包括在Total large memory allocated中。
  • Buffer pool size:缓存池页的数量,所以缓冲池大小为 65535 * 16KB = 1G。
  • Free buffers:Free链表中的空闲缓存页数量。
  • Database pages:LRU链表中的缓存页数量。需要注意的是,Database pages + Free buffers 可能不等于 Buffer pool size,因为缓冲池中的页还可能分配给自适应哈希索引、Lock信息、Insert Buffer等,而这部分不需要LRU来管理。
  • Old database pages:LRU冷数据区域(old列表)的缓存页数量,23423/63508=36.88%,约等于 37%。
  • Modified db pages:修改过的页,这就是 Flush链表中的脏页数量。
  • Pending reads:正在等待从磁盘上加载到Buffer Pool中的页面数量。当准备从磁盘中加载某个页面时,会先为这个页面在Buffer Pool中分配一个缓存页以及它对应的控制块,然后把这个控制块添加到LRU的冷数据区域的头部,但是这个时候真正的磁盘页并没有被加载进来,所以 Pending reads 的值会加1。
  • Pending writes:从LRU链表中刷新到磁盘中的页面数量,其实就对应着前面说的三种刷盘的时机:BUF_FLUSH_LRU、BUF_FLUSH_LIST、BUF_FLUSH_SINGLE_PAGE。
  • Pages made young:显示了页从LRU的冷数据区域移到热数据区域头部的次数。注意如果是热数据区域后3/4被访问移动到头部是不会增加这个值的。
  • Pages made not young:这个是由于 innodb_old_blocks_time 的设置导致页没有从冷数据区域移到热数据区域的页数,可以看到这个值减少了很多不常用的页被移到热数据区域。
  • xx youngs/s, xx non-youngs/s:表示 made young 和 not young 这两类每秒的操作次数。
  • xx reads/s, xx creates/s, xx writes/s:代表读取,创建,写入的速率。
  • Buffer pool hit rate xx/1000:表示在过去某段时间,平均访问1000次页面,有多少次该页面已经被缓存到Buffer Pool了,表示缓存命中率。这里显示的就是 100%,说明缓冲池运行良好。这是一个重要的观察变量,通常该值不应该小于 95%,否则我们应该看下是否有全表扫描引起LRU链表被污染的问题。
  • young-making rate xx/1000 not xx/1000:表示在过去某段时间,平均访问1000次页面,有多少次访问使页面移动到热数据区域的头部了,以及没移动的缓存页数量。
  • LRU len:LRU 链表中节点的数量。
  • I/O sum[xx]:cur[xx]:最近50s读取磁盘页的总数,现在正在读取的磁盘页数量。

查看LRU链表信息

我们还可以查询 information_schema 下的 INNODB_BUFFER_PAGE_LRU 来观察LRU链表中每个页的具体信息。

1
2
3
4
5
6
7
8
9
10
11
12
sql复制代码mysql> SELECT * FROM information_schema.INNODB_BUFFER_PAGE_LRU WHERE TABLE_NAME = '`hzero_platform`.`iam_role`';
+---------+--------------+-------+-------------+-----------+------------+-----------+-----------+---------------------+---------------------+-------------+-----------------------------+-------------+----------------+-----------+-----------------+------------+---------+--------+-----------------+
| POOL_ID | LRU_POSITION | SPACE | PAGE_NUMBER | PAGE_TYPE | FLUSH_TYPE | FIX_COUNT | IS_HASHED | NEWEST_MODIFICATION | OLDEST_MODIFICATION | ACCESS_TIME | TABLE_NAME | INDEX_NAME | NUMBER_RECORDS | DATA_SIZE | COMPRESSED_SIZE | COMPRESSED | IO_FIX | IS_OLD | FREE_PAGE_CLOCK |
+---------+--------------+-------+-------------+-----------+------------+-----------+-----------+---------------------+---------------------+-------------+-----------------------------+-------------+----------------+-----------+-----------------+------------+---------+--------+-----------------+
| 0 | 7938 | 14261 | 66 | INDEX | 1 | 0 | NO | 287769805005 | 0 | 1178342949 | `hzero_platform`.`iam_role` | PRIMARY | 57 | 14956 | 0 | NO | IO_NONE | YES | 0 |
| 0 | 7973 | 14261 | 39 | INDEX | 1 | 0 | YES | 287769773866 | 0 | 1036433373 | `hzero_platform`.`iam_role` | PRIMARY | 48 | 15102 | 0 | NO | IO_NONE | YES | 85389982 |
| 0 | 7974 | 14261 | 12 | INDEX | 1 | 0 | YES | 287769585186 | 0 | 1176755165 | `hzero_platform`.`iam_role` | PRIMARY | 55 | 15036 | 0 | NO | IO_NONE | YES | 85389982 |
| 0 | 7975 | 14261 | 65 | INDEX | 1 | 0 | YES | 287769796826 | 0 | 1176755230 | `hzero_platform`.`iam_role` | PRIMARY | 52 | 15097 | 0 | NO | IO_NONE | YES | 85389982 |
| 0 | 8034 | 14261 | 9 | INDEX | 1 | 0 | NO | 287769569861 | 0 | 1176811763 | `hzero_platform`.`iam_role` | PRIMARY | 52 | 14958 | 0 | NO | IO_NONE | YES | 85389982 |
| 0 | 8035 | 14261 | 11 | INDEX | 1 | 0 | NO | 287769577285 | 0 | 1177589383 | `hzero_platform`.`iam_role` | PRIMARY | 52 | 15040 | 0 | NO | IO_NONE | YES | 85389982 |
| 0 | 8036 | 14261 | 13 | INDEX | 1 | 0 | NO | 287769592932 | 0 | 1177589384 | `hzero_platform`.`iam_role` | PRIMARY | 54 | 15011 | 0 | NO | IO_NONE | YES | 85389982 |
| 0 | 8037 | 14261 | 14 | INDEX | 1 | 0 | NO | 287769599951 | 0 | 1177589384 | `hzero_platform`.`iam_role` | PRIMARY | 49 | 15170 | 0 | NO | IO_NONE | YES | 85389982 |

其中的一些信息如下:

  • POOL_ID:缓冲池ID,我们是可以设置多个缓冲池的。
  • SPACE:页所属表空间ID,表空间ID也可以从 information_schema.INNODB_SYS_TABLES 去查看。
  • PAGE_NUMBER:页号。
  • PAGE_TYPE:页类型,INDEX就是数据页。
  • NEWEST_MODIFICATION、OLDEST_MODIFICATION:LRU热数据区域和冷数据区域被修改的记录,如果想查询脏页的数量,可以加上条件 (NEWEST_MODIFICATION > 0 or OLDEST_MODIFICATION > 0)。
  • NUMBER_RECORDS:这一页中的记录数。
  • COMPRESSED:是否压缩了。

从INNODB_SYS_TABLES查看表信息:

1
2
3
4
5
6
sql复制代码mysql> SELECT * FROM information_schema.INNODB_SYS_TABLES WHERE NAME = 'hzero_platform/iam_role';
+----------+-------------------------+------+--------+-------+-------------+------------+---------------+------------+
| TABLE_ID | NAME | FLAG | N_COLS | SPACE | FILE_FORMAT | ROW_FORMAT | ZIP_PAGE_SIZE | SPACE_TYPE |
+----------+-------------------------+------+--------+-------+-------------+------------+---------------+------------+
| 14552 | hzero_platform/iam_role | 33 | 30 | 14261 | Barracuda | Dynamic | 0 | Single |
+----------+-------------------------+------+--------+-------+-------------+------------+---------------+------------+

Buffer Pool 配置调优

配置多个Buffer Pool来提升数据库的并发性能

多线程访问 Buffer Pool 的时候,会涉及到对同一个 Free、LRU、Flush 等链表的操作,例如节点的移动、缓存页的刷新等,那必然是会涉及到加锁的。

首先要知道,就算只有一个 Buffer Pool,多线程访问要加锁、释放锁,由于基本都是内存操作,所以性能也是很高的。但在一些高并发的生产环境中,配置多个 Buffer Pool,还是能极大地提高数据库并发性能的。

可以通过参数 innodb_buffer_pool_instances 来配置 Buffer Pool 实例数,通过参数 innodb_buffer_pool_size 设置所有 Buffer Pool 的总大小(单位字节)。每个 Buffer Pool 的大小就是 innodb_buffer_pool_size / innodb_buffer_pool_instances。

1
2
3
sh复制代码[server]
innodb_buffer_pool_size=2147483648
innodb_buffer_pool_instances=2

InnoDB 规定,当 innodb_buffer_pool_size 小于1GB的时候,设置多个实例是无效的,会默认把innodb_buffer_pool_instances 的值修改为1。

动态调整Buffer Pool大小

我们可以在运行时动态调整 innodb_buffer_pool_size 这个参数,但 InnoDB 并不是一次性申请 pool_size 大小的内存空间,而是以 chunk 为单位申请。一个 chunk 默认就是 128M,代表一片连续的空间,申请到这片内存空间后,就会被分为若干缓存页与其对应的描述信息块。

也就是说一个Buffer Pool实例其实是由若干个chunk组成的,每个chunk里划分了描述信息块和缓存页,然后共用一套 Free链表、LRU链表、Flush链表。

image.png

每个chunk 的大小由参数 innodb_buffer_pool_chunk_size 控制,这个参数只能在服务器启动时指定,不能在运行时动态修改。

1
2
3
4
5
6
sql复制代码mysql> SHOW VARIABLES LIKE 'innodb_buffer_pool_chunk_size';
+-------------------------------+-----------+
| Variable_name | Value |
+-------------------------------+-----------+
| innodb_buffer_pool_chunk_size | 134217728 |
+-------------------------------+-----------+

合理设置 Buffer Pool 大小

在生产环境中安装MySQL数据库,首先我们一般要选择大内存的机器,那我们如何合理的设置 Buffer Pool 的大小呢?

比如有一台 32GB 的机器,不可能说直接给个30G,要考虑几个方面。首先前面说过,innodb_buffer_pool_size 并不包含描述块的大小,实际 Buffer Pool 的大小会超出 innodb_buffer_pool_size 5% 左右。另外机器本身运行、MySQL运行也会占用一定的内存,所以一般 Buffer Pool 可以设置为机器的 50%~60% 左右就可以了,比如32GB的机器,就设置 innodb_buffer_pool_size 为 20GB。

另外,innodb_buffer_pool_size 必须是 innodb_buffer_pool_chunk_size × innodb_buffer_pool_instances 的倍数,主要是保证每一个Buffer Pool实例中包含的chunk数量相同。

比如默认 chunk_size=128MB,pool_size 设置 20GB,pool_instances 设置 16 个,那么 20GB / (128MB * 16) = 10 倍,这样每个 Buffer Pool 的大小就是 128MB * 10 = 1280MB。如果将 pool_instances 设置为 32 个,那么 20GB / (128MB * 32) = 5 倍,这样每个 Buffer Pool 的代销就是 128MB * 5 = 640MB

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Mysql为何使用可重复读(Repeatable read)

发表于 2021-06-16

事务的特性(ACID)

群里有小伙伴面试时,碰到面试官提了个很刁钻的问题:

Mysql为何使用可重复读(Repeatable read)为默认隔离级别???

下面进入正题:

我们都知道事务的几种性质 :原子性、一致性、隔离性和持久性 (ACID)

为了维持一致性和隔离性,一般使用加锁这种方式来处理,但是加锁相对带来的是并发处理能力的降低

而数据库是个高并发的应用,因此对于加锁的处理是事务的精髓.

下面我们来了解一下封锁协议,以及事务在数据库中做了什么

封锁协议(Locking Protocol)

MySQL的锁系统:shared lock 和 exclusive lock 即共享锁和排他锁,也叫读锁(S)和写锁(X),共享锁和排他锁都属于悲观锁。排他锁又可以可以分为行锁和表锁。

封锁协议(Locking Protocol): 在使用X锁或S锁对数据加锁时,约定的一些规则.例如何时申请X或S锁,持续时间,何时释放锁等.

一级、二级、三级封锁协议

对封锁方式规定不同的规则,就形成了各种不同的封锁协议,不同的封锁协议,为并发操作的正确性提供不同程度的保证

一级封锁协议

一级封锁协议定义:事务T在修改数据R之前必须先对其加X锁(排他锁),直到事务结束才释放。事务结束包括正常结束(COMMIT)和非正常结束(ROLLBACK)。

1
markdown复制代码	一级封锁协议可以防止丢失修改,并保证事务T是可恢复的。使用一级封锁协议可以解决丢失修改问题。

在一级封锁协议中,如果仅仅是读数据不对其进行修改,是不需要加锁的,它不能保证可重复读和不读“脏”数据。  

二级封锁协议

二级封锁协议定义:一级封锁协议加上事务T在读取数据R之前必须先对其加S锁(共享锁),读完后释放S锁。事务的加锁和解锁严格分为两个阶段,第一阶段加锁,第二阶段解锁。

  • 加锁阶段: 在对任何数据进行读操作之前要申请并获得S锁(共享锁,其它事务可以继续加共享锁,但不能加排它锁),在进行写操作之前要申请并获得X锁(排它锁,其它事务不能再获得任何锁)。加锁不成功,则事务进入等待状态,直到加锁成功才继续执行。
  • 解锁阶段:当事务释放了一个封锁以后,事务进入解锁阶段,在该阶段只能进行解锁操作不能再进行加锁操作。

   二级封锁协议除防止了丢失修改,还可以进一步防止读“脏”数据。但在二级封锁协议中,由于读完数据后释放S锁,所以它不能保证可重复读。

​ 二级封锁的目的是保证并发调度的正确性。就是说,如果事务满足两段锁协议,那么事务的并发调度策略是串行性的。保证事务的并发调度是串行化(串行化很重要,尤其是在数据恢复和备份的时候)   

三级封锁协议

三级封锁协议定义:一级封锁协议加上事务T在读取数据R之前必须先对其加S锁(共享锁),直到事务结束才释放。在一级封锁协议(一级封锁协议:修改之前先加X锁,事务完成释放)的基础上加上S锁,事务结束后释放S锁

  三级封锁协议除防止了丢失修改和不读“脏”数据外,还进一步防止了不可重复读。
上述三级协议的主要区别在于什么操作需要申请封锁,以及何时释放。

事务四种隔离级别

在数据库操作中,为了有效保证并发读取数据的正确性,提出的事务隔离级别。上面提到的封锁协议 ,也是为了构建这些隔离级别存在的。

隔离级别 脏读(Dirty Read) 不可重复读(NonRepeatable Read) 幻读(Phantom Read)
未提交读(Read uncommitted) 可能 可能 可能
已提交读(Read committed) 不可能 可能 可能
可重复读(Repeatable read) 不可能 不可能 可能
可串行化(Serializable ) 不可能 不可能 不可能

对于事务并发访问会产生的问题,以及各隔离级别的详细介绍在我的上一篇文章

一文搞懂事务

为什么是RR

一般的DBMS系统,默认都会使用读提交(Read-Comitted,RC)作为默认隔离级别,如Oracle、SQL Server等,而MySQL却使用可重复读(Read-Repeatable,RR)。要知道,越高的隔离级别,能解决的数据一致性问题越多,理论上性能的损耗更大,且并发性越低。隔离级别依次为: SERIALIZABLE > RR > RC > RU

我们可以通过以下语句设置和获取数据库的隔离级别:

查看系统的隔离级别:

1
2
3
4
5
6
7
mysql复制代码mysql> select @@global.tx_isolation isolation;
+-----------------+
| isolation |
+-----------------+
| REPEATABLE-READ |
+-----------------+
1 row in set, 1 warning (0.00 sec)

查看当前会话的 隔离级别:

1
2
3
4
5
6
7
mysql复制代码mysql> select @@tx_isolation;
+----------------+
| @@tx_isolation |
+----------------+
| READ-COMMITTED |
+----------------+
1 row in set, 1 warning (0.00 sec)

设置会话的隔离级别,隔离级别由低到高设置依次为:

1
2
3
4
mysql复制代码set session transacton isolation level read uncommitted;
set session transacton isolation level read committed;
set session transacton isolation level repeatable read;
set session transacton isolation level serializable;

设置当前系统的隔离级别,隔离级别由低到高设置依次为:

1
2
3
4
mysql复制代码set global transacton isolation level read uncommitted;
set global transacton isolation level read committed;
set global transacton isolation level repeatable read;
set global transacton isolation level serializable;

可重复读(Repeated Read):可重复读。基于锁机制并发控制的DBMS需要对选定对象的读锁(read locks)和写锁(write locks)一直保持到事务结束,但不要求“范围锁(range-locks)”,因此可能会发生“幻影读(phantom reads)”
在该事务级别下,保证同一个事务从开始到结束获取到的数据一致。是Mysql的默认事务级别。

下面我们先来思考2个问题

  • 在读已提交(Read Commited)级别下,出现不可重复读问题怎么办?需要解决么?

不用解决,这个问题是可以接受的!毕竟你数据都已经提交了,读出来本身就没有太大问题!Oracle ,SqlServer 默认隔离级别就是RC,我们也没有更改过它的默认隔离级别.

  • 在Oracle,SqlServer中都是选择读已提交(Read Commited)作为默认的隔离级别,为什么Mysql不选择读已提交(Read Commited)作为默认隔离级别,而选择可重复读(Repeatable Read)作为默认的隔离级别呢?

历史原因,早阶段Mysql(5.1版本之前)的Binlog类型Statement是默认格式,即依次记录系统接受的SQL请求;5.1及以后,MySQL提供了Row,Mixed,statement 3种Binlog格式, 当binlog为statement格式,使用RC隔离级别时,会出现BUG因此Mysql将可重复读(Repeatable Read)作为默认的隔离级别!

Binlog简介

Mysql binlog是二进制日志文件,用于记录mysql的数据更新或者潜在更新(比如DELETE语句执行删除而实际并没有符合条件的数据),在mysql主从复制中就是依靠的binlog。可以通过语句“show binlog events in ‘binlogfile’”来查看binlog的具体事件类型。binlog记录的所有操作实际上都有对应的事件类型的

MySQL binlog的三种工作模式:
Row (用到MySQL的特殊功能如存储过程、触发器、函数,又希望数据最大化一直则选择Row模式,我们公司选择的是row)
简介:日志中会记录每一行数据被修改的情况,然后在slave端对相同的数据进行修改。
优点:能清楚的记录每一行数据修改的细节
缺点:数据量太大

Statement (默认)
简介:每一条被修改数据的sql都会记录到master的bin-log中,slave在复制的时候sql进程会解析成和原来master端执行过的相同的sql再次执行。在主从同步中一般是不建议用statement模式的,因为会有些语句不支持,比如语句中包含UUID函数,以及LOAD DATA IN FILE语句等
优点:解决了 Row level下的缺点,不需要记录每一行的数据变化,减少bin-log日志量,节约磁盘IO,提高新能
缺点:容易出现主从复制不一致

Mixed(混合模式)
简介:结合了Row level和Statement level的优点,同时binlog结构也更复杂。

我们可以简单理解为binlog是一个记录数据库更改的文件,主从复制时需要此文件,具体细节先略过

主从不一致实操

binlog为STATEMENT格式,且隔离级别为**读已提交(Read Commited)**时,有什么bug呢?
测试表:

1
2
3
4
5
6
7
8
9
10
11
12
mysql复制代码mysql> select * from test;
+----+------+------+
| id | name | age |
+----+------+------+
| 1 | NULL | NULL |
| 2 | NULL | NULL |
| 3 | NULL | NULL |
| 4 | NULL | NULL |
| 5 | NULL | NULL |
| 6 | NULL | NULL |
+----+------+------+
6 rows in set (0.00 sec)
Session1 Session2
mysql> set tx_isolation = ‘read-committed’;
Query OK, 0 rows affected, 1 warning (0.00 sec) mysql> set tx_isolation = ‘read-committed’;
Query OK, 0 rows affected, 1 warning (0.00 sec)
begin;Query OK, 0 rows affected (0.00 sec) begin;Query OK, 0 rows affected (0.00 sec)
delete from test where 1=1;
Query OK, 6 rows affected (0.00 sec)
insert into test values (null,’name’,100);
Query OK, 1 row affected (0.00 sec)
commit;
Query OK, 0 rows affected (0.01 sec)
commit;
Query OK, 0 rows affected (0.01 sec)

Master此时输出

1
2
3
4
5
6
7
mysql复制代码select * from test;
+----+------+------+
| id | name | age |
+----+------+------+
| 7 | name | 100 |
+----+------+------+
1 row in set (0.00 sec)

但是,你在此时在从(slave)上执行该语句,得出输出

1
2
3
mysql复制代码
mysql> select * from test;
Empty set (0.00 sec)

在master上执行的顺序为先删后插!而此时binlog为STATEMENT格式,是基于事务记录,在事务未提交前,二进制日志先缓存,提交后再写入记录的,因此顺序为先插后删!slave同步的是binglog,因此从机执行的顺序和主机不一致!slave在插入后删除了所有数据.

解决方案有两种!
(1)隔离级别设为可重复读(Repeatable Read),在该隔离级别下引入间隙锁。当Session 1执行delete语句时,会锁住间隙。那么,Ssession 2执行插入语句就会阻塞住!
(2)将binglog的格式修改为row格式,此时是基于行的复制,自然就不会出现sql执行顺序不一样的问题!奈何这个格式在mysql5.1版本开始才引入。因此由于历史原因,mysql将默认的隔离级别设为可重复读(Repeatable Read),保证主从复制不出问题!

RU和Serializable

项目中不太使用**读未提交(Read UnCommitted)和串行化(Serializable)**两个隔离级别,原因:

读未提交(Read UnCommitted)

允许脏读,也就是可能读取到其他会话中未提交事务修改的数据 一个事务读到另一个事务未提交读数据

串行化(Serializable)

使用的悲观锁的理论,实现简单,数据更加安全,但是并发能力非常差。如果你的业务并发的特别少或者没有并发,同时又要求数据及时可靠的话,可以使用这种模式。一般是使用mysql自带分布式事务功能时才使用该隔离级别

RC和 RR

此时我们纠结的应该就只有一个问题了:隔离级别是用读已提交还是可重复读?

接下来对这两种级别进行对比的第一种情况:

在RR隔离级别下,存在间隙锁,导致出现死锁的几率比RC大的多!

实现一个简单的间隙锁例子

1
2
3
4
5
6
7
8
9
10
11
12
13
mysql复制代码select * from test where id <11 ;
+----+------+------+
| id | name | age |
+----+------+------+
| 1 | NULL | NULL |
| 2 | NULL | NULL |
| 3 | NULL | NULL |
| 4 | NULL | NULL |
| 5 | NULL | NULL |
| 6 | NULL | NULL |
| 7 | name | 7 |
+----+------+------+
7 rows in set (0.00 sec)
session1 session2
mysql> set tx_isolation = ‘repeatable-read’;
Query OK, 0 rows affected, 1 warning (0.00 sec) mysql> set tx_isolation = ‘repeatable-read’;
Query OK, 0 rows affected, 1 warning (0.00 sec)
Begin;
select * from test where id <11 for update;
insert into test values(null,’name’,9); //被阻塞!
commit;
Query OK, 0 rows affected (0.00 sec)
Query OK, 1 row affected (12.23 sec) //锁释放后完成了操作

在RR隔离级别下,可以锁住(-∞,10] 这个间隙,防止其他事务插入数据!
而在RC隔离级别下,不存在间隙锁,其他事务是可以插入数据!

ps:在RC隔离级别下并不是不会出现死锁,只是出现几率比RR低而已

锁表和锁行

在RR隔离级别下,条件列未命中索引会锁表!而在RC隔离级别下,只锁行

1
2
3
4
5
6
7
8
9
10
mysql复制代码select * from test;
+----+------+------+
| id | name | age |
+----+------+------+
| 8 | name | 11 |
| 9 | name | 9 |
| 10 | name | 15 |
| 11 | name | 15 |
| 12 | name | 16 |
+----+------+------+

锁表的例子:

session1 session2
Begin;
update test set age = age+1 where age = 15;
Rows matched: 2 Changed: 2 Warnings: 0
insert into test values(null,’test’,15);
ERROR 1205 (HY000): Lock wait timeout exceeded;
Commit;

session2插入失败 查询 数据显示:

1
2
3
4
5
6
7
8
9
10
mysql复制代码select * from test;
+----+------+------+
| id | name | age |
+----+------+------+
| 8 | name | 11 |
| 9 | name | 9 |
| 10 | name | 16 |
| 11 | name | 16 |
| 12 | name | 16 |
+----+------+------+

半一致性读(semi-consistent)特性

在RC隔离级别下,半一致性读(semi-consistent)特性增加了update操作的并发性!

在5.1.15的时候,innodb引入了一个概念叫做“semi-consistent”,减少了更新同一行记录时的冲突,减少锁等待。
所谓半一致性读就是,一个update语句,如果读到一行已经加锁的记录,此时InnoDB返回记录最近提交的版本,判断此版本是否满足where条件。若满足则重新发起一次读操作,此时会读取行的最新版本并加锁!

建议

在RC级别下,用的binlog为row格式,是基于行的复制,Innodb的创始人也是建议binlog使用该格式

互联网项目请用:读已提交(Read Commited)这个隔离级别

总结

由于历史原因,老版本Mysql的binlog使用statement格式,不使用RR隔离级别会导致主从不一致的情况

目前(5.1版本之后)我们使用row格式的binlog 配合RC隔离级别可以实现更好的并发性能.

关注公众号:java宝典

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

【爬虫实战】一起一步步分析亚马逊的反爬虫机制

发表于 2021-06-16

​

大家好,我是Lex 喜欢欺负超人那个Lex

擅长领域:python开发、网络安全渗透、Windows域控Exchange架构

今日重点:一步步分析and越过亚马逊的反爬虫机制

事情是这样的

亚马逊是全球最大的购物平台

很多商品信息、用户评价等等都是最丰富的。

今天,手把手带大家,越过亚马逊的反爬虫机制

爬取你想要的商品、评论等等有用信息

)​

反爬虫机制

但是,我们想用爬虫来爬取相关的数据信息时

像亚马逊、TBao、JD这些大型的购物商城

他们为了保护自己的数据信息,都是有一套完善的反爬虫机制的

先试试亚马逊的反爬机制

我们用不同的几个python爬虫模块,来一步步试探

最终,成功越过反爬机制。

一、urllib模块

代码如下:

1
2
3
4
ini复制代码# -*- coding:utf-8 -*-
import urllib.request
req = urllib.request.urlopen('https://www.amazon.com')
print(req.code)

返回结果:状态码:503。

分析:亚马逊将你的请求,识别为了爬虫,拒绝提供服务。

)​

本着科学严谨的态度,我们拿万人上的百度试一下。

返回结果:状态码 200

分析:正常访问

)​

那说明,urllib模块的请求,被亚马逊识别为爬虫,并拒绝提供服务

二、requests模块

1、requests直接爬虫访问

效果如下 ↓ ↓ ↓

)​

代码如下 ↓ ↓ ↓

1
2
3
4
ini复制代码import requests
url='https://www.amazon.com/KAVU-Rope-Bag-Denim-Size/product-reviews/xxxxxx'
r = requests.get(url)
print(r.status_code)

返回结果:状态码:503。

分析:亚马逊同样拒绝了requsets模块的请求

将其识别为了爬虫,拒绝提供服务。

2、我们给requests加上cookie

加上请求cookie等相关信息

效果如下 ↓ ↓ ↓

)​

代码如下 ↓ ↓ ↓

1
2
3
4
5
6
7
8
9
10
11
12
13
dart复制代码import requests

url='https://www.amazon.com/KAVU-Rope-Bag-Denim-Size/product-reviews/xxxxxxx'
web_header={
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:88.0) Gecko/20100101 Firefox/88.0',
'Accept': '*/*',
'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2',
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive',
'Cookie': '你的cookie值',
'TE': 'Trailers'}
r = requests.get(url,headers=web_header)
print(r.status_code)

返回结果:状态码:200

分析:返回状态码是200了,正常了,有点爬虫那味了。

3、检查返回页面

我们通过requests+cookie的方法,得到的状态码为200

目前至少被亚马逊的服务器正常提供服务了

我们将爬取的页面写入文本中,通过浏览器打开。

)​

我踏马…返回状态是正常了,但返回的是一个反爬虫的验证码页面。

还是 被亚马逊给挡住了。

三、selenium自动化模块

相关selenium模块的安装

1
复制代码pip install selenium

代码中引入selenium,并设置相关参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
python复制代码import os
from requests.api import options
from selenium import webdriver
from selenium.webdriver.chrome.options import Options

#selenium配置参数
options = Options()
#配置无头参数,即不打开浏览器
options.add_argument('--headless')
#配置Chrome浏览器的selenium驱动
chromedriver="C:/Users/pacer/AppData/Local/Google/Chrome/Application/chromedriver.exe"
os.environ["webdriver.chrome.driver"] = chromedriver
#将参数设置+浏览器驱动组合
browser = webdriver.Chrome(chromedriver,chrome_options=options)

测试访问

1
2
3
4
ini复制代码url = "https://www.amazon.com"
print(url)
#通过selenium来访问亚马逊
browser.get(url)

返回结果:状态码:200

分析:返回状态码是200了,访问状态正常,我们再看看爬到的网页信息。

将网页源码保存到本地

1
2
3
4
5
lua复制代码#将爬取到的网页信息,写入到本地文件
fw=open('E:/amzon.html','w',encoding='utf-8')
fw.write(str(browser.page_source))
browser.close()
fw.close()

打开我们爬取的本地文件,查看 ,

我们已经成功越过了反爬虫机制,进入到了Amazon的首页

)​

结局

通过selenium模块,我们可以成功的越过

亚马逊的反爬虫机制。

下一篇:我们继续介绍,如何来爬取亚马逊的数十万商品信息及评论。

【有问题,请留言~】

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

SpringBoot项目整合FastDFS+Nginx实现图

发表于 2021-06-16

FastDFS概述

  • FastDFS是一个开源的轻量级分布式文件系统,它对文件进行管理,功能包括:文件存储、文件同步、文件访问(文件上传、文件下载)等,解决了大容量存储和负载均衡的问题。特别适合以文件为载体的在线服务,如相册网站、视频网站等等。
  • FastDFS为互联网量身定制,充分考虑了冗余备份、负载均衡、线性扩容等机制,并注重高可用、高性能等指标,使用FastDFS很容易搭建一套高性能的文件服务器集群提供文件上传、下载等服务。
  • FastDFS由阿里资深架构师余庆开发。
    在这里插入图片描述

Fastdfs原理

FastDFS包含Tracker Server和Storage Server;
客户端请求Tracker Server进行文件的上传与下载;
Tracker Server调度Storage Server最终完成上传与下载。
在这里插入图片描述

  • Tracker (追踪者)
+ 作用是负载均衡和调度,它管理着存储服务(Storage Server),可以理解为:“大管家,追踪者,调度员”;
+ Tracker Server可以集群,实现高可用,策略为“轮询”。
  • Storage (贮存器)
+ 作用是文件存储,客户端上传的文件最终存储到storage服务器上;
+ storage集群采用分组的方式,同组内的每台服务器是平等关系,数据同步,目的是实现数据备份,从而高可用,而不同组的服务器之间是不通信的;
+ 同组内的每台服务器的存储量不一致的情况下,会选取容量最小的那个,所以同组内的服务器之间软硬件最好保持一致。
+ Storage Server会连接集群中的所有Tracker Server,定时向他们汇报自己的状态,例如:剩余空间,文件同步情况,文件上传下载次数等信息。

上传文件流程
在这里插入图片描述
查询文件流程
在这里插入图片描述

安装Fastdfs

1、安装gcc

1
r复制代码yum install -y gcc gcc-c++

2、下载libfastcommon到/usr/local下

1
2
bash复制代码cd /usr/local
wget https://github.com/happyfish100/libfastcommon/archive/V1.0.7.tar.gz

3、解压libfastcommon

1
2
bash复制代码tar -zxvf V1.0.7.tar.gz
cd libfastcommon-1.0.7

4、安装libfastcommon

1
2
bash复制代码./make.sh
./make.sh install

5、下载fastdfs

1
bash复制代码wget https://github.com/happyfish100/fastdfs/archive/V5.05.tar.gz

6、解压fastdfs并安装

1
2
3
4
bash复制代码tar -zxvf V5.05.tar.gz
cd fastdfs-5.05/
./make.sh
./make.sh install

7、将conf目录下的所有文件复制到/etc/fdfs/

1
bash复制代码cp /usr/local/fastdfs-5.05/conf/* /etc/fdfs/

8、配置tracker

1
2
bash复制代码cd /etc/fdfs
vi tracker.conf

主要配置

1
2
3
4
ini复制代码#端口号
port=22122
#基础目录(Tracker运行时会向此目录存储storage的管理数据)
base_path=/usr/local/fastdfs

如果base_path不存在,则需要创建目录

1
bash复制代码mkdir /usr/local/fastdfs

9、配置storage

1
2
bash复制代码cd /etc/fdfs
vi storage.conf

主要配置

1
2
3
4
5
6
7
8
9
10
11
12
ini复制代码#配置组名
group_name=group1
#端口
port=23000
#向tracker心跳间隔(秒)
heart_beat_interval=30
#基础目录,目录不存在,需要自行创建
base_path=/usr/local/fastdfs
#存放文件的位置,目录不存在,需要自行创建
store_path0=/usr/local/fastdfs/fdfs_storage
#配置tracker服务器:IP
tracker_server=192.168.31.168:22122

10、启动服务
启动tracker

1
bash复制代码/usr/bin/fdfs_trackerd /etc/fdfs/tracker.conf restart

启动storage

1
bash复制代码/usr/bin/fdfs_storaged /etc/fdfs/storage.conf restart

查看服务

1
复制代码netstat -ntlp

在这里插入图片描述

整合Nginx模块

1、上传fastdfs-nginx-module_v1.16.tar.gz 到/usr/local
2、解压nginx模块

1
复制代码tar -zxvf fastdfs-nginx-module_v1.16.tar.gz

3、修改config文件,将文件中的 /usr/local/ 路径改为 /usr/

1
2
lua复制代码cd /usr/local/fastdfs-nginx-module/src
vi config

在这里插入图片描述
4、将fastdfs-nginx-module/src下的mod_fastdfs.conf拷贝至/etc/fdfs下

1
bash复制代码cp mod_fastdfs.conf /etc/fdfs/

5、修改/etc/fdfs/mod_fastdfs.conf

1
2
3
4
5
6
7
8
ini复制代码vi /etc/fdfs/mod_fastdfs.conf
内容:
base_path=/usr/local/fastdfs
tracker_server=192.168.31.168:22122
#url中包含group名称
url_have_group_name=true
#指定文件存储路径(上面配置的store路径)
store_path0=/usr/local/fastdfs/fdfs_storage

6、将libfdfsclient.so拷贝至/usr/lib下

1
bash复制代码cp /usr/lib64/libfdfsclient.so /usr/lib/

7、创建nginx/client目录

1
bash复制代码mkdir -p /var/temp/nginx/client

安装Nginx

1、 将nginx-1.8.0.tar.gz上传到/usr/local
2、解压:tar -zxvf nginx-1.8.0.tar.gz
3、安装依赖库

1
2
3
4
5
6
复制代码yum install pcre
yum install pcre-devel
yum install zlib
yum install zlib-devel
yum install openssl
yum install openssl-devel

4、进入nginx解压的目录下:

1
bash复制代码cd /usr/local/nginx-1.8.0

5、安装

1
2
3
4
5
6
7
8
9
10
11
12
13
javascript复制代码./configure \
--prefix=/usr/local/nginx \
--pid-path=/var/run/nginx/nginx.pid \
--lock-path=/var/lock/nginx.lock \
--error-log-path=/var/log/nginx/error.log \
--http-log-path=/var/log/nginx/access.log \
--with-http_gzip_static_module \
--http-client-body-temp-path=/var/temp/nginx/client \
--http-proxy-temp-path=/var/temp/nginx/proxy \
--http-fastcgi-temp-path=/var/temp/nginx/fastcgi \
--http-uwsgi-temp-path=/var/temp/nginx/uwsgi \
--http-scgi-temp-path=/var/temp/nginx/scgi \
--add-module=/usr/local/fastdfs-nginx-module/src

编译、安装

1
2
go复制代码make
make install

安装成功
6、拷贝配置文件http.conf和mime.types

1
2
bash复制代码cd /usr/local/fastdfs-5.0.5/conf
cp http.conf mime.types /etc/fdfs/

7、修改nginx配置文件

1
2
bash复制代码cd /usr/local/nginx/conf/
vi nginx.conf

在这里插入图片描述
8、关闭nginx,并启动nginx

1
2
bash复制代码pkill -9 nginx
/usr/local/nginx/sbin/nginx -c /usr/local/nginx/conf/nginx.conf

在这里插入图片描述
9、启动nginx
在这里插入图片描述

SpringBoot整合Fastdfs

1、创建SpringBoot项目
2、引入依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
xml复制代码 <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.github.tobato</groupId>
<artifactId>fastdfs-client</artifactId>
<version>1.26.4</version>
</dependency>

3、启动类上配置

1
2
less复制代码@Import(FdfsClientConfig.class)
@EnableMBeanExport(registration = RegistrationPolicy.IGNORE_EXISTING)

4、配置文件

1
2
3
4
5
ini复制代码fdfs.so-timeout=3000
fdfs.connect-timeout=1000
fdfs.thumb-image.height=60
fdfs.thumb-image.width=60
fdfs.tracker-list=192.168.31.168:22122

5、控制器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码@Controller
public class UploadController {

public static final String DIR = "http://192.168.31.168/";

@Autowired
private FastFileStorageClient client;

@RequestMapping("login")
public String login(){
return "login";
}

@ResponseBody
@RequestMapping(value = "/upload",method = RequestMethod.POST)
public JsonResult upload(MultipartFile file) throws IOException {
//获得后缀名
String extension = FilenameUtils.getExtension(file.getOriginalFilename());
//上传
StorePath storePath = client.uploadFile(file.getInputStream(), file.getSize(), extension, null);
System.out.println("save:" + storePath.getFullPath());
return new JsonResult(1,DIR + storePath.getFullPath());
}
}

Java对象

1
2
3
4
5
6
vbnet复制代码public class JsonResult {

private Integer code;
private Object data;
//get/set/constructor
}

6、测试页面
页面使用了Vue+ElementUI
在这里插入图片描述

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
xml复制代码<!DOCTYPE html>
<html lang="en" xmlns:th="http://www.thymeleaf.org">
<head>
<meta charset="UTF-8">
<title>上传</title>
<link rel="stylesheet" href="/elementui/index.css">
<style>
...
</style>
</head>
<body>
<div id="app">
<el-card >
<el-upload
class="avatar-uploader"
action="/upload"
:show-file-list="false"
:on-success="handleAvatarSuccess">
<img v-if="imageUrl" :src="imageUrl" class="avatar">
<i v-else class="el-icon-plus avatar-uploader-icon"></i>
</el-upload>
</el-card>
</div>
<script src="/vue/vue.js"></script>
<script src="/elementui/index.js"></script>
<script>
new Vue({
el:"#app",
data(){
return{
imageUrl: ''
}
},
methods:{
handleAvatarSuccess(res, file) {
console.log(res);
this.imageUrl = res.data;
}
}
})
</script>
</body>
</html>

上传效果
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

关于微信支付的一些坑

发表于 2021-06-16

其实关于微信支付,网上已经有很多的案例.今天主要是介绍下关于微信支付,退款,转账中遇到的那些容易而且极易被忽略的一些坑.至于key,mcid,oppid和证书下载网上已经很全面了
项目依赖

1
2
3
4
5
java复制代码    <dependency>
<groupId>com.github.wxpay</groupId>
<artifactId>wxpay-sdk</artifactId>
<version>0.0.3</version>
</dependency>

配置类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
java复制代码public class WXConfigUtil implements WXPayConfig {

private final byte[] certData;

private String mchId;

private String key;


public WXConfigUtil(String certPath, String key, String mchId) throws Exception {
//从微信商户平台下载的安全证书存放的路径
File file = new File(certPath);
InputStream certStream = new FileInputStream(file);
ByteArrayOutputStream bos = new ByteArrayOutputStream();
byte[] b = new byte[1024];
int n;
while ((n = certStream.read(b)) != -1) {
bos.write(b, 0, n);
}
certStream.close();
bos.close();
//将读入的证书加载到cerData(这一步在网上的文章几乎都是没用)
this.certData = bos.toByteArray();
this.mchId = mchId;
this.key = key;
}

@Override
public String getAppID() {
return WechatConstant.APP_ID;
}

@Override
public String getMchID() {
return mchId;
}

@Override
public String getKey() {
return key;
}

@Override
public InputStream getCertStream() {
//请求时获取证书
return new ByteArrayInputStream(this.certData);
}

@Override
public int getHttpConnectTimeoutMs() {
return 8000;
}

@Override
public int getHttpReadTimeoutMs() {
return 10000;
}

}

上面的配置其实比较关键的就是证书读取的配置.废话也不多数,下面就是统一的下单接口的处理:
支付:(这里已app支付为例)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
java复制代码  Map<String, String> response;
try {
WXConfigUtil wxConfigUtil = new WXConfigUtil(certPath, key, mchid);
WXPay wxPay = new WXPay(wxConfigUtil);
Map<String, String> data = new HashMap<>();
//随机生成商户订单号

System.out.println("商户订单号------------" + orderNo);

//商户号
data.put("mch_id", wxConfigUtil.getMchID());
//随机字符串
data.put("nonce_str", WXPayUtil.generateNonceStr());
//商品描述
data.put("body", "描述");
//商品订单号
data.put("out_trade_no", orderNo);
// 总金额(单位分)
data.put("total_fee",100);
//终端IP
data.put("spbill_create_ip", "124.xx.xx.xx");
//回调地址
data.put("notify_url", notifyUrl);
//appid
data.put("appid", wxConfigUtil.getAppID());
//交易类型
data.put("trade_type", "APP");
data.put("attach", wechatVo.getUserId().toString());
//生成签名
String sign = WXPayUtil.generateSignature(data, wxConfigUtil.getKey(), WXPayConstants.SignType.MD5);
data.put("sign", sign);
String str = WXPayUtil.mapToXml(data);
System.out.println("map转xml" + str);
System.out.println("我给的数据是" + data);
System.out.println("第一次签名------------------" + sign);
//使用官方API请求预付订单
response = wxPay.unifiedOrder(data);
} catch (Exception e) {
e.printStackTrace();
return null;
}
//返回结果,可以自己封装
return response;

企业转账到个人:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
java复制代码  Map<String, String> restmap = null;
try {
WXConfigUtil wxConfigUtil = new WXConfigUtil(certPath, key, mchid);
WXPay wxPay = new WXPay(wxConfigUtil);
Map<String, String> param = new HashMap<String, String>();
//公众账号appid
param.put("mch_appid", wxConfigUtil.getAppID());
//商户号
param.put("mchid", wxConfigUtil.getMchID());
//随机字符串
param.put("nonce_str", WXPayUtil.generateNonceStr());
//商户订单号
param.put("partner_trade_no", "");
//用户openid
param.put("openid", wechatVo.getOpenId());
//校验用户姓名选项 OPTION_CHECK
param.put("check_name", "NO_CHECK");
param.put("amount","");
//企业付款描述信息
param.put("desc", "withdraw");
//服务器Ip地址
param.put("spbill_create_ip", "124.xx.xx.xx");
param.put("sign", WXPayUtil.generateSignature(param, wxConfigUtil.getKey()));
//携带证书请求
String restxml = wxPay.requestWithCert("https://api.mch.weixin.qq.com/mmpaymkttransfers/promotion/transfers", param, 15000, 15000);
assert restxml != null;
restmap = WXPayUtil.xmlToMap(restxml);
} catch (Exception e) {
System.err.println(e.getMessage());
}
//返回请求结果
return restmap;

退款:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
java复制代码     HashMap<String, String> data = new HashMap<String, String>();
String orderNo = "VxRefund_" + System.currentTimeMillis();
try {
WXConfigUtil wxConfigUtil = new WXConfigUtil(certPath, key, mchid);
WXPay wxPay = new WXPay(wxConfigUtil);
data.put("appid", wxConfigUtil.getAppID());
data.put("mch_id", wxConfigUtil.getMchID());
data.put("nonce_str", WXPayUtil.generateNonceStr());
//微信订单号 流水号____
data.put("transaction_id", "支付的流水号");
//商户退款单号 退款单号 ___
data.put("out_refund_no", orderNo);
//支付金额,微信支付提交的金额是不能带小数点的,且是以分为单位,这边需要转成字符串类型,否则后面的签名会失败
data.put("total_fee", "");
//退款总金额,订单总金额,单位为分,只能为整数
data.put("refund_fee", "");
data.put("op_user_id", wxConfigUtil.getMchID());
//MD5运算生成签名,这里是第一次签名,用于调用统一下单接口
String sign = WXPayUtil.generateSignature(data, wxConfigUtil.getKey());
//生成签名
data.put("sign", sign);
//使用携带证书请求
String s = wxPay.requestWithCert("https://api.mch.weixin.qq.com/secapi/pay/refund", data, 15000, 15000);
Map<String, String> response = WXPayUtil.xmlToMap(s);
if ("SUCCESS".equals(returnCode) && returnCode.equals(resultCode)) {
//todo 自己的业务逻辑
} else {
return ResultInfo.error("退款失败");
}

} catch (Exception e) {
e.printStackTrace();
}
return ResultInfo.successful("退款成功");

统一回调:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
java复制代码 StringBuilder sb = new StringBuilder();
try {
//在获取wx回调数据
BufferedReader br = new BufferedReader(new InputStreamReader(request.getInputStream()));
String line = null;
new StringBuilder();
while ((line = br.readLine()) != null) {
sb.append(line);
}
br.close();
} catch (IOException e) {
e.printStackTrace();
}
//sb为微信返回的xml
String notityXml = sb.toString();

System.out.println("======================微信支付异步结果逻辑处理开始=================================");
WXConfigUtil config = null;
try {
config = new WXConfigUtil(certPath, key, mchid);
} catch (Exception e) {
e.printStackTrace();
}
WXPay wxpay = new WXPay(config);
String xmlBack = "";
Map<String, String> notifyMap = null;
try {
// 调用官方SDK转换成map类型数据
notifyMap = WXPayUtil.xmlToMap(notityXml);
System.out.println("返回的map----------------" + notifyMap);
//验证签名是否有效,有效则进一步处理
System.out.println("返回的错误代码--------" + notifyMap.get("err_code") + "返回的错误信息--------" + notifyMap.get("err_code_des"));
if (wxpay.isPayResultNotifySignatureValid(notifyMap)) {
//状态
String returnCode = notifyMap.get("return_code");
//商户订单号
String outTradeNo = notifyMap.get("out_trade_no");
String userId = notifyMap.get("attach");
if ("SUCCESS".equals(returnCode)) {
if (outTradeNo != null) {
//业务数据持久化
String transactionId = notifyMap.get("transaction_id");
//todo 业务逻辑
//..........
System.err.println("-------------------------------支付成功----------------------");
xmlBack = "<xml>" + "<return_code><![CDATA[SUCCESS]]></return_code>" + "<return_msg><![CDATA[OK]]></return_msg>" + "</xml> ";
} else {
xmlBack = "<xml>" + "<return_code><![CDATA[FAIL]]></return_code>" + "<return_msg><![CDATA[报文为空]]></return_msg>" + "</xml> ";
}
}
} else {
// 签名错误,如果数据里没有sign字段,也认为是签名错误
//失败的数据要不要存储?
xmlBack = "<xml>" + "<return_code><![CDATA[FAIL]]></return_code>" + "<return_msg><![CDATA[报文为空]]></return_msg>" + "</xml> ";
}
} catch (Exception e) {
xmlBack = "<xml>" + "<return_code><![CDATA[FAIL]]></return_code>" + "<return_msg><![CDATA[报文为空]]></return_msg>" + "</xml> ";
}
return ResultInfo.successful(xmlBack);

以上只是一些浅显的业务逻辑,希望对大家有所帮助

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

模板方法 (宝,我输液了,输的想你的夜)

发表于 2021-06-16

本文 GitHub github.com/JavaFamily 已收录,有一线大厂面试完整考点、资料以及我的系列文章。

大家每到一家公司都会发现,每个公司都会有一个规范,比如说请假流程规范,代码规范等等。每个公司都有这个流程,只是里面的具体执行条件不一样而已。

在设计模式中的模版方法模式,也是可以理解为一种规范模版。主要是提升我们代码的复用性,以及扩展等问题。

这样的模板方法在我们当舔狗跟妹妹们聊天的时候也是可以用到的,比如这样一个模板:

“宝,XXXX了,XXXX什么XX?X你的XXX”

当我拿到这样一个模板的时候,我就可以举一反三直接套用了,我们直接填参数就可以了,比如:

“宝,我打疫苗了,打的什么苗 ,爱你的每一秒 ”

“宝,我做核酸了,做的什么酸,得不到你的心酸”

“宝,今天去输液了,输的什么液,想你的夜”

………..

好了言归正传,在框架中模版方法模式也是很常见的。

今天就具体来聊聊设计模式中行为型设计模式中模版方法模式。

设计模式系列往期文章:

  • 单例模式
  • 工厂模式
  • 流程引擎
  • 建造者模式
  • 原型模式
  • 责任链模式
  • 观察者模式
  • 策略模式

大纲

还是老规矩从上图五个方面来分别具体和大家聊聊模版方法模式

定义

模版方法模式的定义以及目的?

  • 定义:模板方法模式在一个方法中定义一个算法骨架,并将某些步骤推迟到子类中实现。模板方法模式可以让子类在不改变算法整体结构的情况下,重新定义算法中的某些步骤
  • 目的:1.使用模版方法模式的目的是避免编写重复代码,以便开发人员可以专注于核心业务逻辑的实现

​ 2.解决接口与接口实现类之间继承矛盾问题

以上定义来自《设计模式之美》

结构图:

  • AbstractTemplate(抽象模版):定义一系列抽象方法,或者实现的方法,又或者是钩子方法。即:定义流程
  • ConcreteTemplate(具体模版):实现父类抽象方法,基于本身不同的模版业务逻辑,实现不同的业务逻辑代码。即:抽象方法实现相同,内部逻辑不同

整个结构图看起来还是很简单的,但是还是要理解设计模式解决什么问题。

代码实现?还是举例吧。

还是以上面的请假举例吧,假设现在A公司请假需要直属领导审批以及通知HR有人请假了就可以了,B公司需要直属领导,部门负责人审批最后通知HR,方能完成整个请假流程。那作为OA办公流程怎么去处理这个问题嘛?直接看代码实现吧!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码public abstract class AskForLeaveFlow {

// 一级组长直接审批
protected abstract void firstGroupLeader(String name);

// 二级组长部门负责人审批
protected void secondGroupLeader(String name) {
}

// 告知HR有人请假了
private final void notifyHr(String name) {
System.out.println("当前有人请假了,请假人:" + name);
}

// 请假流模版
public void askForLeave(String name) {
firstGroupLeader(name);
secondGroupLeader(name);
notifyHr(name);
}

}

首先还是定义一个请假流程,其中:

firstGroupLeader方法为abstract修饰,则作为子类都是必须要实现的

secondGroupLeader 二级领导审批,在子类中可以重写,也可不重写

notifyHr 方法为通知HR,已经内部实现

最后一个askForLeave请假流程方法,把以上模版方法串起来

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码public class CompanyA extends AskForLeaveFlow {

@Override
protected void firstGroupLeader(String name) {
System.out.println("CompanyA 组内有人请假,请假人:" + name);
}
}

public class CompanyB extends AskForLeaveFlow {
@Override
protected void firstGroupLeader(String name) {
System.out.println("CompanyB 组内有人请假,请假人:" + name);
}
@Override
protected void secondGroupLeader(String name){
System.out.println("CompanyB 部门有人请假,请假人:" + name);
}
}

在CompanyA以及CompanyB中,secondGroupLeader二级领导可以选择重写或者不重写,这个类模版方法简称为钩子方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码public class testTemplate {
public static void main(String[] args) {
// 公司A请假流程模版
AskForLeaveFlow companyA = new CompanyA();
companyA.askForLeave("敖丙");
// 结果:CompanyA 组内有人请假,请假人:敖丙
// 当前有人请假了,请假人:敖丙

AskForLeaveFlow companyB = new CompanyB();
companyB.askForLeave("敖丙");
// 结果:CompanyB 组内有人请假,请假人:敖丙
// CompanyB 部门有人请假,请假人:敖丙
// 当前有人请假了,请假人:敖丙
}
}

最后就是看测试dome结果了。companyA和companyB分别输出了对应的请假流程。

细心的同学可能已经发现了,做为模版方法中里面除了可以有抽象方法外,还可以有具体的实现方法以及钩子方法。

所以大家在应用的过程可以多考虑考虑在内部定义模版方法时,应该定义成抽象方法还是其它的。

框架中的应用

模版方法模式在我们常见的Java的框架中也是非常常见的,只是可能我们平时没有注意到这一点而已。

第一个:首先我们学SpringMVC的时候,最开始都会写一些Servlet来作为处理一些post或者get请求等。

这里直接看这个源码大家就可以发现这也是直接使用模版方法模式的思想,期间在HttpServlet 继承GenericServlet中也还是模版方法的体现,这说明了可以多次抽象构建模版。

第二个:常见问的文件流中,Java IO 类中的InputStream、OutputStream、Reader、Writer等都能看到模版方法模式的身影。

上面是我贴出的部分InputStream的源码,主要看这个read模版方法,也就是模版方法模式的体现。

当然IO类中还有很多其他的,我就不一一贴源码出来了,感情兴趣的同学,可以自己打开源码了解了解。

业务举例

在业务中怎么使用模版方法?

首先需要理解模版方法它是为了增加代码的复用性,以及扩展性而存在的,所以本着这个思想我还是给大家举一个例子吧。

之前写责任链模式最后给大家举例商品详情,这次还是用商品详情,但是用模版方法模式来实现这个问题,理解为商详2.0版本。

商品详情展示我们可以是分模块展示的,比如头图,商品信息,sku信息,配送地址,分期付费等等。

那么怎么进行组装到商品详情的展示呢?

流程图:

可以看到一个请求过来,可以有模块组装器选择组装返回结果。

提一个点,在第二步请求的模块的时候为了减少整个链路的请求时间可以考虑是串行,或者并行(开线程池处理)。

接下来直接看代码吧

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码public abstract class AbstractTemplateBlock<T> {
// 组装结果
public T template(ModelContainer modelContainer) {
T block = initBlock();
try {
this.doWork(modelContainer, block);
} catch (Exception e) {
// 可以选择捕获异常,是中断流程,还是只打印日志,不中断流程
}
return block;
}
// 初始化构建返回结果模型
protected abstract T initBlock();
// 定义抽象模版
protected abstract void doWork(ModelContainer modelContainer, T block) throws Exception;
}

还是先创建模版Block

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码@Component
public class ItemInfoBlock extends AbstractTemplateBlock<ItemInfoBlock.ItemInfo> {
@Override
protected ItemInfoBlock.ItemInfo initBlock() {
return new ItemInfoBlock.ItemInfo();
}

// 模拟业务逻辑,组装返回商品信息模块数据
@Override
protected void doWork(ModelContainer modelContainer, ItemInfo block) throws Exception {
block.setItemId(123L);
block.setItemName("测试");
}
@Data
public static class ItemInfo {
private Long itemId;
private String itemName;
}
}

这里只写了一个ItemInfoBlock,其他的模块也是这一样的写法,所以就不全写出来了。

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码    public static void main(String[] args) {
// 1.模拟获取SpringBean
ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:applicationContext.xml");
ItemInfoBlock itemInfoBlock = (ItemInfoBlock) applicationContext.getBean("itemInfoBlock");

// 2. ModelContainer可以理解为贯穿上下文中的请求参数,或者一些组装数据需要的预加载数据
ModelContainer modelContainer = new ModelContainer();
// 3. 获取返回结果
ItemInfoBlock.ItemInfo itemInfo = itemInfoBlock.template(modelContainer);
System.out.println(JSON.toJSONString(itemInfo));
// 结果:{"itemId":123,"itemName":"测试"}
}

最后就是看测试demo了,可以看到再每一个模块中都是有一个AbstractTemplateBlock,内部包含doWork抽象方法,由子类去实现当前自己的业务逻辑。

同时第三步获取返回结果时,我只是单独列出来,大家可以根据实际情况还能做改造。比如说返回map结构等 mapKey 是模块名称,value是数据。

当前这种组装商品详情的模式也是比较常见的一种方式。代码的复用性高,同时扩展性也有一定的体现,符合模版方法模式的思想。

总结

模版方法模式的特点大家应该也能体会到了,适用场景还是为了增加代码的复用性,以及扩展性。

还是那句话存在即合理,不要因设计模式而在写代码时强行嵌套。合理的学习每种设计模式适合场景,解决什么问题。

宝,明天我可能无法正常更新了,我生病了在输液。

我是敖丙,你知道的越多,你不知道的越多,感谢各位人才的:点赞、收藏和评论,我们下期见!


文章持续更新,可以微信搜一搜「 三太子敖丙 」第一时间阅读,回复【资料】有我准备的一线大厂面试资料和简历模板,本文 GitHub github.com/JavaFamily 已经收录,有大厂面试完整考点,欢迎Star。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

巧用策略模式完美应付产品四次需求变更,也吵了四次|2021

发表于 2021-06-16

前言

  • 设计模式大家应该很熟悉了,使用最多的应该就是工厂模式。关于工厂模式有简单工厂、懒汉工厂、饿汉工厂等等形式,今天我们结合项目场景来总结下策略模式

项目需求

image-20210615101701014

  • 上面是我们需求效果图!我们需要针对个人对本年度指标完成情况进行一次统计。
  • 比如上面test用户在xxx年份中有5个指标考核。每个指标考核维度不一样,对于指标1考核目标是一天施工一次,然后对一个月进行汇总考核有点类似于上班打卡的形式。而他打卡的方式就是后方的施工按钮。对于指标1他只需要每天点击施工填写响应的数据即可!关于施工后填写数据就是一份表单数据这个是非常简单的。
  • 包括后面1月1次、1季度1次、半年1次、1年1次这些考核频率都是同样的操作。这份报表统计应该也很容易没啥难点。我们只需要在生成的时候按照月份查找施工记录是否符合当前频率要求!

image-20210615111422888

  • 在项目中对频率进行判断。不同的频率我们做不一样的逻辑就完事了。这个可以说是初级程序!!!这样完全没有考虑到语言或者说设计模式。这种思路实现完全没有问题但是没有考虑到后续的扩展性!
  • 首先代码会变得很长不利于他人阅读理解。其次是没有做到相互隔离如果其中一个分支出现问题需要进行修改这时候对其他的分支实际上是一次污染,因为他们都在一起,因为修改某个分支从而导致其他分支异常的情况是经常出现的。这种做法也是我之前最喜欢的方式—不知道已设计模式的角度考虑问题

瓶颈突破

  • 成长是一条艰辛的道路,但是对于别人来说可能就是那么一瞬间!初级成长成中级程序员我们需要多少努力只有经历之后才知道但是对别人来说可能会认为是一次代码的事情!今天我就抛弃到传统的流水账形式开发,结合项目实战运用起设计模式。
  • 经过仔细分析了之后觉得设计模式中的策略模式很是适合该需求。我们可以将频率抽象成策略 ,我们先来看看下面我的图示

image-20210615111158546

  • 首先将策略进行抽象成AbstrachTimeStrategy 该类主要责任就是对数据进行分析,接收两个参数分别是Map , bean 。前者是用户月份详情数据,后者是根据频率相关的解析数据。在每个策略中针对单一的场景进行具体分析,每个类具有了单一的职责这样将他们进行分离,比如说上面PerHalfSomeTimesStrategy出现的bug那么我们只需要对他进行修复,其他的策略是不会受影响的。
  • 这是从宏观层面进行相互隔离。我们进入到每个策略中也是很简单了。每个策略我们只需要定制化的考虑具体的场景。比如PerDaySomeTimesStrategy策略对应的一天N次的频率。我们在里面只需要获取到指定月份的施工记录并对这些记录按天分组,最终看看每天的完成情况是否满足我们N次的要求就可以了。而不需要去考虑季度、年的频率问题
  • 运用策略模式还有一个好处是我们可以随意的装配!如果现在我们的需求是多个频率只需要满足其中一个就可以了。那么我们只需要将多个频率对应的策略进行分析下就可以得出结论!如果是之前流水账形式的开发的话我们肯定是需要将代码进行重构下!说的好听的是重构,难听的就个话其词了。
  • 当然除了策略以外,我们还需要一个工厂类。这个工厂类是做啥子用的呢?因为策略本身只需要考虑策略的场景。至于谁去组装这些策略那就是我们的工厂的活了。

策略组装

image-20210615114206964

  • 首相该工厂提供了注册器和获取对应策略的两个入口。另外注册器内部是将具体策略已别名的方式注册到内部的一个map中。为了方便我直接在工厂中添加了目前我们需求确定的策略。后续如果需求变动我就不在对工厂进行修改而是直接通过注册器向内部进行注册新的策略。
1
2
3
4
5
6
7
8
java复制代码static {
strategyMap.put("month", new PerMonthSomeTimesStrategy());
strategyMap.put("halfMonth", new PerHalfMonthSomeTimesStrategy());
strategyMap.put("year", new PerYearSomeTimesStrategy());
strategyMap.put("day", new PerDaySomeTimesStrategy());
strategyMap.put("squash", new PerSquarshSomeTimesStrategy());
strategyMap.put("halfyear", new PerHalfYearSomeTimesStrategy());
}
  • 上述就是我们系统内部确定的6中策略。剩下的就是我们在系统需要借助策略分析的时候通过工厂的获取策略器进行分析
1
java复制代码TimeStrategy strategy = ContextTimeStrategyFactory.getInstance().getStrategy(cycle.getCycleName());
  • 这样子的代码看起来真的很舒服!就算是其他人来阅读也很容易理解。而且还用担心其他接手的XD改出问题!想一想当我将内置的功能如果进行maven进行分模块开发那么其他人不论怎么修改都不会影响到我们已有的6中策略了。设计模式真的是我们程序的救星啊!

需求变更

  • 实际开发中面对需求变更我们程序员都是如何面对的呢?不管怎么面对最终只能默默的接受!上面我们的需求是统计个人指标完成情况。针对该需求我特意仔细研究了一下最终决定使用策略模式。策略模式的好处就是方便扩展,这么我们的产品又提出一种方式需要按照周的维度去考核完成情况。接到这个需求我是悲喜各占一半吧!假装性的吐槽了下产品为什么没有在初期确定好需求。但是实际上在策略模式的基础上还是很好扩展这个功能的。

image-20210615133219606

  • 对!就是这么简单我们只需要实现一个继承了抽象算法类的接口,然后将新实现的算法注册到工厂内部map中。然后就会在对应的频率中使用到我们的新的策略!我也是嗖嗖嗖嗖的进行了升级不一会就完成了!本次需求的变更完全是意料之中的升级!交工之际还不忘感慨下自己的明智!!

需求白热化

落地

  • 有时候一度认为产品就是程序员的天敌。他们会无时无刻折磨你!这不刚刚新增了一个策略需求还没等你缓过来需求又改动了!我也只能兵来将挡水来土掩!
  • 之前是在月末进行统一计算,因为用户对数据没有实时性要求所以我们是定时器在每月末进行计算的。现在需求需要做到计算是实时计算的。这就让我这个本不富裕的程序员更加的雪上加霜了!这不是逼着我加班吗?下面就开始了我和产品之间的【火热讨论】了。作为弱势群体最终结果可想而知
  • 那么我们在对需要进行一次分析。我们的考核是针对个人在指定周期内的,所以想要实现实时的那么我们就得监控到施工记录中的数据。根据施工中变动的数据决定对哪些人进行重新计算考核情况

image-20210615135557541

  • 施工记录中张三数据发生了新增、修改、删除等操作都会影响到我们考核详情中张三对应的数据情况。我们根据姓名能够匹配到考核对象在根据张三施工记录这条数据的时间可以匹配到具体会影响到哪一月的数据;根据这些信息我们能够精准定位到数据。
  • 定位到数据之后我们就需要对数据进行更新!这涉及到到两种方式:增量+全量
  • 如果是增量我们需要区分是新增、修改、删除的情况在对考核完成数量进行响应的增加、减少;全局考虑一下增量是最适合的方式因为这样计算量是最小的但是对于我们的编程工作量确很大!作为资深摸鱼师我果断放弃了这种方式,选择全量模式
  • 全量模式相对简单很多!我们只需要检测到对应的人员的时间发生施工变化时我们就重新计算该人员的月份下的完成数据!当然我也不是完全为了自己方便选择全量模式的。也是结合了数据量的大小决定的。因为实际生产中一个月的数据不会过200!这个是分析了客户之前的历史数据得出的结论!数据是不会说谎的。

增量

  • 虽然最终没有选择增量,但是在我还是实现了增量模式的骨架!也是为了方便以后扩展。如果日后数据量出现激增的情况增量就是最优方式。关于增量上面说了需要考虑【施工记录】的新增、修改、删除的情况。而策略模式就是为什么解决我们的分支判断的。所以这里显然又是一个策略模式

image-20210615140701908

  • 同样的配方、同样的开发人员我们就是这么任性!通过策略模式工程就可以提供出具体的策略来实现了。剩下的就是在数据新增时通过工厂获取到InsertDataHandlerStrategyImpl算法去解析就行了。
  • 每种策略计算出数据之后根据策略的特性在原有基础上对完成情况进行加减就可以了。
  • 封装好方法之后就开始找负责【施工记录】的同学麻烦他们在响应的方法上添加策略的调度!

解耦

  • 上面不管是增量还是全量都需要【施工记录】的同学对我们模块进行通知!当时找他们的时候也是抱怨不断!抱怨我们总是需求不定给他们带来很多不必要的工作量!
  • 我本人很是赞成他们的抱怨,回来也和我们的产品反应了情况。我是两头受气产品的意见就是这个需求必须做必须准时上线!又是一个黑夜,又是一杯茶一包烟陪我度过。最终我决定使用消息队列来进行解耦这样对【施工记录】的同学来说工作量最小而且日后我们也会摆脱对他们的要求。
  • 最终就是让【施工记录】同学在对施工记录进行增删改查的时候想消息队列中心中投递消息!我们模块在订阅消息,接收到消息后我们根据消息中约定好的类型就可以知道是新增还是修改还是删除了。然后我们就可以调用对应的策略进行增量数据修复了。或者说直接调用全量模式方法进行数据重新计算了。不管怎么样接收到消息之后就是我们自己内部组装的事情了。
  • 而且后续我们还可以基于订阅的消息在进行开发其他的功能!真实一举两得。

我已阵亡

  • 添加周频率考核目标、实时计算考核两个需求的变动已经让我暴跳如雷了。产品又抛出一个需求需要清洗历史数据!理由是项目上线客户需要维护历史数据
  • 不过这个需求我觉得还是很合理的。所以默默地又是自己背下了所有。针对历史数据清洗准确的说也是自己没有考虑那么长远既然需求来了而且很合理那就开始做吧
  • 本次变动不需要我们在架构上有啥改动。唯一改动的就是之前的策略算法。通过消息队列接收到变动后需要计算当前时间和考核时间的关系如果确定是历史数据的则会认定是延期状态。这里还涉及到按季度、半年、年等跨多月的情况。如果是跨多月恰巧数据不再考核月,我们还需要对月份进行纠偏操作。

image-20210615142500178

  • 这样客户在当前时间上传之前月份的数据时状态也会重新计算的。

新增考核依据

  • 所以说永远不能相信产品说的话!这么现在又想修改考核依据。不在已【施工记录】为最终考核依据而是需要将考核依据与指标项进行绑定。也就是说每个指标对应自己的一个考核依据我们需要根据对应的考核依据进行计算!
  • 辛苦之前我和【施工记录】的同学们约定了将操作放到消息队列中!并在整个部门中进行同样的约定。现在的改动就不需要别人配合了。我直接从消息队列中监听相关的考核依据表然后在查出对应指标项就可以了。后面重复我们之前的全量模式和具体的频率策略计算就可以了

项目总结

  • 本来是个简单的功能由于一开始采用的了对的选择,所以在需求不断变更时基本上我还是在不大改代码的情况下完成了功能的升级的!
  • 而且每次需求变动基本上也是对自己当初定下的骨架进行不断的功能完善的。并没有出现因为需求变动导致骨架重新设计的场景。这也足够说明当初选择策略模式的正确性
  • 策略模式其实就是对算法的一种抽象!策略模式往往需要结合工厂模式。每种策略实现的功能是一样的像上述的场景每种策略都是对考核月数据进行考核只不过每个策略的考核依据不同而已。而策略本身值关注算法的实现而不考虑调用。所以借助于工厂来进行维护策略,方便第三方进行策略的调度!
  • 策略模式的理解最好的就是商品售价的落地实现了。商场经常会有各种各样的打折优惠有的甚至是叠加优惠。这种场景我们将各种的优惠抽象成策略我们在工厂中将各种的策略叠加使用就是平时的商品促销了!
  • 文章中也有提到每种策略只需要关心自己本身!也就是说在开发中每种策略我们可以交由不同的开发者去实现!开发者只需要自测自己的策略没问题就可以了!在架构中我们甚至可以将策略模式运用横跨语言,每种实现甚至可以交由不同语言去实现!
  • 那么什么时候应该用策略模式呢?简单的理解就是如果你的代码中出现分情况的场景,就该策略模式出马了

总结

  • 本文通过项目实战的角度应该可以说是透彻的解析了策略模式的场景!之前不想写关于策略模式在项目中的运用的。经过几次的需求变更越来越觉得当初的选择真的明智!虽然现在说起来面对需求变动感觉改的很轻松!但是当时真的是很艰难有的时候实在没办法硬逼着自己改造的。最终才实现出相对满意的模块功能。
  • 经过这个功能的开发慢慢的自己成长很多!不在是串行编程,懂得将特性就行抽象化开发这无疑是对开发者一种质的提升!面对需求的变更我不在去关注代码本身,而是将重心放在项目骨架上!只有骨架满足需求的情况下代码的开发才会变得有意义!
  • 经过一个项目的开发让我学到了设计模式并非理论!他是前人在无数个项目实战中落地总结!善用设计模式可以避免掉很多无意义加班!可以说如果不用设计模式这个项目面对多次需求变更洗礼加班不会比现在少的。而且最终成品应该也只有我自己能够看懂了。用了设计模式就不一样了,后面接手的同学应该也能够理解到我们思路,思路清晰的同学应该可以理解到当初项目需求的变更记录吧
  • 年中将至,关于个人经历总结之前已经专门发了一篇文章陈述了自己这半年以来的成长!而此篇是从技术成长的角度介绍了自己2021的收获。

点赞、关注、产生共鸣!!!

掘金年中主题活动 | 2021 我的半程成长之路征文活动正在进行中……

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

xxl-job 源码这么容易懂,那就随手画图分析下 xxl-

发表于 2021-06-16

声明

阅读本文前, 需要对 xxl-job 的使用有所了解。

正文

本文内容基于 xxl-job v2.2.0 源码。

一、调度中心和执行器

下面是 xxl-job v2.x 的架构图, 该图中包含了两大核心模块, 分别是 调度中心 和 执行器.

  • 调度中心

简单来讲就是一个 管理系统, 用户通过系统提供的管理界面可以创建任务、编辑任务、手动触发任务以及查看任务执行日志, 另外系统后台会不停地把需要执行的任务从数据中心的 任务表 中扫描出来, 然后一个个去触发任务.

  • 执行器

当任务被触发时, 不管是定时触发还是手动触发, 调度中心都会向执行器发送 http 请求, 由执行器负责任务的执行.

在这里插入图片描述

xxl-job 源码目录

在这里插入图片描述

二、创建任务

在这里插入图片描述

从图中可以看出:

  • 任务执行参数包含了诸多内容, 比如任务阻塞处理策略,执行策略等等。
  • 运行模式选择 Bean,这也是本文讲解的重点。
  • 任务执行前,还需要指定一个具体的 JobHandler 去执行。
  • 如果是周期性的任务,它在执行的过程中,我们是可以随时调整执行参数的。比如 JobHandler(这点很重要!!!)。

三、定义任务

1
2
3
4
5
6
7
8
9
10
11
JAVA复制代码@Component
public class SampleXxlJob {
/**
* 1、简单任务示例(Bean模式)
*/
@XxlJob("demoJobHandler")
public ReturnT<String> demoJobHandler(String param) throws Exception {
// doSomething
return ReturnT.SUCCESS;
}
}

上面通过 Spring 框架注解和 Xxl-Job 框架注解定义了一个任务, 当执行器启动时, 会扫描到这个 SampleXxlJob中的 demoJobHandler方法, 并将该方法封装成一个JobHandler ( 具体实现为MethodJobHandler ) 对象(该 Handler 中的 execute 方法为任务的具体执行逻辑),并把该 JobHandler 注册到容器1中。容器1 的定义如下:

1
2
3
JAVA复制代码// key 为 XxlJob 注解中的 bean 名称
// value 为 JobHandler
ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentMap();

四、任务执行流程

当一个任务被触发(不管是手动触发还是自动触发)时:

  1. 调度中心 会将任务执行参数封装到 TriggerParam 通过 POST 请求传给 执行器.
  2. 如果该任务是第一次执行, 会 new 一个新线程并启动。 JobThread 线程启动后,它会被注册到容器2 中。容器2 定义如下,最后会把 TriggerParam 推送到 线程中维护的一个队列。
1
2
3
4
5
Java复制代码这个新线程的名字为 JobThread, 该线程和 JobHandler 是一一绑定的。我们可以通过 JobThread 获取到此 JobHandler。

// key 为任务 id, jobId
// value 为 JobThread 对象
private static ConcurrentMap<Integer, JobThread> jobThreadRepository = new ConcurrentHashMap<Integer, JobThread>();
  1. JobThread 线程启动后,run 方法会不断从 队列 中读取任务执行参数。
  2. 读取到执行参数后, 会把执行参数交给 JobHandler#execute 方法去执行任务。

JobThread 中维护了一个队列, 任务每执行一次,就把执行参数推到该队列中,JobThread 启动后就一直从队列中读取执行参数,直到任务停止。

如果该任务第二次执行,会通过 jobId 从容器 jobThreadRepository 中获取之前缓存的 JobThread。

  • 如果 JobThread 不为空,则取出 JobThread 中的 JobHandler。
  • 根据前面提到的,因为任务每次执行时, JobHandler 是可以更换的。所以这里会判断此次任务执行的 JobHandler 是不是跟上一次的一样。如果不一样就要用新的替换掉旧的。并把缓存的 JobThread 注销掉。重新 new 一个新的 JobThread。
  • 如果 JobHandler 没有发生改变,就复用。

五、源码入口

1
2
3
JAVA复制代码com.xxl.job.core.biz.impl.ExecutorBizImpl#run

com.xxl.job.core.thread.JobThread#run

参考文档

xxl-job 官方文档

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

盘点 Seata Seata Server 启动流程

发表于 2021-06-15

首先分享之前的所有文章 , 欢迎点赞收藏转发三连下次一定 >>>> 😜😜😜

文章合集 : 🎁 juejin.cn/post/694164…

Github : 👉 github.com/black-ant

CASE 备份 : 👉 gitee.com/antblack/ca…

一 .前言

这一篇文章将会开启 Seata 的源码分析流程 , 源码的整体逻辑会比较长 , 预计分为以下几个部分 :

  • Seata Server 启动流程
  • Seata Client 启动流程
  • Seata 的配置加载
  • Seata 的配置处理
  • Seata 事务的处理
  • Seata ID 的流转
  • Seata TCC 模式
  • Seata XA 模式
  • Seata sega 模式
  • Seata Nacos 及其他的服务管理

二 . Seata Server 源码下载及启动

Seata Server 下载地址 , 通过该地址下载 Server Code 即可 , 整个过程中只有2个文件需要变动

Server 项目结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
xml复制代码<modules>
<module>all</module>
<module>bom</module>
<module>common</module>
<!-- 动态配置 , 配置中心 -->
<module>config</module>
<!-- 核心模块 -->
<module>core</module>
<!-- 服务注册与发现的相关功能-->
<module>discovery</module>
<module>distribution</module>
<!-- 用于对不同框架的集成-->
<module>integration/dubbo</module>
<module>integration/dubbo-alibaba</module>
<module>integration/sofa-rpc</module>
<module>integration/motan</module>
<module>integration/grpc</module>
<module>integration/http</module>
<!-- Seata 对 RM 的核心实现-->
<module>rm</module>
<module>rm-datasource</module>
<!-- Server 运行启动类 , TC 的核心实现 -->
<module>server</module>
<module>spring</module>
<!-- TCC 模块 -->
<module>tcc</module>
<module>test</module>
<!-- Seata 对 TM 的实现,提供了全局事务管理 -->
<module>tm</module>
<module>metrics</module>
<module>serializer</module>
<!-- Spring Boot 自动配置相关均在里面 -->
<module>seata-spring-boot-starter</module>
<module>compressor</module>
<!-- saga 模块 -->
<module>saga</module>
<module>sqlparser</module>
</modules>

源码的配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
java复制代码// Step 1 : 打开 Server 模块 Resources 目录 , 修改 Registry.conf
// PS : 可以看到 , 它支持的发现中心有 file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
registry {
type = "nacos"

nacos {
application = "seata-server"
serverAddr = "127.0.0.1:8848"
group = "SEATA_GROUP"
namespace = ""
cluster = "default"
username = ""
password = ""
}
}

config {
# file、nacos 、apollo、zk、consul、etcd3
type = "nacos"

nacos {
serverAddr = "127.0.0.1:8848"
namespace = ""
group = "SEATA_GROUP"
username = ""
password = ""
dataId = "seataServer.properties"
}
}


// Step 2 : 添加 / 修改 nacos.conf
transport.type=TCP
transport.server=NIO
transport.heartbeat=true
transport.thread-factory.boss-thread-prefix=NettyBoss
transport.thread-factory.worker-thread-prefix=NettyServerNIOWorker
transport.thread-factory.server-executor-thread-prefix=NettyServerBizHandler
transport.thread-factory.share-boss-worker=false
transport.thread-factory.client-selector-thread-prefix=NettyClientSelector
transport.thread-factory.client-selector-thread-size=1
transport.thread-factory.client-worker-thread-prefix=NettyClientWorkerThread
transport.thread-factory.boss-thread-size=1
transport.thread-factory.worker-thread-size=8
transport.shutdown.wait=3
service.vgroup_mapping.order-service-seata-service-group=default
service.vgroup_mapping.account-service-seata-service-group=default
service.vgroup_mapping.storage-service-seata-service-group=default
service.vgroup_mapping.business-service-seata-service-group=default
service.vgroupMapping.order-service-seata-service-group=default
service.vgroupMapping.account-service-seata-service-group=default
service.vgroupMapping.storage-service-seata-service-group=default
service.vgroupMapping.business-service-seata-service-group=default
service.enableDegrade=false
service.disable=false
service.max.commit.retry.timeout=-1
service.max.rollback.retry.timeout=-1
client.async.commit.buffer.limit=10000
client.lock.retry.internal=10
client.lock.retry.times=30
store.mode=db
store.file.dir=file_store/data
store.file.max-branch-session-size=16384
store.file.max-global-session-size=512
store.file.file-write-buffer-cache-size=16384
store.file.flush-disk-mode=async
store.file.session.reload.read_size=100
store.db.driver-class-name=com.mysql.jdbc.Driver
store.db.datasource=dbcp
store.db.db-type=mysql
store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true
store.db.user=root
store.db.password=123456
store.db.min-conn=1
store.db.max-conn=3
store.db.global.table=global_table
store.db.branch.table=branch_table
store.db.query-limit=100
store.db.lock-table=lock_table
recovery.committing-retry-period=1000
recovery.asyn-committing-retry-period=1000
recovery.rollbacking-retry-period=1000
recovery.timeout-retry-period=1000
transaction.undo.data.validation=true
transaction.undo.log.serialization=jackson
transaction.undo.log.save.days=7
transaction.undo.log.delete.period=86400000
transaction.undo.log.table=undo_log
transport.serialization=seata
transport.compressor=none
metrics.enabled=false
metrics.registry-type=compact
metrics.exporter-list=prometheus
metrics.exporter-prometheus-port=9898
client.report.retry.count=5
service.disableGlobalTransaction=false
client.support.spring.datasource.autoproxy=true


// Step 3 : 启动 io.seata.server.Server 类

三 . Seata 流程分析

3.1 Main 启动流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
java复制代码// Seata 的启动类是 Server # main , 来看一下这个
public static void main(String[] args) throws IOException {

// 获取 port 端口 , 该端口会用于 log 对象 -> 3.2
int port = PortHelper.getPort(args);
System.setProperty(ConfigurationKeys.SERVER_PORT, Integer.toString(port));

// 准备 log 对象
final Logger logger = LoggerFactory.getLogger(Server.class);
if (ContainerHelper.isRunningInContainer()) {
logger.info("The server is running in container.");
}

// 初始化参数解析器 -> 3.3
ParameterParser parameterParser = new ParameterParser(args);

// 初始化指标 -> 3.4
MetricsManager.get().init();

System.setProperty(ConfigurationKeys.STORE_MODE, parameterParser.getStoreMode());

// 线程池的构建
ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(
NettyServerConfig.getMinServerPoolSize(),
NettyServerConfig.getMaxServerPoolSize(),
NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS,
new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),
new NamedThreadFactory("ServerHandlerThread", NettyServerConfig.getMaxServerPoolSize()),
new ThreadPoolExecutor.CallerRunsPolicy());

// 构建 NettyRemotingServer
NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);

// 构建 nettyRemotingServer 的监听端口
nettyRemotingServer.setListenPort(parameterParser.getPort());
UUIDGenerator.init(parameterParser.getServerNode());

//log store mode : file, db, redis
SessionHolder.init(parameterParser.getStoreMode());

DefaultCoordinator coordinator = new DefaultCoordinator(nettyRemotingServer);
// 核心一 : 初始化 DefaultCoordinator
coordinator.init();
nettyRemotingServer.setHandler(coordinator);

// register ShutdownHook
ShutdownHook.getInstance().addDisposable(coordinator);
ShutdownHook.getInstance().addDisposable(nettyRemotingServer);

//127.0.0.1 and 0.0.0.0 are not valid here.
// 设置 XID 后 , 后续生成全局事务id 是会使用
if (NetUtil.isValidIp(parameterParser.getHost(), false)) {
XID.setIpAddress(parameterParser.getHost());
} else {
XID.setIpAddress(NetUtil.getLocalIp());
}
XID.setPort(nettyRemotingServer.getListenPort());

try {
// 核心二 : 初始化 nettyRemotingServer
nettyRemotingServer.init();
} catch (Throwable e) {
logger.error("nettyServer init error:{}", e.getMessage(), e);
System.exit(-1);
}

System.exit(0);
}

3.2 Port 获取流程

Port 的获取流程很简单 , 没有什么值得深入的地方 , 核心关注点就是 :

  • 区分容器通过不同的方式处理 , 主要是 Docker 和 K8S
  • cat /proc/1/cgroup 获取类型 (cgroup是linux内核实现、用于控制linux系统资源的组件)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码public static int getPort(String[] args) {
// 判断是否在容器中运行
if (ContainerHelper.isRunningInContainer()) {
return ContainerHelper.getPort();
} else if (args != null && args.length >= 2) {
for (int i = 0; i < args.length; ++i) {
// 通过参数 -p 获取 port 参数
if ("-p".equalsIgnoreCase(args[i]) && i < args.length - 1) {
return NumberUtils.toInt(args[i + 1], SERVER_DEFAULT_PORT);
}
}
}

return SERVER_DEFAULT_PORT;
}

注意 , 此处会放在 System 中 , 看注释是用于 logback 获取 System.setProperty(ConfigurationKeys.SERVER_PORT, Integer.toString(port))

3.3 ParameterParser 的作用

ParameterParser 用于解析启动参数 , 参数解析器应该始终是要执行的第一行.

此处使用 com.beust.jcommander.JCommander 进行参数的处理 , 使用方式 : seata-server.bat -h 127.0.0.1 -p 8086

Seata 中提供了如下几种参数 :

  • –host, -h : 要注册到注册中心的ip
  • –port, -p : 监听端口 , 默认 0
  • –storeMode, -m : 日志存储方式:”file” , “db”
  • –serverNode, -n : 服务器节点id,如1、2、3。它将根据雪花默认生成
  • –seataEnv, -e : 用于多配置隔离的名称
  • –help

PS : 另外 , Seata 中通过 Maven Plugin 做了第二层配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
xml复制代码<plugin>
<groupId>com.google.cloud.tools</groupId>
<artifactId>jib-maven-plugin</artifactId>
<version>2.3.0</version>
<configuration>
<to>
<image>docker.io/seataio/seata-server</image>
<tags>
${image.tags}
</tags>
<!-- ..... -->
</to>
<container>
<appRoot>/seata-server</appRoot>
<workingDirectory>/seata-server</workingDirectory>
<mainClass>io.seata.server.Server</mainClass>
<ports>
<port>8091</port>
</ports>
<jvmFlags>
<jvmFlag>-Djava.security.egd=file:/dev/./urandom</jvmFlag>
<jvmFlag>-server</jvmFlag>
<jvmFlag>-Xss512k</jvmFlag>
<jvmFlag>-XX:+UnlockExperimentalVMOptions</jvmFlag>
<!-- ..... -->
</jvmFlags>
<!-- ..... -->
</container>
<!-- ..... -->
</configuration>
</plugin>

可以看到 , 此处配置了 Port 及 JVM 等参数

3.4 MetricsManager.get().init()

** MetricsManager 的作用** :

  1. metrics 是事务协调者
  2. MetricsManager 通过懒汉单例方式生成
  3. 此处通过 metrics.enabled 判断是否开启 metrics

总结一下 : 该工具用于快速详尽的获取到TC、TM(规划中)和RM(规划中)中事务的活动状态以及时延等重要统计信息

当状态有变化,EvenBus会把事件推送给MetricsSubscriber,MetricsSubscriber中调用Registry把度量数据写入。Exporter再定期把度量数据拉出来,发给外部监控系统。


注册方式 :

  1. 从配置中心读取配置,看是否需要初始化metric ( ConfigurationFactory.getInstance().getBoolean/metrics.enabled)
  2. 通过RegistryFactory.getInstance() 初始化 Registry对象
  3. 然后使用 ExporterFactory.getInstanceList() 设置Registry对象
  4. 最后用 EventBusManager注册一个metrics的订阅

成员功能 :
Registry : 定义了getCounter、getSummary、getTimer等接口

Exporter : 发布器,把度量数据同步给对应的监控系统

EventBusManager : metrics的数据来源就是通过订阅EventBus获取的

具体流程后续继续分析 >>>>

3.5 ThreadPoolExecutor 的创建

构建一个线程池 , 此处的参数主要是 NettyServerConfig

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码// 线程池默认属性
// 1 . WORKER_THREAD_SIZE 在 NettyBaseConfig 中进行配置
private int serverSelectorThreads = WORKER_THREAD_SIZE ;
private int serverSocketSendBufSize = 153600;
private int serverSocketResvBufSize = 153600;
private int serverWorkerThreads = WORKER_THREAD_SIZE ;
private int soBackLogSize = 1024 ;
private int writeBufferHighWaterMark = 67108864 ;
private int writeBufferLowWaterMark = 1048576 ;
private static final int DEFAULT_LISTEN_PORT = 8091;
private static final int RPC_REQUEST_TIMEOUT = 30 * 1000;
private int serverChannelMaxIdleTimeSeconds = 30 ;
private static final String EPOLL_WORKER_THREAD_PREFIX = "NettyServerEPollWorker";
private static int minServerPoolSize = 50;
private static int maxServerPoolSize = 500;
private static int maxTaskQueueSize = 20000;
private static int keepAliveTime = 500;

3.6 NettyRemotingServer 的构建

NettyRemotingServer 的作用 :

该类用于实现Netty服务器端功能,接受数据包,在服务器端处理后发送给客户端 , 同时它还用于启动netty,监听服务器端口,接收TM、RM的请求,它还为处理不同的请求创建不同的处理器

可以看到 , 这里是把线程池传入 NettyRemotingServer

1
2
3
4
5
6
7
8
java复制代码NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);

public AbstractNettyRemotingServer(ThreadPoolExecutor messageExecutor, NettyServerConfig nettyServerConfig) {
super(messageExecutor);
// Rpc server bootstrap.
serverBootstrap = new NettyServerBootstrap(nettyServerConfig);
serverBootstrap.setChannelHandlers(new ServerHandler());
}

3.7 NettyRemotingServer 的配置

主要做了这些事情 :

  • 设置端口
  • 初始化 Session 方案
  • 构建 DefaultCoordinator , 并且初始化
1
2
3
4
5
6
7
8
9
java复制代码nettyRemotingServer.setListenPort(parameterParser.getPort()); -> 8091

UUIDGenerator.init(parameterParser.getServerNode());
//log store mode : file, db, redis
SessionHolder.init(parameterParser.getStoreMode());

DefaultCoordinator coordinator = new DefaultCoordinator(nettyRemotingServer);
coordinator.init();
nettyRemotingServer.setHandler(coordinator);

UUIDGenerator 的作用 的作用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码
init : IdWorker idWorker = new IdWorker(serverNode);

Step 1 : 立即初始化时间戳和序列
private void initTimestampAndSequence() {
long timestamp = getNewestTimestamp();
long timestampWithSequence = timestamp << sequenceBits;
this.timestampAndSequence = new AtomicLong(timestampWithSequence);
}

Step 2 : 初始化 workid
private void initWorkerId(Long workerId) {
if (workerId == null) {
// 使用最低10位可用MAC作为workerId
workerId = generateWorkerId();
}
if (workerId > maxWorkerId || workerId < 0) {
String message = String.format("worker Id can't be greater than %d or less than 0", maxWorkerId);
throw new IllegalArgumentException(message);
}
this.workerId = workerId << (timestampBits + sequenceBits);
}

SessionHolder 的处理

  • Step 1 : StoreMode storeMode = StoreMode.get(mode); 生成 Mode 类型对象
  • Step 2 : 通过 storeMode 判断具体的处理类型 (DB , FILE . REDIS)

这里主要碰到了2个对象 :

  • SessionHolder : Session 处理器 , 用于初始化及处理
  • GlobalSession : GlobalSession 是 seata协调器DefaultCoordinator管理维护的重要部件

Session 主要处理那些数据 :

  • String ROOT_SESSION_MANAGER_NAME = “root.data”;
  • String ASYNC_COMMITTING_SESSION_MANAGER_NAME = “async.commit.data”;
  • String RETRY_COMMITTING_SESSION_MANAGER_NAME = “retry.commit.data”;
  • String RETRY_ROLLBACKING_SESSION_MANAGER_NAME = “retry.rollback.data”;

SessionHolder 中会为这四个数据创建 Object ,存入 Session

3.8 构建和初始化 DefaultCoordinator

seata-DefaultCoordinator-system.png

作用 : DefaultCoordinator 为TC协调器 (事务控制协调器)

结构 :

  • 继承AbstractTCInboundHandler接口(为TC接受到RM和TM的request请求数据)

  • 实现TransactionMessageHandler接口(处理RPC消息)

  • 实现ResourceManagerInbound接口 (处理发送至RM的branchCommit,branchRollback请求)

整体结构如下图所示 :

seata-server-coordinator.jpg

DefaultCoordinator init 流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
java复制代码    public void init() {
retryRollbacking.scheduleAtFixedRate(() -> {
boolean lock = SessionHolder.retryRollbackingLock();
if (lock) {
try {
handleRetryRollbacking();
} catch (Exception e) {
LOGGER.info("Exception retry rollbacking ... ", e);
} finally {
SessionHolder.unRetryRollbackingLock();
}
}
}, 0, ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

retryCommitting.scheduleAtFixedRate(() -> {
boolean lock = SessionHolder.retryCommittingLock();
if (lock) {
try {
handleRetryCommitting();
} catch (Exception e) {
LOGGER.info("Exception retry committing ... ", e);
} finally {
SessionHolder.unRetryCommittingLock();
}
}
}, 0, COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

asyncCommitting.scheduleAtFixedRate(() -> {
boolean lock = SessionHolder.asyncCommittingLock();
if (lock) {
try {
handleAsyncCommitting();
} catch (Exception e) {
LOGGER.info("Exception async committing ... ", e);
} finally {
SessionHolder.unAsyncCommittingLock();
}
}
}, 0, ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

timeoutCheck.scheduleAtFixedRate(() -> {
boolean lock = SessionHolder.txTimeoutCheckLock();
if (lock) {
try {
timeoutCheck();
} catch (Exception e) {
LOGGER.info("Exception timeout checking ... ", e);
} finally {
SessionHolder.unTxTimeoutCheckLock();
}
}
}, 0, TIMEOUT_RETRY_PERIOD, TimeUnit.MILLISECONDS);

undoLogDelete.scheduleAtFixedRate(() -> {
boolean lock = SessionHolder.undoLogDeleteLock();
if (lock) {
try {
undoLogDelete();
} catch (Exception e) {
LOGGER.info("Exception undoLog deleting ... ", e);
} finally {
SessionHolder.unUndoLogDeleteLock();
}
}
}, UNDO_LOG_DELAY_DELETE_PERIOD, UNDO_LOG_DELETE_PERIOD, TimeUnit.MILLISECONDS);
}

3.9 ShutdownHook 的作用

ShutdownHook 主要用于销毁流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
java复制代码ShutdownHook.getInstance().addDisposable(coordinator);
ShutdownHook.getInstance().addDisposable(nettyRemotingServer);

public class ShutdownHook extends Thread {

private static final ShutdownHook SHUTDOWN_HOOK = new ShutdownHook("ShutdownHook");
private final PriorityQueue<DisposablePriorityWrapper> disposables = new PriorityQueue<>();
private final AtomicBoolean destroyed = new AtomicBoolean(false);

// 默认10。值越低优先级越高
private static final int DEFAULT_PRIORITY = 10;


// 主要方法 :
addDisposable() : 添加实例
destroyAll() : 销毁所有的实例
- Disposable disposable = disposables.poll();
- disposable.destroy();

}


// 调用的逻辑
GlobalTransactionScanner#distory
DefaultSagaTransactionalTemplate#distory

3.10 XID 的作用

之前看到 , 前面对 XID 进行了设置 , 加入了 Address 和 Port , 它实际上会在后面设置全局ID的时候使用 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码if (NetUtil.isValidIp(parameterParser.getHost(), false)) {
XID.setIpAddress(parameterParser.getHost());
} else {
XID.setIpAddress(NetUtil.getLocalIp());
}
XID.setPort(nettyRemotingServer.getListenPort());


// XID 是一个全局的 事务ID 对象
public class XID {
private static int port;
private static String ipAddress;

}

public static String generateXID(long tranId) {
return ipAddress + ":" + port + ":" + tranId;
}

public static long getTransactionId(String xid) {
if (xid == null) {
return -1;
}

int idx = xid.lastIndexOf(":");
return Long.parseLong(xid.substring(idx + 1));
}

3.11 nettyRemotingServer.init() 初始化流程

Step Start : init 入口

1
2
3
4
5
6
7
java复制代码public void init() {
// registry processor
registerProcessor();
if (initialized.compareAndSet(false, true)) {
super.init();
}
}

Step 2 : registerProcessor 处理流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
java复制代码
private void registerProcessor() {
// 1. 注册表上的请求消息处理器
ServerOnRequestProcessor onRequestProcessor =
new ServerOnRequestProcessor(this, getHandler());
super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor);
// 2. 注册表上的响应消息处理器
ServerOnResponseProcessor onResponseProcessor =
new ServerOnResponseProcessor(getHandler(), getFutures());
super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, messageExecutor);
// 3. 注册表信息处理器
RegRmProcessor regRmProcessor = new RegRmProcessor(this);
super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor);
// 4. 注册表信息处理器
RegTmProcessor regTmProcessor = new RegTmProcessor(this);
super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null);
// 5. 注册表心跳消息处理器
ServerHeartbeatProcessor heartbeatMessageProcessor = new ServerHeartbeatProcessor(this);
super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null);
}


// PS : 这里来看一下 Registry 到底注册了什么
protected final HashMap<Integer, Pair<RemotingProcessor, ExecutorService>> processorTable = new HashMap<>(32);
public void registerProcessor(int messageType, RemotingProcessor processor, ExecutorService executor) {
Pair<RemotingProcessor, ExecutorService> pair = new Pair<>(processor, executor);
this.processorTable.put(messageType, pair);
}

// PS : processorTable 的使用
C- AbstractNettyRemoting
M- processMessage : Rpc消息处理

protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {

Object body = rpcMessage.getBody();
if (body instanceof MessageTypeAware) {
MessageTypeAware messageTypeAware = (MessageTypeAware) body;
// 从 processorTable 中取出 Pair 对象
final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
if (pair != null) {
if (pair.getSecond() != null) {
try {
// ExecutorService 处理
pair.getSecond().execute(() -> {
try {
pair.getFirst().process(ctx, rpcMessage);
} catch (Throwable th) {
//.......
} finally {
MDC.clear();
}
});
} catch (RejectedExecutionException e) {
//..............
}
}
}
}
}

init 处理操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
java复制代码
// Init 入口类
public void init() {
// Step 1 : 调用父类 init 方法
super.init();
// Step 2 : 调用 serverBootstrap 启动流程
serverBootstrap.start();
}

// Step 1 : 调用父类 init 方法
ConcurrentHashMap<Integer, MessageFuture> futures = new ConcurrentHashMap<>();
public void init() {
// 延迟 3 秒执行 , 3秒执行一次
timerExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
//
for (Map.Entry<Integer, MessageFuture> entry : futures.entrySet()) {
if (entry.getValue().isTimeout()) {
futures.remove(entry.getKey());
entry.getValue().setResultMessage(null);
}
}

nowMills = System.currentTimeMillis();
}
}, TIMEOUT_CHECK_INTERNAL, TIMEOUT_CHECK_INTERNAL, TimeUnit.MILLISECONDS);
}


// Step 2 : 调用 serverBootstrap 启动流程
public void start() {
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupWorker)
.channel(NettyServerConfig.SERVER_CHANNEL_CLAZZ)
.option(ChannelOption.SO_BACKLOG, nettyServerConfig.getSoBackLogSize())
.option(ChannelOption.SO_REUSEADDR, true)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSendBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketResvBufSize())
.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
new WriteBufferWaterMark(nettyServerConfig.getWriteBufferLowWaterMark(),
nettyServerConfig.getWriteBufferHighWaterMark()))
.localAddress(new InetSocketAddress(listenPort))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0))
.addLast(new ProtocolV1Decoder())
.addLast(new ProtocolV1Encoder());
if (channelHandlers != null) {
addChannelPipelineLast(ch, channelHandlers);
}

}
});

try {
ChannelFuture future = this.serverBootstrap.bind(listenPort).sync();
LOGGER.info("Server started, listen port: {}", listenPort);
RegistryFactory.getInstance().register(new InetSocketAddress(XID.getIpAddress(), XID.getPort()));
initialized.set(true);
future.channel().closeFuture().sync();
} catch (Exception exx) {
throw new RuntimeException(exx);
}

}

// 这里涉及到 NettyServerBootstrap , 该对象用于构建 RPC Server , 其中 会调用 ServerBootstrap 的处理流程

PS : 后续是 Netty ServerBootstrap 的处理 , 考虑以后说 Netty 的时候单独说说

这里简单说一下就是 : 

  • group : 为父(接受方)和子(客户端)设置EventLoopGroup。这些EventLoopGroup的用于处理ServerChannel和Channel的所有事件和IO。
  • channel : 用于从中创建Channel实例的类。
  • ChannelOption : 允许指定一个ChannelOption,用于创建Channel实例。
  • childOption : 允许指定一个 ChannelOption,用于创建Channel实例(在接受方接受了 Channel之后)。

3.12 其他

System 处理

初始化异常 : System.exit(-1)

初始化成功 : System.exit(0)

终止当前运行的Java虚拟机。参数作为一种状态代码;按照约定,非零状态码表示异常终止。

调用System.exit(n)有效地等价于调用:Runtime.getRuntime ().exit(n)

总结

因为是一篇为了入门的文章 , 所以文章相对简单 , 也没有涉及太多的流程 .

主要是为了完善整个体系 , 并且开启 Seata 的流程.

核心步骤 :

1 . MetricsManager.get().init() :初始化指标

2 . new ThreadPoolExecutor : 构建线程池

3 . new NettyRemotingServer : 构建 Netty Server

4 . SessionHolder.init

5 . 构建全局事务 ID

6 . nettyRemotingServer.init()

更新日志

  • V20210805 : 添加 总结

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…641642643…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%