Added FATFS partition parser

This commit is contained in:
Martin Gaňo 2022-04-04 15:33:00 +02:00 committed by Martin Gaňo
parent 0a511e576e
commit 005af75da3
17 changed files with 699 additions and 308 deletions

View File

@ -142,6 +142,7 @@ test_fatfsgen_on_host:
- cd components/fatfs/test_fatfsgen/
- ./test_fatfsgen.py
- ./test_wl_fatfsgen.py
- ./test_fatfsparse.py
test_multi_heap_on_host:
extends: .host_test_template

View File

@ -6,12 +6,12 @@ import os
from datetime import datetime
from typing import Any, List, Optional
from fatfsgen_utils.boot_sector import BootSector
from fatfsgen_utils.fat import FAT
from fatfsgen_utils.fatfs_parser import FATFSParser
from fatfsgen_utils.fatfs_state import FATFSState
from fatfsgen_utils.fs_object import Directory
from fatfsgen_utils.utils import (BYTES_PER_DIRECTORY_ENTRY, FAT32, FATFS_INCEPTION, generate_4bytes_random,
get_args_for_partition_generator, pad_string, read_filesystem)
from fatfsgen_utils.utils import (BYTES_PER_DIRECTORY_ENTRY, FATFS_INCEPTION, FATDefaults,
get_args_for_partition_generator, read_filesystem)
class FATFS:
@ -22,32 +22,32 @@ class FATFS:
def __init__(self,
binary_image_path: Optional[str] = None,
size: int = 1024 * 1024,
reserved_sectors_cnt: int = 1,
fat_tables_cnt: int = 1,
sectors_per_cluster: int = 1,
sector_size: int = 0x1000,
sectors_per_fat: int = 1,
hidden_sectors: int = 0,
size: int = FATDefaults.SIZE,
reserved_sectors_cnt: int = FATDefaults.RESERVED_SECTORS_COUNT,
fat_tables_cnt: int = FATDefaults.FAT_TABLES_COUNT,
sectors_per_cluster: int = FATDefaults.SECTORS_PER_CLUSTER,
sector_size: int = FATDefaults.SECTOR_SIZE,
sectors_per_fat: int = FATDefaults.SECTORS_PER_FAT,
hidden_sectors: int = FATDefaults.HIDDEN_SECTORS,
long_names_enabled: bool = False,
use_default_datetime: bool = True,
entry_size: int = 32,
num_heads: int = 0xff,
oem_name: str = 'MSDOS5.0',
sec_per_track: int = 0x3f,
volume_label: str = 'Espressif',
file_sys_type: str = 'FAT',
root_entry_count: int = 512,
num_heads: int = FATDefaults.NUM_HEADS,
oem_name: str = FATDefaults.OEM_NAME,
sec_per_track: int = FATDefaults.SEC_PER_TRACK,
volume_label: str = FATDefaults.VOLUME_LABEL,
file_sys_type: str = FATDefaults.FILE_SYS_TYPE,
root_entry_count: int = FATDefaults.ROOT_ENTRIES_COUNT,
explicit_fat_type: int = None,
media_type: int = 0xf8) -> None:
media_type: int = FATDefaults.MEDIA_TYPE) -> None:
# root directory bytes should be aligned by sector size
assert (root_entry_count * BYTES_PER_DIRECTORY_ENTRY) % sector_size == 0
# number of bytes in the root dir must be even multiple of BPB_BytsPerSec
assert ((root_entry_count * BYTES_PER_DIRECTORY_ENTRY) // sector_size) % 2 == 0
root_dir_sectors_cnt: int = (root_entry_count * BYTES_PER_DIRECTORY_ENTRY) // sector_size
self.state: FATFSState = FATFSState(entry_size=entry_size,
sector_size=sector_size,
self.state: FATFSState = FATFSState(sector_size=sector_size,
explicit_fat_type=explicit_fat_type,
reserved_sectors_cnt=reserved_sectors_cnt,
root_dir_sectors_cnt=root_dir_sectors_cnt,
@ -68,10 +68,11 @@ class FATFS:
read_filesystem(binary_image_path) if binary_image_path else self.create_empty_fatfs())
self.state.binary_image = binary_image
self.fat: FAT = FAT(fatfs_state=self.state, reserved_sectors_cnt=self.state.reserved_sectors_cnt)
self.fat: FAT = FAT(boot_sector_state=self.state.boot_sector_state, init_=True)
root_dir_size = self.state.boot_sector_state.root_dir_sectors_cnt * self.state.boot_sector_state.sector_size
self.root_directory: Directory = Directory(name='A', # the name is not important, must be string
size=self.state.root_dir_sectors_cnt * self.state.sector_size,
size=root_dir_size,
fat=self.fat,
cluster=self.fat.clusters[1],
fatfs_state=self.state)
@ -107,33 +108,9 @@ class FATFS:
self.root_directory.write_to_file(path_from_root, content)
def create_empty_fatfs(self) -> Any:
sectors_count = self.state.size // self.state.sector_size
volume_uuid = generate_4bytes_random()
return (
FATFSParser.BOOT_SECTOR_HEADER.build(
dict(BS_OEMName=pad_string(self.state.oem_name, size=FATFSParser.MAX_OEM_NAME_SIZE),
BPB_BytsPerSec=self.state.sector_size,
BPB_SecPerClus=self.state.sectors_per_cluster,
BPB_RsvdSecCnt=self.state.reserved_sectors_cnt,
BPB_NumFATs=self.state.fat_tables_cnt,
BPB_RootEntCnt=self.state.entries_root_count,
BPB_TotSec16=0x00 if self.state.fatfs_type == FAT32 else sectors_count,
BPB_Media=self.state.media_type,
BPB_FATSz16=self.state.sectors_per_fat_cnt,
BPB_SecPerTrk=self.state.sec_per_track,
BPB_NumHeads=self.state.num_heads,
BPB_HiddSec=self.state.hidden_sectors,
BPB_TotSec32=sectors_count if self.state.fatfs_type == FAT32 else 0x00,
BS_VolID=volume_uuid,
BS_VolLab=pad_string(self.state.volume_label, size=FATFSParser.MAX_VOL_LAB_SIZE),
BS_FilSysType=pad_string(self.state.file_sys_type, size=FATFSParser.MAX_FS_TYPE_SIZE)
)
)
+ (self.state.sector_size - FATFSParser.BOOT_HEADER_SIZE) * b'\x00'
+ self.state.sectors_per_fat_cnt * self.state.fat_tables_cnt * self.state.sector_size * b'\x00'
+ self.state.root_dir_sectors_cnt * self.state.sector_size * b'\x00'
+ self.state.data_sectors * self.state.sector_size * b'\xff'
)
boot_sector_ = BootSector(boot_sector_state=self.state.boot_sector_state)
boot_sector_.generate_boot_sector()
return boot_sector_.binary_image
def write_filesystem(self, output_path: str) -> None:
with open(output_path, 'wb') as output:
@ -145,10 +122,10 @@ class FATFS:
is_dir: bool = False) -> None:
"""
Given path to folder and folder name recursively encodes folder into binary image.
Used by method generate
Used by method generate.
"""
real_path = os.path.join(folder_path, folder_relative_path)
smaller_path = folder_relative_path
real_path: str = os.path.join(folder_path, folder_relative_path)
lower_path: str = folder_relative_path
folder_relative_path = folder_relative_path.upper()
@ -175,7 +152,7 @@ class FATFS:
# sorting files for better testability
dir_content = list(sorted(os.listdir(real_path)))
for path in dir_content:
self._generate_partition_from_folder(os.path.join(smaller_path, path), folder_path=folder_path)
self._generate_partition_from_folder(os.path.join(lower_path, path), folder_path=folder_path)
def generate(self, input_directory: str) -> None:
"""

View File

@ -0,0 +1,150 @@
# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
from inspect import getmembers, isroutine
from typing import Optional
from construct import Const, Int8ul, Int16ul, Int32ul, PaddedString, Struct
from .exceptions import InconsistentFATAttributes, NotInitialized
from .fatfs_state import BootSectorState
from .utils import (ALLOWED_SECTOR_SIZES, ALLOWED_SECTORS_PER_CLUSTER, EMPTY_BYTE, FAT32, FULL_BYTE,
SHORT_NAMES_ENCODING, FATDefaults, generate_4bytes_random, pad_string)
class BootSector:
MAX_VOL_LAB_SIZE = 11
MAX_OEM_NAME_SIZE = 8
MAX_FS_TYPE_SIZE = 8
# the FAT specification defines 512 bytes for the boot sector header
BOOT_HEADER_SIZE = 512
BOOT_SECTOR_HEADER = Struct(
'BS_jmpBoot' / Const(b'\xeb\xfe\x90'),
'BS_OEMName' / PaddedString(MAX_OEM_NAME_SIZE, SHORT_NAMES_ENCODING),
'BPB_BytsPerSec' / Int16ul,
'BPB_SecPerClus' / Int8ul,
'BPB_RsvdSecCnt' / Int16ul,
'BPB_NumFATs' / Int8ul,
'BPB_RootEntCnt' / Int16ul,
'BPB_TotSec16' / Int16ul,
'BPB_Media' / Int8ul,
'BPB_FATSz16' / Int16ul, # for FAT32 always zero, for FAT12/FAT16 number of sectors per FAT
'BPB_SecPerTrk' / Int16ul,
'BPB_NumHeads' / Int16ul,
'BPB_HiddSec' / Int32ul,
'BPB_TotSec32' / Int32ul,
'BS_DrvNum' / Const(b'\x80'),
'BS_Reserved1' / Const(EMPTY_BYTE),
'BS_BootSig' / Const(b'\x29'),
'BS_VolID' / Int32ul,
'BS_VolLab' / PaddedString(MAX_VOL_LAB_SIZE, SHORT_NAMES_ENCODING),
'BS_FilSysType' / PaddedString(MAX_FS_TYPE_SIZE, SHORT_NAMES_ENCODING),
'BS_EMPTY' / Const(448 * EMPTY_BYTE),
'Signature_word' / Const(FATDefaults.SIGNATURE_WORD)
)
assert BOOT_SECTOR_HEADER.sizeof() == BOOT_HEADER_SIZE
def __init__(self, boot_sector_state: Optional[BootSectorState] = None) -> None:
self._parsed_header = None
self.boot_sector_state: BootSectorState = boot_sector_state
def generate_boot_sector(self) -> None:
boot_sector_state: BootSectorState = self.boot_sector_state
if boot_sector_state is None:
raise NotInitialized('The BootSectorState instance is not initialized!')
volume_uuid = generate_4bytes_random()
pad_header: bytes = (boot_sector_state.sector_size - BootSector.BOOT_HEADER_SIZE) * EMPTY_BYTE
data_content: bytes = boot_sector_state.data_sectors * boot_sector_state.sector_size * FULL_BYTE
root_dir_content: bytes = boot_sector_state.root_dir_sectors_cnt * boot_sector_state.sector_size * EMPTY_BYTE
fat_tables_content: bytes = (boot_sector_state.sectors_per_fat_cnt
* boot_sector_state.fat_tables_cnt
* boot_sector_state.sector_size
* EMPTY_BYTE)
self.boot_sector_state.binary_image = (
BootSector.BOOT_SECTOR_HEADER.build(
dict(BS_OEMName=pad_string(boot_sector_state.oem_name, size=BootSector.MAX_OEM_NAME_SIZE),
BPB_BytsPerSec=boot_sector_state.sector_size,
BPB_SecPerClus=boot_sector_state.sectors_per_cluster,
BPB_RsvdSecCnt=boot_sector_state.reserved_sectors_cnt,
BPB_NumFATs=boot_sector_state.fat_tables_cnt,
BPB_RootEntCnt=boot_sector_state.entries_root_count,
# if fat type is 12 or 16 BPB_TotSec16 is filled and BPB_TotSec32 is 0x00 and vice versa
BPB_TotSec16=0x00 if boot_sector_state.fatfs_type == FAT32 else boot_sector_state.sectors_count,
BPB_Media=boot_sector_state.media_type,
BPB_FATSz16=boot_sector_state.sectors_per_fat_cnt,
BPB_SecPerTrk=boot_sector_state.sec_per_track,
BPB_NumHeads=boot_sector_state.num_heads,
BPB_HiddSec=boot_sector_state.hidden_sectors,
BPB_TotSec32=boot_sector_state.sectors_count if boot_sector_state.fatfs_type == FAT32 else 0x00,
BS_VolID=volume_uuid,
BS_VolLab=pad_string(boot_sector_state.volume_label,
size=BootSector.MAX_VOL_LAB_SIZE),
BS_FilSysType=pad_string(boot_sector_state.file_sys_type,
size=BootSector.MAX_FS_TYPE_SIZE)
)
) + pad_header + fat_tables_content + root_dir_content + data_content
)
def parse_boot_sector(self, binary_data: bytes) -> None:
self._parsed_header = BootSector.BOOT_SECTOR_HEADER.parse(binary_data)
if self._parsed_header is None:
raise NotInitialized('The boot sector header is not parsed successfully!')
if self._parsed_header['BPB_TotSec16'] != 0x00:
sectors_count_: int = self._parsed_header['BPB_TotSec16']
elif self._parsed_header['BPB_TotSec32'] != 0x00:
# uncomment for FAT32 implementation
# sectors_count_ = self._parsed_header['BPB_TotSec32']
# possible_fat_types = [FAT32]
assert self._parsed_header['BPB_FATSz16'] == 0
raise NotImplementedError('FAT32 not implemented!')
else:
raise InconsistentFATAttributes('The number of FS sectors cannot be zero!')
# in the current code assigning self._parsed_header['BPB_TotSec32'] is not reachable
# the option to assign it is kept for possibility to implement FAT32
sectors_per_fat_cnt_ = self._parsed_header['BPB_FATSz16'] or self._parsed_header['BPB_TotSec32']
if self._parsed_header['BPB_BytsPerSec'] not in ALLOWED_SECTOR_SIZES:
raise InconsistentFATAttributes(f'The number of bytes '
f"per sector is {self._parsed_header['BPB_BytsPerSec']}! "
f'The accepted values are {ALLOWED_SECTOR_SIZES}')
if self._parsed_header['BPB_SecPerClus'] not in ALLOWED_SECTORS_PER_CLUSTER:
raise InconsistentFATAttributes(f'The number of sectors per cluster '
f"is {self._parsed_header['BPB_SecPerClus']}"
f'The accepted values are {ALLOWED_SECTORS_PER_CLUSTER}')
total_root_bytes: int = self._parsed_header['BPB_RootEntCnt'] * FATDefaults.ENTRY_SIZE
root_dir_sectors_cnt_: int = total_root_bytes // self._parsed_header['BPB_BytsPerSec']
self.boot_sector_state = BootSectorState(oem_name=self._parsed_header['BS_OEMName'],
sector_size=self._parsed_header['BPB_BytsPerSec'],
sectors_per_cluster=self._parsed_header['BPB_SecPerClus'],
reserved_sectors_cnt=self._parsed_header['BPB_RsvdSecCnt'],
fat_tables_cnt=self._parsed_header['BPB_NumFATs'],
root_dir_sectors_cnt=root_dir_sectors_cnt_,
sectors_count=sectors_count_,
media_type=self._parsed_header['BPB_Media'],
sectors_per_fat_cnt=sectors_per_fat_cnt_,
sec_per_track=self._parsed_header['BPB_SecPerTrk'],
num_heads=self._parsed_header['BPB_NumHeads'],
hidden_sectors=self._parsed_header['BPB_HiddSec'],
volume_label=self._parsed_header['BS_VolLab'],
file_sys_type=self._parsed_header['BS_FilSysType'],
volume_uuid=self._parsed_header['BS_VolID'])
self.boot_sector_state.binary_image = binary_data
assert self.boot_sector_state.file_sys_type in (f'FAT{self.boot_sector_state.fatfs_type} ', 'FAT ')
def __str__(self) -> str:
if self._parsed_header is None:
return 'Boot sector is not initialized!'
res: str = 'Properties of the FATFS:\n'
for member in getmembers(self.boot_sector_state, lambda a: not(isroutine(a))):
prop_ = getattr(self.boot_sector_state, member[0])
if isinstance(prop_, int) or isinstance(prop_, str) and not member[0].startswith('_'):
res += f'{member[0]}: {prop_}\n'
return res
@property
def binary_image(self) -> bytes:
if len(self.boot_sector_state.binary_image) == 0:
raise NotInitialized('Boot sector is not generated nor initialized!')
bin_image_: bytes = self.boot_sector_state.binary_image
return bin_image_

View File

@ -5,11 +5,16 @@ from typing import Dict, Optional
from construct import Int16ul
from .fatfs_state import FATFSState
from .utils import (FAT12, FAT16, build_byte, clean_first_half_byte, clean_second_half_byte,
from .fatfs_state import BootSectorState
from .utils import (EMPTY_BYTE, FAT12, FAT16, build_byte, merge_by_half_byte_12_bit_little_endian,
split_by_half_byte_12_bit_little_endian)
def get_dir_size(is_root: bool, boot_sector: BootSectorState) -> int:
dir_size_: int = boot_sector.root_dir_sectors_cnt * boot_sector.sector_size if is_root else boot_sector.sector_size
return dir_size_
class Cluster:
"""
class Cluster handles values in FAT table and allocates sectors in data region.
@ -23,22 +28,21 @@ class Cluster:
def __init__(self,
cluster_id: int,
fatfs_state: FATFSState,
is_empty: bool = True) -> None:
boot_sector_state: BootSectorState,
init_: bool) -> None:
self.id: int = cluster_id
self.fatfs_state: FATFSState = fatfs_state
self.boot_sector_state: BootSectorState = boot_sector_state
self._next_cluster = None # type: Optional[Cluster]
if self.id == Cluster.RESERVED_BLOCK_ID:
self.is_empty = False
self.set_in_fat(self.INITIAL_BLOCK_SWITCH[self.fatfs_state.fatfs_type])
# First cluster in FAT is reserved, low 8 bits contains BPB_Media and the rest is filled with 1
# e.g. the esp32 media type is 0xF8 thus the FAT[0] = 0xFF8 for FAT12, 0xFFF8 for FAT16
if self.id == Cluster.RESERVED_BLOCK_ID and init_:
self.set_in_fat(self.INITIAL_BLOCK_SWITCH[self.boot_sector_state.fatfs_type])
return
self.cluster_data_address: int = self._compute_cluster_data_address()
self.is_empty = is_empty
assert self.cluster_data_address or self.is_empty
assert self.cluster_data_address
@property
def next_cluster(self): # type: () -> Optional[Cluster]
@ -50,21 +54,27 @@ class Cluster:
def _cluster_id_to_logical_position_in_bits(self, _id: int) -> int:
# computes address of the cluster in fat table
return self.fatfs_state.fatfs_type * _id # type: ignore
logical_position_: int = self.boot_sector_state.fatfs_type * _id
return logical_position_
@staticmethod
def compute_cluster_data_address(boot_sector_state: BootSectorState, id_: int) -> int:
data_address_: int = boot_sector_state.root_directory_start
if not id_ == Cluster.ROOT_BLOCK_ID:
# the first data cluster id is 2 (we have to subtract reserved cluster and cluster for root)
data_address_ = boot_sector_state.sector_size * (id_ - 2) + boot_sector_state.data_region_start
return data_address_
def _compute_cluster_data_address(self) -> int:
if self.id == Cluster.ROOT_BLOCK_ID:
return self.fatfs_state.root_directory_start # type: ignore
# the first data cluster id is 2 (we have to subtract reserved cluster and cluster for root)
return self.fatfs_state.sector_size * (self.id - 2) + self.fatfs_state.data_region_start # type: ignore
return self.compute_cluster_data_address(self.boot_sector_state, self.id)
def _set_first_half_byte(self, address: int, value: int) -> None:
clean_second_half_byte(self.fatfs_state.binary_image, address)
self.fatfs_state.binary_image[address] |= value << 4
self.boot_sector_state.binary_image[address] &= 0x0f
self.boot_sector_state.binary_image[address] |= value << 4
def _set_second_half_byte(self, address: int, value: int) -> None:
clean_first_half_byte(self.fatfs_state.binary_image, address)
self.fatfs_state.binary_image[address] |= value
self.boot_sector_state.binary_image[address] &= 0xf0
self.boot_sector_state.binary_image[address] |= value
@property
def fat_cluster_address(self) -> int:
@ -73,7 +83,52 @@ class Cluster:
@property
def real_cluster_address(self) -> int:
return self.fatfs_state.start_address + self.fat_cluster_address // 8 # type: ignore
cluster_address: int = self.boot_sector_state.start_address + self.fat_cluster_address // 8
return cluster_address
def get_from_fat(self) -> int:
"""
Calculating the value in the FAT block, that denotes if the block is full, empty, or chained to other block.
For FAT12 is the block stored in one and half byte. If the order of the block is even the first byte and second
half of the second byte belongs to the block. First half of the second byte and the third byte belongs to
the second block.
e.g. b'\xff\x0f\x00' stores two blocks. First of them is evenly ordered (index 0) and is set to 0xfff,
that means full block that is final in chain of blocks
and second block is set to 0x000 that means empty block.
three bytes - AB XC YZ - stores two blocks - CAB YZX
"""
address_: int = self.real_cluster_address
bin_img_: bytearray = self.boot_sector_state.binary_image
if self.boot_sector_state.fatfs_type == FAT12:
if self.fat_cluster_address % 8 == 0:
# even block
byte_zero_full = bin_img_[self.real_cluster_address]
byte_one_second_half = bin_img_[self.real_cluster_address + 1] & 0x0F
merged_byte_: int = merge_by_half_byte_12_bit_little_endian(byte_zero_full & 0x0F,
(byte_zero_full & 0xF0) >> 4,
byte_one_second_half)
else:
# odd block
byte_one_full = bin_img_[self.real_cluster_address + 1]
byte_zero_second_half = (bin_img_[self.real_cluster_address] & 0xF0) >> 4
merged_byte_ = merge_by_half_byte_12_bit_little_endian(byte_zero_second_half,
byte_one_full & 0x0F,
(byte_one_full & 0xF0) >> 4)
return merged_byte_
if self.boot_sector_state.fatfs_type == FAT16:
return int.from_bytes(bin_img_[address_:address_ + 2], byteorder='little')
raise NotImplementedError('Only valid fatfs types are FAT12 and FAT16.')
@property
def is_empty(self) -> bool:
"""
The property method takes a look into the binary array and checks if the bytes ordered by little endian
and relates to the current cluster are all zeros (which denotes they are empty).
"""
return self.get_from_fat() == 0x00
def set_in_fat(self, value: int) -> None:
"""
@ -89,19 +144,21 @@ class Cluster:
"""
# value must fit into number of bits of the fat (12, 16 or 32)
assert value <= (1 << self.fatfs_state.fatfs_type) - 1
assert value <= (1 << self.boot_sector_state.fatfs_type) - 1
half_bytes = split_by_half_byte_12_bit_little_endian(value)
bin_img_: bytearray = self.boot_sector_state.binary_image
if self.fatfs_state.fatfs_type == FAT12:
if self.boot_sector_state.fatfs_type == FAT12:
assert merge_by_half_byte_12_bit_little_endian(*half_bytes) == value
if self.fat_cluster_address % 8 == 0:
self.fatfs_state.binary_image[self.real_cluster_address] = build_byte(half_bytes[1], half_bytes[0])
bin_img_[self.real_cluster_address] = build_byte(half_bytes[1], half_bytes[0])
self._set_second_half_byte(self.real_cluster_address + 1, half_bytes[2])
elif self.fat_cluster_address % 8 != 0:
self._set_first_half_byte(self.real_cluster_address, half_bytes[0])
self.fatfs_state.binary_image[self.real_cluster_address + 1] = build_byte(half_bytes[2], half_bytes[1])
elif self.fatfs_state.fatfs_type == FAT16:
self.fatfs_state.binary_image[self.real_cluster_address:self.real_cluster_address + 2] = Int16ul.build(
value)
bin_img_[self.real_cluster_address + 1] = build_byte(half_bytes[2], half_bytes[1])
elif self.boot_sector_state.fatfs_type == FAT16:
bin_img_[self.real_cluster_address:self.real_cluster_address + 2] = Int16ul.build(value)
assert self.get_from_fat() == value
@property
def is_root(self) -> bool:
@ -111,10 +168,9 @@ class Cluster:
"""
This method sets bits in FAT table to `allocated` and clean the corresponding sector(s)
"""
self.is_empty = False
self.set_in_fat(self.ALLOCATED_BLOCK_SWITCH[self.fatfs_state.fatfs_type])
self.set_in_fat(self.ALLOCATED_BLOCK_SWITCH[self.boot_sector_state.fatfs_type])
cluster_start = self.cluster_data_address
dir_size = self.fatfs_state.get_dir_size(self.is_root)
dir_size = get_dir_size(self.is_root, self.boot_sector_state)
cluster_end = cluster_start + dir_size
self.fatfs_state.binary_image[cluster_start:cluster_end] = dir_size * b'\x00'
self.boot_sector_state.binary_image[cluster_start:cluster_end] = dir_size * EMPTY_BYTE

