Skip to content

API Reference

Core Classes

OCMF

The main class for parsing, validating, and verifying OCMF data.

OCMF

Bases: BaseModel

OCMF data model with three pipe-separated sections: header, payload, and signature.

Source code in src/pyocmf/core/ocmf.py
class OCMF(pydantic.BaseModel):
    """OCMF data model with three pipe-separated sections: header, payload, and signature."""

    header: Literal["OCMF"]
    payload: Payload
    signature: Signature
    # Payload text exactly as received by from_string(); stays None for models
    # built directly from fields. verify_signature() refuses to run without it.
    _original_payload_json: str | None = pydantic.PrivateAttr(default=None)

    @classmethod
    def from_string(cls, ocmf_string: str) -> OCMF:
        """Parse an OCMF string into an OCMF model.

        Automatically detects whether the input is plain text (starts with "OCMF|")
        or hex-encoded and handles both formats.

        The header ends at the first separator and the signature starts after the
        last one; everything in between is the payload. A separator character
        inside a payload string value therefore no longer truncates the payload.

        Args:
            ocmf_string: Plain or hex-encoded OCMF text; surrounding whitespace
                is ignored.

        Returns:
            The parsed model, with the original payload text retained for
            signature verification.

        Raises:
            HexDecodingError: The input lacks the plain-text prefix and is not
                valid hex-encoded UTF-8.
            OcmfFormatError: The text does not consist of header, payload and
                signature sections.
            OcmfPayloadError: The payload section fails validation.
            OcmfSignatureError: The signature section fails validation.
        """
        ocmf_text = ocmf_string.strip()

        if not ocmf_text.startswith(OCMF_PREFIX):
            try:
                # UnicodeDecodeError is a ValueError subclass, so both the hex
                # and the UTF-8 step are covered by the handler below.
                ocmf_text = bytes.fromhex(ocmf_text).decode("utf-8")
            except ValueError as e:
                msg = (
                    f"Invalid OCMF string: must start with '{OCMF_PREFIX}' or be "
                    f"valid hex-encoded. {e}"
                )
                raise HexDecodingError(msg) from e

        header, first_sep, remainder = ocmf_text.partition(OCMF_SEPARATOR)
        payload_json, last_sep, signature_json = remainder.rpartition(OCMF_SEPARATOR)

        # An empty separator result means fewer than three sections were found.
        if not first_sep or not last_sep or header != OCMF_HEADER:
            msg = (
                f"String does not match expected OCMF format "
                f"'{OCMF_HEADER}{OCMF_SEPARATOR}{{payload}}{OCMF_SEPARATOR}{{signature}}'."
            )
            raise OcmfFormatError(msg)

        try:
            payload = Payload.model_validate_json(payload_json)
        except pydantic.ValidationError as e:
            msg = f"Invalid payload JSON: {e}"
            raise OcmfPayloadError(msg) from e

        try:
            signature = Signature.model_validate_json(signature_json)
        except pydantic.ValidationError as e:
            msg = f"Invalid signature JSON: {e}"
            raise OcmfSignatureError(msg) from e

        ocmf = cls(header=OCMF_HEADER, payload=payload, signature=signature)
        ocmf._original_payload_json = payload_json
        return ocmf

    def to_string(self, hex: bool = False) -> str:
        """Convert the OCMF model to string format "OCMF|{payload}|{signature}".

        Payload and signature are re-serialized from the model fields (None
        values omitted); the text stored by from_string() is not reused.

        Set hex=True to return hex-encoded string instead of plain text.
        """
        payload_json = self.payload.model_dump_json(exclude_none=True)
        signature_json = self.signature.model_dump_json(exclude_none=True)
        ocmf_string = OCMF_SEPARATOR.join([OCMF_HEADER, payload_json, signature_json])

        if hex:
            return ocmf_string.encode("utf-8").hex()
        return ocmf_string

    def verify_signature(self, public_key: PublicKey | str) -> bool:
        """Verify the cryptographic signature of the OCMF data.

        Per OCMF spec, public keys must be transmitted out-of-band (separately from OCMF data).
        Requires that the OCMF was parsed from a string (not constructed programmatically)
        because signature verification needs the exact original payload bytes.

        Args:
            public_key: A PublicKey (its ``key`` attribute is used) or the key
                as a string.

        Returns:
            The result of ``verification.verify_signature``.

        Raises:
            SignatureVerificationError: The original payload text is not available.
        """
        if self._original_payload_json is None:
            msg = (
                "Cannot verify signature: original payload JSON not available. "
                "Signature verification requires the exact original payload bytes. "
                "Use OCMF.from_string() to parse OCMF data for signature verification."
            )
            raise SignatureVerificationError(msg)

        return verification.verify_signature(
            payload_json=self._original_payload_json,
            signature_data=self.signature.SD,
            signature_method=self.signature.SA,
            signature_encoding=self.signature.SE,
            public_key_hex=public_key.key if isinstance(public_key, PublicKey) else public_key,
        )

    def check_eichrecht(
        self, other: OCMF | None = None, *, errors_only: bool = False
    ) -> list[EichrechtIssue]:
        """Check German calibration law (Eichrecht) compliance.

        Validates that OCMF data complies with German Eichrecht requirements
        (MID 2014/32/EU and PTB) for billing-relevant meter readings.

        Provide 'other' OCMF to check transaction pair (begin + end).
        Set errors_only=True to filter out warnings.

        Note: when no 'other' is given and the payload has no readings, the
        single NO_READINGS issue is returned without applying errors_only.
        """
        if other is None:
            readings = self.payload.RD
            if not readings:
                return [
                    EichrechtIssue(
                        code=compliance.IssueCode.NO_READINGS,
                        message="No readings (RD) present in payload",
                        field="RD",
                    )
                ]

            issues: list[EichrechtIssue] = []
            for i, reading in enumerate(readings):
                # TX is optional on a reading, so guard before reading .value;
                # only the first reading can count as the transaction begin.
                tx = reading.TX
                is_begin = i == 0 and tx is not None and tx.value == "B"
                issues.extend(compliance.check_eichrecht_reading(reading, is_begin=is_begin))
        else:
            issues = compliance.check_eichrecht_transaction(self.payload, other.payload)

        if errors_only:
            issues = [issue for issue in issues if issue.severity == IssueSeverity.ERROR]

        return issues

    @property
    def is_eichrecht_compliant(self) -> bool:
        """True when check_eichrecht(errors_only=True) reports no issues."""
        issues = self.check_eichrecht(errors_only=True)
        return len(issues) == 0

    def verify(
        self,
        public_key: PublicKey | str,
        other: OCMF | None = None,
        eichrecht: bool = True,
    ) -> tuple[bool, list[EichrechtIssue]]:
        """Verify both cryptographic signature and legal compliance.

        Combines signature verification and Eichrecht compliance checking.
        Returns (signature_valid, compliance_issues).
        Set eichrecht=False to skip compliance checking.
        """
        signature_valid = self.verify_signature(public_key)
        compliance_issues = self.check_eichrecht(other) if eichrecht else []
        return signature_valid, compliance_issues

from_string(ocmf_string: str) -> OCMF classmethod

Parse an OCMF string into an OCMF model.

Automatically detects whether the input is plain text (starts with "OCMF|") or hex-encoded and handles both formats.

Source code in src/pyocmf/core/ocmf.py
@classmethod
def from_string(cls, ocmf_string: str) -> OCMF:
    """Parse an OCMF string into an OCMF model.

    Automatically detects whether the input is plain text (starts with "OCMF|")
    or hex-encoded and handles both formats.

    The header ends at the first separator and the signature starts after the
    last one; everything in between is the payload. A separator character
    inside a payload string value therefore no longer truncates the payload.

    Args:
        ocmf_string: Plain or hex-encoded OCMF text; surrounding whitespace
            is ignored.

    Returns:
        The parsed model, with the original payload text retained for
        signature verification.

    Raises:
        HexDecodingError: The input lacks the plain-text prefix and is not
            valid hex-encoded UTF-8.
        OcmfFormatError: The text does not consist of header, payload and
            signature sections.
        OcmfPayloadError: The payload section fails validation.
        OcmfSignatureError: The signature section fails validation.
    """
    ocmf_text = ocmf_string.strip()

    if not ocmf_text.startswith(OCMF_PREFIX):
        try:
            # UnicodeDecodeError is a ValueError subclass, so both the hex
            # and the UTF-8 step are covered by the handler below.
            ocmf_text = bytes.fromhex(ocmf_text).decode("utf-8")
        except ValueError as e:
            msg = (
                f"Invalid OCMF string: must start with '{OCMF_PREFIX}' or be "
                f"valid hex-encoded. {e}"
            )
            raise HexDecodingError(msg) from e

    header, first_sep, remainder = ocmf_text.partition(OCMF_SEPARATOR)
    payload_json, last_sep, signature_json = remainder.rpartition(OCMF_SEPARATOR)

    # An empty separator result means fewer than three sections were found.
    if not first_sep or not last_sep or header != OCMF_HEADER:
        msg = (
            f"String does not match expected OCMF format "
            f"'{OCMF_HEADER}{OCMF_SEPARATOR}{{payload}}{OCMF_SEPARATOR}{{signature}}'."
        )
        raise OcmfFormatError(msg)

    try:
        payload = Payload.model_validate_json(payload_json)
    except pydantic.ValidationError as e:
        msg = f"Invalid payload JSON: {e}"
        raise OcmfPayloadError(msg) from e

    try:
        signature = Signature.model_validate_json(signature_json)
    except pydantic.ValidationError as e:
        msg = f"Invalid signature JSON: {e}"
        raise OcmfSignatureError(msg) from e

    ocmf = cls(header=OCMF_HEADER, payload=payload, signature=signature)
    ocmf._original_payload_json = payload_json
    return ocmf

