Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip/experimental(elasticsearch): Use secondary index for Item and ImageObject #1569

Draft
wants to merge 5 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions rest/src/main/groovy/whelk/rest/api/SearchUtils.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class SearchUtils {
String addStats = getReservedQueryParameter('_stats', queryParameters)
String suggest = getReservedQueryParameter('_suggest', queryParameters)
String spell = getReservedQueryParameter('_spell', queryParameters)
String searchMainOnly = getReservedQueryParameter('_searchMainOnly', queryParameters)

if (queryParameters['p'] && !object) {
throw new InvalidQueryException("Parameter 'p' can only be used together with 'o'")
Expand All @@ -89,6 +90,7 @@ class SearchUtils {
'_stats' : addStats,
'_suggest' : suggest,
'_spell': spell,
'_searchMainOnly': searchMainOnly,
]

Map results = queryElasticSearch(
Expand Down Expand Up @@ -125,6 +127,7 @@ class SearchUtils {
String addStats = pageParams['_stats']
String suggest = pageParams['_suggest']
String spell = pageParams['_spell']
String searchMainOnly = pageParams['_searchMainOnly']
lens = lens ?: 'cards'

log.debug("Querying ElasticSearch")
Expand All @@ -136,7 +139,7 @@ class SearchUtils {
// TODO Only manipulate `_limit` in one place
queryParameters['_limit'] = [limit.toString()]

Map esResult = esQuery.doQuery(queryParameters, suggest, spell)
Map esResult = esQuery.doQuery(queryParameters, suggest, spell, searchMainOnly)
Lookup lookup = new Lookup()

List<Map> mappings = []
Expand Down Expand Up @@ -803,7 +806,7 @@ class SearchUtils {
* Return a list of reserved query params
*/
private List getReservedParameters() {
return ['q', 'p', 'o', 'value', '_limit', '_offset', '_suggest', '_spell']
return ['q', 'p', 'o', 'value', '_limit', '_offset', '_suggest', '_spell', '_searchMainOnly']
}

/*
Expand Down
2 changes: 1 addition & 1 deletion rest/src/main/groovy/whelk/rest/api/SearchUtils2.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ Map<String, Object> doSearch(Map<String, String[]> queryParameters) throws Inval

Map<String, Object> esQueryDsl = getEsQueryDsl(qTree, queryParams, appParams.statsRepr);

QueryResult queryRes = new QueryResult(queryUtil.query(esQueryDsl), queryParams.debug);
QueryResult queryRes = new QueryResult(queryUtil.query(esQueryDsl, queryParams.searchMainOnly), queryParams.debug);

Map<String, Object> partialCollectionView = getPartialCollectionView(queryRes, qTree, queryParams, appParams);

Expand Down
1 change: 1 addition & 0 deletions secret.properties.in
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ sqlMaxPoolSize = 4

elasticHost = localhost:9200
elasticIndex = libris_local
elasticIndexSecondary = libris_local_secondary
elasticUser = elastic
elasticPassword = elastic

Expand Down
6 changes: 4 additions & 2 deletions whelk-core/src/main/groovy/whelk/Whelk.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,8 @@ class Whelk {
removedLinks.each { link ->
String id = storage.getSystemIdByIri(link.iri)
if (id) {
elastic.decrementReverseLinks(id, link.relation)
Document doc = storage.load(id)
elastic.decrementReverseLinks(id, link.relation, elastic.getIndexForDoc(doc))
}
}

Expand All @@ -377,7 +378,7 @@ class Whelk {
reindexAffectedReverseIntegral(doc)
} else {
// just update link counter
elastic.incrementReverseLinks(id, link.relation)
elastic.incrementReverseLinks(id, link.relation, elastic.getIndexForDoc(doc))
}
}
}
Expand Down Expand Up @@ -561,6 +562,7 @@ class Whelk {
assertNoDependers(doc)
}
storage.remove(id, changedIn, changedBy)
// TODO use getThingType here?
indexAsyncOrSync {
elastic.remove(id)
if (features.isEnabled(INDEX_BLANK_WORKS)) {
Expand Down
55 changes: 37 additions & 18 deletions whelk-core/src/main/groovy/whelk/component/ElasticSearch.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class ElasticSearch {
public int maxTermsCount = 65536 // Elasticsearch default (fallback value)

String defaultIndex = null
String defaultIndexSecondary = null
private List<String> elasticHosts
private String elasticUser
private String elasticPassword
Expand All @@ -56,14 +57,16 @@ class ElasticSearch {
this(
props.getProperty("elasticHost"),
props.getProperty("elasticIndex"),
props.getProperty("elasticIndexSecondary"),
props.getProperty("elasticUser"),
props.getProperty("elasticPassword")
)
}

ElasticSearch(String elasticHost, String elasticIndex, String elasticUser, String elasticPassword) {
ElasticSearch(String elasticHost, String elasticIndex, String elasticIndexSecondary, String elasticUser, String elasticPassword) {
this.elasticHosts = getElasticHosts(elasticHost)
this.defaultIndex = elasticIndex
this.defaultIndexSecondary = elasticIndexSecondary
this.elasticUser = elasticUser
this.elasticPassword = elasticPassword

Expand Down Expand Up @@ -126,17 +129,23 @@ class ElasticSearch {
}

/**
 * @return the name of the main index, i.e. the current value of defaultIndex
 */
String getIndexName() {
    return defaultIndex
}
/**
 * @return the name of the secondary index, i.e. the current value of defaultIndexSecondary
 */
String getSecondaryIndexName() {
    return defaultIndexSecondary
}

/**
* Get ES mappings for associated index
*
*/
Map getMappings() {
Map getMappings(String index = null) {
String indexToCheck = indexName
if (index) {
indexToCheck = index
}

Map response
try {
response = mapper.readValue(client.performRequest('GET', "/${indexName}/_mappings", ''), Map)
response = mapper.readValue(client.performRequest('GET', "/${indexToCheck}/_mappings", ''), Map)
} catch (UnexpectedHttpStatusException e) {
log.warn("Got unexpected status code ${e.statusCode} when getting ES mappings: ${e.message}", e)
log.warn("Got unexpected status code ${e.statusCode} when getting ES mappings for ${indexToCheck}: ${e.message}", e)
return [:]
}

Expand All @@ -147,7 +156,7 @@ class ElasticSearch {
if (keys.size() == 1 && response[(keys[0])].containsKey('mappings')) {
return response[(keys[0])]['mappings']
} else {
log.warn("Couldn't get mappings from ES index ${indexName}, response was ${response}.")
log.warn("Couldn't get mappings from ES index ${indexToCheck}, response was ${response}.")
return [:]
}
}
Expand Down Expand Up @@ -279,8 +288,15 @@ class ElasticSearch {
}
}

/**
 * Pick the Elasticsearch index that a document belongs in.
 *
 * Documents whose thing type is "Item" or "ImageObject" are routed to the secondary
 * index; every other document is routed to the main index.
 *
 * If no secondary index name is set (null or empty), the main index is used for all
 * documents. Returning the unset name would otherwise put a null/empty index segment
 * into the request paths and bulk action rows that are built from this value.
 *
 * @param doc the document to route
 * @return the name of the index to use for doc
 */
String getIndexForDoc(Document doc) {
    boolean belongsInSecondary = doc.getThingType() in ["Item", "ImageObject"]
    // Groovy truth: a null or empty secondary index name counts as "not configured".
    if (belongsInSecondary && secondaryIndexName) {
        return secondaryIndexName
    }
    return indexName
}

String createActionRow(Document doc) {
def action = ["index" : [ "_index" : indexName,
def action = ["index" : [ "_index" : getIndexForDoc(doc),
"_id" : toElasticId(doc.getShortId()) ]]
return mapper.writeValueAsString(action)
}
Expand All @@ -291,11 +307,11 @@ class ElasticSearch {
try {
String response = client.performRequest(
'PUT',
"/${indexName}/_doc/${toElasticId(doc.getShortId())}",
"/${getIndexForDoc(doc)}/_doc/${toElasticId(doc.getShortId())}",
getShapeForIndex(doc, whelk))
if (log.isDebugEnabled()) {
Map responseMap = mapper.readValue(response, Map)
log.debug("Indexed the document ${doc.getShortId()} as ${indexName}/_doc/${responseMap['_id']} as version ${responseMap['_version']}")
log.debug("Indexed the document ${doc.getShortId()} as ${getIndexForDoc(doc)}/_doc/${responseMap['_id']} as version ${responseMap['_version']}")
}
} catch (Exception e) {
if (!isBadRequest(e)) {
Expand All @@ -308,15 +324,15 @@ class ElasticSearch {
}
}

void incrementReverseLinks(String shortId, String relation) {
updateReverseLinkCounter(shortId, relation, 1)
/**
 * Raise the reverse link counter by one.
 * Delegates to updateReverseLinkCounter with a delta of 1.
 *
 * @param shortId short id of the document whose counter is updated
 * @param relation the relation whose counter is updated
 * @param indexToUse name of the index the update request is sent to
 */
void incrementReverseLinks(String shortId, String relation, String indexToUse) {
    final int delta = 1
    updateReverseLinkCounter(shortId, relation, indexToUse, delta)
}

void decrementReverseLinks(String shortId, String relation) {
updateReverseLinkCounter(shortId, relation, -1)
/**
 * Lower the reverse link counter by one.
 * Delegates to updateReverseLinkCounter with a delta of -1.
 *
 * @param shortId short id of the document whose counter is updated
 * @param relation the relation whose counter is updated
 * @param indexToUse name of the index the update request is sent to
 */
void decrementReverseLinks(String shortId, String relation, String indexToUse) {
    final int delta = -1
    updateReverseLinkCounter(shortId, relation, indexToUse, delta)
}

private void updateReverseLinkCounter(String shortId, String relation, int deltaCount) {
private void updateReverseLinkCounter(String shortId, String relation, String indexToUse, int deltaCount) {
// An indexed document will always have reverseLinks.totalItems set to an integer,
// and reverseLinks.totalItemsByRelation set to a map, but reverseLinks.totalItemsByRelation['foo']
// doesn't necessarily exist at this time; hence the null check before trying to update the link counter.
Expand All @@ -334,7 +350,7 @@ class ElasticSearch {
try {
client.performRequest(
'POST',
"/${indexName}/_update/${toElasticId(shortId)}",
"/${indexToUse}/_update/${toElasticId(shortId)}",
body)
}
catch (Exception e) {
Expand All @@ -348,7 +364,7 @@ class ElasticSearch {
}
else {
log.warn("Failed to update reverse link counter ($deltaCount) for $shortId: $e, placing in retry queue.", e)
indexingRetryQueue.add({ -> updateReverseLinkCounter(shortId, relation, deltaCount) })
indexingRetryQueue.add({ -> updateReverseLinkCounter(shortId, relation, indexToUse, deltaCount) })
}
}
}
Expand All @@ -360,7 +376,7 @@ class ElasticSearch {
def dsl = ["query":["term":["_id":toElasticId(identifier)]]]
try {
def response = client.performRequest('POST',
"/${indexName}/_delete_by_query",
"/${indexName},${secondaryIndexName}/_delete_by_query",
JsonOutput.toJson(dsl))

Map responseMap = mapper.readValue(response, Map)
Expand Down Expand Up @@ -584,8 +600,11 @@ class ElasticSearch {
isnis.findAll{ it.size() == 16 }.collect { Unicode.formatIsni(it) }
}

Map query(Map jsonDsl) {
return performQuery(jsonDsl, getQueryUrl())
/**
 * Run a query through performQuery.
 *
 * @param jsonDsl the query DSL to send
 * @param searchMainOnly when true, the default query URL is used (presumably targeting
 *        the main index only -- confirm against getQueryUrl); when false, the query
 *        targets the main and secondary indices together
 * @return the result of performQuery for the chosen query URL
 */
Map query(Map jsonDsl, boolean searchMainOnly = false) {
    // With no secondary index name set, the combined target would be built from a
    // null/empty name (e.g. "<main>,null"), so use the default query URL instead.
    if (searchMainOnly || !secondaryIndexName) {
        return performQuery(jsonDsl, getQueryUrl())
    }
    return performQuery(jsonDsl, getQueryUrl([], "${indexName},${secondaryIndexName}"))
}

Map queryIds(Map jsonDsl) {
Expand Down
26 changes: 16 additions & 10 deletions whelk-core/src/main/groovy/whelk/search/ESQuery.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class ESQuery {

private static final int DEFAULT_PAGE_SIZE = 50
private static final List RESERVED_PARAMS = [
'q', 'o', '_limit', '_offset', '_sort', '_statsrepr', '_site_base_uri', '_debug', '_boost', '_lens', '_stats', '_suggest', '_site', '_spell'
'q', 'o', '_limit', '_offset', '_sort', '_statsrepr', '_site_base_uri', '_debug', '_boost', '_lens', '_stats', '_suggest', '_site', '_spell', '_searchMainOnly'
]
public static final String AND_PREFIX = 'and-'
public static final String AND_MATCHES_PREFIX = 'and-matches-'
Expand Down Expand Up @@ -86,11 +86,12 @@ class ESQuery {
void initFieldMappings(Whelk whelk) {
if (whelk.elastic) {
Map mappings = whelk.elastic.getMappings()
this.keywordFields = getKeywordFields(mappings)
this.dateFields = getFieldsOfType('date', mappings)
this.nestedFields = getFieldsOfType('nested', mappings)
this.nestedNotInParentFields = nestedFields - getFieldsWithSetting('include_in_parent', true, mappings)
this.numericExtractorFields = getFieldsWithAnalyzer('numeric_extractor', mappings)
Map mappingsSecondary = whelk.elastic.getMappings(whelk.elastic.secondaryIndexName)
this.keywordFields = getKeywordFields(mappings) + getKeywordFields(mappingsSecondary)
this.dateFields = getFieldsOfType('date', mappings) + getFieldsOfType('date', mappingsSecondary)
this.nestedFields = getFieldsOfType('nested', mappings) + getFieldsOfType('nested', mappingsSecondary)
this.nestedNotInParentFields = nestedFields - (getFieldsWithSetting('include_in_parent', true, mappings) + getFieldsWithSetting('include_in_parent', true, mappingsSecondary))
this.numericExtractorFields = getFieldsWithAnalyzer('numeric_extractor', mappings) + getFieldsWithAnalyzer('numeric_extractor', mappingsSecondary)

if (DocumentUtil.getAtPath(mappings, ['properties', '_sortKeyByLang', 'properties', 'sv', 'fields', 'trigram'], null)) {
ENABLE_SPELL_CHECK = true
Expand All @@ -109,9 +110,14 @@ class ESQuery {
}

@CompileStatic(TypeCheckingMode.SKIP)
Map doQuery(Map<String, String[]> queryParameters, String suggest = null, String spell = null) {
Map doQuery(Map<String, String[]> queryParameters, String suggest = null, String spell = null, String searchMainOnly = null) {
Map esQuery = getESQuery(queryParameters, suggest, spell)
Map esResponse = whelk.elastic.query(esQuery)
Map esResponse
if (searchMainOnly == 'true') {
esResponse = whelk.elastic.query(esQuery, true)
} else {
esResponse = whelk.elastic.query(esQuery)
}
return collectQueryResults(esResponse, esQuery, queryParameters, { def d = it."_source"; d."_id" = it."_id"; return d })
}

Expand Down Expand Up @@ -1078,11 +1084,11 @@ class ESQuery {
}
}

static Set getFieldsOfType(String type, Map mappings) {
/**
 * Look up fields by their 'type' setting.
 * Thin wrapper that calls getFieldsWithSetting with the setting name 'type'.
 *
 * @param type the value the 'type' setting is matched against
 * @param mappings the mappings to inspect
 * @return whatever getFieldsWithSetting yields for ('type', type, mappings)
 */
static Set<String> getFieldsOfType(String type, Map mappings) {
    return getFieldsWithSetting('type', type, mappings)
}

static Set getFieldsWithAnalyzer(String analyzer, Map mappings) {
/**
 * Look up fields by their 'analyzer' setting.
 * Thin wrapper that calls getFieldsWithSetting with the setting name 'analyzer'.
 *
 * @param analyzer the value the 'analyzer' setting is matched against
 * @param mappings the mappings to inspect
 * @return whatever getFieldsWithSetting yields for ('analyzer', analyzer, mappings)
 */
static Set<String> getFieldsWithAnalyzer(String analyzer, Map mappings) {
    return getFieldsWithSetting('analyzer', analyzer, mappings)
}

Expand Down
13 changes: 11 additions & 2 deletions whelk-core/src/main/groovy/whelk/search2/EsMappings.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,22 @@ public class EsMappings {

private final boolean isSpellCheckAvailable;

public EsMappings(Map<?, ?> mappings) {
public EsMappings(Map<?, ?> mappings, Map<?, ?> mappingsSecondary) {
this.keywordFields = getKeywordFields(mappings);
this.dateFields = getFieldsOfType("date", mappings);
this.nestedFields = getFieldsOfType("nested", mappings);
this.numericExtractorFields = getFieldsWithAnalyzer("numeric_extractor", mappings);

if (!mappingsSecondary.isEmpty()) {
this.keywordFields.addAll(getKeywordFields(mappingsSecondary));
this.dateFields.addAll(getFieldsOfType("date", mappingsSecondary));
this.nestedFields.addAll(getFieldsOfType("nested", mappingsSecondary));
this.numericExtractorFields.addAll(getFieldsWithAnalyzer("numeric_extractor", mappingsSecondary));
}

this.nestedNotInParentFields = new HashSet<>(nestedFields);
this.nestedNotInParentFields.removeAll(getFieldsWithSetting("include_in_parent", true, mappings));
this.numericExtractorFields = getFieldsWithAnalyzer("numeric_extractor", mappings);
this.nestedNotInParentFields.removeAll(getFieldsWithSetting("include_in_parent", true, mappingsSecondary));

// TODO: temporary feature flag, to be removed
// this feature only works after a full reindex has been done, so we have to detect that
Expand Down
6 changes: 6 additions & 0 deletions whelk-core/src/main/groovy/whelk/search2/QueryParams.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public static class ApiParams {
public static final String APP_CONFIG = "_appConfig";
public static final String BOOST = "_boost";
public static final String STATS = "_stats";
public static final String SEARCH_MAIN_ONLY = "_searchMainOnly";
}

public static class Debug {
Expand All @@ -52,6 +53,7 @@ public static class Debug {
public final String i;

public final boolean skipStats;
public final boolean searchMainOnly;

public QueryParams(Map<String, String[]> apiParameters) throws InvalidQueryException {
this.sortBy = Sort.fromString(getOptionalSingleNonEmpty(ApiParams.SORT, apiParameters).orElse(""));
Expand All @@ -67,6 +69,7 @@ public QueryParams(Map<String, String[]> apiParameters) throws InvalidQueryExcep
this.q = getOptionalSingle(ApiParams.QUERY, apiParameters).orElse("");
this.i = getOptionalSingle(ApiParams.SIMPLE_FREETEXT, apiParameters).orElse("");
this.skipStats = getOptionalSingle(ApiParams.STATS, apiParameters).map("false"::equalsIgnoreCase).isPresent();
this.searchMainOnly = getOptionalSingle(ApiParams.SEARCH_MAIN_ONLY, apiParameters).map("true"::equalsIgnoreCase).isPresent();
}

public Map<String, String> getNonQueryParams() {
Expand Down Expand Up @@ -105,6 +108,9 @@ public Map<String, String> getNonQueryParams(int offset) {
if (skipStats) {
params.put(ApiParams.STATS, "false");
}
if (searchMainOnly) {
params.put(ApiParams.SEARCH_MAIN_ONLY, "true");
}
return params;
}

Expand Down
10 changes: 7 additions & 3 deletions whelk-core/src/main/groovy/whelk/search2/QueryUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,16 @@ public class QueryUtil {

public QueryUtil(Whelk whelk) {
this.whelk = whelk;
this.esMappings = new EsMappings(whelk.elastic != null ? whelk.elastic.getMappings() : Collections.emptyMap());
if (whelk.elastic != null) {
this.esMappings = new EsMappings(whelk.elastic.getMappings(), whelk.elastic.getMappings(whelk.elastic.getSecondaryIndexName()));
} else {
this.esMappings = new EsMappings(Collections.emptyMap(), Collections.emptyMap());
}
this.esBoost = new EsBoost(whelk.getJsonld());
}

public Map<?, ?> query(Map<String, Object> queryDsl) {
return whelk.elastic.query(queryDsl);
/**
 * Sends the given query DSL to {@code whelk.elastic} and returns its response.
 *
 * <p>NOTE(review): {@code whelk.elastic} may be null (the constructor checks for that);
 * callers presumably verify {@code esIsConfigured()} first -- confirm.
 *
 * @param queryDsl the query DSL to execute
 * @param searchMainOnly forwarded unchanged to {@code whelk.elastic.query}
 * @return the map returned by {@code whelk.elastic.query}
 */
public Map<?, ?> query(Map<String, Object> queryDsl, boolean searchMainOnly) {
    final Map<?, ?> response = whelk.elastic.query(queryDsl, searchMainOnly);
    return response;
}

public boolean esIsConfigured() {
Expand Down
2 changes: 1 addition & 1 deletion whelk-core/src/main/groovy/whelk/search2/Stats.java
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ private List<Map<String, Object>> getCuratedPredicateLinks() {
if (curatedPredicates.isEmpty()) {
return Collections.emptyList();
}
QueryResult queryRes = new QueryResult(queryUtil.query(getCuratedPredicateEsQueryDsl(o, curatedPredicates)));
QueryResult queryRes = new QueryResult(queryUtil.query(getCuratedPredicateEsQueryDsl(o, curatedPredicates), queryParams.searchMainOnly));
return predicateLinks(queryRes.pAggs, o, queryParams.getNonQueryParams(0));
}

Expand Down