case_tester: use multi-processing instead of threads

Pros:
- Using thread would face GIL issue and turns out very slow when running
with poor hardware.

Cons:
- Does not support windows anymore. For testing purpose, it's fine.
This commit is contained in:
Fu Hanxi 2023-02-20 11:18:14 +08:00
parent d55060d184
commit b04e5d070a

View File

@ -1,7 +1,8 @@
# SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD
# SPDX-FileCopyrightText: 2022-2023 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
import time
from threading import Semaphore, Thread
from multiprocessing import Manager, Process, Semaphore
from multiprocessing.managers import SyncManager
from typing import List, Union
from pexpect.exceptions import TIMEOUT
@ -146,7 +147,7 @@ class MultiStageCaseTester(BaseTester):
for retry in range(self.retry_times):
self.dut.write(str(case.index))
try:
self.dut.expect_exact(case.name, timeout=1)
self.dut.expect_exact('Running {}...'.format(case.name), timeout=1)
break
except TIMEOUT as e:
if retry >= self.retry_times - 1:
@ -164,14 +165,14 @@ class MultiDevResource:
dut (Dut): Object of the Device under test
sem (Semaphore): Semaphore of monitoring whether the case finished
recv_sig (List[str]): The list of received signals from other dut
thread (Thread): The thread of monitoring the signals
thread (Process): The thread of monitoring the signals
"""
def __init__(self, dut: Dut) -> None:
def __init__(self, dut: Dut, manager: SyncManager) -> None:
self.dut = dut
self.sem = Semaphore()
self.recv_sig: List[str] = []
self.thread: Thread = None # type: ignore
self.recv_sig = manager.list() # type: list[str]
self.process: Process = None # type: ignore
class MultiDevCaseTester(BaseTester):
@ -196,14 +197,15 @@ class MultiDevCaseTester(BaseTester):
Create the object for every dut and put them into the group
"""
super().__init__(dut, **kwargs)
self._manager = Manager()
self.group: List[MultiDevResource] = []
if isinstance(dut, List):
for item in dut:
if isinstance(item, Dut):
dev_res = MultiDevResource(item)
dev_res = MultiDevResource(item, self._manager)
self.group.append(dev_res)
else:
dev_res = MultiDevResource(dut)
dev_res = MultiDevResource(dut, self._manager)
self.group.append(dev_res)
def _wait_multi_dev_case_finish(self, timeout: int = 60) -> None:
@ -229,9 +231,9 @@ class MultiDevCaseTester(BaseTester):
_kwargs['timeout'] = timeout
# Create the thread of the sub-case
dev_res.thread = Thread(target=self._run, kwargs=_kwargs, daemon=True)
dev_res.thread.start()
# Thread starts, acquire the semaphore to block '_wait_multi_dev_case_finish'
dev_res.process = Process(target=self._run, kwargs=_kwargs, daemon=True)
dev_res.process.start()
# Process starts, acquire the semaphore to block '_wait_multi_dev_case_finish'
dev_res.sem.acquire()
def _run(self, **kwargs) -> None: # type: ignore
@ -262,7 +264,7 @@ class MultiDevCaseTester(BaseTester):
for retry in range(self.retry_times):
dut.write(str(case.index))
try:
dut.expect_exact(case.name, timeout=1)
dut.expect_exact('Running {}...'.format(case.name), timeout=1)
break
except TIMEOUT as e:
if retry >= self.retry_times - 1: