#!/usr/bin/env python3
# SPDX-FileCopyrightText: 2023-2024 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
from typing import Dict
from typing import List
from typing import Optional
from typing import Set

from nvs_logger import NVS_Logger
from nvs_parser import nvs_const
from nvs_parser import NVS_Entry
from nvs_parser import NVS_Page
from nvs_parser import NVS_Partition


EMPTY_ENTRY = NVS_Entry(-1, bytearray(32), 'Erased')

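# Module-level state shared by the checks below. check_page_entries() fills
# these in as it walks the pages; check_blobs() and check_namespaces() consume
# them, and reset_global_variables() clears them so integrity_check() can be
# run repeatedly within one process.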
used_namespaces: Dict[int, Optional[str]] = {}
found_namespaces: Dict[int, str] = {}
blobs: Dict = {}
blob_chunks: List[NVS_Entry] = []


def check_partition_size(nvs_partition: NVS_Partition, nvs_log: NVS_Logger) -> bool:
    if len(nvs_partition.pages) < 3:
        nvs_log.info(
            nvs_log.yellow(
                'NVS Partition must contain at least 3 pages (sectors) to function properly!'
            )
        )
        return False
    return True


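# NVS reclaims space by moving the valid entries of a full page into a spare
# page and erasing the original, so at least one empty page must exist at all
# times; a partition with none is either corrupted or truncated.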
def check_empty_page_present(nvs_partition: NVS_Partition, nvs_log: NVS_Logger) -> bool:
    if not any(page.header['status'] == 'Empty' for page in nvs_partition.pages):
        nvs_log.info(
            nvs_log.red(
                '''No free (empty) page found in the NVS partition,
at least one free page is required for proper function!'''
            )
        )
        nvs_log.info(nvs_log.red('NVS partition possibly truncated?\n'))
        return False
    return True


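# A freshly erased flash sector reads as all 0xFF, so a page whose header
# claims Empty must have an all-0xFF entry state bitmap and no non-empty
# entries.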
def check_empty_page_content(nvs_page: NVS_Page, nvs_log: NVS_Logger) -> bool:
    result = True
    nvs_log.info(nvs_log.cyan(f'Page {nvs_page.header["status"]}'))

    if nvs_page.raw_entry_state_bitmap != bytearray([0xFF]) * nvs_const.entry_size:
        result = False
        nvs_log.info(
            nvs_log.red(
                'The page is reported as Empty but its entry state bitmap is not empty!'
            )
        )

    if any(not e.is_empty for e in nvs_page.entries):
        result = False
        nvs_log.info(
            nvs_log.red('The page is reported as Empty but it contains written data!')
        )

    return result


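# The page header stores a CRC32 of its own fields; nvs_parser recomputes it
# and keeps both values under header['crc'], so a mismatch here means the
# header itself (not the entries) is corrupted.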
def check_page_crc(nvs_page: NVS_Page, nvs_log: NVS_Logger) -> bool:
    if nvs_page.header['crc']['original'] == nvs_page.header['crc']['computed']:
        nvs_log.info(
            nvs_log.cyan(f'Page no. {nvs_page.header["page_index"]}'), '\tCRC32: OK'
        )
        return True
    else:
        nvs_log.info(
            nvs_log.cyan(f'Page no. {nvs_page.header["page_index"]}'),
            'Original CRC32:',
            nvs_log.red(f'{nvs_page.header["crc"]["original"]:x}'),
            'Generated CRC32:',
            nvs_log.green(f'{nvs_page.header["crc"]["computed"]:x}'),
        )
        return False


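# Buckets every Written entry by key. Keys may legitimately repeat across
# namespaces or blob chunks, so these buckets are only duplicate *candidates*;
# filter_entry_duplicates() below prunes the false positives.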
def identify_entry_duplicates(entry: NVS_Entry, seen_written_entries: Dict[str, List[NVS_Entry]]) -> Dict[str, List[NVS_Entry]]:
    if entry.state == 'Written':
        if entry.key in seen_written_entries:
            seen_written_entries[entry.key].append(entry)
        else:
            seen_written_entries[entry.key] = [entry]
    return seen_written_entries


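# Per-entry checks for one page: state vs. content, entry and data CRC32,
# known type, span bounds, and parent/child state consistency. With 4096-byte
# pages and 32-byte entries there are 128 slots, of which the header and the
# entry state bitmap occupy two, leaving 126 data entries - hence the '- 2'
# in the span check below.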
def check_page_entries(nvs_page: NVS_Page, nvs_log: NVS_Logger) -> Dict[str, List[NVS_Entry]]:
    seen_written_entries: Dict[str, List[NVS_Entry]] = {}

    for entry in nvs_page.entries:
        # entry: NVS_Entry
        entry.page = nvs_page

        # Entries stored in 'page.entries' are primitive data types, blob indexes or string/blob data.
        # Variable length values themselves occupy whole 32-byte entries (so their metadata values
        # are meaningless) and are stored as entries in the string/blob data entry's 'entry.children' list.

        # Duplicate entry check (1) - same key, different index - find duplicates
        seen_written_entries = identify_entry_duplicates(entry, seen_written_entries)

        # Entry state check - doesn't check variable length values (metadata such as state
        # are meaningless there, as all 32 bytes are pure data)
        if entry.is_empty:
            if entry.state == 'Written':
                nvs_log.info(
                    nvs_log.red(
                        f' Entry #{entry.index:03d} is reported as Written but it is empty!'
                    )
                )
                continue
            elif entry.state == 'Erased':
                nvs_log.info(
                    nvs_log.yellow(
                        f' Entry #{entry.index:03d} is reported as Erased but it is empty! (Only entries reported as Empty should be empty)'
                    )
                )

        if entry.state == 'Written':
            # Entry CRC32 check
            if (
                entry.metadata['crc']['original']
                != entry.metadata['crc']['computed']
            ):
                nvs_log.info(
                    nvs_log.red(
                        f' Entry #{entry.index:03d} {entry.key} has wrong CRC32!{"": <5}'
                    ),
                    'Written:',
                    nvs_log.red(f'{entry.metadata["crc"]["original"]:x}'),
                    'Generated:',
                    nvs_log.green(f'{entry.metadata["crc"]["computed"]:x}'),
                )

            # Entry children CRC32 check
            if (
                entry.metadata['span'] > 1
                and (entry.metadata['crc']['data_original'] != entry.metadata['crc']['data_computed'])
            ):
                nvs_log.info(
                    nvs_log.red(
                        f' Entry #{entry.index:03d} {entry.key} data (string, blob) has wrong CRC32!'
                    ),
                    'Written:',
                    nvs_log.red(f'{entry.metadata["crc"]["data_original"]:x}'),
                    'Generated:',
                    nvs_log.green(f'{entry.metadata["crc"]["data_computed"]:x}'),
                )

            # Entry type check
            if entry.metadata['type'] not in [
                nvs_const.item_type[key] for key in nvs_const.item_type
            ]:
                nvs_log.info(
                    nvs_log.yellow(
                        f' Type of entry #{entry.index:03d} {entry.key} is unrecognized!'
                    ),
                    f'Type: {entry.metadata["type"]}',
                )

            # Span check
            if (
                entry.index + entry.metadata['span'] - 1
                >= int(nvs_const.page_size / nvs_const.entry_size) - 2
            ):
                nvs_log.info(
                    nvs_log.red(
                        f' Variable length entry #{entry.index:03d} {entry.key} is out of bounds!'
                    )
                )
            # Spanned entry state checks
            elif entry.metadata['span'] > 1:
                parent_state = entry.state
                for kid in entry.children:
                    if parent_state != kid.state:
                        nvs_log.info(
                            nvs_log.yellow(' Inconsistent data state!'),
                            f'Entry #{entry.index:03d} {entry.key} state: {parent_state},',
                            f'Data entry #{kid.index:03d} {entry.key} state: {kid.state}',
                        )

            # Gather blobs & namespaces
            if entry.metadata['type'] == 'blob_index':
                blobs[f'{entry.metadata["namespace"]:03d}{entry.key}'] = [entry] + [
                    EMPTY_ENTRY
                ] * entry.data['chunk_count']
            elif entry.metadata['type'] == 'blob_data':
                blob_chunks.append(entry)

            if entry.metadata['namespace'] == 0:
                found_namespaces[entry.data['value']] = entry.key
            else:
                used_namespaces[entry.metadata['namespace']] = None

    return seen_written_entries


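# Example of a "fake" duplicate this filter keeps out of the report: the key
# 'counter' stored once in namespace 1 and once in namespace 2 is valid; only
# two Written entries with the same key *and* the same namespace collide.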
def filter_namespaces_fake_duplicates(duplicate_entries_dict: Dict[str, List[NVS_Entry]]) -> Dict[str, List[NVS_Entry]]:
    new_duplicate_entries_dict: Dict[str, List[NVS_Entry]] = {}
    for key, duplicate_entries in duplicate_entries_dict.items():
        seen_entries: List[NVS_Entry] = []
        entry_same_namespace_collisions_list: Set[NVS_Entry] = set()

        # Search through the "duplicates" and see if there are real duplicates
        # E.g. the key can be the same if the namespace is different
        for entry in duplicate_entries:
            if entry.metadata['type'] in nvs_const.item_type.values():
                entry_same_namespace_collisions = set()
                for other_entry in seen_entries:
                    if entry.metadata['namespace'] == other_entry.metadata['namespace']:
                        entry_same_namespace_collisions.add(entry)
                        entry_same_namespace_collisions.add(other_entry)

                if len(entry_same_namespace_collisions) != 0:
                    entry_same_namespace_collisions_list.update(entry_same_namespace_collisions)
                seen_entries.append(entry)

        # Catch real duplicates
        new_duplicate_entries: List[NVS_Entry] = []
        if len(seen_entries) > 1:
            for entry in seen_entries:
                if entry in entry_same_namespace_collisions_list:
                    new_duplicate_entries.append(entry)

        if len(new_duplicate_entries) > 0:
            new_duplicate_entries_dict[key] = new_duplicate_entries

    return new_duplicate_entries_dict


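# Blobs make key reuse legal in two more ways: a blob_index shares its key
# with its blob_data chunks, and several blob_data chunks share a key as long
# as their chunk_index differs. Only same-namespace (and, for chunks,
# same-chunk_index) pairs are treated as real duplicates.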
def filter_blob_related_duplicates(duplicate_entries_dict: Dict[str, List[NVS_Entry]]) -> Dict[str, List[NVS_Entry]]:
    new_duplicate_entries_dict: Dict[str, List[NVS_Entry]] = {}
    for key, duplicate_entries in duplicate_entries_dict.items():
        seen_blob_index: List[NVS_Entry] = []
        seen_blob_data: List[NVS_Entry] = []
        seen_another_type_data: List[NVS_Entry] = []
        blob_index_chunk_index_collisions_list: Set[NVS_Entry] = set()
        blob_data_chunk_index_collisions_list: Set[NVS_Entry] = set()

        # Search through the "duplicates" and see if there are real duplicates
        # E.g. the key can be the same for blob_index and blob_data
        # (and even for more blob_data entries if they have a different chunk_index)
        for entry in duplicate_entries:
            if entry.metadata['type'] == 'blob_index':
                blob_index_chunk_index_collisions = set()
                for other_entry in seen_blob_index:
                    if entry.metadata['namespace'] == other_entry.metadata['namespace']:
                        blob_index_chunk_index_collisions.add(entry)
                        blob_index_chunk_index_collisions.add(other_entry)

                if len(blob_index_chunk_index_collisions) != 0:
                    blob_index_chunk_index_collisions_list.update(blob_index_chunk_index_collisions)
                seen_blob_index.append(entry)

            elif entry.metadata['type'] == 'blob_data':
                blob_data_chunk_index_collisions = set()
                for other_entry in seen_blob_data:
                    if (entry.metadata['namespace'] == other_entry.metadata['namespace']
                            and entry.metadata['chunk_index'] == other_entry.metadata['chunk_index']):
                        blob_data_chunk_index_collisions.add(entry)
                        blob_data_chunk_index_collisions.add(other_entry)

                if len(blob_data_chunk_index_collisions) != 0:
                    blob_data_chunk_index_collisions_list.update(blob_data_chunk_index_collisions)
                seen_blob_data.append(entry)

            else:
                seen_another_type_data.append(entry)

        # Catch real duplicates
        new_duplicate_entries: List[NVS_Entry] = []
        if len(seen_blob_index) > 1:
            for entry in seen_blob_index:
                if entry in blob_index_chunk_index_collisions_list:
                    new_duplicate_entries.append(entry)

        if len(seen_blob_data) > 1:
            for entry in seen_blob_data:
                if entry in blob_data_chunk_index_collisions_list:
                    new_duplicate_entries.append(entry)

        for entry in seen_another_type_data:  # If there are any duplicates of other types
            new_duplicate_entries.append(entry)

        if len(new_duplicate_entries) > 0:
            new_duplicate_entries_dict[key] = new_duplicate_entries

    return new_duplicate_entries_dict


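# Two-stage pruning: drop cross-namespace false positives first, then
# blob-related ones; whatever survives is reported as a genuine duplicate.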
def filter_entry_duplicates(seen_written_entries: Dict[str, List[NVS_Entry]]) -> Dict[str, List[NVS_Entry]]:
    duplicate_entries_list = {key: v for key, v in seen_written_entries.items() if len(v) > 1}
    duplicate_entries_list_1 = filter_namespaces_fake_duplicates(duplicate_entries_list)
    duplicate_entries_list_2 = filter_blob_related_duplicates(duplicate_entries_list_1)
    return duplicate_entries_list_2


def print_entry_duplicates(duplicate_entries_list: Dict[str, List[NVS_Entry]], nvs_log: NVS_Logger) -> None:
    if len(duplicate_entries_list) > 0:
        nvs_log.info(nvs_log.red('Found duplicate entries:'))
        nvs_log.info(nvs_log.red('Entry\tKey\t\t\tType\t\tNamespace idx\tPage\tPage status'))

    for _, duplicate_entries in duplicate_entries_list.items():
        # duplicate_entries: List[NVS_Entry]
        for entry in duplicate_entries:
            # entry: NVS_Entry
            if entry.metadata['namespace'] == 0:
                entry_type = f'namespace ({entry.data["value"]})'
            else:
                entry_type = entry.metadata['type']

            if entry.page is not None:
                page_num = entry.page.header['page_index']
                page_status = entry.page.header['status']
            else:
                page_num = 'Unknown'
                page_status = 'Unknown'

            entry_key_tab_cnt = len(entry.key) // 8
            entry_key_tab = '\t' * (3 - entry_key_tab_cnt)

            namespace_tab_cnt = len(entry_type) // 8
            namespace_tab = '\t' * (2 - namespace_tab_cnt)
            namespace_str = f'{entry.metadata["namespace"]}'

            nvs_log.info(
                nvs_log.red(
                    f'#{entry.index:03d}\t{entry.key}{entry_key_tab}{entry_type}{namespace_tab}{namespace_str}\t\t{page_num}\t{page_status}'
                )
            )


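# blobs maps f'{namespace:03d}{key}' to [blob_index, chunk_0, ..., chunk_n-1],
# pre-sized from the index's chunk_count and pre-filled with EMPTY_ENTRY.
# assemble_blobs() slots each recorded chunk into position
# chunk_index - chunk_start + 1, so any slot still holding EMPTY_ENTRY
# afterwards marks a missing chunk for check_blob_data() to report.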
def assemble_blobs(nvs_log: NVS_Logger) -> None:
    for chunk in blob_chunks:
        # chunk: NVS_Entry
        parent = blobs.get(
            f'{chunk.metadata["namespace"]:03d}{chunk.key}', [EMPTY_ENTRY]
        )[0]
        # Blob chunk without blob index check
        if parent is EMPTY_ENTRY:
            nvs_log.info(
                nvs_log.red(f'Blob {chunk.key} chunk has no blob index!'),
                f'Namespace index: {chunk.metadata["namespace"]:03d}',
                f'[{found_namespaces.get(chunk.metadata["namespace"], "undefined")}],',
                f'Chunk Index: {chunk.metadata["chunk_index"]:03d}',
            )
        else:
            blob_key = f'{chunk.metadata["namespace"]:03d}{chunk.key}'
            chunk_index = chunk.metadata['chunk_index'] - parent.data['chunk_start']
            blobs[blob_key][chunk_index + 1] = chunk


def check_blob_data(nvs_log: NVS_Logger) -> None:
    for blob_key in blobs:
        blob_index = blobs[blob_key][0]
        blob_chunks = blobs[blob_key][1:]
        blob_size = blob_index.data['size']

        for i, chunk in enumerate(blob_chunks):
            # chunk: NVS_Entry
            # Blob missing chunk check
            if chunk is EMPTY_ENTRY:
                nvs_log.info(
                    nvs_log.red(f'Blob {blob_index.key} is missing a chunk!'),
                    f'Namespace index: {blob_index.metadata["namespace"]:03d}',
                    f'[{found_namespaces.get(blob_index.metadata["namespace"], "undefined")}],',
                    f'Chunk Index: {i:03d}',
                )
            else:
                blob_size -= len(chunk.children) * nvs_const.entry_size

        # Blob missing data check
        if blob_size > 0:
            nvs_log.info(
                nvs_log.red(f'Blob {blob_index.key} is missing {blob_size} B of data!'),
                f'Namespace index: {blob_index.metadata["namespace"]:03d}',
            )


def check_blobs(nvs_log: NVS_Logger) -> None:
    # Assemble blobs
    assemble_blobs(nvs_log)
    # Blob data check
    check_blob_data(nvs_log)


def check_namespaces(nvs_log: NVS_Logger) -> None:
    # Undefined namespace index check
    for used_ns in used_namespaces:
        key = found_namespaces.pop(used_ns, None)
        if key is None:
            nvs_log.info(
                nvs_log.red('Undefined namespace index!'),
                f'Namespace index: {used_ns:03d}',
                '[undefined]',
            )

    # Unused namespace index check
    for unused_ns in found_namespaces:
        nvs_log.info(
            nvs_log.yellow('Found unused namespace.'),
            f'Namespace index: {unused_ns:03d}',
            f'[{found_namespaces[unused_ns]}]',
        )


def reset_global_variables() -> None:
    global used_namespaces, found_namespaces, blobs, blob_chunks
    used_namespaces = {}
    found_namespaces = {}
    blobs = {}
    blob_chunks = []


def integrity_check(nvs_partition: NVS_Partition, nvs_log: NVS_Logger) -> None:
    # Partition size check
    check_partition_size(nvs_partition, nvs_log)

    # Free/empty page check
    check_empty_page_present(nvs_partition, nvs_log)

    seen_written_entries_all: Dict[str, List[NVS_Entry]] = {}

    for page in nvs_partition.pages:
        # page: NVS_Page

        # Print page header
        if page.header['status'] == 'Empty':
            # Check if the page is truly empty
            check_empty_page_content(page, nvs_log)
        else:
            # Check page header CRC32
            check_page_crc(page, nvs_log)

        # Check all entries
        seen_written_entries = check_page_entries(page, nvs_log)

        # Collect all seen written entries
        for key in seen_written_entries:
            if key in seen_written_entries_all:
                seen_written_entries_all[key].extend(seen_written_entries[key])
            else:
                seen_written_entries_all[key] = seen_written_entries[key]

    # Duplicate entry check (2) - same key, different index
    duplicates = filter_entry_duplicates(seen_written_entries_all)
    # Print duplicate entries
    print_entry_duplicates(duplicates, nvs_log)

    nvs_log.info()  # Empty line

    # Blob checks
    check_blobs(nvs_log)

    # Namespace checks
    check_namespaces(nvs_log)

    reset_global_variables()
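

# ---------------------------------------------------------------------------
# Illustrative usage sketch (not part of the original module). nvs_check is
# normally driven by the accompanying nvs_tool.py; the commented sketch below
# only shows the intended call order. The NVS_Partition and NVS_Logger
# constructor signatures used here are assumptions, not verified against
# nvs_parser or nvs_logger.
#
#   import sys
#   from nvs_logger import NVS_Logger
#   from nvs_parser import NVS_Partition
#
#   with open(sys.argv[1], 'rb') as f:          # raw dump of the NVS partition
#       raw = bytearray(f.read())
#   partition = NVS_Partition('nvs', raw)       # assumed constructor signature
#   integrity_check(partition, NVS_Logger())    # assumed constructor signature
# ---------------------------------------------------------------------------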