fractal/utils/media/image/
queue.rs1use std::{
2 collections::{HashMap, HashSet, VecDeque},
3 fmt,
4 future::IntoFuture,
5 path::PathBuf,
6 sync::{Arc, LazyLock, Mutex, MutexGuard},
7 time::Duration,
8};
9
10use futures_util::future::{BoxFuture, LocalBoxFuture};
11use gtk::glib;
12use matrix_sdk::{
13 Client,
14 media::{MediaRequestParameters, UniqueKey},
15};
16use tokio::sync::broadcast;
17use tracing::{debug, warn};
18
19use super::{Image, ImageDecoderSource, ImageError};
20use crate::{
21 spawn, spawn_tokio,
22 utils::{
23 File,
24 media::{FrameDimensions, MediaFileError},
25 },
26};
27
28pub(crate) static IMAGE_QUEUE: LazyLock<ImageRequestQueue> = LazyLock::new(ImageRequestQueue::new);
30
31const DEFAULT_QUEUE_LIMIT: usize = 20;
34const MAX_REQUEST_RETRY_COUNT: u8 = 2;
36const STALLED_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
39
40pub(crate) struct ImageRequestQueue {
54 inner: Arc<Mutex<ImageRequestQueueInner>>,
55}
56
57struct ImageRequestQueueInner {
58 limit: usize,
62 requests: HashMap<ImageRequestId, ImageRequest>,
64 ongoing: HashSet<ImageRequestId>,
66 stalled: HashSet<ImageRequestId>,
68 queue_default: VecDeque<ImageRequestId>,
70 queue_low: VecDeque<ImageRequestId>,
72}
73
74impl ImageRequestQueue {
75 fn new() -> Self {
77 Self {
78 inner: Mutex::new(ImageRequestQueueInner {
79 limit: DEFAULT_QUEUE_LIMIT,
80 requests: Default::default(),
81 ongoing: Default::default(),
82 stalled: Default::default(),
83 queue_default: Default::default(),
84 queue_low: Default::default(),
85 })
86 .into(),
87 }
88 }
89
90 fn inner(&self) -> MutexGuard<'_, ImageRequestQueueInner> {
92 self.inner.lock().expect("Mutex should not be poisoned")
93 }
94
95 pub(crate) fn add_download_request(
100 &self,
101 client: Client,
102 settings: MediaRequestParameters,
103 dimensions: Option<FrameDimensions>,
104 priority: ImageRequestPriority,
105 ) -> ImageRequestHandle {
106 self.inner()
107 .add_download_request(client, settings, dimensions, priority)
108 }
109
110 pub(crate) fn add_file_request(
115 &self,
116 file: File,
117 dimensions: Option<FrameDimensions>,
118 ) -> ImageRequestHandle {
119 self.inner().add_file_request(file, dimensions)
120 }
121
122 fn mark_as_stalled(&self, request_id: ImageRequestId) {
124 self.inner().mark_as_stalled(request_id);
125 }
126
127 fn retry_request(&self, request_id: &ImageRequestId, lower_limit: bool) {
131 self.inner().retry_request(request_id, lower_limit);
132 }
133
134 fn remove_request(&self, request_id: &ImageRequestId) {
136 self.inner().remove_request(request_id);
137 }
138}
139
140impl ImageRequestQueueInner {
141 fn is_limit_reached(&self) -> bool {
143 self.ongoing.len() >= self.limit
144 }
145
146 fn queue_request(&mut self, request_id: ImageRequestId, request: ImageRequest) {
148 let is_limit_reached = self.is_limit_reached();
149 if !is_limit_reached || request.priority == ImageRequestPriority::High {
150 self.ongoing.insert(request_id.clone());
152 request.spawn();
153 } else {
154 let queue = if request.priority == ImageRequestPriority::Default {
156 &mut self.queue_default
157 } else {
158 &mut self.queue_low
159 };
160
161 queue.push_back(request_id.clone());
162 }
163 self.requests.insert(request_id, request);
164 }
165
166 fn add_request(
171 &mut self,
172 inner: ImageLoaderRequest,
173 priority: ImageRequestPriority,
174 ) -> ImageRequestHandle {
175 let request_id = inner.source.request_id();
176
177 if let Some(request) = self.requests.get(&request_id) {
179 let result_receiver = request.result_sender.subscribe();
180 return ImageRequestHandle::new(result_receiver);
181 }
182
183 let (request, result_receiver) = ImageRequest::new(inner, priority);
185
186 self.queue_request(request_id.clone(), request);
187
188 ImageRequestHandle::new(result_receiver)
189 }
190
191 fn add_download_request(
196 &mut self,
197 client: Client,
198 settings: MediaRequestParameters,
199 dimensions: Option<FrameDimensions>,
200 priority: ImageRequestPriority,
201 ) -> ImageRequestHandle {
202 self.add_request(
203 ImageLoaderRequest {
204 source: ImageRequestSource::Download(DownloadRequest { client, settings }),
205 dimensions,
206 },
207 priority,
208 )
209 }
210
211 fn add_file_request(
216 &mut self,
217 file: File,
218 dimensions: Option<FrameDimensions>,
219 ) -> ImageRequestHandle {
220 self.add_request(
223 ImageLoaderRequest {
224 source: ImageRequestSource::File(file),
225 dimensions,
226 },
227 ImageRequestPriority::High,
228 )
229 }
230
231 fn mark_as_stalled(&mut self, request_id: ImageRequestId) {
233 self.ongoing.remove(&request_id);
234 self.stalled.insert(request_id);
235
236 self.spawn_next();
237 }
238
239 fn retry_request(&mut self, request_id: &ImageRequestId, lower_limit: bool) {
243 self.ongoing.remove(request_id);
244
245 if lower_limit {
246 self.limit = 1;
248 }
249
250 let is_limit_reached = self.is_limit_reached();
251
252 match self.requests.get_mut(request_id) {
253 Some(request) => {
254 request.retries_count += 1;
255
256 let can_spawn_request = if request.priority == ImageRequestPriority::High {
259 true
260 } else {
261 !is_limit_reached
262 && self.queue_default.is_empty()
263 && (request.priority == ImageRequestPriority::Default
264 || self.queue_low.is_empty())
265 };
266
267 if can_spawn_request {
268 self.ongoing.insert(request_id.clone());
270 request.spawn();
271 } else {
272 let queue = if request.priority == ImageRequestPriority::Default {
274 &mut self.queue_default
275 } else {
276 &mut self.queue_low
277 };
278
279 queue.push_back(request_id.clone());
280 }
281 }
282 None => {
283 warn!("Could not find request {request_id} to retry");
285 }
286 }
287
288 self.spawn_next();
289 }
290
291 fn remove_request(&mut self, request_id: &ImageRequestId) {
293 self.ongoing.remove(request_id);
294 self.stalled.remove(request_id);
295 self.queue_default.retain(|id| id != request_id);
296 self.queue_low.retain(|id| id != request_id);
297 self.requests.remove(request_id);
298
299 self.spawn_next();
300 }
301
302 fn spawn_next(&mut self) {
304 while !self.is_limit_reached() {
305 let Some(request_id) = self
306 .queue_default
307 .pop_front()
308 .or_else(|| self.queue_low.pop_front())
309 else {
310 return;
312 };
313 let Some(request) = self.requests.get(&request_id) else {
314 warn!("Missing image request {request_id}");
316 continue;
317 };
318
319 self.ongoing.insert(request_id.clone());
320 request.spawn();
321 }
322
323 if self.ongoing.is_empty() {
325 self.limit = DEFAULT_QUEUE_LIMIT;
326 }
327 }
328}
329
330struct ImageRequest {
332 inner: ImageLoaderRequest,
334 priority: ImageRequestPriority,
336 result_sender: broadcast::Sender<Result<Image, ImageError>>,
338 retries_count: u8,
340 task_handle: Arc<Mutex<Option<glib::JoinHandle<()>>>>,
342 stalled_timeout_source: Arc<Mutex<Option<glib::SourceId>>>,
344}
345
346impl ImageRequest {
347 fn new(
349 inner: ImageLoaderRequest,
350 priority: ImageRequestPriority,
351 ) -> (Self, broadcast::Receiver<Result<Image, ImageError>>) {
352 let (result_sender, result_receiver) = broadcast::channel(1);
353 (
354 Self {
355 inner,
356 priority,
357 result_sender,
358 retries_count: 0,
359 task_handle: Default::default(),
360 stalled_timeout_source: Default::default(),
361 },
362 result_receiver,
363 )
364 }
365
366 fn can_retry(retries_count: u8, error: ImageError) -> bool {
369 retries_count < MAX_REQUEST_RETRY_COUNT && error == ImageError::Unknown
372 }
373
374 fn spawn(&self) {
376 let inner = self.inner.clone();
377 let result_sender = self.result_sender.clone();
378 let retries_count = self.retries_count;
379 let task_handle = self.task_handle.clone();
380 let stalled_timeout_source = self.stalled_timeout_source.clone();
381
382 glib::MainContext::default().spawn(async move {
383 let task_handle_clone = task_handle.clone();
384
385 let abort_handle = spawn!(async move{
386 let request_id = inner.source.request_id();
387
388 let stalled_timeout_source_clone = stalled_timeout_source.clone();
389 let request_id_clone = request_id.clone();
390 let source = glib::timeout_add_once(STALLED_REQUEST_TIMEOUT, move || {
391 let _ = stalled_timeout_source_clone.lock().map(|mut s| s.take());
393
394 IMAGE_QUEUE.mark_as_stalled(request_id_clone.clone());
395 debug!(
396 "Request {request_id_clone} is taking longer than {} seconds, it is now marked as stalled",
397 STALLED_REQUEST_TIMEOUT.as_secs()
398 );
399 });
400 if let Ok(Some(source)) = stalled_timeout_source.lock().map(|mut s| s.replace(source)) {
401 source.remove();
403 }
404
405 let result = inner.await;
406
407 if let Ok(Some(source)) = stalled_timeout_source.lock().map(|mut s| s.take()) {
409 source.remove();
410 }
411
412 let _ = task_handle_clone.lock().map(|mut s| s.take());
414
415 if result
417 .as_ref()
418 .err()
419 .is_some_and(|error| Self::can_retry(retries_count, *error))
420 {
421 IMAGE_QUEUE.retry_request(&request_id, true);
423 return;
424 }
425
426 spawn_tokio!(async move {
428 if let Err(error) = result_sender.send(result) {
429 warn!("Could not send result of image request {request_id}: {error}");
430 }
431
432 IMAGE_QUEUE.remove_request(&request_id);
433 });
434 });
435
436
437 if let Ok(Some(handle)) = task_handle.lock().map(|mut s| s.replace(abort_handle)) {
438 handle.abort();
440 }
441 });
442 }
443}
444
445impl Drop for ImageRequest {
446 fn drop(&mut self) {
447 if let Ok(Some(source)) = self.stalled_timeout_source.lock().map(|mut s| s.take()) {
448 source.remove();
449 }
450 if let Ok(Some(handle)) = self.task_handle.lock().map(|mut s| s.take()) {
451 handle.abort();
452
453 let request_id = self.inner.source.request_id();
455 debug!("Image request {request_id} was aborted");
456
457 let result_sender = self.result_sender.clone();
458 spawn_tokio!(async move {
459 if let Err(error) = result_sender.send(Err(ImageError::Aborted)) {
460 warn!("Could not send aborted error for image request {request_id}: {error}");
461 }
462 });
463 }
464 }
465}
466
467#[derive(Clone)]
469struct DownloadRequest {
470 client: Client,
472 settings: MediaRequestParameters,
474}
475
476impl IntoFuture for DownloadRequest {
477 type Output = Result<ImageDecoderSource, MediaFileError>;
478 type IntoFuture = BoxFuture<'static, Self::Output>;
479
480 fn into_future(self) -> Self::IntoFuture {
481 let Self {
482 client, settings, ..
483 } = self;
484
485 Box::pin(async move {
486 let media = client.media();
487 let data = spawn_tokio!(async move { media.get_media_content(&settings, true).await })
488 .await
489 .expect("task should not be aborted")
490 .map_err(MediaFileError::from)?;
491
492 let file = ImageDecoderSource::with_bytes(data).await?;
493
494 Ok(file)
495 })
496 }
497}
498
499#[derive(Clone)]
501struct ImageLoaderRequest {
502 source: ImageRequestSource,
504 dimensions: Option<FrameDimensions>,
506}
507
508impl IntoFuture for ImageLoaderRequest {
509 type Output = Result<Image, ImageError>;
510 type IntoFuture = LocalBoxFuture<'static, Self::Output>;
511
512 fn into_future(self) -> Self::IntoFuture {
513 Box::pin(async move {
514 let source = self.source.try_into_decoder_source().await?;
516
517 source.decode_image(self.dimensions).await
519 })
520 }
521}
522
523#[derive(Clone)]
525enum ImageRequestSource {
526 Download(DownloadRequest),
528 File(File),
530}
531
532impl ImageRequestSource {
533 fn request_id(&self) -> ImageRequestId {
535 match self {
536 Self::Download(download_request) => {
537 ImageRequestId::Download(download_request.settings.unique_key())
538 }
539 Self::File(file) => ImageRequestId::File(file.path().expect("file should have a path")),
540 }
541 }
542
543 async fn try_into_decoder_source(self) -> Result<ImageDecoderSource, ImageError> {
545 match self {
546 Self::Download(download_request) => {
547 Ok(download_request
549 .await
550 .inspect_err(|error| warn!("Could not retrieve image: {error}"))?)
551 }
552 Self::File(data) => Ok(data.into()),
553 }
554 }
555}
556
557#[derive(Debug, Clone, Hash, PartialEq, Eq)]
559enum ImageRequestId {
560 Download(String),
562 File(PathBuf),
564}
565
566impl fmt::Display for ImageRequestId {
567 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
568 match self {
569 Self::Download(id) => id.fmt(f),
570 Self::File(path) => path.to_string_lossy().fmt(f),
571 }
572 }
573}
574
575#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
577pub(crate) enum ImageRequestPriority {
578 High,
586 #[default]
591 Default,
592 Low,
597}
598
599pub(crate) struct ImageRequestHandle {
601 receiver: broadcast::Receiver<Result<Image, ImageError>>,
602}
603
604impl ImageRequestHandle {
605 fn new(receiver: broadcast::Receiver<Result<Image, ImageError>>) -> Self {
607 Self { receiver }
608 }
609}
610
611impl IntoFuture for ImageRequestHandle {
612 type Output = Result<Image, ImageError>;
613 type IntoFuture = BoxFuture<'static, Self::Output>;
614
615 fn into_future(self) -> Self::IntoFuture {
616 let mut receiver = self.receiver;
617 Box::pin(async move {
618 let handle = spawn_tokio!(async move { receiver.recv().await });
619 match handle.await.expect("task was not aborted") {
620 Ok(Ok(image)) => Ok(image),
621 Ok(err) => err,
622 Err(error) => {
623 warn!("Could not load image: {error}");
624 Err(ImageError::Unknown)
625 }
626 }
627 })
628 }
629}