开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

网络基础:HTTP 服务介绍

发表于 2021-11-16

「这是我参与11月更文挑战的第15天,活动详情查看:2021最后一次更文挑战」

你好,我是看山。

HTTP(Hyper Text Transfer Protocol)即超文本传输协议,采用请求/响应模型,是目前互联网使用最为广泛的一种网络协议。主要的过程:客户端向服务器发送一个请求,请求的请求头包含请求的方法、URI、协议版本、请求修饰符、客户信息、以及请求的内容等信息;服务器以一个状态行作为响应,包括消息协议的版本、成功或者错误编码、服务器信息、实体元信息以及实体内容。http 服务默认端口是 80,https 默认端口是 443。下图为 HTTP 服务简单的处理图。

图片

http 服务处理图

本着实用主义,本文对 http 协议的原理不做过多解释,只是分享一下日常使用 Chrome 调试 http 服务的经验,希望能得到同行们的指点。

在开发 http 服务过程中,如果是编写服务端的,经常会是没有界面可以看到效果,只能通过一些小工具调用服务端服务,查看返回结果。如果一切顺利,自不必说,但是如果遇到一些很奇怪的错误,无法对服务端进行 debug,就需要对 http 请求及响应进行跟踪,来了解客户端请求方式、传递什么类型、什么格式的什么数据,然后是服务端响应的结果等。通过这一系列的参数来诊断服务的健康状况,并进行错误排查。经常使用 Chrome,所以这里就只列出使用 Chrome 调试的方式,其他的浏览器,比如 Firefox、IE 等也有类似的功能,大家可以自由选择自己熟悉的。

图片

Chrome 的 Network 选项卡

上图是在 Chrome 浏览器中开发者工具(工具->开发者工具,或 F12 唤醒)。图中上半部分是当前页面各个请求的 time line,可以查看每个请求的请求时间及时长,用于优化请求时间过长的请求。下半部分的左侧,是请求列表,通过点击左侧的请求列表,可以在右侧查看当前请求的各项参数。可以看到,每次请求有 Headers、Preview、Response、Cookier、Timing 几个选项卡,下面将一一介绍。

Headers

这里就是所谓的请求头,包括:general header、request header、response header、entity header 几部分。

General

  • Request URL: 客户端的请求地址,对应服务端的服务
  • Request Method: 请求类型,主要有 get、post、put、delete 等一系列,最常用的还是 get、post
  • Status Code: 响应状态码,可以从 这里 查看常用的响应状态码及对应解释
  • Remote Address: 域名对应的 ip 及端口,这个直接显示当前请求是请求的那个 ip 的服务,用于定位异常服务很有用

Response Headers

  • cache-control: 请求和响应遵循的缓存机制,当前请求的 Cache-Control 不会影响另一个请求的缓存处理。private(默认)、no-cache、must-revalidate、max-age。
  • content-encoding: 服务器响应结果的编码类型(主要是压缩类型)identity、gzip、compress
  • content-type: 服务器响应结果格式/类型,比如 text/html; charset=utf-8
  • content-language: 响应体的语言
  • content-length: 响应体的长度
  • date: 消息发出时间 (GMT)
  • expires: 响应过期时间
  • status: http 响应码
  • vary: 提示使用缓存响应还是从原始服务器请求,即当缓存中存在一个未过期的响应是否能被后续的请求服用,Accept-Encoding、User-Agent。如果 vary 的值中返回了 User-Agent,那么通过不同的浏览器打开相同的页面都会重新请求服务器;如果 Vary 中没返回 User-Agent,那么客户端缓存把它看成是相同的页面,相同的请求,直接给用户返回缓存的内容;如果返回的值是 Accept-Encoding,将请求头信息中的 Accept-encoding 字段的值(gzip 等)作为缓存的 key; 如果 vary 的值为*表示缓存不会去做判断;
  • transfer-encoding: 文件传输编码 chunked 标识传输内容长度不确定,如果以 gzip 方式输出时,就不必申请一个很大的字节数组了,可以一块一块的输出,更科学,占用资源更少。

Request Headers

  • Cache-Control: 请求和响应遵循的缓存机制,当前请求的 Cache-Control 不会影响另一个请求的缓存处理。private(默认)、no-cache、must-revalidate、max-age。
  • Accept: 客户端/发送端能够接收的数据类型,包括 text/html、application/xhtml+xml、application/xml 等
  • Accept-Encoding: 浏览器发给服务器的声明可以支持的编码类型(主要是压缩类型)identity、gzip、compress
  • Accept-Language: 浏览器可接收的语言,在国内大部分应该是 zh-CN,zh;q=0.8
  • Connection: 是否保持与服务器的 tcp 长连接。keep-alive(默认)、close。keep-alive 代表服务会保留当前连接一段时间被其他请求重复使用;close 代表请求之后关闭连接。
  • Referer: 当前请求的来源
  • Upgrade-Insecure-Requests:1
  • Host: 请求的服务器域名
  • User-Agent: 发出请求的客户端信息,比如 Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/49.0.2623.108 Chrome/49.0.2623.108 Safari/537.36

Query String Parameter

这个模块是 get 请求才会出现,是请求参数,比如:

1
2
3
4
5
6
7
makefile复制代码soc-app:162
cid:0
soc-platform:1
ozv:es_oz_20160428.15_p0
f.sid:132385467
_reqid:142769
rt:j

Form Data

这个模块是有 form 表单提交时候出现,比如:

1
2
3
perl复制代码f.req:%5B%5B%22OGB%22%2C%5B7%2C%22en%22%5D%5D%2C%5B%5B%5D%2C%5B1%5D%2C%5B3%5D%2C100%2C%22OGB%22%5D%2C%5B1%5D%5D
at:AObGSAhUe1EYP164z9Z6L0cy8oPPAm_ezw%3A1462071694241
:

Preview

显示请求响应后的预览。

Response

显示响应的具体内容。

Cookie

以 key-value 形式展示客户端所有的 Cookie 信息。

Timing

展示的是从请求开始到响应结束整个过程每个阶段经历的时间或者说耗时。

写在最后

使用 Chrome 可以调试大多数 http 服务,但是有时候调试 post、put、delete 等方式的请求时,又没有界面,可以使用一些插件或工具来实现。在 Chrome 中可以使用 Postman 或 DHC 来实现这些功能,非常好用。下图是 Postman 的截图:

图片


你好,我是看山。游于码界,戏享人生。如果文章对您有帮助,请点赞、收藏、关注。欢迎关注公众号「看山的小屋」,发现不一样的世界。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

服务注册与发现 上手实践Spring Cloud Eur

发表于 2021-11-16

这是我参与11月更文挑战的第9天,活动详情查看:2021最后一次更文挑战

上一篇文章《上手实践Spring Cloud Eureka 和 Feign(一)》讲到了服务注册与发现的简单概念,这篇文章直接上手实践Eureka,启动一个服务注册中心,以及将业务服务注册到这个中心上。

二、Eureka

Eureka作为Spring Cloud的服务注册中心,我们需要引入Spring Cloud相关的依赖,其次我们需要启动一个Eureka注册服务中心,最后为了验证服务注册的功能,需要启动一个web 服务注册到注册中心。

2.1 Eureka的Spring Cloud 依赖

首先,我们将创建一个新的 Maven pom 项目模块,并将Spring Cloud的的依赖放入,这个依赖将决定Spring Cloud 相关组件Starter的版本。

1
2
3
4
5
6
7
8
9
10
11
maven复制代码<properties>
<version.spring-cloud>2020.0.1</version.spring-cloud>
</properties>

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${version.spring-cloud}</version>
<type>pom</type>
<scope>import</scope>
</dependency>

Spring Cloud 依赖的版本我用的比较新,根据实际工程选择合适的即可。

2.2 注册中心

我们新建一个子模块管理Eureka的注册中心服务,在这里要引入Eureka的Server依赖包。

1
2
3
4
maven复制代码<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
</dependency>

然后写一个启动类Launcher,目的是把服务启动起来,包含的内容如下。

1
2
3
4
5
6
7
8
java复制代码@SpringBootApplication
@EnableEurekaServer
public class Launcher {

public static void main(String[] args) {
SpringApplication.run(Launcher.class,args);
}
}

application.yml的配置信息如下。

1
2
3
properties复制代码server.port=8761
eureka.client.register-with-eureka=false
eureka.client.fetch-registry=false

此时启动服务,浏览器上访问URLhttp://{yourhost}:8761,如下图所示,表示服务注册中心启动成功,能提供服务注册和发现的功能。

screenshot-127-0-0-1-8761-1636959093343.png

2.3 服务提供方

在有了服务注册中心后,我们可以启动一个服务去注册到服务注册中心,这些服务都统统成为Eureka Client,所以要引入Client相关的依赖。

1
2
3
4
maven复制代码<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>

我们提供一个简单的greeting api接口。

1
2
3
4
5
java复制代码public interface GreetingController {

@GetMapping("/greeting")
String greeting();
}

然后在启动类Launcher去实现这个接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码@SpringBootApplication
@RestController
public class Launcher implements GreetingController {

@Autowired
@Lazy
private EurekaClient eurekaClient;

@Value("${spring.application.name}")
private String appName;

@Override
public String greeting() {
return String.format(
"Hello from '%s'!", eurekaClient.getApplication(appName).getName());
}

public static void main(String[] args) {
SpringApplication.run(Launcher.class,args);
}
}

配置文件application.yml的内容如下。

1
2
3
4
5
6
7
8
9
10
11
12
yml复制代码spring:
application:
name: spring-cloud-eureka-client

