开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

RocketMQ源码解析-通信模块

发表于 2021-01-05

这篇文章和 《RocketMQ源码解析-开篇》 隔了非常久,肥壕真是惭愧不已。一方面是忙于工作(摸鱼),另一方面一直纠结从哪个方面入手会让大家更加容易理解,而且如果贴上太多的源码,阅读的效果可能会适得其反。

所以为了提高文章的阅读质量,肥壕决定删繁就简,摒弃过度的源码解析,结合更多的设计图,目的是:看完直呼好家伙!

在消息队列架构中,各个角色可能随时都要进行通信交互,数据传输。

因此,通信模块在消息队列设计中是不可或缺的核心模块。

而且一个优秀良好的网络通信模块,很大程度上决定了消息传输的能力和整体性能。

本文就从 RocketMQ 的通信模块源码解析,深入学习高性能的网络通信模块究竟是如何实现的。

RocketMQ 消息队列的整体架构图👇


关于 RocketMQ 架构中各角色的作用和功能可以查看: ,肥壕就不再复述。

这里我们重点关注是各角色之间的通信关系:

NameServer

  • Name Server 每隔 10 s 扫描所有存活 Broker 的连接,如果 NameServer 超过 2 min 没有收到心跳,则 NameServer 断开与 Broker 的连接。

Broker

  • 每个 Broker 与所有的 NameServer 节点建立长连接,每隔 30s 汇报 Topic 信息到所有 NameServer。

Producer

  • Producer 与 NameServer 集群中的其中一个节点(随机选择)建立长连接,默认每隔 30s 从 NameServer 获取所有 Topic 队列的最新情况,这意味着如果 Broker不可用,Producer 最多 30s 能够感知,在此期间内发往 Broker 的所有消息都会失败。
  • Producer 与 提供 topic 服务的 Broker 建立长连接,默认 30s 向所有关联的 Broker 发送心跳,Broker 每隔 10s 扫描所有存活的连接,如果 Broker 在 2min 内没有收到心跳数据,则关闭与 Producer 的连接。

Consumer

  • Consumer 与 NameServer 集群中的其中一个节点(随机选择)建立长连接,默认每隔 30s 从 NameServer 获取 Topic 的最新队列情况,这意味着 Broker 不可用时,Consumer 最多最需要 30s 才能感知。
  • Consumer 每隔 30s 向所有关联的 Broker 发送心跳,Broker 每隔 10s 扫描所有存活的连接,若某个连接 2min 内没有发送心跳数据,则关闭连接;并向该 Consumer Group 的所有 Consumer 发出通知,Group 内的 Consumer 重新分配队列,然后继续消费。

可以看出,RocketMQ 架构中各角色之间形成一个比较复杂的通信网络,每一条链路都有可能影响整个消息队里通信的性能。

rocketmq-remoting 模块是 RocketMQ 中负责网络通信的核心模块,也是我们这次阅读主要的模块。(本文使用的 RocketMQ 版本是 4.4.1)

RocketMQ 的通信模块是基于 Netty 扩展的,在阅读通信模块部分的源码前,大家最好对 Netty 有一个基础的入门了解,知道 Netty 的整体通信模型,NIO 线程模型的知识。这样在下面的源码解析中就不会犯迷糊。

RocketMQ 多线程模型

Remoting 的网络通信是基于 Netty 实现,所以整个通信架构都是基于 Netty 模型扩展来的。

1. Remoting 通信模块结构

我们先来看看 Remoting 通信模块的类结构图:

  • RemotingService:最上层接口,定义了三个方法
1
2
3
java复制代码void start();
void shutdown();
void registerRPCHook(RPCHook rpcHook);
  • RemotingServer:定义了服务端的接口,继承了最上层接口 RemotingService
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
java复制代码// 注册处理器
void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
final ExecutorService executor);

// 注册默认处理器
void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor);

int localListenPort();

// 根据请求码(code)获取不同的处理器
Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode);

// 同步通信,返回 RemotingCommand
RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,
final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,
RemotingTimeoutException;

// 异步通信
void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis,
final InvokeCallback invokeCallback) throws InterruptedException,
RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;

// 单向通信
void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis)
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,
RemotingSendRequestException;
  • RemotingClient:定义了客户端的接口,并且继承了最上层接口 RemotingService,定义的方法与 RemotingServer 相似。
  • NettyRemotingAbstract:Netty 通信抽象类,定义并封装了服务端和客户端公共方法。
  • NettyRemotingServer:服务端的实现类,实现了 RemotingServer 接口,继承 NettyRemotingAbstract 抽象类。
  • NettyRemotingClient:客户端的实现类,实现了 RemotingClient 接口,继承 NettyRemotingAbstract 抽象类。

简单说,RocketMQ 在 Netty 通信的基础框架上对通信的 Server 和 Client 进行了抽象和封装的处理,使结构更为简洁和易扩展。

2. Netty 的多线程模型

Netty 是一个高性能,异步事件驱动的 NIO 框架,使用 Reactor 模式构建的线程模型。

Reactor 三种线程模型:

  • 单线程 Reactor 模型

所有的 I/O 操作都在一个 NIO 线程完成,同时负责客户端的连接和 read/write 操作。

缺点:一个 NIO 线程即要负责 I/O 连接又要负责 I/O 读写,可能会导致线程负载过高, 处理性能越来越低效,甚至会导致 CPU 跑飞,系统宕机的风险。

  • 多线程 Reactor 模型

与单线程模型最大的区别是,有一组 NIO 线程负责 I/O 读写,将 I/O 连接与读写分离开,提高 I/O 的读写速率。

这也是大部分场景所使用的模型,能够支撑日常高并发连接的业务场景。

  • 主从线程模型

如果是并发百万的客户端连接,单个 Acceptor 线程可能就会显得力不从心,有性能上的瓶颈。而主从线程模型的特点是:将原本负责 I/O 连接的单个线程替换成 NIO 线程池。

3. RocketMQ 的线程模型

RocketMQ 则采用了多线 Reactor 程模型的设计实现网络通信:


通过这个结构图可以看出,RocketMQ 在 Netty 原生的多线程 Reactor 模型上做了一系列的扩展和优化,记住主要的数字:(1 + N + M1 + M2)

  • 一个 Reactor 主线程(eventLoopGroupBoss,即为上面的1)负责监听 TCP 网络连接请求,建立好连接,创建 SocketChannel,并注册到 selector 上。
  • RocketMQ 的源码中会自动根据 OS 的类型选择 NIO 和 Epoll,也可以通过参数配置,然后监听真正的网络数据。
  • 拿到网络数据后,再丢给 Worker 线程池(eventLoopGroupSelector,即为上面的“N”,源码中默认设置为3),
  • 在真正执行业务逻辑之前需要进行 SSL 验证、编解码、空闲检查、网络连接管理,这些工作交给 defaultEventExecutorGroup(即为上面的“M1”,源码中默认设置为 8 )去做。
  • 而处理业务操作放在业务线程池中执行,根据 RomotingCommand 的业务请求码 code 去 processorTable 这个本地缓存变量中找到对应的 processor,然后封装成 task 任务后,提交给对应的业务 processor 处理线程池来执行(sendMessageExecutor,以发送消息为例,即为上面的 “M2”)。从入口到业务逻辑的几个步骤中线程池一直再增加,这跟每一步逻辑复杂性相关,越复杂,需要的并发通道越宽。
线程数 线程名 线程具体说明
1 NettyBoss_%d Reactor 主线程
N NettyServerEPOLLSelector_%d_%d Reactor 线程池
M1 NettyServerCodecThread_%d Worker线程池
M2 RemotingExecutorThread_%d 业务processor处理线程池

更详细的内容可以参考官方文档的说明 [Apache RocketMQ开发者指南-设计] ,当然,前提是要熟悉 Netty 的整个调用链路和整体的设计结构,不熟悉的同学先自行学习。

消息的协议设计与编解码

我们知道网络传输的数据是二进制格式的,Server 与 Client 之间发送消息与接收消息的时候,需要对其进行序列化和反序列化,所以也可以理解为数据的解码和编码的过程。

但是解码和编码也必须要按相同的消息协议进行,就好比这一段二进制的消息是一封信,哪里个位置是开头,哪里位置是内容,哪里位置是结尾,写信人和收信人都必须要有相同的约定。

所以要保证消息能够正确的发送与接收,就必须保证彼此使用一致的消息协议和编解码方式,不然就会出现

👳‍♂️:ミ耗釨尾汁o巴さ

🤷‍♂️: ???

RocketMQ 为了更高效地在网络中传输消息和对收到的消息读取,自定义通信协议和消息的编解码。

先来看一下 RocketMQ 自定义的通信协议的格式:


可见传输内容主要可以分为以下 4 部分:

(1) 消息长度:总长度,四个字节存储,占用一个 int 类型;

(2) 序列化类型 & 消息头长度:同样占用一个 int 类型,第一个字节表示序列化类型,后面三个字节表示消息头长度;

(3) 消息头数据:经过序列化后的消息头数据;

(4) 消息主体数据:消息主体的二进制字节数据内容;

RemotingCommand 类是消息协议的数据封装,不但包含了所有的数据结构,还包含了编码解码操作。

RemotingCommand 类的成员变量如下:

Header字段 类型 Request说明 Response说明
code int 请求操作码,应答方根据不同的请求码进行不同的业务处理 应答响应码。0表示成功,非0则表示各种错误
language LanguageCode 请求方实现的语言 应答方实现的语言
version int 请求方程序的版本 应答方程序的版本
opaque int 相当于requestId,在同一个连接上的不同请求标识码,与响应消息中的相对应 应答不做修改直接返回
flag int 区分是普通RPC还是onewayRPC得标志 区分是普通RPC还是onewayRPC得标志
remark String 传输自定义文本信息 传输自定义文本信息
extFields HashMap<String, String> 请求自定义扩展信息 响应自定义扩展信息

具体的编解码操作都在 RemotingCommand 类实现,肥壕来简单讲解一下消息的编码过程,让大家了解一下 RocketMQ 对消息做了哪些自定义的规范和处理。

进去 NettyEncoder 类编码器看:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码@Override
public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
throws Exception {
try {
// 1. Message Length + Serialization type + Header Length + Data Header
ByteBuffer header = remotingCommand.encodeHeader();
out.writeBytes(header);
// 2. Message Body
byte[] body = remotingCommand.getBody();
if (body != null) {
out.writeBytes(body);
}
} catch (Exception e) {
log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
if (remotingCommand != null) {
log.error(remotingCommand.toString());
}
RemotingUtil.closeChannel(ctx.channel());
}
}

对照上图 RocketMQ 自定义的通信协议的格式

步骤 1 :remotingCommand.encodeHeader() 将通信协议的前三部分都转成 byte

步骤 2: remotingCommand.getBody() 将消息内容转成 byte

最终转成能在网络中传输的二进制数据,我们再深入 remotingCommand.encodeHeader():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
java复制代码public ByteBuffer encodeHeader(final int bodyLength) {
// 1> header length size
int length = 4;

// 2> header data length
byte[] headerData;
headerData = this.headerEncode();

length += headerData.length;

// 3> body data length
length += bodyLength;

ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);

// 1.message length
result.putInt(length);

// 2.serialization type + header length
result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));

// 3.header data
result.put(headerData);

result.flip();

return result;
}

这里其实逻辑也是比较清晰的,就是把自定义协议的前三部分分别转成 byte。不过这里有个比较有意思的方法:markProtocolType(headerData.length, serializeTypeCurrentRPC)

1
2
3
4
5
6
7
8
9
java复制代码public static byte[] markProtocolType(int source, SerializeType type) {
byte[] result = new byte[4];

result[0] = type.getCode();
result[1] = (byte) ((source >> 16) & 0xFF);
result[2] = (byte) ((source >> 8) & 0xFF);
result[3] = (byte) (source & 0xFF);
return result;
}

可能有些同学没看明白上面的位运算,肥壕呢一开始看到也是一脸懵逼,后面经过一番资料研究之后其实还是很好理解的。可以参考:www.cnblogs.com/mcsfx/p/110…

消息的通信方式和通信流程

RocketMQ 通信方式主要有三种:

  • 同步(sync)
  • 异步(async)
  • 单向(oneway)

下文就以 同步(sync) 通信模式,重点分析一下客户端的发送流程。

1. Client 发送请求消息

客户端(发送者)发送消息的时候,一般都会直接调用 DefaultMQProducerImpl 类中的 send(Message msg),而这个方法默认是同步通信模式的。而这个方法最后会调用到 NettyRemotingClient 类中的 invokeSync方法,拿到与服务器 Channel ,然后调用 NettyRemotingAbstract 类中的 invokeSyncImpl 方法,给服务端发送消息。

