composefs/
repository.rs

1//! Content-addressable repository for composefs objects.
2//!
3//! This module provides a repository abstraction for storing and retrieving
4//! content-addressed objects, splitstreams, and images with fs-verity
5//! verification and garbage collection support.
6//!
7//! # Repository Layout
8//!
9//! A composefs repository is a directory with the following structure:
10//!
11//! ```text
12//! repository/
13//! ├── objects/                  # Content-addressed object storage
14//! │   ├── 4e/                   # First byte of fs-verity hash (hex)
15//! │   │   └── 67eaccd9fd...     # Remaining bytes of hash
16//! │   └── ...
17//! ├── images/                   # Composefs (erofs) image tracking
18//! │   ├── 4e67eaccd9fd... → ../objects/4e/67eaccd9fd...
19//! │   └── refs/
20//! │       └── myimage → ../4e67eaccd9fd...
21//! └── streams/                  # Splitstream storage
22//!     ├── oci-config-sha256:... → ../objects/XX/YYY...
23//!     ├── oci-layer-sha256:... → ../objects/XX/YYY...
24//!     └── refs/                 # Named references (GC roots)
25//!         └── mytarball → ../oci-layer-sha256:...
26//! ```
27//!
28//! # Object Storage
29//!
30//! All content is stored in `objects/` using fs-verity hashes as filenames,
31//! split into 256 subdirectories (`00`-`ff`) by the first byte for filesystem
32//! efficiency. Objects are immutable and deduplicated by content. Every file
33//! must have fs-verity enabled (except in "insecure" mode).
34//!
35//! # Images vs Streams
36//!
37//! The repository distinguishes between two types of derived content:
38//!
39//! - **Images** (`images/`): Composefs/erofs filesystem images that can be mounted.
40//!   These are tracked separately for security: only images produced by the repository
41//!   (via mkcomposefs) should be mounted, to avoid exposing the kernel's filesystem
42//!   code to untrusted data.
43//!
44//! - **Streams** (`streams/`): Splitstreams storing arbitrary data (e.g., OCI
45//!   image layers and configs). Symlinks map content identifiers to objects.
46//!
47//! # References (GC Roots)
48//!
49//! Both `images/refs/` and `streams/refs/` contain named symlinks that serve as
50//! garbage collection roots. Any object reachable from a ref is protected from GC.
51//! Refs can be organized hierarchically (e.g., `refs/myapp/layer1`).
52//!
53//! See [`Repository::name_stream`] for creating stream refs.
54//!
55//! # Garbage Collection
56//!
57//! The repository supports garbage collection via [`Repository::gc()`]. Objects
58//! not reachable from any reference are deleted. The GC algorithm:
59//!
60//! 1. Walks all references in `images/refs/` and `streams/refs/` to find roots
61//! 2. Transitively follows stream references to find all reachable objects
62//! 3. Deletes unreferenced objects, images, and streams
63//!
64//! # fs-verity Integration
65//!
66//! When running on a filesystem that supports fs-verity (ext4, btrfs, etc.), objects
67//! are stored with fs-verity enabled, providing kernel-level integrity verification.
68//! In "insecure" mode, fs-verity is not required, allowing operation on filesystems
69//! like tmpfs or overlayfs.
70//!
71//! # Concurrency
72//!
73//! The repository uses advisory file locking (flock) to coordinate concurrent access.
74//! Opening a repository acquires a shared lock, while garbage collection requires
75//! an exclusive lock. This ensures GC cannot run while other processes have the
76//! repository open.
77//!
78//! For more details, see the [repository design documentation](../../../doc/repository.md).
79
80use std::{
81    collections::{HashMap, HashSet},
82    ffi::{CStr, CString, OsStr, OsString},
83    fs::{canonicalize, File},
84    io::{Read, Write},
85    os::{
86        fd::{AsFd, OwnedFd},
87        unix::ffi::OsStrExt,
88    },
89    path::{Path, PathBuf},
90    sync::Arc,
91    thread::available_parallelism,
92};
93
94use log::{debug, trace};
95use tokio::sync::Semaphore;
96
97use anyhow::{bail, ensure, Context, Result};
98use once_cell::sync::OnceCell;
99use rustix::{
100    fs::{
101        flock, linkat, mkdirat, open, openat, readlinkat, statat, syncfs, unlinkat, AtFlags, Dir,
102        FileType, FlockOperation, Mode, OFlags, CWD,
103    },
104    io::{Errno, Result as ErrnoResult},
105};
106
107use crate::{
108    fsverity::{
109        compute_verity, enable_verity_maybe_copy, ensure_verity_equal, measure_verity,
110        CompareVerityError, EnableVerityError, FsVerityHashValue, FsVerityHasher,
111        MeasureVerityError,
112    },
113    mount::{composefs_fsmount, mount_at},
114    splitstream::{SplitStreamReader, SplitStreamWriter},
115    util::{proc_self_fd, replace_symlinkat, ErrnoFilter},
116};
117
118/// Call openat() on the named subdirectory of "dirfd", possibly creating it first.
119///
120/// We assume that the directory will probably exist (ie: we try the open first), and on ENOENT, we
121/// mkdirat() and retry.
122fn ensure_dir_and_openat(dirfd: impl AsFd, filename: &str, flags: OFlags) -> ErrnoResult<OwnedFd> {
123    match openat(
124        &dirfd,
125        filename,
126        flags | OFlags::CLOEXEC | OFlags::DIRECTORY,
127        0o666.into(),
128    ) {
129        Ok(file) => Ok(file),
130        Err(Errno::NOENT) => match mkdirat(&dirfd, filename, 0o777.into()) {
131            Ok(()) | Err(Errno::EXIST) => openat(
132                dirfd,
133                filename,
134                flags | OFlags::CLOEXEC | OFlags::DIRECTORY,
135                0o666.into(),
136            ),
137            Err(other) => Err(other),
138        },
139        Err(other) => Err(other),
140    }
141}
142
/// A content-addressable repository for composefs objects.
///
/// Stores content-addressed objects, splitstreams, and images with fsverity
/// verification. Objects are stored by their fsverity digest, streams by a
/// caller-supplied content identifier (e.g. `oci-layer-sha256:...`), and both
/// support named references for persistence across garbage collection.
///
/// The repository directory fd is `flock()`ed when opened via `open_path` and
/// unlocked when the value is dropped.
pub struct Repository<ObjectID: FsVerityHashValue> {
    /// Fd of the repository root directory; the `flock()` is held on this descriptor.
    repository: OwnedFd,
    /// Lazily opened (and created if missing) `objects/` directory; see `objects_dir()`.
    objects: OnceCell<OwnedFd>,
    /// Lazily created semaphore limiting concurrent object writes; see `write_semaphore()`.
    write_semaphore: OnceCell<Arc<Semaphore>>,
    /// When set, missing or unsupported fs-verity is tolerated instead of being an error.
    insecure: bool,
    /// Ties the repository to its object ID (hash) type without storing a value of it.
    _data: std::marker::PhantomData<ObjectID>,
}
156
157impl<ObjectID: FsVerityHashValue> std::fmt::Debug for Repository<ObjectID> {
158    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
159        f.debug_struct("Repository")
160            .field("repository", &self.repository)
161            .field("objects", &self.objects)
162            .field("insecure", &self.insecure)
163            .finish_non_exhaustive()
164    }
165}
166
167impl<ObjectID: FsVerityHashValue> Drop for Repository<ObjectID> {
168    fn drop(&mut self) {
169        flock(&self.repository, FlockOperation::Unlock).expect("repository unlock failed");
170    }
171}
172
/// For Repository::gc_category
///
/// Selects which entries of a category directory the GC walk visits.
/// NOTE(review): the variant descriptions below are inferred from their names;
/// confirm against `gc_category`, which is outside this view.
enum GCCategoryWalkMode {
    /// Presumably: walk only the named references (the `refs/` subtree).
    RefsOnly,
    /// Presumably: walk every entry of the category directory.
    AllEntries,
}
178
/// Statistics from a garbage collection operation.
///
/// Returned by [`Repository::gc`] to report what was (or would be) removed.
/// `Default` yields all-zero counters.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct GcResult {
    /// Number of unreferenced objects removed (or that would be removed)
    pub objects_removed: u64,
    /// Total bytes of object data removed (or that would be removed)
    pub objects_bytes: u64,
    /// Number of broken symlinks removed in `images/`
    pub images_pruned: u64,
    /// Number of broken symlinks removed in `streams/`
    pub streams_pruned: u64,
}
193
194impl<ObjectID: FsVerityHashValue> Repository<ObjectID> {
195    /// Return the objects directory.
196    pub fn objects_dir(&self) -> ErrnoResult<&OwnedFd> {
197        self.objects
198            .get_or_try_init(|| ensure_dir_and_openat(&self.repository, "objects", OFlags::PATH))
199    }
200
201    /// Return a shared semaphore for limiting concurrent object writes.
202    ///
203    /// This semaphore is lazily initialized with `available_parallelism()` permits,
204    /// and shared across all operations on this repository. Use this to limit
205    /// concurrent I/O when processing multiple files or layers in parallel.
206    pub fn write_semaphore(&self) -> Arc<Semaphore> {
207        self.write_semaphore
208            .get_or_init(|| {
209                let max_concurrent = available_parallelism().map(|n| n.get()).unwrap_or(4);
210                Arc::new(Semaphore::new(max_concurrent))
211            })
212            .clone()
213    }
214
215    /// Open a repository at the target directory and path.
216    pub fn open_path(dirfd: impl AsFd, path: impl AsRef<Path>) -> Result<Self> {
217        let path = path.as_ref();
218
219        // O_PATH isn't enough because flock()
220        let repository = openat(dirfd, path, OFlags::RDONLY | OFlags::CLOEXEC, Mode::empty())
221            .with_context(|| format!("Cannot open composefs repository at {}", path.display()))?;
222
223        flock(&repository, FlockOperation::LockShared)
224            .context("Cannot lock composefs repository")?;
225
226        Ok(Self {
227            repository,
228            objects: OnceCell::new(),
229            write_semaphore: OnceCell::new(),
230            insecure: false,
231            _data: std::marker::PhantomData,
232        })
233    }
234
235    /// Open the default user-owned composefs repository.
236    pub fn open_user() -> Result<Self> {
237        let home = std::env::var("HOME").with_context(|| "$HOME must be set when in user mode")?;
238
239        Self::open_path(CWD, PathBuf::from(home).join(".var/lib/composefs"))
240    }
241
242    /// Open the default system-global composefs repository.
243    pub fn open_system() -> Result<Self> {
244        Self::open_path(CWD, PathBuf::from("/sysroot/composefs".to_string()))
245    }
246
247    fn ensure_dir(&self, dir: impl AsRef<Path>) -> ErrnoResult<()> {
248        mkdirat(&self.repository, dir.as_ref(), 0o755.into()).or_else(|e| match e {
249            Errno::EXIST => Ok(()),
250            _ => Err(e),
251        })
252    }
253
254    /// Asynchronously ensures an object exists in the repository.
255    ///
256    /// Same as `ensure_object` but runs the operation on a blocking thread pool
257    /// to avoid blocking async tasks. Returns the fsverity digest of the object.
258    ///
259    /// For performance reasons, this function does *not* call fsync() or similar.  After you're
260    /// done with everything, call `Repository::sync_async()`.
261    pub async fn ensure_object_async(self: &Arc<Self>, data: Vec<u8>) -> Result<ObjectID> {
262        let self_ = Arc::clone(self);
263        tokio::task::spawn_blocking(move || self_.ensure_object(&data)).await?
264    }
265
266    /// Create an O_TMPFILE in the objects directory for streaming writes.
267    ///
268    /// Returns the file descriptor for writing. The caller should write data to this fd,
269    /// then call `spawn_finalize_object_tmpfile()` to compute the verity digest,
270    /// enable fs-verity, and link the file into the objects directory.
271    pub fn create_object_tmpfile(&self) -> Result<OwnedFd> {
272        let objects_dir = self.objects_dir()?;
273        let fd = openat(
274            objects_dir,
275            ".",
276            OFlags::RDWR | OFlags::TMPFILE | OFlags::CLOEXEC,
277            Mode::from_raw_mode(0o644),
278        )?;
279        Ok(fd)
280    }
281
282    /// Spawn a background task that finalizes a tmpfile as an object.
283    ///
284    /// The task computes the fs-verity digest by reading the file, enables verity,
285    /// and links the file into the objects directory.
286    ///
287    /// Returns a handle that resolves to the ObjectID (fs-verity digest).
288    ///
289    /// # Arguments
290    /// * `tmpfile_fd` - The O_TMPFILE file descriptor with data already written
291    /// * `size` - The exact size in bytes of the data written to the tmpfile
292    pub fn spawn_finalize_object_tmpfile(
293        self: &Arc<Self>,
294        tmpfile_fd: OwnedFd,
295        size: u64,
296    ) -> tokio::task::JoinHandle<Result<ObjectID>> {
297        let self_ = Arc::clone(self);
298        tokio::task::spawn_blocking(move || self_.finalize_object_tmpfile(tmpfile_fd.into(), size))
299    }
300
301    /// Finalize a tmpfile as an object.
302    ///
303    /// This method should be called from a blocking context (e.g., `spawn_blocking`)
304    /// as it performs synchronous I/O operations.
305    ///
306    /// This method:
307    /// 1. Re-opens the file as read-only
308    /// 2. Enables fs-verity on the file (kernel computes digest)
309    /// 3. Reads the digest from the kernel
310    /// 4. Checks if object already exists (deduplication)
311    /// 5. Links the file into the objects directory
312    ///
313    /// By letting the kernel compute the digest during verity enable, we avoid
314    /// reading the file an extra time in userspace.
315    pub fn finalize_object_tmpfile(&self, file: File, size: u64) -> Result<ObjectID> {
316        // Re-open as read-only via /proc/self/fd (required for verity enable)
317        let fd_path = proc_self_fd(&file);
318        let ro_fd = open(&*fd_path, OFlags::RDONLY | OFlags::CLOEXEC, Mode::empty())?;
319
320        // Must close writable fd before enabling verity
321        drop(file);
322
323        // Get objects_dir early since we may need it for verity copy
324        let objects_dir = self.objects_dir()?;
325
326        // Enable verity - the kernel reads the file and computes the digest.
327        // Use enable_verity_maybe_copy to handle the case where forked processes
328        // have inherited writable fds to this file.
329        let (ro_fd, verity_enabled) =
330            match enable_verity_maybe_copy::<ObjectID>(objects_dir, ro_fd.as_fd()) {
331                Ok(None) => (ro_fd, true),
332                Ok(Some(new_fd)) => (new_fd, true),
333                Err(EnableVerityError::FilesystemNotSupported) if self.insecure => (ro_fd, false),
334                Err(EnableVerityError::AlreadyEnabled) => (ro_fd, true),
335                Err(other) => return Err(other).context("Enabling verity on tmpfile")?,
336            };
337
338        // Get the digest - either from kernel (fast) or compute in userspace (fallback)
339        let id: ObjectID = if verity_enabled {
340            measure_verity(&ro_fd).context("Measuring verity digest")?
341        } else {
342            // Insecure mode: compute digest in userspace from ro_fd
343            let mut reader = std::io::BufReader::new(File::from(ro_fd.try_clone()?));
344            Self::compute_verity_digest(&mut reader)?
345        };
346
347        // Check if object already exists
348        let path = id.to_object_pathname();
349
350        match statat(objects_dir, &path, AtFlags::empty()) {
351            Ok(stat) if stat.st_size as u64 == size => {
352                // Object already exists with correct size, skip storage
353                return Ok(id);
354            }
355            _ => {}
356        }
357
358        // Ensure parent directory exists
359        let parent_dir = id.to_object_dir();
360        let _ = mkdirat(objects_dir, &parent_dir, Mode::from_raw_mode(0o755));
361
362        // Link the file into the objects directory
363        match linkat(
364            CWD,
365            proc_self_fd(&ro_fd),
366            objects_dir,
367            &path,
368            AtFlags::SYMLINK_FOLLOW,
369        ) {
370            Ok(()) => Ok(id),
371            Err(Errno::EXIST) => Ok(id), // Race: another task created it
372            Err(e) => Err(e).context("Linking tmpfile into objects directory")?,
373        }
374    }
375
376    /// Compute fs-verity digest in userspace by reading from a buffered source.
377    /// Used as fallback when kernel verity is not available (insecure mode).
378    fn compute_verity_digest(reader: &mut impl std::io::BufRead) -> Result<ObjectID> {
379        let mut hasher = FsVerityHasher::<ObjectID>::new();
380
381        loop {
382            let buf = reader.fill_buf()?;
383            if buf.is_empty() {
384                break;
385            }
386            // add_block expects at most one block at a time
387            let chunk_size = buf.len().min(FsVerityHasher::<ObjectID>::BLOCK_SIZE);
388            hasher.add_block(&buf[..chunk_size]);
389            reader.consume(chunk_size);
390        }
391
392        Ok(hasher.digest())
393    }
394
    /// Store an object with a pre-computed fs-verity ID.
    ///
    /// This is an internal helper that stores data assuming the caller has already
    /// computed the correct fs-verity digest. The digest is verified after storage.
    ///
    /// If `objects/<id path>` already exists, it is measured against `id` and nothing is
    /// written. Otherwise `data` is written to an O_TMPFILE in the object's parent
    /// directory, verity is enabled on a read-only reopen, and the file is linked into
    /// place. In insecure mode, missing or unsupported verity is tolerated. No fsync is
    /// performed (see the NB comment below).
    fn store_object_with_id(&self, data: &[u8], id: &ObjectID) -> Result<()> {
        let dirfd = self.objects_dir()?;
        let path = id.to_object_pathname();

        // the usual case is that the file will already exist
        match openat(
            dirfd,
            &path,
            OFlags::RDONLY | OFlags::CLOEXEC,
            Mode::empty(),
        ) {
            Ok(fd) => {
                // measure the existing file to ensure that it's correct
                // TODO: try to replace file if it's broken?
                match ensure_verity_equal(&fd, id) {
                    Ok(()) => {}
                    Err(CompareVerityError::Measure(MeasureVerityError::VerityMissing))
                        if self.insecure =>
                    {
                        // Insecure mode, verity-capable filesystem: try enabling verity on
                        // the existing object, then re-check the digest.
                        // NOTE(review): on `Ok(Some(fd))` the verity-enabled copy is only
                        // measured, never linked over the existing object, so the on-disk
                        // file appears to stay without verity. Confirm against
                        // `enable_verity_maybe_copy`.
                        match enable_verity_maybe_copy::<ObjectID>(dirfd, fd.as_fd()) {
                            Ok(Some(fd)) => ensure_verity_equal(&fd, id)?,
                            Ok(None) => ensure_verity_equal(&fd, id)?,
                            Err(other) => Err(other)?,
                        }
                    }
                    Err(CompareVerityError::Measure(
                        MeasureVerityError::FilesystemNotSupported,
                    )) if self.insecure => {}
                    Err(other) => Err(other)?,
                }
                return Ok(());
            }
            Err(Errno::NOENT) => {
                // in this case we'll create the file
            }
            Err(other) => {
                return Err(other).context("Checking for existing object in repository")?;
            }
        }

        // O_TMPFILE inside the object's parent directory (created on demand), so the final
        // linkat() stays within one directory tree/filesystem.
        let fd = ensure_dir_and_openat(dirfd, &id.to_object_dir(), OFlags::RDWR | OFlags::TMPFILE)?;
        let mut file = File::from(fd);
        file.write_all(data)?;
        // We can't enable verity with an open writable fd, so re-open and close the old one.
        let ro_fd = open(
            proc_self_fd(&file),
            OFlags::RDONLY | OFlags::CLOEXEC,
            Mode::empty(),
        )?;
        // NB: We should do fdatasync() or fsync() here, but doing this for each file forces the
        // creation of a massive number of journal commits and is a performance disaster.  We need
        // to coordinate this at a higher level.  See .write_stream().
        drop(file);

        // Enable verity (possibly on a copy, if the helper had to copy the file) and
        // double-check that the kernel's digest matches `id`.
        let ro_fd = match enable_verity_maybe_copy::<ObjectID>(dirfd, ro_fd.as_fd()) {
            Ok(maybe_fd) => {
                let ro_fd = maybe_fd.unwrap_or(ro_fd);
                match ensure_verity_equal(&ro_fd, id) {
                    Ok(()) => ro_fd,
                    Err(CompareVerityError::Measure(
                        MeasureVerityError::VerityMissing
                        | MeasureVerityError::FilesystemNotSupported,
                    )) if self.insecure => ro_fd,
                    Err(other) => Err(other).context("Double-checking verity digest")?,
                }
            }
            Err(EnableVerityError::FilesystemNotSupported) if self.insecure => ro_fd,
            Err(other) => Err(other).context("Enabling verity digest")?,
        };

        // Give the anonymous tmpfile its content-addressed name.
        match linkat(
            CWD,
            proc_self_fd(&ro_fd),
            dirfd,
            path,
            AtFlags::SYMLINK_FOLLOW,
        ) {
            Ok(()) => {}
            Err(Errno::EXIST) => {
                // TODO: strictly, we should measure the newly-appeared file
            }
            Err(other) => {
                return Err(other).context("Linking created object file");
            }
        }

        Ok(())
    }
487
488    /// Given a blob of data, store it in the repository.
489    ///
490    /// For performance reasons, this function does *not* call fsync() or similar.  After you're
491    /// done with everything, call `Repository::sync()`.
492    pub fn ensure_object(&self, data: &[u8]) -> Result<ObjectID> {
493        let id: ObjectID = compute_verity(data);
494        self.store_object_with_id(data, &id)?;
495        Ok(id)
496    }
497
498    fn open_with_verity(&self, filename: &str, expected_verity: &ObjectID) -> Result<OwnedFd> {
499        let fd = self.openat(filename, OFlags::RDONLY)?;
500        match ensure_verity_equal(&fd, expected_verity) {
501            Ok(()) => {}
502            Err(CompareVerityError::Measure(
503                MeasureVerityError::VerityMissing | MeasureVerityError::FilesystemNotSupported,
504            )) if self.insecure => {}
505            Err(other) => Err(other)?,
506        }
507        Ok(fd)
508    }
509
510    /// By default fsverity is required to be enabled on the target
511    /// filesystem. Setting this disables verification of digests
512    /// and an instance of [`Self`] can be used on a filesystem
513    /// without fsverity support.
514    pub fn set_insecure(&mut self, insecure: bool) -> &mut Self {
515        self.insecure = insecure;
516        self
517    }
518
519    /// Creates a SplitStreamWriter for writing a split stream.
520    /// You should write the data to the returned object and then pass it to .store_stream() to
521    /// store the result.
522    pub fn create_stream(self: &Arc<Self>, content_type: u64) -> SplitStreamWriter<ObjectID> {
523        SplitStreamWriter::new(self, content_type)
524    }
525
526    fn format_object_path(id: &ObjectID) -> String {
527        format!("objects/{}", id.to_object_pathname())
528    }
529
530    fn format_stream_path(content_identifier: &str) -> String {
531        format!("streams/{content_identifier}")
532    }
533
534    /// Check if the provided splitstream is present in the repository;
535    /// if so, return its fsverity digest.
536    pub fn has_stream(&self, content_identifier: &str) -> Result<Option<ObjectID>> {
537        let stream_path = Self::format_stream_path(content_identifier);
538
539        match readlinkat(&self.repository, &stream_path, []) {
540            Ok(target) => {
541                let bytes = target.as_bytes();
542                ensure!(
543                    bytes.starts_with(b"../"),
544                    "stream symlink has incorrect prefix"
545                );
546                Ok(Some(ObjectID::from_object_pathname(bytes)?))
547            }
548            Err(Errno::NOENT) => Ok(None),
549            Err(err) => Err(err)?,
550        }
551    }
552
553    /// Write the given splitstream to the repository with the provided content identifier and
554    /// optional reference name.
555    ///
556    /// This call contains an internal barrier that guarantees that, in event of a crash, either:
557    ///  - the named stream (by `content_identifier`) will not be available; or
558    ///  - the stream and all of its linked data will be available
559    ///
560    /// In other words: it will not be possible to boot a system which contained a stream named
561    /// `content_identifier` but is missing linked streams or objects from that stream.
562    pub fn write_stream(
563        &self,
564        writer: SplitStreamWriter<ObjectID>,
565        content_identifier: &str,
566        reference: Option<&str>,
567    ) -> Result<ObjectID> {
568        let object_id = writer.done()?;
569
570        // Right now we have:
571        //   - all of the linked external objects and streams; and
572        //   - the binary data of this splitstream itself
573        //
574        // in the filesystem but but not yet guaranteed to be synced to disk.  This is OK because
575        // nobody knows that the binary data of the splitstream is a splitstream yet: it could just
576        // as well be a random data file contained in an OS image or something.
577        //
578        // We need to make sure that all of that makes it to the disk before the splitstream is
579        // visible as a splitstream.
580        self.sync()?;
581
582        let stream_path = Self::format_stream_path(content_identifier);
583        let object_path = Self::format_object_path(&object_id);
584        self.symlink(&stream_path, &object_path)?;
585
586        if let Some(name) = reference {
587            let reference_path = format!("streams/refs/{name}");
588            self.symlink(&reference_path, &stream_path)?;
589        }
590
591        Ok(object_id)
592    }
593
594    /// Register an already-stored object as a named stream.
595    ///
596    /// This is useful when using `SplitStreamBuilder` which stores the splitstream
597    /// directly via `finish()`. After calling `finish()`, call this method to
598    /// sync all data to disk and create the stream symlink.
599    ///
600    /// This method ensures atomicity: the stream symlink is only created after
601    /// all objects have been synced to disk.
602    pub async fn register_stream(
603        self: &Arc<Self>,
604        object_id: &ObjectID,
605        content_identifier: &str,
606        reference: Option<&str>,
607    ) -> Result<()> {
608        self.sync_async().await?;
609
610        let stream_path = Self::format_stream_path(content_identifier);
611        let object_path = Self::format_object_path(object_id);
612        self.symlink(&stream_path, &object_path)?;
613
614        if let Some(name) = reference {
615            let reference_path = format!("streams/refs/{name}");
616            self.symlink(&reference_path, &stream_path)?;
617        }
618
619        Ok(())
620    }
621
622    /// Async version of `write_stream` for use with parallel object storage.
623    ///
624    /// This method awaits any pending parallel object storage tasks before
625    /// finalizing the stream. Use this when you've called `write_external_parallel()`
626    /// on the writer.
627    pub async fn write_stream_async(
628        self: &Arc<Self>,
629        writer: SplitStreamWriter<ObjectID>,
630        content_identifier: &str,
631        reference: Option<&str>,
632    ) -> Result<ObjectID> {
633        let object_id = writer.done_async().await?;
634
635        self.sync_async().await?;
636
637        let stream_path = Self::format_stream_path(content_identifier);
638        let object_path = Self::format_object_path(&object_id);
639        self.symlink(&stream_path, &object_path)?;
640
641        if let Some(name) = reference {
642            let reference_path = format!("streams/refs/{name}");
643            self.symlink(&reference_path, &stream_path)?;
644        }
645
646        Ok(object_id)
647    }
648
649    /// Check if a splitstream with a given name exists in the "refs" in the repository.
650    pub fn has_named_stream(&self, name: &str) -> Result<bool> {
651        let stream_path = format!("streams/refs/{name}");
652
653        Ok(statat(&self.repository, &stream_path, AtFlags::empty())
654            .filter_errno(Errno::NOENT)
655            .context("Looking for stream {name} in repository")?
656            .map(|s| FileType::from_raw_mode(s.st_mode).is_symlink())
657            .unwrap_or(false))
658    }
659
660    /// Assign a named reference to a stream, making it a GC root.
661    ///
662    /// Creates a symlink at `streams/refs/{name}` pointing to the stream identified
663    /// by `content_identifier`. The stream must already exist in the repository.
664    ///
665    /// Named references serve two purposes:
666    /// 1. They provide human-readable names for streams
667    /// 2. They act as GC roots - streams reachable from refs are not garbage collected
668    ///
669    /// The `name` can include path separators to organize refs hierarchically
670    /// (e.g., `myapp/layer1`), and intermediate directories are created automatically.
671    pub fn name_stream(&self, content_identifier: &str, name: &str) -> Result<()> {
672        let stream_path = Self::format_stream_path(content_identifier);
673        let reference_path = format!("streams/refs/{name}");
674        self.symlink(&reference_path, &stream_path)?;
675        Ok(())
676    }
677
678    /// Ensures that the stream with a given content identifier digest exists in the repository.
679    ///
680    /// This tries to find the stream by the content identifier.  If the stream is already in the
681    /// repository, the object ID (fs-verity digest) is read from the symlink.  If the stream is
682    /// not already in the repository, a `SplitStreamWriter` is created and passed to `callback`.
683    /// On return, the object ID of the stream will be calculated and it will be written to disk
684    /// (if it wasn't already created by someone else in the meantime).
685    ///
686    /// In both cases, if `reference` is provided, it is used to provide a fixed name for the
687    /// object.  Any object that doesn't have a fixed reference to it is subject to garbage
688    /// collection.  It is an error if this reference already exists.
689    ///
690    /// On success, the object ID of the new object is returned.  It is expected that this object
691    /// ID will be used when referring to the stream from other linked streams.
692    pub fn ensure_stream(
693        self: &Arc<Self>,
694        content_identifier: &str,
695        content_type: u64,
696        callback: impl FnOnce(&mut SplitStreamWriter<ObjectID>) -> Result<()>,
697        reference: Option<&str>,
698    ) -> Result<ObjectID> {
699        let stream_path = Self::format_stream_path(content_identifier);
700
701        let object_id = match self.has_stream(content_identifier)? {
702            Some(id) => id,
703            None => {
704                let mut writer = self.create_stream(content_type);
705                callback(&mut writer)?;
706                self.write_stream(writer, content_identifier, reference)?
707            }
708        };
709
710        if let Some(name) = reference {
711            let reference_path = format!("streams/refs/{name}");
712            self.symlink(&reference_path, &stream_path)?;
713        }
714
715        Ok(object_id)
716    }
717
718    /// Open a splitstream with the given name.
719    pub fn open_stream(
720        &self,
721        content_identifier: &str,
722        verity: Option<&ObjectID>,
723        expected_content_type: Option<u64>,
724    ) -> Result<SplitStreamReader<ObjectID>> {
725        let file = File::from(if let Some(verity_hash) = verity {
726            self.open_object(verity_hash)
727                .with_context(|| format!("Opening object '{verity_hash:?}'"))?
728        } else {
729            let filename = Self::format_stream_path(content_identifier);
730            self.openat(&filename, OFlags::RDONLY)
731                .with_context(|| format!("Opening ref '{filename}'"))?
732        });
733
734        SplitStreamReader::new(file, expected_content_type)
735    }
736
737    /// Given an object identifier (a digest), return a read-only file descriptor
738    /// for its contents. The fsverity digest is verified (if the repository is not in `insecure` mode).
739    pub fn open_object(&self, id: &ObjectID) -> Result<OwnedFd> {
740        self.open_with_verity(&Self::format_object_path(id), id)
741    }
742
743    /// Read the contents of an object into a Vec
744    pub fn read_object(&self, id: &ObjectID) -> Result<Vec<u8>> {
745        let mut data = vec![];
746        File::from(self.open_object(id)?).read_to_end(&mut data)?;
747        Ok(data)
748    }
749
750    /// Merges a splitstream into a single continuous stream.
751    ///
752    /// Opens the named splitstream, resolves all object references, and writes
753    /// the complete merged content to the provided writer. Optionally verifies
754    /// the splitstream's fsverity digest matches the expected value.
755    pub fn merge_splitstream(
756        &self,
757        content_identifier: &str,
758        verity: Option<&ObjectID>,
759        expected_content_type: Option<u64>,
760        output: &mut impl Write,
761    ) -> Result<()> {
762        let mut split_stream =
763            self.open_stream(content_identifier, verity, expected_content_type)?;
764        split_stream.cat(self, output)
765    }
766
767    /// Write `data into the repository as an image with the given `name`.
768    ///
769    /// The fsverity digest is returned.
770    ///
771    /// # Integrity
772    ///
773    /// This function is not safe for untrusted users.
774    pub fn write_image(&self, name: Option<&str>, data: &[u8]) -> Result<ObjectID> {
775        let object_id = self.ensure_object(data)?;
776
777        let object_path = Self::format_object_path(&object_id);
778        let image_path = format!("images/{}", object_id.to_hex());
779
780        self.symlink(&image_path, &object_path)?;
781
782        if let Some(reference) = name {
783            let ref_path = format!("images/refs/{reference}");
784            self.symlink(&ref_path, &image_path)?;
785        }
786
787        Ok(object_id)
788    }
789
790    /// Import the data from the provided read into the repository as an image.
791    ///
792    /// The fsverity digest is returned.
793    ///
794    /// # Integrity
795    ///
796    /// This function is not safe for untrusted users.
797    pub fn import_image<R: Read>(&self, name: &str, image: &mut R) -> Result<ObjectID> {
798        let mut data = vec![];
799        image.read_to_end(&mut data)?;
800        self.write_image(Some(name), &data)
801    }
802
803    /// Returns the fd of the image and whether or not verity should be
804    /// enabled when mounting it.
805    fn open_image(&self, name: &str) -> Result<(OwnedFd, bool)> {
806        let image = self
807            .openat(&format!("images/{name}"), OFlags::RDONLY)
808            .with_context(|| format!("Opening ref 'images/{name}'"))?;
809
810        if name.contains("/") {
811            return Ok((image, true));
812        }
813
814        // A name with no slashes in it is taken to be a sha256 fs-verity digest
815        match measure_verity::<ObjectID>(&image) {
816            Ok(found) if found == FsVerityHashValue::from_hex(name)? => Ok((image, true)),
817            Ok(_) => bail!("fs-verity content mismatch"),
818            Err(MeasureVerityError::VerityMissing | MeasureVerityError::FilesystemNotSupported)
819                if self.insecure =>
820            {
821                Ok((image, false))
822            }
823            Err(other) => Err(other)?,
824        }
825    }
826
827    /// Create a detached mount of an image. This file descriptor can then
828    /// be attached via e.g. `move_mount`.
829    pub fn mount(&self, name: &str) -> Result<OwnedFd> {
830        let (image, enable_verity) = self.open_image(name)?;
831        Ok(composefs_fsmount(
832            image,
833            name,
834            self.objects_dir()?,
835            enable_verity,
836        )?)
837    }
838
839    /// Mount the image with the provided digest at the target path.
840    pub fn mount_at(&self, name: &str, mountpoint: impl AsRef<Path>) -> Result<()> {
841        Ok(mount_at(
842            self.mount(name)?,
843            CWD,
844            &canonicalize(mountpoint)?,
845        )?)
846    }
847
848    /// Creates a relative symlink within the repository.
849    ///
850    /// Computes the correct relative path from the symlink location to the target,
851    /// creating any necessary intermediate directories. Atomically replaces any
852    /// existing symlink at the specified name.
853    pub fn symlink(&self, name: impl AsRef<Path>, target: impl AsRef<Path>) -> ErrnoResult<()> {
854        let name = name.as_ref();
855
856        let mut symlink_components = name.parent().unwrap().components().peekable();
857        let mut target_components = target.as_ref().components().peekable();
858
859        let mut symlink_ancestor = PathBuf::new();
860
861        // remove common leading components
862        while symlink_components.peek() == target_components.peek() {
863            symlink_ancestor.push(symlink_components.next().unwrap());
864            target_components.next().unwrap();
865        }
866
867        let mut relative = PathBuf::new();
868        // prepend a "../" for each ancestor of the symlink
869        // and create those ancestors as we do so
870        for symlink_component in symlink_components {
871            symlink_ancestor.push(symlink_component);
872            self.ensure_dir(&symlink_ancestor)?;
873            relative.push("..");
874        }
875
876        // now build the relative path from the remaining components of the target
877        for target_component in target_components {
878            relative.push(target_component);
879        }
880
881        // Atomically replace existing symlink
882        replace_symlinkat(&relative, &self.repository, name)
883    }
884
885    fn read_symlink_hashvalue(dirfd: &OwnedFd, name: &CStr) -> Result<ObjectID> {
886        let link_content = readlinkat(dirfd, name, [])?;
887        Ok(ObjectID::from_object_pathname(link_content.to_bytes())?)
888    }
889
890    fn walk_symlinkdir(fd: OwnedFd, entry_digests: &mut HashSet<OsString>) -> Result<()> {
891        for item in Dir::read_from(&fd)? {
892            let entry = item?;
893            // NB: the underlying filesystem must support returning filetype via direntry
894            // that's a reasonable assumption, since it must also support fsverity...
895            match entry.file_type() {
896                FileType::Directory => {
897                    let filename = entry.file_name();
898                    if filename != c"." && filename != c".." {
899                        let dirfd = openat(
900                            &fd,
901                            filename,
902                            OFlags::RDONLY | OFlags::CLOEXEC,
903                            Mode::empty(),
904                        )?;
905                        Self::walk_symlinkdir(dirfd, entry_digests)?;
906                    }
907                }
908                FileType::Symlink => {
909                    let link_content = readlinkat(&fd, entry.file_name(), [])?;
910                    let linked_path = Path::new(OsStr::from_bytes(link_content.as_bytes()));
911                    if let Some(entry_name) = linked_path.file_name() {
912                        entry_digests.insert(entry_name.to_os_string());
913                    } else {
914                        // Does not have a proper file base name (i.e. "..")
915                        // TODO: this case needs to be checked in fsck implementation
916                        continue;
917                    }
918                }
919                _ => {
920                    bail!("Unexpected file type encountered");
921                }
922            }
923        }
924
925        Ok(())
926    }
927
928    /// Open the provided path in the repository.
929    fn openat(&self, name: &str, flags: OFlags) -> ErrnoResult<OwnedFd> {
930        // Unconditionally add CLOEXEC as we always want it.
931        openat(
932            &self.repository,
933            name,
934            flags | OFlags::CLOEXEC,
935            Mode::empty(),
936        )
937    }
938
939    // For a GC category (images / streams), return underlying entry digests and
940    // object IDs for each entry
941    // Under RefsOnly mode, only entries explicitly referenced in `<category>/refs`
942    // directory structure would be walked and returned
943    // Under AllEntries mode, all entires will be returned
944    // Note that this function assumes all`*/refs/` links link to 1st level entries
945    // and all 1st level entries link to object store
946    // TODO: fsck the above noted assumption
947    fn gc_category(
948        &self,
949        category: &str,
950        mode: GCCategoryWalkMode,
951    ) -> Result<Vec<(ObjectID, String)>> {
952        let Some(category_fd) = self
953            .openat(category, OFlags::RDONLY | OFlags::DIRECTORY)
954            .filter_errno(Errno::NOENT)
955            .context(format!("Opening {category} dir in repository"))?
956        else {
957            return Ok(Vec::new());
958        };
959
960        let mut entry_digests = HashSet::new();
961        match mode {
962            GCCategoryWalkMode::RefsOnly => {
963                if let Some(refs) = openat(
964                    &category_fd,
965                    "refs",
966                    OFlags::RDONLY | OFlags::DIRECTORY | OFlags::CLOEXEC,
967                    Mode::empty(),
968                )
969                .filter_errno(Errno::NOENT)
970                .context(format!("Opening {category}/refs dir in repository"))?
971                {
972                    Self::walk_symlinkdir(refs, &mut entry_digests)?;
973                }
974            }
975            GCCategoryWalkMode::AllEntries => {
976                // All first-level link entries should be directly object references
977                for item in Dir::read_from(&category_fd)? {
978                    let entry = item?;
979                    let filename = entry.file_name();
980                    if filename != c"refs" && filename != c"." && filename != c".." {
981                        if entry.file_type() != FileType::Symlink {
982                            bail!("category directory contains non-symlink");
983                        }
984                        entry_digests.insert(OsString::from(&OsStr::from_bytes(
985                            entry.file_name().to_bytes(),
986                        )));
987                    }
988                }
989            }
990        }
991
992        let objects = entry_digests
993            .into_iter()
994            .map(|entry_fn| {
995                Ok((
996                    Self::read_symlink_hashvalue(
997                        &category_fd,
998                        CString::new(entry_fn.as_bytes())?.as_c_str(),
999                    )?,
1000                    entry_fn
1001                        .to_str()
1002                        .context("str conversion fails")?
1003                        .to_owned(),
1004                ))
1005            })
1006            .collect::<Result<_>>()?;
1007
1008        Ok(objects)
1009    }
1010
1011    // Remove all broken links from a directory, may operate recursively
1012    /// Remove broken symlinks from a directory.
1013    /// If `dry_run` is true, counts but does not remove. Returns the count.
1014    fn cleanup_broken_links(fd: &OwnedFd, recursive: bool, dry_run: bool) -> Result<u64> {
1015        let mut count = 0;
1016        for item in Dir::read_from(fd)? {
1017            let entry = item?;
1018            match entry.file_type() {
1019                FileType::Directory => {
1020                    if !recursive {
1021                        continue;
1022                    }
1023                    let filename = entry.file_name();
1024                    if filename != c"." && filename != c".." {
1025                        let dirfd = openat(
1026                            fd,
1027                            filename,
1028                            OFlags::RDONLY | OFlags::CLOEXEC,
1029                            Mode::empty(),
1030                        )?;
1031                        count += Self::cleanup_broken_links(&dirfd, recursive, dry_run)?;
1032                    }
1033                }
1034
1035                FileType::Symlink => {
1036                    let filename = entry.file_name();
1037                    let result = statat(fd, filename, AtFlags::empty())
1038                        .filter_errno(Errno::NOENT)
1039                        .context("Testing for broken links")?;
1040                    if result.is_none() {
1041                        count += 1;
1042                        if !dry_run {
1043                            unlinkat(fd, filename, AtFlags::empty())
1044                                .context("Unlinking broken links")?;
1045                        }
1046                    }
1047                }
1048
1049                _ => {
1050                    bail!("Unexpected file type encountered");
1051                }
1052            }
1053        }
1054        Ok(count)
1055    }
1056
1057    /// Clean up broken links in a gc category. Returns count of links removed.
1058    fn cleanup_gc_category(&self, category: &'static str, dry_run: bool) -> Result<u64> {
1059        let Some(category_fd) = self
1060            .openat(category, OFlags::RDONLY | OFlags::DIRECTORY)
1061            .filter_errno(Errno::NOENT)
1062            .context(format!("Opening {category} dir in repository"))?
1063        else {
1064            return Ok(0);
1065        };
1066        // Always cleanup first-level first, then the refs
1067        let mut count = Self::cleanup_broken_links(&category_fd, false, dry_run)
1068            .context(format!("Cleaning up broken links in {category}/"))?;
1069        let ref_fd = openat(
1070            &category_fd,
1071            "refs",
1072            OFlags::RDONLY | OFlags::DIRECTORY | OFlags::CLOEXEC,
1073            Mode::empty(),
1074        )
1075        .filter_errno(Errno::NOENT)
1076        .context(format!("Opening {category}/refs to clean up broken links"))?;
1077        if let Some(ref dirfd) = ref_fd {
1078            count += Self::cleanup_broken_links(dirfd, true, dry_run).context(format!(
1079                "Cleaning up broken links recursively in {category}/refs"
1080            ))?;
1081        }
1082        Ok(count)
1083    }
1084
1085    // Traverse split streams to resolve all linked objects
1086    fn walk_streams(
1087        &self,
1088        stream_name_map: &HashMap<ObjectID, String>,
1089        stream_name: &str,
1090        walked_streams: &mut HashSet<String>,
1091        objects: &mut HashSet<ObjectID>,
1092    ) -> Result<()> {
1093        if walked_streams.contains(stream_name) {
1094            return Ok(());
1095        }
1096        walked_streams.insert(stream_name.to_owned());
1097
1098        let mut split_stream = self.open_stream(stream_name, None, None)?;
1099        // Plain object references, add to live objects set
1100        split_stream.get_object_refs(|id| {
1101            debug!("   with {id:?}");
1102            objects.insert(id.clone());
1103        })?;
1104        // Collect all stream names from named references table to be walked next
1105        let streams_to_walk: Vec<_> = split_stream.iter_named_refs().collect();
1106        // Note that stream name from the named references table is not stream name in repository
1107        // In practice repository name is often table name prefixed with stream types (e.g. oci-config-<table name>)
1108        // Here we always match objectID to be absolutely sure
1109        for (stream_name_in_table, stream_object_id) in streams_to_walk {
1110            debug!(
1111                "   named reference stream {stream_name_in_table} lives, with {stream_object_id:?}"
1112            );
1113            objects.insert(stream_object_id.clone());
1114            if let Some(stream_name_in_repo) = stream_name_map.get(stream_object_id) {
1115                self.walk_streams(
1116                    stream_name_map,
1117                    stream_name_in_repo,
1118                    walked_streams,
1119                    objects,
1120                )?;
1121            } else {
1122                // stream is in table but not in repo, the repo is potentially broken, issue a warning
1123                trace!("broken repo: named reference stream {stream_name_in_table} not found as stream in repo");
1124            }
1125        }
1126        Ok(())
1127    }
1128
1129    /// Given an image, return the set of all objects referenced by it.
1130    pub fn objects_for_image(&self, name: &str) -> Result<HashSet<ObjectID>> {
1131        let (image, _) = self.open_image(name)?;
1132        let mut data = vec![];
1133        std::fs::File::from(image).read_to_end(&mut data)?;
1134        Ok(crate::erofs::reader::collect_objects(&data)?)
1135    }
1136
1137    /// Makes sure all content is written to the repository.
1138    ///
1139    /// This is currently just syncfs() on the repository's root directory because we don't have
1140    /// any better options at present.  This blocks until the data is written out.
1141    pub fn sync(&self) -> Result<()> {
1142        syncfs(&self.repository)?;
1143        Ok(())
1144    }
1145
1146    /// Makes sure all content is written to the repository.
1147    ///
1148    /// This is currently just syncfs() on the repository's root directory because we don't have
1149    /// any better options at present.  This won't return until the data is written out.
1150    pub async fn sync_async(self: &Arc<Self>) -> Result<()> {
1151        let self_ = Arc::clone(self);
1152        tokio::task::spawn_blocking(move || self_.sync()).await?
1153    }
1154
1155    /// Perform garbage collection, removing unreferenced objects.
1156    ///
1157    /// Objects reachable from `images/refs/` or `streams/refs/` are preserved,
1158    /// plus any `additional_roots` (looked up in both images and streams).
1159    /// Returns statistics about what was removed.
1160    ///
1161    /// # Locking
1162    ///
1163    /// An exclusive lock is held for the duration of this operation.
1164    pub fn gc(&self, additional_roots: &[&str]) -> Result<GcResult> {
1165        flock(&self.repository, FlockOperation::LockExclusive)?;
1166        self.gc_impl(additional_roots, false)
1167    }
1168
1169    /// Preview what garbage collection would remove, without deleting.
1170    ///
1171    /// Returns the same statistics that [`gc`](Self::gc) would return,
1172    /// but no files are actually deleted.
1173    ///
1174    /// # Locking
1175    ///
1176    /// A shared lock is held for the duration of this operation (readers
1177    /// are not blocked).
1178    pub fn gc_dry_run(&self, additional_roots: &[&str]) -> Result<GcResult> {
1179        // Shared lock is sufficient since we don't modify anything
1180        flock(&self.repository, FlockOperation::LockShared)?;
1181        self.gc_impl(additional_roots, true)
1182    }
1183
1184    /// Internal GC implementation (lock must already be held).
1185    fn gc_impl(&self, additional_roots: &[&str], dry_run: bool) -> Result<GcResult> {
1186        let mut result = GcResult::default();
1187        let mut live_objects = HashSet::new();
1188
1189        // Build set of additional roots (checked in both images and streams)
1190        let extra_roots: HashSet<_> = additional_roots.iter().map(|s| s.to_string()).collect();
1191
1192        // Collect images: those in images/refs plus caller-specified roots
1193        let all_images = self.gc_category("images", GCCategoryWalkMode::AllEntries)?;
1194        let root_images: Vec<_> = self
1195            .gc_category("images", GCCategoryWalkMode::RefsOnly)?
1196            .into_iter()
1197            .chain(
1198                all_images
1199                    .into_iter()
1200                    .filter(|(_, name)| extra_roots.contains(name)),
1201            )
1202            .collect();
1203
1204        for ref image in root_images {
1205            debug!("{image:?} lives as an image");
1206            live_objects.insert(image.0.clone());
1207            self.objects_for_image(&image.1)?.iter().for_each(|id| {
1208                debug!("   with {id:?}");
1209                live_objects.insert(id.clone());
1210            });
1211        }
1212
1213        // Collect all streams for the name map, then filter to roots
1214        let all_streams = self.gc_category("streams", GCCategoryWalkMode::AllEntries)?;
1215        let stream_name_map: HashMap<_, _> = all_streams.iter().cloned().collect();
1216        let root_streams: Vec<_> = self
1217            .gc_category("streams", GCCategoryWalkMode::RefsOnly)?
1218            .into_iter()
1219            .chain(
1220                all_streams
1221                    .into_iter()
1222                    .filter(|(_, name)| extra_roots.contains(name)),
1223            )
1224            .collect();
1225
1226        let mut walked_streams = HashSet::new();
1227        for stream in root_streams {
1228            debug!("{stream:?} lives as a stream");
1229            live_objects.insert(stream.0.clone());
1230            self.walk_streams(
1231                &stream_name_map,
1232                &stream.1,
1233                &mut walked_streams,
1234                &mut live_objects,
1235            )?;
1236        }
1237
1238        // Walk all objects and remove unreferenced ones
1239        for first_byte in 0x0..=0xff {
1240            let dirfd = match self.openat(
1241                &format!("objects/{first_byte:02x}"),
1242                OFlags::RDONLY | OFlags::DIRECTORY,
1243            ) {
1244                Ok(fd) => fd,
1245                Err(Errno::NOENT) => continue,
1246                Err(e) => Err(e)?,
1247            };
1248            for item in Dir::read_from(&dirfd)? {
1249                let entry = item?;
1250                let filename = entry.file_name();
1251                if filename != c"." && filename != c".." {
1252                    let id =
1253                        ObjectID::from_object_dir_and_basename(first_byte, filename.to_bytes())?;
1254                    if !live_objects.contains(&id) {
1255                        // Get file size before removing
1256                        if let Ok(stat) = statat(&dirfd, filename, AtFlags::empty()) {
1257                            result.objects_bytes += stat.st_size as u64;
1258                        }
1259                        result.objects_removed += 1;
1260
1261                        if !dry_run {
1262                            debug!("removing: objects/{first_byte:02x}/{filename:?}");
1263                            unlinkat(&dirfd, filename, AtFlags::empty())?;
1264                        }
1265                    } else {
1266                        trace!("objects/{first_byte:02x}/{filename:?} lives");
1267                    }
1268                }
1269            }
1270        }
1271
1272        // Clean up broken symlinks
1273        result.images_pruned = self.cleanup_gc_category("images", dry_run)?;
1274        result.streams_pruned = self.cleanup_gc_category("streams", dry_run)?;
1275
1276        // Downgrade to shared lock if we had exclusive (for actual GC)
1277        if !dry_run {
1278            flock(&self.repository, FlockOperation::LockShared)?;
1279        }
1280        Ok(result)
1281    }
1282
1283    // fn fsck(&self) -> Result<()> {
1284    //     unimplemented!()
1285    // }
1286}
1287
1288#[cfg(test)]
1289mod tests {
1290    use std::vec;
1291
1292    use super::*;
1293    use crate::fsverity::Sha512HashValue;
1294    use crate::test::tempdir;
1295    use rustix::fs::{statat, CWD};
1296    use tempfile::TempDir;
1297
1298    /// Create a test repository in insecure mode (no fs-verity required).
1299    fn create_test_repo(path: &Path) -> Result<Arc<Repository<Sha512HashValue>>> {
1300        mkdirat(CWD, path, Mode::from_raw_mode(0o755))?;
1301        let mut repo = Repository::open_path(CWD, path)?;
1302        repo.set_insecure(true);
1303        Ok(Arc::new(repo))
1304    }
1305
1306    /// Generate deterministic test data of a given size.
1307    fn generate_test_data(size: u64, seed: u8) -> Vec<u8> {
1308        (0..size)
1309            .map(|i| ((i as u8).wrapping_add(seed)).wrapping_mul(17))
1310            .collect()
1311    }
1312
1313    fn read_links_in_repo<P>(tmp: &TempDir, repo_sub_path: P) -> Result<Option<PathBuf>>
1314    where
1315        P: AsRef<Path>,
1316    {
1317        let full_path = tmp.path().join("repo").join(repo_sub_path);
1318        match readlinkat(CWD, &full_path, Vec::new()) {
1319            Ok(result) => Ok(Some(PathBuf::from(result.to_str()?))),
1320            Err(rustix::io::Errno::NOENT) => Ok(None),
1321            Err(e) => Err(e.into()),
1322        }
1323    }
1324
1325    // Does not follow symlinks
1326    fn test_path_exists_in_repo<P>(tmp: &TempDir, repo_sub_path: P) -> Result<bool>
1327    where
1328        P: AsRef<Path>,
1329    {
1330        let full_path = tmp.path().join("repo").join(repo_sub_path);
1331        match statat(CWD, &full_path, AtFlags::SYMLINK_NOFOLLOW) {
1332            Ok(_) => Ok(true),
1333            Err(rustix::io::Errno::NOENT) => Ok(false),
1334            Err(e) => Err(e.into()),
1335        }
1336    }
1337
1338    fn test_object_exists(tmp: &TempDir, obj: &Sha512HashValue) -> Result<bool> {
1339        let digest = obj.to_hex();
1340        let (first_two, remainder) = digest.split_at(2);
1341        test_path_exists_in_repo(tmp, &format!("objects/{first_two}/{remainder}"))
1342    }
1343
    /// GC with no refs and no extra roots: the single stream is unreferenced,
    /// so obj1, obj2 and the splitstream object are all removed, and the
    /// now-dangling `streams/test-stream` symlink is pruned.
    #[test]
    fn test_gc_removes_one_stream() -> Result<()> {
        let tmp = tempdir();
        let repo = create_test_repo(&tmp.path().join("repo"))?;

        let obj1 = generate_test_data(32 * 1024, 0xAE);
        let obj2 = generate_test_data(64 * 1024, 0xEA);

        // obj1 is stored directly and never referenced by any stream.
        let obj1_id = repo.ensure_object(&obj1)?;
        // obj2 is stored only through the stream below; its ID is computed here.
        let obj2_id: Sha512HashValue = compute_verity(&obj2);

        // Stream written without a ref name, so nothing under streams/refs/ roots it.
        let mut writer = repo.create_stream(0);
        writer.write_external(&obj2)?;
        let _stream_id = repo.write_stream(writer, "test-stream", None)?;

        repo.sync()?;

        // Precondition: both objects exist and the stream link resolves.
        assert!(test_object_exists(&tmp, &obj1_id)?);
        assert!(test_object_exists(&tmp, &obj2_id)?);
        assert!(test_path_exists_in_repo(&tmp, "streams/test-stream")?);
        let link_target =
            read_links_in_repo(&tmp, "streams/test-stream")?.expect("link is not broken");
        assert!(test_path_exists_in_repo(
            &tmp,
            PathBuf::from("streams").join(&link_target)
        )?);

        // Now perform gc with no roots - should remove obj1, obj2 and the
        // splitstream object itself, then prune the dangling stream symlink
        let result = repo.gc(&[])?;

        assert!(!test_object_exists(&tmp, &obj1_id)?);
        assert!(!test_object_exists(&tmp, &obj2_id)?);
        assert!(!test_path_exists_in_repo(&tmp, "streams/test-stream")?);

        // Verify GcResult: 3 objects removed (obj1, obj2, splitstream), stream symlink pruned
        assert_eq!(result.objects_removed, 3);
        assert!(result.objects_bytes > 0);
        assert_eq!(result.streams_pruned, 1);
        assert_eq!(result.images_pruned, 0);
        Ok(())
    }
1385
    /// GC with the stream named as an additional root: the stream and the
    /// object it references survive; only the unreferenced obj1 is removed.
    #[test]
    fn test_gc_keeps_one_stream() -> Result<()> {
        let tmp = tempdir();
        let repo = create_test_repo(&tmp.path().join("repo"))?;

        let obj1 = generate_test_data(32 * 1024, 0xAE);
        let obj2 = generate_test_data(64 * 1024, 0xEA);

        // obj1 is stored directly and never referenced by any stream.
        let obj1_id = repo.ensure_object(&obj1)?;
        // obj2 is stored only through the stream below; its ID is computed here.
        let obj2_id: Sha512HashValue = compute_verity(&obj2);

        // No ref name: the stream is rooted only by the gc() argument below.
        let mut writer = repo.create_stream(0);
        writer.write_external(&obj2)?;
        let _stream_id = repo.write_stream(writer, "test-stream", None)?;

        repo.sync()?;

        // Precondition: both objects exist and the stream link resolves.
        assert!(test_object_exists(&tmp, &obj1_id)?);
        assert!(test_object_exists(&tmp, &obj2_id)?);
        assert!(test_path_exists_in_repo(&tmp, "streams/test-stream")?);
        let link_target =
            read_links_in_repo(&tmp, "streams/test-stream")?.expect("link is not broken");
        assert!(test_path_exists_in_repo(
            &tmp,
            PathBuf::from("streams").join(&link_target)
        )?);

        // Now perform gc - should remove only obj1, keep obj2 and stream
        let result = repo.gc(&["test-stream"])?;

        assert!(!test_object_exists(&tmp, &obj1_id)?);
        assert!(test_object_exists(&tmp, &obj2_id)?);
        // The stream link must still exist and still resolve after GC.
        assert!(test_path_exists_in_repo(&tmp, "streams/test-stream")?);
        let link_target =
            read_links_in_repo(&tmp, "streams/test-stream")?.expect("link is not broken");
        assert!(test_path_exists_in_repo(
            &tmp,
            PathBuf::from("streams").join(&link_target)
        )?);

        // Verify GcResult: only 1 object removed, no symlinks pruned
        assert_eq!(result.objects_removed, 1);
        assert!(result.objects_bytes > 0);
        assert_eq!(result.streams_pruned, 0);
        assert_eq!(result.images_pruned, 0);
        Ok(())
    }
1433
    /// GC with no extra roots, but the stream was written with a ref name:
    /// the `streams/refs/ref-name` link roots it, so only obj1 is removed.
    #[test]
    fn test_gc_keeps_one_stream_from_refs() -> Result<()> {
        let tmp = tempdir();
        let repo = create_test_repo(&tmp.path().join("repo"))?;

        let obj1 = generate_test_data(32 * 1024, 0xAE);
        let obj2 = generate_test_data(64 * 1024, 0xEA);

        // obj1 is stored directly and never referenced by any stream.
        let obj1_id = repo.ensure_object(&obj1)?;
        // obj2 is stored only through the stream below; its ID is computed here.
        let obj2_id: Sha512HashValue = compute_verity(&obj2);

        // Passing Some("ref-name") also creates a ref, making the stream a GC root.
        let mut writer = repo.create_stream(0);
        writer.write_external(&obj2)?;
        let _stream_id = repo.write_stream(writer, "test-stream", Some("ref-name"))?;

        repo.sync()?;

        // Precondition: both objects exist and the stream link resolves.
        assert!(test_object_exists(&tmp, &obj1_id)?);
        assert!(test_object_exists(&tmp, &obj2_id)?);
        assert!(test_path_exists_in_repo(&tmp, "streams/test-stream")?);
        let link_target =
            read_links_in_repo(&tmp, "streams/test-stream")?.expect("link is not broken");
        assert!(test_path_exists_in_repo(
            &tmp,
            PathBuf::from("streams").join(&link_target)
        )?);

        // Now perform gc - stream is kept via ref, only obj1 removed
        let result = repo.gc(&[])?;

        assert!(!test_object_exists(&tmp, &obj1_id)?);
        assert!(test_object_exists(&tmp, &obj2_id)?);
        // The stream link must still exist and still resolve after GC.
        assert!(test_path_exists_in_repo(&tmp, "streams/test-stream")?);
        let link_target =
            read_links_in_repo(&tmp, "streams/test-stream")?.expect("link is not broken");
        assert!(test_path_exists_in_repo(
            &tmp,
            PathBuf::from("streams").join(&link_target)
        )?);

        // Verify GcResult: 1 object removed, no symlinks pruned (stream has ref)
        assert_eq!(result.objects_removed, 1);
        assert!(result.objects_bytes > 0);
        assert_eq!(result.streams_pruned, 0);
        assert_eq!(result.images_pruned, 0);
        Ok(())
    }
1481
1482    #[test]
1483    fn test_gc_keeps_one_stream_from_two_overlapped() -> Result<()> {
1484        let tmp = tempdir();
1485        let repo = create_test_repo(&tmp.path().join("repo"))?;
1486
1487        let obj1 = generate_test_data(32 * 1024, 0xAE);
1488        let obj2 = generate_test_data(64 * 1024, 0xEA);
1489        let obj3 = generate_test_data(64 * 1024, 0xAA);
1490        let obj4 = generate_test_data(64 * 1024, 0xEE);
1491
1492        let obj1_id = repo.ensure_object(&obj1)?;
1493        let obj2_id: Sha512HashValue = compute_verity(&obj2);
1494        let obj3_id: Sha512HashValue = compute_verity(&obj3);
1495        let obj4_id: Sha512HashValue = compute_verity(&obj4);
1496
1497        let mut writer1 = repo.create_stream(0);
1498        writer1.write_external(&obj2)?;
1499        writer1.write_external(&obj3)?;
1500        let _stream1_id = repo.write_stream(writer1, "test-stream1", None)?;
1501
1502        let mut writer2 = repo.create_stream(0);
1503        writer2.write_external(&obj2)?;
1504        writer2.write_external(&obj4)?;
1505        let _stream2_id = repo.write_stream(writer2, "test-stream2", None)?;
1506
1507        repo.sync()?;
1508
1509        assert!(test_object_exists(&tmp, &obj1_id)?);
1510        assert!(test_object_exists(&tmp, &obj2_id)?);
1511        assert!(test_object_exists(&tmp, &obj3_id)?);
1512        assert!(test_object_exists(&tmp, &obj4_id)?);
1513        assert!(test_path_exists_in_repo(&tmp, "streams/test-stream1")?);
1514        let link_target =
1515            read_links_in_repo(&tmp, "streams/test-stream1")?.expect("link is not broken");
1516        assert!(test_path_exists_in_repo(
1517            &tmp,
1518            PathBuf::from("streams").join(&link_target)
1519        )?);
1520        assert!(test_path_exists_in_repo(&tmp, "streams/test-stream2")?);
1521        let link_target =
1522            read_links_in_repo(&tmp, "streams/test-stream2")?.expect("link is not broken");
1523        assert!(test_path_exists_in_repo(
1524            &tmp,
1525            PathBuf::from("streams").join(&link_target)
1526        )?);
1527
1528        // Now perform gc - keep stream1, remove obj1, obj4, and stream2
1529        let result = repo.gc(&["test-stream1"])?;
1530
1531        assert!(!test_object_exists(&tmp, &obj1_id)?);
1532        assert!(test_object_exists(&tmp, &obj2_id)?);
1533        assert!(test_object_exists(&tmp, &obj3_id)?);
1534        assert!(!test_object_exists(&tmp, &obj4_id)?);
1535        assert!(test_path_exists_in_repo(&tmp, "streams/test-stream1")?);
1536        let link_target =
1537            read_links_in_repo(&tmp, "streams/test-stream1")?.expect("link is not broken");
1538        assert!(test_path_exists_in_repo(
1539            &tmp,
1540            PathBuf::from("streams").join(&link_target)
1541        )?);
1542        assert!(!test_path_exists_in_repo(&tmp, "streams/test-stream2")?);
1543
1544        // Verify GcResult: 3 objects removed (obj1, obj4, stream2's splitstream), 1 stream pruned
1545        assert_eq!(result.objects_removed, 3);
1546        assert!(result.objects_bytes > 0);
1547        assert_eq!(result.streams_pruned, 1);
1548        assert_eq!(result.images_pruned, 0);
1549        Ok(())
1550    }
1551
1552    #[test]
1553    fn test_gc_keeps_named_references() -> Result<()> {
1554        let tmp = tempdir();
1555        let repo = create_test_repo(&tmp.path().join("repo"))?;
1556
1557        let obj1 = generate_test_data(32 * 1024, 0xAE);
1558        let obj2 = generate_test_data(64 * 1024, 0xEA);
1559
1560        let obj1_id = repo.ensure_object(&obj1)?;
1561        let obj2_id: Sha512HashValue = compute_verity(&obj2);
1562
1563        let mut writer1 = repo.create_stream(0);
1564        writer1.write_external(&obj2)?;
1565        let stream1_id = repo.write_stream(writer1, "test-stream1", None)?;
1566
1567        let mut writer2 = repo.create_stream(0);
1568        writer2.add_named_stream_ref("test-stream1", &stream1_id);
1569        let _stream2_id = repo.write_stream(writer2, "test-stream2", None)?;
1570
1571        repo.sync()?;
1572
1573        assert!(test_object_exists(&tmp, &obj1_id)?);
1574        assert!(test_object_exists(&tmp, &obj2_id)?);
1575        assert!(test_path_exists_in_repo(&tmp, "streams/test-stream1")?);
1576        let link_target =
1577            read_links_in_repo(&tmp, "streams/test-stream1")?.expect("link is not broken");
1578        assert!(test_path_exists_in_repo(
1579            &tmp,
1580            PathBuf::from("streams").join(&link_target)
1581        )?);
1582        assert!(test_path_exists_in_repo(&tmp, "streams/test-stream2")?);
1583        let link_target =
1584            read_links_in_repo(&tmp, "streams/test-stream2")?.expect("link is not broken");
1585        assert!(test_path_exists_in_repo(
1586            &tmp,
1587            PathBuf::from("streams").join(&link_target)
1588        )?);
1589
1590        // Now perform gc - stream2 refs stream1, both kept, only obj1 removed
1591        let result = repo.gc(&["test-stream2"])?;
1592
1593        assert!(!test_object_exists(&tmp, &obj1_id)?);
1594        assert!(test_object_exists(&tmp, &obj2_id)?);
1595        assert!(test_path_exists_in_repo(&tmp, "streams/test-stream1")?);
1596        let link_target =
1597            read_links_in_repo(&tmp, "streams/test-stream1")?.expect("link is not broken");
1598        assert!(test_path_exists_in_repo(
1599            &tmp,
1600            PathBuf::from("streams").join(&link_target)
1601        )?);
1602        assert!(test_path_exists_in_repo(&tmp, "streams/test-stream2")?);
1603        let link_target =
1604            read_links_in_repo(&tmp, "streams/test-stream2")?.expect("link is not broken");
1605        assert!(test_path_exists_in_repo(
1606            &tmp,
1607            PathBuf::from("streams").join(&link_target)
1608        )?);
1609
1610        // Verify GcResult: 1 object removed, no symlinks pruned
1611        assert_eq!(result.objects_removed, 1);
1612        assert!(result.objects_bytes > 0);
1613        assert_eq!(result.streams_pruned, 0);
1614        assert_eq!(result.images_pruned, 0);
1615        Ok(())
1616    }
1617
1618    #[test]
1619    fn test_gc_keeps_named_references_with_different_table_name() -> Result<()> {
1620        let tmp = tempdir();
1621        let repo = create_test_repo(&tmp.path().join("repo"))?;
1622
1623        let obj1 = generate_test_data(32 * 1024, 0xAE);
1624        let obj2 = generate_test_data(64 * 1024, 0xEA);
1625
1626        let obj1_id = repo.ensure_object(&obj1)?;
1627        let obj2_id: Sha512HashValue = compute_verity(&obj2);
1628
1629        let mut writer1 = repo.create_stream(0);
1630        writer1.write_external(&obj2)?;
1631        let stream1_id = repo.write_stream(writer1, "test-stream1", None)?;
1632
1633        let mut writer2 = repo.create_stream(0);
1634        writer2.add_named_stream_ref("different-table-name-for-test-stream1", &stream1_id);
1635        let _stream2_id = repo.write_stream(writer2, "test-stream2", None)?;
1636
1637        repo.sync()?;
1638
1639        assert!(test_object_exists(&tmp, &obj1_id)?);
1640        assert!(test_object_exists(&tmp, &obj2_id)?);
1641        assert!(test_path_exists_in_repo(&tmp, "streams/test-stream1")?);
1642        let link_target =
1643            read_links_in_repo(&tmp, "streams/test-stream1")?.expect("link is not broken");
1644        assert!(test_path_exists_in_repo(
1645            &tmp,
1646            PathBuf::from("streams").join(&link_target)
1647        )?);
1648        assert!(test_path_exists_in_repo(&tmp, "streams/test-stream2")?);
1649        let link_target =
1650            read_links_in_repo(&tmp, "streams/test-stream2")?.expect("link is not broken");
1651        assert!(test_path_exists_in_repo(
1652            &tmp,
1653            PathBuf::from("streams").join(&link_target)
1654        )?);
1655
1656        // Now perform gc - different table name, but same object ID links them
1657        let result = repo.gc(&["test-stream2"])?;
1658
1659        assert!(!test_object_exists(&tmp, &obj1_id)?);
1660        assert!(test_object_exists(&tmp, &obj2_id)?);
1661        assert!(test_path_exists_in_repo(&tmp, "streams/test-stream1")?);
1662        let link_target =
1663            read_links_in_repo(&tmp, "streams/test-stream1")?.expect("link is not broken");
1664        assert!(test_path_exists_in_repo(
1665            &tmp,
1666            PathBuf::from("streams").join(&link_target)
1667        )?);
1668        assert!(test_path_exists_in_repo(&tmp, "streams/test-stream2")?);
1669        let link_target =
1670            read_links_in_repo(&tmp, "streams/test-stream2")?.expect("link is not broken");
1671        assert!(test_path_exists_in_repo(
1672            &tmp,
1673            PathBuf::from("streams").join(&link_target)
1674        )?);
1675
1676        // Verify GcResult: 1 object removed, no symlinks pruned
1677        assert_eq!(result.objects_removed, 1);
1678        assert!(result.objects_bytes > 0);
1679        assert_eq!(result.streams_pruned, 0);
1680        assert_eq!(result.images_pruned, 0);
1681        Ok(())
1682    }
1683
1684    #[test]
1685    fn test_gc_keeps_one_named_reference_from_two_overlapped() -> Result<()> {
1686        let tmp = tempdir();
1687        let repo = create_test_repo(&tmp.path().join("repo"))?;
1688
1689        let obj1 = generate_test_data(32 * 1024, 0xAE);
1690        let obj2 = generate_test_data(64 * 1024, 0xEA);
1691        let obj3 = generate_test_data(64 * 1024, 0xAA);
1692        let obj4 = generate_test_data(64 * 1024, 0xEE);
1693
1694        let obj1_id = repo.ensure_object(&obj1)?;
1695        let obj2_id: Sha512HashValue = compute_verity(&obj2);
1696        let obj3_id: Sha512HashValue = compute_verity(&obj3);
1697        let obj4_id: Sha512HashValue = compute_verity(&obj4);
1698
1699        let mut writer = repo.create_stream(0);
1700        writer.write_external(&obj2)?;
1701        let stream1_id = repo.write_stream(writer, "test-stream1", None)?;
1702
1703        let mut writer = repo.create_stream(0);
1704        writer.write_external(&obj3)?;
1705        let stream2_id = repo.write_stream(writer, "test-stream2", None)?;
1706
1707        let mut writer = repo.create_stream(0);
1708        writer.write_external(&obj4)?;
1709        let stream3_id = repo.write_stream(writer, "test-stream3", None)?;
1710
1711        let mut writer = repo.create_stream(0);
1712        writer.add_named_stream_ref("test-stream1", &stream1_id);
1713        writer.add_named_stream_ref("test-stream2", &stream2_id);
1714        let _ref_stream1_id = repo.write_stream(writer, "ref-stream1", None)?;
1715
1716        let mut writer = repo.create_stream(0);
1717        writer.add_named_stream_ref("test-stream1", &stream1_id);
1718        writer.add_named_stream_ref("test-stream3", &stream3_id);
1719        let _ref_stream2_id = repo.write_stream(writer, "ref-stream2", None)?;
1720
1721        repo.sync()?;
1722
1723        assert!(test_object_exists(&tmp, &obj1_id)?);
1724        assert!(test_object_exists(&tmp, &obj2_id)?);
1725        assert!(test_object_exists(&tmp, &obj3_id)?);
1726        assert!(test_object_exists(&tmp, &obj4_id)?);
1727        assert!(test_path_exists_in_repo(&tmp, "streams/test-stream1")?);
1728        let link_target =
1729            read_links_in_repo(&tmp, "streams/test-stream1")?.expect("link is not broken");
1730        assert!(test_path_exists_in_repo(
1731            &tmp,
1732            PathBuf::from("streams").join(&link_target)
1733        )?);
1734        assert!(test_path_exists_in_repo(&tmp, "streams/test-stream2")?);
1735        let link_target =
1736            read_links_in_repo(&tmp, "streams/test-stream2")?.expect("link is not broken");
1737        assert!(test_path_exists_in_repo(
1738            &tmp,
1739            PathBuf::from("streams").join(&link_target)
1740        )?);
1741        assert!(test_path_exists_in_repo(&tmp, "streams/test-stream3")?);
1742        let link_target =
1743            read_links_in_repo(&tmp, "streams/test-stream3")?.expect("link is not broken");
1744        assert!(test_path_exists_in_repo(
1745            &tmp,
1746            PathBuf::from("streams").join(&link_target)
1747        )?);
1748        assert!(test_path_exists_in_repo(&tmp, "streams/ref-stream1")?);
1749        let link_target =
1750            read_links_in_repo(&tmp, "streams/ref-stream1")?.expect("link is not broken");
1751        assert!(test_path_exists_in_repo(
1752            &tmp,
1753            PathBuf::from("streams").join(&link_target)
1754        )?);
1755        assert!(test_path_exists_in_repo(&tmp, "streams/ref-stream2")?);
1756        let link_target =
1757            read_links_in_repo(&tmp, "streams/ref-stream2")?.expect("link is not broken");
1758        assert!(test_path_exists_in_repo(
1759            &tmp,
1760            PathBuf::from("streams").join(&link_target)
1761        )?);
1762
1763        // Now perform gc - ref-stream1 refs stream1+stream2, so keep those and their objects
1764        let result = repo.gc(&["ref-stream1"])?;
1765
1766        assert!(!test_object_exists(&tmp, &obj1_id)?);
1767        assert!(test_object_exists(&tmp, &obj2_id)?);
1768        assert!(test_object_exists(&tmp, &obj3_id)?);
1769        assert!(!test_object_exists(&tmp, &obj4_id)?);
1770        assert!(test_path_exists_in_repo(&tmp, "streams/test-stream1")?);
1771        let link_target =
1772            read_links_in_repo(&tmp, "streams/test-stream1")?.expect("link is not broken");
1773        assert!(test_path_exists_in_repo(
1774            &tmp,
1775            PathBuf::from("streams").join(&link_target)
1776        )?);
1777        assert!(test_path_exists_in_repo(&tmp, "streams/test-stream2")?);
1778        let link_target =
1779            read_links_in_repo(&tmp, "streams/test-stream2")?.expect("link is not broken");
1780        assert!(test_path_exists_in_repo(
1781            &tmp,
1782            PathBuf::from("streams").join(&link_target)
1783        )?);
1784        assert!(!test_path_exists_in_repo(&tmp, "streams/test-stream3")?);
1785        assert!(test_path_exists_in_repo(&tmp, "streams/ref-stream1")?);
1786        let link_target =
1787            read_links_in_repo(&tmp, "streams/ref-stream1")?.expect("link is not broken");
1788        assert!(test_path_exists_in_repo(
1789            &tmp,
1790            PathBuf::from("streams").join(&link_target)
1791        )?);
1792        assert!(!test_path_exists_in_repo(&tmp, "streams/ref-stream2")?);
1793
1794        // Verify GcResult: objects removed include obj1, obj4, plus splitstreams for stream3 and ref-stream2
1795        assert_eq!(result.objects_removed, 4);
1796        assert!(result.objects_bytes > 0);
1797        assert_eq!(result.streams_pruned, 2);
1798        assert_eq!(result.images_pruned, 0);
1799
1800        Ok(())
1801    }
1802
1803    use crate::tree::{FileSystem, Inode, Leaf, LeafContent, RegularFile, Stat};
1804
1805    /// Create a default root stat for test filesystems
1806    fn test_root_stat() -> Stat {
1807        Stat {
1808            st_mode: 0o755,
1809            st_uid: 0,
1810            st_gid: 0,
1811            st_mtim_sec: 0,
1812            xattrs: Default::default(),
1813        }
1814    }
1815
1816    /// Make a test in-memory filesystem that only contains one externally referenced object
1817    fn make_test_fs(obj: &Sha512HashValue, size: u64) -> FileSystem<Sha512HashValue> {
1818        let mut fs: FileSystem<Sha512HashValue> = FileSystem::new(test_root_stat());
1819        let inode = Inode::Leaf(std::rc::Rc::new(Leaf {
1820            stat: Stat {
1821                st_mode: 0o644,
1822                st_uid: 0,
1823                st_gid: 0,
1824                st_mtim_sec: 0,
1825                xattrs: Default::default(),
1826            },
1827            content: LeafContent::Regular(RegularFile::External(obj.clone(), size)),
1828        }));
1829        fs.root.insert(OsStr::new("data"), inode);
1830        fs
1831    }
1832
1833    #[test]
1834    fn test_gc_removes_one_image() -> Result<()> {
1835        let tmp = tempdir();
1836        let repo = create_test_repo(&tmp.path().join("repo"))?;
1837
1838        let obj1_size: u64 = 32 * 1024;
1839        let obj1 = generate_test_data(obj1_size, 0xAE);
1840        let obj2_size: u64 = 64 * 1024;
1841        let obj2 = generate_test_data(obj2_size, 0xEA);
1842
1843        let obj1_id = repo.ensure_object(&obj1)?;
1844        let obj2_id = repo.ensure_object(&obj2)?;
1845
1846        let fs = make_test_fs(&obj2_id, obj2_size);
1847        let image1 = fs.commit_image(&repo, None)?;
1848        let image1_path = format!("images/{}", image1.to_hex());
1849
1850        repo.sync()?;
1851
1852        assert!(test_object_exists(&tmp, &obj1_id)?);
1853        assert!(test_object_exists(&tmp, &obj2_id)?);
1854        assert!(test_path_exists_in_repo(&tmp, &image1_path)?);
1855        let link_target = read_links_in_repo(&tmp, &image1_path)?.expect("link is not broken");
1856        assert!(test_path_exists_in_repo(
1857            &tmp,
1858            PathBuf::from("images").join(&link_target)
1859        )?);
1860
1861        // Now perform gc - no refs, so image and both objects removed
1862        let result = repo.gc(&[])?;
1863
1864        assert!(!test_object_exists(&tmp, &obj1_id)?);
1865        assert!(!test_object_exists(&tmp, &obj2_id)?);
1866        assert!(!test_path_exists_in_repo(&tmp, &image1_path)?);
1867
1868        // Verify GcResult: 3 objects removed (obj1, obj2, image erofs), 1 image pruned
1869        assert_eq!(result.objects_removed, 3);
1870        assert!(result.objects_bytes > 0);
1871        assert_eq!(result.images_pruned, 1);
1872        assert_eq!(result.streams_pruned, 0);
1873        Ok(())
1874    }
1875
1876    #[test]
1877    fn test_gc_keeps_one_image() -> Result<()> {
1878        let tmp = tempdir();
1879        let repo = create_test_repo(&tmp.path().join("repo"))?;
1880
1881        let obj1_size: u64 = 32 * 1024;
1882        let obj1 = generate_test_data(obj1_size, 0xAE);
1883        let obj2_size: u64 = 64 * 1024;
1884        let obj2 = generate_test_data(obj2_size, 0xEA);
1885
1886        let obj1_id = repo.ensure_object(&obj1)?;
1887        let obj2_id = repo.ensure_object(&obj2)?;
1888
1889        let fs = make_test_fs(&obj2_id, obj2_size);
1890        let image1 = fs.commit_image(&repo, None)?;
1891        let image1_path = format!("images/{}", image1.to_hex());
1892
1893        repo.sync()?;
1894
1895        assert!(test_object_exists(&tmp, &obj1_id)?);
1896        assert!(test_object_exists(&tmp, &obj2_id)?);
1897        assert!(test_path_exists_in_repo(&tmp, &image1_path)?);
1898        let link_target = read_links_in_repo(&tmp, &image1_path)?.expect("link is not broken");
1899        assert!(test_path_exists_in_repo(
1900            &tmp,
1901            PathBuf::from("images").join(&link_target)
1902        )?);
1903
1904        // Now perform gc - keep image via additional_roots
1905        let image1_hex = image1.to_hex();
1906        let result = repo.gc(&[image1_hex.as_str()])?;
1907
1908        assert!(!test_object_exists(&tmp, &obj1_id)?);
1909        assert!(test_object_exists(&tmp, &obj2_id)?);
1910        assert!(test_path_exists_in_repo(&tmp, &image1_path)?);
1911        let link_target = read_links_in_repo(&tmp, &image1_path)?.expect("link is not broken");
1912        assert!(test_path_exists_in_repo(
1913            &tmp,
1914            PathBuf::from("images").join(&link_target)
1915        )?);
1916
1917        // Verify GcResult: 1 object removed (obj1), no symlinks pruned
1918        assert_eq!(result.objects_removed, 1);
1919        assert!(result.objects_bytes > 0);
1920        assert_eq!(result.images_pruned, 0);
1921        assert_eq!(result.streams_pruned, 0);
1922        Ok(())
1923    }
1924
1925    #[test]
1926    fn test_gc_keeps_one_image_from_refs() -> Result<()> {
1927        let tmp = tempdir();
1928        let repo = create_test_repo(&tmp.path().join("repo"))?;
1929
1930        let obj1_size: u64 = 32 * 1024;
1931        let obj1 = generate_test_data(obj1_size, 0xAE);
1932        let obj2_size: u64 = 64 * 1024;
1933        let obj2 = generate_test_data(obj2_size, 0xEA);
1934
1935        let obj1_id = repo.ensure_object(&obj1)?;
1936        let obj2_id = repo.ensure_object(&obj2)?;
1937
1938        let fs = make_test_fs(&obj2_id, obj2_size);
1939        let image1 = fs.commit_image(&repo, Some("ref-name"))?;
1940        let image1_path = format!("images/{}", image1.to_hex());
1941
1942        repo.sync()?;
1943
1944        assert!(test_object_exists(&tmp, &obj1_id)?);
1945        assert!(test_object_exists(&tmp, &obj2_id)?);
1946        assert!(test_path_exists_in_repo(&tmp, &image1_path)?);
1947        let link_target = read_links_in_repo(&tmp, &image1_path)?.expect("link is not broken");
1948        assert!(test_path_exists_in_repo(
1949            &tmp,
1950            PathBuf::from("images").join(&link_target)
1951        )?);
1952
1953        // Now perform gc - image kept via ref, only obj1 removed
1954        let result = repo.gc(&[])?;
1955
1956        assert!(!test_object_exists(&tmp, &obj1_id)?);
1957        assert!(test_object_exists(&tmp, &obj2_id)?);
1958        assert!(test_path_exists_in_repo(&tmp, &image1_path)?);
1959        let link_target = read_links_in_repo(&tmp, &image1_path)?.expect("link is not broken");
1960        assert!(test_path_exists_in_repo(
1961            &tmp,
1962            PathBuf::from("images").join(&link_target)
1963        )?);
1964
1965        // Verify GcResult: 1 object removed, no symlinks pruned (image has ref)
1966        assert_eq!(result.objects_removed, 1);
1967        assert!(result.objects_bytes > 0);
1968        assert_eq!(result.images_pruned, 0);
1969        assert_eq!(result.streams_pruned, 0);
1970        Ok(())
1971    }
1972
1973    fn make_test_fs_with_two_files(
1974        obj1: &Sha512HashValue,
1975        size1: u64,
1976        obj2: &Sha512HashValue,
1977        size2: u64,
1978    ) -> FileSystem<Sha512HashValue> {
1979        let mut fs = make_test_fs(obj1, size1);
1980        let inode = Inode::Leaf(std::rc::Rc::new(Leaf {
1981            stat: Stat {
1982                st_mode: 0o644,
1983                st_uid: 0,
1984                st_gid: 0,
1985                st_mtim_sec: 0,
1986                xattrs: Default::default(),
1987            },
1988            content: LeafContent::Regular(RegularFile::External(obj2.clone(), size2)),
1989        }));
1990        fs.root.insert(OsStr::new("extra_data"), inode);
1991        fs
1992    }
1993
1994    #[test]
1995    fn test_gc_keeps_one_image_from_two_overlapped() -> Result<()> {
1996        let tmp = tempdir();
1997        let repo = create_test_repo(&tmp.path().join("repo"))?;
1998
1999        let obj1_size: u64 = 32 * 1024;
2000        let obj1 = generate_test_data(obj1_size, 0xAE);
2001        let obj2_size: u64 = 64 * 1024;
2002        let obj2 = generate_test_data(obj2_size, 0xEA);
2003        let obj3_size: u64 = 64 * 1024;
2004        let obj3 = generate_test_data(obj2_size, 0xAA);
2005        let obj4_size: u64 = 64 * 1024;
2006        let obj4 = generate_test_data(obj2_size, 0xEE);
2007
2008        let obj1_id = repo.ensure_object(&obj1)?;
2009        let obj2_id = repo.ensure_object(&obj2)?;
2010        let obj3_id = repo.ensure_object(&obj3)?;
2011        let obj4_id = repo.ensure_object(&obj4)?;
2012
2013        let fs = make_test_fs_with_two_files(&obj2_id, obj2_size, &obj3_id, obj3_size);
2014        let image1 = fs.commit_image(&repo, None)?;
2015        let image1_path = format!("images/{}", image1.to_hex());
2016
2017        let fs = make_test_fs_with_two_files(&obj2_id, obj2_size, &obj4_id, obj4_size);
2018        let image2 = fs.commit_image(&repo, None)?;
2019        let image2_path = format!("images/{}", image2.to_hex());
2020
2021        repo.sync()?;
2022
2023        assert!(test_object_exists(&tmp, &obj1_id)?);
2024        assert!(test_object_exists(&tmp, &obj2_id)?);
2025        assert!(test_object_exists(&tmp, &obj3_id)?);
2026        assert!(test_object_exists(&tmp, &obj4_id)?);
2027        assert!(test_path_exists_in_repo(&tmp, &image1_path)?);
2028        let link_target = read_links_in_repo(&tmp, &image1_path)?.expect("link is not broken");
2029        assert!(test_path_exists_in_repo(
2030            &tmp,
2031            PathBuf::from("images").join(&link_target)
2032        )?);
2033        assert!(test_path_exists_in_repo(&tmp, &image2_path)?);
2034        let link_target = read_links_in_repo(&tmp, &image2_path)?.expect("link is not broken");
2035        assert!(test_path_exists_in_repo(
2036            &tmp,
2037            PathBuf::from("images").join(&link_target)
2038        )?);
2039
2040        // Now perform gc - keep image1, remove image2 and its unique objects
2041        let image1_hex = image1.to_hex();
2042        let result = repo.gc(&[image1_hex.as_str()])?;
2043
2044        assert!(!test_object_exists(&tmp, &obj1_id)?);
2045        assert!(test_object_exists(&tmp, &obj2_id)?);
2046        assert!(test_object_exists(&tmp, &obj3_id)?);
2047        assert!(!test_object_exists(&tmp, &obj4_id)?);
2048        assert!(test_path_exists_in_repo(&tmp, &image1_path)?);
2049        let link_target = read_links_in_repo(&tmp, &image1_path)?.expect("link is not broken");
2050        assert!(test_path_exists_in_repo(
2051            &tmp,
2052            PathBuf::from("images").join(&link_target)
2053        )?);
2054        assert!(!test_path_exists_in_repo(&tmp, &image2_path)?);
2055
2056        // Verify GcResult: 3 objects removed (obj1, obj4, image2 erofs), 1 image pruned
2057        assert_eq!(result.objects_removed, 3);
2058        assert!(result.objects_bytes > 0);
2059        assert_eq!(result.images_pruned, 1);
2060        assert_eq!(result.streams_pruned, 0);
2061        Ok(())
2062    }
2063}