Skip to content

Commit

Permalink
HSEARCH-5076 WIP batch rewrite
Browse files Browse the repository at this point in the history
  • Loading branch information
marko-bekhta committed Oct 28, 2024
1 parent 3037821 commit a4f38cb
Show file tree
Hide file tree
Showing 32 changed files with 1,048 additions and 277 deletions.
1 change: 1 addition & 0 deletions build/jqassistant/rules/rules.xml
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@
WHEN 'hibernate-search-mapper-pojo-standalone' THEN 'StandalonePojo'
WHEN 'hibernate-search-mapper-orm' THEN 'HibernateOrm'
WHEN 'hibernate-search-mapper-orm-outbox-polling' THEN 'OutboxPolling'
WHEN 'hibernate-search-mapper-orm-jakarta-batch-core' THEN 'BatchCore'
WHEN 'hibernate-search-mapper-orm-jakarta-batch-jberet' THEN 'JBeret'
ELSE 'UNKNOWN-MODULE-SPECIFIC-KEYWORD-PLEASE-UPDATE-JQASSISTANT-RULES'
END
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.hibernate.engine.spi.SessionFactoryImplementor;
import org.hibernate.search.jakarta.batch.core.logging.impl.Log;
import org.hibernate.search.jakarta.batch.core.massindexing.MassIndexingJob;
import org.hibernate.search.jakarta.batch.core.massindexing.util.impl.EntityTypeDescriptor;
import org.hibernate.search.jakarta.batch.core.massindexing.util.impl.BatchCoreEntityTypeDescriptor;
import org.hibernate.search.mapper.orm.Search;
import org.hibernate.search.mapper.orm.loading.spi.HibernateOrmLoadingTypeContext;
import org.hibernate.search.mapper.orm.mapping.SearchMapping;
Expand Down Expand Up @@ -145,11 +145,11 @@ private static <T> List<T> find(Session session, Class<T> clazz, String key, Str
.fetchHits( 1000 );
}

