Talk about PeerAwareInstanceRegistryImpl of eureka

  springcloud

Order

This article mainly studies eureka’s PeerAwareInstanceRegistryImpl

EurekaServerAutoConfiguration

@Configuration
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,
        InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter {
    //......
    @Bean
    public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
            ServerCodecs serverCodecs) {
        this.eurekaClient.getApplications(); // force initialization
        return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
                serverCodecs, this.eurekaClient,
                this.instanceRegistryProperties.getExpectedNumberOfRenewsPerMin(),
                this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
    }

    @Bean
    @ConditionalOnMissingBean
    public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry,
            ServerCodecs serverCodecs) {
        return new RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig,
                this.eurekaClientConfig, serverCodecs, this.applicationInfoManager);
    }

    @Bean
    public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,
            PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
        return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs,
                registry, peerEurekaNodes, this.applicationInfoManager);
    }

    @Bean
    public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,
            EurekaServerContext serverContext) {
        return new EurekaServerBootstrap(this.applicationInfoManager,
                this.eurekaClientConfig, this.eurekaServerConfig, registry,
                serverContext);
    }
    //......
}

The main focus here is PeerAwareInstanceRegistry. For the configuration of EurekaClient, see EurekaClientAutoConfiguration.

InstanceRegistry

spring-cloud-netflix-eureka-server-2.0.0.RC1-sources.jar! /org/springframework/cloud/netflix/eureka/server/InstanceRegistry.java

public class InstanceRegistry extends PeerAwareInstanceRegistryImpl
        implements ApplicationContextAware {
        //......
    private void handleCancelation(String appName, String id, boolean isReplication) {
        log("cancel " + appName + ", serverId " + id + ", isReplication " + isReplication);
        publishEvent(new EurekaInstanceCanceledEvent(this, appName, id, isReplication));
    }

    private void handleRegistration(InstanceInfo info, int leaseDuration,
            boolean isReplication) {
        log("register " + info.getAppName() + ", vip " + info.getVIPAddress()
                + ", leaseDuration " + leaseDuration + ", isReplication "
                + isReplication);
        publishEvent(new EurekaInstanceRegisteredEvent(this, info, leaseDuration,
                isReplication));
    }
        
    private void log(String message) {
        if (log.isDebugEnabled()) {
            log.debug(message);
        }
    }

    private void publishEvent(ApplicationEvent applicationEvent) {
        this.ctxt.publishEvent(applicationEvent);
    }
}

This inherits PeerAwareInstanceRegistryImpl, and the rewriting method mainly publishes relevant events before execution.

PeerAwareInstanceRegistryImpl

eureka-core-1.8.8-sources.jar! /com/netflix/eureka/registry/PeerAwareInstanceRegistryImpl.java

@Singleton
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {
    //......
}

This class implements openForTraffic, shutdown, statusUpdate, deleteStatusOverride and other methods of InstanceRegistry. As well as init, syncUp, shouldAllowAccess, register (instanceinfo, boolean is replication), statusupdate (final stringsgname, Asgstatus newstatus.
However, AbstractInstanceRegistry mainly implements the basic operations of LeaseManager such as register, cancel, renew, evict, and some query operations of LookupService.
PeerAwareInstanceRegistryImpl mainly adds peer-related processing and RenewalThreshold update on the basis of AbstractInstanceRegistry

replicateToPeers

As shown in the following figure, PeerAwareInstanceRegistryImpl rewrites the cancel, register, renew, statusUpdate, deleteStatusOverride methods, calling the operation of super first, and then calling replicateToPeers.

    /**
     * Replicates all eureka actions to peer eureka nodes except for replication
     * traffic to this node.
     *
     */
    private void replicateToPeers(Action action, String appName, String id,
                                  InstanceInfo info /* optional */,
                                  InstanceStatus newStatus /* optional */, boolean isReplication) {
        Stopwatch tracer = action.getTimer().start();
        try {
            if (isReplication) {
                numberOfReplicationsLastMin.increment();
            }
            // If it is a replication already, do not replicate again as this will create a poison replication
            if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
                return;
            }

            for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
                // If the url represents this host, do not replicate to yourself.
                if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                    continue;
                }
                replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
            }
        } finally {
            tracer.stop();
        }
    }

This method is to copy the instance information to eureka server peers. IsReplication is used to identify whether the request was replicated by other nodes.
It can be seen that if peerEurekaNodes is empty or isReplication is true, then no further progress will be made.

