nvs: NVS tool refinement

add: JSON output
add: integrity check - correct number of free pages check
change: split to separate files
This commit is contained in:
Adam Múdry 2022-10-18 16:38:37 +02:00
parent 11ec9d392d
commit 87bdcc89d0
6 changed files with 1082 additions and 783 deletions

View File

@ -1,780 +0,0 @@
#!/usr/bin/env python3
# SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
import argparse
import sys
import traceback
from typing import Any, Dict, List
from zlib import crc32
class Logger:
ansi = {
'red': '\033[31m',
'green': '\033[32m',
'yellow': '\033[33m',
'blue': '\033[34m',
'cyan': '\033[36m',
'bold': '\033[1m',
'clear': '\033[0m'
}
def __init__(self, *, color:str='auto'):
self.color = color == 'always' or (color == 'auto' and sys.stdout.isatty())
def set_color(self, color:str) -> None:
self.color = color == 'always' or (color == 'auto' and sys.stdout.isatty())
def info(self, *args, **kwargs) -> None: # type: ignore
kwargs['file'] = kwargs.get('file', sys.stdout) # Set default output to be stdout, but can be overwritten
print(*args, **kwargs)
def error(self, *args, **kwargs) -> None: # type: ignore
kwargs['file'] = kwargs.get('file', sys.stderr) # Set default output to be stderr, but can be overwritten
print(*args, **kwargs)
def red(self, text:str) -> str:
if self.color:
return Logger.ansi['red'] + text + Logger.ansi['clear']
return text
def green(self, text:str) -> str:
if self.color:
return Logger.ansi['green'] + text + Logger.ansi['clear']
return text
def yellow(self, text:str) -> str:
if self.color:
return Logger.ansi['yellow'] + text + Logger.ansi['clear']
return text
def blue(self, text:str) -> str:
if self.color:
return Logger.ansi['blue'] + text + Logger.ansi['clear']
return text
def cyan(self, text:str) -> str:
if self.color:
return Logger.ansi['cyan'] + text + Logger.ansi['clear']
return text
def bold(self, text:str) -> str:
if self.color:
return Logger.ansi['bold'] + text + Logger.ansi['clear']
return text
# Constants
class Constants:
class ConstantError(AttributeError):
pass
def __init__(self) -> None:
self.page_size = 4096
self.entry_size = 32
self.item_type = {
0x01: 'uint8_t',
0x11: 'int8_t',
0x02: 'uint16_t',
0x12: 'int16_t',
0x04: 'uint32_t',
0x14: 'int32_t',
0x08: 'uint64_t',
0x18: 'int64_t',
0x21: 'string',
0x41: 'blob',
0x42: 'blob_data',
0x48: 'blob_index',
}
self.page_status = {
0xffffffff: 'Empty',
0xfffffffe: 'Active',
0xfffffffc: 'Full',
0xfffffff8: 'Freeing',
}
self.entry_status = {
0b11: 'Empty',
0b10: 'Written',
0b00: 'Erased',
}
def __setattr__(self, key:str, val:Any) -> None:
if self.__dict__.get(key, None) is None:
self.__dict__[key] = val
else:
raise Constants.ConstantError('Cannot change a constant!')
const = Constants()
log = Logger()
class NotAlignedError(ValueError):
pass
class NVS_Partition:
def __init__(self, name:str, raw_data:bytearray):
if len(raw_data) % const.page_size != 0:
raise NotAlignedError(f'Given partition data is not aligned to page size ({len(raw_data)} % {const.page_size} = {len(raw_data)%const.page_size})')
# Divide partition into pages
self.name = name
self.pages = []
for i in range(0, len(raw_data), const.page_size):
self.pages.append(NVS_Page(raw_data[i:i + const.page_size], i))
class NVS_Page:
def __init__(self, page_data:bytearray, address:int):
if len(page_data) != const.page_size:
raise NotAlignedError(f'Size of given page does not match page size ({len(page_data)} != {const.page_size})')
# Initialize class
self.is_empty = page_data[0:const.entry_size] == bytearray({0xff}) * const.entry_size
self.start_address = address
self.raw_header = page_data[0:const.entry_size]
self.raw_entry_state_bitmap = page_data[const.entry_size:2 * const.entry_size]
self.entries = []
# Load header
self.header:Dict[str, Any] = {
'status': const.page_status.get(int.from_bytes(page_data[0:4], byteorder='little'), 'Invalid'),
'page_index': int.from_bytes(page_data[4:8], byteorder='little'),
'version': 256 - page_data[8],
'crc':{
'original': int.from_bytes(page_data[28:32], byteorder='little'),
'computed': crc32(page_data[4:28], 0xFFFFFFFF)
}
}
# Load entry state bitmap
entry_states = []
for c in self.raw_entry_state_bitmap:
for index in range(0, 8, 2):
entry_states.append(const.entry_status.get((c >> index) & 3, 'Invalid'))
entry_states = entry_states[:-2]
# Load entries
i = 2
while i < int(const.page_size / const.entry_size): # Loop through every entry
span = page_data[(i * const.entry_size) + 2]
if span in [0xff, 0]: # 'Default' span length to prevent span overflow
span = 1
# Load an entry
entry = NVS_Entry(i - 2, page_data[i * const.entry_size:(i + 1) * const.entry_size], entry_states[i - 2])
self.entries.append(entry)
# Load all children entries
if span != 1:
for span_idx in range(1, span):
page_addr = i + span_idx
entry_idx = page_addr - 2
if page_addr * const.entry_size >= const.page_size:
break
child_entry = NVS_Entry(entry_idx, page_data[page_addr * const.entry_size:(page_addr + 1) * const.entry_size], entry_states[entry_idx])
entry.child_assign(child_entry)
entry.compute_crc()
i += span
class NVS_Entry:
def __init__(self, index:int, entry_data:bytearray, entry_state:str):
if len(entry_data) != const.entry_size:
raise NotAlignedError(f'Given entry is not aligned to entry size ({len(entry_data)} % {const.entry_size} = {len(entry_data)%const.entry_size})')
def item_convert(i_type:int, data:bytearray) -> Dict:
byte_size_mask = 0x0f
number_sign_mask = 0xf0
fixed_entry_length_threshold = 0x20 # Fixed length entry type number is always smaller than this
if i_type in const.item_type:
# Deal with non variable length entries
if i_type < fixed_entry_length_threshold:
size = i_type & byte_size_mask
num = int.from_bytes(data[:size], byteorder='little', signed=bool(i_type & number_sign_mask))
return {'value':num}
# Deal with variable length entries
if const.item_type[i_type] in ['string', 'blob_data', 'blob']:
size = int.from_bytes(data[:2], byteorder='little')
crc = int.from_bytes(data[4:8], byteorder='little')
return {'value': [size, crc],
'size': size,
'crc': crc}
if const.item_type[i_type] == 'blob_index':
size = int.from_bytes(data[:4], byteorder='little')
chunk_count = data[4]
chunk_start = data[5]
return {'value': [size, chunk_count, chunk_start],
'size': size,
'chunk_count': chunk_count,
'chunk_start': chunk_start}
return {'value': log.red('Cannot parse')}
def key_decode(data:bytearray) -> str:
decoded = ''
for n in data:
char = chr(n)
if char.isprintable():
decoded += char
return decoded
self.raw = entry_data
self.state = entry_state
self.is_empty = self.raw == bytearray({0xff}) * const.entry_size
self.index = index
namespace = self.raw[0]
entry_type = self.raw[1]
span = self.raw[2]
chunk_index = self.raw[3]
crc = self.raw[4:8]
key = self.raw[8:24]
data = self.raw[24:32]
raw_without_crc = self.raw[:4] + self.raw[8:32]
self.metadata: Dict[str, Any] = {
'namespace': namespace,
'type': const.item_type.get(entry_type, f'0x{entry_type:02x}'),
'span': span,
'chunk_index': chunk_index,
'crc':{
'original': int.from_bytes(crc, byteorder='little'),
'computed': crc32(raw_without_crc, 0xFFFFFFFF),
'data_original': int.from_bytes(data[-4:], byteorder='little'),
'data_computed': 0
}
}
self.children: List['NVS_Entry'] = []
self.key = key_decode(key)
self.data = item_convert(entry_type, data)
def dump_raw(self) -> str:
hex_bytes = ''
decoded = ''
for i, c in enumerate(self.raw):
middle_index = int(len(self.raw) / 2)
if i == middle_index: # Add a space in the middle
hex_bytes += ' '
decoded += ' '
hex_bytes += f'{c:02x} '
decoded += chr(c) if chr(c).isprintable() else '.'
return hex_bytes + ' ' + decoded
def child_assign(self, entry:'NVS_Entry') -> None:
if not isinstance(entry, type(self)):
raise ValueError('You can assign only NVS_Entry')
self.children.append(entry)
def compute_crc(self) -> None:
if self.metadata['span'] == 1:
return
# Merge entries into one buffer
children_data = bytearray()
for entry in self.children:
children_data += entry.raw
children_data = children_data[:self.data['size']] # Discard padding
self.metadata['crc']['data_computed'] = crc32(children_data, 0xFFFFFFFF)
def storage_stats(nvs_partition:NVS_Partition) -> None:
global_stats = {
'written_entries': 0,
'free_entries': 0,
'erased_entries': 0,
'invalid_entries': 0
}
for page in nvs_partition.pages:
written_e = 0
free_e = 0
erased_e = 0
invalid_e = 0
for entry in page.entries:
if entry.state == 'Written':
written_e += 1
elif entry.state == 'Empty':
free_e += 1
elif entry.state == 'Erased':
erased_e += 1
else:
invalid_e += 1
log.info(log.bold(f'Page {page.header["status"]}'))
log.info(' Found entries:')
log.info(f' Written: {written_e: 5d}')
log.info(f' Erased: {erased_e: 5d}')
log.info(f' Empty: {free_e: 5d}')
log.info(f' Invalid: {invalid_e: 5d}')
log.info(f' Total: {written_e + free_e + erased_e + invalid_e: 5d}')
log.info()
global_stats['written_entries'] += written_e
global_stats['erased_entries'] += erased_e
global_stats['free_entries'] += free_e
global_stats['invalid_entries'] += invalid_e
log.info(log.bold('Global'))
log.info(' Config:')
log.info(f' Page size: {const.page_size: 5d}')
log.info(f' Entry size: {const.entry_size: 5d}')
log.info(f' Total pages: {len(nvs_partition.pages): 5d}')
log.info(' Entries:')
log.info(f' Written: {global_stats["written_entries"]: 5d}')
log.info(f' Erased: {global_stats["erased_entries"]: 5d}')
log.info(f' Empty: {global_stats["free_entries"]: 5d}')
log.info(f' Invalid: {global_stats["invalid_entries"]: 5d}')
log.info(f' Total: {sum([global_stats[key] for key in global_stats]): 5d}')
log.info()
def dump_everything(nvs_partition:NVS_Partition, written_only:bool=False) -> None:
for page in nvs_partition.pages:
# Print page header
if page.is_empty:
log.info(log.bold(f'Page Empty, Page address: 0x{page.start_address:x}'))
else:
if page.header['crc']['original'] == page.header['crc']['computed']: # Color CRC32
crc = log.green(f'{page.header["crc"]["original"]: >8x}')
else:
crc = log.red(f'{page.header["crc"]["original"]: >8x}')
log.info(log.bold(f'Page no. {page.header["page_index"]}'
+ f', Status: {page.header["status"]}'
+ f', Version: {page.header["version"]}'
+ f', CRC32: {crc}')
+ log.bold(f', Page address: 0x{page.start_address:x}'))
log.info(log.bold(' Entry state bitmap: '), end='')
for x in page.raw_entry_state_bitmap:
log.info(f'{x:02x} ', end='')
log.info()
# Dump entries
empty_entries = []
for entry in page.entries:
# Skip non-written entries if needed
if written_only and not entry.state == 'Written':
continue
# Compress all empty entries
if entry.state == 'Empty' and entry.is_empty: # Gather all subsequent empty entries
empty_entries.append(entry)
continue
else:
# Print the empty entries
if len(empty_entries) >= 3: # There is enough entries to compress
log.info(log.bold(f' {empty_entries[0].index:03d}.'), 'Empty')
log.info(log.bold(' ...'))
log.info(log.bold(f' {empty_entries[-1].index:03d}.'), 'Empty')
else: # No need for compression
for e in empty_entries:
log.info(log.bold(f' {e.index:03d}.'), 'Empty')
empty_entries.clear()
# Dump a single entry
status = entry.state
if status == 'Written':
status = log.green(f'{status: <7}')
elif status == 'Erased':
status = log.red(f'{status: <7}')
crc = ''
if entry.metadata['crc']['original'] == entry.metadata['crc']['computed']: # Color CRC32
crc = log.green(f'{entry.metadata["crc"]["original"]: >8x}')
else:
crc = log.red(f'{entry.metadata["crc"]["original"]: >8x}')
log.info(log.bold(f' {entry.index:03d}.')
+ ' ' + status
+ f', Namespace Index: {entry.metadata["namespace"]:03d}'
+ f', Type: {entry.metadata["type"]:<10}'
+ f', Span: {entry.metadata["span"]:03d}'
+ f', Chunk Index: {entry.metadata["chunk_index"]:03d}'
+ f', CRC32: {crc}'
+ f' | {entry.key} : ', end='')
if entry.metadata['type'] not in ['string', 'blob_data', 'blob_index', 'blob']: # Entry is non-variable length
log.info(entry.data['value'])
else:
if entry.metadata['type'] == 'blob_index':
log.info(f'Size={entry.data["size"]}'
+ f', ChunkCount={entry.data["chunk_count"]}'
+ f', ChunkStart={entry.data["chunk_start"]}')
else:
if entry.metadata['crc']['data_original'] == entry.metadata['crc']['data_computed']: # Color CRC32
crc = log.green(f'{entry.metadata["crc"]["data_original"]:x}')
else:
crc = log.red(f'{entry.metadata["crc"]["data_original"]:x}')
log.info(f'Size={entry.data["size"]}, CRC32={crc}')
# Dump all children entries
if entry.metadata['span'] != 1:
for i, data in enumerate(entry.children):
log.info(f'{"": >6}0x{(i*const.entry_size):03x} {data.dump_raw()}')
# Dump trailing empty entries
if len(empty_entries) >= 3:
log.info(log.bold(f' {empty_entries[0].index:03d}.'), 'Empty')
log.info(log.bold(' ...'))
log.info(log.bold(f' {empty_entries[-1].index:03d}.'), 'Empty')
else:
for e in empty_entries:
log.info(log.bold(f' {e.index:03d}.'), 'Empty')
empty_entries.clear()
log.info()
def dump_written_entries(nvs_partition:NVS_Partition) -> None:
dump_everything(nvs_partition, True)
def list_namespaces(nvs_partition:NVS_Partition) -> None:
# Gather namespaces
ns = {}
for page in nvs_partition.pages:
for entry in page.entries:
if entry.state == 'Written' and entry.metadata['namespace'] == 0:
ns[entry.data['value']] = entry.key
# Print found namespaces
log.info(log.bold(f'Index : Namespace'))
for ns_index in sorted(ns):
log.info(f' {ns_index:03d} :', log.cyan(ns[ns_index]))
def dump_key_value_pairs(nvs_partition:NVS_Partition) -> None:
# Get namespace list
ns = {}
for page in nvs_partition.pages:
for entry in page.entries:
if entry.state == 'Written' and entry.metadata['namespace'] == 0:
ns[entry.data['value']] = entry.key
# Print key-value pairs
for page in nvs_partition.pages:
# Print page header
if page.is_empty:
log.info(log.bold('Page Empty'))
else:
log.info(log.bold(f'Page no. {page.header["page_index"]}'
+ f', Status: {page.header["status"]}'))
# Print entries
for entry in page.entries:
if entry.state == 'Written' and entry.metadata['namespace'] != 0: # Ignore non-written entries
chunk_index = ''
data = ''
if entry.metadata['type'] not in ['string', 'blob_data', 'blob_index', 'blob']: # Non-variable length entry
data = entry.data['value']
elif entry.metadata['type'] == 'blob_index':
continue
else: # Variable length entries
tmp = b''
for e in entry.children: # Merge all children entries
tmp += bytes(e.raw)
tmp = tmp[:entry.data['size']] # Discard padding
if entry.metadata['type'] == 'blob_data':
if entry.metadata['chunk_index'] >= 128: # Get real chunk index
chunk_index = f'[{entry.metadata["chunk_index"] - 128}]'
else:
chunk_index = f'[{entry.metadata["chunk_index"]}]'
data = str(tmp)
if entry.metadata['namespace'] not in ns:
continue
else:
log.info(' '
+ log.cyan(ns[entry.metadata['namespace']])
+ ':'
+ log.yellow(entry.key)
+ f'{chunk_index} = {data}')
log.info()
def dump_written_blobs(nvs_partition:NVS_Partition) -> None:
blobs: Dict = {}
strings: List[NVS_Entry] = []
legacy_blobs: List[NVS_Entry] = []
ns = {}
empty_entry = NVS_Entry(-1, bytearray(32), 'Erased')
# Gather namespaces, blob indexes and legacy blobs
for page in nvs_partition.pages:
for entry in page.entries:
if entry.state == 'Written':
if entry.metadata['type'] == 'blob_index':
blobs[f'{entry.metadata["namespace"]:03d}{entry.key}'] = [entry] + [empty_entry] * entry.data['chunk_count']
elif entry.metadata['type'] == 'blob':
legacy_blobs.append(entry)
elif entry.metadata['type'] == 'string':
strings.append(entry)
elif entry.metadata['namespace'] == 0:
ns[entry.data['value']] = entry.key
# Dump blobs
for key in blobs:
for page in nvs_partition.pages:
for entry in page.entries:
# Gather all blob chunks
if entry.state == 'Written' \
and entry.metadata['type'] != 'blob_index' \
and entry.metadata['namespace'] == blobs[key][0].metadata['namespace'] \
and entry.key == blobs[key][0].key:
blobs[key][1 + entry.metadata['chunk_index'] - blobs[key][0].data['chunk_start']] = entry
blob_index = blobs[key][0]
blob_chunks = blobs[key][1:]
# Print blob info
log.info(log.cyan(ns.get(blob_index.metadata['namespace'], blob_index.metadata['namespace']))
+ ':'
+ log.yellow(blob_index.key)
+ ' - '
+ f'Type: Blob (Version 2), '
+ f'Size: {blob_index.data["size"]}')
# Print blob data
raw_entries = []
for kid in blob_chunks: # Gather all chunk entries
if kid is empty_entry:
raw_entries += [empty_entry]
else:
raw_entries += kid.children
for i, entry in enumerate(raw_entries):
if entry is empty_entry:
log.info(log.yellow(f' {"":->63} Missing data {"":-<64}'))
else:
log.info(f' 0x{(i * const.entry_size):05x} {entry.dump_raw()}')
log.info()
# Dump strings
for string in strings:
log.info(log.cyan(ns.get(string.metadata['namespace'], string.metadata['namespace']))
+ ':'
+ log.yellow(string.key)
+ ' - '
+ 'Type: String, '
+ f'Size: {string.data["size"]}')
for i, entry in enumerate(string.children):
log.info(f' 0x{(i * const.entry_size):05x} {entry.dump_raw()}')
log.info()
# Dump legacy blobs
for blob in legacy_blobs:
log.info(log.cyan(ns.get(blob.metadata['namespace'], blob.metadata['namespace']))
+ ':'
+ log.yellow(blob.key)
+ ' - '
+ 'Type: Blob (Version 1), '
+ f'Size: {blob.data["size"]}')
for i, entry in enumerate(blob.children):
log.info(f' 0x{(i * const.entry_size):05x} {entry.dump_raw()}')
log.info()
def integrity_check(nvs_partition:NVS_Partition) -> None:
used_namespaces: Dict[int, None] = {}
found_namespaces: Dict[int, str] = {}
blobs:Dict = {}
blob_chunks: List[NVS_Entry] = []
empty_entry = NVS_Entry(-1, bytearray(32), 'Erased')
# Partition size
if len(nvs_partition.pages) < 3:
log.info(log.yellow('Partition has to have at least 3 pages to function properly!'))
for page in nvs_partition.pages:
# Print page header
if page.header['status'] == 'Empty':
log.info(log.cyan('Page Empty'))
# Check if page is truly empty
if page.raw_entry_state_bitmap != bytearray({0xff}) * const.entry_size:
log.info(log.red('Page is reported as empty but entry state bitmap is not empty!'))
if any([not e.is_empty for e in page.entries]):
log.info(log.red('Page is reported as emtpy but there is written data!'))
else:
# Check page header CRC32
if page.header['crc']['original'] == page.header['crc']['computed']:
log.info(log.cyan(f'Page no. {page.header["page_index"]}'))
else:
log.info(log.cyan(f'Page no. {page.header["page_index"]}'),
f'Written CRC32:',
log.red(f'{page.header["crc"]["original"]:x}'),
f'Generated CRC32:',
log.green(f'{page.header["crc"]["computed"]:x}'))
# Check all entries
for entry in page.entries:
# Entry state check
if entry.is_empty:
if entry.state == 'Written':
log.info(log.red(f' Entry #{entry.index:03d} is reported as written but is empty!'))
continue
elif entry.state == 'Erased':
log.info(log.yellow(f' Entry #{entry.index:03d} is reported as erased but is empty!'))
if entry.state == 'Written':
# Entry CRC32 check
if entry.metadata['crc']['original'] != entry.metadata['crc']['computed']:
log.info(log.red(f' Entry #{entry.index:03d} {entry.key} has wrong CRC32!{"": <5}'),
f'Written:',
log.red(f'{entry.metadata["crc"]["original"]:x}'),
f'Generated:',
log.green(f'{entry.metadata["crc"]["computed"]:x}'))
# Entry children CRC32 check
if entry.metadata['span'] > 1 and entry.metadata['crc']['data_original'] != entry.metadata['crc']['data_computed']:
log.info(log.red(f' Entry #{entry.index:03d} {entry.key} data has wrong CRC32!'),
f'Written:',
log.red(f'{entry.metadata["crc"]["original"]:x}'),
f'Generated:',
log.green(f'{entry.metadata["crc"]["computed"]:x}'))
# Entry type check
if entry.metadata['type'] not in [const.item_type[key] for key in const.item_type]:
log.info(log.yellow(f' Type of entry #{entry.index:03d} {entry.key} is unrecognized!'),
f'Type: {entry.metadata["type"]}')
# Span check
if entry.index + entry.metadata['span'] - 1 >= int(const.page_size / const.entry_size) - 2:
log.info(log.red(f' Variable length entry #{entry.index:03d} {entry.key} is out of bounds!'))
# Spanned entry state checks
elif entry.metadata['span'] > 1:
parent_state = entry.state
for kid in entry.children:
if parent_state != kid.state:
log.info(log.yellow(' Inconsistent data state!'),
f'Entry #{entry.index:03d} {entry.key} state: {parent_state},',
f'Data entry #{kid.index:03d} {entry.key} state: {kid.state}')
# Gather blobs & namespaces
if entry.metadata['type'] == 'blob_index':
blobs[f'{entry.metadata["namespace"]:03d}{entry.key}'] = [entry] + [empty_entry] * entry.data['chunk_count']
elif entry.metadata['type'] == 'blob_data':
blob_chunks.append(entry)
if entry.metadata['namespace'] == 0:
found_namespaces[entry.data['value']] = entry.key
else:
used_namespaces[entry.metadata['namespace']] = None
log.info()
# Assemble blobs
for chunk in blob_chunks:
parent = blobs.get(f'{chunk.metadata["namespace"]:03d}{chunk.key}', [empty_entry])[0]
if parent is empty_entry:
log.info(log.red(f'Blob {chunk.key} chunk has no blob index!'),
f'Namespace index: {chunk.metadata["namespace"]:03d}',
f'[{found_namespaces.get(chunk.metadata["namespace"], "undefined")}],',
f'Chunk Index: {chunk.metadata["chunk_index"]:03d}')
else:
blob_key = f'{chunk.metadata["namespace"]:03d}{chunk.key}'
chunk_index = chunk.metadata['chunk_index'] - parent.data['chunk_start']
blobs[blob_key][chunk_index + 1] = chunk
# Check all blobs
for blob_key in blobs:
blob_index = blobs[blob_key][0]
blob_chunks = blobs[blob_key][1:]
blob_size = blob_index.data['size']
# Blob data check
for i, chunk in enumerate(blob_chunks):
if chunk is empty_entry:
log.info(log.red(f'Blob {blob_index.key} is missing a chunk!'),
f'Namespace index: {blob_index.metadata["namespace"]:03d}',
f'[{found_namespaces.get(blob_index.metadata["namespace"], "undefined")}],',
f'Chunk Index: {i:03d}')
else:
blob_size -= len(chunk.children) * const.entry_size
if blob_size > 0:
log.info(log.red(f'Blob {blob_index.key} is missing {blob_size} B of data!'),
f'Namespace index: {blob_index.metadata["namespace"]:03d}')
# Namespaces
for used_ns in used_namespaces:
key = found_namespaces.pop(used_ns, '')
if key == '':
log.info(log.red('Undefined namespace index!'),
f'Namespace index: {used_ns:03d}',
f'[undefined]')
for unused_ns in found_namespaces:
log.info(log.yellow('Found unused namespace.'),
f'Namespace index: {unused_ns:03d}',
f'[{found_namespaces[unused_ns]}]')
def program_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description='Parse NVS partition')
parser.add_argument('file', help='Path to dumped NVS partition')
parser.add_argument('-i', '--integrity-check', action='store_true',
help='Check partition for potential errors')
tmp = {'all': 'Print everything',
'written': 'Print only currently written entries',
'minimal': 'Print only namespace:key=value pairs',
'namespaces': 'List all written namespaces',
'blobs': 'Print all blobs and strings',
'storage_info': 'Print storage related information (free/used entries, etc)',
'none': 'Do not print anything (if you only want to do integrity check)'}
parser.add_argument('-d', '--dump', choices=tmp, default='all', metavar='type',
help=f"type={{{str(list(tmp.keys()))[1:-1]}}} : {' ; '.join([f'{opt} - {tmp[opt]}' for opt in tmp])}")
parser.add_argument('--color', choices=['never', 'auto', 'always'], default='auto', help='Enable color (ANSI)')
return parser.parse_args()
def main() -> None:
args = program_args()
if const.entry_size != 32:
raise ValueError(f'Entry size is not 32 B! This is currently non negotiable.')
log.set_color(args.color)
try:
with open(args.file, 'rb') as f:
partition = f.read()
except IndexError:
log.error('No file given')
raise
except FileNotFoundError:
log.error('Bad filename')
raise
nvs = NVS_Partition(args.file.split('/')[-1], bytearray(partition))
def noop(_:NVS_Partition) -> None:
pass
def not_implemented(_:NVS_Partition) -> None:
raise RuntimeError(f'{args.dump} is not implemented')
cmds = {
'all': dump_everything,
'written': dump_written_entries,
'minimal': dump_key_value_pairs,
'namespaces': list_namespaces,
'blobs': dump_written_blobs,
'storage_info': storage_stats,
'none': noop,
}
cmds.get(args.dump, not_implemented)(nvs) # type: ignore
if args.integrity_check:
log.info()
integrity_check(nvs)
if __name__ == '__main__':
try:
main()
except ValueError:
traceback.print_exc(file=sys.stderr)
sys.exit(1)
except Constants.ConstantError:
traceback.print_exc(file=sys.stderr)
sys.exit(1)

