Spring Tx源码解析(三)

前言

  上篇我们分析了spring-tx中的AOP部分,包括TransactionAttributeSourcePointcut如何定位潜在的事务方法,以及TransactionInterceptor又如何结合PlatformTransactionManager为方法应用事务管理,相信看过上篇的同学也从中get到了使用AOP的新姿势😂

  不过到目前为止,除去开篇中的概念介绍,我们对PlatformTransactionManager还知之甚少,本篇我们就深入一点儿。DataSourceTransactionManager作为最具代表性的 PlatformTransactionManager实现,我们就以它为例来看看隐藏在抽象下的实现细节吧。

一个小例子

  来看一个使用Spring声明式事务的小例子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Service
public class ServiceA {
@Autowired
private ServiceB serviceB;
// Propagation.REQUIRED 有就加入,没有就自己玩
@Transactional(propagation = Propagation.REQUIRED)
public void service() {
// do some CRUD stuff...
// then call serviceB#service()
serviceB.service();
}
}

@Service
public class ServiceB {
// Propagation.REQUIRES_NEW 自扫门前雪,各用各的事务
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void service() {
// do some CRUD stuff...
}
}

@RestController
public class DemoController {
@Autowired
private ServiceA serviceA;

@PostMapping("/demo")
public void demo() {
serviceA.service();
}
}

这里我们有两个事务方法,分别属于ServiceAServiceB,处理请求的入口在DemoController。请求到达后,

  1. 进入服务 A 的service()方法
  2. 开启事务 tx-a
  3. 执行CRUD
  4. 进入服务 B 的service()方法
  5. 挂起 tx-a,开启新事务 tx-b
  6. 执行CRUD
  7. 结束serviceB#service()调用,视执行情况提交或回滚事务
  8. 结束serviceA#service()调用,视执行情况提交或回滚事务

麻雀虽小五脏俱全,这个小例子也是后续分析源码时的参照,先给出来混个脸熟吧。

DataSourceTransactionManager

Hierarchy overview
tx-manager

  DataSourceTransactionManager继承自AbstractPlatformTransactionManager,基类中实现了事务管理的一整套工作流:

  1. 检查是否已存在事务
  2. 应用适当的传播行为
  3. 必要时暂停和恢复事务
  4. 提交事务时检查rollback-only标记
  5. 回滚事务时的处理——如实回滚或仅设置rollback-only标记
  6. 触发已注册的transaction synchronization回调

子类只需要根据事务的状态实现特定的模板方法即可,比如事务开始、暂停、恢复、提交和回滚。

TransactionInterceptor

  事务拦截器想必大家都很熟悉了,简化后的模型大致如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
  // 详细的分析请看上篇
public Object invoke(MethodInvocation invocation) throws Throwable {
Object result = null;
// 1. 解析@Transactional注解
TransactionAttribute txAttr = xxx;
// 2. 获取事物管理器
PlatformTransactionManager txManager = xxx;
// 3. 根据配置的事务属性开启事务
TransactionStatus status = txManager.getTransaction(txAttr);
try {
// 4. 执行目标方法
result = invocation.proceed();
} catch (Throwable ex) {
// 5-1. 方法异常退出,根据配置决定是提交还是回滚
if (txAttr.rollbackOn(ex)) {
txManager.rollback(status);
} else {
txManager.commit(status);
}
throw ex;
}
// 5-2. 方法正常执行完成,提交事务
txManager.commit(status);
return result;
}

核心流程大致分为5步,其中和事务管理器有关的是第3和第5步。前情提要到此结束,那我们就从第3步getTransaction(...)开始呗~

