해당 글을 이해하기 위해선 이전 글을 보고 오셔야 됩니다. (특히 TC, TM, RM의 관계에 대해 인지하고 계셔야 합니다)
https://solution-is-here.tistory.com/235
Apache Seata란?
1. Seata란?Apache Seata는 마이크로서비스 아키텍처에서 고성능과 사용 편의성을 제공하는 분산 트랜잭션 프레임워크입니다. 알리바바에 의해 시작되었으며, 2023년에 Apache 재단에 기부되었습니다. S
solution-is-here.tistory.com
이번 글에서는 Seata를 실행했을 때 발생하는 로그를 통해 어떻게 TC와 TM, RM이 연결되는지 알아보려 합니다.
이때 로그에서 필요한 정보를 제외한 나머지 정보는 생략했습니다. (시간, 프로젝트 이름)
초기화 과정 분석
1. Seata 자동 구성
o.a.s.s.b.a.SeataAutoConfiguration : Automatically configure Seata
Spring Boot 애플리케이션이 시작될 때 SeataAutoConfiguration이 자동으로 Seata를 구성합니다. 이 과정에서 Seata의 핵심 구성 요소가 초기화됩니다.
코드를 자세히 보면 @DependsOn 어노테이션을 통해 SpringApplicationContextProvider와 failureHandler를 초기화한 후, 빈을 생성하는 것을 볼 수 있습니다. (seata가 springContext에 접근을 해야되기 때문에 SpringApplicationContextProvider 빈이 초기화 된 뒤, 빈이 생성되어야 합니다)
@Bean
@DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})
@ConditionalOnMissingBean(GlobalTransactionScanner.class)
public static GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler,
ConfigurableListableBeanFactory beanFactory,
@Autowired(required = false) List<ScannerChecker> scannerCheckers) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Automatically configure Seata");
}
// set bean factory
GlobalTransactionScanner.setBeanFactory(beanFactory);
// add checkers
// '/META-INF/services/org.apache.seata.spring.annotation.ScannerChecker'
GlobalTransactionScanner.addScannerCheckers(EnhancedServiceLoader.loadAll(ScannerChecker.class));
// spring beans
GlobalTransactionScanner.addScannerCheckers(scannerCheckers);
// add scannable packages
GlobalTransactionScanner.addScannablePackages(seataProperties.getScanPackages());
// add excludeBeanNames
GlobalTransactionScanner.addScannerExcludeBeanNames(seataProperties.getExcludesForScanning());
//set accessKey and secretKey
GlobalTransactionScanner.setAccessKey(seataProperties.getAccessKey());
GlobalTransactionScanner.setSecretKey(seataProperties.getSecretKey());
// create global transaction scanner
return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(),
seataProperties.isExposeProxy(), failureHandler);
}
2. 글로벌 트랜잭션 클라이언트 초기화
o.a.s.s.a.GlobalTransactionScanner : Initializing Global Transaction Clients ...
1 단계에서 bean으로 생성된 GlobalTransactionScanner가 글로벌 트랜잭션 클라이언트(TM과 RM)를 초기화합니다.
아래의 코드와 같이 GlobalTransactionScanner은 InitializingBean을 구현한 것을 볼 수 있다.
이때 InitializingBean 인터페이스에는 afterPropertiesSet 메서드가 있는데, 해당 메서드는 의존성 주입이 끝난 뒤, 호출되는 특징을 가진 메서드입니다.
public class GlobalTransactionScanner extends AbstractAutoProxyCreator
implements CachedConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean {
GlobalTransactionScanner에서 afterPropertiesSet을 Override 한 것을 보면 initClient 메서드를 호출하는 것을 볼 수 있습니다.
@Override
public void afterPropertiesSet() {
if (disableGlobalTransaction) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Global transaction is disabled.");
}
ConfigurationFactory.getInstance().addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (CachedConfigurationChangeListener) this);
return;
}
if (initialized.compareAndSet(false, true)) {
initClient();
}
this.findBusinessBeanNamesNeededEnhancement();
}
initClient 메서드에서 글로벌 트랜잭션 클라이언트(TM, RM)을 초기화합니다.
protected void initClient() {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Initializing Global Transaction Clients ... ");
}
if (DEFAULT_TX_GROUP_OLD.equals(txServiceGroup)) {
LOGGER.warn("the default value of seata.tx-service-group: {} has already changed to {} since Seata 1.5, " +
"please change your default configuration as soon as possible " +
"and we don't recommend you to use default tx-service-group's value provided by seata",
DEFAULT_TX_GROUP_OLD, DEFAULT_TX_GROUP);
}
if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
}
//init TM
TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
}
//init RM
RMClient.init(applicationId, txServiceGroup);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Global Transaction Clients are initialized. ");
}
registerSpringShutdownHook();
}
이러한 코드를 통해 GlobalTransactionScanner 빈이 의존성 주입이 완료되면 자동으로 트랜잭션 클라이언트(TM, RM)가 초기화됩니다.
3. TM 등록 과정
o.a.s.c.rpc.netty.NettyPoolableFactory : NettyPool create channel to transactionRole:TMROLE,address:127.0.0.1:8091,msg:< RegisterTMRequest{version='2.3.0', applicationId='distributed-transactions-at-demo', transactionServiceGroup='my_test_tx_group', extraData='ak=null
TM은 TC 서버에 자신을 등록하기 위해 물리적 TCP 연결을 생성합니다.
//init TM
TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
GlobalTransactionScanner의 initClient에서 TMClient.init을 통해 TM을 초기화합니다.
이 과정에서 TM을 등록하는데 필요한, applicationId, txServiceGroup, accessKey, secretKey 등을 함께 전달합니다.
public static void init(String applicationId, String transactionServiceGroup, String accessKey, String secretKey) {
TmNettyRemotingClient tmNettyRemotingClient = TmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup, accessKey, secretKey);
tmNettyRemotingClient.init();
}
TmClient는 GlobalTransactionScanner로부터 받은 정보를 바탕으로 TmNettyRemotingClient 인스턴스를 생성한 뒤, init 메서드를 통해 초기화합니다.
@Override
public void init() {
// registry processor
registerProcessor();
if (initialized.compareAndSet(false, true)) {
super.init();
if (isNotBlank(transactionServiceGroup)) {
initConnection();
}
}
}
TmNettyRemotingClient에서 부모 클래스의 init 메서드를 호출합니다.
@Override
public void init() {
timerExecutor.scheduleAtFixedRate(
() -> {
try {
clientChannelManager.reconnect(getTransactionServiceGroup());
} catch (Exception ex) {
LOGGER.warn("reconnect server failed. {}", ex.getMessage());
}
},
SCHEDULE_DELAY_MILLS,
SCHEDULE_INTERVAL_MILLS,
TimeUnit.MILLISECONDS);
if (this.isEnableClientBatchSendRequest()) {
mergeSendExecutorService = new ThreadPoolExecutor(
MAX_MERGE_SEND_THREAD,
MAX_MERGE_SEND_THREAD,
KEEP_ALIVE_TIME,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
mergeSendExecutorService.submit(new MergedSendRunnable());
}
super.init();
clientBootstrap.start();
}
부모 클래스인 AbstractNettyRemotingClient의 init 메서드에서 try-catch를 통해 NettyClientChannelManager 클래스의 reconnect를 호출합니다.
private Channel doConnect(String serverAddress) {
Channel channelToServer = channels.get(serverAddress);
if (channelToServer != null && channelToServer.isActive()) {
return channelToServer;
}
Channel channelFromPool;
try {
NettyPoolKey currentPoolKey = poolKeyFunction.apply(serverAddress);
poolKeyMap.put(serverAddress, currentPoolKey);
channelFromPool = nettyClientKeyPool.borrowObject(currentPoolKey);
channels.put(serverAddress, channelFromPool);
} catch (Exception exx) {
LOGGER.error("{} register RM failed.", FrameworkErrorCode.RegisterRM.getErrCode(), exx);
throw new FrameworkException("can not register RM,err:" + exx.getMessage());
}
return channelFromPool;
}
NettyClientChannelManager의 reConnect가 doConnect를 호출하는데 중간 과정이 복잡해 해당 과정은 생략하겠습니다.
NettyClientChannelManager의 doConnect에서 nettyClientKeyPool.borrowObject를 통해 채널을 획득하려고 합니다.
하지만, 초기화를 하는 과정에서는 nettyClientKeyPool에 저장된 채널이 없기에 borrowObject를 해도 얻는 것이 없습니다.
그래서 채널을 생성하기 위해 makeObject 메서드가 호출됩니다.
(이 메서드가 호출되는 것을 설명하기 위해 이렇게 길게 설명했습니다.....)
@Override
public Channel makeObject(NettyPoolKey key) {
InetSocketAddress address = NetUtil.toInetSocketAddress(key.getAddress());
if (LOGGER.isInfoEnabled()) {
LOGGER.info("NettyPool create channel to " + key);
}
Channel tmpChannel = clientBootstrap.getNewChannel(address);
long start = System.currentTimeMillis();
Object response;
Channel channelToServer = null;
if (key.getMessage() == null) {
throw new FrameworkException("register msg is null, role:" + key.getTransactionRole().name());
}
try {
response = rpcRemotingClient.sendSyncRequest(tmpChannel, key.getMessage());
if (!isRegisterSuccess(response, key.getTransactionRole())) {
rpcRemotingClient.onRegisterMsgFail(key.getAddress(), tmpChannel, response, key.getMessage());
} else {
channelToServer = tmpChannel;
rpcRemotingClient.onRegisterMsgSuccess(key.getAddress(), tmpChannel, response, key.getMessage());
}
} catch (Exception exx) {
if (tmpChannel != null) {
tmpChannel.close();
}
throw new FrameworkException(
"register " + key.getTransactionRole().name() + " error, errMsg:" + exx.getMessage());
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("register success, cost " + (System.currentTimeMillis() - start) + " ms, version:" + getVersion(
response, key.getTransactionRole()) + ",role:" + key.getTransactionRole().name() + ",channel:"
+ channelToServer);
}
return channelToServer;
}
makeObject에서 InetSocketAddress 객체를 생성하면서 Netty의 Bootstrap을 사용해 물리적 TCP 연결을 생성합니다.
4. TM 등록 성공
o.a.s.c.rpc.netty.TmNettyRemotingClient : register TM success. client version:2.3.0, server version:2.3.0,channel:[id: 0xd0eadd3f, L:/127.0.0.1:56429 - R:/127.0.0.1:8091]
TC 서버가 TM의 등록 요청을 승인하고, TM과 TC 간의 Netty 채널이 설정됩니다. 이 채널은 글로벌 트랜잭션의 시작, 커밋, 롤백 등을 위한 통신에 사용됩니다.
@Override
public Channel makeObject(NettyPoolKey key) {
InetSocketAddress address = NetUtil.toInetSocketAddress(key.getAddress());
if (LOGGER.isInfoEnabled()) {
LOGGER.info("NettyPool create channel to " + key);
}
Channel tmpChannel = clientBootstrap.getNewChannel(address);
long start = System.currentTimeMillis();
Object response;
Channel channelToServer = null;
if (key.getMessage() == null) {
throw new FrameworkException("register msg is null, role:" + key.getTransactionRole().name());
}
try {
response = rpcRemotingClient.sendSyncRequest(tmpChannel, key.getMessage());
if (!isRegisterSuccess(response, key.getTransactionRole())) {
rpcRemotingClient.onRegisterMsgFail(key.getAddress(), tmpChannel, response, key.getMessage());
} else {
channelToServer = tmpChannel;
rpcRemotingClient.onRegisterMsgSuccess(key.getAddress(), tmpChannel, response, key.getMessage());
}
} catch (Exception exx) {
if (tmpChannel != null) {
tmpChannel.close();
}
throw new FrameworkException(
"register " + key.getTransactionRole().name() + " error, errMsg:" + exx.getMessage());
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("register success, cost " + (System.currentTimeMillis() - start) + " ms, version:" + getVersion(
response, key.getTransactionRole()) + ",role:" + key.getTransactionRole().name() + ",channel:"
+ channelToServer);
}
return channelToServer;
}
3단계에서 생성한 InetSocketAddress를 바탕으로 채널을 생성합니다.
이후, 생성된 채널을 통해 등록 메시지를 TC 서버로 전송합니다.
메시지 전송이 성공한 경우 "register TM success. ~~"와 같은 메시지를 출력합니다.
5. RM 초기화 및 등록
o.a.s.c.rpc.netty.NettyPoolableFactory : NettyPool create channel to transactionRole:RMROLE,address:127.0.0.1:8091,msg:< RegisterRMRequest{resourceIds='jdbc:mysql://localhost:3306/user-service', version='2.3.0', applicationId='distributed-transactions-at-demo', transactionServiceGroup='my_test_tx_group', extraData='null'} >
RM은 TC 서버에 자신을 등록하기 위해 물리적 TCP 연결을 생성합니다.
TM과 RM의 초기화 & 등록 방법은 동일합니다.
6. RM 등록 성공
o.a.s.c.rpc.netty.RmNettyRemotingClient : register RM success. client version:2.3.0, server version:2.3.0,channel:[id: 0xb44a1918, L:/127.0.0.1:56432 - R:/127.0.0.1:8091]
TC 서버가 RM의 등록 요청을 승인하고, RM과 TC 간의 Netty 채널이 설정됩니다. 이 채널은 브랜치 트랜잭션의 등록, 보고, 커밋, 롤백 등을 위한 통신에 사용됩니다.
이러한 일련의 과정을 통해 TC와 TM, RM이 연결되는 것입니다.
결론
Seata의 분산 트랜잭션 처리는 TC, TM, RM 세 가지 역할이 상호 협력하여 이루어집니다. TC는 중앙 코디네이터 역할을 하며, TM은 글로벌 트랜잭션의 시작과 종료를 관리하고, RM은 리소스 작업과 브랜치 트랜잭션을 처리합니다. Spring Boot 애플리케이션에서 Seata가 시작될 때, TM과 RM이 TC에 등록되고 Netty 기반의 통신 채널이 설정됩니다. 이후 글로벌 트랜잭션이 시작되면 TM, RM, TC 간의 메시지 교환을 통해 분산 트랜잭션의 일관성이 보장됩니다.