mirror of
https://github.com/espressif/esp-idf.git
synced 2024-10-05 20:47:46 -04:00
55bbd9c95a
This commit adds one more config option under the partition table menu to support large images during OTA. It also updates the default config for the simple OTA example.
404 lines
19 KiB
Python
404 lines
19 KiB
Python
# SPDX-FileCopyrightText: 2022-2024 Espressif Systems (Shanghai) CO LTD
|
|
# SPDX-License-Identifier: Unlicense OR CC0-1.0
|
|
import http.server
|
|
import multiprocessing
|
|
import os
|
|
import ssl
|
|
import subprocess
|
|
import sys
|
|
from typing import Optional
|
|
from typing import Tuple
|
|
|
|
import pexpect
|
|
import pytest
|
|
from pytest_embedded import Dut
|
|
|
|
try:
|
|
from common_test_methods import get_env_config_variable, get_host_ip4_by_dest_ip
|
|
except ModuleNotFoundError:
|
|
idf_path = os.environ['IDF_PATH']
|
|
sys.path.insert(0, idf_path + '/tools/ci/python_packages')
|
|
from common_test_methods import get_env_config_variable, get_host_ip4_by_dest_ip
|
|
|
|
# PEM-encoded certificate presented by the local HTTPS/TLS servers started in
# this file. It is written to 'server_cert.pem' in the image directory whenever
# no certificate path is supplied by the caller.
server_cert = '-----BEGIN CERTIFICATE-----\n' \
    'MIIDWDCCAkACCQCbF4+gVh/MLjANBgkqhkiG9w0BAQsFADBuMQswCQYDVQQGEwJJ\n'\
    'TjELMAkGA1UECAwCTUgxDDAKBgNVBAcMA1BVTjEMMAoGA1UECgwDRVNQMQwwCgYD\n'\
    'VQQLDANFU1AxDDAKBgNVBAMMA0VTUDEaMBgGCSqGSIb3DQEJARYLZXNwQGVzcC5j\n'\
    'b20wHhcNMjEwNzEyMTIzNjI3WhcNNDEwNzA3MTIzNjI3WjBuMQswCQYDVQQGEwJJ\n'\
    'TjELMAkGA1UECAwCTUgxDDAKBgNVBAcMA1BVTjEMMAoGA1UECgwDRVNQMQwwCgYD\n'\
    'VQQLDANFU1AxDDAKBgNVBAMMA0VTUDEaMBgGCSqGSIb3DQEJARYLZXNwQGVzcC5j\n'\
    'b20wggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDhxF/y7bygndxPwiWL\n'\
    'SwS9LY3uBMaJgup0ufNKVhx+FhGQOu44SghuJAaH3KkPUnt6SOM8jC97/yQuc32W\n'\
    'ukI7eBZoA12kargSnzdv5m5rZZpd+NznSSpoDArOAONKVlzr25A1+aZbix2mKRbQ\n'\
    'S5w9o1N2BriQuSzd8gL0Y0zEk3VkOWXEL+0yFUT144HnErnD+xnJtHe11yPO2fEz\n'\
    'YaGiilh0ddL26PXTugXMZN/8fRVHP50P2OG0SvFpC7vghlLp4VFM1/r3UJnvL6Oz\n'\
    '3ALc6dhxZEKQucqlpj8l1UegszQToopemtIj0qXTHw2+uUnkUyWIPjPC+wdOAoap\n'\
    'rFTRAgMBAAEwDQYJKoZIhvcNAQELBQADggEBAItw24y565k3C/zENZlxyzto44ud\n'\
    'IYPQXN8Fa2pBlLe1zlSIyuaA/rWQ+i1daS8nPotkCbWZyf5N8DYaTE4B0OfvoUPk\n'\
    'B5uGDmbuk6akvlB5BGiYLfQjWHRsK9/4xjtIqN1H58yf3QNROuKsPAeywWS3Fn32\n'\
    '3//OpbWaClQePx6udRYMqAitKR+QxL7/BKZQsX+UyShuq8hjphvXvk0BW8ONzuw9\n'\
    'RcoORxM0FzySYjeQvm4LhzC/P3ZBhEq0xs55aL2a76SJhq5hJy7T/Xz6NFByvlrN\n'\
    'lFJJey33KFrAf5vnV9qcyWFIo7PYy2VsaaEjFeefr7q3sTFSMlJeadexW2Y=\n'\
    '-----END CERTIFICATE-----\n'
