Source code for esrf_pathlib._schemas.fields.tree

from typing import Dict
from typing import List
from typing import NamedTuple
from typing import Optional
from typing import Tuple
from typing import Union

RawSegmentType = Union[str, Union[Tuple[str], Tuple[str, str], Tuple[str, str, bool]]]
# Parsed as _ValidatedRawSegment


class _ValidatedRawSegment(NamedTuple):
    template: str
    path_name: Optional[str] = None
    mandatory: bool = True


[docs] class SegmentTemplate(NamedTuple): template: str mandatory: bool symbolic_root_segment: Optional[str] = None
[docs] class PathTemplateArguments(NamedTuple): name: str segments: List[SegmentTemplate]
[docs] def parse_template_tree( template_tree: Dict[RawSegmentType, Optional[dict]], symbolic_root_segment: Optional[str] = None, _parent_segments: Optional[List[PathTemplateArguments]] = None, ) -> List[PathTemplateArguments]: """Convert a nested dictionary with :class:`RawSegmentType` keys to a flat list of :class:`PathTemplate` arguments. """ if _parent_segments is None: _parent_segments = [] templates: List[PathTemplateArguments] = [] for key, subtree in template_tree.items(): if isinstance(key, str): key = (key,) segment = _ValidatedRawSegment(*key) current_segments = _parent_segments + [ SegmentTemplate( template=segment.template, mandatory=segment.mandatory, symbolic_root_segment=symbolic_root_segment, ) ] symbolic_root_segment = None if segment.path_name: templates.append( PathTemplateArguments( name=segment.path_name, segments=current_segments, ) ) if subtree is not None: templates.extend( parse_template_tree(subtree, _parent_segments=current_segments) ) return templates
[docs] def tree_as_strings(segments: List[List[SegmentTemplate]]) -> List[str]: """Convert several segment lists to a stringified representation of a directory structure. For example .. code:: {data_root}/{proposal}/{beamline}/{session_date}/ ├── {proposal}_{beamline}.h5 └── {collection}/ ├── {proposal}_{collection}.h5 └── {dataset}/ └── {collection}_{dataset}.h5 """ tree: Dict[str, dict] = {} for seg_path in segments: _insert_path(tree, seg_path) # Collapse common root prefix only if not a leaf collapsed = [] node = tree while len(node) == 1: key, subnode = next(iter(node.items())) if not subnode: # stop collapsing at a leaf break collapsed.append(key) node = subnode collapsed_node = "/".join(collapsed) + "/" if collapsed else "/" if node: return [collapsed_node] + _render_tree(node) return [collapsed_node]
_BRANCH = " ├──" _LAST_BRANCH = " └──" _PREFIX = " │ " _LAST_PREFIX = " " _OPTIONAL = " [*]" def _insert_path(tree: Dict[str, dict], path: List[SegmentTemplate]): """Recursively insert a segment path into the nested tree dict.""" if not path: return current_segment, *sub_path = path label = current_segment.template + ( _OPTIONAL if not current_segment.mandatory else "" ) subtree = tree.setdefault(label, {}) _insert_path(subtree, sub_path) def _render_tree(node: Dict[str, dict], prefix: str = "") -> List[str]: """Render a nested dict tree into pretty tree lines.""" lines = [] keys = list(node.keys()) for i, key in enumerate(keys): is_last = i == len(keys) - 1 branch = _LAST_BRANCH if is_last else _BRANCH is_dir = bool(node[key]) optional = key.endswith(_OPTIONAL) clean_key = key[: -len(_OPTIONAL)] if optional else key slash = "/" if is_dir else "" opt_marker = _OPTIONAL if optional else "" lines.append(f"{prefix}{branch} {clean_key}{slash}{opt_marker}") if is_dir: next_prefix = prefix + (_LAST_PREFIX if is_last else _PREFIX) lines.extend(_render_tree(node[key], next_prefix)) return lines