Ethernet & WiFi iperf test migration to pytest

This commit is contained in:
Ondrej Kosta 2022-12-20 15:38:48 +01:00
parent 3df87a91a3
commit 7aa4462d0b
8 changed files with 512 additions and 456 deletions

View File

@ -244,6 +244,13 @@ example_test_pytest_esp32s3_wifi_router:
needs:
- build_pytest_examples_esp32s3
tags: [ esp32s3, wifi_router ]
example_test_pytest_esp32_wifi_iperf:
extends:
- .pytest_examples_dir_template
- .rules:test:example_test-esp32-wifi
needs:
- build_pytest_examples_esp32
tags: [ esp32, Example_ShieldBox_Basic ]
example_test_pytest_esp32_wifi_wlan:
extends:
@ -253,6 +260,14 @@ example_test_pytest_esp32_wifi_wlan:
- build_pytest_examples_esp32
tags: [ esp32, wifi_wlan ]
example_test_pytest_esp32_ethernet_router:
extends:
- .pytest_examples_dir_template
- .rules:test:example_test-esp32-ethernet
needs:
- build_pytest_examples_esp32
tags: [ esp32, ethernet_router ]
example_test_pytest_esp32_ethernet_ip101:
extends:
- .pytest_examples_dir_template
@ -988,22 +1003,6 @@ example_test_001C:
- ESP32
- Example_GENERIC
example_test_002:
extends:
- .example_test_esp32_template
- .rules:test:example_test-esp32-wifi
tags:
- ESP32
- Example_ShieldBox_Basic
example_test_ethernet_router:
extends:
- .example_test_esp32_template
- .rules:test:example_test-esp32-ethernet
tags:
- ESP32
- ethernet_router
.example_test_003:
extends: .example_test_esp32_template
tags:
@ -1107,12 +1106,6 @@ example_test_C6_GENERIC:
- .test_app_template
- .rules:test:custom_test-esp32s3
test_app_test_eth:
extends: .test_app_esp32_template
tags:
- ESP32
- ethernet_router
.unit_test_template:
extends: .target_test_job_template
needs: # the assign already needs all the build jobs

View File

