// fractal/utils/media/image/queue.rs

1use std::{
2    collections::{HashMap, HashSet, VecDeque},
3    fmt,
4    future::IntoFuture,
5    path::PathBuf,
6    sync::{Arc, LazyLock, Mutex, MutexGuard},
7    time::Duration,
8};
9
10use futures_util::future::{BoxFuture, LocalBoxFuture};
11use gtk::glib;
12use matrix_sdk::{
13    Client,
14    media::{MediaRequestParameters, UniqueKey},
15};
16use tokio::sync::broadcast;
17use tracing::{debug, warn};
18
19use super::{Image, ImageDecoderSource, ImageError};
20use crate::{
21    spawn, spawn_tokio,
22    utils::{
23        File,
24        media::{FrameDimensions, MediaFileError},
25    },
26};
27
28/// The default image request queue.
29pub(crate) static IMAGE_QUEUE: LazyLock<ImageRequestQueue> = LazyLock::new(ImageRequestQueue::new);
30
/// The default limit of the [`ImageRequestQueue`], i.e. the maximum number of
/// image requests that may be ongoing at the same time.
const DEFAULT_QUEUE_LIMIT: usize = 20;
/// How many times a single request may be retried.
const MAX_REQUEST_RETRY_COUNT: u8 = 2;
/// How long a request may run before it is considered to be stalled.
const STALLED_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
39
40/// A queue for image requests.
41///
42/// This implements the following features:
43/// - Limit the number of concurrent requests,
44/// - Prioritize requests according to their importance,
45/// - Avoid duplicate requests,
46/// - Watch requests that fail with I/O errors to:
47///   - Reinsert them at the end of the queue to retry them later,
48///   - Reduce the pool capacity temporarily to avoid more similar errors and
49///     let the system recover.
50/// - Watch requests that take too long to:
51///   - Log them,
52///   - Ignore them in the count of ongoing requests.
53pub(crate) struct ImageRequestQueue {
54    inner: Arc<Mutex<ImageRequestQueueInner>>,
55}
56
57struct ImageRequestQueueInner {
58    /// The current limit of the ongoing requests count.
59    ///
60    /// This may change if an error is encountered, to let the system recover.
61    limit: usize,
62    /// The image requests in the queue.
63    requests: HashMap<ImageRequestId, ImageRequest>,
64    /// The ongoing requests.
65    ongoing: HashSet<ImageRequestId>,
66    /// The stalled requests.
67    stalled: HashSet<ImageRequestId>,
68    /// The queue of requests with default priority.
69    queue_default: VecDeque<ImageRequestId>,
70    /// The queue of requests with low priority.
71    queue_low: VecDeque<ImageRequestId>,
72}
73
74impl ImageRequestQueue {
75    /// Construct an empty `ImageRequestQueue` with the default settings.
76    fn new() -> Self {
77        Self {
78            inner: Mutex::new(ImageRequestQueueInner {
79                limit: DEFAULT_QUEUE_LIMIT,
80                requests: Default::default(),
81                ongoing: Default::default(),
82                stalled: Default::default(),
83                queue_default: Default::default(),
84                queue_low: Default::default(),
85            })
86            .into(),
87        }
88    }
89
90    /// Get a mutable copy of the inner data.
91    fn inner(&self) -> MutexGuard<'_, ImageRequestQueueInner> {
92        self.inner.lock().expect("Mutex should not be poisoned")
93    }
94
95    /// Add a request to download an image.
96    ///
97    /// If another request for the same image already exists, this will reuse
98    /// the same request.
99    pub(crate) fn add_download_request(
100        &self,
101        client: Client,
102        settings: MediaRequestParameters,
103        dimensions: Option<FrameDimensions>,
104        priority: ImageRequestPriority,
105    ) -> ImageRequestHandle {
106        self.inner()
107            .add_download_request(client, settings, dimensions, priority)
108    }
109
110    /// Add a request to load an image from a file.
111    ///
112    /// If another request for the same file already exists, this will reuse the
113    /// same request.
114    pub(crate) fn add_file_request(
115        &self,
116        file: File,
117        dimensions: Option<FrameDimensions>,
118    ) -> ImageRequestHandle {
119        self.inner().add_file_request(file, dimensions)
120    }
121
122    /// Mark the request with the given ID as stalled.
123    fn mark_as_stalled(&self, request_id: ImageRequestId) {
124        self.inner().mark_as_stalled(request_id);
125    }
126
127    /// Retry the request with the given ID.
128    ///
129    /// If `lower_limit` is `true`, we will also lower the limit of the queue.
130    fn retry_request(&self, request_id: &ImageRequestId, lower_limit: bool) {
131        self.inner().retry_request(request_id, lower_limit);
132    }
133
134    /// Remove the request with the given ID.
135    fn remove_request(&self, request_id: &ImageRequestId) {
136        self.inner().remove_request(request_id);
137    }
138}
139
140impl ImageRequestQueueInner {
141    /// Whether we have reache the current limit of concurrent requests.
142    fn is_limit_reached(&self) -> bool {
143        self.ongoing.len() >= self.limit
144    }
145
146    /// Add the given request to the queue.
147    fn queue_request(&mut self, request_id: ImageRequestId, request: ImageRequest) {
148        let is_limit_reached = self.is_limit_reached();
149        if !is_limit_reached || request.priority == ImageRequestPriority::High {
150            // Spawn the request right away.
151            self.ongoing.insert(request_id.clone());
152            request.spawn();
153        } else {
154            // Queue the request.
155            let queue = if request.priority == ImageRequestPriority::Default {
156                &mut self.queue_default
157            } else {
158                &mut self.queue_low
159            };
160
161            queue.push_back(request_id.clone());
162        }
163        self.requests.insert(request_id, request);
164    }
165
166    /// Add the given image request.
167    ///
168    /// If another request for the same image already exists, this will reuse
169    /// the same request.
170    fn add_request(
171        &mut self,
172        inner: ImageLoaderRequest,
173        priority: ImageRequestPriority,
174    ) -> ImageRequestHandle {
175        let request_id = inner.source.request_id();
176
177        // If the request already exists, use the existing one.
178        if let Some(request) = self.requests.get(&request_id) {
179            let result_receiver = request.result_sender.subscribe();
180            return ImageRequestHandle::new(result_receiver);
181        }
182
183        // Build and add the request.
184        let (request, result_receiver) = ImageRequest::new(inner, priority);
185
186        self.queue_request(request_id.clone(), request);
187
188        ImageRequestHandle::new(result_receiver)
189    }
190
191    /// Add a request to download an image.
192    ///
193    /// If another request for the same image already exists, this will reuse
194    /// the same request.
195    fn add_download_request(
196        &mut self,
197        client: Client,
198        settings: MediaRequestParameters,
199        dimensions: Option<FrameDimensions>,
200        priority: ImageRequestPriority,
201    ) -> ImageRequestHandle {
202        self.add_request(
203            ImageLoaderRequest {
204                source: ImageRequestSource::Download(DownloadRequest { client, settings }),
205                dimensions,
206            },
207            priority,
208        )
209    }
210
211    /// Add a request to load an image from a file.
212    ///
213    /// If another request for the same file already exists, this will reuse the
214    /// same request.
215    fn add_file_request(
216        &mut self,
217        file: File,
218        dimensions: Option<FrameDimensions>,
219    ) -> ImageRequestHandle {
220        // Always use high priority because file requests should always be for
221        // previewing a local image.
222        self.add_request(
223            ImageLoaderRequest {
224                source: ImageRequestSource::File(file),
225                dimensions,
226            },
227            ImageRequestPriority::High,
228        )
229    }
230
231    /// Mark the request with the given ID as stalled.
232    fn mark_as_stalled(&mut self, request_id: ImageRequestId) {
233        self.ongoing.remove(&request_id);
234        self.stalled.insert(request_id);
235
236        self.spawn_next();
237    }
238
239    /// Retry the request with the given ID.
240    ///
241    /// If `lower_limit` is `true`, we will also lower the limit of the queue.
242    fn retry_request(&mut self, request_id: &ImageRequestId, lower_limit: bool) {
243        self.ongoing.remove(request_id);
244
245        if lower_limit {
246            // Only one request at a time until the problem is likely fixed.
247            self.limit = 1;
248        }
249
250        let is_limit_reached = self.is_limit_reached();
251
252        match self.requests.get_mut(request_id) {
253            Some(request) => {
254                request.retries_count += 1;
255
256                // For fairness, only re-spawn the request right away if there is no other
257                // request waiting with a priority higher or equal to this one.
258                let can_spawn_request = if request.priority == ImageRequestPriority::High {
259                    true
260                } else {
261                    !is_limit_reached
262                        && self.queue_default.is_empty()
263                        && (request.priority == ImageRequestPriority::Default
264                            || self.queue_low.is_empty())
265                };
266
267                if can_spawn_request {
268                    // Re-spawn the request right away.
269                    self.ongoing.insert(request_id.clone());
270                    request.spawn();
271                } else {
272                    // Queue the request.
273                    let queue = if request.priority == ImageRequestPriority::Default {
274                        &mut self.queue_default
275                    } else {
276                        &mut self.queue_low
277                    };
278
279                    queue.push_back(request_id.clone());
280                }
281            }
282            None => {
283                // This should not happen.
284                warn!("Could not find request {request_id} to retry");
285            }
286        }
287
288        self.spawn_next();
289    }
290
291    /// Remove the request with the given ID.
292    fn remove_request(&mut self, request_id: &ImageRequestId) {
293        self.ongoing.remove(request_id);
294        self.stalled.remove(request_id);
295        self.queue_default.retain(|id| id != request_id);
296        self.queue_low.retain(|id| id != request_id);
297        self.requests.remove(request_id);
298
299        self.spawn_next();
300    }
301
302    /// Spawn as many requests as possible.
303    fn spawn_next(&mut self) {
304        while !self.is_limit_reached() {
305            let Some(request_id) = self
306                .queue_default
307                .pop_front()
308                .or_else(|| self.queue_low.pop_front())
309            else {
310                // No request to spawn.
311                return;
312            };
313            let Some(request) = self.requests.get(&request_id) else {
314                // The queues and requests are out of sync, this should not happen.
315                warn!("Missing image request {request_id}");
316                continue;
317            };
318
319            self.ongoing.insert(request_id.clone());
320            request.spawn();
321        }
322
323        // If there are no ongoing requests, restore the limit to its default value.
324        if self.ongoing.is_empty() {
325            self.limit = DEFAULT_QUEUE_LIMIT;
326        }
327    }
328}
329
330/// A request for an image.
331struct ImageRequest {
332    /// The request to the image loader.
333    inner: ImageLoaderRequest,
334    /// The priority of the request.
335    priority: ImageRequestPriority,
336    /// The sender of the channel to use to send the result.
337    result_sender: broadcast::Sender<Result<Image, ImageError>>,
338    /// The number of retries for this request.
339    retries_count: u8,
340    /// The handle for aborting the current task of this request.
341    task_handle: Arc<Mutex<Option<glib::JoinHandle<()>>>>,
342    /// The timeout source for marking this request as stalled.
343    stalled_timeout_source: Arc<Mutex<Option<glib::SourceId>>>,
344}
345
346impl ImageRequest {
347    /// Construct an image request with the given data and priority.
348    fn new(
349        inner: ImageLoaderRequest,
350        priority: ImageRequestPriority,
351    ) -> (Self, broadcast::Receiver<Result<Image, ImageError>>) {
352        let (result_sender, result_receiver) = broadcast::channel(1);
353        (
354            Self {
355                inner,
356                priority,
357                result_sender,
358                retries_count: 0,
359                task_handle: Default::default(),
360                stalled_timeout_source: Default::default(),
361            },
362            result_receiver,
363        )
364    }
365
366    /// Whether we can retry a request with the given retries count and after
367    /// the given error.
368    fn can_retry(retries_count: u8, error: ImageError) -> bool {
369        // Retry if we have not the max retry count && if it's a glycin error.
370        // We assume that the download requests have already been retried by the client.
371        retries_count < MAX_REQUEST_RETRY_COUNT && error == ImageError::Unknown
372    }
373
374    /// Spawn this request.
375    fn spawn(&self) {
376        let inner = self.inner.clone();
377        let result_sender = self.result_sender.clone();
378        let retries_count = self.retries_count;
379        let task_handle = self.task_handle.clone();
380        let stalled_timeout_source = self.stalled_timeout_source.clone();
381
382        glib::MainContext::default().spawn(async move {
383            let task_handle_clone = task_handle.clone();
384
385            let abort_handle = spawn!(async move{
386                let request_id = inner.source.request_id();
387
388                let stalled_timeout_source_clone = stalled_timeout_source.clone();
389                let request_id_clone = request_id.clone();
390                let source = glib::timeout_add_once(STALLED_REQUEST_TIMEOUT, move || {
391                    // Drop the timeout source.
392                    let _ = stalled_timeout_source_clone.lock().map(|mut s| s.take());
393
394                    IMAGE_QUEUE.mark_as_stalled(request_id_clone.clone());
395                    debug!(
396                        "Request {request_id_clone} is taking longer than {} seconds, it is now marked as stalled",
397                        STALLED_REQUEST_TIMEOUT.as_secs()
398                    );
399                });
400                if let Ok(Some(source)) = stalled_timeout_source.lock().map(|mut s| s.replace(source)) {
401                    // This should not happen, but cancel the old timeout if we have one.
402                    source.remove();
403                }
404
405                let result = inner.await;
406
407                // Cancel the timeout.
408                if let Ok(Some(source)) = stalled_timeout_source.lock().map(|mut s| s.take()) {
409                    source.remove();
410                }
411
412                // Now that we have the result, do not offer to abort the task anymore.
413                let _ = task_handle_clone.lock().map(|mut s| s.take());
414
415                // If it is an error, maybe we can retry it.
416                if result
417                    .as_ref()
418                    .err()
419                    .is_some_and(|error| Self::can_retry(retries_count, *error))
420                {
421                    // Lower the limit of the queue, it is likely that glycin cannot spawn a sandbox.
422                    IMAGE_QUEUE.retry_request(&request_id, true);
423                    return;
424                }
425
426                // Send the result.
427                spawn_tokio!(async move {
428                    if let Err(error) = result_sender.send(result) {
429                        warn!("Could not send result of image request {request_id}: {error}");
430                    }
431
432                    IMAGE_QUEUE.remove_request(&request_id);
433                });
434            });
435
436
437            if let Ok(Some(handle)) = task_handle.lock().map(|mut s| s.replace(abort_handle)) {
438                // This should not happen, but cancel the old task if we have one.
439                handle.abort();
440            }
441        });
442    }
443}
444
445impl Drop for ImageRequest {
446    fn drop(&mut self) {
447        if let Ok(Some(source)) = self.stalled_timeout_source.lock().map(|mut s| s.take()) {
448            source.remove();
449        }
450        if let Ok(Some(handle)) = self.task_handle.lock().map(|mut s| s.take()) {
451            handle.abort();
452
453            // Broadcast that the request was aborted.
454            let request_id = self.inner.source.request_id();
455            debug!("Image request {request_id} was aborted");
456
457            let result_sender = self.result_sender.clone();
458            spawn_tokio!(async move {
459                if let Err(error) = result_sender.send(Err(ImageError::Aborted)) {
460                    warn!("Could not send aborted error for image request {request_id}: {error}");
461                }
462            });
463        }
464    }
465}
466
467/// A request to download an image.
468#[derive(Clone)]
469struct DownloadRequest {
470    /// The Matrix client to use to make the request.
471    client: Client,
472    /// The settings of the request.
473    settings: MediaRequestParameters,
474}
475
476impl IntoFuture for DownloadRequest {
477    type Output = Result<ImageDecoderSource, MediaFileError>;
478    type IntoFuture = BoxFuture<'static, Self::Output>;
479
480    fn into_future(self) -> Self::IntoFuture {
481        let Self {
482            client, settings, ..
483        } = self;
484
485        Box::pin(async move {
486            let media = client.media();
487            let data = spawn_tokio!(async move { media.get_media_content(&settings, true).await })
488                .await
489                .expect("task should not be aborted")
490                .map_err(MediaFileError::from)?;
491
492            let file = ImageDecoderSource::with_bytes(data).await?;
493
494            Ok(file)
495        })
496    }
497}
498
499/// A request to the image loader.
500#[derive(Clone)]
501struct ImageLoaderRequest {
502    /// The source of the image data.
503    source: ImageRequestSource,
504    /// The dimensions to request.
505    dimensions: Option<FrameDimensions>,
506}
507
508impl IntoFuture for ImageLoaderRequest {
509    type Output = Result<Image, ImageError>;
510    type IntoFuture = LocalBoxFuture<'static, Self::Output>;
511
512    fn into_future(self) -> Self::IntoFuture {
513        Box::pin(async move {
514            // Load the data from the source.
515            let source = self.source.try_into_decoder_source().await?;
516
517            // Decode the image from the data.
518            source.decode_image(self.dimensions).await
519        })
520    }
521}
522
523/// The source for an image request.
524#[derive(Clone)]
525enum ImageRequestSource {
526    /// The image must be downloaded from the media cache or the server.
527    Download(DownloadRequest),
528    /// The image is in the given file.
529    File(File),
530}
531
532impl ImageRequestSource {
533    /// The ID of the image request with this source.
534    fn request_id(&self) -> ImageRequestId {
535        match self {
536            Self::Download(download_request) => {
537                ImageRequestId::Download(download_request.settings.unique_key())
538            }
539            Self::File(file) => ImageRequestId::File(file.path().expect("file should have a path")),
540        }
541    }
542
543    /// Try to download the image, if necessary.
544    async fn try_into_decoder_source(self) -> Result<ImageDecoderSource, ImageError> {
545        match self {
546            Self::Download(download_request) => {
547                // Download the image.
548                Ok(download_request
549                    .await
550                    .inspect_err(|error| warn!("Could not retrieve image: {error}"))?)
551            }
552            Self::File(data) => Ok(data.into()),
553        }
554    }
555}
556
/// An identifier that is unique for each image request.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
enum ImageRequestId {
    /// The ID of a request to download an image.
    Download(String),
    /// The ID of a request to load an image from a file.
    File(PathBuf),
}

