fix: Refactored script for initiating Python-based HTTPS server

This commit refactors the script responsible for starting a Python-based HTTPS server
to align with the latest Python version's requirements and best practices.

Closes https://github.com/espressif/esp-idf/issues/13575
This commit is contained in:
nilesh.kale 2024-04-15 14:52:37 +05:30
parent 72f42db7e2
commit 553a117894
5 changed files with 55 additions and 48 deletions

View File

@ -1,4 +1,4 @@
# SPDX-FileCopyrightText: 2022-2023 Espressif Systems (Shanghai) CO LTD
# SPDX-FileCopyrightText: 2022-2024 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Unlicense OR CC0-1.0
import http.server
import logging
@ -50,8 +50,10 @@ def start_https_server(server_file: str, key_file: str, server_ip: str, server_p
requestHandler = https_request_handler()
httpd = http.server.HTTPServer((server_ip, server_port), requestHandler)
httpd.socket = ssl.wrap_socket(httpd.socket, keyfile=key_file,
certfile=server_file, server_side=True)
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
ssl_context.load_cert_chain(certfile=server_file, keyfile=key_file)
httpd.socket = ssl_context.wrap_socket(httpd.socket, server_side=True)
httpd.serve_forever()

View File

@ -50,9 +50,10 @@ def start_https_server(ota_image_dir: str, server_ip: str, server_port: int) ->
requestHandler = https_request_handler()
httpd = http.server.HTTPServer((server_ip, server_port), requestHandler)
httpd.socket = ssl.wrap_socket(httpd.socket,
keyfile=key_file,
certfile=server_file, server_side=True)
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
ssl_context.load_cert_chain(certfile=server_file, keyfile=key_file)
httpd.socket = ssl_context.wrap_socket(httpd.socket, server_side=True)
httpd.serve_forever()
@ -88,9 +89,10 @@ def start_redirect_server(ota_image_dir: str, server_ip: str, server_port: int,
httpd = http.server.HTTPServer((server_ip, server_port), redirectHandler)
httpd.socket = ssl.wrap_socket(httpd.socket,
keyfile=key_file,
certfile=server_file, server_side=True)
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
ssl_context.load_cert_chain(certfile=server_file, keyfile=key_file)
httpd.socket = ssl_context.wrap_socket(httpd.socket, server_side=True)
httpd.serve_forever()
@ -154,8 +156,8 @@ def test_examples_protocol_advanced_https_ota_example_truncated_bin(dut: Dut) ->
truncated_bin_size = 64000
binary_file = os.path.join(dut.app.binary_path, bin_name)
with open(binary_file, 'rb+') as f:
with open(os.path.join(dut.app.binary_path, truncated_bin_name), 'wb+') as fo:
fo.write(f.read(truncated_bin_size))
with open(os.path.join(dut.app.binary_path, truncated_bin_name), 'wb+') as output_file:
output_file.write(f.read(truncated_bin_size))
binary_file = os.path.join(dut.app.binary_path, truncated_bin_name)
# Start server
thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, '0.0.0.0', server_port))
@ -187,7 +189,7 @@ def test_examples_protocol_advanced_https_ota_example_truncated_bin(dut: Dut) ->
@pytest.mark.ethernet_ota
def test_examples_protocol_advanced_https_ota_example_truncated_header(dut: Dut) -> None:
"""
Working of OTA if headers of binary file are truncated is vaildated in this test case.
Working of OTA if headers of binary file are truncated is validated in this test case.
Application should return with error message in this case.
steps: |
1. join AP/Ethernet
@ -205,8 +207,8 @@ def test_examples_protocol_advanced_https_ota_example_truncated_header(dut: Dut)
# check and log bin size
binary_file = os.path.join(dut.app.binary_path, bin_name)
with open(binary_file, 'rb+') as f:
with open(os.path.join(dut.app.binary_path, truncated_bin_name), 'wb+') as fo:
fo.write(f.read(truncated_bin_size))
with open(os.path.join(dut.app.binary_path, truncated_bin_name), 'wb+') as output_file:
output_file.write(f.read(truncated_bin_size))
binary_file = os.path.join(dut.app.binary_path, truncated_bin_name)
# Start server
thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, '0.0.0.0', server_port))
@ -249,16 +251,16 @@ def test_examples_protocol_advanced_https_ota_example_random(dut: Dut) -> None:
server_port = 8001
# Random binary file to be generated
random_bin_name = 'random.bin'
# Size of random binary file. 32000 is choosen, to reduce the time required to run the test-case
# Size of random binary file. 32000 is chosen, to reduce the time required to run the test-case
random_bin_size = 32000
# check and log bin size
binary_file = os.path.join(dut.app.binary_path, random_bin_name)
with open(binary_file, 'wb+') as fo:
with open(binary_file, 'wb+') as output_file:
# First byte of binary file is always set to zero. If first byte is generated randomly,
# in some cases it may generate 0xE9 which will result in failure of testcase.
fo.write(struct.pack('B', 0))
output_file.write(struct.pack('B', 0))
for i in range(random_bin_size - 1):
fo.write(struct.pack('B', random.randrange(0,255,1)))
output_file.write(struct.pack('B', random.randrange(0,255,1)))
# Start server
thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, '0.0.0.0', server_port))
thread1.daemon = True
@ -302,7 +304,7 @@ def test_examples_protocol_advanced_https_ota_example_invalid_chip_id(dut: Dut)
# Random binary file to be generated
random_bin_name = 'random.bin'
random_binary_file = os.path.join(dut.app.binary_path, random_bin_name)
# Size of random binary file. 2000 is choosen, to reduce the time required to run the test-case
# Size of random binary file. 2000 is chosen, to reduce the time required to run the test-case
random_bin_size = 2000
binary_file = os.path.join(dut.app.binary_path, bin_name)
@ -310,8 +312,8 @@ def test_examples_protocol_advanced_https_ota_example_invalid_chip_id(dut: Dut)
data = list(f.read(random_bin_size))
# Changing Chip id
data[13] = 0xfe
with open(random_binary_file, 'wb+') as fo:
fo.write(bytearray(data))
with open(random_binary_file, 'wb+') as output_file:
output_file.write(bytearray(data))
# Start server
thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, '0.0.0.0', server_port))
thread1.daemon = True
@ -452,11 +454,11 @@ def test_examples_protocol_advanced_https_ota_example_anti_rollback(dut: Dut) ->
binary_file = os.path.join(dut.app.binary_path, bin_name)
file_size = os.path.getsize(binary_file)
with open(binary_file, 'rb+') as f:
with open(os.path.join(dut.app.binary_path, anti_rollback_bin_name), 'wb+') as fo:
fo.write(f.read(file_size))
with open(os.path.join(dut.app.binary_path, anti_rollback_bin_name), 'wb+') as output_file:
output_file.write(f.read(file_size))
# Change security_version to 0 for negative test case
fo.seek(36)
fo.write(b'\x00')
output_file.seek(36)
output_file.write(b'\x00')
binary_file = os.path.join(dut.app.binary_path, anti_rollback_bin_name)
# Start server
thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, '0.0.0.0', server_port))
@ -666,10 +668,10 @@ def test_examples_protocol_advanced_https_ota_example_openssl_aligned_bin(dut: D
# Dummy data required to align binary size to 289 bytes boundary
dummy_data_size = 289 - (bin_size % 289)
with open(binary_file, 'rb+') as f:
with open(os.path.join(dut.app.binary_path, aligned_bin_name), 'wb+') as fo:
fo.write(f.read(bin_size))
with open(os.path.join(dut.app.binary_path, aligned_bin_name), 'wb+') as output_file:
output_file.write(f.read(bin_size))
for _ in range(dummy_data_size):
fo.write(struct.pack('B', random.randrange(0,255,1)))
output_file.write(struct.pack('B', random.randrange(0,255,1)))
# Start server
chunked_server = start_chunked_server(dut.app.binary_path, 8070)
try:

View File

@ -110,9 +110,10 @@ def start_https_server(ota_image_dir: str, server_ip: str, server_port: int) ->
requestHandler = https_request_handler()
httpd = http.server.HTTPServer((server_ip, server_port), requestHandler)
httpd.socket = ssl.wrap_socket(httpd.socket,
keyfile=key_file,
certfile=server_file, server_side=True)
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
ssl_context.load_cert_chain(certfile=server_file, keyfile=key_file)
httpd.socket = ssl_context.wrap_socket(httpd.socket, server_side=True)
httpd.serve_forever()
@ -186,8 +187,8 @@ def test_examples_protocol_native_ota_example_truncated_bin(dut: Dut) -> None:
with open(binary_file, 'rb+') as fr:
bin_data = fr.read(truncated_bin_size)
binary_file = os.path.join(dut.app.binary_path, truncated_bin_name)
with open(binary_file, 'wb+') as fo:
fo.write(bin_data)
with open(binary_file, 'wb+') as output_file:
output_file.write(bin_data)
# Start server
thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, '0.0.0.0', server_port))
thread1.daemon = True
@ -215,7 +216,7 @@ def test_examples_protocol_native_ota_example_truncated_bin(dut: Dut) -> None:
@pytest.mark.ethernet_ota
def test_examples_protocol_native_ota_example_truncated_header(dut: Dut) -> None:
"""
Working of OTA if headers of binary file are truncated is vaildated in this test case.
Working of OTA if headers of binary file are truncated is validated in this test case.
Application should return with error message in this case.
steps: |
1. join AP/Ethernet
@ -235,8 +236,8 @@ def test_examples_protocol_native_ota_example_truncated_header(dut: Dut) -> None
with open(binary_file, 'rb+') as fr:
bin_data = fr.read(truncated_bin_size)
binary_file = os.path.join(dut.app.binary_path, truncated_bin_name)
with open(binary_file, 'wb+') as fo:
fo.write(bin_data)
with open(binary_file, 'wb+') as output_file:
output_file.write(bin_data)
# Start server
thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, '0.0.0.0', server_port))
thread1.daemon = True
@ -275,17 +276,17 @@ def test_examples_protocol_native_ota_example_random(dut: Dut) -> None:
server_port = 8002
# Random binary file to be generated
random_bin_name = 'random.bin'
# Size of random binary file. 32000 is choosen, to reduce the time required to run the test-case
# Size of random binary file. 32000 is chosen, to reduce the time required to run the test-case
random_bin_size = 32000
# check and log bin size
binary_file = os.path.join(dut.app.binary_path, random_bin_name)
fo = open(binary_file, 'wb+')
output_file = open(binary_file, 'wb+')
# First byte of binary file is always set to zero. If first byte is generated randomly,
# in some cases it may generate 0xE9 which will result in failure of testcase.
with open(binary_file, 'wb+') as fo:
fo.write(struct.pack('B', 0))
with open(binary_file, 'wb+') as output_file:
output_file.write(struct.pack('B', 0))
for _ in range(random_bin_size - 1):
fo.write(struct.pack('B', random.randrange(0,255,1)))
output_file.write(struct.pack('B', random.randrange(0,255,1)))
# Start server
thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, '0.0.0.0', server_port))
thread1.daemon = True