|
|
|
|
# PEM-encoded private key loaded together with `server_cert` by the local
# HTTPS/TLS servers started in this file. It is written to 'server_key.pem'
# whenever no key path is supplied by the caller.
# NOTE(review): the key is embedded in source, so it must only ever be used for
# these local test servers.
server_key = '-----BEGIN PRIVATE KEY-----\n'\
    'MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQDhxF/y7bygndxP\n'\
    'wiWLSwS9LY3uBMaJgup0ufNKVhx+FhGQOu44SghuJAaH3KkPUnt6SOM8jC97/yQu\n'\
    'c32WukI7eBZoA12kargSnzdv5m5rZZpd+NznSSpoDArOAONKVlzr25A1+aZbix2m\n'\
    'KRbQS5w9o1N2BriQuSzd8gL0Y0zEk3VkOWXEL+0yFUT144HnErnD+xnJtHe11yPO\n'\
    '2fEzYaGiilh0ddL26PXTugXMZN/8fRVHP50P2OG0SvFpC7vghlLp4VFM1/r3UJnv\n'\
    'L6Oz3ALc6dhxZEKQucqlpj8l1UegszQToopemtIj0qXTHw2+uUnkUyWIPjPC+wdO\n'\
    'AoaprFTRAgMBAAECggEAE0HCxV/N1Q1h+1OeDDGL5+74yjKSFKyb/vTVcaPCrmaH\n'\
    'fPvp0ddOvMZJ4FDMAsiQS6/n4gQ7EKKEnYmwTqj4eUYW8yxGUn3f0YbPHbZT+Mkj\n'\
    'z5woi3nMKi/MxCGDQZX4Ow3xUQlITUqibsfWcFHis8c4mTqdh4qj7xJzehD2PVYF\n'\
    'gNHZsvVj6MltjBDAVwV1IlGoHjuElm6vuzkfX7phxcA1B4ZqdYY17yCXUnvui46z\n'\
    'Xn2kUTOOUCEgfgvGa9E+l4OtdXi5IxjaSraU+dlg2KsE4TpCuN2MEVkeR5Ms3Y7Q\n'\
    'jgJl8vlNFJDQpbFukLcYwG7rO5N5dQ6WWfVia/5XgQKBgQD74at/bXAPrh9NxPmz\n'\
    'i1oqCHMDoM9sz8xIMZLF9YVu3Jf8ux4xVpRSnNy5RU1gl7ZXbpdgeIQ4v04zy5aw\n'\
    '8T4tu9K3XnR3UXOy25AK0q+cnnxZg3kFQm+PhtOCKEFjPHrgo2MUfnj+EDddod7N\n'\
    'JQr9q5rEFbqHupFPpWlqCa3QmQKBgQDldWUGokNaEpmgHDMnHxiibXV5LQhzf8Rq\n'\
    'gJIQXb7R9EsTSXEvsDyqTBb7PHp2Ko7rZ5YQfyf8OogGGjGElnPoU/a+Jij1gVFv\n'\
    'kZ064uXAAISBkwHdcuobqc5EbG3ceyH46F+FBFhqM8KcbxJxx08objmh58+83InN\n'\
    'P9Qr25Xw+QKBgEGXMHuMWgQbSZeM1aFFhoMvlBO7yogBTKb4Ecpu9wI5e3Kan3Al\n'\
    'pZYltuyf+VhP6XG3IMBEYdoNJyYhu+nzyEdMg8CwXg+8LC7FMis/Ve+o7aS5scgG\n'\
    '1to/N9DK/swCsdTRdzmc/ZDbVC+TuVsebFBGYZTyO5KgqLpezqaIQrTxAoGALFCU\n'\
    '10glO9MVyl9H3clap5v+MQ3qcOv/EhaMnw6L2N6WVT481tnxjW4ujgzrFcE4YuxZ\n'\
    'hgwYu9TOCmeqopGwBvGYWLbj+C4mfSahOAs0FfXDoYazuIIGBpuv03UhbpB1Si4O\n'\
    'rJDfRnuCnVWyOTkl54gKJ2OusinhjztBjcrV1XkCgYEA3qNi4uBsPdyz9BZGb/3G\n'\
    'rOMSw0CaT4pEMTLZqURmDP/0hxvTk1polP7O/FYwxVuJnBb6mzDa0xpLFPTpIAnJ\n'\
    'YXB8xpXU69QVh+EBbemdJWOd+zp5UCfXvb2shAeG3Tn/Dz4cBBMEUutbzP+or0nG\n'\
    'vSXnRLaxQhooWm+IuX9SuBQ=\n'\
    '-----END PRIVATE KEY-----\n'
