Skip to content
This repository has been archived by the owner on Feb 22, 2020. It is now read-only.

feat(score_fn): add score_fn as a new module #194

Merged
merged 8 commits into from
Sep 3, 2019
Merged
119 changes: 13 additions & 106 deletions gnes/indexer/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,33 +12,29 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from typing import List, Any, Union, Callable, Tuple

import numpy as np

from ..base import TrainableBase, CompositionalTrainableBase
from ..proto import gnes_pb2, blob2array
from ..score_fn.base import get_unary_score, ModifierFn


class BaseIndexer(TrainableBase):
normalize_fn = ModifierFn()
score_fn = ModifierFn()

def add(self, keys: Any, docs: Any, weights: List[float], *args, **kwargs):
pass

def query(self, keys: Any, *args, **kwargs) -> List[Any]:
pass

def normalize_score(self, *args, **kwargs):
pass

def query_and_score(self, q_chunks: List[Union['gnes_pb2.Chunk', 'gnes_pb2.Document']], top_k: int) -> List[
'gnes_pb2.Response.QueryResponse.ScoredResult']:
raise NotImplementedError

def score(self, *args, **kwargs) -> 'gnes_pb2.Response.QueryResponse.ScoredResult.Score':
raise NotImplementedError


class BaseChunkIndexer(BaseIndexer):