to_string(hex: bool = False) -> str

Convert the OCMF model to string format "OCMF|{payload}|{signature}".

Set hex=True to return hex-encoded string instead of plain text.

Source code in src/pyocmf/core/ocmf.py
def to_string(self, hex: bool = False) -> str:
    """Serialize this model as "OCMF|{payload}|{signature}".

    Both JSON sections are dumped from the model fields with None values
    left out. With hex=True the UTF-8 bytes of that text are returned as a
    hex string; otherwise the plain text is returned.
    """
    sections = [
        OCMF_HEADER,
        self.payload.model_dump_json(exclude_none=True),
        self.signature.model_dump_json(exclude_none=True),
    ]
    plain_text = OCMF_SEPARATOR.join(sections)
    return plain_text.encode("utf-8").hex() if hex else plain_text

verify_signature(public_key: PublicKey | str) -> bool

Verify the cryptographic signature of the OCMF data.

Per OCMF spec, public keys must be transmitted out-of-band (separately from OCMF data). Requires that the OCMF was parsed from a string (not constructed programmatically) because signature verification needs the exact original payload bytes.

Source code in src/pyocmf/core/ocmf.py
def verify_signature(self, public_key: PublicKey | str) -> bool:
    """Check the signature section against the originally parsed payload text.

    The key is supplied by the caller (a PublicKey, whose ``key`` attribute is
    used, or a plain string). Verification is delegated to
    ``verification.verify_signature`` together with the SD, SA and SE values
    of the signature section.

    Raises:
        SignatureVerificationError: No original payload text is stored on
            this instance (``_original_payload_json`` is None).
    """
    original_payload = self._original_payload_json
    if original_payload is None:
        raise SignatureVerificationError(
            "Cannot verify signature: original payload JSON not available. "
            "Signature verification requires the exact original payload bytes. "
            "Use OCMF.from_string() to parse OCMF data for signature verification."
        )

    # Accept either a PublicKey wrapper or the raw key string.
    if isinstance(public_key, PublicKey):
        key_hex = public_key.key
    else:
        key_hex = public_key

    sig = self.signature
    return verification.verify_signature(
        payload_json=original_payload,
        signature_data=sig.SD,
        signature_method=sig.SA,
        signature_encoding=sig.SE,
        public_key_hex=key_hex,
    )

check_eichrecht(other: OCMF | None = None, *, errors_only: bool = False) -> list[EichrechtIssue]

Check German calibration law (Eichrecht) compliance.

Validates that OCMF data complies with German Eichrecht requirements (MID 2014/32/EU and PTB) for billing-relevant meter readings.

Provide 'other' OCMF to check transaction pair (begin + end). Set errors_only=True to filter out warnings.

Source code in src/pyocmf/core/ocmf.py
def check_eichrecht(
    self, other: OCMF | None = None, *, errors_only: bool = False
) -> list[EichrechtIssue]:
    """Check German calibration law (Eichrecht) compliance.

    Validates that OCMF data complies with German Eichrecht requirements
    (MID 2014/32/EU and PTB) for billing-relevant meter readings.

    Provide 'other' OCMF to check transaction pair (begin + end).
    Set errors_only=True to filter out warnings.

    Note: when no 'other' is given and the payload has no readings, the
    single NO_READINGS issue is returned without applying errors_only.
    """
    if other is None:
        readings = self.payload.RD
        if not readings:
            return [
                EichrechtIssue(
                    code=compliance.IssueCode.NO_READINGS,
                    message="No readings (RD) present in payload",
                    field="RD",
                )
            ]

        issues: list[EichrechtIssue] = []
        for i, reading in enumerate(readings):
            # TX is optional on a reading, so guard before reading .value;
            # only the first reading can count as the transaction begin.
            tx = reading.TX
            is_begin = i == 0 and tx is not None and tx.value == "B"
            issues.extend(compliance.check_eichrecht_reading(reading, is_begin=is_begin))
    else:
        issues = compliance.check_eichrecht_transaction(self.payload, other.payload)

    if errors_only:
        issues = [issue for issue in issues if issue.severity == IssueSeverity.ERROR]

    return issues

verify(public_key: PublicKey | str, other: OCMF | None = None, eichrecht: bool = True) -> tuple[bool, list[EichrechtIssue]]

Verify both cryptographic signature and legal compliance.

Combines signature verification and Eichrecht compliance checking. Returns (signature_valid, compliance_issues). Set eichrecht=False to skip compliance checking.

Source code in src/pyocmf/core/ocmf.py
def verify(
    self,
    public_key: PublicKey | str,
    other: OCMF | None = None,
    eichrecht: bool = True,
) -> tuple[bool, list[EichrechtIssue]]:
    """Run signature verification and, optionally, the Eichrecht check.

    The signature is always verified first. When eichrecht is true,
    check_eichrecht(other) supplies the second tuple element; otherwise an
    empty list is returned in its place.

    Returns:
        (signature_valid, compliance_issues)
    """
    signature_valid = self.verify_signature(public_key)
    if not eichrecht:
        return signature_valid, []
    return signature_valid, self.check_eichrecht(other)

Payload

The payload section containing meter readings and metadata.

Payload

Bases: BaseModel

