# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0

import os
from typing import List, Optional, Tuple, Union

from .entry import Entry
from .exceptions import FatalError, WriteDirectoryException
from .fat import FAT, Cluster
from .fatfs_state import FATFSState
from .long_filename_utils import (build_lfn_full_name, build_lfn_unique_entry_name_order,
                                  get_required_lfn_entries_count, split_name_to_lfn_entries,
                                  split_name_to_lfn_entry_blocks)
from .utils import (MAX_EXT_SIZE, MAX_NAME_SIZE, build_lfn_short_entry_name, lfn_checksum,
                    required_clusters_count, split_content_into_sectors, split_to_name_and_extension)


class File:
    """
    The class File provides API to write into the files. It represents file in the FS.
    """
    ATTR_ARCHIVE: int = 0x20
    ENTITY_TYPE: int = ATTR_ARCHIVE

    def __init__(self, name: str, fat: FAT, fatfs_state: FATFSState, entry: Entry, extension: str = '') -> None:
        """
        :param name: name of the file (without the extension)
        :param fat: FAT object the file's clusters belong to
        :param fatfs_state: shared state of the file system (sector size, binary image, ...)
        :param entry: directory entry describing this file
        :param extension: extension of the file (empty string if there is none)
        """
        self.name: str = name
        self.extension: str = extension
        self.fatfs_state: FATFSState = fatfs_state
        self.fat: FAT = fat
        self.size: int = 0
        # the first cluster is assigned by the owner after the file object is created
        self._first_cluster: Optional[Cluster] = None
        self._entry: Entry = entry

    @property
    def entry(self) -> Entry:
        """Directory entry describing this file."""
        return self._entry

    @property
    def first_cluster(self) -> Optional[Cluster]:
        """First cluster of the file's cluster chain (None until assigned)."""
        return self._first_cluster

    @first_cluster.setter
    def first_cluster(self, value: Cluster) -> None:
        self._first_cluster = value

    def name_equals(self, name: str, extension: str) -> bool:
        """Return True if both the name and the extension match this file."""
        return self.name == name and self.extension == extension

    def write(self, content: bytes) -> None:
        """
        Writes the content into the binary image, one sector-sized part per cluster of the file's chain.

        :param content: bytes to be stored in the file
        :returns: None
        :raises FatalError: raised if the cluster chain ends before the whole content is written
        """
        self.entry.update_content_size(len(content))
        # we assume that the correct amount of clusters is allocated
        current_cluster = self._first_cluster
        for content_part in split_content_into_sectors(content, self.fatfs_state.sector_size):
            if current_cluster is None:
                raise FatalError('No free space left!')

            address: int = current_cluster.cluster_data_address
            self.fatfs_state.binary_image[address: address + len(content_part)] = content_part
            current_cluster = current_cluster.next_cluster


