Talk about flink’s MemoryPool

  flink

Order

This article mainly studies flink’s MemoryPool

MemoryPool

flink-runtime_2.11-1.7.2-sources.jar! /org/apache/flink/runtime/memory/MemoryManager.java

    abstract static class MemoryPool {

        abstract int getNumberOfAvailableMemorySegments();

        abstract MemorySegment allocateNewSegment(Object owner);

        abstract MemorySegment requestSegmentFromPool(Object owner);

        abstract void returnSegmentToPool(MemorySegment segment);

        abstract void clear();
    }
  • MemoryPool defines several abstract methods: GetNumberOfAvailableMemorySegments, allocateNewSegment, requestSegmentFromPool, returnSegmentToPool, clear. It has two subclasses: HybridHeapMemoryPool and HybridOffHeapMemoryPool

HybridHeapMemoryPool

flink-runtime_2.11-1.7.2-sources.jar! /org/apache/flink/runtime/memory/MemoryManager.java

    static final class HybridHeapMemoryPool extends MemoryPool {

        /** The collection of available memory segments. */
        private final ArrayDeque<byte[]> availableMemory;

        private final int segmentSize;

        HybridHeapMemoryPool(int numInitialSegments, int segmentSize) {
            this.availableMemory = new ArrayDeque<>(numInitialSegments);
            this.segmentSize = segmentSize;

            for (int i = 0; i < numInitialSegments; i++) {
                this.availableMemory.add(new byte[segmentSize]);
            }
        }

        @Override
        MemorySegment allocateNewSegment(Object owner) {
            return MemorySegmentFactory.allocateUnpooledSegment(segmentSize, owner);
        }

        @Override
        MemorySegment requestSegmentFromPool(Object owner) {
            byte[] buf = availableMemory.remove();
            return  MemorySegmentFactory.wrapPooledHeapMemory(buf, owner);
        }

        @Override
        void returnSegmentToPool(MemorySegment segment) {
            if (segment.getClass() == HybridMemorySegment.class) {
                HybridMemorySegment heapSegment = (HybridMemorySegment) segment;
                availableMemory.add(heapSegment.getArray());
                heapSegment.free();
            }
            else {
                throw new IllegalArgumentException("Memory segment is not a " + HybridMemorySegment.class.getSimpleName());
            }
        }

        @Override
        protected int getNumberOfAvailableMemorySegments() {
            return availableMemory.size();
        }

        @Override
        void clear() {
            availableMemory.clear();
        }
    }
  • HybridHeapMemoryPool inherits MemoryPool, which uses the jvm’s heap memory; The constructor receives two parameters numInitialSegments and segmentSize to initialize the ArrayDeque of availableMemory, and the element type of the queue is byte[]
  • The allocateNewSegment method calls memorysegmentfactory.allocateeunpooledsegment, which is used to allocate unpooled memory; ; The requestSegmentFromPool method calls availableMemory.remove (), and then calls MemorySegmentFactory. WrappledHeadPMemory to wrap it as MemorySegment. This method directly removes without determining the size of ArrayDeque, which requires attention.
  • The returnSegmentToPool method only processes the HybridMemorySegment type, first returning its byte[] to the availableMemory, and then calling heapSegment.free () to release; The getNumberOfAvailableMemorySegments method returns AvailableMemory.size (); Clear method calls availableMemory.clear ()

HybridOffHeapMemoryPool