Expand All @@ -59,13 +55,16 @@ def query_and_score(self, q_chunks: List['gnes_pb2.Chunk'], top_k: int, *args, *
r.chunk.doc_id = _doc_id
r.chunk.offset = _offset
r.chunk.weight = _weight
r.score.CopyFrom(self.score(q_chunk, r.chunk, _relevance))
_score = get_unary_score(value=_relevance, name=self.__class__.__name__)
_score = self.normalize_fn(_score)
_score = self.score_fn(_score, q_chunk, r.chunk)
r.score.CopyFrom(_score)
results.append(r)
return results

def score(self, q_chunk: 'gnes_pb2.Chunk', d_chunk: 'gnes_pb2.Chunk',
relevance) -> 'gnes_pb2.Response.QueryResponse.ScoredResult.Score':
return ChunkScorer.eq1(q_chunk, d_chunk, relevance)
# def score(self, q_chunk: 'gnes_pb2.Chunk', d_chunk: 'gnes_pb2.Chunk',
# relevance, relevance_cls) -> 'gnes_pb2.Response.QueryResponse.ScoredResult.Score':
# return ChunkScorer.eq1(q_chunk, d_chunk, relevance, relevance_cls)


class BaseDocIndexer(BaseIndexer):
Expand All @@ -84,14 +83,12 @@ def query_and_score(self, docs: List['gnes_pb2.Response.QueryResponse.ScoredResu
for d, r in zip(queried_results, docs):
if d:
r.doc.CopyFrom(d)
r.score.CopyFrom(self.score(d, r.score))
_score = self.normalize_fn(r.score)
_score = self.score_fn(_score, d)
r.score.CopyFrom(_score)
results.append(r)
return results

def score(self, d: 'gnes_pb2.Document', s: 'gnes_pb2.Response.QueryResponse.ScoredResult.Score', *args,
**kwargs) -> 'gnes_pb2.Response.QueryResponse.ScoredResult.Score':
return DocScorer.eq1(d, s)


class BaseKeyIndexer(BaseIndexer):

Expand All @@ -102,96 +99,6 @@ def query(self, keys: List[int], *args, **kwargs) -> List[Tuple[int, int, float]
pass


class ChunkScorer:

@staticmethod
def eq1(q_chunk: 'gnes_pb2.Chunk', d_chunk: 'gnes_pb2.Chunk',
relevance):
"""
score = d_chunk.weight * relevance * q_chunk.weight
"""
score = gnes_pb2.Response.QueryResponse.ScoredResult.Score()
score.value = d_chunk.weight * relevance * q_chunk.weight
score.explained = json.dumps({
'name': 'chunk-eq1',
'operand': [{'name': 'd_chunk_weight',
'value': float(d_chunk.weight),
'doc_id': d_chunk.doc_id,
'offset': d_chunk.offset},
{'name': 'q_chunk_weight',
'value': float(q_chunk.weight),
'offset': q_chunk.offset},
{'name': 'relevance',
'value': float(relevance)}],
'op': 'prod',
'value': float(score.value)
})
return score

@staticmethod
def eq2(q_chunk: 'gnes_pb2.Chunk', d_chunk: 'gnes_pb2.Chunk',
relevance):
"""
score = d_chunk.weight * relevance * offset_divergence * q_chunk.weight
offset_divergence is calculated based on doc_type:
TEXT && VIDEO && AUDIO: offset is 1-D
IMAGE: offset is 2-D
"""

def _cal_divergence(q_chunk: 'gnes_pb2.Chunk', d_chunk: 'gnes_pb2.Chunk'):
if q_chunk.offset_nd and d_chunk.offset_nd:
return 1 / (1 + np.sqrt((q_chunk.offset_nd[0] - d_chunk.offset_nd[0]) ** 2 +
(q_chunk.offset_nd[1] - d_chunk.offset_nd[1]) ** 2))
else:
return np.abs(q_chunk.offset - d_chunk.offset)

score = gnes_pb2.Response.QueryResponse.ScoredResult.Score()

divergence = _cal_divergence(q_chunk, d_chunk)
score.value = d_chunk.weight * relevance * divergence * q_chunk.weight
score.explained = json.dumps({
'name': 'chunk-eq2',
'operand': [{'name': 'd_chunk_weight',
'value': float(d_chunk.weight),
'doc_id': d_chunk.doc_id,
'offset': d_chunk.offset},
{'name': 'q_chunk_weight',
'value': float(q_chunk.weight),
'offset': q_chunk.offset},
{'name': 'relevance',
'value': float(relevance)},
{'name': 'offset_divergence',
'value': float(divergence)}],
'op': 'prod',
'value': float(score.value)
})
return score


class DocScorer:

@staticmethod
def eq1(d: 'gnes_pb2.Document',
s: 'gnes_pb2.Response.QueryResponse.ScoredResult.Score') -> 'gnes_pb2.Response.QueryResponse.ScoredResult.Score':
"""
score *= d.weight
:param d:
:param s:
:return:
"""
s.value *= d.weight
s.explained = json.dumps({
'name': 'doc-eq1',
'operand': [json.loads(s.explained),
{'name': 'doc_weight',
'value': float(d.weight),
'doc_id': d.doc_id}],
'op': 'prod',
'value': float(s.value)
})
return s


class JointIndexer(CompositionalTrainableBase):

@property
Expand Down
23 changes: 9 additions & 14 deletions gnes/indexer/chunk/annoy.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

from ..base import BaseChunkIndexer
from ..key_only import ListKeyIndexer
from ...score_fn.base import ScoreOps
from ...score_fn.normalize import Normalizer3, Normalizer2


class AnnoyIndexer(BaseChunkIndexer):
Expand All @@ -44,6 +46,13 @@ def post_init(self):
except:
self.logger.warning('fail to load model from %s, will create an empty one' % self.data_path)

if self.metric in {'angular', 'hamming'}:
self.normalize_fn = ScoreOps.reciprocal1p
elif self.metric == 'euclidean':
self.normalize_fn = Normalizer3(self.num_dim)
elif self.metric == 'manhattan':
self.normalize_fn = Normalizer2(self.num_dim)

def add(self, keys: List[Tuple[int, Any]], vectors: np.ndarray, weights: List[float], *args, **kwargs):
last_idx = self._key_info_indexer.size

Expand All @@ -65,24 +74,10 @@ def query(self, keys: 'np.ndarray', top_k: int, *args, **kwargs) -> List[List[Tu
res = []
for k in keys:
ret, relevance_score = self._index.get_nns_by_vector(k, top_k, include_distances=True)
relevance_score = self.normalize_score(relevance_score, self.metric)
chunk_info = self._key_info_indexer.query(ret)
res.append([(*r, s) for r, s in zip(chunk_info, relevance_score)])
return res

def normalize_score(self, score: List[float], metrics: str, *args, **kwargs) -> List[float]:
if metrics == 'angular':
return list(map(lambda x: 1 / (1 + x), score))
elif metrics == 'euclidean':
import math
return list(map(lambda x: 1 / (1 + math.sqrt(x) / self.num_dim), score))
elif metrics == 'manhattan':
return list(map(lambda x: 1 / (1 + x / self.num_dim), score))
elif metrics == 'hamming':
return list(map(lambda x: 1 / (1 + x), score))
elif metrics == 'dot':
raise NotImplementedError

@property
def size(self):
return self._index.get_n_items()
Expand Down
10 changes: 5 additions & 5 deletions gnes/indexer/chunk/bindexer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

from .cython import IndexCore
from ...base import BaseChunkIndexer
from ....score_fn.normalize import Normalizer4


class BIndexer(BaseChunkIndexer):
Expand Down Expand Up @@ -55,6 +56,8 @@ def post_init(self):
except (FileNotFoundError, IsADirectoryError):
self.logger.warning('fail to load model from %s, will create an empty one' % self.data_path)

self.normalize_fn = Normalizer4(self.num_bytes)

def add(self, keys: List[Tuple[int, Any]], vectors: np.ndarray, weights: List[float], *args,
**kwargs):
if len(vectors) != len(keys):
Expand Down Expand Up @@ -99,7 +102,7 @@ def query(self,
for (i, o, w, d, q) in zip(doc_ids, offsets, weights, dists, q_idx):
if d == 0:
continue
result[q].append((i, o, w / self._weight_norm, self.normalize_score(d)))
result[q].append((i, o, w / self._weight_norm, d))

# get the top-k
for q in range(num_rows):
Expand All @@ -108,12 +111,9 @@ def query(self,
doc_ids, offsets, weights, dists, q_idx = self.bindexer.force_search(
keys, num_rows, top_k)
for (i, o, w, d, q) in zip(doc_ids, offsets, weights, dists, q_idx):
result[q].append((i, o, w / self._weight_norm, self.normalize_score(d)))
result[q].append((i, o, w / self._weight_norm, d))
return result

def normalize_score(self, distance: int, *args, **kwargs) -> float:
return 1. - distance / self.num_bytes

def __getstate__(self):
self.bindexer.save(self.data_path)
d = super().__getstate__()
Expand Down
14 changes: 7 additions & 7 deletions gnes/indexer/chunk/faiss.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ def post_init(self):
self.logger.warning('fail to load model from %s, will init an empty one' % self.data_path)
self._faiss_index = faiss.index_factory(self.num_dim, self.index_key)

if 'HNSW' in self.index_key:
from ...score_fn.normalize import Normalizer3
self.normalize_fn = Normalizer3(self.num_dim)
elif ('Flat' in self.index_key) or ('PQ' in self.index_key):
from ...score_fn.normalize import Normalizer5
self.normalize_fn = Normalizer5(self.num_dim)

def add(self, keys: List[Tuple[int, Any]], vectors: np.ndarray, weights: List[float], *args, **kwargs):
if len(vectors) != len(keys):
raise ValueError("vectors length should be equal to doc_ids")
Expand All @@ -59,7 +66,6 @@ def query(self, keys: np.ndarray, top_k: int, *args, **kwargs) -> List[List[Tupl
raise ValueError("vectors should be ndarray of float32")

score, ids = self._faiss_index.search(keys, top_k)
score = self.normalize_score(score)
ret = []
for _id, _score in zip(ids, score):
ret_i = []
Expand All @@ -70,12 +76,6 @@ def query(self, keys: np.ndarray, top_k: int, *args, **kwargs) -> List[List[Tupl

return ret

def normalize_score(self, score: np.ndarray, *args, **kwargs) -> np.ndarray:
if 'HNSW' in self.index_key:
return 1 / (1 + np.sqrt(score) / self.num_dim)
elif 'PQ' or 'Flat' in self.index_key:
return 1 / (1 + np.abs(np.sqrt(score)))

@property
def size(self):
return self._faiss_index.ntotal
Expand Down
11 changes: 6 additions & 5 deletions gnes/indexer/chunk/hbindexer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

from .cython import IndexCore
from ...base import BaseChunkIndexer
from ....score_fn.normalize import Normalizer4


class HBIndexer(BaseChunkIndexer):
Expand All @@ -41,6 +42,7 @@ def __init__(self,
if self.n_idx <= 0:
raise ValueError('There should be at least 1 clustering slot')


def post_init(self):
self.hbindexer = IndexCore(self.n_clusters, self.n_bytes, self.n_idx)
try:
Expand All @@ -52,6 +54,8 @@ def post_init(self):
except (FileNotFoundError, IsADirectoryError):
self.logger.warning('fail to load model from %s, will create an empty one' % self.data_path)

self.normalize_fn = Normalizer4(self.n_bytes * 8)

def add(self, keys: List[Tuple[int, Any]], vectors: np.ndarray, weights: List[float], *args, **kwargs):
if len(vectors) != len(keys):
raise ValueError("vectors length should be equal to doc_ids")
Expand Down Expand Up @@ -87,12 +91,9 @@ def query(self,
doc_ids, offsets, weights, dists, q_idx = self.hbindexer.query(
vectors, clusters, n, top_k * self.n_idx)
for (i, o, w, d, q) in zip(doc_ids, offsets, weights, dists, q_idx):
result[q][(i, o, w / self._weight_norm)] = self.normalize_score(d)

return [sorted(ret.items(), key=lambda x: -x[1])[:top_k] for ret in result]
result[q][(i, o, w / self._weight_norm)] = d

def normalize_score(self, distance: int, *args, **kwargs) -> float:
return 1. - distance / self.n_bytes * 8
return [list(ret.items()) for ret in result]

def __getstate__(self):
self.hbindexer.save(self.data_path)
Expand Down
Loading