support for generating FATFS on a host

This commit is contained in:
Martin Gano 2021-09-22 00:32:54 +02:00 committed by Martin Gaňo
parent e14b39e8fb
commit 3c4034d36e
24 changed files with 1459 additions and 0 deletions

View File

@ -103,6 +103,7 @@
/components/esptool_py/ @esp-idf-codeowners/tools
/components/expat/ @esp-idf-codeowners/app-utilities
/components/fatfs/ @esp-idf-codeowners/storage
/components/fatfs/**/*.py @esp-idf-codeowners/tools
/components/freemodbus/ @esp-idf-codeowners/peripherals
/components/freertos/ @esp-idf-codeowners/system
/components/hal/ @esp-idf-codeowners/peripherals

View File

@ -115,6 +115,12 @@ test_spiffs_on_host:
- cd ../test_spiffsgen
- ./test_spiffsgen.py
test_fatfsgen_on_host:
extends: .host_test_template
script:
- cd components/fatfs/test_fatfsgen/
- ./test_fatfsgen.py
test_multi_heap_on_host:
extends: .host_test_template
script:

216
components/fatfs/fatfsgen.py Executable file
View File

@ -0,0 +1,216 @@
#!/usr/bin/env python
# SPDX-FileCopyrightText: 2021 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
import argparse
import os
import uuid
from typing import Any, List, Optional
from construct import Const, Int8ul, Int16ul, Int32ul, PaddedString, Struct
from fatfsgen_utils.fat import FAT
from fatfsgen_utils.fatfs_state import FATFSState
from fatfsgen_utils.fs_object import Directory
from fatfsgen_utils.utils import pad_string
class FATFS:
"""
The class FATFS provides API for generating FAT file system.
It contains reference to the FAT table and to the root directory.
"""
MAX_VOL_LAB_SIZE = 11
MAX_OEM_NAME_SIZE = 8
MAX_FS_TYPE_SIZE = 8
BOOT_HEADER_SIZE = 512
BOOT_SECTOR_HEADER = Struct(
'BS_jmpBoot' / Const(b'\xeb\xfe\x90'),
'BS_OEMName' / PaddedString(MAX_OEM_NAME_SIZE, 'utf-8'),
'BPB_BytsPerSec' / Int16ul,
'BPB_SecPerClus' / Int8ul,
'BPB_RsvdSecCnt' / Int16ul,
'BPB_NumFATs' / Int8ul,
'BPB_RootEntCnt' / Int16ul,
'BPB_TotSec16' / Int16ul,
'BPB_Media' / Int8ul,
'BPB_FATSz16' / Int16ul,
'BPB_SecPerTrk' / Int16ul,
'BPB_NumHeads' / Int16ul,
'BPB_HiddSec' / Int32ul,
'BPB_TotSec32' / Int32ul,
'BS_DrvNum' / Const(b'\x80'),
'BS_Reserved1' / Const(b'\x00'),
'BS_BootSig' / Const(b'\x29'),
'BS_VolID' / Int32ul,
'BS_VolLab' / PaddedString(MAX_VOL_LAB_SIZE, 'utf-8'),
'BS_FilSysType' / PaddedString(MAX_FS_TYPE_SIZE, 'utf-8'),
'BS_EMPTY' / Const(448 * b'\x00'),
'Signature_word' / Const(b'\x55\xAA')
)
def __init__(self,
binary_image_path: Optional[str] = None,
size: int = 1024 * 1024,
reserved_sectors_cnt: int = 1,
fat_tables_cnt: int = 1,
sectors_per_cluster: int = 1,
sector_size: int = 0x1000,
sectors_per_fat: int = 1,
root_dir_sectors_cnt: int = 4,
hidden_sectors: int = 0,
long_names_enabled: bool = False,
entry_size: int = 32,
wl_sectors: int = 0,
num_heads: int = 0xff,
oem_name: str = 'MSDOS5.0',
sec_per_track: int = 0x3f,
volume_label: str = 'Espressif',
file_sys_type: str = 'FAT',
media_type: int = 0xf8) -> None:
self.state = FATFSState(entry_size=entry_size,
sector_size=sector_size,
reserved_sectors_cnt=reserved_sectors_cnt,
root_dir_sectors_cnt=root_dir_sectors_cnt,
size=size,
file_sys_type=file_sys_type,
num_heads=num_heads,
fat_tables_cnt=fat_tables_cnt,
sectors_per_fat=sectors_per_fat,
sectors_per_cluster=sectors_per_cluster,
media_type=media_type,
hidden_sectors=hidden_sectors,
sec_per_track=sec_per_track,
long_names_enabled=long_names_enabled,
volume_label=volume_label,
wl_sectors=wl_sectors,
oem_name=oem_name)
binary_image = bytearray(
self.read_filesystem(binary_image_path) if binary_image_path else self.create_empty_fatfs())
self.state.binary_image = binary_image
self.fat = FAT(fatfs_state=self.state,
reserved_sectors_cnt=self.state.reserved_sectors_cnt)
self.root_directory = Directory(name='A', # the name is not important
size=self.state.root_dir_sectors_cnt * self.state.sector_size,
fat=self.fat,
cluster=self.fat.clusters[1],
fatfs_state=self.state)
self.root_directory.init_directory()
def create_file(self, name: str, extension: str = '', path_from_root: Optional[List[str]] = None) -> None:
# when path_from_root is None the dir is root
self.root_directory.new_file(name=name, extension=extension, path_from_root=path_from_root)
def create_directory(self, name: str, path_from_root: Optional[List[str]] = None) -> None:
# when path_from_root is None the dir is root
parent_dir = self.root_directory
if path_from_root:
parent_dir = self.root_directory.recursive_search(path_from_root, self.root_directory)
self.root_directory.new_directory(name=name, parent=parent_dir, path_from_root=path_from_root)
def write_content(self, path_from_root: List[str], content: str) -> None:
"""
fat fs invokes root directory to recursively find the required file and writes the content
"""
self.root_directory.write_to_file(path_from_root, content)
def create_empty_fatfs(self) -> Any:
sectors_count = self.state.size // self.state.sector_size
volume_uuid = uuid.uuid4().int & 0xFFFFFFFF
return (
FATFS.BOOT_SECTOR_HEADER.build(
dict(BS_OEMName=pad_string(self.state.oem_name, size=FATFS.MAX_OEM_NAME_SIZE),
BPB_BytsPerSec=self.state.sectors_per_cluster * self.state.sector_size,
BPB_SecPerClus=self.state.sectors_per_cluster,
BPB_RsvdSecCnt=self.state.reserved_sectors_cnt,
BPB_NumFATs=self.state.fat_tables_cnt,
BPB_RootEntCnt=self.state.entries_root_count,
BPB_TotSec16=0x00 if self.state.fatfs_type == FATFSState.FAT32 else sectors_count,
BPB_Media=self.state.media_type,
BPB_FATSz16=self.state.sectors_per_fat_cnt,
BPB_SecPerTrk=self.state.sec_per_track,
BPB_NumHeads=self.state.num_heads,
BPB_HiddSec=self.state.hidden_sectors,
BPB_TotSec32=sectors_count if self.state.fatfs_type == FATFSState.FAT32 else 0x00,
BS_VolID=volume_uuid,
BS_VolLab=pad_string(self.state.volume_label, size=FATFS.MAX_VOL_LAB_SIZE),
BS_FilSysType=pad_string(self.state.file_sys_type, size=FATFS.MAX_FS_TYPE_SIZE)
)
)
+ (self.state.sector_size - FATFS.BOOT_HEADER_SIZE) * b'\x00'
+ self.state.sectors_per_fat_cnt * self.state.fat_tables_cnt * self.state.sector_size * b'\x00'
+ self.state.root_dir_sectors_cnt * self.state.sector_size * b'\x00'
+ self.state.data_sectors * self.state.sector_size * b'\xff'
)
@staticmethod
def read_filesystem(path: str) -> bytearray:
with open(path, 'rb') as fs_file:
return bytearray(fs_file.read())
def write_filesystem(self, output_path: str) -> None:
with open(output_path, 'wb') as output:
output.write(bytearray(self.state.binary_image))
def _generate_partition_from_folder(self,
folder_relative_path: str,
folder_path: str = '',
is_dir: bool = False) -> None:
"""
Given path to folder and folder name recursively encodes folder into binary image.
Used by method generate
"""
real_path = os.path.join(folder_path, folder_relative_path)
smaller_path = folder_relative_path
folder_relative_path = folder_relative_path.upper()
normal_path = os.path.normpath(folder_relative_path)
split_path = normal_path.split(os.sep)
if os.path.isfile(real_path):
with open(real_path) as file:
content = file.read()
file_name, extension = os.path.splitext(split_path[-1])
extension = extension[1:] # remove the dot from the extension
self.create_file(name=file_name, extension=extension, path_from_root=split_path[1:-1] or None)
self.write_content(split_path[1:], content)
elif os.path.isdir(real_path):
if not is_dir:
self.create_directory(split_path[-1], split_path[1:-1])
# sorting files for better testability
dir_content = list(sorted(os.listdir(real_path)))
for path in dir_content:
self._generate_partition_from_folder(os.path.join(smaller_path, path), folder_path=folder_path)
def generate(self, input_directory: str) -> None:
"""
Normalize path to folder and recursively encode folder to binary image
"""
path_to_folder, folder_name = os.path.split(input_directory)
self._generate_partition_from_folder(folder_name, folder_path=path_to_folder, is_dir=True)
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Create a FAT filesystem and populate it with directory content')
parser.add_argument('input_directory',
help='Path to the directory that will be encoded into fatfs image')
parser.add_argument('--output_file',
default='fatfs_image.img',
help='Filename of the generated fatfs image')
parser.add_argument('--partition_size',
default=1024 * 1024,
help='Size of the partition in bytes')
args = parser.parse_args()
input_dir = args.input_directory
try:
partition_size = eval(args.partition_size)
except ValueError:
partition_size = args.partition_size
fatfs = FATFS(size=partition_size)
fatfs.generate(input_dir)
fatfs.write_filesystem(args.output_file)

