1use std::{cmp::Reverse, process::Command, thread::available_parallelism};
12
13use std::{iter::zip, sync::Arc};
14
15use anyhow::{bail, Context, Result};
16use async_compression::tokio::bufread::{GzipDecoder, ZstdDecoder};
17use containers_image_proxy::{
18 ConvertedLayerInfo, ImageProxy, ImageProxyConfig, OpenedImage, Transport,
19};
20use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
21use oci_spec::image::{Descriptor, ImageConfiguration, ImageManifest, MediaType};
22use rustix::process::geteuid;
23use tokio::{
24 io::{AsyncReadExt, BufReader},
25 sync::Semaphore,
26 task::JoinSet,
27};
28
29use composefs::{fsverity::FsVerityHashValue, repository::Repository};
30
31use crate::{config_identifier, layer_identifier, tar::split_async, ContentAndVerity};
32
/// Splitstream content-type tag for OCI layer tarballs: the ASCII bytes `ocilayer`
/// interpreted as a little-endian `u64`.
pub(crate) const TAR_LAYER_CONTENT_TYPE: u64 =
    u64::from_le_bytes([b'o', b'c', b'i', b'l', b'a', b'y', b'e', b'r']);
/// Splitstream content-type tag for OCI image configs: the ASCII bytes `ociconfg`
/// interpreted as a little-endian `u64`.
pub(crate) const OCI_CONFIG_CONTENT_TYPE: u64 =
    u64::from_le_bytes([b'o', b'c', b'i', b'c', b'o', b'n', b'f', b'g']);