|
|
|
|
# Partition offsets the tests expect to see in the device log lines
# 'Loaded app from partition at offset ...': OTA1_ADDRESS before the update,
# OTA2_ADDRESS after rebooting into the downloaded image. Kept as strings
# because they are interpolated verbatim into the expected log text.
OTA1_ADDRESS = '0x20000'
OTA2_ADDRESS = '0x1d0000'
|
|
|
|
|
|
def start_https_server(ota_image_dir: str, server_ip: str, server_port: int, server_file: Optional[str] = None, key_file: Optional[str] = None) -> None:
    """Serve the contents of ``ota_image_dir`` over HTTPS; blocks forever.

    The process working directory is changed to ``ota_image_dir`` because
    ``SimpleHTTPRequestHandler`` serves files relative to the current
    directory. Intended to run in a dedicated process that the caller
    terminates when done.

    :param ota_image_dir: directory holding the OTA image(s) to serve
    :param server_ip: address to bind to ('' or '0.0.0.0' for all interfaces)
    :param server_port: TCP port to listen on
    :param server_file: path of the PEM certificate; when ``None`` the built-in
        ``server_cert`` is written to ``server_cert.pem`` in ``ota_image_dir``
    :param key_file: path of the PEM private key; when ``None`` the built-in
        ``server_key`` is written to ``server_key.pem`` in ``ota_image_dir``
    """
    # Resolve to an absolute path *before* chdir: otherwise a relative
    # directory would be joined a second time below and point to a
    # non-existent nested location.
    ota_image_dir = os.path.abspath(ota_image_dir)
    os.chdir(ota_image_dir)

    if server_file is None:
        server_file = os.path.join(ota_image_dir, 'server_cert.pem')
        with open(server_file, 'w+') as cert_file_handle:
            cert_file_handle.write(server_cert)

    if key_file is None:
        key_file = os.path.join(ota_image_dir, 'server_key.pem')
        # Write to the very path that is loaded below instead of relying on
        # the current directory happening to match it.
        with open(key_file, 'w+') as key_file_handle:
            key_file_handle.write(server_key)

    httpd = http.server.HTTPServer((server_ip, server_port), http.server.SimpleHTTPRequestHandler)

    ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    ssl_context.load_cert_chain(certfile=server_file, keyfile=key_file)

    # Replace the plain listening socket with its TLS-wrapped counterpart.
    httpd.socket = ssl_context.wrap_socket(httpd.socket, server_side=True)
    httpd.serve_forever()
