refactor(examples): Refactoring and clean up OTA tests

This commit is contained in:
Konstantin Kondrashov 2024-09-24 19:37:46 +03:00
parent 68e6939163
commit d11b958209
5 changed files with 59 additions and 96 deletions

View File

@ -7,5 +7,3 @@ factory, 0, 0, , 0xB0000
ota_0, 0, ota_0, , 0xB0000
ota_1, 0, ota_1, , 0xB0000
test, 0, test, , 0xB0000
# flash_test partition used for SPI flash tests, WL FAT tests, and SPIFFS tests
flash_test, data, fat, , 528K

1 # Special partition table for unit test app_update
7 ota_0, 0, ota_0, , 0xB0000
8 ota_1, 0, ota_1, , 0xB0000
9 test, 0, test, , 0xB0000
# flash_test partition used for SPI flash tests, WL FAT tests, and SPIFFS tests
flash_test, data, fat, , 528K

View File

@ -7,5 +7,3 @@ factory, 0, 0, , 0x70000
ota_0, 0, ota_0, , 0x70000
ota_1, 0, ota_1, , 0x70000
test, 0, test, , 0x70000
# flash_test partition used for SPI flash tests, WL FAT tests, and SPIFFS tests
flash_test, data, fat, , 128K

1 # Special partition table for unit test app_update
7 ota_0, 0, ota_0, , 0x70000
8 ota_1, 0, ota_1, , 0x70000
9 test, 0, test, , 0x70000
# flash_test partition used for SPI flash tests, WL FAT tests, and SPIFFS tests
flash_test, data, fat, , 128K

View File

