tools: Split up large binaries into smaller chunks in the DFU binary

ROM will erase the region a partition is in as soon as it receives the
first bit of the data that is in the partition. For large partitions it
takes more than 5 seconds to erase which is a hard-coded limit in
dfu-utils.

This splits large binaries and adds them by chunks which should avoid
timing-out during flashing.

Closes https://github.com/espressif/esp-idf/issues/6999
This commit is contained in:
Roland Dobai 2021-05-14 14:22:54 +02:00
parent 8d7599cc3d
commit 6b75bad2b1
5 changed files with 114 additions and 53 deletions

View File

@ -6,7 +6,11 @@ def action_extensions(base_actions, project_path):
SUPPORTED_TARGETS = ['esp32s2'] SUPPORTED_TARGETS = ['esp32s2']
def dfu_target(target_name, ctx, args): def dfu_target(target_name, ctx, args, part_size):
ensure_build_directory(args, ctx.info_name)
run_target(target_name, args, {'ESP_DFU_PART_SIZE': part_size} if part_size else {})
def dfu_list_target(target_name, ctx, args):
ensure_build_directory(args, ctx.info_name) ensure_build_directory(args, ctx.info_name)
run_target(target_name, args) run_target(target_name, args)
@ -27,9 +31,17 @@ def action_extensions(base_actions, project_path):
'callback': dfu_target, 'callback': dfu_target,
'short_help': 'Build the DFU binary', 'short_help': 'Build the DFU binary',
'dependencies': ['all'], 'dependencies': ['all'],
'options': [
{
'names': ['--part-size'],
'help': 'Large files are split up into smaller partitions in order to avoid timeout during '
'erasing flash. This option allows to overwrite the default partition size of '
'mkdfu.py.'
}
],
}, },
'dfu-list': { 'dfu-list': {
'callback': dfu_target, 'callback': dfu_list_target,
'short_help': 'List DFU capable devices', 'short_help': 'List DFU capable devices',
'dependencies': [], 'dependencies': [],
}, },
@ -42,7 +54,7 @@ def action_extensions(base_actions, project_path):
'names': ['--path'], 'names': ['--path'],
'default': '', 'default': '',
'help': 'Specify path to DFU device. The default empty path works if there is just one ' 'help': 'Specify path to DFU device. The default empty path works if there is just one '
'ESP device with the same product identificator. See the device list for paths ' 'ESP device with the same product identifier. See the device list for paths '
'of available devices.' 'of available devices.'
} }
], ],

View File

