Spring 관점에서 보는 Seata의 내부 통신

2025. 5. 6. 07:56·Backend
반응형

해당 글을 이해하기 위해선 이전 글을 보고 오셔야 됩니다. (특히 TC, TM, RM의 관계에 대해 인지하고 계셔야 합니다)
https://solution-is-here.tistory.com/235

 

Apache Seata란?

1. Seata란?Apache Seata는 마이크로서비스 아키텍처에서 고성능과 사용 편의성을 제공하는 분산 트랜잭션 프레임워크입니다. 알리바바에 의해 시작되었으며, 2023년에 Apache 재단에 기부되었습니다. S

solution-is-here.tistory.com

 

이번 글에서는 Seata를 실행했을 때 발생하는 로그를 통해 어떻게 TC와 TM, RM이 연결되는지 알아보려 합니다.

이때 로그에서 필요한 정보를 제외한 나머지 정보는 생략했습니다. (시간, 프로젝트 이름)

초기화 과정 분석

1. Seata 자동 구성

o.a.s.s.b.a.SeataAutoConfiguration : Automatically configure Seata

Spring Boot 애플리케이션이 시작될 때 SeataAutoConfiguration이 자동으로 Seata를 구성합니다. 이 과정에서 Seata의 핵심 구성 요소가 초기화됩니다.

 

코드를 자세히 보면 @DependsOn 어노테이션을 통해 SpringApplicationContextProvider와 failureHandler를 초기화한 후, 빈을 생성하는 것을 볼 수 있습니다. (seata가 springContext에 접근을 해야되기 때문에 SpringApplicationContextProvider 빈이 초기화 된 뒤, 빈이 생성되어야 합니다)

@Bean
@DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})
@ConditionalOnMissingBean(GlobalTransactionScanner.class)
public static GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler,
        ConfigurableListableBeanFactory beanFactory,
        @Autowired(required = false) List<ScannerChecker> scannerCheckers) {
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Automatically configure Seata");
    }

    // set bean factory
    GlobalTransactionScanner.setBeanFactory(beanFactory);

    // add checkers
    // '/META-INF/services/org.apache.seata.spring.annotation.ScannerChecker'
    GlobalTransactionScanner.addScannerCheckers(EnhancedServiceLoader.loadAll(ScannerChecker.class));
    // spring beans
    GlobalTransactionScanner.addScannerCheckers(scannerCheckers);

    // add scannable packages
    GlobalTransactionScanner.addScannablePackages(seataProperties.getScanPackages());
    // add excludeBeanNames
    GlobalTransactionScanner.addScannerExcludeBeanNames(seataProperties.getExcludesForScanning());
    //set accessKey and secretKey
    GlobalTransactionScanner.setAccessKey(seataProperties.getAccessKey());
    GlobalTransactionScanner.setSecretKey(seataProperties.getSecretKey());
    // create global transaction scanner
    return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(),
            seataProperties.isExposeProxy(), failureHandler);
}

 

2. 글로벌 트랜잭션 클라이언트 초기화

o.a.s.s.a.GlobalTransactionScanner : Initializing Global Transaction Clients ...

1 단계에서 bean으로 생성된 GlobalTransactionScanner가 글로벌 트랜잭션 클라이언트(TM과 RM)를 초기화합니다.

 

아래의 코드와 같이 GlobalTransactionScanner은 InitializingBean을 구현한 것을 볼 수 있다.

이때 InitializingBean 인터페이스에는 afterPropertiesSet 메서드가 있는데, 해당 메서드는 의존성 주입이 끝난 뒤, 호출되는 특징을 가진 메서드입니다.

public class GlobalTransactionScanner extends AbstractAutoProxyCreator
        implements CachedConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean {

 

GlobalTransactionScanner에서 afterPropertiesSet을 Override 한 것을 보면 initClient 메서드를 호출하는 것을 볼 수 있습니다.

@Override
public void afterPropertiesSet() {
    if (disableGlobalTransaction) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Global transaction is disabled.");
        }
        ConfigurationFactory.getInstance().addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (CachedConfigurationChangeListener) this);
        return;
    }
    if (initialized.compareAndSet(false, true)) {
        initClient();
    }

    this.findBusinessBeanNamesNeededEnhancement();
}

 

initClient 메서드에서 글로벌 트랜잭션 클라이언트(TM, RM)을 초기화합니다.