server:
port: 0
eureka:
client:
serviceUrl:
defaultZone: ${EUREKA_URI:http://localhost:8761/eureka}
instance:
preferIpAddress: true

然后启动这个服务,观察Eureka的服务管理页面,可以看到有服务注册成功。

screenshot-127-0-0-1-8761-1636959488655.png
此时,服务的提供方也准备好了,接下来就需要去消费这个服务了。


少年,没看够?点击石头的主页,随便点点看看,说不定有惊喜呢?欢迎支持点赞/关注/评论,有你们的支持是我更文最大的动力,多谢啦!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

如果非要在多线程中使用ArrayList会发生什么?

发表于 2021-11-16

「这是我参与11月更文挑战的第15天,活动详情查看:2021最后一次更文挑战」

本文被《从小工到专家的 Java 进阶之旅》收录。

你好,我是看山。

我们都知道,Java中的ArrayList是非线程安全的,这个知识点太熟了,甚至面试的时候都很少问了。

但是我们真的清楚原理吗?或者知道多线程情况下使用ArrayList会发生什么?

前段时间,我们就踩坑了,而且直接踩了两个坑,今天就来扒一扒。

翠花,上源码

上代码之前先说下ArrayList的add逻辑:

  1. 检查队列中数组是否还没有添加过元素
  2. 如果是,设置当前需要长度为10,如果否,设置当前需要长度为当前队列长度+1
  3. 判断需要长度是否大于数组大小
  4. 如果是,需要扩容,将数组长度扩容1.5倍(第一次扩容会从0直接到10,后续会按照1.5倍的步幅增长)
  5. 数组中添加元素,队列长度+1

附上代码,有兴趣的可以在看看源码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
java复制代码/**
 * Appends the specified element to the end of this list.
 *
 * @param e element to be appended to this list
 * @return <tt>true</tt> (as specified by {@link Collection#add})
 */
public boolean add(E e) {
    // 判断数组容量是否足够,如果不足,增加1.5倍,size是当前队列长度
    ensureCapacityInternal(size + 1);  // Increments modCount!!
    // 给下标为size的赋值,同时队列长度+1,下标从0开始
    elementData[size++] = e;
    return true;
}

private void ensureCapacityInternal(int minCapacity) {
    ensureExplicitCapacity(calculateCapacity(elementData, minCapacity));
}

private static int calculateCapacity(Object[] elementData, int minCapacity) {
    // 判断是否首次添加元素,如果是,返回默认队列长度,现在是10
    if (elementData == DEFAULTCAPACITY_EMPTY_ELEMENTDATA) {
        return Math.max(DEFAULT_CAPACITY, minCapacity);
    }
    // 如果不是首次添加元素,就返回当前队列长度+1
    return minCapacity;
}

private void ensureExplicitCapacity(int minCapacity) {
    modCount++;

    // overflow-conscious code
    // 如果需要的长度大于队列中数组长度,扩容,如果可以满足需求,就不用扩容
    if (minCapacity - elementData.length > 0)
        grow(minCapacity);
}

/**
 * Increases the capacity to ensure that it can hold at least the
 * number of elements specified by the minimum capacity argument.
 *
 * @param minCapacity the desired minimum capacity
 */
private void grow(int minCapacity) {
    // overflow-conscious code
    int oldCapacity = elementData.length;
    // 这里就是扩容1.5倍的代码
    int newCapacity = oldCapacity + (oldCapacity >> 1);
    if (newCapacity - minCapacity < 0)
        newCapacity = minCapacity;
    if (newCapacity - MAX_ARRAY_SIZE > 0)
        newCapacity = hugeCapacity(minCapacity);
    // minCapacity is usually close to size, so this is a win:
    elementData = Arrays.copyOf(elementData, newCapacity);
}

就是这么不安全

从上面代码可以看出,ArrayList中一丁点考虑多线程的元素都没有,完全的效率优先。

奇怪的ArrayIndexOutOfBoundsException

先做一个假设,此时数组长度达到临界边缘,比如目前容量是10,现在已经有9个元素,也就是size=9,然后有两个线程同时向队列中增加元素:

  1. 线程1开始进入add方法,获取size=9,调用ensureCapacityInternal方法进行容量判断,此时数组容量是10,不需要扩容
  2. 线程2也进入add方法,获取size=9,调用ensureCapacityInternal方法进行容量判断,此时数组容量还是10,也不需要扩容
  3. 线程1开始赋值值了,也就是elementData[size++] = e,此时size变成10,达到数组容量极限
  4. 线程2此次开始执行赋值操作,使用的size=10,也就是elementData[10] = e,因为下标从0开始,目前数组容量是10,直接报数组越界ArrayIndexOutOfBoundsException。

仅仅差了一步,线程2就成为了抛异常的凶手。但是抛出异常还是好的,因为我们知道出错了,可以沿着异常

诡异的null元素

这种情况不太容易从代码中发现,得对代码稍加改造,elementData[size++] = e这块代码其实执行了两步:

1
2
java复制代码elementData[size] = e;
size++;

假设还是有两个线程要赋值,此时数组长度还比较富裕,比如数组长度是10,目前size=5:

  1. 线程1开始进入add方法,获取size=5,调用ensureCapacityInternal方法进行容量判断,此时数组容量是10,不需要扩容
  2. 线程2也进入add方法,获取size=5,调用ensureCapacityInternal方法进行容量判断,此时数组容量还是10,也不需要扩容
  3. 线程1开始赋值,执行elementData[size] = e,此时size=5,在执行size++之前,线程2开始赋值了
  4. 线程2开始赋值,执行elementData[size] = e,此时size还是5,所以线程2把线程1赋的值覆盖了
  5. 线程1开始执行size++,此时size=6
  6. 线程2开始执行size++,此时size=7

也就是说,添加了2个元素,队列长度+2,但是真正加入队列的元素只有1个,有一个被覆盖了。

这种情况不会立马报错,排查起来就很麻烦了。而且随着JDK 8的普及,可能随手使用filter过滤空元素,这样就不会立马出错,直到出现业务异常之后才能发现,到那时,错误现场已经不见了,排查起来一头雾水。

有同学会问,源码中是elementData[size++] = e,是一行操作,为什么会拆成两步执行呢?其实这得从JVM字节码说起了。

通过JVM字节码说说第二种异常出现的原因

先来一段简单的代码:

1
2
3
4
5
6
7
java复制代码public class Main {
    public static void main(String[] args) {
        int[] nums = new int[3];
        int index = 0;
        nums[index++] = 5;
    }
}

通过javac Main.java和javap -v -l Main.class组合操作得到字节码:

下面那些中文是我后加的备注,备注中还列出了局部变量表和栈值的变化,需要有点耐心。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
less复制代码public class Main
minor version: 0
major version: 52
flags: ACC_PUBLIC, ACC_SUPER
Constant pool:
#1 = Methodref #3.#12 // java/lang/Object."<init>":()V
#2 = Class #13 // Main
#3 = Class #14 // java/lang/Object
#4 = Utf8 <init>
#5 = Utf8 ()V
#6 = Utf8 Code
#7 = Utf8 LineNumberTable
#8 = Utf8 main
#9 = Utf8 ([Ljava/lang/String;)V
#10 = Utf8 SourceFile
#11 = Utf8 Main.java
#12 = NameAndType #4:#5 // "<init>":()V
#13 = Utf8 Main
#14 = Utf8 java/lang/Object
{
public Main();
descriptor: ()V
flags: ACC_PUBLIC
Code:
stack=1, locals=1, args_size=1
0: aload_0
1: invokespecial #1 // Method java/lang/Object."<init>":()V
4: return
LineNumberTable:
line 1: 0

public static void main(java.lang.String[]);
descriptor: ([Ljava/lang/String;)V
flags: ACC_PUBLIC, ACC_STATIC
Code:
stack=3, locals=3, args_size=1 局部变量表 栈
0: iconst_3 // 将int型(3)推送至栈顶 args 3
1: newarray int // 创建一个指定原始类型(如int, float, char…)的数组,并将其引用值压入栈顶 args 数组引用
3: astore_1 // 将栈顶引用型数值存入第二个本地变量 args, nums=数组引用 null
4: iconst_0 // 将int型(0)推送至栈顶 args, nums=数组引用 0
5: istore_2 // 将栈顶int型数值存入第三个本地变量 args, nums=数组引用, index=0 null
6: aload_1 // 将第二个引用类型本地变量推送至栈顶 args, nums=数组引用, index=0 数组引用
7: iload_2 // 将第三个int型本地变量推送至栈顶 args, nums=数组引用, index=0 0, 数组引用
8: iinc 2, 1 // 将指定int型变量增加指定值(i++, i--, i+=2),也就是第三个本地变量增加1 args, nums=数组引用, index=1 0, 数组引用
11: iconst_5 // 将int型(5)推送至栈顶 args, nums=数组引用, index=1 5, 0, 数组引用
12: iastore // 将栈顶int型数值存入指定数组的指定索引位置 args, nums=数组引用, index=1 null
13: return // 从当前方法返回void
LineNumberTable:
line 3: 0 // int[] nums = new int[3];
line 4: 4 // int index = 0;
line 5: 6 // nums[index++] = 5;
line 6: 13 // 方法结尾默认的return
}

从上面的字节码可以看到,nums[index++] = 5这一句会被转为5个指令,是从6到12。大体操作如下:

  1. 将数组、下标压入栈
  2. 给下标加值
  3. 将新值压入栈
  4. 取栈顶三个元素开始给元素指定下标赋值

也即是说,错误出在数组赋值操作时先将数组引用和下标同时压入栈顶,与下标赋值是两步,在多线程环境中,就有可能出现上面说到的null值存在。

解法

其实解法也很简单,就是要意识到多线程环境,然后不使用ArrayList。可以使用Collections.synchronizedList()返回的同步队列,也可以使用CopyOnWriteArrayList这个队列,或者自己扩展ArrayList,将add方法做成同步方法。

文末总结

ArrayList整个类的操作都是非线程安全的,一旦在多线程环境中使用,就可能会出现问题。上面提到add操作就会有两种异常行为,一个是数组越界异常,一个是出现丢数且出现空值。这还只是最简单的add操作,如果add、addAll和get混合使用使用时,异常情况就更多了。所以,使用的时候一定要注意是不是单线程操作,如果不是,果断使用其他队列防雷。

推荐阅读

  • JDK中居然也有反模式接口常量
  • java import 导入包时,我们需要注意什么呢?
  • Java 并发基础(一):synchronized 锁同步
  • Java 并发基础(二):主线程等待子线程结束
  • Java 并发基础(三):再谈 CountDownLatch
  • Java 并发基础(四):再谈 CyclicBarrier
  • Java 并发基础(五):面试实战之多线程顺序打印
  • 如果非要在多线程中使用ArrayList会发生什么?
  • 重新认识 Java 中的队列
  • Java 中 Vector 和 SynchronizedList 的区别
  • 如果非要在多线程中使用 ArrayList 会发生什么?(第二篇)
  • 一文掌握 Java8 Stream 中 Collectors 的 24 个操作
  • 一文掌握 Java8 的 Optional 的 6 种操作

你好,我是看山。游于码界,戏享人生。如果文章对您有帮助,请点赞、收藏、关注。欢迎关注公众号「看山的小屋」,发现不一样的世界。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Redis(十三)redis事务

发表于 2021-11-16

这是我参与11月更文挑战的第16天,活动详情查看:2021最后一次更文挑战

Redis作为一个非关系型数据库,其也是有事务操作的。

Redis 事务可以一次执行多个命令, 并且带有以下三个重要的保证:

(1)批量操作在发送 EXEC 命令前被放入队列缓存。

(2)收到 EXEC 命令后进入事务执行,事务中任意命令执行失败,其余的命令依然被执行。

(3)在事务执行过程,其他客户端提交的命令请求不会插入到事务执行命令序列中。

一个事务从开始到执行会经历以下三个阶段:

(1)开始事务。

(2)命令入队。

(3)执行事务。

但是redis的事务和mysql的事务还是有很大区别的,mysql执行过程中出现错误,数据会回滚,而redis事务执行过程中失败则不会回滚。

Redis的事务可以看做是一个批量执行命令的一个脚本。

这里关注的不是事务执行过程中每条命令是否都执行成功,而是,执行当前的事务是否成功。

一:linux命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
bash复制代码// 开始事务
127.0.0.1:6379> multi
OK
// 设置健值
127.0.0.1:6379> set a aaaa
QUEUED
// 设置健值
127.0.0.1:6379> set b bbbb
QUEUED
// 设置健值
127.0.0.1:6379> set c cccc
QUEUED
// 执行事务,其实redis的事务就是一个批处理命令的一个脚本
127.0.0.1:6379> exec
1) OK
2) OK
3) OK