View File

@ -1,14 +1,14 @@
# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
from typing import Any, List, Optional
from typing import List, Optional, Union
from construct import Const, Int8ul, Int16ul, Int32ul, PaddedString, Struct
from .exceptions import LowerCaseException, TooLongNameException
from .fatfs_state import FATFSState
from .utils import (DATETIME, FATFS_INCEPTION, MAX_EXT_SIZE, MAX_NAME_SIZE, SHORT_NAMES_ENCODING, build_date_entry,
build_time_entry, is_valid_fatfs_name, pad_string)
from .utils import (DATETIME, EMPTY_BYTE, FATFS_INCEPTION, MAX_EXT_SIZE, MAX_NAME_SIZE, SHORT_NAMES_ENCODING,
FATDefaults, build_date_entry, build_time_entry, is_valid_fatfs_name, pad_string)
class Entry:
@ -45,12 +45,12 @@ class Entry:
'DIR_Name' / PaddedString(MAX_NAME_SIZE, SHORT_NAMES_ENCODING),
'DIR_Name_ext' / PaddedString(MAX_EXT_SIZE, SHORT_NAMES_ENCODING),
'DIR_Attr' / Int8ul,
'DIR_NTRes' / Const(b'\x00'),
'DIR_CrtTimeTenth' / Const(b'\x00'), # ignored by esp-idf fatfs library
'DIR_CrtTime' / Int16ul, # ignored by esp-idf fatfs library
'DIR_CrtDate' / Int16ul, # ignored by esp-idf fatfs library
'DIR_LstAccDate' / Int16ul, # must be same as DIR_WrtDate
'DIR_FstClusHI' / Const(b'\x00\x00'),
'DIR_NTRes' / Const(EMPTY_BYTE),
'DIR_CrtTimeTenth' / Const(EMPTY_BYTE), # ignored by esp-idf fatfs library
'DIR_CrtTime' / Int16ul, # ignored by esp-idf fatfs library
'DIR_CrtDate' / Int16ul, # ignored by esp-idf fatfs library
'DIR_LstAccDate' / Int16ul, # must be same as DIR_WrtDate
'DIR_FstClusHI' / Const(2 * EMPTY_BYTE),
'DIR_WrtTime' / Int16ul,
'DIR_WrtDate' / Int16ul,
'DIR_FstClusLO' / Int16ul,
@ -63,7 +63,7 @@ class Entry:
fatfs_state: FATFSState) -> None:
self.fatfs_state: FATFSState = fatfs_state
self.id: int = entry_id
self.entry_address: int = parent_dir_entries_address + self.id * self.fatfs_state.entry_size
self.entry_address: int = parent_dir_entries_address + self.id * FATDefaults.ENTRY_SIZE
self._is_alias: bool = False
self._is_empty: bool = True
@ -72,12 +72,14 @@ class Entry:
return self._is_empty
@staticmethod
def _parse_entry(entry_bytearray: Optional[bytearray]) -> dict:
return Entry.ENTRY_FORMAT_SHORT_NAME.parse(entry_bytearray) # type: ignore
def _parse_entry(entry_bytearray: Union[bytearray, bytes]) -> dict:
entry_: dict = Entry.ENTRY_FORMAT_SHORT_NAME.parse(entry_bytearray)
return entry_
@staticmethod
def _build_entry(**kwargs) -> Any: # type: ignore
return Entry.ENTRY_FORMAT_SHORT_NAME.build(dict(**kwargs))
def _build_entry(**kwargs) -> bytes: # type: ignore
entry_: bytes = Entry.ENTRY_FORMAT_SHORT_NAME.build(dict(**kwargs))
return entry_
@staticmethod
def _build_entry_long(names: List[bytes], checksum: int, order: int, is_last: bool, entity_type: int) -> bytes:
@ -106,15 +108,26 @@ class Entry:
return long_entry
@property
def entry_bytes(self) -> Any:
return self.fatfs_state.binary_image[self.entry_address: self.entry_address + self.fatfs_state.entry_size]
def entry_bytes(self) -> bytes:
"""
:returns: Bytes defining the entry belonging to the given instance.
"""
start_: int = self.entry_address
entry_: bytes = self.fatfs_state.binary_image[start_: start_ + FATDefaults.ENTRY_SIZE]
return entry_
@entry_bytes.setter
def entry_bytes(self, value: int) -> None:
self.fatfs_state.binary_image[self.entry_address: self.entry_address + self.fatfs_state.entry_size] = value
def entry_bytes(self, value: bytes) -> None:
"""
:param value: new content of the entry
:returns: None
The setter sets the content of the entry in bytes.
"""
self.fatfs_state.binary_image[self.entry_address: self.entry_address + FATDefaults.ENTRY_SIZE] = value
def _clean_entry(self) -> None:
self.entry_bytes: bytes = self.fatfs_state.entry_size * b'\x00'
self.entry_bytes: bytes = FATDefaults.ENTRY_SIZE * EMPTY_BYTE
def allocate_entry(self,
first_cluster_id: int,
@ -173,7 +186,7 @@ class Entry:
)
start_address = self.entry_address
end_address = start_address + self.fatfs_state.entry_size
end_address = start_address + FATDefaults.ENTRY_SIZE
if lfn_order in (self.SHORT_ENTRY, self.SHORT_ENTRY_LN):
date_entry_: int = build_date_entry(*date)
time_entry: int = build_time_entry(*time)
@ -198,6 +211,13 @@ class Entry:
self.ATTR_LONG_NAME)
def update_content_size(self, content_size: int) -> None:
"""
:param content_size: the new size of the file content in bytes
:returns: None
This method parses the binary entry to the construct structure, updates the content size of the file
and builds new binary entry.
"""
parsed_entry = self._parse_entry(self.entry_bytes)
parsed_entry.DIR_FileSize = content_size # type: ignore
self.entry_bytes = Entry.ENTRY_FORMAT_SHORT_NAME.build(parsed_entry)

