# Source code for esrf_pathlib.core

import pathlib
import sys
from typing import Any
from typing import List
from typing import Optional
from typing import Union

from ._schemas.definitions.types import ConceptValueType as _ConceptValueType
from ._schemas.errors import PathSchemaMatchError as _PathSchemaMatchError
from ._schemas.errors import PathTemplateMatchError as _PathTemplateMatchError
from ._schemas.errors import SchemaAttributeError as _SchemaAttributeError
from ._schemas.registry import PathSchemaRegistry as _Registry
from ._schemas.registry import get_schema_registry as _get_registry

if sys.version_info >= (3, 12):
    # https://docs.python.org/3/whatsnew/3.12.html
    # -> The pathlib.Path class now supports subclassing

    class _BaseESRFPath(pathlib.Path):
        __slots__ = ("_esrf_parsed",)

        def __init__(self, *args):
            super().__init__(*args)
            _add_esrf_parsed(self, *args)

else:

    class _BaseESRFPath(type(pathlib.Path())):
        __slots__ = ("_esrf_parsed",)

        def __new__(cls, *args):
            self = super().__new__(cls, *args)
            _add_esrf_parsed(self, *args)
            return self

        @classmethod
        def _from_parts(cls, args, **kwargs):
            self = super()._from_parts(args, **kwargs)
            _add_esrf_parsed(self, *args)
            return self

        @classmethod
        def _from_parsed_parts(cls, drv, root, parts):
            self = super()._from_parsed_parts(drv, root, parts)
            _add_esrf_parsed(self)
            return self


