mirror of
https://github.com/espressif/esp-idf.git
synced 2024-10-05 20:47:46 -04:00
nvs_util: Add Python2 and Python3 compatible
This commit is contained in:
parent
cd7dcd0c85
commit
dca5ac78f0
@ -17,8 +17,11 @@
|
|||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
#
|
#
|
||||||
from __future__ import division
|
|
||||||
from __future__ import print_function
|
from __future__ import division, print_function #, unicode_literals
|
||||||
|
from future.utils import raise_
|
||||||
|
from builtins import int, range
|
||||||
|
from io import open
|
||||||
import sys
|
import sys
|
||||||
import argparse
|
import argparse
|
||||||
import binascii
|
import binascii
|
||||||
@ -80,7 +83,7 @@ class Page(object):
|
|||||||
global page_header
|
global page_header
|
||||||
|
|
||||||
# set page state to active
|
# set page state to active
|
||||||
page_header= bytearray(b'\xff')*32
|
page_header= bytearray(b'\xff') *32
|
||||||
page_state_active_seq = Page.ACTIVE
|
page_state_active_seq = Page.ACTIVE
|
||||||
struct.pack_into('<I', page_header, 0, page_state_active_seq)
|
struct.pack_into('<I', page_header, 0, page_state_active_seq)
|
||||||
# set page sequence number
|
# set page sequence number
|
||||||
@ -91,11 +94,8 @@ class Page(object):
|
|||||||
elif version == Page.VERSION1:
|
elif version == Page.VERSION1:
|
||||||
page_header[8] = Page.VERSION1
|
page_header[8] = Page.VERSION1
|
||||||
# set header's CRC
|
# set header's CRC
|
||||||
crc_data = page_header[4:28]
|
crc_data = bytes(page_header[4:28])
|
||||||
if sys.version_info[0] < 3:
|
crc = zlib.crc32(crc_data, 0xFFFFFFFF)
|
||||||
crc = zlib.crc32(buffer(crc_data), 0xFFFFFFFF)
|
|
||||||
else:
|
|
||||||
crc = zlib.crc32(memoryview(crc_data), 0xFFFFFFFF)
|
|
||||||
struct.pack_into('<I', page_header, 28, crc & 0xFFFFFFFF)
|
struct.pack_into('<I', page_header, 28, crc & 0xFFFFFFFF)
|
||||||
self.page_buf[0:len(page_header)] = page_header
|
self.page_buf[0:len(page_header)] = page_header
|
||||||
|
|
||||||
@ -122,12 +122,8 @@ class Page(object):
|
|||||||
def encrypt_entry(self, data_arr, tweak_arr, encr_key):
|
def encrypt_entry(self, data_arr, tweak_arr, encr_key):
|
||||||
# Encrypt 32 bytes of data using AES-XTS encryption
|
# Encrypt 32 bytes of data using AES-XTS encryption
|
||||||
backend = default_backend()
|
backend = default_backend()
|
||||||
if sys.version_info[0] < 3:
|
plain_text = codecs.decode(data_arr, 'hex')
|
||||||
plain_text = data_arr.decode('hex')
|
tweak = codecs.decode(tweak_arr, 'hex')
|
||||||
tweak = tweak_arr.decode('hex')
|
|
||||||
else:
|
|
||||||
plain_text = codecs.decode(data_arr, 'hex')
|
|
||||||
tweak = codecs.decode(tweak_arr, 'hex')
|
|
||||||
|
|
||||||
cipher = Cipher(algorithms.AES(encr_key), modes.XTS(tweak), backend=backend)
|
cipher = Cipher(algorithms.AES(encr_key), modes.XTS(tweak), backend=backend)
|
||||||
encryptor = cipher.encryptor()
|
encryptor = cipher.encryptor()
|
||||||
@ -158,32 +154,16 @@ class Page(object):
|
|||||||
|
|
||||||
|
|
||||||
# Extract encryption key and tweak key from given key input
|
# Extract encryption key and tweak key from given key input
|
||||||
if sys.version_info[0] < 3:
|
encr_key_input = codecs.decode(self.encr_key, 'hex')
|
||||||
encr_key_input = self.encr_key.decode('hex')
|
|
||||||
else:
|
|
||||||
encr_key_input = codecs.decode(self.encr_key, 'hex')
|
|
||||||
|
|
||||||
|
|
||||||
rel_addr = nvs_obj.page_num * Page.PAGE_PARAMS["max_size"] + Page.FIRST_ENTRY_OFFSET
|
rel_addr = nvs_obj.page_num * Page.PAGE_PARAMS["max_size"] + Page.FIRST_ENTRY_OFFSET
|
||||||
|
|
||||||
|
if not isinstance(data_input, bytearray):
|
||||||
if type(data_input) != bytearray:
|
|
||||||
byte_arr = bytearray(b'\xff') * 32
|
byte_arr = bytearray(b'\xff') * 32
|
||||||
if sys.version_info[0] < 3:
|
byte_arr[0:len(data_input)] = data_input
|
||||||
byte_arr[0:len(data_input)] = data_input
|
|
||||||
else:
|
|
||||||
if type(data_input) == str:
|
|
||||||
byte_arr[0:len(data_input)] = data_input
|
|
||||||
else:
|
|
||||||
byte_arr[0:len(data_input)] = data_input
|
|
||||||
|
|
||||||
data_input = byte_arr
|
data_input = byte_arr
|
||||||
|
|
||||||
|
data_input = binascii.hexlify(data_input)
|
||||||
if sys.version_info[0] < 3:
|
|
||||||
data_input = binascii.hexlify(bytearray(data_input))
|
|
||||||
else:
|
|
||||||
data_input = data_input.hex()
|
|
||||||
|
|
||||||
entry_no = self.entry_num
|
entry_no = self.entry_num
|
||||||
start_idx = 0
|
start_idx = 0
|
||||||
@ -208,11 +188,10 @@ class Page(object):
|
|||||||
|
|
||||||
# Encrypt data
|
# Encrypt data
|
||||||
data_bytes = data_input[start_idx:end_idx]
|
data_bytes = data_input[start_idx:end_idx]
|
||||||
if sys.version_info[0] < 3:
|
if type(data_bytes) == bytes:
|
||||||
data_val = data_bytes + (init_data_val * (data_len_needed - len(data_bytes)))
|
data_bytes = data_bytes.decode()
|
||||||
else:
|
|
||||||
data_val = str(data_bytes) + (init_data_val * (data_len_needed - len(data_bytes)))
|
|
||||||
|
|
||||||
|
data_val = data_bytes + (init_data_val * (data_len_needed - len(data_bytes)))
|
||||||
encr_data_ret = self.encrypt_entry(data_val, tweak_val, encr_key_input)
|
encr_data_ret = self.encrypt_entry(data_val, tweak_val, encr_key_input)
|
||||||
encr_data_to_write = encr_data_to_write + encr_data_ret
|
encr_data_to_write = encr_data_to_write + encr_data_ret
|
||||||
# Update values for encrypting next set of data bytes
|
# Update values for encrypting next set of data bytes
|
||||||
@ -220,32 +199,21 @@ class Page(object):
|
|||||||
end_idx = start_idx + 64
|
end_idx = start_idx + 64
|
||||||
entry_no += 1
|
entry_no += 1
|
||||||
|
|
||||||
|
|
||||||
return encr_data_to_write
|
return encr_data_to_write
|
||||||
|
|
||||||
|
|
||||||
def write_entry_to_buf(self, data, entrycount,nvs_obj):
|
def write_entry_to_buf(self, data, entrycount,nvs_obj):
|
||||||
encr_data = bytearray()
|
encr_data = bytearray()
|
||||||
|
|
||||||
if self.is_encrypt:
|
if self.is_encrypt:
|
||||||
encr_data_ret = self.encrypt_data(data, entrycount,nvs_obj)
|
encr_data_ret = self.encrypt_data(data, entrycount,nvs_obj)
|
||||||
if sys.version_info[0] < 3:
|
encr_data[0:len(encr_data_ret)] = encr_data_ret
|
||||||
encr_data[0:len(encr_data_ret)] = encr_data_ret
|
|
||||||
else:
|
|
||||||
encr_data[0:len(encr_data_ret)] = encr_data_ret
|
|
||||||
|
|
||||||
data = encr_data
|
data = encr_data
|
||||||
|
|
||||||
data_offset = Page.FIRST_ENTRY_OFFSET + (Page.SINGLE_ENTRY_SIZE * self.entry_num)
|
data_offset = Page.FIRST_ENTRY_OFFSET + (Page.SINGLE_ENTRY_SIZE * self.entry_num)
|
||||||
start_idx = data_offset
|
start_idx = data_offset
|
||||||
end_idx = data_offset + len(data)
|
end_idx = data_offset + len(data)
|
||||||
if not sys.version_info[0] < 3:
|
self.page_buf[start_idx:end_idx] = data
|
||||||
if type(data) == str:
|
|
||||||
self.page_buf[start_idx:end_idx] = data
|
|
||||||
else:
|
|
||||||
self.page_buf[start_idx:end_idx] = data
|
|
||||||
else:
|
|
||||||
self.page_buf[start_idx:end_idx] = data
|
|
||||||
|
|
||||||
|
|
||||||
# Set bitmap array for entries in current page
|
# Set bitmap array for entries in current page
|
||||||
for i in range(0, entrycount):
|
for i in range(0, entrycount):
|
||||||
@ -257,13 +225,12 @@ class Page(object):
|
|||||||
crc_data = bytearray(b'28')
|
crc_data = bytearray(b'28')
|
||||||
crc_data[0:4] = entry_struct[0:4]
|
crc_data[0:4] = entry_struct[0:4]
|
||||||
crc_data[4:28] = entry_struct[8:32]
|
crc_data[4:28] = entry_struct[8:32]
|
||||||
if sys.version_info[0] < 3:
|
crc_data = bytes(crc_data)
|
||||||
crc = zlib.crc32(buffer(crc_data), 0xFFFFFFFF)
|
crc = zlib.crc32(crc_data, 0xFFFFFFFF)
|
||||||
else:
|
|
||||||
crc = zlib.crc32(crc_data, 0xFFFFFFFF)
|
|
||||||
struct.pack_into('<I', entry_struct, 4, crc & 0xFFFFFFFF)
|
struct.pack_into('<I', entry_struct, 4, crc & 0xFFFFFFFF)
|
||||||
return entry_struct
|
return entry_struct
|
||||||
|
|
||||||
|
|
||||||
def write_varlen_binary_data(self, entry_struct, ns_index, key, data, data_size, total_entry_count,nvs_obj):
|
def write_varlen_binary_data(self, entry_struct, ns_index, key, data, data_size, total_entry_count,nvs_obj):
|
||||||
chunk_start = 0
|
chunk_start = 0
|
||||||
chunk_count = 0
|
chunk_count = 0
|
||||||
@ -307,10 +274,7 @@ class Page(object):
|
|||||||
|
|
||||||
# Compute CRC of data chunk
|
# Compute CRC of data chunk
|
||||||
struct.pack_into('<H', entry_struct, 24, chunk_size)
|
struct.pack_into('<H', entry_struct, 24, chunk_size)
|
||||||
if not sys.version_info[0] < 3:
|
data_chunk = bytes(data_chunk)
|
||||||
if type(data_chunk) == str:
|
|
||||||
data_chunk = codecs.encode(data_chunk, 'utf8')
|
|
||||||
|
|
||||||
crc = zlib.crc32(data_chunk, 0xFFFFFFFF)
|
crc = zlib.crc32(data_chunk, 0xFFFFFFFF)
|
||||||
struct.pack_into('<I', entry_struct, 28, crc & 0xFFFFFFFF)
|
struct.pack_into('<I', entry_struct, 28, crc & 0xFFFFFFFF)
|
||||||
|
|
||||||
@ -367,15 +331,9 @@ class Page(object):
|
|||||||
def write_single_page_entry(self, entry_struct, data, datalen, data_entry_count, nvs_obj):
|
def write_single_page_entry(self, entry_struct, data, datalen, data_entry_count, nvs_obj):
|
||||||
# compute CRC of data
|
# compute CRC of data
|
||||||
struct.pack_into('<H', entry_struct, 24, datalen)
|
struct.pack_into('<H', entry_struct, 24, datalen)
|
||||||
if sys.version_info[0] < 3:
|
if not type(data) == bytes:
|
||||||
crc = zlib.crc32(data, 0xFFFFFFFF)
|
data = data.encode()
|
||||||
else:
|
crc = zlib.crc32(data, 0xFFFFFFFF)
|
||||||
if (type(data)) == str:
|
|
||||||
data = data.encode('utf8')
|
|
||||||
crc = zlib.crc32(data, 0xFFFFFFFF)
|
|
||||||
else:
|
|
||||||
crc = zlib.crc32(data, 0xFFFFFFFF)
|
|
||||||
|
|
||||||
struct.pack_into('<I', entry_struct, 28, crc & 0xFFFFFFFF)
|
struct.pack_into('<I', entry_struct, 28, crc & 0xFFFFFFFF)
|
||||||
|
|
||||||
# compute crc of entry header
|
# compute crc of entry header
|
||||||
@ -429,12 +387,9 @@ class Page(object):
|
|||||||
entry_struct[2] = data_entry_count + 1
|
entry_struct[2] = data_entry_count + 1
|
||||||
|
|
||||||
# set key
|
# set key
|
||||||
key_array = bytearray(b'\x00')*16
|
key_array = b'\x00' * 16
|
||||||
entry_struct[8:24] = key_array
|
entry_struct[8:24] = key_array
|
||||||
if sys.version_info[0] < 3:
|
entry_struct[8:8 + len(key)] = key.encode()
|
||||||
entry_struct[8:8 + len(key)] = key
|
|
||||||
else:
|
|
||||||
entry_struct[8:8 + len(key)] = key.encode('utf8')
|
|
||||||
|
|
||||||
# set Type
|
# set Type
|
||||||
if encoding == "string":
|
if encoding == "string":
|
||||||
@ -463,14 +418,9 @@ class Page(object):
|
|||||||
entry_struct[3] = chunk_index
|
entry_struct[3] = chunk_index
|
||||||
|
|
||||||
# write key
|
# write key
|
||||||
key_array = bytearray(b'\x00')*16
|
key_array = b'\x00' *16
|
||||||
entry_struct[8:24] = key_array
|
entry_struct[8:24] = key_array
|
||||||
|
entry_struct[8:8 + len(key)] = key.encode()
|
||||||
if sys.version_info[0] < 3:
|
|
||||||
entry_struct[8:8 + len(key)] = key
|
|
||||||
else:
|
|
||||||
entry_struct[8:8 + len(key)] = key.encode('utf8')
|
|
||||||
|
|
||||||
|
|
||||||
if encoding == "u8":
|
if encoding == "u8":
|
||||||
entry_struct[1] = Page.U8
|
entry_struct[1] = Page.U8
|
||||||
@ -492,10 +442,8 @@ class Page(object):
|
|||||||
crc_data = bytearray(b'28')
|
crc_data = bytearray(b'28')
|
||||||
crc_data[0:4] = entry_struct[0:4]
|
crc_data[0:4] = entry_struct[0:4]
|
||||||
crc_data[4:28] = entry_struct[8:32]
|
crc_data[4:28] = entry_struct[8:32]
|
||||||
if sys.version_info[0] < 3:
|
crc_data = bytes(crc_data)
|
||||||
crc = zlib.crc32(buffer(crc_data), 0xFFFFFFFF)
|
crc = zlib.crc32(crc_data, 0xFFFFFFFF)
|
||||||
else:
|
|
||||||
crc = zlib.crc32(memoryview(crc_data), 0xFFFFFFFF)
|
|
||||||
struct.pack_into('<I', entry_struct, 4, crc & 0xFFFFFFFF)
|
struct.pack_into('<I', entry_struct, 4, crc & 0xFFFFFFFF)
|
||||||
|
|
||||||
# write to file
|
# write to file
|
||||||
@ -562,7 +510,6 @@ class NVS(object):
|
|||||||
except PageFullError:
|
except PageFullError:
|
||||||
new_page = self.create_new_page()
|
new_page = self.create_new_page()
|
||||||
new_page.write_primitive_data(key, self.namespace_idx, "u8", 0,self)
|
new_page.write_primitive_data(key, self.namespace_idx, "u8", 0,self)
|
||||||
pass
|
|
||||||
|
|
||||||
"""
|
"""
|
||||||
Write key-value pair. Function accepts value in the form of ascii character and converts
|
Write key-value pair. Function accepts value in the form of ascii character and converts
|
||||||
@ -580,6 +527,8 @@ class NVS(object):
|
|||||||
value = binascii.a2b_base64(value)
|
value = binascii.a2b_base64(value)
|
||||||
|
|
||||||
if encoding == "string":
|
if encoding == "string":
|
||||||
|
if type(value) == bytes:
|
||||||
|
value = value.decode()
|
||||||
value += '\0'
|
value += '\0'
|
||||||
|
|
||||||
encoding = encoding.lower()
|
encoding = encoding.lower()
|
||||||
@ -591,15 +540,12 @@ class NVS(object):
|
|||||||
except PageFullError:
|
except PageFullError:
|
||||||
new_page = self.create_new_page()
|
new_page = self.create_new_page()
|
||||||
new_page.write_varlen_data(key, value, encoding, self.namespace_idx,self)
|
new_page.write_varlen_data(key, value, encoding, self.namespace_idx,self)
|
||||||
pass
|
|
||||||
elif encoding in primitive_encodings:
|
elif encoding in primitive_encodings:
|
||||||
try:
|
try:
|
||||||
self.cur_page.write_primitive_data(key, int(value), encoding, self.namespace_idx,self)
|
self.cur_page.write_primitive_data(key, int(value), encoding, self.namespace_idx,self)
|
||||||
except PageFullError:
|
except PageFullError:
|
||||||
new_page = self.create_new_page()
|
new_page = self.create_new_page()
|
||||||
new_page.write_primitive_data(key, int(value), encoding, self.namespace_idx,self)
|
new_page.write_primitive_data(key, int(value), encoding, self.namespace_idx,self)
|
||||||
sys.exc_clear()
|
|
||||||
pass
|
|
||||||
else:
|
else:
|
||||||
raise InputError("%s: Unsupported encoding" % encoding)
|
raise InputError("%s: Unsupported encoding" % encoding)
|
||||||
|
|
||||||
@ -658,12 +604,9 @@ def write_entry(nvs_instance, key, datatype, encoding, value):
|
|||||||
if os.path.isabs(value) == False:
|
if os.path.isabs(value) == False:
|
||||||
script_dir = os.path.dirname(__file__)
|
script_dir = os.path.dirname(__file__)
|
||||||
abs_file_path = os.path.join(script_dir, value)
|
abs_file_path = os.path.join(script_dir, value)
|
||||||
if sys.version_info[0] < 3:
|
|
||||||
with open(abs_file_path, 'rb') as f:
|
with open(abs_file_path, 'rb') as f:
|
||||||
value = f.read()
|
value = f.read()
|
||||||
else:
|
|
||||||
with open(abs_file_path, 'r', newline='') as f:
|
|
||||||
value = f.read()
|
|
||||||
|
|
||||||
if datatype == "namespace":
|
if datatype == "namespace":
|
||||||
nvs_instance.write_namespace(key)
|
nvs_instance.write_namespace(key)
|
||||||
@ -683,7 +626,7 @@ def nvs_part_gen(input_filename=None, output_filename=None, input_size=None, key
|
|||||||
|
|
||||||
:param input_filename: Name of input file containing data
|
:param input_filename: Name of input file containing data
|
||||||
:param output_filename: Name of output file to store generated binary
|
:param output_filename: Name of output file to store generated binary
|
||||||
:param input_size: Size of partition (must be multiple of 4096)
|
:param input_size: Size of partition in bytes (must be multiple of 4096)
|
||||||
:param key_gen: Enable encryption key generation in encryption mode
|
:param key_gen: Enable encryption key generation in encryption mode
|
||||||
:param encrypt_mode: Enable/Disable encryption mode
|
:param encrypt_mode: Enable/Disable encryption mode
|
||||||
:param key_file: Input file having encryption keys in encryption mode
|
:param key_file: Input file having encryption keys in encryption mode
|
||||||
@ -695,7 +638,7 @@ def nvs_part_gen(input_filename=None, output_filename=None, input_size=None, key
|
|||||||
is_encrypt_data = encrypt_mode
|
is_encrypt_data = encrypt_mode
|
||||||
|
|
||||||
# Set size
|
# Set size
|
||||||
input_size = int(input_size, 16)
|
input_size = int(input_size, 0)
|
||||||
|
|
||||||
if input_size % 4096 !=0:
|
if input_size % 4096 !=0:
|
||||||
sys.exit("Size of partition (must be multiple of 4096)")
|
sys.exit("Size of partition (must be multiple of 4096)")
|
||||||
@ -734,20 +677,13 @@ def nvs_part_gen(input_filename=None, output_filename=None, input_size=None, key
|
|||||||
sys.exit("Invalid. Cannot give --key_file as --encrypt is set to False.")
|
sys.exit("Invalid. Cannot give --key_file as --encrypt is set to False.")
|
||||||
|
|
||||||
if key_gen:
|
if key_gen:
|
||||||
if sys.version_info[0] < 3:
|
key_input = ''.join(random.choice('0123456789abcdef') for _ in range(128)).strip()
|
||||||
key_input = ''.join(random.choice('0123456789abcdef') for _ in xrange(128))
|
|
||||||
else:
|
|
||||||
key_input = ''.join(random.choice('0123456789abcdef') for _ in range(128)).strip()
|
|
||||||
elif key_file:
|
elif key_file:
|
||||||
with open(key_file) as key_f:
|
with open(key_file, 'rt', encoding='utf8') as key_f:
|
||||||
key_input = key_f.readline()
|
key_input = key_f.readline()
|
||||||
key_input = key_input.strip()
|
key_input = key_input.strip()
|
||||||
|
|
||||||
if sys.version_info[0] < 3:
|
input_file = open(input_filename, 'rt', encoding='utf8')
|
||||||
input_file = open(input_filename, 'rb')
|
|
||||||
else:
|
|
||||||
input_file = open(input_filename, 'r', newline='')
|
|
||||||
|
|
||||||
output_file = open(output_filename, 'wb')
|
output_file = open(output_filename, 'wb')
|
||||||
|
|
||||||
with nvs_open(output_file, input_size) as nvs_obj:
|
with nvs_open(output_file, input_size) as nvs_obj:
|
||||||
@ -765,28 +701,21 @@ def nvs_part_gen(input_filename=None, output_filename=None, input_size=None, key
|
|||||||
output_file.close()
|
output_file.close()
|
||||||
|
|
||||||
if is_encrypt_data:
|
if is_encrypt_data:
|
||||||
output_keys_file = open("encryption_keys.bin",'wb')
|
|
||||||
keys_page_buf = bytearray(b'\xff')*Page.PAGE_PARAMS["max_size"]
|
keys_page_buf = bytearray(b'\xff')*Page.PAGE_PARAMS["max_size"]
|
||||||
|
key_bytes = bytearray()
|
||||||
|
|
||||||
if sys.version_info[0] < 3:
|
key_bytes = codecs.decode(key_input, 'hex')
|
||||||
key_bytes = key_input.decode('hex')
|
|
||||||
else:
|
|
||||||
key_bytes = bytearray()
|
|
||||||
key_bytes = codecs.decode(key_input, 'hex')
|
|
||||||
|
|
||||||
|
|
||||||
key_len = len(key_bytes)
|
key_len = len(key_bytes)
|
||||||
keys_page_buf[0:key_len] = key_bytes
|
keys_page_buf[0:key_len] = key_bytes
|
||||||
|
|
||||||
crc_data = keys_page_buf[0:key_len]
|
crc_data = keys_page_buf[0:key_len]
|
||||||
|
crc_data = bytes(crc_data)
|
||||||
if sys.version_info[0] < 3:
|
crc = zlib.crc32(crc_data, 0xFFFFFFFF)
|
||||||
crc = zlib.crc32(buffer(crc_data), 0xFFFFFFFF)
|
|
||||||
else:
|
|
||||||
crc = zlib.crc32(memoryview(crc_data), 0xFFFFFFFF)
|
|
||||||
|
|
||||||
struct.pack_into('<I', keys_page_buf, key_len, crc & 0xFFFFFFFF)
|
struct.pack_into('<I', keys_page_buf, key_len, crc & 0xFFFFFFFF)
|
||||||
output_keys_file.write(keys_page_buf)
|
|
||||||
|
with open("encryption_keys.bin",'wb') as output_keys_file:
|
||||||
|
output_keys_file.write(keys_page_buf)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user