Talk about HystrixEventStream

  hystrix

Order

This article mainly studies the HystrixEventStream

HystrixEventStream

hystrix-core-1.5.12-sources.jar! /com/netflix/hystrix/metric/HystrixEventStream.java

/**
 * Base interface for a stream of {@link com.netflix.hystrix.HystrixEventType}s.  Allows consumption by individual
 * {@link com.netflix.hystrix.HystrixEventType} or by time-based bucketing of events
 */
public interface HystrixEventStream<E extends HystrixEvent> {

    Observable<E> observe();
}

This interface defines an observe method and returns rxjava’s Observable. It has the following implementation classes

  • HystrixCommandStartStream
  • HystrixCommandCompletionStream
  • HystrixThreadPoolStartStream
  • HystrixThreadPoolCompletionStream
  • HystrixCollapserEventStream

HystrixCommandStartStream

hystrix-core-1.5.12-sources.jar! /com/netflix/hystrix/metric/HystrixCommandStartStream.java

/**
 * Per-Command stream of {@link HystrixCommandExecutionStarted}s.  This gets written to by {@link HystrixThreadEventStream}s.
 * Events are emitted synchronously in the same thread that performs the command execution.
 */
public class HystrixCommandStartStream implements HystrixEventStream<HystrixCommandExecutionStarted> {
    private final HystrixCommandKey commandKey;

    private final Subject<HystrixCommandExecutionStarted, HystrixCommandExecutionStarted> writeOnlySubject;
    private final Observable<HystrixCommandExecutionStarted> readOnlyStream;

    private static final ConcurrentMap<String, HystrixCommandStartStream> streams = new ConcurrentHashMap<String, HystrixCommandStartStream>();

    public static HystrixCommandStartStream getInstance(HystrixCommandKey commandKey) {
        HystrixCommandStartStream initialStream = streams.get(commandKey.name());
        if (initialStream != null) {
            return initialStream;
        } else {
            synchronized (HystrixCommandStartStream.class) {
                HystrixCommandStartStream existingStream = streams.get(commandKey.name());
                if (existingStream == null) {
                    HystrixCommandStartStream newStream = new HystrixCommandStartStream(commandKey);
                    streams.putIfAbsent(commandKey.name(), newStream);
                    return newStream;
                } else {
                    return existingStream;
                }
            }
        }
    }

    HystrixCommandStartStream(final HystrixCommandKey commandKey) {
        this.commandKey = commandKey;

        this.writeOnlySubject = new SerializedSubject<HystrixCommandExecutionStarted, HystrixCommandExecutionStarted>(PublishSubject.<HystrixCommandExecutionStarted>create());
        this.readOnlyStream = writeOnlySubject.share();
    }

    public static void reset() {
        streams.clear();
    }

    public void write(HystrixCommandExecutionStarted event) {
        writeOnlySubject.onNext(event);
    }

    @Override
    public Observable<HystrixCommandExecutionStarted> observe() {
        return readOnlyStream;
    }

    @Override
    public String toString() {
        return "HystrixCommandStartStream(" + commandKey.name() + ")";
    }
}

HystrixThreadEventStream is written synchronously to HystrixCommandExecutionStarted, where the write method calls the onNext method of writeOnlySubject.

HystrixThreadEventStream

hystrix-core-1.5.12-sources.jar! /com/netflix/hystrix/metric/HystrixThreadEventStream.java

/**
 * Per-thread event stream.  No synchronization required when writing to it since it's single-threaded.
 *
 * Some threads will be dedicated to a single HystrixCommandKey (a member of a thread-isolated {@link HystrixThreadPool}.
 * However, many situations arise where a single thread may serve many different commands.  Examples include:
 * * Application caller threads (semaphore-isolated commands, or thread-pool-rejections)
 * * Timer threads (timeouts or collapsers)
 * <p>
 * I don't think that a thread-level view is an interesting one to consume (I could be wrong), so at the moment there
 * is no public way to consume it.  I can always add it later, if desired.
 * <p>
 * Instead, this stream writes to the following streams, which have more meaning to metrics consumers:
 * <ul>
 *     <li>{@link HystrixCommandCompletionStream}</li>
 *     <li>{@link HystrixCommandStartStream}</li>
 *     <li>{@link HystrixThreadPoolCompletionStream}</li>
 *     <li>{@link HystrixThreadPoolStartStream}</li>
 *     <li>{@link HystrixCollapserEventStream}</li>
 * </ul>
 *
 * Also note that any observers of this stream do so on the thread that writes the metric.  This is the command caller
 * thread in the SEMAPHORE-isolated case, and the Hystrix thread in the THREAD-isolated case. I determined this to
 * be more efficient CPU-wise than immediately hopping off-thread and doing all the metric calculations in the
 * RxComputationThreadPool.
 */
