Merge branch 'fix/nvs_tool_false_duplicate_warning' into 'master'

fix(nvs): nvs_tool.py refactor, reduce false duplicate warnings, add a test

Closes IDF-10684

See merge request espressif/esp-idf!32449
Adam Múdry 2024-09-04 11:05:44 +08:00
commit 98cf50d140
5 changed files with 722 additions and 158 deletions

.gitlab/ci/host-test.yml

@@ -388,3 +388,17 @@ test_idf_build_apps_load_soc_caps:
extends: .host_test_template
script:
- python tools/ci/check_soc_headers_load_in_idf_build_apps.py
test_nvs_gen_check:
extends: .host_test_template
artifacts:
paths:
- XUNIT_RESULT.xml
- components/nvs_flash/nvs_partition_tool
reports:
junit: XUNIT_RESULT.xml
variables:
LC_ALL: C.UTF-8
script:
- cd ${IDF_PATH}/components/nvs_flash/nvs_partition_tool
- pytest --noconftest test_nvs_gen_check.py --junitxml=XUNIT_RESULT.xml
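The suite can also be reproduced locally without the CI template; a minimal sketch via pytest's Python entry point (assumes IDF_PATH points at an esp-idf checkout; not part of this commit):

# Hypothetical local runner mirroring the CI invocation above.
import os
import pytest

os.chdir(os.path.join(os.environ['IDF_PATH'], 'components', 'nvs_flash', 'nvs_partition_tool'))
# --noconftest skips conftest discovery, matching the CI script
raise SystemExit(pytest.main(['--noconftest', 'test_nvs_gen_check.py', '--junitxml=XUNIT_RESULT.xml']))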

components/nvs_flash/nvs_partition_tool/nvs_check.py

@@ -1,28 +1,38 @@
#!/usr/bin/env python3
# SPDX-FileCopyrightText: 2023-2024 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
from typing import Dict
from typing import List
from typing import Optional
from typing import Set
from nvs_logger import NVS_Logger
from nvs_parser import nvs_const
from nvs_parser import NVS_Entry
from nvs_parser import NVS_Page
from nvs_parser import NVS_Partition
EMPTY_ENTRY = NVS_Entry(-1, bytearray(32), 'Erased')
used_namespaces: Dict[int, Optional[str]] = {}
found_namespaces: Dict[int, str] = {}
blobs: Dict = {}
blob_chunks: List[NVS_Entry] = []
def check_partition_size(nvs_partition: NVS_Partition, nvs_log: NVS_Logger) -> bool:
if len(nvs_partition.pages) < 3:
nvs_log.info(
nvs_log.yellow(
'NVS Partition must contain 3 pages (sectors) at least to function properly!'
)
)
return False
return True
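A quick hedged sketch of the size floor (illustrative, not part of the commit; assumes a blank 0xFF image parses into empty pages):

# Two pages are below the three-page minimum, so the check should fail.
from nvs_logger import nvs_log
from nvs_parser import nvs_const, NVS_Partition
import nvs_check

two_pages = NVS_Partition('tiny', bytearray([0xFF]) * (2 * nvs_const.page_size))
assert not nvs_check.check_partition_size(two_pages, nvs_log)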
def check_empty_page_present(nvs_partition: NVS_Partition, nvs_log: NVS_Logger) -> bool:
if not any(page.header['status'] == 'Empty' for page in nvs_partition.pages):
nvs_log.info(
nvs_log.red(
@@ -31,176 +41,313 @@ at least one free page is required for proper function!'''
)
)
nvs_log.info(nvs_log.red('NVS partition possibly truncated?\n'))
return False
return True
def check_empty_page_content(nvs_page: NVS_Page, nvs_log: NVS_Logger) -> bool:
result = True
nvs_log.info(nvs_log.cyan(f'Page {nvs_page.header["status"]}'))
# Check if page is truly empty
if nvs_page.raw_entry_state_bitmap != bytearray({0xFF}) * nvs_const.entry_size:
result = False
nvs_log.info(
nvs_log.red(
'The page is reported as Empty but its entry state bitmap is not empty!'
)
)
if any([not e.is_empty for e in nvs_page.entries]):
result = False
nvs_log.info(
nvs_log.red('The page is reported as Empty but there are data written!')
)
return result
def check_page_crc(nvs_page: NVS_Page, nvs_log: NVS_Logger) -> bool:
if nvs_page.header['crc']['original'] == nvs_page.header['crc']['computed']:
nvs_log.info(
nvs_log.cyan(f'Page no. {nvs_page.header["page_index"]}'), '\tCRC32: OK'
)
return True
else:
nvs_log.info(
nvs_log.cyan(f'Page no. {nvs_page.header["page_index"]}'),
f'Original CRC32:',
nvs_log.red(f'{nvs_page.header["crc"]["original"]:x}'),
f'Generated CRC32:',
nvs_log.green(f'{nvs_page.header["crc"]["computed"]:x}'),
)
return False
def identify_entry_duplicates(entry: NVS_Entry, seen_written_entires: Dict[str, List[NVS_Entry]]) -> Dict[str, List[NVS_Entry]]:
if entry.state == 'Written':
if entry.key in seen_written_entires:
seen_written_entires[entry.key].append(entry)
else:
seen_written_entires[entry.key] = [entry]
return seen_written_entires
def check_page_entries(nvs_page: NVS_Page, nvs_log: NVS_Logger) -> Dict[str, List[NVS_Entry]]:
seen_written_entires: Dict[str, List[NVS_Entry]] = {}
for entry in nvs_page.entries:
# entry: NVS_Entry
entry.page = nvs_page
# Entries stored in 'page.entries' are primitive data types, blob indexes or string/blob data
# Variable length values themselves occupy whole 32 bytes (therefore metadata values are meaningless)
# and are stored in as entries inside string/blob data entry 'entry.children' list
# Duplicate entry check (1) - same key, different index - find duplicates
seen_written_entires = identify_entry_duplicates(entry, seen_written_entires)
# Entry state check - doesn't check variable length values (metadata such as state are meaningless as all 32 bytes are pure data)
if entry.is_empty:
if entry.state == 'Written':
nvs_log.info(
nvs_log.red(
f' Entry #{entry.index:03d} is reported as Written but it is empty!'
)
)
continue
elif entry.state == 'Erased':
nvs_log.info(
nvs_log.yellow(
f' Entry #{entry.index:03d} is reported as Erased but it is empty! (Only entries reported as Empty should be empty)'
)
)
if entry.state == 'Written':
# Entry CRC32 check
if (
entry.metadata['crc']['original']
!= entry.metadata['crc']['computed']
):
nvs_log.info(
nvs_log.red(
f' Entry #{entry.index:03d} {entry.key} has wrong CRC32!{"": <5}'
),
f'Written:',
nvs_log.red(f'{entry.metadata["crc"]["original"]:x}'),
f'Generated:',
nvs_log.green(f'{entry.metadata["crc"]["computed"]:x}'),
)
# Entry children CRC32 check
if (
entry.metadata['span'] > 1
and (entry.metadata['crc']['data_original'] != entry.metadata['crc']['data_computed'])
):
nvs_log.info(
nvs_log.red(
f' Entry #{entry.index:03d} {entry.key} data (string, blob) has wrong CRC32!'
),
f'Written:',
nvs_log.red(f'{entry.metadata["crc"]["data_original"]:x}'),
f'Generated:',
nvs_log.green(f'{entry.metadata["crc"]["data_computed"]:x}'),
)
# Entry type check
if entry.metadata['type'] not in [
nvs_const.item_type[key] for key in nvs_const.item_type
]:
nvs_log.info(
nvs_log.yellow(
f' Type of entry #{entry.index:03d} {entry.key} is unrecognized!'
),
f'Type: {entry.metadata["type"]}',
)
# Span check
if (
entry.index + entry.metadata['span'] - 1
>= int(nvs_const.page_size / nvs_const.entry_size) - 2
):
nvs_log.info(
nvs_log.red(
f' Variable length entry #{entry.index:03d} {entry.key} is out of bounds!'
)
)
# Spanned entry state checks
elif entry.metadata['span'] > 1:
parent_state = entry.state
for kid in entry.children:
if parent_state != kid.state:
nvs_log.info(
nvs_log.yellow(' Inconsistent data state!'),
f'Entry #{entry.index:03d} {entry.key} state: {parent_state},',
f'Data entry #{kid.index:03d} {entry.key} state: {kid.state}',
)
# Gather blobs & namespaces
if entry.metadata['type'] == 'blob_index':
blobs[f'{entry.metadata["namespace"]:03d}{entry.key}'] = [entry] + [
EMPTY_ENTRY
] * entry.data['chunk_count']
elif entry.metadata['type'] == 'blob_data':
blob_chunks.append(entry)
if entry.metadata['namespace'] == 0:
found_namespaces[entry.data['value']] = entry.key
else:
used_namespaces[entry.metadata['namespace']] = None
return seen_written_entires
def filter_namespaces_fake_duplicates(duplicate_entries_dict: Dict[str, List[NVS_Entry]]) -> Dict[str, List[NVS_Entry]]:
new_duplicate_entries_dict: Dict[str, List[NVS_Entry]] = {}
for key, duplicate_entries in duplicate_entries_dict.items():
seen_entries: List[NVS_Entry] = []
entry_same_namespace_collisions_list: Set[NVS_Entry] = set()
# Search through the "duplicates" and see if there are real duplicates
# E.g. the key can be the same if the namespace is different
for entry in duplicate_entries:
if entry.metadata['type'] in nvs_const.item_type.values():
entry_same_namespace_collisions = set()
for other_entry in seen_entries:
if entry.metadata['namespace'] == other_entry.metadata['namespace']:
entry_same_namespace_collisions.add(entry)
entry_same_namespace_collisions.add(other_entry)
if len(entry_same_namespace_collisions) != 0:
entry_same_namespace_collisions_list.update(entry_same_namespace_collisions)
seen_entries.append(entry)
# Catch real duplicates
new_duplicate_entries: List[NVS_Entry] = []
if len(seen_entries) > 1:
for entry in seen_entries:
if entry in entry_same_namespace_collisions_list:
new_duplicate_entries.append(entry)
if len(new_duplicate_entries) > 0:
new_duplicate_entries_dict[key] = new_duplicate_entries
return new_duplicate_entries_dict
def filter_blob_related_duplicates(duplicate_entries_dict: Dict[str, List[NVS_Entry]]) -> Dict[str, List[NVS_Entry]]:
new_duplicate_entries_dict: Dict[str, List[NVS_Entry]] = {}
for key, duplicate_entries in duplicate_entries_dict.items():
seen_blob_index: List[NVS_Entry] = []
seen_blob_data: List[NVS_Entry] = []
seen_another_type_data: List[NVS_Entry] = []
blob_index_chunk_index_collisions_list: Set[NVS_Entry] = set()
blob_data_chunk_index_collisions_list: Set[NVS_Entry] = set()
# Search through the "duplicates" and see if there are real duplicates
# E.g. the key can be the same for blob_index and blob_data
# (and even for more blob_data entries if they have a different chunk_index)
for entry in duplicate_entries:
if entry.metadata['type'] == 'blob_index':
blob_index_chunk_index_collisions = set()
for other_entry in seen_blob_index:
if entry.metadata['namespace'] == other_entry.metadata['namespace']:
blob_index_chunk_index_collisions.add(entry)
blob_index_chunk_index_collisions.add(other_entry)
if len(blob_index_chunk_index_collisions) != 0:
blob_index_chunk_index_collisions_list.update(blob_index_chunk_index_collisions)
seen_blob_index.append(entry)
elif entry.metadata['type'] == 'blob_data':
blob_data_chunk_index_collisions = set()
for other_entry in seen_blob_data:
if (entry.metadata['namespace'] == other_entry.metadata['namespace']
and entry.metadata['chunk_index'] == other_entry.metadata['chunk_index']):
blob_data_chunk_index_collisions.add(entry)
blob_data_chunk_index_collisions.add(other_entry)
if len(blob_data_chunk_index_collisions) != 0:
blob_data_chunk_index_collisions_list.update(blob_data_chunk_index_collisions)
seen_blob_data.append(entry)
else:
seen_another_type_data.append(entry)
# Catch real duplicates
new_duplicate_entries: List[NVS_Entry] = []
if len(seen_blob_index) > 1:
for entry in seen_blob_index:
if entry in blob_index_chunk_index_collisions_list:
new_duplicate_entries.append(entry)
if len(seen_blob_data) > 1:
for entry in seen_blob_data:
if entry in blob_data_chunk_index_collisions_list:
new_duplicate_entries.append(entry)
for entry in seen_another_type_data: # If there are any duplicates of other types
new_duplicate_entries.append(entry)
if len(new_duplicate_entries) > 0:
new_duplicate_entries_dict[key] = new_duplicate_entries
return new_duplicate_entries_dict
def filter_entry_duplicates(seen_written_entires: Dict[str, List[NVS_Entry]]) -> Dict[str, List[NVS_Entry]]:
duplicate_entries_list = {key: v for key, v in seen_written_entires.items() if len(v) > 1}
duplicate_entries_list_1 = filter_namespaces_fake_duplicates(duplicate_entries_list)
duplicate_entries_list_2 = filter_blob_related_duplicates(duplicate_entries_list_1)
return duplicate_entries_list_2
def print_entry_duplicates(duplicate_entries_list: Dict[str, List[NVS_Entry]], nvs_log: NVS_Logger) -> None:
if len(duplicate_entries_list) > 0:
nvs_log.info(nvs_log.red('Found duplicate entries:'))
nvs_log.info(nvs_log.red('Entry\tKey\t\t\tType\t\tNamespace idx\tPage\tPage status'))
for _, duplicate_entries in duplicate_entries_list.items():
# duplicate_entries: List[NVS_Entry]
for entry in duplicate_entries:
# entry: NVS_Entry
if entry.metadata['namespace'] == 0:
entry_type = f'namespace ({entry.data["value"]})'
else:
entry_type = entry.metadata['type']
if entry.page is not None:
page_num = entry.page.header['page_index']
page_status = entry.page.header['status']
else:
page_num = 'Unknown'
page_status = 'Unknown'
entry_key_tab_cnt = len(entry.key) // 8
entry_key_tab = '\t' * (3 - entry_key_tab_cnt)
namespace_tab_cnt = len(entry_type) // 8
namepace_tab = '\t' * (2 - namespace_tab_cnt)
namespace_str = f'{entry.metadata["namespace"]}'
nvs_log.info(
nvs_log.red(
f'#{entry.index:03d}\t{entry.key}{entry_key_tab}{entry_type}{namepace_tab}{namespace_str}\t\t{page_num}\t{page_status}'
)
)
def assemble_blobs(nvs_log: NVS_Logger) -> None:
for chunk in blob_chunks:
# chunk: NVS_Entry
parent = blobs.get(
f'{chunk.metadata["namespace"]:03d}{chunk.key}', [EMPTY_ENTRY]
)[0]
# Blob chunk without blob index check
if parent is EMPTY_ENTRY:
nvs_log.info(
nvs_log.red(f'Blob {chunk.key} chunk has no blob index!'),
f'Namespace index: {chunk.metadata["namespace"]:03d}',
@@ -212,15 +359,17 @@ with status {page.header["status"]} is used by the following entries:'''
chunk_index = chunk.metadata['chunk_index'] - parent.data['chunk_start']
blobs[blob_key][chunk_index + 1] = chunk
def check_blob_data(nvs_log: NVS_Logger) -> None:
for blob_key in blobs:
blob_index = blobs[blob_key][0]
blob_chunks = blobs[blob_key][1:]
blob_size = blob_index.data['size']
for i, chunk in enumerate(blob_chunks):
# chunk: NVS_Entry
# Blob missing chunk check
if chunk is EMPTY_ENTRY:
nvs_log.info(
nvs_log.red(f'Blob {blob_index.key} is missing a chunk!'),
f'Namespace index: {blob_index.metadata["namespace"]:03d}',
@@ -237,11 +386,19 @@ with status {page.header["status"]} is used by the following entries:'''
f'Namespace index: {blob_index.metadata["namespace"]:03d}',
)
def check_blobs(nvs_log: NVS_Logger) -> None:
# Assemble blobs
assemble_blobs(nvs_log)
# Blob data check
check_blob_data(nvs_log)
def check_namespaces(nvs_log: NVS_Logger) -> None:
# Undefined namespace index check
for used_ns in used_namespaces:
key = found_namespaces.pop(used_ns, None)
if key is None:
nvs_log.info(
nvs_log.red('Undefined namespace index!'),
f'Namespace index: {used_ns:03d}',
@@ -255,3 +412,57 @@ with status {page.header["status"]} is used by the following entries:'''
f'Namespace index: {unused_ns:03d}',
f'[{found_namespaces[unused_ns]}]',
)
def reset_global_variables() -> None:
global used_namespaces, found_namespaces, blobs, blob_chunks
used_namespaces = {}
found_namespaces = {}
blobs = {}
blob_chunks = []
def integrity_check(nvs_partition: NVS_Partition, nvs_log: NVS_Logger) -> None:
# Partition size check
check_partition_size(nvs_partition, nvs_log)
# Free/empty page check
check_empty_page_present(nvs_partition, nvs_log)
seen_written_entires_all: Dict[str, List[NVS_Entry]] = {}
for page in nvs_partition.pages:
# page: NVS_Page
# Print page header
if page.header['status'] == 'Empty':
# Check if page is truly empty
check_empty_page_content(page, nvs_log)
else:
# Check page header CRC32
check_page_crc(page, nvs_log)
# Check all entries
seen_written_entires = check_page_entries(page, nvs_log)
# Collect all seen written entries
for key in seen_written_entires:
if key in seen_written_entires_all:
seen_written_entires_all[key].extend(seen_written_entires[key])
else:
seen_written_entires_all[key] = seen_written_entires[key]
# Duplicate entry check (2) - same key, different index
duplicates = filter_entry_duplicates(seen_written_entires_all)
# Print duplicate entries
print_entry_duplicates(duplicates, nvs_log)
nvs_log.info() # Empty line
# Blob checks
check_blobs(nvs_log)
# Namespace checks
check_namespaces(nvs_log)
reset_global_variables()
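Taken together, the refactor keeps integrity_check as the single entry point; a hedged usage sketch over a raw partition dump (the dump file name is illustrative):

# Run the refactored checks over a raw NVS partition image.
from nvs_logger import nvs_log
from nvs_parser import NVS_Partition
import nvs_check

with open('nvs_dump.bin', 'rb') as f:  # illustrative dump file
    partition = NVS_Partition('nvs_dump', bytearray(f.read()))
nvs_check.integrity_check(partition, nvs_log)  # findings are printed via nvs_log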

components/nvs_flash/nvs_partition_tool/nvs_parser.py

@@ -219,6 +219,7 @@ class NVS_Entry:
self.state = entry_state
self.is_empty = self.raw == bytearray({0xFF}) * nvs_const.entry_size
self.index = index
self.page = None
namespace = self.raw[0]
entry_type = self.raw[1]

components/nvs_flash/nvs_partition_tool/pytest.ini

@@ -0,0 +1,12 @@
[pytest]
addopts = -s -p no:pytest_embedded
# log related
log_cli = True
log_cli_level = INFO
log_cli_format = %(asctime)s %(levelname)s %(message)s
log_cli_date_format = %Y-%m-%d %H:%M:%S
## log everything to `system-out` when a case fails
junit_logging = stdout
junit_log_passing_tests = False

components/nvs_flash/nvs_partition_tool/test_nvs_gen_check.py

@@ -0,0 +1,326 @@
#!/usr/bin/env python3
# SPDX-FileCopyrightText: 2024 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
from io import BufferedRandom
from io import BytesIO
from pathlib import Path
from typing import Any
from typing import Callable
from typing import Dict
from typing import List
from typing import Optional
from typing import Union
from zlib import crc32
import esp_idf_nvs_partition_gen.nvs_partition_gen as nvs_partition_gen
import nvs_check as nvs_check
import pytest
from esp_idf_nvs_partition_gen.nvs_partition_gen import NVS
from nvs_logger import nvs_log
from nvs_logger import NVS_Logger
from nvs_parser import nvs_const
from nvs_parser import NVS_Entry
from nvs_parser import NVS_Partition
class SilentLogger(NVS_Logger):
def __init__(self) -> None:
super().__init__()
self.color = False
def info(self, *args, **kwargs) -> None: # type: ignore
pass
logger = nvs_log # SilentLogger()
LOREM_STRING = '''Lorem ipsum dolor sit amet, consectetur adipiscing elit.
Nullam eget orci fringilla, cursus nisi sit amet, hendrerit tortor.
Vivamus lectus dolor, rhoncus eget metus id, convallis placerat quam.
Nulla facilisi.
In at aliquam nunc, in dictum augue.
Nullam dapibus ligula nec enim commodo lobortis.
Praesent facilisis ante nec magna various lobortis.
Phasellus sodales sed nisi vitae pulvinar.
Aliquam tempor quis sem et tempor.
Etiam interdum nunc quis justo pharetra, sed finibus arcu lacinia.
Suspendisse potenti.
Praesent et turpis ut justo accumsan pellentesque sed at leo.
Aenean consequat ligula ac mattis porta.
Nullam id justo a arcu tincidunt sodales.
Nunc rhoncus pretium nibh ut convallis.
Maecenas orci enim, tincidunt eget vestibulum eu, placerat non ante.
Proin sit amet felis tempor, ullamcorper sem sed, scelerisque nibh.
Aliquam sit amet semper leo, in fringilla nulla.
Vestibulum sit amet tortor tincidunt, laoreet risus eget, ullamcorper sapien.
Fusce non finibus nisl. Cras vitae dui nibh.
Sed fermentum ullamcorper various.
Integer sit amet elit sed nunc fringilla molestie nec nec diam.
Etiam et ornare tellus.
Donec tristique auctor urna, ac aliquam tellus sodales id.
Duis nec magna eget mi consequat gravida.
Vestibulum ante ipsum primis in faucibus orci luctus et ultrices posuere cubilia curae;
Name imperdiet ante neque, nec viverra sem pellentesque vel.
Sed nec arcu non nisl tempor pretium.
Quisque facilisis auctor lobortis.
Pellentesque sed finibus sem, eu lacinia tellus.
Vivamus imperdiet non augue in tincidunt.
Sed aliquet tincidunt dignissim.
Name vehicula leo eu dolor pellentesque, ultrices tempus ex hendrerit.
'''
def get_entry_type_bin(entry_type_str: str) -> Optional[int]:
# Reverse `item_type` dict lookup
entry_type_bin: Optional[int] = next((key for key, value in nvs_const.item_type.items() if value == entry_type_str), None)
if entry_type_bin is None:
logger.info(logger.yellow(f'Unknown entry type {entry_type_str}'))
return entry_type_bin
def create_entry_data_bytearray(namespace_index: int, entry_type: int, span: int, chunk_index: int, key: str, data: Any) -> bytearray:
key_bytearray = bytearray(key, 'ascii')
key_encoded = (key_bytearray + bytearray({0x00}) * (16 - len(key_bytearray)))[:16] # Pad key with null bytes
key_encoded[15] = 0x00 # Null-terminate the key
is_signed = entry_type >= 0x11 and entry_type <= 0x18
entry_data: bytearray = bytearray({0xFF}) * nvs_const.entry_size # Empty entry
entry_data[0] = namespace_index
entry_data[1] = entry_type
entry_data[2] = span
entry_data[3] = chunk_index
# entry_data[4:8] # CRC32
entry_data[8:24] = key_encoded
entry_data[24:32] = data.to_bytes(8, byteorder='little', signed=is_signed)
raw_without_crc = entry_data[:4] + entry_data[8:32]
entry_data[4:8] = crc32(raw_without_crc, 0xFFFFFFFF).to_bytes(4, byteorder='little', signed=False)
return entry_data
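For orientation, a hedged example of composing one raw entry with these helpers (key and value are illustrative; it assumes 'u8' is one of the parser's type names and that 0xFF marks a non-chunked entry):

# Build a single 32-byte u8 entry under namespace index 1 (illustrative values).
u8_type = get_entry_type_bin('u8')
if u8_type is not None:
    raw = create_entry_data_bytearray(
        namespace_index=1,
        entry_type=u8_type,
        span=1,
        chunk_index=0xFF,  # assumption: non-chunked entries use 0xFF here
        key='answer',
        data=42,
    )
    assert len(raw) == nvs_const.entry_size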
@pytest.fixture
def generate_nvs() -> Callable:
def _execute_nvs_setup(nvs_setup_func: Callable, size: int = 0x4000, output: Optional[Path] = None) -> NVS_Partition:
nvs_file: Optional[Union[BytesIO, BufferedRandom]] = None
if output is None:
nvs_file = BytesIO()
else:
try:
nvs_file = open(output, 'wb+')
except OSError as e:
raise RuntimeError(f'Cannot open file {output}, error: {e}')
size_fixed = nvs_partition_gen.check_size(str(size))
nvs_obj = nvs_partition_gen.nvs_open(
result_obj=nvs_file,
input_size=size_fixed,
version=nvs_partition_gen.Page.VERSION2,
is_encrypt=False,
key=None
)
nvs_setup_func(nvs_obj)
nvs_partition_gen.nvs_close(nvs_obj)
nvs_file.seek(0)
nvs_parsed = NVS_Partition('test', bytearray(nvs_file.read()))
nvs_file.close()
return nvs_parsed
return _execute_nvs_setup
# Setup functions
def setup_ok_primitive(nvs_obj: NVS) -> None:
nvs_partition_gen.write_entry(nvs_obj, 'storage', 'namespace', '', '')
nvs_partition_gen.write_entry(nvs_obj, 'int32_test', 'data', 'i32', str(42))
nvs_partition_gen.write_entry(nvs_obj, 'uint32_test', 'data', 'u32', str(42))
nvs_partition_gen.write_entry(nvs_obj, 'int8_test', 'data', 'i8', str(100))
def setup_ok_variable_len(nvs_obj: NVS) -> None:
size_fixed = nvs_partition_gen.check_size(str('0x5000'))
nvs_obj.size = size_fixed
nvs_partition_gen.write_entry(nvs_obj, 'storage', 'namespace', '', '')
nvs_partition_gen.write_entry(nvs_obj, 'short_string_key', 'data', 'string', 'Hello world!')
nvs_partition_gen.write_entry(nvs_obj, 'blob_key', 'file', 'binary',
'../nvs_partition_generator/testdata/sample_blob.bin')
nvs_partition_gen.write_entry(nvs_obj, 'lorem_string_key', 'data', 'string', LOREM_STRING * 2)
nvs_partition_gen.write_entry(nvs_obj, 'uniq_string_key', 'data', 'string', 'I am unique!')
nvs_partition_gen.write_entry(nvs_obj, 'multi_blob_key', 'file', 'binary',
'../nvs_partition_generator/testdata/sample_multipage_blob.bin')
def setup_ok_mixed(nvs_obj: NVS) -> None:
size_fixed = nvs_partition_gen.check_size(str('0x6000'))
nvs_obj.size = size_fixed
prim_types = ['i8', 'u8', 'i16', 'u16', 'i32', 'u32']
nvs_partition_gen.write_entry(nvs_obj, 'storage', 'namespace', '', '')
for i in range(20):
nvs_partition_gen.write_entry(nvs_obj, f'test_{i}', 'data', prim_types[i % len(prim_types)], str(i))
nvs_partition_gen.write_entry(nvs_obj, 'blob_key', 'file', 'binary',
'../nvs_partition_generator/testdata/sample_singlepage_blob.bin')
nvs_partition_gen.write_entry(nvs_obj, 'etc', 'namespace', '', '')
for i in range(20):
nvs_partition_gen.write_entry(nvs_obj, f'test_{i}', 'data', prim_types[i % len(prim_types)], str(i))
nvs_partition_gen.write_entry(nvs_obj, 'lorem_string_key', 'data', 'string', LOREM_STRING * 2)
nvs_partition_gen.write_entry(nvs_obj, 'abcd', 'namespace', '', '')
for i in range(20):
nvs_partition_gen.write_entry(nvs_obj, f'test_{i}', 'data', prim_types[i % len(prim_types)], str(i))
nvs_partition_gen.write_entry(nvs_obj, 'uniq_string_key', 'data', 'string', 'I am unique!')
nvs_partition_gen.write_entry(nvs_obj, 'blob_key', 'file', 'binary',
'../nvs_partition_generator/testdata/sample_multipage_blob.bin')
def setup_bad_mixed_same_key_different_page(nvs_obj: NVS) -> None:
size_fixed = nvs_partition_gen.check_size(str('0x6000'))
nvs_obj.size = size_fixed
prim_types = ['i8', 'u8', 'i16', 'u16', 'i32', 'u32']
nvs_partition_gen.write_entry(nvs_obj, 'storage', 'namespace', '', '')
for i in range(20):
nvs_partition_gen.write_entry(nvs_obj, f'test_{i}', 'data', prim_types[i % len(prim_types)], str(i))
nvs_partition_gen.write_entry(nvs_obj, 'blob_key', 'file', 'binary',
'../nvs_partition_generator/testdata/sample_singlepage_blob.bin')
nvs_partition_gen.write_entry(nvs_obj, 'etc', 'namespace', '', '')
for i in range(20):
nvs_partition_gen.write_entry(nvs_obj, f'test_{i}', 'data', prim_types[i % len(prim_types)], str(i))
nvs_partition_gen.write_entry(nvs_obj, 'lorem_string_key', 'data', 'string', LOREM_STRING * 2)
nvs_partition_gen.write_entry(nvs_obj, 'uniq_string_key', 'data', 'string', 'I am unique!')
nvs_partition_gen.write_entry(nvs_obj, 'blob_key', 'file', 'binary',
'../nvs_partition_generator/testdata/sample_multipage_blob.bin')
# Should be on a different page already - start creating duplicates
for i in range(6):
data_type = prim_types[i % len(prim_types)]
nvs_partition_gen.write_entry(nvs_obj, f'test_{i}', 'data', data_type, str(i)) # Conflicting keys under "abcd" namespace - 6 duplicates
nvs_partition_gen.write_entry(nvs_obj, 'lorem_string_key', 'data', 'string', 'abc') # Conflicting key for string - 7th duplicate
nvs_partition_gen.write_entry(nvs_obj, 'storage', 'namespace', '', '') # Conflicting namespace - 8th duplicate
nvs_partition_gen.write_entry(nvs_obj, 'storage2', 'namespace', '', '') # New namespace, ignored
nvs_partition_gen.write_entry(nvs_obj, 'lorem_string_key', 'data', 'string', 'abc') # Should be ignored as is under different "storage2" namespace
nvs_partition_gen.write_entry(nvs_obj, 'lorem_string', 'data', 'string', 'abc') # 3 conflicting keys under "storage2" namespace - 9th duplicate
nvs_partition_gen.write_entry(nvs_obj, 'lorem_string', 'data', 'string', 'def')
nvs_partition_gen.write_entry(nvs_obj, 'lorem_string', 'data', 'string', '123')
def setup_bad_same_key_primitive(nvs_obj: NVS) -> None:
nvs_partition_gen.write_entry(nvs_obj, 'storage', 'namespace', '', '')
nvs_partition_gen.write_entry(nvs_obj, 'unique_key', 'data', 'i16', str(1234))
nvs_partition_gen.write_entry(nvs_obj, 'same_key', 'data', 'i32', str(42))
nvs_partition_gen.write_entry(nvs_obj, 'same_key', 'data', 'u32', str(24))
nvs_partition_gen.write_entry(nvs_obj, 'same_key', 'data', 'i8', str(-5))
nvs_partition_gen.write_entry(nvs_obj, 'another_same_key', 'data', 'u16', str(321))
nvs_partition_gen.write_entry(nvs_obj, 'another_same_key', 'data', 'u16', str(456))
def setup_bad_same_key_variable_len(nvs_obj: NVS) -> None:
nvs_partition_gen.write_entry(nvs_obj, 'storage', 'namespace', '', '')
nvs_partition_gen.write_entry(nvs_obj, 'same_string_key', 'data', 'string', 'Hello')
nvs_partition_gen.write_entry(nvs_obj, 'same_string_key', 'data', 'string', 'world!')
nvs_partition_gen.write_entry(nvs_obj, 'unique_string_key', 'data', 'string', 'I am unique!')
def setup_bad_same_key_blob_index(nvs_obj: NVS) -> None:
size_fixed = nvs_partition_gen.check_size(str('0x6000'))
nvs_obj.size = size_fixed
nvs_partition_gen.write_entry(nvs_obj, 'storage', 'namespace', '', '')
nvs_partition_gen.write_entry(nvs_obj, 'blob_key', 'file', 'binary',
'../nvs_partition_generator/testdata/sample_multipage_blob.bin')
nvs_partition_gen.write_entry(nvs_obj, 'blob_key_2', 'file', 'binary',
'../nvs_partition_generator/testdata/sample_multipage_blob.bin')
nvs_partition_gen.write_entry(nvs_obj, 'blob_key', 'file', 'binary',
'../nvs_partition_generator/testdata/sample_multipage_blob.bin') # Duplicate key
# Helper functions
def prepare_duplicate_list(nvs: NVS_Partition) -> Dict[str, List[NVS_Entry]]:
seen_written_entires_all: Dict[str, List[NVS_Entry]] = {}
for page in nvs.pages:
# page: NVS_Page
for entry in page.entries:
# entry: NVS_Entry
# Duplicate entry check (1) - same key, different index - find duplicates
seen_written_entires_all = nvs_check.identify_entry_duplicates(entry, seen_written_entires_all)
# Duplicate entry check (2) - same key, different index
duplicates: Dict[str, List[NVS_Entry]] = nvs_check.filter_entry_duplicates(seen_written_entires_all)
return duplicates
# Tests
@pytest.mark.parametrize('setup_func', [setup_ok_primitive, setup_ok_variable_len, setup_ok_mixed])
def test_check_partition_size(generate_nvs: Callable, setup_func: Callable) -> None:
nvs = generate_nvs(setup_func)
assert nvs_check.check_partition_size(nvs, logger)
@pytest.mark.parametrize('setup_func', [setup_ok_primitive, setup_ok_variable_len, setup_ok_mixed])
def test_check_empty_page_present(generate_nvs: Callable, setup_func: Callable) -> None:
nvs = generate_nvs(setup_func)
assert nvs_check.check_empty_page_present(nvs, logger)
@pytest.mark.parametrize('setup_func', [setup_ok_primitive, setup_ok_variable_len, setup_ok_mixed])
def test_check_empty_page_content__check_page_crc(generate_nvs: Callable, setup_func: Callable) -> None:
nvs = generate_nvs(setup_func)
for page in nvs.pages:
if page.header['status'] == 'Empty':
assert page.is_empty
assert nvs_check.check_empty_page_content(page, logger)
else:
assert not page.is_empty
assert nvs_check.check_page_crc(page, logger)
@pytest.mark.parametrize('setup_func', [setup_ok_primitive, setup_ok_variable_len, setup_ok_mixed])
def test_check_duplicates_ok(generate_nvs: Callable, setup_func: Callable) -> None:
nvs = generate_nvs(setup_func)
duplicates = prepare_duplicate_list(nvs)
assert len(duplicates) == 0 # No duplicates in any page
@pytest.mark.parametrize('setup_func', [setup_bad_same_key_primitive])
def test_check_duplicates_bad_same_key_primitive_type(generate_nvs: Callable, setup_func: Callable) -> None:
nvs = generate_nvs(setup_func)
duplicates = prepare_duplicate_list(nvs)
assert len(duplicates) == 2 # 2 different lists of duplicate keys
assert len(list(duplicates.values())[0]) == 3 # 3 entries with the same_key
assert len(list(duplicates.values())[1]) == 2 # 2 entries with the another_same_key
nvs_check.integrity_check(nvs, logger)
@pytest.mark.parametrize('setup_func', [setup_bad_same_key_variable_len])
def test_check_duplicates_bad_same_key_variable_len_type(generate_nvs: Callable, setup_func: Callable) -> None:
nvs = generate_nvs(setup_func)
duplicates = prepare_duplicate_list(nvs)
assert len(duplicates) == 1 # Only one duplicate key list
assert len(list(duplicates.values())[0]) == 2 # 2 entries with the same_string_key
nvs_check.integrity_check(nvs, logger)
@pytest.mark.parametrize('setup_func', [setup_bad_mixed_same_key_different_page])
def test_check_duplicates_bad_same_key_different_pages(generate_nvs: Callable, setup_func: Callable) -> None:
nvs = generate_nvs(setup_func)
duplicates = prepare_duplicate_list(nvs)
assert len(duplicates) == 9 # 9 duplicate keys in total (8 pairs of 2 duplicates + 1 triplet)
for i, value in enumerate(list(duplicates.values())):
if i < 8:
assert len(value) == 2 # i in range 0-7 -- pairs of 2 entries with the same key
else:
assert len(value) == 3 # i == 8 -- 3 entries with the lorem_string key
nvs_check.integrity_check(nvs, logger)
@pytest.mark.parametrize('setup_func', [setup_bad_same_key_blob_index])
def test_check_duplicates_bad_same_key_blob_index(generate_nvs: Callable, setup_func: Callable) -> None:
nvs = generate_nvs(setup_func)
duplicates = prepare_duplicate_list(nvs)
assert len(duplicates) == 1 # Only one duplicate key list - blob_index and blob_data share the same key (which is OK),
# however there are 2 duplicates of each blob_index and blob_data
assert len(list(duplicates.values())[0]) == 6 # 6 entries with the blob_key (2x blob_index, 4x blob_data)
nvs_check.integrity_check(nvs, logger)
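New failure modes slot into the same pattern: write a setup function, then parametrize a test with it. A hedged sketch (names and counts are illustrative, mirroring the duplicate-namespace case exercised above):

# Sketch: a dedicated duplicate-namespace scenario (illustrative, not in this commit).
def setup_bad_duplicate_namespace(nvs_obj: NVS) -> None:
    nvs_partition_gen.write_entry(nvs_obj, 'storage', 'namespace', '', '')
    nvs_partition_gen.write_entry(nvs_obj, 'storage', 'namespace', '', '')  # same namespace key twice

@pytest.mark.parametrize('setup_func', [setup_bad_duplicate_namespace])
def test_check_duplicates_bad_namespace(generate_nvs: Callable, setup_func: Callable) -> None:
    nvs = generate_nvs(setup_func)
    duplicates = prepare_duplicate_list(nvs)
    assert len(duplicates) == 1  # the repeated 'storage' namespace should register as a duplicate
    nvs_check.integrity_check(nvs, logger)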