Spring Tx源码解析(一)

引言

  Spring提供的声明式事务想必大家都很熟悉了,简简单单一个@Transactional注解便能提供如此强大的功能,那么它是如何实现的呢?带着这点好奇心,我们一起扒拉扒拉spring-tx-5.2.6.RELEASE的源码吧。

  所谓工欲善其事,必先利其器,深入源码之前了解清楚spring-tx的相关概念还是很有必要的。本篇算是Spring AOP源码解析的姊妹篇,毕竟声明式事务就是AOP思想的一个实际应用嘛。

Spring Tx Concepts

PlatformTransactionManager

  PlatformTransactionManager是Spring事务管理的核心接口,它规范了应用程序操作事务的方式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public interface PlatformTransactionManager extends TransactionManager {
/**
* 获取事务的状态信息(依据传播行为的不同,可能返回一个已激活的事务或创建一个新的独立事务)
*/
TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException;

/**
* 提交当前事务,依据事务的当前状态,也可能会进行回滚,比如标记为rollback-only的事务
*/
void commit(TransactionStatus status) throws TransactionException;

/**
* 回滚当前事务,依据传播行为的不同,非独立的内部事务仅会打上rollback-only标记
*/
void rollback(TransactionStatus status) throws TransactionException;
}

TransactionDefinition

  TransactionDefinition描述了事务的相关属性,比如事务的隔离级别、传播行为,亦或是超时时长是多少等等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public interface TransactionDefinition {

/**
* 返回事务的传播行为
*/
default int getPropagationBehavior() {
return PROPAGATION_REQUIRED;
}

/**
* 返回事务的隔离级别
*/
default int getIsolationLevel() {
return ISOLATION_DEFAULT;
}

/**
* 获取事务的超时时长
*/
default int getTimeout() {
return TIMEOUT_DEFAULT;
}

/**
* 是否是只读事务
*/
default boolean isReadOnly() {
return false;
}

/**
* 获取事务名称,多用于debug
*/
@Nullable
default String getName() {
return null;
}
}

  事务的隔离级别想必不用多做解释,传播行为(Propagation Behavior)是什么概念呢?简单来说,当一个事务方法被另一个事务方法调用时,传播行为可以控制是否需要创建事务以及如何创建事务,spring-tx中定义了7种事务传播行为:

  1. PROPAGATION_REQUIRED: 表示方法必须运行在事务中,如果当前事务存在,方法将会在该事务中运行,否则将开启一个新的独立事务
  2. PROPAGATION_SUPPORTS:表示方法支持在事务中运行,如果当前事务存在,方法将会在该事务中运行,否则以非事务方式运行
  3. PROPAGATION_MANDATORY:表示方法必须运行在事务中,如果当前事务不存在,抛出异常
  4. PROPAGATION_REQUIRES_NEW:表示方法必须运行在独立事务中,无论当前是否存在事务,该级别总会开启一个新的独立事务
  5. PROPAGATION_NOT_SUPPORTED:表示方法不支持在事务中运行,如果当前事务存在,挂起当前事务从而以非事务方式运行
  6. PROPAGATION_NEVER:表示方法不支持在事务中运行,如果当前事务存在,抛出异常
  7. PROPAGATION_NESTED:表示方法必须运行在事务中,如果当前事务存在,开启一个嵌套事务(Savepoint),否则等同于PROPAGATION_REQUIRED

  TransactionDefition更多的是表达通用的概念,它的子接口TransactionAttribute添加了基于AOP的rollbackOn(...)操作。

1
2
3
4
5
6
7
8
9
10
11
12
public interface TransactionAttribute extends TransactionDefinition {
/**
* 用于在Spring IoC Container中获取PlatformTransactionManager
*/
@Nullable
String getQualifier();

/**
* 判断是否在遇到指定类型的异常时进行回滚
*/
boolean rollbackOn(Throwable ex);
}

再看一眼@Transactional注解的定义。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public @interface Transactional {

@AliasFor("transactionManager")
String value() default "";

@AliasFor("value")
String transactionManager() default "";

Propagation propagation() default Propagation.REQUIRED;

Isolation isolation() default Isolation.DEFAULT;

int timeout() default TransactionDefinition.TIMEOUT_DEFAULT;

boolean readOnly() default false;

Class<? extends Throwable>[] rollbackFor() default {};

String[] rollbackForClassName() default {};

Class<? extends Throwable>[] noRollbackFor() default {};

String[] noRollbackForClassName() default {};

}

