Talk about flink’s MemorySegment

  flink

Order

This article mainly studies flink’s MemorySegment

MemorySegment

flink-release-1.7.2/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java

@Internal
public abstract class MemorySegment {

    @SuppressWarnings("restriction")
    protected static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;

    @SuppressWarnings("restriction")
    protected static final long BYTE_ARRAY_BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);

    private static final boolean LITTLE_ENDIAN = (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN);

    // ------------------------------------------------------------------------

    protected final byte[] heapMemory;

    protected long address;

    protected final long addressLimit;

    protected final int size;

    private final Object owner;

    MemorySegment(byte[] buffer, Object owner) {
        if (buffer == null) {
            throw new NullPointerException("buffer");
        }

        this.heapMemory = buffer;
        this.address = BYTE_ARRAY_BASE_OFFSET;
        this.size = buffer.length;
        this.addressLimit = this.address + this.size;
        this.owner = owner;
    }

    MemorySegment(long offHeapAddress, int size, Object owner) {
        if (offHeapAddress <= 0) {
            throw new IllegalArgumentException("negative pointer or size");
        }
        if (offHeapAddress >= Long.MAX_VALUE - Integer.MAX_VALUE) {
            // this is necessary to make sure the collapsed checks are safe against numeric overflows
            throw new IllegalArgumentException("Segment initialized with too large address: " + offHeapAddress
                    + " ; Max allowed address is " + (Long.MAX_VALUE - Integer.MAX_VALUE - 1));
        }

        this.heapMemory = null;
        this.address = offHeapAddress;
        this.addressLimit = this.address + size;
        this.size = size;
        this.owner = owner;
    }

    // ------------------------------------------------------------------------
    // Memory Segment Operations
    // ------------------------------------------------------------------------

    public int size() {
        return size;
    }

    public boolean isFreed() {
        return address > addressLimit;
    }

    public void free() {
        // this ensures we can place no more data and trigger
        // the checks for the freed segment
        address = addressLimit + 1;
    }

    public boolean isOffHeap() {
        return heapMemory == null;
    }

    public byte[] getArray() {
        if (heapMemory != null) {
            return heapMemory;
        } else {
            throw new IllegalStateException("Memory segment does not represent heap memory");
        }
    }

    public long getAddress() {
        if (heapMemory == null) {
            return address;
        } else {
            throw new IllegalStateException("Memory segment does not represent off heap memory");
        }
    }

    public abstract ByteBuffer wrap(int offset, int length);

    public Object getOwner() {
        return owner;
    }


    // ------------------------------------------------------------------------
    //                    Random Access get() and put() methods
    // ------------------------------------------------------------------------

    //------------------------------------------------------------------------
    // Notes on the implementation: We try to collapse as many checks as
    // possible. We need to obey the following rules to make this safe
    // against segfaults:
    //
    //  - Grab mutable fields onto the stack before checking and using. This
    //    guards us against concurrent modifications which invalidate the
    //    pointers
    //  - Use subtractions for range checks, as they are tolerant
    //------------------------------------------------------------------------

    public abstract byte get(int index);

    public abstract void put(int index, byte b);

    public abstract void get(int index, byte[] dst);

    public abstract void put(int index, byte[] src);

    public abstract void get(int index, byte[] dst, int offset, int length);

    public abstract void put(int index, byte[] src, int offset, int length);

    public abstract boolean getBoolean(int index);

    public abstract void putBoolean(int index, boolean value);

    @SuppressWarnings("restriction")
    public final char getChar(int index) {
        final long pos = address + index;
        if (index >= 0 && pos <= addressLimit - 2) {
            return UNSAFE.getChar(heapMemory, pos);
        }
        else if (address > addressLimit) {
            throw new IllegalStateException("This segment has been freed.");
        }
        else {
            // index is in fact invalid
            throw new IndexOutOfBoundsException();
        }
    }

    public final char getCharLittleEndian(int index) {
        if (LITTLE_ENDIAN) {
            return getChar(index);
        } else {
            return Character.reverseBytes(getChar(index));
        }
    }

    public final char getCharBigEndian(int index) {
        if (LITTLE_ENDIAN) {
            return Character.reverseBytes(getChar(index));
        } else {
            return getChar(index);
        }
    }

    @SuppressWarnings("restriction")
    public final void putChar(int index, char value) {
        final long pos = address + index;
        if (index >= 0 && pos <= addressLimit - 2) {
            UNSAFE.putChar(heapMemory, pos, value);
        }
        else if (address > addressLimit) {
            throw new IllegalStateException("segment has been freed");
        }
        else {
            // index is in fact invalid
            throw new IndexOutOfBoundsException();
        }
    }

    public final void putCharLittleEndian(int index, char value) {
        if (LITTLE_ENDIAN) {
            putChar(index, value);
        } else {
            putChar(index, Character.reverseBytes(value));
        }
    }

    public final void putCharBigEndian(int index, char value) {
        if (LITTLE_ENDIAN) {
            putChar(index, Character.reverseBytes(value));
        } else {
            putChar(index, value);
        }
    }

    public final short getShort(int index) {
        final long pos = address + index;
        if (index >= 0 && pos <= addressLimit - 2) {
            return UNSAFE.getShort(heapMemory, pos);
        }
        else if (address > addressLimit) {
            throw new IllegalStateException("segment has been freed");
        }
        else {
            // index is in fact invalid
            throw new IndexOutOfBoundsException();
        }
    }

    public final short getShortLittleEndian(int index) {
        if (LITTLE_ENDIAN) {
            return getShort(index);
        } else {
            return Short.reverseBytes(getShort(index));
        }
    }

    public final short getShortBigEndian(int index) {
        if (LITTLE_ENDIAN) {
            return Short.reverseBytes(getShort(index));
        } else {
            return getShort(index);
        }
    }

    public final void putShort(int index, short value) {
        final long pos = address + index;
        if (index >= 0 && pos <= addressLimit - 2) {
            UNSAFE.putShort(heapMemory, pos, value);
        }
        else if (address > addressLimit) {
            throw new IllegalStateException("segment has been freed");
        }
        else {
            // index is in fact invalid
            throw new IndexOutOfBoundsException();
        }
    }

    public final void putShortLittleEndian(int index, short value) {
        if (LITTLE_ENDIAN) {
            putShort(index, value);
        } else {
            putShort(index, Short.reverseBytes(value));
        }
    }

    public final void putShortBigEndian(int index, short value) {
        if (LITTLE_ENDIAN) {
            putShort(index, Short.reverseBytes(value));
        } else {
            putShort(index, value);
        }
    }

    public final int getInt(int index) {
        final long pos = address + index;
        if (index >= 0 && pos <= addressLimit - 4) {
            return UNSAFE.getInt(heapMemory, pos);
        }
        else if (address > addressLimit) {
            throw new IllegalStateException("segment has been freed");
        }
        else {
            // index is in fact invalid
            throw new IndexOutOfBoundsException();
        }
    }

    public final int getIntLittleEndian(int index) {
        if (LITTLE_ENDIAN) {
            return getInt(index);
        } else {
            return Integer.reverseBytes(getInt(index));
        }
    }

    public final int getIntBigEndian(int index) {
        if (LITTLE_ENDIAN) {
            return Integer.reverseBytes(getInt(index));
        } else {
            return getInt(index);
        }
    }

    public final void putInt(int index, int value) {
        final long pos = address + index;
        if (index >= 0 && pos <= addressLimit - 4) {
            UNSAFE.putInt(heapMemory, pos, value);
        }
        else if (address > addressLimit) {
            throw new IllegalStateException("segment has been freed");
        }
        else {
            // index is in fact invalid
            throw new IndexOutOfBoundsException();
        }
    }

    public final void putIntLittleEndian(int index, int value) {
        if (LITTLE_ENDIAN) {
            putInt(index, value);
        } else {
            putInt(index, Integer.reverseBytes(value));
        }
    }

    public final void putIntBigEndian(int index, int value) {
        if (LITTLE_ENDIAN) {
            putInt(index, Integer.reverseBytes(value));
        } else {
            putInt(index, value);
        }
    }

    public final long getLong(int index) {
        final long pos = address + index;
        if (index >= 0 && pos <= addressLimit - 8) {
            return UNSAFE.getLong(heapMemory, pos);
        }
        else if (address > addressLimit) {
            throw new IllegalStateException("segment has been freed");
        }
        else {
            // index is in fact invalid
            throw new IndexOutOfBoundsException();
        }
    }

    public final long getLongLittleEndian(int index) {
        if (LITTLE_ENDIAN) {
            return getLong(index);
        } else {
            return Long.reverseBytes(getLong(index));
        }
    }

    public final long getLongBigEndian(int index) {
        if (LITTLE_ENDIAN) {
            return Long.reverseBytes(getLong(index));
        } else {
            return getLong(index);
        }
    }

    public final void putLong(int index, long value) {
        final long pos = address + index;
        if (index >= 0 && pos <= addressLimit - 8) {
            UNSAFE.putLong(heapMemory, pos, value);
        }
        else if (address > addressLimit) {
            throw new IllegalStateException("segment has been freed");
        }
        else {
            // index is in fact invalid
            throw new IndexOutOfBoundsException();
        }
    }

    public final void putLongLittleEndian(int index, long value) {
        if (LITTLE_ENDIAN) {
            putLong(index, value);
        } else {
            putLong(index, Long.reverseBytes(value));
        }
    }

    public final void putLongBigEndian(int index, long value) {
        if (LITTLE_ENDIAN) {
            putLong(index, Long.reverseBytes(value));
        } else {
            putLong(index, value);
        }
    }

    public final float getFloat(int index) {
        return Float.intBitsToFloat(getInt(index));
    }

    public final float getFloatLittleEndian(int index) {
        return Float.intBitsToFloat(getIntLittleEndian(index));
    }

    public final float getFloatBigEndian(int index) {
        return Float.intBitsToFloat(getIntBigEndian(index));
    }

    public final void putFloat(int index, float value) {
        putInt(index, Float.floatToRawIntBits(value));
    }

    public final void putFloatLittleEndian(int index, float value) {
        putIntLittleEndian(index, Float.floatToRawIntBits(value));
    }

    public final void putFloatBigEndian(int index, float value) {
        putIntBigEndian(index, Float.floatToRawIntBits(value));
    }

    public final double getDouble(int index) {
        return Double.longBitsToDouble(getLong(index));
    }

    public final double getDoubleLittleEndian(int index) {
        return Double.longBitsToDouble(getLongLittleEndian(index));
    }

    public final double getDoubleBigEndian(int index) {
        return Double.longBitsToDouble(getLongBigEndian(index));
    }

    public final void putDouble(int index, double value) {
        putLong(index, Double.doubleToRawLongBits(value));
    }

    public final void putDoubleLittleEndian(int index, double value) {
        putLongLittleEndian(index, Double.doubleToRawLongBits(value));
    }

    public final void putDoubleBigEndian(int index, double value) {
        putLongBigEndian(index, Double.doubleToRawLongBits(value));
    }

    // -------------------------------------------------------------------------
    //                     Bulk Read and Write Methods
    // -------------------------------------------------------------------------

    public abstract void get(DataOutput out, int offset, int length) throws IOException;

    public abstract void put(DataInput in, int offset, int length) throws IOException;

    public abstract void get(int offset, ByteBuffer target, int numBytes);

    public abstract void put(int offset, ByteBuffer source, int numBytes);

    public final void copyTo(int offset, MemorySegment target, int targetOffset, int numBytes) {
        final byte[] thisHeapRef = this.heapMemory;
        final byte[] otherHeapRef = target.heapMemory;
        final long thisPointer = this.address + offset;
        final long otherPointer = target.address + targetOffset;

        if ((numBytes | offset | targetOffset) >= 0 &&
                thisPointer <= this.addressLimit - numBytes && otherPointer <= target.addressLimit - numBytes) {
            UNSAFE.copyMemory(thisHeapRef, thisPointer, otherHeapRef, otherPointer, numBytes);
        }
        else if (this.address > this.addressLimit) {
            throw new IllegalStateException("this memory segment has been freed.");
        }
        else if (target.address > target.addressLimit) {
            throw new IllegalStateException("target memory segment has been freed.");
        }
        else {
            throw new IndexOutOfBoundsException(
                    String.format("offset=%d, targetOffset=%d, numBytes=%d, address=%d, targetAddress=%d",
                    offset, targetOffset, numBytes, this.address, target.address));
        }
    }

    // -------------------------------------------------------------------------
    //                      Comparisons & Swapping
    // -------------------------------------------------------------------------

    public final int compare(MemorySegment seg2, int offset1, int offset2, int len) {
        while (len >= 8) {
            long l1 = this.getLongBigEndian(offset1);
            long l2 = seg2.getLongBigEndian(offset2);

            if (l1 != l2) {
                return (l1 < l2) ^ (l1 < 0) ^ (l2 < 0) ? -1 : 1;
            }

            offset1 += 8;
            offset2 += 8;
            len -= 8;
        }
        while (len > 0) {
            int b1 = this.get(offset1) & 0xff;
            int b2 = seg2.get(offset2) & 0xff;
            int cmp = b1 - b2;
            if (cmp != 0) {
                return cmp;
            }
            offset1++;
            offset2++;
            len--;
        }
        return 0;
    }

    public final void swapBytes(byte[] tempBuffer, MemorySegment seg2, int offset1, int offset2, int len) {
        if ((offset1 | offset2 | len | (tempBuffer.length - len)) >= 0) {
            final long thisPos = this.address + offset1;
            final long otherPos = seg2.address + offset2;

            if (thisPos <= this.addressLimit - len && otherPos <= seg2.addressLimit - len) {
                // this -> temp buffer
                UNSAFE.copyMemory(this.heapMemory, thisPos, tempBuffer, BYTE_ARRAY_BASE_OFFSET, len);

                // other -> this
                UNSAFE.copyMemory(seg2.heapMemory, otherPos, this.heapMemory, thisPos, len);

                // temp buffer -> other
                UNSAFE.copyMemory(tempBuffer, BYTE_ARRAY_BASE_OFFSET, seg2.heapMemory, otherPos, len);
                return;
            }
            else if (this.address > this.addressLimit) {
                throw new IllegalStateException("this memory segment has been freed.");
            }
            else if (seg2.address > seg2.addressLimit) {
                throw new IllegalStateException("other memory segment has been freed.");
            }
        }

        // index is in fact invalid
        throw new IndexOutOfBoundsException(
                    String.format("offset1=%d, offset2=%d, len=%d, bufferSize=%d, address1=%d, address2=%d",
                            offset1, offset2, len, tempBuffer.length, this.address, seg2.address));
    }
}
  • MemorySegment is somewhat similar to java.nio.ByteBuffer; ; It has a heapMemory property of type byte[]; It has two constructors. Constructors with byte[] type parameters will assign byte[] to Headmemory, while constructors without byte[] type parameters will have Headmemory null; . The isOffHeap method is used to determine whether the current memory segment is heap or off-heap, which is determined according to whether heapMemory is null, and OFF-HEAP if null; In addition, comparison, swapBytes and copyTo methods are provided. It also shows that BigEndian and LittleEndian get and put methods are provided.
  • The relevant methods of BigEndian include: get/putCharBigEndian, get/putShortBigEndian, get/putIntBigEndian, get/putLongBigEndian, get/putFloatBigEndian, get/putDoubleBigEndian; ; The relevant methods of LittleEndian include: get/putCharLittleEndian, get/putShortLittleEndian, get/putIntLittleEndian, get/putLongLittleEndian, get/putFloatLittleEndian, get/putDoubleLittleEndian
  • MemorySegment defines free, wrap, get, put, getBoolean, putBoolean abstract methods that require subclasses to implement; MemorySegment has two subclasses, namely HeapMemorySegment and HybridMemorySegment.