invokeAsyncImpl 发送消息的源码如下(已附上相关注释):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
java复制代码public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
final long timeoutMillis)
throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
// 相当于requestID,每个请求都会生成一个唯一ID,每次加一
final int opaque = request.getOpaque();

try {
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
this.responseTable.put(opaque, responseFuture);
final SocketAddress addr = channel.remoteAddress();
// 使用 Netty 的 Channel 发送请求数据到服务端
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (f.isSuccess()) {
responseFuture.setSendRequestOK(true);
return;
} else {
responseFuture.setSendRequestOK(false);
}
responseTable.remove(opaque);
responseFuture.setCause(f.cause());
// 执行这个方法同时也会调用 countDownLatch countDown()方法
responseFuture.putResponse(null);
log.warn("send a request command to channel <" + addr + "> failed.");
}
});
// 使用 countDownLatch 实现同步
RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
if (null == responseCommand) {
if (responseFuture.isSendRequestOK()) {
throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
responseFuture.getCause());
} else {
throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
}
}
return responseCommand;
} finally {
this.responseTable.remove(opaque);
}
}
  • opaque:请求标识码,同个客户端连接每个请求都会生成唯一的请求码。
  • ResponseFuture: 是获取发送消息结果的封装对象,这里 RocketMQ 使用 CountDownLatch 计数器实现同步通信模式。创建对象的时候也会默认创建一个 countDownLatch = new CountDownLatch(1) ,调用 Channel 发送消息后会调用 waitResponse(timeoutMillis)(实际调用countDownLatch.await()) 阻塞等待结果,然后在 Channel 的回调函数中会释放这个计数器。
  • responseTable:保存请求标识码和响应结果的映射表。在同步模式中即用即取,发挥作用不大,主要是作用于异步通信模式,因网络丢失问题,对异步调用做补偿处理等。

2. Server 接收消息和处理逻辑

Server 端接收消息的核心处理入口在 NettyServerHandler 类的 channelRead0 方法中,并调用负责处理请求消息的核心方法 processRequestCommand。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
java复制代码/**
* Process incoming request command issued by remote peer.
*
* @param ctx channel handler context.
* @param cmd request command.
*/
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
// 根据请求业务码,获取对应的处理类和线程池
final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
final int opaque = cmd.getOpaque();

if (pair != null) {
Runnable run = new Runnable() {
@Override
public void run() {
try {
// 前置处理
doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
// 核心处理方法
final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
// 后置处理
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);

if (!cmd.isOnewayRPC()) {
if (response != null) {
response.setOpaque(opaque);
response.markResponseType();
try {
ctx.writeAndFlush(response);
} catch (Throwable e) {
log.error("process request over, but response failed", e);
log.error(cmd.toString());
log.error(response.toString());
}
} else {

}
}
} catch (Throwable e) {
log.error("process request exception", e);
log.error(cmd.toString());

if (!cmd.isOnewayRPC()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
RemotingHelper.exceptionSimpleDesc(e));
response.setOpaque(opaque);
ctx.writeAndFlush(response);
}
}
}
};
// 如果拒绝请求为true
if (pair.getObject1().rejectRequest()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[REJECTREQUEST]system busy, start flow control for a while");
response.setOpaque(opaque);
ctx.writeAndFlush(response);
return;
}

try {
// 封装task,使用当前业务的processor绑定的线程池执行
final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
pair.getObject2().submit(requestTask);
} catch (RejectedExecutionException e) {
if ((System.currentTimeMillis() % 10000) == 0) {
log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
+ ", too many requests and system thread pool busy, RejectedExecutionException "
+ pair.getObject2().toString()
+ " request code: " + cmd.getCode());
}

if (!cmd.isOnewayRPC()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[OVERLOAD]system busy, start flow control for a while");
response.setOpaque(opaque);
ctx.writeAndFlush(response);
}
}
} else {
String error = " request type " + cmd.getCode() + " not supported";
final RemotingCommand response =
RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
response.setOpaque(opaque);
ctx.writeAndFlush(response);
log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
}
}

在处理请求命令的方法中,RocketMQ 使用了一系列的设计模式,把消息业务代码抽象化,使整个调用方法的代码逻辑更为简洁和易扩展。

  • processorTable:业务码与业务处理器、业务线程池的映射表
1
2
3
4
5
java复制代码/**
* This container holds all processors per request code, aka, for each incoming request, we may look up the
* responding processor in this map to handle the request.
*/
protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable = new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);

可以看到,如果要对业务进行修改和扩展,只需改动对应的业务处理器即可,扩展性是非常高的。并且都会封装成 RequestTask 线程,由对应的业务线程池异步执行任务。

总结

本篇文章最核心的是深入解析 RocketMQ 的通信模块,分别从多线程模型、消息协议设计与编解码、消息通信方式这几个方面并结合代码较为深入的了解整个通信模块的设计和交互流程。

RocketMQ 作为一个出色优秀的消息队列框架,其底层必然少不了一个性能高效的通讯架构支撑。

当然,一个高效的网络通信架构,除了有优秀的通信设计,还需要确保通信的稳定性。比如:客户端如何确保发送消息不丢失、客户端的负载均衡等,这些后面肥壕再一一讲解吧。

本文内容如有理解不到位的地方,欢迎大家留言探讨~

普通的改变,将改变普通

我是宅小年,一个在互联网低调前行的小青年

欢迎微信搜一搜「宅小年」,点击关注,阅读更多分享好文

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

【Java基础】吃透Java IO:字节流、字符流、缓冲流

发表于 2021-01-05

原文链接:blog.csdn.net/mu\_wind/ar…?

Java IO流

  • 前言
  • 1 初识Java IO
    • 1.1 IO流分类
      • 1.2 案例实操
  • 2 IO流对象
    • 2.1 File类
      • 2.2 字节流
      • 2.3 字符流
      • 2.4 序列化
  • 3 IO流方法
    • 3.1 字节流方法
      • 3.2 字符流方法
  • 4 附加内容
    • 4.1 位、字节、字符
      • 4.2 IO流效率对比
      • 4.3 NIO

前言

有人曾问fastjson的作者(阿里技术专家高铁):“你开发fastjson,没得到什么好处,反而挨了骂背了锅,这种事情你为什么要做呢?”

高铁答道:“因为热爱本身,就是奖励啊!”

这个回答顿时触动了我。想想自己,又何尝不是如此。写作是个痛苦的过程,用心写作就更加煎熬,需字字斟酌,反复删改才有所成。然而,当一篇篇精良文章出自己手而呈现眼前时,那些痛苦煎熬就都那么值得。如果这些博文能有幸得大家阅读和认可,就更加是莫大的鼓舞了。技术人的快乐就是可以这么纯粹和简单。

点波关注不迷路,一键三连好运连连!

IO流是Java中的一个重要构成部分,也是我们经常打交道的。这篇关于Java IO的博文干货满满,堪称全网前三(请轻喷!)

下面几个问题(问题还会继续补充),如果你能对答如流,那么恭喜你,IO知识掌握得很好,可以立即关闭文章。反之,你可以在后面得文章中寻找答案。

  1. Java IO流有什么特点?
  2. Java IO流分为几种类型?
  3. 字节流和字符流的关系与区别?
  4. 字符流是否使用了缓冲?
  5. 缓冲流的效率一定高吗?为什么?
  6. 缓冲流体现了Java中的哪种设计模式思想?
  7. 为什么要实现序列化?如何实现序列化?
  8. 序列化数据后,再次修改类文件,读取数据会出问题,如何解决呢?

1 初识Java IO

IO,即in和out,也就是输入和输出,指应用程序和外部设备之间的数据传递,常见的外部设备包括文件、管道、网络连接。

Java 中是通过流处理IO 的,那么什么是流?

流(Stream),是一个抽象的概念,是指一连串的数据(字符或字节),是以先进先出的方式发送信息的通道。

当程序需要读取数据的时候,就会开启一个通向数据源的流,这个数据源可以是文件,内存,或是网络连接。类似的,当程序需要写入数据的时候,就会开启一个通向目的地的流。这时候你就可以想象数据好像在这其中“流”动一样。

一般来说关于流的特性有下面几点:

  1. 先进先出:最先写入输出流的数据最先被输入流读取到。
  2. 顺序存取:可以一个接一个地往流中写入一串字节,读出时也将按写入顺序读取一串字节,不能随机访问中间的数据。(RandomAccessFile除外)
  3. 只读或只写:每个流只能是输入流或输出流的一种,不能同时具备两个功能,输入流只能进行读操作,对输出流只能进行写操作。在一个数据传输通道中,如果既要写入数据,又要读取数据,则要分别提供两个流。

1.1 IO流分类

IO流主要的分类方式有以下3种:

  1. 按数据流的方向:输入流、输出流
  2. 按处理数据单位:字节流、字符流
  3. 按功能:节点流、处理流

在这里插入图片描述

1、输入流与输出流

输入与输出是相对于应用程序而言的,比如文件读写,读取文件是输入流,写文件是输出流,这点很容易搞反。

在这里插入图片描述

2、字节流与字符流

字节流和字符流的用法几乎完成全一样,区别在于字节流和字符流所操作的数据单元不同,字节流操作的单元是数据单元是8位的字节,字符流操作的是数据单元为16位的字符。

为什么要有字符流?

Java中字符是采用Unicode标准,Unicode 编码中,一个英文为一个字节,一个中文为两个字节。

在这里插入图片描述

而在UTF-8编码中,一个中文字符是3个字节。例如下面图中,“云深不知处”5个中文对应的是15个字节:-28-70-111-26-73-79-28-72-115-25-97-91-27-92-124

在这里插入图片描述

那么问题来了,如果使用字节流处理中文,如果一次读写一个字符对应的字节数就不会有问题,一旦将一个字符对应的字节分裂开来,就会出现乱码了。为了更方便地处理中文这些字符,Java就推出了字符流。

字节流和字符流的其他区别:

  1. 字节流一般用来处理图像、视频、音频、PPT、Word等类型的文件。字符流一般用于处理纯文本类型的文件,如TXT文件等,但不能处理图像视频等非文本文件。用一句话说就是:字节流可以处理一切文件,而字符流只能处理纯文本文件。
  2. 字节流本身没有缓冲区,缓冲字节流相对于字节流,效率提升非常高。而字符流本身就带有缓冲区,缓冲字符流相对于字符流效率提升就不是那么大了。详见文末效率对比。

以写文件为例,我们查看字符流的源码,发现确实有利用到缓冲区:

在这里插入图片描述

在这里插入图片描述

3、节点流和处理流

节点流:直接操作数据读写的流类,比如FileInputStream

处理流:对一个已存在的流的链接和封装,通过对数据进行处理为程序提供功能强大、灵活的读写功能,例如BufferedInputStream(缓冲字节流)

处理流和节点流应用了Java的装饰者设计模式。

下图就很形象地描绘了节点流和处理流,处理流是对节点流的封装,最终的数据处理还是由节点流完成的。

在这里插入图片描述

在诸多处理流中,有一个非常重要,那就是缓冲流。

我们知道,程序与磁盘的交互相对于内存运算是很慢的,容易成为程序的性能瓶颈。减少程序与磁盘的交互,是提升程序效率一种有效手段。缓冲流,就应用这种思路:普通流每次读写一个字节,而缓冲流在内存中设置一个缓存区,缓冲区先存储足够的待操作数据后,再与内存或磁盘进行交互。这样,在总数据量不变的情况下,通过提高每次交互的数据量,减少了交互次数。

在这里插入图片描述

联想一下生活中的例子,我们搬砖的时候,一块一块地往车上装肯定是很低效的。我们可以使用一个小推车,先把砖装到小推车上,再把这小推车推到车前,把砖装到车上。这个例子中,小推车可以视为缓冲区,小推车的存在,减少了我们装车次数,从而提高了效率。

在这里插入图片描述

需要注意的是,缓冲流效率一定高吗?不一定,某些情形下,缓冲流效率反而更低,具体请见IO流效率对比。

完整的IO分类图如下:

在这里插入图片描述

1.2 案例实操

接下来,我们看看如何使用Java IO。

文本读写的例子,也就是文章开头所说的,将“松下问童子,言师采药去。只在此山中,云深不知处。”写入本地文本,然后再从文件读取内容并输出到控制台。

1、FileInputStream、FileOutputStream(字节流)

