开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

Java多线程之join方法

发表于 2020-09-21

个人技术博客(IBLi)
CSDN Github
掘金

参考资料

1、Java多线程中join方法的理解

2、Thread.join的作用和原理

3、Thread.join的作用和原理

join方法

1
2
3
4
5
csharp复制代码join重载方法

1 join()
2 join(long millis) //参数为毫秒
3 join(long millis,int nanoseconds) //第一参数为毫秒,第二个参数为纳秒

功能演示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码public class JoinDemo implements Runnable{
public void run() {
System.err.println("join thread demo ");
}

public static void main(String[] args) throws Exception {
System.err.println("main thread start... ");
Runnable r = new JoinDemo();
Thread t = new Thread(r);
t.setName("ibli joinTest ...");
t.start();
// t.join();
System.err.println("main thread end... ");
}
}

以上将t.join();注释掉,执行的一种可能结果如下:

1
2
3
4
5
6
7
8
arduino复制代码main thread start... 
main thread end...
join thread demo

还有可能是这种结果:
main thread start...
join thread demo
main thread end...

但是把注释去掉,结果如下:

1
2
3
arduino复制代码main thread start... 
join thread demo
main thread end...

这是一个非常简单的demo,效果是显而易见的。当main线程去调用t.join()是,会将自己当前线程阻塞,等到t线程执行完成到达完结状态,main线程才可以继续执行。

我们看一下join()设置超时时间的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码public class JoinDemo implements Runnable{
public void run() {
System.err.println("join thread demo ");
try {
// 线程睡眠4s
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
List<String> strings = null;
System.err.println(strings.get(0));
}

public static void main(String[] args) throws Exception {
System.err.println("main thread start... ");
Runnable r = new JoinDemo();
Thread t = new Thread(r);
t.setName("ibli joinTest ...");
t.start();
// 但是主线程join的超时时间是1s
t.join(1000);
System.err.println("main thread end... ");
}
}

执行效果:

1
2
3
4
5
6
arduino复制代码main thread start... 
join thread demo
main thread end...
Exception in thread "ibli joinTest ..." java.lang.NullPointerException
at com.ibli.threadTest.api.JoinDemo.run(JoinDemo.java:14)
at java.lang.Thread.run(Thread.java:748)

上面的执行结果可以看到,子线程设置了4s的超时时间,但是主线程在1秒超时后,并没有等待子线程执行完毕,就被唤醒执行后续操作了;这样的预期是否符合你的预期呢?

下面我们按照join的源码去分析吧!

join方法原理

下面是join的原理图

join()源码

首先会调用join(0)方法,其实是join的重载方法;

1
2
3
csharp复制代码public final void join() throws InterruptedException {
join(0);
}

下面是join的核心实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
arduino复制代码public final synchronized void join(long millis)
throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;

// 首先校验参数是否合法
if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}

// 如果join方法没有参数,则相当于直接调用wait方法
if (millis == 0) {
while (isAlive()) {
wait(0);
}
} else {
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}

下面是isAlive方法的源码

1
java复制代码public final native boolean isAlive();

这是一个本地方法,作用是判断当前的线程是否处于活动状态。什么是活动状态呢?活动状态就是线程已经启动且尚未终止。线程处于正在运行或准备开始运行的状态,就认为线程是“存活”的。

  • 这里有一个点要注意,join为什么阻塞的是主线程,而不是子线程呢?
  • 不理解的原因是阻塞主线程的方法是放在previousThread这个实例作用,让大家误以为应该阻塞previousThread线程。实际上主线程会持有previousThread这个对象的锁,然后调用wait方法去阻塞,而这个方法的调用者是在主线程中的。所以造成主线程阻塞。
  • 其实join()方法的核心在于wait(),在主线程中调用t.join()相当于在main方法中添加 new JoinDemo().wait();是一样的效果;在这里只不过是wait方法写在了子线程的方法中。
  • 再次重申一遍,join方法的作用是在主线程阻塞,等在子线程执行完之后,由子线程唤醒主线程,再继续执行主线程调用t.join()方法之后的逻辑。

那么主线程是在什么情况下知道要继续执行呢?就是上面说的,主线程其实是由join的子线程在执行完成之后调用的notifyAll()方法,来唤醒等待的线程。怎么证明呢?

其实大家可以去翻看JVM的源码实现,Thread.cpp文件中,有一段代码:

1
2
3
4
5
6
arduino复制代码void JavaThread::exit(bool destroy_vm, ExitType exit_type) {
// Notify waiters on thread object. This has to be done after exit() is called
// on the thread (if the thread is the last thread in a daemon ThreadGroup the
// group should have the destroyed bit set before waiters are notified).
ensure_join(this);
}

其中调用ensure_join方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
scss复制代码static void ensure_join(JavaThread* thread) {
// We do not need to grap the Threads_lock, since we are operating on ourself.
Handle threadObj(thread, thread->threadObj());
assert(threadObj.not_null(), "java thread object must exist");
ObjectLocker lock(threadObj, thread);
// Ignore pending exception (ThreadDeath), since we are exiting anyway
thread->clear_pending_exception();
// Thread is exiting. So set thread_status field in java.lang.Thread class to TERMINATED.
java_lang_Thread::set_thread_status(threadObj(), java_lang_Thread::TERMINATED);
// Clear the native thread instance - this makes isAlive return false and allows the join()
// to complete once we've done the notify_all below
//这里是清除native线程,这个操作会导致isAlive()方法返回false
java_lang_Thread::set_thread(threadObj(), NULL);
// 在这里唤醒等待的线程
lock.notify_all(thread);
// Ignore pending exception (ThreadDeath), since we are exiting anyway
thread->clear_pending_exception();
}

在JVM的代码中,线程执行结束的最终调用了lock.notify_all(thread)方法来唤醒所有处于等到的线程

使用场景

  • 比如我们使用Callable执行异步任务,需要在主线程处理任务的返回值时,可以调用join方法;
  • 还有一些场景希望线程之间顺序执行的;

join()方法与sleep()的比较

我们先说一下sleep方法:

  • 让当前线程休眠指定时间。
  • 休眠时间的准确性依赖于系统时钟和CPU调度机制。
  • 不释放已获取的锁资源,如果sleep方法在同步上下文中调用,那么其他线程是无法进- 入到当前同步块或者同步方法中的。
  • 可通过调用interrupt()方法来唤醒休眠线程。
  • sleep是静态方法,可以在任何地方调用

相比与sleep方法
sleep是静态方法,而且sleep的线程不是放锁资源,而join方法是对象方法,并且在等待的过程中会释放掉对象锁;

关于join方法会释放对象锁,那到底是释放的那个对象的锁呢,可以参照 关于join() 是否会释放锁的一些思考

一些关于join的面试题

一个Thread.join()面试题的思考

山脚太拥挤 我们更高处见。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

进阶全栈之路之 nest 篇(一) Nest + TypeS

发表于 2020-09-21

Nest + TypeScript + TypeOrm + JWT

序: 个人觉得 JavaScript 最大优势是灵活,最大的缺点也是灵活。开发速度快,但是调试和维护花费的时间会比强类型语言花的时间多很多,运行时报错,是我觉得它作为后端语言很大的一个问题,开发时跨文件调用 IDE 的函数以及变量提示,以及类型的限定也是我觉得JS的一些开发问题。这些问题在Typescript得到了很好的解决,加上面向对象的东西能在TS上实现,其实基础的东西在node上都能做了。

由于公司目前的技术栈是js, 后端在node.js 中用的比较多的服务端开发框架是,egg、nest、 koa、express等。

在之前的项目中,公司是采用的是egg,也研究了一些上ts的方式。但是由于项目之前存在比较多的问题,准备重构之前的代码。对,我就是在坚定的推动TS的那个人。

egg 对ts的支持不是很好,对于TS的支持,阿里在egg的基础上有 midway,个人写了下demo感觉不是很那啥,可能还在开发中吧,喜欢的朋友可以支持下哦。所以我放弃了原先的egg。

在node 中选择TS的框架,选择了Nest.js,下面列举nest我认为比较好一点。

Nest的优势:
  • Nest 类似于java中的 Spring Boot ,吸取了很多优秀的思想和想法,有想学习spring boot的前端同学,可以从这个搞起。对于这种后端过来的全栈比较容易就能上手。
  • egg star(目前为止) : 15.7K,而 nest 有28.1k
  • egg 有的, nest 基本上都有。
  • Nest 面对切面,对面对对象和面向切面支持的非常好。
  • 依赖注入容器(midway也是这种形式)
Nest的劣势:
  • 国内用的人不多,但是我发现国内也有很多人在搞。

好了废话,不多说,上教学地址:github.com/liangwei010…

生命周期

QQ图片20200624183631.png

  1. 当客户端一个Http请求到来时,首先过的中间件。
  2. 再是过的守卫(守卫只有通过和不通过)。
  3. 拦截器(这里我们可以看到,我们在执行函数前后都能做某些事情,统一的返回格式等等)。
  4. 管道,我们可以做参数校验和值的转换。
  5. 最后才会到Controller,然后就返回给客户端数据。

这里是我的项目的目录结构,大家也可以不按这个来。同层级的只列出部分,详细请看代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
scss复制代码project
├── src(所有的ts源码都在这里)
│ ├── common (通用的一个目录)
│ │ └── class(通用类的集合)
│ │ │ └── xxx.ts(这个看业务吧)
│ │ └── decorator(自定义装饰器集合)
│ │ │ └── pagination.ts(自定义分页装饰器)
│ │ └── enum(枚举型集合)
│ │ │ └── apiErrorCode.ts(api错误集合)
│ │ └── globalGuard(全局守卫)
│ │ │ └── apiErrorCode.ts(api错误集合)
│ │ └── httpHandle(Http的处理)
│ │ │ └── httpException.ts(http异常统一处理)
│ │ └── interceptor(拦截器处理)
│ │ │ └── httpException.ts(http异常统一处理)
│ │ └── interface(接口集合)
│ │ │ └── xxx.ts(通用的接口)
│ │ └── middleware(中间件)
│ │ │ └──logger.middleware.ts(日志中间件)
│ │ └── pipe(管道)
│ │ │ └──validationPipe.ts(管道验证全局设置)
│ │ └── pipe(管道)
│ │ │ └──validationPipe.ts(管道验证全局设置)
│ │ └── specialModules(特殊模块)
│ │ │ └── auth(认证模块模块)
│ │ │ └── database(数据库模块)
│ │ └── utils(工具目录层)
│ │ │ └── stringUtil.ts(字符串工具集合)
│ ├── config(配置文件集合)
│ │ └── dev(dev配置)
│ │ │ └── database(数据库配置)
│ │ │ └── development.ts(配置引入出)
│ │ └── prod(prod配置)
│ │ │ └── (同上)
│ │ └── staging(staging配置)
│ │ │ └── (同上)
│ │ └── unitTest(unitTest配置)
│ │ │ └── (同上)
│ ├── entity(数据库表集合)
│ │ └── user.entity.ts(用户表)
│ ├── modules(模块的集合)
│ │ └── user(用户模块)
│ │ │ └── user.controller.ts(controller)
│ │ │ └── user.module.ts(module声明)
│ │ │ └── user.service.ts(service)
│ │ │ └── user.service.spec.ts(service 测试)
│ │ │ └── userDto.ts(user Dto验证)
│ ├── app.module.ts
│ ├── main.ts(代码运行入口)
├── package.json
├── tsconfig.json
└── tslint.json

Controller 层

Controller 和常规的spring boot的 Controller 或者egg之类的是一样的。就是接收前端的请求层。**建议:**业务不要放在 Controller 层,可以放在service层。如果service文件过大,可以采用namespace的方式进行文件拆分。

1
2
3
4
5
6
7
8
9
10
11
12
13
less复制代码@Controller()   // 这里是说明这是一个Controller层
export class UserController {
// 这里是相当于new userService(),但是容器会帮助你处理一些依赖关系。这里是学习spring的思想
constructor(private readonly userService: UserService) {}

// 这里就说这是一个get请求,具体的这种看下文件就会了
// 在上面的声明周期里面
@Get()
getHello(@Body() createCatDto: CreateCatDto): string {
console.log(createCatDto)
return this.appService.getHello();
}
}

Service 层

Service 层我这边是做的是一些业务的处理层,所以Controller 层的默认的.spec.ts测试文件,我是删掉的,因为,我的单元测试是在xx.service.spec.ts 中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
ini复制代码@Injectable()
export class UserService {
// 这里是一个数据User表操作的Repository,通过注解的方式,由容器创建和销毁
constructor(@InjectRepository(User) private usersRepository: Repository<User>) {
}

/**
* 创建用户
*/
async createUser() {
const user = new User();
user.userSource = '123456';
user.paymentPassword = '123';
user.nickname = '梁二狗';
user.verifiedName = '梁二狗';
const res = await this.usersRepository.save(user);
return res;
}
}

Service 单元测试

  • 单元测试分两种,一种是连接数据库的测试,一种是mock数据,测试逻辑是否正确的测试。这里先展示mock的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
scss复制代码const user = {
"id": "2020-0620-1525-45106",
"createTime": "2020-06-20T07:25:45.116Z",
"updateTime": "2020-06-20T07:25:45.116Z",
"phone": "18770919134",
"locked": false,
"role": "300",
"nickname": "梁二狗",
"verifiedName": "梁二狗",
}
describe('user.service', () => {
let service: UserService;
let repo: Repository<User>;

beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
providers: [
UserService,
{
provide: getRepositoryToken(User),
useValue: {
// 这里mock掉数据函数中涉及到的数据库的CURD
create: jest.fn().mockResolvedValue(user),
save: jest.fn().mockResolvedValue(user),
findOne: jest.fn().mockResolvedValue(user),
},
},
],
}).compile();
service = module.get<UserService>(UserService);
repo = module.get<Repository<User>>(getRepositoryToken(User));
});
// 测试逻辑的话,大概就是这个意思,
it('createUser', async () => {
const user = await service.createUser();
expect(user.phone).toEqual('18770919134');
});
}

