引言 Spring提供的声明式事务想必大家都很熟悉了,简简单单一个@Transactional
注解便能提供如此强大的功能,那么它是如何实现的呢?带着这点好奇心,我们一起扒拉扒拉spring-tx-5.2.6.RELEASE
的源码吧。
所谓工欲善其事,必先利其器,深入源码之前了解清楚spring-tx
的相关概念还是很有必要的。本篇算是Spring AOP源码解析 的姊妹篇,毕竟声明式事务就是AOP思想的一个实际应用嘛。
Spring Tx Concepts PlatformTransactionManager
是Spring事务管理的核心接口,它规范了应用程序操作事务的方式。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public interface PlatformTransactionManager extends TransactionManager { TransactionStatus getTransaction (@Nullable TransactionDefinition definition) throws TransactionException ; void commit (TransactionStatus status) throws TransactionException ; void rollback (TransactionStatus status) throws TransactionException ; }
TransactionDefinition TransactionDefinition
描述了事务的相关属性,比如事务的隔离级别、传播行为,亦或是超时时长是多少等等。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 public interface TransactionDefinition { default int getPropagationBehavior () { return PROPAGATION_REQUIRED; } default int getIsolationLevel () { return ISOLATION_DEFAULT; } default int getTimeout () { return TIMEOUT_DEFAULT; } default boolean isReadOnly () { return false ; } @Nullable default String getName () { return null ; } }
事务的隔离级别想必不用多做解释,传播行为(Propagation Behavior
)是什么概念呢?简单来说,当一个事务方法被另一个事务方法调用时,传播行为可以控制是否需要创建事务以及如何创建事务,spring-tx
中定义了7种事务传播行为:
PROPAGATION_REQUIRED
: 表示方法必须运行在事务中,如果当前事务存在,方法将会在该事务中运行,否则将开启一个新的独立事务
PROPAGATION_SUPPORTS
:表示方法支持在事务中运行,如果当前事务存在,方法将会在该事务中运行,否则以非事务方式运行
PROPAGATION_MANDATORY
:表示方法必须运行在事务中,如果当前事务不存在,抛出异常
PROPAGATION_REQUIRES_NEW
:表示方法必须运行在独立事务中,无论当前是否存在事务,该级别总会开启一个新的独立事务
PROPAGATION_NOT_SUPPORTED
:表示方法不支持在事务中运行,如果当前事务存在,挂起当前事务从而以非事务方式运行
PROPAGATION_NEVER
:表示方法不支持在事务中运行,如果当前事务存在,抛出异常
PROPAGATION_NESTED
:表示方法必须运行在事务中,如果当前事务存在,开启一个嵌套事务(Savepoint
),否则等同于PROPAGATION_REQUIRED
TransactionDefition
更多的是表达通用的概念,它的子接口TransactionAttribute
添加了基于AOP的rollbackOn(...)
操作。
1 2 3 4 5 6 7 8 9 10 11 12 public interface TransactionAttribute extends TransactionDefinition { @Nullable String getQualifier () ; boolean rollbackOn (Throwable ex) ; }
再看一眼@Transactional
注解的定义。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public @interface Transactional { @AliasFor("transactionManager") String value () default "" ; @AliasFor("value") String transactionManager () default "" ; Propagation propagation () default Propagation.REQUIRED ; Isolation isolation () default Isolation.DEFAULT ; int timeout () default TransactionDefinition.TIMEOUT_DEFAULT ; boolean readOnly () default false ; Class<? extends Throwable>[] rollbackFor() default {}; String[] rollbackForClassName() default {}; Class<? extends Throwable>[] noRollbackFor() default {}; String[] noRollbackForClassName() default {}; }
想必你也看出来了,TransactionAttribute
正是对运行时获取到的@Transactional
注解的封装。
TransctionStatus TransctionStatus
描述了某一时间点上事务的状态信息,比如是否是新开启的独立事务、是否已完成以及是否打上了rollback-only
标记等等,并且为了支持嵌套事务,TransctionStatus
还额外提供了对保存点(Savepoint
)的支持。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 public interface SavepointManager { Object createSavepoint () throws TransactionException ; void rollbackToSavepoint (Object savepoint) throws TransactionException ; void releaseSavepoint (Object savepoint) throws TransactionException ; }public interface TransactionExecution { boolean isNewTransaction () ; void setRollbackOnly () ; boolean isRollbackOnly () ; boolean isCompleted () ; }public interface TransactionStatus extends TransactionExecution , SavepointManager , Flushable { boolean hasSavepoint () ; @Override void flush () ; }
ResourceHolder 当一个事务方法 A 调用另一个事务方法 B 的时候,如何保证这两个方法运行在同一个事务中呢?如果方法 A 和 B 使用不同的java.sql.Connection
来操作数据库,能保证它们运行在同一个事务中吗?很明显,是不能的。多个方法运行在同一个事务中的前提是它们必须使用同一个java.sql.Connection
,以伪代码的形式就是:
1 2 3 4 5 6 7 8 9 10 try { connection.setAutoCommit(false ) methodA(connection); methodB(connection); connection.commit() } catch (Exception ex) { connection.rollback() }
java.sql.Connection
实例必须传递给方法 A 和方法 B,这样才能保证它们使用同一个连接对象。实际开发中,我们并没有像这样传递过连接对象,spring-tx
将我们从这些细节中解放了出来。
ResouceHolder
就是设计来包裹底层连接资源的,spring-tx
内部会使用线程私有存储ThreadLocal
在同一个线程中进行传递,对方法 A 和 方法 B 来说,只要它们运行在同一个线程中,就能使用上同一个连接对象。当然了,ResourceHolder
并非只能携带java.sql.Connection
,对于使用MyBatis的用户来说,它携带的就是SqlSession
了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public interface ResourceHolder { void reset () ; void unbound () ; boolean isVoid () ; }
TransactionSynchronization 由于spring-tx
全盘接管了事务管理,那么它自然可以管理事务的生命周期。 TransactionSynchronization
就是这样一个回调接口,它为我们揭示了事务运行时的各个阶段,如果我们需要在事务执行前后做一些额外的操作,使用它就再好不过了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 public interface TransactionSynchronization extends Flushable { int STATUS_COMMITTED = 0 ; int STATUS_ROLLED_BACK = 1 ; int STATUS_UNKNOWN = 2 ; default void suspend () { } default void resume () { } @Override default void flush () { } default void beforeCommit (boolean readOnly) { } default void beforeCompletion () { } default void afterCommit () { } default void afterCompletion (int status) { } }
TransactionSynchronizationManager spring-tx
中ResourceHolder
的绑定和传递、TransactionSynchronization
的注册和获取,均是代理给TransactionSynchronizationManager
来完成的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 public abstract class TransactionSynchronizationManager { private static final Log logger = LogFactory.getLog(TransactionSynchronizationManager.class); private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal<>("Transactional resources" ); private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations = new NamedThreadLocal<>("Transaction synchronizations" ); private static final ThreadLocal<String> currentTransactionName = new NamedThreadLocal<>("Current transaction name" ); private static final ThreadLocal<Boolean> currentTransactionReadOnly = new NamedThreadLocal<>("Current transaction read-only status" ); private static final ThreadLocal<Integer> currentTransactionIsolationLevel = new NamedThreadLocal<>("Current transaction isolation level" ); private static final ThreadLocal<Boolean> actualTransactionActive = new NamedThreadLocal<>("Actual transaction active" ); @Nullable public static Object getResource (Object key) { Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key); Object value = doGetResource(actualKey); if (value != null && logger.isTraceEnabled()) { logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]" ); } return value; } @Nullable private static Object doGetResource (Object actualKey) { Map<Object, Object> map = resources.get(); if (map == null ) { return null ; } Object value = map.get(actualKey); if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) { map.remove(actualKey); if (map.isEmpty()) { resources.remove(); } value = null ; } return value; } public static void bindResource (Object key, Object value) throws IllegalStateException { Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key); Assert.notNull(value, "Value must not be null" ); Map<Object, Object> map = resources.get(); if (map == null ) { map = new HashMap<>(); resources.set(map); } Object oldValue = map.put(actualKey, value); if (oldValue instanceof ResourceHolder && ((ResourceHolder) oldValue).isVoid()) { oldValue = null ; } if (oldValue != null ) { throw new IllegalStateException("Already value [" + oldValue + "] for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]" ); } if (logger.isTraceEnabled()) { logger.trace("Bound value [" + value + "] for key [" + actualKey + "] to thread [" + Thread.currentThread().getName() + "]" ); } } public static Object unbindResource (Object key) throws IllegalStateException { Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key); Object value = doUnbindResource(actualKey); if (value == null ) { throw new IllegalStateException( "No value for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]" ); } return value; } @Nullable public static Object unbindResourceIfPossible (Object key) { Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key); return doUnbindResource(actualKey); } @Nullable private static Object doUnbindResource (Object actualKey) { Map<Object, Object> map = resources.get(); if (map == null ) { return null ; } Object value = map.remove(actualKey); if (map.isEmpty()) { resources.remove(); } if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) { value = null ; } if (value != null && logger.isTraceEnabled()) { logger.trace("Removed value [" + value + "] for key [" + actualKey + "] from thread [" + Thread.currentThread().getName() + "]" ); } return value; } public static boolean isSynchronizationActive () { return (synchronizations.get() != null ); } public static void initSynchronization () throws IllegalStateException { if (isSynchronizationActive()) { throw new IllegalStateException("Cannot activate transaction synchronization - already active" ); } logger.trace("Initializing transaction synchronization" ); synchronizations.set(new LinkedHashSet<>()); } public static void registerSynchronization (TransactionSynchronization synchronization) throws IllegalStateException { Assert.notNull(synchronization, "TransactionSynchronization must not be null" ); Set<TransactionSynchronization> synchs = synchronizations.get(); if (synchs == null ) { throw new IllegalStateException("Transaction synchronization is not active" ); } synchs.add(synchronization); } public static List<TransactionSynchronization> getSynchronizations () throws IllegalStateException { Set<TransactionSynchronization> synchs = synchronizations.get(); if (synchs == null ) { throw new IllegalStateException("Transaction synchronization is not active" ); } if (synchs.isEmpty()) { return Collections.emptyList(); } else { List<TransactionSynchronization> sortedSynchs = new ArrayList<>(synchs); AnnotationAwareOrderComparator.sort(sortedSynchs); return Collections.unmodifiableList(sortedSynchs); } } public static void clearSynchronization () throws IllegalStateException { if (!isSynchronizationActive()) { throw new IllegalStateException("Cannot deactivate transaction synchronization - not active" ); } logger.trace("Clearing transaction synchronization" ); synchronizations.remove(); } public static void setCurrentTransactionName (@Nullable String name) { currentTransactionName.set(name); } @Nullable public static String getCurrentTransactionName () { return currentTransactionName.get(); } public static void setCurrentTransactionReadOnly (boolean readOnly) { currentTransactionReadOnly.set(readOnly ? Boolean.TRUE : null ); } public static boolean isCurrentTransactionReadOnly () { return (currentTransactionReadOnly.get() != null ); } public static void setCurrentTransactionIsolationLevel (@Nullable Integer isolationLevel) { currentTransactionIsolationLevel.set(isolationLevel); } @Nullable public static Integer getCurrentTransactionIsolationLevel () { return currentTransactionIsolationLevel.get(); } public static void setActualTransactionActive (boolean active) { actualTransactionActive.set(active ? Boolean.TRUE : null ); } public static boolean isActualTransactionActive () { return (actualTransactionActive.get() != null ); } public static void clear () { synchronizations.remove(); currentTransactionName.remove(); currentTransactionReadOnly.remove(); currentTransactionIsolationLevel.remove(); actualTransactionActive.remove(); } }
注意,TransactionSynchronizationManager
在操作ResourceHolder
时是不允许直接覆盖的,旧的资源必须先解绑才能绑定新的资源。同时TransactionSynchronization
只能在transaction synchronization
激活时才能绑定,为此TransactionSynchronizationManager
提供了initSynchronization()
和clearSynchronization()
来分别开启开启和关闭transaction synchronization
。
结语 抽象是编程的先决条件,编码不过是对抽象的具体实现。好的抽象才能带出好的代码,spring-tx
也是如此,下一篇让我们一起钻到具体的实现细节里去吧~~