字节流的方式效率较低,不建议使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
java复制代码public class IOTest {
public static void main(String[] args) throws IOException {
File file = new File("D:/test.txt");

write(file);
System.out.println(read(file));
}

public static void write(File file) throws IOException {
OutputStream os = new FileOutputStream(file, true);

// 要写入的字符串
String string = "松下问童子,言师采药去。只在此山中,云深不知处。";
// 写入文件
os.write(string.getBytes());
// 关闭流
os.close();
}

public static String read(File file) throws IOException {
InputStream in = new FileInputStream(file);

// 一次性取多少个字节
byte[] bytes = new byte[1024];
// 用来接收读取的字节数组
StringBuilder sb = new StringBuilder();
// 读取到的字节数组长度,为-1时表示没有数据
int length = 0;
// 循环取数据
while ((length = in.read(bytes)) != -1) {
// 将读取的内容转换成字符串
sb.append(new String(bytes, 0, length));
}
// 关闭流
in.close();

return sb.toString();
}
}
123456789101112131415161718192021222324252627282930313233343536373839

2、BufferedInputStream、BufferedOutputStream(缓冲字节流)

缓冲字节流是为高效率而设计的,真正的读写操作还是靠FileOutputStream和FileInputStream,所以其构造方法入参是这两个类的对象也就不奇怪了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
java复制代码public class IOTest {

public static void write(File file) throws IOException {
// 缓冲字节流,提高了效率
BufferedOutputStream bis = new BufferedOutputStream(new FileOutputStream(file, true));

// 要写入的字符串
String string = "松下问童子,言师采药去。只在此山中,云深不知处。";
// 写入文件
bis.write(string.getBytes());
// 关闭流
bis.close();
}

public static String read(File file) throws IOException {
BufferedInputStream fis = new BufferedInputStream(new FileInputStream(file));

// 一次性取多少个字节
byte[] bytes = new byte[1024];
// 用来接收读取的字节数组
StringBuilder sb = new StringBuilder();
// 读取到的字节数组长度,为-1时表示没有数据
int length = 0;
// 循环取数据
while ((length = fis.read(bytes)) != -1) {
// 将读取的内容转换成字符串
sb.append(new String(bytes, 0, length));
}
// 关闭流
fis.close();

return sb.toString();
}
}
12345678910111213141516171819202122232425262728293031323334

3、InputStreamReader、OutputStreamWriter(字符流)

字符流适用于文本文件的读写,OutputStreamWriter类其实也是借助FileOutputStream类实现的,故其构造方法是FileOutputStream的对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
java复制代码public class IOTest {

public static void write(File file) throws IOException {
// OutputStreamWriter可以显示指定字符集,否则使用默认字符集
OutputStreamWriter osw = new OutputStreamWriter(new FileOutputStream(file, true), "UTF-8");

// 要写入的字符串
String string = "松下问童子,言师采药去。只在此山中,云深不知处。";
osw.write(string);
osw.close();
}

public static String read(File file) throws IOException {
InputStreamReader isr = new InputStreamReader(new FileInputStream(file), "UTF-8");
// 字符数组:一次读取多少个字符
char[] chars = new char[1024];
// 每次读取的字符数组先append到StringBuilder中
StringBuilder sb = new StringBuilder();
// 读取到的字符数组长度,为-1时表示没有数据
int length;
// 循环取数据
while ((length = isr.read(chars)) != -1) {
// 将读取的内容转换成字符串
sb.append(chars, 0, length);
}
// 关闭流
isr.close();

return sb.toString()
}
}
12345678910111213141516171819202122232425262728293031

4、字符流便捷类

Java提供了FileWriter和FileReader简化字符流的读写,new FileWriter等同于new OutputStreamWriter(new FileOutputStream(file, true))

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
java复制代码public class IOTest {

public static void write(File file) throws IOException {
FileWriter fw = new FileWriter(file, true);

// 要写入的字符串
String string = "松下问童子,言师采药去。只在此山中,云深不知处。";
fw.write(string);
fw.close();
}

public static String read(File file) throws IOException {
FileReader fr = new FileReader(file);
// 一次性取多少个字节
char[] chars = new char[1024];
// 用来接收读取的字节数组
StringBuilder sb = new StringBuilder();
// 读取到的字节数组长度,为-1时表示没有数据
int length;
// 循环取数据
while ((length = fr.read(chars)) != -1) {
// 将读取的内容转换成字符串
sb.append(chars, 0, length);
}
// 关闭流
fr.close();

return sb.toString();
}
}
123456789101112131415161718192021222324252627282930

5、BufferedReader、BufferedWriter(字符缓冲流)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
arduino复制代码public class IOTest {

public static void write(File file) throws IOException {
// BufferedWriter fw = new BufferedWriter(new OutputStreamWriter(new
// FileOutputStream(file, true), "UTF-8"));
// FileWriter可以大幅度简化代码
BufferedWriter bw = new BufferedWriter(new FileWriter(file, true));

// 要写入的字符串
String string = "松下问童子,言师采药去。只在此山中,云深不知处。";
bw.write(string);
bw.close();
}

public static String read(File file) throws IOException {
BufferedReader br = new BufferedReader(new FileReader(file));
// 用来接收读取的字节数组
StringBuilder sb = new StringBuilder();

// 按行读数据
String line;
// 循环取数据
while ((line = br.readLine()) != null) {
// 将读取的内容转换成字符串
sb.append(line);
}
// 关闭流
br.close();

return sb.toString();
}
}
1234567891011121314151617181920212223242526272829303132

2 IO流对象

第一节中,我们大致了解了IO,并完成了几个案例,但对IO还缺乏更详细的认知,那么接下来我们就对Java IO细细分解,梳理出完整的知识体系来。

Java种提供了40多个类,我们只需要详细了解一下其中比较重要的就可以满足日常应用了。

2.1 File类

File类是用来操作文件的类,但它不能操作文件中的数据。

1
2
scala复制代码public class File extends Object implements Serializable, Comparable<File>
1

File类实现了Serializable、 Comparable<File>,说明它是支持序列化和排序的。

File类的构造方法

方法名

说明

File(File parent, String child)

根据 parent 抽象路径名和 child 路径名字符串创建一个新 File 实例。

File(String pathname)

通过将给定路径名字符串转换为抽象路径名来创建一个新 File 实例。

File(String parent, String child)

根据 parent 路径名字符串和 child 路径名字符串创建一个新 File 实例。

File(URI uri)

通过将给定的 file: URI 转换为一个抽象路径名来创建一个新的 File 实例。

File类的常用方法

方法

说明

createNewFile()

当且仅当不存在具有此抽象路径名指定名称的文件时,不可分地创建一个新的空文件。

delete()

删除此抽象路径名表示的文件或目录。

exists()

测试此抽象路径名表示的文件或目录是否存在。

getAbsoluteFile()

返回此抽象路径名的绝对路径名形式。

getAbsolutePath()

返回此抽象路径名的绝对路径名字符串。

length()

返回由此抽象路径名表示的文件的长度。

mkdir()

创建此抽象路径名指定的目录。

File类使用实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
arduino复制代码public class FileTest {
public static void main(String[] args) throws IOException {
File file = new File("C:/Mu/fileTest.txt");

// 判断文件是否存在
if (!file.exists()) {
// 不存在则创建
file.createNewFile();
}
System.out.println("文件的绝对路径:" + file.getAbsolutePath());
System.out.println("文件的大小:" + file.length());

// 刪除文件
file.delete();
}
}
12345678910111213141516

2.2 字节流

InputStream与OutputStream是两个抽象类,是字节流的基类,所有具体的字节流实现类都是分别继承了这两个类。

以InputStream为例,它继承了Object,实现了Closeable

1
2
3
4
java复制代码public abstract class InputStream
extends Object
implements Closeable
123

InputStream类有很多的实现子类,下面列举了一些比较常用的:

在这里插入图片描述

详细说明一下上图中的类:

  1. InputStream:InputStream是所有字节输入流的抽象基类,前面说过抽象类不能被实例化,实际上是作为模板而存在的,为所有实现类定义了处理输入流的方法。
  2. FileInputSream:文件输入流,一个非常重要的字节输入流,用于对文件进行读取操作。
  3. PipedInputStream:管道字节输入流,能实现多线程间的管道通信。
  4. ByteArrayInputStream:字节数组输入流,从字节数组(byte[])中进行以字节为单位的读取,也就是将资源文件都以字节的形式存入到该类中的字节数组中去。
  5. FilterInputStream:装饰者类,具体的装饰者继承该类,这些类都是处理类,作用是对节点类进行封装,实现一些特殊功能。
  6. DataInputStream:数据输入流,它是用来装饰其它输入流,作用是“允许应用程序以与机器无关方式从底层输入流中读取基本 Java 数据类型”。
  7. BufferedInputStream:缓冲流,对节点流进行装饰,内部会有一个缓存区,用来存放字节,每次都是将缓存区存满然后发送,而不是一个字节或两个字节这样发送,效率更高。
  8. ObjectInputStream:对象输入流,用来提供对基本数据或对象的持久存储。通俗点说,也就是能直接传输对象,通常应用在反序列化中。它也是一种处理流,构造器的入参是一个InputStream的实例对象。

OutputStream类继承关系图:

在这里插入图片描述

OutputStream类继承关系与InputStream类似,需要注意的是PrintStream.

2.3 字符流

与字节流类似,字符流也有两个抽象基类,分别是Reader和Writer。其他的字符流实现类都是继承了这两个类。

以Reader为例,它的主要实现子类如下图:

在这里插入图片描述

各个类的详细说明:

  1. InputStreamReader:从字节流到字符流的桥梁(InputStreamReader构造器入参是FileInputStream的实例对象),它读取字节并使用指定的字符集将其解码为字符。它使用的字符集可以通过名称指定,也可以显式给定,或者可以接受平台的默认字符集。
  2. BufferedReader:从字符输入流中读取文本,设置一个缓冲区来提高效率。BufferedReader是对InputStreamReader的封装,前者构造器的入参就是后者的一个实例对象。
  3. FileReader:用于读取字符文件的便利类,new FileReader(File file)等同于new InputStreamReader(new FileInputStream(file, true),"UTF-8"),但FileReader不能指定字符编码和默认字节缓冲区大小。
  4. PipedReader :管道字符输入流。实现多线程间的管道通信。
  5. CharArrayReader:从Char数组中读取数据的介质流。
  6. StringReader :从String中读取数据的介质流。

Writer与Reader结构类似,方向相反,不再赘述。唯一有区别的是,Writer的子类PrintWriter。

2.4 序列化

待续…

3 IO流方法

3.1 字节流方法

字节输入流InputStream主要方法:

  • read() :从此输入流中读取一个数据字节。
  • read(byte[] b) :从此输入流中将最多 b.length 个字节的数据读入一个 byte 数组中。
  • read(byte[] b, int off, int len) :从此输入流中将最多 len 个字节的数据读入一个 byte 数组中。
  • close():关闭此输入流并释放与该流关联的所有系统资源。

字节输出流OutputStream主要方法:

  • write(byte[] b) :将 b.length 个字节从指定 byte 数组写入此文件输出流中。
  • write(byte[] b, int off, int len) :将指定 byte 数组中从偏移量 off 开始的 len 个字节写入此文件输出流。
  • write(int b) :将指定字节写入此文件输出流。
  • close() :关闭此输入流并释放与该流关联的所有系统资源。

3.2 字符流方法

字符输入流Reader主要方法:

  • read():读取单个字符。
  • read(char[] cbuf) :将字符读入数组。
  • read(char[] cbuf, int off, int len) : 将字符读入数组的某一部分。
  • read(CharBuffer target) :试图将字符读入指定的字符缓冲区。
  • flush() :刷新该流的缓冲。
  • close() :关闭此流,但要先刷新它。

字符输出流Writer主要方法:

  • write(char[] cbuf) :写入字符数组。
  • write(char[] cbuf, int off, int len) :写入字符数组的某一部分。
  • write(int c) :写入单个字符。
  • write(String str) :写入字符串。
  • write(String str, int off, int len) :写入字符串的某一部分。
  • flush() :刷新该流的缓冲。
  • close() :关闭此流,但要先刷新它。

另外,字符缓冲流还有两个独特的方法:

  • BufferedWriter类newLine() :写入一个行分隔符。这个方法会自动适配所在系统的行分隔符。
  • BufferedReader类readLine() :读取一个文本行。

4 附加内容

4.1 位、字节、字符

字节(Byte)是计量单位,表示数据量多少,是计算机信息技术用于计量存储容量的一种计量单位,通常情况下一字节等于八位。

字符(Character)计算机中使用的字母、数字、字和符号,比如’A’、‘B’、’$’、’&’等。

一般在英文状态下一个字母或字符占用一个字节,一个汉字用两个字节表示。