HeapMemorySegment

flink-release-1.7.2/flink-core/src/main/java/org/apache/flink/core/memory/HeapMemorySegment.java

@SuppressWarnings("unused")
@Internal
public final class HeapMemorySegment extends MemorySegment {

    private byte[] memory;

    HeapMemorySegment(byte[] memory) {
        this(memory, null);
    }

    HeapMemorySegment(byte[] memory, Object owner) {
        super(Objects.requireNonNull(memory), owner);
        this.memory = memory;
    }

    // -------------------------------------------------------------------------
    //  MemorySegment operations
    // -------------------------------------------------------------------------

    @Override
    public void free() {
        super.free();
        this.memory = null;
    }

    @Override
    public ByteBuffer wrap(int offset, int length) {
        try {
            return ByteBuffer.wrap(this.memory, offset, length);
        }
        catch (NullPointerException e) {
            throw new IllegalStateException("segment has been freed");
        }
    }

    public byte[] getArray() {
        return this.heapMemory;
    }

    // ------------------------------------------------------------------------
    //                    Random Access get() and put() methods
    // ------------------------------------------------------------------------

    @Override
    public final byte get(int index) {
        return this.memory[index];
    }

    @Override
    public final void put(int index, byte b) {
        this.memory[index] = b;
    }

    @Override
    public final void get(int index, byte[] dst) {
        get(index, dst, 0, dst.length);
    }

