mirror of
https://github.com/espressif/esp-idf.git
synced 2024-10-05 20:47:46 -04:00
166 lines
5.6 KiB
Python
166 lines
5.6 KiB
Python
|
#!/usr/bin/env python
|
||
|
#
|
||
|
# SPDX-FileCopyrightText: 2018-2022 Espressif Systems (Shanghai) CO LTD
|
||
|
# SPDX-License-Identifier: Apache-2.0
|
||
|
|
||
|
from __future__ import division, print_function, unicode_literals
|
||
|
|
||
|
import logging
|
||
|
import os
|
||
|
import random
|
||
|
import socket
|
||
|
import string
|
||
|
import sys
|
||
|
import threading
|
||
|
import time
|
||
|
from builtins import range
|
||
|
|
||
|
import pytest
|
||
|
|
||
|
try:
|
||
|
from idf_http_server_test import client
|
||
|
except ModuleNotFoundError:
|
||
|
sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', '..', '..', 'tools', 'ci', 'python_packages'))
|
||
|
from idf_http_server_test import client
|
||
|
|
||
|
from pytest_embedded import Dut
|
||
|
|
||
|
|
||
|
class http_client_thread(threading.Thread):
|
||
|
def __init__(self, ip: str, port: int, delay: int) -> None:
|
||
|
threading.Thread.__init__(self)
|
||
|
self.ip = ip
|
||
|
self.port = port
|
||
|
self.delay = delay
|
||
|
self.exc = 0
|
||
|
|
||
|
# Thread function used to open a socket and wait for specific amount of time before returning
|
||
|
def open_connection(self, ip: str, port: int, delay: int) -> None:
|
||
|
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||
|
s.settimeout(delay)
|
||
|
s.connect((ip, port))
|
||
|
time.sleep(delay)
|
||
|
|
||
|
def run(self) -> None:
|
||
|
try:
|
||
|
self.open_connection(self.ip, self.port, self.delay)
|
||
|
except socket.timeout:
|
||
|
self.exc = 1
|
||
|
|
||
|
def join(self, timeout=None): # type: ignore
|
||
|
threading.Thread.join(self)
|
||
|
if self.exc:
|
||
|
raise socket.timeout
|
||
|
|
||
|
|
||
|
# When running on local machine execute the following before running this script
|
||
|
# > make app bootloader
|
||
|
# > make print_flash_cmd | tail -n 1 > build/download.config
|
||
|
|
||
|
|
||
|
@pytest.mark.esp32
|
||
|
@pytest.mark.esp32c3
|
||
|
@pytest.mark.esp32s2
|
||
|
@pytest.mark.esp32s3
|
||
|
@pytest.mark.wifi
|
||
|
def test_examples_protocol_http_server_simple(dut: Dut) -> None:
|
||
|
|
||
|
# Get binary file
|
||
|
binary_file = os.path.join(dut.app.binary_path, 'simple.bin')
|
||
|
bin_size = os.path.getsize(binary_file)
|
||
|
logging.info('http_server_bin_size : {}KB'.format(bin_size // 1024))
|
||
|
|
||
|
# Upload binary and start testing
|
||
|
logging.info('Starting http_server simple test app')
|
||
|
|
||
|
# Parse IP address of STA
|
||
|
logging.info('Waiting to connect with AP')
|
||
|
got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)', timeout=30)[1].decode()
|
||
|
got_port = dut.expect(r"(?:[\s\S]*)Starting server on port: '(\d+)'", timeout=30)[1].decode()
|
||
|
|
||
|
logging.info('Got IP : {}'.format(got_ip))
|
||
|
logging.info('Got Port : {}'.format(got_port))
|
||
|
|
||
|
# Expected Logs
|
||
|
dut.expect('Registering URI handlers', timeout=30)
|
||
|
|
||
|
# Run test script
|
||
|
# If failed raise appropriate exception
|
||
|
logging.info('Test /hello GET handler')
|
||
|
if not client.test_get_handler(got_ip, str(got_port)):
|
||
|
raise RuntimeError
|
||
|
|
||
|
# Acquire host IP. Need a way to check it
|
||
|
dut.expect(r'(?:[\s\S]*)Found header => Host: (\d+.\d+.\d+.\d+)', timeout=30)
|
||
|
|
||
|
# Match additional headers sent in the request
|
||
|
dut.expect('Found header => Test-Header-2: Test-Value-2', timeout=30)
|
||
|
dut.expect('Found header => Test-Header-1: Test-Value-1', timeout=30)
|
||
|
dut.expect('Found URL query parameter => query1=value1', timeout=30)
|
||
|
dut.expect('Found URL query parameter => query3=value3', timeout=30)
|
||
|
dut.expect('Found URL query parameter => query2=value2', timeout=30)
|
||
|
dut.expect('Request headers lost', timeout=30)
|
||
|
|
||
|
logging.info('Test /ctrl PUT handler and realtime handler de/registration')
|
||
|
if not client.test_put_handler(got_ip, got_port):
|
||
|
raise RuntimeError
|
||
|
dut.expect('Unregistering /hello and /echo URIs', timeout=30)
|
||
|
dut.expect('Registering /hello and /echo URIs', timeout=30)
|
||
|
|
||
|
# Generate random data of 10KB
|
||
|
random_data = ''.join(string.printable[random.randint(0,len(string.printable)) - 1] for _ in range(10 * 1024))
|
||
|
logging.info('Test /echo POST handler with random data')
|
||
|
if not client.test_post_handler(got_ip, got_port, random_data):
|
||
|
raise RuntimeError
|
||
|
|
||
|
query = 'http://foobar'
|
||
|
logging.info('Test /hello with custom query : {}'.format(query))
|
||
|
if not client.test_custom_uri_query(got_ip, got_port, query):
|
||
|
raise RuntimeError
|
||
|
dut.expect('Found URL query => ' + query, timeout=30)
|
||
|
|
||
|
query = 'abcd+1234%20xyz'
|
||
|
logging.info('Test /hello with custom query : {}'.format(query))
|
||
|
if not client.test_custom_uri_query(got_ip, got_port, query):
|
||
|
raise RuntimeError
|
||
|
dut.expect_exact('Found URL query => ' + query, timeout=30)
|
||
|
|
||
|
|
||
|
@pytest.mark.esp32
|
||
|
@pytest.mark.esp32c3
|
||
|
@pytest.mark.esp32s2
|
||
|
@pytest.mark.esp32s3
|
||
|
@pytest.mark.wifi
|
||
|
def test_examples_protocol_http_server_lru_purge_enable(dut: Dut) -> None:
|
||
|
|
||
|
# Get binary file
|
||
|
binary_file = os.path.join(dut.app.binary_path, 'simple.bin')
|
||
|
bin_size = os.path.getsize(binary_file)
|
||
|
logging.info('http_server_bin_size : {}KB'.format(bin_size // 1024))
|
||
|
|
||
|
# Upload binary and start testing
|
||
|
logging.info('Starting http_server simple test app')
|
||
|
|
||
|
# Parse IP address of STA
|
||
|
logging.info('Waiting to connect with AP')
|
||
|
got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)', timeout=30)[1].decode()
|
||
|
got_port = dut.expect(r"(?:[\s\S]*)Starting server on port: '(\d+)'", timeout=30)[1].decode()
|
||
|
|
||
|
logging.info('Got IP : {}'.format(got_ip))
|
||
|
logging.info('Got Port : {}'.format(got_port))
|
||
|
|
||
|
# Expected Logs
|
||
|
dut.expect('Registering URI handlers', timeout=30)
|
||
|
threads = []
|
||
|
# Open 20 sockets, one from each thread
|
||
|
for _ in range(20):
|
||
|
try:
|
||
|
thread = http_client_thread(got_ip, (int(got_port)), 20)
|
||
|
thread.start()
|
||
|
threads.append(thread)
|
||
|
except OSError as err:
|
||
|
logging.info('Error: unable to start thread, {}'.format(err))
|
||
|
|
||
|
for t in threads:
|
||
|
t.join()
|