[case48] Talk about flink’s SocketClientSink

  flink

Order

This article mainly studies flink’s SocketClientSink

DataStream.writeToSocket

flink-streaming-java_2.11-1.7.0-sources.jar! /org/apache/flink/streaming/api/datastream/DataStream.java

    /**
     * Writes the DataStream to a socket as a byte array. The format of the
     * output is specified by a {@link SerializationSchema}.
     *
     * @param hostName
     *            host of the socket
     * @param port
     *            port of the socket
     * @param schema
     *            schema for serialization
     * @return the closed DataStream
     */
    @PublicEvolving
    public DataStreamSink<T> writeToSocket(String hostName, int port, SerializationSchema<T> schema) {
        DataStreamSink<T> returnStream = addSink(new SocketClientSink<>(hostName, port, schema, 0));
        returnStream.setParallelism(1); // It would not work if multiple instances would connect to the same port
        return returnStream;
    }
  • DataStream’s writeToSocket method internally creates SocketClientSink, which passes four construction parameters, namely hostName, port, schema, maxNumRetries (Here is 0)

SocketClientSink

flink-streaming-java_2.11-1.7.0-sources.jar! /org/apache/flink/streaming/api/functions/sink/SocketClientSink.java

/**
 * Socket client that acts as a streaming sink. The data is sent to a Socket as a byte array.
 *
 * <p>The sink can be set to retry message sends after the sending failed.
 *
 * <p>The sink can be set to 'autoflush', in which case the socket stream is flushed after every
 * message. This significantly reduced throughput, but also decreases message latency.
 *
 * @param <IN> data to be written into the Socket.
 */
@PublicEvolving
public class SocketClientSink<IN> extends RichSinkFunction<IN> {

    private static final long serialVersionUID = 1L;

    private static final Logger LOG = LoggerFactory.getLogger(SocketClientSink.class);

    private static final int CONNECTION_RETRY_DELAY = 500;


    private final SerializableObject lock = new SerializableObject();
    private final SerializationSchema<IN> schema;
    private final String hostName;
    private final int port;
    private final int maxNumRetries;
    private final boolean autoFlush;

    private transient Socket client;
    private transient OutputStream outputStream;

    private int retries;

    private volatile boolean isRunning = true;

    /**
     * Creates a new SocketClientSink. The sink will not attempt to retry connections upon failure
     * and will not auto-flush the stream.
     *
     * @param hostName Hostname of the server to connect to.
     * @param port Port of the server.
     * @param schema Schema used to serialize the data into bytes.
     */
    public SocketClientSink(String hostName, int port, SerializationSchema<IN> schema) {
        this(hostName, port, schema, 0);
    }

    /**
     * Creates a new SocketClientSink that retries connections upon failure up to a given number of times.
     * A value of -1 for the number of retries will cause the system to retry an infinite number of times.
     * The sink will not auto-flush the stream.
     *
     * @param hostName Hostname of the server to connect to.
     * @param port Port of the server.
     * @param schema Schema used to serialize the data into bytes.
     * @param maxNumRetries The maximum number of retries after a message send failed.
     */
    public SocketClientSink(String hostName, int port, SerializationSchema<IN> schema, int maxNumRetries) {
        this(hostName, port, schema, maxNumRetries, false);
    }

    /**
     * Creates a new SocketClientSink that retries connections upon failure up to a given number of times.
     * A value of -1 for the number of retries will cause the system to retry an infinite number of times.
     *
     * @param hostName Hostname of the server to connect to.
     * @param port Port of the server.
     * @param schema Schema used to serialize the data into bytes.
     * @param maxNumRetries The maximum number of retries after a message send failed.
     * @param autoflush Flag to indicate whether the socket stream should be flushed after each message.
     */
    public SocketClientSink(String hostName, int port, SerializationSchema<IN> schema,
                            int maxNumRetries, boolean autoflush) {
        checkArgument(port > 0 && port < 65536, "port is out of range");
        checkArgument(maxNumRetries >= -1, "maxNumRetries must be zero or larger (num retries), or -1 (infinite retries)");

        this.hostName = checkNotNull(hostName, "hostname must not be null");
        this.port = port;
        this.schema = checkNotNull(schema);
        this.maxNumRetries = maxNumRetries;
        this.autoFlush = autoflush;
    }

    // ------------------------------------------------------------------------
    //  Life cycle
    // ------------------------------------------------------------------------

