diff --git a/examples/protocols/sockets/tcp_client/example_test.py b/examples/protocols/sockets/tcp_client/example_test.py index 6ab409fda5..1f2cdbbd26 100644 --- a/examples/protocols/sockets/tcp_client/example_test.py +++ b/examples/protocols/sockets/tcp_client/example_test.py @@ -11,9 +11,9 @@ import socket import sys from threading import Event, Thread -import netifaces import ttfw_idf -from common_test_methods import get_env_config_variable, get_host_ip_by_interface, get_my_interface_by_dest_ip +from common_test_methods import (get_env_config_variable, get_host_ip4_by_dest_ip, get_host_ip6_by_dest_ip, + get_my_interface_by_dest_ip) # ----------- Config ---------- PORT = 3333 @@ -107,13 +107,13 @@ def test_examples_protocol_socket_tcpclient(env, extra_data): my_interface = get_my_interface_by_dest_ip(ipv4) # test IPv4 with TcpServer(PORT, socket.AF_INET): - server_ip = get_host_ip_by_interface(my_interface, netifaces.AF_INET) + server_ip = get_host_ip4_by_dest_ip(ipv4) print('Connect tcp client to server IP={}'.format(server_ip)) dut1.write(server_ip) dut1.expect(re.compile(r'OK: Message from ESP32')) # test IPv6 with TcpServer(PORT, socket.AF_INET6): - server_ip = get_host_ip_by_interface(my_interface, netifaces.AF_INET6) + server_ip = get_host_ip6_by_dest_ip(ipv6, my_interface) print('Connect tcp client to server IP={}'.format(server_ip)) dut1.write(server_ip) dut1.expect(re.compile(r'OK: Message from ESP32')) diff --git a/examples/protocols/sockets/udp_client/example_test.py b/examples/protocols/sockets/udp_client/example_test.py index 5b4593fc15..71b2297d77 100644 --- a/examples/protocols/sockets/udp_client/example_test.py +++ b/examples/protocols/sockets/udp_client/example_test.py @@ -11,9 +11,9 @@ import socket import sys from threading import Event, Thread -import netifaces import ttfw_idf -from common_test_methods import get_env_config_variable, get_host_ip_by_interface, get_my_interface_by_dest_ip +from common_test_methods import (get_env_config_variable, get_host_ip4_by_dest_ip, get_host_ip6_by_dest_ip, + get_my_interface_by_dest_ip) from tiny_test_fw.DUT import ExpectTimeout # ----------- Config ---------- @@ -102,7 +102,7 @@ def test_examples_protocol_socket_udpclient(env, extra_data): my_interface = get_my_interface_by_dest_ip(ipv4) # test IPv4 with UdpServer(PORT, socket.AF_INET): - server_ip = get_host_ip_by_interface(my_interface, netifaces.AF_INET) + server_ip = get_host_ip4_by_dest_ip(ipv4) print('Connect udp client to server IP={}'.format(server_ip)) for _ in range(3): try: @@ -115,7 +115,7 @@ def test_examples_protocol_socket_udpclient(env, extra_data): raise ValueError('Failed to send/recv udp packets.') # test IPv6 with UdpServer(PORT, socket.AF_INET6): - server_ip = get_host_ip_by_interface(my_interface, netifaces.AF_INET6) + server_ip = get_host_ip6_by_dest_ip(ipv6, my_interface) print('Connect udp client to server IP={}'.format(server_ip)) for _ in range(3): try: diff --git a/tools/ci/python_packages/common_test_methods.py b/tools/ci/python_packages/common_test_methods.py index 85ace23373..80392fbc27 100644 --- a/tools/ci/python_packages/common_test_methods.py +++ b/tools/ci/python_packages/common_test_methods.py @@ -4,7 +4,7 @@ import logging import os import socket -from typing import Any +from typing import Any, List import netifaces import yaml @@ -26,10 +26,23 @@ $IDF_PATH/EnvConfig.yml: def get_host_ip_by_interface(interface_name: str, ip_type: int = netifaces.AF_INET) -> str: - for _addr in netifaces.ifaddresses(interface_name)[ip_type]: - host_ip = _addr['addr'].replace('%{}'.format(interface_name), '') - assert isinstance(host_ip, str) - return host_ip + if ip_type == netifaces.AF_INET: + for _addr in netifaces.ifaddresses(interface_name)[ip_type]: + host_ip = _addr['addr'].replace('%{}'.format(interface_name), '') + assert isinstance(host_ip, str) + return host_ip + elif ip_type == netifaces.AF_INET6: + ip6_addrs: List[str] = [] + for _addr in netifaces.ifaddresses(interface_name)[ip_type]: + host_ip = _addr['addr'].replace('%{}'.format(interface_name), '') + assert isinstance(host_ip, str) + # prefer to use link local address due to example settings + if host_ip.startswith('FE80::'): + ip6_addrs.insert(0, host_ip) + else: + ip6_addrs.append(host_ip) + if ip6_addrs: + return ip6_addrs[0] return '' @@ -45,6 +58,17 @@ def get_host_ip4_by_dest_ip(dest_ip: str = '') -> str: return host_ip +def get_host_ip6_by_dest_ip(dest_ip: str, interface: str) -> str: + addr_info = socket.getaddrinfo(f'{dest_ip}%{interface}', 80, socket.AF_INET6, socket.SOCK_DGRAM) + s1 = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) + s1.connect(addr_info[0][-1]) + host_ip = s1.getsockname()[0] + s1.close() + assert isinstance(host_ip, str) + print(f'Using host ip: {host_ip}') + return host_ip + + def get_my_interface_by_dest_ip(dest_ip: str = '') -> str: my_ip = get_host_ip4_by_dest_ip(dest_ip) interfaces = netifaces.interfaces()