Flink 数据的发送是通过 Collector 的collector 方法public interface Collector <T > {
* Emits a record.
*
* @param record The record to collect.
*/
void collect (T record) ;
* Closes the collector. If any data was buffered, that data will be flushed.
*/
void close () ;
}
其中 Output 接口继承（extends）了 Collector，并增加了发送 Watermark、LatencyMarker 以及 side output 记录的方法：
public interface Output <T > extends Collector <T > {
* Emits a {@link Watermark} from an operator. This watermark is broadcast to all downstream
* operators.
*
* <p>A watermark specifies that no element with a timestamp lower or equal to the watermark
* timestamp will be emitted in the future.
*/
void emitWatermark (Watermark mark) ;
* Emits a record the side output identified by the given {@link OutputTag}.
*
* @param record The record to collect.
*/
<X> void collect (OutputTag<X> outputTag, StreamRecord<X> record) ;
void emitLatencyMarker (LatencyMarker latencyMarker) ;
}
实现Output 的类有 RecordWriterOutputpublic class RecordWriterOutput <OUT > implements Output <StreamRecord <OUT >> {
.....
private StreamRecordWriter<SerializationDelegate<StreamElement>> recordWriter;
private SerializationDelegate<StreamElement> serializationDelegate;
private final StreamStatusProvider streamStatusProvider;
private final OutputTag outputTag;
@Override
public <X> void collect (OutputTag<X> outputTag, StreamRecord<X> record) {
if (this .outputTag == null || !this .outputTag.equals(outputTag)) {
return ;
}
pushToRecordWriter(record);
}
private <X> void pushToRecordWriter (StreamRecord<X> record) {
serializationDelegate.setInstance(record);
try {
recordWriter.emit(serializationDelegate);
}
catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}
}
StreamRecordWriter
/**
 * {@link RecordWriter} variant that can flush after every emitted record.
 *
 * @param <T> The type of the records to write.
 */
public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWriter<T> {

    /**
     * Emits the record through {@link RecordWriter#emit} and flushes immediately when
     * {@code flushAlways} is set.
     *
     * @param record The record to emit.
     * @throws IOException declared by the underlying writer.
     * @throws InterruptedException declared by the underlying writer.
     */
    @Override
    public void emit(T record) throws IOException, InterruptedException {
        // checkErroneous() is not part of this excerpt.
        // NOTE(review): presumably rethrows an earlier asynchronous flush failure - confirm.
        checkErroneous();
        super.emit(record);
        if (flushAlways) {
            flush();
        }
    }
}
RecordWriterpublic class RecordWriter <T extends IOReadableWritable > {
protected final ResultPartitionWriter targetPartition;
private final ChannelSelector<T> channelSelector;
private final RecordSerializer<T>[] serializers;
}
public void emit (T record) throws IOException, InterruptedException {
for (int targetChannel : channelSelector.selectChannels(record, numChannels)) {
sendToTarget(record, targetChannel);
}
}
private void sendToTarget (T record, int targetChannel) throws IOException, InterruptedException {
RecordSerializer<T> serializer = serializers[targetChannel];
synchronized (serializer) {
SerializationResult result = serializer.addRecord(record);
while (result.isFullBuffer()) {
Buffer buffer = serializer.getCurrentBuffer();
if (buffer != null ) {
numBytesOut.inc(buffer.getSizeUnsafe());
writeAndClearBuffer(buffer, targetChannel, serializer);
if (result.isFullRecord()) {
break ;
}
} else {
buffer = targetPartition.getBufferProvider().requestBufferBlocking();
result = serializer.setNextBuffer(buffer);
}
}
}
private void writeAndClearBuffer (
Buffer buffer,
int targetChannel,
RecordSerializer<T> serializer) throws IOException {
try {
targetPartition.writeBuffer(buffer, targetChannel);
}
finally {
serializer.clearCurrentBuffer();
}
}
}
writeAndClearBuffer 中真正的写入是通过 ResultPartitionWriter 的 writeBuffer 完成的：
/**
 * Write-side handle of a {@link ResultPartition}: buffers passed to {@link #writeBuffer} are
 * forwarded to that partition.
 */
public class ResultPartitionWriter implements EventListener<TaskEvent> {

    /** Partition that receives every written buffer. */
    private final ResultPartition partition;

    /**
     * Adds the buffer to the wrapped partition under the given channel index.
     *
     * @param buffer The buffer to write.
     * @param targetChannel Index passed on to the partition as the subpartition index.
     * @throws IOException if the partition rejects the buffer with an I/O error.
     */
    public void writeBuffer(Buffer buffer, int targetChannel) throws IOException {
        this.partition.add(buffer, targetChannel);
    }
}
ResultPartition 直接将buffer addpublic class ResultPartition implements BufferPoolOwner {
private final ResultSubpartition[] subpartitions;
public void add (Buffer buffer, int subpartitionIndex) throws IOException {
boolean success = false ;
try {
checkInProduceState();
final ResultSubpartition subpartition = subpartitions[subpartitionIndex];
synchronized (subpartition) {
success = subpartition.add(buffer);
totalNumberOfBuffers++;
totalNumberOfBytes += buffer.getSize();
}
}
finally {
if (success) {
notifyPipelinedConsumers();
}
else {
buffer.recycle();
}
}
}
}
ResultSubpartition 的 add 就是将 buffer 放入一个 queue 中，同时 notify reader 进行读取；如果是 SpillableSubpartition，则会将 buffer 刷入磁盘，然后再返回。