mirror of
https://github.com/espressif/esp-idf.git
synced 2024-10-05 20:47:46 -04:00
fix(nvs): nvs_tool.py reduce false duplicate warnings
This commit is contained in:
parent
4e7d2ec241
commit
6cb2080076
@ -1,7 +1,8 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
# SPDX-FileCopyrightText: 2023 Espressif Systems (Shanghai) CO LTD
|
# SPDX-FileCopyrightText: 2023-2024 Espressif Systems (Shanghai) CO LTD
|
||||||
# SPDX-License-Identifier: Apache-2.0
|
# SPDX-License-Identifier: Apache-2.0
|
||||||
from typing import Dict, List
|
from typing import Dict
|
||||||
|
from typing import List
|
||||||
|
|
||||||
from nvs_logger import NVS_Logger
|
from nvs_logger import NVS_Logger
|
||||||
from nvs_parser import nvs_const
|
from nvs_parser import nvs_const
|
||||||
@ -9,6 +10,8 @@ from nvs_parser import NVS_Entry
|
|||||||
from nvs_parser import NVS_Page
|
from nvs_parser import NVS_Page
|
||||||
from nvs_parser import NVS_Partition
|
from nvs_parser import NVS_Partition
|
||||||
|
|
||||||
|
# from pprint import pprint
|
||||||
|
|
||||||
|
|
||||||
EMPTY_ENTRY = NVS_Entry(-1, bytearray(32), 'Erased')
|
EMPTY_ENTRY = NVS_Entry(-1, bytearray(32), 'Erased')
|
||||||
|
|
||||||
@ -18,16 +21,18 @@ blobs: Dict = {}
|
|||||||
blob_chunks: List[NVS_Entry] = []
|
blob_chunks: List[NVS_Entry] = []
|
||||||
|
|
||||||
|
|
||||||
def check_partition_size(nvs_partition: NVS_Partition, nvs_log: NVS_Logger) -> None:
|
def check_partition_size(nvs_partition: NVS_Partition, nvs_log: NVS_Logger) -> bool:
|
||||||
if len(nvs_partition.pages) < 3:
|
if len(nvs_partition.pages) < 3:
|
||||||
nvs_log.info(
|
nvs_log.info(
|
||||||
nvs_log.yellow(
|
nvs_log.yellow(
|
||||||
'NVS Partition must contain 3 pages (sectors) at least to function properly!'
|
'NVS Partition must contain 3 pages (sectors) at least to function properly!'
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
def check_empty_page_present(nvs_partition: NVS_Partition, nvs_log: NVS_Logger) -> None:
|
def check_empty_page_present(nvs_partition: NVS_Partition, nvs_log: NVS_Logger) -> bool:
|
||||||
if not any(page.header['status'] == 'Empty' for page in nvs_partition.pages):
|
if not any(page.header['status'] == 'Empty' for page in nvs_partition.pages):
|
||||||
nvs_log.info(
|
nvs_log.info(
|
||||||
nvs_log.red(
|
nvs_log.red(
|
||||||
@ -36,12 +41,16 @@ at least one free page is required for proper function!'''
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
nvs_log.info(nvs_log.red('NVS partition possibly truncated?\n'))
|
nvs_log.info(nvs_log.red('NVS partition possibly truncated?\n'))
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
def check_empty_page_content(nvs_page: NVS_Page, nvs_log: NVS_Logger) -> None:
|
def check_empty_page_content(nvs_page: NVS_Page, nvs_log: NVS_Logger) -> bool:
|
||||||
|
result = True
|
||||||
nvs_log.info(nvs_log.cyan(f'Page {nvs_page.header["status"]}'))
|
nvs_log.info(nvs_log.cyan(f'Page {nvs_page.header["status"]}'))
|
||||||
|
|
||||||
if nvs_page.raw_entry_state_bitmap != bytearray({0xFF}) * nvs_const.entry_size:
|
if nvs_page.raw_entry_state_bitmap != bytearray({0xFF}) * nvs_const.entry_size:
|
||||||
|
result = False
|
||||||
nvs_log.info(
|
nvs_log.info(
|
||||||
nvs_log.red(
|
nvs_log.red(
|
||||||
'The page is reported as Empty but its entry state bitmap is not empty!'
|
'The page is reported as Empty but its entry state bitmap is not empty!'
|
||||||
@ -49,16 +58,20 @@ def check_empty_page_content(nvs_page: NVS_Page, nvs_log: NVS_Logger) -> None:
|
|||||||
)
|
)
|
||||||
|
|
||||||
if any([not e.is_empty for e in nvs_page.entries]):
|
if any([not e.is_empty for e in nvs_page.entries]):
|
||||||
|
result = False
|
||||||
nvs_log.info(
|
nvs_log.info(
|
||||||
nvs_log.red('The page is reported as Empty but there are data written!')
|
nvs_log.red('The page is reported as Empty but there are data written!')
|
||||||
)
|
)
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
def check_page_crc(nvs_page: NVS_Page, nvs_log: NVS_Logger) -> None:
|
|
||||||
|
def check_page_crc(nvs_page: NVS_Page, nvs_log: NVS_Logger) -> bool:
|
||||||
if nvs_page.header['crc']['original'] == nvs_page.header['crc']['computed']:
|
if nvs_page.header['crc']['original'] == nvs_page.header['crc']['computed']:
|
||||||
nvs_log.info(
|
nvs_log.info(
|
||||||
nvs_log.cyan(f'Page no. {nvs_page.header["page_index"]}'), '\tCRC32: OK'
|
nvs_log.cyan(f'Page no. {nvs_page.header["page_index"]}'), '\tCRC32: OK'
|
||||||
)
|
)
|
||||||
|
return True
|
||||||
else:
|
else:
|
||||||
nvs_log.info(
|
nvs_log.info(
|
||||||
nvs_log.cyan(f'Page no. {nvs_page.header["page_index"]}'),
|
nvs_log.cyan(f'Page no. {nvs_page.header["page_index"]}'),
|
||||||
@ -67,6 +80,7 @@ def check_page_crc(nvs_page: NVS_Page, nvs_log: NVS_Logger) -> None:
|
|||||||
f'Generated CRC32:',
|
f'Generated CRC32:',
|
||||||
nvs_log.green(f'{nvs_page.header["crc"]["computed"]:x}'),
|
nvs_log.green(f'{nvs_page.header["crc"]["computed"]:x}'),
|
||||||
)
|
)
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
def identify_entry_duplicates(entry: NVS_Entry, seen_written_entires: Dict[str, list[NVS_Entry]]) -> Dict[str, list[NVS_Entry]]:
|
def identify_entry_duplicates(entry: NVS_Entry, seen_written_entires: Dict[str, list[NVS_Entry]]) -> Dict[str, list[NVS_Entry]]:
|
||||||
@ -187,13 +201,111 @@ def check_page_entries(nvs_page: NVS_Page, nvs_log: NVS_Logger) -> Dict[str, lis
|
|||||||
return seen_written_entires
|
return seen_written_entires
|
||||||
|
|
||||||
|
|
||||||
def filter_entry_duplicates(seen_written_entires: Dict[str, list[NVS_Entry]]) -> List[List[NVS_Entry]]:
|
def filter_namespaces_fake_duplicates(duplicate_entries_dict: Dict[str, List[NVS_Entry]]) -> Dict[str, List[NVS_Entry]]:
|
||||||
duplicate_entries_list = [seen_written_entires[key] for key in seen_written_entires if len(seen_written_entires[key]) > 1]
|
new_duplicate_entries_dict: Dict[str, List[NVS_Entry]] = {}
|
||||||
return duplicate_entries_list
|
for key, duplicate_entries in duplicate_entries_dict.items():
|
||||||
|
seen_entries: List[NVS_Entry] = []
|
||||||
|
entry_same_namespace_collisions_list: set[NVS_Entry] = set()
|
||||||
|
|
||||||
|
# Search through the "duplicates" and see if there are real duplicates
|
||||||
|
# E.g. the key can be the same if the namespace is different
|
||||||
|
for entry in duplicate_entries:
|
||||||
|
if entry.metadata['type'] in nvs_const.item_type.values():
|
||||||
|
|
||||||
|
entry_same_namespace_collisions = set()
|
||||||
|
for other_entry in seen_entries:
|
||||||
|
if entry.metadata['namespace'] == other_entry.metadata['namespace']:
|
||||||
|
entry_same_namespace_collisions.add(entry)
|
||||||
|
entry_same_namespace_collisions.add(other_entry)
|
||||||
|
|
||||||
|
if len(entry_same_namespace_collisions) != 0:
|
||||||
|
entry_same_namespace_collisions_list.update(entry_same_namespace_collisions)
|
||||||
|
seen_entries.append(entry)
|
||||||
|
|
||||||
|
# Catch real duplicates
|
||||||
|
new_duplicate_entries: List[NVS_Entry] = []
|
||||||
|
if len(seen_entries) > 1:
|
||||||
|
for entry in seen_entries:
|
||||||
|
if entry in entry_same_namespace_collisions_list:
|
||||||
|
new_duplicate_entries.append(entry)
|
||||||
|
|
||||||
|
if len(new_duplicate_entries) > 0:
|
||||||
|
new_duplicate_entries_dict[key] = new_duplicate_entries
|
||||||
|
|
||||||
|
return new_duplicate_entries_dict
|
||||||
|
|
||||||
|
|
||||||
def print_entry_duplicates(page: NVS_Page, duplicate_entries_list: List[List[NVS_Entry]], nvs_log: NVS_Logger) -> None:
|
def filter_blob_related_duplicates(duplicate_entries_dict: Dict[str, List[NVS_Entry]]) -> Dict[str, List[NVS_Entry]]:
|
||||||
for duplicate_entries in duplicate_entries_list:
|
new_duplicate_entries_dict: Dict[str, List[NVS_Entry]] = {}
|
||||||
|
for key, duplicate_entries in duplicate_entries_dict.items():
|
||||||
|
seen_blob_index: List[NVS_Entry] = []
|
||||||
|
seen_blob_data: List[NVS_Entry] = []
|
||||||
|
seen_another_type_data: List[NVS_Entry] = []
|
||||||
|
blob_index_chunk_index_collisions_list: set[NVS_Entry] = set()
|
||||||
|
blob_data_chunk_index_collisions_list: set[NVS_Entry] = set()
|
||||||
|
|
||||||
|
# Search through the "duplicates" and see if there are real duplicates
|
||||||
|
# E.g. the key can be the same for blob_index and blob_data
|
||||||
|
# (and even for more blob_data entries if they have a different chunk_index)
|
||||||
|
for entry in duplicate_entries:
|
||||||
|
if entry.metadata['type'] == 'blob_index':
|
||||||
|
|
||||||
|
blob_index_chunk_index_collisions = set()
|
||||||
|
for other_entry in seen_blob_index:
|
||||||
|
if entry.metadata['namespace'] == other_entry.metadata['namespace']:
|
||||||
|
blob_index_chunk_index_collisions.add(entry)
|
||||||
|
blob_index_chunk_index_collisions.add(other_entry)
|
||||||
|
|
||||||
|
if len(blob_index_chunk_index_collisions) != 0:
|
||||||
|
blob_index_chunk_index_collisions_list.update(blob_index_chunk_index_collisions)
|
||||||
|
seen_blob_index.append(entry)
|
||||||
|
|
||||||
|
elif entry.metadata['type'] == 'blob_data':
|
||||||
|
|
||||||
|
blob_data_chunk_index_collisions = set()
|
||||||
|
for other_entry in seen_blob_data:
|
||||||
|
if (entry.metadata['namespace'] == other_entry.metadata['namespace']
|
||||||
|
and entry.metadata['chunk_index'] == other_entry.metadata['chunk_index']):
|
||||||
|
blob_data_chunk_index_collisions.add(entry)
|
||||||
|
blob_data_chunk_index_collisions.add(other_entry)
|
||||||
|
|
||||||
|
if len(blob_data_chunk_index_collisions) != 0:
|
||||||
|
blob_data_chunk_index_collisions_list.update(blob_data_chunk_index_collisions)
|
||||||
|
seen_blob_data.append(entry)
|
||||||
|
|
||||||
|
else:
|
||||||
|
seen_another_type_data.append(entry)
|
||||||
|
|
||||||
|
# Catch real duplicates
|
||||||
|
new_duplicate_entries: List[NVS_Entry] = []
|
||||||
|
if len(seen_blob_index) > 1:
|
||||||
|
for entry in seen_blob_index:
|
||||||
|
if entry in blob_index_chunk_index_collisions_list:
|
||||||
|
new_duplicate_entries.append(entry)
|
||||||
|
|
||||||
|
if len(seen_blob_data) > 1:
|
||||||
|
for entry in seen_blob_data:
|
||||||
|
if entry in blob_data_chunk_index_collisions_list:
|
||||||
|
new_duplicate_entries.append(entry)
|
||||||
|
|
||||||
|
for entry in seen_another_type_data: # If there are any duplicates of other types
|
||||||
|
new_duplicate_entries.append(entry)
|
||||||
|
|
||||||
|
if len(new_duplicate_entries) > 0:
|
||||||
|
new_duplicate_entries_dict[key] = new_duplicate_entries
|
||||||
|
|
||||||
|
return new_duplicate_entries_dict
|
||||||
|
|
||||||
|
|
||||||
|
def filter_entry_duplicates(seen_written_entires: Dict[str, list[NVS_Entry]]) -> Dict[str, List[NVS_Entry]]:
|
||||||
|
duplicate_entries_list = {key: v for key, v in seen_written_entires.items() if len(v) > 1}
|
||||||
|
duplicate_entries_list_1 = filter_namespaces_fake_duplicates(duplicate_entries_list)
|
||||||
|
duplicate_entries_list_2 = filter_blob_related_duplicates(duplicate_entries_list_1)
|
||||||
|
return duplicate_entries_list_2
|
||||||
|
|
||||||
|
|
||||||
|
def print_entry_duplicates(page: NVS_Page, duplicate_entries_list: Dict[str, List[NVS_Entry]], nvs_log: NVS_Logger) -> None:
|
||||||
|
for _, duplicate_entries in duplicate_entries_list.items():
|
||||||
# duplicate_entries: list[NVS_Entry]
|
# duplicate_entries: list[NVS_Entry]
|
||||||
nvs_log.info(
|
nvs_log.info(
|
||||||
nvs_log.red(
|
nvs_log.red(
|
||||||
@ -228,8 +340,6 @@ def assemble_blobs(nvs_log: NVS_Logger) -> None:
|
|||||||
chunk_index = chunk.metadata['chunk_index'] - parent.data['chunk_start']
|
chunk_index = chunk.metadata['chunk_index'] - parent.data['chunk_start']
|
||||||
blobs[blob_key][chunk_index + 1] = chunk
|
blobs[blob_key][chunk_index + 1] = chunk
|
||||||
|
|
||||||
# return blobs
|
|
||||||
|
|
||||||
|
|
||||||
def check_blob_data(nvs_log: NVS_Logger) -> None:
|
def check_blob_data(nvs_log: NVS_Logger) -> None:
|
||||||
for blob_key in blobs:
|
for blob_key in blobs:
|
||||||
@ -292,6 +402,8 @@ def integrity_check(nvs_partition: NVS_Partition, nvs_log: NVS_Logger) -> None:
|
|||||||
# Free/empty page check
|
# Free/empty page check
|
||||||
check_empty_page_present(nvs_partition, nvs_log)
|
check_empty_page_present(nvs_partition, nvs_log)
|
||||||
|
|
||||||
|
seen_written_entires_all: Dict[str, list[NVS_Entry]] = {}
|
||||||
|
|
||||||
for page in nvs_partition.pages:
|
for page in nvs_partition.pages:
|
||||||
# page: NVS_Page
|
# page: NVS_Page
|
||||||
|
|
||||||
@ -306,8 +418,15 @@ def integrity_check(nvs_partition: NVS_Partition, nvs_log: NVS_Logger) -> None:
|
|||||||
# Check all entries
|
# Check all entries
|
||||||
seen_written_entires = check_page_entries(page, nvs_log)
|
seen_written_entires = check_page_entries(page, nvs_log)
|
||||||
|
|
||||||
# Duplicate entry check (2) - same key, different index - print duplicates
|
# Collect all seen written entries
|
||||||
duplicates = filter_entry_duplicates(seen_written_entires)
|
for key in seen_written_entires:
|
||||||
|
if key in seen_written_entires_all:
|
||||||
|
seen_written_entires_all[key].extend(seen_written_entires[key])
|
||||||
|
else:
|
||||||
|
seen_written_entires_all[key] = seen_written_entires[key]
|
||||||
|
|
||||||
|
# Duplicate entry check (2) - same key, different index
|
||||||
|
duplicates = filter_entry_duplicates(seen_written_entires_all)
|
||||||
# Print duplicate entries
|
# Print duplicate entries
|
||||||
print_entry_duplicates(page, duplicates, nvs_log)
|
print_entry_duplicates(page, duplicates, nvs_log)
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user