-
-
Notifications
You must be signed in to change notification settings - Fork 2k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
add support for aes256-gcm and aes128-gcm
- Loading branch information
1 parent
d2e7c0c
commit 39871d2
Showing
2 changed files
with
161 additions
and
21 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,7 +30,7 @@ | |
from hashlib import md5, sha1, sha256, sha512 | ||
|
||
from cryptography.hazmat.backends import default_backend | ||
from cryptography.hazmat.primitives.ciphers import algorithms, Cipher, modes | ||
from cryptography.hazmat.primitives.ciphers import algorithms, Cipher, modes, aead | ||
|
||
import paramiko | ||
from paramiko import util | ||
|
@@ -179,6 +179,8 @@ class Transport(threading.Thread, ClosingContextManager): | |
"aes192-cbc", | ||
"aes256-cbc", | ||
"3des-cbc", | ||
"[email protected]", | ||
"[email protected]", | ||
) | ||
_preferred_macs = ( | ||
"hmac-sha2-256", | ||
|
@@ -237,44 +239,66 @@ class Transport(threading.Thread, ClosingContextManager): | |
"class": algorithms.AES, | ||
"mode": modes.CTR, | ||
"block-size": 16, | ||
"iv-size": 16, | ||
"key-size": 16, | ||
}, | ||
"aes192-ctr": { | ||
"class": algorithms.AES, | ||
"mode": modes.CTR, | ||
"block-size": 16, | ||
"iv-size": 16, | ||
"key-size": 24, | ||
}, | ||
"aes256-ctr": { | ||
"class": algorithms.AES, | ||
"mode": modes.CTR, | ||
"block-size": 16, | ||
"iv-size": 16, | ||
"key-size": 32, | ||
}, | ||
"aes128-cbc": { | ||
"class": algorithms.AES, | ||
"mode": modes.CBC, | ||
"block-size": 16, | ||
"iv-size": 16, | ||
"key-size": 16, | ||
}, | ||
"aes192-cbc": { | ||
"class": algorithms.AES, | ||
"mode": modes.CBC, | ||
"block-size": 16, | ||
"iv-size": 16, | ||
"key-size": 24, | ||
}, | ||
"aes256-cbc": { | ||
"class": algorithms.AES, | ||
"mode": modes.CBC, | ||
"block-size": 16, | ||
"iv-size": 16, | ||
"key-size": 32, | ||
}, | ||
"3des-cbc": { | ||
"class": TripleDES, | ||
"mode": modes.CBC, | ||
"block-size": 8, | ||
"iv-size": 8, | ||
"key-size": 24, | ||
}, | ||
# aead cipher | ||
"[email protected]": { | ||
"class": aead.AESGCM, | ||
"block-size": 16, | ||
"iv-size": 12, | ||
"key-size": 16, | ||
"is_aead": True, | ||
}, | ||
"[email protected]": { | ||
"class": aead.AESGCM, | ||
"block-size": 16, | ||
"iv-size": 12, | ||
"key-size": 32, | ||
"is_aead": True, | ||
}, | ||
} | ||
|
||
_mac_info = { | ||
|
@@ -2039,6 +2063,10 @@ def _get_cipher(self, name, key, iv, operation): | |
else: | ||
return cipher.decryptor() | ||
|
||
def _get_aead_cipher(self, name, key): | ||
aead_cipher = self._cipher_info[name]["class"](key) | ||
return aead_cipher | ||
|
||
def _set_forward_agent_handler(self, handler): | ||
if handler is None: | ||
|
||
|
@@ -2707,18 +2735,32 @@ def _activate_inbound(self): | |
inbound traffic""" | ||
block_size = self._cipher_info[self.remote_cipher]["block-size"] | ||
if self.server_mode: | ||
IV_in = self._compute_key("A", block_size) | ||
IV_in = self._compute_key( | ||
"A", self._cipher_info[self.remote_cipher]["iv-size"] | ||
) | ||
key_in = self._compute_key( | ||
"C", self._cipher_info[self.remote_cipher]["key-size"] | ||
) | ||
else: | ||
IV_in = self._compute_key("B", block_size) | ||
IV_in = self._compute_key( | ||
"B", self._cipher_info[self.remote_cipher]["iv-size"] | ||
) | ||
key_in = self._compute_key( | ||
"D", self._cipher_info[self.remote_cipher]["key-size"] | ||
) | ||
engine = self._get_cipher( | ||
self.remote_cipher, key_in, IV_in, self._DECRYPT | ||
|
||
is_aead = ( | ||
True | ||
if self._cipher_info[self.remote_cipher].get("is_aead") | ||
else False | ||
) | ||
|
||
if is_aead: | ||
engine = self._get_aead_cipher(self.remote_cipher, key_in) | ||
else: | ||
engine = self._get_cipher( | ||
self.remote_cipher, key_in, IV_in, self._DECRYPT | ||
) | ||
etm = "[email protected]" in self.remote_mac | ||
mac_size = self._mac_info[self.remote_mac]["size"] | ||
mac_engine = self._mac_info[self.remote_mac]["class"] | ||
|
@@ -2728,9 +2770,24 @@ def _activate_inbound(self): | |
mac_key = self._compute_key("E", mac_engine().digest_size) | ||
else: | ||
mac_key = self._compute_key("F", mac_engine().digest_size) | ||
self.packetizer.set_inbound_cipher( | ||
engine, block_size, mac_engine, mac_size, mac_key, etm=etm | ||
) | ||
|
||
if is_aead: | ||
self._log(DEBUG, "use aead-cipher, so set mac to None") | ||
self.packetizer.set_inbound_cipher( | ||
engine, | ||
block_size, | ||
None, | ||
16, | ||
bytes(), | ||
etm=False, | ||
aead=is_aead, | ||
iv_in=IV_in, | ||
) | ||
else: | ||
self.packetizer.set_inbound_cipher( | ||
engine, block_size, mac_engine, mac_size, mac_key, etm=etm | ||
) | ||
|
||
compress_in = self._compression_info[self.remote_compression][1] | ||
if compress_in is not None and ( | ||
self.remote_compression != "[email protected]" or self.authenticated | ||
|
@@ -2760,18 +2817,32 @@ def _activate_outbound(self): | |
self.packetizer.reset_seqno_out() | ||
block_size = self._cipher_info[self.local_cipher]["block-size"] | ||
if self.server_mode: | ||
IV_out = self._compute_key("B", block_size) | ||
IV_out = self._compute_key( | ||
"B", self._cipher_info[self.local_cipher]["iv-size"] | ||
) | ||
key_out = self._compute_key( | ||
"D", self._cipher_info[self.local_cipher]["key-size"] | ||
) | ||
else: | ||
IV_out = self._compute_key("A", block_size) | ||
IV_out = self._compute_key( | ||
"A", self._cipher_info[self.local_cipher]["iv-size"] | ||
) | ||
key_out = self._compute_key( | ||
"C", self._cipher_info[self.local_cipher]["key-size"] | ||
) | ||
engine = self._get_cipher( | ||
self.local_cipher, key_out, IV_out, self._ENCRYPT | ||
|
||
is_aead = ( | ||
True | ||
if self._cipher_info[self.local_cipher].get("is_aead") | ||
else False | ||
) | ||
|
||
if is_aead: | ||
engine = self._get_aead_cipher(self.local_cipher, key_out) | ||
else: | ||
engine = self._get_cipher( | ||
self.local_cipher, key_out, IV_out, self._ENCRYPT | ||
) | ||
etm = "[email protected]" in self.local_mac | ||
mac_size = self._mac_info[self.local_mac]["size"] | ||
mac_engine = self._mac_info[self.local_mac]["class"] | ||
|
@@ -2782,9 +2853,30 @@ def _activate_outbound(self): | |
else: | ||
mac_key = self._compute_key("E", mac_engine().digest_size) | ||
sdctr = self.local_cipher.endswith("-ctr") | ||
self.packetizer.set_outbound_cipher( | ||
engine, block_size, mac_engine, mac_size, mac_key, sdctr, etm=etm | ||
) | ||
|
||
if is_aead: | ||
self.packetizer.set_outbound_cipher( | ||
engine, | ||
block_size, | ||
None, | ||
16, | ||
bytes(), | ||
sdctr, | ||
etm=False, | ||
aead=is_aead, | ||
iv_out=IV_out, | ||
) | ||
else: | ||
self.packetizer.set_outbound_cipher( | ||
engine, | ||
block_size, | ||
mac_engine, | ||
mac_size, | ||
mac_key, | ||
sdctr, | ||
etm=etm, | ||
) | ||
|
||
compress_out = self._compression_info[self.local_compression][0] | ||
if compress_out is not None and ( | ||
self.local_compression != "[email protected]" or self.authenticated | ||
|