from__future__importannotationsimportconcurrent.futuresimportcontextlibimportioimportjsonimportlogging.handlersimportmultiprocessingimportosimportreimportshutilimporttracebackfromfunctoolsimportpartialfrompathlibimportPathfromtypesimportModuleTypefromtypingimportTYPE_CHECKING,Unionimportpharaoh.logfrompharaohimportprojectfrompharaoh.assetlibimportpatchesfrompharaoh.assetlib.contextimportcontext_stackfrompharaoh.assetlib.finderimportAssetfrompharaoh.templating.second_level.sphinx_ext.asset_tmplimportfind_asset_templatefrompharaoh.util.contextlib_chdirimportchdirfrompharaoh.util.json_encoderimportCustomJSONEncoderifTYPE_CHECKING:fromcollections.abcimportIterablefromqueueimportQueuelog=pharaoh.log.logPathLike=Union[str,Path]defgenerate_assets(project_root:Path,asset_src:Path,component_name:str="",mp_log_queue:Queue|None=None):# Since this function is always called in a process by generate_assets_parallel,# we need to remove at least all file handlers so child-processes don't log to the same file# as the parent process, otherwise race conditions may occur.# So we just remove all handlers and add a QueueHandler# to send all log records to the parent in order to handle them.ifmp_log_queueisnotNone:# pragma: no coverforhdlinlog.handlers:log.removeHandler(hdl)log.addHandler(logging.handlers.QueueHandler(mp_log_queue))# Also forbid the project instance to add loggers we just 
removedproj=project.PharaohProject(project_root=project_root,logging_add_filehandler=False)context_stack.reset()try:script_path=asset_src.relative_to(project_root).as_posix()exceptValueError:script_path=asset_src.as_posix()ifasset_src.suffix.lower()==".py":script_ignore_pattern=proj.get_setting("asset_gen.script_ignore_pattern")with(patches.patch_3rd_party_libraries(),context_stack.new_context(context_name="generate_assets",asset={"script_name":asset_src.name,"script_path":asset_src,"index":0,"component_name":component_name,},),):code=asset_src.read_text(encoding="utf-8")first_line=code.split("\n",maxsplit=1)[0].strip()ifre.fullmatch(script_ignore_pattern,asset_src.name)orre.fullmatch(r"^# *pharaoh?: *ignore *",first_line,re.IGNORECASE):log.info(f"Ignoring file {script_path}")returnlog.info(f"Generating assets from script {script_path!r}...")asset_module=module_from_file(asset_src)WAVEWATSON_LEGACY_INPLACE=os.environ.get("WAVEWATSON_LEGACY_INPLACE")os.environ["WAVEWATSON_LEGACY_INPLACE"]="0"try:run_module(asset_module,code)exceptExceptionase:msg=(f"An exception was raised when executing module "f"{str(asset_src)!r}:\n\n{e}\n\nTraceback:\n{traceback.format_exc()}")raiseException(msg)fromNonefinally:ifWAVEWATSON_LEGACY_INPLACEisNone:delos.environ["WAVEWATSON_LEGACY_INPLACE"]else:os.environ["WAVEWATSON_LEGACY_INPLACE"]=WAVEWATSON_LEGACY_INPLACEelifasset_src.suffix.lower()==".ipynb":# import locally, otherwise Sphinx autodoc on pharaoh.assetlib.api fails because of nbformat package and this# issue: https://github.com/sphinx-doc/sphinx/issues/11662importnbformatfromnbconvert.preprocessorsimportCellExecutionError,ExecutePreprocessorlog.info(f"Generating assets from notebook {script_path!r}...")withopen(asset_src)asfile:nb=nbformat.read(file,as_version=4)initial_node=nbformat.notebooknode.from_dict({"cell_type":"code","execution_count":None,"id":"000000","outputs":[],"metadata":{},"source":f"""import osfrom pharaoh.api import PharaohProjectfrom pharaoh.assetlib.api import 
metadata_contextfrom pharaoh.assetlib.patches import patch_3rd_party_librariesproj = PharaohProject(project_root="{project_root.as_posix()}")patcher = patch_3rd_party_libraries()patcher.__enter__()metadata_context( context_name="generate_assets", asset=dict( script_name="{asset_src.name}", script_path="{asset_src.as_posix()}", component_name="{component_name}", index=0 )).activate()os.environ["WAVEWATSON_LEGACY_INPLACE"] = "0"os.environ["JPY_SESSION_NAME"] = "{asset_src.as_posix()}""""[1:-1],})# Remark: JPY_SESSION_NAME is normally set by Jupyter runtime. In our case we use the nbconvert preprocessor# so the variable is not set. So we have do set it manually to the current notebook path in order for the# pharaoh.assetlib.api.get_current_component function to be able to find the "executing" script..nb.cells.insert(0,initial_node)ep=ExecutePreprocessor(timeout=600)subdir=component_nameor"default"try:ep.preprocess(nb,{"metadata":{"path":str(asset_src.parent)}})completed_notebooks_path=proj.asset_build_dir/"completed_notebooks"/subdir/asset_src.namecompleted_notebooks_path.parent.mkdir(parents=True,exist_ok=True)withopen(completed_notebooks_path,"w",encoding="utf-8")asfile:nbformat.write(nb,file)exceptCellExecutionErrorase:failed_notebooks_path=proj.asset_build_dir/"failed_notebooks"/subdir/asset_src.namefailed_notebooks_path.parent.mkdir(parents=True,exist_ok=True)withopen(failed_notebooks_path,"w",encoding="utf-8")asfile:nbformat.write(nb,file)msg=(f"An exception was raised when executing notebook '{asset_src.stem}': {e}\n"f"Check the notebook for errors/traces: {failed_notebooks_path}")# Remove all 7-bit C1 ANSI sequences for better readabilityansi_escape=re.compile(r""" \x1B # ESC (?: # 7-bit C1 Fe (except CSI) [@-Z\\-_] | # or [ for CSI, followed by a control sequence \[ [0-?]* # Parameter bytes [ -/]* # Intermediate bytes [@-~] # Final byte ) 
""",re.VERBOSE,)msg=ansi_escape.sub("",msg)raiseException(msg)fromNonedefgenerate_assets_parallel(project_root:PathLike,asset_sources:Iterable[tuple[str,Path]],workers:str|int="auto"):project_root=Path(project_root)ifisinstance(workers,int):workers=max(1,workers)ifisinstance(workers,str):ifworkers.lower()=="auto":workers=multiprocessing.cpu_count()else:msg="Argument worker may only be an integer number or the string 'auto'!"raiseValueError(msg)log.info(f"Executing asset generation with {workers} worker processes")generate_asset_partial=partial(generate_assets,project_root=project_root)mp_manager=multiprocessing.Manager()mp_log_queue=mp_manager.Queue(-1)results=[]# The queue listener collects all log records handled via the queue handler (defined inside generate_assets)# in order to log them in the parent processql=logging.handlers.QueueListener(mp_log_queue,*log.handlers,respect_handler_level=True)ql.start()withconcurrent.futures.ProcessPoolExecutor(max_workers=workers)asexecutor:futures_map={executor.submit(generate_asset_partial,asset_src=asset_source,component_name=component_name,mp_log_queue=mp_log_queue):asset_sourceforcomponent_name,asset_sourceinasset_sources}forfutureinconcurrent.futures.as_completed(futures_map.keys()):result=Nonetry:result=future.result()results.append((futures_map[future],result))exceptSystemExitase:ife.code==0:results.append((futures_map[future],result))else:results.append((futures_map[future],e))exceptExceptionase:results.append((futures_map[future],e))ql.stop()returnresultsdefmodule_from_file(path:str|Path)->ModuleType:""" Creates a module from file at runtime. :param path: Path to the module source code. 
:return: A fresh module """module_path=Path(path)module_name=f"asset_module_{module_path.stem.replace(' ','_').replace('-','_')}"module=ModuleType(module_name)module.__dict__["__file__"]=str(module_path.absolute())module.__path__=[str(module_path.parent)]module.__dict__["__name__"]="__main__"module.__dict__["__module_name__"]=module_namereturnmoduledefrun_module(module:ModuleType,code:str):""" Execute the configured source code in a module. :param module: A module object. :param code: The code to be run inside the module """compiled_code=compile(code,module.__dict__["__file__"],"exec")withchdir(Path(module.__dict__["__file__"]).parent):exec(compiled_code,module.__dict__)
def register_asset(
    file: PathLike,
    metadata: dict | None = None,
    template: str | None = None,
    data: io.BytesIO | None = None,
    copy2build: bool = False,
    **kwargs,
) -> Asset | None:
    """
    Register an asset manually. The file will be copied (if data is None and 'file' is a real file) or
    written (if data is given) to the asset build folder of the current Pharaoh project.

    :param file: The filename. Must be given even if data is set, to have a filename to store the asset and to
        automatically determine the template (if not set via template argument).
    :param metadata: Additional metadata to store on the asset. The keys ``context_name`` and ``asset``
        are ignored. The passed dict is not modified.
    :param template: The template used to render the asset. If omitted, it is inferred by the file extension.

        .. seealso:: :ref:`reference/directive:Asset Templates`
    :param data: An io.BytesIO instance. Used if the asset is generated in memory and should be stored to disk by
        this function.
    :param copy2build: If True, the asset will be copied to the asset build directory,
        even if not referenced in the template.

        Background: Pharaoh stores all assets in the project directory and copies them to the build directory
        only if copy2build is set to True or on-demand by Pharaoh.
        For example if an HTML file is rendered using an iframe, the HTML file has to be copied to the build
        folder where the iframe can later include it.
    :param kwargs: Only ``component`` (name of the component the asset belongs to) is accepted.
    :raises FileNotFoundError: If ``data`` is not a BytesIO instance and ``file`` is neither a file nor a directory.
    :raises Exception: On unknown keyword arguments, or if the component cannot be determined
        and ``component`` is not given.
    :returns: The registered :class:`Asset`, or None if there is no Pharaoh project
        (e.g. the calling script is executed standalone).
    """
    try:
        active_app = project.get_project()
    except Exception:
        # If there is no Pharaoh application yet, the calling file is presumably executed standalone,
        # so we have to skip exporting any files
        return None

    component_name = kwargs.pop("component", None)
    if kwargs:
        raise Exception("Unknown keyword arguments " + ",".join(kwargs.keys()))

    file = Path(file)

    if not template:
        from pharaoh.plugins.plugin_manager import PM

        template = PM.pharaoh_get_asset_render_template_mappings().get(file.suffix.lower())

    if template is not None:
        find_asset_template(template)  # will fail if template does not exist

    if template in ("iframe",):
        copy2build = True

    # If this function is used in an asset script that is executed directly, the component name is not added to the
    # metadata context, so we have to find the component via the callstack and pass it to _build_asset_filepath
    from pharaoh.assetlib.api import get_current_component

    if component_name is None:
        try:
            component_name = get_current_component()
        except LookupError:  # raised if method is not executed from inside a component
            msg = (
                "When register_asset is called outside a component of a Pharaoh project, keyword argument "
                "'component' must be set!"
            )
            raise Exception(msg) from None

    asset_file_path = active_app._build_asset_filepath(file, component_name)

    # Work on a copy so the caller's dict is not mutated. The reserved keys would
    # otherwise collide with the explicit keyword arguments of new_context below.
    metadata = dict(metadata) if metadata else {}
    metadata.pop("context_name", None)
    metadata.pop("asset", None)

    with context_stack.new_context(
        context_name="manual_registry",
        asset={
            "user_filepath": str(file),
            "file": str(asset_file_path),
            "name": asset_file_path.name,
            "stem": asset_file_path.stem,
            "suffix": asset_file_path.suffix,
            "template": template,
            "copy2build": copy2build,
        },
        **metadata,
    ):
        with contextlib.suppress(LookupError):
            # If asset scripts are executed directly, this context does not exist
            context_stack.get_parent_context(name="generate_assets")["asset"]["index"] += 1

        if isinstance(data, io.BytesIO):
            with open(asset_file_path, "wb") as fp:
                fp.write(data.getvalue())
        elif file.is_file():
            shutil.copy(file.absolute(), asset_file_path)
        elif file.is_dir():
            shutil.copytree(file, asset_file_path)
        else:
            msg = f"{file} does not exist!"
            raise FileNotFoundError(msg)

        # Dump while the "manual_registry" context is still active
        info_file = context_stack.dump(asset_file_path)

    return Asset(info_file)
def register_templating_context(name: str, context: str | Path | dict | list, metadata: dict | None = None, **kwargs):
    """
    Register a data context for the build-time templating stage.
    The data may be given directly as dict/list or via a json or yaml file.

    This function is designed to be used within asset scripts, to easily register data you extract from resources
    for the templating process.

    Example::

        from pharaoh.assetlib.api import register_templating_context

        register_templating_context(name="foo", context={"bar": "baz"})
        # "baz" will be accessed like this: {{ ctx.local.foo.bar }}

    :param name: The name under which the data context is available inside Jinja templates.
        Access like this (name: mycontext)::

            {% set mycontext = ctx.local.mycontext %}

    :param context: Either a str or :external:class:`Path <pathlib.Path>` instance pointing to a json or yaml file,
        or a dict or list. All data must contain only json-compatible types, otherwise the data cannot be stored.
    :param metadata: The given context will be internally registered as an asset with following metadata:
        ``dict(pharaoh_templating_context=name, **metadata)``. The passed dict is not modified.
    :param kwargs: Keyword arguments that are mostly (except ``component``) passed to ``json.dumps(...)``,
        in case ``context`` is a dict or list.
    :raises ValueError: If ``name`` is empty, or ``context`` is a file path whose suffix is neither
        ``.json`` nor ``.yaml``.
    :raises TypeError: If ``context`` has an unsupported type.
    """
    if not name:
        msg = "name must be a non-empty string!"
        raise ValueError(msg)

    component = kwargs.pop("component", None)
    kwargs.setdefault("cls", CustomJSONEncoder)

    # Build a new dict instead of popping from the caller's one, so the argument is not mutated.
    # The name given to this function always wins over a user-supplied "pharaoh_templating_context".
    asset_metadata = {**(metadata or {}), "pharaoh_templating_context": name}

    if isinstance(context, (str, Path)):
        file = Path(context)
        if file.suffix.lower() not in (".json", ".yaml"):
            msg = "If context is a file path, it's suffix must be either .json or .yaml!"
            raise ValueError(msg)
        register_asset(file, metadata=asset_metadata, component=component)
    elif isinstance(context, (list, dict)):
        # Serialize in memory; register_asset writes the bytes to the asset build folder
        data = io.BytesIO(json.dumps(context, **kwargs).encode("utf-8"))
        register_asset(
            "pharaoh_templating_context.json",
            metadata=asset_metadata,
            data=data,
            component=component,
        )
    else:
        msg = f"Unsupported type {type(context)}!"
        raise TypeError(msg)