from __future__ import annotations
import hashlib
import json
import shutil
from pathlib import Path
from typing import TYPE_CHECKING
import omegaconf
from pharaoh.log import log
from .util import obj_groupby
if TYPE_CHECKING:
from collections.abc import Iterable, Iterator
class AssetFileLinkBrokenError(LookupError):
    """Raised when an ``*.assetinfo`` file has no accompanying asset file next to it."""
[docs]
class Asset:
    """
    Holds information about a generated asset.

    :ivar id: An MD5 hash of the info file's name, prefixed with "__ID__".
        Since a unique suffix is included in the filename, this ID hash is also unique.
        Can be used to quickly find this Asset instance.
    :ivar Path infofile: Path to the ``*.assetinfo`` file
    :ivar Path assetfile: Path to the actual asset file (or directory) found next to *infofile*
    :ivar omegaconf.DictConfig context: The content of *infofile* parsed into a OmegaConf dict.
    """

    def __init__(self, info_file: Path):
        """
        Load the metadata stored in *info_file* and locate the asset it describes.

        :param info_file: Path to a ``*.assetinfo`` file containing JSON metadata.
        :raises AssetFileLinkBrokenError: If no sibling entry sharing the info file's stem exists.
        """
        assert info_file.suffix == ".assetinfo"
        self.id: str = "__ID__" + hashlib.md5(bytes(info_file.name, "utf-8")).hexdigest()
        self.infofile: Path = info_file
        self.context = omegaconf.OmegaConf.create(json.loads(self.infofile.read_text()))
        # The asset is the first sibling whose name starts with the info file's stem
        # and that is not itself an info file.
        for file in self.infofile.parent.glob(f"{self.infofile.stem}*"):
            if file.suffix != ".assetinfo":
                self.assetfile: Path = file
                break
        else:
            msg = f"There is no asset for inventory file {self.infofile}!"
            raise AssetFileLinkBrokenError(msg)

    def __str__(self):
        return repr(self)

    def __repr__(self):
        return f"Asset[{self.infofile.stem}]"

    def __eq__(self, other):
        if isinstance(other, Asset):
            return self.id == other.id and self.infofile == other.infofile and self.assetfile == other.assetfile
        # Returning the sentinel (instead of raising) lets Python try the reflected
        # operation and finally fall back to "not equal" for foreign types.
        return NotImplemented

    def __hash__(self):
        return hash(self.infofile.name)

    def __lt__(self, other):
        if isinstance(other, Asset):
            return self.infofile < other.infofile
        # Python turns this into a TypeError if the other operand cannot compare either.
        return NotImplemented

    def copy_to(self, target_dir: Path) -> Path:
        """
        Copy the asset plus info-file.

        :param target_dir: The target directory to copy to. Will be created if it does not exist.
        :returns: The path of the asset inside *target_dir*. If the info file already exists
            in *target_dir*, nothing is copied and the expected asset path is returned.
        :raises NotImplementedError: If the asset is neither a regular file nor a directory.
        """
        target_dir.mkdir(exist_ok=True, parents=True)
        target_info_file = target_dir / self.infofile.name
        target_asset = target_dir / self.assetfile.name
        # The presence of the info file marks the asset as already copied.
        if target_info_file.exists():
            return target_asset

        # Validate before copying anything: a copied info file without its asset would
        # make every later call believe the copy had already been done.
        is_regular_file = self.assetfile.is_file()
        if not is_regular_file and not self.assetfile.is_dir():
            raise NotImplementedError

        log.debug(f"Copying asset {self} to build directory")
        shutil.copy(self.infofile, target_info_file)
        if is_regular_file:
            return Path(shutil.copy(self.assetfile, target_dir))
        shutil.copytree(self.assetfile, target_asset)
        return target_asset

    def read_json(self) -> dict:
        """
        Reads the file using a JSON parser.

        :raises ValueError: If the asset file does not have a ``.json`` suffix.
        """
        if self.assetfile.suffix.lower() != ".json":
            msg = "Can only read .json files!"
            raise ValueError(msg)
        return json.loads(self.assetfile.read_text("utf-8"))

    def read_yaml(self) -> dict:
        """
        Reads the file using a YAML parser.

        :raises ValueError: If the asset file does not have a ``.yaml``/``.yml`` suffix.
        """
        import yaml

        if self.assetfile.suffix.lower() not in (".yaml", ".yml"):
            msg = "Can only read .yaml/.yml files!"
            raise ValueError(msg)
        with open(self.assetfile, encoding="utf-8") as fp:
            return yaml.safe_load(fp)

    def read_text(self, encoding: str = "utf-8") -> str:
        """
        Reads the file as text

        :param encoding: The text encoding used to decode the file.
        """
        return self.assetfile.read_text(encoding)

    def read_bytes(self) -> bytes:
        """
        Reads the file as bytes
        """
        return self.assetfile.read_bytes()