这里有一个国外大佬写的测试,还蛮全的,有需要的可以看看:github.com/Zhao-Null/n…

DTO (数据库传输对象)

这个也不是java里面的独有的名词,DTO是数据库传输对象,所以,在我们前端传输数据过来的时候,我们需要校验和转换成数据库表对应的值,然后去save。
这里讲解下nest的DTO,在Controller处理前,我们需要校验参数是否正确,比如,我们需要某个参数,而前端没有传递,或者传递类型不对。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
typescript复制代码// 设置全局验证管道
@Injectable()
export class ValidationPipeConfig implements PipeTransform<any> {
async transform(value: any, { metatype }: ArgumentMetadata) {
if (!metatype || !this.toValidate(metatype)) {
return value;
}
const object = plainToClass(metatype, value);
const errors = await validate(object);
if (errors.length > 0) {
const errorMessageList = []
const errorsObj = errors[0].constraints
for (const key in errorsObj) {
if (errorsObj.hasOwnProperty(key)) {
errorMessageList.push(errorsObj[key])
}
}
throw new CustomException(errorMessageList, HttpStatus.BAD_REQUEST);
}
return value;
}

private toValidate(metatype: any): boolean {
const types = [String, Boolean, Number, Array, Object];
return !types.find((type) => metatype === type);
}
}

// 全局使用管道
app.useGlobalPipes(new ValidationPipeConfig());
1
2
3
4
5
6
7
8
9
10
11
less复制代码// 创建用户dto
export class CreateUserDto {

@IsNotEmpty({ message: 'account is null' })
@IsString({ message: 'account is to require' })
account: string;

@IsNotEmpty({ message: 'name is null' })
@IsString({ message: 'name is not null and is a string' })
name: string;
}
1
2
3
4
5
6
less复制代码// Controller 中  使用dto(当然要记得注册先,稍后讲解全局注册)
@Post('/dto')
async createTest(@Body() createUserDto: CreateUserDto) {
console.log(createUserDto)
return true;
}

例如 account字段 在前端传递的参数为空时,或者类型不对时,将会返回 [ “account is null”, “account is to require” ],这些个错误。这种防止到业务层做过多的判断,减少很多事情。当然,这里也是支持转化的,比如 字符串 “1” 转成数字 1,这种的,详情请看链接:docs.nestjs.com/pipes

全局超时时间

设置全局的超时时间,当请求超过某个设定时间时,将会返回超时。

1
2
3
arduino复制代码  //main.ts 
// 全局使用超时拦截
app.useGlobalInterceptors(new TimeoutInterceptor());
1
2
3
4
5
6
7
8
9
10
11
arduino复制代码/**
* 您想要处理路线请求的超时。当您的端点在一段时间后没有返回任何内容时,
* 您希望以错误响应终止。以下结构可实现此目的
* 10s 超时
*/
@Injectable()
export class TimeoutInterceptor implements NestInterceptor {
public intercept(context: ExecutionContext, next: CallHandler): Observable<any> {
return next.handle().pipe(timeout(10000));
}
}

全局成功返回格式

统一返回的格式,方便统一处理数据和错误。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
kotlin复制代码import { Injectable, NestInterceptor, CallHandler, ExecutionContext } from '@nestjs/common';
import { map, switchMap } from 'rxjs/operators';
import { Observable } from 'rxjs';

interface Response<T> {
data: T;
}

/**
* 封装正确的返回格式
* {
* data,
* code: 200,
* message: 'success'
* }
*/
@Injectable()
export class TransformInterceptor<T> implements NestInterceptor<T, Response<T>> {
intercept(context: ExecutionContext, next: CallHandler<T>): Observable<Response<T>> {
return next.handle().pipe(
map(data => {
return {
data,
code: 200,
message: 'success',
};
}),
);
}
}

全局成功异常的格式

这里分自定义异常和其它异常,自定义将会返回自定义异常的状态码和系统。而其它异常将会返回异常和,系统返回的错误。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
typescript复制代码import { ArgumentsHost, Catch, ExceptionFilter, HttpException, HttpStatus } from '@nestjs/common';
import { CustomException } from './customException';

@Catch(HttpException)
export class HttpExceptionFilter implements ExceptionFilter {
catch(exception: any, host: ArgumentsHost) {
const ctx = host.switchToHttp();
const response = ctx.getResponse();
const request = ctx.getRequest();
let errorResponse: any;
const date = new Date().toLocaleDateString() + ' ' + new Date().toLocaleTimeString();

if (exception instanceof CustomException) {
// 自定义异常
errorResponse = {
code: exception.getErrorCode(), // 错误code
errorMessage: exception.getErrorMessage(),
message: 'error',
url: request.originalUrl, // 错误的url地址
date: date,
};
} else {
// 非自定义异常
errorResponse = {
code: exception.getStatus(), // 错误code
errorMessage: exception.message,
url: request.originalUrl, // 错误的url地址
date: date,
};
}
const status =
exception instanceof HttpException
? exception.getStatus()
: HttpStatus.INTERNAL_SERVER_ERROR;
// 设置返回的状态码、请求头、发送错误信息
response.status(status);
response.header('Content-Type', 'application/json; charset=utf-8');
response.send(errorResponse);
}
}

JWT的封装

官网的jwt的例子,在每个函数如果需要接口校验都需要加 @UseGuards(AuthGuard()) 相关的注解,但是大部分接口都是需要接口验证的。所以这里我选择了自己封装一个。

这里我有写2种方式,如果有适合自己的,请选择。

  • 方式1:自己封装一个注解。
    这里是我们重写的本地校验类的名称,继承于AuthGuard
1
2
3
4
5
6
scala复制代码///auth.local.guard.ts
import { Injectable } from '@nestjs/common';
import { AuthGuard } from '@nestjs/passport';
// 自定义校验
@Injectable()
export class LocalAuthGuard extends AuthGuard('local') { }

这里是我们的JWT校验类的名称,继承于AuthGuard

1
2
3
4
5
scala复制代码///jwt.auth.guard.ts
import { Injectable } from '@nestjs/common';
import { AuthGuard } from '@nestjs/passport';
@Injectable()
export class JwtAuthGuard extends AuthGuard('jwt') { }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
scala复制代码/// jwt.strategy.ts
@Injectable()
export class JwtStrategy extends PassportStrategy(Strategy) {
constructor() {
super({
jwtFromRequest: ExtractJwt.fromAuthHeaderAsBearerToken(),
ignoreExpiration: false,
secretOrKey: jwtConstants.secret,
});
}

async validate(payload: any) {
return { userId: payload.account, password: payload.password };
}
}

这里抛出了一个自定义异常,在上面有写的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
typescript复制代码/// local.strategy.ts
import { Injectable, UnauthorizedException } from '@nestjs/common';
import { PassportStrategy } from '@nestjs/passport';
import { Strategy } from 'passport-local';
import { AuthService } from '../auth.service';
import { CustomException } from '../../../httpHandle/customException';
import { ApiError } from '../../../enum/apiErrorCode';

/**
* 本地 验证
*/
@Injectable()
export class LocalStrategy extends PassportStrategy(Strategy) {

/**
* 这里的构造函数向父类传递了授权时必要的参数,在实例化时,父类会得知授权时,客户端的请求必须使用 Authorization 作为请求头,
* 而这个请求头的内容前缀也必须为 Bearer,在解码授权令牌时,使用秘钥 secretOrKey: 'secretKey' 来将授权令牌解码为创建令牌时的 payload。
*/
constructor(private readonly authService: AuthService) {
super({
usernameField: 'account',
passwordField: 'password'
});
}

/**
* validate 方法实现了父类的抽象方法,在解密授权令牌成功后,即本次请求的授权令牌是没有过期的,
* 此时会将解密后的 payload 作为参数传递给 validate 方法,这个方法需要做具体的授权逻辑,比如这里我使用了通过用户名查找用户是否存在。
* 当用户不存在时,说明令牌有误,可能是被伪造了,此时需抛出 UnauthorizedException 未授权异常。
* 当用户存在时,会将 user 对象添加到 req 中,在之后的 req 对象中,可以使用 req.user 获取当前登录用户。
*/
async validate(account: string, password: string): Promise<any> {
let user = await this.authService.validateUserAccount(account);
if (!user) {
throw new CustomException(
ApiError.USER_IS_NOT_EXIST,
ApiError.USER_IS_NOT_EXIST_CODE,
);
}

user = await this.authService.validateUserAccountAndPasswd(account, password);
if (!user) {
throw new CustomException(
ApiError.USER_PASSWD_IS_ERROR,
ApiError.USER_PASSWD_IS_ERROR_CODE,
);
}
return user;
}
}

全局守卫,这里的核心就是,当我们去执行时,看有没有 no-auth 的注解,有的话,就直接跳过,不走默认的jwt和自定义(登录)校验。当然,我们也是在这里写相关的白名单哦。先看注解吧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
typescript复制代码import { Injectable, CanActivate, ExecutionContext } from '@nestjs/common';
import { Observable } from 'rxjs';
import { Reflector } from '@nestjs/core';
import { IAuthGuard } from '@nestjs/passport';
import { JwtAuthGuard } from '../specialModules/auth/guards/jwt.auth.guard';
import { LocalAuthGuard } from '../specialModules/auth/guards/auth.local.guard';

@Injectable()
export class GlobalAuthGuard implements CanActivate {
constructor(private readonly reflector: Reflector) { }
canActivate(context: ExecutionContext): boolean | Promise<boolean> | Observable<boolean> {

// 获取登录的注解
const loginAuth = this.reflector.get<boolean>('login-auth', context.getHandler());

// 在这里取metadata中的no-auth,得到的会是一个bool
const noAuth = this.reflector.get<boolean>('no-auth', context.getHandler());
if (noAuth) {
return true;
}

const guard = GlobalAuthGuard.getAuthGuard(loginAuth);
// 执行所选策略Guard的canActivate方法
return guard.canActivate(context);
}

// 根据NoAuth的t/f选择合适的策略Guard
private static getAuthGuard(loginAuth: boolean): IAuthGuard {
if (loginAuth) {
return new LocalAuthGuard();
} else {
return new JwtAuthGuard();
}
}
}

有 @NoAuth()的将不在进行任何校验,其他接口默认走JwtAuthGuard和 LocalAuthGuard校验

1
2
3
4
5
arduino复制代码// 自定义装饰器
/**
* 登录认证
*/
export const LoginAuth = () => SetMetadata('login-auth', true);
1
2
3
4
5
6
7
less复制代码/// user.controller.ts
@Get()
@NoAuth()
@ApiOperation({ description: '获取用户列表' })
async userList(@Paginations() paginationDto: IPagination) {
return await this.userService.getUserList(paginationDto);
}
  • 方式2:就是在配置里头添加一个白名单列表,然后在守卫处判断。这个代码就不写了吧,不复杂的,随便搞搞就有了。

到这里基本的resetful接口和业务逻辑就能跑起来了,下节课讲解队列,graphql,等相关业务开发经常用到的东西,下次再见。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

面试官问我什么JMM 思维导图 面试官:讲讲什么是JMM 面

发表于 2020-09-20

思维导图

文章已收录Github精选,欢迎Star:github.com/yehongzhi/l…

面试官:讲讲什么是JMM

你要是整这个我可就不困了。

JMM就是Java内存模型(java memory model)。因为在不同的硬件生产商和不同的操作系统下,内存的访问有一定的差异,所以会造成相同的代码运行在不同的系统上会出现各种问题。所以java内存模型(JMM)屏蔽掉各种硬件和操作系统的内存访问差异,以实现让java程序在各种平台下都能达到一致的并发效果。

Java内存模型规定所有的变量都存储在主内存中,包括实例变量,静态变量,但是不包括局部变量和方法参数。每个线程都有自己的工作内存,线程的工作内存保存了该线程用到的变量和主内存的副本拷贝,线程对变量的操作都在工作内存中进行。线程不能直接读写主内存中的变量。

不同的线程之间也无法访问对方工作内存中的变量。线程之间变量值的传递均需要通过主内存来完成。

如果听起来抽象的话,我可以画张图给你看看,会直观一点:

每个线程的工作内存都是独立的,线程操作数据只能在工作内存中进行,然后刷回到主存。这是 Java 内存模型定义的线程基本工作方式。

温馨提醒一下,这里有些人会把Java内存模型误解为Java内存结构,然后答到堆,栈,GC垃圾回收,最后和面试官想问的问题相差甚远。实际上一般问到Java内存模型都是想问多线程,Java并发相关的问题。

面试官:那JMM定义了什么

这个简单,整个Java内存模型实际上是围绕着三个特征建立起来的。分别是:原子性,可见性,有序性。这三个特征可谓是整个Java并发的基础。

原子性

原子性指的是一个操作是不可分割,不可中断的,一个线程在执行时不会被其他线程干扰。

面试官拿笔写了段代码,下面这几句代码能保证原子性吗?

1
2
3
4
java复制代码int i = 2;
int j = i;
i++;
i = i + 1;

第一句是基本类型赋值操作,必定是原子性操作。

第二句先读取i的值,再赋值到j,两步操作,不能保证原子性。

第三和第四句其实是等效的,先读取i的值,再+1,最后赋值到i,三步操作了,不能保证原子性。

JMM只能保证基本的原子性,如果要保证一个代码块的原子性,提供了monitorenter 和 moniterexit 两个字节码指令,也就是 synchronized 关键字。因此在 synchronized 块之间的操作都是原子性的。

可见性

可见性指当一个线程修改共享变量的值,其他线程能够立即知道被修改了。Java是利用volatile关键字来提供可见性的。 当变量被volatile修饰时,这个变量被修改后会立刻刷新到主内存,当其它线程需要读取该变量时,会去主内存中读取新值。而普通变量则不能保证这一点。

