diff --git a/.gitlab/ci/target-test.yml b/.gitlab/ci/target-test.yml index 9629fcf916..3899d71bf1 100644 --- a/.gitlab/ci/target-test.yml +++ b/.gitlab/ci/target-test.yml @@ -244,6 +244,13 @@ example_test_pytest_esp32s3_wifi_router: needs: - build_pytest_examples_esp32s3 tags: [ esp32s3, wifi_router ] +example_test_pytest_esp32_wifi_iperf: + extends: + - .pytest_examples_dir_template + - .rules:test:example_test-esp32-wifi + needs: + - build_pytest_examples_esp32 + tags: [ esp32, Example_ShieldBox_Basic ] example_test_pytest_esp32_wifi_wlan: extends: @@ -253,6 +260,14 @@ example_test_pytest_esp32_wifi_wlan: - build_pytest_examples_esp32 tags: [ esp32, wifi_wlan ] +example_test_pytest_esp32_ethernet_router: + extends: + - .pytest_examples_dir_template + - .rules:test:example_test-esp32-ethernet + needs: + - build_pytest_examples_esp32 + tags: [ esp32, ethernet_router ] + example_test_pytest_esp32_ethernet_ip101: extends: - .pytest_examples_dir_template @@ -988,22 +1003,6 @@ example_test_001C: - ESP32 - Example_GENERIC -example_test_002: - extends: - - .example_test_esp32_template - - .rules:test:example_test-esp32-wifi - tags: - - ESP32 - - Example_ShieldBox_Basic - -example_test_ethernet_router: - extends: - - .example_test_esp32_template - - .rules:test:example_test-esp32-ethernet - tags: - - ESP32 - - ethernet_router - .example_test_003: extends: .example_test_esp32_template tags: @@ -1107,12 +1106,6 @@ example_test_C6_GENERIC: - .test_app_template - .rules:test:custom_test-esp32s3 -test_app_test_eth: - extends: .test_app_esp32_template - tags: - - ESP32 - - ethernet_router - .unit_test_template: extends: .target_test_job_template needs: # the assign already needs all the build jobs diff --git a/conftest.py b/conftest.py index 2b4a4f0bc2..fdb849bac8 100644 --- a/conftest.py +++ b/conftest.py @@ -102,6 +102,8 @@ ENV_MARKERS = { 'wifi_router': 'both the runner and dut connect to the same wifi router', 'wifi_high_traffic': 'wifi high traffic runners', 'wifi_wlan': 'wifi runner with a wireless NIC', + 'Example_ShieldBox_Basic': 'basic configuration of the AP and ESP DUT placed in shielded box', + 'Example_ShieldBox': 'multiple shielded APs connected to shielded ESP DUT via RF cable with programmable attenuator', 'xtal_26mhz': 'runner with 26MHz xtal on board', 'xtal_40mhz': 'runner with 40MHz xtal on board', 'external_flash': 'external flash memory connected via VSPI (FSPI)', diff --git a/examples/ethernet/iperf/iperf_test.py b/examples/ethernet/iperf/pytest_eth_iperf.py similarity index 53% rename from examples/ethernet/iperf/iperf_test.py rename to examples/ethernet/iperf/pytest_eth_iperf.py index 5bd30bd2f3..ce7be622b1 100644 --- a/examples/ethernet/iperf/iperf_test.py +++ b/examples/ethernet/iperf/pytest_eth_iperf.py @@ -1,25 +1,25 @@ +# SPDX-FileCopyrightText: 2022-2023 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Unlicense OR CC0-1.0 """ Test case for iperf example. -This test case might have problem running on windows: +This test case might have problem running on Windows: -1. direct use of `make` -2. use `sudo killall iperf` to force kill iperf, didn't implement windows version +- use `sudo killall iperf` to force kill iperf, didn't implement windows version """ from __future__ import division, unicode_literals import os -import re import subprocess -import ttfw_idf +import pytest from common_test_methods import get_host_ip4_by_dest_ip from idf_iperf_test_util import IperfUtility -from tiny_test_fw import TinyFW +from pytest_embedded import Dut try: - from typing import Any, Tuple + from typing import Any, Callable, Tuple except ImportError: # Only used for type annotations pass @@ -29,10 +29,10 @@ NO_BANDWIDTH_LIMIT = -1 # iperf send bandwidth is not limited class IperfTestUtilityEth(IperfUtility.IperfTestUtility): """ iperf test implementation """ - def __init__(self, dut, config_name, pc_nic_ip, pc_iperf_log_file, test_result=None): # type: (str, str, str,str, Any) -> None + def __init__(self, dut: str, config_name: str, pc_nic_ip: str, pc_iperf_log_file: str, test_result:Any=None) -> None: IperfUtility.IperfTestUtility.__init__(self, dut, config_name, 'None', 'None', pc_nic_ip, pc_iperf_log_file, test_result) - def setup(self): # type: () -> Tuple[str,int] + def setup(self) -> Tuple[str,int]: """ setup iperf test: @@ -45,65 +45,53 @@ class IperfTestUtilityEth(IperfUtility.IperfTestUtility): pass self.dut.write('restart') self.dut.expect("Type 'help' to get the list of commands.") - self.dut.expect_any('iperf>', 'esp32>') - self.dut.write('ethernet start') - dut_ip = self.dut.expect(re.compile(r'esp_netif_handlers: .+ ip: (\d+\.\d+\.\d+\.\d+),'))[0] + self.dut.expect('iperf>') + dut_ip = self.dut.expect(r'esp_netif_handlers: .+ ip: (\d+\.\d+\.\d+\.\d+),').group(1) rssi = 0 return dut_ip, rssi -@ttfw_idf.idf_example_test(env_tag='ethernet_router') -def test_ethernet_throughput_basic(env, _): # type: (Any, Any) -> None +@pytest.mark.esp32 +@pytest.mark.ethernet_router +def test_esp_eth_iperf( + dut: Dut, + log_performance: Callable[[str, object], None], + check_performance: Callable[[str, float, str], None], +) -> None: """ steps: | 1. test TCP tx rx and UDP tx rx throughput 2. compare with the pre-defined pass standard """ - pc_iperf_log_file = os.path.join(env.log_path, 'pc_iperf_log.md') - # 1. get DUT - dut = env.get_dut('iperf', 'examples/ethernet/iperf', dut_class=ttfw_idf.ESP32DUT) - dut.start_app() - dut.expect_any('iperf>', 'esp32>') + # 1. wait for DUT + dut.expect_exact('iperf>') # 2. preparing - dut.write('ethernet start') - dut_ip = dut.expect(re.compile(r'esp_netif_handlers: .+ ip: (\d+\.\d+\.\d+\.\d+),'))[0] + pc_iperf_log_file = os.path.join(dut.logdir, 'pc_iperf_log.md') + dut_ip = dut.expect(r'esp_netif_handlers: .+ ip: (\d+\.\d+\.\d+\.\d+),').group(1) pc_nic_ip = get_host_ip4_by_dest_ip(dut_ip) - test_result = { 'tcp_tx': IperfUtility.TestResult('tcp', 'tx', 'ethernet'), 'tcp_rx': IperfUtility.TestResult('tcp', 'rx', 'ethernet'), 'udp_tx': IperfUtility.TestResult('udp', 'tx', 'ethernet'), 'udp_rx': IperfUtility.TestResult('udp', 'rx', 'ethernet'), } - test_utility = IperfTestUtilityEth(dut, 'ethernet', pc_nic_ip, pc_iperf_log_file, test_result) # 3. run test for TCP Tx, Rx and UDP Tx, Rx - test_utility.run_test('tcp', 'tx', 0, NO_BANDWIDTH_LIMIT) test_utility.run_test('tcp', 'rx', 0, NO_BANDWIDTH_LIMIT) test_utility.run_test('udp', 'tx', 0, 80) test_utility.run_test('udp', 'rx', 0, NO_BANDWIDTH_LIMIT) # 4. log performance and compare with pass standard - performance_items = [] for throughput_type in test_result: - ttfw_idf.log_performance('{}_throughput'.format(throughput_type), - '{:.02f} Mbps'.format(test_result[throughput_type].get_best_throughput())) - performance_items.append(['{}_throughput'.format(throughput_type), - '{:.02f} Mbps'.format(test_result[throughput_type].get_best_throughput())]) + log_performance('{}_throughput'.format(throughput_type), + '{:.02f} Mbps'.format(test_result[throughput_type].get_best_throughput())) - # 5. save to report - TinyFW.JunitReport.update_performance(performance_items) # do check after logging, otherwise test will exit immediately if check fail, some performance can't be logged. for throughput_type in test_result: - ttfw_idf.check_performance('{}_throughput'.format(throughput_type + '_eth'), - test_result[throughput_type].get_best_throughput(), dut.TARGET) - - env.close_dut('iperf') - - -if __name__ == '__main__': - test_ethernet_throughput_basic(env_config_file='EnvConfig.yml') + check_performance('{}_throughput'.format(throughput_type + '_eth'), + test_result[throughput_type].get_best_throughput(), + dut.target) diff --git a/examples/wifi/iperf/iperf_test.py b/examples/wifi/iperf/iperf_test.py deleted file mode 100644 index a2a83899b0..0000000000 --- a/examples/wifi/iperf/iperf_test.py +++ /dev/null @@ -1,353 +0,0 @@ -""" -Test case for iperf example. - -This test case might have problem running on windows: - -1. direct use of `make` -2. use `sudo killall iperf` to force kill iperf, didn't implement windows version - -The test env Example_ShieldBox do need the following config:: - - Example_ShieldBox: - ap_list: - - ssid: "ssid" - password: "password" - outlet: 1 - apc_ip: "192.168.1.88" - attenuator_port: "/dev/ttyUSB0" - iperf: "/dev/ttyUSB1" - apc_ip: "192.168.1.88" - pc_nic: "eth0" -""" -import os -import re -import subprocess -import time - -import ttfw_idf -from idf_iperf_test_util import Attenuator, IperfUtility, PowerControl, TestReport -from idf_iperf_test_util.IperfUtility import SCAN_RETRY_COUNT, SCAN_TIMEOUT, TEST_TIME -from tiny_test_fw import DUT, TinyFW, Utility - -# configurations -RETRY_COUNT_FOR_BEST_PERFORMANCE = 2 -ATTEN_VALUE_LIST = range(0, 60, 2) -NO_BANDWIDTH_LIMIT = -1 # iperf send bandwith is not limited - -CONFIG_NAME_PATTERN = re.compile(r'sdkconfig\.ci\.(.+)') - -# We need to auto compare the difference between adjacent configs (01 -> 00, 02 -> 01, ...) and put them to reports. -# Using numbers for config will make this easy. -# Use default value `99` for config with best performance. -BEST_PERFORMANCE_CONFIG = '99' - - -class IperfTestUtilitySoftap(IperfUtility.IperfTestUtility): - """ iperf test implementation """ - def __init__(self, dut, softap_dut, config_name, test_result=None): - IperfUtility.IperfTestUtility.__init__(self, dut, config_name, 'softap', '1234567890', None, None, test_result) - self.softap_dut = softap_dut - self.softap_ip = '192.168.4.1' - - def setup(self): - """ - setup iperf test: - - 1. kill current iperf process - 2. reboot DUT (currently iperf is not very robust, need to reboot DUT) - 3. scan to get AP RSSI - 4. connect to AP - """ - self.softap_dut.write('restart') - self.softap_dut.expect_any('iperf>', 'esp32>', timeout=30) - self.softap_dut.write('ap {} {}'.format(self.ap_ssid, self.ap_password)) - self.dut.write('restart') - self.dut.expect_any('iperf>', 'esp32>', timeout=30) - self.dut.write('scan {}'.format(self.ap_ssid)) - for _ in range(SCAN_RETRY_COUNT): - try: - rssi = int(self.dut.expect(re.compile(r'\[{}]\[rssi=(-\d+)]'.format(self.ap_ssid)), - timeout=SCAN_TIMEOUT)[0]) - break - except DUT.ExpectTimeout: - continue - else: - raise AssertionError('Failed to scan AP') - self.dut.write('sta {} {}'.format(self.ap_ssid, self.ap_password)) - dut_ip = self.dut.expect(re.compile(r'sta ip: ([\d.]+), mask: ([\d.]+), gw: ([\d.]+)'))[0] - return dut_ip, rssi - - def _test_once(self, proto, direction): - """ do measure once for one type """ - # connect and scan to get RSSI - dut_ip, rssi = self.setup() - - assert direction in ['rx', 'tx'] - assert proto in ['tcp', 'udp'] - - # run iperf test - if direction == 'tx': - if proto == 'tcp': - self.softap_dut.write('iperf -s -i 1 -t {}'.format(TEST_TIME)) - # wait until DUT TCP server created - try: - self.softap_dut.expect('iperf tcp server create successfully', timeout=1) - except DUT.ExpectTimeout: - # compatible with old iperf example binary - pass - self.dut.write('iperf -c {} -i 1 -t {}'.format(self.softap_ip, TEST_TIME)) - else: - self.softap_dut.write('iperf -s -u -i 1 -t {}'.format(TEST_TIME)) - self.dut.write('iperf -c {} -u -i 1 -t {}'.format(self.softap_ip, TEST_TIME)) - else: - if proto == 'tcp': - self.dut.write('iperf -s -i 1 -t {}'.format(TEST_TIME)) - # wait until DUT TCP server created - try: - self.dut.expect('iperf tcp server create successfully', timeout=1) - except DUT.ExpectTimeout: - # compatible with old iperf example binary - pass - self.softap_dut.write('iperf -c {} -i 1 -t {}'.format(dut_ip, TEST_TIME)) - else: - self.dut.write('iperf -s -u -i 1 -t {}'.format(TEST_TIME)) - self.softap_dut.write('iperf -c {} -u -i 1 -t {}'.format(dut_ip, TEST_TIME)) - time.sleep(60) - - if direction == 'tx': - server_raw_data = self.dut.read() - else: - server_raw_data = self.softap_dut.read() - self.dut.write('iperf -a') - self.softap_dut.write('iperf -a') - self.dut.write('heap') - heap_size = self.dut.expect(re.compile(r'min heap size: (\d+)\D'))[0] - - # return server raw data (for parsing test results) and RSSI - return server_raw_data, rssi, heap_size - - -@ttfw_idf.idf_example_test(env_tag='Example_ShieldBox_Basic', target=['ESP32', 'ESP32S2', 'ESP32C3', 'ESP32S3'], category='stress') -def test_wifi_throughput_with_different_configs(env, extra_data): - """ - steps: | - 1. build iperf with specified configs - 2. test throughput for all routers - """ - pc_nic_ip = env.get_pc_nic_info('pc_nic', 'ipv4')['addr'] - pc_iperf_log_file = os.path.join(env.log_path, 'pc_iperf_log.md') - ap_info = { - 'ssid': env.get_variable('ap_ssid'), - 'password': env.get_variable('ap_password'), - } - - config_names_raw = subprocess.check_output(['ls', os.path.dirname(os.path.abspath(__file__))]) - config_names = CONFIG_NAME_PATTERN.findall(config_names_raw) - if not config_names: - raise ValueError('no configs found in {}'.format(os.path.dirname(__file__))) - - test_result = dict() - sdkconfig_files = dict() - - for config_name in config_names: - # 1. get the config - sdkconfig_files[config_name] = os.path.join(os.path.dirname(__file__), - 'sdkconfig.ci.{}'.format(config_name)) - - # 2. get DUT and download - dut = env.get_dut('iperf', 'examples/wifi/iperf', app_config_name=config_name) - dut.start_app() - dut.expect_any('iperf>', 'esp32>') - - # 3. run test for each required att value - test_result[config_name] = { - 'tcp_tx': IperfUtility.TestResult('tcp', 'tx', config_name), - 'tcp_rx': IperfUtility.TestResult('tcp', 'rx', config_name), - 'udp_tx': IperfUtility.TestResult('udp', 'tx', config_name), - 'udp_rx': IperfUtility.TestResult('udp', 'rx', config_name), - } - - test_utility = IperfUtility.IperfTestUtility(dut, config_name, ap_info['ssid'], ap_info['password'], pc_nic_ip, - pc_iperf_log_file, test_result[config_name]) - - for _ in range(RETRY_COUNT_FOR_BEST_PERFORMANCE): - test_utility.run_all_cases(0, NO_BANDWIDTH_LIMIT) - - for result_type in test_result[config_name]: - summary = str(test_result[config_name][result_type]) - if summary: - Utility.console_log(summary, color='orange') - - # 4. check test results - env.close_dut('iperf') - - # 5. generate report - report = TestReport.ThroughputForConfigsReport(os.path.join(env.log_path, 'Performance', - 'ThroughputForConfigsReport'), - ap_info['ssid'], test_result, sdkconfig_files) - report.generate_report() - - -@ttfw_idf.idf_example_test(env_tag='Example_ShieldBox', target=['ESP32', 'ESP32S2', 'ESP32C3', 'ESP32S3'], category='stress') -def test_wifi_throughput_vs_rssi(env, extra_data): - """ - steps: | - 1. build with best performance config - 2. switch on one router - 3. set attenuator value from 0-60 for each router - 4. test TCP tx rx and UDP tx rx throughput - """ - att_port = env.get_variable('attenuator_port') - ap_list = env.get_variable('ap_list') - pc_nic_ip = env.get_pc_nic_info('pc_nic', 'ipv4')['addr'] - apc_ip = env.get_variable('apc_ip') - pc_iperf_log_file = os.path.join(env.log_path, 'pc_iperf_log.md') - - test_result = { - 'tcp_tx': IperfUtility.TestResult('tcp', 'tx', BEST_PERFORMANCE_CONFIG), - 'tcp_rx': IperfUtility.TestResult('tcp', 'rx', BEST_PERFORMANCE_CONFIG), - 'udp_tx': IperfUtility.TestResult('udp', 'tx', BEST_PERFORMANCE_CONFIG), - 'udp_rx': IperfUtility.TestResult('udp', 'rx', BEST_PERFORMANCE_CONFIG), - } - - # 1. get DUT and download - dut = env.get_dut('iperf', 'examples/wifi/iperf', app_config_name=BEST_PERFORMANCE_CONFIG) - dut.start_app() - dut.expect_any('iperf>', 'esp32>') - - # 2. run test for each required att value - for ap_info in ap_list: - test_utility = IperfUtility.IperfTestUtility(dut, BEST_PERFORMANCE_CONFIG, ap_info['ssid'], - ap_info['password'], pc_nic_ip, pc_iperf_log_file, test_result) - - PowerControl.Control.control_rest(apc_ip, ap_info['outlet'], 'OFF') - PowerControl.Control.control(apc_ip, {ap_info['outlet']: 'ON'}) - Attenuator.set_att(att_port, 0) - - if not test_utility.wait_ap_power_on(): - Utility.console_log('[{}] failed to power on, skip testing this AP' - .format(ap_info['ssid']), color='red') - continue - - for atten_val in ATTEN_VALUE_LIST: - assert Attenuator.set_att(att_port, atten_val) is True - try: - test_utility.run_all_cases(atten_val, NO_BANDWIDTH_LIMIT) - except AssertionError: - break - - # 3. check test results - env.close_dut('iperf') - - # 4. generate report - report = TestReport.ThroughputVsRssiReport(os.path.join(env.log_path, 'Performance', 'STAThroughputVsRssiReport'), - test_result) - report.generate_report() - - -@ttfw_idf.idf_example_test(env_tag='Example_ShieldBox_Basic', - target=['ESP32', 'ESP32S2', 'ESP32C3', 'ESP32S3'], ci_target=['ESP32']) -def test_wifi_throughput_basic(env, extra_data): - """ - steps: | - 1. test TCP tx rx and UDP tx rx throughput - 2. compare with the pre-defined pass standard - """ - pc_nic_ip = env.get_pc_nic_info('pc_nic', 'ipv4')['addr'] - pc_iperf_log_file = os.path.join(env.log_path, 'pc_iperf_log.md') - ap_info = { - 'ssid': env.get_variable('ap_ssid'), - 'password': env.get_variable('ap_password'), - } - - # 1. get DUT - dut = env.get_dut('iperf', 'examples/wifi/iperf', app_config_name=BEST_PERFORMANCE_CONFIG) - dut.start_app() - dut.expect_any('iperf>', 'esp32>') - - # 2. preparing - test_result = { - 'tcp_tx': IperfUtility.TestResult('tcp', 'tx', BEST_PERFORMANCE_CONFIG), - 'tcp_rx': IperfUtility.TestResult('tcp', 'rx', BEST_PERFORMANCE_CONFIG), - 'udp_tx': IperfUtility.TestResult('udp', 'tx', BEST_PERFORMANCE_CONFIG), - 'udp_rx': IperfUtility.TestResult('udp', 'rx', BEST_PERFORMANCE_CONFIG), - } - - test_utility = IperfUtility.IperfTestUtility(dut, BEST_PERFORMANCE_CONFIG, ap_info['ssid'], ap_info['password'], - pc_nic_ip, pc_iperf_log_file, test_result) - - # 3. run test for TCP Tx, Rx and UDP Tx, Rx - for _ in range(RETRY_COUNT_FOR_BEST_PERFORMANCE): - test_utility.run_all_cases(0, NO_BANDWIDTH_LIMIT) - - # 4. log performance and compare with pass standard - performance_items = [] - for throughput_type in test_result: - ttfw_idf.log_performance('{}_throughput'.format(throughput_type), - '{:.02f} Mbps'.format(test_result[throughput_type].get_best_throughput())) - performance_items.append(['{}_throughput'.format(throughput_type), - '{:.02f} Mbps'.format(test_result[throughput_type].get_best_throughput())]) - - # 5. save to report - TinyFW.JunitReport.update_performance(performance_items) - # do check after logging, otherwise test will exit immediately if check fail, some performance can't be logged. - for throughput_type in test_result: - ttfw_idf.check_performance('{}_throughput'.format(throughput_type), - test_result[throughput_type].get_best_throughput(), dut.TARGET) - - env.close_dut('iperf') - - -@ttfw_idf.idf_example_test(env_tag='Example_ShieldBox2', target=['ESP32', 'ESP32S2', 'ESP32C3', 'ESP32S3'], category='stress') -def test_softap_throughput_vs_rssi(env, extra_data): - """ - steps: | - 1. build with best performance config - 2. switch on one router - 3. set attenuator value from 0-60 for each router - 4. test TCP tx rx and UDP tx rx throughput - """ - att_port = env.get_variable('attenuator_port') - - test_result = { - 'tcp_tx': IperfUtility.TestResult('tcp', 'tx', BEST_PERFORMANCE_CONFIG), - 'tcp_rx': IperfUtility.TestResult('tcp', 'rx', BEST_PERFORMANCE_CONFIG), - 'udp_tx': IperfUtility.TestResult('udp', 'tx', BEST_PERFORMANCE_CONFIG), - 'udp_rx': IperfUtility.TestResult('udp', 'rx', BEST_PERFORMANCE_CONFIG), - } - - # 1. get DUT and download - softap_dut = env.get_dut('softap_iperf', 'examples/wifi/iperf') - softap_dut.start_app() - softap_dut.expect_any('iperf>', 'esp32>') - - sta_dut = env.get_dut('sta_iperf', 'examples/wifi/iperf', app_config_name=BEST_PERFORMANCE_CONFIG) - sta_dut.start_app() - sta_dut.expect_any('iperf>', 'esp32>') - - # 2. run test for each required att value - test_utility = IperfTestUtilitySoftap(sta_dut, softap_dut, BEST_PERFORMANCE_CONFIG, test_result) - - Attenuator.set_att(att_port, 0) - - for atten_val in ATTEN_VALUE_LIST: - assert Attenuator.set_att(att_port, atten_val) is True - try: - test_utility.run_all_cases(atten_val, NO_BANDWIDTH_LIMIT) - except AssertionError: - break - - env.close_dut('softap_iperf') - env.close_dut('sta_iperf') - - # 3. generate report - report = TestReport.ThroughputVsRssiReport(os.path.join(env.log_path, 'Performance', - 'SoftAPThroughputVsRssiReport'),test_result) - report.generate_report() - - -if __name__ == '__main__': - # test_wifi_throughput_basic(env_config_file='EnvConfig.yml') - # test_wifi_throughput_with_different_configs(env_config_file='EnvConfig.yml') - test_wifi_throughput_vs_rssi(env_config_file='EnvConfig.yml', target='ESP32C3') - test_softap_throughput_vs_rssi(env_config_file='EnvConfig.yml') diff --git a/examples/wifi/iperf/pytest_iperf.py b/examples/wifi/iperf/pytest_iperf.py new file mode 100644 index 0000000000..7f609705b9 --- /dev/null +++ b/examples/wifi/iperf/pytest_iperf.py @@ -0,0 +1,418 @@ +# SPDX-FileCopyrightText: 2022-2023 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Unlicense OR CC0-1.0 +""" +Test case for iperf example. + +This test case might have problem running on windows: + +- use `sudo killall iperf` to force kill iperf, didn't implement windows version + +The test env Example_ShieldBox do need the following config:: + + Example_ShieldBox: + ap_list: + - ssid: "ssid" + password: "password" + outlet: 1 + apc_ip: "192.168.1.88" + attenuator_port: "/dev/ttyUSB0" + iperf: "/dev/ttyUSB1" + apc_ip: "192.168.1.88" + pc_nic: "eth0" +""" + +import logging +import os +import re +import time +from typing import Any, Callable, Dict, Generator, Tuple + +import pexpect +import pytest +from common_test_methods import get_env_config_variable, get_host_ip_by_interface +from idf_iperf_test_util import Attenuator, IperfUtility, PowerControl, TestReport +from idf_iperf_test_util.IperfUtility import SCAN_RETRY_COUNT, SCAN_TIMEOUT, TEST_TIME +from pytest_embedded import Dut +from pytest_embedded_idf.dut import IdfDut + +# configurations +RETRY_COUNT_FOR_BEST_PERFORMANCE = 2 +ATTEN_VALUE_LIST = range(0, 60, 2) +NO_BANDWIDTH_LIMIT = -1 # iperf send bandwith is not limited + +# We need to auto compare the difference between adjacent configs (01 -> 00, 02 -> 01, ...) and put them to reports. +# Using numbers for config will make this easy. +# Use default value `99` for config with best performance. +BEST_PERFORMANCE_CONFIG = '99' + + +class IperfTestUtilitySoftap(IperfUtility.IperfTestUtility): + """ iperf test implementation """ + def __init__(self, dut:IdfDut, softap_dut:IdfDut, config_name:str, test_result:Any=None) -> None: + IperfUtility.IperfTestUtility.__init__(self, dut, config_name, 'softap', '1234567890', None, None, test_result) + self.softap_dut = softap_dut + self.softap_ip = '192.168.4.1' + + def setup(self) -> Tuple[str,int]: + """ + setup iperf test: + + 1. kill current iperf process + 2. reboot DUT (currently iperf is not very robust, need to reboot DUT) + 3. scan to get AP RSSI + 4. connect to AP + """ + self.softap_dut.write('restart') + self.softap_dut.expect_exact("Type 'help' to get the list of commands.") + self.softap_dut.expect('iperf>', timeout=30) + self.softap_dut.write('ap {} {}'.format(self.ap_ssid, self.ap_password)) + self.dut.write('restart') + self.dut.expect_exact("Type 'help' to get the list of commands.") + self.dut.expect('iperf>', timeout=30) + self.dut.write('scan {}'.format(self.ap_ssid)) + for _ in range(SCAN_RETRY_COUNT): + try: + rssi = int(self.dut.expect(r'\[{}]\[rssi=(-\d+)]'.format(self.ap_ssid), + timeout=SCAN_TIMEOUT).group(1)) + break + except pexpect.TIMEOUT: + continue + else: + raise AssertionError('Failed to scan AP') + self.dut.write('sta {} {}'.format(self.ap_ssid, self.ap_password)) + dut_ip = self.dut.expect(r'sta ip: ([\d.]+), mask: ([\d.]+), gw: ([\d.]+)').group(1).decode('utf-8') + return dut_ip, rssi + + def _test_once(self, proto:str, direction:str, bw_limit:int) -> Tuple[str, int, int]: + """ do measure once for one type """ + # connect and scan to get RSSI + dut_ip, rssi = self.setup() + + assert direction in ['rx', 'tx'] + assert proto in ['tcp', 'udp'] + + # run iperf test + if direction == 'tx': + if proto == 'tcp': + self.softap_dut.write('iperf -s -i 1 -t {}'.format(TEST_TIME)) + # wait until DUT TCP server created + try: + self.softap_dut.expect('iperf tcp server create successfully', timeout=1) + except pexpect.TIMEOUT: + # compatible with old iperf example binary + pass + if bw_limit > 0: + self.dut.write('iperf -c {} -i 1 -t {} -b {}'.format(self.softap_ip, TEST_TIME, bw_limit)) + else: + self.dut.write('iperf -c {} -i 1 -t {}'.format(self.softap_ip, TEST_TIME)) + else: + self.softap_dut.write('iperf -s -u -i 1 -t {}'.format(TEST_TIME)) + if bw_limit > 0: + self.dut.write('iperf -c {} -u -i 1 -t {} -b {}'.format(self.softap_ip, TEST_TIME, bw_limit)) + else: + self.dut.write('iperf -c {} -u -i 1 -t {}'.format(self.softap_ip, TEST_TIME)) + else: + if proto == 'tcp': + self.dut.write('iperf -s -i 1 -t {}'.format(TEST_TIME)) + # wait until DUT TCP server created + try: + self.dut.expect('iperf tcp server create successfully', timeout=1) + except pexpect.TIMEOUT: + # compatible with old iperf example binary + pass + if bw_limit > 0: + self.softap_dut.write('iperf -c {} -i 1 -t {} -b {}'.format(dut_ip, TEST_TIME, bw_limit)) + else: + self.softap_dut.write('iperf -c {} -i 1 -t {}'.format(dut_ip, TEST_TIME)) + else: + self.dut.write('iperf -s -u -i 1 -t {}'.format(TEST_TIME)) + if bw_limit > 0: + self.softap_dut.write('iperf -c {} -u -i 1 -t {} -b {}'.format(dut_ip, TEST_TIME, bw_limit)) + else: + self.softap_dut.write('iperf -c {} -u -i 1 -t {}'.format(dut_ip, TEST_TIME)) + time.sleep(TEST_TIME + 5) + + if direction == 'tx': + server_raw_data = self.dut.expect(pexpect.TIMEOUT, timeout=0).decode('utf-8') + else: + server_raw_data = self.dut.expect(pexpect.TIMEOUT, timeout=0).decode('utf-8') + self.dut.write('iperf -a') + self.softap_dut.write('iperf -a') + self.dut.write('heap') + heap_size = self.dut.expect(r'min heap size: (\d+)\D').group(1) + + # return server raw data (for parsing test results) and RSSI + return server_raw_data, rssi, heap_size + + +@pytest.fixture(name='generate_report_different_configs', scope='session') +def fixture_generate_report_different_configs( + session_tempdir:str +) -> Generator[Callable[[Dict[str, Any], Dict[str, Any], str], None], None, None]: + _test_result_dict = dict() + _sdkconfig_files_dict = dict() + _ap_info = dict() + + def add_config(ap_info:Dict[str, Any], test_result:Dict[str, Any], config_name:str) -> None: + """ + Collects results for each config and stores it to a dictionary + Args: + ap_info: AP info + test_result: test results for a specific config + config_name: config name + """ + # need to store the SSID to generate the report in the teardown period + # note that the info passed along with the last call of the fixture is used in the teardown period + _ap_info['ssid'] = ap_info['ssid'] + + _test_result_dict[config_name] = test_result + _sdkconfig_files_dict[config_name] = 'sdkconfig.ci.' + config_name + + yield add_config + + # the final report for all config results is generated during fixture's teardown period + report = TestReport.ThroughputForConfigsReport(os.path.join(session_tempdir, 'Performance', + 'ThroughputForConfigsReport'), _ap_info['ssid'], + _test_result_dict, _sdkconfig_files_dict) + report.generate_report() + + +@pytest.mark.esp32 +@pytest.mark.esp32s2 +@pytest.mark.esp32c3 +@pytest.mark.esp32s3 +@pytest.mark.temp_skip_ci(targets=['esp32s2', 'esp32c3', 'esp32s3'], reason='lack of runners (run only for ESP32)') +@pytest.mark.timeout(1200) +@pytest.mark.Example_ShieldBox_Basic +@pytest.mark.parametrize('config', [ + BEST_PERFORMANCE_CONFIG +], indirect=True) +def test_wifi_throughput_basic( + dut: Dut, + log_performance: Callable[[str, str], None], + check_performance: Callable[[str, float, str], None], +) -> None: + """ + steps: | + 1. test TCP tx rx and UDP tx rx throughput + 2. compare with the pre-defined pass standard + """ + # 1. wait for DUT + dut.expect('iperf>') + + # 2. preparing + env_name = 'Example_ShieldBox_Basic' + pc_nic = get_env_config_variable(env_name, 'pc_nic') + pc_nic_ip = get_host_ip_by_interface(pc_nic) + pc_iperf_log_file = os.path.join(dut.logdir, 'pc_iperf_log.md') + ap_info = { + 'ssid': get_env_config_variable(env_name, 'ap_ssid'), + 'password': get_env_config_variable(env_name, 'ap_password'), + } + + test_result = { + 'tcp_tx': IperfUtility.TestResult('tcp', 'tx', BEST_PERFORMANCE_CONFIG), + 'tcp_rx': IperfUtility.TestResult('tcp', 'rx', BEST_PERFORMANCE_CONFIG), + 'udp_tx': IperfUtility.TestResult('udp', 'tx', BEST_PERFORMANCE_CONFIG), + 'udp_rx': IperfUtility.TestResult('udp', 'rx', BEST_PERFORMANCE_CONFIG), + } + + test_utility = IperfUtility.IperfTestUtility(dut, BEST_PERFORMANCE_CONFIG, ap_info['ssid'], ap_info['password'], + pc_nic_ip, pc_iperf_log_file, test_result) + + # 3. run test for TCP Tx, Rx and UDP Tx, Rx + for _ in range(RETRY_COUNT_FOR_BEST_PERFORMANCE): + test_utility.run_all_cases(0, NO_BANDWIDTH_LIMIT) + + # 4. log performance and compare with pass standard + for throughput_type in test_result: + log_performance('{}_throughput'.format(throughput_type), + '{:.02f} Mbps'.format(test_result[throughput_type].get_best_throughput())) + + # do check after logging, otherwise test will exit immediately if check fail, some performance can't be logged. + for throughput_type in test_result: + check_performance('{}_throughput'.format(throughput_type), + test_result[throughput_type].get_best_throughput(), dut.target) + + +@pytest.mark.esp32 +@pytest.mark.esp32s2 +@pytest.mark.esp32c3 +@pytest.mark.esp32s3 +@pytest.mark.temp_skip_ci(targets=['esp32', 'esp32s2', 'esp32c3', 'esp32s3'], reason='local stress test') +@pytest.mark.timeout(1200) +@pytest.mark.Example_ShieldBox_Basic +@pytest.mark.parametrize('config', [ + '00', + '01', + '02', + '03', + '04', + '05', + '06', + '07', + '99' +], indirect=True) +def test_wifi_throughput_with_different_configs( + dut: Dut, + generate_report_different_configs: Callable[[Dict[str, Any], Dict[str, Any], str], None], +) -> None: + """ + steps: | + 1. build iperf with specified configs + 2. test throughput for all routers + """ + # 1. wait for DUT + dut.expect('iperf>') + + # 2. preparing + env_name = 'Example_ShieldBox_Basic' + pc_nic = get_env_config_variable(env_name, 'pc_nic') + pc_nic_ip = get_host_ip_by_interface(pc_nic) + pc_iperf_log_file = os.path.join(dut.logdir, 'pc_iperf_log.md') + ap_info = { + 'ssid': get_env_config_variable(env_name, 'ap_ssid'), + 'password': get_env_config_variable(env_name, 'ap_password'), + } + + found_config = re.search(r'esp32.*\.(\w+)\.', dut.test_case_name) + if found_config is not None: + config_name = found_config.group(1) + else: + raise Exception('config name not found') + + # 3. run test for each required att value + test_result = { + 'tcp_tx': IperfUtility.TestResult('tcp', 'tx', config_name), + 'tcp_rx': IperfUtility.TestResult('tcp', 'rx', config_name), + 'udp_tx': IperfUtility.TestResult('udp', 'tx', config_name), + 'udp_rx': IperfUtility.TestResult('udp', 'rx', config_name), + } + test_utility = IperfUtility.IperfTestUtility(dut, config_name, ap_info['ssid'], ap_info['password'], pc_nic_ip, + pc_iperf_log_file, test_result) + for _ in range(RETRY_COUNT_FOR_BEST_PERFORMANCE): + test_utility.run_all_cases(0, NO_BANDWIDTH_LIMIT) + + for result_type in test_result: + summary = str(test_result[result_type]) + if summary: + logging.info(summary) + + generate_report_different_configs(ap_info, test_result, config_name) + + +@pytest.mark.esp32 +@pytest.mark.esp32s2 +@pytest.mark.esp32c3 +@pytest.mark.esp32s3 +@pytest.mark.temp_skip(targets=['esp32', 'esp32s2', 'esp32c3', 'esp32s3'], reason='lack of runners') +@pytest.mark.timeout(3600) +@pytest.mark.Example_ShieldBox +@pytest.mark.parametrize('config', [ + BEST_PERFORMANCE_CONFIG +], indirect=True) +def test_wifi_throughput_vs_rssi( + dut: Dut, + session_tempdir:str, +) -> None: + """ + steps: | + 1. build with best performance config + 2. switch on one router + 3. set attenuator value from 0-60 for each router + 4. test TCP tx rx and UDP tx rx throughput + """ + # 1. wait for DUT + dut.expect('iperf>') + + # 2. preparing + env_name = 'Example_ShieldBox' + att_port = get_env_config_variable(env_name, 'attenuator_port') + ap_list = get_env_config_variable(env_name, 'ap_list') + pc_nic = get_env_config_variable(env_name, 'pc_nic') + pc_nic_ip = get_host_ip_by_interface(pc_nic) + apc_ip = get_env_config_variable(env_name, 'apc_ip') + pc_iperf_log_file = os.path.join(dut.logdir, 'pc_iperf_log.md') + + test_result = { + 'tcp_tx': IperfUtility.TestResult('tcp', 'tx', BEST_PERFORMANCE_CONFIG), + 'tcp_rx': IperfUtility.TestResult('tcp', 'rx', BEST_PERFORMANCE_CONFIG), + 'udp_tx': IperfUtility.TestResult('udp', 'tx', BEST_PERFORMANCE_CONFIG), + 'udp_rx': IperfUtility.TestResult('udp', 'rx', BEST_PERFORMANCE_CONFIG), + } + + # 3. run test for each required att value + for ap_info in ap_list: + test_utility = IperfUtility.IperfTestUtility(dut, BEST_PERFORMANCE_CONFIG, ap_info['ssid'], + ap_info['password'], pc_nic_ip, pc_iperf_log_file, test_result) + PowerControl.Control.control_rest(apc_ip, ap_info['outlet'], 'OFF') + PowerControl.Control.control(apc_ip, {ap_info['outlet']: 'ON'}) + Attenuator.set_att(att_port, 0) + if not test_utility.wait_ap_power_on(): + logging.error('[{}] failed to power on, skip testing this AP'.format(ap_info['ssid'])) + continue + for atten_val in ATTEN_VALUE_LIST: + assert Attenuator.set_att(att_port, atten_val) is True + try: + test_utility.run_all_cases(atten_val, NO_BANDWIDTH_LIMIT) + except AssertionError: + break + + # 4. generate report + report = TestReport.ThroughputVsRssiReport(os.path.join(session_tempdir, 'Performance', 'STAThroughputVsRssiReport'), + test_result) + report.generate_report() + + +@pytest.mark.esp32 +@pytest.mark.esp32s2 +@pytest.mark.esp32c3 +@pytest.mark.esp32s3 +@pytest.mark.temp_skip(targets=['esp32', 'esp32s2', 'esp32c3', 'esp32s3'], reason='lack of runners') +@pytest.mark.parametrize('count, config', [ + (2, BEST_PERFORMANCE_CONFIG), +], indirect=True) +def test_softap_throughput_vs_rssi( + dut: Tuple[IdfDut, IdfDut], + session_tempdir:str, +) -> None: + """ + steps: | + 1. build with best performance config + 2. switch on one router + 3. set attenuator value from 0-60 for each router + 4. test TCP tx rx and UDP tx rx throughput + """ + # 1. wait for DUTs + softap_dut = dut[0] + sta_dut = dut[1] + softap_dut.expect('iperf>') + sta_dut.expect('iperf>') + + # 2. preparing + env_name = 'Example_ShieldBox2' + att_port = get_env_config_variable(env_name, 'attenuator_port') + + test_result = { + 'tcp_tx': IperfUtility.TestResult('tcp', 'tx', BEST_PERFORMANCE_CONFIG), + 'tcp_rx': IperfUtility.TestResult('tcp', 'rx', BEST_PERFORMANCE_CONFIG), + 'udp_tx': IperfUtility.TestResult('udp', 'tx', BEST_PERFORMANCE_CONFIG), + 'udp_rx': IperfUtility.TestResult('udp', 'rx', BEST_PERFORMANCE_CONFIG), + } + + # 3. run test for each required att value + test_utility = IperfTestUtilitySoftap(sta_dut, softap_dut, BEST_PERFORMANCE_CONFIG, test_result) + + Attenuator.set_att(att_port, 0) + + for atten_val in ATTEN_VALUE_LIST: + assert Attenuator.set_att(att_port, atten_val) is True + try: + test_utility.run_all_cases(atten_val, NO_BANDWIDTH_LIMIT) + except AssertionError: + break + + # 4. generate report + report = TestReport.ThroughputVsRssiReport(os.path.join(session_tempdir, 'Performance', + 'SoftAPThroughputVsRssiReport'),test_result) + report.generate_report() diff --git a/tools/ci/python_packages/idf_iperf_test_util/IperfUtility.py b/tools/ci/python_packages/idf_iperf_test_util/IperfUtility.py index e350c9c57c..797d8f54f4 100644 --- a/tools/ci/python_packages/idf_iperf_test_util/IperfUtility.py +++ b/tools/ci/python_packages/idf_iperf_test_util/IperfUtility.py @@ -1,12 +1,14 @@ -# SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD +# SPDX-FileCopyrightText: 2022-2023 Espressif Systems (Shanghai) CO LTD # SPDX-License-Identifier: Apache-2.0 +import logging import os import re import subprocess import time +import pexpect from idf_iperf_test_util import LineChart -from tiny_test_fw import DUT, Utility +from pytest_embedded import Dut try: from typing import Any, Tuple @@ -45,7 +47,7 @@ class TestResult(object): RSSI_RANGE = [-x for x in range(10, 100)] ATT_RANGE = [x for x in range(0, 64)] - def __init__(self, proto, direction, config_name): # type: (str, str, str) -> None + def __init__(self, proto:str, direction:str, config_name:str) -> None: self.proto = proto self.direction = direction self.config_name = config_name @@ -55,7 +57,7 @@ class TestResult(object): self.heap_size = INVALID_HEAP_SIZE self.error_list = [] # type: list[str] - def _save_result(self, throughput, ap_ssid, att, rssi, heap_size): # type: (float, str, int, int, str) -> None + def _save_result(self, throughput:float, ap_ssid:str, att:int, rssi:int, heap_size:str) -> None: """ save the test results: @@ -70,7 +72,7 @@ class TestResult(object): self.att_rssi_map[ap_ssid][att] = rssi - def record_throughput(database, key_value): # type: (dict, int) -> None + def record_throughput(database:dict, key_value:int) -> None: try: # we save the larger value for same att if throughput > database[ap_ssid][key_value]: @@ -84,7 +86,7 @@ class TestResult(object): if int(heap_size) < self.heap_size: self.heap_size = int(heap_size) - def add_result(self, raw_data, ap_ssid, att, rssi, heap_size): # type: (str, str, int, int, str) -> float + def add_result(self, raw_data:str, ap_ssid:str, att:int, rssi:int, heap_size:str) -> float: """ add result for one test @@ -132,14 +134,14 @@ class TestResult(object): return max_throughput - def post_analysis(self): # type: () -> None + def post_analysis(self) -> None: """ some rules need to be checked after we collected all test raw data: 1. throughput value 30% worse than the next point with lower RSSI 2. throughput value 30% worse than the next point with larger attenuate """ - def analysis_bad_point(data, index_type): # type: (dict, str) -> None + def analysis_bad_point(data:dict, index_type:str) -> None: for ap_ssid in data: result_dict = data[ap_ssid] index_list = list(result_dict.keys()) @@ -160,7 +162,7 @@ class TestResult(object): analysis_bad_point(self.throughput_by_rssi, 'rssi') analysis_bad_point(self.throughput_by_att, 'att') - def draw_throughput_figure(self, path, ap_ssid, draw_type): # type: (str, str, str) -> str + def draw_throughput_figure(self, path:str, ap_ssid:str, draw_type:str) -> str: """ :param path: folder to save figure. make sure the folder is already created. :param ap_ssid: ap ssid string or a list of ap ssid string @@ -189,7 +191,7 @@ class TestResult(object): data, range_list) return file_name - def draw_rssi_vs_att_figure(self, path, ap_ssid): # type: (str, str) -> str + def draw_rssi_vs_att_figure(self, path:str, ap_ssid:str) -> str: """ :param path: folder to save figure. make sure the folder is already created. :param ap_ssid: ap to use @@ -207,13 +209,13 @@ class TestResult(object): self.ATT_RANGE) return file_name - def get_best_throughput(self): # type: () -> Any + def get_best_throughput(self) -> Any: """ get the best throughput during test """ best_for_aps = [max(self.throughput_by_att[ap_ssid].values()) for ap_ssid in self.throughput_by_att] return max(best_for_aps) - def __str__(self): # type: () -> str + def __str__(self) -> str: """ returns summary for this test: @@ -237,8 +239,8 @@ class TestResult(object): class IperfTestUtility(object): """ iperf test implementation """ - def __init__(self, dut, config_name, ap_ssid, ap_password, - pc_nic_ip, pc_iperf_log_file, test_result=None): # type: (str, str, str, str, str, str, Any) -> None + def __init__(self, dut:Dut, config_name:str, ap_ssid:str, ap_password:str, + pc_nic_ip:str, pc_iperf_log_file:str, test_result:Any=None) -> None: self.config_name = config_name self.dut = dut @@ -259,7 +261,7 @@ class IperfTestUtility(object): 'udp_rx': TestResult('udp', 'rx', config_name), } - def setup(self): # type: (Any) -> Tuple[str,int] + def setup(self) -> Tuple[str,int]: """ setup iperf test: @@ -274,25 +276,26 @@ class IperfTestUtility(object): pass time.sleep(5) self.dut.write('restart') - self.dut.expect_any('iperf>', 'esp32>') + self.dut.expect_exact("Type 'help' to get the list of commands.") + self.dut.expect('iperf>') self.dut.write('scan {}'.format(self.ap_ssid)) for _ in range(SCAN_RETRY_COUNT): try: - rssi = int(self.dut.expect(re.compile(r'\[{}]\[rssi=(-\d+)]'.format(self.ap_ssid)), - timeout=SCAN_TIMEOUT)[0]) + rssi = int(self.dut.expect(r'\[{}]\[rssi=(-\d+)]'.format(self.ap_ssid), + timeout=SCAN_TIMEOUT).group(1)) break - except DUT.ExpectTimeout: + except pexpect.TIMEOUT: continue else: raise AssertionError('Failed to scan AP') self.dut.write('sta {} {}'.format(self.ap_ssid, self.ap_password)) - dut_ip = self.dut.expect(re.compile(r'sta ip: ([\d.]+), mask: ([\d.]+), gw: ([\d.]+)'))[0] + dut_ip = self.dut.expect(r'sta ip: ([\d.]+), mask: ([\d.]+), gw: ([\d.]+)').group(1) return dut_ip, rssi - def _save_test_result(self, test_case, raw_data, att, rssi, heap_size): # type: (str, str, int, int, int) -> Any + def _save_test_result(self, test_case:str, raw_data:str, att:int, rssi:int, heap_size:int) -> Any: return self.test_result[test_case].add_result(raw_data, self.ap_ssid, att, rssi, heap_size) - def _test_once(self, proto, direction, bw_limit): # type: (Any, str, str, int) -> Tuple[str, int, int] + def _test_once(self, proto:str, direction:str, bw_limit:int) -> Tuple[str, int, int]: """ do measure once for one type """ # connect and scan to get RSSI dut_ip, rssi = self.setup() @@ -336,9 +339,9 @@ class IperfTestUtility(object): # wait until DUT TCP server created try: self.dut.expect('iperf: Socket created', timeout=5) - except DUT.ExpectTimeout: + except pexpect.TIMEOUT: # compatible with old iperf example binary - Utility.console_log('create iperf tcp server fail') + logging.info('create iperf tcp server fail') if bw_limit > 0: process = subprocess.Popen(['iperf', '-c', dut_ip, '-b', str(bw_limit) + 'm', '-t', str(TEST_TIME), '-f', 'm'], stdout=f, stderr=f) @@ -356,9 +359,9 @@ class IperfTestUtility(object): # wait until DUT TCP server created try: self.dut.expect('iperf: Socket bound', timeout=5) - except DUT.ExpectTimeout: + except pexpect.TIMEOUT: # compatible with old iperf example binary - Utility.console_log('create iperf udp server fail') + logging.info('create iperf udp server fail') if bw_limit > 0: process = subprocess.Popen(['iperf', '-c', dut_ip, '-u', '-b', str(bw_limit) + 'm', '-t', str(TEST_TIME), '-f', 'm'], stdout=f, stderr=f) @@ -379,10 +382,13 @@ class IperfTestUtility(object): else: process.terminate() - server_raw_data = self.dut.read() + server_raw_data = self.dut.expect(pexpect.TIMEOUT, timeout=0).decode('utf-8') with open(PC_IPERF_TEMP_LOG_FILE, 'r') as f: pc_raw_data = f.read() + if os.path.exists(PC_IPERF_TEMP_LOG_FILE): + os.remove(PC_IPERF_TEMP_LOG_FILE) + # save PC iperf logs to console with open(self.pc_iperf_log_file, 'a+') as f: f.write('## [{}] `{}`\r\n##### {}' @@ -391,18 +397,19 @@ class IperfTestUtility(object): time.strftime('%m-%d %H:%M:%S', time.localtime(time.time())))) f.write('\r\n```\r\n\r\n' + pc_raw_data + '\r\n```\r\n') self.dut.write('heap') - heap_size = self.dut.expect(re.compile(r'min heap size: (\d+)\D'))[0] + heap_size = self.dut.expect(r'min heap size: (\d+)\D').group(1) # return server raw data (for parsing test results) and RSSI return server_raw_data, rssi, heap_size - def run_test(self, proto, direction, atten_val, bw_limit): # type: (str, str, int, int) -> None + def run_test(self, proto:str, direction:str, atten_val:int, bw_limit:int) -> None: """ run test for one type, with specified atten_value and save the test result :param proto: tcp or udp :param direction: tx or rx :param atten_val: attenuate value + :param bw_limit: bandwidth limit """ rssi = FAILED_TO_SCAN_RSSI heap_size = INVALID_HEAP_SIZE @@ -411,32 +418,33 @@ class IperfTestUtility(object): throughput = self._save_test_result('{}_{}'.format(proto, direction), server_raw_data, atten_val, rssi, heap_size) - Utility.console_log('[{}][{}_{}][{}][{}]: {:.02f}' - .format(self.config_name, proto, direction, rssi, self.ap_ssid, throughput)) + logging.info('[{}][{}_{}][{}][{}]: {:.02f}' + .format(self.config_name, proto, direction, rssi, self.ap_ssid, throughput)) self.lowest_rssi_scanned = min(self.lowest_rssi_scanned, rssi) except (ValueError, IndexError): self._save_test_result('{}_{}'.format(proto, direction), '', atten_val, rssi, heap_size) - Utility.console_log('Fail to get throughput results.') + logging.info('Fail to get throughput results.') except AssertionError: self.fail_to_scan += 1 - Utility.console_log('Fail to scan AP.') + logging.info('Fail to scan AP.') - def run_all_cases(self, atten_val, bw_limit): # type: (int, int) -> None + def run_all_cases(self, atten_val:int, bw_limit:int) -> None: """ run test for all types (udp_tx, udp_rx, tcp_tx, tcp_rx). :param atten_val: attenuate value + :param bw_limit: bandwidth limit """ self.run_test('tcp', 'tx', atten_val, bw_limit) self.run_test('tcp', 'rx', atten_val, bw_limit) self.run_test('udp', 'tx', atten_val, bw_limit) self.run_test('udp', 'rx', atten_val, bw_limit) if self.fail_to_scan > 10: - Utility.console_log( + logging.info( 'Fail to scan AP for more than 10 times. Lowest RSSI scanned is {}'.format(self.lowest_rssi_scanned)) raise AssertionError - def wait_ap_power_on(self): # type: (Any) -> bool + def wait_ap_power_on(self) -> bool: """ AP need to take sometime to power on. It changes for different APs. This method will scan to check if the AP powers on. @@ -444,15 +452,15 @@ class IperfTestUtility(object): :return: True or False """ self.dut.write('restart') - self.dut.expect_any('iperf>', 'esp32>') + self.dut.expect('iperf>') for _ in range(WAIT_AP_POWER_ON_TIMEOUT // SCAN_TIMEOUT): try: self.dut.write('scan {}'.format(self.ap_ssid)) - self.dut.expect(re.compile(r'\[{}]\[rssi=(-\d+)]'.format(self.ap_ssid)), + self.dut.expect(r'\[{}]\[rssi=(-\d+)]'.format(self.ap_ssid), timeout=SCAN_TIMEOUT) ret = True break - except DUT.ExpectTimeout: + except pexpect.TIMEOUT: pass else: ret = False diff --git a/tools/requirements/requirements.pytest.txt b/tools/requirements/requirements.pytest.txt index 63884b495a..9fd9a6097e 100644 --- a/tools/requirements/requirements.pytest.txt +++ b/tools/requirements/requirements.pytest.txt @@ -20,5 +20,8 @@ dbus-python; sys_platform == 'linux' protobuf paho-mqtt +# iperf_test_util +pyecharts + # for twai tests, communicate with socket can device (e.g. Canable) python-can diff --git a/tools/requirements/requirements.ttfw.txt b/tools/requirements/requirements.ttfw.txt index 7b226aced6..e3e213c27a 100644 --- a/tools/requirements/requirements.ttfw.txt +++ b/tools/requirements/requirements.ttfw.txt @@ -20,9 +20,6 @@ future dbus-python; sys_platform == 'linux' pygobject; sys_platform != 'win32' -# iperf_test_util -pyecharts - # esp_prov bleak # future # addressed before under ble