开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

CompletableFuture真香,可以替代CountD

发表于 2021-09-02

原创:小姐姐味道(微信公众号ID:xjjdog),欢迎分享,转载请保留出处。

在对类的命名篇长文中,我们提到了Future和Promise。

Future相当于一个占位符,代表一个操作将来的结果。一般通过get可以直接阻塞得到结果,或者让它异步执行然后通过callback回调结果。

但如果回调中嵌入了回调呢?如果层次很深,就是回调地狱。Java中的CompletableFuture其实就是Promise,用来解决回调地狱问题。Promise是为了让代码变得优美而存在的。

有多优美?这么说吧,一旦你使用了CompletableFuture,就会爱不释手,就像初恋女友一样,天天想着她。

一系列静态方法

从它的源代码中,我们可以看到,CompletableFuture直接提供了几个便捷的静态方法入口。其中有run和supply两组。

image.png

run的参数是Runnable,而supply的参数是Supplier。前者没有返回值,而后者有,否则没有什么两样。

这两组静态函数,都提供了传入自定义线程池的功能。如果你用的不是外置的线程池,那么它就会使用默认的ForkJoin线程池。默认的线程池,大小和用途你是控制不了的,所以还是建议自己传递一个。

典型的代码,写起来是这个样子。

1
2
3
4
java复制代码CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{
return "test";
});
String result = future.join();

拿到CompletableFuture后,你就可以做更多的花样。

这些花样有很多

我们说面说了,CompletableFuture的主要作用,就是让代码写起来好看。配合Java8之后的stream流,可以把整个计算过程抽象成一个流。前面任务的计算结果,可以直接作为后面任务的输入,就像是管道一样。

1
2
3
4
5
6
7
8
9
10
java复制代码thenApply
thenApplyAsync
thenAccept
thenAcceptAsync
thenRun
thenRunAsync
thenCombine
thenCombineAsync
thenCompose
thenComposeAsync

比如,下面代码的执行结果是99,并不因为是异步就打乱代码执行的顺序了。

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(() -> 10)
.thenApplyAsync((e) -> {
try {
Thread.sleep(10000);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
return e * 10;
}).thenApplyAsync(e -> e - 1);

cf.join();
System.out.println(cf.get());

同样的,函数的作用还要看then后面的动词。

  • apply 有入参和返回值,入参为前置任务的输出
  • accept 有入参无返回值,会返回CompletableFuture
  • run 没有入参也没有返回值,同样会返回CompletableFuture
  • combine 形成一个复合的结构,连接两个CompletableFuture,并将它们的2个输出结果,作为combine的输入
  • compose 将嵌套的CompletableFuture平铺开,用来串联两个CompletableFuture

when和handle

上面的函数列表,其实还有很多。比如:

1
java复制代码whenComplete

when的意思,就是任务完成时候的回调。比如我们上面的例子,打算在完成任务后,输出一个done。它也是属于只有入参没有出参的范畴,适合放在最后一步进行观测。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(() -> 10)
.thenApplyAsync((e) -> {
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
return e * 10;
}).thenApplyAsync(e -> e - 1)
.whenComplete((r, e)->{
System.out.println("done");
})
;

cf.join();
System.out.println(cf.get());

handle和exceptionally的作用,和whenComplete是非常像的。

1
2
3
java复制代码public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn);

public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);

CompletableFuture的任务是串联的,如果它的其中某一步骤发生了异常,会影响后续代码的运行的。

exceptionally从名字就可以看出,是专门处理这种情况的。比如,我们强制某个步骤除以0,发生异常,捕获后返回-1,它将能够继续运行。

1
2
3
4
5
6
7
8
9
10
java复制代码CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(() -> 10)
.thenApplyAsync(e->e/0)
.thenApplyAsync(e -> e - 1)
.exceptionally(ex->{
System.out.println(ex);
return -1;
});

cf.join();
System.out.println(cf.get());

handle更加高级一些,因为它除了一个异常参数,还有一个正常的入参。处理方法也都类似,不再赘述。

当然,CompletableFuture的函数不仅仅这些,还有更多,根据函数名称很容易能够了解到它的作用。它还可以替换复杂的CountDownLatch,这要涉及到几个比较难搞的函数。

替代CountDownLatch

考虑下面一个场景。某一个业务接口,需要处理几百个请求,请求之后再把这些结果给汇总起来。

如果顺序执行的话,假设每个接口耗时100ms,那么100个接口,耗时就需要10秒。假如我们并行去获取的话,那么效率就会提高。

使用CountDownLatch可以解决。

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码ExecutorService executor = Executors.newFixedThreadPool(5);

CountDownLatch countDown = new CountDownLatch(requests.size());
for(Request request:requests){
executor.execute(()->{
try{
//some opts
}finally{
countDown.countDown();
}
});
}
countDown.await(200,TimeUnit.MILLISECONDS);

我们使用CompletableFuture来替换它。

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码ExecutorService executor = Executors.newFixedThreadPool(5);

List<CompletableFuture<Result>> futureList = requests
.stream()
.map(request->
CompletableFuture.supplyAsync(e->{
//some opts
},executor))
.collect(Collectors.toList());

CompletableFuture<Void> allCF = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]));

allCF.join();

我们这里用到了一个主要的函数,那就是allOf,用来把所有的CompletableFuture组合在一起;类似的还有anyOf,表示只运行其中一个。常用的,还有三个函数:

  • thenAcceptBoth 处理两个任务的情况,有两个任务结果入参,无返回值
  • thenCombine 处理两个任务的情况,有入参有返回值,最喜欢
  • runAfterBoth 处理两个任务的情况,无入参,无返回值

End

自从认识了CompletableFuture,我已经很少硬编码Future了。相对于各种回调的嵌套,CompletableFuture为我们提供了更直观、更优美的API。在“多个任务等待完成状态”这个应用场景,CompletableFuture已经成了我的首选。

唯一的问题是,它的函数有点多,你需要熟悉一小段时间。另外,有一个小小的问题,个人觉得,这个类如果叫做Promise的话,就能够和JS的统一起来,算是锦上添花吧。

作者简介:小姐姐味道 (xjjdog),一个不允许程序员走弯路的公众号。聚焦基础架构和Linux。十年架构,日百亿流量,与你探讨高并发世界,给你不一样的味道。我的个人微信xjjdog0,欢迎添加好友,进一步交流。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Dubbo动态调用、自动注册、引用dubbo服务,Dubbo

发表于 2021-09-01

利用注册bean过滤接口获取注册bean和bean的属性动态注册或者引用dubbo服务

启动类配置@ComponentScan(excludeFilters = {@ComponentScan.Filter(type = FilterType.CUSTOM, classes = {BeanFilter.class})})

1
2
3
4
5
6
7
8
less复制代码@ComponentScan(excludeFilters = {@ComponentScan.Filter(type = FilterType.CUSTOM, classes = {BeanFilter.class})})
@SpringBootApplication
public class DubboDemoApplication {

public static void main(String[] args) {
SpringApplication.run(DubboDemoApplication.class,args);
}
}

实现TypeFilter接口,默认取实现的第一个接口(可做修改),如果Service注解没有设置value值,spring注册的bean是实现类的首字母小写,获取bean的时候注意

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
ini复制代码public class BeanFilter  implements TypeFilter{

@Override
public boolean match(MetadataReader reader, MetadataReaderFactory factory) throws IOException {
AnnotationMetadata annotationMetadata = reader.getAnnotationMetadata();
//扫描Spring的service注解
Map<String, Object> annotationAttributes = annotationMetadata.getAnnotationAttributes(Service.class.getName());
AnnotationAttributes annoAttrs = AnnotationAttributes.fromMap(annotationAttributes);
if(ObjectUtil.isNotNull(annoAttrs)){
String[] interfaceNames = reader.getAnnotationMetadata().getInterfaceNames();
//注册dubbo服务
if(interfaceNames.length > 0){
//默认注册第一个实现接口
providerConfig(interfaceNames[0],annoAttrs.getString("value"));
}
//引用dubbo服务
String className = annotationMetadata.getClassName();
Field[] declaredFields = Class.forName(className).getDeclaredFields();
for (Field field : declaredFields) {
//获取属性上面所有注解
Annotation[] annotations = field.getAnnotations();
for (int i = 0; i < annotations.length; i++) {
Annotation annotation = annotations[i];
String beanName = "";
if(annotation instanceof Resource || annotation instanceof Autowired){
String name = field.getType().getName();
//过滤不需要引用的bean
if(name.contains(".mapper.") ){
continue;
}

Autowired annotation2 = field.getAnnotation(Autowired.class);
if(ObjectUtil.isNotNull(annotation2)){
if(!annotation2.required()){
continue;
}
}
if(StrUtil.isBlank(beanName)){
String[] splits = field.getType().getName().split("\.");
beanName = splits[splits.length-1];
beanName = lowerFirstCase(beanName);
}
//过滤重复的bean
if(!callBeans.containsKey(beanName)){
callBeans.put(beanName,Class.forName(field.getType().getName()));
referenceConfig(field.getType().getName(),beanName);
}
}
}
}
}
return false;
}

注册dubbo服务方法,dubbo注册参参数group、version等可从配置文件中读取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
ini复制代码private void providerConfig(String ClassName, String beanName) {

ServiceConfig<Object> serviceConfig = new ServiceConfig<>();
serviceConfig.setInterface(Class.forName(beanName));
serviceConfig.setRef(beanFactory.getBean(beanName));
serviceConfig.setVersion("2.1");
serviceConfig.setGroup("test");
serviceConfig.setTimeout(50000);

ApplicationConfig applicationConfig = new ApplicationConfig();
applicationConfig.setName("applicationName");
serviceConfig.setApplication(applicationConfig);

RegistryConfig registryConfig = new RegistryConfig();
registryConfig.setProtocol("zookeeper");
registryConfig.setAddress("127.0.0.1:2181");
serviceConfig.setRegistry(registryConfig);

ProtocolConfig protocolConfig = new ProtocolConfig();
protocolConfig.setName("dubbo");
protocolConfig.setPort(-1);
protocolConfig.setHost("0.0.0.0");
serviceConfig.setProtocol(protocolConfig);

//注册服务
serviceConfig.export();
}

引用dubbo服务方法包含dubbo动态调用。group、version等也可从配置文件中读取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
ini复制代码private void referenceConfig(String className,String beanName){
//spring和redis的jar包不适用dubbo调用相关的bean
if(className.contains("org.springframework") || className.contains("org.redisson.api")){
return;
}
//自己服务不使用dubbo
if(className.contains("com.dubbo.demo")){
return;
}
ReferenceConfig<Object> referenceConfig = new ReferenceConfig<>();
referenceConfig.setInterface(Class.forName(className));
// 负载均衡策略,可选值:random,roundrobin,leastactive,分别表示:随机,轮询,最少活跃调用
referenceConfig.setLoadbalance("random");
referenceConfig.setId(beanName);
referenceConfig.setVersion("*");
referenceConfig.setGroup("test");
referenceConfig.setTimeout(5000);
referenceConfig.setCheck(false);

ConsumerConfig consumerConfig = new ConsumerConfig();
consumerConfig.setCheck(false);
referenceConfig.setConsumer(consumerConfig);

ApplicationConfig applicationConfig = new ApplicationConfig();
applicationConfig.setName("applicationName");
referenceConfig.setApplication(applicationConfig);

RegistryConfig registryConfig = new RegistryConfig();
registryConfig.setProtocol("zookeeper");
registryConfig.setAddress("addr");
referenceConfig.setRegistry(registryConfig);

//过滤指定路径的包动态调用代理一个类出来使用
if(className.contains("com.dubbo.api")){
Class<?> aClass = Class.forName(className);
Object o = Proxy.getProxy(aClass).newInstance(new DubboHandler(referenceConfig));
beanFactory.registerSingleton(beanName,o);
return;
}

Object o = referenceConfig.get();
beanFactory.registerSingleton(beanName,o);

}

dubbo动态调用,参数可从数据库中获取,记得加缓存,每次从数据库中获取会影响性能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
ini复制代码public class DubboHandler implements InvocationHandler {

private ReferenceConfig referenceConfig;


public DubboHandler(){

}

public DubboHandler(ReferenceConfig referenceConfig){
this.referenceConfig = referenceConfig;
}

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
DubboConfigService dubboConfigService = ContextUtils.getApplicationContext().getBean(DubboConfigService.class);
DubboConfig dubboConfig = dubboConfigService.getDubboConfig();
RegistryConfig registryConfig = new RegistryConfig();
referenceConfig.setProtocol("zookeeper");
registryConfig.setAddress(dubboConfig.getAddr());
referenceConfig.setGroup(dubboConfig.getGroup());
referenceConfig.setVersion(dubboConfig.getVersion());
referenceConfig.setRegistry(registryConfig);
ReferenceConfig newReferenceConfig = new ReferenceConfig();
BeanUtils.copyProperties(referenceConfig ,newReferenceConfig);
ReferenceConfigCache cache = ReferenceConfigCache.getCache();
Object o = cache.get(newReferenceConfig);
return method.invoke(o, args);
}

}

