Skip to content

Commit

Permalink
HSEARCH-5076 WIP batch rewrite
Browse files Browse the repository at this point in the history
  • Loading branch information
marko-bekhta committed Jan 10, 2025
1 parent 549b469 commit 6b3dca3
Show file tree
Hide file tree
Showing 34 changed files with 1,051 additions and 279 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ public class MavenProjectUtils {
public static final String HIBERNATE_SEARCH_PARENT_PUBLIC = "hibernate-search-parent-public";
public static final String HIBERNATE_SEARCH_PARENT_PUBLIC_LUCENE_NEXT = "hibernate-search-parent-public-lucene-next";
public static final String HIBERNATE_SEARCH_PARENT_INTEGRATION_TEST = "hibernate-search-parent-integrationtest";
public static final String HIBERNATE_SEARCH_PARENT_INTEGRATION_TEST_LUCENE_NEXT = "hibernate-search-parent-integrationtest-lucene-next";
public static final String HIBERNATE_SEARCH_PARENT_INTEGRATION_TEST_LUCENE_NEXT =
"hibernate-search-parent-integrationtest-lucene-next";
public static final String HIBERNATE_SEARCH_PARENT_RELOCATION = "hibernate-search-parent-relocation";
public static final String DEPLOY_SKIP = "deploy.skip";

Expand All @@ -35,7 +36,7 @@ public static boolean isAnyParentRelocationParent(MavenProject project) {
public static boolean isAnyParentIntegrationTestParent(MavenProject project) {
return project.hasParent()
&& ( HIBERNATE_SEARCH_PARENT_INTEGRATION_TEST.equals( project.getParent().getArtifactId() )
|| HIBERNATE_SEARCH_PARENT_INTEGRATION_TEST_LUCENE_NEXT.equals( project.getParent().getArtifactId() )
|| HIBERNATE_SEARCH_PARENT_INTEGRATION_TEST_LUCENE_NEXT.equals( project.getParent().getArtifactId() )
|| isAnyParentIntegrationTestParent( project.getParent() ) );
}

Expand Down
1 change: 1 addition & 0 deletions build/jqassistant/rules/rules.xml
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@
WHEN 'hibernate-search-mapper-pojo-standalone' THEN 'StandalonePojo'
WHEN 'hibernate-search-mapper-orm' THEN 'HibernateOrm'
WHEN 'hibernate-search-mapper-orm-outbox-polling' THEN 'OutboxPolling'
WHEN 'hibernate-search-mapper-orm-jakarta-batch-core' THEN 'BatchCore'
WHEN 'hibernate-search-mapper-orm-jakarta-batch-jberet' THEN 'JBeret'
ELSE 'UNKNOWN-MODULE-SPECIFIC-KEYWORD-PLEASE-UPDATE-JQASSISTANT-RULES'
END
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.hibernate.SessionFactory;
import org.hibernate.engine.spi.SessionFactoryImplementor;
import org.hibernate.search.jakarta.batch.core.massindexing.MassIndexingJob;
import org.hibernate.search.jakarta.batch.core.massindexing.util.impl.EntityTypeDescriptor;
import org.hibernate.search.jakarta.batch.core.massindexing.util.impl.BatchCoreEntityTypeDescriptor;
import org.hibernate.search.mapper.orm.Search;
import org.hibernate.search.mapper.orm.loading.spi.HibernateOrmLoadingTypeContext;
import org.hibernate.search.mapper.orm.mapping.SearchMapping;
Expand Down Expand Up @@ -141,11 +141,11 @@ private static <T> List<T> find(Session session, Class<T> clazz, String key, Str
.fetchHits( 1000 );
}

public static EntityTypeDescriptor<?, ?> createEntityTypeDescriptor(EntityManagerFactory emf, Class<?> clazz) {
public static BatchCoreEntityTypeDescriptor<?, ?> createEntityTypeDescriptor(EntityManagerFactory emf, Class<?> clazz) {
SearchMapping mapping = Search.mapping( emf );
BatchMappingContext mappingContext = (BatchMappingContext) mapping;
HibernateOrmLoadingTypeContext<?> type = mappingContext.typeContextProvider()
.byEntityName().getOrFail( mapping.indexedEntity( clazz ).jpaName() );
return EntityTypeDescriptor.create( emf.unwrap( SessionFactoryImplementor.class ), type );
return BatchCoreEntityTypeDescriptor.create( emf.unwrap( SessionFactoryImplementor.class ), type );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

import jakarta.persistence.EntityManagerFactory;

import org.hibernate.search.jakarta.batch.core.massindexing.util.impl.EntityTypeDescriptor;
import org.hibernate.search.jakarta.batch.core.massindexing.util.impl.BatchCoreEntityTypeDescriptor;
import org.hibernate.search.mapper.orm.tenancy.spi.TenancyConfiguration;
import org.hibernate.search.mapper.pojo.massindexing.MassIndexingDefaultCleanOperation;

Expand All @@ -33,7 +33,7 @@ public class JobContextData {
* In Jakarta Batch standard, only string values can be propagated using job properties, but class types are frequently
* used too. So this map has string keys to facilitate lookup for values extracted from job properties.
*/
private Map<String, EntityTypeDescriptor<?, ?>> entityTypeDescriptorMap;
private Map<String, BatchCoreEntityTypeDescriptor<?, ?>> entityTypeDescriptorMap;

private TenancyConfiguration tenancyConfiguration;
private MassIndexingDefaultCleanOperation massIndexingDefaultCleanOperation;
Expand All @@ -50,8 +50,8 @@ public void setEntityManagerFactory(EntityManagerFactory entityManagerFactory) {
this.entityManagerFactory = entityManagerFactory;
}

public void setEntityTypeDescriptors(Collection<EntityTypeDescriptor<?, ?>> descriptors) {
for ( EntityTypeDescriptor<?, ?> descriptor : descriptors ) {
public void setEntityTypeDescriptors(Collection<BatchCoreEntityTypeDescriptor<?, ?>> descriptors) {
for ( BatchCoreEntityTypeDescriptor<?, ?> descriptor : descriptors ) {
entityTypeDescriptorMap.put( descriptor.jpaEntityName(), descriptor );
}
}
Expand All @@ -72,22 +72,22 @@ public void setMassIndexingDefaultCleanOperation(MassIndexingDefaultCleanOperati
this.massIndexingDefaultCleanOperation = massIndexingDefaultCleanOperation;
}

public EntityTypeDescriptor<?, ?> getEntityTypeDescriptor(String entityName) {
EntityTypeDescriptor<?, ?> descriptor = entityTypeDescriptorMap.get( entityName );
public BatchCoreEntityTypeDescriptor<?, ?> getEntityTypeDescriptor(String entityName) {
BatchCoreEntityTypeDescriptor<?, ?> descriptor = entityTypeDescriptorMap.get( entityName );
if ( descriptor == null ) {
String msg = String.format( Locale.ROOT, "entity type %s not found.", entityName );
throw new NoSuchElementException( msg );
}
return descriptor;
}

public List<EntityTypeDescriptor<?, ?>> getEntityTypeDescriptors() {
public List<BatchCoreEntityTypeDescriptor<?, ?>> getEntityTypeDescriptors() {
return new ArrayList<>( entityTypeDescriptorMap.values() );
}

public List<Class<?>> getEntityTypes() {
return entityTypeDescriptorMap.values().stream()
.map( EntityTypeDescriptor::javaClass )
.map( BatchCoreEntityTypeDescriptor::javaClass )
.collect( Collectors.toList() );
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright Red Hat Inc. and Hibernate Authors
*/
package org.hibernate.search.jakarta.batch.core.massindexing.loading.impl;

import java.util.List;

import jakarta.persistence.LockModeType;

import org.hibernate.Session;
import org.hibernate.query.Query;
import org.hibernate.query.QueryFlushMode;
import org.hibernate.search.mapper.orm.loading.batch.HibernateOrmBatchEntityLoader;
import org.hibernate.search.mapper.orm.loading.batch.HibernateOrmBatchEntityLoadingOptions;
import org.hibernate.search.mapper.orm.loading.batch.HibernateOrmBatchEntitySink;
import org.hibernate.search.mapper.orm.loading.batch.HibernateOrmBatchLoadingTypeContext;

public class BatchCoreDefaultHibernateOrmBatchEntityLoader<E> implements HibernateOrmBatchEntityLoader {
private static final String ID_PARAMETER_NAME = "ids";

private final HibernateOrmBatchEntitySink<E> sink;
private final Query<E> query;

public BatchCoreDefaultHibernateOrmBatchEntityLoader(HibernateOrmBatchLoadingTypeContext<E> typeContext,
HibernateOrmBatchEntitySink<E> sink, HibernateOrmBatchEntityLoadingOptions options) {
this.sink = sink;

StringBuilder query = new StringBuilder();
query.append( "select e from " )
.append( typeContext.jpaEntityName() )
.append( " e where e." )
.append( typeContext.uniquePropertyName() )
.append( " in(:" )
.append( ID_PARAMETER_NAME )
.append( ")" );

this.query = options.context( Session.class ).createQuery( query.toString(), typeContext.javaClass() )
.setReadOnly( true )
.setCacheable( false )
.setLockMode( LockModeType.NONE )
.setCacheMode( options.cacheMode() )
.setQueryFlushMode( QueryFlushMode.NO_FLUSH )
.setFetchSize( options.batchSize() );
}

@Override
public void close() {
}

@Override
public void load(List<Object> identifiers) {
sink.accept( query.setParameter( ID_PARAMETER_NAME, identifiers ).list() );
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright Red Hat Inc. and Hibernate Authors
*/
package org.hibernate.search.jakarta.batch.core.massindexing.loading.impl;

import java.util.HashSet;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.stream.Collectors;

import jakarta.persistence.LockModeType;

import org.hibernate.ScrollMode;
import org.hibernate.ScrollableResults;
import org.hibernate.StatelessSession;
import org.hibernate.query.Query;
import org.hibernate.search.jakarta.batch.core.massindexing.util.impl.IdOrder;
import org.hibernate.search.mapper.orm.loading.batch.HibernateOrmBatchIdentifierLoader;
import org.hibernate.search.mapper.orm.loading.batch.HibernateOrmBatchIdentifierLoadingOptions;
import org.hibernate.search.mapper.orm.loading.batch.HibernateOrmBatchLoadingTypeContext;
import org.hibernate.search.mapper.orm.loading.batch.HibernateOrmBatchReindexCondition;
import org.hibernate.search.util.common.AssertionFailure;
import org.hibernate.search.util.common.impl.Closer;

public class BatchCoreDefaultHibernateOrmBatchIdentifierLoader<E> implements HibernateOrmBatchIdentifierLoader {

private final StatelessSession session;
private final String ormEntityName;
private final String uniquePropertyName;
private final IdOrder idOrder;
private final HibernateOrmBatchIdentifierLoadingOptions options;
private final IdLoader idLoader;

public BatchCoreDefaultHibernateOrmBatchIdentifierLoader(HibernateOrmBatchLoadingTypeContext<E> typeContext,
HibernateOrmBatchIdentifierLoadingOptions options, IdOrder idOrder) {
this.session = options.context( StatelessSession.class );
this.ormEntityName = typeContext.jpaEntityName();
this.uniquePropertyName = typeContext.uniquePropertyName();
this.idOrder = idOrder;
this.options = options;
this.idLoader = options.maxResults().orElse( -1 ) == 1 ? new QuerySingleIdLoader() : new ScrollIdLoader();
}

@Override
public void close() {
try ( Closer<RuntimeException> closer = new Closer<>() ) {
if ( idLoader != null ) {
closer.push( IdLoader::close, idLoader );
}
}
}

@Override
public OptionalLong totalCount() {
StringBuilder query = new StringBuilder();
query.append( "select count(e) from " )
.append( ormEntityName )
.append( " e " );

return OptionalLong.of( createQuery( session, query,
options.reindexOnlyCondition().map( Set::of ).orElseGet( Set::of ), Long.class, Optional.empty() )
.uniqueResult() );
}

@Override
public Object next() {
return idLoader.next();
}

@Override
public boolean hasNext() {
return idLoader.hasNext();
}

private Query<Object> createQueryLoading(StatelessSession session) {
StringBuilder query = new StringBuilder();
query.append( "select e." )
.append( uniquePropertyName )
.append( " from " )
.append( ormEntityName )
.append( " e " );
Set<HibernateOrmBatchReindexCondition> conditions = new HashSet<>();
options.reindexOnlyCondition().ifPresent( conditions::add );
options.lowerBound().ifPresent( b -> conditions
.add( idOrder.idGreater( "HIBERNATE_SEARCH_ID_LOWER_BOUND_", b, options.lowerBoundInclusive() ) ) );
options.upperBound().ifPresent( b -> conditions
.add( idOrder.idLesser( "HIBERNATE_SEARCH_ID_UPPER_BOUND_", b, options.upperBoundInclusive() ) ) );

Query<Object> select = createQuery( session, query, conditions, Object.class, Optional.of( idOrder.ascOrder() ) )
.setFetchSize( options.fetchSize() )
.setReadOnly( true )
.setCacheable( false )
.setLockMode( LockModeType.NONE );
options.offset().ifPresent( select::setFirstResult );
options.maxResults().ifPresent( select::setMaxResults );
return select;
}

private <T> Query<T> createQuery(StatelessSession session,
StringBuilder hql, Set<HibernateOrmBatchReindexCondition> conditions, Class<T> returnedType,
Optional<String> order) {
if ( !conditions.isEmpty() ) {
hql.append( " where " );
hql.append( conditions.stream()
.map( c -> "( " + c.conditionString() + " )" )
.collect( Collectors.joining( " AND ", " ", " " ) )
);
}
order.ifPresent( o -> hql.append( " ORDER BY " ).append( o ) );
Query<T> query = session.createQuery( hql.toString(), returnedType )
.setCacheable( false );

for ( var condition : conditions ) {
for ( var entry : condition.params().entrySet() ) {
query.setParameter( entry.getKey(), entry.getValue() );
}
}

return query;
}

private interface IdLoader {
Object next();

boolean hasNext();

void close();
}

private class QuerySingleIdLoader implements IdLoader {

private boolean hasNextCalled = false;
private boolean nextCalled = false;

private Query<Object> id = createQueryLoading( session );
private Object currentId;

@Override
public Object next() {
if ( hasNextCalled ) {
nextCalled = true;
hasNextCalled = false;
return currentId;
}
else {
throw new AssertionFailure( "Cannot call next() before calling hasNext()" );
}
}

@Override
public boolean hasNext() {
if ( nextCalled ) {
// we expect to have just a single ID, so if we called next and got the id we don't need to execute the query anymore:
return false;
}
currentId = id.getSingleResultOrNull();
hasNextCalled = true;
return currentId != null;
}

@Override
public void close() {
id = null;
}
}

private class ScrollIdLoader implements IdLoader {
private ScrollableResults<Object> id = createQueryLoading( session ).scroll( ScrollMode.FORWARD_ONLY );

@Override
public Object next() {
return id.get();
}

@Override
public boolean hasNext() {
return id.next();
}

@Override
public void close() {
try ( Closer<RuntimeException> closer = new Closer<>() ) {
if ( id != null ) {
closer.push( ScrollableResults::close, id );
id = null;
}
}
}
}

}
Loading

0 comments on commit 6b3dca3

Please sign in to comment.