|
|
|
|
|
|
def start_tls1_3_server(ota_image_dir: str, server_port: int) -> subprocess.Popen:
    """Launch ``openssl s_server`` restricted to TLS 1.3 and return its process.

    The built-in ``server_cert`` / ``server_key`` are written to
    ``server_cert.pem`` / ``server_key.pem`` inside ``ota_image_dir`` and
    handed to OpenSSL. The working directory is changed to ``ota_image_dir``
    so that the child process, which inherits it, serves the image files from
    there (``-WWW`` resolves requested paths relative to the current
    directory).

    :param ota_image_dir: directory holding the OTA image(s) to serve
    :param server_port: TCP port the server listens on
    :return: handle of the running ``openssl`` process; the caller is
        responsible for stopping it
    """
    # Resolve to an absolute path *before* chdir so the joined paths below
    # remain valid when a relative directory is given.
    ota_image_dir = os.path.abspath(ota_image_dir)
    os.chdir(ota_image_dir)

    server_file = os.path.join(ota_image_dir, 'server_cert.pem')
    with open(server_file, 'w+') as cert_file_handle:
        cert_file_handle.write(server_cert)

    key_file = os.path.join(ota_image_dir, 'server_key.pem')
    # Write to the same path that is passed to openssl via '-key'.
    with open(key_file, 'w+') as key_file_handle:
        key_file_handle.write(server_key)

    chunked_server = subprocess.Popen(['openssl', 's_server', '-tls1_3', '-WWW', '-key', key_file, '-cert', server_file, '-port', str(server_port)])
    return chunked_server
|
|
|
|
|
|
def check_sha256(sha256_expected: str, sha256_reported: str) -> None:
    """Verify that the expected digest occurs within the reported text.

    Both values are printed first. The check is a substring test, so
    ``sha256_reported`` may contain surrounding log text.

    :param sha256_expected: digest computed on the host
    :param sha256_reported: text captured from the device
    :raises ValueError: if ``sha256_expected`` is not contained in
        ``sha256_reported``
    """
    print('sha256_expected: %s' % (sha256_expected))
    print('sha256_reported: %s' % (sha256_reported))

    if sha256_expected in sha256_reported:
        print('SHA256 expected and reported are the same')
        return

    raise ValueError('SHA256 mismatch')
|
|
|
|
|
|
def calc_all_sha256(dut: Dut) -> Tuple[str, str]:
    """Return the SHA-256 digests of the bootloader and application binaries.

    Both files are looked up under ``dut.app.binary_path`` and hashed with
    ``dut.app.get_sha256``.

    :param dut: device under test whose ``app`` provides the build directory
    :return: ``(bootloader_digest, app_digest)``, each converted with ``str``
    """
    app = dut.app

    bootloader_digest = app.get_sha256(
        os.path.join(app.binary_path, 'bootloader', 'bootloader.bin')
    )
    firmware_digest = app.get_sha256(
        os.path.join(app.binary_path, 'simple_ota.bin')
    )

    return str(bootloader_digest), str(firmware_digest)
|
|
|
|
|
|
@pytest.mark.esp32
@pytest.mark.esp32c3
@pytest.mark.esp32s3
@pytest.mark.wifi_high_traffic
def test_examples_protocol_simple_ota_example(dut: Dut) -> None:
    """
    steps: |
      1. join AP/Ethernet
      2. Fetch OTA image over HTTPS
      3. Reboot with the new OTA image
    """
    sha256_bootloader, sha256_app = calc_all_sha256(dut)
    # Start server (in a separate process, since serving blocks forever)
    thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, '0.0.0.0', 8000))
    thread1.daemon = True
    thread1.start()
    try:
        # start test
        dut.expect(f'Loaded app from partition at offset {OTA1_ADDRESS}', timeout=30)
        check_sha256(sha256_bootloader, str(dut.expect(r'SHA-256 for bootloader:\s+([a-f0-9]){64}')[0]))
        check_sha256(sha256_app, str(dut.expect(r'SHA-256 for current firmware:\s+([a-f0-9]){64}')[0]))
        # Parse IP address of STA
        if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True:
            env_name = 'wifi_high_traffic'
            dut.expect('Please input ssid password:')
            ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
            ap_password = get_env_config_variable(env_name, 'ap_password')
            dut.write(f'{ap_ssid} {ap_password}')
        try:
            ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
            print('Connected to AP/Ethernet with IP: {}'.format(ip_address))
        except pexpect.exceptions.TIMEOUT as err:
            raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet') from err
        host_ip = get_host_ip4_by_dest_ip(ip_address)

        dut.expect('Starting OTA example task', timeout=30)
        ota_url = 'https://' + host_ip + ':8000/simple_ota.bin'
        print('writing to device: {}'.format(ota_url))
        dut.write(ota_url)
        dut.expect('OTA Succeed, Rebooting...', timeout=60)
        # after reboot
        dut.expect(f'Loaded app from partition at offset {OTA2_ADDRESS}', timeout=30)
        dut.expect('OTA example app_main start', timeout=10)
    finally:
        thread1.terminate()
        # Reap the server process so port 8000 is released before another
        # test binds it; bounded so a stuck child cannot hang the run.
        thread1.join(timeout=10)
|
|
|
|
|
|
@pytest.mark.esp32
@pytest.mark.ethernet_ota
@pytest.mark.parametrize('config', ['spiram',], indirect=True)
def test_examples_protocol_simple_ota_example_ethernet_with_spiram_config(dut: Dut) -> None:
    """
    steps: |
      1. join AP/Ethernet
      2. Fetch OTA image over HTTPS
      3. Reboot with the new OTA image
    """
    # Start server (in a separate process, since serving blocks forever)
    thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, '0.0.0.0', 8000))
    thread1.daemon = True
    thread1.start()
    try:
        # start test
        dut.expect(f'Loaded app from partition at offset {OTA1_ADDRESS}', timeout=30)
        try:
            ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
            print('Connected to AP/Ethernet with IP: {}'.format(ip_address))
        except pexpect.exceptions.TIMEOUT as err:
            raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet') from err
        host_ip = get_host_ip4_by_dest_ip(ip_address)

        dut.expect('Starting OTA example task', timeout=30)
        ota_url = 'https://' + host_ip + ':8000/simple_ota.bin'
        print('writing to device: {}'.format(ota_url))
        dut.write(ota_url)
        dut.expect('OTA Succeed, Rebooting...', timeout=60)
        # after reboot
        dut.expect(f'Loaded app from partition at offset {OTA2_ADDRESS}', timeout=30)
        dut.expect('OTA example app_main start', timeout=10)
    finally:
        thread1.terminate()
        # Reap the server process so port 8000 is released before another
        # test binds it; bounded so a stuck child cannot hang the run.
        thread1.join(timeout=10)
|
|
|
|
|
|
@pytest.mark.esp32
@pytest.mark.esp32c3
@pytest.mark.flash_encryption_wifi_high_traffic
@pytest.mark.nightly_run
@pytest.mark.parametrize('config', ['flash_enc_wifi',], indirect=True)
@pytest.mark.parametrize('skip_autoflash', ['y'], indirect=True)
def test_examples_protocol_simple_ota_example_with_flash_encryption_wifi(dut: Dut) -> None:
    """
    steps: |
      1. join AP/Ethernet
      2. Fetch OTA image over HTTPS
      3. Reboot with the new OTA image
    """
    # start test
    # Erase flash (autoflash is skipped for this test, so flash manually
    # after the erase)
    dut.serial.erase_flash()
    dut.serial.flash()
    # Start server (in a separate process, since serving blocks forever)
    thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, '0.0.0.0', 8000))
    thread1.daemon = True
    thread1.start()
    try:
        dut.expect(f'Loaded app from partition at offset {OTA1_ADDRESS}', timeout=30)
        dut.expect('Flash encryption mode is DEVELOPMENT', timeout=10)
        # Parse IP address of STA
        if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True:
            env_name = 'flash_encryption_wifi_high_traffic'
            dut.expect('Please input ssid password:')
            ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
            ap_password = get_env_config_variable(env_name, 'ap_password')
            dut.write(f'{ap_ssid} {ap_password}')
        try:
            ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
            print('Connected to AP/Ethernet with IP: {}'.format(ip_address))
        except pexpect.exceptions.TIMEOUT as err:
            raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet') from err
        host_ip = get_host_ip4_by_dest_ip(ip_address)

        dut.expect('Starting OTA example task', timeout=30)
        ota_url = 'https://' + host_ip + ':8000/simple_ota.bin'
        print('writing to device: {}'.format(ota_url))
        dut.write(ota_url)
        dut.expect('OTA Succeed, Rebooting...', timeout=60)
        # after reboot
        dut.expect(f'Loaded app from partition at offset {OTA2_ADDRESS}', timeout=30)
        dut.expect('Flash encryption mode is DEVELOPMENT', timeout=10)
        dut.expect('OTA example app_main start', timeout=10)
    finally:
        thread1.terminate()
        # Reap the server process so port 8000 is released before another
        # test binds it; bounded so a stuck child cannot hang the run.
        thread1.join(timeout=10)