如果只是注册服务获取引用服务使用其中一个方法就行了。
第一次写文章,有不对的地方或者写得不好的地方大家多多指点,共同进步。。。。。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

websocket+netty实时视频弹幕交互功能(Java

发表于 2021-09-01

2021年了,还有不支持弹幕的视频网站吗,现在各种弹幕玩法层出不穷,抽奖,ppt都上弹幕玩法了,不整个弹幕都说不过去了,今天笔者就抽空做了一个实时视频弹幕交互功能的实现,不得不说这样的形式为看视频看直播,讲义PPT,抽奖等形式增加了许多乐趣。

1 技术选型

1.1 netty
官方对于netty的描述:

netty.io/

主要关键词描述:netty是异步事件驱动网络框架,可做各种协议服务端,并且支持了FTP,SMTP,HTTP等很多协议,并且性能,稳定性,灵活性都很棒。
在这里插入图片描述
可以看到netty整体架构上分了三个部分:

  • 以零拷贝,一致性接口,扩展事件模型的底层核心。
  • Socket,Datagram,Pipe,Http Tunnel作为传输媒介。
  • 传输支持的各种协议,HTTP&WebSocket,SSL,大文件,zlib/gzip压缩,文本,二进制,Google Protobuf等各种各种的传输形式。

1.2 WebSocket
WebSocket是一种在单个TCP连接上进行全双工通信的协议。WebSocket通信协议于2011年被IETF定为标准RFC 6455,并由RFC7936补充规范。WebSocket API也被W3C定为标准。

WebSocket使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。

1.3 为什么做这样的技术选型。
由上述可知,实时直播交互作为互动式是一个双向数据传输过程。所以使用webSocket。
netty本身支持了webSocket协议的实现,让实现更加简单方便。

2 实现思路

2.1 服务架构
整体架构是所有客户端都和我的服务端开启一个双向通道的架构。
在这里插入图片描述
2.2 传输流程
在这里插入图片描述

3 实现效果

3.1 视频展示
先看看效果吧,是不是perfect,接下来就来看具体代码是怎么实现的吧。
在这里插入图片描述
图片视频直播弹幕示例

4 代码实现

4.1 项目结构
一个maven项目,将代码放一个包下就行。
在这里插入图片描述
4.2 Java服务端
Java服务端代码,总共三个类,Server,Initailizer和 Handler。

4.2.1 先做一个netty nio的服务端:
一个nio的服务,开启一个tcp端口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
markdown复制代码import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
* Copyright(c)lbhbinhao@163.com
* @author liubinhao
* @date 2021/1/14
* ++++ ______ ______ ______
* +++/ /| / /| / /|
* +/_____/ | /_____/ | /_____/ |
* | | | | | | | | |
* | | | | | |________| | |
* | | | | | / | | |
* | | | | |/___________| | |
* | | |___________________ | |____________| | |
* | | / / | | | | | | |
* | |/ _________________/ / | | / | | /
* |_________________________|/b |_____|/ |_____|/
*/
public enum BulletChatServer {
/**
* Server instance
*/
SERVER;

private BulletChatServer(){
EventLoopGroup mainGroup = new NioEventLoopGroup();
EventLoopGroup subGroup = new NioEventLoopGroup();
ServerBootstrap server = new ServerBootstrap();
server.group(mainGroup,subGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new BulletChatInitializer());
ChannelFuture future = server.bind(9123);
}

public static void main(String[] args) {

}

}

4.2.2 服务端的具体处理逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
markdown复制代码import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;

/**
* Copyright(c)lbhbinhao@163.com
*
* @author liubinhao
* @date 2021/1/14
* ++++ ______ ______ ______
* +++/ /| / /| / /|
* +/_____/ | /_____/ | /_____/ |
* | | | | | | | | |
* | | | | | |________| | |
* | | | | | / | | |
* | | | | |/___________| | |
* | | |___________________ | |____________| | |
* | | / / | | | | | | |
* | |/ _________________/ / | | / | | /
* |_________________________|/b |_____|/ |_____|/
*/

public class BulletChatInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new ChunkedWriteHandler());
pipeline.addLast(new HttpObjectAggregator(1024*64));
pipeline.addLast(new IdleStateHandler(8, 10, 12));
pipeline.addLast(new WebSocketServerProtocolHandler("/lbh"));
pipeline.addLast(new BulletChatHandler());
}
}

后台处理逻辑,接受到消息,写出到所有的客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
java复制代码import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

/**
* Copyright(c)lbhbinhao@163.com
*
* @author liubinhao
* @date 2021/1/14
* ++++ ______ ______ ______
* +++/ /| / /| / /|
* +/_____/ | /_____/ | /_____/ |
* | | | | | | | | |
* | | | | | |________| | |
* | | | | | / | | |
* | | | | |/___________| | |
* | | |___________________ | |____________| | |
* | | / / | | | | | | |
* | |/ _________________/ / | | / | | /
* |_________________________|/b |_____|/ |_____|/
*/

public class BulletChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
// 用于记录和管理所有客户端的channel
public static ChannelGroup channels =
new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
// 获取客户端传输过来的消息
String content = msg.text();
System.err.println("收到消息:"+ content);
channels.writeAndFlush(new TextWebSocketFrame(content));
System.err.println("写出消息完成:"+content);
}

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
channels.add(ctx.channel());
}

@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {

String channelId = ctx.channel().id().asShortText();
System.out.println("客户端被移除,channelId为:" + channelId);
channels.remove(ctx.channel());
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
// 发生异常之后关闭连接(关闭channel),随后从ChannelGroup中移除
ctx.channel().close();
channels.remove(ctx.channel());
}

}

4.3 网页客户端实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
xml复制代码<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<title>Netty视频弹幕实现 Author:Binhao Liu</title>
<link rel="stylesheet" href="">
<style type="text/css" media="screen">
* {
margin: 0px;
padding: 0px
}

html, body {
height: 100%
}

body {
overflow: hidden;
background-color: #FFF;
text-align: center;
}

.flex-column {
display: flex;
flex-direction: column;
justify-content: space-between;, align-items: center;
}

.flex-row {
display: flex;
flex-direction: row;
justify-content: center;
align-items: center;
}

.wrap {
overflow: hidden;
width: 70%;
height: 600px;
margin: 100px auto;
padding: 20px;
background-color: transparent;
box-shadow: 0 0 9px #222;
border-radius: 20px;
}

.wrap .box {
position: relative;
width: 100%;
height: 90%;
background-color: #000000;
border-radius: 10px
}

.wrap .box span {
position: absolute;
top: 10px;
left: 20px;
display: block;
padding: 10px;
color: #336688
}

.wrap .send {
display: flex;
width: 100%;
height: 10%;
background-color: #000000;
border-radius: 8px
}

.wrap .send input {
width: 40%;
height: 60%;
border: 0;
outline: 0;
border-radius: 5px 0px 0px 5px;
box-shadow: 0px 0px 5px #d9d9d9;
text-indent: 1em
}

.wrap .send .send-btn {
width: 100px;
height: 60%;
background-color: #fe943b;
color: #FFF;
text-align: center;
border-radius: 0px 5px 5px 0px;
line-height: 30px;
cursor: pointer;
}

.wrap .send .send-btn:hover {
background-color: #4cacdc
}
</style>
</head>
<script>
var ws = new WebSocket("ws://localhost:9123/lbh");

ws.onopen = function () {
// Web Socket 已连接上,使用 send() 方法发送数据
alert("数据发送中...");
};
ws.onmessage = function (e) {
console.log("接受到消息:"+e.data);
createEle(e.data);
};
ws.onclose = function () {
// 关闭 websocket
alert("连接已关闭...");
};
function sendMsg(msg) {
ws.send(msg)
}


</script>
<body>
<div class="wrap flex-column">
<div class="box">
<video src="shape.mp4" width="100%" height="100%" controls autoplay></video>
</div>
<div class="send flex-row">

<input type="text" class="con" placeholder="弹幕发送[]~(^v^)~*"/>

<div class="send-btn" onclick="javascript:sendMsg(document.querySelector('.con').value)">发送</div>
</div>
</div>
<script src="https://apps.bdimg.com/libs/jquery/2.1.4/jquery.min.js" type="text/javascript"></script>
<script>
//1.获取元素
var oBox = document.querySelector('.box'); //获取.box元素
var cW = oBox.offsetWidth; //获取box的宽度
var cH = oBox.offsetHeight; //获取box的高度
function createEle(txt) {
//动态生成span标签
var oMessage = document.createElement('span'); //创建标签
oMessage.innerHTML = txt; //接收参数txt并且生成替换内容
oMessage.style.left = cW + 'px'; //初始化生成位置x
oBox.appendChild(oMessage); //把标签塞到oBox里面
roll.call(oMessage, {
//call改变函数内部this的指向
timing: ['linear', 'ease-out'][~~(Math.random() * 2)],
color: '#' + (~~(Math.random() * (1 << 24))).toString(16),
top: random(0, cH),
fontSize: random(16, 32)
});
}