getTransaction
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
@Override
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException {
// 使用默认配置,如果没有的话
TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
// 模板方法,子类实现
Object transaction = doGetTransaction();
boolean debugEnabled = logger.isDebugEnabled();
// 检查是否已存在事务
if (isExistingTransaction(transaction)) {
// 已存在事务,根据传播行为来决定如何处理
return handleExistingTransaction(def, transaction, debugEnabled);
}
// 检查一下超时配置,默认 -1 ,不能更低了
if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
}
// 当前不存在事务,同样要根据传播行为来决定如何处理
// PROPAGATION_MANDATORY 要求一定要运行在事务中,当前事务不存在,因此这里选择抛出异常
if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
// PROPAGATION_REQUIRED 在没有事务存在时会开启新的独立事务
// PROPAGATION_REQUIRES_NEW 无论是否有事务存在都会开启新的独立事务
// PROPAGATION_NESTED 在没有事务存在时和PROPAGATION_REQUIRED的行为一致
// 换言之,走到这里,这三种传播行为都会开启新的独立事务
else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
// 挂起当前事务,这里传null是因为当前没有事务
// 仍然调用suspend的意义在于触发 TransactionSynchronization#suspend() 回调
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
}
try {
// 开启新的独立事务
return startTransaction(def, transaction, debugEnabled, suspendedResources);
}
catch (RuntimeException | Error ex) {
// 事务开启失败,恢复已存在的事务
// 同样为了触发 TransactionSynchronization 回调,这里是 resume()
resume(null, suspendedResources);
throw ex;
}
}
// 其它传播行为在当前没有事务存在时会继续以非事务方式运行,比如PROPAGATION_SUPPORTS
// 不用为它们开启新的事务,但需要处理可能注册的transaction synchronization
else {
// Create "empty" transaction: no actual transaction, but potentially synchronization.
if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + def);
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
}
}

  getTransaction(...)首先会调用模板方法doGetTransaction()来获取事务对象(transaction object)。事务对象虽然特定于具体的PlatformTransactionManeger实现,但一般都会携带相应的事务状态,我们来分析一下DataSourceTransactionManager对它的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
protected Object doGetTransaction() {
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
// 是否允许嵌套事务,传播行为PROPAGATION_NESTED时有用
txObject.setSavepointAllowed(isNestedTransactionAllowed());
// 以javax.sql.DataSource为键,ConnectionHolder为值绑定到线程私有存储
// 通过TransactionSynchronizationManager统一管理(重要)
ConnectionHolder conHolder =
(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
// 若能获取到这样一个ConnectionHolder,自然是一个已存在的,因此是false
txObject.setConnectionHolder(conHolder, false);
return txObject;
}

  开篇介绍ResourceHolder接口的时候,我们曾讨论过如何保证多个方法运行在同一个事务中的话题。若将事务窄化为数据库事务的话,自然是要保证多个方法使用同一个JDBC Connection,ConnectionHolder作为ResouceHolder的一种实现就是用来携带JDBC Connection的。当然了,除了作为java.sql.Connection的容器以外,ConnectionHolder还有一些可配置属性,比如设置rollback-only标记、超时时长等。DataSourceTransactionObject可以说是java.sql.Connection的一个快照,它保存了数据库连接的一系列状态(用于后续对数据库连接的复原),比如当前的隔离级别、自动提交(auto commit)的取值以及一个指示java.sql.Connection是通过javax.sql.DataSource新创建的还是从线程私有存储中获取的标记。这两个类都比较简单,大家可以自行查阅一下,这里就不细说了。

  了解了ConnectionHolderDataSourceTransactionObject以后,再看isExistingTransaction(...)就很简单了。

1
2
3
4
5
6
7
8
@Override
protected boolean isExistingTransaction(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
// 1. 必须持有 ConnectionHolder 实例
// 2. 事务已开启,新开的事务会设置这个标志
// 满足这两个条件就可以认为已经有一个已存在的事务
return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());
}