除了volatile关键字之外,final和synchronized也能实现可见性。

synchronized的原理是,在执行完,进入unlock之前,必须将共享变量同步到主内存中。

final修饰的字段,一旦初始化完成,如果没有对象逸出(指对象为初始化完成就可以被别的线程使用),那么对于其他线程都是可见的。

有序性

在Java中,可以使用synchronized或者volatile保证多线程之间操作的有序性。实现原理有些区别:

volatile关键字是使用内存屏障达到禁止指令重排序,以保证有序性。

synchronized的原理是,一个线程lock之后,必须unlock后,其他线程才可以重新lock,使得被synchronized包住的代码块在多线程之间是串行执行的。

面试官:给我讲一下八种内存交互操作吧

好的,面试官,内存交互操作有8种,我画张图给你看吧:

  • lock(锁定),作用于主内存中的变量,把变量标识为线程独占的状态。
  • read(读取),作用于主内存的变量,把变量的值从主内存传输到线程的工作内存中,以便下一步的load操作使用。
  • load(加载),作用于工作内存的变量,把read操作主存的变量放入到工作内存的变量副本中。
  • use(使用),作用于工作内存的变量,把工作内存中的变量传输到执行引擎,每当虚拟机遇到一个需要使用到变量的值的字节码指令时将会执行这个操作。
  • assign(赋值),作用于工作内存的变量,它把一个从执行引擎中接受到的值赋值给工作内存的变量副本中,每当虚拟机遇到一个给变量赋值的字节码指令时将会执行这个操作。
  • store(存储),作用于工作内存的变量,它把一个从工作内存中一个变量的值传送到主内存中,以便后续的write使用。
  • write(写入):作用于主内存中的变量,它把store操作从工作内存中得到的变量的值放入主内存的变量中。
  • unlock(解锁):作用于主内存的变量,它把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定。

我再补充一下JMM对8种内存交互操作制定的规则吧:

  • 不允许read、load、store、write操作之一单独出现,也就是read操作后必须load,store操作后必须write。
  • 不允许线程丢弃他最近的assign操作,即工作内存中的变量数据改变了之后,必须告知主存。
  • 不允许线程将没有assign的数据从工作内存同步到主内存。
  • 一个新的变量必须在主内存中诞生,不允许工作内存直接使用一个未被初始化的变量。就是对变量实施use、store操作之前,必须经过load和assign操作。
  • 一个变量同一时间只能有一个线程对其进行lock操作。多次lock之后,必须执行相同次数unlock才可以解锁。
  • 如果对一个变量进行lock操作,会清空所有工作内存中此变量的值。在执行引擎使用这个变量前,必须重新load或assign操作初始化变量的值。
  • 如果一个变量没有被lock,就不能对其进行unlock操作。也不能unlock一个被其他线程锁住的变量。
  • 一个线程对一个变量进行unlock操作之前,必须先把此变量同步回主内存。

面试官:讲一下volatile关键字吧

内心:这可以重头戏呀,可不能出岔子~

很多并发编程都使用了volatile关键字,主要的作用包括两点:

  1. 保证线程间变量的可见性。
  2. 禁止CPU进行指令重排序。

可见性

volatile修饰的变量,当一个线程改变了该变量的值,其他线程是立即可见的。普通变量则需要重新读取才能获得最新值。

volatile保证可见性的流程大概就是这个一个过程:

volatile一定能保证线程安全吗

先说结论吧,volatile不能一定能保证线程安全。

怎么证明呢,我们看下面一段代码的运行结果就知道了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
java复制代码/**
* @author Ye Hongzhi 公众号:java技术爱好者
**/
public class VolatileTest extends Thread {

private static volatile int count = 0;

public static void main(String[] args) throws Exception {
Vector<Thread> threads = new Vector<>();
for (int i = 0; i < 100; i++) {
VolatileTest thread = new VolatileTest();
threads.add(thread);
thread.start();
}
//等待子线程全部完成
for (Thread thread : threads) {
thread.join();
}
//输出结果,正确结果应该是1000,实际却是984
System.out.println(count);//984
}

@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
//休眠500毫秒
Thread.sleep(500);
} catch (Exception e) {
e.printStackTrace();
}
count++;
}
}
}

为什么volatile不能保证线程安全?

很简单呀,可见性不能保证操作的原子性,前面说过了count++不是原子性操作,会当做三步,先读取count的值,然后+1,最后赋值回去count变量。需要保证线程安全的话,需要使用synchronized关键字或者lock锁,给count++这段代码上锁:

1
2
3
java复制代码private static synchronized void add() {
count++;
}

禁止指令重排序

首先要讲一下as-if-serial语义,不管怎么重排序,(单线程)程序的执行结果不能被改变。

为了使指令更加符合CPU的执行特性,最大限度的发挥机器的性能,提高程序的执行效率,只要程序的最终结果与它顺序化情况的结果相等,那么指令的执行顺序可以与代码逻辑顺序不一致,这个过程就叫做指令的重排序。

重排序的种类分为三种,分别是:编译器重排序,指令级并行的重排序,内存系统重排序。整个过程如下所示:

指令重排序在单线程是没有问题的,不会影响执行结果,而且还提高了性能。但是在多线程的环境下就不能保证一定不会影响执行结果了。

所以在多线程环境下,就需要禁止指令重排序。

volatile关键字禁止指令重排序有两层意思:

  • 当程序执行到volatile变量的读操作或者写操作时,在其前面的操作的更改肯定全部已经进行,且结果已经对后面的操作可见,在其后面的操作肯定还没有进行。
  • 在进行指令优化时,不能将在对volatile变量访问的语句放在其后面执行,也不能把volatile变量后面的语句放到其前面执行。

下面举个例子:

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码private static int a;//非volatile修饰变量
private static int b;//非volatile修饰变量
private static volatile int k;//volatile修饰变量

private void hello() {
a = 1; //语句1
b = 2; //语句2
k = 3; //语句3
a = 4; //语句4
b = 5; //语句5
//以下省略...
}

变量a,b是非volatile修饰的变量,k则使用volatile修饰。所以语句3不能放在语句1、2前,也不能放在语句4、5后。但是语句1、2的顺序是不能保证的,同理,语句4、5也不能保证顺序。

并且,执行到语句3的时候,语句1,2是肯定执行完毕的,而且语句1,2的执行结果对于语句3,4,5是可见的。

volatile禁止指令重排序的原理是什么

首先要讲一下内存屏障,内存屏障可以分为以下几类:

  • LoadLoad 屏障:对于这样的语句Load1,LoadLoad,Load2。在Load2及后续读取操作要读取的数据被访问前,保证Load1要读取的数据被读取完毕。
  • StoreStore屏障:对于这样的语句Store1, StoreStore, Store2,在Store2及后续写入操作执行前,保证Store1的写入操作对其它处理器可见。
  • LoadStore 屏障:对于这样的语句Load1, LoadStore,Store2,在Store2及后续写入操作被刷出前,保证Load1要读取的数据被读取完毕。
  • StoreLoad 屏障:对于这样的语句Store1, StoreLoad,Load2,在Load2及后续所有读取操作执行前,保证Store1的写入对所有处理器可见。

在每个volatile读操作后插入LoadLoad屏障,在读操作后插入LoadStore屏障。

在每个volatile写操作的前面插入一个StoreStore屏障,后面插入一个SotreLoad屏障。

大概的原理就是这样。

面试官:讲得还不错,基本上都讲到了,时间也不早了,今天的面试就到这吧,回去等通知吧~

总结

要学习并发编程,java内存模型是第一站了。原子性,有序性,可见性这三大特征几乎贯穿了并发编程,可谓是基础知识。对于后面要深入学习起到铺垫作用。

在这篇文章中,如果面试的话,重点是Java内存模型(JMM)的工作方式,三大特征,还有volatile关键字。为什么喜欢问volatile关键字呢,因为volatile关键字可以扯出很多东西,比如可见性,有序性,还有内存屏障等等。可以一针见血地看出面试者的技术水平,毕竟面试官也想高效地筛选出符合要求的人才嘛。

上面所有例子的代码都上传Github了:

github.com/yehongzhi/m…

觉得有用就点个赞吧,你的点赞是我创作的最大动力~

拒绝做一条咸鱼,我是一个努力让大家记住的程序员。我们下期再见!!!

能力有限,如果有什么错误或者不当之处,请大家批评指正,一起学习交流!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

熬了7天,总结出来了Java中动态规则的实现方式 背景 表达

发表于 2020-09-20

背景

业务系统在应用过程中,有时候要处理“经常变化”的部分,这部分需求可能是“业务规则”,也可能是“不同的数据处理逻辑”,这部分动态规则的问题,往往需要可配置,并对性能和实时性有一定要求。

Java不是解决动态层问题的理想语言,在实践中发现主要有以下几种方式可以实现:

  • 表达式语言(expression language)
  • 动态语言(dynamic/script language language),如Groovy
  • 规则引擎(rule engine)

表达式语言

Java Unified Expression Language,简称JUEL,是一种特殊用途的编程语言,主要在Java Web应用程序用于将表达式嵌入到web页面。Java规范制定者和Java Web领域技术专家小组制定了统一的表达式语言。JUEL最初包含在JSP 2.1规范JSR-245中,后来成为Java EE 7的一部分,改在JSR-341中定义。

主要的开源实现有:OGNL ,MVEL ,SpEL,JUEL,Java Expression Language (JEXL),JEval,Jakarta JXPath 等。

这里主要介绍在实践中使用较多的MVEL、OGNL和SpEL。

OGNL(Object Graph Navigation Library)

在Struts 2 的标签库中都是使用OGNL表达式访问ApplicationContext中的对象数据,简单示例:

1
2
3
4
5
6
7
8
9
10
11
ini复制代码Foo foo = new Foo();
foo.setName("test");
Map<String, Object> context = new HashMap<String, Object>();
context.put("foo",foo);
String expression = "foo.name == 'test'";
try {
Boolean result = (Boolean) Ognl.getValue(expression,context);
System.out.println(result);
} catch (OgnlException e) {
e.printStackTrace();
}

MVEL

MVEL最初作为Mike Brock创建的 Valhalla项目的表达式计算器(expression evaluator),相比最初的OGNL、JEXL和JUEL等项目,而它具有远超它们的性能、功能和易用性 - 特别是集成方面。它不会尝试另一种JVM语言,而是着重解决嵌入式脚本的问题。

MVEL主要使用在Drools,是Drools规则引擎不可分割的一部分。

MVEL语法较为丰富,不仅包含了基本的属性表达式,布尔表达式,变量复制和方法调用,还支持函数定义,详情参见MVEL Language Guide 。

MVEL在执行语言时主要有解释模式(Interpreted Mode)和编译模式(Compiled Mode )两种:

  • 解释模式(Interpreted Mode)是一个无状态的,动态解释执行,不需要负载表达式就可以执行相应的脚本。
  • 编译模式(Compiled Mode)需要在缓存中产生一个完全规范化表达式之后再执行。

//解释模式
Foo foo = new Foo();
foo.setName(“test”);
Map context = new HashMap();
String expression = “foo.name == ‘test’”;
VariableResolverFactory functionFactory = new MapVariableResolverFactory(context);
context.put(“foo”,foo);
Boolean result = (Boolean) MVEL.eval(expression,functionFactory);
System.out.println(result);

//编译模式
Foo foo = new Foo();foo.setName(“test”);
Map context = new HashMap();
String expression = “foo.name == ‘test’”;
VariableResolverFactory functionFactory = new MapVariableResolverFactory(context);context.put(“foo”,foo);
Serializable compileExpression = MVEL.compileExpression(expression);
Boolean result = (Boolean) MVEL.executeExpression(compileExpression, context, functionFactory);

SpEL

SpEl(Spring表达式语言)是一个支持查询和操作运行时对象导航图功能的强大的表达式语言。 它的语法类似于传统EL,但提供额外的功能,最出色的就是函数调用和简单字符串的模板函数。SpEL类似于Struts2x中使用的OGNL表达式语言,能在运行时构建复杂表达式、存取对象图属性、对象方法调用等等,并且能与Spring功能完美整合,如能用来配置Bean定义。

SpEL主要提供基本表达式、类相关表达式及集合相关表达式等,详细参见Spring 表达式语言 (SpEL) 。

类似与OGNL,SpEL具有expression(表达式),Parser(解析器),EvaluationContext(上下文)等基本概念;类似于MVEL,SpEl也提供了解释模式和编译模式两种运行模式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
ini复制代码//解释器模式
Foo foo = new Foo();
foo.setName("test");
// Turn on:
// - auto null reference initialization
// - auto collection growing
SpelParserConfiguration config = new SpelParserConfiguration(true,true);
ExpressionParser parser = new SpelExpressionParser(config);
String expressionStr = "#foo.name == 'test'";
StandardEvaluationContext context = new StandardEvaluationContext();
context.setVariable("foo",foo);
Expression expression = parser.parseExpression(expressionStr);
Boolean result = expression.getValue(context,Boolean.class);

//编译模式
config = new SpelParserConfiguration(SpelCompilerMode.IMMEDIATE, RunSpel.class.getClassLoader());
parser = new SpelExpressionParser(config);
context = new StandardEvaluationContext();
context.setVariable("foo",foo);
expression = parser.parseExpression(expressionStr);
result = expression.getValue(context,Boolean.class);

规则引擎

一些规则引擎(rule engine):aviator,easy-rules,drools,esper, siddhi

aviator

AviatorScript 是一门高性能、轻量级寄宿于 JVM 之上的脚本语言。

使用场景包括:

  1. 规则判断及规则引擎
  2. 公式计算
  3. 动态脚本控制
  4. 集合数据 ELT 等