Source code in src/pyocmf/core/payload.py
class Payload(pydantic.BaseModel):
    # OCMF payload section: gateway and meter metadata, user identification,
    # charge point data and the list of readings (RD).
    # Keys that are not declared below are kept on the model (extra="allow").
    model_config = pydantic.ConfigDict(extra="allow")

    # --- Gateway / format ---
    FV: str | None = pydantic.Field(default=None, description="Format Version")
    GI: str | None = pydantic.Field(default=None, description="Gateway Identification")
    GS: str | None = pydantic.Field(default=None, description="Gateway Serial")
    GV: str | None = pydantic.Field(default=None, description="Gateway Version")

    PG: PaginationString = pydantic.Field(description="Pagination")

    # --- Meter ---
    MV: str | None = pydantic.Field(default=None, description="Meter Vendor")
    MM: str | None = pydantic.Field(default=None, description="Meter Model")
    MS: str | None = pydantic.Field(default=None, description="Meter Serial")
    MF: str | None = pydantic.Field(default=None, description="Meter Firmware")

    # --- User identification ---
    IS: bool = pydantic.Field(description="Identification Status")
    IL: UserAssignmentStatus | None = pydantic.Field(
        default=None, description="Identification Level"
    )
    IF: list[IdentificationFlag] = pydantic.Field(default=[], description="Identification Flags")
    IT: IdentificationType | None = pydantic.Field(
        default=IdentificationType.NONE, description="Identification Type"
    )
    ID: IdentificationData | None = pydantic.Field(default=None, description="Identification Data")
    TT: str | None = pydantic.Field(default=None, max_length=250, description="Tariff Text")

    # --- Charge controller / charge point ---
    CF: str | None = pydantic.Field(
        default=None, max_length=25, description="Charge Controller Firmware Version"
    )
    LC: CableLossCompensation | None = pydantic.Field(default=None, description="Loss Compensation")

    CT: ChargePointIdentificationType | str | None = pydantic.Field(
        default=None, description="Charge Point Identification Type"
    )
    CI: str | None = pydantic.Field(default=None, description="Charge Point Identification")

    RD: list[Reading] = pydantic.Field(description="Readings")

    @pydantic.model_validator(mode="before")
    @classmethod
    def apply_reading_inheritance(cls, data: dict) -> dict:
        """Apply field inheritance for readings.

        Per OCMF spec, some reading fields can be inherited from the previous reading
        if not specified. Each raw reading dict that omits TM, TX, RI, RU, RT, EF
        or ST receives the value most recently seen in an earlier reading.

        Input that is not a dict, has no readings, or whose RD is not a
        list/tuple made up solely of dicts (for example already-built Reading
        objects, or malformed data) is returned untouched so that regular field
        validation handles or rejects it.

        Args:
            data: Raw input passed to the model before field validation.

        Returns:
            ``data`` itself, or a shallow copy with RD replaced by the expanded
            reading dicts.
        """
        if not isinstance(data, dict):
            return data

        readings_data = data.get("RD", [])
        if not readings_data:
            return data

        # Only rewrite plain dict entries; calling .get() on anything else
        # would raise from inside this hook instead of producing a normal
        # validation error for RD.
        if not isinstance(readings_data, (list, tuple)) or not all(
            isinstance(rd, dict) for rd in readings_data
        ):
            return data

        inheritable_fields = ["TM", "TX", "RI", "RU", "RT", "EF", "ST"]
        last_values: dict[str, object] = {}
        processed_readings = []

        for rd in readings_data:
            # Inheritable fields first: the reading's own value wins, otherwise
            # the last value seen; fields never seen so far stay absent.
            reading_dict = {
                field: rd.get(field, last_values.get(field))
                for field in inheritable_fields
                if field in rd or field in last_values
            }
            # Everything else is copied through unchanged.
            reading_dict.update({k: v for k, v in rd.items() if k not in inheritable_fields})

            last_values.update({k: v for k, v in reading_dict.items() if k in inheritable_fields})

            processed_readings.append(reading_dict)

        return {**data, "RD": processed_readings}

    @pydantic.model_validator(mode="after")
    def validate_serial_numbers(self) -> Payload:
        """Require that at least one of GS and MS is set.

        The check is ``is None`` on both fields, so an empty string counts as
        set. The error is raised only when both are None.
        """
        if self.GS is None and self.MS is None:
            msg = "Either Gateway Serial (GS) or Meter Serial (MS) must be provided"
            raise ValidationError(msg)
        return self

    @pydantic.field_validator("FV", mode="before")
    @classmethod
    def convert_fv_to_string(cls, v: int | float | str | None) -> str | None:
        """Accept a numeric format version by converting it to its string form."""
        if isinstance(v, (int, float)):
            return str(v)
        return v

    @pydantic.field_validator("CT", mode="before")
    @classmethod
    def convert_ct_empty_to_none(cls, v: str | int | None) -> str | None:
        """Map an empty string or 0 to None and any other int to its string form."""
        if v == "" or v == 0:
            return None
        if isinstance(v, int):
            return str(v)
        return v

    # Validator mapping at class level to avoid recreation
    # NOTE(review): pydantic treats underscore-prefixed class attributes as
    # private attributes - confirm these three lookups behave as the intended
    # shared class-level constants.
    _ID_FORMAT_VALIDATORS = {
        IdentificationType.ISO14443.value: ISO14443,
        IdentificationType.ISO15693.value: ISO15693,
        IdentificationType.EMAID.value: EMAID,
        IdentificationType.EVCCID.value: EVCCID,
        IdentificationType.EVCOID.value: EVCOID,
        IdentificationType.ISO7812.value: ISO7812,
        IdentificationType.PHONE_NUMBER.value: PHONE_NUMBER,
    }

    # Types that accept any string value without validation
    _UNRESTRICTED_TYPES = {
        IdentificationType.LOCAL.value,
        IdentificationType.LOCAL_1.value,
        IdentificationType.LOCAL_2.value,
        IdentificationType.CENTRAL.value,
        IdentificationType.CENTRAL_1.value,
        IdentificationType.CENTRAL_2.value,
        IdentificationType.CARD_TXN_NR.value,
        IdentificationType.KEY_CODE.value,
        IdentificationType.UNDEFINED.value,
        IdentificationType.NONE.value,
        IdentificationType.DENIED.value,
    }

    # Types that validate with warnings instead of errors (permissive mode)
    _PERMISSIVE_TYPES = {
        IdentificationType.ISO14443.value,
        IdentificationType.ISO15693.value,
    }

    def _validate_id_format(self, it_value: str, id_value: str, *, strict: bool = True) -> None:
        """Validate ID format, either strictly (raise) or permissively (warn).

        Types without an entry in ``_ID_FORMAT_VALIDATORS`` are not checked.

        Args:
            it_value: Identification type value
            id_value: Identification data value
            strict: If True, raise ValidationError on mismatch. If False, emit warning.

        Raises:
            ValidationError: The value does not match the format and strict is True.

        """
        if it_value not in self._ID_FORMAT_VALIDATORS:
            return

        try:
            pydantic.TypeAdapter(self._ID_FORMAT_VALIDATORS[it_value]).validate_python(id_value)
        except pydantic.ValidationError as e:
            msg = (
                f"ID value '{id_value}' does not match expected format for identification "
                f"type '{it_value}'"
            )

            if strict:
                error_msg = f"{msg}: {e}"
                raise ValidationError(error_msg) from e
            else:
                warnings.warn(
                    f"{msg}. This may indicate non-standard RFID card format or vendor-specific "
                    f"implementation. Data will be accepted but may not be fully spec-compliant.",
                    UserWarning,
                    stacklevel=4,
                )

    @pydantic.model_validator(mode="after")
    def validate_id_format_by_type(self) -> Payload:
        """Validate ID format based on the Identification Type (IT).

        For most types, validation is strict (raises ValidationError).
        For ISO14443 and ISO15693, validation emits warnings but allows non-standard formats,
        as real-world RFID cards may have vendor-specific implementations.

        Nothing is checked when ID or IT is empty/None, or when IT is one of
        the unrestricted types.
        """
        if not self.ID or not self.IT:
            return self

        it_value = self.IT.value if isinstance(self.IT, IdentificationType) else str(self.IT)
        id_value = self.ID

        if it_value in self._UNRESTRICTED_TYPES:
            return self

        # Use permissive validation (warn) for ISO types, strict for others
        strict = it_value not in self._PERMISSIVE_TYPES
        self._validate_id_format(it_value, id_value, strict=strict)
        return self

apply_reading_inheritance(data: dict) -> dict classmethod

Apply field inheritance for readings.

Per OCMF spec, some reading fields can be inherited from the previous reading if not specified.

Source code in src/pyocmf/core/payload.py
@pydantic.model_validator(mode="before")
@classmethod
def apply_reading_inheritance(cls, data: dict) -> dict:
    """Apply field inheritance for readings.

    Per OCMF spec, some reading fields can be inherited from the previous reading
    if not specified. Each raw reading dict that omits TM, TX, RI, RU, RT, EF
    or ST receives the value most recently seen in an earlier reading.

    Input that is not a dict, has no readings, or whose RD is not a
    list/tuple made up solely of dicts (for example already-built Reading
    objects, or malformed data) is returned untouched so that regular field
    validation handles or rejects it.

    Args:
        data: Raw input passed to the model before field validation.

    Returns:
        ``data`` itself, or a shallow copy with RD replaced by the expanded
        reading dicts.
    """
    if not isinstance(data, dict):
        return data

    readings_data = data.get("RD", [])
    if not readings_data:
        return data

    # Only rewrite plain dict entries; calling .get() on anything else
    # would raise from inside this hook instead of producing a normal
    # validation error for RD.
    if not isinstance(readings_data, (list, tuple)) or not all(
        isinstance(rd, dict) for rd in readings_data
    ):
        return data

    inheritable_fields = ["TM", "TX", "RI", "RU", "RT", "EF", "ST"]
    last_values: dict[str, object] = {}
    processed_readings = []

    for rd in readings_data:
        # Inheritable fields first: the reading's own value wins, otherwise
        # the last value seen; fields never seen so far stay absent.
        reading_dict = {
            field: rd.get(field, last_values.get(field))
            for field in inheritable_fields
            if field in rd or field in last_values
        }
        # Everything else is copied through unchanged.
        reading_dict.update({k: v for k, v in rd.items() if k not in inheritable_fields})

        last_values.update({k: v for k, v in reading_dict.items() if k in inheritable_fields})

        processed_readings.append(reading_dict)

    return {**data, "RD": processed_readings}

validate_serial_numbers() -> Payload

Either GS or MS must be present for signature component identification.

Per the OCMF spec, GS is optional (0..1) and MS is mandatory (1..1). This validator is more lenient: it only requires that at least one of the two is not None (an empty string is accepted).

