ci: Added napt pytest with vlan example

CI: add vlan example to ethernet patterns
Included a Pytest for the vlan_support example, which focuses on testing
the NAPT module in lwip by forwarding packets between different VLAN interfaces.
This commit is contained in:
Abhik Roy 2023-03-15 18:15:45 +11:00
parent 02f5b9a898
commit ef50beb4d0
9 changed files with 469 additions and 11 deletions

View File

@ -235,6 +235,7 @@
- "examples/protocols/**/*"
- "examples/system/ota/**/*"
- "examples/ethernet/iperf/**/*"
- "examples/network/vlan_support/**/*"
- "components/esp_eth/**/*"
- "components/esp_netif/esp_netif_handlers.c"

View File

@ -419,6 +419,14 @@ pytest_examples_esp32_ethernet_router:
- build_pytest_examples_esp32
tags: [ esp32, ethernet_router ]
pytest_examples_esp32_ethernet_vlan:
extends:
- .pytest_examples_dir_template
- .rules:test:example_test-esp32-ethernet
needs:
- build_pytest_examples_esp32
tags: [ esp32, ethernet_vlan ]
pytest_examples_esp32_ethernet_ip101:
extends:
- .pytest_examples_dir_template

View File

@ -100,6 +100,7 @@ ENV_MARKERS = {
'ethernet': 'ethernet runner',
'ethernet_flash_8m': 'ethernet runner with 8mb flash',
'ethernet_router': 'both the runner and dut connect to the same router through ethernet NIC',
'ethernet_vlan': 'ethernet runner GARM-32-SH-1-R16S5N3',
'wifi_ap': 'a wifi AP in the environment',
'wifi_router': 'both the runner and dut connect to the same wifi router',
'wifi_high_traffic': 'wifi high traffic runners',

View File

@ -20,3 +20,7 @@ examples/network/simple_sniffer:
examples/network/sta2eth:
disable:
- if: SOC_WIFI_SUPPORTED != 1
examples/network/vlan_support:
disable_test:
- if: IDF_TARGET not in ["esp32"]
reason: Runner uses esp32 ethernet kit

View File

@ -57,4 +57,48 @@ menu "Example Configuration"
endif #EXAMPLE_EXTRA_VLAN_INTERFACE
choice EXAMPLE_VLAN_DEFAULT_IF
prompt "Choose the default interface"
default EXAMPLE_ETHERNET_DEF_IF
help
Select the interface to be set as default
config EXAMPLE_ETHERNET_DEF_IF
bool
prompt "Internal Ethernet Interface"
config EXAMPLE_VLAN_DEF_IF
bool
prompt "VLAN Interface"
config EXAMPLE_EXTRA_VLAN_DEF_IF
bool
prompt "Additional VLAN Interface"
depends on EXAMPLE_EXTRA_VLAN_INTERFACE
endchoice
choice EXAMPLE_VLAN_ENABLE_NAPT_IF
prompt "Select the network interface to enable NAPT on"
default EXAMPLE_VLAN_NAPT_IF
help
Choose the interface on which NAPT will be enabled
config EXAMPLE_ETHERNET_NAPT_IF
bool
prompt "Internal Ethernet Interface"
config EXAMPLE_VLAN_NAPT_IF
bool
prompt "VLAN Interface"
config EXAMPLE_EXTRA_VLAN_NAPT_IF
bool
prompt "Additional VLAN Interface"
depends on EXAMPLE_EXTRA_VLAN_INTERFACE
endchoice
config EXAMPLE_VLAN_PYTEST_PC_IFACE
string
default "eth1"
help
The example will set this IPV4 address to this interface.
This configuration is used in pytest only.
This is a hidden configuration, i.e it will not show up im menuconfig.
endmenu

View File

@ -1,5 +1,5 @@
/*
* SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD
* SPDX-FileCopyrightText: 2022-2023 Espressif Systems (Shanghai) CO LTD
*
* SPDX-License-Identifier: Unlicense OR CC0-1.0
*/
@ -7,6 +7,9 @@
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include "freertos/FreeRTOS.h"
#include "freertos/event_groups.h"
#include "esp_event.h"
#include "esp_netif.h"
#include "esp_eth.h"
#include "ethernet_init.h"
@ -18,9 +21,20 @@
#include "lwip/prot/ieee.h"
#include "esp_netif_net_stack.h"
static const char *TAG = "eth_vlan_example";
/* FreeRTOS event group to signal when we are connected*/
static EventGroupHandle_t s_vlan_event_group;
/* Ehernet Link upevent */
#define VLAN_IF_UP_BIT BIT0
#if CONFIG_EXAMPLE_EXTRA_VLAN_INTERFACE
#define EXTRA_VLAN_IF_UP_BIT BIT1
#define ALL_VLAN_IF_UP_BITS VLAN_IF_UP_BIT | EXTRA_VLAN_IF_UP_BIT
#else
#define ALL_VLAN_IF_UP_BITS VLAN_IF_UP_BIT
#endif
/**
* @brief Event handler for Ethernet events
*
@ -77,20 +91,42 @@ void got_ip_event_handler(void *arg, esp_event_base_t event_base,
ip_event_got_ip_t *event = (ip_event_got_ip_t *) event_data;
const esp_netif_ip_info_t *ip_info = &event->ip_info;
u8_t hwaddr[NETIF_MAX_HWADDR_LEN];
u16_t netif_vlan_id;
esp_netif_get_mac(event->esp_netif, hwaddr);
ESP_LOGI(TAG, "~~~~~~~~~~~");
ESP_LOGI(TAG, "Ethernet interface(%"X8_F":%"X8_F":%"X8_F":%"X8_F":%"X8_F":%"X8_F"): %s, Got IP Address",
hwaddr[0], hwaddr[1], hwaddr[2], hwaddr[3], hwaddr[4], hwaddr[5], esp_netif_get_ifkey(event->esp_netif));
/* Print Intervace VLAN Id */
struct netif *lwip_netif = esp_netif_get_netif_impl(event->esp_netif);
netif_vlan_id = *((uint16_t *)netif_get_client_data(lwip_netif, LWIP_NETIF_CLIENT_DATA_INDEX_MAX + 1));
if (0xFFF != netif_vlan_id) {
ESP_LOGI(TAG, "NETIF VLAN: %d", netif_vlan_id);
}
ESP_LOGI(TAG, "NETIF IP: " IPSTR, IP2STR(&ip_info->ip));
ESP_LOGI(TAG, "NETIF MASK: " IPSTR, IP2STR(&ip_info->netmask));
ESP_LOGI(TAG, "NETIF GW: " IPSTR, IP2STR(&ip_info->gw));
ESP_LOGI(TAG, "~~~~~~~~~~~");
ESP_LOGI(TAG, "ETHIP:" IPSTR, IP2STR(&ip_info->ip));
ESP_LOGI(TAG, "ETHMASK:" IPSTR, IP2STR(&ip_info->netmask));
ESP_LOGI(TAG, "ETHGW:" IPSTR, IP2STR(&ip_info->gw));
ESP_LOGI(TAG, "~~~~~~~~~~~");
/* Set event VLAN interfaces are up */
if (netif_vlan_id == CONFIG_EXAMPLE_ETHERNET_VLAN_ID) {
xEventGroupSetBits(s_vlan_event_group, VLAN_IF_UP_BIT);
}
#if CONFIG_EXAMPLE_EXTRA_VLAN_INTERFACE
else if (netif_vlan_id == CONFIG_EXAMPLE_EXTRA_ETHERNET_VLAN_ID) {
xEventGroupSetBits(s_vlan_event_group, EXTRA_VLAN_IF_UP_BIT);
}
#endif
}
void app_main(void)
{
s_vlan_event_group = xEventGroupCreate();
static esp_vlan_netifs vlan_netif_list;
// Initialize Ethernet driver
@ -131,6 +167,11 @@ void app_main(void)
vlan_id[vlan_netif_list.netif_count] = 0xFFF;
struct netif *lwip_netif = esp_netif_get_netif_impl(vlan_netif_list.esp_netif[vlan_netif_list.netif_count]);
netif_set_client_data(lwip_netif, LWIP_NETIF_CLIENT_DATA_INDEX_MAX + 1, (void *)&vlan_id[vlan_netif_list.netif_count]);
#if CONFIG_EXAMPLE_ETHERNET_DEF_IF
/* Set as the default interface */
esp_netif_set_default_netif(vlan_netif_list.esp_netif[vlan_netif_list.netif_count]);
#endif
vlan_netif_list.netif_count++;
@ -155,11 +196,15 @@ void app_main(void)
esp_netif_ip_info_t info_t;
memset(&info_t, 0, sizeof(esp_netif_ip_info_t));
inet_aton(CONFIG_EXAMPLE_VLAN_STATIC_IPV4_ADDR, &info_t.ip.addr);
inet_aton(CONFIG_EXAMPLE_VLAN_STATIC_ADDR_MASK, &info_t.gw.addr);
inet_aton(CONFIG_EXAMPLE_VLAN_STATIC_ADDR_DEF_GW, &info_t.netmask.addr);
inet_aton(CONFIG_EXAMPLE_VLAN_STATIC_ADDR_DEF_GW, &info_t.gw.addr);
inet_aton(CONFIG_EXAMPLE_VLAN_STATIC_ADDR_MASK, &info_t.netmask.addr);
esp_netif_set_ip_info(vlan_netif_list.esp_netif[vlan_netif_list.netif_count], &info_t);
vlan_netif_list.netif_count++;
#if CONFIG_EXAMPLE_VLAN_DEF_IF
/* Set as the default interface */
esp_netif_set_default_netif(vlan_netif_list.esp_netif[vlan_netif_list.netif_count]);
#endif
vlan_netif_list.netif_count++;
#if defined(CONFIG_EXAMPLE_EXTRA_VLAN_INTERFACE)
@ -182,9 +227,14 @@ void app_main(void)
memset(&info_t, 0, sizeof(esp_netif_ip_info_t));
inet_aton(CONFIG_EXAMPLE_EXTRA_VLAN_STATIC_IPV4_ADDR, &info_t.ip.addr);
inet_aton(CONFIG_EXAMPLE_EXTRA_VLAN_STATIC_ADDR_MASK, &info_t.gw.addr);
inet_aton(CONFIG_EXAMPLE_EXTRA_VLAN_STATIC_ADDR_DEF_GW, &info_t.netmask.addr);
inet_aton(CONFIG_EXAMPLE_EXTRA_VLAN_STATIC_ADDR_DEF_GW, &info_t.gw.addr);
inet_aton(CONFIG_EXAMPLE_EXTRA_VLAN_STATIC_ADDR_MASK, &info_t.netmask.addr);
esp_netif_set_ip_info(vlan_netif_list.esp_netif[vlan_netif_list.netif_count], &info_t);
#if CONFIG_EXAMPLE_EXTRA_VLAN_DEF_IF
/* Set as the default interface */
esp_netif_set_default_netif(vlan_netif_list.esp_netif[vlan_netif_list.netif_count]);
#endif
vlan_netif_list.netif_count++;
#endif //CONFIG_EXAMPLE_EXTRA_VLAN_INTERFACE
@ -193,4 +243,24 @@ void app_main(void)
// start Ethernet driver state machine
ESP_ERROR_CHECK(esp_eth_start(eth_handle[0]));
/* Wait until all the VLAN interfaces are up */
xEventGroupWaitBits(s_vlan_event_group,
ALL_VLAN_IF_UP_BITS,
pdFALSE,
pdTRUE,
portMAX_DELAY);
#if IP_NAPT
/* Enable NAPT on the configured interface */
#if CONFIG_EXAMPLE_ETHERNET_NAPT_IF
if (esp_netif_napt_enable(vlan_netif_list.esp_netif[0]) != ESP_OK) {
#elif CONFIG_EXAMPLE_VLAN_NAPT_IF
if (esp_netif_napt_enable(vlan_netif_list.esp_netif[1]) != ESP_OK) {
#elif CONFIG_EXAMPLE_EXTRA_VLAN_NAPT_IF
if (esp_netif_napt_enable(vlan_netif_list.esp_netif[2]) != ESP_OK) {
#endif
ESP_LOGE(TAG, "Failed to enable NAPT on selected netif");
}
#endif // #if IP_NAPT
}

View File

@ -0,0 +1,322 @@
# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: CC0-1.0
import ipaddress
import subprocess
import threading
import time
from typing import Dict, Union
import pytest
from pytest_embedded import Dut
from scapy import layers
from scapy.all import ICMP, IP, TCP, UDP, AsyncSniffer
udp_port = 1234
tcp_port = 4321
def run_cmd(command: str, secure: bool=False) -> str:
if secure is False:
print(f'Running: {command}')
cmd = command.strip().split(' ')
# Run the command
proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# Send su password if secured
if secure is True:
proc.communicate(input=b'Esp@32')
# Get the output
stdout, stderr = proc.communicate()
# Print the output
if secure is False:
if len(stdout.decode('utf-8')) > 0:
print('Output: ', stdout.decode('utf-8'))
if len(stderr.decode('utf-8')) > 0:
print('Error: ', stderr.decode('utf-8'))
return stdout.decode('utf-8')
def run_cmd_sec(command: str) -> str:
print(f'Running secured: {command}')
return run_cmd(f'sudo -S -k {command}', secure=True)
def clear_network(config: dict) -> None:
# delete route
for each_cmd in config['delete_route_cmd_l']:
run_cmd_sec(each_cmd)
# destroy Vlan interfaces
for each_cmd in config['vlan_destroy_cmd_l']:
run_cmd_sec(each_cmd)
def setup_network(config: dict) -> None:
# Clear network before setting it up
clear_network(config)
# Create Vlan interfaces
for each_cmd in config['vlan_create_cmd_l']:
run_cmd_sec(each_cmd)
# set route
for each_cmd in config['set_route_cmd_l']:
run_cmd_sec(each_cmd)
def create_config(dut: Dut) -> dict:
pc_iface = dut.app.sdkconfig.get('EXAMPLE_VLAN_PYTEST_PC_IFACE')
vlanClient_conf = {'id': str(dut.app.sdkconfig.get('EXAMPLE_ETHERNET_VLAN_ID')),
'name': 'vlanClient',
'ip': dut.app.sdkconfig.get('EXAMPLE_VLAN_STATIC_ADDR_DEF_GW')}
vlanServer_conf = {'id': str(dut.app.sdkconfig.get('EXAMPLE_EXTRA_ETHERNET_VLAN_ID')),
'name': 'vlanServer',
'ip': dut.app.sdkconfig.get('EXAMPLE_EXTRA_VLAN_STATIC_ADDR_DEF_GW')}
esp_vlanClient_ip = dut.app.sdkconfig.get('EXAMPLE_VLAN_STATIC_IPV4_ADDR')
esp_vlanServer_ip = dut.app.sdkconfig.get('EXAMPLE_EXTRA_VLAN_STATIC_IPV4_ADDR')
subnet_mask = ipaddress.IPv4Address('255.255.255.0')
tmp_ip = ipaddress.IPv4Address(vlanServer_conf['ip'])
vlanServer_net_addr = ipaddress.IPv4Address(int(tmp_ip) & int(subnet_mask))
config: Dict[str, Union[str, dict, dict, str, str, list, list, list, list]] = {
# Basic Configurations
'pc_iface': pc_iface,
'vlanClient': vlanClient_conf,
'vlanServer': vlanServer_conf,
'esp_vlanClient_ip': esp_vlanClient_ip,
'esp_vlanServer_ip': esp_vlanServer_ip,
'vlan_create_cmd_l': [f'ip netns add ns_vlanClient',
f"ip link add link {pc_iface} name {vlanClient_conf['name']} type vlan id {vlanClient_conf['id']}",
f"ip link set {vlanClient_conf['name']} netns ns_vlanClient",
f"ip netns exec ns_vlanClient ip addr add {vlanClient_conf['ip']}/255.255.255.0 dev {vlanClient_conf['name']}",
f"ip netns exec ns_vlanClient ip link set dev {vlanClient_conf['name']} up",
f"ip link add link {pc_iface} name {vlanServer_conf['name']} type vlan id {vlanServer_conf['id']}",
f"ip addr add {vlanServer_conf['ip']}/255.255.255.0 dev {vlanServer_conf['name']}",
f"ip link set dev {vlanServer_conf['name']} up"],
'vlan_destroy_cmd_l': [f"ip netns exec ns_vlanClient ip link set dev {vlanClient_conf['name']} down",
f"ip netns exec ns_vlanClient ip link delete {vlanClient_conf['name']}",
f"ip link set dev {vlanServer_conf['name']} down",
f"ip link delete {vlanServer_conf['name']}",
f'ip netns delete ns_vlanClient'],
'set_route_cmd_l': [f'ip netns exec ns_vlanClient ip route add {vlanServer_net_addr}/24 via {esp_vlanClient_ip}'],
'delete_route_cmd_l': [f'ip netns exec ns_vlanClient ip route delete {vlanServer_net_addr}/24 via {esp_vlanClient_ip}'],
}
return config
# Ping Test
def ping_test(config: dict) -> None:
setup_network(config)
capture = AsyncSniffer(iface=config['vlanServer']['name'], filter='icmp', count=10)
# Start sniffing
capture.start()
time.sleep(1)
# Run network test commands here
print(run_cmd_sec(f"ip netns exec ns_vlanClient ping -I {config['vlanClient']['ip']} {config['vlanServer']['ip']} -c 10"))
# Stop sniffing
capture.join(timeout=20)
vlanServer_pkt_list = capture.results
clear_network(config)
# Test Validation
vlanServer_forward_flag = False
vlanServer_return_flag = False
if vlanServer_pkt_list is None:
print('Failure: No packets captured')
assert False
print(f"Captured: {len(vlanServer_pkt_list)} packets on interface {config['vlanServer']['name']}")
for pkt in vlanServer_pkt_list:
print('Summary: ', pkt.summary())
if pkt[ICMP].type == 8 and pkt[IP].src == config['esp_vlanServer_ip'] and pkt[IP].dst == config['vlanServer']['ip']:
vlanServer_forward_flag = True
if pkt[ICMP].type == 0 and pkt[IP].src == config['vlanServer']['ip'] and pkt[IP].dst == config['esp_vlanServer_ip']:
vlanServer_return_flag = True
assert vlanServer_forward_flag and vlanServer_return_flag
# UDP Test
def udp_server(serverip: str, port: int) -> None:
print(f'UDP server listening on IP: {serverip} port: {port}')
run_cmd(f'timeout 10s iperf3 -s -p {port}')
def udp_client(serverip: str, port: int) -> None:
time.sleep(1)
print(run_cmd_sec(f'timeout 10s ip netns exec ns_vlanClient iperf3 -c {serverip} -u -p {port} --bidir -k 20'))
def udp_server_client_comm(config: dict, port: int) -> None:
server_thread = threading.Thread(target=udp_server, args=(config['vlanServer']['ip'], port,))
client_thread = threading.Thread(target=udp_client, args=(config['vlanServer']['ip'], port))
server_thread.start()
client_thread.start()
server_thread.join()
client_thread.join()
print('UDP Test Done')
def udp_lfilter(packet: layers.l2.Ether) -> layers.l2.Ether:
return UDP in packet and (packet[UDP].dport == udp_port or packet[UDP].sport == udp_port)
def udp_test(config: dict) -> None:
setup_network(config)
capture = AsyncSniffer(iface=config['vlanServer']['name'],
filter='udp',
lfilter=udp_lfilter,
count=10)
# Start sniffing
capture.start()
time.sleep(1)
# Run network test commands here
udp_server_client_comm(config, udp_port)
# Stop sniffing
capture.join(timeout=20)
vlanServer_pkt_list = capture.results
clear_network(config)
# Test Validation
vlanServer_forward_flag = False
vlanServer_return_flag = False
if vlanServer_pkt_list is None:
print('Failure: No packets captured')
assert False
print(f"Captured: {len(vlanServer_pkt_list)} packets on interface {config['vlanServer']['name']}")
for pkt in vlanServer_pkt_list:
print('Summary: ', pkt.summary())
if UDP in pkt:
if pkt[UDP].dport == udp_port and pkt[IP].src == config['esp_vlanServer_ip'] and pkt[IP].dst == config['vlanServer']['ip']:
vlanServer_forward_flag = True
if pkt[UDP].sport == udp_port and pkt[IP].src == config['vlanServer']['ip'] and pkt[IP].dst == config['esp_vlanServer_ip']:
vlanServer_return_flag = True
assert vlanServer_forward_flag and vlanServer_return_flag
# TCP Test
def tcp_server(serverip: str, port: int) -> None:
print(f'TCP server listening on IP: {serverip} port: {port}')
run_cmd(f'timeout 10s iperf3 -s -p {port}')
def tcp_client(serverip: str, port: int) -> None:
time.sleep(1)
print(run_cmd_sec(f'timeout 10s ip netns exec ns_vlanClient iperf3 -c {serverip} -p {port} --bidir -k 20'))
def tcp_server_client_comm(config: dict, port: int) -> None:
server_thread = threading.Thread(target=tcp_server, args=(config['vlanServer']['ip'], port,))
client_thread = threading.Thread(target=tcp_client, args=(config['vlanServer']['ip'], port))
server_thread.start()
client_thread.start()
server_thread.join()
client_thread.join()
print('TCP Test Done')
def tcp_lfilter(packet: layers.l2.Ether) -> layers.l2.Ether:
return TCP in packet and (packet[TCP].dport == tcp_port or packet[TCP].sport == tcp_port)
def tcp_test(config: dict) -> None:
setup_network(config)
capture = AsyncSniffer(iface=config['vlanServer']['name'],
filter='tcp',
lfilter=tcp_lfilter,
count=10)
# Start sniffing
capture.start()
time.sleep(1)
# Run network test commands here
tcp_server_client_comm(config, tcp_port)
# Stop sniffing
capture.join(timeout=20)
vlanServer_pkt_list = capture.results
clear_network(config)
# Test Validation
vlanServer_forward_flag = False
vlanServer_return_flag = False
if vlanServer_pkt_list is None:
print('Failure: No packets captured')
assert False
print(f"Captured: {len(vlanServer_pkt_list)} packets on interface {config['vlanServer']['name']}")
for pkt in vlanServer_pkt_list:
print('Summary: ', pkt.summary())
if TCP in pkt:
if pkt[TCP].dport == tcp_port and pkt[IP].src == config['esp_vlanServer_ip'] and pkt[IP].dst == config['vlanServer']['ip']:
vlanServer_forward_flag = True
if pkt[TCP].sport == tcp_port and pkt[IP].src == config['vlanServer']['ip'] and pkt[IP].dst == config['esp_vlanServer_ip']:
vlanServer_return_flag = True
assert vlanServer_forward_flag and vlanServer_return_flag
@pytest.mark.esp32
@pytest.mark.ethernet_vlan
def test_vlan_napt_pingtest(dut: Dut) -> None:
dut.expect('main_task: Returned from app_main()')
test_conf = create_config(dut)
ping_test(test_conf)
@pytest.mark.esp32
@pytest.mark.ethernet_vlan
def test_vlan_napt_udptest(dut: Dut) -> None:
dut.expect('main_task: Returned from app_main()')
test_conf = create_config(dut)
udp_test(test_conf)
@pytest.mark.esp32
@pytest.mark.ethernet_vlan
def test_vlan_napt_tcptest(dut: Dut) -> None:
dut.expect('main_task: Returned from app_main()')
test_conf = create_config(dut)
tcp_test(test_conf)

View File

@ -0,0 +1,5 @@
CONFIG_LWIP_NUM_NETIF_CLIENT_DATA=1
CONFIG_EXAMPLE_EXTRA_VLAN_INTERFACE=y
CONFIG_EXAMPLE_VLAN_NAPT_IF=y
CONFIG_LWIP_IP_FORWARD=y
CONFIG_LWIP_IPV4_NAPT=y

View File

@ -1,4 +1,7 @@
# This file was generated using idf.py save-defconfig. It can be edited manually.
# Espressif IoT Development Framework (ESP-IDF) Project Minimal Configuration
#
CONFIG_EXAMPLE_EXTRA_VLAN_INTERFACE=y
CONFIG_LWIP_IP_FORWARD=y
CONFIG_LWIP_IPV4_NAPT=y
CONFIG_LWIP_NUM_NETIF_CLIENT_DATA=1