from __future__ import annotations
from dataclasses import dataclass, field
import logging
import shutil
import tempfile
from typing import Any, Dict, List, Optional, Set, Union
from pathlib import Path
from ..lib import SequenceInfo, utils
log = logging.getLogger(__name__)
log.setLevel(logging.DEBUG)
SUPPORTED_CODECS = ["ProRes422-HQ", "ProRes4444-XQ", "DNxHR-SQ"]
[docs]
@dataclass
class Codec:
"""Utility class for abstracting ffmpeg codec arguments.
Important:
Currently this only supports 2 flavors of ProRes and 1 of DNxHR but
could deserve more. Supported codecs are:
``ProRes422-HQ``, ``ProRes4444-XQ``, ``DNxHR-SQ``
Attributes:
name (str): The name of the codec.
"""
name: str = field(default_factory=str, init=True, repr=True)
def __post_init__(self) -> None:
if self.name not in SUPPORTED_CODECS:
raise ValueError(
f"{self.name} is not found in supported "
f"codecs.\n{SUPPORTED_CODECS = }"
)
[docs]
def get_ffmpeg_args(self) -> List[str]:
"""Get the ffmpeg arguments for the codec.
TODO:
* treating arguments more generally - similarly as the AYON server
settings of ``Extract Review``.
Returns:
List[str]: The ffmpeg arguments.
"""
args = []
# fmt: off
if self.name == "ProRes422-HQ":
args = [
"-vcodec", "prores_ks",
"-profile:v", "3",
"-vendor", "apl0",
"-pix_fmt", "yuv422p10le",
"-vtag", "apch",
]
if self.name == "ProRes4444-XQ":
args = [
"-vcodec", "prores_ks",
"-profile:v", "4",
"-vendor", "apl0",
"-pix_fmt", "yuva444p10le",
"-vtag", "ap4h",
]
if self.name == "DNxHR-SQ":
args = [
"-vcodec", "dnxhd",
"-profile:v", "2",
"-pix_fmt", "yuv422p",
]
# fmt: on
return args
[docs]
@dataclass
class Burnin:
"""Utility class for handling burnins with OIIO.
.. admonition:: Example ``data`` attribute structure
.. code-block:: json
{
"text": "YOUR_TEXT_HERE",
"position": [
"top_left"
"top_center"
"top_right"
"bottom_left"
"bottom_center"
"bottom_right"
],
},
Attributes:
data (Dict[str, str]): The text to be drawn and its positioning.
size (int): The size of the text.
padding (int): The padding around the text.
color (Set[float]): The color of the text.
font (Optional[str]): The font to use.
outline (Optional[int]): The outline size.
"""
data: Dict[str, str] = field(default_factory=dict)
size: int = field(default=64)
padding: int = field(default=30)
color: Set[float] = field(default=(1, 1, 1))
font: Optional[str] = field(default=None)
outline: Optional[int] = field(default=None)
def __post_init__(self) -> None:
if not self.data:
raise ValueError("Burnin data is empty")
if self.font:
self._font = Path(self.font).resolve()
[docs]
class BasicRenderer:
"""Basic renderer for image sequences.
Note:
Also supports single image rendering but needs to support more formats.
PSDs would be really nice to have.
.. admonition:: Example
.. code-block:: python
# render sequence into a ProRes with a basic reformat to 1080p
rend = BasicRenderer(
processor=OIIORepositionProcessor(
dst_width=1920,
dst_height=1080,
fit="letterbox",
),
source_sequence=SequenceInfo.scan("resources/public/plateMain/v002")[0],
output_dir="test_results/reformat_1080p/letterbox",
codec="ProRes422-HQ",
fps=25,
keep_only_container=False,
)
rend.render(debug=True)
Attributes:
source_sequence (SequenceInfo): The source sequence to render.
output_dir (Union[Path, str]): The output directory.
name (str): The name of the output file.
threads (int): The number of threads to use for rendering.
keep_only_container (bool): Keep only the container file.
"""
name: str = "lablib.mov"
# rendering options
threads: int = 4
# cleanup
keep_only_container: bool = False
def __init__(
self,
source_sequence: SequenceInfo,
output_dir: Union[Path, str],
**kwargs,
) -> None:
self.source_sequence = source_sequence
self.output_dir = Path(output_dir).resolve()
for k, v in kwargs.items():
if not hasattr(self.__class__, k):
raise ValueError(f"Unknown attribute {k}")
setattr(self, k, v)
if not kwargs.get("staging_dir"):
self._staging_dir = Path(tempfile.mkdtemp())
def __repr__(self) -> str:
exposed_props = ["source_sequence", "output_dir"]
props = ""
for prop in exposed_props:
props = props + f"{prop}={getattr(self, prop)}, "
optional_props = ["codec", "audio", "processor"]
for prop in optional_props:
if hasattr(self, prop):
props = props + f"{prop}={getattr(self, prop)}, "
return f"{self.__class__.__name__}({props[:-2]})"
[docs]
def get_ffmpeg_cmd(self) -> List[str]:
"""Get arguments for rendering with ffmpeg.
Returns:
List[str]: The ffmpeg arguments.
"""
cmd = ["ffmpeg", "-loglevel", "info", "-hide_banner"]
# common args
common_args = [
"-y",
"-xerror",
"-start_number",
str(self.source_sequence.start_frame),
"-r",
str(self.fps),
"-thread_queue_size",
"4096",
"-framerate",
str(self.fps),
]
cmd.extend(common_args)
# input args
input_path = Path(
self.source_sequence.path, self.source_sequence.format_string
).resolve()
if hasattr(self, "_oiio_out"):
si = SequenceInfo.scan(self._oiio_out.parent)[0]
input_path = Path(si.path, si.format_string).resolve()
input_args = ["-i", input_path.as_posix()]
if self.audio:
audio_path: str = Path(self.audio).resolve().as_posix()
input_args.extend(["-i", audio_path])
audio_args = ["-map", "0:v", "-map", "1:a"]
input_args.extend(audio_args)
cmd.extend(input_args)
# timecode args
timecode = min(self.source_sequence.frames).timecode
cmd.extend(["-timecode", timecode])
# codec args
if self.codec:
codec_args = Codec(name=self.codec).get_ffmpeg_args()
cmd.extend(codec_args)
# output args
# NOTE: ffmpegs output arg needs to be the last one
output_path = Path(self._staging_dir, self.name)
output_args = [output_path.as_posix()]
cmd.extend(output_args)
return cmd
[docs]
def render(self, debug=False) -> None:
"""Render the sequence with the given options.
Important:
This always tries to render into a local temporary EXR sequence
first and then converts it to the desired codec. These will then
be attempted to be copied to the output directory.
In any case, the temporary directory will be cleaned up afterwards.
Hint:
If you're only interested in the video file you can set
``BasicRenderer(*args, keep_only_container=True)``.
Arguments:
debug (Optional[bool]): Whether to increase log verbosity.
"""
# run oiiotool command
cmd = self.get_oiiotool_cmd(debug)
log.info("oiiotool cmd >>> {}".format(" ".join(cmd)))
oiio_out, oiio_err = utils.call_cmd(cmd)
if debug:
for line in oiio_out.splitlines():
log.info(f"oiio out: {line}")
for line in oiio_err.splitlines():
log.info(f"oiio err: {line}")
# run ffmpeg command
if self.codec:
ffmpeg_cmd = self.get_ffmpeg_cmd()
log.info("ffmpeg cmd >>> {}".format(" ".join(ffmpeg_cmd)))
# NOTE: ffmpeg only outputs to stderr
_, ffmpeg_err = utils.call_cmd(ffmpeg_cmd)
if debug:
for line in ffmpeg_err.splitlines():
log.info(f"ffmpeg out: {line}")
# copy renders to output directory
if not self.output_dir.exists():
self.output_dir.mkdir(parents=True)
for item in self._staging_dir.iterdir():
if item.is_file():
if item.suffix in [".exr"] and self.keep_only_container:
continue
if item.suffix not in [".mov", ".mp4", ".exr"]:
continue
log.info(f"Copying {item} to {self.output_dir}")
shutil.copy2(item, self.output_dir)
# cleanup
shutil.rmtree(self._staging_dir)
@property
def processor(self) -> Any:
""":obj:`Any`: The processor to use for rendering."""
if not hasattr(self, "_processor"):
return None
return self._processor
@processor.setter
def processor(self, value: Any) -> None:
self._processor = value
@property
def codec(self) -> str:
""":obj:`Codec`: The codec to use.
Attention:
Please check the supported codecs.
The passed ``str`` will be looked up against them.
"""
if not hasattr(self, "_codec"):
return None
return self._codec.name
@codec.setter
def codec(self, value: str) -> None:
self._codec = Codec(name=value)
@property
def fps(self) -> int:
""":obj:`int`: The frames per second to use.
TODO:
* should be a float. But currently only 24 and 25 are tested.
* testing with 23.976 and 29.97 would be nice.
"""
if not hasattr(self, "_fps"):
return min(self.source_sequence.frames).fps
return self._fps
@fps.setter
def fps(self, value: int) -> None:
self._fps = value
@property
def audio(self) -> str:
""":obj:`str`: The path to an audio file to be used.
The passed string will be resolved to an absolute path object.
"""
if not hasattr(self, "_audio"):
return None
return self._audio.as_posix()
@audio.setter
def audio(self, value: str) -> None:
self._audio = Path(value).resolve()
@property
def burnins(self) -> Burnin:
""":obj:`Burnin`: The burnins to use.
Attention:
Please check the Burnin class for formatting.
Currently you can only pass in a dict formatted accordingly.
TODO:
* Should also accept passing a ``Burnin`` directly.
"""
if not hasattr(self, "_burnins"):
return None
return self._burnins
@burnins.setter
def burnins(self, values: dict) -> None:
self._burnins = Burnin(**values)