文章目录
- 一、前言
- 二、Seata Server启动
- 1、找入口
- 2、整体执行流程
- 1)对配置文件做参数解析
- 2)初始化监控
- 3)创建TC与RM/TM通信的RPC服务器
- 4)初始化UUID生成器
- IdWorker
- 1> initTimestampAndSequence()
- 2> initWorkerId(Long)
- 5)设置事务会话(`SessionHolder`)、全局锁(`LockManager`)的持久化方式并初始化
- 1> SessionHolder
- 2> LockerManager
- 6)创建并初始化事务协调器(`DefaultCoordinator`)
- 7)注册ServerRunner销毁(Spring容器销毁)的回调钩子函数DefaultCoordinator
- 8)启动NettyServer(NettyRemotingServer)
- 1> 首先注册消息处理器
- 2> 初始化`NettyRemotingServer`
- AbstractNettyRemotingServer.ServerHandler类
- 三、总结和后续
一、前言
至此,seata系列的内容包括:
- can not get cluster name in registry config ‘service.vgroupMapping.xx‘, please make sure registry问题解决;
- Seata Failed to get available servers: endpoint format should like ip:port 报错原因/解决方案汇总版(看完本文必解决问题)
- Seata json decode exception, Cannot construct instance of java.time.LocalDateTime报错原因/解决方案最全汇总版
- 【微服务 31】超细的Spring Cloud 整合Seata实现分布式事务(排坑版)
- 【微服务 32】Spring Cloud整合Seata、Nacos实现分布式事务案例(巨细排坑版)【云原生】
- 【微服务33】分布式事务Seata源码解析一:在IDEA中启动Seata Server
本文着重聊一聊seata-server启动时都做了什么?
PS:前文中搭建的Seata案例,seata的版本为1.3.0,而本文开始的源码分析将基于当前(2022年8月)最新的版本1.5.2进行源码解析。
二、Seata Server启动
Seata Server包含几个主要模块:Config(配置TC)、Store(TC运行时全局事务以及分支事务的相关信息通过Store持久化)、Coordinator(TC实现事务协调的核心)、Netty-RPC(负责TC与TM/RM交互)、Lock(资源全局锁的实现);
1、找入口
当要启动一个seata-server时,只需要执行压缩包中bin/目录下的seata-server.sh
,在这个脚本中会运行seata-server.jar
;
即对应于源码工程中的server目录 / seata-server 模块,由于seata-server是一个SpringBoot项目,找到其启动类ServerApplication
,里面仅仅指定了一个包扫描路径为io.seata
,并无其余特殊配置;
在启动类的同级目录下,有一个ServerRunner
类;
ServerRunner
类实现了CommandLineRunner
接口:
而CommandLineRunner
接口主要用于实现在Spring容器初始化后执行,并且在整个应用生命周期内只会执行一次;也就是说在Spring容器初始化后会执行ServerRunner#run()
方法;
ServerRunner#run()
方法中仅仅调用了Server#start()
方法;因此可以确定入口为io.seata.server.Server
类的start()方法;
2、整体执行流程
Server#start()
方法:
public class Server {
/**
* The entry point of application.
*
* @param args the input arguments
*/
public static void start(String[] args) {
// create logger
final Logger logger = LoggerFactory.getLogger(Server.class);
//initialize the parameter parser
//Note that the parameter parser should always be the first line to execute.
//Because, here we need to parse the parameters needed for startup.
// 1. 对配置文件做参数解析:包括registry.conf、file.conf的解析
ParameterParser parameterParser = new ParameterParser(args);
// 2、初始化监控,做metric指标采集
MetricsManager.get().init();
// 将Store资源持久化方式放到系统的环境变量store.mode中
System.setProperty(ConfigurationKeys.STORE_MODE, parameterParser.getStoreMode());
// seata server里netty server 的io线程池(核心线程数50,最大线程数100)
ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(NettyServerConfig.getMinServerPoolSize(),
NettyServerConfig.getMaxServerPoolSize(),
NettyServerConfig.getKeepAliveTime(),
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),
new NamedThreadFactory("ServerHandlerThread", NettyServerConfig.getMaxServerPoolSize()), new ThreadPoolExecutor.CallerRunsPolicy());
// 3、创建TC与RM/TM通信的RPC服务器--netty
NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);
// 4、初始化UUID生成器(雪花算法)
UUIDGenerator.init(parameterParser.getServerNode());
//log store mode : file, db, redis
// 5、设置事务会话的持久化方式,有三种类型可选:file/db/redis
SessionHolder.init(parameterParser.getSessionStoreMode());
LockerManagerFactory.init(parameterParser.getLockStoreMode());
// 6、创建并初始化事务协调器,创建时后台会启动一堆线程
DefaultCoordinator coordinator = DefaultCoordinator.getInstance(nettyRemotingServer);
coordinator.init();
// 将DefaultCoordinator作为Netty Server的transactionMessageHandler;
// 用于做AT、TCC、SAGA等不同事务类型的逻辑处理
nettyRemotingServer.setHandler(coordinator);
// let ServerRunner do destroy instead ShutdownHook, see https://github.com/seata/seata/issues/4028
// 7、注册ServerRunner销毁(Spring容器销毁)的回调钩子函数
ServerRunner.addDisposable(coordinator);
//127.0.0.1 and 0.0.0.0 are not valid here.
if (NetUtil.isValidIp(parameterParser.getHost(), false)) {
XID.setIpAddress(parameterParser.getHost());
} else {
String preferredNetworks = ConfigurationFactory.getInstance().getConfig(REGISTRY_PREFERED_NETWORKS);
if (StringUtils.isNotBlank(preferredNetworks)) {
XID.setIpAddress(NetUtil.getLocalIp(preferredNetworks.split(REGEX_SPLIT_CHAR)));
} else {
XID.setIpAddress(NetUtil.getLocalIp());
}
}
// 8、启动netty Server,用于接收TM/RM的请求
nettyRemotingServer.init();
}
}
Server端的启动流程大致做了八件事:
- 对配置文件(包括registry.conf、file.conf)做参数解析;
- 初始化监控,做metric指标采集;
- 创建TC与RM/TM通信的RPC服务器(
NettyRemotingServer
)–netty;- 初始化UUID生成器(雪花算法),用于生成全局事务id和分支事务id;
- 设置事务会话(
SessionHolder
)、全局锁(LockManager
)的持久化方式并初始化,有三种类型可选:file/db/redis;- 创建并初始化事务协调器(
DefaultCoordinator
),后台启动一堆线程做定时任务,并将DefaultCoordinator
绑定到RPC服务器上做为transactionMessageHandler
;- 注册ServerRunner销毁(Spring容器销毁)的回调钩子函数DefaultCoordinator;
- 启动netty Server,用于接收TM/RM的请求;
1)对配置文件做参数解析
具体代码执行流程如下:
ParameterParser的init()方法中:
- 首先从启动命令(运行时参数)中解析;
- 接着判断server端是否在容器中启动,是则从容器环境中获取seata环境、host、port、serverNode、storeMode存储模式等信息;
- 如果storeMode不存在,则从配置中心/文件中获取配置。
// 解析运行期参数,默认什么里面什么都没有
private void getCommandParameters(String[] args) {
JCommander jCommander = JCommander.newBuilder().addObject(this).build();
jCommander.parse(args);
if (help) {
jCommander.setProgramName(PROGRAM_NAME);
jCommander.usage();
System.exit(0);
}
}
// server端在容器中启动,则从容器环境中读取环境、host、port、server节点以及StoreMode存储模式
private void getEnvParameters() {
// 设置seata的环境
if (StringUtils.isBlank(seataEnv)) {
seataEnv = ContainerHelper.getEnv();
}
// 设置Host
if (StringUtils.isBlank(host)) {
host = ContainerHelper.getHost();
}
// 设置端口号
if (port == 0) {
port = ContainerHelper.getPort();
}
if (serverNode == null) {
serverNode = ContainerHelper.getServerNode();
}
if (StringUtils.isBlank(storeMode)) {
storeMode = ContainerHelper.getStoreMode();
}
if (StringUtils.isBlank(sessionStoreMode)) {
sessionStoreMode = ContainerHelper.getSessionStoreMode();
}
if (StringUtils.isBlank(lockStoreMode)) {
lockStoreMode = ContainerHelper.getLockStoreMode();
}
}
2)初始化监控
默认不开启,此处不做过多介绍
3)创建TC与RM/TM通信的RPC服务器
单纯的new一个NettyRemotingServer
,也没啥可说的;
4)初始化UUID生成器
UUID底层采用雪花算法,其用于生成全局事务id和分支事务id;
代码执行流程如下:
UUIDGenerator
会委托IdWorker来生成雪花id,生成的雪花Id由0、10位的workerId、41位的时间戳、12位的sequence序列号组成。
IdWorker
IdWorker中有8个重要的成员变量/常量:
/**
* Start time cut (2020-05-03)
*/
private final long twepoch = 1588435200000L;
/**
* The number of bits occupied by workerId
*/
private final int workerIdBits = 10;
/**
* The number of bits occupied by timestamp
*/
private final int timestampBits = 41;
/**
* The number of bits occupied by sequence
*/
private final int sequenceBits = 12;
/**
* Maximum supported machine id, the result is 1023
*/
private final int maxWorkerId = ~(-1 << workerIdBits);
/**
* business meaning: machine ID (0 ~ 1023)
* actual layout in memory:
* highest 1 bit: 0
* middle 10 bit: workerId
* lowest 53 bit: all 0
*/
private long workerId;
/**
* 又是一个雪花算法(64位,8字节)
* timestamp and sequence mix in one Long
* highest 11 bit: not used
* middle 41 bit: timestamp
* lowest 12 bit: sequence
*/
private AtomicLong timestampAndSequence;
/**
* 从一个long数组类型中抽取出一个时间戳伴随序列号,偏向一个辅助性质
* mask that help to extract timestamp and sequence from a long
*/
private final long timestampAndSequenceMask = ~(-1L << (timestampBits + sequenceBits));
变量/常量解释:
- 常量
twepoch
表示我们的时间戳时间从2020-05-03
开始计算,即当前时间的时间戳需要减去twepoch
的值1588435200000L
;- 常量
workerIdBits
表示机器号workerId占10位;- 常量
timestampBits
表示时间戳timestamp占41位;- 常量
sequenceBits
表示序列化占12位;- 常量
maxWorkerId
表示机器号的最大值为1023;- long类型的变量
workerId
本身也是一个雪花算法,只是从开头往后数,第2位开始,一共10位用来表示workerId,其余位全是0;- AtomicLong类型的变量
timestampAndSequence
,其本身也是一个雪花算法,头11位不使用,中间41位表示timestamp,最后12位表示sequence;- long类型的常量
timestampAndSequenceMask
,用于从一个完整的雪花ID(long类型)中摘出timestamp 和 sequence
IdWorker构造器中会分别初始化TimestampAndSequence、WorkerId。
1> initTimestampAndSequence()
initTimestampAndSequence()方法负责初始化timestamp
和sequence
;
private void initTimestampAndSequence() {
// 拿到当前时间戳 - (2020-05-03 时间戳)的数值,即当前时间相对2020-05-03的时间戳
long timestamp = getNewestTimestamp();
// 把时间戳左移12位,后12位流程sequence使用
long timestampWithSequence = timestamp << sequenceBits;
// 把混合sequence(默认为0)的时间戳赋值给timestampAndSequence
this.timestampAndSequence = new AtomicLong(timestampWithSequence);
}
// 获取当前时间戳
private long getNewestTimestamp() {
//当前时间的时间戳减去2020-05-03的时间戳
return System.currentTimeMillis() - twepoch;
}
2> initWorkerId(Long)
initWorkerId(Long workerId)方法负责初始化workId,默认不会传过来workerId,如果传过来则使用传过来的workerId,并校验其不能大于1023,然后将其左移53位;
private void initWorkerId(Long workerId) {
if (workerId == null) {
// workid为null时,自动生成一个workerId
workerId = generateWorkerId();
}
// workerId最大只能是1023,因为其只占10bit
if (workerId > maxWorkerId || workerId < 0) {
String message = String.format("worker Id can't be greater than %d or less than 0", maxWorkerId);
throw new IllegalArgumentException(message);
}
this.workerId = workerId << (timestampBits + sequenceBits);
}
如果没传则基于MAC地址生成;
如果基于MAC地址生成workerId出现异常,则也1023为基数生成一个随机的workerId;
最后同样,校验workerId不能大于1023,然后将其左移53位;
5)设置事务会话(SessionHolder
)、全局锁(LockManager
)的持久化方式并初始化
1> SessionHolder
SessionHolder负责事务会话Session的持久化,一个session对应一个事务,事务又分为全局事务和分支事务;
SessionHolder支持db,file和redis的持久化方式,其中redis和db支持集群模式,项目上推荐使用redis或db模式;
SessionHolder有五个重要的属性,如下:
// 用于管理所有的Setssion,以及Session的创建、更新、删除等
private static SessionManager ROOT_SESSION_MANAGER;
// 用于管理所有的异步commit的Session,包括创建、更新以及删除
private static SessionManager ASYNC_COMMITTING_SESSION_MANAGER;
// 用于管理所有的重试commit的Session,包括创建、更新以及删除
private static SessionManager RETRY_COMMITTING_SESSION_MANAGER;
// 用于管理所有的重试rollback的Session,包括创建、更新以及删除
private static SessionManager RETRY_ROLLBACKING_SESSION_MANAGER;
// 用于管理分布式锁
private static DistributedLocker DISTRIBUTED_LOCKER;
这五个属性在SessionHolder#init()
方法中初始化,init()方法源码如下:
public static void init(String mode) {
if (StringUtils.isBlank(mode)) {
mode = CONFIG.getConfig(ConfigurationKeys.STORE_SESSION_MODE,
CONFIG.getConfig(ConfigurationKeys.STORE_MODE, SERVER_DEFAULT_STORE_MODE));
}
StoreMode storeMode = StoreMode.get(mode);
// 根据storeMode采用SPI机制初始化SessionManager
// db模式
if (StoreMode.DB.equals(storeMode)) {
ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName());
ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
new Object[]{ASYNC_COMMITTING_SESSION_MANAGER_NAME});
RETRY_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
new Object[]{RETRY_COMMITTING_SESSION_MANAGER_NAME});
RETRY_ROLLBACKING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
new Object[]{RETRY_ROLLBACKING_SESSION_MANAGER_NAME});
DISTRIBUTED_LOCKER = DistributedLockerFactory.getDistributedLocker(StoreMode.DB.getName());
} else if (StoreMode.FILE.equals(storeMode)) {
// 文件模式
String sessionStorePath = CONFIG.getConfig(ConfigurationKeys.STORE_FILE_DIR,
DEFAULT_SESSION_STORE_FILE_DIR);
if (StringUtils.isBlank(sessionStorePath)) {
throw new StoreException("the {store.file.dir} is empty.");
}
ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.FILE.getName(),
new Object[]{ROOT_SESSION_MANAGER_NAME, sessionStorePath});
ASYNC_COMMITTING_SESSION_MANAGER = ROOT_SESSION_MANAGER;
RETRY_COMMITTING_SESSION_MANAGER = ROOT_SESSION_MANAGER;
RETRY_ROLLBACKING_SESSION_MANAGER = ROOT_SESSION_MANAGER;
DISTRIBUTED_LOCKER = DistributedLockerFactory.getDistributedLocker(StoreMode.FILE.getName());
} else if (StoreMode.REDIS.equals(storeMode)) {
// redis模式
ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.REDIS.getName());
ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class,
StoreMode.REDIS.getName(), new Object[]{ASYNC_COMMITTING_SESSION_MANAGER_NAME});
RETRY_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class,
StoreMode.REDIS.getName(), new Object[]{RETRY_COMMITTING_SESSION_MANAGER_NAME});
RETRY_ROLLBACKING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class,
StoreMode.REDIS.getName(), new Object[]{RETRY_ROLLBACKING_SESSION_MANAGER_NAME});
DISTRIBUTED_LOCKER = DistributedLockerFactory.getDistributedLocker(StoreMode.REDIS.getName());
} else {
// unknown store
throw new IllegalArgumentException("unknown store mode:" + mode);
}
// 根据storeMode重新加载
reload(storeMode);
}
init()方法中根据storeMode采用SPI机制初始化SessionManager,SessionManager
有三个实现类:
2> LockerManager
和SessionHolder
一样,LockManagerFactory#init()
方法同样根据storeMode采用SPI机制初始化LockManager,LockManager
有三个实现类:
6)创建并初始化事务协调器(DefaultCoordinator
)
DefaultCoordinator
是事务协调的核心,比如:开启、提交、回滚全局事务,注册、提交、回滚分支事务都是通过DefaultCoordinator进行协调处理的。
(1)先来看DefaultCoordinator的创建;
使用Double Check Lock(DCL-双重检查锁)机制获取到单例的DefaultCoordinator
;如果DefaultCoordinator
为实例化过,则new一个:
在DefaultCoordinator
的类构造器中,首先绑定远程通信的Server的具体实现到内部成员中,然后实例化一个DefaultCore
,DefaultCore是AT、TCC、XA、Saga四种分布式事务模式的具体实现类;
DefaultCore
的类构造器中首先通过SPI机制加载出所有的AbstractCore的子类,一共有四个:ATCore、TccCore、SagaCore、XACore;然后将AbstractCore
子类可以处理的事务模式作为Key、AbstractCore
子类作为Value存储到一个缓存Map(Map coreMap
)中;
private static Map<BranchType, AbstractCore> coreMap = new ConcurrentHashMap<>();
后续通过BranchType(分支类型)就可以从coreMap中获取到相应事务模式的具体AbstractCore实现类。
(2)初始化DefaultCoordinator;
所谓的初始化,其实就是后台启动一堆线程做定时任务;去定时处理重试回滚、重试提交、异步提交、超时的检测,以及定时清理undo_log。
除定时清理undo_log外,其余定时任务的处理逻辑基本都是:
- 首先获取所有可回滚的全局事务会话Session,如果可回滚的分支事务为空,则直接返回;
- 否者,遍历所有的可回滚Session;为了防止重复回滚,如果session的状态是正在回滚中并且session不是死亡的,则直接返回;
- 如果Session重试回滚超时,从缓存中删除已经超时的回滚Session;
- 发布session回滚完成事件给到Metric,对回滚中的Session添加Session生命周期的监听;
- 使用DefaultCoordinator组合的DefaultCore执行全局回滚。
以处理重试回滚的方法handleRetryRollbacking()
为例:
protected void handleRetryRollbacking() {
SessionCondition sessionCondition = new SessionCondition(rollbackingStatuses);
sessionCondition.setLazyLoadBranch(true);
// 获取所有的可回滚的全局事务session
Collection<GlobalSession> rollbackingSessions =
SessionHolder.getRetryRollbackingSessionManager