还记得前面给出的小例子吗?对于服务 A 的service()方法来说,很明显我们是无法从线程私有存储中获取到ConnectionHolder实例的,因此isExistingTransaction(...)必然返回 false。这里我们也先跳过handleExistingTransaction(...),晚点再来分析它。接下来,在没有任何已知事务的前提下,我们一起来看看spring-tx是如何根据传播行为的配置情况做不同处理的。

  PROPAGATION_MANDATORY要求一定要运行在事务中,而当前不存在任何事务,因此会抛出异常。PROPAGATION_REQUIREDPROPAGATION_REQUIRES_NEWPROPAGATION_NESTED这三种传播行为在没有事务存在时的表现是一致的,它们都会开启一个新的独立事务。新事务的开启也意味着对旧事务(或者说外层事务)的挂起,所以我们看到这第一步便是调用suspend(...),而这里传null也是因为当前不存在任何已知事务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) 
throws TransactionException {
// 首先查看 transaction synchronization 是否已激活
if (TransactionSynchronizationManager.isSynchronizationActive()) {
// 执行 TransactionSynchronization#suspend() 回调,解除 transaction synchronization 激活状态
List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
try {
Object suspendedResources = null;
// 若已有事务,调用模板方法执行挂起逻辑
if (transaction != null) {
suspendedResources = doSuspend(transaction);
}
// 将当前事务的名称、readOnly标记、隔离级别以及是否事务已激活进行重置
// 并把这些属于旧事务的信息记录到 SuspendedResourcesHolder , 用于后续的复原
String name = TransactionSynchronizationManager.getCurrentTransactionName();
TransactionSynchronizationManager.setCurrentTransactionName(null);
boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
TransactionSynchronizationManager.setActualTransactionActive(false);
return new SuspendedResourcesHolder(
suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
}
catch (RuntimeException | Error ex) {
// 事务挂起失败不代表失效,需要进行恢复 -> 回调 TransactionSynchronization#resume()
doResumeSynchronization(suspendedSynchronizations);
throw ex;
}
}
// 事务已激活,但 transaction synchronization 未激活
else if (transaction != null) {
// 这种情况下只需要调用模板方法挂起事务即可
Object suspendedResources = doSuspend(transaction);
return new SuspendedResourcesHolder(suspendedResources);
}
// 事务未激活、transaction synchronization 也未激活,没有什么需要挂起的
else {
return null;
}
}

  suspend(...)中牵涉到spring-tx事务同步机制(transaction synchronization mechanism),这部分内容我们在开篇中介绍过。如若事务同步机制已激活,就需要先执行对应的回调,这也是doSuspendSynchronization()做的工作。接下来才是调用模板方法doSuspend(...)DataSourceTransactionManager实现了这个方法。

1
2
3
4
5
6
7
8
@Override
protected Object doSuspend(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
// 清除持有的 ConnectionHolder
txObject.setConnectionHolder(null);
// 并将其从ThreadLocal中解绑
return TransactionSynchronizationManager.unbindResource(obtainDataSource());
}

主要就是将ConnectionHolder从事务对象和线程私有存储中移除,然后转移到SuspendedResourcesHolder中用于后续的恢复。回到getTransaction(...),挂起旧事务后自然是要开启新事物。

1
2
3
4
5
6
7
8
9
10
11
12
13
private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
// 是否需要激活 transaction synchronization
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// 创建一个代表事务状态的对象
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
// 模板方法,通知新事务已启动
doBegin(transaction, definition);
// 按需初始化 transaction synchronization
prepareSynchronization(status, definition);
return status;
}

简单明了的三步:

  1. 创建TransactionStatus实例,它包含了一系列的事务状态
  2. 调用模板方法doBegin(...),通知新事务已启动
  3. 按需开启事务同步机制

DeaultTransactionStatus就是一个上下文对象或者说信息的集合体,代码好像没什么可说的,大家自行查阅一下吧,我们重点来看一下doBegin(...)模板方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
 @Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
