From 553a117894a14026dab2a97e51b1311dece01b3e Mon Sep 17 00:00:00 2001 From: "nilesh.kale" Date: Mon, 15 Apr 2024 14:52:37 +0530 Subject: [PATCH] fix: Refactored script for initiating Python-based HTTPS server This commit refactors the script responsible for starting a Python-based HTTPS server to align with the latest Python version's requirements and best practices. Closes https://github.com/espressif/esp-idf/issues/13575 --- .../https_request/pytest_https_request.py | 8 +-- .../advanced_https_ota/pytest_advanced_ota.py | 52 ++++++++++--------- .../native_ota_example/pytest_native_ota.py | 27 +++++----- .../pytest_pre_encrypted_ota.py | 9 ++-- .../simple_ota_example/pytest_simple_ota.py | 7 +-- 5 files changed, 55 insertions(+), 48 deletions(-) diff --git a/examples/protocols/https_request/pytest_https_request.py b/examples/protocols/https_request/pytest_https_request.py index 0afcda609e..340945937e 100644 --- a/examples/protocols/https_request/pytest_https_request.py +++ b/examples/protocols/https_request/pytest_https_request.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: 2022-2023 Espressif Systems (Shanghai) CO LTD +# SPDX-FileCopyrightText: 2022-2024 Espressif Systems (Shanghai) CO LTD # SPDX-License-Identifier: Unlicense OR CC0-1.0 import http.server import logging @@ -50,8 +50,10 @@ def start_https_server(server_file: str, key_file: str, server_ip: str, server_p requestHandler = https_request_handler() httpd = http.server.HTTPServer((server_ip, server_port), requestHandler) - httpd.socket = ssl.wrap_socket(httpd.socket, keyfile=key_file, - certfile=server_file, server_side=True) + ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) + ssl_context.load_cert_chain(certfile=server_file, keyfile=key_file) + + httpd.socket = ssl_context.wrap_socket(httpd.socket, server_side=True) httpd.serve_forever() diff --git a/examples/system/ota/advanced_https_ota/pytest_advanced_ota.py b/examples/system/ota/advanced_https_ota/pytest_advanced_ota.py index d25ca408e4..fbabc2c4e7 100644 --- a/examples/system/ota/advanced_https_ota/pytest_advanced_ota.py +++ b/examples/system/ota/advanced_https_ota/pytest_advanced_ota.py @@ -50,9 +50,10 @@ def start_https_server(ota_image_dir: str, server_ip: str, server_port: int) -> requestHandler = https_request_handler() httpd = http.server.HTTPServer((server_ip, server_port), requestHandler) - httpd.socket = ssl.wrap_socket(httpd.socket, - keyfile=key_file, - certfile=server_file, server_side=True) + ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) + ssl_context.load_cert_chain(certfile=server_file, keyfile=key_file) + + httpd.socket = ssl_context.wrap_socket(httpd.socket, server_side=True) httpd.serve_forever() @@ -88,9 +89,10 @@ def start_redirect_server(ota_image_dir: str, server_ip: str, server_port: int, httpd = http.server.HTTPServer((server_ip, server_port), redirectHandler) - httpd.socket = ssl.wrap_socket(httpd.socket, - keyfile=key_file, - certfile=server_file, server_side=True) + ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) + ssl_context.load_cert_chain(certfile=server_file, keyfile=key_file) + + httpd.socket = ssl_context.wrap_socket(httpd.socket, server_side=True) httpd.serve_forever() @@ -154,8 +156,8 @@ def test_examples_protocol_advanced_https_ota_example_truncated_bin(dut: Dut) -> truncated_bin_size = 64000 binary_file = os.path.join(dut.app.binary_path, bin_name) with open(binary_file, 'rb+') as f: - with open(os.path.join(dut.app.binary_path, truncated_bin_name), 'wb+') as fo: - fo.write(f.read(truncated_bin_size)) + with open(os.path.join(dut.app.binary_path, truncated_bin_name), 'wb+') as output_file: + output_file.write(f.read(truncated_bin_size)) binary_file = os.path.join(dut.app.binary_path, truncated_bin_name) # Start server thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, '0.0.0.0', server_port)) @@ -187,7 +189,7 @@ def test_examples_protocol_advanced_https_ota_example_truncated_bin(dut: Dut) -> @pytest.mark.ethernet_ota def test_examples_protocol_advanced_https_ota_example_truncated_header(dut: Dut) -> None: """ - Working of OTA if headers of binary file are truncated is vaildated in this test case. + Working of OTA if headers of binary file are truncated is validated in this test case. Application should return with error message in this case. steps: | 1. join AP/Ethernet @@ -205,8 +207,8 @@ def test_examples_protocol_advanced_https_ota_example_truncated_header(dut: Dut) # check and log bin size binary_file = os.path.join(dut.app.binary_path, bin_name) with open(binary_file, 'rb+') as f: - with open(os.path.join(dut.app.binary_path, truncated_bin_name), 'wb+') as fo: - fo.write(f.read(truncated_bin_size)) + with open(os.path.join(dut.app.binary_path, truncated_bin_name), 'wb+') as output_file: + output_file.write(f.read(truncated_bin_size)) binary_file = os.path.join(dut.app.binary_path, truncated_bin_name) # Start server thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, '0.0.0.0', server_port)) @@ -249,16 +251,16 @@ def test_examples_protocol_advanced_https_ota_example_random(dut: Dut) -> None: server_port = 8001 # Random binary file to be generated random_bin_name = 'random.bin' - # Size of random binary file. 32000 is choosen, to reduce the time required to run the test-case + # Size of random binary file. 32000 is chosen, to reduce the time required to run the test-case random_bin_size = 32000 # check and log bin size binary_file = os.path.join(dut.app.binary_path, random_bin_name) - with open(binary_file, 'wb+') as fo: + with open(binary_file, 'wb+') as output_file: # First byte of binary file is always set to zero. If first byte is generated randomly, # in some cases it may generate 0xE9 which will result in failure of testcase. - fo.write(struct.pack('B', 0)) + output_file.write(struct.pack('B', 0)) for i in range(random_bin_size - 1): - fo.write(struct.pack('B', random.randrange(0,255,1))) + output_file.write(struct.pack('B', random.randrange(0,255,1))) # Start server thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, '0.0.0.0', server_port)) thread1.daemon = True @@ -302,7 +304,7 @@ def test_examples_protocol_advanced_https_ota_example_invalid_chip_id(dut: Dut) # Random binary file to be generated random_bin_name = 'random.bin' random_binary_file = os.path.join(dut.app.binary_path, random_bin_name) - # Size of random binary file. 2000 is choosen, to reduce the time required to run the test-case + # Size of random binary file. 2000 is chosen, to reduce the time required to run the test-case random_bin_size = 2000 binary_file = os.path.join(dut.app.binary_path, bin_name) @@ -310,8 +312,8 @@ def test_examples_protocol_advanced_https_ota_example_invalid_chip_id(dut: Dut) data = list(f.read(random_bin_size)) # Changing Chip id data[13] = 0xfe - with open(random_binary_file, 'wb+') as fo: - fo.write(bytearray(data)) + with open(random_binary_file, 'wb+') as output_file: + output_file.write(bytearray(data)) # Start server thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, '0.0.0.0', server_port)) thread1.daemon = True @@ -452,11 +454,11 @@ def test_examples_protocol_advanced_https_ota_example_anti_rollback(dut: Dut) -> binary_file = os.path.join(dut.app.binary_path, bin_name) file_size = os.path.getsize(binary_file) with open(binary_file, 'rb+') as f: - with open(os.path.join(dut.app.binary_path, anti_rollback_bin_name), 'wb+') as fo: - fo.write(f.read(file_size)) + with open(os.path.join(dut.app.binary_path, anti_rollback_bin_name), 'wb+') as output_file: + output_file.write(f.read(file_size)) # Change security_version to 0 for negative test case - fo.seek(36) - fo.write(b'\x00') + output_file.seek(36) + output_file.write(b'\x00') binary_file = os.path.join(dut.app.binary_path, anti_rollback_bin_name) # Start server thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, '0.0.0.0', server_port)) @@ -666,10 +668,10 @@ def test_examples_protocol_advanced_https_ota_example_openssl_aligned_bin(dut: D # Dummy data required to align binary size to 289 bytes boundary dummy_data_size = 289 - (bin_size % 289) with open(binary_file, 'rb+') as f: - with open(os.path.join(dut.app.binary_path, aligned_bin_name), 'wb+') as fo: - fo.write(f.read(bin_size)) + with open(os.path.join(dut.app.binary_path, aligned_bin_name), 'wb+') as output_file: + output_file.write(f.read(bin_size)) for _ in range(dummy_data_size): - fo.write(struct.pack('B', random.randrange(0,255,1))) + output_file.write(struct.pack('B', random.randrange(0,255,1))) # Start server chunked_server = start_chunked_server(dut.app.binary_path, 8070) try: diff --git a/examples/system/ota/native_ota_example/pytest_native_ota.py b/examples/system/ota/native_ota_example/pytest_native_ota.py index ba61b6c75c..3c0fcfe6db 100644 --- a/examples/system/ota/native_ota_example/pytest_native_ota.py +++ b/examples/system/ota/native_ota_example/pytest_native_ota.py @@ -110,9 +110,10 @@ def start_https_server(ota_image_dir: str, server_ip: str, server_port: int) -> requestHandler = https_request_handler() httpd = http.server.HTTPServer((server_ip, server_port), requestHandler) - httpd.socket = ssl.wrap_socket(httpd.socket, - keyfile=key_file, - certfile=server_file, server_side=True) + ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) + ssl_context.load_cert_chain(certfile=server_file, keyfile=key_file) + + httpd.socket = ssl_context.wrap_socket(httpd.socket, server_side=True) httpd.serve_forever() @@ -186,8 +187,8 @@ def test_examples_protocol_native_ota_example_truncated_bin(dut: Dut) -> None: with open(binary_file, 'rb+') as fr: bin_data = fr.read(truncated_bin_size) binary_file = os.path.join(dut.app.binary_path, truncated_bin_name) - with open(binary_file, 'wb+') as fo: - fo.write(bin_data) + with open(binary_file, 'wb+') as output_file: + output_file.write(bin_data) # Start server thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, '0.0.0.0', server_port)) thread1.daemon = True @@ -215,7 +216,7 @@ def test_examples_protocol_native_ota_example_truncated_bin(dut: Dut) -> None: @pytest.mark.ethernet_ota def test_examples_protocol_native_ota_example_truncated_header(dut: Dut) -> None: """ - Working of OTA if headers of binary file are truncated is vaildated in this test case. + Working of OTA if headers of binary file are truncated is validated in this test case. Application should return with error message in this case. steps: | 1. join AP/Ethernet @@ -235,8 +236,8 @@ def test_examples_protocol_native_ota_example_truncated_header(dut: Dut) -> None with open(binary_file, 'rb+') as fr: bin_data = fr.read(truncated_bin_size) binary_file = os.path.join(dut.app.binary_path, truncated_bin_name) - with open(binary_file, 'wb+') as fo: - fo.write(bin_data) + with open(binary_file, 'wb+') as output_file: + output_file.write(bin_data) # Start server thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, '0.0.0.0', server_port)) thread1.daemon = True @@ -275,17 +276,17 @@ def test_examples_protocol_native_ota_example_random(dut: Dut) -> None: server_port = 8002 # Random binary file to be generated random_bin_name = 'random.bin' - # Size of random binary file. 32000 is choosen, to reduce the time required to run the test-case + # Size of random binary file. 32000 is chosen, to reduce the time required to run the test-case random_bin_size = 32000 # check and log bin size binary_file = os.path.join(dut.app.binary_path, random_bin_name) - fo = open(binary_file, 'wb+') + output_file = open(binary_file, 'wb+') # First byte of binary file is always set to zero. If first byte is generated randomly, # in some cases it may generate 0xE9 which will result in failure of testcase. - with open(binary_file, 'wb+') as fo: - fo.write(struct.pack('B', 0)) + with open(binary_file, 'wb+') as output_file: + output_file.write(struct.pack('B', 0)) for _ in range(random_bin_size - 1): - fo.write(struct.pack('B', random.randrange(0,255,1))) + output_file.write(struct.pack('B', random.randrange(0,255,1))) # Start server thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, '0.0.0.0', server_port)) thread1.daemon = True diff --git a/examples/system/ota/pre_encrypted_ota/pytest_pre_encrypted_ota.py b/examples/system/ota/pre_encrypted_ota/pytest_pre_encrypted_ota.py index 85f27c3148..c232173574 100644 --- a/examples/system/ota/pre_encrypted_ota/pytest_pre_encrypted_ota.py +++ b/examples/system/ota/pre_encrypted_ota/pytest_pre_encrypted_ota.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: 2022-2023 Espressif Systems (Shanghai) CO LTD +# SPDX-FileCopyrightText: 2022-2024 Espressif Systems (Shanghai) CO LTD # SPDX-License-Identifier: Unlicense OR CC0-1.0 import http.server import multiprocessing @@ -46,9 +46,10 @@ def start_https_server(ota_image_dir: str, server_ip: str, server_port: int) -> requestHandler = https_request_handler() httpd = http.server.HTTPServer((server_ip, server_port), requestHandler) - httpd.socket = ssl.wrap_socket(httpd.socket, - keyfile=key_file, - certfile=server_file, server_side=True) + ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) + ssl_context.load_cert_chain(certfile=server_file, keyfile=key_file) + + httpd.socket = ssl_context.wrap_socket(httpd.socket, server_side=True) httpd.serve_forever() diff --git a/examples/system/ota/simple_ota_example/pytest_simple_ota.py b/examples/system/ota/simple_ota_example/pytest_simple_ota.py index a6f4a32679..0595a9c2f7 100644 --- a/examples/system/ota/simple_ota_example/pytest_simple_ota.py +++ b/examples/system/ota/simple_ota_example/pytest_simple_ota.py @@ -87,9 +87,10 @@ def start_https_server(ota_image_dir: str, server_ip: str, server_port: int, ser httpd = http.server.HTTPServer((server_ip, server_port), http.server.SimpleHTTPRequestHandler) - httpd.socket = ssl.wrap_socket(httpd.socket, - keyfile=key_file, - certfile=server_file, server_side=True) + ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) + ssl_context.load_cert_chain(certfile=server_file, keyfile=key_file) + + httpd.socket = ssl_context.wrap_socket(httpd.socket, server_side=True) httpd.serve_forever()