Source code in src/pyocmf/core/payload.py
@pydantic.model_validator(mode="after")
def validate_serial_numbers(self) -> Payload:
    """Require that at least one of GS and MS is set.

    Both fields are tested with ``is None``, so an empty string counts as
    set. The model is returned unchanged when either one is present.

    Raises:
        ValidationError: Both GS and MS are None.
    """
    has_serial = self.GS is not None or self.MS is not None
    if has_serial:
        return self
    raise ValidationError("Either Gateway Serial (GS) or Meter Serial (MS) must be provided")

validate_id_format_by_type() -> Payload

Validate ID format based on the Identification Type (IT).

For most types, validation is strict (raises ValidationError). For ISO14443 and ISO15693, validation emits warnings but allows non-standard formats, as real-world RFID cards may have vendor-specific implementations.

Source code in src/pyocmf/core/payload.py
@pydantic.model_validator(mode="after")
def validate_id_format_by_type(self) -> Payload:
    """Check that ID matches the format implied by the identification type IT.

    Skipped entirely when ID or IT is empty/None or when IT is listed in
    ``_UNRESTRICTED_TYPES``. Types listed in ``_PERMISSIVE_TYPES`` are
    checked in non-strict mode (a warning instead of an error); every other
    type is checked strictly via ``_validate_id_format``.
    """
    identification, id_type = self.ID, self.IT
    if not (identification and id_type):
        return self

    # IT may hold an enum member or some other value; reduce it to the
    # string used as lookup key.
    if isinstance(id_type, IdentificationType):
        type_key = id_type.value
    else:
        type_key = str(id_type)

    if type_key not in self._UNRESTRICTED_TYPES:
        self._validate_id_format(
            type_key,
            identification,
            strict=type_key not in self._PERMISSIVE_TYPES,
        )
    return self

Reading

Individual meter reading data.

Reading

Bases: BaseModel

Source code in src/pyocmf/core/reading.py
class Reading(pydantic.BaseModel):
    # A single meter reading inside the payload's RD list.
    TM: OCMFTimestamp = pydantic.Field(description="Time (ISO 8601 + time status) - REQUIRED")
    TX: MeterReadingReason | None = pydantic.Field(default=None, description="Transaction")
    RV: decimal.Decimal | None = pydantic.Field(
        default=None,
        description="Reading Value - Conditional (required when RI present)",
    )
    RI: OBISCode | None = pydantic.Field(
        default=None, description="Reading Identification (OBIS code) - Conditional"
    )
    RU: OCMFUnit | str = pydantic.Field(
        description="Reading Unit (e.g. kWh, Wh, mOhm per OCMF spec Table 20"
    )
    CL: decimal.Decimal | None = pydantic.Field(default=None, description="Cumulated Losses")
    EF: ErrorFlags | None = pydantic.Field(
        default=None, description="Error Flags (can contain 'E', 't', or both)"
    )
    ST: MeterStatus = pydantic.Field(description="Status - REQUIRED")

    @pydantic.field_validator("TM", mode="before")
    @classmethod
    def parse_timestamp(cls, v: str | OCMFTimestamp) -> OCMFTimestamp:
        """Turn the raw TM value into an OCMFTimestamp.

        An OCMFTimestamp is passed through; a string is parsed with
        ``OCMFTimestamp.from_string``.

        Raises:
            ValueError: ``v`` is neither a string nor an OCMFTimestamp. A
                ValueError (not TypeError) is used so that pydantic reports
                it as a regular validation error.
        """
        if isinstance(v, OCMFTimestamp):
            return v
        if isinstance(v, str):
            return OCMFTimestamp.from_string(v)
        msg = f"TM must be a string or OCMFTimestamp, got {type(v)}"
        raise ValueError(msg)

    @pydantic.field_serializer("TM")
    def serialize_timestamp(self, tm: OCMFTimestamp) -> str:
        """Serialize TM through the timestamp's ``str()`` form."""
        return str(tm)

    @pydantic.field_validator("EF", mode="before")
    @classmethod
    def ef_empty_string_to_none(cls, v: str | None) -> ErrorFlags | None:
        """Treat an empty error-flag string as 'no flags' (None)."""
        if v == "":
            return None
        return v

    @pydantic.field_validator("RU")
    @classmethod
    def validate_reading_unit(cls, v: OCMFUnit | str) -> OCMFUnit | str:
        """Validate RU is spec-compliant, warn if not.

        The unit is never rejected: a value outside the EnergyUnit and
        ResistanceUnit enums only triggers a UserWarning. ``v`` is returned
        unchanged.
        """
        spec_units = {unit.value for unit in EnergyUnit} | {unit.value for unit in ResistanceUnit}
        # Compare by wire value: str() of an enum member is only guaranteed to
        # equal its value for StrEnum-style enums.
        value_str = str(getattr(v, "value", v))
        if value_str not in spec_units:
            warnings.warn(
                f"Reading Unit '{value_str}' is not in OCMF spec Table 20 "
                f"({', '.join(sorted(spec_units))}). "
                f"This may indicate a vendor-specific or extended unit. "
                f"Data will be accepted.",
                UserWarning,
                stacklevel=2,
            )
        return v

    @pydantic.field_validator("CL")
    @classmethod
    def validate_cl(
        cls, v: decimal.Decimal | None, info: pydantic.ValidationInfo
    ) -> decimal.Decimal | None:
        """Validate CL against the already-validated RI and TX of the same reading.

        A non-None CL requires RI to be present and to denote an accumulation
        register, must be 0 when TX is BEGIN, and must not be negative.

        Raises:
            ValueError: Any of the conditions above is violated.
        """
        if v is None:
            return v

        # info.data holds the fields validated before CL; RI is looked up
        # there and treated as missing when absent or empty.
        ri = info.data.get("RI")
        cl_register_error = (
            "CL (Cumulated Loss) can only appear when RI indicates an "
            "accumulation register (B0-B3, C0-C3)"
        )
        if not ri:
            raise ValueError(cl_register_error)
        if isinstance(ri, OBIS) and not ri.is_accumulation_register:
            raise ValueError(cl_register_error)
        if isinstance(ri, str) and not is_accumulation_register(ri):
            raise ValueError(cl_register_error)

        if v != 0:
            tx = info.data.get("TX")
            if tx == MeterReadingReason.BEGIN:
                msg = "CL (Cumulated Loss) must be 0 when TX=B (transaction begin)"
                raise ValueError(msg)

        if v < 0:
            msg = "CL (Cumulated Loss) must be non-negative"
            raise ValueError(msg)

        return v

    @pydantic.model_validator(mode="after")
    def validate_ri_ru_group(self) -> Reading:
        """Require RI and RU to be either both present or both absent.

        Raises:
            ValueError: Exactly one of RI and RU is None.
        """
        # NOTE(review): RU is declared above as a required, non-optional field,
        # so on a validated model ru_present is always True and this check
        # effectively makes RI mandatory - confirm that is intended.
        ri_present = self.RI is not None
        ru_present = self.RU is not None

        if ri_present != ru_present:
            msg = (
                "RI (Reading Identification) and RU (Reading Unit) must both be "
                "present or both absent"
            )
            raise ValueError(msg)

        return self

    @property
    def timestamp(self):
        """The ``timestamp`` attribute of TM."""
        return self.TM.timestamp

    @property
    def time_status(self) -> TimeStatus:
        """The ``status`` attribute of TM."""
        return self.TM.status

validate_reading_unit(v: OCMFUnit | str) -> OCMFUnit | str classmethod

Validate RU is spec-compliant, warn if not.

Source code in src/pyocmf/core/reading.py
@pydantic.field_validator("RU")
@classmethod
def validate_reading_unit(cls, v: OCMFUnit | str) -> OCMFUnit | str:
    """Validate RU is spec-compliant, warn if not.

    The unit is never rejected: a value outside the EnergyUnit and
    ResistanceUnit enums only triggers a UserWarning. ``v`` is returned
    unchanged.
    """
    spec_units = {unit.value for unit in EnergyUnit} | {unit.value for unit in ResistanceUnit}
    # Compare by wire value: str() of an enum member is only guaranteed to
    # equal its value for StrEnum-style enums.
    value_str = str(getattr(v, "value", v))
    if value_str not in spec_units:
        warnings.warn(
            f"Reading Unit '{value_str}' is not in OCMF spec Table 20 "
            f"({', '.join(sorted(spec_units))}). "
            f"This may indicate a vendor-specific or extended unit. "
            f"Data will be accepted.",
            UserWarning,
            stacklevel=2,
        )
    return v