function roll(opt) {
//弹幕滚动
//如果对象中不存在timing 初始化
opt.timing = opt.timing || 'linear';
opt.color = opt.color || '#fff';
opt.top = opt.top || 0;
opt.fontSize = opt.fontSize || 16;
this._left = parseInt(this.offsetLeft); //获取当前left的值
this.style.color = opt.color; //初始化颜色
this.style.top = opt.top + 'px';
this.style.fontSize = opt.fontSize + 'px';
this.timer = setInterval(function () {
if (this._left <= 100) {
clearInterval(this.timer); //终止定时器
this.parentNode.removeChild(this);
return; //终止函数
}
switch (opt.timing) {
case 'linear': //如果匀速
this._left += -2;
break;
case 'ease-out': //
this._left += (0 - this._left) * .01;
break;
}
this.style.left = this._left + 'px';
}.bind(this), 1000 / 60);
}

function random(start, end) {
//随机数封装
return start + ~~(Math.random() * (end - start));
}

var aLi = document.querySelectorAll('li'); //10

function forEach(ele, cb) {
for (var i = 0, len = aLi.length; i < len; i++) {
cb && cb(ele[i], i);
}
}

forEach(aLi, function (ele, i) {
ele.style.left = i * 100 + 'px';
});
//产生闭包
var obj = {
num: 1,
add: function () {
this.num++; //obj.num = 2;
(function () {
console.log(this.num);
})
}
};
obj.add();//window

</script>
</body>
</html>

这样一个实时的视频弹幕功能就完成啦,是不是很简单,各位小伙伴快来试试吧。

5 小结

上班撸代码,下班继续撸代码写博客,这个还是很简单,笔者写这个的时候一会儿就写完了,不过这也得益于笔者很久以前就写过netty的服务,对于Http,Tcp之类协议也比较熟悉,只有前端会有些难度,问下度娘,也很快能做完,在此分享出来与诸君分享。

来源:binhao.blog.csdn.net/article/details/112631642

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

基于apache flink的流处理前三章分享 第1章 状态

发表于 2021-09-01

第1章 状态化流处理概述

传统数据处理

绝大多数企业所实现的传统架构都会将数据处理分为两类:

  • 事务型处理
  • 分析型处理

事务型处理

企业在日常业务运营过程中会用到各类应用,例如:客户管理管理软件、基于Web的应用等,这些应用系统通常都会设置独立的数据处理层(应用程序本身)和数据存储层(事务型数据库系统)。

这些应用通常会连接外部服务或实际用户,并持续处理诸如订单、邮件、网站点击等传入的数据。期间每处理一条事件,应用都会通过执行远程数据库系统的事务来读取或者更新状态,多个应用会共享同一个数据库系统,有时候还会访问相同的数据库或表。

在这里插入图片描述

分析型处理

存储于不同事务型数据库系统中的数据,可以为企业提供业务运营相关的分析见解。然而用于存储事务性数据的多个数据库系统通常都是相互隔离的,对于分析类查询,我们通常不会直接在事务型数据库上进行,而是将数据复制到一个撰文用来处理分析类查询的数据仓库为了填充数据仓库,需要将事务型数据库系统中数据拷贝过去。这个向数据仓库拷贝数据的过程被称为提取-转换-加载(Extract-Transform-Load,ETL)。

ETL的基本流程是:

  1. 从事务型数据库中提取数据
  2. 将其转换为通用表示形式(可能包含数据验证、数据归一化、编码、去重、表模式转换等工作)
  3. 加载到数据分析数据库中

为了保持数据仓库中的数据同步,ETL过程需要周期性的执行
1. 在这里插入图片描述

状态化流处理

几乎所有的数据都是以连续事件流的形式产生的。事实上,现实世界中很难找到那种瞬间就生成完整数据集的例子。

任何一个处理事件流的应用,如果要支持跨多条记录的转换操作,都必须是有状态的,即能够存储和访问中间结果。应用收到事件后可以执行包括读写状态在内的任意计算。原则上,需要在应用中访问的状态有多种可选的存储位置,例如:程序变量、本地文件、嵌入式或外部数据库等。

在这里插入图片描述

有状态的流处理应用通常分为三类:

  1. 事件驱动型应用,通过接受事件流触发特定应用业务逻辑的有状态的流式应用,如实时推荐、异常检测等
  2. 数据管道型应用,以低延迟的方式从不同的外部系统获取、转换并插入数据,并在段时间内处理大批量数据的应用,提供多样化的数据源、数据汇连接器。Flink可以做到上述一切。
  3. 数据分析型应用,主要有周期性的批处理和持续性的流处理两类应用。

Flink快览

Apache Flink是一个集众多具有竞争力的特性于一身的第三代流处理引擎。它支持精确的流处理,能同事满足各种规模下对高吞吐和低延迟的要求,尤其是以下功能使其能在同类系统中脱颖而出:

  • 同时支持事件时间和处理时间语义
  • 提供精确一次的状态一致性保障
  • 在每秒处理数百万条事件的同时保持毫秒级延迟
  • 层次化的API
  • 常见的存储系统的连接器
  • 支持高可用配置
  • 允许在不丢失应用状态的前提下更新作业代码,或进行跨Flink集群的作业迁移
  • 提供详细、可定制的系统及应用指标(metrics)集合,用于提前定位和响应问题
  • Flink同时也是一个成熟的批处理引擎(批是流的特例,即有界流)

第2章 流处理基础

Dataflow编程概览

Dataflow图

Dataflow程序描述了数据如何在不同的操作之间流动。Dataflow程序通常表示为有向图。图中顶点称为算子(逻辑Dataflow图称为算子,物理Dataflow图称为任务),表示计算;而边表示数据依赖关系。算子是Dataflow程序的基本功能单元,他们从输入获取数据,对其进行计算,然后产生数据并发往输出供后续处理。没有输入端的算子成为数据源,没有输出端的算子成为数据汇。一个Dataflow图至少有一个数据源和一个数据汇。

数据并行和任务并行

数据并行:将输入数据分组,让同一操作的多个任务并行执行在不同数据子集上。将计算负载分配到多个节点上从而允许处理大规模的数据

任务并行:让不同算子的任务(基于相同或不通的数据)并行计算,可以更好的利用集群的计算资源

数据交换

数据交换策略定义了如何将数据项分配给物理Dataflow图中的不同任务。常见有如下四种数据交换策略:

  • 转发策略:在发送端任务和接收端任务之间一对一的进行数据传输。如果两端的任务运行在同一物理机器上,可以避免网络通信
  • 广播策略:把每个数据项发往下游算子的全部任务
  • 基于键值的策略:根据魔衣键值属性对数据分区,并保证键值相同的数据项会交由同一任务处理
  • 随机策略:将数据均匀分配至算子的所有任务,以实现计算任务的负载均衡

并行流处理

数据流定义:一个可能无限的事件序列

延迟和吞吐

延迟:处理一个事件所需的时间。本质上,它是从接收事件到在输出中观察到事件处理效果的时间间隔。

吞吐:用来衡量系统处理能力(处理速率)的指标,它告诉我们系统每单位时间可以处理多少事件。如果系统持续以力不能及的高速率接收数据,那么缓冲区可能会用尽,继而导致数据丢失,这种情形同城称为被压。

延迟和吞吐并非相互独立的指标。如果事件在数据处理管道中传输时间太久,我们将难以保证高吞吐;同样,如果系统性能不足,事件很容易堆积缓冲,必须等待一段时间才能处理。

数据流上的操作

流处理引擎通常会提供一系列内置操作来实现数据流的获取、转换,以及输出。这些算子可以组合生成Dataflow处理图,从而时间流式应用所需的逻辑。常见有如下流式操作:

数据接入和数据输出

数据接入和数据输出操作允许流处理引擎和外部系统通信。

数据接入操作是从外部数据源获取原始数据并将其转换成合适后续处理的格式,该类算子称为数据源。

数据输出操作是将数据以合适外部系统使用的格式输出,该类算子称为数据汇。

转换操作

转换操作是一类”只过一次“的操作,它们会分别处理每个事件,对其应用某些转换并产生一条心的输出流。

滚动聚合

滚动聚合(如求和、求最值)会根据每个到来的事件持续更新结果。聚合操作都是有状态的,它们通过将新到来的事件合并到已有状态来生成更新后的聚合值。

窗口操作

有些操作必须收集并缓冲记录才能计算结果,例如流式join或像是求中位数的整体聚合。为了在无限数据流上高效的执行这些操作,必须对操作的数据加以限制。窗口操作会持续创建一些称为“桶”的有限事件合集,并允许我们基于这些有限集进行计算。

常见有如下几种窗口类型:

滚动窗口:将事件分配到长度固定且互不重叠的桶中。在窗口边界通过后,所有事件会发送给计算函数处理。可分为基于数量的滚动窗口和基于时间的滚动窗口。

滑动窗口:将事件分配到大小固定且允许重叠的桶中,这意味着每个事件可能会同时属于多个桶。我们指定长度和滑动间隔连定义滑动窗口。

会话窗口:将属于同一会话的事件分配到相同桶中。会话窗口根据会话间隔将事件分为不同的会话,该间隔值定义了会话在关闭前的非活动事件长度。

时间语义

处理时间

处理时间是当前流处理算子所在机器上的本地时钟时间。

时间事件

事件时间是数据流中事件实际发生的时间,它以附加在数据流中事件的时间戳为依据。这些时间戳通常在事件数据进入流处理管道之前就存在。

事件时间将处理速度和结果内容彻底解耦。基于事件时间的操作是可预测的,其结果具有确定性。无论数据流的处理速度如何、事件到达算子的顺序怎样,基于事件时间的窗口都会生成同样的结果。

使用事件时间要克服的挑战之一是如何处理延迟事件。普遍存在的无序问题也可以借此解决。

水位线

水位线是一个全局进度指标,表示我们确信不会再有延迟事件到来的某个时间点。本质上,水位线提供了一个逻辑时钟,用来通知系统当前的事件时间。当一个算子收到事件为T的水位线,就可以认为不会再收到任何时间戳小于或等于T的事件了。

状态和一致性模型

状态在数据处理中无处不在,任何一个稍复杂的计算都要用它。不难想象,支持有状态算子将面临很多实现上的挑战:

  1. 状态管理:系统需要高效的管理状态并保证它们不受并发更新影响
  2. 状态划分:把状态按照键值划分,并独立管理每一部分
  3. 状态恢复:最后一个也是最大的挑战在于,有状态算子需要保证状态可以恢复,并且即使出现故障也要确保结果正确。

结果保障

