This repository has been archived by the owner on Feb 22, 2020. It is now read-only.

Merge pull request #194 from gnes-ai/feat-score-fn
feat(score_fn): add score_fn as a new module
Han Xiao authored Sep 3, 2019
2 parents 9d7bfd0 + f406f8f commit 93a43f5
Showing 16 changed files with 372 additions and 174 deletions.
14 changes: 8 additions & 6 deletions gnes/base/__init__.py
@@ -66,7 +66,8 @@ class TrainableType(type):
'batch_size': None,
'work_dir': os.environ.get('GNES_VOLUME', os.getcwd()),
'name': None,
'on_gpu': False
'on_gpu': False,
'warn_unnamed': True
}

def __new__(cls, *args, **kwargs):
@@ -180,11 +181,12 @@ def _post_init_wrapper(self):
if not getattr(self, 'name', None) and os.environ.get('GNES_WARN_UNNAMED_COMPONENT', '1') == '1':
_id = str(uuid.uuid4()).split('-')[0]
_name = '%s-%s' % (self.__class__.__name__, _id)
self.logger.warning(
'this object is not named ("name" is not found under "gnes_config" in YAML config), '
'i will call it "%s". '
'naming the object is important as it provides an unique identifier when '
'serializing/deserializing this object.' % _name)
if self.warn_unnamed:
self.logger.warning(
'this object is not named ("name" is not found under "gnes_config" in YAML config), '
'i will call it "%s". '
'naming the object is important as it provides an unique identifier when '
'serializing/deserializing this object.' % _name)
setattr(self, 'name', _name)

_before = set(list(self.__dict__.keys()))
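This hunk gives every component a `warn_unnamed` switch in `gnes_config` (default `True`), so the unnamed-component warning can be silenced per component instead of only globally via the `GNES_WARN_UNNAMED_COMPONENT` environment variable; an auto-generated name is still assigned either way. A minimal sketch of the resulting guard, reduced to plain Python for illustration (the `maybe_warn_unnamed` helper is hypothetical, not part of the codebase):

```python
import os
import uuid


def maybe_warn_unnamed(obj) -> None:
    # hypothetical standalone rendering of the guard added above
    if not getattr(obj, 'name', None) \
            and os.environ.get('GNES_WARN_UNNAMED_COMPONENT', '1') == '1':
        _id = str(uuid.uuid4()).split('-')[0]
        _name = '%s-%s' % (obj.__class__.__name__, _id)
        # new: the per-component flag gates only the warning;
        # the auto-generated name is assigned regardless
        if getattr(obj, 'warn_unnamed', True):
            print('unnamed component, calling it %r' % _name)  # logger.warning in the real code
        obj.name = _name
```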
121 changes: 14 additions & 107 deletions gnes/indexer/base.py
@@ -12,33 +12,33 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from typing import List, Any, Union, Callable, Tuple

import numpy as np

from ..base import TrainableBase, CompositionalTrainableBase
from ..proto import gnes_pb2, blob2array
from ..score_fn.base import get_unary_score, ModifierFn


class BaseIndexer(TrainableBase):
def __init__(self,
normalize_fn: 'BaseScoreFn' = ModifierFn(),
score_fn: 'BaseScoreFn' = ModifierFn(), *args, **kwargs):
super().__init__(*args, **kwargs)
self.normalize_fn = normalize_fn
self.score_fn = score_fn

def add(self, keys: Any, docs: Any, weights: List[float], *args, **kwargs):
pass

def query(self, keys: Any, *args, **kwargs) -> List[Any]:
pass

def normalize_score(self, *args, **kwargs):
pass

def query_and_score(self, q_chunks: List[Union['gnes_pb2.Chunk', 'gnes_pb2.Document']], top_k: int) -> List[
'gnes_pb2.Response.QueryResponse.ScoredResult']:
raise NotImplementedError

def score(self, *args, **kwargs) -> 'gnes_pb2.Response.QueryResponse.ScoredResult.Score':
raise NotImplementedError


class BaseChunkIndexer(BaseIndexer):

@@ -59,14 +59,13 @@ def query_and_score(self, q_chunks: List['gnes_pb2.Chunk'], top_k: int, *args, *
r.chunk.doc_id = _doc_id
r.chunk.offset = _offset
r.chunk.weight = _weight
r.score.CopyFrom(self.score(q_chunk, r.chunk, _relevance))
_score = get_unary_score(value=_relevance, name=self.__class__.__name__)
_score = self.normalize_fn(_score)
_score = self.score_fn(_score, q_chunk, r.chunk)
r.score.CopyFrom(_score)
results.append(r)
return results

def score(self, q_chunk: 'gnes_pb2.Chunk', d_chunk: 'gnes_pb2.Chunk',
relevance) -> 'gnes_pb2.Response.QueryResponse.ScoredResult.Score':
return ChunkScorer.eq1(q_chunk, d_chunk, relevance)


class BaseDocIndexer(BaseIndexer):

@@ -84,14 +83,12 @@ def query_and_score(self, docs: List['gnes_pb2.Response.QueryResponse.ScoredResu
for d, r in zip(queried_results, docs):
if d:
r.doc.CopyFrom(d)
r.score.CopyFrom(self.score(d, r.score))
_score = self.normalize_fn(r.score)
_score = self.score_fn(_score, d)
r.score.CopyFrom(_score)
results.append(r)
return results

def score(self, d: 'gnes_pb2.Document', s: 'gnes_pb2.Response.QueryResponse.ScoredResult.Score', *args,
**kwargs) -> 'gnes_pb2.Response.QueryResponse.ScoredResult.Score':
return DocScorer.eq1(d, s)


class BaseKeyIndexer(BaseIndexer):

@@ -102,96 +99,6 @@ def query(self, keys: List[int], *args, **kwargs) -> List[Tuple[int, int, float]
pass


class ChunkScorer:

@staticmethod
def eq1(q_chunk: 'gnes_pb2.Chunk', d_chunk: 'gnes_pb2.Chunk',
relevance):
"""
score = d_chunk.weight * relevance * q_chunk.weight
"""
score = gnes_pb2.Response.QueryResponse.ScoredResult.Score()
score.value = d_chunk.weight * relevance * q_chunk.weight
score.explained = json.dumps({
'name': 'chunk-eq1',
'operand': [{'name': 'd_chunk_weight',
'value': float(d_chunk.weight),
'doc_id': d_chunk.doc_id,
'offset': d_chunk.offset},
{'name': 'q_chunk_weight',
'value': float(q_chunk.weight),
'offset': q_chunk.offset},
{'name': 'relevance',
'value': float(relevance)}],
'op': 'prod',
'value': float(score.value)
})
return score

@staticmethod
def eq2(q_chunk: 'gnes_pb2.Chunk', d_chunk: 'gnes_pb2.Chunk',
relevance):
"""
score = d_chunk.weight * relevance * offset_divergence * q_chunk.weight
offset_divergence is calculated based on doc_type:
TEXT && VIDEO && AUDIO: offset is 1-D
IMAGE: offset is 2-D
"""

def _cal_divergence(q_chunk: 'gnes_pb2.Chunk', d_chunk: 'gnes_pb2.Chunk'):
if q_chunk.offset_nd and d_chunk.offset_nd:
return 1 / (1 + np.sqrt((q_chunk.offset_nd[0] - d_chunk.offset_nd[0]) ** 2 +
(q_chunk.offset_nd[1] - d_chunk.offset_nd[1]) ** 2))
else:
return np.abs(q_chunk.offset - d_chunk.offset)

score = gnes_pb2.Response.QueryResponse.ScoredResult.Score()

divergence = _cal_divergence(q_chunk, d_chunk)
score.value = d_chunk.weight * relevance * divergence * q_chunk.weight
score.explained = json.dumps({
'name': 'chunk-eq2',
'operand': [{'name': 'd_chunk_weight',
'value': float(d_chunk.weight),
'doc_id': d_chunk.doc_id,
'offset': d_chunk.offset},
{'name': 'q_chunk_weight',
'value': float(q_chunk.weight),
'offset': q_chunk.offset},
{'name': 'relevance',
'value': float(relevance)},
{'name': 'offset_divergence',
'value': float(divergence)}],
'op': 'prod',
'value': float(score.value)
})
return score


class DocScorer:

@staticmethod
def eq1(d: 'gnes_pb2.Document',
s: 'gnes_pb2.Response.QueryResponse.ScoredResult.Score') -> 'gnes_pb2.Response.QueryResponse.ScoredResult.Score':
"""
score *= d.weight
:param d:
:param s:
:return:
"""
s.value *= d.weight
s.explained = json.dumps({
'name': 'doc-eq1',
'operand': [json.loads(s.explained),
{'name': 'doc_weight',
'value': float(d.weight),
'doc_id': d.doc_id}],
'op': 'prod',
'value': float(s.value)
})
return s


class JointIndexer(CompositionalTrainableBase):

@property
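The fixed `ChunkScorer`/`DocScorer` equations deleted above are replaced by two pluggable score functions composed at query time. A minimal sketch of the new flow, assuming only what this diff shows (`get_unary_score(value=..., name=...)` wraps a raw relevance into a `Score`, `ModifierFn()` is an effectively pass-through default, and score functions accept extra context arguments); the literal `0.83` and the indexer name are made up:

```python
from gnes.proto import gnes_pb2
from gnes.score_fn.base import get_unary_score, ModifierFn

normalize_fn = ModifierFn()  # e.g. map a raw distance into a (0, 1] similarity
score_fn = ModifierFn()      # e.g. fold chunk/doc weights into the score

# empty context objects, just to show the call shape
q_chunk = gnes_pb2.Chunk()
d_chunk = gnes_pb2.Chunk()

raw = get_unary_score(value=0.83, name='MyChunkIndexer')
score = normalize_fn(raw)                  # step 1: normalize the raw relevance
score = score_fn(score, q_chunk, d_chunk)  # step 2: context-aware re-weighting
# BaseChunkIndexer.query_and_score then does r.score.CopyFrom(score)
```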
14 changes: 0 additions & 14 deletions gnes/indexer/chunk/annoy.py
@@ -65,24 +65,10 @@ def query(self, keys: 'np.ndarray', top_k: int, *args, **kwargs) -> List[List[Tu
res = []
for k in keys:
ret, relevance_score = self._index.get_nns_by_vector(k, top_k, include_distances=True)
relevance_score = self.normalize_score(relevance_score, self.metric)
chunk_info = self._key_info_indexer.query(ret)
res.append([(*r, s) for r, s in zip(chunk_info, relevance_score)])
return res

def normalize_score(self, score: List[float], metrics: str, *args, **kwargs) -> List[float]:
if metrics == 'angular':
return list(map(lambda x: 1 / (1 + x), score))
elif metrics == 'euclidean':
import math
return list(map(lambda x: 1 / (1 + math.sqrt(x) / self.num_dim), score))
elif metrics == 'manhattan':
return list(map(lambda x: 1 / (1 + x / self.num_dim), score))
elif metrics == 'hamming':
return list(map(lambda x: 1 / (1 + x), score))
elif metrics == 'dot':
raise NotImplementedError

@property
def size(self):
return self._index.get_n_items()
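The metric-specific distance-to-similarity mapping is removed from the Annoy indexer: `query` now returns raw distances, and normalization is expected to happen in a pluggable `normalize_fn` (assumed from this PR's direction; the new wiring is not in this hunk). For reference, the removed mappings as a standalone function:

```python
import math
from typing import List


def annoy_distance_to_similarity(dists: List[float], metric: str, num_dim: int) -> List[float]:
    # the exact mappings the removed normalize_score() applied
    if metric in ('angular', 'hamming'):
        return [1 / (1 + x) for x in dists]
    elif metric == 'euclidean':
        return [1 / (1 + math.sqrt(x) / num_dim) for x in dists]
    elif metric == 'manhattan':
        return [1 / (1 + x / num_dim) for x in dists]
    raise NotImplementedError(metric)  # 'dot' was unimplemented in the original
```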
7 changes: 2 additions & 5 deletions gnes/indexer/chunk/bindexer/__init__.py
@@ -99,7 +99,7 @@ def query(self,
for (i, o, w, d, q) in zip(doc_ids, offsets, weights, dists, q_idx):
if d == 0:
continue
result[q].append((i, o, w / self._weight_norm, self.normalize_score(d)))
result[q].append((i, o, w / self._weight_norm, d))

# get the top-k
for q in range(num_rows):
@@ -108,12 +108,9 @@
doc_ids, offsets, weights, dists, q_idx = self.bindexer.force_search(
keys, num_rows, top_k)
for (i, o, w, d, q) in zip(doc_ids, offsets, weights, dists, q_idx):
result[q].append((i, o, w / self._weight_norm, self.normalize_score(d)))
result[q].append((i, o, w / self._weight_norm, d))
return result

def normalize_score(self, distance: int, *args, **kwargs) -> float:
return 1. - distance / self.num_bytes

def __getstate__(self):
self.bindexer.save(self.data_path)
d = super().__getstate__()
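Same pattern as the Annoy change above: the inline hamming normalization (`1. - distance / self.num_bytes`) is dropped and `query` returns raw distances; converting them back into a similarity is left to the downstream score functions (an assumption based on this PR's theme, since the replacement is not shown in this hunk).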
7 changes: 0 additions & 7 deletions gnes/indexer/chunk/faiss.py
@@ -59,7 +59,6 @@ def query(self, keys: np.ndarray, top_k: int, *args, **kwargs) -> List[List[Tupl
raise ValueError("vectors should be ndarray of float32")

score, ids = self._faiss_index.search(keys, top_k)
score = self.normalize_score(score)
ret = []
for _id, _score in zip(ids, score):
ret_i = []
@@ -70,12 +69,6 @@ def query(self, keys: np.ndarray, top_k: int, *args, **kwargs) -> List[List[Tupl

return ret

def normalize_score(self, score: np.ndarray, *args, **kwargs) -> np.ndarray:
if 'HNSW' in self.index_key:
return 1 / (1 + np.sqrt(score) / self.num_dim)
elif 'PQ' or 'Flat' in self.index_key:
return 1 / (1 + np.abs(np.sqrt(score)))

@property
def size(self):
return self._faiss_index.ntotal
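Worth noting as it goes away: the removed branch `elif 'PQ' or 'Flat' in self.index_key:` was always true in Python (`'PQ'` is a non-empty string and `or` binds looser than `in`), so every non-HNSW index took that path. A corrected standalone rendering of what the mapping presumably intended (hypothetical helper, not from the codebase):

```python
import numpy as np


def faiss_score_to_similarity(score: np.ndarray, index_key: str, num_dim: int) -> np.ndarray:
    if 'HNSW' in index_key:
        return 1 / (1 + np.sqrt(score) / num_dim)
    elif 'PQ' in index_key or 'Flat' in index_key:  # fixed operator precedence
        return 1 / (1 + np.abs(np.sqrt(score)))
    return score  # other index types: assumed pass-through
```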
7 changes: 2 additions & 5 deletions gnes/indexer/chunk/hbindexer/__init__.py
@@ -87,12 +87,9 @@ def query(self,
doc_ids, offsets, weights, dists, q_idx = self.hbindexer.query(
vectors, clusters, n, top_k * self.n_idx)
for (i, o, w, d, q) in zip(doc_ids, offsets, weights, dists, q_idx):
result[q][(i, o, w / self._weight_norm)] = self.normalize_score(d)
result[q][(i, o, w / self._weight_norm)] = d

return [sorted(ret.items(), key=lambda x: -x[1])[:top_k] for ret in result]

def normalize_score(self, distance: int, *args, **kwargs) -> float:
return 1. - distance / self.n_bytes * 8
return [list(ret.items()) for ret in result]

def __getstate__(self):
self.hbindexer.save(self.data_path)
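Two behavior changes land in this hunk: raw distances are stored instead of `1. - d / self.n_bytes * 8` (which, under Python precedence, computed `(d / n_bytes) * 8` rather than the arguably intended `d / (n_bytes * 8)`), and per-query results are now returned unsorted and without the `[:top_k]` slice, so ranking and truncation presumably move downstream alongside score normalization (an assumption; the new consumer is not part of this diff).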
31 changes: 7 additions & 24 deletions gnes/router/base.py
@@ -12,12 +12,10 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from _operator import add, mul
from collections import defaultdict
from functools import reduce
from typing import List, Generator

from gnes.score_fn.base import ScoreCombinedFn
from ..base import TrainableBase, CompositionalTrainableBase
from ..proto import gnes_pb2, merge_routes

@@ -63,19 +61,11 @@ def apply(self, msg: 'gnes_pb2.Message', accum_msgs: List['gnes_pb2.Message'], *
class BaseTopkReduceRouter(BaseReduceRouter):
def __init__(self, reduce_op: str = 'sum', descending: bool = True, *args, **kwargs):
super().__init__(*args, **kwargs)
if reduce_op not in {'sum', 'prod', 'max', 'min', 'avg'}:
raise ValueError('reduce_op=%s is not acceptable' % reduce_op)
self._reduce_op = reduce_op
self.descending = descending

def post_init(self):
self.reduce_op = {
'prod': lambda v: reduce(mul, v),
'sum': lambda v: reduce(add, v),
'max': lambda v: reduce(max, v),
'min': lambda v: reduce(min, v),
'avg': lambda v: reduce(add, v) / len(v),
}[self._reduce_op]
self.reduce_op = ScoreCombinedFn(score_mode=self._reduce_op)

def get_key(self, x: 'gnes_pb2.Response.QueryResponse.ScoredResult') -> str:
raise NotImplementedError
@@ -86,29 +76,22 @@ def set_key(self, x: 'gnes_pb2.Response.QueryResponse.ScoredResult', k: str) ->
def apply(self, msg: 'gnes_pb2.Message', accum_msgs: List['gnes_pb2.Message'], *args, **kwargs):
# now convert chunk results to doc results
all_scored_results = [sr for m in accum_msgs for sr in m.response.search.topk_results]
score_dict = defaultdict(lambda: {'values': [], 'explains': [], 'reduced_value': 0})
score_dict = defaultdict(list)

# count score by iterating over chunks
for c in all_scored_results:
k = self.get_key(c)
score_dict[k]['values'].append(c.score.value)
score_dict[k]['explains'].append(c.score.explained)
score_dict[k].append(c.score)

for k, v in score_dict.items():
score_dict[k]['reduced_value'] = self.reduce_op(v['values'])
score_dict[k] = self.reduce_op(*v)

msg.response.search.ClearField('topk_results')

# sort and add docs
for k, v in sorted(score_dict.items(), key=lambda kv: kv[1]['reduced_value'] * (-1 if self.descending else 1)):
for k, v in sorted(score_dict.items(), key=lambda kv: kv[1].value, reverse=self.descending):
r = msg.response.search.topk_results.add()
r.score.value = v['reduced_value']
r.score.explained = json.dumps({
'name': 'topk-reduce',
'op': self._reduce_op,
'operand': [json.loads(vv) for vv in v['explains']],
'value': float(r.score.value)
})
r.score.CopyFrom(v)
self.set_key(r, k)

super().apply(msg, accum_msgs)
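Here the hand-rolled reduce table (`prod`/`sum`/`max`/`min`/`avg` via `functools.reduce`) and the manual JSON assembly of `score.explained` are both replaced by a single `ScoreCombinedFn`. A minimal sketch of the new combine step, using only what the diff shows (`ScoreCombinedFn(score_mode=...)` called with the collected scores as positional arguments; the mode names are assumed to match the old set, and the concrete values are illustrative):

```python
from gnes.score_fn.base import ScoreCombinedFn, get_unary_score

combine = ScoreCombinedFn(score_mode='sum')

# per-shard scores accumulated for the same doc key
s1 = get_unary_score(value=0.4, name='shard-0')
s2 = get_unary_score(value=0.3, name='shard-1')

merged = combine(s1, s2)  # mirrors `self.reduce_op(*v)` in apply() above
print(merged.value)       # expected: 0.7, with the combination recorded in merged.explained
```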
Empty file added gnes/score_fn/__init__.py
[diff truncated: the remaining 8 of the 16 changed files are not shown]
