Merge branch 'test/add_ethernet_iperf_example_test_case' into 'master'

Test: add ethernet iperf example test case

Closes TCI-463

See merge request espressif/esp-idf!13696
commit 2849f3bf44
Author: He Yin Ling
Date:   2021-07-28 01:39:12 +00:00
8 changed files with 599 additions and 443 deletions


@@ -129,6 +129,12 @@ example_test_002:
     - ESP32
     - Example_ShieldBox_Basic
 
+example_test_enternet:
+  extends: .example_test_esp32_template
+  tags:
+    - ESP32
+    - Example_Ethernet
+
 .example_test_003:
   extends: .example_test_esp32_template
   tags:


@@ -50,6 +50,20 @@
 #define IDF_PERFORMANCE_MIN_UDP_TX_THROUGHPUT 50
 #endif
 
+// throughput performance by ethernet iperf
+#ifndef IDF_PERFORMANCE_MIN_TCP_RX_ETH_THROUGHPUT
+#define IDF_PERFORMANCE_MIN_TCP_RX_ETH_THROUGHPUT 20
+#endif
+#ifndef IDF_PERFORMANCE_MIN_TCP_TX_ETH_THROUGHPUT
+#define IDF_PERFORMANCE_MIN_TCP_TX_ETH_THROUGHPUT 30
+#endif
+#ifndef IDF_PERFORMANCE_MIN_UDP_RX_ETH_THROUGHPUT
+#define IDF_PERFORMANCE_MIN_UDP_RX_ETH_THROUGHPUT 50
+#endif
+#ifndef IDF_PERFORMANCE_MIN_UDP_TX_ETH_THROUGHPUT
+#define IDF_PERFORMANCE_MIN_UDP_TX_ETH_THROUGHPUT 70
+#endif
+
 // events dispatched per second by event loop library
 #ifndef IDF_PERFORMANCE_MIN_EVENT_DISPATCH
 #define IDF_PERFORMANCE_MIN_EVENT_DISPATCH 25000
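These macros define the minimum pass thresholds (in Mbps) for the new ethernet iperf test. The test case below checks its results with ttfw_idf.check_performance, which resolves an item name such as tcp_rx_eth_throughput to the matching IDF_PERFORMANCE_MIN_* macro. A standalone sketch of that name-to-macro lookup (the parsing here is an illustration, not the actual ttfw_idf implementation):

# Illustrative sketch only -- not the actual ttfw_idf implementation.
import re

HEADER_SNIPPET = '''
#define IDF_PERFORMANCE_MIN_TCP_RX_ETH_THROUGHPUT 20
#define IDF_PERFORMANCE_MIN_UDP_TX_ETH_THROUGHPUT 70
'''

def min_threshold(item):  # e.g. 'tcp_rx_eth_throughput'
    """Resolve a performance item name to its IDF_PERFORMANCE_MIN_* value."""
    macro = 'IDF_PERFORMANCE_MIN_' + item.upper()
    match = re.search(r'#define\s+{}\s+([\d.]+)'.format(macro), HEADER_SNIPPET)
    if match is None:
        raise KeyError(macro)
    return float(match.group(1))

assert min_threshold('tcp_rx_eth_throughput') == 20.0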


@@ -0,0 +1,102 @@
"""
Test case for the iperf example.
This test case might have problems running on Windows:
1. it uses `make` directly
2. it uses `sudo killall iperf` to force-kill iperf; a Windows version is not implemented
"""
from __future__ import division, unicode_literals
import os
import re
import subprocess
import time
import ttfw_idf
from idf_iperf_test_util import IperfUtility
from tiny_test_fw import TinyFW
try:
from typing import Any, Tuple
except ImportError:
# Only used for type annotations
pass


class IperfTestUtilityEth(IperfUtility.IperfTestUtility):
""" iperf test implementation """
def __init__(self, dut, config_name, pc_nic_ip, pc_iperf_log_file, test_result=None): # type: (str, str, str, str, Any) -> None
IperfUtility.IperfTestUtility.__init__(self, dut, config_name, 'None', 'None', pc_nic_ip, pc_iperf_log_file, test_result)
def setup(self): # type: () -> Tuple[str,int]
"""
setup iperf test:
1. kill current iperf process
2. reboot DUT (currently iperf is not very robust, need to reboot DUT)
"""
try:
subprocess.check_output('sudo killall iperf 2>&1 > /dev/null', shell=True)
except subprocess.CalledProcessError:
pass
self.dut.write('restart')
self.dut.expect_any('iperf>', 'esp32>')
self.dut.write('ethernet start')
time.sleep(10)
self.dut.write('ethernet info')
dut_ip = self.dut.expect(re.compile(r'ETHIP: ([\d.]+)'))[0]
rssi = 0
return dut_ip, rssi


@ttfw_idf.idf_example_test(env_tag='Example_Ethernet')
def test_ethernet_throughput_basic(env, _): # type: (Any, Any) -> None
"""
steps: |
1. test TCP tx rx and UDP tx rx throughput
2. compare with the pre-defined pass standard
"""
pc_nic_ip = env.get_pc_nic_info('pc_nic', 'ipv4')['addr']
pc_iperf_log_file = os.path.join(env.log_path, 'pc_iperf_log.md')
# 1. get DUT
dut = env.get_dut('iperf', 'examples/ethernet/iperf', dut_class=ttfw_idf.ESP32DUT)
dut.start_app()
dut.expect_any('iperf>', 'esp32>')
# 2. preparing
test_result = {
'tcp_tx': IperfUtility.TestResult('tcp', 'tx', 'ethernet'),
'tcp_rx': IperfUtility.TestResult('tcp', 'rx', 'ethernet'),
'udp_tx': IperfUtility.TestResult('udp', 'tx', 'ethernet'),
'udp_rx': IperfUtility.TestResult('udp', 'rx', 'ethernet'),
}
test_utility = IperfTestUtilityEth(dut, 'ethernet', pc_nic_ip, pc_iperf_log_file, test_result)
# 3. run test for TCP Tx, Rx and UDP Tx, Rx
test_utility.run_all_cases(0)
# 4. log performance and compare with pass standard
performance_items = []
for throughput_type in test_result:
ttfw_idf.log_performance('{}_throughput'.format(throughput_type),
'{:.02f} Mbps'.format(test_result[throughput_type].get_best_throughput()))
performance_items.append(['{}_throughput'.format(throughput_type),
'{:.02f} Mbps'.format(test_result[throughput_type].get_best_throughput())])
# 5. save to report
TinyFW.JunitReport.update_performance(performance_items)
# do the checks after logging, otherwise the test exits on the first failed check and the remaining results would not be logged
for throughput_type in test_result:
ttfw_idf.check_performance('{}_eth_throughput'.format(throughput_type),
test_result[throughput_type].get_best_throughput(), dut.TARGET)
env.close_dut('iperf')


if __name__ == '__main__':
test_ethernet_throughput_basic(env_config_file='EnvConfig.yml')
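As the module docstring notes, `sudo killall iperf` has no Windows counterpart. If Windows support were ever needed, the kill step could be made portable along these lines (a hypothetical helper, not part of this commit; `taskkill` is the standard Windows equivalent):

# Hypothetical cross-platform replacement for the killall step (not in this commit).
import subprocess
import sys

def kill_iperf():  # type: () -> None
    if sys.platform == 'win32':
        cmd = ['taskkill', '/F', '/IM', 'iperf.exe']  # /F: force, /IM: match by image name
    else:
        cmd = ['sudo', 'killall', 'iperf']
    try:
        subprocess.check_output(cmd, stderr=subprocess.STDOUT)
    except subprocess.CalledProcessError:
        pass  # no running iperf process is fine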


@@ -25,25 +25,17 @@ import os
 import re
 import subprocess
 import time
-from builtins import object, range, str
+from builtins import range, str
 
 import ttfw_idf
-from idf_iperf_test_util import Attenuator, LineChart, PowerControl, TestReport
+from idf_iperf_test_util import Attenuator, IperfUtility, PowerControl, TestReport
+from idf_iperf_test_util.IperfUtility import SCAN_RETRY_COUNT, SCAN_TIMEOUT, TEST_TIME
 from tiny_test_fw import DUT, TinyFW, Utility
 
 # configurations
-TEST_TIME = TEST_TIMEOUT = 60
-WAIT_AP_POWER_ON_TIMEOUT = 90
-SCAN_TIMEOUT = 3
-SCAN_RETRY_COUNT = 3
 RETRY_COUNT_FOR_BEST_PERFORMANCE = 2
 ATTEN_VALUE_LIST = range(0, 60, 2)
 
-# constants
-FAILED_TO_SCAN_RSSI = -97
-INVALID_HEAP_SIZE = 0xFFFFFFFF
-PC_IPERF_TEMP_LOG_FILE = '.tmp_iperf.log'
-
 CONFIG_NAME_PATTERN = re.compile(r'sdkconfig\.ci\.(.+)')
 
 # We need to auto compare the difference between adjacent configs (01 -> 00, 02 -> 01, ...) and put them to reports.
@@ -52,397 +44,10 @@ CONFIG_NAME_PATTERN = re.compile(r'sdkconfig\.ci\.(.+)')
 BEST_PERFORMANCE_CONFIG = '99'
 
-class TestResult(object):
""" record, analysis test result and convert data to output format """
PC_BANDWIDTH_LOG_PATTERN = re.compile(r'(\d+).0\s*-\s*(\d+).0\s+sec\s+[\d.]+\s+MBytes\s+([\d.]+)\s+Mbits/sec')
DUT_BANDWIDTH_LOG_PATTERN = re.compile(r'(\d+)-\s+(\d+)\s+sec\s+([\d.]+)\s+Mbits/sec')
ZERO_POINT_THRESHOLD = -88 # RSSI, dbm
ZERO_THROUGHPUT_THRESHOLD = -92 # RSSI, dbm
BAD_POINT_RSSI_THRESHOLD = -75 # RSSI, dbm
BAD_POINT_MIN_THRESHOLD = 10 # Mbps
BAD_POINT_PERCENTAGE_THRESHOLD = 0.3
# we need at least 1/2 valid points to qualify the test result
THROUGHPUT_QUALIFY_COUNT = TEST_TIME // 2
RSSI_RANGE = [-x for x in range(10, 100)]
ATT_RANGE = [x for x in range(0, 64)]
def __init__(self, proto, direction, config_name):
self.proto = proto
self.direction = direction
self.config_name = config_name
self.throughput_by_rssi = dict()
self.throughput_by_att = dict()
self.att_rssi_map = dict()
self.heap_size = INVALID_HEAP_SIZE
self.error_list = []
def _save_result(self, throughput, ap_ssid, att, rssi, heap_size):
"""
save the test results:
* record the better throughput if att/rssi is the same.
* record the min heap size.
"""
if ap_ssid not in self.att_rssi_map:
# for new ap, create empty dict()
self.throughput_by_att[ap_ssid] = dict()
self.throughput_by_rssi[ap_ssid] = dict()
self.att_rssi_map[ap_ssid] = dict()
self.att_rssi_map[ap_ssid][att] = rssi
def record_throughput(database, key_value):
try:
# we save the larger value for same att
if throughput > database[ap_ssid][key_value]:
database[ap_ssid][key_value] = throughput
except KeyError:
database[ap_ssid][key_value] = throughput
record_throughput(self.throughput_by_att, att)
record_throughput(self.throughput_by_rssi, rssi)
if int(heap_size) < self.heap_size:
self.heap_size = int(heap_size)
def add_result(self, raw_data, ap_ssid, att, rssi, heap_size):
"""
add result for one test
:param raw_data: iperf raw data
:param ap_ssid: ap ssid that tested
:param att: attenuate value
:param rssi: AP RSSI
:param heap_size: min heap size during test
:return: throughput
"""
fall_to_0_recorded = 0
throughput_list = []
result_list = self.PC_BANDWIDTH_LOG_PATTERN.findall(raw_data)
if not result_list:
# failed to find raw data by PC pattern, it might be DUT pattern
result_list = self.DUT_BANDWIDTH_LOG_PATTERN.findall(raw_data)
for result in result_list:
if int(result[1]) - int(result[0]) != 1:
# this could be summary, ignore this
continue
throughput_list.append(float(result[2]))
if float(result[2]) == 0 and rssi > self.ZERO_POINT_THRESHOLD \
and fall_to_0_recorded < 1:
# throughput fall to 0 error. we only record 1 records for one test
self.error_list.append('[Error][fall to 0][{}][att: {}][rssi: {}]: 0 throughput interval: {}-{}'
.format(ap_ssid, att, rssi, result[0], result[1]))
fall_to_0_recorded += 1
if len(throughput_list) > self.THROUGHPUT_QUALIFY_COUNT:
throughput = sum(throughput_list) / len(throughput_list)
else:
throughput = 0.0
if throughput == 0 and rssi > self.ZERO_THROUGHPUT_THRESHOLD:
self.error_list.append('[Error][Fatal][{}][att: {}][rssi: {}]: No throughput data found'
.format(ap_ssid, att, rssi))
self._save_result(throughput, ap_ssid, att, rssi, heap_size)
return throughput
def post_analysis(self):
"""
some rules need to be checked after we collected all test raw data:
1. throughput value 30% worse than the next point with lower RSSI
2. throughput value 30% worse than the next point with larger attenuate
"""
def analysis_bad_point(data, index_type):
for ap_ssid in data:
result_dict = data[ap_ssid]
index_list = list(result_dict.keys())
index_list.sort()
if index_type == 'att':
index_list.reverse()
for i, index_value in enumerate(index_list[1:]):
if index_value < self.BAD_POINT_RSSI_THRESHOLD or \
result_dict[index_list[i]] < self.BAD_POINT_MIN_THRESHOLD:
continue
_percentage = result_dict[index_value] / result_dict[index_list[i]]
if _percentage < 1 - self.BAD_POINT_PERCENTAGE_THRESHOLD:
self.error_list.append('[Error][Bad point][{}][{}: {}]: drop {:.02f}%'
.format(ap_ssid, index_type, index_value,
(1 - _percentage) * 100))
analysis_bad_point(self.throughput_by_rssi, 'rssi')
analysis_bad_point(self.throughput_by_att, 'att')
def draw_throughput_figure(self, path, ap_ssid, draw_type):
"""
:param path: folder to save figure. make sure the folder is already created.
:param ap_ssid: ap ssid string or a list of ap ssid string
:param draw_type: "att" or "rssi"
:return: file_name
"""
if draw_type == 'rssi':
type_name = 'RSSI'
data = self.throughput_by_rssi
range_list = self.RSSI_RANGE
elif draw_type == 'att':
type_name = 'Att'
data = self.throughput_by_att
range_list = self.ATT_RANGE
else:
raise AssertionError('draw type not supported')
if isinstance(ap_ssid, list):
file_name = 'ThroughputVs{}_{}_{}_{}.html'.format(type_name, self.proto, self.direction,
hash(ap_ssid)[:6])
else:
file_name = 'ThroughputVs{}_{}_{}_{}.html'.format(type_name, self.proto, self.direction, ap_ssid)
LineChart.draw_line_chart(os.path.join(path, file_name),
'Throughput Vs {} ({} {})'.format(type_name, self.proto, self.direction),
'{} (dbm)'.format(type_name),
'Throughput (Mbps)',
data, range_list)
return file_name
def draw_rssi_vs_att_figure(self, path, ap_ssid):
"""
:param path: folder to save figure. make sure the folder is already created.
:param ap_ssid: ap to use
:return: file_name
"""
if isinstance(ap_ssid, list):
file_name = 'AttVsRSSI_{}.html'.format(hash(ap_ssid)[:6])
else:
file_name = 'AttVsRSSI_{}.html'.format(ap_ssid)
LineChart.draw_line_chart(os.path.join(path, file_name),
'Att Vs RSSI',
'Att (dbm)',
'RSSI (dbm)',
self.att_rssi_map,
self.ATT_RANGE)
return file_name
def get_best_throughput(self):
""" get the best throughput during test """
best_for_aps = [max(self.throughput_by_att[ap_ssid].values())
for ap_ssid in self.throughput_by_att]
return max(best_for_aps)
def __str__(self):
"""
returns summary for this test:
1. test result (success or fail)
2. best performance for each AP
3. min free heap size during test
"""
if self.throughput_by_att:
ret = '[{}_{}][{}]: {}\r\n\r\n'.format(self.proto, self.direction, self.config_name,
'Fail' if self.error_list else 'Success')
ret += 'Performance for each AP:\r\n'
for ap_ssid in self.throughput_by_att:
ret += '[{}]: {:.02f} Mbps\r\n'.format(ap_ssid, max(self.throughput_by_att[ap_ssid].values()))
if self.heap_size != INVALID_HEAP_SIZE:
ret += 'Minimum heap size: {}'.format(self.heap_size)
else:
ret = ''
return ret
class IperfTestUtility(object):
""" iperf test implementation """
def __init__(self, dut, config_name, ap_ssid, ap_password,
pc_nic_ip, pc_iperf_log_file, test_result=None):
self.config_name = config_name
self.dut = dut
self.pc_iperf_log_file = pc_iperf_log_file
self.ap_ssid = ap_ssid
self.ap_password = ap_password
self.pc_nic_ip = pc_nic_ip
if test_result:
self.test_result = test_result
else:
self.test_result = {
'tcp_tx': TestResult('tcp', 'tx', config_name),
'tcp_rx': TestResult('tcp', 'rx', config_name),
'udp_tx': TestResult('udp', 'tx', config_name),
'udp_rx': TestResult('udp', 'rx', config_name),
}
def setup(self):
"""
setup iperf test:
1. kill current iperf process
2. reboot DUT (currently iperf is not very robust, need to reboot DUT)
3. scan to get AP RSSI
4. connect to AP
"""
try:
subprocess.check_output('sudo killall iperf 2>&1 > /dev/null', shell=True)
except subprocess.CalledProcessError:
pass
self.dut.write('restart')
self.dut.expect_any('iperf>', 'esp32>')
self.dut.write('scan {}'.format(self.ap_ssid))
for _ in range(SCAN_RETRY_COUNT):
try:
rssi = int(self.dut.expect(re.compile(r'\[{}]\[rssi=(-\d+)]'.format(self.ap_ssid)),
timeout=SCAN_TIMEOUT)[0])
break
except DUT.ExpectTimeout:
continue
else:
raise AssertionError('Failed to scan AP')
self.dut.write('sta {} {}'.format(self.ap_ssid, self.ap_password))
dut_ip = self.dut.expect(re.compile(r'sta ip: ([\d.]+), mask: ([\d.]+), gw: ([\d.]+)'))[0]
return dut_ip, rssi
def _save_test_result(self, test_case, raw_data, att, rssi, heap_size):
return self.test_result[test_case].add_result(raw_data, self.ap_ssid, att, rssi, heap_size)
def _test_once(self, proto, direction):
""" do measure once for one type """
# connect and scan to get RSSI
dut_ip, rssi = self.setup()
assert direction in ['rx', 'tx']
assert proto in ['tcp', 'udp']
# run iperf test
if direction == 'tx':
with open(PC_IPERF_TEMP_LOG_FILE, 'w') as f:
if proto == 'tcp':
process = subprocess.Popen(['iperf', '-s', '-B', self.pc_nic_ip,
'-t', str(TEST_TIME), '-i', '1', '-f', 'm'],
stdout=f, stderr=f)
self.dut.write('iperf -c {} -i 1 -t {}'.format(self.pc_nic_ip, TEST_TIME))
else:
process = subprocess.Popen(['iperf', '-s', '-u', '-B', self.pc_nic_ip,
'-t', str(TEST_TIME), '-i', '1', '-f', 'm'],
stdout=f, stderr=f)
self.dut.write('iperf -c {} -u -i 1 -t {}'.format(self.pc_nic_ip, TEST_TIME))
for _ in range(TEST_TIMEOUT):
if process.poll() is not None:
break
time.sleep(1)
else:
process.terminate()
with open(PC_IPERF_TEMP_LOG_FILE, 'r') as f:
pc_raw_data = server_raw_data = f.read()
else:
with open(PC_IPERF_TEMP_LOG_FILE, 'w') as f:
if proto == 'tcp':
self.dut.write('iperf -s -i 1 -t {}'.format(TEST_TIME))
# wait until DUT TCP server created
try:
self.dut.expect('iperf tcp server create successfully', timeout=1)
except DUT.ExpectTimeout:
# compatible with old iperf example binary
pass
process = subprocess.Popen(['iperf', '-c', dut_ip,
'-t', str(TEST_TIME), '-f', 'm'],
stdout=f, stderr=f)
else:
self.dut.write('iperf -s -u -i 1 -t {}'.format(TEST_TIME))
process = subprocess.Popen(['iperf', '-c', dut_ip, '-u', '-b', '100M',
'-t', str(TEST_TIME), '-f', 'm'],
stdout=f, stderr=f)
for _ in range(TEST_TIMEOUT):
if process.poll() is not None:
break
time.sleep(1)
else:
process.terminate()
server_raw_data = self.dut.read()
with open(PC_IPERF_TEMP_LOG_FILE, 'r') as f:
pc_raw_data = f.read()
# save PC iperf logs to console
with open(self.pc_iperf_log_file, 'a+') as f:
f.write('## [{}] `{}`\r\n##### {}'
.format(self.config_name,
'{}_{}'.format(proto, direction),
time.strftime('%m-%d %H:%M:%S', time.localtime(time.time()))))
f.write('\r\n```\r\n\r\n' + pc_raw_data + '\r\n```\r\n')
self.dut.write('heap')
heap_size = self.dut.expect(re.compile(r'min heap size: (\d+)\D'))[0]
# return server raw data (for parsing test results) and RSSI
return server_raw_data, rssi, heap_size
def run_test(self, proto, direction, atten_val):
"""
run test for one type, with specified atten_value and save the test result
:param proto: tcp or udp
:param direction: tx or rx
:param atten_val: attenuate value
"""
rssi = FAILED_TO_SCAN_RSSI
heap_size = INVALID_HEAP_SIZE
try:
server_raw_data, rssi, heap_size = self._test_once(proto, direction)
throughput = self._save_test_result('{}_{}'.format(proto, direction),
server_raw_data, atten_val,
rssi, heap_size)
Utility.console_log('[{}][{}_{}][{}][{}]: {:.02f}'
.format(self.config_name, proto, direction, rssi, self.ap_ssid, throughput))
except Exception as e:
self._save_test_result('{}_{}'.format(proto, direction), '', atten_val, rssi, heap_size)
Utility.console_log('Failed during test: {}'.format(e))
def run_all_cases(self, atten_val):
"""
run test for all types (udp_tx, udp_rx, tcp_tx, tcp_rx).
:param atten_val: attenuate value
"""
self.run_test('tcp', 'tx', atten_val)
self.run_test('tcp', 'rx', atten_val)
self.run_test('udp', 'tx', atten_val)
self.run_test('udp', 'rx', atten_val)
def wait_ap_power_on(self):
"""
AP need to take sometime to power on. It changes for different APs.
This method will scan to check if the AP powers on.
:return: True or False
"""
self.dut.write('restart')
self.dut.expect_any('iperf>', 'esp32>')
for _ in range(WAIT_AP_POWER_ON_TIMEOUT // SCAN_TIMEOUT):
try:
self.dut.write('scan {}'.format(self.ap_ssid))
self.dut.expect(re.compile(r'\[{}]\[rssi=(-\d+)]'.format(self.ap_ssid)),
timeout=SCAN_TIMEOUT)
ret = True
break
except DUT.ExpectTimeout:
pass
else:
ret = False
return ret
-class IperfTestUtilitySoftap(IperfTestUtility):
+class IperfTestUtilitySoftap(IperfUtility.IperfTestUtility):
     """ iperf test implementation """
 
     def __init__(self, dut, softap_dut, config_name, test_result=None):
-        super(IperfTestUtility, self).__init__(dut, config_name, 'softap', '1234567890', None, None, test_result=None)
+        IperfUtility.IperfTestUtility.__init__(self, dut, config_name, 'softap', '1234567890', None, None, test_result)
         self.softap_dut = softap_dut
         self.softap_ip = '192.168.4.1'
@@ -524,7 +129,7 @@ class IperfTestUtilitySoftap(IperfTestUtility):
 
         return server_raw_data, rssi, heap_size
 
-@ttfw_idf.idf_example_test(env_tag='Example_ShieldBox_Basic', target=['ESP32', 'ESP32S2', 'ESP32C3'], category='stress')
+@ttfw_idf.idf_example_test(env_tag='Example_ShieldBox_Basic', target=['ESP32', 'ESP32S2', 'ESP32C3', 'ESP32S3'], category='stress')
 def test_wifi_throughput_with_different_configs(env, extra_data):
     """
     steps: |
@@ -558,14 +163,14 @@ def test_wifi_throughput_with_different_configs(env, extra_data):
 
         # 3. run test for each required att value
         test_result[config_name] = {
-            'tcp_tx': TestResult('tcp', 'tx', config_name),
-            'tcp_rx': TestResult('tcp', 'rx', config_name),
-            'udp_tx': TestResult('udp', 'tx', config_name),
-            'udp_rx': TestResult('udp', 'rx', config_name),
+            'tcp_tx': IperfUtility.TestResult('tcp', 'tx', config_name),
+            'tcp_rx': IperfUtility.TestResult('tcp', 'rx', config_name),
+            'udp_tx': IperfUtility.TestResult('udp', 'tx', config_name),
+            'udp_rx': IperfUtility.TestResult('udp', 'rx', config_name),
         }
 
-        test_utility = IperfTestUtility(dut, config_name, ap_info['ssid'],
-                                        ap_info['password'], pc_nic_ip, pc_iperf_log_file, test_result[config_name])
+        test_utility = IperfUtility.IperfTestUtility(dut, config_name, ap_info['ssid'], ap_info['password'], pc_nic_ip,
+                                                     pc_iperf_log_file, test_result[config_name])
 
         for _ in range(RETRY_COUNT_FOR_BEST_PERFORMANCE):
             test_utility.run_all_cases(0)
@@ -579,12 +184,13 @@ def test_wifi_throughput_with_different_configs(env, extra_data):
     env.close_dut('iperf')
 
     # 5. generate report
-    report = TestReport.ThroughputForConfigsReport(os.path.join(env.log_path, 'ThroughputForConfigsReport'),
+    report = TestReport.ThroughputForConfigsReport(os.path.join(env.log_path, 'Performance',
+                                                                'ThroughputForConfigsReport'),
                                                    ap_info['ssid'], test_result, sdkconfig_files)
     report.generate_report()
 
-@ttfw_idf.idf_example_test(env_tag='Example_ShieldBox', target=['ESP32', 'ESP32S2', 'ESP32C3'], category='stress')
+@ttfw_idf.idf_example_test(env_tag='Example_ShieldBox', target=['ESP32', 'ESP32S2', 'ESP32C3', 'ESP32S3'], category='stress')
 def test_wifi_throughput_vs_rssi(env, extra_data):
     """
     steps: |
@@ -600,10 +206,10 @@ def test_wifi_throughput_vs_rssi(env, extra_data):
     pc_iperf_log_file = os.path.join(env.log_path, 'pc_iperf_log.md')
 
     test_result = {
-        'tcp_tx': TestResult('tcp', 'tx', BEST_PERFORMANCE_CONFIG),
-        'tcp_rx': TestResult('tcp', 'rx', BEST_PERFORMANCE_CONFIG),
-        'udp_tx': TestResult('udp', 'tx', BEST_PERFORMANCE_CONFIG),
-        'udp_rx': TestResult('udp', 'rx', BEST_PERFORMANCE_CONFIG),
+        'tcp_tx': IperfUtility.TestResult('tcp', 'tx', BEST_PERFORMANCE_CONFIG),
+        'tcp_rx': IperfUtility.TestResult('tcp', 'rx', BEST_PERFORMANCE_CONFIG),
+        'udp_tx': IperfUtility.TestResult('udp', 'tx', BEST_PERFORMANCE_CONFIG),
+        'udp_rx': IperfUtility.TestResult('udp', 'rx', BEST_PERFORMANCE_CONFIG),
     }
 
     # 1. get DUT and download
@@ -613,8 +219,8 @@ def test_wifi_throughput_vs_rssi(env, extra_data):
 
     # 2. run test for each required att value
     for ap_info in ap_list:
-        test_utility = IperfTestUtility(dut, BEST_PERFORMANCE_CONFIG, ap_info['ssid'], ap_info['password'],
-                                        pc_nic_ip, pc_iperf_log_file, test_result)
+        test_utility = IperfUtility.IperfTestUtility(dut, BEST_PERFORMANCE_CONFIG, ap_info['ssid'],
+                                                     ap_info['password'], pc_nic_ip, pc_iperf_log_file, test_result)
 
         PowerControl.Control.control_rest(apc_ip, ap_info['outlet'], 'OFF')
         PowerControl.Control.control(apc_ip, {ap_info['outlet']: 'ON'})
@@ -627,19 +233,22 @@ def test_wifi_throughput_vs_rssi(env, extra_data):
 
         for atten_val in ATTEN_VALUE_LIST:
             assert Attenuator.set_att(att_port, atten_val) is True
-            test_utility.run_all_cases(atten_val)
+            try:
+                test_utility.run_all_cases(atten_val)
+            except AssertionError:
+                break
 
     # 3. check test results
     env.close_dut('iperf')
 
     # 4. generate report
-    report = TestReport.ThroughputVsRssiReport(os.path.join(env.log_path, 'STAThroughputVsRssiReport'),
+    report = TestReport.ThroughputVsRssiReport(os.path.join(env.log_path, 'Performance', 'STAThroughputVsRssiReport'),
                                                test_result)
     report.generate_report()
 
 @ttfw_idf.idf_example_test(env_tag='Example_ShieldBox_Basic',
-                           target=['ESP32', 'ESP32S2', 'ESP32C3'], ci_target=['ESP32'])
+                           target=['ESP32', 'ESP32S2', 'ESP32C3', 'ESP32S3'], ci_target=['ESP32'])
 def test_wifi_throughput_basic(env, extra_data):
     """
     steps: |
@@ -660,14 +269,14 @@ def test_wifi_throughput_basic(env, extra_data):
 
     # 2. preparing
     test_result = {
-        'tcp_tx': TestResult('tcp', 'tx', BEST_PERFORMANCE_CONFIG),
-        'tcp_rx': TestResult('tcp', 'rx', BEST_PERFORMANCE_CONFIG),
-        'udp_tx': TestResult('udp', 'tx', BEST_PERFORMANCE_CONFIG),
-        'udp_rx': TestResult('udp', 'rx', BEST_PERFORMANCE_CONFIG),
+        'tcp_tx': IperfUtility.TestResult('tcp', 'tx', BEST_PERFORMANCE_CONFIG),
+        'tcp_rx': IperfUtility.TestResult('tcp', 'rx', BEST_PERFORMANCE_CONFIG),
+        'udp_tx': IperfUtility.TestResult('udp', 'tx', BEST_PERFORMANCE_CONFIG),
+        'udp_rx': IperfUtility.TestResult('udp', 'rx', BEST_PERFORMANCE_CONFIG),
     }
 
-    test_utility = IperfTestUtility(dut, BEST_PERFORMANCE_CONFIG, ap_info['ssid'],
-                                    ap_info['password'], pc_nic_ip, pc_iperf_log_file, test_result)
+    test_utility = IperfUtility.IperfTestUtility(dut, BEST_PERFORMANCE_CONFIG, ap_info['ssid'], ap_info['password'],
+                                                 pc_nic_ip, pc_iperf_log_file, test_result)
 
     # 3. run test for TCP Tx, Rx and UDP Tx, Rx
     for _ in range(RETRY_COUNT_FOR_BEST_PERFORMANCE):
@@ -691,7 +300,7 @@ def test_wifi_throughput_basic(env, extra_data):
 
     env.close_dut('iperf')
 
-@ttfw_idf.idf_example_test(env_tag='Example_ShieldBox2', target=['ESP32', 'ESP32S2', 'ESP32C3'], category='stress')
+@ttfw_idf.idf_example_test(env_tag='Example_ShieldBox2', target=['ESP32', 'ESP32S2', 'ESP32C3', 'ESP32S3'], category='stress')
 def test_softap_throughput_vs_rssi(env, extra_data):
     """
     steps: |
@@ -703,10 +312,10 @@ def test_softap_throughput_vs_rssi(env, extra_data):
     att_port = env.get_variable('attenuator_port')
 
     test_result = {
-        'tcp_tx': TestResult('tcp', 'tx', BEST_PERFORMANCE_CONFIG),
-        'tcp_rx': TestResult('tcp', 'rx', BEST_PERFORMANCE_CONFIG),
-        'udp_tx': TestResult('udp', 'tx', BEST_PERFORMANCE_CONFIG),
-        'udp_rx': TestResult('udp', 'rx', BEST_PERFORMANCE_CONFIG),
+        'tcp_tx': IperfUtility.TestResult('tcp', 'tx', BEST_PERFORMANCE_CONFIG),
+        'tcp_rx': IperfUtility.TestResult('tcp', 'rx', BEST_PERFORMANCE_CONFIG),
+        'udp_tx': IperfUtility.TestResult('udp', 'tx', BEST_PERFORMANCE_CONFIG),
+        'udp_rx': IperfUtility.TestResult('udp', 'rx', BEST_PERFORMANCE_CONFIG),
     }
 
     # 1. get DUT and download
@@ -725,14 +334,17 @@ def test_softap_throughput_vs_rssi(env, extra_data):
 
     for atten_val in ATTEN_VALUE_LIST:
         assert Attenuator.set_att(att_port, atten_val) is True
-        test_utility.run_all_cases(atten_val)
+        try:
+            test_utility.run_all_cases(atten_val)
+        except AssertionError:
+            break
 
     env.close_dut('softap_iperf')
     env.close_dut('sta_iperf')
 
     # 3. generate report
-    report = TestReport.ThroughputVsRssiReport(os.path.join(env.log_path, 'SoftAPThroughputVsRssiReport'),
-                                               test_result)
+    report = TestReport.ThroughputVsRssiReport(os.path.join(env.log_path, 'Performance',
+                                                            'SoftAPThroughputVsRssiReport'), test_result)
     report.generate_report()
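Note the new control flow in both attenuation sweeps: run_all_cases in the shared IperfUtility module (added below) raises AssertionError once scanning has failed more than 10 times, and the sweep then stops instead of running the remaining attenuation values. A stand-in sketch of that pattern:

# Stand-in sketch of the early-exit behaviour; names and numbers are illustrative.
ATTEN_VALUE_LIST = range(0, 60, 2)

def run_all_cases(atten_val):  # stand-in for IperfUtility.IperfTestUtility.run_all_cases
    if atten_val >= 40:  # pretend the AP is no longer reachable at high attenuation
        raise AssertionError('Fail to scan AP for more than 10 times.')

for atten_val in ATTEN_VALUE_LIST:
    try:
        run_all_cases(atten_val)
    except AssertionError:
        break  # abandon the sweep once the AP is out of reach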


@@ -0,0 +1,429 @@
import os
import re
import subprocess
import time
from builtins import object, range, str
from idf_iperf_test_util import LineChart
from tiny_test_fw import DUT, Utility
try:
from typing import Any, Tuple
except ImportError:
# Only used for type annotations
pass
# configurations
TEST_TIME = TEST_TIMEOUT = 60
WAIT_AP_POWER_ON_TIMEOUT = 90
SCAN_TIMEOUT = 3
SCAN_RETRY_COUNT = 3
# constants
FAILED_TO_SCAN_RSSI = -97
INVALID_HEAP_SIZE = 0xFFFFFFFF
PC_IPERF_TEMP_LOG_FILE = '.tmp_iperf.log'


class TestResult(object):
""" record, analysis test result and convert data to output format """
PC_BANDWIDTH_LOG_PATTERN = re.compile(r'(\d+).0\s*-\s*(\d+).0\s+sec\s+[\d.]+\s+MBytes\s+([\d.]+)\s+Mbits/sec')
DUT_BANDWIDTH_LOG_PATTERN = re.compile(r'(\d+)-\s+(\d+)\s+sec\s+([\d.]+)\s+Mbits/sec')
ZERO_POINT_THRESHOLD = -88 # RSSI, dbm
ZERO_THROUGHPUT_THRESHOLD = -92 # RSSI, dbm
BAD_POINT_RSSI_THRESHOLD = -75 # RSSI, dbm
BAD_POINT_MIN_THRESHOLD = 10 # Mbps
BAD_POINT_PERCENTAGE_THRESHOLD = 0.3
# we need at least 1/2 valid points to qualify the test result
THROUGHPUT_QUALIFY_COUNT = TEST_TIME // 2
RSSI_RANGE = [-x for x in range(10, 100)]
ATT_RANGE = [x for x in range(0, 64)]
def __init__(self, proto, direction, config_name): # type: (str, str, str) -> None
self.proto = proto
self.direction = direction
self.config_name = config_name
self.throughput_by_rssi = dict() # type: dict
self.throughput_by_att = dict() # type: dict
self.att_rssi_map = dict() # type: dict
self.heap_size = INVALID_HEAP_SIZE
self.error_list = [] # type: list[str]
def _save_result(self, throughput, ap_ssid, att, rssi, heap_size): # type: (float, str, int, int, str) -> None
"""
save the test results:
* record the better throughput if att/rssi is the same.
* record the min heap size.
"""
if ap_ssid not in self.att_rssi_map:
# for new ap, create empty dict()
self.throughput_by_att[ap_ssid] = dict()
self.throughput_by_rssi[ap_ssid] = dict()
self.att_rssi_map[ap_ssid] = dict()
self.att_rssi_map[ap_ssid][att] = rssi
def record_throughput(database, key_value): # type: (dict, int) -> None
try:
# we save the larger value for same att
if throughput > database[ap_ssid][key_value]:
database[ap_ssid][key_value] = throughput
except KeyError:
database[ap_ssid][key_value] = throughput
record_throughput(self.throughput_by_att, att)
record_throughput(self.throughput_by_rssi, rssi)
if int(heap_size) < self.heap_size:
self.heap_size = int(heap_size)
def add_result(self, raw_data, ap_ssid, att, rssi, heap_size): # type: (str, str, int, int, str) -> float
"""
add result for one test
:param raw_data: iperf raw data
:param ap_ssid: ap ssid that tested
:param att: attenuate value
:param rssi: AP RSSI
:param heap_size: min heap size during test
:return: throughput
"""
fall_to_0_recorded = 0
throughput_list = []
result_list = self.PC_BANDWIDTH_LOG_PATTERN.findall(raw_data)
if not result_list:
# failed to find raw data by PC pattern, it might be DUT pattern
result_list = self.DUT_BANDWIDTH_LOG_PATTERN.findall(raw_data)
for result in result_list:
if int(result[1]) - int(result[0]) != 1:
# this could be summary, ignore this
continue
throughput_list.append(float(result[2]))
if float(result[2]) == 0 and rssi > self.ZERO_POINT_THRESHOLD \
and fall_to_0_recorded < 1:
# throughput fell to 0; we only record one such error per test
self.error_list.append('[Error][fall to 0][{}][att: {}][rssi: {}]: 0 throughput interval: {}-{}'
.format(ap_ssid, att, rssi, result[0], result[1]))
fall_to_0_recorded += 1
if len(throughput_list) > self.THROUGHPUT_QUALIFY_COUNT:
throughput = sum(throughput_list) / len(throughput_list)
else:
throughput = 0.0
if throughput == 0 and rssi > self.ZERO_THROUGHPUT_THRESHOLD:
self.error_list.append('[Error][Fatal][{}][att: {}][rssi: {}]: No throughput data found'
.format(ap_ssid, att, rssi))
self._save_result(throughput, ap_ssid, att, rssi, heap_size)
return throughput
def post_analysis(self): # type: () -> None
"""
some rules need to be checked after we collected all test raw data:
1. throughput value 30% worse than the next point with lower RSSI
2. throughput value 30% worse than the next point with larger attenuate
"""
def analysis_bad_point(data, index_type): # type: (dict, str) -> None
for ap_ssid in data:
result_dict = data[ap_ssid]
index_list = list(result_dict.keys())
index_list.sort()
if index_type == 'att':
index_list.reverse()
for i, index_value in enumerate(index_list[1:]):
if index_value < self.BAD_POINT_RSSI_THRESHOLD or \
result_dict[index_list[i]] < self.BAD_POINT_MIN_THRESHOLD:
continue
_percentage = result_dict[index_value] / result_dict[index_list[i]]
if _percentage < 1 - self.BAD_POINT_PERCENTAGE_THRESHOLD:
self.error_list.append('[Error][Bad point][{}][{}: {}]: drop {:.02f}%'
.format(ap_ssid, index_type, index_value,
(1 - _percentage) * 100))
analysis_bad_point(self.throughput_by_rssi, 'rssi')
analysis_bad_point(self.throughput_by_att, 'att')
def draw_throughput_figure(self, path, ap_ssid, draw_type): # type: (str, str, str) -> str
"""
:param path: folder to save figure. make sure the folder is already created.
:param ap_ssid: ap ssid string or a list of ap ssid string
:param draw_type: "att" or "rssi"
:return: file_name
"""
if draw_type == 'rssi':
type_name = 'RSSI'
data = self.throughput_by_rssi
range_list = self.RSSI_RANGE
elif draw_type == 'att':
type_name = 'Att'
data = self.throughput_by_att
range_list = self.ATT_RANGE
else:
raise AssertionError('draw type not supported')
if isinstance(ap_ssid, list):
file_name = 'ThroughputVs{}_{}_{}.html'.format(type_name, self.proto, self.direction)
else:
file_name = 'ThroughputVs{}_{}_{}.html'.format(type_name, self.proto, self.direction)
LineChart.draw_line_chart(os.path.join(path, file_name),
'Throughput Vs {} ({} {})'.format(type_name, self.proto, self.direction),
'{} (dbm)'.format(type_name),
'Throughput (Mbps)',
data, range_list)
return file_name
def draw_rssi_vs_att_figure(self, path, ap_ssid): # type: (str, str) -> str
"""
:param path: folder to save figure. make sure the folder is already created.
:param ap_ssid: ap to use
:return: file_name
"""
if isinstance(ap_ssid, list):
file_name = 'AttVsRSSI.html'
else:
file_name = 'AttVsRSSI.html'
LineChart.draw_line_chart(os.path.join(path, file_name),
'Att Vs RSSI',
'Att (dbm)',
'RSSI (dbm)',
self.att_rssi_map,
self.ATT_RANGE)
return file_name
def get_best_throughput(self): # type: () -> Any
""" get the best throughput during test """
best_for_aps = [max(self.throughput_by_att[ap_ssid].values())
for ap_ssid in self.throughput_by_att]
return max(best_for_aps)
def __str__(self): # type: () -> str
"""
returns summary for this test:
1. test result (success or fail)
2. best performance for each AP
3. min free heap size during test
"""
if self.throughput_by_att:
ret = '[{}_{}][{}]: {}\r\n\r\n'.format(self.proto, self.direction, self.config_name,
'Fail' if self.error_list else 'Success')
ret += 'Performance for each AP:\r\n'
for ap_ssid in self.throughput_by_att:
ret += '[{}]: {:.02f} Mbps\r\n'.format(ap_ssid, max(self.throughput_by_att[ap_ssid].values()))
if self.heap_size != INVALID_HEAP_SIZE:
ret += 'Minimum heap size: {}'.format(self.heap_size)
else:
ret = ''
return ret


class IperfTestUtility(object):
""" iperf test implementation """
def __init__(self, dut, config_name, ap_ssid, ap_password,
pc_nic_ip, pc_iperf_log_file, test_result=None): # type: (str, str, str, str, str, str, Any) -> None
self.config_name = config_name
self.dut = dut
self.pc_iperf_log_file = pc_iperf_log_file
self.ap_ssid = ap_ssid
self.ap_password = ap_password
self.pc_nic_ip = pc_nic_ip
self.fail_to_scan = 0
self.lowest_rssi_scanned = 0
if test_result:
self.test_result = test_result
else:
self.test_result = {
'tcp_tx': TestResult('tcp', 'tx', config_name),
'tcp_rx': TestResult('tcp', 'rx', config_name),
'udp_tx': TestResult('udp', 'tx', config_name),
'udp_rx': TestResult('udp', 'rx', config_name),
}
def setup(self): # type: (Any) -> Tuple[str,int]
"""
setup iperf test:
1. kill current iperf process
2. reboot DUT (currently iperf is not very robust, need to reboot DUT)
3. scan to get AP RSSI
4. connect to AP
"""
try:
subprocess.check_output('sudo killall iperf 2>&1 > /dev/null', shell=True)
except subprocess.CalledProcessError:
pass
time.sleep(5)
self.dut.write('restart')
self.dut.expect_any('iperf>', 'esp32>')
self.dut.write('scan {}'.format(self.ap_ssid))
for _ in range(SCAN_RETRY_COUNT):
try:
rssi = int(self.dut.expect(re.compile(r'\[{}]\[rssi=(-\d+)]'.format(self.ap_ssid)),
timeout=SCAN_TIMEOUT)[0])
break
except DUT.ExpectTimeout:
continue
else:
raise AssertionError('Failed to scan AP')
self.dut.write('sta {} {}'.format(self.ap_ssid, self.ap_password))
dut_ip = self.dut.expect(re.compile(r'sta ip: ([\d.]+), mask: ([\d.]+), gw: ([\d.]+)'))[0]
return dut_ip, rssi
def _save_test_result(self, test_case, raw_data, att, rssi, heap_size): # type: (str, str, int, int, int) -> Any
return self.test_result[test_case].add_result(raw_data, self.ap_ssid, att, rssi, heap_size)
def _test_once(self, proto, direction): # type: (Any, str, str) -> Tuple[str, int, int]
""" do measure once for one type """
# connect and scan to get RSSI
dut_ip, rssi = self.setup()
assert direction in ['rx', 'tx']
assert proto in ['tcp', 'udp']
# run iperf test
if direction == 'tx':
with open(PC_IPERF_TEMP_LOG_FILE, 'w') as f:
if proto == 'tcp':
process = subprocess.Popen(['iperf', '-s', '-B', self.pc_nic_ip,
'-t', str(TEST_TIME), '-i', '1', '-f', 'm'],
stdout=f, stderr=f)
self.dut.write('iperf -c {} -i 1 -t {}'.format(self.pc_nic_ip, TEST_TIME))
else:
process = subprocess.Popen(['iperf', '-s', '-u', '-B', self.pc_nic_ip,
'-t', str(TEST_TIME), '-i', '1', '-f', 'm'],
stdout=f, stderr=f)
self.dut.write('iperf -c {} -u -i 1 -t {}'.format(self.pc_nic_ip, TEST_TIME))
for _ in range(TEST_TIMEOUT):
if process.poll() is not None:
break
time.sleep(1)
else:
process.terminate()
with open(PC_IPERF_TEMP_LOG_FILE, 'r') as f:
pc_raw_data = server_raw_data = f.read()
else:
with open(PC_IPERF_TEMP_LOG_FILE, 'w') as f:
if proto == 'tcp':
self.dut.write('iperf -s -i 1 -t {}'.format(TEST_TIME))
# wait until DUT TCP server created
try:
self.dut.expect('iperf tcp server create successfully', timeout=1)
except DUT.ExpectTimeout:
# compatible with old iperf example binary
Utility.console_log('create iperf tcp server fail')
process = subprocess.Popen(['iperf', '-c', dut_ip,
'-t', str(TEST_TIME), '-f', 'm'],
stdout=f, stderr=f)
else:
self.dut.write('iperf -s -u -i 1 -t {}'.format(TEST_TIME))
# wait until DUT UDP server created
try:
self.dut.expect('iperf udp server create successfully', timeout=1)
except DUT.ExpectTimeout:
# compatible with old iperf example binary
Utility.console_log('create iperf udp server fail')
process = subprocess.Popen(['iperf', '-c', dut_ip, '-u', '-b', '100M',
'-t', str(TEST_TIME), '-f', 'm'],
stdout=f, stderr=f)
for _ in range(TEST_TIMEOUT):
if process.poll() is not None:
break
time.sleep(1)
else:
process.terminate()
server_raw_data = self.dut.read()
with open(PC_IPERF_TEMP_LOG_FILE, 'r') as f:
pc_raw_data = f.read()
# save PC iperf logs to console
with open(self.pc_iperf_log_file, 'a+') as f:
f.write('## [{}] `{}`\r\n##### {}'
.format(self.config_name,
'{}_{}'.format(proto, direction),
time.strftime('%m-%d %H:%M:%S', time.localtime(time.time()))))
f.write('\r\n```\r\n\r\n' + pc_raw_data + '\r\n```\r\n')
self.dut.write('heap')
heap_size = self.dut.expect(re.compile(r'min heap size: (\d+)\D'))[0]
# return server raw data (for parsing test results) and RSSI
return server_raw_data, rssi, heap_size
def run_test(self, proto, direction, atten_val): # type: (str, str, int) -> None
"""
run test for one type, with specified atten_value and save the test result
:param proto: tcp or udp
:param direction: tx or rx
:param atten_val: attenuate value
"""
rssi = FAILED_TO_SCAN_RSSI
heap_size = INVALID_HEAP_SIZE
try:
server_raw_data, rssi, heap_size = self._test_once(proto, direction)
throughput = self._save_test_result('{}_{}'.format(proto, direction),
server_raw_data, atten_val,
rssi, heap_size)
Utility.console_log('[{}][{}_{}][{}][{}]: {:.02f}'
.format(self.config_name, proto, direction, rssi, self.ap_ssid, throughput))
self.lowest_rssi_scanned = min(self.lowest_rssi_scanned, rssi)
except (ValueError, IndexError):
self._save_test_result('{}_{}'.format(proto, direction), '', atten_val, rssi, heap_size)
Utility.console_log('Fail to get throughput results.')
except AssertionError:
self.fail_to_scan += 1
Utility.console_log('Fail to scan AP.')
def run_all_cases(self, atten_val): # type: (int) -> None
"""
run test for all types (udp_tx, udp_rx, tcp_tx, tcp_rx).
:param atten_val: attenuate value
"""
self.run_test('tcp', 'tx', atten_val)
self.run_test('tcp', 'rx', atten_val)
self.run_test('udp', 'tx', atten_val)
self.run_test('udp', 'rx', atten_val)
if self.fail_to_scan > 10:
Utility.console_log(
'Fail to scan AP for more than 10 times. Lowest RSSI scanned is {}'.format(self.lowest_rssi_scanned))
raise AssertionError
def wait_ap_power_on(self): # type: (Any) -> bool
"""
AP need to take sometime to power on. It changes for different APs.
This method will scan to check if the AP powers on.
:return: True or False
"""
self.dut.write('restart')
self.dut.expect_any('iperf>', 'esp32>')
for _ in range(WAIT_AP_POWER_ON_TIMEOUT // SCAN_TIMEOUT):
try:
self.dut.write('scan {}'.format(self.ap_ssid))
self.dut.expect(re.compile(r'\[{}]\[rssi=(-\d+)]'.format(self.ap_ssid)),
timeout=SCAN_TIMEOUT)
ret = True
break
except DUT.ExpectTimeout:
pass
else:
ret = False
return ret
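For reference, PC_BANDWIDTH_LOG_PATTERN above matches the per-second interval lines of standard iperf output, and add_result averages them only when more than THROUGHPUT_QUALIFY_COUNT valid samples were seen. A standalone sketch of that parsing step (the iperf output lines below are fabricated samples):

# Standalone illustration of the parsing in TestResult.add_result;
# the iperf output lines are fabricated samples.
import re

PC_BANDWIDTH_LOG_PATTERN = re.compile(r'(\d+).0\s*-\s*(\d+).0\s+sec\s+[\d.]+\s+MBytes\s+([\d.]+)\s+Mbits/sec')
TEST_TIME = 60
THROUGHPUT_QUALIFY_COUNT = TEST_TIME // 2  # need more than 30 one-second samples

raw_data = '\n'.join('{}.0-{}.0 sec  8.75 MBytes  73.4 Mbits/sec'.format(i, i + 1) for i in range(40))
samples = [float(bw) for start, end, bw in PC_BANDWIDTH_LOG_PATTERN.findall(raw_data)
           if int(end) - int(start) == 1]  # summary lines span more than one second and are skipped
throughput = sum(samples) / len(samples) if len(samples) > THROUGHPUT_QUALIFY_COUNT else 0.0
print('{:.02f} Mbps from {} samples'.format(throughput, len(samples)))  # 73.40 Mbps from 40 samples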


@@ -0,0 +1 @@
pyecharts
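This new requirements entry pulls in pyecharts, which the idf_iperf_test_util.LineChart module uses to render the throughput HTML charts referenced above. A minimal illustration of the kind of chart pyecharts produces (illustrative library usage, not the actual LineChart code):

# Minimal pyecharts example (illustrative, not LineChart's actual code):
# render a throughput-vs-attenuation line chart to an HTML file.
from pyecharts import options as opts
from pyecharts.charts import Line

att_values = list(range(0, 60, 2))
throughput = [70 - 0.8 * att for att in att_values]  # fabricated data

chart = (
    Line()
    .add_xaxis([str(att) for att in att_values])
    .add_yaxis('tcp_tx', throughput)
    .set_global_opts(title_opts=opts.TitleOpts(title='Throughput Vs Att'))
)
chart.render('ThroughputVsAtt.html')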


@@ -31,14 +31,8 @@ Config file format is yaml. it's a set of key-value pair. The following is an ex
 It will first define the env tag for each environment, then add its key-value pairs.
 This will prevent test cases from getting configs from other env when there're configs for multiple env in one file.
 """
-import logging
-
 import yaml
-
-try:
-    from yaml import CLoader as Loader
-except ImportError:
-    from yaml import Loader as Loader
+from yaml import Loader as Loader
 
 
 class Config(object):
@@ -59,11 +53,8 @@ class Config(object):
         try:
             with open(config_file) as f:
                 configs = yaml.load(f, Loader=Loader)[env_name]
-        except (OSError, TypeError, IOError):
+        except (OSError, TypeError, IOError, KeyError):
             configs = dict()
-        except KeyError:
-            logging.error('No config env "{}" in config file "{}"'.format(env_name, config_file))
-            raise
         return configs
 
     def get_variable(self, variable_name):
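Per the docstring above, an env config file groups key-value pairs under an env tag. A hedged sketch of loading such a file for the new Example_Ethernet environment (the tag and key are assumptions inferred from the tests, e.g. env.get_pc_nic_info('pc_nic', ...), not values from this commit):

# Illustrative only: env tag and keys are assumptions inferred from the tests.
import yaml
from yaml import Loader as Loader

ENV_CONFIG_YML = '''
Example_Ethernet:
  pc_nic: eth0
'''

configs = yaml.load(ENV_CONFIG_YML, Loader=Loader)['Example_Ethernet']
print(configs['pc_nic'])  # -> eth0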


@@ -1,3 +1,4 @@
 -r ../tiny_test_fw/requirements.txt
 pexpect
 python-gitlab
+pygdbmi<=0.9.0.2