This repository has been archived by the owner on Feb 22, 2020. It is now read-only.

feat(score_fn): add score_fn as a new module #194

Merged
merged 8 commits on Sep 3, 2019
14 changes: 8 additions & 6 deletions gnes/base/__init__.py
@@ -66,7 +66,8 @@ class TrainableType(type):
'batch_size': None,
'work_dir': os.environ.get('GNES_VOLUME', os.getcwd()),
'name': None,
'on_gpu': False
'on_gpu': False,
'warn_unnamed': True
}

def __new__(cls, *args, **kwargs):
@@ -180,11 +181,12 @@ def _post_init_wrapper(self):
if not getattr(self, 'name', None) and os.environ.get('GNES_WARN_UNNAMED_COMPONENT', '1') == '1':
_id = str(uuid.uuid4()).split('-')[0]
_name = '%s-%s' % (self.__class__.__name__, _id)
self.logger.warning(
'this object is not named ("name" is not found under "gnes_config" in YAML config), '
'i will call it "%s". '
'naming the object is important as it provides an unique identifier when '
'serializing/deserializing this object.' % _name)
if self.warn_unnamed:
self.logger.warning(
'this object is not named ("name" is not found under "gnes_config" in YAML config), '
'i will call it "%s". '
'naming the object is important as it provides an unique identifier when '
'serializing/deserializing this object.' % _name)
setattr(self, 'name', _name)

_before = set(list(self.__dict__.keys()))
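The new 'warn_unnamed' key sits alongside the other gnes_config defaults, so the auto-naming warning can now be silenced per component rather than only process-wide through the GNES_WARN_UNNAMED_COMPONENT environment variable checked above. A minimal sketch of both switches; the YAML fragment is a hypothetical example, assuming 'warn_unnamed' is set under 'gnes_config' like the neighbouring defaults:

import os

# Pre-existing global switch: disable the warning for every component.
os.environ['GNES_WARN_UNNAMED_COMPONENT'] = '0'

# Per-component switch added by this PR (hypothetical YAML):
#
#   !BaseEncoder
#   gnes_config:
#     warn_unnamed: false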
121 changes: 14 additions & 107 deletions gnes/indexer/base.py
@@ -12,33 +12,33 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from typing import List, Any, Union, Callable, Tuple

import numpy as np

from ..base import TrainableBase, CompositionalTrainableBase
from ..proto import gnes_pb2, blob2array
from ..score_fn.base import get_unary_score, ModifierFn


class BaseIndexer(TrainableBase):
def __init__(self,
normalize_fn: 'BaseScoreFn' = ModifierFn(),
score_fn: 'BaseScoreFn' = ModifierFn(), *args, **kwargs):
super().__init__(*args, **kwargs)
self.normalize_fn = normalize_fn
self.score_fn = score_fn

def add(self, keys: Any, docs: Any, weights: List[float], *args, **kwargs):
pass

def query(self, keys: Any, *args, **kwargs) -> List[Any]:
pass

def normalize_score(self, *args, **kwargs):
pass

def query_and_score(self, q_chunks: List[Union['gnes_pb2.Chunk', 'gnes_pb2.Document']], top_k: int) -> List[
'gnes_pb2.Response.QueryResponse.ScoredResult']:
raise NotImplementedError

def score(self, *args, **kwargs) -> 'gnes_pb2.Response.QueryResponse.ScoredResult.Score':
raise NotImplementedError


class BaseChunkIndexer(BaseIndexer):

@@ -59,14 +59,13 @@ def query_and_score(self, q_chunks: List['gnes_pb2.Chunk'], top_k: int, *args, *
r.chunk.doc_id = _doc_id
r.chunk.offset = _offset
r.chunk.weight = _weight
r.score.CopyFrom(self.score(q_chunk, r.chunk, _relevance))
_score = get_unary_score(value=_relevance, name=self.__class__.__name__)
_score = self.normalize_fn(_score)
_score = self.score_fn(_score, q_chunk, r.chunk)
r.score.CopyFrom(_score)
results.append(r)
return results

def score(self, q_chunk: 'gnes_pb2.Chunk', d_chunk: 'gnes_pb2.Chunk',
relevance) -> 'gnes_pb2.Response.QueryResponse.ScoredResult.Score':
return ChunkScorer.eq1(q_chunk, d_chunk, relevance)


class BaseDocIndexer(BaseIndexer):

@@ -84,14 +83,12 @@ def query_and_score(self, docs: List['gnes_pb2.Response.QueryResponse.ScoredResu
for d, r in zip(queried_results, docs):
if d:
r.doc.CopyFrom(d)
r.score.CopyFrom(self.score(d, r.score))
_score = self.normalize_fn(r.score)
_score = self.score_fn(_score, d)
r.score.CopyFrom(_score)
results.append(r)
return results

def score(self, d: 'gnes_pb2.Document', s: 'gnes_pb2.Response.QueryResponse.ScoredResult.Score', *args,
**kwargs) -> 'gnes_pb2.Response.QueryResponse.ScoredResult.Score':
return DocScorer.eq1(d, s)


class BaseKeyIndexer(BaseIndexer):

@@ -102,96 +99,6 @@ def query(self, keys: List[int], *args, **kwargs) -> List[Tuple[int, int, float]
pass


class ChunkScorer:

@staticmethod
def eq1(q_chunk: 'gnes_pb2.Chunk', d_chunk: 'gnes_pb2.Chunk',
relevance):
"""
score = d_chunk.weight * relevance * q_chunk.weight
"""
score = gnes_pb2.Response.QueryResponse.ScoredResult.Score()
score.value = d_chunk.weight * relevance * q_chunk.weight
score.explained = json.dumps({
'name': 'chunk-eq1',
'operand': [{'name': 'd_chunk_weight',
'value': float(d_chunk.weight),
'doc_id': d_chunk.doc_id,
'offset': d_chunk.offset},
{'name': 'q_chunk_weight',
'value': float(q_chunk.weight),
'offset': q_chunk.offset},
{'name': 'relevance',
'value': float(relevance)}],
'op': 'prod',
'value': float(score.value)
})
return score

@staticmethod
def eq2(q_chunk: 'gnes_pb2.Chunk', d_chunk: 'gnes_pb2.Chunk',
relevance):
"""
score = d_chunk.weight * relevance * offset_divergence * q_chunk.weight
offset_divergence is calculated based on doc_type:
TEXT && VIDEO && AUDIO: offset is 1-D
IMAGE: offset is 2-D
"""

def _cal_divergence(q_chunk: 'gnes_pb2.Chunk', d_chunk: 'gnes_pb2.Chunk'):
if q_chunk.offset_nd and d_chunk.offset_nd:
return 1 / (1 + np.sqrt((q_chunk.offset_nd[0] - d_chunk.offset_nd[0]) ** 2 +
(q_chunk.offset_nd[1] - d_chunk.offset_nd[1]) ** 2))
else:
return np.abs(q_chunk.offset - d_chunk.offset)

score = gnes_pb2.Response.QueryResponse.ScoredResult.Score()

divergence = _cal_divergence(q_chunk, d_chunk)
score.value = d_chunk.weight * relevance * divergence * q_chunk.weight
score.explained = json.dumps({
'name': 'chunk-eq2',
'operand': [{'name': 'd_chunk_weight',
'value': float(d_chunk.weight),
'doc_id': d_chunk.doc_id,
'offset': d_chunk.offset},
{'name': 'q_chunk_weight',
'value': float(q_chunk.weight),
'offset': q_chunk.offset},
{'name': 'relevance',
'value': float(relevance)},
{'name': 'offset_divergence',
'value': float(divergence)}],
'op': 'prod',
'value': float(score.value)
})
return score


class DocScorer:

@staticmethod
def eq1(d: 'gnes_pb2.Document',
s: 'gnes_pb2.Response.QueryResponse.ScoredResult.Score') -> 'gnes_pb2.Response.QueryResponse.ScoredResult.Score':
"""
score *= d.weight
:param d:
:param s:
:return:
"""
s.value *= d.weight
s.explained = json.dumps({
'name': 'doc-eq1',
'operand': [json.loads(s.explained),
{'name': 'doc_weight',
'value': float(d.weight),
'doc_id': d.doc_id}],
'op': 'prod',
'value': float(s.value)
})
return s


class JointIndexer(CompositionalTrainableBase):

@property
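The hard-coded ChunkScorer/DocScorer equations removed above give way to two injectable callables: normalize_fn maps a backend's raw relevance onto a comparable range, and score_fn mixes in query/document context such as chunk weights. A minimal sketch of the new flow, using only the calls visible in this diff (the relevance value is illustrative, and ModifierFn is assumed to act as a pass-through when constructed with no arguments):

from gnes.score_fn.base import get_unary_score, ModifierFn

normalize_fn = ModifierFn()  # assumed identity modifier by default
raw = get_unary_score(value=0.75, name='MyChunkIndexer')  # wrap a backend relevance
score = normalize_fn(raw)  # score_fn would additionally take the query and result
                           # chunks as context, as in query_and_score() above
print(score.value)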
14 changes: 0 additions & 14 deletions gnes/indexer/chunk/annoy.py
@@ -65,24 +65,10 @@ def query(self, keys: 'np.ndarray', top_k: int, *args, **kwargs) -> List[List[Tu
res = []
for k in keys:
ret, relevance_score = self._index.get_nns_by_vector(k, top_k, include_distances=True)
relevance_score = self.normalize_score(relevance_score, self.metric)
chunk_info = self._key_info_indexer.query(ret)
res.append([(*r, s) for r, s in zip(chunk_info, relevance_score)])
return res

def normalize_score(self, score: List[float], metrics: str, *args, **kwargs) -> List[float]:
if metrics == 'angular':
return list(map(lambda x: 1 / (1 + x), score))
elif metrics == 'euclidean':
import math
return list(map(lambda x: 1 / (1 + math.sqrt(x) / self.num_dim), score))
elif metrics == 'manhattan':
return list(map(lambda x: 1 / (1 + x / self.num_dim), score))
elif metrics == 'hamming':
return list(map(lambda x: 1 / (1 + x), score))
elif metrics == 'dot':
raise NotImplementedError

@property
def size(self):
return self._index.get_n_items()
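The per-metric normalization deleted here is meant to move into a normalize_fn supplied to the indexer. For reference, a standalone sketch of the removed mapping (num_dim is the indexed vector dimensionality; 'dot' stays unhandled, as before):

import math

def annoy_distance_to_score(x: float, metric: str, num_dim: int) -> float:
    # larger distance -> smaller score, squashed into (0, 1]
    if metric in ('angular', 'hamming'):
        return 1 / (1 + x)
    elif metric == 'euclidean':
        return 1 / (1 + math.sqrt(x) / num_dim)
    elif metric == 'manhattan':
        return 1 / (1 + x / num_dim)
    raise NotImplementedError(metric)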
7 changes: 2 additions & 5 deletions gnes/indexer/chunk/bindexer/__init__.py
@@ -99,7 +99,7 @@ def query(self,
for (i, o, w, d, q) in zip(doc_ids, offsets, weights, dists, q_idx):
if d == 0:
continue
result[q].append((i, o, w / self._weight_norm, self.normalize_score(d)))
result[q].append((i, o, w / self._weight_norm, d))

# get the top-k
for q in range(num_rows):
@@ -108,12 +108,9 @@
doc_ids, offsets, weights, dists, q_idx = self.bindexer.force_search(
keys, num_rows, top_k)
for (i, o, w, d, q) in zip(doc_ids, offsets, weights, dists, q_idx):
result[q].append((i, o, w / self._weight_norm, self.normalize_score(d)))
result[q].append((i, o, w / self._weight_norm, d))
return result

def normalize_score(self, distance: int, *args, **kwargs) -> float:
return 1. - distance / self.num_bytes

def __getstate__(self):
self.bindexer.save(self.data_path)
d = super().__getstate__()
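The deleted normalization mapped the bindexer's Hamming distance onto [0, 1]; callers now receive the raw distance and can normalize through a score_fn. A standalone sketch of the old mapping:

def hamming_to_score(distance: int, num_bytes: int) -> float:
    # distance over num_bytes code bytes -> similarity in [0, 1]
    return 1.0 - distance / num_bytes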
7 changes: 0 additions & 7 deletions gnes/indexer/chunk/faiss.py
@@ -59,7 +59,6 @@ def query(self, keys: np.ndarray, top_k: int, *args, **kwargs) -> List[List[Tupl
raise ValueError("vectors should be ndarray of float32")

score, ids = self._faiss_index.search(keys, top_k)
score = self.normalize_score(score)
ret = []
for _id, _score in zip(ids, score):
ret_i = []
@@ -70,12 +69,6 @@ def query(self, keys: np.ndarray, top_k: int, *args, **kwargs) -> List[List[Tupl

return ret

def normalize_score(self, score: np.ndarray, *args, **kwargs) -> np.ndarray:
if 'HNSW' in self.index_key:
return 1 / (1 + np.sqrt(score) / self.num_dim)
elif 'PQ' or 'Flat' in self.index_key:
return 1 / (1 + np.abs(np.sqrt(score)))

@property
def size(self):
return self._faiss_index.ntotal
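One subtlety in the removed code: the branch "elif 'PQ' or 'Flat' in self.index_key:" is always truthy, because the non-empty string 'PQ' is evaluated on its own. A corrected standalone sketch of the presumed intent, should this normalization be reinstated as a normalize_fn:

import numpy as np

def faiss_distance_to_score(score: np.ndarray, index_key: str, num_dim: int) -> np.ndarray:
    if 'HNSW' in index_key:
        return 1 / (1 + np.sqrt(score) / num_dim)
    elif 'PQ' in index_key or 'Flat' in index_key:  # fixed operator precedence
        return 1 / (1 + np.abs(np.sqrt(score)))
    raise NotImplementedError(index_key)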
7 changes: 2 additions & 5 deletions gnes/indexer/chunk/hbindexer/__init__.py
@@ -87,12 +87,9 @@ def query(self,
doc_ids, offsets, weights, dists, q_idx = self.hbindexer.query(
vectors, clusters, n, top_k * self.n_idx)
for (i, o, w, d, q) in zip(doc_ids, offsets, weights, dists, q_idx):
result[q][(i, o, w / self._weight_norm)] = self.normalize_score(d)
result[q][(i, o, w / self._weight_norm)] = d

return [sorted(ret.items(), key=lambda x: -x[1])[:top_k] for ret in result]

def normalize_score(self, distance: int, *args, **kwargs) -> float:
return 1. - distance / self.n_bytes * 8
return [list(ret.items()) for ret in result]

def __getstate__(self):
self.hbindexer.save(self.data_path)
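The removed expression here has a precedence subtlety as well: "1. - distance / self.n_bytes * 8" parses as 1 - (distance / n_bytes) * 8, whereas normalizing over the total bit count was presumably intended. A sketch of that presumed intent:

def hb_hamming_to_score(distance: int, n_bytes: int) -> float:
    # assumes the intent was distance over n_bytes * 8 bits
    return 1.0 - distance / (n_bytes * 8)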
31 changes: 7 additions & 24 deletions gnes/router/base.py
@@ -12,12 +12,10 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from _operator import add, mul
from collections import defaultdict
from functools import reduce
from typing import List, Generator

from gnes.score_fn.base import ScoreCombinedFn
from ..base import TrainableBase, CompositionalTrainableBase
from ..proto import gnes_pb2, merge_routes

@@ -63,19 +61,11 @@ def apply(self, msg: 'gnes_pb2.Message', accum_msgs: List['gnes_pb2.Message'], *
class BaseTopkReduceRouter(BaseReduceRouter):
def __init__(self, reduce_op: str = 'sum', descending: bool = True, *args, **kwargs):
super().__init__(*args, **kwargs)
if reduce_op not in {'sum', 'prod', 'max', 'min', 'avg'}:
raise ValueError('reduce_op=%s is not acceptable' % reduce_op)
self._reduce_op = reduce_op
self.descending = descending

def post_init(self):
self.reduce_op = {
'prod': lambda v: reduce(mul, v),
'sum': lambda v: reduce(add, v),
'max': lambda v: reduce(max, v),
'min': lambda v: reduce(min, v),
'avg': lambda v: reduce(add, v) / len(v),
}[self._reduce_op]
self.reduce_op = ScoreCombinedFn(score_mode=self._reduce_op)

def get_key(self, x: 'gnes_pb2.Response.QueryResponse.ScoredResult') -> str:
raise NotImplementedError
@@ -86,29 +76,22 @@ def set_key(self, x: 'gnes_pb2.Response.QueryResponse.ScoredResult', k: str) ->
def apply(self, msg: 'gnes_pb2.Message', accum_msgs: List['gnes_pb2.Message'], *args, **kwargs):
# now convert chunk results to doc results
all_scored_results = [sr for m in accum_msgs for sr in m.response.search.topk_results]
score_dict = defaultdict(lambda: {'values': [], 'explains': [], 'reduced_value': 0})
score_dict = defaultdict(list)

# count score by iterating over chunks
for c in all_scored_results:
k = self.get_key(c)
score_dict[k]['values'].append(c.score.value)
score_dict[k]['explains'].append(c.score.explained)
score_dict[k].append(c.score)

for k, v in score_dict.items():
score_dict[k]['reduced_value'] = self.reduce_op(v['values'])
score_dict[k] = self.reduce_op(*v)

msg.response.search.ClearField('topk_results')

# sort and add docs
for k, v in sorted(score_dict.items(), key=lambda kv: kv[1]['reduced_value'] * (-1 if self.descending else 1)):
for k, v in sorted(score_dict.items(), key=lambda kv: kv[1].value, reverse=self.descending):
r = msg.response.search.topk_results.add()
r.score.value = v['reduced_value']
r.score.explained = json.dumps({
'name': 'topk-reduce',
'op': self._reduce_op,
'operand': [json.loads(vv) for vv in v['explains']],
'value': float(r.score.value)
})
r.score.CopyFrom(v)
self.set_key(r, k)

super().apply(msg, accum_msgs)
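With ScoreCombinedFn the reducer returns a full Score proto (value plus explanation) instead of a bare float, which is why the sort key becomes kv[1].value and the hand-rolled JSON assembly disappears. A minimal sketch under the signatures visible in this diff; the values are illustrative, and the score modes mirror the set validated by the removed check ('sum', 'prod', 'max', 'min', 'avg'):

from gnes.score_fn.base import get_unary_score, ScoreCombinedFn

combine = ScoreCombinedFn(score_mode='sum')
s1 = get_unary_score(value=0.5, name='shard-0')
s2 = get_unary_score(value=0.25, name='shard-1')
total = combine(s1, s2)  # invoked as self.reduce_op(*v) in apply() above
print(total.value)       # 0.75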
Empty file added gnes/score_fn/__init__.py