"""
Python-VHS is a tiny python wrapper around VHS_,
a tool by charm_ that renders terminal commands into GIFs.
This package searches for VHS and its dependencies
in system's ``PATH``, and invokes them.
On Linux, if VHS is not found in the system,
Python-VHS can download necessary binaries from GitHub.
.. _VHS: https://github.com/charmbracelet/vhs
.. _charm: https://charm.sh/
Quickstart
----------
Install VHS:
.. code-block:: sh
pip3 install vhs
Then resolve VHS binary and run it:
.. code-block:: python
import vhs
vhs_runner = vhs.resolve()
vhs_runner.run("./example.tape", "./example.gif")
Reference
---------
The entry point of the package is the :func:`resolve` function.
It searches for an installed VHS, checks its version, downloads
a new one if necessary, and returns a :class:`Vhs` object
through which you can invoke the found VHS binary:
.. autofunction:: resolve
.. autofunction:: default_cache_path
.. autoclass:: Vhs
:members:
In case of an error, VHS raises a :class:`VhsError` or its subclass:
.. autoclass:: VhsError
.. autoclass:: VhsRunError
By default, the :func:`resolve` function silently detects or installs VHS,
without printing anything (it may emit warning log messages
to the ``"vhs"`` logger).
You can display installation progress by passing a :class:`ProgressReporter`.
Specifically, there's :class:`DefaultProgressReporter` which will cover
most basic cases:
.. autoclass:: ProgressReporter
:members:
.. autoclass:: DefaultProgressReporter
"""
import datetime
import logging
import os
import pathlib
import platform
import re
import shutil
import signal
import stat
import subprocess
import sys
import tempfile
import typing as _t
import github
import requests
import requests.adapters
import urllib3
_logger = logging.getLogger("vhs")
try:
from vhs._version import __version__, __version_tuple__
except ImportError:
raise ImportError(
"vhs._version not found. if you are developing locally, "
"run `pip install -e .[test,doc]` to generate it"
)
__all__ = [
"VhsError",
"VhsRunError",
"Vhs",
"ProgressReporter",
"DefaultProgressReporter",
"resolve",
]
_PathLike = _t.Union[str, os.PathLike]
[docs]class VhsError(Exception):
"""
Raised when VHS is unavailable, or when installation fails.
"""
[docs]class VhsRunError(VhsError, subprocess.CalledProcessError):
"""
Raised when VHS process fails.
"""
def __str__(self):
if self.returncode and self.returncode < 0:
try:
returncode = f"signal {signal.Signals(-self.returncode)}"
except ValueError:
returncode = f"unknown signal {self.returncode}"
else:
returncode = f"code {self.returncode}"
msg = f"VHS run failed with {returncode}"
stderr = self.stderr
if self.stderr:
if isinstance(stderr, bytes):
stderr = stderr.decode("utf-8", errors="replace")
msg += f"\n\nStderr:\n{stderr}"
stdout = self.stdout
if self.stdout:
if isinstance(stdout, bytes):
stdout = stdout.decode("utf-8", errors="replace")
msg += f"\n\nStdout:\n{stdout}"
return msg
[docs]@_t.final
class Vhs:
"""
Interface for a VHS installation.
Do not create directly, use :func:`resolve` instead.
"""
def __init__(
self,
*,
_vhs_path: pathlib.Path,
_path: str,
_quiet: bool = True,
_env: _t.Optional[_t.Dict[str, str]] = None,
_cwd: _t.Optional[_PathLike] = None,
):
self._vhs_path = _vhs_path
self._path = _path
self._quiet = _quiet
self._env = _env
self._cwd = _cwd
[docs] def run(
self,
input_path: _PathLike,
output_path: _t.Optional[_PathLike] = None,
*,
quiet: _t.Optional[bool] = True,
env: _t.Optional[_t.Dict[str, str]] = None,
cwd: _t.Optional[_PathLike] = None,
):
"""
Renter the given VHS file.
:param input_path:
path to a tape file.
:param output_path:
path to the output file.
By default, puts output to whichever path is set in the tape.
:param quiet:
redefine `quiet` for this invocation. (see :func:`resolve`).
:param env:
redefine `env` for this invocation. (see :func:`resolve`).
:param cwd:
redefine `cmd` for this invocation. (see :func:`resolve`).
:raises VhsRunError: VHS process failed with non-zero return code.
"""
if quiet is None:
quiet = self._quiet
if env is None:
env = self._env
if env is None:
env = os.environ.copy()
else:
env = env.copy()
env["PATH"] = self._path
if cwd is None:
cwd = self._cwd
args: _t.List[_t.Union[str, _PathLike]] = [self._vhs_path]
capture_output = False
if quiet:
args += ["-q"]
capture_output = True
if output_path:
args += ["-o", output_path]
args += [input_path]
try:
_logger.debug("running VHS with args %r", args)
subprocess.run(
args,
capture_output=capture_output,
env=env,
cwd=cwd,
check=True,
)
except subprocess.CalledProcessError as e:
raise VhsRunError(
e.returncode,
e.cmd,
e.output,
e.stderr,
) from None
[docs] def run_inline(
self,
input_text: str,
output_path: _t.Optional[_PathLike] = None,
*,
quiet: _t.Optional[bool] = True,
env: _t.Optional[_t.Dict[str, str]] = None,
cwd: _t.Optional[_PathLike] = None,
):
"""
Like :meth:`~Vhs.run`, but accepts tape contents rather than a file.
:param input_text:
contents of a tape.
:param output_path:
path to the output file.
By default, puts output to whichever path is set in the tape.
:param quiet:
redefine `quiet` for this invocation (see :func:`resolve`).
:param env:
redefine `env` for this invocation (see :func:`resolve`).
:param cwd:
redefine `cmd` for this invocation (see :func:`resolve`).
:raises VhsRunError: VHS process failed with non-zero return code.
"""
with tempfile.TemporaryDirectory() as d:
tmp_file = pathlib.Path(d) / "input.tape"
tmp_file.write_text(input_text)
self.run(
input_path=tmp_file,
output_path=output_path,
quiet=quiet,
env=env,
cwd=cwd,
)
[docs]class ProgressReporter:
"""
Interface for reporting installation progress.
"""
[docs] def start(self):
"""
Called when installation starts.
"""
[docs] def progress(self, desc: str, dl_size: int, total_size: int, speed: float, /):
"""
Called to update current progress.
:param desc:
description of the currently performed operation.
:param dl_size:
when the installer downloads files, this number indicates
number of bytes downloaded so far. Otherwise, it is set to zero.
:param total_size:
when the installer downloads files, this number indicates
total number of bytes to download. Otherwise, it is set to zero.
:param speed:
when the installer downloads files, this number indicates
current downloading speed, in bytes per second. Otherwise,
it is set to zero.
"""
[docs] def finish(self, exc_type, exc_val, exc_tb):
"""
Called when installation finishes.
"""
[docs]class DefaultProgressReporter(ProgressReporter):
"""
Default reporter that prints progress to stderr.
"""
_prev_len = 0
def __init__(self, stream: _t.Optional[_t.TextIO] = None):
self.stream = stream or sys.stderr
def progress(self, desc: str, dl_size: int, total_size: int, speed: float, /):
desc = self.format_desc(desc)
if total_size:
desc += self.format_progress(dl_size, total_size, speed)
self.write(desc.ljust(self._prev_len) + "\r")
self._prev_len = len(desc)
def finish(self, exc_type, exc_val, exc_tb):
if exc_val:
self.progress(f"vhs installation failed: {exc_val}", 0, 0, 0)
self.write("\n")
elif self._prev_len > 0:
self.progress(f"vhs installed", 0, 0, 0)
self.write("\n")
def format_desc(self, desc: str) -> str:
return desc
def format_progress(self, dl_size: int, total_size: int, speed: float) -> str:
dl_size_mb = dl_size / 1024**2
total_size_mb = total_size / 1024**2
speed_mb = speed / 1024**2
return f": {dl_size_mb:.1f}/{total_size_mb:.1f}MB - {speed_mb:.2f}MB/s"
def write(self, msg: str):
self.stream.write(msg)
self.stream.flush()
[docs]def resolve(
*,
cache_path: _t.Optional[_PathLike] = None,
min_version: str = "0.5.0",
quiet: bool = True,
env: _t.Optional[_t.Dict[str, str]] = None,
cwd: _t.Optional[_PathLike] = None,
install: bool = True,
reporter: ProgressReporter = ProgressReporter(),
timeout: int = 15,
retry: _t.Optional[urllib3.Retry] = None,
) -> "Vhs":
"""
Find a system VHS installation or download VHS from GitHub.
If VHS is not installed, or it's outdated, try to download it
and install it into `cache_path`.
Automatic download only works on 64-bit Linux.
MacOS users will be presented with an instruction to use `brew`,
and other systems users will get a link to VHS installation guide.
:param cache_path:
path where VHS binaries should be downloaded to.
:param min_version:
minimal VHS version required.
:param quiet:
if true (default), any output from the VHS binary is hidden.
:param env:
overrides environment variables for the VHS process.
:param cwd:
overrides current working directory for the VHS process.
:param install:
if false, disables installing VHS from GitHub.
:param reporter:
a hook that will be called to inform user about installation
progress. See :class:`ProgressReporter` for API documentation,
and :class:`DefaultProgressReporter` for an example.
:param timeout:
timeout in seconds for connecting to GitHub APIs.
:param retry:
retry policy for reading from GitHub and downloading releases.
The default retry polity uses exponential backoff
to avoid rate limiting.
:return:
resolved VHS installation.
:raises VhsError:
VHS not available or installation failed.
"""
if cache_path is None:
cache_path = default_cache_path()
else:
cache_path = pathlib.Path(cache_path)
_logger.debug("using vhs cache path: %s", cache_path)
if retry is None:
retry = urllib3.Retry(10, backoff_factor=0.1)
reporter.start()
try:
vhs_path, path = _check_and_install(
min_version, cache_path, _get_path(env), install, reporter, timeout, retry
)
finally:
reporter.finish(*sys.exc_info())
return Vhs(
_vhs_path=vhs_path,
_path=path,
_quiet=quiet,
_env=env,
_cwd=cwd,
)
[docs]def default_cache_path() -> pathlib.Path:
"""
Return default path where VHS binaries should be downloaded to.
Currently it is equal to ``pathlib.Path(tempfile.gettempdir()) / "python_vhs_cache"``.
"""
return pathlib.Path(tempfile.gettempdir()) / "python_vhs_cache"
def _get_path(env: _t.Optional[_t.Dict[str, str]]) -> str:
path = (env or {}).get("PATH", None)
if path is None:
path = os.environ.get("PATH", None)
if path is None:
try:
path = os.confstr("CS_PATH") # type: ignore
except (AttributeError, ValueError):
pass
if path is None:
path = os.defpath
if path is None:
path = ""
return path
def _download_latest_release(
api: github.Github,
timeout: int,
retry: urllib3.Retry,
name: str,
repo_name: str,
dest: pathlib.Path,
filter: _t.Callable[[str], bool],
reporter: ProgressReporter,
):
reporter.progress(f"resolving {name}", 0, 0, 0)
repo = api.get_repo(repo_name)
for release in repo.get_releases():
if release.draft or release.prerelease:
continue
_logger.debug("found %s release %s", name, release.tag_name)
for asset in release.assets:
_logger.debug("trying %s asset %s", name, asset.name)
if filter(asset.name):
_logger.debug("found %s asset %s", name, asset.name)
basename = asset.name
browser_download_url = asset.browser_download_url
break
else:
raise VhsError(f"unable to find {name} release for platform {sys.platform}")
break
else:
raise VhsError(f"unable to find latest {name} release")
_logger.debug("downloading %s from %s", name, browser_download_url)
with requests.Session() as session:
adapter = requests.adapters.HTTPAdapter(max_retries=retry)
session.mount("https://", adapter)
session.mount("http://", adapter)
with requests.get(browser_download_url, stream=True, timeout=timeout) as stream:
stream.raise_for_status()
try:
size = int(stream.headers["content-length"])
except (KeyError, ValueError):
size = -1
downloaded = 0
reporter.progress(f"downloading {name}", downloaded, size, 0)
start = datetime.datetime.now()
with open(dest / basename, "wb") as dest_file:
for chunk in stream.iter_content(64 * 1024):
dest_file.write(chunk)
if size:
# note: this does not take content-encoding into account.
# our contents are not encoded, though, so this is fine.
downloaded += len(chunk)
speed = (
downloaded
/ (datetime.datetime.now() - start).total_seconds()
)
reporter.progress(
f"downloading {name}", downloaded, size, speed
)
return dest / basename
def _install_vhs(
api: github.Github,
timeout: int,
retry: urllib3.Retry,
bin_path: pathlib.Path,
reporter: ProgressReporter,
):
filter = lambda name: name.endswith("Linux_x86_64.tar.gz")
with tempfile.TemporaryDirectory() as tmp_dir_s:
tmp_dir = pathlib.Path(tmp_dir_s)
try:
tmp_file = _download_latest_release(
api,
timeout,
retry,
"vhs",
"charmbracelet/vhs",
tmp_dir,
filter,
reporter,
)
reporter.progress(f"processing vhs", 0, 0, 0)
_logger.debug("unpacking vhs")
shutil.unpack_archive(tmp_file, tmp_dir)
archive_basename = tmp_file.name
if archive_basename.endswith(".zip"):
archive_basename = archive_basename[: -len(".zip")]
elif archive_basename.endswith(".tar.gz"):
archive_basename = archive_basename[: -len(".tar.gz")]
elif archive_basename.endswith(".tar.xz"):
archive_basename = archive_basename[: -len(".tar.xz")]
src = tmp_dir / archive_basename / "vhs"
dst = bin_path / "vhs"
_logger.debug("copying %s -> %s", src, dst)
os.replace(src, dst)
dst.chmod(dst.stat().st_mode | stat.S_IEXEC)
except Exception as e:
raise VhsError(f"vhs install failed: {e}")
def _install_ttyd(
api: github.Github,
timeout: int,
retry: urllib3.Retry,
bin_path: pathlib.Path,
reporter: ProgressReporter,
):
filter = lambda name: name.endswith("x86_64")
with tempfile.TemporaryDirectory() as tmp_dir_s:
tmp_dir = pathlib.Path(tmp_dir_s)
try:
tmp_file = _download_latest_release(
api, timeout, retry, "ttyd", "tsl0922/ttyd", tmp_dir, filter, reporter
)
reporter.progress(f"processing ttyd", 0, 0, 0)
dst = bin_path / "ttyd"
_logger.debug("copying %s -> %s", tmp_file, dst)
os.replace(tmp_file, dst)
dst.chmod(dst.stat().st_mode | stat.S_IEXEC)
except Exception as e:
raise VhsError(f"ttyd install failed: {e}") from e
def _install_ffmpeg(
api: github.Github,
timeout: int,
retry: urllib3.Retry,
bin_path: pathlib.Path,
reporter: ProgressReporter,
):
filter = lambda name: name.startswith("ffmpeg-n5.1") and "linux64-gpl-5.1" in name
with tempfile.TemporaryDirectory() as tmp_dir_s:
tmp_dir = pathlib.Path(tmp_dir_s)
try:
tmp_file = _download_latest_release(
api,
timeout,
retry,
"ffmpeg",
"BtbN/FFmpeg-Builds",
tmp_dir,
filter,
reporter,
)
reporter.progress(f"processing ffmpeg", 0, 0, 0)
archive_basename = tmp_file.name
if archive_basename.endswith(".zip"):
archive_basename = archive_basename[: -len(".zip")]
elif archive_basename.endswith(".tar.gz"):
archive_basename = archive_basename[: -len(".tar.gz")]
elif archive_basename.endswith(".tar.xz"):
archive_basename = archive_basename[: -len(".tar.xz")]
_logger.debug("unpacking ffmpeg")
shutil.unpack_archive(tmp_file, tmp_dir)
for src in (tmp_dir / archive_basename / "bin").iterdir():
dst = bin_path / src.name
_logger.debug("copying %s -> %s", src, dst)
os.replace(src, dst)
dst.chmod(dst.stat().st_mode | stat.S_IEXEC)
except Exception as e:
raise VhsError(f"ffmpeg install failed: {e}")
def _check_version(
version: str, vhs_path: _PathLike
) -> _t.Tuple[bool, _t.Optional[str]]:
version_tuple = tuple(int(c) for c in version.split("."))
try:
system_version_text_b = subprocess.check_output([vhs_path, "--version"])
system_version_text = system_version_text_b.decode().strip()
if match := re.search(r"(\d+\.\d+\.\d+)", system_version_text):
system_version = match.group(1)
system_version_tuple = tuple(int(c) for c in system_version.split("."))
if system_version_tuple >= version_tuple:
return True, system_version
else:
_logger.debug(
"%s is outdated (got %s, required %s)",
vhs_path,
system_version,
version,
)
return False, system_version
else:
_logger.debug(
"%s printed invalid version %r", vhs_path, system_version_text
)
except (subprocess.SubprocessError, OSError, UnicodeDecodeError):
_logger.debug("%s failed to print its version", vhs_path, exc_info=True)
return False, None
def _check_and_install(
version: str,
bin_path: pathlib.Path,
path: str,
install: bool,
reporter: ProgressReporter,
timeout: int,
retry: urllib3.Retry,
) -> _t.Tuple[pathlib.Path, str]:
if version.startswith("v"):
version = version[1:]
# Try finding pre-installed vhs.
system_vhs_path = shutil.which("vhs", path=path)
system_version = None
if system_vhs_path:
can_use_system_vhs, system_version = _check_version(version, system_vhs_path)
if can_use_system_vhs:
_logger.debug("using pre-installed vhs at %s", system_vhs_path)
return pathlib.Path(system_vhs_path), path
else:
_logger.debug("pre-installed vhs not found")
# Check system compatibility.
if sys.platform == "darwin":
if system_vhs_path:
raise VhsError(
f"you have VHS {system_version}, "
f"but version {version} or newer is required; "
f"run `brew upgrade vhs` to upgrade it, or see installation instructions "
f"at https://github.com/charmbracelet/vhs#installation"
)
else:
raise VhsError(
f"VHS is not installed on your system; "
f"run `brew install vhs` to install it, or see installation instructions "
f"at https://github.com/charmbracelet/vhs#installation"
)
elif (
not install or sys.platform != "linux" or platform.architecture()[0] != "64bit"
):
if system_vhs_path:
raise VhsError(
f"you have VHS {system_version}, "
f"but version {version} or newer is required; "
f"see upgrade instructions "
f"at https://github.com/charmbracelet/vhs#installation"
)
else:
raise VhsError(
f"VHS is not installed on your system; "
f"see installation instructions "
f"at https://github.com/charmbracelet/vhs#installation"
)
# Download binary releases or use cached ones.
api = github.Github(retry=retry, timeout=timeout)
bin_path.mkdir(parents=True, exist_ok=True)
if not (bin_path / "ttyd").exists():
_logger.debug("downloading ttyd")
_install_ttyd(api, timeout, retry, bin_path, reporter)
else:
_logger.debug("using cached ttyd")
if not (bin_path / "ffmpeg").exists():
_logger.debug("downloading ffmpeg")
_install_ffmpeg(api, timeout, retry, bin_path, reporter)
else:
_logger.debug("using cached ffmpeg")
if path:
path = str(bin_path) + ":" + path
else:
path = str(bin_path)
vhs_path = bin_path / "vhs"
if vhs_path.exists():
can_use_cached_vhs, _ = _check_version(version, vhs_path)
if can_use_cached_vhs:
_logger.debug("using cached vhs")
return vhs_path, path
_install_vhs(api, timeout, retry, bin_path, reporter)
can_use_cached_vhs, _ = _check_version(version, vhs_path)
if not can_use_cached_vhs:
_logger.warning(
"downloaded latest vhs is outdated; "
"are you sure min_vhs_version is correct?"
)
return vhs_path, path