CI: fix ipv6 test failed on some runners

This commit is contained in:
Chen Yudong 2022-12-14 15:58:49 +08:00
parent 627d03182a
commit df92048180
3 changed files with 37 additions and 13 deletions

View File

@ -11,9 +11,9 @@ import socket
import sys
from threading import Event, Thread
import netifaces
import ttfw_idf
from common_test_methods import get_env_config_variable, get_host_ip_by_interface, get_my_interface_by_dest_ip
from common_test_methods import (get_env_config_variable, get_host_ip4_by_dest_ip, get_host_ip6_by_dest_ip,
get_my_interface_by_dest_ip)
# ----------- Config ----------
PORT = 3333
@ -107,13 +107,13 @@ def test_examples_protocol_socket_tcpclient(env, extra_data):
my_interface = get_my_interface_by_dest_ip(ipv4)
# test IPv4
with TcpServer(PORT, socket.AF_INET):
server_ip = get_host_ip_by_interface(my_interface, netifaces.AF_INET)
server_ip = get_host_ip4_by_dest_ip(ipv4)
print('Connect tcp client to server IP={}'.format(server_ip))
dut1.write(server_ip)
dut1.expect(re.compile(r'OK: Message from ESP32'))
# test IPv6
with TcpServer(PORT, socket.AF_INET6):
server_ip = get_host_ip_by_interface(my_interface, netifaces.AF_INET6)
server_ip = get_host_ip6_by_dest_ip(ipv6, my_interface)
print('Connect tcp client to server IP={}'.format(server_ip))
dut1.write(server_ip)
dut1.expect(re.compile(r'OK: Message from ESP32'))

View File

@ -11,9 +11,9 @@ import socket
import sys
from threading import Event, Thread
import netifaces
import ttfw_idf
from common_test_methods import get_env_config_variable, get_host_ip_by_interface, get_my_interface_by_dest_ip
from common_test_methods import (get_env_config_variable, get_host_ip4_by_dest_ip, get_host_ip6_by_dest_ip,
get_my_interface_by_dest_ip)
from tiny_test_fw.DUT import ExpectTimeout
# ----------- Config ----------
@ -102,7 +102,7 @@ def test_examples_protocol_socket_udpclient(env, extra_data):
my_interface = get_my_interface_by_dest_ip(ipv4)
# test IPv4
with UdpServer(PORT, socket.AF_INET):
server_ip = get_host_ip_by_interface(my_interface, netifaces.AF_INET)
server_ip = get_host_ip4_by_dest_ip(ipv4)
print('Connect udp client to server IP={}'.format(server_ip))
for _ in range(3):
try:
@ -115,7 +115,7 @@ def test_examples_protocol_socket_udpclient(env, extra_data):
raise ValueError('Failed to send/recv udp packets.')
# test IPv6
with UdpServer(PORT, socket.AF_INET6):
server_ip = get_host_ip_by_interface(my_interface, netifaces.AF_INET6)
server_ip = get_host_ip6_by_dest_ip(ipv6, my_interface)
print('Connect udp client to server IP={}'.format(server_ip))
for _ in range(3):
try:

View File

@ -4,7 +4,7 @@
import logging
import os
import socket
from typing import Any
from typing import Any, List
import netifaces
import yaml
@ -26,10 +26,23 @@ $IDF_PATH/EnvConfig.yml:
def get_host_ip_by_interface(interface_name: str, ip_type: int = netifaces.AF_INET) -> str:
for _addr in netifaces.ifaddresses(interface_name)[ip_type]:
host_ip = _addr['addr'].replace('%{}'.format(interface_name), '')
assert isinstance(host_ip, str)
return host_ip
if ip_type == netifaces.AF_INET:
for _addr in netifaces.ifaddresses(interface_name)[ip_type]:
host_ip = _addr['addr'].replace('%{}'.format(interface_name), '')
assert isinstance(host_ip, str)
return host_ip
elif ip_type == netifaces.AF_INET6:
ip6_addrs: List[str] = []
for _addr in netifaces.ifaddresses(interface_name)[ip_type]:
host_ip = _addr['addr'].replace('%{}'.format(interface_name), '')
assert isinstance(host_ip, str)
# prefer to use link local address due to example settings
if host_ip.startswith('FE80::'):
ip6_addrs.insert(0, host_ip)
else:
ip6_addrs.append(host_ip)
if ip6_addrs:
return ip6_addrs[0]
return ''
@ -45,6 +58,17 @@ def get_host_ip4_by_dest_ip(dest_ip: str = '') -> str:
return host_ip
def get_host_ip6_by_dest_ip(dest_ip: str, interface: str) -> str:
addr_info = socket.getaddrinfo(f'{dest_ip}%{interface}', 80, socket.AF_INET6, socket.SOCK_DGRAM)
s1 = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
s1.connect(addr_info[0][-1])
host_ip = s1.getsockname()[0]
s1.close()
assert isinstance(host_ip, str)
print(f'Using host ip: {host_ip}')
return host_ip
def get_my_interface_by_dest_ip(dest_ip: str = '') -> str:
my_ip = get_host_ip4_by_dest_ip(dest_ip)
interfaces = netifaces.interfaces()