想必你也看出来了,TransactionAttribute正是对运行时获取到的@Transactional注解的封装。

TransctionStatus

  TransctionStatus描述了某一时间点上事务的状态信息,比如是否是新开启的独立事务、是否已完成以及是否打上了rollback-only标记等等,并且为了支持嵌套事务,TransctionStatus还额外提供了对保存点(Savepoint)的支持。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// 管理Savepoint
public interface SavepointManager {
/**
* 创建一个新的Savepoint,后续可以回滚到指定的SavePoint
*/
Object createSavepoint() throws TransactionException;

/**
* 回滚到指定的Savepoint
*/
void rollbackToSavepoint(Object savepoint) throws TransactionException;

/**
* 释放指定Savepoint
*/
void releaseSavepoint(Object savepoint) throws TransactionException;
}

// 代表事务的当前状态
public interface TransactionExecution {
/**
* 检查当前事务是否是一个独立事务,返回false表示是加入的一个已存在的事务或以非事务方式运行
*/
boolean isNewTransaction();

/**
* 给当前事务打上rollback-only标记,被标记为rollback-only的事务只能被回滚而不会被提交
*/
void setRollbackOnly();

/**
* 检查当前事务是否打上了rollback-only标记
*/
boolean isRollbackOnly();

/**
* 检查当前事务是否已完成,已提交或已回滚都认为是已完成
*/
boolean isCompleted();
}

public interface TransactionStatus extends TransactionExecution, SavepointManager, Flushable {
/**
* 检查当前事务是否携带有Savepoint,也就是说是否创建了嵌套事务
*/
boolean hasSavepoint();

/**
* 如果底层TxManager支持的话就有用,比如Hibernate Session,
* 而对于JDBC DataSource/Connection来讲是没有flush这个概念的,基本上是no-op
*/
@Override
void flush();
}
ResourceHolder

  当一个事务方法 A 调用另一个事务方法 B 的时候,如何保证这两个方法运行在同一个事务中呢?如果方法 A 和 B 使用不同的java.sql.Connection来操作数据库,能保证它们运行在同一个事务中吗?很明显,是不能的。多个方法运行在同一个事务中的前提是它们必须使用同一个java.sql.Connection,以伪代码的形式就是:

1
2
3
4
5
6
7
8
9
10
try {
connection.setAutoCommit(false)
// 方法 A 执行 sql
methodA(connection);
// 方法 B 执行 sql
methodB(connection);
connection.commit()
} catch (Exception ex) {
connection.rollback()
}

java.sql.Connection实例必须传递给方法 A 和方法 B,这样才能保证它们使用同一个连接对象。实际开发中,我们并没有像这样传递过连接对象,spring-tx将我们从这些细节中解放了出来。

  ResouceHolder就是设计来包裹底层连接资源的,spring-tx内部会使用线程私有存储ThreadLocal在同一个线程中进行传递,对方法 A 和 方法 B 来说,只要它们运行在同一个线程中,就能使用上同一个连接对象。当然了,ResourceHolder并非只能携带java.sql.Connection,对于使用MyBatis的用户来说,它携带的就是SqlSession了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public interface ResourceHolder {
/**
* 重置
*/
void reset();

/**
* 解绑连接资源
*/
void unbound();

/**
* 检查此Holder是否携带有连接资源
*/
boolean isVoid();
}
TransactionSynchronization

  由于spring-tx全盘接管了事务管理,那么它自然可以管理事务的生命周期。 TransactionSynchronization就是这样一个回调接口,它为我们揭示了事务运行时的各个阶段,如果我们需要在事务执行前后做一些额外的操作,使用它就再好不过了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public interface TransactionSynchronization extends Flushable {
/** 事务已提交 */
int STATUS_COMMITTED = 0;
/** 事务已回滚 */
int STATUS_ROLLED_BACK = 1;
/** 状态未知 */
int STATUS_UNKNOWN = 2;

/**
* 事务被挂起时回调
*/
default void suspend() {
}

/**
* 事务恢复时回调
*/
default void resume() {
}

@Override
default void flush() {
}

/**
* 事务提交前回调
*/
default void beforeCommit(boolean readOnly) {
}

/**
* 事务完成前回调,也就是在事务管理器 commit/rollback 之前
*/
default void beforeCompletion() {
}

/**
* 事务成功提交后回调
*/
default void afterCommit() {
}

/**
* 事务完成后回调,status揭示了事务当前状态
*/
default void afterCompletion(int status) {
}
}
TransactionSynchronizationManager

  spring-txResourceHolder的绑定和传递、TransactionSynchronization的注册和获取,均是代理给TransactionSynchronizationManager来完成的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
