A Look at Flink’s NetworkBufferPool


Preface

This article takes a look at Flink’s NetworkBufferPool, based on the flink-release-1.7.2 source code.

BufferPoolFactory

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java

public interface BufferPoolFactory {

    /**
     * Tries to create a buffer pool, which is guaranteed to provide at least the number of required
     * buffers.
     *
     * <p>The buffer pool is of dynamic size with at least <tt>numRequiredBuffers</tt> buffers.
     *
     * @param numRequiredBuffers
     *         minimum number of network buffers in this pool
     * @param maxUsedBuffers
     *         maximum number of network buffers this pool offers
     */
    BufferPool createBufferPool(int numRequiredBuffers, int maxUsedBuffers) throws IOException;

    /**
     * Tries to create a buffer pool with an optional owner, which is guaranteed to provide at least the
     * number of required buffers.
     *
     * <p>The buffer pool is of dynamic size with at least <tt>numRequiredBuffers</tt> buffers.
     *
     * @param numRequiredBuffers
     *         minimum number of network buffers in this pool
     * @param maxUsedBuffers
     *         maximum number of network buffers this pool offers
     * @param owner
     *         the optional owner of this buffer pool to release memory when needed
     */
    BufferPool createBufferPool(int numRequiredBuffers, int maxUsedBuffers, Optional<BufferPoolOwner> owner) throws IOException;

    /**
     * Destroy callback for updating factory book keeping.
     */
    void destroyBufferPool(BufferPool bufferPool) throws IOException;

}
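  • BufferPoolFactory defines two methods, createBufferPool and destroyBufferPool. createBufferPool takes numRequiredBuffers (the minimum number of buffers the pool guarantees), maxUsedBuffers (the maximum number it may offer) and, in its second overload, an optional BufferPoolOwner that can be asked to release memory when needed. destroyBufferPool is the callback the factory uses to update its bookkeeping when a pool is destroyed.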
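As a minimal sketch of this contract in plain Java, the two-argument overload can simply delegate to the three-argument one with Optional.empty(), which is what NetworkBufferPool's createBufferPool does in the code below. All names here (SketchPoolOwner, SketchPool, SketchPoolFactory, ValidatingPoolFactory) are hypothetical stand-ins, not Flink classes; the min/max validation mirrors the checkArgument calls in the LocalBufferPool constructor, and the checked IOException is left out for brevity.

```java
import java.util.Optional;

// Hypothetical stand-in for BufferPoolOwner: asked to give buffers back when the pool runs short.
interface SketchPoolOwner {
    void releaseMemory(int numBuffersToRecycle);
}

// Hypothetical stand-in for BufferPool, reduced to its size bounds and optional owner.
interface SketchPool {
    int getNumberOfRequiredMemorySegments();
    int getMaxNumberOfMemorySegments();
    Optional<SketchPoolOwner> getOwner();
}

// Hypothetical stand-in for BufferPoolFactory (IOException omitted for brevity).
interface SketchPoolFactory {
    // Convenience overload: same contract, but without an owner.
    default SketchPool createBufferPool(int numRequiredBuffers, int maxUsedBuffers) {
        return createBufferPool(numRequiredBuffers, maxUsedBuffers, Optional.empty());
    }

    SketchPool createBufferPool(int numRequiredBuffers, int maxUsedBuffers,
            Optional<SketchPoolOwner> owner);

    // Bookkeeping callback when a pool is no longer needed.
    void destroyBufferPool(SketchPool bufferPool);
}

// Toy factory that only enforces the size contract; it manages no real memory.
class ValidatingPoolFactory implements SketchPoolFactory {
    @Override
    public SketchPool createBufferPool(int numRequiredBuffers, int maxUsedBuffers,
            Optional<SketchPoolOwner> owner) {
        if (maxUsedBuffers < numRequiredBuffers) {
            throw new IllegalArgumentException("maxUsedBuffers (" + maxUsedBuffers
                    + ") should not be smaller than numRequiredBuffers (" + numRequiredBuffers + ")");
        }
        if (maxUsedBuffers <= 0) {
            throw new IllegalArgumentException("maxUsedBuffers should be larger than 0");
        }
        return new SketchPool() {
            @Override public int getNumberOfRequiredMemorySegments() { return numRequiredBuffers; }
            @Override public int getMaxNumberOfMemorySegments() { return maxUsedBuffers; }
            @Override public Optional<SketchPoolOwner> getOwner() { return owner; }
        };
    }

    @Override
    public void destroyBufferPool(SketchPool bufferPool) {
        // Nothing to release in this toy version.
    }
}
```

Making the owner Optional lets one factory serve both pools that can ask an owner to release memory and pools that cannot.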

NetworkBufferPool

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java

public class NetworkBufferPool implements BufferPoolFactory {
    //......

    private static final Logger LOG = LoggerFactory.getLogger(NetworkBufferPool.class);

    private final int totalNumberOfMemorySegments;

    private final int memorySegmentSize;

    private final ArrayBlockingQueue<MemorySegment> availableMemorySegments;

    private volatile boolean isDestroyed;

    // ---- Managed buffer pools ----------------------------------------------

    private final Object factoryLock = new Object();

    private final Set<LocalBufferPool> allBufferPools = new HashSet<>();

    private int numTotalRequiredBuffers;

    /**
     * Allocates all {@link MemorySegment} instances managed by this pool.
     */
    public NetworkBufferPool(int numberOfSegmentsToAllocate, int segmentSize) {

        this.totalNumberOfMemorySegments = numberOfSegmentsToAllocate;
        this.memorySegmentSize = segmentSize;

        final long sizeInLong = (long) segmentSize;

        try {
            this.availableMemorySegments = new ArrayBlockingQueue<>(numberOfSegmentsToAllocate);
        }
        catch (OutOfMemoryError err) {
            throw new OutOfMemoryError("Could not allocate buffer queue of length "
                    + numberOfSegmentsToAllocate + " - " + err.getMessage());
        }

        try {
            for (int i = 0; i < numberOfSegmentsToAllocate; i++) {
                availableMemorySegments.add(MemorySegmentFactory.allocateUnpooledOffHeapMemory(segmentSize, null));
            }
        }
        catch (OutOfMemoryError err) {
            int allocated = availableMemorySegments.size();

            // free some memory
            availableMemorySegments.clear();

            long requiredMb = (sizeInLong * numberOfSegmentsToAllocate) >> 20;
            long allocatedMb = (sizeInLong * allocated) >> 20;
            long missingMb = requiredMb - allocatedMb;

            throw new OutOfMemoryError("Could not allocate enough memory segments for NetworkBufferPool " +
                    "(required (Mb): " + requiredMb +
                    ", allocated (Mb): " + allocatedMb +
                    ", missing (Mb): " + missingMb + "). Cause: " + err.getMessage());
        }

        long allocatedMb = (sizeInLong * availableMemorySegments.size()) >> 20;

        LOG.info("Allocated {} MB for network buffer pool (number of memory segments: {}, bytes per segment: {}).",
                allocatedMb, availableMemorySegments.size(), segmentSize);
    }

    @Override
    public BufferPool createBufferPool(int numRequiredBuffers, int maxUsedBuffers) throws IOException {
        return createBufferPool(numRequiredBuffers, maxUsedBuffers, Optional.empty());
    }

    @Override
    public BufferPool createBufferPool(int numRequiredBuffers, int maxUsedBuffers, Optional<BufferPoolOwner> owner) throws IOException {
        // It is necessary to use a separate lock from the one used for buffer
        // requests to ensure deadlock freedom for failure cases.
        synchronized (factoryLock) {
            if (isDestroyed) {
                throw new IllegalStateException("Network buffer pool has already been destroyed.");
            }

            // Ensure that the number of required buffers can be satisfied.
            // With dynamic memory management this should become obsolete.
            if (numTotalRequiredBuffers + numRequiredBuffers > totalNumberOfMemorySegments) {
                throw new IOException(String.format("Insufficient number of network buffers: " +
                                "required %d, but only %d available. The total number of network " +
                                "buffers is currently set to %d of %d bytes each. You can increase this " +
                                "number by setting the configuration keys '%s', '%s', and '%s'.",
                        numRequiredBuffers,
                        totalNumberOfMemorySegments - numTotalRequiredBuffers,
                        totalNumberOfMemorySegments,
                        memorySegmentSize,
                        TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key(),
                        TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key(),
                        TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key()));
            }

            this.numTotalRequiredBuffers += numRequiredBuffers;

            // We are good to go, create a new buffer pool and redistribute
            // non-fixed size buffers.
            LocalBufferPool localBufferPool =
                new LocalBufferPool(this, numRequiredBuffers, maxUsedBuffers, owner);

            allBufferPools.add(localBufferPool);

            try {
                redistributeBuffers();
            } catch (IOException e) {
                try {
                    destroyBufferPool(localBufferPool);
                } catch (IOException inner) {
                    e.addSuppressed(inner);
                }
                ExceptionUtils.rethrowIOException(e);
            }

            return localBufferPool;
        }
    }

    @Override
    public void destroyBufferPool(BufferPool bufferPool) throws IOException {
        if (!(bufferPool instanceof LocalBufferPool)) {
            throw new IllegalArgumentException("bufferPool is no LocalBufferPool");
        }

        synchronized (factoryLock) {
            if (allBufferPools.remove(bufferPool)) {
                numTotalRequiredBuffers -= bufferPool.getNumberOfRequiredMemorySegments();

                redistributeBuffers();
            }
        }
    }

    //......
}
  • NetworkBufferPool implements the BufferPoolFactory interface. Its constructor takes two parameters, numberOfSegmentsToAllocate and segmentSize. It creates an ArrayBlockingQueue with capacity numberOfSegmentsToAllocate, then allocates the MemorySegments one by one via MemorySegmentFactory.allocateUnpooledOffHeapMemory and adds them to availableMemorySegments. All network memory is therefore allocated off-heap and up front; if an OutOfMemoryError occurs, the queue is cleared and a new OutOfMemoryError reports the required, allocated and missing megabytes.
  • createBufferPool first checks, under factoryLock, that numTotalRequiredBuffers + numRequiredBuffers does not exceed totalNumberOfMemorySegments, and throws an IOException otherwise. It then adds numRequiredBuffers to numTotalRequiredBuffers, creates a LocalBufferPool (passing in the NetworkBufferPool instance itself) and adds it to the allBufferPools set. destroyBufferPool removes the pool from allBufferPools and subtracts its number of required segments from numTotalRequiredBuffers.
  • Both createBufferPool and destroyBufferPool then call redistributeBuffers, which adjusts the size of each LocalBufferPool by calling its setNumBuffers method. If redistribution fails while creating a pool, the new pool is destroyed again and the IOException is rethrown.
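The bookkeeping described above can be modeled without Flink. The following is a simplified sketch under stated assumptions: SketchGlobalPool and PoolHandle are hypothetical names, ByteBuffer.allocateDirect stands in for MemorySegmentFactory.allocateUnpooledOffHeapMemory, the OutOfMemoryError handling and the isDestroyed flag are left out, and redistributeBuffers() is not modeled because its body is not shown in this article. It keeps the admission check and the same `>> 20` arithmetic the constructor uses to log megabytes.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;

// Simplified, hypothetical model of NetworkBufferPool's allocation and bookkeeping.
class SketchGlobalPool {
    // Handle returned to callers; records the bounds the pool was created with.
    static final class PoolHandle {
        final int required;
        final int max;
        PoolHandle(int required, int max) { this.required = required; this.max = max; }
    }

    private final int totalSegments;
    private final int segmentSize;
    private final ArrayBlockingQueue<ByteBuffer> available;
    private final Object factoryLock = new Object();
    private final Set<PoolHandle> pools = new HashSet<>();
    private int numTotalRequiredBuffers;

    SketchGlobalPool(int numberOfSegmentsToAllocate, int segmentSize) {
        this.totalSegments = numberOfSegmentsToAllocate;
        this.segmentSize = segmentSize;
        this.available = new ArrayBlockingQueue<>(numberOfSegmentsToAllocate);
        // Eagerly allocate every off-heap segment up front.
        for (int i = 0; i < numberOfSegmentsToAllocate; i++) {
            available.add(ByteBuffer.allocateDirect(segmentSize));
        }
    }

    // bytes >> 20 == whole mebibytes, rounded down (as in the constructor's log line).
    long allocatedMb() {
        return ((long) segmentSize * available.size()) >> 20;
    }

    PoolHandle createBufferPool(int numRequiredBuffers, int maxUsedBuffers) throws IOException {
        synchronized (factoryLock) {
            // Admission control: the sum of all minimums may never exceed the total.
            if (numTotalRequiredBuffers + numRequiredBuffers > totalSegments) {
                throw new IOException(String.format(
                        "Insufficient number of network buffers: required %d, but only %d available.",
                        numRequiredBuffers, totalSegments - numTotalRequiredBuffers));
            }
            numTotalRequiredBuffers += numRequiredBuffers;
            PoolHandle handle = new PoolHandle(numRequiredBuffers, maxUsedBuffers);
            pools.add(handle);
            // The real class calls redistributeBuffers() here; not modeled in this sketch.
            return handle;
        }
    }

    void destroyBufferPool(PoolHandle handle) {
        synchronized (factoryLock) {
            // Only pools that are still registered give back their reservation.
            if (pools.remove(handle)) {
                numTotalRequiredBuffers -= handle.required;
            }
        }
    }

    int unreservedBuffers() {
        synchronized (factoryLock) {
            return totalSegments - numTotalRequiredBuffers;
        }
    }
}
```

For example, 64 segments of 32 KiB each come to 2,097,152 bytes, which `>> 20` reports as 2 MB. Reserving only the minimums keeps the admission check simple: every registered pool can always reach its numRequiredBuffers, and capacity above the minimums is left for redistribution.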

LocalBufferPool

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java

class LocalBufferPool implements BufferPool {
    private static final Logger LOG = LoggerFactory.getLogger(LocalBufferPool.class);

    /** Global network buffer pool to get buffers from. */
    private final NetworkBufferPool networkBufferPool;

    /** The minimum number of required segments for this pool. */
    private final int numberOfRequiredMemorySegments;

    /**
     * The currently available memory segments. These are segments, which have been requested from
     * the network buffer pool and are currently not handed out as Buffer instances.
     *
     * <p><strong>BEWARE:</strong> Take special care with the interactions between this lock and
     * locks acquired before entering this class vs. locks being acquired during calls to external
     * code inside this class, e.g. with
     * {@link org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel#bufferQueue}
     * via the {@link #registeredListeners} callback.
     */
    private final ArrayDeque<MemorySegment> availableMemorySegments = new ArrayDeque<MemorySegment>();

    /**
     * Buffer availability listeners, which need to be notified when a Buffer becomes available.
     * Listeners can only be registered at a time/state where no Buffer instance was available.
     */
    private final ArrayDeque<BufferListener> registeredListeners = new ArrayDeque<>();

    /** Maximum number of network buffers to allocate. */
    private final int maxNumberOfMemorySegments;

    /** The current size of this pool. */
    private int currentPoolSize;

    /**
     * Number of all memory segments, which have been requested from the network buffer pool and are
     * somehow referenced through this pool (e.g. wrapped in Buffer instances or as available segments).
     */
    private int numberOfRequestedMemorySegments;

    private boolean isDestroyed;

    private final Optional<BufferPoolOwner> owner;

    /**
     * Local buffer pool based on the given <tt>networkBufferPool</tt> with a minimal number of
     * network buffers being available.
     *
     * @param networkBufferPool
     *         global network buffer pool to get buffers from
     * @param numberOfRequiredMemorySegments
     *         minimum number of network buffers
     */
    LocalBufferPool(NetworkBufferPool networkBufferPool, int numberOfRequiredMemorySegments) {
        this(networkBufferPool, numberOfRequiredMemorySegments, Integer.MAX_VALUE, Optional.empty());
    }

    /**
     * Local buffer pool based on the given <tt>networkBufferPool</tt> with a minimal and maximal
     * number of network buffers being available.
     *
     * @param networkBufferPool
     *         global network buffer pool to get buffers from
     * @param numberOfRequiredMemorySegments
     *         minimum number of network buffers
     * @param maxNumberOfMemorySegments
     *         maximum number of network buffers to allocate
     */
    LocalBufferPool(NetworkBufferPool networkBufferPool, int numberOfRequiredMemorySegments,
            int maxNumberOfMemorySegments) {
        this(networkBufferPool, numberOfRequiredMemorySegments, maxNumberOfMemorySegments, Optional.empty());
    }

    /**
     * Local buffer pool based on the given <tt>networkBufferPool</tt> and <tt>bufferPoolOwner</tt>
     * with a minimal and maximal number of network buffers being available.
     *
     * @param networkBufferPool
     *         global network buffer pool to get buffers from
     * @param numberOfRequiredMemorySegments
     *         minimum number of network buffers
     * @param maxNumberOfMemorySegments
     *         maximum number of network buffers to allocate
     * @param owner
     *         the optional owner of this buffer pool to release memory when needed
     */
    LocalBufferPool(
        NetworkBufferPool networkBufferPool,
        int numberOfRequiredMemorySegments,
        int maxNumberOfMemorySegments,
        Optional<BufferPoolOwner> owner) {
        checkArgument(maxNumberOfMemorySegments >= numberOfRequiredMemorySegments,
            "Maximum number of memory segments (%s) should not be smaller than minimum (%s).",
            maxNumberOfMemorySegments, numberOfRequiredMemorySegments);

        checkArgument(maxNumberOfMemorySegments > 0,
            "Maximum number of memory segments (%s) should be larger than 0.",
            maxNumberOfMemorySegments);

        LOG.debug("Using a local buffer pool with {}-{} buffers",
            numberOfRequiredMemorySegments, maxNumberOfMemorySegments);

        this.networkBufferPool = networkBufferPool;
        this.numberOfRequiredMemorySegments = numberOfRequiredMemorySegments;
        this.currentPoolSize = numberOfRequiredMemorySegments;
        this.maxNumberOfMemorySegments = maxNumberOfMemorySegments;
        this.owner = owner;
    }

    //......

    @Override
    public void recycle(MemorySegment segment) {
        BufferListener listener;
        NotificationResult notificationResult = NotificationResult.BUFFER_NOT_USED;
        while (!notificationResult.isBufferUsed()) {
            synchronized (availableMemorySegments) {
                if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
                    returnMemorySegment(segment);
                    return;
                } else {
                    listener = registeredListeners.poll();
                    if (listener == null) {
                        availableMemorySegments.add(segment);
                        availableMemorySegments.notify();
                        return;
                    }
                }
            }
            notificationResult = fireBufferAvailableNotification(listener, segment);
        }
    }

    private MemorySegment requestMemorySegment(boolean isBlocking) throws InterruptedException, IOException {
        synchronized (availableMemorySegments) {
            returnExcessMemorySegments();

            boolean askToRecycle = owner.isPresent();

            // fill availableMemorySegments with at least one element, wait if required
            while (availableMemorySegments.isEmpty()) {
                if (isDestroyed) {
                    throw new IllegalStateException("Buffer pool is destroyed.");
                }

                if (numberOfRequestedMemorySegments < currentPoolSize) {
                    final MemorySegment segment = networkBufferPool.requestMemorySegment();

                    if (segment != null) {
                        numberOfRequestedMemorySegments++;
                        return segment;
                    }
                }

                if (askToRecycle) {
                    owner.get().releaseMemory(1);
                }

                if (isBlocking) {
                    availableMemorySegments.wait(2000);
                }
                else {
                    return null;
                }
            }

            return availableMemorySegments.poll();
        }
    }

    //......
}
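  • The LocalBufferPool constructor takes the NetworkBufferPool it draws segments from, checks that maxNumberOfMemorySegments is positive and not smaller than numberOfRequiredMemorySegments, and starts with currentPoolSize equal to numberOfRequiredMemorySegments. requestMemorySegment first returns any excess segments and then loops while availableMemorySegments is empty: if numberOfRequestedMemorySegments < currentPoolSize, it calls networkBufferPool.requestMemorySegment() to obtain a new MemorySegment; if that yields nothing, it asks the owner (if present) to release one buffer and then either waits up to 2000 ms before retrying (blocking mode) or returns null (non-blocking mode).
  • The recycle method returns the MemorySegment to the networkBufferPool when the pool has been destroyed or numberOfRequestedMemorySegments > currentPoolSize, i.e. the pool holds more segments than its (possibly reduced) size allows. Otherwise it offers the segment to the next registered BufferListener, moving on to the following listener if the buffer is not used; only when no listener is registered does it add the segment back to availableMemorySegments and notify a waiting thread.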
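The request and recycle rules above can be exercised in isolation with the toy model below. It is a single-threaded sketch under stated assumptions, not Flink's class: SketchLocalPool is a hypothetical name, ByteBuffer stands in for MemorySegment, a plain ArrayDeque stands in for the NetworkBufferPool, and a BufferListener is modeled as a Predicate that returns true when it keeps the buffer (the role NotificationResult.isBufferUsed() plays in recycle). Locking, blocking waits, the owner callback and the isDestroyed flag are omitted.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.function.Predicate;

// Toy, single-threaded model of LocalBufferPool's request/recycle decisions.
class SketchLocalPool {
    private final ArrayDeque<ByteBuffer> global;  // stand-in for NetworkBufferPool
    private final ArrayDeque<ByteBuffer> availableMemorySegments = new ArrayDeque<>();
    private final ArrayDeque<Predicate<ByteBuffer>> registeredListeners = new ArrayDeque<>();
    private int currentPoolSize;
    private int numberOfRequestedMemorySegments;

    SketchLocalPool(ArrayDeque<ByteBuffer> global, int initialPoolSize) {
        this.global = global;
        this.currentPoolSize = initialPoolSize;
    }

    // Called when the global pool redistributes buffers.
    void setNumBuffers(int numBuffers) {
        currentPoolSize = numBuffers;
        returnExcessMemorySegments();
    }

    // Non-blocking request: use a locally cached segment if there is one,
    // otherwise borrow from the global pool only while below currentPoolSize.
    ByteBuffer requestSegment() {
        returnExcessMemorySegments();
        if (availableMemorySegments.isEmpty()) {
            if (numberOfRequestedMemorySegments < currentPoolSize) {
                ByteBuffer segment = global.poll();
                if (segment != null) {
                    numberOfRequestedMemorySegments++;
                    return segment;
                }
            }
            return null;  // the real pool would ask its owner and/or wait here
        }
        return availableMemorySegments.poll();
    }

    // Listeners may only register while nothing is available locally.
    boolean addListener(Predicate<ByteBuffer> listener) {
        if (!availableMemorySegments.isEmpty()) {
            return false;
        }
        registeredListeners.add(listener);
        return true;
    }

    void recycle(ByteBuffer segment) {
        while (true) {
            if (numberOfRequestedMemorySegments > currentPoolSize) {
                // The pool has shrunk: give the segment back to the global pool.
                numberOfRequestedMemorySegments--;
                global.add(segment);
                return;
            }
            Predicate<ByteBuffer> listener = registeredListeners.poll();
            if (listener == null) {
                availableMemorySegments.add(segment);  // nobody waiting: keep it locally
                return;
            }
            if (listener.test(segment)) {
                return;  // the listener kept the segment
            }
            // The listener did not use it; loop and try the next one.
        }
    }

    private void returnExcessMemorySegments() {
        while (numberOfRequestedMemorySegments > currentPoolSize) {
            ByteBuffer segment = availableMemorySegments.poll();
            if (segment == null) {
                return;  // remaining excess segments are in use; recycle() returns them later
            }
            numberOfRequestedMemorySegments--;
            global.add(segment);
        }
    }
}
```

For example, with currentPoolSize = 2 the third requestSegment() returns null even if the global deque still holds segments, and after setNumBuffers(1) the next recycle() sends the segment back to the global deque instead of caching it locally.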

Summary

  • BufferPoolFactory defines createBufferPool and destroyBufferPool; createBufferPool accepts numRequiredBuffers, maxUsedBuffers and an optional BufferPoolOwner. NetworkBufferPool implements this interface, and its constructor eagerly allocates numberOfSegmentsToAllocate off-heap MemorySegments of segmentSize bytes each via MemorySegmentFactory.allocateUnpooledOffHeapMemory, storing them in an ArrayBlockingQueue named availableMemorySegments.
  • NetworkBufferPool.createBufferPool rejects requests whose minimum cannot be guaranteed, creates a LocalBufferPool that references the NetworkBufferPool itself, adds it to allBufferPools and increases numTotalRequiredBuffers; destroyBufferPool removes the pool and decreases numTotalRequiredBuffers. Both methods then call redistributeBuffers, which resizes each LocalBufferPool through setNumBuffers.
  • LocalBufferPool.requestMemorySegment fetches a new segment via networkBufferPool.requestMemorySegment() only when its local availableMemorySegments is empty and numberOfRequestedMemorySegments < currentPoolSize. LocalBufferPool.recycle returns a segment to the networkBufferPool when numberOfRequestedMemorySegments > currentPoolSize (or the pool is destroyed); otherwise it hands the segment to a registered BufferListener, or puts it back into availableMemorySegments if no listener is registered.