    @Override
    public final void put(int index, byte[] src) {
        put(index, src, 0, src.length);
    }

    @Override
    public final void get(int index, byte[] dst, int offset, int length) {
        // system arraycopy does the boundary checks anyways, no need to check extra
        System.arraycopy(this.memory, index, dst, offset, length);
    }

    @Override
    public final void put(int index, byte[] src, int offset, int length) {
        // system arraycopy does the boundary checks anyways, no need to check extra
        System.arraycopy(src, offset, this.memory, index, length);
    }

    @Override
    public final boolean getBoolean(int index) {
        return this.memory[index] != 0;
    }

    @Override
    public final void putBoolean(int index, boolean value) {
        this.memory[index] = (byte) (value ? 1 : 0);
    }

    // -------------------------------------------------------------------------
    //                     Bulk Read and Write Methods
    // -------------------------------------------------------------------------

    @Override
    public final void get(DataOutput out, int offset, int length) throws IOException {
        out.write(this.memory, offset, length);
    }

    @Override
    public final void put(DataInput in, int offset, int length) throws IOException {
        in.readFully(this.memory, offset, length);
    }

    @Override
    public final void get(int offset, ByteBuffer target, int numBytes) {
        // ByteBuffer performs the boundary checks
        target.put(this.memory, offset, numBytes);
    }