public class Test {
public static void main(String[] args) {
String expression = “a+(b-c)>100”;
// 编译表达式
Expression compiledExp = AviatorEvaluator.compile(expression);

1
2
3
4
5
6
7
8
ini复制代码   Map<String, Object> env = new HashMap<>();
env.put("a", 100.3);
env.put("b", 45);
env.put("c", -199.100);

// 执行表达式
Boolean result = (Boolean) compiledExp.execute(env);
System.out.println(result);

}
}

easy-rules

Easy Rules is a Java rules engine。

使用POJO定义规则:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
typescript复制代码@Rule(name = "weather rule", description = "if it rains then take an umbrella")
public class WeatherRule {

@Condition
public boolean itRains(@Fact("rain") boolean rain) {
return rain;
}

@Action
public void takeAnUmbrella() {
System.out.println("It rains, take an umbrella!");
}
}

Rule weatherRule = new RuleBuilder()
.name("weather rule")
.description("if it rains then take an umbrella")
.when(facts -> facts.get("rain").equals(true))
.then(facts -> System.out.println("It rains, take an umbrella!"))
.build();

支持使用表达式语言(MVEL/SpEL)来定义规则:

weather-rule.yml example:

1
2
3
4
5
6
7
8
vbnet复制代码name: "weather rule"
description: "if it rains then take an umbrella"
condition: "rain == true"
actions:
- "System.out.println(\"It rains, take an umbrella!\");"

MVELRuleFactory ruleFactory = new MVELRuleFactory(new YamlRuleDefinitionReader());
Rule weatherRule = ruleFactory.createRule(new FileReader("weather-rule.yml"));

触发规则:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码public class Test {
public static void main(String[] args) {
// define facts
Facts facts = new Facts();
facts.put("rain", true);

// define rules
Rule weatherRule = ...
Rules rules = new Rules();
rules.register(weatherRule);

// fire rules on known facts
RulesEngine rulesEngine = new DefaultRulesEngine();
rulesEngine.fire(rules, facts);
}
}

drools

An open source rule engine, DMN engine and complex event processing (CEP) engine for Java and the JVM Platform.

定义规则:

1
2
3
4
5
6
7
8
kotlin复制代码import com.lrq.wechatDemo.domain.User   // 导入类
dialect "mvel"
rule "age" // 规则名,唯一
when
$user : User(age<15 || age>60) //规则的条件部分
then
System.out.println("年龄不符合要求!");
end

参考例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
ini复制代码public class TestUser {
private static KieContainer container = null;
private KieSession statefulKieSession = null;

@Test
public void test(){
KieServices kieServices = KieServices.Factory.get();
container = kieServices.getKieClasspathContainer();
statefulKieSession = container.newKieSession("myAgeSession");
User user = new User("duval yang",12);
statefulKieSession.insert(user);
statefulKieSession.fireAllRules();
statefulKieSession.dispose();
}
}

drools是比较重的规则引擎,有自己的状态存储,详见其官方文档。

esper

Esper is a component for complex event processing (CEP), streaming SQL and event series analysis, available for Java as Esper, and for .NET as NEsper.

一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
ini复制代码public class Test {
public static void main(String[] args) throws InterruptedException {
EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider();

EPAdministrator admin = epService.getEPAdministrator();

String product = Apple.class.getName();
String epl = "select avg(price) from " + product + ".win:length_batch(3)";

EPStatement state = admin.createEPL(epl);
state.addListener(new AppleListener());

EPRuntime runtime = epService.getEPRuntime();

Apple apple1 = new Apple();
apple1.setId(1);
apple1.setPrice(5);
runtime.sendEvent(apple1);

Apple apple2 = new Apple();
apple2.setId(2);
apple2.setPrice(2);
runtime.sendEvent(apple2);

Apple apple3 = new Apple();
apple3.setId(3);
apple3.setPrice(5);
runtime.sendEvent(apple3);
}
}

siddhi

Siddhi is a cloud native

Streaming

and

Complex Event Processing

engine that understands Streaming SQL queries in order to capture events from diverse data sources, process them, detect complex conditions, and publish output to various endpoints in real time.

For example:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
typescript复制代码package io.siddhi.sample;

import io.siddhi.core.SiddhiAppRuntime;
import io.siddhi.core.SiddhiManager;
import io.siddhi.core.event.Event;
import io.siddhi.core.stream.input.InputHandler;
import io.siddhi.core.stream.output.StreamCallback;
import io.siddhi.core.util.EventPrinter;

/**
* The sample demonstrate how to use Siddhi within another Java program.
* This sample contains a simple filter query.
*/
public class SimpleFilterSample {

public static void main(String[] args) throws InterruptedException {

// Create Siddhi Manager
SiddhiManager siddhiManager = new SiddhiManager();

//Siddhi Application
String siddhiApp = "" +
"define stream StockStream (symbol string, price float, volume long); " +
"" +
"@info(name = 'query1') " +
"from StockStream[volume < 150] " +
"select symbol, price " +
"insert into OutputStream;";

//Generate runtime
SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp);

//Adding callback to retrieve output events from stream
siddhiAppRuntime.addCallback("OutputStream", new StreamCallback() {
@Override
public void receive(Event[] events) {
EventPrinter.print(events);
//To convert and print event as a map
//EventPrinter.print(toMap(events));
}
});

//Get InputHandler to push events into Siddhi
InputHandler inputHandler = siddhiAppRuntime.getInputHandler("StockStream");

//Start processing
siddhiAppRuntime.start();

//Sending events to Siddhi
inputHandler.send(new Object[]{"IBM", 700f, 100L});
inputHandler.send(new Object[]{"WSO2", 60.5f, 200L});
inputHandler.send(new Object[]{"GOOG", 50f, 30L});
inputHandler.send(new Object[]{"IBM", 76.6f, 400L});
inputHandler.send(new Object[]{"WSO2", 45.6f, 50L});
Thread.sleep(500);

//Shutdown runtime
siddhiAppRuntime.shutdown();

//Shutdown Siddhi Manager
siddhiManager.shutdown();

}
}

esper和siddhi都是streaming process,支持CEP和SQL,详见其官方文档。

动态JVM语言

Groovy

Groovy除了Gradle 上的广泛应用之外,另一个大范围的使用应该就是结合Java使用动态代码了。Groovy的语法与Java非常相似,以至于多数的Java代码也是正确的Groovy代码。Groovy代码动态的被编译器转换成Java字节码。由于其运行在JVM上的特性,Groovy可以使用其他Java语言编写的库。

Groovy可以看作给Java静态世界补充动态能力的语言,同时Groovy已经实现了java不具备的语言特性:

  • 函数字面值;
  • 对集合的一等支持;
  • 对正则表达式的一等支持;
  • 对xml的一等支持;

Groovy作为基于JVM的语言,与表达式语言存在语言级的不同,因此在语法上比表达还是语言更灵活。Java在调用Groovy时,都需要将Groovy代码编译成Class文件。

Groovy 可以采用GroovyClassLoader、GroovyShell、GroovyScriptEngine和JSR223 等方式与Java语言集成。

一个使用GroovyClassLoader动态对json对象进行filter的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码public class GroovyFilter implements Filter {
private static String template = "" +
"package com.alarm.eagle.filter;" +
"import com.fasterxml.jackson.databind.node.ObjectNode;" +
"def match(ObjectNode o){[exp]}";

private static String method = "match";

private String filterExp;

private transient GroovyObject filterObj;

public GroovyFilter(String filterExp) throws Exception {
ClassLoader parent = Thread.currentThread().getContextClassLoader();
GroovyClassLoader classLoader = new GroovyClassLoader(parent);
Class clazz = classLoader.parseClass(template.replace("[exp]", filterExp));
filterObj = (GroovyObject)clazz.newInstance();
}

public boolean filter(ObjectNode objectNode) {
return (boolean)filterObj.invokeMethod(method, objectNode);
}
}

Java每次调用Groovy代码都会将Groovy编译成Class文件,因此在调用过程中会出现JVM级别的问题。如使用GroovyShell的parse方法导致perm区爆满的问题,使用GroovyClassLoader加载机制导致频繁gc问题和CodeCache用满,导致JIT禁用问题等,相关问题可以参考Groovy与Java集成常见的坑 。

最后

感谢大家看到这里,文章有不足,欢迎大家指出;最后欢迎大家关注微信公众号【Java程序员聚集地】获取最新技术知识。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

面试被问到RocketMq,我懵了

发表于 2020-09-18

事情是这样的,只是刚好在简历写了项目用到了RocketMq,只是碰巧收到了一条Boss的信息,显示是阿里的招聘者,又刚好我把这份简历发了出去,刚好过了几天,一个电话打来了,上来的第一个问题就是你说下RocketMq的原理,我懵了,大脑一片空白……

事后我也反思了下,我确实用过RocketMq开发过生产者和消费者应用,RocketMq核心的东西也不多,理解记忆就行了,不信你往下看:

RocketMq是什么

一个纯Java、分布式队列模型的消息中间件,具有高可用、高可靠、高实时、低延迟的特点。(记住这句就行了)

RocketMq有什么功能

先答出消息中间件的通用功能

  • 1、业务解耦:这也是发布订阅的消息模型。生产者发送指令到MQ中,然后下游订阅这类指令的消费者会收到这个指令执行相应的逻辑,整个过程与具体业务无关,抽象成了一个发送指令,存储指令,消费指令的过程。
  • 2、前端削峰:前端发起的请求在短时间内太多后端无法处理,可以堆积在MQ中,后端按照一定的顺序处理,秒杀系统就是这么实现的。

再说说RocketMq的特点:

  • 3、亿级消息的堆积能力,单个队列中的百万级消息的累积容量。
  • 4、高可用性:Broker服务器支持多Master多Slave的同步双写以及Master多Slave的异步复制模式,其中同步双写可保证消息不丢失。
  • 5、高可靠性:生产者将消息发送到Broker端有三种方式,同步、异步和单向,其中同步和异步都可以保证消息成功的成功发送。Broker在对于消息刷盘有两种策略:同步刷盘和异步刷盘,其中同步刷盘可以保证消息成功的存储到磁盘中。消费者的消费模式也有集群消费和广播消费两种,默认集群消费,如果集群模式中消费者挂了,一个组里的其他消费者会接替其消费。综上所述,是高可靠的。
  • 6、支持分布式事务消息:这里是采用半消息确认和消息回查机制来保证分布式事务消息的,下面会详细描述。
  • 7、支持消息过滤:建议采用消费者业务端的tag过滤
  • 8、支持顺序消息:消息在Broker中是采用队列的FIFO模式存储的,也就是发送是顺序的,只要保证消费的顺序性即可。
  • 9、支持定时消息和延迟消息:Broker中由定时消息的机制,消息发送到Broker中,不会立即被Consumer消费,会等到一定的时间才被消费。延迟消息也是一样,延迟一定时间之后才会被Consumer消费。

能说出这么多已经不错了😄

RoctetMq的架构

回到最开始的问题,RocketMq的原理是什么,也就是怎么实现的,先看图

RocketMq一共有四个部分组成:NameServer,Broker,Producer生产者,Consumer消费者,每一部分都是集群部署的。

NameServer

NameServer是一个无状态的服务器,角色类似于Dubbo的Zookeeper,但比Zookeeper更轻量。

特点:

  • 每个NameServer结点之间是相互独立,彼此没有任何信息交互。
  • Nameserver被设计成几乎是无状态的,通过部署多个结点来标识自己是一个伪集群,Producer在发送消息前从NameServer中获取Topic的路由信息也就是发往哪个Broker,Consumer也会定时从NameServer获取topic的路由信息,Broker在启动时会向NameServer注册,并定时进行心跳连接,且定时同步维护的Topic到NameServer。

功能主要有两个:

  • 1、跟Broker结点保持长连接。
  • 2、维护Topic的路由信息。

Broker

消息存储和中转角色,负责存储和转发消息。

  • Broker内部维护着一个个Message Queue,用来存储消息的索引,真正存储消息的地方是CommitLog(日志文件)。
  • 单个Broker与所有的Nameserver保持着长连接和心跳,并会定时将Topic信息同步到NameServer,和NameServer的通信底层是通过Netty实现的。

Producer

消息生产者,业务端负责发送消息,由用户自行实现和分布式部署。

Producer的负载均衡

Producer的负载均衡是由MQFaultStratege.selectOneMessageQueue()来实现的。这个方法就是随机选择一个要发送消息的broker来达到负载均衡的效果,选择的标准:尽量不选刚刚选过的broker,尽量不选发送上条消息延迟过高或没有响应的broker,也就是找到一个可用的broker。(源码不贴了)

发送的三种策略

Producer发送消息有三种方式:同步、异步和单向

  • 同步:同步发送是指发送方发出数据后等待接收方发回响应后在发送下一个数据包。一般用于重要的消息通知,如重要的通知邮件或者营销短信等。
  • 异步:异步发送是指发送方发出数据后不等接收方发回响应就发出下一个数据包。一般用于可能链路耗时较长而对响应时间比较敏感的场景。如视频上传后通知启动转码服务。
  • 单向:单向发送是指只负责发送消息而不等待接收方发送响应且没有回调函数,适合那些耗时比较短且对可靠性要求不高的场景,例如日志收集。

Consumer

消息消费者,负责消费消息,由用户自行实现并进行集群部署。

推拉消费模式

  • PULL:拉取型消费者主动从broker中拉取消息消费,只要拉取到消息,就会启动消费过程,称为主动型消费。
  • PUSH:推送型消费者就是要注册消息的监听器,监听器是要用户自行实现的。当消息达到broker服务器后,会触发监听器拉取消息,然后启动消费过程。但是从实际上看还是从broker中拉取消息,称为被动消费型。

集群还是广播

看业务需求,默认是集群消费。

  • 集群消费:broker中的一条消息会发送给订阅这个topic的一个消费组里的唯一一个消费者进行消费。如果这个消费者挂掉了,组里的其他消费者会接替它进行消费。
  • 广播消费:broker中的一条消息会发送给订阅这个topic的一个消费组里的每一个消费者进行消费。