View File

@ -29,6 +29,13 @@ class TooLongNameException(Exception):
pass
class NotInitialized(Exception):
"""
Exception is raised when the user tries to access not initialized property
"""
pass
class WLNotInitialized(Exception):
"""
Exception is raised when the user tries to write fatfs not initialized with wear levelling

View File

@ -1,9 +1,11 @@
# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
from typing import List
from .cluster import Cluster
from .exceptions import NoFreeClusterException
from .fatfs_state import FATFSState
from .fatfs_state import BootSectorState
class FAT:
@ -11,19 +13,24 @@ class FAT:
The FAT represents the FAT region in file system. It is responsible for storing clusters
and chaining them in case we need to extend file or directory to more clusters.
"""
def __init__(self,
fatfs_state: FATFSState,
reserved_sectors_cnt: int) -> None:
self.fatfs_state = fatfs_state
self.reserved_sectors_cnt = reserved_sectors_cnt
self.clusters = [Cluster(cluster_id=i, fatfs_state=self.fatfs_state) for i in
range(1, self.fatfs_state.clusters)]
def allocate_root_dir(self) -> None:
"""
The root directory is implicitly created with the FatFS,
its block is on the index 1 (second index) and is allocated implicitly.
"""
self.clusters[Cluster.ROOT_BLOCK_ID].allocate_cluster()
# update root directory record
self.clusters[0].allocate_cluster()
# add first reserved cluster
self.clusters = [Cluster(cluster_id=Cluster.RESERVED_BLOCK_ID, fatfs_state=self.fatfs_state)] + self.clusters
def __init__(self, boot_sector_state: BootSectorState, init_: bool) -> None:
self.boot_sector_state = boot_sector_state
self.clusters: List[Cluster] = [Cluster(cluster_id=i,
boot_sector_state=self.boot_sector_state,
init_=init_) for i in range(self.boot_sector_state.clusters)]
if init_:
self.allocate_root_dir()
def parse_fat_sector(self) -> None:
pass
def find_free_cluster(self) -> Cluster:
# finds first empty cluster and allocates it

