开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

9个REST API设计的基本准则

发表于 2021-05-28

通常情况下,在项目开发过程中涉及的API设计是采用REST API的模式,但并没有制定一个严格的、可理解的、可扩展的规范,从长远来看,随着项目的不断迭代,特别的在赶工期的情况下,REST API就会出现偏移。因此建议在项目初期就建立严格的API设计规范。

个人觉得好的API设计规范可能避免很多不必要的沟通,提高项目开发效率,接口本身就是文档。下面是我觉得REST API必须要遵守的9个设计规范。

1、使用HTTP方法赋予端点意义

REST API鼓励为应用程序的每个CRUD操作使用不同的HTTP方法。其中,有以下几种:GET、POST、PUT、DELETE和PATCH。与资源相关联的端点的名称必须与应用的操作相关的HTTP方法对应。

1
2
3
4
5
6
7
8
9
10
11
bash复制代码// 不好的设计
GET /get_articles
POST /insert_articles
PUT /modify_articles
DELETE /delete_articles

//建议的设计
GET /articles
POST /articles
PUT /articles
DELETE /articles

2、状态码须根据API的数据结果给出

应用程序最重要的特性之一是端点的返回与相应的状态码相关。这意味着,当我们的结果是成功还是失败的时候,可以用一种更描述性的方式来表达想要传达的数据。

例如,如果得到状态码200,则可以立即知道API请求的结果是成功的,否则,如果得到状态码400,结果是失败的。

重要的是要知道现有的状态码,并知道在什么情况下应用它们,因为返回消息可能与某些状态码错误地关联在一起(这是很常见的),这对于正常开发过程非常不友好,会给开发人员带来困惑。

1
2
3
4
5
6
7
8
9
10
json复制代码// 不好的设计
{
"status": 200,
"error": {...}
}
// 建议的设计
{
"status": 200,
"data": [...]
}

一些常见的HTTP状态码包括:

  • 200:成功的请求,通常是GET
  • 201:创建后成功请求,通常是POST
  • 204:成功的请求,没有返回任何内容,通常是PUT或PATCH
  • 301:永久重定向到另一个端点
  • 400:错误的请求(客户端应修改请求)
  • 401:未经授权,凭据未被识别
  • 403:禁止,凭据已接受但没有权限
  • 404:找不到资源不存在
  • 410:该资源先前已存在,但现在不存在
  • 429:请求太多,用于速率限制,并且应包含重试标头
  • 500:服务器错误,是通用的,值得一看的是其他500级错误
  • 503:服务不可用,其中重试标头有用的另一个服务

3、过滤、排序和分页支持

在应用程序中,过滤、排序及分页是常见的,这样有助于用户查找到需要的信息,同样也能减少服务器资源的消耗(数据量大的情况下不分页可能会带来服务器资源的耗尽)。

  • GET /articles?title=api&cate_id=1 :过滤、检索所有具有以下属性的文章列表:标题为api,类型id为1。
  • GET /articles?limit=10&offset=0 :分页,从0行开始返回10条。
  • GET /articles?limit=10&offset=0&sort=asc&order=title :排序,返回按名称升序排列的文章记录。

4、一致性端点设计

经常遇到的关于各种API开发的讨论之一就是如何设计端点?是使用单数还是复数。简而言之,我们希望在应用程序中保持API设计的一致性,为此,建议以复数形式来构建端点。

资源不会总是有一个结果,一个表可能会有许多结果,即使它只有一个结果,并且将其转为单数形式,我们也很难保持路由名称格式的一致性。

1
2
3
4
5
6
7
bash复制代码// 不好的设计
GET /article
GET /article/:id

// 建议的设计
GET /articles
GET /articles/:id

5、用资源名称命名端点

谈到一致性,如果我们知道路由负责处理资源上的操作,那么有必要直接用资源的名称来命名端点,这样当开发人员使用API的时候,就知道API在处理哪些实体数据。

例如,需要增加订单,则不能/articles作为端点,那就非常糟糕了。

6、资源层级设计

如果我们需要访问资源相关的实体数据怎么办呢?为了体现这种层级关系,有两个方式可以参考:

  • 以端点分层设计
  • 以参数设计

让我们以“作者”和“文章”的经典示例为例。

1
2
bash复制代码GET /authors/quintion/articles/rest-api-design
GET /articles?author=quintion&title=rest-api-design

这些方法都是有效的,在很多平台上都已看到过它们。就个人而言,我认为使用查询字符串比扩展当前路径更简洁。应用程序扩展得越多,我们肯定会具有更大的层次结构,反过来,路由也会扩展。即使这样,它还是根据每个人的标准,使用最喜欢的一种。

7、版本控制

随着项目的迭代,不可避免的是要有一个稳定且明确的API版本,且没有错误和歧义。假设我们部署了API,并且有几个客户端开始使用它,那么当需要从资源中添加或删除更多数据时,会发生什么情况?可能会在使用我们接口的外部服务上产生错误。这就是为什么对我们的应用程序要有具有版本控制的原因。

有几种方法,但我是URI版本的支持者,在该版本中,我们将在端点中明确拥有路由的版本。

1
2
3
bash复制代码// URI 版本 v[x] 
GET /v1/news
GET /v2/articles

8、缓存机制

高速缓存是一种可以提高API速度和降低资源消耗的强大工具之一,其原理就是对相同结果的请求,减少对数据库的操作。有多种方式可以帮助我们实现缓存系统,其中之一文件缓存,如 Redis。

当然实现缓存机制通常也会带来成本,原则就是需要问一下:信息是动态的还是静态的?如果是动态的,信息多久更改一次?

重要的是要知道缓存中有很长时间的信息,这可能会由于长时间保留信息而导致API的错误结果,建议缓存时间设计得短一点。

9、设计文档

文档是项目开发和协作最好的工具之一,也是很多研发人员最讨厌的。在这种情况下,文档化的API是必不可少的,以便使用我们API的用户可以理解我们界面的几个重要方面,包括可访问性、响应、请求、示例。

  • 可访问性:界面的位置和交互是最重要的特征之一,我们不想给客户一个操作手册文档。将我们的文档公开在云上,让每个人都能看到,这是我们能做的最方便的事情
  • 响应和请求:我们提供的信息必须考虑任何资源可能产生的所有可能结果以及如何使用它们。
  • 示例:提供如何使用接口的示例非常重要,即使它是一个可以在控制台中执行并从中获得响应的bash脚本。

结论

设计的API水平如何能够暴露一个开发人员的资质,一个API的最佳设计,能够使项目中的所有人员都喜欢。当然一个最佳的REST API实践还有其他的原则,并且需要开发人员共同保持一致性。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

数据库连接池到底应该设多大?

发表于 2021-05-28

数据库连接池的配置是开发者们常常搞出坑的地方,在配置数据库连接池时,有几个可以说是和直觉背道而驰的原则需要明确。

1万并发用户访问

想象你有一个网站,压力虽然还没到Facebook那个级别,但也有个1万上下的并发访问——也就是说差不多2万左右的TPS。那么这个网站的数据库连接池应该设置成多大呢?结果可能会让你惊讶,因为这个问题的正确问法是:

“这个网站的数据库连接池应该设置成多小呢?”
下面这个视频是Oracle Real World Performance Group发布的,请先看完:www.dailymotion.com/video/x2s8u…

(因为这视频是英文解说且没有字幕,我替大家做一下简单的概括:) 视频中对Oracle数据库进行压力测试,9600并发线程进行数据库操作,每两次访问数据库的操作之间sleep 550ms,一开始设置的中间件线程池大小为2048:
在这里插入图片描述
初始的配置

压测跑起来之后是这个样子的:
在这里插入图片描述
2048连接时的性能数据

每个请求要在连接池队列里等待33ms,获得连接后执行SQL需要77ms

此时数据库的等待事件是这个熊样的:
在这里插入图片描述
各种buffer busy waits

各种buffer busy waits,数据库CPU在95%左右(这张图里没截到CPU)

接下来,把中间件连接池减到1024(并发什么的都不变),性能数据变成了这样:
在这里插入图片描述
连接池降到1024后

获取链接等待时长没怎么变,但是执行SQL的耗时减少了。下面这张图,上半部分是wait,下半部分是吞吐量
在这里插入图片描述
wait和吞吐量

能看到,中间件连接池从2048减半之后,吐吞量没变,但wait事件减少了一半。

接下来,把数据库连接池减到96,并发线程数仍然是9600不变。
在这里插入图片描述
96个连接时的性能数据

队列平均等待1ms,执行SQL平均耗时2ms。
在这里插入图片描述
wait事件几乎没了,吞吐量上升。

没有调整任何其他东西,仅仅只是缩小了中间件层的数据库连接池,就把请求响应时间从100ms左右缩短到了3ms。

But why?

为什么nginx只用4个线程发挥出的性能就大大超越了100个进程的Apache HTTPD?回想一下计算机科学的基础知识,答案其实是很明显的。

即使是单核CPU的计算机也能“同时”运行数百个线程。但我们都[应该]知道这只不过是操作系统用时间分片玩的一个小把戏。一颗CPU核心同一时刻只能执行一个线程,然后操作系统切换上下文,核心开始执行另一个线程的代码,以此类推。给定一颗CPU核心,其顺序执行A和B永远比通过时间分片“同时”执行A和B要快,这是一条计算机科学的基本法则。一旦线程的数量超过了CPU核心的数量,再增加线程数系统就只会更慢,而不是更快。

这几乎就是真理了……

有限的资源

上面的说法只能说是接近真理,但还并没有这么简单,有一些其他的因素需要加入。当我们寻找数据库的性能瓶颈时,总是可以将其归为三类:CPU、磁盘、网络。把内存加进来也没有错,但比起磁盘和网络,内存的带宽要高出好几个数量级,所以就先不加了。

如果我们无视磁盘和网络,那么结论就非常简单。在一个8核的服务器上,设定连接/线程数为8能够提供最优的性能,再增加连接数就会因上下文切换的损耗导致性能下降。数据库通常把数据存储在磁盘上,磁盘又通常是由一些旋转着的金属碟片和一个装在步进马达上的读写头组成的。读/写头同一时刻只能出现在一个地方,然后它必须“寻址”到另外一个位置来执行另一次读写操作。所以就有了寻址的耗时,此外还有旋回耗时,读写头需要等待碟片上的目标数据“旋转到位”才能进行操作。使用缓存当然是能够提升性能的,但上述原理仍然成立。

在这一时间段(即”I/O等待”)内,线程是在“阻塞”着等待磁盘,此时操作系统可以将那个空闲的CPU核心用于服务其他线程。所以,由于线程总是在I/O上阻塞,我们可以让线程/连接数比CPU核心多一些,这样能够在同样的时间内完成更多的工作。

那么应该多多少呢?这要取决于磁盘。较新型的SSD不需要寻址,也没有旋转的碟片。可别想当然地认为“SSD速度更快,所以我们应该增加线程数”,恰恰相反,无需寻址和没有旋回耗时意味着更少的阻塞,所以更少的线程[更接近于CPU核心数]会发挥出更高的性能。只有当阻塞创造了更多的执行机会时,更多的线程数才能发挥出更好的性能。

网络和磁盘类似。通过以太网接口读写数据时也会形成阻塞,10G带宽会比1G带宽的阻塞少一些,1G带宽又会比100M带宽的阻塞少一些。不过网络通常是放在第三位考虑的,有些人会在性能计算中忽略它们。
在这里插入图片描述
上图是PostgreSQL的benchmark数据,可以看到TPS增长率从50个连接数开始变缓。在上面Oracle的视频中,他们把连接数从2048降到了96,实际上96都太高了,除非服务器有16或32颗核心。

计算公式

下面的公式是由PostgreSQL提供的,不过我们认为可以广泛地应用于大多数数据库产品。你应该模拟预期的访问量,并从这一公式开始测试你的应用,寻找最合适的连接数值。

连接数 = ((核心数 * 2) + 有效磁盘数)

核心数不应包含超线程(hyper thread),即使打开了hyperthreading也是。如果活跃数据全部被缓存了,那么有效磁盘数是0,随着缓存命中率的下降,有效磁盘数逐渐趋近于实际的磁盘数。这一公式作用于SSD时的效果如何尚未有分析。

按这个公式,你的4核i7数据库服务器的连接池大小应该为((4 * 2) + 1) = 9。取个整就算是是10吧。是不是觉得太小了?跑个性能测试试一下,我们保证它能轻松搞定3000用户以6000TPS的速率并发执行简单查询的场景。如果连接池大小超过10,你会看到响应时长开始增加,TPS开始下降。

笔者注:这一公式其实不仅适用于数据库连接池的计算,大部分涉及计算和I/O的程序,线程数的设置都可以参考这一公式。我之前在对一个使用Netty编写的消息收发服务进行压力测试时,最终测出的最佳线程数就刚好是CPU核心数的一倍。

公理:你需要一个小连接池,和一个充满了等待连接的线程的队列

如果你有10000个并发用户,设置一个10000的连接池基本等于失了智。1000仍然很恐怖。即是100也太多了。你需要一个10来个连接的小连接池,然后让剩下的业务线程都在队列里等待。连接池中的连接数量应该等于你的数据库能够有效同时进行的查询任务数(通常不会高于2*CPU核心数)。

我们经常见到一些小规模的web应用,应付着大约十来个的并发用户,却使用着一个100连接数的连接池。这会对你的数据库造成极其不必要的负担。

请注意

连接池的大小最终与系统特性相关。

比如一个混合了长事务和短事务的系统,通常是任何连接池都难以进行调优的。最好的办法是创建两个连接池,一个服务于长事务,一个服务于短事务。

再例如一个系统执行一个任务队列,只允许一定数量的任务同时执行,此时并发任务数应该去适应连接池连接数,而不是反过来。

来源:kelgon
jianshu.com/p/a8f653fc0c54

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Java 解析 XML 的四种方式

发表于 2021-05-28

hello.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
xml复制代码<?xml version="1.0" encoding="UTF-8"?>
<bookstore>
<book id="1"> <!-- id="1"---属性名和属性值 -->
<name>天龙八部</name>
<author>金庸</author>
<year>2014</year>
<price>88</price>
</book>
<book id="2">
<name>鹿鼎记</name>
<year>2015</year>
<price>66</price>
<language>中文</language>
</book>
<book id="3">
<name>射雕英雄传</name>
<author>金庸</author>
<year>2016</year>
<price>44</price>
</book>
</bookstore>

DOM(Document Object Model)解析

优点

  1. 允许应用程序对数据和结构做出更改
  2. 访问是双向的,可以在任何时候再树中上、下导航获取、操作任意部分的数据

缺点

解析XML文档的需要加载整个文档来构造层次结构,消耗内存资源大。

应用范围

遍历能力强,常应用于XML文档需要频繁改变的服务中。

解析步骤

  1. 创建一个 DocumentBuilderFactory 对象
  2. 创建一个 DocumentBuilder 对象
  3. 通过 DocumentBuilder 的 parse() 方法加载 XML 到当前工程目录下
  4. 通过 getElementsByTagName() 方法获取所有 XML 所有节点的集合
  5. 遍历所有节点
  6. 通过 item() 方法获取某个节点的属性
  7. 通过 getNodeName() 和 getNodeValue() 方法获取属性名和属性值
  8. 通过 getChildNodes() 方法获取子节点,并遍历所有子节点
  9. 通过 getNodeName() 和 getTextContent() 方法获取子节点名称和子节点值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
java复制代码package Paint;

import java.io.IOException;

import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;

import org.w3c.dom.Document;
import org.w3c.dom.NamedNodeMap;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;

public class DOMTest {
public static void main(String[] args) {
DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
try {
DocumentBuilder db = dbf.newDocumentBuilder();
Document document = db.parse("./src/Paint/hello.xml");
NodeList bookList = document.getElementsByTagName("book"); //节点集

int bookCnt = bookList.getLength();
System.err.println("一共获取到" + bookCnt +"本书");

for(int i=0; i<bookCnt; i++){
Node book = bookList.item(i);
NamedNodeMap attrs = book.getAttributes();
for(int j=0; j<attrs.getLength(); j++){
Node attr = attrs.item(j);
System.err.println(attr.getNodeName()+"---"+attr.getNodeValue());//id

}

NodeList childNodes = book.getChildNodes();
for(int k=0; k<childNodes.getLength(); k++){
if(childNodes.item(k).getNodeType() == Node.ELEMENT_NODE){
System.out.println(childNodes.item(k).getNodeName()+"---" + childNodes.item(k).getTextContent());
}
}
}
} catch (ParserConfigurationException e) {
e.printStackTrace();
} catch (SAXException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}

SAX(Simple API for XML)解析

优点

  1. 不需要等待所有的数据被处理,解析就可以开始
  2. 只在读取数据时检查数据,不需要保存在内存中
  3. 可以在某一个条件满足时停止解析,不必要解析整个文档
  4. 效率和性能较高,能解析大于系统内存的文档

缺点

  1. 解析逻辑复杂,需要应用层自己负责逻辑处理,文档越复杂程序越复杂
  2. 单向导航,无法定位文档层次,很难同时同时访问同一文档的不同部分数据,不支持 XPath

解析步骤

  1. 获取一个 SAXParserFactory 的实例
  2. 通过 factory() 获取 SAXParser 实例
  3. 创建一个 handler() 对象
  4. 通过 parser 的 parse() 方法来解析 XML

SAXTest.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
java复制代码package Paint;

import java.io.IOException;

import javax.xml.parsers.ParserConfigurationException;
import javax.xml.parsers.SAXParser;
import javax.xml.parsers.SAXParserFactory;

import org.xml.sax.SAXException;

public class SAXTest {

public static void main(String[] args) {
// 获取实例
SAXParserFactory factory = SAXParserFactory.newInstance();
try {
SAXParser parser = factory.newSAXParser();
SAXParserHandler handler = new SAXParserHandler();
parser.parse("./src/Paint/hello.xml", handler);

System.err.println("共有"+ handler.getBookList().size()+ "本书");
for(Book book : handler.getBookList()){
System.out.println(book.getName());
System.out.println("id=" + book.getId());
System.out.println(book.getAuthor());
System.out.println(book.getYear());
System.out.println(book.getPrice());
System.out.println(book.getLanguage());
}
} catch (ParserConfigurationException e) {
e.printStackTrace();
} catch (SAXException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}

SAXParserHandler.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
java复制代码package Paint;

import java.util.ArrayList;

import org.xml.sax.Attributes;
import org.xml.sax.SAXException;
import org.xml.sax.helpers.DefaultHandler;

public class SAXParserHandler extends DefaultHandler {
String value = null;
Book book = null;
private ArrayList<Book> bookList = new ArrayList<Book>();

public ArrayList<Book> getBookList() {
return bookList;
}
/*
* XML 解析开始
*/
public void startDocument() throws SAXException {
super.startDocument();
System.out.println("xml 解析开始");
}

/*
* XML 解析结束
*/
public void endDocument() throws SAXException {
super.endDocument();
System.out.println("xml 解析结束");
}

/*
* 解析 XML 元素开始
*/
public void startElement(String uri, String localName, String qName,
Attributes attributes) throws SAXException {

super.startElement(uri, localName, qName, attributes);

if(qName.equals("book")){
book = new Book();

for(int i=0; i<attributes.getLength();i++){
System.out.println(attributes.getQName(i)+"---"+attributes.getValue(i));
if(attributes.getQName(i).equals("id")){
book.setId(attributes.getValue(i));
}
}
}else if(!qName.equals("bookstore")){
System.out.print("节点名:"+ qName + "---");
}
}

/*
*解析 XML 元素结束
*/
public void endElement(String uri, String localName, String qName)
throws SAXException {

super.endElement(uri, localName, qName);
if(qName.equals("book")){
bookList.add(book);
book = null;
}
else if(qName.equals("name")){
book.setName(value);
}else if(qName.equals("year")){
book.setYear(value);
}else if(qName.equals("author")){
book.setAuthor(value);
}else if(qName.equals("price")){
book.setPrice(value);
}else if(qName.equals("language")){
book.setLanguage(value);
}
}

public void characters(char[] ch, int start, int length)
throws SAXException {
super.characters(ch, start, length);

// 获取节点值数组
value = new String(ch, start, length);
if(!value.trim().equals("")){
System.out.println("节点值:"+value);
}
}
}

JDOM 解析

需要引入 jdom.jar

特征

  1. 仅使用具体类,而不使用接口
  2. API 大量使用了 Collections 类

解析步骤

  1. 创建一个 SAXBuilder 的对象
  2. 创建一个输入流,将 xml 文件加载到输入流中
  3. 通过 saxBuilder 的 build()方法,将输入流加载到 saxBuilder 中
  4. 通过 document 对象获取 xml 文件的根节点
  5. 获取根节点下的子节点的 List 集合
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
java复制代码package Paint;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;

import org.jdom.Attribute;
import org.jdom.Document;
import org.jdom.Element;
import org.jdom.JDOMException;
import org.jdom.input.SAXBuilder;

public class JDOMTest {
private static ArrayList<Book> booksList = new ArrayList<Book>();

public static void main(String[] args) {
SAXBuilder saxBuilder = new SAXBuilder();
InputStream in;
try {
in = new FileInputStream("./src/Paint/hello.xml");
InputStreamReader isr = new InputStreamReader(in, "UTF-8");

Document document = saxBuilder.build(isr);
Element rootElement = document.getRootElement();
List<Element> bookList = rootElement.getChildren();

for (Element book : bookList) {
Book bookEntity = new Book();

List<Attribute> attrList = book.getAttributes();

for (Attribute attr : attrList) {
String attrName = attr.getName();
String attrValue = attr.getValue();
System.out.println( attrName + "----" + attrValue);
if (attrName.equals("id")) {
bookEntity.setId(attrValue);
}
}
// 对book节点的子节点的节点名以及节点值的遍历
List<Element> bookChilds = book.getChildren();
for (Element child : bookChilds) {
System.out.println(child.getName() + "----"+ child.getValue());
if (child.getName().equals("name")) {
bookEntity.setName(child.getValue());
}
else if (child.getName().equals("author")) {
bookEntity.setAuthor(child.getValue());
}
else if (child.getName().equals("year")) {
bookEntity.setYear(child.getValue());
}
else if (child.getName().equals("price")) {
bookEntity.setPrice(child.getValue());
}
else if (child.getName().equals("language")) {
bookEntity.setLanguage(child.getValue());
}
}

booksList.add(bookEntity);
bookEntity = null;
System.out.println(booksList.size());
System.out.println(booksList.get(0).getId());
System.out.println(booksList.get(0).getName());

}
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (JDOMException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}

DOM4J(Document Object Model for Java)解析

需要引入 dom4j-1.6.1.jar

优点

  1. 性能很好
  2. 大量使用 Java 集合类,开发简便,同时也提供了一些提高性能的代替方法
  3. 支持 XPath

缺点

  1. API 比较复杂

步骤

  1. 创建 SAXReader 的对象 reader
  2. 通过 reader 对象的 read() 方法加载 books.xml 文件,获取 document 对象
  3. 通过 document 对象获取根节点 bookstore
  4. 通过 element 对象的 elementIterator() 获取迭代器
  5. 遍历迭代器,获取根节点中的信息
  6. 获取 book 的属性名和属性值
  7. 通过 book 对象的 elementIterator() 获取节点元素迭代器
  8. 遍历迭代器,获取子节点中的信息
  9. 获取节点名和节点值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
java复制代码package Paint;

import java.io.File;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.dom4j.Attribute;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.dom4j.io.SAXReader;

public class DOM4JTest {

public static void main(String[] args) {
ArrayList<Book> bookList = new ArrayList<Book>();
SAXReader reader = new SAXReader();
try {
Document document = reader.read(new File("./src/Paint/hello.xml"));
Element bookStore = document.getRootElement();
Iterator it = bookStore.elementIterator();
while (it.hasNext()) {

Element book = (Element) it.next();
Book bookData = new Book();
List<Attribute> bookAttrs = book.attributes();
for (Attribute attr : bookAttrs) {
System.out.println(attr.getName() + "---" + attr.getValue());

if(attr.getName().equals("id")){
bookData.setId(attr.getValue());
}
}
Iterator itt = book.elementIterator();

while (itt.hasNext()) {
Element bookChild = (Element) itt.next();

System.out.println(bookChild.getName()+ "---" + bookChild.getText());

if(bookChild.getName().equals("name")){
bookData.setName(bookChild.getText());
}else if(bookChild.getName().equals("author")){
bookData.setAuthor(bookChild.getText());
}else if(bookChild.getName().equals("year")){
bookData.setYear(bookChild.getText());
}else if(bookChild.getName().equals("price")){
bookData.setPrice(bookChild.getText());
}else if(bookChild.getName().equals("language")){
bookData.setLanguage(bookChild.getText());
}
}
// 遍历完一个节点,将该节点信息添加到列表中
bookList.add(bookData);

}
} catch (DocumentException e) {

e.printStackTrace();
}

// 输出保存在内存中XML信息
for(Book book:bookList){
System.out.println(book.getName());
System.out.println("id=" + book.getId());
System.out.println(book.getAuthor());
System.out.println(book.getYear());
System.out.println(book.getPrice());
System.out.println(book.getLanguage());
}
}
}

🔗本文系转载:www.jianshu.com/p/9b3835299…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

王者并发课-青铜7:顺藤摸瓜-如何从synchronized

发表于 2021-05-28

欢迎来到《王者并发课》,本文是该系列文章中的第7篇。

在前面的文章中,我们已经体验过synchronized的用法,并对锁的概念和原理做了简单的介绍。然而,你可能已经察觉到,有一个概念似乎总是和synchronized、锁这两个概念如影相随,很多人也比较喜欢问它们之间的区别,这个概念就是Monitor,也叫监视器。

所以,在讲解完synchronized、锁之后,文本将为你讲解Monitor,揭示它们之间那些公开的秘密,希望你不再迷惑。

首先,你要明白的是,Monitor作为一种同步机制,它并非Java所特有,但Java实现了这一机制。

为了具象地理解Monitor这一抽象概念,我们先来分析身边的一个常见场景。

一、从医院排队就诊机制理解Monitor

相信你一定有过去医院就诊的经历。我们去医院时,情况一般是这样的:

  • 首先,我们在门诊大厅前台或自助挂号机进行挂号;
  • 随后,挂号结束后我们找到对应的诊室就诊:
    • 诊室每次只能有一个患者就诊;
    • 如果此时诊室空闲,直接进入就诊;
    • 如果此时诊室内有患者正在就诊,那么我们进入候诊室,等待叫号;
  • 就诊结束后,走出就诊室,候诊室的下一位候诊患者进入就诊室。

这个就诊过程你一定耳熟能详,理解起来必然毫不费力气。我们做了一张图展示图下:

仔细看这幅图中的就诊过程,如果你理解了这个过程,你就理解了Monitor. 这么简单吗?不要怀疑自己,是的。你竟然早已理解Monitor机制!

不要小看这个机制,它可是生活中的智慧体现。在这个就诊机制中,它起到了两个关键性的作用:

  • 互斥(mutual exclusion ):每次只允许一个患者进入候诊室就诊;
  • 协作(cooperation):就诊室中的患者就诊结束后,可以通知候诊区的下一位患者。

明白了吗?你在医院就诊的过程竟然和Monitor的机制几乎一模一样。我们换个方式来描述Monitor在计算机科学中的作用:

  • 互斥(mutual exclusion ):每次只允许一个线程进入临界区;
  • 协作(cooperation):当临界区的线程执行结束后满足特定条件时,可以通知其他的等待线程进入。

而就诊过程中的门诊大厅、就诊室、候诊室则恰好对应着Monitor中的三个关键概念。其中:

  • 门诊大厅:所有待进入的线程都必须先在**入口(Entry Set)**挂号才有资格;
  • 就诊室:一个每次只能有一个线程进入的特殊房间(Special Room);
  • 候诊室:就诊室繁忙时,进入等待区(Wait Set);

我们把上面的图稍作调整,就可以看到Monitor在Java中的模样:

对比来看,相信你已经很直观地理解Monitor机制。再一回味,你会发现synchronized正是对Monitor机制的一种实现。而在Java中,每一个对象都会关联一个监视器。

二、从synchronized源码感受Monitor

既然synchronized是对Monitor机制的一种实现,为了让你更有体感,我们可以写一段极简代码一探究竟。

这段代码极为简单,但是够用,我们在代码中使用了synchronized关键字:

1
2
3
4
5
6
7
8
java复制代码public class SyncMonitorDemo {
public static void main(String[] args) {
Object o = new Object();
synchronized (o) {
System.out.println("locking...");
}
}
}

代码写好后,分别执行javac SyncMonitorDemo.java和 javap -v SyncMonitorDemo.class,随后你就能得到下面这样的字节码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
yaml复制代码Classfile SyncMonitorDemo.class
Last modified May 26, 2021; size 684 bytes
MD5 checksum e366920f22845e98c45f26531596d6cf
Compiled from "SyncMonitorDemo.java"
public class cn.tao.king.juc.execises1.SyncMonitorDemo
minor version: 0
major version: 49
flags: ACC_PUBLIC, ACC_SUPER
Constant pool:
#1 = Methodref #2.#22 // java/lang/Object."<init>":()V
#2 = Class #23 // java/lang/Object
#3 = Fieldref #24.#25 // java/lang/System.out:Ljava/io/PrintStream;
#4 = String #26 // locking...
#5 = Methodref #27.#28 // java/io/PrintStream.println:(Ljava/lang/String;)V
#6 = Class #29 // cn/tao/king/juc/execises1/SyncMonitorDemo
#7 = Utf8 <init>
#8 = Utf8 ()V
#9 = Utf8 Code
#10 = Utf8 LineNumberTable
#11 = Utf8 LocalVariableTable
#12 = Utf8 this
#13 = Utf8 Lcn/tao/king/juc/execises1/SyncMonitorDemo;
#14 = Utf8 main
#15 = Utf8 ([Ljava/lang/String;)V
#16 = Utf8 args
#17 = Utf8 [Ljava/lang/String;
#18 = Utf8 o
#19 = Utf8 Ljava/lang/Object;
#20 = Utf8 SourceFile
#21 = Utf8 SyncMonitorDemo.java
#22 = NameAndType #7:#8 // "<init>":()V
#23 = Utf8 java/lang/Object
#24 = Class #30 // java/lang/System
#25 = NameAndType #31:#32 // out:Ljava/io/PrintStream;
#26 = Utf8 locking...
#27 = Class #33 // java/io/PrintStream
#28 = NameAndType #34:#35 // println:(Ljava/lang/String;)V
#29 = Utf8 cn/tao/king/juc/execises1/SyncMonitorDemo
#30 = Utf8 java/lang/System
#31 = Utf8 out
#32 = Utf8 Ljava/io/PrintStream;
#33 = Utf8 java/io/PrintStream
#34 = Utf8 println
#35 = Utf8 (Ljava/lang/String;)V
{
public cn.tao.king.juc.execises1.SyncMonitorDemo();
descriptor: ()V
flags: ACC_PUBLIC
Code:
stack=1, locals=1, args_size=1
0: aload_0
1: invokespecial #1 // Method java/lang/Object."<init>":()V
4: return
LineNumberTable:
line 3: 0
LocalVariableTable:
Start Length Slot Name Signature
0 5 0 this Lcn/tao/king/juc/execises1/SyncMonitorDemo;

public static void main(java.lang.String[]);
descriptor: ([Ljava/lang/String;)V
flags: ACC_PUBLIC, ACC_STATIC
Code:
stack=2, locals=4, args_size=1
0: new #2 // class java/lang/Object
3: dup
4: invokespecial #1 // Method java/lang/Object."<init>":()V
7: astore_1
8: aload_1
9: dup
10: astore_2
11: monitorenter
12: getstatic #3 // Field java/lang/System.out:Ljava/io/PrintStream;
15: ldc #4 // String locking...
17: invokevirtual #5 // Method java/io/PrintStream.println:(Ljava/lang/String;)V
20: aload_2
21: monitorexit
22: goto 30
25: astore_3
26: aload_2
27: monitorexit
28: aload_3
29: athrow
30: return
Exception table:
from to target type
12 22 25 any
25 28 25 any
LineNumberTable:
line 5: 0
line 6: 8
line 7: 12
line 8: 20
line 9: 30
LocalVariableTable:
Start Length Slot Name Signature
0 31 0 args [Ljava/lang/String;
8 23 1 o Ljava/lang/Object;
}
SourceFile: "SyncMonitorDemo.java"

javap是JDK自带的一个反汇编命令。你可以忽略其他不必要的信息,直接在结果中找到下面这段代码:

1
2
3
4
5
6
yaml复制代码11: monitorenter
12: getstatic #3 // Field java/lang/System.out:Ljava/io/PrintStream;
15: ldc #4 // String locking...
17: invokevirtual #5 // Method java/io/PrintStream.println:(Ljava/lang/String;)V
20: aload_2
21: monitorexit

看到monitorenter和monitorexit指令,相信智慧的你已经看穿一切。

以上就是文本的全部内容,恭喜你又上了一颗星✨

夫子的试炼

  • 写一段包含synchronized关键字的代码,使用javap命令观察结果。

延伸阅读与参考资料

  • 《王者并发课》专栏文集下载:github.com/ThoughtsBet…

关于作者

专注高并发领域创作。姊妹篇小册《高并发秒杀的设计精要与实现》作者,关注公众号【MetaThoughts】,及时获取文章更新和文稿。


如果本文对你有帮助,欢迎点赞、关注、监督,我们一起从青铜到王者。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

License授权方案

发表于 2021-05-28

源码地址:github.com/sixj0/licen…

解决的问题:

将项目卖给其他公司,需要将jar包在客户的服务器上部署,为了避免客户将项目jar包进行二次售卖,或者我们需要控制项目的使用期限,比如控制项目只能运行一年,这个是时候,需要使用License认证,生成一个License证书,该证书中包含客户服务器信息(IP地址、MAC地址、CPU序列号、主板序列号),同时可以设置生效时间与失效时间,控制项目到期之后项目不可用。

license授权机制的原理

(1)生成密钥对,包含私钥和公钥。(2)授权者保留私钥,使用私钥对授权信息诸如使用截止日期,mac 地址等内容生成 license 签名证书。(3)公钥给使用者,放在代码中使用,用于验证 license 签名证书是否符合使用条件。

使用KeyTool生成密匙库:

  1. 生成私钥库
1
shell复制代码keytool -genkeypair -keysize 1024 -validity 3650 -alias SYSHLANG -keystore privateKeys.keystore -storepass 12345678A -keypass 12345678A -dname "CN=sixj, OU=runlion, O=redlion, L=HZ, ST=ZJ, C=CN"

参数说明:

* keysize 密钥长度
* validity 私钥的有效期(单位:天)
* alias 私钥别称
* keystore 指定私钥库文件的名称 (生成在当前目录)
* storepass 指定私钥库的密码 (keystore 文件存储密码)
* keypass 指定别名条目的密码 (私钥加解密密码)
* dname 证书个人信息
    + CN 为你的姓名
    + OU 为你的组织单位名称
    + O 为你的组织名称
    + L 为你所在的城市名称
    + ST 为你所在的省份名称
    + C 为你的国家名称
  1. 生成证书文件
1
shell复制代码keytool -exportcert -alias SYSHLANG -keystore privateKeys.keystore -storepass 12345678A -file certfile.cer

参数说明:

* alias 私钥别称
* keystore 指定私钥库文件的名称 (如果没有带路径,在当前目录查找)
* storepass 指定私钥库的密码
* file 导出证书文件名称
  1. 生成公钥库
1
shell复制代码keytool -import -alias SYSHLANG -file certfile.cer -keystore publicCerts.keystore  -storepass 12345678A

参数说明:

* alias 公钥别称
* file 证书文件名称
* keystore 公钥文件名称
* storepass 指定私钥库的密码

看到以下三个文件:

  • privateKeys.keystore(私钥)提供给生成证书使用
  • publicCerts. keystore(公钥)提供给证书认证使用
  • certfile.cer后续步骤用不到,可以删除。

License证书:

根据客户服务器硬件信息(MAC地址、IP地址、CPU序列号、主板序列号)生成授权证书,同时可以给授权证书设置生效时间与失效时间。

整个授权过程分为三步:

  1. 获取客户端服务器信息(license-server-info服务)

将license-server-info服务部署到客户服务器上

请求接口/license/getServerInfo

得到硬件信息:

1
2
3
4
5
6
7
8
9
10
11
12
> json复制代码{
> "ipAddress": [ //授权的ip列表
> "172.17.0.8"
> ],
> "macAddress": [ //授权的mac地址列表
> "52-54-00-74-0B-D9"
> ],
> "cpuSerial": "55 06 05 00 FF FB 8B 0F", //cpu序列号
> "mainBoardSerial": "afb14aac-eccb-4a37-9c31-e7951ce73e0d"//主板序列号
> }
>
>
  1. 生成授权证书(license-creator服务)

我们自己部署生成证书的服务(license-creator)

请求接口/license/generateLicense

入参传入授权信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
> json复制代码{
>
> "expiryTime":"2021-05-25 19:07:59", //生效时间
> "issuedTime":"2021-04-25 19:07:00", //失效时间
> "keyPass":"12345678A", //密钥的密码
> "privateAlias":"SYSHLANG",
> "licensePath":"/Users/sixj/Desktop/license/license.lic", //证书生成地址
> "privateKeysStorePath":"/Users/sixj/Desktop/license/privateKeys.keystore",// 密钥文件地址
> "storePass":"12345678A", //密钥库的密码
> "subject":"pushi-kn-graph",
> "licenseCheckModel":{ //授权验证信息
> "cpuSerial":"47A8E193-23D4-5B93-92AB-4A96FBC0346F",//cpu序列号
> "ipAddress":[ //ip
> "192.168.174.107"
> ],
> "macAddress":[ //mac地址
> "F8-FF-C2-6A-3E-73"
> ],
> "mainBoardSerial":"C02C31HZMD6P"//主板序列号
> }
> }
>
>

会生成一个license.lic授权文件到执行目录
3. 授权验证(license-verify-starter)

需要添加授权功能的服务需要依赖license-verify-starter

配置证书信息

1
2
3
4
5
6
7
> properties复制代码license.subject: pushi-kn-graph
> license.publicAlias: SYSHLANG
> license.storePass: 12345678A
> license.licensePath: /Users/sixj/Desktop/license/license.lic
> license.publicKeysStorePath: /Users/sixj/Desktop/license/publicCerts.keystore
>
>

项目启动的时候,会去验证授权证书的有效性,是否在有效期内,硬件信息是否匹配,如果授权证书无效,项目启动失败。

另外可以在一些核心接口,比如登陆接口,添加@License注解,请求该接口的时候,也会去验证授权证书的有效性,比如验证证书是否到期,如果失效,该接口将会拒绝访问。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Android Jetpack 开发套件

发表于 2021-05-28

⭐️ 本文已收录到 AndroidFamily,技术和职场问题,请关注公众号 [彭旭锐] 和 [BaguTree Pro] 知识星球提问。

Android Jetpack 开发套件是 Google 推出的 Android 应用开发编程范式,为开发者提供了解决应用开发场景中通用的模式化问题的最佳实践,让开发者可将时间精力集中于真正重要的业务编码工作上。

这篇文章是 Android Jetpack 系列文章的第 8 篇文章,完整目录可以移步至文章末尾~

前言

  • 从 androidx.activity 1.0.0 开始,Google 引入 OnBackPressedDispatcher API 来处理回退事件,旨在优化回退事件处理:你可以在任何位置定义回退逻辑,而不是依赖于 Activity#onBackPressed();
  • 在这篇文章里,我将介绍 OnBackPressedDispatcher 的使用方法 & 实现原理 & 应用场景。如果能帮上忙,请务必点赞加关注,这真的对我非常重要。
  • 本文相关代码可以从 DemoHall·HelloAndroidX 下载查看。

目录


  1. 概述

  • OnBackPressedDispatcher 解决了什么问题: 在 Activity 里可以通过回调方法 onBackPressed() 处理,而 Fragment / View 却没有直接的回调方法。现在,我们可以使用 OnBackPressedDispatcher 替代 Activity#onBackPressed(),更优雅地实现回退逻辑。
  • OnBackPressedDispatcher 的整体处理流程: 分发器整体采用责任链设计模式,向分发器添加的回调对象都会成为责任链上的一个节点。当用户触发返回键时,将按顺序遍历责任链,如果回调对象是启用状态(Enabled),则会消费该回退事件,并且停止遍历。如果最后事件没有被消费,则交回到 Activity#onBackPressed() 处理。
  • OnBackPressedDispatcher 与其他方案对比: 在 OnBackPressedDispatcher 之前,我们只能通过 “取巧” 的方法处理回退事件:
+ 1、在 Fragment 中定义回调方法,从 Activity#onBackPressed() 中传递回调事件(缺点:增加了 Activity & Fragment 的耦合关系);
+ 2、在 Fragment 根布局中设置按键监听 setOnKeyListener(缺点:不灵活 & 多个 Fragment 监听冲突)。

  1. OnBackPressedDispatcher 有哪些 API?

主要有以下几个,其他这几个 API 都比较好理解。其中 addCallback(LifecycleOwner, callback) 会在生命周期持有者 LifecycleOwner 进入 Lifecycle.State.STARTED 状态,才会加入分发责任链,而在 LifecycleOwner 进入 Lifecycle.State.STOP 状态时,会从分发责任链中移除。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码1、添加回调对象
public void addCallback(OnBackPressedCallback onBackPressedCallback)

2、添加回调对象,关联到指定生命周期持有者
public void addCallback(LifecycleOwner owner, OnBackPressedCallback onBackPressedCallback)

3、判断是否有启用的回调
public boolean hasEnabledCallbacks()

4、回退事件分发入口
public void onBackPressed()

5、构造器(参数为最终回调)
public OnBackPressedDispatcher(@Nullable Runnable fallbackOnBackPressed) {
mFallbackOnBackPressed = fallbackOnBackPressed;
}

  1. OnBackPressedDispatcher 源码分析

OnBackPressedDispatcher 源码不多,我直接带着问题入手,帮你梳理 OnBackPressedDispatcher 内部的实现原理:

3.1 Activity 如何将事件分发到 OnBackPressedDispatcher?

答:ComponentActivity 内部组合了分发器对象,返回键回调 onBackPressed() 会直接分发给 OnBackPressedDispatcher#onBackPressed()。另外,Activity 本身的回退逻辑则封装为 Runnable 交给分发器处理。

androidx.activity.ComponentActivity.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码private final OnBackPressedDispatcher mOnBackPressedDispatcher =
new OnBackPressedDispatcher(new Runnable() {
@Override
public void run() {
// Activity 本身的回退逻辑
ComponentActivity.super.onBackPressed();
}
});

@Override
@MainThread
public void onBackPressed() {
mOnBackPressedDispatcher.onBackPressed();
}

@NonNull
@Override
public final OnBackPressedDispatcher getOnBackPressedDispatcher() {
return mOnBackPressedDispatcher;
}

3.2 说一下 OnBackPressedDispatcher 的处理流程?

答:分发器整体采用责任链设计模式,向分发器添加的回调对象都会成为责任链上的一个节点。当用户触发返回键时,将按顺序遍历责任链,如果回调对象是启用状态(Enabled),则会消费该回退事件,并且停止遍历。如果最后事件没有被消费,则交回到 Activity#onBackPressed() 处理。

OnBackPressedDispatcher.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
java复制代码// final 回调:Activity#onBackPressed()
@Nullable
private final Runnable mFallbackOnBackPressed;

// 责任链
final ArrayDeque<OnBackPressedCallback> mOnBackPressedCallbacks = new ArrayDeque<>();

// 构造器
public OnBackPressedDispatcher() {
this(null);
}

// 构造器
public OnBackPressedDispatcher(@Nullable Runnable fallbackOnBackPressed) {
mFallbackOnBackPressed = fallbackOnBackPressed;
}

// 判断是否有启用的回调
@MainThread
public boolean hasEnabledCallbacks() {
Iterator<OnBackPressedCallback> iterator = mOnBackPressedCallbacks.descendingIterator();
while (iterator.hasNext()) {
if (iterator.next().isEnabled()) {
return true;
}
}
return false;
}

入口方法:责任链上的每个回调方法仅在前面的回调处于未启用状态(unEnabled)才能调用。
如果如果都没有启用,最后会回调给 mFallbackOnBackPressed
@MainThread
public void onBackPressed() {
Iterator<OnBackPressedCallback> iterator = mOnBackPressedCallbacks.descendingIterator();
while (iterator.hasNext()) {
OnBackPressedCallback callback = iterator.next();
if (callback.isEnabled()) {
callback.handleOnBackPressed();
// 消费
return;
}
}
if (mFallbackOnBackPressed != null) {
mFallbackOnBackPressed.run();
}
}

3.3 回调方法执行在主线程还是子线程?

答:主线程,分发器的入口方法 Activity#onBackPressed() 执行在主线程,因此回调方法也是执行在主线程。另外,添加回调的 addCallback() 方法也要求在主线程执行,分发器内部使用非并发安全容器 ArrayDeque 存储回调对象。

3.4 OnBackPressedCallback 可以同时添加到不同分发器吗?

答:可以。

3.5 加入返回栈的Fragment 事务,如何回退?

答:FragmentManager 也将事务回退交给 OnBackPressedDispatcher 处理。首先,在 Fragment attach 时,会创建一个回调对象加入分发器,回调处理时弹出返回栈栈顶事务。不过初始状态是未启用,只有当事务添加进返回栈后,才会修改回调对象为启用状态。源码体现如下:

FragmentManagerImpl.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
java复制代码// 3.5.1 分发器与回调对象(初始状态是未启用)
private OnBackPressedDispatcher mOnBackPressedDispatcher;
private final OnBackPressedCallback mOnBackPressedCallback =
new OnBackPressedCallback(false) {
@Override
public void handleOnBackPressed() {
execPendingActions();
if (mOnBackPressedCallback.isEnabled()) {
popBackStackImmediate();
} else {
mOnBackPressedDispatcher.onBackPressed();
}
}
};

// 3.5.2 添加回调对象 addCallback
public void attachController(@NonNull FragmentHostCallback host, @NonNull FragmentContainer container, @Nullable final Fragment parent) {
if (mHost != null) throw new IllegalStateException("Already attached");
...
// Set up the OnBackPressedCallback
if (host instanceof OnBackPressedDispatcherOwner) {
OnBackPressedDispatcherOwner dispatcherOwner = ((OnBackPressedDispatcherOwner) host);
mOnBackPressedDispatcher = dispatcherOwner.getOnBackPressedDispatcher();
LifecycleOwner owner = parent != null ? parent : dispatcherOwner;
mOnBackPressedDispatcher.addCallback(owner, mOnBackPressedCallback);
}
...
}

// 3.5.3 执行事务时,尝试修改回调对象状态
void scheduleCommit() {
...
updateOnBackPressedCallbackEnabled();
}

private void updateOnBackPressedCallbackEnabled() {
if (mPendingActions != null && !mPendingActions.isEmpty()) {
mOnBackPressedCallback.setEnabled(true);
return;
}

mOnBackPressedCallback.setEnabled(getBackStackEntryCount() > 0 && isPrimaryNavigation(mParent));
}

// 3.5.4 回收
public void dispatchDestroy() {
mDestroyed = true;
...
if (mOnBackPressedDispatcher != null) {
// mOnBackPressedDispatcher can hold a reference to the host
// so we need to null it out to prevent memory leaks
mOnBackPressedCallback.remove();
mOnBackPressedDispatcher = null;
}
}

如果你对 Fragment 事务缺乏清晰的概念,务必看下我之前写的一篇文章:Android Jetpack 开发套件 #7 AndroidX Fragment 核心原理分析

讨论完 OnBackPressedDispatcher 的使用方法 & 实现原理,下面我们直接通过一些应用场景来实践:


  1. 再按一次返回键退出

再按一次返回键退出是一个很常见的功能,本质上是一种退出挽回。网上也流传着很多不全面的实现方式。其实,这个功能看似简单,却隐藏着一些优化细节,一起来看看~

4.1 需求分析

首先,我分析了几十款知名的 App,梳理总结出 4 类返回键交互:

分类 描述 举例
1、系统默认行为 返回键事件交给系统处理,应用不做干预 微信、支付宝等
2、再按一次退出 是否两秒内再次点击返回键,是则退出 爱奇艺、高德等
3、返回首页 Tab 按一次先返回首页 Tab,再按一次退出 Facebook、Instagram等
4、刷新信息流 按一次先刷新信息流,再按一次退出 小红书、今日头条等

4.2 如何退出 App?

交互逻辑主要依赖于产品形态和具体应用场景,对于我们技术同学还需要考虑不同的退出 App 的方式的区别。通过观测以上 App 的实际效果,我梳理出以下 4 种退出 App 的实现方式:

  • 1、系统默认行为: 将回退事件交给系统处理,而系统的默认行为是 finish() 当前 Activity,如果当前 Activity 位于栈底,则将 Activity 任务栈转入后台;
  • 2、调用 moveTaskToBack(): 手动将当前 Activity 所在任务栈转入后台,效果与系统的默认行为类似(该方法接收一个 nonRoot 参数:true:要求只有当前 Activity 处于栈底有效、false:不要求当前 Activity 处于栈底)。因为 Activity 实际上并没有销毁,所以用户下次返回应用时是热启动;
  • 3、调用 finish(): 结束当前 Activity,如果当前 Activity 处于栈底,则销毁 Activity 任务栈,如果当前 Activity 是进程最后一个组件,则进程也会结束。需要注意的时,进程结束后内存不会立即被回收,将来(一段时间内)用户重新启动应用为温启动,启动速度比冷启动更快;
  • 4、调用 System.exit(0) 杀死应用 杀死进程 JVM,将来用户重新启动为冷启动,需要花费更多时间。

那么,我们应该如何选择呢?一般情况下,“调用 moveTaskToBack()” 表现最佳,两个论点:

  • 1、两次点击返回键的目的是挽回用户,确认用户真的需要退出。那么,退出后的行为与无拦截的默认行为相同,这点 moveTaskToBack() 可以满足,而 finish() 和 System.exit(0) 的行为比默认行为更严重;
  • 2、moveTaskToBack() 退出应用并没有真正销毁应用,用户重新返回应用是热启动,恢复速度最快。

需要注意,一般不推荐使用 System.exit(0) 和 Process.killProcess(Process.myPid) 来退出应用。因为这些 API 的表现并不理想:

  • 1、当调用的 Activity 不位于栈顶时,杀死进程系统会立即重新启动 App(可能是系统认为 前台 App 是意外终止的,会自动重启);
  • 2、当 App 退出后,粘性服务会自动重启(Service#onStartCommand() 返回 START_STICKY 的 Service),粘性服务会一致运行除非手动停止。
分类 应用返回效果 举例
1、系统默认行为 热启动 微信、支付宝等
2、调用 moveTaskToBack() 热启动 QQ 音乐、小红书等
3、调用 finish() 温启动 待确认(备选爱奇艺、高德等)
4、调用 System.exit(0) 杀死应用 冷启动 待确认(备选爱奇艺、高德等)

Process.killProcess(Process.myPid) 和 System.exit(0) 的区别? todo

4.3 具体代码实现

BackPressActivity.kt

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
kotlin复制代码fun Context.startBackPressActivity() {
startActivity(Intent(this, BackPressActivity::class.java))
}

class BackPressActivity : AppCompatActivity(R.layout.activity_backpress) {

// ViewBinding + Kotlin 委托
private val binding by viewBinding(ActivityBackpressBinding::bind)

/**
* 上次点击返回键的时间
*/
private var lastBackPressTime = -1L

override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)

// 添加回调对象
onBackPressedDispatcher.addCallback(this, onBackPress)

// 返回按钮
binding.ivBack.setOnClickListener {
onBackPressed()
}
}

private val onBackPress = object : OnBackPressedCallback(true) {
override fun handleOnBackPressed() {
if (popBackStack()) {
return
}
val currentTIme = System.currentTimeMillis()
if (lastBackPressTime == -1L || currentTIme - lastBackPressTime >= 2000) {
// 显示提示信息
showBackPressTip()
// 记录时间
lastBackPressTime = currentTIme
} else {
//退出应用
finish()
// android.os.Process.killProcess(android.os.Process.myPid())
// System.exit(0) // exitProcess(0)
// moveTaskToBack(false)
}
}
}

private fun showBackPressTip() {
Toast.makeText(this, "再按一次退出", Toast.LENGTH_SHORT).show();
}
}

这段代码的逻辑并不复杂,我们主要通过 OnBackPressedDispatcher#addCallback() 添加了一个回调对象,从而干预了返回键事件的逻辑:“首次点击返回键弹出提示,两秒内再次点击返回键退出应用”。

另外,需要解释下这句代码: private val binding by viewBinding(ActivityBackpressBinding::bind)。这里其实是使用了 ViewBinding + Kotlin 委托属性的视图绑定方案,相对于传统的 findViewById、ButterKnife、Kotlin Synthetics 等方案,这个方案从多个角度上表现更好。具体分析你可以看我之前写过的一篇文章:Android Jetpack 开发套件 #4 ViewBinding 与 Kotlin 委托双剑合璧

4.4 优化:兼容 Fragment 返回栈

上一节基本能满足需求,但考虑一种情况:页面内有多个 Fragment 事务加入了返回栈,点击返回键时需要先依次清空返回栈,最后再走 “再按一次返回键退出” 逻辑。

此时,你会发现上一节的方法不会等返回栈清空就直接走退出逻辑了。原因也很好理解,因为 Activity 的回退对象的加入时机比 FragmentManagerImpl 中的回退对象加入时机更早,所以 Activity 的回退逻辑优先处理。解决方法就是在 Activtiy 回退逻辑中手动弹出 Fragment 事务返回栈。完整演示代码如下:

BackPressActivity.kt

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
kotlin复制代码class BackPressActivity : AppCompatActivity(R.layout.activity_backpress) {

private val binding by viewBinding(ActivityBackpressBinding::bind)

/**
* 上次点击返回键的时间
*/
private var lastBackPressTime = -1L

override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)

addFragmentToStack()
onBackPressedDispatcher.addCallback(this, onBackPress)

binding.ivBack.setOnClickListener {
onBackPressed()
}
}

private fun addFragmentToStack() {
// 提示:为了聚焦问题,这里不考虑 Activity 重建的场景
for (index in 1..5) {
supportFragmentManager.beginTransaction().let { it ->
it.add(
R.id.container,
BackPressFragment().also { it.text = "fragment_$index" },
"fragment_$index"
)
it.addToBackStack(null)
it.commit()
}
}
}

/**
* @return true:没有Fragment弹出 false:有Fragment弹出
*/
private fun popBackStack(): Boolean {
// 当 Fragment 状态以保存,不弹出返回栈
return supportFragmentManager.isStateSaved
|| supportFragmentManager.popBackStackImmediate()
}

private val onBackPress = object : OnBackPressedCallback(true) {
override fun handleOnBackPressed() {
if (popBackStack()) {
return
}
val currentTIme = System.currentTimeMillis()
if (lastBackPressTime == -1L || currentTIme - lastBackPressTime >= 2000) {
// 显示提示信息
showBackPressTip()
// 记录时间
lastBackPressTime = currentTIme
} else {
//退出应用
finish()
// android.os.Process.killProcess(android.os.Process.myPid())
// System.exit(0) // exitProcess(0)
// moveTaskToBack(false)
}
}
}

private fun showBackPressTip() {
Toast.makeText(this, "再按一次退出", Toast.LENGTH_SHORT).show();
}
}

4.5 在 Fragment 中使用

TestFragment.kt

1
2
3
4
5
6
7
8
9
10
11
12
13
kotlin复制代码class TestFragment : Fragment() {
private val dispatcher by lazy {requireActivity().onBackPressedDispatcher}

override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)

dispatcher.addCallback(this, object : OnBackPressedCallback(true) {
override fun handleOnBackPressed() {
Toast.makeText(context, "TestFragment - handleOnBackPressed", Toast.LENGTH_SHORT).show()
}
})
}
}

4.6 其他 finish() 方法

另外,finish() 还有一些类似的 API,可以补充了解下:

  • finishAffinity():关闭当前 Activity 任务栈中,位于当前 Activity 底下的所有 Activity(例如 A 启动 B,B 启动 C,如果 B 调用 finishAffinity(),则会关闭 A 和 B,而 C 保留)。该 API 在 API 16 后引入,最好通过 ActivityCompat.finishAffinity() 调用。
  • finishAfterTransition():执行转场动画后 finish Activity,需要通过 ActivityOptions 定义转场动画。该 API 在 API 21 后引入,最好通过 ActivityCompat.finishAfterTransition() 调用。

  1. 总结

关于 OnBackPressedDispatcher 的讨论就先到这里,给你留两个思考题:

  • 1、如果 Activity 上弹出一个 Dialog,此时点返回键是先关闭 Dialog,还是会分发给 OnBackPressedDispatcher?如果弹出的是 PopupWindow 呢?
  • 2、Activity 的 WebView 中弹出了一个浮层,怎么实现点击返回键先关闭浮层,再次点击才回退页面?

参考资料

  • Jetpack 应用架构指南 —— 官方文档
  • 提供自定义返回导航 —— 官方文档
  • Fragment 的过去、现在和将来 —— 谷歌开发者

推荐阅读

Android Jetpack 系列文章目录如下(2023/07/08 更新):

  • #1 Lifecycle:生命周期感知型组件的基础
  • #2 为什么 LiveData 会重放数据,怎么解决?
  • #3 为什么 Activity 都重建了 ViewModel 还存在?
  • #4 有小伙伴说看不懂 LiveData、Flow、Channel,跟我走
  • #5 Android UI 架构演进:从 MVC 到 MVP、MVVM、MVI
  • #6 ViewBinding 与 Kotlin 委托双剑合璧
  • #7 AndroidX Fragment 核心原理分析
  • #8 OnBackPressedDispatcher:Jetpack 处理回退事件的新姿势
  • #9 食之无味!App Startup 可能比你想象中要简单
  • #10 从 Dagger2 到 Hilt 玩转依赖注入(一)

⭐️ 永远相信美好的事情即将发生,欢迎加入小彭的 Android 交流社群~

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

盘点 Cloud Nacos 基础手册

发表于 2021-05-27

总文档 :文章目录

Github : github.com/black-ant

一 . 前言

水文一篇 , 哈哈哈哈哈哈~

文章目的 :

  • 梳理 Nacos 创建功能的 Debug 方向
  • 梳理 Nacos 组件的模块体系

文章大纲 : 该文档主要涉及以下几个主要的部分

  • Nacos 服务发现
  • Nacos 配置加载
  • Nacos 健康检查
  • Nacos 路由策略

PS : 文档参考来源为 官方文档 , 建议阅读文档了解快速使用

二 . 源码的编译

2.1 Nacos源码编译

1
2
3
4
5
6
7
8
java复制代码// Step 1 : 下载源码
https://github.com/alibaba/nacos.git

// Step 2 : 编译 Nacos
mvn -Prelease-nacos -Dmaven.test.skip=true clean install -U

// Step 3 : 运行 Server 文件
nacos_code\distribution\target

2.2 Nacos 源码运行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
java复制代码// Step 1 : 下载 Nacos 源码
https://github.com/alibaba/nacos.git

// Step 2 : IDEA 导入 Nacos
此处添加 SpringBoot 启动 , 启动的类为 com.alibaba.nacos.Nacos


// Step 3 : IDEA 修改启动参数
-Dnacos.standalone=true -Dnacos.home=C:\\nacos

-nacos.standalone=true : 单机启动
-Dnacos.home=C:\\nacos : 日志路径



// PS : Nacos Application
@SpringBootApplication(scanBasePackages = "com.alibaba.nacos")
@ServletComponentScan
@EnableScheduling
public class Nacos {

public static void main(String[] args) {
SpringApplication.run(Nacos.class, args);
}
}

三 . 模块源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
xml复制代码<modules>
<!-- 配置管理-->
<module>config</module>
<!-- Nacos 内核 -->
<module>core</module>
<!-- 服务发现 -->
<module>naming</module>
<!-- 地址服务器-->
<module>address</module>
<!-- 单元测试 -->
<module>test</module>
<!-- 接口抽象 -->
<module>api</module>
<!-- 客户端 -->
<module>client</module>
<!-- 案例 -->
<module>example</module>
<!-- 公共工具 -->
<module>common</module>
<!-- Server 构建发布 -->
<module>distribution</module>
<!-- 控制台,图形界面模块 -->
<module>console</module>
<!-- 元数据管理-->
<module>cmdb</module>
<!-- TODO : 猜测是集成 istio 完成流量控制-->
<module>istio</module>
<!-- 一致性管理 -->
<module>consistency</module>
<!-- 权限控制 -->
<module>auth</module>
<!-- 系统信息管理 Env 读取 , conf 读取 -->
<module>sys</module>
</modules>

3.1 Nacos 服务的发现和管理 (c0-c20)

Nacos 对服务的管理主要集中在 Naming 模块中 , 这里结合 Client 端看一下服务的发现和管理是什么逻辑 , 以及该如何去操作他们

3.1.1 控制台获取服务列表

外部接口 :

  • C- CatalogController # listDetail : 获取服务详情列表
  • C- CatalogController # instanceList: 列出特殊服务的实例
  • C- CatalogController # serviceDetail : 服务详情

核心主要是通过 ServiceManager 进行处理 , 此处看一下内部相关的逻辑 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
java复制代码通过三个接口不难发现 ,其最终的调用核心都是 ServiceManager 类

// 内部类
C01- ServiceManager
// 内部类
PC- UpdatedServiceProcessor
PC- ServiceUpdater
PSC- ServiceChecksum
PC- EmptyServiceAutoClean
PC- ServiceReporter
PSC- ServiceKey :
// 核心参数
F- Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
F- LinkedBlockingDeque<ServiceKey> toBeUpdatedServicesQueue = new LinkedBlockingDeque<>(1024 * 1024);
F- ConsistencyService consistencyService;
// 常用方法
M- init : 初始化方法
M- chooseServiceMap : 通过空间名获取 Server 集合
M- addUpdatedServiceToQueue : 更新 Server
M- onChange : server 改变
M- onDelete : server 删除


// 另外 , ServiceManager 还存在一个依赖 : ConsistencyService
C02- ConsistencyService : 一致性服务接口 -> PS:C02_01
M- put :向集群提交一个数据
M- remove :从集群删除一个数据
M- get :从集群获取数据
M- listen :监听集群中某个key的变化
M- unlisten :删除对某个key的监听
M- isAvailable :返回当前一致性状态是否可用


//总结 : 此处的逻辑很简单 , 就是对集合的 CURD 操作 , 核心特点有以下几个 :
1- Delete 时 , 会调用依赖对象 ConsistencyService (DelegateConsistencyServiceImpl) , 用于处理一致性需求

PS:C02_01 ConsistencyService 体系结构

nacos-ConsistencyService.png

当存在多个服务的时候 , 是如何存储的?

Nacos-server-Map.jpg

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码// 如上述图所示 , 相关的对象存放在 clusterMap 中

// 注意 , Service 有2个
com.alibaba.nacos.api.naming.pojo.Service
com.alibaba.nacos.naming.core.Service


// Service 的属性
C- Service
I- com.alibaba.nacos.api.naming.pojo.Service
F- Selector selector
F- Map<String, Cluster> clusterMap = new HashMap<>();
F- Boolean enabled
F- Boolean resetWeight
F- String token
F- List<String> owners

3.1.2 Nacos 服务和 Config 的控制类

1
2
3
4
5
6
7
8
9
java复制代码Nacos 中主要通过 NamingService 和 ConfigService 对服务和配置进行控制 , 其底层原理仍然为 :   , 这里来简单看一下 

ConfigService -> NacosConfigService
NamingService -> NacosNamingService


这2个类归属于 com.alibaba.nacos.client.naming 包

// PS : 注意 ,要使用 nacos-config-spring-boot-starter 和 nacos-discovery-spring-boot-starter 包

3.1.3 Nacos 的退出销毁

当我们注册的服务在关闭的时候 , Nacos 会在生命周期结束的时候从 Server 端注销该应用

Step 1 : Closeable 停止相关类 , 停止项目 , 当我们点击停止后 , 可以看到如下一串log

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码com.alibaba.nacos.client.naming          : com.alibaba.nacos.client.naming.beat.BeatReactor do shutdown stop
com.alibaba.nacos.client.naming : com.alibaba.nacos.client.naming.core.EventDispatcher do shutdown begin
com.alibaba.nacos.client.naming : com.alibaba.nacos.client.naming.core.EventDispatcher do shutdown stop
com.alibaba.nacos.client.naming : com.alibaba.nacos.client.naming.core.HostReactor do shutdown begin
com.alibaba.nacos.client.naming : com.alibaba.nacos.client.naming.core.PushReceiver do shutdown begin
com.alibaba.nacos.client.naming : com.alibaba.nacos.client.naming.core.PushReceiver do shutdown stop
com.alibaba.nacos.client.naming : com.alibaba.nacos.client.naming.backups.FailoverReactor do shutdown begin
com.alibaba.nacos.client.naming : com.alibaba.nacos.client.naming.backups.FailoverReactor do shutdown stop
com.alibaba.nacos.client.naming : com.alibaba.nacos.client.naming.core.HostReactor do shutdown stop
com.alibaba.nacos.client.naming : com.alibaba.nacos.client.naming.net.NamingProxy do shutdown begin
com.alibaba.nacos.client.naming : [NamingHttpClientManager] Start destroying NacosRestTemplate
com.alibaba.nacos.client.naming : [NamingHttpClientManager] Destruction of the end
com.alibaba.nacos.client.naming : com.alibaba.nacos.client.naming.net.NamingProxy do shutdown stop


这里可以看到 , 这里进行了关闭操作 , 其中主要的 Close 操作都是基于 com.alibaba.nacos.common.lifecycle.Closeable 进行实现

// PS : 此处的调用逻辑为 TODO

Step 2 : NacosServiceRegistry 注销

除了这里 Closeable 会关闭外 , 还会注销 Service , 此处主要是NacosServiceRegistry

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码public void deregister(Registration registration) {
if (StringUtils.isEmpty(registration.getServiceId())) {
return;
}

NamingService namingService = namingService();
String serviceId = registration.getServiceId();
String group = nacosDiscoveryProperties.getGroup();

try {
namingService.deregisterInstance(serviceId, group, registration.getHost(),
registration.getPort(), nacosDiscoveryProperties.getClusterName());
} catch (Exception e) {
// 省略 log
}

}

// PS : 此处的原理为 继承了 ServiceRegistry , 实现销毁逻辑
C- AbstractAutoServiceRegistration

Nacos_Closeable 体系

Nacos_Closeable.png

3.1.4 健康检查流程

Nacos 提供对服务的实时的健康检查,阻止向不健康的主机或服务实例发送请求。Nacos 支持传输层 (PING 或 TCP)和应用层 (如 HTTP、MySQL、用户自定义)的健康检查。

健康检查相关接口

  • 发送实例心跳 (InstanceController) : /nacos/v1/ns/instance/beat
  • 更新实例的健康状态 (HealthController) : /nacos/v1/ns/health/instance

Client 端发起心跳

服务端会定时发起心跳操作调用2个接口 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
java复制代码C- BeatReactor
PC- BeatTask : 内部类

// 其中会有2个步骤 :

// Step 1 : BeatReactor # addBeatInfo 中添加定时任务
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
BeatInfo existBeat = null;
//fix #1733
if ((existBeat = dom2Beat.remove(key)) != null) {
existBeat.setStopped(true);
}
dom2Beat.put(key, beatInfo);
// 添加心跳信息
executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}

// Step 2 : BeatTask 中调用 Server 接口进行心跳操作
JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);


// PS : 心跳的间隔默认是 5秒 ( com.alibaba.nacos.api.common.Constants)
public static final long DEFAULT_HEART_BEAT_TIMEOUT = TimeUnit.SECONDS.toMillis(15);
public static final long DEFAULT_IP_DELETE_TIMEOUT = TimeUnit.SECONDS.toMillis(30);
public static final long DEFAULT_HEART_BEAT_INTERVAL = TimeUnit.SECONDS.toMillis(5);

服务端检测心跳

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
java复制代码// 服务端同样会对实例进行检测 , 核心类为 ClientBeatCheckTask

// Step 1 : ClientBeatCheckTask 的创建
C- Service
?- 在 Service 初始化时 ,即开始了 Task 任务


public void init() {
HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
//............
}

// 这里也可以看到默认时间的设置
public static void scheduleCheck(ClientBeatCheckTask task) {
futureMap.computeIfAbsent(task.taskKey(),
k -> GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS));
}



// Step 2 : ClientBeatCheckTask 的运行
C- ClientBeatCheckTask
?- 检查并更新临时实例的状态,如果它们已经过期则删除它们。


public void run() {
//...... 省略

// Step 1 : 获取所有实例
List<Instance> instances = service.allIPs(true);

for (Instance instance : instances) {
// 如果时间大于心跳超时时间 , 则修改健康状态
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
if (!instance.isMarked()) {
if (instance.isHealthy()) {
instance.setHealthy(false);
getPushService().serviceChanged(service);
ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
}
}
}
}

if (!getGlobalConfig().isExpireInstance()) {
return;
}

for (Instance instance : instances) {
if (instance.isMarked()) {
continue;
}
// 如果心跳大于删除时间 , 则删除实例
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
deleteIp(instance);
}
}
}

