tools: fix multi-byte character appearance in idf.py monitor

This commit is contained in:
simon.chupin 2022-08-15 17:34:11 +02:00 committed by Simon Chupin
parent 39c47cb6d8
commit 34230426a6

View File

@ -233,13 +233,11 @@ class RunTool:
ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])') ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
return ansi_escape.sub('', text) return ansi_escape.sub('', text)
def prepare_for_print(out: bytes) -> str: def prepare_for_print(out: str) -> str:
# errors='ignore' is here because some chips produce some garbage bytes
result = out.decode(errors='ignore')
if not output_stream.isatty(): if not output_stream.isatty():
# delete escape sequence if we printing in environments where ANSI coloring is disabled # delete escape sequence if we printing in environments where ANSI coloring is disabled
return delete_ansi_escape(result) return delete_ansi_escape(out)
return result return out
def print_progression(output: str) -> None: def print_progression(output: str) -> None:
# Print a new line on top of the previous line # Print a new line on top of the previous line
@ -247,20 +245,40 @@ class RunTool:
print('\r', end='') print('\r', end='')
print(fit_text_in_terminal(output.strip('\n\r')), end='', file=output_stream) print(fit_text_in_terminal(output.strip('\n\r')), end='', file=output_stream)
async def read_stream() -> Optional[str]:
output_b = await input_stream.readline()
if not output_b:
return None
return output_b.decode(errors='ignore')
async def read_interactive_stream() -> Optional[str]:
buffer = b''
while True:
output_b = await input_stream.read(1)
if not output_b:
return None
try: try:
with open(output_filename, 'w') as output_file: return (buffer + output_b).decode()
except UnicodeDecodeError:
buffer += output_b
if len(buffer) > 4:
# Multi-byte character contain up to 4 bytes and if buffer have more then 4 bytes
# and still can not decode it we can just ignore some bytes
return buffer.decode(errors='ignore')
try:
with open(output_filename, 'w', encoding='utf8') as output_file:
while True: while True:
if self.interactive: if self.interactive:
out = await input_stream.read(1) output = await read_interactive_stream()
else: else:
out = await input_stream.readline() output = await read_stream()
if not out: if not output:
break break
output = prepare_for_print(out) output = prepare_for_print(output)
output_file.write(output) output_file.write(output)
# print output in progression way but only the progression related (that started with '[') and if verbose flag is not set
if self.force_progression and output[0] == '[' and '-v' not in self.args and output_stream.isatty(): if self.force_progression and output[0] == '[' and '-v' not in self.args and output_stream.isatty():
# print output in progression way but only the progression related (that started with '[') and if verbose flag is not set
print_progression(output) print_progression(output)
else: else:
output_stream.write(output) output_stream.write(output)