mirror of
https://github.com/espressif/esp-idf.git
synced 2024-10-05 20:47:46 -04:00
a908174c06
The binary size check in the example tests was removed a long time ago. ttfw_idf has now been updated to raise an exception when a performance standard is not found, so these fake performance checks would be regarded as errors. Remove them now.
146 lines
4.4 KiB
Python
146 lines
4.4 KiB
Python
from __future__ import print_function
|
|
from __future__ import unicode_literals
|
|
import re
|
|
import os
|
|
import socket
|
|
import random
|
|
import string
|
|
from SimpleWebSocketServer import SimpleWebSocketServer, WebSocket
|
|
from tiny_test_fw import Utility
|
|
from threading import Thread, Event
|
|
import ttfw_idf
|
|
|
|
|
|
def get_my_ip():
    """Return the local IPv4 address as a string.

    A UDP socket is connected to a fixed address (which does not even have
    to be reachable) and the socket's own address is read back.  If any step
    fails, '127.0.0.1' is returned instead.  The socket is always closed.
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # doesn't even have to be reachable
        probe.connect(('10.255.255.255', 1))
        return probe.getsockname()[0]
    except Exception:
        return '127.0.0.1'
    finally:
        probe.close()
|
|
|
|
|
|
class TestEcho(WebSocket):
    """Connection handler that sends every received message back to its sender."""

    def handleConnected(self):
        """Log the address of the client that has just connected."""
        peer = self.address
        print('Connection from: {}'.format(peer))

    def handleMessage(self):
        """Echo the received payload to the client, then log what was sent."""
        payload = self.data
        self.sendMessage(payload)
        print('Server sent: {}'.format(payload))

    def handleClose(self):
        """Log the address of the client whose connection was closed."""
        peer = self.address
        print('{} closed the connection'.format(peer))
|
|
|
|
|
|
# Simple Websocket server for testing purposes
|
|
class Websocket(object):
    """Echo websocket server running in a background thread.

    The thread is started by the constructor; it creates the server on
    ``port`` and serves it until the exit event is set.  Intended to be used
    as a context manager: leaving the ``with`` block asks the thread to stop
    and waits up to 10 seconds for it to finish.
    """

    def send_data(self, data):
        """Send ``data`` to every client connection currently known to the server."""
        # Iterate over a snapshot: the serving thread owns ``connections`` and
        # may presumably add/remove entries while we are sending, which would
        # otherwise abort the loop with "dictionary changed size during
        # iteration".
        for conn in list(self.server.connections.values()):
            conn.sendMessage(data)

    def run(self):
        """Thread body: create the server and serve until the exit event is set."""
        self.server = SimpleWebSocketServer('', self.port, TestEcho)
        try:
            while not self.exit_event.is_set():
                self.server.serveonce()
        finally:
            # Release the listening socket (and client sockets) so the port is
            # not left open after the test has finished.
            self.server.close()

    def __init__(self, port):
        """Remember ``port`` and start the serving thread immediately."""
        self.port = port
        self.exit_event = Event()
        self.thread = Thread(target=self.run)
        self.thread.start()

    def __enter__(self):
        """Return the server object itself for use in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Signal the serving thread to stop and wait up to 10 seconds for it."""
        self.exit_event.set()
        self.thread.join(10)
        if self.thread.is_alive():
            Utility.console_log('Thread cannot be joined', 'orange')
|
|
|
|
|
|
def test_echo(dut):
|
|
dut.expect("WEBSOCKET_EVENT_CONNECTED")
|
|
for i in range(0, 10):
|
|
dut.expect(re.compile(r"Received=hello (\d)"), timeout=30)
|
|
print("All echos received")
|
|
|
|
|
|
def test_close(dut):
|
|
code = dut.expect(re.compile(r"WEBSOCKET: Received closed message with code=(\d*)"), timeout=60)[0]
|
|
print("Received close frame with code {}".format(code))
|
|
|
|
|
|
def test_recv_long_msg(dut, websocket, msg_len, repeats):
|
|
send_msg = ''.join(random.choice(string.ascii_uppercase + string.ascii_lowercase + string.digits) for _ in range(msg_len))
|
|
|
|
for _ in range(repeats):
|
|
websocket.send_data(send_msg)
|
|
|
|
recv_msg = ''
|
|
while len(recv_msg) < msg_len:
|
|
# Filter out color encoding
|
|
match = dut.expect(re.compile(r"Received=([a-zA-Z0-9]*).*\n"), timeout=30)[0]
|
|
recv_msg += match
|
|
|
|
if recv_msg == send_msg:
|
|
print("Sent message and received message are equal")
|
|
else:
|
|
raise ValueError("DUT received string do not match sent string, \nexpected: {}\nwith length {}\
|
|
\nreceived: {}\nwith length {}".format(send_msg, len(send_msg), recv_msg, len(recv_msg)))
|
|
|
|
|
|
@ttfw_idf.idf_example_test(env_tag="Example_WIFI")
def test_examples_protocol_websocket(env, extra_data):
    """
    Run the websocket example on a DUT and check its console output.

    steps:
      1. join AP
      2. connect to uri specified in the config
      3. send and receive data

    When the sdkconfig contains CONFIG_WEBSOCKET_URI_FROM_STDIN, a local echo
    server is started and its uri is written to the DUT; otherwise the uri
    from CONFIG_WEBSOCKET_URI is used and only the echo check is run.
    """
    dut1 = env.get_dut("websocket", "examples/protocols/websocket", dut_class=ttfw_idf.ESP32DUT)

    # check and log bin size
    size_in_bytes = os.path.getsize(os.path.join(dut1.app.binary_path, "websocket-example.bin"))
    ttfw_idf.log_performance("websocket_bin_size", "{}KB".format(size_in_bytes // 1024))

    try:
        uri_from_stdin = "CONFIG_WEBSOCKET_URI_FROM_STDIN" in dut1.app.get_sdkconfig()
        if not uri_from_stdin:
            uri = dut1.app.get_sdkconfig()["CONFIG_WEBSOCKET_URI"].strip('"')
    except Exception:
        print('ENV_TEST_FAILURE: Cannot find uri settings in sdkconfig')
        raise

    # start test
    dut1.start_app()

    if not uri_from_stdin:
        # The uri is baked into the application config; only the echo check applies.
        print("DUT connecting to {}".format(uri))
        test_echo(dut1)
        return

    server_port = 4455
    with Websocket(server_port) as ws:
        uri = "ws://{}:{}".format(get_my_ip(), server_port)
        print("DUT connecting to {}".format(uri))
        dut1.expect("Please enter uri of websocket endpoint", timeout=30)
        dut1.write(uri)
        test_echo(dut1)
        # Message length should exceed DUT's buffer size to test fragmentation, default is 1024 byte
        test_recv_long_msg(dut1, ws, 2000, 3)
        test_close(dut1)
|
|
|
|
|
|
# Script entry point: run the example test when this file is executed directly.
# NOTE(review): called without arguments; presumably the ttfw_idf decorator
# supplies ``env`` and ``extra_data`` - confirm.
if __name__ == "__main__":
    test_examples_protocol_websocket()
|