public class HystrixThreadEventStream {
    private final long threadId;
    private final String threadName;

    private final Subject<HystrixCommandExecutionStarted, HystrixCommandExecutionStarted> writeOnlyCommandStartSubject;
    private final Subject<HystrixCommandCompletion, HystrixCommandCompletion> writeOnlyCommandCompletionSubject;
    private final Subject<HystrixCollapserEvent, HystrixCollapserEvent> writeOnlyCollapserSubject;

    private static final ThreadLocal<HystrixThreadEventStream> threadLocalStreams = new ThreadLocal<HystrixThreadEventStream>() {
        @Override
        protected HystrixThreadEventStream initialValue() {
            return new HystrixThreadEventStream(Thread.currentThread());
        }
    };

    private static final Action1<HystrixCommandExecutionStarted> writeCommandStartsToShardedStreams = new Action1<HystrixCommandExecutionStarted>() {
        @Override
        public void call(HystrixCommandExecutionStarted event) {
            HystrixCommandStartStream commandStartStream = HystrixCommandStartStream.getInstance(event.getCommandKey());
            commandStartStream.write(event);

            if (event.isExecutedInThread()) {
                HystrixThreadPoolStartStream threadPoolStartStream = HystrixThreadPoolStartStream.getInstance(event.getThreadPoolKey());
                threadPoolStartStream.write(event);
            }
        }
    };

    private static final Action1<HystrixCommandCompletion> writeCommandCompletionsToShardedStreams = new Action1<HystrixCommandCompletion>() {
        @Override
        public void call(HystrixCommandCompletion commandCompletion) {
            HystrixCommandCompletionStream commandStream = HystrixCommandCompletionStream.getInstance(commandCompletion.getCommandKey());
            commandStream.write(commandCompletion);

            if (commandCompletion.isExecutedInThread() || commandCompletion.isResponseThreadPoolRejected()) {
                HystrixThreadPoolCompletionStream threadPoolStream = HystrixThreadPoolCompletionStream.getInstance(commandCompletion.getThreadPoolKey());
                threadPoolStream.write(commandCompletion);
            }
        }
    };

    private static final Action1<HystrixCollapserEvent> writeCollapserExecutionsToShardedStreams = new Action1<HystrixCollapserEvent>() {
        @Override
        public void call(HystrixCollapserEvent collapserEvent) {
            HystrixCollapserEventStream collapserStream = HystrixCollapserEventStream.getInstance(collapserEvent.getCollapserKey());
            collapserStream.write(collapserEvent);
        }
    };

    /* package */ HystrixThreadEventStream(Thread thread) {
        this.threadId = thread.getId();
        this.threadName = thread.getName();
        writeOnlyCommandStartSubject = PublishSubject.create();
        writeOnlyCommandCompletionSubject = PublishSubject.create();
        writeOnlyCollapserSubject = PublishSubject.create();

        writeOnlyCommandStartSubject
                .onBackpressureBuffer()
                .doOnNext(writeCommandStartsToShardedStreams)
                .unsafeSubscribe(Subscribers.empty());

        writeOnlyCommandCompletionSubject
                .onBackpressureBuffer()
                .doOnNext(writeCommandCompletionsToShardedStreams)
                .unsafeSubscribe(Subscribers.empty());

        writeOnlyCollapserSubject
                .onBackpressureBuffer()
                .doOnNext(writeCollapserExecutionsToShardedStreams)
                .unsafeSubscribe(Subscribers.empty());
    }

    public static HystrixThreadEventStream getInstance() {
        return threadLocalStreams.get();
    }

    public void shutdown() {
        writeOnlyCommandStartSubject.onCompleted();
        writeOnlyCommandCompletionSubject.onCompleted();
        writeOnlyCollapserSubject.onCompleted();
    }

    public void commandExecutionStarted(HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey,
                                        HystrixCommandProperties.ExecutionIsolationStrategy isolationStrategy, int currentConcurrency) {
        HystrixCommandExecutionStarted event = new HystrixCommandExecutionStarted(commandKey, threadPoolKey, isolationStrategy, currentConcurrency);
        writeOnlyCommandStartSubject.onNext(event);
    }

