Migrate OTA example test to pytest framework

This commit is contained in:
Harshit Malpani 2022-03-30 14:37:25 +05:30
parent 70109e2393
commit 25f02efaa1
7 changed files with 390 additions and 438 deletions

View File

@ -1,3 +1,5 @@
| Supported Targets | ESP32 | ESP32-S2 | ESP32-S3 | ESP32-C3 |
| ----------------- | ----- | -------- | -------- | -------- |
# Advanced HTTPS OTA example
This example is based on `esp_https_ota` component's APIs.

View File

@ -1,30 +1,34 @@
# SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Unlicense OR CC0-1.0
import http.server
import multiprocessing
import os
import random
import re
import socket
import ssl
import struct
import subprocess
from typing import Callable
import ttfw_idf
import pexpect
import pytest
from pytest_embedded import Dut
from RangeHTTPServer import RangeRequestHandler
from tiny_test_fw import DUT, Utility
server_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'test_certs/server_cert.pem')
key_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'test_certs/server_key.pem')
def get_my_ip():
def get_my_ip() -> str:
s1 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s1.connect(('8.8.8.8', 80))
my_ip = ''
my_ip = s1.getsockname()[0]
s1.close()
return my_ip
def get_server_status(host_ip, port):
def get_server_status(host_ip: str, port: int) -> bool:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_status = sock.connect_ex((host_ip, port))
sock.close()
@ -33,12 +37,12 @@ def get_server_status(host_ip, port):
return False
def https_request_handler():
def https_request_handler() -> Callable[...,http.server.BaseHTTPRequestHandler]:
"""
Returns a request handler class that handles broken pipe exception
"""
class RequestHandler(RangeRequestHandler):
def finish(self):
def finish(self) -> None:
try:
if not self.wfile.closed:
self.wfile.flush()
@ -47,7 +51,7 @@ def https_request_handler():
pass
self.rfile.close()
def handle(self):
def handle(self) -> None:
try:
RangeRequestHandler.handle(self)
except socket.error:
@ -56,7 +60,7 @@ def https_request_handler():
return RequestHandler
def start_https_server(ota_image_dir, server_ip, server_port):
def start_https_server(ota_image_dir: str, server_ip: str, server_port: int) -> None:
os.chdir(ota_image_dir)
requestHandler = https_request_handler()
httpd = http.server.HTTPServer((server_ip, server_port), requestHandler)
@ -67,24 +71,24 @@ def start_https_server(ota_image_dir, server_ip, server_port):
httpd.serve_forever()
def start_chunked_server(ota_image_dir, server_port):
def start_chunked_server(ota_image_dir: str, server_port: int) -> subprocess.Popen:
os.chdir(ota_image_dir)
chunked_server = subprocess.Popen(['openssl', 's_server', '-WWW', '-key', key_file, '-cert', server_file, '-port', str(server_port)])
return chunked_server
def redirect_handler_factory(url):
def redirect_handler_factory(url: str) -> Callable[...,http.server.BaseHTTPRequestHandler]:
"""
Returns a request handler class that redirects to supplied `url`
"""
class RedirectHandler(http.server.SimpleHTTPRequestHandler):
def do_GET(self):
def do_GET(self) -> None:
print('Sending resp, URL: ' + url)
self.send_response(301)
self.send_header('Location', url)
self.end_headers()
def handle(self):
def handle(self) -> None:
try:
http.server.BaseHTTPRequestHandler.handle(self)
except socket.error:
@ -93,7 +97,7 @@ def redirect_handler_factory(url):
return RedirectHandler
def start_redirect_server(ota_image_dir, server_ip, server_port, redirection_port):
def start_redirect_server(ota_image_dir: str, server_ip: str, server_port: int, redirection_port: int) -> None:
os.chdir(ota_image_dir)
redirectHandler = redirect_handler_factory('https://' + server_ip + ':' + str(redirection_port) + '/advanced_https_ota.bin')
@ -105,8 +109,12 @@ def start_redirect_server(ota_image_dir, server_ip, server_port, redirection_por
httpd.serve_forever()
@ttfw_idf.idf_example_test(env_tag='EXAMPLE_ETH_OTA')
def test_examples_protocol_advanced_https_ota_example(env, extra_data):
@pytest.mark.esp32
@pytest.mark.esp32c3
@pytest.mark.esp32s2
@pytest.mark.esp32s3
@pytest.mark.ethernet_ota
def test_examples_protocol_advanced_https_ota_example(dut: Dut) -> None:
"""
This is a positive test case, which downloads complete binary file multiple number of times.
Number of iterations can be specified in variable iterations.
@ -115,43 +123,37 @@ def test_examples_protocol_advanced_https_ota_example(env, extra_data):
2. Fetch OTA image over HTTPS
3. Reboot with the new OTA image
"""
dut1 = env.get_dut('advanced_https_ota_example', 'examples/system/ota/advanced_https_ota', dut_class=ttfw_idf.ESP32DUT)
# Number of iterations to validate OTA
iterations = 3
server_port = 8001
# File to be downloaded. This file is generated after compilation
bin_name = 'advanced_https_ota.bin'
# check and log bin size
binary_file = os.path.join(dut1.app.binary_path, bin_name)
bin_size = os.path.getsize(binary_file)
ttfw_idf.log_performance('advanced_https_ota_bin_size', '{}KB'.format(bin_size // 1024))
# start test
host_ip = get_my_ip()
if (get_server_status(host_ip, server_port) is False):
thread1 = multiprocessing.Process(target=start_https_server, args=(dut1.app.binary_path, host_ip, server_port))
thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, server_port))
thread1.daemon = True
thread1.start()
dut1.start_app()
for i in range(iterations):
dut1.expect('Loaded app from partition at offset', timeout=30)
dut.expect('Loaded app from partition at offset', timeout=30)
try:
ip_address = dut1.expect(re.compile(r' (sta|eth) ip: ([^,]+),'), timeout=30)
ip_address = dut.expect(r' (sta|eth) ip: ([^,]+),', timeout=30)
print('Connected to AP with IP: {}'.format(ip_address))
except DUT.ExpectTimeout:
except pexpect.exceptions.TIMEOUT:
thread1.terminate()
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
dut1.expect('Starting Advanced OTA example', timeout=30)
dut.expect('Starting Advanced OTA example', timeout=30)
print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + bin_name))
dut1.write('https://' + host_ip + ':' + str(server_port) + '/' + bin_name)
dut1.expect('Loaded app from partition at offset', timeout=60)
dut1.expect('Starting Advanced OTA example', timeout=30)
dut1.reset()
dut.write('https://' + host_ip + ':' + str(server_port) + '/' + bin_name)
thread1.terminate()
@ttfw_idf.idf_example_test(env_tag='EXAMPLE_ETH_OTA')
def test_examples_protocol_advanced_https_ota_example_truncated_bin(env, extra_data):
@pytest.mark.esp32
@pytest.mark.esp32c3
@pytest.mark.esp32s2
@pytest.mark.esp32s3
@pytest.mark.ethernet_ota
def test_examples_protocol_advanced_https_ota_example_truncated_bin(dut: Dut) -> None:
"""
Working of OTA if binary file is truncated is validated in this test case.
Application should return with error message in this case.
@ -161,7 +163,6 @@ def test_examples_protocol_advanced_https_ota_example_truncated_bin(env, extra_d
3. Fetch OTA image over HTTPS
4. Check working of code if bin is truncated
"""
dut1 = env.get_dut('advanced_https_ota_example', 'examples/system/ota/advanced_https_ota', dut_class=ttfw_idf.ESP32DUT)
server_port = 8001
# Original binary file generated after compilation
bin_name = 'advanced_https_ota.bin'
@ -170,34 +171,30 @@ def test_examples_protocol_advanced_https_ota_example_truncated_bin(env, extra_d
# Size of truncated file to be grnerated. This value can range from 288 bytes (Image header size) to size of original binary file
# truncated_bin_size is set to 64000 to reduce consumed by the test case
truncated_bin_size = 64000
# check and log bin size
binary_file = os.path.join(dut1.app.binary_path, bin_name)
binary_file = os.path.join(dut.app.binary_path, bin_name)
with open(binary_file, 'rb+') as f:
with open(os.path.join(dut1.app.binary_path, truncated_bin_name), 'wb+') as fo:
with open(os.path.join(dut.app.binary_path, truncated_bin_name), 'wb+') as fo:
fo.write(f.read(truncated_bin_size))
binary_file = os.path.join(dut1.app.binary_path, truncated_bin_name)
bin_size = os.path.getsize(binary_file)
ttfw_idf.log_performance('advanced_https_ota_bin_size', '{}KB'.format(bin_size // 1024))
binary_file = os.path.join(dut.app.binary_path, truncated_bin_name)
# start test
host_ip = get_my_ip()
if (get_server_status(host_ip, server_port) is False):
thread1 = multiprocessing.Process(target=start_https_server, args=(dut1.app.binary_path, host_ip, server_port))
thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, server_port))
thread1.daemon = True
thread1.start()
dut1.start_app()
dut1.expect('Loaded app from partition at offset', timeout=30)
dut.expect('Loaded app from partition at offset', timeout=30)
try:
ip_address = dut1.expect(re.compile(r' (sta|eth) ip: ([^,]+),'), timeout=30)
ip_address = dut.expect(r' (sta|eth) ip: ([^,]+),', timeout=30)
print('Connected to AP with IP: {}'.format(ip_address))
except DUT.ExpectTimeout:
except pexpect.exceptions.TIMEOUT:
thread1.terminate()
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
dut1.expect('Starting Advanced OTA example', timeout=30)
dut.expect('Starting Advanced OTA example', timeout=30)
print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + truncated_bin_name))
dut1.write('https://' + host_ip + ':' + str(server_port) + '/' + truncated_bin_name)
dut1.expect('Image validation failed, image is corrupted', timeout=30)
dut.write('https://' + host_ip + ':' + str(server_port) + '/' + truncated_bin_name)
dut.expect('Image validation failed, image is corrupted', timeout=30)
try:
os.remove(binary_file)
except OSError:
@ -205,8 +202,12 @@ def test_examples_protocol_advanced_https_ota_example_truncated_bin(env, extra_d
thread1.terminate()
@ttfw_idf.idf_example_test(env_tag='EXAMPLE_ETH_OTA')
def test_examples_protocol_advanced_https_ota_example_truncated_header(env, extra_data):
@pytest.mark.esp32
@pytest.mark.esp32c3
@pytest.mark.esp32s2
@pytest.mark.esp32s3
@pytest.mark.ethernet_ota
def test_examples_protocol_advanced_https_ota_example_truncated_header(dut: Dut) -> None:
"""
Working of OTA if headers of binary file are truncated is vaildated in this test case.
Application should return with error message in this case.
@ -216,42 +217,38 @@ def test_examples_protocol_advanced_https_ota_example_truncated_header(env, extr
3. Fetch OTA image over HTTPS
4. Check working of code if headers are not sent completely
"""
dut1 = env.get_dut('advanced_https_ota_example', 'examples/system/ota/advanced_https_ota', dut_class=ttfw_idf.ESP32DUT)
server_port = 8001
# Original binary file generated after compilation
bin_name = 'advanced_https_ota.bin'
# Truncated binary file to be generated from original binary file
truncated_bin_name = 'truncated_header.bin'
# Size of truncated file to be grnerated. This value should be less than 288 bytes (Image header size)
# Size of truncated file to be generated. This value should be less than 288 bytes (Image header size)
truncated_bin_size = 180
# check and log bin size
binary_file = os.path.join(dut1.app.binary_path, bin_name)
binary_file = os.path.join(dut.app.binary_path, bin_name)
with open(binary_file, 'rb+') as f:
with open(os.path.join(dut1.app.binary_path, truncated_bin_name), 'wb+') as fo:
with open(os.path.join(dut.app.binary_path, truncated_bin_name), 'wb+') as fo:
fo.write(f.read(truncated_bin_size))
binary_file = os.path.join(dut1.app.binary_path, truncated_bin_name)
bin_size = os.path.getsize(binary_file)
ttfw_idf.log_performance('advanced_https_ota_bin_size', '{}KB'.format(bin_size // 1024))
binary_file = os.path.join(dut.app.binary_path, truncated_bin_name)
# start test
host_ip = get_my_ip()
if (get_server_status(host_ip, server_port) is False):
thread1 = multiprocessing.Process(target=start_https_server, args=(dut1.app.binary_path, host_ip, server_port))
thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, server_port))
thread1.daemon = True
thread1.start()
dut1.start_app()
dut1.expect('Loaded app from partition at offset', timeout=30)
dut.expect('Loaded app from partition at offset', timeout=30)
try:
ip_address = dut1.expect(re.compile(r' (sta|eth) ip: ([^,]+),'), timeout=30)
ip_address = dut.expect(r' (sta|eth) ip: ([^,]+),', timeout=30)
print('Connected to AP with IP: {}'.format(ip_address))
except DUT.ExpectTimeout:
except pexpect.exceptions.TIMEOUT:
thread1.terminate()
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
dut1.expect('Starting Advanced OTA example', timeout=30)
dut.expect('Starting Advanced OTA example', timeout=30)
print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + truncated_bin_name))
dut1.write('https://' + host_ip + ':' + str(server_port) + '/' + truncated_bin_name)
dut1.expect('advanced_https_ota_example: esp_https_ota_read_img_desc failed', timeout=30)
dut.write('https://' + host_ip + ':' + str(server_port) + '/' + truncated_bin_name)
dut.expect('advanced_https_ota_example: esp_https_ota_read_img_desc failed', timeout=30)
try:
os.remove(binary_file)
except OSError:
@ -259,8 +256,12 @@ def test_examples_protocol_advanced_https_ota_example_truncated_header(env, extr
thread1.terminate()
@ttfw_idf.idf_example_test(env_tag='EXAMPLE_ETH_OTA')
def test_examples_protocol_advanced_https_ota_example_random(env, extra_data):
@pytest.mark.esp32
@pytest.mark.esp32c3
@pytest.mark.esp32s2
@pytest.mark.esp32s3
@pytest.mark.ethernet_ota
def test_examples_protocol_advanced_https_ota_example_random(dut: Dut) -> None:
"""
Working of OTA if random data is added in binary file are validated in this test case.
Magic byte verification should fail in this case.
@ -270,14 +271,13 @@ def test_examples_protocol_advanced_https_ota_example_random(env, extra_data):
3. Fetch OTA image over HTTPS
4. Check working of code for random binary file
"""
dut1 = env.get_dut('advanced_https_ota_example', 'examples/system/ota/advanced_https_ota', dut_class=ttfw_idf.ESP32DUT)
server_port = 8001
# Random binary file to be generated
random_bin_name = 'random.bin'
# Size of random binary file. 32000 is choosen, to reduce the time required to run the test-case
random_bin_size = 32000
# check and log bin size
binary_file = os.path.join(dut1.app.binary_path, random_bin_name)
binary_file = os.path.join(dut.app.binary_path, random_bin_name)
with open(binary_file, 'wb+') as fo:
# First byte of binary file is always set to zero. If first byte is generated randomly,
# in some cases it may generate 0xE9 which will result in failure of testcase.
@ -285,27 +285,24 @@ def test_examples_protocol_advanced_https_ota_example_random(env, extra_data):
for i in range(random_bin_size - 1):
fo.write(struct.pack('B', random.randrange(0,255,1)))
bin_size = os.path.getsize(binary_file)
ttfw_idf.log_performance('advanced_https_ota_bin_size', '{}KB'.format(bin_size // 1024))
# start test
host_ip = get_my_ip()
if (get_server_status(host_ip, server_port) is False):
thread1 = multiprocessing.Process(target=start_https_server, args=(dut1.app.binary_path, host_ip, server_port))
thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, server_port))
thread1.daemon = True
thread1.start()
dut1.start_app()
dut1.expect('Loaded app from partition at offset', timeout=30)
dut.expect('Loaded app from partition at offset', timeout=30)
try:
ip_address = dut1.expect(re.compile(r' (sta|eth) ip: ([^,]+),'), timeout=30)
ip_address = dut.expect(r' (sta|eth) ip: ([^,]+),', timeout=30)
print('Connected to AP with IP: {}'.format(ip_address))
except DUT.ExpectTimeout:
except pexpect.exceptions.TIMEOUT:
thread1.terminate()
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
dut1.expect('Starting Advanced OTA example', timeout=30)
dut.expect('Starting Advanced OTA example', timeout=30)
print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + random_bin_name))
dut1.write('https://' + host_ip + ':' + str(server_port) + '/' + random_bin_name)
dut1.expect(re.compile(r'esp_https_ota: Incorrect app descriptor magic'), timeout=10)
dut.write('https://' + host_ip + ':' + str(server_port) + '/' + random_bin_name)
dut.expect(r'esp_https_ota: Incorrect app descriptor magic', timeout=10)
try:
os.remove(binary_file)
except OSError:
@ -313,8 +310,12 @@ def test_examples_protocol_advanced_https_ota_example_random(env, extra_data):
thread1.terminate()
@ttfw_idf.idf_example_test(env_tag='EXAMPLE_ETH_OTA')
def test_examples_protocol_advanced_https_ota_example_invalid_chip_id(env, extra_data):
@pytest.mark.esp32
@pytest.mark.esp32c3
@pytest.mark.esp32s2
@pytest.mark.esp32s3
@pytest.mark.ethernet_ota
def test_examples_protocol_advanced_https_ota_example_invalid_chip_id(dut: Dut) -> None:
"""
Working of OTA if binary file have invalid chip id is validated in this test case.
Chip id verification should fail in this case.
@ -324,16 +325,15 @@ def test_examples_protocol_advanced_https_ota_example_invalid_chip_id(env, extra
3. Fetch OTA image over HTTPS
4. Check working of code for random binary file
"""
dut1 = env.get_dut('advanced_https_ota_example', 'examples/system/ota/advanced_https_ota', dut_class=ttfw_idf.ESP32DUT)
server_port = 8001
bin_name = 'advanced_https_ota.bin'
# Random binary file to be generated
random_bin_name = 'random.bin'
random_binary_file = os.path.join(dut1.app.binary_path, random_bin_name)
random_binary_file = os.path.join(dut.app.binary_path, random_bin_name)
# Size of random binary file. 2000 is choosen, to reduce the time required to run the test-case
random_bin_size = 2000
binary_file = os.path.join(dut1.app.binary_path, bin_name)
binary_file = os.path.join(dut.app.binary_path, bin_name)
with open(binary_file, 'rb+') as f:
data = list(f.read(random_bin_size))
# Changing Chip id
@ -344,22 +344,21 @@ def test_examples_protocol_advanced_https_ota_example_invalid_chip_id(env, extra
# start test
host_ip = get_my_ip()
if (get_server_status(host_ip, server_port) is False):
thread1 = multiprocessing.Process(target=start_https_server, args=(dut1.app.binary_path, host_ip, server_port))
thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, server_port))
thread1.daemon = True
thread1.start()
dut1.start_app()
dut1.expect('Loaded app from partition at offset', timeout=30)
dut.expect('Loaded app from partition at offset', timeout=30)
try:
ip_address = dut1.expect(re.compile(r' (sta|eth) ip: ([^,]+),'), timeout=30)
ip_address = dut.expect(r' (sta|eth) ip: ([^,]+),', timeout=30)
print('Connected to AP with IP: {}'.format(ip_address))
except DUT.ExpectTimeout:
except pexpect.exceptions.TIMEOUT:
thread1.terminate()
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
dut1.expect('Starting Advanced OTA example', timeout=30)
dut.expect('Starting Advanced OTA example', timeout=30)
print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + random_bin_name))
dut1.write('https://' + host_ip + ':' + str(server_port) + '/' + random_bin_name)
dut1.expect(re.compile(r'esp_https_ota: Mismatch chip id, expected 0, found \d'), timeout=10)
dut.write('https://' + host_ip + ':' + str(server_port) + '/' + random_bin_name)
dut.expect(r'esp_https_ota: Mismatch chip id, expected 0, found \d', timeout=10)
try:
os.remove(random_binary_file)
except OSError:
@ -367,8 +366,12 @@ def test_examples_protocol_advanced_https_ota_example_invalid_chip_id(env, extra
thread1.terminate()
@ttfw_idf.idf_example_test(env_tag='EXAMPLE_ETH_OTA')
def test_examples_protocol_advanced_https_ota_example_chunked(env, extra_data):
@pytest.mark.esp32
@pytest.mark.esp32c3
@pytest.mark.esp32s2
@pytest.mark.esp32s3
@pytest.mark.ethernet_ota
def test_examples_protocol_advanced_https_ota_example_chunked(dut: Dut) -> None:
"""
This is a positive test case, which downloads complete binary file multiple number of times.
Number of iterations can be specified in variable iterations.
@ -377,34 +380,32 @@ def test_examples_protocol_advanced_https_ota_example_chunked(env, extra_data):
2. Fetch OTA image over HTTPS
3. Reboot with the new OTA image
"""
dut1 = env.get_dut('advanced_https_ota_example', 'examples/system/ota/advanced_https_ota', dut_class=ttfw_idf.ESP32DUT)
# File to be downloaded. This file is generated after compilation
bin_name = 'advanced_https_ota.bin'
# check and log bin size
binary_file = os.path.join(dut1.app.binary_path, bin_name)
bin_size = os.path.getsize(binary_file)
ttfw_idf.log_performance('advanced_https_ota_bin_size', '{}KB'.format(bin_size // 1024))
# start test
host_ip = get_my_ip()
chunked_server = start_chunked_server(dut1.app.binary_path, 8070)
dut1.start_app()
dut1.expect('Loaded app from partition at offset', timeout=30)
chunked_server = start_chunked_server(dut.app.binary_path, 8070)
dut.expect('Loaded app from partition at offset', timeout=30)
try:
ip_address = dut1.expect(re.compile(r' (sta|eth) ip: ([^,]+),'), timeout=30)
ip_address = dut.expect(r' (sta|eth) ip: ([^,]+),', timeout=30)
print('Connected to AP with IP: {}'.format(ip_address))
except DUT.ExpectTimeout:
except pexpect.exceptions.TIMEOUT:
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
dut1.expect('Starting Advanced OTA example', timeout=30)
dut.expect('Starting Advanced OTA example', timeout=30)
print('writing to device: {}'.format('https://' + host_ip + ':8070/' + bin_name))
dut1.write('https://' + host_ip + ':8070/' + bin_name)
dut1.expect('Loaded app from partition at offset', timeout=60)
dut1.expect('Starting Advanced OTA example', timeout=30)
dut.write('https://' + host_ip + ':8070/' + bin_name)
dut.expect('Loaded app from partition at offset', timeout=60)
dut.expect('Starting Advanced OTA example', timeout=30)
chunked_server.kill()
@ttfw_idf.idf_example_test(env_tag='EXAMPLE_ETH_OTA')
def test_examples_protocol_advanced_https_ota_example_redirect_url(env, extra_data):
@pytest.mark.esp32
@pytest.mark.esp32c3
@pytest.mark.esp32s2
@pytest.mark.esp32s3
@pytest.mark.ethernet_ota
def test_examples_protocol_advanced_https_ota_example_redirect_url(dut: Dut) -> None:
"""
This is a positive test case, which starts a server and a redirection server.
Redirection server redirects http_request to different port
@ -414,53 +415,52 @@ def test_examples_protocol_advanced_https_ota_example_redirect_url(env, extra_da
2. Fetch OTA image over HTTPS
3. Reboot with the new OTA image
"""
dut1 = env.get_dut('advanced_https_ota_example', 'examples/system/ota/advanced_https_ota', dut_class=ttfw_idf.ESP32DUT)
server_port = 8001
# Port to which the request should be redirected
redirection_server_port = 8081
redirection_server_port1 = 8082
# File to be downloaded. This file is generated after compilation
bin_name = 'advanced_https_ota.bin'
# check and log bin size
binary_file = os.path.join(dut1.app.binary_path, bin_name)
bin_size = os.path.getsize(binary_file)
ttfw_idf.log_performance('advanced_https_ota_bin_size', '{}KB'.format(bin_size // 1024))
# start test
host_ip = get_my_ip()
if (get_server_status(host_ip, server_port) is False):
thread1 = multiprocessing.Process(target=start_https_server, args=(dut1.app.binary_path, host_ip, server_port))
thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, server_port))
thread1.daemon = True
thread1.start()
thread2 = multiprocessing.Process(target=start_redirect_server, args=(dut1.app.binary_path, host_ip, redirection_server_port, redirection_server_port1))
thread2 = multiprocessing.Process(target=start_redirect_server, args=(dut.app.binary_path, host_ip, redirection_server_port, redirection_server_port1))
thread2.daemon = True
thread2.start()
thread3 = multiprocessing.Process(target=start_redirect_server, args=(dut1.app.binary_path, host_ip, redirection_server_port1, server_port))
thread3 = multiprocessing.Process(target=start_redirect_server, args=(dut.app.binary_path, host_ip, redirection_server_port1, server_port))
thread3.daemon = True
thread3.start()
dut1.start_app()
dut1.expect('Loaded app from partition at offset', timeout=30)
dut.expect('Loaded app from partition at offset', timeout=30)
try:
ip_address = dut1.expect(re.compile(r' (sta|eth) ip: ([^,]+),'), timeout=30)
ip_address = dut.expect(r' (sta|eth) ip: ([^,]+),', timeout=30)
print('Connected to AP with IP: {}'.format(ip_address))
except DUT.ExpectTimeout:
except pexpect.exceptions.TIMEOUT:
thread1.terminate()
thread2.terminate()
thread3.terminate()
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
dut1.expect('Starting Advanced OTA example', timeout=30)
dut.expect('Starting Advanced OTA example', timeout=30)
print('writing to device: {}'.format('https://' + host_ip + ':' + str(redirection_server_port) + '/' + bin_name))
dut1.write('https://' + host_ip + ':' + str(redirection_server_port) + '/' + bin_name)
dut1.expect('Loaded app from partition at offset', timeout=60)
dut1.expect('Starting Advanced OTA example', timeout=30)
dut1.reset()
dut.write('https://' + host_ip + ':' + str(redirection_server_port) + '/' + bin_name)
dut.expect('Loaded app from partition at offset', timeout=60)
dut.expect('Starting Advanced OTA example', timeout=30)
thread1.terminate()
thread2.terminate()
thread3.terminate()
@ttfw_idf.idf_example_test(env_tag='Example_8Mflash_Ethernet')
def test_examples_protocol_advanced_https_ota_example_anti_rollback(env, extra_data):
@pytest.mark.esp32
@pytest.mark.esp32c3
@pytest.mark.esp32s2
@pytest.mark.esp32s3
@pytest.mark.ethernet_flash_8m
@pytest.mark.parametrize('config', ['anti_rollback',], indirect=True)
@pytest.mark.parametrize('skip_autoflash', ['y'], indirect=True)
def test_examples_protocol_advanced_https_ota_example_anti_rollback(dut: Dut) -> None:
"""
Working of OTA when anti_rollback is enabled and security version of new image is less than current one.
Application should return with error message in this case.
@ -470,57 +470,52 @@ def test_examples_protocol_advanced_https_ota_example_anti_rollback(env, extra_d
3. Fetch OTA image over HTTPS
4. Check working of anti_rollback feature
"""
dut1 = env.get_dut('advanced_https_ota_example', 'examples/system/ota/advanced_https_ota', dut_class=ttfw_idf.ESP32DUT, app_config_name='anti_rollback')
Utility.console_log('Erasing the flash on the chip')
# erase the flash
dut1.erase_flash()
dut.serial.erase_flash()
dut.serial.flash()
server_port = 8001
# Original binary file generated after compilation
bin_name = 'advanced_https_ota.bin'
# Modified firmware image to lower security version in its header. This is to enable negative test case
anti_rollback_bin_name = 'advanced_https_ota_lower_sec_version.bin'
# check and log bin size
binary_file = os.path.join(dut1.app.binary_path, bin_name)
binary_file = os.path.join(dut.app.binary_path, bin_name)
file_size = os.path.getsize(binary_file)
with open(binary_file, 'rb+') as f:
with open(os.path.join(dut1.app.binary_path, anti_rollback_bin_name), 'wb+') as fo:
with open(os.path.join(dut.app.binary_path, anti_rollback_bin_name), 'wb+') as fo:
fo.write(f.read(file_size))
# Change security_version to 0 for negative test case
fo.seek(36)
fo.write(b'\x00')
binary_file = os.path.join(dut1.app.binary_path, anti_rollback_bin_name)
bin_size = os.path.getsize(binary_file)
ttfw_idf.log_performance('advanced_https_ota_bin_size', '{}KB'.format(bin_size // 1024))
binary_file = os.path.join(dut.app.binary_path, anti_rollback_bin_name)
# start test
host_ip = get_my_ip()
if (get_server_status(host_ip, server_port) is False):
thread1 = multiprocessing.Process(target=start_https_server, args=(dut1.app.binary_path, host_ip, server_port))
thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, server_port))
thread1.daemon = True
thread1.start()
dut1.start_app()
# Positive Case
dut1.expect('Loaded app from partition at offset', timeout=30)
dut.expect('Loaded app from partition at offset', timeout=30)
try:
ip_address = dut1.expect(re.compile(r' (sta|eth) ip: ([^,]+),'), timeout=30)
ip_address = dut.expect(r' (sta|eth) ip: ([^,]+),', timeout=30)
print('Connected to AP with IP: {}'.format(ip_address))
except DUT.ExpectTimeout:
except pexpect.exceptions.TIMEOUT:
thread1.terminate()
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
dut1.expect('Starting Advanced OTA example', timeout=30)
dut.expect('Starting Advanced OTA example', timeout=30)
# Use originally generated image with secure_version=1
print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + bin_name))
dut1.write('https://' + host_ip + ':' + str(server_port) + '/' + bin_name)
dut1.expect('Loaded app from partition at offset', timeout=60)
dut1.expect(re.compile(r' (sta|eth) ip: ([^,]+),'), timeout=30)
dut1.expect('App is valid, rollback cancelled successfully', 30)
dut.write('https://' + host_ip + ':' + str(server_port) + '/' + bin_name)
dut.expect('Loaded app from partition at offset', timeout=60)
dut.expect(r' (sta|eth) ip: ([^,]+),', timeout=30)
dut.expect(r'App is valid, rollback cancelled successfully', timeout=30)
# Negative Case
dut1.expect('Starting Advanced OTA example', timeout=30)
dut.expect('Starting Advanced OTA example', timeout=30)
# Use modified image with secure_version=0
print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + anti_rollback_bin_name))
dut1.write('https://' + host_ip + ':' + str(server_port) + '/' + anti_rollback_bin_name)
dut1.expect('New firmware security version is less than eFuse programmed, 0 < 1', timeout=30)
dut.write('https://' + host_ip + ':' + str(server_port) + '/' + anti_rollback_bin_name)
dut.expect('New firmware security version is less than eFuse programmed, 0 < 1', timeout=30)
try:
os.remove(binary_file)
except OSError:
@ -528,8 +523,13 @@ def test_examples_protocol_advanced_https_ota_example_anti_rollback(env, extra_d
thread1.terminate()
@ttfw_idf.idf_example_test(env_tag='EXAMPLE_ETH_OTA')
def test_examples_protocol_advanced_https_ota_example_partial_request(env, extra_data):
@pytest.mark.esp32
@pytest.mark.esp32c3
@pytest.mark.esp32s2
@pytest.mark.esp32s3
@pytest.mark.ethernet_ota
@pytest.mark.parametrize('config', ['partial_download',], indirect=True)
def test_examples_protocol_advanced_https_ota_example_partial_request(dut: Dut) -> None:
"""
This is a positive test case, to test OTA workflow with Range HTTP header.
steps: |
@ -537,46 +537,46 @@ def test_examples_protocol_advanced_https_ota_example_partial_request(env, extra
2. Fetch OTA image over HTTPS
3. Reboot with the new OTA image
"""
dut1 = env.get_dut('advanced_https_ota_example', 'examples/system/ota/advanced_https_ota', dut_class=ttfw_idf.ESP32DUT, app_config_name='partial_download')
server_port = 8001
# Size of partial HTTP request
request_size = 16384
# File to be downloaded. This file is generated after compilation
bin_name = 'advanced_https_ota.bin'
# check and log bin size
binary_file = os.path.join(dut1.app.binary_path, bin_name)
binary_file = os.path.join(dut.app.binary_path, bin_name)
bin_size = os.path.getsize(binary_file)
ttfw_idf.log_performance('advanced_https_ota_bin_size', '{}KB'.format(bin_size // 1024))
http_requests = int((bin_size / request_size) - 1)
# start test
host_ip = get_my_ip()
if (get_server_status(host_ip, server_port) is False):
thread1 = multiprocessing.Process(target=start_https_server, args=(dut1.app.binary_path, host_ip, server_port))
thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, server_port))
thread1.daemon = True
thread1.start()
dut1.start_app()
dut1.expect('Loaded app from partition at offset', timeout=30)
dut.expect('Loaded app from partition at offset', timeout=30)
try:
ip_address = dut1.expect(re.compile(r' (sta|eth) ip: ([^,]+),'), timeout=30)
ip_address = dut.expect(r' (sta|eth) ip: ([^,]+),', timeout=30)
print('Connected to AP with IP: {}'.format(ip_address))
except DUT.ExpectTimeout:
Utility.console_log('ENV_TEST_FAILURE: Cannot connect to AP')
except pexpect.exceptions.TIMEOUT:
print('ENV_TEST_FAILURE: Cannot connect to AP')
thread1.terminate()
raise
dut1.expect('Starting Advanced OTA example', timeout=30)
dut.expect('Starting Advanced OTA example', timeout=30)
print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + bin_name))
dut1.write('https://' + host_ip + ':' + str(server_port) + '/' + bin_name)
dut.write('https://' + host_ip + ':' + str(server_port) + '/' + bin_name)
for _ in range(http_requests):
dut1.expect('Connection closed', timeout=60)
dut1.expect('Loaded app from partition at offset', timeout=60)
dut1.expect('Starting Advanced OTA example', timeout=30)
dut1.reset()
dut.expect('Connection closed', timeout=60)
dut.expect('Loaded app from partition at offset', timeout=60)
dut.expect('Starting Advanced OTA example', timeout=30)
thread1.terminate()
@ttfw_idf.idf_example_test(env_tag='Example_WIFI_OTA', nightly_run=True)
def test_examples_protocol_advanced_https_ota_example_nimble_gatts(env, extra_data):
@pytest.mark.esp32
@pytest.mark.esp32c3
@pytest.mark.esp32s2
@pytest.mark.esp32s3
@pytest.mark.wifi_ota
@pytest.mark.parametrize('config', ['nimble',], indirect=True)
def test_examples_protocol_advanced_https_ota_example_nimble_gatts(dut: Dut) -> None:
"""
Run an OTA image update while a BLE GATT Server is running in background. This GATT server will be using NimBLE Host stack.
steps: |
@ -585,42 +585,40 @@ def test_examples_protocol_advanced_https_ota_example_nimble_gatts(env, extra_da
3. Fetch OTA image over HTTPS
4. Reboot with the new OTA image
"""
dut1 = env.get_dut('advanced_https_ota_example', 'examples/system/ota/advanced_https_ota', dut_class=ttfw_idf.ESP32DUT, app_config_name='nimble')
server_port = 8001
# File to be downloaded. This file is generated after compilation
bin_name = 'advanced_https_ota.bin'
# check and log bin size
binary_file = os.path.join(dut1.app.binary_path, bin_name)
bin_size = os.path.getsize(binary_file)
ttfw_idf.log_performance('advanced_https_ota_bin_size', '{}KB'.format(bin_size // 1024))
# start test
host_ip = get_my_ip()
if (get_server_status(host_ip, server_port) is False):
thread1 = multiprocessing.Process(target=start_https_server, args=(dut1.app.binary_path, host_ip, server_port))
thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, server_port))
thread1.daemon = True
thread1.start()
dut1.start_app()
dut1.expect('Loaded app from partition at offset', timeout=30)
dut.expect('Loaded app from partition at offset', timeout=30)
try:
ip_address = dut1.expect(re.compile(r' sta ip: ([^,]+),'), timeout=30)
ip_address = dut.expect(r' sta ip: ([^,]+),', timeout=30)
print('Connected to AP with IP: {}'.format(ip_address))
except DUT.ExpectTimeout:
except pexpect.exceptions.TIMEOUT:
thread1.terminate()
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
dut1.expect('Starting Advanced OTA example', timeout=30)
dut.expect('Starting Advanced OTA example', timeout=30)
print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + bin_name))
print('Started GAP advertising.')
dut1.write('https://' + host_ip + ':' + str(server_port) + '/' + bin_name)
dut1.expect('Loaded app from partition at offset', timeout=60)
dut1.expect('Starting Advanced OTA example', timeout=30)
dut1.reset()
dut.write('https://' + host_ip + ':' + str(server_port) + '/' + bin_name)
dut.expect('Loaded app from partition at offset', timeout=60)
dut.expect('Starting Advanced OTA example', timeout=30)
thread1.terminate()
@ttfw_idf.idf_example_test(env_tag='Example_WIFI_OTA', nightly_run=True)
def test_examples_protocol_advanced_https_ota_example_bluedroid_gatts(env, extra_data):
@pytest.mark.esp32
@pytest.mark.esp32c3
@pytest.mark.esp32s2
@pytest.mark.esp32s3
@pytest.mark.wifi_ota
@pytest.mark.parametrize('config', ['bluedroid',], indirect=True)
def test_examples_protocol_advanced_https_ota_example_bluedroid_gatts(dut: Dut) -> None:
"""
Run an OTA image update while a BLE GATT Server is running in background. This GATT server will be using Bluedroid Host stack.
steps: |
@ -629,43 +627,40 @@ def test_examples_protocol_advanced_https_ota_example_bluedroid_gatts(env, extra
3. Fetch OTA image over HTTPS
4. Reboot with the new OTA image
"""
dut1 = env.get_dut('advanced_https_ota_example', 'examples/system/ota/advanced_https_ota', dut_class=ttfw_idf.ESP32DUT, app_config_name='bluedroid')
server_port = 8001
# File to be downloaded. This file is generated after compilation
bin_name = 'advanced_https_ota.bin'
# check and log bin size
binary_file = os.path.join(dut1.app.binary_path, bin_name)
bin_size = os.path.getsize(binary_file)
ttfw_idf.log_performance('advanced_https_ota_bin_size', '{}KB'.format(bin_size // 1024))
# start test
host_ip = get_my_ip()
if (get_server_status(host_ip, server_port) is False):
thread1 = multiprocessing.Process(target=start_https_server, args=(dut1.app.binary_path, host_ip, server_port))
thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, server_port))
thread1.daemon = True
thread1.start()
dut1.start_app()
dut1.expect('Loaded app from partition at offset', timeout=30)
dut.expect('Loaded app from partition at offset', timeout=30)
try:
ip_address = dut1.expect(re.compile(r' sta ip: ([^,]+),'), timeout=30)
ip_address = dut.expect(r' sta ip: ([^,]+),', timeout=30)
print('Connected to AP with IP: {}'.format(ip_address))
except DUT.ExpectTimeout:
except pexpect.exceptions.TIMEOUT:
thread1.terminate()
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
dut1.expect('Starting Advanced OTA example', timeout=30)
dut.expect('Starting Advanced OTA example', timeout=30)
print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + bin_name))
dut1.expect('Started advertising.', timeout=30)
dut.expect('Started advertising.', timeout=30)
print('Started GAP advertising.')
dut1.write('https://' + host_ip + ':' + str(server_port) + '/' + bin_name)
dut1.expect('Loaded app from partition at offset', timeout=60)
dut1.expect('Starting Advanced OTA example', timeout=30)
dut1.reset()
dut.write('https://' + host_ip + ':' + str(server_port) + '/' + bin_name)
dut.expect('Loaded app from partition at offset', timeout=60)
dut.expect('Starting Advanced OTA example', timeout=30)
thread1.terminate()
@ttfw_idf.idf_example_test(env_tag='EXAMPLE_ETH_OTA')
def test_examples_protocol_advanced_https_ota_example_openssl_aligned_bin(env, extra_data):
@pytest.mark.esp32
@pytest.mark.esp32c3
@pytest.mark.esp32s2
@pytest.mark.esp32s3
@pytest.mark.ethernet_ota
def test_examples_protocol_advanced_https_ota_example_openssl_aligned_bin(dut: Dut) -> None:
"""
This is a test case for esp_http_client_read with binary size multiple of 289 bytes
steps: |
@ -673,55 +668,38 @@ def test_examples_protocol_advanced_https_ota_example_openssl_aligned_bin(env, e
2. Fetch OTA image over HTTPS
3. Reboot with the new OTA image
"""
dut1 = env.get_dut('advanced_https_ota_example', 'examples/system/ota/advanced_https_ota', dut_class=ttfw_idf.ESP32DUT)
# Original binary file generated after compilation
bin_name = 'advanced_https_ota.bin'
# Binary file aligned to DEFAULT_OTA_BUF_SIZE(289 bytes) boundary
aligned_bin_name = 'aligned.bin'
# check and log bin size
binary_file = os.path.join(dut1.app.binary_path, bin_name)
binary_file = os.path.join(dut.app.binary_path, bin_name)
# Original binary size
bin_size = os.path.getsize(binary_file)
# Dummy data required to align binary size to 289 bytes boundary
dummy_data_size = 289 - (bin_size % 289)
with open(binary_file, 'rb+') as f:
with open(os.path.join(dut1.app.binary_path, aligned_bin_name), 'wb+') as fo:
with open(os.path.join(dut.app.binary_path, aligned_bin_name), 'wb+') as fo:
fo.write(f.read(bin_size))
for _ in range(dummy_data_size):
fo.write(struct.pack('B', random.randrange(0,255,1)))
# start test
host_ip = get_my_ip()
chunked_server = start_chunked_server(dut1.app.binary_path, 8070)
dut1.start_app()
dut1.expect('Loaded app from partition at offset', timeout=30)
chunked_server = start_chunked_server(dut.app.binary_path, 8070)
dut.expect('Loaded app from partition at offset', timeout=30)
try:
ip_address = dut1.expect(re.compile(r' (sta|eth) ip: ([^,]+),'), timeout=30)
ip_address = dut.expect(r' (sta|eth) ip: ([^,]+),', timeout=30)
print('Connected to AP with IP: {}'.format(ip_address))
except DUT.ExpectTimeout:
except pexpect.exceptions.TIMEOUT:
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
dut1.expect('Starting Advanced OTA example', timeout=30)
dut.expect('Starting Advanced OTA example', timeout=30)
print('writing to device: {}'.format('https://' + host_ip + ':8070/' + aligned_bin_name))
dut1.write('https://' + host_ip + ':8070/' + aligned_bin_name)
dut1.expect('Loaded app from partition at offset', timeout=60)
dut1.expect('Starting Advanced OTA example', timeout=30)
dut.write('https://' + host_ip + ':8070/' + aligned_bin_name)
dut.expect('Loaded app from partition at offset', timeout=60)
dut.expect('Starting Advanced OTA example', timeout=30)
chunked_server.kill()
try:
os.remove(aligned_bin_name)
except OSError:
pass
if __name__ == '__main__':
test_examples_protocol_advanced_https_ota_example()
test_examples_protocol_advanced_https_ota_example_chunked()
test_examples_protocol_advanced_https_ota_example_redirect_url()
test_examples_protocol_advanced_https_ota_example_truncated_bin()
test_examples_protocol_advanced_https_ota_example_truncated_header()
test_examples_protocol_advanced_https_ota_example_random()
test_examples_protocol_advanced_https_ota_example_invalid_chip_id()
test_examples_protocol_advanced_https_ota_example_anti_rollback()
test_examples_protocol_advanced_https_ota_example_partial_request()
test_examples_protocol_advanced_https_ota_example_nimble_gatts()
test_examples_protocol_advanced_https_ota_example_bluedroid_gatts()
test_examples_protocol_advanced_https_ota_example_openssl_aligned_bin()