二:PHP命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
bash复制代码// 开启事务
$res = $redis->multi();

$res = $redis->set('a','aaa');
var_dump($res);
echo "<hr>";

$res = $redis->set('b','bbb');
var_dump($res);
echo "<hr>";

$res = $redis->set('c','bbb');
var_dump($res);
echo "<hr>";

// 执行事务
$res = $redis->exec();

var_dump($res);

以上只是对redis事务简单的应用,大概就是这样,主要应用其是一个批处理的命令。

三:基于redis事务的乐观锁实现

解释:乐观锁(Optimistic Lock), 顾名思义,就是很乐观。

每次去拿数据的时候都认为别人不会修改,所以不会上锁。

watch命令会监视给定的key,当exec时候如果监视的key从调用watch后发生过变化,则整个事务会失败。

也可以调用watch多次监视多个key。这样就可以对指定的key加乐观锁了。

注意watch的key是对整个连接有效的,事务也一样。

如果连接断开,监视和事务都会被自动清除。

当然了exec,discard,unwatch命令都会清除连接中的所有监视。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
bash复制代码// 监视 count 值
$redis->watch("count");
// 开启事务
$redis->multi();
// 操作count
$time = time();
$redis->set("count", $time);

// 模拟并发下其他进程进行set count操作 请执行下面操作
// 在shell命令行中执行 set count issimulate;

sleep(40);
// 提交事务
$res = $redis->exec();

$result = $redis->get('count');

if ($res) {
// 成功...
echo "success:" . $result ;
echo "<hr>";
return;
}
// 失败...
echo "fail:" . $result ;
echo "<hr>";

这段测试代码可能要解释一下:

首先我们先监控键count,开启事务之后,将count键的值设置成当前时间戳,程序睡眠40秒,再执行事务操作。这是在理想情况下,就是上边这段代码在没有任何外力影响的情况下的执行事务成功,输出

1
bash复制代码Success:1502547852

上面是事务执行成功的结果,我们现在来模拟事务执行失败的情况,很简单,还是上边那段代码,在浏览器中执行之后,程序会睡眠40秒,我们在这个时间里模拟另一个进程修改count键的值,在shell中执行set count issimulate;,watch监控的健的值在事务执行过程中被另一进程改变,则此次事务执行失败。输出

1
bash复制代码fail:issimulate

其实回过头来看,当不同进程修改同一个值得时候,使用事务,就相当于给其加了一个乐观锁。

四:redis事务命令

序号

命令及描述

1

1
2
bash复制代码DISCARD 
取消事务,放弃执行事务块内的所有命令。

2

1
2
bash复制代码EXEC 
执行所有事务块内的命令。

3

1
2
bash复制代码MULTI 
标记一个事务块的开始。

4

1
2
bash复制代码UNWATCH 
取消 WATCH 命令对所有 key 的监视。

5

1
2
bash复制代码WATCH   key [key ...] 
监视一个(或多个) key ,如果在事务执行之前这个(或这些) key 被其他命令所改动,那么事务将被打断。

有好的建议,请在下方输入你的评论。

欢迎访问个人博客
guanchao.site

欢迎访问小程序:

在这里插入图片描述

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

虎嗅 24 小时点赞器,一个案例附带一个爬虫技巧,Pytho

发表于 2021-11-16

「这是我参与11月更文挑战的第16天,活动详情查看:2021最后一次更文挑战」

很多平台都有点赞功能,今天提供的这个思路可用于很多平台,希望可以掌握该技巧,实现你自己的点赞器。本案例目标为虎嗅 24 小时频道点赞。
虎嗅 24 小时点赞器,可扩展到多平台,Python 爬虫小课 7-9

爬取前的分析

分析热点数据来源

该案例在分析阶段需要费一些时间,过程中有那么一点点的难度,理解之后,你将学到一个非常常用的爬虫编写技巧。

拖动浏览器滚动条到页面最底部,捕获请求:https://moment-api.huxiu.com/web-v2/moment/feed 该内容的请求数据与返回数据分别如下图所示。

请求方式为POST。

虎嗅 24 小时点赞器,可扩展到多平台,Python 爬虫小课 7-9

请求数据为三个值,后面两个应为固定值,第一个 last_dateline 需要找到参数值来源才可以解开本案例。

虎嗅 24 小时点赞器,可扩展到多平台,Python 爬虫小课 7-9

返回的数据嵌套的层级比较多,看起来复杂,不过数据格式为 JSON 格式,正常解析即可。

虎嗅 24 小时点赞器,可扩展到多平台,Python 爬虫小课 7-9

上述代码还有一个重点内容,就是返回的数据中,其实提供了参数 last_dateline 的值,那这样就可以通过一个死循环一直查找下去,直到数据获取失败结束循环。

到此,数据获取分析完毕,得到结论如下:

  • 请求方式为 POST;
  • 请求地址为 moment-api.huxiu.com/web-v2/mome…
  • 请求参数为 last_dateline,platform, is_ai;
  • 第一次请求可以先固定一个 last_dateline 的值;
  • 通过循环获取所有的 热点信息。

分析点赞接口

点击点赞按钮抓取到的链接为 https://moment-api.huxiu.com/web/moment/agree,该链接为点赞请求地址。

请求方式与地址如下,依旧为 POST 请求方式:

虎嗅 24 小时点赞器,可扩展到多平台,Python 爬虫小课 7-9

请求数据如下图所示:

虎嗅 24 小时点赞器,可扩展到多平台,Python 爬虫小课 7-9

参数中 moment_id 应该为上文获取到的热点 monent_id,具体位置如图:

虎嗅 24 小时点赞器,可扩展到多平台,Python 爬虫小课 7-9

请求成功之后返回的数据格式如下图所示。

虎嗅 24 小时点赞器,可扩展到多平台,Python 爬虫小课 7-9

到此,点赞接口分析完毕,得到结论如下:

  • 请求方式为 POST;
  • 请求地址为 moment-api.huxiu.com/web/moment/…
  • 请求参数为 monent_id,platform;
  • 测试代码时可以先发起一个请求,monent_id 的值固定一个值即可。

爬虫编写时间

requests post 请求介绍

下面进入到爬虫的具体编写过程中,首先了解下 requests 库如何发起 POST 请求吧。

POST 请求在 requests 中存在几种用法,分别如下。

普通的 post 使用最多
实现方式,就是简单地传递一个字典给 data 参数即可。

1
2
3
4
5
6
7
8
python复制代码import requests
# 该网址专门用于测试使用
url = 'http://httpbin.org/post'
data = {'key1':'value1','key2':'value2'}
r =requests.post(url,data)
print(r)
print(r.text)
print(r.content)

注意返回数据中存在 <Response [200]> 即表示成功。

data 参数传入一个元组列表

如果待提交的表单中多个元素使用同一 key 的时候,这种方式尤其有效。

1
2
3
4
5
6
7
python复制代码import requests
# 该网址专门用于测试使用
url = 'http://httpbin.org/post'
payload = (('key1', 'value1'), ('key1', 'value2'))
r = requests.post('http://httpbin.org/post', data=payload)

print(r.text)

传递一个 JSON 字符串

发送的数据并非编码为表单形式,需要传递一个字符串,而不是一个字典。

1
2
3
4
5
6
7
8
9
10
python复制代码import requests
import json

# 该网址专门用于测试使用
url_json = 'http://httpbin.org/post'
# dumps:将python对象解码为json数据
data_json = json.dumps({'key1': 'value1', 'key2': 'value2'})
r_json = requests.post(url_json, data_json)

print(r_json.text)

传递文件

该内容就有点超出爬虫小课大纲了,暂时忽略。

注意到,POST 和 GET 的区别其实就是 data 这个参数的区别,下面就可以实际编码了。

获取待点赞数据

首先获取 一页待点赞的数据,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
python复制代码def get_feed():
# 拼接数据字典
data = {
"last_dateline": "1605591900",
"platform": "www",
"is_ai": 0
}
r = requests.post(
"https://moment-api.huxiu.com/web-v2/moment/feed", data=data)
data = r.json()
# data["success"]

# 获取数据,注意嵌套的层次有点多,慢慢对应即可
datalist = data["data"]["moment_list"]["datalist"][0]["datalist"]
print(datalist)

实验中发现其实第一次获取可以将 last_dateline 置为空也可。后续循环的代码部分留给大家,爬虫小课已经接近尾声,剩下的你需要自己在扩展一部分 Python 基础语法。

