Source code for esrf_pathlib._schemas.fields.path

import os
import re
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple

from .. import errors
from ..definitions.types import ConceptValueType
from ..identifier import SchemaIdentifier
from .base import Field
from .concept import PathConcept
from .tree import SegmentTemplate


class PathTemplate(Field):
    """Class for matching, parsing, and rendering paths from segment templates.

    A template is an ordered list of segments. Each segment template may
    contain placeholders of the form ``{<name>}`` or ``{<name>=<literal>}``
    where ``<name>`` refers to a path concept of the schema.
    """

    _CONCEPT_RE = re.compile(
        r"\{([^\}=]+)(?:=([^\}]+))?\}"
    )  # matches {<name>} or {<name>=<literal>}

    def __init__(
        self,
        name: str,
        schema_identifier: SchemaIdentifier,
        segments: List[SegmentTemplate],
        concepts: Dict[str, PathConcept],
    ):
        """
        :param name: name of the path field.
        :param schema_identifier: schema this template belongs to.
        :param segments: ordered segment templates of the path.
        :param concepts: all path concepts of the schema, by placeholder name.
        :raises UnknownPathConcept: a placeholder is not a key of ``concepts``.
        """
        # String representation of the templates. Optional segments are
        # shown as nested brackets, e.g. "a/b[/c[/d]]".
        template_string = ""
        suffix = ""
        for segment in segments:
            if segment.mandatory:
                if template_string:
                    template_string = f"{template_string}/{segment.template}"
                else:
                    template_string = segment.template
            else:
                if template_string:
                    template_string = f"{template_string}[/{segment.template}"
                else:
                    template_string = f"[{segment.template}"
                suffix = f"{suffix}]"
        template_string += suffix

        # Extract the used concepts and calculate template score
        used_concepts: Dict[str, PathConcept] = {}
        required_concepts: List[str] = []
        # Becomes False at the first optional segment and stays False.
        required = True
        # total concept score in required segments, multiple occurrences do NOT count
        concept_score = 0
        # total number of placeholders in all required segments, multiple occurrences count
        num_placeholders = 0
        for segment in segments:
            if not segment.mandatory:
                required = False
            for placeholder_name, _ in self._CONCEPT_RE.findall(segment.template):
                if placeholder_name not in concepts:
                    raise errors.UnknownPathConcept(
                        f"Placeholder {placeholder_name!r} is not a concept of schema {str(schema_identifier)!r}"
                    )
                concept = concepts[placeholder_name]
                if required:
                    num_placeholders += 1
                    if placeholder_name not in required_concepts:
                        required_concepts.append(placeholder_name)
                    if concept.name not in used_concepts:
                        concept_score += concept.score
                used_concepts[placeholder_name] = concept

        super().__init__(
            name=name,
            description=template_string,
            schema_identifier=schema_identifier,
            value_generator=str,
        )

        # `_concepts` must be assigned before `_compile_regex` is called
        # because `_segment_as_regex` looks concepts up on the instance.
        self._concepts: Dict[str, PathConcept] = used_concepts
        self._segments: List[SegmentTemplate] = segments
        self._regex: re.Pattern = self._compile_regex(segments)
        self._concept_score: int = concept_score
        self._num_placeholders: int = num_placeholders
        self._required_concepts: List[str] = required_concepts

    def __repr__(self) -> str:
        return f"<{type(self).__name__} path={self._name!r} schema={str(self._schema_identifier)!r} template={self._description!r}>"

    @property
    def docstring(self) -> str:
        """Documentation text showing the template string."""
        return f"Filesystem path template\n\n``{self._description}``"

    def parse(self, path: str) -> Dict[str, ConceptValueType]:
        """Match and convert a path to a dictionary of path concept values.

        :raises PathTemplateMatchError:
        """
        return self._parse(path)

    def parse_with_score(
        self, path: str
    ) -> Tuple[Optional[Dict[str, ConceptValueType]], Tuple[int, ...]]:
        """Match a path and return the concept values with the match score.

        :returns: ``(None, ())`` when the path does not match, otherwise
            ``(concept_values, (concept_score, num_placeholders))``.
        """
        try:
            concept_values = self._parse(path)
        except errors.PathTemplateMatchError:
            return None, ()
        return concept_values, (self._concept_score, self._num_placeholders)

    def _parse(self, path: str) -> Dict[str, ConceptValueType]:
        """
        :raises PathTemplateMatchError:
        """
        # The compiled regex uses "/" as the segment separator.
        norm_path = path.replace(os.sep, "/")
        m = self._regex.match(norm_path)
        if not m:
            raise errors.PathTemplateMatchError(f"Path {path!r} does not match {self}")
        return {
            cname: self._concepts[cname].deserialize(cvalue)
            for cname, cvalue in m.groupdict().items()
        }

    def render(
        self,
        concept_values: Dict[str, ConceptValueType],
        raise_on_missing: bool = True,
        raise_on_mismatch: bool = True,
    ) -> str:
        """Convert a dictionary of path concept values to a path.

        :raises PathConceptValueError:
        """
        parts, _ = self._match(
            concept_values,
            raise_on_missing=raise_on_missing,
            raise_on_mismatch=raise_on_mismatch,
        )
        if not parts:
            # os.path.join() needs at least one argument; without this guard
            # an unrelated TypeError would leak out of the call below.
            raise errors.PathConceptValueError(
                f"No segment of {self} could be rendered"
            )
        return os.path.join(*parts)

    def render_score(
        self, concept_values: Dict[str, ConceptValueType]
    ) -> Tuple[int, ...]:
        """Returns an empty tuple if it does not match. If it does match
        it returns the match score.
        """
        try:
            _, score = self._match(concept_values)
        except errors.PathConceptValueError:
            return ()
        return score

    def _match(
        self,
        concept_values: Dict[str, ConceptValueType],
        raise_on_missing: bool = True,
        raise_on_mismatch: bool = True,
    ) -> Tuple[List[str], Tuple[int, ...]]:
        """Render every segment and compute the render score.

        :returns: the rendered segments and the score
            ``(concept_score, num_placeholders, -num_unused_values)``.
        :raises PathConceptValueError: a mandatory segment cannot be rendered.
        """
        used: set = set()
        parts: List[str] = []
        for segment in self._segments:
            try:
                rendered = self._render_segment(
                    segment,
                    concept_values,
                    used,
                    raise_on_missing=raise_on_missing,
                    raise_on_mismatch=raise_on_mismatch,
                )
            except errors.PathConceptValueError:
                if segment.mandatory:
                    raise
                # An optional segment that cannot be rendered is left out.
                continue
            parts.append(rendered)

        # Keys of `concept_values` that no placeholder consumed lower the score.
        num_missing = len(set(concept_values) - used)
        score = self._concept_score, self._num_placeholders, -num_missing
        return parts, score

    def _render_segment(
        self,
        segment: SegmentTemplate,
        concept_values: Dict[str, ConceptValueType],
        used: set,
        raise_on_missing: bool = True,
        raise_on_mismatch: bool = True,
    ) -> str:
        """Substitute all placeholders of one segment template.

        Names of the concepts that were serialized are added to ``used``,
        also when a later placeholder of the same segment fails.

        :raises PathConceptValueError:
        """
        # Optional segments are always strict about missing values so the
        # caller can detect the failure and skip the segment.
        strict_missing = raise_on_missing or not segment.mandatory

        def serialize_concept(match: re.Match):
            concept, cvalue = self._serialize_concept(
                match,
                concept_values=concept_values,
                raise_on_missing=strict_missing,
                raise_on_mismatch=raise_on_mismatch,
            )
            used.add(concept.name)
            return cvalue

        return self._CONCEPT_RE.sub(serialize_concept, segment.template)

    def _compile_regex(self, segments: List[SegmentTemplate]) -> re.Pattern:
        """Build the anchored regular expression that matches a full path.

        :raises UnknownPathConcept:
        """
        # Shared across segments so a repeated concept becomes a backreference.
        used_groups: List[str] = []

        def segment_as_regex(match):
            return self._segment_as_regex(match, used_groups)

        # NOTE(review): text outside placeholders is inserted into the pattern
        # unescaped, so regex metacharacters in a template keep their regex
        # meaning - confirm this is intended.
        regex_parts = []
        for segment in segments:
            seg_regex = self._CONCEPT_RE.sub(segment_as_regex, segment.template)
            if regex_parts:
                # Not the first segment: prepend the separator
                if segment.mandatory:
                    seg_regex = f"/{seg_regex}"
                else:
                    seg_regex = f"(?:/{seg_regex})?"
            elif not segment.mandatory:
                # Optional first segment
                seg_regex = f"(?:{seg_regex})?"
            regex_parts.append(seg_regex)

        pattern = "".join(regex_parts)
        return re.compile(f"^{pattern}$")

    def _segment_as_regex(self, match: re.Match, used_groups: List[str]) -> str:
        """Convert one placeholder match to a regex fragment.

        The first occurrence of a concept becomes a named group, every later
        occurrence becomes a backreference to that group.

        :raises UnknownPathConcept:
        """
        cname, literal = match.groups()
        if cname not in self._concepts:
            raise errors.UnknownPathConcept(cname)
        if cname in used_groups:
            return f"(?P={cname})"
        used_groups.append(cname)
        if literal:
            return f"(?P<{cname}>{re.escape(literal)})"
        concept = self._concepts[cname]
        return f"(?P<{cname}>{concept.regex})"

    def _serialize_concept(
        self,
        match: re.Match,
        concept_values: Dict[str, ConceptValueType],
        raise_on_missing: bool = True,
        raise_on_mismatch: bool = True,
    ) -> Tuple[PathConcept, ConceptValueType]:
        """Serialize the value of the concept referenced by one placeholder.

        :returns: the concept and its serialized value.
        :raises PathConceptValueError:
        """
        cname, literal = match.groups()
        concept = self._concepts[cname]
        provided = concept_values.get(cname)

        if literal is None:
            # Plain {<name>} placeholder: use the provided value.
            return concept, concept.serialize(
                provided, raise_on_missing=raise_on_missing
            )

        literal = concept.serialize(literal, raise_on_missing=raise_on_missing)
        if provided is None:
            # {<name>=<literal>} without a provided value: the literal is the value.
            return concept, concept.serialize(
                literal, raise_on_missing=raise_on_missing
            )

        result = concept.serialize(provided, raise_on_missing=raise_on_missing)
        # The literal is serialized a second time on purpose: this keeps the
        # sequence of `serialize` calls unchanged in case it is not idempotent.
        literal = concept.serialize(literal, raise_on_missing=raise_on_missing)
        if result != literal and raise_on_mismatch:
            # NOTE(review): the message reports the template name (`self.name`),
            # not the concept name `cname` - confirm which one is intended.
            raise errors.PathConceptMatchError(
                f"{self.name!r}={result!r} instead of {literal!r}"
            )
        return concept, literal

    def contains_optionals(self, concept_values: Dict[str, ConceptValueType]) -> bool:
        """Whether any non-``None`` value belongs to a name that is not a
        required concept of this template.
        """
        return any(
            cname not in self._required_concepts and cvalue is not None
            for cname, cvalue in concept_values.items()
        )

    def remove_optionals(
        self, concept_values: Dict[str, ConceptValueType]
    ) -> Dict[str, ConceptValueType]:
        """Keep only the non-``None`` values of required concepts.

        When the template has no required concepts, all concepts used by
        the template are kept instead.
        """
        names = self._required_concepts or self._concepts
        return {
            cname: cvalue
            for cname, cvalue in concept_values.items()
            if cname in names and cvalue is not None
        }

    @property
    def required_concept_names(self) -> List[str]:
        """Names of the concepts used in mandatory segments, in order of
        first appearance.
        """
        # Return a copy so callers cannot modify the internal state.
        return list(self._required_concepts)

    def strip_mount_point(
        self, concept_values: Dict[str, ConceptValueType]
    ) -> Dict[str, ConceptValueType]:
        """
        Strip all segments from the root concept before a specific marker segment.

        :returns: a copy of ``concept_values``; the input is not modified.
        """
        concept_values = dict(concept_values)
        if not self._segments or not self._segments[0].symbolic_root_segment:
            return concept_values
        root_segment = self._segments[0]
        root_concept_name = root_segment.template.strip("{}")
        if concept_values.get(root_concept_name) is None:
            # Nothing to strip: the root value is absent or explicitly None.
            return concept_values
        concept_values[root_concept_name] = _strip_before_segment(
            concept_values[root_concept_name],
            segment=root_segment.symbolic_root_segment,
        )
        return concept_values
def _strip_before_segment(path: str, segment: Optional[str] = None) -> str: if segment is None: return path drive, tail = os.path.splitdrive(path) parts = tail.split(os.sep) try: idx = parts.index(segment) except ValueError: return path return os.path.join(drive or os.sep, *parts[idx:])