@ -102,6 +102,8 @@ ENV_MARKERS = {
'wifi_router': 'both the runner and dut connect to the same wifi router',
'wifi_high_traffic': 'wifi high traffic runners',
'wifi_wlan': 'wifi runner with a wireless NIC',
'Example_ShieldBox_Basic': 'basic configuration of the AP and ESP DUT placed in shielded box',
'Example_ShieldBox': 'multiple shielded APs connected to shielded ESP DUT via RF cable with programmable attenuator',
'xtal_26mhz': 'runner with 26MHz xtal on board',
'xtal_40mhz': 'runner with 40MHz xtal on board',
'external_flash': 'external flash memory connected via VSPI (FSPI)',

View File

@ -1,25 +1,25 @@
# SPDX-FileCopyrightText: 2022-2023 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Unlicense OR CC0-1.0
"""
Test case for iperf example.
This test case might have problem running on windows:
This test case might have problem running on Windows:
1. direct use of `make`
2. use `sudo killall iperf` to force kill iperf, didn't implement windows version
- use `sudo killall iperf` to force kill iperf, didn't implement windows version
"""
from __future__ import division, unicode_literals
import os
import re
import subprocess
import ttfw_idf
import pytest
from common_test_methods import get_host_ip4_by_dest_ip
from idf_iperf_test_util import IperfUtility
from tiny_test_fw import TinyFW
from pytest_embedded import Dut
try:
from typing import Any, Tuple
from typing import Any, Callable, Tuple
except ImportError:
# Only used for type annotations
pass
@ -29,10 +29,10 @@ NO_BANDWIDTH_LIMIT = -1 # iperf send bandwidth is not limited
class IperfTestUtilityEth(IperfUtility.IperfTestUtility):
""" iperf test implementation """
def __init__(self, dut, config_name, pc_nic_ip, pc_iperf_log_file, test_result=None): # type: (str, str, str,str, Any) -> None
def __init__(self, dut: str, config_name: str, pc_nic_ip: str, pc_iperf_log_file: str, test_result:Any=None) -> None:
IperfUtility.IperfTestUtility.__init__(self, dut, config_name, 'None', 'None', pc_nic_ip, pc_iperf_log_file, test_result)
def setup(self): # type: () -> Tuple[str,int]
def setup(self) -> Tuple[str,int]:
"""
setup iperf test:
@ -45,65 +45,53 @@ class IperfTestUtilityEth(IperfUtility.IperfTestUtility):
pass
self.dut.write('restart')
self.dut.expect("Type 'help' to get the list of commands.")
self.dut.expect_any('iperf>', 'esp32>')
self.dut.write('ethernet start')
dut_ip = self.dut.expect(re.compile(r'esp_netif_handlers: .+ ip: (\d+\.\d+\.\d+\.\d+),'))[0]
self.dut.expect('iperf>')
dut_ip = self.dut.expect(r'esp_netif_handlers: .+ ip: (\d+\.\d+\.\d+\.\d+),').group(1)
rssi = 0
return dut_ip, rssi
@ttfw_idf.idf_example_test(env_tag='ethernet_router')
def test_ethernet_throughput_basic(env, _): # type: (Any, Any) -> None
@pytest.mark.esp32
@pytest.mark.ethernet_router
def test_esp_eth_iperf(
dut: Dut,
log_performance: Callable[[str, object], None],
check_performance: Callable[[str, float, str], None],
) -> None:
"""
steps: |
1. test TCP tx rx and UDP tx rx throughput
2. compare with the pre-defined pass standard
"""
pc_iperf_log_file = os.path.join(env.log_path, 'pc_iperf_log.md')
# 1. get DUT
dut = env.get_dut('iperf', 'examples/ethernet/iperf', dut_class=ttfw_idf.ESP32DUT)
dut.start_app()
dut.expect_any('iperf>', 'esp32>')
# 1. wait for DUT
dut.expect_exact('iperf>')
# 2. preparing
dut.write('ethernet start')
dut_ip = dut.expect(re.compile(r'esp_netif_handlers: .+ ip: (\d+\.\d+\.\d+\.\d+),'))[0]
pc_iperf_log_file = os.path.join(dut.logdir, 'pc_iperf_log.md')
dut_ip = dut.expect(r'esp_netif_handlers: .+ ip: (\d+\.\d+\.\d+\.\d+),').group(1)
pc_nic_ip = get_host_ip4_by_dest_ip(dut_ip)
test_result = {
'tcp_tx': IperfUtility.TestResult('tcp', 'tx', 'ethernet'),
'tcp_rx': IperfUtility.TestResult('tcp', 'rx', 'ethernet'),
'udp_tx': IperfUtility.TestResult('udp', 'tx', 'ethernet'),
'udp_rx': IperfUtility.TestResult('udp', 'rx', 'ethernet'),
}
test_utility = IperfTestUtilityEth(dut, 'ethernet', pc_nic_ip, pc_iperf_log_file, test_result)
# 3. run test for TCP Tx, Rx and UDP Tx, Rx
test_utility.run_test('tcp', 'tx', 0, NO_BANDWIDTH_LIMIT)
test_utility.run_test('tcp', 'rx', 0, NO_BANDWIDTH_LIMIT)
test_utility.run_test('udp', 'tx', 0, 80)
test_utility.run_test('udp', 'rx', 0, NO_BANDWIDTH_LIMIT)
# 4. log performance and compare with pass standard
performance_items = []
for throughput_type in test_result:
ttfw_idf.log_performance('{}_throughput'.format(throughput_type),
'{:.02f} Mbps'.format(test_result[throughput_type].get_best_throughput()))
performance_items.append(['{}_throughput'.format(throughput_type),
'{:.02f} Mbps'.format(test_result[throughput_type].get_best_throughput())])
log_performance('{}_throughput'.format(throughput_type),
'{:.02f} Mbps'.format(test_result[throughput_type].get_best_throughput()))
# 5. save to report
TinyFW.JunitReport.update_performance(performance_items)
# do check after logging, otherwise test will exit immediately if check fail, some performance can't be logged.
for throughput_type in test_result:
ttfw_idf.check_performance('{}_throughput'.format(throughput_type + '_eth'),
test_result[throughput_type].get_best_throughput(), dut.TARGET)
env.close_dut('iperf')
if __name__ == '__main__':
test_ethernet_throughput_basic(env_config_file='EnvConfig.yml')
check_performance('{}_throughput'.format(throughput_type + '_eth'),
test_result[throughput_type].get_best_throughput(),
dut.target)

View File

@ -1,353 +0,0 @@
"""
Test case for iperf example.
This test case might have problem running on windows:
1. direct use of `make`
2. use `sudo killall iperf` to force kill iperf, didn't implement windows version
The test env Example_ShieldBox do need the following config::
Example_ShieldBox:
ap_list:
- ssid: "ssid"
password: "password"
outlet: 1
apc_ip: "192.168.1.88"
attenuator_port: "/dev/ttyUSB0"
iperf: "/dev/ttyUSB1"
apc_ip: "192.168.1.88"
pc_nic: "eth0"
"""
import os
import re
import subprocess
import time
import ttfw_idf
from idf_iperf_test_util import Attenuator, IperfUtility, PowerControl, TestReport
from idf_iperf_test_util.IperfUtility import SCAN_RETRY_COUNT, SCAN_TIMEOUT, TEST_TIME
from tiny_test_fw import DUT, TinyFW, Utility
# configurations
RETRY_COUNT_FOR_BEST_PERFORMANCE = 2
ATTEN_VALUE_LIST = range(0, 60, 2)
NO_BANDWIDTH_LIMIT = -1 # iperf send bandwith is not limited
CONFIG_NAME_PATTERN = re.compile(r'sdkconfig\.ci\.(.+)')
# We need to auto compare the difference between adjacent configs (01 -> 00, 02 -> 01, ...) and put them to reports.
# Using numbers for config will make this easy.
# Use default value `99` for config with best performance.
BEST_PERFORMANCE_CONFIG = '99'
class IperfTestUtilitySoftap(IperfUtility.IperfTestUtility):
""" iperf test implementation """
def __init__(self, dut, softap_dut, config_name, test_result=None):
IperfUtility.IperfTestUtility.__init__(self, dut, config_name, 'softap', '1234567890', None, None, test_result)
self.softap_dut = softap_dut
self.softap_ip = '192.168.4.1'
def setup(self):
"""
setup iperf test:
1. kill current iperf process
2. reboot DUT (currently iperf is not very robust, need to reboot DUT)
3. scan to get AP RSSI
4. connect to AP
"""
self.softap_dut.write('restart')
self.softap_dut.expect_any('iperf>', 'esp32>', timeout=30)
self.softap_dut.write('ap {} {}'.format(self.ap_ssid, self.ap_password))
self.dut.write('restart')
self.dut.expect_any('iperf>', 'esp32>', timeout=30)
self.dut.write('scan {}'.format(self.ap_ssid))
for _ in range(SCAN_RETRY_COUNT):
try:
rssi = int(self.dut.expect(re.compile(r'\[{}]\[rssi=(-\d+)]'.format(self.ap_ssid)),
timeout=SCAN_TIMEOUT)[0])
break
except DUT.ExpectTimeout:
continue
else:
raise AssertionError('Failed to scan AP')
self.dut.write('sta {} {}'.format(self.ap_ssid, self.ap_password))
dut_ip = self.dut.expect(re.compile(r'sta ip: ([\d.]+), mask: ([\d.]+), gw: ([\d.]+)'))[0]
return dut_ip, rssi
def _test_once(self, proto, direction):
""" do measure once for one type """
# connect and scan to get RSSI
dut_ip, rssi = self.setup()
assert direction in ['rx', 'tx']
assert proto in ['tcp', 'udp']
# run iperf test
if direction == 'tx':
if proto == 'tcp':
self.softap_dut.write('iperf -s -i 1 -t {}'.format(TEST_TIME))
# wait until DUT TCP server created
try:
self.softap_dut.expect('iperf tcp server create successfully', timeout=1)
except DUT.ExpectTimeout:
# compatible with old iperf example binary
pass
self.dut.write('iperf -c {} -i 1 -t {}'.format(self.softap_ip, TEST_TIME))
else:
self.softap_dut.write('iperf -s -u -i 1 -t {}'.format(TEST_TIME))
self.dut.write('iperf -c {} -u -i 1 -t {}'.format(self.softap_ip, TEST_TIME))
else:
if proto == 'tcp':
self.dut.write('iperf -s -i 1 -t {}'.format(TEST_TIME))
# wait until DUT TCP server created
try:
self.dut.expect('iperf tcp server create successfully', timeout=1)
except DUT.ExpectTimeout:
# compatible with old iperf example binary
pass
self.softap_dut.write('iperf -c {} -i 1 -t {}'.format(dut_ip, TEST_TIME))
else:
self.dut.write('iperf -s -u -i 1 -t {}'.format(TEST_TIME))
self.softap_dut.write('iperf -c {} -u -i 1 -t {}'.format(dut_ip, TEST_TIME))
time.sleep(60)
if direction == 'tx':
server_raw_data = self.dut.read()
else:
server_raw_data = self.softap_dut.read()
self.dut.write('iperf -a')
self.softap_dut.write('iperf -a')
self.dut.write('heap')
heap_size = self.dut.expect(re.compile(r'min heap size: (\d+)\D'))[0]
# return server raw data (for parsing test results) and RSSI
return server_raw_data, rssi, heap_size
@ttfw_idf.idf_example_test(env_tag='Example_ShieldBox_Basic', target=['ESP32', 'ESP32S2', 'ESP32C3', 'ESP32S3'], category='stress')
def test_wifi_throughput_with_different_configs(env, extra_data):
"""
steps: |
1. build iperf with specified configs
2. test throughput for all routers
"""
pc_nic_ip = env.get_pc_nic_info('pc_nic', 'ipv4')['addr']
pc_iperf_log_file = os.path.join(env.log_path, 'pc_iperf_log.md')
ap_info = {
'ssid': env.get_variable('ap_ssid'),
'password': env.get_variable('ap_password'),
}
config_names_raw = subprocess.check_output(['ls', os.path.dirname(os.path.abspath(__file__))])
config_names = CONFIG_NAME_PATTERN.findall(config_names_raw)
if not config_names:
raise ValueError('no configs found in {}'.format(os.path.dirname(__file__)))
test_result = dict()
sdkconfig_files = dict()
for config_name in config_names:
# 1. get the config
sdkconfig_files[config_name] = os.path.join(os.path.dirname(__file__),
'sdkconfig.ci.{}'.format(config_name))
# 2. get DUT and download
dut = env.get_dut('iperf', 'examples/wifi/iperf', app_config_name=config_name)
dut.start_app()
dut.expect_any('iperf>', 'esp32>')
# 3. run test for each required att value
test_result[config_name] = {
'tcp_tx': IperfUtility.TestResult('tcp', 'tx', config_name),
'tcp_rx': IperfUtility.TestResult('tcp', 'rx', config_name),
'udp_tx': IperfUtility.TestResult('udp', 'tx', config_name),
'udp_rx': IperfUtility.TestResult('udp', 'rx', config_name),
}
test_utility = IperfUtility.IperfTestUtility(dut, config_name, ap_info['ssid'], ap_info['password'], pc_nic_ip,
pc_iperf_log_file, test_result[config_name])
for _ in range(RETRY_COUNT_FOR_BEST_PERFORMANCE):
test_utility.run_all_cases(0, NO_BANDWIDTH_LIMIT)
for result_type in test_result[config_name]:
summary = str(test_result[config_name][result_type])
if summary:
Utility.console_log(summary, color='orange')
# 4. check test results
env.close_dut('iperf')
# 5. generate report
report = TestReport.ThroughputForConfigsReport(os.path.join(env.log_path, 'Performance',
'ThroughputForConfigsReport'),
ap_info['ssid'], test_result, sdkconfig_files)
report.generate_report()
@ttfw_idf.idf_example_test(env_tag='Example_ShieldBox', target=['ESP32', 'ESP32S2', 'ESP32C3', 'ESP32S3'], category='stress')
def test_wifi_throughput_vs_rssi(env, extra_data):
"""
steps: |
1. build with best performance config
2. switch on one router
3. set attenuator value from 0-60 for each router
4. test TCP tx rx and UDP tx rx throughput
"""
att_port = env.get_variable('attenuator_port')
ap_list = env.get_variable('ap_list')
pc_nic_ip = env.get_pc_nic_info('pc_nic', 'ipv4')['addr']
apc_ip = env.get_variable('apc_ip')
pc_iperf_log_file = os.path.join(env.log_path, 'pc_iperf_log.md')
test_result = {
'tcp_tx': IperfUtility.TestResult('tcp', 'tx', BEST_PERFORMANCE_CONFIG),
'tcp_rx': IperfUtility.TestResult('tcp', 'rx', BEST_PERFORMANCE_CONFIG),
'udp_tx': IperfUtility.TestResult('udp', 'tx', BEST_PERFORMANCE_CONFIG),
'udp_rx': IperfUtility.TestResult('udp', 'rx', BEST_PERFORMANCE_CONFIG),
}
# 1. get DUT and download
dut = env.get_dut('iperf', 'examples/wifi/iperf', app_config_name=BEST_PERFORMANCE_CONFIG)
dut.start_app()
dut.expect_any('iperf>', 'esp32>')
# 2. run test for each required att value
for ap_info in ap_list:
test_utility = IperfUtility.IperfTestUtility(dut, BEST_PERFORMANCE_CONFIG, ap_info['ssid'],
ap_info['password'], pc_nic_ip, pc_iperf_log_file, test_result)
PowerControl.Control.control_rest(apc_ip, ap_info['outlet'], 'OFF')
PowerControl.Control.control(apc_ip, {ap_info['outlet']: 'ON'})
Attenuator.set_att(att_port, 0)
if not test_utility.wait_ap_power_on():
Utility.console_log('[{}] failed to power on, skip testing this AP'
.format(ap_info['ssid']), color='red')
continue
for atten_val in ATTEN_VALUE_LIST:
assert Attenuator.set_att(att_port, atten_val) is True
try:
test_utility.run_all_cases(atten_val, NO_BANDWIDTH_LIMIT)
except AssertionError:
break
# 3. check test results
env.close_dut('iperf')
# 4. generate report
report = TestReport.ThroughputVsRssiReport(os.path.join(env.log_path, 'Performance', 'STAThroughputVsRssiReport'),
test_result)
report.generate_report()
@ttfw_idf.idf_example_test(env_tag='Example_ShieldBox_Basic',
target=['ESP32', 'ESP32S2', 'ESP32C3', 'ESP32S3'], ci_target=['ESP32'])
def test_wifi_throughput_basic(env, extra_data):
"""
steps: |
1. test TCP tx rx and UDP tx rx throughput
2. compare with the pre-defined pass standard
"""
pc_nic_ip = env.get_pc_nic_info('pc_nic', 'ipv4')['addr']
pc_iperf_log_file = os.path.join(env.log_path, 'pc_iperf_log.md')
ap_info = {
'ssid': env.get_variable('ap_ssid'),
'password': env.get_variable('ap_password'),
}
# 1. get DUT
dut = env.get_dut('iperf', 'examples/wifi/iperf', app_config_name=BEST_PERFORMANCE_CONFIG)
dut.start_app()
dut.expect_any('iperf>', 'esp32>')
# 2. preparing
test_result = {
'tcp_tx': IperfUtility.TestResult('tcp', 'tx', BEST_PERFORMANCE_CONFIG),
'tcp_rx': IperfUtility.TestResult('tcp', 'rx', BEST_PERFORMANCE_CONFIG),
'udp_tx': IperfUtility.TestResult('udp', 'tx', BEST_PERFORMANCE_CONFIG),
'udp_rx': IperfUtility.TestResult('udp', 'rx', BEST_PERFORMANCE_CONFIG),
}
test_utility = IperfUtility.IperfTestUtility(dut, BEST_PERFORMANCE_CONFIG, ap_info['ssid'], ap_info['password'],
pc_nic_ip, pc_iperf_log_file, test_result)
# 3. run test for TCP Tx, Rx and UDP Tx, Rx
for _ in range(RETRY_COUNT_FOR_BEST_PERFORMANCE):
test_utility.run_all_cases(0, NO_BANDWIDTH_LIMIT)
# 4. log performance and compare with pass standard
performance_items = []
for throughput_type in test_result:
ttfw_idf.log_performance('{}_throughput'.format(throughput_type),
'{:.02f} Mbps'.format(test_result[throughput_type].get_best_throughput()))
performance_items.append(['{}_throughput'.format(throughput_type),
'{:.02f} Mbps'.format(test_result[throughput_type].get_best_throughput())])
# 5. save to report
TinyFW.JunitReport.update_performance(performance_items)
# do check after logging, otherwise test will exit immediately if check fail, some performance can't be logged.
for throughput_type in test_result:
ttfw_idf.check_performance('{}_throughput'.format(throughput_type),
test_result[throughput_type].get_best_throughput(), dut.TARGET)
env.close_dut('iperf')
@ttfw_idf.idf_example_test(env_tag='Example_ShieldBox2', target=['ESP32', 'ESP32S2', 'ESP32C3', 'ESP32S3'], category='stress')
def test_softap_throughput_vs_rssi(env, extra_data):
"""
steps: |
1. build with best performance config
2. switch on one router
3. set attenuator value from 0-60 for each router
4. test TCP tx rx and UDP tx rx throughput
"""
att_port = env.get_variable('attenuator_port')
test_result = {
'tcp_tx': IperfUtility.TestResult('tcp', 'tx', BEST_PERFORMANCE_CONFIG),
'tcp_rx': IperfUtility.TestResult('tcp', 'rx', BEST_PERFORMANCE_CONFIG),
'udp_tx': IperfUtility.TestResult('udp', 'tx', BEST_PERFORMANCE_CONFIG),
'udp_rx': IperfUtility.TestResult('udp', 'rx', BEST_PERFORMANCE_CONFIG),
}
# 1. get DUT and download
softap_dut = env.get_dut('softap_iperf', 'examples/wifi/iperf')
softap_dut.start_app()
softap_dut.expect_any('iperf>', 'esp32>')
sta_dut = env.get_dut('sta_iperf', 'examples/wifi/iperf', app_config_name=BEST_PERFORMANCE_CONFIG)
sta_dut.start_app()
sta_dut.expect_any('iperf>', 'esp32>')
# 2. run test for each required att value
test_utility = IperfTestUtilitySoftap(sta_dut, softap_dut, BEST_PERFORMANCE_CONFIG, test_result)
Attenuator.set_att(att_port, 0)
for atten_val in ATTEN_VALUE_LIST:
assert Attenuator.set_att(att_port, atten_val) is True
try:
test_utility.run_all_cases(atten_val, NO_BANDWIDTH_LIMIT)
except AssertionError:
break
env.close_dut('softap_iperf')
env.close_dut('sta_iperf')
# 3. generate report
report = TestReport.ThroughputVsRssiReport(os.path.join(env.log_path, 'Performance',
'SoftAPThroughputVsRssiReport'),test_result)
report.generate_report()
if __name__ == '__main__':
# test_wifi_throughput_basic(env_config_file='EnvConfig.yml')
# test_wifi_throughput_with_different_configs(env_config_file='EnvConfig.yml')
test_wifi_throughput_vs_rssi(env_config_file='EnvConfig.yml', target='ESP32C3')
test_softap_throughput_vs_rssi(env_config_file='EnvConfig.yml')

View File

@ -0,0 +1,418 @@
# SPDX-FileCopyrightText: 2022-2023 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Unlicense OR CC0-1.0
"""
Test case for iperf example.
This test case might have problem running on windows:
- use `sudo killall iperf` to force kill iperf, didn't implement windows version
The test env Example_ShieldBox do need the following config::
Example_ShieldBox:
ap_list:
- ssid: "ssid"
password: "password"
outlet: 1
apc_ip: "192.168.1.88"
attenuator_port: "/dev/ttyUSB0"
iperf: "/dev/ttyUSB1"
apc_ip: "192.168.1.88"
pc_nic: "eth0"
"""
import logging
import os
import re
import time
from typing import Any, Callable, Dict, Generator, Tuple
import pexpect
import pytest
from common_test_methods import get_env_config_variable, get_host_ip_by_interface
from idf_iperf_test_util import Attenuator, IperfUtility, PowerControl, TestReport
from idf_iperf_test_util.IperfUtility import SCAN_RETRY_COUNT, SCAN_TIMEOUT, TEST_TIME
from pytest_embedded import Dut
from pytest_embedded_idf.dut import IdfDut
# configurations
RETRY_COUNT_FOR_BEST_PERFORMANCE = 2
ATTEN_VALUE_LIST = range(0, 60, 2)
NO_BANDWIDTH_LIMIT = -1 # iperf send bandwith is not limited
# We need to auto compare the difference between adjacent configs (01 -> 00, 02 -> 01, ...) and put them to reports.
# Using numbers for config will make this easy.
# Use default value `99` for config with best performance.
BEST_PERFORMANCE_CONFIG = '99'
class IperfTestUtilitySoftap(IperfUtility.IperfTestUtility):
""" iperf test implementation """
def __init__(self, dut:IdfDut, softap_dut:IdfDut, config_name:str, test_result:Any=None) -> None:
IperfUtility.IperfTestUtility.__init__(self, dut, config_name, 'softap', '1234567890', None, None, test_result)
self.softap_dut = softap_dut
self.softap_ip = '192.168.4.1'
def setup(self) -> Tuple[str,int]:
"""
setup iperf test:
1. kill current iperf process
2. reboot DUT (currently iperf is not very robust, need to reboot DUT)
3. scan to get AP RSSI
4. connect to AP
"""
self.softap_dut.write('restart')
self.softap_dut.expect_exact("Type 'help' to get the list of commands.")
self.softap_dut.expect('iperf>', timeout=30)
self.softap_dut.write('ap {} {}'.format(self.ap_ssid, self.ap_password))
self.dut.write('restart')
self.dut.expect_exact("Type 'help' to get the list of commands.")
self.dut.expect('iperf>', timeout=30)
self.dut.write('scan {}'.format(self.ap_ssid))
for _ in range(SCAN_RETRY_COUNT):
try:
rssi = int(self.dut.expect(r'\[{}]\[rssi=(-\d+)]'.format(self.ap_ssid),
timeout=SCAN_TIMEOUT).group(1))
break
except pexpect.TIMEOUT:
continue
else:
raise AssertionError('Failed to scan AP')
self.dut.write('sta {} {}'.format(self.ap_ssid, self.ap_password))
dut_ip = self.dut.expect(r'sta ip: ([\d.]+), mask: ([\d.]+), gw: ([\d.]+)').group(1).decode('utf-8')
return dut_ip, rssi
def _test_once(self, proto:str, direction:str, bw_limit:int) -> Tuple[str, int, int]:
""" do measure once for one type """
# connect and scan to get RSSI
dut_ip, rssi = self.setup()
assert direction in ['rx', 'tx']
assert proto in ['tcp', 'udp']
# run iperf test
if direction == 'tx':
if proto == 'tcp':
self.softap_dut.write('iperf -s -i 1 -t {}'.format(TEST_TIME))
# wait until DUT TCP server created
try:
self.softap_dut.expect('iperf tcp server create successfully', timeout=1)
except pexpect.TIMEOUT:
# compatible with old iperf example binary
pass
if bw_limit > 0:
self.dut.write('iperf -c {} -i 1 -t {} -b {}'.format(self.softap_ip, TEST_TIME, bw_limit))
else:
self.dut.write('iperf -c {} -i 1 -t {}'.format(self.softap_ip, TEST_TIME))
else:
self.softap_dut.write('iperf -s -u -i 1 -t {}'.format(TEST_TIME))
if bw_limit > 0:
self.dut.write('iperf -c {} -u -i 1 -t {} -b {}'.format(self.softap_ip, TEST_TIME, bw_limit))
else:
self.dut.write('iperf -c {} -u -i 1 -t {}'.format(self.softap_ip, TEST_TIME))
else:
if proto == 'tcp':
self.dut.write('iperf -s -i 1 -t {}'.format(TEST_TIME))
# wait until DUT TCP server created
try:
self.dut.expect('iperf tcp server create successfully', timeout=1)
except pexpect.TIMEOUT:
# compatible with old iperf example binary
pass
if bw_limit > 0:
self.softap_dut.write('iperf -c {} -i 1 -t {} -b {}'.format(dut_ip, TEST_TIME, bw_limit))
else:
self.softap_dut.write('iperf -c {} -i 1 -t {}'.format(dut_ip, TEST_TIME))
else:
self.dut.write('iperf -s -u -i 1 -t {}'.format(TEST_TIME))
if bw_limit > 0:
self.softap_dut.write('iperf -c {} -u -i 1 -t {} -b {}'.format(dut_ip, TEST_TIME, bw_limit))
else:
self.softap_dut.write('iperf -c {} -u -i 1 -t {}'.format(dut_ip, TEST_TIME))
time.sleep(TEST_TIME + 5)
if direction == 'tx':
server_raw_data = self.dut.expect(pexpect.TIMEOUT, timeout=0).decode('utf-8')
else:
server_raw_data = self.dut.expect(pexpect.TIMEOUT, timeout=0).decode('utf-8')
self.dut.write('iperf -a')
self.softap_dut.write('iperf -a')
self.dut.write('heap')
heap_size = self.dut.expect(r'min heap size: (\d+)\D').group(1)
# return server raw data (for parsing test results) and RSSI
return server_raw_data, rssi, heap_size
@pytest.fixture(name='generate_report_different_configs', scope='session')
def fixture_generate_report_different_configs(
session_tempdir:str
) -> Generator[Callable[[Dict[str, Any], Dict[str, Any], str], None], None, None]:
_test_result_dict = dict()
_sdkconfig_files_dict = dict()
_ap_info = dict()
def add_config(ap_info:Dict[str, Any], test_result:Dict[str, Any], config_name:str) -> None:
"""
Collects results for each config and stores it to a dictionary
Args:
ap_info: AP info
test_result: test results for a specific config
config_name: config name
"""
# need to store the SSID to generate the report in the teardown period
# note that the info passed along with the last call of the fixture is used in the teardown period
_ap_info['ssid'] = ap_info['ssid']
_test_result_dict[config_name] = test_result
_sdkconfig_files_dict[config_name] = 'sdkconfig.ci.' + config_name
yield add_config
# the final report for all config results is generated during fixture's teardown period
report = TestReport.ThroughputForConfigsReport(os.path.join(session_tempdir, 'Performance',
'ThroughputForConfigsReport'), _ap_info['ssid'],
_test_result_dict, _sdkconfig_files_dict)
report.generate_report()
@pytest.mark.esp32
@pytest.mark.esp32s2
@pytest.mark.esp32c3
@pytest.mark.esp32s3
@pytest.mark.temp_skip_ci(targets=['esp32s2', 'esp32c3', 'esp32s3'], reason='lack of runners (run only for ESP32)')
@pytest.mark.timeout(1200)
@pytest.mark.Example_ShieldBox_Basic
@pytest.mark.parametrize('config', [
BEST_PERFORMANCE_CONFIG
], indirect=True)
def test_wifi_throughput_basic(
dut: Dut,
log_performance: Callable[[str, str], None],
check_performance: Callable[[str, float, str], None],
) -> None:
"""
steps: |
1. test TCP tx rx and UDP tx rx throughput
2. compare with the pre-defined pass standard
"""
# 1. wait for DUT
dut.expect('iperf>')
# 2. preparing
env_name = 'Example_ShieldBox_Basic'
pc_nic = get_env_config_variable(env_name, 'pc_nic')
pc_nic_ip = get_host_ip_by_interface(pc_nic)
pc_iperf_log_file = os.path.join(dut.logdir, 'pc_iperf_log.md')
ap_info = {
'ssid': get_env_config_variable(env_name, 'ap_ssid'),
'password': get_env_config_variable(env_name, 'ap_password'),
}
test_result = {
'tcp_tx': IperfUtility.TestResult('tcp', 'tx', BEST_PERFORMANCE_CONFIG),
'tcp_rx': IperfUtility.TestResult('tcp', 'rx', BEST_PERFORMANCE_CONFIG),
'udp_tx': IperfUtility.TestResult('udp', 'tx', BEST_PERFORMANCE_CONFIG),
'udp_rx': IperfUtility.TestResult('udp', 'rx', BEST_PERFORMANCE_CONFIG),
}
test_utility = IperfUtility.IperfTestUtility(dut, BEST_PERFORMANCE_CONFIG, ap_info['ssid'], ap_info['password'],
pc_nic_ip, pc_iperf_log_file, test_result)
# 3. run test for TCP Tx, Rx and UDP Tx, Rx
for _ in range(RETRY_COUNT_FOR_BEST_PERFORMANCE):
test_utility.run_all_cases(0, NO_BANDWIDTH_LIMIT)
# 4. log performance and compare with pass standard
for throughput_type in test_result:
log_performance('{}_throughput'.format(throughput_type),
'{:.02f} Mbps'.format(test_result[throughput_type].get_best_throughput()))
# do check after logging, otherwise test will exit immediately if check fail, some performance can't be logged.
for throughput_type in test_result:
check_performance('{}_throughput'.format(throughput_type),
test_result[throughput_type].get_best_throughput(), dut.target)
@pytest.mark.esp32
@pytest.mark.esp32s2
@pytest.mark.esp32c3
@pytest.mark.esp32s3
@pytest.mark.temp_skip_ci(targets=['esp32', 'esp32s2', 'esp32c3', 'esp32s3'], reason='local stress test')
@pytest.mark.timeout(1200)
@pytest.mark.Example_ShieldBox_Basic
@pytest.mark.parametrize('config', [
'00',
'01',
'02',
'03',
'04',
'05',
'06',
'07',
'99'
], indirect=True)
def test_wifi_throughput_with_different_configs(
dut: Dut,
generate_report_different_configs: Callable[[Dict[str, Any], Dict[str, Any], str], None],
) -> None:
"""
steps: |
1. build iperf with specified configs
2. test throughput for all routers
"""
# 1. wait for DUT
dut.expect('iperf>')
# 2. preparing
env_name = 'Example_ShieldBox_Basic'
pc_nic = get_env_config_variable(env_name, 'pc_nic')
pc_nic_ip = get_host_ip_by_interface(pc_nic)
pc_iperf_log_file = os.path.join(dut.logdir, 'pc_iperf_log.md')
ap_info = {
'ssid': get_env_config_variable(env_name, 'ap_ssid'),
'password': get_env_config_variable(env_name, 'ap_password'),
}
found_config = re.search(r'esp32.*\.(\w+)\.', dut.test_case_name)
if found_config is not None:
config_name = found_config.group(1)
else:
raise Exception('config name not found')
# 3. run test for each required att value
test_result = {
'tcp_tx': IperfUtility.TestResult('tcp', 'tx', config_name),
'tcp_rx': IperfUtility.TestResult('tcp', 'rx', config_name),
'udp_tx': IperfUtility.TestResult('udp', 'tx', config_name),
'udp_rx': IperfUtility.TestResult('udp', 'rx', config_name),
}
test_utility = IperfUtility.IperfTestUtility(dut, config_name, ap_info['ssid'], ap_info['password'], pc_nic_ip,
pc_iperf_log_file, test_result)
for _ in range(RETRY_COUNT_FOR_BEST_PERFORMANCE):
test_utility.run_all_cases(0, NO_BANDWIDTH_LIMIT)
for result_type in test_result:
summary = str(test_result[result_type])
if summary:
logging.info(summary)
generate_report_different_configs(ap_info, test_result, config_name)
@pytest.mark.esp32
@pytest.mark.esp32s2
@pytest.mark.esp32c3
@pytest.mark.esp32s3
@pytest.mark.temp_skip(targets=['esp32', 'esp32s2', 'esp32c3', 'esp32s3'], reason='lack of runners')
@pytest.mark.timeout(3600)
@pytest.mark.Example_ShieldBox
@pytest.mark.parametrize('config', [
BEST_PERFORMANCE_CONFIG
], indirect=True)
def test_wifi_throughput_vs_rssi(
dut: Dut,
session_tempdir:str,
) -> None:
"""
steps: |
1. build with best performance config
2. switch on one router
3. set attenuator value from 0-60 for each router
4. test TCP tx rx and UDP tx rx throughput
"""
# 1. wait for DUT
dut.expect('iperf>')
# 2. preparing
env_name = 'Example_ShieldBox'
att_port = get_env_config_variable(env_name, 'attenuator_port')
ap_list = get_env_config_variable(env_name, 'ap_list')
pc_nic = get_env_config_variable(env_name, 'pc_nic')
pc_nic_ip = get_host_ip_by_interface(pc_nic)
apc_ip = get_env_config_variable(env_name, 'apc_ip')
pc_iperf_log_file = os.path.join(dut.logdir, 'pc_iperf_log.md')
test_result = {
'tcp_tx': IperfUtility.TestResult('tcp', 'tx', BEST_PERFORMANCE_CONFIG),
'tcp_rx': IperfUtility.TestResult('tcp', 'rx', BEST_PERFORMANCE_CONFIG),
'udp_tx': IperfUtility.TestResult('udp', 'tx', BEST_PERFORMANCE_CONFIG),
'udp_rx': IperfUtility.TestResult('udp', 'rx', BEST_PERFORMANCE_CONFIG),
}
# 3. run test for each required att value
for ap_info in ap_list:
test_utility = IperfUtility.IperfTestUtility(dut, BEST_PERFORMANCE_CONFIG, ap_info['ssid'],
ap_info['password'], pc_nic_ip, pc_iperf_log_file, test_result)
PowerControl.Control.control_rest(apc_ip, ap_info['outlet'], 'OFF')
PowerControl.Control.control(apc_ip, {ap_info['outlet']: 'ON'})
Attenuator.set_att(att_port, 0)
if not test_utility.wait_ap_power_on():
logging.error('[{}] failed to power on, skip testing this AP'.format(ap_info['ssid']))
continue
for atten_val in ATTEN_VALUE_LIST:
assert Attenuator.set_att(att_port, atten_val) is True
try:
test_utility.run_all_cases(atten_val, NO_BANDWIDTH_LIMIT)
except AssertionError:
break
# 4. generate report
report = TestReport.ThroughputVsRssiReport(os.path.join(session_tempdir, 'Performance', 'STAThroughputVsRssiReport'),
test_result)
report.generate_report()
@pytest.mark.esp32
@pytest.mark.esp32s2
@pytest.mark.esp32c3
@pytest.mark.esp32s3
@pytest.mark.temp_skip(targets=['esp32', 'esp32s2', 'esp32c3', 'esp32s3'], reason='lack of runners')
@pytest.mark.parametrize('count, config', [
(2, BEST_PERFORMANCE_CONFIG),
], indirect=True)
def test_softap_throughput_vs_rssi(
dut: Tuple[IdfDut, IdfDut],
session_tempdir:str,
) -> None:
"""
steps: |
1. build with best performance config
2. switch on one router
3. set attenuator value from 0-60 for each router
4. test TCP tx rx and UDP tx rx throughput
"""
# 1. wait for DUTs
softap_dut = dut[0]
sta_dut = dut[1]
softap_dut.expect('iperf>')
sta_dut.expect('iperf>')
# 2. preparing
env_name = 'Example_ShieldBox2'
att_port = get_env_config_variable(env_name, 'attenuator_port')
test_result = {
'tcp_tx': IperfUtility.TestResult('tcp', 'tx', BEST_PERFORMANCE_CONFIG),
'tcp_rx': IperfUtility.TestResult('tcp', 'rx', BEST_PERFORMANCE_CONFIG),
'udp_tx': IperfUtility.TestResult('udp', 'tx', BEST_PERFORMANCE_CONFIG),
'udp_rx': IperfUtility.TestResult('udp', 'rx', BEST_PERFORMANCE_CONFIG),
}
# 3. run test for each required att value
test_utility = IperfTestUtilitySoftap(sta_dut, softap_dut, BEST_PERFORMANCE_CONFIG, test_result)
Attenuator.set_att(att_port, 0)
for atten_val in ATTEN_VALUE_LIST:
assert Attenuator.set_att(att_port, atten_val) is True
try:
test_utility.run_all_cases(atten_val, NO_BANDWIDTH_LIMIT)
except AssertionError:
break
# 4. generate report
report = TestReport.ThroughputVsRssiReport(os.path.join(session_tempdir, 'Performance',
'SoftAPThroughputVsRssiReport'),test_result)
report.generate_report()

View File

@ -1,12 +1,14 @@
# SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD
# SPDX-FileCopyrightText: 2022-2023 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
import logging
import os
import re
import subprocess
import time
import pexpect
from idf_iperf_test_util import LineChart
from tiny_test_fw import DUT, Utility
from pytest_embedded import Dut
try:
from typing import Any, Tuple
@ -45,7 +47,7 @@ class TestResult(object):
RSSI_RANGE = [-x for x in range(10, 100)]
ATT_RANGE = [x for x in range(0, 64)]
def __init__(self, proto, direction, config_name): # type: (str, str, str) -> None
def __init__(self, proto:str, direction:str, config_name:str) -> None:
self.proto = proto
self.direction = direction
self.config_name = config_name
@ -55,7 +57,7 @@ class TestResult(object):
self.heap_size = INVALID_HEAP_SIZE
self.error_list = [] # type: list[str]
def _save_result(self, throughput, ap_ssid, att, rssi, heap_size): # type: (float, str, int, int, str) -> None
def _save_result(self, throughput:float, ap_ssid:str, att:int, rssi:int, heap_size:str) -> None:
"""
save the test results:
@ -70,7 +72,7 @@ class TestResult(object):
self.att_rssi_map[ap_ssid][att] = rssi
def record_throughput(database, key_value): # type: (dict, int) -> None
def record_throughput(database:dict, key_value:int) -> None:
try:
# we save the larger value for same att
if throughput > database[ap_ssid][key_value]:
@ -84,7 +86,7 @@ class TestResult(object):
if int(heap_size) < self.heap_size:
self.heap_size = int(heap_size)
def add_result(self, raw_data, ap_ssid, att, rssi, heap_size): # type: (str, str, int, int, str) -> float
def add_result(self, raw_data:str, ap_ssid:str, att:int, rssi:int, heap_size:str) -> float:
"""
add result for one test
@ -132,14 +134,14 @@ class TestResult(object):
return max_throughput
def post_analysis(self): # type: () -> None
def post_analysis(self) -> None:
"""
some rules need to be checked after we collected all test raw data:
1. throughput value 30% worse than the next point with lower RSSI
2. throughput value 30% worse than the next point with larger attenuate
"""
def analysis_bad_point(data, index_type): # type: (dict, str) -> None
def analysis_bad_point(data:dict, index_type:str) -> None:
for ap_ssid in data:
result_dict = data[ap_ssid]
index_list = list(result_dict.keys())
@ -160,7 +162,7 @@ class TestResult(object):
analysis_bad_point(self.throughput_by_rssi, 'rssi')
analysis_bad_point(self.throughput_by_att, 'att')
def draw_throughput_figure(self, path, ap_ssid, draw_type): # type: (str, str, str) -> str
def draw_throughput_figure(self, path:str, ap_ssid:str, draw_type:str) -> str:
"""
:param path: folder to save figure. make sure the folder is already created.
:param ap_ssid: ap ssid string or a list of ap ssid string
@ -189,7 +191,7 @@ class TestResult(object):
data, range_list)
return file_name
def draw_rssi_vs_att_figure(self, path, ap_ssid): # type: (str, str) -> str
def draw_rssi_vs_att_figure(self, path:str, ap_ssid:str) -> str:
"""
:param path: folder to save figure. make sure the folder is already created.
:param ap_ssid: ap to use
@ -207,13 +209,13 @@ class TestResult(object):
self.ATT_RANGE)
return file_name
def get_best_throughput(self): # type: () -> Any
def get_best_throughput(self) -> Any:
""" get the best throughput during test """
best_for_aps = [max(self.throughput_by_att[ap_ssid].values())
for ap_ssid in self.throughput_by_att]
return max(best_for_aps)
def __str__(self): # type: () -> str
def __str__(self) -> str:
"""
returns summary for this test:
@ -237,8 +239,8 @@ class TestResult(object):
class IperfTestUtility(object):
""" iperf test implementation """
def __init__(self, dut, config_name, ap_ssid, ap_password,
pc_nic_ip, pc_iperf_log_file, test_result=None): # type: (str, str, str, str, str, str, Any) -> None
def __init__(self, dut:Dut, config_name:str, ap_ssid:str, ap_password:str,
pc_nic_ip:str, pc_iperf_log_file:str, test_result:Any=None) -> None:
self.config_name = config_name
self.dut = dut
@ -259,7 +261,7 @@ class IperfTestUtility(object):
'udp_rx': TestResult('udp', 'rx', config_name),
}
def setup(self): # type: (Any) -> Tuple[str,int]
def setup(self) -> Tuple[str,int]:
"""
setup iperf test:
@ -274,25 +276,26 @@ class IperfTestUtility(object):
pass
time.sleep(5)
self.dut.write('restart')
self.dut.expect_any('iperf>', 'esp32>')
self.dut.expect_exact("Type 'help' to get the list of commands.")
self.dut.expect('iperf>')
self.dut.write('scan {}'.format(self.ap_ssid))
for _ in range(SCAN_RETRY_COUNT):
try:
rssi = int(self.dut.expect(re.compile(r'\[{}]\[rssi=(-\d+)]'.format(self.ap_ssid)),
timeout=SCAN_TIMEOUT)[0])
rssi = int(self.dut.expect(r'\[{}]\[rssi=(-\d+)]'.format(self.ap_ssid),
timeout=SCAN_TIMEOUT).group(1))
break
except DUT.ExpectTimeout:
except pexpect.TIMEOUT:
continue
else:
raise AssertionError('Failed to scan AP')
self.dut.write('sta {} {}'.format(self.ap_ssid, self.ap_password))
dut_ip = self.dut.expect(re.compile(r'sta ip: ([\d.]+), mask: ([\d.]+), gw: ([\d.]+)'))[0]
dut_ip = self.dut.expect(r'sta ip: ([\d.]+), mask: ([\d.]+), gw: ([\d.]+)').group(1)
return dut_ip, rssi
def _save_test_result(self, test_case, raw_data, att, rssi, heap_size): # type: (str, str, int, int, int) -> Any
def _save_test_result(self, test_case:str, raw_data:str, att:int, rssi:int, heap_size:int) -> Any:
return self.test_result[test_case].add_result(raw_data, self.ap_ssid, att, rssi, heap_size)
def _test_once(self, proto, direction, bw_limit): # type: (Any, str, str, int) -> Tuple[str, int, int]
def _test_once(self, proto:str, direction:str, bw_limit:int) -> Tuple[str, int, int]:
""" do measure once for one type """
# connect and scan to get RSSI
dut_ip, rssi = self.setup()
@ -336,9 +339,9 @@ class IperfTestUtility(object):
# wait until DUT TCP server created
try:
self.dut.expect('iperf: Socket created', timeout=5)
except DUT.ExpectTimeout:
except pexpect.TIMEOUT:
# compatible with old iperf example binary
Utility.console_log('create iperf tcp server fail')
logging.info('create iperf tcp server fail')
if bw_limit > 0:
process = subprocess.Popen(['iperf', '-c', dut_ip, '-b', str(bw_limit) + 'm',
'-t', str(TEST_TIME), '-f', 'm'], stdout=f, stderr=f)
@ -356,9 +359,9 @@ class IperfTestUtility(object):
# wait until DUT TCP server created
try:
self.dut.expect('iperf: Socket bound', timeout=5)
except DUT.ExpectTimeout:
except pexpect.TIMEOUT:
# compatible with old iperf example binary
Utility.console_log('create iperf udp server fail')
logging.info('create iperf udp server fail')
if bw_limit > 0:
process = subprocess.Popen(['iperf', '-c', dut_ip, '-u', '-b', str(bw_limit) + 'm',
'-t', str(TEST_TIME), '-f', 'm'], stdout=f, stderr=f)
@ -379,10 +382,13 @@ class IperfTestUtility(object):
else:
process.terminate()
server_raw_data = self.dut.read()
server_raw_data = self.dut.expect(pexpect.TIMEOUT, timeout=0).decode('utf-8')
with open(PC_IPERF_TEMP_LOG_FILE, 'r') as f:
pc_raw_data = f.read()
if os.path.exists(PC_IPERF_TEMP_LOG_FILE):
os.remove(PC_IPERF_TEMP_LOG_FILE)
# save PC iperf logs to console
with open(self.pc_iperf_log_file, 'a+') as f:
f.write('## [{}] `{}`\r\n##### {}'
@ -391,18 +397,19 @@ class IperfTestUtility(object):
time.strftime('%m-%d %H:%M:%S', time.localtime(time.time()))))
f.write('\r\n```\r\n\r\n' + pc_raw_data + '\r\n```\r\n')
self.dut.write('heap')
heap_size = self.dut.expect(re.compile(r'min heap size: (\d+)\D'))[0]
heap_size = self.dut.expect(r'min heap size: (\d+)\D').group(1)
# return server raw data (for parsing test results) and RSSI
return server_raw_data, rssi, heap_size
def run_test(self, proto, direction, atten_val, bw_limit): # type: (str, str, int, int) -> None
def run_test(self, proto:str, direction:str, atten_val:int, bw_limit:int) -> None:
"""
run test for one type, with specified atten_value and save the test result
:param proto: tcp or udp
:param direction: tx or rx
:param atten_val: attenuate value
:param bw_limit: bandwidth limit
"""
rssi = FAILED_TO_SCAN_RSSI
heap_size = INVALID_HEAP_SIZE
@ -411,32 +418,33 @@ class IperfTestUtility(object):
throughput = self._save_test_result('{}_{}'.format(proto, direction),
server_raw_data, atten_val,
rssi, heap_size)
Utility.console_log('[{}][{}_{}][{}][{}]: {:.02f}'
.format(self.config_name, proto, direction, rssi, self.ap_ssid, throughput))
logging.info('[{}][{}_{}][{}][{}]: {:.02f}'
.format(self.config_name, proto, direction, rssi, self.ap_ssid, throughput))
self.lowest_rssi_scanned = min(self.lowest_rssi_scanned, rssi)
except (ValueError, IndexError):
self._save_test_result('{}_{}'.format(proto, direction), '', atten_val, rssi, heap_size)
Utility.console_log('Fail to get throughput results.')
logging.info('Fail to get throughput results.')
except AssertionError:
self.fail_to_scan += 1
Utility.console_log('Fail to scan AP.')
logging.info('Fail to scan AP.')
def run_all_cases(self, atten_val, bw_limit): # type: (int, int) -> None
def run_all_cases(self, atten_val:int, bw_limit:int) -> None:
"""
run test for all types (udp_tx, udp_rx, tcp_tx, tcp_rx).
:param atten_val: attenuate value
:param bw_limit: bandwidth limit
"""
self.run_test('tcp', 'tx', atten_val, bw_limit)
self.run_test('tcp', 'rx', atten_val, bw_limit)
self.run_test('udp', 'tx', atten_val, bw_limit)
self.run_test('udp', 'rx', atten_val, bw_limit)
if self.fail_to_scan > 10:
Utility.console_log(
logging.info(
'Fail to scan AP for more than 10 times. Lowest RSSI scanned is {}'.format(self.lowest_rssi_scanned))
raise AssertionError
def wait_ap_power_on(self): # type: (Any) -> bool
def wait_ap_power_on(self) -> bool:
"""
AP need to take sometime to power on. It changes for different APs.
This method will scan to check if the AP powers on.
@ -444,15 +452,15 @@ class IperfTestUtility(object):
:return: True or False
"""
self.dut.write('restart')
self.dut.expect_any('iperf>', 'esp32>')
self.dut.expect('iperf>')
for _ in range(WAIT_AP_POWER_ON_TIMEOUT // SCAN_TIMEOUT):
try:
self.dut.write('scan {}'.format(self.ap_ssid))
self.dut.expect(re.compile(r'\[{}]\[rssi=(-\d+)]'.format(self.ap_ssid)),
self.dut.expect(r'\[{}]\[rssi=(-\d+)]'.format(self.ap_ssid),
timeout=SCAN_TIMEOUT)
ret = True
break
except DUT.ExpectTimeout:
except pexpect.TIMEOUT:
pass
else:
ret = False

View File

@ -20,5 +20,8 @@ dbus-python; sys_platform == 'linux'
protobuf
paho-mqtt
# iperf_test_util
pyecharts
# for twai tests, communicate with socket can device (e.g. Canable)
python-can

View File

@ -20,9 +20,6 @@ future
dbus-python; sys_platform == 'linux'
pygobject; sys_platform != 'win32'
# iperf_test_util
pyecharts
# esp_prov
bleak
# future # addressed before under ble