Talk about flink’s CsvReader


Preface

This article mainly studies Flink's CsvReader.

Example

        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<RecordDto> csvInput = env.readCsvFile(csvFilePath)
                .pojoType(RecordDto.class, "playerName", "country", "year", "game", "gold", "silver", "bronze", "total");

        csvInput.map(new MapFunction<RecordDto, RecordDto>() {
            @Override
            public RecordDto map(RecordDto value) throws Exception {
                LOGGER.info("execute map:{}",value);
                TimeUnit.SECONDS.sleep(5);
                return value;
            }
        }).print();
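
For reference, here is a minimal sketch of what the RecordDto POJO in this example could look like. The class itself is not shown above; the field names come from the pojoType call, everything else is an assumption. Flink's POJO rules require a public class, a public no-argument constructor, and fields that are either public or reachable via getters and setters.

public class RecordDto {
    public String playerName;
    public String country;
    public Integer year;
    public String game;
    public Integer gold;
    public Integer silver;
    public Integer bronze;
    public Integer total;

    // Flink requires a public no-argument constructor for POJO types
    public RecordDto() {
    }

    @Override
    public String toString() {
        return playerName + "," + country + "," + year + "," + game
                + "," + gold + "," + silver + "," + bronze + "," + total;
    }
}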

ExecutionEnvironment.readCsvFile

flink-java-1.6.2-sources.jar! /org/apache/flink/api/java/ExecutionEnvironment.java

    /**
     * Creates a CSV reader to read a comma separated value (CSV) file. The reader has options to
     * define parameters and field types and will eventually produce the DataSet that corresponds to
     * the read and parsed CSV input.
     *
     * @param filePath The path of the CSV file.
     * @return A CsvReader that can be used to configure the CSV input.
     */
    public CsvReader readCsvFile(String filePath) {
        return new CsvReader(filePath, this);
    }
  • CsvReader is created here based on filePath.

CsvReader

flink-java-1.6.2-sources.jar! /org/apache/flink/api/java/io/CsvReader.java

    public CsvReader(String filePath, ExecutionEnvironment executionContext) {
        this(new Path(Preconditions.checkNotNull(filePath, "The file path may not be null.")), executionContext);
    }

    public CsvReader(Path filePath, ExecutionEnvironment executionContext) {
        Preconditions.checkNotNull(filePath, "The file path may not be null.");
        Preconditions.checkNotNull(executionContext, "The execution context may not be null.");

        this.path = filePath;
        this.executionContext = executionContext;
    }

    /**
     * Configures the reader to read the CSV data and parse it to the given type. The all fields of the type
     * must be public or able to set value. The type information for the fields is obtained from the type class.
     *
     * @param pojoType The class of the target POJO.
     * @param pojoFields The fields of the POJO which are mapped to CSV fields.
     * @return The DataSet representing the parsed CSV data.
     */
    public <T> DataSource<T> pojoType(Class<T> pojoType, String... pojoFields) {
        Preconditions.checkNotNull(pojoType, "The POJO type class must not be null.");
        Preconditions.checkNotNull(pojoFields, "POJO fields must be specified (not null) if output type is a POJO.");

        final TypeInformation<T> ti = TypeExtractor.createTypeInfo(pojoType);
        if (!(ti instanceof PojoTypeInfo)) {
            throw new IllegalArgumentException(
                "The specified class is not a POJO. The type class must meet the POJO requirements. Found: " + ti);
        }
        final PojoTypeInfo<T> pti = (PojoTypeInfo<T>) ti;

        CsvInputFormat<T> inputFormat = new PojoCsvInputFormat<T>(path, this.lineDelimiter, this.fieldDelimiter, pti, pojoFields, this.includedMask);

        configureInputFormat(inputFormat);

        return new DataSource<T>(executionContext, inputFormat, pti, Utils.getCallLocationName());
    }
  • CsvReader provides the pojoType method, which maps the CSV data to a Java type and turns it into Flink's DataSource; when the DataSource is created, a PojoCsvInputFormat and a PojoTypeInfo are supplied to it (a usage sketch with more CsvReader options follows).
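
Below is a small usage sketch (assuming the same csvFilePath and RecordDto as in the example above) of other options CsvReader exposes before pojoType() is called; each setter returns the CsvReader itself, so the calls can be chained, and pojoType() is the terminal call that builds the DataSource.

        DataSet<RecordDto> csvInput = env.readCsvFile(csvFilePath)
                .fieldDelimiter(",")      // column separator, "," is the default
                .lineDelimiter("\n")      // record separator, "\n" is the default
                .ignoreFirstLine()        // skip a header row
                .ignoreInvalidLines()     // count malformed rows instead of failing
                .pojoType(RecordDto.class, "playerName", "country", "year", "game",
                        "gold", "silver", "bronze", "total");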

Task

flink-runtime_2.11-1.6.2-sources.jar! /org/apache/flink/runtime/taskmanager/Task.java

/**
 * The Task represents one execution of a parallel subtask on a TaskManager.
 * A Task wraps a Flink operator (which may be a user function) and
 * runs it, providing all services necessary for example to consume input data,
 * produce its results (intermediate result partitions) and communicate
 * with the JobManager.
 *
 * <p>The Flink operators (implemented as subclasses of
 * {@link AbstractInvokable} have only data readers, -writers, and certain event callbacks.
 * The task connects those to the network stack and actor messages, and tracks the state
 * of the execution and handles exceptions.
 *
 * <p>Tasks have no knowledge about how they relate to other tasks, or whether they
 * are the first attempt to execute the task, or a repeated attempt. All of that
 * is only known to the JobManager. All the task knows are its own runnable code,
 * the task's configuration, and the IDs of the intermediate results to consume and
 * produce (if any).
 *
 * <p>Each Task is run by one dedicated thread.
 */
public class Task implements Runnable, TaskActions, CheckpointListener {
    //......

    /**
     * The core work method that bootstraps the task and executes its code.
     */
    @Override
    public void run() {
            //......
            // now load and instantiate the task's invokable code
            invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

            // ----------------------------------------------------------------
            //  actual task core work
            // ----------------------------------------------------------------

            // we must make strictly sure that the invokable is accessible to the cancel() call
            // by the time we switched to running.
            this.invokable = invokable;

            // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
            if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
                throw new CancelTaskException();
            }

            // notify everyone that we switched to running
            notifyObservers(ExecutionState.RUNNING, null);
            taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));

            // make sure the user code classloader is accessible thread-locally
            executingThread.setContextClassLoader(userCodeClassLoader);

            // run the invokable
            invokable.invoke();

            //......
    }
}
  • The run method of Task calls invokable.invoke(); the invokable here is a DataSourceTask.

DataSourceTask.invoke

flink-runtime_2.11-1.6.2-sources.jar! /org/apache/flink/runtime/operators/DataSourceTask.java

    @Override
    public void invoke() throws Exception {
        // --------------------------------------------------------------------
        // Initialize
        // --------------------------------------------------------------------
        initInputFormat();

        LOG.debug(getLogString("Start registering input and output"));

        try {
            initOutputs(getUserCodeClassLoader());
        } catch (Exception ex) {
            throw new RuntimeException("The initialization of the DataSource's outputs caused an error: " +
                    ex.getMessage(), ex);
        }

        LOG.debug(getLogString("Finished registering input and output"));

        // --------------------------------------------------------------------
        // Invoke
        // --------------------------------------------------------------------
        LOG.debug(getLogString("Starting data source operator"));

        RuntimeContext ctx = createRuntimeContext();

        final Counter numRecordsOut;
        {
            Counter tmpNumRecordsOut;
            try {
                OperatorIOMetricGroup ioMetricGroup = ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup();
                ioMetricGroup.reuseInputMetricsForTask();
                if (this.config.getNumberOfChainedStubs() == 0) {
                    ioMetricGroup.reuseOutputMetricsForTask();
                }
                tmpNumRecordsOut = ioMetricGroup.getNumRecordsOutCounter();
            } catch (Exception e) {
                LOG.warn("An exception occurred during the metrics setup.", e);
                tmpNumRecordsOut = new SimpleCounter();
            }
            numRecordsOut = tmpNumRecordsOut;
        }
        
        Counter completedSplitsCounter = ctx.getMetricGroup().counter("numSplitsProcessed");

        if (RichInputFormat.class.isAssignableFrom(this.format.getClass())) {
            ((RichInputFormat) this.format).setRuntimeContext(ctx);
            LOG.debug(getLogString("Rich Source detected. Initializing runtime context."));
            ((RichInputFormat) this.format).openInputFormat();
            LOG.debug(getLogString("Rich Source detected. Opening the InputFormat."));
        }

        ExecutionConfig executionConfig = getExecutionConfig();

        boolean objectReuseEnabled = executionConfig.isObjectReuseEnabled();

        LOG.debug("DataSourceTask object reuse: " + (objectReuseEnabled ? "ENABLED" : "DISABLED") + ".");
        
        final TypeSerializer<OT> serializer = this.serializerFactory.getSerializer();
        
        try {
            // start all chained tasks
            BatchTask.openChainedTasks(this.chainedTasks, this);
            
            // get input splits to read
            final Iterator<InputSplit> splitIterator = getInputSplits();
            
            // for each assigned input split
            while (!this.taskCanceled && splitIterator.hasNext())
            {
                // get start and end
                final InputSplit split = splitIterator.next();

                LOG.debug(getLogString("Opening input split " + split.toString()));
                
                final InputFormat<OT, InputSplit> format = this.format;
            
                // open input format
                format.open(split);
    
                LOG.debug(getLogString("Starting to read input from split " + split.toString()));
                
                try {
                    final Collector<OT> output = new CountingCollector<>(this.output, numRecordsOut);

                    if (objectReuseEnabled) {
                        OT reuse = serializer.createInstance();

                        // as long as there is data to read
                        while (!this.taskCanceled && !format.reachedEnd()) {

                            OT returned;
                            if ((returned = format.nextRecord(reuse)) != null) {
                                output.collect(returned);
                            }
                        }
                    } else {
                        // as long as there is data to read
                        while (!this.taskCanceled && !format.reachedEnd()) {
                            OT returned;
                            if ((returned = format.nextRecord(serializer.createInstance())) != null) {
                                output.collect(returned);
                            }
                        }
                    }

                    if (LOG.isDebugEnabled() && !this.taskCanceled) {
                        LOG.debug(getLogString("Closing input split " + split.toString()));
                    }
                } finally {
                    // close. We close here such that a regular close throwing an exception marks a task as failed.
                    format.close();
                }
                completedSplitsCounter.inc();
            } // end for all input splits

            // close the collector. if it is a chaining task collector, it will close its chained tasks
            this.output.close();

            // close all chained tasks letting them report failure
            BatchTask.closeChainedTasks(this.chainedTasks, this);

        }
        catch (Exception ex) {
            // close the input, but do not report any exceptions, since we already have another root cause
            try {
                this.format.close();
            } catch (Throwable ignored) {}

            BatchTask.cancelChainedTasks(this.chainedTasks);

            ex = ExceptionInChainedStubException.exceptionUnwrap(ex);

            if (ex instanceof CancelTaskException) {
                // forward canceling exception
                throw ex;
            }
            else if (!this.taskCanceled) {
                // drop exception, if the task was canceled
                BatchTask.logAndThrowException(ex, this);
            }
        } finally {
            BatchTask.clearWriters(eventualOutputs);
            // --------------------------------------------------------------------
            // Closing
            // --------------------------------------------------------------------
            if (this.format != null && RichInputFormat.class.isAssignableFrom(this.format.getClass())) {
                ((RichInputFormat) this.format).closeInputFormat();
                LOG.debug(getLogString("Rich Source detected. Closing the InputFormat."));
            }
        }

        if (!this.taskCanceled) {
            LOG.debug(getLogString("Finished data source operator"));
        }
        else {
            LOG.debug(getLogString("Data source operator cancelled"));
        }
    }
  • The invoke method of DataSourceTask keeps calling format.nextRecord(serializer.createInstance()) to pull data as long as the task is not canceled and format.reachedEnd() returns false, and then executes output.collect(returned).
  • The format here is a CsvInputFormat (concretely a PojoCsvInputFormat); however, the nextRecord and reachedEnd methods it calls are implemented in the parent class DelimitedInputFormat.
  • PojoCsvInputFormat extends the abstract class CsvInputFormat, CsvInputFormat extends the abstract class GenericCsvInputFormat, and GenericCsvInputFormat extends the abstract class DelimitedInputFormat. A simplified sketch of this read loop follows.
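
To make the control flow easier to follow, here is a simplified, self-contained sketch of the InputFormat contract that the invoke loop follows. This is not the actual DataSourceTask code; error handling, metrics and the object-reuse switch are omitted.

import java.io.IOException;
import java.util.Iterator;

import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.util.Collector;

public class InputFormatLoopSketch {

    // For every assigned split: open the format, pull records until
    // reachedEnd(), forward each non-null record, then close the format.
    static <T> void consume(InputFormat<T, FileInputSplit> format,
                            Iterator<FileInputSplit> splits,
                            Collector<T> output,
                            T reuse) throws IOException {
        while (splits.hasNext()) {
            FileInputSplit split = splits.next();
            format.open(split);
            try {
                while (!format.reachedEnd()) {
                    T record = format.nextRecord(reuse);
                    if (record != null) {        // null means "skip this line"
                        output.collect(record);
                    }
                }
            } finally {
                format.close();
            }
        }
    }
}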

DelimitedInputFormat

flink-core-1.6.2-sources.jar! /org/apache/flink/api/common/io/DelimitedInputFormat.java

    /**
     * The default read buffer size = 1MB.
     */
    private static final int DEFAULT_READ_BUFFER_SIZE = 1024 * 1024;

    private transient byte[] readBuffer;

    private int bufferSize = -1;

    private void initBuffers() {
        this.bufferSize = this.bufferSize <= 0 ? DEFAULT_READ_BUFFER_SIZE : this.bufferSize;

        if (this.bufferSize <= this.delimiter.length) {
            throw new IllegalArgumentException("Buffer size must be greater than length of delimiter.");
        }

        if (this.readBuffer == null || this.readBuffer.length != this.bufferSize) {
            this.readBuffer = new byte[this.bufferSize];
        }
        if (this.wrapBuffer == null || this.wrapBuffer.length < 256) {
            this.wrapBuffer = new byte[256];
        }

        this.readPos = 0;
        this.limit = 0;
        this.overLimit = false;
        this.end = false;
    }

    /**
     * Checks whether the current split is at its end.
     * 
     * @return True, if the split is at its end, false otherwise.
     */
    @Override
    public boolean reachedEnd() {
        return this.end;
    }
    
    @Override
    public OT nextRecord(OT record) throws IOException {
        if (readLine()) {
            return readRecord(record, this.currBuffer, this.currOffset, this.currLen);
        } else {
            this.end = true;
            return null;
        }
    }

    /**
     * Fills the read buffer with bytes read from the file starting from an offset.
     */
    private boolean fillBuffer(int offset) throws IOException {
        int maxReadLength = this.readBuffer.length - offset;
        // special case for reading the whole split.
        if (this.splitLength == FileInputFormat.READ_WHOLE_SPLIT_FLAG) {
            int read = this.stream.read(this.readBuffer, offset, maxReadLength);
            if (read == -1) {
                this.stream.close();
                this.stream = null;
                return false;
            } else {
                this.readPos = offset;
                this.limit = read;
                return true;
            }
        }
        
        // else ..
        int toRead;
        if (this.splitLength > 0) {
            // if we have more data, read that
            toRead = this.splitLength > maxReadLength ? maxReadLength : (int) this.splitLength;
        }
        else {
            // if we have exhausted our split, we need to complete the current record, or read one
            // more across the next split.
            // the reason is that the next split will skip over the beginning until it finds the first
            // delimiter, discarding it as an incomplete chunk of data that belongs to the last record in the
            // previous split.
            toRead = maxReadLength;
            this.overLimit = true;
        }

        int read = this.stream.read(this.readBuffer, offset, toRead);

        if (read == -1) {
            this.stream.close();
            this.stream = null;
            return false;
        } else {
            this.splitLength -= read;
            this.readPos = offset; // position from where to start reading
            this.limit = read + offset; // number of valid bytes in the read buffer
            return true;
        }
    }
  • DelimitedInputFormat's nextRecord first calls readLine() to read data into currBuffer; if there is data, it calls the readRecord method implemented by the subclass CsvInputFormat, passing currBuffer, currOffset and currLen.
  • DelimitedInputFormat's readLine() method calls the fillBuffer method, which determines toRead from splitLength (the length of the current FileInputSplit) and maxReadLength, reads up to toRead bytes from the file into readBuffer starting at offset, after which readLine() sets currBuffer, currOffset and currLen.
  • readBuffer is sized by bufferSize in initBuffers(); bufferSize is -1 after construction (it is set to 4 * 1024 in the getStatistics method for sampling), so by default DEFAULT_READ_BUFFER_SIZE (1024 * 1024, i.e. 1MB) is used. A simplified sketch of the delimiter scan inside readLine() follows.
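
As an illustration of the scanning idea inside readLine(), here is a deliberately simplified sketch. It is not the real implementation, which additionally spills partial records into wrapBuffer and keeps reading across buffer refills and split boundaries: look for the delimiter bytes inside readBuffer between readPos and limit, and everything before the delimiter is the current record.

public class DelimiterScanSketch {

    // Returns the index where the delimiter starts, or -1 if the buffer
    // between readPos and limit does not contain a complete delimiter yet.
    static int findDelimiter(byte[] readBuffer, int readPos, int limit, byte[] delimiter) {
        for (int i = readPos; i <= limit - delimiter.length; i++) {
            int j = 0;
            while (j < delimiter.length && readBuffer[i + j] == delimiter[j]) {
                j++;
            }
            if (j == delimiter.length) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        byte[] buffer = "a,b,1\nc,d,2\n".getBytes(java.nio.charset.StandardCharsets.UTF_8);
        int recordEnd = findDelimiter(buffer, 0, buffer.length, new byte[]{'\n'});
        // bytes [0, recordEnd) would be handed to readRecord as one record
        System.out.println("first record ends at byte " + recordEnd); // prints 5
    }
}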

CsvInputFormat.readRecord

flink-java-1.6.2-sources.jar! /org/apache/flink/api/java/io/CsvInputFormat.java

    @Override
    public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) throws IOException {
        /*
         * Fix to support windows line endings in CSVInputFiles with standard delimiter setup = \n
         */
        // Found window's end line, so find carriage return before the newline
        if (this.lineDelimiterIsLinebreak && numBytes > 0 && bytes[offset + numBytes - 1] == '\r') {
            //reduce the number of bytes so that the Carriage return is not taken as data
            numBytes--;
        }

        if (commentPrefix != null && commentPrefix.length <= numBytes) {
            //check record for comments
            boolean isComment = true;
            for (int i = 0; i < commentPrefix.length; i++) {
                if (commentPrefix[i] != bytes[offset + i]) {
                    isComment = false;
                    break;
                }
            }
            if (isComment) {
                this.commentCount++;
                return null;
            }
        }

        if (parseRecord(parsedValues, bytes, offset, numBytes)) {
            return fillRecord(reuse, parsedValues);
        } else {
            this.invalidLineCount++;
            return null;
        }
    }
  • CsvInputFormat's readRecord method takes the raw line bytes, uses the parseRecord method to parse them into parsedValues (an Object[]), and then calls the subclass's fillRecord method (here PojoCsvInputFormat) to populate the reuse object with parsedValues (the reuse object is the serializer.createInstance() that DataSourceTask passed in when calling format.nextRecord). A simplified parsing sketch follows.
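
Here is a simplified, self-contained illustration of what the parsing step conceptually does. It is not Flink's parseRecord, which uses typed FieldParsers and honors the includedMask; the sample line below is made up to match the RecordDto field order from the example.

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class ParseRecordSketch {

    public static void main(String[] args) {
        // a made-up CSV line: playerName, country, year, game, gold, silver, bronze, total
        byte[] bytes = "somePlayer,someCountry,2008,someGame,1,0,2,3\n".getBytes(StandardCharsets.UTF_8);
        int offset = 0;
        int numBytes = bytes.length - 1;               // exclude the line delimiter

        String line = new String(bytes, offset, numBytes, StandardCharsets.UTF_8);
        String[] fields = line.split(",", -1);

        // the real code converts each field with a type-specific FieldParser;
        // here only the numeric columns are converted, for illustration
        Object[] parsedValues = new Object[fields.length];
        for (int i = 0; i < fields.length; i++) {
            parsedValues[i] = (i >= 2 && i != 3) ? Integer.parseInt(fields[i]) : fields[i];
        }
        System.out.println(Arrays.toString(parsedValues));
        // [somePlayer, someCountry, 2008, someGame, 1, 0, 2, 3]
    }
}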

PojoCsvInputFormat.fillRecord

flink-java-1.6.2-sources.jar! /org/apache/flink/api/java/io/PojoCsvInputFormat.java

/**
 * Input format that reads csv into POJOs.
 * @param <OUT> resulting POJO type
 */
@Internal
public class PojoCsvInputFormat<OUT> extends CsvInputFormat<OUT> {

    //......

    @Override
    public void open(FileInputSplit split) throws IOException {
        super.open(split);

        pojoFields = new Field[pojoFieldNames.length];

        Map<String, Field> allFields = new HashMap<String, Field>();

        findAllFields(pojoTypeClass, allFields);

        for (int i = 0; i < pojoFieldNames.length; i++) {
            pojoFields[i] = allFields.get(pojoFieldNames[i]);

            if (pojoFields[i] != null) {
                pojoFields[i].setAccessible(true);
            } else {
                throw new RuntimeException("There is no field called \"" + pojoFieldNames[i] + "\" in " + pojoTypeClass.getName());
            }
        }
    }

    @Override
    public OUT fillRecord(OUT reuse, Object[] parsedValues) {
        for (int i = 0; i < parsedValues.length; i++) {
            try {
                pojoFields[i].set(reuse, parsedValues[i]);
            } catch (IllegalAccessException e) {
                throw new RuntimeException("Parsed value could not be set in POJO field \"" + pojoFieldNames[i] + "\"", e);
            }
        }
        return reuse;
    }

    //......
}
  • PojoCsvInputFormat's open method is called when the input split is opened (see format.open(split) in DataSourceTask.invoke above); it uses reflection to look up the required Field objects in advance and makes them accessible.
  • The fillRecord method then simply uses reflection to set parsedValues onto the POJO (see the standalone sketch below).
  • If setting a field via reflection fails with an IllegalAccessException, the exception is wrapped in a RuntimeException.
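
A standalone sketch of the reflection pattern used here, with plain JDK reflection only. It assumes the RecordDto sketch from earlier (with a public year field); the real code resolves the Field objects once in open() and then reuses them for every record.

import java.lang.reflect.Field;

public class ReflectionFillSketch {

    public static void main(String[] args) throws Exception {
        // what open() does once per configured field name
        Field yearField = RecordDto.class.getDeclaredField("year");
        yearField.setAccessible(true);

        // what fillRecord() does for every parsed record
        RecordDto reuse = new RecordDto();
        yearField.set(reuse, 2008);

        System.out.println(reuse.year);   // 2008
    }
}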

CountingCollector.collect

flink-runtime_2.11-1.6.2-sources.jar! /org/apache/flink/runtime/operators/util/metrics/CountingCollector.java

public class CountingCollector<OUT> implements Collector<OUT> {
    private final Collector<OUT> collector;
    private final Counter numRecordsOut;

    public CountingCollector(Collector<OUT> collector, Counter numRecordsOut) {
        this.collector = collector;
        this.numRecordsOut = numRecordsOut;
    }

    @Override
    public void collect(OUT record) {
        this.numRecordsOut.inc();
        this.collector.collect(record);
    }

    @Override
    public void close() {
        this.collector.close();
    }
}
  • The collector here is org.apache.flink.runtime.operators.chaining.ChainedMapDriver.

ChainedMapDriver

flink-runtime_2.11-1.6.2-sources.jar! /org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java

    @Override
    public void collect(IT record) {
        try {
            this.numRecordsIn.inc();
            this.outputCollector.collect(this.mapper.map(record));
        } catch (Exception ex) {
            throw new ExceptionInChainedStubException(this.taskName, ex);
        }
    }
  • Here the mapper's map method is called first to execute the map logic, and then outputCollector.collect is called to emit the result (a standalone sketch of this pattern follows).
  • The outputCollector here is again a CountingCollector, and the collector wrapped inside it is org.apache.flink.runtime.operators.shipping.OutputCollector.
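
The chaining pattern is easy to reproduce in isolation. Below is a minimal sketch (a hypothetical class, not Flink's ChainedMapDriver) of a Collector that applies a MapFunction to each incoming record and forwards the result to the next Collector in the chain.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.util.Collector;

public class MappingCollector<IN, OUT> implements Collector<IN> {

    private final MapFunction<IN, OUT> mapper;
    private final Collector<OUT> next;

    public MappingCollector(MapFunction<IN, OUT> mapper, Collector<OUT> next) {
        this.mapper = mapper;
        this.next = next;
    }

    @Override
    public void collect(IN record) {
        try {
            // apply the user map function, then forward downstream
            next.collect(mapper.map(record));
        } catch (Exception e) {
            throw new RuntimeException("The map function caused an error", e);
        }
    }

    @Override
    public void close() {
        next.close();
    }
}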

OutputCollector

flink-runtime_2.11-1.6.2-sources.jar! /org/apache/flink/runtime/operators/shipping/OutputCollector.java

    /**
     * Collects a record and emits it to all writers.
     */
    @Override
    public void collect(T record)  {
        if (record != null) {
            this.delegate.setInstance(record);
            try {
                for (RecordWriter<SerializationDelegate<T>> writer : writers) {
                    writer.emit(this.delegate);
                }
            }
            catch (IOException e) {
                throw new RuntimeException("Emitting the record caused an I/O exception: " + e.getMessage(), e);
            }
            catch (InterruptedException e) {
                throw new RuntimeException("Emitting the record was interrupted: " + e.getMessage(), e);
            }
        }
        else {
            throw new NullPointerException("The system does not support records that are null."
                                + "Null values are only supported as fields inside other objects.");
        }
    }
  • The emit method of RecordWriter is called here to transmit data.

RecordWriter

flink-runtime_2.11-1.6.2-sources.jar! /org/apache/flink/runtime/io/network/api/writer/RecordWriter.java

    public void emit(T record) throws IOException, InterruptedException {
        for (int targetChannel : channelSelector.selectChannels(record, numChannels)) {
            sendToTarget(record, targetChannel);
        }
    }
  • Here the target channels to send to are obtained from channelSelector.selectChannels; the channelSelector here is an OutputEmitter (a sketch of the ChannelSelector contract follows).
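
A small sketch of the ChannelSelector contract used by emit(). This is a hypothetical round-robin selector, not Flink's OutputEmitter shown next: for every record it returns the indices of the channels the record should be written to.

import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;

public class RoundRobinSelector<T extends IOReadableWritable> implements ChannelSelector<T> {

    private final int[] target = new int[1];
    private int nextChannel = -1;

    @Override
    public int[] selectChannels(T record, int numberOfChannels) {
        // send each record to exactly one channel, cycling through all of them
        nextChannel = (nextChannel + 1) % numberOfChannels;
        target[0] = nextChannel;
        return target;
    }
}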

OutputEmitter

flink-runtime_2.11-1.6.2-sources.jar! /org/apache/flink/runtime/operators/shipping/OutputEmitter.java

    @Override
    public final int[] selectChannels(SerializationDelegate<T> record, int numberOfChannels) {
        switch (strategy) {
        case FORWARD:
            return forward();
        case PARTITION_RANDOM:
        case PARTITION_FORCED_REBALANCE:
            return robin(numberOfChannels);
        case PARTITION_HASH:
            return hashPartitionDefault(record.getInstance(), numberOfChannels);
        case BROADCAST:
            return broadcast(numberOfChannels);
        case PARTITION_CUSTOM:
            return customPartition(record.getInstance(), numberOfChannels);
        case PARTITION_RANGE:
            return rangePartition(record.getInstance(), numberOfChannels);
        default:
            throw new UnsupportedOperationException("Unsupported distribution strategy: " + strategy.name());
        }
    }

    private int[] forward() {
        return this.channels;
    }
  • The strategy here is FORWARD, so forward() simply returns this.channels.

Summary

  • The inputFormat created by CsvReader is PojoCsvInputFormat; its main method is fillRecord, which fills in the data by reflection. The data itself is read in DelimitedInputFormat's readLine method, which calls fillBuffer; fillBuffer determines toRead from splitLength (the length of the current FileInputSplit) and maxReadLength, then reads up to toRead bytes from the file into readBuffer starting at offset.
  • In its invoke method, DataSourceTask repeatedly calls format.nextRecord and passes each record to output.collect until the task is canceled or format.reachedEnd() returns true.
  • The output here is a CountingCollector whose wrapped collector is the ChainedMapDriver; ChainedMapDriver maps each record it receives and passes the result to its outputCollector, another CountingCollector that wraps the OutputCollector; the OutputCollector finally uses RecordWriter to emit the data.

doc