A look at Flink’s TextOutputFormat

Preface

This article takes a look at Flink’s TextOutputFormat, the output format used by DataStream.writeAsText.

DataStream.writeAsText

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java

    /**
     * Writes a DataStream to the file specified by path in text format.
     *
     * <p>For every element of the DataStream the result of {@link Object#toString()} is written.
     *
     * @param path
     *            The path pointing to the location the text file is written to
     * @param writeMode
     *            Controls the behavior for existing files. Options are
     *            NO_OVERWRITE and OVERWRITE.
     *
     * @return The closed DataStream.
     */
    @PublicEvolving
    public DataStreamSink<T> writeAsText(String path, WriteMode writeMode) {
        TextOutputFormat<T> tof = new TextOutputFormat<>(new Path(path));
        tof.setWriteMode(writeMode);
        return writeUsingOutputFormat(tof);
    }

    /**
     * Writes the dataStream into an output, described by an OutputFormat.
     *
     * <p>The output is not participating in Flink's checkpointing!
     *
     * <p>For writing to a file system periodically, the use of the "flink-connector-filesystem"
     * is recommended.
     *
     * @param format The output format
     * @return The closed DataStream
     */
    @PublicEvolving
    public DataStreamSink<T> writeUsingOutputFormat(OutputFormat<T> format) {
        return addSink(new OutputFormatSinkFunction<>(format));
    }
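  • DataStream’s writeAsText method creates a TextOutputFormat for the given path, sets its write mode, and passes it to writeUsingOutputFormat, which wraps it in an OutputFormatSinkFunction and adds it to the stream as a sink. As the Javadoc warns, this sink does not participate in Flink’s checkpointing.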
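To see what the wrapping amounts to, here is a minimal, dependency-free sketch of an adapter that maps sink-style open/invoke/close calls onto an output format’s life cycle. It assumes, without reproducing Flink’s code, that OutputFormatSinkFunction configures and opens the format when the sink opens, forwards each element to writeRecord, and closes the format when the sink closes. SimpleOutputFormat, OutputFormatSink and run are hypothetical names, and configure takes a plain String instead of Flink’s Configuration.

```java
import java.util.ArrayList;
import java.util.List;

public class SinkAdapterSketch {

    // Hypothetical mirror of OutputFormat's four life-cycle methods (no Flink dependency).
    interface SimpleOutputFormat<T> {
        void configure(String parameters);
        void open(int taskNumber, int numTasks);
        void writeRecord(T record);
        void close();
    }

    // Hypothetical adapter playing the role of OutputFormatSinkFunction:
    // sink life-cycle calls are forwarded to the wrapped format.
    static final class OutputFormatSink<T> {
        private final SimpleOutputFormat<T> format;

        OutputFormatSink(SimpleOutputFormat<T> format) {
            this.format = format;
        }

        void open(int subtaskIndex, int parallelism) {
            format.configure("");                    // configure before open
            format.open(subtaskIndex, parallelism);  // task number/count come from the subtask
        }

        void invoke(T value) {
            format.writeRecord(value);               // one call per stream element
        }

        void close() {
            format.close();
        }
    }

    // Drives the adapter with a format that only records which methods were called.
    static List<String> run() {
        List<String> calls = new ArrayList<>();
        SimpleOutputFormat<String> recording = new SimpleOutputFormat<String>() {
            public void configure(String parameters) { calls.add("configure"); }
            public void open(int taskNumber, int numTasks) { calls.add("open " + taskNumber + "/" + numTasks); }
            public void writeRecord(String record) { calls.add("write " + record); }
            public void close() { calls.add("close"); }
        };
        OutputFormatSink<String> sink = new OutputFormatSink<>(recording);
        sink.open(0, 1);
        sink.invoke("a");
        sink.invoke("b");
        sink.close();
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [configure, open 0/1, write a, write b, close]
    }
}
```

The format itself never sees the sink; it only observes the sequence configure, open, writeRecord (once per element), close.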

TextOutputFormat

flink-java-1.7.0-sources.jar!/org/apache/flink/api/java/io/TextOutputFormat.java

/**
 * A {@link FileOutputFormat} that writes objects to a text file.
 *
 * <p>Objects are converted to Strings using either {@link Object#toString()} or a {@link TextFormatter}.
 * @param <T> type of elements
 */
@PublicEvolving
public class TextOutputFormat<T> extends FileOutputFormat<T> {

    private static final long serialVersionUID = 1L;

    private static final int NEWLINE = '\n';

    private String charsetName;

    private transient Charset charset;

    // --------------------------------------------------------------------------------------------


    /**
     * Formatter that transforms values into their {@link String} representations.
     * @param <IN> type of input elements
     */
    public interface TextFormatter<IN> extends Serializable {
        String format(IN value);
    }

    public TextOutputFormat(Path outputPath) {
        this(outputPath, "UTF-8");
    }

    public TextOutputFormat(Path outputPath, String charset) {
        super(outputPath);
        this.charsetName = charset;
    }

    public String getCharsetName() {
        return charsetName;
    }

    public void setCharsetName(String charsetName) throws IllegalCharsetNameException, UnsupportedCharsetException {
        if (charsetName == null) {
            throw new NullPointerException();
        }

        if (!Charset.isSupported(charsetName)) {
            throw new UnsupportedCharsetException("The charset " + charsetName + " is not supported.");
        }

        this.charsetName = charsetName;
    }

    // --------------------------------------------------------------------------------------------

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        super.open(taskNumber, numTasks);

        try {
            this.charset = Charset.forName(charsetName);
        }
        catch (IllegalCharsetNameException e) {
            throw new IOException("The charset " + charsetName + " is not valid.", e);
        }
        catch (UnsupportedCharsetException e) {
            throw new IOException("The charset " + charsetName + " is not supported.", e);
        }
    }

    @Override
    public void writeRecord(T record) throws IOException {
        byte[] bytes = record.toString().getBytes(charset);
        this.stream.write(bytes);
        this.stream.write(NEWLINE);
    }

    // --------------------------------------------------------------------------------------------

    @Override
    public String toString() {
        return "TextOutputFormat (" + getOutputFilePath() + ") - " + this.charsetName;
    }
}
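  • TextOutputFormat extends FileOutputFormat. Its open method mostly delegates to FileOutputFormat’s open and then resolves the configured charset (UTF-8 by default). Its writeRecord method converts the record with toString(), encodes it with that charset, writes the bytes directly to the stream, and then writes a newline character (\n).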
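The behaviour of writeRecord can be reproduced with the JDK alone. The sketch below is an illustration, not Flink code: writeRecord and writeAll are hypothetical helpers that apply the same steps (toString(), encode with a Charset, write the bytes, write '\n') to an in-memory stream, which makes it easy to see that every record ends with a single newline byte and that the byte count depends on the charset.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

public class TextLineWriterSketch {

    private static final int NEWLINE = '\n';

    // Same steps as TextOutputFormat.writeRecord: toString(), encode, write, then '\n'.
    static void writeRecord(OutputStream stream, Object record, Charset charset) throws IOException {
        byte[] bytes = record.toString().getBytes(charset);
        stream.write(bytes);
        stream.write(NEWLINE); // write(int) emits only the low-order byte: 0x0A
    }

    // Hypothetical helper: writes all records to an in-memory stream and returns the bytes.
    static byte[] writeAll(List<?> records, Charset charset) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            for (Object record : records) {
                writeRecord(out, record, charset);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] bytes = writeAll(Arrays.asList("h\u00e9llo", 42, 3.5), StandardCharsets.UTF_8);
        System.out.print(new String(bytes, StandardCharsets.UTF_8)); // one record per line
        System.out.println(bytes.length + " bytes");                 // 14 bytes
    }
}
```

Because the newline is written with OutputStream.write(int), the separator is always the single byte 0x0A, whatever charset was configured. With a multi-byte charset such as UTF-16, the records would be encoded in two-byte units while the separator stays one byte, so the result would not be a cleanly encoded text file.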

FileOutputFormat

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/io/FileOutputFormat.java

/**
 * The abstract base class for all Rich output formats that are file based. Contains the logic to
 * open/close the target
 * file streams.
 */
@Public
public abstract class FileOutputFormat<IT> extends RichOutputFormat<IT> implements InitializeOnMaster, CleanupWhenUnsuccessful {
    //......

    /**
     * Initialization of the distributed file system if it is used.
     *
     * @param parallelism The task parallelism.
     */
    @Override
    public void initializeGlobal(int parallelism) throws IOException {
        final Path path = getOutputFilePath();
        final FileSystem fs = path.getFileSystem();
        
        // only distributed file systems can be initialized at start-up time.
        if (fs.isDistributedFS()) {
            
            final WriteMode writeMode = getWriteMode();
            final OutputDirectoryMode outDirMode = getOutputDirectoryMode();

            if (parallelism == 1 && outDirMode == OutputDirectoryMode.PARONLY) {
                // output is not written in parallel and should be written to a single file.
                // prepare distributed output path
                if(!fs.initOutPathDistFS(path, writeMode, false)) {
                    // output preparation failed! Cancel task.
                    throw new IOException("Output path could not be initialized.");
                }

            } else {
                // output should be written to a directory

                // only distributed file systems can be initialized at start-up time.
                if(!fs.initOutPathDistFS(path, writeMode, true)) {
                    throw new IOException("Output directory could not be created.");
                }
            }
        }
    }
    
    @Override
    public void tryCleanupOnError() {
        if (this.fileCreated) {
            this.fileCreated = false;
            
            try {
                close();
            } catch (IOException e) {
                LOG.error("Could not properly close FileOutputFormat.", e);
            }

            try {
                FileSystem.get(this.actualFilePath.toUri()).delete(actualFilePath, false);
            } catch (FileNotFoundException e) {
                // ignore, may not be visible yet or may be already removed
            } catch (Throwable t) {
                LOG.error("Could not remove the incomplete file " + actualFilePath + '.', t);
            }
        }
    }

    @Override
    public void configure(Configuration parameters) {
        // get the output file path, if it was not yet set
        if (this.outputFilePath == null) {
            // get the file parameter
            String filePath = parameters.getString(FILE_PARAMETER_KEY, null);
            if (filePath == null) {
                throw new IllegalArgumentException("The output path has been specified neither via constructor/setters" +
                        ", nor via the Configuration.");
            }
            
            try {
                this.outputFilePath = new Path(filePath);
            }
            catch (RuntimeException rex) {
                throw new RuntimeException("Could not create a valid URI from the given file path name: " + rex.getMessage()); 
            }
        }
        
        // check if have not been set and use the defaults in that case
        if (this.writeMode == null) {
            this.writeMode = DEFAULT_WRITE_MODE;
        }
        
        if (this.outputDirectoryMode == null) {
            this.outputDirectoryMode = DEFAULT_OUTPUT_DIRECTORY_MODE;
        }
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        if (taskNumber < 0 || numTasks < 1) {
            throw new IllegalArgumentException("TaskNumber: " + taskNumber + ", numTasks: " + numTasks);
        }
        
        if (LOG.isDebugEnabled()) {
            LOG.debug("Opening stream for output (" + (taskNumber+1) + "/" + numTasks + "). WriteMode=" + writeMode +
                    ", OutputDirectoryMode=" + outputDirectoryMode);
        }
        
        Path p = this.outputFilePath;
        if (p == null) {
            throw new IOException("The file path is null.");
        }
        
        final FileSystem fs = p.getFileSystem();

        // if this is a local file system, we need to initialize the local output directory here
        if (!fs.isDistributedFS()) {
            
            if (numTasks == 1 && outputDirectoryMode == OutputDirectoryMode.PARONLY) {
                // output should go to a single file
                
                // prepare local output path. checks for write mode and removes existing files in case of OVERWRITE mode
                if(!fs.initOutPathLocalFS(p, writeMode, false)) {
                    // output preparation failed! Cancel task.
                    throw new IOException("Output path '" + p.toString() + "' could not be initialized. Canceling task...");
                }
            }
            else {
                // numTasks > 1 || outDirMode == OutputDirectoryMode.ALWAYS
                
                if(!fs.initOutPathLocalFS(p, writeMode, true)) {
                    // output preparation failed! Cancel task.
                    throw new IOException("Output directory '" + p.toString() + "' could not be created. Canceling task...");
                }
            }
        }



        // Suffix the path with the parallel instance index, if needed
        this.actualFilePath = (numTasks > 1 || outputDirectoryMode == OutputDirectoryMode.ALWAYS) ? p.suffix("/" + getDirectoryFileName(taskNumber)) : p;

        // create output file
        this.stream = fs.create(this.actualFilePath, writeMode);
        
        // at this point, the file creation must have succeeded, or an exception has been thrown
        this.fileCreated = true;
    }

    @Override
    public void close() throws IOException {
        final FSDataOutputStream s = this.stream;
        if (s != null) {
            this.stream = null;
            s.close();
        }
    }
}
  • FileOutputFormat extends RichOutputFormat and implements the InitializeOnMaster interface (initializeGlobal method) and the CleanupWhenUnsuccessful interface (tryCleanupOnError method).
  • initializeGlobal only does work when the output path is on a distributed file system. In that case it prepares the output path once on the master, before the tasks start: as a single file when parallelism is 1 and the directory mode is PARONLY, and as a directory otherwise. tryCleanupOnError, if a file has been created, first closes the stream and then deletes the incomplete file.
  • FileOutputFormat also implements the configure, open and close methods of the OutputFormat interface and leaves writeRecord to subclasses. configure sets outputFilePath (read from the Configuration if it was not set through the constructor or setters), writeMode and outputDirectoryMode (falling back to the defaults when unset). open first prepares the output path or directory if the file system is local, then determines actualFilePath: if numTasks is greater than 1 or the directory mode is ALWAYS, the file is placed inside the outputFilePath directory and named taskNumber + 1; otherwise outputFilePath itself is used. It then creates the stream. close simply closes the stream.
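The file-naming rule in open can be checked in isolation. This is a simplified sketch that works on plain strings instead of Flink’s Path; OutputPathSketch and its methods are hypothetical, and directoryFileName stands in for FileOutputFormat’s getDirectoryFileName, assumed here to return taskNumber + 1 as described above. The writesToDirectory test is the same distinction initializeGlobal makes between preparing a single file and preparing a directory.

```java
public class OutputPathSketch {

    // Local copy of the two directory modes referenced in FileOutputFormat.
    enum OutputDirectoryMode { ALWAYS, PARONLY }

    // Same condition as FileOutputFormat.open: write into a directory when running
    // with more than one task or when the directory mode is ALWAYS.
    static boolean writesToDirectory(int numTasks, OutputDirectoryMode mode) {
        return numTasks > 1 || mode == OutputDirectoryMode.ALWAYS;
    }

    // Stand-in for getDirectoryFileName(taskNumber): the 1-based task index.
    static String directoryFileName(int taskNumber) {
        return Integer.toString(taskNumber + 1);
    }

    // Mirrors p.suffix("/" + getDirectoryFileName(taskNumber)) using string concatenation.
    static String actualFilePath(String outputPath, int taskNumber, int numTasks, OutputDirectoryMode mode) {
        return writesToDirectory(numTasks, mode)
                ? outputPath + "/" + directoryFileName(taskNumber)
                : outputPath;
    }

    public static void main(String[] args) {
        for (int task = 0; task < 3; task++) {
            System.out.println(actualFilePath("/tmp/out", task, 3, OutputDirectoryMode.PARONLY));
        }
        System.out.println(actualFilePath("/tmp/out", 0, 1, OutputDirectoryMode.PARONLY));
        System.out.println(actualFilePath("/tmp/out", 0, 1, OutputDirectoryMode.ALWAYS));
    }
}
```

With parallelism 3 the tasks write /tmp/out/1, /tmp/out/2 and /tmp/out/3; with parallelism 1 the output is the file /tmp/out under PARONLY, but /tmp/out/1 under ALWAYS.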

RichOutputFormat

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/io/RichOutputFormat.java

/**
 * An abstract stub implementation for Rich output formats.
 * Rich formats have access to their runtime execution context via {@link #getRuntimeContext()}.
 */
@Public
public abstract class RichOutputFormat<IT> implements OutputFormat<IT> {
    
    private static final long serialVersionUID = 1L;
    
    // --------------------------------------------------------------------------------------------
    //  Runtime context access
    // --------------------------------------------------------------------------------------------
    
    private transient RuntimeContext runtimeContext;

    public void setRuntimeContext(RuntimeContext t) {
        this.runtimeContext = t;
    }
    
    public RuntimeContext getRuntimeContext() {
        if (this.runtimeContext != null) {
            return this.runtimeContext;
        } else {
            throw new IllegalStateException("The runtime context has not been initialized yet. Try accessing " +
                    "it in one of the other life cycle methods.");
        }
    }
}
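  • RichOutputFormat is an abstract class that implements the OutputFormat interface. Its main addition is the RuntimeContext property, exposed through setRuntimeContext and getRuntimeContext (which throws an IllegalStateException if the context has not been set yet).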

OutputFormat

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/io/OutputFormat.java

/**
 * The base interface for outputs that consumes records. The output format
 * describes how to store the final records, for example in a file.
 * <p>
 * The life cycle of an output format is the following:
 * <ol>
 *   <li>configure() is invoked a single time. The method can be used to implement initialization from
 *       the parameters (configuration) that may be attached upon instantiation.</li>
 *   <li>Each parallel output task creates an instance, configures it and opens it.</li>
 *   <li>All records of its parallel instance are handed to the output format.</li>
 *   <li>The output format is closed</li>
 * </ol>
 * 
 * @param <IT> The type of the consumed records. 
 */
@Public
public interface OutputFormat<IT> extends Serializable {
    
    /**
     * Configures this output format. Since output formats are instantiated generically and hence parameterless, 
     * this method is the place where the output formats set their basic fields based on configuration values.
     * <p>
     * This method is always called first on a newly instantiated output format. 
     *  
     * @param parameters The configuration with all parameters.
     */
    void configure(Configuration parameters);
    
    /**
     * Opens a parallel instance of the output format to store the result of its parallel instance.
     * <p>
     * When this method is called, the output format it guaranteed to be configured.
     * 
     * @param taskNumber The number of the parallel instance.
     * @param numTasks The number of parallel tasks.
     * @throws IOException Thrown, if the output could not be opened due to an I/O problem.
     */
    void open(int taskNumber, int numTasks) throws IOException;
    
    
    /**
     * Adds a record to the output.
     * <p>
     * When this method is called, the output format it guaranteed to be opened.
     * 
     * @param record The records to add to the output.
     * @throws IOException Thrown, if the records could not be added to to an I/O problem.
     */
    void writeRecord(IT record) throws IOException;
    
    /**
     * Method that marks the end of the life-cycle of parallel output instance. Should be used to close
     * channels and streams and release resources.
     * After this method returns without an error, the output is assumed to be correct.
     * <p>
     * When this method is called, the output format it guaranteed to be opened.
     *  
     * @throws IOException Thrown, if the input could not be closed properly.
     */
    void close() throws IOException;
}
  • The OutputFormat interface defines the configure, open, writeRecord and close methods, which are called in that order over the life cycle of each parallel instance.
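To make the contract concrete, here is a hypothetical in-memory format that follows the documented life cycle: configure first, then open for one parallel instance, then writeRecord for each record, then close. It is a sketch against a local SimpleOutputFormat interface rather than Flink’s own (configure receives a Map instead of a Configuration), and it throws if writeRecord is called before open, mirroring the guarantee stated in the Javadoc. CollectingFormat, runTask and writeBeforeOpenFails are illustrative names only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class LifecycleSketch {

    // Hypothetical stand-in for OutputFormat<IT>; Flink passes a Configuration to configure().
    interface SimpleOutputFormat<IT> {
        void configure(Map<String, String> parameters);
        void open(int taskNumber, int numTasks);
        void writeRecord(IT record);
        void close();
    }

    // Buffers formatted records in memory and enforces the documented call order.
    static final class CollectingFormat<IT> implements SimpleOutputFormat<IT> {
        private String prefix;                      // set by configure()
        private String label;                       // set by open(), per parallel instance
        private final List<String> lines = new ArrayList<>();

        @Override
        public void configure(Map<String, String> parameters) {
            prefix = parameters.getOrDefault("prefix", "task-");
        }

        @Override
        public void open(int taskNumber, int numTasks) {
            if (prefix == null) {
                throw new IllegalStateException("configure() must be called before open()");
            }
            label = prefix + (taskNumber + 1) + ": ";
        }

        @Override
        public void writeRecord(IT record) {
            if (label == null) {
                throw new IllegalStateException("open() must be called before writeRecord()");
            }
            lines.add(label + record);
        }

        @Override
        public void close() {
            label = null;                           // further writes fail until reopened
        }

        List<String> lines() {
            return Collections.unmodifiableList(lines);
        }
    }

    // Simulates one parallel task: a fresh instance that is configured, opened, fed and closed.
    static List<String> runTask(int taskNumber, int numTasks, List<?> records) {
        CollectingFormat<Object> format = new CollectingFormat<>();
        format.configure(Collections.singletonMap("prefix", "task-"));
        format.open(taskNumber, numTasks);
        for (Object record : records) {
            format.writeRecord(record);
        }
        format.close();
        return format.lines();
    }

    // Returns true if writing without open() is rejected.
    static boolean writeBeforeOpenFails() {
        CollectingFormat<String> format = new CollectingFormat<>();
        format.configure(Collections.emptyMap());
        try {
            format.writeRecord("x");
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(runTask(0, 2, Arrays.asList("a", "b"))); // [task-1: a, task-1: b]
        System.out.println(runTask(1, 2, Arrays.asList("c")));      // [task-2: c]
        System.out.println(writeBeforeOpenFails());                 // true
    }
}
```

Each simulated task gets its own instance, just as each parallel output task creates, configures and opens its own output format.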

Summary

  • DataStream’s writeAsText method creates a TextOutputFormat, which writeUsingOutputFormat then wraps as a sink function via OutputFormatSinkFunction.
  • TextOutputFormat extends FileOutputFormat. Its open method mainly delegates to FileOutputFormat’s open, while its writeRecord method writes each record directly to the stream, followed by a newline (\n).
  • FileOutputFormat extends RichOutputFormat and implements the InitializeOnMaster (initializeGlobal) and CleanupWhenUnsuccessful (tryCleanupOnError) interfaces, as well as the configure, open and close methods of OutputFormat; writeRecord is left to subclasses.
  • FileOutputFormat’s open method determines actualFilePath (when numTasks is greater than 1 or the directory mode is ALWAYS, this is a file inside the configured outputFilePath directory, named taskNumber + 1) and then creates the stream.
  • RichOutputFormat implements the OutputFormat interface and mainly adds the RuntimeContext property; the OutputFormat interface itself defines the configure, open, writeRecord and close methods.

doc