From c84c55324998e627ac14b743c3efdcd84ffb6862 Mon Sep 17 00:00:00 2001 From: Christian Heimes Date: Thu, 28 Jul 2022 11:09:20 +0200 Subject: [PATCH 1/6] gh-95341: Implement tls-exporter channel bindings and export key materials --- Lib/ssl.py | 16 ++++++- Lib/test/test_ssl.py | 90 +++++++++++++++++++++++++------------- Modules/_ssl.c | 97 +++++++++++++++++++++++++++++++++++++---- Modules/clinic/_ssl.c.h | 41 ++++++++++++++++- 4 files changed, 202 insertions(+), 42 deletions(-) diff --git a/Lib/ssl.py b/Lib/ssl.py index 1d5873726441e4..57e19a5de13924 100644 --- a/Lib/ssl.py +++ b/Lib/ssl.py @@ -267,7 +267,7 @@ class _TLSMessageType: socket_error = OSError # keep that public name in module namespace -CHANNEL_BINDING_TYPES = ['tls-unique'] +CHANNEL_BINDING_TYPES = ['tls-unique', 'tls-exporter'] HAS_NEVER_CHECK_COMMON_NAME = hasattr(_ssl, 'HOSTFLAG_NEVER_CHECK_SUBJECT') @@ -926,6 +926,13 @@ def get_channel_binding(self, cb_type="tls-unique"): or None if the data is not available (e.g. before the handshake).""" return self._sslobj.get_channel_binding(cb_type) + def export_keying_material(self, length, label, context=None): + """Export keying material for current connection + + See RFC 5705 (for TLS 1.2) and RFC 8446 (for TLS 1.3) + """ + return self._sslobj.export_keying_material(length, label, context) + def version(self): """Return a string identifying the protocol version used by the current SSL channel. """ @@ -1344,6 +1351,13 @@ def get_channel_binding(self, cb_type="tls-unique"): ) return None + @_sslcopydoc + def export_keying_material(self, length, label, context=None): + if self._sslobj is not None: + return self._sslobj.export_keying_material(length, label, context) + else: + return None + @_sslcopydoc def version(self): if self._sslobj is not None: diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py index 5007e08f321b5a..4446ce76db3a29 100644 --- a/Lib/test/test_ssl.py +++ b/Lib/test/test_ssl.py @@ -668,17 +668,17 @@ def test_unknown_channel_binding(self): ss.get_channel_binding("unknown-type") s.close() - @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES, - "'tls-unique' channel binding not available") - def test_tls_unique_channel_binding(self): + def test_tls_channel_binding(self): # unconnected should return None for known type s = socket.socket(socket.AF_INET) with test_wrap_socket(s) as ss: self.assertIsNone(ss.get_channel_binding("tls-unique")) + self.assertIsNone(ss.get_channel_binding("tls-exporter")) # the same for server-side s = socket.socket(socket.AF_INET) with test_wrap_socket(s, server_side=True, certfile=CERTFILE) as ss: self.assertIsNone(ss.get_channel_binding("tls-unique")) + self.assertIsNone(ss.get_channel_binding("tls-exporter")) def test_dealloc_warn(self): ss = test_wrap_socket(socket.socket(socket.AF_INET)) @@ -2086,15 +2086,15 @@ def test_bio_handshake(self): self.assertIsNone(sslobj.version()) self.assertIsNotNone(sslobj.shared_ciphers()) self.assertRaises(ValueError, sslobj.getpeercert) - if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES: - self.assertIsNone(sslobj.get_channel_binding('tls-unique')) + for cb in ssl.CHANNEL_BINDING_TYPES: + self.assertIsNone(sslobj.get_channel_binding(cb)) self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake) self.assertTrue(sslobj.cipher()) self.assertIsNotNone(sslobj.shared_ciphers()) self.assertIsNotNone(sslobj.version()) self.assertTrue(sslobj.getpeercert()) - if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES: - self.assertTrue(sslobj.get_channel_binding('tls-unique')) + self.assertTrue(sslobj.get_channel_binding("tls-unique")) + self.assertTrue(sslobj.get_channel_binding("tls-exporter")) try: self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap) except ssl.SSLSyscallError: @@ -2317,6 +2317,12 @@ def run(self): sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n") data = self.sslconn.get_channel_binding("tls-unique") self.write(repr(data).encode("us-ascii") + b"\n") + elif stripped == b'CB tls-exporter': + if support.verbose and self.server.connectionchatty: + sys.stdout.write(" server: read CB tls-exporter from client, sending our CB data...\n") + data = self.sslconn.get_channel_binding("tls-exporter") + self.write(repr(data).encode("us-ascii") + b"\n") + elif stripped == b'PHA': if support.verbose and self.server.connectionchatty: sys.stdout.write(" server: initiating post handshake auth\n") @@ -3737,14 +3743,30 @@ def test_default_ecdh_curve(self): s.connect((HOST, server.port)) self.assertIn("ECDH", s.cipher()[0]) - @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES, - "'tls-unique' channel binding not available") - def test_tls_unique_channel_binding(self): - """Test tls-unique channel binding.""" + def test_tls_channel_binding_unique_tlsv1_2(self): + self._test_tls_channel_binding(ssl.TLSVersion.TLSv1_2, "tls-unique", 12) + + def test_tls_channel_binding_unique_tlsv1_2(self): + self._test_tls_channel_binding(ssl.TLSVersion.TLSv1_3, "tls-unique", 48) + + def test_tls_channel_binding_exporter_tlsv1_2(self): + self._test_tls_channel_binding( + ssl.TLSVersion.TLSv1_2, "tls-exporter", 32, "EXPORTER-Channel-Binding" + ) + + def test_tls_channel_binding_exporter_tlsv1_3(self): + self._test_tls_channel_binding( + ssl.TLSVersion.TLSv1_3, "tls-exporter", 32, "EXPORTER-Channel-Binding" + ) + + def _test_tls_channel_binding(self, version, cb, cb_size, ekm=None): + """Test tls-unique and tls-export channel binding.""" if support.verbose: sys.stdout.write("\n") client_context, server_context, hostname = testing_context() + server_context.minimum_version = version + server_context.maximum_version = version server = ThreadedEchoServer(context=server_context, chatty=True, @@ -3756,22 +3778,31 @@ def test_tls_unique_channel_binding(self): server_hostname=hostname) as s: s.connect((HOST, server.port)) # get the data - cb_data = s.get_channel_binding("tls-unique") + cb_data = s.get_channel_binding(cb) if support.verbose: sys.stdout.write( - " got channel binding data: {0!r}\n".format(cb_data)) + f" got {cb} channel binding data: {cb_data!r}\n" + ) + + if ekm: + cb_ekm = s.export_keying_material(cb_size, ekm, "") + self.assertEqual(cb_ekm, cb_data) + # TLS 1.3: empty and no context result in equal values + # other: empty context and no context result in different values + cb_ekm_no_context = s.export_keying_material(cb_size, ekm, None) + if version == ssl.TLSVersion.TLSv1_3: + self.assertEqual(cb_data, cb_ekm_no_context) + else: + self.assertNotEqual(cb_data, cb_ekm_no_context) # check if it is sane self.assertIsNotNone(cb_data) - if s.version() == 'TLSv1.3': - self.assertEqual(len(cb_data), 48) - else: - self.assertEqual(len(cb_data), 12) # True for TLSv1 + self.assertEqual(len(cb_data), cb_size) # and compare with the peers version - s.write(b"CB tls-unique\n") - peer_data_repr = s.read().strip() - self.assertEqual(peer_data_repr, + s.write(f"CB {cb}\n".encode("ascii")) + peer_data = s.read().strip() + self.assertEqual(peer_data, repr(cb_data).encode("us-ascii")) # now, again @@ -3779,22 +3810,19 @@ def test_tls_unique_channel_binding(self): socket.socket(), server_hostname=hostname) as s: s.connect((HOST, server.port)) - new_cb_data = s.get_channel_binding("tls-unique") + new_cb_data = s.get_channel_binding(cb) if support.verbose: sys.stdout.write( - "got another channel binding data: {0!r}\n".format( - new_cb_data) + f" got another {cb} channel binding data: {new_cb_data!r}\n" ) # is it really unique self.assertNotEqual(cb_data, new_cb_data) - self.assertIsNotNone(cb_data) - if s.version() == 'TLSv1.3': - self.assertEqual(len(cb_data), 48) - else: - self.assertEqual(len(cb_data), 12) # True for TLSv1 - s.write(b"CB tls-unique\n") - peer_data_repr = s.read().strip() - self.assertEqual(peer_data_repr, + self.assertIsNotNone(new_cb_data) + self.assertEqual(len(cb_data), cb_size) + + s.write(f"CB {cb}\n".encode("ascii")) + peer_unique= s.read().strip() + self.assertEqual(peer_unique, repr(new_cb_data).encode("us-ascii")) def test_compression(self): diff --git a/Modules/_ssl.c b/Modules/_ssl.c index bf8bd9dea89b6b..a93fd302bdbbe4 100644 --- a/Modules/_ssl.c +++ b/Modules/_ssl.c @@ -2699,10 +2699,9 @@ _ssl__SSLSocket_get_channel_binding_impl(PySSLSocket *self, const char *cb_type) /*[clinic end generated code: output=34bac9acb6a61d31 input=08b7e43b99c17d41]*/ { - char buf[PySSL_CB_MAXLEN]; - size_t len; - if (strcmp(cb_type, "tls-unique") == 0) { + char buf[PySSL_CB_MAXLEN]; + size_t len; if (SSL_session_reused(self->ssl) ^ !self->socket_type) { /* if session is resumed XOR we are the client */ len = SSL_get_finished(self->ssl, buf, PySSL_CB_MAXLEN); @@ -2711,8 +2710,45 @@ _ssl__SSLSocket_get_channel_binding_impl(PySSLSocket *self, /* if a new session XOR we are the server */ len = SSL_get_peer_finished(self->ssl, buf, PySSL_CB_MAXLEN); } - } - else { + /* It cannot be negative in current OpenSSL version as of July 2011 */ + if (len == 0) + Py_RETURN_NONE; + + return PyBytes_FromStringAndSize(buf, len); + } else if (strcmp(cb_type, "tls-exporter") == 0) { + if (!SSL_is_init_finished(self->ssl)) { + Py_RETURN_NONE; + } + + // RFC 9266 requires extended master secret for tls-exporter + // channel binding. EMS is always present with TLS 1.3 and an + // optional extension with TLS 1.2. + if (SSL_version(self->ssl) != TLS1_3_VERSION) { + long res = SSL_ctrl(self->ssl, SSL_CTRL_GET_EXTMS_SUPPORT, 0, NULL); + if (res == -1) { + return _setSSLError(get_state_sock(self), NULL, 0, __FILE__, __LINE__); + } + if (res == 0) { + PyErr_SetString(PyExc_ValueError, "connect has no extended master secret"); + return NULL; + } + } + + const char tls_exporter[] = "EXPORTER-Channel-Binding"; + PyObject *km = PyBytes_FromStringAndSize(NULL, 32); + if (km == NULL) { + return NULL; + } + if (SSL_export_keying_material( + self->ssl, + (unsigned char*)PyBytes_AS_STRING(km), PyBytes_GET_SIZE(km), + tls_exporter, 24, + (unsigned char*)"", 0, 1) != 1) { + Py_DECREF(km); + return _setSSLError(get_state_sock(self), NULL, 0, __FILE__, __LINE__); + } + return km; + } else { PyErr_Format( PyExc_ValueError, "'%s' channel binding type not implemented", @@ -2720,12 +2756,54 @@ _ssl__SSLSocket_get_channel_binding_impl(PySSLSocket *self, ); return NULL; } +} - /* It cannot be negative in current OpenSSL version as of July 2011 */ - if (len == 0) - Py_RETURN_NONE; +/*[clinic input] +_ssl._SSLSocket.export_keying_material + length: Py_ssize_t + label: str(accept={str, robuffer}, zeroes=True) + context: str(accept={str, robuffer, NoneType}, zeroes=True) = None + / - return PyBytes_FromStringAndSize(buf, len); +Get keying material for current connection. + +See RFC 5705 (for TLS 1.2) and RFC 8446 (for TLS 1.3) +[clinic start generated code]*/ + +static PyObject * +_ssl__SSLSocket_export_keying_material_impl(PySSLSocket *self, + Py_ssize_t length, + const char *label, + Py_ssize_t label_length, + const char *context, + Py_ssize_t context_length) +/*[clinic end generated code: output=be8a7315a471b85f input=f671482d6a429306]*/ +{ + PyObject *km; + + if (!SSL_is_init_finished(self->ssl)) { + PyErr_SetString(PyExc_ValueError, + "handshake not done yet"); + return NULL; + } + if (length < 1) { + PyErr_SetString(PyExc_ValueError, "invalid export length"); + return NULL; + } + km = PyBytes_FromStringAndSize(NULL, length); + if (km == NULL) { + return NULL; + } + if (SSL_export_keying_material( + self->ssl, + (unsigned char*)PyBytes_AS_STRING(km), (size_t)length, + label, (size_t)label_length, + (const unsigned char*)context, (size_t)context_length, + (context != NULL)) != 1) { + Py_DECREF(km); + return _setSSLError(get_state_sock(self), NULL, 0, __FILE__, __LINE__); + } + return km; } /*[clinic input] @@ -2918,6 +2996,7 @@ static PyMethodDef PySSLMethods[] = { _SSL__SSLSOCKET_VERIFY_CLIENT_POST_HANDSHAKE_METHODDEF _SSL__SSLSOCKET_GET_UNVERIFIED_CHAIN_METHODDEF _SSL__SSLSOCKET_GET_VERIFIED_CHAIN_METHODDEF + _SSL__SSLSOCKET_EXPORT_KEYING_MATERIAL_METHODDEF {NULL, NULL} }; diff --git a/Modules/clinic/_ssl.c.h b/Modules/clinic/_ssl.c.h index 24604dd43687c5..3f7f8a02706088 100644 --- a/Modules/clinic/_ssl.c.h +++ b/Modules/clinic/_ssl.c.h @@ -381,6 +381,45 @@ _ssl__SSLSocket_get_channel_binding(PySSLSocket *self, PyObject *const *args, Py return return_value; } +PyDoc_STRVAR(_ssl__SSLSocket_export_keying_material__doc__, +"export_keying_material($self, length, label, context=None, /)\n" +"--\n" +"\n" +"Get keying material for current connection.\n" +"\n" +"See RFC 5705 (for TLS 1.2) and RFC 8446 (for TLS 1.3)"); + +#define _SSL__SSLSOCKET_EXPORT_KEYING_MATERIAL_METHODDEF \ + {"export_keying_material", _PyCFunction_CAST(_ssl__SSLSocket_export_keying_material), METH_FASTCALL, _ssl__SSLSocket_export_keying_material__doc__}, + +static PyObject * +_ssl__SSLSocket_export_keying_material_impl(PySSLSocket *self, + Py_ssize_t length, + const char *label, + Py_ssize_t label_length, + const char *context, + Py_ssize_t context_length); + +static PyObject * +_ssl__SSLSocket_export_keying_material(PySSLSocket *self, PyObject *const *args, Py_ssize_t nargs) +{ + PyObject *return_value = NULL; + Py_ssize_t length; + const char *label; + Py_ssize_t label_length; + const char *context = NULL; + Py_ssize_t context_length; + + if (!_PyArg_ParseStack(args, nargs, "ns#|z#:export_keying_material", + &length, &label, &label_length, &context, &context_length)) { + goto exit; + } + return_value = _ssl__SSLSocket_export_keying_material_impl(self, length, label, label_length, context, context_length); + +exit: + return return_value; +} + PyDoc_STRVAR(_ssl__SSLSocket_verify_client_post_handshake__doc__, "verify_client_post_handshake($self, /)\n" "--\n" @@ -1330,4 +1369,4 @@ _ssl_enum_crls(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyObje #ifndef _SSL_ENUM_CRLS_METHODDEF #define _SSL_ENUM_CRLS_METHODDEF #endif /* !defined(_SSL_ENUM_CRLS_METHODDEF) */ -/*[clinic end generated code: output=9d806f8ff4a06ed3 input=a9049054013a1b77]*/ +/*[clinic end generated code: output=d6dab5ba1c9e85cc input=a9049054013a1b77]*/ From 4fb4a4f7a158583507c28519de0f60db22095c7b Mon Sep 17 00:00:00 2001 From: Christian Heimes Date: Thu, 28 Jul 2022 13:19:37 +0200 Subject: [PATCH 2/6] Implement on top of export_keying_material --- Lib/ssl.py | 31 +++++++++++++++++-- Lib/test/test_ssl.py | 6 ++-- Modules/_ssl.c | 60 +++++++++++++----------------------- Modules/_ssl/cert.c | 59 +++++++++++++++++++++++++++++++++++ Modules/_ssl/clinic/cert.c.h | 27 +++++++++++++++- Modules/clinic/_ssl.c.h | 21 ++++++++----- 6 files changed, 151 insertions(+), 53 deletions(-) diff --git a/Lib/ssl.py b/Lib/ssl.py index 57e19a5de13924..c8a17759623528 100644 --- a/Lib/ssl.py +++ b/Lib/ssl.py @@ -254,6 +254,31 @@ class _TLSMessageType: CHANGE_CIPHER_SPEC = 0x0101 +@_simple_enum(_Enum) +class ChannelBindings: + TLS_UNIQUE = "tls-unique" + TLS_EXPORTER = "tls-exporter" + # TLS_SERVER_END_POINT = "tls-server-end-point" + + def _get_channel_bindings(self, sslobj): + cls = type(self) + match self: + case cls.TLS_UNIQUE: + return sslobj.get_channel_bindings(self.value) + case cls.TLS_EXPORTER: + return sslobj.export_keying_material( + 32, + "EXPORTER-Channel-Binding", + context="", + require_extms=True + ) + # case cls.TLS_SERVER_END_POINT: + # chain = sslobj.get_verified_chain() + # return chain[0].get_rfc5929_endpoint_hash() + case _: + raise ValueError(f"{self!r} channel binding type not implemented") + + if sys.platform == "win32": from _ssl import enum_certificates, enum_crls @@ -267,7 +292,7 @@ class _TLSMessageType: socket_error = OSError # keep that public name in module namespace -CHANNEL_BINDING_TYPES = ['tls-unique', 'tls-exporter'] +CHANNEL_BINDING_TYPES = list(cb.value for cb in ChannelBindings) HAS_NEVER_CHECK_COMMON_NAME = hasattr(_ssl, 'HOSTFLAG_NEVER_CHECK_SUBJECT') @@ -924,7 +949,7 @@ def get_channel_binding(self, cb_type="tls-unique"): """Get channel binding data for current connection. Raise ValueError if the requested `cb_type` is not supported. Return bytes of the data or None if the data is not available (e.g. before the handshake).""" - return self._sslobj.get_channel_binding(cb_type) + return ChannelBindings(cb_type)._get_channel_bindings(self._sslobj) def export_keying_material(self, length, label, context=None): """Export keying material for current connection @@ -1343,7 +1368,7 @@ def accept(self): @_sslcopydoc def get_channel_binding(self, cb_type="tls-unique"): if self._sslobj is not None: - return self._sslobj.get_channel_binding(cb_type) + return ChannelBindings(cb_type)._get_channel_bindings(self._sslobj) else: if cb_type not in CHANNEL_BINDING_TYPES: raise ValueError( diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py index 4446ce76db3a29..f62dcdc6471246 100644 --- a/Lib/test/test_ssl.py +++ b/Lib/test/test_ssl.py @@ -3785,11 +3785,13 @@ def _test_tls_channel_binding(self, version, cb, cb_size, ekm=None): ) if ekm: - cb_ekm = s.export_keying_material(cb_size, ekm, "") + cb_ekm = s.export_keying_material( + cb_size, ekm, context="", require_extms=True + ) self.assertEqual(cb_ekm, cb_data) # TLS 1.3: empty and no context result in equal values # other: empty context and no context result in different values - cb_ekm_no_context = s.export_keying_material(cb_size, ekm, None) + cb_ekm_no_context = s.export_keying_material(cb_size, ekm) if version == ssl.TLSVersion.TLSv1_3: self.assertEqual(cb_data, cb_ekm_no_context) else: diff --git a/Modules/_ssl.c b/Modules/_ssl.c index a93fd302bdbbe4..2fe1b249e588f6 100644 --- a/Modules/_ssl.c +++ b/Modules/_ssl.c @@ -2715,39 +2715,6 @@ _ssl__SSLSocket_get_channel_binding_impl(PySSLSocket *self, Py_RETURN_NONE; return PyBytes_FromStringAndSize(buf, len); - } else if (strcmp(cb_type, "tls-exporter") == 0) { - if (!SSL_is_init_finished(self->ssl)) { - Py_RETURN_NONE; - } - - // RFC 9266 requires extended master secret for tls-exporter - // channel binding. EMS is always present with TLS 1.3 and an - // optional extension with TLS 1.2. - if (SSL_version(self->ssl) != TLS1_3_VERSION) { - long res = SSL_ctrl(self->ssl, SSL_CTRL_GET_EXTMS_SUPPORT, 0, NULL); - if (res == -1) { - return _setSSLError(get_state_sock(self), NULL, 0, __FILE__, __LINE__); - } - if (res == 0) { - PyErr_SetString(PyExc_ValueError, "connect has no extended master secret"); - return NULL; - } - } - - const char tls_exporter[] = "EXPORTER-Channel-Binding"; - PyObject *km = PyBytes_FromStringAndSize(NULL, 32); - if (km == NULL) { - return NULL; - } - if (SSL_export_keying_material( - self->ssl, - (unsigned char*)PyBytes_AS_STRING(km), PyBytes_GET_SIZE(km), - tls_exporter, 24, - (unsigned char*)"", 0, 1) != 1) { - Py_DECREF(km); - return _setSSLError(get_state_sock(self), NULL, 0, __FILE__, __LINE__); - } - return km; } else { PyErr_Format( PyExc_ValueError, @@ -2762,8 +2729,9 @@ _ssl__SSLSocket_get_channel_binding_impl(PySSLSocket *self, _ssl._SSLSocket.export_keying_material length: Py_ssize_t label: str(accept={str, robuffer}, zeroes=True) + * context: str(accept={str, robuffer, NoneType}, zeroes=True) = None - / + require_extms: bool = True Get keying material for current connection. @@ -2776,20 +2744,34 @@ _ssl__SSLSocket_export_keying_material_impl(PySSLSocket *self, const char *label, Py_ssize_t label_length, const char *context, - Py_ssize_t context_length) -/*[clinic end generated code: output=be8a7315a471b85f input=f671482d6a429306]*/ + Py_ssize_t context_length, + int require_extms) +/*[clinic end generated code: output=7ff9f8a13d326773 input=1ed86925a1a8c634]*/ { PyObject *km; if (!SSL_is_init_finished(self->ssl)) { - PyErr_SetString(PyExc_ValueError, - "handshake not done yet"); - return NULL; + Py_RETURN_NONE; } if (length < 1) { PyErr_SetString(PyExc_ValueError, "invalid export length"); return NULL; } + if (require_extms) { + // RFC 9266 requires extended master secret for tls-exporter + // channel binding. EMS is always present with TLS 1.3 and an + // optional extension with TLS 1.2. + if (SSL_version(self->ssl) != TLS1_3_VERSION) { + long res = SSL_ctrl(self->ssl, SSL_CTRL_GET_EXTMS_SUPPORT, 0, NULL); + if (res == -1) { + return _setSSLError(get_state_sock(self), NULL, 0, __FILE__, __LINE__); + } + if (res == 0) { + PyErr_SetString(PyExc_ValueError, "connect has no extended master secret"); + return NULL; + } + } + } km = PyBytes_FromStringAndSize(NULL, length); if (km == NULL) { return NULL; diff --git a/Modules/_ssl/cert.c b/Modules/_ssl/cert.c index bda66dc4d94ae6..71d490121f7a61 100644 --- a/Modules/_ssl/cert.c +++ b/Modules/_ssl/cert.c @@ -149,6 +149,64 @@ _x509name_print(_sslmodulestate *state, X509_NAME *name, int indent, unsigned lo return res; } +#if 0 +/*[clinic input] +_ssl.Certificate.get_rfc5929_endpoint_hash + +[clinic start generated code]*/ + +static PyObject * +_ssl_Certificate_get_rfc5929_endpoint_hash_impl(PySSLCertificate *self) +/*[clinic end generated code: output=cbbf76629040b21a input=1b2f00e11718c10b]*/ +{ + int digest_nid, key_nid; + int sig_nid = X509_get_signature_nid(self->cert); + if (!OBJ_find_sigid_algs(sig_nid, &digest_nid, &key_nid)) { + _setSSLError(get_state_cert(self), NULL, 0, __FILE__, __LINE__); + return NULL; + } + + switch (digest_nid) { + case NID_md5: + case NID_md5_sha1: + case NID_sha1: + digest_nid = NID_sha256; + break; + } + + const EVP_MD *md = EVP_get_digestbynid(digest_nid); + if (md == NULL) { + _setSSLError(get_state_cert(self), NULL, 0, __FILE__, __LINE__); + return NULL; + } + + BIO *bio = BIO_new(BIO_s_mem()); + if (bio == NULL) { + PyErr_SetString(get_state_cert(self)->PySSLErrorObject, + "failed to allocate BIO"); + return NULL; + } + if (i2d_X509_bio(bio, self->cert) != 1) { + BIO_free(bio); + _setSSLError(get_state_cert(self), NULL, 0, __FILE__, __LINE__); + return NULL; + } + + char *data; + long data_size = BIO_get_mem_data(bio, &data); + + unsigned char digest[EVP_MAX_MD_SIZE]; + unsigned int digest_size; + int result = EVP_Digest(data, data_size, digest, &digest_size, md, NULL); + BIO_free(bio); + if (result != 1) { + _setSSLError(get_state_cert(self), NULL, 0, __FILE__, __LINE__); + return NULL; + } + return PyBytes_FromStringAndSize((const char*)digest, digest_size); +} +#endif + /* ************************************************************************ * PySSLCertificate_Type */ @@ -224,6 +282,7 @@ static PyMethodDef certificate_methods[] = { /* methods */ _SSL_CERTIFICATE_PUBLIC_BYTES_METHODDEF _SSL_CERTIFICATE_GET_INFO_METHODDEF + _SSL_CERTIFICATE_GET_RFC5929_ENDPOINT_HASH_METHODDEF {NULL, NULL} }; diff --git a/Modules/_ssl/clinic/cert.c.h b/Modules/_ssl/clinic/cert.c.h index 53cedabc3f7b47..3ddcaa48274684 100644 --- a/Modules/_ssl/clinic/cert.c.h +++ b/Modules/_ssl/clinic/cert.c.h @@ -57,4 +57,29 @@ _ssl_Certificate_get_info(PySSLCertificate *self, PyObject *Py_UNUSED(ignored)) { return _ssl_Certificate_get_info_impl(self); } -/*[clinic end generated code: output=18885c4d167d5244 input=a9049054013a1b77]*/ + +#if (0) + +PyDoc_STRVAR(_ssl_Certificate_get_rfc5929_endpoint_hash__doc__, +"get_rfc5929_endpoint_hash($self, /)\n" +"--\n" +"\n"); + +#define _SSL_CERTIFICATE_GET_RFC5929_ENDPOINT_HASH_METHODDEF \ + {"get_rfc5929_endpoint_hash", (PyCFunction)_ssl_Certificate_get_rfc5929_endpoint_hash, METH_NOARGS, _ssl_Certificate_get_rfc5929_endpoint_hash__doc__}, + +static PyObject * +_ssl_Certificate_get_rfc5929_endpoint_hash_impl(PySSLCertificate *self); + +static PyObject * +_ssl_Certificate_get_rfc5929_endpoint_hash(PySSLCertificate *self, PyObject *Py_UNUSED(ignored)) +{ + return _ssl_Certificate_get_rfc5929_endpoint_hash_impl(self); +} + +#endif /* (0) */ + +#ifndef _SSL_CERTIFICATE_GET_RFC5929_ENDPOINT_HASH_METHODDEF + #define _SSL_CERTIFICATE_GET_RFC5929_ENDPOINT_HASH_METHODDEF +#endif /* !defined(_SSL_CERTIFICATE_GET_RFC5929_ENDPOINT_HASH_METHODDEF) */ +/*[clinic end generated code: output=904881790ad59cbd input=a9049054013a1b77]*/ diff --git a/Modules/clinic/_ssl.c.h b/Modules/clinic/_ssl.c.h index 3f7f8a02706088..2a346a9fcf81f7 100644 --- a/Modules/clinic/_ssl.c.h +++ b/Modules/clinic/_ssl.c.h @@ -382,7 +382,8 @@ _ssl__SSLSocket_get_channel_binding(PySSLSocket *self, PyObject *const *args, Py } PyDoc_STRVAR(_ssl__SSLSocket_export_keying_material__doc__, -"export_keying_material($self, length, label, context=None, /)\n" +"export_keying_material($self, /, length, label, *, context=None,\n" +" require_extms=True)\n" "--\n" "\n" "Get keying material for current connection.\n" @@ -390,7 +391,7 @@ PyDoc_STRVAR(_ssl__SSLSocket_export_keying_material__doc__, "See RFC 5705 (for TLS 1.2) and RFC 8446 (for TLS 1.3)"); #define _SSL__SSLSOCKET_EXPORT_KEYING_MATERIAL_METHODDEF \ - {"export_keying_material", _PyCFunction_CAST(_ssl__SSLSocket_export_keying_material), METH_FASTCALL, _ssl__SSLSocket_export_keying_material__doc__}, + {"export_keying_material", _PyCFunction_CAST(_ssl__SSLSocket_export_keying_material), METH_FASTCALL|METH_KEYWORDS, _ssl__SSLSocket_export_keying_material__doc__}, static PyObject * _ssl__SSLSocket_export_keying_material_impl(PySSLSocket *self, @@ -398,23 +399,27 @@ _ssl__SSLSocket_export_keying_material_impl(PySSLSocket *self, const char *label, Py_ssize_t label_length, const char *context, - Py_ssize_t context_length); + Py_ssize_t context_length, + int require_extms); static PyObject * -_ssl__SSLSocket_export_keying_material(PySSLSocket *self, PyObject *const *args, Py_ssize_t nargs) +_ssl__SSLSocket_export_keying_material(PySSLSocket *self, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames) { PyObject *return_value = NULL; + static const char * const _keywords[] = {"length", "label", "context", "require_extms", NULL}; + static _PyArg_Parser _parser = {"ns#|$z#p:export_keying_material", _keywords, 0}; Py_ssize_t length; const char *label; Py_ssize_t label_length; const char *context = NULL; Py_ssize_t context_length; + int require_extms = 1; - if (!_PyArg_ParseStack(args, nargs, "ns#|z#:export_keying_material", - &length, &label, &label_length, &context, &context_length)) { + if (!_PyArg_ParseStackAndKeywords(args, nargs, kwnames, &_parser, + &length, &label, &label_length, &context, &context_length, &require_extms)) { goto exit; } - return_value = _ssl__SSLSocket_export_keying_material_impl(self, length, label, label_length, context, context_length); + return_value = _ssl__SSLSocket_export_keying_material_impl(self, length, label, label_length, context, context_length, require_extms); exit: return return_value; @@ -1369,4 +1374,4 @@ _ssl_enum_crls(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyObje #ifndef _SSL_ENUM_CRLS_METHODDEF #define _SSL_ENUM_CRLS_METHODDEF #endif /* !defined(_SSL_ENUM_CRLS_METHODDEF) */ -/*[clinic end generated code: output=d6dab5ba1c9e85cc input=a9049054013a1b77]*/ +/*[clinic end generated code: output=d6d65f40541312d3 input=a9049054013a1b77]*/ From 6d5b5df9e37765e073eb47c1cc21d694376e0f4c Mon Sep 17 00:00:00 2001 From: Christian Heimes Date: Thu, 28 Jul 2022 13:58:51 +0200 Subject: [PATCH 3/6] Fix bindings --- Lib/ssl.py | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/Lib/ssl.py b/Lib/ssl.py index c8a17759623528..f74392c59fdac0 100644 --- a/Lib/ssl.py +++ b/Lib/ssl.py @@ -260,11 +260,11 @@ class ChannelBindings: TLS_EXPORTER = "tls-exporter" # TLS_SERVER_END_POINT = "tls-server-end-point" - def _get_channel_bindings(self, sslobj): + def _get_channel_binding(self, sslobj): cls = type(self) match self: case cls.TLS_UNIQUE: - return sslobj.get_channel_bindings(self.value) + return sslobj.get_channel_binding(self.value) case cls.TLS_EXPORTER: return sslobj.export_keying_material( 32, @@ -949,14 +949,16 @@ def get_channel_binding(self, cb_type="tls-unique"): """Get channel binding data for current connection. Raise ValueError if the requested `cb_type` is not supported. Return bytes of the data or None if the data is not available (e.g. before the handshake).""" - return ChannelBindings(cb_type)._get_channel_bindings(self._sslobj) + return ChannelBindings(cb_type)._get_channel_binding(self._sslobj) - def export_keying_material(self, length, label, context=None): + def export_keying_material(self, length, label, context=None, require_extms=True): """Export keying material for current connection See RFC 5705 (for TLS 1.2) and RFC 8446 (for TLS 1.3) """ - return self._sslobj.export_keying_material(length, label, context) + return self._sslobj.export_keying_material( + length, label, context=context, require_extms=require_extms + ) def version(self): """Return a string identifying the protocol version used by the @@ -1368,7 +1370,7 @@ def accept(self): @_sslcopydoc def get_channel_binding(self, cb_type="tls-unique"): if self._sslobj is not None: - return ChannelBindings(cb_type)._get_channel_bindings(self._sslobj) + return ChannelBindings(cb_type)._get_channel_binding(self._sslobj) else: if cb_type not in CHANNEL_BINDING_TYPES: raise ValueError( @@ -1377,9 +1379,11 @@ def get_channel_binding(self, cb_type="tls-unique"): return None @_sslcopydoc - def export_keying_material(self, length, label, context=None): + def export_keying_material(self, length, label, context=None, require_extms=True): if self._sslobj is not None: - return self._sslobj.export_keying_material(length, label, context) + return self._sslobj.export_keying_material( + length, label, context=context, require_extms=require_extms + ) else: return None From cf3ee4f67bf473518ebf4fd70e5624116ef08820 Mon Sep 17 00:00:00 2001 From: Christian Heimes Date: Thu, 28 Jul 2022 14:01:26 +0200 Subject: [PATCH 4/6] drop code for tls-server-end-point experiment --- Lib/ssl.py | 4 --- Modules/_ssl/cert.c | 59 ------------------------------------ Modules/_ssl/clinic/cert.c.h | 27 +---------------- 3 files changed, 1 insertion(+), 89 deletions(-) diff --git a/Lib/ssl.py b/Lib/ssl.py index f74392c59fdac0..07ce1772969050 100644 --- a/Lib/ssl.py +++ b/Lib/ssl.py @@ -258,7 +258,6 @@ class _TLSMessageType: class ChannelBindings: TLS_UNIQUE = "tls-unique" TLS_EXPORTER = "tls-exporter" - # TLS_SERVER_END_POINT = "tls-server-end-point" def _get_channel_binding(self, sslobj): cls = type(self) @@ -272,9 +271,6 @@ def _get_channel_binding(self, sslobj): context="", require_extms=True ) - # case cls.TLS_SERVER_END_POINT: - # chain = sslobj.get_verified_chain() - # return chain[0].get_rfc5929_endpoint_hash() case _: raise ValueError(f"{self!r} channel binding type not implemented") diff --git a/Modules/_ssl/cert.c b/Modules/_ssl/cert.c index 71d490121f7a61..bda66dc4d94ae6 100644 --- a/Modules/_ssl/cert.c +++ b/Modules/_ssl/cert.c @@ -149,64 +149,6 @@ _x509name_print(_sslmodulestate *state, X509_NAME *name, int indent, unsigned lo return res; } -#if 0 -/*[clinic input] -_ssl.Certificate.get_rfc5929_endpoint_hash - -[clinic start generated code]*/ - -static PyObject * -_ssl_Certificate_get_rfc5929_endpoint_hash_impl(PySSLCertificate *self) -/*[clinic end generated code: output=cbbf76629040b21a input=1b2f00e11718c10b]*/ -{ - int digest_nid, key_nid; - int sig_nid = X509_get_signature_nid(self->cert); - if (!OBJ_find_sigid_algs(sig_nid, &digest_nid, &key_nid)) { - _setSSLError(get_state_cert(self), NULL, 0, __FILE__, __LINE__); - return NULL; - } - - switch (digest_nid) { - case NID_md5: - case NID_md5_sha1: - case NID_sha1: - digest_nid = NID_sha256; - break; - } - - const EVP_MD *md = EVP_get_digestbynid(digest_nid); - if (md == NULL) { - _setSSLError(get_state_cert(self), NULL, 0, __FILE__, __LINE__); - return NULL; - } - - BIO *bio = BIO_new(BIO_s_mem()); - if (bio == NULL) { - PyErr_SetString(get_state_cert(self)->PySSLErrorObject, - "failed to allocate BIO"); - return NULL; - } - if (i2d_X509_bio(bio, self->cert) != 1) { - BIO_free(bio); - _setSSLError(get_state_cert(self), NULL, 0, __FILE__, __LINE__); - return NULL; - } - - char *data; - long data_size = BIO_get_mem_data(bio, &data); - - unsigned char digest[EVP_MAX_MD_SIZE]; - unsigned int digest_size; - int result = EVP_Digest(data, data_size, digest, &digest_size, md, NULL); - BIO_free(bio); - if (result != 1) { - _setSSLError(get_state_cert(self), NULL, 0, __FILE__, __LINE__); - return NULL; - } - return PyBytes_FromStringAndSize((const char*)digest, digest_size); -} -#endif - /* ************************************************************************ * PySSLCertificate_Type */ @@ -282,7 +224,6 @@ static PyMethodDef certificate_methods[] = { /* methods */ _SSL_CERTIFICATE_PUBLIC_BYTES_METHODDEF _SSL_CERTIFICATE_GET_INFO_METHODDEF - _SSL_CERTIFICATE_GET_RFC5929_ENDPOINT_HASH_METHODDEF {NULL, NULL} }; diff --git a/Modules/_ssl/clinic/cert.c.h b/Modules/_ssl/clinic/cert.c.h index 3ddcaa48274684..53cedabc3f7b47 100644 --- a/Modules/_ssl/clinic/cert.c.h +++ b/Modules/_ssl/clinic/cert.c.h @@ -57,29 +57,4 @@ _ssl_Certificate_get_info(PySSLCertificate *self, PyObject *Py_UNUSED(ignored)) { return _ssl_Certificate_get_info_impl(self); } - -#if (0) - -PyDoc_STRVAR(_ssl_Certificate_get_rfc5929_endpoint_hash__doc__, -"get_rfc5929_endpoint_hash($self, /)\n" -"--\n" -"\n"); - -#define _SSL_CERTIFICATE_GET_RFC5929_ENDPOINT_HASH_METHODDEF \ - {"get_rfc5929_endpoint_hash", (PyCFunction)_ssl_Certificate_get_rfc5929_endpoint_hash, METH_NOARGS, _ssl_Certificate_get_rfc5929_endpoint_hash__doc__}, - -static PyObject * -_ssl_Certificate_get_rfc5929_endpoint_hash_impl(PySSLCertificate *self); - -static PyObject * -_ssl_Certificate_get_rfc5929_endpoint_hash(PySSLCertificate *self, PyObject *Py_UNUSED(ignored)) -{ - return _ssl_Certificate_get_rfc5929_endpoint_hash_impl(self); -} - -#endif /* (0) */ - -#ifndef _SSL_CERTIFICATE_GET_RFC5929_ENDPOINT_HASH_METHODDEF - #define _SSL_CERTIFICATE_GET_RFC5929_ENDPOINT_HASH_METHODDEF -#endif /* !defined(_SSL_CERTIFICATE_GET_RFC5929_ENDPOINT_HASH_METHODDEF) */ -/*[clinic end generated code: output=904881790ad59cbd input=a9049054013a1b77]*/ +/*[clinic end generated code: output=18885c4d167d5244 input=a9049054013a1b77]*/ From d328de11b053906fe5e5743a47513f1198c89554 Mon Sep 17 00:00:00 2001 From: Christian Heimes Date: Thu, 28 Jul 2022 14:11:56 +0200 Subject: [PATCH 5/6] Warn about tls-unique with TLS 1.3 --- Lib/ssl.py | 6 ++++++ Lib/test/test_ssl.py | 5 +++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/Lib/ssl.py b/Lib/ssl.py index 07ce1772969050..0c8fc9708a1c10 100644 --- a/Lib/ssl.py +++ b/Lib/ssl.py @@ -263,6 +263,12 @@ def _get_channel_binding(self, sslobj): cls = type(self) match self: case cls.TLS_UNIQUE: + if sslobj.version() == "TLSv1.3": + warnings.warn( + "tls-unique channel binding is not specified for TLS 1.3", + DeprecationWarning, + stacklevel=3 + ) return sslobj.get_channel_binding(self.value) case cls.TLS_EXPORTER: return sslobj.export_keying_material( diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py index f62dcdc6471246..ded0b6b01a96b2 100644 --- a/Lib/test/test_ssl.py +++ b/Lib/test/test_ssl.py @@ -3746,8 +3746,9 @@ def test_default_ecdh_curve(self): def test_tls_channel_binding_unique_tlsv1_2(self): self._test_tls_channel_binding(ssl.TLSVersion.TLSv1_2, "tls-unique", 12) - def test_tls_channel_binding_unique_tlsv1_2(self): - self._test_tls_channel_binding(ssl.TLSVersion.TLSv1_3, "tls-unique", 48) + def test_tls_channel_binding_unique_tlsv1_3(self): + with warnings_helper.check_warnings(("tls-unique", DeprecationWarning)): + self._test_tls_channel_binding(ssl.TLSVersion.TLSv1_3, "tls-unique", 48) def test_tls_channel_binding_exporter_tlsv1_2(self): self._test_tls_channel_binding( From 678556e315b49245337f883684b27263851f98d0 Mon Sep 17 00:00:00 2001 From: Christian Heimes Date: Thu, 28 Jul 2022 18:07:06 +0200 Subject: [PATCH 6/6] Use SSL_get_extms_support() --- Modules/_ssl.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Modules/_ssl.c b/Modules/_ssl.c index 2fe1b249e588f6..30803d30007719 100644 --- a/Modules/_ssl.c +++ b/Modules/_ssl.c @@ -2762,7 +2762,7 @@ _ssl__SSLSocket_export_keying_material_impl(PySSLSocket *self, // channel binding. EMS is always present with TLS 1.3 and an // optional extension with TLS 1.2. if (SSL_version(self->ssl) != TLS1_3_VERSION) { - long res = SSL_ctrl(self->ssl, SSL_CTRL_GET_EXTMS_SUPPORT, 0, NULL); + int res = SSL_get_extms_support(self->ssl); if (res == -1) { return _setSSLError(get_state_sock(self), NULL, 0, __FILE__, __LINE__); }