-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathencryption_algorithms.py
More file actions
102 lines (74 loc) · 3.82 KB
/
encryption_algorithms.py
File metadata and controls
102 lines (74 loc) · 3.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
import os
from abc import ABC, abstractmethod
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.hmac import HMAC
import tls
def hmac_sign(content, key, algorithm):
    """Compute the HMAC tag of ``content`` under ``key``.

    ``algorithm`` is passed unchanged to ``HMAC`` as its hash algorithm.
    Returns whatever ``HMAC.finalize()`` produces for the data fed in.
    """
    signer = HMAC(key, algorithm, backend=default_backend())
    signer.update(content)
    tag = signer.finalize()
    return tag
def hmac_verify(content, signature, key, algorithm):
    """Check ``signature`` against the HMAC of ``content`` under ``key``.

    The comparison is delegated to ``HMAC.verify``; this function returns
    nothing, so a mismatch surfaces only as the exception ``HMAC.verify``
    raises (``InvalidSignature`` according to the ``cryptography`` docs).
    """
    verifier = HMAC(key, algorithm, backend=default_backend())
    verifier.update(content)
    verifier.verify(signature)
class EncryptionAlgorithm(ABC):
    """Abstract interface shared by the record ciphers in this module.

    Concrete subclasses must provide both ``encrypt`` and ``decrypt``; the
    base implementations do nothing and return ``None``.
    """

    @abstractmethod
    def encrypt(self, tls_version, key, iv, content, *, add=None, sign_key=None, hash_algorithm=None):
        """Protect ``content`` and return the protected bytes.

        ``add``, ``sign_key`` and ``hash_algorithm`` are keyword-only; how
        (and whether) each one is used is up to the concrete subclass.
        """

    @abstractmethod
    def decrypt(self, tls_version, key, iv, encrypted, *, add=None, sign_key=None, hash_algorithm=None):
        """Reverse ``encrypt`` and return the recovered content.

        Takes the same keyword-only arguments as ``encrypt``.
        """
class AES(EncryptionAlgorithm):
    """AES-CBC record protection in MAC-then-encrypt form (TLS <= 1.2).

    ``encrypt`` appends an HMAC over ``add + len(content) + content`` to the
    content, pads the result TLS-style and encrypts it under a fresh random
    IV that is prepended to the output.  ``decrypt`` reverses those steps.
    """

    def encrypt(self, tls_version, key, iv, content, *, add=None, sign_key=None, hash_algorithm=None):
        """Return ``random_iv + AES-CBC(content + mac + padding)``.

        Args:
            tls_version: compared against ``tls.TLSV1_2``; anything not
                ``<=`` that value is rejected.
            key: AES key.
            iv: only its length is used; the actual IV is freshly random.
            content: plaintext bytes; its length must fit in two bytes.
            add: data MACed ahead of the 2-byte length and the content.
                Must be bytes-like -- the ``None`` default raises TypeError.
            sign_key: HMAC key.
            hash_algorithm: hash algorithm handed to ``hmac_sign``.

        Raises:
            NotImplementedError: for versions above ``tls.TLSV1_2``.
        """
        # Reject unsupported versions before doing any cryptographic work.
        if not tls_version <= tls.TLSV1_2:
            raise NotImplementedError('TLSV1.3 todo')

        # Build the MAC input as a new object: an in-place ``add += ...``
        # would silently modify a caller-owned bytearray.
        mac_input = add + len(content).to_bytes(2, 'big') + content
        signature = hmac_sign(mac_input, sign_key, algorithm=hash_algorithm)

        iv = os.urandom(len(iv))  # Use a random iv, len is always 16 for AES block ciphers
        # block_size is deliberately left untouched: CBC compares the IV
        # length against it, so overriding it would disable that check.
        encryptor = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend()).encryptor()

        # TLS padding: ``pad_total`` bytes (1..16), each holding the value
        # ``pad_total - 1``, so the last byte doubles as the length byte.
        plaintext = content + signature
        pad_total = 16 - len(plaintext) % 16
        plaintext += bytes([pad_total - 1]) * pad_total

        return iv + encryptor.update(plaintext) + encryptor.finalize()

    def decrypt(self, tls_version, key, iv, encrypted, *, add=None, sign_key=None, hash_algorithm=None):
        """Decrypt a record produced by ``encrypt`` and return its content.

        Args:
            tls_version: accepted for interface compatibility; not consulted.
            key: AES key.
            iv: only its length is used, to split the per-record IV off the
                front of ``encrypted``.
            encrypted: ``record_iv + ciphertext``.
            add: data that was MACed ahead of the length and content; must
                be bytes-like (``None`` raises TypeError).
            sign_key: HMAC key.
            hash_algorithm: hash algorithm; its ``digest_size`` determines
                how many trailing bytes are the MAC.

        Raises:
            ValueError: if the decrypted data does not end in valid TLS
                padding (or the ciphertext is not block-aligned).
            Whatever ``hmac_verify`` raises when the MAC does not match.
        """
        iv_len = len(iv)
        record_iv, ciphertext = encrypted[:iv_len], encrypted[iv_len:]
        decryptor = Cipher(algorithms.AES(key), modes.CBC(record_iv), backend=default_backend()).decryptor()
        padded = decryptor.update(ciphertext) + decryptor.finalize()

        # A PKCS7 unpadder cannot be used here: it rejects the pad value 0
        # that ``encrypt`` emits when a single padding byte is needed, and it
        # rejects the longer-than-one-block padding TLS allows.
        unpadded = self._strip_tls_padding(padded)

        digest_size = hash_algorithm.digest_size
        signature = unpadded[-digest_size:]
        content = unpadded[:-digest_size]

        mac_input = add + len(content).to_bytes(2, 'big') + content
        hmac_verify(mac_input, signature, sign_key, hash_algorithm)
        return content

    @staticmethod
    def _strip_tls_padding(data):
        """Validate and remove TLS CBC padding from ``data``.

        The final byte gives the pad value ``p``; the last ``p + 1`` bytes
        must all equal ``p``.  Raises ValueError otherwise.
        """
        # NOTE(review): this check is not constant-time; MAC-then-encrypt CBC
        # is known to be sensitive to padding-oracle timing -- confirm whether
        # that matters for this code's threat model.
        if not data:
            raise ValueError('Invalid padding bytes.')
        pad_value = data[-1]
        pad_total = pad_value + 1
        if pad_total > len(data) or data[-pad_total:] != bytes([pad_value]) * pad_total:
            raise ValueError('Invalid padding bytes.')
        return data[:-pad_total]