View File

@ -6,7 +6,9 @@ NVS Partition Parser Utility
Introduction
------------
The utility :component_file:`nvs_flash/nvs_partition_parser/nvs_read.py` loads and parses an NVS storage partition for easier debugging and data extraction. The utility also features integrity check which scans the partition for potential errors.
The utility :component_file:`nvs_flash/nvs_partition_parser/nvs_tool.py` loads and parses an NVS storage partition for easier debugging and data extraction.
The utility also features integrity check which scans the partition for potential errors.
Data blobs are encoded in `base64` format.
Encrypted Partitions
--------------------
@ -16,7 +18,11 @@ This utility does not support decryption. To decrypt the NVS partition, please u
Usage
-----
The utility provides six different output styles with `-d` or `--dump` option:
There are 2 output format styles available with `-f` or `--format` option:
- `json` - All of the output is printed as a JSON.
- `text` - The output is printed as a human-readable text with different selectable output styles mentioned below.
For the `text` output format the utility provides six different output styles with `-d` or `--dump` option:
- `all` (default) - Prints all entries with metadata.
- `written` - Prints only written entries with metadata.
- `minimal` - Prints written `namespace:key = value` pairs.
@ -26,4 +32,4 @@ The utility provides six different output styles with `-d` or `--dump` option:
.. note:: There is also a `none` option which will not print anything. This can be used with the integrity check option if the NVS partition contents are irrelevant.
The utility also provides an integrity check feature via the `-i` or `--integrity-check` option. This feature scans through the entire partition and prints potential errors. It can be used with the `-d none` option which will print only the potential errors.
The utility also provides an integrity check feature via the `-i` or `--integrity-check` option (available only with the `text` format as it would invalidate the `json` output). This feature scans through the entire partition and prints potential errors. It can be used with the `-d none` option which will print only the potential errors.