View File

@ -1,44 +1,11 @@
# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
from construct import Const, Int8ul, Int16ul, Int32ul, PaddedString, Struct
from .utils import (BYTES_PER_DIRECTORY_ENTRY, SHORT_NAMES_ENCODING, get_fatfs_type, get_non_data_sectors_cnt,
number_of_clusters, read_filesystem)
from .boot_sector import BootSector
from .utils import read_filesystem
class FATFSParser:
MAX_VOL_LAB_SIZE = 11
MAX_OEM_NAME_SIZE = 8
MAX_FS_TYPE_SIZE = 8
# the FAT specification defines 512 bytes for the boot sector header
BOOT_HEADER_SIZE = 512
BOOT_SECTOR_HEADER = Struct(
'BS_jmpBoot' / Const(b'\xeb\xfe\x90'),
'BS_OEMName' / PaddedString(MAX_OEM_NAME_SIZE, SHORT_NAMES_ENCODING),
'BPB_BytsPerSec' / Int16ul,
'BPB_SecPerClus' / Int8ul,
'BPB_RsvdSecCnt' / Int16ul,
'BPB_NumFATs' / Int8ul,
'BPB_RootEntCnt' / Int16ul,
'BPB_TotSec16' / Int16ul,
'BPB_Media' / Int8ul,
'BPB_FATSz16' / Int16ul,
'BPB_SecPerTrk' / Int16ul,
'BPB_NumHeads' / Int16ul,
'BPB_HiddSec' / Int32ul,
'BPB_TotSec32' / Int32ul,
'BS_DrvNum' / Const(b'\x80'),
'BS_Reserved1' / Const(b'\x00'),
'BS_BootSig' / Const(b'\x29'),
'BS_VolID' / Int32ul,
'BS_VolLab' / PaddedString(MAX_VOL_LAB_SIZE, SHORT_NAMES_ENCODING),
'BS_FilSysType' / PaddedString(MAX_FS_TYPE_SIZE, SHORT_NAMES_ENCODING),
'BS_EMPTY' / Const(448 * b'\x00'),
'Signature_word' / Const(b'\x55\xAA')
)
assert BOOT_SECTOR_HEADER.sizeof() == BOOT_HEADER_SIZE
def __init__(self, image_file_path: str, wl_support: bool = False) -> None:
if wl_support:
@ -46,22 +13,5 @@ class FATFSParser:
self.fatfs = read_filesystem(image_file_path)
# when wl is not supported we expect boot sector to be the first
self.parsed_header = FATFSParser.BOOT_SECTOR_HEADER.parse(self.fatfs[:FATFSParser.BOOT_HEADER_SIZE])
def print(self) -> None:
print('Properties of the FATFS:')
for key in self.parsed_header.keys():
if key in ('_io', 'BS_EMPTY', 'Signature_word'):
continue
print(' {}: {}'.format(key.replace('BPB_', '').replace('BS_', ''), self.parsed_header[key]))
root_dir_cnt = (self.parsed_header['BPB_RootEntCnt'] * BYTES_PER_DIRECTORY_ENTRY) // self.parsed_header[
'BPB_BytsPerSec']
non_data_sectors = get_non_data_sectors_cnt(self.parsed_header['BPB_RsvdSecCnt'],
# this has to be changed when FAT32 is supported
self.parsed_header['BPB_FATSz16'], root_dir_cnt)
data_clusters = self.parsed_header['BPB_TotSec16'] - non_data_sectors
clusters_num = number_of_clusters(data_clusters, self.parsed_header['BPB_SecPerClus'])
assert self.parsed_header['BPB_BytsPerSec'] in (512, 1024, 2048, 4096)
assert self.parsed_header['BPB_SecPerClus'] in (1, 2, 4, 8, 16, 32, 64, 128)
print(f' Clusters number: {clusters_num}')
print(f' FATFS type: FAT{get_fatfs_type(clusters_num)}')
self.parsed_header = BootSector.BOOT_SECTOR_HEADER.parse(self.fatfs[:BootSector.BOOT_HEADER_SIZE])
print(BootSector)