View File

@ -0,0 +1,112 @@
# SPDX-FileCopyrightText: 2021 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
from typing import Optional
from .fatfs_state import FATFSState
from .utils import build_byte, clean_first_half_byte, clean_second_half_byte, split_by_half_byte_12_bit_little_endian
class Cluster:
"""
class Cluster handles values in FAT table and allocates sectors in data region.
"""
RESERVED_BLOCK_ID = 0
ROOT_BLOCK_ID = 1
ALLOCATED_BLOCK_VALUE = 0xFFF # for fat 12
def __init__(self,
cluster_id: int,
fatfs_state: FATFSState,
is_empty: bool = True) -> None:
self.id = cluster_id
self.fatfs_state = fatfs_state
self._next_cluster = None # type: Optional[Cluster]
if self.id == Cluster.RESERVED_BLOCK_ID:
self.is_empty = False
self.set_in_fat(0xff8)
return
self.cluster_data_address = self._compute_cluster_data_address()
self.is_empty = is_empty
assert self.cluster_data_address or self.is_empty
@property
def next_cluster(self): # type: () -> Optional[Cluster]
return self._next_cluster
@next_cluster.setter
def next_cluster(self, value): # type: (Optional[Cluster]) -> None
self._next_cluster = value
def _cluster_id_to_logical_position_in_bits(self, _id: int) -> int:
# computes address of the cluster in fat table
return self.fatfs_state.fatfs_type * _id # type: ignore
def _compute_cluster_data_address(self) -> int:
if self.id == Cluster.ROOT_BLOCK_ID:
return self.fatfs_state.root_directory_start # type: ignore
# the first data cluster id is 2 (we have to subtract reserved cluster and cluster for root)
return self.fatfs_state.sector_size * (self.id - 2) + self.fatfs_state.data_region_start # type: ignore
def _set_first_half_byte(self, address: int, value: int) -> None:
clean_second_half_byte(self.fatfs_state.binary_image, address)
self.fatfs_state.binary_image[address] |= value << 4
def _set_second_half_byte(self, address: int, value: int) -> None:
clean_first_half_byte(self.fatfs_state.binary_image, address)
self.fatfs_state.binary_image[address] |= value
@property
def fat_cluster_address(self) -> int:
"""Determines how many bits precede the first bit of the cluster in FAT"""
return self._cluster_id_to_logical_position_in_bits(self.id)
@property
def real_cluster_address(self) -> int:
return self.fatfs_state.start_address + self.fat_cluster_address // 8 # type: ignore
def set_in_fat(self, value: int) -> None:
"""
Sets cluster in FAT to certain value.
Firstly, we split the target value into 3 half bytes (max value is 0xfff).
Then we could encounter two situations:
1. if the cluster index (indexed from zero) is even, we set the full byte computed by
self.cluster_id_to_logical_position_in_bits and the second half of the consequent byte.
Order of half bytes is 2, 1, 3.
2. if the cluster index is odd, we set the first half of the computed byte and the full consequent byte.
Order of half bytes is 1, 3, 2.
"""
# value must fit into number of bits of the fat (12, 16 or 32)
assert value <= (1 << self.fatfs_state.fatfs_type) - 1
half_bytes = split_by_half_byte_12_bit_little_endian(value)
# hardcoded for fat 12
# IDF-4046 will extend it for fat 16
if self.fat_cluster_address % 8 == 0:
self.fatfs_state.binary_image[self.real_cluster_address] = build_byte(half_bytes[1], half_bytes[0])
self._set_second_half_byte(self.real_cluster_address + 1, half_bytes[2])
elif self.fat_cluster_address % 8 != 0:
self._set_first_half_byte(self.real_cluster_address, half_bytes[0])
self.fatfs_state.binary_image[self.real_cluster_address + 1] = build_byte(half_bytes[2], half_bytes[1])
@property
def is_root(self) -> bool:
return self.id == Cluster.ROOT_BLOCK_ID
def allocate_cluster(self) -> None:
"""
This method sets bits in FAT table to `allocated` and clean the corresponding sector(s)
"""
self.is_empty = False
self.set_in_fat(Cluster.ALLOCATED_BLOCK_VALUE)
cluster_start = self.cluster_data_address
dir_size = self.fatfs_state.get_dir_size(self.is_root)
cluster_end = cluster_start + dir_size
self.fatfs_state.binary_image[cluster_start:cluster_end] = dir_size * b'\x00'

View File