class AESGCM(EncryptionAlgorithm):
    """AES-GCM record protection.

    The GCM nonce is ``iv`` followed by ``nonce_size`` random bytes generated
    per call; those random bytes travel at the front of the output.  The
    associated data is ``add`` followed by the 2-byte plaintext length.
    ``tls_version``, ``sign_key`` and ``hash_algorithm`` are accepted for
    interface compatibility and ignored by both methods.
    """

    # Number of random nonce bytes generated per record and sent with it.
    nonce_size = 8

    def encrypt(self, tls_version, key, iv, content, *, add=None, sign_key=None, hash_algorithm=None):
        """Return ``explicit_nonce + ciphertext + tag`` for ``content``."""
        length_field = len(content).to_bytes(2, 'big')
        explicit_nonce = os.urandom(AESGCM.nonce_size)

        aes = algorithms.AES(key)
        gcm_mode = modes.GCM(iv + explicit_nonce)
        encryptor = Cipher(aes, gcm_mode, backend=default_backend()).encryptor()
        encryptor.authenticate_additional_data(add + length_field)

        sealed = encryptor.update(content)
        sealed += encryptor.finalize()
        # The tag is only available once the encryptor has been finalized.
        return b''.join((explicit_nonce, sealed, encryptor.tag))

    def decrypt(self, tls_version, key, iv, encrypted, *, add=None, sign_key=None, hash_algorithm=None):
        """Authenticate and decrypt ``explicit_nonce + ciphertext + tag``.

        The trailing 16 bytes of ``encrypted`` are taken as the GCM tag.
        Tag verification is performed by ``finalize_with_tag``.
        """
        aes = algorithms.AES(key)
        explicit_nonce = encrypted[:AESGCM.nonce_size]
        body = encrypted[AESGCM.nonce_size:]
        gcm_mode = modes.GCM(iv + explicit_nonce)
        decryptor = Cipher(aes, gcm_mode, backend=default_backend()).decryptor()

        tag_size = 16
        # The plaintext length authenticated by the sender equals the
        # ciphertext length, i.e. everything in ``body`` except the tag.
        plaintext_len = len(body) - tag_size
        decryptor.authenticate_additional_data(add + plaintext_len.to_bytes(2, 'big'))

        ciphertext, tag = body[:-tag_size], body[-tag_size:]
        plaintext = decryptor.update(ciphertext)
        plaintext += decryptor.finalize_with_tag(tag)
        return plaintext