当前位置:Gxlcms > 数据库问题 > 如何将一个操作“绑定到数据库事务上”

如何将一个操作“绑定到数据库事务上”

时间:2021-07-01 10:21:17 帮助过:7人阅读

  • spring-cache简介

    • 基本机制

    • 事务上下文中的问题

    • 将操作绑定到数据库事务上

    • spring-cache的相关实现

    • TransactionSynchronizationManager和TransactionSynchronizationAdapter

    • 事务相关操作注册与回调流程

    • 其它应用

    摘要

    在开发中,我们常常会遇到(或者需要)把一些操作“绑定到数据库事务上”。也就是说,如果数据库事务成功提交,则执行这个操作;如果数据库事务回滚,则不执行这个操作(或者执行另一个操作)。

    例如,JMS与事务中介绍了一种JmsTemplate的配置方法,可以把“发送JMS消息”的操作绑定到数据库事务上。除此之外,更新缓存的操作也需要做类似的绑定处理。否则,数据库事务回滚了,而缓存中却完成了更新操作,可能导致一段时间内都会发生“脏读”。

    那么,这种“绑定到数据库事务上”的功能,是如何实现的呢?spring-cache中就有一个很好的例子。

     


     

    spring-cache简介

    spring-cache本质上不是一个具体的缓存实现方案(例如EHCache 或者 OSCache),而是一个对缓存使用的抽象,通过在既有代码中添加少量它定义的各种 annotation,即能够简单而快捷地操作缓存。

     spring-cache提供了一个CacheManager接口,用于抽象和管理缓存;缓存则抽象为Cache接口;而业务数据的CRUD操作,则由@CachePut/@Cacheable/@CacheEviet注解来进行配置后,由Cache接口下的各种实现类来处理。此外还有一些辅助类、配置类,由于这里是“简介”,按下不表。

    基本机制

    显然,spring-cache使用了基于注解的AOP机制。以@CachePut注解为例,它的基本操作流程是这样的:

    技术分享

     

    其中,“获取缓存实例Cache”就是由CacheManager接口负责的。这里的“缓存实例”只是一个“逻辑”上的实例;在物理实现上,它可能是同一个缓存中的不同命名空间、也可能确实是不同的物理缓存。

    “将返回结果写入缓存”,以及其它的缓存读、写操作,都由Cache接口来负责。

    事务上下文中的问题

    在事务上下文中,上面所说的“基本流程”是存在问题的:如果“写缓存”操作成功、而数据库事务回滚了,那么缓存中就会出现一笔脏数据。如下图所示:

    技术分享

     

    这种场景下,我们就需要把缓存操作绑定到数据库事务上。


     

    将操作绑定到数据库事务上

    spring-cache的相关实现

    与JmsTemplate类似,Spring-cache提供了一个“绑定数据库事务”的CacheManager实现类:AbstractTransactionSupportingCacheManager。不过,这个类只提供一个“是否绑定到数据库事务上”的配置项(transactionAware),自身并不处理“绑定数据库事务”这个操作。真正实现了“绑定”处理的,是AbstractTransactionSupportingCacheManager提供的Cache实现类:TransactionAwareCacheDecorator。这个类的put方法代码如下:

    TransactionAwareCacheDecorator 

    public class TransactionAwareCacheDecorator implements Cache {

        private final Cache targetCache;

     

        @Override

        public void put(final Object key, final Object value) {

            // 判断是否开启了事务

            if (TransactionSynchronizationManager.isSynchronizationActive()) {

                // 将操作注册到“afterCommit”阶段

                TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {

                    @Override

                    public void afterCommit() {

                        targetCache.put(key, value);

                    }

                });

            }

            else {

                this.targetCache.put(key, value);

            }

        }

        // 省略其它方法

    }

     

    AbstractTransactionSupportingCacheManager是基于“继承”来提供TransactionAwareCacheDecorator。除了它之外,spring-cache还提供了一个基于“组合”的CacheManager实现类:TransactionAwareCacheManagerProxy。不过,后者本质上也要通过TransactionAwareCacheDecorator来实现所需功能。

    TransactionSynchronizationManager和TransactionSynchronizationAdapter

    TransactionSynchronizationManager中的代码有点复杂。但是其功能可以“一言以蔽之”:维护事务状态。在这个类中有一系列的ThreadLocal类型的类变量,它们就负责存储当前线程中的事务数据。相关代码如下:

    TransactionSynchronizationManager中的ThreadLocal 

    private static final ThreadLocal<Map<Object, Object>> resources =

            new NamedThreadLocal<Map<Object, Object>>("Transactional resources");

    // 关注点:事务相关操作的回调模板

    private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =

            new NamedThreadLocal<Set<TransactionSynchronization>>("Transaction synchronizations");

    private static final ThreadLocal<String> currentTransactionName =

            new NamedThreadLocal<String>("Current transaction name");

    private static final ThreadLocal<Boolean> currentTransactionReadOnly =

            new NamedThreadLocal<Boolean>("Current transaction read-only status");

    private static final ThreadLocal<Integer> currentTransactionIsolationLevel =

            new NamedThreadLocal<Integer>("Current transaction isolation level");

    private static final ThreadLocal<Boolean> actualTransactionActive =

            new NamedThreadLocal<Boolean>("Actual transaction active");

    这些类变量中,我们需要关注的是synchronizations 。在TransactionAwareCacheDecorator中使用到的TransactionSynchronizationManager.isSynchronizationActive()、TransactionSynchronizationManager.registerSynchronization()和new TransactionSynchronizationAdapter(),都与它有关。

     

    先看isSynchronizationActive()方法。它的代码实现非常简单,仅仅是判断了synchronizations中是否有数据(Set<TransactionSynchronization>非null即可,并不要求其中有TransactionSynchronization实例)。之所以可以这样判断,是因为Spring在开启数据库事务(无论是使用@Transactional注解,还是用xml配置)时,都会向其中写入一个实例,用于自动处理Connection的获取、提交或回滚等操作。这个方法的代码如下:

    isSynchronizationActive() 

    /**

     * Return if transaction synchronization is active for the current thread.

     * Can be called before register to avoid unnecessary instance creation.

     * @see #registerSynchronization

     */

    public static boolean isSynchronizationActive() {

        return (synchronizations.get() != null);

    }

     

    再看registerSynchronization()方法。它其实也非常简单:首先调用isSynchronizationActive()做一个校验;然后将入参synchronization添加到synchronizations 中。入参synchronization中的方法不会在这里执行,而是要等到事务执行到一定阶段时才会被调用。这个方法的代码如下:

    registerSynchronization() 

    /**

     * Register a new transaction synchronization for the current thread.

     * Typically called by resource management code.

     * <p>Note that synchronizations can implement the

     * {@link org.springframework.core.Ordered} interface.

     * They will be executed in an order according to their order value (if any).

     * @param synchronization the synchronization object to register

     * @throws IllegalStateException if transaction synchronization is not active

     * @see org.springframework.core.Ordered

     */

    public static void registerSynchronization(TransactionSynchronization synchronization)

            throws IllegalStateException {

        Assert.notNull(synchronization, "TransactionSynchronization must not be null");

        if (!isSynchronizationActive()) {

            throw new IllegalStateException("Transaction synchronization is not active");

        }

        synchronizations.get().add(synchronization);

    }

     

    比较复杂的是TransactionSynchronizationAdapter类。在进入这个类之前,我们得先看看TransactionSynchronization接口。

    TransactionSynchronization接口定义了一系列的回调方法,对应一个事务执行的不同阶段:挂起、恢复、flush、提交(前、后)、完成(事务成功或失败)等。当事务运行到对应阶段时,事务管理器会从TransactionSynchronizationManager维护的synchronizations中拿出所有的回调器,逐个回调其中的对应方法。这个接口的代码如下:

    TransactionSynchronization 

    /**

     * Interface for transaction synchronization callbacks.

     * Supported by AbstractPlatformTransactionManager.

     *

     * <p>TransactionSynchronization implementations can implement the Ordered interface

     * to influence their execution order. A synchronization that does not implement the

     * Ordered interface is appended to the end of the synchronization chain.

     *

     * <p>System synchronizations performed by Spring itself use specific order values,

     * allowing for fine-grained interaction with their execution order (if necessary).

     *

     * @author Juergen Hoeller

     * @since 02.06.2003

     * @see TransactionSynchronizationManager

     * @see AbstractPlatformTransactionManager

     * @see org.springframework.jdbc.datasource.DataSourceUtils#CONNECTION_SYNCHRONIZATION_ORDER

     */

    public interface TransactionSynchronization extends Flushable {

        /** Completion status in case of proper commit */

        int STATUS_COMMITTED = 0;

        /** Completion status in case of proper rollback */

        int STATUS_ROLLED_BACK = 1;

        /** Completion status in case of heuristic mixed completion or system errors */

        int STATUS_UNKNOWN = 2;

     

        /**

         * Suspend this synchronization.

         * Supposed to unbind resources from TransactionSynchronizationManager if managing any.

         * @see TransactionSynchronizationManager#unbindResource

         */

        void suspend();

        /**

         * Resume this synchronization.

         * Supposed to rebind resources to TransactionSynchronizationManager if managing any.

         * @see TransactionSynchronizationManager#bindResource

         */

        void resume();

        /**

         * Flush the underlying session to the datastore, if applicable:

         * for example, a Hibernate/JPA session.

         * @see org.springframework.transaction.TransactionStatus#flush()

         */

        @Override

        void flush();

        /**

         * Invoked before transaction commit (before "beforeCompletion").

         * Can e.g. flush transactional O/R Mapping sessions to the database.

         * <p>This callback does <i>not</i> mean that the transaction will actually be committed.

         * A rollback decision can still occur after this method has been called. This callback

         * is rather meant to perform work that‘s only relevant if a commit still has a chance

         * to happen, such as flushing SQL statements to the database.

         * <p>Note that exceptions will get propagated to the commit caller and cause a

         * rollback of the transaction.

         * @param readOnly whether the transaction is defined as read-only transaction

         * @throws RuntimeException in case of errors; will be <b>propagated to the caller</b>

         * (note: do not throw TransactionException subclasses here!)

         * @see #beforeCompletion

         */

        void beforeCommit(boolean readOnly);

        /**

         * Invoked before transaction commit/rollback.

         * Can perform resource cleanup <i>before</i> transaction completion.

         * <p>This method will be invoked after {@code beforeCommit}, even when

         * {@code beforeCommit} threw an exception. This callback allows for

         * closing resources before transaction completion, for any outcome.

         * @throws RuntimeException in case of errors; will be <b>logged but not propagated</b>

         * (note: do not throw TransactionException subclasses here!)

         * @see #beforeCommit

         * @see #afterCompletion

         */

        void beforeCompletion();

        /**

         * Invoked after transaction commit. Can perform further operations right

         * <i>after</i> the main transaction has <i>successfully</i> committed.

         * <p>Can e.g. commit further operations that are supposed to follow on a successful

         * commit of the main transaction, like confirmation messages or emails.

         * <p><b>NOTE:</b> The transaction will have been committed already, but the

         * transactional resources might still be active and accessible. As a consequence,

         * any data access code triggered at this point will still "participate" in the

         * original transaction, allowing to perform some cleanup (with no commit following

         * anymore!), unless it explicitly declares that it needs to run in a separate

         * transaction. Hence: <b>Use {@code PROPAGATION_REQUIRES_NEW} for any

         * transactional operation that is called from here.</b>

         * @throws RuntimeException in case of errors; will be <b>propagated to the caller</b>

         * (note: do not throw TransactionException subclasses here!)

         */

        void afterCommit();

        /**

         * Invoked after transaction commit/rollback.

         * Can perform resource cleanup <i>after</i> transaction completion.

         * <p><b>NOTE:</b> The transaction will have been committed or rolled back already,

         * but the transactional resources might still be active and accessible. As a

         * consequence, any data access code triggered at this point will still "participate"

         * in the original transaction, allowing to perform some cleanup (with no commit

         * following anymore!), unless it explicitly declares that it needs to run in a

         * separate transaction. Hence: <b>Use {@code PROPAGATION_REQUIRES_NEW}

         * for any transactional operation that is called from here.</b>

         

    人气教程排行