try {
// 事务对象未持有 ConnectionHolder
// 或事务对象持有 ConnectionHolder,并且与事务同步
if (!txObject.hasConnectionHolder() ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
// 获取一个新的数据库连接
Connection newCon = obtainDataSource().getConnection();
if (logger.isDebugEnabled()) {
logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
// 绑定到事务对象,注意这里 newConnectionHolder 为 true
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
// 标记 ConnectionHolder 与事务同步,对应上面的检查,避免过多的获取数据库连接
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
// 应用read-only标记,并返回初始的隔离级别
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
// 应用上配置的隔离级别,初始的隔离级别已经记录下来了,后续可恢复
txObject.setPreviousIsolationLevel(previousIsolationLevel);
// 打 read-only 标记
txObject.setReadOnly(definition.isReadOnly());
// 事务的开启需要设置自动提交为false
if (con.getAutoCommit()) {
// 记录下需要在后续恢复自动提交
txObject.setMustRestoreAutoCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
// 自动提交设为false,标识着事务的开启
con.setAutoCommit(false);
}
// read-only 标记相关,执行 SET TRANSACTION READ ONLY 语句
prepareTransactionalConnection(con, definition);
// 重要:标记事务已激活,此时新的数据库连接已获取,并且关闭了自动提交
txObject.getConnectionHolder().setTransactionActive(true);
// 应用超时时长
int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
// 绑定到线程私有存储,后续可通过 TransactionSynchronizationManager#getResource(...) 获取
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
}
// 出错的话,释放数据库连接并抛出异常
catch (Throwable ex) {
if (txObject.isNewConnectionHolder()) {
DataSourceUtils.releaseConnection(con, obtainDataSource());
txObject.setConnectionHolder(null, false);
}
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
}
}

doBegin(...)设置transaction active标记,isExistingTransaction(...)会检查这个标记,这样就构成了一个闭环。最后一步是调用prepareSynchronization(...)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
// 检查是否是全新开启的事务同步机制
// 换言之 TransactionSynchronizationManager.isSynchronizationActive() 需为false
if (status.isNewSynchronization()) {
// 记录事务是否已激活,也就是是否有事务对象
TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
// 记录当前的隔离级别
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
definition.getIsolationLevel() : null);
// 记录是否是read-only型事务
TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
// 记录事务名称
TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
// 激活事务同步机制
// initSynchronization()调用之后isSynchronizationActive()就返回true,我们已经在suspend(...)中见过这个检查
TransactionSynchronizationManager.initSynchronization();
}
}

再次回到getTransaction(...),可以看到startTransaction(...)是包裹在 try 块中的,以便在出现异常时恢复已有事务,那我们就看看具体是怎么恢复的呗。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
protected final void resume(@Nullable Object transaction,
@Nullable SuspendedResourcesHolder resourcesHolder) throws TransactionException {
// 如果有挂起的事务
if (resourcesHolder != null) {
// 挂起的资源,也就是doSuspend(...)中解绑的ConnectionHolder
Object suspendedResources = resourcesHolder.suspendedResources;
// 调用模板方法,重新绑定ConnectionHolder到ThreadLocal
if (suspendedResources != null) {
doResume(transaction, suspendedResources);
}
// suspend(...)的逆操作
List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
if (suspendedSynchronizations != null) {
TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
// 回调TransactionSynchronization#resume(),重新激活 transaction synchronization
doResumeSynchronization(suspendedSynchronizations);
}
}
}

@Override
protected void doResume(@Nullable Object transaction, Object suspendedResources) {
// 重新绑定
TransactionSynchronizationManager.bindResource(obtainDataSource(), suspendedResources);
}