Signature

Cryptographic signature data and verification.

Signature

Bases: BaseModel

Source code in src/pyocmf/core/signature.py
class Signature(pydantic.BaseModel):
    # Signature section of an OCMF string. SA, SE and SM fall back to the
    # defaults below when omitted; SD has no default and must be supplied.
    SA: SignatureMethod | None = pydantic.Field(
        description="Signature Algorithm",
        default=SignatureMethod.SECP256R1_SHA256,
    )
    SE: SignatureEncodingType | None = pydantic.Field(
        description="Signature Encoding",
        default=SignatureEncodingType.HEX,
    )
    SM: SignatureMimeType | None = pydantic.Field(
        description="Signature Mime Type",
        default=SignatureMimeType.APPLICATION_X_DER,
    )
    SD: SignatureDataType = pydantic.Field(description="Signature Data")

Compliance Checking

Eichrecht Compliance

Functions for validating OCMF data against German Eichrecht (calibration law) requirements.

check_eichrecht_reading(reading: Reading, is_begin: bool = False) -> list[EichrechtIssue]

Check a single reading for Eichrecht compliance.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| reading | Reading | The reading to check | required |
| is_begin | bool | Whether this is a transaction begin reading (affects CL checking) | False |

Returns:

| Type | Description |
| --- | --- |
| list[EichrechtIssue] | List of compliance issues (empty if compliant) |

Source code in src/pyocmf/compliance/reading.py
def check_eichrecht_reading(reading: Reading, is_begin: bool = False) -> list[EichrechtIssue]:
    """Collect Eichrecht compliance issues for one reading.

    Checks, in this order: meter status (ST) must be OK, error flags (EF)
    must be empty or whitespace-only, the time status should be
    SYNCHRONIZED (reported as a warning), and cumulated loss (CL), when
    present, must be 0 at transaction begin and must not be negative.

    Args:
        reading: The reading to check
        is_begin: Whether this is a transaction begin reading (affects CL checking)

    Returns:
        List of compliance issues (empty if compliant)

    """
    found: list[EichrechtIssue] = []

    meter_status = reading.ST
    if meter_status != MeterStatus.OK:
        status_message = (
            f"Meter status must be 'G' (OK) for billing-relevant readings, "
            f"got '{meter_status}'"
        )
        found.append(
            EichrechtIssue(code=IssueCode.METER_STATUS, message=status_message, field="ST")
        )

    error_flags = reading.EF
    if error_flags and error_flags.strip():
        flags_message = (
            f"Error flags must be empty for billing-relevant readings, got '{error_flags}'"
        )
        found.append(
            EichrechtIssue(code=IssueCode.ERROR_FLAGS, message=flags_message, field="EF")
        )

    time_status = reading.time_status
    if time_status != TimeStatus.SYNCHRONIZED:
        # Unsynchronized time is the only finding here with an explicit
        # WARNING severity.
        sync_message = (
            f"Time should be synchronized (status 'S') for billing, "
            f"got '{time_status.value}'"
        )
        found.append(
            EichrechtIssue(
                code=IssueCode.TIME_SYNC,
                message=sync_message,
                field="TM",
                severity=IssueSeverity.WARNING,
            )
        )

    cumulated_loss = reading.CL
    if cumulated_loss is None:
        return found

    # Both CL findings can apply to the same reading (negative CL at begin).
    if is_begin and cumulated_loss != 0:
        found.append(
            EichrechtIssue(
                code=IssueCode.CL_BEGIN,
                message=f"Cumulated loss (CL) must be 0 at transaction begin, got {cumulated_loss}",
                field="CL",
            )
        )
    if cumulated_loss < 0:
        found.append(
            EichrechtIssue(
                code=IssueCode.CL_NEGATIVE,
                message=f"Cumulated loss (CL) must be non-negative, got {cumulated_loss}",
                field="CL",
            )
        )

    return found

check_eichrecht_transaction(begin: Payload, end: Payload) -> list[EichrechtIssue]

Check a complete charging transaction for Eichrecht compliance.

Source code in src/pyocmf/compliance/transaction.py
def check_eichrecht_transaction(
    begin: Payload,
    end: Payload,
) -> list[EichrechtIssue]:
    """Collect Eichrecht compliance issues for a begin/end payload pair.

    When either payload carries no readings, a single ``NO_READINGS`` issue is
    returned and no further checks run. Otherwise the first reading of
    ``begin`` and the last reading of ``end`` are checked individually and
    against each other, followed by the payload-level identification,
    pagination and ID checks. A mismatch of the ``ID`` field is downgraded to
    a warning.
    """
    if not (begin.RD and end.RD):
        return [
            EichrechtIssue(
                code=IssueCode.NO_READINGS,
                message="Both begin and end payloads must contain readings (RD)",
                field="RD",
            )
        ]

    first_reading = begin.RD[0]
    last_reading = end.RD[-1]

    found: list[EichrechtIssue] = []
    found += _validate_transaction_types(first_reading, last_reading, len(end.RD))

    # Per-reading checks; only the begin reading is treated as a transaction start.
    found += check_eichrecht_reading(first_reading, is_begin=True)
    found += check_eichrecht_reading(last_reading, is_begin=False)

    found += _validate_field_consistency(begin, end, first_reading, last_reading)
    found += _validate_value_progression(first_reading, last_reading)

    for payload, label in ((begin, "begin"), (end, "end")):
        level_issue = _validate_identification_level(payload, label)
        if level_issue:
            found.append(level_issue)

    pagination_issue = _validate_pagination_consistency(begin, end)
    if pagination_issue:
        found.append(pagination_issue)

    id_issue = _check_field_match(
        begin.ID, end.ID, "ID", IssueCode.ID_MISMATCH, "Identification data"
    )
    if id_issue:
        # Differing identification data is reported, but only as a warning.
        id_issue.severity = IssueSeverity.WARNING
        found.append(id_issue)

    return found

validate_transaction_pair(begin: OCMF, end: OCMF) -> bool

Validate transaction pair compliance (errors only, warnings ignored).

Source code in src/pyocmf/compliance/transaction.py
def validate_transaction_pair(begin: OCMF, end: OCMF) -> bool:
    """Return True when the begin/end pair yields no error-severity issue.

    Issues with any other severity (i.e. warnings) do not affect the result.
    """
    for issue in check_eichrecht_transaction(begin.payload, end.payload):
        if issue.severity == IssueSeverity.ERROR:
            return False
    return True

Compliance Models

EichrechtIssue dataclass

Source code in src/pyocmf/compliance/models.py
@dataclass
class EichrechtIssue:
    """One finding produced by the Eichrecht compliance checks."""

    # Machine-readable identifier of the finding.
    code: IssueCode
    # Human-readable explanation.
    message: str
    # Name of the offending OCMF field, when one applies.
    field: str | None = None
    # Findings are errors unless explicitly created or downgraded as warnings.
    severity: IssueSeverity = IssueSeverity.ERROR

    def __str__(self) -> str:
        """Render as ``[field] message (code)``; the field tag is omitted when unset."""
        text = f"{self.message} ({self.code})"
        if self.field:
            return f"[{self.field}] {text}"
        return text

IssueCode

Bases: StrEnum

Source code in src/pyocmf/compliance/models.py
class IssueCode(enum.StrEnum):
    """Machine-readable identifiers carried by ``EichrechtIssue.code``.

    Each member's value equals its name.
    """

    # Reading-level issues
    METER_STATUS = "METER_STATUS"  # reading.ST is not MeterStatus.OK
    ERROR_FLAGS = "ERROR_FLAGS"  # reading.EF is set and not blank
    TIME_SYNC = "TIME_SYNC"  # time status is not SYNCHRONIZED (reported as a warning)
    CL_BEGIN = "CL_BEGIN"  # cumulated loss is non-zero on a begin reading
    CL_NEGATIVE = "CL_NEGATIVE"  # cumulated loss is below zero

    # Transaction-level issues
    NO_READINGS = "NO_READINGS"  # begin or end payload has no readings (RD)
    BEGIN_TX = "BEGIN_TX"
    END_TX = "END_TX"
    SERIAL_MISMATCH = "SERIAL_MISMATCH"
    OBIS_MISMATCH = "OBIS_MISMATCH"
    UNIT_MISMATCH = "UNIT_MISMATCH"
    VALUE_REGRESSION = "VALUE_REGRESSION"
    TIME_REGRESSION = "TIME_REGRESSION"
    ID_MISMATCH = "ID_MISMATCH"  # begin.ID and end.ID differ (downgraded to a warning)
    ID_LEVEL_INVALID = "ID_LEVEL_INVALID"
    PAGINATION_INCONSISTENT = "PAGINATION_INCONSISTENT"