点赞代码编写

点赞代码编写的过程中碰到了一点点小问题,该问题涉及 cookies 这个老大难问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
python复制代码import requests
headers = {
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.121 Safari/537.36"
}

def agree(moment_id):

data = {
"moment_id": moment_id,
"platform": "www",
}

r = requests.post(
"https://moment-api.huxiu.com/web/moment/agree", data=data, headers=headers)
res_data = r.json()
print(res_data)
# print(res_data["success"] == "true")

if __name__ == "__main__":
agree(130725)

上述代码在运行过程中发现,得到的请求结果是:

1
python复制代码{'success': False, 'message': '请开启COOKIE'}

再次核对开发者工具捕获的请求,确实发现一个独特的 cookie,具体如下:

虎嗅 24 小时点赞器,可扩展到多平台,Python 爬虫小课 7-9

一个 huxiu_analyzer_wcy_id 参数,看来问题出现在这里。这个值的获取就变得比较麻烦了,需要想办法捕获到。

这个地方橡皮擦是按照下述步骤解决的,希望你可以学会该类型问题或者说常见的找 cookie 的办法。

打开谷歌浏览器无痕模式,输入 https://www.huxiu.com/moment/ 直接进行跳转,打开无痕模式的原因是因为需要确保 cookie 没有被缓存下来。

接下来在开发者工具中直接检索 huxiu_analyzer_wcy_id 参数。

得到如下内容:

虎嗅 24 小时点赞器,可扩展到多平台,Python 爬虫小课 7-9

这里就发现了该 cookie 设置的地方,发现该 cookie 是由 checklogin 这个接口响应并设置的。

虎嗅 24 小时点赞器,可扩展到多平台,Python 爬虫小课 7-9

接下来的解决办法就非常粗暴了,看一下 checklogin 是如何调用的,然后请求一下,保存它响应的 cookie 值,然后在去请求点赞接口。

最终修改点赞代码,完成任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
python复制代码def agree(moment_id):
s = requests.Session()
r = s.get("https://www-api.huxiu.com/v1/user/checklogin", headers=headers)

jar = r.cookies
print(jar.items())
data = {
"moment_id": moment_id,
"platform": "www",
}

r = s.post(
"https://moment-api.huxiu.com/web/moment/agree", data=data, headers=headers, cookies=jar)
res_data = r.json()
print(res_data)
# print(res_data["success"] == "true")

运行上述代码得到正确的点赞成功说明。

1
python复制代码{'success': True, 'message': '点赞成功'}

最后在啰嗦两句

本篇博客主要用于介绍 requests 库的 post 请求方式,顺带着给大家写了一下 cookie 的一般获取方式,这里要提醒一句在爬虫编写的过程中,开发者工具中的查找是经常用到的,尤其是在解决 JS 加密问题的时候。

虎嗅 24 小时点赞器,可扩展到多平台,Python 爬虫小课 7-9

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

