Added several tests to verify the implementation of the vvfat driver.
We needed a way to interact with it, so created a basic `fat16.py` driver that handled writing correct sectors for us.
Added `vvfat` to the non-generic formats, as its not a normal image format.
Signed-off-by: Amjad Alsharafi <amjadsharafi10@gmail.com>
---
tests/qemu-iotests/check | 2 +-
tests/qemu-iotests/fat16.py | 635 +++++++++++++++++++++++++++++
tests/qemu-iotests/testenv.py | 2 +-
tests/qemu-iotests/tests/vvfat | 440 ++++++++++++++++++++
tests/qemu-iotests/tests/vvfat.out | 5 +
5 files changed, 1082 insertions(+), 2 deletions(-)
create mode 100644 tests/qemu-iotests/fat16.py
create mode 100755 tests/qemu-iotests/tests/vvfat
create mode 100755 tests/qemu-iotests/tests/vvfat.out
diff --git a/tests/qemu-iotests/check b/tests/qemu-iotests/check
index 56d88ca423..545f9ec7bd 100755
--- a/tests/qemu-iotests/check
+++ b/tests/qemu-iotests/check
@@ -84,7 +84,7 @@ def make_argparser() -> argparse.ArgumentParser:
p.set_defaults(imgfmt='raw', imgproto='file')
format_list = ['raw', 'bochs', 'cloop', 'parallels', 'qcow', 'qcow2',
- 'qed', 'vdi', 'vpc', 'vhdx', 'vmdk', 'luks', 'dmg']
+ 'qed', 'vdi', 'vpc', 'vhdx', 'vmdk', 'luks', 'dmg', 'vvfat']
g_fmt = p.add_argument_group(
' image format options',
'The following options set the IMGFMT environment variable. '
diff --git a/tests/qemu-iotests/fat16.py b/tests/qemu-iotests/fat16.py
new file mode 100644
index 0000000000..baf801b4d5
--- /dev/null
+++ b/tests/qemu-iotests/fat16.py
@@ -0,0 +1,635 @@
+# A simple FAT16 driver that is used to test the `vvfat` driver in QEMU.
+#
+# Copyright (C) 2024 Amjad Alsharafi <amjadsharafi10@gmail.com>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+from typing import List
+import string
+
+SECTOR_SIZE = 512
+DIRENTRY_SIZE = 32
+ALLOWED_FILE_CHARS = \
+ set("!#$%&'()-@^_`{}~" + string.digits + string.ascii_uppercase)
+
+
+class MBR:
+ def __init__(self, data: bytes):
+ assert len(data) == 512
+ self.partition_table = []
+ for i in range(4):
+ partition = data[446 + i * 16 : 446 + (i + 1) * 16]
+ self.partition_table.append(
+ {
+ "status": partition[0],
+ "start_head": partition[1],
+ "start_sector": partition[2] & 0x3F,
+ "start_cylinder":
+ ((partition[2] & 0xC0) << 2) | partition[3],
+ "type": partition[4],
+ "end_head": partition[5],
+ "end_sector": partition[6] & 0x3F,
+ "end_cylinder":
+ ((partition[6] & 0xC0) << 2) | partition[7],
+ "start_lba": int.from_bytes(partition[8:12], "little"),
+ "size": int.from_bytes(partition[12:16], "little"),
+ }
+ )
+
+ def __str__(self):
+ return "\n".join(
+ [f"{i}: {partition}"
+ for i, partition in enumerate(self.partition_table)]
+ )
+
+
+class FatBootSector:
+ def __init__(self, data: bytes):
+ assert len(data) == 512
+ self.bytes_per_sector = int.from_bytes(data[11:13], "little")
+ self.sectors_per_cluster = data[13]
+ self.reserved_sectors = int.from_bytes(data[14:16], "little")
+ self.fat_count = data[16]
+ self.root_entries = int.from_bytes(data[17:19], "little")
+ self.media_descriptor = data[21]
+ self.fat_size = int.from_bytes(data[22:24], "little")
+ self.sectors_per_fat = int.from_bytes(data[22:24], "little")
+ self.sectors_per_track = int.from_bytes(data[24:26], "little")
+ self.heads = int.from_bytes(data[26:28], "little")
+ self.hidden_sectors = int.from_bytes(data[28:32], "little")
+ self.total_sectors = int.from_bytes(data[32:36], "little")
+ self.drive_number = data[36]
+ self.volume_id = int.from_bytes(data[39:43], "little")
+ self.volume_label = data[43:54].decode("ascii").strip()
+ self.fs_type = data[54:62].decode("ascii").strip()
+
+ def root_dir_start(self):
+ """
+ Calculate the start sector of the root directory.
+ """
+ return self.reserved_sectors + self.fat_count * self.sectors_per_fat
+
+ def root_dir_size(self):
+ """
+ Calculate the size of the root directory in sectors.
+ """
+ return (
+ self.root_entries * DIRENTRY_SIZE + self.bytes_per_sector - 1
+ ) // self.bytes_per_sector
+
+ def data_sector_start(self):
+ """
+ Calculate the start sector of the data region.
+ """
+ return self.root_dir_start() + self.root_dir_size()
+
+ def first_sector_of_cluster(self, cluster: int):
+ """
+ Calculate the first sector of the given cluster.
+ """
+ return self.data_sector_start() \
+ + (cluster - 2) * self.sectors_per_cluster
+
+ def cluster_bytes(self):
+ """
+ Calculate the number of bytes in a cluster.
+ """
+ return self.bytes_per_sector * self.sectors_per_cluster
+
+ def __str__(self):
+ return (
+ f"Bytes per sector: {self.bytes_per_sector}\n"
+ f"Sectors per cluster: {self.sectors_per_cluster}\n"
+ f"Reserved sectors: {self.reserved_sectors}\n"
+ f"FAT count: {self.fat_count}\n"
+ f"Root entries: {self.root_entries}\n"
+ f"Total sectors: {self.total_sectors}\n"
+ f"Media descriptor: {self.media_descriptor}\n"
+ f"Sectors per FAT: {self.sectors_per_fat}\n"
+ f"Sectors per track: {self.sectors_per_track}\n"
+ f"Heads: {self.heads}\n"
+ f"Hidden sectors: {self.hidden_sectors}\n"
+ f"Drive number: {self.drive_number}\n"
+ f"Volume ID: {self.volume_id}\n"
+ f"Volume label: {self.volume_label}\n"
+ f"FS type: {self.fs_type}\n"
+ )
+
+
+class FatDirectoryEntry:
+ def __init__(self, data: bytes, sector: int, offset: int):
+ self.name = data[0:8].decode("ascii").strip()
+ self.ext = data[8:11].decode("ascii").strip()
+ self.attributes = data[11]
+ self.reserved = data[12]
+ self.create_time_tenth = data[13]
+ self.create_time = int.from_bytes(data[14:16], "little")
+ self.create_date = int.from_bytes(data[16:18], "little")
+ self.last_access_date = int.from_bytes(data[18:20], "little")
+ high_cluster = int.from_bytes(data[20:22], "little")
+ self.last_mod_time = int.from_bytes(data[22:24], "little")
+ self.last_mod_date = int.from_bytes(data[24:26], "little")
+ low_cluster = int.from_bytes(data[26:28], "little")
+ self.cluster = (high_cluster << 16) | low_cluster
+ self.size_bytes = int.from_bytes(data[28:32], "little")
+
+ # extra (to help write back to disk)
+ self.sector = sector
+ self.offset = offset
+
+ def as_bytes(self) -> bytes:
+ return (
+ self.name.ljust(8, " ").encode("ascii")
+ + self.ext.ljust(3, " ").encode("ascii")
+ + self.attributes.to_bytes(1, "little")
+ + self.reserved.to_bytes(1, "little")
+ + self.create_time_tenth.to_bytes(1, "little")
+ + self.create_time.to_bytes(2, "little")
+ + self.create_date.to_bytes(2, "little")
+ + self.last_access_date.to_bytes(2, "little")
+ + (self.cluster >> 16).to_bytes(2, "little")
+ + self.last_mod_time.to_bytes(2, "little")
+ + self.last_mod_date.to_bytes(2, "little")
+ + (self.cluster & 0xFFFF).to_bytes(2, "little")
+ + self.size_bytes.to_bytes(4, "little")
+ )
+
+ def whole_name(self):
+ if self.ext:
+ return f"{self.name}.{self.ext}"
+ else:
+ return self.name
+
+ def __str__(self):
+ return (
+ f"Name: {self.name}\n"
+ f"Ext: {self.ext}\n"
+ f"Attributes: {self.attributes}\n"
+ f"Reserved: {self.reserved}\n"
+ f"Create time tenth: {self.create_time_tenth}\n"
+ f"Create time: {self.create_time}\n"
+ f"Create date: {self.create_date}\n"
+ f"Last access date: {self.last_access_date}\n"
+ f"Last mod time: {self.last_mod_time}\n"
+ f"Last mod date: {self.last_mod_date}\n"
+ f"Cluster: {self.cluster}\n"
+ f"Size: {self.size_bytes}\n"
+ )
+
+ def __repr__(self):
+ # convert to dict
+ return str(vars(self))
+
+
+class Fat16:
+ def __init__(
+ self,
+ start_sector: int,
+ size: int,
+ sector_reader: callable,
+ sector_writer: callable,
+ ):
+ self.start_sector = start_sector
+ self.size_in_sectors = size
+ self.sector_reader = sector_reader
+ self.sector_writer = sector_writer
+
+ self.boot_sector = FatBootSector(self.sector_reader(start_sector))
+
+ fat_size_in_sectors = \
+ self.boot_sector.fat_size * self.boot_sector.fat_count
+ self.fats = self.read_sectors(
+ self.boot_sector.reserved_sectors, fat_size_in_sectors
+ )
+ self.fats_dirty_sectors = set()
+
+ def read_sectors(self, start_sector: int, num_sectors: int) -> bytes:
+ return self.sector_reader(start_sector + self.start_sector, num_sectors)
+
+ def write_sectors(self, start_sector: int, data: bytes):
+ return self.sector_writer(start_sector + self.start_sector, data)
+
+ def directory_from_bytes(
+ self, data: bytes, start_sector: int
+ ) -> List[FatDirectoryEntry]:
+ """
+ Convert `bytes` into a list of `FatDirectoryEntry` objects.
+ Will ignore long file names.
+ Will stop when it encounters a 0x00 byte.
+ """
+
+ entries = []
+ for i in range(0, len(data), DIRENTRY_SIZE):
+ entry = data[i : i + DIRENTRY_SIZE]
+
+ current_sector = start_sector + (i // SECTOR_SIZE)
+ current_offset = i % SECTOR_SIZE
+
+ if entry[0] == 0:
+ break
+ elif entry[0] == 0xE5:
+ # Deleted file
+ continue
+
+ if entry[11] & 0xF == 0xF:
+ # Long file name
+ continue
+
+ entries.append(
+ FatDirectoryEntry(entry, current_sector, current_offset))
+ return entries
+
+ def read_root_directory(self) -> List[FatDirectoryEntry]:
+ root_dir = self.read_sectors(
+ self.boot_sector.root_dir_start(), self.boot_sector.root_dir_size()
+ )
+ return self.directory_from_bytes(root_dir,
+ self.boot_sector.root_dir_start())
+
+ def read_fat_entry(self, cluster: int) -> int:
+ """
+ Read the FAT entry for the given cluster.
+ """
+ fat_offset = cluster * 2 # FAT16
+ return int.from_bytes(self.fats[fat_offset : fat_offset + 2], "little")
+
+ def write_fat_entry(self, cluster: int, value: int):
+ """
+ Write the FAT entry for the given cluster.
+ """
+ fat_offset = cluster * 2
+ self.fats = (
+ self.fats[:fat_offset]
+ + value.to_bytes(2, "little")
+ + self.fats[fat_offset + 2 :]
+ )
+ self.fats_dirty_sectors.add(fat_offset // SECTOR_SIZE)
+
+ def flush_fats(self):
+ """
+ Write the FATs back to the disk.
+ """
+ for sector in self.fats_dirty_sectors:
+ data = self.fats[sector * SECTOR_SIZE : (sector + 1) * SECTOR_SIZE]
+ sector = self.boot_sector.reserved_sectors + sector
+ self.write_sectors(sector, data)
+ self.fats_dirty_sectors = set()
+
+ def next_cluster(self, cluster: int) -> int | None:
+ """
+ Get the next cluster in the chain.
+ If its `None`, then its the last cluster.
+ The function will crash if the next cluster
+ is `FREE` (unexpected) or invalid entry.
+ """
+ fat_entry = self.read_fat_entry(cluster)
+ if fat_entry == 0:
+ raise Exception("Unexpected: FREE cluster")
+ elif fat_entry == 1:
+ raise Exception("Unexpected: RESERVED cluster")
+ elif fat_entry >= 0xFFF8:
+ return None
+ elif fat_entry >= 0xFFF7:
+ raise Exception("Invalid FAT entry")
+ else:
+ return fat_entry
+
+ def next_free_cluster(self) -> int:
+ """
+ Find the next free cluster.
+ """
+ # simple linear search
+ for i in range(2, 0xFFFF):
+ if self.read_fat_entry(i) == 0:
+ return i
+ raise Exception("No free clusters")
+
+ def read_cluster(self, cluster: int) -> bytes:
+ """
+ Read the cluster at the given cluster.
+ """
+ return self.read_sectors(
+ self.boot_sector.first_sector_of_cluster(cluster),
+ self.boot_sector.sectors_per_cluster,
+ )
+
+ def write_cluster(self, cluster: int, data: bytes):
+ """
+ Write the cluster at the given cluster.
+ """
+ assert len(data) == self.boot_sector.cluster_bytes()
+ return self.write_sectors(
+ self.boot_sector.first_sector_of_cluster(cluster),
+ data,
+ )
+
+ def read_directory(self, cluster: int) -> List[FatDirectoryEntry]:
+ """
+ Read the directory at the given cluster.
+ """
+ entries = []
+ while cluster is not None:
+ data = self.read_cluster(cluster)
+ entries.extend(
+ self.directory_from_bytes(
+ data, self.boot_sector.first_sector_of_cluster(cluster)
+ )
+ )
+ cluster = self.next_cluster(cluster)
+ return entries
+
+ def add_direntry(self,
+ cluster: int | None,
+ name: str, ext: str,
+ attributes: int):
+ """
+ Add a new directory entry to the given cluster.
+ If the cluster is `None`, then it will be added to the root directory.
+ """
+
+ def find_free_entry(data: bytes):
+ for i in range(0, len(data), DIRENTRY_SIZE):
+ entry = data[i : i + DIRENTRY_SIZE]
+ if entry[0] == 0 or entry[0] == 0xE5:
+ return i
+ return None
+
+ assert len(name) <= 8, "Name must be 8 characters or less"
+ assert len(ext) <= 3, "Ext must be 3 characters or less"
+ assert attributes % 0x15 != 0x15, "Invalid attributes"
+
+ # initial dummy data
+ new_entry = FatDirectoryEntry(b"\0" * 32, 0, 0)
+ new_entry.name = name.ljust(8, " ")
+ new_entry.ext = ext.ljust(3, " ")
+ new_entry.attributes = attributes
+ new_entry.reserved = 0
+ new_entry.create_time_tenth = 0
+ new_entry.create_time = 0
+ new_entry.create_date = 0
+ new_entry.last_access_date = 0
+ new_entry.last_mod_time = 0
+ new_entry.last_mod_date = 0
+ new_entry.cluster = self.next_free_cluster()
+ new_entry.size_bytes = 0
+
+ # mark as EOF
+ self.write_fat_entry(new_entry.cluster, 0xFFFF)
+
+ if cluster is None:
+ for i in range(self.boot_sector.root_dir_size()):
+ sector_data = self.read_sectors(
+ self.boot_sector.root_dir_start() + i, 1
+ )
+ offset = find_free_entry(sector_data)
+ if offset is not None:
+ new_entry.sector = self.boot_sector.root_dir_start() + i
+ new_entry.offset = offset
+ self.update_direntry(new_entry)
+ return new_entry
+ else:
+ while cluster is not None:
+ data = self.read_cluster(cluster)
+ offset = find_free_entry(data)
+ if offset is not None:
+ new_entry.sector = self.boot_sector.first_sector_of_cluster(
+ cluster
+ ) + (offset // SECTOR_SIZE)
+ new_entry.offset = offset % SECTOR_SIZE
+ self.update_direntry(new_entry)
+ return new_entry
+ cluster = self.next_cluster(cluster)
+
+ raise Exception("No free directory entries")
+
+ def update_direntry(self, entry: FatDirectoryEntry):
+ """
+ Write the directory entry back to the disk.
+ """
+ sector = self.read_sectors(entry.sector, 1)
+ sector = (
+ sector[: entry.offset]
+ + entry.as_bytes()
+ + sector[entry.offset + DIRENTRY_SIZE :]
+ )
+ self.write_sectors(entry.sector, sector)
+
+ def find_direntry(self, path: str) -> FatDirectoryEntry | None:
+ """
+ Find the directory entry for the given path.
+ """
+ assert path[0] == "/", "Path must start with /"
+
+ path = path[1:] # remove the leading /
+ parts = path.split("/")
+ directory = self.read_root_directory()
+
+ current_entry = None
+
+ for i, part in enumerate(parts):
+ is_last = i == len(parts) - 1
+
+ for entry in directory:
+ if entry.whole_name() == part:
+ current_entry = entry
+ break
+ if current_entry is None:
+ return None
+
+ if is_last:
+ return current_entry
+ else:
+ if current_entry.attributes & 0x10 == 0:
+ raise Exception(
+ f"{current_entry.whole_name()} is not a directory")
+ else:
+ directory = self.read_directory(current_entry.cluster)
+
+ def read_file(self, entry: FatDirectoryEntry) -> bytes:
+ """
+ Read the content of the file at the given path.
+ """
+ if entry is None:
+ return None
+ if entry.attributes & 0x10 != 0:
+ raise Exception(f"{entry.whole_name()} is a directory")
+
+ data = b""
+ cluster = entry.cluster
+ while cluster is not None and len(data) <= entry.size_bytes:
+ data += self.read_cluster(cluster)
+ cluster = self.next_cluster(cluster)
+ return data[: entry.size_bytes]
+
+ def truncate_file(self, entry: FatDirectoryEntry, new_size: int):
+ """
+ Truncate the file at the given path to the new size.
+ """
+ if entry is None:
+ return Exception("entry is None")
+ if entry.attributes & 0x10 != 0:
+ raise Exception(f"{entry.whole_name()} is a directory")
+
+ def clusters_from_size(size: int):
+ return (
+ size + self.boot_sector.cluster_bytes() - 1
+ ) // self.boot_sector.cluster_bytes()
+
+ # First, allocate new FATs if we need to
+ required_clusters = clusters_from_size(new_size)
+ current_clusters = clusters_from_size(entry.size_bytes)
+
+ affected_clusters = set()
+
+ # Keep at least one cluster, easier to manage this way
+ if required_clusters == 0:
+ required_clusters = 1
+ if current_clusters == 0:
+ current_clusters = 1
+
+ if required_clusters > current_clusters:
+ # Allocate new clusters
+ cluster = entry.cluster
+ to_add = required_clusters
+ for _ in range(current_clusters - 1):
+ to_add -= 1
+ cluster = self.next_cluster(cluster)
+ assert required_clusters > 0, "No new clusters to allocate"
+ assert cluster is not None, "Cluster is None"
+ assert self.next_cluster(cluster) is None, \
+ "Cluster is not the last cluster"
+
+ # Allocate new clusters
+ for _ in range(to_add - 1):
+ new_cluster = self.next_free_cluster()
+ self.write_fat_entry(cluster, new_cluster)
+ self.write_fat_entry(new_cluster, 0xFFFF)
+ cluster = new_cluster
+
+ elif required_clusters < current_clusters:
+ # Truncate the file
+ cluster = entry.cluster
+ for _ in range(required_clusters - 1):
+ cluster = self.next_cluster(cluster)
+ assert cluster is not None, "Cluster is None"
+
+ next_cluster = self.next_cluster(cluster)
+ # mark last as EOF
+ self.write_fat_entry(cluster, 0xFFFF)
+ # free the rest
+ while next_cluster is not None:
+ cluster = next_cluster
+ next_cluster = self.next_cluster(next_cluster)
+ self.write_fat_entry(cluster, 0)
+
+ self.flush_fats()
+
+ # verify number of clusters
+ cluster = entry.cluster
+ count = 0
+ while cluster is not None:
+ count += 1
+ affected_clusters.add(cluster)
+ cluster = self.next_cluster(cluster)
+ assert (
+ count == required_clusters
+ ), f"Expected {required_clusters} clusters, got {count}"
+
+ # update the size
+ entry.size_bytes = new_size
+ self.update_direntry(entry)
+
+ # trigger every affected cluster
+ for cluster in affected_clusters:
+ first_sector = self.boot_sector.first_sector_of_cluster(cluster)
+ first_sector_data = self.read_sectors(first_sector, 1)
+ self.write_sectors(first_sector, first_sector_data)
+
+ def write_file(self, entry: FatDirectoryEntry, data: bytes):
+ """
+ Write the content of the file at the given path.
+ """
+ if entry is None:
+ return Exception("entry is None")
+ if entry.attributes & 0x10 != 0:
+ raise Exception(f"{entry.whole_name()} is a directory")
+
+ data_len = len(data)
+
+ self.truncate_file(entry, data_len)
+
+ cluster = entry.cluster
+ while cluster is not None:
+ data_to_write = data[: self.boot_sector.cluster_bytes()]
+ last_data = False
+ if len(data_to_write) < self.boot_sector.cluster_bytes():
+ last_data = True
+ old_data = self.read_cluster(cluster)
+ data_to_write += old_data[len(data_to_write) :]
+
+ self.write_cluster(cluster, data_to_write)
+ data = data[self.boot_sector.cluster_bytes() :]
+ if len(data) == 0:
+ break
+ cluster = self.next_cluster(cluster)
+
+ assert len(data) == 0, \
+ "Data was not written completely, clusters missing"
+
+ def create_file(self, path: str):
+ """
+ Create a new file at the given path.
+ """
+ assert path[0] == "/", "Path must start with /"
+
+ path = path[1:] # remove the leading /
+
+ parts = path.split("/")
+
+ directory_cluster = None
+ directory = self.read_root_directory()
+
+ parts, filename = parts[:-1], parts[-1]
+
+ for i, part in enumerate(parts):
+ current_entry = None
+ for entry in directory:
+ if entry.whole_name() == part:
+ current_entry = entry
+ break
+ if current_entry is None:
+ return None
+
+ if current_entry.attributes & 0x10 == 0:
+ raise Exception(
+ f"{current_entry.whole_name()} is not a directory")
+ else:
+ directory = self.read_directory(current_entry.cluster)
+ directory_cluster = current_entry.cluster
+
+ # add new entry to the directory
+
+ filename, ext = filename.split(".")
+
+ if len(ext) > 3:
+ raise Exception("Ext must be 3 characters or less")
+ if len(filename) > 8:
+ raise Exception("Name must be 8 characters or less")
+
+ for c in filename + ext:
+
+ if c not in ALLOWED_FILE_CHARS:
+ raise Exception("Invalid character in filename")
+
+ return self.add_direntry(directory_cluster, filename, ext, 0)
diff --git a/tests/qemu-iotests/testenv.py b/tests/qemu-iotests/testenv.py
index 588f30a4f1..4053d29de4 100644
--- a/tests/qemu-iotests/testenv.py
+++ b/tests/qemu-iotests/testenv.py
@@ -250,7 +250,7 @@ def __init__(self, source_dir: str, build_dir: str,
self.qemu_img_options = os.getenv('QEMU_IMG_OPTIONS')
self.qemu_nbd_options = os.getenv('QEMU_NBD_OPTIONS')
- is_generic = self.imgfmt not in ['bochs', 'cloop', 'dmg']
+ is_generic = self.imgfmt not in ['bochs', 'cloop', 'dmg', 'vvfat']
self.imgfmt_generic = 'true' if is_generic else 'false'
self.qemu_io_options = f'--cache {self.cachemode} --aio {self.aiomode}'
diff --git a/tests/qemu-iotests/tests/vvfat b/tests/qemu-iotests/tests/vvfat
new file mode 100755
index 0000000000..113d7d3270
--- /dev/null
+++ b/tests/qemu-iotests/tests/vvfat
@@ -0,0 +1,440 @@
+#!/usr/bin/env python3
+# group: rw vvfat
+#
+# Test vvfat driver implementation
+# Here, we use a simple FAT16 implementation and check the behavior of the vvfat driver.
+#
+# Copyright (C) 2024 Amjad Alsharafi <amjadsharafi10@gmail.com>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import os, shutil
+import iotests
+from iotests import imgfmt, QMPTestCase
+from fat16 import MBR, Fat16, DIRENTRY_SIZE
+
+filesystem = os.path.join(iotests.test_dir, "filesystem")
+
+nbd_sock = iotests.file_path("nbd.sock", base_dir=iotests.sock_dir)
+nbd_uri = "nbd+unix:///disk?socket=" + nbd_sock
+
+SECTOR_SIZE = 512
+
+
+class TestVVFatDriver(QMPTestCase):
+ def setUp(self) -> None:
+ if os.path.exists(filesystem):
+ if os.path.isdir(filesystem):
+ shutil.rmtree(filesystem)
+ else:
+ print(f"Error: {filesystem} exists and is not a directory")
+ exit(1)
+ os.mkdir(filesystem)
+
+ # Add some text files to the filesystem
+ for i in range(10):
+ with open(os.path.join(filesystem, f"file{i}.txt"), "w") as f:
+ f.write(f"Hello, world! {i}\n")
+
+ # Add 2 large files, above the cluster size (8KB)
+ with open(os.path.join(filesystem, "large1.txt"), "wb") as f:
+ # write 'A' * 1KB, 'B' * 1KB, 'C' * 1KB, ...
+ for i in range(8 * 2): # two clusters
+ f.write(bytes([0x41 + i] * 1024))
+
+ with open(os.path.join(filesystem, "large2.txt"), "wb") as f:
+ # write 'A' * 1KB, 'B' * 1KB, 'C' * 1KB, ...
+ for i in range(8 * 3): # 3 clusters
+ f.write(bytes([0x41 + i] * 1024))
+
+ self.vm = iotests.VM()
+
+ self.vm.add_blockdev(
+ self.vm.qmp_to_opts(
+ {
+ "driver": imgfmt,
+ "node-name": "disk",
+ "rw": "true",
+ "fat-type": "16",
+ "dir": filesystem,
+ }
+ )
+ )
+
+ self.vm.launch()
+
+ self.vm.qmp_log("block-dirty-bitmap-add", **{"node": "disk", "name": "bitmap0"})
+
+ # attach nbd server
+ self.vm.qmp_log(
+ "nbd-server-start",
+ **{"addr": {"type": "unix", "data": {"path": nbd_sock}}},
+ filters=[],
+ )
+
+ self.vm.qmp_log(
+ "nbd-server-add",
+ **{"device": "disk", "writable": True, "bitmap": "bitmap0"},
+ )
+
+ self.qio = iotests.QemuIoInteractive("-f", "raw", nbd_uri)
+
+ def tearDown(self) -> None:
+ self.qio.close()
+ self.vm.shutdown()
+ # print(self.vm.get_log())
+ shutil.rmtree(filesystem)
+
+ def read_sectors(self, sector: int, num: int = 1) -> bytes:
+ """
+ Read `num` sectors starting from `sector` from the `disk`.
+ This uses `QemuIoInteractive` to read the sectors into `stdout` and then parse the output.
+ """
+ self.assertGreater(num, 0)
+ # The output contains the content of the sector in hex dump format
+ # We need to extract the content from it
+ output = self.qio.cmd(f"read -v {sector * SECTOR_SIZE} {num * SECTOR_SIZE}")
+ # Each row is 16 bytes long, and we are writing `num` sectors
+ rows = num * SECTOR_SIZE // 16
+ output_rows = output.split("\n")[:rows]
+
+ hex_content = "".join(
+ [(row.split(": ")[1]).split(" ")[0] for row in output_rows]
+ )
+ bytes_content = bytes.fromhex(hex_content)
+
+ self.assertEqual(len(bytes_content), num * SECTOR_SIZE)
+
+ return bytes_content
+
+ def write_sectors(self, sector: int, data: bytes):
+ """
+ Write `data` to the `disk` starting from `sector`.
+ This uses `QemuIoInteractive` to write the data into the disk.
+ """
+
+ self.assertGreater(len(data), 0)
+ self.assertEqual(len(data) % SECTOR_SIZE, 0)
+
+ temp_file = os.path.join(iotests.test_dir, "temp.bin")
+ with open(temp_file, "wb") as f:
+ f.write(data)
+
+ self.qio.cmd(f"write -s {temp_file} {sector * SECTOR_SIZE} {len(data)}")
+
+ os.remove(temp_file)
+
+ def init_fat16(self):
+ mbr = MBR(self.read_sectors(0))
+ return Fat16(
+ mbr.partition_table[0]["start_lba"],
+ mbr.partition_table[0]["size"],
+ self.read_sectors,
+ self.write_sectors,
+ )
+
+ # Tests
+
+ def test_fat_filesystem(self):
+ """
+ Test that vvfat produce a valid FAT16 and MBR sectors
+ """
+ mbr = MBR(self.read_sectors(0))
+
+ self.assertEqual(mbr.partition_table[0]["status"], 0x80)
+ self.assertEqual(mbr.partition_table[0]["type"], 6)
+
+ fat16 = Fat16(
+ mbr.partition_table[0]["start_lba"],
+ mbr.partition_table[0]["size"],
+ self.read_sectors,
+ self.write_sectors,
+ )
+ self.assertEqual(fat16.boot_sector.bytes_per_sector, 512)
+ self.assertEqual(fat16.boot_sector.volume_label, "QEMU VVFAT")
+
+ def test_read_root_directory(self):
+ """
+ Test the content of the root directory
+ """
+ fat16 = self.init_fat16()
+
+ root_dir = fat16.read_root_directory()
+
+ self.assertEqual(len(root_dir), 13) # 12 + 1 special file
+
+ files = {
+ "QEMU VVF.AT": 0, # special empty file
+ "FILE0.TXT": 16,
+ "FILE1.TXT": 16,
+ "FILE2.TXT": 16,
+ "FILE3.TXT": 16,
+ "FILE4.TXT": 16,
+ "FILE5.TXT": 16,
+ "FILE6.TXT": 16,
+ "FILE7.TXT": 16,
+ "FILE8.TXT": 16,
+ "FILE9.TXT": 16,
+ "LARGE1.TXT": 0x2000 * 2,
+ "LARGE2.TXT": 0x2000 * 3,
+ }
+
+ for entry in root_dir:
+ self.assertIn(entry.whole_name(), files)
+ self.assertEqual(entry.size_bytes, files[entry.whole_name()])
+
+ def test_direntry_as_bytes(self):
+ """
+ Test if we can convert Direntry back to bytes, so that we can write it back to the disk safely.
+ """
+ fat16 = self.init_fat16()
+
+ root_dir = fat16.read_root_directory()
+ first_entry_bytes = fat16.read_sectors(fat16.boot_sector.root_dir_start(), 1)
+ # The first entry won't be deleted, so we can compare it with the first entry in the root directory
+ self.assertEqual(root_dir[0].as_bytes(), first_entry_bytes[:DIRENTRY_SIZE])
+
+ def test_read_files(self):
+ """
+ Test reading the content of the files
+ """
+ fat16 = self.init_fat16()
+
+ for i in range(10):
+ file = fat16.find_direntry(f"/FILE{i}.TXT")
+ self.assertIsNotNone(file)
+ self.assertEqual(
+ fat16.read_file(file), f"Hello, world! {i}\n".encode("ascii")
+ )
+
+ # test large files
+ large1 = fat16.find_direntry("/LARGE1.TXT")
+ with open(os.path.join(filesystem, "large1.txt"), "rb") as f:
+ self.assertEqual(fat16.read_file(large1), f.read())
+
+ large2 = fat16.find_direntry("/LARGE2.TXT")
+ self.assertIsNotNone(large2)
+ with open(os.path.join(filesystem, "large2.txt"), "rb") as f:
+ self.assertEqual(fat16.read_file(large2), f.read())
+
+ def test_write_file_same_content_direct(self):
+ """
+ Similar to `test_write_file_in_same_content`, but we write the file directly clusters
+ and thus we don't go through the modification of direntry.
+ """
+ fat16 = self.init_fat16()
+
+ file = fat16.find_direntry("/FILE0.TXT")
+ self.assertIsNotNone(file)
+
+ data = fat16.read_cluster(file.cluster)
+ fat16.write_cluster(file.cluster, data)
+
+ with open(os.path.join(filesystem, "file0.txt"), "rb") as f:
+ self.assertEqual(fat16.read_file(file), f.read())
+
+ def test_write_file_in_same_content(self):
+ """
+ Test writing the same content to the file back to it
+ """
+ fat16 = self.init_fat16()
+
+ file = fat16.find_direntry("/FILE0.TXT")
+ self.assertIsNotNone(file)
+
+ self.assertEqual(fat16.read_file(file), b"Hello, world! 0\n")
+
+ fat16.write_file(file, b"Hello, world! 0\n")
+
+ self.assertEqual(fat16.read_file(file), b"Hello, world! 0\n")
+
+ with open(os.path.join(filesystem, "file0.txt"), "rb") as f:
+ self.assertEqual(f.read(), b"Hello, world! 0\n")
+
+ def test_modify_content_same_clusters(self):
+ """
+ Test modifying the content of the file without changing the number of clusters
+ """
+ fat16 = self.init_fat16()
+
+ file = fat16.find_direntry("/FILE0.TXT")
+ self.assertIsNotNone(file)
+
+ new_content = b"Hello, world! Modified\n"
+ self.assertEqual(fat16.read_file(file), b"Hello, world! 0\n")
+
+ fat16.write_file(file, new_content)
+
+ self.assertEqual(fat16.read_file(file), new_content)
+
+ with open(os.path.join(filesystem, "file0.txt"), "rb") as f:
+ self.assertEqual(f.read(), new_content)
+
+ def test_truncate_file_same_clusters_less(self):
+ """
+ Test truncating the file without changing number of clusters
+ Test decreasing the file size
+ """
+ fat16 = self.init_fat16()
+
+ file = fat16.find_direntry("/FILE0.TXT")
+ self.assertIsNotNone(file)
+
+ self.assertEqual(fat16.read_file(file), b"Hello, world! 0\n")
+
+ fat16.truncate_file(file, 5)
+
+ new_content = fat16.read_file(file)
+
+ self.assertEqual(new_content, b"Hello")
+
+ with open(os.path.join(filesystem, "file0.txt"), "rb") as f:
+ self.assertEqual(f.read(), new_content)
+
+ def test_truncate_file_same_clusters_more(self):
+ """
+ Test truncating the file without changing number of clusters
+ Test increase the file size
+ """
+ fat16 = self.init_fat16()
+
+ file = fat16.find_direntry("/FILE0.TXT")
+ self.assertIsNotNone(file)
+
+ self.assertEqual(fat16.read_file(file), b"Hello, world! 0\n")
+
+ fat16.truncate_file(file, 20)
+
+ new_content = fat16.read_file(file)
+
+ # random pattern will be appended to the file, and its not always the same
+ self.assertEqual(new_content[:16], b"Hello, world! 0\n")
+ self.assertEqual(len(new_content), 20)
+
+ with open(os.path.join(filesystem, "file0.txt"), "rb") as f:
+ self.assertEqual(f.read(), new_content)
+
+ def test_write_large_file(self):
+ """
+ Test writing a large file
+ """
+ fat16 = self.init_fat16()
+
+ file = fat16.find_direntry("/LARGE1.TXT")
+ self.assertIsNotNone(file)
+
+ # The content of LARGE1 is A * 1KB, B * 1KB, C * 1KB, ..., P * 1KB
+ # Lets change it to be Z * 1KB, Y * 1KB, X * 1KB, ..., K * 1KB
+ # without changing the number of clusters or filesize
+ new_content = b"".join([bytes([0x5A - i] * 1024) for i in range(16)])
+
+ fat16.write_file(file, new_content)
+
+ with open(os.path.join(filesystem, "large1.txt"), "rb") as f:
+ self.assertEqual(f.read(), new_content)
+
+ def test_truncate_file_change_clusters_less(self):
+ """
+ Test truncating a file by reducing the number of clusters
+ """
+ fat16 = self.init_fat16()
+
+ file = fat16.find_direntry("/LARGE1.TXT")
+ self.assertIsNotNone(file)
+
+ fat16.truncate_file(file, 1)
+
+ self.assertEqual(fat16.read_file(file), b"A")
+
+ with open(os.path.join(filesystem, "large1.txt"), "rb") as f:
+ self.assertEqual(f.read(), b"A")
+
+ def test_write_file_change_clusters_less(self):
+ """
+ Test truncating a file by reducing the number of clusters
+ """
+ fat16 = self.init_fat16()
+
+ file = fat16.find_direntry("/LARGE2.TXT")
+ self.assertIsNotNone(file)
+
+ new_content = b"Hello, world! This was a large file\n"
+ new_content = b"X" * 8 * 1024 + b"Y" * 8 * 1024
+
+ fat16.write_file(file, new_content)
+
+ self.assertEqual(fat16.read_file(file), new_content)
+
+ with open(os.path.join(filesystem, "large2.txt"), "rb") as f:
+ self.assertEqual(f.read(), new_content)
+
+ def test_write_file_change_clusters_more(self):
+ """
+ Test truncating a file by increasing the number of clusters
+ """
+ fat16 = self.init_fat16()
+
+ file = fat16.find_direntry("/LARGE2.TXT")
+ self.assertIsNotNone(file)
+
+ new_content = b"X" * 8 * 1024 + b"Y" * 8 * 1024 + b"Z" * 8 * 1024
+
+ fat16.write_file(file, new_content)
+
+ with open(os.path.join(filesystem, "large2.txt"), "rb") as f:
+ self.assertEqual(f.read(), new_content)
+
+ def test_write_file_change_clusters_more_non_last_file(self):
+ """
+ Test truncating a file by increasing the number of clusters
+ This is a special variant of the above test, where we write to
+ a file so that when allocating new clusters, it won't have contiguous clusters
+ """
+ fat16 = self.init_fat16()
+
+ file = fat16.find_direntry("/LARGE1.TXT")
+ self.assertIsNotNone(file)
+
+ new_content = b"X" * 8 * 1024 + b"Y" * 8 * 1024 + b"Z" * 8 * 1024
+
+ fat16.write_file(file, new_content)
+
+ with open(os.path.join(filesystem, "large1.txt"), "rb") as f:
+ self.assertEqual(f.read(), new_content)
+
+ def test_create_file(self):
+ """
+ Test creating a new file
+ """
+ fat16 = self.init_fat16()
+
+ new_file = fat16.create_file("/NEWFILE.TXT")
+
+ self.assertIsNotNone(new_file)
+ self.assertEqual(new_file.size_bytes, 0)
+
+ new_content = b"Hello, world! New file\n"
+ fat16.write_file(new_file, new_content)
+
+ self.assertEqual(fat16.read_file(new_file), new_content)
+
+ with open(os.path.join(filesystem, "newfile.txt"), "rb") as f:
+ self.assertEqual(f.read(), new_content)
+
+ # TODO: support deleting files
+
+
+if __name__ == "__main__":
+ # This is a specific test for vvfat driver
+ iotests.main(supported_fmts=["vvfat"], supported_protocols=["file"])
diff --git a/tests/qemu-iotests/tests/vvfat.out b/tests/qemu-iotests/tests/vvfat.out
new file mode 100755
index 0000000000..96961ed0b5
--- /dev/null
+++ b/tests/qemu-iotests/tests/vvfat.out
@@ -0,0 +1,5 @@
+...............
+----------------------------------------------------------------------
+Ran 15 tests
+
+OK
--
2.45.1
Am 05.06.2024 um 02:58 hat Amjad Alsharafi geschrieben: > Added several tests to verify the implementation of the vvfat driver. > > We needed a way to interact with it, so created a basic `fat16.py` driver that handled writing correct sectors for us. > > Added `vvfat` to the non-generic formats, as its not a normal image format. > > Signed-off-by: Amjad Alsharafi <amjadsharafi10@gmail.com> > --- > tests/qemu-iotests/check | 2 +- > tests/qemu-iotests/fat16.py | 635 +++++++++++++++++++++++++++++ > tests/qemu-iotests/testenv.py | 2 +- > tests/qemu-iotests/tests/vvfat | 440 ++++++++++++++++++++ > tests/qemu-iotests/tests/vvfat.out | 5 + > 5 files changed, 1082 insertions(+), 2 deletions(-) > create mode 100644 tests/qemu-iotests/fat16.py > create mode 100755 tests/qemu-iotests/tests/vvfat > create mode 100755 tests/qemu-iotests/tests/vvfat.out > > diff --git a/tests/qemu-iotests/check b/tests/qemu-iotests/check > index 56d88ca423..545f9ec7bd 100755 > --- a/tests/qemu-iotests/check > +++ b/tests/qemu-iotests/check > @@ -84,7 +84,7 @@ def make_argparser() -> argparse.ArgumentParser: > p.set_defaults(imgfmt='raw', imgproto='file') > > format_list = ['raw', 'bochs', 'cloop', 'parallels', 'qcow', 'qcow2', > - 'qed', 'vdi', 'vpc', 'vhdx', 'vmdk', 'luks', 'dmg'] > + 'qed', 'vdi', 'vpc', 'vhdx', 'vmdk', 'luks', 'dmg', 'vvfat'] > g_fmt = p.add_argument_group( > ' image format options', > 'The following options set the IMGFMT environment variable. ' > diff --git a/tests/qemu-iotests/fat16.py b/tests/qemu-iotests/fat16.py > new file mode 100644 > index 0000000000..baf801b4d5 > --- /dev/null > +++ b/tests/qemu-iotests/fat16.py > @@ -0,0 +1,635 @@ > +# A simple FAT16 driver that is used to test the `vvfat` driver in QEMU. > +# > +# Copyright (C) 2024 Amjad Alsharafi <amjadsharafi10@gmail.com> > +# > +# This program is free software; you can redistribute it and/or modify > +# it under the terms of the GNU General Public License as published by > +# the Free Software Foundation; either version 2 of the License, or > +# (at your option) any later version. > +# > +# This program is distributed in the hope that it will be useful, > +# but WITHOUT ANY WARRANTY; without even the implied warranty of > +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the > +# GNU General Public License for more details. > +# > +# You should have received a copy of the GNU General Public License > +# along with this program. If not, see <http://www.gnu.org/licenses/>. > + > +from typing import List > +import string > + > +SECTOR_SIZE = 512 > +DIRENTRY_SIZE = 32 > +ALLOWED_FILE_CHARS = \ > + set("!#$%&'()-@^_`{}~" + string.digits + string.ascii_uppercase) > + > + > +class MBR: > + def __init__(self, data: bytes): > + assert len(data) == 512 > + self.partition_table = [] > + for i in range(4): > + partition = data[446 + i * 16 : 446 + (i + 1) * 16] > + self.partition_table.append( > + { > + "status": partition[0], > + "start_head": partition[1], > + "start_sector": partition[2] & 0x3F, > + "start_cylinder": > + ((partition[2] & 0xC0) << 2) | partition[3], > + "type": partition[4], > + "end_head": partition[5], > + "end_sector": partition[6] & 0x3F, > + "end_cylinder": > + ((partition[6] & 0xC0) << 2) | partition[7], > + "start_lba": int.from_bytes(partition[8:12], "little"), > + "size": int.from_bytes(partition[12:16], "little"), > + } > + ) > + > + def __str__(self): > + return "\n".join( > + [f"{i}: {partition}" > + for i, partition in enumerate(self.partition_table)] > + ) > + > + > +class FatBootSector: > + def __init__(self, data: bytes): > + assert len(data) == 512 > + self.bytes_per_sector = int.from_bytes(data[11:13], "little") > + self.sectors_per_cluster = data[13] > + self.reserved_sectors = int.from_bytes(data[14:16], "little") > + self.fat_count = data[16] > + self.root_entries = int.from_bytes(data[17:19], "little") > + self.media_descriptor = data[21] > + self.fat_size = int.from_bytes(data[22:24], "little") > + self.sectors_per_fat = int.from_bytes(data[22:24], "little") Why two different attributes self.fat_size and self.sectors_per_fat that contain the same value? > + self.sectors_per_track = int.from_bytes(data[24:26], "little") > + self.heads = int.from_bytes(data[26:28], "little") > + self.hidden_sectors = int.from_bytes(data[28:32], "little") > + self.total_sectors = int.from_bytes(data[32:36], "little") This value should only be considered if the 16 bit value at byte 19 (which you don't store at all) was 0. > + self.drive_number = data[36] > + self.volume_id = int.from_bytes(data[39:43], "little") > + self.volume_label = data[43:54].decode("ascii").strip() > + self.fs_type = data[54:62].decode("ascii").strip() > + > + def root_dir_start(self): > + """ > + Calculate the start sector of the root directory. > + """ > + return self.reserved_sectors + self.fat_count * self.sectors_per_fat > + > + def root_dir_size(self): > + """ > + Calculate the size of the root directory in sectors. > + """ > + return ( > + self.root_entries * DIRENTRY_SIZE + self.bytes_per_sector - 1 > + ) // self.bytes_per_sector > + > + def data_sector_start(self): > + """ > + Calculate the start sector of the data region. > + """ > + return self.root_dir_start() + self.root_dir_size() > + > + def first_sector_of_cluster(self, cluster: int): > + """ > + Calculate the first sector of the given cluster. > + """ > + return self.data_sector_start() \ > + + (cluster - 2) * self.sectors_per_cluster > + > + def cluster_bytes(self): > + """ > + Calculate the number of bytes in a cluster. > + """ > + return self.bytes_per_sector * self.sectors_per_cluster > + > + def __str__(self): > + return ( > + f"Bytes per sector: {self.bytes_per_sector}\n" > + f"Sectors per cluster: {self.sectors_per_cluster}\n" > + f"Reserved sectors: {self.reserved_sectors}\n" > + f"FAT count: {self.fat_count}\n" > + f"Root entries: {self.root_entries}\n" > + f"Total sectors: {self.total_sectors}\n" > + f"Media descriptor: {self.media_descriptor}\n" > + f"Sectors per FAT: {self.sectors_per_fat}\n" > + f"Sectors per track: {self.sectors_per_track}\n" > + f"Heads: {self.heads}\n" > + f"Hidden sectors: {self.hidden_sectors}\n" > + f"Drive number: {self.drive_number}\n" > + f"Volume ID: {self.volume_id}\n" > + f"Volume label: {self.volume_label}\n" > + f"FS type: {self.fs_type}\n" > + ) > + > + > +class FatDirectoryEntry: > + def __init__(self, data: bytes, sector: int, offset: int): > + self.name = data[0:8].decode("ascii").strip() > + self.ext = data[8:11].decode("ascii").strip() > + self.attributes = data[11] > + self.reserved = data[12] > + self.create_time_tenth = data[13] > + self.create_time = int.from_bytes(data[14:16], "little") > + self.create_date = int.from_bytes(data[16:18], "little") > + self.last_access_date = int.from_bytes(data[18:20], "little") > + high_cluster = int.from_bytes(data[20:22], "little") > + self.last_mod_time = int.from_bytes(data[22:24], "little") > + self.last_mod_date = int.from_bytes(data[24:26], "little") > + low_cluster = int.from_bytes(data[26:28], "little") > + self.cluster = (high_cluster << 16) | low_cluster > + self.size_bytes = int.from_bytes(data[28:32], "little") > + > + # extra (to help write back to disk) > + self.sector = sector > + self.offset = offset > + > + def as_bytes(self) -> bytes: > + return ( > + self.name.ljust(8, " ").encode("ascii") > + + self.ext.ljust(3, " ").encode("ascii") > + + self.attributes.to_bytes(1, "little") > + + self.reserved.to_bytes(1, "little") > + + self.create_time_tenth.to_bytes(1, "little") > + + self.create_time.to_bytes(2, "little") > + + self.create_date.to_bytes(2, "little") > + + self.last_access_date.to_bytes(2, "little") > + + (self.cluster >> 16).to_bytes(2, "little") > + + self.last_mod_time.to_bytes(2, "little") > + + self.last_mod_date.to_bytes(2, "little") > + + (self.cluster & 0xFFFF).to_bytes(2, "little") > + + self.size_bytes.to_bytes(4, "little") > + ) > + > + def whole_name(self): > + if self.ext: > + return f"{self.name}.{self.ext}" > + else: > + return self.name > + > + def __str__(self): > + return ( > + f"Name: {self.name}\n" > + f"Ext: {self.ext}\n" > + f"Attributes: {self.attributes}\n" > + f"Reserved: {self.reserved}\n" > + f"Create time tenth: {self.create_time_tenth}\n" > + f"Create time: {self.create_time}\n" > + f"Create date: {self.create_date}\n" > + f"Last access date: {self.last_access_date}\n" > + f"Last mod time: {self.last_mod_time}\n" > + f"Last mod date: {self.last_mod_date}\n" > + f"Cluster: {self.cluster}\n" > + f"Size: {self.size_bytes}\n" > + ) > + > + def __repr__(self): > + # convert to dict > + return str(vars(self)) > + > + > +class Fat16: > + def __init__( > + self, > + start_sector: int, > + size: int, > + sector_reader: callable, > + sector_writer: callable, > + ): > + self.start_sector = start_sector > + self.size_in_sectors = size > + self.sector_reader = sector_reader > + self.sector_writer = sector_writer > + > + self.boot_sector = FatBootSector(self.sector_reader(start_sector)) > + > + fat_size_in_sectors = \ > + self.boot_sector.fat_size * self.boot_sector.fat_count > + self.fats = self.read_sectors( > + self.boot_sector.reserved_sectors, fat_size_in_sectors > + ) > + self.fats_dirty_sectors = set() > + > + def read_sectors(self, start_sector: int, num_sectors: int) -> bytes: > + return self.sector_reader(start_sector + self.start_sector, num_sectors) > + > + def write_sectors(self, start_sector: int, data: bytes): > + return self.sector_writer(start_sector + self.start_sector, data) > + > + def directory_from_bytes( > + self, data: bytes, start_sector: int > + ) -> List[FatDirectoryEntry]: > + """ > + Convert `bytes` into a list of `FatDirectoryEntry` objects. > + Will ignore long file names. > + Will stop when it encounters a 0x00 byte. > + """ > + > + entries = [] > + for i in range(0, len(data), DIRENTRY_SIZE): > + entry = data[i : i + DIRENTRY_SIZE] > + > + current_sector = start_sector + (i // SECTOR_SIZE) > + current_offset = i % SECTOR_SIZE > + > + if entry[0] == 0: > + break > + elif entry[0] == 0xE5: > + # Deleted file > + continue > + > + if entry[11] & 0xF == 0xF: > + # Long file name > + continue > + > + entries.append( > + FatDirectoryEntry(entry, current_sector, current_offset)) > + return entries > + > + def read_root_directory(self) -> List[FatDirectoryEntry]: > + root_dir = self.read_sectors( > + self.boot_sector.root_dir_start(), self.boot_sector.root_dir_size() > + ) > + return self.directory_from_bytes(root_dir, > + self.boot_sector.root_dir_start()) > + > + def read_fat_entry(self, cluster: int) -> int: > + """ > + Read the FAT entry for the given cluster. > + """ > + fat_offset = cluster * 2 # FAT16 > + return int.from_bytes(self.fats[fat_offset : fat_offset + 2], "little") > + > + def write_fat_entry(self, cluster: int, value: int): > + """ > + Write the FAT entry for the given cluster. > + """ > + fat_offset = cluster * 2 > + self.fats = ( > + self.fats[:fat_offset] > + + value.to_bytes(2, "little") > + + self.fats[fat_offset + 2 :] > + ) > + self.fats_dirty_sectors.add(fat_offset // SECTOR_SIZE) > + > + def flush_fats(self): > + """ > + Write the FATs back to the disk. > + """ > + for sector in self.fats_dirty_sectors: > + data = self.fats[sector * SECTOR_SIZE : (sector + 1) * SECTOR_SIZE] > + sector = self.boot_sector.reserved_sectors + sector > + self.write_sectors(sector, data) > + self.fats_dirty_sectors = set() > + > + def next_cluster(self, cluster: int) -> int | None: > + """ > + Get the next cluster in the chain. > + If its `None`, then its the last cluster. > + The function will crash if the next cluster > + is `FREE` (unexpected) or invalid entry. > + """ > + fat_entry = self.read_fat_entry(cluster) > + if fat_entry == 0: > + raise Exception("Unexpected: FREE cluster") > + elif fat_entry == 1: > + raise Exception("Unexpected: RESERVED cluster") > + elif fat_entry >= 0xFFF8: > + return None > + elif fat_entry >= 0xFFF7: > + raise Exception("Invalid FAT entry") > + else: > + return fat_entry > + > + def next_free_cluster(self) -> int: > + """ > + Find the next free cluster. > + """ > + # simple linear search > + for i in range(2, 0xFFFF): > + if self.read_fat_entry(i) == 0: > + return i > + raise Exception("No free clusters") > + > + def read_cluster(self, cluster: int) -> bytes: > + """ > + Read the cluster at the given cluster. > + """ > + return self.read_sectors( > + self.boot_sector.first_sector_of_cluster(cluster), > + self.boot_sector.sectors_per_cluster, > + ) > + > + def write_cluster(self, cluster: int, data: bytes): > + """ > + Write the cluster at the given cluster. > + """ > + assert len(data) == self.boot_sector.cluster_bytes() > + return self.write_sectors( > + self.boot_sector.first_sector_of_cluster(cluster), > + data, > + ) > + > + def read_directory(self, cluster: int) -> List[FatDirectoryEntry]: > + """ > + Read the directory at the given cluster. > + """ > + entries = [] > + while cluster is not None: > + data = self.read_cluster(cluster) > + entries.extend( > + self.directory_from_bytes( > + data, self.boot_sector.first_sector_of_cluster(cluster) > + ) > + ) > + cluster = self.next_cluster(cluster) > + return entries > + > + def add_direntry(self, > + cluster: int | None, > + name: str, ext: str, > + attributes: int): > + """ > + Add a new directory entry to the given cluster. > + If the cluster is `None`, then it will be added to the root directory. > + """ > + > + def find_free_entry(data: bytes): > + for i in range(0, len(data), DIRENTRY_SIZE): > + entry = data[i : i + DIRENTRY_SIZE] > + if entry[0] == 0 or entry[0] == 0xE5: > + return i > + return None > + > + assert len(name) <= 8, "Name must be 8 characters or less" > + assert len(ext) <= 3, "Ext must be 3 characters or less" > + assert attributes % 0x15 != 0x15, "Invalid attributes" > + > + # initial dummy data > + new_entry = FatDirectoryEntry(b"\0" * 32, 0, 0) > + new_entry.name = name.ljust(8, " ") > + new_entry.ext = ext.ljust(3, " ") > + new_entry.attributes = attributes > + new_entry.reserved = 0 > + new_entry.create_time_tenth = 0 > + new_entry.create_time = 0 > + new_entry.create_date = 0 > + new_entry.last_access_date = 0 > + new_entry.last_mod_time = 0 > + new_entry.last_mod_date = 0 > + new_entry.cluster = self.next_free_cluster() > + new_entry.size_bytes = 0 > + > + # mark as EOF > + self.write_fat_entry(new_entry.cluster, 0xFFFF) > + > + if cluster is None: > + for i in range(self.boot_sector.root_dir_size()): > + sector_data = self.read_sectors( > + self.boot_sector.root_dir_start() + i, 1 > + ) > + offset = find_free_entry(sector_data) > + if offset is not None: > + new_entry.sector = self.boot_sector.root_dir_start() + i > + new_entry.offset = offset > + self.update_direntry(new_entry) > + return new_entry > + else: > + while cluster is not None: > + data = self.read_cluster(cluster) > + offset = find_free_entry(data) > + if offset is not None: > + new_entry.sector = self.boot_sector.first_sector_of_cluster( > + cluster > + ) + (offset // SECTOR_SIZE) > + new_entry.offset = offset % SECTOR_SIZE > + self.update_direntry(new_entry) > + return new_entry > + cluster = self.next_cluster(cluster) > + > + raise Exception("No free directory entries") > + > + def update_direntry(self, entry: FatDirectoryEntry): > + """ > + Write the directory entry back to the disk. > + """ > + sector = self.read_sectors(entry.sector, 1) > + sector = ( > + sector[: entry.offset] > + + entry.as_bytes() > + + sector[entry.offset + DIRENTRY_SIZE :] > + ) > + self.write_sectors(entry.sector, sector) > + > + def find_direntry(self, path: str) -> FatDirectoryEntry | None: > + """ > + Find the directory entry for the given path. > + """ > + assert path[0] == "/", "Path must start with /" > + > + path = path[1:] # remove the leading / > + parts = path.split("/") > + directory = self.read_root_directory() > + > + current_entry = None > + > + for i, part in enumerate(parts): > + is_last = i == len(parts) - 1 > + > + for entry in directory: > + if entry.whole_name() == part: > + current_entry = entry > + break > + if current_entry is None: > + return None > + > + if is_last: > + return current_entry > + else: > + if current_entry.attributes & 0x10 == 0: > + raise Exception( > + f"{current_entry.whole_name()} is not a directory") > + else: > + directory = self.read_directory(current_entry.cluster) > + > + def read_file(self, entry: FatDirectoryEntry) -> bytes: > + """ > + Read the content of the file at the given path. > + """ > + if entry is None: > + return None > + if entry.attributes & 0x10 != 0: > + raise Exception(f"{entry.whole_name()} is a directory") > + > + data = b"" > + cluster = entry.cluster > + while cluster is not None and len(data) <= entry.size_bytes: > + data += self.read_cluster(cluster) > + cluster = self.next_cluster(cluster) > + return data[: entry.size_bytes] > + > + def truncate_file(self, entry: FatDirectoryEntry, new_size: int): > + """ > + Truncate the file at the given path to the new size. > + """ > + if entry is None: > + return Exception("entry is None") > + if entry.attributes & 0x10 != 0: > + raise Exception(f"{entry.whole_name()} is a directory") > + > + def clusters_from_size(size: int): > + return ( > + size + self.boot_sector.cluster_bytes() - 1 > + ) // self.boot_sector.cluster_bytes() > + > + # First, allocate new FATs if we need to > + required_clusters = clusters_from_size(new_size) > + current_clusters = clusters_from_size(entry.size_bytes) > + > + affected_clusters = set() > + > + # Keep at least one cluster, easier to manage this way > + if required_clusters == 0: > + required_clusters = 1 > + if current_clusters == 0: > + current_clusters = 1 > + > + if required_clusters > current_clusters: > + # Allocate new clusters > + cluster = entry.cluster > + to_add = required_clusters > + for _ in range(current_clusters - 1): > + to_add -= 1 > + cluster = self.next_cluster(cluster) > + assert required_clusters > 0, "No new clusters to allocate" > + assert cluster is not None, "Cluster is None" > + assert self.next_cluster(cluster) is None, \ > + "Cluster is not the last cluster" > + > + # Allocate new clusters > + for _ in range(to_add - 1): > + new_cluster = self.next_free_cluster() > + self.write_fat_entry(cluster, new_cluster) > + self.write_fat_entry(new_cluster, 0xFFFF) > + cluster = new_cluster > + > + elif required_clusters < current_clusters: > + # Truncate the file > + cluster = entry.cluster > + for _ in range(required_clusters - 1): > + cluster = self.next_cluster(cluster) > + assert cluster is not None, "Cluster is None" > + > + next_cluster = self.next_cluster(cluster) > + # mark last as EOF > + self.write_fat_entry(cluster, 0xFFFF) > + # free the rest > + while next_cluster is not None: > + cluster = next_cluster > + next_cluster = self.next_cluster(next_cluster) > + self.write_fat_entry(cluster, 0) > + > + self.flush_fats() > + > + # verify number of clusters > + cluster = entry.cluster > + count = 0 > + while cluster is not None: > + count += 1 > + affected_clusters.add(cluster) > + cluster = self.next_cluster(cluster) > + assert ( > + count == required_clusters > + ), f"Expected {required_clusters} clusters, got {count}" > + > + # update the size > + entry.size_bytes = new_size > + self.update_direntry(entry) > + > + # trigger every affected cluster > + for cluster in affected_clusters: > + first_sector = self.boot_sector.first_sector_of_cluster(cluster) > + first_sector_data = self.read_sectors(first_sector, 1) > + self.write_sectors(first_sector, first_sector_data) > + > + def write_file(self, entry: FatDirectoryEntry, data: bytes): > + """ > + Write the content of the file at the given path. > + """ > + if entry is None: > + return Exception("entry is None") > + if entry.attributes & 0x10 != 0: > + raise Exception(f"{entry.whole_name()} is a directory") > + > + data_len = len(data) > + > + self.truncate_file(entry, data_len) > + > + cluster = entry.cluster > + while cluster is not None: > + data_to_write = data[: self.boot_sector.cluster_bytes()] > + last_data = False > + if len(data_to_write) < self.boot_sector.cluster_bytes(): > + last_data = True > + old_data = self.read_cluster(cluster) > + data_to_write += old_data[len(data_to_write) :] > + > + self.write_cluster(cluster, data_to_write) > + data = data[self.boot_sector.cluster_bytes() :] > + if len(data) == 0: > + break > + cluster = self.next_cluster(cluster) > + > + assert len(data) == 0, \ > + "Data was not written completely, clusters missing" > + > + def create_file(self, path: str): > + """ > + Create a new file at the given path. > + """ > + assert path[0] == "/", "Path must start with /" > + > + path = path[1:] # remove the leading / > + > + parts = path.split("/") > + > + directory_cluster = None > + directory = self.read_root_directory() > + > + parts, filename = parts[:-1], parts[-1] > + > + for i, part in enumerate(parts): > + current_entry = None > + for entry in directory: > + if entry.whole_name() == part: > + current_entry = entry > + break > + if current_entry is None: > + return None > + > + if current_entry.attributes & 0x10 == 0: > + raise Exception( > + f"{current_entry.whole_name()} is not a directory") > + else: > + directory = self.read_directory(current_entry.cluster) > + directory_cluster = current_entry.cluster > + > + # add new entry to the directory > + > + filename, ext = filename.split(".") > + > + if len(ext) > 3: > + raise Exception("Ext must be 3 characters or less") > + if len(filename) > 8: > + raise Exception("Name must be 8 characters or less") > + > + for c in filename + ext: > + > + if c not in ALLOWED_FILE_CHARS: > + raise Exception("Invalid character in filename") > + > + return self.add_direntry(directory_cluster, filename, ext, 0) > diff --git a/tests/qemu-iotests/testenv.py b/tests/qemu-iotests/testenv.py > index 588f30a4f1..4053d29de4 100644 > --- a/tests/qemu-iotests/testenv.py > +++ b/tests/qemu-iotests/testenv.py > @@ -250,7 +250,7 @@ def __init__(self, source_dir: str, build_dir: str, > self.qemu_img_options = os.getenv('QEMU_IMG_OPTIONS') > self.qemu_nbd_options = os.getenv('QEMU_NBD_OPTIONS') > > - is_generic = self.imgfmt not in ['bochs', 'cloop', 'dmg'] > + is_generic = self.imgfmt not in ['bochs', 'cloop', 'dmg', 'vvfat'] > self.imgfmt_generic = 'true' if is_generic else 'false' > > self.qemu_io_options = f'--cache {self.cachemode} --aio {self.aiomode}' > diff --git a/tests/qemu-iotests/tests/vvfat b/tests/qemu-iotests/tests/vvfat > new file mode 100755 > index 0000000000..113d7d3270 > --- /dev/null > +++ b/tests/qemu-iotests/tests/vvfat > @@ -0,0 +1,440 @@ > +#!/usr/bin/env python3 > +# group: rw vvfat > +# > +# Test vvfat driver implementation > +# Here, we use a simple FAT16 implementation and check the behavior of the vvfat driver. > +# > +# Copyright (C) 2024 Amjad Alsharafi <amjadsharafi10@gmail.com> > +# > +# This program is free software; you can redistribute it and/or modify > +# it under the terms of the GNU General Public License as published by > +# the Free Software Foundation; either version 2 of the License, or > +# (at your option) any later version. > +# > +# This program is distributed in the hope that it will be useful, > +# but WITHOUT ANY WARRANTY; without even the implied warranty of > +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the > +# GNU General Public License for more details. > +# > +# You should have received a copy of the GNU General Public License > +# along with this program. If not, see <http://www.gnu.org/licenses/>. > + > +import os, shutil > +import iotests > +from iotests import imgfmt, QMPTestCase > +from fat16 import MBR, Fat16, DIRENTRY_SIZE > + > +filesystem = os.path.join(iotests.test_dir, "filesystem") > + > +nbd_sock = iotests.file_path("nbd.sock", base_dir=iotests.sock_dir) > +nbd_uri = "nbd+unix:///disk?socket=" + nbd_sock > + > +SECTOR_SIZE = 512 > + > + > +class TestVVFatDriver(QMPTestCase): > + def setUp(self) -> None: > + if os.path.exists(filesystem): > + if os.path.isdir(filesystem): > + shutil.rmtree(filesystem) > + else: > + print(f"Error: {filesystem} exists and is not a directory") > + exit(1) > + os.mkdir(filesystem) > + > + # Add some text files to the filesystem > + for i in range(10): > + with open(os.path.join(filesystem, f"file{i}.txt"), "w") as f: > + f.write(f"Hello, world! {i}\n") > + > + # Add 2 large files, above the cluster size (8KB) > + with open(os.path.join(filesystem, "large1.txt"), "wb") as f: > + # write 'A' * 1KB, 'B' * 1KB, 'C' * 1KB, ... > + for i in range(8 * 2): # two clusters > + f.write(bytes([0x41 + i] * 1024)) > + > + with open(os.path.join(filesystem, "large2.txt"), "wb") as f: > + # write 'A' * 1KB, 'B' * 1KB, 'C' * 1KB, ... > + for i in range(8 * 3): # 3 clusters > + f.write(bytes([0x41 + i] * 1024)) > + > + self.vm = iotests.VM() > + > + self.vm.add_blockdev( > + self.vm.qmp_to_opts( > + { > + "driver": imgfmt, > + "node-name": "disk", > + "rw": "true", > + "fat-type": "16", > + "dir": filesystem, > + } > + ) > + ) > + > + self.vm.launch() > + > + self.vm.qmp_log("block-dirty-bitmap-add", **{"node": "disk", "name": "bitmap0"}) > + > + # attach nbd server > + self.vm.qmp_log( > + "nbd-server-start", > + **{"addr": {"type": "unix", "data": {"path": nbd_sock}}}, > + filters=[], > + ) > + > + self.vm.qmp_log( > + "nbd-server-add", > + **{"device": "disk", "writable": True, "bitmap": "bitmap0"}, > + ) > + > + self.qio = iotests.QemuIoInteractive("-f", "raw", nbd_uri) > + > + def tearDown(self) -> None: > + self.qio.close() > + self.vm.shutdown() > + # print(self.vm.get_log()) > + shutil.rmtree(filesystem) > + > + def read_sectors(self, sector: int, num: int = 1) -> bytes: > + """ > + Read `num` sectors starting from `sector` from the `disk`. > + This uses `QemuIoInteractive` to read the sectors into `stdout` and then parse the output. > + """ > + self.assertGreater(num, 0) > + # The output contains the content of the sector in hex dump format > + # We need to extract the content from it > + output = self.qio.cmd(f"read -v {sector * SECTOR_SIZE} {num * SECTOR_SIZE}") > + # Each row is 16 bytes long, and we are writing `num` sectors > + rows = num * SECTOR_SIZE // 16 > + output_rows = output.split("\n")[:rows] > + > + hex_content = "".join( > + [(row.split(": ")[1]).split(" ")[0] for row in output_rows] > + ) > + bytes_content = bytes.fromhex(hex_content) > + > + self.assertEqual(len(bytes_content), num * SECTOR_SIZE) > + > + return bytes_content > + > + def write_sectors(self, sector: int, data: bytes): > + """ > + Write `data` to the `disk` starting from `sector`. > + This uses `QemuIoInteractive` to write the data into the disk. > + """ > + > + self.assertGreater(len(data), 0) > + self.assertEqual(len(data) % SECTOR_SIZE, 0) > + > + temp_file = os.path.join(iotests.test_dir, "temp.bin") > + with open(temp_file, "wb") as f: > + f.write(data) > + > + self.qio.cmd(f"write -s {temp_file} {sector * SECTOR_SIZE} {len(data)}") > + > + os.remove(temp_file) > + > + def init_fat16(self): > + mbr = MBR(self.read_sectors(0)) > + return Fat16( > + mbr.partition_table[0]["start_lba"], > + mbr.partition_table[0]["size"], > + self.read_sectors, > + self.write_sectors, > + ) > + > + # Tests > + > + def test_fat_filesystem(self): > + """ > + Test that vvfat produce a valid FAT16 and MBR sectors > + """ > + mbr = MBR(self.read_sectors(0)) > + > + self.assertEqual(mbr.partition_table[0]["status"], 0x80) > + self.assertEqual(mbr.partition_table[0]["type"], 6) > + > + fat16 = Fat16( > + mbr.partition_table[0]["start_lba"], > + mbr.partition_table[0]["size"], > + self.read_sectors, > + self.write_sectors, > + ) > + self.assertEqual(fat16.boot_sector.bytes_per_sector, 512) > + self.assertEqual(fat16.boot_sector.volume_label, "QEMU VVFAT") > + > + def test_read_root_directory(self): > + """ > + Test the content of the root directory > + """ > + fat16 = self.init_fat16() > + > + root_dir = fat16.read_root_directory() > + > + self.assertEqual(len(root_dir), 13) # 12 + 1 special file > + > + files = { > + "QEMU VVF.AT": 0, # special empty file > + "FILE0.TXT": 16, > + "FILE1.TXT": 16, > + "FILE2.TXT": 16, > + "FILE3.TXT": 16, > + "FILE4.TXT": 16, > + "FILE5.TXT": 16, > + "FILE6.TXT": 16, > + "FILE7.TXT": 16, > + "FILE8.TXT": 16, > + "FILE9.TXT": 16, > + "LARGE1.TXT": 0x2000 * 2, > + "LARGE2.TXT": 0x2000 * 3, > + } > + > + for entry in root_dir: > + self.assertIn(entry.whole_name(), files) > + self.assertEqual(entry.size_bytes, files[entry.whole_name()]) > + > + def test_direntry_as_bytes(self): > + """ > + Test if we can convert Direntry back to bytes, so that we can write it back to the disk safely. > + """ > + fat16 = self.init_fat16() > + > + root_dir = fat16.read_root_directory() > + first_entry_bytes = fat16.read_sectors(fat16.boot_sector.root_dir_start(), 1) > + # The first entry won't be deleted, so we can compare it with the first entry in the root directory > + self.assertEqual(root_dir[0].as_bytes(), first_entry_bytes[:DIRENTRY_SIZE]) > + > + def test_read_files(self): > + """ > + Test reading the content of the files > + """ > + fat16 = self.init_fat16() > + > + for i in range(10): > + file = fat16.find_direntry(f"/FILE{i}.TXT") > + self.assertIsNotNone(file) > + self.assertEqual( > + fat16.read_file(file), f"Hello, world! {i}\n".encode("ascii") > + ) > + > + # test large files > + large1 = fat16.find_direntry("/LARGE1.TXT") > + with open(os.path.join(filesystem, "large1.txt"), "rb") as f: > + self.assertEqual(fat16.read_file(large1), f.read()) > + > + large2 = fat16.find_direntry("/LARGE2.TXT") > + self.assertIsNotNone(large2) > + with open(os.path.join(filesystem, "large2.txt"), "rb") as f: > + self.assertEqual(fat16.read_file(large2), f.read()) > + > + def test_write_file_same_content_direct(self): > + """ > + Similar to `test_write_file_in_same_content`, but we write the file directly clusters > + and thus we don't go through the modification of direntry. > + """ > + fat16 = self.init_fat16() > + > + file = fat16.find_direntry("/FILE0.TXT") > + self.assertIsNotNone(file) > + > + data = fat16.read_cluster(file.cluster) > + fat16.write_cluster(file.cluster, data) > + > + with open(os.path.join(filesystem, "file0.txt"), "rb") as f: > + self.assertEqual(fat16.read_file(file), f.read()) > + > + def test_write_file_in_same_content(self): > + """ > + Test writing the same content to the file back to it > + """ > + fat16 = self.init_fat16() > + > + file = fat16.find_direntry("/FILE0.TXT") > + self.assertIsNotNone(file) > + > + self.assertEqual(fat16.read_file(file), b"Hello, world! 0\n") > + > + fat16.write_file(file, b"Hello, world! 0\n") > + > + self.assertEqual(fat16.read_file(file), b"Hello, world! 0\n") > + > + with open(os.path.join(filesystem, "file0.txt"), "rb") as f: > + self.assertEqual(f.read(), b"Hello, world! 0\n") > + > + def test_modify_content_same_clusters(self): > + """ > + Test modifying the content of the file without changing the number of clusters > + """ > + fat16 = self.init_fat16() > + > + file = fat16.find_direntry("/FILE0.TXT") > + self.assertIsNotNone(file) > + > + new_content = b"Hello, world! Modified\n" > + self.assertEqual(fat16.read_file(file), b"Hello, world! 0\n") > + > + fat16.write_file(file, new_content) > + > + self.assertEqual(fat16.read_file(file), new_content) > + > + with open(os.path.join(filesystem, "file0.txt"), "rb") as f: > + self.assertEqual(f.read(), new_content) > + > + def test_truncate_file_same_clusters_less(self): > + """ > + Test truncating the file without changing number of clusters > + Test decreasing the file size > + """ > + fat16 = self.init_fat16() > + > + file = fat16.find_direntry("/FILE0.TXT") > + self.assertIsNotNone(file) > + > + self.assertEqual(fat16.read_file(file), b"Hello, world! 0\n") > + > + fat16.truncate_file(file, 5) > + > + new_content = fat16.read_file(file) > + > + self.assertEqual(new_content, b"Hello") > + > + with open(os.path.join(filesystem, "file0.txt"), "rb") as f: > + self.assertEqual(f.read(), new_content) > + > + def test_truncate_file_same_clusters_more(self): > + """ > + Test truncating the file without changing number of clusters > + Test increase the file size > + """ > + fat16 = self.init_fat16() > + > + file = fat16.find_direntry("/FILE0.TXT") > + self.assertIsNotNone(file) > + > + self.assertEqual(fat16.read_file(file), b"Hello, world! 0\n") > + > + fat16.truncate_file(file, 20) > + > + new_content = fat16.read_file(file) > + > + # random pattern will be appended to the file, and its not always the same > + self.assertEqual(new_content[:16], b"Hello, world! 0\n") > + self.assertEqual(len(new_content), 20) > + > + with open(os.path.join(filesystem, "file0.txt"), "rb") as f: > + self.assertEqual(f.read(), new_content) > + > + def test_write_large_file(self): > + """ > + Test writing a large file > + """ > + fat16 = self.init_fat16() > + > + file = fat16.find_direntry("/LARGE1.TXT") > + self.assertIsNotNone(file) > + > + # The content of LARGE1 is A * 1KB, B * 1KB, C * 1KB, ..., P * 1KB > + # Lets change it to be Z * 1KB, Y * 1KB, X * 1KB, ..., K * 1KB > + # without changing the number of clusters or filesize > + new_content = b"".join([bytes([0x5A - i] * 1024) for i in range(16)]) > + > + fat16.write_file(file, new_content) > + > + with open(os.path.join(filesystem, "large1.txt"), "rb") as f: > + self.assertEqual(f.read(), new_content) > + > + def test_truncate_file_change_clusters_less(self): > + """ > + Test truncating a file by reducing the number of clusters > + """ > + fat16 = self.init_fat16() > + > + file = fat16.find_direntry("/LARGE1.TXT") > + self.assertIsNotNone(file) > + > + fat16.truncate_file(file, 1) > + > + self.assertEqual(fat16.read_file(file), b"A") > + > + with open(os.path.join(filesystem, "large1.txt"), "rb") as f: > + self.assertEqual(f.read(), b"A") > + > + def test_write_file_change_clusters_less(self): > + """ > + Test truncating a file by reducing the number of clusters > + """ > + fat16 = self.init_fat16() > + > + file = fat16.find_direntry("/LARGE2.TXT") > + self.assertIsNotNone(file) > + > + new_content = b"Hello, world! This was a large file\n" > + new_content = b"X" * 8 * 1024 + b"Y" * 8 * 1024 This sets and then immediately overwrites new_content. What was intended here? > + > + fat16.write_file(file, new_content) > + > + self.assertEqual(fat16.read_file(file), new_content) > + > + with open(os.path.join(filesystem, "large2.txt"), "rb") as f: > + self.assertEqual(f.read(), new_content) > + > + def test_write_file_change_clusters_more(self): > + """ > + Test truncating a file by increasing the number of clusters > + """ > + fat16 = self.init_fat16() > + > + file = fat16.find_direntry("/LARGE2.TXT") > + self.assertIsNotNone(file) > + > + new_content = b"X" * 8 * 1024 + b"Y" * 8 * 1024 + b"Z" * 8 * 1024 > + > + fat16.write_file(file, new_content) > + > + with open(os.path.join(filesystem, "large2.txt"), "rb") as f: > + self.assertEqual(f.read(), new_content) > + > + def test_write_file_change_clusters_more_non_last_file(self): > + """ > + Test truncating a file by increasing the number of clusters > + This is a special variant of the above test, where we write to > + a file so that when allocating new clusters, it won't have contiguous clusters > + """ > + fat16 = self.init_fat16() > + > + file = fat16.find_direntry("/LARGE1.TXT") > + self.assertIsNotNone(file) > + > + new_content = b"X" * 8 * 1024 + b"Y" * 8 * 1024 + b"Z" * 8 * 1024 > + > + fat16.write_file(file, new_content) > + > + with open(os.path.join(filesystem, "large1.txt"), "rb") as f: > + self.assertEqual(f.read(), new_content) > + > + def test_create_file(self): > + """ > + Test creating a new file > + """ > + fat16 = self.init_fat16() > + > + new_file = fat16.create_file("/NEWFILE.TXT") > + > + self.assertIsNotNone(new_file) > + self.assertEqual(new_file.size_bytes, 0) > + > + new_content = b"Hello, world! New file\n" > + fat16.write_file(new_file, new_content) > + > + self.assertEqual(fat16.read_file(new_file), new_content) > + > + with open(os.path.join(filesystem, "newfile.txt"), "rb") as f: > + self.assertEqual(f.read(), new_content) > + > + # TODO: support deleting files > + > + > +if __name__ == "__main__": > + # This is a specific test for vvfat driver > + iotests.main(supported_fmts=["vvfat"], supported_protocols=["file"]) > diff --git a/tests/qemu-iotests/tests/vvfat.out b/tests/qemu-iotests/tests/vvfat.out > new file mode 100755 > index 0000000000..96961ed0b5 > --- /dev/null > +++ b/tests/qemu-iotests/tests/vvfat.out > @@ -0,0 +1,5 @@ > +............... > +---------------------------------------------------------------------- > +Ran 15 tests > + > +OK With the updated test, I can catch the problems that are fixed by patches 1 and 2, but it still doesn't need patch 3 to pass. Kevin
On Mon, Jun 10, 2024 at 02:01:24PM +0200, Kevin Wolf wrote: > Am 05.06.2024 um 02:58 hat Amjad Alsharafi geschrieben: > > Added several tests to verify the implementation of the vvfat driver. > > > > We needed a way to interact with it, so created a basic `fat16.py` driver that handled writing correct sectors for us. > > > > Added `vvfat` to the non-generic formats, as its not a normal image format. > > > > Signed-off-by: Amjad Alsharafi <amjadsharafi10@gmail.com> > > --- > > tests/qemu-iotests/check | 2 +- > > tests/qemu-iotests/fat16.py | 635 +++++++++++++++++++++++++++++ > > tests/qemu-iotests/testenv.py | 2 +- > > tests/qemu-iotests/tests/vvfat | 440 ++++++++++++++++++++ > > tests/qemu-iotests/tests/vvfat.out | 5 + > > 5 files changed, 1082 insertions(+), 2 deletions(-) > > create mode 100644 tests/qemu-iotests/fat16.py > > create mode 100755 tests/qemu-iotests/tests/vvfat > > create mode 100755 tests/qemu-iotests/tests/vvfat.out > > > > diff --git a/tests/qemu-iotests/check b/tests/qemu-iotests/check > > index 56d88ca423..545f9ec7bd 100755 > > --- a/tests/qemu-iotests/check > > +++ b/tests/qemu-iotests/check > > @@ -84,7 +84,7 @@ def make_argparser() -> argparse.ArgumentParser: > > p.set_defaults(imgfmt='raw', imgproto='file') > > > > format_list = ['raw', 'bochs', 'cloop', 'parallels', 'qcow', 'qcow2', > > - 'qed', 'vdi', 'vpc', 'vhdx', 'vmdk', 'luks', 'dmg'] > > + 'qed', 'vdi', 'vpc', 'vhdx', 'vmdk', 'luks', 'dmg', 'vvfat'] > > g_fmt = p.add_argument_group( > > ' image format options', > > 'The following options set the IMGFMT environment variable. ' > > diff --git a/tests/qemu-iotests/fat16.py b/tests/qemu-iotests/fat16.py > > new file mode 100644 > > index 0000000000..baf801b4d5 > > --- /dev/null > > +++ b/tests/qemu-iotests/fat16.py > > @@ -0,0 +1,635 @@ > > +# A simple FAT16 driver that is used to test the `vvfat` driver in QEMU. > > +# > > +# Copyright (C) 2024 Amjad Alsharafi <amjadsharafi10@gmail.com> > > +# > > +# This program is free software; you can redistribute it and/or modify > > +# it under the terms of the GNU General Public License as published by > > +# the Free Software Foundation; either version 2 of the License, or > > +# (at your option) any later version. > > +# > > +# This program is distributed in the hope that it will be useful, > > +# but WITHOUT ANY WARRANTY; without even the implied warranty of > > +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the > > +# GNU General Public License for more details. > > +# > > +# You should have received a copy of the GNU General Public License > > +# along with this program. If not, see <http://www.gnu.org/licenses/>. > > + > > +from typing import List > > +import string > > + > > +SECTOR_SIZE = 512 > > +DIRENTRY_SIZE = 32 > > +ALLOWED_FILE_CHARS = \ > > + set("!#$%&'()-@^_`{}~" + string.digits + string.ascii_uppercase) > > + > > + > > +class MBR: > > + def __init__(self, data: bytes): > > + assert len(data) == 512 > > + self.partition_table = [] > > + for i in range(4): > > + partition = data[446 + i * 16 : 446 + (i + 1) * 16] > > + self.partition_table.append( > > + { > > + "status": partition[0], > > + "start_head": partition[1], > > + "start_sector": partition[2] & 0x3F, > > + "start_cylinder": > > + ((partition[2] & 0xC0) << 2) | partition[3], > > + "type": partition[4], > > + "end_head": partition[5], > > + "end_sector": partition[6] & 0x3F, > > + "end_cylinder": > > + ((partition[6] & 0xC0) << 2) | partition[7], > > + "start_lba": int.from_bytes(partition[8:12], "little"), > > + "size": int.from_bytes(partition[12:16], "little"), > > + } > > + ) > > + > > + def __str__(self): > > + return "\n".join( > > + [f"{i}: {partition}" > > + for i, partition in enumerate(self.partition_table)] > > + ) > > + > > + > > +class FatBootSector: > > + def __init__(self, data: bytes): > > + assert len(data) == 512 > > + self.bytes_per_sector = int.from_bytes(data[11:13], "little") > > + self.sectors_per_cluster = data[13] > > + self.reserved_sectors = int.from_bytes(data[14:16], "little") > > + self.fat_count = data[16] > > + self.root_entries = int.from_bytes(data[17:19], "little") > > + self.media_descriptor = data[21] > > + self.fat_size = int.from_bytes(data[22:24], "little") > > + self.sectors_per_fat = int.from_bytes(data[22:24], "little") > > Why two different attributes self.fat_size and self.sectors_per_fat that > contain the same value? > > > + self.sectors_per_track = int.from_bytes(data[24:26], "little") > > + self.heads = int.from_bytes(data[26:28], "little") > > + self.hidden_sectors = int.from_bytes(data[28:32], "little") > > + self.total_sectors = int.from_bytes(data[32:36], "little") > > This value should only be considered if the 16 bit value at byte 19 > (which you don't store at all) was 0. > > > + self.drive_number = data[36] > > + self.volume_id = int.from_bytes(data[39:43], "little") > > + self.volume_label = data[43:54].decode("ascii").strip() > > + self.fs_type = data[54:62].decode("ascii").strip() > > + > > + def root_dir_start(self): > > + """ > > + Calculate the start sector of the root directory. > > + """ > > + return self.reserved_sectors + self.fat_count * self.sectors_per_fat > > + > > + def root_dir_size(self): > > + """ > > + Calculate the size of the root directory in sectors. > > + """ > > + return ( > > + self.root_entries * DIRENTRY_SIZE + self.bytes_per_sector - 1 > > + ) // self.bytes_per_sector > > + > > + def data_sector_start(self): > > + """ > > + Calculate the start sector of the data region. > > + """ > > + return self.root_dir_start() + self.root_dir_size() > > + > > + def first_sector_of_cluster(self, cluster: int): > > + """ > > + Calculate the first sector of the given cluster. > > + """ > > + return self.data_sector_start() \ > > + + (cluster - 2) * self.sectors_per_cluster > > + > > + def cluster_bytes(self): > > + """ > > + Calculate the number of bytes in a cluster. > > + """ > > + return self.bytes_per_sector * self.sectors_per_cluster > > + > > + def __str__(self): > > + return ( > > + f"Bytes per sector: {self.bytes_per_sector}\n" > > + f"Sectors per cluster: {self.sectors_per_cluster}\n" > > + f"Reserved sectors: {self.reserved_sectors}\n" > > + f"FAT count: {self.fat_count}\n" > > + f"Root entries: {self.root_entries}\n" > > + f"Total sectors: {self.total_sectors}\n" > > + f"Media descriptor: {self.media_descriptor}\n" > > + f"Sectors per FAT: {self.sectors_per_fat}\n" > > + f"Sectors per track: {self.sectors_per_track}\n" > > + f"Heads: {self.heads}\n" > > + f"Hidden sectors: {self.hidden_sectors}\n" > > + f"Drive number: {self.drive_number}\n" > > + f"Volume ID: {self.volume_id}\n" > > + f"Volume label: {self.volume_label}\n" > > + f"FS type: {self.fs_type}\n" > > + ) > > + > > + > > +class FatDirectoryEntry: > > + def __init__(self, data: bytes, sector: int, offset: int): > > + self.name = data[0:8].decode("ascii").strip() > > + self.ext = data[8:11].decode("ascii").strip() > > + self.attributes = data[11] > > + self.reserved = data[12] > > + self.create_time_tenth = data[13] > > + self.create_time = int.from_bytes(data[14:16], "little") > > + self.create_date = int.from_bytes(data[16:18], "little") > > + self.last_access_date = int.from_bytes(data[18:20], "little") > > + high_cluster = int.from_bytes(data[20:22], "little") > > + self.last_mod_time = int.from_bytes(data[22:24], "little") > > + self.last_mod_date = int.from_bytes(data[24:26], "little") > > + low_cluster = int.from_bytes(data[26:28], "little") > > + self.cluster = (high_cluster << 16) | low_cluster > > + self.size_bytes = int.from_bytes(data[28:32], "little") > > + > > + # extra (to help write back to disk) > > + self.sector = sector > > + self.offset = offset > > + > > + def as_bytes(self) -> bytes: > > + return ( > > + self.name.ljust(8, " ").encode("ascii") > > + + self.ext.ljust(3, " ").encode("ascii") > > + + self.attributes.to_bytes(1, "little") > > + + self.reserved.to_bytes(1, "little") > > + + self.create_time_tenth.to_bytes(1, "little") > > + + self.create_time.to_bytes(2, "little") > > + + self.create_date.to_bytes(2, "little") > > + + self.last_access_date.to_bytes(2, "little") > > + + (self.cluster >> 16).to_bytes(2, "little") > > + + self.last_mod_time.to_bytes(2, "little") > > + + self.last_mod_date.to_bytes(2, "little") > > + + (self.cluster & 0xFFFF).to_bytes(2, "little") > > + + self.size_bytes.to_bytes(4, "little") > > + ) > > + > > + def whole_name(self): > > + if self.ext: > > + return f"{self.name}.{self.ext}" > > + else: > > + return self.name > > + > > + def __str__(self): > > + return ( > > + f"Name: {self.name}\n" > > + f"Ext: {self.ext}\n" > > + f"Attributes: {self.attributes}\n" > > + f"Reserved: {self.reserved}\n" > > + f"Create time tenth: {self.create_time_tenth}\n" > > + f"Create time: {self.create_time}\n" > > + f"Create date: {self.create_date}\n" > > + f"Last access date: {self.last_access_date}\n" > > + f"Last mod time: {self.last_mod_time}\n" > > + f"Last mod date: {self.last_mod_date}\n" > > + f"Cluster: {self.cluster}\n" > > + f"Size: {self.size_bytes}\n" > > + ) > > + > > + def __repr__(self): > > + # convert to dict > > + return str(vars(self)) > > + > > + > > +class Fat16: > > + def __init__( > > + self, > > + start_sector: int, > > + size: int, > > + sector_reader: callable, > > + sector_writer: callable, > > + ): > > + self.start_sector = start_sector > > + self.size_in_sectors = size > > + self.sector_reader = sector_reader > > + self.sector_writer = sector_writer > > + > > + self.boot_sector = FatBootSector(self.sector_reader(start_sector)) > > + > > + fat_size_in_sectors = \ > > + self.boot_sector.fat_size * self.boot_sector.fat_count > > + self.fats = self.read_sectors( > > + self.boot_sector.reserved_sectors, fat_size_in_sectors > > + ) > > + self.fats_dirty_sectors = set() > > + > > + def read_sectors(self, start_sector: int, num_sectors: int) -> bytes: > > + return self.sector_reader(start_sector + self.start_sector, num_sectors) > > + > > + def write_sectors(self, start_sector: int, data: bytes): > > + return self.sector_writer(start_sector + self.start_sector, data) > > + > > + def directory_from_bytes( > > + self, data: bytes, start_sector: int > > + ) -> List[FatDirectoryEntry]: > > + """ > > + Convert `bytes` into a list of `FatDirectoryEntry` objects. > > + Will ignore long file names. > > + Will stop when it encounters a 0x00 byte. > > + """ > > + > > + entries = [] > > + for i in range(0, len(data), DIRENTRY_SIZE): > > + entry = data[i : i + DIRENTRY_SIZE] > > + > > + current_sector = start_sector + (i // SECTOR_SIZE) > > + current_offset = i % SECTOR_SIZE > > + > > + if entry[0] == 0: > > + break > > + elif entry[0] == 0xE5: > > + # Deleted file > > + continue > > + > > + if entry[11] & 0xF == 0xF: > > + # Long file name > > + continue > > + > > + entries.append( > > + FatDirectoryEntry(entry, current_sector, current_offset)) > > + return entries > > + > > + def read_root_directory(self) -> List[FatDirectoryEntry]: > > + root_dir = self.read_sectors( > > + self.boot_sector.root_dir_start(), self.boot_sector.root_dir_size() > > + ) > > + return self.directory_from_bytes(root_dir, > > + self.boot_sector.root_dir_start()) > > + > > + def read_fat_entry(self, cluster: int) -> int: > > + """ > > + Read the FAT entry for the given cluster. > > + """ > > + fat_offset = cluster * 2 # FAT16 > > + return int.from_bytes(self.fats[fat_offset : fat_offset + 2], "little") > > + > > + def write_fat_entry(self, cluster: int, value: int): > > + """ > > + Write the FAT entry for the given cluster. > > + """ > > + fat_offset = cluster * 2 > > + self.fats = ( > > + self.fats[:fat_offset] > > + + value.to_bytes(2, "little") > > + + self.fats[fat_offset + 2 :] > > + ) > > + self.fats_dirty_sectors.add(fat_offset // SECTOR_SIZE) > > + > > + def flush_fats(self): > > + """ > > + Write the FATs back to the disk. > > + """ > > + for sector in self.fats_dirty_sectors: > > + data = self.fats[sector * SECTOR_SIZE : (sector + 1) * SECTOR_SIZE] > > + sector = self.boot_sector.reserved_sectors + sector > > + self.write_sectors(sector, data) > > + self.fats_dirty_sectors = set() > > + > > + def next_cluster(self, cluster: int) -> int | None: > > + """ > > + Get the next cluster in the chain. > > + If its `None`, then its the last cluster. > > + The function will crash if the next cluster > > + is `FREE` (unexpected) or invalid entry. > > + """ > > + fat_entry = self.read_fat_entry(cluster) > > + if fat_entry == 0: > > + raise Exception("Unexpected: FREE cluster") > > + elif fat_entry == 1: > > + raise Exception("Unexpected: RESERVED cluster") > > + elif fat_entry >= 0xFFF8: > > + return None > > + elif fat_entry >= 0xFFF7: > > + raise Exception("Invalid FAT entry") > > + else: > > + return fat_entry > > + > > + def next_free_cluster(self) -> int: > > + """ > > + Find the next free cluster. > > + """ > > + # simple linear search > > + for i in range(2, 0xFFFF): > > + if self.read_fat_entry(i) == 0: > > + return i > > + raise Exception("No free clusters") > > + > > + def read_cluster(self, cluster: int) -> bytes: > > + """ > > + Read the cluster at the given cluster. > > + """ > > + return self.read_sectors( > > + self.boot_sector.first_sector_of_cluster(cluster), > > + self.boot_sector.sectors_per_cluster, > > + ) > > + > > + def write_cluster(self, cluster: int, data: bytes): > > + """ > > + Write the cluster at the given cluster. > > + """ > > + assert len(data) == self.boot_sector.cluster_bytes() > > + return self.write_sectors( > > + self.boot_sector.first_sector_of_cluster(cluster), > > + data, > > + ) > > + > > + def read_directory(self, cluster: int) -> List[FatDirectoryEntry]: > > + """ > > + Read the directory at the given cluster. > > + """ > > + entries = [] > > + while cluster is not None: > > + data = self.read_cluster(cluster) > > + entries.extend( > > + self.directory_from_bytes( > > + data, self.boot_sector.first_sector_of_cluster(cluster) > > + ) > > + ) > > + cluster = self.next_cluster(cluster) > > + return entries > > + > > + def add_direntry(self, > > + cluster: int | None, > > + name: str, ext: str, > > + attributes: int): > > + """ > > + Add a new directory entry to the given cluster. > > + If the cluster is `None`, then it will be added to the root directory. > > + """ > > + > > + def find_free_entry(data: bytes): > > + for i in range(0, len(data), DIRENTRY_SIZE): > > + entry = data[i : i + DIRENTRY_SIZE] > > + if entry[0] == 0 or entry[0] == 0xE5: > > + return i > > + return None > > + > > + assert len(name) <= 8, "Name must be 8 characters or less" > > + assert len(ext) <= 3, "Ext must be 3 characters or less" > > + assert attributes % 0x15 != 0x15, "Invalid attributes" > > + > > + # initial dummy data > > + new_entry = FatDirectoryEntry(b"\0" * 32, 0, 0) > > + new_entry.name = name.ljust(8, " ") > > + new_entry.ext = ext.ljust(3, " ") > > + new_entry.attributes = attributes > > + new_entry.reserved = 0 > > + new_entry.create_time_tenth = 0 > > + new_entry.create_time = 0 > > + new_entry.create_date = 0 > > + new_entry.last_access_date = 0 > > + new_entry.last_mod_time = 0 > > + new_entry.last_mod_date = 0 > > + new_entry.cluster = self.next_free_cluster() > > + new_entry.size_bytes = 0 > > + > > + # mark as EOF > > + self.write_fat_entry(new_entry.cluster, 0xFFFF) > > + > > + if cluster is None: > > + for i in range(self.boot_sector.root_dir_size()): > > + sector_data = self.read_sectors( > > + self.boot_sector.root_dir_start() + i, 1 > > + ) > > + offset = find_free_entry(sector_data) > > + if offset is not None: > > + new_entry.sector = self.boot_sector.root_dir_start() + i > > + new_entry.offset = offset > > + self.update_direntry(new_entry) > > + return new_entry > > + else: > > + while cluster is not None: > > + data = self.read_cluster(cluster) > > + offset = find_free_entry(data) > > + if offset is not None: > > + new_entry.sector = self.boot_sector.first_sector_of_cluster( > > + cluster > > + ) + (offset // SECTOR_SIZE) > > + new_entry.offset = offset % SECTOR_SIZE > > + self.update_direntry(new_entry) > > + return new_entry > > + cluster = self.next_cluster(cluster) > > + > > + raise Exception("No free directory entries") > > + > > + def update_direntry(self, entry: FatDirectoryEntry): > > + """ > > + Write the directory entry back to the disk. > > + """ > > + sector = self.read_sectors(entry.sector, 1) > > + sector = ( > > + sector[: entry.offset] > > + + entry.as_bytes() > > + + sector[entry.offset + DIRENTRY_SIZE :] > > + ) > > + self.write_sectors(entry.sector, sector) > > + > > + def find_direntry(self, path: str) -> FatDirectoryEntry | None: > > + """ > > + Find the directory entry for the given path. > > + """ > > + assert path[0] == "/", "Path must start with /" > > + > > + path = path[1:] # remove the leading / > > + parts = path.split("/") > > + directory = self.read_root_directory() > > + > > + current_entry = None > > + > > + for i, part in enumerate(parts): > > + is_last = i == len(parts) - 1 > > + > > + for entry in directory: > > + if entry.whole_name() == part: > > + current_entry = entry > > + break > > + if current_entry is None: > > + return None > > + > > + if is_last: > > + return current_entry > > + else: > > + if current_entry.attributes & 0x10 == 0: > > + raise Exception( > > + f"{current_entry.whole_name()} is not a directory") > > + else: > > + directory = self.read_directory(current_entry.cluster) > > + > > + def read_file(self, entry: FatDirectoryEntry) -> bytes: > > + """ > > + Read the content of the file at the given path. > > + """ > > + if entry is None: > > + return None > > + if entry.attributes & 0x10 != 0: > > + raise Exception(f"{entry.whole_name()} is a directory") > > + > > + data = b"" > > + cluster = entry.cluster > > + while cluster is not None and len(data) <= entry.size_bytes: > > + data += self.read_cluster(cluster) > > + cluster = self.next_cluster(cluster) > > + return data[: entry.size_bytes] > > + > > + def truncate_file(self, entry: FatDirectoryEntry, new_size: int): > > + """ > > + Truncate the file at the given path to the new size. > > + """ > > + if entry is None: > > + return Exception("entry is None") > > + if entry.attributes & 0x10 != 0: > > + raise Exception(f"{entry.whole_name()} is a directory") > > + > > + def clusters_from_size(size: int): > > + return ( > > + size + self.boot_sector.cluster_bytes() - 1 > > + ) // self.boot_sector.cluster_bytes() > > + > > + # First, allocate new FATs if we need to > > + required_clusters = clusters_from_size(new_size) > > + current_clusters = clusters_from_size(entry.size_bytes) > > + > > + affected_clusters = set() > > + > > + # Keep at least one cluster, easier to manage this way > > + if required_clusters == 0: > > + required_clusters = 1 > > + if current_clusters == 0: > > + current_clusters = 1 > > + > > + if required_clusters > current_clusters: > > + # Allocate new clusters > > + cluster = entry.cluster > > + to_add = required_clusters > > + for _ in range(current_clusters - 1): > > + to_add -= 1 > > + cluster = self.next_cluster(cluster) > > + assert required_clusters > 0, "No new clusters to allocate" > > + assert cluster is not None, "Cluster is None" > > + assert self.next_cluster(cluster) is None, \ > > + "Cluster is not the last cluster" > > + > > + # Allocate new clusters > > + for _ in range(to_add - 1): > > + new_cluster = self.next_free_cluster() > > + self.write_fat_entry(cluster, new_cluster) > > + self.write_fat_entry(new_cluster, 0xFFFF) > > + cluster = new_cluster > > + > > + elif required_clusters < current_clusters: > > + # Truncate the file > > + cluster = entry.cluster > > + for _ in range(required_clusters - 1): > > + cluster = self.next_cluster(cluster) > > + assert cluster is not None, "Cluster is None" > > + > > + next_cluster = self.next_cluster(cluster) > > + # mark last as EOF > > + self.write_fat_entry(cluster, 0xFFFF) > > + # free the rest > > + while next_cluster is not None: > > + cluster = next_cluster > > + next_cluster = self.next_cluster(next_cluster) > > + self.write_fat_entry(cluster, 0) > > + > > + self.flush_fats() > > + > > + # verify number of clusters > > + cluster = entry.cluster > > + count = 0 > > + while cluster is not None: > > + count += 1 > > + affected_clusters.add(cluster) > > + cluster = self.next_cluster(cluster) > > + assert ( > > + count == required_clusters > > + ), f"Expected {required_clusters} clusters, got {count}" > > + > > + # update the size > > + entry.size_bytes = new_size > > + self.update_direntry(entry) > > + > > + # trigger every affected cluster > > + for cluster in affected_clusters: > > + first_sector = self.boot_sector.first_sector_of_cluster(cluster) > > + first_sector_data = self.read_sectors(first_sector, 1) > > + self.write_sectors(first_sector, first_sector_data) > > + > > + def write_file(self, entry: FatDirectoryEntry, data: bytes): > > + """ > > + Write the content of the file at the given path. > > + """ > > + if entry is None: > > + return Exception("entry is None") > > + if entry.attributes & 0x10 != 0: > > + raise Exception(f"{entry.whole_name()} is a directory") > > + > > + data_len = len(data) > > + > > + self.truncate_file(entry, data_len) > > + > > + cluster = entry.cluster > > + while cluster is not None: > > + data_to_write = data[: self.boot_sector.cluster_bytes()] > > + last_data = False > > + if len(data_to_write) < self.boot_sector.cluster_bytes(): > > + last_data = True > > + old_data = self.read_cluster(cluster) > > + data_to_write += old_data[len(data_to_write) :] > > + > > + self.write_cluster(cluster, data_to_write) > > + data = data[self.boot_sector.cluster_bytes() :] > > + if len(data) == 0: > > + break > > + cluster = self.next_cluster(cluster) > > + > > + assert len(data) == 0, \ > > + "Data was not written completely, clusters missing" > > + > > + def create_file(self, path: str): > > + """ > > + Create a new file at the given path. > > + """ > > + assert path[0] == "/", "Path must start with /" > > + > > + path = path[1:] # remove the leading / > > + > > + parts = path.split("/") > > + > > + directory_cluster = None > > + directory = self.read_root_directory() > > + > > + parts, filename = parts[:-1], parts[-1] > > + > > + for i, part in enumerate(parts): > > + current_entry = None > > + for entry in directory: > > + if entry.whole_name() == part: > > + current_entry = entry > > + break > > + if current_entry is None: > > + return None > > + > > + if current_entry.attributes & 0x10 == 0: > > + raise Exception( > > + f"{current_entry.whole_name()} is not a directory") > > + else: > > + directory = self.read_directory(current_entry.cluster) > > + directory_cluster = current_entry.cluster > > + > > + # add new entry to the directory > > + > > + filename, ext = filename.split(".") > > + > > + if len(ext) > 3: > > + raise Exception("Ext must be 3 characters or less") > > + if len(filename) > 8: > > + raise Exception("Name must be 8 characters or less") > > + > > + for c in filename + ext: > > + > > + if c not in ALLOWED_FILE_CHARS: > > + raise Exception("Invalid character in filename") > > + > > + return self.add_direntry(directory_cluster, filename, ext, 0) > > diff --git a/tests/qemu-iotests/testenv.py b/tests/qemu-iotests/testenv.py > > index 588f30a4f1..4053d29de4 100644 > > --- a/tests/qemu-iotests/testenv.py > > +++ b/tests/qemu-iotests/testenv.py > > @@ -250,7 +250,7 @@ def __init__(self, source_dir: str, build_dir: str, > > self.qemu_img_options = os.getenv('QEMU_IMG_OPTIONS') > > self.qemu_nbd_options = os.getenv('QEMU_NBD_OPTIONS') > > > > - is_generic = self.imgfmt not in ['bochs', 'cloop', 'dmg'] > > + is_generic = self.imgfmt not in ['bochs', 'cloop', 'dmg', 'vvfat'] > > self.imgfmt_generic = 'true' if is_generic else 'false' > > > > self.qemu_io_options = f'--cache {self.cachemode} --aio {self.aiomode}' > > diff --git a/tests/qemu-iotests/tests/vvfat b/tests/qemu-iotests/tests/vvfat > > new file mode 100755 > > index 0000000000..113d7d3270 > > --- /dev/null > > +++ b/tests/qemu-iotests/tests/vvfat > > @@ -0,0 +1,440 @@ > > +#!/usr/bin/env python3 > > +# group: rw vvfat > > +# > > +# Test vvfat driver implementation > > +# Here, we use a simple FAT16 implementation and check the behavior of the vvfat driver. > > +# > > +# Copyright (C) 2024 Amjad Alsharafi <amjadsharafi10@gmail.com> > > +# > > +# This program is free software; you can redistribute it and/or modify > > +# it under the terms of the GNU General Public License as published by > > +# the Free Software Foundation; either version 2 of the License, or > > +# (at your option) any later version. > > +# > > +# This program is distributed in the hope that it will be useful, > > +# but WITHOUT ANY WARRANTY; without even the implied warranty of > > +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the > > +# GNU General Public License for more details. > > +# > > +# You should have received a copy of the GNU General Public License > > +# along with this program. If not, see <http://www.gnu.org/licenses/>. > > + > > +import os, shutil > > +import iotests > > +from iotests import imgfmt, QMPTestCase > > +from fat16 import MBR, Fat16, DIRENTRY_SIZE > > + > > +filesystem = os.path.join(iotests.test_dir, "filesystem") > > + > > +nbd_sock = iotests.file_path("nbd.sock", base_dir=iotests.sock_dir) > > +nbd_uri = "nbd+unix:///disk?socket=" + nbd_sock > > + > > +SECTOR_SIZE = 512 > > + > > + > > +class TestVVFatDriver(QMPTestCase): > > + def setUp(self) -> None: > > + if os.path.exists(filesystem): > > + if os.path.isdir(filesystem): > > + shutil.rmtree(filesystem) > > + else: > > + print(f"Error: {filesystem} exists and is not a directory") > > + exit(1) > > + os.mkdir(filesystem) > > + > > + # Add some text files to the filesystem > > + for i in range(10): > > + with open(os.path.join(filesystem, f"file{i}.txt"), "w") as f: > > + f.write(f"Hello, world! {i}\n") > > + > > + # Add 2 large files, above the cluster size (8KB) > > + with open(os.path.join(filesystem, "large1.txt"), "wb") as f: > > + # write 'A' * 1KB, 'B' * 1KB, 'C' * 1KB, ... > > + for i in range(8 * 2): # two clusters > > + f.write(bytes([0x41 + i] * 1024)) > > + > > + with open(os.path.join(filesystem, "large2.txt"), "wb") as f: > > + # write 'A' * 1KB, 'B' * 1KB, 'C' * 1KB, ... > > + for i in range(8 * 3): # 3 clusters > > + f.write(bytes([0x41 + i] * 1024)) > > + > > + self.vm = iotests.VM() > > + > > + self.vm.add_blockdev( > > + self.vm.qmp_to_opts( > > + { > > + "driver": imgfmt, > > + "node-name": "disk", > > + "rw": "true", > > + "fat-type": "16", > > + "dir": filesystem, > > + } > > + ) > > + ) > > + > > + self.vm.launch() > > + > > + self.vm.qmp_log("block-dirty-bitmap-add", **{"node": "disk", "name": "bitmap0"}) > > + > > + # attach nbd server > > + self.vm.qmp_log( > > + "nbd-server-start", > > + **{"addr": {"type": "unix", "data": {"path": nbd_sock}}}, > > + filters=[], > > + ) > > + > > + self.vm.qmp_log( > > + "nbd-server-add", > > + **{"device": "disk", "writable": True, "bitmap": "bitmap0"}, > > + ) > > + > > + self.qio = iotests.QemuIoInteractive("-f", "raw", nbd_uri) > > + > > + def tearDown(self) -> None: > > + self.qio.close() > > + self.vm.shutdown() > > + # print(self.vm.get_log()) > > + shutil.rmtree(filesystem) > > + > > + def read_sectors(self, sector: int, num: int = 1) -> bytes: > > + """ > > + Read `num` sectors starting from `sector` from the `disk`. > > + This uses `QemuIoInteractive` to read the sectors into `stdout` and then parse the output. > > + """ > > + self.assertGreater(num, 0) > > + # The output contains the content of the sector in hex dump format > > + # We need to extract the content from it > > + output = self.qio.cmd(f"read -v {sector * SECTOR_SIZE} {num * SECTOR_SIZE}") > > + # Each row is 16 bytes long, and we are writing `num` sectors > > + rows = num * SECTOR_SIZE // 16 > > + output_rows = output.split("\n")[:rows] > > + > > + hex_content = "".join( > > + [(row.split(": ")[1]).split(" ")[0] for row in output_rows] > > + ) > > + bytes_content = bytes.fromhex(hex_content) > > + > > + self.assertEqual(len(bytes_content), num * SECTOR_SIZE) > > + > > + return bytes_content > > + > > + def write_sectors(self, sector: int, data: bytes): > > + """ > > + Write `data` to the `disk` starting from `sector`. > > + This uses `QemuIoInteractive` to write the data into the disk. > > + """ > > + > > + self.assertGreater(len(data), 0) > > + self.assertEqual(len(data) % SECTOR_SIZE, 0) > > + > > + temp_file = os.path.join(iotests.test_dir, "temp.bin") > > + with open(temp_file, "wb") as f: > > + f.write(data) > > + > > + self.qio.cmd(f"write -s {temp_file} {sector * SECTOR_SIZE} {len(data)}") > > + > > + os.remove(temp_file) > > + > > + def init_fat16(self): > > + mbr = MBR(self.read_sectors(0)) > > + return Fat16( > > + mbr.partition_table[0]["start_lba"], > > + mbr.partition_table[0]["size"], > > + self.read_sectors, > > + self.write_sectors, > > + ) > > + > > + # Tests > > + > > + def test_fat_filesystem(self): > > + """ > > + Test that vvfat produce a valid FAT16 and MBR sectors > > + """ > > + mbr = MBR(self.read_sectors(0)) > > + > > + self.assertEqual(mbr.partition_table[0]["status"], 0x80) > > + self.assertEqual(mbr.partition_table[0]["type"], 6) > > + > > + fat16 = Fat16( > > + mbr.partition_table[0]["start_lba"], > > + mbr.partition_table[0]["size"], > > + self.read_sectors, > > + self.write_sectors, > > + ) > > + self.assertEqual(fat16.boot_sector.bytes_per_sector, 512) > > + self.assertEqual(fat16.boot_sector.volume_label, "QEMU VVFAT") > > + > > + def test_read_root_directory(self): > > + """ > > + Test the content of the root directory > > + """ > > + fat16 = self.init_fat16() > > + > > + root_dir = fat16.read_root_directory() > > + > > + self.assertEqual(len(root_dir), 13) # 12 + 1 special file > > + > > + files = { > > + "QEMU VVF.AT": 0, # special empty file > > + "FILE0.TXT": 16, > > + "FILE1.TXT": 16, > > + "FILE2.TXT": 16, > > + "FILE3.TXT": 16, > > + "FILE4.TXT": 16, > > + "FILE5.TXT": 16, > > + "FILE6.TXT": 16, > > + "FILE7.TXT": 16, > > + "FILE8.TXT": 16, > > + "FILE9.TXT": 16, > > + "LARGE1.TXT": 0x2000 * 2, > > + "LARGE2.TXT": 0x2000 * 3, > > + } > > + > > + for entry in root_dir: > > + self.assertIn(entry.whole_name(), files) > > + self.assertEqual(entry.size_bytes, files[entry.whole_name()]) > > + > > + def test_direntry_as_bytes(self): > > + """ > > + Test if we can convert Direntry back to bytes, so that we can write it back to the disk safely. > > + """ > > + fat16 = self.init_fat16() > > + > > + root_dir = fat16.read_root_directory() > > + first_entry_bytes = fat16.read_sectors(fat16.boot_sector.root_dir_start(), 1) > > + # The first entry won't be deleted, so we can compare it with the first entry in the root directory > > + self.assertEqual(root_dir[0].as_bytes(), first_entry_bytes[:DIRENTRY_SIZE]) > > + > > + def test_read_files(self): > > + """ > > + Test reading the content of the files > > + """ > > + fat16 = self.init_fat16() > > + > > + for i in range(10): > > + file = fat16.find_direntry(f"/FILE{i}.TXT") > > + self.assertIsNotNone(file) > > + self.assertEqual( > > + fat16.read_file(file), f"Hello, world! {i}\n".encode("ascii") > > + ) > > + > > + # test large files > > + large1 = fat16.find_direntry("/LARGE1.TXT") > > + with open(os.path.join(filesystem, "large1.txt"), "rb") as f: > > + self.assertEqual(fat16.read_file(large1), f.read()) > > + > > + large2 = fat16.find_direntry("/LARGE2.TXT") > > + self.assertIsNotNone(large2) > > + with open(os.path.join(filesystem, "large2.txt"), "rb") as f: > > + self.assertEqual(fat16.read_file(large2), f.read()) > > + > > + def test_write_file_same_content_direct(self): > > + """ > > + Similar to `test_write_file_in_same_content`, but we write the file directly clusters > > + and thus we don't go through the modification of direntry. > > + """ > > + fat16 = self.init_fat16() > > + > > + file = fat16.find_direntry("/FILE0.TXT") > > + self.assertIsNotNone(file) > > + > > + data = fat16.read_cluster(file.cluster) > > + fat16.write_cluster(file.cluster, data) > > + > > + with open(os.path.join(filesystem, "file0.txt"), "rb") as f: > > + self.assertEqual(fat16.read_file(file), f.read()) > > + > > + def test_write_file_in_same_content(self): > > + """ > > + Test writing the same content to the file back to it > > + """ > > + fat16 = self.init_fat16() > > + > > + file = fat16.find_direntry("/FILE0.TXT") > > + self.assertIsNotNone(file) > > + > > + self.assertEqual(fat16.read_file(file), b"Hello, world! 0\n") > > + > > + fat16.write_file(file, b"Hello, world! 0\n") > > + > > + self.assertEqual(fat16.read_file(file), b"Hello, world! 0\n") > > + > > + with open(os.path.join(filesystem, "file0.txt"), "rb") as f: > > + self.assertEqual(f.read(), b"Hello, world! 0\n") > > + > > + def test_modify_content_same_clusters(self): > > + """ > > + Test modifying the content of the file without changing the number of clusters > > + """ > > + fat16 = self.init_fat16() > > + > > + file = fat16.find_direntry("/FILE0.TXT") > > + self.assertIsNotNone(file) > > + > > + new_content = b"Hello, world! Modified\n" > > + self.assertEqual(fat16.read_file(file), b"Hello, world! 0\n") > > + > > + fat16.write_file(file, new_content) > > + > > + self.assertEqual(fat16.read_file(file), new_content) > > + > > + with open(os.path.join(filesystem, "file0.txt"), "rb") as f: > > + self.assertEqual(f.read(), new_content) > > + > > + def test_truncate_file_same_clusters_less(self): > > + """ > > + Test truncating the file without changing number of clusters > > + Test decreasing the file size > > + """ > > + fat16 = self.init_fat16() > > + > > + file = fat16.find_direntry("/FILE0.TXT") > > + self.assertIsNotNone(file) > > + > > + self.assertEqual(fat16.read_file(file), b"Hello, world! 0\n") > > + > > + fat16.truncate_file(file, 5) > > + > > + new_content = fat16.read_file(file) > > + > > + self.assertEqual(new_content, b"Hello") > > + > > + with open(os.path.join(filesystem, "file0.txt"), "rb") as f: > > + self.assertEqual(f.read(), new_content) > > + > > + def test_truncate_file_same_clusters_more(self): > > + """ > > + Test truncating the file without changing number of clusters > > + Test increase the file size > > + """ > > + fat16 = self.init_fat16() > > + > > + file = fat16.find_direntry("/FILE0.TXT") > > + self.assertIsNotNone(file) > > + > > + self.assertEqual(fat16.read_file(file), b"Hello, world! 0\n") > > + > > + fat16.truncate_file(file, 20) > > + > > + new_content = fat16.read_file(file) > > + > > + # random pattern will be appended to the file, and its not always the same > > + self.assertEqual(new_content[:16], b"Hello, world! 0\n") > > + self.assertEqual(len(new_content), 20) > > + > > + with open(os.path.join(filesystem, "file0.txt"), "rb") as f: > > + self.assertEqual(f.read(), new_content) > > + > > + def test_write_large_file(self): > > + """ > > + Test writing a large file > > + """ > > + fat16 = self.init_fat16() > > + > > + file = fat16.find_direntry("/LARGE1.TXT") > > + self.assertIsNotNone(file) > > + > > + # The content of LARGE1 is A * 1KB, B * 1KB, C * 1KB, ..., P * 1KB > > + # Lets change it to be Z * 1KB, Y * 1KB, X * 1KB, ..., K * 1KB > > + # without changing the number of clusters or filesize > > + new_content = b"".join([bytes([0x5A - i] * 1024) for i in range(16)]) > > + > > + fat16.write_file(file, new_content) > > + > > + with open(os.path.join(filesystem, "large1.txt"), "rb") as f: > > + self.assertEqual(f.read(), new_content) > > + > > + def test_truncate_file_change_clusters_less(self): > > + """ > > + Test truncating a file by reducing the number of clusters > > + """ > > + fat16 = self.init_fat16() > > + > > + file = fat16.find_direntry("/LARGE1.TXT") > > + self.assertIsNotNone(file) > > + > > + fat16.truncate_file(file, 1) > > + > > + self.assertEqual(fat16.read_file(file), b"A") > > + > > + with open(os.path.join(filesystem, "large1.txt"), "rb") as f: > > + self.assertEqual(f.read(), b"A") > > + > > + def test_write_file_change_clusters_less(self): > > + """ > > + Test truncating a file by reducing the number of clusters > > + """ > > + fat16 = self.init_fat16() > > + > > + file = fat16.find_direntry("/LARGE2.TXT") > > + self.assertIsNotNone(file) > > + > > + new_content = b"Hello, world! This was a large file\n" > > + new_content = b"X" * 8 * 1024 + b"Y" * 8 * 1024 > > This sets and then immediately overwrites new_content. What was intended > here? > > > + > > + fat16.write_file(file, new_content) > > + > > + self.assertEqual(fat16.read_file(file), new_content) > > + > > + with open(os.path.join(filesystem, "large2.txt"), "rb") as f: > > + self.assertEqual(f.read(), new_content) > > + > > + def test_write_file_change_clusters_more(self): > > + """ > > + Test truncating a file by increasing the number of clusters > > + """ > > + fat16 = self.init_fat16() > > + > > + file = fat16.find_direntry("/LARGE2.TXT") > > + self.assertIsNotNone(file) > > + > > + new_content = b"X" * 8 * 1024 + b"Y" * 8 * 1024 + b"Z" * 8 * 1024 > > + > > + fat16.write_file(file, new_content) > > + > > + with open(os.path.join(filesystem, "large2.txt"), "rb") as f: > > + self.assertEqual(f.read(), new_content) > > + > > + def test_write_file_change_clusters_more_non_last_file(self): > > + """ > > + Test truncating a file by increasing the number of clusters > > + This is a special variant of the above test, where we write to > > + a file so that when allocating new clusters, it won't have contiguous clusters > > + """ > > + fat16 = self.init_fat16() > > + > > + file = fat16.find_direntry("/LARGE1.TXT") > > + self.assertIsNotNone(file) > > + > > + new_content = b"X" * 8 * 1024 + b"Y" * 8 * 1024 + b"Z" * 8 * 1024 > > + > > + fat16.write_file(file, new_content) > > + > > + with open(os.path.join(filesystem, "large1.txt"), "rb") as f: > > + self.assertEqual(f.read(), new_content) > > + > > + def test_create_file(self): > > + """ > > + Test creating a new file > > + """ > > + fat16 = self.init_fat16() > > + > > + new_file = fat16.create_file("/NEWFILE.TXT") > > + > > + self.assertIsNotNone(new_file) > > + self.assertEqual(new_file.size_bytes, 0) > > + > > + new_content = b"Hello, world! New file\n" > > + fat16.write_file(new_file, new_content) > > + > > + self.assertEqual(fat16.read_file(new_file), new_content) > > + > > + with open(os.path.join(filesystem, "newfile.txt"), "rb") as f: > > + self.assertEqual(f.read(), new_content) > > + > > + # TODO: support deleting files > > + > > + > > +if __name__ == "__main__": > > + # This is a specific test for vvfat driver > > + iotests.main(supported_fmts=["vvfat"], supported_protocols=["file"]) > > diff --git a/tests/qemu-iotests/tests/vvfat.out b/tests/qemu-iotests/tests/vvfat.out > > new file mode 100755 > > index 0000000000..96961ed0b5 > > --- /dev/null > > +++ b/tests/qemu-iotests/tests/vvfat.out > > @@ -0,0 +1,5 @@ > > +............... > > +---------------------------------------------------------------------- > > +Ran 15 tests > > + > > +OK > > With the updated test, I can catch the problems that are fixed by > patches 1 and 2, but it still doesn't need patch 3 to pass. > > Kevin > Thanks for reviewing, those are all mistakes, and I fixed them (included a small patch to fix these issues at the end...). Regarding the failing test, I forgot to also read the files from the fat driver, and instead I was just reading from the host filesystem. I'm not sure exactly, why reading from the filesystem works, but reading from the driver (i.e. guest) gives the weird buggy result. I have updated the test in the patch below to reflect this. I would love if you can test the patch below and let me know if the issues are fixed, after that I can send the new series. Thanks, Amjad --- PATCH --- diff --git a/tests/qemu-iotests/fat16.py b/tests/qemu-iotests/fat16.py index baf801b4d5..411a277906 100644 --- a/tests/qemu-iotests/fat16.py +++ b/tests/qemu-iotests/fat16.py @@ -62,13 +62,16 @@ def __init__(self, data: bytes): self.reserved_sectors = int.from_bytes(data[14:16], "little") self.fat_count = data[16] self.root_entries = int.from_bytes(data[17:19], "little") + total_sectors_16 = int.from_bytes(data[19:21], "little") self.media_descriptor = data[21] - self.fat_size = int.from_bytes(data[22:24], "little") self.sectors_per_fat = int.from_bytes(data[22:24], "little") self.sectors_per_track = int.from_bytes(data[24:26], "little") self.heads = int.from_bytes(data[26:28], "little") self.hidden_sectors = int.from_bytes(data[28:32], "little") - self.total_sectors = int.from_bytes(data[32:36], "little") + total_sectors_32 = int.from_bytes(data[32:36], "little") + assert total_sectors_16 == 0 or total_sectors_32 == 0, \ + "Both total sectors (16 and 32) fields are non-zero" + self.total_sectors = total_sectors_16 or total_sectors_32 self.drive_number = data[36] self.volume_id = int.from_bytes(data[39:43], "little") self.volume_label = data[43:54].decode("ascii").strip() @@ -208,7 +211,7 @@ def __init__( self.boot_sector = FatBootSector(self.sector_reader(start_sector)) fat_size_in_sectors = \ - self.boot_sector.fat_size * self.boot_sector.fat_count + self.boot_sector.sectors_per_fat * self.boot_sector.fat_count self.fats = self.read_sectors( self.boot_sector.reserved_sectors, fat_size_in_sectors ) diff --git a/tests/qemu-iotests/tests/vvfat b/tests/qemu-iotests/tests/vvfat index 113d7d3270..8d04f292e3 100755 --- a/tests/qemu-iotests/tests/vvfat +++ b/tests/qemu-iotests/tests/vvfat @@ -369,7 +369,6 @@ class TestVVFatDriver(QMPTestCase): file = fat16.find_direntry("/LARGE2.TXT") self.assertIsNotNone(file) - new_content = b"Hello, world! This was a large file\n" new_content = b"X" * 8 * 1024 + b"Y" * 8 * 1024 fat16.write_file(file, new_content) @@ -388,10 +387,13 @@ class TestVVFatDriver(QMPTestCase): file = fat16.find_direntry("/LARGE2.TXT") self.assertIsNotNone(file) - new_content = b"X" * 8 * 1024 + b"Y" * 8 * 1024 + b"Z" * 8 * 1024 + # from 3 clusters to 4 clusters + new_content = b"W" * 8 * 1024 + b"X" * 8 * 1024 + b"Y" * 8 * 1024 + b"Z" * 8 * 1024 fat16.write_file(file, new_content) + self.assertEqual(fat16.read_file(file), new_content) + with open(os.path.join(filesystem, "large2.txt"), "rb") as f: self.assertEqual(f.read(), new_content) @@ -406,10 +408,13 @@ class TestVVFatDriver(QMPTestCase): file = fat16.find_direntry("/LARGE1.TXT") self.assertIsNotNone(file) + # from 2 clusters to 3 clusters new_content = b"X" * 8 * 1024 + b"Y" * 8 * 1024 + b"Z" * 8 * 1024 fat16.write_file(file, new_content) + self.assertEqual(fat16.read_file(file), new_content) + with open(os.path.join(filesystem, "large1.txt"), "rb") as f: self.assertEqual(f.read(), new_content)
Am 10.06.2024 um 16:11 hat Amjad Alsharafi geschrieben: > On Mon, Jun 10, 2024 at 02:01:24PM +0200, Kevin Wolf wrote: > > With the updated test, I can catch the problems that are fixed by > > patches 1 and 2, but it still doesn't need patch 3 to pass. > > > > Kevin > > > > Thanks for reviewing, those are all mistakes, and I fixed them (included > a small patch to fix these issues at the end...). > > Regarding the failing test, I forgot to also read the files from the fat > driver, and instead I was just reading from the host filesystem. > I'm not sure exactly, why reading from the filesystem works, but reading > from the driver (i.e. guest) gives the weird buggy result. > I have updated the test in the patch below to reflect this. > > I would love if you can test the patch below and let me know if the > issues are fixed, after that I can send the new series. Yes, that looks good to me and reproduces a failure without patch 3. Kevin
© 2016 - 2024 Red Hat, Inc.