字节与字符:

  • ASCII 码中,一个英文字母(不分大小写)为一个字节,一个中文汉字为两个字节。
  • UTF-8 编码中,一个英文字为一个字节,一个中文为三个字节。
  • Unicode 编码中,一个英文为一个字节,一个中文为两个字节。
  • 符号:英文标点为一个字节,中文标点为两个字节。例如:英文句号 . 占1个字节的大小,中文句号 。占2个字节的大小。
  • UTF-16 编码中,一个英文字母字符或一个汉字字符存储都需要 2 个字节(Unicode 扩展区的一些汉字存储需要 4 个字节)。
  • UTF-32 编码中,世界上任何字符的存储都需要 4 个字节。

4.2 IO流效率对比

首先,对比下普通字节流和缓冲字节流的效率:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
ini复制代码public class MyTest {
public static void main(String[] args) throws IOException {
File file = new File("C:/Mu/test.txt");
StringBuilder sb = new StringBuilder();

for (int i = 0; i < 3000000; i++) {
sb.append("abcdefghigklmnopqrstuvwsyz");
}
byte[] bytes = sb.toString().getBytes();

long start = System.currentTimeMillis();
write(file, bytes);
long end = System.currentTimeMillis();

long start2 = System.currentTimeMillis();
bufferedWrite(file, bytes);
long end2 = System.currentTimeMillis();

System.out.println("普通字节流耗时:" + (end - start) + " ms");
System.out.println("缓冲字节流耗时:" + (end2 - start2) + " ms");

}

// 普通字节流
public static void write(File file, byte[] bytes) throws IOException {
OutputStream os = new FileOutputStream(file);
os.write(bytes);
os.close();
}

// 缓冲字节流
public static void bufferedWrite(File file, byte[] bytes) throws IOException {
BufferedOutputStream bo = new BufferedOutputStream(new FileOutputStream(file));
bo.write(bytes);
bo.close();
}
}
12345678910111213141516171819202122232425262728293031323334353637

运行结果:

1
2
3
复制代码普通字节流耗时:250 ms
缓冲字节流耗时:268 ms
12

这个结果让我大跌眼镜,不是说好缓冲流效率很高么?要知道为什么,只能去源码里找答案了。翻看字节缓冲流的write方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
scss复制代码public synchronized void write(byte b[], int off, int len) throws IOException {
if (len >= buf.length) {
/* If the request length exceeds the size of the output buffer,
flush the output buffer and then write the data directly.
In this way buffered streams will cascade harmlessly. */
flushBuffer();
out.write(b, off, len);
return;
}
if (len > buf.length - count) {
flushBuffer();
}
System.arraycopy(b, off, buf, count, len);
count += len;
}
123456789101112131415

注释里说得很明白:如果请求长度超过输出缓冲区的大小,刷新输出缓冲区,然后直接写入数据。这样,缓冲流将无害地级联。

但是,至于为什么这么设计,我没有想明白,有哪位明白的大佬可以留言指点一下。

基于上面的情形,要想对比普通字节流和缓冲字节流的效率差距,就要避免直接读写较长的字符串,于是,设计了下面这个对比案例:用字节流和缓冲字节流分别复制文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
ini复制代码public class MyTest {
public static void main(String[] args) throws IOException {
File data = new File("C:/Mu/data.zip");
File a = new File("C:/Mu/a.zip");
File b = new File("C:/Mu/b.zip");

StringBuilder sb = new StringBuilder();

long start = System.currentTimeMillis();
copy(data, a);
long end = System.currentTimeMillis();

long start2 = System.currentTimeMillis();
bufferedCopy(data, b);
long end2 = System.currentTimeMillis();

System.out.println("普通字节流耗时:" + (end - start) + " ms");
System.out.println("缓冲字节流耗时:" + (end2 - start2) + " ms");
}

// 普通字节流
public static void copy(File in, File out) throws IOException {
// 封装数据源
InputStream is = new FileInputStream(in);
// 封装目的地
OutputStream os = new FileOutputStream(out);

int by = 0;
while ((by = is.read()) != -1) {
os.write(by);
}
is.close();
os.close();
}

// 缓冲字节流
public static void bufferedCopy(File in, File out) throws IOException {
// 封装数据源
BufferedInputStream bi = new BufferedInputStream(new FileInputStream(in));
// 封装目的地
BufferedOutputStream bo = new BufferedOutputStream(new FileOutputStream(out));

int by = 0;
while ((by = bi.read()) != -1) {
bo.write(by);
}
bo.close();
bi.close();
}
}
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950

运行结果:

1
2
3
复制代码普通字节流耗时:184867 ms
缓冲字节流耗时:752 ms
12

这次,普通字节流和缓冲字节流的效率差异就很明显了,达到了245倍。

再看看字符流和缓冲字符流的效率对比:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
ini复制代码public class IOTest {
public static void main(String[] args) throws IOException {
// 数据准备
dataReady();

File data = new File("C:/Mu/data.txt");
File a = new File("C:/Mu/a.txt");
File b = new File("C:/Mu/b.txt");
File c = new File("C:/Mu/c.txt");

long start = System.currentTimeMillis();
copy(data, a);
long end = System.currentTimeMillis();

long start2 = System.currentTimeMillis();
copyChars(data, b);
long end2 = System.currentTimeMillis();

long start3 = System.currentTimeMillis();
bufferedCopy(data, c);
long end3 = System.currentTimeMillis();

System.out.println("普通字节流1耗时:" + (end - start) + " ms,文件大小:" + a.length() / 1024 + " kb");
System.out.println("普通字节流2耗时:" + (end2 - start2) + " ms,文件大小:" + b.length() / 1024 + " kb");
System.out.println("缓冲字节流耗时:" + (end3 - start3) + " ms,文件大小:" + c.length() / 1024 + " kb");
}

// 普通字符流不使用数组
public static void copy(File in, File out) throws IOException {
Reader reader = new FileReader(in);
Writer writer = new FileWriter(out);

int ch = 0;
while ((ch = reader.read()) != -1) {
writer.write((char) ch);
}
reader.close();
writer.close();
}

// 普通字符流使用字符流
public static void copyChars(File in, File out) throws IOException {
Reader reader = new FileReader(in);
Writer writer = new FileWriter(out);

char[] chs = new char[1024];
while ((reader.read(chs)) != -1) {
writer.write(chs);
}
reader.close();
writer.close();
}

// 缓冲字符流
public static void bufferedCopy(File in, File out) throws IOException {
BufferedReader br = new BufferedReader(new FileReader(in));
BufferedWriter bw = new BufferedWriter(new FileWriter(out));

String line = null;
while ((line = br.readLine()) != null) {
bw.write(line);
bw.newLine();
bw.flush();
}

// 释放资源
bw.close();
br.close();
}

// 数据准备
public static void dataReady() throws IOException {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 600000; i++) {
sb.append("abcdefghijklmnopqrstuvwxyz");
}
OutputStream os = new FileOutputStream(new File("C:/Mu/data.txt"));
os.write(sb.toString().getBytes());

os.close();
System.out.println("完毕");
}
}
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283

运行结果:

1
2
3
4
复制代码普通字符流1耗时:1337 ms,文件大小:15234 kb
普通字符流2耗时:82 ms,文件大小:15235 kb
缓冲字符流耗时:205 ms,文件大小:15234 kb
123

测试多次,结果差不多,可见字符缓冲流效率上并没有明显提高,我们更多的是要使用它的readLine()和newLine()方法。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

橙单中台化低代码生成器 v13 发布

发表于 2021-01-05

OrangeAdmin 橙单中台化低代码生成器在又经过 30 天迭代后,现发布 v1.3 版本。该版本已实现,用户可根据不同的工程配置属性,生成不同偏好的业务框架代码。

在线资源

  • 开源项目 gitee.com/orangeform/…
  • 开发文档 www.orangeforms.com/development…
  • 教学视频 www.bilibili.com/video/bv1Wg…

新功能列表

  • 新增支持 Mybatis Plus,配置工程时可选择 Mybatis 插件,目前已同时支持最主流的 Mybatis Plus 和 tk mapper。
  • 单体和微服务工程均支持 DTO 和 VO 相关代码的生成,进一步解耦数据组装,同时也为 Swagger 提供了更好的支持。
  • 配置微服务工程时,可精确设定每个 Controller 接口是否生成 FeignClient 远程调用接口,此前是全部生成。
  • 配置工程时,可选择 Service 层为支持接口和实现类的标准方式,或是只有 Service 实现类的简洁方式。
  • 配置工程时,可选择是否开启基于 BaseModel 的实体对象全局公有字段,用户可根据偏好修改公有字段名称。如不开启,则仍然保留原有功能,既可为每个数据表指定不同的公有字段属性。
  • 根据用户反馈,集成RoctetMQ,在有效保证消息投递的可靠性、消费的顺序性和幂等性的前提下,实现实时同步部门关系的变化数据到其他多个业务数据库中,从而保障数据权限过滤在实现上的简单性,以及运行时的高效性。
  • 根据用户反馈,单点登录在原有功能的基础上,新增本地密码登录方式的支持。同时也在原有支持 OAuth2 的 auth_code 授权模式基础上,新增支持了 OAuth2 的 password 授权模式。
  • 根据用户反馈,新增支持一对多从表数据过滤,可生成基于嵌套子查询的 SQL 语句,同时基于新增的注解 RelationOneToMany 完成最后的数据组装。

修改说明

  • 优化字典表数据缓存代码处理方式,数据字典列表页面支持数据库数据与缓存数据的比对功能。
  • 修复一对一关联时,从表包含过滤条件同时又包含逻辑删除字段时,逻辑删除字段关联处理不当的问题。

新功能截图

详情可见

橙单简介

橙单低代码生成器由知视科技团队研发,团队经过多年大型企业中台化改造项目的沉淀,不断总结经验教训,不断努力提升产能,不断积极应对微服务改造过程中出现的分分合合。经过无数日夜的持续迭代和优化,终于可以实现生成 70% 的适用于微服务架构的高质量范式化工程级代码。此后,我们就很少加班,热爱工作,并与领导成为了战友,与客户成为了朋友。

我们的优势

  • 排在首位的就是 16 万字以上的专业详尽、同步更新和暗黑护眼的操作指南、开发文档和教学视频。
  • 真正的中台化代码生成器,而非脚手架,可生成相对复杂的业务代码,拥抱中台服务的不断分分合合。
  • 非常浅显易懂的高质量生成后工程代码,经过 SonarQube 和 Alibaba 代码规范的严格扫描。
  • 极为宽松、合理、透明、全网超低价的商业授权。(其实就是撸顿串的价格)

基础功能

  • 前端框架:单页面、多标签、多栏目、子路由和多套高颜值样式,多种模式可供选择。
  • 前端能力:多表联动、上传下载、数据导出、自定义打印模板、富文本、分组统计图表、明细数据下钻等。
  • 页面布局:支持基于 Fragment 和 Block 的灵活布局方式,通过配置即可生成多样化的表单页面,支持全工程模式的页面预览。
  • 后台架构:分布式锁、分布式 Id 生成器、分布式缓存、分布式事务、分布式存储、分布式数据同步和分布式灰度发布,按需集成。
  • 缓存同步:支持基于 Canal 的多实例分布式数据同步,可将变化的数据实时同步到 Redis 集群。
  • 操作权限:精确到按钮级的操作和标签级的显示,同时提供多维度的权限分配路径查询能力。
  • 数据权限:基于 Mybatis 拦截器 + JSqlParser 的实现方式,配置更灵活,代码侵入性更低。
  • 多数据源:可根据配置动态生成,路由策略灵活可扩展。
  • 数据组装:Java 注解方式配装多数据库间和多服务间的多种关联和计算关系的数据。
  • 定时任务:支持多种类型的定时任务代码模板,灵活可配、高度优化、二次开发简单。
  • 日志监控:基于 Kafka + ELK 的服务日志跟踪,基于 PinPoint / SkyWalking 的服务链路跟踪。
  • 指标监控:Grafana + Prometheus 和 Spring Boot Admin 的指标监控。
  • 接口文档:目前已经集成 Knife4j,同时支持基于 0 注解的 Postman 接口导出。

技术选型

  • 前端框架为 Element (Vue) / Ant Design (React) / ECharts / AntV / Axios / Webpack。
  • 后端框架为 Spring Boot / Spring Cloud / Spring Cloud Alibaba + Mybatis Plus + tk mapper + Jwt。
  • Java工具库 Apache Commons + Hutool + Guava + Caffeine + Lombok + MapStruct + Knife4j + qdox。
  • 主要中间件 Redis + Zookeeper + Apollo + XXL-Job + Quartz + Seata + Canal + RocketMQ + Kafka + Consul + Minio + ELK + Sentinel + PinPoint / SkyWalking + Prometheus + Grafana + Spring Boot Admin。

