import os import re import socket import subprocess import time from shutil import copyfile from threading import Event, Thread import ttfw_idf from tiny_test_fw import DUT, Utility stop_sock_listener = Event() stop_io_listener = Event() sock = None client_address = None manual_test = False def io_listener(dut1): global sock global client_address data = b'' while not stop_io_listener.is_set(): try: data = dut1.expect(re.compile(r'PacketOut:\[([a-fA-F0-9]+)\]'), timeout=5) except DUT.ExpectTimeout: continue if data != () and data[0] != b'': packet_data = data[0] print('Packet_data>{}<'.format(packet_data)) response = bytearray.fromhex(packet_data.decode()) print('Sending to socket:') packet = ' '.join(format(x, '02x') for x in bytearray(response)) print('Packet>{}<'.format(packet)) if client_address is not None: sock.sendto(response, ('127.0.0.1', 7777)) def sock_listener(dut1): global sock global client_address sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.settimeout(5) server_address = '0.0.0.0' server_port = 7771 server = (server_address, server_port) sock.bind(server) try: while not stop_sock_listener.is_set(): try: payload, client_address = sock.recvfrom(1024) packet = ' '.join(format(x, '02x') for x in bytearray(payload)) print('Received from address {}, data {}'.format(client_address, packet)) dut1.write(str.encode(packet)) except socket.timeout: pass finally: sock.close() sock = None @ttfw_idf.idf_example_test(env_tag='Example_WIFI') def lwip_test_suite(env, extra_data): global stop_io_listener global stop_sock_listener """ steps: | 1. Rebuilds test suite with esp32_netsuite.ttcn 2. Starts listeners on stdout and socket 3. Execute ttcn3 test suite 4. Collect result from ttcn3 """ dut1 = env.get_dut('net_suite', 'examples/system/network_tests', dut_class=ttfw_idf.ESP32DUT) # check and log bin size binary_file = os.path.join(dut1.app.binary_path, 'net_suite.bin') bin_size = os.path.getsize(binary_file) ttfw_idf.log_performance('net_suite', '{}KB'.format(bin_size // 1024)) ttfw_idf.check_performance('net_suite', bin_size // 1024, dut1.TARGET) dut1.start_app() thread1 = Thread(target=sock_listener, args=(dut1, )) thread2 = Thread(target=io_listener, args=(dut1, )) if not manual_test: # Variables refering to esp32 ttcn test suite TTCN_SRC = 'esp32_netsuite.ttcn' TTCN_CFG = 'esp32_netsuite.cfg' # System Paths netsuite_path = os.getenv('NETSUITE_PATH') netsuite_src_path = os.path.join(netsuite_path, 'src') test_dir = os.path.dirname(os.path.realpath(__file__)) # Building the suite print('Rebuilding the test suite') print('-------------------------') # copy esp32 specific files to ttcn net-suite dir copyfile(os.path.join(test_dir, TTCN_SRC), os.path.join(netsuite_src_path, TTCN_SRC)) copyfile(os.path.join(test_dir, TTCN_CFG), os.path.join(netsuite_src_path, TTCN_CFG)) proc = subprocess.Popen(['bash', '-c', 'cd ' + netsuite_src_path + ' && source make.sh'], cwd=netsuite_path, stdout=subprocess.PIPE, stderr=subprocess.PIPE) output = proc.stdout.read() print('Note: First build step we expect failure (titan/net_suite build system not suitable for multijob make)') print(output) proc = subprocess.Popen(['bash', '-c', 'cd ' + netsuite_src_path + ' && make'], cwd=netsuite_path, stdout=subprocess.PIPE, stderr=subprocess.PIPE) print('Note: This time all dependencies shall be generated -- multijob make shall pass') output = proc.stdout.read() print(output) # Executing the test suite thread1.start() thread2.start() time.sleep(2) print('Executing the test suite') print('------------------------') proc = subprocess.Popen(['ttcn3_start', os.path.join(netsuite_src_path,'test_suite'), os.path.join(netsuite_src_path, TTCN_CFG)], stdout=subprocess.PIPE) output = proc.stdout.read() print(output) print('Collecting results') print('------------------') verdict_stats = re.search('(Verdict statistics:.*)', output) if verdict_stats: verdict_stats = verdict_stats.group(1) else: verdict_stats = b'' verdict = re.search('Overall verdict: pass', output) if verdict: print('Test passed!') Utility.console_log(verdict_stats, 'green') else: Utility.console_log(verdict_stats, 'red') raise ValueError('Test failed with: {}'.format(verdict_stats)) else: try: # Executing the test suite thread1.start() thread2.start() time.sleep(2) while True: time.sleep(0.5) except KeyboardInterrupt: pass print('Executing done, waiting for tests to finish') print('-------------------------------------------') stop_io_listener.set() stop_sock_listener.set() thread1.join() thread2.join() if __name__ == '__main__': print('Manual execution, please build and start ttcn in a separate console') manual_test = True lwip_test_suite()