Consumer的负载均衡

  • Consumer的负载均衡是指将MessageQueue中的消息队列分配到消费者组里的具体消费者。
  • Consumer在启动的时候会实例化rebalanceImpl,这个类负责消费端的负载均衡。通过rebalanceImpl调用allocateMesasgeQueueStratage.allocate()完成负载均衡。
  • 每次有新的消费者加入到组中就会重新做一下分配。每10秒自动做一次负载均衡。

RocketMq消息模型(专业术语)

初学者可以了解下。

Message

就是要传输的消息,一个消息必须有一个主题,一条消息也可以有一个可选的Tag(标签)和额外的键值对,可以用来设置一个业务的key,便于开发中在broker服务端查找消息。

Topic

主题,是消息的第一级类型,每条消息都有一个主题,就像信件邮寄的地址一样。主题就是我们具体的业务,比如一个电商系统可以有订单消息,商品消息,采购消息,交易消息等。Topic和生产者和消费者的关系非常松散,生产者和Topic可以是1对多,多对1或者多对多,消费者也是这样。

Tag

标签,是消息的第二级类型,可以作为某一类业务下面的二级业务区分,它的主要用途是在消费端的消息过滤。比如采购消息分为采购创建消息,采购审核消息,采购推送消息,采购入库消息,采购作废消息等,这些消息是同一Topic和不同的Tag,当消费端只需要采购入库消息时就可以用Tag来实现过滤,不是采购入库消息的tag就不处理。

Group

组,可分为ProducerGroup生产者组合ConsumerGroup消费者组,一个组可以订阅多个Topic。一般来说,某一类相同业务的生产者和消费者放在一个组里。

Message Queue

消息队列,一个Topic可以划分成多个消息队列。Topic只是个逻辑上的概念,消息队列是消息的物理管理单位,当发送消息的时候,Broker会轮询包含该Topic的所有消息队列,然后将消息发出去。有了消息队列,可以使得消息的存储可以分布式集群化,具有了水平的扩展能力。

offset

是指消息队列中的offset,可以认为就是下标,消息队列可看做数组。offset是java long型,64位,理论上100年不会溢出,所以可以认为消息队列是一个长度无限的数据结构。

核心问题

顺序消息

  • 如何保证顺序消息?

顺序由producer发送到broker的消息队列是满足FIFO的,所以发送是顺序的,单个queue里的消息是顺序的。多个Queue同时消费是无法绝对保证消息的有序性的。所以,同一个topic,同一个queue,发消息的时候一个线程发送消息,消费的时候一个线程去消费一个queue里的消息。

  • 追问:怎么保证消息发到同一个queue里?

RocketMQ给我们提供了MessageQueueSelector接口,可以重写里面的接口,实现自己的算法,比如判断i%2==0,那就发送消息到queue1否则发送到queue2。

消息过滤

  • 如何实现消息过滤?

有两种方案,一种是在broker端按照Consumer的去重逻辑进行过滤,这样做的好处是避免了无用的消息传输到Consumer端,缺点是加重了Broker的负担,实现起来相对复杂。另一种是在Consumer端过滤,比如按照消息设置的tag去重,这样的好处是实现起来简单,缺点是有大量无用的消息到达了Consumer端只能丢弃不处理。

消息去重

  • 如果由于网络等原因,多条重复消息投递到了Consumer端,你怎么进行消息去重?

这个得先说下消息的幂等性原则:就是用户对于同一种操作发起的多次请求的结果是一样的,不会因为操作了多次就产生不一样的结果。只要保持幂等性,不管来多少条消息,最后处理结果都一样,需要Consumer端自行实现。

去重的方案:因为每个消息都有一个MessageId, 保证每个消息都有一个唯一键,可以是数据库的主键或者唯一约束,也可以是Redis缓存中的键,当消费一条消息前,先检查数据库或缓存中是否存在这个唯一键,如果存在就不再处理这条消息,如果消费成功,要保证这个唯一键插入到去重表中。

分布式事务消息

  • 你知道半消息吗?RocketMQ是怎么实现分布式事务消息的?

半消息:是指暂时还不能被Consumer消费的消息,Producer成功发送到broker端的消息,但是此消息被标记为“暂不可投递”状态,只有等Producer端执行完本地事务后经过二次确认了之后,Consumer才能消费此条消息。

上图就是分布式事务消息的实现过程,依赖半消息,二次确认以及消息回查机制。

  • 1、Producer向broker发送半消息
  • 2、Producer端收到响应,消息发送成功,此时消息是半消息,标记为“不可投递”状态,Consumer消费不了。
  • 3、Producer端执行本地事务。
  • 4、正常情况本地事务执行完成,Producer向Broker发送Commit/Rollback,如果是Commit,Broker端将半消息标记为正常消息,Consumer可以消费,如果是Rollback,Broker丢弃此消息。
  • 5、异常情况,Broker端迟迟等不到二次确认。在一定时间后,会查询所有的半消息,然后到Producer端查询半消息的执行情况。
  • 6、Producer端查询本地事务的状态
  • 7、根据事务的状态提交commit/rollback到broker端。(5,6,7是消息回查)

消息的可用性

  • RocketMQ如何能保证消息的可用性/可靠性?(这个问题的另一种问法:如何保证消息不丢失)

答案如下,要从Producer,Consumer和Broker三个方面来回答。

从Producer角度分析,如何确保消息成功发送到了Broker?

  • 1、可以采用同步发送,即发送一条数据等到接受者返回响应之后再发送下一个数据包。如果返回响应OK,表示消息成功发送到了broker,状态超时或者失败都会触发二次重试。
  • 2、可以采用分布式事务消息的投递方式。
  • 3、如果一条消息发送之后超时,也可以通过查询日志的API,来检查是否在Broker存储成功。

总的来说,Producer还是采用同步发送来保证的。

从Broker角度分析,如何确保消息持久化?

  • 1、消息只要持久化到CommitLog(日志文件)中,即使Broker宕机,未消费的消息也能重新恢复再消费。
  • 2、Broker的刷盘机制:同步刷盘和异步刷盘,不管哪种刷盘都可以保证消息一定存储在pagecache中(内存中),但是同步刷盘更可靠,它是Producer发送消息后等数据持久化到磁盘之后再返回响应给Producer。
  • 3、Broker支持多Master多Slave同步双写和多Master多Slave异步复制模式,消息都是发送给Master主机,但是消费既可以从Master消费,也可以从Slave消费。同步双写模式可以保证即使Master宕机,消息肯定在Slave中有备份,保证了消息不会丢失。

从Consumer角度分析,如何保证消息被成功消费?

  • Consumer自身维护了个持久化的offset(对应Message Queue里的min offset),用来标记已经成功消费且已经成功发回Broker的消息下标。如果Consumer消费失败,它会向Broker发回消费失败的状态,发回成功才会更新自己的offset。如果发回给broker时broker挂掉了,Consumer会定时重试,如果Consumer和Broker一起挂掉了,消息还在Broker端存储着,Consumer端的offset也是持久化的,重启之后继续拉取offset之前的消息进行消费。

刷盘实现

RocketMQ提供了两种刷盘策略:同步刷盘和异步刷盘

  • 同步刷盘:在消息达到Broker的内存之后,必须刷到commitLog日志文件中才算成功,然后返回Producer数据已经发送成功。
  • 异步刷盘:异步刷盘是指消息达到Broker内存后就返回Producer数据已经发送成功,会唤醒一个线程去将数据持久化到CommitLog日志文件中。

优缺点分析:同步刷盘保证了消息不丢失,但是响应时间相对异步刷盘要多出10%左右,适用于对消息可靠性要求比较高的场景。异步刷盘的吞吐量比较高,RT小,但是如果broker断电了内存中的部分数据会丢失,适用于对吞吐量要求比较高的场景。

负载均衡

  • 你说下RocketMQ的负载均衡是如何实现的?

RocketMQ是分布式消息服务,负载均衡时再生产和消费的客户端完成的。

上面讲过,nameServer保存着Topic的路由信息,路由记录了broker集群节点的通讯地址,broker的名称以及读写队列数量等信息。写队列writeQueue表示生产者可以写入的队列数,如果不做配置默认为4,也就是queueId是0,1,2,3.broker收到消息后根据queueId生成消息队列,生产者负载均衡的过程的实质就是选择broker集群和queueId的过程。读队列readQueue表示broker中可以供消费者读取信息的队列个数,默认也是4个,也就是queueId也是0,1,2,3。消费者拿到路由信息后会选择queueId,从对应的broker中读取数据消费。

下面我从生产者负载均衡和消费者负载均衡两个角度来说明:

  • 生产者的负载均衡:实质是在选择MessageQueue对象(内部包含了brokerName和queueId),第一种是默认策略,从MessageQueue列表中随机选择一个,算法时通过自增随机数对列表打下取余得到位置信息,但获得的MessageQueue所在集群不能是上次失败集群。第二种是超时容忍策略,先随机选择一个MessageQueue,如果因为超时等异常发送失败,会优先选择该broker集群下其他MessageQueue发送,如果没找到就从之前发送失败的Broker集群中选一个进行发送,若还没有找到才使用默认策略。
  • 消费者的负载均衡:这里可选的有六种算法。
    1、平均分配算法

2、环形算法

3、指定机房算法

4、就近机房算法

5、统一哈希算法

使用一致性哈希算法进行负载,每次负载都会重建一致性hash路由表,获取本地客户端负责的所有队列信息。默认的hash算法为MD5,假设有4个消费者客户端和2个消息队列mq1和mq2,通过hash后分布在hash环的不同位置,按照一致性hash的顺时针查找原则,mq1被client2消费,mq2被client3消费。


6、手动配置算法

总结

今天主要讲了RocketMq的基本架构以及消息是怎么发送,存储和消费的,要掌握NameServer、Broker、Producer、Consumer各自的职责是什么,在消息这方面都做了什么努力,比如保证高可用、保证顺序、保证分布式、实现过滤、实现去重以及怎么做负载均衡等等。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

MySQL数据库的表中 NULL 和 空值 到底有什么区别呢

发表于 2020-09-18

Mysql 5.7.29

首先我们建立一个表

表的一个字段default空字符串,一个字段default null

接下来我们往数据库插入10条数据,下面的SQL执行10次,那么我们会得到下面的数据

那么,下面这两条语句会走索引吗

1
2
3
csharp复制代码explain select id, configuration, description from t_my_jvm2 where configuration = '2' ;

explain select id, configuration, description from t_my_jvm2 where description = '描述' ;

是否走索引

根据查看执行计划的key列,你会得到这样的答案,上面那条不走索引,下面那条走索引

这很令人吃惊,为什么第一条不走索引呢????

我们接下来修改一下数据。

删除configuration的5条数据,往description添加5条数据

我们再来查一次

发现两列数据,都走索引了

我们再来试验一个东西,两列数据都增加一行数据

再测试一次,发现都不走索引了

小推测

所以我们简单的推测一下,

1
2
3
4
5
shell复制代码# configuration = '2'  当空串''的数量,小于等于40%的时候(10条有4条及以下),不会走索引
# configuration = '2' 当空串''的数量,大于40%的时候(10条有4条以上),走索引

# description = '描述' 当null的数量,小于等于40%的时候(10条有4条及以下),不会走索引
# description = '描述' 当null的数量,大于40%的时候(10条有4条以上),会走索引

测试 is null 和 is not null

我们接下来看一下 is null 和 is not null

我们先把数据恢复

执行下面两条语句

1
2
3
csharp复制代码explain select id, configuration, description from t_my_jvm2 where description is null ;

explain select id, configuration, description from t_my_jvm2 where description is not null ;

结果是,上面的 is null 语句,不走索引,下面的 is not null 语句走索引

我们先测试 is null,我们来修改一下数据

再执行一次,is null,仍然不会走索引

我们再次修改一下数据

再执行一次,is null,发现走索引了

与此同时,我们执行一下,is not null,发现不走索引

我们修改一下数据,

再执行一次,is not null,发现还是不走索引

再修改一下数据

再执行一次,is not null,发现走索引了

总结

1
2
3
4
5
kotlin复制代码# configuration = '2' 当空串''的数量,小于等于40%的时候(10条有4条及以下),不会走索引
# configuration = '2' 当空串''的数量,大于40%的时候(10条有4条以上),会走索引

# is null 当null的数量,小于等于40%的时候(10条有4条及以下),不会走索引
# is null 当null的数量,大于40%的时候(10条有4条以上),会走索引
1
2
3
4
5
kotlin复制代码# is not null 当null的数量,小于等于30%的时候(10条有3条及以下),会走索引
# is not null 当null的数量,大于30%的时候(10条有3条以上),不会走索引

# description = '描述' 当null的数量,小于等于40%的时候(10条有4条及以下),不会走索引
# description = '描述' 当null的数量,大于40%的时候(10条有4条以上),会走索引

最后

1,走过路过记得点赞啊,盆友们

2,掘金的markdown编辑器真难用,比不上typora,连CSDN都比不上,虽然一个网站内容是最重要的,但是写作环境这么差,谁写啊!!!!!!!!!!吐槽!!!!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