你不知道的开源分布式存储系统 Alluxio 源码完整解析(

发表于 2021-11-16

一、前言

目前数据湖已成为大数据领域的最新热门话题之一,而什么是数据湖,每家数据平台和云厂商都有自己的解读。整体来看,数据湖主要的能力优势是:集中式存储原始的、海量的、多来源的、多类型的数据,支持数据的快速加工及计算。相比于传统的数据仓库,数据湖对数据有更大的包容性,支持结构化/半结构化/非结构化数据,能快速进行数据的落地和数据价值发掘。数据湖的技术体系可以分为三个子领域:数据湖存储、数据湖计算、数据湖统一元数据。

数据湖存储提供海量异构数据的存储能力,支持多类型的底层存储系统,如分布式存储 HDFS、对象存储 AWS S3、腾讯云对象存储 COS 等,除此之外,在数据湖场景中计算和存储分离,使得计算的数据本地性不复存在。因此有必要在数据湖存储和计算之间引入统一的数据缓存层。

Alluxio 是一款基于云原生开源的数据编排技术,为数据计算与数据存储构建了桥梁,支持将数据从原始存储层移动到加速计算的虚拟分布式存储系统。Alluxio 可为数据湖计算提供统一的数据湖存储访问入口,支持跨不同类型的底层存储并抽象出统一的数据访问命名空间,提供数据本地性、数据可访问性、数据伸缩性。

本文将对 Alluxio 底层源码进行简要分析,分上下两篇:主要包括本地环境搭建,源码项目结构,服务进程的启动流程,服务间 RPC 调用,Alluxio 中重点类详解,Alluxio 中 Block 底层读写流程,Alluxio Client 调用流程和 Alluxio 内置的轻量级调度框架。

二、环境准备

2.1. 本地部署

从官方下载安装版本(下载地址),以 2.6.0 安装为例,下载后解压安装包:

tar -zxvf alluxio-2.6.0-bin.tar.gz

修改基本的配置文件,(1). 修改 alluxio-site.properties,设置 master 地址,设置默认 Alluxio root 挂载点

1
bash复制代码cp conf/alluxio-site.properties.template  alluxio-site.properties#放开注释:alluxio.master.hostname=127.0.0.1alluxio.master.mount.table.root.ufs=${alluxio.work.dir}/underFSStorage

(2). 修改 masters、workers 配置对应 ip,本地安装,可都设置为 127.0.0.1

1
bash复制代码vi conf/mastersvi conf/workers

修改完配置后,准备启动 Alluxio 服务,执行如下命令操作:

1
shell复制代码# mount对应磁盘bin/alluxio-mount.sh Mount workers# 进行环境校验bin/alluxio validateEnv masterbin/alluxio validateEnv worker

服务启动命令操作,对于所有服务操作包括:master、worker、job_master、job_worker、proxy

1
css复制代码# 启动所有服务bin/alluxio-start.sh all# 停止所有服务bin/alluxio-stop.sh all# 启动单个服务bin/alluxio-start.sh -a masterbin/alluxio-start.sh -a workerbin/alluxio-start.sh -a job_masterbin/alluxio-start.sh -a job_workerbin/alluxio-start.sh -a proxy

启动后服务成功,也可通过 JPS 查看 Java 进程:AlluxioMaster、AlluxioWorker、AlluxioJobMaster、AlluxioJobWorker、AlluxioProxy。

image.png

  • http://localhost:19999,页面查看 alluxio master ui 界面,默认端口:19999
  • http://localhost:30000,页面查看 alluxio worker ui 界面,默认端口:30000

image.png

  1. IDEA 调试

源码编译可参考官方说明文档:Building Alluxio From Source

mvn clean install -DskipTests# 加速编译mvn -T 2C clean install -DskipTests -Dmaven.javadoc.skip -Dfindbugs.skip -Dcheckstyle.skip -Dlicense.skip -Dskip.protoc

通过 IDEA 启动 Alluxio 各个服务进程,其核心启动类包括:

  • AlluxioMaster:Main 函数入口,设置启动运行 VM Options,alluxio.logger.type=MASTER_LOGGER,RPC 端口:19998,Web 端口:19999;
  • AlluxioJobMaster:Main 函数入口,设置启动运行 VM Options,alluxio.logger.type=JOB_MASTER_LOGGER
  • AlluxioWorker:Main 函数入口,设置启动运行 VM Options,alluxio.logger.type=WORKER_LOGGER,
  • AlluxioJobWorker:Main 函数入口,设置启动运行 VM Options,alluxio.logger.type=JOB_WORKER_LOGGER
  • AlluxioProxy:Main 函数入口,设置启动运行 VM Options,alluxio.logger.type=PROXY_LOGGER

VM Options 参数示例如下:

-Dalluxio.home=/code/git/java/alluxio -Dalluxio.conf.dir=/code/git/java/alluxio/conf -Dalluxio.logs.dir=/code/git/java/alluxio/logs -Dlog4j.configuration=file:/code/git/java/alluxio/conf/log4j.properties -Dorg.apache.jasper.compiler.disablejsr199=true -Djava.net.preferIPv4Stack=true -Dalluxio.logger.type=MASTER_LOGGER -Xmx2g -XX:MaxDirectMemorySize=516M

操作示例如下:

image.png
在项目根目录 logs 下可查看服务启动的日志文件:

image.png

DEBUG 远程调试,在 alluxio-env.sh 配置环境变量,可增加如下配置属性
export ALLUXIO_WORKER_JAVA_OPTS="$ALLUXIO_JAVA_OPTS -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=6606"export ALLUXIO_MASTER_JAVA_OPTS="$ALLUXIO_JAVA_OPTS -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=6607"export ALLUXIO_USER_DEBUG_JAVA_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=6609"

如下图所示,增加远程的监控端口,监控 Alluxio Worker 6606:

image.png

调用 Alluxio Shell 命令时开启 DEBUG 的输出,使用参数:-debug,示例如下:
bin/alluxio fs -debug ls /

三、项目结构

Alluxio 源码的项目结构可简化如下几个核心模块:

  • alluxio-core:实现 Alluxio 系统的核心模块,其中 alluxio-core-server 内实现 Alluxio Master、Alluxio Worker、Alluxio Proxy;alluxio-core-client 定义 Alluxio Clien 操作;alluxio-core-transport 实现服务间 RPC 通信;
  • alluxio-job:Alluxio 内部轻量级作业调度实现,alluxio-job-server 内实现 Alluxio Job Master、Alluxio Job Worker;
  • alluxio-underfs:适配对接不同的底层存储,如 hdfs、cephfs、local、s3 等;
  • alluxio-table:实现 Alluxio Catalog 功能,基于 table 引擎读取元数据并支持关联 Alluxio 存储,目前 catalog 的底层 UDB 支持 hive metastore 和 aws glue catalog;
  • alluxio-shell:封装 Alluxio shell 工具;

image.png

四、服务进程

Alluxio 服务内部的 5 个核心进程:AlluxioMaster、AlluxioWorker、AlluxioProxy、AlluxioJobMaster、AlluxioJobWorker 都是基于 Process(进程)接口类扩展实现的,定义组件进程的生命周期管理操作。

类图实现继承关系如下所示:

image.png

4.1 AlluxioMaster

4.1.1. 启动流程

  • 基于 JournalSystem 维护 Master 元数据持久化信息,便于服务宕机后,从最新的 Journal File 恢复,详见 Journal Management;
  • 进行 AlluxioMaster 选举,Master 选举支持两种方式:ZK、Raft(RaftJournalSystem);
  • 基于 ProcessUtils 进行进程启停管理触发,执行 AlluxioMasterProcess 启动

JournalSystem 启动/设置主要执行模式(gainPrimacy)

AlluxioMasterProcess#startMasters:启动所有 Master 相关服务,包括 block master、FileSystem master 等;若是 leader,则调用 BackupManager#initFromBackup 初始化所有注册 master server 组件,若不是 leader 则仅启动 Master 的 RPC/UI 服务

AlluxioMasterProcess#startServing:启动指标相关服务,包括 Web、JVM、RPC 相关的指标;

启动时序图简化如下所示:

image.png

4.1.2. Server 接口类

特别的,初始化并启动的 Server 接口类组件,主要包括 Master 类和 Worker 类,Server 会从线程池获取线程,启动执行各个 Server 定义的操作,server 中定义服务线程的生命周期操作,定义的接口方法如下:

  • getName:获取该 Server 名称;
  • getDependencies:该 Server 依赖的其他前置 Server;
  • getServices:获取 Server 定义的 GrpcService 集合;
  • start:Server 启动;
  • stop:Server 停止;
  • close:Server 关闭;

image.png

4.1.3. Master Server

定义 Master 组件中封装的各个线程 Server 服务,包括 Block 元数据管理,文件系统管理等,其细化类图如下所示:

image.png

4.1.3.1. DefaultFileSystemMaster

Alluxio Master 处理系统中所有文件系统元数据管理的 Server 服务,基于 DefaultFileSystemMaster 可对文件执行 Lock(加锁)操作,为了对任意 inode 进行读写操作,需要对 inode tree 中的每个独立路径进行加锁。InodeTree 对象提供加锁方法有:InodeTree#lockInodePath、InodeTree#lockFullInodePath,方法返回已被加锁处理的 LockedInodePath 路径对象。

在 DefaultFileSystemMaster 中常用的上下文对象:JournalContext, BlockDeletionContext, RpcContext;用户对文件元数据的访问(方法调用)都有一个独立的线程进行审计日志记录及管理。

备注:当获取 inode path 时,可能存在并发操作对该 path 进行写变更操作,那么读取 inode path 会抛出异常,提示 path 的数据结构已变更。

DefaultFileSystemMaster start 启动流程概述:

  • 基于 InodeTree 初始化文件系统根目录(initializeRoot)并判断是否有该文件系统权限;
  • 遍历 MountTable,初始化 MasterUfsManager 并进行文件系统挂载 Mount 操作;
  • 提交不同的 HeartbeatThread(心跳线程) 进行各个检测校验,最终调用 HeartbeatExecutor.heartbeat 方法,其心跳检测包括:

BlockIntegrityChecker:Block 完整性校验

InodeTtlChecker:File Inode TTL 生命周期校验

LostFileDetector:丢失文件探测

ReplicationChecker:副本数校验

PersistenceSchedule:持久化调度

PersistenceChecker:持久化校验

TimeSeriesRecorder:时间序列记录

UfsCleaner:UFS 清理器

image.png

附:HeartbeatExecutor 的类图概要

image.png

4.1.3.2. DefaultBlockMaster

Alluxio Master 中管理所有 Block 和 Worker 映射元数据的 Server 服务。

为保证并发请求,BlockMaster Server 使用支持并发的数据结构,每个元数据都可以进行独立的加锁操作。在 BlockMaster 中有两大类元数据:block metadata(block 块元数据),worker metadata(worker 节点元数据):

  • block metadata 加锁操作:基于 block 执行任意的 BlockStore 操作,从 mLostBlocks 中移除元素;
  • worker metadata 加锁操作:校验/更新 worker 注册状态,读/写 worker 使用率,读/写 worker 上的 block 管理;

为避免死锁操作,如果 block 和 worker 元数据需要同时加锁,worker 需要在 block 之前加锁,释放锁时则相反,block 需要在 worker 之前释放锁。

start 启动流程概述:提交 HeartbeatThread(心跳线程) 进行检测校验,提交的线程是:LostWorkerDetectionHeartbeatExecutor,对 worker 的心跳进行检测。

4.2. AlluxioWorker

4.2.1. 启动流程

  • 通过 MasterInquireClient.Factory 获取 Alluxio Master 的地址和相关配置信息;
  • 创建 AlluxioWorkerProcess 进程对象,并执行 start 方法,具体如下:

通过 WorkerRegistry 获取 Worker 上的所有 Worker Server 服务,并启动相应的 Server;

注册 WebServer Handler,并启动,包括通用指标和 Prometheus 指标;

注册 JvmPauseMonitor,采集 worker 节点相关的 JVM 监控指标信息;

  • 如果 Worker 内嵌 FUSE 服务,则启动 FuseManager

image.png

4.2.2. Worker Server

image.png

4.2.2.1. DefaultBlockWorker

负责管理 Worker 节点中最高层级的 Block 抽象操作,包括:

  • 周期性的 BlockMasterSync,将当前 Worker 节点的 Block 信息周期定时上报同步给 Master;
  • 维护当前 Worker 所有 Block 信息与底层存储操作的逻辑关系;

start 启动流程概述:通过 BlockMasterClientPool 获取 BlockMaster RPC 地址并注册,基于 ExecutorService 提交 Worker 节点的 HeartbeatThread 线程,包括:

  • BlockMasterSync:将 Worker 节点 Block 信息定时同步 BlockMaster 进行统一 block 元数据管理;
  • PinListSync:维护 Alluxio 与底层 UFS 的联通地址;
  • StorageChecker:校验存储地址;

4.3. AlluxioProxy

4.3.1. 启动流程

  • 基于 ProxyProcess.Factory 创建对应的进程对象:AlluxioProxyProcess;
  • 创建 AlluxioProxyProcess 进程对象后,执行 start 方法,调用 ProxyWebServer 执行 start 方法,启动 Proxy Web 服务;

image.png

4.4. AlluxioJobMaster

4.4.1. 启动流程

  • 基于 AlluxioJobMasterProcess.Factory 创建对应的进程对象:AlluxioJobMasterProcess;
  • AlluxioJobMasterProcess 执行 start 方法,调用细节如下:

启动 AlluxioJobMaster 关联的 JournalSystem,并获取 Master Leader;

启动 Job 的 Server 服务,调用 JobMaster start;

分别启动 JobMaster 的 Web Server 和 RPC Server,提供对外通信服务;

image.png

4.4.2. JobMaster

Alluxio 内置轻量级的作业调度框架,JobMaster 处理 AlluxioJobMaster 中所有 job 管理相关操作。

start 启动流程概述:基于 PlanTracker 获取上一次调度系统中遗留的所有运行中执行计划并停止,提交 HeartbeatThread(心跳线程) 进行监测,提交的进程是:LostWorkerDetectionHeartbeatExecutor,用于检测心跳丢失的 Worker 节点;

4.5. AlluxioJobWorker

4.5.1. 启动流程

  • 通过 MasterInquireClient.Factory 获取 Alluxio Master 的地址和相关配置信息;
  • 创建 AlluxioJobWorkerProcess 进程对象,并执行 start 方法,具体如下:

注册 WebServer Handler 并启动 JobWorkerWebServer,提供 Web 服务;

启动 JobWorker 的 Server 服务 JobWorker,注册 job worker 节点,并提交心跳检测线程 CommandHandlingExecutor;

启动 RPC 服务于外部通信。

image.png

4.5.2. JobWorker

负责管理 Worker 节点中执行任务相关的所有操作,通过 CommandHandlingExecutor 心跳检测执行进程实现。

start 启动流程概述:向 JobWorkerIdRegistry 注册当前 worker 节点信息,提交 HeartbeatThread(心跳线程) 进行监测,提交的线程是:CommandHandlingExecutor,处理 JobWorker 节点所接受的 Command 命令。

五、RPC 框架

Alluxio 是分布式存储缓存系统,服务之间的通信经过 RPC 调用,其内部采用了 grpc 框架实现,在子项目 alluxio-core-transport 中定义 RPC 的 proto 文件。以 AlluxioMaster 为例,详述 RPC 启动调用流程:AlluxioMaster 进程启动的时候,会启动 grpc server 对外提供接口服务,其中 Server(Master 服务)中定义各个 Server 待注册启动的 RPC 服务,所有 RPC 服务注册到 GrpcServerBuilder 后,基于 GrpcServerBuilder.build 生成 GrpcServer 并启动。

Master RPC 和 Worker RPC 注册服务,都是基于 Handler 实现 grpc 定义的方法,如下所示:

《Alluxio-源码简析》下篇更精彩哦,已同步更新。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Hystrix

发表于 2021-11-16

「这是我参与11月更文挑战的第16天,活动详情查看:2021最后一次更文挑战」

1、简介

在微服务中,服务与服务之间的调用经常出现两个不确定性因素:

  1. 网络延迟
  2. 服务异常

延迟在微服务中是一个非常重要的性能指标,随着服务的增加,调用链越来越复杂,此时低延迟往往是微服务系统架构中首要目标;高网络延迟可能会拖垮整个微服务,这是不允出现的。此外服务内部可能会发生未知异常,或者未捕获的异常,这时异常如果没有得到正确的处理,将会沿着调用链往上抛出,这对上传调用链来说也是致命的,因为往往这个时候上层调用方它不知道该如何处理未知异常。

对于服务异常,我们应该在系统架构时满足维加斯规则(Vegas Rule) :在微服务中发生的事情,就留在该微服务中。通俗点说,微服务中发生的异常要自己处理,不应该给其他微服务返回非约定交互报文之外的任何信息。

对于网络延迟,这是无法避免的,CAP理论中也谈到过分布式架构中网络分区无法避免,用于可能发生;因此我们只能在可能发生网络延迟的地方,做超时设置、超时后的副本处理等操作。

Hystrix用于解决上面两个问题。( 注意,它并不能让错误不发生或者让网络延迟不发生,它只是提供了后备行为和自校正功能,可以用于优雅的处理错误和网络延迟。 )Hystrix的工作原理很简单,被保护的方法可以设定失败阈值,在给定的失败阈值内方法发生失败(异常/延迟),通过调用一个预先准备的后备方法来返回预先准备的数据报文(本质上仍然是通过切面实现)。Hystrix有三种状态,分别是关闭状态、打开状态、半开状态。

  1. 关闭状态(closed),Hystrix默认为关闭状态
  2. 打开状态(open),超过设定的失败阈值后,熔断机制打开,Hystrix进入打开状态,此时所有请求直接请求提供熔断方法,不再请求正常服务
  1. 半开状态(half open),Hystrix进入打开状态之后,超过circuitBreaker.sleepWindowInMilliseconds时间周期,Hystrix进入半打开状态,此时尝试调用正常服务,如果服务调用失败会重置为失败状态

\

2、正文

2.1 Hystrix使用场景

Hystrix多用于有网络延时的场景,因此其使用场景也是那些容易出现网络延迟的方法,比如说:

  1. 远程服务调用,rest请求
  2. 数据库访问
  1. 复杂且耗时的计算场景

2.2 Hystrix处理异常

Hystrix用于微服中,因此使用Hystrix之前,需要准备一个简单的微服务环境,指定Spring Cloud版本和Spring Boot版本,此外引入web依赖用于模拟微服务间调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
xml复制代码<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.4.RELEASE</version>
</parent>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Hoxton.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

Hystrix依赖导入

1
2
3
4
5
6
xml复制代码<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
</dependency>
</dependencies>

配置服务启动端口

1
2
yaml复制代码server:
port: 18888

启动类增加 @EnableHystrix注解

1
2
3
4
5
6
7
8
9
less复制代码@SpringBootApplication
@EnableHystrix
public class ServiceApplication {

public static void main(String[] args) {
SpringApplication.run(ServiceApplication.class, args);
}

}

方法一:

编写使用Hystrix保护的方法,这里使用@HystrixCommand注解注释需要受Hystrix保护的方法,并且指定fallbackMethod属性的值为fallback,fallback是一个提前预置的方法,该方法与受保护的方法返回值一致,用于服务断路器打开时备用。我在demo方法中,直接抛出了一个RuntimeException,模拟服务调用失败。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
less复制代码@RestController
@RequestMapping("/fallback")
public class FallbackMethodController {

@GetMapping
@HystrixCommand(fallbackMethod = "fallback")
public ResponseEntity<String> demo() {
// 模拟服务异常
throw new RuntimeException("Error.");
}

private ResponseEntity<String> fallback() {
return new ResponseEntity<>("Hello World!", HttpStatus.OK);
}

}

对该rest接口发起请求,此时无论请求多少次都会得到Hello World!返回值。

方法二:

除了上面这种直接在方法指定后备方法之外,还可以采用另外一种方法,直接在Controller类上定义默认的后备方法,这样整个Controller需要受保护的方法,无需每个都明确指定后备方法了。(区别:@HystrixCommand无需再指定fallbackMethod)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
less复制代码@RestController
@RequestMapping("/defaultFallback")
// 整体定义后备方法
@DefaultProperties(defaultFallback = "defaultFallback")
public class DefaultFallbackMethodController {

@GetMapping
@HystrixCommand
public ResponseEntity<String> demo() {
throw new RuntimeException("Error.");
}

public ResponseEntity<String> defaultFallback() {
return new ResponseEntity<>("Hello World.", HttpStatus.OK);
}
}

2.3 Hystrix处理超时

Hystrix除了能优雅的处理未知异常之外,其另外一个能力就是方法执行延迟的处理, @HystrixCommand注解默认情况下设置了1秒的超时时间,如果1秒内方法未返回,将会执行预置的后备方法。1秒的超时时间不一定满足所有的业务场景,或者有些方法它就是硬不要设置超时时间,关于这些需求Hystrix都提供了相应的配置项。

@HystrixCommand注解中提供了commandProperties属性,它是一个HystrixProperty数组,因此@HystrixProperty可以定义多个;其中name指定要配置的项,value指定对应配置项的值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
less复制代码@RestController
@RequestMapping("/timeout")
public class TimeoutController {

@GetMapping
@HystrixCommand(
fallbackMethod = "fallback",
commandProperties = {
@HystrixProperty(
name = "execution.isolation.thread.timeoutInMilliseconds",
value = "2000"
)
})
public ResponseEntity<String> demo() {
try {
// 模拟接口请求,时间设置为3秒
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return new ResponseEntity<>("Hello!", HttpStatus.OK);
}


private ResponseEntity<String> fallback() {
return new ResponseEntity<>("Timeout!", HttpStatus.OK);
}

}

接口并未返回Hello!,而是返回了后备方法的返回值Timeout!,这是因为我们设值的超时时间是2秒,而 TimeUnit.SECONDS.sleep(3)睡眠了3秒,导致熔断器打开,返回了后备方法。

Hystrix的方法超时时间也可以关闭, @HystrixProperty提供了关闭的开关如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
less复制代码@RestController
@RequestMapping("/closeTimeout")
public class TimeoutDisableController {


@GetMapping
@HystrixCommand(
fallbackMethod = "fallback",
commandProperties = {
@HystrixProperty(
name = "execution.timeout.enabled",
value = "false"
)
})
public ResponseEntity<String> demo() {
try {
// 模拟接口请求
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return new ResponseEntity<>("Hello!", HttpStatus.OK);
}


private ResponseEntity<String> fallback() {
return new ResponseEntity<>("Timeout!", HttpStatus.OK);
}

}

此时/closeTimeout接口无论多久多不会触发超时保护(理论上不会这样玩儿!)

2.4 Hystrix断路器阈值设置

上面有说到Hystrix断路器的三个状态,在默认情况下,Hystrix保护的方法,在10秒内,请求次数超过了20次,50%以上的请求发生失败, 断路器将会进入打开状态,5秒后断路器进入半开状态,尝试重新调用原始的方法,如果调用失败,断路器直接变为打开状态。

Hystrix断路器阈值,默认配置:

在给定的时间范围内,方法应该被调用的次数

circuitBreaker.requestVolumeThreshold = 20


在给定时间范围内,方法调用产生失败的百分比

circuitBreaker.errorThresholdPercentage = 50%

请求量和错误百分比的滚动时间周期

metrics.rollingStats.timeInMilliseconds = 10000

处于打开状态的断路器,要经过多长时间才会进入半开状态,进入半开状态之后,将会再次尝试原始方法

circuitBreaker.sleepWindowInMilliseconds = 5000

如下将默认断路器阈值进行修改,修改后60秒内,请求次数超过4次,50%以上的请求失败,断路器就会进入打开状态,并且60秒后断路器才会进入半开状态,尝试调用原始方法。我这里设置成这样是为了方便测试。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
less复制代码@RestController
@RequestMapping("/circuitBreaker")
public class CircuitBreakConfigController {


@GetMapping
@HystrixCommand(
fallbackMethod = "fallback",
commandProperties = {
@HystrixProperty(
name = "execution.isolation.thread.timeoutInMilliseconds",
value = "1000"
),
@HystrixProperty(
name = "circuitBreaker.requestVolumeThreshold",
value = "4"
),
@HystrixProperty(
name = "circuitBreaker.errorThresholdPercentage",
value = "50"
),
@HystrixProperty(
name = "metrics.rollingStats.timeInMilliseconds",
value = "60000"
),
@HystrixProperty(
name = "circuitBreaker.sleepWindowInMilliseconds",
value = "60000"
)
})
public ResponseEntity<String> demo() {
try {
// 模拟接口请求
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return new ResponseEntity<>("Hello!", HttpStatus.OK);
}


private ResponseEntity<String> fallback() {
return new ResponseEntity<>("Timeout!", HttpStatus.OK);
}


}

将超时时间设置为1秒,方法中执行TimeUnit.SECONDS.sleep(2);使得线程阻塞2秒,显然每次调用都会失败,因此在第四之后(60s内)的请求,都会直接执行后备方法。

2.5 Hystrix与Fegin集成

很多时候我们会使用Open Fegin来向服务端请求数据,这个时候我们可以使用Hystrix来包含Fegin Client,集成方式也十分简单。

OpenFeign中已经集成了Hystrix,因此不需要再单独导入Hystrix的依赖

1
2
3
4
5
xml复制代码<!--openfeign 中已经依赖了Hystrix-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>

启动类中添加@EnableFeignClients注解,该注解默认支持Hystrix

1
2
3
4
5
6
7
8
9
less复制代码@SpringBootApplication
@EnableFeignClients // 支持Hystrix
public class ConsumerApplication {

public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}

}

开启Feign对Hystix的支持,此外由于Feign集成的Ribbon,Ribbon也有默认的请求超时时间,因此我们要想正确的使用Hystrix带来的熔断保护,就应该将Ribbon的超时时间设定的比Hystrix的超时时间大。(两者默认超时时间都是1秒)

1
2
3
4
5
6
7
8
9
10
11
12
yaml复制代码server:
port: 19999

# 开启feign对hystrix的支持
feign:
hystrix:
enabled: true
client:
config:
default:
readTimeout: 3000
connectTimeout: 3000

定义一个Feign Client,并指定fallback类,该类需要实现Feign Client才能提供熔断服务。注意,Feign Client的实现类需要添加到容器中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
less复制代码@FeignClient(value = "service", url = "http://localhost:18888/fallback", fallback = ServerClientFallback.class)
public interface ServerClient {

@GetMapping
ResponseEntity<String> demo();

}

@Component
class ServerClientFallback implements ServerClient {

@Override
public ResponseEntity<String> demo() {
return new ResponseEntity<>("Client FallBack!", HttpStatus.OK);
}
}

定义一个测试controller,我们不启动1888服务,模拟服务端不可用的情况!

1
2
3
4
5
6
7
8
9
10
11
12
less复制代码@RestController
@RequestMapping("/feign")
public class FeignController {

@Autowired
private ServerClient serverClient;

@GetMapping
public ResponseEntity<String> demo() {
return serverClient.demo();
}
}

此时触发服务降级,直接返回Client FallBack!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Linux 小知识 文件管理小知识 Linux 小知识

发表于 2021-11-16

这是我参与11月更文挑战的第16天,活动详情查看:2021最后一次更文挑战

Linux 小知识 | 文件管理小知识

Vim 定位行

快速定位到指定位置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
bash复制代码$ vim 文件名 行数 # 查看文件并定位到具体行数

[root@VM-8-10-centos app]# vim --help
VIM - Vi IMproved 8.0 (2016 Sep 12, compiled Jun 18 2020 15:49:08)

usage: vim [arguments] [file ..] edit specified file(s)
or: vim [arguments] - read text from stdin
or: vim [arguments] -t tag edit file where tag is defined
or: vim [arguments] -q [errorfile] edit file with first error

Arguments:
-- Only file names after this
-v Vi mode (like "vi")
-e Ex mode (like "ex")
-E Improved Ex mode
-s Silent (batch) mode (only for "ex")
-d Diff mode (like "vimdiff")
-y Easy mode (like "evim", modeless)
-R Readonly mode (like "view")
-Z Restricted mode (like "rvim")
-m Modifications (writing files) not allowed
-M Modifications in text not allowed
-b Binary mode
-l Lisp mode
-C Compatible with Vi: 'compatible'

异常处理

如何出现? 编辑文件时, 服务器连接异常关闭

如果 vim 异常退出, 在硬盘上可能会保存有交换文件

修改 a.txt 不会在原文件上修改 会创建 a.txt.swp(交换文件) 修改, 保存时在写回到a.txt中

处理方法

1
bash复制代码rm -rf a.txt.swp # 删除即可

echo 命令

1
2
3
4
5
6
7
bash复制代码echo 字符串 # 展示文本
[root@VM-8-10-centos touch-test]# echo "hello world"
hello world

echo 字符串 > 文件名 # 将字符串写到文件中(覆盖文件中的内容)
echo 字符串 >> 文件名 # 将字符串写到文件中(不覆盖文件中内容)
cat 不存在的目录 &>> error.log # 将命令的失败结果 追加到 error.log文件的后面

软连接

类似于 windows 中的快捷方式

为什么需要软链接?

因为某些文件和目录的路径很深,所以需要增加软链接(快捷方式)

1
2
3
4
5
6
7
8
9
10
bash复制代码ln -s 目标文件路径 快捷方式路径

[root@VM-8-10-centos app]# ln --help
Usage: ln [OPTION]... [-T] TARGET LINK_NAME
or: ln [OPTION]... TARGET
or: ln [OPTION]... TARGET... DIRECTORY
or: ln [OPTION]... -t DIRECTORY TARGET...
# 示例
cd app
ln -s /app/command/test test

find 命令

在指定目录下查找文件或文件夹

1
2
3
4
5
6
7
8
9
10
11
12
bash复制代码find --help
Usage: find [-H] [-L] [-P] [-Olevel] [-D help|tree|search|stat|rates|opt|exec] [path...] [expression]

语法: find [参数选项] <指定目录> <指定条件> <指定内容> # 在指定目录下查找文件
-name filename # 查找名为filename的文件 按文件名查找
-ctime -n或+n # 按时间来查找文件, -n指n天以内, +n指n天以前

[root@VM-8-10-centos app]# find . -name setup
./test/setup
./setup
# 全盘查找
find / -name "*.txt"

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

浅谈Rocketmq源码-消息存储(一)

发表于 2021-11-15

脉络

image.png

Rocketmq消息存储涉及几个比较重要的文件,我们先来看看这几个文件

CommitLog:存储消息的元数据,每个CommitLog文件的大小默认为1G。文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1073741824,当第一个文件写满了,会创建第二个文件,名为00000000001073741824,起始偏移量为1073741824,以此类推

ConsumerQueue:消费队列,主要用于消费拉取消息、更新消费位点等所用的索引

Index:索引文件,提供了一种可以通过key或时间区间来查询消息的方法

abort:这个设计本人觉得比较巧妙,如果这个文件存在,表示Rocketmq非正常关闭,如果这个文件不存在,表示Rocketmq正常关闭

在Rocketmq中,CommitLog、ConsumerQueue、Index这些文件都被映射成存储对象MappedFile,消息到来时会先存储在CommitLog,ConsumeQueue 和 Index文件是通过ReputMessageService的异步线程根据CommitLog的数据对其进行更新

文件创建

当有一条消息过来,Broker首先要思考的是:这条消息应该存在哪个文件中,上文说到,这些文件叫做CommitLog,默认大小为1G,那么这些文件是什么时候创建的?Rockermq有一个文件预分配机制,当上一个CommitLog写满时,自然要获取下一个CommitLog,文件预分配机制使得获取下一个文件时,不需要等待文件创建,下面我们来看看Rocketmq的文件预分配机制是怎么做的

我们先看一个比较关键的类:AllocateMappedFileService,Broker启动的时候会初始化DefaultMessageStore,DefaultMessageStore初始化的时候会初始化AllocateMappedFileService,根据下图,可以看到,AllocateMappedFileService继承了ServiceThread,ServiceThread实现了Runnable接口,我们直接看这个类的run方法

image.png

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
kotlin复制代码public void run() {
   log.info(this.getServiceName() + " service started");
// while循环,服务不停止就一直调用mmapOperation这个方法
   while (!this.isStopped() && this.mmapOperation()) {
​
  }
   log.info(this.getServiceName() + " service end");
}
​
// 此线程不断循环调用的mmapOperation究竟做了什么操作
private boolean mmapOperation() {
   boolean isSuccess = false;
   AllocateRequest req = null;
   try {
     // 从优先队列中获取AllocateRequest
     // 如果requestQueue为空,会阻塞等待唤醒
     req = this.requestQueue.take();
     // 从ConcurrentMap requestTable中获取AllocateRequest
     AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());
     // 校验
     // requestQueue和requestTable中的数据需一致
     if (null == expectedRequest) {
       log.warn("this mmap request expired, maybe cause timeout " + req.getFilePath() + " "
                + req.getFileSize());
       return true;
    }
     if (expectedRequest != req) {
       log.warn("never expected here, maybe cause timeout " + req.getFilePath() + " "
                + req.getFileSize() + ", req:" + req + ", expectedRequest:" + expectedRequest);
       return true;
    }
// 如果MappedFile为空,表示要创建MappedFile
     if (req.getMappedFile() == null) {
       // 记录开始创建MappedFile的时间
       long beginTime = System.currentTimeMillis();
​
       MappedFile mappedFile;
       // 判断是否开启isTransientStorePoolEnable,如果开启则使用直接内存写入数据,这个判断有三个条件
       // 1.开启transientStorePoolEnable配置
    // 2.异步输盘
    // 3.必须是Broker主节点
       // transientStorePoolEnable = true 时,mappedByteBuffer 只是用来读消息,堆外内存用来写消息,从而实现对于消息的读写分离
       if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
         try {
           mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
           mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
        } catch (RuntimeException e) {
           log.warn("Use default implementation.");
           mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
        }
      } else {
         // 使用 mmap 方式创建MappedFile
         mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
      }
// 计算创建MappedFile所用的时间
       long elapsedTime = UtilAll.computeElapsedTimeMilliseconds(beginTime);
       // 如果超过10ms则打印告警日志
       if (elapsedTime > 10) {
         int queueSize = this.requestQueue.size();
         log.warn("create mappedFile spent time(ms) " + elapsedTime + " queue size " + queueSize
                  + " " + req.getFilePath() + " " + req.getFileSize());
      }

       // 满足这两个条件会进行文件预热
       // 1.配置了进行文件预热
       // 2.只有 CommitLog 才进行文件预热,所以MappedFile的文件大小需大于CommitLog的文件大小,CommitLog 的大小默认为1G
       // 文件预热会进行数据预写入,根据系统的 pageSize 对每个 pageSize 写入一个字节数据。
       if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig()
          .getMappedFileSizeCommitLog()
           &&
           this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
         mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
                                   this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());
      }
​
       req.setMappedFile(mappedFile);
       this.hasException = false;
       isSuccess = true;
    }
  }