[docs]
class AssetFinder:
    def __init__(self, lookup_path: Path):
        """
        Discovers generated assets below a directory and offers ways to search them.

        The Pharaoh project creates an instance of this class with ``lookup_path`` pointing to
        ``report_project/.asset_build``. All assets are discovered once on construction.

        :param lookup_path: The root directory to look for assets. It will be searched recursively for assets.
        """
        self._lookup_path = lookup_path
        self._assets: dict[str, list[Asset]] = {}
        self.discover_assets()

    def discover_assets(self, components: list[str] | None = None) -> dict[str, list[Asset]]:
        """
        Scans for ``*.assetinfo`` files and caches the resulting :class:`Asset` objects in ``_assets``.

        :param components: A non-empty list of component names whose assets shall be re-scanned;
            entries of other components are left as they are.
            For None (the default) or anything else, the cache is cleared and every component
            directory directly below the lookup path is scanned.
        :return: A dictionary that maps component names to a list of :class:`Asset` instances.
        """
        rescan_selected_only = isinstance(components, list) and len(components) > 0
        if not rescan_selected_only:
            self._assets.clear()
            for info_path in self._lookup_path.glob("*/*.assetinfo"):
                discovered = Asset(info_path)
                # The component name is the name of the directory holding the asset
                self._assets.setdefault(discovered.assetfile.parent.name, []).append(discovered)
            return self._assets

        for name in components:
            component_dir = self._lookup_path / name
            self._assets[name] = [Asset(info_path) for info_path in component_dir.glob("*.assetinfo")]
        return self._assets

    def search_assets(self, condition: str, components: str | Iterable[str] | None = None) -> list[Asset]:
        """
        Returns the already discovered assets (see :func:`discover_assets`) whose metadata fulfil a condition.

        :param condition: A Python expression that is evaluated using the content of the ``*.assetinfo`` JSON file
            as namespace. If the evaluation returns a truthy result, the asset is returned.
            An asset for which the evaluation raises an exception is treated as not matching.
            A blank condition matches nothing.
            Refer to :ref:`this example assetinfo file <example_asset_info>` to see the available default namespace.

            Example::

                # All HTML file where the "label" metadata ends with "_plot"
                finder.search_assets('asset.suffix == ".html" and label.endswith("_plot")')
        :param components: A component name or a list of component names to search.
            If None (the default), all components will be searched.
        :return: A list of assets whose metadata match the condition, ordered by asset index.
        """
        if condition.strip() == "":
            return []

        expression = compile(condition, "<string>", "eval")

        def generation_index(candidate):
            # Assets lacking an index all share the key 0
            try:
                return candidate.context.asset.index
            except AttributeError:
                return 0

        matches = []
        for candidate in self.iter_assets(components):
            try:
                verdict = eval(expression, {}, candidate.context)
            except Exception:
                # A failing expression simply means "no match" for this asset
                continue
            if verdict:
                matches.append(candidate)

        # Sort by asset index, which reflects the order in which the assets were generated in the asset script
        matches.sort(key=generation_index)
        return matches

    def iter_assets(self, components: str | Iterable[str] | None = None) -> Iterator[Asset]:
        """
        Yields the discovered assets component by component.

        If nothing has been discovered yet, a discovery is triggered first.

        :param components: A component name or a list of component names to iterate. Unknown names are skipped.
            If None (the default) or empty, all components will be iterated.
        :return: An iterator over the discovered assets.
        """
        if len(self._assets) == 0:
            self.discover_assets()

        if isinstance(components, str):
            selected = [components]
        else:
            selected = components
        if not selected:
            selected = list(self._assets)

        for name in selected:
            bucket = self._assets.get(name)
            if bucket is not None:
                yield from bucket

    def get_asset_by_id(self, id: str) -> Asset | None:
        """
        Looks up the :class:`Asset` instance that carries a certain ID.

        :param id: The ID of the asset to return
        :return: The first :class:`Asset` with that ID, None if there is none.
        """
        return next((candidate for candidate in self.iter_assets() if candidate.id == id), None)
[docs]
def asset_groupby(
    seq: Iterable[Asset], key: str, sort_reverse: bool = False, default: str | None = None
) -> dict[str, list[Asset]]:
    """
    Sorts an iterable of Assets into groups according to the value of one metadata key.

    The grouping itself is delegated to ``obj_groupby``, looking the key up in each asset's ``context``.

    During build-time rendering this function will be available as Jinja global function
    ``asset_groupby`` and alias ``agroupby``.

    Example:

    .. code-block:: none

        We have following 4 assets (simplified notation of specified metadata):
            Asset[a="1", b="3"]
            Asset[a="1", c="4"]
            Asset[a="2", b="3"]
            Asset[a="2", c="4"]

        Grouping by "a":
            asset_groupby(assets, "a")
        will yield
            {
                "1": [Asset[a="1", b="3"], Asset[a="1", c="4"]],
                "2": [Asset[a="2", b="3"], Asset[a="2", c="4"]],
            }

        Grouping by "b" and default "default":
            asset_groupby(assets, "b", default="default")
        will yield
            {
                "3": [Asset[a="1", b="3"], Asset[a="2", b="3"]],
                "default": [Asset[a="1", c="4"], Asset[a="2", c="4"]],
            }

    :param seq: The iterable of assets to group
    :param key: The nested attribute to use for grouping, e.g. "A.B.C"
    :param sort_reverse: Reverse-sort the keys in the returned dictionary
    :param default: Sort each item, where "key" is not an existing attribute, into this default group
    :return: A dictionary that maps the group names (values of A.B.C) to a list of items out of the input iterable
    """
    grouped = obj_groupby(
        seq=seq,
        key=key,
        attr="context",
        default=default,
        sort_reverse=sort_reverse,
    )
    return grouped