import logging
import os
from pathlib import Path
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
from typing import Union
from . import constants
from . import errors
from .definitions.load import default_schema_version
from .definitions.load import fallback_schema_versions
from .definitions.load import get_schema
from .definitions.types import ConceptValueType
from .fields.concept import PathConcept
from .fields.derived import DerivedConcept
from .fields.path import PathTemplate
from .identifier import SchemaIdentifier
from .schema.merged import MergedPathSchema
from .schema.path import BasePathSchema
from .schema.path import PathSchema
_logger = logging.getLogger(__name__)
[docs]
class PathSchemaRegistry:
"""Use one or more path schemas to find the best matching *path template* for a path
string or a dictionary of *path concepts*. With the template path strings are *parsed*
to a dictionary of concept values (wrapped by a ``ParsedPath`` class). Inversally
a dictionary of concept values can be *rendered* as a path string.
"""
def __init__(self, schemas: List[BasePathSchema]):
"""The order of the schemas is the order of field fallback resolution."""
if not schemas:
raise ValueError("At least one schema is required")
self._schemas = schemas
self._name = " > ".join([str(schema.identifier) for schema in schemas])
_logger.debug("Created registry %r", self.name)
@property
def name(self) -> str:
return self._name
def __info__(self) -> str:
lines = [f"PathSchemaRegistry: {self.name}"]
for _schema in self._schemas:
lines.extend(_schema.__info__().split("\n"))
return "\n ".join(lines)
def __repr__(self) -> str:
return f"<{type(self).__name__} {self.name}>"
[docs]
def concept_names(
self, schema_identifier: Optional[SchemaIdentifier] = None
) -> List[str]:
schema = self._get_schema(schema_identifier=schema_identifier)
return list(schema.concepts)
[docs]
def derived_concept_names(
self, schema_identifier: Optional[SchemaIdentifier] = None
) -> List[str]:
schema = self._get_schema(schema_identifier=schema_identifier)
return list(schema.derived_concepts)
[docs]
def template_names(
self, schema_identifier: Optional[SchemaIdentifier] = None
) -> List[str]:
schema = self._get_schema(schema_identifier=schema_identifier)
return list(schema.templates)
[docs]
def get_concept(
self, name: str, schema_identifier: Optional[SchemaIdentifier] = None
) -> PathConcept:
"""
:raises UnknownPathConcept:
"""
schema = self._get_schema(schema_identifier=schema_identifier)
if name not in schema.concepts:
raise errors.UnknownPathConcept(name)
return schema.concepts[name]
[docs]
def get_derived_concept(
self, name: str, schema_identifier: Optional[SchemaIdentifier] = None
) -> DerivedConcept:
"""
:raises UnknownPathConcept:
"""
schema = self._get_schema(schema_identifier=schema_identifier)
if name not in schema.derived_concepts:
raise errors.UnknownPathConcept(name)
return schema.derived_concepts[name]
[docs]
def get_template(
self, name: str, schema_identifier: Optional[SchemaIdentifier] = None
) -> PathTemplate:
"""
:raises UnknownPathTemplate:
"""
schema = self._get_schema(schema_identifier=schema_identifier)
if name not in schema.templates:
raise errors.UnknownPathTemplate(name)
return schema.templates[name]
def _get_schema(
self, schema_identifier: Optional[SchemaIdentifier] = None
) -> BasePathSchema:
if schema_identifier is None:
return self._schemas[0]
for schema in self._schemas:
if schema.has_schema(schema_identifier):
return schema
return self._schemas[0]
[docs]
def parse_path(self, path: Union[str, Path]) -> "ParsedPath":
"""
:raises PathSchemaMatchError:
"""
norm_path = str(path).replace(os.sep, "/")
# Keep the commented prints until we find a better we of debugging matching
# print("---")
# print("Scores for", norm_path)
best_template = None
best_score = None
best_concept_values = None
best_has_optionals = None
for _schema in self._schemas:
for template in _schema.templates.values():
concept_values, score = template.parse_with_score(norm_path)
if not score:
continue
# print(" ", score, template.name, "from", template.schema_identifier)
if best_score is None or score > best_score:
best_template = template
best_score = score
best_concept_values = concept_values
best_has_optionals = template.contains_optionals(concept_values)
if best_template is None:
continue
if not best_has_optionals:
break
if best_template is None:
raise errors.PathSchemaMatchError(f"No template matched path {path!r}")
# print(
# "Final:",
# best_score,
# best_template.name,
# "from",
# best_template.schema_identifier,
# )
return ParsedPath(self, best_concept_values, template=best_template)
[docs]
def render_path(
self,
concept_values: Dict[str, ConceptValueType],
template_name: Optional[str] = None,
schema_identifier: Optional[SchemaIdentifier] = None,
) -> str:
"""
:raises PathSchemaMatchError:
:raises PathTemplateMatchError:
:raises UnknownPathTemplate:
"""
concept_values = self._fill_default_concepts(
concept_values, schema_identifier=schema_identifier
)
parsed = ParsedPath(self, concept_values)
# Note: do not keep the `parsed` instance because the concept values
# not to go through a serialize+deserialize cycle for normalization.
return parsed.render(template_name=template_name)
def _fill_default_concepts(
self,
concept_values: Dict[str, ConceptValueType],
schema_identifier: Optional[SchemaIdentifier] = None,
) -> None:
provided_names = set(concept_values)
all_names = set(self.concept_names(schema_identifier))
if not (provided_names & all_names):
return concept_values
concept_values = {
cname: cvalue
for cname, cvalue in concept_values.items()
if cvalue is not None
}
known_names = set(concept_values)
unknown_names = all_names - known_names
schema = self._get_schema()
for concept_name in unknown_names:
concept = schema.concepts[concept_name]
default_value = concept._default_value
if default_value is not None:
concept_values[concept_name] = default_value
return concept_values
_CACHE: Dict[Tuple[Tuple[str, int]], PathSchemaRegistry] = {}
[docs]
def get_schema_registry(
fallback_depth: Optional[int] = None, **schemas: Optional[int]
) -> PathSchemaRegistry:
"""
:raises FieldNameError:
:raises PathSchemaCollision:
:raises UnknownPathSchema:
"""
if not schemas:
schemas = {constants.UNKNOWN_SCHEMA_NAME: 1}
# Resolve version for each schema: 0 or None -> latest available
main_versions = {
sname: sversion if sversion is not None else default_schema_version(sname)
for sname, sversion in schemas.items()
}
# Return when cached
cache_key = (tuple(sorted(main_versions.items())), fallback_depth)
registry = _CACHE.get(cache_key)
if registry is not None:
return registry
# Build main merged schema
main_schemas: List[PathSchema] = []
for sname, sversion in main_versions.items():
main_schemas.append(get_schema(sname, sversion))
main_schema = MergedPathSchema(main_schemas)
schemas = [main_schema]
# Build secondary merged schemas from older versions
fallback_versions = fallback_schema_versions(
main_versions, fallback_depth=fallback_depth
)
if fallback_versions:
for older_versions in fallback_versions:
older_schemas = [
get_schema(sname, sversion)
for sname, sversion in older_versions.items()
]
try:
schemas.append(MergedPathSchema(older_schemas))
except (errors.FieldNameError, errors.PathSchemaCollision):
# Schema versions are not compatible
pass
registry = PathSchemaRegistry(schemas)
_CACHE[cache_key] = registry
return registry
[docs]
class ParsedPath:
"""Container of path concept values which are either provided by the users
or extracted from a path string by a schema registry. The corresponding path
template is provided or can be derived using the registry.
`Fields` are exposed as instance attributes. A field can be a *path concept*,
*derived concept* or *rendered path template*.
"""
def __init__(
self,
registry: PathSchemaRegistry,
concept_values: Dict[str, ConceptValueType],
template: Optional[PathTemplate] = None,
):
self._concept_values = {
k: v for k, v in concept_values.items() if v is not None
}
self._template = template
self._registry = registry
@property
def concept_names(self) -> List[str]:
return list(self._concept_values)
@property
def required_concept_names(self) -> List[str]:
return self.template.required_concept_names
@property
def template(self) -> PathTemplate:
"""
:raises UnknownPathTemplate:
:raises PathSchemaMatchError:
"""
return self._get_template()
@property
def template_name(self) -> str:
try:
return self.template.name
except errors.UnknownPathTemplate:
return "<none>"
except errors.PathSchemaMatchError:
return "<none>"
except Exception:
# Used in __repr__ and avoid circular calls in case we get an unexpected error
_logger.exception("Failed unexpectidly")
return "<none>"
@property
def schema_identifier(self) -> SchemaIdentifier:
try:
return self.template.schema_identifier
except errors.UnknownPathTemplate:
return "<none>"
except errors.PathSchemaMatchError:
return "<none>"
except Exception:
# Used in __repr__ and avoid circular calls in case we get an unexpected error
_logger.exception("Failed unexpectidly")
return "<none>"
@property
def _optional_schema_identifier(self) -> Optional[str]:
try:
return self.template.schema_identifier
except (errors.UnknownPathTemplate, errors.PathSchemaMatchError):
return None
def __repr__(self) -> str:
return f"<{type(self).__name__} schema={str(self.schema_identifier)!r} template={self.template_name!r} concepts={self._concept_values}>"
def __info__(self, include_missing: bool = False) -> str:
lines = [
f"Template name: {self.template_name} (schema={self.schema_identifier})",
f"Path: {self.render()}",
]
fields = self.field_values(include_missing=include_missing)
for fname, fvalue in fields.items():
lines.append(f" {fname}: {fvalue}")
return "\n".join(lines)
def __eq__(self, value: Any) -> bool:
if isinstance(value, ParsedPath):
return self._concept_values == value._concept_values
return super().__eq__(value)
def __getattr__(self, item: str) -> Any:
"""
:raises SchemaAttributeError:
"""
try:
return self.get_field_value(item, raise_on_mismatch=False)
except (errors.PathConceptValueError, errors.UnknownField) as ex:
raise errors.SchemaAttributeError(item) from ex
[docs]
def get_field_value(
self, item: str, raise_on_missing: bool = True, raise_on_mismatch: bool = True
) -> Any:
"""
:raises UnknownField:
:raises PathConceptValueError:
"""
item = str(item)
schema_identifier = self._optional_schema_identifier
if item in self._registry.concept_names(schema_identifier):
if item in self._concept_values:
value = self._concept_values[item]
if value is not None:
return value
raise errors.PathConceptWithoutValue(
f"{item!r} not known for {self.template_name!r}"
)
if item in self._registry.template_names(schema_identifier):
template = self._registry.get_template(item, schema_identifier)
concept_values = template.remove_optionals(self._concept_values)
return template.render(
concept_values,
raise_on_missing=raise_on_missing,
raise_on_mismatch=raise_on_mismatch,
)
if item in self._registry.derived_concept_names(schema_identifier):
concept = self._registry.get_derived_concept(item, schema_identifier)
return concept.derive(
self._concept_values, raise_on_missing=raise_on_missing
)
raise errors.UnknownField(
f"{item!r} is not a known field: {self.field_names()}"
)
[docs]
def field_names(self, include_missing: bool = False) -> List[str]:
"""Return all field names. A field is either a path concept,
derived path concept or a path template."""
schema_identifier = self._optional_schema_identifier
if include_missing:
concept_names = self._registry.concept_names(schema_identifier)
else:
concept_names = list(self._concept_values)
return (
concept_names
+ self._registry.derived_concept_names(schema_identifier)
+ self._registry.template_names(schema_identifier)
)
[docs]
def field_values(self, include_missing: bool = False) -> Dict[str, Any]:
"""Return all fields that can be derived from the known concepts.
Use "*" for missing concepts.
"""
field_values = {}
for fname in self.field_names(include_missing=include_missing):
try:
field_values[fname] = self.get_field_value(
fname, raise_on_missing=not include_missing
)
except (errors.PathConceptValueError, errors.UnknownField):
pass
return field_values
[docs]
def path_field_names(self) -> List[str]:
"""Return all field names that are paths (path templates)."""
schema_identifier = self._optional_schema_identifier
return self._registry.template_names(schema_identifier)
[docs]
def nonpath_field_names(self, include_missing: bool = False) -> List[str]:
"""Return all field names that are not paths (path concepts and derived path concepts)."""
schema_identifier = self._optional_schema_identifier
if include_missing:
concept_names = self._registry.concept_names(schema_identifier)
else:
concept_names = list(self._concept_values)
return concept_names + self._registry.derived_concept_names(schema_identifier)
[docs]
def render(self, template_name: Optional[str] = None) -> str:
"""Convert schema to path.
:raises PathSchemaMatchError:
:raises PathTemplateMatchError:
:raises UnknownPathTemplate:
"""
template = self._get_template(template_name=template_name)
try:
return template.render(self._concept_values)
except errors.PathConceptValueError as ex:
raise errors.PathTemplateMatchError(
f"Path is not a {template.name!r}"
) from ex
def _get_template(self, template_name: Optional[str] = None) -> PathTemplate:
"""
:raises UnknownPathTemplate:
:raises PathSchemaMatchError:
"""
if template_name:
return self._registry.get_template(template_name)
if self._template is not None:
return self._template
# Keep the commented prints until we find a better we of debugging matching
# print("---")
# print("Scores for", self._concept_values)
best_template = None
best_score = None
for template_name in self._registry.template_names():
template = self._registry.get_template(template_name)
score = template.render_score(self._concept_values)
if not score:
continue
# print(" ", score, template.name, "from", template.schema_identifier)
if best_score is None or score > best_score:
best_template = template
best_score = score
if best_template is None:
raise errors.PathSchemaMatchError(
f"No path template can be found that is fully described by the fields {self._concept_values}"
)
# print(
# "Best:",
# best_score,
# best_template.name,
# "from",
# best_template.schema_identifier,
# )
self._template = best_template
return best_template
[docs]
def replace_concepts(
self,
template_name: Optional[str] = None,
**concept_values: ConceptValueType,
) -> "ParsedPath":
"""Returns a new ``ParsedPath`` instance with additional or replaces concepts.
:raises PathSchemaMatchError:
:raises PathTemplateMatchError:
"""
new_concept_values = self._concept_values.copy()
new_concept_values.update(concept_values)
if template_name:
template = self._get_template(template_name)
else:
template = None
parsed = ParsedPath(self._registry, new_concept_values, template=template)
path = parsed.render()
return self._registry.parse_path(path)
[docs]
def strip_mount_point(self) -> "ParsedPath":
"""Returns a new ``ParsedPath`` instance with stripped leading segments.
:raises PathSchemaMatchError:
:raises PathTemplateMatchError:
"""
template = self._get_template()
new_concept_values = self._template.strip_mount_point(self._concept_values)
parsed = ParsedPath(self._registry, new_concept_values, template=template)
path = parsed.render()
return self._registry.parse_path(path)