protected void initClient() {
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Initializing Global Transaction Clients ... ");
    }
    if (DEFAULT_TX_GROUP_OLD.equals(txServiceGroup)) {
        LOGGER.warn("the default value of seata.tx-service-group: {} has already changed to {} since Seata 1.5, " +
                        "please change your default configuration as soon as possible " +
                        "and we don't recommend you to use default tx-service-group's value provided by seata",
                DEFAULT_TX_GROUP_OLD, DEFAULT_TX_GROUP);
    }
    if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
        throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
    }
    //init TM
    TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
    }
    //init RM
    RMClient.init(applicationId, txServiceGroup);
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
    }

    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Global Transaction Clients are initialized. ");
    }
    registerSpringShutdownHook();

}

이러한 코드를 통해 GlobalTransactionScanner 빈이 의존성 주입이 완료되면 자동으로 트랜잭션 클라이언트(TM, RM)가 초기화됩니다. 

 

3. TM 등록 과정

o.a.s.c.rpc.netty.NettyPoolableFactory   : NettyPool create channel to transactionRole:TMROLE,address:127.0.0.1:8091,msg:< RegisterTMRequest{version='2.3.0', applicationId='distributed-transactions-at-demo', transactionServiceGroup='my_test_tx_group', extraData='ak=null

TM은 TC 서버에 자신을 등록하기 위해 물리적 TCP 연결을 생성합니다.

 

//init TM
TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);

GlobalTransactionScanner의 initClient에서 TMClient.init을 통해 TM을 초기화합니다.

이 과정에서 TM을 등록하는데 필요한, applicationId, txServiceGroup, accessKey, secretKey 등을 함께 전달합니다.

 

public static void init(String applicationId, String transactionServiceGroup, String accessKey, String secretKey) {
    TmNettyRemotingClient tmNettyRemotingClient = TmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup, accessKey, secretKey);
    tmNettyRemotingClient.init();
}

TmClient는 GlobalTransactionScanner로부터 받은 정보를 바탕으로 TmNettyRemotingClient 인스턴스를 생성한 뒤, init 메서드를 통해 초기화합니다.

 

@Override
public void init() {
    // registry processor
    registerProcessor();
    if (initialized.compareAndSet(false, true)) {
        super.init();
        if (isNotBlank(transactionServiceGroup)) {
            initConnection();
        }
    }
}

TmNettyRemotingClient에서 부모 클래스의 init 메서드를 호출합니다.

 

