import pathlib
import sys
from typing import Any
from typing import List
from typing import Optional
from typing import Union
from ._schemas.definitions.types import ConceptValueType as _ConceptValueType
from ._schemas.errors import PathSchemaMatchError as _PathSchemaMatchError
from ._schemas.errors import PathTemplateMatchError as _PathTemplateMatchError
from ._schemas.errors import SchemaAttributeError as _SchemaAttributeError
from ._schemas.registry import PathSchemaRegistry as _Registry
from ._schemas.registry import get_schema_registry as _get_registry
if sys.version_info >= (3, 12):
# https://docs.python.org/3/whatsnew/3.12.html
# -> The pathlib.Path class now supports subclassing
class _BaseESRFPath(pathlib.Path):
__slots__ = ("_esrf_parsed",)
def __init__(self, *args):
super().__init__(*args)
_add_esrf_parsed(self, *args)
else:
class _BaseESRFPath(type(pathlib.Path())):
__slots__ = ("_esrf_parsed",)
def __new__(cls, *args):
self = super().__new__(cls, *args)
_add_esrf_parsed(self, *args)
return self
@classmethod
def _from_parts(cls, args, **kwargs):
self = super()._from_parts(args, **kwargs)
_add_esrf_parsed(self, *args)
return self
@classmethod
def _from_parsed_parts(cls, drv, root, parts):
self = super()._from_parsed_parts(drv, root, parts)
_add_esrf_parsed(self)
return self
[docs]
class ESRFPath(_BaseESRFPath):
"""Provides all features of ``pathlib.Path`` with additional attributes
depending on the matching ESRF filesystem path template.
"""
_SCHEMAS = {"esrf": None} # Latest version
_FALLBACK_DEPTH = 1 # Fall back one version by default
_REGISTRY = None
def __init_subclass__(cls, fallback_depth: Optional[int] = None, **schemas):
"""Allow subclasses to declare their own schemas like:
class TomoPath(ESRFPath, tomo=1):
pass
"""
super().__init_subclass__()
if schemas != cls._SCHEMAS:
cls._SCHEMAS = schemas
cls._REGISTRY = None
if fallback_depth != cls._FALLBACK_DEPTH:
cls._FALLBACK_DEPTH = fallback_depth
cls._REGISTRY = None
def __getattr__(self, attr_name: str) -> Any:
"""
:raises SchemaAttributeError:
"""
if attr_name in self._get_schema_path_attributes():
return ESRFPath(getattr(self._esrf_parsed, attr_name))
if attr_name in self._get_schema_nonpath_attributes():
return getattr(self._esrf_parsed, attr_name)
if not attr_name.startswith("_"):
raise _SchemaAttributeError(f"{self!r} has no attribute {attr_name!r}")
raise _SchemaAttributeError(
f"{type(self).__name__!r} object has no attribute {attr_name!r}"
)
def __setattr__(self, attr_name: str, value: Any) -> None:
if not attr_name.startswith("_") and attr_name in self._get_schema_attributes():
raise _SchemaAttributeError(
f"Attribute {attr_name!r} is immutable. Create a new path instance with `{type(self).__name__}.replace_fields({attr_name}={value!r})`."
)
return super().__setattr__(attr_name, value)
def _get_schema_path_attributes(self) -> List[str]:
if self._esrf_parsed is None:
return []
return self._esrf_parsed.path_field_names()
def _get_schema_nonpath_attributes(self) -> List[str]:
if self._esrf_parsed is None:
return []
return self._esrf_parsed.nonpath_field_names()
def _get_schema_attributes(self) -> List[str]:
if self._esrf_parsed is None:
return []
return self._esrf_parsed.field_names()
def __dir__(self) -> List[str]:
return sorted(set(super().__dir__()) | set(self._get_schema_attributes()))
def __repr__(self) -> str:
return f"<{type(self).__name__} {str(self)!r} schema={self.schema_name!r} template={self.template_name!r}>"
@property
def schema_name(self) -> str:
"""Name of the ESRF path schema if it matches a registered schema."""
if self._esrf_parsed:
return str(self._esrf_parsed.schema_identifier)
else:
return "<none>"
@property
def template_name(self) -> str:
"""Name of the ESRF path template if it matches a template from a registered schema."""
if self._esrf_parsed:
return self._esrf_parsed.template_name
else:
return "<none>"
[docs]
def replace_fields(
self,
template_name: Optional[str] = None,
**fields: _ConceptValueType,
) -> "ESRFPath":
"""Creates a new `ESRFPath`, replacing fields with values from `fields`.
:raises PathSchemaMatchError:
:raises PathTemplateMatchError:
"""
if self._esrf_parsed is None:
raise _PathSchemaMatchError("Path does not match any ESRF schema")
current_parsed = self._esrf_parsed
# Add/replace concept values
new_parsed = current_parsed.replace_concepts(
template_name=template_name, **fields
)
# Render the new path with the existing template
new_path = None
if template_name is None and set(current_parsed.concept_names) == set(
new_parsed.concept_names
):
try:
new_path = new_parsed.render(template_name=current_parsed.template_name)
except _PathTemplateMatchError:
pass
# Render the new path with a new template (best match)
if new_path is None:
new_path = new_parsed.render()
# New ESRFPath instance
new_instance = self.__class__(new_path)
# Append part not defined by the schema
original_schema_base = current_parsed.render()
part_not_in_template = self.relative_to(original_schema_base)
return new_instance / part_not_in_template
[docs]
def strip_mount_point(self) -> "ESRFPath":
"""
Remove leading segments typically introduced by ``ESRFPath.resolve()``.
"""
if self._esrf_parsed is None:
return self.__class__(self)
new_parsed = self._esrf_parsed.strip_mount_point()
new_path = new_parsed.render()
return self.__class__(new_path)
[docs]
@classmethod
def from_fields(
cls,
schema_name: Optional[str] = None,
schema_version: Optional[int] = None,
template_name: Optional[str] = None,
**fields: _ConceptValueType,
) -> "ESRFPath":
"""Create ``ESRFPath`` from schema fields.
:raises PathSchemaMatchError:
:raises PathTemplateMatchError:
:raises UnknownPathTemplate:
"""
if schema_name is not None:
registry = _get_registry(**{schema_name: schema_version})
else:
registry = cls._get_registry()
path = registry.render_path(fields, template_name=template_name)
instance = cls(path)
instance._esrf_parsed = registry.parse_path(path)
return instance
[docs]
@classmethod
def from_path(
cls,
path: Union[str, pathlib.Path],
schema_name: Optional[str] = None,
schema_version: Optional[int] = None,
template_name: Optional[str] = None,
) -> "ESRFPath":
"""Instantiate ``ESRFPath`` with a specific schema, schema version and template.
:raises UnknownPathSchema:
:raises PathSchemaCollision:
:raises UnknownPathTemplate:
:raises PathSchemaMatchError:
:raises PathTemplateMatchError:
"""
if schema_name is not None:
registry = _get_registry(**{schema_name: schema_version})
else:
registry = cls._get_registry()
parsed_path = registry.parse_path(path)
if template_name:
_ = parsed_path.render(template_name)
instance = cls(path)
instance._esrf_parsed = parsed_path
return instance
@classmethod
def _get_registry(cls) -> _Registry:
"""Get default registry of the class.
:raises UnknownPathSchema:
:raises PathSchemaCollision:
"""
if cls._REGISTRY is None:
cls._REGISTRY = _get_registry(
fallback_depth=cls._FALLBACK_DEPTH, **cls._SCHEMAS
)
return cls._REGISTRY
def _add_esrf_parsed(self: _BaseESRFPath, *args) -> None:
self._esrf_parsed = None
registry = None
for arg in args:
if isinstance(arg, ESRFPath):
try:
_esrf_schema = arg._esrf_schema
except AttributeError:
continue
if _esrf_schema is not None:
registry = _esrf_schema._registry
break
if registry is None:
registry = self._get_registry()
try:
self._esrf_parsed = registry.parse_path(self)
except _PathSchemaMatchError:
pass