结果保障指的是流处理引擎内部状态的一致性。结果保障可分为如下几种:

  • 至多一次:保证每个事件至多被处理一次,在故障时既不恢复丢失的状态,也不重放丢失的事件
  • 至少一次:所有的事件都会被处理,但有些可能会被处理多次。为了确保至少一次语义,需要从源头或者缓冲区中重放事件。
  • 精确一次:既不丢失事件,也不重复处理事件。

第3章 Apache Flink架构

系统架构

Flink是一个用于状态化并行流处理的分布式系统。Flink在已有集群基础设施和服务至上专注于它的核心功能——分布式数据流处理。Flink和很多集群管理器(如Apache Mesos、YARN及Kubernets)都能很好的集成;同时它也可以通过配置,作为独立的集群来运行。Flink没有提供分布式持久化存储,而是利用了现有的分布式文件系统(如HDFS)或对象存储(如S3)。它依赖Apache Zookeeper来完成高可用性设置中的领导选举。

搭建Flink所需组件

  • JobManager:作为主进程,JobManager控制着单个应用程序的执行。换句话说,每个应用都由一个不同的JobManager掌控。JobManager接收需要执行的应用,该应用会包含一个所谓的JobGraph,JobManager将其转化为ExecutionGraph,然后从ResourceManager申请执行任务的必要资源(处理槽),然后在将ExecutionGraph中的任务分发给TaskManager来执行。在执行的过程中JobManager还要负责所有需要集中协调的操作,如创建检查点。
  • ResourceManager:负责管理Flink的处理资源单元——TaskManager处理槽。当JobManager申请TaskManager处理槽时,ResourceManager会指示一个拥有空闲处理槽的TaskManager将其处理槽提供给JobManager。如果处理槽数无法满足JobManager的请求,ResourceManager可以和资源提供者通信,让它们提供额外容器来启动更多的TaskManager进程。同时,ResourceManager还负责终止空闲的TaskManager以释放计算资源。
  • TaskManager:工作进程。通常在Flink搭建过程中会启动多个TaskManager,每个TaskManager提供一定数量的处理槽,处理槽的数目限制了一个TaskManager可执行的任务数。
  • Dispatcher:跨多个作业运行。提供一个REST接口来让我们提交需要执行的应用。一旦某个应用提交执行,Dispatcher会启动一个JobManager并将应用转交给它。Dispatcher同时还会启动一个WebUI,用来提供有关作业执行的信息。

在这里插入图片描述

高可用设置

TaskManager故障

如果部分TaskManager故障,JobManager会向ResourceManager申请相应数量的处理槽。如果无法完成,JobManager将无法重启应用,直到有足够数量的可用处理槽。

JobManager故障

JobManager用于控制流式应用执行以及保存该过程中的源数据,如果JobManager进程消失,流式应用将无法继续处理数据。这就导致JobManager成为Flink应用中的一个单点失效组件。

JobManager在高可用模式下工作时,会依赖Zookeeper完成作业管理及元数据的迁移。具体步骤如下:

  1. JobManager将JobGraph以及全部所需元数据(例如应用的JAR文件)写入一个远程持久化存储系统中
  2. 将存储位置的路径地址写入ZK
  3. JobManager故障时,新进程从ZK获取存储位置,并从远程持久化存储系统中获取相关数据,申请处理槽,重启应用并利用最近一次检查点重置任务状态

Flink中的数据传输

在运行过程中,应用的任务会持续进行数据交换。TaskManager负责将数据从发送任务传输至接收任务。记录并非逐个发送的,而是在缓冲区中以批次形式发送,该技术是有效利用网络资源、实现高吞吐的基础。

发送端和接收端的任务运行在不同的TaskManager进程中时,数据交换需要利用操作系统的网络栈进行通信。在同一个TaskManager进程中时,数据会放在缓冲区和队列中,不涉及网络通信。

基于信用值的流量控制

通过网络连接逐条发送记录不但低效,还会导致很多额外开销。若想充分利用网络连接带宽,就要对数据进行缓冲。在流处理环境下,缓冲的一个明显缺点是会增加延迟,因为记录首先要收集到缓冲区而不会立即发送。Flink实现了一个基于信用值的流量控制机制,它的工作原理如下:接收任务会给发送任务授予一定信用值,其实就是保留一些用来接收它的数据的网络缓冲。一旦发送端收到信用通知,就会在信用值所限定的范围内尽可能多地传输缓冲数据,并会附带上积压量(已经填满准备传输的网络缓冲数目)大小。接收端使用保留的缓冲来处理收到的数据,同时依据各发送端的积压量信息来计算所有相连的发送端在下一轮的信用优先级。

任务链接

任务链接是Flink采用的一种用来降低某些情况下的本地通信开销的优化技术。任务链接的前提条件是,多个算子必须有相同的并行度且通过本地转发通道相连。

在这里插入图片描述)在这里插入图片描述

事件时间处理

时间戳

在事件时间模式下,Flink流式应用处理的所有记录都必须包含时间戳。时间戳将记录和特定的时间点关关联,这些时间点通常是记录所对应事件的发生时间。但实际上应用可以自由的选择时间戳的含义,只要保证流记录的时间戳会随着数据流的前进大致递增即可。

水位线

除了记录的时间戳,Flink基于事件时间的应用还必须提供水位线。水位线用于在事件时间应用中推断每个任务当前的事件时间。基于时间的算子会使用这个时间来触发计算并推动进度前进。

水位线本质上是一个包含时间戳信息的特殊记录。

水位线拥有两个基本特征:

  1. 必须单调递增。这是为了确保任务中的事件时间时钟正确前进,不会倒退
  2. 和记录的时间戳存在联系。一个时间戳为T的水位线表示,接下来所有记录的时间戳一定都大于T

状态管理

在Flink中,状态都是和特定的算子相关联。根据作用域的不同,状态可以分为两类:算子状态和键值分区状态。

算子状态

算子状态的作用域是某个算子任务,这意味着所有在同一个并行任务之内的记录都能访问到相同的状态。

键值分区状态

键值分区状态会按照算子输入记录所定义的键值来进行维护或访问。Flink为每个键值都维护了一个状态,该状态总是位于处理对应键值记录的算子任务上。

状态后端

为了保证快速访问状态,每个并行任务都会把状态维护在本地。至于状态具体的存储、访问和维护,则是由一个成为状态后端的可拔插组件来决定。状态后端主要负责两件事:本地状态管理和将状态以检查点的方式写入远程持久化存储中。

检查点、保存点及状态恢复

一致性检查点

Flink的故障恢复机制需要基于应用状态的一致性检查点。有状态的流式应用的一致性检查点是在所有任务处理完灯亮的原始输入后对全部任务状态进行的一个拷贝。

在这里插入图片描述

从一致性检查点中恢复

应用恢复需要经过3个步骤:

  1. 重启整个应用
  2. 利用最新的检查点重置任务状态
  3. 恢复所有任务的运行

Flink检查点算法

  • Flink 的检查点算法用到了一种称为分界线(barrier)的特殊数据形式,用来把一条流上数据按照不同的检查点分开
  • 分界线之前到来的数据导致的状态更改,都会被包含在当前分界线所属的检查点中;而基于分界线之后的数据导致的所有更改,就会被包含在之后的检查点中。

算法操作解析 :

  1. 现在是一个有两个输入流的应用程序,用并行的两个 Source 任务来读取
  2. 两条自然数数据流,蓝色数据流已经输出完蓝3了,黄色数据流输出完黄4了
  3. 在Souce端 Source1 接收到了数据蓝3 正在往下游发向一个数据蓝2 和 蓝3; Source2 接受到了数据黄4,且往下游发送数据黄4
  4. 偶数流已经处理完黄2 所以后面显示为2, 奇数流处理完蓝1 和 黄1 黄3 所以为5 并分别往下游发送每次聚合后的结果给Sink

在这里插入图片描述
5. JobManager 会向每个 source 任务发送一条带有新检查点 ID 的消息,通过这种方式来启动检查点,这个带有新检查点ID的东西为barrier,图中三角型表示,2只是ID

在这里插入图片描述
6. 在Source端接受到barrier后,将自己此身的3 和 4 的数据,将它们的状态写入检查点,且向JobManager发送checkpoint成功的消息(状态后端在状态存入检查点之后,会返回通知给 source 任务,source 任务就会向 JobManager 确认检查点完),然后向下游分别发出一个检查点 barrier
7. 可以看出在Source接受barrier时,数据流也在不断的处理,不会进行中断,
8. 此时的偶数流已经处理完蓝2变成了4,但是还没处理到黄4,只是下游发送了一个次数的数据4,而奇数流已经处理完蓝3变成了8,并向下游发送了8
9. 此时barrier都还未到奇数流和偶数流

在这里插入图片描述
10. 此时蓝色流的barrier先一步抵达了偶数流,黄色的barrier还未到,但是因为数据的不中断一直处理,此时的先到的蓝色的barrier会将此时的偶数流的数据4进行缓存处理,流接着处理接下来的数据等待着黄色的barrier的到来,而黄色barrier之前的数据将会对缓存的数据相加
11. 这次处理的总结:分界线对齐,barrier 向下游传递,sum 任务会等待所有输入分区的 barrier 到达,对于barrier已经到达的分区,继续到达的数据会被缓存。而barrier尚未到达的分区,数据会被正常处理

在这里插入图片描述
12. 当蓝色的barrier和黄色的barrier(所有分区的)都到达后,进行状态保存到远程仓库,然后对JobManager发送消息,说自己的检查点保存完毕了
13. 此时的偶数流和奇数流都为8
14. 当收到所有输入分区的 barrier 时,任务就将其状态保存到状态后端的检查点中,然后将 barrier 继续向下游转发

在这里插入图片描述
15. 向下游转发检查点 barrier 后,任务继续正常的数据处理

在这里插入图片描述
16. Sink 任务向 JobManager 确认状态保存到 checkpoint 完毕
17. 当所有任务都确认已成功将状态保存到检查点时,检查点就真正完成了

在这里插入图片描述

保存点

原则上,保存点的生成算法和检查点完全一样,因此可以把保存点看做包含一些额外元数据的检查点。保存点的生成不是由Flink自动完成,而是需要由用户(外部调度器)显式触发。同时,Flink也不会自动清理保存点。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

redisson分布式锁使用小记 2 踩坑异常

发表于 2021-09-01

首先关于redisson的介绍,这里就不搬运了,贴一下github原地址:

  1. 概述

由于我这里只是简单使用了redisson的 分布式锁 的功能,这里仅记录下锁的简单使用。

官方文档:8.分布式锁和同步器

此次所用锁为可重入锁

8.1. 可重入锁(Reentrant Lock)

基于Redis的Redisson分布式可重入锁RLock Java对象实现了java.util.concurrent.locks.Lock接口。同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。

1
2
3
js复制代码RLock lock = redisson.getLock("anyLock");
// 最常见的使用方法
lock.lock();

大家都知道,如果负责储存这个分布式锁的Redisson节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是30秒钟,也可以通过修改Config.lockWatchdogTimeout来另行指定。