SpringBoot主流读取配置文件四种方式(新增更强大读取

发表于 2020-09-18

读取配置SpringBoot配置文件四种方式(新增更强大读取方式)

一、利用Bean注解中的Value(${})注解

1
2
3
4
5
6
java复制代码@Data
@Component
public class ApplicationProperty {
@Value("${application.name}")
private String name;
}

该方式可以自动读取当前配置文件appliation.yml 或者application.properties中的配置值

区别在于读取yml文件时候支持中文编码,peoperties需要转码

二、利用@ConfigurationProperties(prefix = “developer”)注解

1
2
3
4
5
6
7
8
9
java复制代码@Data
@ConfigurationProperties(prefix = "developer")
@Component
public class DeveloperProperty {
private String name;
private String website;
private String qq;
private String phoneNumber;
}

该方式直接将当前加载yml配置文件前缀为developer的属性

读取developer.name…

pom文件中引入依赖

1
2
3
4
5
xml复制代码<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>

三、前两种读取配置的使用方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码//使用方法
private final ApplicationProperty applicationProperty;
private final DeveloperProperty developerProperty;

@Autowired
public PropertyController(ApplicationProperty applicationProperty, DeveloperProperty developerProperty) {
this.applicationProperty = applicationProperty;
this.developerProperty = developerProperty;
}

@GetMapping("/property")
public Dict index() {
System.out.println("name:"+applicationProperty.getName());
System.out.println("version:"+applicationProperty.getVersion());
System.out.println("DevName:"+applicationProperty.getDeveloperName());
}

四、用Hutool的方式读取配置文件(不支持yml格式)

1.用Props的方式读取

1
java复制代码static Props props1 = new Props("application.properties",CharsetUtil.CHARSET_UTF_8);

2.用Setting的方法读取

1
java复制代码static Setting setting = new Setting("application-dev.yml", CharsetUtil.CHARSET_UTF_8,true);

3.将配置文件读取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码public class Constant {

static Props props1 = new Props("application.properties",CharsetUtil.CHARSET_UTF_8);

static Setting setting = new Setting("application-dev.properties", CharsetUtil.CHARSET_UTF_8,true);

public static final String Name ;
public static final String SettingName ;

static {
Name = props.getStr("application.name");
SettingName = setting.getByGroup("name","application");

}
}

4.使用方式

1
java复制代码System.out.println(Constant.DevName+"------"+Constant.DevWebsite);

直接用常量类调用该类属性即可使用

五、用@PropertySource注解的方式读取配置文件(所有格式配置文件都支持)

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码@Configuration
@Component
@PropertySource(value = {"application.yml"})
@Data
public class ApplicationProperty_PropertySource {
@Value("${server.port}")
private String port;

@Value("${spring.profiles.active}")
private String active;

}

直接使用注解@PropertySource读取classPath下任意配置文件,也可以读取非classPath路径下,可以自定义。用法跟上述方法一致。使用spring框架中自带依赖即可使用。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

高并发下的海量数据处理 前言

发表于 2020-09-18

前言

文本已收录至我的GitHub仓库,欢迎Star:https://github.com/bin392328206/six-finger

种一棵树最好的时间是十年前,其次是现在

每篇一句

利剑虽强却斩不断流水,微风虽弱却能平息海浪。

絮叨

移动互联网时代,高并发下的海量的用户数据每天都在产生,基于用户使用数据等这样的分析,都需要依靠数据统计和分析,当数据量小时,数据库方面的优化显得不太重要,一旦数据量越来越大,系统响应会变慢,TPS直线下降,直至服务不可用。

业务背景

今天呢?其实就是想把自己公司题库业务下的架构梳理一下,当然不是什么牛逼的公司,可能说自己接触的东西,不是那么的牛皮,但是好歹自己也总结一下嘛,公司属于一个线上线下的教育公司,那么题库当然是支撑公司许多业务的底层基石了,当然我来了公司也不是那么久,但是目前题库业务也已经重构了2次了。今年2020年是第三次。下面我先来梳理一下存储设计流程吧

  • 第一阶段 2015年到2018年,当时据说公司是给外包公司做的一个题库系统,因为当时的体量不大,研发之类的原因吧,反正就是外包给人家了,第一阶段,当时据说是采用.Net+sqlserver 做的,然后后面系统维护呀,系统反应之类的慢,因为做题记录我们当时采用的是做一个题就存一个数据,可想而知慢慢的数据量就超过500W了,毕竟业务量也是越来越大。
  • 第二阶段,2018年的重构,当时我也还没来公司,把数据迁移到了 hbase上就是做题数据,所以采用的就是hbase+mysql的存储方式,但是我们用的是phoniex这个其实对于写业务的我们来说,在代码层面是没有啥改变的,其实phoniex对于oltp的业务还是蛮好的,比如索引,多表联查还是可以的,但是就是对于olap业务,就感觉力不从心了,很多业务的报表就会导致了事务事件长,导致hbase写入超时,等等问题就出现了,后面的解决方案就是说主从架构,把报表业务查从库,才解决了这个目前,当时数据量的做题数据已经达到了40亿,但是写入和少量的针对索引的查询,还是在100ms 以内,快还是可以的,但是对于oltp业务就有点力不从心的,然后因为当时2018年的重构呢,很多业务是直接抄的第一阶段的业务代码,经过了几年的开发,几年的人员变更,加上文档的齐全,导致维护,和迭代的成本越来越大,所以我们不得不考虑重构这个题库系统(重构系统的最大难处就是数据和业务的让用户无感知的迁移,这个有机会的话我再写文章来阐述)
  • 再我们技术总监和产品总监的敲定下,决定对公司的所有的教务 教学, 教研 ,全自动自适应学习系统的全方面的重构,题库业务当然是首当其冲,当然其实重构之前的业务梳理,然后报表梳理,架构设计,模块设计,数据同步,系统并行,高并发设计,支撑上千校区数万学员同时考试了,等这些我先不阐述,我们来关注对于海量数据存储的方案讨论,然后我们采用的架构就是mysql+es+hbase+clickhuose的组合来应对题库全场景,全方位的业务,首先做题记录存在hbase+clickhuose中,然后试题和试卷就放到mysql+es中。反正听上面的人说,就是目标做成10年内架构不变的打算,哈哈,针对上面的这些业务背景,小六六想和大家来探讨一下,目前市面上主流的对于海量数据的处理方式。

海量数据导致性能慢怎么办

优化现有mysql数据库

  • 数据库设计和表创建时就要考虑性能
    mysql数据库本身高度灵活,造成性能不足,严重依赖开发人员能力。也就是说开发人员能力高,则mysql性能高。这也是很多关系型数据库的通病,所以公司的dba通常工资巨高。
+ 表字段避免null值出现,null值很难查询优化且占用额外的索引空间,推荐默认数字0代替null。
+ 尽量使用INT而非BIGINT,如果非负则加上UNSIGNED(这样数值容量会扩大一倍),当然能使用TINYINT、SMALLINT、MEDIUM\_INT更好。
+ 使用枚举或整数代替字符串类型
+ 尽量使用TIMESTAMP而非DATETIME
+ 单表不要有太多字段,建议在20以内
+ 用整型来存IP
  • 索引
  • 索引并不是越多越好,要根据查询有针对性的创建,考虑在WHERE和ORDER BY命令上涉及的列建立索引,可根据EXPLAIN来查看是否用了索引还是全表扫描
  • 应尽量避免在WHERE子句中对字段进行NULL值判断,否则将导致引擎放弃使用索引而进行全表扫描
  • 值分布很稀少的字段不适合建索引,例如”性别”这种只有两三个值的字段
  • 字符字段只建前缀索引
  • 字符字段最好不要做主键
  • 不用外键,由程序保证约束
  • 尽量不用UNIQUE,由程序保证约束
  • 使用多列索引时主意顺序和查询条件保持一致,同时删除不必要的单列索引
    简言之就是使用合适的数据类型,选择合适的索引

总之:一条就是如果我们的mysql还能优化,那就优化它吧,不行就加缓存,就是简单的方法可以完成80% 和难的方法完成95% 那么我们也可以选择简单的方法,难的放到不得不用的时候再来。

总之,能先进行简单优化则没必要引入负责的解决方案,通常数据库刚表现出压力的时候,大部分原因不是因为业务真的发展到数据库撑不住了,而是很多慢查询导致的;

聊聊分库分表

为什么需要分库分表?

请求数太高:
在高并发情况下,大量请求落入数据库,最终会导致数据库的活跃连接数增加,进而逼近甚至达到数据库可承载活跃连接数的阈值。在业务Service层来看就是,可用数据库连接少甚至无连接可用。接下来面临的就是并发量、吞吐量、连接异常、崩溃、宕机;

  • 数据查询慢:
+ 一、单表或单库数据量过大引起的,具体参考第三条;
+ 二、单库整体并发连接数接近系统阈值,从而导致此请求获取不到连接数或者已经获取但是遇到CPU瓶颈,导致SQL所查询的表就算数据行很少也同样出现查询过慢的现象;
  • 数据量太大:

-一、当一个库的数据存储量太大时,就算每张表的并发数不多,但是因为是海量数据,单库中存在大量的数据表,每张表都有一部分并发请求,导致最终单库的连接数阈值(最大连接数默认100,最大可设为16384,但是一般按硬件和库的业务属性来合理配置,一般在500-1200之间)成为数据库的瓶颈;

+ 二、当一张表数据太多时也导致单表查询速度严重下降,虽然innoDB存储引擎的表允许的最大行数为10亿,但是如果一张表的数据行记录达到上亿级,那么我就算通过索引去查询一条数据,它也需要至少经过上十次到几十次磁盘IO,从而导致单表查询速度直线下降;一般一张表的数据行为1000万左右是最合适的,因为表数据为1000万时建立的索引如果是B+Tree类型的话一般树高在3~5之间,所以查询的速度自然也是很快速的;
  • 单体架构通病:
    单库中遇到问题需要修复时影响了整个库中所有数据,而分库时只需要修复某个库就好了;

其实以上问题都是属于数据库遭遇到了瓶颈,但是只不过根据情况不同分为不同类型的数据库瓶颈,但是最终对于客户端而言就是数据库不可用了或者变慢了。

注意!!!
不要为了分库分表而分库分表!!!
引入SOA架构中的一句话:架构不是一蹶而起的,而是慢慢演进的

至于分库分表的常识性问题,小六六就不在这边阐述了,我觉得每一个后端开发人员至少需要去了解一下它,它的垂直分库,垂直分表,水平分库,水平分表。它的优点(其实就是可以解决单表数据量大的情况下,查询的效率问题,因为mysql 索引的树高在3到4层的时候,查询是好的),然后它的缺点(事务,多表联查,分页,排序)等等,然后要怎么解决这些问题,目前业界也有很多开源框架,mycat,shardingsphere等等等等。其实这也算是一个知识点了,可能小六六后面会出这种文章吧,但是我们今天分库分表只能说是一种海量数据的解决方案

NoSQL/NewSQL

这边说下,例如我们公司使用的hbase,es,redis等都是nosql,他们的特点就是分布式,具有很强的水平拓展能力,

NoSQL/NewSQL作为新生儿,在我们把可靠性当做首要考察对象时,它是无法与RDBMS相提并论的。RDBMS发展几十年,只要有软件的地方,它都是核心存储的首选。

目前绝大部分公司的核心数据都是:以RDBMS存储为主,NoSQL/NewSQL存储为辅!互联网公司又以MySQL为主,国企&银行等不差钱的企业以Oracle/DB2为主。NoSQL/NewSQL宣传的无论多牛逼,就现在各大公司对它的定位,都是RDBMS的补充,而不是取而代之!

最近很火的TiDB就是一种NewSql,小六六公司也没用,但是自己去玩了一把,当然我不是专业的测试人员,对于它的性能我确实不知道到底比其他数据库好多少,但是对于用法来说,我觉得还行,因为你会sql语法,你就会用它,比较简单就能使用。它的TIKV 支持OLTP场景,他的TIflash 支持 OLAP场景,我相信,再5G时代,他会是一款非常好的数据库。

mysql+Nosql方案

目前因为NewSql的普及性,大部分的公司我觉得还是mysql为主+Nosql为辅的方案比较多,大家可以在下面评论留言,讨论一下大家公司的解决方案,

  • mysql(主要数据存储)分库分表+ es(查询 分页 排序)放部分字段+主键

这种方案的话,也是可以的,应该有公司用吧

  • hbase (列式存储 rowkey设计)+es(查询 分页 排序) 部分字段+rowkey
  • solr+hbase(其实这个方案更加成熟,用得估计也更多) 比如说 Lily HBase Indexer。

总结

最后,对几种方案总结如下(分库分表简称为sc):

总之,对于海量数据,且有一定的并发量的分库分表,绝不是引入某一个分库分表中间件就能解决问题,而是一项系统的工程。需要分析整个表相关的业务,让合适的中间件做它最擅长的事情。例如有sharding column的查询走分库分表,一些模糊查询,或者多个不固定条件筛选则走es,海量存储则交给HBase。

做了这么多事情后,后面还会有很多的工作要做,比如数据同步的一致性问题,还有运行一段时间后,某些表的数据量慢慢达到单表瓶颈,这时候还需要做冷数据迁移。总之,分库分表是一项非常复杂的系统工程。任何海量数据的处理,都不是简单的事情,做好战斗的准备吧!

结尾

今天就聊这么多吧,下次有机会和大家聊聊对于高并发的处理呗,可能我们的业务场景是做题,所以处理起来就比订单 商品啥的要简单很多,但是我们也是花心思去设计我们的高并发场景的,还有重构后,连表结构都变了,我们是怎么让用户做到无感知的迁移的,反正重构绝不是说新做一个系统那么简单,路还是很长的,除非你能不要以前的数据,那就很简单了。哈哈,其实我们做的也只能是70分吧,做不到满分,不知道大公司是怎么搞重构的,希望有大佬再评论下留言。

日常求赞

好了各位,以上就是这篇文章的全部内容了,能看到这里的人呀,都是真粉。

创作不易,各位的支持和认可,就是我创作的最大动力,我们下篇文章见

六脉神剑 | 文 【原创】如果本篇博客有任何错误,请批评指教,不胜感激 !

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

OkHttp透明压缩,收获性能10倍,外加故障一枚

发表于 2020-09-16

原创:小姐姐味道(微信公众号ID:xjjdog),欢迎分享,转载请保留出处。

要使用OkHttp,一定要知道它的透明压缩,否则死都不知道怎么死的;或者活也不知道为什么活的不舒坦。

反正不是好事。

什么叫透明压缩呢?OkHttp在发送请求的时候,会自动加入gzip请求头Accept-Encoding:gzip。所以,当返回的数据带有gzip响应头时Content-Encoding=gzip,OkHttp会自动帮我们解压数据。(Accept-Encoding和Content-Encoding是一对请求头,分别对应着请求和返回)

为什么要进行压缩呢?因为它能大幅减少传输的容量。像一些CPU资源占用不高的服务,比如Kafka,我们就可以开启gzip压缩,加快信息的流转。

这个压缩比有多高呢?可以看下下面实实在在的截图,对于普通的xml或者json,数据可以由9MB压缩到350KB左右,压缩比足足达到了26。

它让系统性能飞起来

SpringCloud微服务体系,现在有非常多的公司在用。即使是一些传统企业,一些大数据量的toB企业,也想尝一尝螃蟹。

对于一个简单的SpringBoot服务,我们只需要在yml文件中配置上相应的压缩就可以了。这样,我们就打通了浏览器到Web服务的这一环。这种压缩方式,对于大数据量的服务来说,是救命式的!

具体配置如下。

1
2
3
4
5
6
yaml复制代码server:
port: 8082
compression:
enabled: true
min-response-size: 1024
mime-types: ["text/html","text/xml","application/xml","application/json","application/octet-stream"]

它所对应的Spring配置类是org.springframework.boot.web.server.Compression。

但是不要高兴太早。由于是分布式环境,这里面调用链就会长一些。即使是在内网,动辄十几MB的网络传输,也会耗费可观的时间。

如上图,一个请求从浏览器到达真正的服务节点,可能要经过很多环节。

  • nginx转发请求到微服务网关zuul
  • zuul转发到具体的微服务A
  • 微服务A通过Feign接口调用微服务B

如果我们的数据,大多数是由微服务B提供的,那么上面的任何一个环节传输效率慢,都会影响请求的性能。

所以,我们需要开启Feign接口的gzip压缩。使用OkHttp的透明代理是最简单的方式。

首先,在项目中引入feign的jar包。

1
2
3
4
xml复制代码dependency>
<groupId>io.github.openfeign</groupId>
<artifactId>feign-okhttp</artifactId>
</dependency>

其次,在yml文件中启用OkHttp作为feign的客户端请求工具包。稳妥起见,我们同时屏蔽了httpclient,这个东西太重太老了。

1
2
3
4
5
yaml复制代码feign:
httpclient:
enabled: false
okhttp:
enabled: true

到此为止,我们就可以享受OkHttp的透明代理带来的便捷性了。

假如你的应用数据包大,调用链长,这种方式甚至会给你的服务带来数秒的性能力提升。xjjdog就曾经靠调整几个参数,就让一个蜗牛系统飞了起来。大家惊呼:原来B端也可以C一下。

OkHttp是如何实现透明压缩的?

OkHttp对于透明压缩的处理,是通过拦截器来做的。具体的类,就是okhttp3.internal.http.BridgeInterceptor。

具体代码如下,当判断没有Accept-Encoding头的时候,就自行加入一个。

1
2
3
4
5
6
7
java复制代码// If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
// the transfer stream.
boolean transparentGzip = false;
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
transparentGzip = true;
requestBuilder.header("Accept-Encoding", "gzip");
}