    public void executionDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey) {
        HystrixCommandCompletion event = HystrixCommandCompletion.from(executionResult, commandKey, threadPoolKey);
        writeOnlyCommandCompletionSubject.onNext(event);
    }

    public void collapserResponseFromCache(HystrixCollapserKey collapserKey) {
        HystrixCollapserEvent collapserEvent = HystrixCollapserEvent.from(collapserKey, HystrixEventType.Collapser.RESPONSE_FROM_CACHE, 1);
        writeOnlyCollapserSubject.onNext(collapserEvent);
    }

    public void collapserBatchExecuted(HystrixCollapserKey collapserKey, int batchSize) {
        HystrixCollapserEvent batchExecution = HystrixCollapserEvent.from(collapserKey, HystrixEventType.Collapser.BATCH_EXECUTED, 1);
        HystrixCollapserEvent batchAdditions = HystrixCollapserEvent.from(collapserKey, HystrixEventType.Collapser.ADDED_TO_BATCH, batchSize);
        writeOnlyCollapserSubject.onNext(batchExecution);
        writeOnlyCollapserSubject.onNext(batchAdditions);
    }

    @Override
    public String toString() {
        return "HystrixThreadEventStream (" + threadId + " - " + threadName + ")";
    }
}
  • WriteCommandStartsToHardedStreams will write HystrixCommmandStartedStream to HystrixCommmandStartedStream, and if it is an execution thread, it will call ThreadPoolStartStream. Write (Event);
  • WriteCommandCompletionStoShardedStreams will write HystrixCommmandcompletionStream to HystrixCommmandcompletionStream, and if it is an execution thread, it will call ThreedPoolStream. Write (CommandCompletion);
  • WriteCollapserexecutionsToShardedStreams writes HystrixCollapserEventStream HystrixcollapserEvent
  • The commandExecutionStarted method calls writeonlycommandstratsubject.onext (event)
  • The executionDone method calls writeonlycommmandcompletionsubject.onext (event)
  • The collapsed method calls writeonlycollapsesubject.onconnext (collapsed revent)

HystrixCommandMetrics

hystrix-core-1.5.12-sources.jar! /com/netflix/hystrix/HystrixCommandMetrics.java

/**
 * Used by {@link HystrixCommand} to record metrics.
 */
public class HystrixCommandMetrics extends HystrixMetrics {
    //......
    /* package-private */ void markCommandStart(HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey, HystrixCommandProperties.ExecutionIsolationStrategy isolationStrategy) {
        int currentCount = concurrentExecutionCount.incrementAndGet();
        HystrixThreadEventStream.getInstance().commandExecutionStarted(commandKey, threadPoolKey, isolationStrategy, currentCount);
    }

    /* package-private */ void markCommandDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey, boolean executionStarted) {
        HystrixThreadEventStream.getInstance().executionDone(executionResult, commandKey, threadPoolKey);
        if (executionStarted) {
            concurrentExecutionCount.decrementAndGet();
        }
    }
    //......
}

The markCommandStart and markCommandDone methods in HystrixCommandMetrics call HystrixThreadEventStream to get the instance and write the pointer.

HystrixCollapserMetrics

hystrix-core-1.5.12-sources.jar! /com/netflix/hystrix/HystrixCollapserMetrics.java

/**
 * Used by {@link HystrixCollapser} to record metrics.
 * {@link HystrixEventNotifier} not hooked up yet.  It may be in the future.
 */
public class HystrixCollapserMetrics extends HystrixMetrics {
    //......
    public void markRequestBatched() {
    }

    public void markResponseFromCache() {
        HystrixThreadEventStream.getInstance().collapserResponseFromCache(collapserKey);
    }

    public void markBatch(int batchSize) {
        HystrixThreadEventStream.getInstance().collapserBatchExecuted(collapserKey, batchSize);
    }

    public void markShards(int numShards) {
    }
    //......
}
  • The markResponseFromCache and markBatch methods in HystrixCollapserMetrics call HystrixThreadEventStream to get the instance and write the pointer.
  • The toObservable method in HystrixCollapser and HystrixObservableCollapser calls markResponseFromCache
  • The createObservableCommand method in HystrixCollapser and HystrixObservableCom calls the markBatch method

AbstractCommand

hystrix-core-1.5.12-sources.jar! /com/netflix/hystrix/AbstractCommand.java