    @Override
    public final void put(int offset, ByteBuffer source, int numBytes) {
        // ByteBuffer performs the boundary checks
        source.get(this.memory, offset, numBytes);
    }

    // -------------------------------------------------------------------------
    //                             Factoring
    // -------------------------------------------------------------------------

    /**
     * A memory segment factory that produces heap memory segments. Note that this factory does not
     * support to allocate off-heap memory.
     */
    public static final class HeapMemorySegmentFactory  {

        public HeapMemorySegment wrap(byte[] memory) {
            return new HeapMemorySegment(memory);
        }

        public HeapMemorySegment allocateUnpooledSegment(int size, Object owner) {
            return new HeapMemorySegment(new byte[size], owner);
        }

        public HeapMemorySegment wrapPooledHeapMemory(byte[] memory, Object owner) {
            return new HeapMemorySegment(memory, owner);
        }

        /**
         * Prevent external instantiation.
         */
        HeapMemorySegmentFactory() {}
    }

    public static final HeapMemorySegmentFactory FACTORY = new HeapMemorySegmentFactory();
}
  • HeapMemorySegment inherits the MemorySegment, which has a memory attribute of byte[]. the free operation will set the memory to null, and the wrap method uses the memory attribute. Its constructor requires that the incoming memory cannot be null, and then assigns it to the parent class’s heapMemory attribute and its own defined memory attribute (Quote); It also defines the HeapMemorySegmentFactory and provides wrap, allocateUnpooledSegment, wrapPooledHeapMemory methods.

