Flink-Job(DAG)的生成(1)

作者 chauncy 日期 2020-05-18
Flink-Job(DAG)的生成(1)

As we knonw ,Flink 编写代码的一共就分为三步

  • 定义 source 数据源
  • 对source 进行操作(map,fliter,flatmap process etc)
  • 定义一个sink,数据处理完结果输出地

当然这仅仅是一个大体主要的流程,还有一些去其他配置这里就忽略掉了

定义完所有操作过后,他会生成一个DAG,然后部署到集群进行运行

从一个例子开始

从flink 与代码里面example 样例找的一个

public static void main(String[] args) throws Exception {
// Checking input parameters
final ParameterTool params = ParameterTool.fromArgs(args);
System.out.println("Usage: TwitterExample [--output <path>] " +
"[--twitter-source.consumerKey <key> --twitter-source.consumerSecret <secret> --twitter-source.token <token> --twitter-source.tokenSecret <tokenSecret>]");
// set up the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// make parameters available in the web interface
env.getConfig().setGlobalJobParameters(params);
env.setParallelism(params.getInt("parallelism", 1));
// get input data
DataStream<String> streamSource;
if (params.has(TwitterSource.CONSUMER_KEY) &&
params.has(TwitterSource.CONSUMER_SECRET) &&
params.has(TwitterSource.TOKEN) &&
params.has(TwitterSource.TOKEN_SECRET)
) {
streamSource = env.addSource(new TwitterSource(params.getProperties()));
} else {
System.out.println("Executing TwitterStream example with default props.");
System.out.println("Use --twitter-source.consumerKey <key> --twitter-source.consumerSecret <secret> " +
"--twitter-source.token <token> --twitter-source.tokenSecret <tokenSecret> specify the authentication info.");
// get default test text data
streamSource = env.fromElements(TwitterExampleData.TEXTS);
}
DataStream<Tuple2<String, Integer>> tweets = streamSource
// selecting English tweets and splitting to (word, 1)
.flatMap(new SelectEnglishAndTokenizeFlatMap())
// group by words and sum their occurrences
.keyBy(0).sum(1);
// emit result
if (params.has("output")) {
tweets.writeAsText(params.get("output"));
} else {
System.out.println("Printing result to stdout. Use --output to specify output path.");
tweets.print();
}
// execute program
env.execute("Twitter Streaming Example");
}

他的步骤如下

  • ParameterTool.fromArgs(args) 解析参数(可以ignore)
  • StreamExecutionEnvironment.getExecutionEnvironment() (得到一个环境变量,如果是本地的话,就是调用的local stream execution env)
  • env.setParallelism 设置并行度
  • streamSource = env.addSource(new TwitterSource(params.getProperties())) 设置一个source ,上面说过的
  • streamSource
    and splitting to (word, 1)
    .flatMap(new SelectEnglishAndTokenizeFlatMap())
    .keyBy(0).sum(1); 对这source 进行计算 上面说到的operator
    
  • tweets.writeAsText(params.get(“output”)); 数据输出到哪里,定义一个sink
  • env.execute(“Twitter Streaming Example”); 最后进行一个执行

前面的配置项我们暂时可以忽略
主要看怎么添加source ,operator 怎么转换的,sink 怎么添加的,最后怎么生产的DAG 的

环境变量StreamExecutionEnvironment

StreamExecutionEnvironment 是一个抽象的class
定义了一些addSource fromCollection 基本跟怎么获取一个source 有关的方法,以及一些类型
他的主要子类有 RemoteStreamEnvironment LocalStreamEnvironment 这里的区别就是Local 就是用于在直接启动一个job,remote 就是在生产环境,提交一个job ,这个有可能是yarn 的资源调度

根据我们的例子,我们主要看一下他的addSource 方法

public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo) {
if (typeInfo == null) {
if (function instanceof ResultTypeQueryable) {
typeInfo = ((ResultTypeQueryable<OUT>) function).getProducedType();
} else {
try {
typeInfo = TypeExtractor.createTypeInfo(
SourceFunction.class,
function.getClass(), 0, null, null);
} catch (final InvalidTypesException e) {
typeInfo = (TypeInformation<OUT>) new MissingTypeInfo(sourceName, e);
}
}
}
boolean isParallel = function instanceof ParallelSourceFunction;
clean(function);
StreamSource<OUT, ?> sourceOperator;
if (function instanceof StoppableFunction) {
sourceOperator = new StoppableStreamSource<>(cast2StoppableSourceFunction(function));
} else {
sourceOperator = new StreamSource<>(function);
}
return new DataStreamSource<>(this, typeInfo, sourceOperator, isParallel, sourceName);
}

主要步骤

  • type info 类型的初始化
  • 并行度的设置
  • 构造一个StreamSource
  • 最后生成一个 DataStreamSource

###StreamSource
StreamSource 继承AbstractUdfStreamOperator 同时实现了StreamOperator ,然AbstractUdfStreamOperator 继承AbstractStreamOperator ,AbstractStreamOperator 实现了StreamOperator
StreamOperator 主要是一些生命周期的方法,状态 快照的一些更新 存储

public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
extends AbstractUdfStreamOperator<OUT, SRC> implements StreamOperator<OUT>

###DataStreamSource

DataStreamSource 继承 SingleOutputStreamOperator ,SingleOutputStreamOperator 继承DataStream,DataStream 就是所有的operator 的一个抽象,他提供了 flatMap,map
,filter etc,不过 DataStream 依赖于StreamExecutionEnvironment StreamTransformation

整体的类图

source 类图

DataStream 与StreamOperator 中间通过StreamTransformation 进行一个桥接

可以这样理解,DataStream 是一个具体的数据流,StreamOperator 是在这个流上进行一个操作,比如map ,filter 这样的

DataStream 具体的operator

例子中的DataStream 的 flatMap(new SelectEnglishAndTokenizeFlatMap())
进入flatMap

public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {
TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),
getType(), Utils.getCallLocationName(), true);
return transform("Flat Map", outType, new StreamFlatMap<>(clean(flatMapper)));
}

生成 out type 生成一个StreamFlatMap,StreamFlatMap 上线了OneInputStreamOperator ,OneInputStreamOperator 继承了 StreamOperator ,所以生产了还是一个 streamOPerator 最后call transform 方法

public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
// read the output type of the input Transform to coax out errors about MissingTypeInfo
transformation.getOutputType();
OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
this.transformation,
operatorName,
operator,
outTypeInfo,
environment.getParallelism());
@SuppressWarnings({ "unchecked", "rawtypes" })
SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);
getExecutionEnvironment().addOperator(resultTransform);
return returnStream;
}

生成一个 resultTransform ,它是一个Transformation,因为OneInputTransformation是Transformation的子类,同时我们new OneInputTransformation 的时候也需要传入了OneInputStreamOperator
在这里 Transformation 与stream operator 联系起来了,然后DataStream 又有一个transformation ,所以 这里就是 DataStream 通过transformation 连接了StreamOperator ,最后调用了 ExecutionEnvironment 的addOperator,将transformation放到 execution env 里面

public void addOperator(StreamTransformation<?> transformation) {
Preconditions.checkNotNull(transformation, "transformation must not be null.");
this.transformations.add(transformation);
}

env.execute

最后当我们执行 env.execute 他就会将 env 里面的transformation 生成一个有向无环图DAG …敬请期待