import os
import re
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
from .. import errors
from ..definitions.types import ConceptValueType
from ..identifier import SchemaIdentifier
from .base import Field
from .concept import PathConcept
from .tree import SegmentTemplate
[docs]
class PathTemplate(Field):
    """Class for matching, parsing, and rendering paths from segment templates.

    A template is an ordered list of segment templates, each of which is
    either mandatory or optional. A segment template can contain placeholders
    of the form ``{<name>}`` or ``{<name>=<literal>}`` where ``<name>`` is the
    name of a path concept.
    """

    # Placeholder syntax: group 1 captures the concept name (anything except
    # "}" and "="), group 2 captures the optional literal after the "=".
    _CONCEPT_RE = re.compile(
        r"\{([^\}=]+)(?:=([^\}]+))?\}"
    )  # matches {<name>} or {<name>=<literal>}
def __init__(
    self,
    name: str,
    schema_identifier: SchemaIdentifier,
    segments: List[SegmentTemplate],
    concepts: Dict[str, PathConcept],
):
    """Build a path template from an ordered list of segment templates.

    :param name: name of the template, forwarded to the base class.
    :param schema_identifier: identifier of the schema, forwarded to the
        base class and used in error messages.
    :param segments: segment templates in path order, each one either
        mandatory or optional.
    :param concepts: available path concepts by placeholder name.
    :raises UnknownPathConcept: a placeholder used in a segment template
        is not a key of ``concepts``.
    """
    # String representation of the template: segments are joined with "/",
    # every optional segment opens a "[" and all brackets are closed at the
    # very end, e.g. "a/b[/c[/d]]".
    template_string = ""
    closing_brackets = ""
    for segment in segments:
        separator = "/" if template_string else ""
        if segment.mandatory:
            template_string += f"{separator}{segment.template}"
        else:
            template_string += f"[{separator}{segment.template}"
            closing_brackets += "]"
    template_string += closing_brackets

    # Extract the used concepts and calculate template score
    used_concepts: Dict[str, PathConcept] = {}
    required_concepts: List[str] = []
    required = True
    concept_score = 0  # total concept score in required segments, multiple occurrences do NOT count
    num_placeholders = 0  # total number of placeholders in all required segments, multiple occurrences count
    for segment in segments:
        if not segment.mandatory:
            # Everything from the first optional segment onwards is optional.
            required = False
        for placeholder_name, _ in self._CONCEPT_RE.findall(segment.template):
            if placeholder_name not in concepts:
                raise errors.UnknownPathConcept(
                    f"Placeholder {placeholder_name!r} is not a concept of schema {str(schema_identifier)!r}"
                )
            concept = concepts[placeholder_name]
            if required:
                # NOTE(review): scoring is restricted to required segments,
                # as the comments on the two counters state - confirm.
                num_placeholders += 1
                if placeholder_name not in required_concepts:
                    required_concepts.append(placeholder_name)
                if concept.name not in used_concepts:
                    concept_score += concept.score
            used_concepts[placeholder_name] = concept

    super().__init__(
        name=name,
        description=template_string,
        schema_identifier=schema_identifier,
        value_generator=str,
    )

    # The concepts must be set before compiling the regular expression
    # because the compilation looks them up.
    self._concepts: Dict[str, PathConcept] = used_concepts
    self._segments: List[SegmentTemplate] = segments
    self._regex: re.Pattern = self._compile_regex(segments)
    self._concept_score: int = concept_score
    self._num_placeholders: int = num_placeholders
    self._required_concepts: List[str] = required_concepts
def __repr__(self) -> str:
    """Return a debug representation with the name, schema and template string."""
    class_name = type(self).__name__
    schema = str(self._schema_identifier)
    return (
        f"<{class_name} path={self._name!r} "
        f"schema={schema!r} template={self._description!r}>"
    )
@property
def docstring(self) -> str:
    """Documentation text: a fixed title followed by the template string as an inline literal."""
    title = "Filesystem path template"
    return f"{title}\n\n``{self._description}``"
[docs]
def parse(self, path: str) -> Dict[str, ConceptValueType]:
    """Parse a path into path concept values.

    :param path: path to match against this template.
    :returns: deserialized path concept values by concept name.
    :raises PathTemplateMatchError: the path does not match the template.
    """
    concept_values = self._parse(path)
    return concept_values
[docs]
def parse_with_score(
    self, path: str
) -> Tuple[Optional[Dict[str, ConceptValueType]], Tuple[int, ...]]:
    """Match a path and return the concept values together with the match score.

    :param path: path to match against this template.
    :returns: ``(None, ())`` when the path does not match, otherwise the
        concept values and the score ``(concept_score, num_placeholders)``.
    """
    try:
        concept_values = self._parse(path)
    except errors.PathTemplateMatchError:
        no_match = None, tuple()
        return no_match
    else:
        return concept_values, (self._concept_score, self._num_placeholders)