@ -376,8 +376,8 @@ esp_err_t esp_https_ota_begin(const esp_https_ota_config_t *ota_config, esp_http
err = ESP_FAIL;
goto http_cleanup;
}
ESP_LOGI(TAG, "Writing to partition subtype %d at offset 0x%" PRIx32,
https_ota_handle->update_partition->subtype, https_ota_handle->update_partition->address);
ESP_LOGI(TAG, "Writing to <%s> partition at offset 0x%" PRIx32,
https_ota_handle->update_partition->label, https_ota_handle->update_partition->address);
const int alloc_size = MAX(ota_config->http_config->buffer_size, DEFAULT_OTA_BUF_SIZE);
if (ota_config->buffer_caps != 0) {

View File

@ -6,6 +6,7 @@ import os
import ssl
import subprocess
import sys
from typing import Any
from typing import Optional
from typing import Tuple
@ -70,8 +71,8 @@ server_key = '-----BEGIN PRIVATE KEY-----\n'\
'vSXnRLaxQhooWm+IuX9SuBQ=\n'\
'-----END PRIVATE KEY-----\n'
OTA1_ADDRESS = '0x20000'
OTA2_ADDRESS = '0x1d0000'
OTA_0_ADDRESS = '0x20000'
OTA_1_ADDRESS = '0x1d0000'
def start_https_server(ota_image_dir: str, server_ip: str, server_port: int, server_file: Optional[str] = None, key_file: Optional[str] = None) -> None:
@ -133,6 +134,20 @@ def calc_all_sha256(dut: Dut) -> Tuple[str, str]:
return str(sha256_bootloader), str(sha256_app)
def setting_connection(dut: Dut, env_name: Optional[str] = None) -> Any:
if env_name is not None and dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True:
dut.expect('Please input ssid password:')
ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
ap_password = get_env_config_variable(env_name, 'ap_password')
dut.write(f'{ap_ssid} {ap_password}')
try:
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
print(f'Connected to AP/Ethernet with IP: {ip_address}')
except pexpect.exceptions.TIMEOUT:
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet')
return get_host_ip4_by_dest_ip(ip_address)
@pytest.mark.esp32
@pytest.mark.esp32c3
@pytest.mark.esp32s3
@ -151,29 +166,18 @@ def test_examples_protocol_simple_ota_example(dut: Dut) -> None:
thread1.start()
try:
# start test
dut.expect(f'Loaded app from partition at offset {OTA1_ADDRESS}', timeout=30)
dut.expect(f'Loaded app from partition at offset {OTA_0_ADDRESS}', timeout=30)
check_sha256(sha256_bootloader, str(dut.expect(r'SHA-256 for bootloader:\s+([a-f0-9]){64}')[0]))
check_sha256(sha256_app, str(dut.expect(r'SHA-256 for current firmware:\s+([a-f0-9]){64}')[0]))
# Parse IP address of STA
if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True:
env_name = 'wifi_high_traffic'
dut.expect('Please input ssid password:')
ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
ap_password = get_env_config_variable(env_name, 'ap_password')
dut.write(f'{ap_ssid} {ap_password}')
try:
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
print('Connected to AP/Ethernet with IP: {}'.format(ip_address))
except pexpect.exceptions.TIMEOUT:
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet')
host_ip = get_host_ip4_by_dest_ip(ip_address)
env_name = 'wifi_high_traffic' if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True else None
host_ip = setting_connection(dut, env_name)
dut.expect('Starting OTA example task', timeout=30)
print('writing to device: {}'.format('https://' + host_ip + ':8000/simple_ota.bin'))
dut.write('https://' + host_ip + ':8000/simple_ota.bin')
print(f'writing to device: https://{host_ip}:8000/simple_ota.bin')
dut.write(f'https://{host_ip}:8000/simple_ota.bin')
dut.expect('OTA Succeed, Rebooting...', timeout=60)
# after reboot
dut.expect(f'Loaded app from partition at offset {OTA2_ADDRESS}', timeout=30)
dut.expect(f'Loaded app from partition at offset {OTA_1_ADDRESS}', timeout=30)
dut.expect('OTA example app_main start', timeout=10)
finally:
thread1.terminate()
@ -195,20 +199,14 @@ def test_examples_protocol_simple_ota_example_ethernet_with_spiram_config(dut: D
thread1.start()
try:
# start test
dut.expect(f'Loaded app from partition at offset {OTA1_ADDRESS}', timeout=30)
try:
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
print('Connected to AP/Ethernet with IP: {}'.format(ip_address))
except pexpect.exceptions.TIMEOUT:
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet')
host_ip = get_host_ip4_by_dest_ip(ip_address)
dut.expect(f'Loaded app from partition at offset {OTA_0_ADDRESS}', timeout=30)
host_ip = setting_connection(dut)
dut.expect('Starting OTA example task', timeout=30)
print('writing to device: {}'.format('https://' + host_ip + ':8000/simple_ota.bin'))
dut.write('https://' + host_ip + ':8000/simple_ota.bin')
print(f'writing to device: https://{host_ip}:8000/simple_ota.bin')
dut.write(f'https://{host_ip}:8000/simple_ota.bin')
dut.expect('OTA Succeed, Rebooting...', timeout=60)
# after reboot
dut.expect(f'Loaded app from partition at offset {OTA2_ADDRESS}', timeout=30)
dut.expect(f'Loaded app from partition at offset {OTA_1_ADDRESS}', timeout=30)
dut.expect('OTA example app_main start', timeout=10)
finally:
thread1.terminate()
@ -236,28 +234,18 @@ def test_examples_protocol_simple_ota_example_with_flash_encryption_wifi(dut: Du
thread1.daemon = True
thread1.start()
try:
dut.expect(f'Loaded app from partition at offset {OTA1_ADDRESS}', timeout=30)
dut.expect(f'Loaded app from partition at offset {OTA_0_ADDRESS}', timeout=30)
dut.expect('Flash encryption mode is DEVELOPMENT', timeout=10)
# Parse IP address of STA
if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True:
env_name = 'flash_encryption_wifi_high_traffic'
dut.expect('Please input ssid password:')
ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
ap_password = get_env_config_variable(env_name, 'ap_password')
dut.write(f'{ap_ssid} {ap_password}')
try:
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
print('Connected to AP/Ethernet with IP: {}'.format(ip_address))
except pexpect.exceptions.TIMEOUT:
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet')
host_ip = get_host_ip4_by_dest_ip(ip_address)
env_name = 'flash_encryption_wifi_high_traffic' if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True else None
host_ip = setting_connection(dut, env_name)
dut.expect('Starting OTA example task', timeout=30)
print('writing to device: {}'.format('https://' + host_ip + ':8000/simple_ota.bin'))
dut.write('https://' + host_ip + ':8000/simple_ota.bin')
print(f'writing to device: https://{host_ip}:8000/simple_ota.bin')
dut.write(f'https://{host_ip}:8000/simple_ota.bin')
dut.expect('OTA Succeed, Rebooting...', timeout=60)
# after reboot
dut.expect(f'Loaded app from partition at offset {OTA2_ADDRESS}', timeout=30)
dut.expect(f'Loaded app from partition at offset {OTA_1_ADDRESS}', timeout=30)
dut.expect('Flash encryption mode is DEVELOPMENT', timeout=10)
dut.expect('OTA example app_main start', timeout=10)
finally:
@ -281,24 +269,20 @@ def test_examples_protocol_simple_ota_example_with_verify_app_signature_on_updat
thread1.start()
try:
# start test
dut.expect(f'Loaded app from partition at offset {OTA1_ADDRESS}', timeout=30)
dut.expect(f'Loaded app from partition at offset {OTA_0_ADDRESS}', timeout=30)
check_sha256(sha256_bootloader, str(dut.expect(r'SHA-256 for bootloader:\s+([a-f0-9]){64}')[0]))
check_sha256(sha256_app, str(dut.expect(r'SHA-256 for current firmware:\s+([a-f0-9]){64}')[0]))
try:
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
print('Connected to AP/Ethernet with IP: {}'.format(ip_address))
except pexpect.exceptions.TIMEOUT:
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet')
host_ip = get_host_ip4_by_dest_ip(ip_address)
host_ip = setting_connection(dut)
dut.expect('Starting OTA example task', timeout=30)
print('writing to device: {}'.format('https://' + host_ip + ':8000/simple_ota.bin'))
dut.write('https://' + host_ip + ':8000/simple_ota.bin')
dut.expect(f'Writing to partition subtype 17 at offset {OTA2_ADDRESS}', timeout=20)
print(f'writing to device: https://{host_ip}:8000/simple_ota.bin')
dut.write(f'https://{host_ip}:8000/simple_ota.bin')
dut.expect(f'Writing to <ota_1> partition at offset {OTA_1_ADDRESS}', timeout=20)
dut.expect('Verifying image signature...', timeout=60)
dut.expect('OTA Succeed, Rebooting...', timeout=60)
# after reboot
dut.expect(f'Loaded app from partition at offset {OTA2_ADDRESS}', timeout=20)
dut.expect(f'Loaded app from partition at offset {OTA_1_ADDRESS}', timeout=20)
dut.expect('OTA example app_main start', timeout=10)
finally:
thread1.terminate()
@ -321,27 +305,23 @@ def test_examples_protocol_simple_ota_example_with_verify_app_signature_on_updat
thread1.start()
try:
# start test
dut.expect(f'Loaded app from partition at offset {OTA1_ADDRESS}', timeout=30)
dut.expect(f'Loaded app from partition at offset {OTA_0_ADDRESS}', timeout=30)
check_sha256(sha256_bootloader, str(dut.expect(r'SHA-256 for bootloader:\s+([a-f0-9]){64}')[0]))
check_sha256(sha256_app, str(dut.expect(r'SHA-256 for current firmware:\s+([a-f0-9]){64}')[0]))
try:
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
print('Connected to AP/Ethernet with IP: {}'.format(ip_address))
except pexpect.exceptions.TIMEOUT:
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet')
host_ip = get_host_ip4_by_dest_ip(ip_address)
host_ip = setting_connection(dut)
dut.expect('Starting OTA example task', timeout=30)
print('writing to device: {}'.format('https://' + host_ip + ':8000/simple_ota.bin'))
dut.write('https://' + host_ip + ':8000/simple_ota.bin')
dut.expect(f'Writing to partition subtype 17 at offset {OTA2_ADDRESS}', timeout=20)
print(f'writing to device: https://{host_ip}:8000/simple_ota.bin')
dut.write(f'https://{host_ip}:8000/simple_ota.bin')
dut.expect(f'Writing to <ota_1> partition at offset {OTA_1_ADDRESS}', timeout=20)
dut.expect('Verifying image signature...', timeout=60)
dut.expect('#0 app key digest == #0 trusted key digest', timeout=10)
dut.expect('Verifying with RSA-PSS...', timeout=10)
dut.expect('Signature verified successfully!', timeout=10)
dut.expect('OTA Succeed, Rebooting...', timeout=60)
# after reboot
dut.expect(f'Loaded app from partition at offset {OTA2_ADDRESS}', timeout=20)
dut.expect(f'Loaded app from partition at offset {OTA_1_ADDRESS}', timeout=20)
dut.expect('OTA example app_main start', timeout=10)
finally:
thread1.terminate()
@ -362,29 +342,19 @@ def test_examples_protocol_simple_ota_example_tls1_3(dut: Dut) -> None:
tls1_3_server = start_tls1_3_server(dut.app.binary_path, 8000)
try:
# start test
dut.expect(f'Loaded app from partition at offset {OTA1_ADDRESS}', timeout=30)
dut.expect(f'Loaded app from partition at offset {OTA_0_ADDRESS}', timeout=30)
check_sha256(sha256_bootloader, str(dut.expect(r'SHA-256 for bootloader:\s+([a-f0-9]){64}')[0]))
check_sha256(sha256_app, str(dut.expect(r'SHA-256 for current firmware:\s+([a-f0-9]){64}')[0]))
# Parse IP address of STA
if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True:
env_name = 'wifi_high_traffic'
dut.expect('Please input ssid password:')
ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
ap_password = get_env_config_variable(env_name, 'ap_password')
dut.write(f'{ap_ssid} {ap_password}')
try:
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
print('Connected to AP/Ethernet with IP: {}'.format(ip_address))
except pexpect.exceptions.TIMEOUT:
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet')
host_ip = get_host_ip4_by_dest_ip(ip_address)
env_name = 'wifi_high_traffic' if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True else None
host_ip = setting_connection(dut, env_name)
dut.expect('Starting OTA example task', timeout=30)
print('writing to device: {}'.format('https://' + host_ip + ':8000/simple_ota.bin'))
dut.write('https://' + host_ip + ':8000/simple_ota.bin')
print(f'writing to device: https://{host_ip}:8000/simple_ota.bin')
dut.write(f'https://{host_ip}:8000/simple_ota.bin')
dut.expect('OTA Succeed, Rebooting...', timeout=120)
# after reboot
dut.expect(f'Loaded app from partition at offset {OTA2_ADDRESS}', timeout=30)
dut.expect(f'Loaded app from partition at offset {OTA_1_ADDRESS}', timeout=30)
dut.expect('OTA example app_main start', timeout=10)
finally:
tls1_3_server.kill()
@ -392,12 +362,12 @@ def test_examples_protocol_simple_ota_example_tls1_3(dut: Dut) -> None:
if __name__ == '__main__':
if sys.argv[2:]: # if two or more arguments provided:
# Usage: pytest_simple_ota.py <image_dir> <server_port> [cert_di>]
# Usage: pytest_simple_ota.py <image_dir> <server_port> [cert_dir]
this_dir = os.path.dirname(os.path.realpath(__file__))
bin_dir = os.path.join(this_dir, sys.argv[1])
port = int(sys.argv[2])
cert_dir = bin_dir if not sys.argv[3:] else os.path.join(this_dir, sys.argv[3]) # optional argument
print('Starting HTTPS server at "https://:{}"'.format(port))
print(f'Starting HTTPS server at "https://0.0.0.0:{port}"')
start_https_server(bin_dir, '', port,
server_file=os.path.join(cert_dir, 'ca_cert.pem'),
key_file=os.path.join(cert_dir, 'ca_key.pem'))

View File

@ -1,12 +1,9 @@
# Default sdkconfig parameters to use the OTA
# partition table layout, with a 4MB flash size
CONFIG_ESPTOOLPY_FLASHSIZE_4MB=y
CONFIG_PARTITION_TABLE_TWO_OTA=y
CONFIG_PARTITION_TABLE_TWO_OTA_LARGE=y
# Certificate bundle configuration
CONFIG_MBEDTLS_CERTIFICATE_BUNDLE_DEFAULT_CMN=y
CONFIG_MBEDTLS_CUSTOM_CERTIFICATE_BUNDLE=y
CONFIG_MBEDTLS_CUSTOM_CERTIFICATE_BUNDLE_PATH="server_certs/ca_cert.pem"
# Default partition table config
CONFIG_PARTITION_TABLE_TWO_OTA_LARGE=y