second commit
This commit is contained in:
@@ -0,0 +1,10 @@
|
||||
# This file is dual licensed under the terms of the Apache License, Version
|
||||
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
|
||||
# for complete details.
|
||||
from typing import Any
|
||||
|
||||
|
||||
def default_backend() -> Any:
|
||||
from cryptography.hazmat.backends.openssl.backend import backend
|
||||
|
||||
return backend
|
||||
@@ -0,0 +1,9 @@
|
||||
# This file is dual licensed under the terms of the Apache License, Version
|
||||
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
|
||||
# for complete details.
|
||||
|
||||
|
||||
from cryptography.hazmat.backends.openssl.backend import backend
|
||||
|
||||
|
||||
__all__ = ["backend"]
|
||||
@@ -0,0 +1,251 @@
|
||||
# This file is dual licensed under the terms of the Apache License, Version
|
||||
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
|
||||
# for complete details.
|
||||
|
||||
import typing
|
||||
|
||||
from cryptography.exceptions import InvalidTag
|
||||
|
||||
|
||||
if typing.TYPE_CHECKING:
|
||||
from cryptography.hazmat.backends.openssl.backend import Backend
|
||||
from cryptography.hazmat.primitives.ciphers.aead import (
|
||||
AESCCM,
|
||||
AESGCM,
|
||||
AESOCB3,
|
||||
AESSIV,
|
||||
ChaCha20Poly1305,
|
||||
)
|
||||
|
||||
_AEAD_TYPES = typing.Union[
|
||||
AESCCM, AESGCM, AESOCB3, AESSIV, ChaCha20Poly1305
|
||||
]
|
||||
|
||||
_ENCRYPT = 1
|
||||
_DECRYPT = 0
|
||||
|
||||
|
||||
def _aead_cipher_name(cipher: "_AEAD_TYPES") -> bytes:
|
||||
from cryptography.hazmat.primitives.ciphers.aead import (
|
||||
AESCCM,
|
||||
AESGCM,
|
||||
AESOCB3,
|
||||
AESSIV,
|
||||
ChaCha20Poly1305,
|
||||
)
|
||||
|
||||
if isinstance(cipher, ChaCha20Poly1305):
|
||||
return b"chacha20-poly1305"
|
||||
elif isinstance(cipher, AESCCM):
|
||||
return f"aes-{len(cipher._key) * 8}-ccm".encode("ascii")
|
||||
elif isinstance(cipher, AESOCB3):
|
||||
return f"aes-{len(cipher._key) * 8}-ocb".encode("ascii")
|
||||
elif isinstance(cipher, AESSIV):
|
||||
return f"aes-{len(cipher._key) * 8 // 2}-siv".encode("ascii")
|
||||
else:
|
||||
assert isinstance(cipher, AESGCM)
|
||||
return f"aes-{len(cipher._key) * 8}-gcm".encode("ascii")
|
||||
|
||||
|
||||
def _evp_cipher(cipher_name: bytes, backend: "Backend"):
|
||||
if cipher_name.endswith(b"-siv"):
|
||||
evp_cipher = backend._lib.EVP_CIPHER_fetch(
|
||||
backend._ffi.NULL,
|
||||
cipher_name,
|
||||
backend._ffi.NULL,
|
||||
)
|
||||
backend.openssl_assert(evp_cipher != backend._ffi.NULL)
|
||||
evp_cipher = backend._ffi.gc(evp_cipher, backend._lib.EVP_CIPHER_free)
|
||||
else:
|
||||
evp_cipher = backend._lib.EVP_get_cipherbyname(cipher_name)
|
||||
backend.openssl_assert(evp_cipher != backend._ffi.NULL)
|
||||
|
||||
return evp_cipher
|
||||
|
||||
|
||||
def _aead_setup(
|
||||
backend: "Backend",
|
||||
cipher_name: bytes,
|
||||
key: bytes,
|
||||
nonce: bytes,
|
||||
tag: typing.Optional[bytes],
|
||||
tag_len: int,
|
||||
operation: int,
|
||||
):
|
||||
evp_cipher = _evp_cipher(cipher_name, backend)
|
||||
ctx = backend._lib.EVP_CIPHER_CTX_new()
|
||||
ctx = backend._ffi.gc(ctx, backend._lib.EVP_CIPHER_CTX_free)
|
||||
res = backend._lib.EVP_CipherInit_ex(
|
||||
ctx,
|
||||
evp_cipher,
|
||||
backend._ffi.NULL,
|
||||
backend._ffi.NULL,
|
||||
backend._ffi.NULL,
|
||||
int(operation == _ENCRYPT),
|
||||
)
|
||||
backend.openssl_assert(res != 0)
|
||||
res = backend._lib.EVP_CIPHER_CTX_set_key_length(ctx, len(key))
|
||||
backend.openssl_assert(res != 0)
|
||||
res = backend._lib.EVP_CIPHER_CTX_ctrl(
|
||||
ctx,
|
||||
backend._lib.EVP_CTRL_AEAD_SET_IVLEN,
|
||||
len(nonce),
|
||||
backend._ffi.NULL,
|
||||
)
|
||||
backend.openssl_assert(res != 0)
|
||||
if operation == _DECRYPT:
|
||||
assert tag is not None
|
||||
res = backend._lib.EVP_CIPHER_CTX_ctrl(
|
||||
ctx, backend._lib.EVP_CTRL_AEAD_SET_TAG, len(tag), tag
|
||||
)
|
||||
backend.openssl_assert(res != 0)
|
||||
elif cipher_name.endswith(b"-ccm"):
|
||||
res = backend._lib.EVP_CIPHER_CTX_ctrl(
|
||||
ctx, backend._lib.EVP_CTRL_AEAD_SET_TAG, tag_len, backend._ffi.NULL
|
||||
)
|
||||
backend.openssl_assert(res != 0)
|
||||
|
||||
nonce_ptr = backend._ffi.from_buffer(nonce)
|
||||
key_ptr = backend._ffi.from_buffer(key)
|
||||
res = backend._lib.EVP_CipherInit_ex(
|
||||
ctx,
|
||||
backend._ffi.NULL,
|
||||
backend._ffi.NULL,
|
||||
key_ptr,
|
||||
nonce_ptr,
|
||||
int(operation == _ENCRYPT),
|
||||
)
|
||||
backend.openssl_assert(res != 0)
|
||||
return ctx
|
||||
|
||||
|
||||
def _set_length(backend: "Backend", ctx, data_len: int) -> None:
|
||||
intptr = backend._ffi.new("int *")
|
||||
res = backend._lib.EVP_CipherUpdate(
|
||||
ctx, backend._ffi.NULL, intptr, backend._ffi.NULL, data_len
|
||||
)
|
||||
backend.openssl_assert(res != 0)
|
||||
|
||||
|
||||
def _process_aad(backend: "Backend", ctx, associated_data: bytes) -> None:
|
||||
outlen = backend._ffi.new("int *")
|
||||
res = backend._lib.EVP_CipherUpdate(
|
||||
ctx, backend._ffi.NULL, outlen, associated_data, len(associated_data)
|
||||
)
|
||||
backend.openssl_assert(res != 0)
|
||||
|
||||
|
||||
def _process_data(backend: "Backend", ctx, data: bytes) -> bytes:
|
||||
outlen = backend._ffi.new("int *")
|
||||
buf = backend._ffi.new("unsigned char[]", len(data))
|
||||
res = backend._lib.EVP_CipherUpdate(ctx, buf, outlen, data, len(data))
|
||||
if res == 0:
|
||||
# AES SIV can error here if the data is invalid on decrypt
|
||||
backend._consume_errors()
|
||||
raise InvalidTag
|
||||
return backend._ffi.buffer(buf, outlen[0])[:]
|
||||
|
||||
|
||||
def _encrypt(
|
||||
backend: "Backend",
|
||||
cipher: "_AEAD_TYPES",
|
||||
nonce: bytes,
|
||||
data: bytes,
|
||||
associated_data: typing.List[bytes],
|
||||
tag_length: int,
|
||||
) -> bytes:
|
||||
from cryptography.hazmat.primitives.ciphers.aead import AESCCM, AESSIV
|
||||
|
||||
cipher_name = _aead_cipher_name(cipher)
|
||||
ctx = _aead_setup(
|
||||
backend, cipher_name, cipher._key, nonce, None, tag_length, _ENCRYPT
|
||||
)
|
||||
# CCM requires us to pass the length of the data before processing anything
|
||||
# However calling this with any other AEAD results in an error
|
||||
if isinstance(cipher, AESCCM):
|
||||
_set_length(backend, ctx, len(data))
|
||||
|
||||
for ad in associated_data:
|
||||
_process_aad(backend, ctx, ad)
|
||||
processed_data = _process_data(backend, ctx, data)
|
||||
outlen = backend._ffi.new("int *")
|
||||
# All AEADs we support besides OCB are streaming so they return nothing
|
||||
# in finalization. OCB can return up to (16 byte block - 1) bytes so
|
||||
# we need a buffer here too.
|
||||
buf = backend._ffi.new("unsigned char[]", 16)
|
||||
res = backend._lib.EVP_CipherFinal_ex(ctx, buf, outlen)
|
||||
backend.openssl_assert(res != 0)
|
||||
processed_data += backend._ffi.buffer(buf, outlen[0])[:]
|
||||
tag_buf = backend._ffi.new("unsigned char[]", tag_length)
|
||||
res = backend._lib.EVP_CIPHER_CTX_ctrl(
|
||||
ctx, backend._lib.EVP_CTRL_AEAD_GET_TAG, tag_length, tag_buf
|
||||
)
|
||||
backend.openssl_assert(res != 0)
|
||||
tag = backend._ffi.buffer(tag_buf)[:]
|
||||
|
||||
if isinstance(cipher, AESSIV):
|
||||
# RFC 5297 defines the output as IV || C, where the tag we generate is
|
||||
# the "IV" and C is the ciphertext. This is the opposite of our
|
||||
# other AEADs, which are Ciphertext || Tag
|
||||
backend.openssl_assert(len(tag) == 16)
|
||||
return tag + processed_data
|
||||
else:
|
||||
return processed_data + tag
|
||||
|
||||
|
||||
def _decrypt(
|
||||
backend: "Backend",
|
||||
cipher: "_AEAD_TYPES",
|
||||
nonce: bytes,
|
||||
data: bytes,
|
||||
associated_data: typing.List[bytes],
|
||||
tag_length: int,
|
||||
) -> bytes:
|
||||
from cryptography.hazmat.primitives.ciphers.aead import AESCCM, AESSIV
|
||||
|
||||
if len(data) < tag_length:
|
||||
raise InvalidTag
|
||||
|
||||
if isinstance(cipher, AESSIV):
|
||||
# RFC 5297 defines the output as IV || C, where the tag we generate is
|
||||
# the "IV" and C is the ciphertext. This is the opposite of our
|
||||
# other AEADs, which are Ciphertext || Tag
|
||||
tag = data[:tag_length]
|
||||
data = data[tag_length:]
|
||||
else:
|
||||
tag = data[-tag_length:]
|
||||
data = data[:-tag_length]
|
||||
cipher_name = _aead_cipher_name(cipher)
|
||||
ctx = _aead_setup(
|
||||
backend, cipher_name, cipher._key, nonce, tag, tag_length, _DECRYPT
|
||||
)
|
||||
# CCM requires us to pass the length of the data before processing anything
|
||||
# However calling this with any other AEAD results in an error
|
||||
if isinstance(cipher, AESCCM):
|
||||
_set_length(backend, ctx, len(data))
|
||||
|
||||
for ad in associated_data:
|
||||
_process_aad(backend, ctx, ad)
|
||||
# CCM has a different error path if the tag doesn't match. Errors are
|
||||
# raised in Update and Final is irrelevant.
|
||||
if isinstance(cipher, AESCCM):
|
||||
outlen = backend._ffi.new("int *")
|
||||
buf = backend._ffi.new("unsigned char[]", len(data))
|
||||
res = backend._lib.EVP_CipherUpdate(ctx, buf, outlen, data, len(data))
|
||||
if res != 1:
|
||||
backend._consume_errors()
|
||||
raise InvalidTag
|
||||
|
||||
processed_data = backend._ffi.buffer(buf, outlen[0])[:]
|
||||
else:
|
||||
processed_data = _process_data(backend, ctx, data)
|
||||
outlen = backend._ffi.new("int *")
|
||||
# OCB can return up to 15 bytes (16 byte block - 1) in finalization
|
||||
buf = backend._ffi.new("unsigned char[]", 16)
|
||||
res = backend._lib.EVP_CipherFinal_ex(ctx, buf, outlen)
|
||||
processed_data += backend._ffi.buffer(buf, outlen[0])[:]
|
||||
if res == 0:
|
||||
backend._consume_errors()
|
||||
raise InvalidTag
|
||||
|
||||
return processed_data
|
||||
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,282 @@
|
||||
# This file is dual licensed under the terms of the Apache License, Version
|
||||
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
|
||||
# for complete details.
|
||||
|
||||
import typing
|
||||
|
||||
from cryptography.exceptions import InvalidTag, UnsupportedAlgorithm, _Reasons
|
||||
from cryptography.hazmat.primitives import ciphers
|
||||
from cryptography.hazmat.primitives.ciphers import algorithms, modes
|
||||
|
||||
|
||||
if typing.TYPE_CHECKING:
|
||||
from cryptography.hazmat.backends.openssl.backend import Backend
|
||||
|
||||
|
||||
class _CipherContext:
|
||||
_ENCRYPT = 1
|
||||
_DECRYPT = 0
|
||||
_MAX_CHUNK_SIZE = 2**30 - 1
|
||||
|
||||
def __init__(
|
||||
self, backend: "Backend", cipher, mode, operation: int
|
||||
) -> None:
|
||||
self._backend = backend
|
||||
self._cipher = cipher
|
||||
self._mode = mode
|
||||
self._operation = operation
|
||||
self._tag: typing.Optional[bytes] = None
|
||||
|
||||
if isinstance(self._cipher, ciphers.BlockCipherAlgorithm):
|
||||
self._block_size_bytes = self._cipher.block_size // 8
|
||||
else:
|
||||
self._block_size_bytes = 1
|
||||
|
||||
ctx = self._backend._lib.EVP_CIPHER_CTX_new()
|
||||
ctx = self._backend._ffi.gc(
|
||||
ctx, self._backend._lib.EVP_CIPHER_CTX_free
|
||||
)
|
||||
|
||||
registry = self._backend._cipher_registry
|
||||
try:
|
||||
adapter = registry[type(cipher), type(mode)]
|
||||
except KeyError:
|
||||
raise UnsupportedAlgorithm(
|
||||
"cipher {} in {} mode is not supported "
|
||||
"by this backend.".format(
|
||||
cipher.name, mode.name if mode else mode
|
||||
),
|
||||
_Reasons.UNSUPPORTED_CIPHER,
|
||||
)
|
||||
|
||||
evp_cipher = adapter(self._backend, cipher, mode)
|
||||
if evp_cipher == self._backend._ffi.NULL:
|
||||
msg = "cipher {0.name} ".format(cipher)
|
||||
if mode is not None:
|
||||
msg += "in {0.name} mode ".format(mode)
|
||||
msg += (
|
||||
"is not supported by this backend (Your version of OpenSSL "
|
||||
"may be too old. Current version: {}.)"
|
||||
).format(self._backend.openssl_version_text())
|
||||
raise UnsupportedAlgorithm(msg, _Reasons.UNSUPPORTED_CIPHER)
|
||||
|
||||
if isinstance(mode, modes.ModeWithInitializationVector):
|
||||
iv_nonce = self._backend._ffi.from_buffer(
|
||||
mode.initialization_vector
|
||||
)
|
||||
elif isinstance(mode, modes.ModeWithTweak):
|
||||
iv_nonce = self._backend._ffi.from_buffer(mode.tweak)
|
||||
elif isinstance(mode, modes.ModeWithNonce):
|
||||
iv_nonce = self._backend._ffi.from_buffer(mode.nonce)
|
||||
elif isinstance(cipher, algorithms.ChaCha20):
|
||||
iv_nonce = self._backend._ffi.from_buffer(cipher.nonce)
|
||||
else:
|
||||
iv_nonce = self._backend._ffi.NULL
|
||||
# begin init with cipher and operation type
|
||||
res = self._backend._lib.EVP_CipherInit_ex(
|
||||
ctx,
|
||||
evp_cipher,
|
||||
self._backend._ffi.NULL,
|
||||
self._backend._ffi.NULL,
|
||||
self._backend._ffi.NULL,
|
||||
operation,
|
||||
)
|
||||
self._backend.openssl_assert(res != 0)
|
||||
# set the key length to handle variable key ciphers
|
||||
res = self._backend._lib.EVP_CIPHER_CTX_set_key_length(
|
||||
ctx, len(cipher.key)
|
||||
)
|
||||
self._backend.openssl_assert(res != 0)
|
||||
if isinstance(mode, modes.GCM):
|
||||
res = self._backend._lib.EVP_CIPHER_CTX_ctrl(
|
||||
ctx,
|
||||
self._backend._lib.EVP_CTRL_AEAD_SET_IVLEN,
|
||||
len(iv_nonce),
|
||||
self._backend._ffi.NULL,
|
||||
)
|
||||
self._backend.openssl_assert(res != 0)
|
||||
if mode.tag is not None:
|
||||
res = self._backend._lib.EVP_CIPHER_CTX_ctrl(
|
||||
ctx,
|
||||
self._backend._lib.EVP_CTRL_AEAD_SET_TAG,
|
||||
len(mode.tag),
|
||||
mode.tag,
|
||||
)
|
||||
self._backend.openssl_assert(res != 0)
|
||||
self._tag = mode.tag
|
||||
|
||||
# pass key/iv
|
||||
res = self._backend._lib.EVP_CipherInit_ex(
|
||||
ctx,
|
||||
self._backend._ffi.NULL,
|
||||
self._backend._ffi.NULL,
|
||||
self._backend._ffi.from_buffer(cipher.key),
|
||||
iv_nonce,
|
||||
operation,
|
||||
)
|
||||
|
||||
# Check for XTS mode duplicate keys error
|
||||
errors = self._backend._consume_errors()
|
||||
lib = self._backend._lib
|
||||
if res == 0 and (
|
||||
(
|
||||
lib.CRYPTOGRAPHY_OPENSSL_111D_OR_GREATER
|
||||
and errors[0]._lib_reason_match(
|
||||
lib.ERR_LIB_EVP, lib.EVP_R_XTS_DUPLICATED_KEYS
|
||||
)
|
||||
)
|
||||
or (
|
||||
lib.Cryptography_HAS_PROVIDERS
|
||||
and errors[0]._lib_reason_match(
|
||||
lib.ERR_LIB_PROV, lib.PROV_R_XTS_DUPLICATED_KEYS
|
||||
)
|
||||
)
|
||||
):
|
||||
raise ValueError("In XTS mode duplicated keys are not allowed")
|
||||
|
||||
self._backend.openssl_assert(res != 0, errors=errors)
|
||||
|
||||
# We purposely disable padding here as it's handled higher up in the
|
||||
# API.
|
||||
self._backend._lib.EVP_CIPHER_CTX_set_padding(ctx, 0)
|
||||
self._ctx = ctx
|
||||
|
||||
def update(self, data: bytes) -> bytes:
|
||||
buf = bytearray(len(data) + self._block_size_bytes - 1)
|
||||
n = self.update_into(data, buf)
|
||||
return bytes(buf[:n])
|
||||
|
||||
def update_into(self, data: bytes, buf: bytes) -> int:
|
||||
total_data_len = len(data)
|
||||
if len(buf) < (total_data_len + self._block_size_bytes - 1):
|
||||
raise ValueError(
|
||||
"buffer must be at least {} bytes for this "
|
||||
"payload".format(len(data) + self._block_size_bytes - 1)
|
||||
)
|
||||
|
||||
data_processed = 0
|
||||
total_out = 0
|
||||
outlen = self._backend._ffi.new("int *")
|
||||
baseoutbuf = self._backend._ffi.from_buffer(buf)
|
||||
baseinbuf = self._backend._ffi.from_buffer(data)
|
||||
|
||||
while data_processed != total_data_len:
|
||||
outbuf = baseoutbuf + total_out
|
||||
inbuf = baseinbuf + data_processed
|
||||
inlen = min(self._MAX_CHUNK_SIZE, total_data_len - data_processed)
|
||||
|
||||
res = self._backend._lib.EVP_CipherUpdate(
|
||||
self._ctx, outbuf, outlen, inbuf, inlen
|
||||
)
|
||||
if res == 0 and isinstance(self._mode, modes.XTS):
|
||||
self._backend._consume_errors()
|
||||
raise ValueError(
|
||||
"In XTS mode you must supply at least a full block in the "
|
||||
"first update call. For AES this is 16 bytes."
|
||||
)
|
||||
else:
|
||||
self._backend.openssl_assert(res != 0)
|
||||
data_processed += inlen
|
||||
total_out += outlen[0]
|
||||
|
||||
return total_out
|
||||
|
||||
def finalize(self) -> bytes:
|
||||
if (
|
||||
self._operation == self._DECRYPT
|
||||
and isinstance(self._mode, modes.ModeWithAuthenticationTag)
|
||||
and self.tag is None
|
||||
):
|
||||
raise ValueError(
|
||||
"Authentication tag must be provided when decrypting."
|
||||
)
|
||||
|
||||
buf = self._backend._ffi.new("unsigned char[]", self._block_size_bytes)
|
||||
outlen = self._backend._ffi.new("int *")
|
||||
res = self._backend._lib.EVP_CipherFinal_ex(self._ctx, buf, outlen)
|
||||
if res == 0:
|
||||
errors = self._backend._consume_errors()
|
||||
|
||||
if not errors and isinstance(self._mode, modes.GCM):
|
||||
raise InvalidTag
|
||||
|
||||
lib = self._backend._lib
|
||||
self._backend.openssl_assert(
|
||||
errors[0]._lib_reason_match(
|
||||
lib.ERR_LIB_EVP,
|
||||
lib.EVP_R_DATA_NOT_MULTIPLE_OF_BLOCK_LENGTH,
|
||||
)
|
||||
or (
|
||||
lib.Cryptography_HAS_PROVIDERS
|
||||
and errors[0]._lib_reason_match(
|
||||
lib.ERR_LIB_PROV,
|
||||
lib.PROV_R_WRONG_FINAL_BLOCK_LENGTH,
|
||||
)
|
||||
)
|
||||
or (
|
||||
lib.CRYPTOGRAPHY_IS_BORINGSSL
|
||||
and errors[0].reason
|
||||
== lib.CIPHER_R_DATA_NOT_MULTIPLE_OF_BLOCK_LENGTH
|
||||
),
|
||||
errors=errors,
|
||||
)
|
||||
raise ValueError(
|
||||
"The length of the provided data is not a multiple of "
|
||||
"the block length."
|
||||
)
|
||||
|
||||
if (
|
||||
isinstance(self._mode, modes.GCM)
|
||||
and self._operation == self._ENCRYPT
|
||||
):
|
||||
tag_buf = self._backend._ffi.new(
|
||||
"unsigned char[]", self._block_size_bytes
|
||||
)
|
||||
res = self._backend._lib.EVP_CIPHER_CTX_ctrl(
|
||||
self._ctx,
|
||||
self._backend._lib.EVP_CTRL_AEAD_GET_TAG,
|
||||
self._block_size_bytes,
|
||||
tag_buf,
|
||||
)
|
||||
self._backend.openssl_assert(res != 0)
|
||||
self._tag = self._backend._ffi.buffer(tag_buf)[:]
|
||||
|
||||
res = self._backend._lib.EVP_CIPHER_CTX_reset(self._ctx)
|
||||
self._backend.openssl_assert(res == 1)
|
||||
return self._backend._ffi.buffer(buf)[: outlen[0]]
|
||||
|
||||
def finalize_with_tag(self, tag: bytes) -> bytes:
|
||||
tag_len = len(tag)
|
||||
if tag_len < self._mode._min_tag_length:
|
||||
raise ValueError(
|
||||
"Authentication tag must be {} bytes or longer.".format(
|
||||
self._mode._min_tag_length
|
||||
)
|
||||
)
|
||||
elif tag_len > self._block_size_bytes:
|
||||
raise ValueError(
|
||||
"Authentication tag cannot be more than {} bytes.".format(
|
||||
self._block_size_bytes
|
||||
)
|
||||
)
|
||||
res = self._backend._lib.EVP_CIPHER_CTX_ctrl(
|
||||
self._ctx, self._backend._lib.EVP_CTRL_AEAD_SET_TAG, len(tag), tag
|
||||
)
|
||||
self._backend.openssl_assert(res != 0)
|
||||
self._tag = tag
|
||||
return self.finalize()
|
||||
|
||||
def authenticate_additional_data(self, data: bytes) -> None:
|
||||
outlen = self._backend._ffi.new("int *")
|
||||
res = self._backend._lib.EVP_CipherUpdate(
|
||||
self._ctx,
|
||||
self._backend._ffi.NULL,
|
||||
outlen,
|
||||
self._backend._ffi.from_buffer(data),
|
||||
len(data),
|
||||
)
|
||||
self._backend.openssl_assert(res != 0)
|
||||
|
||||
@property
|
||||
def tag(self) -> typing.Optional[bytes]:
|
||||
return self._tag
|
||||
@@ -0,0 +1,87 @@
|
||||
# This file is dual licensed under the terms of the Apache License, Version
|
||||
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
|
||||
# for complete details.
|
||||
|
||||
import typing
|
||||
|
||||
from cryptography.exceptions import (
|
||||
InvalidSignature,
|
||||
UnsupportedAlgorithm,
|
||||
_Reasons,
|
||||
)
|
||||
from cryptography.hazmat.primitives import constant_time
|
||||
from cryptography.hazmat.primitives.ciphers.modes import CBC
|
||||
|
||||
if typing.TYPE_CHECKING:
|
||||
from cryptography.hazmat.primitives import ciphers
|
||||
from cryptography.hazmat.backends.openssl.backend import Backend
|
||||
|
||||
|
||||
class _CMACContext:
|
||||
def __init__(
|
||||
self,
|
||||
backend: "Backend",
|
||||
algorithm: "ciphers.BlockCipherAlgorithm",
|
||||
ctx=None,
|
||||
) -> None:
|
||||
if not backend.cmac_algorithm_supported(algorithm):
|
||||
raise UnsupportedAlgorithm(
|
||||
"This backend does not support CMAC.",
|
||||
_Reasons.UNSUPPORTED_CIPHER,
|
||||
)
|
||||
|
||||
self._backend = backend
|
||||
self._key = algorithm.key
|
||||
self._algorithm = algorithm
|
||||
self._output_length = algorithm.block_size // 8
|
||||
|
||||
if ctx is None:
|
||||
registry = self._backend._cipher_registry
|
||||
adapter = registry[type(algorithm), CBC]
|
||||
|
||||
evp_cipher = adapter(self._backend, algorithm, CBC)
|
||||
|
||||
ctx = self._backend._lib.CMAC_CTX_new()
|
||||
|
||||
self._backend.openssl_assert(ctx != self._backend._ffi.NULL)
|
||||
ctx = self._backend._ffi.gc(ctx, self._backend._lib.CMAC_CTX_free)
|
||||
|
||||
key_ptr = self._backend._ffi.from_buffer(self._key)
|
||||
res = self._backend._lib.CMAC_Init(
|
||||
ctx,
|
||||
key_ptr,
|
||||
len(self._key),
|
||||
evp_cipher,
|
||||
self._backend._ffi.NULL,
|
||||
)
|
||||
self._backend.openssl_assert(res == 1)
|
||||
|
||||
self._ctx = ctx
|
||||
|
||||
def update(self, data: bytes) -> None:
|
||||
res = self._backend._lib.CMAC_Update(self._ctx, data, len(data))
|
||||
self._backend.openssl_assert(res == 1)
|
||||
|
||||
def finalize(self) -> bytes:
|
||||
buf = self._backend._ffi.new("unsigned char[]", self._output_length)
|
||||
length = self._backend._ffi.new("size_t *", self._output_length)
|
||||
res = self._backend._lib.CMAC_Final(self._ctx, buf, length)
|
||||
self._backend.openssl_assert(res == 1)
|
||||
|
||||
self._ctx = None
|
||||
|
||||
return self._backend._ffi.buffer(buf)[:]
|
||||
|
||||
def copy(self) -> "_CMACContext":
|
||||
copied_ctx = self._backend._lib.CMAC_CTX_new()
|
||||
copied_ctx = self._backend._ffi.gc(
|
||||
copied_ctx, self._backend._lib.CMAC_CTX_free
|
||||
)
|
||||
res = self._backend._lib.CMAC_CTX_copy(copied_ctx, self._ctx)
|
||||
self._backend.openssl_assert(res == 1)
|
||||
return _CMACContext(self._backend, self._algorithm, ctx=copied_ctx)
|
||||
|
||||
def verify(self, signature: bytes) -> None:
|
||||
digest = self.finalize()
|
||||
if not constant_time.bytes_eq(digest, signature):
|
||||
raise InvalidSignature("Signature did not match digest.")
|
||||
@@ -0,0 +1,31 @@
|
||||
# This file is dual licensed under the terms of the Apache License, Version
|
||||
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
|
||||
# for complete details.
|
||||
|
||||
|
||||
from cryptography import x509
|
||||
|
||||
# CRLReason ::= ENUMERATED {
|
||||
# unspecified (0),
|
||||
# keyCompromise (1),
|
||||
# cACompromise (2),
|
||||
# affiliationChanged (3),
|
||||
# superseded (4),
|
||||
# cessationOfOperation (5),
|
||||
# certificateHold (6),
|
||||
# -- value 7 is not used
|
||||
# removeFromCRL (8),
|
||||
# privilegeWithdrawn (9),
|
||||
# aACompromise (10) }
|
||||
_CRL_ENTRY_REASON_ENUM_TO_CODE = {
|
||||
x509.ReasonFlags.unspecified: 0,
|
||||
x509.ReasonFlags.key_compromise: 1,
|
||||
x509.ReasonFlags.ca_compromise: 2,
|
||||
x509.ReasonFlags.affiliation_changed: 3,
|
||||
x509.ReasonFlags.superseded: 4,
|
||||
x509.ReasonFlags.cessation_of_operation: 5,
|
||||
x509.ReasonFlags.certificate_hold: 6,
|
||||
x509.ReasonFlags.remove_from_crl: 8,
|
||||
x509.ReasonFlags.privilege_withdrawn: 9,
|
||||
x509.ReasonFlags.aa_compromise: 10,
|
||||
}
|
||||
@@ -0,0 +1,318 @@
|
||||
# This file is dual licensed under the terms of the Apache License, Version
|
||||
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
|
||||
# for complete details.
|
||||
|
||||
import typing
|
||||
|
||||
from cryptography.exceptions import UnsupportedAlgorithm, _Reasons
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
from cryptography.hazmat.primitives.asymmetric import dh
|
||||
|
||||
|
||||
if typing.TYPE_CHECKING:
|
||||
from cryptography.hazmat.backends.openssl.backend import Backend
|
||||
|
||||
|
||||
def _dh_params_dup(dh_cdata, backend: "Backend"):
|
||||
lib = backend._lib
|
||||
ffi = backend._ffi
|
||||
|
||||
param_cdata = lib.DHparams_dup(dh_cdata)
|
||||
backend.openssl_assert(param_cdata != ffi.NULL)
|
||||
param_cdata = ffi.gc(param_cdata, lib.DH_free)
|
||||
if lib.CRYPTOGRAPHY_IS_LIBRESSL:
|
||||
# In libressl DHparams_dup don't copy q
|
||||
q = ffi.new("BIGNUM **")
|
||||
lib.DH_get0_pqg(dh_cdata, ffi.NULL, q, ffi.NULL)
|
||||
q_dup = lib.BN_dup(q[0])
|
||||
res = lib.DH_set0_pqg(param_cdata, ffi.NULL, q_dup, ffi.NULL)
|
||||
backend.openssl_assert(res == 1)
|
||||
|
||||
return param_cdata
|
||||
|
||||
|
||||
def _dh_cdata_to_parameters(dh_cdata, backend: "Backend") -> "_DHParameters":
|
||||
param_cdata = _dh_params_dup(dh_cdata, backend)
|
||||
return _DHParameters(backend, param_cdata)
|
||||
|
||||
|
||||
class _DHParameters(dh.DHParameters):
|
||||
def __init__(self, backend: "Backend", dh_cdata):
|
||||
self._backend = backend
|
||||
self._dh_cdata = dh_cdata
|
||||
|
||||
def parameter_numbers(self) -> dh.DHParameterNumbers:
|
||||
p = self._backend._ffi.new("BIGNUM **")
|
||||
g = self._backend._ffi.new("BIGNUM **")
|
||||
q = self._backend._ffi.new("BIGNUM **")
|
||||
self._backend._lib.DH_get0_pqg(self._dh_cdata, p, q, g)
|
||||
self._backend.openssl_assert(p[0] != self._backend._ffi.NULL)
|
||||
self._backend.openssl_assert(g[0] != self._backend._ffi.NULL)
|
||||
q_val: typing.Optional[int]
|
||||
if q[0] == self._backend._ffi.NULL:
|
||||
q_val = None
|
||||
else:
|
||||
q_val = self._backend._bn_to_int(q[0])
|
||||
return dh.DHParameterNumbers(
|
||||
p=self._backend._bn_to_int(p[0]),
|
||||
g=self._backend._bn_to_int(g[0]),
|
||||
q=q_val,
|
||||
)
|
||||
|
||||
def generate_private_key(self) -> dh.DHPrivateKey:
|
||||
return self._backend.generate_dh_private_key(self)
|
||||
|
||||
def parameter_bytes(
|
||||
self,
|
||||
encoding: serialization.Encoding,
|
||||
format: serialization.ParameterFormat,
|
||||
) -> bytes:
|
||||
if encoding is serialization.Encoding.OpenSSH:
|
||||
raise TypeError("OpenSSH encoding is not supported")
|
||||
|
||||
if format is not serialization.ParameterFormat.PKCS3:
|
||||
raise ValueError("Only PKCS3 serialization is supported")
|
||||
|
||||
q = self._backend._ffi.new("BIGNUM **")
|
||||
self._backend._lib.DH_get0_pqg(
|
||||
self._dh_cdata, self._backend._ffi.NULL, q, self._backend._ffi.NULL
|
||||
)
|
||||
if (
|
||||
q[0] != self._backend._ffi.NULL
|
||||
and not self._backend._lib.Cryptography_HAS_EVP_PKEY_DHX
|
||||
):
|
||||
raise UnsupportedAlgorithm(
|
||||
"DH X9.42 serialization is not supported",
|
||||
_Reasons.UNSUPPORTED_SERIALIZATION,
|
||||
)
|
||||
|
||||
if encoding is serialization.Encoding.PEM:
|
||||
if q[0] != self._backend._ffi.NULL:
|
||||
write_bio = self._backend._lib.PEM_write_bio_DHxparams
|
||||
else:
|
||||
write_bio = self._backend._lib.PEM_write_bio_DHparams
|
||||
elif encoding is serialization.Encoding.DER:
|
||||
if q[0] != self._backend._ffi.NULL:
|
||||
write_bio = self._backend._lib.Cryptography_i2d_DHxparams_bio
|
||||
else:
|
||||
write_bio = self._backend._lib.i2d_DHparams_bio
|
||||
else:
|
||||
raise TypeError("encoding must be an item from the Encoding enum")
|
||||
|
||||
bio = self._backend._create_mem_bio_gc()
|
||||
res = write_bio(bio, self._dh_cdata)
|
||||
self._backend.openssl_assert(res == 1)
|
||||
return self._backend._read_mem_bio(bio)
|
||||
|
||||
|
||||
def _get_dh_num_bits(backend, dh_cdata) -> int:
|
||||
p = backend._ffi.new("BIGNUM **")
|
||||
backend._lib.DH_get0_pqg(dh_cdata, p, backend._ffi.NULL, backend._ffi.NULL)
|
||||
backend.openssl_assert(p[0] != backend._ffi.NULL)
|
||||
return backend._lib.BN_num_bits(p[0])
|
||||
|
||||
|
||||
class _DHPrivateKey(dh.DHPrivateKey):
|
||||
def __init__(self, backend: "Backend", dh_cdata, evp_pkey):
|
||||
self._backend = backend
|
||||
self._dh_cdata = dh_cdata
|
||||
self._evp_pkey = evp_pkey
|
||||
self._key_size_bytes = self._backend._lib.DH_size(dh_cdata)
|
||||
|
||||
@property
|
||||
def key_size(self) -> int:
|
||||
return _get_dh_num_bits(self._backend, self._dh_cdata)
|
||||
|
||||
def private_numbers(self) -> dh.DHPrivateNumbers:
|
||||
p = self._backend._ffi.new("BIGNUM **")
|
||||
g = self._backend._ffi.new("BIGNUM **")
|
||||
q = self._backend._ffi.new("BIGNUM **")
|
||||
self._backend._lib.DH_get0_pqg(self._dh_cdata, p, q, g)
|
||||
self._backend.openssl_assert(p[0] != self._backend._ffi.NULL)
|
||||
self._backend.openssl_assert(g[0] != self._backend._ffi.NULL)
|
||||
if q[0] == self._backend._ffi.NULL:
|
||||
q_val = None
|
||||
else:
|
||||
q_val = self._backend._bn_to_int(q[0])
|
||||
pub_key = self._backend._ffi.new("BIGNUM **")
|
||||
priv_key = self._backend._ffi.new("BIGNUM **")
|
||||
self._backend._lib.DH_get0_key(self._dh_cdata, pub_key, priv_key)
|
||||
self._backend.openssl_assert(pub_key[0] != self._backend._ffi.NULL)
|
||||
self._backend.openssl_assert(priv_key[0] != self._backend._ffi.NULL)
|
||||
return dh.DHPrivateNumbers(
|
||||
public_numbers=dh.DHPublicNumbers(
|
||||
parameter_numbers=dh.DHParameterNumbers(
|
||||
p=self._backend._bn_to_int(p[0]),
|
||||
g=self._backend._bn_to_int(g[0]),
|
||||
q=q_val,
|
||||
),
|
||||
y=self._backend._bn_to_int(pub_key[0]),
|
||||
),
|
||||
x=self._backend._bn_to_int(priv_key[0]),
|
||||
)
|
||||
|
||||
def exchange(self, peer_public_key: dh.DHPublicKey) -> bytes:
|
||||
if not isinstance(peer_public_key, _DHPublicKey):
|
||||
raise TypeError("peer_public_key must be a DHPublicKey")
|
||||
|
||||
ctx = self._backend._lib.EVP_PKEY_CTX_new(
|
||||
self._evp_pkey, self._backend._ffi.NULL
|
||||
)
|
||||
self._backend.openssl_assert(ctx != self._backend._ffi.NULL)
|
||||
ctx = self._backend._ffi.gc(ctx, self._backend._lib.EVP_PKEY_CTX_free)
|
||||
res = self._backend._lib.EVP_PKEY_derive_init(ctx)
|
||||
self._backend.openssl_assert(res == 1)
|
||||
res = self._backend._lib.EVP_PKEY_derive_set_peer(
|
||||
ctx, peer_public_key._evp_pkey
|
||||
)
|
||||
# Invalid kex errors here in OpenSSL 3.0 because checks were moved
|
||||
# to EVP_PKEY_derive_set_peer
|
||||
self._exchange_assert(res == 1)
|
||||
keylen = self._backend._ffi.new("size_t *")
|
||||
res = self._backend._lib.EVP_PKEY_derive(
|
||||
ctx, self._backend._ffi.NULL, keylen
|
||||
)
|
||||
# Invalid kex errors here in OpenSSL < 3
|
||||
self._exchange_assert(res == 1)
|
||||
self._backend.openssl_assert(keylen[0] > 0)
|
||||
buf = self._backend._ffi.new("unsigned char[]", keylen[0])
|
||||
res = self._backend._lib.EVP_PKEY_derive(ctx, buf, keylen)
|
||||
self._backend.openssl_assert(res == 1)
|
||||
|
||||
key = self._backend._ffi.buffer(buf, keylen[0])[:]
|
||||
pad = self._key_size_bytes - len(key)
|
||||
|
||||
if pad > 0:
|
||||
key = (b"\x00" * pad) + key
|
||||
|
||||
return key
|
||||
|
||||
def _exchange_assert(self, ok: bool) -> None:
|
||||
if not ok:
|
||||
errors_with_text = self._backend._consume_errors_with_text()
|
||||
raise ValueError(
|
||||
"Error computing shared key.",
|
||||
errors_with_text,
|
||||
)
|
||||
|
||||
def public_key(self) -> dh.DHPublicKey:
|
||||
dh_cdata = _dh_params_dup(self._dh_cdata, self._backend)
|
||||
pub_key = self._backend._ffi.new("BIGNUM **")
|
||||
self._backend._lib.DH_get0_key(
|
||||
self._dh_cdata, pub_key, self._backend._ffi.NULL
|
||||
)
|
||||
self._backend.openssl_assert(pub_key[0] != self._backend._ffi.NULL)
|
||||
pub_key_dup = self._backend._lib.BN_dup(pub_key[0])
|
||||
self._backend.openssl_assert(pub_key_dup != self._backend._ffi.NULL)
|
||||
|
||||
res = self._backend._lib.DH_set0_key(
|
||||
dh_cdata, pub_key_dup, self._backend._ffi.NULL
|
||||
)
|
||||
self._backend.openssl_assert(res == 1)
|
||||
evp_pkey = self._backend._dh_cdata_to_evp_pkey(dh_cdata)
|
||||
return _DHPublicKey(self._backend, dh_cdata, evp_pkey)
|
||||
|
||||
def parameters(self) -> dh.DHParameters:
|
||||
return _dh_cdata_to_parameters(self._dh_cdata, self._backend)
|
||||
|
||||
def private_bytes(
|
||||
self,
|
||||
encoding: serialization.Encoding,
|
||||
format: serialization.PrivateFormat,
|
||||
encryption_algorithm: serialization.KeySerializationEncryption,
|
||||
) -> bytes:
|
||||
if format is not serialization.PrivateFormat.PKCS8:
|
||||
raise ValueError(
|
||||
"DH private keys support only PKCS8 serialization"
|
||||
)
|
||||
if not self._backend._lib.Cryptography_HAS_EVP_PKEY_DHX:
|
||||
q = self._backend._ffi.new("BIGNUM **")
|
||||
self._backend._lib.DH_get0_pqg(
|
||||
self._dh_cdata,
|
||||
self._backend._ffi.NULL,
|
||||
q,
|
||||
self._backend._ffi.NULL,
|
||||
)
|
||||
if q[0] != self._backend._ffi.NULL:
|
||||
raise UnsupportedAlgorithm(
|
||||
"DH X9.42 serialization is not supported",
|
||||
_Reasons.UNSUPPORTED_SERIALIZATION,
|
||||
)
|
||||
|
||||
return self._backend._private_key_bytes(
|
||||
encoding,
|
||||
format,
|
||||
encryption_algorithm,
|
||||
self,
|
||||
self._evp_pkey,
|
||||
self._dh_cdata,
|
||||
)
|
||||
|
||||
|
||||
class _DHPublicKey(dh.DHPublicKey):
|
||||
def __init__(self, backend: "Backend", dh_cdata, evp_pkey):
|
||||
self._backend = backend
|
||||
self._dh_cdata = dh_cdata
|
||||
self._evp_pkey = evp_pkey
|
||||
self._key_size_bits = _get_dh_num_bits(self._backend, self._dh_cdata)
|
||||
|
||||
@property
|
||||
def key_size(self) -> int:
|
||||
return self._key_size_bits
|
||||
|
||||
def public_numbers(self) -> dh.DHPublicNumbers:
|
||||
p = self._backend._ffi.new("BIGNUM **")
|
||||
g = self._backend._ffi.new("BIGNUM **")
|
||||
q = self._backend._ffi.new("BIGNUM **")
|
||||
self._backend._lib.DH_get0_pqg(self._dh_cdata, p, q, g)
|
||||
self._backend.openssl_assert(p[0] != self._backend._ffi.NULL)
|
||||
self._backend.openssl_assert(g[0] != self._backend._ffi.NULL)
|
||||
if q[0] == self._backend._ffi.NULL:
|
||||
q_val = None
|
||||
else:
|
||||
q_val = self._backend._bn_to_int(q[0])
|
||||
pub_key = self._backend._ffi.new("BIGNUM **")
|
||||
self._backend._lib.DH_get0_key(
|
||||
self._dh_cdata, pub_key, self._backend._ffi.NULL
|
||||
)
|
||||
self._backend.openssl_assert(pub_key[0] != self._backend._ffi.NULL)
|
||||
return dh.DHPublicNumbers(
|
||||
parameter_numbers=dh.DHParameterNumbers(
|
||||
p=self._backend._bn_to_int(p[0]),
|
||||
g=self._backend._bn_to_int(g[0]),
|
||||
q=q_val,
|
||||
),
|
||||
y=self._backend._bn_to_int(pub_key[0]),
|
||||
)
|
||||
|
||||
def parameters(self) -> dh.DHParameters:
|
||||
return _dh_cdata_to_parameters(self._dh_cdata, self._backend)
|
||||
|
||||
def public_bytes(
|
||||
self,
|
||||
encoding: serialization.Encoding,
|
||||
format: serialization.PublicFormat,
|
||||
) -> bytes:
|
||||
if format is not serialization.PublicFormat.SubjectPublicKeyInfo:
|
||||
raise ValueError(
|
||||
"DH public keys support only "
|
||||
"SubjectPublicKeyInfo serialization"
|
||||
)
|
||||
|
||||
if not self._backend._lib.Cryptography_HAS_EVP_PKEY_DHX:
|
||||
q = self._backend._ffi.new("BIGNUM **")
|
||||
self._backend._lib.DH_get0_pqg(
|
||||
self._dh_cdata,
|
||||
self._backend._ffi.NULL,
|
||||
q,
|
||||
self._backend._ffi.NULL,
|
||||
)
|
||||
if q[0] != self._backend._ffi.NULL:
|
||||
raise UnsupportedAlgorithm(
|
||||
"DH X9.42 serialization is not supported",
|
||||
_Reasons.UNSUPPORTED_SERIALIZATION,
|
||||
)
|
||||
|
||||
return self._backend._public_key_bytes(
|
||||
encoding, format, self, self._evp_pkey, None
|
||||
)
|
||||
@@ -0,0 +1,239 @@
|
||||
# This file is dual licensed under the terms of the Apache License, Version
|
||||
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
|
||||
# for complete details.
|
||||
|
||||
|
||||
import typing
|
||||
|
||||
from cryptography.exceptions import InvalidSignature
|
||||
from cryptography.hazmat.backends.openssl.utils import (
|
||||
_calculate_digest_and_algorithm,
|
||||
)
|
||||
from cryptography.hazmat.primitives import hashes, serialization
|
||||
from cryptography.hazmat.primitives.asymmetric import (
|
||||
dsa,
|
||||
utils as asym_utils,
|
||||
)
|
||||
|
||||
|
||||
if typing.TYPE_CHECKING:
|
||||
from cryptography.hazmat.backends.openssl.backend import Backend
|
||||
|
||||
|
||||
def _dsa_sig_sign(
|
||||
backend: "Backend", private_key: "_DSAPrivateKey", data: bytes
|
||||
) -> bytes:
|
||||
sig_buf_len = backend._lib.DSA_size(private_key._dsa_cdata)
|
||||
sig_buf = backend._ffi.new("unsigned char[]", sig_buf_len)
|
||||
buflen = backend._ffi.new("unsigned int *")
|
||||
|
||||
# The first parameter passed to DSA_sign is unused by OpenSSL but
|
||||
# must be an integer.
|
||||
res = backend._lib.DSA_sign(
|
||||
0, data, len(data), sig_buf, buflen, private_key._dsa_cdata
|
||||
)
|
||||
backend.openssl_assert(res == 1)
|
||||
backend.openssl_assert(buflen[0])
|
||||
|
||||
return backend._ffi.buffer(sig_buf)[: buflen[0]]
|
||||
|
||||
|
||||
def _dsa_sig_verify(
|
||||
backend: "Backend",
|
||||
public_key: "_DSAPublicKey",
|
||||
signature: bytes,
|
||||
data: bytes,
|
||||
) -> None:
|
||||
# The first parameter passed to DSA_verify is unused by OpenSSL but
|
||||
# must be an integer.
|
||||
res = backend._lib.DSA_verify(
|
||||
0, data, len(data), signature, len(signature), public_key._dsa_cdata
|
||||
)
|
||||
|
||||
if res != 1:
|
||||
backend._consume_errors()
|
||||
raise InvalidSignature
|
||||
|
||||
|
||||
class _DSAParameters(dsa.DSAParameters):
|
||||
def __init__(self, backend: "Backend", dsa_cdata):
|
||||
self._backend = backend
|
||||
self._dsa_cdata = dsa_cdata
|
||||
|
||||
def parameter_numbers(self) -> dsa.DSAParameterNumbers:
|
||||
p = self._backend._ffi.new("BIGNUM **")
|
||||
q = self._backend._ffi.new("BIGNUM **")
|
||||
g = self._backend._ffi.new("BIGNUM **")
|
||||
self._backend._lib.DSA_get0_pqg(self._dsa_cdata, p, q, g)
|
||||
self._backend.openssl_assert(p[0] != self._backend._ffi.NULL)
|
||||
self._backend.openssl_assert(q[0] != self._backend._ffi.NULL)
|
||||
self._backend.openssl_assert(g[0] != self._backend._ffi.NULL)
|
||||
return dsa.DSAParameterNumbers(
|
||||
p=self._backend._bn_to_int(p[0]),
|
||||
q=self._backend._bn_to_int(q[0]),
|
||||
g=self._backend._bn_to_int(g[0]),
|
||||
)
|
||||
|
||||
def generate_private_key(self) -> dsa.DSAPrivateKey:
|
||||
return self._backend.generate_dsa_private_key(self)
|
||||
|
||||
|
||||
class _DSAPrivateKey(dsa.DSAPrivateKey):
|
||||
_key_size: int
|
||||
|
||||
def __init__(self, backend: "Backend", dsa_cdata, evp_pkey):
|
||||
self._backend = backend
|
||||
self._dsa_cdata = dsa_cdata
|
||||
self._evp_pkey = evp_pkey
|
||||
|
||||
p = self._backend._ffi.new("BIGNUM **")
|
||||
self._backend._lib.DSA_get0_pqg(
|
||||
dsa_cdata, p, self._backend._ffi.NULL, self._backend._ffi.NULL
|
||||
)
|
||||
self._backend.openssl_assert(p[0] != backend._ffi.NULL)
|
||||
self._key_size = self._backend._lib.BN_num_bits(p[0])
|
||||
|
||||
@property
|
||||
def key_size(self) -> int:
|
||||
return self._key_size
|
||||
|
||||
def private_numbers(self) -> dsa.DSAPrivateNumbers:
|
||||
p = self._backend._ffi.new("BIGNUM **")
|
||||
q = self._backend._ffi.new("BIGNUM **")
|
||||
g = self._backend._ffi.new("BIGNUM **")
|
||||
pub_key = self._backend._ffi.new("BIGNUM **")
|
||||
priv_key = self._backend._ffi.new("BIGNUM **")
|
||||
self._backend._lib.DSA_get0_pqg(self._dsa_cdata, p, q, g)
|
||||
self._backend.openssl_assert(p[0] != self._backend._ffi.NULL)
|
||||
self._backend.openssl_assert(q[0] != self._backend._ffi.NULL)
|
||||
self._backend.openssl_assert(g[0] != self._backend._ffi.NULL)
|
||||
self._backend._lib.DSA_get0_key(self._dsa_cdata, pub_key, priv_key)
|
||||
self._backend.openssl_assert(pub_key[0] != self._backend._ffi.NULL)
|
||||
self._backend.openssl_assert(priv_key[0] != self._backend._ffi.NULL)
|
||||
return dsa.DSAPrivateNumbers(
|
||||
public_numbers=dsa.DSAPublicNumbers(
|
||||
parameter_numbers=dsa.DSAParameterNumbers(
|
||||
p=self._backend._bn_to_int(p[0]),
|
||||
q=self._backend._bn_to_int(q[0]),
|
||||
g=self._backend._bn_to_int(g[0]),
|
||||
),
|
||||
y=self._backend._bn_to_int(pub_key[0]),
|
||||
),
|
||||
x=self._backend._bn_to_int(priv_key[0]),
|
||||
)
|
||||
|
||||
def public_key(self) -> dsa.DSAPublicKey:
|
||||
dsa_cdata = self._backend._lib.DSAparams_dup(self._dsa_cdata)
|
||||
self._backend.openssl_assert(dsa_cdata != self._backend._ffi.NULL)
|
||||
dsa_cdata = self._backend._ffi.gc(
|
||||
dsa_cdata, self._backend._lib.DSA_free
|
||||
)
|
||||
pub_key = self._backend._ffi.new("BIGNUM **")
|
||||
self._backend._lib.DSA_get0_key(
|
||||
self._dsa_cdata, pub_key, self._backend._ffi.NULL
|
||||
)
|
||||
self._backend.openssl_assert(pub_key[0] != self._backend._ffi.NULL)
|
||||
pub_key_dup = self._backend._lib.BN_dup(pub_key[0])
|
||||
res = self._backend._lib.DSA_set0_key(
|
||||
dsa_cdata, pub_key_dup, self._backend._ffi.NULL
|
||||
)
|
||||
self._backend.openssl_assert(res == 1)
|
||||
evp_pkey = self._backend._dsa_cdata_to_evp_pkey(dsa_cdata)
|
||||
return _DSAPublicKey(self._backend, dsa_cdata, evp_pkey)
|
||||
|
||||
def parameters(self) -> dsa.DSAParameters:
|
||||
dsa_cdata = self._backend._lib.DSAparams_dup(self._dsa_cdata)
|
||||
self._backend.openssl_assert(dsa_cdata != self._backend._ffi.NULL)
|
||||
dsa_cdata = self._backend._ffi.gc(
|
||||
dsa_cdata, self._backend._lib.DSA_free
|
||||
)
|
||||
return _DSAParameters(self._backend, dsa_cdata)
|
||||
|
||||
def private_bytes(
|
||||
self,
|
||||
encoding: serialization.Encoding,
|
||||
format: serialization.PrivateFormat,
|
||||
encryption_algorithm: serialization.KeySerializationEncryption,
|
||||
) -> bytes:
|
||||
return self._backend._private_key_bytes(
|
||||
encoding,
|
||||
format,
|
||||
encryption_algorithm,
|
||||
self,
|
||||
self._evp_pkey,
|
||||
self._dsa_cdata,
|
||||
)
|
||||
|
||||
def sign(
|
||||
self,
|
||||
data: bytes,
|
||||
algorithm: typing.Union[asym_utils.Prehashed, hashes.HashAlgorithm],
|
||||
) -> bytes:
|
||||
data, _ = _calculate_digest_and_algorithm(data, algorithm)
|
||||
return _dsa_sig_sign(self._backend, self, data)
|
||||
|
||||
|
||||
class _DSAPublicKey(dsa.DSAPublicKey):
|
||||
_key_size: int
|
||||
|
||||
def __init__(self, backend: "Backend", dsa_cdata, evp_pkey):
|
||||
self._backend = backend
|
||||
self._dsa_cdata = dsa_cdata
|
||||
self._evp_pkey = evp_pkey
|
||||
p = self._backend._ffi.new("BIGNUM **")
|
||||
self._backend._lib.DSA_get0_pqg(
|
||||
dsa_cdata, p, self._backend._ffi.NULL, self._backend._ffi.NULL
|
||||
)
|
||||
self._backend.openssl_assert(p[0] != backend._ffi.NULL)
|
||||
self._key_size = self._backend._lib.BN_num_bits(p[0])
|
||||
|
||||
@property
|
||||
def key_size(self) -> int:
|
||||
return self._key_size
|
||||
|
||||
def public_numbers(self) -> dsa.DSAPublicNumbers:
|
||||
p = self._backend._ffi.new("BIGNUM **")
|
||||
q = self._backend._ffi.new("BIGNUM **")
|
||||
g = self._backend._ffi.new("BIGNUM **")
|
||||
pub_key = self._backend._ffi.new("BIGNUM **")
|
||||
self._backend._lib.DSA_get0_pqg(self._dsa_cdata, p, q, g)
|
||||
self._backend.openssl_assert(p[0] != self._backend._ffi.NULL)
|
||||
self._backend.openssl_assert(q[0] != self._backend._ffi.NULL)
|
||||
self._backend.openssl_assert(g[0] != self._backend._ffi.NULL)
|
||||
self._backend._lib.DSA_get0_key(
|
||||
self._dsa_cdata, pub_key, self._backend._ffi.NULL
|
||||
)
|
||||
self._backend.openssl_assert(pub_key[0] != self._backend._ffi.NULL)
|
||||
return dsa.DSAPublicNumbers(
|
||||
parameter_numbers=dsa.DSAParameterNumbers(
|
||||
p=self._backend._bn_to_int(p[0]),
|
||||
q=self._backend._bn_to_int(q[0]),
|
||||
g=self._backend._bn_to_int(g[0]),
|
||||
),
|
||||
y=self._backend._bn_to_int(pub_key[0]),
|
||||
)
|
||||
|
||||
def parameters(self) -> dsa.DSAParameters:
|
||||
dsa_cdata = self._backend._lib.DSAparams_dup(self._dsa_cdata)
|
||||
dsa_cdata = self._backend._ffi.gc(
|
||||
dsa_cdata, self._backend._lib.DSA_free
|
||||
)
|
||||
return _DSAParameters(self._backend, dsa_cdata)
|
||||
|
||||
def public_bytes(
|
||||
self,
|
||||
encoding: serialization.Encoding,
|
||||
format: serialization.PublicFormat,
|
||||
) -> bytes:
|
||||
return self._backend._public_key_bytes(
|
||||
encoding, format, self, self._evp_pkey, None
|
||||
)
|
||||
|
||||
def verify(
|
||||
self,
|
||||
signature: bytes,
|
||||
data: bytes,
|
||||
algorithm: typing.Union[asym_utils.Prehashed, hashes.HashAlgorithm],
|
||||
) -> None:
|
||||
data, _ = _calculate_digest_and_algorithm(data, algorithm)
|
||||
return _dsa_sig_verify(self._backend, self, signature, data)
|
||||
@@ -0,0 +1,315 @@
|
||||
# This file is dual licensed under the terms of the Apache License, Version
|
||||
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
|
||||
# for complete details.
|
||||
|
||||
import typing
|
||||
|
||||
from cryptography.exceptions import (
|
||||
InvalidSignature,
|
||||
UnsupportedAlgorithm,
|
||||
_Reasons,
|
||||
)
|
||||
from cryptography.hazmat.backends.openssl.utils import (
|
||||
_calculate_digest_and_algorithm,
|
||||
_evp_pkey_derive,
|
||||
)
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
from cryptography.hazmat.primitives.asymmetric import ec
|
||||
|
||||
if typing.TYPE_CHECKING:
|
||||
from cryptography.hazmat.backends.openssl.backend import Backend
|
||||
|
||||
|
||||
def _check_signature_algorithm(
|
||||
signature_algorithm: ec.EllipticCurveSignatureAlgorithm,
|
||||
) -> None:
|
||||
if not isinstance(signature_algorithm, ec.ECDSA):
|
||||
raise UnsupportedAlgorithm(
|
||||
"Unsupported elliptic curve signature algorithm.",
|
||||
_Reasons.UNSUPPORTED_PUBLIC_KEY_ALGORITHM,
|
||||
)
|
||||
|
||||
|
||||
def _ec_key_curve_sn(backend: "Backend", ec_key) -> str:
|
||||
group = backend._lib.EC_KEY_get0_group(ec_key)
|
||||
backend.openssl_assert(group != backend._ffi.NULL)
|
||||
|
||||
nid = backend._lib.EC_GROUP_get_curve_name(group)
|
||||
# The following check is to find EC keys with unnamed curves and raise
|
||||
# an error for now.
|
||||
if nid == backend._lib.NID_undef:
|
||||
raise ValueError(
|
||||
"ECDSA keys with explicit parameters are unsupported at this time"
|
||||
)
|
||||
|
||||
# This is like the above check, but it also catches the case where you
|
||||
# explicitly encoded a curve with the same parameters as a named curve.
|
||||
# Don't do that.
|
||||
if (
|
||||
not backend._lib.CRYPTOGRAPHY_IS_LIBRESSL
|
||||
and backend._lib.EC_GROUP_get_asn1_flag(group) == 0
|
||||
):
|
||||
raise ValueError(
|
||||
"ECDSA keys with explicit parameters are unsupported at this time"
|
||||
)
|
||||
|
||||
curve_name = backend._lib.OBJ_nid2sn(nid)
|
||||
backend.openssl_assert(curve_name != backend._ffi.NULL)
|
||||
|
||||
sn = backend._ffi.string(curve_name).decode("ascii")
|
||||
return sn
|
||||
|
||||
|
||||
def _mark_asn1_named_ec_curve(backend: "Backend", ec_cdata):
|
||||
"""
|
||||
Set the named curve flag on the EC_KEY. This causes OpenSSL to
|
||||
serialize EC keys along with their curve OID which makes
|
||||
deserialization easier.
|
||||
"""
|
||||
|
||||
backend._lib.EC_KEY_set_asn1_flag(
|
||||
ec_cdata, backend._lib.OPENSSL_EC_NAMED_CURVE
|
||||
)
|
||||
|
||||
|
||||
def _check_key_infinity(backend: "Backend", ec_cdata) -> None:
|
||||
point = backend._lib.EC_KEY_get0_public_key(ec_cdata)
|
||||
backend.openssl_assert(point != backend._ffi.NULL)
|
||||
group = backend._lib.EC_KEY_get0_group(ec_cdata)
|
||||
backend.openssl_assert(group != backend._ffi.NULL)
|
||||
if backend._lib.EC_POINT_is_at_infinity(group, point):
|
||||
raise ValueError(
|
||||
"Cannot load an EC public key where the point is at infinity"
|
||||
)
|
||||
|
||||
|
||||
def _sn_to_elliptic_curve(backend: "Backend", sn: str) -> ec.EllipticCurve:
|
||||
try:
|
||||
return ec._CURVE_TYPES[sn]()
|
||||
except KeyError:
|
||||
raise UnsupportedAlgorithm(
|
||||
"{} is not a supported elliptic curve".format(sn),
|
||||
_Reasons.UNSUPPORTED_ELLIPTIC_CURVE,
|
||||
)
|
||||
|
||||
|
||||
def _ecdsa_sig_sign(
|
||||
backend: "Backend", private_key: "_EllipticCurvePrivateKey", data: bytes
|
||||
) -> bytes:
|
||||
max_size = backend._lib.ECDSA_size(private_key._ec_key)
|
||||
backend.openssl_assert(max_size > 0)
|
||||
|
||||
sigbuf = backend._ffi.new("unsigned char[]", max_size)
|
||||
siglen_ptr = backend._ffi.new("unsigned int[]", 1)
|
||||
res = backend._lib.ECDSA_sign(
|
||||
0, data, len(data), sigbuf, siglen_ptr, private_key._ec_key
|
||||
)
|
||||
backend.openssl_assert(res == 1)
|
||||
return backend._ffi.buffer(sigbuf)[: siglen_ptr[0]]
|
||||
|
||||
|
||||
def _ecdsa_sig_verify(
|
||||
backend: "Backend",
|
||||
public_key: "_EllipticCurvePublicKey",
|
||||
signature: bytes,
|
||||
data: bytes,
|
||||
) -> None:
|
||||
res = backend._lib.ECDSA_verify(
|
||||
0, data, len(data), signature, len(signature), public_key._ec_key
|
||||
)
|
||||
if res != 1:
|
||||
backend._consume_errors()
|
||||
raise InvalidSignature
|
||||
|
||||
|
||||
class _EllipticCurvePrivateKey(ec.EllipticCurvePrivateKey):
|
||||
def __init__(self, backend: "Backend", ec_key_cdata, evp_pkey):
|
||||
self._backend = backend
|
||||
self._ec_key = ec_key_cdata
|
||||
self._evp_pkey = evp_pkey
|
||||
|
||||
sn = _ec_key_curve_sn(backend, ec_key_cdata)
|
||||
self._curve = _sn_to_elliptic_curve(backend, sn)
|
||||
_mark_asn1_named_ec_curve(backend, ec_key_cdata)
|
||||
_check_key_infinity(backend, ec_key_cdata)
|
||||
|
||||
@property
|
||||
def curve(self) -> ec.EllipticCurve:
|
||||
return self._curve
|
||||
|
||||
@property
|
||||
def key_size(self) -> int:
|
||||
return self.curve.key_size
|
||||
|
||||
def exchange(
|
||||
self, algorithm: ec.ECDH, peer_public_key: ec.EllipticCurvePublicKey
|
||||
) -> bytes:
|
||||
if not (
|
||||
self._backend.elliptic_curve_exchange_algorithm_supported(
|
||||
algorithm, self.curve
|
||||
)
|
||||
):
|
||||
raise UnsupportedAlgorithm(
|
||||
"This backend does not support the ECDH algorithm.",
|
||||
_Reasons.UNSUPPORTED_EXCHANGE_ALGORITHM,
|
||||
)
|
||||
|
||||
if peer_public_key.curve.name != self.curve.name:
|
||||
raise ValueError(
|
||||
"peer_public_key and self are not on the same curve"
|
||||
)
|
||||
|
||||
return _evp_pkey_derive(self._backend, self._evp_pkey, peer_public_key)
|
||||
|
||||
def public_key(self) -> ec.EllipticCurvePublicKey:
|
||||
group = self._backend._lib.EC_KEY_get0_group(self._ec_key)
|
||||
self._backend.openssl_assert(group != self._backend._ffi.NULL)
|
||||
|
||||
curve_nid = self._backend._lib.EC_GROUP_get_curve_name(group)
|
||||
public_ec_key = self._backend._ec_key_new_by_curve_nid(curve_nid)
|
||||
|
||||
point = self._backend._lib.EC_KEY_get0_public_key(self._ec_key)
|
||||
self._backend.openssl_assert(point != self._backend._ffi.NULL)
|
||||
|
||||
res = self._backend._lib.EC_KEY_set_public_key(public_ec_key, point)
|
||||
self._backend.openssl_assert(res == 1)
|
||||
|
||||
evp_pkey = self._backend._ec_cdata_to_evp_pkey(public_ec_key)
|
||||
|
||||
return _EllipticCurvePublicKey(self._backend, public_ec_key, evp_pkey)
|
||||
|
||||
def private_numbers(self) -> ec.EllipticCurvePrivateNumbers:
|
||||
bn = self._backend._lib.EC_KEY_get0_private_key(self._ec_key)
|
||||
private_value = self._backend._bn_to_int(bn)
|
||||
return ec.EllipticCurvePrivateNumbers(
|
||||
private_value=private_value,
|
||||
public_numbers=self.public_key().public_numbers(),
|
||||
)
|
||||
|
||||
def private_bytes(
|
||||
self,
|
||||
encoding: serialization.Encoding,
|
||||
format: serialization.PrivateFormat,
|
||||
encryption_algorithm: serialization.KeySerializationEncryption,
|
||||
) -> bytes:
|
||||
return self._backend._private_key_bytes(
|
||||
encoding,
|
||||
format,
|
||||
encryption_algorithm,
|
||||
self,
|
||||
self._evp_pkey,
|
||||
self._ec_key,
|
||||
)
|
||||
|
||||
def sign(
|
||||
self,
|
||||
data: bytes,
|
||||
signature_algorithm: ec.EllipticCurveSignatureAlgorithm,
|
||||
) -> bytes:
|
||||
_check_signature_algorithm(signature_algorithm)
|
||||
data, _ = _calculate_digest_and_algorithm(
|
||||
data,
|
||||
signature_algorithm.algorithm,
|
||||
)
|
||||
return _ecdsa_sig_sign(self._backend, self, data)
|
||||
|
||||
|
||||
class _EllipticCurvePublicKey(ec.EllipticCurvePublicKey):
|
||||
def __init__(self, backend: "Backend", ec_key_cdata, evp_pkey):
|
||||
self._backend = backend
|
||||
self._ec_key = ec_key_cdata
|
||||
self._evp_pkey = evp_pkey
|
||||
|
||||
sn = _ec_key_curve_sn(backend, ec_key_cdata)
|
||||
self._curve = _sn_to_elliptic_curve(backend, sn)
|
||||
_mark_asn1_named_ec_curve(backend, ec_key_cdata)
|
||||
_check_key_infinity(backend, ec_key_cdata)
|
||||
|
||||
@property
|
||||
def curve(self) -> ec.EllipticCurve:
|
||||
return self._curve
|
||||
|
||||
@property
|
||||
def key_size(self) -> int:
|
||||
return self.curve.key_size
|
||||
|
||||
def public_numbers(self) -> ec.EllipticCurvePublicNumbers:
|
||||
get_func, group = self._backend._ec_key_determine_group_get_func(
|
||||
self._ec_key
|
||||
)
|
||||
point = self._backend._lib.EC_KEY_get0_public_key(self._ec_key)
|
||||
self._backend.openssl_assert(point != self._backend._ffi.NULL)
|
||||
|
||||
with self._backend._tmp_bn_ctx() as bn_ctx:
|
||||
bn_x = self._backend._lib.BN_CTX_get(bn_ctx)
|
||||
bn_y = self._backend._lib.BN_CTX_get(bn_ctx)
|
||||
|
||||
res = get_func(group, point, bn_x, bn_y, bn_ctx)
|
||||
self._backend.openssl_assert(res == 1)
|
||||
|
||||
x = self._backend._bn_to_int(bn_x)
|
||||
y = self._backend._bn_to_int(bn_y)
|
||||
|
||||
return ec.EllipticCurvePublicNumbers(x=x, y=y, curve=self._curve)
|
||||
|
||||
def _encode_point(self, format: serialization.PublicFormat) -> bytes:
|
||||
if format is serialization.PublicFormat.CompressedPoint:
|
||||
conversion = self._backend._lib.POINT_CONVERSION_COMPRESSED
|
||||
else:
|
||||
assert format is serialization.PublicFormat.UncompressedPoint
|
||||
conversion = self._backend._lib.POINT_CONVERSION_UNCOMPRESSED
|
||||
|
||||
group = self._backend._lib.EC_KEY_get0_group(self._ec_key)
|
||||
self._backend.openssl_assert(group != self._backend._ffi.NULL)
|
||||
point = self._backend._lib.EC_KEY_get0_public_key(self._ec_key)
|
||||
self._backend.openssl_assert(point != self._backend._ffi.NULL)
|
||||
with self._backend._tmp_bn_ctx() as bn_ctx:
|
||||
buflen = self._backend._lib.EC_POINT_point2oct(
|
||||
group, point, conversion, self._backend._ffi.NULL, 0, bn_ctx
|
||||
)
|
||||
self._backend.openssl_assert(buflen > 0)
|
||||
buf = self._backend._ffi.new("char[]", buflen)
|
||||
res = self._backend._lib.EC_POINT_point2oct(
|
||||
group, point, conversion, buf, buflen, bn_ctx
|
||||
)
|
||||
self._backend.openssl_assert(buflen == res)
|
||||
|
||||
return self._backend._ffi.buffer(buf)[:]
|
||||
|
||||
def public_bytes(
|
||||
self,
|
||||
encoding: serialization.Encoding,
|
||||
format: serialization.PublicFormat,
|
||||
) -> bytes:
|
||||
if (
|
||||
encoding is serialization.Encoding.X962
|
||||
or format is serialization.PublicFormat.CompressedPoint
|
||||
or format is serialization.PublicFormat.UncompressedPoint
|
||||
):
|
||||
if encoding is not serialization.Encoding.X962 or format not in (
|
||||
serialization.PublicFormat.CompressedPoint,
|
||||
serialization.PublicFormat.UncompressedPoint,
|
||||
):
|
||||
raise ValueError(
|
||||
"X962 encoding must be used with CompressedPoint or "
|
||||
"UncompressedPoint format"
|
||||
)
|
||||
|
||||
return self._encode_point(format)
|
||||
else:
|
||||
return self._backend._public_key_bytes(
|
||||
encoding, format, self, self._evp_pkey, None
|
||||
)
|
||||
|
||||
def verify(
|
||||
self,
|
||||
signature: bytes,
|
||||
data: bytes,
|
||||
signature_algorithm: ec.EllipticCurveSignatureAlgorithm,
|
||||
) -> None:
|
||||
_check_signature_algorithm(signature_algorithm)
|
||||
data, _ = _calculate_digest_and_algorithm(
|
||||
data,
|
||||
signature_algorithm.algorithm,
|
||||
)
|
||||
_ecdsa_sig_verify(self._backend, self, signature, data)
|
||||
@@ -0,0 +1,155 @@
|
||||
# This file is dual licensed under the terms of the Apache License, Version
|
||||
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
|
||||
# for complete details.
|
||||
|
||||
import typing
|
||||
|
||||
from cryptography import exceptions
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
from cryptography.hazmat.primitives.asymmetric.ed25519 import (
|
||||
Ed25519PrivateKey,
|
||||
Ed25519PublicKey,
|
||||
_ED25519_KEY_SIZE,
|
||||
_ED25519_SIG_SIZE,
|
||||
)
|
||||
|
||||
if typing.TYPE_CHECKING:
|
||||
from cryptography.hazmat.backends.openssl.backend import Backend
|
||||
|
||||
|
||||
class _Ed25519PublicKey(Ed25519PublicKey):
|
||||
def __init__(self, backend: "Backend", evp_pkey):
|
||||
self._backend = backend
|
||||
self._evp_pkey = evp_pkey
|
||||
|
||||
def public_bytes(
|
||||
self,
|
||||
encoding: serialization.Encoding,
|
||||
format: serialization.PublicFormat,
|
||||
) -> bytes:
|
||||
if (
|
||||
encoding is serialization.Encoding.Raw
|
||||
or format is serialization.PublicFormat.Raw
|
||||
):
|
||||
if (
|
||||
encoding is not serialization.Encoding.Raw
|
||||
or format is not serialization.PublicFormat.Raw
|
||||
):
|
||||
raise ValueError(
|
||||
"When using Raw both encoding and format must be Raw"
|
||||
)
|
||||
|
||||
return self._raw_public_bytes()
|
||||
|
||||
return self._backend._public_key_bytes(
|
||||
encoding, format, self, self._evp_pkey, None
|
||||
)
|
||||
|
||||
def _raw_public_bytes(self) -> bytes:
|
||||
buf = self._backend._ffi.new("unsigned char []", _ED25519_KEY_SIZE)
|
||||
buflen = self._backend._ffi.new("size_t *", _ED25519_KEY_SIZE)
|
||||
res = self._backend._lib.EVP_PKEY_get_raw_public_key(
|
||||
self._evp_pkey, buf, buflen
|
||||
)
|
||||
self._backend.openssl_assert(res == 1)
|
||||
self._backend.openssl_assert(buflen[0] == _ED25519_KEY_SIZE)
|
||||
return self._backend._ffi.buffer(buf, _ED25519_KEY_SIZE)[:]
|
||||
|
||||
def verify(self, signature: bytes, data: bytes) -> None:
|
||||
evp_md_ctx = self._backend._lib.EVP_MD_CTX_new()
|
||||
self._backend.openssl_assert(evp_md_ctx != self._backend._ffi.NULL)
|
||||
evp_md_ctx = self._backend._ffi.gc(
|
||||
evp_md_ctx, self._backend._lib.EVP_MD_CTX_free
|
||||
)
|
||||
res = self._backend._lib.EVP_DigestVerifyInit(
|
||||
evp_md_ctx,
|
||||
self._backend._ffi.NULL,
|
||||
self._backend._ffi.NULL,
|
||||
self._backend._ffi.NULL,
|
||||
self._evp_pkey,
|
||||
)
|
||||
self._backend.openssl_assert(res == 1)
|
||||
res = self._backend._lib.EVP_DigestVerify(
|
||||
evp_md_ctx, signature, len(signature), data, len(data)
|
||||
)
|
||||
if res != 1:
|
||||
self._backend._consume_errors()
|
||||
raise exceptions.InvalidSignature
|
||||
|
||||
|
||||
class _Ed25519PrivateKey(Ed25519PrivateKey):
|
||||
def __init__(self, backend: "Backend", evp_pkey):
|
||||
self._backend = backend
|
||||
self._evp_pkey = evp_pkey
|
||||
|
||||
def public_key(self) -> Ed25519PublicKey:
|
||||
buf = self._backend._ffi.new("unsigned char []", _ED25519_KEY_SIZE)
|
||||
buflen = self._backend._ffi.new("size_t *", _ED25519_KEY_SIZE)
|
||||
res = self._backend._lib.EVP_PKEY_get_raw_public_key(
|
||||
self._evp_pkey, buf, buflen
|
||||
)
|
||||
self._backend.openssl_assert(res == 1)
|
||||
self._backend.openssl_assert(buflen[0] == _ED25519_KEY_SIZE)
|
||||
public_bytes = self._backend._ffi.buffer(buf)[:]
|
||||
return self._backend.ed25519_load_public_bytes(public_bytes)
|
||||
|
||||
def sign(self, data: bytes) -> bytes:
|
||||
evp_md_ctx = self._backend._lib.EVP_MD_CTX_new()
|
||||
self._backend.openssl_assert(evp_md_ctx != self._backend._ffi.NULL)
|
||||
evp_md_ctx = self._backend._ffi.gc(
|
||||
evp_md_ctx, self._backend._lib.EVP_MD_CTX_free
|
||||
)
|
||||
res = self._backend._lib.EVP_DigestSignInit(
|
||||
evp_md_ctx,
|
||||
self._backend._ffi.NULL,
|
||||
self._backend._ffi.NULL,
|
||||
self._backend._ffi.NULL,
|
||||
self._evp_pkey,
|
||||
)
|
||||
self._backend.openssl_assert(res == 1)
|
||||
buf = self._backend._ffi.new("unsigned char[]", _ED25519_SIG_SIZE)
|
||||
buflen = self._backend._ffi.new("size_t *", len(buf))
|
||||
res = self._backend._lib.EVP_DigestSign(
|
||||
evp_md_ctx, buf, buflen, data, len(data)
|
||||
)
|
||||
self._backend.openssl_assert(res == 1)
|
||||
self._backend.openssl_assert(buflen[0] == _ED25519_SIG_SIZE)
|
||||
return self._backend._ffi.buffer(buf, buflen[0])[:]
|
||||
|
||||
def private_bytes(
|
||||
self,
|
||||
encoding: serialization.Encoding,
|
||||
format: serialization.PrivateFormat,
|
||||
encryption_algorithm: serialization.KeySerializationEncryption,
|
||||
) -> bytes:
|
||||
if (
|
||||
encoding is serialization.Encoding.Raw
|
||||
or format is serialization.PublicFormat.Raw
|
||||
):
|
||||
if (
|
||||
format is not serialization.PrivateFormat.Raw
|
||||
or encoding is not serialization.Encoding.Raw
|
||||
or not isinstance(
|
||||
encryption_algorithm, serialization.NoEncryption
|
||||
)
|
||||
):
|
||||
raise ValueError(
|
||||
"When using Raw both encoding and format must be Raw "
|
||||
"and encryption_algorithm must be NoEncryption()"
|
||||
)
|
||||
|
||||
return self._raw_private_bytes()
|
||||
|
||||
return self._backend._private_key_bytes(
|
||||
encoding, format, encryption_algorithm, self, self._evp_pkey, None
|
||||
)
|
||||
|
||||
def _raw_private_bytes(self) -> bytes:
|
||||
buf = self._backend._ffi.new("unsigned char []", _ED25519_KEY_SIZE)
|
||||
buflen = self._backend._ffi.new("size_t *", _ED25519_KEY_SIZE)
|
||||
res = self._backend._lib.EVP_PKEY_get_raw_private_key(
|
||||
self._evp_pkey, buf, buflen
|
||||
)
|
||||
self._backend.openssl_assert(res == 1)
|
||||
self._backend.openssl_assert(buflen[0] == _ED25519_KEY_SIZE)
|
||||
return self._backend._ffi.buffer(buf, _ED25519_KEY_SIZE)[:]
|
||||
@@ -0,0 +1,156 @@
|
||||
# This file is dual licensed under the terms of the Apache License, Version
|
||||
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
|
||||
# for complete details.
|
||||
|
||||
import typing
|
||||
|
||||
from cryptography import exceptions
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
from cryptography.hazmat.primitives.asymmetric.ed448 import (
|
||||
Ed448PrivateKey,
|
||||
Ed448PublicKey,
|
||||
)
|
||||
|
||||
if typing.TYPE_CHECKING:
|
||||
from cryptography.hazmat.backends.openssl.backend import Backend
|
||||
|
||||
_ED448_KEY_SIZE = 57
|
||||
_ED448_SIG_SIZE = 114
|
||||
|
||||
|
||||
class _Ed448PublicKey(Ed448PublicKey):
|
||||
def __init__(self, backend: "Backend", evp_pkey):
|
||||
self._backend = backend
|
||||
self._evp_pkey = evp_pkey
|
||||
|
||||
def public_bytes(
|
||||
self,
|
||||
encoding: serialization.Encoding,
|
||||
format: serialization.PublicFormat,
|
||||
) -> bytes:
|
||||
if (
|
||||
encoding is serialization.Encoding.Raw
|
||||
or format is serialization.PublicFormat.Raw
|
||||
):
|
||||
if (
|
||||
encoding is not serialization.Encoding.Raw
|
||||
or format is not serialization.PublicFormat.Raw
|
||||
):
|
||||
raise ValueError(
|
||||
"When using Raw both encoding and format must be Raw"
|
||||
)
|
||||
|
||||
return self._raw_public_bytes()
|
||||
|
||||
return self._backend._public_key_bytes(
|
||||
encoding, format, self, self._evp_pkey, None
|
||||
)
|
||||
|
||||
def _raw_public_bytes(self) -> bytes:
|
||||
buf = self._backend._ffi.new("unsigned char []", _ED448_KEY_SIZE)
|
||||
buflen = self._backend._ffi.new("size_t *", _ED448_KEY_SIZE)
|
||||
res = self._backend._lib.EVP_PKEY_get_raw_public_key(
|
||||
self._evp_pkey, buf, buflen
|
||||
)
|
||||
self._backend.openssl_assert(res == 1)
|
||||
self._backend.openssl_assert(buflen[0] == _ED448_KEY_SIZE)
|
||||
return self._backend._ffi.buffer(buf, _ED448_KEY_SIZE)[:]
|
||||
|
||||
def verify(self, signature: bytes, data: bytes) -> None:
|
||||
evp_md_ctx = self._backend._lib.EVP_MD_CTX_new()
|
||||
self._backend.openssl_assert(evp_md_ctx != self._backend._ffi.NULL)
|
||||
evp_md_ctx = self._backend._ffi.gc(
|
||||
evp_md_ctx, self._backend._lib.EVP_MD_CTX_free
|
||||
)
|
||||
res = self._backend._lib.EVP_DigestVerifyInit(
|
||||
evp_md_ctx,
|
||||
self._backend._ffi.NULL,
|
||||
self._backend._ffi.NULL,
|
||||
self._backend._ffi.NULL,
|
||||
self._evp_pkey,
|
||||
)
|
||||
self._backend.openssl_assert(res == 1)
|
||||
res = self._backend._lib.EVP_DigestVerify(
|
||||
evp_md_ctx, signature, len(signature), data, len(data)
|
||||
)
|
||||
if res != 1:
|
||||
self._backend._consume_errors()
|
||||
raise exceptions.InvalidSignature
|
||||
|
||||
|
||||
class _Ed448PrivateKey(Ed448PrivateKey):
|
||||
def __init__(self, backend: "Backend", evp_pkey):
|
||||
self._backend = backend
|
||||
self._evp_pkey = evp_pkey
|
||||
|
||||
def public_key(self) -> Ed448PublicKey:
|
||||
buf = self._backend._ffi.new("unsigned char []", _ED448_KEY_SIZE)
|
||||
buflen = self._backend._ffi.new("size_t *", _ED448_KEY_SIZE)
|
||||
res = self._backend._lib.EVP_PKEY_get_raw_public_key(
|
||||
self._evp_pkey, buf, buflen
|
||||
)
|
||||
self._backend.openssl_assert(res == 1)
|
||||
self._backend.openssl_assert(buflen[0] == _ED448_KEY_SIZE)
|
||||
public_bytes = self._backend._ffi.buffer(buf)[:]
|
||||
return self._backend.ed448_load_public_bytes(public_bytes)
|
||||
|
||||
def sign(self, data: bytes) -> bytes:
|
||||
evp_md_ctx = self._backend._lib.EVP_MD_CTX_new()
|
||||
self._backend.openssl_assert(evp_md_ctx != self._backend._ffi.NULL)
|
||||
evp_md_ctx = self._backend._ffi.gc(
|
||||
evp_md_ctx, self._backend._lib.EVP_MD_CTX_free
|
||||
)
|
||||
res = self._backend._lib.EVP_DigestSignInit(
|
||||
evp_md_ctx,
|
||||
self._backend._ffi.NULL,
|
||||
self._backend._ffi.NULL,
|
||||
self._backend._ffi.NULL,
|
||||
self._evp_pkey,
|
||||
)
|
||||
self._backend.openssl_assert(res == 1)
|
||||
buf = self._backend._ffi.new("unsigned char[]", _ED448_SIG_SIZE)
|
||||
buflen = self._backend._ffi.new("size_t *", len(buf))
|
||||
res = self._backend._lib.EVP_DigestSign(
|
||||
evp_md_ctx, buf, buflen, data, len(data)
|
||||
)
|
||||
self._backend.openssl_assert(res == 1)
|
||||
self._backend.openssl_assert(buflen[0] == _ED448_SIG_SIZE)
|
||||
return self._backend._ffi.buffer(buf, buflen[0])[:]
|
||||
|
||||
def private_bytes(
|
||||
self,
|
||||
encoding: serialization.Encoding,
|
||||
format: serialization.PrivateFormat,
|
||||
encryption_algorithm: serialization.KeySerializationEncryption,
|
||||
) -> bytes:
|
||||
if (
|
||||
encoding is serialization.Encoding.Raw
|
||||
or format is serialization.PublicFormat.Raw
|
||||
):
|
||||
if (
|
||||
format is not serialization.PrivateFormat.Raw
|
||||
or encoding is not serialization.Encoding.Raw
|
||||
or not isinstance(
|
||||
encryption_algorithm, serialization.NoEncryption
|
||||
)
|
||||
):
|
||||
raise ValueError(
|
||||
"When using Raw both encoding and format must be Raw "
|
||||
"and encryption_algorithm must be NoEncryption()"
|
||||
)
|
||||
|
||||
return self._raw_private_bytes()
|
||||
|
||||
return self._backend._private_key_bytes(
|
||||
encoding, format, encryption_algorithm, self, self._evp_pkey, None
|
||||
)
|
||||
|
||||
def _raw_private_bytes(self) -> bytes:
|
||||
buf = self._backend._ffi.new("unsigned char []", _ED448_KEY_SIZE)
|
||||
buflen = self._backend._ffi.new("size_t *", _ED448_KEY_SIZE)
|
||||
res = self._backend._lib.EVP_PKEY_get_raw_private_key(
|
||||
self._evp_pkey, buf, buflen
|
||||
)
|
||||
self._backend.openssl_assert(res == 1)
|
||||
self._backend.openssl_assert(buflen[0] == _ED448_KEY_SIZE)
|
||||
return self._backend._ffi.buffer(buf, _ED448_KEY_SIZE)[:]
|
||||
@@ -0,0 +1,18 @@
|
||||
# This file is dual licensed under the terms of the Apache License, Version
|
||||
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
|
||||
# for complete details.
|
||||
|
||||
|
||||
from cryptography import x509
|
||||
|
||||
|
||||
_CRLREASONFLAGS = {
|
||||
x509.ReasonFlags.key_compromise: 1,
|
||||
x509.ReasonFlags.ca_compromise: 2,
|
||||
x509.ReasonFlags.affiliation_changed: 3,
|
||||
x509.ReasonFlags.superseded: 4,
|
||||
x509.ReasonFlags.cessation_of_operation: 5,
|
||||
x509.ReasonFlags.certificate_hold: 6,
|
||||
x509.ReasonFlags.privilege_withdrawn: 7,
|
||||
x509.ReasonFlags.aa_compromise: 8,
|
||||
}
|
||||
@@ -0,0 +1,87 @@
|
||||
# This file is dual licensed under the terms of the Apache License, Version
|
||||
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
|
||||
# for complete details.
|
||||
|
||||
import typing
|
||||
|
||||
from cryptography.exceptions import UnsupportedAlgorithm, _Reasons
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
|
||||
|
||||
if typing.TYPE_CHECKING:
|
||||
from cryptography.hazmat.backends.openssl.backend import Backend
|
||||
|
||||
|
||||
class _HashContext(hashes.HashContext):
|
||||
def __init__(
|
||||
self, backend: "Backend", algorithm: hashes.HashAlgorithm, ctx=None
|
||||
) -> None:
|
||||
self._algorithm = algorithm
|
||||
|
||||
self._backend = backend
|
||||
|
||||
if ctx is None:
|
||||
ctx = self._backend._lib.EVP_MD_CTX_new()
|
||||
ctx = self._backend._ffi.gc(
|
||||
ctx, self._backend._lib.EVP_MD_CTX_free
|
||||
)
|
||||
evp_md = self._backend._evp_md_from_algorithm(algorithm)
|
||||
if evp_md == self._backend._ffi.NULL:
|
||||
raise UnsupportedAlgorithm(
|
||||
"{} is not a supported hash on this backend.".format(
|
||||
algorithm.name
|
||||
),
|
||||
_Reasons.UNSUPPORTED_HASH,
|
||||
)
|
||||
res = self._backend._lib.EVP_DigestInit_ex(
|
||||
ctx, evp_md, self._backend._ffi.NULL
|
||||
)
|
||||
self._backend.openssl_assert(res != 0)
|
||||
|
||||
self._ctx = ctx
|
||||
|
||||
@property
|
||||
def algorithm(self) -> hashes.HashAlgorithm:
|
||||
return self._algorithm
|
||||
|
||||
def copy(self) -> "_HashContext":
|
||||
copied_ctx = self._backend._lib.EVP_MD_CTX_new()
|
||||
copied_ctx = self._backend._ffi.gc(
|
||||
copied_ctx, self._backend._lib.EVP_MD_CTX_free
|
||||
)
|
||||
res = self._backend._lib.EVP_MD_CTX_copy_ex(copied_ctx, self._ctx)
|
||||
self._backend.openssl_assert(res != 0)
|
||||
return _HashContext(self._backend, self.algorithm, ctx=copied_ctx)
|
||||
|
||||
def update(self, data: bytes) -> None:
|
||||
data_ptr = self._backend._ffi.from_buffer(data)
|
||||
res = self._backend._lib.EVP_DigestUpdate(
|
||||
self._ctx, data_ptr, len(data)
|
||||
)
|
||||
self._backend.openssl_assert(res != 0)
|
||||
|
||||
def finalize(self) -> bytes:
|
||||
if isinstance(self.algorithm, hashes.ExtendableOutputFunction):
|
||||
# extendable output functions use a different finalize
|
||||
return self._finalize_xof()
|
||||
else:
|
||||
buf = self._backend._ffi.new(
|
||||
"unsigned char[]", self._backend._lib.EVP_MAX_MD_SIZE
|
||||
)
|
||||
outlen = self._backend._ffi.new("unsigned int *")
|
||||
res = self._backend._lib.EVP_DigestFinal_ex(self._ctx, buf, outlen)
|
||||
self._backend.openssl_assert(res != 0)
|
||||
self._backend.openssl_assert(
|
||||
outlen[0] == self.algorithm.digest_size
|
||||
)
|
||||
return self._backend._ffi.buffer(buf)[: outlen[0]]
|
||||
|
||||
def _finalize_xof(self) -> bytes:
|
||||
buf = self._backend._ffi.new(
|
||||
"unsigned char[]", self.algorithm.digest_size
|
||||
)
|
||||
res = self._backend._lib.EVP_DigestFinalXOF(
|
||||
self._ctx, buf, self.algorithm.digest_size
|
||||
)
|
||||
self._backend.openssl_assert(res != 0)
|
||||
return self._backend._ffi.buffer(buf)[: self.algorithm.digest_size]
|
||||
@@ -0,0 +1,85 @@
|
||||
# This file is dual licensed under the terms of the Apache License, Version
|
||||
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
|
||||
# for complete details.
|
||||
|
||||
import typing
|
||||
|
||||
from cryptography.exceptions import (
|
||||
InvalidSignature,
|
||||
UnsupportedAlgorithm,
|
||||
_Reasons,
|
||||
)
|
||||
from cryptography.hazmat.primitives import constant_time, hashes
|
||||
|
||||
|
||||
if typing.TYPE_CHECKING:
|
||||
from cryptography.hazmat.backends.openssl.backend import Backend
|
||||
|
||||
|
||||
class _HMACContext(hashes.HashContext):
|
||||
def __init__(
|
||||
self,
|
||||
backend: "Backend",
|
||||
key: bytes,
|
||||
algorithm: hashes.HashAlgorithm,
|
||||
ctx=None,
|
||||
):
|
||||
self._algorithm = algorithm
|
||||
self._backend = backend
|
||||
|
||||
if ctx is None:
|
||||
ctx = self._backend._lib.HMAC_CTX_new()
|
||||
self._backend.openssl_assert(ctx != self._backend._ffi.NULL)
|
||||
ctx = self._backend._ffi.gc(ctx, self._backend._lib.HMAC_CTX_free)
|
||||
evp_md = self._backend._evp_md_from_algorithm(algorithm)
|
||||
if evp_md == self._backend._ffi.NULL:
|
||||
raise UnsupportedAlgorithm(
|
||||
"{} is not a supported hash on this backend".format(
|
||||
algorithm.name
|
||||
),
|
||||
_Reasons.UNSUPPORTED_HASH,
|
||||
)
|
||||
key_ptr = self._backend._ffi.from_buffer(key)
|
||||
res = self._backend._lib.HMAC_Init_ex(
|
||||
ctx, key_ptr, len(key), evp_md, self._backend._ffi.NULL
|
||||
)
|
||||
self._backend.openssl_assert(res != 0)
|
||||
|
||||
self._ctx = ctx
|
||||
self._key = key
|
||||
|
||||
@property
|
||||
def algorithm(self) -> hashes.HashAlgorithm:
|
||||
return self._algorithm
|
||||
|
||||
def copy(self) -> "_HMACContext":
|
||||
copied_ctx = self._backend._lib.HMAC_CTX_new()
|
||||
self._backend.openssl_assert(copied_ctx != self._backend._ffi.NULL)
|
||||
copied_ctx = self._backend._ffi.gc(
|
||||
copied_ctx, self._backend._lib.HMAC_CTX_free
|
||||
)
|
||||
res = self._backend._lib.HMAC_CTX_copy(copied_ctx, self._ctx)
|
||||
self._backend.openssl_assert(res != 0)
|
||||
return _HMACContext(
|
||||
self._backend, self._key, self.algorithm, ctx=copied_ctx
|
||||
)
|
||||
|
||||
def update(self, data: bytes) -> None:
|
||||
data_ptr = self._backend._ffi.from_buffer(data)
|
||||
res = self._backend._lib.HMAC_Update(self._ctx, data_ptr, len(data))
|
||||
self._backend.openssl_assert(res != 0)
|
||||
|
||||
def finalize(self) -> bytes:
|
||||
buf = self._backend._ffi.new(
|
||||
"unsigned char[]", self._backend._lib.EVP_MAX_MD_SIZE
|
||||
)
|
||||
outlen = self._backend._ffi.new("unsigned int *")
|
||||
res = self._backend._lib.HMAC_Final(self._ctx, buf, outlen)
|
||||
self._backend.openssl_assert(res != 0)
|
||||
self._backend.openssl_assert(outlen[0] == self.algorithm.digest_size)
|
||||
return self._backend._ffi.buffer(buf)[: outlen[0]]
|
||||
|
||||
def verify(self, signature: bytes) -> None:
|
||||
digest = self.finalize()
|
||||
if not constant_time.bytes_eq(digest, signature):
|
||||
raise InvalidSignature("Signature did not match digest.")
|
||||
@@ -0,0 +1,68 @@
|
||||
# This file is dual licensed under the terms of the Apache License, Version
|
||||
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
|
||||
# for complete details.
|
||||
|
||||
import typing
|
||||
|
||||
from cryptography.exceptions import InvalidSignature
|
||||
from cryptography.hazmat.primitives import constant_time
|
||||
|
||||
|
||||
_POLY1305_TAG_SIZE = 16
|
||||
_POLY1305_KEY_SIZE = 32
|
||||
|
||||
|
||||
if typing.TYPE_CHECKING:
|
||||
from cryptography.hazmat.backends.openssl.backend import Backend
|
||||
|
||||
|
||||
class _Poly1305Context:
|
||||
def __init__(self, backend: "Backend", key: bytes) -> None:
|
||||
self._backend = backend
|
||||
|
||||
key_ptr = self._backend._ffi.from_buffer(key)
|
||||
# This function copies the key into OpenSSL-owned memory so we don't
|
||||
# need to retain it ourselves
|
||||
evp_pkey = self._backend._lib.EVP_PKEY_new_raw_private_key(
|
||||
self._backend._lib.NID_poly1305,
|
||||
self._backend._ffi.NULL,
|
||||
key_ptr,
|
||||
len(key),
|
||||
)
|
||||
self._backend.openssl_assert(evp_pkey != self._backend._ffi.NULL)
|
||||
self._evp_pkey = self._backend._ffi.gc(
|
||||
evp_pkey, self._backend._lib.EVP_PKEY_free
|
||||
)
|
||||
ctx = self._backend._lib.EVP_MD_CTX_new()
|
||||
self._backend.openssl_assert(ctx != self._backend._ffi.NULL)
|
||||
self._ctx = self._backend._ffi.gc(
|
||||
ctx, self._backend._lib.EVP_MD_CTX_free
|
||||
)
|
||||
res = self._backend._lib.EVP_DigestSignInit(
|
||||
self._ctx,
|
||||
self._backend._ffi.NULL,
|
||||
self._backend._ffi.NULL,
|
||||
self._backend._ffi.NULL,
|
||||
self._evp_pkey,
|
||||
)
|
||||
self._backend.openssl_assert(res == 1)
|
||||
|
||||
def update(self, data: bytes) -> None:
|
||||
data_ptr = self._backend._ffi.from_buffer(data)
|
||||
res = self._backend._lib.EVP_DigestSignUpdate(
|
||||
self._ctx, data_ptr, len(data)
|
||||
)
|
||||
self._backend.openssl_assert(res != 0)
|
||||
|
||||
def finalize(self) -> bytes:
|
||||
buf = self._backend._ffi.new("unsigned char[]", _POLY1305_TAG_SIZE)
|
||||
outlen = self._backend._ffi.new("size_t *", _POLY1305_TAG_SIZE)
|
||||
res = self._backend._lib.EVP_DigestSignFinal(self._ctx, buf, outlen)
|
||||
self._backend.openssl_assert(res != 0)
|
||||
self._backend.openssl_assert(outlen[0] == _POLY1305_TAG_SIZE)
|
||||
return self._backend._ffi.buffer(buf)[: outlen[0]]
|
||||
|
||||
def verify(self, tag: bytes) -> None:
|
||||
mac = self.finalize()
|
||||
if not constant_time.bytes_eq(mac, tag):
|
||||
raise InvalidSignature("Value did not match computed tag.")
|
||||
@@ -0,0 +1,567 @@
|
||||
# This file is dual licensed under the terms of the Apache License, Version
|
||||
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
|
||||
# for complete details.
|
||||
|
||||
|
||||
import typing
|
||||
|
||||
from cryptography.exceptions import (
|
||||
InvalidSignature,
|
||||
UnsupportedAlgorithm,
|
||||
_Reasons,
|
||||
)
|
||||
from cryptography.hazmat.backends.openssl.utils import (
|
||||
_calculate_digest_and_algorithm,
|
||||
)
|
||||
from cryptography.hazmat.primitives import hashes, serialization
|
||||
from cryptography.hazmat.primitives.asymmetric import (
|
||||
utils as asym_utils,
|
||||
)
|
||||
from cryptography.hazmat.primitives.asymmetric.padding import (
|
||||
AsymmetricPadding,
|
||||
MGF1,
|
||||
OAEP,
|
||||
PKCS1v15,
|
||||
PSS,
|
||||
_Auto,
|
||||
_DigestLength,
|
||||
_MaxLength,
|
||||
calculate_max_pss_salt_length,
|
||||
)
|
||||
from cryptography.hazmat.primitives.asymmetric.rsa import (
|
||||
RSAPrivateKey,
|
||||
RSAPrivateNumbers,
|
||||
RSAPublicKey,
|
||||
RSAPublicNumbers,
|
||||
)
|
||||
|
||||
|
||||
if typing.TYPE_CHECKING:
|
||||
from cryptography.hazmat.backends.openssl.backend import Backend
|
||||
|
||||
|
||||
def _get_rsa_pss_salt_length(
|
||||
backend: "Backend",
|
||||
pss: PSS,
|
||||
key: typing.Union[RSAPrivateKey, RSAPublicKey],
|
||||
hash_algorithm: hashes.HashAlgorithm,
|
||||
) -> int:
|
||||
salt = pss._salt_length
|
||||
|
||||
if isinstance(salt, _MaxLength):
|
||||
return calculate_max_pss_salt_length(key, hash_algorithm)
|
||||
elif isinstance(salt, _DigestLength):
|
||||
return hash_algorithm.digest_size
|
||||
elif isinstance(salt, _Auto):
|
||||
if isinstance(key, RSAPrivateKey):
|
||||
raise ValueError(
|
||||
"PSS salt length can only be set to AUTO when verifying"
|
||||
)
|
||||
return backend._lib.RSA_PSS_SALTLEN_AUTO
|
||||
else:
|
||||
return salt
|
||||
|
||||
|
||||
def _enc_dec_rsa(
|
||||
backend: "Backend",
|
||||
key: typing.Union["_RSAPrivateKey", "_RSAPublicKey"],
|
||||
data: bytes,
|
||||
padding: AsymmetricPadding,
|
||||
) -> bytes:
|
||||
if not isinstance(padding, AsymmetricPadding):
|
||||
raise TypeError("Padding must be an instance of AsymmetricPadding.")
|
||||
|
||||
if isinstance(padding, PKCS1v15):
|
||||
padding_enum = backend._lib.RSA_PKCS1_PADDING
|
||||
elif isinstance(padding, OAEP):
|
||||
padding_enum = backend._lib.RSA_PKCS1_OAEP_PADDING
|
||||
|
||||
if not isinstance(padding._mgf, MGF1):
|
||||
raise UnsupportedAlgorithm(
|
||||
"Only MGF1 is supported by this backend.",
|
||||
_Reasons.UNSUPPORTED_MGF,
|
||||
)
|
||||
|
||||
if not backend.rsa_padding_supported(padding):
|
||||
raise UnsupportedAlgorithm(
|
||||
"This combination of padding and hash algorithm is not "
|
||||
"supported by this backend.",
|
||||
_Reasons.UNSUPPORTED_PADDING,
|
||||
)
|
||||
|
||||
else:
|
||||
raise UnsupportedAlgorithm(
|
||||
"{} is not supported by this backend.".format(padding.name),
|
||||
_Reasons.UNSUPPORTED_PADDING,
|
||||
)
|
||||
|
||||
return _enc_dec_rsa_pkey_ctx(backend, key, data, padding_enum, padding)
|
||||
|
||||
|
||||
def _enc_dec_rsa_pkey_ctx(
|
||||
backend: "Backend",
|
||||
key: typing.Union["_RSAPrivateKey", "_RSAPublicKey"],
|
||||
data: bytes,
|
||||
padding_enum: int,
|
||||
padding: AsymmetricPadding,
|
||||
) -> bytes:
|
||||
init: typing.Callable[[typing.Any], int]
|
||||
crypt: typing.Callable[[typing.Any, typing.Any, int, bytes, int], int]
|
||||
if isinstance(key, _RSAPublicKey):
|
||||
init = backend._lib.EVP_PKEY_encrypt_init
|
||||
crypt = backend._lib.EVP_PKEY_encrypt
|
||||
else:
|
||||
init = backend._lib.EVP_PKEY_decrypt_init
|
||||
crypt = backend._lib.EVP_PKEY_decrypt
|
||||
|
||||
pkey_ctx = backend._lib.EVP_PKEY_CTX_new(key._evp_pkey, backend._ffi.NULL)
|
||||
backend.openssl_assert(pkey_ctx != backend._ffi.NULL)
|
||||
pkey_ctx = backend._ffi.gc(pkey_ctx, backend._lib.EVP_PKEY_CTX_free)
|
||||
res = init(pkey_ctx)
|
||||
backend.openssl_assert(res == 1)
|
||||
res = backend._lib.EVP_PKEY_CTX_set_rsa_padding(pkey_ctx, padding_enum)
|
||||
backend.openssl_assert(res > 0)
|
||||
buf_size = backend._lib.EVP_PKEY_size(key._evp_pkey)
|
||||
backend.openssl_assert(buf_size > 0)
|
||||
if isinstance(padding, OAEP):
|
||||
mgf1_md = backend._evp_md_non_null_from_algorithm(
|
||||
padding._mgf._algorithm
|
||||
)
|
||||
res = backend._lib.EVP_PKEY_CTX_set_rsa_mgf1_md(pkey_ctx, mgf1_md)
|
||||
backend.openssl_assert(res > 0)
|
||||
oaep_md = backend._evp_md_non_null_from_algorithm(padding._algorithm)
|
||||
res = backend._lib.EVP_PKEY_CTX_set_rsa_oaep_md(pkey_ctx, oaep_md)
|
||||
backend.openssl_assert(res > 0)
|
||||
|
||||
if (
|
||||
isinstance(padding, OAEP)
|
||||
and padding._label is not None
|
||||
and len(padding._label) > 0
|
||||
):
|
||||
# set0_rsa_oaep_label takes ownership of the char * so we need to
|
||||
# copy it into some new memory
|
||||
labelptr = backend._lib.OPENSSL_malloc(len(padding._label))
|
||||
backend.openssl_assert(labelptr != backend._ffi.NULL)
|
||||
backend._ffi.memmove(labelptr, padding._label, len(padding._label))
|
||||
res = backend._lib.EVP_PKEY_CTX_set0_rsa_oaep_label(
|
||||
pkey_ctx, labelptr, len(padding._label)
|
||||
)
|
||||
backend.openssl_assert(res == 1)
|
||||
|
||||
outlen = backend._ffi.new("size_t *", buf_size)
|
||||
buf = backend._ffi.new("unsigned char[]", buf_size)
|
||||
# Everything from this line onwards is written with the goal of being as
|
||||
# constant-time as is practical given the constraints of Python and our
|
||||
# API. See Bleichenbacher's '98 attack on RSA, and its many many variants.
|
||||
# As such, you should not attempt to change this (particularly to "clean it
|
||||
# up") without understanding why it was written this way (see
|
||||
# Chesterton's Fence), and without measuring to verify you have not
|
||||
# introduced observable time differences.
|
||||
res = crypt(pkey_ctx, buf, outlen, data, len(data))
|
||||
resbuf = backend._ffi.buffer(buf)[: outlen[0]]
|
||||
backend._lib.ERR_clear_error()
|
||||
if res <= 0:
|
||||
raise ValueError("Encryption/decryption failed.")
|
||||
return resbuf
|
||||
|
||||
|
||||
def _rsa_sig_determine_padding(
|
||||
backend: "Backend",
|
||||
key: typing.Union["_RSAPrivateKey", "_RSAPublicKey"],
|
||||
padding: AsymmetricPadding,
|
||||
algorithm: typing.Optional[hashes.HashAlgorithm],
|
||||
) -> int:
|
||||
if not isinstance(padding, AsymmetricPadding):
|
||||
raise TypeError("Expected provider of AsymmetricPadding.")
|
||||
|
||||
pkey_size = backend._lib.EVP_PKEY_size(key._evp_pkey)
|
||||
backend.openssl_assert(pkey_size > 0)
|
||||
|
||||
if isinstance(padding, PKCS1v15):
|
||||
# Hash algorithm is ignored for PKCS1v15-padding, may be None.
|
||||
padding_enum = backend._lib.RSA_PKCS1_PADDING
|
||||
elif isinstance(padding, PSS):
|
||||
if not isinstance(padding._mgf, MGF1):
|
||||
raise UnsupportedAlgorithm(
|
||||
"Only MGF1 is supported by this backend.",
|
||||
_Reasons.UNSUPPORTED_MGF,
|
||||
)
|
||||
|
||||
# PSS padding requires a hash algorithm
|
||||
if not isinstance(algorithm, hashes.HashAlgorithm):
|
||||
raise TypeError("Expected instance of hashes.HashAlgorithm.")
|
||||
|
||||
# Size of key in bytes - 2 is the maximum
|
||||
# PSS signature length (salt length is checked later)
|
||||
if pkey_size - algorithm.digest_size - 2 < 0:
|
||||
raise ValueError(
|
||||
"Digest too large for key size. Use a larger "
|
||||
"key or different digest."
|
||||
)
|
||||
|
||||
padding_enum = backend._lib.RSA_PKCS1_PSS_PADDING
|
||||
else:
|
||||
raise UnsupportedAlgorithm(
|
||||
"{} is not supported by this backend.".format(padding.name),
|
||||
_Reasons.UNSUPPORTED_PADDING,
|
||||
)
|
||||
|
||||
return padding_enum
|
||||
|
||||
|
||||
# Hash algorithm can be absent (None) to initialize the context without setting
|
||||
# any message digest algorithm. This is currently only valid for the PKCS1v15
|
||||
# padding type, where it means that the signature data is encoded/decoded
|
||||
# as provided, without being wrapped in a DigestInfo structure.
|
||||
def _rsa_sig_setup(
|
||||
backend: "Backend",
|
||||
padding: AsymmetricPadding,
|
||||
algorithm: typing.Optional[hashes.HashAlgorithm],
|
||||
key: typing.Union["_RSAPublicKey", "_RSAPrivateKey"],
|
||||
init_func: typing.Callable[[typing.Any], int],
|
||||
):
|
||||
padding_enum = _rsa_sig_determine_padding(backend, key, padding, algorithm)
|
||||
pkey_ctx = backend._lib.EVP_PKEY_CTX_new(key._evp_pkey, backend._ffi.NULL)
|
||||
backend.openssl_assert(pkey_ctx != backend._ffi.NULL)
|
||||
pkey_ctx = backend._ffi.gc(pkey_ctx, backend._lib.EVP_PKEY_CTX_free)
|
||||
res = init_func(pkey_ctx)
|
||||
if res != 1:
|
||||
errors = backend._consume_errors()
|
||||
raise ValueError("Unable to sign/verify with this key", errors)
|
||||
|
||||
if algorithm is not None:
|
||||
evp_md = backend._evp_md_non_null_from_algorithm(algorithm)
|
||||
res = backend._lib.EVP_PKEY_CTX_set_signature_md(pkey_ctx, evp_md)
|
||||
if res <= 0:
|
||||
backend._consume_errors()
|
||||
raise UnsupportedAlgorithm(
|
||||
"{} is not supported by this backend for RSA signing.".format(
|
||||
algorithm.name
|
||||
),
|
||||
_Reasons.UNSUPPORTED_HASH,
|
||||
)
|
||||
res = backend._lib.EVP_PKEY_CTX_set_rsa_padding(pkey_ctx, padding_enum)
|
||||
if res <= 0:
|
||||
backend._consume_errors()
|
||||
raise UnsupportedAlgorithm(
|
||||
"{} is not supported for the RSA signature operation.".format(
|
||||
padding.name
|
||||
),
|
||||
_Reasons.UNSUPPORTED_PADDING,
|
||||
)
|
||||
if isinstance(padding, PSS):
|
||||
assert isinstance(algorithm, hashes.HashAlgorithm)
|
||||
res = backend._lib.EVP_PKEY_CTX_set_rsa_pss_saltlen(
|
||||
pkey_ctx,
|
||||
_get_rsa_pss_salt_length(backend, padding, key, algorithm),
|
||||
)
|
||||
backend.openssl_assert(res > 0)
|
||||
|
||||
mgf1_md = backend._evp_md_non_null_from_algorithm(
|
||||
padding._mgf._algorithm
|
||||
)
|
||||
res = backend._lib.EVP_PKEY_CTX_set_rsa_mgf1_md(pkey_ctx, mgf1_md)
|
||||
backend.openssl_assert(res > 0)
|
||||
|
||||
return pkey_ctx
|
||||
|
||||
|
||||
def _rsa_sig_sign(
|
||||
backend: "Backend",
|
||||
padding: AsymmetricPadding,
|
||||
algorithm: hashes.HashAlgorithm,
|
||||
private_key: "_RSAPrivateKey",
|
||||
data: bytes,
|
||||
) -> bytes:
|
||||
pkey_ctx = _rsa_sig_setup(
|
||||
backend,
|
||||
padding,
|
||||
algorithm,
|
||||
private_key,
|
||||
backend._lib.EVP_PKEY_sign_init,
|
||||
)
|
||||
buflen = backend._ffi.new("size_t *")
|
||||
res = backend._lib.EVP_PKEY_sign(
|
||||
pkey_ctx, backend._ffi.NULL, buflen, data, len(data)
|
||||
)
|
||||
backend.openssl_assert(res == 1)
|
||||
buf = backend._ffi.new("unsigned char[]", buflen[0])
|
||||
res = backend._lib.EVP_PKEY_sign(pkey_ctx, buf, buflen, data, len(data))
|
||||
if res != 1:
|
||||
errors = backend._consume_errors_with_text()
|
||||
raise ValueError(
|
||||
"Digest or salt length too long for key size. Use a larger key "
|
||||
"or shorter salt length if you are specifying a PSS salt",
|
||||
errors,
|
||||
)
|
||||
|
||||
return backend._ffi.buffer(buf)[:]
|
||||
|
||||
|
||||
def _rsa_sig_verify(
|
||||
backend: "Backend",
|
||||
padding: AsymmetricPadding,
|
||||
algorithm: hashes.HashAlgorithm,
|
||||
public_key: "_RSAPublicKey",
|
||||
signature: bytes,
|
||||
data: bytes,
|
||||
) -> None:
|
||||
pkey_ctx = _rsa_sig_setup(
|
||||
backend,
|
||||
padding,
|
||||
algorithm,
|
||||
public_key,
|
||||
backend._lib.EVP_PKEY_verify_init,
|
||||
)
|
||||
res = backend._lib.EVP_PKEY_verify(
|
||||
pkey_ctx, signature, len(signature), data, len(data)
|
||||
)
|
||||
# The previous call can return negative numbers in the event of an
|
||||
# error. This is not a signature failure but we need to fail if it
|
||||
# occurs.
|
||||
backend.openssl_assert(res >= 0)
|
||||
if res == 0:
|
||||
backend._consume_errors()
|
||||
raise InvalidSignature
|
||||
|
||||
|
||||
def _rsa_sig_recover(
|
||||
backend: "Backend",
|
||||
padding: AsymmetricPadding,
|
||||
algorithm: typing.Optional[hashes.HashAlgorithm],
|
||||
public_key: "_RSAPublicKey",
|
||||
signature: bytes,
|
||||
) -> bytes:
|
||||
pkey_ctx = _rsa_sig_setup(
|
||||
backend,
|
||||
padding,
|
||||
algorithm,
|
||||
public_key,
|
||||
backend._lib.EVP_PKEY_verify_recover_init,
|
||||
)
|
||||
|
||||
# Attempt to keep the rest of the code in this function as constant/time
|
||||
# as possible. See the comment in _enc_dec_rsa_pkey_ctx. Note that the
|
||||
# buflen parameter is used even though its value may be undefined in the
|
||||
# error case. Due to the tolerant nature of Python slicing this does not
|
||||
# trigger any exceptions.
|
||||
maxlen = backend._lib.EVP_PKEY_size(public_key._evp_pkey)
|
||||
backend.openssl_assert(maxlen > 0)
|
||||
buf = backend._ffi.new("unsigned char[]", maxlen)
|
||||
buflen = backend._ffi.new("size_t *", maxlen)
|
||||
res = backend._lib.EVP_PKEY_verify_recover(
|
||||
pkey_ctx, buf, buflen, signature, len(signature)
|
||||
)
|
||||
resbuf = backend._ffi.buffer(buf)[: buflen[0]]
|
||||
backend._lib.ERR_clear_error()
|
||||
# Assume that all parameter errors are handled during the setup phase and
|
||||
# any error here is due to invalid signature.
|
||||
if res != 1:
|
||||
raise InvalidSignature
|
||||
return resbuf
|
||||
|
||||
|
||||
class _RSAPrivateKey(RSAPrivateKey):
|
||||
_evp_pkey: object
|
||||
_rsa_cdata: object
|
||||
_key_size: int
|
||||
|
||||
def __init__(
|
||||
self, backend: "Backend", rsa_cdata, evp_pkey, _skip_check_key: bool
|
||||
):
|
||||
res: int
|
||||
# RSA_check_key is slower in OpenSSL 3.0.0 due to improved
|
||||
# primality checking. In normal use this is unlikely to be a problem
|
||||
# since users don't load new keys constantly, but for TESTING we've
|
||||
# added an init arg that allows skipping the checks. You should not
|
||||
# use this in production code unless you understand the consequences.
|
||||
if not _skip_check_key:
|
||||
res = backend._lib.RSA_check_key(rsa_cdata)
|
||||
if res != 1:
|
||||
errors = backend._consume_errors_with_text()
|
||||
raise ValueError("Invalid private key", errors)
|
||||
# 2 is prime and passes an RSA key check, so we also check
|
||||
# if p and q are odd just to be safe.
|
||||
p = backend._ffi.new("BIGNUM **")
|
||||
q = backend._ffi.new("BIGNUM **")
|
||||
backend._lib.RSA_get0_factors(rsa_cdata, p, q)
|
||||
backend.openssl_assert(p[0] != backend._ffi.NULL)
|
||||
backend.openssl_assert(q[0] != backend._ffi.NULL)
|
||||
p_odd = backend._lib.BN_is_odd(p[0])
|
||||
q_odd = backend._lib.BN_is_odd(q[0])
|
||||
if p_odd != 1 or q_odd != 1:
|
||||
errors = backend._consume_errors_with_text()
|
||||
raise ValueError("Invalid private key", errors)
|
||||
|
||||
# Blinding is on by default in many versions of OpenSSL, but let's
|
||||
# just be conservative here.
|
||||
res = backend._lib.RSA_blinding_on(rsa_cdata, backend._ffi.NULL)
|
||||
backend.openssl_assert(res == 1)
|
||||
|
||||
self._backend = backend
|
||||
self._rsa_cdata = rsa_cdata
|
||||
self._evp_pkey = evp_pkey
|
||||
|
||||
n = self._backend._ffi.new("BIGNUM **")
|
||||
self._backend._lib.RSA_get0_key(
|
||||
self._rsa_cdata,
|
||||
n,
|
||||
self._backend._ffi.NULL,
|
||||
self._backend._ffi.NULL,
|
||||
)
|
||||
self._backend.openssl_assert(n[0] != self._backend._ffi.NULL)
|
||||
self._key_size = self._backend._lib.BN_num_bits(n[0])
|
||||
|
||||
@property
|
||||
def key_size(self) -> int:
|
||||
return self._key_size
|
||||
|
||||
def decrypt(self, ciphertext: bytes, padding: AsymmetricPadding) -> bytes:
|
||||
key_size_bytes = (self.key_size + 7) // 8
|
||||
if key_size_bytes != len(ciphertext):
|
||||
raise ValueError("Ciphertext length must be equal to key size.")
|
||||
|
||||
return _enc_dec_rsa(self._backend, self, ciphertext, padding)
|
||||
|
||||
def public_key(self) -> RSAPublicKey:
|
||||
ctx = self._backend._lib.RSAPublicKey_dup(self._rsa_cdata)
|
||||
self._backend.openssl_assert(ctx != self._backend._ffi.NULL)
|
||||
ctx = self._backend._ffi.gc(ctx, self._backend._lib.RSA_free)
|
||||
evp_pkey = self._backend._rsa_cdata_to_evp_pkey(ctx)
|
||||
return _RSAPublicKey(self._backend, ctx, evp_pkey)
|
||||
|
||||
def private_numbers(self) -> RSAPrivateNumbers:
|
||||
n = self._backend._ffi.new("BIGNUM **")
|
||||
e = self._backend._ffi.new("BIGNUM **")
|
||||
d = self._backend._ffi.new("BIGNUM **")
|
||||
p = self._backend._ffi.new("BIGNUM **")
|
||||
q = self._backend._ffi.new("BIGNUM **")
|
||||
dmp1 = self._backend._ffi.new("BIGNUM **")
|
||||
dmq1 = self._backend._ffi.new("BIGNUM **")
|
||||
iqmp = self._backend._ffi.new("BIGNUM **")
|
||||
self._backend._lib.RSA_get0_key(self._rsa_cdata, n, e, d)
|
||||
self._backend.openssl_assert(n[0] != self._backend._ffi.NULL)
|
||||
self._backend.openssl_assert(e[0] != self._backend._ffi.NULL)
|
||||
self._backend.openssl_assert(d[0] != self._backend._ffi.NULL)
|
||||
self._backend._lib.RSA_get0_factors(self._rsa_cdata, p, q)
|
||||
self._backend.openssl_assert(p[0] != self._backend._ffi.NULL)
|
||||
self._backend.openssl_assert(q[0] != self._backend._ffi.NULL)
|
||||
self._backend._lib.RSA_get0_crt_params(
|
||||
self._rsa_cdata, dmp1, dmq1, iqmp
|
||||
)
|
||||
self._backend.openssl_assert(dmp1[0] != self._backend._ffi.NULL)
|
||||
self._backend.openssl_assert(dmq1[0] != self._backend._ffi.NULL)
|
||||
self._backend.openssl_assert(iqmp[0] != self._backend._ffi.NULL)
|
||||
return RSAPrivateNumbers(
|
||||
p=self._backend._bn_to_int(p[0]),
|
||||
q=self._backend._bn_to_int(q[0]),
|
||||
d=self._backend._bn_to_int(d[0]),
|
||||
dmp1=self._backend._bn_to_int(dmp1[0]),
|
||||
dmq1=self._backend._bn_to_int(dmq1[0]),
|
||||
iqmp=self._backend._bn_to_int(iqmp[0]),
|
||||
public_numbers=RSAPublicNumbers(
|
||||
e=self._backend._bn_to_int(e[0]),
|
||||
n=self._backend._bn_to_int(n[0]),
|
||||
),
|
||||
)
|
||||
|
||||
def private_bytes(
|
||||
self,
|
||||
encoding: serialization.Encoding,
|
||||
format: serialization.PrivateFormat,
|
||||
encryption_algorithm: serialization.KeySerializationEncryption,
|
||||
) -> bytes:
|
||||
return self._backend._private_key_bytes(
|
||||
encoding,
|
||||
format,
|
||||
encryption_algorithm,
|
||||
self,
|
||||
self._evp_pkey,
|
||||
self._rsa_cdata,
|
||||
)
|
||||
|
||||
def sign(
|
||||
self,
|
||||
data: bytes,
|
||||
padding: AsymmetricPadding,
|
||||
algorithm: typing.Union[asym_utils.Prehashed, hashes.HashAlgorithm],
|
||||
) -> bytes:
|
||||
data, algorithm = _calculate_digest_and_algorithm(data, algorithm)
|
||||
return _rsa_sig_sign(self._backend, padding, algorithm, self, data)
|
||||
|
||||
|
||||
class _RSAPublicKey(RSAPublicKey):
|
||||
_evp_pkey: object
|
||||
_rsa_cdata: object
|
||||
_key_size: int
|
||||
|
||||
def __init__(self, backend: "Backend", rsa_cdata, evp_pkey):
|
||||
self._backend = backend
|
||||
self._rsa_cdata = rsa_cdata
|
||||
self._evp_pkey = evp_pkey
|
||||
|
||||
n = self._backend._ffi.new("BIGNUM **")
|
||||
self._backend._lib.RSA_get0_key(
|
||||
self._rsa_cdata,
|
||||
n,
|
||||
self._backend._ffi.NULL,
|
||||
self._backend._ffi.NULL,
|
||||
)
|
||||
self._backend.openssl_assert(n[0] != self._backend._ffi.NULL)
|
||||
self._key_size = self._backend._lib.BN_num_bits(n[0])
|
||||
|
||||
@property
|
||||
def key_size(self) -> int:
|
||||
return self._key_size
|
||||
|
||||
def encrypt(self, plaintext: bytes, padding: AsymmetricPadding) -> bytes:
|
||||
return _enc_dec_rsa(self._backend, self, plaintext, padding)
|
||||
|
||||
def public_numbers(self) -> RSAPublicNumbers:
|
||||
n = self._backend._ffi.new("BIGNUM **")
|
||||
e = self._backend._ffi.new("BIGNUM **")
|
||||
self._backend._lib.RSA_get0_key(
|
||||
self._rsa_cdata, n, e, self._backend._ffi.NULL
|
||||
)
|
||||
self._backend.openssl_assert(n[0] != self._backend._ffi.NULL)
|
||||
self._backend.openssl_assert(e[0] != self._backend._ffi.NULL)
|
||||
return RSAPublicNumbers(
|
||||
e=self._backend._bn_to_int(e[0]),
|
||||
n=self._backend._bn_to_int(n[0]),
|
||||
)
|
||||
|
||||
def public_bytes(
|
||||
self,
|
||||
encoding: serialization.Encoding,
|
||||
format: serialization.PublicFormat,
|
||||
) -> bytes:
|
||||
return self._backend._public_key_bytes(
|
||||
encoding, format, self, self._evp_pkey, self._rsa_cdata
|
||||
)
|
||||
|
||||
def verify(
|
||||
self,
|
||||
signature: bytes,
|
||||
data: bytes,
|
||||
padding: AsymmetricPadding,
|
||||
algorithm: typing.Union[asym_utils.Prehashed, hashes.HashAlgorithm],
|
||||
) -> None:
|
||||
data, algorithm = _calculate_digest_and_algorithm(data, algorithm)
|
||||
_rsa_sig_verify(
|
||||
self._backend, padding, algorithm, self, signature, data
|
||||
)
|
||||
|
||||
def recover_data_from_signature(
|
||||
self,
|
||||
signature: bytes,
|
||||
padding: AsymmetricPadding,
|
||||
algorithm: typing.Optional[hashes.HashAlgorithm],
|
||||
) -> bytes:
|
||||
if isinstance(algorithm, asym_utils.Prehashed):
|
||||
raise TypeError(
|
||||
"Prehashed is only supported in the sign and verify methods. "
|
||||
"It cannot be used with recover_data_from_signature."
|
||||
)
|
||||
return _rsa_sig_recover(
|
||||
self._backend, padding, algorithm, self, signature
|
||||
)
|
||||
@@ -0,0 +1,52 @@
|
||||
# This file is dual licensed under the terms of the Apache License, Version
|
||||
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
|
||||
# for complete details.
|
||||
|
||||
import typing
|
||||
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
from cryptography.hazmat.primitives.asymmetric.utils import Prehashed
|
||||
|
||||
if typing.TYPE_CHECKING:
|
||||
from cryptography.hazmat.backends.openssl.backend import Backend
|
||||
|
||||
|
||||
def _evp_pkey_derive(backend: "Backend", evp_pkey, peer_public_key) -> bytes:
|
||||
ctx = backend._lib.EVP_PKEY_CTX_new(evp_pkey, backend._ffi.NULL)
|
||||
backend.openssl_assert(ctx != backend._ffi.NULL)
|
||||
ctx = backend._ffi.gc(ctx, backend._lib.EVP_PKEY_CTX_free)
|
||||
res = backend._lib.EVP_PKEY_derive_init(ctx)
|
||||
backend.openssl_assert(res == 1)
|
||||
res = backend._lib.EVP_PKEY_derive_set_peer(ctx, peer_public_key._evp_pkey)
|
||||
backend.openssl_assert(res == 1)
|
||||
keylen = backend._ffi.new("size_t *")
|
||||
res = backend._lib.EVP_PKEY_derive(ctx, backend._ffi.NULL, keylen)
|
||||
backend.openssl_assert(res == 1)
|
||||
backend.openssl_assert(keylen[0] > 0)
|
||||
buf = backend._ffi.new("unsigned char[]", keylen[0])
|
||||
res = backend._lib.EVP_PKEY_derive(ctx, buf, keylen)
|
||||
if res != 1:
|
||||
errors_with_text = backend._consume_errors_with_text()
|
||||
raise ValueError("Error computing shared key.", errors_with_text)
|
||||
|
||||
return backend._ffi.buffer(buf, keylen[0])[:]
|
||||
|
||||
|
||||
def _calculate_digest_and_algorithm(
|
||||
data: bytes,
|
||||
algorithm: typing.Union[Prehashed, hashes.HashAlgorithm],
|
||||
) -> typing.Tuple[bytes, hashes.HashAlgorithm]:
|
||||
if not isinstance(algorithm, Prehashed):
|
||||
hash_ctx = hashes.Hash(algorithm)
|
||||
hash_ctx.update(data)
|
||||
data = hash_ctx.finalize()
|
||||
else:
|
||||
algorithm = algorithm._algorithm
|
||||
|
||||
if len(data) != algorithm.digest_size:
|
||||
raise ValueError(
|
||||
"The provided data must be the same length as the hash "
|
||||
"algorithm's digest size."
|
||||
)
|
||||
|
||||
return (data, algorithm)
|
||||
@@ -0,0 +1,132 @@
|
||||
# This file is dual licensed under the terms of the Apache License, Version
|
||||
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
|
||||
# for complete details.
|
||||
|
||||
import typing
|
||||
|
||||
from cryptography.hazmat.backends.openssl.utils import _evp_pkey_derive
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
from cryptography.hazmat.primitives.asymmetric.x25519 import (
|
||||
X25519PrivateKey,
|
||||
X25519PublicKey,
|
||||
)
|
||||
|
||||
if typing.TYPE_CHECKING:
|
||||
from cryptography.hazmat.backends.openssl.backend import Backend
|
||||
|
||||
|
||||
_X25519_KEY_SIZE = 32
|
||||
|
||||
|
||||
class _X25519PublicKey(X25519PublicKey):
|
||||
def __init__(self, backend: "Backend", evp_pkey):
|
||||
self._backend = backend
|
||||
self._evp_pkey = evp_pkey
|
||||
|
||||
def public_bytes(
|
||||
self,
|
||||
encoding: serialization.Encoding,
|
||||
format: serialization.PublicFormat,
|
||||
) -> bytes:
|
||||
if (
|
||||
encoding is serialization.Encoding.Raw
|
||||
or format is serialization.PublicFormat.Raw
|
||||
):
|
||||
if (
|
||||
encoding is not serialization.Encoding.Raw
|
||||
or format is not serialization.PublicFormat.Raw
|
||||
):
|
||||
raise ValueError(
|
||||
"When using Raw both encoding and format must be Raw"
|
||||
)
|
||||
|
||||
return self._raw_public_bytes()
|
||||
|
||||
return self._backend._public_key_bytes(
|
||||
encoding, format, self, self._evp_pkey, None
|
||||
)
|
||||
|
||||
def _raw_public_bytes(self) -> bytes:
|
||||
ucharpp = self._backend._ffi.new("unsigned char **")
|
||||
res = self._backend._lib.EVP_PKEY_get1_tls_encodedpoint(
|
||||
self._evp_pkey, ucharpp
|
||||
)
|
||||
self._backend.openssl_assert(res == 32)
|
||||
self._backend.openssl_assert(ucharpp[0] != self._backend._ffi.NULL)
|
||||
data = self._backend._ffi.gc(
|
||||
ucharpp[0], self._backend._lib.OPENSSL_free
|
||||
)
|
||||
return self._backend._ffi.buffer(data, res)[:]
|
||||
|
||||
|
||||
class _X25519PrivateKey(X25519PrivateKey):
|
||||
def __init__(self, backend: "Backend", evp_pkey):
|
||||
self._backend = backend
|
||||
self._evp_pkey = evp_pkey
|
||||
|
||||
def public_key(self) -> X25519PublicKey:
|
||||
bio = self._backend._create_mem_bio_gc()
|
||||
res = self._backend._lib.i2d_PUBKEY_bio(bio, self._evp_pkey)
|
||||
self._backend.openssl_assert(res == 1)
|
||||
evp_pkey = self._backend._lib.d2i_PUBKEY_bio(
|
||||
bio, self._backend._ffi.NULL
|
||||
)
|
||||
self._backend.openssl_assert(evp_pkey != self._backend._ffi.NULL)
|
||||
evp_pkey = self._backend._ffi.gc(
|
||||
evp_pkey, self._backend._lib.EVP_PKEY_free
|
||||
)
|
||||
return _X25519PublicKey(self._backend, evp_pkey)
|
||||
|
||||
def exchange(self, peer_public_key: X25519PublicKey) -> bytes:
|
||||
if not isinstance(peer_public_key, X25519PublicKey):
|
||||
raise TypeError("peer_public_key must be X25519PublicKey.")
|
||||
|
||||
return _evp_pkey_derive(self._backend, self._evp_pkey, peer_public_key)
|
||||
|
||||
def private_bytes(
|
||||
self,
|
||||
encoding: serialization.Encoding,
|
||||
format: serialization.PrivateFormat,
|
||||
encryption_algorithm: serialization.KeySerializationEncryption,
|
||||
) -> bytes:
|
||||
if (
|
||||
encoding is serialization.Encoding.Raw
|
||||
or format is serialization.PublicFormat.Raw
|
||||
):
|
||||
if (
|
||||
format is not serialization.PrivateFormat.Raw
|
||||
or encoding is not serialization.Encoding.Raw
|
||||
or not isinstance(
|
||||
encryption_algorithm, serialization.NoEncryption
|
||||
)
|
||||
):
|
||||
raise ValueError(
|
||||
"When using Raw both encoding and format must be Raw "
|
||||
"and encryption_algorithm must be NoEncryption()"
|
||||
)
|
||||
|
||||
return self._raw_private_bytes()
|
||||
|
||||
return self._backend._private_key_bytes(
|
||||
encoding, format, encryption_algorithm, self, self._evp_pkey, None
|
||||
)
|
||||
|
||||
def _raw_private_bytes(self) -> bytes:
|
||||
# When we drop support for CRYPTOGRAPHY_OPENSSL_LESS_THAN_111 we can
|
||||
# switch this to EVP_PKEY_new_raw_private_key
|
||||
# The trick we use here is serializing to a PKCS8 key and just
|
||||
# using the last 32 bytes, which is the key itself.
|
||||
bio = self._backend._create_mem_bio_gc()
|
||||
res = self._backend._lib.i2d_PKCS8PrivateKey_bio(
|
||||
bio,
|
||||
self._evp_pkey,
|
||||
self._backend._ffi.NULL,
|
||||
self._backend._ffi.NULL,
|
||||
0,
|
||||
self._backend._ffi.NULL,
|
||||
self._backend._ffi.NULL,
|
||||
)
|
||||
self._backend.openssl_assert(res == 1)
|
||||
pkcs8 = self._backend._read_mem_bio(bio)
|
||||
self._backend.openssl_assert(len(pkcs8) == 48)
|
||||
return pkcs8[-_X25519_KEY_SIZE:]
|
||||
@@ -0,0 +1,117 @@
|
||||
# This file is dual licensed under the terms of the Apache License, Version
|
||||
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
|
||||
# for complete details.
|
||||
|
||||
import typing
|
||||
|
||||
from cryptography.hazmat.backends.openssl.utils import _evp_pkey_derive
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
from cryptography.hazmat.primitives.asymmetric.x448 import (
|
||||
X448PrivateKey,
|
||||
X448PublicKey,
|
||||
)
|
||||
|
||||
if typing.TYPE_CHECKING:
|
||||
from cryptography.hazmat.backends.openssl.backend import Backend
|
||||
|
||||
_X448_KEY_SIZE = 56
|
||||
|
||||
|
||||
class _X448PublicKey(X448PublicKey):
|
||||
def __init__(self, backend: "Backend", evp_pkey):
|
||||
self._backend = backend
|
||||
self._evp_pkey = evp_pkey
|
||||
|
||||
def public_bytes(
|
||||
self,
|
||||
encoding: serialization.Encoding,
|
||||
format: serialization.PublicFormat,
|
||||
) -> bytes:
|
||||
if (
|
||||
encoding is serialization.Encoding.Raw
|
||||
or format is serialization.PublicFormat.Raw
|
||||
):
|
||||
if (
|
||||
encoding is not serialization.Encoding.Raw
|
||||
or format is not serialization.PublicFormat.Raw
|
||||
):
|
||||
raise ValueError(
|
||||
"When using Raw both encoding and format must be Raw"
|
||||
)
|
||||
|
||||
return self._raw_public_bytes()
|
||||
|
||||
return self._backend._public_key_bytes(
|
||||
encoding, format, self, self._evp_pkey, None
|
||||
)
|
||||
|
||||
def _raw_public_bytes(self) -> bytes:
|
||||
buf = self._backend._ffi.new("unsigned char []", _X448_KEY_SIZE)
|
||||
buflen = self._backend._ffi.new("size_t *", _X448_KEY_SIZE)
|
||||
res = self._backend._lib.EVP_PKEY_get_raw_public_key(
|
||||
self._evp_pkey, buf, buflen
|
||||
)
|
||||
self._backend.openssl_assert(res == 1)
|
||||
self._backend.openssl_assert(buflen[0] == _X448_KEY_SIZE)
|
||||
return self._backend._ffi.buffer(buf, _X448_KEY_SIZE)[:]
|
||||
|
||||
|
||||
class _X448PrivateKey(X448PrivateKey):
|
||||
def __init__(self, backend: "Backend", evp_pkey):
|
||||
self._backend = backend
|
||||
self._evp_pkey = evp_pkey
|
||||
|
||||
def public_key(self) -> X448PublicKey:
|
||||
buf = self._backend._ffi.new("unsigned char []", _X448_KEY_SIZE)
|
||||
buflen = self._backend._ffi.new("size_t *", _X448_KEY_SIZE)
|
||||
res = self._backend._lib.EVP_PKEY_get_raw_public_key(
|
||||
self._evp_pkey, buf, buflen
|
||||
)
|
||||
self._backend.openssl_assert(res == 1)
|
||||
self._backend.openssl_assert(buflen[0] == _X448_KEY_SIZE)
|
||||
public_bytes = self._backend._ffi.buffer(buf)[:]
|
||||
return self._backend.x448_load_public_bytes(public_bytes)
|
||||
|
||||
def exchange(self, peer_public_key: X448PublicKey) -> bytes:
|
||||
if not isinstance(peer_public_key, X448PublicKey):
|
||||
raise TypeError("peer_public_key must be X448PublicKey.")
|
||||
|
||||
return _evp_pkey_derive(self._backend, self._evp_pkey, peer_public_key)
|
||||
|
||||
def private_bytes(
|
||||
self,
|
||||
encoding: serialization.Encoding,
|
||||
format: serialization.PrivateFormat,
|
||||
encryption_algorithm: serialization.KeySerializationEncryption,
|
||||
) -> bytes:
|
||||
if (
|
||||
encoding is serialization.Encoding.Raw
|
||||
or format is serialization.PublicFormat.Raw
|
||||
):
|
||||
if (
|
||||
format is not serialization.PrivateFormat.Raw
|
||||
or encoding is not serialization.Encoding.Raw
|
||||
or not isinstance(
|
||||
encryption_algorithm, serialization.NoEncryption
|
||||
)
|
||||
):
|
||||
raise ValueError(
|
||||
"When using Raw both encoding and format must be Raw "
|
||||
"and encryption_algorithm must be NoEncryption()"
|
||||
)
|
||||
|
||||
return self._raw_private_bytes()
|
||||
|
||||
return self._backend._private_key_bytes(
|
||||
encoding, format, encryption_algorithm, self, self._evp_pkey, None
|
||||
)
|
||||
|
||||
def _raw_private_bytes(self) -> bytes:
|
||||
buf = self._backend._ffi.new("unsigned char []", _X448_KEY_SIZE)
|
||||
buflen = self._backend._ffi.new("size_t *", _X448_KEY_SIZE)
|
||||
res = self._backend._lib.EVP_PKEY_get_raw_private_key(
|
||||
self._evp_pkey, buf, buflen
|
||||
)
|
||||
self._backend.openssl_assert(res == 1)
|
||||
self._backend.openssl_assert(buflen[0] == _X448_KEY_SIZE)
|
||||
return self._backend._ffi.buffer(buf, _X448_KEY_SIZE)[:]
|
||||
@@ -0,0 +1,45 @@
|
||||
# This file is dual licensed under the terms of the Apache License, Version
|
||||
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
|
||||
# for complete details.
|
||||
|
||||
|
||||
import warnings
|
||||
|
||||
from cryptography import utils, x509
|
||||
|
||||
|
||||
# This exists for pyOpenSSL compatibility and SHOULD NOT BE USED
|
||||
# WE WILL REMOVE THIS VERY SOON.
|
||||
def _Certificate(backend, x509) -> x509.Certificate: # noqa: N802
|
||||
warnings.warn(
|
||||
"This version of cryptography contains a temporary pyOpenSSL "
|
||||
"fallback path. Upgrade pyOpenSSL now.",
|
||||
utils.DeprecatedIn35,
|
||||
)
|
||||
return backend._ossl2cert(x509)
|
||||
|
||||
|
||||
# This exists for pyOpenSSL compatibility and SHOULD NOT BE USED
|
||||
# WE WILL REMOVE THIS VERY SOON.
|
||||
def _CertificateSigningRequest( # noqa: N802
|
||||
backend, x509_req
|
||||
) -> x509.CertificateSigningRequest:
|
||||
warnings.warn(
|
||||
"This version of cryptography contains a temporary pyOpenSSL "
|
||||
"fallback path. Upgrade pyOpenSSL now.",
|
||||
utils.DeprecatedIn35,
|
||||
)
|
||||
return backend._ossl2csr(x509_req)
|
||||
|
||||
|
||||
# This exists for pyOpenSSL compatibility and SHOULD NOT BE USED
|
||||
# WE WILL REMOVE THIS VERY SOON.
|
||||
def _CertificateRevocationList( # noqa: N802
|
||||
backend, x509_crl
|
||||
) -> x509.CertificateRevocationList:
|
||||
warnings.warn(
|
||||
"This version of cryptography contains a temporary pyOpenSSL "
|
||||
"fallback path. Upgrade pyOpenSSL now.",
|
||||
utils.DeprecatedIn35,
|
||||
)
|
||||
return backend._ossl2crl(x509_crl)
|
||||
Reference in New Issue
Block a user