// PS : 第一次修改的是健康状态 , 后面才修改的实例数

ACK 健康检查主要逻辑 ,同时推送信息

无意中发现了一个 ACK 机制 , 是通过事件出发的 . 这个模式看代码主要是为了在 Server 发生变化的时候 ,通过 udpClient , 以 UDP 端口推送更新

PS : 客户端在查询服务实例的时候,如果提供 udp 端口,则 server 会创建 udpClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
java复制代码for (Instance instance : service.allIPs(Lists.newArrayList(clusterName))) {
if (instance.getIp().equals(ip) && instance.getPort() == port) {
instance.setHealthy(valid);
// 发布事件 ServiceChangeEvent
pushService.serviceChanged(service);
break;
}
}

// ServiceChangeEvent 事件的处理
C- PushService
public void onApplicationEvent(ServiceChangeEvent event) {
Service service = event.getService();
String serviceName = service.getName();
String namespaceId = service.getNamespaceId();

Future future = GlobalExecutor.scheduleUdpSender(() -> {
try {
// 通过 ServerName 和 命名空间 获取PushClient 集合
ConcurrentMap<String, PushClient> clients = clientMap
.get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
if (MapUtils.isEmpty(clients)) {
return;
}

Map<String, Object> cache = new HashMap<>(16);
long lastRefTime = System.nanoTime();

// 循环所有的 PushClient
for (PushClient client : clients.values()) {
if (client.zombie()) {
clients.remove(client.toString());
continue;
}

Receiver.AckEntry ackEntry;
// 获取缓存 key ,并且从缓存中获取实体数据
String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
byte[] compressData = null;
Map<String, Object> data = null;
if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
compressData = (byte[]) (pair.getValue0());
data = (Map<String, Object>) pair.getValue1();

}

// 构建 ACK 实体类
if (compressData != null) {
ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
} else {
ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
if (ackEntry != null) {
cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));
}
}

// UDP ACK 校验 , 同时推送 ACK 实体
udpPush(ackEntry);
}
} catch (Exception e) {
Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);

} finally {
futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
}

}, 1000, TimeUnit.MILLISECONDS);

futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);

}

private static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) {
if (ackEntry == null) {
return null;
}

if (ackEntry.getRetryTimes() > MAX_RETRY_TIMES) {
ackMap.remove(ackEntry.key);
udpSendTimeMap.remove(ackEntry.key);
failedPush += 1;
return ackEntry;
}

try {
if (!ackMap.containsKey(ackEntry.key)) {
totalPush++;
}
ackMap.put(ackEntry.key, ackEntry);
udpSendTimeMap.put(ackEntry.key, System.currentTimeMillis());

// Socket Send
udpSocket.send(ackEntry.origin);

ackEntry.increaseRetryTime();

GlobalExecutor.scheduleRetransmitter(new Retransmitter(ackEntry),
TimeUnit.NANOSECONDS.toMillis(ACK_TIMEOUT_NANOS), TimeUnit.MILLISECONDS);

return ackEntry;
} catch (Exception e) {
ackMap.remove(ackEntry.key);
udpSendTimeMap.remove(ackEntry.key);
failedPush += 1;

return null;
}
}

健康检查阈值的使用

在配置服务时 , 可以配置一个 0-1 的浮点数 , 定义健康检查的阈值 ,该阈值对应的类为 com.alibaba.nacos.api.naming.pojo.Service

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
java复制代码C- Service
F- name : 服务名
F- protectThreshold : 健康阈值
F- appName : 应用名
F- groupName : 组名
F- metadata : 元数据

