1use std::time::Duration;
2
3use futures_util::{StreamExt, lock::Mutex};
4use gettextrs::gettext;
5use gtk::{gio, glib, glib::clone, prelude::*, subclass::prelude::*};
6use matrix_sdk::{
7 Client, SessionChange, config::SyncSettings, media::MediaRetentionPolicy, sync::SyncResponse,
8};
9use ruma::{
10 api::client::{
11 filter::{FilterDefinition, RoomFilter},
12 profile::{AvatarUrl, DisplayName},
13 search::search_events::v3::UserProfile,
14 },
15 assign,
16};
17use tokio::{task::AbortHandle, time::sleep};
18use tokio_stream::wrappers::BroadcastStream;
19use tracing::{debug, error, info};
20
21mod global_account_data;
22mod ignored_users;
23mod notifications;
24mod remote;
25mod room;
26mod room_list;
27mod security;
28mod session_settings;
29mod sidebar_data;
30mod user;
31mod user_sessions_list;
32mod verification;
33
34pub(crate) use self::{
35 global_account_data::*, ignored_users::*, notifications::*, remote::*, room::*, room_list::*,
36 security::*, session_settings::*, sidebar_data::*, user::*, user_sessions_list::*,
37 verification::*,
38};
39use crate::{
40 Application,
41 components::AvatarData,
42 prelude::*,
43 secret::StoredSession,
44 session_list::{SessionInfo, SessionInfoImpl},
45 spawn, spawn_tokio,
46 utils::{
47 TokioDrop,
48 matrix::{self, ClientSetupError},
49 },
50};
51
52const SESSION_PROFILE_KEY: &str = "session_profile";
54const MISSED_SYNC_OFFLINE_COUNT: usize = 2;
60const MISSED_SYNC_DELAYS: &[u64] = &[1, 5, 10, 20, 30];
63
64#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, glib::Enum)]
66#[repr(i32)]
67#[enum_type(name = "SessionState")]
68pub enum SessionState {
69 LoggedOut = -1,
70 #[default]
71 Init = 0,
72 InitialSync = 1,
73 Ready = 2,
74}
75
76mod imp {
77 use std::cell::{Cell, OnceCell, RefCell};
78
79 use super::*;
80
81 #[derive(Debug, Default, glib::Properties)]
82 #[properties(wrapper_type = super::Session)]
83 pub struct Session {
84 client: OnceCell<TokioDrop<Client>>,
86 #[property(get = Self::sidebar_list_model)]
88 sidebar_list_model: OnceCell<SidebarListModel>,
89 #[property(get = Self::user)]
91 user: OnceCell<User>,
92 #[property(get, builder(SessionState::default()))]
94 state: Cell<SessionState>,
95 #[property(get)]
97 is_homeserver_reachable: Cell<bool>,
98 #[property(get)]
100 is_offline: Cell<bool>,
101 #[property(get, construct_only)]
103 settings: OnceCell<SessionSettings>,
104 #[property(get = Self::global_account_data_owned)]
106 global_account_data: OnceCell<GlobalAccountData>,
107 #[property(get)]
109 notifications: Notifications,
110 #[property(get)]
112 ignored_users: IgnoredUsers,
113 #[property(get)]
115 user_sessions: UserSessionsList,
116 #[property(get)]
118 security: SessionSecurity,
119 remote_cache: OnceCell<RemoteCache>,
121 session_changes_handle: RefCell<Option<AbortHandle>>,
122 sync_handle: RefCell<Option<AbortHandle>>,
123 network_monitor_handler_id: RefCell<Option<glib::SignalHandlerId>>,
124 homeserver_reachable_lock: Mutex<()>,
125 homeserver_reachable_source: RefCell<Option<glib::SourceId>>,
126 missed_sync_count: Cell<usize>,
130 }
131
132 #[glib::object_subclass]
133 impl ObjectSubclass for Session {
134 const NAME: &'static str = "Session";
135 type Type = super::Session;
136 type ParentType = SessionInfo;
137 }
138
139 #[glib::derived_properties]
140 impl ObjectImpl for Session {
141 fn dispose(&self) {
142 if let Some(handler_id) = self.network_monitor_handler_id.take() {
144 gio::NetworkMonitor::default().disconnect(handler_id);
145 }
146
147 if let Some(source) = self.homeserver_reachable_source.take() {
148 source.remove();
149 }
150
151 if let Some(handle) = self.session_changes_handle.take() {
152 handle.abort();
153 }
154
155 if let Some(handle) = self.sync_handle.take() {
156 handle.abort();
157 }
158 }
159 }
160
161 impl SessionInfoImpl for Session {
162 fn avatar_data(&self) -> AvatarData {
163 self.user().avatar_data().clone()
164 }
165 }
166
167 impl Session {
168 pub(super) fn set_client(&self, client: Client) {
170 self.client
171 .set(TokioDrop::new(client))
172 .expect("client should be uninitialized");
173
174 let obj = self.obj();
175
176 self.ignored_users.set_session(Some(obj.clone()));
177 self.notifications.set_session(Some(obj.clone()));
178 self.user_sessions.init(&obj, obj.user_id().clone());
179
180 let monitor = gio::NetworkMonitor::default();
181 let handler_id = monitor.connect_network_changed(clone!(
182 #[weak(rename_to = imp)]
183 self,
184 move |_, _| {
185 spawn!(async move {
186 imp.update_homeserver_reachable().await;
187 });
188 }
189 ));
190 self.network_monitor_handler_id.replace(Some(handler_id));
191 }
192
193 pub(super) fn client(&self) -> &Client {
195 self.client.get().expect("client should be initialized")
196 }
197
198 fn sidebar_list_model(&self) -> SidebarListModel {
200 self.sidebar_list_model
201 .get_or_init(|| {
202 let obj = self.obj();
203 let item_list =
204 SidebarItemList::new(&RoomList::new(&obj), &VerificationList::new(&obj));
205 SidebarListModel::new(&item_list)
206 })
207 .clone()
208 }
209
210 pub(super) fn room_list(&self) -> RoomList {
212 self.sidebar_list_model().item_list().room_list()
213 }
214
215 pub(super) fn verification_list(&self) -> VerificationList {
217 self.sidebar_list_model().item_list().verification_list()
218 }
219
220 fn user(&self) -> User {
222 self.user
223 .get_or_init(|| {
224 let obj = self.obj();
225 User::new(&obj, obj.info().user_id.clone())
226 })
227 .clone()
228 }
229
230 fn set_state(&self, state: SessionState) {
232 let old_state = self.state.get();
233
234 if old_state == SessionState::LoggedOut || old_state == state {
235 return;
238 }
239
240 self.state.set(state);
241 self.obj().notify_state();
242 }
243
244 fn homeserver_address(&self) -> gio::NetworkAddress {
246 let obj = self.obj();
247 let homeserver = obj.homeserver();
248 let default_port = if homeserver.scheme() == "http" {
249 80
250 } else {
251 443
252 };
253
254 gio::NetworkAddress::parse_uri(homeserver.as_str(), default_port)
255 .expect("url is parsed successfully")
256 }
257
258 pub(super) async fn update_homeserver_reachable(&self) {
260 if let Some(source) = self.homeserver_reachable_source.take() {
262 source.remove();
263 }
264 let Some(_guard) = self.homeserver_reachable_lock.try_lock() else {
265 return;
267 };
268
269 let monitor = gio::NetworkMonitor::default();
270 let is_network_available = monitor.is_network_available();
271
272 let is_homeserver_reachable = if is_network_available {
273 let address = self.homeserver_address();
275
276 match monitor.can_reach_future(&address).await {
277 Ok(()) => true,
278 Err(error) => {
279 error!(
280 session = self.obj().session_id(),
281 "Homeserver is not reachable: {error}"
282 );
283 false
284 }
285 }
286 } else {
287 false
288 };
289
290 self.set_is_homeserver_reachable(is_homeserver_reachable);
291
292 if is_network_available && !is_homeserver_reachable {
293 let source = glib::timeout_add_seconds_local_once(
295 10,
296 clone!(
297 #[weak(rename_to = imp)]
298 self,
299 move || {
300 imp.homeserver_reachable_source.take();
301
302 spawn!(async move {
303 imp.update_homeserver_reachable().await;
304 });
305 }
306 ),
307 );
308 self.homeserver_reachable_source.replace(Some(source));
309 }
310 }
311
312 fn set_is_homeserver_reachable(&self, is_reachable: bool) {
314 if self.is_homeserver_reachable.get() == is_reachable {
315 return;
316 }
317 let obj = self.obj();
318
319 self.is_homeserver_reachable.set(is_reachable);
320
321 if let Some(handle) = self.sync_handle.take() {
322 handle.abort();
323 }
324
325 if is_reachable {
326 info!(session = obj.session_id(), "Homeserver is reachable");
327
328 self.sync();
330 } else {
331 self.set_offline(true);
332 }
333
334 obj.notify_is_homeserver_reachable();
335 }
336
337 pub(super) fn set_offline(&self, is_offline: bool) {
339 if self.is_offline.get() == is_offline {
340 return;
341 }
342
343 if !is_offline {
344 let client = self.client().clone();
346 spawn_tokio!(async move {
347 client.send_queue().set_enabled(true).await;
348 });
349 }
350
351 self.is_offline.set(is_offline);
352 self.obj().notify_is_offline();
353 }
354
355 fn global_account_data(&self) -> &GlobalAccountData {
357 self.global_account_data
358 .get_or_init(|| GlobalAccountData::new(&self.obj()))
359 }
360
361 fn global_account_data_owned(&self) -> GlobalAccountData {
364 self.global_account_data().clone()
365 }
366
367 pub(super) fn remote_cache(&self) -> &RemoteCache {
369 self.remote_cache
370 .get_or_init(|| RemoteCache::new(self.obj().clone()))
371 }
372
373 pub(super) async fn prepare(&self) {
375 spawn!(
376 glib::Priority::LOW,
377 clone!(
378 #[weak(rename_to = imp)]
379 self,
380 async move {
381 imp.init_user_profile().await;
383 imp.update_user_profile().await;
385 }
386 )
387 );
388
389 self.global_account_data();
390 self.watch_session_changes();
391 self.update_homeserver_reachable().await;
392
393 self.room_list().load().await;
394 self.verification_list().init();
395 self.security.set_session(Some(&*self.obj()));
396
397 let client = self.client().clone();
398 spawn_tokio!(async move {
399 client
400 .send_queue()
401 .respawn_tasks_for_rooms_with_unsent_requests()
402 .await;
403 });
404
405 self.set_state(SessionState::InitialSync);
406 self.sync();
407
408 debug!(
409 session = self.obj().session_id(),
410 "A new session was prepared"
411 );
412 }
413
414 fn watch_session_changes(&self) {
417 let receiver = self.client().subscribe_to_session_changes();
418 let stream = BroadcastStream::new(receiver);
419
420 let obj_weak = glib::SendWeakRef::from(self.obj().downgrade());
421 let fut = stream.for_each(move |change| {
422 let obj_weak = obj_weak.clone();
423 async move {
424 let Ok(change) = change else {
425 return;
426 };
427
428 let ctx = glib::MainContext::default();
429 ctx.spawn(async move {
430 spawn!(async move {
431 if let Some(obj) = obj_weak.upgrade() {
432 match change {
433 SessionChange::UnknownToken { .. } => {
434 info!(
435 session = obj.session_id(),
436 "The access token is invalid, cleaning up the session…"
437 );
438 obj.imp().clean_up().await;
439 }
440 SessionChange::TokensRefreshed => {
441 obj.imp().store_tokens().await;
442 }
443 }
444 }
445 });
446 });
447 }
448 });
449
450 let handle = spawn_tokio!(fut).abort_handle();
451 self.session_changes_handle.replace(Some(handle));
452 }
453
454 fn sync(&self) {
456 if self.state.get() < SessionState::InitialSync || !self.is_homeserver_reachable.get() {
457 return;
458 }
459
460 let client = self.client().clone();
461 let obj_weak = glib::SendWeakRef::from(self.obj().downgrade());
462
463 let handle = spawn_tokio!(async move {
464 if let Err(error) = client.event_cache().subscribe() {
467 error!("Could not subscribe event cache to sync responses: {error}");
468 }
469
470 let filter = assign!(FilterDefinition::default(), {
472 room: assign!(RoomFilter::with_lazy_loading(), {
473 include_leave: true,
474 }),
475 });
476
477 let sync_settings = SyncSettings::new()
478 .timeout(Duration::from_secs(30))
479 .ignore_timeout_on_first_sync(true)
480 .filter(filter.into());
481
482 let mut sync_stream = Box::pin(client.sync_stream(sync_settings).await);
483 while let Some(response) = sync_stream.next().await {
484 let obj_weak = obj_weak.clone();
485 let ctx = glib::MainContext::default();
486 let delay = ctx
487 .spawn(async move {
488 spawn!(async move {
489 if let Some(obj) = obj_weak.upgrade() {
490 obj.imp().handle_sync_response(response)
491 } else {
492 None
493 }
494 })
495 .await
496 .expect("task was not aborted")
497 })
498 .await
499 .expect("task was not aborted");
500
501 if let Some(delay) = delay {
502 sleep(delay).await;
503 }
504 }
505 })
506 .abort_handle();
507
508 self.sync_handle.replace(Some(handle));
509 }
510
511 fn handle_sync_response(
516 &self,
517 response: Result<SyncResponse, matrix_sdk::Error>,
518 ) -> Option<Duration> {
519 let obj = self.obj();
520 let session_id = obj.session_id();
521 debug!(session = session_id, "Received sync response");
522
523 match response {
524 Ok(response) => {
525 self.room_list().handle_room_updates(response.rooms);
526
527 if self.state.get() < SessionState::Ready {
528 self.set_state(SessionState::Ready);
529 self.init_notifications();
530 }
531
532 self.set_offline(false);
533 self.missed_sync_count.set(0);
534
535 None
536 }
537 Err(error) => {
538 let missed_sync_count = self.missed_sync_count.get();
539
540 if missed_sync_count == MISSED_SYNC_OFFLINE_COUNT {
542 self.set_offline(true);
543 }
544
545 if missed_sync_count < 4 {
547 self.missed_sync_count.set(missed_sync_count + 1);
548 }
549
550 error!(session = session_id, "Could not perform sync: {error}");
551
552 let delay = MISSED_SYNC_DELAYS[missed_sync_count];
554 Some(Duration::from_secs(delay))
555 }
556 }
557 }
558
559 async fn init_user_profile(&self) {
561 let client = self.client().clone();
562 let handle = spawn_tokio!(async move {
563 client
564 .state_store()
565 .get_custom_value(SESSION_PROFILE_KEY.as_bytes())
566 .await
567 });
568
569 let profile = match handle.await.expect("task was not aborted") {
570 Ok(Some(bytes)) => match serde_json::from_slice::<UserProfile>(&bytes) {
571 Ok(profile) => profile,
572 Err(error) => {
573 error!(
574 session = self.obj().session_id(),
575 "Could not deserialize session profile: {error}"
576 );
577 return;
578 }
579 },
580 Ok(None) => return,
581 Err(error) => {
582 error!(
583 session = self.obj().session_id(),
584 "Could not load cached session profile: {error}"
585 );
586 return;
587 }
588 };
589
590 let user = self.user();
591 user.set_name(profile.displayname);
592 user.set_avatar_url(profile.avatar_url);
593 }
594
595 async fn update_user_profile(&self) {
599 let client = self.client().clone();
600 let client_clone = client.clone();
601 let handle =
602 spawn_tokio!(async move { client_clone.account().fetch_user_profile().await });
603
604 let profile = match handle
605 .await
606 .expect("task was not aborted")
607 .and_then(|response| {
608 let mut profile = UserProfile::new();
609 profile.displayname = response.get_static::<DisplayName>()?;
610 profile.avatar_url = response.get_static::<AvatarUrl>()?;
611
612 Ok(profile)
613 }) {
614 Ok(profile) => profile,
615 Err(error) => {
616 error!(
617 session = self.obj().session_id(),
618 "Could not fetch session profile: {error}"
619 );
620 return;
621 }
622 };
623
624 let user = self.user();
625
626 if Some(user.display_name()) == profile.displayname
627 && user
628 .avatar_data()
629 .image()
630 .is_some_and(|i| i.uri() == profile.avatar_url)
631 {
632 return;
634 }
635
636 let value = serde_json::to_vec(&profile);
638
639 user.set_name(profile.displayname);
641 user.set_avatar_url(profile.avatar_url);
642
643 let value = match value {
645 Ok(value) => value,
646 Err(error) => {
647 error!(
648 session = self.obj().session_id(),
649 "Could not serialize session profile: {error}"
650 );
651 return;
652 }
653 };
654
655 let handle = spawn_tokio!(async move {
656 client
657 .state_store()
658 .set_custom_value(SESSION_PROFILE_KEY.as_bytes(), value)
659 .await
660 });
661
662 if let Err(error) = handle.await.expect("task was not aborted") {
663 error!(
664 session = self.obj().session_id(),
665 "Could not cache session profile: {error}"
666 );
667 }
668 }
669
670 fn init_notifications(&self) {
672 let obj_weak = glib::SendWeakRef::from(self.obj().downgrade());
673 let client = self.client().clone();
674
675 spawn_tokio!(async move {
676 client
677 .register_notification_handler(move |notification, room, _| {
678 let obj_weak = obj_weak.clone();
679 async move {
680 let ctx = glib::MainContext::default();
681 ctx.spawn(async move {
682 spawn!(async move {
683 if let Some(obj) = obj_weak.upgrade() {
684 obj.notifications().show_push(notification, room).await;
685 }
686 });
687 });
688 }
689 })
690 .await;
691 });
692 }
693
694 async fn store_tokens(&self) {
696 let Some(session_tokens) = self.client().session_tokens() else {
697 return;
698 };
699
700 debug!(
701 session = self.obj().session_id(),
702 "Storing updated session tokens…"
703 );
704 self.obj().info().store_tokens(session_tokens).await;
705 }
706
707 pub(super) async fn clean_up(&self) {
712 let obj = self.obj();
713 self.set_state(SessionState::LoggedOut);
714
715 if let Some(handle) = self.sync_handle.take() {
716 handle.abort();
717 }
718
719 if let Some(settings) = self.settings.get() {
720 settings.delete();
721 }
722
723 obj.info().clone().delete().await;
724
725 self.notifications.clear();
726
727 debug!(
728 session = obj.session_id(),
729 "The logged out session was cleaned up"
730 );
731 }
732 }
733}
734
735glib::wrapper! {
736 pub struct Session(ObjectSubclass<imp::Session>)
738 @extends SessionInfo;
739}
740
741impl Session {
742 pub(crate) async fn new(
744 stored_session: StoredSession,
745 settings: SessionSettings,
746 ) -> Result<Self, ClientSetupError> {
747 let tokens = stored_session
748 .load_tokens()
749 .await
750 .ok_or(ClientSetupError::NoSessionTokens)?;
751
752 let stored_session_clone = stored_session.clone();
753 let client = spawn_tokio!(async move {
754 let client = matrix::client_with_stored_session(stored_session_clone, tokens).await?;
755
756 let media = client.media();
758 let used_media_retention_policy = media.media_retention_policy().await?;
759 let wanted_media_retention_policy = MediaRetentionPolicy::default();
760
761 if used_media_retention_policy != wanted_media_retention_policy {
762 media
763 .set_media_retention_policy(wanted_media_retention_policy)
764 .await?;
765 }
766
767 Ok::<_, ClientSetupError>(client)
768 })
769 .await
770 .expect("task was not aborted")?;
771
772 let obj = glib::Object::builder::<Self>()
773 .property("info", stored_session)
774 .property("settings", settings)
775 .build();
776 obj.imp().set_client(client);
777
778 Ok(obj)
779 }
780
781 pub(crate) async fn create(client: &Client) -> Result<Self, ClientSetupError> {
783 let stored_session = StoredSession::new(client).await?;
784 let settings = Application::default()
785 .session_list()
786 .settings()
787 .get_or_create(&stored_session.id);
788
789 Self::new(stored_session, settings).await
790 }
791
792 pub(crate) async fn prepare(&self) {
794 self.imp().prepare().await;
795 }
796
797 pub(crate) fn room_list(&self) -> RoomList {
799 self.imp().room_list()
800 }
801
802 pub(crate) fn verification_list(&self) -> VerificationList {
804 self.imp().verification_list()
805 }
806
807 pub(crate) fn client(&self) -> Client {
809 self.imp().client().clone()
810 }
811
812 pub(crate) fn remote_cache(&self) -> &RemoteCache {
814 self.imp().remote_cache()
815 }
816
817 pub(crate) async fn log_out(&self) -> Result<(), String> {
819 debug!(
820 session = self.session_id(),
821 "The session is about to be logged out"
822 );
823
824 let client = self.client();
825 let handle = spawn_tokio!(async move { client.logout().await });
826
827 match handle.await.expect("task was not aborted") {
828 Ok(()) => {
829 self.imp().clean_up().await;
830 Ok(())
831 }
832 Err(error) => {
833 error!(
834 session = self.session_id(),
835 "Could not log the session out: {error}"
836 );
837 Err(gettext("Could not log the session out"))
838 }
839 }
840 }
841
842 pub(crate) async fn clean_up(&self) {
847 self.imp().clean_up().await;
848 }
849
850 pub(crate) fn connect_logged_out<F: Fn(&Self) + 'static>(&self, f: F) -> glib::SignalHandlerId {
852 self.connect_state_notify(move |obj| {
853 if obj.state() == SessionState::LoggedOut {
854 f(obj);
855 }
856 })
857 }
858
859 pub(crate) fn connect_ready<F: Fn(&Self) + 'static>(&self, f: F) -> glib::SignalHandlerId {
861 self.connect_state_notify(move |obj| {
862 if obj.state() == SessionState::Ready {
863 f(obj);
864 }
865 })
866 }
867}