def _parse(self, path: str) -> Dict[str, ConceptValueType]:
    """Match a path against the compiled template and deserialize the named groups.

    The OS path separator is replaced by "/" before matching.

    :raises PathTemplateMatchError: the path does not match the template.
    """
    matched = self._regex.match(path.replace(os.sep, "/"))
    if matched is None:
        raise errors.PathTemplateMatchError(f"Path {path!r} does not match {self}")
    concept_values = {}
    for group_name, raw_value in matched.groupdict().items():
        concept_values[group_name] = self._concepts[group_name].deserialize(raw_value)
    return concept_values
[docs]
def render(
    self,
    concept_values: Dict[str, ConceptValueType],
    raise_on_missing: bool = True,
    raise_on_mismatch: bool = True,
) -> str:
    """Convert a dictionary of path concept values to a path.

    :param concept_values: path concept values by concept name.
    :param raise_on_missing: forwarded to the segment matching.
    :param raise_on_mismatch: forwarded to the segment matching.
    :returns: the rendered segments joined with the OS path separator, or an
        empty string when no segment could be rendered.
    :raises PathConceptValueError:
    """
    parts, _ = self._match(
        concept_values,
        raise_on_missing=raise_on_missing,
        raise_on_mismatch=raise_on_mismatch,
    )
    if not parts:
        # All segments were optional and skipped: os.path.join() without
        # arguments would raise a TypeError.
        return ""
    return os.path.join(*parts)
[docs]
def render_score(
    self, concept_values: Dict[str, ConceptValueType]
) -> Tuple[int, ...]:
    """Score how well the concept values fit this template.

    :param concept_values: path concept values by concept name.
    :returns: an empty tuple when the values cannot be rendered with this
        template, otherwise the match score.
    """
    try:
        _parts, match_score = self._match(concept_values)
    except errors.PathConceptValueError:
        return tuple()
    else:
        return match_score
def _match(
    self,
    concept_values: Dict[str, ConceptValueType],
    raise_on_missing: bool = True,
    raise_on_mismatch: bool = True,
) -> Tuple[List[str], Tuple[int, ...]]:
    """Render every segment and compute the match score.

    Optional segments that cannot be rendered are skipped, mandatory
    segments that cannot be rendered propagate the error.

    :returns: the rendered segments and the score ``(concept_score,
        num_placeholders, -num_unused)`` where ``num_unused`` is the number
        of provided concept values that no placeholder consumed.
    :raises PathConceptValueError:
    """
    consumed = set()
    rendered_segments = []

    for segment in self._segments:

        def substitute(placeholder: re.Match):
            # A missing value in an optional segment must always raise so
            # that the whole segment gets dropped.
            concept, text = self._serialize_concept(
                placeholder,
                concept_values=concept_values,
                raise_on_missing=raise_on_missing or not segment.mandatory,
                raise_on_mismatch=raise_on_mismatch,
            )
            consumed.add(concept.name)
            return text

        try:
            text = self._CONCEPT_RE.sub(substitute, segment.template)
        except errors.PathConceptValueError:
            if not segment.mandatory:
                continue
            raise
        rendered_segments.append(text)

    unused = set(concept_values) - consumed
    return rendered_segments, (
        self._concept_score,
        self._num_placeholders,
        -len(unused),
    )
def _compile_regex(self, segments: List[SegmentTemplate]) -> re.Pattern:
    """Compile the regular expression that matches a "/"-separated path.

    Placeholders are converted to named groups, the literal text around them
    is escaped so that it only matches itself. Optional segments are wrapped
    in an optional non-capturing group.

    :param segments: segment templates in path order.
    :returns: compiled pattern anchored at both ends.
    :raises UnknownPathConcept:
    """
    used_groups: List[str] = []
    regex_parts: List[str] = []
    for segment in segments:
        template = segment.template
        pieces = []
        cursor = 0
        for placeholder in self._CONCEPT_RE.finditer(template):
            # Literal text before the placeholder must not act as regex syntax.
            pieces.append(re.escape(template[cursor : placeholder.start()]))
            pieces.append(self._segment_as_regex(placeholder, used_groups))
            cursor = placeholder.end()
        pieces.append(re.escape(template[cursor:]))
        seg_regex = "".join(pieces)

        if regex_parts:
            # Not the first segment: separated from the previous one
            seg_regex = f"/{seg_regex}"
        if not segment.mandatory:
            seg_regex = f"(?:{seg_regex})?"
        regex_parts.append(seg_regex)

    pattern = "".join(regex_parts)
    return re.compile(f"^{pattern}$")
