155 lines
6.1 KiB
Python
155 lines
6.1 KiB
Python
import argparse
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import tempfile
|
|
from pathlib import Path
|
|
from typing import Any, Optional
|
|
|
|
import backoff
|
|
from asyncio_mqtt import Client
|
|
from asyncio_mqtt.error import MqttError
|
|
|
|
from unifi.cams.base import SmartDetectObjectType
|
|
from unifi.cams.rtsp import RTSPCam
|
|
|
|
|
|
class FrigateCam(RTSPCam):
|
|
def __init__(self, args: argparse.Namespace, logger: logging.Logger) -> None:
|
|
super().__init__(args, logger)
|
|
self.args = args
|
|
self.event_id: Optional[str] = None
|
|
self.event_label: Optional[str] = None
|
|
self.event_snapshot_ready = None
|
|
|
|
@classmethod
|
|
def add_parser(cls, parser: argparse.ArgumentParser) -> None:
|
|
super().add_parser(parser)
|
|
parser.add_argument("--mqtt-host", required=True, help="MQTT server")
|
|
parser.add_argument("--mqtt-port", default=1883, type=int, help="MQTT server")
|
|
parser.add_argument("--mqtt-username", required=False)
|
|
parser.add_argument("--mqtt-password", required=False)
|
|
parser.add_argument(
|
|
"--mqtt-prefix", default="frigate", type=str, help="Topic prefix"
|
|
)
|
|
parser.add_argument(
|
|
"--frigate-camera",
|
|
required=True,
|
|
type=str,
|
|
help="Name of camera in frigate",
|
|
)
|
|
|
|
async def get_feature_flags(self) -> dict[str, Any]:
|
|
return {
|
|
**await super().get_feature_flags(),
|
|
**{
|
|
"mic": True,
|
|
"smartDetect": [
|
|
"person",
|
|
"vehicle",
|
|
],
|
|
},
|
|
}
|
|
|
|
@classmethod
|
|
def label_to_object_type(cls, label: str) -> Optional[SmartDetectObjectType]:
|
|
if label == "person":
|
|
return SmartDetectObjectType.PERSON
|
|
elif label in {"vehicle", "car", "motorcycle", "bus"}:
|
|
return SmartDetectObjectType.VEHICLE
|
|
|
|
async def run(self) -> None:
|
|
has_connected = False
|
|
|
|
@backoff.on_predicate(backoff.expo, max_value=60, logger=self.logger)
|
|
async def mqtt_connect():
|
|
nonlocal has_connected
|
|
try:
|
|
async with Client(
|
|
self.args.mqtt_host,
|
|
port=self.args.mqtt_port,
|
|
username=self.args.mqtt_username,
|
|
password=self.args.mqtt_password,
|
|
) as client:
|
|
has_connected = True
|
|
self.logger.info(
|
|
f"Connected to {self.args.mqtt_host}:{self.args.mqtt_port}"
|
|
)
|
|
tasks = [
|
|
self.handle_detection_events(client),
|
|
self.handle_snapshot_events(client),
|
|
]
|
|
await client.subscribe(f"{self.args.mqtt_prefix}/#")
|
|
await asyncio.gather(*tasks)
|
|
except MqttError:
|
|
if not has_connected:
|
|
raise
|
|
|
|
await mqtt_connect()
|
|
|
|
async def handle_detection_events(self, client) -> None:
|
|
async with client.filtered_messages(
|
|
f"{self.args.mqtt_prefix}/events"
|
|
) as messages:
|
|
async for message in messages:
|
|
msg = message.payload.decode()
|
|
try:
|
|
frigate_msg = json.loads(message.payload.decode())
|
|
if not frigate_msg["after"]["camera"] == self.args.frigate_camera:
|
|
continue
|
|
|
|
label = frigate_msg["after"]["label"]
|
|
object_type = self.label_to_object_type(label)
|
|
if not object_type:
|
|
self.logger.warning(
|
|
f"Received unsupported detection label type: {label}"
|
|
)
|
|
|
|
if not self.event_id and frigate_msg["type"] == "new":
|
|
self.event_id = frigate_msg["after"]["id"]
|
|
self.event_label = label
|
|
self.event_snapshot_ready = asyncio.Event()
|
|
self.logger.info(
|
|
f"Starting {self.event_label} motion event"
|
|
f" (id: {self.event_id})"
|
|
)
|
|
await self.trigger_motion_start(object_type)
|
|
elif (
|
|
self.event_id == frigate_msg["after"]["id"]
|
|
and frigate_msg["type"] == "end"
|
|
):
|
|
# Wait for the best snapshot to be ready before
|
|
# ending the motion event
|
|
self.logger.info(f"Awaiting snapshot (id: {self.event_id})")
|
|
await self.event_snapshot_ready.wait()
|
|
self.logger.info(
|
|
f"Ending {self.event_label} motion event"
|
|
f" (id: {self.event_id})"
|
|
)
|
|
await self.trigger_motion_stop()
|
|
self.event_id = None
|
|
self.event_label = None
|
|
except json.JSONDecodeError:
|
|
self.logger.exception(f"Could not decode payload: {msg}")
|
|
|
|
async def handle_snapshot_events(self, client) -> None:
|
|
topic_fmt = f"{self.args.mqtt_prefix}/{self.args.frigate_camera}/{{}}/snapshot"
|
|
async with client.filtered_messages(topic_fmt.format("+")) as messages:
|
|
async for message in messages:
|
|
if (
|
|
self.event_id
|
|
and not message.retain
|
|
and message.topic == topic_fmt.format(self.event_label)
|
|
):
|
|
f = tempfile.NamedTemporaryFile()
|
|
f.write(message.payload)
|
|
self.logger.debug(
|
|
f"Updating snapshot for {self.event_label} with {f.name}"
|
|
)
|
|
self.update_motion_snapshot(Path(f.name))
|
|
self.event_snapshot_ready.set()
|
|
else:
|
|
self.logger.debug(
|
|
f"Discarding snapshot message ({len(message.payload)})"
|
|
)
|