另外Redisson还通过加锁的方法提供了leaseTime的参数来指定加锁的时间。超过这个时间后锁便自动解开了。

1
2
3
4
5
6
7
8
9
10
11
12
13
js复制代码// 加锁以后10秒钟自动解锁
// 无需调用unlock方法手动解锁
lock.lock(10, TimeUnit.SECONDS);

// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
try {
...
} finally {
lock.unlock();
}
}

Redisson同时还为分布式锁提供了异步执行的相关方法:

1
2
3
4
js复制代码RLock lock = redisson.getLock("anyLock");
lock.lockAsync();
lock.lockAsync(10, TimeUnit.SECONDS);
Future<Boolean> res = lock.tryLockAsync(100, 10, TimeUnit.SECONDS);

RLock对象完全符合Java的Lock规范。也就是说只有拥有锁的进程才能解锁,其他进程解锁则会抛出IllegalMonitorStateException错误。但是如果遇到需要其他进程也能解锁的情况,请使用分布式信号量Semaphore 对象.

首先springboot整合redisson需要引入redisson的依赖:

1
2
3
4
5
js复制代码<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.13.6</version>
</dependency>

亲测以上版本,配合springboot:2.2.5.RELEASE版本,正常使用。

单机版redis的配置文件跟原来springboot集成redis一样。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
yaml复制代码spring:
# Redis配置
redis:
timeout: 6000 # 连接超时时长(毫秒)
password: huauN@2021
database: 0
host: 192.168.104.64
port: 6379
#cluster:
#max-redirects: 3 # 获取失败 最大重定向次数
#nodes:
#- 192.168.104.101:6379
lettuce:
pool:
max-active: 1024 # 连接池最大连接数(默认为8,-1表示无限制 如果pool已经分配了超过max_active个jedis实例,则此时pool为耗尽)
max-wait: 10000 #最大等待连接时间,单位毫秒 默认为-1,表示永不超时,超时会抛出JedisConnectionException
max-idle: 10
min-idle: 5

Redis配置映射类 RedisConfigProperties.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
typescript复制代码
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import java.util.List;

/**
* Redis配置映射类
*
* @author linmengmeng
* @date 2021-03-11
**/

@Component
@ConfigurationProperties(prefix = "spring.redis")
public class RedisConfigProperties {
private Integer timeout;
private Integer database;
private Integer port;
private String host;
private String password;
private cluster cluster;

public static class cluster {
private List<String> nodes;

public List<String> getNodes() {
return nodes;
}

public void setNodes(List<String> nodes) {
this.nodes = nodes;
}
}

public Integer getTimeout() {
return timeout;
}

public void setTimeout(Integer timeout) {
this.timeout = timeout;
}

public Integer getDatabase() {
return database;
}

public void setDatabase(Integer database) {
this.database = database;
}

public Integer getPort() {
return port;
}

public void setPort(Integer port) {
this.port = port;
}

public String getHost() {
return host;
}

public void setHost(String host) {
this.host = host;
}

public String getPassword() {
return password;
}

public void setPassword(String password) {
this.password = password;
}

public RedisConfigProperties.cluster getCluster() {
return cluster;
}

public void setCluster(RedisConfigProperties.cluster cluster) {
this.cluster = cluster;
}
}

添加自动装配类:RedissonConfig.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
js复制代码
import gc.cnnvd.config.properties.RedisConfigProperties;
import org.redisson.Redisson;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* @author linmengmeng
* @author 2021-08-30
*/
@Configuration
public class RedissonConfig {

@Autowired
private RedisConfigProperties redisConfigProperties;

/**
* redis://host:port
*/
private static final String REDIS_ADDRESS = "redis://%s:%s";

// /**
// * 集群模式-添加redisson的bean
// * @return
// */
// @Bean
// public Redisson redisson() {
// //redisson版本是3.5,集群的ip前面要加上“redis://”,不然会报错,3.2版本可不加
// List<String> clusterNodes = new ArrayList<>();
// for (int i = 0; i < redisConfigProperties.getCluster().getNodes().size(); i++) {
// clusterNodes.add("redis://" + redisConfigProperties.getCluster().getNodes().get(i));
// }
// Config config = new Config();
// ClusterServersConfig clusterServersConfig = config.useClusterServers()
// .addNodeAddress(clusterNodes.toArray(new String[clusterNodes.size()]));
// clusterServersConfig.setPassword(redisConfigProperties.getPassword());//设置密码
// return (Redisson) Redisson.create(config);
// }

/**
* Redisson单机模式
* @return
*/
@Bean
public Redisson RedissonConfig(){
Config config = new Config();
// config.useSingleServer().setAddress("redis://localhost:6379").setDatabase(redisConfigProperties.getDatabase());
config.useSingleServer().setAddress(String.format(REDIS_ADDRESS, redisConfigProperties.getHost(), redisConfigProperties.getPort()))
.setDatabase(redisConfigProperties.getDatabase())
.setPassword(redisConfigProperties.getPassword());// 没有密码可以不设置
return (Redisson) Redisson.create(config);
}
}

创建测试接口测试分布式锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
java复制代码package gc.cnnvd;

import gc.cnnvd.framework.common.api.ApiResult;
import lombok.extern.slf4j.Slf4j;
import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
* @Auther linmengmeng
* @Date 2021-09-01 15:37
*/
@Slf4j
@RestController
@RequestMapping("/tourist")
public class TestRedissonLockController {

private static String FORMAT_LOCKKEY = "testLockKey:%s";//分布式锁的key

@Autowired
private RedisTemplate redisTemplate;

@Autowired
private Redisson redisson;

@PostMapping("/testLock11")
public ApiResult<Boolean> testLock11() {
String lockKey = String.format(FORMAT_LOCKKEY, 3);
log.info("-------lockKey:{}", lockKey);
RLock lock = redisson.getLock(lockKey);
log.info("-------创建锁之后 isLocked-1:{}", lock.isLocked());
// try {
// Thread.sleep(10000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
Future<Boolean> res = lock.tryLockAsync(10, 30, TimeUnit.SECONDS);
log.info("-------tryLockAsync 后 isLocked-2:{}", lock.isLocked());
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (lock.isLocked()){
log.info("-------10秒 后----isLocked-3:{}", lock.isLocked());
//throw new BusinessException("测试获取锁后发生异常");
}
if (lock.isHeldByCurrentThread()){
log.info("-------isHeldByCurrentThread:{}", lock.isHeldByCurrentThread());
}
boolean result = false;
try {
result = res.get();
log.info("-------result:" + result);
if (result){
Thread.sleep(10000);
if (lock.isHeldByCurrentThread()){
log.info("-------isHeldByCurrentThread:{}", lock.isHeldByCurrentThread());
}
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}finally {
log.info("-------666666-------unlock-isLocked:{}", lock.isLocked());
if (lock.isLocked()){
log.info("-------88888888-------解锁解锁:{}", lock.isLocked());
lock.unlock();
}
}
log.info("res.get:{}", result);
return ApiResult.ok(lock.isLocked());
}

@PostMapping("/testLock12")
public ApiResult<Boolean> testLock12() {
String lockKey = String.format(FORMAT_LOCKKEY, 3);
log.info("====================lockKey:{}", lockKey);
RLock lock = redisson.getLock(lockKey);
log.info("====================isLocked-1:{}", lock.isLocked());
Future<Boolean> res = lock.tryLockAsync(5, 2, TimeUnit.SECONDS);
boolean locked = lock.isLocked();
log.info("====================isLocked-2:{}", locked);
if (locked){
if (lock.isHeldByCurrentThread()){
log.info("====================锁住了,是我的锁");
}else {
log.info("====================锁住了,不是我的锁");
}
}
Boolean getLock = null;
log.info("====================getLock-2:{}", getLock);
return ApiResult.ok(locked);
}

}

上面的代码看着很乱,当时摸索着打的日志,为了更好的理解加锁与解锁机制。

在 testLock11 接口里面添加线程睡眠,模仿程序运行占用锁,这时可以在redis里面看到我们加锁的key:

image.png

刚开始一直没有找到key,后来才发现,线程运行完成后,自动释放了锁,后面把睡眠时间拉长,才找到redis里面的key。

后面在用到代码里面,对加锁和锁的判断使用如下:

  1. 设置加锁的唯一标识
  2. 获取锁,并持有,做自己的业务逻辑
1
2
3
4
5
6
7
8
9
10
11
12
13
csharp复制代码// 尝试获取锁-异步,第一个时间表示:最多等待时间,第二个时间参数表示:上锁以后多久自动解锁
Future<Boolean> res = lock.tryLockAsync(5, 5, TimeUnit.SECONDS);
if (lock.isLocked() && lock.isHeldByCurrentThread()){
// 处理业务逻辑
// 解锁
if (lock.isLocked()){
lock.unlock();
}
if (!lock.isLocked()){
log.info("解锁成功");
}
return;
}

这里判断加锁我使用了:lock.isLocked() && lock.isHeldByCurrentThread(),这样可以确保只有一个线程进入锁的部分。

释放锁的时候,又加了一个判断:lock.isLocked(),避免由于业务逻辑耗时超过锁的自动释放时间,在执行lock.unlock();时,如果锁已经释放,或者别的线程拿到锁了,当前线程释放锁会抛出异常:

  1. 手动释放或者到期自动释放
  1. 踩坑异常

  1. Error processing condition on org.springframework.boot.autoconfigure.cache.S

首次添加redisson依赖后,原来的redis配置不好使了,项目启动就报了上面的错。最后切换实例化CacheManager

参考 SpringBoot通过Cacheable注解完成redis缓存功能

redisson入门可参考:# SpringBoot整合Redisson(单机版)

集群版配置文件可参考:redisson版本_SpringBoot整合Redisson(集群版)

  1. o.redisson.client.handler.CommandsQueue : Exception occured. Channel:

刚开始由于使用了旧版本的redisson,发现项目启动后不久,控制台抛出此异常

参考:

www.cnblogs.com/junge8618/p…

www.jianshu.com/p/a89dbefb8…

切换redisson版本,解决此异常。

其他参考博客:

SpringBoot整合Redisson

REDIS分布式锁REDISSON扩展 这篇博客结合AOP整合了redisson锁,值得学习下。

锁的相关概念,理解:分布式锁,redisson是如何解决死锁问题

Redisson(1)分布式锁——如何解决死锁问题

使用Redisson实现分布式锁

如果翻到了最后,就再推荐一篇通俗易懂的博客: Spring Boot整合Redis实现简单的分布式锁
SpringBoot整合Redisson实现分布式锁

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

mybatis-plus 团队新作 mybatis-mate

发表于 2021-09-01

一、主要功能

  • 字典绑定
  • 字段加密
  • 数据脱敏
  • 表结构动态维护
  • 数据审计记录
  • 数据范围(数据权限)
  • 数据库分库分表、动态据源、读写分离、数据库健康检查自动切换。

二、使用

2.1 依赖导入

