DataStream的设计与实现 DataStream的设计

DataStream的设计与实现

一. DataStream的含义以及成员构成

1. DataStream的含义

由源码翻译可知,DataStream是一个集合,包含了相同类型元素的数据流,一个DataStream可以通过transformation(如Map,filter操作)变成另外一种DataStream.同时,DataStream主要用于表达业务逻辑,实际上并没有存储真实数据

2.DataStream的继承关系

image.png

图一 DataStream的成员变量以及子类

1.由图一可知,DataStream主要是由StreamExecutionEnvironment 以及transformation构成.其中SingleOutputStreamOperator、keyedStream继承了DataStream抽象类.

  • transformation是当前的DataStream对应的上一次的转换操作,即上个阶段的流完成这个transformation操作然后得到了当前流。
  • StreamExecutionEnvironment会将DataStream之间的转换操作存储至StreamExecutionEnvironment的List中.然后基于这些转换操作构建左右Pipeline拓扑,用于描述整个作业的计算逻辑

图一中也标识了DataStreamSink,主要是用来从数据流拓扑结构生成元素,同时和其他DataStream进行一个区分

3. DataStream的方法

image.png

图二 DataStream的方法介绍

DataStream有大量的transform(转换)操作方法可以调用,主要是将DataStream流进行操作变换成为另外一种流,其底层是调用了Transform方法.

二、DataStream API的应用实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码
1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
2. DataStream<String> inputStream = env.readTextFile("xxxx.csv");
3. SingleOutputStreamOperator<Role> dataStream = inputStream.map(new MapFunction<String, Role>() {
@Override
public Role map(String value) throws Exception {
String[] arrs = value.split(",");
return new Role(Long.parseLong(arrs[0]), Long.parseLong(arrs[1]), Integer.parseInt(arrs[2]), arrs[3], Long.parseLong(arrs[4]));
}
}).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Role>() {
@Override
public long extractAscendingTimestamp(Role element) {
return element.timestamp * 1000L;
}
});
  1. 第一句首先构建了一个StreamExecutionEnvironment对象env,env.readText会生成DataStreamSource
  2. 第二句env.readTextFile方法中构建出了DataStream对象,DataStreamSource继承了DataStream
  3. 第三句DataStream对象调用了map方法,生成了SingleOutputStreamOperator对象,因此由当前的dataStream转换成了另外一个dataStream

三、 DataStream的map方法调用流程

image.png

图三 DataStream的map方法调用流程

如图三所示,是DataStream的map方法的调用流程,由图得知,所有调用的方法都在dataStream类中进行调用跳转.最后调用了doTransform方法来生成DataStream,返回SingleOutputStreamOperator.

值得注意的是

SingleOutputStreamOperator是继承了DataStream类,属于特殊的DtataStream

SingleOutputStreamOperator是继承了DataStream类,属于特殊的DtataStream
SingleOutputStreamOperator是继承了DataStream类,属于特殊的DtataStream

因此transformation操作本质上就是一种datastream变成另外一种datastream

doTransform方法实现逻辑
  • 如图三所示, 在调用doTransform方法的时候,主要是把之前map操作中的mapfunction封装成operator.同时,将当前datastream的transformation作为参数,一起构建成新的transformation
  • 将新的tranformation加入至executionenvironment中,用于构建streamGraph
  • 将excutionenvironment和tranformation作为参数,构建新的DataStream

DataStream.doTranform()方法定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
java复制代码	protected <R> SingleOutputStreamOperator<R> doTransform(
String operatorName,
TypeInformation<R> outTypeInfo,
StreamOperatorFactory<R> operatorFactory) {

// 获取上一次tranformation的输出类型
transformation.getOutputType();

//将本次tranformation作为输入,生成一个新的tranformation
OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
this.transformation,
operatorName,
operatorFactory,
outTypeInfo,
environment.getParallelism());

//将新的environment和新的tranformation作为参数,构建成新的datastream(SingleOutputStreamOperator)
@SuppressWarnings({"unchecked", "rawtypes"})
SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);

//将当前新生成的tranformation加入到executionenvironment中,用于生成streamGraph
getExecutionEnvironment().addOperator(resultTransform);

return returnStream;
}

参考文献

Flink设计与实现:核心原理与源码解析

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%