代码质量

  • 无任何二次封装,只生成您最懂的代码。
  • 遵循阿里巴巴标准的代码规范,扫描后无任何警告。
  • SonarQube 基于最严格检测规则的代码扫描。
  • 产品级代码质量,层次清晰、滴水不漏。
  • 近乎于 0 的代码重复率,35% 以上的注释覆盖率。
  • 15 年以上经验的前后端架构师优化的每一处细节。
  • 前沿的单表组合式设计,使业务服务的拆分与再合并 SO EASY。
  • 先代码,后 SQL 的原则,让微服务横向扩充更具弹性。
  • 标准化的服务间调用接口,使业务服务组合更具正交性。
  • 前后端基于约定各司其职,默契配合,让系统运行飞起来。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

每日一面 - mysql 的自增 id 的实现逻辑是什么样子

发表于 2021-01-05

本问题参考自: www.zhihu.com/question/43… 解答为个人原创

Key TakeAways

  • InnoDB 引擎中 有三种 AutoIncrement 锁模式:
    • innodb_autoinc_lock_mode=0(traditional lock mode):获取表锁,语句执行结束后释放
    • innodb_autoinc_lock_mode=1(consecutive lock mode,MySQL 8.0 之前默认 ):对于不确定插入数量的语句(例如INSERT ... SELECT, REPLACE ... SELECT和LOAD DATA)和 innodb_autoinc_lock_mode=0 一样,其他的确定数量的语句在执行前先批量获取 id,之后再执行语句。
    • innodb_autoinc_lock_mode=2(interleaved lock mode,MySQL 8.0+ 默认 ):采用乐观锁, CAS 更新计数器获取。
  • AutoIncrement 计数器在 MySQL 8.0 之前,存储在内存中,在 MySQL 8.0 之后,持久化存储到磁盘。通过每次更新写入 Redo Log,并在检查点刷入 innodb 引擎表中记录下来。
  • AutoIncrement 的 id 可以让新数据聚集在一起,利于大部分 OLTP 业务(访问频率在最近一天,一周,或者几个月内比较活跃,而超过一段时间内的数据很少访问)。如果是这类业务推荐使用自增主键,将业务主键(UUID)作为二级的唯一索引使用。
  • 如果考虑分布式性能以及避免 AutoIncrement 带来的锁性能问题,可以考虑使用 ID 生成器生成:全局趋势增长的主键

为何主键要 Auto Increment 而不是 UUID

MySQL InnoDB 引擎默认主键索引是 B+ 树索引,也是聚集索引,为何叫聚集索引呢?

以 InnoDB 作为存储引擎的表,表中的数据都会有一个主键,即使你不创建主键,系统也会帮你创建一个隐式的主键。这是因为 InnoDB 是把数据存放在 B+ 树中的,而 B+ 树的键值就是主键,在 B+ 树的叶子节点中,存储了表中所有的数据。这种以主键作为 B+ 树索引的键值而构建的 B+ 树索引,我们称之为聚集索引。

存储中,聚集索引的数据,会根据索引的值,对应的数据也会聚集存储在一起:

image

MySQL 读取磁盘上的数据是一页一页读取的,如果某条我们要处理的数据在某一页中,但是这一页其他数据我们都不关心,这样的请求多了,性能会急剧下降,类似于 CPU 的 false sharing:

image

按照 B+ 树的原理,AutoIncrement 的 ID 能保证最新的数据在一页中被读取,而且减少了 B+ 树分裂翻转。 UUID 由于无序,插入时,B+ 树会不断翻转,并且最新的数据可能不在同一页。很可能会出现,最新一条数据,和好几年前的数据在同一页。

在大部分 OLTP 类业务中,例如购物和支付交易的订单,节日促销的抽奖活动这类业务都有这样的使用场景,访问频率在最近一天,一周,或者几个月内比较活跃,而超过一段时间内的数据很少访问。如果是这类业务推荐使用自增主键,将业务主键(UUID)作为二级的唯一索引使用。 如果考虑分布式性能以及避免 AutoIncrement 带来的锁性能问题,可以考虑使用 ID 生成器生成全局趋势增长的主键,例如 Twitter 的 Snowflake 算法生成的前面是时间戳的主键id,或者是 类似于这种 “时间+业务+自增”(例如 20210105105811233ORD0000001) 字符串,作为主键id,这样其实也能近似保证热数据聚集存储在一起,也就是 MySQL 一页一页读取能命中更多要读取处理的数据

AutoIncrement 原理

我们这里只关心 InnoDB 引擎的。

AutoIncrement 最大值

AutoIncrement 最大值,和列类型相关。最大可以设置列类型为 UNSIGNED BIGINT,这样最大值就是 18446744073709551615。 超过这个值继续生成则还是 18446744073709551615。不会再增加。

AutoIncrement 锁模式

获取 AutoIncrement 最新值,需要涉及到锁。目前有三种锁模式,对应 innodb_autoinc_lock_mode 的值, 0 ,1,2. MySQL 8.0 之后,默认为 2, 在这之前,默认为 1

  • innodb_autoinc_lock_mode=0(traditional lock mode) 传统的auto_increment机制,这种模式下所有针对auto_increment列的插入操作都会加表级别的AUTO-INC锁,在语句执行结束则会释放,分配的值也是一个个分配,是连续的,正常情况下也不会有间隙(当然如果事务rollback了这个auto_increment值就会浪费掉,从而造成间隙)。
  • innodb_autoinc_lock_mode=1(consecutive lock mode) 这种情况下,针对未知数量批量插入(例如INSERT ... SELECT, REPLACE ... SELECT和LOAD DATA)才会采用AUTO-INC锁这种方式,而针对已知数量的普通插入,则采用了一种新的轻量级的互斥锁来分配auto_increment列的值。这种锁,只会持续到获取一定数量的 id,不会等待语句执行结束在释放。也就是拿轻量级锁提前分配好所需数量的 id 之后释放锁,再执行语句。当然,如果其他事务已经持有了AUTO-INC锁,则simple inserts需要等待。当然,这种情况下,可能产生的间隙更多。
  • innodb_autoinc_lock_mode=2(interleaved lock mode) 这种模式下任何类型的inserts都不会采用AUTO-INC锁,性能最好,但是在同一条语句内部产生auto_increment值间隙。其实这个就是所有语句对于同一个值进行 Compare-And-Set 更新,类似于乐观锁。这个锁模式对statement-based replication的主从同步都有一定问题。因为同步传输的是语句,而不是行值,语句执行后的差异导致主从可能主键不一致。

AutoIncrement 存储

AutoIncrement 计数器在 MySQL 8.0 之前,存储在内存中,每次启动时通过以下语句初始化:

1
sql复制代码SELECT MAX(ai_col) FROM table_name FOR UPDATE;

在 MySQL 8.0 之后,持久化存储到磁盘。通过每次更新写入 Redo Log,并在检查点刷入 innodb 引擎表中记录下来。

所以,在MySQL 8.0 之前,如果 rollback 导致某些值没有使用,重启后,这些值还是会使用。但是在 MySQL 8.0 之后就不会了。

每日一刷,轻松提升技术,斩获各种offer:

image

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

校招mysql那些事儿 日志模块binlog/redolog

发表于 2021-01-04

该文章始发于公众号【迈莫coding】

地址:校招mysql那些事儿|日志模块binlog/redolog/undolog

目录

  • 背景
  • 物理日志和逻辑日志
  • 日志模块:redo log
    • redo log产生背景
    • redo log基本概念
    • redo log记录形式
    • redo log使用场景
  • 日志模块:bin log
    • bin log基本概念
    • bin log刷盘机制
    • bin log使用场景
  • 日志模块:undo log
    • undo log基本概念
    • undo log使用场景
  • binlog/redo log/undo log区别
  • 闲聊
  • 欢迎加入我的公众号【迈莫coding】 一起pk大厂

背景

日志是mysql数据库的重要组成部分,记录着数据库运行期间各种状态信息。mysql日志主要包括错误日志、查询日志、慢查询日志、事务日志、二进制日志几大类。作为开发,我们重点需要关注的是二进制日志(bin log)和事务日志(包括redo log和undo log),本文接下来会详细介绍这三种日志。

物理日志和逻辑日志

在说日志模块前,先剧透一下什么是物理日志和逻辑日志,以防有小伙伴不太熟悉这两个词,所以先学习一下,这样看日志模块时见到这两个词不至于生疏。

  • 物理日志:通俗的讲,就是只有”我”自己可以使用,别人无法共享我的”物理格式,私有化。
  • 逻辑日志:可以给别的引擎使用,是所有引擎共享的。

日志模块:redolog

redo log产生背景

说mysql日志redo log,先说一个故事。从前有一家酒馆,酒馆老板常备着一个粉板,专门记着客人的赊账记录,也是他的法宝之一。如果赊账的人不多,他可以在粉板上记录下赊账的人和账目。如果赊账人多的话,由于粉板的空间大小有限,所以他又需要额外准备一本账本,专门记录所有赊账的账目。

如果有人要赊账的话,一般老板有两种做法:

  • 打开账本,找到赊账人的记录,进行追加赊账记录
  • 先把赊账人的记录写到粉板上,待客流量少的时刻,再更新到赊账账目上

如果老板使用第一种方法的话,每当有人要赊账的话,首先他需要打开厚厚的账本,一页一页查找该顾客的姓名,然后进行登记。你们想啊,如果赊账的人不多,老板找赊账人的记录轻松点,如果赊账本有好几本的话,一本一本的找,老板看的都头疼。

方案一的过程想想都头大。相比第二种方案,对老板就相对轻松点,老板只需要把赊账人的信息写在粉板上,待客流量低的时候,再更新到赊账本上。

同样,mysql里也有同样的问题,如果每一次数据的操作都写入磁盘中,首先磁盘先要找到对应的记录,然后再更新,整个过程io成本,查找成本比较高。所以,mysql为了提高性能,使用了类似老板粉板方式,先把更新数据结果存储到某个地方,待空闲时再写入磁盘中。

而mysql把更新结果存储的地方,就是咱们今天所说的主角之一redo log。接下里,咱们就好好和主角交流交流。

redo log基本概念

redo log是InnoDB存储引擎层的日志,又被称为重写日志,用来记录事务操作的变化,记录的是数据修改之后的值,不管事务提交是否成功,都会被记录下来。

而这种先写日志,后写磁盘的技术就是mysql里面经常提及到的WAL(Write Ahead Logging)技术。

具体的来说,就是当有一条记录需要更新的时候,InnoDB引擎会把记录优先更新到redo log(粉板)里面,并更新内存,这样更新操作就完成了。同时,InnoDB引擎会在空闲的时间将redo log中的记录存储到磁盘上。

redo log 记录方式

由于redo log记录的是数据页的变更,而这种记录是没有必要永久保存的,因此redo log实现上采用来大小固定,循环写入的方式,当记录写到末尾时,又会从头开始写,如下图所示。

如图所示,write pos是当前记录的位置,一边写一边后移,写到4号文件末尾就回到1号文件开头。check point是当前要把记录写入到数据文件的位置,也是后移并且循环的。

如果和上面老板粉板场景结合起来描述的话,write pos就是老板在粉板上顺序写入赊账人记录位置,对于mysql来说,write pos后移;而check point就是老板把粉板上记录写入到赊账本上的位置,当老板写入到赊账本上后,就会把粉板上该记录擦除掉,对于mysql来说,check point后移。

redo log使用场景

  • 用于系统奔溃恢复(crash-safe)

日志模块:binlog

bin log基本概念

bin log是mysql数据库service层的,是所有存储引擎共享的日志模块,它用于记录数据库执行的写入性操作,也就是在事务commit阶段进行记录,以二进制的形式保存于磁盘中。

bin log是逻辑日志,并且由mysql数据库的service层执行,也就是说使用所有的存储引擎数据库都会记录bin log日志。

bin log是以追加的方式进行写入的,可以通过 max_binlog_size 参数设置bin log文件大小,当文件大小达到某个值时,会生成新的文件来保存日志。

bin log刷盘机制

对于InnoDB引擎而言,在每次事务commit提交时才会记录bin log日志,此时记录仍然在内存中,那么什么时候存储到磁盘中呢?mysql通过 sync_binlog 参数控制bin log刷盘时机,取值范围:0~N: 0:不去强求,由系统自行判断何时写入磁盘; 1:每次事务commit的时候都要将bin log写入磁盘; N:每N个事务commit,才会将bin log写入磁盘;

