Skip to content

Commit

Permalink
Global score modifiers for Hybrid search - RRF (#1082)
Browse files Browse the repository at this point in the history
Co-authored-by: Farshid Zavareh <[email protected]>
  • Loading branch information
vicilliar and farshidz authored Jan 14, 2025
1 parent 41fc52b commit 096781b
Show file tree
Hide file tree
Showing 41 changed files with 1,816 additions and 217 deletions.
4 changes: 4 additions & 0 deletions src/marqo/core/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
MARQO_STRUCTURED_HYBRID_SEARCH_MINIMUM_VERSION = semver.VersionInfo.parse('2.10.0')
MARQO_UNSTRUCTURED_HYBRID_SEARCH_MINIMUM_VERSION = semver.VersionInfo.parse('2.11.0')
MARQO_CUSTOM_VECTOR_NORMALIZATION_MINIMUM_VERSION = semver.VersionInfo.parse('2.13.0')
MARQO_GLOBAL_SCORE_MODIFIERS_MINIMUM_VERSION = semver.VersionInfo.parse('2.15.0')

# For score modifiers
QUERY_INPUT_SCORE_MODIFIERS_MULT_WEIGHTS_2_9 = 'marqo__mult_weights'
Expand All @@ -25,3 +26,6 @@
QUERY_INPUT_SCORE_MODIFIERS_ADD_WEIGHTS_LEXICAL = 'marqo__add_weights_lexical'
QUERY_INPUT_SCORE_MODIFIERS_MULT_WEIGHTS_TENSOR = 'marqo__mult_weights_tensor'
QUERY_INPUT_SCORE_MODIFIERS_ADD_WEIGHTS_TENSOR = 'marqo__add_weights_tensor'
# Query input names for the global (root-level) score modifier weights in hybrid search.
# These must match the query(...) inputs declared in the generated Vespa rank profiles.
QUERY_INPUT_SCORE_MODIFIERS_MULT_WEIGHTS_GLOBAL = 'marqo__mult_weights_global'
QUERY_INPUT_SCORE_MODIFIERS_ADD_WEIGHTS_GLOBAL = 'marqo__add_weights_global'
# Key under which global score modifiers are stored in the hybrid score modifiers dict,
# alongside the per-search-method entries.
MARQO_GLOBAL_SCORE_MODIFIERS = 'global'
7 changes: 5 additions & 2 deletions src/marqo/core/index_management/index_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ def bootstrap_vespa(self) -> bool:

# Only retrieve existing indexes when the vespa app is not configured and the index settings schema exists
existing_indexes = self._get_existing_indexes() if not vespa_app.is_configured and \
vespa_app.has_schema(self._MARQO_SETTINGS_SCHEMA_NAME) else None
vespa_app.has_schema(
self._MARQO_SETTINGS_SCHEMA_NAME) else None

vespa_app.bootstrap(to_version, existing_indexes)

Expand Down Expand Up @@ -169,7 +170,9 @@ def batch_create_indexes(self, marqo_index_requests: List[MarqoIndexRequest]) ->
if request.model.text_chunk_prefix is None:
request.model.text_chunk_prefix = request.model.get_default_text_chunk_prefix()

index_to_create.append(vespa_schema_factory(request).generate_schema())
schema, marqo_index = vespa_schema_factory(request).generate_schema()
index_to_create.append((schema, marqo_index))
logger.debug(f'Creating index {str(request.name)} with schema:\n{schema}')

with self._vespa_deployment_lock():
self._get_vespa_application().batch_add_index_setting_and_schema(index_to_create)
Expand Down
16 changes: 10 additions & 6 deletions src/marqo/core/models/marqo_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from marqo.base_model import StrictBaseModel
from marqo.core.models.score_modifier import ScoreModifier
from marqo.core.search.search_filter import SearchFilter, MarqoFilterStringParser
from marqo.core.models.hybrid_parameters import HybridParameters
from marqo.core.models.hybrid_parameters import RankingMethod, HybridParameters


class MarqoQuery(StrictBaseModel, ABC):
Expand All @@ -17,6 +17,7 @@ class Config(StrictBaseModel.Config):
index_name: str
limit: int
offset: Optional[int] = None
rerank_count: Optional[int] = None
searchable_attributes: Optional[List[str]] = None
attributes_to_retrieve: Optional[List[str]] = None
filter: Optional[SearchFilter] = None
Expand Down Expand Up @@ -61,13 +62,16 @@ class MarqoHybridQuery(MarqoTensorQuery, MarqoLexicalQuery):
# Core module will use these fields instead of the score_modifiers_lexical and score_modifiers_tensor inside the HybridParameters
score_modifiers_lexical: Optional[List[ScoreModifier]] = None
score_modifiers_tensor: Optional[List[ScoreModifier]] = None

@root_validator(pre=True)
def validate_searchable_attributes_and_score_modifiers(cls, values):
# score_modifiers cannot be defined for hybrid search
if values.get("score_modifiers") is not None:
raise ValueError("'scoreModifiers' cannot be used for hybrid search. Instead, define the "
"'scoreModifiersTensor' and/or 'scoreModifiersLexical' keys inside the "
"'hybridParameters' dict parameter.")
# For hybrid search, score_modifiers can only be set when the ranking method is RRF
hybrid_parameters = values.get("hybrid_parameters")
if values.get("score_modifiers") is not None and hybrid_parameters.rankingMethod != RankingMethod.RRF:
raise ValueError(f"'scoreModifiers' is only supported for hybrid search if 'rankingMethod' is 'RRF'. "
f"For your 'rankingMethod': {hybrid_parameters.rankingMethod}, define the "
f"'scoreModifiersTensor' and/or 'scoreModifiersLexical' keys inside the "
f"'hybridParameters' dict parameter.")

# searchable_attributes cannot be defined for hybrid search
if values.get("searchable_attributes") is not None:
Expand Down
14 changes: 12 additions & 2 deletions src/marqo/core/search/hybrid_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
class HybridSearch:
def search(
self, config: Config, index_name: str, query: Optional[Union[str, CustomVectorQuery]],
result_count: int = 5,
offset: int = 0, ef_search: Optional[int] = None, approximate: bool = True,
result_count: int = 5, offset: int = 0, rerank_count: Optional[int] = None,
ef_search: Optional[int] = None, approximate: bool = True,
searchable_attributes: Iterable[str] = None, filter_string: str = None, device: str = None,
attributes_to_retrieve: Optional[List[str]] = None, boost: Optional[Dict] = None,
media_download_headers: Optional[Dict] = None, context: Optional[SearchContext] = None,
Expand Down Expand Up @@ -102,6 +102,15 @@ def search(
f"This index was created with Marqo {marqo_index_version}."
)

if score_modifiers is not None \
and marqo_index_version < constants.MARQO_GLOBAL_SCORE_MODIFIERS_MINIMUM_VERSION:
raise core_exceptions.UnsupportedFeatureError(
f"Hybrid search with global score modifiers is only supported for Marqo indexes created with Marqo "
f"{str(constants.MARQO_GLOBAL_SCORE_MODIFIERS_MINIMUM_VERSION)} or later. "
f"This index was created with Marqo {marqo_index_version}."
)


# Use default hybrid settings if not provided
if hybrid_parameters is None:
hybrid_parameters = HybridParameters()
Expand Down Expand Up @@ -176,6 +185,7 @@ def search(
ef_search=ef_search,
approximate=approximate,
offset=offset,
rerank_count=rerank_count,
or_phrases=optional_terms,
and_phrases=required_terms,
attributes_to_retrieve=attributes_to_retrieve,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,24 @@ schema {{ index.schema_name }} {
query(marqo__add_weights_lexical) tensor<double>(p{})
query(marqo__mult_weights_tensor) tensor<double>(p{})
query(marqo__add_weights_tensor) tensor<double>(p{})
query(marqo__mult_weights_global) tensor<double>(p{})
query(marqo__add_weights_global) tensor<double>(p{})
}

function mult_modifier(mult_weights) {
expression: if (count(mult_weights * attribute(marqo__score_modifiers)) == 0, 1, reduce(mult_weights * attribute(marqo__score_modifiers), prod))
}
function add_modifier(add_weights) {
expression: reduce(add_weights * attribute(marqo__score_modifiers), sum)
}
function modify(score, mult_weights, add_weights) {
expression: if (count(mult_weights * attribute(marqo__score_modifiers)) == 0, 1, reduce(mult_weights * attribute(marqo__score_modifiers), prod)) * score + reduce(add_weights * attribute(marqo__score_modifiers), sum)
expression: mult_modifier(mult_weights) * score + add_modifier(add_weights)
}
function global_mult_modifier() {
expression: mult_modifier(query(marqo__mult_weights_global))
}
function global_add_modifier() {
expression: add_modifier(query(marqo__add_weights_global))
}

{% macro max_score(score_macro, fields) -%}
Expand Down Expand Up @@ -171,6 +185,8 @@ schema {{ index.schema_name }} {
function embedding_score() {
expression: {{ max_score(embedding_score, index.tensor_fields) }}
}

match-features: global_mult_modifier global_add_modifier
}

{% if index.lexical_fields -%}
Expand All @@ -187,9 +203,14 @@ schema {{ index.schema_name }} {
expression: modify(embedding_score(), query(marqo__mult_weights_tensor), query(marqo__add_weights_tensor))
}
{% if index.tensor_fields -%}
match-features:
{%- for field in index.tensor_fields %} closest({{ field.embeddings_field_name }}){% endfor -%}
{%- for field in index.tensor_fields %} distance(field, {{ field.embeddings_field_name }}){% endfor %}
match-features inherits base_rank_profile {
{%- for field in index.tensor_fields %}
closest({{ field.embeddings_field_name }})
{%- endfor %}
{%- for field in index.tensor_fields %}
distance(field, {{ field.embeddings_field_name }})
{%- endfor %}
}
{%- endif %}
}

Expand All @@ -203,6 +224,8 @@ schema {{ index.schema_name }} {
query(marqo__add_weights_lexical) tensor<double>(p{})
query(marqo__mult_weights_tensor) tensor<double>(p{})
query(marqo__add_weights_tensor) tensor<double>(p{})
query(marqo__mult_weights_global) tensor<double>(p{})
query(marqo__add_weights_global) tensor<double>(p{})
}
}

Expand All @@ -213,9 +236,14 @@ schema {{ index.schema_name }} {
second-phase {
expression: modify(embedding_score(), query(marqo__mult_weights_tensor), query(marqo__add_weights_tensor))
}
match-features:
{%- for field in index.tensor_fields %} closest({{ field.embeddings_field_name }}){% endfor -%}
{%- for field in index.tensor_fields %} distance(field, {{ field.embeddings_field_name }}){% endfor -%}
match-features inherits base_rank_profile {
{%- for field in index.tensor_fields %}
closest({{ field.embeddings_field_name }})
{%- endfor %}
{%- for field in index.tensor_fields %}
distance(field, {{ field.embeddings_field_name }})
{%- endfor %}
}
}

rank-profile hybrid_embedding_similarity_then_bm25 inherits base_rank_profile {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,8 @@ def _to_vespa_hybrid_query(self, marqo_query: MarqoHybridQuery) -> Dict[str, Any
query_inputs.update(hybrid_score_modifiers[constants.MARQO_SEARCH_METHOD_LEXICAL])
if hybrid_score_modifiers[constants.MARQO_SEARCH_METHOD_TENSOR]:
query_inputs.update(hybrid_score_modifiers[constants.MARQO_SEARCH_METHOD_TENSOR])
if hybrid_score_modifiers[constants.MARQO_GLOBAL_SCORE_MODIFIERS]:
query_inputs.update(hybrid_score_modifiers[constants.MARQO_GLOBAL_SCORE_MODIFIERS])

query = {
'searchChain': 'marqo',
Expand Down Expand Up @@ -579,10 +581,11 @@ def _to_vespa_hybrid_query(self, marqo_query: MarqoHybridQuery) -> Dict[str, Any

if marqo_query.hybrid_parameters.rankingMethod in {RankingMethod.RRF}: # TODO: Add NormalizeLinear
query["marqo__hybrid.alpha"] = marqo_query.hybrid_parameters.alpha

if marqo_query.hybrid_parameters.rankingMethod in {RankingMethod.RRF}:
query["marqo__hybrid.rrf_k"] = marqo_query.hybrid_parameters.rrfK

if marqo_query.rerank_count:
query["marqo__hybrid.rerankCountGlobal"] = marqo_query.rerank_count

return query

def _get_tensor_fields_to_search(
Expand Down
50 changes: 40 additions & 10 deletions src/marqo/core/structured_vespa_index/structured_vespa_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,21 +252,26 @@ def _generate_rank_profiles(self, marqo_index: StructuredMarqoIndex) -> List[str
for field in lexical_fields]) + ')')
bm25_max_expression = self._generate_max_bm25_expression(list(lexical_fields))
embedding_similarity_expression = self._generate_max_similarity_expression(tensor_fields)
score_modifier_expression = (

mult_modifier_expression = (
f'if (count(mult_weights * attribute({common.FIELD_SCORE_MODIFIERS_DOUBLE_LONG})) == 0, '
f' 1, reduce(mult_weights * attribute({common.FIELD_SCORE_MODIFIERS_DOUBLE_LONG}), prod)) '
f'* if (count(mult_weights * attribute({common.FIELD_SCORE_MODIFIERS_FLOAT})) == 0, '
f' 1, reduce(mult_weights * attribute({common.FIELD_SCORE_MODIFIERS_FLOAT}), prod)) '
f'* score '
f'+ reduce(add_weights * attribute({common.FIELD_SCORE_MODIFIERS_DOUBLE_LONG}), sum) '
) if score_modifier_fields_names else '1'
add_modifier_expression = (
f'reduce(add_weights * attribute({common.FIELD_SCORE_MODIFIERS_DOUBLE_LONG}), sum) '
f'+ reduce(add_weights * attribute({common.FIELD_SCORE_MODIFIERS_FLOAT}), sum)'
) if score_modifier_fields_names else '0'
score_modifier_expression = (
'mult_modifier(mult_weights) * score + add_modifier(add_weights)'
) if score_modifier_fields_names else 'score'

embedding_match_features_expression = \
'match-features: ' + \
' '.join([f'closest({field.embeddings_field_name})' for field in marqo_index.tensor_fields]) + \
' ' + \
' '.join([f'distance(field, {field.embeddings_field_name})' for field in marqo_index.tensor_fields])
embedding_match_features_expression = [f'match-features inherits {common.RANK_PROFILE_BASE} {{']
for field in marqo_index.tensor_fields:
embedding_match_features_expression.append(f'closest({field.embeddings_field_name})')
embedding_match_features_expression.append(f'distance(field, {field.embeddings_field_name})')
embedding_match_features_expression.append('}')

# Base rank profile
rank_profiles.append(f'rank-profile {common.RANK_PROFILE_BASE} inherits default {{')
Expand All @@ -282,13 +287,31 @@ def _generate_rank_profiles(self, marqo_index: StructuredMarqoIndex) -> List[str
rank_profiles.append(f'query({constants.QUERY_INPUT_SCORE_MODIFIERS_ADD_WEIGHTS_LEXICAL}) tensor<double>(p{{}})')
rank_profiles.append(f'query({constants.QUERY_INPUT_SCORE_MODIFIERS_MULT_WEIGHTS_TENSOR}) tensor<double>(p{{}})')
rank_profiles.append(f'query({constants.QUERY_INPUT_SCORE_MODIFIERS_ADD_WEIGHTS_TENSOR}) tensor<double>(p{{}})')
rank_profiles.append(f'query({constants.QUERY_INPUT_SCORE_MODIFIERS_MULT_WEIGHTS_GLOBAL}) tensor<double>(p{{}})')
rank_profiles.append(f'query({constants.QUERY_INPUT_SCORE_MODIFIERS_ADD_WEIGHTS_GLOBAL}) tensor<double>(p{{}})')

rank_profiles.append('}')

rank_profiles.append('function mult_modifier(mult_weights) {')
rank_profiles.append(f' expression: {mult_modifier_expression}')
rank_profiles.append('}')

rank_profiles.append('function add_modifier(add_weights) {')
rank_profiles.append(f' expression: {add_modifier_expression}')
rank_profiles.append('}')

rank_profiles.append('function modify(score, mult_weights, add_weights) {')
rank_profiles.append(f' expression: {score_modifier_expression}')
rank_profiles.append('}')

rank_profiles.append('function global_mult_modifier() {')
rank_profiles.append(f' expression: mult_modifier(query({constants.QUERY_INPUT_SCORE_MODIFIERS_MULT_WEIGHTS_GLOBAL}))')
rank_profiles.append('}')

rank_profiles.append('function global_add_modifier() {')
rank_profiles.append(f' expression: add_modifier(query({constants.QUERY_INPUT_SCORE_MODIFIERS_ADD_WEIGHTS_GLOBAL}))')
rank_profiles.append('}')

if lexical_fields:
rank_profiles.append('function lexical_score_sum() {')
rank_profiles.append(f'expression: {bm25_sum_expression}')
Expand All @@ -311,6 +334,9 @@ def _generate_rank_profiles(self, marqo_index: StructuredMarqoIndex) -> List[str
rank_profiles.append(f'expression: {embedding_similarity_expression}')
rank_profiles.append('}')

# Expose the global add and mult modifiers as match-features so that they are accessible in the searcher
rank_profiles.append('match-features: global_mult_modifier global_add_modifier')

rank_profiles.append('}')

if lexical_fields:
Expand All @@ -335,7 +361,7 @@ def _generate_rank_profiles(self, marqo_index: StructuredMarqoIndex) -> List[str
f'query({constants.QUERY_INPUT_SCORE_MODIFIERS_ADD_WEIGHTS_TENSOR})'
f')')
rank_profiles.append('}')
rank_profiles.append(embedding_match_features_expression)
rank_profiles.extend(embedding_match_features_expression) # This is a multi-line list, so we extend
rank_profiles.append('}')

# Hybrid search
Expand All @@ -359,6 +385,10 @@ def _generate_rank_profiles(self, marqo_index: StructuredMarqoIndex) -> List[str
f'query({constants.QUERY_INPUT_SCORE_MODIFIERS_MULT_WEIGHTS_TENSOR}) tensor<double>(p{{}})')
rank_profiles.append(
f'query({constants.QUERY_INPUT_SCORE_MODIFIERS_ADD_WEIGHTS_TENSOR}) tensor<double>(p{{}})')
rank_profiles.append(
f'query({constants.QUERY_INPUT_SCORE_MODIFIERS_MULT_WEIGHTS_GLOBAL}) tensor<double>(p{{}})')
rank_profiles.append(
f'query({constants.QUERY_INPUT_SCORE_MODIFIERS_ADD_WEIGHTS_GLOBAL}) tensor<double>(p{{}})')

rank_profiles.append('}')
rank_profiles.append('}')
Expand All @@ -384,7 +414,7 @@ def _generate_rank_profiles(self, marqo_index: StructuredMarqoIndex) -> List[str
f'query({constants.QUERY_INPUT_SCORE_MODIFIERS_ADD_WEIGHTS_TENSOR}))'
)
rank_profiles.append('}')
rank_profiles.append(embedding_match_features_expression)
rank_profiles.extend(embedding_match_features_expression) # This is a multi-line list, so we extend
rank_profiles.append('}')

# HYBRID SEARCH TENSOR THEN LEXICAL
Expand Down
22 changes: 19 additions & 3 deletions src/marqo/core/vespa_index/vespa_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ def _get_hybrid_score_modifiers(self, hybrid_query: MarqoHybridQuery) -> \

"""
Specifically for hybrid queries.
Returns a dictionary with 2 keys: 'lexical' and 'tensor'.
Returns a dictionary with 3 keys: 'lexical', 'tensor', and 'global'.
Each key points to a dictionary containing the score modifiers of that kind, or None if none were provided.
Example:
Expand All @@ -174,13 +174,21 @@ def _get_hybrid_score_modifiers(self, hybrid_query: MarqoHybridQuery) -> \
'marqo__add_weights_tensor': {
'field7': 23, 'field8': 12
}
}
},
'global': {
'marqo__mult_weights_global': {
'field9': 0.5, 'field10': 0.4
},
'marqo__add_weights_global': {
'field11': 23, 'field12': 12
}
}
"""

result = {
constants.MARQO_SEARCH_METHOD_LEXICAL: None,
constants.MARQO_SEARCH_METHOD_TENSOR: None
constants.MARQO_SEARCH_METHOD_TENSOR: None,
constants.MARQO_GLOBAL_SCORE_MODIFIERS: None
}

if hybrid_query.score_modifiers_lexical:
Expand All @@ -197,6 +205,14 @@ def _get_hybrid_score_modifiers(self, hybrid_query: MarqoHybridQuery) -> \
constants.QUERY_INPUT_SCORE_MODIFIERS_ADD_WEIGHTS_TENSOR: add_tensor
}

# Treat root level score modifiers as global. Currently only supported for RRF.
if hybrid_query.score_modifiers:
mult_tensor, add_tensor = self._convert_score_modifiers_to_tensors(hybrid_query.score_modifiers)
result[constants.MARQO_GLOBAL_SCORE_MODIFIERS] = {
constants.QUERY_INPUT_SCORE_MODIFIERS_MULT_WEIGHTS_GLOBAL: mult_tensor,
constants.QUERY_INPUT_SCORE_MODIFIERS_ADD_WEIGHTS_GLOBAL: add_tensor
}

return result


Expand Down
Loading

0 comments on commit 096781b

Please sign in to comment.