HybridMemorySegment

flink-release-1.7.2/flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java

@Internal
public final class HybridMemorySegment extends MemorySegment {

    /**
     * The direct byte buffer that allocated the off-heap memory. This memory segment holds a
     * reference to that buffer, so as long as this memory segment lives, the memory will not be
     * released.
     */
    private final ByteBuffer offHeapBuffer;

    /**
     * Creates a new memory segment that represents the memory backing the given direct byte buffer.
     * Note that the given ByteBuffer must be direct {@link java.nio.ByteBuffer#allocateDirect(int)},
     * otherwise this method with throw an IllegalArgumentException.
     *
     * <p>The owner referenced by this memory segment is null.
     *
     * @param buffer The byte buffer whose memory is represented by this memory segment.
     * @throws IllegalArgumentException Thrown, if the given ByteBuffer is not direct.
     */
    HybridMemorySegment(ByteBuffer buffer) {
        this(buffer, null);
    }

    /**
     * Creates a new memory segment that represents the memory backing the given direct byte buffer.
     * Note that the given ByteBuffer must be direct {@link java.nio.ByteBuffer#allocateDirect(int)},
     * otherwise this method with throw an IllegalArgumentException.
     *
     * <p>The memory segment references the given owner.
     *
     * @param buffer The byte buffer whose memory is represented by this memory segment.
     * @param owner The owner references by this memory segment.
     * @throws IllegalArgumentException Thrown, if the given ByteBuffer is not direct.
     */
    HybridMemorySegment(ByteBuffer buffer, Object owner) {
        super(checkBufferAndGetAddress(buffer), buffer.capacity(), owner);
        this.offHeapBuffer = buffer;
    }

