Source code for esrf_pathlib._schemas.fields.tree
from typing import Dict
from typing import List
from typing import NamedTuple
from typing import Optional
from typing import Tuple
from typing import Union
RawSegmentType = Union[str, Union[Tuple[str], Tuple[str, str], Tuple[str, str, bool]]]
# Parsed as _ValidatedRawSegment
class _ValidatedRawSegment(NamedTuple):
template: str
path_name: Optional[str] = None
mandatory: bool = True
[docs]
class SegmentTemplate(NamedTuple):
template: str
mandatory: bool
symbolic_root_segment: Optional[str] = None
[docs]
class PathTemplateArguments(NamedTuple):
name: str
segments: List[SegmentTemplate]
[docs]
def parse_template_tree(
template_tree: Dict[RawSegmentType, Optional[dict]],
symbolic_root_segment: Optional[str] = None,
_parent_segments: Optional[List[PathTemplateArguments]] = None,
) -> List[PathTemplateArguments]:
"""Convert a nested dictionary with :class:`RawSegmentType` keys to a
flat list of :class:`PathTemplate` arguments.
"""
if _parent_segments is None:
_parent_segments = []
templates: List[PathTemplateArguments] = []
for key, subtree in template_tree.items():
if isinstance(key, str):
key = (key,)
segment = _ValidatedRawSegment(*key)
current_segments = _parent_segments + [
SegmentTemplate(
template=segment.template,
mandatory=segment.mandatory,
symbolic_root_segment=symbolic_root_segment,
)
]
symbolic_root_segment = None
if segment.path_name:
templates.append(
PathTemplateArguments(
name=segment.path_name,
segments=current_segments,
)
)
if subtree is not None:
templates.extend(
parse_template_tree(subtree, _parent_segments=current_segments)
)
return templates
[docs]
def tree_as_strings(segments: List[List[SegmentTemplate]]) -> List[str]:
"""Convert several segment lists to a stringified representation of a directory structure.
For example
.. code::
{data_root}/{proposal}/{beamline}/{session_date}/
├── {proposal}_{beamline}.h5
└── {collection}/
├── {proposal}_{collection}.h5
└── {dataset}/
└── {collection}_{dataset}.h5
"""
tree: Dict[str, dict] = {}
for seg_path in segments:
_insert_path(tree, seg_path)
# Collapse common root prefix only if not a leaf
collapsed = []
node = tree
while len(node) == 1:
key, subnode = next(iter(node.items()))
if not subnode: # stop collapsing at a leaf
break
collapsed.append(key)
node = subnode
collapsed_node = "/".join(collapsed) + "/" if collapsed else "/"
if node:
return [collapsed_node] + _render_tree(node)
return [collapsed_node]
_BRANCH = " ├──"
_LAST_BRANCH = " └──"
_PREFIX = " │ "
_LAST_PREFIX = " "
_OPTIONAL = " [*]"
def _insert_path(tree: Dict[str, dict], path: List[SegmentTemplate]):
"""Recursively insert a segment path into the nested tree dict."""
if not path:
return
current_segment, *sub_path = path
label = current_segment.template + (
_OPTIONAL if not current_segment.mandatory else ""
)
subtree = tree.setdefault(label, {})
_insert_path(subtree, sub_path)
def _render_tree(node: Dict[str, dict], prefix: str = "") -> List[str]:
"""Render a nested dict tree into pretty tree lines."""
lines = []
keys = list(node.keys())
for i, key in enumerate(keys):
is_last = i == len(keys) - 1
branch = _LAST_BRANCH if is_last else _BRANCH
is_dir = bool(node[key])
optional = key.endswith(_OPTIONAL)
clean_key = key[: -len(_OPTIONAL)] if optional else key
slash = "/" if is_dir else ""
opt_marker = _OPTIONAL if optional else ""
lines.append(f"{prefix}{branch} {clean_key}{slash}{opt_marker}")
if is_dir:
next_prefix = prefix + (_LAST_PREFIX if is_last else _PREFIX)
lines.extend(_render_tree(node[key], next_prefix))
return lines