前言 上篇 我们分析了spring-tx
中的AOP部分,包括TransactionAttributeSourcePointcut
如何定位潜在的事务方法,以及TransactionInterceptor
又如何结合PlatformTransactionManager
为方法应用事务管理,相信看过上篇的同学也从中get到了使用AOP的新姿势😂
不过到目前为止,除去开篇 中的概念介绍,我们对PlatformTransactionManager
还知之甚少,本篇我们就深入一点儿。DataSourceTransactionManager
作为最具代表性的 PlatformTransactionManager
实现,我们就以它为例来看看隐藏在抽象下的实现细节吧。
一个小例子 来看一个使用Spring声明式事务的小例子。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 @Service public class ServiceA { @Autowired private ServiceB serviceB; @Transactional(propagation = Propagation.REQUIRED) public void service () { serviceB.service(); } }@Service public class ServiceB { @Transactional(propagation = Propagation.REQUIRES_NEW) public void service () { } }@RestController public class DemoController { @Autowired private ServiceA serviceA; @PostMapping("/demo") public void demo () { serviceA.service(); } }
这里我们有两个事务方法,分别属于ServiceA
和ServiceB
,处理请求的入口在DemoController
。请求到达后,
进入服务 A 的service()
方法
开启事务 tx-a
执行CRUD
进入服务 B 的service()
方法
挂起 tx-a,开启新事务 tx-b
执行CRUD
结束serviceB#service()
调用,视执行情况提交或回滚事务
结束serviceA#service()
调用,视执行情况提交或回滚事务
麻雀虽小五脏俱全,这个小例子也是后续分析源码时的参照,先给出来混个脸熟吧。
DataSourceTransactionManager Hierarchy overview
DataSourceTransactionManager
继承自AbstractPlatformTransactionManager
,基类中实现了事务管理的一整套工作流:
检查是否已存在事务
应用适当的传播行为
必要时暂停和恢复事务
提交事务时检查rollback-only
标记
回滚事务时的处理——如实回滚或仅设置rollback-only
标记
触发已注册的transaction synchronization
回调
子类只需要根据事务的状态实现特定的模板方法即可,比如事务开始、暂停、恢复、提交和回滚。
TransactionInterceptor 事务拦截器想必大家都很熟悉了,简化后的模型大致如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public Object invoke (MethodInvocation invocation) throws Throwable { Object result = null ; TransactionAttribute txAttr = xxx; PlatformTransactionManager txManager = xxx; TransactionStatus status = txManager.getTransaction(txAttr); try { result = invocation.proceed(); } catch (Throwable ex) { if (txAttr.rollbackOn(ex)) { txManager.rollback(status); } else { txManager.commit(status); } throw ex; } txManager.commit(status); return result; }
核心流程大致分为5步,其中和事务管理器有关的是第3和第5步。前情提要到此结束,那我们就从第3步getTransaction(...)
开始呗~
getTransaction 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 @Override public final TransactionStatus getTransaction (@Nullable TransactionDefinition definition) throws TransactionException { TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults()); Object transaction = doGetTransaction(); boolean debugEnabled = logger.isDebugEnabled(); if (isExistingTransaction(transaction)) { return handleExistingTransaction(def, transaction, debugEnabled); } if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) { throw new InvalidTimeoutException("Invalid transaction timeout" , def.getTimeout()); } if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) { throw new IllegalTransactionStateException( "No existing transaction found for transaction marked with propagation 'mandatory'" ); } else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { SuspendedResourcesHolder suspendedResources = suspend(null ); if (debugEnabled) { logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def); } try { return startTransaction(def, transaction, debugEnabled, suspendedResources); } catch (RuntimeException | Error ex) { resume(null , suspendedResources); throw ex; } } else { if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) { logger.warn("Custom isolation level specified but no actual transaction initiated; " + "isolation level will effectively be ignored: " + def); } boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); return prepareTransactionStatus(def, null , true , newSynchronization, debugEnabled, null ); } }
getTransaction(...)
首先会调用模板方法doGetTransaction()
来获取事务对象(transaction object
)。事务对象虽然特定于具体的PlatformTransactionManeger
实现,但一般都会携带相应的事务状态,我们来分析一下DataSourceTransactionManager
对它的实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 @Override protected Object doGetTransaction () { DataSourceTransactionObject txObject = new DataSourceTransactionObject(); txObject.setSavepointAllowed(isNestedTransactionAllowed()); ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource()); txObject.setConnectionHolder(conHolder, false ); return txObject; }
开篇 介绍ResourceHolder
接口的时候,我们曾讨论过如何保证多个方法运行在同一个事务中的话题。若将事务窄化为数据库事务的话,自然是要保证多个方法使用同一个JDBC Connection,ConnectionHolder
作为ResouceHolder
的一种实现就是用来携带JDBC Connection的。当然了,除了作为java.sql.Connection
的容器以外,ConnectionHolder
还有一些可配置属性,比如设置rollback-only
标记、超时时长等。DataSourceTransactionObject
可以说是java.sql.Connection
的一个快照,它保存了数据库连接的一系列状态(用于后续对数据库连接的复原),比如当前的隔离级别、自动提交(auto commit
)的取值以及一个指示java.sql.Connection
是通过javax.sql.DataSource
新创建的还是从线程私有存储中获取的标记。这两个类都比较简单,大家可以自行查阅一下,这里就不细说了。
了解了ConnectionHolder
和DataSourceTransactionObject
以后,再看isExistingTransaction(...)
就很简单了。
1 2 3 4 5 6 7 8 @Override protected boolean isExistingTransaction (Object transaction) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive()); }
还记得前面给出的小例子吗?对于服务 A 的service()
方法来说,很明显我们是无法从线程私有存储中获取到ConnectionHolder
实例的,因此isExistingTransaction(...)
必然返回 false。这里我们也先跳过handleExistingTransaction(...)
,晚点再来分析它。接下来,在没有任何已知事务的前提下,我们一起来看看spring-tx
是如何根据传播行为的配置情况做不同处理的。
PROPAGATION_MANDATORY
要求一定要运行在事务中,而当前不存在任何事务,因此会抛出异常。PROPAGATION_REQUIRED
、PROPAGATION_REQUIRES_NEW
和PROPAGATION_NESTED
这三种传播行为在没有事务存在时的表现是一致的,它们都会开启一个新的独立事务。新事务的开启也意味着对旧事务(或者说外层事务)的挂起,所以我们看到这第一步便是调用suspend(...)
,而这里传null
也是因为当前不存在任何已知事务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 protected final SuspendedResourcesHolder suspend (@Nullable Object transaction) throws TransactionException { if (TransactionSynchronizationManager.isSynchronizationActive()) { List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization(); try { Object suspendedResources = null ; if (transaction != null ) { suspendedResources = doSuspend(transaction); } String name = TransactionSynchronizationManager.getCurrentTransactionName(); TransactionSynchronizationManager.setCurrentTransactionName(null ); boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly(); TransactionSynchronizationManager.setCurrentTransactionReadOnly(false ); Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel(); TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null ); boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive(); TransactionSynchronizationManager.setActualTransactionActive(false ); return new SuspendedResourcesHolder( suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive); } catch (RuntimeException | Error ex) { doResumeSynchronization(suspendedSynchronizations); throw ex; } } else if (transaction != null ) { Object suspendedResources = doSuspend(transaction); return new SuspendedResourcesHolder(suspendedResources); } else { return null ; } }
suspend(...)
中牵涉到spring-tx
事务同步机制(transaction synchronization mechanism
),这部分内容我们在开篇 中介绍过。如若事务同步机制已激活,就需要先执行对应的回调,这也是doSuspendSynchronization()
做的工作。接下来才是调用模板方法doSuspend(...)
,DataSourceTransactionManager
实现了这个方法。
1 2 3 4 5 6 7 8 @Override protected Object doSuspend (Object transaction) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; txObject.setConnectionHolder(null ); return TransactionSynchronizationManager.unbindResource(obtainDataSource()); }
主要就是将ConnectionHolder
从事务对象和线程私有存储中移除,然后转移到SuspendedResourcesHolder
中用于后续的恢复。回到getTransaction(...)
,挂起旧事务后自然是要开启新事物。
1 2 3 4 5 6 7 8 9 10 11 12 13 private TransactionStatus startTransaction (TransactionDefinition definition, Object transaction, boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) { boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true , newSynchronization, debugEnabled, suspendedResources); doBegin(transaction, definition); prepareSynchronization(status, definition); return status; }
简单明了的三步:
创建TransactionStatus
实例,它包含了一系列的事务状态
调用模板方法doBegin(...)
,通知新事务已启动
按需开启事务同步机制
DeaultTransactionStatus
就是一个上下文对象或者说信息的集合体,代码好像没什么可说的,大家自行查阅一下吧,我们重点来看一下doBegin(...)
模板方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 @Override protected void doBegin (Object transaction, TransactionDefinition definition) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; Connection con = null ; try { if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) { Connection newCon = obtainDataSource().getConnection(); if (logger.isDebugEnabled()) { logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction" ); } txObject.setConnectionHolder(new ConnectionHolder(newCon), true ); } txObject.getConnectionHolder().setSynchronizedWithTransaction(true ); con = txObject.getConnectionHolder().getConnection(); Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition); txObject.setPreviousIsolationLevel(previousIsolationLevel); txObject.setReadOnly(definition.isReadOnly()); if (con.getAutoCommit()) { txObject.setMustRestoreAutoCommit(true ); if (logger.isDebugEnabled()) { logger.debug("Switching JDBC Connection [" + con + "] to manual commit" ); } con.setAutoCommit(false ); } prepareTransactionalConnection(con, definition); txObject.getConnectionHolder().setTransactionActive(true ); int timeout = determineTimeout(definition); if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) { txObject.getConnectionHolder().setTimeoutInSeconds(timeout); } if (txObject.isNewConnectionHolder()) { TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder()); } } catch (Throwable ex) { if (txObject.isNewConnectionHolder()) { DataSourceUtils.releaseConnection(con, obtainDataSource()); txObject.setConnectionHolder(null , false ); } throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction" , ex); } }
doBegin(...)
设置transaction active
标记,isExistingTransaction(...)
会检查这个标记,这样就构成了一个闭环。最后一步是调用prepareSynchronization(...)
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 protected void prepareSynchronization (DefaultTransactionStatus status, TransactionDefinition definition) { if (status.isNewSynchronization()) { TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction()); TransactionSynchronizationManager.setCurrentTransactionIsolationLevel( definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ? definition.getIsolationLevel() : null ); TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly()); TransactionSynchronizationManager.setCurrentTransactionName(definition.getName()); TransactionSynchronizationManager.initSynchronization(); } }
再次回到getTransaction(...)
,可以看到startTransaction(...)
是包裹在 try 块中的,以便在出现异常时恢复已有事务,那我们就看看具体是怎么恢复的呗。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 protected final void resume (@Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder) throws TransactionException { if (resourcesHolder != null ) { Object suspendedResources = resourcesHolder.suspendedResources; if (suspendedResources != null ) { doResume(transaction, suspendedResources); } List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations; if (suspendedSynchronizations != null ) { TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive); TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel); TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly); TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name); doResumeSynchronization(suspendedSynchronizations); } } } @Override protected void doResume (@Nullable Object transaction, Object suspendedResources) { TransactionSynchronizationManager.bindResource(obtainDataSource(), suspendedResources); }
基本上是suspend(...)
的逆操作,至此,创建新事务的流程就分析完了。等等,你以为这就完了?还记得我们的小例子吗?ServiceB
说这才哪到哪,好戏才刚刚开始呢。没错,对ServiceB
而言,ServiceA#service()
已经开启事务了,等到它调用service()
的时候,isExistingTransaction(...)
就返回 true 而不是 false 了,那spring-tx
是如何处理当前已存在事务的情况呢?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 private TransactionStatus handleExistingTransaction ( TransactionDefinition definition, Object transaction, boolean debugEnabled) throws TransactionException { if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) { throw new IllegalTransactionStateException( "Existing transaction found for transaction marked with propagation 'never'" ); } if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) { if (debugEnabled) { logger.debug("Suspending current transaction" ); } Object suspendedResources = suspend(transaction); boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); return prepareTransactionStatus( definition, null , false , newSynchronization, debugEnabled, suspendedResources); } if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) { if (debugEnabled) { logger.debug("Suspending current transaction, creating new transaction with name [" + definition.getName() + "]" ); } SuspendedResourcesHolder suspendedResources = suspend(transaction); try { return startTransaction(definition, transaction, debugEnabled, suspendedResources); } catch (RuntimeException | Error beginEx) { resumeAfterBeginException(transaction, suspendedResources, beginEx); throw beginEx; } } if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { if (!isNestedTransactionAllowed()) { throw new NestedTransactionNotSupportedException( "Transaction manager does not allow nested transactions by default - " + "specify 'nestedTransactionAllowed' property with value 'true'" ); } if (debugEnabled) { logger.debug("Creating nested transaction with name [" + definition.getName() + "]" ); } if (useSavepointForNestedTransaction()) { DefaultTransactionStatus status = prepareTransactionStatus(definition, transaction, false , false , debugEnabled, null ); status.createAndHoldSavepoint(); return status; } else { return startTransaction(definition, transaction, debugEnabled, null ); } } if (debugEnabled) { logger.debug("Participating in existing transaction" ); } if (isValidateExistingTransaction()) { if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) { Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel(); if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) { Constants isoConstants = DefaultTransactionDefinition.constants; throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] specifies isolation level which is incompatible with existing transaction: " + (currentIsolationLevel != null ? isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) : "(unknown)" )); } } if (!definition.isReadOnly()) { if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) { throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] is not marked as read-only but existing transaction is" ); } } } boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); return prepareTransactionStatus(definition, transaction, false , newSynchronization, debugEnabled, null ); }protected final DefaultTransactionStatus prepareTransactionStatus ( TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction, boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) { DefaultTransactionStatus status = newTransactionStatus( definition, transaction, newTransaction, newSynchronization, debug, suspendedResources); prepareSynchronization(status, definition); return status; }
handleExistingTransaction(...)
也对不同的传播行为做了处理,至于其中涉及到的方法调用,前面都已经分析过了。各位同学在自己看源码的时候,建议动手调试调试,彻底梳理清楚其间的状态标志及其转换为好。
rollback 嘟嘟嘟,CRUD出异常了,这会该回滚事务了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 @Override public final void rollback (TransactionStatus status) throws TransactionException { if (status.isCompleted()) { throw new IllegalTransactionStateException( "Transaction is already completed - do not call commit or rollback more than once per transaction" ); } DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status; processRollback(defStatus, false ); }private void processRollback (DefaultTransactionStatus status, boolean unexpected) { try { boolean unexpectedRollback = unexpected; try { triggerBeforeCompletion(status); if (status.hasSavepoint()) { if (status.isDebug()) { logger.debug("Rolling back transaction to savepoint" ); } status.rollbackToHeldSavepoint(); } else if (status.isNewTransaction()) { if (status.isDebug()) { logger.debug("Initiating transaction rollback" ); } doRollback(status); } else { if (status.hasTransaction()) { if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) { if (status.isDebug()) { logger.debug("Participating transaction failed - marking existing transaction as rollback-only" ); } doSetRollbackOnly(status); } else { if (status.isDebug()) { logger.debug("Participating transaction failed - letting transaction originator decide on rollback" ); } } } else { logger.debug("Should roll back transaction but cannot - no transaction available" ); } if (!isFailEarlyOnGlobalRollbackOnly()) { unexpectedRollback = false ; } } } catch (RuntimeException | Error ex) { triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN); throw ex; } triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK); if (unexpectedRollback) { throw new UnexpectedRollbackException( "Transaction rolled back because it has been marked as rollback-only" ); } } finally { cleanupAfterCompletion(status); } }
回滚分为三种情况:
嵌套事务(创建有Savepoint
)回滚
独立事务回滚
非独立事务回滚
篇幅有限,Savepoint
就不说了,重点看2、3两种情况。对于独立事务,调用doRollback(...)
进行回滚,基于java.sql.Connection
的回滚机制。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Override protected void doRollback (DefaultTransactionStatus status) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction(); Connection con = txObject.getConnectionHolder().getConnection(); if (status.isDebug()) { logger.debug("Rolling back JDBC transaction on Connection [" + con + "]" ); } try { con.rollback(); } catch (SQLException ex) { throw new TransactionSystemException("Could not roll back JDBC transaction" , ex); } }
对于非独立事务,调用doSetRollbackOnly(...)
,仅仅是打上rollback-only
标记用于后续的检测,processRollback(...)
本身就对这个标记进行了检测。
1 2 3 4 5 6 7 8 9 10 @Override protected void doSetRollbackOnly (DefaultTransactionStatus status) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction(); if (status.isDebug()) { logger.debug("Setting JDBC transaction [" + txObject.getConnectionHolder().getConnection() + "] rollback-only" ); } txObject.setRollbackOnly(); }
能做的都做了,最后该清理现场了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 private void cleanupAfterCompletion (DefaultTransactionStatus status) { status.setCompleted(); if (status.isNewSynchronization()) { TransactionSynchronizationManager.clear(); } if (status.isNewTransaction()) { doCleanupAfterCompletion(status.getTransaction()); } if (status.getSuspendedResources() != null ) { if (status.isDebug()) { logger.debug("Resuming suspended transaction after completion of inner transaction" ); } Object transaction = (status.hasTransaction() ? status.getTransaction() : null ); resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources()); } }@Override protected void doCleanupAfterCompletion (Object transaction) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; if (txObject.isNewConnectionHolder()) { TransactionSynchronizationManager.unbindResource(obtainDataSource()); } Connection con = txObject.getConnectionHolder().getConnection(); try { if (txObject.isMustRestoreAutoCommit()) { con.setAutoCommit(true ); } DataSourceUtils.resetConnectionAfterTransaction( con, txObject.getPreviousIsolationLevel(), txObject.isReadOnly()); } catch (Throwable ex) { logger.debug("Could not reset JDBC Connection after transaction" , ex); } if (txObject.isNewConnectionHolder()) { if (logger.isDebugEnabled()) { logger.debug("Releasing JDBC Connection [" + con + "] after transaction" ); } DataSourceUtils.releaseConnection(con, this .dataSource); } txObject.getConnectionHolder().clear(); }
commit 要是没出什么幺蛾子,就可以提交事务了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 @Override public final void commit (TransactionStatus status) throws TransactionException { if (status.isCompleted()) { throw new IllegalTransactionStateException( "Transaction is already completed - do not call commit or rollback more than once per transaction" ); } DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status; if (defStatus.isLocalRollbackOnly()) { if (defStatus.isDebug()) { logger.debug("Transactional code has requested rollback" ); } processRollback(defStatus, false ); return ; } if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) { if (defStatus.isDebug()) { logger.debug("Global transaction is marked as rollback-only but transactional code requested commit" ); } processRollback(defStatus, true ); return ; } processCommit(defStatus); }private void processCommit (DefaultTransactionStatus status) throws TransactionException { try { boolean beforeCompletionInvoked = false ; try { boolean unexpectedRollback = false ; prepareForCommit(status); triggerBeforeCommit(status); triggerBeforeCompletion(status); beforeCompletionInvoked = true ; if (status.hasSavepoint()) { if (status.isDebug()) { logger.debug("Releasing transaction savepoint" ); } unexpectedRollback = status.isGlobalRollbackOnly(); status.releaseHeldSavepoint(); } else if (status.isNewTransaction()) { if (status.isDebug()) { logger.debug("Initiating transaction commit" ); } unexpectedRollback = status.isGlobalRollbackOnly(); doCommit(status); } else if (isFailEarlyOnGlobalRollbackOnly()) { unexpectedRollback = status.isGlobalRollbackOnly(); } if (unexpectedRollback) { throw new UnexpectedRollbackException( "Transaction silently rolled back because it has been marked as rollback-only" ); } } catch (UnexpectedRollbackException ex) { triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK); throw ex; } catch (TransactionException ex) { if (isRollbackOnCommitFailure()) { doRollbackOnCommitException(status, ex); } else { triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN); } throw ex; } catch (RuntimeException | Error ex) { if (!beforeCompletionInvoked) { triggerBeforeCompletion(status); } doRollbackOnCommitException(status, ex); throw ex; } try { triggerAfterCommit(status); } finally { triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED); } } finally { cleanupAfterCompletion(status); } }
commit
相对来讲是比较复杂的,它牵涉到对rollback-only
标记的处理。这里解释一下global rollback only
,区别于local rollback only
,这个标记多用于多事务协作的情况,比如两阶段提交协议下的分布式事务。
区别于rollback
,commit
不需要处理非独立事务的情况,内层事务不需要提交,它只需要等待外层事务提交即可。对于独立事务,模板方法doCommit(...)
也只是简单的代理给java.sql.Connection
的提交机制。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Override protected void doCommit (DefaultTransactionStatus status) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction(); Connection con = txObject.getConnectionHolder().getConnection(); if (status.isDebug()) { logger.debug("Committing JDBC transaction on Connection [" + con + "]" ); } try { con.commit(); } catch (SQLException ex) { throw new TransactionSystemException("Could not commit JDBC transaction" , ex); } }
后记 最后回顾一下我们的小例子,现在它还有秘密吗?本篇我们一起探索了PlatformTransactionManeger
抽象下的事务管理机制,篇幅较长,相信耐心看完的你一定有所收获😊