// 阈值的使用
C- InstanceController
M- doSrvIpxt :

// 核心逻辑

double threshold = service.getProtectThreshold();
// IPMap 中可用的健康实例数/服务总数的比例 如果小于阈值 , 则达到保护阈值
if ((float) ipMap.get(Boolean.TRUE).size() / srvedIPs.size() <= threshold) {

if (isCheck) {
result.put("reachProtectThreshold", true);
}
ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE));
ipMap.get(Boolean.FALSE).clear();
}


PS : 这里联想后面 , Client 做 Balancer 时 , 获取的 Server 实际上就是全部健康的实例了

但是阈值的目的是什么呢 ?

假设实例出现了大量的异常 , 那么就会导致最后压力会到那几个健康的实例上 , 这个时候 , 可能会出现连锁反应

为了避免这些情况 ,当达到健康阈值的时候 , 就将所有的实例返回.

这就是那句**ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE));**的目的.

虽然Client 端可能碰到异常实例 ,但是可以避免整个系统崩溃

/nacos/v1/ns/health/instance 参数

名称 类型 是否必选 描述
namespaceId 字符串 否 命名空间ID
serviceName 字符串 是 服务名
groupName 字符串 否 分组名
clusterName 字符串 否 集群名
ip 字符串 是 服务实例IP
port int 是 服务实例port
healthy boolean 是 是否健康

权重的处理

权重主要是 Instance 对象中进行配置

1
2
3
4
5
6
7
8
9
10
11
java复制代码C- Instance
M- instanceId
M- ip
M- port
M- weight : 权重
M- healthy : 健康情况
M- enabled
M- ephemeral
M- clusterName
M- serviceName
M- metadata

PS : 权重可以用于 Client 端时进行 权重分配处理

参考原文 @ blog.csdn.net/krisdad/art…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
java复制代码public class NacosWeightLoadBalanceRule extends AbstractLoadBalancerRule {

@Override
public void initWithNiwsConfig(IClientConfig clientConfig) {}

@Resource private NacosDiscoveryProperties nacosDiscoveryProperties;

@Override
public Server choose(Object key) {
// 1.获取服务的名称
BaseLoadBalancer loadBalancer = (BaseLoadBalancer) this.getLoadBalancer();
String serverName = loadBalancer.getName();
// 2.此时Nacos Client会自动实现基于权重的负载均衡算法
NamingService namingService = nacosDiscoveryProperties.namingServiceInstance();
try {
Instance instance = namingService.selectOneHealthyInstance(serverName);
return new NacosServer(instance);
} catch (NacosException e) {
e.printStackTrace();
}
return null;
}

@Bean
public IRule getLoadBalancerRule(){
return new NacosWeightLoadBalancerRule();
}


// PS : 个人以为 , 权重是给 Client 端自行处理的

其他要点

1
2
3
4
5
6
7
8
9
10
java复制代码// Nacos 的默认值

@Value("${nacos.naming.empty-service.auto-clean:false}")
private boolean emptyServiceAutoClean;

@Value("${nacos.naming.empty-service.clean.initial-delay-ms:60000}")
private int cleanEmptyServiceDelay;

@Value("${nacos.naming.empty-service.clean.period-time-ms:20000}")
private int cleanEmptyServicePeriod;

3.2 Nacos 配置流程 C30-C60

3.2.1 Nacos 配置的管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码// 获取配置 , 这里主要有几个步骤 :
C30- NacosConfigService
M30_01- getConfigInner(String tenant, String dataId, String group, long timeoutMs)
- 构建 ConfigResponse , 为其设置 dataId , tenant , group
1- 调用 LocalConfigInfoProcessor.getFailover 优先使用本地配置
2- 调用 ClientWorker , 获取远程配置 -> PS:M30_01_01
3- 仍然没有 , LocalConfigInfoProcesso.getSnapshot 获取快照
End- configFilterChainManager 进行 Filter 链处理

// PS:M30_01_01 ClientWorker 的处理
ClientWorker 中进行了远程服务的请求 , 核心代码 :
agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout);

// 可以看到 , 这里并没有太负载的逻辑 , 仍然是 Rest 请求 : PS 看官网的消息 , 2.0 会采用长连接 , 这里应该会有变动
- Constants.CONFIG_CONTROLLER_PATH : /v1/cs/configs

LocalConfigInfoProcessor 主要逻辑

1
2
3
4
5
6
7
8
9
10
JAVA复制代码// 这里本地配置是指本地 File 文件 , 这里通过源码推断一下使用的方式 : 
C31- LocalConfigInfoProcessor
M31_01- getFailover
- 获取 localPath -> PS:M31_01_01
M32_02- saveSnapshot : 获取成功后 , 会保存快照
?- 保存路径 : 省略\nacos\config\fixed-127.0.0.1_8848_nacos\snapshot\one1\test1


// PS:M31_01_01 localPath 参数
C:\Users\10169\nacos\config\fixed-127.0.0.1_8848_nacos\data\config-data\one1\test1

Pro 1 : 从这个源码里面 , 可以看到哪些知识点 ?

既然存在本地文件 , 是否意味着我可以通过修改这个路径优先使用本地配置 , 此处修改以下路径后测试成功
省略\nacos\config\fixed-127.0.0.1_8848_nacos\data\config-data\one1\test1

1
2
3
4
5
6
java复制代码PS : 除了这个路径 , SpringBoot 运行在配置文件中直接配置路径 -> 
spring:
cloud:
config:
# 相同配置,本地优先
override-none: true

Pro 2 : Filter 的使用

上面可以看到 , 配置的处理中 , 都有个默认的 Filter 处理

configFilterChainManager.doFilter(null, cr);

1
2
3
4
5
java复制代码    // 依照这个逻辑 , 是可以进行更多配置的
C32- ConfigFilterChainManager
?- 其中允许自定义添加 Filter
M32_01- addFilter
M32_02- doFilter

TODO : 此处如何植入 Filter 待完善 , 没找到接口添加Filter , 奇怪….

3.2.2 Nacos 配置的容灾处理

Nacos LocalConfigInfoProcessor 提供了容灾的功能 , 方式包含2种 : 本地配置和快照处理

本地配置

上面说了 , 修改指定路径即可实现

配置快照
Nacos 的客户端 SDK 会在本地生成配置的快照。当客户端无法连接到 Nacos Server 时,可以使用配置快照显示系统的整体容灾能力。配置快照类似于 Git 中的本地 commit,也类似于缓存,会在适当的时机更新,但是并没有缓存过期(expiration)的概念。

3.2.3 Nacos 动态配置处理

动态配置主要是指配置变更时的监听 :

Nacos 通过长轮询检测配置是否变化 , 对应的核心类为 LongPollingRunnable # checkUpdateDataIds , 对这里 Debug 看下 >>>>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
java复制代码

