This repository has been archived on 2024-03-03. You can view files and clone it, but cannot push or open issues or pull requests.
unifi-cam-proxy/unifi/cams/frigate.py

155 lines
6.1 KiB
Python
Raw Normal View History

2024-03-02 11:37:04 +00:00
import argparse
import asyncio
import json
import logging
import tempfile
from pathlib import Path
from typing import Any, Optional
import backoff
from asyncio_mqtt import Client
from asyncio_mqtt.error import MqttError
from unifi.cams.base import SmartDetectObjectType
from unifi.cams.rtsp import RTSPCam
class FrigateCam(RTSPCam):
def __init__(self, args: argparse.Namespace, logger: logging.Logger) -> None:
super().__init__(args, logger)
self.args = args
self.event_id: Optional[str] = None
self.event_label: Optional[str] = None
self.event_snapshot_ready = None
@classmethod
def add_parser(cls, parser: argparse.ArgumentParser) -> None:
super().add_parser(parser)
parser.add_argument("--mqtt-host", required=True, help="MQTT server")
parser.add_argument("--mqtt-port", default=1883, type=int, help="MQTT server")
parser.add_argument("--mqtt-username", required=False)
parser.add_argument("--mqtt-password", required=False)
parser.add_argument(
"--mqtt-prefix", default="frigate", type=str, help="Topic prefix"
)
parser.add_argument(
"--frigate-camera",
required=True,
type=str,
help="Name of camera in frigate",
)
async def get_feature_flags(self) -> dict[str, Any]:
return {
**await super().get_feature_flags(),
**{
"mic": True,
"smartDetect": [
"person",
"vehicle",
],
},
}
@classmethod
def label_to_object_type(cls, label: str) -> Optional[SmartDetectObjectType]:
if label == "person":
return SmartDetectObjectType.PERSON
elif label in {"vehicle", "car", "motorcycle", "bus"}:
return SmartDetectObjectType.VEHICLE
async def run(self) -> None:
has_connected = False
@backoff.on_predicate(backoff.expo, max_value=60, logger=self.logger)
async def mqtt_connect():
nonlocal has_connected
try:
async with Client(
self.args.mqtt_host,
port=self.args.mqtt_port,
username=self.args.mqtt_username,
password=self.args.mqtt_password,
) as client:
has_connected = True
self.logger.info(
f"Connected to {self.args.mqtt_host}:{self.args.mqtt_port}"
)
tasks = [
self.handle_detection_events(client),
self.handle_snapshot_events(client),
]
await client.subscribe(f"{self.args.mqtt_prefix}/#")
await asyncio.gather(*tasks)
except MqttError:
if not has_connected:
raise
await mqtt_connect()
async def handle_detection_events(self, client) -> None:
async with client.filtered_messages(
f"{self.args.mqtt_prefix}/events"
) as messages:
async for message in messages:
msg = message.payload.decode()
try:
frigate_msg = json.loads(message.payload.decode())
if not frigate_msg["after"]["camera"] == self.args.frigate_camera:
continue
label = frigate_msg["after"]["label"]
object_type = self.label_to_object_type(label)
if not object_type:
self.logger.warning(
f"Received unsupported detection label type: {label}"
)
if not self.event_id and frigate_msg["type"] == "new":
self.event_id = frigate_msg["after"]["id"]
self.event_label = label
self.event_snapshot_ready = asyncio.Event()
self.logger.info(
f"Starting {self.event_label} motion event"
f" (id: {self.event_id})"
)
await self.trigger_motion_start(object_type)
elif (
self.event_id == frigate_msg["after"]["id"]
and frigate_msg["type"] == "end"
):
# Wait for the best snapshot to be ready before
# ending the motion event
self.logger.info(f"Awaiting snapshot (id: {self.event_id})")
await self.event_snapshot_ready.wait()
self.logger.info(
f"Ending {self.event_label} motion event"
f" (id: {self.event_id})"
)
await self.trigger_motion_stop()
self.event_id = None
self.event_label = None
except json.JSONDecodeError:
self.logger.exception(f"Could not decode payload: {msg}")
async def handle_snapshot_events(self, client) -> None:
topic_fmt = f"{self.args.mqtt_prefix}/{self.args.frigate_camera}/{{}}/snapshot"
async with client.filtered_messages(topic_fmt.format("+")) as messages:
async for message in messages:
if (
self.event_id
and not message.retain
and message.topic == topic_fmt.format(self.event_label)
):
f = tempfile.NamedTemporaryFile()
f.write(message.payload)
self.logger.debug(
f"Updating snapshot for {self.event_label} with {f.name}"
)
self.update_motion_snapshot(Path(f.name))
self.event_snapshot_ready.set()
else:
self.logger.debug(
f"Discarding snapshot message ({len(message.payload)})"
)