from __future__ import print_function from __future__ import unicode_literals import re import os import socket import random import string from SimpleWebSocketServer import SimpleWebSocketServer, WebSocket from tiny_test_fw import Utility from threading import Thread, Event import ttfw_idf def get_my_ip(): s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) try: # doesn't even have to be reachable s.connect(('10.255.255.255', 1)) IP = s.getsockname()[0] except Exception: IP = '127.0.0.1' finally: s.close() return IP class TestEcho(WebSocket): def handleMessage(self): self.sendMessage(self.data) print('Server sent: {}'.format(self.data)) def handleConnected(self): print('Connection from: {}'.format(self.address)) def handleClose(self): print('{} closed the connection'.format(self.address)) # Simple Websocket server for testing purposes class Websocket(object): def send_data(self, data): for nr, conn in self.server.connections.items(): conn.sendMessage(data) def run(self): self.server = SimpleWebSocketServer('', self.port, TestEcho) while not self.exit_event.is_set(): self.server.serveonce() def __init__(self, port): self.port = port self.exit_event = Event() self.thread = Thread(target=self.run) self.thread.start() def __enter__(self): return self def __exit__(self, exc_type, exc_value, traceback): self.exit_event.set() self.thread.join(10) if self.thread.is_alive(): Utility.console_log('Thread cannot be joined', 'orange') def test_echo(dut): dut.expect("WEBSOCKET_EVENT_CONNECTED") for i in range(0, 10): dut.expect(re.compile(r"Received=hello (\d)"), timeout=30) print("All echos received") def test_close(dut): code = dut.expect(re.compile(r"WEBSOCKET: Received closed message with code=(\d*)"), timeout=60)[0] print("Received close frame with code {}".format(code)) def test_recv_long_msg(dut, websocket, msg_len, repeats): send_msg = ''.join(random.choice(string.ascii_uppercase + string.ascii_lowercase + string.digits) for _ in range(msg_len)) for _ in range(repeats): websocket.send_data(send_msg) recv_msg = '' while len(recv_msg) < msg_len: # Filter out color encoding match = dut.expect(re.compile(r"Received=([a-zA-Z0-9]*).*\n"), timeout=30)[0] recv_msg += match if recv_msg == send_msg: print("Sent message and received message are equal") else: raise ValueError("DUT received string do not match sent string, \nexpected: {}\nwith length {}\ \nreceived: {}\nwith length {}".format(send_msg, len(send_msg), recv_msg, len(recv_msg))) @ttfw_idf.idf_example_test(env_tag="Example_WIFI") def test_examples_protocol_websocket(env, extra_data): """ steps: 1. join AP 2. connect to uri specified in the config 3. send and receive data """ dut1 = env.get_dut("websocket", "examples/protocols/websocket", dut_class=ttfw_idf.ESP32DUT) # check and log bin size binary_file = os.path.join(dut1.app.binary_path, "websocket-example.bin") bin_size = os.path.getsize(binary_file) ttfw_idf.log_performance("websocket_bin_size", "{}KB".format(bin_size // 1024)) try: if "CONFIG_WEBSOCKET_URI_FROM_STDIN" in dut1.app.get_sdkconfig(): uri_from_stdin = True else: uri = dut1.app.get_sdkconfig()["CONFIG_WEBSOCKET_URI"].strip('"') uri_from_stdin = False except Exception: print('ENV_TEST_FAILURE: Cannot find uri settings in sdkconfig') raise # start test dut1.start_app() if uri_from_stdin: server_port = 4455 with Websocket(server_port) as ws: uri = "ws://{}:{}".format(get_my_ip(), server_port) print("DUT connecting to {}".format(uri)) dut1.expect("Please enter uri of websocket endpoint", timeout=30) dut1.write(uri) test_echo(dut1) # Message length should exceed DUT's buffer size to test fragmentation, default is 1024 byte test_recv_long_msg(dut1, ws, 2000, 3) test_close(dut1) else: print("DUT connecting to {}".format(uri)) test_echo(dut1) if __name__ == '__main__': test_examples_protocol_websocket()