Talk about lettuce’s Indicator Monitoring

  lettuce, redis

Order

This article mainly studies lettuce’s index monitoring

DefaultCommandLatencyEventPublisher

lettuce-core-5.0.4.RELEASE-sources.jar! /io/lettuce/core/event/metrics/DefaultCommandLatencyEventPublisher.java

public class DefaultCommandLatencyEventPublisher implements MetricEventPublisher {

    private final EventExecutorGroup eventExecutorGroup;
    private final EventPublisherOptions options;
    private final EventBus eventBus;
    private final CommandLatencyCollector commandLatencyCollector;

    private final Runnable EMITTER = this::emitMetricsEvent;

    private volatile ScheduledFuture<?> scheduledFuture;

    public DefaultCommandLatencyEventPublisher(EventExecutorGroup eventExecutorGroup, EventPublisherOptions options,
            EventBus eventBus, CommandLatencyCollector commandLatencyCollector) {

        this.eventExecutorGroup = eventExecutorGroup;
        this.options = options;
        this.eventBus = eventBus;
        this.commandLatencyCollector = commandLatencyCollector;

        if (!options.eventEmitInterval().isZero()) {
            scheduledFuture = this.eventExecutorGroup.scheduleAtFixedRate(EMITTER, options.eventEmitInterval().toMillis(),
                    options.eventEmitInterval().toMillis(), TimeUnit.MILLISECONDS);
        }
    }

    @Override
    public boolean isEnabled() {
        return !options.eventEmitInterval().isZero() && scheduledFuture != null;
    }

    @Override
    public void shutdown() {

        if (scheduledFuture != null) {
            scheduledFuture.cancel(true);
            scheduledFuture = null;
        }
    }

    @Override
    public void emitMetricsEvent() {

        if (!isEnabled() || !commandLatencyCollector.isEnabled()) {
            return;
        }

        eventBus.publish(new CommandLatencyEvent(commandLatencyCollector.retrieveMetrics()));
    }

}
  • It is judged here that if the eventEmitInterval of EventPublisherOptions is not 0, the scheduling timing is started to call emitMetricsEvent to issue a command delay event.

LettuceConnectionConfiguration

spring-boot-autoconfigure-2.0.4.RELEASE-sources.jar! /org/springframework/boot/autoconfigure/data/redis/LettuceConnectionConfiguration.java

@Configuration
@ConditionalOnClass(RedisClient.class)
class LettuceConnectionConfiguration extends RedisConnectionConfiguration {

    private final RedisProperties properties;

    private final List<LettuceClientConfigurationBuilderCustomizer> builderCustomizers;

    LettuceConnectionConfiguration(RedisProperties properties,
            ObjectProvider<RedisSentinelConfiguration> sentinelConfigurationProvider,
            ObjectProvider<RedisClusterConfiguration> clusterConfigurationProvider,
            ObjectProvider<List<LettuceClientConfigurationBuilderCustomizer>> builderCustomizers) {
        super(properties, sentinelConfigurationProvider, clusterConfigurationProvider);
        this.properties = properties;
        this.builderCustomizers = builderCustomizers
                .getIfAvailable(Collections::emptyList);
    }

    @Bean(destroyMethod = "shutdown")
    @ConditionalOnMissingBean(ClientResources.class)
    public DefaultClientResources lettuceClientResources() {
        return DefaultClientResources.create();
    }

    //......
}
  • LettuceConnectionConfiguration created DefaultClientResources using the default configuration

DefaultEventPublisherOptions

lettuce-core-5.0.4.RELEASE-sources.jar! /io/lettuce/core/event/DefaultEventPublisherOptions.java

public class DefaultEventPublisherOptions implements EventPublisherOptions {

    public static final long DEFAULT_EMIT_INTERVAL = 10;
    public static final TimeUnit DEFAULT_EMIT_INTERVAL_UNIT = TimeUnit.MINUTES;
    public static final Duration DEFAULT_EMIT_INTERVAL_DURATION = Duration.ofMinutes(DEFAULT_EMIT_INTERVAL);

    //......
}
  • The DEFAULT_EMIT_INTERVAL_DURATION here defaults to 10 minutes.

CommandLatencyCollector

lettuce-core-5.0.4.RELEASE-sources.jar! /io/lettuce/core/metrics/CommandLatencyCollector.java

public interface CommandLatencyCollector extends MetricCollector<Map<CommandLatencyId, CommandMetrics>> {

    /**
     * Record the command latency per {@code connectionPoint} and {@code commandType}.
     *
     * @param local the local address
     * @param remote the remote address
     * @param commandType the command type
     * @param firstResponseLatency latency value in {@link TimeUnit#NANOSECONDS} from send to the first response
     * @param completionLatency latency value in {@link TimeUnit#NANOSECONDS} from send to the command completion
     */
    void recordCommandLatency(SocketAddress local, SocketAddress remote, ProtocolKeyword commandType,
            long firstResponseLatency, long completionLatency);

}
  • DefaultComMandatenCentPublisher mainly relies on CommandLatencyCollector to obtain indexes, while its index collection relies on recordCommandLatency method to record

CommandHandler

lettuce-core-5.0.4.RELEASE-sources.jar! /io/lettuce/core/protocol/CommandHandler.java

/**
 * A netty {@link ChannelHandler} responsible for writing redis commands and reading responses from the server.
 *
 * @author Will Glozer
 * @author Mark Paluch
 * @author Jongyeol Choi
 * @author Grzegorz Szpak
 */