|
|
|
|
|
|
@pytest.mark.esp32
@pytest.mark.ethernet_ota
@pytest.mark.parametrize('config', ['on_update_no_sb_ecdsa',], indirect=True)
def test_examples_protocol_simple_ota_example_with_verify_app_signature_on_update_no_secure_boot_ecdsa(dut: Dut) -> None:
    """
    steps: |
      1. join AP/Ethernet
      2. Fetch OTA image over HTTPS
      3. Reboot with the new OTA image
    """
    sha256_bootloader, sha256_app = calc_all_sha256(dut)
    # Start server (in a separate process, since serving blocks forever)
    thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, '0.0.0.0', 8000))
    thread1.daemon = True
    thread1.start()
    try:
        # start test
        dut.expect(f'Loaded app from partition at offset {OTA1_ADDRESS}', timeout=30)
        check_sha256(sha256_bootloader, str(dut.expect(r'SHA-256 for bootloader:\s+([a-f0-9]){64}')[0]))
        check_sha256(sha256_app, str(dut.expect(r'SHA-256 for current firmware:\s+([a-f0-9]){64}')[0]))
        try:
            ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
            print('Connected to AP/Ethernet with IP: {}'.format(ip_address))
        except pexpect.exceptions.TIMEOUT as err:
            raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet') from err
        host_ip = get_host_ip4_by_dest_ip(ip_address)

        dut.expect('Starting OTA example task', timeout=30)
        ota_url = 'https://' + host_ip + ':8000/simple_ota.bin'
        print('writing to device: {}'.format(ota_url))
        dut.write(ota_url)
        dut.expect(f'Writing to partition subtype 17 at offset {OTA2_ADDRESS}', timeout=20)
        dut.expect('Verifying image signature...', timeout=60)
        dut.expect('OTA Succeed, Rebooting...', timeout=60)
        # after reboot
        dut.expect(f'Loaded app from partition at offset {OTA2_ADDRESS}', timeout=20)
        dut.expect('OTA example app_main start', timeout=10)
    finally:
        thread1.terminate()
        # Reap the server process so port 8000 is released before another
        # test binds it; bounded so a stuck child cannot hang the run.
        thread1.join(timeout=10)
|
|
|
|
|
|
@pytest.mark.esp32
@pytest.mark.ethernet_ota
@pytest.mark.parametrize('config', ['on_update_no_sb_rsa',], indirect=True)
def test_examples_protocol_simple_ota_example_with_verify_app_signature_on_update_no_secure_boot_rsa(dut: Dut) -> None:
    """
    steps: |
      1. join AP/Ethernet
      2. Fetch OTA image over HTTPS
      3. Reboot with the new OTA image
    """
    sha256_bootloader, sha256_app = calc_all_sha256(dut)
    # Start server (in a separate process, since serving blocks forever)
    thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, '0.0.0.0', 8000))
    thread1.daemon = True
    thread1.start()
    try:
        # start test
        dut.expect(f'Loaded app from partition at offset {OTA1_ADDRESS}', timeout=30)
        check_sha256(sha256_bootloader, str(dut.expect(r'SHA-256 for bootloader:\s+([a-f0-9]){64}')[0]))
        check_sha256(sha256_app, str(dut.expect(r'SHA-256 for current firmware:\s+([a-f0-9]){64}')[0]))
        try:
            ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
            print('Connected to AP/Ethernet with IP: {}'.format(ip_address))
        except pexpect.exceptions.TIMEOUT as err:
            raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet') from err
        host_ip = get_host_ip4_by_dest_ip(ip_address)

        dut.expect('Starting OTA example task', timeout=30)
        ota_url = 'https://' + host_ip + ':8000/simple_ota.bin'
        print('writing to device: {}'.format(ota_url))
        dut.write(ota_url)
        dut.expect(f'Writing to partition subtype 17 at offset {OTA2_ADDRESS}', timeout=20)
        # The signature-verification log lines are expected in this order.
        dut.expect('Verifying image signature...', timeout=60)
        dut.expect('#0 app key digest == #0 trusted key digest', timeout=10)
        dut.expect('Verifying with RSA-PSS...', timeout=10)
        dut.expect('Signature verified successfully!', timeout=10)
        dut.expect('OTA Succeed, Rebooting...', timeout=60)
        # after reboot
        dut.expect(f'Loaded app from partition at offset {OTA2_ADDRESS}', timeout=20)
        dut.expect('OTA example app_main start', timeout=10)
    finally:
        thread1.terminate()
        # Reap the server process so port 8000 is released before another
        # test binds it; bounded so a stuck child cannot hang the run.
        thread1.join(timeout=10)
