mirror of
https://github.com/espressif/esp-idf.git
synced 2024-10-05 20:47:46 -04:00
Merge branch 'feature/add-chaining-fatfsparse' into 'master'
fatfs: enable cluster chaining for fatfsparse.py Closes IDF-4975 See merge request espressif/esp-idf!18069
This commit is contained in:
commit
67920a03dc
@ -12,6 +12,15 @@ from .utils import (ALLOWED_SECTOR_SIZES, ALLOWED_SECTORS_PER_CLUSTER, EMPTY_BYT
|
||||
|
||||
|
||||
class BootSector:
|
||||
"""
|
||||
This class describes the first sector of the volume in the Reserved Region.
|
||||
It contains data from BPB (BIOS Parameter Block) and BS (Boot sector). The fields of the BPB and BS are mixed in
|
||||
the header of the physical boot sector. Fields with prefix BPB belongs to BPB block and with prefix BS
|
||||
belongs to the actual boot sector.
|
||||
|
||||
Please beware, that the name of class BootSector refer to data both from the boot sector and BPB.
|
||||
ESP32 ignores fields with prefix "BS_"! Fields with prefix BPB_ are essential to read the filesystem.
|
||||
"""
|
||||
MAX_VOL_LAB_SIZE = 11
|
||||
MAX_OEM_NAME_SIZE = 8
|
||||
MAX_FS_TYPE_SIZE = 8
|
||||
@ -20,6 +29,7 @@ class BootSector:
|
||||
BOOT_HEADER_SIZE = 512
|
||||
|
||||
BOOT_SECTOR_HEADER = Struct(
|
||||
# this value reflects BS_jmpBoot used for ESP32 boot sector (any other accepted)
|
||||
'BS_jmpBoot' / Const(b'\xeb\xfe\x90'),
|
||||
'BS_OEMName' / PaddedString(MAX_OEM_NAME_SIZE, SHORT_NAMES_ENCODING),
|
||||
'BPB_BytsPerSec' / Int16ul,
|
||||
@ -27,13 +37,13 @@ class BootSector:
|
||||
'BPB_RsvdSecCnt' / Int16ul,
|
||||
'BPB_NumFATs' / Int8ul,
|
||||
'BPB_RootEntCnt' / Int16ul,
|
||||
'BPB_TotSec16' / Int16ul,
|
||||
'BPB_TotSec16' / Int16ul, # zero if the FAT type is 32, otherwise number of sectors
|
||||
'BPB_Media' / Int8ul,
|
||||
'BPB_FATSz16' / Int16ul, # for FAT32 always zero, for FAT12/FAT16 number of sectors per FAT
|
||||
'BPB_SecPerTrk' / Int16ul,
|
||||
'BPB_NumHeads' / Int16ul,
|
||||
'BPB_HiddSec' / Int32ul,
|
||||
'BPB_TotSec32' / Int32ul,
|
||||
'BPB_TotSec32' / Int32ul, # zero if the FAT type is 12/16, otherwise number of sectors
|
||||
'BS_DrvNum' / Const(b'\x80'),
|
||||
'BS_Reserved1' / Const(EMPTY_BYTE),
|
||||
'BS_BootSig' / Const(b'\x29'),
|
||||
@ -90,6 +100,7 @@ class BootSector:
|
||||
self._parsed_header = BootSector.BOOT_SECTOR_HEADER.parse(binary_data)
|
||||
if self._parsed_header is None:
|
||||
raise NotInitialized('The boot sector header is not parsed successfully!')
|
||||
|
||||
if self._parsed_header['BPB_TotSec16'] != 0x00:
|
||||
sectors_count_: int = self._parsed_header['BPB_TotSec16']
|
||||
elif self._parsed_header['BPB_TotSec32'] != 0x00:
|
||||
@ -100,6 +111,7 @@ class BootSector:
|
||||
raise NotImplementedError('FAT32 not implemented!')
|
||||
else:
|
||||
raise InconsistentFATAttributes('The number of FS sectors cannot be zero!')
|
||||
|
||||
# in the current code assigning self._parsed_header['BPB_TotSec32'] is not reachable
|
||||
# the option to assign it is kept for possibility to implement FAT32
|
||||
sectors_per_fat_cnt_ = self._parsed_header['BPB_FATSz16'] or self._parsed_header['BPB_TotSec32']
|
||||
@ -136,7 +148,7 @@ class BootSector:
|
||||
if self._parsed_header is None:
|
||||
return 'Boot sector is not initialized!'
|
||||
res: str = 'Properties of the FATFS:\n'
|
||||
for member in getmembers(self.boot_sector_state, lambda a: not(isroutine(a))):
|
||||
for member in getmembers(self.boot_sector_state, lambda a: not (isroutine(a))):
|
||||
prop_ = getattr(self.boot_sector_state, member[0])
|
||||
if isinstance(prop_, int) or isinstance(prop_, str) and not member[0].startswith('_'):
|
||||
res += f'{member[0]}: {prop_}\n'
|
@ -59,6 +59,13 @@ class Cluster:
|
||||
|
||||
@staticmethod
|
||||
def compute_cluster_data_address(boot_sector_state: BootSectorState, id_: int) -> int:
|
||||
"""
|
||||
This method translates the id of the cluster to the address in data region.
|
||||
|
||||
:param boot_sector_state: the class with FS shared data
|
||||
:param id_: id of the cluster
|
||||
:returns: integer denoting the address of the cluster in the data region
|
||||
"""
|
||||
data_address_: int = boot_sector_state.root_directory_start
|
||||
if not id_ == Cluster.ROOT_BLOCK_ID:
|
||||
# the first data cluster id is 2 (we have to subtract reserved cluster and cluster for root)
|
||||
@ -68,11 +75,11 @@ class Cluster:
|
||||
def _compute_cluster_data_address(self) -> int:
|
||||
return self.compute_cluster_data_address(self.boot_sector_state, self.id)
|
||||
|
||||
def _set_first_half_byte(self, address: int, value: int) -> None:
|
||||
def _set_left_half_byte(self, address: int, value: int) -> None:
|
||||
self.boot_sector_state.binary_image[address] &= 0x0f
|
||||
self.boot_sector_state.binary_image[address] |= value << 4
|
||||
|
||||
def _set_second_half_byte(self, address: int, value: int) -> None:
|
||||
def _set_right_half_byte(self, address: int, value: int) -> None:
|
||||
self.boot_sector_state.binary_image[address] &= 0xf0
|
||||
self.boot_sector_state.binary_image[address] |= value
|
||||
|
||||
@ -83,7 +90,11 @@ class Cluster:
|
||||
|
||||
@property
|
||||
def real_cluster_address(self) -> int:
|
||||
cluster_address: int = self.boot_sector_state.start_address + self.fat_cluster_address // 8
|
||||
"""
|
||||
The property method computes the real address of the cluster in the FAT region. Result is simply
|
||||
address of the cluster in fat + fat table address.
|
||||
"""
|
||||
cluster_address: int = self.boot_sector_state.fat_table_start_address + self.fat_cluster_address // 8
|
||||
return cluster_address
|
||||
|
||||
def get_from_fat(self) -> int:
|
||||
@ -105,19 +116,9 @@ class Cluster:
|
||||
if self.boot_sector_state.fatfs_type == FAT12:
|
||||
if self.fat_cluster_address % 8 == 0:
|
||||
# even block
|
||||
byte_zero_full = bin_img_[self.real_cluster_address]
|
||||
byte_one_second_half = bin_img_[self.real_cluster_address + 1] & 0x0F
|
||||
merged_byte_: int = merge_by_half_byte_12_bit_little_endian(byte_zero_full & 0x0F,
|
||||
(byte_zero_full & 0xF0) >> 4,
|
||||
byte_one_second_half)
|
||||
else:
|
||||
# odd block
|
||||
byte_one_full = bin_img_[self.real_cluster_address + 1]
|
||||
byte_zero_second_half = (bin_img_[self.real_cluster_address] & 0xF0) >> 4
|
||||
merged_byte_ = merge_by_half_byte_12_bit_little_endian(byte_zero_second_half,
|
||||
byte_one_full & 0x0F,
|
||||
(byte_one_full & 0xF0) >> 4)
|
||||
return merged_byte_
|
||||
return bin_img_[self.real_cluster_address] | ((bin_img_[self.real_cluster_address + 1] & 0x0F) << 8)
|
||||
# odd block
|
||||
return ((bin_img_[self.real_cluster_address] & 0xF0) >> 4) | (bin_img_[self.real_cluster_address + 1] << 4)
|
||||
if self.boot_sector_state.fatfs_type == FAT16:
|
||||
return int.from_bytes(bin_img_[address_:address_ + 2], byteorder='little')
|
||||
raise NotImplementedError('Only valid fatfs types are FAT12 and FAT16.')
|
||||
@ -151,10 +152,12 @@ class Cluster:
|
||||
if self.boot_sector_state.fatfs_type == FAT12:
|
||||
assert merge_by_half_byte_12_bit_little_endian(*half_bytes) == value
|
||||
if self.fat_cluster_address % 8 == 0:
|
||||
# even block
|
||||
bin_img_[self.real_cluster_address] = build_byte(half_bytes[1], half_bytes[0])
|
||||
self._set_second_half_byte(self.real_cluster_address + 1, half_bytes[2])
|
||||
self._set_right_half_byte(self.real_cluster_address + 1, half_bytes[2])
|
||||
elif self.fat_cluster_address % 8 != 0:
|
||||
self._set_first_half_byte(self.real_cluster_address, half_bytes[0])
|
||||
# odd block
|
||||
self._set_left_half_byte(self.real_cluster_address, half_bytes[0])
|
||||
bin_img_[self.real_cluster_address + 1] = build_byte(half_bytes[2], half_bytes[1])
|
||||
elif self.boot_sector_state.fatfs_type == FAT16:
|
||||
bin_img_[self.real_cluster_address:self.real_cluster_address + 2] = Int16ul.build(value)
|
@ -29,8 +29,14 @@ class FAT:
|
||||
if init_:
|
||||
self.allocate_root_dir()
|
||||
|
||||
def parse_fat_sector(self) -> None:
|
||||
pass
|
||||
def get_cluster_value(self, cluster_id_: int) -> int:
|
||||
fat_cluster_value_: int = self.clusters[cluster_id_].get_from_fat()
|
||||
return fat_cluster_value_
|
||||
|
||||
def is_cluster_last(self, cluster_id_: int) -> bool:
|
||||
value_ = self.get_cluster_value(cluster_id_)
|
||||
is_cluster_last_: bool = value_ == (1 << self.boot_sector_state.fatfs_type) - 1
|
||||
return is_cluster_last_
|
||||
|
||||
def find_free_cluster(self) -> Cluster:
|
||||
# finds first empty cluster and allocates it
|
@ -32,7 +32,6 @@ class FATFSState:
|
||||
use_default_datetime: bool,
|
||||
explicit_fat_type: Optional[int] = None,
|
||||
long_names_enabled: bool = False):
|
||||
|
||||
self.boot_sector_state = BootSectorState(oem_name=oem_name,
|
||||
sector_size=sector_size,
|
||||
sectors_per_cluster=sectors_per_cluster,
|
||||
@ -152,7 +151,7 @@ class BootSectorState:
|
||||
return non_data_sectors_
|
||||
|
||||
@property
|
||||
def start_address(self) -> int:
|
||||
def fat_table_start_address(self) -> int:
|
||||
return self.sector_size * self.reserved_sectors_cnt
|
||||
|
||||
@property
|
@ -6,12 +6,12 @@ import os
|
||||
from datetime import datetime
|
||||
from typing import Any, List, Optional
|
||||
|
||||
from fatfsgen_utils.boot_sector import BootSector
|
||||
from fatfsgen_utils.fat import FAT
|
||||
from fatfsgen_utils.fatfs_state import FATFSState
|
||||
from fatfsgen_utils.fs_object import Directory
|
||||
from fatfsgen_utils.utils import (BYTES_PER_DIRECTORY_ENTRY, FATFS_INCEPTION, FATDefaults,
|
||||
get_args_for_partition_generator, read_filesystem)
|
||||
from fatfs_utils.boot_sector import BootSector
|
||||
from fatfs_utils.fat import FAT
|
||||
from fatfs_utils.fatfs_state import FATFSState
|
||||
from fatfs_utils.fs_object import Directory
|
||||
from fatfs_utils.utils import (BYTES_PER_DIRECTORY_ENTRY, FATFS_INCEPTION, FATDefaults,
|
||||
get_args_for_partition_generator, read_filesystem)
|
||||
|
||||
|
||||
class FATFS:
|
||||
|
@ -4,40 +4,77 @@ import os
|
||||
import sys
|
||||
from typing import Tuple
|
||||
|
||||
from fatfsgen_utils.boot_sector import BootSector
|
||||
from fatfsgen_utils.cluster import Cluster
|
||||
from fatfsgen_utils.entry import Entry
|
||||
from fatfsgen_utils.fat import FAT
|
||||
from fatfsgen_utils.fatfs_state import BootSectorState
|
||||
from fatfsgen_utils.utils import PAD_CHAR, FATDefaults, read_filesystem
|
||||
from fatfs_utils.boot_sector import BootSector
|
||||
from fatfs_utils.cluster import Cluster
|
||||
from fatfs_utils.entry import Entry
|
||||
from fatfs_utils.fat import FAT
|
||||
from fatfs_utils.fatfs_state import BootSectorState
|
||||
from fatfs_utils.utils import PAD_CHAR, FATDefaults, read_filesystem
|
||||
|
||||
|
||||
def get_address_and_name(obj_: dict, state_: BootSectorState) -> Tuple[int, str]:
|
||||
def get_chained_full_content(cluster_id_: int,
|
||||
fat_: FAT,
|
||||
state_: BootSectorState,
|
||||
binary_array_: bytearray) -> bytearray:
|
||||
if fat_.is_cluster_last(cluster_id_):
|
||||
data_address_ = Cluster.compute_cluster_data_address(state_, cluster_id_)
|
||||
content_: bytearray = binary_array_[data_address_: data_address_ + state_.sector_size]
|
||||
return content_
|
||||
fat_value_: int = fat_.get_cluster_value(cluster_id_)
|
||||
data_address_ = Cluster.compute_cluster_data_address(state_, cluster_id_)
|
||||
content_ = binary_array_[data_address_: data_address_ + state_.sector_size]
|
||||
|
||||
while not fat_.is_cluster_last(cluster_id_):
|
||||
cluster_id_ = fat_value_
|
||||
fat_value_ = fat_.get_cluster_value(cluster_id_)
|
||||
data_address_ = Cluster.compute_cluster_data_address(state_, cluster_id_)
|
||||
content_ += binary_array_[data_address_: data_address_ + state_.sector_size]
|
||||
return content_
|
||||
|
||||
|
||||
def get_name_and_id(obj_: dict) -> Tuple[str, int]:
|
||||
cluster_id_ = obj_['DIR_FstClusLO']
|
||||
obj_ext_ = obj_['DIR_Name_ext'].rstrip(chr(PAD_CHAR))
|
||||
ext_ = f'.{obj_ext_}' if len(obj_ext_) > 0 else ''
|
||||
obj_name_ = obj_['DIR_Name'].rstrip(chr(PAD_CHAR)) + ext_
|
||||
data_address_ = Cluster.compute_cluster_data_address(state_, cluster_id_)
|
||||
return data_address_, obj_name_
|
||||
return obj_name_, cluster_id_
|
||||
|
||||
|
||||
def traverse_folder_tree(directory_address: int, name: str, state_: BootSectorState) -> None:
|
||||
def traverse_folder_tree(directory_bytes_: bytes,
|
||||
name: str,
|
||||
state_: BootSectorState, fat_: FAT,
|
||||
binary_array_: bytearray) -> None:
|
||||
if name not in ('.', '..'):
|
||||
os.makedirs(name)
|
||||
for i in range(state_.sector_size // FATDefaults.ENTRY_SIZE):
|
||||
obj_address_ = directory_address + FATDefaults.ENTRY_SIZE * i
|
||||
for i in range(len(directory_bytes_) // FATDefaults.ENTRY_SIZE):
|
||||
obj_address_ = FATDefaults.ENTRY_SIZE * i
|
||||
obj_ = Entry.ENTRY_FORMAT_SHORT_NAME.parse(
|
||||
fs[obj_address_: obj_address_ + FATDefaults.ENTRY_SIZE])
|
||||
directory_bytes_[obj_address_: obj_address_ + FATDefaults.ENTRY_SIZE])
|
||||
if obj_['DIR_Attr'] == Entry.ATTR_ARCHIVE:
|
||||
data_address_, obj_name_ = get_address_and_name(obj_, state_)
|
||||
content_ = fs[data_address_: data_address_ + state_.sector_size].rstrip(chr(0x00).encode())
|
||||
obj_name_, cluster_id_ = get_name_and_id(obj_)
|
||||
content_ = get_chained_full_content(
|
||||
cluster_id_=cluster_id_,
|
||||
fat_=fat_,
|
||||
state_=state_,
|
||||
binary_array_=binary_array_
|
||||
).rstrip(chr(0x00).encode())
|
||||
with open(os.path.join(name, obj_name_), 'wb') as new_file:
|
||||
new_file.write(content_)
|
||||
elif obj_['DIR_Attr'] == Entry.ATTR_DIRECTORY:
|
||||
data_address_, obj_name_ = get_address_and_name(obj_, state_)
|
||||
obj_name_, cluster_id_ = get_name_and_id(obj_)
|
||||
if obj_name_ in ('.', '..'):
|
||||
continue
|
||||
traverse_folder_tree(data_address_, os.path.join(name, obj_name_), state_=state_)
|
||||
child_directory_bytes_ = get_chained_full_content(
|
||||
cluster_id_=obj_['DIR_FstClusLO'],
|
||||
fat_=fat_,
|
||||
state_=state_,
|
||||
binary_array_=binary_array_
|
||||
)
|
||||
traverse_folder_tree(directory_bytes_=child_directory_bytes_,
|
||||
name=os.path.join(name, obj_name_),
|
||||
state_=state_,
|
||||
fat_=fat_,
|
||||
binary_array_=binary_array_)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
@ -46,6 +83,9 @@ if __name__ == '__main__':
|
||||
parser.parse_boot_sector(fs)
|
||||
fat = FAT(parser.boot_sector_state, init_=False)
|
||||
|
||||
traverse_folder_tree(parser.boot_sector_state.root_directory_start,
|
||||
boot_dir_start_ = parser.boot_sector_state.root_directory_start
|
||||
boot_dir_sectors = parser.boot_sector_state.root_dir_sectors_cnt
|
||||
full_ = fs[boot_dir_start_: boot_dir_start_ + boot_dir_sectors * parser.boot_sector_state.sector_size]
|
||||
traverse_folder_tree(full_,
|
||||
parser.boot_sector_state.volume_label.rstrip(chr(PAD_CHAR)),
|
||||
parser.boot_sector_state)
|
||||
parser.boot_sector_state, fat, fs)
|
||||
|
@ -12,11 +12,11 @@ from test_utils import CFG, fill_sector, generate_test_dir_1, generate_test_dir_
|
||||
|
||||
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
|
||||
import fatfsgen # noqa E402 # pylint: disable=C0413
|
||||
from fatfsgen_utils.exceptions import InconsistentFATAttributes # noqa E402 # pylint: disable=C0413
|
||||
from fatfsgen_utils.exceptions import TooLongNameException # noqa E402 # pylint: disable=C0413
|
||||
from fatfsgen_utils.exceptions import WriteDirectoryException # noqa E402 # pylint: disable=C0413
|
||||
from fatfsgen_utils.exceptions import LowerCaseException, NoFreeClusterException # noqa E402 # pylint: disable=C0413
|
||||
from fatfsgen_utils.utils import FAT12, read_filesystem # noqa E402 # pylint: disable=C0413
|
||||
from fatfs_utils.exceptions import InconsistentFATAttributes # noqa E402 # pylint: disable=C0413
|
||||
from fatfs_utils.exceptions import TooLongNameException # noqa E402 # pylint: disable=C0413
|
||||
from fatfs_utils.exceptions import WriteDirectoryException # noqa E402 # pylint: disable=C0413
|
||||
from fatfs_utils.exceptions import LowerCaseException, NoFreeClusterException # noqa E402 # pylint: disable=C0413
|
||||
from fatfs_utils.utils import FAT12, read_filesystem # noqa E402 # pylint: disable=C0413
|
||||
|
||||
|
||||
class FatFSGen(unittest.TestCase):
|
||||
|
@ -4,10 +4,14 @@
|
||||
|
||||
import os
|
||||
import shutil
|
||||
import sys
|
||||
import unittest
|
||||
from subprocess import STDOUT, run
|
||||
|
||||
from test_utils import generate_test_dir_2
|
||||
from test_utils import compare_folders, fill_sector, generate_local_folder_structure, generate_test_dir_2
|
||||
|
||||
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
|
||||
import fatfsgen # noqa E402 # pylint: disable=C0413
|
||||
|
||||
|
||||
class FatFSGen(unittest.TestCase):
|
||||
@ -16,8 +20,9 @@ class FatFSGen(unittest.TestCase):
|
||||
generate_test_dir_2()
|
||||
|
||||
def tearDown(self) -> None:
|
||||
shutil.rmtree('output_data')
|
||||
shutil.rmtree('Espressif')
|
||||
shutil.rmtree('output_data', ignore_errors=True)
|
||||
shutil.rmtree('Espressif', ignore_errors=True)
|
||||
shutil.rmtree('testf', ignore_errors=True)
|
||||
|
||||
if os.path.exists('fatfs_image.img'):
|
||||
os.remove('fatfs_image.img')
|
||||
@ -45,6 +50,224 @@ class FatFSGen(unittest.TestCase):
|
||||
with open('Espressif/TEST/TEST/LASTFILE.TXT', 'rb') as in_:
|
||||
assert in_.read() == b'deeptest\n'
|
||||
|
||||
@staticmethod
|
||||
def test_file_chaining() -> None:
|
||||
fatfs = fatfsgen.FATFS()
|
||||
fatfs.create_file('WRITEF', extension='TXT')
|
||||
fatfs.write_content(path_from_root=['WRITEF.TXT'], content=4096 * b'a' + b'a')
|
||||
fatfs.write_filesystem('fatfs_image.img')
|
||||
|
||||
run(['python', '../fatfsparse.py', 'fatfs_image.img'], stderr=STDOUT)
|
||||
with open('Espressif/WRITEF.TXT', 'rb') as in_:
|
||||
assert in_.read() == 4097 * b'a'
|
||||
|
||||
@staticmethod
|
||||
def test_full_two_sectors_folder() -> None:
|
||||
fatfs = fatfsgen.FATFS(size=2 * 1024 * 1024)
|
||||
fatfs.create_directory('TESTFOLD')
|
||||
|
||||
for i in range((2 * 4096) // 32):
|
||||
fatfs.create_file(f'A{str(i).upper()}', path_from_root=['TESTFOLD'])
|
||||
fatfs.write_content(path_from_root=['TESTFOLD', 'A253'], content=b'later')
|
||||
fatfs.write_content(path_from_root=['TESTFOLD', 'A255'], content=b'last')
|
||||
fatfs.write_filesystem('fatfs_image.img')
|
||||
|
||||
run(['python', '../fatfsparse.py', 'fatfs_image.img'], stderr=STDOUT)
|
||||
assert set(os.listdir('Espressif')) == {'TESTFOLD'}
|
||||
assert set(os.listdir('Espressif/TESTFOLD')) == {f'A{str(i).upper()}' for i in range(256)}
|
||||
|
||||
with open('Espressif/TESTFOLD/A253', 'rb') as in_:
|
||||
assert in_.read() == b'later'
|
||||
|
||||
with open('Espressif/TESTFOLD/A255', 'rb') as in_:
|
||||
assert in_.read() == b'last'
|
||||
|
||||
@staticmethod
|
||||
def test_empty_fat16() -> None:
|
||||
fatfs = fatfsgen.FATFS(size=17 * 1024 * 1024)
|
||||
fatfs.write_filesystem('fatfs_image.img')
|
||||
run(['python', '../fatfsparse.py', 'fatfs_image.img'], stderr=STDOUT)
|
||||
|
||||
@staticmethod
|
||||
def test_chaining_fat16() -> None:
|
||||
fatfs = fatfsgen.FATFS(size=17 * 1024 * 1024)
|
||||
fatfs.create_file('WRITEF', extension='TXT')
|
||||
fatfs.write_content(path_from_root=['WRITEF.TXT'], content=4096 * b'a' + b'a')
|
||||
fatfs.write_filesystem('fatfs_image.img')
|
||||
run(['python', '../fatfsparse.py', 'fatfs_image.img'], stderr=STDOUT)
|
||||
with open('Espressif/WRITEF.TXT', 'rb') as in_:
|
||||
assert in_.read() == 4097 * b'a'
|
||||
|
||||
@staticmethod
|
||||
def test_full_sector_folder_fat16() -> None:
|
||||
fatfs = fatfsgen.FATFS(size=17 * 1024 * 1024)
|
||||
fatfs.create_directory('TESTFOLD')
|
||||
|
||||
fill_sector(fatfs)
|
||||
fatfs.write_content(path_from_root=['TESTFOLD', 'A0'], content=b'first')
|
||||
fatfs.write_content(path_from_root=['TESTFOLD', 'A126'], content=b'later')
|
||||
fatfs.write_filesystem('fatfs_image.img')
|
||||
run(['python', '../fatfsparse.py', 'fatfs_image.img'], stderr=STDOUT)
|
||||
assert set(os.listdir('Espressif')) == {'TESTFOLD'}
|
||||
assert set(os.listdir('Espressif/TESTFOLD')) == {f'A{str(i).upper()}' for i in range(128)}
|
||||
with open('Espressif/TESTFOLD/A0', 'rb') as in_:
|
||||
assert in_.read() == b'first'
|
||||
|
||||
with open('Espressif/TESTFOLD/A126', 'rb') as in_:
|
||||
assert in_.read() == b'later'
|
||||
|
||||
@staticmethod
|
||||
def file_(x: str, content_: str = 'hey this is a test') -> dict:
|
||||
return {
|
||||
'type': 'file',
|
||||
'name': x,
|
||||
'content': content_
|
||||
}
|
||||
|
||||
def test_e2e_file(self) -> None:
|
||||
struct_: dict = {
|
||||
'type': 'folder',
|
||||
'name': 'testf',
|
||||
'content': [self.file_('NEWF')]
|
||||
}
|
||||
generate_local_folder_structure(struct_, path_='.')
|
||||
run([
|
||||
'python',
|
||||
f'{os.path.join(os.path.dirname(__file__), "..", "fatfsgen.py")}',
|
||||
'testf'
|
||||
], stderr=STDOUT)
|
||||
run(['python', '../fatfsparse.py', 'fatfs_image.img'], stderr=STDOUT)
|
||||
assert compare_folders('testf', 'Espressif')
|
||||
|
||||
def test_e2e_deeper(self) -> None:
|
||||
folder_ = {
|
||||
'type': 'folder',
|
||||
'name': 'XYZ',
|
||||
'content': [
|
||||
self.file_('NEWFLE'),
|
||||
self.file_('NEW.TXT'),
|
||||
self.file_('NEWE.TXT'),
|
||||
self.file_('NEW4.TXT'),
|
||||
self.file_('NEW5.TXT'),
|
||||
]
|
||||
}
|
||||
struct_: dict = {
|
||||
'type': 'folder',
|
||||
'name': 'testf',
|
||||
'content': [
|
||||
self.file_('MY_NEW'),
|
||||
folder_
|
||||
]
|
||||
}
|
||||
generate_local_folder_structure(struct_, path_='.')
|
||||
run([
|
||||
'python',
|
||||
f'{os.path.join(os.path.dirname(__file__), "..", "fatfsgen.py")}',
|
||||
'testf'
|
||||
], stderr=STDOUT)
|
||||
run(['python', '../fatfsparse.py', 'fatfs_image.img'], stderr=STDOUT)
|
||||
assert compare_folders('testf', 'Espressif')
|
||||
|
||||
def test_e2e_deeper_large(self) -> None:
|
||||
folder_ = {
|
||||
'type': 'folder',
|
||||
'name': 'XYZ',
|
||||
'content': [
|
||||
self.file_('NEWFLE', content_=4097 * 'a'),
|
||||
self.file_('NEW.TXT', content_=2 * 4097 * 'a'),
|
||||
self.file_('NEWE.TXT'),
|
||||
self.file_('NEW4.TXT'),
|
||||
self.file_('NEW5.TXT'),
|
||||
]
|
||||
}
|
||||
folder2_ = {
|
||||
'type': 'folder',
|
||||
'name': 'XYZ3',
|
||||
'content': [
|
||||
self.file_('NEWFLE', content_=4097 * 'a'),
|
||||
self.file_('NEW.TXT', content_=2 * 4097 * 'a'),
|
||||
self.file_('NEWE.TXT'),
|
||||
self.file_('NEW4.TXT'),
|
||||
self.file_('NEW5.TXT'),
|
||||
]
|
||||
}
|
||||
folder3_ = {
|
||||
'type': 'folder',
|
||||
'name': 'XYZ2',
|
||||
'content': [
|
||||
self.file_(f'A{i}') for i in range(50)
|
||||
]
|
||||
}
|
||||
struct_: dict = {
|
||||
'type': 'folder',
|
||||
'name': 'testf',
|
||||
'content': [
|
||||
self.file_('MY_NEW'),
|
||||
folder_,
|
||||
folder2_,
|
||||
folder3_
|
||||
]
|
||||
}
|
||||
generate_local_folder_structure(struct_, path_='.')
|
||||
run([
|
||||
'python',
|
||||
f'{os.path.join(os.path.dirname(__file__), "..", "fatfsgen.py")}',
|
||||
'testf'
|
||||
], stderr=STDOUT)
|
||||
run(['python', '../fatfsparse.py', 'fatfs_image.img'], stderr=STDOUT)
|
||||
assert compare_folders('testf', 'Espressif')
|
||||
|
||||
def test_e2e_very_deep(self) -> None:
|
||||
folder_ = {
|
||||
'type': 'folder',
|
||||
'name': 'XYZ',
|
||||
'content': [
|
||||
self.file_('NEWFLE', content_=4097 * 'a'),
|
||||
self.file_('NEW.TXT', content_=2 * 4097 * 'a'),
|
||||
self.file_('NEWE.TXT'),
|
||||
self.file_('NEW4.TXT'),
|
||||
self.file_('NEW5.TXT'),
|
||||
]
|
||||
}
|
||||
folder2_ = {
|
||||
'type': 'folder',
|
||||
'name': 'XYZ3',
|
||||
'content': [
|
||||
self.file_('NEWFLE', content_=4097 * 'a'),
|
||||
self.file_('NEW.TXT', content_=2 * 4097 * 'a'),
|
||||
self.file_('NEWE.TXT'),
|
||||
self.file_('NEW4.TXT'),
|
||||
self.file_('NEW5.TXT'),
|
||||
folder_,
|
||||
]
|
||||
}
|
||||
folder3_ = {
|
||||
'type': 'folder',
|
||||
'name': 'XYZ2',
|
||||
'content': [
|
||||
self.file_(f'A{i}') for i in range(50)
|
||||
] + [folder2_]
|
||||
}
|
||||
|
||||
struct_: dict = {
|
||||
'type': 'folder',
|
||||
'name': 'testf',
|
||||
'content': [
|
||||
self.file_('MY_NEW'),
|
||||
folder_,
|
||||
folder2_,
|
||||
folder3_
|
||||
]
|
||||
}
|
||||
generate_local_folder_structure(struct_, path_='.')
|
||||
run([
|
||||
'python',
|
||||
f'{os.path.join(os.path.dirname(__file__), "..", "fatfsgen.py")}',
|
||||
'testf'
|
||||
], stderr=STDOUT)
|
||||
run(['python', '../fatfsparse.py', 'fatfs_image.img'], stderr=STDOUT)
|
||||
assert compare_folders('testf', 'Espressif')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
@ -42,3 +42,26 @@ def generate_test_dir_2() -> None:
|
||||
def fill_sector(fatfs: fatfsgen.FATFS, file_prefix: str = 'A') -> None:
|
||||
for i in range(CFG['sector_size'] // CFG['entry_size']):
|
||||
fatfs.create_file(f'{file_prefix}{str(i).upper()}', path_from_root=['TESTFOLD'])
|
||||
|
||||
|
||||
def generate_local_folder_structure(structure_: dict, path_: str) -> None:
|
||||
if structure_['type'] == 'folder':
|
||||
new_path_ = os.path.join(path_, structure_['name'])
|
||||
os.makedirs(new_path_)
|
||||
for item_ in structure_['content']:
|
||||
generate_local_folder_structure(item_, new_path_)
|
||||
else:
|
||||
new_path_ = os.path.join(path_, structure_['name'])
|
||||
with open(new_path_, 'w') as f_:
|
||||
f_.write(structure_['content'])
|
||||
|
||||
|
||||
def compare_folders(fp1: str, fp2: str) -> bool:
|
||||
if os.path.isdir(fp1) != os.path.isdir(fp2):
|
||||
return False
|
||||
if os.path.isdir(fp1):
|
||||
if set(os.listdir(fp1)) != set(os.listdir(fp2)):
|
||||
return False
|
||||
return all([compare_folders(os.path.join(fp1, path_), os.path.join(fp2, path_)) for path_ in os.listdir(fp1)])
|
||||
with open(fp1, 'rb') as f1_, open(fp2, 'rb') as f2_:
|
||||
return f1_.read() == f2_.read()
|
||||
|
@ -11,7 +11,7 @@ from test_utils import CFG, generate_test_dir_1, generate_test_dir_2
|
||||
|
||||
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
|
||||
import wl_fatfsgen # noqa E402 # pylint: disable=C0413
|
||||
from fatfsgen_utils.exceptions import WLNotInitialized # noqa E402 # pylint: disable=C0413
|
||||
from fatfs_utils.exceptions import WLNotInitialized # noqa E402 # pylint: disable=C0413
|
||||
|
||||
|
||||
class WLFatFSGen(unittest.TestCase):
|
||||
|
@ -5,10 +5,10 @@
|
||||
from typing import List, Optional
|
||||
|
||||
from construct import Const, Int32ul, Struct
|
||||
from fatfs_utils.exceptions import WLNotInitialized
|
||||
from fatfs_utils.utils import (FULL_BYTE, UINT32_MAX, FATDefaults, crc32, generate_4bytes_random,
|
||||
get_args_for_partition_generator)
|
||||
from fatfsgen import FATFS
|
||||
from fatfsgen_utils.exceptions import WLNotInitialized
|
||||
from fatfsgen_utils.utils import (FULL_BYTE, UINT32_MAX, FATDefaults, crc32, generate_4bytes_random,
|
||||
get_args_for_partition_generator)
|
||||
|
||||
|
||||
class WLFATFS:
|
||||
|
Loading…
x
Reference in New Issue
Block a user