最关键的代码在下面。

1
2
3
4
5
6
7
8
9
10
11
12
bash复制代码if (transparentGzip
&& "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
&& HttpHeaders.hasBody(networkResponse)) {
GzipSource responseBody = new GzipSource(networkResponse.body().source());
Headers strippedHeaders = networkResponse.headers().newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build();
responseBuilder.headers(strippedHeaders);
String contentType = networkResponse.header("Content-Type");
responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
}

可以看到if语句里,有三个条件。

  • 程序没有设置Accept-Encoding,启用了透明压缩
  • 服务端有Content-Encoding头,并启用了gzip压缩
  • 有数据包

只有同时满足这三个条件,OkHttp的透明压缩才会起作用,帮我们自动解压。

它挖的坑有点深

可惜的是,上面的关键代码,只有if,没有else,也就是当其中的任何一个条件不满足,后端的数据包将原封不动的返回。

2、3两个条件是没有什么问题的,原样返回后端数据并没有什么损害,问题就出在第一个条件里。

如果你在代码中,使用了下面的代码:

1
2
3
4
java复制代码Request.Builder builder = chain.request()
.newBuilder()
.addHeader("Accept", "application/json")
.addHeader("Accept-Encoding", "gzip");

也就是手动设置了Accept-Encoding头信息。这很常见,因为这体现了程序员思维的严谨。

正是这种严谨,造成了问题。

假如你的后端应用刚开始是没有开启gzip压缩的,这时候两者相安无事;但如果你的后端应用突然有一天开启了gzip压缩,你的这段代码将全部over。

原因就是,服务端gzip数据包会原样返回,你需要手动处理gzip数据包。

所以,不加是好事,加了反而会坏事,除非你想自己处理gzip数据。

由于OkHttp在Android上应用也非常广泛,如果你不知道这个细节,造成的后果就是灾难性的。客户端更新慢,只能老老实实回退服务端了。

智能的背后,总有些肉眼不可见的细节。就像是xjjdog纯情的背后,总有一份羞涩。只有深入了解,你才会知道它的美。

作者简介:小姐姐味道 (xjjdog),一个不允许程序员走弯路的公众号。聚焦基础架构和Linux。十年架构,日百亿流量,与你探讨高并发世界,给你不一样的味道。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

阿里架构师讲面试:分布式缓存

发表于 2020-09-15

为什么引入缓存

  • 高性能

sql语句查询600ms,缓存查询2ms,查询速度提高300倍。

  • 高并发

mysql这么重的数据库,压根儿设计不是让你玩儿高并发的,虽然也可以玩儿,但是天然支持不好。mysql单机支撑到2000qps也开始容易报警了。所以要是你有个系统,高峰期一秒钟过来的请求有1万,那一个mysql单机绝对会死掉。你这个时候就只能上缓存,把很多数据放缓存,别放mysql。缓存功能简单,说白了就是key-value式操作,单机支撑的并发量轻松一秒几万十几万,支撑高并发so easy。单机承载并发量是mysql单机的几十倍。

redis分布式缓存原理

高性能

  • 纯内存操作
  • 单线程避免上下文切换
  • 数据结构优化
  • NIO线程模型(多路复用)

高并发

Redis 读并发 110000 次 /s, 写并发 81000 次 /s 。

读写分离

如果redis要支撑超过10万+的读并发,那应该怎么做?

单机的redis几乎不太可能说QPS超过10万+,除非一些特殊情况,比如你的机器性能特别好,配置特别高,物理机,维护做的特别好,而且你的整体的操作不是太复杂。

读写分离,一般来说,对缓存,一般都是用来支撑读高并发的,写的请求是比较少的,可能写并发也就一秒钟几千。

大量的请求都是读,假设一秒钟二十万次读,单机redis肯定撑不住这个并发,我们可以采用读写分离的技术方案去解决:

主从架构,即一主带多从的架构。主服务器只负责应对写入并发,然后将数据同步到从服务器,从服务器只负责读取并发。由于follower的数据是master的全映像,因此可以通过follow-slave的横向扩容很容易的增加读并发能力。

数据分片

当缓存的写入并发越来越大,单机master的架构就会成为瓶颈。这时我们仍然可以借鉴分布式DB的思路,通过数据分片来降低master单机的写并发。实现架构:

主从模式组 + 多个组集群。其中每一个组相当于一套主从结构的系统。多个组间存储不同数据分片,共同构成集群。

数据分片架构下需要考虑key的映射,集群的扩容缩容以及服务器状态管理等问题,这些功能目前已经有相应的框架实现,不需要过分侵入业务开发工程师的精力。目前实现数据分片(集群)技术方案大致有三种:

  • 客户端实现数据分片

即客户端自己计算数据的key应该在哪个机器上存储和查找,此方法的好处是降低了服务器集群的复杂度,客户端实现数据分片时,服务器是独立的,服务器之前没有任何关联。多数redis客户端库实现了此功能,也叫sharding,这种方式的缺点是客户端需要实时知道当前集群节点的联系信息,同时,当添加一个新的节点时,客户端要支持动态sharding,多数客户端实现不支持此功能,需要重启redis。另一个弊端是redis的HA需要额外考虑。

  • 服务器实现数据分片

客户端随意与集群中的任何节点通信,服务器端负责计算某个key在哪个机器上。当客户端访问某台机器时,服务器计算对应的key应该存储在哪个机器,然后把结果返回给客户端。客户端再去对应的节点操作key,是一个重定向的过程,此方式是redis3.0正在实现,Redis 3.0的集群同时支持HA功能,某个master节点挂了后,其slave会自动接管。

  • 通过代理服务器实现数据分片

此方式是借助一个代理服务器实现数据分片,客户端直接与proxy联系,proxy计算集群节点信息,并把请求发送到对应的集群节点。降低了客户端的复杂度,需要proxy收集集群节点信息。Twemproxy是twitter开源的,实现这一功能的proxy。这个实现方式在客户端和服务器之间加了一个proxy,但这是在redis 3.0稳定版本出来之前官方推荐的方式。结合redis-sentinel的HA方案,是个不错的组合。

高可用

Redis的几种常见架构方案包括以下几种,需要根据不同的业务实际做选择:

• Redis单副本;

• Redis多副本(主从);

Redis Sentinel(哨兵);

Redis-Sentinel是Redis官方推荐的高可用性(HA)解决方案。实际上这意味着你可以使用Sentinel模式创建一个可以不用人为干预而应对各种故障的Redis部署。

Sentinel(哨兵)的主要功能有以下几点:

  • 监控:Sentinel不断的检查master和slave是否正常的运行。
  • 通知:如果发现某个redis节点运行出现问题,可以通过API通知系统管理员和其他的应用程序。
  • 自动故障转移:能够进行自动切换。当一个master节点不可用时,能够选举出master的多个slave中的一个来作为新的master,其它的slave节点会将它所追随的master的地址改为被提升为master的slave的新地址。
  • 配置提供者:哨兵作为Redis客户端发现的权威来源。客户端连接到哨兵请求当前可靠的master的地址。如果发生故障,哨兵将报告新地址。

sentinel的分布式特性:

很显然,只使用单个sentinel进程来监控redis集群是不可靠的,当sentinel进程宕掉后(sentinel本身也有单点问题,single-point-of-failure)整个集群系统将无法按照预期的方式运行。所以有必要将sentinel集群,这样有几个好处:

  • 即使有一些sentinel进程宕掉了,依然可以进行redis集群的主备切换;
  • 如果只有一个sentinel进程,如果这个进程运行出错,或者是网络堵塞,那么将无法实现redis集群的主备切换(单点问题);
  • 如果有多个sentinel,redis的客户端可以随意地连接任意一个sentinel来获得关于redis集群中的信息。

参考:

www.cnblogs.com/knowledgese…

Redis Cluster

hash slot算法

redis cluster有固定的16384个hash slot,对每个key计算CRC16值,然后对16384取模,可以获取key对应的hash slot

redis cluster中每个master都会持有部分slot,比如有3个master,那么可能每个master持有5000多个hash slot

hash slot让node的增加和移除很简单,增加一个master,就将其他master的hash slot移动部分过去,减少一个master,就将它的hash slot移动到其他master上去。

移动hash slot的成本是非常低的。

Redis Cluster是一个高性能高可用的分布式系统。由多个Redis实例组成的整体,数据按照Slot存储分布在多个Redis实例上,通过Gossip协议来进行节点之间通信。

Redis Cluster功能特点如下:

1)所有的节点相互连接

2)集群消息通信通过集群总线通信,,集群总线端口大小为客户端服务端口+10000,这个10000是固定值

3)节点与节点之间通过二进制协议进行通信

4)客户端和集群节点之间通信和通常一样,通过文本协议进行

5)集群节点不会代理查询

6)数据按照Slot存储分布在多个Redis实例上

7)集群节点挂掉会自动故障转移

8)可以相对平滑扩/缩容节点

  • 节点间的内部通信机制

redis cluster节点间采取gossip协议进行通信。跟集中式不同,不是将集群元数据(节点信息,故障等等)集中存储在某个节点上(比如哨兵节点上),而是互相之间不断通信,保持整个集群所有节点的数据是完整的。

集中式:好处在于,元数据的更新和读取,时效性非常好,一旦元数据出现了变更,立即就更新到集中式的存储中,其他节点读取的时候立即就可以感知到;。不好在于,所有的元数据的跟新压力全部集中在一个地方,可能会导致元数据的存储有压力。

gossip:好处在于,元数据的更新比较分散,不是集中在一个地方,更新请求会陆陆续续,打到所有节点上去更新,有一定的延时,降低了压力;缺点,元数据更新有延时,可能导致集群的一些操作会有一些滞后。

对于gossip通信方式,可能有些同学会考虑网状结构通信的问题。考虑到以下实际情况:一是节点相对较少,二是元数据信息更新频率较低。所以,在这个背景下采用gossip通信方式是合理的。

参考:developer.aliyun.com/article/626…

zhuanlan.zhihu.com/p/95678924

缓存架构业务场景分析

当我们考虑是否采用缓存架构作为技术方案,需要考虑以下几个问题:

  1. 项目的读写操作比例为多少,如果是写多读少,那缓存真的不一定能帮助你,此时不妨考虑数据库分库分表,然后做MySQL的分布式集群,或者简单直接,将硬盘全部替换为SSD(如果你的公司财大气粗),反之,以读为主的项目就比较适合加缓存了。
  2. 项目的访问频率高不高(用户多不多)?如果用户区区几千人或几万人,全然没有必要使用缓存,这点访问量经过网络后几乎不会造成并发,即使偶出现几万的并发,MySQL也是扛得住的,强行使用缓存反而会增加代码复杂度,甚至不容易维护,得不偿失。
  3. 数据是否要求强一致性?如果项目涉及到金钱或者重要数据,且数据频繁发生变化,不允许存在一点差异,那是否使用缓存就要慎重慎重再慎重!因为缓存适用的是对数据一致性不是特别高的项目,如果使用,需要对缓存的设计有很好的方案,非常考验技术功底。