replicateInstanceActionsToPeers

    /**
     * Replicates all instance changes to peer eureka nodes except for
     * replication traffic to this node.
     *
     */
    private void replicateInstanceActionsToPeers(Action action, String appName,
                                                 String id, InstanceInfo info, InstanceStatus newStatus,
                                                 PeerEurekaNode node) {
        try {
            InstanceInfo infoFromRegistry = null;
            CurrentRequestVersion.set(Version.V2);
            switch (action) {
                case Cancel:
                    node.cancel(appName, id);
                    break;
                case Heartbeat:
                    InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                    break;
                case Register:
                    node.register(info);
                    break;
                case StatusUpdate:
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                    break;
                case DeleteStatusOverride:
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.deleteStatusOverride(appName, id, infoFromRegistry);
                    break;
            }
        } catch (Throwable t) {
            logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
        }
    }

Here, different methods of PeerEurekaNode are called according to different Action.

PeerEurekaNode

eureka-core-1.8.8-sources.jar! /com/netflix/eureka/cluster/PeerEurekaNode.java

HttpReplicationClient

eureka-core-1.8.8-sources.jar! /com/netflix/eureka/cluster/PeerEurekaNodes.java

    protected PeerEurekaNode createPeerEurekaNode(String peerEurekaNodeUrl) {
        HttpReplicationClient replicationClient = JerseyReplicationClient.createReplicationClient(serverConfig, serverCodecs, peerEurekaNodeUrl);
        String targetHost = hostFromUrl(peerEurekaNodeUrl);
        if (targetHost == null) {
            targetHost = "host";
        }
        return new PeerEurekaNode(registry, targetHost, peerEurekaNodeUrl, replicationClient, serverConfig);
    }

eureka-core-1.8.8-sources.jar! /com/netflix/eureka/transport/JerseyReplicationClient.java

public class JerseyReplicationClient extends AbstractJerseyEurekaHttpClient implements HttpReplicationClient {

    private static final Logger logger = LoggerFactory.getLogger(JerseyReplicationClient.class);

    private final EurekaJerseyClient jerseyClient;
    private final ApacheHttpClient4 jerseyApacheClient;

    public JerseyReplicationClient(EurekaJerseyClient jerseyClient, String serviceUrl) {
        super(jerseyClient.getClient(), serviceUrl);
        this.jerseyClient = jerseyClient;
        this.jerseyApacheClient = jerseyClient.getClient();
    }

    @Override
    protected void addExtraHeaders(Builder webResource) {
        webResource.header(PeerEurekaNode.HEADER_REPLICATION, "true");
    }

//......
}

This shows that the value of PEERREKANODE. HEADER _ replication is set to true, which means that requests triggered from this client belong to the REPLICATION behavior. It tells the target eureka server not to use replicate again to avoid dead loops.

InstanceResource

eureka-core-1.8.8-sources.jar! /com/netflix/eureka/resources/InstanceResource.java

    /**
     * Handles cancellation of leases for this particular instance.
     *
     * @param isReplication
     *            a header parameter containing information whether this is
     *            replicated from other nodes.
     * @return response indicating whether the operation was a success or
     *         failure.
     */
    @DELETE
    public Response cancelLease(
            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
        try {
            boolean isSuccess = registry.cancel(app.getName(), id,
                "true".equals(isReplication));

            if (isSuccess) {
                logger.debug("Found (Cancel): {} - {}", app.getName(), id);
                return Response.ok().build();
            } else {
                logger.info("Not Found (Cancel): {} - {}", app.getName(), id);
                return Response.status(Status.NOT_FOUND).build();
            }
        } catch (Throwable e) {
            logger.error("Error (cancel): {} - {}", app.getName(), id, e);
            return Response.serverError().build();
        }

    }

Let’s take cancel as an example, and then repeat replication. Here, we can find that the value of Peereurekanode. Header _ Replication is passed to the registry.cancel method, and this registry is PeerAwareInstanceRegistryImpl, thus looping up.

Summary

PeerAwareInstanceRegistryImpl mainly adds peer-related processing and RenewalThreshold update on peer basis of AbstractInstanceRegistry. When replicate passes PEERREKANODE to other peers, header _ header with replication of true indicates that this is a replicate request, so that the receiver will no longer replicate to his peers.

doc