diff --git a/CHANGELOG.rst b/CHANGELOG.rst index f20e574ce4fd..3d285b638ae2 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -26,6 +26,10 @@ Changelog :class:`~cryptography.hazmat.primitives.hpke.KEM` so callers can split the encapsulated key from the ciphertext returned by :meth:`~cryptography.hazmat.primitives.hpke.Suite.encrypt`. +* Added support for using :class:`~cryptography.x509.Certificate`, + :class:`~cryptography.x509.CertificateSigningRequest`, and + :class:`~cryptography.x509.CertificateRevocationList` as field types in + :doc:`/hazmat/asn1/index` structures. .. _v48-0-0: diff --git a/docs/hazmat/asn1/reference.rst b/docs/hazmat/asn1/reference.rst index 1ebd22d5faa4..0f94772620aa 100644 --- a/docs/hazmat/asn1/reference.rst +++ b/docs/hazmat/asn1/reference.rst @@ -77,6 +77,24 @@ The following built-in Python types are supported as ASN.1 field types: Additionally, :class:`~cryptography.x509.ObjectIdentifier` maps to ``OBJECT IDENTIFIER``. +.. versionadded:: 49.0.0 + +:class:`~cryptography.x509.Certificate`, +:class:`~cryptography.x509.CertificateSigningRequest`, and +:class:`~cryptography.x509.CertificateRevocationList` can also be used as +field types. They are encoded by embedding their DER serialization, and +decoded by parsing the field as the corresponding X.509 object. These +fields cannot have :class:`Implicit` annotations. + +.. code-block:: python + + from cryptography import x509 + from cryptography.hazmat import asn1 + + @asn1.sequence + class Example: + cert: x509.Certificate + The following decorators and types are provided for the rest of the ASN.1 types that have no direct Python equivalent: diff --git a/src/cryptography/hazmat/asn1/asn1.py b/src/cryptography/hazmat/asn1/asn1.py index aa18d7e5aa52..a308d69db7dd 100644 --- a/src/cryptography/hazmat/asn1/asn1.py +++ b/src/cryptography/hazmat/asn1/asn1.py @@ -18,6 +18,7 @@ LiteralString = typing.LiteralString from cryptography.hazmat.bindings._rust import declarative_asn1 +from cryptography.hazmat.bindings._rust import x509 as rust_x509 if sys.version_info < (3, 10): NoneType = type(None) @@ -56,6 +57,25 @@ class Variant(typing.Generic[U, Tag]): encode_der = declarative_asn1.encode_der +_X509_TYPES = ( + rust_x509.Certificate, + rust_x509.CertificateSigningRequest, + rust_x509.CertificateRevocationList, +) + + +def _check_x509_field_annotations( + field_type: typing.Any, + annotation: declarative_asn1.Annotation, + field_name: str, +) -> None: + if field_type in _X509_TYPES and isinstance(annotation.encoding, Implicit): + raise TypeError( + f"field '{field_name}' has an IMPLICIT annotation, but " + "IMPLICIT annotations are not supported for X.509 types." + ) + + def _is_union(field_type: type) -> bool: # NOTE: types.UnionType for `T | U`, typing.Union for `Union[T, U]`. # TODO: Drop the `hasattr()` once the minimum supported Python version @@ -144,6 +164,8 @@ def _normalize_field_type( "DEFAULT annotations are not supported for TLV types." ) + _check_x509_field_annotations(field_type, annotation, field_name) + if hasattr(field_type, "__asn1_root__"): root_type = field_type.__asn1_root__ if not isinstance( @@ -166,6 +188,11 @@ def _normalize_field_type( "optional TLV types (`TLV | None`) are not " "currently supported" ) + # For optional types, the annotation is associated with the + # union, so we check it against the inner type here. + _check_x509_field_annotations( + optional_type, annotation, field_name + ) annotated_type = _normalize_field_type(optional_type, field_name) if not annotated_type.annotation.is_empty(): diff --git a/src/rust/src/declarative_asn1/decode.rs b/src/rust/src/declarative_asn1/decode.rs index 01cfca900ce6..a945fa7bc38e 100644 --- a/src/rust/src/declarative_asn1/decode.rs +++ b/src/rust/src/declarative_asn1/decode.rs @@ -192,6 +192,59 @@ fn decode_tlv<'a>( )?) } +// Reads a TLV from `parser` and returns its full data as `PyBytes`, +// suitable for passing to the X.509 loaders. +fn decode_x509_der_bytes<'a>( + py: pyo3::Python<'a>, + parser: &mut Parser<'a>, + encoding: &Option>, +) -> ParseResult> { + let tlv = match encoding { + Some(e) => match e.get() { + Encoding::Implicit(_) => Err(CryptographyError::Py( + // We don't support IMPLICIT X.509 fields + // (they are caught first at the Python level) + pyo3::exceptions::PyValueError::new_err( + "invalid type definition: X.509 fields cannot be implicitly encoded", + ), + ))?, + Encoding::Explicit(n) => parser.read_explicit_element::>(*n), + }, + None => parser.read_element::>(), + }?; + Ok(pyo3::types::PyBytes::new(py, tlv.full_data()).unbind()) +} + +fn decode_certificate<'a>( + py: pyo3::Python<'a>, + parser: &mut Parser<'a>, + encoding: &Option>, +) -> ParseResult> { + let data = decode_x509_der_bytes(py, parser, encoding)?; + let cert = crate::x509::certificate::load_der_x509_certificate(py, data, None)?; + Ok(pyo3::Bound::new(py, cert)?) +} + +fn decode_csr<'a>( + py: pyo3::Python<'a>, + parser: &mut Parser<'a>, + encoding: &Option>, +) -> ParseResult> { + let data = decode_x509_der_bytes(py, parser, encoding)?; + let csr = crate::x509::csr::load_der_x509_csr(py, data, None)?; + Ok(pyo3::Bound::new(py, csr)?) +} + +fn decode_crl<'a>( + py: pyo3::Python<'a>, + parser: &mut Parser<'a>, + encoding: &Option>, +) -> ParseResult> { + let data = decode_x509_der_bytes(py, parser, encoding)?; + let crl = crate::x509::crl::load_der_x509_crl(py, data, None)?; + Ok(pyo3::Bound::new(py, crl)?) +} + fn decode_null<'a>( py: pyo3::Python<'a>, parser: &mut Parser<'a>, @@ -379,6 +432,9 @@ pub(crate) fn decode_annotated_type<'a>( Type::BitString() => decode_bitstring(py, parser, annotation)?.into_any(), Type::Tlv() => decode_tlv(py, parser, encoding)?.into_any(), Type::Null() => decode_null(py, parser, encoding)?.into_any(), + Type::Certificate() => decode_certificate(py, parser, encoding)?.into_any(), + Type::CertificateSigningRequest() => decode_csr(py, parser, encoding)?.into_any(), + Type::CertificateRevocationList() => decode_crl(py, parser, encoding)?.into_any(), }; match &ann_type.annotation.get().default { @@ -434,4 +490,19 @@ mod tests { .contains("invalid type definition: TLV/ANY fields cannot be implicitly encoded")); }); } + + #[test] + fn test_decode_implicit_x509() { + pyo3::Python::initialize(); + pyo3::Python::attach(|py| { + let result = asn1::parse(&[], |parser| { + let encoding = pyo3::Py::new(py, Encoding::Implicit(0)).ok(); + super::decode_x509_der_bytes(py, parser, &encoding) + }); + assert!(result.is_err()); + let error = result.unwrap_err(); + assert!(format!("{error}") + .contains("invalid type definition: X.509 fields cannot be implicitly encoded")); + }); + } } diff --git a/src/rust/src/declarative_asn1/encode.rs b/src/rust/src/declarative_asn1/encode.rs index d3036ce820ad..6d6493645985 100644 --- a/src/rust/src/declarative_asn1/encode.rs +++ b/src/rust/src/declarative_asn1/encode.rs @@ -265,6 +265,30 @@ impl asn1::Asn1Writable for AnnotatedTypeObject<'_> { ), )), Type::Null() => Ok(write_value(writer, &(), encoding)?), + Type::Certificate() => { + let val = value.cast::()?; + Ok(write_value( + writer, + val.get().raw.borrow_dependent(), + encoding, + )?) + } + Type::CertificateSigningRequest() => { + let val = value.cast::()?; + Ok(write_value( + writer, + val.get().raw.borrow_dependent(), + encoding, + )?) + } + Type::CertificateRevocationList() => { + let val = value.cast::()?; + Ok(write_value( + writer, + val.get().owned.borrow_dependent(), + encoding, + )?) + } } } } diff --git a/src/rust/src/declarative_asn1/types.rs b/src/rust/src/declarative_asn1/types.rs index cdd5a311151f..4efbe75a849d 100644 --- a/src/rust/src/declarative_asn1/types.rs +++ b/src/rust/src/declarative_asn1/types.rs @@ -61,6 +61,16 @@ pub enum Type { Tlv(), /// NULL Null(), + + // X.509 types that we special-case to allow embedding them + // in ASN.1 structures. + // + /// `x509.Certificate` + Certificate(), + /// `x509.CertificateSigningRequest` + CertificateSigningRequest(), + /// `x509.CertificateRevocationList` + CertificateRevocationList(), } /// A type that we know how to encode/decode, along with any @@ -530,6 +540,12 @@ pub fn non_root_python_to_rust<'p>( Type::Tlv().into_pyobject(py) } else if class.is(Null::type_object(py)) { Type::Null().into_pyobject(py) + } else if class.is(crate::x509::certificate::Certificate::type_object(py)) { + Type::Certificate().into_pyobject(py) + } else if class.is(crate::x509::csr::CertificateSigningRequest::type_object(py)) { + Type::CertificateSigningRequest().into_pyobject(py) + } else if class.is(crate::x509::crl::CertificateRevocationList::type_object(py)) { + Type::CertificateRevocationList().into_pyobject(py) } else { Err(pyo3::exceptions::PyTypeError::new_err(format!( "cannot handle type: {class:?}" @@ -673,6 +689,12 @@ pub(crate) fn is_tag_valid_for_type( } } Type::Null() => check_tag_with_encoding(asn1::Null::TAG, encoding, tag), + // Certificates, CSRs, and CRLs are all SEQUENCEs + Type::Certificate() + | Type::CertificateSigningRequest() + | Type::CertificateRevocationList() => { + check_tag_with_encoding(asn1::Sequence::TAG, encoding, tag) + } } } diff --git a/src/rust/src/x509/crl.rs b/src/rust/src/x509/crl.rs index 54f2692b9756..8f51be6916b4 100644 --- a/src/rust/src/x509/crl.rs +++ b/src/rust/src/x509/crl.rs @@ -82,7 +82,7 @@ pub(crate) fn load_pem_x509_crl( } self_cell::self_cell!( - struct OwnedCertificateRevocationList { + pub(crate) struct OwnedCertificateRevocationList { owner: pyo3::Py, #[covariant] dependent: RawCertificateRevocationList, @@ -91,7 +91,7 @@ self_cell::self_cell!( #[pyo3::pyclass(frozen, module = "cryptography.hazmat.bindings._rust.x509")] pub(crate) struct CertificateRevocationList { - owned: OwnedCertificateRevocationList, + pub(crate) owned: OwnedCertificateRevocationList, revoked_certs: pyo3::sync::PyOnceLock>, cached_extensions: pyo3::sync::PyOnceLock>, diff --git a/src/rust/src/x509/csr.rs b/src/rust/src/x509/csr.rs index c6cf5fe04f91..55389c703fcf 100644 --- a/src/rust/src/x509/csr.rs +++ b/src/rust/src/x509/csr.rs @@ -17,7 +17,7 @@ use crate::x509::{certificate, sign}; use crate::{exceptions, types, x509}; self_cell::self_cell!( - struct OwnedCsr { + pub(crate) struct OwnedCsr { owner: pyo3::Py, #[covariant] @@ -27,7 +27,7 @@ self_cell::self_cell!( #[pyo3::pyclass(frozen, module = "cryptography.hazmat.bindings._rust.x509")] pub(crate) struct CertificateSigningRequest { - raw: OwnedCsr, + pub(crate) raw: OwnedCsr, cached_extensions: pyo3::sync::PyOnceLock>, cached_attributes: pyo3::sync::PyOnceLock>, } diff --git a/tests/hazmat/asn1/__init__.py b/tests/hazmat/asn1/__init__.py new file mode 100644 index 000000000000..b509336233c2 --- /dev/null +++ b/tests/hazmat/asn1/__init__.py @@ -0,0 +1,3 @@ +# This file is dual licensed under the terms of the Apache License, Version +# 2.0, and the BSD License. See the LICENSE file in the root of this repository +# for complete details. diff --git a/tests/hazmat/asn1/test_serialization.py b/tests/hazmat/asn1/test_serialization.py index 978c52b55ce7..7a60a2f6827e 100644 --- a/tests/hazmat/asn1/test_serialization.py +++ b/tests/hazmat/asn1/test_serialization.py @@ -4,6 +4,7 @@ import dataclasses import datetime +import os import re import sys import typing @@ -13,6 +14,9 @@ from cryptography import x509 from cryptography.hazmat import asn1 +from cryptography.hazmat.primitives.serialization import Encoding + +from ...utils import load_vectors_from_file U = typing.TypeVar("U") @@ -1918,3 +1922,169 @@ class Example: asn1.encode_der( Example(a=asn1.BitString(data=b"\xf0", padding_bits=4)) ) + + +def _der_length(n: int) -> bytes: + if n < 0x80: + return bytes([n]) + length_bytes = n.to_bytes((n.bit_length() + 7) // 8, "big") + return bytes([0x80 | len(length_bytes)]) + length_bytes + + +class TestX509Types: + @pytest.fixture + def cert(self) -> x509.Certificate: + return load_vectors_from_file( + filename=os.path.join("x509", "custom", "post2000utctime.pem"), + loader=lambda f: x509.load_pem_x509_certificate(f.read()), + mode="rb", + ) + + @pytest.fixture + def csr(self) -> x509.CertificateSigningRequest: + return load_vectors_from_file( + filename=os.path.join("x509", "requests", "rsa_sha1.pem"), + loader=lambda f: x509.load_pem_x509_csr(f.read()), + mode="rb", + ) + + @pytest.fixture + def crl(self) -> x509.CertificateRevocationList: + return load_vectors_from_file( + filename=os.path.join("x509", "custom", "crl_all_reasons.pem"), + loader=lambda f: x509.load_pem_x509_crl(f.read()), + mode="rb", + ) + + def test_certificate(self, cert: x509.Certificate) -> None: + assert_roundtrips([(cert, cert.public_bytes(Encoding.DER))]) + + def test_csr(self, csr: x509.CertificateSigningRequest) -> None: + assert_roundtrips([(csr, csr.public_bytes(Encoding.DER))]) + + def test_crl(self, crl: x509.CertificateRevocationList) -> None: + assert_roundtrips([(crl, crl.public_bytes(Encoding.DER))]) + + def test_fields( + self, + cert: x509.Certificate, + csr: x509.CertificateSigningRequest, + crl: x509.CertificateRevocationList, + ) -> None: + @asn1.sequence + @_comparable_dataclass + class Example: + cert: x509.Certificate + csr: x509.CertificateSigningRequest + crl: x509.CertificateRevocationList + + inner = ( + cert.public_bytes(Encoding.DER) + + csr.public_bytes(Encoding.DER) + + crl.public_bytes(Encoding.DER) + ) + expected = b"\x30" + _der_length(len(inner)) + inner + assert_roundtrips([(Example(cert=cert, csr=csr, crl=crl), expected)]) + + def test_certificate_explicit(self, cert: x509.Certificate) -> None: + @asn1.sequence + @_comparable_dataclass + class Example: + cert: Annotated[x509.Certificate, asn1.Explicit(0)] + + cert_der = cert.public_bytes(Encoding.DER) + inner = b"\xa0" + _der_length(len(cert_der)) + cert_der + expected = b"\x30" + _der_length(len(inner)) + inner + assert_roundtrips([(Example(cert=cert), expected)]) + + def test_fail_certificate_implicit(self) -> None: + with pytest.raises( + TypeError, + match=re.escape( + "IMPLICIT annotations are not supported for X.509 types" + ), + ): + + @asn1.sequence + class Example: + cert: Annotated[x509.Certificate, asn1.Implicit(0)] + + def test_fail_optional_certificate_implicit(self) -> None: + with pytest.raises( + TypeError, + match=re.escape( + "IMPLICIT annotations are not supported for X.509 types" + ), + ): + + @asn1.sequence + class Example: + cert: Annotated[ + typing.Union[x509.Certificate, None], asn1.Implicit(0) + ] + + def test_optional_certificate(self, cert: x509.Certificate) -> None: + @asn1.sequence + @_comparable_dataclass + class Example: + cert: typing.Union[x509.Certificate, None] + + cert_der = cert.public_bytes(Encoding.DER) + assert_roundtrips( + [ + ( + Example(cert=cert), + b"\x30" + _der_length(len(cert_der)) + cert_der, + ), + (Example(cert=None), b"\x30\x00"), + ] + ) + + def test_certificate_choice(self, cert: x509.Certificate) -> None: + @asn1.sequence + @_comparable_dataclass + class Example: + field: typing.Union[x509.Certificate, int] + + cert_der = cert.public_bytes(Encoding.DER) + assert_roundtrips( + [ + ( + Example(field=cert), + b"\x30" + _der_length(len(cert_der)) + cert_der, + ), + ( + Example(field=9), + b"\x30" + _der_length(3) + b"\x02\x01\x09", + ), + ] + ) + + def test_decode_invalid(self) -> None: + # Not even a valid TLV + with pytest.raises(ValueError): + asn1.decode_der(x509.Certificate, b"") + + # Wrong tag (INTEGER instead of SEQUENCE) + with pytest.raises(ValueError): + asn1.decode_der(x509.Certificate, b"\x02\x01\x00") + + # Valid SEQUENCEs, but not valid certificates/CSRs/CRLs + with pytest.raises(ValueError): + asn1.decode_der(x509.Certificate, b"\x30\x03\x02\x01\x00") + with pytest.raises(ValueError): + asn1.decode_der( + x509.CertificateSigningRequest, b"\x30\x03\x02\x01\x00" + ) + with pytest.raises(ValueError): + asn1.decode_der( + x509.CertificateRevocationList, b"\x30\x03\x02\x01\x00" + ) + + def test_encode_wrong_type(self) -> None: + @asn1.sequence + class Example: + cert: x509.Certificate + + with pytest.raises(TypeError): + asn1.encode_der(Example(cert=9)) # type: ignore[arg-type]