A Look at the Window coGroup Operation of Flink DataStream
Published: 2019-06-26

This article takes a look at the window coGroup operation of Flink's DataStream API.

Example

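```java
dataStream.coGroup(otherStream)
    // In Flink 1.7, CoGroupedStreams.where/equalTo take KeySelectors (see the source below):
    // field f0 of the first stream is matched against field f1 of the second
    .where(first -> first.f0).equalTo(second -> second.f1)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply(new CoGroupFunction() {...});
```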
  • This shows the basic usage of the window coGroup operation on a DataStream.
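To make the semantics of the snippet concrete, here is a plain-Java simulation (no Flink dependency) of what it computes on two finite inputs. It rests on simplifying assumptions: elements are a hypothetical `Event(key, timestamp)` type with millisecond timestamps, windows are 3-second tumbling windows aligned at 0 (the default offset), and watermarks, triggers and lateness are ignored, so every window is evaluated exactly once at the end. For each (window, key) pair seen on either side, Flink would call the CoGroupFunction with both groups, and one of them may be empty; the simulation only reports the group sizes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation of a 3-second tumbling-window coGroup (not Flink code).
// Event is a hypothetical element type; watermarks, triggers and lateness are ignored.
class WindowCoGroupSimulation {

    static final long WINDOW_SIZE_MS = 3000;

    static final class Event {
        final String key;
        final long timestamp;

        Event(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    // Start of the tumbling window containing the timestamp (offset 0, timestamp >= 0)
    static long windowStart(long timestamp) {
        return timestamp - (timestamp % WINDOW_SIZE_MS);
    }

    // One output line per (window, key) group that has data on at least one side
    static List<String> run(List<Event> first, List<Event> second) {
        // window start -> key -> [group from first input, group from second input]
        Map<Long, Map<String, List<List<Event>>>> groups = new TreeMap<>();
        for (Event e : first) {
            sides(groups, e).get(0).add(e);
        }
        for (Event e : second) {
            sides(groups, e).get(1).add(e);
        }
        List<String> out = new ArrayList<>();
        for (Map.Entry<Long, Map<String, List<List<Event>>>> window : groups.entrySet()) {
            for (Map.Entry<String, List<List<Event>>> group : window.getValue().entrySet()) {
                // Flink would call coGroup(first, second, out) here; either group may be empty
                out.add("window@" + window.getKey() + " " + group.getKey()
                        + ": first=" + group.getValue().get(0).size()
                        + " second=" + group.getValue().get(1).size());
            }
        }
        return out;
    }

    // Looks up (or creates) the pair of per-input groups for the event's window and key
    private static List<List<Event>> sides(Map<Long, Map<String, List<List<Event>>>> groups, Event e) {
        return groups
                .computeIfAbsent(windowStart(e.timestamp), w -> new TreeMap<>())
                .computeIfAbsent(e.key, k -> {
                    List<List<Event>> pair = new ArrayList<>();
                    pair.add(new ArrayList<>());
                    pair.add(new ArrayList<>());
                    return pair;
                });
    }

    public static void main(String[] args) {
        List<Event> first = Arrays.asList(new Event("a", 1000), new Event("a", 2500), new Event("b", 4000));
        List<Event> second = Arrays.asList(new Event("a", 2000), new Event("c", 3500));
        run(first, second).forEach(System.out::println);
    }
}
```

Running `main` prints one line per group. Key `b` only occurs in the first input, so its group is reported with `second=0`: in Flink the user's coGroup would still be called for it, with an empty second Iterable.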

DataStream.coGroup

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java

```java
@Public
public class DataStream<T> {

    //......

    public <T2> CoGroupedStreams<T, T2> coGroup(DataStream<T2> otherStream) {
        return new CoGroupedStreams<>(this, otherStream);
    }

    //......
}
```
  • DataStream's coGroup operation creates a CoGroupedStreams.

CoGroupedStreams

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java

```java
@Public
public class CoGroupedStreams<T1, T2> {

    private final DataStream<T1> input1;
    private final DataStream<T2> input2;

    public CoGroupedStreams(DataStream<T1> input1, DataStream<T2> input2) {
        this.input1 = requireNonNull(input1);
        this.input2 = requireNonNull(input2);
    }

    public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector) {
        Preconditions.checkNotNull(keySelector);
        final TypeInformation<KEY> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input1.getType());
        return where(keySelector, keyType);
    }

    public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector, TypeInformation<KEY> keyType) {
        Preconditions.checkNotNull(keySelector);
        Preconditions.checkNotNull(keyType);
        return new Where<>(input1.clean(keySelector), keyType);
    }

    //......
}
```
  • CoGroupedStreams provides the where operation, which specifies the KeySelector of input1; it creates and returns a Where object.

Where

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java

```java
@Public
public class Where<KEY> {

    private final KeySelector<T1, KEY> keySelector1;
    private final TypeInformation<KEY> keyType;

    Where(KeySelector<T1, KEY> keySelector1, TypeInformation<KEY> keyType) {
        this.keySelector1 = keySelector1;
        this.keyType = keyType;
    }

    public EqualTo equalTo(KeySelector<T2, KEY> keySelector) {
        Preconditions.checkNotNull(keySelector);
        final TypeInformation<KEY> otherKey = TypeExtractor.getKeySelectorTypes(keySelector, input2.getType());
        return equalTo(keySelector, otherKey);
    }

    public EqualTo equalTo(KeySelector<T2, KEY> keySelector, TypeInformation<KEY> keyType) {
        Preconditions.checkNotNull(keySelector);
        Preconditions.checkNotNull(keyType);

        if (!keyType.equals(this.keyType)) {
            throw new IllegalArgumentException("The keys for the two inputs are not equal: " +
                    "first key = " + this.keyType + " , second key = " + keyType);
        }

        return new EqualTo(input2.clean(keySelector));
    }

    //......
}
```
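  • The Where object provides the equalTo operation, which specifies the KeySelector of input2 and checks that its key type equals the key type given to where (otherwise it throws an IllegalArgumentException); it creates and returns an EqualTo object.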
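The coGroup, where, equalTo, window chain is a staged builder: each call returns a new object that fixes one more piece of the query, and equalTo rejects a second key whose type information differs from the first. The plain-Java sketch below mimics only that staging and check. The stage classes are hypothetical, `java.util.function.Function` stands in for KeySelector, and `Class<?>` stands in for `TypeInformation<KEY>`.

```java
import java.util.function.Function;

// Hypothetical sketch of the CoGroupedStreams -> Where -> EqualTo chain (not Flink code).
// Function stands in for KeySelector, Class<?> for TypeInformation<KEY>.
class KeyTypeCheckDemo {

    // Like CoGroupedStreams<T1, T2>: the object returned by coGroup()
    static class CoGroupStage<T1, T2> {
        <K> WhereStage<T1, T2, K> where(Function<T1, K> keySelector1, Class<?> keyType) {
            return new WhereStage<>(keySelector1, keyType);
        }
    }

    // Like Where<KEY>: remembers input1's key selector and key type
    static class WhereStage<T1, T2, K> {
        final Function<T1, K> keySelector1;
        final Class<?> keyType;

        WhereStage(Function<T1, K> keySelector1, Class<?> keyType) {
            this.keySelector1 = keySelector1;
            this.keyType = keyType;
        }

        // Like Where.equalTo(keySelector, keyType): a second key of another type is rejected
        EqualToStage<T1, T2, K> equalTo(Function<T2, K> keySelector2, Class<?> otherKeyType) {
            if (!otherKeyType.equals(keyType)) {
                throw new IllegalArgumentException("The keys for the two inputs are not equal: "
                        + "first key = " + keyType.getSimpleName()
                        + " , second key = " + otherKeyType.getSimpleName());
            }
            return new EqualToStage<>(keySelector1, keySelector2);
        }
    }

    // Like EqualTo: holds both key selectors; Flink's window(...) would be the next step
    static class EqualToStage<T1, T2, K> {
        final Function<T1, K> keySelector1;
        final Function<T2, K> keySelector2;

        EqualToStage(Function<T1, K> keySelector1, Function<T2, K> keySelector2) {
            this.keySelector1 = keySelector1;
            this.keySelector2 = keySelector2;
        }

        // True if the two elements would end up in the same key group
        boolean sameKey(T1 left, T2 right) {
            return keySelector1.apply(left).equals(keySelector2.apply(right));
        }
    }

    public static void main(String[] args) {
        EqualToStage<String, String, String> ok = new CoGroupStage<String, String>()
                .where(s -> s.substring(0, 1), String.class)
                .equalTo(s -> s.substring(s.length() - 1), String.class);
        System.out.println(ok.sameKey("apple", "banana")); // both keys are "a"

        try {
            new CoGroupStage<String, String>()
                    .<Object>where(s -> s.length(), Integer.class)
                    .equalTo(s -> s, String.class);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```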

EqualTo

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java

```java
@Public
public class EqualTo {

    private final KeySelector<T2, KEY> keySelector2;

    EqualTo(KeySelector<T2, KEY> keySelector2) {
        this.keySelector2 = requireNonNull(keySelector2);
    }

    @PublicEvolving
    public <W extends Window> WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
        return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null, null);
    }
}
```
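  • The EqualTo object provides the window operation, which creates and returns a WithWindow object (trigger, evictor and allowedLateness start out as null).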

WithWindow

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java

```java
@Public
public static class WithWindow<T1, T2, KEY, W extends Window> {

    private final DataStream<T1> input1;
    private final DataStream<T2> input2;
    private final KeySelector<T1, KEY> keySelector1;
    private final KeySelector<T2, KEY> keySelector2;
    private final TypeInformation<KEY> keyType;
    private final WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner;
    private final Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger;
    private final Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor;
    private final Time allowedLateness;
    private WindowedStream<TaggedUnion<T1, T2>, KEY, W> windowedStream;

    protected WithWindow(DataStream<T1> input1, DataStream<T2> input2,
            KeySelector<T1, KEY> keySelector1, KeySelector<T2, KEY> keySelector2,
            TypeInformation<KEY> keyType, WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
            Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
            Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor,
            Time allowedLateness) {
        this.input1 = input1;
        this.input2 = input2;
        this.keySelector1 = keySelector1;
        this.keySelector2 = keySelector2;
        this.keyType = keyType;
        this.windowAssigner = windowAssigner;
        this.trigger = trigger;
        this.evictor = evictor;
        this.allowedLateness = allowedLateness;
    }

    @PublicEvolving
    public WithWindow<T1, T2, KEY, W> trigger(Trigger<? super TaggedUnion<T1, T2>, ? super W> newTrigger) {
        return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
                windowAssigner, newTrigger, evictor, allowedLateness);
    }

    @PublicEvolving
    public WithWindow<T1, T2, KEY, W> evictor(Evictor<? super TaggedUnion<T1, T2>, ? super W> newEvictor) {
        return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
                windowAssigner, trigger, newEvictor, allowedLateness);
    }

    @PublicEvolving
    public WithWindow<T1, T2, KEY, W> allowedLateness(Time newLateness) {
        return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
                windowAssigner, trigger, evictor, newLateness);
    }

    public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function) {
        TypeInformation<T> resultType = TypeExtractor.getCoGroupReturnTypes(
                function, input1.getType(), input2.getType(), "CoGroup", false);
        return apply(function, resultType);
    }

    @PublicEvolving
    @Deprecated
    public <T> SingleOutputStreamOperator<T> with(CoGroupFunction<T1, T2, T> function) {
        return (SingleOutputStreamOperator<T>) apply(function);
    }

    public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
        //clean the closure
        function = input1.getExecutionEnvironment().clean(function);

        UnionTypeInfo<T1, T2> unionType = new UnionTypeInfo<>(input1.getType(), input2.getType());
        UnionKeySelector<T1, T2, KEY> unionKeySelector = new UnionKeySelector<>(keySelector1, keySelector2);

        DataStream<TaggedUnion<T1, T2>> taggedInput1 = input1
                .map(new Input1Tagger<T1, T2>())
                .setParallelism(input1.getParallelism())
                .returns(unionType);
        DataStream<TaggedUnion<T1, T2>> taggedInput2 = input2
                .map(new Input2Tagger<T1, T2>())
                .setParallelism(input2.getParallelism())
                .returns(unionType);

        DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2);

        // we explicitly create the keyed stream to manually pass the key type information in
        windowedStream = new KeyedStream<TaggedUnion<T1, T2>, KEY>(unionStream, unionKeySelector, keyType)
                .window(windowAssigner);

        if (trigger != null) {
            windowedStream.trigger(trigger);
        }
        if (evictor != null) {
            windowedStream.evictor(evictor);
        }
        if (allowedLateness != null) {
            windowedStream.allowedLateness(allowedLateness);
        }

        return windowedStream.apply(new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType);
    }

    @PublicEvolving
    @Deprecated
    public <T> SingleOutputStreamOperator<T> with(CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
        return (SingleOutputStreamOperator<T>) apply(function, resultType);
    }

    @VisibleForTesting
    Time getAllowedLateness() { return allowedLateness; }

    @VisibleForTesting
    WindowedStream<TaggedUnion<T1, T2>, KEY, W> getWindowedStream() { return windowedStream; }
}
```
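  • WithWindow holds the windowAssigner, trigger, evictor and allowedLateness; trigger, evictor and allowedLateness each return a new WithWindow rather than modifying the current one. It provides the apply operation (the with operations are deprecated).
  • The apply operation takes a CoGroupFunction. Internally it first creates a UnionKeySelector from the two KeySelectors, then maps each input stream with Input1Tagger and Input2Tagger respectively into a stream of TaggedUnion objects, and calls taggedInput1.union(taggedInput2) to obtain unionStream. It then turns unionStream into a KeyedStream using the UnionKeySelector, calls window on that KeyedStream while carrying over the windowAssigner, trigger, evictor and allowedLateness (the last three only when they are non-null), and finally wraps the user-defined CoGroupFunction in a CoGroupWindowFunction and calls windowedStream.apply.
  • In other words, the WindowedStream built inside apply has TaggedUnion elements; the KeyedStream it is built on uses UnionKeySelector as its KeySelector; that KeyedStream wraps a DataStream of TaggedUnion produced by taggedInput1.union(taggedInput2); and taggedInput1 and taggedInput2 come from mapping the original input streams with the MapFunctions Input1Tagger and Input2Tagger, respectively.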
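One detail of the WithWindow source is worth isolating: trigger, evictor and allowedLateness never modify the object they are called on, and apply forwards only the settings that are non-null, so anything left unset keeps the WindowedStream default (the assigner's default trigger, no evictor, zero allowed lateness). Below is a plain-Java sketch of this copy-on-configure pattern; `WindowSettings` is a hypothetical class, and strings stand in for Flink's assigner, trigger and evictor objects.

```java
// Hypothetical sketch of WithWindow's copy-on-configure style (not Flink code).
// Strings stand in for Flink's WindowAssigner, Trigger and Evictor objects.
class CopyOnConfigureDemo {

    static final class WindowSettings {
        final String assigner;
        final String trigger;          // null: keep the assigner's default trigger
        final String evictor;          // null: no evictor
        final Long allowedLatenessMs;  // null: keep WindowedStream's default of 0 ms

        WindowSettings(String assigner, String trigger, String evictor, Long allowedLatenessMs) {
            this.assigner = assigner;
            this.trigger = trigger;
            this.evictor = evictor;
            this.allowedLatenessMs = allowedLatenessMs;
        }

        // Like WithWindow.trigger/evictor/allowedLateness: return a new copy, never mutate this
        WindowSettings trigger(String newTrigger) {
            return new WindowSettings(assigner, newTrigger, evictor, allowedLatenessMs);
        }

        WindowSettings evictor(String newEvictor) {
            return new WindowSettings(assigner, trigger, newEvictor, allowedLatenessMs);
        }

        WindowSettings allowedLateness(long newLatenessMs) {
            return new WindowSettings(assigner, trigger, evictor, newLatenessMs);
        }

        // Like the end of apply(): only non-null settings override the defaults
        String resolve() {
            String t = trigger != null ? trigger : "default trigger of " + assigner;
            String e = evictor != null ? evictor : "no evictor";
            long lateness = allowedLatenessMs != null ? allowedLatenessMs : 0L;
            return assigner + " | " + t + " | " + e + " | lateness=" + lateness + "ms";
        }
    }

    public static void main(String[] args) {
        WindowSettings base = new WindowSettings("tumbling 3s", null, null, null);
        WindowSettings custom = base.trigger("CountTrigger(10)").allowedLateness(5000);
        System.out.println(base.resolve());   // base is left unchanged
        System.out.println(custom.resolve());
    }
}
```

Because every setter returns a fresh object, a partially configured chain can be reused as a template without one branch leaking settings into another.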

CoGroupFunction

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/functions/CoGroupFunction.java

```java
@Public
@FunctionalInterface
public interface CoGroupFunction<IN1, IN2, O> extends Function, Serializable {

    void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<O> out) throws Exception;
}
```
  • CoGroupFunction extends Function and defines a single coGroup method, which receives the elements of the two inputs as two Iterables, plus a Collector for the output.

Input1Tagger and Input2Tagger

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java

```java
private static class Input1Tagger<T1, T2> implements MapFunction<T1, TaggedUnion<T1, T2>> {
    private static final long serialVersionUID = 1L;

    @Override
    public TaggedUnion<T1, T2> map(T1 value) throws Exception {
        return TaggedUnion.one(value);
    }
}

private static class Input2Tagger<T1, T2> implements MapFunction<T2, TaggedUnion<T1, T2>> {
    private static final long serialVersionUID = 1L;

    @Override
    public TaggedUnion<T1, T2> map(T2 value) throws Exception {
        return TaggedUnion.two(value);
    }
}
```
  • Input1Tagger and Input2Tagger implement MapFunction; their map methods return a TaggedUnion, built with TaggedUnion.one and TaggedUnion.two respectively.

TaggedUnion

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java

```java
@Internal
public static class TaggedUnion<T1, T2> {
    private final T1 one;
    private final T2 two;

    private TaggedUnion(T1 one, T2 two) {
        this.one = one;
        this.two = two;
    }

    public boolean isOne() {
        return one != null;
    }

    public boolean isTwo() {
        return two != null;
    }

    public T1 getOne() {
        return one;
    }

    public T2 getTwo() {
        return two;
    }

    public static <T1, T2> TaggedUnion<T1, T2> one(T1 one) {
        return new TaggedUnion<>(one, null);
    }

    public static <T1, T2> TaggedUnion<T1, T2> two(T2 two) {
        return new TaggedUnion<>(null, two);
    }
}
```
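  • TaggedUnion has two fields, one and two, and two static factory methods, one and two. Because the constructor is private, every TaggedUnion has either one or two set to null; the two fields never both hold a value.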
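The tagging step can be reproduced outside Flink. The sketch below is a standalone plain-Java copy of the TaggedUnion idea (it mirrors the names in CoGroupedStreams but is not the Flink class) together with what Input1Tagger and Input2Tagger amount to: two inputs of different element types, here assumed to be `String` and `Integer`, become one list with a single element type.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Standalone plain-Java copy of the TaggedUnion idea plus the tagger map step (not Flink code).
class TaggingDemo {

    static final class TaggedUnion<T1, T2> {
        private final T1 one;
        private final T2 two;

        // Private constructor: the two factories below are the only way to create one
        private TaggedUnion(T1 one, T2 two) {
            this.one = one;
            this.two = two;
        }

        boolean isOne() { return one != null; }
        boolean isTwo() { return two != null; }
        T1 getOne() { return one; }
        T2 getTwo() { return two; }

        static <T1, T2> TaggedUnion<T1, T2> one(T1 one) { return new TaggedUnion<>(one, null); }
        static <T1, T2> TaggedUnion<T1, T2> two(T2 two) { return new TaggedUnion<>(null, two); }

        @Override
        public String toString() { return isOne() ? "one(" + one + ")" : "two(" + two + ")"; }
    }

    // Tag both inputs and concatenate them. Flink's union gives no ordering
    // guarantee across inputs; plain concatenation is used here for simplicity.
    static List<TaggedUnion<String, Integer>> tagAndUnion(List<String> input1, List<Integer> input2) {
        List<TaggedUnion<String, Integer>> union = new ArrayList<>();
        for (String v : input1) {
            union.add(TaggedUnion.one(v));   // like Input1Tagger.map
        }
        for (Integer v : input2) {
            union.add(TaggedUnion.two(v));   // like Input2Tagger.map
        }
        return union;
    }

    public static void main(String[] args) {
        System.out.println(tagAndUnion(Arrays.asList("a", "b"), Arrays.asList(1))); // [one(a), one(b), two(1)]
    }
}
```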

UnionKeySelector

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java

```java
private static class UnionKeySelector<T1, T2, KEY> implements KeySelector<TaggedUnion<T1, T2>, KEY> {
    private static final long serialVersionUID = 1L;

    private final KeySelector<T1, KEY> keySelector1;
    private final KeySelector<T2, KEY> keySelector2;

    public UnionKeySelector(KeySelector<T1, KEY> keySelector1,
            KeySelector<T2, KEY> keySelector2) {
        this.keySelector1 = keySelector1;
        this.keySelector2 = keySelector2;
    }

    @Override
    public KEY getKey(TaggedUnion<T1, T2> value) throws Exception {
        if (value.isOne()) {
            return keySelector1.getKey(value.getOne());
        } else {
            return keySelector2.getKey(value.getTwo());
        }
    }
}
```
  • UnionKeySelector holds two KeySelectors. Its getKey method inspects the TaggedUnion: if it is a one, it returns keySelector1.getKey(value.getOne()); otherwise it returns keySelector2.getKey(value.getTwo()).
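The same dispatch in plain Java, so that one key function can serve both element types and the union stream can be keyed with a single selector. The inputs are hypothetical: the first stream carries orders as `"orderId,userId"` strings and the second carries user ids as `Integer`s. `Tagged` and `java.util.function.Function` stand in for Flink's TaggedUnion and KeySelector.

```java
import java.util.function.Function;

// Plain-Java sketch of UnionKeySelector's dispatch on the tag (not Flink code).
class UnionKeySelectorDemo {

    // Minimal tagged union: exactly one of the two fields is non-null
    static final class Tagged<T1, T2> {
        final T1 one;
        final T2 two;

        private Tagged(T1 one, T2 two) {
            this.one = one;
            this.two = two;
        }

        static <T1, T2> Tagged<T1, T2> one(T1 v) { return new Tagged<>(v, null); }
        static <T1, T2> Tagged<T1, T2> two(T2 v) { return new Tagged<>(null, v); }
        boolean isOne() { return one != null; }
    }

    static final class UnionKeySelector<T1, T2, K> implements Function<Tagged<T1, T2>, K> {
        private final Function<T1, K> keySelector1;
        private final Function<T2, K> keySelector2;

        UnionKeySelector(Function<T1, K> keySelector1, Function<T2, K> keySelector2) {
            this.keySelector1 = keySelector1;
            this.keySelector2 = keySelector2;
        }

        // Same rule as Flink's getKey: side one uses selector 1, anything else selector 2
        @Override
        public K apply(Tagged<T1, T2> value) {
            return value.isOne() ? keySelector1.apply(value.one) : keySelector2.apply(value.two);
        }
    }

    public static void main(String[] args) {
        UnionKeySelector<String, Integer, Integer> byUser = new UnionKeySelector<>(
                order -> Integer.parseInt(order.split(",")[1]),  // "orderId,userId" -> userId
                userId -> userId);                                // already the key
        System.out.println(byUser.apply(Tagged.one("o1,7"))); // 7
        System.out.println(byUser.apply(Tagged.two(7)));      // 7
    }
}
```

Both elements produce key 7, so after keyBy they land in the same key group and, within a window, reach the same coGroup call.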

DataStream.union

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java

```java
@Public
public class DataStream<T> {

    //......

    @SafeVarargs
    public final DataStream<T> union(DataStream<T>... streams) {
        List<StreamTransformation<T>> unionedTransforms = new ArrayList<>();
        unionedTransforms.add(this.transformation);

        for (DataStream<T> newStream : streams) {
            if (!getType().equals(newStream.getType())) {
                throw new IllegalArgumentException("Cannot union streams of different types: "
                        + getType() + " and " + newStream.getType());
            }

            unionedTransforms.add(newStream.getTransformation());
        }
        return new DataStream<>(this.environment, new UnionTransformation<>(unionedTransforms));
    }

    //......
}
```
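  • DataStream's union operation creates a new DataStream backed by a UnionTransformation. Note that union requires all streams to have the same element type; this is why WithWindow's apply operation first maps the two input streams to TaggedUnion objects with Input1Tagger and Input2Tagger, unifying their element types.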

CoGroupWindowFunction

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java

```java
private static class CoGroupWindowFunction<T1, T2, T, KEY, W extends Window>
        extends WrappingFunction<CoGroupFunction<T1, T2, T>>
        implements WindowFunction<TaggedUnion<T1, T2>, T, KEY, W> {

    private static final long serialVersionUID = 1L;

    public CoGroupWindowFunction(CoGroupFunction<T1, T2, T> userFunction) {
        super(userFunction);
    }

    @Override
    public void apply(KEY key,
            W window,
            Iterable<TaggedUnion<T1, T2>> values,
            Collector<T> out) throws Exception {

        List<T1> oneValues = new ArrayList<>();
        List<T2> twoValues = new ArrayList<>();

        for (TaggedUnion<T1, T2> val : values) {
            if (val.isOne()) {
                oneValues.add(val.getOne());
            } else {
                twoValues.add(val.getTwo());
            }
        }
        wrappedFunction.coGroup(oneValues, twoValues, out);
    }
}
```
  • CoGroupWindowFunction extends WrappingFunction (which extends AbstractRichFunction and overrides its open, close and setRuntimeContext methods to manage the wrappedFunction) and implements the WindowFunction interface. Its apply method splits the Iterable of TaggedUnion values into oneValues and twoValues, then calls the coGroup method of the user-defined CoGroupFunction.
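The split step can be sketched in plain Java as follows. `Tagged`, `CoGroupFn` and the `List` used as the collector are hypothetical stand-ins for TaggedUnion, CoGroupFunction and Collector, and `applyWindow` plays the role of `CoGroupWindowFunction.apply` for a single key and window.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch of CoGroupWindowFunction.apply for one key and one window (not Flink code).
class SplitDemo {

    // Stand-in for CoGroupFunction; a List plays the Collector
    interface CoGroupFn<IN1, IN2, O> {
        void coGroup(Iterable<IN1> first, Iterable<IN2> second, List<O> out);
    }

    static final class Tagged<T1, T2> {
        final T1 one;
        final T2 two;

        private Tagged(T1 one, T2 two) {
            this.one = one;
            this.two = two;
        }

        static <T1, T2> Tagged<T1, T2> one(T1 v) { return new Tagged<>(v, null); }
        static <T1, T2> Tagged<T1, T2> two(T2 v) { return new Tagged<>(null, v); }
        boolean isOne() { return one != null; }
    }

    // Split the window contents back into the two inputs, then call the user function once
    static <T1, T2, O> List<O> applyWindow(Iterable<Tagged<T1, T2>> values, CoGroupFn<T1, T2, O> userFunction) {
        List<T1> oneValues = new ArrayList<>();
        List<T2> twoValues = new ArrayList<>();
        for (Tagged<T1, T2> val : values) {
            if (val.isOne()) {
                oneValues.add(val.one);
            } else {
                twoValues.add(val.two);
            }
        }
        List<O> out = new ArrayList<>();
        userFunction.coGroup(oneValues, twoValues, out);
        return out;
    }

    public static void main(String[] args) {
        // Window contents for one key: elements of both inputs, interleaved
        List<Tagged<String, String>> values = Arrays.asList(
                Tagged.<String, String>one("click@1"),
                Tagged.<String, String>two("view@2"),
                Tagged.<String, String>one("click@3"));
        List<String> out = applyWindow(values,
                (first, second, collector) -> collector.add(first + " | " + second));
        System.out.println(out); // [[click@1, click@3] | [view@2]]
    }
}
```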

Summary

  • DataStream provides the coGroup method for window coGroup operations, which returns a CoGroupedStreams. CoGroupedStreams mainly provides the where operation, which builds a Where object; Where provides the equalTo operation, which builds an EqualTo object; EqualTo provides the window operation, which builds a WithWindow object. WithWindow lets you set the windowAssigner, trigger, evictor and allowedLateness, and provides the apply operation.
  • The apply operation of CoGroupedStreams' WithWindow takes a CoGroupFunction. Internally it creates a UnionKeySelector from the two KeySelectors, maps the two input streams with Input1Tagger and Input2Tagger into streams of TaggedUnion objects, and calls taggedInput1.union(taggedInput2) to get unionStream. It then converts unionStream into a KeyedStream with the UnionKeySelector, calls window on that KeyedStream (carrying over the windowAssigner, trigger, evictor and allowedLateness), and finally wraps the user's CoGroupFunction in a CoGroupWindowFunction and calls windowedStream.apply.
  • The apply operation relies on DataStream's union operation to merge the two streams before converting the result into a KeyedStream. The two key classes here are TaggedUnion and UnionKeySelector. TaggedUnion has the fields one and two and the static factory methods one and two, so exactly one of its two fields is ever set. UnionKeySelector holds two KeySelectors; its getKey method uses keySelector1.getKey(value.getOne()) for a one and keySelector2.getKey(value.getTwo()) otherwise. TaggedUnion exists to unify the element types of the two streams so that union can be applied.
  • CoGroupWindowFunction extends WrappingFunction (which extends AbstractRichFunction and overrides its open, close and setRuntimeContext methods to manage the wrappedFunction) and implements the WindowFunction interface. Its apply method splits the Iterable of TaggedUnion values into oneValues and twoValues, then calls the coGroup method of the user-defined CoGroupFunction.
  • CoGroupFunction extends Function and defines the coGroup method, which receives the two groups of elements as Iterables. The apply method of JoinedStreams' WithWindow wraps a JoinFunction or FlatJoinFunction into a CoGroupFunction (JoinFunction via JoinCoGroupFunction, FlatJoinFunction via FlatJoinCoGroupFunction) and then calls the apply method of CoGroupedStreams' WithWindow. JoinCoGroupFunction and FlatJoinCoGroupFunction extend WrappingFunction and implement the coGroup method of CoGroupFunction: for each element of the first group they iterate over the second group and call the join method of the JoinFunction or FlatJoinFunction on every pair. If either group is empty nothing is emitted, so the result is an inner join; to get an outer join, use coGroup with a custom CoGroupFunction.
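The difference between join and coGroup described in the last point can be sketched in plain Java. `innerJoin` follows the nested-loop shape attributed above to JoinCoGroupFunction, so an empty side produces no output, while `leftOuterJoin` is a hypothetical user-written CoGroupFunction that also emits left elements without a partner, paired with null. `CoGroupFn` and the `List` used as the collector are stand-ins for Flink's CoGroupFunction and Collector.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.BiFunction;

// Plain-Java sketch: inner join vs. left outer join expressed as coGroup functions (not Flink code).
class JoinViaCoGroupDemo {

    interface CoGroupFn<IN1, IN2, O> {
        void coGroup(Iterable<IN1> first, Iterable<IN2> second, List<O> out);
    }

    // Inner join: nested loops over both groups; an empty side yields no output
    static <IN1, IN2, O> CoGroupFn<IN1, IN2, O> innerJoin(BiFunction<IN1, IN2, O> join) {
        return (first, second, out) -> {
            for (IN1 left : first) {
                for (IN2 right : second) {
                    out.add(join.apply(left, right));
                }
            }
        };
    }

    // Left outer join: like innerJoin, but left elements with no partner are paired with null
    static <IN1, IN2, O> CoGroupFn<IN1, IN2, O> leftOuterJoin(BiFunction<IN1, IN2, O> join) {
        return (first, second, out) -> {
            boolean rightEmpty = !second.iterator().hasNext();
            for (IN1 left : first) {
                if (rightEmpty) {
                    out.add(join.apply(left, null));
                } else {
                    for (IN2 right : second) {
                        out.add(join.apply(left, right));
                    }
                }
            }
        };
    }

    // Runs one (window, key) group through a CoGroupFn and returns what it emitted
    static <IN1, IN2, O> List<O> run(CoGroupFn<IN1, IN2, O> fn, List<IN1> first, List<IN2> second) {
        List<O> out = new ArrayList<>();
        fn.coGroup(first, second, out);
        return out;
    }

    public static void main(String[] args) {
        BiFunction<String, String, String> pair = (l, r) -> l + "+" + r;
        List<String> left = Collections.singletonList("order1");
        List<String> noRight = Collections.emptyList();
        System.out.println(run(innerJoin(pair), left, noRight));     // []
        System.out.println(run(leftOuterJoin(pair), left, noRight)); // [order1+null]
    }
}
```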

Reposted from: http://dtzzl.baihongyu.com/