1
0
mirror of https://github.com/home-assistant/supervisor.git synced 2025-12-20 02:18:59 +00:00
Files
supervisor/supervisor/utils/codenotary.py
Pascal Vizeli 3478005e70 Using CAS for content-trust (#3382)
* Using CAS for content-trust

* v2

* Fix linting errors

* Adjust field checked for status in CAS response

* CI workflow needs CAS not VCN now

* Use cwd in test as code won't be in /usr/src

* Pre-cache CAS pub key for supervisor

* Cas doesn't actually need key file executable

Co-authored-by: Mike Degatano <michael.degatano@gmail.com>
2022-02-10 09:21:21 +01:00

89 lines
2.5 KiB
Python

"""Small wrapper for CodeNotary."""
import asyncio
import hashlib
import json
import logging
from pathlib import Path
import shlex
from typing import Final, Union
import async_timeout
from dirhash import dirhash
from . import clean_env
from ..exceptions import CodeNotaryBackendError, CodeNotaryError, CodeNotaryUntrusted
# Module logger; its bound methods are also handed to the CodeNotary
# exceptions raised in this module.
_LOGGER: logging.Logger = logging.getLogger(__name__)

# Keys looked up in the JSON document printed by the `cas` command.
_ATTR_ERROR: Final = "error"
_ATTR_STATUS: Final = "status"

# Command line template; `{signer}` and `{sum}` are filled in per request.
_CAS_CMD: str = (
    "cas authenticate --signerID {signer} "
    "--silent --output json --hash {sum}"
)

# (checksum, signer) pairs that have already been validated successfully.
_CACHE: set[tuple[str, str]] = set()
def calc_checksum(data: Union[str, bytes]) -> str:
    """Return the hex encoded SHA-256 digest of the given data.

    Text is converted with ``str.encode()`` (default codec) before hashing;
    bytes are hashed unchanged.
    """
    payload = data.encode() if isinstance(data, str) else data
    return hashlib.sha256(payload).hexdigest()
def calc_checksum_path_sourcecode(folder: Path) -> str:
    """Return the SHA-256 ``dirhash`` of the source code inside a folder.

    The folder is handed to ``dirhash`` as a POSIX style path and the
    ``match`` filter is restricted to ``*.py``.
    """
    source_patterns = ["*.py"]
    folder_posix = folder.as_posix()
    return dirhash(folder_posix, "sha256", match=source_patterns)
async def cas_validate(
    signer: str,
    checksum: str,
) -> None:
    """Validate a checksum against CodeNotary using the ``cas`` CLI.

    Successful validations are remembered in the module level cache, so a
    repeated call with the same checksum and signer returns without running
    ``cas`` again. Failures are never cached.

    Args:
        signer: Signer ID passed to ``cas authenticate --signerID``.
        checksum: Hash passed to ``cas authenticate --hash``.

    Raises:
        CodeNotaryError: ``cas`` could not be started, did not answer within
            the timeout, or printed output that is not a JSON object.
        CodeNotaryBackendError: The response contains an ``error`` field.
        CodeNotaryUntrusted: The response status is missing or not ``0``.
    """
    if (checksum, signer) in _CACHE:
        return

    # Generate command for request. Both values are quoted so whitespace or
    # quote characters inside them cannot turn into additional arguments
    # when the template is split into an argv list.
    command = shlex.split(
        _CAS_CMD.format(signer=shlex.quote(signer), sum=shlex.quote(checksum))
    )

    # Request notary authorization
    _LOGGER.debug("Send cas command: %s", command)
    proc = None
    try:
        proc = await asyncio.create_subprocess_exec(
            *command,
            stdin=asyncio.subprocess.DEVNULL,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.DEVNULL,
            env=clean_env(),
        )

        async with async_timeout.timeout(10):
            data, _ = await proc.communicate()
    except asyncio.TimeoutError:
        # Must come before the OSError handler: since Python 3.11
        # asyncio.TimeoutError is the builtin TimeoutError, which is a
        # subclass of OSError.
        if proc is not None:
            # Don't leave the stalled child process running
            try:
                proc.kill()
            except ProcessLookupError:
                pass  # Process already exited on its own
        raise CodeNotaryError(
            "Timeout while processing CodeNotary", _LOGGER.error
        ) from None
    except OSError as err:
        raise CodeNotaryError(
            f"CodeNotary fatal error: {err!s}", _LOGGER.critical
        ) from err

    # Parse data
    try:
        data_json = json.loads(data)
    except (json.JSONDecodeError, UnicodeDecodeError) as err:
        raise CodeNotaryError(
            f"Can't parse CodeNotary output: {data!s} - {err!s}", _LOGGER.error
        ) from err
    _LOGGER.debug("CodeNotary response with: %s", data_json)

    # Valid JSON that is not an object (e.g. null or a list) can't be
    # inspected with the key lookups below.
    if not isinstance(data_json, dict):
        raise CodeNotaryError(
            f"Can't parse CodeNotary output: {data!s} - unexpected JSON type",
            _LOGGER.error,
        )

    if _ATTR_ERROR in data_json:
        raise CodeNotaryBackendError(data_json[_ATTR_ERROR], _LOGGER.warning)

    # Fail closed: a missing status is handled like a failed verification
    if data_json.get(_ATTR_STATUS) != 0:
        raise CodeNotaryUntrusted()

    _CACHE.add((checksum, signer))