  • Spring Boot 引入自动依赖注解包
1
2
3
4
5
plain复制代码<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-mate-starter</artifactId>
<version>1.0.8</version>
</dependency>
  • 注解(实体分包使用)
1
2
3
4
5
plain复制代码<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-mate-annotation</artifactId>
<version>1.0.8</version>
</dependency>

2.2 字典绑定

  • 例如 user_sex 类型 sex 字典结果映射到 sexText 属性
1
2
3
4
java复制代码@FieldDict(type = "user_sex", target = "sexText")
private Integer sex;

private String sexText;
  • 实现 IDataDict 接口提供字典数据源,注入到 Spring 容器即可。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码@Component
public class DataDict implements IDataDict {

/**
* 从数据库或缓存中获取
*/
private Map<String, String> SEX_MAP = new ConcurrentHashMap<String, String>() {{
put("0", "女");
put("1", "男");
}};

@Override
public String getNameByCode(FieldDict fieldDict, String code) {
System.err.println("字段类型:" + fieldDict.type() + ",编码:" + code);
return SEX_MAP.get(code);
}
}

2.3 字段加密

属性 @FieldEncrypt 注解即可加密存储,会自动解密查询结果,支持全局配置加密密钥算法,及注解密钥算法,可以实现 IEncryptor 注入自定义算法。

1
2
java复制代码@FieldEncrypt(algorithm = Algorithm.PBEWithMD5AndDES)
private String password;

2.4 数据脱敏

  • 属性 @FieldSensitive 注解即可自动按照预设策略对源数据进行脱敏处理,默认 SensitiveType 内置 9 种常用脱敏策略。例如:中文名、银行卡账号、手机号码、固话号码、邮寄地址、电子邮箱、身份证号码、密码、车牌号 脱敏策略,也可以自定义策略如下:
1
2
3
4
5
java复制代码@FieldSensitive(type = "testStrategy")
private String username;

@FieldSensitive(type = SensitiveType.mobile)
private String mobile;
  • 自定义脱敏策略 testStrategy 添加到默认策略中注入 Spring 容器即可。
1
2
3
4
5
6
7
8
9
10
11
12
java复制代码@Configuration
public class SensitiveStrategyConfig {

/**
* 注入脱敏策略
*/
@Bean
public ISensitiveStrategy sensitiveStrategy() {
// 自定义 testStrategy 类型脱敏处理
return new SensitiveStrategy().addStrategy("testStrategy", t -> t + "***test***");
}
}

2.5 DDL 数据结构自动维护

  • 解决升级表结构初始化,版本发布更新 SQL 维护问题,目前支持 MySql、PostgreSQL。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码@Component
public class PostgresDdl implements IDdl {

/**
* 执行 SQL 脚本方式
*/
@Override
public List<String> getSqlFiles() {
return Arrays.asList(
// 内置包方式
"db/tag-schema.sql",
// 文件绝对路径方式
"D:\\db\\tag-data.sql"
);
}
}
  • 不仅仅可以固定执行,也可以动态执行!!
1
2
3
java复制代码ddlScript.run(new StringReader("DELETE FROM user;\n" +
"INSERT INTO user (id, username, password, sex, email) VALUES\n" +
"(20, 'Duo', '123456', 0, 'Duo@baomidou.com');"));
  • 这样就完了吗??当然没有,它还支持多数据源执行!!!
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
java复制代码@Component
public class MysqlDdl implements IDdl {

@Override
public void sharding(Consumer<IDdl> consumer) {
// 多数据源指定,主库初始化从库自动同步
String group = "mysql";
ShardingGroupProperty sgp = ShardingKey.getDbGroupProperty(group);
if (null != sgp) {
// 主库
sgp.getMasterKeys().forEach(key -> {
ShardingKey.change(group + key);
consumer.accept(this);
});
// 从库
sgp.getSlaveKeys().forEach(key -> {
ShardingKey.change(group + key);
consumer.accept(this);
});
}
}

/**
* 执行 SQL 脚本方式
*/
@Override
public List<String> getSqlFiles() {
return Arrays.asList("db/user-mysql.sql");
}
}

2.6 动态多数据源主从自由切换

  • @Sharding 注解支持一句话使数据源不限制随意使用切换,你可以在 mapper 层添加注解,按需求指哪打哪!!
1
2
3
4
5
6
7
java复制代码@Mapper
@Sharding("mysql")
public interface UserMapper extends BaseMapper<User> {

@Sharding("postgres")
Long selectByUsername(String username);
}
  • 你也可以自定义策略统一调兵遣将
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码@Component
public class MyShardingStrategy extends RandomShardingStrategy {

/**
* 决定切换数据源 key {@link ShardingDatasource}
*
* @param group 动态数据库组
* @param invocation {@link Invocation}
* @param sqlCommandType {@link SqlCommandType}
*/
@Override
public void determineDatasourceKey(String group, Invocation invocation, SqlCommandType sqlCommandType) {
// 数据源组 group 自定义选择即可, keys 为数据源组内主从多节点,可随机选择或者自己控制
this.changeDatabaseKey(group, sqlCommandType, keys -> chooseKey(keys, invocation));
}
}

可以开启主从策略,当然也是可以开启健康检查!!!

2.7 数据权限

  • mapper 层添加注解:
1
2
3
4
5
6
7
8
9
java复制代码// 测试 test 类型数据权限范围,混合分页模式
@DataScope(type = "test", value = {
// 关联表 user 别名 u 指定部门字段权限
@DataColumn(alias = "u", name = "department_id"),
// 关联表 user 别名 u 指定手机号字段(自己判断处理)
@DataColumn(alias = "u", name = "mobile")
})
@Select("select u.* from user u")
List<User> selectTestList(IPage<User> page, Long id, @Param("name") String username);
  • 模拟业务处理逻辑:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
java复制代码@Bean
public IDataScopeProvider dataScopeProvider() {
return new AbstractDataScopeProvider() {
@Override
protected void setWhere(PlainSelect plainSelect, Object[] args, DataScopeProperty dataScopeProperty) {
// args 中包含 mapper 方法的请求参数,需要使用可以自行获取
/*
// 测试数据权限,最终执行 SQL 语句
SELECT u.* FROM user u WHERE (u.department_id IN ('1', '2', '3', '5'))
AND u.mobile LIKE '%1533%'
*/
if ("test".equals(dataScopeProperty.getType())) {
// 业务 test 类型
List<DataColumnProperty> dataColumns = dataScopeProperty.getColumns();
for (DataColumnProperty dataColumn : dataColumns) {
if ("department_id".equals(dataColumn.getName())) {
// 追加部门字段 IN 条件,也可以是 SQL 语句
Set<String> deptIds = new HashSet<>();
deptIds.add("1");
deptIds.add("2");
deptIds.add("3");
deptIds.add("5");
ItemsList itemsList = new ExpressionList(deptIds.stream().map(StringValue::new).collect(Collectors.toList()));
InExpression inExpression = new InExpression(new Column(dataColumn.getAliasDotName()), itemsList);
if (null == plainSelect.getWhere()) {
// 不存在 where 条件
plainSelect.setWhere(new Parenthesis(inExpression));
} else {
// 存在 where 条件 and 处理
plainSelect.setWhere(new AndExpression(plainSelect.getWhere(), inExpression));
}
} else if ("mobile".equals(dataColumn.getName())) {
// 支持一个自定义条件
LikeExpression likeExpression = new LikeExpression();
likeExpression.setLeftExpression(new Column(dataColumn.getAliasDotName()));
likeExpression.setRightExpression(new StringValue("%1533%"));
plainSelect.setWhere(new AndExpression(plainSelect.getWhere(), likeExpression));
}
}
}
}
};
}
  • 最终执行 SQL 输出:
1
2
3
sql复制代码SELECT u.* FROM user u 
WHERE (u.department_id IN ('1', '2', '3', '5'))
AND u.mobile LIKE '%1533%' LIMIT 1, 10

三、最后

大家好,我是 如梦技术春哥(mica 微服务组件开源作者)笔者使用 mybatis-plus 已有 4 年多(资深老粉),mybatis-plus 帮助我们大大提升了开发效率,统一了企业内代码开发风格,降低维护成本。

如果大家在企业内有 mybatis-mate 使用场景,不妨支持一下。更多 mybatis-mate 使用示例详见:gitee.com/baomidou/my…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

docker实现jekins+gitlab自动部署 1安装

发表于 2021-09-01

1.安装docker

Docker 分为 CE 和 EE 两大版本。CE 即社区版(免费,支持周期 7 个月),EE 即企业版,强调安全,付费使用,支持周期 24 个月。

Ubuntu 安装 Docker CE

卸载旧版本
旧版本的 Docker 称为 docker 或者 docker-engine,使用以下命令卸载旧版本:

1
2
3
4
5
6
arduino复制代码$ sudo apt-get remove docker \
docker-engine \
docker.io
鉴于国内网络问题,强烈建议使用国内源。
为了确认所下载软件包的合法性,需要添加软件源的 GPG 密钥。
$ curl -fsSL https://mirrors.ustc.edu.cn/docker-ce/linux/ubuntu/gpg | sudo apt-key add -

然后,我们需要向 source.list 中添加 Docker 软件源

1
2
3
4
5
6
7
8
9
10
shell复制代码$ sudo add-apt-repository \
"deb [arch=amd64] https://mirrors.ustc.edu.cn/docker-ce/linux/ubuntu \
$(lsb_release -cs) \
stable"
安装 Docker CE
更新 apt 软件包缓存,并安装 docker-ce:

$ sudo apt-get update

$ sudo apt-get install docker-ce

Windows 10 PC 安装 Docker CE

系统要求

Docker for Windows 支持 64 位版本的 Windows 10 Pro,且必须开启 Hyper-V。

安装

点击以下链接下载 Stable 或 Edge 版本的 Docker for Windows。
下载好之后双击 Docker for Windows Installer.exe 开始安装。

镜像加速

鉴于国内网络问题,后续拉取 Docker 镜像十分缓慢,强烈建议安装 Docker 之后配置 国内镜像加速。
对于使用 Windows 10 的系统,在系统右下角托盘 Docker 图标内右键菜单选择 Settings,打开配置窗口后左侧导航菜单选择 Daemon。在 Registry mirrors 一栏中填写加速器地址 registry.docker-cn.com,之后点击 Apply 保存后 Docker 就会重启并应用配置的镜像地址了。

2.docker部署jenkins

1
2
3
4
5
6
7
8
9
10
11
12
13
14
bash复制代码创建目录并授权
mkdir -p /home/jenkins_home && chmod -R 777 /home/jenkins_home
启动容器
docker -run -it -d --restart always --name jenkins -p 8080:8080 -p 50000:50000 -v /home/jenkins_home:/var/jenkins_home jenkins
浏览器访问
http://122.144.144.144:8080/
安装gitlab-hook-pplugin 和 gitlab-plugin插件
jenkins安装maven
tar -xf apache-maven-3.6.0-bin.tar.gz -C /usr/local
vim /etc/profile
export M2_HOME=/usr/local/apache-maven-3.6.0
export CLASSPATH=$CLASSPATH:$M2_HOME/lib
export PATH=$PATH:$M2_HOME/bin
source /etc/profile

3.docker部署gitlab

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
bash复制代码mkdir -p /home/gitlab/etc
mkdir -p /home/gitlab/log
mkdir -p /home/gitlab/data
sudo chmod -R 777 /home/gitlab/

docker run --detach -p 443:443 -p 8088:80 -p 22:22 --name gitab --restart always -v /home/gitlab/etc:/etc/gitlab -v /home/gitlab/log:/var/log/gitlab -v /home/gitlab/data /var/opt/gitlab beginor/gitlab-ce:10.4.1-ce.0

docker exec -it gitlab /bin/bash
vim /etc/gitlab/gitlab.rb

external_url 'http://192.168.2.137'

vim /var/opt/gitlab/gitlab-rails/etc/gitlab.yml

host:192.168.2.137
port:8088

重新配置
docker exec gitlab gitlab-ctl reconfigure

重启
docker exec gitlab gitlab-ctl restart

浏览器访问 http://122.144.144.144:8088

4.配秘钥实现免密登录

ssh-keygen -t rsa
jenkins的公钥发给代码服务器和gitlab服务器

5.jenkins构建

image.png

image.png

image.png

image.png

6.gitlab设置webhook

image.png

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Go117 新特性,凭什么让程序提速 5~10%?

发表于 2021-09-01

微信搜索【脑子进煎鱼了】关注这一只爆肝煎鱼。本文 GitHub github.com/eddycjy/blo… 已收录,有我的系列文章、资料和开源 Go 图书。

大家好,我是煎鱼。

在 Go1.17 发布后,我们惊喜的发现 Go 语言他又又又优化了,编译器改进后产生了约 5% 的性能提升,也没有什么破坏性修改,保证了向前兼容。

他做了些什么呢,好像没怎么看到有人提起。为此今天煎鱼带大家来解读两新提案:

  • 《Proposal: Register-based Go calling convention》
  • 《Proposal: Create an undefined internal calling convention》

本文会基于提案讲解和拆解,毕竟分享新知识肯定要从官方资料作为事实基准出发。

背景

在以往的 Go 版本中,Go 的调用约定简单且几乎跨平台通用,其原因在于选用了基于 Plan9 ABI 的堆栈调用约定,也就是函数的参数和返回值都是通过堆栈上来进行传递。

这里我们一共提到了 Plan9 和 ABI,这是两个很关键的理念:

  • Plan9:Go 语言所使用的汇编器,Rob Pike 是贝尔实验室的猛人。
  • ABI:Application Binary Interface(应用程序二进制接口),ABI 包含了应用程序在操作系统下运行时必须遵守的编程约定(例如:二进制接口)。

该方案的优缺点如下:

  • 优点:实现简单,简化了实现成本。
  • 缺点:性能方面付出了不少的代价。

按我理解,在 Go 语言初创时期,采取先简单实现,跑起来再说。也合理,性能倒不是一个 TOP1 需求。

Go1.17 优化

什么是调用惯例

在新版本的优化中,提到了调用惯例(calling convention)的概念,指的是调用方和被调用方对函数调用的共识约定。

这些共识包含:函数的参数、返回值、参数传递顺序、传递方式等。

双方都必须遵循这个约定时,程序的函数才能正常的运行起来。如果不遵循,那么该函数是没法运行起来的。

优化是什么

在 Go1.17 起,正式将把 Go 内部 ABI 规范(在 Go 函数之间使用)从基于堆栈的函数参数和结果传递的方式改为基于寄存器的函数参数和结果传递。

本次修改涉及到的项非常多,该优化是持续的,原本预计是 Go1.16 实现,不过拖到了 Go1.17。

目前实现了 amd64 和 arm64 架构的支持。还有不少的更多的支持会持续在 Go1.18 中完成,具体进度可见 issues #40724。

性能如何

在 Go1.17 Release Notes 中明确指出,用一组有代表性的 Go 包和程序的基准测试。

官方数据显示:

  • Go 程序的运行性能提高了约 5%。
  • Go 所编译出的二进制大小的减少约 2%。

在民间数据来看,在 twitter 看到 @Achille 表示从 Go1.15.7 升级到 Go1.17 后显示。在一个大规模的数据处理系统上进行的 Go1.17 升级产生了惊人的效果,我们来看看他的真实数据。

CPU、Malloc 调用时间减少了约15%:

图来自 @Achille

图来自 @Achille

RSS 大小更接近于堆的大小:

图来自 @Achille

从原本的 1.6GB 降至 1GB。

结合官方和民间数据来看,优化效果是明确且有效的。有兴趣的小伙伴也可以自己测一测。

总结

在 Go1.17 这一个新版本中,只需要简单的升一升 Go 版本,我们就能得到一定的性能优化,这是非常不错的。

从以往的基于堆栈的函数参数和结果传递的方式改为 Go1.17~Go1.18 基于寄存器的函数参数和结果传递,Go 语言正在一步步走的更好!

你觉得呢?

若有任何疑问欢迎评论区反馈和交流,最好的关系是互相成就,各位的点赞就是煎鱼创作的最大动力,感谢支持。

文章持续更新,可以微信搜【脑子进煎鱼了】阅读,回复【000】有我准备的一线大厂面试算法题解和资料;本文 GitHub github.com/eddycjy/blo… 已收录,欢迎 Star 催更。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

高性能利器:CDN我建议你好好学一下!

发表于 2021-09-01

硬核干货分享,欢迎关注【Java补习课】成长的路上,我们一起前行 !

《高可用系列文章》 已收录在专栏,欢迎关注!

CDN 概述

CDN 全称 Content Delivery Network,即内容分发网络。其基本思路是尽可能避开互联网上有可能影响数据传输速度和稳定性的瓶颈和环节,使内容传输的更快、更稳定

CDN 的工作原理 就是将源站的资源缓存CDN各个节点上,当请求命中了某个节点的资源缓存时,立即返回客户端,避免每个请求的资源都通过源站获取,避免网络拥塞、缓解源站压力,保证用户访问资源的速度和体验。

举一个生活中的例子,我们在某东上购买商品,快递能做到当日送达,其根本原理是通过在全国各地建设本地仓库。当用户购买商品时,通过智能仓配模式,为消费者选择就近仓库发货,从而缩短物流配送时间。

image.png

而商品库存的分配,流程可以参考下图,从 工厂(源站) -> 地域仓库(二级缓存) -> 本地仓库 (一级缓存)

image.png

内容分发网络 就像前面提到的 智能仓配网络 一样,解决了因分布、带宽、服务器性能带来的访问延迟问题,适用于站点加速、点播、直播等场景。使用户可就近取得所需内容,解决 Internet网络拥挤的状况,提高用户访问网站的响应速度和成功率。

CDN的诞生

image.png

CDN 诞生于二十多年前,为解决内容源服务器和传输骨干网络压力过大的问题,在 1995 年,麻省理工学院教授,互联网发明者之一 Tom Leighton 带领着研究生 Danny Lewin 和其他几位顶级研究人员一起尝试用数学问题解决网络拥堵问题。

他们使用数学算法,处理内容的动态路由安排,并最终解决了困扰 Internet 使用者的难题。后来,史隆管理学院的 MBA 学生 Jonathan Seelig 加入了 Leighton 的队伍中,从那以后他们开始实施自己的商业计划,最终于 1998 年 8 月 20 日正式成立公司,命名为 Akamai。Akamai 公司通过智能化的互联网分发,结束了 “World Wide Wait” 的尴尬局面。

同年 1998 年,中国第一家 CDN 公司 ChinaCache 成立

CDN工作原理

接入CDN

在接入CDN前,当我们访问某个域名,直接拿到第一个真实服务器的IP地址,整个流程如下(图有点简陋)

image.png

当我们需要加速网站时,通过向运营商注册自己加速域名,源站域名,然后进入到自己域名的DNS配置信息,将 A 记录修改成 CNAME 记录即可。阿里云加速申请参考如下:

image.png

CDN访问过程

image.png

  • 1、用户访问图片内容,先经过 本地DNS 解析,如果 LDNS 命中,直接返回给用户。
  • 2、LDNS MISS,转发 授权DNS 查询
  • 3、返回域名 CNAME picwebws.pstatp.com.wsglb0.com. 对应IP地址(实际就是DNS调度系统的ip地址)
  • 4、域名解析请求发送至DNS调度系统,DNS调度系统为请求分配最佳节点IP地址。
  • 5、返回的解析IP地址
  • 6、用户向缓存服务器发起请求,缓存服务器响应用户请求,将用户所需内容传送到用户终端。

图:华为云全站加速示意图
image.png

CDN解决了什么问题

骨干网压力过大

Tom Leighton在 1995 年, 带领团队尝试用数学问题解决网络拥堵问题,从而解决骨干网络压力过大的问题。由于上网冲浪 的少年越来越多,造成骨干网的核心节点流量吞吐不足以支撑互联网用户的增长,通过CDN可以避免用户流量流经骨干网。

骨干网是一个全球性的局域网,一级互联网服务提供商(ISP)将其高速光纤网络连接在一起,形成互联网的骨干网,实现在不同地理区域之间高效地传输流量。

1、局域网

局域网(Local Area Network,LAN)是指在某一区域内由多台计算机互联成的计算机组,比如:在大学时期,晚上12点后断网了,我们仍然能够通过路由器开黑打CS,魔兽。那就是基于局域网互联,实现资料共享与信息之间的通信。

image.png

2、骨干网

这里引用一下中国电信全网架构,骨干网可以理解成是一个全国性的局域网,通过核心节点的流量互通,实现全网网络的互通。这也是为什么我们称为互联网 的原因。

北京、上海、广州,是ChinaNet的超级核心。除了超级核心之外,ChinaNet还有天津、西安、南京、杭州、武汉、成都等普通核心。

image.png

三公里之 middlemile

通常网络访问中会有”三公里”路程

  • 第一公里为:源站到ISP接入点
  • 第二公里为:源站ISP接入点到访问用户的ISP接入点
  • 第三公里(最后一公里)为:用户ISP接入点到用户客户端

CDN网络层主要用来加速第二公里(middlemile),

在 CDN 的基础架构中,通常使用两级 server 做加速:

  • L1(下层):距离用户(或俗称网民)越近越好,通常用于缓存那些可缓存的静态数据,称之为 lastmile(最后一公里)。
  • L2(上层):距离源站越近越好,称之为 firstmile(第一公里),当 L1 无法命中缓存,或内容不可缓存时,请求会通过 L1 透传给 L2,若 L2 仍然没有命中缓存或内容不可缓存,则会继续透传给 L2 的 upstream(有可能是源站,也有可能是 L3),同时 L2 还可以做流量、请求数的量级收敛,减少回源量(如果可缓存),降低源站压力。
  • L1 和 L2 之间的部分,是 CDN 的 ”内部网络“,称之为 middlemile(中间一公里)。

image.png

CDN的组成

全局负载均衡系统 GLB(Global Load Balance)

image.png