View File

@ -0,0 +1,665 @@
#!/usr/bin/env python3
# SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
import binascii
import json
import sys
from typing import Any, Dict, List, Union
from nvs_parser import NVS_Entry, NVS_Partition, nvs_const
class NVS_Logger:
ansi = {
'red': '\033[31m',
'green': '\033[32m',
'yellow': '\033[33m',
'blue': '\033[34m',
'cyan': '\033[36m',
'bold': '\033[1m',
'clear': '\033[0m',
}
def __init__(self, *, color: str = 'auto', out_format: str = 'text'):
self.color = color == 'always' or (color == 'auto' and sys.stdout.isatty())
self.output_format = out_format
def set_color(self, color: str) -> None:
self.color = color == 'always' or (color == 'auto' and sys.stdout.isatty())
def set_format(self, out_format: str) -> None:
self.output_format = out_format
def info(self, *args, **kwargs) -> None: # type: ignore
kwargs['file'] = kwargs.get(
'file', sys.stdout
) # Set default output to be stdout, but can be overwritten
print(*args, **kwargs)
def error(self, *args, **kwargs) -> None: # type: ignore
kwargs['file'] = kwargs.get(
'file', sys.stderr
) # Set default output to be stderr, but can be overwritten
print(*args, **kwargs)
def red(self, text: str) -> str:
if self.color:
return NVS_Logger.ansi['red'] + text + NVS_Logger.ansi['clear']
return text
def green(self, text: str) -> str:
if self.color:
return NVS_Logger.ansi['green'] + text + NVS_Logger.ansi['clear']
return text
def yellow(self, text: str) -> str:
if self.color:
return NVS_Logger.ansi['yellow'] + text + NVS_Logger.ansi['clear']
return text
def blue(self, text: str) -> str:
if self.color:
return NVS_Logger.ansi['blue'] + text + NVS_Logger.ansi['clear']
return text
def cyan(self, text: str) -> str:
if self.color:
return NVS_Logger.ansi['cyan'] + text + NVS_Logger.ansi['clear']
return text
def bold(self, text: str) -> str:
if self.color:
return NVS_Logger.ansi['bold'] + text + NVS_Logger.ansi['clear']
return text
nvs_log = NVS_Logger()
def storage_stats(nvs_partition: NVS_Partition) -> None:
global_stats = {
'written_entries': 0,
'free_entries': 0,
'erased_entries': 0,
'invalid_entries': 0,
}
for page in nvs_partition.pages:
written_e = 0
free_e = 0
erased_e = 0
invalid_e = 0
for entry in page.entries:
if entry.state == 'Written':
written_e += 1
elif entry.state == 'Empty':
free_e += 1
elif entry.state == 'Erased':
erased_e += 1
else:
invalid_e += 1
nvs_log.info(nvs_log.bold(f'Page {page.header["status"]}'))
nvs_log.info(' Found entries:')
nvs_log.info(f' Written: {written_e: 5d}')
nvs_log.info(f' Erased: {erased_e: 5d}')
nvs_log.info(f' Empty: {free_e: 5d}')
nvs_log.info(f' Invalid: {invalid_e: 5d}')
nvs_log.info(f' Total: {written_e + free_e + erased_e + invalid_e: 5d}')
nvs_log.info()
global_stats['written_entries'] += written_e
global_stats['erased_entries'] += erased_e
global_stats['free_entries'] += free_e
global_stats['invalid_entries'] += invalid_e
nvs_log.info(nvs_log.bold('Global'))
nvs_log.info(' Config:')
nvs_log.info(f' Page size: {nvs_const.page_size: 5d}')
nvs_log.info(f' Entry size: {nvs_const.entry_size: 5d}')
nvs_log.info(f' Total pages: {len(nvs_partition.pages): 5d}')
nvs_log.info(' Entries:')
nvs_log.info(f' Written: {global_stats["written_entries"]: 5d}')
nvs_log.info(f' Erased: {global_stats["erased_entries"]: 5d}')
nvs_log.info(f' Empty: {global_stats["free_entries"]: 5d}')
nvs_log.info(f' Invalid: {global_stats["invalid_entries"]: 5d}')
nvs_log.info(f' Total: {sum([global_stats[key] for key in global_stats]): 5d}')
nvs_log.info()
def dump_everything(nvs_partition: NVS_Partition, written_only: bool = False) -> None:
for page in nvs_partition.pages:
# Print page header
if page.is_empty:
nvs_log.info(
nvs_log.bold(f'Page Empty, Page address: 0x{page.start_address:x}')
)
else:
if (
page.header['crc']['original'] == page.header['crc']['computed']
): # Color CRC32
crc = nvs_log.green(f'{page.header["crc"]["original"]: >8x}')
else:
crc = nvs_log.red(f'{page.header["crc"]["original"]: >8x}')
nvs_log.info(
nvs_log.bold(
f'Page no. {page.header["page_index"]}'
+ f', Status: {page.header["status"]}'
+ f', Version: {page.header["version"]}'
+ f', CRC32: {crc}'
)
+ nvs_log.bold(f', Page address: 0x{page.start_address:x}')
)
nvs_log.info(nvs_log.bold(' Entry state bitmap: '), end='')
for x in page.raw_entry_state_bitmap:
nvs_log.info(f'{x:02x} ', end='')
nvs_log.info()
# Dump entries
empty_entries = []
for entry in page.entries:
# Skip non-written entries if needed
if written_only and not entry.state == 'Written':
continue
# Compress all empty entries
if (
entry.state == 'Empty' and entry.is_empty
): # Gather all subsequent empty entries
empty_entries.append(entry)
continue
else:
# Print the empty entries
if len(empty_entries) >= 3: # There is enough entries to compress
nvs_log.info(
nvs_log.bold(f' {empty_entries[0].index:03d}.'), 'Empty'
)
nvs_log.info(nvs_log.bold(' ...'))
nvs_log.info(
nvs_log.bold(f' {empty_entries[-1].index:03d}.'), 'Empty'
)
else: # No need for compression
for e in empty_entries:
nvs_log.info(nvs_log.bold(f' {e.index:03d}.'), 'Empty')
empty_entries.clear()
# Dump a single entry
status = entry.state
if status == 'Written':
status = nvs_log.green(f'{status: <7}')
elif status == 'Erased':
status = nvs_log.red(f'{status: <7}')
crc = ''
if (
entry.metadata['crc']['original'] == entry.metadata['crc']['computed']
): # Color CRC32
crc = nvs_log.green(f'{entry.metadata["crc"]["original"]: >8x}')
else:
crc = nvs_log.red(f'{entry.metadata["crc"]["original"]: >8x}')
nvs_log.info(
nvs_log.bold(f' {entry.index:03d}.')
+ ' '
+ status
+ f', Namespace Index: {entry.metadata["namespace"]:03d}'
+ f', Type: {entry.metadata["type"]:<10}'
+ f', Span: {entry.metadata["span"]:03d}'
+ f', Chunk Index: {entry.metadata["chunk_index"]:03d}'
+ f', CRC32: {crc}'
+ f' | {entry.key} : ',
end='',
)
if entry.metadata['type'] not in [
'string',
'blob_data',
'blob_index',
'blob',
]: # Entry is non-variable length
nvs_log.info(entry.data['value'])
else:
if entry.metadata['type'] == 'blob_index':
nvs_log.info(
f'Size={entry.data["size"]}'
+ f', ChunkCount={entry.data["chunk_count"]}'
+ f', ChunkStart={entry.data["chunk_start"]}'
)
else:
if (
entry.metadata['crc']['data_original']
== entry.metadata['crc']['data_computed']
): # Color CRC32
crc = nvs_log.green(
f'{entry.metadata["crc"]["data_original"]:x}'
)
else:
crc = nvs_log.red(f'{entry.metadata["crc"]["data_original"]:x}')
nvs_log.info(f'Size={entry.data["size"]}, CRC32={crc}')
# Dump all children entries
if entry.metadata['span'] != 1:
for i, data in enumerate(entry.children):
nvs_log.info(
f'{"": >6}0x{(i*nvs_const.entry_size):03x} {data.dump_raw()}'
)
# Dump trailing empty entries
if len(empty_entries) >= 3:
nvs_log.info(nvs_log.bold(f' {empty_entries[0].index:03d}.'), 'Empty')
nvs_log.info(nvs_log.bold(' ...'))
nvs_log.info(nvs_log.bold(f' {empty_entries[-1].index:03d}.'), 'Empty')
else:
for e in empty_entries:
nvs_log.info(nvs_log.bold(f' {e.index:03d}.'), 'Empty')
empty_entries.clear()
nvs_log.info()
def dump_written_entries(nvs_partition: NVS_Partition) -> None:
dump_everything(nvs_partition, True)
def list_namespaces(nvs_partition: NVS_Partition) -> None:
# Gather namespaces
ns = {}
for page in nvs_partition.pages:
for entry in page.entries:
if entry.state == 'Written' and entry.metadata['namespace'] == 0:
ns[entry.data['value']] = entry.key
# Print found namespaces
nvs_log.info(nvs_log.bold(f'Index : Namespace'))
for ns_index in sorted(ns):
nvs_log.info(f' {ns_index:03d} :', nvs_log.cyan(ns[ns_index]))
def dump_key_value_pairs(nvs_partition: NVS_Partition) -> None:
# Get namespace list
ns = {}
for page in nvs_partition.pages:
for entry in page.entries:
if entry.state == 'Written' and entry.metadata['namespace'] == 0:
ns[entry.data['value']] = entry.key
# Print key-value pairs
for page in nvs_partition.pages:
# Print page header
if page.is_empty:
nvs_log.info(nvs_log.bold('Page Empty'))
else:
nvs_log.info(
nvs_log.bold(
f'Page no. {page.header["page_index"]}'
+ f', Status: {page.header["status"]}'
)
)
# Print entries
for entry in page.entries:
if (
entry.state == 'Written' and entry.metadata['namespace'] != 0
): # Ignore non-written entries
chunk_index = ''
data = ''
if entry.metadata['type'] not in [
'string',
'blob_data',
'blob_index',
'blob',
]: # Non-variable length entry
data = entry.data['value']
elif entry.metadata['type'] == 'blob_index':
continue
else: # Variable length entries
tmp = b''
for e in entry.children: # Merge all children entries
tmp += bytes(e.raw)
tmp = tmp[: entry.data['size']] # Discard padding
if entry.metadata['type'] == 'blob_data':
if entry.metadata['chunk_index'] >= 128: # Get real chunk index
chunk_index = f'[{entry.metadata["chunk_index"] - 128}]'
else:
chunk_index = f'[{entry.metadata["chunk_index"]}]'
data = str(tmp)
if entry.metadata['namespace'] not in ns:
continue
else:
nvs_log.info(
' '
+ nvs_log.cyan(ns[entry.metadata['namespace']])
+ ':'
+ nvs_log.yellow(entry.key)
+ f'{chunk_index} = {data}'
)
nvs_log.info()
def dump_written_blobs(nvs_partition: NVS_Partition) -> None:
blobs: Dict = {}
strings: List[NVS_Entry] = []
legacy_blobs: List[NVS_Entry] = []
ns = {}
empty_entry = NVS_Entry(-1, bytearray(32), 'Erased')
# Gather namespaces, blob indexes and legacy blobs
for page in nvs_partition.pages:
for entry in page.entries:
if entry.state == 'Written':
if entry.metadata['type'] == 'blob_index':
blobs[f'{entry.metadata["namespace"]:03d}{entry.key}'] = [entry] + [
empty_entry
] * entry.data['chunk_count']
elif entry.metadata['type'] == 'blob':
legacy_blobs.append(entry)
elif entry.metadata['type'] == 'string':
strings.append(entry)
elif entry.metadata['namespace'] == 0:
ns[entry.data['value']] = entry.key
# Dump blobs
for key in blobs:
for page in nvs_partition.pages:
for entry in page.entries:
# Gather all blob chunks
if (
entry.state == 'Written'
and entry.metadata['type'] != 'blob_index'
and entry.metadata['namespace']
== blobs[key][0].metadata['namespace']
and entry.key == blobs[key][0].key
):
blobs[key][
1
+ entry.metadata['chunk_index']
- blobs[key][0].data['chunk_start']
] = entry
blob_index = blobs[key][0]
blob_chunks = blobs[key][1:]
# Print blob info
nvs_log.info(
nvs_log.cyan(
ns.get(
blob_index.metadata['namespace'], blob_index.metadata['namespace']
)
)
+ ':'
+ nvs_log.yellow(blob_index.key)
+ ' - '
+ f'Type: Blob (Version 2), '
+ f'Size: {blob_index.data["size"]}'
)
# Print blob data
raw_entries = []
for kid in blob_chunks: # Gather all chunk entries
if kid is empty_entry:
raw_entries += [empty_entry]
else:
raw_entries += kid.children
for i, entry in enumerate(raw_entries):
if entry is empty_entry:
nvs_log.info(nvs_log.yellow(f' {"":->63} Missing data {"":-<64}'))
else:
nvs_log.info(
f' 0x{(i * nvs_const.entry_size):05x} {entry.dump_raw()}'
)
nvs_log.info()
# Dump strings
for string in strings:
nvs_log.info(
nvs_log.cyan(
ns.get(string.metadata['namespace'], string.metadata['namespace'])
)
+ ':'
+ nvs_log.yellow(string.key)
+ ' - '
+ 'Type: String, '
+ f'Size: {string.data["size"]}'
)
for i, entry in enumerate(string.children):
nvs_log.info(f' 0x{(i * nvs_const.entry_size):05x} {entry.dump_raw()}')
nvs_log.info()
# Dump legacy blobs
for blob in legacy_blobs:
nvs_log.info(
nvs_log.cyan(ns.get(blob.metadata['namespace'], blob.metadata['namespace']))
+ ':'
+ nvs_log.yellow(blob.key)
+ ' - '
+ 'Type: Blob (Version 1), '
+ f'Size: {blob.data["size"]}'
)
for i, entry in enumerate(blob.children):
nvs_log.info(f' 0x{(i * nvs_const.entry_size):05x} {entry.dump_raw()}')
nvs_log.info()
def integrity_check(nvs_partition: NVS_Partition) -> None:
used_namespaces: Dict[int, None] = {}
found_namespaces: Dict[int, str] = {}
blobs: Dict = {}
blob_chunks: List[NVS_Entry] = []
empty_entry = NVS_Entry(-1, bytearray(32), 'Erased')
# Partition size
if len(nvs_partition.pages) < 3:
nvs_log.info(
nvs_log.yellow(
'Partition has to have at least 3 pages to function properly!'
)
)
# Check if there's any free/empty page
if not any(page.header['status'] == 'Empty' for page in nvs_partition.pages):
nvs_log.info(
nvs_log.red(
'There are no free (empty) pages in the partition, there needs to be at least one free page!'
)
)
nvs_log.info(nvs_log.red('Has the NVS partition been truncated?\n'))
for page in nvs_partition.pages:
# Print page header
if page.header['status'] == 'Empty':
nvs_log.info(nvs_log.cyan('Page Empty'))
# Check if page is truly empty
if page.raw_entry_state_bitmap != bytearray({0xFF}) * nvs_const.entry_size:
nvs_log.info(
nvs_log.red(
'Page is reported as empty but entry state bitmap is not empty!'
)
)
if any([not e.is_empty for e in page.entries]):
nvs_log.info(
nvs_log.red('Page is reported as empty but there are data written!')
)
else:
# Check page header CRC32
if page.header['crc']['original'] == page.header['crc']['computed']:
nvs_log.info(
nvs_log.cyan(f'Page no. {page.header["page_index"]}'), '\tCRC32: OK'
)
else:
nvs_log.info(
nvs_log.cyan(f'Page no. {page.header["page_index"]}'),
f'Original CRC32:',
nvs_log.red(f'{page.header["crc"]["original"]:x}'),
f'Generated CRC32:',
nvs_log.green(f'{page.header["crc"]["computed"]:x}'),
)
# Check all entries
for entry in page.entries:
# Entry state check
if entry.is_empty:
if entry.state == 'Written':
nvs_log.info(
nvs_log.red(
f' Entry #{entry.index:03d} is reported as written but is empty!'
)
)
continue
elif entry.state == 'Erased':
nvs_log.info(
nvs_log.yellow(
f' Entry #{entry.index:03d} is reported as erased but is empty!'
)
)
if entry.state == 'Written':
# Entry CRC32 check
if (
entry.metadata['crc']['original']
!= entry.metadata['crc']['computed']
):
nvs_log.info(
nvs_log.red(
f' Entry #{entry.index:03d} {entry.key} has wrong CRC32!{"": <5}'
),
f'Written:',
nvs_log.red(f'{entry.metadata["crc"]["original"]:x}'),
f'Generated:',
nvs_log.green(f'{entry.metadata["crc"]["computed"]:x}'),
)
# Entry children CRC32 check
if (
entry.metadata['span'] > 1
and entry.metadata['crc']['data_original']
!= entry.metadata['crc']['data_computed']
):
nvs_log.info(
nvs_log.red(
f' Entry #{entry.index:03d} {entry.key} data has wrong CRC32!'
),
f'Written:',
nvs_log.red(f'{entry.metadata["crc"]["original"]:x}'),
f'Generated:',
nvs_log.green(f'{entry.metadata["crc"]["computed"]:x}'),
)
# Entry type check
if entry.metadata['type'] not in [
nvs_const.item_type[key] for key in nvs_const.item_type
]:
nvs_log.info(
nvs_log.yellow(
f' Type of entry #{entry.index:03d} {entry.key} is unrecognized!'
),
f'Type: {entry.metadata["type"]}',
)
# Span check
if (
entry.index + entry.metadata['span'] - 1
>= int(nvs_const.page_size / nvs_const.entry_size) - 2
):
nvs_log.info(
nvs_log.red(
f' Variable length entry #{entry.index:03d} {entry.key} is out of bounds!'
)
)
# Spanned entry state checks
elif entry.metadata['span'] > 1:
parent_state = entry.state
for kid in entry.children:
if parent_state != kid.state:
nvs_log.info(
nvs_log.yellow(' Inconsistent data state!'),
f'Entry #{entry.index:03d} {entry.key} state: {parent_state},',
f'Data entry #{kid.index:03d} {entry.key} state: {kid.state}',
)
# Gather blobs & namespaces
if entry.metadata['type'] == 'blob_index':
blobs[f'{entry.metadata["namespace"]:03d}{entry.key}'] = [entry] + [
empty_entry
] * entry.data['chunk_count']
elif entry.metadata['type'] == 'blob_data':
blob_chunks.append(entry)
if entry.metadata['namespace'] == 0:
found_namespaces[entry.data['value']] = entry.key
else:
used_namespaces[entry.metadata['namespace']] = None
nvs_log.info()
# Assemble blobs
for chunk in blob_chunks:
parent = blobs.get(
f'{chunk.metadata["namespace"]:03d}{chunk.key}', [empty_entry]
)[0]
if parent is empty_entry:
nvs_log.info(
nvs_log.red(f'Blob {chunk.key} chunk has no blob index!'),
f'Namespace index: {chunk.metadata["namespace"]:03d}',
f'[{found_namespaces.get(chunk.metadata["namespace"], "undefined")}],',
f'Chunk Index: {chunk.metadata["chunk_index"]:03d}',
)
else:
blob_key = f'{chunk.metadata["namespace"]:03d}{chunk.key}'
chunk_index = chunk.metadata['chunk_index'] - parent.data['chunk_start']
blobs[blob_key][chunk_index + 1] = chunk
# Check all blobs
for blob_key in blobs:
blob_index = blobs[blob_key][0]
blob_chunks = blobs[blob_key][1:]
blob_size = blob_index.data['size']
# Blob data check
for i, chunk in enumerate(blob_chunks):
if chunk is empty_entry:
nvs_log.info(
nvs_log.red(f'Blob {blob_index.key} is missing a chunk!'),
f'Namespace index: {blob_index.metadata["namespace"]:03d}',
f'[{found_namespaces.get(blob_index.metadata["namespace"], "undefined")}],',
f'Chunk Index: {i:03d}',
)
else:
blob_size -= len(chunk.children) * nvs_const.entry_size
if blob_size > 0:
nvs_log.info(
nvs_log.red(f'Blob {blob_index.key} is missing {blob_size} B of data!'),
f'Namespace index: {blob_index.metadata["namespace"]:03d}',
)
# Namespaces
for used_ns in used_namespaces:
key = found_namespaces.pop(used_ns, '')
if key == '':
nvs_log.info(
nvs_log.red('Undefined namespace index!'),
f'Namespace index: {used_ns:03d}',
f'[undefined]',
)
for unused_ns in found_namespaces:
nvs_log.info(
nvs_log.yellow('Found unused namespace.'),
f'Namespace index: {unused_ns:03d}',
f'[{found_namespaces[unused_ns]}]',
)
def print_json(nvs: NVS_Partition) -> None:
class NVSEncoder(json.JSONEncoder):
def default(self, obj: Any) -> Union[Any, Dict[str, Any], str]:
if hasattr(obj, 'toJSON'):
return obj.toJSON()
if isinstance(obj, bytearray):
return binascii.b2a_base64(obj, newline=False).decode(
'ascii'
) # Binary to Base64 ASCII representation
return json.JSONEncoder.default(self, obj)
print(json.dumps(nvs.toJSON(), cls=NVSEncoder, indent=2))