[docs] class ESRFPath(_BaseESRFPath): """Provides all features of ``pathlib.Path`` with additional attributes depending on the matching ESRF filesystem path template. """ _SCHEMAS = {"esrf": None} # Latest version _FALLBACK_DEPTH = 1 # Fall back one version by default _REGISTRY = None def __init_subclass__(cls, fallback_depth: Optional[int] = None, **schemas): """Allow subclasses to declare their own schemas like: class TomoPath(ESRFPath, tomo=1): pass """ super().__init_subclass__() if schemas != cls._SCHEMAS: cls._SCHEMAS = schemas cls._REGISTRY = None if fallback_depth != cls._FALLBACK_DEPTH: cls._FALLBACK_DEPTH = fallback_depth cls._REGISTRY = None def __getattr__(self, attr_name: str) -> Any: """ :raises SchemaAttributeError: """ if attr_name in self._get_schema_path_attributes(): return ESRFPath(getattr(self._esrf_parsed, attr_name)) if attr_name in self._get_schema_nonpath_attributes(): return getattr(self._esrf_parsed, attr_name) if not attr_name.startswith("_"): raise _SchemaAttributeError(f"{self!r} has no attribute {attr_name!r}") raise _SchemaAttributeError( f"{type(self).__name__!r} object has no attribute {attr_name!r}" ) def __setattr__(self, attr_name: str, value: Any) -> None: if not attr_name.startswith("_") and attr_name in self._get_schema_attributes(): raise _SchemaAttributeError( f"Attribute {attr_name!r} is immutable. Create a new path instance with `{type(self).__name__}.replace_fields({attr_name}={value!r})`." 
) return super().__setattr__(attr_name, value) def _get_schema_path_attributes(self) -> List[str]: if self._esrf_parsed is None: return [] return self._esrf_parsed.path_field_names() def _get_schema_nonpath_attributes(self) -> List[str]: if self._esrf_parsed is None: return [] return self._esrf_parsed.nonpath_field_names() def _get_schema_attributes(self) -> List[str]: if self._esrf_parsed is None: return [] return self._esrf_parsed.field_names() def __dir__(self) -> List[str]: return sorted(set(super().__dir__()) | set(self._get_schema_attributes())) def __repr__(self) -> str: return f"<{type(self).__name__} {str(self)!r} schema={self.schema_name!r} template={self.template_name!r}>" @property def schema_name(self) -> str: """Name of the ESRF path schema if it matches a registered schema.""" if self._esrf_parsed: return str(self._esrf_parsed.schema_identifier) else: return "<none>" @property def template_name(self) -> str: """Name of the ESRF path template if it matches a template from a registered schema.""" if self._esrf_parsed: return self._esrf_parsed.template_name else: return "<none>"
[docs] def replace_fields( self, template_name: Optional[str] = None, **fields: _ConceptValueType, ) -> "ESRFPath": """Creates a new `ESRFPath`, replacing fields with values from `fields`. :raises PathSchemaMatchError: :raises PathTemplateMatchError: """ if self._esrf_parsed is None: raise _PathSchemaMatchError("Path does not match any ESRF schema") current_parsed = self._esrf_parsed # Add/replace concept values new_parsed = current_parsed.replace_concepts( template_name=template_name, **fields ) # Render the new path with the existing template new_path = None if template_name is None and set(current_parsed.concept_names) == set( new_parsed.concept_names ): try: new_path = new_parsed.render(template_name=current_parsed.template_name) except _PathTemplateMatchError: pass # Render the new path with a new template (best match) if new_path is None: new_path = new_parsed.render() # New ESRFPath instance new_instance = self.__class__(new_path) # Append part not defined by the schema original_schema_base = current_parsed.render() part_not_in_template = self.relative_to(original_schema_base) return new_instance / part_not_in_template
[docs] def strip_mount_point(self) -> "ESRFPath": """ Remove leading segments typically introduced by ``ESRFPath.resolve()``. """ if self._esrf_parsed is None: return self.__class__(self) new_parsed = self._esrf_parsed.strip_mount_point() new_path = new_parsed.render() return self.__class__(new_path)
[docs] @classmethod def from_fields( cls, schema_name: Optional[str] = None, schema_version: Optional[int] = None, template_name: Optional[str] = None, **fields: _ConceptValueType, ) -> "ESRFPath": """Create ``ESRFPath`` from schema fields. :raises PathSchemaMatchError: :raises PathTemplateMatchError: :raises UnknownPathTemplate: """ if schema_name is not None: registry = _get_registry(**{schema_name: schema_version}) else: registry = cls._get_registry() path = registry.render_path(fields, template_name=template_name) instance = cls(path) instance._esrf_parsed = registry.parse_path(path) return instance
[docs] @classmethod def from_path( cls, path: Union[str, pathlib.Path], schema_name: Optional[str] = None, schema_version: Optional[int] = None, template_name: Optional[str] = None, ) -> "ESRFPath": """Instantiate ``ESRFPath`` with a specific schema, schema version and template. :raises UnknownPathSchema: :raises PathSchemaCollision: :raises UnknownPathTemplate: :raises PathSchemaMatchError: :raises PathTemplateMatchError: """ if schema_name is not None: registry = _get_registry(**{schema_name: schema_version}) else: registry = cls._get_registry() parsed_path = registry.parse_path(path) if template_name: _ = parsed_path.render(template_name) instance = cls(path) instance._esrf_parsed = parsed_path return instance
@classmethod def _get_registry(cls) -> _Registry: """Get default registry of the class. :raises UnknownPathSchema: :raises PathSchemaCollision: """ if cls._REGISTRY is None: cls._REGISTRY = _get_registry( fallback_depth=cls._FALLBACK_DEPTH, **cls._SCHEMAS ) return cls._REGISTRY
def _add_esrf_parsed(self: _BaseESRFPath, *args) -> None: self._esrf_parsed = None registry = None for arg in args: if isinstance(arg, ESRFPath): try: _esrf_schema = arg._esrf_schema except AttributeError: continue if _esrf_schema is not None: registry = _esrf_schema._registry break if registry is None: registry = self._get_registry() try: self._esrf_parsed = registry.parse_path(self) except _PathSchemaMatchError: pass