View File

@ -2,10 +2,11 @@
# SPDX-License-Identifier: Apache-2.0
from textwrap import dedent
from typing import Optional
from .exceptions import InconsistentFATAttributes
from .utils import (FAT12, FAT12_MAX_CLUSTERS, FAT16, FAT16_MAX_CLUSTERS, get_fatfs_type, get_non_data_sectors_cnt,
number_of_clusters)
from .utils import (ALLOWED_SECTOR_SIZES, FAT12, FAT12_MAX_CLUSTERS, FAT16, FAT16_MAX_CLUSTERS, FATDefaults,
get_fatfs_type, get_non_data_sectors_cnt, number_of_clusters)
class FATFSState:
@ -14,7 +15,6 @@ class FATFSState:
"""
def __init__(self,
entry_size: int,
sector_size: int,
reserved_sectors_cnt: int,
root_dir_sectors_cnt: int,
@ -30,34 +30,88 @@ class FATFSState:
hidden_sectors: int,
file_sys_type: str,
use_default_datetime: bool,
explicit_fat_type: int = None,
explicit_fat_type: Optional[int] = None,
long_names_enabled: bool = False):
self._explicit_fat_type = explicit_fat_type
self._binary_image: bytearray = bytearray(b'')
self.fat_tables_cnt: int = fat_tables_cnt
self.oem_name: str = oem_name
self.file_sys_type: str = file_sys_type
self.sec_per_track: int = sec_per_track
self.hidden_sectors: int = hidden_sectors
self.volume_label: str = volume_label
self.media_type: int = media_type
self.boot_sector_state = BootSectorState(oem_name=oem_name,
sector_size=sector_size,
sectors_per_cluster=sectors_per_cluster,
reserved_sectors_cnt=reserved_sectors_cnt,
fat_tables_cnt=fat_tables_cnt,
root_dir_sectors_cnt=root_dir_sectors_cnt,
sectors_count=size // sector_size,
media_type=media_type,
sectors_per_fat_cnt=sectors_per_fat,
sec_per_track=sec_per_track,
num_heads=num_heads,
hidden_sectors=hidden_sectors,
volume_label=volume_label,
file_sys_type=file_sys_type,
volume_uuid=-1)
self._explicit_fat_type: Optional[int] = explicit_fat_type
self.long_names_enabled: bool = long_names_enabled
self.entry_size: int = entry_size
self.num_heads: int = num_heads
self.sector_size: int = sector_size
self.root_dir_sectors_cnt: int = root_dir_sectors_cnt
self.reserved_sectors_cnt: int = reserved_sectors_cnt
self.size: int = size
self.sectors_per_fat_cnt: int = sectors_per_fat
self.sectors_per_cluster: int = sectors_per_cluster
self.use_default_datetime: bool = use_default_datetime
if self.clusters in (FAT12_MAX_CLUSTERS, FAT16_MAX_CLUSTERS):
if self.boot_sector_state.clusters in (FAT12_MAX_CLUSTERS, FAT16_MAX_CLUSTERS):
print('WARNING: It is not recommended to create FATFS with bounding '
f'count of clusters: {FAT12_MAX_CLUSTERS} or {FAT16_MAX_CLUSTERS}')
self.check_fat_type()
@property
def binary_image(self) -> bytearray:
return self.boot_sector_state.binary_image
@binary_image.setter
def binary_image(self, value: bytearray) -> None:
self.boot_sector_state.binary_image = value
def check_fat_type(self) -> None:
_type = self.boot_sector_state.fatfs_type
if self._explicit_fat_type is not None and self._explicit_fat_type != _type:
raise InconsistentFATAttributes(dedent(
f"""FAT type you specified is inconsistent with other attributes of the system.
The specified FATFS type: FAT{self._explicit_fat_type}
The actual FATFS type: FAT{_type}"""))
if _type not in (FAT12, FAT16):
raise NotImplementedError('FAT32 is currently not supported.')
class BootSectorState:
# pylint: disable=too-many-instance-attributes
def __init__(self,
oem_name: str,
sector_size: int,
sectors_per_cluster: int,
reserved_sectors_cnt: int,
fat_tables_cnt: int,
root_dir_sectors_cnt: int,
sectors_count: int,
media_type: int,
sectors_per_fat_cnt: int,
sec_per_track: int,
num_heads: int,
hidden_sectors: int,
volume_label: str,
file_sys_type: str,
volume_uuid: int = -1) -> None:
self.oem_name: str = oem_name
self.sector_size: int = sector_size
assert self.sector_size in ALLOWED_SECTOR_SIZES
self.sectors_per_cluster: int = sectors_per_cluster
self.reserved_sectors_cnt: int = reserved_sectors_cnt
self.fat_tables_cnt: int = fat_tables_cnt
self.root_dir_sectors_cnt: int = root_dir_sectors_cnt
self.sectors_count: int = sectors_count
self.media_type: int = media_type
self.sectors_per_fat_cnt: int = sectors_per_fat_cnt
self.sec_per_track: int = sec_per_track
self.num_heads: int = num_heads
self.hidden_sectors: int = hidden_sectors
self.volume_label: str = volume_label
self.file_sys_type: str = file_sys_type
self.volume_uuid: int = volume_uuid
self._binary_image: bytearray = bytearray(b'')
@property
def binary_image(self) -> bytearray:
return self._binary_image
@ -66,50 +120,46 @@ class FATFSState:
def binary_image(self, value: bytearray) -> None:
self._binary_image = value
def get_dir_size(self, is_root: bool) -> int:
return self.root_dir_sectors_cnt * self.sector_size if is_root else self.sector_size
@property
def start_address(self) -> int:
return self.sector_size * self.reserved_sectors_cnt
@property
def data_sectors(self) -> int:
return (self.size // self.sector_size) - self.non_data_sectors
@property
def non_data_sectors(self) -> int:
return get_non_data_sectors_cnt(self.reserved_sectors_cnt, self.sectors_per_fat_cnt, # type: ignore
self.root_dir_sectors_cnt)
def size(self) -> int:
return self.sector_size * self.sectors_count
@property
def data_region_start(self) -> int:
return self.non_data_sectors * self.sector_size
@property
def clusters(self) -> int:
return number_of_clusters(self.data_sectors, self.sectors_per_cluster) # type: ignore
@property
def root_directory_start(self) -> int:
return (self.reserved_sectors_cnt + self.sectors_per_fat_cnt) * self.sector_size
def check_fat_type(self) -> None:
_type = self.fatfs_type
if self._explicit_fat_type is not None and self._explicit_fat_type != _type:
raise InconsistentFATAttributes(dedent(
f"""FAT type you specified is inconsistent with other attributes of the system.
The specified FATFS type: FAT{self._explicit_fat_type}
The actual FATFS type: FAT{_type}"""))
if _type not in (FAT12, FAT16):
raise NotImplementedError('FAT32 is currently not supported.')
@property
def fatfs_type(self) -> int:
# variable typed_fatfs_type must be explicitly typed to avoid mypy error
typed_fatfs_type: int = get_fatfs_type(self.clusters)
return typed_fatfs_type
@property
def clusters(self) -> int:
clusters_cnt_: int = number_of_clusters(self.data_sectors, self.sectors_per_cluster)
return clusters_cnt_
@property
def data_sectors(self) -> int:
# self.sector_size is checked in constructor if has one of allowed values (ALLOWED_SECTOR_SIZES)
return (self.size // self.sector_size) - self.non_data_sectors
@property
def non_data_sectors(self) -> int:
non_data_sectors_: int = get_non_data_sectors_cnt(self.reserved_sectors_cnt,
self.sectors_per_fat_cnt,
self.root_dir_sectors_cnt)
return non_data_sectors_
@property
def start_address(self) -> int:
return self.sector_size * self.reserved_sectors_cnt
@property
def entries_root_count(self) -> int:
return (self.root_dir_sectors_cnt * self.sector_size) // self.entry_size
entries_root_count_: int = (self.root_dir_sectors_cnt * self.sector_size) // FATDefaults.ENTRY_SIZE
return entries_root_count_
@property
def root_directory_start(self) -> int:
return (self.reserved_sectors_cnt + self.sectors_per_fat_cnt) * self.sector_size

View File

@ -12,7 +12,7 @@ from .fatfs_state import FATFSState
from .long_filename_utils import (build_lfn_full_name, build_lfn_unique_entry_name_order,
get_required_lfn_entries_count, split_name_to_lfn_entries,
split_name_to_lfn_entry_blocks)
from .utils import (DATETIME, MAX_EXT_SIZE, MAX_NAME_SIZE, build_lfn_short_entry_name, lfn_checksum,
from .utils import (DATETIME, MAX_EXT_SIZE, MAX_NAME_SIZE, FATDefaults, build_lfn_short_entry_name, lfn_checksum,
required_clusters_count, split_content_into_sectors, split_to_name_and_extension)
@ -51,7 +51,7 @@ class File:
self.entry.update_content_size(len(content))
# we assume that the correct amount of clusters is allocated
current_cluster = self._first_cluster
for content_part in split_content_into_sectors(content, self.fatfs_state.sector_size):
for content_part in split_content_into_sectors(content, self.fatfs_state.boot_sector_state.sector_size):
content_as_list = content_part
if current_cluster is None:
raise FatalError('No free space left!')
@ -88,7 +88,7 @@ class Directory:
self.extension: str = extension
self.fat: FAT = fat
self.size: int = size or self.fatfs_state.sector_size
self.size: int = size or self.fatfs_state.boot_sector_state.sector_size
# if directory is root its parent is itself
self.parent: Directory = parent or self
@ -114,29 +114,30 @@ class Directory:
def name_equals(self, name: str, extension: str) -> bool:
return self.name == name and self.extension == extension
@property
def entries_count(self) -> int:
entries_count_: int = self.size // FATDefaults.ENTRY_SIZE
return entries_count_
def create_entries(self, cluster: Cluster) -> List[Entry]:
return [Entry(entry_id=i,
parent_dir_entries_address=cluster.cluster_data_address,
fatfs_state=self.fatfs_state)
for i in range(self.size // self.fatfs_state.entry_size)]
for i in range(self.entries_count)]
def init_directory(self) -> None:
self.entries = self.create_entries(self._first_cluster)
# the root directory doesn't contain link to itself nor the parent
if not self.is_root:
self_dir_reference: Entry = self.find_free_entry() or self.chain_directory()
self_dir_reference.allocate_entry(first_cluster_id=self.first_cluster.id,
entity_name=self.CURRENT_DIRECTORY,
entity_extension='',
entity_type=self.ENTITY_TYPE)
self.first_cluster = self._first_cluster
parent_dir_reference: Entry = self.find_free_entry() or self.chain_directory()
parent_dir_reference.allocate_entry(first_cluster_id=self.parent.first_cluster.id,
entity_name=self.PARENT_DIRECTORY,
entity_extension='',
entity_type=self.parent.ENTITY_TYPE)
self.parent.first_cluster = self.parent.first_cluster
if self.is_root:
return
# if the directory is not root we initialize the reference to itself and to the parent directory
for dir_id, name_ in ((self, self.CURRENT_DIRECTORY), (self.parent, self.PARENT_DIRECTORY)):
new_dir_: Entry = self.find_free_entry() or self.chain_directory()
new_dir_.allocate_entry(first_cluster_id=dir_id.first_cluster.id,
entity_name=name_,
entity_extension='',
entity_type=dir_id.ENTITY_TYPE)
def lookup_entity(self, object_name: str, extension: str): # type: ignore
for entity in self.entities:
@ -144,12 +145,23 @@ class Directory:
return entity
return None
@staticmethod
def _if_end_of_path(path_as_list: List[str]) -> bool:
"""
:param path_as_list: path split into the list
:returns: True if the file is the leaf of the directory tree, False otherwise
The method is part of the base of recursion,
determines if the path is target file or directory in the tree folder structure.
"""
return len(path_as_list) == 1
def recursive_search(self, path_as_list, current_dir): # type: ignore
name, extension = split_to_name_and_extension(path_as_list[0])
next_obj = current_dir.lookup_entity(name, extension)
if next_obj is None:
raise FileNotFoundError('No such file or directory!')
if len(path_as_list) == 1 and next_obj.name_equals(name, extension):
if self._if_end_of_path(path_as_list) and next_obj.name_equals(name, extension):
return next_obj
return self.recursive_search(path_as_list[1:], next_obj)
@ -165,10 +177,16 @@ class Directory:
current = current.next_cluster
new_cluster: Cluster = self.fat.find_free_cluster()
current.set_in_fat(new_cluster.id)
assert current is not new_cluster
current.next_cluster = new_cluster
self.entries += self.create_entries(new_cluster)
def chain_directory(self) -> Entry:
"""
:returns: First free entry
The method adds new Cluster to the Directory and returns first free entry.
"""
self._extend_directory()
free_entry: Entry = self.find_free_entry()
if free_entry is None:
@ -299,7 +317,8 @@ class Directory:
"""
entity_to_write: Entry = self.recursive_search(path, self)
if isinstance(entity_to_write, File):
clusters_cnt: int = required_clusters_count(cluster_size=self.fatfs_state.sector_size, content=content)
clusters_cnt: int = required_clusters_count(cluster_size=self.fatfs_state.boot_sector_state.sector_size,
content=content)
self.fat.allocate_chain(entity_to_write.first_cluster, clusters_cnt)
entity_to_write.write(content)
else:

View File

@ -12,20 +12,39 @@ from construct import BitsInteger, BitStruct, Int16ul
FAT12_MAX_CLUSTERS: int = 4085
FAT16_MAX_CLUSTERS: int = 65525
PAD_CHAR: int = 0x20
FAT12: int = 12
FAT16: int = 16
FAT32: int = 32
FULL_BYTE: bytes = b'\xff'
EMPTY_BYTE: bytes = b'\x00'
# redundant
BYTES_PER_DIRECTORY_ENTRY: int = 32
UINT32_MAX: int = (1 << 32) - 1
MAX_NAME_SIZE: int = 8
MAX_EXT_SIZE: int = 3
DATETIME = Tuple[int, int, int]
FATFS_INCEPTION: datetime = datetime(1980, 1, 1, 0, 0, 0, 0)
FATFS_INCEPTION_YEAR: int = 1980
FATFS_INCEPTION: datetime = datetime(FATFS_INCEPTION_YEAR, 1, 1, 0, 0, 0, 0)
FATFS_MAX_HOURS = 24
FATFS_MAX_MINUTES = 60
FATFS_MAX_SECONDS = 60
FATFS_MAX_DAYS = 31
FATFS_MAX_MONTHS = 12
FATFS_MAX_YEARS = 127
FATFS_SECONDS_GRANULARITY: int = 2
# long names are encoded to two bytes in utf-16
LONG_NAMES_ENCODING: str = 'utf-16'
SHORT_NAMES_ENCODING: str = 'utf-8'
ALLOWED_SECTOR_SIZES: List[int] = [512, 1024, 2048, 4096]
ALLOWED_SECTORS_PER_CLUSTER: List[int] = [1, 2, 4, 8, 16, 32, 64, 128]
def crc32(input_values: List[int], crc: int) -> int:
"""
@ -60,11 +79,15 @@ def generate_4bytes_random() -> int:
return uuid.uuid4().int & 0xFFFFFFFF
def pad_string(content: str, size: Optional[int] = None, pad: int = 0x20) -> str:
def pad_string(content: str, size: Optional[int] = None, pad: int = PAD_CHAR) -> str:
# cut string if longer and fill with pad character if shorter than size
return content.ljust(size or len(content), chr(pad))[:size]
def right_strip_string(content: str, pad: int = PAD_CHAR) -> str:
return content.rstrip(chr(pad))
def build_lfn_short_entry_name(name: str, extension: str, order: int) -> str:
return '{}{}'.format(pad_string(content=name[:MAX_NAME_SIZE - 2] + '~' + chr(order), size=MAX_NAME_SIZE),
pad_string(extension[:MAX_EXT_SIZE], size=MAX_EXT_SIZE))
@ -84,7 +107,7 @@ def lfn_checksum(short_entry_name: str) -> int:
def convert_to_utf16_and_pad(content: str,
expected_size: int,
pad: bytes = b'\xff',
pad: bytes = FULL_BYTE,
terminator: bytes = b'\x00\x00') -> bytes:
# we need to get rid of the Byte order mark 0xfeff or 0xfffe, fatfs does not use it
bom_utf16: bytes = b'\xfe\xff'
@ -103,31 +126,19 @@ def is_valid_fatfs_name(string: str) -> bool:
return string == string.upper()
def split_by_half_byte_12_bit_little_endian(value: int) -> DATETIME:
def split_by_half_byte_12_bit_little_endian(value: int) -> Tuple[int, int, int]:
value_as_bytes: bytes = Int16ul.build(value)
return value_as_bytes[0] & 0x0f, value_as_bytes[0] >> 4, value_as_bytes[1] & 0x0f
def merge_by_half_byte_12_bit_little_endian(v1: int, v2: int, v3: int) -> int:
return v1 | v2 << 4 | v3 << 8
def build_byte(first_half: int, second_half: int) -> int:
return (first_half << 4) | second_half
def clean_first_half_byte(bytes_array: bytearray, address: int) -> None:
"""
the function sets to zero first four bits of the byte.
E.g. 10111100 -> 10110000
"""
bytes_array[address] &= 0xf0
def clean_second_half_byte(bytes_array: bytearray, address: int) -> None:
"""
the function sets to zero last four bits of the byte.
E.g. 10111100 -> 00001100
"""
bytes_array[address] &= 0x0f
def split_content_into_sectors(content: bytes, sector_size: int) -> List[bytes]:
result = []
clusters_cnt: int = required_clusters_count(cluster_size=sector_size, content=content)
@ -145,20 +156,20 @@ def get_args_for_partition_generator(desc: str) -> argparse.Namespace:
default='fatfs_image.img',
help='Filename of the generated fatfs image')
parser.add_argument('--partition_size',
default=1024 * 1024,
default=FATDefaults.SIZE,
help='Size of the partition in bytes')
parser.add_argument('--sector_size',
default=4096,
default=FATDefaults.SECTOR_SIZE,
type=int,
choices=[512, 1024, 2048, 4096],
choices=ALLOWED_SECTOR_SIZES,
help='Size of the partition in bytes')
parser.add_argument('--sectors_per_cluster',
default=1,
type=int,
choices=[1, 2, 4, 8, 16, 32, 64, 128],
choices=ALLOWED_SECTORS_PER_CLUSTER,
help='Number of sectors per cluster')
parser.add_argument('--root_entry_count',
default=512,
default=FATDefaults.ROOT_ENTRIES_COUNT,
help='Number of entries in the root directory')
parser.add_argument('--long_name_support',
action='store_true',
@ -212,10 +223,10 @@ def build_date_entry(year: int, mon: int, mday: int) -> int:
:returns: 16 bit integer number (7 bits for year, 4 bits for month and 5 bits for day of the month)
"""
assert year in range(1980, 2107)
assert mon in range(1, 13)
assert mday in range(1, 32)
return int.from_bytes(DATE_ENTRY.build(dict(year=year - 1980, month=mon, day=mday)), 'big')
assert year in range(FATFS_INCEPTION_YEAR, FATFS_INCEPTION_YEAR + FATFS_MAX_YEARS)
assert mon in range(1, FATFS_MAX_MONTHS + 1)
assert mday in range(1, FATFS_MAX_DAYS + 1)
return int.from_bytes(DATE_ENTRY.build(dict(year=year - FATFS_INCEPTION_YEAR, month=mon, day=mday)), 'big')
def build_time_entry(hour: int, minute: int, sec: int) -> int:
@ -226,7 +237,35 @@ def build_time_entry(hour: int, minute: int, sec: int) -> int:
:returns: 16 bit integer number (5 bits for hour, 6 bits for minute and 5 bits for second)
"""
assert hour in range(0, 24)
assert minute in range(0, 60)
assert sec in range(0, 60)
return int.from_bytes(TIME_ENTRY.build(dict(hour=hour, minute=minute, second=sec // 2)), 'big')
assert hour in range(FATFS_MAX_HOURS)
assert minute in range(FATFS_MAX_MINUTES)
assert sec in range(FATFS_MAX_SECONDS)
return int.from_bytes(TIME_ENTRY.build(
dict(hour=hour, minute=minute, second=sec // FATFS_SECONDS_GRANULARITY)),
byteorder='big'
)
class FATDefaults:
# FATFS defaults
SIZE: int = 1024 * 1024
RESERVED_SECTORS_COUNT: int = 1
FAT_TABLES_COUNT: int = 1
SECTORS_PER_CLUSTER: int = 1
SECTOR_SIZE: int = 0x1000
SECTORS_PER_FAT: int = 1
HIDDEN_SECTORS: int = 0
ENTRY_SIZE: int = 32
NUM_HEADS: int = 0xff
OEM_NAME: str = 'MSDOS5.0'
SEC_PER_TRACK: int = 0x3f
VOLUME_LABEL: str = 'Espressif'
FILE_SYS_TYPE: str = 'FAT'
ROOT_ENTRIES_COUNT: int = 512 # number of entries in the root directory
MEDIA_TYPE: int = 0xf8
SIGNATURE_WORD: bytes = b'\x55\xAA'
# wear levelling defaults
VERSION: int = 2
TEMP_BUFFER_SIZE: int = 32
UPDATE_RATE: int = 16

View File

@ -0,0 +1,51 @@
# SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
import os
import sys
from typing import Tuple
from fatfsgen_utils.boot_sector import BootSector
from fatfsgen_utils.cluster import Cluster
from fatfsgen_utils.entry import Entry
from fatfsgen_utils.fat import FAT
from fatfsgen_utils.fatfs_state import BootSectorState
from fatfsgen_utils.utils import PAD_CHAR, FATDefaults, read_filesystem
def get_address_and_name(obj_: dict, state_: BootSectorState) -> Tuple[int, str]:
cluster_id_ = obj_['DIR_FstClusLO']
obj_ext_ = obj_['DIR_Name_ext'].rstrip(chr(PAD_CHAR))
ext_ = f'.{obj_ext_}' if len(obj_ext_) > 0 else ''
obj_name_ = obj_['DIR_Name'].rstrip(chr(PAD_CHAR)) + ext_
data_address_ = Cluster.compute_cluster_data_address(state_, cluster_id_)
return data_address_, obj_name_
def traverse_folder_tree(directory_address: int, name: str, state_: BootSectorState) -> None:
if name not in ('.', '..'):
os.makedirs(name)
for i in range(state_.sector_size // FATDefaults.ENTRY_SIZE):
obj_address_ = directory_address + FATDefaults.ENTRY_SIZE * i
obj_ = Entry.ENTRY_FORMAT_SHORT_NAME.parse(
fs[obj_address_: obj_address_ + FATDefaults.ENTRY_SIZE])
if obj_['DIR_Attr'] == Entry.ATTR_ARCHIVE:
data_address_, obj_name_ = get_address_and_name(obj_, state_)
content_ = fs[data_address_: data_address_ + state_.sector_size].rstrip(chr(0x00).encode())
with open(os.path.join(name, obj_name_), 'wb') as new_file:
new_file.write(content_)
elif obj_['DIR_Attr'] == Entry.ATTR_DIRECTORY:
data_address_, obj_name_ = get_address_and_name(obj_, state_)
if obj_name_ in ('.', '..'):
continue
traverse_folder_tree(data_address_, os.path.join(name, obj_name_), state_=state_)
if __name__ == '__main__':
fs = read_filesystem(sys.argv[1])
parser = BootSector()
parser.parse_boot_sector(fs)
fat = FAT(parser.boot_sector_state, init_=False)
traverse_folder_tree(parser.boot_sector_state.root_directory_start,
parser.boot_sector_state.volume_label.rstrip(chr(PAD_CHAR)),
parser.boot_sector_state)

View File

@ -189,7 +189,7 @@ class FatFSGen(unittest.TestCase):
def test_fatfs16_detection(self) -> None:
fatfs = fatfsgen.FATFS(size=16 * 1024 * 1024)
self.assertEqual(fatfs.state.fatfs_type, 16)
self.assertEqual(fatfs.state.boot_sector_state.fatfs_type, 16)
def test_fatfs32_detection(self) -> None:
self.assertRaises(NotImplementedError, fatfsgen.FATFS, size=256 * 1024 * 1024)

View File

@ -0,0 +1,50 @@
#!/usr/bin/env python
# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
import os
import shutil
import unittest
from subprocess import STDOUT, run
from test_utils import generate_test_dir_2
class FatFSGen(unittest.TestCase):
def setUp(self) -> None:
os.makedirs('output_data')
generate_test_dir_2()
def tearDown(self) -> None:
shutil.rmtree('output_data')
shutil.rmtree('Espressif')
if os.path.exists('fatfs_image.img'):
os.remove('fatfs_image.img')
@staticmethod
def test_gen_parse() -> None:
run([
'python',
f'{os.path.join(os.path.dirname(__file__), "..", "fatfsgen.py")}',
'output_data/tst_str'
], stderr=STDOUT)
run(['python', '../fatfsgen.py', 'output_data/tst_str'], stderr=STDOUT)
run(['python', '../fatfsparse.py', 'fatfs_image.img'], stderr=STDOUT)
assert set(os.listdir('Espressif')) == {'TEST', 'TESTFILE'}
with open('Espressif/TESTFILE', 'rb') as in_:
assert in_.read() == b'ahoj\n'
assert set(os.listdir('Espressif/TEST')) == {'TEST', 'TESTFIL2'}
with open('Espressif/TEST/TESTFIL2', 'rb') as in_:
assert in_.read() == b'thisistest\n'
assert set(os.listdir('Espressif/TEST/TEST')) == {'LASTFILE.TXT'}
with open('Espressif/TEST/TEST/LASTFILE.TXT', 'rb') as in_:
assert in_.read() == b'deeptest\n'
if __name__ == '__main__':
unittest.main()

View File

@ -7,13 +7,14 @@ from typing import List, Optional
from construct import Const, Int32ul, Struct
from fatfsgen import FATFS
from fatfsgen_utils.exceptions import WLNotInitialized
from fatfsgen_utils.utils import UINT32_MAX, crc32, generate_4bytes_random, get_args_for_partition_generator
from fatfsgen_utils.utils import (FULL_BYTE, UINT32_MAX, FATDefaults, crc32, generate_4bytes_random,
get_args_for_partition_generator)
class WLFATFS:
# pylint: disable=too-many-instance-attributes
CFG_SECTORS_COUNT = 1
DUMMY_SECTORS_COUNT = 1
WL_CFG_SECTORS_COUNT = 1
WL_DUMMY_SECTORS_COUNT = 1
WL_CONFIG_HEADER_SIZE = 48
WL_STATE_RECORD_SIZE = 16
WL_STATE_HEADER_SIZE = 64
@ -45,28 +46,27 @@ class WLFATFS:
WL_CONFIG_T_HEADER_SIZE = 48
def __init__(self,
size: int = 1024 * 1024,
reserved_sectors_cnt: int = 1,
fat_tables_cnt: int = 1,
sectors_per_cluster: int = 1,
sector_size: int = 0x1000,
sectors_per_fat: int = 1,
size: int = FATDefaults.SIZE,
reserved_sectors_cnt: int = FATDefaults.RESERVED_SECTORS_COUNT,
fat_tables_cnt: int = FATDefaults.FAT_TABLES_COUNT,
sectors_per_cluster: int = FATDefaults.SECTORS_PER_CLUSTER,
sector_size: int = FATDefaults.SECTOR_SIZE,
sectors_per_fat: int = FATDefaults.SECTORS_PER_FAT,
explicit_fat_type: int = None,
hidden_sectors: int = 0,
hidden_sectors: int = FATDefaults.HIDDEN_SECTORS,
long_names_enabled: bool = False,
entry_size: int = 32,
num_heads: int = 0xff,
oem_name: str = 'MSDOS5.0',
sec_per_track: int = 0x3f,
volume_label: str = 'Espressif',
file_sys_type: str = 'FAT',
num_heads: int = FATDefaults.NUM_HEADS,
oem_name: str = FATDefaults.OEM_NAME,
sec_per_track: int = FATDefaults.SEC_PER_TRACK,
volume_label: str = FATDefaults.VOLUME_LABEL,
file_sys_type: str = FATDefaults.FILE_SYS_TYPE,
use_default_datetime: bool = True,
version: int = 2,
temp_buff_size: int = 32,
update_rate: int = 16,
version: int = FATDefaults.VERSION,
temp_buff_size: int = FATDefaults.TEMP_BUFFER_SIZE,
update_rate: int = FATDefaults.UPDATE_RATE,
device_id: int = None,
root_entry_count: int = 512,
media_type: int = 0xf8) -> None:
root_entry_count: int = FATDefaults.ROOT_ENTRIES_COUNT,
media_type: int = FATDefaults.MEDIA_TYPE) -> None:
if sector_size != WLFATFS.WL_SECTOR_SIZE:
raise NotImplementedError(f'The only supported sector size is currently {WLFATFS.WL_SECTOR_SIZE}')
@ -86,7 +86,8 @@ class WLFATFS:
self.boot_sector_start = self.sector_size # shift by one "dummy" sector
self.fat_table_start = self.boot_sector_start + reserved_sectors_cnt * self.sector_size
wl_sectors = WLFATFS.DUMMY_SECTORS_COUNT + WLFATFS.CFG_SECTORS_COUNT + self.wl_state_sectors * 2
wl_sectors = (WLFATFS.WL_DUMMY_SECTORS_COUNT + WLFATFS.WL_CFG_SECTORS_COUNT +
self.wl_state_sectors * WLFATFS.WL_STATE_COPY_COUNT)
self.plain_fat_sectors = self.total_sectors - wl_sectors
self.plain_fatfs = FATFS(
@ -100,7 +101,6 @@ class WLFATFS:
root_entry_count=root_entry_count,
hidden_sectors=hidden_sectors,
long_names_enabled=long_names_enabled,
entry_size=entry_size,
num_heads=num_heads,
use_default_datetime=use_default_datetime,
oem_name=oem_name,
@ -121,7 +121,7 @@ class WLFATFS:
self._initialized = True
def _add_dummy_sector(self) -> None:
self.fatfs_binary_image = self.sector_size * b'\xff' + self.fatfs_binary_image
self.fatfs_binary_image = self.sector_size * FULL_BYTE + self.fatfs_binary_image
def _add_config_sector(self) -> None:
wl_config_data = WLFATFS.WL_CONFIG_T_DATA.build(
@ -143,13 +143,13 @@ class WLFATFS:
# adding three 4 byte zeros to align the structure
wl_config = wl_config_data + wl_config_crc + Int32ul.build(0) + Int32ul.build(0) + Int32ul.build(0)
self.fatfs_binary_image += (wl_config + (self.sector_size - WLFATFS.WL_CONFIG_HEADER_SIZE) * b'\xff')
self.fatfs_binary_image += (wl_config + (self.sector_size - WLFATFS.WL_CONFIG_HEADER_SIZE) * FULL_BYTE)
def _add_state_sectors(self) -> None:
wl_state_data = WLFATFS.WL_STATE_T_DATA.build(
dict(
pos=0,
max_pos=self.plain_fat_sectors + WLFATFS.DUMMY_SECTORS_COUNT,
max_pos=self.plain_fat_sectors + WLFATFS.WL_DUMMY_SECTORS_COUNT,
move_count=0,
access_count=0,
max_count=self._update_rate,
@ -161,9 +161,11 @@ class WLFATFS:
crc = crc32(list(wl_state_data), UINT32_MAX)
wl_state_crc = Int32ul.build(crc)
wl_state = wl_state_data + wl_state_crc
self.fatfs_binary_image += WLFATFS.WL_STATE_COPY_COUNT * (
(wl_state + (self.sector_size - WLFATFS.WL_STATE_HEADER_SIZE) * b'\xff') + (
self.wl_state_sectors - 1) * self.sector_size * b'\xff')
wl_state_sector_padding: bytes = (self.sector_size - WLFATFS.WL_STATE_HEADER_SIZE) * FULL_BYTE
wl_state_sector: bytes = (
wl_state + wl_state_sector_padding + (self.wl_state_sectors - 1) * self.sector_size * FULL_BYTE
)
self.fatfs_binary_image += (WLFATFS.WL_STATE_COPY_COUNT * wl_state_sector)
def wl_write_filesystem(self, output_path: str) -> None:
if not self._initialized:

View File

@ -129,3 +129,14 @@ For example::
If FLASH_IN_PROJECT is not specified, the image will still be generated, but you will have to flash it manually using ``esptool.py`` or a custom build system target.
For an example, see :example:`storage/fatfsgen`.
FATFS partition analyzer
------------------------
We provide a partition analyzer for FATFS (:component_file:`fatfsparse.py<fatfs/fatfsparse.py>`). The tool is still in active progress and provides only restricted functionality.
It is only guaranteed that the tool is able to analyze images generated by FATFS partition generator (:component_file:`fatfsgen.py<fatfs/fatfsgen.py>`) (without support for wear levelling and long names) and generate the folder structure on host with the same name as a FATFS volume label.
Usage::
./fatfsparse.py fatfs_image.img

View File

@ -8,6 +8,7 @@ components/espcoredump/test/test_espcoredump.sh
components/espcoredump/test_apps/build_espcoredump.sh
components/fatfs/fatfsgen.py
components/fatfs/test_fatfsgen/test_fatfsgen.py
components/fatfs/test_fatfsgen/test_fatfsparse.py
components/fatfs/test_fatfsgen/test_wl_fatfsgen.py
components/fatfs/wl_fatfsgen.py
components/heap/test_multi_heap_host/test_all_configs.sh