    /**
     * Creates a new memory segment that represents the memory of the byte array.
     *
     * <p>The owner referenced by this memory segment is null.
     *
     * @param buffer The byte array whose memory is represented by this memory segment.
     */
    HybridMemorySegment(byte[] buffer) {
        this(buffer, null);
    }

    /**
     * Creates a new memory segment that represents the memory of the byte array.
     *
     * <p>The memory segment references the given owner.
     *
     * @param buffer The byte array whose memory is represented by this memory segment.
     * @param owner The owner references by this memory segment.
     */
    HybridMemorySegment(byte[] buffer, Object owner) {
        super(buffer, owner);
        this.offHeapBuffer = null;
    }

    // -------------------------------------------------------------------------
    //  MemorySegment operations
    // -------------------------------------------------------------------------

    /**
     * Gets the buffer that owns the memory of this memory segment.
     *
     * @return The byte buffer that owns the memory of this memory segment.
     */
    public ByteBuffer getOffHeapBuffer() {
        if (offHeapBuffer != null) {
            return offHeapBuffer;
        } else {
            throw new IllegalStateException("Memory segment does not represent off heap memory");
        }
    }

    @Override
    public ByteBuffer wrap(int offset, int length) {
        if (address <= addressLimit) {
            if (heapMemory != null) {
                return ByteBuffer.wrap(heapMemory, offset, length);
            }
            else {
                try {
                    ByteBuffer wrapper = offHeapBuffer.duplicate();
                    wrapper.limit(offset + length);
                    wrapper.position(offset);
                    return wrapper;
                }
                catch (IllegalArgumentException e) {
                    throw new IndexOutOfBoundsException();
                }
            }
        }
        else {
            throw new IllegalStateException("segment has been freed");
        }
    }

    // ------------------------------------------------------------------------
    //  Random Access get() and put() methods
    // ------------------------------------------------------------------------

    @Override
    public final byte get(int index) {
        final long pos = address + index;
        if (index >= 0 && pos < addressLimit) {
            return UNSAFE.getByte(heapMemory, pos);
        }
        else if (address > addressLimit) {
            throw new IllegalStateException("segment has been freed");
        }
        else {
            // index is in fact invalid
            throw new IndexOutOfBoundsException();
        }
    }

    @Override
    public final void put(int index, byte b) {
        final long pos = address + index;
        if (index >= 0 && pos < addressLimit) {
            UNSAFE.putByte(heapMemory, pos, b);
        }
        else if (address > addressLimit) {
            throw new IllegalStateException("segment has been freed");
        }
        else {
            // index is in fact invalid
            throw new IndexOutOfBoundsException();
        }
    }