    /**
     * Initialize the connection with the Socket in the server.
     * @param parameters Configuration.
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        try {
            synchronized (lock) {
                createConnection();
            }
        }
        catch (IOException e) {
            throw new IOException("Cannot connect to socket server at " + hostName + ":" + port, e);
        }
    }


    /**
     * Called when new data arrives to the sink, and forwards it to Socket.
     *
     * @param value The value to write to the socket.
     */
    @Override
    public void invoke(IN value) throws Exception {
        byte[] msg = schema.serialize(value);

        try {
            outputStream.write(msg);
            if (autoFlush) {
                outputStream.flush();
            }
        }
        catch (IOException e) {
            // if no re-tries are enable, fail immediately
            if (maxNumRetries == 0) {
                throw new IOException("Failed to send message '" + value + "' to socket server at "
                        + hostName + ":" + port + ". Connection re-tries are not enabled.", e);
            }

            LOG.error("Failed to send message '" + value + "' to socket server at " + hostName + ":" + port +
                    ". Trying to reconnect..." , e);

            // do the retries in locked scope, to guard against concurrent close() calls
            // note that the first re-try comes immediately, without a wait!

            synchronized (lock) {
                IOException lastException = null;
                retries = 0;

                while (isRunning && (maxNumRetries < 0 || retries < maxNumRetries)) {

                    // first, clean up the old resources
                    try {
                        if (outputStream != null) {
                            outputStream.close();
                        }
                    }
                    catch (IOException ee) {
                        LOG.error("Could not close output stream from failed write attempt", ee);
                    }
                    try {
                        if (client != null) {
                            client.close();
                        }
                    }
                    catch (IOException ee) {
                        LOG.error("Could not close socket from failed write attempt", ee);
                    }

                    // try again
                    retries++;

                    try {
                        // initialize a new connection
                        createConnection();

                        // re-try the write
                        outputStream.write(msg);

                        // success!
                        return;
                    }
                    catch (IOException ee) {
                        lastException = ee;
                        LOG.error("Re-connect to socket server and send message failed. Retry time(s): " + retries, ee);
                    }

                    // wait before re-attempting to connect
                    lock.wait(CONNECTION_RETRY_DELAY);
                }

                // throw an exception if the task is still running, otherwise simply leave the method
                if (isRunning) {
                    throw new IOException("Failed to send message '" + value + "' to socket server at "
                            + hostName + ":" + port + ". Failed after " + retries + " retries.", lastException);
                }
            }
        }
    }

    /**
     * Closes the connection with the Socket server.
     */
    @Override
    public void close() throws Exception {
        // flag this as not running any more
        isRunning = false;

        // clean up in locked scope, so there is no concurrent change to the stream and client
        synchronized (lock) {
            // we notify first (this statement cannot fail). The notified thread will not continue
            // anyways before it can re-acquire the lock
            lock.notifyAll();

            try {
                if (outputStream != null) {
                    outputStream.close();
                }
            }
            finally {
                if (client != null) {
                    client.close();
                }
            }
        }
    }

    // ------------------------------------------------------------------------
    //  Utilities
    // ------------------------------------------------------------------------

    private void createConnection() throws IOException {
        client = new Socket(hostName, port);
        client.setKeepAlive(true);
        client.setTcpNoDelay(true);

        outputStream = client.getOutputStream();
    }

    // ------------------------------------------------------------------------
    //  For testing
    // ------------------------------------------------------------------------

    int getCurrentNumberOfRetries() {
        synchronized (lock) {
            return retries;
        }
    }
}
  • SocketClientSink inherits RichSinkFunction, and its autoFlush property defaults to false.
  • CreateConnection is called in the open method to initialize the connection with socket. If IOException occurs at this time, FAILFAST will immediately occur. When createConnection, keepAlive and tcpNoDelay set here are both true.
  • The invoke method first calls the schema.serialize method to serialize the value, and then calls socket’s outputStream.write if autoFlush is true, flush outputStream; immediately; If there is an IOException, it will be retried immediately. The logic of retries here is written directly in catch. According to maxNumRetries, when retries, it is to createConnection first and then call outputStream.write. The delay of retries is CONNECTION_RETRY_DELAY (500)

Summary

  • DataStream’s writeToSocket method internally creates a SocketClientSink, the default passed maxNumRetries is 0, and there is no constructor called with the default of false for the autoFlush property, which defaults to false.
  • Socket created by the open method, its keepAlive and tcpNoDelay are both true. if IOException occurs when open, an exception is thrown to terminate the operation.
  • The invoke method is relatively simple, that is, SerializationSchema is used to serialize value, and then write to outputStream;; Here, a simple failed retry was performed, and the default retry delay is CONNECTION_RETRY_DELAY (500), this version of the implementation of the retry is relatively simple, is synchronized

doc