基本上是suspend(...)的逆操作,至此,创建新事务的流程就分析完了。等等,你以为这就完了?还记得我们的小例子吗?ServiceB说这才哪到哪,好戏才刚刚开始呢。没错,对ServiceB而言,ServiceA#service()已经开启事务了,等到它调用service()的时候,isExistingTransaction(...)就返回 true 而不是 false 了,那spring-tx是如何处理当前已存在事务的情况呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
private TransactionStatus handleExistingTransaction(
TransactionDefinition definition, Object transaction, boolean debugEnabled)
throws TransactionException {
// PROPAGATION_NEVER不允许运行在事务中,因此抛出异常
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException(
"Existing transaction found for transaction marked with propagation 'never'");
}
// PROPAGATION_NOT_SUPPORTED不支持在事务中运行
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
if (debugEnabled) {
logger.debug("Suspending current transaction");
}
// 挂起当前事务,以非事务方式运行
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
// 创建不支持事务的 TransactionStatus,因此 newTransaction 为 false
return prepareTransactionStatus(
definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
// PROPAGATION_REQUIRES_NEW需要开启新事务
// 流程和之前分析的一致,只不过这里不再传 null 了而已
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
if (debugEnabled) {
logger.debug("Suspending current transaction, creating new transaction with name [" +
definition.getName() + "]");
}
// 挂起已有事务
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
// 开启新事务
return startTransaction(definition, transaction, debugEnabled, suspendedResources);
}
catch (RuntimeException | Error beginEx) {
// 出现异常后恢复已有事务
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
}
// PROPAGATION_NESTED在已有事务的情况下是创建嵌套事务,也就是Savepoint
// 限于篇幅,Savepoint相关的我们就不讲了,用得也不是那么多
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException(
"Transaction manager does not allow nested transactions by default - " +
"specify 'nestedTransactionAllowed' property with value 'true'");
}
if (debugEnabled) {
logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
}
if (useSavepointForNestedTransaction()) {
DefaultTransactionStatus status =
prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
status.createAndHoldSavepoint();
return status;
}
else {
return startTransaction(definition, transaction, debugEnabled, null);
}
}

// 剩下来还能发挥作用的就只有PROPAGATION_SUPPORTS 和 PROPAGATION_REQUIRED 了
// 这两在已有事务的情况下行为是一致的,都是加入已有事务
if (debugEnabled) {
logger.debug("Participating in existing transaction");
}
// 检验事务配置
if (isValidateExistingTransaction()) {
// 隔离级别需保持一致
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
Constants isoConstants = DefaultTransactionDefinition.constants;
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] specifies isolation level which is incompatible with existing transaction: " +
(currentIsolationLevel != null ?
isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
"(unknown)"));
}
}
// read-only 标记要一致
if (!definition.isReadOnly()) {
if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] is not marked as read-only but existing transaction is");
}
}
}
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// 创建加入已有事务的 TransactionStatus,因此 newTransaction 为false
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}

protected final DefaultTransactionStatus prepareTransactionStatus(
TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction,
boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {
// 创建 TransactionStatus
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, newTransaction, newSynchronization, debug, suspendedResources);
// 按需初始化事务同步机制
prepareSynchronization(status, definition);
return status;
}

handleExistingTransaction(...)也对不同的传播行为做了处理,至于其中涉及到的方法调用,前面都已经分析过了。各位同学在自己看源码的时候,建议动手调试调试,彻底梳理清楚其间的状态标志及其转换为好。

rollback

  嘟嘟嘟,CRUD出异常了,这会该回滚事务了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
@Override
public final void rollback(TransactionStatus status) throws TransactionException {
// 同一个事务不能多次回滚
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
// 处理回滚,unexpected用于标记这是不是一个预期之外的回滚
// 方法执行出现异常,且符合rollbackOn配置,那么回滚是正常的,否则就是不正常的
processRollback(defStatus, false);
}