class Directory:
    """
    The Directory class provides API to add files and directories into the directory
    and to find the file according to path and write it.
    """
    ATTR_DIRECTORY: int = 0x10
    ATTR_ARCHIVE: int = 0x20
    ENTITY_TYPE: int = ATTR_DIRECTORY

    CURRENT_DIRECTORY = '.'
    PARENT_DIRECTORY = '..'

    def __init__(self,
                 name,
                 fat,
                 fatfs_state,
                 entry=None,
                 cluster=None,
                 size=None,
                 extension='',
                 parent=None):
        # type: (str, FAT, FATFSState, Optional[Entry], Cluster, Optional[int], str, Directory) -> None
        """
        :param name: name of the directory
        :param fat: FAT object the directory's clusters belong to
        :param fatfs_state: shared state of the file system (sector size, entry size, ...)
        :param entry: entry describing this directory in its parent (currently only stored)
        :param cluster: first cluster of the directory
        :param size: size of the directory in bytes, the sector size is used when not given
        :param extension: extension of the directory name
        :param parent: parent directory, when not given the directory is treated as the root
        """
        self.name: str = name
        self.fatfs_state: FATFSState = fatfs_state
        self.extension: str = extension

        self.fat: FAT = fat
        self.size: int = size or self.fatfs_state.sector_size

        # if directory is root its parent is itself
        self.parent: Directory = parent or self
        self._first_cluster: Cluster = cluster

        # entries will be initialized after the cluster allocation
        self.entries: List[Entry] = []
        self.entities: List[Union[File, Directory]] = []  # type: ignore
        self._entry = entry  # currently not in use (will use later for e.g. modification time, etc.)

    @property
    def is_root(self) -> bool:
        """True if the directory is its own parent, i.e. it is the root directory."""
        return self.parent is self

    @property
    def first_cluster(self) -> Cluster:
        """First cluster of the directory's cluster chain."""
        return self._first_cluster

    @first_cluster.setter
    def first_cluster(self, value: Cluster) -> None:
        self._first_cluster = value

    def name_equals(self, name: str, extension: str) -> bool:
        """Return True if both the name and the extension match this directory."""
        return self.name == name and self.extension == extension

    def create_entries(self, cluster: Cluster) -> List[Entry]:
        """
        Creates the entry objects located in the given cluster.

        :param cluster: cluster whose data address is used as the base address of the entries
        :returns: list of `self.size // entry_size` entries
        """
        entries_count: int = self.size // self.fatfs_state.entry_size
        return [Entry(entry_id=i,
                      parent_dir_entries_address=cluster.cluster_data_address,
                      fatfs_state=self.fatfs_state)
                for i in range(entries_count)]

    def init_directory(self) -> None:
        """
        Creates the entries of the directory in its first cluster and, for a non-root directory,
        allocates the `.` and `..` entries referencing the directory itself and its parent.
        """
        self.entries = self.create_entries(self._first_cluster)

        # the root directory doesn't contain link to itself nor the parent
        if self.is_root:
            return

        self_dir_reference: Entry = self.find_free_entry() or self.chain_directory()
        self_dir_reference.allocate_entry(first_cluster_id=self.first_cluster.id,
                                          entity_name=self.CURRENT_DIRECTORY,
                                          entity_extension='',
                                          entity_type=self.ENTITY_TYPE)

        parent_dir_reference: Entry = self.find_free_entry() or self.chain_directory()
        parent_dir_reference.allocate_entry(first_cluster_id=self.parent.first_cluster.id,
                                            entity_name=self.PARENT_DIRECTORY,
                                            entity_extension='',
                                            entity_type=self.parent.ENTITY_TYPE)

    def lookup_entity(self, object_name: str, extension: str):  # type: ignore
        """
        Finds a direct child (file or directory) of this directory.

        :param object_name: name of the searched object
        :param extension: extension of the searched object
        :returns: the first matching entity or None if there is no such child
        """
        for entity in self.entities:
            if entity.name_equals(object_name, extension):
                return entity
        return None

    def recursive_search(self, path_as_list, current_dir):  # type: ignore
        """
        Resolves the path (relative to `current_dir`) to the file or directory object.

        :param path_as_list: path split into the list of its parts
        :param current_dir: directory the search starts in
        :returns: the object the last part of the path refers to
        :raises FileNotFoundError: raised if the path is empty, a part of the path doesn't exist
                                   or the path continues through an object that is a file
        """
        if not path_as_list:
            raise FileNotFoundError('No such file or directory!')
        # only directories hold child entities, the path cannot continue through a file
        if isinstance(current_dir, File):
            raise FileNotFoundError('No such file or directory!')

        name, extension = split_to_name_and_extension(path_as_list[0])
        next_obj = current_dir.lookup_entity(name, extension)
        if next_obj is None:
            raise FileNotFoundError('No such file or directory!')
        if len(path_as_list) == 1:
            return next_obj
        return self.recursive_search(path_as_list[1:], next_obj)

    def find_free_entry(self) -> Optional[Entry]:
        """Return the first empty entry of the directory or None if all entries are used."""
        for entry in self.entries:
            if entry.is_empty:
                return entry
        return None

    def _extend_directory(self) -> None:
        """Appends a free cluster to the end of the directory's cluster chain and creates its entries."""
        current: Cluster = self.first_cluster
        while current.next_cluster is not None:
            current = current.next_cluster
        new_cluster: Cluster = self.fat.find_free_cluster()
        current.set_in_fat(new_cluster.id)
        current.next_cluster = new_cluster
        self.entries += self.create_entries(new_cluster)

    def chain_directory(self) -> Entry:
        """
        Extends the directory by one cluster and returns a free entry.

        :returns: free entry of the extended directory
        :raises FatalError: raised if there is no free entry even after the extension
        """
        self._extend_directory()
        free_entry: Optional[Entry] = self.find_free_entry()
        if free_entry is None:
            raise FatalError('No more space left!')
        return free_entry

    @staticmethod
    def allocate_long_name_object(free_entry,
                                  name,
                                  extension,
                                  target_dir,
                                  free_cluster,
                                  entity_type):
        # type: (Entry, str, str, Directory, Cluster, int) -> Tuple[Cluster, Entry, Directory]
        """
        Allocates the chain of long file name entries followed by the short entry of the object.

        :param free_entry: free entry in the target directory used for the first record written
        :param name: name of the object
        :param extension: extension of the object
        :param target_dir: directory the entries are allocated in
        :param free_cluster: cluster assigned to the object
        :param entity_type: attribute value describing the type of the object
        :returns: tuple of the object's cluster, its short entry and the target directory
        """
        lfn_full_name: str = build_lfn_full_name(name, extension)
        lfn_unique_entry_order: int = build_lfn_unique_entry_name_order(target_dir.entities, name)
        lfn_short_entry_name: str = build_lfn_short_entry_name(name, extension, lfn_unique_entry_order)
        checksum: int = lfn_checksum(lfn_short_entry_name)
        entries_count: int = get_required_lfn_entries_count(lfn_full_name)

        # entries in long file name entries chain starts with the last entry
        split_names_reversed = reversed(list(enumerate(split_name_to_lfn_entries(lfn_full_name, entries_count))))
        for i, name_split_to_entry in split_names_reversed:
            order: int = i + 1
            lfn_names: List[bytes] = list(
                map(lambda x: x.lower(), split_name_to_lfn_entry_blocks(name_split_to_entry)))  # type: ignore
            free_entry.allocate_entry(first_cluster_id=free_cluster.id,
                                      entity_name=name,
                                      entity_extension=extension,
                                      entity_type=entity_type,
                                      lfn_order=order,
                                      lfn_names=lfn_names,
                                      lfn_checksum_=checksum,
                                      lfn_is_last=order == entries_count)
            # the entry for the next record (the last one found is used for the short entry below)
            free_entry = target_dir.find_free_entry() or target_dir.chain_directory()

        free_entry.allocate_entry(first_cluster_id=free_cluster.id,
                                  entity_name=lfn_short_entry_name[:MAX_NAME_SIZE],
                                  entity_extension=lfn_short_entry_name[MAX_NAME_SIZE:],
                                  entity_type=entity_type,
                                  lfn_order=Entry.SHORT_ENTRY_LN)
        return free_cluster, free_entry, target_dir

    def allocate_object(self,
                        name,
                        entity_type,
                        path_from_root=None,
                        extension=''):
        # type: (str, int, Optional[List[str]], str) -> Tuple[Cluster, Entry, Directory]
        """
        Method finds the target directory in the path
        and allocates cluster (both the record in FAT and cluster in the data region)
        and entry in the specified directory

        :param name: name of the object
        :param entity_type: attribute value describing the type of the object
        :param path_from_root: path to the target directory split into the list (None or empty for this directory)
        :param extension: extension of the object
        :returns: tuple of the object's cluster, its entry and the target directory
        :raises FileNotFoundError: raised if the target directory doesn't exist
        """
        # the target is resolved first: the lookup has no side effects, so an invalid path
        # fails before a cluster is taken from the FAT
        target_dir: Directory = self if not path_from_root else self.recursive_search(path_from_root, self)
        free_cluster: Cluster = self.fat.find_free_cluster()
        free_entry: Entry = target_dir.find_free_entry() or target_dir.chain_directory()

        name_fits_short_struct: bool = len(name) <= MAX_NAME_SIZE and len(extension) <= MAX_EXT_SIZE
        if not self.fatfs_state.long_names_enabled or name_fits_short_struct:
            free_entry.allocate_entry(first_cluster_id=free_cluster.id,
                                      entity_name=name,
                                      entity_extension=extension,
                                      entity_type=entity_type)
            return free_cluster, free_entry, target_dir
        return self.allocate_long_name_object(free_entry=free_entry,
                                              name=name,
                                              extension=extension,
                                              target_dir=target_dir,
                                              free_cluster=free_cluster,
                                              entity_type=entity_type)

    def new_file(self, name: str, extension: str, path_from_root: Optional[List[str]]) -> None:
        """
        Creates a new empty file in the directory given by the path.

        :param name: name of the file
        :param extension: extension of the file
        :param path_from_root: path to the target directory split into the list (None or empty for this directory)
        :returns: None
        """
        free_cluster, free_entry, target_dir = self.allocate_object(name=name,
                                                                    extension=extension,
                                                                    entity_type=Directory.ATTR_ARCHIVE,
                                                                    path_from_root=path_from_root)

        file: File = File(name=name,
                          fat=self.fat,
                          extension=extension,
                          fatfs_state=self.fatfs_state,
                          entry=free_entry)
        file.first_cluster = free_cluster
        target_dir.entities.append(file)

    def new_directory(self, name, parent, path_from_root):
        # type: (str, Directory, Optional[List[str]]) -> None
        """
        Creates and initializes a new directory in the directory given by the path.

        :param name: name of the new directory
        :param parent: directory set as the parent of the new directory
        :param path_from_root: path to the target directory split into the list (None or empty for this directory)
        :returns: None
        """
        free_cluster, free_entry, target_dir = self.allocate_object(name=name,
                                                                    entity_type=Directory.ATTR_DIRECTORY,
                                                                    path_from_root=path_from_root)

        directory: Directory = Directory(name=name,
                                         fat=self.fat,
                                         parent=parent,
                                         fatfs_state=self.fatfs_state,
                                         entry=free_entry)
        directory.first_cluster = free_cluster
        directory.init_directory()
        target_dir.entities.append(directory)

    def write_to_file(self, path: List[str], content: bytes) -> None:
        """
        Writes to file existing in the directory structure.

        :param path: path split into the list
        :param content: content as bytes to be written into a file
        :returns: None
        :raises WriteDirectoryException: raised if the target object for writing is a directory
        :raises FileNotFoundError: raised if the path doesn't lead to an existing object
        """
        entity_to_write: Union[File, Directory] = self.recursive_search(path, self)
        if not isinstance(entity_to_write, File):
            raise WriteDirectoryException(f'`{os.path.join(*path)}` is a directory!')

        clusters_cnt: int = required_clusters_count(cluster_size=self.fatfs_state.sector_size,
                                                    content=content)
        self.fat.allocate_chain(entity_to_write.first_cluster, clusters_cnt)
        entity_to_write.write(content)