coder

分布式

共识算法

设想有一台游戏服务器或者一台负责交易撮合的服务器,如果当前的状态只在一台服务器,那么当服务器宕机时,这个时候服务就会不可用,需要重启服务器,重启时还要加载磁盘的状态以恢复内存的状态。为了做高可用,引入了共识算法,简而言之,就是做服务状态的副本,设置N台服务器,但是从外部看来,仍然满足一定的一致性,比较严格的一致性就是线性一致性了。

线性一致性:就是在外部看来,对N台服务器的读写,就像对一台服务器的读写一样。也就是说如果写入了最新值,就不会读到旧的值。

Raft算法

https://blog.csdn.net/z69183787/article/details/112168120

https://ms2008.github.io/2019/12/04/etcd-rumor/

Raft协议分选主(Leader Election)和日志复制(Log Replication)两部分。

选主

Raft为集群中的节点定义了三种状态,分别是Follower,Candidate,Leader。

选主步骤如下:

日志复制

Raft协议规定只能通过Leader来写,可以从任意节点读。Leader通过日志复制的方式将数据同步到其他节点。Leader需要更新的数据封装在Append Entries消息中,通过heartbeat传输给其他节点。写数据一共分两个阶段:

状态机

Raft节点收到提交的log后,会将其应用到自己的状态机(内存、磁盘等),当收到某条commit的log,记为commit index,当某条log被应用到状态机,记为apply index。如果各个节点的初始状态相同,应用相同的log后,最终各个节点会达到相同的状态。

Aeron Cluster

Aeron Cluster实现了Raft算法,下面是Aeron集群核心方法:

public interface ClusteredService
{
    // Start event for the service where the service can perform any initialisation required and load snapshot state.
    // 加载快照
    void onStart(Cluster cluster, Image snapshotImage);

    // A session has been opened for a client to the cluster.
    void onSessionOpen(ClientSession session, long timestamp);

    // A session has been closed for a client to the cluster.
    void onSessionClose(ClientSession session, long timestamp, CloseReason closeReason);

    // A message has been received to be processed by a clustered service.
    // 这个消息已经集群共识了
    void onSessionMessage(...);

    // The service should take a snapshot and store its state to the provided archive
    // 在此将当前内存状态保存到快照里
    void onTakeSnapshot(ExclusivePublication snapshotPublication);

    // Notify that the cluster node has changed role.
    void onRoleChange(Cluster.Role newRole);

    // An election has been successful and a leader has entered a new term.
    default void onNewLeadershipTermEvent(...);
}

所以aeron主要有两种文件:

aeron快照

如何打快照?

boolean snapshot = ClusterTool.snapshot(new File(this.cluster.context().clusterDirectoryName()), new PrintStream(System.out));

如何清理快照文件?

RecordingLog recordingLog = new RecordingLog(new File(consensusPath), false);

// Invalidate the last snapshot taken by the cluster so that on restart it can revert to the previous one.
boolean res = recordingLog.invalidateLatestSnapshot();
recordingLog.close();

为什么需要快照?

如果没有快照,每次启动时都会从最初加载log文件,显然不合适,所以需要一个快照,加载快照,然后aeron就会从快照后面一个log开始加载log文件。

message log file

如果log持续增长,也不合适,因为已经有快照了,快照之前的log是可以剪裁掉的。

可以通过下面的代码剪裁log,Purge (detach and delete) segments from the beginning of a recording up to the provided new start position.

AeronArchive.purgeSegments(final long recordingId, final long newStartPosition)

用aeron cluster设计系统

介绍2种方法,一种方法是快照中存储全部状态。如果快照中存不下全部状态,可以将状态存储在数据库中。

三阶段写

下图是一种基于数据库的设计思路,称为三阶段写

如何处理写

3stage

如何处理一致性读

如果对一致性没有要求,则直接读任意节点内存即可,如果对一致性有要求,则需要遵循如下的规则:

  1. 读取Leader的seq
  2. 带上seq去请求任意节点,等到状态机至少应用到seq之后,返回内存的状态。

❓为什么要分三阶段,而不是在集群共识后统一更新内存?

因为要在共识前生成db更新log,db更新log作为集群共识消息的一部分,持久化模块会消费集群消息,以事务的方式更新db中的业务、seq以及Aeron的position。在这种模式下,Aeron的快照是空文件(只记录position),真正的快照存储在数据库中。当Aeron集群启动时,找到最新的快照,判定db中持久化的position是否大于等于快照的position,如果是,则加载这个快照,反之,这个快照无效,再找一个更早的快照,直到满足加载条件为止。