# unifi-cam-proxy/unifi/cams/base.py

import argparse
import atexit
import json
import logging
import shutil
import ssl
import subprocess
import sys
import tempfile
import time
import urllib.parse
from abc import ABCMeta, abstractmethod
from enum import Enum
from pathlib import Path
from typing import Any, Optional
import aiohttp
import packaging.version
import websockets
from unifi.core import RetryableError
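# Messages exchanged with the controller are plain JSON objects; both the request and
# the response side are modelled as untyped dicts.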
AVClientRequest = AVClientResponse = dict[str, Any]
class SmartDetectObjectType(Enum):
PERSON = "person"
VEHICLE = "vehicle"
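# Base class for camera backends. It speaks the UniFi Protect camera protocol over a
# websocket (adoption, settings negotiation, motion events, snapshots) and leaves the
# device-specific parts, get_snapshot() and get_stream_source(), to subclasses.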
class UnifiCamBase(metaclass=ABCMeta):
def __init__(self, args: argparse.Namespace, logger: logging.Logger) -> None:
self.args = args
self.logger = logger
self._msg_id: int = 0
self._init_time: float = time.time()
self._streams: dict[str, str] = {}
self._motion_snapshot: Optional[Path] = None
self._motion_event_id: int = 0
self._motion_event_ts: Optional[float] = None
self._motion_object_type: Optional[SmartDetectObjectType] = None
self._ffmpeg_handles: dict[str, subprocess.Popen] = {}
# Set up ssl context for requests
self._ssl_context = ssl.create_default_context()
self._ssl_context.check_hostname = False
self._ssl_context.verify_mode = ssl.CERT_NONE
self._ssl_context.load_cert_chain(args.cert, args.cert)
self._session: Optional[websockets.legacy.client.WebSocketClientProtocol] = None
atexit.register(self.close_streams)
self._needs_flv_timestamps: bool = False
@classmethod
def add_parser(cls, parser: argparse.ArgumentParser) -> None:
parser.add_argument(
"--ffmpeg-args",
"-f",
default="-c:v copy -ar 32000 -ac 1 -codec:a aac -b:a 32k",
help="Transcoding args for `ffmpeg -i <src> <args> <dst>`",
)
parser.add_argument(
"--rtsp-transport",
default="tcp",
choices=["tcp", "udp", "http", "udp_multicast"],
help="RTSP transport protocol used by stream",
)
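    # Main loop: announce ourselves to the controller, then handle incoming websocket
    # messages until the connection drops. RetryableError signals unifi.core to reconnect.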
async def _run(self, ws) -> None:
self._session = ws
await self.init_adoption()
while True:
try:
msg = await ws.recv()
except websockets.exceptions.ConnectionClosedError:
self.logger.info(f"Connection to {self.args.host} was closed.")
raise RetryableError()
if msg is not None:
force_reconnect = await self.process(msg)
if force_reconnect:
self.logger.info("Reconnecting...")
raise RetryableError()
async def run(self) -> None:
return
async def get_video_settings(self) -> dict[str, Any]:
return {}
async def change_video_settings(self, options) -> None:
return
@abstractmethod
async def get_snapshot(self) -> Path:
raise NotImplementedError("You need to write this!")
@abstractmethod
async def get_stream_source(self, stream_index: str) -> str:
raise NotImplementedError("You need to write this!")
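    # These two abstract methods are the whole backend contract: return a stream URL
    # (typically an RTSP source) and the path to a fresh JPEG snapshot. A hypothetical
    # subclass sketch is included at the bottom of this module.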
def get_extra_ffmpeg_args(self, stream_index: str = "") -> str:
return self.args.ffmpeg_args
async def get_feature_flags(self) -> dict[str, Any]:
return {
"mic": True,
"aec": [],
"videoMode": ["default"],
"motionDetect": ["enhanced"],
}
# API for subclasses
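    # trigger_motion_start() opens a motion (or smart-detect) event: it sends a
    # "start"/"enter" edge to the controller and grabs a snapshot for the event thumbnail.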
async def trigger_motion_start(
self, object_type: Optional[SmartDetectObjectType] = None
) -> None:
if not self._motion_event_ts:
payload: dict[str, Any] = {
"clockBestMonotonic": 0,
"clockBestWall": 0,
"clockMonotonic": int(self.get_uptime()),
"clockStream": int(self.get_uptime()),
"clockStreamRate": 1000,
"clockWall": int(round(time.time() * 1000)),
"edgeType": "start",
"eventId": self._motion_event_id,
"eventType": "motion",
"levels": {"0": 47},
"motionHeatmap": "",
"motionSnapshot": "",
}
if object_type:
payload.update(
{
"objectTypes": [object_type.value],
"edgeType": "enter",
"zonesStatus": {"0": 48},
"smartDetectSnapshot": "",
}
)
            self.logger.info(
                f"Triggering motion start (idx: {self._motion_event_id})"
                + (f" for {object_type.value}" if object_type else "")
            )
await self.send(
self.gen_response(
"EventSmartDetect" if object_type else "EventAnalytics",
payload=payload,
),
)
self._motion_event_ts = time.time()
self._motion_object_type = object_type
# Capture snapshot at beginning of motion event for thumbnail
motion_snapshot_path: str = tempfile.NamedTemporaryFile(delete=False).name
try:
shutil.copyfile(await self.get_snapshot(), motion_snapshot_path)
self.logger.debug(f"Captured motion snapshot to {motion_snapshot_path}")
self._motion_snapshot = Path(motion_snapshot_path)
except FileNotFoundError:
pass
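    # Closes the event opened by trigger_motion_start(): sends the matching "stop"/"leave"
    # edge and advances the event id so the next event gets a fresh one.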
async def trigger_motion_stop(self) -> None:
motion_start_ts = self._motion_event_ts
motion_object_type = self._motion_object_type
if motion_start_ts:
payload: dict[str, Any] = {
"clockBestMonotonic": int(self.get_uptime()),
"clockBestWall": int(round(motion_start_ts * 1000)),
"clockMonotonic": int(self.get_uptime()),
"clockStream": int(self.get_uptime()),
"clockStreamRate": 1000,
"clockWall": int(round(time.time() * 1000)),
"edgeType": "stop",
"eventId": self._motion_event_id,
"eventType": "motion",
"levels": {"0": 49},
"motionHeatmap": "heatmap.png",
"motionSnapshot": "motionsnap.jpg",
}
if motion_object_type:
payload.update(
{
"objectTypes": [motion_object_type.value],
"edgeType": "leave",
"zonesStatus": {"0": 48},
"smartDetectSnapshot": "motionsnap.jpg",
}
)
            self.logger.info(
                f"Triggering motion stop (idx: {self._motion_event_id})"
                + (f" for {motion_object_type.value}" if motion_object_type else "")
            )
await self.send(
self.gen_response(
"EventSmartDetect" if motion_object_type else "EventAnalytics",
payload=payload,
),
)
self._motion_event_id += 1
self._motion_event_ts = None
self._motion_object_type = None
def update_motion_snapshot(self, path: Path) -> None:
self._motion_snapshot = path
async def fetch_to_file(self, url: str, dst: Path) -> bool:
try:
async with aiohttp.request("GET", url) as resp:
if resp.status != 200:
self.logger.error(f"Error retrieving file {resp.status}")
return False
with dst.open("wb") as f:
f.write(await resp.read())
return True
except aiohttp.ClientError:
return False
# Protocol implementation
def gen_msg_id(self) -> int:
self._msg_id += 1
return self._msg_id
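    # Adoption handshake: ubnt_avclient_hello advertises a camera identity (model, MAC,
    # firmware version) together with the adoption token so the controller accepts us.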
async def init_adoption(self) -> None:
self.logger.info(
f"Adopting with token [{self.args.token}] and mac [{self.args.mac}]"
)
await self.send(
self.gen_response(
"ubnt_avclient_hello",
payload={
"adoptionCode": self.args.token,
"connectionHost": self.args.host,
"connectionSecurePort": 7442,
"fwVersion": self.args.fw_version,
"hwrev": 19,
"idleTime": 191.96,
"ip": self.args.ip,
"mac": self.args.mac,
"model": self.args.model,
"name": self.args.name,
"protocolVersion": 67,
"rebootTimeoutSec": 30,
"semver": "v4.4.8",
"totalLoad": 0.5474,
"upgradeTimeoutSec": 150,
"uptime": int(self.get_uptime()),
"features": await self.get_feature_flags(),
},
),
)
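    # Controllers at or above 1.21.4 expect timestamps written into the FLV stream, which
    # is handled later by passing --write-timestamps to unifi.clock_sync.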
async def process_hello(self, msg: AVClientRequest) -> None:
controller_version = packaging.version.parse(
msg["payload"].get("controllerVersion")
)
self._needs_flv_timestamps = controller_version >= packaging.version.parse(
"1.21.4"
)
async def process_param_agreement(self, msg: AVClientRequest) -> AVClientResponse:
return self.gen_response(
"ubnt_avclient_paramAgreement",
msg["messageId"],
{
"authToken": self.args.token,
"features": await self.get_feature_flags(),
},
)
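    # Firmware upgrades are faked: only the first bytes of the image are fetched to read
    # out the embedded version string, which is then reported as our running firmware.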
async def process_upgrade(self, msg: AVClientRequest) -> None:
url = msg["payload"]["uri"]
headers = {"Range": "bytes=0-100"}
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers, ssl=False) as r:
# Parse the new version string from the upgrade binary
content = await r.content.readexactly(54)
version = ""
for i in range(0, 50):
b = content[4 + i]
                    if b != 0:  # indexing bytes yields ints, so compare against 0
version += chr(b)
self.logger.debug(f"Pretending to upgrade to: {version}")
self.args.fw_version = version
async def process_isp_settings(self, msg: AVClientRequest) -> AVClientResponse:
payload = {
"aeMode": "auto",
"aeTargetPercent": 50,
"aggressiveAntiFlicker": 0,
"brightness": 50,
"contrast": 50,
"criticalTmpOfProtect": 40,
"darkAreaCompensateLevel": 0,
"denoise": 50,
"enable3dnr": 1,
"enableMicroTmpProtect": 1,
"enablePauseMotion": 0,
"flip": 0,
"focusMode": "ztrig",
"focusPosition": 0,
"forceFilterIrSwitchEvents": 0,
"hue": 50,
"icrLightSensorNightThd": 0,
"icrSensitivity": 0,
"irLedLevel": 215,
"irLedMode": "auto",
"irOnStsBrightness": 0,
"irOnStsContrast": 0,
"irOnStsDenoise": 0,
"irOnStsHue": 0,
"irOnStsSaturation": 0,
"irOnStsSharpness": 0,
"irOnStsWdr": 0,
"irOnValBrightness": 50,
"irOnValContrast": 50,
"irOnValDenoise": 50,
"irOnValHue": 50,
"irOnValSaturation": 50,
"irOnValSharpness": 50,
"irOnValWdr": 1,
"mirror": 0,
"queryIrLedStatus": 0,
"saturation": 50,
"sharpness": 50,
"touchFocusX": 1001,
"touchFocusY": 1001,
"wdr": 1,
"zoomPosition": 0,
}
payload.update(await self.get_video_settings())
return self.gen_response(
"ResetIspSettings",
msg["messageId"],
payload,
)
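    # ChangeVideoSettings tells us where each stream should be sent. Parse the
    # avSerializer destinations, start or stop the matching ffmpeg pipeline, and answer
    # with a mostly static capability/settings blob for the three video tracks.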
async def process_video_settings(self, msg: AVClientRequest) -> AVClientResponse:
vid_dst = {
"video1": ["file:///dev/null"],
"video2": ["file:///dev/null"],
"video3": ["file:///dev/null"],
}
if msg["payload"] is not None and "video" in msg["payload"]:
for k, v in msg["payload"]["video"].items():
if v:
if "avSerializer" in v:
vid_dst[k] = v["avSerializer"]["destinations"]
if "/dev/null" in vid_dst[k]:
self.stop_video_stream(k)
elif "parameters" in v["avSerializer"]:
self._streams[k] = stream = v["avSerializer"]["parameters"][
"streamName"
]
try:
host, port = urllib.parse.urlparse(
v["avSerializer"]["destinations"][0]
).netloc.split(":")
await self.start_video_stream(
k, stream, destination=(host, int(port))
)
except ValueError:
pass
return self.gen_response(
"ChangeVideoSettings",
msg["messageId"],
{
"audio": {
"bitRate": 32000,
"channels": 1,
"description": "audio track",
"enableTemporalNoiseShaping": False,
"enabled": True,
"mode": 0,
"quality": 0,
"sampleRate": 11025,
"type": "aac",
"volume": 0,
},
"firmwarePath": "/lib/firmware/",
"video": {
"enableHrd": False,
"hdrMode": 0,
"lowDelay": False,
"videoMode": "default",
"mjpg": {
"avSerializer": {
"destinations": [
"file:///tmp/snap.jpeg",
"file:///tmp/snap_av.jpg",
],
"parameters": {
"audioId": 1000,
"enableTimestampsOverlapAvoidance": False,
"suppressAudio": True,
"suppressVideo": False,
"videoId": 1001,
},
"type": "mjpg",
},
"bitRateCbrAvg": 500000,
"bitRateVbrMax": 500000,
"bitRateVbrMin": None,
"description": "JPEG pictures",
"enabled": True,
"fps": 5,
"height": 720,
"isCbr": False,
"maxFps": 5,
"minClientAdaptiveBitRate": 0,
"minMotionAdaptiveBitRate": 0,
"nMultiplier": None,
"name": "mjpg",
"quality": 80,
"sourceId": 3,
"streamId": 8,
"streamOrdinal": 3,
"type": "mjpg",
"validBitrateRangeMax": 6000000,
"validBitrateRangeMin": 32000,
"width": 1280,
},
"video1": {
"M": 1,
"N": 30,
"avSerializer": {
"destinations": vid_dst["video1"],
"parameters": None
if "video1" not in self._streams
else {
"audioId": None,
"streamName": self._streams["video1"],
"suppressAudio": None,
"suppressVideo": None,
"videoId": None,
},
"type": "extendedFlv",
},
"bitRateCbrAvg": 1400000,
"bitRateVbrMax": 2800000,
"bitRateVbrMin": 48000,
"description": "Hi quality video track",
"enabled": True,
"fps": 15,
"gopModel": 0,
"height": 1080,
"horizontalFlip": False,
"isCbr": False,
"maxFps": 30,
"minClientAdaptiveBitRate": 0,
"minMotionAdaptiveBitRate": 0,
"nMultiplier": 6,
"name": "video1",
"sourceId": 0,
"streamId": 1,
"streamOrdinal": 0,
"type": "h264",
"validBitrateRangeMax": 2800000,
"validBitrateRangeMin": 32000,
"validFpsValues": [
1,
2,
3,
4,
5,
6,
8,
9,
10,
12,
15,
16,
18,
20,
24,
25,
30,
],
"verticalFlip": False,
"width": 1920,
},
"video2": {
"M": 1,
"N": 30,
"avSerializer": {
"destinations": vid_dst["video2"],
"parameters": None
if "video2" not in self._streams
else {
"audioId": None,
"streamName": self._streams["video2"],
"suppressAudio": None,
"suppressVideo": None,
"videoId": None,
},
"type": "extendedFlv",
},
"bitRateCbrAvg": 500000,
"bitRateVbrMax": 1200000,
"bitRateVbrMin": 48000,
"currentVbrBitrate": 1200000,
"description": "Medium quality video track",
"enabled": True,
"fps": 15,
"gopModel": 0,
"height": 720,
"horizontalFlip": False,
"isCbr": False,
"maxFps": 30,
"minClientAdaptiveBitRate": 0,
"minMotionAdaptiveBitRate": 0,
"nMultiplier": 6,
"name": "video2",
"sourceId": 1,
"streamId": 2,
"streamOrdinal": 1,
"type": "h264",
"validBitrateRangeMax": 1500000,
"validBitrateRangeMin": 32000,
"validFpsValues": [
1,
2,
3,
4,
5,
6,
8,
9,
10,
12,
15,
16,
18,
20,
24,
25,
30,
],
"verticalFlip": False,
"width": 1280,
},
"video3": {
"M": 1,
"N": 30,
"avSerializer": {
"destinations": vid_dst["video3"],
"parameters": None
if "video3" not in self._streams
else {
"audioId": None,
"streamName": self._streams["video3"],
"suppressAudio": None,
"suppressVideo": None,
"videoId": None,
},
"type": "extendedFlv",
},
"bitRateCbrAvg": 300000,
"bitRateVbrMax": 200000,
"bitRateVbrMin": 48000,
"currentVbrBitrate": 200000,
"description": "Low quality video track",
"enabled": True,
"fps": 15,
"gopModel": 0,
"height": 360,
"horizontalFlip": False,
"isCbr": False,
"maxFps": 30,
"minClientAdaptiveBitRate": 0,
"minMotionAdaptiveBitRate": 0,
"nMultiplier": 6,
"name": "video3",
"sourceId": 2,
"streamId": 4,
"streamOrdinal": 2,
"type": "h264",
"validBitrateRangeMax": 750000,
"validBitrateRangeMin": 32000,
"validFpsValues": [
1,
2,
3,
4,
5,
6,
8,
9,
10,
12,
15,
16,
18,
20,
24,
25,
30,
],
"verticalFlip": False,
"width": 640,
},
"vinFps": 30,
},
},
)
async def process_device_settings(self, msg: AVClientRequest) -> AVClientResponse:
return self.gen_response(
"ChangeDeviceSettings",
msg["messageId"],
{
"name": self.args.name,
"timezone": "PST8PDT,M3.2.0,M11.1.0",
},
)
async def process_osd_settings(self, msg: AVClientRequest) -> AVClientResponse:
return self.gen_response(
"ChangeOsdSettings",
msg["messageId"],
{
"_1": {
"enableDate": 1,
"enableLogo": 1,
"enableReportdStatsLevel": 0,
"enableStreamerStatsLevel": 0,
"tag": self.args.name,
},
"_2": {
"enableDate": 1,
"enableLogo": 1,
"enableReportdStatsLevel": 0,
"enableStreamerStatsLevel": 0,
"tag": self.args.name,
},
"_3": {
"enableDate": 1,
"enableLogo": 1,
"enableReportdStatsLevel": 0,
"enableStreamerStatsLevel": 0,
"tag": self.args.name,
},
"_4": {
"enableDate": 1,
"enableLogo": 1,
"enableReportdStatsLevel": 0,
"enableStreamerStatsLevel": 0,
"tag": self.args.name,
},
"enableOverlay": 1,
"logoScale": 50,
"overlayColorId": 0,
"textScale": 50,
"useCustomLogo": 0,
},
)
async def process_network_status(self, msg: AVClientRequest) -> AVClientResponse:
return self.gen_response(
"NetworkStatus",
msg["messageId"],
{
"connectionState": 2,
"connectionStateDescription": "CONNECTED",
"defaultInterface": "eth0",
"dhcpLeasetime": 86400,
"dnsServer": "8.8.8.8 4.2.2.2",
"gateway": "192.168.103.1",
"ipAddress": self.args.ip,
"linkDuplex": 1,
"linkSpeedMbps": 100,
"mode": "dhcp",
"networkMask": "255.255.255.0",
},
)
async def process_sound_led_settings(
self, msg: AVClientRequest
) -> AVClientResponse:
return self.gen_response(
"ChangeSoundLedSettings",
msg["messageId"],
{
"ledFaceAlwaysOnWhenManaged": 1,
"ledFaceEnabled": 1,
"speakerEnabled": 1,
"speakerVolume": 100,
"systemSoundsEnabled": 1,
"userLedBlinkPeriodMs": 0,
"userLedColorFg": "blue",
"userLedOnNoff": 1,
},
)
async def process_change_isp_settings(
self, msg: AVClientRequest
) -> AVClientResponse:
payload = {
"aeMode": "auto",
"aeTargetPercent": 50,
"aggressiveAntiFlicker": 0,
"brightness": 50,
"contrast": 50,
"criticalTmpOfProtect": 40,
"dZoomCenterX": 50,
"dZoomCenterY": 50,
"dZoomScale": 0,
"dZoomStreamId": 4,
"darkAreaCompensateLevel": 0,
"denoise": 50,
"enable3dnr": 1,
"enableExternalIr": 0,
"enableMicroTmpProtect": 1,
"enablePauseMotion": 0,
"flip": 0,
"focusMode": "ztrig",
"focusPosition": 0,
"forceFilterIrSwitchEvents": 0,
"hue": 50,
"icrLightSensorNightThd": 0,
"icrSensitivity": 0,
"irLedLevel": 215,
"irLedMode": "auto",
"irOnStsBrightness": 0,
"irOnStsContrast": 0,
"irOnStsDenoise": 0,
"irOnStsHue": 0,
"irOnStsSaturation": 0,
"irOnStsSharpness": 0,
"irOnStsWdr": 0,
"irOnValBrightness": 50,
"irOnValContrast": 50,
"irOnValDenoise": 50,
"irOnValHue": 50,
"irOnValSaturation": 50,
"irOnValSharpness": 50,
"irOnValWdr": 1,
"lensDistortionCorrection": 1,
"masks": None,
"mirror": 0,
"queryIrLedStatus": 0,
"saturation": 50,
"sharpness": 50,
"touchFocusX": 1001,
"touchFocusY": 1001,
"wdr": 1,
"zoomPosition": 0,
}
if msg["payload"]:
await self.change_video_settings(msg["payload"])
payload.update(await self.get_video_settings())
return self.gen_response("ChangeIspSettings", msg["messageId"], payload)
async def process_analytics_settings(
self, msg: AVClientRequest
) -> AVClientResponse:
return self.gen_response(
"ChangeAnalyticsSettings", msg["messageId"], msg["payload"]
)
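    # GetRequest asks for a snapshot upload: pick the cached motion snapshot or grab a
    # fresh one, then POST it to the URI supplied by the controller.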
async def process_snapshot_request(
self, msg: AVClientRequest
) -> Optional[AVClientResponse]:
snapshot_type = msg["payload"]["what"]
if snapshot_type in ["motionSnapshot", "smartDetectZoneSnapshot"]:
path = self._motion_snapshot
else:
path = await self.get_snapshot()
if path and path.exists():
async with aiohttp.ClientSession() as session:
files = {"payload": open(path, "rb")}
files.update(msg["payload"].get("formFields", {}))
try:
await session.post(
msg["payload"]["uri"],
data=files,
ssl=self._ssl_context,
)
self.logger.debug(f"Uploaded {snapshot_type} from {path}")
except aiohttp.ClientError:
self.logger.exception("Failed to upload snapshot")
else:
self.logger.warning(
f"Snapshot file {path} is not ready yet, skipping upload"
)
if msg["responseExpected"]:
return self.gen_response("GetRequest", response_to=msg["messageId"])
async def process_time(self, msg: AVClientRequest) -> AVClientResponse:
return self.gen_response(
"ubnt_avclient_paramAgreement",
msg["messageId"],
{
"monotonicMs": self.get_uptime(),
"wallMs": int(round(time.time() * 1000)),
"features": {},
},
)
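    # Wraps a payload in the avclient message envelope with a fresh messageId and the id
    # of the message being answered.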
def gen_response(
self, name: str, response_to: int = 0, payload: Optional[dict[str, Any]] = None
) -> AVClientResponse:
if not payload:
payload = {}
return {
"from": "ubnt_avclient",
"functionName": name,
"inResponseTo": response_to,
"messageId": self.gen_msg_id(),
"payload": payload,
"responseExpected": False,
"to": "UniFiVideo",
}
def get_uptime(self) -> float:
return time.time() - self._init_time
async def send(self, msg: AVClientRequest) -> None:
self.logger.debug(f"Sending: {msg}")
ws = self._session
if ws:
await ws.send(json.dumps(msg).encode())
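    # Central dispatcher: route each incoming message by functionName to its handler.
    # Returning True forces a reconnect (used for reboot and firmware-update requests).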
async def process(self, msg: bytes) -> bool:
m = json.loads(msg)
fn = m["functionName"]
self.logger.info(f"Processing [{fn}] message")
self.logger.debug(f"Message contents: {m}")
if (("responseExpected" not in m) or (m["responseExpected"] is False)) and (
fn
not in [
"GetRequest",
"ChangeVideoSettings",
"UpdateFirmwareRequest",
"Reboot",
"ubnt_avclient_hello",
]
):
return False
res: Optional[AVClientResponse] = None
if fn == "ubnt_avclient_time":
res = await self.process_time(m)
elif fn == "ubnt_avclient_hello":
await self.process_hello(m)
elif fn == "ubnt_avclient_paramAgreement":
res = await self.process_param_agreement(m)
elif fn == "ResetIspSettings":
res = await self.process_isp_settings(m)
elif fn == "ChangeVideoSettings":
res = await self.process_video_settings(m)
elif fn == "ChangeDeviceSettings":
res = await self.process_device_settings(m)
elif fn == "ChangeOsdSettings":
res = await self.process_osd_settings(m)
elif fn == "NetworkStatus":
res = await self.process_network_status(m)
elif fn == "AnalyticsTest":
res = self.gen_response("AnalyticsTest", response_to=m["messageId"])
elif fn == "ChangeSoundLedSettings":
res = await self.process_sound_led_settings(m)
elif fn == "ChangeIspSettings":
res = await self.process_change_isp_settings(m)
elif fn == "ChangeAnalyticsSettings":
res = await self.process_analytics_settings(m)
elif fn == "GetRequest":
res = await self.process_snapshot_request(m)
elif fn == "UpdateUsernamePassword":
res = self.gen_response(
"UpdateUsernamePassword", response_to=m["messageId"]
)
elif fn == "ChangeSmartDetectSettings":
res = self.gen_response(
"ChangeSmartDetectSettings", response_to=m["messageId"]
)
elif fn == "UpdateFirmwareRequest":
await self.process_upgrade(m)
return True
elif fn == "Reboot":
return True
if res is not None:
await self.send(res)
return False
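    # Shared ffmpeg input flags. The socket-timeout option is probed from `ffmpeg -h full`
    # because its name differs between ffmpeg releases (-stimeout vs. -timeout).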
def get_base_ffmpeg_args(self, stream_index: str = "") -> str:
base_args = [
"-avoid_negative_ts",
"make_zero",
"-fflags",
"+genpts+discardcorrupt",
"-use_wallclock_as_timestamps 1",
]
try:
output = subprocess.check_output(["ffmpeg", "-h", "full"])
if b"stimeout" in output:
base_args.append("-stimeout 15000000")
else:
base_args.append("-timeout 15000000")
except subprocess.CalledProcessError:
self.logger.exception("Could not check for ffmpeg options")
return " ".join(base_args)
async def start_video_stream(
self, stream_index: str, stream_name: str, destination: tuple[str, int]
):
has_spawned = stream_index in self._ffmpeg_handles
is_dead = has_spawned and self._ffmpeg_handles[stream_index].poll() is not None
if not has_spawned or is_dead:
source = await self.get_stream_source(stream_index)
cmd = (
"ffmpeg -nostdin -loglevel error -y"
f" {self.get_base_ffmpeg_args(stream_index)} -rtsp_transport"
f' {self.args.rtsp_transport} -i "{source}"'
f" {self.get_extra_ffmpeg_args(stream_index)} -metadata"
f" streamName={stream_name} -f flv - | {sys.executable} -m"
" unifi.clock_sync"
f" {'--write-timestamps' if self._needs_flv_timestamps else ''} | nc"
f" {destination[0]} {destination[1]}"
)
if is_dead:
                self.logger.warning(f"Previous ffmpeg process for {stream_index} died.")
self.logger.info(
f"Spawning ffmpeg for {stream_index} ({stream_name}): {cmd}"
)
self._ffmpeg_handles[stream_index] = subprocess.Popen(
cmd, stdout=subprocess.DEVNULL, shell=True
)
def stop_video_stream(self, stream_index: str):
if stream_index in self._ffmpeg_handles:
self.logger.info(f"Stopping stream {stream_index}")
self._ffmpeg_handles[stream_index].kill()
async def close(self):
self.logger.info("Cleaning up instance")
await self.trigger_motion_stop()
self.close_streams()
def close_streams(self):
for stream in self._ffmpeg_handles:
self.stop_video_stream(stream)
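# A minimal backend sketch (hypothetical, not shipped with this module): a subclass only
# has to answer get_snapshot() and get_stream_source(); the base class above handles the
# rest of the controller protocol. Argument names like snapshot_url and rtsp_url are
# illustrative only.
#
# class StaticRTSPCam(UnifiCamBase):
#     async def get_snapshot(self) -> Path:
#         path = Path(tempfile.gettempdir()) / "snapshot.jpg"
#         await self.fetch_to_file(self.args.snapshot_url, path)
#         return path
#
#     async def get_stream_source(self, stream_index: str) -> str:
#         return self.args.rtsp_url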