class LongPollingRunnable implements Runnable {

private final int taskId;

public LongPollingRunnable(int taskId) {
this.taskId = taskId;
}

@Override
public void run() {

// .... 核心语句
List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
}

}

List<String> checkUpdateDataIds(List<CacheData> cacheDatas, List<String> inInitializingCacheList) throws Exception {
StringBuilder sb = new StringBuilder();
for (CacheData cacheData : cacheDatas) {
if (!cacheData.isUseLocalConfigInfo()) {
sb.append(cacheData.dataId).append(WORD_SEPARATOR);
sb.append(cacheData.group).append(WORD_SEPARATOR);
if (StringUtils.isBlank(cacheData.tenant)) {
sb.append(cacheData.getMd5()).append(LINE_SEPARATOR);
} else {
sb.append(cacheData.getMd5()).append(WORD_SEPARATOR);
sb.append(cacheData.getTenant()).append(LINE_SEPARATOR);
}
if (cacheData.isInitializing()) {
// It updates when cacheData occours in cacheMap by first time.
inInitializingCacheList
.add(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant));
}
}
}
boolean isInitializingCacheList = !inInitializingCacheList.isEmpty();
return checkUpdateConfigStr(sb.toString(), isInitializingCacheList);
}


List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws Exception {

Map<String, String> params = new HashMap<String, String>(2);
params.put(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);
Map<String, String> headers = new HashMap<String, String>(2);
// 长轮询方式
headers.put("Long-Pulling-Timeout", "" + timeout);

// told server do not hang me up if new initializing cacheData added in
if (isInitializingCacheList) {
headers.put("Long-Pulling-Timeout-No-Hangup", "true");
}

if (StringUtils.isBlank(probeUpdateString)) {
return Collections.emptyList();
}

try {
// In order to prevent the server from handling the delay of the client's long task,
// increase the client's read timeout to avoid this problem.

long readTimeoutMs = timeout + (long) Math.round(timeout >> 1);
// /v1/cs/configs/listener
HttpRestResult<String> result = agent
.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(),
readTimeoutMs);

if (result.ok()) {
setHealthServer(true);
return parseUpdateDataIdResponse(result.getData());
} else {
setHealthServer(false);
}
} catch (Exception e) {
setHealthServer(false);
throw e;
}
return Collections.emptyList();
}


// 对应的 Controller 为 ConfigController
public void listener(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {

// .............

Map<String, String> clientMd5Map;
try {
clientMd5Map = MD5Util.getClientMd5Map(probeModify);
} catch (Throwable e) {
throw new IllegalArgumentException("invalid probeModify");
}

// do long-polling
inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
}

// 对应轮询的接口
public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,
Map<String, String> clientMd5Map, int probeRequestSize) throws IOException {

// Long polling.
if (LongPollingService.isSupportLongPolling(request)) {
longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
return HttpServletResponse.SC_OK + "";
}

// Compatible with short polling logic.
List<String> changedGroups = MD5Util.compareMd5(request, response, clientMd5Map);

// Compatible with short polling result.
String oldResult = MD5Util.compareMd5OldResult(changedGroups);
String newResult = MD5Util.compareMd5ResultString(changedGroups);

String version = request.getHeader(Constants.CLIENT_VERSION_HEADER);
if (version == null) {
version = "2.0.0";
}
int versionNum = Protocol.getVersionNumber(version);

// Before 2.0.4 version, return value is put into header.
if (versionNum < START_LONG_POLLING_VERSION_NUM) {
response.addHeader(Constants.PROBE_MODIFY_RESPONSE, oldResult);
response.addHeader(Constants.PROBE_MODIFY_RESPONSE_NEW, newResult);
} else {
request.setAttribute("content", newResult);
}

Loggers.AUTH.info("new content:" + newResult);

// Disable cache.
response.setHeader("Pragma", "no-cache");
response.setDateHeader("Expires", 0);
response.setHeader("Cache-Control", "no-cache,no-store");
response.setStatus(HttpServletResponse.SC_OK);
return HttpServletResponse.SC_OK + "";
}


// LongPollingService

这里想看详情推荐看这一篇 https://www.jianshu.com/p/acb9b1093a54

3.2.4 Nacos 元数据

这里来看一下 , Nacos 的元数据是什么 ?

Nacos数据(如配置和服务)描述信息,如服务版本、权重、容灾策略、负载均衡策略、鉴权配置、各种自定义标签 (label),从作用范围来看,分为服务级别的元信息、集群的元信息及实例的元信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码// 方式一 : 配置服务的时候配置
spring:
application:
name: nacos-config-server
cloud:
nacos:
discovery:
server-addr: 127.0.0.1:8848
metadata:
version: v1

// 方式二 : 页面直接配置

// 方式三 : API 调用 , 通过 Delete , Update 等请求方式决定类型
批量更新实例元数据 (InstanceController) : /nacos/v1/ns/instance/metadata/batch
批量删除实例元数据 (InstanceController) : /nacos/v1/ns/instance/metadata/batch

image-20210527135529407.png

3.3 Nacos 负载均衡

Nacos 的负载均衡归属于 动态 DNS 服务

Nacos_service_list.jpg

动态 DNS 服务支持权重路由,让您更容易地实现中间层负载均衡、更灵活的路由策略、流量控制以及数据中心内网的简单DNS解析服务。动态DNS服务还能让您更容易地实现以 DNS 协议为基础的服务发现,以帮助您消除耦合到厂商私有服务发现 API 上的风险。

基于 Feign 相关的知识 , 我们知道 , Balance 的处理是在 BaseLoadBalancer 中结合 Rule 处理的 , 这里我们来分析一下 , 2 者是通过什么结构进行连接处理的

Step 1 : Feign 对 Nacos 的调用

我们以 BaseLoadBalancer 为起点 , 进行 Debug 处理 , 来到点位 PredicateBasedRule

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
java复制代码C- PredicateBasedRule
M- choose(Object key)
- Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
?- 此处可以看到 , 其中有一个 lb.getAllServers 的操作 , 此处的 lb 为 DynamicServerListLoadBalancer

// PS : getAllServers , 此处可以看到 ,其中的 Servers 已经全部放在 List 中了
public List<Server> getAllServers() {
return Collections.unmodifiableList(allServerList);
}


// 跟踪一下放入的逻辑 , 放入逻辑的起点是ILoadBalancer Bean 的加载 , 其调用链为 :
C- RibbonClientConfiguration # ribbonLoadBalancer : 构建一个 ILoadBalancer
C- ZoneAwareLoadBalancer : 进入 ZoneAwareLoadBalancer 构造函数
C- DynamicServerListLoadBalancer : 进入 构造函数
C- DynamicServerListLoadBalancer # restOfInit : init 操作
C- DynamicServerListLoadBalancer # updateListOfServers : 更新 Server 列表主流程 , 此处第一次获取相关的 Server List , 后续Debug 第一节点
C- DynamicServerListLoadBalancer # updateAllServerList : 设置 ServerList


public void updateListOfServers() {
List<T> servers = new ArrayList<T>();
if (serverListImpl != null) {
servers = serverListImpl.getUpdatedListOfServers();
if (filter != null) {
servers = filter.getFilteredListOfServers(servers);
}
}
updateAllServerList(servers);
}


可以看到 , 其中有2个获取 Server 的逻辑方法 , 在这里看一下家族体系 , 就清楚了
serverListImpl.getUpdatedListOfServers();
filter.getFilteredListOfServers(servers);


// 下述图片中就很清楚了 , 存在一个实现类 NacosServerList 实现类 , 从Nacos 中获取服务列表
private List<NacosServer> getServers() {
try {
String group = discoveryProperties.getGroup();
List<Instance> instances = discoveryProperties.namingServiceInstance()
.selectInstances(serviceId, group, true);
return instancesToServerList(instances);
} catch (Exception e) {
throw new IllegalStateException(....);
}
}


// 负载均衡策略
负载均衡策略是基于 Balance

Nacos_ServerList.png

3.4 集群的处理

Nacos 集群使用

Nacos 的集群使用比较简单 , 只需要在 /conf/cluster.conf 中配置对应的服务信息即可>>>>

1
2
3
4
5
6
java复制代码
#it is ip
#example
127.0.0.1:8848
127.0.0.1:8849
127.0.0.1:8850

Nacos 集群源码跟踪

来看一下源码层面 , 这个逻辑是怎么处理的 ?

核心处理类在 com.alibaba.nacos.core.cluster

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
java复制代码
// Step 1 : 获取配置的方式
C- EnvUtil
public static String getClusterConfFilePath() {
return Paths.get(getNacosHome(), "conf", "cluster.conf").toString();
}

// 读取 Cluster 配置
public static List<String> readClusterConf() throws IOException {
try (Reader reader = new InputStreamReader(new FileInputStream(new File(getClusterConfFilePath())),
StandardCharsets.UTF_8)) {
return analyzeClusterConf(reader);
} catch (FileNotFoundException ignore) {
List<String> tmp = new ArrayList<>();
String clusters = EnvUtil.getMemberList();
if (StringUtils.isNotBlank(clusters)) {
String[] details = clusters.split(",");
for (String item : details) {
tmp.add(item.trim());
}
}
return tmp;
}
}


// Step 2 : Cluster 的使用
AbstractMemberLookup

// 主要使用集中在 ServerManager 中
C- ServerManager
F- ServerMemberManager memberManager;

C- ServerMemberManager : Nacos中的集群节点管理
M- init : 集群节点管理器初始化
M- getSelf : 获取本地节点信息
M- getmemberaddressinfo : 获取正常成员节点的地址信息
M- allMembers : 获取集群成员节点的列表
M- update : 更新目标节点信息
M- isUnHealth : 目标节点是否健康
M- initAndStartLookup : 初始化寻址模式

// TODO : 其他方法就省略了 , 后期准备进行相关的性能分析 , 集群的源码梳理预计放在那一部分分析

总结

这篇文章又是一篇更偏向于应用的文章 , 源码深入的较少 , 更重要的原因是因为 Nacos 的源码分层更清楚 , 结构清晰 , 不需要太负载的深入.

另外 , Nacos 2.0 也在发布中 , 看文档采用了 Socket 长连接的方式 , 后续如果有机会 , 对比一下2者的区别看看.

附录

# 附录一 : 手动调用 Server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
java复制代码package com.alibaba.nacos.discovery.service;

import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.pojo.Cluster;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.Service;
import com.alibaba.nacos.api.naming.pojo.healthcheck.AbstractHealthChecker;
import netscape.javascript.JSObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
* @Classname NacosClientService
* @Description TODO
* @Date 2021/5/26
* @Created by zengzg
*/
@Component
public class NacosClientNodesService implements ApplicationRunner {

private Logger logger = LoggerFactory.getLogger(this.getClass());

private NamingService namingService;

@Value("${spring.cloud.nacos.config.server-addr}")
private String serverAddr;

@Override
public void run(ApplicationArguments args) throws Exception {
Properties properties = new Properties();
properties.put("serverAddr", serverAddr);
namingService = NacosFactory.createNamingService(properties);
}

/**
* 获取 Nacos Config
* 参数格式
* {
* "instanceId": "192.168.0.97#9083#DEFAULT#DEFAULT_GROUP@@nacos-user-server",
* "ip": "192.168.0.97",
* "port": 9083,
* "weight": 1, 可以通过权重决定使用的 Server
* "healthy": true,
* "enabled": true,
* "ephemeral": true,
* "clusterName": "DEFAULT",
* "serviceName": "DEFAULT_GROUP@@nacos-user-server",
* "metadata": {
* "preserved.register.source": "SPRING_CLOUD"
* },
* "ipDeleteTimeout": 30000,
* "instanceHeartBeatInterval": 5000,
* "instanceHeartBeatTimeOut": 15000
* }
*
* @param serviceName
* @return
*/
public List<Instance> get(String serviceName) {
List<Instance> content = new LinkedList<Instance>();
try {
content = namingService.getAllInstances(serviceName);
logger.info("------> 获取 Config serviceName [{}] <-------", serviceName);
} catch (NacosException e) {
logger.error("E----> error :{} -- content :{}", e.getClass(), e.getMessage());
e.printStackTrace();
}

return content;


}

/**
* 创建 Nacos Config
*
* @param serviceName
* @param ip
* @param port
*/
public void createOrUpdate(String serviceName, String ip, Integer port) {
try {
logger.info("------> 创建 Config GroupID [{}] -- DataID [{}] Success ,The value :[{}] <-------", serviceName, ip, port);
namingService.registerInstance(serviceName, ip, port, "TEST1");
} catch (NacosException e) {
logger.error("E----> error :{} -- content :{}", e.getClass(), e.getMessage());
e.printStackTrace();
}
}


/**
* 移除 Nacos Config
*
* @param serviceName
* @param ip
*/
public void delete(String serviceName, String ip, Integer port) {
try {
namingService.deregisterInstance(serviceName, ip, port, "DEFAULT");
logger.info("------> 删除 Config GroupID [{}] -- DataID [{}] Success <-------", serviceName, ip);
} catch (NacosException e) {
logger.error("E----> error :{} -- content :{}", e.getClass(), e.getMessage());
e.printStackTrace();
}
}
}

# 附录二 : 手动调用 Config

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
java复制代码public class NacosClientConfigService implements ApplicationRunner {

private Logger logger = LoggerFactory.getLogger(this.getClass());

private ConfigService configService;

@Value("${spring.cloud.nacos.config.server-addr}")
private String serverAddr;

@Override
public void run(ApplicationArguments args) throws Exception {
Properties properties = new Properties();
properties.put("serverAddr", serverAddr);
configService = NacosFactory.createConfigService(properties);
}


/**
* 获取 Nacos Config
*
* @param dataId
* @param groupId
* @return
*/
public String get(String dataId, String groupId) {
String content = "";
try {
content = configService.getConfig(dataId, groupId, 5000);
logger.info("------> 获取 Config GroupID [{}] -- DataID [{}] Success ,The value :[{}] <-------", dataId, groupId, content);

configService.addListener(dataId, groupId, new ConfigListener());

} catch (NacosException e) {
logger.error("E----> error :{} -- content :{}", e.getClass(), e.getMessage());
e.printStackTrace();
}

return content;


}

/**
* 创建 Nacos Config
*
* @param dataId
* @param groupId
* @param content
*/
public void createOrUpdate(String dataId, String groupId, String content) {
try {
logger.info("------> 创建 Config GroupID [{}] -- DataID [{}] Success ,The value :[{}] <-------", dataId, groupId, content);
configService.publishConfig(dataId, groupId, content);
} catch (NacosException e) {
logger.error("E----> error :{} -- content :{}", e.getClass(), e.getMessage());
e.printStackTrace();
}
}


/**
* 移除 Nacos Config
*
* @param dataId
* @param groupId
*/
public void delete(String dataId, String groupId) {
try {
configService.removeConfig(dataId, groupId);
logger.info("------> 删除 Config GroupID [{}] -- DataID [{}] Success <-------", dataId, groupId);

configService.removeListener(dataId, groupId, null);

} catch (NacosException e) {
logger.error("E----> error :{} -- content :{}", e.getClass(), e.getMessage());
e.printStackTrace();
}
}


}

# 附录三 : 使用NacosInjected

可以通过 的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码<!-- 使用 Nacos Inject -->
<dependency>
<groupId>com.alibaba.boot</groupId>
<artifactId>nacos-config-spring-boot-starter</artifactId>
<version>0.2.7</version>
</dependency>
<dependency>
<groupId>com.alibaba.boot</groupId>
<artifactId>nacos-discovery-spring-boot-starter</artifactId>
<version>0.2.7</version>
</dependency>


@NacosInjected
private ConfigService configService;

@NacosInjected
private NamingService namingService;

# 附录四 : Naocs 官方架构图 (搬运)

这里是纯搬运 , 可以看官方文档 @ nacos.io/zh-cn/docs/…