IssueSeverity

Bases: StrEnum

Source code in src/pyocmf/compliance/models.py
7
8
9
class IssueSeverity(enum.StrEnum):
    """Severity attached to an ``EichrechtIssue``."""

    # Default severity of an issue; validate_transaction_pair fails only on these.
    ERROR = "error"
    # Reported but ignored by validate_transaction_pair.
    WARNING = "warning"

Data Models

PublicKey

Public key metadata and validation per OCMF specification.

PublicKey

Bases: BaseModel

Source code in src/pyocmf/models/public_key.py
class PublicKey(pydantic.BaseModel):
    """Elliptic-curve public key together with metadata derived from it."""

    key: HexStr = pydantic.Field(description="Hex-encoded DER public key")
    curve: CurveType = pydantic.Field(description="Elliptic curve type")
    size: int = pydantic.Field(description="Key size in bits")
    block_length: int = pydantic.Field(description="Block length in bytes")

    def to_string(self, base64_encode: bool = False) -> str:
        """Return the key as stored (hex), or base64 of its bytes when requested."""
        if base64_encode:
            key_bytes = bytes.fromhex(self.key)
            return base64.b64encode(key_bytes).decode("ascii")
        return self.key

    @classmethod
    def from_string(cls, key_string: str) -> Self:
        """Parse DER public key string (hex or base64) and extract metadata.

        Args:
            key_string: The DER-encoded key as hex or base64 text. Surrounding
                whitespace is ignored, as is whitespace between hex bytes.

        Returns:
            A model holding the hex form of the key plus the curve name, key
            size in bits and block length in bytes read from the key.

        Raises:
            Base64DecodingError: If the text is neither valid hex nor base64.
            PublicKeyError: If the bytes cannot be loaded as a supported
                elliptic-curve public key.
        """
        check_cryptography_available()

        key_string = key_string.strip()

        key_bytes: bytes
        key_hex: str

        # Try hex first (DER-encoded keys typically start with 30 in hex)
        try:
            key_bytes = bytes.fromhex(key_string)
        except ValueError:
            # Not valid hex, try base64. binascii.Error is a ValueError subclass,
            # so this covers every decoding failure without a blanket except.
            try:
                key_bytes = base64.b64decode(key_string, validate=True)
            except ValueError as e:
                msg = f"Invalid public key encoding: not valid hex or base64. {e}"
                raise Base64DecodingError(msg) from e
            key_hex = key_bytes.hex()
        else:
            # bytes.fromhex() skips whitespace between bytes; remove it so the
            # stored key is contiguous hex (letter case is left as given).
            key_hex = "".join(key_string.split())

        try:
            public_key = serialization.load_der_public_key(key_bytes)

            if not isinstance(public_key, ec.EllipticCurvePublicKey):
                msg = "Public key is not an elliptic curve key"
                raise TypeError(msg)  # noqa: TRY301

            curve_name = public_key.curve.name
            key_size = public_key.curve.key_size
            block_length = key_size // 8

            return cls(
                key=key_hex,
                curve=curve_name,
                size=key_size,
                block_length=block_length,
            )
        except UnsupportedAlgorithm as e:
            msg = f"Unsupported elliptic curve in public key: {e}"
            raise PublicKeyError(msg) from e
        except (ValueError, TypeError) as e:
            # Also reached by the TypeError raised above for non-EC keys.
            msg = f"Failed to parse public key: {e}"
            raise PublicKeyError(msg) from e

    @property
    def key_type_identifier(self) -> KeyType:
        """Key type derived from this key's curve via ``KeyType.from_curve``."""
        return KeyType.from_curve(self.curve)

    def matches_signature_algorithm(self, signature_algorithm: SignatureMethod | None) -> bool:
        """Return whether the algorithm's curve equals this key's curve.

        A missing algorithm (``None``) never matches.
        """
        if signature_algorithm is None:
            return False

        return self.curve == signature_algorithm.curve

from_string(key_string: str) -> Self classmethod

Parse DER public key string (hex or base64) and extract metadata.

Source code in src/pyocmf/models/public_key.py
@classmethod
def from_string(cls, key_string: str) -> Self:
    """Parse DER public key string (hex or base64) and extract metadata.

    Args:
        key_string: The DER-encoded key as hex or base64 text. Surrounding
            whitespace is ignored, as is whitespace between hex bytes.

    Returns:
        A model holding the hex form of the key plus the curve name, key
        size in bits and block length in bytes read from the key.

    Raises:
        Base64DecodingError: If the text is neither valid hex nor base64.
        PublicKeyError: If the bytes cannot be loaded as a supported
            elliptic-curve public key.
    """
    check_cryptography_available()

    key_string = key_string.strip()

    key_bytes: bytes
    key_hex: str

    # Try hex first (DER-encoded keys typically start with 30 in hex)
    try:
        key_bytes = bytes.fromhex(key_string)
    except ValueError:
        # Not valid hex, try base64. binascii.Error is a ValueError subclass,
        # so this covers every decoding failure without a blanket except.
        try:
            key_bytes = base64.b64decode(key_string, validate=True)
        except ValueError as e:
            msg = f"Invalid public key encoding: not valid hex or base64. {e}"
            raise Base64DecodingError(msg) from e
        key_hex = key_bytes.hex()
    else:
        # bytes.fromhex() skips whitespace between bytes; remove it so the
        # stored key is contiguous hex (letter case is left as given).
        key_hex = "".join(key_string.split())

    try:
        public_key = serialization.load_der_public_key(key_bytes)

        if not isinstance(public_key, ec.EllipticCurvePublicKey):
            msg = "Public key is not an elliptic curve key"
            raise TypeError(msg)  # noqa: TRY301

        curve_name = public_key.curve.name
        key_size = public_key.curve.key_size
        block_length = key_size // 8

        return cls(
            key=key_hex,
            curve=curve_name,
            size=key_size,
            block_length=block_length,
        )
    except UnsupportedAlgorithm as e:
        msg = f"Unsupported elliptic curve in public key: {e}"
        raise PublicKeyError(msg) from e
    except (ValueError, TypeError) as e:
        # Also reached by the TypeError raised above for non-EC keys.
        msg = f"Failed to parse public key: {e}"
        raise PublicKeyError(msg) from e

OBIS

OBIS code model for meter reading identifiers.

OBIS

Bases: BaseModel

Source code in src/pyocmf/models/obis.py
class OBIS(pydantic.BaseModel):
    """Immutable OBIS identifier made of a code and an optional ``*`` suffix."""

    model_config = pydantic.ConfigDict(frozen=True)

    code: str
    suffix: str | None = None

    @classmethod
    def from_string(cls, obis_str: str) -> OBIS:
        """Split ``obis_str`` at its first ``*`` into code and suffix.

        Anything that is not a ``str`` is handed back unchanged.
        """
        if isinstance(obis_str, str):
            code, star, suffix = obis_str.partition("*")
            return cls(code=code, suffix=suffix if star else None)
        return obis_str

    @property
    def info(self):
        """Registry entry for this code as returned by ``get_obis_info``."""
        return get_obis_info(self.code)

    @property
    def is_billing_relevant(self) -> bool:
        """Result of the registry's ``is_billing_relevant`` for this code."""
        return is_billing_relevant(self.code)

    @property
    def is_accumulation_register(self) -> bool:
        """Result of the registry's ``is_accumulation_register`` for this code."""
        return is_accumulation_register(self.code)

    @property
    def is_transaction_register(self) -> bool:
        """Result of the registry's ``is_transaction_register`` for this code."""
        return is_transaction_register(self.code)

    def __str__(self) -> str:
        """Return ``code*suffix``, or just ``code`` when the suffix is empty or unset."""
        if self.suffix:
            return f"{self.code}*{self.suffix}"
        return self.code

    def __repr__(self) -> str:
        """Return ``OBIS('<text>')`` using the same text as ``__str__``."""
        shown = f"{self.code}*{self.suffix}" if self.suffix else self.code
        return f"OBIS('{shown}')"

OCMFTimestamp

Timestamp with time synchronization status.

OCMFTimestamp dataclass