private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
try {
// 预期之外的异常?
boolean unexpectedRollback = unexpected;
try {
// 回调 TransactionSynchronization#beforeCompletion()
triggerBeforeCompletion(status);
// 有Savepoint,回滚到保存点
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Rolling back transaction to savepoint");
}
status.rollbackToHeldSavepoint();
}
// 独立事务
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction rollback");
}
// 调用模板方法进行回滚
doRollback(status);
}
else {
// 非独立事务,加入的一个已存在的事务
if (status.hasTransaction()) {
// 已标记为rollback-only,出现于多个事务方法的嵌套调用,比如 A -> B -> C,且传播行为的配置不会导致开启多个事务
// 或者配置了外层事务会因为内层事务的异常而回滚,默认true
if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
if (status.isDebug()) {
logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
}
// 调用模板方法设置rollback-only标记
doSetRollbackOnly(status);
}
else {
if (status.isDebug()) {
logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
}
}
}
else {
logger.debug("Should roll back transaction but cannot - no transaction available");
}
// 不需要fail-fast,清楚标志,后续不抛异常
if (!isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = false;
}
}
}
catch (RuntimeException | Error ex) {
// 回调 TransactionSynchronization#afterCompletion(...),状态未知
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw ex;
}
// 回调 TransactionSynchronization#afterCompletion(...),已回滚
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
// 抛出异常
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction rolled back because it has been marked as rollback-only");
}
}
finally {
// 清理
cleanupAfterCompletion(status);
}
}

回滚分为三种情况:

  1. 嵌套事务(创建有Savepoint)回滚
  2. 独立事务回滚
  3. 非独立事务回滚

篇幅有限,Savepoint就不说了,重点看2、3两种情况。对于独立事务,调用doRollback(...)进行回滚,基于java.sql.Connection的回滚机制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override
protected void doRollback(DefaultTransactionStatus status) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
Connection con = txObject.getConnectionHolder().getConnection();
if (status.isDebug()) {
logger.debug("Rolling back JDBC transaction on Connection [" + con + "]");
}
try {
// 回滚
con.rollback();
}
catch (SQLException ex) {
throw new TransactionSystemException("Could not roll back JDBC transaction", ex);
}
}

对于非独立事务,调用doSetRollbackOnly(...),仅仅是打上rollback-only标记用于后续的检测,processRollback(...)本身就对这个标记进行了检测。

1
2
3
4
5
6
7
8
9
10
@Override
protected void doSetRollbackOnly(DefaultTransactionStatus status) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
if (status.isDebug()) {
logger.debug("Setting JDBC transaction [" + txObject.getConnectionHolder().getConnection() +
"] rollback-only");
}
// 代理给ConnectionHolder,打上rollback-only标记
txObject.setRollbackOnly();
}

能做的都做了,最后该清理现场了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
private void cleanupAfterCompletion(DefaultTransactionStatus status) {
// 标记事务已完成
status.setCompleted();
// 若开启了事务同步机制,清除相关的线程私有变量
if (status.isNewSynchronization()) {
TransactionSynchronizationManager.clear();
}
// 若是独立事务,调用模板方法进行清理
if (status.isNewTransaction()) {
doCleanupAfterCompletion(status.getTransaction());
}
// 若还存在外层事务
if (status.getSuspendedResources() != null) {
if (status.isDebug()) {
logger.debug("Resuming suspended transaction after completion of inner transaction");
}
// 恢复外层事务
Object transaction = (status.hasTransaction() ? status.getTransaction() : null);
resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());
}
}

/**
* 基本上是 doBegin(...)的逆操作
*/
@Override
protected void doCleanupAfterCompletion(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
// 解绑
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.unbindResource(obtainDataSource());
}
Connection con = txObject.getConnectionHolder().getConnection();
try {
// 如有必要,恢复自动提交
if (txObject.isMustRestoreAutoCommit()) {
con.setAutoCommit(true);
}
// 恢复原有隔离级别和readOnly标记
DataSourceUtils.resetConnectionAfterTransaction(
con, txObject.getPreviousIsolationLevel(), txObject.isReadOnly());
}
catch (Throwable ex) {
logger.debug("Could not reset JDBC Connection after transaction", ex);
}
// 若是独立事务,释放连接
if (txObject.isNewConnectionHolder()) {
if (logger.isDebugEnabled()) {
logger.debug("Releasing JDBC Connection [" + con + "] after transaction");
}
DataSourceUtils.releaseConnection(con, this.dataSource);
}
// 清除ConnectionHolder的状态
txObject.getConnectionHolder().clear();
}
commit

  要是没出什么幺蛾子,就可以提交事务了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