......
  } finally {
     if (req != null && isSuccess)
       req.getCountDownLatch().countDown();
  }
   return true;
}

可以看到,mmapOperation不断循环就做了两件事

  1. 初始化MappedFile
  2. 预热MappedFile

这里有一个问题,为什么要进行文件预热?

要知道这个问题的答案,需要先了解一下Page Cache
Page Cache 是 OS 对文件的缓存,用于加速对文件的读写。一般来说,程序对文件进行顺序读写的速度几乎接近于内存的读写速度,主要原因就是由于 OS 使用 Page Cache 机制对读写访问操作进行了性能优化,将一部分的内存用作 Page Cache。对于数据的写入,OS 会先写入至 Cache 内,随后通过异步的方式由 pdflush 内核线程将 Cache 内的数据刷盘至物理磁盘上。对于数据的读取,如果一次读取文件时出现未命中 Page Cache 的情况,OS 从物理磁盘上访问读取文件的同时,会顺序对其他相邻块的数据文件进行预读取。文件预热可以防止出现缺页中断,从磁盘重新加载数据到内存

以上这段解释来自 segmentfault.com/a/119000004…

创建的命令是来自优先队列,如果优先队列中没有AllocateRequest,会一直阻塞,那么AllocateRequest是从哪里放进去的