markCommandStart

    private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
        if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
            // mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
            return Observable.defer(new Func0<Observable<R>>() {
                @Override
                public Observable<R> call() {
                    executionResult = executionResult.setExecutionOccurred();
                    if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                        return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                    }

                    metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);

                    if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
                        // the command timed out in the wrapping thread so we will return immediately
                        // and not increment any of the counters below or other such logic
                        return Observable.error(new RuntimeException("timed out before executing run()"));
                    }
                    if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
                        //we have not been unsubscribed, so should proceed
                        HystrixCounters.incrementGlobalConcurrentThreads();
                        threadPool.markThreadExecution();
                        // store the command that is being run
                        endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                        executionResult = executionResult.setExecutedInThread();
                        /**
                         * If any of these hooks throw an exception, then it appears as if the actual execution threw an error
                         */
                        try {
                            executionHook.onThreadStart(_cmd);
                            executionHook.onRunStart(_cmd);
                            executionHook.onExecutionStart(_cmd);
                            return getUserExecutionObservable(_cmd);
                        } catch (Throwable ex) {
                            return Observable.error(ex);
                        }
                    } else {
                        //command has already been unsubscribed, so return immediately
                        return Observable.error(new RuntimeException("unsubscribed before executing run()"));
                    }
                }
            }).doOnTerminate(new Action0() {
                @Override
                public void call() {
                    if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) {
                        handleThreadEnd(_cmd);
                    }
                    if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) {
                        //if it was never started and received terminal, then no need to clean up (I don't think this is possible)
                    }
                    //if it was unsubscribed, then other cleanup handled it
                }
            }).doOnUnsubscribe(new Action0() {
                @Override
                public void call() {
                    if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) {
                        handleThreadEnd(_cmd);
                    }
                    if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) {
                        //if it was never started and was cancelled, then no need to clean up
                    }
                    //if it was terminal, then other cleanup handled it
                }
            }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
                @Override
                public Boolean call() {
                    return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
                }
            }));
        } else {
            return Observable.defer(new Func0<Observable<R>>() {
                @Override
                public Observable<R> call() {
                    executionResult = executionResult.setExecutionOccurred();
                    if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                        return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                    }

                    metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);
                    // semaphore isolated
                    // store the command that is being run
                    endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                    try {
                        executionHook.onRunStart(_cmd);
                        executionHook.onExecutionStart(_cmd);
                        return getUserExecutionObservable(_cmd);  //the getUserExecutionObservable method already wraps sync exceptions, so this shouldn't throw
                    } catch (Throwable ex) {
                        //If the above hooks throw, then use that as the result of the run method
                        return Observable.error(ex);
                    }
                }
            });
        }
    }

The executecommandwithspecifiedisolation method called the markCommandStart method

markCommandDone

    private void cleanUpAfterResponseFromCache(boolean commandExecutionStarted) {
        Reference<TimerListener> tl = timeoutTimer.get();
        if (tl != null) {
            tl.clear();
        }

        final long latency = System.currentTimeMillis() - commandStartTimestamp;
        executionResult = executionResult
                .addEvent(-1, HystrixEventType.RESPONSE_FROM_CACHE)
                .markUserThreadCompletion(latency)
                .setNotExecutedInThread();
        ExecutionResult cacheOnlyForMetrics = ExecutionResult.from(HystrixEventType.RESPONSE_FROM_CACHE)
                .markUserThreadCompletion(latency);
        metrics.markCommandDone(cacheOnlyForMetrics, commandKey, threadPoolKey, commandExecutionStarted);
        eventNotifier.markEvent(HystrixEventType.RESPONSE_FROM_CACHE, commandKey);
    }

    private void handleCommandEnd(boolean commandExecutionStarted) {
        Reference<TimerListener> tl = timeoutTimer.get();
        if (tl != null) {
            tl.clear();
        }

        long userThreadLatency = System.currentTimeMillis() - commandStartTimestamp;
        executionResult = executionResult.markUserThreadCompletion((int) userThreadLatency);
        if (executionResultAtTimeOfCancellation == null) {
            metrics.markCommandDone(executionResult, commandKey, threadPoolKey, commandExecutionStarted);
        } else {
            metrics.markCommandDone(executionResultAtTimeOfCancellation, commandKey, threadPoolKey, commandExecutionStarted);
        }

        if (endCurrentThreadExecutingCommand != null) {
            endCurrentThreadExecutingCommand.call();
        }
    }

The cleanUpAfterResponseFromCache and handleCommandEnd methods called the markCommandDone method

HystrixEventStream.observe

BucketedCounterStream

hystrix-core-1.5.12-sources.jar! /com/netflix/hystrix/metric/consumer/BucketedCounterStream.java

