mirror of
https://github.com/espressif/esp-idf.git
synced 2024-10-05 20:47:46 -04:00
http examples pytest migration
This commit is contained in:
parent
aa3ddbc3c6
commit
83ace52a36
@ -1,99 +0,0 @@
|
||||
import os
|
||||
import re
|
||||
|
||||
import ttfw_idf
|
||||
|
||||
|
||||
@ttfw_idf.idf_example_test(env_tag='Example_EthKitV1')
|
||||
def test_examples_protocol_esp_http_client(env, extra_data):
|
||||
"""
|
||||
steps: |
|
||||
1. join AP
|
||||
2. Send HTTP request to httpbin.org
|
||||
"""
|
||||
dut1 = env.get_dut('esp_http_client', 'examples/protocols/esp_http_client', dut_class=ttfw_idf.ESP32DUT)
|
||||
# check and log bin size
|
||||
binary_file = os.path.join(dut1.app.binary_path, 'esp_http_client_example.bin')
|
||||
bin_size = os.path.getsize(binary_file)
|
||||
ttfw_idf.log_performance('esp_http_client_bin_size', '{}KB'.format(bin_size // 1024))
|
||||
# start test
|
||||
dut1.start_app()
|
||||
dut1.expect('Connected to AP, begin http example', timeout=30)
|
||||
dut1.expect(re.compile(r'HTTP GET Status = 200, content_length = (\d)'))
|
||||
dut1.expect(re.compile(r'HTTP POST Status = 200, content_length = (\d)'))
|
||||
dut1.expect(re.compile(r'HTTP PUT Status = 200, content_length = (\d)'))
|
||||
dut1.expect(re.compile(r'HTTP PATCH Status = 200, content_length = (\d)'))
|
||||
dut1.expect(re.compile(r'HTTP DELETE Status = 200, content_length = (\d)'))
|
||||
dut1.expect(re.compile(r'HTTP HEAD Status = 200, content_length = (\d)'))
|
||||
dut1.expect(re.compile(r'HTTP GET Status = 200, content_length = (\d)'))
|
||||
dut1.expect(re.compile(r'HTTP POST Status = 200, content_length = (\d)'))
|
||||
dut1.expect(re.compile(r'HTTP PUT Status = 200, content_length = (\d)'))
|
||||
dut1.expect(re.compile(r'HTTP PATCH Status = 200, content_length = (\d)'))
|
||||
dut1.expect(re.compile(r'HTTP DELETE Status = 200, content_length = (\d)'))
|
||||
dut1.expect(re.compile(r'HTTP HEAD Status = 200, content_length = (\d)'))
|
||||
dut1.expect(re.compile(r'HTTP Basic Auth Status = 200, content_length = (\d)'))
|
||||
dut1.expect(re.compile(r'HTTP Basic Auth redirect Status = 200, content_length = (\d)'))
|
||||
dut1.expect(re.compile(r'HTTP Digest Auth Status = 200, content_length = (\d)'))
|
||||
dut1.expect(re.compile(r'HTTP Relative path redirect Status = 200, content_length = (\d)'))
|
||||
dut1.expect(re.compile(r'HTTP Absolute path redirect Status = 200, content_length = (\d)'))
|
||||
dut1.expect(re.compile(r'HTTP Absolute path redirect \(manual\) Status = 200, content_length = (\d)'))
|
||||
dut1.expect(re.compile(r'HTTPS Status = 200, content_length = (\d)'))
|
||||
dut1.expect(re.compile(r'HTTPS Status = 200, content_length = (\d)'))
|
||||
dut1.expect(re.compile(r'HTTP redirect to HTTPS Status = 200, content_length = (\d)'))
|
||||
dut1.expect(re.compile(r'HTTP chunk encoding Status = 200, content_length = (-?\d)'))
|
||||
# content-len for chunked encoding is typically -1, could be a positive length in some cases
|
||||
dut1.expect(re.compile(r'HTTP Stream reader Status = 200, content_length = (\d)'))
|
||||
dut1.expect(re.compile(r'HTTPS Status = 200, content_length = (\d)'))
|
||||
dut1.expect(re.compile(r'Last esp error code: 0x8001'))
|
||||
dut1.expect(re.compile(r'HTTP GET Status = 200, content_length = (\d)'))
|
||||
dut1.expect(re.compile(r'HTTP POST Status = 200, content_length = (\d)'))
|
||||
dut1.expect(re.compile(r'HTTP Status = 206, content_length = (\d)'))
|
||||
dut1.expect(re.compile(r'HTTP Status = 206, content_length = 10'))
|
||||
dut1.expect(re.compile(r'HTTP Status = 206, content_length = 10'))
|
||||
dut1.expect('Finish http example')
|
||||
|
||||
# test mbedtls dynamic resource
|
||||
dut1 = env.get_dut('esp_http_client', 'examples/protocols/esp_http_client', dut_class=ttfw_idf.ESP32DUT, app_config_name='ssldyn')
|
||||
# check and log bin size
|
||||
binary_file = os.path.join(dut1.app.binary_path, 'esp_http_client_example.bin')
|
||||
bin_size = os.path.getsize(binary_file)
|
||||
ttfw_idf.log_performance('esp_http_client_bin_size', '{}KB'.format(bin_size // 1024))
|
||||
# start test
|
||||
dut1.start_app()
|
||||
dut1.expect('Connected to AP, begin http example', timeout=30)
|
||||
dut1.expect(re.compile(r'HTTP GET Status = 200, content_length = (\d)'))
|
||||
dut1.expect(re.compile(r'HTTP POST Status = 200, content_length = (\d)'))
|
||||
dut1.expect(re.compile(r'HTTP PUT Status = 200, content_length = (\d)'))
|
||||
dut1.expect(re.compile(r'HTTP PATCH Status = 200, content_length = (\d)'))
|
||||
dut1.expect(re.compile(r'HTTP DELETE Status = 200, content_length = (\d)'))
|
||||
dut1.expect(re.compile(r'HTTP HEAD Status = 200, content_length = (\d)'))
|
||||
dut1.expect(re.compile(r'HTTP GET Status = 200, content_length = (\d)'))
|
||||
dut1.expect(re.compile(r'HTTP POST Status = 200, content_length = (\d)'))
|
||||
dut1.expect(re.compile(r'HTTP PUT Status = 200, content_length = (\d)'))
|
||||
dut1.expect(re.compile(r'HTTP PATCH Status = 200, content_length = (\d)'))
|
||||
dut1.expect(re.compile(r'HTTP DELETE Status = 200, content_length = (\d)'))
|
||||
dut1.expect(re.compile(r'HTTP HEAD Status = 200, content_length = (\d)'))
|
||||
dut1.expect(re.compile(r'HTTP Basic Auth Status = 200, content_length = (\d)'))
|
||||
dut1.expect(re.compile(r'HTTP Basic Auth redirect Status = 200, content_length = (\d)'))
|
||||
dut1.expect(re.compile(r'HTTP Digest Auth Status = 200, content_length = (\d)'))
|
||||
dut1.expect(re.compile(r'HTTP Relative path redirect Status = 200, content_length = (\d)'))
|
||||
dut1.expect(re.compile(r'HTTP Absolute path redirect Status = 200, content_length = (\d)'))
|
||||
dut1.expect(re.compile(r'HTTP Absolute path redirect \(manual\) Status = 200, content_length = (\d)'))
|
||||
dut1.expect(re.compile(r'HTTPS Status = 200, content_length = (\d)'))
|
||||
dut1.expect(re.compile(r'HTTPS Status = 200, content_length = (\d)'))
|
||||
dut1.expect(re.compile(r'HTTP redirect to HTTPS Status = 200, content_length = (\d)'))
|
||||
dut1.expect(re.compile(r'HTTP chunk encoding Status = 200, content_length = (-?\d)'))
|
||||
# content-len for chunked encoding is typically -1, could be a positive length in some cases
|
||||
dut1.expect(re.compile(r'HTTP Stream reader Status = 200, content_length = (\d)'))
|
||||
dut1.expect(re.compile(r'HTTPS Status = 200, content_length = (\d)'))
|
||||
dut1.expect(re.compile(r'Last esp error code: 0x8001'))
|
||||
dut1.expect(re.compile(r'HTTP GET Status = 200, content_length = (\d)'))
|
||||
dut1.expect(re.compile(r'HTTP POST Status = 200, content_length = (\d)'))
|
||||
dut1.expect(re.compile(r'HTTP Status = 206, content_length = (\d)'))
|
||||
dut1.expect(re.compile(r'HTTP Status = 206, content_length = 10'))
|
||||
dut1.expect(re.compile(r'HTTP Status = 206, content_length = 10'))
|
||||
dut1.expect('Finish http example')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
test_examples_protocol_esp_http_client()
|
99
examples/protocols/esp_http_client/pytest_esp_http_client.py
Normal file
99
examples/protocols/esp_http_client/pytest_esp_http_client.py
Normal file
@ -0,0 +1,99 @@
|
||||
# SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD
|
||||
# SPDX-License-Identifier: Unlicense OR CC0-1.0
|
||||
import logging
|
||||
import os
|
||||
|
||||
import pytest
|
||||
from pytest_embedded import Dut
|
||||
|
||||
|
||||
@pytest.mark.esp32
|
||||
@pytest.mark.esp32c3
|
||||
@pytest.mark.esp32s2
|
||||
@pytest.mark.esp32s3
|
||||
@pytest.mark.ethernet
|
||||
def test_examples_protocol_esp_http_client(dut: Dut) -> None:
|
||||
"""
|
||||
steps: |
|
||||
1. join AP
|
||||
2. Send HTTP request to httpbin.org
|
||||
"""
|
||||
binary_file = os.path.join(dut.app.binary_path, 'esp_http_client_example.bin')
|
||||
bin_size = os.path.getsize(binary_file)
|
||||
logging.info('esp_http_client_bin_size : {}KB'.format(bin_size // 1024))
|
||||
# start test
|
||||
dut.expect('Connected to AP, begin http example', timeout=30)
|
||||
dut.expect(r'HTTP GET Status = 200, content_length = (\d)')
|
||||
dut.expect(r'HTTP POST Status = 200, content_length = (\d)')
|
||||
dut.expect(r'HTTP PUT Status = 200, content_length = (\d)')
|
||||
dut.expect(r'HTTP PATCH Status = 200, content_length = (\d)')
|
||||
dut.expect(r'HTTP DELETE Status = 200, content_length = (\d)')
|
||||
dut.expect(r'HTTP HEAD Status = 200, content_length = (\d)')
|
||||
dut.expect(r'HTTP GET Status = 200, content_length = (\d)')
|
||||
dut.expect(r'HTTP POST Status = 200, content_length = (\d)')
|
||||
dut.expect(r'HTTP PUT Status = 200, content_length = (\d)')
|
||||
dut.expect(r'HTTP PATCH Status = 200, content_length = (\d)')
|
||||
dut.expect(r'HTTP DELETE Status = 200, content_length = (\d)')
|
||||
dut.expect(r'HTTP HEAD Status = 200, content_length = (\d)')
|
||||
dut.expect(r'HTTP Basic Auth Status = 200, content_length = (\d)')
|
||||
dut.expect(r'HTTP Basic Auth redirect Status = 200, content_length = (\d)')
|
||||
dut.expect(r'HTTP Digest Auth Status = 200, content_length = (\d)')
|
||||
dut.expect(r'HTTP Relative path redirect Status = 200, content_length = (\d)')
|
||||
dut.expect(r'HTTP Absolute path redirect Status = 200, content_length = (\d)')
|
||||
dut.expect(r'HTTP Absolute path redirect \(manual\) Status = 200, content_length = (\d)')
|
||||
dut.expect(r'HTTPS Status = 200, content_length = (\d)')
|
||||
dut.expect(r'HTTPS Status = 200, content_length = (\d)')
|
||||
dut.expect(r'HTTP redirect to HTTPS Status = 200, content_length = (\d)')
|
||||
dut.expect(r'HTTP chunk encoding Status = 200, content_length = (-?\d)')
|
||||
# content-len for chunked encoding is typically -1, could be a positive length in some cases
|
||||
dut.expect(r'HTTP Stream reader Status = 200, content_length = (\d)')
|
||||
dut.expect(r'HTTPS Status = 200, content_length = (\d)')
|
||||
dut.expect(r'Last esp error code: 0x8001')
|
||||
dut.expect(r'HTTP GET Status = 200, content_length = (\d)')
|
||||
dut.expect(r'HTTP POST Status = 200, content_length = (\d)')
|
||||
dut.expect(r'HTTP Status = 206, content_length = (\d)')
|
||||
dut.expect(r'HTTP Status = 206, content_length = 10')
|
||||
dut.expect(r'HTTP Status = 206, content_length = 10')
|
||||
dut.expect('Finish http example')
|
||||
|
||||
|
||||
@pytest.mark.parametrize('config', [pytest.param('ssldyn', marks=[pytest.mark.supported_targets, pytest.mark.ethernet]),], indirect=True)
|
||||
def test_examples_protocol_esp_http_client_dynamic_buffer(dut: Dut) -> None:
|
||||
# test mbedtls dynamic resource
|
||||
# check and log bin size
|
||||
binary_file = os.path.join(dut.app.binary_path, 'esp_http_client_example.bin')
|
||||
bin_size = os.path.getsize(binary_file)
|
||||
logging.info('esp_http_client_bin_size : {}KB'.format(bin_size // 1024))
|
||||
|
||||
dut.expect('Connected to AP, begin http example', timeout=30)
|
||||
dut.expect(r'HTTP GET Status = 200, content_length = (\d)')
|
||||
dut.expect(r'HTTP POST Status = 200, content_length = (\d)')
|
||||
dut.expect(r'HTTP PUT Status = 200, content_length = (\d)')
|
||||
dut.expect(r'HTTP PATCH Status = 200, content_length = (\d)')
|
||||
dut.expect(r'HTTP DELETE Status = 200, content_length = (\d)')
|
||||
dut.expect(r'HTTP HEAD Status = 200, content_length = (\d)')
|
||||
dut.expect(r'HTTP GET Status = 200, content_length = (\d)')
|
||||
dut.expect(r'HTTP POST Status = 200, content_length = (\d)')
|
||||
dut.expect(r'HTTP PUT Status = 200, content_length = (\d)')
|
||||
dut.expect(r'HTTP PATCH Status = 200, content_length = (\d)')
|
||||
dut.expect(r'HTTP DELETE Status = 200, content_length = (\d)')
|
||||
dut.expect(r'HTTP HEAD Status = 200, content_length = (\d)')
|
||||
dut.expect(r'HTTP Basic Auth Status = 200, content_length = (\d)')
|
||||
dut.expect(r'HTTP Basic Auth redirect Status = 200, content_length = (\d)')
|
||||
dut.expect(r'HTTP Relative path redirect Status = 200, content_length = (\d)')
|
||||
dut.expect(r'HTTP Absolute path redirect Status = 200, content_length = (\d)')
|
||||
dut.expect(r'HTTP Absolute path redirect \(manual\) Status = 200, content_length = (\d)')
|
||||
dut.expect(r'HTTPS Status = 200, content_length = (\d)')
|
||||
dut.expect(r'HTTPS Status = 200, content_length = (\d)')
|
||||
dut.expect(r'HTTP redirect to HTTPS Status = 200, content_length = (\d)')
|
||||
dut.expect(r'HTTP chunk encoding Status = 200, content_length = (-?\d)')
|
||||
# content-len for chunked encoding is typically -1, could be a positive length in some cases
|
||||
dut.expect(r'HTTP Stream reader Status = 200, content_length = (\d)')
|
||||
dut.expect(r'HTTPS Status = 200, content_length = (\d)')
|
||||
dut.expect(r'Last esp error code: 0x8001')
|
||||
dut.expect(r'HTTP GET Status = 200, content_length = (\d)')
|
||||
dut.expect(r'HTTP POST Status = 200, content_length = (\d)')
|
||||
dut.expect(r'HTTP Status = 206, content_length = (\d)')
|
||||
dut.expect(r'HTTP Status = 206, content_length = 10')
|
||||
dut.expect(r'HTTP Status = 206, content_length = 10')
|
||||
dut.expect('Finish http example')
|
@ -4,11 +4,11 @@
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
import http.client
|
||||
import logging
|
||||
import os
|
||||
|
||||
import tiny_test_fw
|
||||
import ttfw_idf
|
||||
from tiny_test_fw import Utility
|
||||
import pytest
|
||||
from pytest_embedded import Dut
|
||||
|
||||
HTTP_OK = 200
|
||||
TEST_SERVER = 'http2.github.io'
|
||||
@ -22,14 +22,18 @@ def is_test_server_available(): # type: () -> bool
|
||||
resp = conn.getresponse()
|
||||
return True if resp.status == HTTP_OK else False
|
||||
except Exception as msg:
|
||||
Utility.console_log('Exception occurred when connecting to {}: {}'.format(TEST_SERVER, msg))
|
||||
logging.info('Exception occurred when connecting to {}: {}'.format(TEST_SERVER, msg))
|
||||
return False
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
@ttfw_idf.idf_example_test(env_tag='Example_EthKitV1')
|
||||
def test_examples_protocol_http2_request(env, extra_data): # type: (tiny_test_fw.Env.Env, None) -> None # pylint: disable=unused-argument
|
||||
@pytest.mark.esp32
|
||||
@pytest.mark.esp32c3
|
||||
@pytest.mark.esp32s2
|
||||
@pytest.mark.esp32s3
|
||||
@pytest.mark.ethernet
|
||||
def test_examples_protocol_http2_request(dut: Dut) -> None:
|
||||
"""
|
||||
steps: |
|
||||
1. join AP
|
||||
@ -37,25 +41,19 @@ def test_examples_protocol_http2_request(env, extra_data): # type: (tiny_test_f
|
||||
3. send http2 request
|
||||
4. send http2 put response
|
||||
"""
|
||||
dut1 = env.get_dut('http2_request', 'examples/protocols/http2_request', dut_class=ttfw_idf.ESP32DUT)
|
||||
# check and log bin size
|
||||
binary_file = os.path.join(dut1.app.binary_path, 'http2_request.bin')
|
||||
binary_file = os.path.join(dut.app.binary_path, 'http2_request.bin')
|
||||
bin_size = os.path.getsize(binary_file)
|
||||
ttfw_idf.log_performance('http2_request_bin_size', '{}KB'.format(bin_size // 1024))
|
||||
logging.info('http2_request_bin_size : {}KB'.format(bin_size // 1024))
|
||||
# start the test
|
||||
# check if test server is avilable
|
||||
test_server_available = is_test_server_available()
|
||||
# Skip the test if the server test server (http2.github.io) is not available at the moment.
|
||||
if test_server_available:
|
||||
Utility.console_log('test server \"{}\" is available'.format(TEST_SERVER))
|
||||
dut1.start_app()
|
||||
logging.info('test server \"{}\" is available'.format(TEST_SERVER))
|
||||
# check for connection
|
||||
dut1.expect('Connection done', timeout=30)
|
||||
dut.expect('Connection done', timeout=30)
|
||||
# check for get response
|
||||
dut1.expect('[get-response] Frame fully received')
|
||||
dut.expect('Frame fully received')
|
||||
else:
|
||||
Utility.console_log('test server \"{0}\" is not available at the moment.\nSkipping the test with status = success.'.format(TEST_SERVER))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
test_examples_protocol_http2_request() # pylint: disable=no-value-for-parameter
|
||||
logging.info('test server \"{0}\" is not available at the moment.\nSkipping the test with status = success.'.format(TEST_SERVER))
|
@ -1,41 +0,0 @@
|
||||
#!/usr/bin/env python
|
||||
#
|
||||
# SPDX-FileCopyrightText: 2021 Espressif Systems (Shanghai) CO LTD
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
import os
|
||||
import re
|
||||
|
||||
import tiny_test_fw
|
||||
import ttfw_idf
|
||||
|
||||
|
||||
@ttfw_idf.idf_example_test(env_tag='Example_EthKitV1')
|
||||
def test_examples_protocol_http_request(env, extra_data): # type: (tiny_test_fw.Env.Env, None) -> None # pylint: disable=unused-argument
|
||||
"""
|
||||
steps: |
|
||||
1. join AP
|
||||
2. connect to example.com
|
||||
3. check conneciton success
|
||||
"""
|
||||
dut1 = env.get_dut('http_request', 'examples/protocols/http_request', dut_class=ttfw_idf.ESP32DUT)
|
||||
# check and log bin size
|
||||
binary_file = os.path.join(dut1.app.binary_path, 'http_request.bin')
|
||||
bin_size = os.path.getsize(binary_file)
|
||||
ttfw_idf.log_performance('http_request_bin_size', '{}KB'.format(bin_size // 1024))
|
||||
# start test
|
||||
dut1.start_app()
|
||||
dut1.expect(re.compile(r'DNS lookup succeeded.'), timeout=30)
|
||||
# check if connected or not
|
||||
dut1.expect(' ... connected', timeout=60)
|
||||
dut1.expect(' ... socket send success')
|
||||
dut1.expect(' ... set socket receiving timeout success')
|
||||
# check server response
|
||||
dut1.expect(re.compile(r'HTTP/1.0 200 OK'))
|
||||
# read from the socket completed
|
||||
dut1.expect('... done reading from socket. Last read return=0 errno=128')
|
||||
dut1.expect(re.compile(r'(\d)...'))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
test_examples_protocol_http_request() # pylint: disable=no-value-for-parameter
|
39
examples/protocols/http_request/pytest_http_request.py
Normal file
39
examples/protocols/http_request/pytest_http_request.py
Normal file
@ -0,0 +1,39 @@
|
||||
#!/usr/bin/env python
|
||||
#
|
||||
# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
import logging
|
||||
import os
|
||||
|
||||
import pytest
|
||||
from pytest_embedded import Dut
|
||||
|
||||
|
||||
@pytest.mark.esp32
|
||||
@pytest.mark.esp32c3
|
||||
@pytest.mark.esp32s2
|
||||
@pytest.mark.esp32s3
|
||||
@pytest.mark.ethernet
|
||||
def test_examples_protocol_http_request(dut: Dut) -> None:
|
||||
"""
|
||||
steps: |
|
||||
1. join AP
|
||||
2. connect to example.com
|
||||
3. check conneciton success
|
||||
"""
|
||||
# check and log bin size
|
||||
binary_file = os.path.join(dut.app.binary_path, 'http_request.bin')
|
||||
bin_size = os.path.getsize(binary_file)
|
||||
logging.info('http_request_bin_size : {}KB'.format(bin_size // 1024))
|
||||
# start test
|
||||
dut.expect(r'DNS lookup succeeded.', timeout=30)
|
||||
# check if connected or not
|
||||
dut.expect(' ... connected', timeout=60)
|
||||
dut.expect(' ... socket send success')
|
||||
dut.expect(' ... set socket receiving timeout success')
|
||||
# check server response
|
||||
dut.expect(r'HTTP/1.0 200 OK')
|
||||
# read from the socket completed
|
||||
dut.expect('... done reading from socket. Last read return=0 errno=128')
|
||||
dut.expect(r'(\d)...')
|
@ -1,27 +1,23 @@
|
||||
#!/usr/bin/env python
|
||||
#
|
||||
# Copyright 2018 Espressif Systems (Shanghai) PTE LTD
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
# SPDX-FileCopyrightText: 2018-2022 Espressif Systems (Shanghai) CO LTD
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
from __future__ import division, print_function, unicode_literals
|
||||
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
|
||||
import ttfw_idf
|
||||
from idf_http_server_test import test as client
|
||||
from tiny_test_fw import Utility
|
||||
import pytest
|
||||
|
||||
try:
|
||||
from idf_http_server_test import test as client
|
||||
except ModuleNotFoundError:
|
||||
sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', '..', '..', 'tools', 'ci', 'python_packages'))
|
||||
from idf_http_server_test import test as client
|
||||
|
||||
from pytest_embedded import Dut
|
||||
|
||||
# When running on local machine execute the following before running this script
|
||||
# > make app bootloader
|
||||
@ -35,46 +31,47 @@ from tiny_test_fw import Utility
|
||||
# features to this component.
|
||||
|
||||
|
||||
@ttfw_idf.idf_example_test(env_tag='Example_WIFI_Protocols')
|
||||
def test_examples_protocol_http_server_advanced(env, extra_data):
|
||||
# Acquire DUT
|
||||
dut1 = env.get_dut('http_server', 'examples/protocols/http_server/advanced_tests', dut_class=ttfw_idf.ESP32DUT)
|
||||
@pytest.mark.esp32
|
||||
@pytest.mark.esp32c3
|
||||
@pytest.mark.esp32s2
|
||||
@pytest.mark.esp32s3
|
||||
@pytest.mark.wifi
|
||||
def test_examples_protocol_http_server_advanced(dut: Dut) -> None:
|
||||
|
||||
# Get binary file
|
||||
binary_file = os.path.join(dut1.app.binary_path, 'tests.bin')
|
||||
binary_file = os.path.join(dut.app.binary_path, 'tests.bin')
|
||||
bin_size = os.path.getsize(binary_file)
|
||||
ttfw_idf.log_performance('http_server_bin_size', '{}KB'.format(bin_size // 1024))
|
||||
logging.info('http_server_bin_size : {}KB'.format(bin_size // 1024))
|
||||
|
||||
# Upload binary and start testing
|
||||
Utility.console_log('Starting http_server advanced test app')
|
||||
dut1.start_app()
|
||||
logging.info('Starting http_server advanced test app')
|
||||
|
||||
# Parse IP address of STA
|
||||
Utility.console_log('Waiting to connect with AP')
|
||||
got_ip = dut1.expect(re.compile(r'(?:[\s\S]*)IPv4 address: (\d+.\d+.\d+.\d+)'), timeout=30)[0]
|
||||
logging.info('Waiting to connect with AP')
|
||||
got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)', timeout=30)[1].decode()
|
||||
|
||||
got_port = dut1.expect(re.compile(r"(?:[\s\S]*)Started HTTP server on port: '(\d+)'"), timeout=15)[0]
|
||||
result = dut1.expect(re.compile(r"(?:[\s\S]*)Max URI handlers: '(\d+)'(?:[\s\S]*)Max Open Sessions: " # noqa: W605
|
||||
r"'(\d+)'(?:[\s\S]*)Max Header Length: '(\d+)'(?:[\s\S]*)Max URI Length: "
|
||||
r"'(\d+)'(?:[\s\S]*)Max Stack Size: '(\d+)'"), timeout=15)
|
||||
# max_uri_handlers = int(result[0])
|
||||
max_sessions = int(result[1])
|
||||
max_hdr_len = int(result[2])
|
||||
max_uri_len = int(result[3])
|
||||
max_stack_size = int(result[4])
|
||||
got_port = dut.expect(r"(?:[\s\S]*)Started HTTP server on port: '(\d+)'", timeout=30)[1].decode()
|
||||
|
||||
Utility.console_log('Got IP : ' + got_ip)
|
||||
Utility.console_log('Got Port : ' + got_port)
|
||||
result = dut.expect(r"(?:[\s\S]*)Max URI handlers: '(\d+)'(?:[\s\S]*)Max Open Sessions: " # noqa: W605
|
||||
r"'(\d+)'(?:[\s\S]*)Max Header Length: '(\d+)'(?:[\s\S]*)Max URI Length: "
|
||||
r"'(\d+)'(?:[\s\S]*)Max Stack Size: '(\d+)'", timeout=15)
|
||||
# max_uri_handlers = int(result[1])
|
||||
max_sessions = int(result[2])
|
||||
max_hdr_len = int(result[3])
|
||||
max_uri_len = int(result[4])
|
||||
max_stack_size = int(result[5])
|
||||
|
||||
logging.info('Got Port : {}'.format(got_port))
|
||||
logging.info('Got IP : {}'.format(got_ip))
|
||||
|
||||
# Run test script
|
||||
# If failed raise appropriate exception
|
||||
failed = False
|
||||
|
||||
Utility.console_log('Sessions and Context Tests...')
|
||||
logging.info('Sessions and Context Tests...')
|
||||
if not client.spillover_session(got_ip, got_port, max_sessions):
|
||||
Utility.console_log('Ignoring failure')
|
||||
logging.info('Ignoring failure')
|
||||
if not client.parallel_sessions_adder(got_ip, got_port, max_sessions):
|
||||
Utility.console_log('Ignoring failure')
|
||||
logging.info('Ignoring failure')
|
||||
if not client.leftover_data_test(got_ip, got_port):
|
||||
failed = True
|
||||
if not client.async_response_test(got_ip, got_port):
|
||||
@ -87,19 +84,19 @@ def test_examples_protocol_http_server_advanced(env, extra_data):
|
||||
# This test fails a lot! Enable when connection is stable
|
||||
# test_size = 50*1024 # 50KB
|
||||
# if not client.packet_size_limit_test(got_ip, got_port, test_size):
|
||||
# Utility.console_log("Ignoring failure")
|
||||
# logging.info("Ignoring failure")
|
||||
|
||||
Utility.console_log('Getting initial stack usage...')
|
||||
logging.info('Getting initial stack usage...')
|
||||
if not client.get_hello(got_ip, got_port):
|
||||
failed = True
|
||||
|
||||
inital_stack = int(dut1.expect(re.compile(r"(?:[\s\S]*)Free Stack for server task: '(\d+)'"), timeout=15)[0])
|
||||
inital_stack = int(dut.expect(r"(?:[\s\S]*)Free Stack for server task: '(\d+)'", timeout=15)[1])
|
||||
|
||||
if inital_stack < 0.1 * max_stack_size:
|
||||
Utility.console_log('More than 90% of stack being used on server start')
|
||||
logging.info('More than 90% of stack being used on server start')
|
||||
failed = True
|
||||
|
||||
Utility.console_log('Basic HTTP Client Tests...')
|
||||
logging.info('Basic HTTP Client Tests...')
|
||||
if not client.get_hello(got_ip, got_port):
|
||||
failed = True
|
||||
if not client.post_hello(got_ip, got_port):
|
||||
@ -121,7 +118,7 @@ def test_examples_protocol_http_server_advanced(env, extra_data):
|
||||
if not client.get_test_headers(got_ip, got_port):
|
||||
failed = True
|
||||
|
||||
Utility.console_log('Error code tests...')
|
||||
logging.info('Error code tests...')
|
||||
if not client.code_500_server_error_test(got_ip, got_port):
|
||||
failed = True
|
||||
if not client.code_501_method_not_impl(got_ip, got_port):
|
||||
@ -137,25 +134,21 @@ def test_examples_protocol_http_server_advanced(env, extra_data):
|
||||
if not client.code_408_req_timeout(got_ip, got_port):
|
||||
failed = True
|
||||
if not client.code_414_uri_too_long(got_ip, got_port, max_uri_len):
|
||||
Utility.console_log('Ignoring failure')
|
||||
logging.info('Ignoring failure')
|
||||
if not client.code_431_hdr_too_long(got_ip, got_port, max_hdr_len):
|
||||
Utility.console_log('Ignoring failure')
|
||||
logging.info('Ignoring failure')
|
||||
if not client.test_upgrade_not_supported(got_ip, got_port):
|
||||
failed = True
|
||||
|
||||
Utility.console_log('Getting final stack usage...')
|
||||
logging.info('Getting final stack usage...')
|
||||
if not client.get_hello(got_ip, got_port):
|
||||
failed = True
|
||||
|
||||
final_stack = int(dut1.expect(re.compile(r"(?:[\s\S]*)Free Stack for server task: '(\d+)'"), timeout=15)[0])
|
||||
final_stack = int(dut.expect(r"(?:[\s\S]*)Free Stack for server task: '(\d+)'", timeout=15)[1])
|
||||
|
||||
if final_stack < 0.05 * max_stack_size:
|
||||
Utility.console_log('More than 95% of stack got used during tests')
|
||||
logging.info('More than 95% of stack got used during tests')
|
||||
failed = True
|
||||
|
||||
if failed:
|
||||
raise RuntimeError
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
test_examples_protocol_http_server_advanced()
|
@ -1,30 +1,19 @@
|
||||
#!/usr/bin/env python
|
||||
#
|
||||
# Copyright 2021 Espressif Systems (Shanghai) PTE LTD
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
from __future__ import print_function
|
||||
|
||||
import http.client
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import socket
|
||||
import sys
|
||||
|
||||
import tiny_test_fw
|
||||
import ttfw_idf
|
||||
from tiny_test_fw import Utility
|
||||
import pexpect
|
||||
import pytest
|
||||
from pytest_embedded import Dut
|
||||
|
||||
try:
|
||||
import wifi_tools
|
||||
@ -35,7 +24,7 @@ except ImportError:
|
||||
import wifi_tools
|
||||
|
||||
|
||||
def test_redirect(ip, port): # type: (str, str) -> str # pylint: disable=unused-argument
|
||||
def test_redirect(ip: str, port: str) -> str:
|
||||
# Establish HTTP connection
|
||||
sess = http.client.HTTPConnection(ip + ':' + port, timeout=15)
|
||||
|
||||
@ -60,7 +49,7 @@ def test_redirect(ip, port): # type: (str, str) -> str # pylint: disable=unused
|
||||
return uri
|
||||
|
||||
|
||||
def test_captive_page(ip, port, uri): # type: (str, str, str) -> bool # pylint: disable=unused-argument
|
||||
def test_captive_page(ip: str, port: str, uri: str) -> bool:
|
||||
# Establish HTTP connection
|
||||
sess = http.client.HTTPConnection(ip + ':' + port, timeout=15)
|
||||
|
||||
@ -79,25 +68,27 @@ def test_captive_page(ip, port, uri): # type: (str, str, str) -> bool # pylint:
|
||||
return True
|
||||
|
||||
|
||||
@ttfw_idf.idf_example_test(env_tag='Example_WIFI_BT', ignore=True)
|
||||
def test_example_captive_portal(env, extra_data): # type: (tiny_test_fw.Env.Env, None) -> None # pylint: disable=unused-argument
|
||||
# Acquire DUT
|
||||
dut1 = env.get_dut('captive_portal', 'examples/protocols/http_server/captive_portal', dut_class=ttfw_idf.ESP32DUT)
|
||||
@pytest.mark.esp32
|
||||
@pytest.mark.esp32c3
|
||||
@pytest.mark.esp32s2
|
||||
@pytest.mark.esp32s3
|
||||
@pytest.mark.wifi_bt
|
||||
@pytest.mark.xfail(reason='Runner unable to connect to target over WiFi', run=False)
|
||||
def test_example_captive_portal(dut: Dut) -> None:
|
||||
|
||||
# Get binary file
|
||||
binary_file = os.path.join(dut1.app.binary_path, 'captive_portal.bin')
|
||||
binary_file = os.path.join(dut.app.binary_path, 'captive_portal.bin')
|
||||
bin_size = os.path.getsize(binary_file)
|
||||
ttfw_idf.log_performance('captive_portal_bin_size', '{}KB'.format(bin_size // 1024))
|
||||
|
||||
# Upload binary and start testing
|
||||
dut1.start_app()
|
||||
logging.info('captive_portal_bin_size : {}KB'.format(bin_size // 1024))
|
||||
|
||||
# Parse IP address of STA
|
||||
Utility.console_log('Waiting to connect with softAP')
|
||||
ap_ip = dut1.expect(re.compile(r'Set up softAP with IP: (\d+.\d+.\d+.\d+)'), timeout=60)[0]
|
||||
logging.info('Waiting to connect with softAP')
|
||||
ap_ip = dut.expect(r'Set up softAP with IP: (\d+.\d+.\d+.\d+)', timeout=60)[1]
|
||||
|
||||
[ssid, password] = dut1.expect(re.compile(r"wifi_init_softap finished. SSID:'(\S+)' password:'(\S+)'"), timeout=30)
|
||||
port = dut1.expect(re.compile(r"(?:[\s\S]*)Starting server on port: '(\d+)'"), timeout=30)[0]
|
||||
wifi_info = dut.expect(r"wifi_init_softap finished. SSID:'(\S+)' password:'(\S+)'", timeout=30)
|
||||
ssid = wifi_info[1].decode()
|
||||
password = wifi_info[2].decode()
|
||||
port = dut.expect(r"(?:[\s\S]*)Starting server on port: '(\d+)'", timeout=30)[1]
|
||||
|
||||
iface = wifi_tools.get_wiface_name()
|
||||
if iface is None:
|
||||
@ -112,17 +103,16 @@ def test_example_captive_portal(env, extra_data): # type: (tiny_test_fw.Env.Env
|
||||
try:
|
||||
ip = ctrl.connect(ssid, password)
|
||||
except RuntimeError as err:
|
||||
Utility.console_log('error: {}'.format(err))
|
||||
logging.info('error: {}'.format(err))
|
||||
try:
|
||||
got_ip = dut1.expect(re.compile(r'DHCP server assigned IP to a station, IP is: (\d+.\d+.\d+.\d+)'), timeout=60)
|
||||
Utility.console_log('got_ip: {}'.format(got_ip))
|
||||
got_ip = got_ip[0]
|
||||
got_ip = dut.expect(r'DHCP server assigned IP to a station, IP is: (\d+.\d+.\d+.\d+)', timeout=60)[1].decode()
|
||||
logging.info('got_ip: {}'.format(got_ip))
|
||||
if ip != got_ip:
|
||||
raise RuntimeError('SoftAP connected to another host! {} != {}'.format(ip, got_ip))
|
||||
except tiny_test_fw.DUT.ExpectTimeout:
|
||||
except pexpect.exceptions.TIMEOUT:
|
||||
# print what is happening on DUT side
|
||||
Utility.console_log('in exception tiny_test_fw.DUT.ExpectTimeout')
|
||||
Utility.console_log(dut1.read())
|
||||
logging.info('in exception tiny_test_fw.DUT.ExpectTimeout')
|
||||
logging.info(dut.read())
|
||||
raise
|
||||
print('Connected to DUT SoftAP')
|
||||
|
||||
@ -136,7 +126,3 @@ def test_example_captive_portal(env, extra_data): # type: (tiny_test_fw.Env.Env
|
||||
test_captive_page(ap_ip, port, uri)
|
||||
finally:
|
||||
ctrl.reset()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
test_example_captive_portal() # pylint: disable=no-value-for-parameter
|
@ -1,136 +0,0 @@
|
||||
#!/usr/bin/env python
|
||||
#
|
||||
# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
import hashlib
|
||||
import http.client
|
||||
import os
|
||||
import re
|
||||
|
||||
import tiny_test_fw
|
||||
import ttfw_idf
|
||||
from idf_http_server_test import adder as client
|
||||
from tiny_test_fw import Utility
|
||||
|
||||
|
||||
@ttfw_idf.idf_example_test(env_tag='Example_WIFI_Protocols')
|
||||
def test_examples_protocol_http_server_file_serving(env, _): # type: (tiny_test_fw.Env.Env, None) -> None
|
||||
# Acquire DUT
|
||||
dut1 = env.get_dut('http file_serving', 'examples/protocols/http_server/file_serving', dut_class=ttfw_idf.ESP32DUT, app_config_name='spiffs')
|
||||
|
||||
# Get binary file
|
||||
binary_file = os.path.join(dut1.app.binary_path, 'file_server.bin')
|
||||
bin_size = os.path.getsize(binary_file)
|
||||
ttfw_idf.log_performance('file_server_bin_size', '{}KB'.format(bin_size // 1024))
|
||||
Utility.console_log('Erasing the flash on the chip')
|
||||
# erase the flash
|
||||
dut1.erase_flash()
|
||||
# Upload binary and start testing
|
||||
Utility.console_log('Starting http file serving simple test app')
|
||||
dut1.start_app()
|
||||
|
||||
dut1.expect('Initializing SPIFFS', timeout=30)
|
||||
# Parse IP address of STA
|
||||
Utility.console_log('Waiting to connect with AP')
|
||||
got_ip = dut1.expect(re.compile(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)'), timeout=30)[0]
|
||||
# Expected logs
|
||||
got_port = dut1.expect(re.compile(r"Starting HTTP Server on port: '(\d+)'"), timeout=30)[0]
|
||||
Utility.console_log('Got IP : ' + got_ip)
|
||||
Utility.console_log('Got Port : ' + got_port)
|
||||
|
||||
# Run test script
|
||||
conn = client.start_session(got_ip, got_port)
|
||||
|
||||
# upload a file onto the server
|
||||
upload_data = 'Test data to be sent to the server'
|
||||
|
||||
upload_file_name = 'example.txt'
|
||||
upload_file_hash = hashlib.md5(upload_data.encode('UTF-8'))
|
||||
upload_file_digest = upload_file_hash.digest()
|
||||
Utility.console_log('\nTesting the uploading of file on the file server')
|
||||
client.postreq(conn, '/upload/' + str(upload_file_name), upload_data)
|
||||
|
||||
try:
|
||||
dut1.expect('File reception complete', timeout=10)
|
||||
except Exception:
|
||||
Utility.console_log('Failed the test to upload file on the file server')
|
||||
raise
|
||||
Utility.console_log('Passed the test to uploaded file on the file server')
|
||||
|
||||
# Download the uploaded file from the file server
|
||||
Utility.console_log("\nTesting for Download of \"existing\" file from the file server")
|
||||
|
||||
download_data = client.getreq(conn, '/' + str(upload_file_name))
|
||||
|
||||
try:
|
||||
dut1.expect('File sending complete', timeout=10)
|
||||
except Exception:
|
||||
Utility.console_log('Failed the test to download existing file from the file server')
|
||||
raise
|
||||
Utility.console_log('Passed the test to downloaded existing file from the file server')
|
||||
|
||||
download_file_hash = hashlib.md5(download_data)
|
||||
download_file_digest = download_file_hash.digest()
|
||||
|
||||
if download_file_digest != upload_file_digest:
|
||||
raise RuntimeError('The md5 hash of the downloaded file does not match with that of the uploaded file')
|
||||
|
||||
# Upload existing file on the file server
|
||||
Utility.console_log("\nTesting the upload of \"already existing\" file on the file server")
|
||||
client.postreq(conn, '/upload/' + str(upload_file_name), data=None)
|
||||
try:
|
||||
dut1.expect('File already exists : /data/' + str(upload_file_name), timeout=10)
|
||||
except Exception:
|
||||
Utility.console_log('Failed the test for uploading existing file on the file server')
|
||||
raise
|
||||
Utility.console_log('Passed the test for uploading existing file on the file server')
|
||||
# Previous URI was an invalid URI so the server should have closed the connection.
|
||||
# Trying to send request to the server
|
||||
try:
|
||||
client.getreq(conn, '/')
|
||||
except http.client.RemoteDisconnected:
|
||||
# It is correct behavior that the connection was closed by the server
|
||||
pass
|
||||
except Exception:
|
||||
Utility.console_log('Connection was not closed successfully by the server after last invalid URI')
|
||||
raise
|
||||
|
||||
conn = client.start_session(got_ip, got_port)
|
||||
# Delete the existing file from the file server
|
||||
Utility.console_log("\nTesting the deletion of \"existing\" file on the file server")
|
||||
client.postreq(conn, '/delete/' + str(upload_file_name), data=None)
|
||||
try:
|
||||
dut1.expect('Deleting file : /' + str(upload_file_name), timeout=10)
|
||||
except Exception:
|
||||
Utility.console_log('Failed the test for deletion of existing file on the file server')
|
||||
raise
|
||||
Utility.console_log('Passed the test for deletion of existing file on the file server')
|
||||
|
||||
conn = client.start_session(got_ip, got_port)
|
||||
# Try to delete non existing file from the file server
|
||||
Utility.console_log("\nTesting the deletion of \"non existing\" file on the file server")
|
||||
client.postreq(conn, '/delete/' + str(upload_file_name), data=None)
|
||||
try:
|
||||
dut1.expect('File does not exist : /' + str(upload_file_name), timeout=10)
|
||||
except Exception:
|
||||
Utility.console_log('Failed the test for deleting non existing file on the file server')
|
||||
raise
|
||||
Utility.console_log('Passed the test for deleting non existing file on the file server')
|
||||
|
||||
conn = client.start_session(got_ip, got_port)
|
||||
# Try to download non existing file from the file server
|
||||
Utility.console_log("\nTesting for Download of \"non existing\" file from the file server")
|
||||
|
||||
download_data = client.getreq(conn, '/' + str(upload_file_name))
|
||||
|
||||
try:
|
||||
dut1.expect('Failed to stat file : /data/' + str(upload_file_name), timeout=10)
|
||||
except Exception:
|
||||
Utility.console_log('Failed the test to download non existing file from the file server')
|
||||
raise
|
||||
Utility.console_log('Passed the test to downloaded non existing file from the file server')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
test_examples_protocol_http_server_file_serving()
|
@ -0,0 +1,137 @@
|
||||
#!/usr/bin/env python
|
||||
#
|
||||
# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
import hashlib
|
||||
import http.client
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
|
||||
import pytest
|
||||
|
||||
try:
|
||||
from idf_http_server_test import adder as client
|
||||
except ModuleNotFoundError:
|
||||
sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', '..', '..', 'tools', 'ci', 'python_packages'))
|
||||
from idf_http_server_test import adder as client
|
||||
|
||||
from pytest_embedded import Dut
|
||||
|
||||
|
||||
@pytest.mark.esp32
|
||||
@pytest.mark.esp32c3
|
||||
@pytest.mark.esp32s2
|
||||
@pytest.mark.esp32s3
|
||||
@pytest.mark.parametrize('config', ['spiffs',], indirect=True)
|
||||
def test_examples_protocol_http_server_file_serving(dut: Dut) -> None:
|
||||
|
||||
# Get binary file
|
||||
binary_file = os.path.join(dut.app.binary_path, 'file_server.bin')
|
||||
bin_size = os.path.getsize(binary_file)
|
||||
logging.info('file_server_bin_size : {}KB'.format(bin_size // 1024))
|
||||
logging.info('Erasing the flash on the chip')
|
||||
# Upload binary and start testing
|
||||
logging.info('Starting http file serving simple test app')
|
||||
|
||||
dut.expect('Initializing SPIFFS', timeout=30)
|
||||
# Parse IP address of STA
|
||||
logging.info('Waiting to connect with AP')
|
||||
got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)', timeout=30)[1].decode()
|
||||
# Expected logs
|
||||
got_port = dut.expect(r"Starting HTTP Server on port: '(\d+)'", timeout=30)[1].decode()
|
||||
logging.info('Got IP : {}'.format(got_ip))
|
||||
logging.info('Got Port : {}'.format(got_port))
|
||||
|
||||
# Run test script
|
||||
conn = client.start_session(got_ip, got_port)
|
||||
|
||||
# upload a file onto the server
|
||||
upload_data = 'Test data to be sent to the server'
|
||||
|
||||
upload_file_name = 'example.txt'
|
||||
upload_file_hash = hashlib.md5(upload_data.encode('UTF-8'))
|
||||
upload_file_digest = upload_file_hash.digest()
|
||||
logging.info('\nTesting the uploading of file on the file server')
|
||||
client.postreq(conn, '/upload/' + str(upload_file_name), upload_data)
|
||||
|
||||
try:
|
||||
dut.expect('File reception complete', timeout=10)
|
||||
except Exception:
|
||||
logging.info('Failed the test to upload file on the file server')
|
||||
raise
|
||||
logging.info('Passed the test to uploaded file on the file server')
|
||||
|
||||
# Download the uploaded file from the file server
|
||||
logging.info("\nTesting for Download of \"existing\" file from the file server")
|
||||
|
||||
download_data = client.getreq(conn, '/' + str(upload_file_name))
|
||||
|
||||
try:
|
||||
dut.expect('File sending complete', timeout=10)
|
||||
except Exception:
|
||||
logging.info('Failed the test to download existing file from the file server')
|
||||
raise
|
||||
logging.info('Passed the test to downloaded existing file from the file server')
|
||||
|
||||
download_file_hash = hashlib.md5(download_data)
|
||||
download_file_digest = download_file_hash.digest()
|
||||
|
||||
if download_file_digest != upload_file_digest:
|
||||
raise RuntimeError('The md5 hash of the downloaded file does not match with that of the uploaded file')
|
||||
|
||||
# Upload existing file on the file server
|
||||
logging.info("\nTesting the upload of \"already existing\" file on the file server")
|
||||
client.postreq(conn, '/upload/' + str(upload_file_name), data=None)
|
||||
try:
|
||||
dut.expect('File already exists : /data/' + str(upload_file_name), timeout=10)
|
||||
except Exception:
|
||||
logging.info('Failed the test for uploading existing file on the file server')
|
||||
raise
|
||||
logging.info('Passed the test for uploading existing file on the file server')
|
||||
# Previous URI was an invalid URI so the server should have closed the connection.
|
||||
# Trying to send request to the server
|
||||
try:
|
||||
client.getreq(conn, '/')
|
||||
except http.client.RemoteDisconnected:
|
||||
# It is correct behavior that the connection was closed by the server
|
||||
pass
|
||||
except Exception:
|
||||
logging.info('Connection was not closed successfully by the server after last invalid URI')
|
||||
raise
|
||||
|
||||
conn = client.start_session(got_ip, got_port)
|
||||
# Delete the existing file from the file server
|
||||
logging.info("\nTesting the deletion of \"existing\" file on the file server")
|
||||
client.postreq(conn, '/delete/' + str(upload_file_name), data=None)
|
||||
try:
|
||||
dut.expect('Deleting file : /' + str(upload_file_name), timeout=10)
|
||||
except Exception:
|
||||
logging.info('Failed the test for deletion of existing file on the file server')
|
||||
raise
|
||||
logging.info('Passed the test for deletion of existing file on the file server')
|
||||
|
||||
conn = client.start_session(got_ip, got_port)
|
||||
# Try to delete non existing file from the file server
|
||||
logging.info("\nTesting the deletion of \"non existing\" file on the file server")
|
||||
client.postreq(conn, '/delete/' + str(upload_file_name), data=None)
|
||||
try:
|
||||
dut.expect('File does not exist : /' + str(upload_file_name), timeout=10)
|
||||
except Exception:
|
||||
logging.info('Failed the test for deleting non existing file on the file server')
|
||||
raise
|
||||
logging.info('Passed the test for deleting non existing file on the file server')
|
||||
|
||||
conn = client.start_session(got_ip, got_port)
|
||||
# Try to download non existing file from the file server
|
||||
logging.info("\nTesting for Download of \"non existing\" file from the file server")
|
||||
|
||||
download_data = client.getreq(conn, '/' + str(upload_file_name))
|
||||
|
||||
try:
|
||||
dut.expect('Failed to stat file : /data/' + str(upload_file_name), timeout=10)
|
||||
except Exception:
|
||||
logging.info('Failed the test to download non existing file from the file server')
|
||||
raise
|
||||
logging.info('Passed the test to downloaded non existing file from the file server')
|
@ -1,126 +0,0 @@
|
||||
#!/usr/bin/env python
|
||||
#
|
||||
# Copyright 2018 Espressif Systems (Shanghai) PTE LTD
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from __future__ import division, print_function, unicode_literals
|
||||
|
||||
import os
|
||||
import random
|
||||
import re
|
||||
from builtins import range, str
|
||||
|
||||
import ttfw_idf
|
||||
from idf_http_server_test import adder as client
|
||||
from tiny_test_fw import Utility
|
||||
|
||||
# When running on local machine execute the following before running this script
|
||||
# > make app bootloader
|
||||
# > make print_flash_cmd | tail -n 1 > build/download.config
|
||||
|
||||
|
||||
@ttfw_idf.idf_example_test(env_tag='Example_WIFI_Protocols')
|
||||
def test_examples_protocol_http_server_persistence(env, extra_data):
|
||||
# Acquire DUT
|
||||
dut1 = env.get_dut('http_server', 'examples/protocols/http_server/persistent_sockets',
|
||||
dut_class=ttfw_idf.ESP32DUT)
|
||||
|
||||
# Get binary file
|
||||
binary_file = os.path.join(dut1.app.binary_path, 'persistent_sockets.bin')
|
||||
bin_size = os.path.getsize(binary_file)
|
||||
ttfw_idf.log_performance('http_server_bin_size', '{}KB'.format(bin_size // 1024))
|
||||
|
||||
# Upload binary and start testing
|
||||
Utility.console_log('Starting http_server persistance test app')
|
||||
dut1.start_app()
|
||||
|
||||
# Parse IP address of STA
|
||||
Utility.console_log('Waiting to connect with AP')
|
||||
got_ip = dut1.expect(re.compile(r'(?:[\s\S]*)IPv4 address: (\d+.\d+.\d+.\d+)'), timeout=30)[0]
|
||||
got_port = dut1.expect(re.compile(r"(?:[\s\S]*)Starting server on port: '(\d+)'"), timeout=30)[0]
|
||||
|
||||
Utility.console_log('Got IP : ' + got_ip)
|
||||
Utility.console_log('Got Port : ' + got_port)
|
||||
|
||||
# Expected Logs
|
||||
dut1.expect('Registering URI handlers', timeout=30)
|
||||
|
||||
# Run test script
|
||||
conn = client.start_session(got_ip, got_port)
|
||||
visitor = 0
|
||||
adder = 0
|
||||
|
||||
# Test PUT request and initialize session context
|
||||
num = random.randint(0,100)
|
||||
client.putreq(conn, '/adder', str(num))
|
||||
visitor += 1
|
||||
dut1.expect('/adder visitor count = ' + str(visitor), timeout=30)
|
||||
dut1.expect('/adder PUT handler read ' + str(num), timeout=30)
|
||||
dut1.expect('PUT allocating new session', timeout=30)
|
||||
|
||||
# Retest PUT request and change session context value
|
||||
num = random.randint(0,100)
|
||||
Utility.console_log('Adding: ' + str(num))
|
||||
client.putreq(conn, '/adder', str(num))
|
||||
visitor += 1
|
||||
adder += num
|
||||
dut1.expect('/adder visitor count = ' + str(visitor), timeout=30)
|
||||
dut1.expect('/adder PUT handler read ' + str(num), timeout=30)
|
||||
try:
|
||||
# Re allocation shouldn't happen
|
||||
dut1.expect('PUT allocating new session', timeout=30)
|
||||
# Not expected
|
||||
raise RuntimeError
|
||||
except Exception:
|
||||
# As expected
|
||||
pass
|
||||
|
||||
# Test POST request and session persistence
|
||||
random_nums = [random.randint(0,100) for _ in range(100)]
|
||||
for num in random_nums:
|
||||
Utility.console_log('Adding: ' + str(num))
|
||||
client.postreq(conn, '/adder', str(num))
|
||||
visitor += 1
|
||||
adder += num
|
||||
dut1.expect('/adder visitor count = ' + str(visitor), timeout=30)
|
||||
dut1.expect('/adder handler read ' + str(num), timeout=30)
|
||||
|
||||
# Test GET request and session persistence
|
||||
Utility.console_log('Matching final sum: ' + str(adder))
|
||||
if client.getreq(conn, '/adder').decode() != str(adder):
|
||||
raise RuntimeError
|
||||
visitor += 1
|
||||
dut1.expect('/adder visitor count = ' + str(visitor), timeout=30)
|
||||
dut1.expect('/adder GET handler send ' + str(adder), timeout=30)
|
||||
|
||||
Utility.console_log('Ending session')
|
||||
# Close connection and check for invocation of context "Free" function
|
||||
client.end_session(conn)
|
||||
dut1.expect('/adder Free Context function called', timeout=30)
|
||||
|
||||
Utility.console_log('Validating user context data')
|
||||
# Start another session to check user context data
|
||||
client.start_session(got_ip, got_port)
|
||||
num = random.randint(0,100)
|
||||
client.putreq(conn, '/adder', str(num))
|
||||
visitor += 1
|
||||
dut1.expect('/adder visitor count = ' + str(visitor), timeout=30)
|
||||
dut1.expect('/adder PUT handler read ' + str(num), timeout=30)
|
||||
dut1.expect('PUT allocating new session', timeout=30)
|
||||
client.end_session(conn)
|
||||
dut1.expect('/adder Free Context function called', timeout=30)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
test_examples_protocol_http_server_persistence()
|
@ -0,0 +1,118 @@
|
||||
#!/usr/bin/env python
|
||||
#
|
||||
# SPDX-FileCopyrightText: 2018-2022 Espressif Systems (Shanghai) CO LTD
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
from __future__ import division, print_function, unicode_literals
|
||||
|
||||
import logging
|
||||
import os
|
||||
import random
|
||||
import sys
|
||||
from builtins import range, str
|
||||
|
||||
import pytest
|
||||
|
||||
try:
|
||||
from idf_http_server_test import adder as client
|
||||
except ModuleNotFoundError:
|
||||
sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', '..', '..', 'tools', 'ci', 'python_packages'))
|
||||
from idf_http_server_test import adder as client
|
||||
|
||||
from pytest_embedded import Dut
|
||||
|
||||
# When running on local machine execute the following before running this script
|
||||
# > make app bootloader
|
||||
# > make print_flash_cmd | tail -n 1 > build/download.config
|
||||
|
||||
|
||||
@pytest.mark.esp32
|
||||
@pytest.mark.esp32c3
|
||||
@pytest.mark.esp32s2
|
||||
@pytest.mark.esp32s3
|
||||
@pytest.mark.wifi
|
||||
def test_examples_protocol_http_server_persistence(dut: Dut) -> None:
|
||||
|
||||
# Get binary file
|
||||
binary_file = os.path.join(dut.app.binary_path, 'persistent_sockets.bin')
|
||||
bin_size = os.path.getsize(binary_file)
|
||||
logging.info('http_server_bin_size : {}KB'.format(bin_size // 1024))
|
||||
|
||||
# Upload binary and start testing
|
||||
logging.info('Starting http_server persistance test app')
|
||||
|
||||
# Parse IP address of STA
|
||||
logging.info('Waiting to connect with AP')
|
||||
got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)', timeout=30)[1].decode()
|
||||
got_port = dut.expect(r"(?:[\s\S]*)Starting server on port: '(\d+)'", timeout=30)[1].decode()
|
||||
|
||||
logging.info('Got IP : {}'.format(got_ip))
|
||||
logging.info('Got Port : {}'.format(got_port))
|
||||
|
||||
# Expected Logs
|
||||
dut.expect('Registering URI handlers', timeout=30)
|
||||
|
||||
# Run test script
|
||||
conn = client.start_session(got_ip, got_port)
|
||||
visitor = 0
|
||||
adder = 0
|
||||
|
||||
# Test PUT request and initialize session context
|
||||
num = random.randint(0,100)
|
||||
client.putreq(conn, '/adder', str(num))
|
||||
visitor += 1
|
||||
dut.expect('/adder visitor count = ' + str(visitor), timeout=30)
|
||||
dut.expect('/adder PUT handler read ' + str(num), timeout=30)
|
||||
dut.expect('PUT allocating new session', timeout=30)
|
||||
|
||||
# Retest PUT request and change session context value
|
||||
num = random.randint(0,100)
|
||||
logging.info('Adding: {}'.format(num))
|
||||
client.putreq(conn, '/adder', str(num))
|
||||
visitor += 1
|
||||
adder += num
|
||||
dut.expect('/adder visitor count = ' + str(visitor), timeout=30)
|
||||
dut.expect('/adder PUT handler read ' + str(num), timeout=30)
|
||||
try:
|
||||
# Re allocation shouldn't happen
|
||||
dut.expect('PUT allocating new session', timeout=30)
|
||||
# Not expected
|
||||
raise RuntimeError
|
||||
except Exception:
|
||||
# As expected
|
||||
pass
|
||||
|
||||
# Test POST request and session persistence
|
||||
random_nums = [random.randint(0,100) for _ in range(100)]
|
||||
for num in random_nums:
|
||||
logging.info('Adding: {}'.format(num))
|
||||
client.postreq(conn, '/adder', str(num))
|
||||
visitor += 1
|
||||
adder += num
|
||||
dut.expect('/adder visitor count = ' + str(visitor), timeout=30)
|
||||
dut.expect('/adder handler read ' + str(num), timeout=30)
|
||||
|
||||
# Test GET request and session persistence
|
||||
logging.info('Matching final sum: {}'.format(adder))
|
||||
if client.getreq(conn, '/adder').decode() != str(adder):
|
||||
raise RuntimeError
|
||||
visitor += 1
|
||||
dut.expect('/adder visitor count = ' + str(visitor), timeout=30)
|
||||
dut.expect('/adder GET handler send ' + str(adder), timeout=30)
|
||||
|
||||
logging.info('Ending session')
|
||||
# Close connection and check for invocation of context "Free" function
|
||||
client.end_session(conn)
|
||||
dut.expect('/adder Free Context function called', timeout=30)
|
||||
|
||||
logging.info('Validating user context data')
|
||||
# Start another session to check user context data
|
||||
client.start_session(got_ip, got_port)
|
||||
num = random.randint(0,100)
|
||||
client.putreq(conn, '/adder', str(num))
|
||||
visitor += 1
|
||||
dut.expect('/adder visitor count = ' + str(visitor), timeout=30)
|
||||
dut.expect('/adder PUT handler read ' + str(num), timeout=30)
|
||||
dut.expect('PUT allocating new session', timeout=30)
|
||||
client.end_session(conn)
|
||||
dut.expect('/adder Free Context function called', timeout=30)
|
@ -1,172 +0,0 @@
|
||||
#!/usr/bin/env python
|
||||
#
|
||||
# Copyright 2018 Espressif Systems (Shanghai) PTE LTD
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from __future__ import division, print_function, unicode_literals
|
||||
|
||||
import os
|
||||
import random
|
||||
import re
|
||||
import socket
|
||||
import string
|
||||
import threading
|
||||
import time
|
||||
from builtins import range
|
||||
|
||||
import ttfw_idf
|
||||
from idf_http_server_test import client
|
||||
from tiny_test_fw import Utility
|
||||
|
||||
|
||||
class http_client_thread(threading.Thread):
|
||||
def __init__(self, ip, port, delay):
|
||||
threading.Thread.__init__(self)
|
||||
self.ip = ip
|
||||
self.port = port
|
||||
self.delay = delay
|
||||
self.exc = None
|
||||
|
||||
# Thread function used to open a socket and wait for specific amount of time before returning
|
||||
def open_connection(self, ip, port, delay):
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
s.settimeout(delay)
|
||||
s.connect((ip, port))
|
||||
time.sleep(delay)
|
||||
|
||||
def run(self):
|
||||
try:
|
||||
self.open_connection(self.ip, self.port, self.delay)
|
||||
except socket.timeout as e:
|
||||
self.exc = e
|
||||
|
||||
def join(self, timeout=None):
|
||||
threading.Thread.join(self)
|
||||
if self.exc:
|
||||
raise self.exc
|
||||
|
||||
|
||||
# When running on local machine execute the following before running this script
|
||||
# > make app bootloader
|
||||
# > make print_flash_cmd | tail -n 1 > build/download.config
|
||||
|
||||
|
||||
@ttfw_idf.idf_example_test(env_tag='Example_WIFI_Protocols')
|
||||
def test_examples_protocol_http_server_simple(env, extra_data):
|
||||
# Acquire DUT
|
||||
dut1 = env.get_dut('http_server', 'examples/protocols/http_server/simple', dut_class=ttfw_idf.ESP32DUT)
|
||||
|
||||
# Get binary file
|
||||
binary_file = os.path.join(dut1.app.binary_path, 'simple.bin')
|
||||
bin_size = os.path.getsize(binary_file)
|
||||
ttfw_idf.log_performance('http_server_bin_size', '{}KB'.format(bin_size // 1024))
|
||||
|
||||
# Upload binary and start testing
|
||||
Utility.console_log('Starting http_server simple test app')
|
||||
dut1.start_app()
|
||||
|
||||
# Parse IP address of STA
|
||||
Utility.console_log('Waiting to connect with AP')
|
||||
got_ip = dut1.expect(re.compile(r'(?:[\s\S]*)IPv4 address: (\d+.\d+.\d+.\d+)'), timeout=30)[0]
|
||||
got_port = dut1.expect(re.compile(r"(?:[\s\S]*)Starting server on port: '(\d+)'"), timeout=30)[0]
|
||||
|
||||
Utility.console_log('Got IP : ' + got_ip)
|
||||
Utility.console_log('Got Port : ' + got_port)
|
||||
|
||||
# Expected Logs
|
||||
dut1.expect('Registering URI handlers', timeout=30)
|
||||
|
||||
# Run test script
|
||||
# If failed raise appropriate exception
|
||||
Utility.console_log('Test /hello GET handler')
|
||||
if not client.test_get_handler(got_ip, got_port):
|
||||
raise RuntimeError
|
||||
|
||||
# Acquire host IP. Need a way to check it
|
||||
dut1.expect(re.compile(r'(?:[\s\S]*)Found header => Host: (\d+.\d+.\d+.\d+)'), timeout=30)[0]
|
||||
|
||||
# Match additional headers sent in the request
|
||||
dut1.expect('Found header => Test-Header-2: Test-Value-2', timeout=30)
|
||||
dut1.expect('Found header => Test-Header-1: Test-Value-1', timeout=30)
|
||||
dut1.expect('Found URL query parameter => query1=value1', timeout=30)
|
||||
dut1.expect('Found URL query parameter => query3=value3', timeout=30)
|
||||
dut1.expect('Found URL query parameter => query2=value2', timeout=30)
|
||||
dut1.expect('Request headers lost', timeout=30)
|
||||
|
||||
Utility.console_log('Test /ctrl PUT handler and realtime handler de/registration')
|
||||
if not client.test_put_handler(got_ip, got_port):
|
||||
raise RuntimeError
|
||||
dut1.expect('Unregistering /hello and /echo URIs', timeout=30)
|
||||
dut1.expect('Registering /hello and /echo URIs', timeout=30)
|
||||
|
||||
# Generate random data of 10KB
|
||||
random_data = ''.join(string.printable[random.randint(0,len(string.printable)) - 1] for _ in range(10 * 1024))
|
||||
Utility.console_log('Test /echo POST handler with random data')
|
||||
if not client.test_post_handler(got_ip, got_port, random_data):
|
||||
raise RuntimeError
|
||||
|
||||
query = 'http://foobar'
|
||||
Utility.console_log('Test /hello with custom query : ' + query)
|
||||
if not client.test_custom_uri_query(got_ip, got_port, query):
|
||||
raise RuntimeError
|
||||
dut1.expect('Found URL query => ' + query, timeout=30)
|
||||
|
||||
query = 'abcd+1234%20xyz'
|
||||
Utility.console_log('Test /hello with custom query : ' + query)
|
||||
if not client.test_custom_uri_query(got_ip, got_port, query):
|
||||
raise RuntimeError
|
||||
dut1.expect('Found URL query => ' + query, timeout=30)
|
||||
|
||||
|
||||
@ttfw_idf.idf_example_test(env_tag='Example_WIFI_Protocols')
|
||||
def test_examples_protocol_http_server_lru_purge_enable(env, extra_data):
|
||||
# Acquire DUT
|
||||
dut1 = env.get_dut('http_server', 'examples/protocols/http_server/simple', dut_class=ttfw_idf.ESP32DUT)
|
||||
|
||||
# Get binary file
|
||||
binary_file = os.path.join(dut1.app.binary_path, 'simple.bin')
|
||||
bin_size = os.path.getsize(binary_file)
|
||||
ttfw_idf.log_performance('http_server_bin_size', '{}KB'.format(bin_size // 1024))
|
||||
|
||||
# Upload binary and start testing
|
||||
Utility.console_log('Starting http_server simple test app')
|
||||
dut1.start_app()
|
||||
|
||||
# Parse IP address of STA
|
||||
Utility.console_log('Waiting to connect with AP')
|
||||
got_ip = dut1.expect(re.compile(r'(?:[\s\S]*)IPv4 address: (\d+.\d+.\d+.\d+)'), timeout=30)[0]
|
||||
got_port = dut1.expect(re.compile(r"(?:[\s\S]*)Starting server on port: '(\d+)'"), timeout=30)[0]
|
||||
|
||||
Utility.console_log('Got IP : ' + got_ip)
|
||||
Utility.console_log('Got Port : ' + got_port)
|
||||
|
||||
# Expected Logs
|
||||
dut1.expect('Registering URI handlers', timeout=30)
|
||||
threads = []
|
||||
# Open 20 sockets, one from each thread
|
||||
for _ in range(20):
|
||||
try:
|
||||
thread = http_client_thread(got_ip, (int(got_port)), 20)
|
||||
thread.start()
|
||||
threads.append(thread)
|
||||
except OSError as err:
|
||||
Utility.console_log('Error: unable to start thread, ' + err)
|
||||
|
||||
for t in threads:
|
||||
t.join()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
test_examples_protocol_http_server_simple()
|
||||
test_examples_protocol_http_server_lru_purge_enable()
|
@ -0,0 +1,165 @@
|
||||
#!/usr/bin/env python
|
||||
#
|
||||
# SPDX-FileCopyrightText: 2018-2022 Espressif Systems (Shanghai) CO LTD
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
from __future__ import division, print_function, unicode_literals
|
||||
|
||||
import logging
|
||||
import os
|
||||
import random
|
||||
import socket
|
||||
import string
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
from builtins import range
|
||||
|
||||
import pytest
|
||||
|
||||
try:
|
||||
from idf_http_server_test import client
|
||||
except ModuleNotFoundError:
|
||||
sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', '..', '..', 'tools', 'ci', 'python_packages'))
|
||||
from idf_http_server_test import client
|
||||
|
||||
from pytest_embedded import Dut
|
||||
|
||||
|
||||
class http_client_thread(threading.Thread):
|
||||
def __init__(self, ip: str, port: int, delay: int) -> None:
|
||||
threading.Thread.__init__(self)
|
||||
self.ip = ip
|
||||
self.port = port
|
||||
self.delay = delay
|
||||
self.exc = 0
|
||||
|
||||
# Thread function used to open a socket and wait for specific amount of time before returning
|
||||
def open_connection(self, ip: str, port: int, delay: int) -> None:
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
s.settimeout(delay)
|
||||
s.connect((ip, port))
|
||||
time.sleep(delay)
|
||||
|
||||
def run(self) -> None:
|
||||
try:
|
||||
self.open_connection(self.ip, self.port, self.delay)
|
||||
except socket.timeout:
|
||||
self.exc = 1
|
||||
|
||||
def join(self, timeout=None): # type: ignore
|
||||
threading.Thread.join(self)
|
||||
if self.exc:
|
||||
raise socket.timeout
|
||||
|
||||
|
||||
# When running on local machine execute the following before running this script
|
||||
# > make app bootloader
|
||||
# > make print_flash_cmd | tail -n 1 > build/download.config
|
||||
|
||||
|
||||
@pytest.mark.esp32
|
||||
@pytest.mark.esp32c3
|
||||
@pytest.mark.esp32s2
|
||||
@pytest.mark.esp32s3
|
||||
@pytest.mark.wifi
|
||||
def test_examples_protocol_http_server_simple(dut: Dut) -> None:
|
||||
|
||||
# Get binary file
|
||||
binary_file = os.path.join(dut.app.binary_path, 'simple.bin')
|
||||
bin_size = os.path.getsize(binary_file)
|
||||
logging.info('http_server_bin_size : {}KB'.format(bin_size // 1024))
|
||||
|
||||
# Upload binary and start testing
|
||||
logging.info('Starting http_server simple test app')
|
||||
|
||||
# Parse IP address of STA
|
||||
logging.info('Waiting to connect with AP')
|
||||
got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)', timeout=30)[1].decode()
|
||||
got_port = dut.expect(r"(?:[\s\S]*)Starting server on port: '(\d+)'", timeout=30)[1].decode()
|
||||
|
||||
logging.info('Got IP : {}'.format(got_ip))
|
||||
logging.info('Got Port : {}'.format(got_port))
|
||||
|
||||
# Expected Logs
|
||||
dut.expect('Registering URI handlers', timeout=30)
|
||||
|
||||
# Run test script
|
||||
# If failed raise appropriate exception
|
||||
logging.info('Test /hello GET handler')
|
||||
if not client.test_get_handler(got_ip, str(got_port)):
|
||||
raise RuntimeError
|
||||
|
||||
# Acquire host IP. Need a way to check it
|
||||
dut.expect(r'(?:[\s\S]*)Found header => Host: (\d+.\d+.\d+.\d+)', timeout=30)
|
||||
|
||||
# Match additional headers sent in the request
|
||||
dut.expect('Found header => Test-Header-2: Test-Value-2', timeout=30)
|
||||
dut.expect('Found header => Test-Header-1: Test-Value-1', timeout=30)
|
||||
dut.expect('Found URL query parameter => query1=value1', timeout=30)
|
||||
dut.expect('Found URL query parameter => query3=value3', timeout=30)
|
||||
dut.expect('Found URL query parameter => query2=value2', timeout=30)
|
||||
dut.expect('Request headers lost', timeout=30)
|
||||
|
||||
logging.info('Test /ctrl PUT handler and realtime handler de/registration')
|
||||
if not client.test_put_handler(got_ip, got_port):
|
||||
raise RuntimeError
|
||||
dut.expect('Unregistering /hello and /echo URIs', timeout=30)
|
||||
dut.expect('Registering /hello and /echo URIs', timeout=30)
|
||||
|
||||
# Generate random data of 10KB
|
||||
random_data = ''.join(string.printable[random.randint(0,len(string.printable)) - 1] for _ in range(10 * 1024))
|
||||
logging.info('Test /echo POST handler with random data')
|
||||
if not client.test_post_handler(got_ip, got_port, random_data):
|
||||
raise RuntimeError
|
||||
|
||||
query = 'http://foobar'
|
||||
logging.info('Test /hello with custom query : {}'.format(query))
|
||||
if not client.test_custom_uri_query(got_ip, got_port, query):
|
||||
raise RuntimeError
|
||||
dut.expect('Found URL query => ' + query, timeout=30)
|
||||
|
||||
query = 'abcd+1234%20xyz'
|
||||
logging.info('Test /hello with custom query : {}'.format(query))
|
||||
if not client.test_custom_uri_query(got_ip, got_port, query):
|
||||
raise RuntimeError
|
||||
dut.expect_exact('Found URL query => ' + query, timeout=30)
|
||||
|
||||
|
||||
@pytest.mark.esp32
|
||||
@pytest.mark.esp32c3
|
||||
@pytest.mark.esp32s2
|
||||
@pytest.mark.esp32s3
|
||||
@pytest.mark.wifi
|
||||
def test_examples_protocol_http_server_lru_purge_enable(dut: Dut) -> None:
|
||||
|
||||
# Get binary file
|
||||
binary_file = os.path.join(dut.app.binary_path, 'simple.bin')
|
||||
bin_size = os.path.getsize(binary_file)
|
||||
logging.info('http_server_bin_size : {}KB'.format(bin_size // 1024))
|
||||
|
||||
# Upload binary and start testing
|
||||
logging.info('Starting http_server simple test app')
|
||||
|
||||
# Parse IP address of STA
|
||||
logging.info('Waiting to connect with AP')
|
||||
got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)', timeout=30)[1].decode()
|
||||
got_port = dut.expect(r"(?:[\s\S]*)Starting server on port: '(\d+)'", timeout=30)[1].decode()
|
||||
|
||||
logging.info('Got IP : {}'.format(got_ip))
|
||||
logging.info('Got Port : {}'.format(got_port))
|
||||
|
||||
# Expected Logs
|
||||
dut.expect('Registering URI handlers', timeout=30)
|
||||
threads = []
|
||||
# Open 20 sockets, one from each thread
|
||||
for _ in range(20):
|
||||
try:
|
||||
thread = http_client_thread(got_ip, (int(got_port)), 20)
|
||||
thread.start()
|
||||
threads.append(thread)
|
||||
except OSError as err:
|
||||
logging.info('Error: unable to start thread, {}'.format(err))
|
||||
|
||||
for t in threads:
|
||||
t.join()
|
@ -1,15 +1,15 @@
|
||||
#!/usr/bin/env python
|
||||
#
|
||||
# SPDX-FileCopyrightText: 2021 Espressif Systems (Shanghai) CO LTD
|
||||
# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
from __future__ import division, print_function, unicode_literals
|
||||
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
|
||||
import ttfw_idf
|
||||
from tiny_test_fw import Utility
|
||||
import pytest
|
||||
from pytest_embedded import Dut
|
||||
|
||||
try:
|
||||
import websocket
|
||||
@ -24,22 +24,22 @@ OPCODE_PONG = 0xa
|
||||
|
||||
|
||||
class WsClient:
|
||||
def __init__(self, ip, port):
|
||||
def __init__(self, ip: str, port: int) -> None:
|
||||
self.port = port
|
||||
self.ip = ip
|
||||
self.ws = websocket.WebSocket()
|
||||
|
||||
def __enter__(self):
|
||||
def __enter__(self): # type: ignore
|
||||
self.ws.connect('ws://{}:{}/ws'.format(self.ip, self.port))
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
def __exit__(self, exc_type, exc_value, traceback): # type: ignore
|
||||
self.ws.close()
|
||||
|
||||
def read(self):
|
||||
def read(self): # type: ignore
|
||||
return self.ws.recv_data(control_frame=True)
|
||||
|
||||
def write(self, data='', opcode=OPCODE_TEXT):
|
||||
def write(self, data='', opcode=OPCODE_TEXT): # type: ignore
|
||||
if opcode == OPCODE_BIN:
|
||||
return self.ws.send_binary(data.encode())
|
||||
if opcode == OPCODE_PING:
|
||||
@ -47,27 +47,26 @@ class WsClient:
|
||||
return self.ws.send(data)
|
||||
|
||||
|
||||
@ttfw_idf.idf_example_test(env_tag='Example_WIFI_Protocols')
|
||||
def test_examples_protocol_http_ws_echo_server(env, extra_data):
|
||||
# Acquire DUT
|
||||
dut1 = env.get_dut('http_server', 'examples/protocols/http_server/ws_echo_server', dut_class=ttfw_idf.ESP32DUT)
|
||||
|
||||
@pytest.mark.esp32
|
||||
@pytest.mark.esp32c3
|
||||
@pytest.mark.esp32s2
|
||||
@pytest.mark.esp32s3
|
||||
@pytest.mark.wifi
|
||||
def test_examples_protocol_http_ws_echo_server(dut: Dut) -> None:
|
||||
# Get binary file
|
||||
binary_file = os.path.join(dut1.app.binary_path, 'ws_echo_server.bin')
|
||||
binary_file = os.path.join(dut.app.binary_path, 'ws_echo_server.bin')
|
||||
bin_size = os.path.getsize(binary_file)
|
||||
ttfw_idf.log_performance('http_ws_server_bin_size', '{}KB'.format(bin_size // 1024))
|
||||
logging.info('http_ws_server_bin_size : {}KB'.format(bin_size // 1024))
|
||||
|
||||
# Upload binary and start testing
|
||||
Utility.console_log('Starting ws-echo-server test app based on http_server')
|
||||
dut1.start_app()
|
||||
logging.info('Starting ws-echo-server test app based on http_server')
|
||||
|
||||
# Parse IP address of STA
|
||||
Utility.console_log('Waiting to connect with AP')
|
||||
got_ip = dut1.expect(re.compile(r'IPv4 address: (\d+.\d+.\d+.\d+)'), timeout=60)[0]
|
||||
got_port = dut1.expect(re.compile(r"Starting server on port: '(\d+)'"), timeout=60)[0]
|
||||
logging.info('Waiting to connect with AP')
|
||||
got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)', timeout=30)[1].decode()
|
||||
got_port = dut.expect(r"Starting server on port: '(\d+)'", timeout=30)[1].decode()
|
||||
|
||||
Utility.console_log('Got IP : ' + got_ip)
|
||||
Utility.console_log('Got Port : ' + got_port)
|
||||
logging.info('Got IP : {}'.format(got_ip))
|
||||
logging.info('Got Port : {}'.format(got_port))
|
||||
|
||||
# Start ws server test
|
||||
with WsClient(got_ip, int(got_port)) as ws:
|
||||
@ -75,24 +74,21 @@ def test_examples_protocol_http_ws_echo_server(env, extra_data):
|
||||
for expected_opcode in [OPCODE_TEXT, OPCODE_BIN, OPCODE_PING]:
|
||||
ws.write(data=DATA, opcode=expected_opcode)
|
||||
opcode, data = ws.read()
|
||||
Utility.console_log('Testing opcode {}: Received opcode:{}, data:{}'.format(expected_opcode, opcode, data))
|
||||
logging.info('Testing opcode {}: Received opcode:{}, data:{}'.format(expected_opcode, opcode, data))
|
||||
data = data.decode()
|
||||
if expected_opcode == OPCODE_PING:
|
||||
dut1.expect('Got a WS PING frame, Replying PONG')
|
||||
dut.expect('Got a WS PING frame, Replying PONG')
|
||||
if opcode != OPCODE_PONG or data != DATA:
|
||||
raise RuntimeError('Failed to receive correct opcode:{} or data:{}'.format(opcode, data))
|
||||
continue
|
||||
dut_data = dut1.expect(re.compile(r'Got packet with message: ([A-Za-z0-9_]*)'))[0]
|
||||
dut_opcode = int(dut1.expect(re.compile(r'Packet type: ([0-9]*)'))[0])
|
||||
if opcode != expected_opcode or data != DATA or opcode != dut_opcode or data != dut_data:
|
||||
dut_data = dut.expect(r'Got packet with message: ([A-Za-z0-9_]*)')[1]
|
||||
dut_opcode = dut.expect(r'Packet type: ([0-9]*)')[1].decode()
|
||||
|
||||
if opcode != expected_opcode or data != DATA or opcode != int(dut_opcode) or (data not in str(dut_data)):
|
||||
raise RuntimeError('Failed to receive correct opcode:{} or data:{}'.format(opcode, data))
|
||||
ws.write(data='Trigger async', opcode=OPCODE_TEXT)
|
||||
opcode, data = ws.read()
|
||||
Utility.console_log('Testing async send: Received opcode:{}, data:{}'.format(opcode, data))
|
||||
logging.info('Testing async send: Received opcode:{}, data:{}'.format(opcode, data))
|
||||
data = data.decode()
|
||||
if opcode != OPCODE_TEXT or data != 'Async data':
|
||||
raise RuntimeError('Failed to receive correct opcode:{} or data:{}'.format(opcode, data))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
test_examples_protocol_http_ws_echo_server()
|
Loading…
x
Reference in New Issue
Block a user