Added bandwidth limitaion option to iperf test scripts

This commit is contained in:
Ondrej Kosta 2022-03-25 14:45:56 +01:00 committed by BOT
parent 3f7cf8a540
commit 4d581456e4
3 changed files with 47 additions and 22 deletions

View File

@ -24,6 +24,8 @@ except ImportError:
# Only used for type annotations
pass
NO_BANDWIDTH_LIMIT = -1 # iperf send bandwith is not limited
class IperfTestUtilityEth(IperfUtility.IperfTestUtility):
""" iperf test implementation """
@ -46,7 +48,7 @@ class IperfTestUtilityEth(IperfUtility.IperfTestUtility):
self.dut.write('ethernet start')
time.sleep(10)
self.dut.write('ethernet info')
dut_ip = self.dut.expect(re.compile(r'ETHIP: ([\d.]+)'))[0]
dut_ip = self.dut.expect(re.compile(r'ETHIP: (\d+[.]\d+[.]\d+[.]\d+)'))[0]
rssi = 0
return dut_ip, rssi
@ -78,7 +80,10 @@ def test_ethernet_throughput_basic(env, _): # type: (Any, Any) -> None
# 3. run test for TCP Tx, Rx and UDP Tx, Rx
test_utility.run_all_cases(0)
test_utility.run_test('tcp', 'tx', 0, NO_BANDWIDTH_LIMIT)
test_utility.run_test('tcp', 'rx', 0, NO_BANDWIDTH_LIMIT)
test_utility.run_test('udp', 'tx', 0, 80)
test_utility.run_test('udp', 'rx', 0, NO_BANDWIDTH_LIMIT)
# 4. log performance and compare with pass standard
performance_items = []

View File

@ -35,6 +35,7 @@ from tiny_test_fw import DUT, TinyFW, Utility
# configurations
RETRY_COUNT_FOR_BEST_PERFORMANCE = 2
ATTEN_VALUE_LIST = range(0, 60, 2)
NO_BANDWIDTH_LIMIT = -1 # iperf send bandwith is not limited
CONFIG_NAME_PATTERN = re.compile(r'sdkconfig\.ci\.(.+)')
@ -173,7 +174,7 @@ def test_wifi_throughput_with_different_configs(env, extra_data):
pc_iperf_log_file, test_result[config_name])
for _ in range(RETRY_COUNT_FOR_BEST_PERFORMANCE):
test_utility.run_all_cases(0)
test_utility.run_all_cases(0, NO_BANDWIDTH_LIMIT)
for result_type in test_result[config_name]:
summary = str(test_result[config_name][result_type])
@ -234,7 +235,7 @@ def test_wifi_throughput_vs_rssi(env, extra_data):
for atten_val in ATTEN_VALUE_LIST:
assert Attenuator.set_att(att_port, atten_val) is True
try:
test_utility.run_all_cases(atten_val)
test_utility.run_all_cases(atten_val, NO_BANDWIDTH_LIMIT)
except AssertionError:
break
@ -280,7 +281,7 @@ def test_wifi_throughput_basic(env, extra_data):
# 3. run test for TCP Tx, Rx and UDP Tx, Rx
for _ in range(RETRY_COUNT_FOR_BEST_PERFORMANCE):
test_utility.run_all_cases(0)
test_utility.run_all_cases(0, NO_BANDWIDTH_LIMIT)
# 4. log performance and compare with pass standard
performance_items = []
@ -335,7 +336,7 @@ def test_softap_throughput_vs_rssi(env, extra_data):
for atten_val in ATTEN_VALUE_LIST:
assert Attenuator.set_att(att_port, atten_val) is True
try:
test_utility.run_all_cases(atten_val)
test_utility.run_all_cases(atten_val, NO_BANDWIDTH_LIMIT)
except AssertionError:
break

View File

