开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

SpringCloud升级之路20200x版-38 实

发表于 2021-11-20

这是我参与11月更文挑战的第20天,活动详情查看:2021最后一次更文挑战

本系列代码地址:github.com/JoJoTec/spr…

实现 WeClient 的 NamedContextFactory

我们要实现的是不同微服务自动配置装载不同的 WebClient Bean,这样就可以通过 NamedContextFactory 实现。我们先来编写下实现这个 NamedContextFactory 整个的加载流程的代码,其结构图如下所示:

image

spring.factories

1
2
3
ini复制代码# AutoConfiguration
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.github.jojotech.spring.cloud.webflux.auto.WebClientAutoConfiguration

在 spring.factories 定义了自动装载的自动配置类 WebClientAutoConfiguration

WebClientAutoConfiguration

1
2
3
4
less复制代码@Import(WebClientConfiguration.class)
@Configuration(proxyBeanMethods = false)
public class WebClientAutoConfiguration {
}

WebClientAutoConfiguration 这个自动配置类 Import 了 WebClientConfiguration

WebClientConfiguration

1
2
3
4
5
6
7
8
less复制代码@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(WebClientConfigurationProperties.class)
public class WebClientConfiguration {
@Bean
public WebClientNamedContextFactory getWebClientNamedContextFactory() {
return new WebClientNamedContextFactory();
}
}

WebClientConfiguration 中创建了 WebClientNamedContextFactory 这个 NamedContextFactory 的 Bean。在这个 NamedContextFactory 中,定义了默认配置 WebClientDefaultConfiguration。在这个默认配置中,主要是给每个微服务都定义了一个 WebClient

定义 WebClient 的配置类

我们编写下上一节定义的配置,包括:

  • 微服务名称
  • 微服务地址,服务地址,不填写则为 http://微服务名称
  • 连接超时,使用 Duration,这样我们可以用更直观的配置了,例如 5ms,6s,7m 等等
  • 响应超时,使用 Duration,这样我们可以用更直观的配置了,例如 5ms,6s,7m 等等
  • 可以重试的路径,默认只对 GET 方法重试,通过这个配置增加针对某些非 GET 方法的路径的重试;同时,这些路径可以使用 * 等路径匹配符,即 Spring 中的 AntPathMatcher 进行路径匹配多个路径。例如 /query/order/**

WebClientConfigurationProperties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
typescript复制代码@Data
@NoArgsConstructor
@ConfigurationProperties(prefix = "webclient")
public class WebClientConfigurationProperties {
private Map<String, WebClientProperties> configs;
@Data
@NoArgsConstructor
public static class WebClientProperties {
private static AntPathMatcher antPathMatcher = new AntPathMatcher();
private Cache<String, Boolean> retryablePathsMatchResult = Caffeine.newBuilder().build();
/**
* 服务地址,不填写则为 http://serviceName
*/
private String baseUrl;
/**
* 微服务名称,不填写就是 configs 这个 map 的 key
*/
private String serviceName;
/**
* 可以重试的路径,默认只对 GET 方法重试,通过这个配置增加针对某些非 GET 方法的路径的重试
*/
private List<String> retryablePaths;
/**
* 连接超时
*/
private Duration connectTimeout = Duration.ofMillis(500);
/**
* 响应超时
*/
private Duration responseTimeout = Duration.ofSeconds(8);

/**
* 是否匹配
* @param path
* @return
*/
public boolean retryablePathsMatch(String path) {
if (CollectionUtils.isEmpty(retryablePaths)) {
return false;
}
return retryablePathsMatchResult.get(path, k -> {
return retryablePaths.stream().anyMatch(pattern -> antPathMatcher.match(pattern, path));
});
}
}
}

粘合 WebClient 与 resilience4j

接下来粘合 WebClient 与 resilience4j 实现断路器以及重试逻辑,WebClient 基于 project-reactor 实现,resilience4j 官方提供了与 project-reactor 的粘合库:

1
2
3
4
5
xml复制代码<!--粘合 project-reactor 与 resilience4j,这个在异步场景经常会用到-->
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-reactor</artifactId>
</dependency>

参考官方文档,我们可以像下面这样给普通的 WebClient 增加相关组件:

增加重试器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
ini复制代码//由于还是在前面弄好的 spring-cloud 环境下,所以还是可以这样获取配置对应的 retry
Retry retry;
try {
retry = retryRegistry.retry(name, name);
} catch (ConfigurationNotFoundException e) {
retry = retryRegistry.retry(name);
}

Retry finalRetry = retry;
WebClient.builder().filter((clientRequest, exchangeFunction) -> {
return exchangeFunction.exchange(clientRequest)
//核心就是加入 RetryOperator
.transform(RetryOperator.of(finalRetry));
})

这个 RetryOperator 其实就是使用了 project-reactor 中的 retryWhen 方法实现了 resilience4j 的 retry 机制:

RetryOperator

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
swift复制代码@Override
public Publisher<T> apply(Publisher<T> publisher) {
//对于 mono 的处理
if (publisher instanceof Mono) {
Context<T> context = new Context<>(retry.asyncContext());
Mono<T> upstream = (Mono<T>) publisher;
return upstream.doOnNext(context::handleResult)
.retryWhen(reactor.util.retry.Retry.withThrowable(errors -> errors.flatMap(context::handleErrors)))
.doOnSuccess(t -> context.onComplete());
} else if (publisher instanceof Flux) {
//对于 flux 的处理
Context<T> context = new Context<>(retry.asyncContext());
Flux<T> upstream = (Flux<T>) publisher;
return upstream.doOnNext(context::handleResult)
.retryWhen(reactor.util.retry.Retry.withThrowable(errors -> errors.flatMap(context::handleErrors)))
.doOnComplete(context::onComplete);
} else {
//不可能是 mono 或者 flux 以外的其他的
throw new IllegalPublisherException(publisher);
}
}

可以看出,其实主要填充了:

  • doOnNext(context::handleResult): 在有响应之后调用,将响应结果传入 retry 的 Context,判断是否需要重试以及重试间隔是多久,并且抛出异常 RetryDueToResultException
  • retryWhen(reactor.util.retry.Retry.withThrowable(errors -> errors.flatMap(context::handleErrors))):捕捉异常 RetryDueToResultException,根据其中的间隔时间,返回 reactor 的重试间隔: Mono.delay(Duration.ofMillis(waitDurationMillis))
  • doOnComplete(context::onComplete):请求完成,没有异常之后,调用 retry 的 complete 进行清理

增加断路器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
ini复制代码//由于还是在前面弄好的 spring-cloud 环境下,所以还是可以这样获取配置对应的 circuitBreaker
CircuitBreaker circuitBreaker;
try {
circuitBreaker = circuitBreakerRegistry.circuitBreaker(instancId, finalServiceName);
} catch (ConfigurationNotFoundException e) {
circuitBreaker = circuitBreakerRegistry.circuitBreaker(instancId);
}

CircuitBreaker finalCircuitBreaker = circuitBreaker;
WebClient.builder().filter((clientRequest, exchangeFunction) -> {
return exchangeFunction.exchange(clientRequest)
//核心就是加入 CircuitBreakerOperator
.transform(CircuitBreakerOperator.of(finalCircuitBreaker));
})

类似的,CircuitBreakerOperator 其实也是粘合断路器与 reactor 的 publisher 中的一些 stage 方法,将结果的成功或者失败记录入断路器,这里需要注意,可能有的链路能走到 onNext,可能有的链路能走到 onComplete,也有可能都走到,所以这两个方法都要记录成功,并且保证只记录一次:

CircuitBreakerSubscriber

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
java复制代码class CircuitBreakerSubscriber<T> extends AbstractSubscriber<T> {

private final CircuitBreaker circuitBreaker;

private final long start;
private final boolean singleProducer;

private final AtomicBoolean successSignaled = new AtomicBoolean(false);
private final AtomicBoolean eventWasEmitted = new AtomicBoolean(false);

protected CircuitBreakerSubscriber(CircuitBreaker circuitBreaker,
CoreSubscriber<? super T> downstreamSubscriber,
boolean singleProducer) {
super(downstreamSubscriber);
this.circuitBreaker = requireNonNull(circuitBreaker);
this.singleProducer = singleProducer;
this.start = circuitBreaker.getCurrentTimestamp();
}

@Override
protected void hookOnNext(T value) {
if (!isDisposed()) {
//正常完成时,断路器也标记成功,因为可能会触发多次(因为 onComplete 也会记录),所以需要 successSignaled 标记只记录一次
if (singleProducer && successSignaled.compareAndSet(false, true)) {
circuitBreaker.onResult(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit(), value);
}
//标记事件已经发出,就是已经执行完 WebClient 的请求,后面判断取消的时候会用到
eventWasEmitted.set(true);

downstreamSubscriber.onNext(value);
}
}

@Override
protected void hookOnComplete() {
//正常完成时,断路器也标记成功,因为可能会触发多次(因为 onNext 也会记录),所以需要 successSignaled 标记只记录一次
if (successSignaled.compareAndSet(false, true)) {
circuitBreaker.onSuccess(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit());
}

downstreamSubscriber.onComplete();
}

@Override
public void hookOnCancel() {
if (!successSignaled.get()) {
//如果事件已经发出,那么也记录成功
if (eventWasEmitted.get()) {
circuitBreaker.onSuccess(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit());
} else {
//否则取消
circuitBreaker.releasePermission();
}
}
}

@Override
protected void hookOnError(Throwable e) {
//记录失败
circuitBreaker.onError(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit(), e);
downstreamSubscriber.onError(e);
}
}

我们会使用这个库进行粘合,但是不会直接使用上面的代码,因为考虑到:

  • 需要在重试以及断路中加一些日志,便于日后的优化
  • 需要定义重试的 Exception,并且与断路器相结合,将非 2xx 的响应码也封装成特定的异常
  • 需要在断路器相关的 Operator 中增加类似于 FeignClient 中的负载均衡的数据更新,使得负载均衡更加智能