public static EntityTypeDescriptor<?, ?> createEntityTypeDescriptor(EntityManagerFactory emf, Class<?> clazz) {
public static BatchCoreEntityTypeDescriptor<?, ?> createEntityTypeDescriptor(EntityManagerFactory emf, Class<?> clazz) {
SearchMapping mapping = Search.mapping( emf );
BatchMappingContext mappingContext = (BatchMappingContext) mapping;
HibernateOrmLoadingTypeContext<?> type = mappingContext.typeContextProvider()
.byEntityName().getOrFail( mapping.indexedEntity( clazz ).jpaName() );
return EntityTypeDescriptor.create( emf.unwrap( SessionFactoryImplementor.class ), type );
return BatchCoreEntityTypeDescriptor.create( emf.unwrap( SessionFactoryImplementor.class ), type );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

import jakarta.persistence.EntityManagerFactory;

import org.hibernate.search.jakarta.batch.core.massindexing.util.impl.EntityTypeDescriptor;
import org.hibernate.search.jakarta.batch.core.massindexing.util.impl.BatchCoreEntityTypeDescriptor;
import org.hibernate.search.mapper.orm.tenancy.spi.TenancyConfiguration;
import org.hibernate.search.mapper.pojo.massindexing.MassIndexingDefaultCleanOperation;

Expand All @@ -33,7 +33,7 @@ public class JobContextData {
* In the Jakarta Batch standard, only string values can be propagated using job properties, but class types are frequently
* used too. So this map has string keys to facilitate lookup for values extracted from job properties.
*/
private Map<String, EntityTypeDescriptor<?, ?>> entityTypeDescriptorMap;
private Map<String, BatchCoreEntityTypeDescriptor<?, ?>> entityTypeDescriptorMap;

private TenancyConfiguration tenancyConfiguration;
private MassIndexingDefaultCleanOperation massIndexingDefaultCleanOperation;
Expand All @@ -50,8 +50,8 @@ public void setEntityManagerFactory(EntityManagerFactory entityManagerFactory) {
this.entityManagerFactory = entityManagerFactory;
}

public void setEntityTypeDescriptors(Collection<EntityTypeDescriptor<?, ?>> descriptors) {
for ( EntityTypeDescriptor<?, ?> descriptor : descriptors ) {
public void setEntityTypeDescriptors(Collection<BatchCoreEntityTypeDescriptor<?, ?>> descriptors) {
for ( BatchCoreEntityTypeDescriptor<?, ?> descriptor : descriptors ) {
entityTypeDescriptorMap.put( descriptor.jpaEntityName(), descriptor );
}
}
Expand All @@ -72,22 +72,22 @@ public void setMassIndexingDefaultCleanOperation(MassIndexingDefaultCleanOperati
this.massIndexingDefaultCleanOperation = massIndexingDefaultCleanOperation;
}

public EntityTypeDescriptor<?, ?> getEntityTypeDescriptor(String entityName) {
EntityTypeDescriptor<?, ?> descriptor = entityTypeDescriptorMap.get( entityName );
public BatchCoreEntityTypeDescriptor<?, ?> getEntityTypeDescriptor(String entityName) {
BatchCoreEntityTypeDescriptor<?, ?> descriptor = entityTypeDescriptorMap.get( entityName );
if ( descriptor == null ) {
String msg = String.format( Locale.ROOT, "entity type %s not found.", entityName );
throw new NoSuchElementException( msg );
}
return descriptor;
}

public List<EntityTypeDescriptor<?, ?>> getEntityTypeDescriptors() {
public List<BatchCoreEntityTypeDescriptor<?, ?>> getEntityTypeDescriptors() {
return new ArrayList<>( entityTypeDescriptorMap.values() );
}

public List<Class<?>> getEntityTypes() {
return entityTypeDescriptorMap.values().stream()
.map( EntityTypeDescriptor::javaClass )
.map( BatchCoreEntityTypeDescriptor::javaClass )
.collect( Collectors.toList() );
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright Red Hat Inc. and Hibernate Authors
*/
package org.hibernate.search.jakarta.batch.core.massindexing.loading.impl;

import java.util.List;

import jakarta.persistence.LockModeType;

import org.hibernate.FlushMode;
import org.hibernate.Session;
import org.hibernate.query.Query;
import org.hibernate.search.mapper.orm.loading.batch.HibernateOrmBatchEntityLoader;
import org.hibernate.search.mapper.orm.loading.batch.HibernateOrmBatchEntityLoadingOptions;
import org.hibernate.search.mapper.orm.loading.batch.HibernateOrmBatchEntitySink;
import org.hibernate.search.mapper.orm.loading.batch.HibernateOrmBatchLoadingTypeContext;

/**
 * Entity loader that fetches a batch of entities by their identifiers and hands them over to a sink.
 * <p>
 * A single HQL query of the form {@code select e from <entity> e where e.<uniqueProperty> in(:ids)}
 * is prepared once in the constructor and re-executed with a new identifier list on every
 * {@link #load(List)} call.
 *
 * @param <E> The type of loaded entities.
 */
public class BatchCoreDefaultHibernateOrmBatchEntityLoader<E> implements HibernateOrmBatchEntityLoader {
    private static final String ID_PARAMETER_NAME = "ids";

    private final HibernateOrmBatchEntitySink<E> sink;
    private final Query<E> query;

    /**
     * @param typeContext Provides the JPA entity name, the unique property name and the Java class
     * used to build the loading query.
     * @param sink Receives each list of loaded entities.
     * @param options Provides the {@link Session} the query is created on, as well as the cache mode
     * and the batch size (used as the query fetch size).
     */
    public BatchCoreDefaultHibernateOrmBatchEntityLoader(HibernateOrmBatchLoadingTypeContext<E> typeContext,
            HibernateOrmBatchEntitySink<E> sink, HibernateOrmBatchEntityLoadingOptions options) {
        this.sink = sink;

        String hql = "select e from " + typeContext.jpaEntityName()
                + " e where e." + typeContext.uniquePropertyName()
                + " in(:" + ID_PARAMETER_NAME + ")";

        // The query only reads: read-only results, no query cache, no locking, no automatic flush.
        this.query = options.context( Session.class ).createQuery( hql, typeContext.javaClass() )
                .setReadOnly( true )
                .setCacheable( false )
                .setLockMode( LockModeType.NONE )
                .setCacheMode( options.cacheMode() )
                .setHibernateFlushMode( FlushMode.MANUAL )
                .setFetchSize( options.batchSize() );
    }

    /**
     * Does nothing: this loader holds no resource of its own to release.
     */
    @Override
    public void close() {
    }

    /**
     * Runs the prepared query for the given identifiers and passes the resulting entities to the sink.
     *
     * @param identifiers The identifiers bound to the {@code ids} parameter of the query.
     */
    @Override
    public void load(List<Object> identifiers) {
        List<E> loadedEntities = query.setParameter( ID_PARAMETER_NAME, identifiers ).list();
        sink.accept( loadedEntities );
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright Red Hat Inc. and Hibernate Authors
*/
package org.hibernate.search.jakarta.batch.core.massindexing.loading.impl;

import java.util.HashSet;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.stream.Collectors;

import jakarta.persistence.LockModeType;

import org.hibernate.ScrollMode;
import org.hibernate.ScrollableResults;
import org.hibernate.StatelessSession;
import org.hibernate.query.Query;
import org.hibernate.search.jakarta.batch.core.massindexing.util.impl.IdOrder;
import org.hibernate.search.mapper.orm.loading.batch.HibernateOrmBatchIdentifierLoader;
import org.hibernate.search.mapper.orm.loading.batch.HibernateOrmBatchIdentifierLoadingOptions;
import org.hibernate.search.mapper.orm.loading.batch.HibernateOrmBatchLoadingTypeContext;
import org.hibernate.search.mapper.orm.loading.batch.HibernateOrmBatchReindexCondition;
import org.hibernate.search.util.common.AssertionFailure;
import org.hibernate.search.util.common.impl.Closer;

/**
 * Identifier loader that reads entity identifiers through HQL queries executed on a {@link StatelessSession}.
 * <p>
 * Identifiers are selected in the ascending order defined by the given {@link IdOrder}, optionally restricted
 * by the "reindex only" condition, a lower bound and an upper bound taken from the loading options.
 * When the options request exactly one result, the identifier is fetched with a single-result query;
 * otherwise the identifiers are read through a forward-only scroll.
 *
 * @param <E> The type of entities whose identifiers are loaded.
 */
public class BatchCoreDefaultHibernateOrmBatchIdentifierLoader<E> implements HibernateOrmBatchIdentifierLoader {

    private final StatelessSession session;
    private final String ormEntityName;
    private final String uniquePropertyName;
    private final IdOrder idOrder;
    private final HibernateOrmBatchIdentifierLoadingOptions options;
    private final IdLoader idLoader;

    /**
     * @param typeContext Provides the JPA entity name and the unique property name used in the queries.
     * @param options Provides the session, the conditions, the bounds, the offset, the max results
     * and the fetch size.
     * @param idOrder Builds the bound conditions and the {@code ORDER BY} expression.
     */
    public BatchCoreDefaultHibernateOrmBatchIdentifierLoader(HibernateOrmBatchLoadingTypeContext<E> typeContext,
            HibernateOrmBatchIdentifierLoadingOptions options, IdOrder idOrder) {
        this.session = options.context( StatelessSession.class );
        this.ormEntityName = typeContext.jpaEntityName();
        this.uniquePropertyName = typeContext.uniquePropertyName();
        this.idOrder = idOrder;
        this.options = options;
        // Must stay last: both IdLoader implementations build their query in a field initializer,
        // reading the fields assigned above.
        this.idLoader = options.maxResults().orElse( -1 ) == 1 ? new QuerySingleIdLoader() : new ScrollIdLoader();
    }

    /**
     * Closes the underlying identifier loader.
     */
    @Override
    public void close() {
        try ( Closer<RuntimeException> closer = new Closer<>() ) {
            // idLoader is final and always assigned by the constructor, so no null check is needed.
            closer.push( IdLoader::close, idLoader );
        }
    }

    /**
     * Counts the entities of the target type, restricted by the "reindex only" condition if there is one.
     * <p>
     * The lower/upper bounds, the offset and the max results of the options are not applied to this count.
     *
     * @return The result of the count query.
     */
    @Override
    public OptionalLong totalCount() {
        StringBuilder query = new StringBuilder();
        query.append( "select count(e) from " )
                .append( ormEntityName )
                .append( " e " );

        return OptionalLong.of( createQuery( session, query,
                options.reindexOnlyCondition().map( Set::of ).orElseGet( Set::of ), Long.class, Optional.empty() )
                .uniqueResult() );
    }

    @Override
    public Object next() {
        return idLoader.next();
    }

    @Override
    public boolean hasNext() {
        return idLoader.hasNext();
    }

    /**
     * Creates the query selecting the identifiers, with all conditions, ordering and paging options applied.
     *
     * @param session The session to create the query on.
     * @return A read-only, non-cacheable, non-locking query returning identifiers in ascending order.
     */
    private Query<Object> createQueryLoading(StatelessSession session) {
        StringBuilder query = new StringBuilder();
        query.append( "select e." )
                .append( uniquePropertyName )
                .append( " from " )
                .append( ormEntityName )
                .append( " e " );
        Set<HibernateOrmBatchReindexCondition> conditions = new HashSet<>();
        options.reindexOnlyCondition().ifPresent( conditions::add );
        options.lowerBound().ifPresent( b -> conditions
                .add( idOrder.idGreater( "HIBERNATE_SEARCH_ID_LOWER_BOUND_", b, options.lowerBoundInclusive() ) ) );
        options.upperBound().ifPresent( b -> conditions
                .add( idOrder.idLesser( "HIBERNATE_SEARCH_ID_UPPER_BOUND_", b, options.upperBoundInclusive() ) ) );

        Query<Object> select = createQuery( session, query, conditions, Object.class, Optional.of( idOrder.ascOrder() ) )
                .setFetchSize( options.fetchSize() )
                .setReadOnly( true )
                .setCacheable( false )
                .setLockMode( LockModeType.NONE );
        options.offset().ifPresent( select::setFirstResult );
        options.maxResults().ifPresent( select::setMaxResults );
        return select;
    }

    /**
     * Completes the given HQL with a {@code where} clause and an optional {@code ORDER BY} clause,
     * creates the query and binds the parameters of every condition.
     *
     * @param session The session to create the query on.
     * @param hql The beginning of the HQL query ({@code select ... from ...}); it is appended to by this method.
     * @param conditions The conditions, each wrapped in parentheses and joined with {@code AND}; may be empty.
     * @param returnedType The type of the query results.
     * @param order The {@code ORDER BY} expression, if any.
     * @param <T> The type of the query results.
     * @return The non-cacheable query, with all condition parameters set.
     */
    private <T> Query<T> createQuery(StatelessSession session,
            StringBuilder hql, Set<HibernateOrmBatchReindexCondition> conditions, Class<T> returnedType,
            Optional<String> order) {
        if ( !conditions.isEmpty() ) {
            hql.append( " where " );
            hql.append( conditions.stream()
                    .map( c -> "( " + c.conditionString() + " )" )
                    .collect( Collectors.joining( " AND ", " ", " " ) )
            );
        }
        order.ifPresent( o -> hql.append( " ORDER BY " ).append( o ) );
        Query<T> query = session.createQuery( hql.toString(), returnedType )
                .setCacheable( false );

        for ( var condition : conditions ) {
            for ( var entry : condition.params().entrySet() ) {
                query.setParameter( entry.getKey(), entry.getValue() );
            }
        }

        return query;
    }

    /**
     * Internal strategy for iterating over identifiers.
     */
    private interface IdLoader {
        Object next();

        boolean hasNext();

        void close();
    }

    /**
     * Loader used when exactly one result is requested: fetches at most one identifier
     * with a single-result query instead of opening a scroll.
     */
    private class QuerySingleIdLoader implements IdLoader {

        private boolean hasNextCalled = false;
        private boolean nextCalled = false;

        private Query<Object> query = createQueryLoading( session );
        private Object currentId;

        /**
         * @return The identifier fetched by the preceding {@link #hasNext()} call.
         * @throws AssertionFailure If {@link #hasNext()} was not called since the last call to this method.
         */
        @Override
        public Object next() {
            if ( hasNextCalled ) {
                nextCalled = true;
                hasNextCalled = false;
                return currentId;
            }
            else {
                throw new AssertionFailure( "Cannot call next() before calling hasNext()" );
            }
        }

        @Override
        public boolean hasNext() {
            if ( nextCalled ) {
                // we expect to have just a single ID, so if we called next and got the id we don't need to execute the query anymore:
                return false;
            }
            if ( !hasNextCalled ) {
                // Execute the query only once: repeated hasNext() calls before next() reuse the fetched result.
                currentId = query.getSingleResultOrNull();
                hasNextCalled = true;
            }
            return currentId != null;
        }

        @Override
        public void close() {
            query = null;
        }
    }

    /**
     * Loader used in all other cases: iterates over identifiers through a forward-only scroll,
     * opened as soon as the loader is created.
     */
    private class ScrollIdLoader implements IdLoader {
        private ScrollableResults<Object> results = createQueryLoading( session ).scroll( ScrollMode.FORWARD_ONLY );

        /**
         * @return The identifier at the current position of the scroll.
         */
        @Override
        public Object next() {
            return results.get();
        }

        /**
         * Advances the scroll by one position.
         *
         * @return {@code true} if the scroll moved to another result.
         */
        @Override
        public boolean hasNext() {
            return results.next();
        }

        @Override
        public void close() {
            try ( Closer<RuntimeException> closer = new Closer<>() ) {
                // The reference is cleared after closing, so calling close() again is a no-op.
                if ( results != null ) {
                    closer.push( ScrollableResults::close, results );
                    results = null;
                }
            }
        }
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright Red Hat Inc. and Hibernate Authors
*/
package org.hibernate.search.jakarta.batch.core.massindexing.loading.impl;

import org.hibernate.metamodel.mapping.EmbeddableMappingType;
import org.hibernate.metamodel.mapping.EntityIdentifierMapping;
import org.hibernate.search.jakarta.batch.core.massindexing.util.impl.CompositeIdOrder;
import org.hibernate.search.jakarta.batch.core.massindexing.util.impl.IdOrder;
import org.hibernate.search.jakarta.batch.core.massindexing.util.impl.SingularIdOrder;
import org.hibernate.search.mapper.orm.loading.batch.HibernateOrmBatchEntityLoader;
import org.hibernate.search.mapper.orm.loading.batch.HibernateOrmBatchEntityLoadingOptions;
import org.hibernate.search.mapper.orm.loading.batch.HibernateOrmBatchEntitySink;
import org.hibernate.search.mapper.orm.loading.batch.HibernateOrmBatchIdentifierLoader;
import org.hibernate.search.mapper.orm.loading.batch.HibernateOrmBatchIdentifierLoadingOptions;
import org.hibernate.search.mapper.orm.loading.batch.HibernateOrmBatchLoadingStrategy;
import org.hibernate.search.mapper.orm.loading.batch.HibernateOrmBatchLoadingTypeContext;
import org.hibernate.search.mapper.orm.loading.spi.HibernateOrmLoadingTypeContext;

/**
 * Loading strategy that creates the default identifier and entity loaders of this package.
 * <p>
 * The {@link IdOrder} handed to identifier loaders is picked once, at construction,
 * from the identifier mapping of the entity type.
 *
 * @param <E> The type of loaded entities.
 * @param <I> The type of entity identifiers.
 */
public class BatchCoreDefaultHibernateOrmBatchLoadingStrategy<E, I> implements HibernateOrmBatchLoadingStrategy<E, I> {

    private final IdOrder idOrder;

    /**
     * @param type The loading type context whose identifier mapping determines the {@link IdOrder} to use.
     */
    public BatchCoreDefaultHibernateOrmBatchLoadingStrategy(HibernateOrmLoadingTypeContext<E> type) {
        this.idOrder = resolveIdOrder( type );
    }

    /**
     * Picks a {@link CompositeIdOrder} when the identifier's part mapping type is an
     * {@link EmbeddableMappingType}, and a {@link SingularIdOrder} otherwise.
     */
    private static <T> IdOrder resolveIdOrder(HibernateOrmLoadingTypeContext<T> type) {
        EntityIdentifierMapping idMapping = type.entityMappingType().getIdentifierMapping();
        if ( idMapping.getPartMappingType() instanceof EmbeddableMappingType ) {
            return new CompositeIdOrder<>( type );
        }
        return new SingularIdOrder<>( type );
    }

    /**
     * @return A new identifier loader using the {@link IdOrder} selected at construction.
     */
    @Override
    public HibernateOrmBatchIdentifierLoader createIdentifierLoader(HibernateOrmBatchLoadingTypeContext<E> typeContext,
            HibernateOrmBatchIdentifierLoadingOptions options) {
        return new BatchCoreDefaultHibernateOrmBatchIdentifierLoader<>( typeContext, options, idOrder );
    }

    /**
     * @return A new entity loader passing the loaded entities to the given sink.
     */
    @Override
    public HibernateOrmBatchEntityLoader createEntityLoader(HibernateOrmBatchLoadingTypeContext<E> typeContext,
            HibernateOrmBatchEntitySink<E> sink, HibernateOrmBatchEntityLoadingOptions options) {
        return new BatchCoreDefaultHibernateOrmBatchEntityLoader<>( typeContext, sink, options );
    }
}
Loading

0 comments on commit a4f38cb

Please sign in to comment.