Skip to content

Commit

Permalink
Refactor events to be more generic (#6320)
Browse files Browse the repository at this point in the history
* Organize event table to be more generalized

* Add appropriate fields to data

* Move tracked object logic to own function

* Add source type to event queue

* rename enum

* Fix types that are used in webUI

* remove redundant

* Formatting

* fix typing

* Rename enum
  • Loading branch information
NickM-27 authored Apr 30, 2023
1 parent ca7790f commit ad52e23
Show file tree
Hide file tree
Showing 8 changed files with 221 additions and 120 deletions.
199 changes: 110 additions & 89 deletions frigate/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@
import os
import queue
import threading

from enum import Enum
from pathlib import Path

from peewee import fn

from frigate.config import EventsConfig, FrigateConfig
from frigate.const import CLIPS_DIR
from frigate.models import Event
from frigate.timeline import TimelineSourceEnum
from frigate.types import CameraMetricsTypes
from frigate.util import to_relative_box

Expand All @@ -21,6 +22,12 @@
logger = logging.getLogger(__name__)


class EventTypeEnum(str, Enum):
# api = "api"
# audio = "audio"
tracked_object = "tracked_object"


def should_update_db(prev_event: Event, current_event: Event) -> bool:
"""If current_event has updated fields and (clip or snapshot)."""
if current_event["has_clip"] or current_event["has_snapshot"]:
Expand Down Expand Up @@ -66,7 +73,9 @@ def run(self) -> None:

while not self.stop_event.is_set():
try:
event_type, camera, event_data = self.event_queue.get(timeout=1)
source_type, event_type, camera, event_data = self.event_queue.get(
timeout=1
)
except queue.Empty:
continue

Expand All @@ -75,107 +84,119 @@ def run(self) -> None:
self.timeline_queue.put(
(
camera,
TimelineSourceEnum.tracked_object,
source_type,
event_type,
self.events_in_process.get(event_data["id"]),
event_data,
)
)

# if this is the first message, just store it and continue, its not time to insert it in the db
if event_type == "start":
self.events_in_process[event_data["id"]] = event_data
continue

if should_update_db(self.events_in_process[event_data["id"]], event_data):
camera_config = self.config.cameras[camera]
event_config: EventsConfig = camera_config.record.events
width = camera_config.detect.width
height = camera_config.detect.height
first_detector = list(self.config.detectors.values())[0]

start_time = event_data["start_time"] - event_config.pre_capture
end_time = (
None
if event_data["end_time"] is None
else event_data["end_time"] + event_config.post_capture
)
# score of the snapshot
score = (
None
if event_data["snapshot"] is None
else event_data["snapshot"]["score"]
)
# detection region in the snapshot
region = (
None
if event_data["snapshot"] is None
else to_relative_box(
width,
height,
event_data["snapshot"]["region"],
)
)
# bounding box for the snapshot
box = (
None
if event_data["snapshot"] is None
else to_relative_box(
width,
height,
event_data["snapshot"]["box"],
)
)

# keep these from being set back to false because the event
# may have started while recordings and snapshots were enabled
# this would be an issue for long running events
if self.events_in_process[event_data["id"]]["has_clip"]:
event_data["has_clip"] = True
if self.events_in_process[event_data["id"]]["has_snapshot"]:
event_data["has_snapshot"] = True

event = {
Event.id: event_data["id"],
Event.label: event_data["label"],
Event.camera: camera,
Event.start_time: start_time,
Event.end_time: end_time,
Event.top_score: event_data["top_score"],
Event.score: score,
Event.zones: list(event_data["entered_zones"]),
Event.thumbnail: event_data["thumbnail"],
Event.region: region,
Event.box: box,
Event.has_clip: event_data["has_clip"],
Event.has_snapshot: event_data["has_snapshot"],
Event.model_hash: first_detector.model.model_hash,
Event.model_type: first_detector.model.model_type,
Event.detector_type: first_detector.type,
}
if source_type == EventTypeEnum.tracked_object:
if event_type == "start":
self.events_in_process[event_data["id"]] = event_data
continue

(
Event.insert(event)
.on_conflict(
conflict_target=[Event.id],
update=event,
)
.execute()
)

# update the stored copy for comparison on future update messages
self.events_in_process[event_data["id"]] = event_data

if event_type == "end":
del self.events_in_process[event_data["id"]]
self.event_processed_queue.put((event_data["id"], camera))
self.handle_object_detection(event_type, camera, event_data)

# set an end_time on events without an end_time before exiting
Event.update(end_time=datetime.datetime.now().timestamp()).where(
Event.end_time == None
).execute()
logger.info(f"Exiting event processor...")

def handle_object_detection(
self,
event_type: str,
camera: str,
event_data: Event,
) -> None:
"""handle tracked object event updates."""
# if this is the first message, just store it and continue, its not time to insert it in the db
if should_update_db(self.events_in_process[event_data["id"]], event_data):
camera_config = self.config.cameras[camera]
event_config: EventsConfig = camera_config.record.events
width = camera_config.detect.width
height = camera_config.detect.height
first_detector = list(self.config.detectors.values())[0]

start_time = event_data["start_time"] - event_config.pre_capture
end_time = (
None
if event_data["end_time"] is None
else event_data["end_time"] + event_config.post_capture
)
# score of the snapshot
score = (
None
if event_data["snapshot"] is None
else event_data["snapshot"]["score"]
)
# detection region in the snapshot
region = (
None
if event_data["snapshot"] is None
else to_relative_box(
width,
height,
event_data["snapshot"]["region"],
)
)
# bounding box for the snapshot
box = (
None
if event_data["snapshot"] is None
else to_relative_box(
width,
height,
event_data["snapshot"]["box"],
)
)

# keep these from being set back to false because the event
# may have started while recordings and snapshots were enabled
# this would be an issue for long running events
if self.events_in_process[event_data["id"]]["has_clip"]:
event_data["has_clip"] = True
if self.events_in_process[event_data["id"]]["has_snapshot"]:
event_data["has_snapshot"] = True

event = {
Event.id: event_data["id"],
Event.label: event_data["label"],
Event.camera: camera,
Event.start_time: start_time,
Event.end_time: end_time,
Event.zones: list(event_data["entered_zones"]),
Event.thumbnail: event_data["thumbnail"],
Event.has_clip: event_data["has_clip"],
Event.has_snapshot: event_data["has_snapshot"],
Event.model_hash: first_detector.model.model_hash,
Event.model_type: first_detector.model.model_type,
Event.detector_type: first_detector.type,
Event.data: {
"box": box,
"region": region,
"score": score,
"top_score": event_data["top_score"],
},
}

(
Event.insert(event)
.on_conflict(
conflict_target=[Event.id],
update=event,
)
.execute()
)

# update the stored copy for comparison on future update messages
self.events_in_process[event_data["id"]] = event_data

if event_type == "end":
del self.events_in_process[event_data["id"]]
self.event_processed_queue.put((event_data["id"], camera))


class EventCleanup(threading.Thread):
def __init__(self, config: FrigateConfig, stop_event: MpEvent):
Expand Down
20 changes: 12 additions & 8 deletions frigate/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
restart_frigate,
vainfo_hwaccel,
get_tz_modifiers,
to_relative_box,
)
from frigate.storage import StorageMaintainer
from frigate.version import VERSION
Expand Down Expand Up @@ -196,7 +195,7 @@ def send_to_plus(id):
return make_response(jsonify({"success": False, "message": message}), 404)