在下面一节我们会详细说明我们是如何实现的有断路器以及重试逻辑和负载均衡数据更新的 WebClient。

微信搜索“我的编程喵”关注公众号,每日一刷,轻松提升技术,斩获各种offer:

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

String 与不可变对象

发表于 2021-11-20

这是我参与11月更文挑战的第20天,活动详情查看:2021最后一次更文挑战

什么是不可变对象 ?不可变对象指的是在创建一个对象之后 ,不能再改变它的状态 ,那么这个对象就是不可变的 。不能改变状态的意思是 ,不能改变对象内的成员变量 ,包括基本数据类型的值不能改变 ,引用类型的变量不能指向其它的对象 ,引用类型指向的对象状态也不能改变 。

这里插播一下对象和对象的引用之间的区别 ,对象的引用是放在栈中的 ,而对象是放在堆中的 ,看这个例子 String s = "123" ; s = "456" ; 表面上 s 看是变了 ,但是要搞清楚 ,变的只是 String 对象的引用 s ,而 “123” 这个对象是没有变化的 。看一个图 :

我们都说 String 是不可变对象 ,那我们就分析一下 String 的源码来看看它是怎么保证 String 对象不可变的 。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
arduino复制代码 public final class String
implements java.io.Serializable, Comparable<String>, CharSequence {
/** The value is used for character storage. */
private final char value[];

/** Cache the hash code for the string */
private int hash; // Default to 0

public String(String original) {
this.value = original.value;
this.hash = original.hash;
}

public String concat(String str) {
int otherLen = str.length();
if (otherLen == 0) {
return this;
}
int len = value.length;
char buf[] = Arrays.copyOf(value, len + otherLen);
str.getChars(buf, len);
return new String(buf, true);
}

public boolean equals(Object anObject) {
if (this == anObject) {
return true;
}
if (anObject instanceof String) {
String anotherString = (String) anObject;
int n = value.length;
if (n == anotherString.value.length) {
char v1[] = value;
char v2[] = anotherString.value;
int i = 0;
while (n-- != 0) {
if (v1[i] != v2[i])
return false;
i++;
}
return true;
}
}
return false;
}
...

我们通过分析源码可知 ,首先 String 类和存放字符串的 char 型数组都是 final 类型的 ,保证了 String 类不能被继承 ,value 这个引用本身不会该变 ,但是这样还不能说明属性是不可变的 ,因为我们可以通过改变 value 数组中具体的值来达到改变 value 的目的 。

1
2
3
4
5
6
7
8
9
10
ini复制代码public static void main(String[] args) {
char[] value = {'a','b','c','d'};
System.out.println(value);
for (int i = 0; i < value.length; i++) {
if(i == 1){
value[i] = 'B';
}
}
System.out.println(value); //aBcd
}

但是进一步研究会发现 ,对于 value 这个属性 ,被定义为 private ,而且 String 并没有提供相对应的 get set 方法 ,所以我们也不能操作它 。而那些我们常认为改变了 String 对象的方法 ,比方说 subString concat toUpperCase tirm 看了源码之后发现 ,这些方法都是重新创建了一个 char 数组 ,并没有改动之前的对象 。这也就是我们常说 String 是不可变对象的原因 。但是我们还是可以通过反射的方式来改变 value 的值 ,看个例子就好 ,但是我们一般不这么做 。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
arduino复制代码public static void main(String[] args) throws Exception {
//创建字符串"Hello World", 并赋给引用s
String s = "Hello World";
System.out.println("s = " + s); //Hello World

//获取String类中的value字段
Field valueFieldOfString = String.class.getDeclaredField("value");
//改变value属性的访问权限
valueFieldOfString.setAccessible(true);

//获取s对象上的value属性的值
char[] value = (char[]) valueFieldOfString.get(s);
//改变value所引用的数组中的第5个字符
value[5] = '_';

System.out.println("s = " + s); //Hello_World
}

为什么要将 String 设计为不可变对象呢 ?当然是为了提高效率和安全 ,也是由于 String 的超高使用频率 。效率主要体现在当我们复制 String 对象的时候我只需要复制引用即可 ,不需要复制具体的对象 ,而在多线程环境中 ,若是不同的线程同时修改 String 对象 ,相互之间也不会有影响 。因为一旦改变都会创建一个新的字符串 ,保证了线程的安全 。

另外在堆中有个字符串常量池 ,我们创建的字符串都会存在这里 ,当创建相同的字符串的时候 ,其实指向的是同一个地方 。这就节省了大量的空间 ,当然 ,能出现字符串常量池也是因为 String 是不可变对象 。

因为 String 不可变 ,所以在创建对象的时候就已经将 hashCode 的值计算出来缓存在字段 hash 中 。这样在 Map 中 String 就很适合作为主键 ,速度快 。(因为在散列表中我们定位时会计算主键的 hash 值 。)

1
2
arduino复制代码/** Cache the hash code for the string */
private int hash; // Default to 0

还有在数据库连接的时候我们通过字符串来传递用户名 ,密码 ,连接的库等信息 ,若字符串可变 ,则很可能被黑客篡改 。

PS. 基本类型对应的包装类都是不可变对象 ,这样设计的原因还是因为使用太频繁 ,好处在上面已经说过了 。突然感觉学到了好多之前没注意到的细节 。

源码版本为 JDK 1.7

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

力扣刷题笔记【双指针】 → 594 最长和谐子序列

发表于 2021-11-20

这是我参与11月更文挑战的第20天,活动详情查看:2021最后一次更文挑战

题目

和谐数组是指一个数组里元素的最大值和最小值之间的差别 正好是 1 。

现在,给你一个整数数组 nums ,请你在所有可能的子序列中找到最长的和谐子序列的长度。

数组的子序列是一个由数组派生出来的序列,它可以通过删除一些元素或不删除元素、且不改变其余元素的顺序而得到。

示例

1
2
3
ini复制代码输入: nums = [1,3,2,2,5,2,3,7]
输出: 5
解释: 最长的和谐子序列是 [3,2,2,2,3]
1
2
ini复制代码输入: nums = [1,2,3,4]
输出: 2
1
2
ini复制代码输入: nums = [1,1,1,1]
输出: 0

提示

  • 1 <= nums.length <= 2 * 10^4
  • -10^9 <= nums[i] <= 10^9

解题思路

双指针

题目要求是找到数组的子序列(通过删除一些元素或不删除元素,且不改变其余元素的顺序而得),很多人可能会想着那就没办法先排序了,然后就在想还有什么解题但方法。

但其实它这里面有个坑在这,和谐数组的定义是数组中的最大值和最小值直接的距离为1,那么也就是说,数组中只有相邻的两个数才能构成一个和谐数组,其余元素没得参合进来,那么我们就可以直接进行排序操作了,不是相邻的两个元素都进行删除操作,仅保留所需元素,这样得出的长度与排序前的长度也是一致的,并不会导致结果发生变化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码class Solution {
public int findLHS(int[] nums) {
// 排序
Arrays.sort(nums);
int max = 0;
for(int left = 0, right = 1; right < nums.length; ++right){
// 判断如果左边元素与右边元素相差超过1,则指针右移
while(left < right && nums[right] - nums[left] > 1){
++left;
}
// 判断是否为和谐数组
if(nums[right] - nums[left] == 1){
// 更新结果
max = Math.max(max, right - left + 1);
}
}

return max;
}
}

复杂度分析

  • 时间复杂度:O(NlogN)O(NlogN)O(NlogN)
  • 空间复杂度:O(1)O(1)O(1)

最后

文章有写的不好的地方,请大佬们不吝赐教,错误是最能让人成长的,愿我与大佬间的距离逐渐缩短!

如果觉得文章对你有帮助,请 点赞、收藏、关注、评论 一键四连支持,你的支持就是我创作最大的动力!!!

题目出处: leetcode-cn.com/problems/lo…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

电商系统设计之用户系统 前言 数据表 数据记录 致谢 交流

发表于 2021-11-20

电商大伙每天都在用,类似某猫,某狗等。
电商系统设计看似复杂又很简单,看似简单又很复杂
本章适合初级工程师及中级工程师细看,大佬请随意

前言

设计以以下为工具讲起

  • PHP为开发语言
  • 基于Laravel框架
  • MySQL为数据存储

电商的可变性与孩子的心情一样,变化极快,所以在设计之处就要想好大部分的功能接入及开发,尽量减少重构次数。对老板来说节约成本,对程序员来说“珍惜生命”

数据表

前期业务简单时,我们可以将数据表设计为下列的样子

TableName Comments
member 用户表
member_address 收货地址表
member_card 银行卡表
member_cart 购物车表
member_cart_item 购物车商品表
member_collect_product 商品收藏表
member_collect_supplier 店铺收藏表
member_data 用户信息表
member_query_history 用户搜索历史表
member_wallet 用户账户表
member_withdrawal 用户提现表

用户表

考虑到多种登录方式,应在数据表中涉及到微信的openid,unionid,支付宝、QQ的用户token等,这些要在前期就涉及进去,因后期用户量大了之后加一个字段简直是噩梦,用户状态status也必不可少,比较人也是分好坏,其次就是创建时间,登录时间等,用户表与用户信息表绝逼是绑定关系,这就不多言了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
sql复制代码CREATE TABLE `member` (
`id` int(10) unsigned NOT NULL AUTO_INCREMENT,
`tel` bigint(20) DEFAULT NULL COMMENT '手机号码',
`password` varchar(555) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '登录密码',
`wx_token` varchar(125) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '微信TOKEN',
`im_token` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '用户融云token',
`open_id` varchar(125) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
`status` enum('1','-1') COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '1' COMMENT '账号状态',
`created_at` timestamp NULL DEFAULT NULL,
`updated_at` timestamp NULL DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `member_tel_unique` (`tel`),
UNIQUE KEY `member_wx_token_unique` (`wx_token`)
) ENGINE=InnoDB AUTO_INCREMENT=95 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;

image.png

收货地址表

收货地址与用户是一一相对的,在设计上增加需要的字段即可,例如 收货人、收货人手机号、城市、详细地址等

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
sql复制代码CREATE TABLE `member_address` (
`id` int(10) unsigned NOT NULL AUTO_INCREMENT,
`member_id` int(11) NOT NULL COMMENT '用户编号',
`nick_name` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '收货人姓名',
`tel` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '手机号码',
`prov` int(11) DEFAULT NULL COMMENT '省',
`city` int(11) NOT NULL COMMENT '市',
`area` int(11) DEFAULT NULL COMMENT '区',
`address` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '街道地址',
`number` int(11) NOT NULL COMMENT '邮政编码',
`default` enum('0','1') COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '0' COMMENT '默认收货地址 1=>默认',
`deleted_at` timestamp NULL DEFAULT NULL,
`created_at` timestamp NULL DEFAULT NULL,
`updated_at` timestamp NULL DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=55 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;

image.png

银行卡表

用于用户提现的业务等,大致将银行卡所需的信息记录即可,例如持卡人、卡号、归属银行等

1
2
3
4
5
6
7
8
9
10
sql复制代码CREATE TABLE `member_card` (
`id` int(10) unsigned NOT NULL AUTO_INCREMENT,
`member_id` int(11) NOT NULL COMMENT '用户编码',
`card_name` varchar(25) COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '持卡人姓名',
`card_number` varchar(25) COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '银行卡号',
`created_at` timestamp NULL DEFAULT NULL,
`updated_at` timestamp NULL DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `member_card_card_number_unique` (`card_number`)
) ENGINE=InnoDB AUTO_INCREMENT=11 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;

image.png

购物车表

为何单独建这个表,也是又一定原因的,正常只需要member_cart_item表即可,根据实际下线的业务场景,正常购物到超市需要拿一个购物车,但这个购物车并非属于你,你使用之后,需要归还,他人可继续使用,将购物车公开化,并不是将购物车商品公开化。业务场景比较窄,例如京东到家和京东商城一样(我只是举例,并不清楚他们怎么做的),购物车不通用,那如何区分呢,是应该在购物车上区分还是在购物车商品上区分?我想你已经清楚了。

1
2
3
4
5
6
7
8
9
sql复制代码CREATE TABLE `member_cart` (
`id` int(10) unsigned NOT NULL AUTO_INCREMENT,
`member_id` int(11) NOT NULL COMMENT '用户编码',
`created_at` timestamp NULL DEFAULT NULL,
`updated_at` timestamp NULL DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `member_cart_member_id_unique` (`member_id`),
KEY `member_cart_member_id_index` (`member_id`)
) ENGINE=InnoDB AUTO_INCREMENT=28 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;

购物车商品表

这块需要提的一点是 [并不是所有表的设计都是互相绑定,互相依赖的],就例如购物车商品表,不仅仅将商品编码存储在内,还要将商品价格,商品的简介以及商品的规格(既SKU)存储,不能因卖家下架商品,而查询不到商品的存在,比较一切以用户为主,用户是上帝的原则,不能让商品悄悄的就消失了吧。所以在做购物车商品表查询时,切记不要使用join或者表关联查询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
sql复制代码CREATE TABLE `member_cart_item` (
`id` int(10) unsigned NOT NULL AUTO_INCREMENT,
`cart_id` int(11) NOT NULL COMMENT '购物车编码',
`product_desc` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '商品sku信息',
`product_img` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '商品快照',
`product_name` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '商品名称',
`price` decimal(8,2) NOT NULL DEFAULT '0.00' COMMENT '价格',
`product_id` int(11) NOT NULL COMMENT '商品编码',
`supplier_id` int(11) NOT NULL COMMENT '店铺编码',
`sku_id` int(11) NOT NULL COMMENT '商品sku编码',
`number` int(11) NOT NULL DEFAULT '1' COMMENT '商品数量',
`created_at` timestamp NULL DEFAULT NULL,
`updated_at` timestamp NULL DEFAULT NULL,
PRIMARY KEY (`id`),
KEY `member_cart_item_cart_id_product_id_supplier_id_index` (`cart_id`,`product_id`,`supplier_id`)
) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;

用户搜索历史表

用户搜索的记录是一定要有的,为了未来的数据分析,智能推荐做准备,毕竟现在是信息共享的时代嘛~

1
2
3
4
5
6
7
8
sql复制代码CREATE TABLE `member_query_history` (
`id` int(10) unsigned NOT NULL AUTO_INCREMENT,
`member_id` int(11) NOT NULL COMMENT '用户编码',
`keyword` varchar(125) COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '关键字',
`created_at` timestamp NULL DEFAULT NULL,
`updated_at` timestamp NULL DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=11 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;

数据记录

有很多场景,都要将标题呀,内容呀直接存储,类似与收藏的店铺和商品,无论卖家怎么做,用户购物车,订单不能动,这是基准。

致谢

感谢你们看到这里,下一篇我会讲一下关于电商系统的商品设计的部分。有什么问题可以评论区提问。谢谢

交流

生命不息,编码不止。

微信搜索 【一文秒懂】 传播技术正能量,持续学习新知识。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

菜鱼的开发日常-记一次缓存问题在实际开发中的解决方案 业务介

发表于 2021-11-20

作为一个摸鱼大户,每天上班后看看热搜、听听音乐、刷刷知乎和同事扯扯闲淡,多么惬意而又美好的生活。但是生活啊,总是那么的不尽如人意。俗话说,摸鱼摸多了,人就废了。这不,菜鱼我顶着个位数QPS的压力去实现我们的首页,话不多说,先看业务需求。

业务介绍

业务是为一个社区系统做一个CMS。对于前端的展示功能区,分为搜索、话题、banner轮播图和标签,标签的下面又划分为标签1、标签2和标签n。简单两句话也描述不清具体要干啥,还是直接上图吧。

C端展示效果图.png
下面介绍C端这几个功能的作用。

  • 搜索。根据用户输入的关键字或者已经配置的默认/热门搜索词来搜索社区内具体业务的数据,然后形成列表展示给用户。
  • banner。根据用户的点击来跳转到具体的页面(比如:广告或者活动页面)。
  • 话题。功能和搜索一致,只是搜索的关键词是给定的。
  • 标签。每一个标签对应着一个或多个业务混合的数据列表。

介绍完毕了C端的这几个功能,现在来分析一下CMS系统的用例。C端需要搜索词,banner,话题和配置的标签,以及每个标签内的数据配置。其实仔细分析一下,就能看出来,C端首页的这一堆标签和配置,后端提供一个接口就能解决。

而至于实际去搜索,简单点说,社区系统的业务数据会发消息到一个搜索系统,C端这个界面的所有列表和搜索,都是直接去搜索系统中查询,只有拿到业务的唯一标识去查看详情的时候,采取直接调用社区系统,还可以进行点赞,收藏、评论等操作,这里不过多的去赘述。

描述到这里了,下面是流程的全貌图。

C端用例.png
整个业务的流程就是这样了,介绍了业务流程,一眼就能看出,CMS系统的瓶颈就是查询的效率。对于CMS系统里面的配置,业务人员更改的频率比较低,按照菜鱼我所在的摸鱼无限无责任公司的尿性,一周更改一次都算是高频率了。

虽然更改的频率低,但是不排除万一哪天我们的市场人员突然人品大爆发,或者某个爬虫小老弟来搞我们,如果不对数据做特殊处理,我们那可怜的数据库容器可就遭殃了。既然这样,那就上Redis。

针对CMS这部分业务来说,我们并没有使用那些复杂的数据结构,而是直接使用的key-value。为什么呢,其中一个原因是CMS里面的数据量少,更重要的原因是没必要。适合自己的才是王道。既然使用缓存了,那么击穿、 穿透和雪崩这些是不是都需要考虑到呢?

缓存击穿

所谓缓存击穿,是指某数据不存在于缓存中,但是数据库中却存在。常见的场景是在缓存到期或因为更新数据库从而删缓存以后。单线程的理想情况下,是不存在缓存击穿这个说法的,因为缓存中不存在,就直接查一次库,然后把缓存设置上就可以了,这是程序运行最合理的方式。当在多线程的情况下,缓存中不存在数据,在一定时间内会造成数据库的拥堵,当第一批线程查库并且设置缓存结束以后,后面的线程再过来,就直接查询缓存了,就像下面这样。

缓存击穿 (1).png

问题出来了,如何抗住第一波请求,让请求去查库的技术尽可能的少,这才是我们的目的。常见的解决方案就是锁或缓存不过期,技术的选型是根据业务来的。这里我们选择的方案是加锁,因为数据更改的频率不确定。接下来就选择单机锁还是分布式锁的问题了,现在微服务很流行啊,为了高可用,把同一个服务部署在两容器上也不难,所以这里选择了分布式锁,setnx命令实现的分布式锁,至于其内部原理,这里不过多赘述。
先来看一下没有加锁代码是怎么实现的:

无锁的逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码
ResponseVO queryData(){

// 先从缓存中查数据
ResonseVO vo=queryFromCache();

if(Objects.isNull(vo){

// 缓存中没有数据,去数据库中查询数据
DataBaseEntity entity=queryFromDataBase();
vo=dataConvert(entity);

// 设置缓存
setCache(vo);
}
return vo;
}

再来考虑一下如何使用分布式锁,有两个思路:

  • 第一个思路:当n个线程来同时来竞争锁的时候,只有一个线程能胜出,这个线程查库和设置缓存结束以后,释放掉锁。而后其他n-1个线程竞争,当某线程胜出后,查一遍缓存,发现缓存已经存在了,就直接返回查询值释放掉锁。
    就像下面这样:

分布式锁思路1.png

加锁的代码逻辑,思路一:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
scss复制代码ResponseVO queryData() {

int retry = 0;
boolean locked;
try {
do {

// 从缓存中查询数据
ResponseVO vo = queryFromCache();
if (Objects.isNull(vo) {
return vo;
}

// 尝试获取锁,setnx命令
locked = tryLock();
if (locked) {

// 获取到锁以后,再次从缓存查数据.
// 因为当前线程获取到的锁是被另外一个线程释放掉的,而另外一个线程此时已经设置了缓存
vo = queryFromCache();
if (Objects.isNull(vo) {
return vo;
}
// 如果缓存中还是没有数据,只能查库了
DataBaseEntity entity = queryFromDataBase();
vo = dataConvert(entity);

// 设置缓存
setCache(vo);
return vo;
}
retry++;
} while (!locked && retry <= 10); // 如果某线程一直获取不到锁,就进入死循环了,设置一个循环次数

} finally {
if (locked) {
// 释放锁
releaseLock();
}
}
return null;
}

这种方式如果控制不得当,比如某线程就是获取不到锁,那整个程序就陷入死循环了,所以加入一个重试机制,循环十次获取不到锁,就退出循环。

  • 第二个思路:还是n个线程来竞争,只有一个线程能胜出,其他没有能获取到锁的线程直接去sleep,给他们设置一个睡眠时间,让他们睡一觉再去获取锁。就像下面这样:

分布式锁思路2.png

加锁的代码逻辑,思路二:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
java复制代码ResponseVO queryData() {

int retry = 0;
boolean locked;
try {

do{

// 从缓存中查询数据
ResponseVO vo = queryFromCache();
if (Objects.isNull(vo) {
return vo;
}

// 尝试获取锁,setnx命令,没获取到锁,就睡两秒钟
locked = tryLock(2);
if (locked) {

// 获取到锁以后,再次从缓存查数据.
// 因为当前线程获取到的锁是被另外一个线程释放掉的,而另外一个线程此时已经设置了缓存
vo = queryFromCache();
if (Objects.isNull(vo) {
return vo;
}
// 如果缓存中还是没有数据,只能查库了
DataBaseEntity entity = queryFromDataBase();
vo = dataConvert(entity);

// 设置缓存
setCache(vo);
return vo;
}
retry++;
}while (!locked && retry<=10);
} finally {
if (locked) {
// 释放锁
releaseLock();
}
}
return null;
}

代码逻辑和思路一差不多,无非是在循环获取锁的时候做了一点小手脚。具体使用还是看并发量吧,至于你问菜鱼使用的是哪一种方式,那当然是高精尖的无锁的版本喽。原因无他,适合自己的才是王道。

缓存穿透

所谓缓存穿透,是指某数据不存在于缓存中,数据库中也不存在。这个问题应该对应于特定的业务,比如上面CMS系统中的banner和话题,这两个业务中的数据是可有可无的。面对这个问题,最简单的解决办法就是给缓存设置特殊值。
比如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码
ResponseVO queryData(){

// 先从缓存中查数据
ResonseVO vo=queryFromCache();

if(Objects.isNull(vo){

// 缓存中没有数据,去数据库中查询数据
DataBaseEntity entity=queryFromDataBase();
if(Objects.isNull(entity){
// 数据库中不存在,就设置一个空值
vo=new ResonseVO();
}else{
vo=dataConvert(entity);
}
// 设置缓存
setCache(vo);
}
return vo;
}

至于加不加锁,那就看业务的并发量喽。不过,菜鱼也看到网络上有其他解决方案,比如权限校验,布隆过滤器等等,之前菜鱼也考虑过实现一个布隆过滤器,后来估了一下开发时间,还是算了。

缓存雪崩

所谓缓存雪崩,是指存在缓存中的数据批量过期或者频繁更新数据库,然后导致大量请求落到数据库上,从而给数据库造成压力。对于前者,解决方案就比较简单了,根据业务的具体需求,设置缓存不过期,或者把缓存过期的时间打散。
设置带有逾期时间的缓存,下面这条语句就是王道:

1
scss复制代码set(key,value,100+ThreadLocalRandom.current().nextInt(1,500));

对于后者,那就要考虑一下业务了,频繁更新的数据要不要放在缓存里面,以及这个数据的重要性。上文提到,社区系统里面的有多种类型的数据,其中一个是发帖。帖子的属性有点赞数,收藏数,评论数和阅读次数。用户A发帖了,用户B能看到,并且可以点赞、收藏和评论,根据业务需求,这三个数值是要存关系的。用户B可以看到自己点赞,收藏和评论了哪些帖子。这是帖子的属性:

贴子属性.png

这是业务全貌图:

数值更新.png

解释一下用例:

  • 查看详情。直接从数据库中把帖子拿出来,然后阅读数+1,异步更新表,这个数值的正确与否,不重要。
  • 用户主页。对业务而言,用户只需要看到和自己相关的帖子,这是重要的。
  • 对帖子的操作。只需要把关系存储到三张表中,至于帖子上的点赞、收藏和评论数量,不重要,甚至数值是错的都没关系,只要能把关系维护完善。

这就是频繁更新数据库的一个例子,甚至都没把数据放在缓存里面。之前我们在讨论的架构的时候,有人提出要把这几个数字存放在缓存里面,讨论来讨论去,得出三个字的结论:没必要。如果非得放在缓存里面,然后还需要维护一张数值和帖子的关系表,累不累啊!!!

没有最好设计,只有最适合的设计。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

PHP程序员如何简单的开展服务治理架构(三) 概述 非SOA

发表于 2021-11-20

服务治理所治理的服务需要合理的部署与管理,本章我们讲一下SOA(面向服务架构),本人语言文笔不好,所以本章内容使用问答模式,参考了 [SOA面试题(www.jdon.com/soa/soa-int…)] 的面试题,通过对此站复杂的描述进行简单的讲解。

概述

SOA代表了面向服务架构,仅仅是一种概念,通过这种概念而演变出的各种各样的服务架构都可称为SOA架构,SOA核心的概念就是 “松耦合”。

非SOA的架构

多语言开发

image.png

同语言开发

image.png

服务与服务之间可能会是不同的开发语言或相同语言开发,他们的调用方式依旧只可以通过http去获取,或者比较流行的Restful Api的形式,无论是在性能与开发的过程中都是很笨的办法。

什么是SOA的服务

image.png

在现实世界中,服务是一种我们花费购买到的一种预期的服务。

1、(来自真实世界):你去餐馆订餐,您的订单首先进入到柜台,然后在厨房进行食物准备,最后服务员提供的食物。因此,为了实现一个餐厅订购服务,您需要三个逻辑部门/服务协同工作(计帐,厨房和服务员)。在软件世界同样的方法称为业务服务。

2、(软件世界):你去亚马逊订购了一本书,有不同的服务,如支付网关,库存系统,货运系统等共同完成一本书的订购。

image.png

  所有的服务是自包含的,合乎逻辑。他们就像黑盒子。总之,我们并不需要了解业务服务的内部工作细节。对于外部世界,它只是一个能够使用消息交互的黑盒子。例如在“支付网关”业务服务获得消息“检查信贷”后会给出输出:这个客户的信贷有或没有。对于“订单系统”,“支付网关”的服务是一个黑盒子。

服务的主要特点是什么

A) SOA组件是松耦合的。当我们说松耦合,这意味着每一个服务是自包含单独存在的逻辑。举例来说,我们采取了“支付网关”的服务,并将它附加到不同的系统。

B) SOA服务是黑匣子。在SOA中,服务隐藏有内在的复杂性。他们只使用交互消息,服务接受和发送消息。通过虚拟化一个服务为黑盒子,服务变得更松散的耦合。

C) SOA服务应该是自定义: SOA服务应该能够自己定义。

D) SOA服务维持在一个列表中: SOA服务保持在一个中央存储库。应用程序可以在中央存储库中搜索服务,并调用相应服务。

E) SOA服务可以编排和链接实现一个特定功能: SOA服务可以使用了即插即用的方式。例如,“业务流程”中有两个服务“安全服务”和“订单处理服务” 。从它的业务流程可以实现两种类型:一,您可以先检查用户,然后处理订单,或反之亦然。是的,你猜对了,使用SOA可以松散耦合的方式管理服务之间的工作流。

什么是SOA

SOA代表了面向服务的架构。 SOA是一种使用松耦合的黑盒子服务构建业务应用的体系架构,这些服务可以通过编排连接在一起以实现特定的功能。

什么是合同,地址和绑定?

这是三个SOA的标准术语。每个服务对外开放地址,在服务开发中进行合同约定,客户端绑定服务进行开发调用。

  • 合同是两方或多方之间的协议。它定义了一种客户端如何与服务通信的协议。从技术上讲,它有描述参数和返回值的方法。
  • 地址表明在哪儿能找到这种服务。地址是一个URL,它指向服务的位置。
  • 绑定是决定这个端点如何可以访问。它决定了如何完成通信。例如,你暴露你的服务,可以使用SOAP over HTTP或通过TCP的BINARY进行访问。因此,对于这些通信介质将被创建两个绑定。
    n

什么是可重用的服务?

服务是一个自主的,可重复使用的,可发现的,无状态的,有一定粒度的功能,并且是一个复合应用程序或一个组合服务的一部分。

可重复使用的服务通过业务活动标识,这个业务活动是使用服务规范(设计时合同)描述的。

一个服务约束是,包括安全性,QoS,SLA,使用策略,可以由多个运行时的合同 多个接口(WSDL中的Web服务)以及多个实现(代码)定义的。

可重复使用的服务应在被管制在其从设计到运行整个企业级生命周期。其重用应通过规范流程来推动,重用应该是可测量的。

在一个SOA中如何实现松耦合?

实现松耦合一种策略是使用服务接口(WSDL中为SOAP Web服务)来限制服务之间的依赖性,对消费者隐藏服务实现。松耦合可以通过实施服务的功能封装以及限制服务接口的实现变化影响来解决。然而,在某些时候,你需要改变接口,也不会影响服务的消费者,除了管理多个安全约束,多种传输,以及其他方面的考虑。

SOA的服务无状态或有状态?

服务应该是无状态的。它有一个无状态的执行上下文,但它不会有中间状态来等待一个事件或一个回调。状态有关的数据的保留一定不能超出的服务的请求/响应。这是因为状态管理消耗了大量的资源,这可能会影响服务的可重用 可伸缩性和可用性。

在RPC服务启动后,服务一直保持沉睡状态,只有在有请求时才会唤醒,你可以称他为无状态或有状态

在SOA中我们是否需要从头开始构建系统?

否。如果您需要集成现有系统为业务服务,你只需要创建松耦合的包装,包装您的现有系统,并以一种通用的方式暴露功能给外部世界。

其实并不需要重新构建,只需要将每个服务继续分解,分类出对外与对内。

image.png

什么是服务和组件之间的区别?

服务组件来实现业务功能的逻辑组件分组。组件是为实现服务这个目标的途径。组件可以使用Java,C#,C,但服务将以通用格式如像Web服务方式被暴露。

说的就是我们通过RPC调用其他服务 (thrift)

预告

看到这里大概了解了SOA,这个时候我透露一个秘密,SOA其实还有另外一个名字叫“服务治理”,是的,就是我们一直在讲的服务治理。下一章回到正题,讲一下服务治理实现

PHP程序员如何简单的开展服务治理架构(一)
juejin.cn/post/703258…

PHP程序员如何简单的开展服务治理架构(二)
juejin.cn/post/703258…

本猿人写了一个服务治理的框架
github.com/CrazyCodes/…

交流

生命不息,编码不止。

微信搜索 【一文秒懂】 传播技术正能量,持续学习新知识。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

【Sentinel系列】Sentinel DashBoard

发表于 2021-11-20

这是我参与11月更文挑战的第20天,活动详情查看:2021最后一次更文挑战

使用Sentinel Dashboard 实现流控配置

在Sentinel Dashboard 配置流控规则,可实现流量控制的动态配置,步骤如下:

  1. 启动Sentinel Dashboard服务端
  2. 在application.yml中添加配置如下:
1
2
3
4
5
6
7
yml复制代码spring:
application:
name: spring-cloud-sentinel
cloud:
sentinel:
transport:
dashboard: 127.0.0.1:8080

spring.cloud.sentinel.transport.dashboard配置为Sentinel Dashboard服务端地址,用于实现流量控制监控和流量控制规则的分发。

定义一个需要限流REST接口,代码如下:

1
2
3
4
5
6
7
8
9
java复制代码@RestController
@RequestMapping("/sentinel")
public class SentinelController {

@GetMapping("limit")
public String limit(){
return "Sentinel DashBoard Flow Limit";
}
}

不需要添加任何资源埋点,在默认情况下Sentinel 会对所有请求进行限流。

访问对应接口,因为没有配置流控规则,所以没被Sentinel限流

进入Sentinel Dashboard实现流控规则配置,步骤如下:

  1. 启动Sentinel-Dashboard-1.3.0.jar
  2. 访问http://127.0.0.1:8888,进入Sentinel Dashboard配置流控规则
  3. 进入spring.application.name对应的菜单,访问【簇点链路,在这列表中可看到/limit接口的资源名称。
  4. 对于/limit资源名称,可点击【流控】按钮设置对应流控规则

image-20211120180008870.png

image-20211120180041355.png

新增规则中所有配置信息,其实就是FlowRole中对应的属性设置,测试效果将单机阈值设置为1

新增完成后,再次访问http://127.0.0.1:8080/sentinel/limit接口,当QPS大于1时,可查看到对应的限流效果

1
scss复制代码Blocked By Sentinel(flow limiting)

自定义URL限流异常

在默认情况下,URL触发限流会直接返回

1
scss复制代码Blocked By Sentinel(flow limiting)

在实际情况下,大多数采用JSON格式的数据,所以在触发限流返回json格式数据,可以通过自定义限流异常来修改,实现UrlBlockHandler并且重写blocked方法。

1
2
3
4
5
6
7
8
9
java复制代码@Service
public class CustomUrlBlockHandler implements UrlBlockHandler{
@Override
public void blocked(HttpServletRequest httpServletRequest,HttpServletResponse httpServletResponse,BlockException e) throws IOException{
httpServletResponse.setHeader("Content-Type","application/json;charset=UTF-8");
String msg="请求数超过最大数,请稍后再试!";
httpServletResponse.getWirter().write(msg);
}
}

还有一种情况是,当触发限流后,直接跳转到对应降级页面,可以通过添加如下配置实现

1
2
3
4
5
yml复制代码spring:
cloud:
sentinel:
serlvet:
block-page: 降级页面url地址

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

PHP程序员如何简单的开展服务治理架构(二) 名词解释 基本

发表于 2021-11-20

服务治理 治理的绝笔是服务,在一家公司有玩各种语言的程序员,如何去统一管理他们开发的服务,这是一个问题。

上一章主要讲了下服务治理需要什么,如何实现,这章我们详细的“肢解”一下服务治理的一个非常重要的组员 Thrift

上一章说明他的时候是这样写的

暂时大可理解为可以通过它去调用其他开发语言的方法

本猿人已经写好的服务治理 github.com/CrazyCodes/…

名词解释

thrift其实是一个软件框架,用来进行可扩展且跨语言的服务的开发。它结合了功能强大的软件堆栈和代码生成引擎,以构建在 C++, Java, Go,Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, JavaScript, Node.js, Smalltalk, and OCaml 这些编程语言间无缝结合的、高效的服务。

这个时候你就疑惑了?,如何跨语言调用

基本概念

如何调用这就需要讲一下我们强大的通信协议了。

http (tcp)

超文本传输协议,正常访问浏览器啥看新闻、购物的时候必定使用,需要客户端和服务端握手?成功才可以正常显示,这中间握手的流出很复杂,执行各种各样的解码编码(为了方便理解,暂时这么想吧)

rpc

远程过程调用协议,RPC采用客户机/服务器模式。请求程序就是一个客户机,而服务提供者就是一个服务器。首先,客户机调用进程发送一个有进程参数的调用信息到服务进程,然后等待应答信息。在服务器端,进程保持睡眠状态直到调用信息到达为止。当一个调用信息到达,服务器获得进程参数,计算结果,发送答复信息,然后等待下一个调用信息,最后,客户端调用进程接收答复信息,获得进程结果,然后调用执行继续进行。

rpc的优势很多,现在你大可理解为rpc通信要比http通信快很多就是了。

这个时候facebook和apache就厉害了,它们基于rpc通信协议开发出了一套thrift

实现方法

  • 上面假设你都没看懂,这里我们实战下。
  • 首先rpc我们通过使用swoole来实现,其他的手码。
  • 分为客户端和服务端做下演示

客户端

首先我们new一个client类,去调用服务端的UserSerivce这个类,并且调用UserService类中的getUserInfo方法。

1
2
3
4
ini复制代码$client = new Client('UserSerivce');
$userInfo = $client->getUserInfo(1);

var_dump($userInfo);

Client中我们只需要干这样的一件事,使用php魔术方法__call去调用一个不存在的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
php复制代码class Client{
protected $serviceName;

public function __construct($serviceName){
$this->serviceName = $serviceName;
}
public function __call($name, $arguments){
$rpcClient = new \swoole_client(SWOOLE_SOCK_TCP);
$rpcClient->connect('127.0.0.1',9503,0.5);
// 我们将要发送的数据是事先约定好的,跟写对外开放的Api一样
$rpcClient->send(json_encode([
'service'=>$this->serviceName,
'action'=>$name,
'params'=>$arguments[0]
]));
$rpcClient->close();
}
}

这个时候数据就通过rpc协议以json格式发送到了服务端

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
bash复制代码$server = new swoole_server("127.0.0.1", 9503);
$server->on('connect', function ($server, $fd){
echo "connection open: {$fd}\n";
});
$server->on('receive', function ($server, $fd, $reactor_id, $data) {
// $data 则就是客户端发送过来的数据,我们可以这样做来做到去调用类,当然你必须遵守PSR-4 Autoloader
$request = json_decode ($data, true);
$className = $request['service'];
$app = new $className;
$response = $app->{$request['action']}($request['params']);

$server->send($fd, "Swoole: {$data}");
$server->close($fd);
});
$server->on('close', function ($server, $fd) {
echo "connection close: {$fd}\n";
});
$server->start();

往期文章

  • PHP程序员如何简单的开展服务治理架构(一)

鸣谢

周梦康 mengkang.net/

交流

生命不息,编码不止。

微信搜索 【一文秒懂】 传播技术正能量,持续学习新知识。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Java 网络IO Java 网络IO API 操作系统网络

发表于 2021-11-20

这是我参与11月更文挑战的第18天,活动详情查看:2021最后一次更文挑战

Java 网络IO API

Java 网络IO相关的API有三类:NIO、BIO、AIO、IO Multiplexing。
那么对应的IO如何使用到呢?

BIO

阻塞IO,在调用read和write时均会阻塞。read通常比较容易理解,当socket接受到数据read就会返回。那么write呢?write会阻塞我一开始也是迷迷糊糊,知道后来学到了 socket buffer这个数据结构才略懂一点,write API会将传入的byte 写到socket文件对应socket buffer中,这个数据结构类似一个队列或者链表,并且是有限的,当socket buffer容量不足时write API就会阻塞,知道有空闲空间。

在Java中,可以使用JDK提供的API使用BIO。下面是一个bio便携的client、server交互的例子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
java复制代码public class Server {
public static void main(String[] args) {
// 服务端占用端口
int port = 8000;
// 创建 serversocker
ServerSocket serverSocket = null;
try {
serverSocket = new ServerSocket(port);
} catch (IOException e) {
e.printStackTrace();
}
if (serverSocket != null) {
while (true) {
InputStream is = null;
OutputStream os = null;
Socket client = null;
try {
// accept()方法会阻塞,直到有client连接后才会执行后面的代码
client = serverSocket.accept();
is = client.getInputStream();
os = client.getOutputStream();

// 3.server收到消息在控制台的打印,并回复"Hi client,I am Server."
byte[] buffer = new byte[5];
int len = 0;
// 使用ByteArrayOutputStream,避免缓冲区过小导致中文乱码
ByteArrayOutputStream baos = new ByteArrayOutputStream();
while ((len = is.read(buffer)) != -1) {
baos.write(buffer,0,len);
}
System.out.println(baos.toString());
// 服务端回复客户端消息
os.write("Hi client,I am Server.".getBytes());
os.flush(); // 刷新缓存,避免消息没有发送出去
client.shutdownOutput();
} catch (IOException e) {
e.printStackTrace();
} finally {
// 程序异常或者执行完成,关闭流,防止占用资源
if (client != null) {
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (is != null) {
try {
is.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (os != null) {
try {
os.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}

}
}


}
}


public class Client {
public static void main(String[] args) {
int port = 8000;
Socket client = null;
InputStream is = null;
OutputStream os = null;
try {
// 1.client连接server
client = new Socket("localhost", port);
is = client.getInputStream();
os = client.getOutputStream();

// 2.client发送"Hi Server,I am client."
os.write("Hi Server,I am client.".getBytes());
os.flush();
// 调用shutdownOutput()方法表示客户端传输完了数据,否则服务端的
// read()方法会一直阻塞
// (你可能会问我这不是写了 read()!=-1, -1表示的文本文件的结尾字符串,而对于字节流数据,
// 是没有 -1 标识的,这就会使服务端无法判断客户端是否发送完成,导致read()方法一直阻塞)
client.shutdownOutput();

// 4.client收到消息在控制台打印。
int len = 0;
byte[] buffer = new byte[5];
ByteArrayOutputStream baos = new ByteArrayOutputStream();
while ((len = is.read(buffer)) != -1) {
baos.write(buffer,0,len);
}
System.out.println(baos.toString());
} catch (IOException e) {
e.printStackTrace();
} finally {
// 程序异常或者执行完成,关闭流,防止占用资源
try {
if (is != null) {
is.close();
}
} catch (IOException e) {
e.printStackTrace();
}
try {
if (os != null) {
os.close();
}
} catch (IOException e) {
e.printStackTrace();
}
try {
if (client != null) {
client.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}


}
}

NIO

BIO满足了应用层进行通信的基本需求,但是如果一直在accept、read、write这种地方干等着,CPU一直不被使用,岂不是很浪费。所以NIO出现了,NIO称作NO Block IO,其特点在于提供了非阻塞的accept,read,write。相当于Java中Lock 的tryLoack。当tryAccept返回失败,则表示没有socket建立连接,此时可以去干点别的。

但其实这个效果非常的鸡肋,假若你去干其它事情的时候,有client建立连接,你就无法及时响应,java中通过NIO java.nio.channels.spi.AbstractSelectableChannel#configureBlocking可以设置非阻塞模式,但是这个API只支持NIO的socket设置。

IO Multiplexing

BIO满足了应用层传输数据的基本需求,但在实际使用中,client一定是多个,并且还可能同时与一个server进行数据传输,所以在server端需要使用多线程的方式来接收客户端的请求。所以请求的并发数量会与机器的线程数量成正比,对外服务的性能也会受到操作系统分配给进程的最大线程数的影响。因为每个线程都需要阻塞到read、write操作,完全不能干点别的了。

操作系统的网络协议栈的开发者也觉得这个操作非常拉胯,如同LOL王者辅助带一个青铜ADC,任由你机器配置在牛,性能还是不行,熟话说好马配好鞍,经过操作系统开发者的反复努力,开发出了一个API,我们大众程序员调用这个API可以直接获取到哪些socket可以read,哪些socket可以write,哪些socket需要建立连接,这个直接获取的机制,称之为IO多路复用,其最大的意义在于单线程可以处理多个socket,IO多路复用的具体实现依赖于底层的操作系统,不同实现有:

  • select
  • poll
  • epoll

java中可以如下使用select实现的IO多路复用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
java复制代码public class NIOClient {

public static void main(String[] args) {
SocketAddress socketAddress = new InetSocketAddress(8000);
SocketChannel socketChannel = null;
try {
socketChannel = SocketChannel.open(socketAddress);
socketChannel.configureBlocking(false);
if (socketChannel.finishConnect()) {

ByteBuffer buffer = ByteBuffer.allocate(1024);
// 客户端发送数据 "Hi Server,I am client."
buffer.clear();
buffer.put("Hi Server,I am client.".getBytes());
buffer.flip();
while (buffer.hasRemaining()) {
socketChannel.write(buffer);
}
// 客户端接收服务端数据打印在控制台

buffer.clear();
int len = socketChannel.read(buffer);
while (len > 0) {
buffer.flip();
while (buffer.hasRemaining()) {
System.out.print((char) buffer.get());
}
System.out.println();
buffer.clear();
len = socketChannel.read(buffer);
}
if (len == -1) {
socketChannel.close();
}
}


} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (socketChannel != null) {
socketChannel.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}


public class NIOServer {


public static void main(String[] args) {
ServerSocketChannel serverSocketChannel = null;
Selector selector = null;
try {
// 初始化一个 serverSocketChannel
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(8000));

// 设置serverSocketChannel为非阻塞模式
// 即 select()会立即得到返回
serverSocketChannel.configureBlocking(false);

// 初始化一个 selector
selector = Selector.open();

// 将 serverSocketChannel 与 selector绑定
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

while (true) {
// 通过操作系统监听变化的socket个数
// 在windows平台通过selector监听(轮询所有的socket进行判断,效率低)
// 在Linux2.6之后通过epool监听(事件驱动方式,效率高)
int count = selector.select(3000);
if (count > 0) {
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();

if (key.isAcceptable()) {
handleAccept(key);
}
if (key.isReadable()) {
handleRead(key);
}
if (key.isWritable() && key.isValid()) {
handleWrite(key);
}
if (key.isConnectable()) {
System.out.println("isConnectable = true");
}
iterator.remove();
}
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (serverSocketChannel != null) {
serverSocketChannel.close();
}
} catch (IOException e) {
e.printStackTrace();
}
try {
if (selector != null) {
selector.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}

private static void handleWrite(SelectionKey key) {
// 获取 client 的 socket
SocketChannel clientChannel = (SocketChannel) key.channel();
// 获取缓冲区
ByteBuffer buffer = (ByteBuffer) key.attachment();
buffer.clear();
buffer.put("Hi client,I am Server.".getBytes());
buffer.flip();
try {
while (buffer.hasRemaining()) {
clientChannel.write(buffer);
}
buffer.compact();
} catch (IOException e) {
e.printStackTrace();
}
}

private static void handleRead(SelectionKey key) {
// 获取 readable 的客户端 socketChannel
SocketChannel clientChannel = (SocketChannel) key.channel();
// 读取客户端发送的消息信息,我们已经在 acceptable 中设置了缓冲区
// 所以直接冲缓冲区读取信息
ByteBuffer buffer = (ByteBuffer) key.attachment();

// 获取 client 发送的消息
try {
int len = clientChannel.read(buffer);
while (len > 0) {
// 设置 limit 位置
buffer.flip();
// 开始读取数据
while (buffer.hasRemaining()) {
byte b = buffer.get();
System.out.print((char) b);
}
System.out.println();
// 清除 position 位置
buffer.clear();
// 从新读取 len
len = clientChannel.read(buffer);
}
if (len == -1) {
clientChannel.close();
}
} catch (IOException e) {
e.printStackTrace();
}



}

private static void handleAccept(SelectionKey key) {
// 获得 serverSocketChannel
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
try {
// 获得 socketChannel,就是client的socket
SocketChannel clientChannel = serverSocketChannel.accept();
if (clientChannel == null) return;
// 设置 socketChannel 为无阻塞模式
clientChannel.configureBlocking(false);
// 将其注册到 selector 中,设置监听其是否可读,并分配缓冲区
clientChannel.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocateDirect(512));
} catch (IOException e) {
e.printStackTrace();
}
}
}

JAVA NIO API相关资料:tutorials.jenkov.com/java-nio/in…

AIO

IO Multiplexing 以及非常牛皮了,但在实际使用中,感觉还是不够方便也不够快捷。因为作为开发者的我还需要主动查询哪些socket可以read、可以write、可以accept。

能不能更方便点,我事先给你个回调函数,操作系统你接收到cliet的数据久直接调用我这个函数,这样不是更方便,我什么也不用等,岂不是非常舒服呀呀呀呀!

贴心的操作系统开发者看到这个需要给你竖起了大拇指,不愧是996的程序员真有想法,给你扣波“666”。

以下为在Java中使用AIO,可以看到使用起来十分复杂,并且性能与IO Multiplesing差不并不是特别大,所以通常开发很少用AIO。

server.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
java复制代码package com.github.jiangxch.jdk.demo.netio.aio;


import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;

public class AioServer {

public AsynchronousServerSocketChannel serverSocketChannel;

public void listen() throws Exception {
//打开一个服务端通道
serverSocketChannel = AsynchronousServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(9988));//监听9988端口
//监听
serverSocketChannel.accept(this, new CompletionHandler<AsynchronousSocketChannel, AioServer>() {
@Override
public void completed(AsynchronousSocketChannel client, AioServer attachment) {
try {
if (client.isOpen()) {
System.out.println("接收到新的客户端的连接,地址:" + client.getRemoteAddress());
final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
//读取客户端发送的数据
client.read(byteBuffer, client, new CompletionHandler<Integer, AsynchronousSocketChannel>() {
@Override
public void completed(Integer result, AsynchronousSocketChannel attachment) {
try {
//读取请求,处理客户端发送的数据
byteBuffer.flip();
String content = Charset.defaultCharset().newDecoder().decode(byteBuffer).toString();
System.out.println("服务端接受到客户端发来的数据:" + content);
//向客户端发送数据
ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
writeBuffer.put("Server send".getBytes());
writeBuffer.flip();
attachment.write(writeBuffer).get();
} catch (Exception e) {
e.printStackTrace();
}
}

@Override
public void failed(Throwable exc, AsynchronousSocketChannel attachment) {
try {
exc.printStackTrace();
attachment.close();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//当有新的客户端接入的时候,直接调用accept的方法,递归执行下去,这样可以保证多个客户端都可以阻塞
attachment.serverSocketChannel.accept(attachment, this);
}
}

@Override
public void failed(Throwable exc, AioServer attachment) {
exc.printStackTrace();
}
});
}


public static void main(String[] args) throws Exception {
new AioServer().listen();
Thread.sleep(Integer.MAX_VALUE);
}

}

client.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
java复制代码package com.github.jiangxch.jdk.demo.netio.aio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.charset.Charset;
import java.util.concurrent.ExecutionException;


public class AioClient {
public static void main(String[] args) throws IOException, InterruptedException {
//打开一个客户端通道
AsynchronousSocketChannel channel = AsynchronousSocketChannel.open();
//与服务端建立连接
channel.connect(new InetSocketAddress("127.0.0.1", 9988));
//睡眠一秒,等待与服务端的连接
Thread.sleep(1000);

try {
//向服务端发送数据
channel.write(ByteBuffer.wrap("Hello,我是客户端".getBytes())).get();
} catch (ExecutionException e) {
e.printStackTrace();
}

try {
//从服务端读取返回的数据
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
channel.read(byteBuffer).get();//将通道中的数据写入缓冲Buffer
byteBuffer.flip();
String result = Charset.defaultCharset().newDecoder().decode(byteBuffer).toString();
System.out.println("客户端收到服务端返回的内容:" + result);//服务端返回的数据
} catch (ExecutionException e) {
e.printStackTrace();
}
}

}

AIO与NIO相比,除了上面使用不同之外,异步回调的方式也不会因为系统调用而产生更多的上下文切换和数据拷贝。

操作系统网络 IO

如果仅仅只是开发功能,上面的APi基本足够了。但是对于面试、优化来说,还需要更深入一层理解操作系统如何实现IO多路复用。Linux中提供到了以下API接口。

select

Linux中select函数定义:

1
2
c复制代码#include <sys/select.h>
int select(int maxfdpl,fd_set *readset,fd_set *writeset,fd_set *excepset,const struct timeval *timeout);
  • timeout参数

timeout是一个timeval的结构体,其数据结构定义:

1
2
3
4
c复制代码struct {
long tv_sec;// 等待秒
long tv_usec; // microseconds
}

fd_set数据结构定义:

1
2
3
4
5
c复制代码typedef struct fd_set {

__int32_t fds_bits[__DARWIN_howmany(__DARWIN_FD_SETSIZE, __DARWIN_NFDBITS)];

} fd_set;

fdset是一个数组,类型是32位的int类型。一个int类型可以表示32个文件描述符。可以通过判断int对应的位是0?or1?来判断是否可读可写。

该参数表示等待多久如果还无事件就绪,则直接返回。
传null,表示无超时设置,会永远等待直到事件就绪。
传0表示不等待,会立即返回。

  • excepset、writeset、readset
    这三个参数代表要监听的条件,分别是异常事件就绪,写事件就绪,读事件就绪。
  • maxfdpl
    该参数表示最大文件描述符的数量。

那么seelct是怎么工作的呢?

比如我们编写了一个NIO通信的java进程,就会在操作系统的内存地址中创建一个Process的数据结构,该数据结构会存储进程的pid,以及进程打开的文件对应的文件描述符,进程运行后,当进程接收到连接建立会创建一个套接字,Linux中一切皆文件,套接字也会被存放到Process,当调用select函数对套接字进行监听时,内核就会遍历这些套接字文件描述符对应的文件,在使用文件相关的api判断socket对应的文件是否可读可写,如果对应事件就绪,就会修改传入的event参数的对应bit位最后返回就绪的套接字给调用方。

那么对于一个socket套接字来说,什么情况下称作可读,什么情况称为可写呢?我们知道对于一个套接字,有读写两个缓冲区。在select函数中,定义了读的低水位,当读缓冲区的数据大于低水位,才会出发套接字的读就绪事件。写就绪也相同,当写缓冲区空闲大小大于写缓冲区的低水位才会触发。

所以并不是网卡家接收1byte就触发,接收1byte就触发一次。

poll

poll的功能与select类似。他俩的区别在于select出现的早,文件描述的最大限制是在代码中定义了一个常量表示=1024,poll则没有这个限制,poll能使用的最大文件描述仅收操作系统的限制。

epoll

epoll是对select的一个优化,我们来看下seelct的操作步骤:

1
2
3
4
5
6
7
8
9
10
11
12
c复制代码while(1) {
select(fdlist);// 监听文件描述符
for(fd : fdlist) {
if(fd.can_read()) {

}
if(fd.can_write()) {

}
}

}

根据以上为代码,我们捋一下整个操作的性能开销:

  1. select是系统调用,系统调用一定伴随着上下文切换和内存拷贝,频繁select会使得CPU过高。
  2. select函数是通过遍历传入的所有描述符对应的文件来判断是否有事件就绪,因此如果传入的文件描述符过多,会使得扫描耗时更久。
  3. select调用时参数和返回值混在了一起,需要我们调用方再次遍历进行判断。

epoll主要是对上面三个情况进行了优化。epoll的API定义如下:

1
c复制代码int epoll_create(int size);
1
csharp复制代码int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
1
arduino复制代码int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

epoll使用分为了三个函数,epoll_create只需调用一次,用于创建epoll的文件描述符,size参数只需大于0即可,最开始size表示初始化文件描述符的数量,若不够就会进行扩容。e poll_ctl函数则是向内核中的epoll实例添加、修改、删除对某个文件描述符上的事件监听。

op表示对于的操作,可以传入EPOLL_CTM_ADD,EPOLL_CTL_DEL,EPOLL_CTL_MOD。fd参数表示要监听的文件描述符。event参数结构体定义:

1
2
3
4
5
c复制代码struct epoll_event {
__unit32_t events;// 表示监听什么操作,读?写?
epoll_data_t data;// 用户数据变量

}

结构体中 events可以是以下几个宏的集合:

  • EPOLLIN: 监听读
  • EPOLLOUT:监听写
  • EPOLLERR: 监听操作
  • EPOLLPRI:监听外来数据
  • EPOLLHUP:描述符被挂断
  • EPOLLET:设置触发方法,边缘触发,水平触发
  • EPOLLONESHOT:仅监听一次,设置该参数如果需要继续监听需要重新加入到epoll的监听事件队列中

epoll_wait方法的events参数用于接收返回值,会返回就绪的事件。

那么epoll相对selec,它快在哪里呢?

仅返回就绪的文件描述符

select在返回返回值时,会将所有描述符(不论就绪还是未就绪的)进行返回,而epoll只会返回就绪的文件描述符,以及拷贝就绪socket的到用户内存。

更高效的查找就绪事件的方式

select需要遍历传入的所有的文件描述符,找到描述符在操作系统中对应的文件,逐个判断文件是否满足设置的条件,进而触发对应的事件。所以select会随着文件描述符数量的增长性能降低,而epoll则不会。

操作系统会注册一个中断函数,当网卡接收到数据会出发这个中断函数,如果接收到的数据是需要监听的文件描述符,就会直接将该描述符放入就绪列表。当调用epoll_wait时,直接返回该列表。

除此之外,epoll还有比较高级特性就是工作方式。

  • LT:水平触发,当如果设置事件为水平触发,则调用epoll_wait时会判断事件是否是水平触发,如果是水平触发并且该事件未经过用户程序处理,则会重新放入就绪列表中。
  • ET:边缘触发,当epoll_wait返回后,必须立即处理就绪事件。

那epoll为什么要设计LT与ET呢?为了面试,此处说一下自己的理解吧:

最开始之初,epoll只有LT的触发模式,水平触发对应到代码中,如果你监听某个socket读数据,当读队列一直有数据时都会触发事件,这种方式会导致一直有事件触发,从而到导致频繁的系统调用。而如果设置了ET,只有当读缓冲区接收到数据时才会触发(从空-》有数据),会降低系统调用。

参考文章

cloud.tencent.com/developer/a…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Elasticsearch不完全入门指北(五):聚合搜索

发表于 2021-11-20

「这是我参与11月更文挑战的第2天,活动详情查看:2021最后一次更文挑战」

Elasticsearch 除了最基本的搜索,数据分析功能也是十分强大,今天我们就来认识一下 ES 的聚合搜索功能,也可以叫分组搜索,类似于SQL语句中的 group by。

1. 什么是聚合搜索

聚合可以将你的数据进行汇总,让其称为各种维度的指标,统计数据。聚合可以很好地回答你下面的几个问题:

  • 我的网站的平均加载时间是多少?
  • 根据交易量,谁是我最有价值的客户?
  • 什么会被视为网络上的大文件?
  • 每个产品类别中有多少产品?

Elasticsearch 将聚合分为三类:

  • 从字段值计算指标(例如总和或平均值)的指标聚合。
  • 基于字段值、范围或其他标准将文档分组到桶中的桶聚合,也称为桶。
  • 从其他聚合而不是文档或字段获取输入的管道聚合。

下面我们就看一下如何来实现这样聚合。

2. 基本DSL语法

我们首先往 book 索引中插入几条数据,然后就可以来看一下具体效果。

image-20211120154123628.png

2.1 指标聚合

比如我们想要得到索引里价格的平均值,那么就可以如下这么写:

1
2
3
4
5
6
7
8
9
10
11
bash复制代码GET /book/_search
​
{
  "aggs": {
      "avg_bucket": {
          "avg": {
              "field": "price"
          }
      }
  }
}

其中 aggs 是 aggregrations 的缩写,代表是聚合搜索的意思,跟我们之前使用的搜索使用的 query 代表的是常规搜索一样。然后 avg_bucket 是我们自己定义的聚合搜索的名称,其下一层的属性 avg 表示的是要进行的计算,这里的关键字只有 avg 、sum、max、min 几种运算,下一层的 field 属性就是要计算的字段,然后我们就可以得到所有图书的平均价格,返回如下结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
json复制代码{
   "took": 62,
   "timed_out": false,
   "_shards": {
       "total": 3,
       "successful": 3,
       "skipped": 0,
       "failed": 0
  },
   "hits": {
       ...
  },
   "aggregations": {
       "avg_bucket": {
           "value": 65.64846185537485
      }
  }
}

其中 aggregations 就是我们计算得到的结果了,还有如果我们只想返回计算结果,不想返回搜索得到的 hits 值怎么办呢?那么就需要将搜索的 size 设置为0:

1
2
3
4
5
6
7
8
9
10
11
bash复制代码GET /book/_search
{
  "size": 0,
  "aggs": {
      "avg_bucket": {
          "avg": {
              "field": "price"
          }
      }
  }
}

就可以得到下面的结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
json复制代码{
   "took": 43,
   "timed_out": false,
   "_shards": {
       "total": 3,
       "successful": 3,
       "skipped": 0,
       "failed": 0
  },
   "hits": {
       "total": {
           "value": 13,
           "relation": "eq"
      },
       "max_score": null,
       "hits": []
  },
   "aggregations": {
       "avg_bucket": {
           "value": 65.64846185537485
      }
  }
}

2.2 桶聚合

下面呢,我们可以搞一个比较复杂的桶聚合,比如说,我们要看每个图书类目下,价格最贵的一本书是哪本。那么基本逻辑就是,先要按照图书类目进行分组,然后每个类目分组中要有一个价格 top1 的图书。这种就需要使用桶聚合进行搜索了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
json复制代码GET /book/_search
{
  "size": 0,
  "aggs": {
      "group": {
          "terms": {
              "field": "categoryId"
          },
          "aggs": {
              "price_top_1": {
                  "top_hits": {
                      "size": 1,
                      "_source": {
                          "includes": [ // 注:为了节省返回数据,这里我们定义只返回这些字段
                              "id",
                              "bookName",
                              "categoryId",
                              "categoryName",
                              "price"
                          ]
                      },
                      "sort": [
                          {
                              "price": {
                                  "order": "desc"
                              }
                          }
                      ]
                  }
              }
          }
      }
  }
}

我们一层一层地看,最外层的 aggs 即为最初的聚合搜索条件,里面的 terms 表示我们需要根据某个字段进行分组,这里设置分组字段为类目ID categoryId ;下面还有一个 aggs 这个聚合搜索为要查询类目下面的内容,属于最外层的 sub_aggregration ,然后我们并不是取的类目下的所有数据,而是只取一条,所以这里是用的 top_hits 的方法,price_top_1 为我们自定义的聚合搜索名称,top_hits 为聚合搜索的关键字,表示这里取部分数据;然后 top_hits 中的属性,就是和我们正常搜索的属性差不多,定义 size: 1 ,按照价格 price 倒序排列,表明我们按照价格倒序只取第一条。那么最后得到结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
ruby复制代码{
   "took":23,
   "timed_out":false,
   "_shards":{
       "total":3,
       "successful":3,
       "skipped":0,
       "failed":0
  },
   "hits":{
       "total":{
           "value":13,   // 文档总数
           "relation":"eq"
      },
       "max_score":null,
       "hits":[]
  },
   "aggregations":{
       "group":{
           "buckets":[ // 桶数组,下面即为有多少分组
              {
                   "key":1,  // 这里就是分组的值,即为 categoryId
                   "doc_count":6, // 这是表示这里命中了6个文档
                   "price_top_1":{ // 自定义的top1聚合结果
                       "hits":{
                           "total":{
                               "value":6, // 文档数
                               "relation":"eq"
                          },
                           "hits":[ // 这里就是拿到的 类目1 下的价格最高的图书
                              {
                                   "_index":"book",
                                   "_type":"_doc",
                                   "_id":"3",
                                   "_score":null,
                                   "_source":{
                                       "price":88,
                                       "id":3,
                                       "bookName":"Java编程思想",
                                       "categoryName":"教科书",
                                       "categoryId":1
                                  }
                              }
                          ]
                      }
                  }
              },
              {
                   "key":3,
                   "doc_count":4,
                   "price_top_1":{
                       "hits":{
                           "total":{
                               "value":4,
                               "relation":"eq"
                          },
                           "max_score":null,
                           "hits":[
                              {
                                   "_index":"book",
                                   "_type":"_doc",
                                   "_id":"12",
                                   "_source":{
                                       "price":99.68,
                                       "id":12,
                                       "bookName":"周恩来传",
                                       "categoryName":"人物传记",
                                       "categoryId":3
                                  }
                              }
                          ]
                      }
                  }
              },
              {
                   "key":2,
                   "doc_count":3,
                   "price_top_1":{
                       "hits":{
                           "total":{
                               "value":3,
                               "relation":"eq"
                          },
                           "max_score":null,
                           "hits":[
                              {
                                   "_index":"book",
                                   "_type":"_doc",
                                   "_id":"8",
                                   "_source":{
                                       "price":40,
                                       "id":8,
                                       "bookName":"神雕侠侣",
                                       "categoryName":"小说",
                                       "categoryId":2
                                  }
                              }
                          ]
                      }
                  }
              }
          ]
      }
  }
}

最后我们得到了分组结果,以及想要的数据。

注:想必聪明的你已经从搜索得到的分组结果里看出来了,分组的排序并不是按照类目ID进行排序的。这里分组的默认排序条件有两个:一个是 doc_count ,即分组中的文档数量,默认是降序排列,文档数越多越靠前;另一个是 _key,即分组的key值,默认是生序排序,基本就是数字排序或者字符排序了。那么该怎么定义聚合搜索排序呢?这个大家可以自行探索一下。

3. Java代码

那么问题来了,我们怎么通过 Java 代码来实现 2.2 中的 DSL 语句呢?

套路还是固定的,首先需要构建一个搜索请求,然后组装搜索条件,只不过由原先的查询条件,变成了现在的聚合条件,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
java复制代码    private SearchResponse searchGroupData() {
       // 构建搜索请求
       SearchRequest searchRequest = new SearchRequest(EsConstant.BOOK_INDEX_NAME);
​
       AggregationBuilder groupAggregation = AggregationBuilders
               // 这里是定义的最外层聚合的名字
              .terms("group")
               // 这是返回的分组的数量,默认是10
               // 大家可以根据自己的需要改返回的总数,最大不能超过设置的 max_result_window
              .size(10)
               // 要分组的字段
              .field("categoryId");
​
       SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
       // 不需要返回hits中的数据,设置 size 为 0
       searchSourceBuilder.size(0);
​
       // TOP hits 取图书的 top1
       // 定义返回的字段
       String[] includes = {"id", "bookName", "categoryId", "categoryName", "price"};
       String[] excludes = {};
       AggregationBuilder topHits = AggregationBuilders
               // 自定义的topHits名称
              .topHits("price_top_1")
               // 按照价格倒序
              .sort(SortBuilders.fieldSort("price").order(SortOrder.DESC))
              .fetchSource(includes, excludes)
               // 只取1条
              .size(1);
       // 将两个聚合结合起来,topHits 作为分组的子聚合
       groupAggregation.subAggregation(topHits);
​
       // 求分组之后的总数
       CardinalityAggregationBuilder totalAggregation = AggregationBuilders.cardinality("total")
              .field("categoryId");
       // 把聚合放在搜索构造器里
       searchSourceBuilder.aggregation(groupAggregation);
       searchSourceBuilder.aggregation(totalAggregation);
​
       searchRequest.source(searchSourceBuilder);
       try {
           return client.search(searchRequest, COMMON_OPTIONS);
      } catch (IOException e) {
           log.error("Failed to aggregations search data", e);
      }
       return null;
  }

上面就是我们的查询代码,然后就算得到了结果,怎么解析得到我们想要的结果呢?我简单写了一个示例,大家可以自行参考:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
ini复制代码    @Override
   public List<CategoryGroup> categoryGroup() {
       SearchResponse groupSearchResponse = searchGroupData();
       if (Objects.isNull(groupSearchResponse)) {
           return Collections.emptyList();
      }
       Aggregations aggregations = groupSearchResponse.getAggregations();
       List<CategoryGroup> groupList = new ArrayList<>();
       if (Objects.nonNull(aggregations)) {
           // 获取最外层的分组
           // 这里我们分组的条件是categoryId,ES类型为long,所以需要使用 ParsedLongTerms 来接收结果
           ParsedLongTerms group = aggregations.get("group");
           // 获取总数的 aggregation
           ParsedCardinality totalCount = aggregations.get("total");
           // 这里的总数我们暂时不返回,只是打印
           log.info("得到分组的总数:{}", totalCount);
           // 遍历得到的桶
           for (Terms.Bucket bucket : group.getBuckets()) {
               // 获取每个桶中的top1数据
               ParsedTopHits topHits = bucket.getAggregations().get("price_top_1");
               SearchHits groupHits = topHits.getHits();
               SearchHit[] hits = groupHits.getHits();
               List<Book> books = new ArrayList<>();
               Arrays.stream(hits).forEach(hit -> {
                   Map<String, Object> sourceAsMap = hit.getSourceAsMap();
                   Book book = BeanUtil.mapToBean(sourceAsMap, Book.class, false, new CopyOptions());
                   if (Objects.nonNull(book.getId())) {
                       books.add(book);
                  }
              });
               CategoryGroup categoryGroup = new CategoryGroup();
               // 键值即为类目ID
               categoryGroup.setCategoryId(Integer.valueOf(bucket.getKeyAsString()));
               categoryGroup.setBooks(books);
               groupList.add(categoryGroup);
          }
      }
​
       return groupList;
  }

总结

本文我们简单展示了一下聚合搜索可以做的事情,但是不只是这些,还有一些未展示的内容,需要大家自己去尝试,比如聚合后结果如何进行分页,如何统计分组总数,排序是怎么自定义的等等。如果你遇到了问题,或者有了自己的方法,我们也可以一起交流一下,共同探讨。

链接

  • Elasticsearch Guide Aggregations
  • 代码地址:github.com/lq920320/es…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…264265266…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%