-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathIdleAlgorithm.py
51 lines (47 loc) · 2 KB
/
IdleAlgorithm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
from logging import Logger
import asyncio
from connection import ImageHTTPExtractor, ModelPredictionsReceiver, IdleReporter
from confs import configs
import utils
import numpy as np
import time
class IdleAlgorithm:
def __init__(
self,
logger: Logger,
image_extractor: ImageHTTPExtractor,
model_predictor: ModelPredictionsReceiver,
reporter: IdleReporter
) -> None:
self._logger = logger
self._image_extractor = image_extractor
self._model_predictor = model_predictor
self._reporter = reporter
self._min_epoch_time = 2 # replace on config in future
def start(self) -> None:
self.prev_preds = np.array([[]]).astype(np.float32)
while True:
start_epoch_time = time.time()
self._run_one_idle_epoch()
end_epoch_time = time.time()
passed_time = end_epoch_time - start_epoch_time
if passed_time < self._min_epoch_time:
time.sleep(self._min_epoch_time - passed_time)
def _run_one_idle_epoch(self) -> None:
img, start_tracking = self._image_extractor.get_snapshot()
if img is None:
return
self._logger.debug("Sending request for model predictions")
preds = self._model_predictor.predict(img)
self._logger.debug("Model predictiions was recieved")
if preds is None:
return
if preds.size != 0 and not np.any(preds == 1.):
self._logger.info("Telephone is detected")
if utils.bboxes_not_equal(self.prev_preds, preds, configs["threshold"]):
utils.save_cropped_bbox(img, preds[:, :4])
img = utils.put_rectangle(img, preds[:, :4], preds[:, 4])
self._reporter.send_report(self._reporter.create_report(img, str(start_tracking)))
else:
self._logger.debug("Equal bboxes")
self.prev_preds = preds