impl fmt::Display for ImageRequestId {
    /// Format the inner ID as-is; paths are converted lossily to a string
    /// first.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Download(id) => fmt::Display::fmt(id, f),
            Self::File(path) => fmt::Display::fmt(&path.to_string_lossy(), f),
        }
    }
}
574
/// How important an image request is.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub(crate) enum ImageRequestPriority {
    /// The request must be handled as soon as possible.
    ///
    /// Requests with this priority are spawned right away, even if the
    /// capacity of the pool is reached.
    ///
    /// This is meant for images presented in the image viewer, the user
    /// avatar in the account settings or the room avatar in the room details.
    High,
    /// The request has a regular importance.
    ///
    /// This is meant for images in messages in the room history, or in the
    /// media history.
    #[default]
    Default,
    /// The request can wait for the other ones.
    ///
    /// This is meant for avatars in the sidebar, the room history or the
    /// members list.
    Low,
}
598
599/// A handle for `await`ing an image request.
600pub(crate) struct ImageRequestHandle {
601    receiver: broadcast::Receiver<Result<Image, ImageError>>,
602}
603
604impl ImageRequestHandle {
605    /// Construct a new `ImageRequestHandle` with the given request ID.
606    fn new(receiver: broadcast::Receiver<Result<Image, ImageError>>) -> Self {
607        Self { receiver }
608    }
609}
610
611impl IntoFuture for ImageRequestHandle {
612    type Output = Result<Image, ImageError>;
613    type IntoFuture = BoxFuture<'static, Self::Output>;
614
615    fn into_future(self) -> Self::IntoFuture {
616        let mut receiver = self.receiver;
617        Box::pin(async move {
618            let handle = spawn_tokio!(async move { receiver.recv().await });
619            match handle.await.expect("task was not aborted") {
620                Ok(Ok(image)) => Ok(image),
621                Ok(err) => err,
622                Err(error) => {
623                    warn!("Could not load image: {error}");
624                    Err(ImageError::Unknown)
625                }
626            }
627        })
628    }
629}