mirror of
https://github.com/espressif/esp-idf.git
synced 2024-10-05 20:47:46 -04:00
9aefcb12f5
- Removed python 2 compatibility - Removed dependencies on redundant external modules - Interactive provisioning input for security scheme 2 - Style changes: Updated print statements to format strings Colored verbose logging Raised exceptions on errors instead of clean exits
166 lines
6.6 KiB
Python
166 lines
6.6 KiB
Python
# SPDX-FileCopyrightText: 2018-2022 Espressif Systems (Shanghai) CO LTD
|
|
# SPDX-License-Identifier: Apache-2.0
|
|
|
|
# APIs for interpreting and creating protobuf packets for
|
|
# protocomm endpoint with security type protocomm_security2
|
|
|
|
|
|
from typing import Any, Type
|
|
|
|
import proto
|
|
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
|
|
from utils import long_to_bytes, str_to_bytes
|
|
|
|
from .security import Security
|
|
from .srp6a import Srp6a, generate_salt_and_verifier
|
|
|
|
# Length in bytes of the AES-GCM session key (256 bits); the shared secret is truncated to this many bytes
AES_KEY_LEN = 256 // 8
|
|
|
|
|
|
# Enum for state of protocomm_security2 FSM
|
|
class security_state:
    """Integer identifiers for the steps of the session state machine.

    The values are consecutive integers starting at 0, listed in the
    order the handshake walks through them.
    """

    REQUEST1, RESPONSE1_REQUEST2, RESPONSE2, FINISHED = range(4)
|
|
|
|
|
|
def sec2_gen_salt_verifier(username: str, password: str, salt_len: int) -> Any:
    """Print C array definitions for a freshly generated salt and verifier.

    Calls ``generate_salt_and_verifier`` with the given credentials and
    salt length, then writes two ``static const char`` array definitions
    (``sec2_salt`` and ``sec2_verifier``) to standard output.

    :param username: user name passed to the salt/verifier generator
    :param password: password passed to the salt/verifier generator
    :param salt_len: forwarded to the generator as ``len_s``
    :return: nothing; the result is only printed
    """
    def _as_c_rows(blob: Any) -> str:
        # Each element becomes a '0x'-prefixed, zero-padded hex literal.
        # NOTE(review): assumes blob iterates as ints (e.g. bytes) - confirm
        hex_items = ', '.join(format(octet, '#04x') for octet in blob)
        # Wrap every 96 characters, i.e. 16 entries of the form '0xNN, ' per row
        rows = [hex_items[pos: pos + 96] for pos in range(0, len(hex_items), 96)]
        return '\n '.join(rows)

    salt, verifier = generate_salt_and_verifier(username, password, len_s=salt_len)

    salt_c_arr = _as_c_rows(salt)
    print(f'static const char sec2_salt[] = {{\n {salt_c_arr}\n}};\n')

    verifier_c_arr = _as_c_rows(verifier)
    print(f'static const char sec2_verifier[] = {{\n {verifier_c_arr}\n}};\n')