功能图 :
image.png

  • 服务管理:实现服务CRUD,域名CRUD,服务健康状态检查,服务权重管理等功能
  • 配置管理:实现配置管CRUD,版本管理,灰度管理,监听管理,推送轨迹,聚合数据等功能
  • 元数据管理:提供元数据CURD 和打标能力
  • 插件机制:实现三个模块可分可合能力,实现扩展点SPI机制
  • 事件机制:实现异步化事件通知,sdk数据变化异步通知等逻辑
  • 日志模块:管理日志分类,日志级别,日志可移植性(尤其避免冲突),日志格式,异常码+帮助文档
  • 回调机制:sdk通知数据,通过统一的模式回调用户处理。接口和数据结构需要具备可扩展性
  • 寻址模式:解决ip,域名,nameserver、广播等多种寻址模式,需要可扩展
  • 推送通道:解决server与存储、server间、server与sdk间推送性能问题
  • 容量管理:管理每个租户,分组下的容量,防止存储被写爆,影响服务可用性
  • 流量管理:按照租户,分组等多个维度对请求频率,长链接个数,报文大小,请求流控进行控制
  • 缓存机制:容灾目录,本地缓存,server缓存机制。容灾目录使用需要工具
  • 启动模式:按照单机模式,配置模式,服务模式,dns模式,或者all模式,启动不同的程序+UI
  • 一致性协议:解决不同数据,不同一致性要求情况下,不同一致性机制
  • 存储模块:解决数据持久化、非持久化存储,解决数据分片问题
  • Nameserver:解决namespace到clusterid的路由问题,解决用户环境与nacos物理环境映射问题
  • CMDB:解决元数据存储,与三方cmdb系统对接问题,解决应用,人,资源关系
  • Metrics:暴露标准metrics数据,方便与三方监控系统打通
  • Trace:暴露标准trace,方便与SLA系统打通,日志白平化,推送轨迹等能力,并且可以和计量计费系统打通
  • 接入管理:相当于阿里云开通服务,分配身份、容量、权限过程
  • 用户管理:解决用户管理,登录,sso等问题
  • 权限管理:解决身份识别,访问控制,角色管理等问题
  • 审计系统:扩展接口方便与不同公司审计系统打通
  • 通知系统:核心数据变更,或者操作,方便通过SMS系统打通,通知到对应人数据变更
  • OpenAPI:暴露标准Rest风格HTTP接口,简单易用,方便多语言集成
  • Console:易用控制台,做服务管理、配置管理等操作
  • SDK:多语言sdk
  • Agent:dns-f类似模式,或者与mesh等方案集成
  • CLI:命令行对产品进行轻量化管理,像git一样好用

领域模型 :

image.png

SDK 类图 :

image.png

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

借助MySQL的全文索引+ngram解释器实现中文搜索的踩坑

发表于 2021-05-27

公司要做一个搜索,数据量不大但是着急,引入ES就显得来不及了,于是决定就用mysql的全文索引原地实现,过程中踩了一些坑:

一开始是就是简单了使用了FULLTEXT全文索引 ,然后在搜索的时候发现结果十分的不尽人意,原来是MYSQL的FULLTEXT索引默认的分词策略的问题,中文全部按照一种分词方式来,也就是说 你要是不搜中文的全名,在自然语言模式下(IN NATURE LANGUAGE MODE) 根本搜不出来东西。

全文索引文档:dev.mysql.com/doc/refman/…

但是在查阅文档的时候发现了ngram这个解释器,mysql自带的

ngram官方文档:dev.mysql.com/doc/refman/…

  1. 因为都是已经存在了的表了 就直接Navicat或者DDL语句就可以新建,我依赖于图形工具,就直接新建了但是第一个坑就来了:
    image.png
    直接在原有的索引上进行的修改 保存之后没生效!!! 但是我不知道 展现出来的就还是搜索不出来结果,后来检查了一下才发现不能完全相信Navicat
  2. 那就使用DDL语句呗 2号坑就来了,我用Navicat模拟了一下操作 生成了DDL语句如下(错误的那个):

错误DDL:

ALTER TABLE biz_shop_spu
DROP INDEX IDX_NAME_FULLTEXT,
ADD FULLTEXT INDEX IDX_NAME_FULLTEXT(NAME) WITH PARSER NGRAM;

正确:

ALTER TABLE biz_shop_spu
DROP INDEX IDX_NAME_FULLTEXT;

ALTER TABLE biz_shop_spu
ADD FULLTEXT INDEX IDX_NAME_FULLTEXT(NAME) WITH PARSER NGRAM;

必须重新ALTER一下表 不然直接DROP旧索引插入索引的话 ,用FlyWay执行不会报错 但是还是没有使用ngram。

flyway执行结果图:

image.png
3. 至此一个使用ngram作为解释器的全文类型的索引 就建立完成了,但是分词只会对新数据进行分词,导致使用全文索引搜不出来历史数据
需要显式的使用 OPTIMIZE TABLE biz_shop_spu

这个是清理表碎片文件的命令 但是它也会重新更新索引的统计数据

ps:命令执行期间会锁表哦~

至此踩坑部分完毕

下面就是一些ngram的小技巧

  • 观察索引的分词:

如果使用了ngram就不用看这个SHOW VARIABLES LIKE '%ft%';了 而是看这个
show variables like 'ngram_token_size%'; 默认是2

  • 显式指定全文检索表源:

SET GLOBAL innodb_ft_aux_table="db_name/tableName" 搭配SELECT * FROM information_schema.innodb_ft_index_cache ORDER BY doc_id , position;可以看到哪些词在全文索引里面

image.png

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

零基础Python爬虫教程:爬取付费电影,告别费钱的日子

发表于 2021-05-27

是不是有好多的小伙伴跟好久好久以前的小编一样,看一个电影充个会员,这个没关系,最主要的是,充一个平台的VIP还不行得有好几个才可以。

这么贫穷的小编,当然只能看6分钟的视频了,不过没关系,小编现在有python。

不会小伙伴此刻的心情是:(如同所示)

image.png
不过没关系,小编接下来就是授教大家一些Python神技(零基础的也是可以操作的奥)

让咱们首先来看看实现效果吧

image.png
把你想要看的VIP电影地址复制粘贴到上面用Python做好的数据转换工具,就可以实现在线免费观看电影了。

搜索下方加老师微信 老师微信号:XTUOL1988【切记备注:学习Python】 领取Python web开发,Python爬虫,Python数据分析,人工智能等学习教程。带你从零基础系统性的学好Python!

一般来说各大平台的VIP电影只能观看6分钟→_→ (四不四很伤 ):

image.png
按照小编的授教之后呢大家看下实现效果之后:

image.png
不是很吃惊?是不是感到“亮瞎了”哈哈哈 ~

这就是Python的独特的魅力,并且像你知道的 前沿科技 技术:爬虫工程、自动化运维、数据挖掘、人工智能、游戏开发、web开发、数据分析、自动化测试、大数据技术、区块链技术……等 都是需要python的!

好啦 好啦,用一句经典的话总结:“向代码致敬”!

image.png
最后:若是小伙伴还没有安装PyCharm工具的以及没有搭建Python环境的,可以告诉下小编,帮大家哈!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

闲鱼是如何利用RxJava提升异步编程能力的

发表于 2021-05-27

作者:闲鱼技术——鲲鸣

RxJava是Java对于反应式编程的一个实现框架,是一个基于事件的、提供实现强大且优雅的异步调用程序的代码库。18年以来,由淘宝技术部发起的应用架构升级项目,希望通过反应式架构、全异步化的改造,提升系统整体性能和机器资源利用率,减少网络延时,资源的重复使用,并为业务快速创新提供敏捷的架构支撑。在闲鱼的基础链路诸如商品批量更新、订单批量查询等,都利用了RxJava的异步编程能力。

​

不过,RxJava是入门容易精通难,一不小心遍地坑。今天来一起看下RxJava的使用方式、基本原理、注意事项。

1.开始之前

让我们先看下,使用RxJava之前,我们曾经写过的回调代码存在的痛点。

当我们的应用需要处理用户事件、异步调用时,随着流式事件的复杂性和处理逻辑的复杂性的增加,代码的实现难度将爆炸式增长。比如我们有时需要处理多个事件流的组合、处理事件流的异常或超时、在事件流结束后做清理工作等,如果需要我们从零实现,势必要小心翼翼地处理回调、监听、并发等很多棘手问题。

还有一个被称作“回调地狱”的问题,描述的是代码的不可读性。

Code 1.1

​

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
javascript复制代码// 示例引自callbackhell.com
fs.readdir(source, function (err, files) {
if (err) {
console.log('Error finding files: ' + err)
} else {
files.forEach(function (filename, fileIndex) {
console.log(filename)
gm(source + filename).size(function (err, values) {
if (err) {
console.log('Error identifying file size: ' + err)
} else {
console.log(filename + ' : ' + values)
aspect = (values.width / values.height)
widths.forEach(function (width, widthIndex) {
height = Math.round(width / aspect)
console.log('resizing ' + filename + 'to ' + height + 'x' + height)
this.resize(width, height).write(dest + 'w' + width + '_' + filename, function(err) {
if (err) console.log('Error writing file: ' + err)
})
}.bind(this))
}
})
})
}
})

​

以上js代码有两个明显槽点: 1.由于传入的层层回调方法,代码结尾出现一大堆的 }) ; 2. 代码书写的顺序与代码执行的顺序相反:后面出现回调函数会先于之前行的代码先执行。

而如果使用了RxJava,我们处理回调、异常等将得心应手。

2.引入RxJava

假设现在要异步地获得一个用户列表,然后将结果进行处理,比如展示到ui或者写到缓存,我们使用RxJava后代码如下:

Code 2.1

​

1
2
3
4
5
6
7
8
9
10
11
12
13
14
less复制代码Observable<Object> observable = Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(@NotNull ObservableEmitter<Object> emitter) throws Exception {
System.out.println(Thread.currentThread().getName() + "----TestRx.subscribe");
List<UserDo> result = userService.getAllUser();
for (UserDo st : result) {emitter.onNext(st);}
}
});
Observable<String> map = observable.map(s -> s.toString());
// 创建订阅关系
map.subscribe(o -> System.out.println(Thread.currentThread().getName() + "----sub1 = " + o)/*更新到ui*/);
map.subscribe(o -> System.out.println(Thread.currentThread().getName() + "----sub2 = " + o)/*写缓存*/,
e-> System.out.println("e = " + e)),
()->System.out.println("finish")));

​

userService.getAllUser()是一个普通的同步方法,但是我们把它包到了一个Observable中,当有结果返回时,将user逐个发送至监听者。第一个监听者更新ui,第二个监听者写到缓存。并且当上游发生异常时,进行打印;在事件流结束时,打印finish。

另外还可以很方便的配置上游超时时间、调用线程池、fallback结果等,是不是非常强大。

​

需要注意的是,RxJava代码就像上面例子中看起来很容易上手,可读性也很强,但是如果理解不充分,很容易出现意想不到的bug:初学者可能会认为,上面的代码中,一个user列表返回后,每个元素会被异步地发送给两个下游的观察者,这两个观察者在各自的线程内打印结果。但事实却不是这样:userService.getAllUser()会被调用两次(每当建立订阅关系时方法getAllUser()都会被重新调用),而user列表被查询出后,会同步的发送给两个观察者,观察者也是同步地打印出每个元素。即sub1 = user1,sub1 = user2,sub1 = user3,sub2 = user1,sub2 = user2,sub2 = user3。

可见,如果没有其他配置,RxJava默认是同步阻塞的!!!那么,我们如何使用它的异步非阻塞能力呢,我们接着往下看。

Code 2.2

​

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
scss复制代码Observable
.fromCallable(() -> {
System.out.println(Thread.currentThread().getName() + "----observable fromCallable");
Thread.sleep(1000); // imitate expensive computation
return "event";
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.single())
.map(i->{
System.out.println(Thread.currentThread().getName() + "----observable map");
return i;
})
.observeOn(Schedulers.newThread())
.subscribe(str -> System.out.println(Thread.currentThread().getName() + "----inputStr=" + str));

System.out.println(Thread.currentThread().getName() + "----end");

Thread.sleep(2000); // <--- wait for the flow to finish. In RxJava the default Schedulers run on daemon threads

​

我们用Observable.fromCallable()代替code2.1中最底层的Observable.create方法,来创建了一个Observable(即被观察者)。fromCallable方法创建的是一个lazy的Observable,只有当有人监听它时,传入的代码才被执行。(关于这一点,我们后面会讲,这里只是为了展示有很多种创建Observable的方式)。

然后通过subscribeOn(Schedulers.io())指定了被观察者执行的线程池。observeOn(Schedulers.single())指定了下游观察者(map方法实际也是一个观察者)执行的线程池。map方法如同很多流式编程api一样,将上游的每个元素转化成另一个元素。最后又通过observeOn(Schedulers.newThread())制定了当前下游的观察者,即最后的subscribe中传入的观察者(lambda方式)执行的线程池。

上面的代码执行后,通过打印的线程名可以看出,被观察者、map、观察者均是不同的线程,并且,主线程最后的”end”会先执行,也就是实现了异步非阻塞。

  1. 使用方式

本文不是RxJava的接口文档,不会详细介绍每个api,只是简单讲下一些常见或者特殊api,进一步阐述RxJava的能力。

3.1 基本组件

RxJava的核心原理其实非常简单。可类比观察者模式。Observable是被观察者,作为数据源产生数据。Observer是观察者,消费上游的数据源。

每个Observable可注册多个Observer。但是默认情况下,每当有注册发生时,Observable的生产方法subscribe都会被调用。如果想只生产一次,可以调用Observable.cached方法。

被观察者Observable还有多个变体,如Single、Flowable。Single代表只产生一个元素的数据源。Flowable是支持背压的数据源。通过背压设计,下游监听者可以向上游反馈信息,可以达到控制发送速率的功能。

​

Observable和Observer是通过装饰器模式层层包装达到从而串联起来。转换API如map等,会创建一个新的ObservableMap(基层自Observable),包装原始的Observable作为source,而在真正执行时,先做转换操作,再发给下游的观察者。

Scheduler是RxJava为多线程执行提供的支持类,它将可以将生产者或者消费者的执行逻辑包装成一个Worker,提交到框架提供的公共线程池中,如Schedulers.io()、Schedulers.newThread()等。便于理解,可以将Schedulers类比做线程池,Worker类比做线程池中的线程。可以通过Observable.subscribeOn和Observable.observeOn分别制定被观察者和观察者执行的线程,来达到异步非阻塞。

​

RxJava核心架构图如下:

​

image.png

3.2 转换API

•

map: 见Code 2.2,一对一转换,如同很多流式编程api一样,将上游的每个元素转化成另一个元素

•

flatMap: 一对多转换,将上游的每个元素转化成0到多个元素。类比Java8:Stream.flatMap内返回的是stream,Observerable.flatMap内返回的是Observerable。注意,本方法非常强大,很多api底层都是基于此方法。并且由于flatMap返回的多个Observerable是相互独立的,可以基于这个特点,实现并发。

3.3 组合API

•

merge:将两个事件流合并成一个时间流,合并后的事件流的顺序,与上流两个流中元素到来的时间顺序一致。

image.png

•zip: 逐个接收上游多个流的每个元素,并且一对一的组合起来,转换后发送给下游。示例见code3.1

code 3.1

​

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
rust复制代码//第一个流每1秒输出一个偶数
Observable<Long> even = Observable.interval(1000, TimeUnit.MILLISECONDS).map(i -> i * 2L);
//第二个流每3秒输出一个奇数
Observable<Long> odd = Observable.interval(3000, TimeUnit.MILLISECONDS).map(i -> i * 2L + 1);
//zip也可以传入多个流,这里只传入了两个
Observable.zip(even, odd, (e, o) -> e + "," + o).forEach(x -> {
System.out.println("observer = " + x);
});

/* 输出如下,可以看到,当某个流有元素到来时,会等待其他所有流都有元素到达时,才会合并处理然后发给下游
observer = 0,1
observer = 2,3
observer = 4,5
observer = 6,7
...
*/

​

代码code 3.1看起来没什么问题,两个流并发执行,最后用zip等待他们的结果。但是却隐藏了一个很重要的问题:RxJava默认是同步、阻塞的!!当我们想去仿照上面的方式并发发送多个请求,最后用zip监听所有结果时,很容易发先一个诡异的现象, code 3.2的代码中,ob2的代码总是在ob1执行之后才会执行,并不是我们预期的两个请求并发执行。而打印出来的线程名也可以看到,两个Single是在同一个线程中顺序执行的!

​

code 3.2

​

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
scss复制代码// Single是只返回一个元素的Observable的实现类
Single<String> ob1 = Single.fromCallable(() -> {
System.out.println(Thread.currentThread().getName() + "----observable 1");
TimeUnit.SECONDS.sleep(3);
return userService.queryById(1).getName();
});

Single<String> ob2 = Single.fromCallable(() -> {
System.out.println(Thread.currentThread().getName() + "----observable 2");
TimeUnit.SECONDS.sleep(1);
return userService.queryById(1).getName();
});

String s = Single.zip(ob1, ob2,
(e, o) -> {System.out.println(e + "++++" + o);

​

那为什么code 3.1的两个流能够并发执行呢?阅读源码可以发现zip的实现其实就是先订阅第一个流,再订阅第二个流,那么默认当然是顺序执行。但是通过Observable.interval创建的流,默认会被提交到 Schedulers.computation()提供的线程池中。关于线程池,本文后面会讲解。

3.4 创建API

•create :最原始的create和subscribe,其他创建方法都基于此

code 3.3

​

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
typescript复制代码// 返回的子类是ObservableCreate
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("event");
emitter.onNext("event2");
emitter.onComplete();
}
});
// 订阅observable
observable.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println(Thread.currentThread().getName() + " ,TestRx.onSubscribe");
}
@Override
public void onNext(String s) {
System.out.println(Thread.currentThread().getName() + " ,s = " + s);
}
@Override
public void onError(Throwable e) {}
@Override
public void onComplete() {
System.out.println(Thread.currentThread().getName() + " ,TestRx.onComplete");
}
});

