Swoft source code analysis: RPC implementation

  php, swoole

Author: bromine
Links: https://www.jianshu.com/p/411 …
Source: Jianshu
The copyright belongs to the author. This article has been reprinted with the authorization of the author and the original text has been rearranged.
Swoft GitHub: https://github.com/swoft-clou …

Preface

Swoft ships its own RPC (remote procedure call) implementation, which makes it easy to call services running on other Swoft instances.

RPC server initialization

The RPC server can be started in two ways: together with the HTTP server, or on its own. Note that at present Swoft's only TCP service is the RPC service, so TCP-related configuration effectively means RPC configuration.

Starting together with HTTP

When enabled, Swoft's RPC service is started as part of HTTP server startup.

//Swoft\Http\Server\Http\HttpServer.php
/**
 * Http Server
 */
class HttpServer extends AbstractServer
{
    /**
     * Start Server
     *
     * @throws \Swoft\Exception\RuntimeException
     */
    public function start()
    {
        //code ...

        // Whether to start the RPC service depends on the TCPABLE field in the Server section of the .env file
        if ((int)$this->serverSetting['tcpable'] === 1) {
            $this->registerRpcEvent();
        }
        //code ....
    }
}

Swoole listener

During initialization, a Swoole port listener is registered according to the relevant annotations.

//Swoft\Http\Server\Http\HttpServer.php
/**
 * Register rpc event, swoft/rpc-server required
 *
 * @throws \Swoft\Exception\RuntimeException
 */
protected function registerRpcEvent()
{
    // Beans annotated with @SwooleListener whose type is SwooleEvent::TYPE_PORT, i.e. RpcEventListener
    $swooleListeners = SwooleListenerCollector::getCollector();
    if (!isset($swooleListeners[SwooleEvent::TYPE_PORT][0]) || empty($swooleListeners[SwooleEvent::TYPE_PORT][0])) {
        throw new RuntimeException("Please use swoft/rpc-server, run 'composer require swoft/rpc-server'");
    }

    // Add the TCP listening port for RPC, using the TCP section of the .env file
    $this->listen = $this->server->listen($this->tcpSetting['host'], $this->tcpSetting['port'], $this->tcpSetting['type']);
    $tcpSetting = $this->getListenTcpSetting();
    $this->listen->set($tcpSetting);

    // Register the event handlers according to the annotations on RpcEventListener
    $swooleRpcPortEvents = $swooleListeners[SwooleEvent::TYPE_PORT][0];
    $this->registerSwooleEvents($this->listen, $swooleRpcPortEvents);
}

Because this is the first version, the way the RPC listener Bean is obtained from @SwooleListener is still somewhat rigid.
At present, RpcEventListener is the only @SwooleListener in Swoft whose type is SwooleEvent::TYPE_PORT. Adding another Bean of the same type could easily cause problems; this will presumably be improved by the time a stable version is released.

Starting RPC on its own

The entry point changes from Swoft\Http\Server\Command\ServerCommand to Swoft\Rpc\Server\Command\RpcCommand. The process is the same as for HTTP, except that Swoole is set up without the HTTP-related listening port and events, so it is not repeated here.

RPC request processing

The RPC server differs from the HTTP server only in the format of the messages exchanged with the client and in the network layer those messages travel on (Swoft's RPC runs directly over TCP). The operating principle is essentially the same: routing, middleware, and an RPC Service (the counterpart of an HTTP Controller). You can understand it entirely by analogy with the HTTP service.

Once Swoole's RPC TCP listener is set up, the RPC server can begin accepting requests. The only job of RpcEventListener is to forward the received data to \Swoft\Rpc\Server\ServiceDispatcher for dispatching. The dispatcher passes the request through each middleware in turn, and it is finally handled by HandlerAdapterMiddleware.
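
The dispatch flow described above can be sketched as a small middleware pipeline. This is a simplified illustration rather than Swoft's ServiceDispatcher: the request is a plain array instead of a PSR-7 object, and runPipeline, $packer, $router and $handlerAdapter are hypothetical names chosen for the example.

```php
<?php
declare(strict_types=1);

/**
 * Run $request through $middlewares in order, ending at $coreHandler.
 * Each middleware is a callable: fn(array $request, callable $next): array.
 */
function runPipeline(array $middlewares, callable $coreHandler, array $request): array
{
    // Build the chain from the inside out so the first middleware runs first.
    $next = $coreHandler;
    foreach (array_reverse($middlewares) as $middleware) {
        $next = function (array $request) use ($middleware, $next): array {
            return $middleware($request, $next);
        };
    }
    return $next($request);
}

// Hypothetical stand-ins for the packer, router and handler adapter stages.
$packer = function (array $request, callable $next): array {
    $request['data'] = json_decode($request['raw'], true); // unpack the request
    $response        = $next($request);
    $response['raw'] = json_encode($response['result']);   // pack the response
    return $response;
};
$router = function (array $request, callable $next): array {
    $request['handler'] = $request['data']['method'];      // "route" by method name
    return $next($request);
};
$handlerAdapter = function (array $request): array {
    return ['result' => strtoupper($request['handler'])];  // pretend service call
};

$response = runPipeline([$packer, $router], $handlerAdapter, ['raw' => '{"method":"getUsers"}']);
echo $response['raw'], PHP_EOL;
```

Note how the packer stage runs code both before and after calling $next, which is the same shape as PackerMiddleware: unpack on the way in, pack on the way out.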

PackerMiddleware

PackerMiddleware is an important middleware in RPC. It is responsible for unpacking the data stream of the incoming TCP request and packing the outgoing response.

<?php
//Swoft\Rpc\Server\Middleware\PackerMiddleware.php
namespace Swoft\Rpc\Server\Middleware;
/**
 * service packer
 *
 * @Bean()
 */
class PackerMiddleware implements MiddlewareInterface
{
    /**
     * packer middleware
     *
     * @param \Psr\Http\Message\ServerRequestInterface     $request
     * @param \Psr\Http\Server\RequestHandlerInterface $handler
     *
     * @return \Psr\Http\Message\ResponseInterface
     */
    public function process(ServerRequestInterface $request, RequestHandlerInterface $handler): ResponseInterface
    {
        // Get the servicePacker Bean (\Swoft\Rpc\Packer\ServicePacker), used to unpack and pack strings
        $packer = service_packer();
        $data   = $request->getAttribute(self::ATTRIBUTE_DATA);
        $data   = $packer->unpack($data);

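        // Trigger a RpcServerEvent::BEFORE_RECEIVE event; by default the only listener is BeforeReceiveListener, which adds request context information
        // Triggering key lifecycle events from a middleware is fairly tightly coupled; this will probably be adjusted later
        App::trigger(RpcServerEvent::BEFORE_RECEIVE, null, $data);
        // Put the unpacked data back into the Request for later middleware and the Handler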
        $request = $request->withAttribute(self::ATTRIBUTE_DATA, $data);

        /* @var \Swoft\Rpc\Server\Rpc\Response $response */
        $response      = $handler->handle($request);

        // Pack the Response so it can be returned to the RPC client
        $serviceResult = $response->getAttribute(HandlerAdapter::ATTRIBUTE);
        $serviceResult = $packer->pack($serviceResult);
        return $response->withAttribute(HandlerAdapter::ATTRIBUTE, $serviceResult);
    }
}
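
To make the pack and unpack steps concrete, here is a minimal sketch of a packer. It is an assumption-laden stand-in, not Swoft's ServicePacker: the class name JsonPackerSketch is hypothetical, JSON is assumed as the wire format, and the version value '0' is only a placeholder. The array keys (interface, version, method, params) are the ones the server-side code in this article reads.

```php
<?php
declare(strict_types=1);

/** Hypothetical JSON-based packer; not Swoft's ServicePacker. */
final class JsonPackerSketch
{
    /** Build the request array with the keys the server-side middleware reads. */
    public function formatData(string $interface, string $version, string $method, array $params): array
    {
        return [
            'interface' => $interface,
            'version'   => $version,
            'method'    => $method,
            'params'    => $params,
        ];
    }

    /** Serialize data into the string that would be sent over TCP. */
    public function pack($data): string
    {
        return json_encode($data, JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR);
    }

    /** Turn a received string back into PHP data. */
    public function unpack(string $raw)
    {
        return json_decode($raw, true, 512, JSON_THROW_ON_ERROR);
    }
}

$packer  = new JsonPackerSketch();
$request = $packer->formatData('App\Lib\DemoInterface', '0', 'getUsers', [[1, 2]]);
$wire    = $packer->pack($request);

// The server side would unpack the same string and get the same array back.
var_dump($packer->unpack($wire) === $request);
```

Because client and server share one packer, either side can swap the serialization format without touching the middleware or the service classes.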

RouterMiddleware

RouterMiddleware plays the role of the router: based on the method, version and interface in the request, it looks up the RPC service class that will handle the request.

<?php
//Swoft\Rpc\Server\Middleware\RouterMiddleware.php
/**
 * Service router
 *
 * @Bean()
 */
class RouterMiddleware implements MiddlewareInterface
{
    /**
     * get handler from router
     *
     * @param \Psr\Http\Message\ServerRequestInterface     $request
     * @param \Psr\Http\Server\RequestHandlerInterface $handler
     *
     * @return \Psr\Http\Message\ResponseInterface
     */
    public function process(ServerRequestInterface $request, RequestHandlerInterface $handler): ResponseInterface
    {
        // service data
        $data = $request->getAttribute(PackerMiddleware::ATTRIBUTE_DATA);

        $method    = $data['method']??"";
        $version   = $data['version']??"";
        $interface = $data['interface']??"";

        /* @var \Swoft\Rpc\Server\Router\HandlerMapping $serviceRouter */
        $serviceRouter  = App::getBean('serviceRouter');
        // Route matching: look up the RPC service information in Swoft\Rpc\Server\Router\HandlerMapping->$routes
        $serviceHandler = $serviceRouter->getHandler($interface, $version, $method);

        // deliver service data
        $request = $request->withAttribute(self::ATTRIBUTE, $serviceHandler);

        return $handler->handle($request);
    }
}

During startup, Swoft scans and initializes annotation information (see the article on annotations). Once initialization is complete, an AppEvent::APPLICATION_LOADER event is triggered, and all RPC routing information from @Service annotations is registered in Swoft\Rpc\Server\Router\HandlerMapping->$routes, where the serviceRouter Bean uses it for routing.
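
As an illustration of this kind of lookup, the sketch below keeps a route table keyed by interface, version and method, and returns a [class, method] pair like the one HandlerAdapter destructures. The names ServiceRouteTableSketch, RouteDemoInterface and RouteDemoService are hypothetical, and the internal layout of Swoft's real $routes array is not shown in this article, so the structure here is an assumption.

```php
<?php
declare(strict_types=1);

/** Hypothetical route table keyed by interface, version and method. */
final class ServiceRouteTableSketch
{
    /** @var array routes[interface][version][method] = [serviceClass, method] */
    private $routes = [];

    /** Register every public method of $serviceClass under $interface and $version. */
    public function register(string $interface, string $version, string $serviceClass): void
    {
        $methods = (new ReflectionClass($serviceClass))->getMethods(ReflectionMethod::IS_PUBLIC);
        foreach ($methods as $method) {
            $this->routes[$interface][$version][$method->getName()] = [$serviceClass, $method->getName()];
        }
    }

    /** Return [serviceClass, method] for a request, or throw if nothing matches. */
    public function getHandler(string $interface, string $version, string $method): array
    {
        if (!isset($this->routes[$interface][$version][$method])) {
            throw new RuntimeException("No RPC route for {$interface}@{$version}::{$method}");
        }
        return $this->routes[$interface][$version][$method];
    }
}

interface RouteDemoInterface
{
    public function getUsers(array $ids): array;
}

final class RouteDemoService implements RouteDemoInterface
{
    public function getUsers(array $ids): array
    {
        return array_map(function ($id) { return "user{$id}"; }, $ids);
    }
}

$router = new ServiceRouteTableSketch();
$router->register(RouteDemoInterface::class, '0', RouteDemoService::class);

// Resolve the handler, then invoke it the way a handler adapter would.
[$class, $method] = $router->getHandler(RouteDemoInterface::class, '0', 'getUsers');
print_r((new $class())->$method([1, 2]));
```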

HandlerAdapterMiddleware

HandlerAdapterMiddleware finally forwards the request to HandlerAdapter. HandlerAdapter uses the service class information just matched by RouterMiddleware to invoke the service, wraps the result in a Response, and returns it to ServiceDispatcher, which sends the TCP stream back to the client and ends the request.

<?php
//Swoft\Rpc\Server\Router\HandlerAdapter.php
/**
 * Service handler adapter
 * @Bean("serviceHandlerAdapter")
 */
class HandlerAdapter implements HandlerAdapterInterface
{

    /**
     * Execute service handler
     *
     * @param \Psr\Http\Message\ServerRequestInterface $request
     * @param array                                    $handler
     * @return Response
     */
    public function doHandler(ServerRequestInterface $request, array $handler): Response
    {
        // Arguments of the RPC method
        $data = $request->getAttribute(PackerMiddleware::ATTRIBUTE_DATA);
        $params = $data['params'] ?? [];
        
        // The service Bean and method resolved by the router to handle this request
        list($serviceClass, $method) = $handler;
        $service = App::getBean($serviceClass);

        // execute handler with params
        $response = PhpHelper::call([$service, $method], $params);
        $response = ResponseHelper::formatData($response);

        // Build the Response returned to the client
        if (! $response instanceof Response) {
            $response = (new Response())->withAttribute(self::ATTRIBUTE, $response);
        }

        return $response;
    }
}

RPC client implementation

Declare @Reference on a Bean property, and Swoft will inject the corresponding RPC client instance according to the type declared by @var.

/**
 * @Reference(name="useraaaaaa")
 *
 * @var DemoInterface
 */
private $demoService;

Dependency injection will be covered in a separate article. Here we look at the RPC client code first.

Remote proxy

namespace Swoft\Rpc\Client\Service;

/**
 * The proxy of service
 */
class ServiceProxy
{
    /**
     * @param string $className
     * @param string $interfaceClass
     */
    public static function loadProxyClass(string $className, string $interfaceClass)
    {
        $reflectionClass   = new \ReflectionClass($interfaceClass);
        $reflectionMethods = $reflectionClass->getMethods(\ReflectionMethod::IS_PUBLIC);

        $template = "class $className extends \\Swoft\\Rpc\\Client\\Service implements {$interfaceClass} {";

        //\Swoft\Rpc\Client\Service::class
        // the template of methods
        $template .= self::getMethodsTemplate($reflectionMethods);
        $template .= "}";
            
        eval($template);
    }
    //code ...
}

As with AOP, the underlying principle is the dynamic proxy, or more specifically, the dynamic remote proxy.
The dynamically generated RPC client class implements the interface declared by the client (for example DemoInterface) and extends \Swoft\Rpc\Client\Service.
The generated class is very simple: each method explicitly declared by the interface just calls \Swoft\Rpc\Client\Service->call().

interface DemoInterface
{
    /**
     * @param array $ids
     * @return array
     */
    public function getUsers(array $ids);
}
// "DynamicRpcClient" stands in for the name of the dynamically generated RPC client class
class DynamicRpcClient extends \Swoft\Rpc\Client\Service implements \App\Lib\DemoInterface {
    public function getUsers ( array  $ids  ) {
        $params = func_get_args();
        return $this->call('getUsers', $params);
    }
    //code ...
}
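
The generation step can be tried out in isolation. The sketch below is a simplified, hypothetical version of the idea, not Swoft's getMethodsTemplate(): RemoteServiceSketch records calls instead of sending them, and loadProxySketch only handles parameters with simple named types (no return types, default values or by-reference parameters).

```php
<?php
declare(strict_types=1);

/** Hypothetical base class; records calls instead of sending them over TCP. */
abstract class RemoteServiceSketch
{
    public $sent = [];

    public function call(string $func, array $params)
    {
        $this->sent[] = [$func, $params]; // a real client would pack and send here
        return "remote result of {$func}";
    }
}

interface ProxyDemoInterface
{
    public function getUsers(array $ids);
}

/** Generate and load a class that implements $interface by delegating to call(). */
function loadProxySketch(string $className, string $interface): void
{
    $body = '';
    foreach ((new ReflectionClass($interface))->getMethods() as $method) {
        $args = [];
        foreach ($method->getParameters() as $param) {
            $type   = $param->hasType() ? $param->getType()->getName() . ' ' : '';
            $args[] = $type . '$' . $param->getName();
        }
        $name  = $method->getName();
        $body .= sprintf(
            'public function %s(%s) { return $this->call(%s, func_get_args()); }',
            $name,
            implode(', ', $args),
            var_export($name, true)
        );
    }
    // eval() is tolerable here only because the code is built from reflection data we control.
    eval(sprintf('class %s extends RemoteServiceSketch implements \\%s { %s }', $className, $interface, $body));
}

loadProxySketch('ProxyDemoClient', ProxyDemoInterface::class);
$client = new ProxyDemoClient();
echo $client->getUsers([1, 2]), PHP_EOL;
```

The caller only ever sees an object that satisfies ProxyDemoInterface; the fact that every method funnels into call() is hidden behind the generated class.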

The automatically generated defer methods go through the magic method __call(), which calls \Swoft\Rpc\Client\Service->deferCall().

/**
 * @param string $name
 * @param array  $arguments
 *
 * @return ResultInterface
 * @throws RpcClientException
 */
function __call(string $name, array $arguments)
{
    $method = $name;
    $prefix = self::DEFER_PREFIX;//'defer'
    if (strpos($name, $prefix) !== 0) {
        throw new RpcClientException(sprintf('the method of %s is not exist! ', $name));
    }

    if ($name == $prefix) {
        $method = array_shift($arguments);
    } elseif (strpos($name, $prefix) === 0) {
        $method = lcfirst(ltrim($name, $prefix));
    }

    return $this->deferCall($method, $arguments);
}
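
The name mapping performed by __call() can be shown on its own. The helper below is a hypothetical standalone sketch, not part of Swoft. It strips the prefix with substr() rather than ltrim(), because ltrim() treats its second argument as a set of characters rather than a literal prefix; for conventional names such as deferGetUsers both approaches give the same result.

```php
<?php
declare(strict_types=1);

const DEFER_PREFIX_SKETCH = 'defer'; // mirrors the 'defer' prefix noted in the snippet above

/** Map a defer-style method name to the remote method name (hypothetical helper). */
function resolveDeferredMethod(string $name, array &$arguments): string
{
    if (strpos($name, DEFER_PREFIX_SKETCH) !== 0) {
        throw new BadMethodCallException(sprintf('Method %s does not exist', $name));
    }
    if ($name === DEFER_PREFIX_SKETCH) {
        // defer('getUsers', ...): the first argument is the method name
        return (string) array_shift($arguments);
    }
    // deferGetUsers(...): strip the exact prefix, then lower-case the first letter
    return lcfirst(substr($name, strlen(DEFER_PREFIX_SKETCH)));
}

$args = [[1, 2]];
echo resolveDeferredMethod('deferGetUsers', $args), PHP_EOL; // getUsers

$args = ['getUsers', [1, 2]];
echo resolveDeferredMethod('defer', $args), PHP_EOL;         // getUsers

// ltrim() keeps stripping any of the characters d, e, f, r from the left:
echo ltrim('deferfetch', 'defer'), PHP_EOL;                  // tch
echo substr('deferfetch', strlen('defer')), PHP_EOL;         // fetch
```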

We only look at the representative call() method here; deferCall() is much the same.
In essence, the dynamic RPC client class sends the client's arguments and interface information to the RPC server in Swoft's own format, then unpacks the data returned by the server and hands the return value back to the caller. It poses as an ordinary object and hides the remote call.

// Swoft\Rpc\Client\Service.php
/**
 * Do call service
 *
 * @param string $func
 * @param array  $params
 *
 * @throws \Throwable
 * @return mixed
 */
public function call(string $func, array $params)
{
    $profileKey = $this->interface . '->' . $func;
    // Get the fallback handler from the fallback attribute of @Reference; it can be used instead when the RPC call fails
    $fallback   = $this->getFallbackHandler($func);
    try {
        $connectPool    = $this->getPool();
        $circuitBreaker = $this->getBreaker();

        /* @var $client AbstractServiceConnection */
        $client = $connectPool->getConnection();
        // Pack the data, in the same way as the RPC server
        $packer   = service_packer();
        $type     = $this->getPackerName();
        $data     = $packer->formatData($this->interface, $this->version, $func, $params);
        $packData = $packer->pack($data, $type);

        // Call the interface through the circuit breaker
        $result = $circuitBreaker->call([$client, 'send'], [$packData], $fallback);
        if ($result === null || $result === false) {
            return null;
        }

        // Unlike deferCall(), receive and unpack the response right here
        App::profileStart($profileKey);
        $result = $client->recv();
        App::profileEnd($profileKey);
        $connectPool->release($client);

        App::debug(sprintf('%s call %s success, data=%s', $this->interface, $func, json_encode($data, JSON_UNESCAPED_UNICODE)));
        $result = $packer->unpack($result);
        $data   = $packer->checkData($result);
    } catch (\Throwable $throwable) {
        if (empty($fallback)) {
            throw $throwable;
        }
        // If the RPC call fails, invoke the fallback handler and return its result in place of the real RPC service
        $data = PhpHelper::call($fallback, $params);
    }

    return $data;
}

Circuit breaker

The circuit breaker is another important concept in Swoft RPC: every RPC request is sent through a circuit breaker.
The circuit breaker is implemented with the state pattern. It has three states: open, half-open, and closed, and holds a different state instance in each. The state switches according to the outcome of RPC calls, and the circuit breaker behaves differently depending on the state instance it currently holds.

Closed-state strategy

<?php
//Swoft\Sg\Circuit\CloseState.php 
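/**
 * Circuit breaker in the closed state: every RPC call is sent to the RPC server through the coroutine client
 * Closed state and its transitions
 * 1. Reset failCounter=0 successCount=0
 * 2. On failure, increment failCounter
 * 3. After a certain number of failures, switch to the open state
 */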
class CloseState extends CircuitBreakerState
{
    /**
     * Call through the circuit breaker
     *
     * @param mixed $callback callback function
     * @param array $params arguments
     * @param mixed $fallback fallback callback
     *
     * @return mixed result
     */
    public function doCall($callback, $params = [], $fallback = null)
    {
        list($class, $method) = $callback;

        try {
            if ($class == null) {
                throw new \Exception($this->circuitBreaker->serviceName . " service: failed to establish connection (null)");
            }

            if ($class instanceof Client && $class->isConnected() == false) {
                throw new \Exception($this->circuitBreaker->serviceName . " service: the current connection is closed");
            }
            // Call send() on the Swoole coroutine client to send the data
            $data = $class->$method(...$params);
        } catch (\Exception $e) {
            // Increment the failure count
            if ($this->circuitBreaker->isClose()) {
                $this->circuitBreaker->incFailCount();
            }

            App::error($this->circuitBreaker->serviceName . " service, currently [closed state]: server call failed, starting fallback handling, error=" . $e->getMessage());
            // If the RPC call fails, use the fallback
            $data = $this->circuitBreaker->fallback($fallback);
        }

        // Switch state once the failure count reaches the threshold
        $failCount = $this->circuitBreaker->getFailCounter();
        $switchToFailCount = $this->circuitBreaker->getSwitchToFailCount();
        if ($failCount >= $switchToFailCount && $this->circuitBreaker->isClose()) {
            App::trace($this->circuitBreaker->serviceName . " service, currently [closed state]: failure count reached the limit, switching to the open state, failCount=" . $failCount);
            $this->circuitBreaker->switchToOpenState();
        }

        App::trace($this->circuitBreaker->serviceName . " service, currently [closed state], failCount=" . $this->circuitBreaker->getFailCounter());
        return $data;
    }
}

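Open-state strategy

<?php
//Swoft\Sg\Circuit\OpenState.php
/**
 * Circuit breaker in the open state: every RPC call is replaced by the fallback handler
 * Open state and its transitions
 * 1. Reset failCounter=0 successCounter=0
 * 2. Requests immediately return an error response
 * 3. After a delay, a timer switches to the half-open state
 */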
class OpenState extends CircuitBreakerState
{
    /**
     * Call through the circuit breaker
     *
     * @param mixed $callback callback function
     * @param array $params arguments
     * @param mixed $fallback fallback callback
     *
     * @return mixed result
     */
    public function doCall($callback, $params = [], $fallback = null)
    {
        $data = $this->circuitBreaker->fallback();

        App::trace($this->getServiceName() . " service, currently [open state]: running the fallback for degraded handling");
        $nowTime = time();

        if ($this->circuitBreaker->isOpen()
            && $nowTime > $this->circuitBreaker->getSwitchOpenToHalfOpenTime()
        ) {
            $delayTime = $this->circuitBreaker->getDelaySwitchTimer();

            // Swoole timers are not exact, so allow 3s of tolerance; schedule the switch to the half-open state
            $switchToHalfStateTime = $nowTime + ($delayTime / 1000) + 3;
            App::getTimer()->addAfterTimer('openState', $delayTime, [$this, 'delayCallback']);
            $this->circuitBreaker->setSwitchOpenToHalfOpenTime($switchToHalfStateTime);

            App::trace($this->getServiceName() . " service, currently [open state]: created a delayed trigger; the state will switch to half-open after a while");
        }

        return $data;
    }

}

Half-open state strategy

The half-open state is a transitional state between closed and open. In this state every RPC call is made under a lock, and the circuit breaker switches to the closed state once the number of consecutive successes reaches a threshold, or back to the open state once the number of consecutive failures reaches a threshold. The code is similar and is not repeated here; interested readers can study it on their own.
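
Since the half-open code is not reproduced here, the following minimal sketch shows how the three states can fit together under the state pattern. It is not Swoft's implementation: all class names are hypothetical, the threshold values are illustrative, the current time is passed in explicitly instead of using a Swoole timer, and the locking mentioned above is left out.

```php
<?php
declare(strict_types=1);

interface StateSketch
{
    public function doCall(BreakerSketch $b, callable $call, callable $fallback, int $now);
}

final class ClosedSketch implements StateSketch
{
    public function doCall(BreakerSketch $b, callable $call, callable $fallback, int $now)
    {
        try {
            return $call();
        } catch (Throwable $e) {
            if (++$b->failures >= $b->failThreshold) {
                $b->switchTo(new OpenSketch(), $now);
            }
            return $fallback();
        }
    }
}

final class OpenSketch implements StateSketch
{
    public function doCall(BreakerSketch $b, callable $call, callable $fallback, int $now)
    {
        // Never touch the remote service while open; just wait out the delay.
        if ($now - $b->enteredAt >= $b->retryAfter) {
            $b->switchTo(new HalfOpenSketch(), $now);
        }
        return $fallback();
    }
}

final class HalfOpenSketch implements StateSketch
{
    public function doCall(BreakerSketch $b, callable $call, callable $fallback, int $now)
    {
        try {
            $result = $call();
            $b->failures = 0; // only consecutive failures count
            if (++$b->successes >= $b->successThreshold) {
                $b->switchTo(new ClosedSketch(), $now);
            }
            return $result;
        } catch (Throwable $e) {
            $b->successes = 0; // only consecutive successes count
            if (++$b->failures >= $b->failThreshold) {
                $b->switchTo(new OpenSketch(), $now);
            }
            return $fallback();
        }
    }
}

final class BreakerSketch
{
    public $state;
    public $failures = 0;
    public $successes = 0;
    public $enteredAt = 0;        // time the current state was entered
    public $failThreshold = 3;    // illustrative values, not Swoft defaults
    public $successThreshold = 2;
    public $retryAfter = 5;       // seconds to stay open

    public function __construct()
    {
        $this->state = new ClosedSketch();
    }

    public function switchTo(StateSketch $state, int $now): void
    {
        $this->state = $state;
        $this->failures = $this->successes = 0;
        $this->enteredAt = $now;
    }

    /** Delegate to whichever state instance is currently held. */
    public function call(callable $call, callable $fallback, int $now)
    {
        return $this->state->doCall($this, $call, $fallback, $now);
    }
}

$fail     = function () { throw new RuntimeException('down'); };
$ok       = function () { return 'ok'; };
$fallback = function () { return 'fallback'; };

$breaker = new BreakerSketch();
for ($i = 0; $i < 3; $i++) {
    $breaker->call($fail, $fallback, 0);   // third failure: closed -> open
}
echo get_class($breaker->state), PHP_EOL;
$breaker->call($ok, $fallback, 5);         // delay elapsed: open -> half-open
$breaker->call($ok, $fallback, 6);
$breaker->call($ok, $fallback, 7);         // second success: half-open -> closed
echo get_class($breaker->state), PHP_EOL;
```

The breaker itself contains no branching on the current state; all state-specific behaviour lives in the state classes, which is what makes the transitions easy to follow.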

Swoft source code analysis series catalog: https://segmentfault.com/a/11 …