|
|
|
|
|
|
class Security2(Security):
    """Client side of the protocomm security2 session.

    Drives a three-step state machine (see ``security_state``) that builds
    and parses the protobuf session packets of security scheme 2, using an
    ``Srp6a`` context for the key exchange. Once the handshake has finished,
    ``encrypt_data``/``decrypt_data`` use AES-GCM with the derived session
    key and the nonce supplied by the device.
    """

    def __init__(self, username: str, password: str, verbose: bool) -> None:
        """Store the credentials and register the session handler.

        :param username: user name sent to the device and given to SRP6a
        :param password: password given to SRP6a
        :param verbose: when true, handshake values are printed
        """
        # Initialize state of the security2 FSM
        self.session_state = security_state.REQUEST1
        self.username = username
        self.password = password
        self.verbose = verbose

        # Declared only; both are assigned while the handshake runs
        self.srp6a_ctx: Srp6a
        self.cipher: AESGCM

        self.client_pop_key = None
        self.nonce = None

        super().__init__(self.security2_session)

    def security2_session(self, response_data: bytes) -> Any:
        """Advance the session state machine by one step.

        :param response_data: last response received from the device; it is
            not read while in the initial ``REQUEST1`` state
        :return: the next request packet to send, or ``None`` when the
            handshake has finished or the state is unexpected
        """
        # protocomm security2 FSM which interprets/forms
        # protobuf packets according to present state of session.
        # The state is advanced before the step's work is done, so a failure
        # in a step leaves the FSM in the following state.
        if self.session_state == security_state.REQUEST1:
            self.session_state = security_state.RESPONSE1_REQUEST2
            return self.setup0_request()

        if self.session_state == security_state.RESPONSE1_REQUEST2:
            self.session_state = security_state.RESPONSE2
            self.setup0_response(response_data)
            return self.setup1_request()

        if self.session_state == security_state.RESPONSE2:
            self.session_state = security_state.FINISHED
            self.setup1_response(response_data)
            return None

        print('---- Unexpected state! ----')
        return None

    def _print_verbose(self, data: str) -> None:
        """Print ``data`` in green between ``++++`` markers when verbose is set."""
        if self.verbose:
            print(f'\x1b[32;20m++++ {data} ++++\x1b[0m')

    def setup0_request(self) -> str:
        """Form the SessionCmd0 request carrying the user name and client public key.

        Creates the SRP6a context from the stored credentials.

        :return: the serialized packet, decoded as latin-1 text
        """
        setup_req = proto.session_pb2.SessionData()
        setup_req.sec_ver = proto.session_pb2.SecScheme2
        setup_req.sec2.msg = proto.sec2_pb2.S2Session_Command0

        setup_req.sec2.sc0.client_username = str_to_bytes(self.username)
        self.srp6a_ctx = Srp6a(self.username, self.password)

        client_pubkey = long_to_bytes(self.srp6a_ctx.A)
        setup_req.sec2.sc0.client_pubkey = client_pubkey

        self._print_verbose(f'Client Public Key:\t0x{client_pubkey.hex()}')
        return setup_req.SerializeToString().decode('latin-1')

    def setup0_response(self, response_data: bytes) -> None:
        """Interpret the SessionResp0 response and compute the client proof.

        :param response_data: raw response from the device
        :raises RuntimeError: if the response is not for security scheme 2
        """
        setup_resp = proto.session_pb2.SessionData()
        setup_resp.ParseFromString(str_to_bytes(response_data))
        self._print_verbose(f'Security version:\t{str(setup_resp.sec_ver)}')
        if setup_resp.sec_ver != proto.session_pb2.SecScheme2:
            raise RuntimeError('Incorrect security scheme')

        # Device public key and random salt
        device_pubkey = setup_resp.sec2.sr0.device_pubkey
        device_salt = setup_resp.sec2.sr0.device_salt

        self._print_verbose(f'Device Public Key:\t0x{device_pubkey.hex()}')
        self.client_pop_key = self.srp6a_ctx.process_challenge(device_salt, device_pubkey)

    def setup1_request(self) -> str:
        """Form the SessionCmd1 request carrying the client proof.

        :return: the serialized packet, decoded as latin-1 text
        :raises RuntimeError: if no client proof is available
        """
        setup_req = proto.session_pb2.SessionData()
        setup_req.sec_ver = proto.session_pb2.SecScheme2
        setup_req.sec2.msg = proto.sec2_pb2.S2Session_Command1

        # The proof comes from process_challenge() in setup0_response()
        if self.client_pop_key is None:
            raise RuntimeError('Failed to generate client proof!')

        self._print_verbose(f'Client Proof:\t0x{self.client_pop_key.hex()}')
        setup_req.sec2.sc1.client_proof = self.client_pop_key

        return setup_req.SerializeToString().decode('latin-1')

    def setup1_response(self, response_data: bytes) -> None:
        """Interpret the SessionResp1 response and set up the AES-GCM cipher.

        Verifies the device proof, derives the session key from the SRP6a
        session key and stores the nonce sent by the device.

        :param response_data: raw response from the device
        :raises RuntimeError: if the security scheme does not match, the
            device proof cannot be verified, or the device sent no nonce
        """
        setup_resp = proto.session_pb2.SessionData()
        setup_resp.ParseFromString(str_to_bytes(response_data))

        # Ensure security scheme matches
        if setup_resp.sec_ver != proto.session_pb2.SecScheme2:
            raise RuntimeError('Unsupported security protocol')

        # Read device proof string
        device_proof = setup_resp.sec2.sr1.device_proof
        self._print_verbose(f'Device Proof:\t0x{device_proof.hex()}')
        self.srp6a_ctx.verify_session(device_proof)
        if not self.srp6a_ctx.authenticated():
            raise RuntimeError('Failed to verify device proof')

        # Getting the shared secret
        shared_secret = self.srp6a_ctx.get_session_key()
        self._print_verbose(f'Shared Secret:\t0x{shared_secret.hex()}')

        # Only the first AES_KEY_LEN bytes of the shared secret are used as key
        session_key = shared_secret[:AES_KEY_LEN]
        self._print_verbose(f'Session Key:\t0x{session_key.hex()}')

        # A protobuf bytes field reads as b'' when unset, never None,
        # so test for emptiness to catch a missing nonce
        self.nonce = setup_resp.sec2.sr1.device_nonce
        if not self.nonce:
            raise RuntimeError('Received invalid nonce from device!')
        self._print_verbose(f'Nonce:\t0x{self.nonce.hex()}')

        # Initialize the encryption engine with the session key
        self.cipher = AESGCM(session_key)

    def encrypt_data(self, data: bytes) -> Any:
        """Encrypt ``data`` with the session cipher and nonce, without associated data.

        Only usable after ``setup1_response`` has succeeded.
        """
        # NOTE(review): the same device-provided nonce is used for every call;
        # confirm this matches the device-side protocol, since AES-GCM nonce
        # reuse under one key is unsafe.
        return self.cipher.encrypt(self.nonce, data, None)

    def decrypt_data(self, data: bytes) -> Any:
        """Decrypt ``data`` with the session cipher and nonce, without associated data.

        Only usable after ``setup1_response`` has succeeded.
        """
        return self.cipher.decrypt(self.nonce, data, None)
|