Source code in src/pyocmf/models/timestamp.py
@dataclass(frozen=True)
class OCMFTimestamp:
    """Point in time paired with its time-synchronization status."""

    timestamp: datetime
    status: TimeStatus

    def __str__(self) -> str:
        return self.serialize()

    @classmethod
    def from_string(cls, timestamp_str: str) -> OCMFTimestamp:
        """Parse OCMF timestamp string to OCMFTimestamp.

        OCMF format: "2023-06-15T14:30:45,123+0200 S" (note: comma for milliseconds).
        The text after the last space is the status; when there is no space the
        status defaults to ``TimeStatus.UNKNOWN_OR_UNSYNCHRONIZED``.

        Raises:
            ValueError: If the status or the date/time part cannot be parsed.
        """
        if " " in timestamp_str:
            ts_part, status_part = timestamp_str.rsplit(" ", 1)
            status = TimeStatus(status_part)
        else:
            ts_part = timestamp_str
            status = TimeStatus.UNKNOWN_OR_UNSYNCHRONIZED

        # fromisoformat is fed a dot as the fractional separator.
        ts_normalized = ts_part.replace(",", ".")
        dt = datetime.fromisoformat(ts_normalized)

        return cls(timestamp=dt, status=status)

    def serialize(self) -> str:
        """Serialize to OCMF timestamp format, e.g. "2023-06-15T14:30:45,123+0200 S".

        Uses a comma before the milliseconds and a four-digit UTC offset
        without a colon, so the output has the same shape ``from_string``
        documents as input.

        Raises:
            ValueError: If the datetime carries no UTC offset.
        """
        offset = self.timestamp.utcoffset()
        if offset is None:
            error_message = "Datetime must be timezone-aware for OCMF format"
            raise ValueError(error_message)

        # isoformat() would render the offset as "+02:00"; format the local
        # time and the offset separately to get the colon-free "+0200".
        local_part = self.timestamp.replace(tzinfo=None).isoformat(timespec="milliseconds")
        local_part = local_part.replace(".", ",")

        offset_seconds = int(offset.total_seconds())
        sign = "-" if offset_seconds < 0 else "+"
        hours, remainder = divmod(abs(offset_seconds), 3600)
        minutes = remainder // 60

        return f"{local_part}{sign}{hours:02d}{minutes:02d} {self.status.value}"

from_string(timestamp_str: str) -> OCMFTimestamp classmethod

Parse OCMF timestamp string to OCMFTimestamp.

OCMF format: "2023-06-15T14:30:45,123+0200 S" (note: comma for milliseconds).

Source code in src/pyocmf/models/timestamp.py
@classmethod
def from_string(cls, timestamp_str: str) -> OCMFTimestamp:
    """Build an OCMFTimestamp from its OCMF text form.

    OCMF format: "2023-06-15T14:30:45,123+0200 S" (comma before the milliseconds).
    The text after the last space is the status; without any space the status
    falls back to ``TimeStatus.UNKNOWN_OR_UNSYNCHRONIZED``.
    """
    head, space, tail = timestamp_str.rpartition(" ")
    if space:
        ts_part = head
        status = TimeStatus(tail)
    else:
        ts_part = timestamp_str
        status = TimeStatus.UNKNOWN_OR_UNSYNCHRONIZED

    # fromisoformat is fed a dot as the fractional separator.
    dt = datetime.fromisoformat(ts_part.replace(",", "."))

    return cls(timestamp=dt, status=status)

serialize() -> str

Serialize to OCMF timestamp format.

Uses comma for milliseconds as required by OCMF spec.

Source code in src/pyocmf/models/timestamp.py
def serialize(self) -> str:
    """Serialize to OCMF timestamp format, e.g. "2023-06-15T14:30:45,123+0200 S".

    Uses a comma before the milliseconds and a four-digit UTC offset without
    a colon.

    Raises:
        ValueError: If the datetime carries no UTC offset.
    """
    offset = self.timestamp.utcoffset()
    if offset is None:
        error_message = "Datetime must be timezone-aware for OCMF format"
        raise ValueError(error_message)

    # isoformat() would render the offset as "+02:00"; format the local time
    # and the offset separately to get the colon-free "+0200".
    local_part = self.timestamp.replace(tzinfo=None).isoformat(timespec="milliseconds")
    local_part = local_part.replace(".", ",")

    offset_seconds = int(offset.total_seconds())
    sign = "-" if offset_seconds < 0 else "+"
    hours, remainder = divmod(abs(offset_seconds), 3600)
    minutes = remainder // 60

    return f"{local_part}{sign}{hours:02d}{minutes:02d} {self.status.value}"

CableLossCompensation

Cable loss compensation data.

CableLossCompensation

Bases: BaseModel

Source code in src/pyocmf/models/cable_loss.py
class CableLossCompensation(pydantic.BaseModel):
    """Cable loss compensation data.

    ``LR`` and ``LU`` are required; ``LN`` and ``LI`` default to ``None``.
    """

    # Optional name, limited to 20 characters.
    LN: str | None = pydantic.Field(
        default=None, max_length=20, description="Loss Compensation Naming"
    )
    # Optional numeric identification.
    LI: int | None = pydantic.Field(default=None, description="Loss Compensation Identification")
    # Cable resistance, expressed in the unit given by LU.
    LR: decimal.Decimal = pydantic.Field(description="Loss Compensation Cable Resistance")
    # Unit of the resistance value.
    LU: ResistanceUnit = pydantic.Field(description="Loss Compensation Unit")

Registries

OBIS Code Registry

Utilities for working with OBIS codes.

get_obis_info(obis_code: str) -> OBISInfo | None

Source code in src/pyocmf/registries/obis.py
def get_obis_info(obis_code: str) -> OBISInfo | None:
    """Return what ``OBISInfo.from_code`` yields for ``obis_code``."""
    info = OBISInfo.from_code(obis_code)
    return info

is_billing_relevant(obis_code: str) -> bool

Source code in src/pyocmf/registries/obis.py
def is_billing_relevant(obis_code: str) -> bool:
    """Tell whether an OBIS code counts as billing relevant.

    Codes present in ``ALL_KNOWN_OBIS`` use the flag stored there. Any other
    code is billing relevant when its normalized form matches the
    accumulation-register or the active-energy pattern.
    """
    normalized = _normalize(obis_code)

    if normalized in ALL_KNOWN_OBIS:
        return ALL_KNOWN_OBIS[normalized].billing_relevant

    fallback_patterns = (_ACCUMULATION_REGISTER_PATTERN, _ACTIVE_ENERGY_PATTERN)
    return any(pattern.match(normalized) for pattern in fallback_patterns)

is_accumulation_register(obis_code: str) -> bool

Source code in src/pyocmf/registries/obis.py
def is_accumulation_register(obis_code: str) -> bool:
    """Tell whether the normalized code matches the accumulation-register pattern."""
    matched = _ACCUMULATION_REGISTER_PATTERN.match(_normalize(obis_code))
    return bool(matched)

is_transaction_register(obis_code: str) -> bool

Source code in src/pyocmf/registries/obis.py
def is_transaction_register(obis_code: str) -> bool:
    """Tell whether the normalized code matches the transaction-register pattern."""
    matched = _TRANSACTION_REGISTER_PATTERN.match(_normalize(obis_code))
    return bool(matched)

OBISInfo dataclass

Source code in src/pyocmf/registries/obis.py
@dataclass
class OBISInfo:
    """Registry record describing one OBIS code."""

    code: str
    description: str
    billing_relevant: bool
    category: OBISCategory

    @staticmethod
    def normalize(obis_code: str) -> str:
        """Return ``obis_code`` in the module's normalized form."""
        canonical = _normalize(obis_code)
        return canonical

    @staticmethod
    def from_code(obis_code: str) -> OBISInfo | None:
        """Look up the normalized code in ``ALL_KNOWN_OBIS``; ``None`` when absent."""
        return ALL_KNOWN_OBIS.get(_normalize(obis_code))

    def is_accumulation_register(self) -> bool:
        """Tell whether ``code`` (used as stored) matches the accumulation-register pattern."""
        matched = _ACCUMULATION_REGISTER_PATTERN.match(self.code)
        return bool(matched)

    def is_transaction_register(self) -> bool:
        """Tell whether ``code`` (used as stored) matches the transaction-register pattern."""
        matched = _TRANSACTION_REGISTER_PATTERN.match(self.code)
        return bool(matched)

XML Utilities

OcmfContainer

Container for parsing OCMF data from XML transaction files.

OcmfContainer