    @Override
    public final void get(int index, byte[] dst) {
        get(index, dst, 0, dst.length);
    }

    @Override
    public final void put(int index, byte[] src) {
        put(index, src, 0, src.length);
    }

    @Override
    public final void get(int index, byte[] dst, int offset, int length) {
        // check the byte array offset and length and the status
        if ((offset | length | (offset + length) | (dst.length - (offset + length))) < 0) {
            throw new IndexOutOfBoundsException();
        }

        final long pos = address + index;
        if (index >= 0 && pos <= addressLimit - length) {
            final long arrayAddress = BYTE_ARRAY_BASE_OFFSET + offset;
            UNSAFE.copyMemory(heapMemory, pos, dst, arrayAddress, length);
        }
        else if (address > addressLimit) {
            throw new IllegalStateException("segment has been freed");
        }
        else {
            // index is in fact invalid
            throw new IndexOutOfBoundsException();
        }
    }

    @Override
    public final void put(int index, byte[] src, int offset, int length) {
        // check the byte array offset and length
        if ((offset | length | (offset + length) | (src.length - (offset + length))) < 0) {
            throw new IndexOutOfBoundsException();
        }

        final long pos = address + index;

        if (index >= 0 && pos <= addressLimit - length) {
            final long arrayAddress = BYTE_ARRAY_BASE_OFFSET + offset;
            UNSAFE.copyMemory(src, arrayAddress, heapMemory, pos, length);
        }
        else if (address > addressLimit) {
            throw new IllegalStateException("segment has been freed");
        }
        else {
            // index is in fact invalid
            throw new IndexOutOfBoundsException();
        }
    }

    @Override
    public final boolean getBoolean(int index) {
        return get(index) != 0;
    }

    @Override
    public final void putBoolean(int index, boolean value) {
        put(index, (byte) (value ? 1 : 0));
    }

    // -------------------------------------------------------------------------
    //  Bulk Read and Write Methods
    // -------------------------------------------------------------------------

    @Override
    public final void get(DataOutput out, int offset, int length) throws IOException {
        if (address <= addressLimit) {
            if (heapMemory != null) {
                out.write(heapMemory, offset, length);
            }
            else {
                while (length >= 8) {
                    out.writeLong(getLongBigEndian(offset));
                    offset += 8;
                    length -= 8;
                }

                while (length > 0) {
                    out.writeByte(get(offset));
                    offset++;
                    length--;
                }
            }
        }
        else {
            throw new IllegalStateException("segment has been freed");
        }
    }

    @Override
    public final void put(DataInput in, int offset, int length) throws IOException {
        if (address <= addressLimit) {
            if (heapMemory != null) {
                in.readFully(heapMemory, offset, length);
            }
            else {
                while (length >= 8) {
                    putLongBigEndian(offset, in.readLong());
                    offset += 8;
                    length -= 8;
                }
                while (length > 0) {
                    put(offset, in.readByte());
                    offset++;
                    length--;
                }
            }
        }
        else {
            throw new IllegalStateException("segment has been freed");
        }
    }

    @Override
    public final void get(int offset, ByteBuffer target, int numBytes) {
        // check the byte array offset and length
        if ((offset | numBytes | (offset + numBytes)) < 0) {
            throw new IndexOutOfBoundsException();
        }

        final int targetOffset = target.position();
        final int remaining = target.remaining();

        if (remaining < numBytes) {
            throw new BufferOverflowException();
        }

        if (target.isDirect()) {
            if (target.isReadOnly()) {
                throw new ReadOnlyBufferException();
            }

            // copy to the target memory directly
            final long targetPointer = getAddress(target) + targetOffset;
            final long sourcePointer = address + offset;

            if (sourcePointer <= addressLimit - numBytes) {
                UNSAFE.copyMemory(heapMemory, sourcePointer, null, targetPointer, numBytes);
                target.position(targetOffset + numBytes);
            }
            else if (address > addressLimit) {
                throw new IllegalStateException("segment has been freed");
            }
            else {
                throw new IndexOutOfBoundsException();
            }
        }
        else if (target.hasArray()) {
            // move directly into the byte array
            get(offset, target.array(), targetOffset + target.arrayOffset(), numBytes);

            // this must be after the get() call to ensue that the byte buffer is not
            // modified in case the call fails
            target.position(targetOffset + numBytes);
        }
        else {
            // neither heap buffer nor direct buffer
            while (target.hasRemaining()) {
                target.put(get(offset++));
            }
        }
    }

