1use std::{
81 collections::{HashMap, HashSet},
82 ffi::{CStr, CString, OsStr, OsString},
83 fs::{canonicalize, File},
84 io::{Read, Write},
85 os::{
86 fd::{AsFd, OwnedFd},
87 unix::ffi::OsStrExt,
88 },
89 path::{Path, PathBuf},
90 sync::Arc,
91 thread::available_parallelism,
92};
93
94use log::{debug, trace};
95use tokio::sync::Semaphore;
96
97use anyhow::{bail, ensure, Context, Result};
98use once_cell::sync::OnceCell;
99use rustix::{
100 fs::{
101 flock, linkat, mkdirat, open, openat, readlinkat, statat, syncfs, unlinkat, AtFlags, Dir,
102 FileType, FlockOperation, Mode, OFlags, CWD,
103 },
104 io::{Errno, Result as ErrnoResult},
105};
106
107use crate::{
108 fsverity::{
109 compute_verity, enable_verity_maybe_copy, ensure_verity_equal, measure_verity,
110 CompareVerityError, EnableVerityError, FsVerityHashValue, FsVerityHasher,
111 MeasureVerityError,
112 },
113 mount::{composefs_fsmount, mount_at},
114 splitstream::{SplitStreamReader, SplitStreamWriter},
115 util::{proc_self_fd, replace_symlinkat, ErrnoFilter},
116};
117
118fn ensure_dir_and_openat(dirfd: impl AsFd, filename: &str, flags: OFlags) -> ErrnoResult<OwnedFd> {
123 match openat(
124 &dirfd,
125 filename,
126 flags | OFlags::CLOEXEC | OFlags::DIRECTORY,
127 0o666.into(),
128 ) {
129 Ok(file) => Ok(file),
130 Err(Errno::NOENT) => match mkdirat(&dirfd, filename, 0o777.into()) {
131 Ok(()) | Err(Errno::EXIST) => openat(
132 dirfd,
133 filename,
134 flags | OFlags::CLOEXEC | OFlags::DIRECTORY,
135 0o666.into(),
136 ),
137 Err(other) => Err(other),
138 },
139 Err(other) => Err(other),
140 }
141}
142
143pub struct Repository<ObjectID: FsVerityHashValue> {
150 repository: OwnedFd,
151 objects: OnceCell<OwnedFd>,
152 write_semaphore: OnceCell<Arc<Semaphore>>,
153 insecure: bool,
154 _data: std::marker::PhantomData<ObjectID>,
155}
156
157impl<ObjectID: FsVerityHashValue> std::fmt::Debug for Repository<ObjectID> {
158 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
159 f.debug_struct("Repository")
160 .field("repository", &self.repository)
161 .field("objects", &self.objects)
162 .field("insecure", &self.insecure)
163 .finish_non_exhaustive()
164 }
165}
166
167impl<ObjectID: FsVerityHashValue> Drop for Repository<ObjectID> {
168 fn drop(&mut self) {
169 flock(&self.repository, FlockOperation::Unlock).expect("repository unlock failed");
170 }
171}
172
/// Selects which entries of a category directory (`images/` or `streams/`) are collected
/// by `gc_category`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum GCCategoryWalkMode {
    /// Only entries reachable through symlinks under `<category>/refs/`.
    RefsOnly,
    /// Every top-level symlink of the category directory (the `refs` entry excluded).
    AllEntries,
}
178
/// Summary of a garbage-collection pass: what was removed, or, for a dry run, what was
/// counted without being removed.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct GcResult {
    /// Number of unreachable entries found under `objects/`.
    pub objects_removed: u64,
    /// Sum of their sizes in bytes (entries whose `stat` fails contribute nothing).
    pub objects_bytes: u64,
    /// Broken symlinks found under `images/` and `images/refs/`.
    pub images_pruned: u64,
    /// Broken symlinks found under `streams/` and `streams/refs/`.
    pub streams_pruned: u64,
}
193
194impl<ObjectID: FsVerityHashValue> Repository<ObjectID> {
195 pub fn objects_dir(&self) -> ErrnoResult<&OwnedFd> {
197 self.objects
198 .get_or_try_init(|| ensure_dir_and_openat(&self.repository, "objects", OFlags::PATH))
199 }
200
201 pub fn write_semaphore(&self) -> Arc<Semaphore> {
207 self.write_semaphore
208 .get_or_init(|| {
209 let max_concurrent = available_parallelism().map(|n| n.get()).unwrap_or(4);
210 Arc::new(Semaphore::new(max_concurrent))
211 })
212 .clone()
213 }
214
215 pub fn open_path(dirfd: impl AsFd, path: impl AsRef<Path>) -> Result<Self> {
217 let path = path.as_ref();
218
219 let repository = openat(dirfd, path, OFlags::RDONLY | OFlags::CLOEXEC, Mode::empty())
221 .with_context(|| format!("Cannot open composefs repository at {}", path.display()))?;
222
223 flock(&repository, FlockOperation::LockShared)
224 .context("Cannot lock composefs repository")?;
225
226 Ok(Self {
227 repository,
228 objects: OnceCell::new(),
229 write_semaphore: OnceCell::new(),
230 insecure: false,
231 _data: std::marker::PhantomData,
232 })
233 }
234
235 pub fn open_user() -> Result<Self> {
237 let home = std::env::var("HOME").with_context(|| "$HOME must be set when in user mode")?;
238
239 Self::open_path(CWD, PathBuf::from(home).join(".var/lib/composefs"))
240 }
241
242 pub fn open_system() -> Result<Self> {
244 Self::open_path(CWD, PathBuf::from("/sysroot/composefs".to_string()))
245 }
246
247 fn ensure_dir(&self, dir: impl AsRef<Path>) -> ErrnoResult<()> {
248 mkdirat(&self.repository, dir.as_ref(), 0o755.into()).or_else(|e| match e {
249 Errno::EXIST => Ok(()),
250 _ => Err(e),
251 })
252 }
253
254 pub async fn ensure_object_async(self: &Arc<Self>, data: Vec<u8>) -> Result<ObjectID> {
262 let self_ = Arc::clone(self);
263 tokio::task::spawn_blocking(move || self_.ensure_object(&data)).await?
264 }
265
266 pub fn create_object_tmpfile(&self) -> Result<OwnedFd> {
272 let objects_dir = self.objects_dir()?;
273 let fd = openat(
274 objects_dir,
275 ".",
276 OFlags::RDWR | OFlags::TMPFILE | OFlags::CLOEXEC,
277 Mode::from_raw_mode(0o644),
278 )?;
279 Ok(fd)
280 }
281
282 pub fn spawn_finalize_object_tmpfile(
293 self: &Arc<Self>,
294 tmpfile_fd: OwnedFd,
295 size: u64,
296 ) -> tokio::task::JoinHandle<Result<ObjectID>> {
297 let self_ = Arc::clone(self);
298 tokio::task::spawn_blocking(move || self_.finalize_object_tmpfile(tmpfile_fd.into(), size))
299 }
300
301 pub fn finalize_object_tmpfile(&self, file: File, size: u64) -> Result<ObjectID> {
316 let fd_path = proc_self_fd(&file);
318 let ro_fd = open(&*fd_path, OFlags::RDONLY | OFlags::CLOEXEC, Mode::empty())?;
319
320 drop(file);
322
323 let objects_dir = self.objects_dir()?;
325
326 let (ro_fd, verity_enabled) =
330 match enable_verity_maybe_copy::<ObjectID>(objects_dir, ro_fd.as_fd()) {
331 Ok(None) => (ro_fd, true),
332 Ok(Some(new_fd)) => (new_fd, true),
333 Err(EnableVerityError::FilesystemNotSupported) if self.insecure => (ro_fd, false),
334 Err(EnableVerityError::AlreadyEnabled) => (ro_fd, true),
335 Err(other) => return Err(other).context("Enabling verity on tmpfile")?,
336 };
337
338 let id: ObjectID = if verity_enabled {
340 measure_verity(&ro_fd).context("Measuring verity digest")?
341 } else {
342 let mut reader = std::io::BufReader::new(File::from(ro_fd.try_clone()?));
344 Self::compute_verity_digest(&mut reader)?
345 };
346
347 let path = id.to_object_pathname();
349
350 match statat(objects_dir, &path, AtFlags::empty()) {
351 Ok(stat) if stat.st_size as u64 == size => {
352 return Ok(id);
354 }
355 _ => {}
356 }
357
358 let parent_dir = id.to_object_dir();
360 let _ = mkdirat(objects_dir, &parent_dir, Mode::from_raw_mode(0o755));
361
362 match linkat(
364 CWD,
365 proc_self_fd(&ro_fd),
366 objects_dir,
367 &path,
368 AtFlags::SYMLINK_FOLLOW,
369 ) {
370 Ok(()) => Ok(id),
371 Err(Errno::EXIST) => Ok(id), Err(e) => Err(e).context("Linking tmpfile into objects directory")?,
373 }
374 }
375
376 fn compute_verity_digest(reader: &mut impl std::io::BufRead) -> Result<ObjectID> {
379 let mut hasher = FsVerityHasher::<ObjectID>::new();
380
381 loop {
382 let buf = reader.fill_buf()?;
383 if buf.is_empty() {
384 break;
385 }
386 let chunk_size = buf.len().min(FsVerityHasher::<ObjectID>::BLOCK_SIZE);
388 hasher.add_block(&buf[..chunk_size]);
389 reader.consume(chunk_size);
390 }
391
392 Ok(hasher.digest())
393 }
394
395 fn store_object_with_id(&self, data: &[u8], id: &ObjectID) -> Result<()> {
400 let dirfd = self.objects_dir()?;
401 let path = id.to_object_pathname();
402
403 match openat(
405 dirfd,
406 &path,
407 OFlags::RDONLY | OFlags::CLOEXEC,
408 Mode::empty(),
409 ) {
410 Ok(fd) => {
411 match ensure_verity_equal(&fd, id) {
414 Ok(()) => {}
415 Err(CompareVerityError::Measure(MeasureVerityError::VerityMissing))
416 if self.insecure =>
417 {
418 match enable_verity_maybe_copy::<ObjectID>(dirfd, fd.as_fd()) {
419 Ok(Some(fd)) => ensure_verity_equal(&fd, id)?,
420 Ok(None) => ensure_verity_equal(&fd, id)?,
421 Err(other) => Err(other)?,
422 }
423 }
424 Err(CompareVerityError::Measure(
425 MeasureVerityError::FilesystemNotSupported,
426 )) if self.insecure => {}
427 Err(other) => Err(other)?,
428 }
429 return Ok(());
430 }
431 Err(Errno::NOENT) => {
432 }
434 Err(other) => {
435 return Err(other).context("Checking for existing object in repository")?;
436 }
437 }
438
439 let fd = ensure_dir_and_openat(dirfd, &id.to_object_dir(), OFlags::RDWR | OFlags::TMPFILE)?;
440 let mut file = File::from(fd);
441 file.write_all(data)?;
442 let ro_fd = open(
444 proc_self_fd(&file),
445 OFlags::RDONLY | OFlags::CLOEXEC,
446 Mode::empty(),
447 )?;
448 drop(file);
452
453 let ro_fd = match enable_verity_maybe_copy::<ObjectID>(dirfd, ro_fd.as_fd()) {
454 Ok(maybe_fd) => {
455 let ro_fd = maybe_fd.unwrap_or(ro_fd);
456 match ensure_verity_equal(&ro_fd, id) {
457 Ok(()) => ro_fd,
458 Err(CompareVerityError::Measure(
459 MeasureVerityError::VerityMissing
460 | MeasureVerityError::FilesystemNotSupported,
461 )) if self.insecure => ro_fd,
462 Err(other) => Err(other).context("Double-checking verity digest")?,
463 }
464 }
465 Err(EnableVerityError::FilesystemNotSupported) if self.insecure => ro_fd,
466 Err(other) => Err(other).context("Enabling verity digest")?,
467 };
468
469 match linkat(
470 CWD,
471 proc_self_fd(&ro_fd),
472 dirfd,
473 path,
474 AtFlags::SYMLINK_FOLLOW,
475 ) {
476 Ok(()) => {}
477 Err(Errno::EXIST) => {
478 }
480 Err(other) => {
481 return Err(other).context("Linking created object file");
482 }
483 }
484
485 Ok(())
486 }
487
488 pub fn ensure_object(&self, data: &[u8]) -> Result<ObjectID> {
493 let id: ObjectID = compute_verity(data);
494 self.store_object_with_id(data, &id)?;
495 Ok(id)
496 }
497
498 fn open_with_verity(&self, filename: &str, expected_verity: &ObjectID) -> Result<OwnedFd> {
499 let fd = self.openat(filename, OFlags::RDONLY)?;
500 match ensure_verity_equal(&fd, expected_verity) {
501 Ok(()) => {}
502 Err(CompareVerityError::Measure(
503 MeasureVerityError::VerityMissing | MeasureVerityError::FilesystemNotSupported,
504 )) if self.insecure => {}
505 Err(other) => Err(other)?,
506 }
507 Ok(fd)
508 }
509
510 pub fn set_insecure(&mut self, insecure: bool) -> &mut Self {
515 self.insecure = insecure;
516 self
517 }
518
519 pub fn create_stream(self: &Arc<Self>, content_type: u64) -> SplitStreamWriter<ObjectID> {
523 SplitStreamWriter::new(self, content_type)
524 }
525
526 fn format_object_path(id: &ObjectID) -> String {
527 format!("objects/{}", id.to_object_pathname())
528 }
529
530 fn format_stream_path(content_identifier: &str) -> String {
531 format!("streams/{content_identifier}")
532 }
533
534 pub fn has_stream(&self, content_identifier: &str) -> Result<Option<ObjectID>> {
537 let stream_path = Self::format_stream_path(content_identifier);
538
539 match readlinkat(&self.repository, &stream_path, []) {
540 Ok(target) => {
541 let bytes = target.as_bytes();
542 ensure!(
543 bytes.starts_with(b"../"),
544 "stream symlink has incorrect prefix"
545 );
546 Ok(Some(ObjectID::from_object_pathname(bytes)?))
547 }
548 Err(Errno::NOENT) => Ok(None),
549 Err(err) => Err(err)?,
550 }
551 }
552
553 pub fn write_stream(
563 &self,
564 writer: SplitStreamWriter<ObjectID>,
565 content_identifier: &str,
566 reference: Option<&str>,
567 ) -> Result<ObjectID> {
568 let object_id = writer.done()?;
569
570 self.sync()?;
581
582 let stream_path = Self::format_stream_path(content_identifier);
583 let object_path = Self::format_object_path(&object_id);
584 self.symlink(&stream_path, &object_path)?;
585
586 if let Some(name) = reference {
587 let reference_path = format!("streams/refs/{name}");
588 self.symlink(&reference_path, &stream_path)?;
589 }
590
591 Ok(object_id)
592 }
593
594 pub async fn register_stream(
603 self: &Arc<Self>,
604 object_id: &ObjectID,
605 content_identifier: &str,
606 reference: Option<&str>,
607 ) -> Result<()> {
608 self.sync_async().await?;
609
610 let stream_path = Self::format_stream_path(content_identifier);
611 let object_path = Self::format_object_path(object_id);
612 self.symlink(&stream_path, &object_path)?;
613
614 if let Some(name) = reference {
615 let reference_path = format!("streams/refs/{name}");
616 self.symlink(&reference_path, &stream_path)?;
617 }
618
619 Ok(())
620 }
621
622 pub async fn write_stream_async(
628 self: &Arc<Self>,
629 writer: SplitStreamWriter<ObjectID>,
630 content_identifier: &str,
631 reference: Option<&str>,
632 ) -> Result<ObjectID> {
633 let object_id = writer.done_async().await?;
634
635 self.sync_async().await?;
636
637 let stream_path = Self::format_stream_path(content_identifier);
638 let object_path = Self::format_object_path(&object_id);
639 self.symlink(&stream_path, &object_path)?;
640
641 if let Some(name) = reference {
642 let reference_path = format!("streams/refs/{name}");
643 self.symlink(&reference_path, &stream_path)?;
644 }
645
646 Ok(object_id)
647 }
648
649 pub fn has_named_stream(&self, name: &str) -> Result<bool> {
651 let stream_path = format!("streams/refs/{name}");
652
653 Ok(statat(&self.repository, &stream_path, AtFlags::empty())
654 .filter_errno(Errno::NOENT)
655 .context("Looking for stream {name} in repository")?
656 .map(|s| FileType::from_raw_mode(s.st_mode).is_symlink())
657 .unwrap_or(false))
658 }
659
660 pub fn name_stream(&self, content_identifier: &str, name: &str) -> Result<()> {
672 let stream_path = Self::format_stream_path(content_identifier);
673 let reference_path = format!("streams/refs/{name}");
674 self.symlink(&reference_path, &stream_path)?;
675 Ok(())
676 }
677
678 pub fn ensure_stream(
693 self: &Arc<Self>,
694 content_identifier: &str,
695 content_type: u64,
696 callback: impl FnOnce(&mut SplitStreamWriter<ObjectID>) -> Result<()>,
697 reference: Option<&str>,
698 ) -> Result<ObjectID> {
699 let stream_path = Self::format_stream_path(content_identifier);
700
701 let object_id = match self.has_stream(content_identifier)? {
702 Some(id) => id,
703 None => {
704 let mut writer = self.create_stream(content_type);
705 callback(&mut writer)?;
706 self.write_stream(writer, content_identifier, reference)?
707 }
708 };
709
710 if let Some(name) = reference {
711 let reference_path = format!("streams/refs/{name}");
712 self.symlink(&reference_path, &stream_path)?;
713 }
714
715 Ok(object_id)
716 }
717
718 pub fn open_stream(
720 &self,
721 content_identifier: &str,
722 verity: Option<&ObjectID>,
723 expected_content_type: Option<u64>,
724 ) -> Result<SplitStreamReader<ObjectID>> {
725 let file = File::from(if let Some(verity_hash) = verity {
726 self.open_object(verity_hash)
727 .with_context(|| format!("Opening object '{verity_hash:?}'"))?
728 } else {
729 let filename = Self::format_stream_path(content_identifier);
730 self.openat(&filename, OFlags::RDONLY)
731 .with_context(|| format!("Opening ref '{filename}'"))?
732 });
733
734 SplitStreamReader::new(file, expected_content_type)
735 }
736
737 pub fn open_object(&self, id: &ObjectID) -> Result<OwnedFd> {
740 self.open_with_verity(&Self::format_object_path(id), id)
741 }
742
743 pub fn read_object(&self, id: &ObjectID) -> Result<Vec<u8>> {
745 let mut data = vec![];
746 File::from(self.open_object(id)?).read_to_end(&mut data)?;
747 Ok(data)
748 }
749
750 pub fn merge_splitstream(
756 &self,
757 content_identifier: &str,
758 verity: Option<&ObjectID>,
759 expected_content_type: Option<u64>,
760 output: &mut impl Write,
761 ) -> Result<()> {
762 let mut split_stream =
763 self.open_stream(content_identifier, verity, expected_content_type)?;
764 split_stream.cat(self, output)
765 }
766
767 pub fn write_image(&self, name: Option<&str>, data: &[u8]) -> Result<ObjectID> {
775 let object_id = self.ensure_object(data)?;
776
777 let object_path = Self::format_object_path(&object_id);
778 let image_path = format!("images/{}", object_id.to_hex());
779
780 self.symlink(&image_path, &object_path)?;
781
782 if let Some(reference) = name {
783 let ref_path = format!("images/refs/{reference}");
784 self.symlink(&ref_path, &image_path)?;
785 }
786
787 Ok(object_id)
788 }
789
790 pub fn import_image<R: Read>(&self, name: &str, image: &mut R) -> Result<ObjectID> {
798 let mut data = vec![];
799 image.read_to_end(&mut data)?;
800 self.write_image(Some(name), &data)
801 }
802
803 fn open_image(&self, name: &str) -> Result<(OwnedFd, bool)> {
806 let image = self
807 .openat(&format!("images/{name}"), OFlags::RDONLY)
808 .with_context(|| format!("Opening ref 'images/{name}'"))?;
809
810 if name.contains("/") {
811 return Ok((image, true));
812 }
813
814 match measure_verity::<ObjectID>(&image) {
816 Ok(found) if found == FsVerityHashValue::from_hex(name)? => Ok((image, true)),
817 Ok(_) => bail!("fs-verity content mismatch"),
818 Err(MeasureVerityError::VerityMissing | MeasureVerityError::FilesystemNotSupported)
819 if self.insecure =>
820 {
821 Ok((image, false))
822 }
823 Err(other) => Err(other)?,
824 }
825 }
826
827 pub fn mount(&self, name: &str) -> Result<OwnedFd> {
830 let (image, enable_verity) = self.open_image(name)?;
831 Ok(composefs_fsmount(
832 image,
833 name,
834 self.objects_dir()?,
835 enable_verity,
836 )?)
837 }
838
839 pub fn mount_at(&self, name: &str, mountpoint: impl AsRef<Path>) -> Result<()> {
841 Ok(mount_at(
842 self.mount(name)?,
843 CWD,
844 &canonicalize(mountpoint)?,
845 )?)
846 }
847
848 pub fn symlink(&self, name: impl AsRef<Path>, target: impl AsRef<Path>) -> ErrnoResult<()> {
854 let name = name.as_ref();
855
856 let mut symlink_components = name.parent().unwrap().components().peekable();
857 let mut target_components = target.as_ref().components().peekable();
858
859 let mut symlink_ancestor = PathBuf::new();
860
861 while symlink_components.peek() == target_components.peek() {
863 symlink_ancestor.push(symlink_components.next().unwrap());
864 target_components.next().unwrap();
865 }
866
867 let mut relative = PathBuf::new();
868 for symlink_component in symlink_components {
871 symlink_ancestor.push(symlink_component);
872 self.ensure_dir(&symlink_ancestor)?;
873 relative.push("..");
874 }
875
876 for target_component in target_components {
878 relative.push(target_component);
879 }
880
881 replace_symlinkat(&relative, &self.repository, name)
883 }
884
885 fn read_symlink_hashvalue(dirfd: &OwnedFd, name: &CStr) -> Result<ObjectID> {
886 let link_content = readlinkat(dirfd, name, [])?;
887 Ok(ObjectID::from_object_pathname(link_content.to_bytes())?)
888 }
889
890 fn walk_symlinkdir(fd: OwnedFd, entry_digests: &mut HashSet<OsString>) -> Result<()> {
891 for item in Dir::read_from(&fd)? {
892 let entry = item?;
893 match entry.file_type() {
896 FileType::Directory => {
897 let filename = entry.file_name();
898 if filename != c"." && filename != c".." {
899 let dirfd = openat(
900 &fd,
901 filename,
902 OFlags::RDONLY | OFlags::CLOEXEC,
903 Mode::empty(),
904 )?;
905 Self::walk_symlinkdir(dirfd, entry_digests)?;
906 }
907 }
908 FileType::Symlink => {
909 let link_content = readlinkat(&fd, entry.file_name(), [])?;
910 let linked_path = Path::new(OsStr::from_bytes(link_content.as_bytes()));
911 if let Some(entry_name) = linked_path.file_name() {
912 entry_digests.insert(entry_name.to_os_string());
913 } else {
914 continue;
917 }
918 }
919 _ => {
920 bail!("Unexpected file type encountered");
921 }
922 }
923 }
924
925 Ok(())
926 }
927
928 fn openat(&self, name: &str, flags: OFlags) -> ErrnoResult<OwnedFd> {
930 openat(
932 &self.repository,
933 name,
934 flags | OFlags::CLOEXEC,
935 Mode::empty(),
936 )
937 }
938
939 fn gc_category(
948 &self,
949 category: &str,
950 mode: GCCategoryWalkMode,
951 ) -> Result<Vec<(ObjectID, String)>> {
952 let Some(category_fd) = self
953 .openat(category, OFlags::RDONLY | OFlags::DIRECTORY)
954 .filter_errno(Errno::NOENT)
955 .context(format!("Opening {category} dir in repository"))?
956 else {
957 return Ok(Vec::new());
958 };
959
960 let mut entry_digests = HashSet::new();
961 match mode {
962 GCCategoryWalkMode::RefsOnly => {
963 if let Some(refs) = openat(
964 &category_fd,
965 "refs",
966 OFlags::RDONLY | OFlags::DIRECTORY | OFlags::CLOEXEC,
967 Mode::empty(),
968 )
969 .filter_errno(Errno::NOENT)
970 .context(format!("Opening {category}/refs dir in repository"))?
971 {
972 Self::walk_symlinkdir(refs, &mut entry_digests)?;
973 }
974 }
975 GCCategoryWalkMode::AllEntries => {
976 for item in Dir::read_from(&category_fd)? {
978 let entry = item?;
979 let filename = entry.file_name();
980 if filename != c"refs" && filename != c"." && filename != c".." {
981 if entry.file_type() != FileType::Symlink {
982 bail!("category directory contains non-symlink");
983 }
984 entry_digests.insert(OsString::from(&OsStr::from_bytes(
985 entry.file_name().to_bytes(),
986 )));
987 }
988 }
989 }
990 }
991
992 let objects = entry_digests
993 .into_iter()
994 .map(|entry_fn| {
995 Ok((
996 Self::read_symlink_hashvalue(
997 &category_fd,
998 CString::new(entry_fn.as_bytes())?.as_c_str(),
999 )?,
1000 entry_fn
1001 .to_str()
1002 .context("str conversion fails")?
1003 .to_owned(),
1004 ))
1005 })
1006 .collect::<Result<_>>()?;
1007
1008 Ok(objects)
1009 }
1010
1011 fn cleanup_broken_links(fd: &OwnedFd, recursive: bool, dry_run: bool) -> Result<u64> {
1015 let mut count = 0;
1016 for item in Dir::read_from(fd)? {
1017 let entry = item?;
1018 match entry.file_type() {
1019 FileType::Directory => {
1020 if !recursive {
1021 continue;
1022 }
1023 let filename = entry.file_name();
1024 if filename != c"." && filename != c".." {
1025 let dirfd = openat(
1026 fd,
1027 filename,
1028 OFlags::RDONLY | OFlags::CLOEXEC,
1029 Mode::empty(),
1030 )?;
1031 count += Self::cleanup_broken_links(&dirfd, recursive, dry_run)?;
1032 }
1033 }
1034
1035 FileType::Symlink => {
1036 let filename = entry.file_name();
1037 let result = statat(fd, filename, AtFlags::empty())
1038 .filter_errno(Errno::NOENT)
1039 .context("Testing for broken links")?;
1040 if result.is_none() {
1041 count += 1;
1042 if !dry_run {
1043 unlinkat(fd, filename, AtFlags::empty())
1044 .context("Unlinking broken links")?;
1045 }
1046 }
1047 }
1048
1049 _ => {
1050 bail!("Unexpected file type encountered");
1051 }
1052 }
1053 }
1054 Ok(count)
1055 }
1056
1057 fn cleanup_gc_category(&self, category: &'static str, dry_run: bool) -> Result<u64> {
1059 let Some(category_fd) = self
1060 .openat(category, OFlags::RDONLY | OFlags::DIRECTORY)
1061 .filter_errno(Errno::NOENT)
1062 .context(format!("Opening {category} dir in repository"))?
1063 else {
1064 return Ok(0);
1065 };
1066 let mut count = Self::cleanup_broken_links(&category_fd, false, dry_run)
1068 .context(format!("Cleaning up broken links in {category}/"))?;
1069 let ref_fd = openat(
1070 &category_fd,
1071 "refs",
1072 OFlags::RDONLY | OFlags::DIRECTORY | OFlags::CLOEXEC,
1073 Mode::empty(),
1074 )
1075 .filter_errno(Errno::NOENT)
1076 .context(format!("Opening {category}/refs to clean up broken links"))?;
1077 if let Some(ref dirfd) = ref_fd {
1078 count += Self::cleanup_broken_links(dirfd, true, dry_run).context(format!(
1079 "Cleaning up broken links recursively in {category}/refs"
1080 ))?;
1081 }
1082 Ok(count)
1083 }
1084
1085 fn walk_streams(
1087 &self,
1088 stream_name_map: &HashMap<ObjectID, String>,
1089 stream_name: &str,
1090 walked_streams: &mut HashSet<String>,
1091 objects: &mut HashSet<ObjectID>,
1092 ) -> Result<()> {
1093 if walked_streams.contains(stream_name) {
1094 return Ok(());
1095 }
1096 walked_streams.insert(stream_name.to_owned());
1097
1098 let mut split_stream = self.open_stream(stream_name, None, None)?;
1099 split_stream.get_object_refs(|id| {
1101 debug!(" with {id:?}");
1102 objects.insert(id.clone());
1103 })?;
1104 let streams_to_walk: Vec<_> = split_stream.iter_named_refs().collect();
1106 for (stream_name_in_table, stream_object_id) in streams_to_walk {
1110 debug!(
1111 " named reference stream {stream_name_in_table} lives, with {stream_object_id:?}"
1112 );
1113 objects.insert(stream_object_id.clone());
1114 if let Some(stream_name_in_repo) = stream_name_map.get(stream_object_id) {
1115 self.walk_streams(
1116 stream_name_map,
1117 stream_name_in_repo,
1118 walked_streams,
1119 objects,
1120 )?;
1121 } else {
1122 trace!("broken repo: named reference stream {stream_name_in_table} not found as stream in repo");
1124 }
1125 }
1126 Ok(())
1127 }
1128
1129 pub fn objects_for_image(&self, name: &str) -> Result<HashSet<ObjectID>> {
1131 let (image, _) = self.open_image(name)?;
1132 let mut data = vec![];
1133 std::fs::File::from(image).read_to_end(&mut data)?;
1134 Ok(crate::erofs::reader::collect_objects(&data)?)
1135 }
1136
1137 pub fn sync(&self) -> Result<()> {
1142 syncfs(&self.repository)?;
1143 Ok(())
1144 }
1145
1146 pub async fn sync_async(self: &Arc<Self>) -> Result<()> {
1151 let self_ = Arc::clone(self);
1152 tokio::task::spawn_blocking(move || self_.sync()).await?
1153 }
1154
1155 pub fn gc(&self, additional_roots: &[&str]) -> Result<GcResult> {
1165 flock(&self.repository, FlockOperation::LockExclusive)?;
1166 self.gc_impl(additional_roots, false)
1167 }
1168
1169 pub fn gc_dry_run(&self, additional_roots: &[&str]) -> Result<GcResult> {
1179 flock(&self.repository, FlockOperation::LockShared)?;
1181 self.gc_impl(additional_roots, true)
1182 }
1183
1184 fn gc_impl(&self, additional_roots: &[&str], dry_run: bool) -> Result<GcResult> {
1186 let mut result = GcResult::default();
1187 let mut live_objects = HashSet::new();
1188
1189 let extra_roots: HashSet<_> = additional_roots.iter().map(|s| s.to_string()).collect();
1191
1192 let all_images = self.gc_category("images", GCCategoryWalkMode::AllEntries)?;
1194 let root_images: Vec<_> = self
1195 .gc_category("images", GCCategoryWalkMode::RefsOnly)?
1196 .into_iter()
1197 .chain(
1198 all_images
1199 .into_iter()
1200 .filter(|(_, name)| extra_roots.contains(name)),
1201 )
1202 .collect();
1203
1204 for ref image in root_images {
1205 debug!("{image:?} lives as an image");
1206 live_objects.insert(image.0.clone());
1207 self.objects_for_image(&image.1)?.iter().for_each(|id| {
1208 debug!(" with {id:?}");
1209 live_objects.insert(id.clone());
1210 });
1211 }
1212
1213 let all_streams = self.gc_category("streams", GCCategoryWalkMode::AllEntries)?;
1215 let stream_name_map: HashMap<_, _> = all_streams.iter().cloned().collect();
1216 let root_streams: Vec<_> = self
1217 .gc_category("streams", GCCategoryWalkMode::RefsOnly)?
1218 .into_iter()
1219 .chain(
1220 all_streams
1221 .into_iter()
1222 .filter(|(_, name)| extra_roots.contains(name)),
1223 )
1224 .collect();
1225
1226 let mut walked_streams = HashSet::new();
1227 for stream in root_streams {
1228 debug!("{stream:?} lives as a stream");
1229 live_objects.insert(stream.0.clone());
1230 self.walk_streams(
1231 &stream_name_map,
1232 &stream.1,
1233 &mut walked_streams,
1234 &mut live_objects,
1235 )?;
1236 }
1237
1238 for first_byte in 0x0..=0xff {
1240 let dirfd = match self.openat(
1241 &format!("objects/{first_byte:02x}"),
1242 OFlags::RDONLY | OFlags::DIRECTORY,
1243 ) {
1244 Ok(fd) => fd,
1245 Err(Errno::NOENT) => continue,
1246 Err(e) => Err(e)?,
1247 };
1248 for item in Dir::read_from(&dirfd)? {
1249 let entry = item?;
1250 let filename = entry.file_name();
1251 if filename != c"." && filename != c".." {
1252 let id =
1253 ObjectID::from_object_dir_and_basename(first_byte, filename.to_bytes())?;
1254 if !live_objects.contains(&id) {
1255 if let Ok(stat) = statat(&dirfd, filename, AtFlags::empty()) {
1257 result.objects_bytes += stat.st_size as u64;
1258 }
1259 result.objects_removed += 1;
1260
1261 if !dry_run {
1262 debug!("removing: objects/{first_byte:02x}/{filename:?}");
1263 unlinkat(&dirfd, filename, AtFlags::empty())?;
1264 }
1265 } else {
1266 trace!("objects/{first_byte:02x}/{filename:?} lives");
1267 }
1268 }
1269 }
1270 }
1271
1272 result.images_pruned = self.cleanup_gc_category("images", dry_run)?;
1274 result.streams_pruned = self.cleanup_gc_category("streams", dry_run)?;
1275
1276 if !dry_run {
1278 flock(&self.repository, FlockOperation::LockShared)?;
1279 }
1280 Ok(result)
1281 }
1282
1283 }
1287
1288#[cfg(test)]
1289mod tests {
1290 use std::vec;
1291
1292 use super::*;
1293 use crate::fsverity::Sha512HashValue;
1294 use crate::test::tempdir;
1295 use rustix::fs::{statat, CWD};
1296 use tempfile::TempDir;
1297
1298 fn create_test_repo(path: &Path) -> Result<Arc<Repository<Sha512HashValue>>> {
1300 mkdirat(CWD, path, Mode::from_raw_mode(0o755))?;
1301 let mut repo = Repository::open_path(CWD, path)?;
1302 repo.set_insecure(true);
1303 Ok(Arc::new(repo))
1304 }
1305
1306 fn generate_test_data(size: u64, seed: u8) -> Vec<u8> {
1308 (0..size)
1309 .map(|i| ((i as u8).wrapping_add(seed)).wrapping_mul(17))
1310 .collect()
1311 }
1312
1313 fn read_links_in_repo<P>(tmp: &TempDir, repo_sub_path: P) -> Result<Option<PathBuf>>
1314 where
1315 P: AsRef<Path>,
1316 {
1317 let full_path = tmp.path().join("repo").join(repo_sub_path);
1318 match readlinkat(CWD, &full_path, Vec::new()) {
1319 Ok(result) => Ok(Some(PathBuf::from(result.to_str()?))),
1320 Err(rustix::io::Errno::NOENT) => Ok(None),
1321 Err(e) => Err(e.into()),
1322 }
1323 }
1324
1325 fn test_path_exists_in_repo<P>(tmp: &TempDir, repo_sub_path: P) -> Result<bool>
1327 where
1328 P: AsRef<Path>,
1329 {
1330 let full_path = tmp.path().join("repo").join(repo_sub_path);
1331 match statat(CWD, &full_path, AtFlags::SYMLINK_NOFOLLOW) {
1332 Ok(_) => Ok(true),
1333 Err(rustix::io::Errno::NOENT) => Ok(false),
1334 Err(e) => Err(e.into()),
1335 }
1336 }
1337
1338 fn test_object_exists(tmp: &TempDir, obj: &Sha512HashValue) -> Result<bool> {
1339 let digest = obj.to_hex();
1340 let (first_two, remainder) = digest.split_at(2);
1341 test_path_exists_in_repo(tmp, &format!("objects/{first_two}/{remainder}"))
1342 }
1343
1344 #[test]
1345 fn test_gc_removes_one_stream() -> Result<()> {
1346 let tmp = tempdir();
1347 let repo = create_test_repo(&tmp.path().join("repo"))?;
1348
1349 let obj1 = generate_test_data(32 * 1024, 0xAE);
1350 let obj2 = generate_test_data(64 * 1024, 0xEA);
1351
1352 let obj1_id = repo.ensure_object(&obj1)?;
1353 let obj2_id: Sha512HashValue = compute_verity(&obj2);
1354
1355 let mut writer = repo.create_stream(0);
1356 writer.write_external(&obj2)?;
1357 let _stream_id = repo.write_stream(writer, "test-stream", None)?;
1358
1359 repo.sync()?;
1360
1361 assert!(test_object_exists(&tmp, &obj1_id)?);
1362 assert!(test_object_exists(&tmp, &obj2_id)?);
1363 assert!(test_path_exists_in_repo(&tmp, "streams/test-stream")?);
1364 let link_target =
1365 read_links_in_repo(&tmp, "streams/test-stream")?.expect("link is not broken");
1366 assert!(test_path_exists_in_repo(
1367 &tmp,
1368 PathBuf::from("streams").join(&link_target)
1369 )?);
1370
1371 let result = repo.gc(&[])?;
1373
1374 assert!(!test_object_exists(&tmp, &obj1_id)?);
1375 assert!(!test_object_exists(&tmp, &obj2_id)?);
1376 assert!(!test_path_exists_in_repo(&tmp, "streams/test-stream")?);
1377
1378 assert_eq!(result.objects_removed, 3);
1380 assert!(result.objects_bytes > 0);
1381 assert_eq!(result.streams_pruned, 1);
1382 assert_eq!(result.images_pruned, 0);
1383 Ok(())
1384 }
1385
1386 #[test]
1387 fn test_gc_keeps_one_stream() -> Result<()> {
1388 let tmp = tempdir();
1389 let repo = create_test_repo(&tmp.path().join("repo"))?;
1390
1391 let obj1 = generate_test_data(32 * 1024, 0xAE);
1392 let obj2 = generate_test_data(64 * 1024, 0xEA);
1393
1394 let obj1_id = repo.ensure_object(&obj1)?;
1395 let obj2_id: Sha512HashValue = compute_verity(&obj2);
1396
1397 let mut writer = repo.create_stream(0);
1398 writer.write_external(&obj2)?;
1399 let _stream_id = repo.write_stream(writer, "test-stream", None)?;
1400
1401 repo.sync()?;
1402
1403 assert!(test_object_exists(&tmp, &obj1_id)?);
1404 assert!(test_object_exists(&tmp, &obj2_id)?);
1405 assert!(test_path_exists_in_repo(&tmp, "streams/test-stream")?);
1406 let link_target =
1407 read_links_in_repo(&tmp, "streams/test-stream")?.expect("link is not broken");
1408 assert!(test_path_exists_in_repo(
1409 &tmp,
1410 PathBuf::from("streams").join(&link_target)
1411 )?);
1412
1413 let result = repo.gc(&["test-stream"])?;
1415
1416 assert!(!test_object_exists(&tmp, &obj1_id)?);
1417 assert!(test_object_exists(&tmp, &obj2_id)?);
1418 assert!(test_path_exists_in_repo(&tmp, "streams/test-stream")?);
1419 let link_target =
1420 read_links_in_repo(&tmp, "streams/test-stream")?.expect("link is not broken");
1421 assert!(test_path_exists_in_repo(
1422 &tmp,
1423 PathBuf::from("streams").join(&link_target)
1424 )?);
1425
1426 assert_eq!(result.objects_removed, 1);
1428 assert!(result.objects_bytes > 0);
1429 assert_eq!(result.streams_pruned, 0);
1430 assert_eq!(result.images_pruned, 0);
1431 Ok(())
1432 }
1433
1434 #[test]
1435 fn test_gc_keeps_one_stream_from_refs() -> Result<()> {
1436 let tmp = tempdir();
1437 let repo = create_test_repo(&tmp.path().join("repo"))?;
1438
1439 let obj1 = generate_test_data(32 * 1024, 0xAE);
1440 let obj2 = generate_test_data(64 * 1024, 0xEA);
1441
1442 let obj1_id = repo.ensure_object(&obj1)?;
1443 let obj2_id: Sha512HashValue = compute_verity(&obj2);
1444
1445 let mut writer = repo.create_stream(0);
1446 writer.write_external(&obj2)?;
1447 let _stream_id = repo.write_stream(writer, "test-stream", Some("ref-name"))?;
1448
1449 repo.sync()?;
1450
1451 assert!(test_object_exists(&tmp, &obj1_id)?);
1452 assert!(test_object_exists(&tmp, &obj2_id)?);
1453 assert!(test_path_exists_in_repo(&tmp, "streams/test-stream")?);
1454 let link_target =
1455 read_links_in_repo(&tmp, "streams/test-stream")?.expect("link is not broken");
1456 assert!(test_path_exists_in_repo(
1457 &tmp,
1458 PathBuf::from("streams").join(&link_target)
1459 )?);
1460
1461 let result = repo.gc(&[])?;
1463
1464 assert!(!test_object_exists(&tmp, &obj1_id)?);
1465 assert!(test_object_exists(&tmp, &obj2_id)?);
1466 assert!(test_path_exists_in_repo(&tmp, "streams/test-stream")?);
1467 let link_target =
1468 read_links_in_repo(&tmp, "streams/test-stream")?.expect("link is not broken");
1469 assert!(test_path_exists_in_repo(
1470 &tmp,
1471 PathBuf::from("streams").join(&link_target)
1472 )?);
1473
1474 assert_eq!(result.objects_removed, 1);
1476 assert!(result.objects_bytes > 0);
1477 assert_eq!(result.streams_pruned, 0);
1478 assert_eq!(result.images_pruned, 0);
1479 Ok(())
1480 }
1481
1482 #[test]
1483 fn test_gc_keeps_one_stream_from_two_overlapped() -> Result<()> {
1484 let tmp = tempdir();
1485 let repo = create_test_repo(&tmp.path().join("repo"))?;
1486
1487 let obj1 = generate_test_data(32 * 1024, 0xAE);
1488 let obj2 = generate_test_data(64 * 1024, 0xEA);
1489 let obj3 = generate_test_data(64 * 1024, 0xAA);
1490 let obj4 = generate_test_data(64 * 1024, 0xEE);
1491
1492 let obj1_id = repo.ensure_object(&obj1)?;
1493 let obj2_id: Sha512HashValue = compute_verity(&obj2);
1494 let obj3_id: Sha512HashValue = compute_verity(&obj3);
1495 let obj4_id: Sha512HashValue = compute_verity(&obj4);
1496
1497 let mut writer1 = repo.create_stream(0);
1498 writer1.write_external(&obj2)?;
1499 writer1.write_external(&obj3)?;
1500 let _stream1_id = repo.write_stream(writer1, "test-stream1", None)?;
1501
1502 let mut writer2 = repo.create_stream(0);
1503 writer2.write_external(&obj2)?;
1504 writer2.write_external(&obj4)?;
1505 let _stream2_id = repo.write_stream(writer2, "test-stream2", None)?;
1506
1507 repo.sync()?;
1508
1509 assert!(test_object_exists(&tmp, &obj1_id)?);
1510 assert!(test_object_exists(&tmp, &obj2_id)?);
1511 assert!(test_object_exists(&tmp, &obj3_id)?);
1512 assert!(test_object_exists(&tmp, &obj4_id)?);
1513 assert!(test_path_exists_in_repo(&tmp, "streams/test-stream1")?);
1514 let link_target =
1515 read_links_in_repo(&tmp, "streams/test-stream1")?.expect("link is not broken");
1516 assert!(test_path_exists_in_repo(
1517 &tmp,
1518 PathBuf::from("streams").join(&link_target)
1519 )?);
1520 assert!(test_path_exists_in_repo(&tmp, "streams/test-stream2")?);
1521 let link_target =
1522 read_links_in_repo(&tmp, "streams/test-stream2")?.expect("link is not broken");
1523 assert!(test_path_exists_in_repo(
1524 &tmp,
1525 PathBuf::from("streams").join(&link_target)
1526 )?);
1527
1528 let result = repo.gc(&["test-stream1"])?;
1530
1531 assert!(!test_object_exists(&tmp, &obj1_id)?);
1532 assert!(test_object_exists(&tmp, &obj2_id)?);
1533 assert!(test_object_exists(&tmp, &obj3_id)?);
1534 assert!(!test_object_exists(&tmp, &obj4_id)?);
1535 assert!(test_path_exists_in_repo(&tmp, "streams/test-stream1")?);
1536 let link_target =
1537 read_links_in_repo(&tmp, "streams/test-stream1")?.expect("link is not broken");
1538 assert!(test_path_exists_in_repo(
1539 &tmp,
1540 PathBuf::from("streams").join(&link_target)
1541 )?);
1542 assert!(!test_path_exists_in_repo(&tmp, "streams/test-stream2")?);
1543
1544 assert_eq!(result.objects_removed, 3);
1546 assert!(result.objects_bytes > 0);
1547 assert_eq!(result.streams_pruned, 1);
1548 assert_eq!(result.images_pruned, 0);
1549 Ok(())
1550 }
1551
1552 #[test]
1553 fn test_gc_keeps_named_references() -> Result<()> {
1554 let tmp = tempdir();
1555 let repo = create_test_repo(&tmp.path().join("repo"))?;
1556
1557 let obj1 = generate_test_data(32 * 1024, 0xAE);
1558 let obj2 = generate_test_data(64 * 1024, 0xEA);
1559
1560 let obj1_id = repo.ensure_object(&obj1)?;
1561 let obj2_id: Sha512HashValue = compute_verity(&obj2);
1562
1563 let mut writer1 = repo.create_stream(0);
1564 writer1.write_external(&obj2)?;
1565 let stream1_id = repo.write_stream(writer1, "test-stream1", None)?;
1566
1567 let mut writer2 = repo.create_stream(0);
1568 writer2.add_named_stream_ref("test-stream1", &stream1_id);
1569 let _stream2_id = repo.write_stream(writer2, "test-stream2", None)?;
1570
1571 repo.sync()?;
1572
1573 assert!(test_object_exists(&tmp, &obj1_id)?);
1574 assert!(test_object_exists(&tmp, &obj2_id)?);
1575 assert!(test_path_exists_in_repo(&tmp, "streams/test-stream1")?);
1576 let link_target =
1577 read_links_in_repo(&tmp, "streams/test-stream1")?.expect("link is not broken");
1578 assert!(test_path_exists_in_repo(
1579 &tmp,
1580 PathBuf::from("streams").join(&link_target)
1581 )?);
1582 assert!(test_path_exists_in_repo(&tmp, "streams/test-stream2")?);
1583 let link_target =
1584 read_links_in_repo(&tmp, "streams/test-stream2")?.expect("link is not broken");
1585 assert!(test_path_exists_in_repo(
1586 &tmp,
1587 PathBuf::from("streams").join(&link_target)
1588 )?);
1589
1590 let result = repo.gc(&["test-stream2"])?;
1592
1593 assert!(!test_object_exists(&tmp, &obj1_id)?);
1594 assert!(test_object_exists(&tmp, &obj2_id)?);
1595 assert!(test_path_exists_in_repo(&tmp, "streams/test-stream1")?);
1596 let link_target =
1597 read_links_in_repo(&tmp, "streams/test-stream1")?.expect("link is not broken");
1598 assert!(test_path_exists_in_repo(
1599 &tmp,
1600 PathBuf::from("streams").join(&link_target)
1601 )?);
1602 assert!(test_path_exists_in_repo(&tmp, "streams/test-stream2")?);
1603 let link_target =
1604 read_links_in_repo(&tmp, "streams/test-stream2")?.expect("link is not broken");
1605 assert!(test_path_exists_in_repo(
1606 &tmp,
1607 PathBuf::from("streams").join(&link_target)
1608 )?);
1609
1610 assert_eq!(result.objects_removed, 1);
1612 assert!(result.objects_bytes > 0);
1613 assert_eq!(result.streams_pruned, 0);
1614 assert_eq!(result.images_pruned, 0);
1615 Ok(())
1616 }
1617
1618 #[test]
1619 fn test_gc_keeps_named_references_with_different_table_name() -> Result<()> {
1620 let tmp = tempdir();
1621 let repo = create_test_repo(&tmp.path().join("repo"))?;
1622
1623 let obj1 = generate_test_data(32 * 1024, 0xAE);
1624 let obj2 = generate_test_data(64 * 1024, 0xEA);
1625
1626 let obj1_id = repo.ensure_object(&obj1)?;
1627 let obj2_id: Sha512HashValue = compute_verity(&obj2);
1628
1629 let mut writer1 = repo.create_stream(0);
1630 writer1.write_external(&obj2)?;
1631 let stream1_id = repo.write_stream(writer1, "test-stream1", None)?;
1632
1633 let mut writer2 = repo.create_stream(0);
1634 writer2.add_named_stream_ref("different-table-name-for-test-stream1", &stream1_id);
1635 let _stream2_id = repo.write_stream(writer2, "test-stream2", None)?;
1636
1637 repo.sync()?;
1638
1639 assert!(test_object_exists(&tmp, &obj1_id)?);
1640 assert!(test_object_exists(&tmp, &obj2_id)?);
1641 assert!(test_path_exists_in_repo(&tmp, "streams/test-stream1")?);
1642 let link_target =
1643 read_links_in_repo(&tmp, "streams/test-stream1")?.expect("link is not broken");
1644 assert!(test_path_exists_in_repo(
1645 &tmp,
1646 PathBuf::from("streams").join(&link_target)
1647 )?);
1648 assert!(test_path_exists_in_repo(&tmp, "streams/test-stream2")?);
1649 let link_target =
1650 read_links_in_repo(&tmp, "streams/test-stream2")?.expect("link is not broken");
1651 assert!(test_path_exists_in_repo(
1652 &tmp,
1653 PathBuf::from("streams").join(&link_target)
1654 )?);
1655
1656 let result = repo.gc(&["test-stream2"])?;
1658
1659 assert!(!test_object_exists(&tmp, &obj1_id)?);
1660 assert!(test_object_exists(&tmp, &obj2_id)?);
1661 assert!(test_path_exists_in_repo(&tmp, "streams/test-stream1")?);
1662 let link_target =
1663 read_links_in_repo(&tmp, "streams/test-stream1")?.expect("link is not broken");
1664 assert!(test_path_exists_in_repo(
1665 &tmp,
1666 PathBuf::from("streams").join(&link_target)
1667 )?);
1668 assert!(test_path_exists_in_repo(&tmp, "streams/test-stream2")?);
1669 let link_target =
1670 read_links_in_repo(&tmp, "streams/test-stream2")?.expect("link is not broken");
1671 assert!(test_path_exists_in_repo(
1672 &tmp,
1673 PathBuf::from("streams").join(&link_target)
1674 )?);
1675
1676 assert_eq!(result.objects_removed, 1);
1678 assert!(result.objects_bytes > 0);
1679 assert_eq!(result.streams_pruned, 0);
1680 assert_eq!(result.images_pruned, 0);
1681 Ok(())
1682 }
1683
1684 #[test]
1685 fn test_gc_keeps_one_named_reference_from_two_overlapped() -> Result<()> {
1686 let tmp = tempdir();
1687 let repo = create_test_repo(&tmp.path().join("repo"))?;
1688
1689 let obj1 = generate_test_data(32 * 1024, 0xAE);
1690 let obj2 = generate_test_data(64 * 1024, 0xEA);
1691 let obj3 = generate_test_data(64 * 1024, 0xAA);
1692 let obj4 = generate_test_data(64 * 1024, 0xEE);
1693
1694 let obj1_id = repo.ensure_object(&obj1)?;
1695 let obj2_id: Sha512HashValue = compute_verity(&obj2);
1696 let obj3_id: Sha512HashValue = compute_verity(&obj3);
1697 let obj4_id: Sha512HashValue = compute_verity(&obj4);
1698
1699 let mut writer = repo.create_stream(0);
1700 writer.write_external(&obj2)?;
1701 let stream1_id = repo.write_stream(writer, "test-stream1", None)?;
1702
1703 let mut writer = repo.create_stream(0);
1704 writer.write_external(&obj3)?;
1705 let stream2_id = repo.write_stream(writer, "test-stream2", None)?;
1706
1707 let mut writer = repo.create_stream(0);
1708 writer.write_external(&obj4)?;
1709 let stream3_id = repo.write_stream(writer, "test-stream3", None)?;
1710
1711 let mut writer = repo.create_stream(0);
1712 writer.add_named_stream_ref("test-stream1", &stream1_id);
1713 writer.add_named_stream_ref("test-stream2", &stream2_id);
1714 let _ref_stream1_id = repo.write_stream(writer, "ref-stream1", None)?;
1715
1716 let mut writer = repo.create_stream(0);
1717 writer.add_named_stream_ref("test-stream1", &stream1_id);
1718 writer.add_named_stream_ref("test-stream3", &stream3_id);
1719 let _ref_stream2_id = repo.write_stream(writer, "ref-stream2", None)?;
1720
1721 repo.sync()?;
1722
1723 assert!(test_object_exists(&tmp, &obj1_id)?);
1724 assert!(test_object_exists(&tmp, &obj2_id)?);
1725 assert!(test_object_exists(&tmp, &obj3_id)?);
1726 assert!(test_object_exists(&tmp, &obj4_id)?);
1727 assert!(test_path_exists_in_repo(&tmp, "streams/test-stream1")?);
1728 let link_target =
1729 read_links_in_repo(&tmp, "streams/test-stream1")?.expect("link is not broken");
1730 assert!(test_path_exists_in_repo(
1731 &tmp,
1732 PathBuf::from("streams").join(&link_target)
1733 )?);
1734 assert!(test_path_exists_in_repo(&tmp, "streams/test-stream2")?);
1735 let link_target =
1736 read_links_in_repo(&tmp, "streams/test-stream2")?.expect("link is not broken");
1737 assert!(test_path_exists_in_repo(
1738 &tmp,
1739 PathBuf::from("streams").join(&link_target)
1740 )?);
1741 assert!(test_path_exists_in_repo(&tmp, "streams/test-stream3")?);
1742 let link_target =
1743 read_links_in_repo(&tmp, "streams/test-stream3")?.expect("link is not broken");
1744 assert!(test_path_exists_in_repo(
1745 &tmp,
1746 PathBuf::from("streams").join(&link_target)
1747 )?);
1748 assert!(test_path_exists_in_repo(&tmp, "streams/ref-stream1")?);
1749 let link_target =
1750 read_links_in_repo(&tmp, "streams/ref-stream1")?.expect("link is not broken");
1751 assert!(test_path_exists_in_repo(
1752 &tmp,
1753 PathBuf::from("streams").join(&link_target)
1754 )?);
1755 assert!(test_path_exists_in_repo(&tmp, "streams/ref-stream2")?);
1756 let link_target =
1757 read_links_in_repo(&tmp, "streams/ref-stream2")?.expect("link is not broken");
1758 assert!(test_path_exists_in_repo(
1759 &tmp,
1760 PathBuf::from("streams").join(&link_target)
1761 )?);
1762
1763 let result = repo.gc(&["ref-stream1"])?;
1765
1766 assert!(!test_object_exists(&tmp, &obj1_id)?);
1767 assert!(test_object_exists(&tmp, &obj2_id)?);
1768 assert!(test_object_exists(&tmp, &obj3_id)?);
1769 assert!(!test_object_exists(&tmp, &obj4_id)?);
1770 assert!(test_path_exists_in_repo(&tmp, "streams/test-stream1")?);
1771 let link_target =
1772 read_links_in_repo(&tmp, "streams/test-stream1")?.expect("link is not broken");
1773 assert!(test_path_exists_in_repo(
1774 &tmp,
1775 PathBuf::from("streams").join(&link_target)
1776 )?);
1777 assert!(test_path_exists_in_repo(&tmp, "streams/test-stream2")?);
1778 let link_target =
1779 read_links_in_repo(&tmp, "streams/test-stream2")?.expect("link is not broken");
1780 assert!(test_path_exists_in_repo(
1781 &tmp,
1782 PathBuf::from("streams").join(&link_target)
1783 )?);
1784 assert!(!test_path_exists_in_repo(&tmp, "streams/test-stream3")?);
1785 assert!(test_path_exists_in_repo(&tmp, "streams/ref-stream1")?);
1786 let link_target =
1787 read_links_in_repo(&tmp, "streams/ref-stream1")?.expect("link is not broken");
1788 assert!(test_path_exists_in_repo(
1789 &tmp,
1790 PathBuf::from("streams").join(&link_target)
1791 )?);
1792 assert!(!test_path_exists_in_repo(&tmp, "streams/ref-stream2")?);
1793
1794 assert_eq!(result.objects_removed, 4);
1796 assert!(result.objects_bytes > 0);
1797 assert_eq!(result.streams_pruned, 2);
1798 assert_eq!(result.images_pruned, 0);
1799
1800 Ok(())
1801 }
1802
1803 use crate::tree::{FileSystem, Inode, Leaf, LeafContent, RegularFile, Stat};
1804
1805 fn test_root_stat() -> Stat {
1807 Stat {
1808 st_mode: 0o755,
1809 st_uid: 0,
1810 st_gid: 0,
1811 st_mtim_sec: 0,
1812 xattrs: Default::default(),
1813 }
1814 }
1815
1816 fn make_test_fs(obj: &Sha512HashValue, size: u64) -> FileSystem<Sha512HashValue> {
1818 let mut fs: FileSystem<Sha512HashValue> = FileSystem::new(test_root_stat());
1819 let inode = Inode::Leaf(std::rc::Rc::new(Leaf {
1820 stat: Stat {
1821 st_mode: 0o644,
1822 st_uid: 0,
1823 st_gid: 0,
1824 st_mtim_sec: 0,
1825 xattrs: Default::default(),
1826 },
1827 content: LeafContent::Regular(RegularFile::External(obj.clone(), size)),
1828 }));
1829 fs.root.insert(OsStr::new("data"), inode);
1830 fs
1831 }
1832
1833 #[test]
1834 fn test_gc_removes_one_image() -> Result<()> {
1835 let tmp = tempdir();
1836 let repo = create_test_repo(&tmp.path().join("repo"))?;
1837
1838 let obj1_size: u64 = 32 * 1024;
1839 let obj1 = generate_test_data(obj1_size, 0xAE);
1840 let obj2_size: u64 = 64 * 1024;
1841 let obj2 = generate_test_data(obj2_size, 0xEA);
1842
1843 let obj1_id = repo.ensure_object(&obj1)?;
1844 let obj2_id = repo.ensure_object(&obj2)?;
1845
1846 let fs = make_test_fs(&obj2_id, obj2_size);
1847 let image1 = fs.commit_image(&repo, None)?;
1848 let image1_path = format!("images/{}", image1.to_hex());
1849
1850 repo.sync()?;
1851
1852 assert!(test_object_exists(&tmp, &obj1_id)?);
1853 assert!(test_object_exists(&tmp, &obj2_id)?);
1854 assert!(test_path_exists_in_repo(&tmp, &image1_path)?);
1855 let link_target = read_links_in_repo(&tmp, &image1_path)?.expect("link is not broken");
1856 assert!(test_path_exists_in_repo(
1857 &tmp,
1858 PathBuf::from("images").join(&link_target)
1859 )?);
1860
1861 let result = repo.gc(&[])?;
1863
1864 assert!(!test_object_exists(&tmp, &obj1_id)?);
1865 assert!(!test_object_exists(&tmp, &obj2_id)?);
1866 assert!(!test_path_exists_in_repo(&tmp, &image1_path)?);
1867
1868 assert_eq!(result.objects_removed, 3);
1870 assert!(result.objects_bytes > 0);
1871 assert_eq!(result.images_pruned, 1);
1872 assert_eq!(result.streams_pruned, 0);
1873 Ok(())
1874 }
1875
1876 #[test]
1877 fn test_gc_keeps_one_image() -> Result<()> {
1878 let tmp = tempdir();
1879 let repo = create_test_repo(&tmp.path().join("repo"))?;
1880
1881 let obj1_size: u64 = 32 * 1024;
1882 let obj1 = generate_test_data(obj1_size, 0xAE);
1883 let obj2_size: u64 = 64 * 1024;
1884 let obj2 = generate_test_data(obj2_size, 0xEA);
1885
1886 let obj1_id = repo.ensure_object(&obj1)?;
1887 let obj2_id = repo.ensure_object(&obj2)?;
1888
1889 let fs = make_test_fs(&obj2_id, obj2_size);
1890 let image1 = fs.commit_image(&repo, None)?;
1891 let image1_path = format!("images/{}", image1.to_hex());
1892
1893 repo.sync()?;
1894
1895 assert!(test_object_exists(&tmp, &obj1_id)?);
1896 assert!(test_object_exists(&tmp, &obj2_id)?);
1897 assert!(test_path_exists_in_repo(&tmp, &image1_path)?);
1898 let link_target = read_links_in_repo(&tmp, &image1_path)?.expect("link is not broken");
1899 assert!(test_path_exists_in_repo(
1900 &tmp,
1901 PathBuf::from("images").join(&link_target)
1902 )?);
1903
1904 let image1_hex = image1.to_hex();
1906 let result = repo.gc(&[image1_hex.as_str()])?;
1907
1908 assert!(!test_object_exists(&tmp, &obj1_id)?);
1909 assert!(test_object_exists(&tmp, &obj2_id)?);
1910 assert!(test_path_exists_in_repo(&tmp, &image1_path)?);
1911 let link_target = read_links_in_repo(&tmp, &image1_path)?.expect("link is not broken");
1912 assert!(test_path_exists_in_repo(
1913 &tmp,
1914 PathBuf::from("images").join(&link_target)
1915 )?);
1916
1917 assert_eq!(result.objects_removed, 1);
1919 assert!(result.objects_bytes > 0);
1920 assert_eq!(result.images_pruned, 0);
1921 assert_eq!(result.streams_pruned, 0);
1922 Ok(())
1923 }
1924
1925 #[test]
1926 fn test_gc_keeps_one_image_from_refs() -> Result<()> {
1927 let tmp = tempdir();
1928 let repo = create_test_repo(&tmp.path().join("repo"))?;
1929
1930 let obj1_size: u64 = 32 * 1024;
1931 let obj1 = generate_test_data(obj1_size, 0xAE);
1932 let obj2_size: u64 = 64 * 1024;
1933 let obj2 = generate_test_data(obj2_size, 0xEA);
1934
1935 let obj1_id = repo.ensure_object(&obj1)?;
1936 let obj2_id = repo.ensure_object(&obj2)?;
1937
1938 let fs = make_test_fs(&obj2_id, obj2_size);
1939 let image1 = fs.commit_image(&repo, Some("ref-name"))?;
1940 let image1_path = format!("images/{}", image1.to_hex());
1941
1942 repo.sync()?;
1943
1944 assert!(test_object_exists(&tmp, &obj1_id)?);
1945 assert!(test_object_exists(&tmp, &obj2_id)?);
1946 assert!(test_path_exists_in_repo(&tmp, &image1_path)?);
1947 let link_target = read_links_in_repo(&tmp, &image1_path)?.expect("link is not broken");
1948 assert!(test_path_exists_in_repo(
1949 &tmp,
1950 PathBuf::from("images").join(&link_target)
1951 )?);
1952
1953 let result = repo.gc(&[])?;
1955
1956 assert!(!test_object_exists(&tmp, &obj1_id)?);
1957 assert!(test_object_exists(&tmp, &obj2_id)?);
1958 assert!(test_path_exists_in_repo(&tmp, &image1_path)?);
1959 let link_target = read_links_in_repo(&tmp, &image1_path)?.expect("link is not broken");
1960 assert!(test_path_exists_in_repo(
1961 &tmp,
1962 PathBuf::from("images").join(&link_target)
1963 )?);
1964
1965 assert_eq!(result.objects_removed, 1);
1967 assert!(result.objects_bytes > 0);
1968 assert_eq!(result.images_pruned, 0);
1969 assert_eq!(result.streams_pruned, 0);
1970 Ok(())
1971 }
1972
1973 fn make_test_fs_with_two_files(
1974 obj1: &Sha512HashValue,
1975 size1: u64,
1976 obj2: &Sha512HashValue,
1977 size2: u64,
1978 ) -> FileSystem<Sha512HashValue> {
1979 let mut fs = make_test_fs(obj1, size1);
1980 let inode = Inode::Leaf(std::rc::Rc::new(Leaf {
1981 stat: Stat {
1982 st_mode: 0o644,
1983 st_uid: 0,
1984 st_gid: 0,
1985 st_mtim_sec: 0,
1986 xattrs: Default::default(),
1987 },
1988 content: LeafContent::Regular(RegularFile::External(obj2.clone(), size2)),
1989 }));
1990 fs.root.insert(OsStr::new("extra_data"), inode);
1991 fs
1992 }
1993
1994 #[test]
1995 fn test_gc_keeps_one_image_from_two_overlapped() -> Result<()> {
1996 let tmp = tempdir();
1997 let repo = create_test_repo(&tmp.path().join("repo"))?;
1998
1999 let obj1_size: u64 = 32 * 1024;
2000 let obj1 = generate_test_data(obj1_size, 0xAE);
2001 let obj2_size: u64 = 64 * 1024;
2002 let obj2 = generate_test_data(obj2_size, 0xEA);
2003 let obj3_size: u64 = 64 * 1024;
2004 let obj3 = generate_test_data(obj2_size, 0xAA);
2005 let obj4_size: u64 = 64 * 1024;
2006 let obj4 = generate_test_data(obj2_size, 0xEE);
2007
2008 let obj1_id = repo.ensure_object(&obj1)?;
2009 let obj2_id = repo.ensure_object(&obj2)?;
2010 let obj3_id = repo.ensure_object(&obj3)?;
2011 let obj4_id = repo.ensure_object(&obj4)?;
2012
2013 let fs = make_test_fs_with_two_files(&obj2_id, obj2_size, &obj3_id, obj3_size);
2014 let image1 = fs.commit_image(&repo, None)?;
2015 let image1_path = format!("images/{}", image1.to_hex());
2016
2017 let fs = make_test_fs_with_two_files(&obj2_id, obj2_size, &obj4_id, obj4_size);
2018 let image2 = fs.commit_image(&repo, None)?;
2019 let image2_path = format!("images/{}", image2.to_hex());
2020
2021 repo.sync()?;
2022
2023 assert!(test_object_exists(&tmp, &obj1_id)?);
2024 assert!(test_object_exists(&tmp, &obj2_id)?);
2025 assert!(test_object_exists(&tmp, &obj3_id)?);
2026 assert!(test_object_exists(&tmp, &obj4_id)?);
2027 assert!(test_path_exists_in_repo(&tmp, &image1_path)?);
2028 let link_target = read_links_in_repo(&tmp, &image1_path)?.expect("link is not broken");
2029 assert!(test_path_exists_in_repo(
2030 &tmp,
2031 PathBuf::from("images").join(&link_target)
2032 )?);
2033 assert!(test_path_exists_in_repo(&tmp, &image2_path)?);
2034 let link_target = read_links_in_repo(&tmp, &image2_path)?.expect("link is not broken");
2035 assert!(test_path_exists_in_repo(
2036 &tmp,
2037 PathBuf::from("images").join(&link_target)
2038 )?);
2039
2040 let image1_hex = image1.to_hex();
2042 let result = repo.gc(&[image1_hex.as_str()])?;
2043
2044 assert!(!test_object_exists(&tmp, &obj1_id)?);
2045 assert!(test_object_exists(&tmp, &obj2_id)?);
2046 assert!(test_object_exists(&tmp, &obj3_id)?);
2047 assert!(!test_object_exists(&tmp, &obj4_id)?);
2048 assert!(test_path_exists_in_repo(&tmp, &image1_path)?);
2049 let link_target = read_links_in_repo(&tmp, &image1_path)?.expect("link is not broken");
2050 assert!(test_path_exists_in_repo(
2051 &tmp,
2052 PathBuf::from("images").join(&link_target)
2053 )?);
2054 assert!(!test_path_exists_in_repo(&tmp, &image2_path)?);
2055
2056 assert_eq!(result.objects_removed, 3);
2058 assert!(result.objects_bytes > 0);
2059 assert_eq!(result.images_pruned, 1);
2060 assert_eq!(result.streams_pruned, 0);
2061 Ok(())
2062 }
2063}