@ -0,0 +1,127 @@
# SPDX-FileCopyrightText: 2021 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
from typing import Any, Optional
from construct import Const, Int8ul, Int16ul, Int32ul, PaddedString, Struct
from .exceptions import LowerCaseException, TooLongNameException
from .fatfs_state import FATFSState
from .utils import is_valid_fatfs_name, pad_string
class Entry:
"""
The class Entry represents entry of the directory.
"""
ATTR_READ_ONLY = 0x01
ATTR_HIDDEN = 0x02
ATTR_SYSTEM = 0x04
ATTR_VOLUME_ID = 0x08
ATTR_DIRECTORY = 0x10
ATTR_ARCHIVE = 0x20
MAX_NAME_SIZE_S = 8
MAX_EXT_SIZE_S = 3
ENTRY_FORMAT_SHORT_NAME = Struct(
'DIR_Name' / PaddedString(MAX_NAME_SIZE_S, 'utf-8'),
'DIR_Name_ext' / PaddedString(MAX_EXT_SIZE_S, 'utf-8'),
'DIR_Attr' / Int8ul,
'DIR_NTRes' / Const(b'\x00'),
'DIR_CrtTimeTenth' / Const(b'\x00'),
'DIR_CrtTime' / Const(b'\x01\x00'),
'DIR_CrtDate' / Const(b'\x21\x00'),
'DIR_LstAccDate' / Const(b'\x00\x00'),
'DIR_FstClusHI' / Const(b'\x00\x00'),
'DIR_WrtTime' / Const(b'\x01\x00'),
'DIR_WrtDate' / Const(b'\x01\x00'),
'DIR_FstClusLO' / Int16ul,
'DIR_FileSize' / Int32ul,
)
# IDF-4044
ENTRY_FORMAT_LONG_NAME = Struct()
def __init__(self,
entry_id: int,
parent_dir_entries_address: int,
fatfs_state: FATFSState) -> None:
self.fatfs_state = fatfs_state
self.id = entry_id
self.entry_address = parent_dir_entries_address + self.id * self.fatfs_state.entry_size
self._is_alias = False
self._is_empty = True
@property
def is_empty(self) -> bool:
return self._is_empty
def _parse_entry(self, entry_bytearray: Optional[bytearray]) -> dict:
if self.fatfs_state.long_names_enabled:
return Entry.ENTRY_FORMAT_LONG_NAME.parse(entry_bytearray) # type: ignore
return Entry.ENTRY_FORMAT_SHORT_NAME.parse(entry_bytearray) # type: ignore
def _build_entry(self, **kwargs) -> Any: # type: ignore
if self.fatfs_state.long_names_enabled:
return Entry.ENTRY_FORMAT_LONG_NAME.build(dict(**kwargs))
return Entry.ENTRY_FORMAT_SHORT_NAME.build(dict(**kwargs))
@property
def entry_bytes(self) -> Any:
return self.fatfs_state.binary_image[self.entry_address: self.entry_address + self.fatfs_state.entry_size]
@entry_bytes.setter
def entry_bytes(self, value: int) -> None:
self.fatfs_state.binary_image[self.entry_address: self.entry_address + self.fatfs_state.entry_size] = value
def _clean_entry(self) -> None:
self.entry_bytes = self.fatfs_state.entry_size * b'\x00'
def allocate_entry(self,
first_cluster_id: int,
entity_name: str,
entity_type: int,
entity_extension: str = '',
size: int = 0) -> None:
"""
:param first_cluster_id: id of the first data cluster for given entry
:param entity_name: name recorded in the entry
:param entity_extension: extension recorded in the entry
:param size: size of the content of the file
:param entity_type: type of the entity (file [0x20] or directory [0x10])
:returns: None
:raises LowerCaseException: In case when long_names_enabled is set to False and filename exceeds 8 chars
for name or 3 chars for extension the exception is raised
"""
if not ((is_valid_fatfs_name(entity_name) and
is_valid_fatfs_name(entity_extension)) or
self.fatfs_state.long_names_enabled):
raise LowerCaseException('Lower case is not supported because long name support is not enabled!')
# clean entry before allocation
self._clean_entry()
self._is_empty = False
object_name = entity_name.upper()
object_extension = entity_extension.upper()
# implementation of long names support will be part of IDF-4044
exceeds_short_name = len(object_name) > Entry.MAX_NAME_SIZE_S or len(object_extension) > Entry.MAX_EXT_SIZE_S
if not self.fatfs_state.long_names_enabled and exceeds_short_name:
raise TooLongNameException(
'Maximal length of the object name is 8 characters and 3 characters for extension!')
start_address = self.entry_address
end_address = start_address + self.fatfs_state.entry_size
self.fatfs_state.binary_image[start_address: end_address] = self._build_entry(
DIR_Name=pad_string(object_name, size=Entry.MAX_NAME_SIZE_S),
DIR_Name_ext=pad_string(object_extension, size=Entry.MAX_EXT_SIZE_S),
DIR_Attr=entity_type,
DIR_FstClusLO=first_cluster_id,
DIR_FileSize=size
)
def update_content_size(self, content_size: int) -> None:
parsed_entry = self._parse_entry(self.entry_bytes)
parsed_entry.DIR_FileSize = content_size # type: ignore
self.entry_bytes = Entry.ENTRY_FORMAT_SHORT_NAME.build(parsed_entry)

View File

@ -0,0 +1,33 @@
# SPDX-FileCopyrightText: 2021 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
class WriteDirectoryException(Exception):
"""
Exception is raised when the user tries to write the content into the directory instead of file
"""
pass
class NoFreeClusterException(Exception):
"""
Exception is raised when the user tries allocate cluster but no free one is available
"""
pass
class LowerCaseException(Exception):
"""
Exception is raised when the user tries to write file or directory with lower case
"""
pass
class TooLongNameException(Exception):
"""
Exception is raised when long name support is not enabled and user tries to write file longer then allowed
"""
pass
class FatalError(Exception):
pass

View File

@ -0,0 +1,42 @@
# SPDX-FileCopyrightText: 2021 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
from .cluster import Cluster
from .exceptions import NoFreeClusterException
from .fatfs_state import FATFSState
class FAT:
"""
The FAT represents the FAT region in file system. It is responsible for storing clusters
and chaining them in case we need to extend file or directory to more clusters.
"""
def __init__(self,
fatfs_state: FATFSState,
reserved_sectors_cnt: int) -> None:
self.fatfs_state = fatfs_state
self.reserved_sectors_cnt = reserved_sectors_cnt
self.clusters = [Cluster(cluster_id=i, fatfs_state=self.fatfs_state) for i in
range(1, self.fatfs_state.max_clusters)]
# update root directory record
self.clusters[0].allocate_cluster()
# add first reserved cluster
self.clusters = [Cluster(cluster_id=Cluster.RESERVED_BLOCK_ID, fatfs_state=self.fatfs_state)] + self.clusters
def find_free_cluster(self) -> Cluster:
# finds first empty cluster and allocates it
for cluster in self.clusters:
if cluster.is_empty:
cluster.allocate_cluster()
return cluster
raise NoFreeClusterException('No free cluster available!')
def allocate_chain(self, first_cluster: Cluster, size: int) -> None:
current = first_cluster
for _ in range(size - 1):
free_cluster = self.find_free_cluster()
current.next_cluster = free_cluster
current.set_in_fat(free_cluster.id)
current = free_cluster

View File