@Override
public final void commit(TransactionStatus status) throws TransactionException {
// 同一个事务不能重复提交
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
// 当前事务被标记为rollback-only
// 何时打上rollback-only标记可参考对 rollback 的分析
if (defStatus.isLocalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Transactional code has requested rollback");
}
// 执行回滚而不是提交
processRollback(defStatus, false);
return;
}
// 全局回滚,XA事务可能会有吧
if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
}
// 执行回滚,unexpected为true
// 比如一个XA事务,事务A执行正常,事务B执行异常
// 此时这个大事务是不能提交的,事务协调者会标记全局rollback-only
// 这样就可以让原本正常的事务A也回滚而不是提交
processRollback(defStatus, true);
return;
}
// 终于可以提交了,开心😄
processCommit(defStatus);
}

private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;

try {
boolean unexpectedRollback = false;
// 准备提交,扩展点
prepareForCommit(status);
// 回调 TransactionSynchronization#beforeCommit()
triggerBeforeCommit(status);
// 回调 TransactionSynchronization#beforeCompletion()
triggerBeforeCompletion(status);
beforeCompletionInvoked = true;
// Savepoint,忽略不说了
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Releasing transaction savepoint");
}
unexpectedRollback = status.isGlobalRollbackOnly();
status.releaseHeldSavepoint();
}
// 独立事务
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction commit");
}
// 读取全局rollback-only标记
unexpectedRollback = status.isGlobalRollbackOnly();
// 调用模板方法进行提交
doCommit(status);
}
// fail-fast,读取全局rollback-only标记
else if (isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = status.isGlobalRollbackOnly();
}
// 抛出异常,如果全局标记有 rollback-only
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction silently rolled back because it has been marked as rollback-only");
}
}
catch (UnexpectedRollbackException ex) {
// 回调 TransactionSynchronization#afterCompletion(...)
// 对应上面抛出的异常,此时事务是回滚的而不是已提交
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
throw ex;
}
catch (TransactionException ex) {
// 如果需要在提交失败时进行回滚的话,默认false
if (isRollbackOnCommitFailure()) {
doRollbackOnCommitException(status, ex);
}
else {
// 回调 TransactionSynchronization#afterCompletion(...),状态未知
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
}
throw ex;
}
catch (RuntimeException | Error ex) {
// 确保 TransactionSynchronization#beforeCompletion()得到执行
if (!beforeCompletionInvoked) {
triggerBeforeCompletion(status);
}
// 回滚事务,如果提交失败的话,注意与前面遇到TransactionException时处理的异同
doRollbackOnCommitException(status, ex);
throw ex;
}
// 回调 TransactionSynchronization#afterCommit()
try {
triggerAfterCommit(status);
}
finally {
// 回调 TransactionSynchronization#afterCompletion(...),状态已提交
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
}
}
finally {
// 清理
cleanupAfterCompletion(status);
}
}

  commit相对来讲是比较复杂的,它牵涉到对rollback-only标记的处理。这里解释一下global rollback only,区别于local rollback only,这个标记多用于多事务协作的情况,比如两阶段提交协议下的分布式事务。

  区别于rollbackcommit不需要处理非独立事务的情况,内层事务不需要提交,它只需要等待外层事务提交即可。对于独立事务,模板方法doCommit(...)也只是简单的代理给java.sql.Connection的提交机制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override
protected void doCommit(DefaultTransactionStatus status) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
Connection con = txObject.getConnectionHolder().getConnection();
if (status.isDebug()) {
logger.debug("Committing JDBC transaction on Connection [" + con + "]");
}
try {
con.commit();
}
catch (SQLException ex) {
// processCommit(...)对doCommit(...)抛出的TransactionException有特殊处理
throw new TransactionSystemException("Could not commit JDBC transaction", ex);
}
}

后记

  最后回顾一下我们的小例子,现在它还有秘密吗?本篇我们一起探索了PlatformTransactionManeger抽象下的事务管理机制,篇幅较长,相信耐心看完的你一定有所收获😊


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!