On the Netty buffer watermarks of the Storm client

Preface

This article looks at the Netty write buffer watermark settings of the Storm client.

Config

storm-2.0.0/storm-client/src/jvm/org/apache/storm/Config.java

    /**
     * Netty based messaging: The netty write buffer high watermark in bytes.
     * <p>
     * If the number of bytes queued in the netty's write buffer exceeds this value, the netty {@code Channel.isWritable()} will start to
     * return {@code false}. The client will wait until the value falls below the {@linkplain #STORM_MESSAGING_NETTY_BUFFER_LOW_WATERMARK
     * low water mark}.
     * </p>
     */
    @isInteger
    @isPositiveNumber
    public static final String STORM_MESSAGING_NETTY_BUFFER_HIGH_WATERMARK = "storm.messaging.netty.buffer.high.watermark";
    /**
     * Netty based messaging: The netty write buffer low watermark in bytes.
     * <p>
     * Once the number of bytes queued in the write buffer exceeded the {@linkplain #STORM_MESSAGING_NETTY_BUFFER_HIGH_WATERMARK high water
     * mark} and then dropped down below this value, the netty {@code Channel.isWritable()} will start to return true.
     * </p>
     */
    @isInteger
    @isPositiveNumber
    public static final String STORM_MESSAGING_NETTY_BUFFER_LOW_WATERMARK = "storm.messaging.netty.buffer.low.watermark";
  • Two parameters are involved: storm.messaging.netty.buffer.high.watermark and storm.messaging.netty.buffer.low.watermark.
  • Their defaults are set in defaults.yaml:
# The netty write buffer high watermark in bytes.
# If the number of bytes queued in the netty's write buffer exceeds this value, the netty client will block
# until the value falls below the low water mark.
storm.messaging.netty.buffer.high.watermark: 16777216 # 16 MB
# The netty write buffer low watermark in bytes.
# Once the number of bytes queued in the write buffer exceeded the high water mark and then
# dropped down below this value, any blocked clients will unblock and start processing further messages.
storm.messaging.netty.buffer.low.watermark: 8388608 # 8 MB

In defaults.yaml, low.watermark defaults to 8388608 bytes (8 MB) and high.watermark defaults to 16777216 bytes (16 MB).
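
As a minimal sketch of how these two keys could be overridden in a configuration map, the snippet below builds a plain Map with the key strings copied from Config.java. The class name WatermarkConfSketch and the helpers mib and withWatermarks are hypothetical and not part of Storm; the sketch only illustrates the byte arithmetic behind the defaults.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Storm: builds a conf map with custom watermarks.
final class WatermarkConfSketch {
    // Key strings copied from Config.java.
    static final String HIGH = "storm.messaging.netty.buffer.high.watermark";
    static final String LOW = "storm.messaging.netty.buffer.low.watermark";

    /** Converts mebibytes to bytes, e.g. 16 becomes 16777216. */
    static int mib(int n) {
        return n * 1024 * 1024;
    }

    /** Returns a new map holding both watermark settings in bytes. */
    static Map<String, Object> withWatermarks(int lowBytes, int highBytes) {
        Map<String, Object> conf = new HashMap<>();
        conf.put(LOW, lowBytes);
        conf.put(HIGH, highBytes);
        return conf;
    }

    public static void main(String[] args) {
        // Halve both defaults: 4 MB low, 8 MB high.
        Map<String, Object> conf = withWatermarks(mib(4), mib(8));
        System.out.println(conf.get(LOW) + " / " + conf.get(HIGH));
    }
}
```

The values are stored as integers because both keys are annotated with @isInteger in Config.java.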

Client

storm-2.0.0/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java

    Client(Map<String, Object> topoConf, AtomicBoolean[] remoteBpStatus,
        EventLoopGroup eventLoopGroup, HashedWheelTimer scheduler, String host,
           int port) {
        this.topoConf = topoConf;
        closing = false;
        this.scheduler = scheduler;
        int bufferSize = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
        int lowWatermark = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_LOW_WATERMARK));
        int highWatermark = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_HIGH_WATERMARK));
        // if SASL authentication is disabled, saslChannelReady is initialized as true; otherwise false
        saslChannelReady.set(!ObjectReader.getBoolean(topoConf.get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION), false));
        LOG.info("Creating Netty Client, connecting to {}:{}, bufferSize: {}, lowWatermark: {}, highWatermark: {}",
                 host, port, bufferSize, lowWatermark, highWatermark);

        int minWaitMs = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS));
        int maxWaitMs = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS));
        retryPolicy = new StormBoundedExponentialBackoffRetry(minWaitMs, maxWaitMs, -1);

        // Initiate connection to remote destination
        this.eventLoopGroup = eventLoopGroup;
        // Initiate connection to remote destination
        bootstrap = new Bootstrap()
            .group(this.eventLoopGroup)
            .channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.SO_SNDBUF, bufferSize)
            .option(ChannelOption.SO_KEEPALIVE, true)
            .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(lowWatermark, highWatermark))
            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .handler(new StormClientPipelineFactory(this, remoteBpStatus, topoConf));
        dstAddress = new InetSocketAddress(host, port);
        dstAddressPrefixedName = prefixedName(dstAddress);
        launchChannelAliveThread();
        scheduleConnect(NO_DELAY_MS);
        int messageBatchSize = ObjectReader.getInt(topoConf.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE), 262144);
        batcher = new MessageBuffer(messageBatchSize);
        String clazz = (String) topoConf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_STRATEGY);
        if (clazz == null) {
            waitStrategy = new WaitStrategyProgressive();
        } else {
            waitStrategy = ReflectionUtils.newInstance(clazz);
        }
        waitStrategy.prepare(topoConf, WAIT_SITUATION.BACK_PRESSURE_WAIT);
    }
  • The constructor builds a WriteBufferWaterMark from lowWatermark and highWatermark and sets it as ChannelOption.WRITE_BUFFER_WATER_MARK on the Bootstrap.

WriteBufferWaterMark

netty-all-4.1.25.Final-sources.jar!/io/netty/channel/WriteBufferWaterMark.java

/**
 * WriteBufferWaterMark is used to set low water mark and high water mark for the write buffer.
 * <p>
 * If the number of bytes queued in the write buffer exceeds the
 * {@linkplain #high high water mark}, {@link Channel#isWritable()}
 * will start to return {@code false}.
 * <p>
 * If the number of bytes queued in the write buffer exceeds the
 * {@linkplain #high high water mark} and then
 * dropped down below the {@linkplain #low low water mark},
 * {@link Channel#isWritable()} will start to return
 * {@code true} again.
 */
public final class WriteBufferWaterMark {

    private static final int DEFAULT_LOW_WATER_MARK = 32 * 1024;
    private static final int DEFAULT_HIGH_WATER_MARK = 64 * 1024;

    public static final WriteBufferWaterMark DEFAULT =
            new WriteBufferWaterMark(DEFAULT_LOW_WATER_MARK, DEFAULT_HIGH_WATER_MARK, false);

    private final int low;
    private final int high;

    /**
     * Create a new instance.
     *
     * @param low low water mark for write buffer.
     * @param high high water mark for write buffer
     */
    public WriteBufferWaterMark(int low, int high) {
        this(low, high, true);
    }

    /**
     * This constructor is needed to keep backward-compatibility.
     */
    WriteBufferWaterMark(int low, int high, boolean validate) {
        if (validate) {
            if (low < 0) {
                throw new IllegalArgumentException("write buffer's low water mark must be >= 0");
            }
            if (high < low) {
                throw new IllegalArgumentException(
                        "write buffer's high water mark cannot be less than " +
                                " low water mark (" + low + "): " +
                                high);
            }
        }
        this.low = low;
        this.high = high;
    }

    /**
     * Returns the low water mark for the write buffer.
     */
    public int low() {
        return low;
    }

    /**
     * Returns the high water mark for the write buffer.
     */
    public int high() {
        return high;
    }

    @Override
    public String toString() {
        StringBuilder builder = new StringBuilder(55)
            .append("WriteBufferWaterMark(low: ")
            .append(low)
            .append(", high: ")
            .append(high)
            .append(")");
        return builder.toString();
    }

}
  • As the Javadoc states, these two values control what Channel.isWritable() returns.
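
The validation in the constructor above can be restated as a small stand-alone check. This is only a sketch that mirrors the two conditions from the listing; WaterMarkRulesSketch and check are hypothetical names, and the messages are shortened.

```java
// Hypothetical stand-in that mirrors the validation rules of WriteBufferWaterMark.
final class WaterMarkRulesSketch {
    /** Returns null when (low, high) passes both checks, else the reason it fails. */
    static String check(int low, int high) {
        if (low < 0) {
            return "low water mark must be >= 0";
        }
        if (high < low) {
            return "high water mark cannot be less than low water mark";
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(check(8388608, 16777216)); // Storm defaults pass
        System.out.println(check(16777216, 8388608)); // swapped by mistake: rejected
        System.out.println(check(1024, 1024));        // equal values pass
    }
}
```

Note that equal values pass the check, in which case there is no gap between the point where the channel becomes unwritable and the point where it recovers.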

ChannelOutboundBuffer.setWritable

netty-all-4.1.25.Final-sources.jar!/io/netty/channel/ChannelOutboundBuffer.java

    private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> UNWRITABLE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "unwritable");

    private volatile int unwritable;

    /**
     * Returns {@code true} if and only if {@linkplain #totalPendingWriteBytes() the total number of pending bytes} did
     * not exceed the write watermark of the {@link Channel} and
     * no {@linkplain #setUserDefinedWritability(int, boolean) user-defined writability flag} has been set to
     * {@code false}.
     */
    public boolean isWritable() {
        return unwritable == 0;
    }

    /**
     * Get how many bytes must be drained from the underlying buffer until {@link #isWritable()} returns {@code true}.
     * This quantity will always be non-negative. If {@link #isWritable()} is {@code true} then 0.
     */
    public long bytesBeforeWritable() {
        long bytes = totalPendingSize - channel.config().getWriteBufferLowWaterMark();
        // If bytes is negative we know we are writable, but if bytes is non-negative we have to check writability.
        // Note that totalPendingSize and isWritable() use different volatile variables that are not synchronized
        // together. totalPendingSize will be updated before isWritable().
        if (bytes > 0) {
            return isWritable() ? 0 : bytes;
        }
        return 0;
    }

    /**
     * Decrement the pending bytes which will be written at some point.
     * This method is thread-safe!
     */
    void decrementPendingOutboundBytes(long size) {
        decrementPendingOutboundBytes(size, true, true);
    }

    private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
        if (size == 0) {
            return;
        }

        long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
        if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
            setWritable(invokeLater);
        }
    }

    private void setWritable(boolean invokeLater) {
        for (;;) {
            final int oldValue = unwritable;
            final int newValue = oldValue & ~1;
            if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
                if (oldValue != 0 && newValue == 0) {
                    fireChannelWritabilityChanged(invokeLater);
                }
                break;
            }
        }
    }

    private void fireChannelWritabilityChanged(boolean invokeLater) {
        final ChannelPipeline pipeline = channel.pipeline();
        if (invokeLater) {
            Runnable task = fireChannelWritabilityChangedTask;
            if (task == null) {
                fireChannelWritabilityChangedTask = task = new Runnable() {
                    @Override
                    public void run() {
                        pipeline.fireChannelWritabilityChanged();
                    }
                };
            }
            channel.eventLoop().execute(task);
        } else {
            pipeline.fireChannelWritabilityChanged();
        }
    }
  • bytesBeforeWritable first checks whether totalPendingSize is greater than lowWatermark. If it is not, the method returns 0. If it is, the method returns 0 when isWritable is already true, and the difference (totalPendingSize minus lowWatermark) otherwise.
  • decrementPendingOutboundBytes calls setWritable(invokeLater) when notifyWritability is true and newWriteBufferSize < channel.config().getWriteBufferLowWaterMark().
  • setWritable clears the lowest bit of unwritable; if the field changes from non-zero to zero, it also calls fireChannelWritabilityChanged to notify the pipeline.
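
The reason setWritable tests "oldValue != 0 && newValue == 0" rather than simply firing an event is that unwritable is a bit field: bit 0 belongs to the watermark logic, while the remaining bits are used by the user-defined writability flags mentioned in the isWritable Javadoc. The following sketch replays the bit arithmetic on plain ints; UnwritableBitsSketch and its method names are hypothetical.

```java
// Hypothetical sketch of the bit arithmetic used on the unwritable field.
final class UnwritableBitsSketch {
    /** Clears bit 0, as setWritable does. */
    static int clearWatermarkBit(int unwritable) {
        return unwritable & ~1;
    }

    /** Sets bit 0, as setUnwritable does. */
    static int setWatermarkBit(int unwritable) {
        return unwritable | 1;
    }

    /** True when clearing bit 0 takes the field from non-zero to zero. */
    static boolean becomesWritable(int oldValue) {
        int newValue = clearWatermarkBit(oldValue);
        return oldValue != 0 && newValue == 0;
    }

    public static void main(String[] args) {
        System.out.println(becomesWritable(0b01)); // only the watermark bit was set
        System.out.println(becomesWritable(0b11)); // another flag is still set
        System.out.println(becomesWritable(0b00)); // already writable, nothing changes
    }
}
```

Only the first case would trigger fireChannelWritabilityChanged: in the second the channel stays unwritable because another bit is still set, and in the third nothing changed.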

ChannelOutboundBuffer.setUnwritable

netty-all-4.1.25.Final-sources.jar!/io/netty/channel/ChannelOutboundBuffer.java

    private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> UNWRITABLE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "unwritable");

    private volatile int unwritable;
    
    /**
     * Returns {@code true} if and only if {@linkplain #totalPendingWriteBytes() the total number of pending bytes} did
     * not exceed the write watermark of the {@link Channel} and
     * no {@linkplain #setUserDefinedWritability(int, boolean) user-defined writability flag} has been set to
     * {@code false}.
     */
    public boolean isWritable() {
        return unwritable == 0;
    }

    /**
     * Get how many bytes can be written until {@link #isWritable()} returns {@code false}.
     * This quantity will always be non-negative. If {@link #isWritable()} is {@code false} then 0.
     */
    public long bytesBeforeUnwritable() {
        long bytes = channel.config().getWriteBufferHighWaterMark() - totalPendingSize;
        // If bytes is negative we know we are not writable, but if bytes is non-negative we have to check writability.
        // Note that totalPendingSize and isWritable() use different volatile variables that are not synchronized
        // together. totalPendingSize will be updated before isWritable().
        if (bytes > 0) {
            return isWritable() ? bytes : 0;
        }
        return 0;
    }

    /**
     * Increment the pending bytes which will be written at some point.
     * This method is thread-safe!
     */
    void incrementPendingOutboundBytes(long size) {
        incrementPendingOutboundBytes(size, true);
    }

    private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
        if (size == 0) {
            return;
        }

        long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
        if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
            setUnwritable(invokeLater);
        }
    }

    private void setUnwritable(boolean invokeLater) {
        for (;;) {
            final int oldValue = unwritable;
            final int newValue = oldValue | 1;
            if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
                if (oldValue == 0 && newValue != 0) {
                    fireChannelWritabilityChanged(invokeLater);
                }
                break;
            }
        }
    }

    private void fireChannelWritabilityChanged(boolean invokeLater) {
        final ChannelPipeline pipeline = channel.pipeline();
        if (invokeLater) {
            Runnable task = fireChannelWritabilityChangedTask;
            if (task == null) {
                fireChannelWritabilityChangedTask = task = new Runnable() {
                    @Override
                    public void run() {
                        pipeline.fireChannelWritabilityChanged();
                    }
                };
            }
            channel.eventLoop().execute(task);
        } else {
            pipeline.fireChannelWritabilityChanged();
        }
    }
  • bytesBeforeUnwritable first computes highWatermark minus totalPendingSize. If totalPendingSize is greater than or equal to highWatermark, it returns 0. If it is smaller and isWritable is true, it returns the difference; otherwise it returns 0.
  • incrementPendingOutboundBytes calls setUnwritable(invokeLater) when newWriteBufferSize > channel.config().getWriteBufferHighWaterMark().
  • setUnwritable sets the lowest bit of unwritable; if the field changes from zero to non-zero, it also calls fireChannelWritabilityChanged to notify the pipeline.
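
Taken together, the increment and decrement paths form a hysteresis loop: writability is lost only above the high watermark and regained only below the low one. The single-threaded sketch below imitates that behaviour with plain fields. It is a simplification under stated assumptions: WatermarkHysteresisSketch is a hypothetical class, it uses a boolean instead of the atomic bit field, and it records events in a list instead of firing pipeline callbacks.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-threaded model of the watermark hysteresis.
final class WatermarkHysteresisSketch {
    private final long low;
    private final long high;
    private long pending;            // stands in for totalPendingSize
    private boolean writable = true; // stands in for unwritable == 0
    final List<String> events = new ArrayList<>();

    WatermarkHysteresisSketch(long low, long high) {
        this.low = low;
        this.high = high;
    }

    /** Mirrors incrementPendingOutboundBytes: flips to unwritable above high. */
    void enqueue(long size) {
        pending += size;
        if (pending > high && writable) {
            writable = false;
            events.add("unwritable@" + pending);
        }
    }

    /** Mirrors decrementPendingOutboundBytes: flips back to writable below low. */
    void flushed(long size) {
        pending -= size;
        if (pending < low && !writable) {
            writable = true;
            events.add("writable@" + pending);
        }
    }

    boolean isWritable() {
        return writable;
    }

    long bytesBeforeUnwritable() {
        long bytes = high - pending;
        return (bytes > 0 && writable) ? bytes : 0;
    }

    long bytesBeforeWritable() {
        long bytes = pending - low;
        return (bytes > 0 && !writable) ? bytes : 0;
    }

    public static void main(String[] args) {
        WatermarkHysteresisSketch buf = new WatermarkHysteresisSketch(8, 16);
        buf.enqueue(10); // 10 pending, still writable
        buf.enqueue(10); // 20 pending, above high
        buf.flushed(8);  // 12 pending, between the marks: still unwritable
        buf.flushed(5);  // 7 pending, below low
        System.out.println(buf.events); // [unwritable@20, writable@7]
    }
}
```

The step from 20 to 12 pending bytes is the interesting one: the buffer is back under the high watermark, yet the channel stays unwritable until the low watermark is crossed, which avoids rapid flapping around a single threshold.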

Summary

  • storm.messaging.netty.buffer.high.watermark (default 16 MB) and storm.messaging.netty.buffer.low.watermark (default 8 MB) in fact configure Netty's ChannelOption.WRITE_BUFFER_WATER_MARK.
  • Netty's WriteBufferWaterMark affects the return values of the bytesBeforeWritable and bytesBeforeUnwritable methods (these two methods appear to be called rarely).
  • lowWatermark and highWatermark take effect in decrementPendingOutboundBytes and incrementPendingOutboundBytes respectively. When the pending size drops below lowWatermark, setWritable is called; when it rises above highWatermark, setUnwritable is called. Both update the unwritable field of ChannelOutboundBuffer, which is what isWritable reads. Note that isWritable is only a signal: Netty keeps accepting and queuing writes while it returns false, so it is up to the caller to check it and hold back until it returns true again.
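
Because the watermark only changes what isWritable reports, a sender that wants back pressure has to poll or react to that signal itself. The sketch below shows one generic way to do so: idle until the channel reports writable, with a bounded number of rounds. It is not Storm's implementation (the Client constructor above prepares a waitStrategy for BACK_PRESSURE_WAIT, whose use is not shown in this article); BackPressureWaitSketch, awaitWritable, and the maxRounds parameter are hypothetical.

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch of a sender-side wait loop driven by a writability signal.
final class BackPressureWaitSketch {
    /**
     * Runs idle until writable reports true.
     * Returns the number of idle rounds, or -1 if maxRounds was reached first.
     */
    static int awaitWritable(BooleanSupplier writable, Runnable idle, int maxRounds) {
        int rounds = 0;
        while (!writable.getAsBoolean()) {
            if (rounds >= maxRounds) {
                return -1; // give up instead of spinning forever
            }
            idle.run();
            rounds++;
        }
        return rounds;
    }

    public static void main(String[] args) {
        // Simulated buffer that drains one unit per idle round.
        int[] backlog = {3};
        int rounds = awaitWritable(() -> backlog[0] == 0, () -> backlog[0]--, 10);
        System.out.println("writable after " + rounds + " idle rounds");
    }
}
```

In a real client the writable supplier would wrap Channel.isWritable() and the idle action would sleep, park, or yield rather than decrement a counter.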