diff --git a/src/cryptography/hazmat/bindings/_rust/x509.pyi b/src/cryptography/hazmat/bindings/_rust/x509.pyi index 976ac3256ebd..b93f555277c7 100644 --- a/src/cryptography/hazmat/bindings/_rust/x509.pyi +++ b/src/cryptography/hazmat/bindings/_rust/x509.pyi @@ -201,6 +201,9 @@ class PolicyBuilder: def time(self, new_time: datetime.datetime) -> PolicyBuilder: ... def store(self, new_store: Store) -> PolicyBuilder: ... def max_chain_depth(self, new_max_chain_depth: int) -> PolicyBuilder: ... + def extension_policies( + self, new_ca_policy: ExtensionPolicy, new_ee_policy: ExtensionPolicy + ) -> PolicyBuilder: ... def build_client_verifier(self) -> ClientVerifier: ... def build_server_verifier( self, subject: x509.verification.Subject @@ -218,6 +221,50 @@ class Policy: @property def minimum_rsa_modulus(self) -> int: ... +class Criticality: + CRITICAL: Criticality + AGNOSTIC: Criticality + NON_CRITICAL: Criticality + +type MaybeExtensionValidatorCallback[T: x509.ExtensionType] = typing.Callable[ + [ + Policy, + x509.Certificate, + T | None, + ], + None, +] + +type PresentExtensionValidatorCallback[T: x509.ExtensionType] = ( + typing.Callable[ + [Policy, x509.Certificate, T], + None, + ] +) + +class ExtensionPolicy: + @staticmethod + def permit_all() -> ExtensionPolicy: ... + @staticmethod + def webpki_defaults_ca() -> ExtensionPolicy: ... + @staticmethod + def webpki_defaults_ee() -> ExtensionPolicy: ... + def require_not_present( + self, extension_type: type[x509.ExtensionType] + ) -> ExtensionPolicy: ... + def may_be_present[T: x509.ExtensionType]( + self, + extension_type: type[T], + criticality: Criticality, + validator: MaybeExtensionValidatorCallback[T] | None, + ) -> ExtensionPolicy: ... + def require_present[T: x509.ExtensionType]( + self, + extension_type: type[T], + criticality: Criticality, + validator: PresentExtensionValidatorCallback[T] | None, + ) -> ExtensionPolicy: ... + class VerifiedClient: @property def subjects(self) -> list[x509.GeneralName] | None: ... diff --git a/src/cryptography/x509/verification.py b/src/cryptography/x509/verification.py index f275a28aad53..2db4324d9615 100644 --- a/src/cryptography/x509/verification.py +++ b/src/cryptography/x509/verification.py @@ -11,6 +11,9 @@ __all__ = [ "ClientVerifier", + "Criticality", + "ExtensionPolicy", + "Policy", "PolicyBuilder", "ServerVerifier", "Store", @@ -26,4 +29,6 @@ ServerVerifier = rust_x509.ServerVerifier PolicyBuilder = rust_x509.PolicyBuilder Policy = rust_x509.Policy +ExtensionPolicy = rust_x509.ExtensionPolicy +Criticality = rust_x509.Criticality VerificationError = rust_x509.VerificationError diff --git a/src/rust/cryptography-x509-verification/src/lib.rs b/src/rust/cryptography-x509-verification/src/lib.rs index 07c53d4281bb..40c23fe8d66e 100644 --- a/src/rust/cryptography-x509-verification/src/lib.rs +++ b/src/rust/cryptography-x509-verification/src/lib.rs @@ -49,7 +49,7 @@ pub struct ValidationError<'chain, B: CryptoOps> { } impl<'chain, B: CryptoOps> ValidationError<'chain, B> { - pub(crate) fn new(kind: ValidationErrorKind<'chain, B>) -> Self { + pub fn new(kind: ValidationErrorKind<'chain, B>) -> Self { ValidationError { kind, cert: None } } diff --git a/src/rust/cryptography-x509-verification/src/policy/extension.rs b/src/rust/cryptography-x509-verification/src/policy/extension.rs index 88ebb45ea200..8e6f858e128b 100644 --- a/src/rust/cryptography-x509-verification/src/policy/extension.rs +++ b/src/rust/cryptography-x509-verification/src/policy/extension.rs @@ -16,6 +16,7 @@ use crate::{ ops::CryptoOps, policy::Policy, ValidationError, ValidationErrorKind, ValidationResult, }; +#[derive(Clone)] pub struct ExtensionPolicy<'cb, B: CryptoOps> { pub authority_information_access: ExtensionValidator<'cb, B>, pub authority_key_identifier: ExtensionValidator<'cb, B>, @@ -28,6 +29,27 @@ pub struct ExtensionPolicy<'cb, B: CryptoOps> { } impl<'cb, B: CryptoOps + 'cb> ExtensionPolicy<'cb, B> { + pub fn new_permit_all() -> Self { + const fn make_permissive_validator<'cb, B: CryptoOps + 'cb>() -> ExtensionValidator<'cb, B> + { + ExtensionValidator::MaybePresent { + criticality: Criticality::Agnostic, + validator: None, + } + } + + ExtensionPolicy { + authority_information_access: make_permissive_validator(), + authority_key_identifier: make_permissive_validator(), + subject_key_identifier: make_permissive_validator(), + key_usage: make_permissive_validator(), + subject_alternative_name: make_permissive_validator(), + basic_constraints: make_permissive_validator(), + name_constraints: make_permissive_validator(), + extended_key_usage: make_permissive_validator(), + } + } + pub fn new_default_webpki_ca() -> Self { ExtensionPolicy { // 5280 4.2.2.1: Authority Information Access @@ -214,6 +236,7 @@ impl<'cb, B: CryptoOps + 'cb> ExtensionPolicy<'cb, B> { } /// Represents different criticality states for an extension. +#[derive(Clone)] pub enum Criticality { /// The extension MUST be marked as critical. Critical, @@ -258,6 +281,7 @@ pub type MaybeExtensionValidatorCallback<'cb, B> = Arc< >; /// Represents different validation states for an extension. +#[derive(Clone)] pub enum ExtensionValidator<'cb, B: CryptoOps> { /// The extension MUST NOT be present. NotPresent, diff --git a/src/rust/src/lib.rs b/src/rust/src/lib.rs index bfe2f6f72ceb..636f6014edb5 100644 --- a/src/rust/src/lib.rs +++ b/src/rust/src/lib.rs @@ -135,8 +135,8 @@ mod _rust { use crate::x509::sct::Sct; #[pymodule_export] use crate::x509::verify::{ - PolicyBuilder, PyClientVerifier, PyPolicy, PyServerVerifier, PyStore, PyVerifiedClient, - VerificationError, + PolicyBuilder, PyClientVerifier, PyCriticality, PyExtensionPolicy, PyPolicy, + PyServerVerifier, PyStore, PyVerifiedClient, VerificationError, }; } diff --git a/src/rust/src/types.rs b/src/rust/src/types.rs index 5422e01f2f31..1dfa20ec8139 100644 --- a/src/rust/src/types.rs +++ b/src/rust/src/types.rs @@ -181,6 +181,9 @@ pub static REASON_FLAGS: LazyPyImport = LazyPyImport::new("cryptography.x509", & pub static ATTRIBUTE: LazyPyImport = LazyPyImport::new("cryptography.x509", &["Attribute"]); pub static ATTRIBUTES: LazyPyImport = LazyPyImport::new("cryptography.x509", &["Attributes"]); +pub static EXTENSION_TYPE: LazyPyImport = + LazyPyImport::new("cryptography.x509", &["ExtensionType"]); + pub static CRL_NUMBER: LazyPyImport = LazyPyImport::new("cryptography.x509", &["CRLNumber"]); pub static DELTA_CRL_INDICATOR: LazyPyImport = LazyPyImport::new("cryptography.x509", &["DeltaCRLIndicator"]); diff --git a/src/rust/src/x509/verify/extension_policy.rs b/src/rust/src/x509/verify/extension_policy.rs new file mode 100644 index 000000000000..cbb31ddb35b7 --- /dev/null +++ b/src/rust/src/x509/verify/extension_policy.rs @@ -0,0 +1,274 @@ +use std::collections::HashSet; +use std::sync::Arc; + +use cryptography_x509::oid::{ + AUTHORITY_INFORMATION_ACCESS_OID, AUTHORITY_KEY_IDENTIFIER_OID, BASIC_CONSTRAINTS_OID, + EXTENDED_KEY_USAGE_OID, KEY_USAGE_OID, NAME_CONSTRAINTS_OID, SUBJECT_ALTERNATIVE_NAME_OID, + SUBJECT_KEY_IDENTIFIER_OID, +}; + +use cryptography_x509::extensions::Extension; + +use cryptography_x509_verification::ops::VerificationCertificate; +use cryptography_x509_verification::policy::{ + Criticality, ExtensionPolicy, ExtensionValidator, MaybeExtensionValidatorCallback, Policy, + PresentExtensionValidatorCallback, +}; +use cryptography_x509_verification::{ValidationError, ValidationErrorKind, ValidationResult}; +use pyo3::types::PyAnyMethods; +use pyo3::types::PyTypeMethods; +use pyo3::{intern, PyResult}; + +use crate::asn1::py_oid_to_oid; + +use crate::types; +use crate::x509::certificate::parse_cert_ext; +use crate::x509::certificate::Certificate as PyCertificate; + +use super::policy::PyPolicy; +use super::PyCryptoOps; + +#[pyo3::pyclass( + frozen, + eq, + module = "cryptography.x509.verification", + name = "Criticality" +)] +#[derive(PartialEq, Eq, Clone)] +pub(crate) enum PyCriticality { + #[pyo3(name = "CRITICAL")] + Critical, + #[pyo3(name = "AGNOSTIC")] + Agnostic, + #[pyo3(name = "NON_CRITICAL")] + NonCritical, +} + +impl From for Criticality { + fn from(criticality: PyCriticality) -> Criticality { + match criticality { + PyCriticality::Critical => Criticality::Critical, + PyCriticality::Agnostic => Criticality::Agnostic, + PyCriticality::NonCritical => Criticality::NonCritical, + } + } +} + +#[pyo3::pyclass( + frozen, + module = "cryptography.x509.verification", + name = "ExtensionPolicy" +)] +pub(crate) struct PyExtensionPolicy { + rust_policy: ExtensionPolicy<'static, PyCryptoOps>, + already_set_oids: HashSet, +} + +impl PyExtensionPolicy { + pub(super) fn clone_rust_policy(&self) -> ExtensionPolicy<'static, PyCryptoOps> { + self.rust_policy.clone() + } + + fn new(rust_policy: ExtensionPolicy<'static, PyCryptoOps>) -> Self { + PyExtensionPolicy { + rust_policy, + already_set_oids: HashSet::new(), + } + } + + fn check_duplicate_oid(&self, oid: &asn1::ObjectIdentifier) -> PyResult<()> { + if self.already_set_oids.contains(oid) { + return Err(pyo3::exceptions::PyValueError::new_err(format!( + "ExtensionPolicy already configured for extension with OID {oid}" + ))); + } + Ok(()) + } + + fn with_assigned_validator( + &self, + oid: asn1::ObjectIdentifier, + validator: ExtensionValidator<'static, PyCryptoOps>, + ) -> PyResult { + let mut policy = self.rust_policy.clone(); + match oid { + AUTHORITY_INFORMATION_ACCESS_OID => policy.authority_information_access = validator, + AUTHORITY_KEY_IDENTIFIER_OID => policy.authority_key_identifier = validator, + SUBJECT_KEY_IDENTIFIER_OID => policy.subject_key_identifier = validator, + KEY_USAGE_OID => policy.key_usage = validator, + SUBJECT_ALTERNATIVE_NAME_OID => policy.subject_alternative_name = validator, + BASIC_CONSTRAINTS_OID => policy.basic_constraints = validator, + NAME_CONSTRAINTS_OID => policy.name_constraints = validator, + EXTENDED_KEY_USAGE_OID => policy.extended_key_usage = validator, + _ => { + return Err(pyo3::exceptions::PyValueError::new_err(format!( + "Unsupported extension OID: {}", + oid + ))) + } + } + + let mut already_set_oids = self.already_set_oids.clone(); + already_set_oids.insert(oid); + Ok(PyExtensionPolicy { + rust_policy: policy, + already_set_oids, + }) + } +} + +fn oid_from_py_extension_type( + py: pyo3::Python<'_>, + extension_type: pyo3::Bound<'_, pyo3::types::PyType>, +) -> pyo3::PyResult { + if !extension_type.is_subclass(&types::EXTENSION_TYPE.get(py)?)? { + return Err(pyo3::exceptions::PyTypeError::new_err( + "extension_type must be a subclass of ExtensionType", + )); + } + + py_oid_to_oid(extension_type.getattr(intern!(py, "oid"))?) +} + +#[pyo3::pymethods] +impl PyExtensionPolicy { + #[staticmethod] + pub(crate) fn permit_all() -> Self { + PyExtensionPolicy::new(ExtensionPolicy::new_permit_all()) + } + + #[staticmethod] + pub(crate) fn webpki_defaults_ca() -> Self { + PyExtensionPolicy::new(ExtensionPolicy::new_default_webpki_ca()) + } + + #[staticmethod] + pub(crate) fn webpki_defaults_ee() -> Self { + PyExtensionPolicy::new(ExtensionPolicy::new_default_webpki_ee()) + } + + pub(crate) fn require_not_present( + &self, + py: pyo3::Python<'_>, + extension_type: pyo3::Bound<'_, pyo3::types::PyType>, + ) -> pyo3::PyResult { + let oid = oid_from_py_extension_type(py, extension_type)?; + self.check_duplicate_oid(&oid)?; + self.with_assigned_validator(oid, ExtensionValidator::NotPresent) + } + + #[pyo3(signature = (extension_type, criticality, validator_cb))] + pub(crate) fn may_be_present( + &self, + py: pyo3::Python<'_>, + extension_type: pyo3::Bound<'_, pyo3::types::PyType>, + criticality: PyCriticality, + validator_cb: Option, + ) -> pyo3::PyResult { + let oid = oid_from_py_extension_type(py, extension_type)?; + self.check_duplicate_oid(&oid)?; + self.with_assigned_validator( + oid, + ExtensionValidator::MaybePresent { + criticality: criticality.into(), + validator: validator_cb.map(make_rust_maybe_validator_cb), + }, + ) + } + + #[pyo3(signature = (extension_type, criticality, validator_cb))] + pub(crate) fn require_present( + &self, + py: pyo3::Python<'_>, + extension_type: pyo3::Bound<'_, pyo3::types::PyType>, + criticality: PyCriticality, + validator_cb: Option, + ) -> pyo3::PyResult { + let oid = oid_from_py_extension_type(py, extension_type)?; + self.check_duplicate_oid(&oid)?; + self.with_assigned_validator( + oid, + ExtensionValidator::Present { + criticality: criticality.into(), + validator: validator_cb.map(make_rust_present_validator_cb), + }, + ) + } +} + +fn make_rust_maybe_validator_cb( + py_cb: pyo3::PyObject, +) -> MaybeExtensionValidatorCallback<'static, PyCryptoOps> { + Arc::new( + move |policy: &Policy<'_, PyCryptoOps>, + cert: &VerificationCertificate<'_, PyCryptoOps>, + ext: Option<&Extension<'_>>| { + pyo3::Python::with_gil(|py| { + let args = make_python_callback_args(py, policy, cert, ext)?; + invoke_py_validator_callback(py, &py_cb, args) + }) + }, + ) +} + +fn make_rust_present_validator_cb( + py_cb: pyo3::PyObject, +) -> PresentExtensionValidatorCallback<'static, PyCryptoOps> { + Arc::new( + move |policy: &Policy<'_, PyCryptoOps>, + cert: &VerificationCertificate<'_, PyCryptoOps>, + ext: &Extension<'_>| { + pyo3::Python::with_gil(|py| { + let (policy, cert, ext) = make_python_callback_args(py, policy, cert, Some(ext))?; + invoke_py_validator_callback(py, &py_cb, (policy, cert, ext.unwrap())) + }) + }, + ) +} + +fn make_validation_error(msg: String) -> ValidationError<'static, PyCryptoOps> { + ValidationError::new(ValidationErrorKind::Other(msg)) +} + +type PyCallbackArgs<'p> = ( + pyo3::Py, + pyo3::Py, + Option>, +); + +fn make_python_callback_args<'chain, 'p>( + py: pyo3::Python<'p>, + policy: &Policy<'_, PyCryptoOps>, + cert: &VerificationCertificate<'chain, PyCryptoOps>, + ext: Option<&Extension<'p>>, +) -> ValidationResult<'chain, PyCallbackArgs<'p>, PyCryptoOps> { + let py_policy = policy.extra.clone_ref(py); + let py_cert = cert.extra().clone_ref(py); + let py_ext = match ext { + None => None, + Some(ext) => parse_cert_ext(py, ext).map_err(|e| { + make_validation_error(format!("{e} (while converting Extension to Python object)")) + })?, + }; + + Ok((py_policy, py_cert, py_ext)) +} + +fn invoke_py_validator_callback<'py>( + py: pyo3::Python<'py>, + py_cb: &pyo3::PyObject, + args: impl pyo3::IntoPyObject<'py, Target = pyo3::types::PyTuple>, +) -> ValidationResult<'static, (), PyCryptoOps> { + let result = py_cb + .bind(py) + .call1(args) + .map_err(|e| make_validation_error(format!("Python extension validator failed: {}", e)))?; + + if !result.is_none() { + Err(make_validation_error( + "Python validator must return None.".to_string(), + )) + } else { + Ok(()) + } +} diff --git a/src/rust/src/x509/verify/mod.rs b/src/rust/src/x509/verify/mod.rs index 1f15c7bba561..4de584d5403c 100644 --- a/src/rust/src/x509/verify/mod.rs +++ b/src/rust/src/x509/verify/mod.rs @@ -11,6 +11,7 @@ use cryptography_x509_verification::trust_store::Store; use cryptography_x509_verification::types::{DNSName, IPAddress}; use pyo3::types::{PyAnyMethods, PyListMethods}; +mod extension_policy; mod policy; use super::parse_general_names; use crate::backend::keys; @@ -20,8 +21,10 @@ use crate::utils::cstr_from_literal; use crate::x509::certificate::Certificate as PyCertificate; use crate::x509::common::{datetime_now, py_to_datetime}; use crate::x509::sign; +pub(crate) use extension_policy::{PyCriticality, PyExtensionPolicy}; pub(crate) use policy::PyPolicy; +#[derive(Clone)] pub(crate) struct PyCryptoOps {} impl CryptoOps for PyCryptoOps { @@ -82,6 +85,8 @@ pub(crate) struct PolicyBuilder { time: Option, store: Option>, max_chain_depth: Option, + ca_ext_policy: Option>, + ee_ext_policy: Option>, } impl PolicyBuilder { @@ -90,6 +95,8 @@ impl PolicyBuilder { time: self.time.clone(), store: self.store.as_ref().map(|s| s.clone_ref(py)), max_chain_depth: self.max_chain_depth, + ca_ext_policy: self.ca_ext_policy.as_ref().map(|p| p.clone_ref(py)), + ee_ext_policy: self.ee_ext_policy.as_ref().map(|p| p.clone_ref(py)), } } } @@ -102,6 +109,8 @@ impl PolicyBuilder { time: None, store: None, max_chain_depth: None, + ca_ext_policy: None, + ee_ext_policy: None, } } @@ -144,6 +153,22 @@ impl PolicyBuilder { }) } + fn extension_policies( + &self, + py: pyo3::Python<'_>, + new_ca_policy: pyo3::Py, + new_ee_policy: pyo3::Py, + ) -> CryptographyResult { + // Enough to check one of the two, since they can only be set together. + policy_builder_set_once_check!(self, ca_ext_policy, "extension policies"); + + Ok(PolicyBuilder { + ca_ext_policy: Some(new_ca_policy), + ee_ext_policy: Some(new_ee_policy), + ..self.py_clone(py) + }) + } + fn build_client_verifier(&self, py: pyo3::Python<'_>) -> CryptographyResult { let store = match self.store.as_ref() { Some(s) => s.clone_ref(py), @@ -162,8 +187,17 @@ impl PolicyBuilder { }; let policy_definition = OwnedPolicyDefinition::new(None, |_subject| { - // TODO: Pass extension policies here once implemented in cryptography-x509-verification. - PolicyDefinition::client(PyCryptoOps {}, time, self.max_chain_depth, None, None) + PolicyDefinition::client( + PyCryptoOps {}, + time, + self.max_chain_depth, + self.ca_ext_policy + .as_ref() + .map(|p| p.get().clone_rust_policy()), + self.ee_ext_policy + .as_ref() + .map(|p| p.get().clone_rust_policy()), + ) }); let py_policy = PyPolicy { @@ -208,14 +242,17 @@ impl PolicyBuilder { .expect("subject_owner for ServerVerifier can not be None"), )?; - // TODO: Pass extension policies here once implemented in cryptography-x509-verification. Ok::, pyo3::PyErr>(PolicyDefinition::server( PyCryptoOps {}, subject, time, self.max_chain_depth, - None, - None, + self.ca_ext_policy + .as_ref() + .map(|p| p.get().clone_rust_policy()), + self.ee_ext_policy + .as_ref() + .map(|p| p.get().clone_rust_policy()), )) })?; @@ -345,22 +382,24 @@ impl PyClientVerifier { py_chain.append(c.extra())?; } - // NOTE: These `unwrap()`s cannot fail, since the underlying policy - // enforces the presence of a SAN and the well-formedness of the - // extension set. - let leaf_san = &chain[0] + // NOTE: The `unwrap()` cannot fail, since the underlying policy + // enforces the well-formedness of the extension set. + let subjects = match &chain[0] .certificate() .extensions() .ok() .unwrap() .get_extension(&SUBJECT_ALTERNATIVE_NAME_OID) - .unwrap(); - - let leaf_gns = leaf_san.value::>()?; - let py_gns = parse_general_names(py, &leaf_gns)?; + { + Some(leaf_san) => { + let leaf_gns = leaf_san.value::>()?; + Some(parse_general_names(py, &leaf_gns)?.unbind()) + } + None => None, + }; Ok(PyVerifiedClient { - subjects: Some(py_gns.into()), + subjects, chain: py_chain.unbind(), }) } @@ -534,3 +573,15 @@ impl PyStore { }) } } + +#[cfg(test)] +mod tests { + use super::PyCryptoOps; + + #[test] + fn test_crypto_ops_clone() { + // Just for coverage. + // The trait is needed to be able to clone ExtensionPolicy<'_, PyCryptoOps>. + let _ = PyCryptoOps {}.clone(); + } +} diff --git a/tests/x509/verification/test_verification.py b/tests/x509/verification/test_verification.py index 95c05fc532bc..9fb095223198 100644 --- a/tests/x509/verification/test_verification.py +++ b/tests/x509/verification/test_verification.py @@ -6,13 +6,18 @@ import os from functools import lru_cache from ipaddress import IPv4Address +from typing import Optional, Type import pytest from cryptography import utils, x509 from cryptography.hazmat._oid import ExtendedKeyUsageOID +from cryptography.x509 import ExtensionType from cryptography.x509.general_name import DNSName, IPAddress from cryptography.x509.verification import ( + Criticality, + ExtensionPolicy, + Policy, PolicyBuilder, Store, VerificationError, @@ -209,6 +214,38 @@ def test_verify_fails_renders_oid(self): ): verifier.verify(leaf, []) + def test_custom_ext_policy_no_san(self): + leaf = _load_cert( + os.path.join("x509", "custom", "no_sans.pem"), + x509.load_pem_x509_certificate, + ) + + store = Store([leaf]) + validation_time = datetime.datetime.fromisoformat( + "2025-04-14T00:00:00+00:00" + ) + + builder = PolicyBuilder().store(store) + builder = builder.time(validation_time) + + with pytest.raises( + VerificationError, + match="missing required extension", + ): + builder.build_client_verifier().verify(leaf, []) + + ee_extension_policy = ExtensionPolicy.webpki_defaults_ee() + ee_extension_policy = ee_extension_policy.require_not_present( + x509.SubjectAlternativeName + ) + + builder = builder.extension_policies( + ExtensionPolicy.webpki_defaults_ca(), ee_extension_policy + ) + + verified_client = builder.build_client_verifier().verify(leaf, []) + assert verified_client.subjects is None + class TestServerVerifier: @pytest.mark.parametrize( @@ -261,3 +298,262 @@ def test_error_message(self): match=r"", ): verifier.verify(leaf, []) + + +SUPPORTED_EXTENSION_TYPES = ( + x509.AuthorityInformationAccess, + x509.AuthorityKeyIdentifier, + x509.SubjectKeyIdentifier, + x509.KeyUsage, + x509.SubjectAlternativeName, + x509.BasicConstraints, + x509.NameConstraints, + x509.ExtendedKeyUsage, +) + + +class TestCustomExtensionPolicies: + leaf = _load_cert( + os.path.join("x509", "cryptography.io.pem"), + x509.load_pem_x509_certificate, + ) + ca = _load_cert( + os.path.join("x509", "rapidssl_sha256_ca_g3.pem"), + x509.load_pem_x509_certificate, + ) + store = Store([ca]) + validation_time = datetime.datetime.fromisoformat( + "2018-11-16T00:00:00+00:00" + ) + + def test_builder_methods(self): + ext_policy = ExtensionPolicy.permit_all() + ext_policy = ext_policy.require_not_present(x509.BasicConstraints) + with pytest.raises( + ValueError, match=x509.BasicConstraints.oid.dotted_string + ): + ext_policy.require_not_present(x509.BasicConstraints) + with pytest.raises( + ValueError, match=x509.BasicConstraints.oid.dotted_string + ): + ext_policy.may_be_present( + x509.BasicConstraints, Criticality.NON_CRITICAL, None + ) + with pytest.raises( + ValueError, match=x509.BasicConstraints.oid.dotted_string + ): + ext_policy.require_present( + x509.BasicConstraints, Criticality.CRITICAL, None + ) + + with pytest.raises(TypeError): + + class _Extension: + pass + + ext_policy.require_present( + _Extension, # type: ignore + Criticality.AGNOSTIC, + None, + ) + + def test_unsupported_extension(self): + ext_policy = ExtensionPolicy.permit_all() + message = ( + f"Unsupported extension OID: {x509.Admissions.oid.dotted_string}" + ) + with pytest.raises( + ValueError, + match=message, + ): + ext_policy.may_be_present( + x509.Admissions, + Criticality.AGNOSTIC, + None, + ) + + @staticmethod + def _make_validator_cb(extension_type: Type[ExtensionType]): + def validator_cb(policy, cert, ext: Optional[ExtensionType]): + assert isinstance(policy, Policy) + assert ( + policy.validation_time + == TestCustomExtensionPolicies.validation_time.replace( + tzinfo=None + ) + ) + assert isinstance(cert, x509.Certificate) + assert ext is None or isinstance(ext, extension_type) + + return validator_cb + + def test_require_not_present(self): + default_ee = ExtensionPolicy.webpki_defaults_ee() + no_basic_constraints_ee = default_ee.require_not_present( + x509.BasicConstraints + ) + + default_builder = ( + PolicyBuilder().store(self.store).time(self.validation_time) + ) + builder_no_basic_constraints = default_builder.extension_policies( + ExtensionPolicy.webpki_defaults_ca(), no_basic_constraints_ee + ) + + default_builder.build_client_verifier().verify(self.leaf, []) + + with pytest.raises( + VerificationError, + match="Certificate contains prohibited extension", + ): + builder_no_basic_constraints.build_client_verifier().verify( + self.leaf, [] + ) + + def test_require_present(self): + default_builder = ( + PolicyBuilder().store(self.store).time(self.validation_time) + ) + builder_require_subject_keyid = default_builder.extension_policies( + ExtensionPolicy.webpki_defaults_ca(), + ExtensionPolicy.webpki_defaults_ee().require_present( + x509.SubjectKeyIdentifier, + Criticality.AGNOSTIC, + self._make_validator_cb(x509.SubjectKeyIdentifier), + ), + ) + builder_require_san = default_builder.extension_policies( + ExtensionPolicy.webpki_defaults_ca(), + ExtensionPolicy.webpki_defaults_ee().require_present( + x509.SubjectAlternativeName, + Criticality.AGNOSTIC, + self._make_validator_cb(x509.SubjectAlternativeName), + ), + ) + + default_builder.build_client_verifier().verify(self.leaf, []) + builder_require_san.build_client_verifier().verify(self.leaf, []) + + with pytest.raises( + VerificationError, + match="missing required extension", + ): + builder_require_subject_keyid.build_client_verifier().verify( + self.leaf, [] + ) + + def test_criticality_constraints(self): + builder = PolicyBuilder().store(self.store).time(self.validation_time) + noncrit_key_usage_builder = builder.extension_policies( + ExtensionPolicy.webpki_defaults_ca(), + ExtensionPolicy.webpki_defaults_ee().require_present( + x509.KeyUsage, Criticality.NON_CRITICAL, None + ), + ) + critical_eku_builder = builder.extension_policies( + ExtensionPolicy.webpki_defaults_ca(), + ExtensionPolicy.webpki_defaults_ee().require_present( + x509.ExtendedKeyUsage, Criticality.CRITICAL, None + ), + ) + + def make_pattern(extension_type: Type[ExtensionType]): + return ( + f"invalid extension: {extension_type.oid.dotted_string}:" + " Certificate extension has incorrect criticality" + ) + + builder.build_client_verifier().verify(self.leaf, []) + with pytest.raises( + VerificationError, + match=make_pattern(x509.KeyUsage), + ): + noncrit_key_usage_builder.build_client_verifier().verify( + self.leaf, [] + ) + with pytest.raises( + VerificationError, + match=make_pattern(x509.ExtendedKeyUsage), + ): + critical_eku_builder.build_client_verifier().verify(self.leaf, []) + + @pytest.mark.parametrize( + "extension_type", + SUPPORTED_EXTENSION_TYPES, + ) + def test_custom_cb_pass(self, extension_type: Type[x509.ExtensionType]): + ca_ext_policy = ExtensionPolicy.webpki_defaults_ca() + ee_ext_policy = ExtensionPolicy.webpki_defaults_ee() + + ee_ext_policy = ee_ext_policy.may_be_present( + extension_type, + Criticality.AGNOSTIC, + self._make_validator_cb(extension_type), + ) + + builder = PolicyBuilder().store(self.store) + builder = builder.time(self.validation_time).max_chain_depth(16) + builder = builder.extension_policies(ca_ext_policy, ee_ext_policy) + + builder.build_client_verifier().verify(self.leaf, []) + + path = builder.build_server_verifier( + DNSName("cryptography.io") + ).verify(self.leaf, []) + assert path == [self.leaf, self.ca] + + @pytest.mark.parametrize( + "extension_type", + SUPPORTED_EXTENSION_TYPES, + ) + def test_custom_cb_exception_fails_verification(self, extension_type): + ca_ext_policy = ExtensionPolicy.webpki_defaults_ca() + ee_ext_policy = ExtensionPolicy.webpki_defaults_ee() + + def validator(*_): + raise ValueError("test") + + ca_ext_policy = ca_ext_policy.may_be_present( + extension_type, + Criticality.AGNOSTIC, + validator, + ) + + builder = PolicyBuilder().store(self.store).time(self.validation_time) + builder = builder.extension_policies(ca_ext_policy, ee_ext_policy) + + for verifier in ( + builder.build_client_verifier(), + builder.build_server_verifier(DNSName("cryptography.io")), + ): + with pytest.raises( + VerificationError, + match="Python extension validator failed: ValueError: test", + ): + verifier.verify(self.leaf, []) + + def test_custom_cb_no_retval_enforced(self): + ca_ext_policy = ExtensionPolicy.webpki_defaults_ca() + ee_ext_policy = ExtensionPolicy.webpki_defaults_ee() + + def validator(*_): + return False + + ee_ext_policy = ee_ext_policy.may_be_present( + x509.ExtendedKeyUsage, + Criticality.AGNOSTIC, + validator, + ) + + builder = PolicyBuilder().store(self.store).time(self.validation_time) + builder = builder.extension_policies(ca_ext_policy, ee_ext_policy) + + for verifier in ( + builder.build_client_verifier(), + builder.build_server_verifier(DNSName("cryptography.io")), + ): + with pytest.raises( + VerificationError, + match="Python validator must return None.", + ): + verifier.verify(self.leaf, [])