@ -0,0 +1,98 @@
# SPDX-FileCopyrightText: 2021 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
class FATFSState:
"""
The class represents the state and the configuration of the FATFS.
"""
FAT12_MAX_CLUSTERS = 4085
FAT16_MAX_CLUSTERS = 65525
FAT12 = 12
FAT16 = 16
FAT32 = 32
def __init__(self,
entry_size: int,
sector_size: int,
reserved_sectors_cnt: int,
root_dir_sectors_cnt: int,
size: int,
media_type: int,
sectors_per_fat: int,
sectors_per_cluster: int,
volume_label: str,
oem_name: str,
fat_tables_cnt: int,
sec_per_track: int,
num_heads: int,
hidden_sectors: int,
file_sys_type: str,
wl_sectors: int,
long_names_enabled: bool = False):
self._binary_image: bytearray = bytearray(b'')
self.fat_tables_cnt: int = fat_tables_cnt
self.oem_name: str = oem_name
self.wl_sectors_cnt: int = wl_sectors
self.file_sys_type: str = file_sys_type
self.sec_per_track: int = sec_per_track
self.hidden_sectors: int = hidden_sectors
self.volume_label: str = volume_label
self.media_type: int = media_type
self.long_names_enabled: bool = long_names_enabled
self.entry_size: int = entry_size
self.num_heads: int = num_heads
self.sector_size: int = sector_size
self.root_dir_sectors_cnt: int = root_dir_sectors_cnt
self.reserved_sectors_cnt: int = reserved_sectors_cnt
self.size: int = size
self.sectors_per_fat_cnt: int = sectors_per_fat
self.sectors_per_cluster: int = sectors_per_cluster
@property
def binary_image(self) -> bytearray:
return self._binary_image
@binary_image.setter
def binary_image(self, value: bytearray) -> None:
self._binary_image = value
def get_dir_size(self, is_root: bool) -> int:
return self.root_dir_sectors_cnt * self.sector_size if is_root else self.sector_size
@property
def start_address(self) -> int:
return self.sector_size * self.reserved_sectors_cnt
@property
def data_sectors(self) -> int:
return (self.size // self.sector_size) - self.non_data_sectors
@property
def non_data_sectors(self) -> int:
return self.reserved_sectors_cnt + self.sectors_per_fat_cnt + self.root_dir_sectors_cnt + self.wl_sectors_cnt
@property
def data_region_start(self) -> int:
return self.non_data_sectors * self.sector_size
@property
def max_clusters(self) -> int:
return self.data_sectors // self.sectors_per_cluster
@property
def root_directory_start(self) -> int:
return (self.reserved_sectors_cnt + self.sectors_per_fat_cnt) * self.sector_size
@property
def fatfs_type(self) -> int:
if self.max_clusters < FATFSState.FAT12_MAX_CLUSTERS:
return FATFSState.FAT12
elif self.max_clusters < FATFSState.FAT16_MAX_CLUSTERS:
return FATFSState.FAT16
# fat is FAT.FAT32, not supported now
raise NotImplementedError('FAT32 is currently not supported.')
@property
def entries_root_count(self) -> int:
return (self.root_dir_sectors_cnt * self.sector_size) // self.entry_size

View File

@ -0,0 +1,225 @@
# SPDX-FileCopyrightText: 2021 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
import os
from typing import List, Optional, Tuple
from .entry import Entry
from .exceptions import FatalError, WriteDirectoryException
from .fat import FAT, Cluster
from .fatfs_state import FATFSState
from .utils import required_clusters_count, split_content_into_sectors, split_to_name_and_extension
class File:
"""
The class File provides API to write into the files. It represents file in the FS.
"""
ATTR_ARCHIVE = 0x20
ENTITY_TYPE = ATTR_ARCHIVE
def __init__(self, name: str, fat: FAT, fatfs_state: FATFSState, entry: Entry, extension: str = '') -> None:
self.name = name
self.extension = extension
self.fatfs_state = fatfs_state
self.fat = fat
self.size = 0
self._first_cluster = None
self._entry = entry
@property
def entry(self) -> Entry:
return self._entry
@property
def first_cluster(self) -> Optional[Cluster]:
return self._first_cluster
@first_cluster.setter
def first_cluster(self, value: Cluster) -> None:
self._first_cluster = value
def name_equals(self, name: str, extension: str) -> bool:
return self.name == name and self.extension == extension
def write(self, content: str) -> None:
self.entry.update_content_size(len(content))
# we assume that the correct amount of clusters is allocated
current_cluster = self._first_cluster
for content_part in split_content_into_sectors(content, self.fatfs_state.sector_size):
content_as_list = content_part.encode()
if current_cluster is None:
raise FatalError('No free space left!')
address = current_cluster.cluster_data_address
self.fatfs_state.binary_image[address: address + len(content_part)] = content_as_list
current_cluster = current_cluster.next_cluster
class Directory:
"""
The Directory class provides API to add files and directories into the directory
and to find the file according to path and write it.
"""
ATTR_DIRECTORY = 0x10
ATTR_ARCHIVE = 0x20
ENTITY_TYPE = ATTR_DIRECTORY
def __init__(self,
name,
fat,
fatfs_state,
entry=None,
cluster=None,
size=None,
extension='',
parent=None):
# type: (str, FAT, FATFSState, Optional[Entry], Cluster, Optional[int], str, Directory) -> None
self.name = name
self.fatfs_state = fatfs_state
self.extension = extension
self.fat = fat
self.size = size or self.fatfs_state.sector_size
# if directory is root its parent is itself
self.parent: Directory = parent or self
self._first_cluster = cluster
# entries will be initialized after the cluster allocation
self.entries: List[Entry] = []
self.entities = [] # type: ignore
self._entry = entry # currently not in use (will use later for e.g. modification time, etc.)
@property
def is_root(self) -> bool:
return self.parent is self
@property
def first_cluster(self) -> Cluster:
return self._first_cluster
@first_cluster.setter
def first_cluster(self, value: Cluster) -> None:
self._first_cluster = value
def name_equals(self, name: str, extension: str) -> bool:
return self.name == name and self.extension == extension
def create_entries(self, cluster: Cluster) -> list:
return [Entry(entry_id=i,
parent_dir_entries_address=cluster.cluster_data_address,
fatfs_state=self.fatfs_state)
for i in range(self.size // self.fatfs_state.entry_size)]
def init_directory(self) -> None:
self.entries = self.create_entries(self._first_cluster)
if not self.is_root:
# the root directory doesn't contain link to itself nor the parent
free_entry1 = self.find_free_entry() or self.chain_directory()
free_entry1.allocate_entry(first_cluster_id=self.first_cluster.id,
entity_name='.',
entity_extension='',
entity_type=self.ENTITY_TYPE)
self.first_cluster = self._first_cluster
free_entry2 = self.find_free_entry() or self.chain_directory()
free_entry2.allocate_entry(first_cluster_id=self.parent.first_cluster.id,
entity_name='..',
entity_extension='',
entity_type=self.parent.ENTITY_TYPE)
self.parent.first_cluster = self.parent.first_cluster
def lookup_entity(self, object_name: str, extension: str): # type: ignore
for entity in self.entities:
if entity.name == object_name and entity.extension == extension:
return entity
return None
def recursive_search(self, path_as_list, current_dir): # type: ignore
name, extension = split_to_name_and_extension(path_as_list[0])
next_obj = current_dir.lookup_entity(name, extension)
if next_obj is None:
raise FileNotFoundError('No such file or directory!')
if len(path_as_list) == 1 and next_obj.name_equals(name, extension):
return next_obj
return self.recursive_search(path_as_list[1:], next_obj)
def find_free_entry(self) -> Optional[Entry]:
for entry in self.entries:
if entry.is_empty:
return entry
return None
def _extend_directory(self) -> None:
current = self.first_cluster
while current.next_cluster is not None:
current = current.next_cluster
new_cluster = self.fat.find_free_cluster()
current.set_in_fat(new_cluster.id)
current.next_cluster = new_cluster
self.entries += self.create_entries(new_cluster)
def chain_directory(self) -> Entry:
self._extend_directory()
free_entry = self.find_free_entry()
if free_entry is None:
raise FatalError('No more space left!')
return free_entry
def allocate_object(self,
name,
entity_type,
path_from_root=None,
extension=''):
# type: (str, int, Optional[List[str]], str) -> Tuple[Cluster, Entry, Directory]
"""
Method finds the target directory in the path
and allocates cluster (both the record in FAT and cluster in the data region)
and entry in the specified directory
"""
free_cluster = self.fat.find_free_cluster()
target_dir = self if not path_from_root else self.recursive_search(path_from_root, self)
free_entry = target_dir.find_free_entry() or target_dir.chain_directory()
free_entry.allocate_entry(first_cluster_id=free_cluster.id,
entity_name=name,
entity_extension=extension,
entity_type=entity_type)
return free_cluster, free_entry, target_dir
def new_file(self, name: str, extension: str, path_from_root: Optional[List[str]]) -> None:
free_cluster, free_entry, target_dir = self.allocate_object(name=name,
extension=extension,
entity_type=Directory.ATTR_ARCHIVE,
path_from_root=path_from_root)
file = File(name, fat=self.fat, extension=extension, fatfs_state=self.fatfs_state, entry=free_entry)
file.first_cluster = free_cluster
target_dir.entities.append(file)
def new_directory(self, name, parent, path_from_root):
# type: (str, Directory, Optional[List[str]]) -> None
free_cluster, free_entry, target_dir = self.allocate_object(name=name,
entity_type=Directory.ATTR_DIRECTORY,
path_from_root=path_from_root)
directory = Directory(name=name, fat=self.fat, parent=parent, fatfs_state=self.fatfs_state, entry=free_entry)
directory.first_cluster = free_cluster
directory.init_directory()
target_dir.entities.append(directory)
def write_to_file(self, path: List[str], content: str) -> None:
"""
Writes to file existing in the directory structure.
:param path: path split into the list
:param content: content as a string to be written into a file
:returns: None
:raises WriteDirectoryException: raised is the target object for writing is a directory
"""
entity_to_write = self.recursive_search(path, self)
if isinstance(entity_to_write, File):
clusters_cnt = required_clusters_count(cluster_size=self.fatfs_state.sector_size, content=content)
self.fat.allocate_chain(entity_to_write.first_cluster, clusters_cnt)
entity_to_write.write(content)
else:
raise WriteDirectoryException(f'`{os.path.join(*path)}` is a directory!')

View File

@ -0,0 +1,60 @@
# SPDX-FileCopyrightText: 2021 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
import os
import typing
from construct import Int16ul
def required_clusters_count(cluster_size: int, content: str) -> int:
# compute number of required clusters for file text
return (len(content) + cluster_size - 1) // cluster_size
def pad_string(content: str, size: typing.Optional[int] = None, pad: int = 0x20) -> str:
# cut string if longer and fill with pad character if shorter than size
return content.ljust(size or len(content), chr(pad))[:size]
def split_to_name_and_extension(full_name: str) -> typing.Tuple[str, str]:
name, extension = os.path.splitext(full_name)
return name, extension.replace('.', '')
def is_valid_fatfs_name(string: str) -> bool:
return string == string.upper()
def split_by_half_byte_12_bit_little_endian(value: int) -> typing.Tuple[int, int, int]:
value_as_bytes = Int16ul.build(value)
return value_as_bytes[0] & 0x0f, value_as_bytes[0] >> 4, value_as_bytes[1] & 0x0f
def build_byte(first_half: int, second_half: int) -> int:
return (first_half << 4) | second_half
def clean_first_half_byte(bytes_array: bytearray, address: int) -> None:
"""
the function sets to zero first four bits of the byte.
E.g. 10111100 -> 10110000
"""
bytes_array[address] &= 0xf0
def clean_second_half_byte(bytes_array: bytearray, address: int) -> None:
"""
the function sets to zero last four bits of the byte.
E.g. 10111100 -> 00001100
"""
bytes_array[address] &= 0x0f
def split_content_into_sectors(content: str, sector_size: int) -> typing.List[str]:
result = []
clusters_cnt = required_clusters_count(cluster_size=sector_size, content=content)
for i in range(clusters_cnt):
result.append(content[sector_size * i:(i + 1) * sector_size])
return result

View File

@ -0,0 +1,51 @@
# fatfs_create_partition_image
#
# Create a fatfs image of the specified directory on the host during build and optionally
# have the created image flashed using `idf.py flash`
function(fatfs_create_partition_image partition base_dir)
set(options FLASH_IN_PROJECT)
cmake_parse_arguments(arg "${options}" "" "${multi}" "${ARGN}")
idf_build_get_property(idf_path IDF_PATH)
idf_build_get_property(python PYTHON)
set(fatfsgen_py ${python} ${idf_path}/components/fatfs/fatfsgen.py)
get_filename_component(base_dir_full_path ${base_dir} ABSOLUTE)
partition_table_get_partition_info(size "--partition-name ${partition}" "size")
partition_table_get_partition_info(offset "--partition-name ${partition}" "offset")
if("${size}" AND "${offset}")
set(image_file ${CMAKE_BINARY_DIR}/${partition}.bin)
# Execute FATFS image generation; this always executes as there is no way to specify for CMake to watch for
# contents of the base dir changing.
add_custom_target(fatfs_${partition}_bin ALL
COMMAND ${fatfsgen_py} ${base_dir_full_path}
--partition_size ${size}
--output_file ${image_file}
)
set_property(DIRECTORY "${CMAKE_CURRENT_SOURCE_DIR}" APPEND PROPERTY
ADDITIONAL_MAKE_CLEAN_FILES
${image_file})
idf_component_get_property(main_args esptool_py FLASH_ARGS)
idf_component_get_property(sub_args esptool_py FLASH_SUB_ARGS)
# Last (optional) parameter is the encryption for the target. In our
# case, fatfs is not encrypt so pass FALSE to the function.
esptool_py_flash_target(${partition}-flash "${main_args}" "${sub_args}" ALWAYS_PLAINTEXT)
esptool_py_flash_to_partition(${partition}-flash "${partition}" "${image_file}")
add_dependencies(${partition}-flash fatfs_${partition}_bin)
if(arg_FLASH_IN_PROJECT)
esptool_py_flash_to_partition(flash "${partition}" "${image_file}")
add_dependencies(flash fatfs_${partition}_bin)
endif()
else()
set(message "Failed to create FATFS image for partition '${partition}'. "
"Check project configuration if using the correct partition table file.")
fail_at_build_time(fatfs_${partition}_bin "${message}")
endif()
endfunction()

View File

@ -0,0 +1,292 @@
#!/usr/bin/env python
# SPDX-FileCopyrightText: 2021 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
import os
import shutil
import sys
import unittest
from typing import Any, Dict, Union
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
import fatfsgen # noqa E402
from fatfsgen_utils.exceptions import WriteDirectoryException # noqa E402
from fatfsgen_utils.exceptions import LowerCaseException, NoFreeClusterException, TooLongNameException # noqa E402
class FatFSGen(unittest.TestCase):
CFG = dict(
sector_size=4096,
entry_size=32,
fat_start=0x1000,
data_start=0x7000,
root_start=0x2000,
output_file=os.path.join('output_data', 'tmp_file.img'),
test_dir=os.path.join('output_data', 'test'),
test_dir2=os.path.join('output_data', 'tst_str'),
) # type: Union[Dict[str, Any]]
def setUp(self) -> None:
os.makedirs('output_data')
self.generate_test_dir_1()
self.generate_test_dir_2()
def tearDown(self) -> None:
shutil.rmtree('output_data')
@staticmethod
def generate_test_dir_1() -> None:
os.makedirs(os.path.join(FatFSGen.CFG['test_dir'], 'test', 'test'))
with open(os.path.join(FatFSGen.CFG['test_dir'], 'test', 'test', 'lastfile'), 'w') as file:
file.write('deeptest\n')
with open(os.path.join(FatFSGen.CFG['test_dir'], 'test', 'testfil2'), 'w') as file:
file.write('thisistest\n')
with open(os.path.join(FatFSGen.CFG['test_dir'], 'testfile'), 'w') as file:
file.write('ahoj\n')
@staticmethod
def generate_test_dir_2() -> None:
os.makedirs(os.path.join(FatFSGen.CFG['test_dir2'], 'test', 'test'))
with open(os.path.join(FatFSGen.CFG['test_dir2'], 'test', 'test', 'lastfile.txt'), 'w') as file:
file.write('deeptest\n')
with open(os.path.join(FatFSGen.CFG['test_dir2'], 'test', 'testfil2'), 'w') as file:
file.write('thisistest\n')
with open(os.path.join(FatFSGen.CFG['test_dir2'], 'testfile'), 'w') as file:
file.write('ahoj\n')
def test_empty_file_sn_fat12(self) -> None:
fatfs = fatfsgen.FATFS()
fatfs.create_file('TESTFILE')
fatfs.write_filesystem(FatFSGen.CFG['output_file'])
with open(FatFSGen.CFG['output_file'], 'rb') as fs_file:
file_system = fs_file.read()
self.assertEqual(file_system[0x2000:0x200c], b'TESTFILE \x20') # check entry name and type
self.assertEqual(file_system[0x1000:0x1006], b'\xf8\xff\xff\xff\x0f\x00') # check fat
def test_directory_sn_fat12(self) -> None:
fatfs = fatfsgen.FATFS()
fatfs.create_directory('TESTFOLD')
fatfs.write_filesystem(FatFSGen.CFG['output_file'])
with open(FatFSGen.CFG['output_file'], 'rb') as fs_file:
file_system = fs_file.read()
self.assertEqual(file_system[0x2000:0x200c], b'TESTFOLD \x10') # check entry name and type
self.assertEqual(file_system[0x1000:0x1006], b'\xf8\xff\xff\xff\x0f\x00') # check fat
self.assertEqual(file_system[0x6000:0x600c], b'. \x10') # reference to itself
self.assertEqual(file_system[0x6020:0x602c], b'.. \x10') # reference to parent
def test_empty_file_with_extension_sn_fat12(self) -> None:
fatfs = fatfsgen.FATFS()
fatfs.create_file('TESTF', extension='TXT')
fatfs.write_filesystem(FatFSGen.CFG['output_file'])
with open(FatFSGen.CFG['output_file'], 'rb') as fs_file:
file_system = fs_file.read()
self.assertEqual(file_system[0x2000:0x200c], b'TESTF TXT\x20') # check entry name and type
self.assertEqual(file_system[0x1000:0x1006], b'\xf8\xff\xff\xff\x0f\x00') # check fat
def test_write_to_file_with_extension_sn_fat12(self) -> None:
fatfs = fatfsgen.FATFS()
fatfs.create_file('WRITEF', extension='TXT')
fatfs.write_content(path_from_root=['WRITEF.TXT'], content='testcontent')
fatfs.write_filesystem(FatFSGen.CFG['output_file'])
with open(FatFSGen.CFG['output_file'], 'rb') as fs_file:
file_system = fs_file.read()
self.assertEqual(file_system[0x2000:0x200c], b'WRITEF TXT\x20') # check entry name and type
self.assertEqual(file_system[0x201a:0x2020], b'\x02\x00\x0b\x00\x00\x00') # check size and cluster ref
self.assertEqual(file_system[0x1000:0x1006], b'\xf8\xff\xff\xff\x0f\x00') # check fat
self.assertEqual(file_system[0x6000:0x600f], b'testcontent\x00\x00\x00\x00') # check file content
def test_write_to_file_in_folder_sn_fat12(self) -> None:
fatfs = fatfsgen.FATFS()
fatfs.create_directory('TESTFOLD')
fatfs.create_file('WRITEF', extension='TXT', path_from_root=['TESTFOLD'])
fatfs.write_content(path_from_root=['TESTFOLD', 'WRITEF.TXT'], content='testcontent')
fatfs.write_filesystem(FatFSGen.CFG['output_file'])
with open(FatFSGen.CFG['output_file'], 'rb') as fs_file:
file_system = fs_file.read()
self.assertEqual(file_system[0x2000:0x200c], b'TESTFOLD \x10')
self.assertEqual(
file_system[0x1000:0x1010],
b'\xf8\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')
self.assertEqual(file_system[0x6040:0x6050], b'WRITEF TXT\x20\x00\x00\x01\x00')
self.assertEqual(file_system[0x605a:0x6060], b'\x03\x00\x0b\x00\x00\x00')
self.assertEqual(file_system[0x7000:0x700b], b'testcontent') # check file content
def test_cluster_setting_values(self) -> None:
fatfs = fatfsgen.FATFS()
fatfs.create_file('TESTFIL1')
fatfs.create_file('TESTFIL2')
fatfs.create_file('TESTFIL3')
fatfs.create_file('TESTFIL4')
fatfs.create_file('TESTFIL5')
fatfs.fat.clusters[2].set_in_fat(1000)
fatfs.fat.clusters[3].set_in_fat(4)
fatfs.fat.clusters[4].set_in_fat(5)
fatfs.write_filesystem(FatFSGen.CFG['output_file'])
with open(FatFSGen.CFG['output_file'], 'rb') as fs_file:
file_system = fs_file.read()
self.assertEqual(
file_system[0x1000:0x1010],
b'\xf8\xff\xff\xe8\x43\x00\x05\xf0\xff\xff\x0f\x00\x00\x00\x00\x00')
def test_full_sector_file(self) -> None:
fatfs = fatfsgen.FATFS()
fatfs.create_file('WRITEF', extension='TXT')
fatfs.write_content(path_from_root=['WRITEF.TXT'], content=FatFSGen.CFG['sector_size'] * 'a')
fatfs.write_filesystem(FatFSGen.CFG['output_file'])
with open(FatFSGen.CFG['output_file'], 'rb') as fs_file:
file_system = fs_file.read()
self.assertEqual(file_system[0x1000: 0x100e], b'\xf8\xff\xff\xff\x0f\x00\x00\x00\x00\x00\x00\x00\x00\x00')
self.assertEqual(file_system[0x6000: 0x7000], FatFSGen.CFG['sector_size'] * b'a')
def test_file_chaining(self) -> None:
fatfs = fatfsgen.FATFS()
fatfs.create_file('WRITEF', extension='TXT')
fatfs.write_content(path_from_root=['WRITEF.TXT'], content=FatFSGen.CFG['sector_size'] * 'a' + 'a')
fatfs.write_filesystem(FatFSGen.CFG['output_file'])
with open(FatFSGen.CFG['output_file'], 'rb') as fs_file:
file_system = fs_file.read()
self.assertEqual(file_system[0x1000: 0x100e], b'\xf8\xff\xff\x03\xf0\xff\x00\x00\x00\x00\x00\x00\x00\x00')
self.assertEqual(file_system[0x7000: 0x8000], b'a' + (FatFSGen.CFG['sector_size'] - 1) * b'\x00')
def test_full_sector_folder(self) -> None:
fatfs = fatfsgen.FATFS()
fatfs.create_directory('TESTFOLD')
for i in range(FatFSGen.CFG['sector_size'] // FatFSGen.CFG['entry_size']):
fatfs.create_file(f'A{str(i).upper()}', path_from_root=['TESTFOLD'])
fatfs.write_content(path_from_root=['TESTFOLD', 'A0'], content='first')
fatfs.write_content(path_from_root=['TESTFOLD', 'A126'], content='later')
fatfs.write_filesystem(FatFSGen.CFG['output_file'])
with open(FatFSGen.CFG['output_file'], 'rb') as fs_file:
file_system = fs_file.read()
self.assertEqual(file_system[0x1000: 0x10d0],
b'\xf8\xff\xff\x82\xf0\xff' + 192 * b'\xff' + 10 * b'\x00')
self.assertEqual(file_system[0x85000:0x85005], b'later')
self.assertEqual(file_system[0x86000:0x86010], b'A126 \x00\x00\x01\x00')
self.assertEqual(file_system[0x86020:0x86030], b'A127 \x00\x00\x01\x00')
def test_write_to_folder_in_folder_sn_fat12(self) -> None:
fatfs = fatfsgen.FATFS()
fatfs.create_directory('TESTFOLD')
fatfs.create_directory('TESTFOLL', path_from_root=['TESTFOLD'])
self.assertRaises(WriteDirectoryException, fatfs.write_content, path_from_root=['TESTFOLD', 'TESTFOLL'],
content='testcontent')
def test_write_non_existing_file_in_folder_sn_fat12(self) -> None:
fatfs = fatfsgen.FATFS()
fatfs.create_directory('TESTFOLD')
self.assertRaises(FileNotFoundError, fatfs.write_content, path_from_root=['TESTFOLD', 'AHOJ'],
content='testcontent')
@staticmethod
def create_too_many_files() -> None:
fatfs = fatfsgen.FATFS()
fatfs.create_directory('TESTFOLD')
for i in range(2 * FatFSGen.CFG['sector_size'] // FatFSGen.CFG['entry_size']):
fatfs.create_file(f'A{str(i).upper()}', path_from_root=['TESTFOLD'])
def test_too_many_files(self) -> None:
self.assertRaises(NoFreeClusterException, self.create_too_many_files)
def test_full_two_sectors_folder(self) -> None:
fatfs = fatfsgen.FATFS(size=2 * 1024 * 1024)
fatfs.create_directory('TESTFOLD')
for i in range(2 * FatFSGen.CFG['sector_size'] // FatFSGen.CFG['entry_size']):
fatfs.create_file(f'A{str(i).upper()}', path_from_root=['TESTFOLD'])
fatfs.write_content(path_from_root=['TESTFOLD', 'A253'], content='later')
fatfs.write_content(path_from_root=['TESTFOLD', 'A255'], content='last')
fatfs.write_filesystem(FatFSGen.CFG['output_file'])
with open(FatFSGen.CFG['output_file'], 'rb') as fs_file:
file_system = fs_file.read()
self.assertEqual(file_system[0x105000:0x105010], b'later\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')
self.assertEqual(file_system[0x108000:0x108010], b'last\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')
def test_lower_case_dir_short_names(self) -> None:
fatfs = fatfsgen.FATFS()
self.assertRaises(LowerCaseException, fatfs.create_directory, 'testfold')
def test_lower_case_file_short_names(self) -> None:
fatfs = fatfsgen.FATFS()
self.assertRaises(LowerCaseException, fatfs.create_file, 'newfile')
def test_too_long_name_dir_short_names(self) -> None:
fatfs = fatfsgen.FATFS()
self.assertRaises(TooLongNameException, fatfs.create_directory, 'TOOLONGNAME')
def test_fatfs16_detection(self) -> None:
fatfs = fatfsgen.FATFS(size=16 * 1024 * 1024)
self.assertEqual(fatfs.state.fatfs_type, 16)
def test_fatfs32_detection(self) -> None:
self.assertRaises(NotImplementedError, fatfsgen.FATFS, size=256 * 1024 * 1024)
def test_deep_structure(self) -> None:
fatfs = fatfsgen.FATFS()
fatfs.create_directory('TESTFOLD')
fatfs.create_directory('TESTFOLL', path_from_root=['TESTFOLD'])
fatfs.create_directory('TESTFOLO', path_from_root=['TESTFOLD', 'TESTFOLL'])
fatfs.create_file('WRITEF', extension='TXT', path_from_root=['TESTFOLD', 'TESTFOLL', 'TESTFOLO'])
fatfs.write_content(path_from_root=['TESTFOLD', 'TESTFOLL', 'TESTFOLO', 'WRITEF.TXT'], content='later')
fatfs.write_filesystem(FatFSGen.CFG['output_file'])
with open(FatFSGen.CFG['output_file'], 'rb') as fs_file:
file_system = fs_file.read()
self.assertEqual(file_system[0x9000:0x9010], b'later\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')
def test_same_name_deep_structure(self) -> None:
fatfs = fatfsgen.FATFS()
fatfs.create_directory('TESTFOLD')
fatfs.create_directory('TESTFOLD', path_from_root=['TESTFOLD'])
fatfs.create_directory('TESTFOLD', path_from_root=['TESTFOLD', 'TESTFOLD'])
fatfs.create_file('WRITEF', extension='TXT', path_from_root=['TESTFOLD', 'TESTFOLD', 'TESTFOLD'])
fatfs.write_content(path_from_root=['TESTFOLD', 'TESTFOLD', 'TESTFOLD', 'WRITEF.TXT'], content='later')
fatfs.write_filesystem(FatFSGen.CFG['output_file'])
with open(FatFSGen.CFG['output_file'], 'rb') as fs_file:
file_system = fs_file.read()
self.assertEqual(file_system[0x2000:0x2010], b'TESTFOLD \x10\x00\x00\x01\x00')
self.assertEqual(file_system[0x2010:0x2020], b'!\x00\x00\x00\x00\x00\x01\x00\x01\x00\x02\x00\x00\x00\x00\x00')
self.assertEqual(file_system[0x6040:0x6050], b'TESTFOLD \x10\x00\x00\x01\x00')
self.assertEqual(file_system[0x6040:0x6050], b'TESTFOLD \x10\x00\x00\x01\x00')
self.assertEqual(file_system[0x7040:0x7050], b'TESTFOLD \x10\x00\x00\x01\x00')
self.assertEqual(file_system[0x8040:0x8050], b'WRITEF TXT \x00\x00\x01\x00')
self.assertEqual(file_system[0x9000:0x9010], b'later\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')
def test_e2e_deep_folder_into_image(self) -> None:
fatfs = fatfsgen.FATFS()
fatfs.generate(FatFSGen.CFG['test_dir'])
fatfs.write_filesystem(FatFSGen.CFG['output_file'])
with open(FatFSGen.CFG['output_file'], 'rb') as fs_file:
file_system = fs_file.read()
self.assertEqual(file_system[0x6060:0x6070], b'TESTFIL2 \x00\x00\x01\x00')
self.assertEqual(file_system[0x6070:0x6080], b'!\x00\x00\x00\x00\x00\x01\x00\x01\x00\x05\x00\x0b\x00\x00\x00')
self.assertEqual(file_system[0x7040:0x7050], b'LASTFILE \x00\x00\x01\x00')
self.assertEqual(file_system[0x8000:0x8010], b'deeptest\n\x00\x00\x00\x00\x00\x00\x00')
self.assertEqual(file_system[0x9000:0x9010], b'thisistest\n\x00\x00\x00\x00\x00')
self.assertEqual(file_system[0xa000:0xa010], b'ahoj\n\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')
def test_e2e_deep_folder_into_image_ext(self) -> None:
fatfs = fatfsgen.FATFS()
fatfs.generate(FatFSGen.CFG['test_dir2'])
fatfs.write_filesystem(FatFSGen.CFG['output_file'])
file_system = fatfs.read_filesystem(FatFSGen.CFG['output_file'])
self.assertEqual(file_system[0x2020:0x2030], b'TESTFILE \x00\x00\x01\x00')
self.assertEqual(file_system[0x6060:0x6070], b'TESTFIL2 \x00\x00\x01\x00')
self.assertEqual(file_system[0x7000:0x7010], b'. \x10\x00\x00\x01\x00')
self.assertEqual(file_system[0x7040:0x7050], b'LASTFILETXT \x00\x00\x01\x00')
self.assertEqual(file_system[0x8000:0x8010], b'deeptest\n\x00\x00\x00\x00\x00\x00\x00')
self.assertEqual(file_system[0x9000:0x9010], b'thisistest\n\x00\x00\x00\x00\x00')
self.assertEqual(file_system[0xa000:0xa010], b'ahoj\n\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')
self.assertEqual(file_system[0xb000:0xb009], b'\xff\xff\xff\xff\xff\xff\xff\xff\xff')
if __name__ == '__main__':
unittest.main()

View File

@ -84,3 +84,37 @@ They provide implementation of disk I/O functions for SD/MMC cards and can be re
.. doxygenfunction:: ff_diskio_register_wl_partition
.. doxygenfunction:: ff_diskio_register_raw_partition
FATFS partition generator
-------------------------
We provide partition generator for FATFS (:component_file:`fatfsgen.py<fatfs/fatfsgen.py>`)
which is integrated into the build system and could be easily used in the user project.
The tool is used to create filesystem images on a host and populate it with content of the specified host folder.
Current implementation supports short file names, FAT12 and read-only mode
(because the wear levelling is not implemented yet). The WL, long file names, and FAT16 are subjects of future work.
Build system integration with FATFS partition generator
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
It is possible to invoke FATFS generator directly from the CMake build system by calling ``fatfs_create_partition_image``::
fatfs_create_partition_image(<partition> <base_dir> [FLASH_IN_PROJECT])
``fatfs_create_partition_image`` must be called from project's CMakeLists.txt.
The arguments of the function are as follows:
1. partition - the name of the partition, you can define in partition table (e.g. :example_file:`storage/fatfsgen/partitions_example.csv`)
2. base_dir - the directory that will be encoded to FATFS partition and optionally flashed into the device. Beware that you have to specified suitable size of the partition in the partition table.
3. flag ``FLASH_IN_PROJECT`` - optionally, user can opt to have the image automatically flashed together with the app binaries, partition tables, etc. on ``idf.py flash -p <PORT>`` by specifying ``FLASH_IN_PROJECT``.
For example::
fatfs_create_partition_image(my_fatfs_partition my_folder FLASH_IN_PROJECT)
If FLASH_IN_PROJECT is not specified, the image will still be generated, but you will have to flash it manually using ``esptool.py`` or a custom build system target.
For an example, see :example:`storage/fatfsgen`.

View File

@ -0,0 +1,6 @@
# The following lines of boilerplate have to be in your project's CMakeLists
# in this exact order for cmake to work correctly
cmake_minimum_required(VERSION 3.5)
include($ENV{IDF_PATH}/tools/cmake/project.cmake)
project(fatfsgen)

View File

@ -0,0 +1,56 @@
# FATFS partition generation on build example
(See the README.md file in the upper level 'examples' directory for more information about examples.)
This example demonstrates how to use the FATFS partition
generation tool [fatfsgen.py](../../../components/fatfs/fatfsgen.py) to automatically create a FATFS
filesystem image (without wear levelling support)
from the contents of a host folder during build, with an option of
automatically flashing the created image on invocation of `idf.py -p PORT flash`.
Beware that the minimal required size of the flash is 4 MB.
The generated partition does not support wear levelling,
so it can be mounted only in read-only mode.
The following gives an overview of the example:
1. There is a directory `fatfs_image` from which the FATFS filesystem image will be created.
2. The function `fatfs_create_partition_image` is used to specify that a FATFS image
should be created during build for the `storage` partition. For CMake, it is called from [the main component's CMakeLists.txt](./main/CMakeLists.txt).
`FLASH_IN_PROJECT` specifies that the created image
should be flashed on invocation of `idf.py -p PORT flash` together with app, bootloader, partition table, etc.
The image is created on the example's build directory with the output filename `storage.bin`.
3. Upon invocation of `idf.py -p PORT flash monitor`, application loads and
finds there is already a valid FATFS filesystem in the `storage` partition with files same as those in `fatfs_image` directory. The application is then
able to read those files.
## How to use example
### Build and flash
To run the example, type the following command:
```CMake
# CMake
idf.py -p PORT flash monitor
```
(To exit the serial monitor, type ``Ctrl-]``.)
See the Getting Started Guide for full steps to configure and use ESP-IDF to build projects.
## Example output
Here is the example's console output:
```
...
I (322) example: Mounting FAT filesystem
I (332) example: Reading file
I (332) example: Read from file: 'this is test'
I (332) example: Unmounting FAT filesystem
I (342) example: Done
```
The logic of the example is contained in a [single source file](./main/fatfsgen_example_main.c), and it should be relatively simple to match points in its execution with the log outputs above.

View File

@ -0,0 +1 @@
this file is test as well

View File

@ -0,0 +1 @@
this is test

View File

@ -0,0 +1,20 @@
# SPDX-FileCopyrightText: 2021 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: CC0
import ttfw_idf
@ttfw_idf.idf_example_test(env_tag='Example_GENERIC')
def test_examples_fatfsgen(env, _): # type: ignore
dut = env.get_dut('fatfsgen', 'examples/storage/fatfsgen')
dut.start_app()
dut.expect_all('example: Mounting FAT filesystem',
'example: Reading file',
'example: Read from file: \'this is test\'',
'example: Unmounting FAT filesystem',
'example: Done',
timeout=20)
if __name__ == '__main__':
test_examples_fatfsgen()

View File

@ -0,0 +1,8 @@
idf_component_register(SRCS "fatfsgen_example_main.c"
INCLUDE_DIRS ".")
# Create a FATFS image from the contents of the 'fatfs_image' directory
# that fits the partition named 'storage'. FLASH_IN_PROJECT indicates that
# the generated image should be flashed when the entire project is flashed to
# the target with 'idf.py -p PORT flash'.
fatfs_create_partition_image(storage ../fatfs_image FLASH_IN_PROJECT)

View File

@ -0,0 +1,58 @@
/*
* SPDX-FileCopyrightText: 2021 Espressif Systems (Shanghai) CO LTD
*
* SPDX-License-Identifier: CC0
*/
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include "esp_vfs.h"
#include "esp_vfs_fat.h"
#include "esp_system.h"
#include "sdkconfig.h"
static const char *TAG = "example";
// Mount path for the partition
const char *base_path = "/spiflash";
void app_main(void)
{
ESP_LOGI(TAG, "Mounting FAT filesystem");
// To mount device we need name of device partition, define base_path
// and allow format partition in case if it is new one and was not formatted before
const esp_vfs_fat_mount_config_t mount_config = {
.max_files = 4,
.format_if_mount_failed = false,
.allocation_unit_size = CONFIG_WL_SECTOR_SIZE
};
esp_err_t err = esp_vfs_fat_rawflash_mount(base_path, "storage", &mount_config);
if (err != ESP_OK) {
ESP_LOGE(TAG, "Failed to mount FATFS (%s)", esp_err_to_name(err));
return;
}
// Open file for reading
ESP_LOGI(TAG, "Reading file");
FILE *f = fopen("/spiflash/sub/test.txt", "rb");
if (f == NULL) {
ESP_LOGE(TAG, "Failed to open file for reading");
return;
}
char line[128];
fgets(line, sizeof(line), f);
fclose(f);
// strip newline
char *pos = strchr(line, '\n');
if (pos) {
*pos = '\0';
}
ESP_LOGI(TAG, "Read from file: '%s'", line);
// Unmount FATFS
ESP_LOGI(TAG, "Unmounting FAT filesystem");
ESP_ERROR_CHECK( esp_vfs_fat_rawflash_unmount(base_path, "storage"));
ESP_LOGI(TAG, "Done");
}

View File

@ -0,0 +1,6 @@
# Name, Type, SubType, Offset, Size, Flags
# Note: if you have increased the bootloader size, make sure to update the offsets to avoid overlap
nvs, data, nvs, 0x9000, 0x6000,
phy_init, data, phy, 0xf000, 0x1000,
factory, app, factory, 0x10000, 1M,
storage, data, fat, , 1M,
1 # Name, Type, SubType, Offset, Size, Flags
2 # Note: if you have increased the bootloader size, make sure to update the offsets to avoid overlap
3 nvs, data, nvs, 0x9000, 0x6000,
4 phy_init, data, phy, 0xf000, 0x1000,
5 factory, app, factory, 0x10000, 1M,
6 storage, data, fat, , 1M,

View File

@ -0,0 +1,4 @@
CONFIG_PARTITION_TABLE_CUSTOM=y
CONFIG_PARTITION_TABLE_CUSTOM_FILENAME="partitions_example.csv"
CONFIG_PARTITION_TABLE_FILENAME="partitions_example.csv"
CONFIG_ESPTOOLPY_FLASHSIZE_4MB=y

View File

@ -7,6 +7,8 @@ components/espcoredump/espcoredump.py
components/espcoredump/test/test_espcoredump.py
components/espcoredump/test/test_espcoredump.sh
components/espcoredump/test_apps/build_espcoredump.sh
components/fatfs/fatfsgen.py
components/fatfs/test_fatfsgen/test_fatfsgen.py
components/heap/test_multi_heap_host/test_all_configs.sh
components/mbedtls/esp_crt_bundle/gen_crt_bundle.py
components/mbedtls/esp_crt_bundle/test_gen_crt_bundle/test_gen_crt_bundle.py