如果最后决定使用缓存方案,则需要根据实际业务场景进行缓存架构设计。是采用单副本架构,还是主从架构或者分片架构?需要根据业务的读写特征来看,一般的:

  • 当业务读写并发量都较小时,单副本架构完全可以满足未来业务发展,就不需要考虑读写分离和数据分片方案给架构带来不必要的复杂性。
  • 当写并发较小,且评估单副本无法承载读并发的场景,可以考虑读写分离的主从架构方案。
  • 如果随着业务的不断发展,写并发逐渐增大,master压力较大,则可以考虑做主从+分片的架构方案。

参考:www.cnblogs.com/wangbaojun/…

常见缓存问题

缓存雪崩

缓存雪崩是指缓存中数据大批量到过期时间(比如缓存服务器宕机,也算做缓存雪崩),而查询数据量巨大,引起数据库压力过大甚至宕机。和缓存击穿不同的是,缓存击穿指并发查同一条数据,缓存雪崩是不同数据都过期了,很多数据都查不到从而查数据库。

热点数据集中失效场景导致缓存雪崩场景

  • 通常的解决办法是对不同的数据使用不同的失效时间,避免热点数据缓存集中失效。例如,我们要缓存user数据,会对每个用户的数据设置不同的缓存过期时间,可以定义一个基础时间,假设10秒,然后加上一个两秒以内的随机数,过期时间为10~12秒,就会避免缓存雪崩。
  • 类似击穿的情形,也可以尝试以下方案。
  • 在第一个请求去查询数据库的时候对他加一个互斥锁,其余的查询请求都会被阻塞住,直到锁被释放,从而保护数据库。但是也是由于它会阻塞其他的线程,此时系统吞吐量会下降。这种方案会导致日常业务并发出现负尖刺的情况,因此需要结合实际的业务去考虑是否要这么做。
  • 定时主动刷新,避免缓存过期场景。

缓存服务不可用场景导致缓存雪崩场景

事前:

  • 使用集群缓存,保证缓存服务的高可用

这种方案就是在发生雪崩前对缓存集群实现高可用,如果是使用 Redis,可以使用 主从+哨兵 ,Redis Cluster 来避免 Redis 全盘崩溃的情况。

事中:

  • ehcache本地缓存 + Hystrix限流&降级,避免MySQL被打死

使用 ehcache 本地缓存的目的也是考虑在 Redis Cluster 完全不可用的时候,ehcache 本地缓存还能够支撑一阵。

使用 Hystrix进行限流 & 降级 ,比如一秒来了5000个请求,我们可以设置假设只能有一秒 2000个请求能通过这个组件,那么其他剩余的 3000 请求就会走限流逻辑。

然后去调用我们自己开发的降级组件(降级),比如设置的一些默认值呀之类的。以此来保护最后的 MySQL 不会被大量的请求给打死。

事后:

  • 开启Redis持久化机制,尽快恢复缓存集群

一旦重启,就能从磁盘上自动加载数据恢复内存中的数据。

缓存击穿

缓存击穿是指缓存中没有但数据库中有的数据(一般是缓存时间到期),这时由于并发用户特别多,同时读缓存没读到数据,又同时去数据库去取数据,引起数据库压力瞬间增大,造成过大压力。解决方案:

  • 设置热点数据永远不过期,比如配置数据,但是可以主动刷新。比如clientQuoteDefinition表配置。
  • 定时刷新。保证汇率不过期。
  • 缓存预热。
  • 加互斥锁。

上面的现象是多个线程同时去查询数据库的这条数据,那么我们可以在第一个查询数据的请求上使用一个 互斥锁来锁住它。

其他的线程走到这一步拿不到锁就等着,等第一个线程查询到了数据,然后做缓存。后面的线程进来发现已经有缓存了,就直接走缓存。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
scss复制代码public String get(key) {
String value = redis.get(key);
if (value == null) { //代表缓存值过期
//设置3min的超时,防止del操作失败的时候,下次缓存过期一直不能load db
if (redis.setnx(key_mutex, 1, 3 * 60) == 1) { //代表设置成功
value = db.get(key);
redis.set(key, value, expire_secs);
redis.del(key_mutex);
} else { //这个时候代表同时候的其他线程已经load db并回设到缓存了,这时候重试获取缓存值即可
sleep(50);
get(key); //重试
}
} else {
return value;
}
}

缓存穿透

缓存穿透是指缓存和数据库中都没有的数据,而用户不断发起请求,如发起为id为“-1”的数据或id为特别大不存在的数据。这时的用户很可能是攻击者,攻击会导致数据库压力过大。解决方案:

  • 接口层增加校验,如用户鉴权校验,id做基础校验,id<=0的直接拦截(业务约束校验)。
  • 设置null值,并设定超时时间。从缓存取不到的数据,在数据库中也没有取到,这时也可以将key-value对写为key-null,缓存有效时间可以设置短点,如30秒(设置太长会导致正常情况也没法使用)。这样可以防止攻击用户反复用同一个id暴力攻击。针对攻击key值重复率过高的场景。
  • bloomfilter,直接校验key值是否存在,针对攻击key值重复率低的场景。

缓存并发竞争

这里的并发指的是多个redis的client同时set key引起的并发问题。实际上,Redis是一种单线程机制的nosql数据库,基于key-value,数据可持久化落盘。由于单线程所以Redis本身并没有锁的概念,多个客户端连接并不存在竞争关系,但是利用jedis等客户端对Redis进行并发访问时会出现问题。

比如:同时有多个子系统去set一个key。这个时候要注意什么呢?举一个例子:

多客户端同时并发写一个key,一个key的值是1,本来按顺序修改为2,3,4,最后是4。但是因为网络原因,顺序变成了4,3,2,最后变成了2。再比如汇率更新场景。

  • 分布式锁+时间戳

这种情况,主要是准备一个分布式锁,大家去抢锁,抢到锁就做set操作。加锁的目的实际上就是把并行读写改成串行读写的方式,从而来避免资源竞争。

当然,分布式锁可以基于很多种方式实现,比如zookeeper、redis等,不管哪种方式实现,基本原理是不变的:用一个状态值表示锁,对锁的占用和释放通过状态值来标识。

时间戳

由于上面举的例子,要求key的操作需要顺序执行,所以需要保存一个时间戳判断set顺序。

1
2
css复制代码系统A key 1 {ValueA 7:00}
系统B key 1 { ValueB 7:05}

假设系统B先抢到锁,将key1设置为{ValueB 7:05}。接下来系统A抢到锁,发现自己的key1的时间戳早于缓存中的时间戳(7:00<7:05),那就不做set操作了。

  • 消息队列

在并发量过大的情况下,可以通过消息中间件进行处理,把并行读写进行串行化。把Redis.set操作放在队列中使其串行化,必须的一个一个执行。这种方式在一些高并发的场景中算是一种通用的解决方案。

参考:juejin.cn/post/684490…

缓存与数据库双写不一致(缓存更新策略)

在读取缓存方面,一般按照下图的流程来进行业务操作。

但是在更新缓存方面,对于更新完数据库,是更新缓存呢,还是删除缓存。又或者是先删除缓存,再更新数据库。考虑到缓存和DB的数据一致性,目前还存在很大的争议。

在这里,我们讨论三种更新策略:

  1. 先更新数据库,再更新缓存
  2. 先删除缓存,再更新数据库
  3. 先更新数据库,再删除缓存

先更新数据库,再更新缓存

这套方案,大家是普遍反对的。为什么呢?

原因一(线程安全角度)

同时有请求A和请求B进行更新操作,那么会出现

(1)线程A更新了数据库

(2)线程B更新了数据库

(3)线程B更新了缓存

(4)线程A更新了缓存

这就出现请求A更新缓存应该比请求B更新缓存早才对,但是因为网络等原因,B却比A更早更新了缓存。这就导致了脏数据,因此不考虑。

原因二(业务场景角度)

如果你是一个写数据库场景比较多,而读数据场景比较少的业务需求,采用这种方案就会导致,数据压根还没读到(由于主从同步延迟的原因),缓存就被更新,导致数据不一致的情况。

实际上,对于某些业务场景,也可以采用这个方案。比如外汇业务下某业务场景的汇率更新问题。对于某汇率数据库使用select for update控制单线程操作,并且数据库采用单体的架构方案。对于汇率缓存更新操作,使用先更新DB,再更新缓存的技术方案完全能够满足需求。从这里我们可以看到:

  • 架构方案是死的,且任何一套方案都有其优点和缺点,但是业务场景和业务需求是多变得,架构师的选择也是灵活的,在做架构方案时,不要过于刻板。
  • 没有任何架构方案能够做到百分百的高可用,最终都需要人工兜底。

接下来讨论的就是争议最大的,先删缓存,再更新数据库。还是先更新数据库,再删缓存的问题。

先删缓存,再更新数据库

该方案会导致不一致的原因是。同时有一个请求A进行更新操作,另一个请求B进行查询操作。那么会出现如下情形:

(1)请求A进行写操作,删除缓存

(2)请求B查询发现缓存不存在

(3)请求B去数据库查询得到旧值

(4)请求B将旧值写入缓存

(5)请求A将新值写入数据库

上述情况就会导致不一致的情形出现。而且,如果不采用给缓存设置过期时间策略,该数据永远都是脏数据。

那么,如何解决呢?采用延时双删策略

伪代码如下

1
2
3
4
5
6
scss复制代码public void write(String key,Object data){
redis.delKey(key);
db.updateData(data);
Thread.sleep(1000);
redis.delKey(key);
}

转化为中文描述就是

(1)先淘汰缓存

(2)再写数据库(这两步和原来一样)

(3)休眠1秒,再次淘汰缓存

这么做,可以将1秒内所造成的缓存脏数据,再次删除。

那么,这个1秒怎么确定的,具体该休眠多久呢?

针对上面的情形,读者应该自行评估自己的项目的读数据业务逻辑的耗时。然后写数据的休眠时间则在读数据业务逻辑的耗时基础上,加几百ms即可。这么做的目的,就是确保读请求结束,写请求可以删除读请求造成的缓存脏数据。

如果你用了mysql的读写分离架构怎么办?

ok,在这种情况下,造成数据不一致的原因如下,还是两个请求,一个请求A进行更新操作,另一个请求B进行查询操作。

(1)请求A进行写操作,删除缓存

(2)请求A将数据写入数据库了,

(3)请求B查询缓存发现,缓存没有值

(4)请求B去从库查询,这时,还没有完成主从同步,因此查询到的是旧值

(5)请求B将旧值写入缓存

(6)数据库完成主从同步,从库变为新值

上述情形,就是数据不一致的原因。还是使用双删延时策略。只是,睡眠时间修改为在主从同步的延时时间基础上,加几百ms。

采用这种同步淘汰策略,吞吐量降低怎么办?

ok,那就将第二次删除作为异步的。自己起一个线程,异步删除。这样,写的请求就不用沉睡一段时间后了,再返回。这么做,加大吞吐量。

第二次删除,如果删除失败怎么办?

这是个非常好的问题,因为第二次删除失败,就会出现如下情形。还是有两个请求,一个请求A进行更新操作,另一个请求B进行查询操作,为了方便,假设是单库:

(1)请求A进行写操作,删除缓存

(2)请求B查询发现缓存不存在

(3)请求B去数据库查询得到旧值

(4)请求B将旧值写入缓存

(5)请求A将新值写入数据库

(6)请求A试图去删除请求B写入对缓存值,结果失败了。

ok,这也就是说。如果第二次删除缓存失败,会再次出现缓存和数据库不一致的问题。

如何解决呢?

采用重试机制和告警监控人工兜底方案。

先更新数据库,再删缓存

老外提出了一个缓存更新套路,名为《Cache-Aside pattern》。其中就指出

  • 失效:应用程序先从cache取数据,没有得到,则从数据库中取数据,成功后,放到缓存中。
  • 命中:应用程序从cache中取数据,取到后返回。
  • 更新:先把数据存到数据库中,成功后,再让缓存失效。

另外,知名社交网站facebook也在论文《Scaling Memcache at Facebook》中提出,他们用的也是先更新数据库,再删缓存的策略。

这种情况不存在并发问题么?

不是的。假设这会有两个请求,一个请求A做查询操作,一个请求B做更新操作,那么会有如下情形产生

(1)缓存刚好失效

(2)请求A查询数据库,得一个旧值

(3)请求B将新值写入数据库

(4)请求B删除缓存

(5)请求A将查到的旧值写入缓存

ok,如果发生上述情况,确实是会发生脏数据。

然而,发生这种情况的概率又有多少呢?

发生上述情况有一个先天性条件,就是步骤(3)的写数据库操作比步骤(2)的读数据库操作耗时更短,才有可能使得步骤(4)先于步骤(5)。可是,大家想想,数据库的读操作的速度远快于写操作的(不然做读写分离干嘛,做读写分离的意义就是因为读操作比较快,耗资源少),因此步骤(3)耗时比步骤(2)更短,这一情形很难出现。

假设,有人非要抬杠,有强迫症,一定要解决怎么办?

如何解决上述并发问题?

首先,给缓存设有效时间是一种方案。其次,采用异步延时删除策略,保证读请求完成以后,再进行删除操作。

参考:juejin.cn/post/684490…

www.cnblogs.com/rjzheng/p/9…

觉得有收获的话帮忙点个赞吧,让有用的知识分享给更多的人

## 欢迎关注掘金号:五点半社

## 欢迎关注微信公众号:五点半社(工薪族的财商启蒙)##

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…779780781…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%