  • 当用户访问加入CDN服务的网站时,域名解析请求将最终由 “智能调度DNS”负责处理。
  • 它通过一组预先定义好的策略,将当时最接近用户的节点地址提供给用户,使用户可以得到快速的服务。
  • 同时它需要与分布在各地的CDN节点保持通信,跟踪各节点的健康状态、容量等信息,确保将用户的请求分配到就近可用的节点上.

缓存服务器

缓存服务器主要的功能就是缓存热点数据,数据类型包括:静态资源(html,js,css等),多媒体资源(img,mp3,mp4等),以及动态数据(边缘渲染)等。

众所周知耳熟能详的与 CDN 有关的开源软件有:

  • Squid
  • Varnish
  • Nginx
  • OpenResty
  • ATS
  • HAProxy

具体对比可参考:blog.csdn.net/joeyon1985/…

CDN的分层架构

image.png

源站

源站指发布内容的原始站点。添加、删除和更改网站的文件,都是在源站上进行的;另外缓存服务器所抓取的对象也全部来自于源站。

CDN 调度策略

DNS 调度

基于请求端 local DNS 的出口 IP 归属地以及运营商的 DNS 调度。

DNS 调度的问题:

  • DNS 缓存时间在 TTL 过期前是不会刷新的, 这样会导致节点异常的时候自动调度延时很大,会直接影响线上业务访问。
  • 大量的 local DNS 不支持 EDNS 协议,拿不到客户的真实IP,CDN 绝大多数时候只能通过local DNS IP来做决策,经常会出现跨区域调度的情况。

HTTP DNS 调度

客户端请求固定的 HTTP DNS 地址,根据返回获取解析结果。可以提高解析的准确性(不像DNS调度,只能通过local DNS IP来做决策),能很好的避免劫持等问题。

当然这种模式也有一些问题,例如客户端每次加载URL都可能产生一次HTTP DNS查询,这就对性能和网络接入要求很高。

302调度

基于客户端 IP 和 302 调度集群进行实时的流量调度。

我们来看一个例子:

  1. 访问 URL 链接后,此时请求到了调度群集上,我们能拿到的客户端信息有 客户端的出口IP(绝大多情况下是相同的),接下来算法和基于 DNS 的调度可以是一样的,只是判断依据由 local DNS 出口 ip 变成了客户端的出口IP。
  2. 浏览器收到302回应,跟随 Location 中的 URL,继续发起 http 请求,这次请求的目标 IP 是CDN 边缘节点,CDN节点会响应实际的文件内容。

302 调度的优势:

  • 实时调度,因为没有 local DNS 缓存的,适合 CDN 的削峰处理,对于成本控制意义重大;
  • 准确性高,直接获取客户端出口 IP 进行调度。

302 调度的劣势:

  • 每次都要跳转,对于延时敏感的业务不友好。一般只适用于大文件。

AnyCast BGP路由调度

基于 BGP AnyCast 路由策略,只提供极少的对外 IP,路由策略可以很快的调整。

目前 AWS CloudFront、CloudFlare 都使用了这种方式,在路由层面进行调度。

这种方式可以很好地抵御 DDOS 攻击,降低网络拥塞。

当然这种方式的成本和方案设计都比较复杂,所以国内的 CDN 目前还都是用 UniCast 的方式。

一些概念

CDN运作原理

本地缓存的数据,通过key-value 的形式,将url 和本地缓存进行映射,存储结构与 Map相似,采用 hash+链表形式进行缓存。

image.png

CDN命中率

衡量我们CDN服务质量的一个核心标准,当用户访问的资源恰好在缓存系统里,可以直接返回给用户,说明CDN命中;如果CDN缓存中,没有命中资源,那么会触发回源动作。

CDN回源

当CDN本地缓存没有命中时,触发回源动作,

  • 一级缓存 访问二级缓存是否有相关数据,如果有,返回一级缓存。
  • 二级缓存 Miss,触发 二级缓存 回源请求,请求源站对应数据。获取结果后,缓存到本地缓存,返回数据到一级缓存。
  • 一级缓存 获取数据,缓存本地后,返回给用户。

CDN预热数据

上面说的访问模式,都是基于Pull模式,由用户决策哪部分热点数据会最终存留在CDN缓存中;对于大促场景,我们往往需要预先将活动相关资源预热 到 边缘节点(L1),避免大促开启后,大量用户访问,造成源站压力过大。这时候采用的是 Push模式。

CDN的特点总结

1、资源访问加速: 本地Cache加速,提高了企业站点(尤其含有大量图片和静态页面站点)的访问速度,并大大提高以上性质站点的稳定性

2、消除运营商间网络互联的瓶颈问题: 镜像服务消除了不同运营商之间互联的瓶颈造成的影响,实现了跨运营商的网络加速,保证不同网络中的用户都能得到良好的访问质量。

3、远程加速: 远程访问用户根据DNS负载均衡技术 智能自动选择Cache服务器,选择最快的Cache服务器,加快远程访问的速度

4、带宽优化: 自动生成服务器的远程Mirror(镜像)cache服务器,远程用户访问时从cache服务器上读取数据,减少远程访问的带宽、分担网络流量、减轻原站点WEB服务器负载等功能。

5、集群抗攻击: 广泛分布的CDN节点加上节点之间的智能冗余机制,可以有效地预防黑客入侵以及降低各种D.D.o.S攻击对网站的影响,同时保证较好的服务质量 。

点关注,不迷路

好了各位,以上就是这篇文章的全部内容了,我后面会每周都更新几篇高质量的大厂面试和常用技术栈相关的文章。感谢大伙能看到这里,如果这个文章写得还不错, 求三连!!! 感谢各位的支持和认可,我们下篇文章见!

我是 九灵 ,有需要交流的童鞋可以关注公众号:Java 补习课! 如果本篇博客有任何错误,请批评指教,不胜感激 !

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Kafka丢数据、重复消费、顺序消费的问题

发表于 2021-09-01

面试官:今天我想问下,你觉得Kafka会丢数据吗?

候选者:嗯,使用Kafka时,有可能会有以下场景会丢消息

候选者:比如说,我们用Producer发消息至Broker的时候,就有可能会丢消息

候选者:如果你不想丢消息,那在发送消息的时候,需要选择带有 callBack的api进行发送

候选者:其实就意味着,如果你发送成功了,会回调告诉你已经发送成功了。如果失败了,那收到回调之后自己在业务上做重试就好了。

候选者:等到把消息发送到Broker以后,也有可能丢消息

候选者:一般我们的线上环境都是集群环境下嘛,但可能你发送的消息后broker就挂了,这时挂掉的broker还没来得及把数据同步给别的broker,数据就自然就丢了

候选者:发送到Broker之后,也不能保证数据就一定不丢了,毕竟Broker会把数据存储到磁盘之前,走的是操作系统缓存

候选者:也就是异步刷盘这个过程还有可能导致数据会丢

候选者:嗯,到这里其实我已经说了三个场景了,分别是:producer -> broker ,broker->broker之间同步,以及broker->磁盘

候选者:要解决上面所讲的问题也比较简单,这块也没什么好说的…

候选者:不想丢数据,那就使用带有callback的api,设置 acks、retries、factor等等些参数来保证Producer发送的消息不会丢就好啦。

面试官:嗯…

候选者:一般来说,还是client 消费 broker 丢消息的场景比较多

面试官:那你们在消费数据的时候是怎么保证数据的可靠性的呢?

候选者:首先,要想client端消费数据不能丢,肯定是不能使用autoCommit的,所以必须是手动提交的。

候选者:我们这边是这样实现的:

候选者:一、从Kafka拉取消息(一次批量拉取500条,这里主要看配置)时

候选者:二、为每条拉取的消息分配一个msgId(递增)

候选者:三、将msgId存入内存队列(sortSet)中

候选者:四、使用Map存储msgId与msg(有offset相关的信息)的映射关系

候选者:五、当业务处理完消息后,ack时,获取当前处理的消息msgId,然后从sortSet删除该msgId(此时代表已经处理过了)

候选者:六、接着与sortSet队列的首部第一个Id比较(其实就是最小的msgId),如果当前msgId<=sort Set第一个ID,则提交当前offset

候选者:七、系统即便挂了,在下次重启时就会从sortSet队首的消息开始拉取,实现至少处理一次语义

候选者:八、会有少量的消息重复,但只要下游做好幂等就OK了。

面试官:嗯,你也提到了幂等,你们这业务怎么实现幂等性的呢?

候选者:嗯,还是以处理订单消息为例好了。

候选者:幂等Key我们由订单编号+订单状态所组成(一笔订单的状态只会处理一次)

候选者:在处理之前,我们首先会去查Redis是否存在该Key,如果存在,则说明我们已经处理过了,直接丢掉

候选者:如果Redis没处理过,则继续往下处理,最终的逻辑是将处理过的数据插入到业务DB上,再到最后把幂等Key插入到Redis上

候选者:显然,单纯通过Redis是无法保证幂等的(:

候选者:所以,Redis其实只是一个「前置」处理,最终的幂等性是依赖数据库的唯一Key来保证的(唯一Key实际上也是订单编号+状态)

候选者:总的来说,就是通过Redis做前置处理,DB唯一索引做最终保证来实现幂等性的

面试官:你们那边遇到过顺序消费的问题吗?

候选者:嗯,也是有的,我举个例子

候选者:订单的状态比如有 支付、确认收货、完成等等,而订单下还有计费、退款的消息报

候选者:理论上来说,支付的消息报肯定要比退款消息报先到嘛,但程序处理的过程中可不一定的嘛

候选者:所以在这边也是有消费顺序的问题

候选者:但在广告场景下不是「强顺序」的,只要保证最终一致性就好了。

候选者:所以我们这边处理「乱序」消息的实现是这样的:

候选者:一、宽表:将每一个订单状态,单独分出一个或多个独立的字段。消息来时只更新对应的字段就好,消息只会存在短暂的状态不一致问题,但是状态最终是一致的

候选者:二、消息补偿机制:另一个进行消费相同topic的数据,消息落盘,延迟处理。将消息与DB进行对比,如果发现数据不一致,再重新发送消息至主进程处理

候选者:还有部分场景,可能我们只需要把相同userId/orderId发送到相同的partition(因为一个partition由一个Consumer消费),又能解决大部分消费顺序的问题了呢。

面试官:嗯…懂了

欢迎关注我的微信公众号【Java3y】来聊聊Java面试

【对线面试官-移动端】系列 一周两篇持续更新中!

【对线面试官-电脑端】系列 一周两篇持续更新中!

原创不易!!求三连!!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…541542543…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%