Source code in src/pyocmf/utils/xml.py
class OcmfContainer:
    """Sequence-like holder of the OCMF records read from an XML file."""

    def __init__(self, entries: list[OcmfRecord]) -> None:
        self._entries = entries

    @classmethod
    def from_xml(cls, xml_path: pathlib.Path | str) -> OcmfContainer:
        """Build a container from the OCMF strings found in an XML file.

        Only ``value`` elements directly below the root are inspected, and an
        OCMF string that occurs more than once is kept only the first time.

        Args:
            xml_path: Path to the XML file

        Returns:
            OcmfContainer with parsed OCMF entries

        Raises:
            XmlParsingError: If the XML file cannot be parsed
            DataNotFoundError: If no OCMF data is found

        """
        try:
            root = ET.parse(pathlib.Path(xml_path)).getroot()
        except ET.ParseError as e:
            msg = f"Failed to parse XML file: {e}"
            raise XmlParsingError(msg) from e

        records = []
        already_seen: set[str] = set()

        for element in root.findall("value"):
            raw = _extract_ocmf_string(element)

            # Skip elements without OCMF data and repeats of an earlier string.
            if not raw or raw in already_seen:
                continue

            parsed = OCMF.from_string(raw)
            key = _extract_public_key(element)
            records.append(OcmfRecord(ocmf=parsed, public_key=key))
            already_seen.add(raw)

        if records:
            return cls(records)

        msg = "No OCMF data found in XML file"
        raise DataNotFoundError(msg)

    @property
    def entries(self) -> list[OcmfRecord]:
        """The underlying list of records (not a copy)."""
        return self._entries

    def __len__(self) -> int:
        return len(self._entries)

    def __iter__(self) -> Iterator[OcmfRecord]:
        return iter(self._entries)

    def __getitem__(self, index: int) -> OcmfRecord:
        return self._entries[index]

from_xml(xml_path: pathlib.Path | str) -> OcmfContainer classmethod

Parse OCMF data from an XML file.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| xml_path | Path \| str | Path to the XML file | required |

Returns:

| Type | Description |
| --- | --- |
| OcmfContainer | OcmfContainer with parsed OCMF entries |

Raises:

| Type | Description |
| --- | --- |
| XmlParsingError | If the XML file cannot be parsed |
| DataNotFoundError | If no OCMF data is found |

Source code in src/pyocmf/utils/xml.py
@classmethod
def from_xml(cls, xml_path: pathlib.Path | str) -> OcmfContainer:
    """Build a container from the OCMF strings found in an XML file.

    Only ``value`` elements directly below the root are inspected, and an
    OCMF string that occurs more than once is kept only the first time.

    Args:
        xml_path: Path to the XML file

    Returns:
        OcmfContainer with parsed OCMF entries

    Raises:
        XmlParsingError: If the XML file cannot be parsed
        DataNotFoundError: If no OCMF data is found

    """
    try:
        root = ET.parse(pathlib.Path(xml_path)).getroot()
    except ET.ParseError as e:
        msg = f"Failed to parse XML file: {e}"
        raise XmlParsingError(msg) from e

    records = []
    already_seen: set[str] = set()

    for element in root.findall("value"):
        raw = _extract_ocmf_string(element)

        # Skip elements without OCMF data and repeats of an earlier string.
        if not raw or raw in already_seen:
            continue

        parsed = OCMF.from_string(raw)
        key = _extract_public_key(element)
        records.append(OcmfRecord(ocmf=parsed, public_key=key))
        already_seen.add(raw)

    if records:
        return cls(records)

    msg = "No OCMF data found in XML file"
    raise DataNotFoundError(msg)

OcmfRecord

A single OCMF record with its associated public key.

OcmfRecord dataclass

Source code in src/pyocmf/utils/xml.py
@dataclass
class OcmfRecord:
    """An OCMF object together with the public key found alongside it, if any."""

    ocmf: OCMF
    public_key: PublicKey | None = None

    def verify_signature(self) -> bool:
        """Verify the OCMF signature with the record's own public key.

        Raises:
            SignatureVerificationError: If the record has no public key.
        """
        key = self.public_key
        if key is not None:
            return self.ocmf.verify_signature(key)

        msg = "No public key available for signature verification"
        raise SignatureVerificationError(msg)

Exceptions

All exceptions inherit from PyOCMFError.

PyOCMFError

Bases: Exception

Source code in src/pyocmf/exceptions.py
class PyOCMFError(Exception):
    def __init__(
        self,
        message: str,
        *,
        field: str | None = None,
        details: list[dict] | None = None,
    ) -> None:
        super().__init__(message)
        self.field = field
        self.details = details

OCMF Parsing Errors

OcmfFormatError

Bases: PyOCMFError

Source code in src/pyocmf/exceptions.py
class OcmfFormatError(PyOCMFError):
    """Raised by ``OCMF.from_string`` when the text lacks the header|payload|signature layout."""

OcmfPayloadError

Bases: PyOCMFError

Source code in src/pyocmf/exceptions.py
class OcmfPayloadError(PyOCMFError):
    """Raised by ``OCMF.from_string`` when the payload section fails validation."""

OcmfSignatureError

Bases: PyOCMFError

Source code in src/pyocmf/exceptions.py
class OcmfSignatureError(PyOCMFError):
    """Parsing error for the signature section of an OCMF string.

    NOTE(review): presumably raised by ``OCMF.from_string`` when the signature
    section fails validation -- confirm against the parser.
    """

Validation Errors

ValidationError

Bases: PyOCMFError, ValueError

Source code in src/pyocmf/exceptions.py
class ValidationError(PyOCMFError, ValueError):
    """Validation failure; can be caught as ``PyOCMFError`` or as ``ValueError``."""

Encoding Errors

EncodingError

Bases: PyOCMFError, ValueError

Source code in src/pyocmf/exceptions.py
class EncodingError(PyOCMFError, ValueError):
    """Encoding failure that can also be caught as ``ValueError``.

    Adds an optional ``value`` attribute to the base error's ``field`` and
    ``details``.
    """

    def __init__(
        self,
        message: str,
        *,
        value: str | None = None,
        field: str | None = None,
        details: list[dict] | None = None,
    ) -> None:
        self.value = value
        super().__init__(message, field=field, details=details)

EncodingTypeError

Bases: PyOCMFError, TypeError

Source code in src/pyocmf/exceptions.py
class EncodingTypeError(PyOCMFError, TypeError):
    """Encoding failure that can also be caught as ``TypeError``.

    Adds optional ``value`` and ``expected_type`` attributes to the base
    error's ``field`` and ``details``.
    """

    def __init__(
        self,
        message: str,
        *,
        value: object = None,
        expected_type: str | None = None,
        field: str | None = None,
        details: list[dict] | None = None,
    ) -> None:
        self.value = value
        self.expected_type = expected_type
        super().__init__(message, field=field, details=details)

HexDecodingError

Bases: EncodingError

Source code in src/pyocmf/exceptions.py
class HexDecodingError(EncodingError):
    """Raised by ``OCMF.from_string`` when unprefixed input cannot be hex-decoded."""

Base64DecodingError

Bases: EncodingError

Source code in src/pyocmf/exceptions.py
class Base64DecodingError(EncodingError):
    """Raised by ``PublicKey.from_string`` when a key is neither valid hex nor base64."""

Cryptography Errors

CryptoError

Bases: PyOCMFError

Source code in src/pyocmf/exceptions.py
class CryptoError(PyOCMFError):
    """Common base of ``SignatureVerificationError`` and ``PublicKeyError``."""

SignatureVerificationError

Bases: CryptoError

Source code in src/pyocmf/exceptions.py
class SignatureVerificationError(CryptoError):
    """Signature verification failure.

    Adds an optional ``reason`` attribute to the base error's ``field`` and
    ``details``.
    """

    def __init__(
        self,
        message: str,
        *,
        reason: str | None = None,
        field: str | None = None,
        details: list[dict] | None = None,
    ) -> None:
        self.reason = reason
        super().__init__(message, field=field, details=details)

PublicKeyError

Bases: CryptoError

Source code in src/pyocmf/exceptions.py
class PublicKeyError(CryptoError):
    """Public key failure, e.g. raised by ``PublicKey.from_string`` for unloadable keys.

    Adds an optional ``key_data`` attribute to the base error's ``field`` and
    ``details``.
    """

    def __init__(
        self,
        message: str,
        *,
        key_data: str | None = None,
        field: str | None = None,
        details: list[dict] | None = None,
    ) -> None:
        self.key_data = key_data
        super().__init__(message, field=field, details=details)

Data Errors

DataNotFoundError

Bases: PyOCMFError

Source code in src/pyocmf/exceptions.py
class DataNotFoundError(PyOCMFError):
    """Raised by ``OcmfContainer.from_xml`` when the XML file yields no OCMF entries."""

XmlParsingError

Bases: PyOCMFError

Source code in src/pyocmf/exceptions.py
class XmlParsingError(PyOCMFError):
    """Raised by ``OcmfContainer.from_xml`` when the XML file cannot be parsed."""