How JPA batch operations are implemented


Preface

This article examines how JPA batch operations are implemented by tracing the Hibernate 5.0.12 source code, from persist() through flush() down to the JDBC batch calls.

Save method

SessionImpl.persist

hibernate-core-5.0.12.Final-sources.jar!/org/hibernate/internal/SessionImpl.java

    @Override
    public void persist(String entityName, Object object) throws HibernateException {
        firePersist( new PersistEvent( entityName, object, this ) );
    }

    private void firePersist(PersistEvent event) {
        errorIfClosed();
        checkTransactionSynchStatus();
        checkNoUnresolvedActionsBeforeOperation();
        for ( PersistEventListener listener : listeners( EventType.PERSIST ) ) {
            listener.onPersist( event );
        }
        checkNoUnresolvedActionsAfterOperation();
    }

This fires the persist event, which is passed to every registered PersistEventListener.

DefaultPersistEventListener.onPersist

hibernate-core-5.0.12.Final-sources.jar!/org/hibernate/event/internal/DefaultPersistEventListener.java

    /**
     * Handle the given create event.
     *
     * @param event The create event to be handled.
     *
     * @throws HibernateException
     */
    public void onPersist(PersistEvent event) throws HibernateException {
        onPersist( event, new IdentityHashMap( 10 ) );
    }

    /**
     * Handle the given create event.
     *
     * @param event The create event to be handled.
     *
     * @throws HibernateException
     */
    public void onPersist(PersistEvent event, Map createCache) throws HibernateException {
        final SessionImplementor source = event.getSession();
        final Object object = event.getObject();

        final Object entity;
        if ( object instanceof HibernateProxy ) {
            LazyInitializer li = ( (HibernateProxy) object ).getHibernateLazyInitializer();
            if ( li.isUninitialized() ) {
                if ( li.getSession() == source ) {
                    return; //NOTE EARLY EXIT!
                }
                else {
                    throw new PersistentObjectException( "uninitialized proxy passed to persist()" );
                }
            }
            entity = li.getImplementation();
        }
        else {
            entity = object;
        }

        final String entityName;
        if ( event.getEntityName() != null ) {
            entityName = event.getEntityName();
        }
        else {
            entityName = source.bestGuessEntityName( entity );
            event.setEntityName( entityName );
        }

        final EntityEntry entityEntry = source.getPersistenceContext().getEntry( entity );
        EntityState entityState = getEntityState( entity, entityName, entityEntry, source );
        if ( entityState == EntityState.DETACHED ) {
            // JPA 2, in its version of a "foreign generated", allows the id attribute value
            // to be manually set by the user, even though this manual value is irrelevant.
            // The issue is that this causes problems with the Hibernate unsaved-value strategy
            // which comes into play here in determining detached/transient state.
            //
            // Detect if we have this situation and if so null out the id value and calculate the
            // entity state again.

            // NOTE: entityEntry must be null to get here, so we cannot use any of its values
            EntityPersister persister = source.getFactory().getEntityPersister( entityName );
            if ( ForeignGenerator.class.isInstance( persister.getIdentifierGenerator() ) ) {
                if ( LOG.isDebugEnabled() && persister.getIdentifier( entity, source ) != null ) {
                    LOG.debug( "Resetting entity id attribute to null for foreign generator" );
                }
                persister.setIdentifier( entity, null, source );
                entityState = getEntityState( entity, entityName, entityEntry, source );
            }
        }

        switch ( entityState ) {
            case DETACHED: {
                throw new PersistentObjectException(
                        "detached entity passed to persist: " +
                                getLoggableName( event.getEntityName(), entity )
                );
            }
            case PERSISTENT: {
                entityIsPersistent( event, createCache );
                break;
            }
            case TRANSIENT: {
                entityIsTransient( event, createCache );
                break;
            }
            case DELETED: {
                entityEntry.setStatus( Status.MANAGED );
                entityEntry.setDeletedState( null );
                event.getSession().getActionQueue().unScheduleDeletion( entityEntry, event.getObject() );
                entityIsDeleted( event, createCache );
                break;
            }
            default: {
                throw new ObjectDeletedException(
                        "deleted entity passed to persist",
                        null,
                        getLoggableName( event.getEntityName(), entity )
                );
            }
        }

    }
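The switch at the end of onPersist can be summarized with a small sketch. This is a simplified model, not Hibernate code: the class name, the enum and the returned strings are made up for illustration, and only the branching itself is taken from the listing above.

```java
// Hypothetical, simplified model of the state switch in onPersist.
final class PersistStateSketch {

    enum EntityState { TRANSIENT, PERSISTENT, DETACHED, DELETED }

    // Returns a short description of what persist() does for each entity state.
    static String onPersist(EntityState state) {
        switch (state) {
            case DETACHED:
                // Mirrors "detached entity passed to persist".
                throw new IllegalStateException("detached entity passed to persist");
            case PERSISTENT:
                return "already managed, no new insert";
            case TRANSIENT:
                // The path followed in the rest of this article.
                return "schedule insert";
            case DELETED:
                return "unschedule deletion, manage again";
            default:
                throw new AssertionError(state);
        }
    }
}
```

Only the TRANSIENT branch leads to an insert action, which is why the walkthrough continues with entityIsTransient.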

For a new (transient) entity, the entityIsTransient method is called.

entityIsTransient

    /**
     * Handle the given create event.
     *
     * @param event The save event to be handled.
     * @param createCache The copy cache of entity instance to merge/copy instance.
     */
    @SuppressWarnings({"unchecked"})
    protected void entityIsTransient(PersistEvent event, Map createCache) {
        LOG.trace( "Saving transient instance" );

        final EventSource source = event.getSession();
        final Object entity = source.getPersistenceContext().unproxy( event.getObject() );

        if ( createCache.put( entity, entity ) == null ) {
            saveWithGeneratedId( entity, event.getEntityName(), createCache, source, false );
        }
    }

AbstractSaveEventListener.saveWithGeneratedId

hibernate-core-5.0.12.Final-sources.jar!/org/hibernate/event/internal/AbstractSaveEventListener.java

    /**
     * Prepares the save call using a newly generated id.
     *
     * @param entity The entity to be saved
     * @param entityName The entity-name for the entity to be saved
     * @param anything Generally cascade-specific information.
     * @param source The session which is the source of this save event.
     * @param requiresImmediateIdAccess does the event context require
     * access to the identifier immediately after execution of this method (if
     * not, post-insert style id generators may be postponed if we are outside
     * a transaction).
     *
     * @return The id used to save the entity; may be null depending on the
     *         type of id generator used and the requiresImmediateIdAccess value
     */
    protected Serializable saveWithGeneratedId(
            Object entity,
            String entityName,
            Object anything,
            EventSource source,
            boolean requiresImmediateIdAccess) {
        EntityPersister persister = source.getEntityPersister( entityName, entity );
        Serializable generatedId = persister.getIdentifierGenerator().generate( source, entity );
        if ( generatedId == null ) {
            throw new IdentifierGenerationException( "null id generated for:" + entity.getClass() );
        }
        else if ( generatedId == IdentifierGeneratorHelper.SHORT_CIRCUIT_INDICATOR ) {
            return source.getIdentifier( entity );
        }
        else if ( generatedId == IdentifierGeneratorHelper.POST_INSERT_INDICATOR ) {
            return performSave( entity, null, persister, true, anything, source, requiresImmediateIdAccess );
        }
        else {
            // TODO: define toString()s for generators
            if ( LOG.isDebugEnabled() ) {
                LOG.debugf(
                        "Generated identifier: %s, using strategy: %s",
                        persister.getIdentifierType().toLoggableString( generatedId, source.getFactory() ),
                        persister.getIdentifierGenerator().getClass().getName()
                );
            }

            return performSave( entity, generatedId, persister, false, anything, source, true );
        }
    }
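The branching in saveWithGeneratedId relies on sentinel objects that are compared by identity. The sketch below models that idea only. The class, the constants and the returned strings are hypothetical stand-ins for IdentifierGeneratorHelper.SHORT_CIRCUIT_INDICATOR and POST_INSERT_INDICATOR, not the real objects.

```java
// Hypothetical, simplified model of the sentinel-based branching in saveWithGeneratedId.
final class IdDispatchSketch {

    // Sentinels are compared with ==, so each one is a unique object.
    static final Object SHORT_CIRCUIT = new Object();
    static final Object POST_INSERT = new Object();

    // Describes which path a generated id value selects.
    static String dispatch(Object generatedId) {
        if (generatedId == null) {
            throw new IllegalStateException("null id generated");
        }
        if (generatedId == SHORT_CIRCUIT) {
            return "nothing to save";
        }
        if (generatedId == POST_INSERT) {
            // The database assigns the id, so useIdentityColumn is true.
            return "performSave with identity column";
        }
        // The id is already known, so useIdentityColumn is false.
        return "performSave with id " + generatedId;
    }
}
```

The distinction matters later: only the last branch produces an EntityInsertAction whose id is known up front, and that is the path that ends in a JDBC batch.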

In both cases, performSave ultimately delegates to performSaveOrReplicate.

performSaveOrReplicate

    /**
     * Performs all the actual work needed to save an entity (well to get the save moved to
     * the execution queue).
     *
     * @param entity The entity to be saved
     * @param key The id to be used for saving the entity (or null, in the case of identity columns)
     * @param persister The entity's persister instance.
     * @param useIdentityColumn Should an identity column be used for id generation?
     * @param anything Generally cascade-specific information.
     * @param source The session which is the source of the current event.
     * @param requiresImmediateIdAccess Is access to the identifier required immediately
     * after the completion of the save?  persist(), for example, does not require this...
     *
     * @return The id used to save the entity; may be null depending on the
     *         type of id generator used and the requiresImmediateIdAccess value
     */
    protected Serializable performSaveOrReplicate(
            Object entity,
            EntityKey key,
            EntityPersister persister,
            boolean useIdentityColumn,
            Object anything,
            EventSource source,
            boolean requiresImmediateIdAccess) {

        Serializable id = key == null ? null : key.getIdentifier();

        boolean inTxn = source.isTransactionInProgress();
        boolean shouldDelayIdentityInserts = !inTxn && !requiresImmediateIdAccess;

        // Put a placeholder in entries, so we don't recurse back and try to save() the
        // same object again. QUESTION: should this be done before onSave() is called?
        // likewise, should it be done before onUpdate()?
        EntityEntry original = source.getPersistenceContext().addEntry(
                entity,
                Status.SAVING,
                null,
                null,
                id,
                null,
                LockMode.WRITE,
                useIdentityColumn,
                persister,
                false,
                false
        );

        cascadeBeforeSave( source, persister, entity, anything );

        Object[] values = persister.getPropertyValuesToInsert( entity, getMergeMap( anything ), source );
        Type[] types = persister.getPropertyTypes();

        boolean substitute = substituteValuesIfNecessary( entity, id, values, persister, source );

        if ( persister.hasCollections() ) {
            substitute = substitute || visitCollectionsBeforeSave( entity, id, values, types, source );
        }

        if ( substitute ) {
            persister.setPropertyValues( entity, values );
        }

        TypeHelper.deepCopy(
                values,
                types,
                persister.getPropertyUpdateability(),
                values,
                source
        );

        AbstractEntityInsertAction insert = addInsertAction(
                values, id, entity, persister, useIdentityColumn, source, shouldDelayIdentityInserts
        );

        // postpone initializing id in case the insert has non-nullable transient dependencies
        // that are not resolved until cascadeAfterSave() is executed
        cascadeAfterSave( source, persister, entity, anything );
        if ( useIdentityColumn && insert.isEarlyInsert() ) {
            if ( !EntityIdentityInsertAction.class.isInstance( insert ) ) {
                throw new IllegalStateException(
                        "Insert should be using an identity column, but action is of unexpected type: " +
                                insert.getClass().getName()
                );
            }
            id = ((EntityIdentityInsertAction) insert).getGeneratedId();

            insert.handleNaturalIdPostSaveNotifications( id );
        }

        markInterceptorDirty( entity, persister, source );

        EntityEntry newEntry = source.getPersistenceContext().getEntry( entity );

        if ( newEntry != original ) {
            EntityEntryExtraState extraState = newEntry.getExtraState( EntityEntryExtraState.class );
            if ( extraState == null ) {
                newEntry.addExtraState( original.getExtraState( EntityEntryExtraState.class ) );
            }
        }

        return id;
    }

addInsertAction is called here.

addInsertAction

    private AbstractEntityInsertAction addInsertAction(
            Object[] values,
            Serializable id,
            Object entity,
            EntityPersister persister,
            boolean useIdentityColumn,
            EventSource source,
            boolean shouldDelayIdentityInserts) {
        if ( useIdentityColumn ) {
            EntityIdentityInsertAction insert = new EntityIdentityInsertAction(
                    values, entity, persister, isVersionIncrementDisabled(), source, shouldDelayIdentityInserts
            );
            source.getActionQueue().addAction( insert );
            return insert;
        }
        else {
            Object version = Versioning.getVersion( values, persister );
            EntityInsertAction insert = new EntityInsertAction(
                    id, values, entity, version, persister, isVersionIncrementDisabled(), source
            );
            source.getActionQueue().addAction( insert );
            return insert;
        }
    }

ActionQueue.addAction

hibernate-core-5.0.12.Final-sources.jar!/org/hibernate/engine/spi/ActionQueue.java

    /**
     * Adds an entity insert action
     *
     * @param action The action representing the entity insertion
     */
    public void addAction(EntityInsertAction action) {
        LOG.tracev( "Adding an EntityInsertAction for [{0}] object", action.getEntityName() );
        addInsertAction( action );
    }

    private void addInsertAction(AbstractEntityInsertAction insert) {
        if ( insert.isEarlyInsert() ) {
            // For early inserts, must execute inserts before finding non-nullable transient entities.
            // TODO: find out why this is necessary
            LOG.tracev( "Executing inserts before finding non-nullable transient entities for early insert: [{0}]", insert );
            executeInserts();
        }
        NonNullableTransientDependencies nonNullableTransientDependencies = insert.findNonNullableTransientEntities();
        if ( nonNullableTransientDependencies == null ) {
            LOG.tracev( "Adding insert with no non-nullable, transient entities: [{0}]", insert );
            addResolvedEntityInsertAction( insert );
        }
        else {
            if ( LOG.isTraceEnabled() ) {
                LOG.tracev( "Adding insert with non-nullable, transient entities; insert=[{0}], dependencies=[{1}]", insert,
                            nonNullableTransientDependencies.toLoggableString( insert.getSession() ) );
            }
            if( unresolvedInsertions == null ) {
                unresolvedInsertions = new UnresolvedEntityInsertActions();
            }
            unresolvedInsertions.addUnresolvedEntityInsertAction( insert, nonNullableTransientDependencies );
        }
    }

    private void addResolvedEntityInsertAction(AbstractEntityInsertAction insert) {
        if ( insert.isEarlyInsert() ) {
            LOG.trace( "Executing insertions before resolved early-insert" );
            executeInserts();
            LOG.debug( "Executing identity-insert immediately" );
            execute( insert );
        }
        else {
            LOG.trace( "Adding resolved non-early insert action." );
            addAction( AbstractEntityInsertAction.class, insert );
        }
        insert.makeEntityManaged();
        if( unresolvedInsertions != null ) {
            for (AbstractEntityInsertAction resolvedAction : unresolvedInsertions.resolveDependentActions(insert.getInstance(), session)) {
                addResolvedEntityInsertAction(resolvedAction);
            }
        }
    }

    @SuppressWarnings("unchecked")
    private <T extends Executable & Comparable & Serializable> void addAction(Class<T> executableClass, T action) {
        EXECUTABLE_LISTS_MAP.get( executableClass ).getOrInit( this ).add( action );
    }

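Here EXECUTABLE_LISTS_MAP.get( executableClass ) is EXECUTABLE_LISTS_MAP.get( AbstractEntityInsertAction.class ), which returns a ListProvider. Its getOrInit( this ) then returns the ActionQueue's ExecutableList of insert actions, creating the list on first use.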

    private static abstract class ListProvider<T extends Executable & Comparable & Serializable> {
        abstract ExecutableList<T> get(ActionQueue instance);
        abstract ExecutableList<T> init(ActionQueue instance);
        ExecutableList<T> getOrInit( ActionQueue instance ) {
            ExecutableList<T> list = get( instance );
            if ( list == null ) {
                list = init( instance );
            }
            return list;
        }
    }

The insert action is then added to that ExecutableList.
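The queueing behaviour of addResolvedEntityInsertAction and getOrInit can be sketched as follows. This is a minimal model under stated assumptions: the class and method names are hypothetical, actions are plain strings, and "executing" an action just appends it to a list.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of how ActionQueue treats early and regular inserts.
final class LazyQueueSketch {

    private List<String> insertions;                 // stays null until first use
    final List<String> executed = new ArrayList<>(); // actions already run, in order

    // Same idea as ListProvider.getOrInit: create the list only when needed.
    private List<String> getOrInit() {
        if (insertions == null) {
            insertions = new ArrayList<>();
        }
        return insertions;
    }

    void addInsert(String action, boolean earlyInsert) {
        if (earlyInsert) {
            executeInserts();      // run everything queued so far
            executed.add(action);  // then run the early insert immediately
        } else {
            getOrInit().add(action); // wait for flush
        }
    }

    void executeInserts() {
        if (insertions != null) {
            executed.addAll(insertions);
            insertions.clear();
        }
    }

    int queuedCount() {
        return insertions == null ? 0 : insertions.size();
    }
}
```

In this model an early insert never waits in the queue, which matches the "Executing identity-insert immediately" branch above. Only the regular inserts are still pending when flush is called.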

Flush method

SessionImpl.flush

hibernate-core-5.0.12.Final-sources.jar!/org/hibernate/internal/SessionImpl.java

    @Override
    public void flush() throws HibernateException {
        errorIfClosed();
        checkTransactionSynchStatus();
        if ( persistenceContext.getCascadeLevel() > 0 ) {
            throw new HibernateException( "Flush during cascade is dangerous" );
        }
        FlushEvent flushEvent = new FlushEvent( this );
        for ( FlushEventListener listener : listeners( EventType.FLUSH ) ) {
            listener.onFlush( flushEvent );
        }
        delayedAfterCompletion();
    }

DefaultFlushEventListener.onFlush

hibernate-core-5.0.12.Final-sources.jar!/org/hibernate/event/internal/DefaultFlushEventListener.java

    /** Handle the given flush event.
     *
     * @param event The flush event to be handled.
     * @throws HibernateException
     */
    public void onFlush(FlushEvent event) throws HibernateException {
        final EventSource source = event.getSession();
        final PersistenceContext persistenceContext = source.getPersistenceContext();

        if ( persistenceContext.getNumberOfManagedEntities() > 0 ||
                persistenceContext.getCollectionEntries().size() > 0 ) {

            try {
                source.getEventListenerManager().flushStart();

                flushEverythingToExecutions( event );
                performExecutions( source );
                postFlush( source );
            }
            finally {
                source.getEventListenerManager().flushEnd(
                        event.getNumberOfEntitiesProcessed(),
                        event.getNumberOfCollectionsProcessed()
                );
            }

            postPostFlush( source );

            if ( source.getFactory().getStatistics().isStatisticsEnabled() ) {
                source.getFactory().getStatisticsImplementor().flush();
            }
        }
    }

performExecutions is called here.

AbstractFlushingEventListener.performExecutions

hibernate-core-5.0.12.Final-sources.jar!/org/hibernate/event/internal/AbstractFlushingEventListener.java

    /**
     * Execute all SQL (and second-level cache updates) in a special order so that foreign-key constraints cannot
     * be violated: <ol>
     * <li> Inserts, in the order they were performed
     * <li> Updates
     * <li> Deletion of collection elements
     * <li> Insertion of collection elements
     * <li> Deletes, in the order they were performed
     * </ol>
     *
     * @param session The session being flushed
     */
    protected void performExecutions(EventSource session) {
        LOG.trace( "Executing flush" );

        // IMPL NOTE : here we alter the flushing flag of the persistence context to allow
        //        during-flush callbacks more leniency in regards to initializing proxies and
        //        lazy collections during their processing.
        // For more information, see HHH-2763
        try {
            session.getJdbcCoordinator().flushBeginning();
            session.getPersistenceContext().setFlushing( true );
            // we need to lock the collection caches before executing entity inserts/updates in order to
            // account for bi-directional associations
            session.getActionQueue().prepareActions();
            session.getActionQueue().executeActions();
        }
        finally {
            session.getPersistenceContext().setFlushing( false );
            session.getJdbcCoordinator().flushEnding();
        }
    }
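The fixed execution order described in the Javadoc above can be modelled with an insertion-ordered map of queues. This is only a sketch: the class name and the string keys are hypothetical, and the five categories are taken from the Javadoc list, not from the real set of action classes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of ordered action execution at flush time.
final class FlushOrderSketch {

    // LinkedHashMap keeps insertion order, so iteration order is execution order.
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    FlushOrderSketch() {
        String[] kinds = {"insert", "update", "collection-remove", "collection-insert", "delete"};
        for (String kind : kinds) {
            queues.put(kind, new ArrayList<>());
        }
    }

    void enqueue(String kind, String action) {
        List<String> queue = queues.get(kind);
        if (queue == null) {
            throw new IllegalArgumentException("unknown action kind: " + kind);
        }
        queue.add(action);
    }

    // Runs every queue in the fixed order and returns the actions as executed.
    List<String> flush() {
        List<String> executed = new ArrayList<>();
        for (List<String> queue : queues.values()) {
            executed.addAll(queue);
            queue.clear();
        }
        return executed;
    }
}
```

Whatever order the application called persist, update or remove in, the flush in this model always runs inserts first and deletes last.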

session.getActionQueue().executeActions() is called here.

ActionQueue.executeActions

hibernate-core-5.0.12.Final-sources.jar!/org/hibernate/engine/spi/ActionQueue.java

    /**
     * Perform all currently queued actions.
     * 
     * @throws HibernateException error executing queued actions.
     */
    public void executeActions() throws HibernateException {
        if ( hasUnresolvedEntityInsertActions() ) {
            throw new IllegalStateException( "About to execute actions, but there are unresolved entity insert actions." );
        }

        for ( ListProvider listProvider : EXECUTABLE_LISTS_MAP.values() ) {
            ExecutableList<?> l = listProvider.get( this );
            if ( l != null && !l.isEmpty() ) {
                executeActions( l );
            }
        }
    }

    /**
     * Perform {@link org.hibernate.action.spi.Executable#execute()} on each element of the list
     * 
     * @param list The list of Executable elements to be performed
     *
     * @throws HibernateException
     */
    private <E extends Executable & Comparable<?> & Serializable> void executeActions(ExecutableList<E> list) throws HibernateException {
        // todo : consider ways to improve the double iteration of Executables here:
        //        1) we explicitly iterate list here to perform Executable#execute()
        //        2) ExecutableList#getQuerySpaces also iterates the Executables to collect query spaces.
        try {
            for ( E e : list ) {
                try {
                    e.execute();
                }
                finally {
                    if( e.getBeforeTransactionCompletionProcess() != null ) {
                        if( beforeTransactionProcesses == null ) {
                            beforeTransactionProcesses = new BeforeTransactionCompletionProcessQueue( session );
                        }
                        beforeTransactionProcesses.register(e.getBeforeTransactionCompletionProcess());
                    }
                    if( e.getAfterTransactionCompletionProcess() != null ) {
                        if( afterTransactionProcesses == null ) {
                            afterTransactionProcesses = new AfterTransactionCompletionProcessQueue( session );
                        }
                        afterTransactionProcesses.register(e.getAfterTransactionCompletionProcess());
                    }
                }
            }
        }
        finally {
            if ( session.getFactory().getSessionFactoryOptions().isQueryCacheEnabled() ) {
                // Strictly speaking, only a subset of the list may have been processed if a RuntimeException occurs.
                // We still invalidate all spaces. I don't see this as a big deal - after all, RuntimeExceptions are
                // unexpected.
                Set<Serializable> propertySpaces = list.getQuerySpaces();
                invalidateSpaces( propertySpaces.toArray( new Serializable[propertySpaces.size()] ) );
            }
        }

        list.clear();
        session.getJdbcCoordinator().executeBatch();
    }

Here e.execute() is called inside the for loop, and after the loop and its finally block, session.getJdbcCoordinator().executeBatch() is called.
This matches the usual JDBC batching pattern of addBatch followed by executeBatch. We can predict that e.execute() performs the addBatch operations, that executeBatch() is called early whenever a full batch has accumulated, and that this final executeBatch() sends whatever remains.
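For comparison, here is the plain JDBC batching pattern referred to above. To keep the example runnable without a database, it uses a recording stub built with java.lang.reflect.Proxy in place of a real PreparedStatement. The class and helper names are hypothetical, and the stub only understands addBatch and executeBatch.

```java
import java.lang.reflect.Proxy;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class: plain JDBC addBatch/executeBatch against a recording stub.
final class JdbcBatchPatternSketch {

    // Builds a PreparedStatement stub that records how many rows each executeBatch() sends.
    static PreparedStatement recordingStatement(List<Integer> flushSizes) {
        int[] pending = {0};
        return (PreparedStatement) Proxy.newProxyInstance(
                PreparedStatement.class.getClassLoader(),
                new Class<?>[] {PreparedStatement.class},
                (proxy, method, args) -> {
                    if (method.getName().equals("addBatch")) {
                        pending[0]++;
                        return null;
                    }
                    if (method.getName().equals("executeBatch")) {
                        int[] rowCounts = new int[pending[0]];
                        Arrays.fill(rowCounts, 1);
                        flushSizes.add(pending[0]);
                        pending[0] = 0;
                        return rowCounts;
                    }
                    return null; // setString and other void calls are ignored
                });
    }

    // Adds every row to the batch, executing early whenever batchSize rows have accumulated.
    static void insertAll(PreparedStatement ps, List<String> names, int batchSize) throws SQLException {
        int pending = 0;
        for (String name : names) {
            ps.setString(1, name);
            ps.addBatch();
            if (++pending == batchSize) {
                ps.executeBatch();
                pending = 0;
            }
        }
        if (pending > 0) {
            ps.executeBatch(); // send the remainder
        }
    }

    // Runs the pattern for the given number of rows and returns the size of each executed batch.
    static List<Integer> run(int rows, int batchSize) {
        List<Integer> flushSizes = new ArrayList<>();
        List<String> names = new ArrayList<>();
        for (int i = 0; i < rows; i++) {
            names.add("user-" + i);
        }
        try {
            insertAll(recordingStatement(flushSizes), names, batchSize);
        } catch (SQLException e) {
            throw new IllegalStateException(e);
        }
        return flushSizes;
    }
}
```

With five rows and a batch size of two, run(5, 2) reports two full batches and one remainder batch. The rest of this article shows where Hibernate performs the same two steps.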

EntityInsertAction.execute

hibernate-core-5.0.12.Final-sources.jar!/org/hibernate/action/internal/EntityInsertAction.java

    @Override
    public void execute() throws HibernateException {
        nullifyTransientReferencesIfNotAlready();

        final EntityPersister persister = getPersister();
        final SessionImplementor session = getSession();
        final Object instance = getInstance();
        final Serializable id = getId();

        final boolean veto = preInsert();

        // Don't need to lock the cache here, since if someone
        // else inserted the same pk first, the insert would fail

        if ( !veto ) {
            
            persister.insert( id, getState(), instance, session );
            PersistenceContext persistenceContext = session.getPersistenceContext();
            final EntityEntry entry = persistenceContext.getEntry( instance );
            if ( entry == null ) {
                throw new AssertionFailure( "possible non-threadsafe access to session" );
            }
            
            entry.postInsert( getState() );
    
            if ( persister.hasInsertGeneratedProperties() ) {
                persister.processInsertGeneratedProperties( id, instance, getState(), session );
                if ( persister.isVersionPropertyGenerated() ) {
                    version = Versioning.getVersion( getState(), persister );
                }
                entry.postUpdate( instance, getState(), version );
            }

            persistenceContext.registerInsertedKey( persister, getId() );
        }

        final SessionFactoryImplementor factory = session.getFactory();

        if ( isCachePutEnabled( persister, session ) ) {
            final CacheEntry ce = persister.buildCacheEntry(
                    instance,
                    getState(),
                    version,
                    session
            );
            cacheEntry = persister.getCacheEntryStructure().structure( ce );
            final EntityRegionAccessStrategy cache = persister.getCacheAccessStrategy();
            final Object ck = cache.generateCacheKey( id, persister, factory, session.getTenantIdentifier() );

            final boolean put = cacheInsert( persister, ck );

            if ( put && factory.getStatistics().isStatisticsEnabled() ) {
                factory.getStatisticsImplementor().secondLevelCachePut( cache.getRegion().getName() );
            }
        }

        handleNaturalIdPostSaveNotifications( id );

        postInsert();

        if ( factory.getStatistics().isStatisticsEnabled() && !veto ) {
            factory.getStatisticsImplementor().insertEntity( getPersister().getEntityName() );
        }

        markExecuted();
    }

The persister's insert method is called here.

AbstractEntityPersister.insert

hibernate-core-5.0.12.Final-sources.jar!/org/hibernate/persister/entity/AbstractEntityPersister.java

    public void insert(Serializable id, Object[] fields, Object object, SessionImplementor session) {
        // apply any pre-insert in-memory value generation
        preInsertInMemoryValueGeneration( fields, object, session );

        final int span = getTableSpan();
        if ( entityMetamodel.isDynamicInsert() ) {
            // For the case of dynamic-insert="true", we need to generate the INSERT SQL
            boolean[] notNull = getPropertiesToInsert( fields );
            for ( int j = 0; j < span; j++ ) {
                insert( id, fields, notNull, j, generateInsertString( notNull, j ), object, session );
            }
        }
        else {
            // For the case of dynamic-insert="false", use the static SQL
            for ( int j = 0; j < span; j++ ) {
                insert( id, fields, getPropertyInsertability(), j, getSQLInsertStrings()[j], object, session );
            }
        }
    }

insert

    /**
     * Perform an SQL INSERT.
     * <p/>
     * This for is used for all non-root tables as well as the root table
     * in cases where the identifier value is known before the insert occurs.
     */
    protected void insert(
            final Serializable id,
            final Object[] fields,
            final boolean[] notNull,
            final int j,
            final String sql,
            final Object object,
            final SessionImplementor session) throws HibernateException {

        if ( isInverseTable( j ) ) {
            return;
        }

        //note: it is conceptually possible that a UserType could map null to
        //      a non-null value, so the following is arguable:
        if ( isNullableTable( j ) && isAllNull( fields, j ) ) {
            return;
        }

        if ( LOG.isTraceEnabled() ) {
            LOG.tracev( "Inserting entity: {0}", MessageHelper.infoString( this, id, getFactory() ) );
            if ( j == 0 && isVersioned() ) {
                LOG.tracev( "Version: {0}", Versioning.getVersion( fields, this ) );
            }
        }

        // TODO : shouldn't inserts be Expectations.NONE?
        final Expectation expectation = Expectations.appropriateExpectation( insertResultCheckStyles[j] );
        // we can't batch joined inserts, *especially* not if it is an identity insert;
        // nor can we batch statements where the expectation is based on an output param
        final boolean useBatch = j == 0 && expectation.canBeBatched();
        if ( useBatch && inserBatchKey == null ) {
            inserBatchKey = new BasicBatchKey(
                    getEntityName() + "#INSERT",
                    expectation
            );
        }
        final boolean callable = isInsertCallable( j );

        try {
            // Render the SQL query
            final PreparedStatement insert;
            if ( useBatch ) {
                insert = session
                        .getJdbcCoordinator()
                        .getBatch( inserBatchKey )
                        .getBatchStatement( sql, callable );
            }
            else {
                insert = session
                        .getJdbcCoordinator()
                        .getStatementPreparer()
                        .prepareStatement( sql, callable );
            }

            try {
                int index = 1;
                index += expectation.prepare( insert );

                // Write the values of fields onto the prepared statement - we MUST use the state at the time the
                // insert was issued (cos of foreign key constraints). Not necessarily the object's current state

                dehydrate( id, fields, null, notNull, propertyColumnInsertable, j, insert, session, index, false );

                if ( useBatch ) {
                    session.getJdbcCoordinator().getBatch( inserBatchKey ).addToBatch();
                }
                else {
                    expectation.verifyOutcome(
                            session.getJdbcCoordinator()
                                    .getResultSetReturn()
                                    .executeUpdate( insert ), insert, -1
                    );
                }

            }
            catch (SQLException e) {
                if ( useBatch ) {
                    session.getJdbcCoordinator().abortBatch();
                }
                throw e;
            }
            finally {
                if ( !useBatch ) {
                    session.getJdbcCoordinator().getResourceRegistry().release( insert );
                    session.getJdbcCoordinator().afterStatementExecution();
                }
            }
        }
        catch (SQLException e) {
            throw getFactory().getSQLExceptionHelper().convert(
                    e,
                    "could not insert: " + MessageHelper.infoString( this ),
                    sql
            );
        }

    }

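In this case useBatch is true, so session.getJdbcCoordinator().getBatch( inserBatchKey ).addToBatch() is called (the field really is spelled inserBatchKey in the Hibernate source).
The inserBatchKey here is the entity name followed by #INSERT, i.e. com.example.domain.demoser#INSERT.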

JdbcCoordinatorImpl.getBatch

hibernate-core-5.0.12.Final-sources.jar!/org/hibernate/engine/jdbc/internal/JdbcCoordinatorImpl.java

    @Override
    public Batch getBatch(BatchKey key) {
        if ( currentBatch != null ) {
            if ( currentBatch.getKey().equals( key ) ) {
                return currentBatch;
            }
            else {
                currentBatch.execute();
                currentBatch.release();
            }
        }
        currentBatch = batchBuilder().buildBatch( key, this );
        return currentBatch;
    }
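
One consequence of getBatch is that a change of batch key executes the pending batch before a new one is built. The following is a minimal simulation of that behaviour only, not Hibernate code: the class BatchKeySwitchSketch and its methods are hypothetical, the keys are plain strings, and the batch size limit is ignored.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the key check in JdbcCoordinatorImpl.getBatch.
class BatchKeySwitchSketch {
    private String currentKey;   // key of the batch being filled, or null
    private int pending;         // statements added to the current batch
    final List<String> executed = new ArrayList<>();

    // Adds one statement under the given key, executing the old batch on a key change.
    void add(String key) {
        if (currentKey != null && !currentKey.equals(key)) {
            flush();             // mirrors currentBatch.execute() and release()
        }
        currentKey = key;
        pending++;
    }

    // Records the pending batch as executed and resets the state.
    void flush() {
        if (currentKey != null && pending > 0) {
            executed.add(currentKey + " x" + pending);
        }
        currentKey = null;
        pending = 0;
    }

    static List<String> run(String... keys) {
        BatchKeySwitchSketch sketch = new BatchKeySwitchSketch();
        for (String key : keys) {
            sketch.add(key);
        }
        sketch.flush();
        return sketch.executed;
    }

    public static void main(String[] args) {
        // Interleaved keys: three executions.
        System.out.println(run("A#INSERT", "A#INSERT", "B#INSERT", "A#INSERT"));
        // Grouped keys: two executions.
        System.out.println(run("A#INSERT", "A#INSERT", "A#INSERT", "B#INSERT"));
    }
}
```

In this simulation, interleaving inserts of two entity types yields more and smaller batches than grouping them by type.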

BatchingBatch.addToBatch

hibernate-core-5.0.12.Final-sources.jar! /org/hibernate/engine/jdbc/batch/internal/BatchingBatch.java


    @Override
    public void addToBatch() {
        try {
            currentStatement.addBatch();
        }
        catch ( SQLException e ) {
            LOG.debugf( "SQLException escaped proxy", e );
            throw sqlExceptionHelper().convert( e, "could not perform addBatch", currentStatementSql );
        }
        statementPosition++;
        if ( statementPosition >= getKey().getBatchedStatementCount() ) {
            batchPosition++;
            if ( batchPosition == batchSize ) {
                notifyObserversImplicitExecution();
                performExecution();
                batchPosition = 0;
                batchExecuted = true;
            }
            statementPosition = 0;
        }
    }

Once batchPosition reaches batchSize, performExecution is called.
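
The counter logic of addToBatch can be isolated in a small sketch. This is a simplified model under stated assumptions, not the Hibernate class: AddToBatchCounterSketch is a hypothetical name, the JDBC calls are replaced by counters, and batchedStatementCount is assumed to be 1 in the usage below.

```java
// Hypothetical model of the counters in BatchingBatch.addToBatch; no JDBC involved.
class AddToBatchCounterSketch {
    private final int batchSize;
    private final int batchedStatementCount;
    private int statementPosition;
    private int batchPosition;
    private int implicitExecutions;

    AddToBatchCounterSketch(int batchSize, int batchedStatementCount) {
        this.batchSize = batchSize;
        this.batchedStatementCount = batchedStatementCount;
    }

    // Same branching as addToBatch above, with performExecution() replaced by a counter.
    void addToBatch() {
        statementPosition++;
        if (statementPosition >= batchedStatementCount) {
            batchPosition++;
            if (batchPosition == batchSize) {
                implicitExecutions++;
                batchPosition = 0;
            }
            statementPosition = 0;
        }
    }

    int implicitExecutions() { return implicitExecutions; }

    int pendingInBatch() { return batchPosition; }

    public static void main(String[] args) {
        AddToBatchCounterSketch batch = new AddToBatchCounterSketch(50, 1);
        for (int i = 0; i < 120; i++) {
            batch.addToBatch();
        }
        System.out.println("implicit executions: " + batch.implicitExecutions());
        System.out.println("still pending: " + batch.pendingInBatch());
    }
}
```

With a batch size of 50 and 120 additions, the model counts two implicit executions and leaves 20 entries pending; those are only sent when the batch is executed explicitly, for example when the batch key changes.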

performExecution

    private void performExecution() {
        LOG.debugf( "Executing batch size: %s", batchPosition );
        try {
            for ( Map.Entry<String,PreparedStatement> entry : getStatements().entrySet() ) {
                try {
                    final PreparedStatement statement = entry.getValue();
                    final int[] rowCounts;
                    try {
                        getJdbcCoordinator().getJdbcSessionOwner().getJdbcSessionContext().getObserver().jdbcExecuteBatchStart();
                        rowCounts = statement.executeBatch();
                    }
                    finally {
                        getJdbcCoordinator().getJdbcSessionOwner().getJdbcSessionContext().getObserver().jdbcExecuteBatchEnd();
                    }
                    checkRowCounts( rowCounts, statement );
                }
                catch ( SQLException e ) {
                    abortBatch();
                    throw sqlExceptionHelper().convert( e, "could not execute batch", entry.getKey() );
                }
            }
        }
        catch ( RuntimeException re ) {
            LOG.unableToExecuteBatch( re.getMessage() );
            throw re;
        }
        finally {
            batchPosition = 0;
        }
    }

As you can see, statement.executeBatch() is called here.

Summary

  • JPA’s save method first adds the entity to the action queue.
  • On flush, each insert action adds its statement to a batch, and the batch is executed once it reaches the batch size.
  • JPA’s batch operation is thus a wrapper around addBatch and executeBatch of the JDBC statement. For details, see ActionQueue.executeActions.

The underlying JDBC pattern is as follows:

    public void jdbcBatchOperationTemplate(List<Employee> data){
        String sql = "insert into employee (name, city, phone) values (?, ?, ?)";

        Connection conn = null;
        PreparedStatement pstmt = null;

        final int batchSize = 1000;
        int count = 0;

        try{
            conn = dataSource.getConnection();
            pstmt = conn.prepareStatement(sql);

            for (Employee item: data) {
                pstmt.setString(1,item.getName());
                pstmt.setString(2,item.getCity());
                pstmt.setString(3,item.getPhone());

                // add the row to the batch
                pstmt.addBatch();

                // execute in small batches to avoid OOM
                if(++count % batchSize == 0) {
                    pstmt.executeBatch();
                }
            }

            pstmt.executeBatch(); // execute the remaining rows

        }catch (SQLException e){
            e.printStackTrace();
        }finally {
            DbUtils.closeQuietly(pstmt);
            DbUtils.closeQuietly(conn);
        }
    }

The only difference is that JPA first puts all the entities into the action queue on save, and only on flush triggers addBatch and executeBatch calls like those above.
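
That split between save and flush can be sketched without any database. The following is an illustrative model only: DeferredInsertSketch is a hypothetical class, rows are plain strings, and the comments mark where addBatch and executeBatch would be called in real code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: save() only queues, flush() replays the queue in batches.
class DeferredInsertSketch {
    private final List<String> actionQueue = new ArrayList<>();
    private final int batchSize;
    final List<Integer> executedBatchSizes = new ArrayList<>();

    DeferredInsertSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    // Nothing is sent on save; the row is only queued.
    void save(String row) {
        actionQueue.add(row);
    }

    int queued() {
        return actionQueue.size();
    }

    // Replays the queue, recording the size of each batch that would be executed.
    void flush() {
        int inBatch = 0;
        for (int i = 0; i < actionQueue.size(); i++) {
            inBatch++;                            // stands in for addBatch()
            if (inBatch == batchSize) {
                executedBatchSizes.add(inBatch);  // stands in for executeBatch()
                inBatch = 0;
            }
        }
        if (inBatch > 0) {
            executedBatchSizes.add(inBatch);      // remaining rows
        }
        actionQueue.clear();
    }

    public static void main(String[] args) {
        DeferredInsertSketch session = new DeferredInsertSketch(2);
        for (int i = 0; i < 5; i++) {
            session.save("row-" + i);
        }
        System.out.println("queued before flush: " + session.queued());
        session.flush();
        System.out.println("batches executed on flush: " + session.executedBatchSizes);
    }
}
```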

With @GeneratedValue(strategy = GenerationType.AUTO), resolved here to a sequence, the database is called to obtain the id before each saved entity is added to the action queue. In other words, to insert 1,000 rows in a batch, save calls the database 1,000 times to obtain the ids before the entities are put into the action queue; the final flush then executes the 1,000 queued inserts in batches. This is equivalent to calling the template above with a data parameter of 1,000 Employee objects that already have ids. The statement issued for each id is:

    select
        nextval ('hibernate_sequence')
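
To put rough numbers on this, the following sketch counts database round trips under the behaviour described above. It is plain arithmetic with hypothetical names (RoundTripSketch and its methods), assuming one sequence call per entity and that all inserts share a single batch key; the batch size of 50 is an arbitrary example value.

```java
// Hypothetical arithmetic sketch; not measured against a real database.
class RoundTripSketch {
    // One sequence select per saved entity, as described above.
    static int sequenceCalls(int entities) {
        return entities;
    }

    // Number of executeBatch calls needed for the inserts, rounding up.
    static int batchExecutions(int entities, int batchSize) {
        return (entities + batchSize - 1) / batchSize;
    }

    public static void main(String[] args) {
        int entities = 1000;
        int batchSize = 50;
        System.out.println("sequence selects: " + sequenceCalls(entities));
        System.out.println("executeBatch calls: " + batchExecutions(entities, batchSize));
    }
}
```

Under these assumptions the id lookups, not the inserts, account for most of the round trips: 1,000 sequence selects against 20 executeBatch calls.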

doc