@ -286,7 +286,7 @@ class IperfTestUtility(object):
def _save_test_result(self, test_case, raw_data, att, rssi, heap_size): # type: (str, str, int, int, int) -> Any
return self.test_result[test_case].add_result(raw_data, self.ap_ssid, att, rssi, heap_size)
def _test_once(self, proto, direction): # type: (Any, str, str) -> Tuple[str, int, int]
def _test_once(self, proto, direction, bw_limit): # type: (Any, str, str, int) -> Tuple[str, int, int]
""" do measure once for one type """
# connect and scan to get RSSI
dut_ip, rssi = self.setup()
@ -301,12 +301,18 @@ class IperfTestUtility(object):
process = subprocess.Popen(['iperf', '-s', '-B', self.pc_nic_ip,
'-t', str(TEST_TIME), '-i', '1', '-f', 'm'],
stdout=f, stderr=f)
self.dut.write('iperf -c {} -i 1 -t {}'.format(self.pc_nic_ip, TEST_TIME))
if bw_limit > 0:
self.dut.write('iperf -c {} -i 1 -t {} -b {}'.format(self.pc_nic_ip, TEST_TIME, bw_limit))
else:
self.dut.write('iperf -c {} -i 1 -t {}'.format(self.pc_nic_ip, TEST_TIME))
else:
process = subprocess.Popen(['iperf', '-s', '-u', '-B', self.pc_nic_ip,
'-t', str(TEST_TIME), '-i', '1', '-f', 'm'],
stdout=f, stderr=f)
self.dut.write('iperf -c {} -u -i 1 -t {}'.format(self.pc_nic_ip, TEST_TIME))
if bw_limit > 0:
self.dut.write('iperf -c {} -u -i 1 -t {} -b {}'.format(self.pc_nic_ip, TEST_TIME, bw_limit))
else:
self.dut.write('iperf -c {} -u -i 1 -t {}'.format(self.pc_nic_ip, TEST_TIME))
for _ in range(TEST_TIMEOUT):
if process.poll() is not None:
@ -327,9 +333,12 @@ class IperfTestUtility(object):
except DUT.ExpectTimeout:
# compatible with old iperf example binary
Utility.console_log('create iperf tcp server fail')
process = subprocess.Popen(['iperf', '-c', dut_ip,
'-t', str(TEST_TIME), '-f', 'm'],
stdout=f, stderr=f)
if bw_limit > 0:
process = subprocess.Popen(['iperf', '-c', dut_ip, '-b', str(bw_limit) + 'm',
'-t', str(TEST_TIME), '-f', 'm'], stdout=f, stderr=f)
else:
process = subprocess.Popen(['iperf', '-c', dut_ip,
'-t', str(TEST_TIME), '-f', 'm'], stdout=f, stderr=f)
for _ in range(TEST_TIMEOUT):
if process.poll() is not None:
break
@ -344,15 +353,25 @@ class IperfTestUtility(object):
except DUT.ExpectTimeout:
# compatible with old iperf example binary
Utility.console_log('create iperf udp server fail')
for bandwidth in range(50, 101, 5):
process = subprocess.Popen(['iperf', '-c', dut_ip, '-u', '-b', str(bandwidth) + 'm',
'-t', str(TEST_TIME / 11), '-f', 'm'], stdout=f, stderr=f)
if bw_limit > 0:
process = subprocess.Popen(['iperf', '-c', dut_ip, '-u', '-b', str(bw_limit) + 'm',
'-t', str(TEST_TIME), '-f', 'm'], stdout=f, stderr=f)
for _ in range(TEST_TIMEOUT):
if process.poll() is not None:
break
time.sleep(1)
else:
process.terminate()
else:
for bandwidth in range(50, 101, 5):
process = subprocess.Popen(['iperf', '-c', dut_ip, '-u', '-b', str(bandwidth) + 'm',
'-t', str(TEST_TIME / 11), '-f', 'm'], stdout=f, stderr=f)
for _ in range(TEST_TIMEOUT):
if process.poll() is not None:
break
time.sleep(1)
else:
process.terminate()
server_raw_data = self.dut.read()
with open(PC_IPERF_TEMP_LOG_FILE, 'r') as f:
@ -371,7 +390,7 @@ class IperfTestUtility(object):
# return server raw data (for parsing test results) and RSSI
return server_raw_data, rssi, heap_size
def run_test(self, proto, direction, atten_val): # type: (str, str, int) -> None
def run_test(self, proto, direction, atten_val, bw_limit): # type: (str, str, int, int) -> None
"""
run test for one type, with specified atten_value and save the test result
@ -382,7 +401,7 @@ class IperfTestUtility(object):
rssi = FAILED_TO_SCAN_RSSI
heap_size = INVALID_HEAP_SIZE
try:
server_raw_data, rssi, heap_size = self._test_once(proto, direction)
server_raw_data, rssi, heap_size = self._test_once(proto, direction, bw_limit)
throughput = self._save_test_result('{}_{}'.format(proto, direction),
server_raw_data, atten_val,
rssi, heap_size)
@ -396,16 +415,16 @@ class IperfTestUtility(object):
self.fail_to_scan += 1
Utility.console_log('Fail to scan AP.')
def run_all_cases(self, atten_val): # type: (int) -> None
def run_all_cases(self, atten_val, bw_limit): # type: (int, int) -> None
"""
run test for all types (udp_tx, udp_rx, tcp_tx, tcp_rx).
:param atten_val: attenuate value
"""
self.run_test('tcp', 'tx', atten_val)
self.run_test('tcp', 'rx', atten_val)
self.run_test('udp', 'tx', atten_val)
self.run_test('udp', 'rx', atten_val)
self.run_test('tcp', 'tx', atten_val, bw_limit)
self.run_test('tcp', 'rx', atten_val, bw_limit)
self.run_test('udp', 'tx', atten_val, bw_limit)
self.run_test('udp', 'rx', atten_val, bw_limit)
if self.fail_to_scan > 10:
Utility.console_log(
'Fail to scan AP for more than 10 times. Lowest RSSI scanned is {}'.format(self.lowest_rssi_scanned))