View File

@ -1,35 +1,30 @@
# SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Unlicense OR CC0-1.0
from __future__ import print_function
import os
import subprocess
import sys
import ttfw_idf
import pytest
from pytest_embedded import Dut
@ttfw_idf.idf_example_test(env_tag='Example_WIFI')
def test_otatool_example(env, extra_data):
dut = env.get_dut('otatool', 'examples/system/ota/otatool', dut_class=ttfw_idf.ESP32DUT)
@pytest.mark.supported_targets
@pytest.mark.wifi
def test_otatool_example(dut: Dut) -> None:
# Verify factory firmware
dut.start_app()
dut.expect('OTA Tool Example')
dut.expect('Example end')
# Close connection to DUT
dut.receive_thread.exit()
dut.port_inst.close()
dut.serial.proc.close()
script_path = os.path.join(os.getenv('IDF_PATH'), 'examples', 'system', 'ota', 'otatool', 'otatool_example.py')
script_path = os.path.join(str(os.getenv('IDF_PATH')), 'examples', 'system', 'ota', 'otatool', 'otatool_example.py')
binary_path = ''
for flash_file in dut.app.flash_files:
if 'otatool.bin' in flash_file[1]:
binary_path = flash_file[1]
break
subprocess.check_call([sys.executable, script_path, '--binary', binary_path])
if __name__ == '__main__':
test_otatool_example()

