Source code for esrf_pathlib._schemas.schema.base
from typing import Dict
from typing import List
from typing import Optional
from .. import errors
from ..fields.base import Field
from ..fields.concept import PathConcept
from ..fields.derived import DerivedConcept
from ..fields.path import PathTemplate
from ..fields.tree import tree_as_strings
from ..identifier import SchemaIdentifier
[docs]
class BasePathSchema:
    """A versioned collection of path concepts, derived concepts and path templates.

    Fields are stored in three name-indexed dictionaries (concepts, derived
    concepts and templates). A field name may only be used once across all
    three dictionaries; see :meth:`_add_field`.
    """

    # NOTE(review): not referenced anywhere in this class body; presumably
    # enforced by subclasses or by the field classes -- confirm.
    _FORBIDDEN_FIELD_NAMES = ("schema_name", "schema_version", "template_name")

    def __init__(self, identifier: SchemaIdentifier, description: str) -> None:
        """
        :param identifier: identifier of this schema.
        :param description: human readable description of this schema.
        """
        self._identifier = identifier
        self._description = description
        self._concepts: Dict[str, PathConcept] = {}
        self._derived_concepts: Dict[str, DerivedConcept] = {}
        self._templates: Dict[str, PathTemplate] = {}
        # Only read by `__info__` (passed to `tree_as_strings`); nothing in
        # this class fills it.
        self._all_path_segments = []

    @property
    def identifier(self) -> SchemaIdentifier:
        """Identifier given at construction."""
        return self._identifier

    @property
    def description(self) -> str:
        """Description given at construction."""
        return self._description

    @property
    def docstring(self) -> str:
        """Same value as :attr:`description`."""
        return self._description

    @property
    def concepts(self) -> Dict[str, PathConcept]:
        """Path concepts indexed by field name (the live dictionary, not a copy)."""
        return self._concepts

    @property
    def derived_concepts(self) -> Dict[str, DerivedConcept]:
        """Derived concepts indexed by field name (the live dictionary, not a copy)."""
        return self._derived_concepts

    @property
    def templates(self) -> Dict[str, PathTemplate]:
        """Path templates indexed by field name (the live dictionary, not a copy)."""
        return self._templates

    def __info__(self) -> str:
        """Return a multi-line, human readable summary of the schema.

        The summary lists the schema identifier, the path tree and then the
        concepts, derived concepts and path templates (or ``(none)`` for an
        empty category).
        """
        lines = [f"Schema: {self.identifier}"]

        lines.append(" Tree:")
        tree_lines = tree_as_strings(self._all_path_segments)
        lines.extend([f" {s}" for s in tree_lines])

        def field_string(field: Field) -> str:
            # `value_info()` yields a (category, values) pair; a category of
            # None means there is no value information to display.
            category, values = field.value_info()
            if category is None:
                return f" - {field.name}: {field.description}"
            return f" - {field.name}: {field.description} ({category}: {', '.join(values)})"

        if self.concepts:
            lines.append(" Concepts:")
            lines.extend(field_string(concept) for concept in self.concepts.values())
        else:
            lines.append(" Concepts: (none)")

        if self.derived_concepts:
            lines.append(" Derived concepts:")
            lines.extend(
                field_string(derived) for derived in self.derived_concepts.values()
            )
        else:
            lines.append(" Derived concepts: (none)")

        if self.templates:
            lines.append(" Paths:")
            lines.extend(field_string(template) for template in self.templates.values())
        else:
            lines.append(" Paths: (none)")

        return "\n".join(lines)

    def __repr__(self) -> str:
        return f"<{type(self).__name__} {self.identifier}>"

    def has_schema(self, schema_identifier: str) -> bool:
        """Return the result of ``identifier.has_identifier(schema_identifier)``."""
        return self.identifier.has_identifier(schema_identifier)

    @staticmethod
    def _create_identifier(
        name: Optional[str] = None,
        version: Optional[int] = None,
        schemas: Optional[List["BasePathSchema"]] = None,
    ) -> SchemaIdentifier:
        """Build a schema identifier, checking `schemas` for version collisions.

        When both `name` and `version` are given, the identifier is built from
        that single pair and `schemas` is only checked for collisions.
        Otherwise (either one is ``None``) the identifier is built from the
        combined name/version pairs of all `schemas`.

        :param name: schema name.
        :param version: schema version.
        :param schemas: schemas whose identifiers must be mutually compatible.
        :returns: the new identifier.
        :raises PathSchemaCollision: two schemas carry different versions of
            the same schema name.
        """
        if name is not None and version is not None:
            identifier_kwargs = {name: version}
            # Separate dictionary: used for the collision check only.
            schema_versions = dict()
        else:
            # Same dictionary on purpose: every pair collected below ends up
            # in the identifier.
            identifier_kwargs = schema_versions = dict()

        for schema in schemas or []:
            # Loop variables are named differently from the `name`/`version`
            # parameters to avoid shadowing them.
            for schema_name, schema_version in schema.identifier._schema_versions.items():
                if schema_name not in schema_versions:
                    schema_versions[schema_name] = schema_version
                elif schema_version != schema_versions[schema_name]:
                    raise errors.PathSchemaCollision(
                        f"Cannot mix different versions ({schema_version} and {schema_versions[schema_name]}) of the same schema {schema_name!r}"
                    )

        return SchemaIdentifier(**identifier_kwargs)

    def _merge_fields_from(self, schema: "BasePathSchema") -> None:
        """Add all concepts, derived concepts and templates of `schema` to this schema.

        Fields are added one by one, so fields added before a failure remain.

        :param schema: schema to take the fields from.
        :raises FieldNameError:
        """
        for concept in schema.concepts.values():
            self._add_field(concept)
        for derived in schema.derived_concepts.values():
            self._add_field(derived)
        for template in schema.templates.values():
            self._add_field(template)

    def _add_field(self, field: Field) -> None:
        """Register `field` in the dictionary that matches its type.

        Adding the very same object again is a no-op.

        :param field: a path concept, derived concept or path template.
        :raises TypeError: `field` is none of the three supported field types.
        :raises FieldNameError: a different field with the same name already
            exists in any of the three dictionaries.
        """
        if isinstance(field, PathConcept):
            container = self.concepts
            others = (self.derived_concepts, self.templates)
        elif isinstance(field, DerivedConcept):
            container = self.derived_concepts
            others = (self.concepts, self.templates)
        elif isinstance(field, PathTemplate):
            container = self.templates
            others = (self.concepts, self.derived_concepts)
        else:
            raise TypeError(type(field))

        fname = field.name

        if fname in container:
            existing = container[fname]
            if existing is field:
                return
            raise errors.FieldNameError(f"Field {fname!r} already exists as {existing}")

        # The name must also be unused by the other two field categories.
        for other in others:
            if fname in other:
                existing = other[fname]
                if existing is field:
                    return
                raise errors.FieldNameError(
                    f"{field} of schema {field.schema_name!r} already exists as {existing} of schema {existing.schema_name!r}"
                )

        container[fname] = field