View File

@ -1,4 +1,4 @@
# SPDX-FileCopyrightText: 2022-2023 Espressif Systems (Shanghai) CO LTD
# SPDX-FileCopyrightText: 2022-2024 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Unlicense OR CC0-1.0
import http.server
import multiprocessing
@ -46,9 +46,10 @@ def start_https_server(ota_image_dir: str, server_ip: str, server_port: int) ->
requestHandler = https_request_handler()
httpd = http.server.HTTPServer((server_ip, server_port), requestHandler)
httpd.socket = ssl.wrap_socket(httpd.socket,
keyfile=key_file,
certfile=server_file, server_side=True)
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
ssl_context.load_cert_chain(certfile=server_file, keyfile=key_file)
httpd.socket = ssl_context.wrap_socket(httpd.socket, server_side=True)
httpd.serve_forever()

View File

@ -87,9 +87,10 @@ def start_https_server(ota_image_dir: str, server_ip: str, server_port: int, ser
httpd = http.server.HTTPServer((server_ip, server_port), http.server.SimpleHTTPRequestHandler)
httpd.socket = ssl.wrap_socket(httpd.socket,
keyfile=key_file,
certfile=server_file, server_side=True)
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
ssl_context.load_cert_chain(certfile=server_file, keyfile=key_file)
httpd.socket = ssl_context.wrap_socket(httpd.socket, server_side=True)
httpd.serve_forever()