How Flink Sends Data

Author: chauncy  Date: 2018-08-08

Flink emits data through the collect method of Collector:

public interface Collector<T> {
    /**
     * Emits a record.
     *
     * @param record The record to collect.
     */
    void collect(T record);

    /**
     * Closes the collector. If any data was buffered, that data will be flushed.
     */
    void close();
}
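
To make the contract concrete, here is a minimal sketch of how a function pushes its results into a Collector instead of returning them. The Collector interface is redeclared locally so the snippet compiles without Flink on the classpath; ListCollector, CollectorDemo and splitWords are hypothetical names used only for illustration and are not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-in for Flink's Collector so the sketch has no Flink dependency.
interface Collector<T> {
    void collect(T record);
    void close();
}

// Hypothetical collector that simply keeps every emitted record in a list.
class ListCollector<T> implements Collector<T> {
    final List<T> records = new ArrayList<>();
    boolean closed = false;

    @Override
    public void collect(T record) {
        records.add(record);
    }

    @Override
    public void close() {
        closed = true;
    }
}

class CollectorDemo {
    // The function does not return its results; it hands each one to the collector.
    static void splitWords(String line, Collector<String> out) {
        for (String word : line.split(" ")) {
            out.collect(word);
        }
    }

    public static void main(String[] args) {
        ListCollector<String> out = new ListCollector<>();
        splitWords("to be or not", out);
        out.close();
        System.out.println(out.records);
    }
}
```

In a real job the collector is supplied by the runtime, and collect forwards the record downstream rather than storing it.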

Output extends Collector:

public interface Output<T> extends Collector<T> {
    /**
     * Emits a {@link Watermark} from an operator. This watermark is broadcast to all downstream
     * operators.
     *
     * <p>A watermark specifies that no element with a timestamp lower or equal to the watermark
     * timestamp will be emitted in the future.
     */
    void emitWatermark(Watermark mark);

    /**
     * Emits a record to the side output identified by the given {@link OutputTag}.
     *
     * @param record The record to collect.
     */
    <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record);

    void emitLatencyMarker(LatencyMarker latencyMarker);
}

One of the classes that implements Output is RecordWriterOutput:

public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> {
    .....
    private StreamRecordWriter<SerializationDelegate<StreamElement>> recordWriter;
    private SerializationDelegate<StreamElement> serializationDelegate;
    private final StreamStatusProvider streamStatusProvider;
    private final OutputTag outputTag;

    @Override
    public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
        if (this.outputTag == null || !this.outputTag.equals(outputTag)) {
            // we are only responsible for emitting to the side-output specified by our
            // OutputTag.
            return;
        }
        pushToRecordWriter(record);
    }

    private <X> void pushToRecordWriter(StreamRecord<X> record) {
        serializationDelegate.setInstance(record);
        try {
            recordWriter.emit(serializationDelegate);
        }
        catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }
}
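
The tag check in collect(OutputTag, StreamRecord) can be sketched in isolation as follows. Tag, TaggedOutput and SideOutputDemo are hypothetical stand-ins invented for this example (Tag plays the role of OutputTag, and adding to a list plays the role of pushToRecordWriter); only the if-condition is taken from the excerpt above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for OutputTag: two tags are equal when their ids are equal.
final class Tag {
    final String id;

    Tag(String id) {
        this.id = id;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof Tag && ((Tag) o).id.equals(id);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id);
    }
}

// Hypothetical output that applies the same tag check as the excerpt above.
class TaggedOutput {
    private final Tag ownTag; // null means this output is not a side output
    final List<String> emitted = new ArrayList<>();

    TaggedOutput(Tag ownTag) {
        this.ownTag = ownTag;
    }

    void collect(Tag tag, String record) {
        if (ownTag == null || !ownTag.equals(tag)) {
            return; // not the side output this instance is responsible for
        }
        emitted.add(record); // stands in for pushToRecordWriter(record)
    }
}

class SideOutputDemo {
    public static void main(String[] args) {
        TaggedOutput late = new TaggedOutput(new Tag("late"));
        TaggedOutput mainOut = new TaggedOutput(null);
        for (TaggedOutput out : new TaggedOutput[] {late, mainOut}) {
            out.collect(new Tag("late"), "r1");
            out.collect(new Tag("errors"), "r2");
        }
        System.out.println("late=" + late.emitted + " main=" + mainOut.emitted);
    }
}
```

Every output sees every tagged record, and each one keeps only the records whose tag matches its own, so an output without a tag ignores all side-output records.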

StreamRecordWriter

public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWriter<T> {
    @Override
    public void emit(T record) throws IOException, InterruptedException {
        checkErroneous();
        super.emit(record);
        if (flushAlways) {
            flush();
        }
    }
}

RecordWriter

public class RecordWriter<T extends IOReadableWritable> {
    protected final ResultPartitionWriter targetPartition; // performs the actual write to each partition
    private final ChannelSelector<T> channelSelector; // selects which channel(s) a record is sent to
    /** {@link RecordSerializer} per outgoing channel */
    private final RecordSerializer<T>[] serializers; // one serializer per channel

    public void emit(T record) throws IOException, InterruptedException {
        for (int targetChannel : channelSelector.selectChannels(record, numChannels)) {
            sendToTarget(record, targetChannel);
        }
    }

    private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
        RecordSerializer<T> serializer = serializers[targetChannel];
        synchronized (serializer) { // a channel's serializer must not be used concurrently
            SerializationResult result = serializer.addRecord(record);
            while (result.isFullBuffer()) { // the buffer (backed by a MemorySegment) is full
                Buffer buffer = serializer.getCurrentBuffer(); // take the full buffer out
                if (buffer != null) {
                    numBytesOut.inc(buffer.getSizeUnsafe());
                    writeAndClearBuffer(buffer, targetChannel, serializer);
                    // If this was a full record, we are done. Not breaking
                    // out of the loop at this point will lead to another
                    // buffer request before breaking out (that would not be
                    // a problem per se, but it can lead to stalls in the
                    // pipeline).
                    if (result.isFullRecord()) {
                        break;
                    }
                } else {
                    buffer = targetPartition.getBufferProvider().requestBufferBlocking();
                    result = serializer.setNextBuffer(buffer);
                }
            }
        }
    }

    private void writeAndClearBuffer(
            Buffer buffer,
            int targetChannel,
            RecordSerializer<T> serializer) throws IOException {
        try {
            targetPartition.writeBuffer(buffer, targetChannel);
        }
        finally {
            serializer.clearCurrentBuffer();
        }
    }
}
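
The core idea of sendToTarget is that a record is copied into fixed-size buffers, and each buffer is handed to the partition as soon as it fills up, so one record may span several buffers. The following is a deliberately simplified sketch of that idea, not Flink's implementation: ChannelBuffer and SendToTargetDemo are hypothetical names, a ByteArrayOutputStream stands in for the MemorySegment-backed Buffer, and a plain list stands in for the target partition.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for one channel's serializer plus its target.
class ChannelBuffer {
    private final int capacity; // fixed buffer size in bytes, must be positive
    private ByteArrayOutputStream current = new ByteArrayOutputStream();
    final List<byte[]> flushed = new ArrayList<>(); // buffers already handed downstream

    ChannelBuffer(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    // Copies the record into the current buffer and flushes every buffer that fills up.
    // Synchronized because one channel must not be written to concurrently.
    synchronized void emit(byte[] record) {
        int pos = 0;
        while (pos < record.length) {
            int n = Math.min(capacity - current.size(), record.length - pos);
            current.write(record, pos, n);
            pos += n;
            if (current.size() == capacity) {
                flushed.add(current.toByteArray());    // plays the role of writeAndClearBuffer
                current = new ByteArrayOutputStream(); // plays the role of requesting a new buffer
            }
        }
    }

    // Bytes that sit in the partially filled buffer and have not been flushed yet.
    synchronized int pendingBytes() {
        return current.size();
    }
}

class SendToTargetDemo {
    public static void main(String[] args) {
        ChannelBuffer channel = new ChannelBuffer(4);
        channel.emit(new byte[10]); // spans two full buffers plus two pending bytes
        System.out.println(channel.flushed.size() + " full buffers, "
                + channel.pendingBytes() + " bytes pending");
    }
}
```

The pending bytes also show why StreamRecordWriter has a flushAlways option: without an explicit flush, a partially filled buffer stays with the sender until later records fill it.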

writeAndClearBuffer performs the actual write through the writeBuffer method of ResultPartitionWriter:

public class ResultPartitionWriter implements EventListener<TaskEvent> {
    private final ResultPartition partition; // the partition that buffers are written to

    public void writeBuffer(Buffer buffer, int targetChannel) throws IOException {
        partition.add(buffer, targetChannel);
    }
}

ResultPartition adds the buffer directly to the target subpartition:

public class ResultPartition implements BufferPoolOwner {
    // Buffers are added to a subpartition, which is either pipelined or spillable.
    // These are two different mechanisms:
    // - A pipelined subpartition is in-memory only and can be consumed once.
    // - A spillable subpartition starts out in memory and spills to disk if asked to do so.
    private final ResultSubpartition[] subpartitions;

    public void add(Buffer buffer, int subpartitionIndex) throws IOException {
        boolean success = false;
        try {
            checkInProduceState();
            final ResultSubpartition subpartition = subpartitions[subpartitionIndex];
            synchronized (subpartition) {
                success = subpartition.add(buffer);
                // Update statistics
                totalNumberOfBuffers++;
                totalNumberOfBytes += buffer.getSize();
            }
        }
        finally {
            if (success) {
                notifyPipelinedConsumers();
            }
            else {
                buffer.recycle();
            }
        }
    }
}

The add method of ResultSubpartition puts the buffer into a queue and notifies the reader so that it can start reading. A SpillableSubpartition may instead write the buffer to disk before add returns.
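
The queue-and-notify behaviour of a pipelined subpartition can be sketched as below. This is an illustration under simplifying assumptions rather than Flink's code: QueueSubpartition and SubpartitionDemo are hypothetical names, a byte array stands in for Buffer, and a Runnable stands in for the reader notification.

```java
import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified pipelined subpartition: an in-memory queue plus a reader callback.
class QueueSubpartition {
    private final ArrayDeque<byte[]> buffers = new ArrayDeque<>();
    private final Runnable onDataAvailable; // stands in for notifying the reader
    private boolean released = false;

    QueueSubpartition(Runnable onDataAvailable) {
        this.onDataAvailable = onDataAvailable;
    }

    // Returns false if the buffer was not accepted, so the caller can recycle it.
    boolean add(byte[] buffer) {
        synchronized (buffers) {
            if (released) {
                return false;
            }
            buffers.add(buffer);
        }
        onDataAvailable.run(); // notify outside the lock
        return true;
    }

    // Each buffer can be consumed only once: polling removes it from the queue.
    byte[] poll() {
        synchronized (buffers) {
            return buffers.poll();
        }
    }

    void release() {
        synchronized (buffers) {
            released = true;
            buffers.clear();
        }
    }
}

class SubpartitionDemo {
    public static void main(String[] args) {
        AtomicInteger notifications = new AtomicInteger();
        QueueSubpartition subpartition = new QueueSubpartition(notifications::incrementAndGet);
        boolean accepted = subpartition.add(new byte[] {1, 2, 3});
        System.out.println("accepted=" + accepted + ", notifications=" + notifications.get());
        System.out.println("first poll has data: " + (subpartition.poll() != null));
        System.out.println("second poll has data: " + (subpartition.poll() != null));
    }
}
```

The boolean returned by add corresponds to the success flag in ResultPartition.add: when a buffer is rejected, the caller is responsible for recycling it.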