composefs_oci/
skopeo.rs

//! Container image pulling and registry interaction via skopeo/containers-image-proxy.
//!
//! This module pulls container images from registries and imports them into composefs
//! repositories. It uses the containers-image-proxy library to drive skopeo for image operations,
//! handling authentication, transport protocols, and image manifest processing.
//!
//! The main entry point is the `pull()` function, which downloads an image, processes its layers
//! concurrently (bounded by the available parallelism), and stores them in the composefs
//! repository as fs-verity-backed splitstreams. Uncompressed, gzip and zstd tar layers are
//! supported.
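//!
//! A minimal usage sketch (the function name, image reference and tag below are illustrative; it
//! assumes a `Repository` has already been opened elsewhere):
//!
//! ```ignore
//! async fn import_image<ObjectID: FsVerityHashValue>(
//!     repo: &Arc<Repository<ObjectID>>,
//! ) -> Result<()> {
//!     // Pull from a registry and name the resulting config stream after the image.
//!     let imgref = "docker://quay.io/example/image:latest";
//!     let (config_digest, _config_id) = pull(repo, imgref, Some("example:latest"), None).await?;
//!     println!("imported config {config_digest}");
//!     Ok(())
//! }
//! ```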

use std::{cmp::Reverse, iter::zip, process::Command, sync::Arc, thread::available_parallelism};

use anyhow::{bail, Context, Result};
use async_compression::tokio::bufread::{GzipDecoder, ZstdDecoder};
use containers_image_proxy::{
    ConvertedLayerInfo, ImageProxy, ImageProxyConfig, OpenedImage, Transport,
};
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use oci_spec::image::{Descriptor, ImageConfiguration, ImageManifest, MediaType};
use rustix::process::geteuid;
use tokio::{
    io::{AsyncReadExt, BufReader},
    sync::Semaphore,
    task::JoinSet,
};

use composefs::{fsverity::FsVerityHashValue, repository::Repository};

use crate::{config_identifier, layer_identifier, tar::split_async, ContentAndVerity};

// Content type identifiers stored as ASCII in the splitstream file
pub(crate) const TAR_LAYER_CONTENT_TYPE: u64 = u64::from_le_bytes(*b"ocilayer");
pub(crate) const OCI_CONFIG_CONTENT_TYPE: u64 = u64::from_le_bytes(*b"ociconfg");

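/// State for a single image-pull operation: the destination repository, the skopeo proxy and its
/// opened image handle, a progress display, and the transport parsed from the image reference.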
struct ImageOp<ObjectID: FsVerityHashValue> {
    repo: Arc<Repository<ObjectID>>,
    proxy: ImageProxy,
    img: OpenedImage,
    progress: MultiProgress,
    transport: Transport,
}

impl<ObjectID: FsVerityHashValue> ImageOp<ObjectID> {
    async fn new(
        repo: &Arc<Repository<ObjectID>>,
        imgref: &str,
        img_proxy_config: Option<ImageProxyConfig>,
    ) -> Result<Self> {
        // Detect transport from image reference
        let transport = Transport::try_from(imgref).context("Failed to get image transport")?;

        // See https://github.com/containers/skopeo/issues/2563: rootless access to
        // containers-storage requires running skopeo under `podman unshare`.
        let skopeo_cmd = if transport == Transport::ContainerStorage && !geteuid().is_root() {
            let mut cmd = Command::new("podman");
            cmd.args(["unshare", "skopeo"]);
            Some(cmd)
        } else {
            None
        };

        // See https://github.com/containers/skopeo/issues/2750: rewrite
        // `containers-storage:sha256:<hash>` references to drop the `sha256:` prefix.
        let imgref = if let Some(hash) = imgref.strip_prefix("containers-storage:sha256:") {
            &format!("containers-storage:{hash}") // yay temporary lifetime extension!
        } else {
            imgref
        };

        let config = match img_proxy_config {
            Some(mut conf) => {
                if conf.skopeo_cmd.is_none() {
                    conf.skopeo_cmd = skopeo_cmd;
                }

                conf
            }

            None => {
                ImageProxyConfig {
                    skopeo_cmd,
                    // auth_anonymous: true, debug: true, insecure_skip_tls_verification: None,
                    ..ImageProxyConfig::default()
                }
            }
        };

        let proxy = containers_image_proxy::ImageProxy::new_with_config(config).await?;
        let img = proxy.open_image(imgref).await.context("Opening image")?;
        let progress = MultiProgress::new();
        Ok(ImageOp {
            repo: Arc::clone(repo),
            proxy,
            img,
            progress,
            transport,
        })
    }

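    /// Ensures that the layer with the given `diff_id` is present in the repository as a
    /// splitstream, fetching and splitting it if necessary, and returns its fs-verity object ID.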
    pub async fn ensure_layer(
        &self,
        diff_id: &str,
        descriptor: &Descriptor,
        uncompressed_layer_info: Option<Arc<Vec<ConvertedLayerInfo>>>,
        layer_idx: usize,
    ) -> Result<ObjectID> {
        // We use the per-manifest descriptor to download the (possibly compressed) layer, but it
        // gets stored in the repository keyed by the per-config diff_id.  Our return value is the
        // fs-verity digest of the corresponding splitstream.
        let content_id = layer_identifier(diff_id);

        if let Some(layer_id) = self.repo.has_stream(&content_id)? {
            self.progress
                .println(format!("Already have layer {diff_id}"))?;
            Ok(layer_id)
        } else {
            // Otherwise, we need to fetch it...
            let descriptor = match self.transport {
                Transport::ContainerStorage => {
                    let layers = uncompressed_layer_info
                        .as_ref()
                        .ok_or_else(|| anyhow::anyhow!("Failed to get uncompressed layer info"))?;

                    let layer = layers.get(layer_idx).ok_or_else(|| {
                        anyhow::anyhow!(
                            "Failed to get uncompressed layer info for layer index {layer_idx}. Total layers: {}",
                            layers.len()
                        )
                    })?;

                    &Descriptor::new(layer.media_type.clone(), layer.size, layer.digest.clone())
                }

                _ => descriptor,
            };

            let (blob_reader, driver) = self
                .proxy
                .get_blob(&self.img, descriptor.digest(), descriptor.size())
                .await?;

            // See https://github.com/containers/containers-image-proxy-rs/issues/71: limit the
            // reader to the declared blob size.
            let blob_reader = blob_reader.take(descriptor.size());

            let bar = self.progress.add(ProgressBar::new(descriptor.size()));
            bar.set_style(ProgressStyle::with_template("[eta {eta}] {bar:40.cyan/blue} {decimal_bytes:>7}/{decimal_total_bytes:7} {msg}")
                .unwrap()
                .progress_chars("##-"));
            let progress = bar.wrap_async_read(blob_reader);
            self.progress.println(format!("Fetching layer {diff_id}"))?;

            let reader: Box<dyn tokio::io::AsyncBufRead + Unpin + Send> =
                match descriptor.media_type() {
                    MediaType::ImageLayer => Box::new(BufReader::new(progress)),
                    MediaType::ImageLayerGzip => {
                        Box::new(BufReader::new(GzipDecoder::new(BufReader::new(progress))))
                    }
                    MediaType::ImageLayerZstd => {
                        Box::new(BufReader::new(ZstdDecoder::new(BufReader::new(progress))))
                    }
                    other => bail!("Unsupported layer media type {other:?}"),
                };

            let object_id = split_async(reader, self.repo.clone(), TAR_LAYER_CONTENT_TYPE).await?;

            // skopeo computes the data checksums for us, making sure the content we received
            // matches the claimed diff_id.  Any failure is only reported when we await the driver,
            // so do that before registering the stream.
            driver.await?;

            // Sync and register the stream with its content identifier
            self.repo
                .register_stream(&object_id, &content_id, None)
                .await?;

            Ok(object_id)
        }
    }

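    /// Ensures that the image configuration (and, via `ensure_layer`, every layer it references)
    /// is present in the repository, returning the config digest together with the fs-verity
    /// object ID of the config splitstream.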
    pub async fn ensure_config(
        self: &Arc<Self>,
        manifest_layers: &[Descriptor],
        descriptor: &Descriptor,
    ) -> Result<ContentAndVerity<ObjectID>> {
        let config_digest: &str = descriptor.digest().as_ref();
        let content_id = config_identifier(config_digest);

        if let Some(config_id) = self.repo.has_stream(&content_id)? {
            // We already got this config?  Nice.
            self.progress
                .println(format!("Already have container config {config_digest}"))?;
            Ok((config_digest.to_string(), config_id))
        } else {
            // We need to add the config to the repo, which means parsing it and making sure we
            // have all of its layers first.
            self.progress
                .println(format!("Fetching config {config_digest}"))?;

            let (mut config, driver) = self.proxy.get_descriptor(&self.img, descriptor).await?;
            let config = async move {
                let mut s = Vec::new();
                config.read_to_end(&mut s).await?;
                anyhow::Ok(s)
            };
            let (config, driver) = tokio::join!(config, driver);
            let _: () = driver?;
            let raw_config = config?;
            let config = ImageConfiguration::from_reader(&raw_config[..])?;

            // We want to sort the layers based on size so we can get started on the big layers
            // first.  The last thing we want is to start on the biggest layer right at the end.
            let mut layers: Vec<_> = zip(manifest_layers, config.rootfs().diff_ids()).collect();
            layers.sort_by_key(|(mld, ..)| Reverse(mld.size()));

            // Bound the number of tasks to the available parallelism.
            let threads = available_parallelism()?;
            let sem = Arc::new(Semaphore::new(threads.into()));
            let mut layer_tasks = JoinSet::new();

            let uncompressed_layer_info = match self.transport {
                Transport::ContainerStorage => {
                    self.proxy.get_layer_info(&self.img).await?.map(Arc::new)
                }
                _ => None,
            };

            for (idx, (mld, diff_id)) in layers.into_iter().enumerate() {
                let diff_id_ = diff_id.clone();
                let self_ = Arc::clone(self);
                let permit = Arc::clone(&sem).acquire_owned().await?;
                let descriptor = mld.clone();

                let layer_idx = manifest_layers
                    .iter()
                    .position(|d| *d == descriptor)
                    .ok_or_else(|| anyhow::anyhow!("Layer descriptor not found in manifest"))?;

                let uncompressed_layer_info = uncompressed_layer_info.clone();

                layer_tasks.spawn(async move {
                    let _permit = permit;
                    let verity = self_
                        .ensure_layer(&diff_id_, &descriptor, uncompressed_layer_info, layer_idx)
                        .await?;
                    anyhow::Ok((idx, diff_id_, verity))
                });
            }

            // Collect the results and sort them back into spawn order for deterministic ordering
            let mut results: Vec<_> = layer_tasks
                .join_all()
                .await
                .into_iter()
                .collect::<Result<_, _>>()?;
            results.sort_by_key(|(idx, _, _)| *idx);

            let mut splitstream = self.repo.create_stream(OCI_CONFIG_CONTENT_TYPE);
            for (_, diff_id, verity) in results {
                splitstream.add_named_stream_ref(&diff_id, &verity);
            }

            // NB: We trust that skopeo has verified that raw_config has the correct digest
            splitstream.write_inline(&raw_config);

            let config_id = self.repo.write_stream(splitstream, &content_id, None)?;
            Ok((config_digest.to_string(), config_id))
        }
    }

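    /// Fetches and parses the image manifest, then imports the config (and, through it, the
    /// layers), returning the config digest and its fs-verity object ID.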
    pub async fn pull(self: &Arc<Self>) -> Result<ContentAndVerity<ObjectID>> {
        let (_manifest_digest, raw_manifest) = self
            .proxy
            .fetch_manifest_raw_oci(&self.img)
            .await
            .context("Fetching manifest")?;

        // Parse the manifest and make sure we have the config first (which will also pull in
        // the layers).
        let manifest = ImageManifest::from_reader(raw_manifest.as_slice())?;
        let config_descriptor = manifest.config();
        let layers = manifest.layers();
        self.ensure_config(layers, config_descriptor)
            .await
            .with_context(|| format!("Failed to pull config {config_descriptor:?}"))
    }
}

/// Pull the target image, and add the provided tag. If this is a mountable
/// image (i.e. not an artifact), it is *not* unpacked by default.
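///
/// `imgref` is a transport-prefixed image reference (e.g. `docker://...` or
/// `containers-storage:...`), `reference` is an optional name to attach to the resulting config
/// stream, and `img_proxy_config` optionally overrides the default proxy configuration.
///
/// Returns the config digest together with the fs-verity object ID of the config splitstream.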
pub async fn pull<ObjectID: FsVerityHashValue>(
    repo: &Arc<Repository<ObjectID>>,
    imgref: &str,
    reference: Option<&str>,
    img_proxy_config: Option<ImageProxyConfig>,
) -> Result<(String, ObjectID)> {
    let op = Arc::new(ImageOp::new(repo, imgref, img_proxy_config).await?);
    let (sha256, id) = op
        .pull()
        .await
        .with_context(|| format!("Unable to pull container image {imgref}"))?;

    if let Some(name) = reference {
        repo.name_stream(&sha256, name)?;
    }
    Ok((sha256, id))
}