@ -1,6 +1,6 @@
#!/usr/bin/env python #!/usr/bin/env python
# #
# Copyright 2020 Espressif Systems (Shanghai) PTE LTD # Copyright 2020-2021 Espressif Systems (Shanghai) CO LTD
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
@ -21,6 +21,8 @@
# This file must be the first one in the archive. It contains binary structures describing each # This file must be the first one in the archive. It contains binary structures describing each
# subsequent file (for example, where the file needs to be flashed/loaded). # subsequent file (for example, where the file needs to be flashed/loaded).
from __future__ import print_function, unicode_literals
import argparse import argparse
import hashlib import hashlib
import json import json
@ -28,6 +30,7 @@ import os
import struct import struct
import zlib import zlib
from collections import namedtuple from collections import namedtuple
from functools import partial
from future.utils import iteritems from future.utils import iteritems
@ -125,16 +128,31 @@ def pad_bytes(b, multiple, padding=b'\x00'): # type: (bytes, int, bytes) -> byt
class EspDfuWriter(object): class EspDfuWriter(object):
def __init__(self, dest_file, pid): # type: (typing.BinaryIO, int) -> None def __init__(self, dest_file, pid, part_size): # type: (typing.BinaryIO, int, int) -> None
self.dest = dest_file self.dest = dest_file
self.pid = pid self.pid = pid
self.part_size = part_size
self.entries = [] # type: typing.List[bytes] self.entries = [] # type: typing.List[bytes]
self.index = [] # type: typing.List[DFUInfo] self.index = [] # type: typing.List[DFUInfo]
def add_file(self, flash_addr, path): # type: (int, str) -> None def add_file(self, flash_addr, path): # type: (int, str) -> None
""" Add file to be written into flash at given address """ """
Add file to be written into flash at given address
Files are split up into chunks in order avoid timing-out during erasing large regions. Instead of adding
"app.bin" at flash_addr it will add:
1. app.bin at flash_addr # sizeof(app.bin) == self.part_size
2. app.bin.1 at flash_addr + self.part_size
3. app.bin.2 at flash_addr + 2 * self.part_size
...
"""
f_name = os.path.basename(path)
with open(path, 'rb') as f: with open(path, 'rb') as f:
self._add_cpio_flash_entry(os.path.basename(path), flash_addr, f.read()) for i, chunk in enumerate(iter(partial(f.read, self.part_size), b'')):
n = f_name if i == 0 else '.'.join([f_name, str(i)])
self._add_cpio_flash_entry(n, flash_addr, chunk)
flash_addr += len(chunk)
def finish(self): # type: () -> None def finish(self): # type: () -> None
""" Write DFU file """ """ Write DFU file """
@ -188,12 +206,14 @@ class EspDfuWriter(object):
def action_write(args): # type: (typing.Mapping[str, typing.Any]) -> None def action_write(args): # type: (typing.Mapping[str, typing.Any]) -> None
writer = EspDfuWriter(args['output_file'], args['pid']) writer = EspDfuWriter(args['output_file'], args['pid'], args['part_size'])
for addr, f in args['files']: for addr, f in args['files']:
print('Adding {} at {:#x}'.format(f, addr)) print('Adding {} at {:#x}'.format(f, addr))
writer.add_file(addr, f) writer.add_file(addr, f)
writer.finish() writer.finish()
print('"{}" has been written. You may proceed with DFU flashing.'.format(args['output_file'].name)) print('"{}" has been written. You may proceed with DFU flashing.'.format(args['output_file'].name))
if args['part_size'] % (4 * 1024) != 0:
print('WARNING: Partition size of DFU is not multiple of 4k (4096). You might get unexpected behavior.')
def main(): # type: () -> None def main(): # type: () -> None
@ -212,6 +232,10 @@ def main(): # type: () -> None
help='Hexa-decimal product indentificator') help='Hexa-decimal product indentificator')
write_parser.add_argument('--json', write_parser.add_argument('--json',
help='Optional file for loading "flash_files" dictionary with <address> <file> items') help='Optional file for loading "flash_files" dictionary with <address> <file> items')
write_parser.add_argument('--part-size',
default=os.environ.get('ESP_DFU_PART_SIZE', 512 * 1024),
type=lambda x: int(x, 0),
help='Larger files are split-up into smaller partitions of this size')
write_parser.add_argument('files', write_parser.add_argument('files',
metavar='<address> <file>', help='Add <file> at <address>', metavar='<address> <file>', help='Add <file> at <address>',
nargs='*') nargs='*')
@ -241,12 +265,13 @@ def main(): # type: () -> None
files += [(int(addr, 0), files += [(int(addr, 0),
process_json_file(f_name)) for addr, f_name in iteritems(json.load(f)['flash_files'])] process_json_file(f_name)) for addr, f_name in iteritems(json.load(f)['flash_files'])]
files = sorted([(addr, f_name) for addr, f_name in iteritems(dict(files))], files = sorted([(addr, f_name.decode('utf-8') if isinstance(f_name, type(b'')) else f_name) for addr, f_name in iteritems(dict(files))],
key=lambda x: x[0]) # remove possible duplicates and sort based on the address key=lambda x: x[0]) # remove possible duplicates and sort based on the address
cmd_args = {'output_file': args.output_file, cmd_args = {'output_file': args.output_file,
'files': files, 'files': files,
'pid': args.pid, 'pid': args.pid,
'part_size': args.part_size,
} }
{'write': action_write {'write': action_write

View File

@ -1,7 +0,0 @@
{
"flash_files" : {
"0x8000" : "2.bin",
"0x1000" : "1.bin",
"0x10000" : "3.bin"
}
}

BIN
tools/test_mkdfu/2/dfu.bin Normal file

Binary file not shown.

View File

@ -1,7 +1,7 @@
#!/usr/bin/env python #!/usr/bin/env python
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# #
# Copyright 2020 Espressif Systems (Shanghai) CO LTD # Copyright 2020-2021 Espressif Systems (Shanghai) CO LTD
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
@ -17,7 +17,9 @@
from __future__ import unicode_literals from __future__ import unicode_literals
import collections
import filecmp import filecmp
import json
import os import os
import shutil import shutil
import sys import sys
@ -31,21 +33,32 @@ current_dir = os.path.dirname(os.path.realpath(__file__))
mkdfu_path = os.path.join(current_dir, '..', 'mkdfu.py') mkdfu_path = os.path.join(current_dir, '..', 'mkdfu.py')
class TestHelloWorldExample(unittest.TestCase): class TestMkDFU(unittest.TestCase):
def common_test(self, add_args): def common_test(self, json_input=None, file_args=[], output_to_compare=None, part_size=None):
with tempfile.NamedTemporaryFile(delete=False) as f: '''
self.addCleanup(os.unlink, f.name) - json_input - input JSON file compatible with mkdfu.py - used when not None
cmd = ' '.join([sys.executable, mkdfu_path, 'write', - file_args - list of (address, path_to_file) tuples
'-o', f.name, - output_to_compare - path to the file containing the expected output - tested when not None
'--pid', '2', - part_size - partition size - used when not None
add_args]) '''
p = pexpect.spawn(cmd, timeout=10) with tempfile.NamedTemporaryFile(delete=False) as f_out:
self.addCleanup(os.unlink, f_out.name)
args = [mkdfu_path, 'write',
'-o', f_out.name,
'--pid', '2']
if part_size:
args += ['--part-size', str(part_size)]
if json_input:
args += ['--json', json_input]
for addr, f_path in file_args:
args += [str(addr), f_path]
p = pexpect.spawn(sys.executable, args, timeout=10, encoding='utf-8')
self.addCleanup(p.terminate, force=True) self.addCleanup(p.terminate, force=True)
p.expect_exact(['Adding 1/bootloader.bin at 0x1000', for addr, f_path in sorted(file_args, key=lambda e: e[0]):
'Adding 1/partition-table.bin at 0x8000', p.expect_exact('Adding {} at {}'.format(f_path, hex(addr)))
'Adding 1/hello-world.bin at 0x10000',
'"{}" has been written. You may proceed with DFU flashing.'.format(f.name)]) p.expect_exact('"{}" has been written. You may proceed with DFU flashing.'.format(f_out.name))
# Need to wait for the process to end because the output file is closed when mkdfu exits. # Need to wait for the process to end because the output file is closed when mkdfu exits.
# Do non-blocking wait instead of the blocking p.wait(): # Do non-blocking wait instead of the blocking p.wait():
@ -56,25 +69,34 @@ class TestHelloWorldExample(unittest.TestCase):
else: else:
p.terminate() p.terminate()
self.assertTrue(filecmp.cmp(f.name, os.path.join(current_dir, '1','dfu.bin')), 'Output files are different') if output_to_compare:
self.assertTrue(filecmp.cmp(f_out.name, os.path.join(current_dir, output_to_compare)), 'Output files are different')
class TestHelloWorldExample(TestMkDFU):
'''
tests with images prepared in the "1" subdirectory
'''
def test_with_json(self): def test_with_json(self):
self.common_test(' '.join(['--json', os.path.join(current_dir, '1', 'flasher_args.json')])) with tempfile.NamedTemporaryFile(mode='w', dir=os.path.join(current_dir, '1'), delete=False) as f:
self.addCleanup(os.unlink, f.name)
bins = [('0x1000', '1.bin'), ('0x8000', '2.bin'), ('0x10000', '3.bin')]
json.dump({'flash_files': collections.OrderedDict(bins)}, f)
self.common_test(json_input=f.name, output_to_compare='1/dfu.bin')
def test_without_json(self): def test_without_json(self):
self.common_test(' '.join(['0x1000', os.path.join(current_dir, '1', '1.bin'), self.common_test(file_args=[(0x1000, '1/1.bin'),
'0x8000', os.path.join(current_dir, '1', '2.bin'), (0x8000, '1/2.bin'),
'0x10000', os.path.join(current_dir, '1', '3.bin') (0x10000, '1/3.bin')],
])) output_to_compare='1/dfu.bin')
def test_filenames(self): def test_filenames(self):
temp_dir = tempfile.mkdtemp(prefix='very_long_directory_name' * 8) temp_dir = tempfile.mkdtemp(prefix='very_long_directory_name' * 8)
self.addCleanup(shutil.rmtree, temp_dir, ignore_errors=True) self.addCleanup(shutil.rmtree, temp_dir, ignore_errors=True)
with tempfile.NamedTemporaryFile(dir=temp_dir, delete=False) as f:
output = f.name
with tempfile.NamedTemporaryFile(prefix='ľščťžýáíéěř\u0420\u043e\u0441\u0441\u0438\u044f', with tempfile.NamedTemporaryFile(prefix='ľščťžýáíéěř\u0420\u043e\u0441\u0441\u0438\u044f',
dir=temp_dir, dir=temp_dir,
delete=False) as f: delete=False) as f:
@ -82,21 +104,30 @@ class TestHelloWorldExample(unittest.TestCase):
shutil.copyfile(os.path.join(current_dir, '1', '1.bin'), bootloader) shutil.copyfile(os.path.join(current_dir, '1', '1.bin'), bootloader)
cmd = ' '.join([sys.executable, mkdfu_path, 'write', self.common_test(file_args=[(0x1000, bootloader),
'-o', output, (0x8000, os.path.join(current_dir, '1', '2.bin')),
'--pid', '2', (0x10000, os.path.join(current_dir, '1', '3.bin'))])
' '.join(['0x1000', bootloader,
'0x8000', os.path.join(current_dir, '1', '2.bin'),
'0x10000', os.path.join(current_dir, '1', '3.bin')
])
])
p = pexpect.spawn(cmd, timeout=10, encoding='utf-8')
self.addCleanup(p.terminate, force=True)
p.expect_exact(['Adding {} at 0x1000'.format(bootloader),
'Adding 1/2.bin at 0x8000', class TestSplit(TestMkDFU):
'Adding 1/3.bin at 0x10000', '''
'"{}" has been written. You may proceed with DFU flashing.'.format(output)]) tests with images prepared in the "2" subdirectory
"2/dfu.bin" was prepared with:
mkdfu.py write --part-size 5 --pid 2 -o 2/dfu.bin 0 bin
where the content of "bin" is b"\xce" * 10
'''
def test_split(self):
temp_dir = tempfile.mkdtemp(dir=current_dir)
self.addCleanup(shutil.rmtree, temp_dir, ignore_errors=True)
with open(os.path.join(temp_dir, 'bin'), 'wb') as f:
self.addCleanup(os.unlink, f.name)
f.write(b'\xce' * 10)
self.common_test(file_args=[(0, f.name)],
part_size=5,
output_to_compare='2/dfu.bin')
if __name__ == '__main__': if __name__ == '__main__':