def _segment_as_regex(self, match: re.Match, used_groups: List[str]) -> str:
    """Convert one placeholder match to a regular expression fragment.

    The first occurrence of a concept becomes a named group, which matches
    the escaped literal when the placeholder has one and the regular
    expression of the concept otherwise. Later occurrences become a
    back-reference to that group.

    :param match: placeholder match with the concept name and optional literal.
    :param used_groups: names of the groups defined so far, extended in place.
    :raises UnknownPathConcept:
    """
    name, fixed_value = match.groups()
    known_concepts = self._concepts
    if name not in known_concepts:
        raise errors.UnknownPathConcept(name)
    if name in used_groups:
        return f"(?P={name})"
    used_groups.append(name)
    body = re.escape(fixed_value) if fixed_value else known_concepts[name].regex
    return f"(?P<{name}>{body})"
def _serialize_concept(
    self,
    match: re.Match,
    concept_values: Dict[str, ConceptValueType],
    raise_on_missing: bool = True,
    raise_on_mismatch: bool = True,
) -> Tuple[PathConcept, ConceptValueType]:
    """Serialize the value of the concept referred to by one placeholder.

    :param match: placeholder match with the concept name and optional literal.
    :param concept_values: provided path concept values by concept name.
    :param raise_on_missing: forwarded to the serialization of the concept.
    :param raise_on_mismatch: raise when the placeholder has a literal and
        the provided value serializes to something else.
    :returns: the concept and the serialized value. For a placeholder with a
        literal this is always the serialized literal.
    :raises PathConceptValueError:
    """
    cname, literal = match.groups()
    concept = self._concepts[cname]
    provided = concept_values.get(cname)

    if literal is None:
        # Plain placeholder: the provided value is all there is.
        return concept, concept.serialize(provided, raise_on_missing=raise_on_missing)

    literal = concept.serialize(literal, raise_on_missing=raise_on_missing)
    if provided is None:
        # No value provided: the literal acts as the value.
        return concept, concept.serialize(literal, raise_on_missing=raise_on_missing)

    result = concept.serialize(provided, raise_on_missing=raise_on_missing)
    # NOTE(review): the literal is serialized a second time, kept as-is in
    # case the serialization is not idempotent - confirm whether needed.
    literal = concept.serialize(literal, raise_on_missing=raise_on_missing)
    if result != literal and raise_on_mismatch:
        # Report the concept that mismatches, not the name of the template.
        raise errors.PathConceptMatchError(
            f"{cname!r}={result!r} instead of {literal!r}"
        )
    return concept, literal
[docs]
def contains_optionals(self, concept_values: Dict[str, ConceptValueType]) -> bool:
    """Tell whether any concept with a value is not one of the required concepts.

    :param concept_values: path concept values by concept name, ``None``
        values are ignored.
    """
    required = self._required_concepts
    for cname, cvalue in concept_values.items():
        if cvalue is not None and cname not in required:
            return True
    return False
[docs]
def remove_optionals(
    self, concept_values: Dict[str, ConceptValueType]
) -> Dict[str, ConceptValueType]:
    """Keep only the values of the required concepts.

    When the template has no required concepts, all concepts used by the
    template are kept instead. ``None`` values are always dropped.

    :param concept_values: path concept values by concept name.
    :returns: a new dictionary, the input is not modified.
    """
    keep_names = self._required_concepts or self._concepts
    kept = {}
    for cname, cvalue in concept_values.items():
        if cvalue is None or cname not in keep_names:
            continue
        kept[cname] = cvalue
    return kept
@property
def required_concept_names(self) -> List[str]:
    """Names of the required concepts (the internal list itself, not a copy)."""
    names = self._required_concepts
    return names
[docs]
def strip_mount_point(
    self, concept_values: Dict[str, ConceptValueType]
) -> Dict[str, ConceptValueType]:
    """Strip all segments from the root concept before a specific marker segment.

    Only applies when the first segment of the template defines a symbolic
    root segment and the concept named by that first segment has a value.

    :param concept_values: path concept values by concept name.
    :returns: a copy of the concept values, the input is not modified.
    """
    stripped = dict(concept_values)
    if not self._segments:
        return stripped
    root_segment = self._segments[0]
    marker = root_segment.symbolic_root_segment
    if not marker:
        return stripped
    # The first segment template is a single placeholder: drop the braces.
    root_concept_name = root_segment.template.strip("{}")
    if root_concept_name in stripped:
        stripped[root_concept_name] = _strip_before_segment(
            stripped[root_concept_name],
            segment=marker,
        )
    return stripped
def _strip_before_segment(path: str, segment: Optional[str] = None) -> str:
    """Drop all directories of a path that come before a marker segment.

    :param path: path split on the OS path separator.
    :param segment: name of the first directory to keep. When ``None`` or
        not a segment of the path, the path is returned unchanged.
    :returns: the drive (if any) and root followed by the marker segment and
        everything after it.
    """
    if segment is None:
        return path
    drive, tail = os.path.splitdrive(path)
    parts = tail.split(os.sep)
    try:
        idx = parts.index(segment)
    except ValueError:
        return path
    # Always append the separator to the drive: joining onto a bare "C:"
    # gives a path relative to the current directory of that drive.
    root = f"{drive}{os.sep}"
    return os.path.join(root, *parts[idx:])