View File

@ -1,4 +1,5 @@
| Supported Targets | ESP32 | ESP32-S2 | ESP32-S3 | ESP32-C3 |
| ----------------- | ----- | -------- | -------- | -------- |
# Encrypted Binary OTA
This example demonstrates OTA updates with pre-encrypted binary using `esp_encrypted_img` component's APIs and tool.

View File

@ -3,36 +3,37 @@
import http.server
import multiprocessing
import os
import re
import socket
import ssl
from typing import Any
from typing import Callable
import ttfw_idf
import pexpect
import pytest
from pytest_embedded import Dut
from RangeHTTPServer import RangeRequestHandler
from tiny_test_fw import DUT
server_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'server_certs/ca_cert.pem')
key_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'server_certs/server_key.pem')
enc_bin_name = 'pre_encrypted_ota_secure.bin'
def get_my_ip() -> Any:
def get_my_ip() -> str:
s1 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s1.connect(('8.8.8.8', 80))
my_ip = ''
my_ip = s1.getsockname()[0]
s1.close()
return my_ip
def get_server_status(host_ip: Any, port: int) -> bool:
def get_server_status(host_ip: str, port: int) -> bool:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_status = sock.connect_ex((host_ip, port))
sock.close()
return server_status == 0
def https_request_handler(): # type: ignore
def https_request_handler() -> Callable[...,http.server.BaseHTTPRequestHandler]:
"""
Returns a request handler class that handles broken pipe exception
"""
@ -55,7 +56,7 @@ def https_request_handler(): # type: ignore
return RequestHandler
def start_https_server(ota_image_dir: str, server_ip: Any, server_port: int) -> None:
def start_https_server(ota_image_dir: str, server_ip: str, server_port: int) -> None:
os.chdir(ota_image_dir)
requestHandler = https_request_handler()
httpd = http.server.HTTPServer((server_ip, server_port), requestHandler)
@ -66,42 +67,32 @@ def start_https_server(ota_image_dir: str, server_ip: Any, server_port: int) ->
httpd.serve_forever()
@ttfw_idf.idf_example_test(env_tag='EXAMPLE_ETH_OTA')
def test_examples_protocol_pre_encrypted_ota_example(env, extra_data): # type: ignore
dut1 = env.get_dut('pre_encrypted_ota_example', 'examples/system/ota/pre_encrypted_ota', dut_class=ttfw_idf.ESP32DUT)
@pytest.mark.esp32
@pytest.mark.esp32c3
@pytest.mark.esp32s2
@pytest.mark.esp32s3
@pytest.mark.ethernet_ota
def test_examples_protocol_pre_encrypted_ota_example(dut: Dut) -> None:
server_port = 8001
# start test
host_ip = get_my_ip()
if (get_server_status(host_ip, server_port) is False):
thread1 = multiprocessing.Process(target=start_https_server, args=(dut1.app.binary_path, host_ip, server_port))
thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, server_port))
thread1.daemon = True
thread1.start()
artifacts = dut1.app.artifact_cls(dut1.app.idf_path,
dut1.app.case_group.get_artifact_index_file(),
dut1.app.app_path, dut1.app.config_name, dut1.app.target)
artifacts.download_artifact_files(file_names=['pre_encrypted_ota_secure.bin'])
dut1.start_app()
dut1.expect('Loaded app from partition at offset', timeout=30)
dut.expect('Loaded app from partition at offset', timeout=30)
try:
ip_address = dut1.expect(re.compile(r' (sta|eth) ip: ([^,]+),'), timeout=30)
ip_address = dut.expect(r' (sta|eth) ip: ([^,]+),', timeout=30)
print('Connected to AP with IP: {}'.format(ip_address))
except DUT.ExpectTimeout:
except pexpect.exceptions.TIMEOUT:
thread1.terminate()
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
dut1.expect('Starting Pre Encrypted OTA example', timeout=30)
dut.expect('Starting Pre Encrypted OTA example', timeout=30)
print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + enc_bin_name))
dut1.write('https://' + host_ip + ':' + str(server_port) + '/' + enc_bin_name)
dut1.expect('Magic Verified', timeout=30)
dut1.expect('Reading RSA private key', timeout=30)
dut1.expect('upgrade successful. Rebooting', timeout=30)
dut.write('https://' + host_ip + ':' + str(server_port) + '/' + enc_bin_name)
dut.expect('Magic Verified', timeout=30)
dut.expect('Reading RSA private key', timeout=30)
dut.expect('upgrade successful. Rebooting', timeout=30)
thread1.terminate()
if __name__ == '__main__':
test_examples_protocol_pre_encrypted_ota_example()

