diff --git a/tools/idf_py_actions/dfu_ext.py b/tools/idf_py_actions/dfu_ext.py index af8d2d7fa0..ed7092771f 100644 --- a/tools/idf_py_actions/dfu_ext.py +++ b/tools/idf_py_actions/dfu_ext.py @@ -6,9 +6,9 @@ def action_extensions(base_actions, project_path): SUPPORTED_TARGETS = ['esp32s2'] - def dfu_target(target_name, ctx, args): + def dfu_target(target_name, ctx, args, part_size): ensure_build_directory(args, ctx.info_name) - run_target(target_name, args) + run_target(target_name, args, {'ESP_DFU_PART_SIZE': part_size} if part_size else {}) def dfu_flash_target(target_name, ctx, args): ensure_build_directory(args, ctx.info_name) @@ -22,16 +22,24 @@ def action_extensions(base_actions, project_path): raise dfu_actions = { - "actions": { - "dfu": { - "callback": dfu_target, - "short_help": "Build the DFU binary", - "dependencies": ["all"], + 'actions': { + 'dfu': { + 'callback': dfu_target, + 'short_help': 'Build the DFU binary', + 'dependencies': ['all'], + 'options': [ + { + 'names': ['--part-size'], + 'help': 'Large files are split up into smaller partitions in order to avoid timeout during ' + 'erasing flash. This option allows to overwrite the default partition size of ' + 'mkdfu.py.' + } + ], }, - "dfu-flash": { - "callback": dfu_flash_target, - "short_help": "Flash the DFU binary", - "order_dependencies": ["dfu"], + 'dfu-flash': { + 'callback': dfu_flash_target, + 'short_help': 'Flash the DFU binary', + 'order_dependencies': ['dfu'], }, } } diff --git a/tools/mkdfu.py b/tools/mkdfu.py index d2e562c717..6b49da6eff 100755 --- a/tools/mkdfu.py +++ b/tools/mkdfu.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# Copyright 2020 Espressif Systems (Shanghai) PTE LTD +# Copyright 2020-2021 Espressif Systems (Shanghai) CO LTD # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -21,14 +21,18 @@ # This file must be the first one in the archive. It contains binary structures describing each # subsequent file (for example, where the file needs to be flashed/loaded). -from collections import namedtuple -from future.utils import iteritems +from __future__ import print_function, unicode_literals + import argparse import hashlib import json import os import struct import zlib +from collections import namedtuple +from functools import partial + +from future.utils import iteritems try: import typing @@ -43,28 +47,28 @@ except ImportError: pass # CPIO ("new ASCII") format related things -CPIO_MAGIC = b"070701" -CPIO_STRUCT = b"=6s" + b"8s" * 13 +CPIO_MAGIC = b'070701' +CPIO_STRUCT = b'=6s' + b'8s' * 13 CPIOHeader = namedtuple( - "CPIOHeader", + 'CPIOHeader', [ - "magic", - "ino", - "mode", - "uid", - "gid", - "nlink", - "mtime", - "filesize", - "devmajor", - "devminor", - "rdevmajor", - "rdevminor", - "namesize", - "check", + 'magic', + 'ino', + 'mode', + 'uid', + 'gid', + 'nlink', + 'mtime', + 'filesize', + 'devmajor', + 'devminor', + 'rdevmajor', + 'rdevminor', + 'namesize', + 'check', ], ) -CPIO_TRAILER = "TRAILER!!!" +CPIO_TRAILER = 'TRAILER!!!' def make_cpio_header( @@ -73,7 +77,7 @@ def make_cpio_header( """ Returns CPIOHeader for the given file name and file size """ def as_hex(val): # type: (int) -> bytes - return "{:08x}".format(val).encode("ascii") + return '{:08x}'.format(val).encode('ascii') hex_0 = as_hex(0) mode = hex_0 if is_trailer else as_hex(0o0100644) @@ -98,19 +102,17 @@ def make_cpio_header( # DFU format related things # Structure of one entry in dfuinfo0.dat -DFUINFO_STRUCT = b" int @@ -119,39 +121,56 @@ def dfu_crc(data, crc=0): # type: (bytes, int) -> int return uint32_max - (zlib.crc32(data, crc) & uint32_max) -def pad_bytes(b, multiple, padding=b"\x00"): # type: (bytes, int, bytes) -> bytes +def pad_bytes(b, multiple, padding=b'\x00'): # type: (bytes, int, bytes) -> bytes """ Pad 'b' to a length divisible by 'multiple' """ padded_len = (len(b) + multiple - 1) // multiple * multiple return b + padding * (padded_len - len(b)) class EspDfuWriter(object): - def __init__(self, dest_file): # type: (typing.BinaryIO) -> None + def __init__(self, dest_file, pid, part_size): # type: (typing.BinaryIO, int, int) -> None self.dest = dest_file + self.pid = pid + self.part_size = part_size self.entries = [] # type: typing.List[bytes] self.index = [] # type: typing.List[DFUInfo] def add_file(self, flash_addr, path): # type: (int, str) -> None - """ Add file to be written into flash at given address """ - with open(path, "rb") as f: - self._add_cpio_flash_entry(os.path.basename(path), flash_addr, f.read()) + """ + Add file to be written into flash at given address + + Files are split up into chunks in order avoid timing-out during erasing large regions. Instead of adding + "app.bin" at flash_addr it will add: + 1. app.bin at flash_addr # sizeof(app.bin) == self.part_size + 2. app.bin.1 at flash_addr + self.part_size + 3. app.bin.2 at flash_addr + 2 * self.part_size + ... + + """ + f_name = os.path.basename(path) + with open(path, 'rb') as f: + for i, chunk in enumerate(iter(partial(f.read, self.part_size), b'')): + n = f_name if i == 0 else '.'.join([f_name, str(i)]) + self._add_cpio_flash_entry(n, flash_addr, chunk) + flash_addr += len(chunk) def finish(self): # type: () -> None """ Write DFU file """ # Prepare and add dfuinfo0.dat file - dfuinfo = b"".join([struct.pack(DFUINFO_STRUCT, *item) for item in self.index]) + dfuinfo = b''.join([struct.pack(DFUINFO_STRUCT, *item) for item in self.index]) self._add_cpio_entry(DFUINFO_FILE, dfuinfo, first=True) # Add CPIO archive trailer - self._add_cpio_entry(CPIO_TRAILER, b"", trailer=True) + self._add_cpio_entry(CPIO_TRAILER, b'', trailer=True) # Combine all the entries and pad the file - out_data = b"".join(self.entries) + out_data = b''.join(self.entries) cpio_block_size = 10240 out_data = pad_bytes(out_data, cpio_block_size) # Add DFU suffix and CRC - out_data += struct.pack(DFUSUFFIX_STRUCT, *DFUSUFFIX_DEFAULT) + dfu_suffix = DFUSuffix(0xFFFF, self.pid, ESPRESSIF_VID, 0x0100, b'UFD', 16) + out_data += struct.pack(DFUSUFFIX_STRUCT, *dfu_suffix) out_data += struct.pack(DFUCRC_STRUCT, dfu_crc(out_data)) # Finally write the entire binary @@ -166,7 +185,7 @@ class EspDfuWriter(object): DFUInfo( address=flash_addr, flags=0, - name=filename.encode("utf-8"), + name=filename.encode('utf-8'), md5=md5.digest(), ) ) @@ -175,7 +194,7 @@ class EspDfuWriter(object): def _add_cpio_entry( self, filename, data, first=False, trailer=False ): # type: (str, bytes, bool, bool) -> None - filename_b = filename.encode("utf-8") + b"\x00" + filename_b = filename.encode('utf-8') + b'\x00' cpio_header = make_cpio_header(len(filename_b), len(data), is_trailer=trailer) entry = pad_bytes( struct.pack(CPIO_STRUCT, *cpio_header) + filename_b, 4 @@ -186,30 +205,41 @@ class EspDfuWriter(object): self.entries.insert(0, entry) -def action_write(args): - writer = EspDfuWriter(args['output_file']) +def action_write(args): # type: (typing.Mapping[str, typing.Any]) -> None + writer = EspDfuWriter(args['output_file'], args['pid'], args['part_size']) for addr, f in args['files']: print('Adding {} at {:#x}'.format(f, addr)) writer.add_file(addr, f) writer.finish() print('"{}" has been written. You may proceed with DFU flashing.'.format(args['output_file'].name)) + if args['part_size'] % (4 * 1024) != 0: + print('WARNING: Partition size of DFU is not multiple of 4k (4096). You might get unexpected behavior.') def main(): parser = argparse.ArgumentParser() # Provision to add "info" command - subparsers = parser.add_subparsers(dest="command") - write_parser = subparsers.add_parser("write") - write_parser.add_argument("-o", "--output-file", + subparsers = parser.add_subparsers(dest='command') + write_parser = subparsers.add_parser('write') + write_parser.add_argument('-o', '--output-file', help='Filename for storing the output DFU image', required=True, - type=argparse.FileType("wb")) - write_parser.add_argument("--json", + type=argparse.FileType('wb')) + write_parser.add_argument('--pid', + required=False, # This ESP-IDF release supports one compatible target only + default=2, # ESP32-S2 + type=lambda h: int(h, 16), + help='Hexa-decimal product indentificator') + write_parser.add_argument('--json', help='Optional file for loading "flash_files" dictionary with
items') - write_parser.add_argument("files", - metavar="
", help='Add at
', - nargs="*") + write_parser.add_argument('--part-size', + default=os.environ.get('ESP_DFU_PART_SIZE', 512 * 1024), + type=lambda x: int(x, 0), + help='Larger files are split-up into smaller partitions of this size') + write_parser.add_argument('files', + metavar='
', help='Add at
', + nargs='*') args = parser.parse_args() @@ -236,16 +266,18 @@ def main(): files += [(int(addr, 0), process_json_file(f_name)) for addr, f_name in iteritems(json.load(f)['flash_files'])] - files = sorted([(addr, f_name) for addr, f_name in iteritems(dict(files))], + files = sorted([(addr, f_name.decode('utf-8') if isinstance(f_name, type(b'')) else f_name) for addr, f_name in iteritems(dict(files))], key=lambda x: x[0]) # remove possible duplicates and sort based on the address cmd_args = {'output_file': args.output_file, 'files': files, + 'pid': args.pid, + 'part_size': args.part_size, } {'write': action_write }[args.command](cmd_args) -if __name__ == "__main__": +if __name__ == '__main__': main() diff --git a/tools/test_mkdfu/1/dfu.bin b/tools/test_mkdfu/1/dfu.bin index 74f9fef52c..cc28754f38 100644 Binary files a/tools/test_mkdfu/1/dfu.bin and b/tools/test_mkdfu/1/dfu.bin differ diff --git a/tools/test_mkdfu/1/flasher_args.json b/tools/test_mkdfu/1/flasher_args.json deleted file mode 100644 index 06a54d0cc7..0000000000 --- a/tools/test_mkdfu/1/flasher_args.json +++ /dev/null @@ -1,7 +0,0 @@ -{ - "flash_files" : { - "0x8000" : "2.bin", - "0x1000" : "1.bin", - "0x10000" : "3.bin" - } -} diff --git a/tools/test_mkdfu/2/dfu.bin b/tools/test_mkdfu/2/dfu.bin new file mode 100644 index 0000000000..31774a80cf Binary files /dev/null and b/tools/test_mkdfu/2/dfu.bin differ diff --git a/tools/test_mkdfu/test_mkdfu.py b/tools/test_mkdfu/test_mkdfu.py index 1ce60c227d..0646148518 100755 --- a/tools/test_mkdfu/test_mkdfu.py +++ b/tools/test_mkdfu/test_mkdfu.py @@ -1,7 +1,7 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # -# Copyright 2020 Espressif Systems (Shanghai) CO LTD +# Copyright 2020-2021 Espressif Systems (Shanghai) CO LTD # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -16,33 +16,49 @@ # limitations under the License. from __future__ import unicode_literals + +import collections import filecmp +import json import os -import pexpect import shutil import sys import tempfile import time import unittest +import pexpect + current_dir = os.path.dirname(os.path.realpath(__file__)) mkdfu_path = os.path.join(current_dir, '..', 'mkdfu.py') -class TestHelloWorldExample(unittest.TestCase): - def common_test(self, add_args): - with tempfile.NamedTemporaryFile(delete=False) as f: - self.addCleanup(os.unlink, f.name) - cmd = ' '.join([sys.executable, mkdfu_path, 'write', - '-o', f.name, - add_args]) - p = pexpect.spawn(cmd, timeout=10) +class TestMkDFU(unittest.TestCase): + def common_test(self, json_input=None, file_args=[], output_to_compare=None, part_size=None): + ''' + - json_input - input JSON file compatible with mkdfu.py - used when not None + - file_args - list of (address, path_to_file) tuples + - output_to_compare - path to the file containing the expected output - tested when not None + - part_size - partition size - used when not None + ''' + with tempfile.NamedTemporaryFile(delete=False) as f_out: + self.addCleanup(os.unlink, f_out.name) + args = [mkdfu_path, 'write', + '-o', f_out.name, + '--pid', '2'] + if part_size: + args += ['--part-size', str(part_size)] + if json_input: + args += ['--json', json_input] + for addr, f_path in file_args: + args += [str(addr), f_path] + p = pexpect.spawn(sys.executable, args, timeout=10, encoding='utf-8') self.addCleanup(p.terminate, force=True) - p.expect_exact(['Adding 1/bootloader.bin at 0x1000', - 'Adding 1/partition-table.bin at 0x8000', - 'Adding 1/hello-world.bin at 0x10000', - '"{}" has been written. You may proceed with DFU flashing.'.format(f.name)]) + for addr, f_path in sorted(file_args, key=lambda e: e[0]): + p.expect_exact('Adding {} at {}'.format(f_path, hex(addr))) + + p.expect_exact('"{}" has been written. You may proceed with DFU flashing.'.format(f_out.name)) # Need to wait for the process to end because the output file is closed when mkdfu exits. # Do non-blocking wait instead of the blocking p.wait(): @@ -53,25 +69,34 @@ class TestHelloWorldExample(unittest.TestCase): else: p.terminate() - self.assertTrue(filecmp.cmp(f.name, os.path.join(current_dir, '1','dfu.bin')), 'Output files are different') + if output_to_compare: + self.assertTrue(filecmp.cmp(f_out.name, os.path.join(current_dir, output_to_compare)), 'Output files are different') + +class TestHelloWorldExample(TestMkDFU): + ''' + tests with images prepared in the "1" subdirectory + ''' def test_with_json(self): - self.common_test(' '.join(['--json', os.path.join(current_dir, '1', 'flasher_args.json')])) + with tempfile.NamedTemporaryFile(mode='w', dir=os.path.join(current_dir, '1'), delete=False) as f: + self.addCleanup(os.unlink, f.name) + + bins = [('0x1000', '1.bin'), ('0x8000', '2.bin'), ('0x10000', '3.bin')] + json.dump({'flash_files': collections.OrderedDict(bins)}, f) + + self.common_test(json_input=f.name, output_to_compare='1/dfu.bin') def test_without_json(self): - self.common_test(' '.join(['0x1000', os.path.join(current_dir, '1', '1.bin'), - '0x8000', os.path.join(current_dir, '1', '2.bin'), - '0x10000', os.path.join(current_dir, '1', '3.bin') - ])) + self.common_test(file_args=[(0x1000, '1/1.bin'), + (0x8000, '1/2.bin'), + (0x10000, '1/3.bin')], + output_to_compare='1/dfu.bin') def test_filenames(self): temp_dir = tempfile.mkdtemp(prefix='very_long_directory_name' * 8) self.addCleanup(shutil.rmtree, temp_dir, ignore_errors=True) - with tempfile.NamedTemporaryFile(dir=temp_dir, delete=False) as f: - output = f.name - with tempfile.NamedTemporaryFile(prefix='ľščťžýáíéěř\u0420\u043e\u0441\u0441\u0438\u044f', dir=temp_dir, delete=False) as f: @@ -79,20 +104,30 @@ class TestHelloWorldExample(unittest.TestCase): shutil.copyfile(os.path.join(current_dir, '1', '1.bin'), bootloader) - cmd = ' '.join([sys.executable, mkdfu_path, 'write', - '-o', output, - ' '.join(['0x1000', bootloader, - '0x8000', os.path.join(current_dir, '1', '2.bin'), - '0x10000', os.path.join(current_dir, '1', '3.bin') - ]) - ]) - p = pexpect.spawn(cmd, timeout=10, encoding='utf-8') - self.addCleanup(p.terminate, force=True) + self.common_test(file_args=[(0x1000, bootloader), + (0x8000, os.path.join(current_dir, '1', '2.bin')), + (0x10000, os.path.join(current_dir, '1', '3.bin'))]) - p.expect_exact(['Adding {} at 0x1000'.format(bootloader), - 'Adding 1/2.bin at 0x8000', - 'Adding 1/3.bin at 0x10000', - '"{}" has been written. You may proceed with DFU flashing.'.format(output)]) + +class TestSplit(TestMkDFU): + ''' + tests with images prepared in the "2" subdirectory + + "2/dfu.bin" was prepared with: + mkdfu.py write --part-size 5 --pid 2 -o 2/dfu.bin 0 bin + where the content of "bin" is b"\xce" * 10 + ''' + def test_split(self): + temp_dir = tempfile.mkdtemp(dir=current_dir) + self.addCleanup(shutil.rmtree, temp_dir, ignore_errors=True) + + with open(os.path.join(temp_dir, 'bin'), 'wb') as f: + self.addCleanup(os.unlink, f.name) + f.write(b'\xce' * 10) + + self.common_test(file_args=[(0, f.name)], + part_size=5, + output_to_compare='2/dfu.bin') if __name__ == '__main__':