    @Override
    public final void put(int offset, ByteBuffer source, int numBytes) {
        // check the byte array offset and length
        if ((offset | numBytes | (offset + numBytes)) < 0) {
            throw new IndexOutOfBoundsException();
        }

        final int sourceOffset = source.position();
        final int remaining = source.remaining();

        if (remaining < numBytes) {
            throw new BufferUnderflowException();
        }

        if (source.isDirect()) {
            // copy to the target memory directly
            final long sourcePointer = getAddress(source) + sourceOffset;
            final long targetPointer = address + offset;

            if (targetPointer <= addressLimit - numBytes) {
                UNSAFE.copyMemory(null, sourcePointer, heapMemory, targetPointer, numBytes);
                source.position(sourceOffset + numBytes);
            }
            else if (address > addressLimit) {
                throw new IllegalStateException("segment has been freed");
            }
            else {
                throw new IndexOutOfBoundsException();
            }
        }
        else if (source.hasArray()) {
            // move directly into the byte array
            put(offset, source.array(), sourceOffset + source.arrayOffset(), numBytes);

            // this must be after the get() call to ensue that the byte buffer is not
            // modified in case the call fails
            source.position(sourceOffset + numBytes);
        }
        else {
            // neither heap buffer nor direct buffer
            while (source.hasRemaining()) {
                put(offset++, source.get());
            }
        }
    }

    //......
}
  • HYBRIDEMORY SEGMENT inherits the MemorySegment, which has an offHeapBuffer attribute of ByteBuffer type. Since the parent class itself already has a heapMemory attribute of byte[], the memory managed by HYBRIDEMORY SEGMENT can be on-heap (Using Constructors with byte[] Type Parameters) can also be off-heap (Use constructors with ByteBuffer type parameters); The wrap method determines that if headmemory is not null, headmemory is used; otherwise, offHeapBuffer is used

Summary

  • MemorySegment is somewhat similar to java.nio.ByteBuffer; ; It has a heapMemory property of type byte[]; It has two constructors. Constructors with byte[] type parameters will assign byte[] to Headmemory, while constructors without byte[] type parameters will have Headmemory null; . The isOffHeap method is used to determine whether the current memory segment is heap or off-heap, which is determined according to whether heapMemory is null, and OFF-HEAP if null; In addition, comparison, swapBytes and copyTo methods are provided. It also shows that BigEndian and LittleEndian get and put methods are provided. MemorySegment defines fre e, wrap, get, put, getBoolean, putBoolean abstract methods that require subclasses to implement; MemorySegment has two subclasses, namely HeapMemorySegment and HybridMemorySegment.
  • HeapMemorySegment inherits the MemorySegment, which has a memory attribute of byte[]. the free operation will set the memory to null, and the wrap method uses the memory attribute. Its constructor requires that the incoming memory cannot be null, and then assigns it to the parent class’s heapMemory attribute and its own defined memory attribute (Quote); It also defines the HeapMemorySegmentFactory and provides wrap, allocateUnpooledSegment, wrapPooledHeapMemory methods.
  • HYBRIDEMORY SEGMENT inherits the MemorySegment, which has an offHeapBuffer attribute of ByteBuffer type. Since the parent class itself already has a heapMemory attribute of byte[], the memory managed by HYBRIDEMORY SEGMENT can be on-heap (Using Constructors with byte[] Type Parameters) can also be off-heap (Use constructors with ByteBuffer type parameters); The wrap method determines that if headmemory is not null, headmemory is used; otherwise, offHeapBuffer is used

doc