View File

@ -0,0 +1,285 @@
#!/usr/bin/env python3
# SPDX-FileCopyrightText: 2022-2023 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
from typing import Any, Dict, List, Optional
from zlib import crc32
# Constants
class NVS_Constants:
class ConstantError(AttributeError):
pass
def __init__(self) -> None:
self.page_size = 4096
self.entry_size = 32
self.item_type = {
0x01: 'uint8_t',
0x11: 'int8_t',
0x02: 'uint16_t',
0x12: 'int16_t',
0x04: 'uint32_t',
0x14: 'int32_t',
0x08: 'uint64_t',
0x18: 'int64_t',
0x21: 'string',
0x41: 'blob',
0x42: 'blob_data',
0x48: 'blob_index',
}
self.page_status = {
0xFFFFFFFF: 'Empty',
0xFFFFFFFE: 'Active',
0xFFFFFFFC: 'Full',
0xFFFFFFF8: 'Erasing',
0x00000000: 'Corrupted',
}
self.entry_status = {
0b11: 'Empty',
0b10: 'Written',
0b00: 'Erased',
}
def __setattr__(self, key: str, val: Any) -> None:
if self.__dict__.get(key, None) is None:
self.__dict__[key] = val
else:
raise NVS_Constants.ConstantError('Cannot change a constant!')
nvs_const = NVS_Constants()
class NotAlignedError(ValueError):
pass
class NVS_Partition:
def __init__(self, name: str, raw_data: bytearray):
if len(raw_data) % nvs_const.page_size != 0:
raise NotAlignedError(
f'Given partition data is not aligned to page size ({len(raw_data)} % {nvs_const.page_size} = {len(raw_data)%nvs_const.page_size})'
)
# Divide partition into pages
self.name = name
self.pages = []
for i in range(0, len(raw_data), nvs_const.page_size):
self.pages.append(NVS_Page(raw_data[i: i + nvs_const.page_size], i))
def toJSON(self) -> Dict[str, Any]:
return dict(name=self.name, pages=self.pages)
class NVS_Page:
def __init__(self, page_data: bytearray, address: int):
if len(page_data) != nvs_const.page_size:
raise NotAlignedError(
f'Size of given page does not match page size ({len(page_data)} != {nvs_const.page_size})'
)
# Initialize class
self.is_empty = (
page_data[0: nvs_const.entry_size]
== bytearray({0xFF}) * nvs_const.entry_size
)
self.start_address = address
self.raw_header = page_data[0: nvs_const.entry_size]
self.raw_entry_state_bitmap = page_data[
nvs_const.entry_size: 2 * nvs_const.entry_size
]
self.entries = []
# Load header
self.header: Dict[str, Any] = {
'status': nvs_const.page_status.get(
int.from_bytes(page_data[0:4], byteorder='little'), 'Invalid'
),
'page_index': int.from_bytes(page_data[4:8], byteorder='little'),
'version': 256 - page_data[8],
'crc': {
'original': int.from_bytes(page_data[28:32], byteorder='little'),
'computed': crc32(page_data[4:28], 0xFFFFFFFF),
},
}
# Load entry state bitmap
entry_states = []
for c in self.raw_entry_state_bitmap:
for index in range(0, 8, 2):
entry_states.append(
nvs_const.entry_status.get((c >> index) & 3, 'Invalid')
)
entry_states = entry_states[:-2]
# Load entries
i = 2
while i < int(
nvs_const.page_size / nvs_const.entry_size
): # Loop through every entry
span = page_data[(i * nvs_const.entry_size) + 2]
if span in [0xFF, 0]: # 'Default' span length to prevent span overflow
span = 1
# Load an entry
entry = NVS_Entry(
i - 2,
page_data[i * nvs_const.entry_size: (i + 1) * nvs_const.entry_size],
entry_states[i - 2],
)
self.entries.append(entry)
# Load all children entries
if span != 1:
for span_idx in range(1, span):
page_addr = i + span_idx
entry_idx = page_addr - 2
if page_addr * nvs_const.entry_size >= nvs_const.page_size:
break
child_entry = NVS_Entry(
entry_idx,
page_data[
page_addr
* nvs_const.entry_size: (page_addr + 1)
* nvs_const.entry_size
],
entry_states[entry_idx],
)
entry.child_assign(child_entry)
entry.compute_crc()
i += span
def toJSON(self) -> Dict[str, Any]:
return dict(
is_empty=self.is_empty,
start_address=self.start_address,
raw_header=self.raw_header,
raw_entry_state_bitmap=self.raw_entry_state_bitmap,
header=self.header,
entries=self.entries,
)
class NVS_Entry:
def __init__(self, index: int, entry_data: bytearray, entry_state: str):
if len(entry_data) != nvs_const.entry_size:
raise NotAlignedError(
f'Given entry is not aligned to entry size ({len(entry_data)} % {nvs_const.entry_size} = {len(entry_data)%nvs_const.entry_size})'
)
def item_convert(i_type: int, data: bytearray) -> Dict:
byte_size_mask = 0x0F
number_sign_mask = 0xF0
fixed_entry_length_threshold = (
0x20 # Fixed length entry type number is always smaller than this
)
if i_type in nvs_const.item_type:
# Deal with non variable length entries
if i_type < fixed_entry_length_threshold:
size = i_type & byte_size_mask
num = int.from_bytes(
data[:size],
byteorder='little',
signed=bool(i_type & number_sign_mask),
)
return {'value': num}
# Deal with variable length entries
if nvs_const.item_type[i_type] in ['string', 'blob_data', 'blob']:
size = int.from_bytes(data[:2], byteorder='little')
crc = int.from_bytes(data[4:8], byteorder='little')
return {'value': [size, crc], 'size': size, 'crc': crc}
if nvs_const.item_type[i_type] == 'blob_index':
size = int.from_bytes(data[:4], byteorder='little')
chunk_count = data[4]
chunk_start = data[5]
return {
'value': [size, chunk_count, chunk_start],
'size': size,
'chunk_count': chunk_count,
'chunk_start': chunk_start,
}
return {'value': None}
def key_decode(data: bytearray) -> Optional[str]:
decoded = ''
for n in data.rstrip(b'\x00'):
char = chr(n)
if char.isascii():
decoded += char
else:
return None
return decoded
self.raw = entry_data
self.state = entry_state
self.is_empty = self.raw == bytearray({0xFF}) * nvs_const.entry_size
self.index = index
namespace = self.raw[0]
entry_type = self.raw[1]
span = self.raw[2]
chunk_index = self.raw[3]
crc = self.raw[4:8]
key = self.raw[8:24]
data = self.raw[24:32]
raw_without_crc = self.raw[:4] + self.raw[8:32]
self.metadata: Dict[str, Any] = {
'namespace': namespace,
'type': nvs_const.item_type.get(entry_type, f'0x{entry_type:02x}'),
'span': span,
'chunk_index': chunk_index,
'crc': {
'original': int.from_bytes(crc, byteorder='little'),
'computed': crc32(raw_without_crc, 0xFFFFFFFF),
'data_original': int.from_bytes(data[-4:], byteorder='little'),
'data_computed': 0,
},
}
self.children: List['NVS_Entry'] = []
self.key = key_decode(key)
if self.key is None:
self.data = None
else:
self.data = item_convert(entry_type, data)
def dump_raw(self) -> str:
hex_bytes = ''
decoded = ''
for i, c in enumerate(self.raw):
middle_index = int(len(self.raw) / 2)
if i == middle_index: # Add a space in the middle
hex_bytes += ' '
decoded += ' '
hex_bytes += f'{c:02x} '
decoded += chr(c) if chr(c).isprintable() else '.'
return hex_bytes + ' ' + decoded
def child_assign(self, entry: 'NVS_Entry') -> None:
if not isinstance(entry, type(self)):
raise ValueError('You can assign only NVS_Entry')
self.children.append(entry)
def compute_crc(self) -> None:
if self.metadata['span'] == 1:
return
# Merge entries into one buffer
children_data = bytearray()
for entry in self.children:
children_data += entry.raw
if self.data:
children_data = children_data[: self.data['size']] # Discard padding
self.metadata['crc']['data_computed'] = crc32(children_data, 0xFFFFFFFF)
def toJSON(self) -> Dict[str, Any]:
return dict(
raw=self.raw,
state=self.state,
is_empty=self.is_empty,
index=self.index,
metadata=self.metadata,
children=self.children,
key=self.key,
data=self.data,
)