public abstract class TransactionSynchronizationManager {

private static final Log logger = LogFactory.getLog(TransactionSynchronizationManager.class);

// ResourceHolder的绑定关系
// 比如在DataSourceTransactionManager中key是java.sql.DataSource,value是ConnectionHolder
private static final ThreadLocal<Map<Object, Object>> resources =
new NamedThreadLocal<>("Transactional resources");
// 已注册的TransactionSynchronization
private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
new NamedThreadLocal<>("Transaction synchronizations");
// 当前的事务名称
private static final ThreadLocal<String> currentTransactionName =
new NamedThreadLocal<>("Current transaction name");
// 当前事务是否是只读事务
private static final ThreadLocal<Boolean> currentTransactionReadOnly =
new NamedThreadLocal<>("Current transaction read-only status");
// 当前事务的隔离级别
private static final ThreadLocal<Integer> currentTransactionIsolationLevel =
new NamedThreadLocal<>("Current transaction isolation level");
// 事务是否真的被激活
private static final ThreadLocal<Boolean> actualTransactionActive =
new NamedThreadLocal<>("Actual transaction active");

//-------------------------------------------------------------------------
// Management of transaction-associated resource handles
//-------------------------------------------------------------------------

/**
* 根据提供的key找寻底层资源
*/
@Nullable
public static Object getResource(Object key) {
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
Object value = doGetResource(actualKey);
if (value != null && logger.isTraceEnabled()) {
logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to thread [" +
Thread.currentThread().getName() + "]");
}
return value;
}

@Nullable
private static Object doGetResource(Object actualKey) {
Map<Object, Object> map = resources.get();
if (map == null) {
return null;
}
Object value = map.get(actualKey);
// 对于ResourceHolder#isVid()的情况,认为其没有实际绑定上任何资源
if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
map.remove(actualKey);
if (map.isEmpty()) {
resources.remove();
}
value = null;
}
return value;
}

/**
* 绑定一个资源
*/
public static void bindResource(Object key, Object value) throws IllegalStateException {
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
Assert.notNull(value, "Value must not be null");
Map<Object, Object> map = resources.get();
if (map == null) {
map = new HashMap<>();
resources.set(map);
}
Object oldValue = map.put(actualKey, value);
// ResourceHolder#isVid()等同于不存在任何资源
if (oldValue instanceof ResourceHolder && ((ResourceHolder) oldValue).isVoid()) {
oldValue = null;
}
// TransactionSynchronizationManager中的任何操作,都只能先解除再操作,而不能进行覆盖
if (oldValue != null) {
throw new IllegalStateException("Already value [" + oldValue + "] for key [" +
actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");
}
if (logger.isTraceEnabled()) {
logger.trace("Bound value [" + value + "] for key [" + actualKey + "] to thread [" +
Thread.currentThread().getName() + "]");
}
}

/**
* 解绑资源,如果资源不存在抛出异常
*/
public static Object unbindResource(Object key) throws IllegalStateException {
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
Object value = doUnbindResource(actualKey);
if (value == null) {
throw new IllegalStateException(
"No value for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");
}
return value;
}

/**
* 解绑资源,资源不存在等于no-op
*/
@Nullable
public static Object unbindResourceIfPossible(Object key) {
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
return doUnbindResource(actualKey);
}

@Nullable
private static Object doUnbindResource(Object actualKey) {
Map<Object, Object> map = resources.get();
if (map == null) {
return null;
}
Object value = map.remove(actualKey);
if (map.isEmpty()) {
resources.remove();
}
// ResourceHolder#isVid()等同于不存在任何资源
if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
value = null;
}
if (value != null && logger.isTraceEnabled()) {
logger.trace("Removed value [" + value + "] for key [" + actualKey + "] from thread [" +
Thread.currentThread().getName() + "]");
}
return value;
}

//-------------------------------------------------------------------------
// Management of transaction synchronizations
//-------------------------------------------------------------------------