View File

@ -1,3 +1,5 @@
| Supported Targets | ESP32 | ESP32-S2 | ESP32-S3 | ESP32-C3 |
| ----------------- | ----- | -------- | -------- | -------- |
# Simple OTA example
This example is based on `esp_https_ota` component's APIs.

View File

@ -1,13 +1,16 @@
# SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Unlicense OR CC0-1.0
import http.server
import multiprocessing
import os
import re
import socket
import ssl
import sys
from typing import Tuple
import ttfw_idf
from tiny_test_fw import DUT, Utility
import pexpect
import pytest
from pytest_embedded import Dut
server_cert = '-----BEGIN CERTIFICATE-----\n' \
'MIIDWDCCAkACCQCbF4+gVh/MLjANBgkqhkiG9w0BAQsFADBuMQswCQYDVQQGEwJJ\n'\
@ -60,15 +63,16 @@ server_key = '-----BEGIN PRIVATE KEY-----\n'\
'-----END PRIVATE KEY-----\n'
def get_my_ip():
def get_my_ip() -> str:
s1 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s1.connect(('8.8.8.8', 80))
my_ip = ''
my_ip = s1.getsockname()[0]
s1.close()
return my_ip
def get_server_status(host_ip, server_port):
def get_server_status(host_ip: str, server_port: int) -> bool:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_status = sock.connect_ex((host_ip, server_port))
sock.close()
@ -77,7 +81,7 @@ def get_server_status(host_ip, server_port):
return False
def start_https_server(ota_image_dir, server_ip, server_port, server_file=None, key_file=None):
def start_https_server(ota_image_dir: str, server_ip: str, server_port: int, server_file: str = None, key_file: str = None) -> None:
os.chdir(ota_image_dir)
if server_file is None:
@ -100,274 +104,260 @@ def start_https_server(ota_image_dir, server_ip, server_port, server_file=None,
httpd.serve_forever()
def check_sha256(sha256_expected, sha256_reported):
Utility.console_log('sha256_expected: %s' % (sha256_expected))
Utility.console_log('sha256_reported: %s' % (sha256_reported))
if sha256_reported not in sha256_expected:
def check_sha256(sha256_expected: str, sha256_reported: str) -> None:
print('sha256_expected: %s' % (sha256_expected))
print('sha256_reported: %s' % (sha256_reported))
if sha256_expected not in sha256_reported:
raise ValueError('SHA256 mismatch')
else:
Utility.console_log('SHA256 expected and reported are the same')
print('SHA256 expected and reported are the same')
def calc_all_sha256(dut):
def calc_all_sha256(dut: Dut) -> Tuple[str, str]:
bootloader_path = os.path.join(dut.app.binary_path, 'bootloader', 'bootloader.bin')
output = dut.image_info(bootloader_path)
sha256_bootloader = re.search(r'Validation Hash:\s+([a-f0-9]+)', output).group(1)
Utility.console_log('bootloader SHA256: %s' % sha256_bootloader)
sha256_bootloader = dut.app.get_sha256(bootloader_path)
app_path = os.path.join(dut.app.binary_path, 'simple_ota.bin')
output = dut.image_info(app_path)
sha256_app = re.search(r'Validation Hash:\s+([a-f0-9]+)', output).group(1)
Utility.console_log('app SHA256: %s' % sha256_app)
sha256_app = dut.app.get_sha256(app_path)
return sha256_bootloader, sha256_app
return str(sha256_bootloader), str(sha256_app)
@ttfw_idf.idf_example_test(env_tag='Example_WIFI_OTA', nightly_run=True)
def test_examples_protocol_simple_ota_example(env, extra_data):
@pytest.mark.esp32
@pytest.mark.esp32c3
@pytest.mark.esp32s2
@pytest.mark.esp32s3
@pytest.mark.wifi_ota
def test_examples_protocol_simple_ota_example(dut: Dut) -> None:
"""
steps: |
1. join AP
2. Fetch OTA image over HTTPS
3. Reboot with the new OTA image
"""
dut1 = env.get_dut('simple_ota_example', 'examples/system/ota/simple_ota_example', dut_class=ttfw_idf.ESP32DUT)
# check and log bin size
binary_file = os.path.join(dut1.app.binary_path, 'simple_ota.bin')
bin_size = os.path.getsize(binary_file)
ttfw_idf.log_performance('simple_ota_bin_size', '{}KB'.format(bin_size // 1024))
sha256_bootloader, sha256_app = calc_all_sha256(dut1)
sha256_bootloader, sha256_app = calc_all_sha256(dut)
# start test
host_ip = get_my_ip()
if (get_server_status(host_ip, 8000) is False):
thread1 = multiprocessing.Process(target=start_https_server, args=(dut1.app.binary_path, host_ip, 8000))
thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, 8000))
thread1.daemon = True
thread1.start()
dut1.start_app()
dut1.expect('Loaded app from partition at offset 0x10000', timeout=30)
check_sha256(sha256_bootloader, dut1.expect(re.compile(r'SHA-256 for bootloader:\s+([a-f0-9]+)'))[0])
check_sha256(sha256_app, dut1.expect(re.compile(r'SHA-256 for current firmware:\s+([a-f0-9]+)'))[0])
dut.expect('Loaded app from partition at offset 0x10000', timeout=30)
check_sha256(sha256_bootloader, str(dut.expect(r'SHA-256 for bootloader:\s+([a-f0-9]){64}')[0]))
check_sha256(sha256_app, str(dut.expect(r'SHA-256 for current firmware:\s+([a-f0-9]){64}')[0]))
try:
ip_address = dut1.expect(re.compile(r' sta ip: ([^,]+),'), timeout=30)
ip_address = dut.expect(r' sta ip: ([^,]+),', timeout=30)
print('Connected to AP with IP: {}'.format(ip_address))
except DUT.ExpectTimeout:
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
except pexpect.exceptions.TIMEOUT:
thread1.terminate()
dut1.expect('Starting OTA example', timeout=30)
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
dut.expect('Starting OTA example', timeout=30)
print('writing to device: {}'.format('https://' + host_ip + ':8000/simple_ota.bin'))
dut1.write('https://' + host_ip + ':8000/simple_ota.bin')
dut1.expect('Loaded app from partition at offset 0x110000', timeout=60)
dut1.expect('Starting OTA example', timeout=30)
dut.write('https://' + host_ip + ':8000/simple_ota.bin')
dut.expect('Loaded app from partition at offset 0x110000', timeout=60)
dut.expect('Starting OTA example', timeout=30)
thread1.terminate()
@ttfw_idf.idf_example_test(env_tag='Example_EthKitV1')
def test_examples_protocol_simple_ota_example_ethernet_with_spiram_config(env, extra_data):
@pytest.mark.esp32
@pytest.mark.esp32c3
@pytest.mark.esp32s2
@pytest.mark.esp32s3
@pytest.mark.ethernet_ota
@pytest.mark.parametrize('config', ['spiram',], indirect=True)
def test_examples_protocol_simple_ota_example_ethernet_with_spiram_config(dut: Dut) -> None:
"""
steps: |
1. join AP
2. Fetch OTA image over HTTPS
3. Reboot with the new OTA image
"""
dut1 = env.get_dut('simple_ota_example', 'examples/system/ota/simple_ota_example', dut_class=ttfw_idf.ESP32DUT, app_config_name='spiram')
# check and log bin size
binary_file = os.path.join(dut1.app.binary_path, 'simple_ota.bin')
bin_size = os.path.getsize(binary_file)
ttfw_idf.log_performance('simple_ota_bin_size', '{}KB'.format(bin_size // 1024))
# start test
host_ip = get_my_ip()
if (get_server_status(host_ip, 8000) is False):
thread1 = multiprocessing.Process(target=start_https_server, args=(dut1.app.binary_path, host_ip, 8000))
thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, 8000))
thread1.daemon = True
thread1.start()
dut1.start_app()
dut1.expect('Loaded app from partition at offset 0x10000', timeout=30)
dut.expect('Loaded app from partition at offset 0x10000', timeout=30)
try:
ip_address = dut1.expect(re.compile(r' eth ip: ([^,]+),'), timeout=30)
ip_address = dut.expect(r' eth ip: ([^,]+),', timeout=30)
print('Connected to AP with IP: {}'.format(ip_address))
except DUT.ExpectTimeout:
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
except pexpect.exceptions.TIMEOUT:
thread1.terminate()
dut1.expect('Starting OTA example', timeout=30)
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
dut.expect('Starting OTA example', timeout=30)
print('writing to device: {}'.format('https://' + host_ip + ':8000/simple_ota.bin'))
dut1.write('https://' + host_ip + ':8000/simple_ota.bin')
dut1.expect('Loaded app from partition at offset 0x110000', timeout=60)
dut1.expect('Starting OTA example', timeout=30)
dut.write('https://' + host_ip + ':8000/simple_ota.bin')
dut.expect('Loaded app from partition at offset 0x110000', timeout=60)
dut.expect('Starting OTA example', timeout=30)
thread1.terminate()
@ttfw_idf.idf_example_test(env_tag='Example_Flash_Encryption_OTA')
def test_examples_protocol_simple_ota_example_with_flash_encryption(env, extra_data):
@pytest.mark.esp32
@pytest.mark.esp32c3
@pytest.mark.esp32s2
@pytest.mark.esp32s3
@pytest.mark.flash_encryption_ota
@pytest.mark.parametrize('config', ['flash_enc',], indirect=True)
@pytest.mark.parametrize('skip_autoflash', ['y'], indirect=True)
def test_examples_protocol_simple_ota_example_with_flash_encryption(dut: Dut) -> None:
"""
steps: |
1. join AP
2. Fetch OTA image over HTTPS
3. Reboot with the new OTA image
"""
dut1 = env.get_dut('simple_ota_example', 'examples/system/ota/simple_ota_example', dut_class=ttfw_idf.ESP32DUT, app_config_name='flash_enc')
# check and log bin size
binary_file = os.path.join(dut1.app.binary_path, 'simple_ota.bin')
bin_size = os.path.getsize(binary_file)
ttfw_idf.log_performance('simple_ota_bin_size', '{}KB'.format(bin_size // 1024))
# erase flash on the device
print('Erasing the flash in order to have an empty NVS key partiton')
dut1.erase_flash()
# Erase flash
dut.serial.erase_flash()
dut.serial.flash()
# start test
host_ip = get_my_ip()
if (get_server_status(host_ip, 8000) is False):
thread1 = multiprocessing.Process(target=start_https_server, args=(dut1.app.binary_path, host_ip, 8000))
thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, 8000))
thread1.daemon = True
thread1.start()
dut1.start_app()
dut1.expect('Loaded app from partition at offset 0x20000', timeout=30)
dut1.expect('Flash encryption mode is DEVELOPMENT (not secure)', timeout=10)
dut.expect('Loaded app from partition at offset 0x20000', timeout=30)
dut.expect('Flash encryption mode is DEVELOPMENT', timeout=10)
try:
ip_address = dut1.expect(re.compile(r' eth ip: ([^,]+),'), timeout=30)
ip_address = dut.expect(r' eth ip: ([^,]+),', timeout=30)
print('Connected to AP with IP: {}'.format(ip_address))
except DUT.ExpectTimeout:
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
except pexpect.exceptions.TIMEOUT:
thread1.terminate()
dut1.expect('Starting OTA example', timeout=30)
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
dut.expect('Starting OTA example', timeout=30)
print('writing to device: {}'.format('https://' + host_ip + ':8000/simple_ota.bin'))
dut1.write('https://' + host_ip + ':8000/simple_ota.bin')
dut1.expect('Loaded app from partition at offset 0x120000', timeout=60)
dut1.expect('Flash encryption mode is DEVELOPMENT (not secure)', timeout=10)
dut1.expect('Starting OTA example', timeout=30)
dut.write('https://' + host_ip + ':8000/simple_ota.bin')
dut.expect('Loaded app from partition at offset 0x120000', timeout=60)
dut.expect('Flash encryption mode is DEVELOPMENT', timeout=10)
dut.expect('Starting OTA example', timeout=30)
thread1.terminate()
@ttfw_idf.idf_example_test(env_tag='Example_Flash_Encryption_OTA_WiFi', target=['esp32c3'], nightly_run=True)
def test_examples_protocol_simple_ota_example_with_flash_encryption_wifi(env, extra_data):
@pytest.mark.esp32c3
@pytest.mark.flash_encryption_wifi_ota
@pytest.mark.parametrize('config', ['flash_enc_wifi',], indirect=True)
@pytest.mark.parametrize('skip_autoflash', ['y'], indirect=True)
def test_examples_protocol_simple_ota_example_with_flash_encryption_wifi(dut: Dut) -> None:
"""
steps: |
1. join AP
2. Fetch OTA image over HTTPS
3. Reboot with the new OTA image
"""
dut1 = env.get_dut('simple_ota_example', 'examples/system/ota/simple_ota_example', app_config_name='flash_enc_wifi')
# check and log bin size
binary_file = os.path.join(dut1.app.binary_path, 'simple_ota.bin')
bin_size = os.path.getsize(binary_file)
ttfw_idf.log_performance('simple_ota_bin_size', '{}KB'.format(bin_size // 1024))
# erase flash on the device
print('Erasing the flash in order to have an empty NVS key partiton')
dut1.erase_flash()
# start test
# Erase flash
dut.serial.erase_flash()
dut.serial.flash()
host_ip = get_my_ip()
if (get_server_status(host_ip, 8000) is False):
thread1 = multiprocessing.Process(target=start_https_server, args=(dut1.app.binary_path, host_ip, 8000))
thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, 8000))
thread1.daemon = True
thread1.start()
dut1.start_app()
dut1.expect('Loaded app from partition at offset 0x20000', timeout=30)
dut1.expect('Flash encryption mode is DEVELOPMENT (not secure)', timeout=10)
dut.expect('Loaded app from partition at offset 0x20000', timeout=30)
dut.expect('Flash encryption mode is DEVELOPMENT', timeout=10)
try:
ip_address = dut1.expect(re.compile(r' sta ip: ([^,]+),'), timeout=30)
ip_address = dut.expect(r' sta ip: ([^,]+),', timeout=30)
print('Connected to AP with IP: {}'.format(ip_address))
except DUT.ExpectTimeout:
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
except pexpect.exceptions.TIMEOUT:
thread1.terminate()
dut1.expect('Starting OTA example', timeout=30)
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
dut.expect('Starting OTA example', timeout=30)
print('writing to device: {}'.format('https://' + host_ip + ':8000/simple_ota.bin'))
dut1.write('https://' + host_ip + ':8000/simple_ota.bin')
dut1.expect('Loaded app from partition at offset 0x120000', timeout=60)
dut1.expect('Flash encryption mode is DEVELOPMENT (not secure)', timeout=10)
dut1.expect('Starting OTA example', timeout=30)
dut.write('https://' + host_ip + ':8000/simple_ota.bin')
dut.expect('Loaded app from partition at offset 0x120000', timeout=60)
dut.expect('Flash encryption mode is DEVELOPMENT', timeout=10)
dut.expect('Starting OTA example', timeout=30)
thread1.terminate()
@ttfw_idf.idf_example_test(env_tag='Example_EthKitV1')
def test_examples_protocol_simple_ota_example_with_verify_app_signature_on_update_no_secure_boot_ecdsa(env, extra_data):
@pytest.mark.esp32
@pytest.mark.esp32c3
@pytest.mark.esp32s2
@pytest.mark.esp32s3
@pytest.mark.ethernet_ota
@pytest.mark.parametrize('config', ['on_update_no_sb_ecdsa',], indirect=True)
def test_examples_protocol_simple_ota_example_with_verify_app_signature_on_update_no_secure_boot_ecdsa(dut: Dut) -> None:
"""
steps: |
1. join AP
2. Fetch OTA image over HTTPS
3. Reboot with the new OTA image
"""
dut1 = env.get_dut('simple_ota_example', 'examples/system/ota/simple_ota_example', dut_class=ttfw_idf.ESP32DUT,
app_config_name='on_update_no_sb_ecdsa')
# check and log bin size
binary_file = os.path.join(dut1.app.binary_path, 'simple_ota.bin')
bin_size = os.path.getsize(binary_file)
ttfw_idf.log_performance('simple_ota_bin_size', '{}KB'.format(bin_size // 1024))
sha256_bootloader, sha256_app = calc_all_sha256(dut1)
sha256_bootloader, sha256_app = calc_all_sha256(dut)
# start test
host_ip = get_my_ip()
if (get_server_status(host_ip, 8000) is False):
thread1 = multiprocessing.Process(target=start_https_server, args=(dut1.app.binary_path, host_ip, 8000))
thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, 8000))
thread1.daemon = True
thread1.start()
dut1.start_app()
dut1.expect('Loaded app from partition at offset 0x20000', timeout=30)
check_sha256(sha256_bootloader, dut1.expect(re.compile(r'SHA-256 for bootloader:\s+([a-f0-9]+)'))[0])
check_sha256(sha256_app, dut1.expect(re.compile(r'SHA-256 for current firmware:\s+([a-f0-9]+)'))[0])
dut.expect('Loaded app from partition at offset 0x20000', timeout=30)
check_sha256(sha256_bootloader, str(dut.expect(r'SHA-256 for bootloader:\s+([a-f0-9]){64}')[0]))
check_sha256(sha256_app, str(dut.expect(r'SHA-256 for current firmware:\s+([a-f0-9]){64}')[0]))
try:
ip_address = dut1.expect(re.compile(r' eth ip: ([^,]+),'), timeout=30)
ip_address = dut.expect(r' eth ip: ([^,]+),', timeout=30)
print('Connected to AP with IP: {}'.format(ip_address))
except DUT.ExpectTimeout:
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
except pexpect.exceptions.TIMEOUT:
thread1.terminate()
dut1.expect('Starting OTA example', timeout=30)
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
dut.expect('Starting OTA example', timeout=30)
print('writing to device: {}'.format('https://' + host_ip + ':8000/simple_ota.bin'))
dut1.write('https://' + host_ip + ':8000/simple_ota.bin')
dut1.expect('Writing to partition subtype 16 at offset 0x120000', timeout=20)
dut.write('https://' + host_ip + ':8000/simple_ota.bin')
dut.expect('Writing to partition subtype 16 at offset 0x120000', timeout=20)
dut1.expect('Verifying image signature...', timeout=60)
dut.expect('Verifying image signature...', timeout=60)
dut1.expect('Loaded app from partition at offset 0x120000', timeout=20)
dut1.expect('Starting OTA example', timeout=30)
dut.expect('Loaded app from partition at offset 0x120000', timeout=20)
dut.expect('Starting OTA example', timeout=30)
thread1.terminate()
@ttfw_idf.idf_example_test(env_tag='Example_EthKitV12')
def test_examples_protocol_simple_ota_example_with_verify_app_signature_on_update_no_secure_boot_rsa(env, extra_data):
@pytest.mark.esp32
@pytest.mark.esp32c3
@pytest.mark.esp32s2
@pytest.mark.esp32s3
@pytest.mark.ethernet
@pytest.mark.parametrize('config', ['on_update_no_sb_rsa',], indirect=True)
def test_examples_protocol_simple_ota_example_with_verify_app_signature_on_update_no_secure_boot_rsa(dut: Dut) -> None:
"""
steps: |
1. join AP
2. Fetch OTA image over HTTPS
3. Reboot with the new OTA image
"""
dut1 = env.get_dut('simple_ota_example', 'examples/system/ota/simple_ota_example', dut_class=ttfw_idf.ESP32DUT,
app_config_name='on_update_no_sb_rsa')
# check and log bin size
binary_file = os.path.join(dut1.app.binary_path, 'simple_ota.bin')
bin_size = os.path.getsize(binary_file)
ttfw_idf.log_performance('simple_ota_bin_size', '{}KB'.format(bin_size // 1024))
sha256_bootloader, sha256_app = calc_all_sha256(dut1)
sha256_bootloader, sha256_app = calc_all_sha256(dut)
# start test
host_ip = get_my_ip()
if (get_server_status(host_ip, 8000) is False):
thread1 = multiprocessing.Process(target=start_https_server, args=(dut1.app.binary_path, host_ip, 8000))
thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, 8000))
thread1.daemon = True
thread1.start()
dut1.start_app()
dut1.expect('Loaded app from partition at offset 0x20000', timeout=30)
check_sha256(sha256_bootloader, dut1.expect(re.compile(r'SHA-256 for bootloader:\s+([a-f0-9]+)'))[0])
check_sha256(sha256_app, dut1.expect(re.compile(r'SHA-256 for current firmware:\s+([a-f0-9]+)'))[0])
dut.expect('Loaded app from partition at offset 0x20000', timeout=30)
check_sha256(sha256_bootloader, str(dut.expect(r'SHA-256 for bootloader:\s+([a-f0-9]){64}')[0]))
check_sha256(sha256_app, str(dut.expect(r'SHA-256 for current firmware:\s+([a-f0-9]){64}')[0]))
try:
ip_address = dut1.expect(re.compile(r' eth ip: ([^,]+),'), timeout=30)
ip_address = dut.expect(r' eth ip: ([^,]+),', timeout=30)
print('Connected to AP with IP: {}'.format(ip_address))
except DUT.ExpectTimeout:
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
except pexpect.exceptions.TIMEOUT:
thread1.terminate()
dut1.expect('Starting OTA example', timeout=30)
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
dut.expect('Starting OTA example', timeout=30)
print('writing to device: {}'.format('https://' + host_ip + ':8000/simple_ota.bin'))
dut1.write('https://' + host_ip + ':8000/simple_ota.bin')
dut1.expect('Writing to partition subtype 16 at offset 0x120000', timeout=20)
dut.write('https://' + host_ip + ':8000/simple_ota.bin')
dut.expect('Writing to partition subtype 16 at offset 0x120000', timeout=20)
dut1.expect('Verifying image signature...', timeout=60)
dut1.expect('#0 app key digest == #0 trusted key digest', timeout=10)
dut1.expect('Verifying with RSA-PSS...', timeout=10)
dut1.expect('Signature verified successfully!', timeout=10)
dut.expect('Verifying image signature...', timeout=60)
dut.expect('#0 app key digest == #0 trusted key digest', timeout=10)
dut.expect('Verifying with RSA-PSS...', timeout=10)
dut.expect('Signature verified successfully!', timeout=10)
dut1.expect('Loaded app from partition at offset 0x120000', timeout=20)
dut1.expect('Starting OTA example', timeout=30)
dut.expect('Loaded app from partition at offset 0x120000', timeout=20)
dut.expect('Starting OTA example', timeout=30)
thread1.terminate()
@ -382,10 +372,3 @@ if __name__ == '__main__':
start_https_server(bin_dir, '', port,
server_file=os.path.join(cert_dir, 'ca_cert.pem'),
key_file=os.path.join(cert_dir, 'ca_key.pem'))
else:
test_examples_protocol_simple_ota_example()
test_examples_protocol_simple_ota_example_ethernet_with_spiram_config()
test_examples_protocol_simple_ota_example_with_flash_encryption()
test_examples_protocol_simple_ota_example_with_flash_encryption_wifi()
test_examples_protocol_simple_ota_example_with_verify_app_signature_on_update_no_secure_boot_ecdsa()
test_examples_protocol_simple_ota_example_with_verify_app_signature_on_update_no_secure_boot_rsa()