Source code for esrf_pathlib._schemas.registry

import logging
import os
from pathlib import Path
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
from typing import Union

from . import constants
from . import errors
from .definitions.load import default_schema_version
from .definitions.load import fallback_schema_versions
from .definitions.load import get_schema
from .definitions.types import ConceptValueType
from .fields.concept import PathConcept
from .fields.derived import DerivedConcept
from .fields.path import PathTemplate
from .identifier import SchemaIdentifier
from .schema.merged import MergedPathSchema
from .schema.path import BasePathSchema
from .schema.path import PathSchema

_logger = logging.getLogger(__name__)


[docs] class PathSchemaRegistry: """Use one or more path schemas to find the best matching *path template* for a path string or a dictionary of *path concepts*. With the template path strings are *parsed* to a dictionary of concept values (wrapped by a ``ParsedPath`` class). Inversally a dictionary of concept values can be *rendered* as a path string. """ def __init__(self, schemas: List[BasePathSchema]): """The order of the schemas is the order of field fallback resolution.""" if not schemas: raise ValueError("At least one schema is required") self._schemas = schemas self._name = " > ".join([str(schema.identifier) for schema in schemas]) _logger.debug("Created registry %r", self.name) @property def name(self) -> str: return self._name def __info__(self) -> str: lines = [f"PathSchemaRegistry: {self.name}"] for _schema in self._schemas: lines.extend(_schema.__info__().split("\n")) return "\n ".join(lines) def __repr__(self) -> str: return f"<{type(self).__name__} {self.name}>"
[docs] def concept_names( self, schema_identifier: Optional[SchemaIdentifier] = None ) -> List[str]: schema = self._get_schema(schema_identifier=schema_identifier) return list(schema.concepts)
[docs] def derived_concept_names( self, schema_identifier: Optional[SchemaIdentifier] = None ) -> List[str]: schema = self._get_schema(schema_identifier=schema_identifier) return list(schema.derived_concepts)
[docs] def template_names( self, schema_identifier: Optional[SchemaIdentifier] = None ) -> List[str]: schema = self._get_schema(schema_identifier=schema_identifier) return list(schema.templates)
[docs] def get_concept( self, name: str, schema_identifier: Optional[SchemaIdentifier] = None ) -> PathConcept: """ :raises UnknownPathConcept: """ schema = self._get_schema(schema_identifier=schema_identifier) if name not in schema.concepts: raise errors.UnknownPathConcept(name) return schema.concepts[name]
[docs] def get_derived_concept( self, name: str, schema_identifier: Optional[SchemaIdentifier] = None ) -> DerivedConcept: """ :raises UnknownPathConcept: """ schema = self._get_schema(schema_identifier=schema_identifier) if name not in schema.derived_concepts: raise errors.UnknownPathConcept(name) return schema.derived_concepts[name]
[docs] def get_template( self, name: str, schema_identifier: Optional[SchemaIdentifier] = None ) -> PathTemplate: """ :raises UnknownPathTemplate: """ schema = self._get_schema(schema_identifier=schema_identifier) if name not in schema.templates: raise errors.UnknownPathTemplate(name) return schema.templates[name]
def _get_schema( self, schema_identifier: Optional[SchemaIdentifier] = None ) -> BasePathSchema: if schema_identifier is None: return self._schemas[0] for schema in self._schemas: if schema.has_schema(schema_identifier): return schema return self._schemas[0]
[docs] def parse_path(self, path: Union[str, Path]) -> "ParsedPath": """ :raises PathSchemaMatchError: """ norm_path = str(path).replace(os.sep, "/") # Keep the commented prints until we find a better we of debugging matching # print("---") # print("Scores for", norm_path) best_template = None best_score = None best_concept_values = None best_has_optionals = None for _schema in self._schemas: for template in _schema.templates.values(): concept_values, score = template.parse_with_score(norm_path) if not score: continue # print(" ", score, template.name, "from", template.schema_identifier) if best_score is None or score > best_score: best_template = template best_score = score best_concept_values = concept_values best_has_optionals = template.contains_optionals(concept_values) if best_template is None: continue if not best_has_optionals: break if best_template is None: raise errors.PathSchemaMatchError(f"No template matched path {path!r}") # print( # "Final:", # best_score, # best_template.name, # "from", # best_template.schema_identifier, # ) return ParsedPath(self, best_concept_values, template=best_template)
[docs] def render_path( self, concept_values: Dict[str, ConceptValueType], template_name: Optional[str] = None, schema_identifier: Optional[SchemaIdentifier] = None, ) -> str: """ :raises PathSchemaMatchError: :raises PathTemplateMatchError: :raises UnknownPathTemplate: """ concept_values = self._fill_default_concepts( concept_values, schema_identifier=schema_identifier ) parsed = ParsedPath(self, concept_values) # Note: do not keep the `parsed` instance because the concept values # not to go through a serialize+deserialize cycle for normalization. return parsed.render(template_name=template_name)
def _fill_default_concepts( self, concept_values: Dict[str, ConceptValueType], schema_identifier: Optional[SchemaIdentifier] = None, ) -> None: provided_names = set(concept_values) all_names = set(self.concept_names(schema_identifier)) if not (provided_names & all_names): return concept_values concept_values = { cname: cvalue for cname, cvalue in concept_values.items() if cvalue is not None } known_names = set(concept_values) unknown_names = all_names - known_names schema = self._get_schema() for concept_name in unknown_names: concept = schema.concepts[concept_name] default_value = concept._default_value if default_value is not None: concept_values[concept_name] = default_value return concept_values
_CACHE: Dict[Tuple[Tuple[str, int]], PathSchemaRegistry] = {}
[docs] def get_schema_registry( fallback_depth: Optional[int] = None, **schemas: Optional[int] ) -> PathSchemaRegistry: """ :raises FieldNameError: :raises PathSchemaCollision: :raises UnknownPathSchema: """ if not schemas: schemas = {constants.UNKNOWN_SCHEMA_NAME: 1} # Resolve version for each schema: 0 or None -> latest available main_versions = { sname: sversion if sversion is not None else default_schema_version(sname) for sname, sversion in schemas.items() } # Return when cached cache_key = (tuple(sorted(main_versions.items())), fallback_depth) registry = _CACHE.get(cache_key) if registry is not None: return registry # Build main merged schema main_schemas: List[PathSchema] = [] for sname, sversion in main_versions.items(): main_schemas.append(get_schema(sname, sversion)) main_schema = MergedPathSchema(main_schemas) schemas = [main_schema] # Build secondary merged schemas from older versions fallback_versions = fallback_schema_versions( main_versions, fallback_depth=fallback_depth ) if fallback_versions: for older_versions in fallback_versions: older_schemas = [ get_schema(sname, sversion) for sname, sversion in older_versions.items() ] try: schemas.append(MergedPathSchema(older_schemas)) except (errors.FieldNameError, errors.PathSchemaCollision): # Schema versions are not compatible pass registry = PathSchemaRegistry(schemas) _CACHE[cache_key] = registry return registry
[docs] class ParsedPath: """Container of path concept values which are either provided by the users or extracted from a path string by a schema registry. The corresponding path template is provided or can be derived using the registry. `Fields` are exposed as instance attributes. A field can be a *path concept*, *derived concept* or *rendered path template*. """ def __init__( self, registry: PathSchemaRegistry, concept_values: Dict[str, ConceptValueType], template: Optional[PathTemplate] = None, ): self._concept_values = { k: v for k, v in concept_values.items() if v is not None } self._template = template self._registry = registry @property def concept_names(self) -> List[str]: return list(self._concept_values) @property def required_concept_names(self) -> List[str]: return self.template.required_concept_names @property def template(self) -> PathTemplate: """ :raises UnknownPathTemplate: :raises PathSchemaMatchError: """ return self._get_template() @property def template_name(self) -> str: try: return self.template.name except errors.UnknownPathTemplate: return "<none>" except errors.PathSchemaMatchError: return "<none>" except Exception: # Used in __repr__ and avoid circular calls in case we get an unexpected error _logger.exception("Failed unexpectidly") return "<none>" @property def schema_identifier(self) -> SchemaIdentifier: try: return self.template.schema_identifier except errors.UnknownPathTemplate: return "<none>" except errors.PathSchemaMatchError: return "<none>" except Exception: # Used in __repr__ and avoid circular calls in case we get an unexpected error _logger.exception("Failed unexpectidly") return "<none>" @property def _optional_schema_identifier(self) -> Optional[str]: try: return self.template.schema_identifier except (errors.UnknownPathTemplate, errors.PathSchemaMatchError): return None def __repr__(self) -> str: return f"<{type(self).__name__} schema={str(self.schema_identifier)!r} template={self.template_name!r} concepts={self._concept_values}>" def __info__(self, include_missing: bool = False) -> str: lines = [ f"Template name: {self.template_name} (schema={self.schema_identifier})", f"Path: {self.render()}", ] fields = self.field_values(include_missing=include_missing) for fname, fvalue in fields.items(): lines.append(f" {fname}: {fvalue}") return "\n".join(lines) def __eq__(self, value: Any) -> bool: if isinstance(value, ParsedPath): return self._concept_values == value._concept_values return super().__eq__(value) def __getattr__(self, item: str) -> Any: """ :raises SchemaAttributeError: """ try: return self.get_field_value(item, raise_on_mismatch=False) except (errors.PathConceptValueError, errors.UnknownField) as ex: raise errors.SchemaAttributeError(item) from ex
[docs] def get_field_value( self, item: str, raise_on_missing: bool = True, raise_on_mismatch: bool = True ) -> Any: """ :raises UnknownField: :raises PathConceptValueError: """ item = str(item) schema_identifier = self._optional_schema_identifier if item in self._registry.concept_names(schema_identifier): if item in self._concept_values: value = self._concept_values[item] if value is not None: return value raise errors.PathConceptWithoutValue( f"{item!r} not known for {self.template_name!r}" ) if item in self._registry.template_names(schema_identifier): template = self._registry.get_template(item, schema_identifier) concept_values = template.remove_optionals(self._concept_values) return template.render( concept_values, raise_on_missing=raise_on_missing, raise_on_mismatch=raise_on_mismatch, ) if item in self._registry.derived_concept_names(schema_identifier): concept = self._registry.get_derived_concept(item, schema_identifier) return concept.derive( self._concept_values, raise_on_missing=raise_on_missing ) raise errors.UnknownField( f"{item!r} is not a known field: {self.field_names()}" )
[docs] def field_names(self, include_missing: bool = False) -> List[str]: """Return all field names. A field is either a path concept, derived path concept or a path template.""" schema_identifier = self._optional_schema_identifier if include_missing: concept_names = self._registry.concept_names(schema_identifier) else: concept_names = list(self._concept_values) return ( concept_names + self._registry.derived_concept_names(schema_identifier) + self._registry.template_names(schema_identifier) )
[docs] def field_values(self, include_missing: bool = False) -> Dict[str, Any]: """Return all fields that can be derived from the known concepts. Use "*" for missing concepts. """ field_values = {} for fname in self.field_names(include_missing=include_missing): try: field_values[fname] = self.get_field_value( fname, raise_on_missing=not include_missing ) except (errors.PathConceptValueError, errors.UnknownField): pass return field_values
[docs] def path_field_names(self) -> List[str]: """Return all field names that are paths (path templates).""" schema_identifier = self._optional_schema_identifier return self._registry.template_names(schema_identifier)
[docs] def nonpath_field_names(self, include_missing: bool = False) -> List[str]: """Return all field names that are not paths (path concepts and derived path concepts).""" schema_identifier = self._optional_schema_identifier if include_missing: concept_names = self._registry.concept_names(schema_identifier) else: concept_names = list(self._concept_values) return concept_names + self._registry.derived_concept_names(schema_identifier)
[docs] def render(self, template_name: Optional[str] = None) -> str: """Convert schema to path. :raises PathSchemaMatchError: :raises PathTemplateMatchError: :raises UnknownPathTemplate: """ template = self._get_template(template_name=template_name) try: return template.render(self._concept_values) except errors.PathConceptValueError as ex: raise errors.PathTemplateMatchError( f"Path is not a {template.name!r}" ) from ex
def _get_template(self, template_name: Optional[str] = None) -> PathTemplate: """ :raises UnknownPathTemplate: :raises PathSchemaMatchError: """ if template_name: return self._registry.get_template(template_name) if self._template is not None: return self._template # Keep the commented prints until we find a better we of debugging matching # print("---") # print("Scores for", self._concept_values) best_template = None best_score = None for template_name in self._registry.template_names(): template = self._registry.get_template(template_name) score = template.render_score(self._concept_values) if not score: continue # print(" ", score, template.name, "from", template.schema_identifier) if best_score is None or score > best_score: best_template = template best_score = score if best_template is None: raise errors.PathSchemaMatchError( f"No path template can be found that is fully described by the fields {self._concept_values}" ) # print( # "Best:", # best_score, # best_template.name, # "from", # best_template.schema_identifier, # ) self._template = best_template return best_template
[docs] def replace_concepts( self, template_name: Optional[str] = None, **concept_values: ConceptValueType, ) -> "ParsedPath": """Returns a new ``ParsedPath`` instance with additional or replaces concepts. :raises PathSchemaMatchError: :raises PathTemplateMatchError: """ new_concept_values = self._concept_values.copy() new_concept_values.update(concept_values) if template_name: template = self._get_template(template_name) else: template = None parsed = ParsedPath(self._registry, new_concept_values, template=template) path = parsed.render() return self._registry.parse_path(path)
[docs] def strip_mount_point(self) -> "ParsedPath": """Returns a new ``ParsedPath`` instance with stripped leading segments. :raises PathSchemaMatchError: :raises PathTemplateMatchError: """ template = self._get_template() new_concept_values = self._template.strip_mount_point(self._concept_values) parsed = ParsedPath(self._registry, new_concept_values, template=template) path = parsed.render() return self._registry.parse_path(path)