sync_binlog 参数建议设置为1,这样每次事务commit时就会把bin log写入磁盘中,这样也可以保证mysql异常重启之后bin log日志不会丢失。

bin log使用场景

在实际场景中, bin log 的主要场景有两点,一点是主从复制,另一点是数据恢复 主从复制:在master端开启 bin log ,然后将 bin log 发送给各个slaver端,slaver端读取 bin log 日志,从而使得主从数据库中数据一致 数据恢复:通过 bin log 获取想要恢复的时间段数据

日志模块:undolog

undo log基本概念

undo log 是回滚日志,是记录每条数据的所有版本,比如 update 语句,那么它首先会将该条记录的数据记录到undo log日志中,并且将最新版本的roll_pointer指针指向上一个版本,这样就可以形成当前记录的所有版本,这也是MVCC的实现机制。

MVCC实现机制,查看我的另一篇文章<<校招mysql那些事|MVCC原理机制>>。 校招mysql那些事|MVCC原理机制

undo log使用场景

  • MVCC多版本控制中使用undo log

binlog/redo log/undo log区别

闲聊

  • 读完文章,自己是不是和mysql日志模块的cp率又提高了
  • 我是迈莫,欢迎大家和我交流

原创不易,觉得文章写得不错的小伙伴,点个赞👍 鼓励一下吧~

欢迎加入我的公众号【迈莫coding】 一起pk大厂

- 迈莫coding欢迎客官的到来

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

10个Python爬虫框架推荐,你使用的是哪个呢?

发表于 2021-01-04

实现爬虫技术的编程环境有很多种,Java、Python、C++等都可以用来爬虫。但很多人选择Python来写爬虫,为什么呢?因为Python确实很适合做爬虫,丰富的第三方库十分强大,简单几行代码便可实现你想要的功能。更重要的,Python也是数据挖掘和分析的好能手。那么,Python爬虫一般用什么框架比较好?

一般来讲,只有在遇到比较大型的需求时,才会使用Python爬虫框架。这样的做的主要目的,是为了方便管理以及扩展。本文我将向大家推荐十个Python爬虫框架。

在这里插入图片描述

1、Scrapy:Scrapy是一个为了爬取网站数据,提取结构性数据而编写的应用框架。 可以应用在包括数据挖掘,信息处理或存储历史数据等一系列的程序中。它是很强大的爬虫框架,可以满足简单的页面爬取,比如可以明确获知url pattern的情况。用这个框架可以轻松爬下来如亚马逊商品信息之类的数据。但是对于稍微复杂一点的页面,如weibo的页面信息,这个框架就满足不了需求了。它的特性有:HTML, XML源数据 选择及提取 的内置支持;提供了一系列在spider之间共享的可复用的过滤器(即 Item Loaders),对智能处理爬取数据提供了内置支持。

2、Crawley:高速爬取对应网站的内容,支持关系和非关系数据库,数据可以导出为JSON、XML等。

3、Portia:是一个开源可视化爬虫工具,可让使用者在不需要任何编程知识的情况下爬取网站!简单地注释自己感兴趣的页面,Portia将创建一个蜘蛛来从类似的页面提取数据。简单来讲,它是基于scrapy内核;可视化爬取内容,不需要任何开发专业知识;动态匹配相同模板的内容。

4、newspaper:可以用来提取新闻、文章和内容分析。使用多线程,支持10多种语言等。作者从requests库的简洁与强大得到灵感,使用Python开发的可用于提取文章内容的程序。支持10多种语言并且所有的都是unicode编码。

5、Python-goose:Java写的文章提取工具。Python-goose框架可提取的信息包括:文章主体内容、文章主要图片、文章中嵌入的任何Youtube/Vimeo视频、元描述、元标签。

6、Beautiful Soup:名气大,整合了一些常用爬虫需求。它是一个可以从HTML或XML文件中提取数据的Python库。它能够通过你喜欢的转换器实现惯用的文档导航,查找,修改文档的方式.Beautiful Soup会帮你节省数小时甚至数天的工作时间。Beautiful Soup的缺点是不能加载JS。

7、mechanize:它的优点是可以加载JS。当然它也有缺点,比如文档严重缺失。不过通过官方的example以及人肉尝试的方法,还是勉强能用的。

8、selenium:这是一个调用浏览器的driver,通过这个库你可以直接调用浏览器完成某些操作,比如输入验证码。Selenium是自动化测试工具,它支持各种浏览器,包括 Chrome,Safari,Firefox等主流界面式浏览器,如果在这些浏览器里面安装一个 Selenium 的插件,可以方便地实现Web界面的测试. Selenium支持浏览器驱动。Selenium支持多种语言开发,比如 Java,C,Ruby等等,PhantomJS 用来渲染解析JS,Selenium 用来驱动以及与Python的对接,Python进行后期的处理。

9、cola:是一个分布式的爬虫框架,对于用户来说,只需编写几个特定的函数,而无需关注分布式运行的细节。任务会自动分配到多台机器上,整个过程对用户是透明的。项目整体设计有点糟,模块间耦合度较高。

10、PySpider:一个国人编写的强大的网络爬虫系统并带有强大的WebUI。采用Python语言编写,分布式架构,支持多种数据库后端,强大的WebUI支持脚本编辑器,任务监视器,项目管理器以及结果查看器。Python脚本控制,可以用任何你喜欢的html解析包。

以上就是我分享的Python爬虫一般用的十大主流框架。这些框架的优缺点都不同,大家在使用的时候,可以根据具体场景选择合适的框架。如果你对Python感兴趣,欢迎加入我们【python学习交流裙】,免费领取学习资料和源码。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Flink 双流 Join 的3种操作示例

发表于 2021-01-04

在数据库中的静态表上做 OLAP 分析时,两表 join 是非常常见的操作。同理,在流式处理作业中,有时也需要在两条流上做 join 以获得更丰富的信息。Flink DataStream API 为用户提供了3个算子来实现双流 join,分别是:

● join()

● coGroup()

● intervalJoin()

本文举例说明它们的使用方法,顺便聊聊比较特殊的 interval join 的原理。

准备数据

从 Kafka 分别接入点击流和订单流,并转化为 POJO。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
ini复制代码DataStream<String> clickSourceStream = env
.addSource(new FlinkKafkaConsumer011<>(
"ods_analytics_access_log",
new SimpleStringSchema(),
kafkaProps
).setStartFromLatest());
DataStream<String> orderSourceStream = env
.addSource(new FlinkKafkaConsumer011<>(
"ods_ms_order_done",
new SimpleStringSchema(),
kafkaProps
).setStartFromLatest());
​
DataStream<AnalyticsAccessLogRecord> clickRecordStream = clickSourceStream
.map(message -> JSON.parseObject(message, AnalyticsAccessLogRecord.class));
DataStream<OrderDoneLogRecord> orderRecordStream = orderSourceStream
.map(message -> JSON.parseObject(message, OrderDoneLogRecord.class));

join()

join() 算子提供的语义为”Window join”,即按照指定字段和(滚动/滑动/会话)窗口进行 inner join,支持处理时间和事件时间两种时间特征。以下示例以10秒滚动窗口,将两个流通过商品 ID 关联,取得订单流中的售价相关字段。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
scss复制代码clickRecordStream
.join(orderRecordStream)
.where(record -> record.getMerchandiseId())
.equalTo(record -> record.getMerchandiseId())
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.apply(new JoinFunction<AnalyticsAccessLogRecord, OrderDoneLogRecord, String>() {
@Override
public String join(AnalyticsAccessLogRecord accessRecord, OrderDoneLogRecord orderRecord) throws Exception {
return StringUtils.join(Arrays.asList(
accessRecord.getMerchandiseId(),
orderRecord.getPrice(),
orderRecord.getCouponMoney(),
orderRecord.getRebateAmount()
), '\t');
}
})
.print().setParallelism(1);

简单易用。

coGroup()

只有 inner join 肯定还不够,如何实现 left/right outer join 呢?答案就是利用 coGroup() 算子。它的调用方式类似于 join() 算子,也需要开窗,但是 CoGroupFunction 比 JoinFunction 更加灵活,可以按照用户指定的逻辑匹配左流和/或右流的数据并输出。

以下的例子就实现了点击流 left join 订单流的功能,是很朴素的 nested loop join 思想(二重循环)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
scss复制代码clickRecordStream
.coGroup(orderRecordStream)
.where(record -> record.getMerchandiseId())
.equalTo(record -> record.getMerchandiseId())
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.apply(new CoGroupFunction<AnalyticsAccessLogRecord, OrderDoneLogRecord, Tuple2<String, Long>>() {
@Override
public void coGroup(Iterable<AnalyticsAccessLogRecord> accessRecords, Iterable<OrderDoneLogRecord> orderRecords, Collector<Tuple2<String, Long>> collector) throws Exception {
for (AnalyticsAccessLogRecord accessRecord : accessRecords) {
boolean isMatched = false;
for (OrderDoneLogRecord orderRecord : orderRecords) {
// 右流中有对应的记录
collector.collect(new Tuple2<>(accessRecord.getMerchandiseName(), orderRecord.getPrice()));
isMatched = true;
}
if (!isMatched) {
// 右流中没有对应的记录
collector.collect(new Tuple2<>(accessRecord.getMerchandiseName(), null));
}
}
}
})
.print().setParallelism(1);

intervalJoin()

join() 和 coGroup() 都是基于窗口做关联的。但是在某些情况下,两条流的数据步调未必一致。例如,订单流的数据有可能在点击流的购买动作发生之后很久才被写入,如果用窗口来圈定,很容易 join 不上。所以 Flink 又提供了”Interval join”的语义,按照指定字段以及右流相对左流偏移的时间区间进行关联,即:

right.timestamp ∈ [left.timestamp + lowerBound; left.timestamp + upperBound]

interval join 也是 inner join,虽然不需要开窗,但是需要用户指定偏移区间的上下界,并且只支持事件时间。

示例代码如下。注意在运行之前,需要分别在两个流上应用 assignTimestampsAndWatermarks() 方法获取事件时间戳和水印。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
less复制代码clickRecordStream
.keyBy(record -> record.getMerchandiseId())
.intervalJoin(orderRecordStream.keyBy(record -> record.getMerchandiseId()))
.between(Time.seconds(-30), Time.seconds(30))
.process(new ProcessJoinFunction<AnalyticsAccessLogRecord, OrderDoneLogRecord, String>() {
@Override
public void processElement(AnalyticsAccessLogRecord accessRecord, OrderDoneLogRecord orderRecord, Context context, Collector<String> collector) throws Exception {
collector.collect(StringUtils.join(Arrays.asList(
accessRecord.getMerchandiseId(),
orderRecord.getPrice(),
orderRecord.getCouponMoney(),
orderRecord.getRebateAmount()
), '\t'));
}
})
.print().setParallelism(1);

由上可见,interval join 与 window join 不同,是两个 KeyedStream 之上的操作,并且需要调用 between() 方法指定偏移区间的上下界。如果想令上下界是开区间,可以调用 upperBoundExclusive()/lowerBoundExclusive() 方法。

interval join 的实现原理

以下是 KeyedStream.process(ProcessJoinFunction) 方法调用的重载方法的逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
scss复制代码public <OUT> SingleOutputStreamOperator<OUT> process(
ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction,
TypeInformation<OUT> outputType) {
Preconditions.checkNotNull(processJoinFunction);
Preconditions.checkNotNull(outputType);
final ProcessJoinFunction<IN1, IN2, OUT> cleanedUdf = left.getExecutionEnvironment().clean(processJoinFunction);
final IntervalJoinOperator<KEY, IN1, IN2, OUT> operator =
new IntervalJoinOperator<>(
lowerBound,
upperBound,
lowerBoundInclusive,
upperBoundInclusive,
left.getType().createSerializer(left.getExecutionConfig()),
right.getType().createSerializer(right.getExecutionConfig()),
cleanedUdf
);
return left
.connect(right)
.keyBy(keySelector1, keySelector2)
.transform("Interval Join", outputType, operator);
}

可见是先对两条流执行 connect() 和 keyBy() 操作,然后利用 IntervalJoinOperator 算子进行转换。在 IntervalJoinOperator 中,会利用两个 MapState 分别缓存左流和右流的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码​
private transient MapState<Long, List<BufferEntry<T1>>> leftBuffer;
private transient MapState<Long, List<BufferEntry<T2>>> rightBuffer;
​
@Override
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
this.leftBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(
LEFT_BUFFER,
LongSerializer.INSTANCE,
new ListSerializer<>(new BufferEntrySerializer<>(leftTypeSerializer))
));
this.rightBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(
RIGHT_BUFFER,
LongSerializer.INSTANCE,
new ListSerializer<>(new BufferEntrySerializer<>(rightTypeSerializer))
));
}