View File

@ -0,0 +1,123 @@
#!/usr/bin/env python3
# SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
import argparse
import os
import sys
import traceback
import nvs_logger
import nvs_parser
from nvs_logger import nvs_log
from nvs_parser import nvs_const
def program_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description='Parse NVS partition', formatter_class=argparse.RawTextHelpFormatter
)
parser.add_argument('file', help='Path to dumped NVS partition')
parser.add_argument(
'-i',
'--integrity-check',
action='store_true',
help='check partition for potential errors',
)
tmp = {
'all': 'print everything',
'written': 'print only currently written entries',
'minimal': 'print only namespace:key=value pairs',
'namespaces': 'list all written namespaces',
'blobs': 'print all blobs and strings',
'storage_info': 'print storage related information (free/used entries, etc)',
'none': 'do not print anything (if you only want to do integrity check)',
}
parser.add_argument(
'-d',
'--dump',
choices=tmp,
default='all',
metavar='type',
help=(
f"""type: {str(list(tmp.keys()))[1:-1]}
{os.linesep.join([f'{opt} - {tmp[opt]}' for opt in tmp])}"""
),
)
parser.add_argument(
'--color',
choices=['never', 'auto', 'always'],
default='auto',
help='Enable color (ANSI)',
)
tmp = {
'text': 'print output as a human-readable text',
'json': 'print output as JSON and exit',
}
parser.add_argument(
'-f', '--format', choices=tmp, default='text', help='Output format'
)
return parser.parse_args()
def main() -> None:
args = program_args()
if nvs_const.entry_size != 32:
raise ValueError(f'Entry size is not 32B! This is currently non negotiable.')
nvs_log.set_color(args.color)
nvs_log.set_format(args.format)
try:
with open(args.file, 'rb') as f:
partition = f.read()
except IndexError:
nvs_log.error('No file given')
raise
except FileNotFoundError:
nvs_log.error('Bad filename')
raise
nvs = nvs_parser.NVS_Partition(args.file.split('/')[-1], bytearray(partition))
def noop(_: nvs_parser.NVS_Partition) -> None:
pass
def format_not_implemented(_: nvs_parser.NVS_Partition) -> None:
raise RuntimeError(f'{args.format} is not implemented')
def cmd_not_implemented(_: nvs_parser.NVS_Partition) -> None:
raise RuntimeError(f'{args.dump} is not implemented')
formats = {
'text': noop,
'json': nvs_logger.print_json,
}
formats.get(args.format, format_not_implemented)(nvs)
if args.format == 'text':
cmds = {
'all': nvs_logger.dump_everything,
'written': nvs_logger.dump_written_entries,
'minimal': nvs_logger.dump_key_value_pairs,
'namespaces': nvs_logger.list_namespaces,
'blobs': nvs_logger.dump_written_blobs,
'storage_info': nvs_logger.storage_stats,
'none': noop,
}
cmds.get(args.dump, cmd_not_implemented)(nvs) # type: ignore
if args.integrity_check:
nvs_log.info()
nvs_logger.integrity_check(nvs)
if __name__ == '__main__':
try:
main()
except ValueError:
traceback.print_exc(file=sys.stderr)
sys.exit(1)
except nvs_parser.NVS_Constants.ConstantError:
traceback.print_exc(file=sys.stderr)
sys.exit(1)