2024-01-18 11:00:01 +01:00
|
|
|
# SPDX-FileCopyrightText: 2022-2024 Espressif Systems (Shanghai) CO LTD
|
2022-03-31 14:52:27 +02:00
|
|
|
# SPDX-License-Identifier: CC0-1.0
|
|
|
|
import contextlib
|
|
|
|
import logging
|
|
|
|
import os
|
|
|
|
import socket
|
|
|
|
import sys
|
|
|
|
from typing import Iterator
|
|
|
|
|
|
|
|
import pytest
|
|
|
|
from pytest_embedded import Dut
|
2024-01-18 11:00:01 +01:00
|
|
|
from scapy.all import Ether
|
|
|
|
from scapy.all import raw
|
2022-03-31 14:52:27 +02:00
|
|
|
|
|
|
|
ETH_TYPE_1 = 0x2220
|
|
|
|
ETH_TYPE_2 = 0x2221
|
|
|
|
ETH_TYPE_3 = 0x2223
|
|
|
|
|
|
|
|
|
|
|
|
@contextlib.contextmanager
|
|
|
|
def configure_eth_if(eth_type: int, target_if: str='') -> Iterator[socket.socket]:
|
|
|
|
if target_if == '':
|
|
|
|
# try to determine which interface to use
|
|
|
|
netifs = os.listdir('/sys/class/net/')
|
2024-01-18 11:00:01 +01:00
|
|
|
# order matters - ETH NIC with the highest number is connected to DUT on CI runner
|
|
|
|
netifs.sort(reverse=True)
|
2022-03-31 14:52:27 +02:00
|
|
|
logging.info('detected interfaces: %s', str(netifs))
|
|
|
|
for netif in netifs:
|
|
|
|
if netif.find('eth') == 0 or netif.find('enx') == 0 or netif.find('enp') == 0 or netif.find('eno') == 0:
|
|
|
|
target_if = netif
|
|
|
|
break
|
|
|
|
if target_if == '':
|
|
|
|
raise Exception('no network interface found')
|
|
|
|
logging.info('Use %s for testing', target_if)
|
|
|
|
|
|
|
|
so = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.htons(eth_type))
|
|
|
|
so.bind((target_if, 0))
|
|
|
|
|
|
|
|
try:
|
|
|
|
yield so
|
|
|
|
finally:
|
|
|
|
so.close()
|
|
|
|
|
|
|
|
|
|
|
|
def send_recv_eth_frame(payload_str: str, eth_type: int, dest_mac: str, eth_if: str='') -> str:
|
|
|
|
with configure_eth_if(eth_type, eth_if) as so:
|
|
|
|
so.settimeout(10)
|
|
|
|
eth_frame = Ether(dst=dest_mac, src=so.getsockname()[4], type=eth_type) / raw(payload_str.encode())
|
|
|
|
try:
|
|
|
|
so.send(raw(eth_frame))
|
|
|
|
logging.info('Sent %d bytes to %s', len(eth_frame), dest_mac)
|
|
|
|
logging.info('Sent msg: "%s"', payload_str)
|
|
|
|
|
|
|
|
eth_frame_repl = Ether(so.recv(128))
|
|
|
|
if eth_frame_repl.type == eth_type:
|
|
|
|
logging.info('Received %d bytes echoed from %s', len(eth_frame_repl), eth_frame_repl.src)
|
|
|
|
logging.info('Echoed msg: "%s"', eth_frame_repl.load.decode())
|
|
|
|
except Exception as e:
|
|
|
|
raise e
|
|
|
|
# return echoed message and remove possible null characters which might have been appended since
|
|
|
|
# minimal size of Ethernet frame to be transmitted physical layer is 60B (not including CRC)
|
|
|
|
return str(eth_frame_repl.load.decode().rstrip('\x00'))
|
|
|
|
|
|
|
|
|
|
|
|
def recv_eth_frame(eth_type: int, eth_if: str='') -> str:
|
|
|
|
with configure_eth_if(eth_type, eth_if) as so:
|
|
|
|
so.settimeout(10)
|
|
|
|
try:
|
|
|
|
eth_frame = Ether(so.recv(128))
|
|
|
|
if eth_frame.type == eth_type:
|
|
|
|
logging.info('Received %d bytes from %s', len(eth_frame), eth_frame.src)
|
|
|
|
logging.info('Received msg: "%s"', eth_frame.load.decode())
|
|
|
|
except Exception as e:
|
|
|
|
raise e
|
|
|
|
return str(eth_frame.load.decode().rstrip('\x00'))
|
|
|
|
|
|
|
|
|
|
|
|
def actual_test(dut: Dut) -> None:
|
|
|
|
# Get DUT's MAC address
|
|
|
|
res = dut.expect(
|
|
|
|
r'([\s\S]*)'
|
|
|
|
r'Ethernet HW Addr ([0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2})'
|
|
|
|
)
|
|
|
|
dut_mac = res.group(2)
|
|
|
|
|
|
|
|
# Receive "ESP32 Hello frame"
|
|
|
|
recv_eth_frame(ETH_TYPE_3)
|
|
|
|
|
|
|
|
# Sent a message and receive its echo
|
|
|
|
message = 'ESP32 test message with EthType ' + hex(ETH_TYPE_1)
|
|
|
|
echoed = send_recv_eth_frame(message, ETH_TYPE_1, dut_mac)
|
|
|
|
if echoed == message:
|
|
|
|
logging.info('PASS')
|
|
|
|
else:
|
|
|
|
raise Exception('Echoed message does not match!')
|
|
|
|
message = 'ESP32 test message with EthType ' + hex(ETH_TYPE_2)
|
|
|
|
echoed = send_recv_eth_frame(message, ETH_TYPE_2, dut_mac)
|
|
|
|
if echoed == message:
|
|
|
|
logging.info('PASS')
|
|
|
|
else:
|
|
|
|
raise Exception('Echoed message does not match!')
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.esp32 # internally tested using ESP32 with IP101 but may support all targets with SPI Ethernet
|
2024-01-18 11:00:01 +01:00
|
|
|
@pytest.mark.eth_ip101
|
2022-03-31 14:52:27 +02:00
|
|
|
@pytest.mark.flaky(reruns=3, reruns_delay=5)
|
|
|
|
def test_esp_netif_l2tap_example(dut: Dut) -> None:
|
|
|
|
actual_test(dut)
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
|
logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO)
|
|
|
|
message_1 = 'ESP32 test message with EthType ' + hex(ETH_TYPE_1)
|
|
|
|
message_2 = 'ESP32 test message with EthType ' + hex(ETH_TYPE_2)
|
|
|
|
# Usage: pytest_example_l2tap_echo.py [<dest_mac_address>] [<host_eth_interface>]
|
|
|
|
if sys.argv[2:]: # if two arguments provided:
|
|
|
|
send_recv_eth_frame(message_1, ETH_TYPE_1, sys.argv[1], sys.argv[2])
|
|
|
|
send_recv_eth_frame(message_2, ETH_TYPE_2, sys.argv[1], sys.argv[2])
|
|
|
|
elif sys.argv[1:]: # if one argument provided:
|
|
|
|
send_recv_eth_frame(message_1, ETH_TYPE_1, sys.argv[1])
|
|
|
|
send_recv_eth_frame(message_2, ETH_TYPE_2, sys.argv[1])
|
|
|
|
else: # if no argument provided:
|
|
|
|
send_recv_eth_frame(message_1, ETH_TYPE_1, 'ff:ff:ff:ff:ff:ff')
|
|
|
|
send_recv_eth_frame(message_2, ETH_TYPE_2, 'ff:ff:ff:ff:ff:ff')
|