From 1de627e68aee139255689c6ce16d8d26d9c9bd4b Mon Sep 17 00:00:00 2001 From: Renz Christian Bagaporo Date: Mon, 27 May 2019 11:08:28 +0800 Subject: [PATCH] app_update: implement Python API for otatool --- components/app_update/CMakeLists.txt | 6 +- components/app_update/Makefile.projbuild | 3 +- components/app_update/otatool.py | 327 ++++++++++++----------- docs/en/api-reference/system/ota.rst | 98 +++++++ 4 files changed, 277 insertions(+), 157 deletions(-) diff --git a/components/app_update/CMakeLists.txt b/components/app_update/CMakeLists.txt index 2c705d5091..e869c188fb 100644 --- a/components/app_update/CMakeLists.txt +++ b/components/app_update/CMakeLists.txt @@ -33,10 +33,8 @@ if(NOT BOOTLOADER_BUILD) idf_build_get_property(idf_path IDF_PATH) idf_build_get_property(python PYTHON) add_custom_command(OUTPUT ${blank_otadata_file} - COMMAND ${python} ${idf_path}/components/partition_table/parttool.py - --partition-type data --partition-subtype ota -q - --partition-table-file ${PARTITION_CSV_PATH} generate_blank_partition_file - --output ${blank_otadata_file}) + COMMAND ${python} ${idf_path}/components/partition_table/gen_empty_partition.py + ${otadata_size} ${blank_otadata_file}) add_custom_target(blank_ota_data ALL DEPENDS ${blank_otadata_file}) add_dependencies(app blank_ota_data) diff --git a/components/app_update/Makefile.projbuild b/components/app_update/Makefile.projbuild index 435d7535ac..a20856a6ef 100644 --- a/components/app_update/Makefile.projbuild +++ b/components/app_update/Makefile.projbuild @@ -17,8 +17,7 @@ endif $(BLANK_OTA_DATA_FILE): partition_table_get_info $(PARTITION_TABLE_CSV_PATH) | check_python_dependencies $(shell if [ "$(OTA_DATA_OFFSET)" != "" ] && [ "$(OTA_DATA_SIZE)" != "" ]; then \ - $(PARTTOOL_PY) --partition-type data --partition-subtype ota --partition-table-file $(PARTITION_TABLE_CSV_PATH) \ - -q generate_blank_partition_file --output $(BLANK_OTA_DATA_FILE); \ + $(PYTHON) $(IDF_PATH)/components/partition_table/gen_empty_partition.py $(OTA_DATA_SIZE) $(BLANK_OTA_DATA_FILE); \ fi; ) $(eval BLANK_OTA_DATA_FILE = $(shell if [ "$(OTA_DATA_OFFSET)" != "" ] && [ "$(OTA_DATA_SIZE)" != "" ]; then \ echo $(BLANK_OTA_DATA_FILE); else echo " "; fi) ) diff --git a/components/app_update/otatool.py b/components/app_update/otatool.py index aead479d92..2cfecde18b 100755 --- a/components/app_update/otatool.py +++ b/components/app_update/otatool.py @@ -21,16 +21,20 @@ import argparse import os import sys import binascii -import subprocess import tempfile import collections import struct -__version__ = '1.0' +try: + from parttool import PartitionName, PartitionType, ParttoolTarget, PARTITION_TABLE_OFFSET +except ImportError: + COMPONENTS_PATH = os.path.expandvars(os.path.join("$IDF_PATH", "components")) + PARTTOOL_DIR = os.path.join(COMPONENTS_PATH, "partition_table") -IDF_COMPONENTS_PATH = os.path.expandvars(os.path.join("$IDF_PATH", "components")) + sys.path.append(PARTTOOL_DIR) + from parttool import PartitionName, PartitionType, ParttoolTarget, PARTITION_TABLE_OFFSET -PARTTOOL_PY = os.path.join(IDF_COMPONENTS_PATH, "partition_table", "parttool.py") +__version__ = '2.0' SPI_FLASH_SEC_SIZE = 0x2000 @@ -42,121 +46,69 @@ def status(msg): print(msg) -def _invoke_parttool(parttool_args, args, output=False, partition=None): - invoke_args = [] +class OtatoolTarget(): - if partition: - invoke_args += [sys.executable, PARTTOOL_PY] + partition - else: - invoke_args += [sys.executable, PARTTOOL_PY, "--partition-type", "data", "--partition-subtype", "ota"] + OTADATA_PARTITION = PartitionType("data", "ota") - if quiet: - invoke_args += ["-q"] + def __init__(self, port=None, partition_table_offset=PARTITION_TABLE_OFFSET, partition_table_file=None, spi_flash_sec_size=SPI_FLASH_SEC_SIZE): + self.target = ParttoolTarget(port, partition_table_offset, partition_table_file) + self.spi_flash_sec_size = spi_flash_sec_size - if args.port != "": - invoke_args += ["--port", args.port] + temp_file = tempfile.NamedTemporaryFile(delete=False) + temp_file.close() + try: + self.target.read_partition(OtatoolTarget.OTADATA_PARTITION, temp_file.name) + with open(temp_file.name, "rb") as f: + self.otadata = f.read() + except Exception: + self.otadata = None + finally: + os.unlink(temp_file.name) - if args.partition_table_file: - invoke_args += ["--partition-table-file", args.partition_table_file] + def _check_otadata_partition(self): + if not self.otadata: + raise Exception("No otadata partition found") - if args.partition_table_offset: - invoke_args += ["--partition-table-offset", args.partition_table_offset] + def erase_otadata(self): + self._check_otadata_partition() + self.target.erase_partition(OtatoolTarget.OTADATA_PARTITION) - invoke_args += parttool_args + def _get_otadata_info(self): + info = [] - if output: - return subprocess.check_output(invoke_args) - else: - return subprocess.check_call(invoke_args) + otadata_info = collections.namedtuple("otadata_info", "seq crc") + for i in range(2): + start = i * (self.spi_flash_sec_size >> 1) -def _get_otadata_contents(args, check=True): - global quiet + seq = bytearray(self.otadata[start:start + 4]) + crc = bytearray(self.otadata[start + 28:start + 32]) - if check: - check_args = ["get_partition_info", "--info", "offset", "size"] + seq = struct.unpack('>I', seq) + crc = struct.unpack('>I', crc) - quiet = True - output = _invoke_parttool(check_args, args, True).split(b" ") - quiet = args.quiet + info.append(otadata_info(seq[0], crc[0])) - if not output: - raise RuntimeError("No ota_data partition found") + return info - with tempfile.NamedTemporaryFile(delete=False) as f: - f_name = f.name + def _get_partition_id_from_ota_id(self, ota_id): + if isinstance(ota_id, int): + return PartitionType("app", "ota_" + str(ota_id)) + else: + return PartitionName(ota_id) - try: - invoke_args = ["read_partition", "--output", f_name] - _invoke_parttool(invoke_args, args) - with open(f_name, "rb") as f: - contents = f.read() - finally: - os.unlink(f_name) + def switch_ota_partition(self, ota_id): + self._check_otadata_partition() - return contents + sys.path.append(PARTTOOL_DIR) + import gen_esp32part as gen - -def _get_otadata_status(otadata_contents): - status = [] - - otadata_status = collections.namedtuple("otadata_status", "seq crc") - - for i in range(2): - start = i * (SPI_FLASH_SEC_SIZE >> 1) - - seq = bytearray(otadata_contents[start:start + 4]) - crc = bytearray(otadata_contents[start + 28:start + 32]) - - seq = struct.unpack('>I', seq) - crc = struct.unpack('>I', crc) - - status.append(otadata_status(seq[0], crc[0])) - - return status - - -def read_otadata(args): - status("Reading ota_data partition contents...") - otadata_info = _get_otadata_contents(args) - otadata_info = _get_otadata_status(otadata_info) - - print(otadata_info) - - print("\t\t{:11}\t{:8s}|\t{:8s}\t{:8s}".format("OTA_SEQ", "CRC", "OTA_SEQ", "CRC")) - print("Firmware: 0x{:8x} \t 0x{:8x} |\t0x{:8x} \t 0x{:8x}".format(otadata_info[0].seq, otadata_info[0].crc, - otadata_info[1].seq, otadata_info[1].crc)) - - -def erase_otadata(args): - status("Erasing ota_data partition contents...") - _invoke_parttool(["erase_partition"], args) - status("Erased ota_data partition contents") - - -def switch_otadata(args): - sys.path.append(os.path.join(IDF_COMPONENTS_PATH, "partition_table")) - import gen_esp32part as gen - - with tempfile.NamedTemporaryFile(delete=False) as f: - f_name = f.name - - try: - def is_otadata_status_valid(status): + def is_otadata_info_valid(status): seq = status.seq % (1 << 32) crc = hex(binascii.crc32(struct.pack("I", seq), 0xFFFFFFFF) % (1 << 32)) return seq < (int('0xFFFFFFFF', 16) % (1 << 32)) and status.crc == crc - status("Looking for ota app partitions...") - - # In order to get the number of ota app partitions, we need the partition table - partition_table = None - invoke_args = ["get_partition_info", "--table", f_name] - - _invoke_parttool(invoke_args, args) - - partition_table = open(f_name, "rb").read() - partition_table = gen.PartitionTable.from_binary(partition_table) + partition_table = self.target.partition_table ota_partitions = list() @@ -171,39 +123,36 @@ def switch_otadata(args): ota_partitions = sorted(ota_partitions, key=lambda p: p.subtype) if not ota_partitions: - raise RuntimeError("No ota app partitions found") - - status("Verifying partition to switch to exists...") + raise Exception("No ota app partitions found") # Look for the app partition to switch to ota_partition_next = None try: - if args.name: - ota_partition_next = filter(lambda p: p.name == args.name, ota_partitions) + if isinstance(ota_id, int): + ota_partition_next = filter(lambda p: p.subtype - gen.MIN_PARTITION_SUBTYPE_APP_OTA == ota_id, ota_partitions) else: - ota_partition_next = filter(lambda p: p.subtype - gen.MIN_PARTITION_SUBTYPE_APP_OTA == args.slot, ota_partitions) + ota_partition_next = filter(lambda p: p.name == ota_id, ota_partitions) ota_partition_next = list(ota_partition_next)[0] except IndexError: - raise RuntimeError("Partition to switch to not found") + raise Exception("Partition to switch to not found") - otadata_contents = _get_otadata_contents(args) - otadata_status = _get_otadata_status(otadata_contents) + otadata_info = self._get_otadata_info() # Find the copy to base the computation for ota sequence number on otadata_compute_base = -1 # Both are valid, take the max as computation base - if is_otadata_status_valid(otadata_status[0]) and is_otadata_status_valid(otadata_status[1]): - if otadata_status[0].seq >= otadata_status[1].seq: + if is_otadata_info_valid(otadata_info[0]) and is_otadata_info_valid(otadata_info[1]): + if otadata_info[0].seq >= otadata_info[1].seq: otadata_compute_base = 0 else: otadata_compute_base = 1 # Only one copy is valid, use that - elif is_otadata_status_valid(otadata_status[0]): + elif is_otadata_info_valid(otadata_info[0]): otadata_compute_base = 0 - elif is_otadata_status_valid(otadata_status[1]): + elif is_otadata_info_valid(otadata_info[1]): otadata_compute_base = 1 # Both are invalid (could be initial state - all 0xFF's) else: @@ -216,7 +165,7 @@ def switch_otadata(args): # Find the next ota sequence number if otadata_compute_base == 0 or otadata_compute_base == 1: - base_seq = otadata_status[otadata_compute_base].seq % (1 << 32) + base_seq = otadata_info[otadata_compute_base].seq % (1 << 32) i = 0 while base_seq > target_seq % ota_partitions_num + i * ota_partitions_num: @@ -231,47 +180,68 @@ def switch_otadata(args): ota_seq_crc_next = binascii.crc32(ota_seq_next, 0xFFFFFFFF) % (1 << 32) ota_seq_crc_next = struct.pack("I", ota_seq_crc_next) - with open(f_name, "wb") as otadata_next_file: - start = (1 if otadata_compute_base == 0 else 0) * (SPI_FLASH_SEC_SIZE >> 1) + temp_file = tempfile.NamedTemporaryFile(delete=False) + temp_file.close() - otadata_next_file.write(otadata_contents) + try: + with open(temp_file.name, "wb") as otadata_next_file: + start = (1 if otadata_compute_base == 0 else 0) * (self.spi_flash_sec_size >> 1) - otadata_next_file.seek(start) - otadata_next_file.write(ota_seq_next) + otadata_next_file.write(self.otadata) - otadata_next_file.seek(start + 28) - otadata_next_file.write(ota_seq_crc_next) + otadata_next_file.seek(start) + otadata_next_file.write(ota_seq_next) - otadata_next_file.flush() + otadata_next_file.seek(start + 28) + otadata_next_file.write(ota_seq_crc_next) - _invoke_parttool(["write_partition", "--input", f_name], args) - status("Updated ota_data partition") - finally: - os.unlink(f_name) + otadata_next_file.flush() + + self.target.write_partition(OtatoolTarget.OTADATA_PARTITION, temp_file.name) + finally: + os.unlink(temp_file.name) + + def read_ota_partition(self, ota_id, output): + self.target.read_partition(self._get_partition_id_from_ota_id(ota_id), output) + + def write_ota_partition(self, ota_id, input): + self.target.write_partition(self._get_partition_id_from_ota_id(ota_id), input) + + def erase_ota_partition(self, ota_id): + self.target.erase_partition(self._get_partition_id_from_ota_id(ota_id)) -def _get_partition_specifier(args): - if args.name: - return ["--partition-name", args.name] - else: - return ["--partition-type", "app", "--partition-subtype", "ota_" + str(args.slot)] +def _read_otadata(target): + target._check_otadata_partition() + + otadata_info = target._get_otadata_info(target.otadata) + + print("\t\t{:11}\t{:8s}|\t{:8s}\t{:8s}".format("OTA_SEQ", "CRC", "OTA_SEQ", "CRC")) + print("Firmware: 0x{:8x} \t 0x{:8x} |\t0x{:8x} \t 0x{:8x}".format(otadata_info[0].seq, otadata_info[0].crc, + otadata_info[1].seq, otadata_info[1].crc)) -def read_ota_partition(args): - invoke_args = ["read_partition", "--output", args.output] - _invoke_parttool(invoke_args, args, partition=_get_partition_specifier(args)) - status("Read ota partition contents to file {}".format(args.output)) +def _erase_otadata(target): + target.erase_otadata() + status("Erased ota_data partition contents") -def write_ota_partition(args): - invoke_args = ["write_partition", "--input", args.input] - _invoke_parttool(invoke_args, args, partition=_get_partition_specifier(args)) - status("Written contents of file {} to ota partition".format(args.input)) +def _switch_ota_partition(target, ota_id): + target.switch_ota_partition(ota_id) -def erase_ota_partition(args): - invoke_args = ["erase_partition"] - _invoke_parttool(invoke_args, args, partition=_get_partition_specifier(args)) +def _read_ota_partition(target, ota_id, output): + target.read_ota_partition(ota_id, output) + status("Read ota partition contents to file {}".format(output)) + + +def _write_ota_partition(target, ota_id, input): + target.write_ota_partition(ota_id, input) + status("Written contents of file {} to ota partition".format(input)) + + +def _erase_ota_partition(target, ota_id): + target.erase_ota_partition(ota_id) status("Erased contents of ota partition") @@ -284,17 +254,20 @@ def main(): # There are two possible sources for the partition table: a device attached to the host # or a partition table CSV/binary file. These sources are mutually exclusive. - partition_table_info_source_args = parser.add_mutually_exclusive_group() + parser.add_argument("--port", "-p", help="port where the device to read the partition table from is attached") - partition_table_info_source_args.add_argument("--port", "-p", help="port where the device to read the partition table from is attached", default="") - partition_table_info_source_args.add_argument("--partition-table-file", "-f", help="file (CSV/binary) to read the partition table from", default="") + parser.add_argument("--partition-table-offset", "-o", help="offset to read the partition table from", type=str) - parser.add_argument("--partition-table-offset", "-o", help="offset to read the partition table from", default="0x8000") + parser.add_argument("--partition-table-file", "-f", help="file (CSV/binary) to read the partition table from; \ + overrides device attached to specified port as the partition table source when defined") subparsers = parser.add_subparsers(dest="operation", help="run otatool -h for additional help") + spi_flash_sec_size = argparse.ArgumentParser(add_help=False) + spi_flash_sec_size.add_argument("--spi-flash-sec-size", help="value of SPI_FLASH_SEC_SIZE macro", type=str) + # Specify the supported operations - subparsers.add_parser("read_otadata", help="read otadata partition") + subparsers.add_parser("read_otadata", help="read otadata partition", parents=[spi_flash_sec_size]) subparsers.add_parser("erase_otadata", help="erase otadata partition") slot_or_name_parser = argparse.ArgumentParser(add_help=False) @@ -302,7 +275,7 @@ def main(): slot_or_name_parser_args.add_argument("--slot", help="slot number of the ota partition", type=int) slot_or_name_parser_args.add_argument("--name", help="name of the ota partition") - subparsers.add_parser("switch_otadata", help="switch otadata partition", parents=[slot_or_name_parser]) + subparsers.add_parser("switch_ota_partition", help="switch otadata partition", parents=[slot_or_name_parser, spi_flash_sec_size]) read_ota_partition_subparser = subparsers.add_parser("read_ota_partition", help="read contents of an ota partition", parents=[slot_or_name_parser]) read_ota_partition_subparser.add_argument("--output", help="file to write the contents of the ota partition to") @@ -322,17 +295,69 @@ def main(): parser.print_help() sys.exit(1) - # Else execute the operation - operation_func = globals()[args.operation] + target_args = {} + + if args.port: + target_args["port"] = args.port + + if args.partition_table_file: + target_args["partition_table_file"] = args.partition_table_file + + if args.partition_table_offset: + target_args["partition_table_offset"] = int(args.partition_table_offset, 0) + + try: + if args.spi_flash_sec_size: + target_args["spi_flash_sec_size"] = int(args.spi_flash_sec_size, 0) + except AttributeError: + pass + + target = OtatoolTarget(**target_args) + + # Create the operation table and execute the operation + common_args = {'target':target} + + ota_id = [] + + try: + if args.name is not None: + ota_id = ["name"] + else: + if args.slot is not None: + ota_id = ["slot"] + except AttributeError: + pass + + otatool_ops = { + 'read_otadata':(_read_otadata, []), + 'erase_otadata':(_erase_otadata, []), + 'switch_ota_partition':(_switch_ota_partition, ota_id), + 'read_ota_partition':(_read_ota_partition, ["output"] + ota_id), + 'write_ota_partition':(_write_ota_partition, ["input"] + ota_id), + 'erase_ota_partition':(_erase_ota_partition, ota_id) + } + + (op, op_args) = otatool_ops[args.operation] + + for op_arg in op_args: + common_args.update({op_arg:vars(args)[op_arg]}) + + try: + common_args['ota_id'] = common_args.pop('name') + except KeyError: + try: + common_args['ota_id'] = common_args.pop('slot') + except KeyError: + pass if quiet: # If exceptions occur, suppress and exit quietly try: - operation_func(args) + op(**common_args) except Exception: sys.exit(2) else: - operation_func(args) + op(**common_args) if __name__ == '__main__': diff --git a/docs/en/api-reference/system/ota.rst b/docs/en/api-reference/system/ota.rst index 50938a3776..d83a38dab3 100644 --- a/docs/en/api-reference/system/ota.rst +++ b/docs/en/api-reference/system/ota.rst @@ -199,6 +199,104 @@ Secure OTA Updates Without Secure boot The verification of signed OTA updates can be performed even without enabling hardware secure boot. For doing so, refer :ref:`signed-app-verify` + +OTA Tool (otatool.py) +--------------------- + +The component `app_update` provides a tool :component_file:`otatool.py` for performing OTA partition-related operations on a target device. The following operations can be performed using the tool: + + - read contents of otadata partition (read_otadata) + - erase otadata partition, effectively resetting device to factory app (erase_otadata) + - switch OTA partitions (switch_ota_partition) + - erasing OTA partition (erase_ota_partition) + - write to OTA partition (write_ota_partition) + - read contents of OTA partition (read_ota_partition) + +The tool can either be imported and used from another Python script or invoked from shell script for users wanting to perform operation programmatically. This is facilitated by the tool's Python API +and command-line interface, respectively. + +Python API +^^^^^^^^^^ + +Before anything else, make sure that the `otatool` module is imported. + +.. code-block:: python + + import sys + import os + + idf_path = os.environ["IDF_PATH"] # get value of IDF_PATH from environment + otatool_dir = os.path.join(idf_path, "components", "app_update") # otatool.py lives in $IDF_PATH/components/app_update + + sys.path.append(otatool_dir) # this enables Python to find otatool module + from otatool import * # import all names inside otatool module + +The starting point for using the tool's Python API to do is create a `OtatoolTarget` object: + +.. code-block:: python + + # Create a partool.py target device connected on serial port /dev/ttyUSB1 + target = OtatoolTarget("/dev/ttyUSB1") + +The created object can now be used to perform operations on the target device: + +.. code-block:: python + + # Erase otadata, reseting the device to factory app + target.erase_otadata() + + # Erase contents of OTA app slot 0 + target.erase_ota_partition(0) + + # Switch boot partition to that of app slot 1 + target.switch_ota_partition(1) + + # Read OTA partition 'ota_3' and save contents to a file named 'ota_3.bin' + target.read_ota_partition("ota_3", "ota_3.bin") + +The OTA partition to operate on is specified using either the app slot number or the partition name. + +More information on the Python API is available in the docstrings for the tool. + +Command-line Interface +^^^^^^^^^^^^^^^^^^^^^^ + +The command-line interface of `otatool.py` has the following structure: + +.. code-block:: bash + + otatool.py [command-args] [subcommand] [subcommand-args] + + - command-args - these are arguments that are needed for executing the main command (parttool.py), mostly pertaining to the target device + - subcommand - this is the operation to be performed + - subcommand-args - these are arguments that are specific to the chosen operation + +.. code-block:: bash + + # Erase otadata, resetting the device to factory app + otatool.py --port "/dev/ttyUSB1" erase_otadata + + # Erase contents of OTA app slot 0 + otatool.py --port "/dev/ttyUSB1" erase_ota_partition --slot 0 + + # Switch boot partition to that of app slot 1 + otatool.py --port "/dev/ttyUSB1" switch_ota_partition --slot 1 + + # Read OTA partition 'ota_3' and save contents to a file named 'ota_3.bin' + otatool.py --port "/dev/ttyUSB1" read_ota_partition --name=ota_3 + + +More information can be obtained by specifying `--help` as argument: + +.. code-block:: bash + + # Display possible subcommands and show main command argument descriptions + otatool.py --help + + # Show descriptions for specific subcommand arguments + otatool.py [subcommand] --help + + See also --------