A Look at Flink’s FileSystem


Preface

This article takes a look at Flink’s FileSystem.

FileSystem

flink-1.7.2/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java

@Public
public abstract class FileSystem {

    /**
     * The possible write modes. The write mode decides what happens if a file should be created,
     * but already exists.
     */
    public enum WriteMode {

        /** Creates the target file only if no file exists at that path already.
         * Does not overwrite existing files and directories. */
        NO_OVERWRITE,

        /** Creates a new target file regardless of any existing files or directories.
         * Existing files and directories will be deleted (recursively) automatically before
         * creating the new file. */
        OVERWRITE
    }

    // ------------------------------------------------------------------------

    /** Logger for all FileSystem work. */
    private static final Logger LOG = LoggerFactory.getLogger(FileSystem.class);

    /** This lock guards the methods {@link #initOutPathLocalFS(Path, WriteMode, boolean)} and
     * {@link #initOutPathDistFS(Path, WriteMode, boolean)} which are otherwise susceptible to races. */
    private static final ReentrantLock OUTPUT_DIRECTORY_INIT_LOCK = new ReentrantLock(true);

    /** Object used to protect calls to specific methods.*/
    private static final ReentrantLock LOCK = new ReentrantLock(true);

    /** Cache for file systems, by scheme + authority. */
    private static final HashMap<FSKey, FileSystem> CACHE = new HashMap<>();

    /** All available file system factories. */
    private static final List<FileSystemFactory> RAW_FACTORIES = loadFileSystems();

    /** Mapping of file system schemes to the corresponding factories,
     * populated in {@link FileSystem#initialize(Configuration)}. */
    private static final HashMap<String, FileSystemFactory> FS_FACTORIES = new HashMap<>();

    /** The default factory that is used when no scheme matches. */
    private static final FileSystemFactory FALLBACK_FACTORY = loadHadoopFsFactory();

    /** The default filesystem scheme to be used, configured during process-wide initialization.
     * This value defaults to the local file systems scheme {@code 'file:///'} or {@code 'file:/'}. */
    private static URI defaultScheme;

    //......

    // ------------------------------------------------------------------------
    //  Initialization
    // ------------------------------------------------------------------------

    /**
     * Initializes the shared file system settings.
     *
     * <p>The given configuration is passed to each file system factory to initialize the respective
     * file systems. Because the configuration of file systems may be different subsequent to the call
     * of this method, this method clears the file system instance cache.
     *
     * <p>This method also reads the default file system URI from the configuration key
     * {@link CoreOptions#DEFAULT_FILESYSTEM_SCHEME}. All calls to {@link FileSystem#get(URI)} where
     * the URI has no scheme will be interpreted as relative to that URI.
     * As an example, assume the default file system URI is set to {@code 'hdfs://localhost:9000/'}.
     * A file path of {@code '/user/USERNAME/in.txt'} is interpreted as
     * {@code 'hdfs://localhost:9000/user/USERNAME/in.txt'}.
     *
     * @param config the configuration from where to fetch the parameter.
     */
    public static void initialize(Configuration config) throws IOException, IllegalConfigurationException {
        LOCK.lock();
        try {
            // make sure file systems are re-instantiated after re-configuration
            CACHE.clear();
            FS_FACTORIES.clear();

            // configure all file system factories
            for (FileSystemFactory factory : RAW_FACTORIES) {
                factory.configure(config);
                String scheme = factory.getScheme();

                FileSystemFactory fsf = ConnectionLimitingFactory.decorateIfLimited(factory, scheme, config);
                FS_FACTORIES.put(scheme, fsf);
            }

            // configure the default (fallback) factory
            FALLBACK_FACTORY.configure(config);

            // also read the default file system scheme
            final String stringifiedUri = config.getString(CoreOptions.DEFAULT_FILESYSTEM_SCHEME, null);
            if (stringifiedUri == null) {
                defaultScheme = null;
            }
            else {
                try {
                    defaultScheme = new URI(stringifiedUri);
                }
                catch (URISyntaxException e) {
                    throw new IllegalConfigurationException("The default file system scheme ('" +
                            CoreOptions.DEFAULT_FILESYSTEM_SCHEME + "') is invalid: " + stringifiedUri, e);
                }
            }
        }
        finally {
            LOCK.unlock();
        }
    }

    // ------------------------------------------------------------------------
    //  Obtaining File System Instances
    // ------------------------------------------------------------------------

    public static FileSystem getLocalFileSystem() {
        return FileSystemSafetyNet.wrapWithSafetyNetWhenActivated(LocalFileSystem.getSharedInstance());
    }

    public static FileSystem get(URI uri) throws IOException {
        return FileSystemSafetyNet.wrapWithSafetyNetWhenActivated(getUnguardedFileSystem(uri));
    }

    @Internal
    public static FileSystem getUnguardedFileSystem(final URI fsUri) throws IOException {
        checkNotNull(fsUri, "file system URI");

        LOCK.lock();
        try {
            final URI uri;

            if (fsUri.getScheme() != null) {
                uri = fsUri;
            }
            else {
                // Apply the default fs scheme
                final URI defaultUri = getDefaultFsUri();
                URI rewrittenUri = null;

                try {
                    rewrittenUri = new URI(defaultUri.getScheme(), null, defaultUri.getHost(),
                            defaultUri.getPort(), fsUri.getPath(), null, null);
                }
                catch (URISyntaxException e) {
                    // for local URIs, we make one more try to repair the path by making it absolute
                    if (defaultUri.getScheme().equals("file")) {
                        try {
                            rewrittenUri = new URI(
                                    "file", null,
                                    new Path(new File(fsUri.getPath()).getAbsolutePath()).toUri().getPath(),
                                    null);
                        } catch (URISyntaxException ignored) {
                            // could not help it...
                        }
                    }
                }

                if (rewrittenUri != null) {
                    uri = rewrittenUri;
                }
                else {
                    throw new IOException("The file system URI '" + fsUri +
                            "' declares no scheme and cannot be interpreted relative to the default file system URI ("
                            + defaultUri + ").");
                }
            }

            // print a helpful pointer for malformed local URIs (happens a lot to new users)
            if (uri.getScheme().equals("file") && uri.getAuthority() != null && !uri.getAuthority().isEmpty()) {
                String supposedUri = "file:///" + uri.getAuthority() + uri.getPath();

                throw new IOException("Found local file path with authority '" + uri.getAuthority() + "' in path '"
                        + uri.toString() + "'. Hint: Did you forget a slash? (correct path would be '" + supposedUri + "')");
            }

            final FSKey key = new FSKey(uri.getScheme(), uri.getAuthority());

            // See if there is a file system object in the cache
            {
                FileSystem cached = CACHE.get(key);
                if (cached != null) {
                    return cached;
                }
            }

            // this "default" initialization makes sure that the FileSystem class works
            // even when not configured with an explicit Flink configuration, like on
            // JobManager or TaskManager setup
            if (FS_FACTORIES.isEmpty()) {
                initialize(new Configuration());
            }

            // Try to create a new file system
            final FileSystem fs;
            final FileSystemFactory factory = FS_FACTORIES.get(uri.getScheme());

            if (factory != null) {
                fs = factory.create(uri);
            }
            else {
                try {
                    fs = FALLBACK_FACTORY.create(uri);
                }
                catch (UnsupportedFileSystemSchemeException e) {
                    throw new UnsupportedFileSystemSchemeException(
                            "Could not find a file system implementation for scheme '" + uri.getScheme() +
                                    "'. The scheme is not directly supported by Flink and no Hadoop file " +
                                    "system to support this scheme could be loaded.", e);
                }
            }

            CACHE.put(key, fs);
            return fs;
        }
        finally {
            LOCK.unlock();
        }
    }

    public static URI getDefaultFsUri() {
        return defaultScheme != null ? defaultScheme : LocalFileSystem.getLocalFsURI();
    }

    // ------------------------------------------------------------------------
    //  File System Methods
    // ------------------------------------------------------------------------

    public abstract Path getWorkingDirectory();

    public abstract Path getHomeDirectory();

    public abstract URI getUri();

    public abstract FileStatus getFileStatus(Path f) throws IOException;

    public abstract BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException;

    public abstract FSDataInputStream open(Path f, int bufferSize) throws IOException;

    public abstract FSDataInputStream open(Path f) throws IOException;

    public RecoverableWriter createRecoverableWriter() throws IOException {
        throw new UnsupportedOperationException("This file system does not support recoverable writers.");
    }

    public abstract FileStatus[] listStatus(Path f) throws IOException;

    public boolean exists(final Path f) throws IOException {
        try {
            return (getFileStatus(f) != null);
        } catch (FileNotFoundException e) {
            return false;
        }
    }

    public abstract boolean delete(Path f, boolean recursive) throws IOException;

    public abstract boolean mkdirs(Path f) throws IOException;


    public abstract FSDataOutputStream create(Path f, WriteMode overwriteMode) throws IOException;

    public abstract boolean rename(Path src, Path dst) throws IOException;

    public abstract boolean isDistributedFS();

    public abstract FileSystemKind getKind();

    // ------------------------------------------------------------------------
    //  output directory initialization
    // ------------------------------------------------------------------------

    public boolean initOutPathLocalFS(Path outPath, WriteMode writeMode, boolean createDirectory) throws IOException {
        if (isDistributedFS()) {
            return false;
        }

        // NOTE: We actually need to lock here (process wide). Otherwise, multiple threads that
        // concurrently work in this method (multiple output formats writing locally) might end
        // up deleting each other's directories and leave non-retrievable files, without necessarily
        // causing an exception. That results in very subtle issues, like output files looking as if
        // they are not getting created.

        // we acquire the lock interruptibly here, to make sure that concurrent threads waiting
        // here can cancel faster
        try {
            OUTPUT_DIRECTORY_INIT_LOCK.lockInterruptibly();
        }
        catch (InterruptedException e) {
            // restore the interruption state
            Thread.currentThread().interrupt();

            // leave the method - we don't have the lock anyways
            throw new IOException("The thread was interrupted while trying to initialize the output directory");
        }

        try {
            FileStatus status;
            try {
                status = getFileStatus(outPath);
            }
            catch (FileNotFoundException e) {
                // okay, the file is not there
                status = null;
            }

            // check if path exists
            if (status != null) {
                // path exists, check write mode
                switch (writeMode) {

                case NO_OVERWRITE:
                    if (status.isDir() && createDirectory) {
                        return true;
                    } else {
                        // file may not be overwritten
                        throw new IOException("File or directory " + outPath + " already exists. Existing files and directories " +
                                "are not overwritten in " + WriteMode.NO_OVERWRITE.name() + " mode. Use " +
                                WriteMode.OVERWRITE.name() + " mode to overwrite existing files and directories.");
                    }

                case OVERWRITE:
                    if (status.isDir()) {
                        if (createDirectory) {
                            // directory exists and does not need to be created
                            return true;
                        } else {
                            // we will write in a single file, delete directory
                            try {
                                delete(outPath, true);
                            }
                            catch (IOException e) {
                                throw new IOException("Could not remove existing directory '" + outPath +
                                        "' to allow overwrite by result file", e);
                            }
                        }
                    }
                    else {
                        // delete file
                        try {
                            delete(outPath, false);
                        }
                        catch (IOException e) {
                            throw new IOException("Could not remove existing file '" + outPath +
                                    "' to allow overwrite by result file/directory", e);
                        }
                    }
                    break;

                default:
                    throw new IllegalArgumentException("Invalid write mode: " + writeMode);
                }
            }

            if (createDirectory) {
                // Output directory needs to be created
                if (!exists(outPath)) {
                    mkdirs(outPath);
                }

                // double check that the output directory exists
                try {
                    return getFileStatus(outPath).isDir();
                }
                catch (FileNotFoundException e) {
                    return false;
                }
            }
            else {
                // check that the output path does not exist and an output file
                // can be created by the output format.
                return !exists(outPath);
            }
        }
        finally {
            OUTPUT_DIRECTORY_INIT_LOCK.unlock();
        }
    }

    public boolean initOutPathDistFS(Path outPath, WriteMode writeMode, boolean createDirectory) throws IOException {
        if (!isDistributedFS()) {
            return false;
        }

        // NOTE: We actually need to lock here (process wide). Otherwise, multiple threads that
        // concurrently work in this method (multiple output formats writing locally) might end
        // up deleting each other's directories and leave non-retrievable files, without necessarily
        // causing an exception. That results in very subtle issues, like output files looking as if
        // they are not getting created.

        // we acquire the lock interruptibly here, to make sure that concurrent threads waiting
        // here can cancel faster
        try {
            OUTPUT_DIRECTORY_INIT_LOCK.lockInterruptibly();
        }
        catch (InterruptedException e) {
            // restore the interruption state
            Thread.currentThread().interrupt();

            // leave the method - we don't have the lock anyways
            throw new IOException("The thread was interrupted while trying to initialize the output directory");
        }

        try {
            // check if path exists
            if (exists(outPath)) {
                // path exists, check write mode
                switch(writeMode) {

                case NO_OVERWRITE:
                    // file or directory may not be overwritten
                    throw new IOException("File or directory already exists. Existing files and directories are not overwritten in " +
                            WriteMode.NO_OVERWRITE.name() + " mode. Use " + WriteMode.OVERWRITE.name() +
                                " mode to overwrite existing files and directories.");

                case OVERWRITE:
                    // output path exists. We delete it and all contained files in case of a directory.
                    try {
                        delete(outPath, true);
                    } catch (IOException e) {
                        // Some other thread might already have deleted the path.
                        // If - for some other reason - the path could not be deleted,
                        // this will be handled later.
                    }
                    break;

                default:
                    throw new IllegalArgumentException("Invalid write mode: " + writeMode);
                }
            }

            if (createDirectory) {
                // Output directory needs to be created
                try {
                    if (!exists(outPath)) {
                        mkdirs(outPath);
                    }
                } catch (IOException ioe) {
                    // Some other thread might already have created the directory.
                    // If - for some other reason - the directory could not be created
                    // and the path does not exist, this will be handled later.
                }

                // double check that the output directory exists
                return exists(outPath) && getFileStatus(outPath).isDir();
            }
            else {
                // single file case: check that the output path does not exist and
                // an output file can be created by the output format.
                return !exists(outPath);
            }
        }
        finally {
            OUTPUT_DIRECTORY_INIT_LOCK.unlock();
        }
    }

    //......
}
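The write-mode handling in initOutPathLocalFS is spread across a nested switch. If you only care about the branch taken when the output path already exists, that decision can be modeled as a pure function. This is a minimal sketch under that assumption. OutPathDecision, its copy of WriteMode and the Action enum are hypothetical names, not Flink API. The sketch leaves out the process-wide OUTPUT_DIRECTORY_INIT_LOCK, the actual delete calls and the final mkdirs/exists double check.

```java
/**
 * Hypothetical model of the branch FileSystem#initOutPathLocalFS takes when the
 * output path already exists. Names below are illustrative, not Flink API.
 */
public class OutPathDecision {

    public enum WriteMode { NO_OVERWRITE, OVERWRITE }

    /** What happens to an existing path before the output format writes. */
    public enum Action { NOTHING_EXISTS, KEEP_EXISTING_DIR, FAIL, DELETE_DIR_RECURSIVELY, DELETE_FILE }

    public static Action decide(boolean exists, boolean isDir, WriteMode mode, boolean createDirectory) {
        if (!exists) {
            // nothing to clean up; the directory (if requested) is created afterwards
            return Action.NOTHING_EXISTS;
        }
        switch (mode) {
            case NO_OVERWRITE:
                // an existing directory is only acceptable when a directory is wanted anyway
                return (isDir && createDirectory) ? Action.KEEP_EXISTING_DIR : Action.FAIL;
            case OVERWRITE:
                if (isDir) {
                    // reuse the directory, or delete it recursively to make room for a single file
                    return createDirectory ? Action.KEEP_EXISTING_DIR : Action.DELETE_DIR_RECURSIVELY;
                }
                // an existing regular file is always deleted (non-recursively)
                return Action.DELETE_FILE;
            default:
                throw new IllegalArgumentException("Invalid write mode: " + mode);
        }
    }

    public static void main(String[] args) {
        System.out.println(decide(true, true, WriteMode.NO_OVERWRITE, true));  // KEEP_EXISTING_DIR
        System.out.println(decide(true, false, WriteMode.NO_OVERWRITE, true)); // FAIL
        System.out.println(decide(true, true, WriteMode.OVERWRITE, false));    // DELETE_DIR_RECURSIVELY
        System.out.println(decide(false, false, WriteMode.OVERWRITE, true));   // NOTHING_EXISTS
    }
}
```

Separating the decision from the I/O makes two rules easier to see. NO_OVERWRITE still accepts an existing directory when a directory is requested. OVERWRITE deletes an existing directory only when a single output file is expected.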
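  • FileSystem is the abstract base class for every file system Flink uses. Subclasses implement either a local file system or a distributed one.
  • FileSystem declares the abstract methods getWorkingDirectory, getHomeDirectory, getUri, getFileStatus, getFileBlockLocations, open, listStatus, delete, mkdirs, create, rename, isDistributedFS and getKind, which every subclass must implement.
    ◦ exists is built on getFileStatus and has a default implementation that subclasses may override.
    ◦ createRecoverableWriter also has a default implementation, which throws UnsupportedOperationException.
  • FileSystem provides the concrete instance methods initOutPathLocalFS and initOutPathDistFS. Each prepares an output path according to the given WriteMode while holding the process-wide OUTPUT_DIRECTORY_INIT_LOCK, and returns false right away if the file system is not of the matching kind.
  • FileSystem also provides the static methods initialize, getLocalFileSystem, get, getUnguardedFileSystem and getDefaultFsUri:
    ◦ initialize configures every factory and reads the default scheme from CoreOptions.DEFAULT_FILESYSTEM_SCHEME.
    ◦ getUnguardedFileSystem resolves scheme-less URIs against getDefaultFsUri and caches one instance per scheme and authority. For schemes without a dedicated factory, it falls back to the Hadoop factory.
    ◦ get wraps the result of getUnguardedFileSystem with FileSystemSafetyNet.wrapWithSafetyNetWhenActivated.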
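The scheme handling in getUnguardedFileSystem can be reproduced with java.net.URI alone. The sketch below is a simplified, Flink-free illustration, and it differs from Flink in several ways:

- FsResolutionSketch is a hypothetical class.
- It throws IllegalArgumentException where Flink throws IOException.
- It uses java.io.File instead of Flink's Path for the local fallback.
- A plain string key stands in for FSKey.

It shows three things: how a scheme-less URI borrows the scheme, host and port of the default file system URI; when the "did you forget a slash" hint fires; and how instances are shared per scheme and authority.

```java
import java.io.File;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical, Flink-free sketch of how FileSystem resolves and caches instances. */
public class FsResolutionSketch {

    /** Mirrors the URI handling in getUnguardedFileSystem, but throws unchecked exceptions instead of IOException. */
    public static URI resolve(URI fsUri, URI defaultUri) {
        URI uri = fsUri;
        if (fsUri.getScheme() == null) {
            try {
                // keep scheme, host and port of the default URI; take only the path from the input
                uri = new URI(defaultUri.getScheme(), null, defaultUri.getHost(),
                        defaultUri.getPort(), fsUri.getPath(), null, null);
            } catch (URISyntaxException e) {
                // a relative path is rejected next to a scheme; for the local FS, retry with an absolute path
                if (!"file".equals(defaultUri.getScheme())) {
                    throw new IllegalArgumentException("Cannot resolve " + fsUri + " against " + defaultUri, e);
                }
                uri = new File(fsUri.getPath()).getAbsoluteFile().toURI();
            }
        }
        // "file://tmp/x" parses "tmp" as the authority, a common typo for "file:///tmp/x"
        if ("file".equals(uri.getScheme()) && uri.getAuthority() != null && !uri.getAuthority().isEmpty()) {
            throw new IllegalArgumentException(
                    "Did you forget a slash? Correct path would be file:///" + uri.getAuthority() + uri.getPath());
        }
        return uri;
    }

    /** One instance per scheme + authority, analogous to the static CACHE keyed by FSKey. */
    private final Map<String, Object> cache = new HashMap<>();

    public synchronized Object get(URI uri, Function<URI, Object> factory) {
        String key = uri.getScheme() + "|" + uri.getAuthority();
        return cache.computeIfAbsent(key, k -> factory.apply(uri));
    }

    public static void main(String[] args) {
        URI hdfs = URI.create("hdfs://localhost:9000/");
        // prints hdfs://localhost:9000/user/USERNAME/in.txt
        System.out.println(resolve(URI.create("/user/USERNAME/in.txt"), hdfs));
    }
}
```

Because the cache key ignores the path, "hdfs://nn:9000/a" and "hdfs://nn:9000/b" share one instance. A different NameNode authority gets its own instance.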

LocalFileSystem

flink-1.7.2/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java

@Internal
public class LocalFileSystem extends FileSystem {

    private static final Logger LOG = LoggerFactory.getLogger(LocalFileSystem.class);

    /** The URI representing the local file system. */
    private static final URI LOCAL_URI = OperatingSystem.isWindows() ? URI.create("file:/") : URI.create("file:///");

    /** The shared instance of the local file system. */
    private static final LocalFileSystem INSTANCE = new LocalFileSystem();

    /** Path pointing to the current working directory.
     * Because Paths are not immutable, we cannot cache the proper path here */
    private final URI workingDir;

    /** Path pointing to the current working directory.
     * Because Paths are not immutable, we cannot cache the proper path here. */
    private final URI homeDir;

    /** The host name of this machine. */
    private final String hostName;

    /**
     * Constructs a new <code>LocalFileSystem</code> object.
     */
    public LocalFileSystem() {
        this.workingDir = new File(System.getProperty("user.dir")).toURI();
        this.homeDir = new File(System.getProperty("user.home")).toURI();

        String tmp = "unknownHost";
        try {
            tmp = InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            LOG.error("Could not resolve local host", e);
        }
        this.hostName = tmp;
    }

    // ------------------------------------------------------------------------

    @Override
    public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException {
        return new BlockLocation[] {
                new LocalBlockLocation(hostName, file.getLen())
        };
    }

    @Override
    public FileStatus getFileStatus(Path f) throws IOException {
        final File path = pathToFile(f);
        if (path.exists()) {
            return new LocalFileStatus(path, this);
        }
        else {
            throw new FileNotFoundException("File " + f + " does not exist or the user running "
                    + "Flink ('" + System.getProperty("user.name") + "') has insufficient permissions to access it.");
        }
    }

    @Override
    public URI getUri() {
        return LOCAL_URI;
    }

    @Override
    public Path getWorkingDirectory() {
        return new Path(workingDir);
    }

    @Override
    public Path getHomeDirectory() {
        return new Path(homeDir);
    }

    @Override
    public FSDataInputStream open(final Path f, final int bufferSize) throws IOException {
        return open(f);
    }

    @Override
    public FSDataInputStream open(final Path f) throws IOException {
        final File file = pathToFile(f);
        return new LocalDataInputStream(file);
    }

    @Override
    public LocalRecoverableWriter createRecoverableWriter() throws IOException {
        return new LocalRecoverableWriter(this);
    }

    @Override
    public boolean exists(Path f) throws IOException {
        final File path = pathToFile(f);
        return path.exists();
    }

    @Override
    public FileStatus[] listStatus(final Path f) throws IOException {

        final File localf = pathToFile(f);
        FileStatus[] results;

        if (!localf.exists()) {
            return null;
        }
        if (localf.isFile()) {
            return new FileStatus[] { new LocalFileStatus(localf, this) };
        }

        final String[] names = localf.list();
        if (names == null) {
            return null;
        }
        results = new FileStatus[names.length];
        for (int i = 0; i < names.length; i++) {
            results[i] = getFileStatus(new Path(f, names[i]));
        }

        return results;
    }

    @Override
    public boolean delete(final Path f, final boolean recursive) throws IOException {

        final File file = pathToFile(f);
        if (file.isFile()) {
            return file.delete();
        } else if ((!recursive) && file.isDirectory()) {
            File[] containedFiles = file.listFiles();
            if (containedFiles == null) {
                throw new IOException("Directory " + file.toString() + " does not exist or an I/O error occurred");
            } else if (containedFiles.length != 0) {
                throw new IOException("Directory " + file.toString() + " is not empty");
            }
        }

        return delete(file);
    }

    /**
     * Deletes the given file or directory.
     *
     * @param f
     *        the file to be deleted
     * @return <code>true</code> if all files were deleted successfully, <code>false</code> otherwise
     * @throws IOException
     *         thrown if an error occurred while deleting the files/directories
     */
    private boolean delete(final File f) throws IOException {

        if (f.isDirectory()) {
            final File[] files = f.listFiles();
            if (files != null) {
                for (File file : files) {
                    final boolean del = delete(file);
                    if (!del) {
                        return false;
                    }
                }
            }
        } else {
            return f.delete();
        }

        // Now directory is empty
        return f.delete();
    }

    /**
     * Recursively creates the directory specified by the provided path.
     *
     * @return <code>true</code>if the directories either already existed or have been created successfully,
     *         <code>false</code> otherwise
     * @throws IOException
     *         thrown if an error occurred while creating the directory/directories
     */
    @Override
    public boolean mkdirs(final Path f) throws IOException {
        checkNotNull(f, "path is null");
        return mkdirsInternal(pathToFile(f));
    }

    private boolean mkdirsInternal(File file) throws IOException {
        if (file.isDirectory()) {
                return true;
        }
        else if (file.exists() && !file.isDirectory()) {
            // Important: The 'exists()' check above must come before the 'isDirectory()' check to
            //            be safe when multiple parallel instances try to create the directory

            // exists and is not a directory -> is a regular file
            throw new FileAlreadyExistsException(file.getAbsolutePath());
        }
        else {
            File parent = file.getParentFile();
            return (parent == null || mkdirsInternal(parent)) && (file.mkdir() || file.isDirectory());
        }
    }

    @Override
    public FSDataOutputStream create(final Path filePath, final WriteMode overwrite) throws IOException {
        checkNotNull(filePath, "filePath");

        if (exists(filePath) && overwrite == WriteMode.NO_OVERWRITE) {
            throw new FileAlreadyExistsException("File already exists: " + filePath);
        }

        final Path parent = filePath.getParent();
        if (parent != null && !mkdirs(parent)) {
            throw new IOException("Mkdirs failed to create " + parent);
        }

        final File file = pathToFile(filePath);
        return new LocalDataOutputStream(file);
    }

    @Override
    public boolean rename(final Path src, final Path dst) throws IOException {
        final File srcFile = pathToFile(src);
        final File dstFile = pathToFile(dst);

        final File dstParent = dstFile.getParentFile();

        // Files.move fails if the destination directory doesn't exist
        //noinspection ResultOfMethodCallIgnored -- we don't care if the directory existed or was created
        dstParent.mkdirs();

        try {
            Files.move(srcFile.toPath(), dstFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
            return true;
        }
        catch (NoSuchFileException | AccessDeniedException | DirectoryNotEmptyException | SecurityException ex) {
            // catch the errors that are regular "move failed" exceptions and return false
            return false;
        }
    }

    @Override
    public boolean isDistributedFS() {
        return false;
    }

    @Override
    public FileSystemKind getKind() {
        return FileSystemKind.FILE_SYSTEM;
    }

    // ------------------------------------------------------------------------

    /**
     * Converts the given Path to a File for this file system.
     *
     * <p>If the path is not absolute, it is interpreted relative to this FileSystem's working directory.
     */
    public File pathToFile(Path path) {
        if (!path.isAbsolute()) {
            path = new Path(getWorkingDirectory(), path);
        }
        return new File(path.toUri().getPath());
    }

    // ------------------------------------------------------------------------

    /**
     * Gets the URI that represents the local file system.
     * That URI is {@code "file:/"} on Windows platforms and {@code "file:///"} on other
     * UNIX family platforms.
     *
     * @return The URI that represents the local file system.
     */
    public static URI getLocalFsURI() {
        return LOCAL_URI;
    }

    /**
     * Gets the shared instance of this file system.
     *
     * @return The shared instance of this file system.
     */
    public static LocalFileSystem getSharedInstance() {
        return INSTANCE;
    }
}
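  • LocalFileSystem extends FileSystem and operates directly on the local disk through java.io.File and java.nio.file.Files.
    ◦ pathToFile resolves relative paths against the working directory (the user.dir system property).
    ◦ getFileBlockLocations returns a single LocalBlockLocation on the local host that covers the whole file.
    ◦ isDistributedFS returns false, and getKind returns FileSystemKind.FILE_SYSTEM.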
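LocalFileSystem's delete and mkdirs follow a few rules that are easy to miss:

- A non-recursive delete of a non-empty directory fails with an IOException.
- Recursive deletion is depth-first.
- mkdirs creates missing parents and tolerates a directory created concurrently by another instance.
- mkdirs throws FileAlreadyExistsException when a regular file is in the way.

The following is a minimal sketch of those rules on plain java.io.File. LocalFsSketch is a hypothetical name, and the code takes File arguments rather than Flink's Path.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;

/** Hypothetical sketch of LocalFileSystem's delete/mkdirs semantics on plain java.io.File (no Flink types). */
public class LocalFsSketch {

    /** Like LocalFileSystem#delete(Path, boolean): a non-empty directory is only removed when recursive is true. */
    public static boolean delete(File file, boolean recursive) throws IOException {
        if (file.isFile()) {
            return file.delete();
        }
        if (!recursive && file.isDirectory()) {
            File[] contained = file.listFiles();
            if (contained == null) {
                throw new IOException("Directory " + file + " does not exist or an I/O error occurred");
            }
            if (contained.length != 0) {
                throw new IOException("Directory " + file + " is not empty");
            }
        }
        return deleteRecursively(file);
    }

    /** Depth-first removal: children first, then the now-empty directory; stops at the first failure. */
    private static boolean deleteRecursively(File f) {
        if (f.isDirectory()) {
            File[] children = f.listFiles();
            if (children != null) {
                for (File child : children) {
                    if (!deleteRecursively(child)) {
                        return false;
                    }
                }
            }
        }
        return f.delete();
    }

    /** Like mkdirsInternal: create parents first; "mkdir() || isDirectory()" tolerates a concurrent creator. */
    public static boolean mkdirs(File file) throws IOException {
        if (file.isDirectory()) {
            return true;
        }
        if (file.exists()) {
            // exists but is not a directory, i.e. a regular file is in the way
            throw new FileAlreadyExistsException(file.getAbsolutePath());
        }
        File parent = file.getParentFile();
        return (parent == null || mkdirs(parent)) && (file.mkdir() || file.isDirectory());
    }

    public static void main(String[] args) throws IOException {
        File root = Files.createTempDirectory("localfs-sketch").toFile();
        File nested = new File(root, "a/b/c");
        System.out.println(mkdirs(nested));                     // true
        System.out.println(delete(new File(root, "a"), true));  // true
        System.out.println(root.delete());                      // true
    }
}
```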

HadoopFileSystem

flink-1.7.2/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java

public class HadoopFileSystem extends FileSystem {

    /** The wrapped Hadoop File System. */
    private final org.apache.hadoop.fs.FileSystem fs;

    /* This field caches the file system kind. It is lazily set because the file system
    * URL is lazily initialized. */
    private FileSystemKind fsKind;


    /**
     * Wraps the given Hadoop File System object as a Flink File System object.
     * The given Hadoop file system object is expected to be initialized already.
     *
     * @param hadoopFileSystem The Hadoop FileSystem that will be used under the hood.
     */
    public HadoopFileSystem(org.apache.hadoop.fs.FileSystem hadoopFileSystem) {
        this.fs = checkNotNull(hadoopFileSystem, "hadoopFileSystem");
    }

    /**
     * Gets the underlying Hadoop FileSystem.
     * @return The underlying Hadoop FileSystem.
     */
    public org.apache.hadoop.fs.FileSystem getHadoopFileSystem() {
        return this.fs;
    }

    // ------------------------------------------------------------------------
    //  file system methods
    // ------------------------------------------------------------------------

    @Override
    public Path getWorkingDirectory() {
        return new Path(this.fs.getWorkingDirectory().toUri());
    }

    public Path getHomeDirectory() {
        return new Path(this.fs.getHomeDirectory().toUri());
    }

    @Override
    public URI getUri() {
        return fs.getUri();
    }

    @Override
    public FileStatus getFileStatus(final Path f) throws IOException {
        org.apache.hadoop.fs.FileStatus status = this.fs.getFileStatus(toHadoopPath(f));
        return new HadoopFileStatus(status);
    }

    @Override
    public BlockLocation[] getFileBlockLocations(final FileStatus file, final long start, final long len)
            throws IOException {
        if (!(file instanceof HadoopFileStatus)) {
            throw new IOException("file is not an instance of DistributedFileStatus");
        }

        final HadoopFileStatus f = (HadoopFileStatus) file;

        final org.apache.hadoop.fs.BlockLocation[] blkLocations = fs.getFileBlockLocations(f.getInternalFileStatus(),
            start, len);

        // Wrap up HDFS specific block location objects
        final HadoopBlockLocation[] distBlkLocations = new HadoopBlockLocation[blkLocations.length];
        for (int i = 0; i < distBlkLocations.length; i++) {
            distBlkLocations[i] = new HadoopBlockLocation(blkLocations[i]);
        }

        return distBlkLocations;
    }

    @Override
    public HadoopDataInputStream open(final Path f, final int bufferSize) throws IOException {
        final org.apache.hadoop.fs.Path path = toHadoopPath(f);
        final org.apache.hadoop.fs.FSDataInputStream fdis = this.fs.open(path, bufferSize);
        return new HadoopDataInputStream(fdis);
    }

    @Override
    public HadoopDataInputStream open(final Path f) throws IOException {
        final org.apache.hadoop.fs.Path path = toHadoopPath(f);
        final org.apache.hadoop.fs.FSDataInputStream fdis = fs.open(path);
        return new HadoopDataInputStream(fdis);
    }

    @Override
    @SuppressWarnings("deprecation")
    public HadoopDataOutputStream create(
            final Path f,
            final boolean overwrite,
            final int bufferSize,
            final short replication,
            final long blockSize) throws IOException {

        final org.apache.hadoop.fs.FSDataOutputStream fdos = this.fs.create(
                toHadoopPath(f), overwrite, bufferSize, replication, blockSize);
        return new HadoopDataOutputStream(fdos);
    }

    @Override
    public HadoopDataOutputStream create(final Path f, final WriteMode overwrite) throws IOException {
        final org.apache.hadoop.fs.FSDataOutputStream fsDataOutputStream =
                this.fs.create(toHadoopPath(f), overwrite == WriteMode.OVERWRITE);
        return new HadoopDataOutputStream(fsDataOutputStream);
    }

    @Override
    public boolean delete(final Path f, final boolean recursive) throws IOException {
        return this.fs.delete(toHadoopPath(f), recursive);
    }

    @Override
    public boolean exists(Path f) throws IOException {
        return this.fs.exists(toHadoopPath(f));
    }

    @Override
    public FileStatus[] listStatus(final Path f) throws IOException {
        final org.apache.hadoop.fs.FileStatus[] hadoopFiles = this.fs.listStatus(toHadoopPath(f));
        final FileStatus[] files = new FileStatus[hadoopFiles.length];

        // Convert types
        for (int i = 0; i < files.length; i++) {
            files[i] = new HadoopFileStatus(hadoopFiles[i]);
        }

        return files;
    }

    @Override
    public boolean mkdirs(final Path f) throws IOException {
        return this.fs.mkdirs(toHadoopPath(f));
    }

    @Override
    public boolean rename(final Path src, final Path dst) throws IOException {
        return this.fs.rename(toHadoopPath(src), toHadoopPath(dst));
    }

    @SuppressWarnings("deprecation")
    @Override
    public long getDefaultBlockSize() {
        return this.fs.getDefaultBlockSize();
    }

    @Override
    public boolean isDistributedFS() {
        return true;
    }

    @Override
    public FileSystemKind getKind() {
        if (fsKind == null) {
            fsKind = getKindForScheme(this.fs.getUri().getScheme());
        }
        return fsKind;
    }

    @Override
    public RecoverableWriter createRecoverableWriter() throws IOException {
        // This writer is only supported on a subset of file systems, and on
        // specific versions. We check these schemes and versions eagerly for better error
        // messages in the constructor of the writer.
        return new HadoopRecoverableWriter(fs);
    }

    // ------------------------------------------------------------------------
    //  Utilities
    // ------------------------------------------------------------------------

    public static org.apache.hadoop.fs.Path toHadoopPath(Path path) {
        return new org.apache.hadoop.fs.Path(path.toUri());
    }

    /**
     * Gets the kind of the file system from its scheme.
     *
     * <p>Implementation note: Initially, especially within the Flink 1.3.x line
     * (in order to not break backwards compatibility), we must only label file systems
     * as 'inconsistent' or as 'not proper filesystems' if we are sure about it.
     * Otherwise, we cause regression for example in the performance and cleanup handling
     * of checkpoints.
     * For that reason, we initially mark some filesystems as 'eventually consistent' or
     * as 'object stores', and leave the others as 'consistent file systems'.
     */
    static FileSystemKind getKindForScheme(String scheme) {
        scheme = scheme.toLowerCase(Locale.US);

        if (scheme.startsWith("s3") || scheme.startsWith("emr")) {
            // the Amazon S3 storage
            return FileSystemKind.OBJECT_STORE;
        }
        else if (scheme.startsWith("http") || scheme.startsWith("ftp")) {
            // file servers instead of file systems
            // they might actually be consistent, but we have no hard guarantees
            // currently to rely on that
            return FileSystemKind.OBJECT_STORE;
        }
        else {
            // the remainder should include hdfs, kosmos, ceph, ...
            // this also includes federated HDFS (viewfs).
            return FileSystemKind.FILE_SYSTEM;
        }
    }

}
  • HadoopFileSystem extends FileSystem and delegates every operation to a wrapped org.apache.hadoop.fs.FileSystem (for example HDFS). Its isDistributedFS method returns true. Its getKind method derives the kind from the URI scheme: it returns FileSystemKind.OBJECT_STORE for s3*, emr*, http* and ftp* schemes, and FileSystemKind.FILE_SYSTEM for everything else. FlinkS3FileSystem and MapRFileSystem both extend HadoopFileSystem.
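The sketch below isolates the scheme-to-kind mapping. It copies the prefix checks from getKindForScheme above. The FsKind enum and the SchemeKinds class are hypothetical stand-ins, so the snippet compiles without Flink on the classpath. It does not call Flink's own API.

```java
import java.util.Locale;

/** Hypothetical stand-in for Flink's FileSystemKind (illustration only). */
enum FsKind { FILE_SYSTEM, OBJECT_STORE }

final class SchemeKinds {

    private SchemeKinds() {}

    /** Mirrors the prefix checks of HadoopFileSystem.getKindForScheme quoted above. */
    static FsKind kindForScheme(String scheme) {
        String s = scheme.toLowerCase(Locale.US);
        if (s.startsWith("s3") || s.startsWith("emr")) {
            // Amazon S3 / EMR storage
            return FsKind.OBJECT_STORE;
        } else if (s.startsWith("http") || s.startsWith("ftp")) {
            // file servers: possibly consistent, but no hard guarantee is assumed
            return FsKind.OBJECT_STORE;
        } else {
            // hdfs, viewfs, ceph, ... and any scheme not recognized above
            return FsKind.FILE_SYSTEM;
        }
    }

    public static void main(String[] args) {
        for (String scheme : new String[] {"hdfs", "viewfs", "s3a", "S3", "emrfs", "https", "ftp"}) {
            System.out.println(scheme + " -> " + kindForScheme(scheme));
        }
    }
}
```

The checks are prefix-based and case-insensitive. As a result, every S3 variant (s3, s3a, s3n) and https fall into OBJECT_STORE. Any unrecognized scheme, including viewfs, defaults to FILE_SYSTEM. In HadoopFileSystem, getKind computes this value once and caches it in the fsKind field.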

Summary

  • FileSystem is the abstract base class for all file systems used by Flink; subclasses implement either a local or a distributed file system. It declares the abstract methods getWorkingDirectory, getHomeDirectory, getUri, getFileStatus, getFileBlockLocations, open, listStatus, delete, mkdirs, create, rename, isDistributedFS and getKind, which every subclass must implement. It also provides the concrete instance methods initOutPathLocalFS and initOutPathDistFS, as well as the static methods initialize, getLocalFileSystem, get, getUnguardedFileSystem and getDefaultFsUri.
  • LocalFileSystem extends FileSystem and is backed by the local file system. Its isDistributedFS method returns false, and its getKind method returns FileSystemKind.FILE_SYSTEM.
  • HadoopFileSystem extends FileSystem and delegates every operation to a wrapped org.apache.hadoop.fs.FileSystem (for example HDFS). Its isDistributedFS method returns true. Its getKind method returns FileSystemKind.OBJECT_STORE or FileSystemKind.FILE_SYSTEM depending on the URI scheme. FlinkS3FileSystem and MapRFileSystem both extend HadoopFileSystem.
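The static get method resolves a URI to a FileSystem instance. The fields at the top of FileSystem show the ingredients:

  • FS_FACTORIES maps schemes to factories.
  • A default factory handles unmatched schemes.
  • CACHE holds instances keyed by scheme + authority.

What follows is a simplified, hypothetical sketch of that lookup using only the JDK. SimpleFsRegistry and its methods are invented for illustration. The sketch leaves out what the real code does besides: configuration handling, resolving scheme-less URIs against getDefaultFsUri, and the extra wrapping that get applies on top of getUnguardedFileSystem.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

/**
 * Hypothetical registry sketching the idea behind FileSystem.get: factories keyed by scheme,
 * instances cached by scheme + authority, and a default factory when no scheme matches.
 * This is not Flink code.
 */
final class SimpleFsRegistry<T> {

    /** Cache key made of scheme + authority (authority may be null, e.g. "file:///tmp"). */
    private static final class Key {
        private final String scheme;
        private final String authority;

        Key(String scheme, String authority) {
            this.scheme = scheme;
            this.authority = authority;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Key)) {
                return false;
            }
            Key other = (Key) o;
            return scheme.equals(other.scheme) && Objects.equals(authority, other.authority);
        }

        @Override
        public int hashCode() {
            return Objects.hash(scheme, authority);
        }
    }

    private final Map<String, Function<URI, T>> factories = new HashMap<>();
    private final Map<Key, T> cache = new HashMap<>();
    private final Function<URI, T> defaultFactory;

    SimpleFsRegistry(Function<URI, T> defaultFactory) {
        this.defaultFactory = defaultFactory;
    }

    /** Registers a factory for a scheme; schemes are compared case-insensitively. */
    synchronized void register(String scheme, Function<URI, T> factory) {
        factories.put(scheme.toLowerCase(Locale.US), factory);
    }

    /**
     * Returns the cached instance for the URI's scheme + authority, creating it on first use.
     * Assumption: a URI without a scheme is treated as "file".
     * Flink guards its cache with a ReentrantLock; synchronized is used here for brevity.
     */
    synchronized T get(URI uri) {
        String scheme = uri.getScheme() == null ? "file" : uri.getScheme().toLowerCase(Locale.US);
        Key key = new Key(scheme, uri.getAuthority());
        return cache.computeIfAbsent(key, k -> factories.getOrDefault(scheme, defaultFactory).apply(uri));
    }

    public static void main(String[] args) {
        SimpleFsRegistry<String> registry = new SimpleFsRegistry<>(uri -> "fallback for " + uri.getScheme());
        registry.register("hdfs", uri -> "hdfs client for " + uri.getAuthority());
        System.out.println(registry.get(URI.create("hdfs://namenode:8020/data/a")));
        System.out.println(registry.get(URI.create("gs://bucket/object")));
    }
}
```

Keying the cache by scheme + authority means that two paths on the same HDFS namenode share one client instance. A different namenode, or a different scheme, gets its own instance.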

doc