Source code for esrf_pathlib._schemas.schema.base

from typing import Dict
from typing import List
from typing import Optional

from .. import errors
from ..fields.base import Field
from ..fields.concept import PathConcept
from ..fields.derived import DerivedConcept
from ..fields.path import PathTemplate
from ..fields.tree import tree_as_strings
from ..identifier import SchemaIdentifier


[docs] class BasePathSchema: """A versioned collection of path concepts, derived concepts and path templates.""" _FORBIDDEN_FIELD_NAMES = ("schema_name", "schema_version", "template_name") def __init__(self, identifier: SchemaIdentifier, description: str) -> None: self._identifier = identifier self._description = description self._concepts: Dict[str, PathConcept] = {} self._derived_concepts: Dict[str, DerivedConcept] = {} self._templates: Dict[str, PathTemplate] = {} self._all_path_segments = [] @property def identifier(self) -> SchemaIdentifier: return self._identifier @property def description(self) -> SchemaIdentifier: return self._description @property def docstring(self) -> SchemaIdentifier: return self._description @property def concepts(self) -> Dict[str, PathConcept]: return self._concepts @property def derived_concepts(self) -> Dict[str, DerivedConcept]: return self._derived_concepts @property def templates(self) -> Dict[str, PathTemplate]: return self._templates def __info__(self) -> str: lines = [f"Schema: {self.identifier}"] lines.append(" Tree:") tree_lines = tree_as_strings(self._all_path_segments) lines.extend([f" {s}" for s in tree_lines]) def field_string(field: Field): category, values = field.value_info() if category is None: return f" - {field.name}: {field.description}" return f" - {field.name}: {field.description} ({category}: {', '.join(values)})" if self.concepts: lines.append(" Concepts:") lines.extend(field_string(concept) for concept in self.concepts.values()) else: lines.append(" Concepts: (none)") if self.derived_concepts: lines.append(" Derived concepts:") lines.extend( field_string(derived) for derived in self.derived_concepts.values() ) else: lines.append(" Derived concepts: (none)") if self.templates: lines.append(" Paths:") lines.extend(field_string(template) for template in self.templates.values()) else: lines.append(" Paths: (none)") return "\n".join(lines) def __repr__(self) -> str: return f"<{type(self).__name__} {self.identifier}>"
[docs] def has_schema(self, schema_identifier: str) -> bool: return self.identifier.has_identifier(schema_identifier)
@staticmethod def _create_identifier( name: Optional[str] = None, version: Optional[int] = None, schemas: Optional[List["BasePathSchema"]] = None, ) -> None: """ :raises PathSchemaCollision: """ if name is not None and version is not None: identifier_kwargs = {name: version} schema_versions = dict() else: identifier_kwargs = schema_versions = dict() for schema in schemas or []: for name, version in schema.identifier._schema_versions.items(): if name not in schema_versions: schema_versions[name] = version elif version != schema_versions[name]: raise errors.PathSchemaCollision( f"Cannot mix different versions ({version} and {schema_versions[name]}) of the same schema {name!r}" ) return SchemaIdentifier(**identifier_kwargs) def _merge_fields_from(self, schema: "BasePathSchema") -> None: """ :raises FieldNameError: """ for concept in schema.concepts.values(): self._add_field(concept) for derived in schema.derived_concepts.values(): self._add_field(derived) for template in schema.templates.values(): self._add_field(template) def _add_field(self, field: Field) -> None: """ :raises FieldNameError: """ if isinstance(field, PathConcept): container = self.concepts others = (self.derived_concepts, self.templates) elif isinstance(field, DerivedConcept): container = self.derived_concepts others = (self.concepts, self.templates) elif isinstance(field, PathTemplate): container = self.templates others = (self.concepts, self.derived_concepts) else: raise TypeError(type(field)) fname = field.name if fname in container: existing = container[fname] if id(existing) == id(field): return raise errors.FieldNameError(f"Field {fname!r} already exists as {existing}") else: for other in others: if fname in other: existing = other[fname] if id(existing) == id(field): return raise errors.FieldNameError( f"{field} of schema {field.schema_name!r} already exists as {existing} of schema {existing.schema_name!r}" ) container[fname] = field