/**
* 检查transaction synchronization是否已激活
*/
public static boolean isSynchronizationActive() {
return (synchronizations.get() != null);
}

/**
* 激活transaction synchronization,如果已激活,抛出异常
*/
public static void initSynchronization() throws IllegalStateException {
if (isSynchronizationActive()) {
throw new IllegalStateException("Cannot activate transaction synchronization - already active");
}
logger.trace("Initializing transaction synchronization");
synchronizations.set(new LinkedHashSet<>());
}

/**
* 注册一个TransactionSynchronization,如果transaction synchronization未激活,抛出异常
*/
public static void registerSynchronization(TransactionSynchronization synchronization)
throws IllegalStateException {
Assert.notNull(synchronization, "TransactionSynchronization must not be null");
Set<TransactionSynchronization> synchs = synchronizations.get();
if (synchs == null) {
throw new IllegalStateException("Transaction synchronization is not active");
}
synchs.add(synchronization);
}

/**
* 返回已注册的所有TransactionSynchronization,如果transaction synchronization未激活,抛出异常
*/
public static List<TransactionSynchronization> getSynchronizations() throws IllegalStateException {
Set<TransactionSynchronization> synchs = synchronizations.get();
if (synchs == null) {
throw new IllegalStateException("Transaction synchronization is not active");
}
if (synchs.isEmpty()) {
return Collections.emptyList();
}
else {
List<TransactionSynchronization> sortedSynchs = new ArrayList<>(synchs);
// 排序,TransactionSynchronization可实现Ordered接口或使用@Order注解指定优先级
AnnotationAwareOrderComparator.sort(sortedSynchs);
return Collections.unmodifiableList(sortedSynchs);
}
}

/**
* 关闭transaction synchronization
*/
public static void clearSynchronization() throws IllegalStateException {
if (!isSynchronizationActive()) {
throw new IllegalStateException("Cannot deactivate transaction synchronization - not active");
}
logger.trace("Clearing transaction synchronization");
synchronizations.remove();
}

//-------------------------------------------------------------------------
// Exposure of transaction characteristics
//-------------------------------------------------------------------------

/**
* 记录当前事务名称
*/
public static void setCurrentTransactionName(@Nullable String name) {
currentTransactionName.set(name);
}

/**
* 返回当前事务名称
*/
@Nullable
public static String getCurrentTransactionName() {
return currentTransactionName.get();
}

/**
* 设置当前事务为只读事务
*/
public static void setCurrentTransactionReadOnly(boolean readOnly) {
currentTransactionReadOnly.set(readOnly ? Boolean.TRUE : null);
}

/**
* 检查当前事务是否为只读事务
*/
public static boolean isCurrentTransactionReadOnly() {
return (currentTransactionReadOnly.get() != null);
}

/**
* 设置当前事务的隔离级别
*/
public static void setCurrentTransactionIsolationLevel(@Nullable Integer isolationLevel) {
currentTransactionIsolationLevel.set(isolationLevel);
}

/**
* 返回当前事务的隔离级别
*/
@Nullable
public static Integer getCurrentTransactionIsolationLevel() {
return currentTransactionIsolationLevel.get();
}

/**
* 设置当前事务已开启
*/
public static void setActualTransactionActive(boolean active) {
actualTransactionActive.set(active ? Boolean.TRUE : null);
}

/**
* 检查当前事务已开启,也就是说是否真的存在事务而不仅仅只有transaction synchronization
*/
public static boolean isActualTransactionActive() {
return (actualTransactionActive.get() != null);
}

/**
* 完全清除transaction synchronization各种状态信息
*/
public static void clear() {
synchronizations.remove();
currentTransactionName.remove();
currentTransactionReadOnly.remove();
currentTransactionIsolationLevel.remove();
actualTransactionActive.remove();
}
}

注意,TransactionSynchronizationManager在操作ResourceHolder时是不允许直接覆盖的,旧的资源必须先解绑才能绑定新的资源。同时TransactionSynchronization只能在transaction synchronization激活时才能绑定,为此TransactionSynchronizationManager提供了initSynchronization()clearSynchronization()来分别开启开启和关闭transaction synchronization

结语

  抽象是编程的先决条件,编码不过是对抽象的具体实现。好的抽象才能带出好的代码,spring-tx也是如此,下一篇让我们一起钻到具体的实现细节里去吧~~


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!