•just : Observable.just(“e1”,”e2”); 简单的创建一个Observable,发出指定的n个元素。•interval:code 3.1已给出示例,创建一个按一定间隔不断产生元素的Observable,默认执行在Schedulers.comutation()提供的线程池中•defer:产生一个延迟创建的Observable。 有点绕:Observable.create等创建出来的被观察者虽然是延迟执行的,只有有人订阅的时候才会真正开始生成数据。但是创建Observable的方法却是立即执行的。而 Observable.defer方法会在有人订阅的时候才开始创建Observable。如代码Code3.4

1
2
3
4
5
6
7
8
9
10
11
12
typescript复制代码public String myFun() {
String now = new Date().toString();
System.out.println("myFun = " + now);
return now;
}

public void testDefer(){
// 该代码会立即执行myFun()
Observable<String> ob1 = Observable.just(myFun());
// 该代码会在产生订阅时,才会调用myFun(), 可类比Java8的Supplier接口
Observable<String> ob2 = Observable.defer(() -> Observable.just(myFun()) );
}

•fromCallable :产生一个延迟创建的Observable,简化的defer方法。Observable.fromCallable(() -> myFun()) 等同于Observable.defer(() -> Observable.just(myFun()) );

4.基本原理

RxJava的代码,就是观察者模式+装饰器模式的体现。

4.1 Observable.create

见代码code 3.3,create方法接收一个ObserverableOnSubscribe接口对象,我们定义了了发送元素的代码,create方法返回一个ObserverableCreate类型对象(继承自Observerable抽象类)。跟进create方法原码,直接返回new出来的ObserverableCreate,它包装了一个source对象,即传入的ObserverableOnSubscribe。

code4.1

​

1
2
3
4
5
csharp复制代码    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
//onAssembly默认直接返回ObservableCreate
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

​

Create方法就这么简单,只需要记住它返回了一个包装了source的Observerble。

4.2 Observerable.subscribe(observer)

看下code3.3中创建订阅关系时(observalbe.subscribe)发生了什么:

code4.2

​

1
2
3
4
5
6
7
8
java复制代码 public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) {... } catch (Throwable e) {... }
}

​

Observable是一个抽象类,定义了subscribe这个final方法,最终会调用subscribeActual(observer);而subscribeActual是由子类实现的方法,自然我们需要看ObserverableCreate实现的该方法。

code4.3

​

1
2
3
4
5
6
7
8
9
typescript复制代码//ObserverableCreate实现的subscribeActual方法
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);

try {
source.subscribe(parent); //source是ObservableOnSubscribe,即我们写的生产元素的代码
} catch (Throwable ex) {...}
}

1.将观察者observer包装到一个CreateEmitter里。2.调用observer的onSubscribe方法,传入这个emitter。3.调用source(即生产代码接口)的subscribe方法,传入这个emitter。

第二步中,直接调用了我们写的消费者的onSubscribe方法,很好理解,即创建订阅关系的回调方法。

重点在第三步,source.subscribe(parent); 这个parent是包装了observer的emitter。还记得source就是我们写的发送事件的代码。其中手动调用了emitter.onNext()来发送数据。那么我们CreateEmitter.onNext()做了什么

code4.4

​

1
2
3
4
scss复制代码public void onNext(T t) {
if (t == null) {...}
if (!isDisposed()) { observer.onNext(t); }
}

​

!isDisposed()判断若订阅关系还没取消,则调用observer.onNext(t);这个observer就是我们写的消费者,code 3.3中我们重写了它的onNext方法来print接收到的元素。

以上就是RxJava最基本的原理,其实逻辑很简单,就是在创建订阅关系的时候,直接调用生产逻辑代码,然后再生产逻辑的onNext中,调用了观察者observer.onNext。时序图如下。

image.png

显然,最基本的原理,完全解耦了和异步回调、多线程的关系。

4.2 Observable.map

通过最简答的map方法,看下转换api做了什么。

如Code2.1中,调用map方法,传入一个转换函数,可以一对一地将上游的元素转换成另一种类型的元素。

code4.5

​

1
2
3
4
typescript复制代码    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}

​

code4.5是Observable定义的final的map方法,可见map方法将this(即原始的observer)和转换函数mapper包装到一个ObservableMap中(ObservableMap也继承Observable),然后返回这个ObservableMap(onAssembly默认什么都不做)。

由于ObservableMap也是一个Observable,所以他的subscribe方法会在创建订阅者时被层层调用到,subscribe是Observable定义的final方法,最终会调用到他实现的subscribeAcutal方法。

code4.6

​

1
2
3
4
typescript复制代码//ObservableMap的subscribeActual
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}

​

可以看到ObservableMap的subscribeActual中,将原始的观察者t和变换函数function包装到了一个新的观察者MapObserver中,并将它订阅到被观察者source上。

我们知道,发送数据的时候,观察者的onNext会被调用,所以看下MapObserver的onNext方法

code4.7

​

1
2
3
4
5
6
7
8
9
10
kotlin复制代码@Override
public void onNext(T t) {
if (done) {return; }
if (sourceMode != NONE) { actual.onNext(null);return;}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {...}
actual.onNext(v);
}

​

code4.7中可以看到mapper.apply(t)将变换函数mapper施加到每个元素t上,变换后得到v,最后调用actual.onNext(v)将v发送给下游观察者actual(actual为code4.6中创建MapObserver时传入的t)。

​

总结一下例如map之类的变换api的原理:

1.map方法返回一个ObservableMap,包装了原始的观察者t和变换函数function2.ObservableMap继承自AbstractObservableWithUpstream(它继承自Observable)3.订阅发生时,observable的final方法subscribe()会调用实现类的subscribeActual4.ObservableMap.subscribeActual中创建MapObserver(包装了原observer),订阅到原Observable5.发送数据onNext被调用时,先apply变换操作,再调用原observer的onNext,即传给下游观察者

​

4.3 线程调度

代码Code 2.2中给出了线程调度的示例。subscribeOn(Schedulers.io())指定了被观察者执行的线程池。observeOn(Schedulers.single())指定了下游观察者执行的线程池。经过了上面的学习,很自然的能够明白,原理还是通过装饰器模式,将Observable和Observer层层包装,丢到线程池里执行。我们以observeOn()为例,见code4.8。

​

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
//observeOn(Scheduler) 返回ObservableObserveOn(继承自Observable)
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}

// Observable的subscribe方法最终会调用到ObservableObserveOn.subscribeActual方法
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
//创建一个ObserveOnObserver包装了原观察者、worker,把它订阅到source(原observable)
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}

1.observeOn(Scheduler) 返回ObservableObserveOn2.ObservableObserveOn继承自Observable3.所以subscribe方法最终会调用到ObservableObserveOn重写的subscribeActual方法4.subscribeActual返回一个ObserveOnObserver(是一个Observer)包装了真实的observer和worker

​

根据Observer的逻辑,发送数据时onNext方法会被调用,所以要看下ObserveOnObserver的onNext方法:

code4.9

​

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
scss复制代码public void onNext(T t) {
if (done) { return; }
if (sourceMode != QueueDisposable.ASYNC) { queue.offer(t);}
schedule();
}

void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this); //this是ObserveOnObserver,他同样实现了Runable
}
}

public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal(); //最终会调用actual.onNext(v) , 即调用被封装的下游观察者,v是emmiter
}
}

1.最终生产者代码中调用onNext时,会调用schedule方法2.schedule方法中,会提交自身(ObserveOnObserver)到线程池3.而run方法会调用onNext(emmiter)

可见,RxJava线程调度的机制就是通过observeOn(Scheduler)将发送元素的代码onNext(emmiter)提交到线程池里执行。

​

5.使用注意

最后,给出几个我们在开发中总结的注意事项,避免大家踩坑。

5.1 适用场景

并不是所有的IO操作、异步回调都需要使用RxJava来解决,比如如果我们只是一两个RPC服务的调用组合,或者每个请求都是独立的处理逻辑,那么引入RxJava并不会带来多大的收益。下面给出几个最佳的适用场景。

•处理UI事件•异步响应和处理IO结果•事件或数据 是由无法控制的生产者推送过来的•组合接收到的事件

下面给一个闲鱼商品批量补数据的使用场景:

背景:算法推荐了用户的一些商品,目前只有基础信息,需要调用多个业务接口,补充用户和商品的附加业务信息,如用户头像、商品视频连接、商品首图等。并且根据商品的类型不同,填充不同的垂直业务信息。

难点:1. 多个接口存在前后依赖甚至交叉依赖;2. 每个接口都有可能超时或者报错,继而影响后续逻辑;3.根据不同的依赖接口特点,需要单独控制超时和fallback。整个接口也需要设置整体的超时和fallback。

方案:如果只是多个接口独立的异步查询,那么完全可以使用CompletableFuture。但基于它对组合、超时、fallback支持不友好,并不适用于此场景。我们最终采用RxJava来实现。下面是大致的代码逻辑。代码中的HsfInvoker是阿里内部将普通HSF接口转为Rx接口的工具类,默认运行到单独的线程池中,所以能实现并发调用。

​

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
scss复制代码// 查找当前用户的所有商品
Single<List<IdleItemDO>> userItemsFlow =
HSFInvoker.invoke(() -> idleItemReadService.queryUserItems(userId, userItemsQueryParameter))
.timeout(300, TimeUnit.MILLISECONDS)
.onErrorReturnItem(errorRes)
.map(res -> {
if (!res.isSuccess()) {
return emptyList;
}
return res.getResult();
})
.singleOrError();

//补充商品,依赖userItemsFlow
Single<List<FilledItemInfo>> fillInfoFlow =
userItemsFlow.flatMap(userItems -> {

if (userItems.isEmpty()) {
return Single.just(emptyList);
}

Single<List<FilledItemInfo>> extraInfo =
Flowable.fromIterable(userItems)
.flatMap(item -> {

//查找商品extendsDo
Flowable<Optional<ItemExtendsDO>> itemFlow =
HSFInvoker.invoke(() -> newItemReadService.query(item.getItemId(), new ItemQueryParameter()))
.timeout(300, TimeUnit.MILLISECONDS)
.onErrorReturnItem(errorRes)
.map(res -> Optional.ofNullable(res.getData()));

//视频url
Single<String> injectFillVideoFlow =
HSFInvoker.invoke(() -> videoFillManager.getVideoUrl(item))
.timeout(100, TimeUnit.MILLISECONDS)
.onErrorReturnItem(fallbackUrl);

//填充首图
Single<Map<Long, FrontCoverPageDO>> frontPageFlow =
itemFlow.flatMap(item -> {
...
return frontCoverPageManager.rxGetFrontCoverPageWithTpp(item.id);
})
.timeout(200, TimeUnit.MILLISECONDS)
.onErrorReturnItem(fallbackPage);

return Single.zip(itemFlow, injectFillVideoFlow, frontPageFlow, (a, b, c) -> fillInfo(item, a, b, c));
})
.toList(); //转成商品List

return extraInfo;
});

//头像信息
Single<Avater> userAvaterFlow =
userAvaterFlow = userInfoManager.rxGetUserAvaters(userId).timeout(150, TimeUnit.MILLISECONDS).singleOrError().onErrorReturnItem(fallbackAvater);

//组合用户头像和商品信息,一并返回
return Single.zip(fillInfoFlow, userAvaterFlow,(info,avater) -> fillResult(info,avater))
.timeout(300, TimeUnit.MILLISECONDS)
.onErrorReturn(t -> errorResult)
.blockingGet(); //最后阻塞式的返回

​

可以看到,通过引入RxJava,对于超时控制、兜底策略、请求回调、结果组合都能更方便的支持。

5.2 Scheduler线程池

RxJava2 内置多个 Scheduler 的实现,但是我们建议使用Schedulers.from(executor)指定线程池,这样可以避免使用框架提供的默认公共线程池,防止单个长尾任务block其他线程执行,或者创建了过多的线程导致OOM。

5.3 CompletableFuture

当我们的逻辑比较简单,只想异步调用一两个RPC服务的时,完全可以考虑使用Java8提供的CompletableFuture实现,它相较于Future是异步执行的,也可以实现简单的组合逻辑。

5.4 并发

单个Observable始终是顺序执行的,不允许并发地调用onNext()。

code5.1

1
2
3
4
scss复制代码Observable.create(emitter->{
new Thread(()->emitter.onNext("a1")).start();
new Thread(()->emitter.onNext("a2")).start();
})

但是,每个Observable可以独立的并发执行。

code5.2

1
2
3
ini复制代码Observable ob1 = Observable.create(e->new Thread(()->e.onNext("a1")).start());
Observable ob2 = Observable.create(e->new Thread(()->e.onNext("a2")).start());
Observable ob3 = Observable.merge(ob1,ob2);

​

ob3中组合了ob1和ob2两个流,每个流是独立的。(这里需要注意,这两个流能并发执行,还有一个条件是他们的发送代码运行在不同线程,就如果code3.1和code3.2中的示例一样,虽然两个流是独立的,但是如果不提交到不同的线程中,还是顺序执行的)。

​

5.5 背压

在 RxJava 2.x 中,只有 Flowable 类型支持背压。当然,Observable 能解决的问题,对于 Flowable 也都能解决。但是,其为了支持背压而新增的额外逻辑导致 Flowable 运行性能要比 Observable 慢得多,因此,只有在需要处理背压场景时,才建议使用 Flowable。如果能够确定上下游在同一个线程中工作,或者上下游工作在不同的线程中,而下游处理数据的速度高于上游发射数据的速度,则不会产生背压问题,就没有必要使用Flowable。关于Flowable的使用,由于篇幅原因,就不在本文阐述。

​

5.6 超时

强烈建议设置异步调用的超时时间,用timeout和onErrorReturn方法设置超时的兜底逻辑,否则这个请求将一直占用一个Observable线程,当大量请求到来时,也会导致OOM。

​

6.结语

目前,闲鱼的多个业务场景都采用RxJava做异步化,大大降低了开发同学的异步开发成本。同时在多请求响应组合、并发处理都有很好的性能表现。自带的超时逻辑和兜底策略,在批量业务数据处理中能保证可靠性,是用户流畅体验的强力支撑。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…659660661…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%