@Override
public void init() {
    timerExecutor.scheduleAtFixedRate(
            () -> {
                try {
                    clientChannelManager.reconnect(getTransactionServiceGroup());
                } catch (Exception ex) {
                    LOGGER.warn("reconnect server failed. {}", ex.getMessage());
                }
            },
            SCHEDULE_DELAY_MILLS,
            SCHEDULE_INTERVAL_MILLS,
            TimeUnit.MILLISECONDS);
    if (this.isEnableClientBatchSendRequest()) {
        mergeSendExecutorService = new ThreadPoolExecutor(
                MAX_MERGE_SEND_THREAD,
                MAX_MERGE_SEND_THREAD,
                KEEP_ALIVE_TIME,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
        mergeSendExecutorService.submit(new MergedSendRunnable());
    }
    super.init();
    clientBootstrap.start();
}

부모 클래스인 AbstractNettyRemotingClient의 init 메서드에서 try-catch를 통해 NettyClientChannelManager 클래스의 reconnect를 호출합니다.

 

private Channel doConnect(String serverAddress) {
    Channel channelToServer = channels.get(serverAddress);
    if (channelToServer != null && channelToServer.isActive()) {
        return channelToServer;
    }
    Channel channelFromPool;
    try {
        NettyPoolKey currentPoolKey = poolKeyFunction.apply(serverAddress);
        poolKeyMap.put(serverAddress, currentPoolKey);
        channelFromPool = nettyClientKeyPool.borrowObject(currentPoolKey);
        channels.put(serverAddress, channelFromPool);
    } catch (Exception exx) {
        LOGGER.error("{} register RM failed.", FrameworkErrorCode.RegisterRM.getErrCode(), exx);
        throw new FrameworkException("can not register RM,err:" + exx.getMessage());
    }
    return channelFromPool;
}
NettyClientChannelManager의 reConnect가 doConnect를 호출하는데 중간 과정이 복잡해 해당 과정은 생략하겠습니다.

NettyClientChannelManager의 doConnect에서 nettyClientKeyPool.borrowObject를 통해 채널을 획득하려고 합니다.

하지만, 초기화를 하는 과정에서는 nettyClientKeyPool에 저장된 채널이 없기에 borrowObject를 해도 얻는 것이 없습니다.

 

그래서 채널을 생성하기 위해 makeObject 메서드가 호출됩니다.
(이 메서드가 호출되는 것을 설명하기 위해 이렇게 길게 설명했습니다.....)

 

@Override
public Channel makeObject(NettyPoolKey key) {
    InetSocketAddress address = NetUtil.toInetSocketAddress(key.getAddress());
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("NettyPool create channel to " + key);
    }
    Channel tmpChannel = clientBootstrap.getNewChannel(address);
    long start = System.currentTimeMillis();
    Object response;
    Channel channelToServer = null;
    if (key.getMessage() == null) {
        throw new FrameworkException("register msg is null, role:" + key.getTransactionRole().name());
    }
    try {
        response = rpcRemotingClient.sendSyncRequest(tmpChannel, key.getMessage());
        if (!isRegisterSuccess(response, key.getTransactionRole())) {
            rpcRemotingClient.onRegisterMsgFail(key.getAddress(), tmpChannel, response, key.getMessage());
        } else {
            channelToServer = tmpChannel;
            rpcRemotingClient.onRegisterMsgSuccess(key.getAddress(), tmpChannel, response, key.getMessage());
        }
    } catch (Exception exx) {
        if (tmpChannel != null) {
            tmpChannel.close();
        }
        throw new FrameworkException(
            "register " + key.getTransactionRole().name() + " error, errMsg:" + exx.getMessage());
    }
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("register success, cost " + (System.currentTimeMillis() - start) + " ms, version:" + getVersion(
            response, key.getTransactionRole()) + ",role:" + key.getTransactionRole().name() + ",channel:"
            + channelToServer);
    }
    return channelToServer;
}

makeObject에서 InetSocketAddress 객체를 생성하면서 Netty의 Bootstrap을 사용해 물리적 TCP 연결을 생성합니다.

 

 

4. TM 등록 성공

o.a.s.c.rpc.netty.TmNettyRemotingClient  : register TM success. client version:2.3.0, server version:2.3.0,channel:[id: 0xd0eadd3f, L:/127.0.0.1:56429 - R:/127.0.0.1:8091]

TC 서버가 TM의 등록 요청을 승인하고, TM과 TC 간의 Netty 채널이 설정됩니다. 이 채널은 글로벌 트랜잭션의 시작, 커밋, 롤백 등을 위한 통신에 사용됩니다.

 

@Override
public Channel makeObject(NettyPoolKey key) {
    InetSocketAddress address = NetUtil.toInetSocketAddress(key.getAddress());
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("NettyPool create channel to " + key);
    }
    Channel tmpChannel = clientBootstrap.getNewChannel(address);
    long start = System.currentTimeMillis();
    Object response;
    Channel channelToServer = null;
    if (key.getMessage() == null) {
        throw new FrameworkException("register msg is null, role:" + key.getTransactionRole().name());
    }
    try {
        response = rpcRemotingClient.sendSyncRequest(tmpChannel, key.getMessage());
        if (!isRegisterSuccess(response, key.getTransactionRole())) {
            rpcRemotingClient.onRegisterMsgFail(key.getAddress(), tmpChannel, response, key.getMessage());
        } else {
            channelToServer = tmpChannel;
            rpcRemotingClient.onRegisterMsgSuccess(key.getAddress(), tmpChannel, response, key.getMessage());
        }
    } catch (Exception exx) {
        if (tmpChannel != null) {
            tmpChannel.close();
        }
        throw new FrameworkException(
            "register " + key.getTransactionRole().name() + " error, errMsg:" + exx.getMessage());
    }
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("register success, cost " + (System.currentTimeMillis() - start) + " ms, version:" + getVersion(
            response, key.getTransactionRole()) + ",role:" + key.getTransactionRole().name() + ",channel:"
            + channelToServer);
    }
    return channelToServer;
}