其中 Long 表示事件时间戳,List> 表示该时刻到来的数据记录。当左流和右流有数据到达时,会分别调用 processElement1() 和 processElement2() 方法,它们都调用了 processElement() 方法,代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
swift复制代码@Override
public void processElement1(StreamRecord<T1> record) throws Exception {
processElement(record, leftBuffer, rightBuffer, lowerBound, upperBound, true);
}
​
@Override
public void processElement2(StreamRecord<T2> record) throws Exception {
processElement(record, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);
}
​
@SuppressWarnings("unchecked")
private <THIS, OTHER> void processElement(
final StreamRecord<THIS> record,
final MapState<Long, List<IntervalJoinOperator.BufferEntry<THIS>>> ourBuffer,
final MapState<Long, List<IntervalJoinOperator.BufferEntry<OTHER>>> otherBuffer,
final long relativeLowerBound,
final long relativeUpperBound,
final boolean isLeft) throws Exception {
final THIS ourValue = record.getValue();
final long ourTimestamp = record.getTimestamp();
if (ourTimestamp == Long.MIN_VALUE) {
throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in " +
"interval stream joins need to have timestamps meaningful timestamps.");
}
if (isLate(ourTimestamp)) {
return;
}
addToBuffer(ourBuffer, ourValue, ourTimestamp);
for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: otherBuffer.entries()) {
final long timestamp = bucket.getKey();
if (timestamp < ourTimestamp + relativeLowerBound ||
timestamp > ourTimestamp + relativeUpperBound) {
continue;
}
for (BufferEntry<OTHER> entry: bucket.getValue()) {
if (isLeft) {
collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
} else {
collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
}
}
}
long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
if (isLeft) {
internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
} else {
internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
}
}

这段代码的思路是:

1.取得当前流 StreamRecord 的时间戳,调用 isLate() 方法判断它是否是迟到数据(即时间戳小于当前水印值),如是则丢弃。

2.调用 addToBuffer() 方法,将时间戳和数据一起插入当前流对应的 MapState。

3.遍历另外一个流的 MapState,如果数据满足前述的时间区间条件,则调用 collect() 方法将该条数据投递给用户定义的 ProcessJoinFunction 进行处理。collect() 方法的代码如下,注意结果对应的时间戳是左右流时间戳里较大的那个。

1
2
3
4
5
6
java复制代码private void collect(T1 left, T2 right, long leftTimestamp, long rightTimestamp) throws Exception {
final long resultTimestamp = Math.max(leftTimestamp, rightTimestamp);
collector.setAbsoluteTimestamp(resultTimestamp);
context.updateTimestamps(leftTimestamp, rightTimestamp, resultTimestamp);
userFunction.processElement(left, right, context, collector);
}

4.调用 TimerService.registerEventTimeTimer() 注册时间戳为 timestamp + relativeUpperBound 的定时器,该定时器负责在水印超过区间的上界时执行状态的清理逻辑,防止数据堆积。注意左右流的定时器所属的 namespace 是不同的,具体逻辑则位于 onEventTime() 方法中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
arduino复制代码​
@Override
public void onEventTime(InternalTimer<K, String> timer) throws Exception {
long timerTimestamp = timer.getTimestamp();
String namespace = timer.getNamespace();
logger.trace("onEventTime @ {}", timerTimestamp);
switch (namespace) {
case CLEANUP_NAMESPACE_LEFT: {
long timestamp = (upperBound <= 0L) ? timerTimestamp : timerTimestamp - upperBound;
logger.trace("Removing from left buffer @ {}", timestamp);
leftBuffer.remove(timestamp);
break;
}
case CLEANUP_NAMESPACE_RIGHT: {
long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp;
logger.trace("Removing from right buffer @ {}", timestamp);
rightBuffer.remove(timestamp);
break;
}
default:
throw new RuntimeException("Invalid namespace " + namespace);
}
}

本文转载自简书,作者:LittleMagic原文链接:

www.jianshu.com/p/45ec88833…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Python茅台抢购脚本的使用说明!!

发表于 2021-01-04

本教程完成针对小白,大佬请绕道!!
本教程完成针对小白,大佬请绕道!!
本教程完成针对小白,大佬请绕道!!

重要的事情说三遍!

小白福音!!这两天问茅台脚本使用方法的人很多!!本人没有精力有限,所以弄了一个抢购软件!!

不需要安装Python,不需要配置环境,就和你在电脑上安装使用QQ等软件一样

公众号: Python编程与实战

PS:公众号后台回复 “茅台” 获取抢购软件!!

是 .exe 文件,点击 main.exe 就能运行

但是有个前提是需要更改 eid, fp 这两个参数

这两个参数在 config.ini 文件中

获取这两个参数的方法在第 4 步!!

你用软件抢购只看前面和第 4 步就可以了~

——————————–分割线————————————————————
下面是安装Python, 配置开发环境抢购的教程,用软件的不用看!!!
下面是安装Python, 配置开发环境抢购的教程,用软件的不用看!!!
下面是安装Python, 配置开发环境抢购的教程,用软件的不用看!!!

因为问的人太多了,一方面,有时候没空回复,回复不及时,导致错过抢购..

另外大多数人问的都是同一类问题,我每次都要重复再重复回复同一个答案

譬如:如何怎么用,如何运行脚本,要改什么,在哪改等等…

既然是小白教程,那我们就从安装Python开始,本文以windows 为例,其他系统同理

  1. 下载Python

下载地址:www.python.org/downloads/
选择适合自己系统的Python,最好是安装下载 3.8版本以上 Python,免得出现问题

Python

下载完之后就和你安装其他电脑软件一样,没难度的。记得选择加入环境变量

环境变量

我这图片是3.5版本,可以忽略, 只是一张图片,用来给你参考

cmd 中输入Python 验证是否安装成功

cmd

  1. 下载集成开发环境

如果你会玩 cmd 可以直接跳过这一步!

你在 cmd 中也能运行项目。前提是进入到项目目录。

然后输入 python main.py

我已经记不清有多少个人拿着下面这个图来问我了。。。

要学会看报错原因!!别一出问题就问!学会自己思考很重要

这就是因为没进入到项目目录执行,电脑知道你这文件在哪吗??

但是由于要改项目参数,有些小白会把格式改错,所以最好还是下载IDE,对新手学习Python能避免很多坑!

如果嫌麻烦,后面的IDE下载安装配置可以不弄,直接看第3个步骤!

——————————–分割线————————————————————

Python有许多的IDE,比如 vscode,pycharm等

笔者主要用 pycharm,下载安装。公众号后台ide获取安装包

无脑下一步,等待自动安装

选择 open 你下载的的茅台项目

然后点击左上角 File->Setting,选择你刚才安装的 Python

点下拉,show all

选择第一步安装好的 Python

选择 apply->ok

3.安装依赖包

点击 Termianl 进入控制台

输入命令:pip install -r requirements.txt -i pypi.douban.com/simple/

cmd 中同理。需要进入到项目目录!!!我的项目在F盘,所以先进入F盘,命令, F:

也是输入这个命令安装

运行之后没问题,就 ok 了!

4. 参数更改

eid, fp参数必须填写
这两个参数在下单页面获取(不用下单)
电脑网页端登录你的京东,随便选个商品进入下单页面

按下F12打开浏览器调试窗口,然后点击选择Console,在控制台中输入变量_JdTdudfp,即可从输出的Json中获取eid和fp。如上图

复制下来这两个参数,填入即可

5.运行main.py

茅台抢购有几个前提!

1.本脚本只针对京东

2.需要开通京东Plus会员

项目中有个 README 的项目说明。建议花几分钟时间看看

在pycharm 中,选择main.py ,右键选择run

1是预约,2是抢购!

需要先预约,预约完之后停止运行。

再运行脚本,输入2,它就能根据你设定的时间,到达时间自动抢购!

不能关闭软件啊!关闭软件不会抢了!

最后祝大家都能抢到茅台!!!

抢到了就是上面这样!记得去app 上支付付款!!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

BI无缝整合Apache Kylin,实现一站式大数据解决方

发表于 2021-01-04

研发背景

今天随着移动互联网、物联网、大数据、AI等技术的快速发展,数据已成为所有这些技术背后最重要,也是最具价值的“资产”,同时数据也是每一个商业决策的基石,越来越多的企业选择数字化转型,但数据驱动增长然充满挑战,企业数据孤岛严重、数据一致性难以保证、数据资产沉淀数据分散难以共用、数据分析项目上线经历数月,报表查询响应慢难以应对瞬息万变的市场环境,成本问题在数据量呈指数增长的前提下难以控制,因此在大数据的背景下,如何从海量的超大规模数据中快速获取有价值的信息,已经成为新时代的挑战。

​ Hadoop诞生以来,大数据的存储和批处理问题均得到了妥善解决,而如何高速地分析数据也就成为了下一个挑战。于是各式各样的“SQL on Hadoop”技术应运而生,其中以Hive为代表,Impala、Presto、Phoenix、Drill、SparkSQL、FlinkSQL等紧随其后。它们的主要技术是“大规模并行处理”(Massive Parallel Processing,MPP)和“列式存储”(Columnar Storage)。大规模并行处理可以调动多台机器一起进行并行计算,用线性增加的资源来换取计算时间的线性下降。列式存储则将记录按列存放,这样做不仅可以在访问时只读取需要的列,还可以利用存储设备擅长连续读取的特点,大大提高读取的速率。这两项关键技术使得Hadoop上的SQL查询速度从小时提高到了分钟级。

​ 然而分钟级别的查询响应仍然离交互式分析的现实需求还很远,市面上主流的开源OLAP引擎目前还没有一个系统能够满足各种场景的查询需求。其本质原因是,没有一个系统能同时在数据量、性能、和灵活性三个方面做到完美,每个系统在设计时都需要在这三者间做出取舍。

仔细思考大数据OLAP,可以注意到两个事实。

大数据查询要的一般是统计结果,是多条记录经过聚合函数计算后的统计值。原始的记录则不是必需的,或者访问频率和概率都极低。

聚合是按维度进行的,由于业务范围和分析需求是有限的,有意义的维度聚合组合也是相对有限的,一般不会随着数据的膨胀而增长。

​ 基于以上两点,我们可以得到一个新的思路——“预计算”。应尽量多地预先计算聚合结果,在查询时刻应尽量使用预算的结果得出查询结果,从而避免直接扫描可能无限增长的原始记录,预计算系统是在入库时对数据进行预聚合,进一步牺牲灵活性换取性能,以实现对超大数据集的秒级响应。

​ Apache Kylin是一个开源的、分布式的分析型数据仓库,提供Hadoop/Spark/Flink 之上的 SQL 查询接口及多维分析(OLAP)能力以支持超大规模数据,通过预计算它能在亚秒内查询巨大的表,其中的关键就是要打破查询时间随着数据量成线性增长的这个规律。

​ BI(Business Intelligence),即商务智能,指用现代数据仓库技术、在线分析技术、数据挖掘和数据展现技术进行数据分析以实现商业价值,随着业务数据的规模增长,传统数据仓库不堪重负,数据的存储和批量处理成了瓶颈,查询分析速度无法满足日益增长的数据需求,传统关系型多维分析ROLAP引擎遇到极大挑,越来越多的企业引入大数据平台架构。

​ BI大数据分析平台致力于帮助用户快速在业务场景中应用大数据,助力业务发展和产业升级,让数据更高效地驱动生产力。因此必须集成整合Kylin实现赋能基于大数据Hadoop 生态 MOLAP(Kylin)及 HOLAP (多引擎)的查询分析,实现支持识别、管理和优化最有价值数据、提升整合底层复杂数据源的能力,通过数据服务将复杂的数据映射为业务语言,统一业务口径。采用预计算技术可打破查询时间随数据量线性增长的现状,提供稳定高效的查询性能。

研发目标

​ BI平台无缝集成Apache Kylin,托管Kylin用户、权限管理统一的安全认证,统一界面样式、操作流程,并对一些功能进行扩展改造以适配BI系统,整合SparkSQL、FlinkSQL、Presto等多种引擎结合起来实现智能路由,充分发挥每种引擎的长处优势,赋能用户极速数据分析体验,形成统一、一站式的大数据OLAP解决方案。

数据立方体构建引擎(Cube Build Engine):当前底层数据计算引擎支持、MapReduce、Spark、Flink等。

Rest Server:当前kylin采用的REST API、JDBC、ODBC接口提供web服务。

查询引擎(Query Engine):Rest Server接收查询请求后,解析sql语句,生成执行计划,然后转发查询请求到Hbase中,最后将结果返回给 Rest Server。