public class CommandHandler extends ChannelDuplexHandler implements HasQueuedCommands {
    //......
   /**
     * @see io.netty.channel.ChannelInboundHandlerAdapter#channelRead(io.netty.channel.ChannelHandlerContext, java.lang.Object)
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        ByteBuf input = (ByteBuf) msg;

        if (!input.isReadable() || input.refCnt() == 0) {
            logger.warn("{} Input not readable {}, {}", logPrefix(), input.isReadable(), input.refCnt());
            return;
        }

        if (debugEnabled) {
            logger.debug("{} Received: {} bytes, {} commands in the stack", logPrefix(), input.readableBytes(), stack.size());
        }

        try {
            if (buffer.refCnt() < 1) {
                logger.warn("{} Ignoring received data for closed or abandoned connection", logPrefix());
                return;
            }

            if (debugEnabled && ctx.channel() != channel) {
                logger.debug("{} Ignoring data for a non-registered channel {}", logPrefix(), ctx.channel());
                return;
            }

            if (traceEnabled) {
                logger.trace("{} Buffer: {}", logPrefix(), input.toString(Charset.defaultCharset()).trim());
            }

            buffer.writeBytes(input);

            decode(ctx, buffer);
        } finally {
            input.release();
        }
    }

    private boolean decode(ChannelHandlerContext ctx, ByteBuf buffer, RedisCommand<?, ?, ?> command) {

        if (latencyMetricsEnabled && command instanceof WithLatency) {

            WithLatency withLatency = (WithLatency) command;
            if (withLatency.getFirstResponse() == -1) {
                withLatency.firstResponse(nanoTime());
            }

            if (!decode0(ctx, buffer, command)) {
                return false;
            }

            recordLatency(withLatency, command.getType());

            return true;
        }

        return decode0(ctx, buffer, command);
    }

    private void recordLatency(WithLatency withLatency, ProtocolKeyword commandType) {

        if (withLatency != null && clientResources.commandLatencyCollector().isEnabled() && channel != null && remote() != null) {

            long firstResponseLatency = withLatency.getFirstResponse() - withLatency.getSent();
            long completionLatency = nanoTime() - withLatency.getSent();

            clientResources.commandLatencyCollector().recordCommandLatency(local(), remote(), commandType,
                    firstResponseLatency, completionLatency);
        }
    }
}
  • ChannelRead calls the decode method here, and the decode method finally calls clientresources.commandlatencycollector (). recordcommandlatency to record the delay of the command

Consumer event instance

public class LettuceEventConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(LettuceEventConsumer.class);

    EventBus eventBus;

    public LettuceEventConsumer(EventBus eventBus) {
        this.eventBus = eventBus;
    }

    @PostConstruct
    public void init(){
        eventBus.get().subscribe(e -> {
            LOGGER.info("event:{}",e);
        });
    }
}
  • Here we get Flux<Event > through eventBus and then consume it. the output example is as follows:
2018-09-11 16:32:57.361  INFO 6656 --- [xecutorLoop-1-3] com.example.config.LettuceEventConsumer  : event:{[local:any -> /192.168.99.100:6379, commandType=GET]=[count=1, timeUnit=MICROSECONDS, firstResponse=[min=884, max=888, percentiles={50.0=888, 90.0=888, 95.0=888, 99.0=888, 99.9=888}], completion=[min=950, max=954, percentiles={50.0=954, 90.0=954, 95.0=954, 99.0=954, 99.9=954}]], [local:any -> /192.168.99.100:6379, commandType=INFO]=[count=1, timeUnit=MICROSECONDS, firstResponse=[min=1449, max=1458, percentiles={50.0=1458, 90.0=1458, 95.0=1458, 99.0=1458, 99.9=1458}], completion=[min=2457, max=2473, percentiles={50.0=2473, 90.0=2473, 95.0=2473, 99.0=2473, 99.9=2473}]], [local:any -> /192.168.99.100:6379, commandType=PUBLISH]=[count=40, timeUnit=MICROSECONDS, firstResponse=[min=708, max=17956, percentiles={50.0=1343, 90.0=2719, 95.0=3948, 99.0=17956, 99.9=17956}], completion=[min=733, max=17956, percentiles={50.0=1376, 90.0=2752, 95.0=3981, 99.0=17956, 99.9=17956}]], [local:any -> /192.168.99.100:6379, commandType=SET]=[count=1, timeUnit=MICROSECONDS, firstResponse=[min=909, max=913, percentiles={50.0=913, 90.0=913, 95.0=913, 99.0=913, 99.9=913}], completion=[min=995, max=999, percentiles={50.0=999, 90.0=999, 95.0=999, 99.0=999, 99.9=999}]], [local:any -> /192.168.99.100:6379, commandType=SUBSCRIBE]=[count=1, timeUnit=MICROSECONDS, firstResponse=[min=19267, max=19398, percentiles={50.0=19398, 90.0=19398, 95.0=19398, 99.0=19398, 99.9=19398}], completion=[min=41418, max=41680, percentiles={50.0=41680, 90.0=41680, 95.0=41680, 99.0=41680, 99.9=41680}]]}

Summary

Lettuce uses built-in eventBus, and then issues corresponding delay events for the execution of its commands. the client side can consume eventBus data according to the demand to obtain relevant indicators of lettuce. It can be said that in the indicator monitoring scene, the event-driven method is adopted for implementation, which is more flexible and gives full play to the idea of Event-Driven Architecture.

doc