3단계에서 생성한 InetSocketAddress를 바탕으로 채널을 생성합니다.

이후, 생성된 채널을 통해 등록 메시지를 TC 서버로 전송합니다.
메시지 전송이 성공한 경우 "register TM success. ~~"와 같은 메시지를 출력합니다.

 

5. RM 초기화 및 등록

o.a.s.c.rpc.netty.NettyPoolableFactory   : NettyPool create channel to transactionRole:RMROLE,address:127.0.0.1:8091,msg:< RegisterRMRequest{resourceIds='jdbc:mysql://localhost:3306/user-service', version='2.3.0', applicationId='distributed-transactions-at-demo', transactionServiceGroup='my_test_tx_group', extraData='null'} >

RM은 TC 서버에 자신을 등록하기 위해 물리적 TCP 연결을 생성합니다.

TM과 RM의 초기화 & 등록 방법은 동일합니다.

6. RM 등록 성공

o.a.s.c.rpc.netty.RmNettyRemotingClient  : register RM success. client version:2.3.0, server version:2.3.0,channel:[id: 0xb44a1918, L:/127.0.0.1:56432 - R:/127.0.0.1:8091]

TC 서버가 RM의 등록 요청을 승인하고, RM과 TC 간의 Netty 채널이 설정됩니다. 이 채널은 브랜치 트랜잭션의 등록, 보고, 커밋, 롤백 등을 위한 통신에 사용됩니다.

 

이러한 일련의 과정을 통해 TC와 TM, RM이 연결되는 것입니다.

결론

Seata의 분산 트랜잭션 처리는 TC, TM, RM 세 가지 역할이 상호 협력하여 이루어집니다. TC는 중앙 코디네이터 역할을 하며, TM은 글로벌 트랜잭션의 시작과 종료를 관리하고, RM은 리소스 작업과 브랜치 트랜잭션을 처리합니다. Spring Boot 애플리케이션에서 Seata가 시작될 때, TM과 RM이 TC에 등록되고 Netty 기반의 통신 채널이 설정됩니다. 이후 글로벌 트랜잭션이 시작되면 TM, RM, TC 간의 메시지 교환을 통해 분산 트랜잭션의 일관성이 보장됩니다.

반응형
저작자표시 동일조건 (새창열림)
'Backend' 카테고리의 다른 글
  • 분산 데이터베이스 환경에서 Seata를 이용한 트랜잭션 처리
  • ACID 관점에서 보는 Seata의 4가지 트랜잭션 모드
  • Apache Seata란?
  • 내가 JUnit5에 글로벌 Extension 필터링 기능을 추가한 이야기
코딩하는_대학생
코딩하는_대학생
Java Developer, Open Source Enthusiast, Proud Son
  • 코딩하는_대학생
    코딩하는 대학생에서 개발자까지
    코딩하는_대학생
  • 전체
    오늘
    어제
    • 분류 전체보기 (218)
      • 코딩하는 대학생의 책 추천 (8)
        • 클린코드 (5)
        • 헤드퍼스트 디자인패턴 (3)
      • Backend (8)
        • Spring (14)
        • AWS (3)
        • 회고 (4)
        • Redis (5)
        • 다양한 시각에서 바라본 백엔드 (3)
      • Python (35)
        • 개념 및 정리 (15)
        • 백준 문제풀이 (20)
      • JAVA (17)
        • 개념 및 정리 (14)
        • 백준 문제풀이 (2)
      • 왜? (7)
      • C언어 (42)
        • 개념 및 정리 (9)
        • 백준 문제풀이 (32)
      • 개인 공부 (27)
        • 대학 수학 (5)
        • 대학 영어 (10)
        • 시계열데이터 처리 및 분석 (5)
        • 컴퓨터 네트워크 (6)
        • 운영체제 (1)
      • 솔직 리뷰 (23)
        • 꿀팁 (6)
        • IT기기 (1)
        • 국내 여행 (7)
        • 맛집 (2)
        • 알바 리뷰 (2)
      • 대외활동 (17)
        • 체리피우미 3기 (4)
        • 꿀잠이들 6기 (13)
      • 음식 평가 (1)
      • 일상 & 근황 (2)
  • 최근 글

  • hELLO· Designed By정상우.v4.10.3
코딩하는_대학생
Spring 관점에서 보는 Seata의 내부 통신
상단으로

티스토리툴바