存储引擎:Kylin默认使用分布式、面向列的开源数据库Hbase作为存储库引擎。

设计架构

附注1

  1. Mondrian为一个OLAP引擎,而且是一个ROLAP引擎,实现了以下规范:
  2. MDX(多维查询语言,相当于数据库的SQL)
  3. XMLA(通过SOAP使用OLAP)
  4. olap4j(Java API规范,相当于JDBC关系数据库)

附注1:

  1. 数据应用,包括智能报告、支持生成SQL或多维分析查询MDX语句组件、托拉拽自助式分析可视化组件等
  2. Mondrian Schema,数据多维分析模型
  3. Mondrian引擎,根据Schema生成标准SQL
  4. 目标数据源,包括关系型数据源、非关系型数据源、企业数据仓库

功能架构设计

附注1:

  1. 存储引擎,Kylin默认使用分布式、面向列的开源数据库Hbase作为存储库引擎,基于Apache Kylin插件架构实现数据库存储接入。
  2. Presto,分布式SQL查询引擎,适用于交互式分析查询,数据量支持GB到PB字节。

用户/权限

​ Kylin的Web模块使用Spring框架构建,在安全实现上选择了Spring Security。Spring Security是Spring项目组中用来提供安全认证服务的框架,它广泛支持各种身份验证模式,这些验证模型大多由第三方提供, Spring Security也提供了自己的一套验证功能。Kylin提供了三种用户验证方式“testing”、“ldap”和“saml”,依次为:自定义验证、LDAP验证和单点登录验证。

​ BI平台已实现一套成熟且完善的用户权限控制体系,为了便于系统的安全管理需要将Kylin用户权限管理和BI平台用户管理打通,可以利用现成的机制对Kylin的访问、资源的访问控制、修改做保护性的限制,使得大数据下的交互式报表分析成为可能。

数据模型

​ BI数据主题基于数据源元数据信息创建数据模型,支持简单可拖拉拽、灵活快速的方式实现可视化数据建模,需打通BI数据建模与Kylin数据建模功能,将BI数据模型适配至Kylin数据模型,支持事实表、维度、度量定义。

​ 每次Cube构建都会从数据源中批量读取数据,而对于大多数业务场景来说,数据源中的数据处于不断增长的状态,为了支持Cube中的数据能够不断地得到更新,且无需重复地为已经处理过的历史数据构建Cube,Cube支持增量构建,每个Cube都关联着一个数据模型Model,增量构建Cube需要指定分割时间列。

​ 对于维度表可选择配置是否将其以快照(Snapshot)形式存储到内存中以供查询。当维表小于300M时推荐启用,可以简化Cube计算提高效率。

CUBE配置

Cube功能改造:

  1. 页面,布局、样式统一、中文显示
  2. 用户权限,统一安全认证
  3. Cube管理查询
  4. 构建引擎,计算引擎默认选择Flink作为构建引擎

Cube运行监控

​ Apache Kylin通过日志和报警对任务进行监控、了解整体的运行情况,Kylin支持显示每个构建任务的进度条和构建状态,并可以展开明细,列出任务的每一步详细信息,数据模型下总Cube数,及空间占用。

  1. 状态

禁用(Disabled) 只有定义,没有构建数据

错误(ERROR) 报错并停止后续执行

准备(Ready) 构建完成可以提供查询服务。
2. 执行控制

恢复(Resume) 在上次错误位置恢复执行

放弃(Discard) 如要修改Cube或重新开始构建,可以放弃此次构建。

构建(Build) 全量构建,增量构建采用

刷新(Refresh) 对相应分区(Segment)历史数据进行重建

合并(Merge) 合分区(Segment),提高查询性能

数据查询

​ Cube构建好以后,状态变为“READY”,就可以进行查询,查询语言为标准SQL SELECT语句。

​ 只有当查询的模式跟Cube定义相匹配的时候,Kylin才能够使用Cube的数据来完成查询,“Group by”的列和“Where”条件里的列,必须是维度中定义的列,而SQL中的度量应跟Cube中定的义的度量一致。

​ Kylin提供了灵活的前端连接方式,包括Rest API、JDBC和ODBC。用户可以根据需要查询访问。

存储引擎

​ 基于Apache Kylin较强可伸缩性的插件架构实现数据库存储接入。

​ Kylin旨在减少Hadoop在10亿及百亿规模以上数据级别的情况下的查询延迟,目前底层数据存储基于HBase,具有较强的可伸缩性。插件架构旨在使 Kylin 在计算框架,数据源和cube 存储方面具有可扩展性。从 v1 开始,Kylin 与作为计算框架的 Hadoop MapReduce,作为数据源的 Hive,作为存储的 HBase紧密结合。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

5分钟带你快速了解Docker和k8s

发表于 2021-01-04

本文 github.com/smileArchit… 已收录。

JavaMap是Java知识地图,旨在让开发者学习不迷路!Java学习请认准JavaMap。

随着k8s 作为容器编排解决方案变得越来越流行,有些人开始拿 Docker 和 k8s进行对比,不禁问道:Docker 不香吗?

k8s 是kubernets的缩写,’8‘代表中间的八个字符。

其实 Docker 和 k8s 并非直接的竞争对手,它俩相互依存。 Docker 是一个容器化平台,而 k8s 是 Docker 等容器平台的协调器。

容器化时代来了

虚拟化技术已经走过了三个时代,没有容器化技术的演进就不会有 Docker 技术的诞生。

虚拟化技术演进

虚拟化技术演进

(1)物理机时代:多个应用程序可能会跑在一台机器上。

物理机时代

物理机时代

(2)虚拟机时代:一台物理机器安装多个虚拟机(VM),一个虚拟机跑多个程序。

虚拟机时代

虚拟机时代

(3)容器化时代:一台物理机安装多个容器实例(container),一个容器跑多个程序。

容器化时代

容器化时代

容器化解决了软件开发过程中一个令人非常头疼的问题,用一段对话描述:

测试人员:你这个功能有问题。

开发人员:我本地是好的啊。

开发人员编写代码,在自己本地环境测试完成后,将代码部署到测试或生产环境中,经常会遇到各种各样的问题。明明本地完美运行的代码为什么部署后出现很多 bug,原因有很多:不同的操作系统、不同的依赖库等,总结一句话就是因为本地环境和远程环境不一致。

容器化技术正好解决了这一关键问题,它将软件程序和运行的基础环境分开。开发人员编码完成后将程序打包到一个容器镜像中,镜像中详细列出了所依赖的环境,在不同的容器中运行标准化的镜像,从根本上解决了环境不一致的问题。

容器化技术的尖刀武器

容器化技术的特点

容器化技术的特点

  • 可移植性:不依赖具体的操作系统或云平台,比如在阿里云或腾讯云直接随意迁移。
  • 占地小:容器只需要其应用程序以及它需要运行的所有容器和库的依赖清单,不需要将所有的依赖库都打包在一起。
  • 共享 bin 和 lib:不同的容器可以共享 bin 和 lib,进一步节省了空间。

Docker 横空出世

2010年一位年轻小伙子在美国旧金山成立了一家名叫【dotCloud】的公司, 开发了 Docker的核心技术,从此开启了容器技术的时代。

Docker原公司

Docker原公司

后面 dotCloud 公司将自己的容器技术进行了简化和标准化,取名为 Docker,就是大家熟悉的鲸鱼 logo。

Docker新Logo

Docker新Logo

2013年dotCloud 公司宣布将 Docker 开源,随着越来越多的工程师发现了它的优点, Docker 的人气迅速攀升,成为当时最火爆的开源技术之一。

当前有30%以上的企业在其AWS环境中使用Docker,并且这个数字还在继续增长。

Docker使用率越来越高

Docker使用率越来越高

Docker怎么用?

其实大多数人谈论 Docker 时说的是 Docker Engine,这只是一个构建和运行的容器。

在运行容器前需要编写Docker File,通过 dockerFile 生成镜像,然后才能运行 Docker 容器。

Docker File 定义了运行镜像(image)所需的所有内容,包括操作系统和软件安装位置。一般情况下都不需要从头开始编写 Docker File,在 Docker Hub 中有来自世界各地的工程师编写好的镜像,你可以基于此修改。

编排系统的需求催生 k8s

尽管Docker为容器化的应用程序提供了开放标准,但随着容器越来越多出现了一系列新问题:

  • 如何协调和调度这些容器?
  • 如何在升级应用程序时不会中断服务?
  • 如何监视应用程序的运行状况?
  • 如何批量重新启动容器里的程序?

解决这些问题需要容器编排技术,可以将众多机器抽象,对外呈现出一台超大机器。现在业界比较流行的有:k8s、Mesos、Docker Swarm。

在业务发展初期只有几个微服务,这时用 Docker 就足够了,但随着业务规模逐渐扩大,容器越来越多,运维人员的工作越来越复杂,这个时候就需要编排系统解救opers。

应用程序的声明周期

应用程序的声明周期

一个成熟的容器编排系统需要具备以下能力:

  • 处理大量的容器和用户
  • 负载均衡
  • 鉴权和安全性
  • 管理服务通信
  • 多平台部署

k8s与Docker Swarm江湖恩怨

k8s VS Docker Swarm

k8s VS Docker Swarm

如果你非要拿 Docker 和 k8s 进行比较,其实你更应该拿 Docker Swarm 和 k8s 比较。

Docker Swarm 是 Docker 自家针对集群化部署管理的解决方案,优点很明显,可以更紧密集成到 Docker 生态系统中。

虽说 Swarm 是 Docker 亲儿子,但依旧没有 k8s 流行,不流行很大程度是因为商业、生态的原因,不多解释。

k8s是做什么用的?

K8s是Google研发的容器协调器,已捐赠给CNCF,现已开源。

Google 利用在容器管理多年的经验和专业知识推出了 k8s,主要用于自动化部署应用程序容器,可以支持众多容器化工具包括现在非常流行的Docker。

目前k8s 是容器编排市场的领导者,开源并公布了一系列标准化方法,主流的公有云平台都宣布支持。

一流的厂商都在抢占标准的制高点,一堆小厂商跟着一起玩,这就叫生态了。国内的大厂商都在干嘛呢?抢社区团购市场,玩资本游戏,哎?!

K8s 架构和组件

k8s 由众多组件组成,组件间通过 API 互相通信,归纳起来主要分为三个部分:

  • controller manager
  • nodes
  • pods

k8s集群架构图

k8s集群架构图

  • Controller Manager,即控制平面,用于调度程序以及节点状态检测。
  • Nodes,构成了Kubernetes集群的集体计算能力,实际部署容器运行的地方。
  • Pods,Kubernetes集群中资源的最小单位。

Docker与k8s 难舍难分

Docker 和 k8s 在业界非常流行,都已经是事实上的标准。

Docker 是用于构建、分发、运行容器的平台和工具。

而 k8s 实际上是一个使用 Docker 容器进行编排的系统,主要围绕 pods 进行工作。Pods 是 k8s 生态中最小的调度单位,可以包含一个或多个容器。

Docker 和 k8s 是根本上不同的技术,两者可以很好的协同工作。

开发实践,灵魂追问

(1)没有 k8s 可以使用 docker 吗?

可以。实际上一些小型公司,在业务不太复杂的情况下都是直接使用 Docker。尽管 k8s 有很多好处,但是众所周知它非常复杂,业务比较简单可以放弃使用 k8s。

(2)没有 Docker 可以使用 k8s 吗?

k8s 只是一个容器编排器,没有容器拿什么编排?!

k8s 经常与 Docker 进行搭配使用,但是也可以使用其他容器,如RunC、Containerted 等。

(3)Docker Swarm 和 k8s 怎么选?

选 k8s。2019年底Docker Enterprise已经出售给Mirantis,Mirantis声明要逐步淘汰Docker Swarm,后续会将 k8s 作为默认编排工具。

最后一个问题

Docker 不香吗?为什么还要用 k8s

Docker很香,但 k8s 在业务达到一定规模后也得启用。学会了吗?

– END –

日常厚脸皮求赞:你好技术人,先赞后看,养成习惯,不要白嫖哟。

作者简介:
☕读过几年书:华中科技大学硕士毕业;

😂浪过几个大厂:华为、网易、百度……

😘一直坚信技术能改变生活,愿保持初心,加油技术人!

微信搜索公众号【爱笑的架构师】,关注这个对技术和生活有追求的技术人。

最后推荐一个宝藏开源项目,github.com/smileArchit…

JavaMap是Java知识地图,让开发者学习不迷路!Java学习请认准JavaMap。

JAVA核心知识点整理(283页,超级详细)免费领取。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…743744745…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%