diff --git a/.gitlab/ci/target-test.yml b/.gitlab/ci/target-test.yml index a1921bd906..83c924955c 100644 --- a/.gitlab/ci/target-test.yml +++ b/.gitlab/ci/target-test.yml @@ -869,14 +869,6 @@ example_test_001C: - ESP32 - Example_GENERIC -example_test_protocols: - extends: - - .example_test_esp32_template - - .rules:test:example_test-esp32-wifi - tags: - - ESP32 - - wifi_router - example_test_002: extends: - .example_test_esp32_template diff --git a/examples/protocols/sockets/non_blocking/example_test.py b/examples/protocols/sockets/non_blocking/example_test.py deleted file mode 100644 index ecd9522a85..0000000000 --- a/examples/protocols/sockets/non_blocking/example_test.py +++ /dev/null @@ -1,26 +0,0 @@ -# This example code is in the Public Domain (or CC0 licensed, at your option.) - -# Unless required by applicable law or agreed to in writing, this -# software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR -# CONDITIONS OF ANY KIND, either express or implied. - -# -*- coding: utf-8 -*- - -from __future__ import print_function, unicode_literals - -import re - -import ttfw_idf - - -@ttfw_idf.idf_example_test(env_tag='Example_GENERIC') -def test_examples_protocol_socket_non_block(env, _): - dut = env.get_dut('non_blocking_socket', 'examples/protocols/sockets/non_blocking', dut_class=ttfw_idf.ESP32DUT) - - # start the test and expect the client to receive back it's original data - dut.start_app() - dut.expect(re.compile(r'nonblocking-socket-client: Received: GET / HTTP/1.1'), timeout=30) - - -if __name__ == '__main__': - test_examples_protocol_socket_non_block() diff --git a/examples/protocols/sockets/non_blocking/pytet_socket_non_blocking.py b/examples/protocols/sockets/non_blocking/pytet_socket_non_blocking.py new file mode 100644 index 0000000000..03d9c502c6 --- /dev/null +++ b/examples/protocols/sockets/non_blocking/pytet_socket_non_blocking.py @@ -0,0 +1,11 @@ +# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Apache-2.0 + +import pytest +from pytest_embedded import Dut + + +@pytest.mark.supported_targets +@pytest.mark.generic +def test_examples_non_block_socket_localhost(dut: Dut) -> None: + dut.expect(r'nonblocking-socket-client: Received: GET / HTTP/1.1', timeout=30) diff --git a/examples/protocols/sockets/tcp_client/example_test.py b/examples/protocols/sockets/tcp_client/pytest_tcp_client.py similarity index 53% rename from examples/protocols/sockets/tcp_client/example_test.py rename to examples/protocols/sockets/tcp_client/pytest_tcp_client.py index 1f2cdbbd26..25a4dba2e5 100644 --- a/examples/protocols/sockets/tcp_client/example_test.py +++ b/examples/protocols/sockets/tcp_client/pytest_tcp_client.py @@ -1,28 +1,20 @@ -# This example code is in the Public Domain (or CC0 licensed, at your option.) +# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Apache-2.0 -# Unless required by applicable law or agreed to in writing, this -# software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR -# CONDITIONS OF ANY KIND, either express or implied. - -# -*- coding: utf-8 -*- -import os -import re +import logging import socket -import sys from threading import Event, Thread -import ttfw_idf +import pytest from common_test_methods import (get_env_config_variable, get_host_ip4_by_dest_ip, get_host_ip6_by_dest_ip, get_my_interface_by_dest_ip) +from pytest_embedded import Dut -# ----------- Config ---------- PORT = 3333 -# ------------------------------- -class TcpServer: - - def __init__(self, port, family_addr, persist=False): +class TcpServer(object): + def __init__(self, port, family_addr, persist=False): # type: ignore self.port = port self.socket = socket.socket(family_addr, socket.SOCK_STREAM) self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) @@ -31,7 +23,7 @@ class TcpServer: self.persist = persist self.family_addr = family_addr - def __enter__(self): + def __enter__(self): # type: ignore try: self.socket.bind(('', self.port)) except socket.error as e: @@ -44,7 +36,7 @@ class TcpServer: self.server_thread.start() return self - def __exit__(self, exc_type, exc_value, traceback): + def __exit__(self, exc_type, exc_value, traceback) -> None: # type: ignore if self.persist: sock = socket.socket(self.family_addr, socket.SOCK_STREAM) sock.connect(('localhost', self.port)) @@ -55,7 +47,7 @@ class TcpServer: self.server_thread.join() self.socket.close() - def run_server(self): + def run_server(self) -> None: while not self.shutdown.is_set(): try: conn, address = self.socket.accept() # accept new connection @@ -76,54 +68,49 @@ class TcpServer: break -@ttfw_idf.idf_example_test(env_tag='wifi_router') -def test_examples_protocol_socket_tcpclient(env, extra_data): - """ - steps: - 1. join AP - 2. have the board connect to the server - 3. send and receive data - """ - dut1 = env.get_dut('tcp_client', 'examples/protocols/sockets/tcp_client', dut_class=ttfw_idf.ESP32DUT) - # check and log bin size - binary_file = os.path.join(dut1.app.binary_path, 'tcp_client.bin') - bin_size = os.path.getsize(binary_file) - ttfw_idf.log_performance('tcp_client_bin_size', '{}KB'.format(bin_size // 1024)) - - # start test - dut1.start_app() - if dut1.app.get_sdkconfig_config_value('CONFIG_EXAMPLE_WIFI_SSID_PWD_FROM_STDIN'): - dut1.expect('Please input ssid password:') +@pytest.mark.esp32 +@pytest.mark.wifi_router +def test_examples_tcp_client_ipv4(dut: Dut) -> None: + # Parse IP address of STA + logging.info('Waiting to connect with AP') + if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True: + dut.expect('Please input ssid password:') env_name = 'wifi_router' ap_ssid = get_env_config_variable(env_name, 'ap_ssid') ap_password = get_env_config_variable(env_name, 'ap_password') - dut1.write(f'{ap_ssid} {ap_password}') + dut.write(f'{ap_ssid} {ap_password}') + ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode() + print(f'Connected with IPv4={ipv4}') - ipv4 = dut1.expect(re.compile(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]'), timeout=30)[0] - ipv6_r = r':'.join((r'[0-9a-fA-F]{4}',) * 8) # expect all 8 octets from IPv6 (assumes it's printed in the long form) - ipv6 = dut1.expect(re.compile(r' IPv6 address: ({})'.format(ipv6_r)), timeout=30)[0] - print('Connected with IPv4={} and IPv6={}'.format(ipv4, ipv6)) - - my_interface = get_my_interface_by_dest_ip(ipv4) # test IPv4 with TcpServer(PORT, socket.AF_INET): server_ip = get_host_ip4_by_dest_ip(ipv4) print('Connect tcp client to server IP={}'.format(server_ip)) - dut1.write(server_ip) - dut1.expect(re.compile(r'OK: Message from ESP32')) + dut.write(server_ip) + dut.expect('OK: Message from ESP32') + + +@pytest.mark.esp32 +@pytest.mark.wifi_router +def test_examples_tcp_client_ipv6(dut: Dut) -> None: + # Parse IP address of STA + logging.info('Waiting to connect with AP') + if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True: + dut.expect('Please input ssid password:') + env_name = 'wifi_router' + ap_ssid = get_env_config_variable(env_name, 'ap_ssid') + ap_password = get_env_config_variable(env_name, 'ap_password') + dut.write(f'{ap_ssid} {ap_password}') + ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode() + # expect all 8 octets from IPv6 (assumes it's printed in the long form) + ipv6_r = r':'.join((r'[0-9a-fA-F]{4}',) * 8) + ipv6 = dut.expect(ipv6_r, timeout=30)[0].decode() + print('Connected with IPv4={} and IPv6={}'.format(ipv4, ipv6)) + # test IPv6 + my_interface = get_my_interface_by_dest_ip(ipv4) with TcpServer(PORT, socket.AF_INET6): server_ip = get_host_ip6_by_dest_ip(ipv6, my_interface) print('Connect tcp client to server IP={}'.format(server_ip)) - dut1.write(server_ip) - dut1.expect(re.compile(r'OK: Message from ESP32')) - - -if __name__ == '__main__': - if sys.argv[1:] and sys.argv[1].startswith('IPv'): # if additional arguments provided: - # Usage: example_test.py - family_addr = socket.AF_INET6 if sys.argv[1] == 'IPv6' else socket.AF_INET - with TcpServer(PORT, family_addr, persist=True) as s: - print(input('Press Enter stop the server...')) - else: - test_examples_protocol_socket_tcpclient() + dut.write(server_ip) + dut.expect('OK: Message from ESP32') diff --git a/examples/protocols/sockets/tcp_server/example_test.py b/examples/protocols/sockets/tcp_server/example_test.py deleted file mode 100644 index fe83f705df..0000000000 --- a/examples/protocols/sockets/tcp_server/example_test.py +++ /dev/null @@ -1,96 +0,0 @@ -# This example code is in the Public Domain (or CC0 licensed, at your option.) - -# Unless required by applicable law or agreed to in writing, this -# software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR -# CONDITIONS OF ANY KIND, either express or implied. - -# -*- coding: utf-8 -*- - -from __future__ import print_function, unicode_literals - -import os -import re -import socket -import sys - -import ttfw_idf -from common_test_methods import get_env_config_variable, get_my_interface_by_dest_ip - -# ----------- Config ---------- -PORT = 3333 -# ------------------------------- - - -def tcp_client(address, payload): - for res in socket.getaddrinfo(address, PORT, socket.AF_UNSPEC, - socket.SOCK_STREAM, 0, socket.AI_PASSIVE): - family_addr, socktype, proto, canonname, addr = res - try: - sock = socket.socket(family_addr, socket.SOCK_STREAM) - sock.settimeout(60.0) - except socket.error as msg: - print('Could not create socket: ' + str(msg[0]) + ': ' + msg[1]) - raise - try: - sock.connect(addr) - except socket.error as msg: - print('Could not open socket: ', msg) - sock.close() - raise - sock.sendall(payload.encode()) - data = sock.recv(1024) - if not data: - return - print('Reply : ' + data.decode()) - sock.close() - return data.decode() - - -@ttfw_idf.idf_example_test(env_tag='wifi_router') -def test_examples_protocol_socket_tcpserver(env, extra_data): - MESSAGE = 'Data to ESP' - """ - steps: - 1. join AP - 2. have the board connect to the server - 3. send and receive data - """ - dut1 = env.get_dut('tcp_client', 'examples/protocols/sockets/tcp_server', dut_class=ttfw_idf.ESP32DUT) - # check and log bin size - binary_file = os.path.join(dut1.app.binary_path, 'tcp_server.bin') - bin_size = os.path.getsize(binary_file) - ttfw_idf.log_performance('tcp_server_bin_size', '{}KB'.format(bin_size // 1024)) - - # start test - dut1.start_app() - if dut1.app.get_sdkconfig_config_value('CONFIG_EXAMPLE_WIFI_SSID_PWD_FROM_STDIN'): - dut1.expect('Please input ssid password:') - env_name = 'wifi_router' - ap_ssid = get_env_config_variable(env_name, 'ap_ssid') - ap_password = get_env_config_variable(env_name, 'ap_password') - dut1.write(f'{ap_ssid} {ap_password}') - - ipv4 = dut1.expect(re.compile(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]'), timeout=30)[0] - ipv6_r = r':'.join((r'[0-9a-fA-F]{4}',) * 8) # expect all 8 octets from IPv6 (assumes it's printed in the long form) - ipv6 = dut1.expect(re.compile(r' IPv6 address: ({})'.format(ipv6_r)), timeout=30)[0] - print('Connected with IPv4={} and IPv6={}'.format(ipv4, ipv6)) - - interface = get_my_interface_by_dest_ip(ipv4) - # test IPv4 - received = tcp_client(ipv4, MESSAGE) - if not received == MESSAGE: - raise - dut1.expect(MESSAGE) - # test IPv6 - received = tcp_client('{}%{}'.format(ipv6, interface), MESSAGE) - if not received == MESSAGE: - raise - dut1.expect(MESSAGE) - - -if __name__ == '__main__': - if sys.argv[2:]: # if two arguments provided: - # Usage: example_test.py - tcp_client(sys.argv[1], sys.argv[2]) - else: # otherwise run standard example test as in the CI - test_examples_protocol_socket_tcpserver() diff --git a/examples/protocols/sockets/tcp_server/pytest_tcp_server.py b/examples/protocols/sockets/tcp_server/pytest_tcp_server.py new file mode 100644 index 0000000000..52d3d6c779 --- /dev/null +++ b/examples/protocols/sockets/tcp_server/pytest_tcp_server.py @@ -0,0 +1,88 @@ +# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Apache-2.0 + +import logging +import os +import socket +import time + +import pytest +from common_test_methods import get_env_config_variable, get_my_interface_by_dest_ip +from pytest_embedded import Dut + +PORT = 3333 +MESSAGE = 'Data to ESP' + + +def tcp_client(address: str, payload: str) -> str: + for res in socket.getaddrinfo(address, PORT, socket.AF_UNSPEC, + socket.SOCK_STREAM, 0, socket.AI_PASSIVE): + family_addr, socktype, proto, canonname, addr = res + try: + sock = socket.socket(family_addr, socket.SOCK_STREAM) + sock.settimeout(60.0) + except socket.error as msg: + print('Could not create socket') + print(os.strerror(msg.errno)) + raise + try: + sock.connect(addr) + except socket.error as e: + print('Could not open socket: ' + str(e)) + sock.close() + raise + sock.sendall(payload.encode()) + data = sock.recv(1024) + if not data: + return '' + print('Reply : ' + data.decode()) + sock.close() + return data.decode() + + +@pytest.mark.esp32 +@pytest.mark.wifi_router +def test_examples_tcp_server_ipv4(dut: Dut) -> None: + # Parse IP address of STA + logging.info('Waiting to connect with AP') + if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True: + dut.expect('Please input ssid password:') + env_name = 'wifi_router' + ap_ssid = get_env_config_variable(env_name, 'ap_ssid') + ap_password = get_env_config_variable(env_name, 'ap_password') + dut.write(f'{ap_ssid} {ap_password}') + ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode() + print(f'Connected with IPv4={ipv4}') + time.sleep(1) + + # test IPv4 + received = tcp_client(ipv4, MESSAGE) + if not received == MESSAGE: + raise + dut.expect(MESSAGE) + + +@pytest.mark.esp32 +@pytest.mark.wifi_router +def test_examples_tcp_server_ipv6(dut: Dut) -> None: + # Parse IP address of STA + logging.info('Waiting to connect with AP') + if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True: + dut.expect('Please input ssid password:') + env_name = 'wifi_router' + ap_ssid = get_env_config_variable(env_name, 'ap_ssid') + ap_password = get_env_config_variable(env_name, 'ap_password') + dut.write(f'{ap_ssid} {ap_password}') + ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode() + # expect all 8 octets from IPv6 (assumes it's printed in the long form) + ipv6_r = r':'.join((r'[0-9a-fA-F]{4}',) * 8) + ipv6 = dut.expect(ipv6_r, timeout=30)[0].decode() + print(f'Connected with IPv4={ipv4} and IPv6={ipv6}') + time.sleep(1) + + interface = get_my_interface_by_dest_ip(ipv4) + # test IPv6 + received = tcp_client('{}%{}'.format(ipv6, interface), MESSAGE) + if not received == MESSAGE: + raise + dut.expect(MESSAGE) diff --git a/examples/protocols/sockets/udp_client/example_test.py b/examples/protocols/sockets/udp_client/example_test.py deleted file mode 100644 index 71b2297d77..0000000000 --- a/examples/protocols/sockets/udp_client/example_test.py +++ /dev/null @@ -1,138 +0,0 @@ -# This example code is in the Public Domain (or CC0 licensed, at your option.) - -# Unless required by applicable law or agreed to in writing, this -# software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR -# CONDITIONS OF ANY KIND, either express or implied. - -# -*- coding: utf-8 -*- -import os -import re -import socket -import sys -from threading import Event, Thread - -import ttfw_idf -from common_test_methods import (get_env_config_variable, get_host_ip4_by_dest_ip, get_host_ip6_by_dest_ip, - get_my_interface_by_dest_ip) -from tiny_test_fw.DUT import ExpectTimeout - -# ----------- Config ---------- -PORT = 3333 -# ------------------------------- - - -class UdpServer: - - def __init__(self, port, family_addr, persist=False): - self.port = port - self.family_addr = family_addr - self.socket = socket.socket(family_addr, socket.SOCK_DGRAM) - self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - self.socket.settimeout(60.0) - self.shutdown = Event() - self.persist = persist - - def __enter__(self): - try: - self.socket.bind(('', self.port)) - except socket.error as e: - print('Bind failed:{}'.format(e)) - raise - - print('Starting server on port={} family_addr={}'.format(self.port, self.family_addr)) - self.server_thread = Thread(target=self.run_server) - self.server_thread.start() - return self - - def __exit__(self, exc_type, exc_value, traceback): - if self.persist: - sock = socket.socket(self.family_addr, socket.SOCK_DGRAM) - sock.sendto(b'Stop', ('localhost', self.port)) - sock.close() - self.shutdown.set() - self.server_thread.join() - self.socket.close() - - def run_server(self): - while not self.shutdown.is_set(): - try: - data, addr = self.socket.recvfrom(1024) - print(addr) - if not data: - return - data = data.decode() - print('Reply[' + addr[0] + ':' + str(addr[1]) + '] - ' + data) - reply = 'OK: ' + data - self.socket.sendto(reply.encode(), addr) - except socket.error as e: - print('Running server failed:{}'.format(e)) - raise - if not self.persist: - break - - -@ttfw_idf.idf_example_test(env_tag='wifi_router') -def test_examples_protocol_socket_udpclient(env, extra_data): - """ - steps: - 1. join AP - 2. have the board connect to the server - 3. send and receive data - """ - dut1 = env.get_dut('udp_client', 'examples/protocols/sockets/udp_client', dut_class=ttfw_idf.ESP32DUT) - # check and log bin size - binary_file = os.path.join(dut1.app.binary_path, 'udp_client.bin') - bin_size = os.path.getsize(binary_file) - ttfw_idf.log_performance('udp_client_bin_size', '{}KB'.format(bin_size // 1024)) - - # start test - dut1.start_app() - if dut1.app.get_sdkconfig_config_value('CONFIG_EXAMPLE_WIFI_SSID_PWD_FROM_STDIN'): - dut1.expect('Please input ssid password:') - env_name = 'wifi_router' - ap_ssid = get_env_config_variable(env_name, 'ap_ssid') - ap_password = get_env_config_variable(env_name, 'ap_password') - dut1.write(f'{ap_ssid} {ap_password}') - - ipv4 = dut1.expect(re.compile(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]'), timeout=30)[0] - ipv6_r = r':'.join((r'[0-9a-fA-F]{4}',) * 8) # expect all 8 octets from IPv6 (assumes it's printed in the long form) - ipv6 = dut1.expect(re.compile(r' IPv6 address: ({})'.format(ipv6_r)), timeout=30)[0] - print('Connected with IPv4={} and IPv6={}'.format(ipv4, ipv6)) - - my_interface = get_my_interface_by_dest_ip(ipv4) - # test IPv4 - with UdpServer(PORT, socket.AF_INET): - server_ip = get_host_ip4_by_dest_ip(ipv4) - print('Connect udp client to server IP={}'.format(server_ip)) - for _ in range(3): - try: - dut1.write(server_ip) - dut1.expect(re.compile(r'OK: Message from ESP32')) - break - except ExpectTimeout: - pass - else: - raise ValueError('Failed to send/recv udp packets.') - # test IPv6 - with UdpServer(PORT, socket.AF_INET6): - server_ip = get_host_ip6_by_dest_ip(ipv6, my_interface) - print('Connect udp client to server IP={}'.format(server_ip)) - for _ in range(3): - try: - dut1.write(server_ip) - dut1.expect(re.compile(r'OK: Message from ESP32')) - break - except ExpectTimeout: - pass - else: - raise ValueError('Failed to send/recv udp packets.') - - -if __name__ == '__main__': - if sys.argv[1:] and sys.argv[1].startswith('IPv'): # if additional arguments provided: - # Usage: example_test.py - family_addr = socket.AF_INET6 if sys.argv[1] == 'IPv6' else socket.AF_INET - with UdpServer(PORT, family_addr, persist=True) as s: - print(input('Press Enter stop the server...')) - else: - test_examples_protocol_socket_udpclient() diff --git a/examples/protocols/sockets/udp_client/pytest_udp_client.py b/examples/protocols/sockets/udp_client/pytest_udp_client.py new file mode 100644 index 0000000000..d0577390e0 --- /dev/null +++ b/examples/protocols/sockets/udp_client/pytest_udp_client.py @@ -0,0 +1,127 @@ +# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Apache-2.0 + +import logging +import socket +from threading import Event, Thread + +import pytest +from common_test_methods import (get_env_config_variable, get_host_ip4_by_dest_ip, get_host_ip6_by_dest_ip, + get_my_interface_by_dest_ip) +from pexpect.exceptions import TIMEOUT +from pytest_embedded import Dut + +PORT = 3333 +MAX_RETRIES = 3 + + +class UdpServer: + + def __init__(self, port, family_addr, persist=False): # type: ignore + self.port = port + self.family_addr = family_addr + self.socket = socket.socket(family_addr, socket.SOCK_DGRAM) + self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.socket.settimeout(60.0) + self.shutdown = Event() + self.persist = persist + + def __enter__(self): # type: ignore + try: + self.socket.bind(('', self.port)) + except socket.error as e: + print('Bind failed:{}'.format(e)) + raise + + print('Starting server on port={} family_addr={}'.format(self.port, self.family_addr)) + self.server_thread = Thread(target=self.run_server) + self.server_thread.start() + return self + + def __exit__(self, exc_type, exc_value, traceback): # type: ignore + if self.persist: + sock = socket.socket(self.family_addr, socket.SOCK_DGRAM) + sock.sendto(b'Stop', ('localhost', self.port)) + sock.close() + self.shutdown.set() + self.server_thread.join() + self.socket.close() + + def run_server(self) -> None: + while not self.shutdown.is_set(): + try: + data, addr = self.socket.recvfrom(1024) + print(addr) + if not data: + return + data = data.decode() + print('Reply[' + addr[0] + ':' + str(addr[1]) + '] - ' + data) + reply = 'OK: ' + data + self.socket.sendto(reply.encode(), addr) + except socket.error as e: + print('Running server failed:{}'.format(e)) + raise + if not self.persist: + break + + +@pytest.mark.esp32 +@pytest.mark.wifi_router +def test_examples_udp_client_ipv4(dut: Dut) -> None: + # Parse IP address of STA + logging.info('Waiting to connect with AP') + if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True: + dut.expect('Please input ssid password:') + env_name = 'wifi_router' + ap_ssid = get_env_config_variable(env_name, 'ap_ssid') + ap_password = get_env_config_variable(env_name, 'ap_password') + dut.write(f'{ap_ssid} {ap_password}') + ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode() + print(f'Connected with IPv4={ipv4}') + + # test IPv4 + with UdpServer(PORT, socket.AF_INET): + server_ip = get_host_ip4_by_dest_ip(ipv4) + print('Connect udp client to server IP={}'.format(server_ip)) + for _ in range(MAX_RETRIES): + try: + dut.write(server_ip) + dut.expect('OK: Message from ESP32') + break + except TIMEOUT: + pass + else: + raise ValueError('Failed to send/recv udp packets.') + + +@pytest.mark.esp32 +@pytest.mark.wifi_router +def test_examples_udp_client_ipv6(dut: Dut) -> None: + # Parse IP address of STA + logging.info('Waiting to connect with AP') + if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True: + dut.expect('Please input ssid password:') + env_name = 'wifi_router' + ap_ssid = get_env_config_variable(env_name, 'ap_ssid') + ap_password = get_env_config_variable(env_name, 'ap_password') + dut.write(f'{ap_ssid} {ap_password}') + ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode() + # expect all 8 octets from IPv6 (assumes it's printed in the long form) + ipv6_r = r':'.join((r'[0-9a-fA-F]{4}',) * 8) + ipv6 = dut.expect(ipv6_r, timeout=30)[0].decode() + print(f'Connected with IPv4={ipv4} and IPv6={ipv6}') + + interface = get_my_interface_by_dest_ip(ipv4) + # test IPv6 + with UdpServer(PORT, socket.AF_INET6): + server_ip = get_host_ip6_by_dest_ip(ipv6, interface) + print('Connect udp client to server IP={}'.format(server_ip)) + for _ in range(MAX_RETRIES): + try: + dut.write(server_ip) + dut.expect('OK: Message from ESP32') + break + except TIMEOUT: + pass + else: + raise ValueError('Failed to send/recv udpv6 packets.') diff --git a/examples/protocols/sockets/udp_server/example_test.py b/examples/protocols/sockets/udp_server/example_test.py deleted file mode 100644 index 9207138a89..0000000000 --- a/examples/protocols/sockets/udp_server/example_test.py +++ /dev/null @@ -1,112 +0,0 @@ -# This example code is in the Public Domain (or CC0 licensed, at your option.) - -# Unless required by applicable law or agreed to in writing, this -# software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR -# CONDITIONS OF ANY KIND, either express or implied. - -# -*- coding: utf-8 -*- - -from __future__ import print_function, unicode_literals - -import os -import re -import socket -import sys - -import ttfw_idf -from common_test_methods import get_env_config_variable, get_my_interface_by_dest_ip - -# ----------- Config ---------- -PORT = 3333 -# ------------------------------- - - -def udp_client(address, payload): - for res in socket.getaddrinfo(address, PORT, socket.AF_UNSPEC, - socket.SOCK_DGRAM, 0, socket.AI_PASSIVE): - family_addr, socktype, proto, canonname, addr = res - try: - sock = socket.socket(family_addr, socket.SOCK_DGRAM) - sock.settimeout(20.0) - except socket.error as msg: - print('Could not create socket') - print(os.strerror(msg.errno)) - raise - try: - sock.sendto(payload.encode(), addr) - reply, addr = sock.recvfrom(128) - if not reply: - return - print('Reply[' + addr[0] + ':' + str(addr[1]) + '] - ' + str(reply)) - except socket.timeout: - print('Socket operation timeout') - return str(None) - except socket.error as msg: - print('Error while sending or receiving data from the socket') - print(os.strerror(msg.errno)) - sock.close() - raise - return reply.decode() - - -@ttfw_idf.idf_example_test(env_tag='wifi_router') -def test_examples_protocol_socket_udpserver(env, extra_data): - MESSAGE = 'Data to ESP' - MAX_RETRIES = 3 - """ - steps: - 1. join AP - 2. have the board connect to the server - 3. send and receive data - """ - dut1 = env.get_dut('udp_server', 'examples/protocols/sockets/udp_server', dut_class=ttfw_idf.ESP32DUT) - # check and log bin size - binary_file = os.path.join(dut1.app.binary_path, 'udp_server.bin') - bin_size = os.path.getsize(binary_file) - ttfw_idf.log_performance('udp_server_bin_size', '{}KB'.format(bin_size // 1024)) - - # start test - dut1.start_app() - if dut1.app.get_sdkconfig_config_value('CONFIG_EXAMPLE_WIFI_SSID_PWD_FROM_STDIN'): - dut1.expect('Please input ssid password:') - env_name = 'wifi_router' - ap_ssid = get_env_config_variable(env_name, 'ap_ssid') - ap_password = get_env_config_variable(env_name, 'ap_password') - dut1.write(f'{ap_ssid} {ap_password}') - - ipv4 = dut1.expect(re.compile(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]'), timeout=30)[0] - ipv6_r = r':'.join((r'[0-9a-fA-F]{4}',) * 8) # expect all 8 octets from IPv6 (assumes it's printed in the long form) - ipv6 = dut1.expect(re.compile(r' IPv6 address: ({})'.format(ipv6_r)), timeout=30)[0] - print('Connected with IPv4={} and IPv6={}'.format(ipv4, ipv6)) - dut1.expect(re.compile(r'Waiting for data'), timeout=10) - - interface = get_my_interface_by_dest_ip(ipv4) - # test IPv4 - for _ in range(MAX_RETRIES): - print('Testing UDP on IPv4...') - received = udp_client(ipv4, MESSAGE) - if received == MESSAGE: - print('OK') - break - else: - raise ValueError('IPv4: Did not receive UDP message after {} retries'.format(MAX_RETRIES)) - dut1.expect(MESSAGE) - - # test IPv6 - for _ in range(MAX_RETRIES): - print('Testing UDP on IPv6...') - received = udp_client('{}%{}'.format(ipv6, interface), MESSAGE) - if received == MESSAGE: - print('OK') - break - else: - raise ValueError('IPv6: Did not receive UDP message after {} retries'.format(MAX_RETRIES)) - dut1.expect(MESSAGE) - - -if __name__ == '__main__': - if sys.argv[2:]: # if two arguments provided: - # Usage: example_test.py - udp_client(sys.argv[1], sys.argv[2]) - else: # otherwise run standard example test as in the CI - test_examples_protocol_socket_udpserver() diff --git a/examples/protocols/sockets/udp_server/pytest_udp_server.py b/examples/protocols/sockets/udp_server/pytest_udp_server.py new file mode 100644 index 0000000000..da9533e766 --- /dev/null +++ b/examples/protocols/sockets/udp_server/pytest_udp_server.py @@ -0,0 +1,98 @@ +# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Apache-2.0 + +import logging +import os +import socket + +import pytest +from common_test_methods import get_env_config_variable, get_my_interface_by_dest_ip +from pytest_embedded import Dut + +PORT = 3333 +MESSAGE = 'Data to ESP' +MAX_RETRIES = 3 + + +def udp_client(address: str, payload: str) -> str: + for res in socket.getaddrinfo(address, PORT, socket.AF_UNSPEC, + socket.SOCK_DGRAM, 0, socket.AI_PASSIVE): + family_addr, socktype, proto, canonname, addr = res + try: + sock = socket.socket(family_addr, socket.SOCK_DGRAM) + sock.settimeout(20.0) + except socket.error as msg: + print('Could not create socket') + print(os.strerror(msg.errno)) + raise + try: + sock.sendto(payload.encode(), addr) + reply, addr = sock.recvfrom(128) + if not reply: + return '' + print('Reply[' + addr[0] + ':' + str(addr[1]) + '] - ' + str(reply)) + except socket.timeout: + print('Socket operation timeout') + return '' + except socket.error as msg: + print('Error while sending or receiving data from the socket') + print(os.strerror(msg.errno)) + sock.close() + raise + return reply.decode() + + +@pytest.mark.esp32 +@pytest.mark.wifi_router +def test_examples_udp_server_ipv4(dut: Dut) -> None: + # Parse IP address of STA + logging.info('Waiting to connect with AP') + if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True: + dut.expect('Please input ssid password:') + env_name = 'wifi_router' + ap_ssid = get_env_config_variable(env_name, 'ap_ssid') + ap_password = get_env_config_variable(env_name, 'ap_password') + dut.write(f'{ap_ssid} {ap_password}') + ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode() + print(f'Connected with IPv4={ipv4}') + + # test IPv4 + for _ in range(MAX_RETRIES): + print('Testing UDP on IPv4...') + received = udp_client(ipv4, MESSAGE) + if received == MESSAGE: + print('OK') + break + else: + raise ValueError('IPv4: Did not receive UDP message after {} retries'.format(MAX_RETRIES)) + dut.expect(MESSAGE) + + +@pytest.mark.esp32 +@pytest.mark.wifi_router +def test_examples_udp_server_ipv6(dut: Dut) -> None: + # Parse IP address of STA + logging.info('Waiting to connect with AP') + if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True: + dut.expect('Please input ssid password:') + env_name = 'wifi_router' + ap_ssid = get_env_config_variable(env_name, 'ap_ssid') + ap_password = get_env_config_variable(env_name, 'ap_password') + dut.write(f'{ap_ssid} {ap_password}') + ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode() + # expect all 8 octets from IPv6 (assumes it's printed in the long form) + ipv6_r = r':'.join((r'[0-9a-fA-F]{4}',) * 8) + ipv6 = dut.expect(ipv6_r, timeout=30)[0].decode() + print(f'Connected with IPv4={ipv4} and IPv6={ipv6}') + + interface = get_my_interface_by_dest_ip(ipv4) + # test IPv6 + for _ in range(MAX_RETRIES): + print('Testing UDP on IPv6...') + received = udp_client('{}%{}'.format(ipv6, interface), MESSAGE) + if received == MESSAGE: + print('OK') + break + else: + raise ValueError('IPv6: Did not receive UDP message after {} retries'.format(MAX_RETRIES)) + dut.expect(MESSAGE)