36
37struct ImageOp<ObjectID: FsVerityHashValue> {
38 repo: Arc<Repository<ObjectID>>,
39 proxy: ImageProxy,
40 img: OpenedImage,
41 progress: MultiProgress,
42 transport: Transport,
43}
44
45impl<ObjectID: FsVerityHashValue> ImageOp<ObjectID> {
46 async fn new(
47 repo: &Arc<Repository<ObjectID>>,
48 imgref: &str,
49 img_proxy_config: Option<ImageProxyConfig>,
50 ) -> Result<Self> {
51 let transport = Transport::try_from(imgref).context("Failed to get image transport")?;
53
54 let skopeo_cmd = if transport == Transport::ContainerStorage && !geteuid().is_root() {
56 let mut cmd = Command::new("podman");
57 cmd.args(["unshare", "skopeo"]);
58 Some(cmd)
59 } else {
60 None
61 };
62
63 let imgref = if let Some(hash) = imgref.strip_prefix("containers-storage:sha256:") {
65 &format!("containers-storage:{hash}") } else {
67 imgref
68 };
69
70 let config = match img_proxy_config {
71 Some(mut conf) => {
72 if conf.skopeo_cmd.is_none() {
73 conf.skopeo_cmd = skopeo_cmd;
74 }
75
76 conf
77 }
78
79 None => {
80 ImageProxyConfig {
81 skopeo_cmd,
82 ..ImageProxyConfig::default()
84 }
85 }
86 };
87
88 let proxy = containers_image_proxy::ImageProxy::new_with_config(config).await?;
89 let img = proxy.open_image(imgref).await.context("Opening image")?;
90 let progress = MultiProgress::new();
91 Ok(ImageOp {
92 repo: Arc::clone(repo),
93 proxy,
94 img,
95 progress,
96 transport,
97 })
98 }
99
100 pub async fn ensure_layer(
101 &self,
102 diff_id: &str,
103 descriptor: &Descriptor,
104 uncompressed_layer_info: Option<Arc<Vec<ConvertedLayerInfo>>>,
105 layer_idx: usize,
106 ) -> Result<ObjectID> {
107 let content_id = layer_identifier(diff_id);
111
112 if let Some(layer_id) = self.repo.has_stream(&content_id)? {
113 self.progress
114 .println(format!("Already have layer {diff_id}"))?;
115 Ok(layer_id)
116 } else {
117 let descriptor = match self.transport {
119 Transport::ContainerStorage => {
120 let layers = uncompressed_layer_info
121 .as_ref()
122 .ok_or_else(|| anyhow::anyhow!("Failed to get uncompressed layer info"))?;
123
124 let layer = layers.get(layer_idx).ok_or_else(|| {
125 anyhow::anyhow!(
126 "Failed to get uncompressed layer info for layer index {layer_idx}. Total layers: {}",
127 layers.len()
128 )
129 })?;
130
131 &Descriptor::new(layer.media_type.clone(), layer.size, layer.digest.clone())
132 }
133
134 _ => descriptor,
135 };
136
137 let (blob_reader, driver) = self
138 .proxy
139 .get_blob(&self.img, descriptor.digest(), descriptor.size())
140 .await?;
141
142 let blob_reader = blob_reader.take(descriptor.size());
144
145 let bar = self.progress.add(ProgressBar::new(descriptor.size()));
146 bar.set_style(ProgressStyle::with_template("[eta {eta}] {bar:40.cyan/blue} {decimal_bytes:>7}/{decimal_total_bytes:7} {msg}")
147 .unwrap()
148 .progress_chars("##-"));
149 let progress = bar.wrap_async_read(blob_reader);
150 self.progress.println(format!("Fetching layer {diff_id}"))?;
151
152 let reader: Box<dyn tokio::io::AsyncBufRead + Unpin + Send> =
153 match descriptor.media_type() {
154 MediaType::ImageLayer => Box::new(BufReader::new(progress)),
155 MediaType::ImageLayerGzip => {
156 Box::new(BufReader::new(GzipDecoder::new(BufReader::new(progress))))
157 }
158 MediaType::ImageLayerZstd => {
159 Box::new(BufReader::new(ZstdDecoder::new(BufReader::new(progress))))
160 }
161 other => bail!("Unsupported layer media type {other:?}"),
162 };
163
164 let object_id = split_async(reader, self.repo.clone(), TAR_LAYER_CONTENT_TYPE).await?;
165
166 driver.await?;
169
170 self.repo
172 .register_stream(&object_id, &content_id, None)
173 .await?;
174
175 Ok(object_id)
176 }
177 }
178
179 pub async fn ensure_config(
180 self: &Arc<Self>,
181 manifest_layers: &[Descriptor],
182 descriptor: &Descriptor,
183 ) -> Result<ContentAndVerity<ObjectID>> {
184 let config_digest: &str = descriptor.digest().as_ref();
185 let content_id = config_identifier(config_digest);
186
187 if let Some(config_id) = self.repo.has_stream(&content_id)? {
188 self.progress
190 .println(format!("Already have container config {config_digest}"))?;
191 Ok((config_digest.to_string(), config_id))
192 } else {
193 self.progress
197 .println(format!("Fetching config {config_digest}"))?;
198
199 let (mut config, driver) = self.proxy.get_descriptor(&self.img, descriptor).await?;
200 let config = async move {
201 let mut s = Vec::new();
202 config.read_to_end(&mut s).await?;
203 anyhow::Ok(s)
204 };
205 let (config, driver) = tokio::join!(config, driver);
206 let _: () = driver?;
207 let raw_config = config?;
208 let config = ImageConfiguration::from_reader(&raw_config[..])?;
209
210 let mut layers: Vec<_> = zip(manifest_layers, config.rootfs().diff_ids()).collect();
213 layers.sort_by_key(|(mld, ..)| Reverse(mld.size()));
214
215 let threads = available_parallelism()?;
217 let sem = Arc::new(Semaphore::new(threads.into()));
218 let mut layer_tasks = JoinSet::new();
219
220 let uncompressed_layer_info = match self.transport {
221 Transport::ContainerStorage => {
222 self.proxy.get_layer_info(&self.img).await?.map(Arc::new)
223 }
224 _ => None,
225 };
226
227 for (idx, (mld, diff_id)) in layers.into_iter().enumerate() {
228 let diff_id_ = diff_id.clone();
229 let self_ = Arc::clone(self);
230 let permit = Arc::clone(&sem).acquire_owned().await?;
231 let descriptor = mld.clone();
232
233 let layer_idx = manifest_layers
234 .iter()
235 .position(|d| *d == descriptor)
236 .ok_or_else(|| anyhow::anyhow!("Layer descriptor not found in manifest"))?;
237
238 let uncompressed_layer_info = uncompressed_layer_info.clone();
239
240 layer_tasks.spawn(async move {
241 let _permit = permit;
242 let verity = self_
243 .ensure_layer(&diff_id_, &descriptor, uncompressed_layer_info, layer_idx)
244 .await?;
245 anyhow::Ok((idx, diff_id_, verity))
246 });
247 }
248
249 let mut results: Vec<_> = layer_tasks
251 .join_all()
252 .await
253 .into_iter()
254 .collect::<Result<_, _>>()?;
255 results.sort_by_key(|(idx, _, _)| *idx);
256
257 let mut splitstream = self.repo.create_stream(OCI_CONFIG_CONTENT_TYPE);
258 for (_, diff_id, verity) in results {
259 splitstream.add_named_stream_ref(&diff_id, &verity);
260 }
261
262 splitstream.write_inline(&raw_config);
264
265 let config_id = self.repo.write_stream(splitstream, &content_id, None)?;
266 Ok((config_digest.to_string(), config_id))
267 }
268 }
269
270 pub async fn pull(self: &Arc<Self>) -> Result<ContentAndVerity<ObjectID>> {
271 let (_manifest_digest, raw_manifest) = self
272 .proxy
273 .fetch_manifest_raw_oci(&self.img)
274 .await
275 .context("Fetching manifest")?;
276
277 let manifest = ImageManifest::from_reader(raw_manifest.as_slice())?;
280 let config_descriptor = manifest.config();
281 let layers = manifest.layers();
282 self.ensure_config(layers, config_descriptor)
283 .await
284 .with_context(|| format!("Failed to pull config {config_descriptor:?}"))
285 }
286}
287
288pub async fn pull<ObjectID: FsVerityHashValue>(
291 repo: &Arc<Repository<ObjectID>>,
292 imgref: &str,
293 reference: Option<&str>,
294 img_proxy_config: Option<ImageProxyConfig>,
295) -> Result<(String, ObjectID)> {
296 let op = Arc::new(ImageOp::new(repo, imgref, img_proxy_config).await?);
297 let (sha256, id) = op
298 .pull()
299 .await
300 .with_context(|| format!("Unable to pull container image {imgref}"))?;
301
302 if let Some(name) = reference {
303 repo.name_stream(&sha256, name)?;
304 }
305 Ok((sha256, id))
306}