# events from before the conversion to relative dimensions cant include annotations
if any(d > 1 for d in event.box):
if any(d > 1 for d in event.data["box"]):
include_annotation = None

if event.end_time is None:
Expand Down Expand Up @@ -252,8 +251,8 @@ def send_to_plus(id):
event.save()

if not include_annotation is None:
region = event.region
box = event.box
region = event.data["region"]
box = event.data["box"]

try:
current_app.plus_api.add_annotation(
Expand Down Expand Up @@ -294,7 +293,7 @@ def false_positive(id):
return make_response(jsonify({"success": False, "message": message}), 404)

# events from before the conversion to relative dimensions cant include annotations
if any(d > 1 for d in event.box):
if any(d > 1 for d in event.data["box"]):
message = f"Events prior to 0.13 cannot be submitted as false positives"
logger.error(message)
return make_response(jsonify({"success": False, "message": message}), 400)
Expand All @@ -311,11 +310,15 @@ def false_positive(id):
# need to refetch the event now that it has a plus_id
event = Event.get(Event.id == id)

region = event.region
box = event.box
region = event.data["region"]
box = event.data["box"]

# provide top score if score is unavailable
score = event.top_score if event.score is None else event.score
score = (
(event.data["top_score"] if event.data["top_score"] else event.top_score)
if event.data["score"] is None
else event.data["score"]
)

try:
current_app.plus_api.add_false_positive(
Expand Down Expand Up @@ -756,6 +759,7 @@ def events():
Event.top_score,
Event.false_positive,
Event.box,
Event.data,
]

if camera != "all":
Expand Down
21 changes: 16 additions & 5 deletions frigate/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,33 @@ class Event(Model): # type: ignore[misc]
camera = CharField(index=True, max_length=20)
start_time = DateTimeField()
end_time = DateTimeField()
top_score = FloatField()
score = FloatField()
top_score = (
FloatField()
) # TODO remove when columns can be dropped without rebuilding table
score = (
FloatField()
) # TODO remove when columns can be dropped without rebuilding table
false_positive = BooleanField()
zones = JSONField()
thumbnail = TextField()
has_clip = BooleanField(default=True)
has_snapshot = BooleanField(default=True)
region = JSONField()
box = JSONField()
area = IntegerField()
region = (
JSONField()
) # TODO remove when columns can be dropped without rebuilding table
box = (
JSONField()
) # TODO remove when columns can be dropped without rebuilding table
area = (
IntegerField()
) # TODO remove when columns can be dropped without rebuilding table
retain_indefinitely = BooleanField(default=False)
ratio = FloatField(default=1.0)
plus_id = CharField(max_length=30)
model_hash = CharField(max_length=32)
detector_type = CharField(max_length=32)
model_type = CharField(max_length=32)
data = JSONField() # ex: tracked object box, region, etc.


class Timeline(Model): # type: ignore[misc]
Expand Down
21 changes: 18 additions & 3 deletions frigate/object_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
FrigateConfig,
)
from frigate.const import CLIPS_DIR
from frigate.events import EventTypeEnum
from frigate.util import (
SharedMemoryFrameManager,
calculate_region,
Expand Down Expand Up @@ -656,7 +657,9 @@ def __init__(
self.last_motion_detected: dict[str, float] = {}

def start(camera, obj: TrackedObject, current_frame_time):
self.event_queue.put(("start", camera, obj.to_dict()))
self.event_queue.put(
(EventTypeEnum.tracked_object, "start", camera, obj.to_dict())
)

def update(camera, obj: TrackedObject, current_frame_time):
obj.has_snapshot = self.should_save_snapshot(camera, obj)
Expand All @@ -670,7 +673,12 @@ def update(camera, obj: TrackedObject, current_frame_time):
self.dispatcher.publish("events", json.dumps(message), retain=False)
obj.previous = after
self.event_queue.put(
("update", camera, obj.to_dict(include_thumbnail=True))
(
EventTypeEnum.tracked_object,
"update",
camera,
obj.to_dict(include_thumbnail=True),
)
)

def end(camera, obj: TrackedObject, current_frame_time):
Expand Down Expand Up @@ -722,7 +730,14 @@ def end(camera, obj: TrackedObject, current_frame_time):
}
self.dispatcher.publish("events", json.dumps(message), retain=False)

self.event_queue.put(("end", camera, obj.to_dict(include_thumbnail=True)))
self.event_queue.put(
(
EventTypeEnum.tracked_object,
"end",
camera,
obj.to_dict(include_thumbnail=True),
)
)

def snapshot(camera, obj: TrackedObject, current_frame_time):
mqtt_config: MqttConfig = self.config.cameras[camera].mqtt
Expand Down
Loading

0 comments on commit ad52e23

Please sign in to comment.