From 6cb20800760fb905a0521561e854ac344562a994 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20M=C3=BAdry?= Date: Tue, 30 Jul 2024 23:22:22 +0200 Subject: [PATCH] fix(nvs): nvs_tool.py reduce false duplicate warnings --- .../nvs_flash/nvs_partition_tool/nvs_check.py | 153 ++++++++++++++++-- 1 file changed, 136 insertions(+), 17 deletions(-) diff --git a/components/nvs_flash/nvs_partition_tool/nvs_check.py b/components/nvs_flash/nvs_partition_tool/nvs_check.py index 6051ee80a3..608d796512 100644 --- a/components/nvs_flash/nvs_partition_tool/nvs_check.py +++ b/components/nvs_flash/nvs_partition_tool/nvs_check.py @@ -1,7 +1,8 @@ #!/usr/bin/env python3 -# SPDX-FileCopyrightText: 2023 Espressif Systems (Shanghai) CO LTD +# SPDX-FileCopyrightText: 2023-2024 Espressif Systems (Shanghai) CO LTD # SPDX-License-Identifier: Apache-2.0 -from typing import Dict, List +from typing import Dict +from typing import List from nvs_logger import NVS_Logger from nvs_parser import nvs_const @@ -9,6 +10,8 @@ from nvs_parser import NVS_Entry from nvs_parser import NVS_Page from nvs_parser import NVS_Partition +# from pprint import pprint + EMPTY_ENTRY = NVS_Entry(-1, bytearray(32), 'Erased') @@ -18,16 +21,18 @@ blobs: Dict = {} blob_chunks: List[NVS_Entry] = [] -def check_partition_size(nvs_partition: NVS_Partition, nvs_log: NVS_Logger) -> None: +def check_partition_size(nvs_partition: NVS_Partition, nvs_log: NVS_Logger) -> bool: if len(nvs_partition.pages) < 3: nvs_log.info( nvs_log.yellow( 'NVS Partition must contain 3 pages (sectors) at least to function properly!' ) ) + return False + return True -def check_empty_page_present(nvs_partition: NVS_Partition, nvs_log: NVS_Logger) -> None: +def check_empty_page_present(nvs_partition: NVS_Partition, nvs_log: NVS_Logger) -> bool: if not any(page.header['status'] == 'Empty' for page in nvs_partition.pages): nvs_log.info( nvs_log.red( @@ -36,12 +41,16 @@ at least one free page is required for proper function!''' ) ) nvs_log.info(nvs_log.red('NVS partition possibly truncated?\n')) + return False + return True -def check_empty_page_content(nvs_page: NVS_Page, nvs_log: NVS_Logger) -> None: +def check_empty_page_content(nvs_page: NVS_Page, nvs_log: NVS_Logger) -> bool: + result = True nvs_log.info(nvs_log.cyan(f'Page {nvs_page.header["status"]}')) if nvs_page.raw_entry_state_bitmap != bytearray({0xFF}) * nvs_const.entry_size: + result = False nvs_log.info( nvs_log.red( 'The page is reported as Empty but its entry state bitmap is not empty!' @@ -49,16 +58,20 @@ def check_empty_page_content(nvs_page: NVS_Page, nvs_log: NVS_Logger) -> None: ) if any([not e.is_empty for e in nvs_page.entries]): + result = False nvs_log.info( nvs_log.red('The page is reported as Empty but there are data written!') ) + return result -def check_page_crc(nvs_page: NVS_Page, nvs_log: NVS_Logger) -> None: + +def check_page_crc(nvs_page: NVS_Page, nvs_log: NVS_Logger) -> bool: if nvs_page.header['crc']['original'] == nvs_page.header['crc']['computed']: nvs_log.info( nvs_log.cyan(f'Page no. {nvs_page.header["page_index"]}'), '\tCRC32: OK' ) + return True else: nvs_log.info( nvs_log.cyan(f'Page no. {nvs_page.header["page_index"]}'), @@ -67,6 +80,7 @@ def check_page_crc(nvs_page: NVS_Page, nvs_log: NVS_Logger) -> None: f'Generated CRC32:', nvs_log.green(f'{nvs_page.header["crc"]["computed"]:x}'), ) + return False def identify_entry_duplicates(entry: NVS_Entry, seen_written_entires: Dict[str, list[NVS_Entry]]) -> Dict[str, list[NVS_Entry]]: @@ -187,13 +201,111 @@ def check_page_entries(nvs_page: NVS_Page, nvs_log: NVS_Logger) -> Dict[str, lis return seen_written_entires -def filter_entry_duplicates(seen_written_entires: Dict[str, list[NVS_Entry]]) -> List[List[NVS_Entry]]: - duplicate_entries_list = [seen_written_entires[key] for key in seen_written_entires if len(seen_written_entires[key]) > 1] - return duplicate_entries_list +def filter_namespaces_fake_duplicates(duplicate_entries_dict: Dict[str, List[NVS_Entry]]) -> Dict[str, List[NVS_Entry]]: + new_duplicate_entries_dict: Dict[str, List[NVS_Entry]] = {} + for key, duplicate_entries in duplicate_entries_dict.items(): + seen_entries: List[NVS_Entry] = [] + entry_same_namespace_collisions_list: set[NVS_Entry] = set() + + # Search through the "duplicates" and see if there are real duplicates + # E.g. the key can be the same if the namespace is different + for entry in duplicate_entries: + if entry.metadata['type'] in nvs_const.item_type.values(): + + entry_same_namespace_collisions = set() + for other_entry in seen_entries: + if entry.metadata['namespace'] == other_entry.metadata['namespace']: + entry_same_namespace_collisions.add(entry) + entry_same_namespace_collisions.add(other_entry) + + if len(entry_same_namespace_collisions) != 0: + entry_same_namespace_collisions_list.update(entry_same_namespace_collisions) + seen_entries.append(entry) + + # Catch real duplicates + new_duplicate_entries: List[NVS_Entry] = [] + if len(seen_entries) > 1: + for entry in seen_entries: + if entry in entry_same_namespace_collisions_list: + new_duplicate_entries.append(entry) + + if len(new_duplicate_entries) > 0: + new_duplicate_entries_dict[key] = new_duplicate_entries + + return new_duplicate_entries_dict -def print_entry_duplicates(page: NVS_Page, duplicate_entries_list: List[List[NVS_Entry]], nvs_log: NVS_Logger) -> None: - for duplicate_entries in duplicate_entries_list: +def filter_blob_related_duplicates(duplicate_entries_dict: Dict[str, List[NVS_Entry]]) -> Dict[str, List[NVS_Entry]]: + new_duplicate_entries_dict: Dict[str, List[NVS_Entry]] = {} + for key, duplicate_entries in duplicate_entries_dict.items(): + seen_blob_index: List[NVS_Entry] = [] + seen_blob_data: List[NVS_Entry] = [] + seen_another_type_data: List[NVS_Entry] = [] + blob_index_chunk_index_collisions_list: set[NVS_Entry] = set() + blob_data_chunk_index_collisions_list: set[NVS_Entry] = set() + + # Search through the "duplicates" and see if there are real duplicates + # E.g. the key can be the same for blob_index and blob_data + # (and even for more blob_data entries if they have a different chunk_index) + for entry in duplicate_entries: + if entry.metadata['type'] == 'blob_index': + + blob_index_chunk_index_collisions = set() + for other_entry in seen_blob_index: + if entry.metadata['namespace'] == other_entry.metadata['namespace']: + blob_index_chunk_index_collisions.add(entry) + blob_index_chunk_index_collisions.add(other_entry) + + if len(blob_index_chunk_index_collisions) != 0: + blob_index_chunk_index_collisions_list.update(blob_index_chunk_index_collisions) + seen_blob_index.append(entry) + + elif entry.metadata['type'] == 'blob_data': + + blob_data_chunk_index_collisions = set() + for other_entry in seen_blob_data: + if (entry.metadata['namespace'] == other_entry.metadata['namespace'] + and entry.metadata['chunk_index'] == other_entry.metadata['chunk_index']): + blob_data_chunk_index_collisions.add(entry) + blob_data_chunk_index_collisions.add(other_entry) + + if len(blob_data_chunk_index_collisions) != 0: + blob_data_chunk_index_collisions_list.update(blob_data_chunk_index_collisions) + seen_blob_data.append(entry) + + else: + seen_another_type_data.append(entry) + + # Catch real duplicates + new_duplicate_entries: List[NVS_Entry] = [] + if len(seen_blob_index) > 1: + for entry in seen_blob_index: + if entry in blob_index_chunk_index_collisions_list: + new_duplicate_entries.append(entry) + + if len(seen_blob_data) > 1: + for entry in seen_blob_data: + if entry in blob_data_chunk_index_collisions_list: + new_duplicate_entries.append(entry) + + for entry in seen_another_type_data: # If there are any duplicates of other types + new_duplicate_entries.append(entry) + + if len(new_duplicate_entries) > 0: + new_duplicate_entries_dict[key] = new_duplicate_entries + + return new_duplicate_entries_dict + + +def filter_entry_duplicates(seen_written_entires: Dict[str, list[NVS_Entry]]) -> Dict[str, List[NVS_Entry]]: + duplicate_entries_list = {key: v for key, v in seen_written_entires.items() if len(v) > 1} + duplicate_entries_list_1 = filter_namespaces_fake_duplicates(duplicate_entries_list) + duplicate_entries_list_2 = filter_blob_related_duplicates(duplicate_entries_list_1) + return duplicate_entries_list_2 + + +def print_entry_duplicates(page: NVS_Page, duplicate_entries_list: Dict[str, List[NVS_Entry]], nvs_log: NVS_Logger) -> None: + for _, duplicate_entries in duplicate_entries_list.items(): # duplicate_entries: list[NVS_Entry] nvs_log.info( nvs_log.red( @@ -228,8 +340,6 @@ def assemble_blobs(nvs_log: NVS_Logger) -> None: chunk_index = chunk.metadata['chunk_index'] - parent.data['chunk_start'] blobs[blob_key][chunk_index + 1] = chunk - # return blobs - def check_blob_data(nvs_log: NVS_Logger) -> None: for blob_key in blobs: @@ -292,6 +402,8 @@ def integrity_check(nvs_partition: NVS_Partition, nvs_log: NVS_Logger) -> None: # Free/empty page check check_empty_page_present(nvs_partition, nvs_log) + seen_written_entires_all: Dict[str, list[NVS_Entry]] = {} + for page in nvs_partition.pages: # page: NVS_Page @@ -306,10 +418,17 @@ def integrity_check(nvs_partition: NVS_Partition, nvs_log: NVS_Logger) -> None: # Check all entries seen_written_entires = check_page_entries(page, nvs_log) - # Duplicate entry check (2) - same key, different index - print duplicates - duplicates = filter_entry_duplicates(seen_written_entires) - # Print duplicate entries - print_entry_duplicates(page, duplicates, nvs_log) + # Collect all seen written entries + for key in seen_written_entires: + if key in seen_written_entires_all: + seen_written_entires_all[key].extend(seen_written_entires[key]) + else: + seen_written_entires_all[key] = seen_written_entires[key] + + # Duplicate entry check (2) - same key, different index + duplicates = filter_entry_duplicates(seen_written_entires_all) + # Print duplicate entries + print_entry_duplicates(page, duplicates, nvs_log) nvs_log.info() # Empty line