在将消息存储到CommitLog前,需要知道这些消息要存储到哪个CommitLog,所以会有一个获取写入文件的操作,如果此文件不存在,或者上一个文件已写满,便会创建MappedFile,AllocateRequest便是在此时放进优先队列中的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
kotlin复制代码public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
   long createOffset = -1;
// 获取将要写入的CommitLog对应的MappedFile
   MappedFile mappedFileLast = getLastMappedFile();
// 如果MappedFile为空,表示要进行创建
   if (mappedFileLast == null) {
     // 计算出新文件的起始偏移量(起始偏移量即文件名称)
     createOffset = startOffset - (startOffset % this.mappedFileSize);
  }
// 如果MappedFile写满了,同样要计算新文件的起始偏移量
   if (mappedFileLast != null && mappedFileLast.isFull()) {
     createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
  }
​
   if (createOffset != -1 && needCreate) {
     // 拼接文件名称
     String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
     String nextNextFilePath = this.storePath + File.separator
       + UtilAll.offset2FileName(createOffset + this.mappedFileSize);
     MappedFile mappedFile = null;
// allocateMappedFileService 已初始化,创建下一个文件和下下个文件
     if (this.allocateMappedFileService != null) {
       mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
                                                                                 nextNextFilePath, this.mappedFileSize);
    } else {
       // allocateMappedFileService 未初始化,直接创建文件
       try {
         mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
      } catch (IOException e) {
         log.error("create mappedFile exception", e);
      }
    }
// 将创建的 MappedFile 对象添加到 mappedFiles 列表中
     if (mappedFile != null) {
       if (this.mappedFiles.isEmpty()) {
         mappedFile.setFirstCreateInQueue(true);
      }
       this.mappedFiles.add(mappedFile);
    }
// 返回新创建的MappedFile
     return mappedFile;
  }
​
   return mappedFileLast;
}

提交请求的具体代码在AllocateMappedFileService类中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
kotlin复制代码public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
// 默认提交两个请求
   int canSubmitRequests = 2;
   if (this.messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
     if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()
         && BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) { //if broker is slave, don't fast fail even no buffer in pool
       canSubmitRequests = this.messageStore.getTransientStorePool().availableBufferNums() - this.requestQueue.size();
    }
  }
​
   AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
// 向ConcurrentHashMap中存放AllocateRequest对象,如果存放失败说明有别的线程已经创建相同的文件
   boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;
// 存放成功,向requestQueue存放AllocateRequest对象
   if (nextPutOK) {
     // 异常处理
     if (canSubmitRequests <= 0) {
       log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error, " +
                "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().availableBufferNums());
       this.requestTable.remove(nextFilePath);
       return null;
    }
     boolean offerOK = this.requestQueue.offer(nextReq);
     if (!offerOK) {
       log.warn("never expected here, add a request to preallocate queue failed");
    }
     canSubmitRequests--;
  }
// 相同的逻辑,上面是创建下一个文件,此处是创建下下个文件
   AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
   boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
   if (nextNextPutOK) {
     if (canSubmitRequests <= 0) {
       log.warn("[NOTIFYME]TransientStorePool is not enough, so skip preallocate mapped file, " +
                "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().availableBufferNums());
       this.requestTable.remove(nextNextFilePath);
    } else {
       boolean offerOK = this.requestQueue.offer(nextNextReq);
       if (!offerOK) {
         log.warn("never expected here, add a request to preallocate queue failed");
      }
    }
  }
​
  ......
​
   AllocateRequest result = this.requestTable.get(nextFilePath);
   try {
     if (result != null) {
       // 等待下一个MappedFile文件创建完成
       boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);
       if (!waitOK) {
         log.warn("create mmap timeout " + result.getFilePath() + " " + result.getFileSize());
         return null;
      } else {
         // 删除requestTable中对应的数据
         this.requestTable.remove(nextFilePath);
         return result.getMappedFile();
      }
    } else {
       log.error("find preallocate mmap failed, this never happen");
    }
  } catch (InterruptedException e) {
     log.warn(this.getServiceName() + " service has exception. ", e);
  }
​
   return null;
}

消息写入文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
kotlin复制代码public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,
           PutMessageContext putMessageContext) {
   assert messageExt != null;
   assert cb != null;
   // 获取消息的写指针
   int currentPos = this.wrotePosition.get();
   // 当前写指针小于文件的大小,那就文件还没写满
   if (currentPos < this.fileSize) {
     // writeBuffer不为空,则取writeBuffer
     // 什么时候writeBuffer不为空呢?
     // 判断的根据为transientStorePoolEnable && FlushDiskType.ASYNC_FLUSH == getFlushDiskType()
     //       && BrokerRole.SLAVE != getBrokerRole();
     // 1.开启transientStorePoolEnable配置
     // 2.异步输盘
     // 3.必须是Broker主节点
     ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
     byteBuffer.position(currentPos);
     AppendMessageResult result;
     // 处理单个消息
     if (messageExt instanceof MessageExtBrokerInner) {
       result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
                            (MessageExtBrokerInner) messageExt, putMessageContext);
       // 处理批量消息
    } else if (messageExt instanceof MessageExtBatch) {
       result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
                            (MessageExtBatch) messageExt, putMessageContext);
    } else {
       return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
    }
     // 更新wrotePosition的位置
     this.wrotePosition.addAndGet(result.getWroteBytes());
     this.storeTimestamp = result.getStoreTimestamp();
     return result;
  }
   log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
   return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}

doAppend是具体的写入文件逻辑,这个方法处理数据的方式比较复杂,本人水平有限,也没看懂,我们只看一个比较重要的逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
kotlin复制代码public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
           final MessageExtBrokerInner msgInner, PutMessageContext putMessageContext) {
           
​
  ......
​
   // 确认是否有足够的空闲空间
// 如果超过,返回END_OF_FILE,在此方法的外面会处理这种场景
// 处理方式是创建一个新的文件存储消息
   if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
     this.msgStoreItemMemory.clear();
     this.msgStoreItemMemory.putInt(maxBlank);
     // BLANK_MAGIC_CODE表示一个CommitLog文件结尾魔法值,当设置成这个魔法值表示文件已写完
     this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
     final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
     byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8);
     return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset,
                                    maxBlank,
                                    msgIdSupplier, msgInner.getStoreTimestamp(),
                                    queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
  }
​
  ......
   return result;
}

未完待续

参考资料

segmentfault.com/a/119000004…

cloud.tencent.com/developer/a…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Python面向对象编程01:入门类和对象 什么是面向对象编

发表于 2021-11-15

「这是我参与11月更文挑战的第13天,活动详情查看:2021最后一次更文挑战」

正式的Python专栏第36篇,同学站住,别错过这个从0开始的文章!

前面写了文件的读取和文件处理等其他函数,里面用到了os库。

本来想分享os库,发现这个库可能对于初学者来说比较难,所以后面再等合适的时间谈谈,因为设计了很多文件系统的操作,比较偏Linux运维。

我们先看看面向对象编程吧,就是很多人口中谈到的OOP(Object-oriented Programming)

什么是面向对象编程?

我们是不是听过面向过程,拿来放在一起对比就比较好理解了。

简单来理解,面向过程就是一切用函数解决一切文件,简单粗暴!

面向对象是面向过程编程之后才出现了,没有面向对象编程很多程序还不是照样开发。

面向对象,也使用函数,但是多了一个网,这个网把一个或者多个函数,和数据关联在一起,然后称为一类事物,也就是程序中的‘类’(class)

定义类,从具体代码来感受吧!

面向对象编程,首先提出的第一个概念就是‘class’,类:

1
2
3
python复制代码#这就是一个class的定义代码:
class hello_class():
pass

然后通过class_name() 这样调用来生产对象。

代码稍微升级一下,我们看看:

1
2
3
4
5
6
7
8
9
python复制代码class hello_class():
pass

#输出类信息
print(hello_class)
print(type(hello_class))
#创建类的实例对象
print(hello_class())
print(type(hello_class()))

稍微补充一下:

print函数输出类对象的结果:通常是<’class全名‘ object at id序列号>

下面是运行结果:

屏幕快照 2021-11-16 上午8.52.25.png

这里我们加入新知识点:类实例对象 , 通常直接说,实例。

实例是class产生的对象,所有某个hello_class对象的类型(通过type函数获取)都必定是hello_class。

多个类和对象的观察

看完一个类,我们再看看两个类的对比,结果也是一致的。

下面是两个类的定义和生成对象的代码展示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
python复制代码#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2021/11/15 11:58 下午
# @Author : LeiXueWei
# @CSDN/Juejin/Wechat: 雷学委
# @XueWeiTag: CodingDemo
# @File : __init__.py.py
# @Project : hello


class student(object):
“”“学委补充一下:__init___ 函数为类的初始化函数,在创建类对象实例的时候这个函数会被调用。”“”
def __init__(self):
print("hello, I am student")


class programmer(object):
def __init__(self):
print("hello, I am programmer")


class student(object):
def __init__(self):
print("hello, I am student")


class programmer(object):
def __init__(self):
print("hello, I am programmer")


s1 = student
print(s1)
p1 = programmer
print(p1)
s11 = student
print(s11)
p11 = programmer
print(p11)

print("*" * 16)
# 创建对象
s2 = student()
print(s2)
p2 = programmer()
print(p2)

# 创建对象
s3 = student()
print(s3)
p3 = programmer()
print(p3)

稍微解释一下:

s1 和 p1 这两个变量打印输出结果是‘class’类型的。

s11 和 p11 这两个变量打印输出结果是‘class’类型的,但是s1跟s11,p1跟p11 是不变的。

s2 和 p2 这两个变量打印输出结果是’object’类型的。

s3 和 p3 这两个变量打印输出结果是’object’类型的。\

下面是运行结果:

屏幕快照 2021-11-16 上午9.01.31.png

初始化函数被调用了打印了对象信息。

到这里,大家应该都能知道class和object区别了吧

类: 描述了函数和属性的固定关系
(类实例)对象: 基于这种固定关系的一个活生生的个体,它的id是变化的。

补充一下类的属性(数据部分)

学委定义了一个student类,并创建了两个学生对象。

直接复制运行下面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
ruby复制代码#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2021/11/15 11:58 下午
# @Author : LeiXueWei
# @CSDN/Juejin/Wechat: 雷学委
# @XueWeiTag: CodingDemo
# @File : __init__.py.py
# @Project : hello


class student(object):
def __init__(self, name):
self.name = name

def get_name(self):
return self.name

def set_name(self, name):
self.name = name

def study(self):
print(f"{self.name} : 好好学习,天天向上!")



s1= student("小白")
print(s1)
print(s1.get_name())
print(s1.study())

s2 = student("学委的学生粉丝:哈哈哈")
print(s2)
print(s2.get_name())
print(s2.study())

我们看到他们的id总是不同的(运行几次看看)。

然后每个学生都有一个name属性(携带姓名数据),和三个函数属性(分别用来获取姓名,改名,学习)。

然后我们调用了每个学生的study函数,输出了各自的学习状态。

屏幕快照 2021-11-16 上午9.15.25.png

总结

Python语言的简约设计,使得面向对象编程非常简单,轻松就定义和和获取对象了。

上面的代码非常简单,但是可以很好的感受到类和对象的呈现,多练习。

对了,喜欢Python的朋友,请关注学委的 Python基础专栏 or Python入门到精通大专栏

持续学习持续开发,我是雷学委!

编程很有趣,关键是把技术搞透彻讲明白。

欢迎关注微信,点赞支持收藏!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…331332333…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%