A Look at Sentinel’s DataSource

Preface

This article walks through Sentinel’s DataSource interface and the classes built on top of it in sentinel-datasource-extension.

DataSource

sentinel-datasource-extension-0.1.1-sources.jar!/com/alibaba/csp/sentinel/datasource/DataSource.java

public interface DataSource<S, T> {

    /**
     * Load data data source as the target type.
     *
     * @return the target data.
     * @throws Exception
     */
    T loadConfig() throws Exception;

    /**
     * Read original data from the data source.
     *
     * @return the original data.
     * @throws Exception
     */
    S readSource() throws Exception;

    /**
     * Get {@link SentinelProperty} of the data source.
     *
     * @return the property.
     */
    SentinelProperty<T> getProperty();

    /**
     * Write the {@code values} to the data source.
     *
     * @param values
     * @throws Exception
     */
    void writeDataSource(T values) throws Exception;

    /**
     * Close the data source.
     *
     * @throws Exception
     */
    void close() throws Exception;
}
  • The interface defines loadConfig, readSource, getProperty, writeDataSource and close; S is the raw source data type and T is the parsed target type.
  • It has an abstract subclass, AbstractDataSource.

AbstractDataSource

sentinel-datasource-extension-0.1.1-sources.jar!/com/alibaba/csp/sentinel/datasource/AbstractDataSource.java

public abstract class AbstractDataSource<S, T> implements DataSource<S, T> {

    protected final ConfigParser<S, T> parser;
    protected final SentinelProperty<T> property;

    public AbstractDataSource(ConfigParser<S, T> parser) {
        if (parser == null) {
            throw new IllegalArgumentException("parser can't be null");
        }
        this.parser = parser;
        this.property = new DynamicSentinelProperty<T>();
    }

    @Override
    public T loadConfig() throws Exception {
        S readValue = readSource();
        T value = parser.parse(readValue);
        return value;
    }

    public T loadConfig(S conf) throws Exception {
        T value = parser.parse(conf);
        return value;
    }

    @Override
    public SentinelProperty<T> getProperty() {
        return property;
    }

    @Override
    public void writeDataSource(T values) throws Exception {
        throw new UnsupportedOperationException();
    }

}
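  • It holds a ConfigParser, which loadConfig uses to convert the raw value returned by readSource into the target type, and a DynamicSentinelProperty that is exposed through getProperty.
  • writeDataSource throws UnsupportedOperationException by default; readSource and close are left to subclasses.
  • It has an abstract subclass, AutoRefreshDataSource.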
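
The read-then-parse flow of loadConfig can be sketched without the Sentinel jar. The block below is only an illustration: ConfigParser is reduced to the single parse method used in the code above, and ReadThenParseSource, InMemorySource and CommaListParser are hypothetical names, not Sentinel classes.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for Sentinel's ConfigParser; only parse(S) is taken from the article.
interface ConfigParser<S, T> {
    T parse(S source);
}

// Hypothetical class that keeps just the read-then-parse part of AbstractDataSource.
abstract class ReadThenParseSource<S, T> {
    private final ConfigParser<S, T> parser;

    ReadThenParseSource(ConfigParser<S, T> parser) {
        if (parser == null) {
            throw new IllegalArgumentException("parser can't be null");
        }
        this.parser = parser;
    }

    // Subclasses only decide where the raw data comes from.
    abstract S readSource() throws Exception;

    // Template method: fetch the raw value, then convert it to the target type.
    T loadConfig() throws Exception {
        return parser.parse(readSource());
    }
}

// Hypothetical parser: splits "a,b,c" into trimmed, non-empty items.
class CommaListParser implements ConfigParser<String, List<String>> {
    @Override
    public List<String> parse(String source) {
        List<String> items = new ArrayList<>();
        for (String part : source.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                items.add(trimmed);
            }
        }
        return items;
    }
}

// Hypothetical source that keeps its raw text in memory.
class InMemorySource extends ReadThenParseSource<String, List<String>> {
    private final String raw;

    InMemorySource(String raw) {
        super(new CommaListParser());
        this.raw = raw;
    }

    @Override
    String readSource() {
        return raw;
    }
}
```

Calling new InMemorySource("qps, thread ,,rt").loadConfig() goes through readSource first and then through the parser, which is the same division of labour AbstractDataSource sets up for its subclasses.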

AutoRefreshDataSource

sentinel-datasource-extension-0.1.1-sources.jar!/com/alibaba/csp/sentinel/datasource/AutoRefreshDataSource.java

/**
 * A {@link DataSource} automatically fetches the backend data.
 *
 * @param <S> source data type
 * @param <T> target data type
 * @author Carpenter Lee
 */
public abstract class AutoRefreshDataSource<S, T> extends AbstractDataSource<S, T> {

    private ScheduledExecutorService service;
    protected long recommendRefreshMs = 3000;

    public AutoRefreshDataSource(ConfigParser<S, T> configParser) {
        super(configParser);
        startTimerService();
    }

    public AutoRefreshDataSource(ConfigParser<S, T> configParser, final long recommendRefreshMs) {
        super(configParser);
        if (recommendRefreshMs <= 0) {
            throw new IllegalArgumentException("recommendRefreshMs must > 0, but " + recommendRefreshMs + " get");
        }
        this.recommendRefreshMs = recommendRefreshMs;
        startTimerService();
    }

    private void startTimerService() {
        service = Executors.newScheduledThreadPool(1,
            new NamedThreadFactory("sentinel-datasource-auto-refresh-task", true));
        service.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    T newValue = loadConfig();

                    getProperty().updateValue(newValue);
                } catch (Throwable e) {
                    RecordLog.info("loadConfig exception", e);
                }
            }
        }, recommendRefreshMs, recommendRefreshMs, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() throws Exception {
        if (service != null) {
            service.shutdownNow();
            service = null;
        }
    }

}
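  • The constructor creates a single-threaded ScheduledExecutorService that calls loadConfig every recommendRefreshMs milliseconds (3000 by default) and pushes the result into the property via updateValue. The first run is also delayed by recommendRefreshMs.
  • Exceptions thrown by loadConfig are caught and logged inside the task, so a failed refresh does not stop the schedule.
  • It has a subclass, FileRefreshableDataSource.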
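
The polling pattern used here can be tried with the JDK alone. The following is a minimal sketch, not Sentinel code: PollingRefresher, currentValue and failures are made-up names, and an AtomicReference stands in for the SentinelProperty. The point it illustrates is the try/catch inside the task: with scheduleAtFixedRate, an exception that escapes the task suppresses all later executions, so the task has to catch everything itself.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical poller that mirrors the scheduling logic of AutoRefreshDataSource.
class PollingRefresher<T> implements AutoCloseable {
    private final ScheduledExecutorService service;
    private final AtomicReference<T> latest = new AtomicReference<>();
    private final AtomicInteger failures = new AtomicInteger();

    PollingRefresher(final Callable<T> loader, long refreshMs) {
        if (refreshMs <= 0) {
            throw new IllegalArgumentException("refreshMs must be > 0, but got " + refreshMs);
        }
        // One daemon thread is enough: runs of the same task never overlap.
        service = Executors.newScheduledThreadPool(1, r -> {
            Thread t = new Thread(r, "demo-auto-refresh-task");
            t.setDaemon(true);
            return t;
        });
        service.scheduleAtFixedRate(() -> {
            try {
                latest.set(loader.call());
            } catch (Throwable e) {
                // Swallow and count: an escaping exception would cancel every later run.
                failures.incrementAndGet();
            }
        }, refreshMs, refreshMs, TimeUnit.MILLISECONDS);
    }

    // Last successfully loaded value, or null if nothing has been loaded yet.
    T currentValue() {
        return latest.get();
    }

    int failures() {
        return failures.get();
    }

    @Override
    public void close() {
        service.shutdownNow();
    }
}
```

Because the initial delay equals the period, currentValue() stays null until the first interval has elapsed; a subclass that needs a value immediately has to load once by itself.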

FileRefreshableDataSource

sentinel-datasource-extension-0.1.1-sources.jar!/com/alibaba/csp/sentinel/datasource/FileRefreshableDataSource.java

/**
 * <p>
 * A {@link DataSource} based on file. This class will automatically fetches the backend file every 3 seconds.
 * </p>
 * <p>
 * Limitations: default read buffer size is 1MB, if file size is greater than buffer size, exceeding bytes will
 * be ignored. Default charset is UTF8.
 * </p>
 *
 * @author Carpenter Lee
 */
public class FileRefreshableDataSource<T> extends AutoRefreshDataSource<String, T> {

    private static final int MAX_SIZE = 1024 * 1024 * 4;
    private static final long DEFAULT_REFRESH_MS = 3000;
    private static final int DEFAULT_BUF_SIZE = 1024 * 1024;
    private static final Charset DEFAULT_CHAR_SET = Charset.forName("utf-8");

    private byte[] buf;
    private Charset charset;
    private File file;

    /**
     * Create a file based {@link DataSource} whose read buffer size is 1MB, charset is UTF8,
     * and read interval is 3 seconds.
     *
     * @param file         the file to read.
     * @param configParser the config parser.
     */
    public FileRefreshableDataSource(File file, ConfigParser<String, T> configParser) throws FileNotFoundException {
        this(file, configParser, DEFAULT_REFRESH_MS, DEFAULT_BUF_SIZE, DEFAULT_CHAR_SET);
    }

    public FileRefreshableDataSource(String fileName, ConfigParser<String, T> configParser)
        throws FileNotFoundException {
        this(new File(fileName), configParser, DEFAULT_REFRESH_MS, DEFAULT_BUF_SIZE, DEFAULT_CHAR_SET);
        //System.out.println(file.getAbsoluteFile());
    }

    public FileRefreshableDataSource(File file, ConfigParser<String, T> configParser, int bufSize)
        throws FileNotFoundException {
        this(file, configParser, DEFAULT_REFRESH_MS, bufSize, DEFAULT_CHAR_SET);
    }

    public FileRefreshableDataSource(File file, ConfigParser<String, T> configParser, Charset charset)
        throws FileNotFoundException {
        this(file, configParser, DEFAULT_REFRESH_MS, DEFAULT_BUF_SIZE, charset);
    }

    public FileRefreshableDataSource(File file, ConfigParser<String, T> configParser, long recommendRefreshMs,
                                     int bufSize, Charset charset) throws FileNotFoundException {
        super(configParser, recommendRefreshMs);
        if (bufSize <= 0 || bufSize > MAX_SIZE) {
            throw new IllegalArgumentException("bufSize must between (0, " + MAX_SIZE + "], but " + bufSize + " get");
        }
        if (file == null) {
            throw new IllegalArgumentException("file can't be null");
        }
        if (charset == null) {
            throw new IllegalArgumentException("charset can't be null");
        }
        this.buf = new byte[bufSize];
        this.file = file;
        this.charset = charset;
        firstLoad();
    }

    private void firstLoad() {
        try {
            T newValue = loadConfig();
            getProperty().updateValue(newValue);
        } catch (Throwable e) {
            RecordLog.info("loadConfig exception", e);
        }
    }

    @Override
    public String readSource() throws Exception {
        FileInputStream inputStream = null;
        try {
            inputStream = new FileInputStream(file);
            FileChannel channel = inputStream.getChannel();
            if (channel.size() > buf.length) {
                throw new RuntimeException(file.getAbsolutePath() + " file size=" + channel.size()
                    + ", is bigger than bufSize=" + buf.length + ". Can't read");
            }
            int len = inputStream.read(buf);
            return new String(buf, 0, len, charset);
        } finally {
            if (inputStream != null) {
                try {
                    inputStream.close();
                } catch (Exception ignore) {
                }
            }
        }
    }

    @Override
    public void close() throws Exception {
        super.close();
        buf = null;
    }

    @Override
    public void writeDataSource(T values) throws Exception {
        throw new UnsupportedOperationException();
    }
}
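  • readSource reads the whole file into a fixed-size buffer (1 MB by default, 4 MB at most) and decodes it with the configured charset (UTF-8 by default). Contrary to the class Javadoc, a file larger than the buffer is not truncated; readSource throws a RuntimeException instead.
  • The constructor calls firstLoad so that the property is populated immediately rather than after the first refresh interval.
  • writeDataSource is not supported and throws UnsupportedOperationException.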
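
The size-checked read in readSource can also be written with java.nio.file. The block below is a sketch under that assumption, and BoundedFileReader is a hypothetical helper rather than anything shipped with Sentinel. It keeps the same guard against oversized files, but it also handles an empty file: in the code above, a single read(buf) on an empty file returns -1, and building a String with that length fails, which the caller then logs as a loadConfig exception.

```java
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper, not part of Sentinel.
class BoundedFileReader {

    // Reads the whole file as text, refusing files larger than maxBytes.
    static String read(Path file, int maxBytes, Charset charset) throws IOException {
        long size = Files.size(file);
        if (size > maxBytes) {
            throw new IllegalStateException(file.toAbsolutePath() + " file size=" + size
                + ", is bigger than maxBytes=" + maxBytes + ". Can't read");
        }
        // readAllBytes reads until end of file; an empty file yields an empty array.
        byte[] bytes = Files.readAllBytes(file);
        return new String(bytes, charset);
    }
}
```

The check and the read are two separate steps, so a file that grows in between can still exceed maxBytes; for a configuration file that is edited occasionally this is usually acceptable.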

Summary

  • sentinel-datasource-extension provides FileRefreshableDataSource out of the box; there are also extension implementations for ZooKeeper, Nacos and Apollo.
  • To write your own data source: for pull mode, extend AutoRefreshDataSource and implement readSource(); for push mode, extend AbstractDataSource, implement readSource(), and register your own listener to update the property when the source changes.
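
To make the push-mode idea concrete, here is a small self-contained sketch. Everything in it is a stand-in chosen for illustration: FakeConfigStore plays the role of a configuration client with change notifications, SimpleProperty imitates only the updateValue call of SentinelProperty, and TextParser imitates ConfigParser. A real implementation would extend AbstractDataSource and use the listener API of its own configuration client.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Stand-in for ConfigParser<String, T>.
interface TextParser<T> {
    T parse(String source);
}

// Stand-in for SentinelProperty; getValue exists only for this demo.
class SimpleProperty<T> {
    private volatile T value;

    void updateValue(T newValue) {
        value = newValue;
    }

    T getValue() {
        return value;
    }
}

// Hypothetical config store that notifies its listeners on every publish.
class FakeConfigStore {
    private volatile String content = "";
    private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();

    String get() {
        return content;
    }

    void addListener(Consumer<String> listener) {
        listeners.add(listener);
    }

    void publish(String newContent) {
        content = newContent;
        for (Consumer<String> listener : listeners) {
            listener.accept(newContent);
        }
    }
}

// Push mode: no timer; the store's change callback drives the property.
class PushDataSource<T> {
    private final FakeConfigStore store;
    private final SimpleProperty<T> property = new SimpleProperty<>();

    PushDataSource(FakeConfigStore store, final TextParser<T> parser) {
        this.store = store;
        // Initial load, so the property has a value before the first change event.
        property.updateValue(parser.parse(readSource()));
        // Every later change is parsed and pushed into the property.
        store.addListener(raw -> property.updateValue(parser.parse(raw)));
    }

    String readSource() {
        return store.get();
    }

    SimpleProperty<T> getProperty() {
        return property;
    }
}
```

With new PushDataSource<Integer>(store, Integer::parseInt), each store.publish("25") is reflected in getProperty().getValue() as soon as the callback returns, whereas a pull-mode source would only pick it up on its next scheduled refresh.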