|
|
|
|
|
|
@pytest.mark.esp32
@pytest.mark.ethernet_ota
@pytest.mark.parametrize('config', ['tls1_3',], indirect=True)
def test_examples_protocol_simple_ota_example_tls1_3(dut: Dut) -> None:
    """
    steps: |
      1. join AP/Ethernet
      2. Fetch OTA image over HTTPS
      3. Reboot with the new OTA image
    """
    sha256_bootloader, sha256_app = calc_all_sha256(dut)
    # Start server (an external openssl process limited to TLS 1.3)
    tls1_3_server = start_tls1_3_server(dut.app.binary_path, 8000)
    try:
        # start test
        dut.expect(f'Loaded app from partition at offset {OTA1_ADDRESS}', timeout=30)
        check_sha256(sha256_bootloader, str(dut.expect(r'SHA-256 for bootloader:\s+([a-f0-9]){64}')[0]))
        check_sha256(sha256_app, str(dut.expect(r'SHA-256 for current firmware:\s+([a-f0-9]){64}')[0]))
        # Parse IP address of STA
        if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True:
            env_name = 'wifi_high_traffic'
            dut.expect('Please input ssid password:')
            ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
            ap_password = get_env_config_variable(env_name, 'ap_password')
            dut.write(f'{ap_ssid} {ap_password}')
        try:
            ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
            print('Connected to AP/Ethernet with IP: {}'.format(ip_address))
        except pexpect.exceptions.TIMEOUT as err:
            raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet') from err
        host_ip = get_host_ip4_by_dest_ip(ip_address)

        dut.expect('Starting OTA example task', timeout=30)
        ota_url = 'https://' + host_ip + ':8000/simple_ota.bin'
        print('writing to device: {}'.format(ota_url))
        dut.write(ota_url)
        dut.expect('OTA Succeed, Rebooting...', timeout=120)
        # after reboot
        dut.expect(f'Loaded app from partition at offset {OTA2_ADDRESS}', timeout=30)
        dut.expect('OTA example app_main start', timeout=10)
    finally:
        tls1_3_server.kill()
        # Collect the exit status so the killed openssl process is reaped and
        # its port is free before another test binds it.
        tls1_3_server.wait()
|
|
|
|
|
|
if __name__ == '__main__':
    # Stand-alone mode: run only the HTTPS server, without any test.
    if len(sys.argv) > 2:  # if two or more arguments provided:
        # Usage: pytest_simple_ota.py <image_dir> <server_port> [cert_dir]
        # Both directories are interpreted relative to this script's location.
        this_dir = os.path.dirname(os.path.realpath(__file__))
        bin_dir = os.path.join(this_dir, sys.argv[1])
        port = int(sys.argv[2])
        if len(sys.argv) > 3:  # optional argument
            cert_dir = os.path.join(this_dir, sys.argv[3])
        else:
            cert_dir = bin_dir
        print('Starting HTTPS server at "https://:{}"'.format(port))
        start_https_server(
            bin_dir,
            '',
            port,
            server_file=os.path.join(cert_dir, 'ca_cert.pem'),
            key_file=os.path.join(cert_dir, 'ca_key.pem'),
        )
|