public abstract class BucketedCounterStream<Event extends HystrixEvent, Bucket, Output> {
    //......
    protected BucketedCounterStream(final HystrixEventStream<Event> inputEventStream, final int numBuckets, final int bucketSizeInMs,
                                    final Func2<Bucket, Event, Bucket> appendRawEventToBucket) {
        this.numBuckets = numBuckets;
        this.reduceBucketToSummary = new Func1<Observable<Event>, Observable<Bucket>>() {
            @Override
            public Observable<Bucket> call(Observable<Event> eventBucket) {
                return eventBucket.reduce(getEmptyBucketSummary(), appendRawEventToBucket);
            }
        };

        final List<Bucket> emptyEventCountsToStart = new ArrayList<Bucket>();
        for (int i = 0; i < numBuckets; i++) {
            emptyEventCountsToStart.add(getEmptyBucketSummary());
        }

        this.bucketedStream = Observable.defer(new Func0<Observable<Bucket>>() {
            @Override
            public Observable<Bucket> call() {
                return inputEventStream
                        .observe()
                        .window(bucketSizeInMs, TimeUnit.MILLISECONDS) //bucket it by the counter window so we can emit to the next operator in time chunks, not on every OnNext
                        .flatMap(reduceBucketToSummary)                //for a given bucket, turn it into a long array containing counts of event types
                        .startWith(emptyEventCountsToStart);           //start it with empty arrays to make consumer logic as generic as possible (windows are always full)
            }
        });
    }
    //......
}

The observe method of HystrixEventStream<Event > is called here to consume Event stream, where event is generic, i.e. a subclass of HystrixEvent

RollingConcurrencyStream

hystrix-core-1.5.12-sources.jar! /com/netflix/hystrix/metric/consumer/RollingConcurrencyStream.java

public abstract class RollingConcurrencyStream {
    //......
    protected RollingConcurrencyStream(final HystrixEventStream<HystrixCommandExecutionStarted> inputEventStream, final int numBuckets, final int bucketSizeInMs) {
        final List<Integer> emptyRollingMaxBuckets = new ArrayList<Integer>();
        for (int i = 0; i < numBuckets; i++) {
            emptyRollingMaxBuckets.add(0);
        }

        rollingMaxStream = inputEventStream
                .observe()
                .map(getConcurrencyCountFromEvent)
                .window(bucketSizeInMs, TimeUnit.MILLISECONDS)
                .flatMap(reduceStreamToMax)
                .startWith(emptyRollingMaxBuckets)
                .window(numBuckets, 1)
                .flatMap(reduceStreamToMax)
                .share()
                .onBackpressureDrop();
    }
    //......
}

Here, the observe method of hytrixevent stream < hytrixcommanddexecutionstarted > is called to consume eventstream

RollingDistributionStream

hystrix-core-1.5.12-sources.jar! /com/netflix/hystrix/metric/consumer/RollingDistributionStream.java

public class RollingDistributionStream<Event extends HystrixEvent> {
    //......
    protected RollingDistributionStream(final HystrixEventStream<Event> stream, final int numBuckets, final int bucketSizeInMs,
                                        final Func2<Histogram, Event, Histogram> addValuesToBucket) {
        final List<Histogram> emptyDistributionsToStart = new ArrayList<Histogram>();
        for (int i = 0; i < numBuckets; i++) {
            emptyDistributionsToStart.add(CachedValuesHistogram.getNewHistogram());
        }

        final Func1<Observable<Event>, Observable<Histogram>> reduceBucketToSingleDistribution = new Func1<Observable<Event>, Observable<Histogram>>() {
            @Override
            public Observable<Histogram> call(Observable<Event> bucket) {
                return bucket.reduce(CachedValuesHistogram.getNewHistogram(), addValuesToBucket);
            }
        };

        rollingDistributionStream = stream
                .observe()
                .window(bucketSizeInMs, TimeUnit.MILLISECONDS) //stream of unaggregated buckets
                .flatMap(reduceBucketToSingleDistribution)     //stream of aggregated Histograms
                .startWith(emptyDistributionsToStart)          //stream of aggregated Histograms that starts with n empty
                .window(numBuckets, 1)                         //windowed stream: each OnNext is a stream of n Histograms
                .flatMap(reduceWindowToSingleDistribution)     //reduced stream: each OnNext is a single Histogram
                .map(cacheHistogramValues)                     //convert to CachedValueHistogram (commonly-accessed values are cached)
                .share()
                .onBackpressureDrop();
    }
    //......
}

The observe method of HystrixEventStream<Event > is called here to consume Event stream, where event is generic, i.e. a subclass of HystrixEvent

Summary

HystrixEventStream is a reactive stream designed by Hystrix based on rxjava. hystrix command will call HystrixThreadEventStream to obtain an instance in the corresponding life cycle, and publish the corresponding event to the relevant stream that specifies HystrixCommandKey. The event stream is formed, and then other stream consume the event stream and form corresponding metrics.

doc