flink-runtime_2.11-1.7.2-sources.jar! /org/apache/flink/runtime/memory/MemoryManager.java

    static final class HybridOffHeapMemoryPool extends MemoryPool {

        /** The collection of available memory segments. */
        private final ArrayDeque<ByteBuffer> availableMemory;

        private final int segmentSize;

        HybridOffHeapMemoryPool(int numInitialSegments, int segmentSize) {
            this.availableMemory = new ArrayDeque<>(numInitialSegments);
            this.segmentSize = segmentSize;

            for (int i = 0; i < numInitialSegments; i++) {
                this.availableMemory.add(ByteBuffer.allocateDirect(segmentSize));
            }
        }

        @Override
        MemorySegment allocateNewSegment(Object owner) {
            return MemorySegmentFactory.allocateUnpooledOffHeapMemory(segmentSize, owner);
        }

        @Override
        MemorySegment requestSegmentFromPool(Object owner) {
            ByteBuffer buf = availableMemory.remove();
            return MemorySegmentFactory.wrapPooledOffHeapMemory(buf, owner);
        }

        @Override
        void returnSegmentToPool(MemorySegment segment) {
            if (segment.getClass() == HybridMemorySegment.class) {
                HybridMemorySegment hybridSegment = (HybridMemorySegment) segment;
                ByteBuffer buf = hybridSegment.getOffHeapBuffer();
                availableMemory.add(buf);
                hybridSegment.free();
            }
            else {
                throw new IllegalArgumentException("Memory segment is not a " + HybridMemorySegment.class.getSimpleName());
            }
        }

        @Override
        protected int getNumberOfAvailableMemorySegments() {
            return availableMemory.size();
        }

        @Override
        void clear() {
            availableMemory.clear();
        }
    }
  • HybridOffHeapMemoryPool inherits MemoryPool, which uses OffHeap; ; The constructor receives two parameters numInitialSegments and segmentSize to initialize the ArrayDeque of availableMemory, and the element type of the queue is ByteBuffer
  • The allocateNewSegment method calls memorysegmentfactory.allocateeunpooledoffheadmemory, which is used to allocate unpoled off-headmemory; The requestSegmentFromPool method calls availableMemory.remove (), and then calls MemorySegmentFactory. WrappledOffheadMemory to wrap it as a MemorySegment. This method directly removes without determining the size of ArrayDeque, which requires attention.
  • The returnSegmentToPool method only processes the HybridMemorySegment type, first returning its ByteBuffer to the availableMemory, and then calling heapSegment.free () to release; The getNumberOfAvailableMemorySegments method returns AvailableMemory.size (); Clear method calls availableMemory.clear ()

Summary

  • MemoryPool defines several abstract methods: GetNumberOfAvailableMemorySegments, allocateNewSegment, requestSegmentFromPool, returnSegmentToPool, clear. It has two subclasses: HybridHeapMemoryPool and HybridOffHeapMemoryPool
  • HybridHeapMemoryPool inherits MemoryPool, which uses the jvm’s heap memory; The constructor receives two parameters, numInitialSegments and segmentSize, to initialize the ArrayDeque, availableMemory, whose element type is byte []; The allocateNewSegment method calls memorysegmentfactory.allocateeunpooledsegment, which is used to allocate unpooled memory; ; The requestSegmentFromPool method calls availableMemory.remove (), and then calls MemorySegmentFactory. WrappledHeadPMemory to wrap it as MemorySegment. This method directly removes without judging the size of ArrayDe que, which requires attention. The returnSegmentToPool method only processes the HybridMemorySegment type, first returning its byte[] to the availableMemory, and then calling heapSegment.free () to release; The getNumberOfAvailableMemorySegments method returns AvailableMemory.size (); Clear method calls availableMemory.clear ()
  • HybridOffHeapMemoryPool inherits MemoryPool, which uses OffHeap; ; The constructor receives two parameters, numInitialSegments and segmentSize, to initialize the ArrayDeque of availableMemory. the element type of the queue is ByteBuffer; ; The allocateNewSegment method calls memorysegmentfactory.allocateeunpooledoffheadmemory, which is used to allocate unpoled off-headmemory; The requestSegmentFromPool method calls availableMemory.remove (), and then calls MemorySegmentFactory. WrappledOffheadMemory to wrap it as MemorySegment. This method directly removes without judging the size of Array Deque, which requires attention. The returnSegmentToPool method only processes the HybridMemorySegment type, first returning its ByteBuffer to the availableMemory, and then calling heapSegment.free () to release; The getNumberOfAvailableMemorySegments method returns AvailableMemory.size (); Clear method calls availableMemory.clear ()

doc