1#[cfg(feature = "e2e-encryption")]
17use std::sync::Arc;
18use std::{
19 collections::{BTreeMap, BTreeSet, HashMap},
20 fmt, iter,
21 ops::Deref,
22};
23
24use eyeball::{SharedObservable, Subscriber};
25use eyeball_im::{Vector, VectorDiff};
26use futures_util::Stream;
27#[cfg(feature = "e2e-encryption")]
28use matrix_sdk_crypto::{
29 store::DynCryptoStore, types::requests::ToDeviceRequest, CollectStrategy, DecryptionSettings,
30 EncryptionSettings, EncryptionSyncChanges, OlmError, OlmMachine, RoomEventDecryptionResult,
31 TrustRequirement,
32};
33#[cfg(feature = "e2e-encryption")]
34use ruma::events::{
35 room::{history_visibility::HistoryVisibility, message::MessageType},
36 SyncMessageLikeEvent,
37};
38#[cfg(doc)]
39use ruma::DeviceId;
40use ruma::{
41 api::client::{self as api, sync::sync_events::v5},
42 events::{
43 ignored_user_list::IgnoredUserListEvent,
44 marked_unread::MarkedUnreadEventContent,
45 push_rules::{PushRulesEvent, PushRulesEventContent},
46 room::{
47 member::{MembershipState, RoomMemberEventContent, SyncRoomMemberEvent},
48 power_levels::{
49 RoomPowerLevelsEvent, RoomPowerLevelsEventContent, StrippedRoomPowerLevelsEvent,
50 },
51 },
52 AnyRoomAccountDataEvent, AnyStrippedStateEvent, AnySyncEphemeralRoomEvent,
53 AnySyncMessageLikeEvent, AnySyncStateEvent, AnySyncTimelineEvent,
54 GlobalAccountDataEventType, StateEvent, StateEventType, SyncStateEvent,
55 },
56 push::{Action, PushConditionRoomCtx, Ruleset},
57 serde::Raw,
58 time::Instant,
59 OwnedRoomId, OwnedUserId, RoomId, RoomVersionId, UInt, UserId,
60};
61use tokio::sync::{broadcast, Mutex};
62#[cfg(feature = "e2e-encryption")]
63use tokio::sync::{RwLock, RwLockReadGuard};
64use tracing::{debug, error, info, instrument, trace, warn};
65
66#[cfg(feature = "e2e-encryption")]
67use crate::latest_event::{is_suitable_for_latest_event, LatestEvent, PossibleLatestEvent};
68#[cfg(feature = "e2e-encryption")]
69use crate::RoomMemberships;
70use crate::{
71 deserialized_responses::{DisplayName, RawAnySyncOrStrippedTimelineEvent, TimelineEvent},
72 error::{Error, Result},
73 event_cache::store::EventCacheStoreLock,
74 response_processors::AccountDataProcessor,
75 rooms::{
76 normal::{RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons, RoomMembersUpdate},
77 Room, RoomInfo, RoomState,
78 },
79 store::{
80 ambiguity_map::AmbiguityCache, BaseStateStore, DynStateStore, MemoryStore,
81 Result as StoreResult, StateChanges, StateStoreDataKey, StateStoreDataValue, StateStoreExt,
82 StoreConfig,
83 },
84 sync::{JoinedRoomUpdate, LeftRoomUpdate, Notification, RoomUpdates, SyncResponse, Timeline},
85 RoomStateFilter, SessionMeta,
86};
87
/// Core client state: the stores plus the settings that the sync- and
/// room-handling methods of this type operate on.
///
/// `Clone` is derived; fields wrapped in `Arc` (the crypto store and the
/// `OlmMachine` slot) are therefore shared between clones.
#[derive(Clone)]
pub struct BaseClient {
    /// State store wrapper. Rooms, the session metadata and the sync token
    /// are all reached through it.
    pub(crate) state_store: BaseStateStore,

    /// Event cache store handle, exposed via [`BaseClient::event_cache_store`].
    event_cache_store: EventCacheStoreLock,

    /// Crypto store handed to `OlmMachine::with_store` each time the machine
    /// is (re)generated.
    #[cfg(feature = "e2e-encryption")]
    crypto_store: Arc<DynCryptoStore>,

    /// The `OlmMachine`, if any. Starts out as `None` and is filled in by
    /// [`BaseClient::regenerate_olm`].
    #[cfg(feature = "e2e-encryption")]
    olm_machine: Arc<RwLock<Option<OlmMachine>>>,

    /// Observable list of user ID strings.
    /// NOTE(review): presumably updated when the ignored-user list changes;
    /// the writer is not visible in this part of the file.
    pub(crate) ignore_user_list_changes: SharedObservable<Vec<String>>,

    /// Sending half of the `RoomInfoNotableUpdate` broadcast channel. A clone
    /// is passed to the state store whenever a room is fetched or created.
    pub(crate) room_info_notable_update_sender: broadcast::Sender<RoomInfoNotableUpdate>,

    /// Strategy used to collect room-key recipients.
    /// NOTE(review): not read in this part of the file; semantics inferred
    /// from the `CollectStrategy` type name.
    #[cfg(feature = "e2e-encryption")]
    pub room_key_recipient_strategy: CollectStrategy,

    /// Trust requirement placed on the sender's device when decrypting room
    /// events; copied into `DecryptionSettings` for every decryption attempt.
    #[cfg(feature = "e2e-encryption")]
    pub decryption_trust_requirement: TrustRequirement,

    /// When `false`, verification events are silently dropped instead of
    /// being forwarded to the `OlmMachine`.
    #[cfg(feature = "e2e-encryption")]
    pub handle_verification_events: bool,
}
142
143#[cfg(not(tarpaulin_include))]
144impl fmt::Debug for BaseClient {
145 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
146 f.debug_struct("BaseClient")
147 .field("session_meta", &self.state_store.session_meta())
148 .field("sync_token", &self.state_store.sync_token)
149 .finish_non_exhaustive()
150 }
151}
152
153impl BaseClient {
154 pub fn new(config: StoreConfig) -> Self {
161 let store = BaseStateStore::new(config.state_store);
162
163 let (room_info_notable_update_sender, _room_info_notable_update_receiver) =
173 broadcast::channel(500);
174
175 BaseClient {
176 state_store: store,
177 event_cache_store: config.event_cache_store,
178 #[cfg(feature = "e2e-encryption")]
179 crypto_store: config.crypto_store,
180 #[cfg(feature = "e2e-encryption")]
181 olm_machine: Default::default(),
182 ignore_user_list_changes: Default::default(),
183 room_info_notable_update_sender,
184 #[cfg(feature = "e2e-encryption")]
185 room_key_recipient_strategy: Default::default(),
186 #[cfg(feature = "e2e-encryption")]
187 decryption_trust_requirement: TrustRequirement::Untrusted,
188 #[cfg(feature = "e2e-encryption")]
189 handle_verification_events: true,
190 }
191 }
192
193 #[cfg(feature = "e2e-encryption")]
196 pub async fn clone_with_in_memory_state_store(
197 &self,
198 cross_process_store_locks_holder_name: &str,
199 handle_verification_events: bool,
200 ) -> Result<Self> {
201 let config = StoreConfig::new(cross_process_store_locks_holder_name.to_owned())
202 .state_store(MemoryStore::new());
203 let config = config.crypto_store(self.crypto_store.clone());
204
205 let copy = Self {
206 state_store: BaseStateStore::new(config.state_store),
207 event_cache_store: config.event_cache_store,
208 crypto_store: self.crypto_store.clone(),
215 olm_machine: self.olm_machine.clone(),
216 ignore_user_list_changes: Default::default(),
217 room_info_notable_update_sender: self.room_info_notable_update_sender.clone(),
218 room_key_recipient_strategy: self.room_key_recipient_strategy.clone(),
219 decryption_trust_requirement: self.decryption_trust_requirement,
220 handle_verification_events,
221 };
222
223 copy.state_store
224 .derive_from_other(&self.state_store, ©.room_info_notable_update_sender)
225 .await?;
226
227 Ok(copy)
228 }
229
230 #[cfg(not(feature = "e2e-encryption"))]
233 #[allow(clippy::unused_async)]
234 pub async fn clone_with_in_memory_state_store(
235 &self,
236 cross_process_store_locks_holder: &str,
237 _handle_verification_events: bool,
238 ) -> Result<Self> {
239 let config = StoreConfig::new(cross_process_store_locks_holder.to_owned())
240 .state_store(MemoryStore::new());
241 Ok(Self::new(config))
242 }
243
244 pub fn session_meta(&self) -> Option<&SessionMeta> {
250 self.state_store.session_meta()
251 }
252
253 pub fn rooms(&self) -> Vec<Room> {
255 self.state_store.rooms()
256 }
257
258 pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
260 self.state_store.rooms_filtered(filter)
261 }
262
263 pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>>) {
266 self.state_store.rooms_stream()
267 }
268
269 pub fn get_or_create_room(&self, room_id: &RoomId, room_state: RoomState) -> Room {
272 self.state_store.get_or_create_room(
273 room_id,
274 room_state,
275 self.room_info_notable_update_sender.clone(),
276 )
277 }
278
279 pub fn state_store(&self) -> &DynStateStore {
281 self.state_store.deref()
282 }
283
284 pub fn event_cache_store(&self) -> &EventCacheStoreLock {
286 &self.event_cache_store
287 }
288
289 pub fn is_active(&self) -> bool {
293 self.state_store.session_meta().is_some()
294 }
295
296 pub async fn activate(
323 &self,
324 session_meta: SessionMeta,
325 #[cfg(feature = "e2e-encryption")] custom_account: Option<
326 crate::crypto::vodozemac::olm::Account,
327 >,
328 ) -> Result<()> {
329 debug!(user_id = ?session_meta.user_id, device_id = ?session_meta.device_id, "Activating the client");
330
331 self.state_store
332 .load_rooms(&session_meta.user_id, &self.room_info_notable_update_sender)
333 .await?;
334 self.state_store.load_sync_token().await?;
335 self.state_store.set_session_meta(session_meta);
336
337 #[cfg(feature = "e2e-encryption")]
338 self.regenerate_olm(custom_account).await?;
339
340 Ok(())
341 }
342
343 #[cfg(feature = "e2e-encryption")]
347 pub async fn regenerate_olm(
348 &self,
349 custom_account: Option<crate::crypto::vodozemac::olm::Account>,
350 ) -> Result<()> {
351 tracing::debug!("regenerating OlmMachine");
352 let session_meta = self.session_meta().ok_or(Error::OlmError(OlmError::MissingSession))?;
353
354 let olm_machine = OlmMachine::with_store(
357 &session_meta.user_id,
358 &session_meta.device_id,
359 self.crypto_store.clone(),
360 custom_account,
361 )
362 .await
363 .map_err(OlmError::from)?;
364
365 *self.olm_machine.write().await = Some(olm_machine);
366 Ok(())
367 }
368
369 pub async fn sync_token(&self) -> Option<String> {
372 self.state_store.sync_token.read().await.clone()
373 }
374
375 #[cfg(feature = "e2e-encryption")]
376 async fn handle_verification_event(
377 &self,
378 event: &AnySyncMessageLikeEvent,
379 room_id: &RoomId,
380 ) -> Result<()> {
381 if !self.handle_verification_events {
382 return Ok(());
383 }
384
385 if let Some(olm) = self.olm_machine().await.as_ref() {
386 olm.receive_verification_event(&event.clone().into_full_event(room_id.to_owned()))
387 .await?;
388 }
389
390 Ok(())
391 }
392
393 #[cfg(feature = "e2e-encryption")]
401 async fn decrypt_sync_room_event(
402 &self,
403 event: &Raw<AnySyncTimelineEvent>,
404 room_id: &RoomId,
405 ) -> Result<Option<TimelineEvent>> {
406 let olm = self.olm_machine().await;
407 let Some(olm) = olm.as_ref() else { return Ok(None) };
408
409 let decryption_settings = DecryptionSettings {
410 sender_device_trust_requirement: self.decryption_trust_requirement,
411 };
412
413 let event = match olm
414 .try_decrypt_room_event(event.cast_ref(), room_id, &decryption_settings)
415 .await?
416 {
417 RoomEventDecryptionResult::Decrypted(decrypted) => {
418 let event: TimelineEvent = decrypted.into();
419
420 if let Ok(AnySyncTimelineEvent::MessageLike(e)) = event.raw().deserialize() {
421 match &e {
422 AnySyncMessageLikeEvent::RoomMessage(SyncMessageLikeEvent::Original(
423 original_event,
424 )) => {
425 if let MessageType::VerificationRequest(_) =
426 &original_event.content.msgtype
427 {
428 self.handle_verification_event(&e, room_id).await?;
429 }
430 }
431 _ if e.event_type().to_string().starts_with("m.key.verification") => {
432 self.handle_verification_event(&e, room_id).await?;
433 }
434 _ => (),
435 }
436 }
437 event
438 }
439 RoomEventDecryptionResult::UnableToDecrypt(utd_info) => {
440 TimelineEvent::new_utd_event(event.clone(), utd_info)
441 }
442 };
443
444 Ok(Some(event))
445 }
446
    /// Processes the timeline events of one room taken from a sync response.
    ///
    /// Every raw event is wrapped in a [`TimelineEvent`] and appended to the
    /// returned [`Timeline`], including events that fail to deserialize (those
    /// are only logged). While iterating:
    ///
    /// - state events update `room_info`, `changes`, `ambiguity_cache` and
    ///   `user_ids`, unless `ignore_state_events` is `true`;
    /// - redactions are applied to `room_info` and recorded in `changes`;
    /// - with `e2e-encryption`, encrypted events are replaced by the result of
    ///   `decrypt_sync_room_event`, and verification events are forwarded to
    ///   `handle_verification_event`;
    /// - push rules are evaluated for each event, and events whose actions
    ///   ask for a notification are added to `notifications`.
    ///
    /// # Errors
    ///
    /// Propagates errors from building the push context, from the ambiguity
    /// cache, and (with `e2e-encryption`) from decryption or verification
    /// handling.
    #[allow(clippy::too_many_arguments)]
    #[instrument(skip_all, fields(room_id = ?room_info.room_id))]
    pub(crate) async fn handle_timeline(
        &self,
        room: &Room,
        limited: bool,
        events: Vec<Raw<AnySyncTimelineEvent>>,
        ignore_state_events: bool,
        prev_batch: Option<String>,
        push_rules: &Ruleset,
        user_ids: &mut BTreeSet<OwnedUserId>,
        room_info: &mut RoomInfo,
        changes: &mut StateChanges,
        notifications: &mut BTreeMap<OwnedRoomId, Vec<Notification>>,
        ambiguity_cache: &mut AmbiguityCache,
    ) -> Result<Timeline> {
        let mut timeline = Timeline::new(limited, prev_batch);
        // May be `None` at this point; it is re-requested after each event
        // below until one can be built.
        let mut push_context = self.get_push_room_context(room, room_info, changes).await?;

        for raw_event in events {
            // Start from the raw event; an encrypted event may later be
            // replaced by its decrypted counterpart.
            let mut event = TimelineEvent::new(raw_event);

            match event.raw().deserialize() {
                Ok(e) => {
                    #[allow(clippy::single_match)]
                    match &e {
                        AnySyncTimelineEvent::State(s) if !ignore_state_events => {
                            match s {
                                AnySyncStateEvent::RoomMember(member) => {
                                    // The future is boxed before being awaited.
                                    Box::pin(ambiguity_cache.handle_event(
                                        changes,
                                        room.room_id(),
                                        member,
                                    ))
                                    .await?;

                                    // Keep `user_ids` limited to joined and
                                    // invited members.
                                    match member.membership() {
                                        MembershipState::Join | MembershipState::Invite => {
                                            user_ids.insert(member.state_key().to_owned());
                                        }
                                        _ => {
                                            user_ids.remove(member.state_key());
                                        }
                                    }

                                    handle_room_member_event_for_profiles(
                                        room.room_id(),
                                        member,
                                        changes,
                                    );
                                }
                                _ => {
                                    room_info.handle_state_event(s);
                                }
                            }

                            // Record the state event, member or not, in the
                            // pending changes.
                            let raw_event: Raw<AnySyncStateEvent> = event.raw().clone().cast();
                            changes.add_state_event(room.room_id(), s.clone(), raw_event);
                        }

                        // State events are being ignored: nothing to do.
                        AnySyncTimelineEvent::State(_) => { }

                        AnySyncTimelineEvent::MessageLike(
                            AnySyncMessageLikeEvent::RoomRedaction(r),
                        ) => {
                            // Falls back to room version 1 when the room
                            // version is unknown.
                            let room_version =
                                room_info.room_version().unwrap_or(&RoomVersionId::V1);

                            if let Some(redacts) = r.redacts(room_version) {
                                room_info.handle_redaction(r, event.raw().cast_ref());
                                let raw_event = event.raw().clone().cast();

                                changes.add_redaction(room.room_id(), redacts, raw_event);
                            }
                        }

                        #[cfg(feature = "e2e-encryption")]
                        AnySyncTimelineEvent::MessageLike(e) => match e {
                            AnySyncMessageLikeEvent::RoomEncrypted(
                                SyncMessageLikeEvent::Original(_),
                            ) => {
                                // Replace the timeline entry with the
                                // decryption result when there is one (`None`
                                // means no `OlmMachine` was available).
                                if let Some(e) = Box::pin(
                                    self.decrypt_sync_room_event(event.raw(), room.room_id()),
                                )
                                .await?
                                {
                                    event = e;
                                }
                            }
                            AnySyncMessageLikeEvent::RoomMessage(
                                SyncMessageLikeEvent::Original(original_event),
                            ) => match &original_event.content.msgtype {
                                MessageType::VerificationRequest(_) => {
                                    Box::pin(self.handle_verification_event(e, room.room_id()))
                                        .await?;
                                }
                                _ => (),
                            },
                            // Any other message-like event whose type carries
                            // the verification prefix.
                            _ if e.event_type().to_string().starts_with("m.key.verification") => {
                                Box::pin(self.handle_verification_event(e, room.room_id())).await?;
                            }
                            _ => (),
                        },

                        #[cfg(not(feature = "e2e-encryption"))]
                        AnySyncTimelineEvent::MessageLike(_) => (),
                    }

                    // Refresh the push context in place if there is one;
                    // otherwise try again to build it now that this event has
                    // been processed.
                    if let Some(context) = &mut push_context {
                        self.update_push_room_context(
                            context,
                            room.own_user_id(),
                            room_info,
                            changes,
                        )
                    } else {
                        push_context = self.get_push_room_context(room, room_info, changes).await?;
                    }

                    if let Some(context) = &push_context {
                        let actions = push_rules.get_actions(event.raw(), context);

                        if actions.iter().any(Action::should_notify) {
                            notifications.entry(room.room_id().to_owned()).or_default().push(
                                Notification {
                                    actions: actions.to_owned(),
                                    event: RawAnySyncOrStrippedTimelineEvent::Sync(
                                        event.raw().clone(),
                                    ),
                                },
                            );
                        }
                        // Push actions are stored on the event whether or not
                        // they produce a notification.
                        event.push_actions = Some(actions.to_owned());
                    }
                }
                Err(e) => {
                    // The event is still pushed to the timeline below, just
                    // without any of the processing above.
                    warn!("Error deserializing event: {e}");
                }
            }

            timeline.events.push(event);
        }

        Ok(timeline)
    }
594
595 #[instrument(skip_all, fields(room_id = ?room_info.room_id))]
607 pub(crate) async fn handle_invited_state(
608 &self,
609 room: &Room,
610 events: &[(Raw<AnyStrippedStateEvent>, AnyStrippedStateEvent)],
611 push_rules: &Ruleset,
612 room_info: &mut RoomInfo,
613 changes: &mut StateChanges,
614 notifications: &mut BTreeMap<OwnedRoomId, Vec<Notification>>,
615 ) -> Result<()> {
616 let mut state_events = BTreeMap::new();
617
618 for (raw_event, event) in events {
619 room_info.handle_stripped_state_event(event);
620 state_events
621 .entry(event.event_type())
622 .or_insert_with(BTreeMap::new)
623 .insert(event.state_key().to_owned(), raw_event.clone());
624 }
625
626 changes.stripped_state.insert(room_info.room_id().to_owned(), state_events.clone());
627
628 if let Some(push_context) = self.get_push_room_context(room, room_info, changes).await? {
631 for event in state_events.values().flat_map(|map| map.values()) {
633 let actions = push_rules.get_actions(event, &push_context);
634 if actions.iter().any(Action::should_notify) {
635 notifications.entry(room.room_id().to_owned()).or_default().push(
636 Notification {
637 actions: actions.to_owned(),
638 event: RawAnySyncOrStrippedTimelineEvent::Stripped(event.clone()),
639 },
640 );
641 }
642 }
643 }
644
645 Ok(())
646 }
647
648 #[instrument(skip_all, fields(room_id = ?room_info.room_id))]
654 pub(crate) async fn handle_state(
655 &self,
656 raw_events: &[Raw<AnySyncStateEvent>],
657 events: &[AnySyncStateEvent],
658 room_info: &mut RoomInfo,
659 changes: &mut StateChanges,
660 ambiguity_cache: &mut AmbiguityCache,
661 ) -> StoreResult<BTreeSet<OwnedUserId>> {
662 let mut state_events = BTreeMap::new();
663 let mut user_ids = BTreeSet::new();
664
665 assert_eq!(raw_events.len(), events.len());
666
667 for (raw_event, event) in iter::zip(raw_events, events) {
668 room_info.handle_state_event(event);
669
670 if let AnySyncStateEvent::RoomMember(member) = &event {
671 ambiguity_cache.handle_event(changes, &room_info.room_id, member).await?;
672
673 match member.membership() {
674 MembershipState::Join | MembershipState::Invite => {
675 user_ids.insert(member.state_key().to_owned());
676 }
677 _ => (),
678 }
679
680 handle_room_member_event_for_profiles(&room_info.room_id, member, changes);
681 }
682
683 state_events
684 .entry(event.event_type())
685 .or_insert_with(BTreeMap::new)
686 .insert(event.state_key().to_owned(), raw_event.clone());
687 }
688
689 changes.state.insert((*room_info.room_id).to_owned(), state_events);
690
691 Ok(user_ids)
692 }
693
694 #[instrument(skip_all, fields(?room_id))]
695 pub(crate) async fn handle_room_account_data(
696 &self,
697 room_id: &RoomId,
698 events: &[Raw<AnyRoomAccountDataEvent>],
699 changes: &mut StateChanges,
700 room_info_notable_updates: &mut BTreeMap<OwnedRoomId, RoomInfoNotableUpdateReasons>,
701 ) {
702 fn on_room_info<F>(
707 room_id: &RoomId,
708 changes: &mut StateChanges,
709 client: &BaseClient,
710 mut on_room_info: F,
711 ) where
712 F: FnMut(&mut RoomInfo),
713 {
714 if let Some(room_info) = changes.room_infos.get_mut(room_id) {
716 on_room_info(room_info);
718 }
719 else if let Some(room) = client.state_store.room(room_id) {
721 let mut room_info = room.clone_info();
723
724 on_room_info(&mut room_info);
726
727 changes.add_room(room_info);
729 }
730 }
731
732 fn on_unread_marker(
734 room_id: &RoomId,
735 content: &MarkedUnreadEventContent,
736 room_info: &mut RoomInfo,
737 room_info_notable_updates: &mut BTreeMap<OwnedRoomId, RoomInfoNotableUpdateReasons>,
738 ) {
739 if room_info.base_info.is_marked_unread != content.unread {
740 room_info_notable_updates
743 .entry(room_id.to_owned())
744 .or_default()
745 .insert(RoomInfoNotableUpdateReasons::UNREAD_MARKER);
746 }
747
748 room_info.base_info.is_marked_unread = content.unread;
749 }
750
751 for raw_event in events {
753 match raw_event.deserialize() {
754 Ok(event) => {
755 changes.add_room_account_data(room_id, event.clone(), raw_event.clone());
756
757 match event {
758 AnyRoomAccountDataEvent::MarkedUnread(event) => {
759 on_room_info(room_id, changes, self, |room_info| {
760 on_unread_marker(
761 room_id,
762 &event.content,
763 room_info,
764 room_info_notable_updates,
765 );
766 });
767 }
768 AnyRoomAccountDataEvent::UnstableMarkedUnread(event) => {
769 on_room_info(room_id, changes, self, |room_info| {
770 on_unread_marker(
771 room_id,
772 &event.content.0,
773 room_info,
774 room_info_notable_updates,
775 );
776 });
777 }
778 AnyRoomAccountDataEvent::Tag(event) => {
779 on_room_info(room_id, changes, self, |room_info| {
780 room_info.base_info.handle_notable_tags(&event.content.tags);
781 });
782 }
783
784 _ => {}
786 }
787 }
788
789 Err(err) => {
790 warn!("unable to deserialize account data event: {err}");
791 }
792 }
793 }
794 }
795
796 #[cfg(feature = "e2e-encryption")]
797 #[instrument(skip_all)]
798 pub(crate) async fn preprocess_to_device_events(
799 &self,
800 encryption_sync_changes: EncryptionSyncChanges<'_>,
801 changes: &mut StateChanges,
802 room_info_notable_updates: &mut BTreeMap<OwnedRoomId, RoomInfoNotableUpdateReasons>,
803 ) -> Result<Vec<Raw<ruma::events::AnyToDeviceEvent>>> {
804 if let Some(o) = self.olm_machine().await.as_ref() {
805 let (events, room_key_updates) =
810 o.receive_sync_changes(encryption_sync_changes).await?;
811
812 for room_key_update in room_key_updates {
813 if let Some(room) = self.get_room(&room_key_update.room_id) {
814 self.decrypt_latest_events(&room, changes, room_info_notable_updates).await;
815 }
816 }
817
818 Ok(events)
819 } else {
820 Ok(encryption_sync_changes.to_device_events)
824 }
825 }
826
827 #[cfg(feature = "e2e-encryption")]
832 async fn decrypt_latest_events(
833 &self,
834 room: &Room,
835 changes: &mut StateChanges,
836 room_info_notable_updates: &mut BTreeMap<OwnedRoomId, RoomInfoNotableUpdateReasons>,
837 ) {
838 if let Some((found, found_index)) = self.decrypt_latest_suitable_event(room).await {
842 room.on_latest_event_decrypted(found, found_index, changes, room_info_notable_updates);
843 }
844 }
845
846 #[cfg(feature = "e2e-encryption")]
853 async fn decrypt_latest_suitable_event(
854 &self,
855 room: &Room,
856 ) -> Option<(Box<LatestEvent>, usize)> {
857 let enc_events = room.latest_encrypted_events();
858 let power_levels = room.power_levels().await.ok();
859 let power_levels_info = Some(room.own_user_id()).zip(power_levels.as_ref());
860
861 for (i, event) in enc_events.iter().enumerate().rev() {
863 let decrypt_sync_room_event =
867 Box::pin(self.decrypt_sync_room_event(event, room.room_id()));
868
869 if let Ok(Some(decrypted)) = decrypt_sync_room_event.await {
870 if let Ok(any_sync_event) = decrypted.raw().deserialize() {
872 match is_suitable_for_latest_event(&any_sync_event, power_levels_info) {
874 PossibleLatestEvent::YesRoomMessage(_)
875 | PossibleLatestEvent::YesPoll(_)
876 | PossibleLatestEvent::YesCallInvite(_)
877 | PossibleLatestEvent::YesCallNotify(_)
878 | PossibleLatestEvent::YesSticker(_)
879 | PossibleLatestEvent::YesKnockedStateEvent(_) => {
880 return Some((Box::new(LatestEvent::new(decrypted)), i));
881 }
882 _ => (),
883 }
884 }
885 }
886 }
887 None
888 }
889
890 pub async fn room_knocked(&self, room_id: &RoomId) -> Result<Room> {
894 let room = self.state_store.get_or_create_room(
895 room_id,
896 RoomState::Knocked,
897 self.room_info_notable_update_sender.clone(),
898 );
899
900 if room.state() != RoomState::Knocked {
901 let _sync_lock = self.sync_lock().lock().await;
902
903 let mut room_info = room.clone_info();
904 room_info.mark_as_knocked();
905 room_info.mark_state_partially_synced();
906 room_info.mark_members_missing(); let mut changes = StateChanges::default();
908 changes.add_room(room_info.clone());
909 self.state_store.save_changes(&changes).await?; room.set_room_info(room_info, RoomInfoNotableUpdateReasons::MEMBERSHIP);
911 }
912
913 Ok(room)
914 }
915
916 pub async fn room_joined(&self, room_id: &RoomId) -> Result<Room> {
920 let room = self.state_store.get_or_create_room(
921 room_id,
922 RoomState::Joined,
923 self.room_info_notable_update_sender.clone(),
924 );
925
926 if room.state() != RoomState::Joined {
927 let _sync_lock = self.sync_lock().lock().await;
928
929 let mut room_info = room.clone_info();
930 room_info.mark_as_joined();
931 room_info.mark_state_partially_synced();
932 room_info.mark_members_missing(); let mut changes = StateChanges::default();
934 changes.add_room(room_info.clone());
935 self.state_store.save_changes(&changes).await?; room.set_room_info(room_info, RoomInfoNotableUpdateReasons::MEMBERSHIP);
937 }
938
939 Ok(room)
940 }
941
942 pub async fn room_left(&self, room_id: &RoomId) -> Result<()> {
946 let room = self.state_store.get_or_create_room(
947 room_id,
948 RoomState::Left,
949 self.room_info_notable_update_sender.clone(),
950 );
951
952 if room.state() != RoomState::Left {
953 let _sync_lock = self.sync_lock().lock().await;
954
955 let mut room_info = room.clone_info();
956 room_info.mark_as_left();
957 room_info.mark_state_partially_synced();
958 room_info.mark_members_missing(); let mut changes = StateChanges::default();
960 changes.add_room(room_info.clone());
961 self.state_store.save_changes(&changes).await?; room.set_room_info(room_info, RoomInfoNotableUpdateReasons::MEMBERSHIP);
963 }
964
965 Ok(())
966 }
967
968 pub fn sync_lock(&self) -> &Mutex<()> {
970 self.state_store.sync_lock()
971 }
972
973 #[instrument(skip_all)]
979 pub async fn receive_sync_response(
980 &self,
981 response: api::sync::sync_events::v3::Response,
982 ) -> Result<SyncResponse> {
983 self.receive_sync_response_with_requested_required_states(
984 response,
985 &RequestedRequiredStates::default(),
986 )
987 .await
988 }
989
    /// Processes a `/sync` (v3) response and persists the resulting state.
    ///
    /// In order, this:
    ///
    /// 1. returns an empty `SyncResponse` if `response.next_batch` equals the
    ///    sync token already stored;
    /// 2. (with `e2e-encryption`) lets the `OlmMachine` preprocess the
    ///    to-device events;
    /// 3. processes joined, left, invited and knocked rooms, accumulating
    ///    everything in a `StateChanges`;
    /// 4. applies global account data and presence;
    /// 5. saves the changes and the new sync token while holding the sync
    ///    lock, then applies the changes in memory;
    /// 6. notifies per-room member-update listeners.
    ///
    /// `requested_required_states` is consulted per room when handling the
    /// encryption state of joined and left rooms.
    ///
    /// # Errors
    ///
    /// Propagates errors from the crypto machine, the state store, and the
    /// per-room processing helpers.
    pub async fn receive_sync_response_with_requested_required_states(
        &self,
        response: api::sync::sync_events::v3::Response,
        requested_required_states: &RequestedRequiredStates,
    ) -> Result<SyncResponse> {
        // Same token as the one already stored: this response has been
        // processed before, so there is nothing to do.
        if self.state_store.sync_token.read().await.as_ref() == Some(&response.next_batch) {
            info!("Got the same sync response twice");
            return Ok(SyncResponse::default());
        }

        // Used only to log how long the processing took.
        let now = Instant::now();
        let mut changes = Box::new(StateChanges::new(response.next_batch.clone()));

        #[cfg_attr(not(feature = "e2e-encryption"), allow(unused_mut))]
        let mut room_info_notable_updates =
            BTreeMap::<OwnedRoomId, RoomInfoNotableUpdateReasons>::new();

        // Runs before any room processing.
        // NOTE(review): presumably so that newly received room keys are
        // available when the room timelines are decrypted below — confirm.
        #[cfg(feature = "e2e-encryption")]
        let to_device = self
            .preprocess_to_device_events(
                EncryptionSyncChanges {
                    to_device_events: response.to_device.events,
                    changed_devices: &response.device_lists,
                    one_time_keys_counts: &response.device_one_time_keys_count,
                    unused_fallback_keys: response.device_unused_fallback_key_types.as_deref(),
                    next_batch_token: Some(response.next_batch.clone()),
                },
                &mut changes,
                &mut room_info_notable_updates,
            )
            .await?;

        #[cfg(not(feature = "e2e-encryption"))]
        let to_device = response.to_device.events;

        let mut ambiguity_cache = AmbiguityCache::new(self.state_store.inner.clone());

        let account_data_processor = AccountDataProcessor::process(&response.account_data.events);

        let push_rules = self.get_push_rules(&account_data_processor).await?;

        let mut new_rooms = RoomUpdates::default();
        let mut notifications = Default::default();

        // Per joined room, the user IDs returned by `handle_state`; sent to
        // the room's member-update listeners at the very end.
        let mut updated_members_in_room: BTreeMap<OwnedRoomId, BTreeSet<OwnedUserId>> =
            BTreeMap::new();

        // ---- Joined rooms ----
        for (room_id, new_info) in response.rooms.join {
            let room = self.state_store.get_or_create_room(
                &room_id,
                RoomState::Joined,
                self.room_info_notable_update_sender.clone(),
            );

            // Work on a copy of the room info; it only reaches the room once
            // the changes are saved and applied.
            let mut room_info = room.clone_info();

            room_info.mark_as_joined();
            room_info.update_from_ruma_summary(&new_info.summary);
            room_info.set_prev_batch(new_info.timeline.prev_batch.as_deref());
            room_info.mark_state_fully_synced();
            room_info.handle_encryption_state(requested_required_states.for_room(&room_id));

            let state_events = Self::deserialize_state_events(&new_info.state.events);
            let (raw_state_events, state_events): (Vec<_>, Vec<_>) =
                state_events.into_iter().unzip();

            let mut user_ids = self
                .handle_state(
                    &raw_state_events,
                    &state_events,
                    &mut room_info,
                    &mut changes,
                    &mut ambiguity_cache,
                )
                .await?;

            // Snapshot taken before the timeline is processed, so it reflects
            // the state section only.
            updated_members_in_room.insert(room_id.to_owned(), user_ids.clone());

            // Of the ephemeral events, only receipts are recorded.
            for raw in &new_info.ephemeral.events {
                match raw.deserialize() {
                    Ok(AnySyncEphemeralRoomEvent::Receipt(event)) => {
                        changes.add_receipts(&room_id, event.content);
                    }
                    Ok(_) => {}
                    Err(e) => {
                        let event_id: Option<String> = raw.get_field("event_id").ok().flatten();
                        #[rustfmt::skip]
                        info!(
                            ?room_id, event_id,
                            "Failed to deserialize ephemeral room event: {e}"
                        );
                    }
                }
            }

            // A limited timeline is treated as a sign that the known member
            // list may be incomplete.
            if new_info.timeline.limited {
                room_info.mark_members_missing();
            }

            let timeline = self
                .handle_timeline(
                    &room,
                    new_info.timeline.limited,
                    new_info.timeline.events,
                    false,
                    new_info.timeline.prev_batch,
                    &push_rules,
                    &mut user_ids,
                    &mut room_info,
                    &mut changes,
                    &mut notifications,
                    &mut ambiguity_cache,
                )
                .await?;

            // Store the room info so the account data handling below can find
            // and update it inside `changes`.
            changes.add_room(room_info);

            // NOTE(review): the notable-update map passed here is a temporary
            // that is dropped right after the call, so reasons recorded by
            // `handle_room_account_data` never reach
            // `room_info_notable_updates` — confirm this is intended.
            self.handle_room_account_data(
                &room_id,
                &new_info.account_data.events,
                &mut changes,
                &mut Default::default(),
            )
            .await;

            // The account data handling may have modified the stored room
            // info, so fetch it again. The `unwrap` relies on `add_room` above
            // having stored the info under `room_id`.
            // NOTE(review): `add_room` is defined elsewhere — verify.
            let mut room_info = changes.room_infos.get(&room_id).unwrap().clone();

            #[cfg(feature = "e2e-encryption")]
            if room_info.encryption_state().is_encrypted() {
                if let Some(o) = self.olm_machine().await.as_ref() {
                    if !room.encryption_state().is_encrypted() {
                        // The room was not encrypted before this sync but is
                        // now: also track the members already known to the
                        // store.
                        let user_ids = self
                            .state_store
                            .get_user_ids(&room_id, RoomMemberships::ACTIVE)
                            .await?;
                        o.update_tracked_users(user_ids.iter().map(Deref::deref)).await?
                    }

                    // Track the users seen in this sync (outer `user_ids`).
                    o.update_tracked_users(user_ids.iter().map(Deref::deref)).await?;
                }
            }

            let notification_count = new_info.unread_notifications.into();
            room_info.update_notification_count(notification_count);

            let ambiguity_changes = ambiguity_cache.changes.remove(&room_id).unwrap_or_default();

            new_rooms.join.insert(
                room_id,
                JoinedRoomUpdate::new(
                    timeline,
                    new_info.state.events,
                    new_info.account_data.events,
                    new_info.ephemeral.events,
                    notification_count,
                    ambiguity_changes,
                ),
            );

            // Store the final version of the room info, now including the
            // notification count.
            changes.add_room(room_info);
        }

        // ---- Left rooms ----
        for (room_id, new_info) in response.rooms.leave {
            let room = self.state_store.get_or_create_room(
                &room_id,
                RoomState::Left,
                self.room_info_notable_update_sender.clone(),
            );

            let mut room_info = room.clone_info();
            room_info.mark_as_left();
            room_info.mark_state_partially_synced();
            room_info.handle_encryption_state(requested_required_states.for_room(&room_id));

            let state_events = Self::deserialize_state_events(&new_info.state.events);
            let (raw_state_events, state_events): (Vec<_>, Vec<_>) =
                state_events.into_iter().unzip();

            let mut user_ids = self
                .handle_state(
                    &raw_state_events,
                    &state_events,
                    &mut room_info,
                    &mut changes,
                    &mut ambiguity_cache,
                )
                .await?;

            let timeline = self
                .handle_timeline(
                    &room,
                    new_info.timeline.limited,
                    new_info.timeline.events,
                    false,
                    new_info.timeline.prev_batch,
                    &push_rules,
                    &mut user_ids,
                    &mut room_info,
                    &mut changes,
                    &mut notifications,
                    &mut ambiguity_cache,
                )
                .await?;

            // Store the room info so the account data handling below can find
            // and update it inside `changes`.
            changes.add_room(room_info);

            // NOTE(review): as for joined rooms, the notable-update map is a
            // temporary that is discarded after the call.
            self.handle_room_account_data(
                &room_id,
                &new_info.account_data.events,
                &mut changes,
                &mut Default::default(),
            )
            .await;

            let ambiguity_changes = ambiguity_cache.changes.remove(&room_id).unwrap_or_default();

            new_rooms.leave.insert(
                room_id,
                LeftRoomUpdate::new(
                    timeline,
                    new_info.state.events,
                    new_info.account_data.events,
                    ambiguity_changes,
                ),
            );
        }

        // ---- Invited rooms ----
        for (room_id, new_info) in response.rooms.invite {
            let room = self.state_store.get_or_create_room(
                &room_id,
                RoomState::Invited,
                self.room_info_notable_update_sender.clone(),
            );

            let invite_state =
                Self::deserialize_stripped_state_events(&new_info.invite_state.events);

            let mut room_info = room.clone_info();
            room_info.mark_as_invited();
            room_info.mark_state_fully_synced();

            self.handle_invited_state(
                &room,
                &invite_state,
                &push_rules,
                &mut room_info,
                &mut changes,
                &mut notifications,
            )
            .await?;

            changes.add_room(room_info);

            new_rooms.invite.insert(room_id, new_info);
        }

        // ---- Knocked rooms ----
        for (room_id, new_info) in response.rooms.knock {
            let room = self.state_store.get_or_create_room(
                &room_id,
                RoomState::Knocked,
                self.room_info_notable_update_sender.clone(),
            );

            let knock_state = Self::deserialize_stripped_state_events(&new_info.knock_state.events);

            let mut room_info = room.clone_info();
            room_info.mark_as_knocked();
            room_info.mark_state_fully_synced();

            // Knock state is stripped state too, so it goes through the same
            // handler as invite state.
            self.handle_invited_state(
                &room,
                &knock_state,
                &push_rules,
                &mut room_info,
                &mut changes,
                &mut notifications,
            )
            .await?;

            changes.add_room(room_info);

            new_rooms.knocked.insert(room_id, new_info);
        }

        account_data_processor.apply(&mut changes, &self.state_store).await;

        // Presence events that fail to deserialize are dropped silently.
        changes.presence = response
            .presence
            .events
            .iter()
            .filter_map(|e| {
                let event = e.deserialize().ok()?;
                Some((event.sender, e.clone()))
            })
            .collect();

        changes.ambiguity_maps = ambiguity_cache.cache;

        {
            // Saving and applying the changes happens under the sync lock.
            let _sync_lock = self.sync_lock().lock().await;
            // Loaded before saving, so it reflects the ignored-user list as it
            // was prior to this sync.
            let prev_ignored_user_list = self.load_previous_ignored_user_list().await;
            self.state_store.save_changes(&changes).await?;
            *self.state_store.sync_token.write().await = Some(response.next_batch.clone());
            self.apply_changes(&changes, room_info_notable_updates, prev_ignored_user_list);
        }

        // Done after the changes have been saved.
        // NOTE(review): presumably because the in-memory caches are computed
        // from data read back from the store — confirm.
        new_rooms.update_in_memory_caches(&self.state_store).await;

        for (room_id, member_ids) in updated_members_in_room {
            if let Some(room) = self.get_room(&room_id) {
                // The send result is deliberately ignored.
                let _ =
                    room.room_member_updates_sender.send(RoomMembersUpdate::Partial(member_ids));
            }
        }

        info!("Processed a sync response in {:?}", now.elapsed());

        let response = SyncResponse {
            rooms: new_rooms,
            presence: response.presence.events,
            account_data: response.account_data.events,
            to_device,
            notifications,
        };

        Ok(response)
    }
1342
1343 pub(crate) async fn load_previous_ignored_user_list(
1344 &self,
1345 ) -> Option<Raw<IgnoredUserListEvent>> {
1346 self.state_store().get_account_data_event_static().await.ok().flatten()
1347 }
1348
1349 pub(crate) fn apply_changes(
1350 &self,
1351 changes: &StateChanges,
1352 room_info_notable_updates: BTreeMap<OwnedRoomId, RoomInfoNotableUpdateReasons>,
1353 prev_ignored_user_list: Option<Raw<IgnoredUserListEvent>>,
1354 ) {
1355 if let Some(event) = changes.account_data.get(&GlobalAccountDataEventType::IgnoredUserList)
1356 {
1357 match event.deserialize_as::<IgnoredUserListEvent>() {
1358 Ok(event) => {
1359 let user_ids: Vec<String> =
1360 event.content.ignored_users.keys().map(|id| id.to_string()).collect();
1361
1362 if let Some(prev_user_ids) =
1366 prev_ignored_user_list.and_then(|raw| raw.deserialize().ok()).map(|event| {
1367 event
1368 .content
1369 .ignored_users
1370 .keys()
1371 .map(|id| id.to_string())
1372 .collect::<Vec<_>>()
1373 })
1374 {
1375 if user_ids != prev_user_ids {
1376 self.ignore_user_list_changes.set(user_ids);
1377 }
1378 } else {
1379 self.ignore_user_list_changes.set(user_ids);
1380 }
1381 }
1382
1383 Err(error) => {
1384 error!("Failed to deserialize ignored user list event: {error}")
1385 }
1386 }
1387 }
1388
1389 for (room_id, room_info) in &changes.room_infos {
1390 if let Some(room) = self.state_store.room(room_id) {
1391 let room_info_notable_update_reasons =
1392 room_info_notable_updates.get(room_id).copied().unwrap_or_default();
1393
1394 room.set_room_info(room_info.clone(), room_info_notable_update_reasons)
1395 }
1396 }
1397 }
1398
1399 #[instrument(skip_all, fields(?room_id))]
1411 pub async fn receive_all_members(
1412 &self,
1413 room_id: &RoomId,
1414 request: &api::membership::get_member_events::v3::Request,
1415 response: &api::membership::get_member_events::v3::Response,
1416 ) -> Result<()> {
1417 if request.membership.is_some() || request.not_membership.is_some() || request.at.is_some()
1418 {
1419 return Err(Error::InvalidReceiveMembersParameters);
1423 }
1424
1425 let Some(room) = self.state_store.room(room_id) else {
1426 return Ok(());
1428 };
1429
1430 let mut chunk = Vec::with_capacity(response.chunk.len());
1431 let mut changes = StateChanges::default();
1432
1433 #[cfg(feature = "e2e-encryption")]
1434 let mut user_ids = BTreeSet::new();
1435
1436 let mut ambiguity_map: HashMap<DisplayName, BTreeSet<OwnedUserId>> = Default::default();
1437
1438 for raw_event in &response.chunk {
1439 let member = match raw_event.deserialize() {
1440 Ok(ev) => ev,
1441 Err(e) => {
1442 let event_id: Option<String> = raw_event.get_field("event_id").ok().flatten();
1443 debug!(event_id, "Failed to deserialize member event: {e}");
1444 continue;
1445 }
1446 };
1447
1448 #[cfg(feature = "e2e-encryption")]
1458 match member.membership() {
1459 MembershipState::Join | MembershipState::Invite => {
1460 user_ids.insert(member.state_key().to_owned());
1461 }
1462 _ => (),
1463 }
1464
1465 if let StateEvent::Original(e) = &member {
1466 if let Some(d) = &e.content.displayname {
1467 let display_name = DisplayName::new(d);
1468 ambiguity_map
1469 .entry(display_name)
1470 .or_default()
1471 .insert(member.state_key().clone());
1472 }
1473 }
1474
1475 let sync_member: SyncRoomMemberEvent = member.clone().into();
1476 handle_room_member_event_for_profiles(room_id, &sync_member, &mut changes);
1477
1478 changes
1479 .state
1480 .entry(room_id.to_owned())
1481 .or_default()
1482 .entry(member.event_type())
1483 .or_default()
1484 .insert(member.state_key().to_string(), raw_event.clone().cast());
1485 chunk.push(member);
1486 }
1487
1488 #[cfg(feature = "e2e-encryption")]
1489 if room.encryption_state().is_encrypted() {
1490 if let Some(o) = self.olm_machine().await.as_ref() {
1491 o.update_tracked_users(user_ids.iter().map(Deref::deref)).await?
1492 }
1493 }
1494
1495 changes.ambiguity_maps.insert(room_id.to_owned(), ambiguity_map);
1496
1497 let _sync_lock = self.sync_lock().lock().await;
1498 let mut room_info = room.clone_info();
1499 room_info.mark_members_synced();
1500 changes.add_room(room_info);
1501
1502 let prev_ignored_user_list = self.load_previous_ignored_user_list().await;
1503 self.state_store.save_changes(&changes).await?;
1504 self.apply_changes(&changes, Default::default(), prev_ignored_user_list);
1505
1506 let _ = room.room_member_updates_sender.send(RoomMembersUpdate::FullReload);
1507
1508 Ok(())
1509 }
1510
1511 pub async fn receive_filter_upload(
1527 &self,
1528 filter_name: &str,
1529 response: &api::filter::create_filter::v3::Response,
1530 ) -> Result<()> {
1531 Ok(self
1532 .state_store
1533 .set_kv_data(
1534 StateStoreDataKey::Filter(filter_name),
1535 StateStoreDataValue::Filter(response.filter_id.clone()),
1536 )
1537 .await?)
1538 }
1539
1540 pub async fn get_filter(&self, filter_name: &str) -> StoreResult<Option<String>> {
1552 let filter = self
1553 .state_store
1554 .get_kv_data(StateStoreDataKey::Filter(filter_name))
1555 .await?
1556 .map(|d| d.into_filter().expect("State store data not a filter"));
1557
1558 Ok(filter)
1559 }
1560
/// Build the to-device requests needed to share a room key with the
/// members of the given room.
///
/// # Errors
///
/// * [`Error::InsufficientData`] if the room is unknown;
/// * [`Error::EncryptionNotEnabled`] if the room has no encryption
///   settings;
/// * any error raised while loading the member list or by the Olm machine.
///
/// # Panics
///
/// Panics if the Olm machine has not been started.
#[cfg(feature = "e2e-encryption")]
pub async fn share_room_key(&self, room_id: &RoomId) -> Result<Vec<Arc<ToDeviceRequest>>> {
    let olm_guard = self.olm_machine().await;
    let Some(olm) = olm_guard.as_ref() else {
        panic!("Olm machine wasn't started");
    };

    let room = self.get_room(room_id).ok_or(Error::InsufficientData)?;

    let history_visibility = room.history_visibility_or_default();
    let room_encryption_event =
        room.encryption_settings().ok_or(Error::EncryptionNotEnabled)?;

    // With a `Joined` history visibility the key only goes to joined
    // members; otherwise the wider `ACTIVE` membership set is used.
    let recipients_filter = match history_visibility {
        HistoryVisibility::Joined => RoomMemberships::JOIN,
        _ => RoomMemberships::ACTIVE,
    };

    let recipients = self.state_store.get_user_ids(room_id, recipients_filter).await?;

    let settings = EncryptionSettings::new(
        room_encryption_event,
        history_visibility,
        self.room_key_recipient_strategy.clone(),
    );

    let requests =
        olm.share_room_key(room_id, recipients.iter().map(Deref::deref), settings).await?;

    Ok(requests)
}
1596
/// Get the room with the given room ID from the state store.
///
/// Returns `None` if the store has no room for `room_id`.
pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
    self.state_store.room(room_id)
}
1605
/// Forget the room with the given room ID.
///
/// The room is forgotten in the state store first, then removed from the
/// event cache store.
///
/// # Errors
///
/// Returns an error if either store operation fails, or if the event
/// cache store lock cannot be acquired. The two steps are not atomic: a
/// failure in the second step leaves the state store already updated.
pub async fn forget_room(&self, room_id: &RoomId) -> Result<()> {
    // Forget the room in the state store.
    self.state_store.forget_room(room_id).await?;

    // Remove the room from the event cache store too.
    self.event_cache_store().lock().await?.remove_room(room_id).await?;

    Ok(())
}
1622
/// Get a read guard over the optional [`OlmMachine`] of this client.
///
/// The guard wraps an `Option`, so callers must handle the `None` case
/// themselves.
#[cfg(feature = "e2e-encryption")]
pub async fn olm_machine(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
    self.olm_machine.read().await
}
1628
1629 pub(crate) async fn get_push_rules(
1635 &self,
1636 account_data_processor: &AccountDataProcessor,
1637 ) -> Result<Ruleset> {
1638 if let Some(event) = account_data_processor
1639 .push_rules()
1640 .and_then(|ev| ev.deserialize_as::<PushRulesEvent>().ok())
1641 {
1642 Ok(event.content.global)
1643 } else if let Some(event) = self
1644 .state_store
1645 .get_account_data_event_static::<PushRulesEventContent>()
1646 .await?
1647 .and_then(|ev| ev.deserialize().ok())
1648 {
1649 Ok(event.content.global)
1650 } else if let Some(session_meta) = self.state_store.session_meta() {
1651 Ok(Ruleset::server_default(&session_meta.user_id))
1652 } else {
1653 Ok(Ruleset::new())
1654 }
1655 }
1656
1657 pub async fn get_push_room_context(
1665 &self,
1666 room: &Room,
1667 room_info: &RoomInfo,
1668 changes: &StateChanges,
1669 ) -> Result<Option<PushConditionRoomCtx>> {
1670 let room_id = room.room_id();
1671 let user_id = room.own_user_id();
1672
1673 let member_count = room_info.active_members_count();
1674
1675 let user_display_name = if let Some(AnySyncStateEvent::RoomMember(member)) =
1677 changes.state.get(room_id).and_then(|events| {
1678 events.get(&StateEventType::RoomMember)?.get(user_id.as_str())?.deserialize().ok()
1679 }) {
1680 member
1681 .as_original()
1682 .and_then(|ev| ev.content.displayname.clone())
1683 .unwrap_or_else(|| user_id.localpart().to_owned())
1684 } else if let Some(AnyStrippedStateEvent::RoomMember(member)) =
1685 changes.stripped_state.get(room_id).and_then(|events| {
1686 events.get(&StateEventType::RoomMember)?.get(user_id.as_str())?.deserialize().ok()
1687 })
1688 {
1689 member.content.displayname.unwrap_or_else(|| user_id.localpart().to_owned())
1690 } else if let Some(member) = Box::pin(room.get_member(user_id)).await? {
1691 member.name().to_owned()
1692 } else {
1693 trace!("Couldn't get push context because of missing own member information");
1694 return Ok(None);
1695 };
1696
1697 let power_levels = if let Some(event) = changes.state.get(room_id).and_then(|types| {
1698 types
1699 .get(&StateEventType::RoomPowerLevels)?
1700 .get("")?
1701 .deserialize_as::<RoomPowerLevelsEvent>()
1702 .ok()
1703 }) {
1704 Some(event.power_levels().into())
1705 } else if let Some(event) = changes.stripped_state.get(room_id).and_then(|types| {
1706 types
1707 .get(&StateEventType::RoomPowerLevels)?
1708 .get("")?
1709 .deserialize_as::<StrippedRoomPowerLevelsEvent>()
1710 .ok()
1711 }) {
1712 Some(event.power_levels().into())
1713 } else {
1714 self.state_store
1715 .get_state_event_static::<RoomPowerLevelsEventContent>(room_id)
1716 .await?
1717 .and_then(|e| e.deserialize().ok())
1718 .map(|event| event.power_levels().into())
1719 };
1720
1721 Ok(Some(PushConditionRoomCtx {
1722 user_id: user_id.to_owned(),
1723 room_id: room_id.to_owned(),
1724 member_count: UInt::new(member_count).unwrap_or(UInt::MAX),
1725 user_display_name,
1726 power_levels,
1727 }))
1728 }
1729
1730 pub fn update_push_room_context(
1734 &self,
1735 push_rules: &mut PushConditionRoomCtx,
1736 user_id: &UserId,
1737 room_info: &RoomInfo,
1738 changes: &StateChanges,
1739 ) {
1740 let room_id = &*room_info.room_id;
1741
1742 push_rules.member_count = UInt::new(room_info.active_members_count()).unwrap_or(UInt::MAX);
1743
1744 if let Some(AnySyncStateEvent::RoomMember(member)) =
1746 changes.state.get(room_id).and_then(|events| {
1747 events.get(&StateEventType::RoomMember)?.get(user_id.as_str())?.deserialize().ok()
1748 })
1749 {
1750 push_rules.user_display_name = member
1751 .as_original()
1752 .and_then(|ev| ev.content.displayname.clone())
1753 .unwrap_or_else(|| user_id.localpart().to_owned())
1754 }
1755
1756 if let Some(AnySyncStateEvent::RoomPowerLevels(event)) =
1757 changes.state.get(room_id).and_then(|types| {
1758 types.get(&StateEventType::RoomPowerLevels)?.get("")?.deserialize().ok()
1759 })
1760 {
1761 push_rules.power_levels = Some(event.power_levels().into());
1762 }
1763 }
1764
/// Subscribe to changes of the ignored user list.
///
/// Each value received by the subscriber is a list of user IDs in string
/// form.
pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
    self.ignore_user_list_changes.subscribe()
}
1770
1771 pub(crate) fn deserialize_state_events(
1772 raw_events: &[Raw<AnySyncStateEvent>],
1773 ) -> Vec<(Raw<AnySyncStateEvent>, AnySyncStateEvent)> {
1774 raw_events
1775 .iter()
1776 .filter_map(|raw_event| match raw_event.deserialize() {
1777 Ok(event) => Some((raw_event.clone(), event)),
1778 Err(e) => {
1779 warn!("Couldn't deserialize state event: {e}");
1780 None
1781 }
1782 })
1783 .collect()
1784 }
1785
1786 pub(crate) fn deserialize_stripped_state_events(
1787 raw_events: &[Raw<AnyStrippedStateEvent>],
1788 ) -> Vec<(Raw<AnyStrippedStateEvent>, AnyStrippedStateEvent)> {
1789 raw_events
1790 .iter()
1791 .filter_map(|raw_event| match raw_event.deserialize() {
1792 Ok(event) => Some((raw_event.clone(), event)),
1793 Err(e) => {
1794 warn!("Couldn't deserialize stripped state event: {e}");
1795 None
1796 }
1797 })
1798 .collect()
1799 }
1800
/// Create a new receiver subscribed to the broadcast channel of
/// [`RoomInfoNotableUpdate`]s.
///
/// Each call creates an independent receiver.
pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
    self.room_info_notable_update_sender.subscribe()
}
1807}
1808
1809fn handle_room_member_event_for_profiles(
1810 room_id: &RoomId,
1811 event: &SyncStateEvent<RoomMemberEventContent>,
1812 changes: &mut StateChanges,
1813) {
1814 if event.state_key() == event.sender() {
1818 changes
1819 .profiles
1820 .entry(room_id.to_owned())
1821 .or_default()
1822 .insert(event.sender().to_owned(), event.into());
1823 }
1824
1825 if *event.membership() == MembershipState::Invite {
1826 changes
1833 .profiles_to_delete
1834 .entry(room_id.to_owned())
1835 .or_default()
1836 .push(event.state_key().clone());
1837 }
1838}
1839
/// The `required_state` entries that were requested, either for all rooms
/// or for specific rooms.
///
/// Each entry is a pair of a state event type and a state key.
#[derive(Debug, Default)]
pub struct RequestedRequiredStates {
    /// Entries applying to every room that has no entry in `for_rooms`.
    default: Vec<(StateEventType, String)>,
    /// Entries applying to one particular room, taking precedence over
    /// `default` for that room.
    for_rooms: HashMap<OwnedRoomId, Vec<(StateEventType, String)>>,
}
1856
1857impl RequestedRequiredStates {
1858 pub fn new(
1863 default: Vec<(StateEventType, String)>,
1864 for_rooms: HashMap<OwnedRoomId, Vec<(StateEventType, String)>>,
1865 ) -> Self {
1866 Self { default, for_rooms }
1867 }
1868
1869 pub fn for_room(&self, room_id: &RoomId) -> &[(StateEventType, String)] {
1871 self.for_rooms.get(room_id).unwrap_or(&self.default)
1872 }
1873}
1874
1875impl From<&v5::Request> for RequestedRequiredStates {
1876 fn from(request: &v5::Request) -> Self {
1877 let mut default = BTreeSet::new();
1884
1885 for list in request.lists.values() {
1886 default.extend(BTreeSet::from_iter(list.room_details.required_state.iter().cloned()));
1887 }
1888
1889 for room_subscription in request.room_subscriptions.values() {
1890 default.extend(BTreeSet::from_iter(room_subscription.required_state.iter().cloned()));
1891 }
1892
1893 Self { default: default.into_iter().collect(), for_rooms: HashMap::new() }
1894 }
1895}
1896
1897#[cfg(test)]
1898mod tests {
1899 use std::collections::HashMap;
1900
1901 use assert_matches2::assert_let;
1902 use futures_util::FutureExt as _;
1903 use matrix_sdk_test::{
1904 async_test, event_factory::EventFactory, ruma_response_from_json, InvitedRoomBuilder,
1905 LeftRoomBuilder, StateTestEvent, StrippedStateTestEvent, SyncResponseBuilder, BOB,
1906 };
1907 use ruma::{
1908 api::client::{self as api, sync::sync_events::v5},
1909 event_id,
1910 events::{room::member::MembershipState, StateEventType},
1911 room_id,
1912 serde::Raw,
1913 user_id,
1914 };
1915 use serde_json::{json, value::to_raw_value};
1916
1917 use super::{BaseClient, RequestedRequiredStates};
1918 use crate::{
1919 store::{StateStoreExt, StoreConfig},
1920 test_utils::logged_in_base_client,
1921 RoomDisplayName, RoomState, SessionMeta,
1922 };
1923
/// `for_room` returns the room-specific entries when they exist and falls
/// back to the default entries otherwise.
#[test]
fn test_requested_required_states() {
    let room_id_0 = room_id!("!r0");
    let room_id_1 = room_id!("!r1");

    // Defaults for every room, plus a dedicated set for `room_id_0` only.
    let requested_required_states = RequestedRequiredStates::new(
        vec![(StateEventType::RoomAvatar, "".to_owned())],
        HashMap::from([(
            room_id_0.to_owned(),
            vec![
                (StateEventType::RoomMember, "foo".to_owned()),
                (StateEventType::RoomEncryption, "".to_owned()),
            ],
        )]),
    );

    // A dedicated set of state events exists for `room_id_0`.
    assert_eq!(
        requested_required_states.for_room(room_id_0),
        &[
            (StateEventType::RoomMember, "foo".to_owned()),
            (StateEventType::RoomEncryption, "".to_owned()),
        ]
    );

    // Nothing dedicated to `room_id_1`: the defaults are returned.
    assert_eq!(
        requested_required_states.for_room(room_id_1),
        &[(StateEventType::RoomAvatar, "".to_owned()),]
    );
}
1955
/// Converting a sync `v5` request yields the sorted, deduplicated union of
/// the `required_state` of all lists and room subscriptions as `default`.
#[test]
fn test_requested_required_states_from_sync_v5_request() {
    let room_id_0 = room_id!("!r0");
    let room_id_1 = room_id!("!r1");

    // The request starts empty and is grown step by step.
    let mut request = v5::Request::new();

    // An empty request yields nothing.
    {
        let requested_required_states = RequestedRequiredStates::from(&request);

        assert!(requested_required_states.default.is_empty());
        assert!(requested_required_states.for_rooms.is_empty());
    }

    // Add one list.
    request.lists.insert("foo".to_owned(), {
        let mut list = v5::request::List::default();
        list.room_details.required_state = vec![
            (StateEventType::RoomAvatar, "".to_owned()),
            (StateEventType::RoomEncryption, "".to_owned()),
        ];

        list
    });

    {
        let requested_required_states = RequestedRequiredStates::from(&request);

        assert_eq!(
            requested_required_states.default,
            &[
                (StateEventType::RoomAvatar, "".to_owned()),
                (StateEventType::RoomEncryption, "".to_owned())
            ]
        );
        assert!(requested_required_states.for_rooms.is_empty());
    }

    // Add a second list, overlapping with the first one.
    request.lists.insert("bar".to_owned(), {
        let mut list = v5::request::List::default();
        list.room_details.required_state = vec![
            (StateEventType::RoomEncryption, "".to_owned()),
            (StateEventType::RoomName, "".to_owned()),
        ];

        list
    });

    {
        let requested_required_states = RequestedRequiredStates::from(&request);

        // The duplicated `RoomEncryption` entry appears only once.
        assert_eq!(
            requested_required_states.default,
            &[
                (StateEventType::RoomAvatar, "".to_owned()),
                (StateEventType::RoomEncryption, "".to_owned()),
                (StateEventType::RoomName, "".to_owned()),
            ]
        );
        assert!(requested_required_states.for_rooms.is_empty());
    }

    // Add a room subscription.
    request.room_subscriptions.insert(room_id_0.to_owned(), {
        let mut room_subscription = v5::request::RoomSubscription::default();

        room_subscription.required_state = vec![
            (StateEventType::RoomJoinRules, "".to_owned()),
            (StateEventType::RoomEncryption, "".to_owned()),
        ];

        room_subscription
    });

    {
        let requested_required_states = RequestedRequiredStates::from(&request);

        // Subscriptions are merged into `default`, not into `for_rooms`.
        assert_eq!(
            requested_required_states.default,
            &[
                (StateEventType::RoomAvatar, "".to_owned()),
                (StateEventType::RoomEncryption, "".to_owned()),
                (StateEventType::RoomJoinRules, "".to_owned()),
                (StateEventType::RoomName, "".to_owned()),
            ]
        );
        assert!(requested_required_states.for_rooms.is_empty());
    }

    // Add a second room subscription.
    request.room_subscriptions.insert(room_id_1.to_owned(), {
        let mut room_subscription = v5::request::RoomSubscription::default();

        room_subscription.required_state = vec![
            (StateEventType::RoomName, "".to_owned()),
            (StateEventType::RoomTopic, "".to_owned()),
        ];

        room_subscription
    });

    {
        let requested_required_states = RequestedRequiredStates::from(&request);

        assert_eq!(
            requested_required_states.default,
            &[
                (StateEventType::RoomAvatar, "".to_owned()),
                (StateEventType::RoomEncryption, "".to_owned()),
                (StateEventType::RoomJoinRules, "".to_owned()),
                (StateEventType::RoomName, "".to_owned()),
                (StateEventType::RoomTopic, "".to_owned()),
            ]
        );
    }
}
2077
/// A room that was left moves to the invited state when an invite for it
/// is received afterwards.
#[async_test]
async fn test_invite_after_leaving() {
    let user_id = user_id!("@alice:example.org");
    let room_id = room_id!("!test:example.org");

    let client = logged_in_base_client(Some(user_id)).await;

    let mut sync_builder = SyncResponseBuilder::new();

    // First sync: the room shows up as left, with our own leave event in
    // the timeline.
    let response = sync_builder
        .add_left_room(
            LeftRoomBuilder::new(room_id).add_timeline_event(
                EventFactory::new()
                    .member(user_id)
                    .membership(MembershipState::Leave)
                    .display_name("Alice")
                    .event_id(event_id!("$994173582443PhrSn:example.org")),
            ),
        )
        .build_sync_response();
    client.receive_sync_response(response).await.unwrap();
    assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Left);

    // Second sync: the same room shows up as invited, with a stripped
    // invite member event targeting us.
    let response = sync_builder
        .add_invited_room(InvitedRoomBuilder::new(room_id).add_state_event(
            StrippedStateTestEvent::Custom(json!({
                "content": {
                    "displayname": "Alice",
                    "membership": "invite",
                },
                "event_id": "$143273582443PhrSn:example.org",
                "origin_server_ts": 1432735824653u64,
                "sender": "@example:example.org",
                "state_key": user_id,
                "type": "m.room.member",
            })),
        ))
        .build_sync_response();
    client.receive_sync_response(response).await.unwrap();
    assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Invited);
}
2119
/// The display name of an invited room is computed from the stripped
/// state received with the invite.
#[async_test]
async fn test_invite_displayname() {
    let user_id = user_id!("@alice:example.org");
    let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");

    let client = logged_in_base_client(Some(user_id)).await;

    // A sync response holding a single invited room. Its stripped state
    // has a joined member named "Kyra" and our own invite.
    let response = ruma_response_from_json(&json!({
        "next_batch": "asdkl;fjasdkl;fj;asdkl;f",
        "device_one_time_keys_count": {
            "signed_curve25519": 50u64
        },
        "device_unused_fallback_key_types": [
            "signed_curve25519"
        ],
        "rooms": {
            "invite": {
                "!ithpyNKDtmhneaTQja:example.org": {
                    "invite_state": {
                        "events": [
                            {
                                "content": {
                                    "creator": "@test:example.org",
                                    "room_version": "9"
                                },
                                "sender": "@test:example.org",
                                "state_key": "",
                                "type": "m.room.create"
                            },
                            {
                                "content": {
                                    "join_rule": "invite"
                                },
                                "sender": "@test:example.org",
                                "state_key": "",
                                "type": "m.room.join_rules"
                            },
                            {
                                "content": {
                                    "algorithm": "m.megolm.v1.aes-sha2"
                                },
                                "sender": "@test:example.org",
                                "state_key": "",
                                "type": "m.room.encryption"
                            },
                            {
                                "content": {
                                    "avatar_url": "mxc://example.org/dcBBDwuWEUrjfrOchvkirUST",
                                    "displayname": "Kyra",
                                    "membership": "join"
                                },
                                "sender": "@test:example.org",
                                "state_key": "@test:example.org",
                                "type": "m.room.member"
                            },
                            {
                                "content": {
                                    "avatar_url": "mxc://example.org/ABFEXSDrESxovWwEnCYdNcHT",
                                    "displayname": "alice",
                                    "is_direct": true,
                                    "membership": "invite"
                                },
                                "origin_server_ts": 1650878657984u64,
                                "sender": "@test:example.org",
                                "state_key": "@alice:example.org",
                                "type": "m.room.member",
                                "unsigned": {
                                    "age": 14u64
                                },
                                "event_id": "$fLDqltg9Puj-kWItLSFVHPGN4YkgpYQf2qImPzdmgrE"
                            }
                        ]
                    }
                }
            }
        }
    }));

    client.receive_sync_response(response).await.unwrap();

    // The room is invited, and named after the other member.
    let room = client.get_room(room_id).expect("Room not found");
    assert_eq!(room.state(), RoomState::Invited);
    assert_eq!(
        room.compute_display_name().await.expect("fetching display name failed"),
        RoomDisplayName::Calculated("Kyra".to_owned())
    );
}
2207
/// Asking to decrypt the latest events of a room that has no latest
/// encrypted events changes nothing.
#[cfg(feature = "e2e-encryption")]
#[async_test]
async fn test_when_there_are_no_latest_encrypted_events_decrypting_them_does_nothing() {
    use std::collections::BTreeMap;

    use matrix_sdk_test::event_factory::EventFactory;
    use ruma::{event_id, events::room::member::MembershipState};

    use crate::{rooms::normal::RoomInfoNotableUpdateReasons, StateChanges};

    // Given a joined room whose timeline only has our own join event
    let user_id = user_id!("@u:u.to");
    let room_id = room_id!("!r:u.to");
    let client = logged_in_base_client(Some(user_id)).await;

    let mut sync_builder = SyncResponseBuilder::new();

    let response = sync_builder
        .add_joined_room(
            matrix_sdk_test::JoinedRoomBuilder::new(room_id).add_timeline_event(
                EventFactory::new()
                    .member(user_id)
                    .display_name("Alice")
                    .membership(MembershipState::Join)
                    .event_id(event_id!("$1")),
            ),
        )
        .build_sync_response();
    client.receive_sync_response(response).await.unwrap();

    let room = client.get_room(room_id).expect("Just-created room not found!");

    // Sanity: it has no latest encrypted events and no latest event
    assert!(room.latest_encrypted_events().is_empty());
    assert!(room.latest_event().is_none());

    // When the latest events are decrypted
    let mut changes = StateChanges::default();
    let mut room_info_notable_updates = BTreeMap::new();
    client.decrypt_latest_events(&room, &mut changes, &mut room_info_notable_updates).await;

    // Then nothing changed, and no `LATEST_EVENT` update was recorded
    assert!(room.latest_encrypted_events().is_empty());
    assert!(room.latest_event().is_none());
    assert!(changes.room_infos.is_empty());
    assert!(!room_info_notable_updates
        .get(room_id)
        .copied()
        .unwrap_or_default()
        .contains(RoomInfoNotableUpdateReasons::LATEST_EVENT));
}
2259
/// An invalid state event in a sync response does not prevent the valid
/// state events next to it from being stored.
#[async_test]
async fn test_deserialization_failure() {
    let user_id = user_id!("@alice:example.org");
    let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");

    let client =
        BaseClient::new(StoreConfig::new("cross-process-store-locks-holder-name".to_owned()));
    client
        .activate(
            SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
            #[cfg(feature = "e2e-encryption")]
            None,
        )
        .await
        .unwrap();

    // The first state event is garbage, the second one is a valid
    // `m.room.name` event.
    let response = ruma_response_from_json(&json!({
        "next_batch": "asdkl;fjasdkl;fj;asdkl;f",
        "rooms": {
            "join": {
                "!ithpyNKDtmhneaTQja:example.org": {
                    "state": {
                        "events": [
                            {
                                "invalid": "invalid",
                            },
                            {
                                "content": {
                                    "name": "The room name"
                                },
                                "event_id": "$143273582443PhrSn:example.org",
                                "origin_server_ts": 1432735824653u64,
                                "room_id": "!jEsUZKDJdhlrceRyVU:example.org",
                                "sender": "@example:example.org",
                                "state_key": "",
                                "type": "m.room.name",
                                "unsigned": {
                                    "age": 1234
                                }
                            },
                        ]
                    }
                }
            }
        }
    }));

    // Processing succeeds, and the room name event can be read back from
    // the state store.
    client.receive_sync_response(response).await.unwrap();
    client
        .state_store()
        .get_state_event_static::<ruma::events::room::name::RoomNameEventContent>(room_id)
        .await
        .expect("Failed to fetch state event")
        .expect("State event not found")
        .deserialize()
        .expect("Failed to deserialize state event");
}
2317
/// A member that is only invited is kept when processing the full member
/// list of a room.
#[async_test]
async fn test_invited_members_arent_ignored() {
    let user_id = user_id!("@alice:example.org");
    let inviter_user_id = user_id!("@bob:example.org");
    let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");

    let client =
        BaseClient::new(StoreConfig::new("cross-process-store-locks-holder-name".to_owned()));
    client
        .activate(
            SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
            #[cfg(feature = "e2e-encryption")]
            None,
        )
        .await
        .unwrap();

    // Preamble: let the client know about the room.
    let mut sync_builder = SyncResponseBuilder::new();
    let response = sync_builder
        .add_joined_room(matrix_sdk_test::JoinedRoomBuilder::new(room_id))
        .build_sync_response();
    client.receive_sync_response(response).await.unwrap();

    // When processing a members response that only holds an invited
    // member,
    let request = api::membership::get_member_events::v3::Request::new(room_id.to_owned());

    let raw_member_event = json!({
        "content": {
            "avatar_url": "mxc://localhost/fewjilfewjil42",
            "displayname": "Invited Alice",
            "membership": "invite"
        },
        "event_id": "$151800140517rfvjc:localhost",
        "origin_server_ts": 151800140,
        "room_id": room_id,
        "sender": inviter_user_id,
        "state_key": user_id,
        "type": "m.room.member",
        "unsigned": {
            "age": 13374242,
        }
    });
    let response = api::membership::get_member_events::v3::Response::new(vec![Raw::from_json(
        to_raw_value(&raw_member_event).unwrap(),
    )]);

    client.receive_all_members(room_id, &request, &response).await.unwrap();

    let room = client.get_room(room_id).unwrap();

    // Then the room knows the invited member,
    let member = room.get_member(user_id).await.expect("ok").expect("exists");

    // with the profile data carried by the invite event.
    assert_eq!(member.user_id(), user_id);
    assert_eq!(member.display_name().unwrap(), "Invited Alice");
    assert_eq!(member.avatar_url().unwrap().to_string(), "mxc://localhost/fewjilfewjil42");
}
2378
/// A member that left (with an empty profile) and is then invited again
/// gets the display name and avatar carried by the invite event.
#[async_test]
async fn test_reinvited_members_get_a_display_name() {
    let user_id = user_id!("@alice:example.org");
    let inviter_user_id = user_id!("@bob:example.org");
    let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");

    let client =
        BaseClient::new(StoreConfig::new("cross-process-store-locks-holder-name".to_owned()));
    client
        .activate(
            SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
            #[cfg(feature = "e2e-encryption")]
            None,
        )
        .await
        .unwrap();

    // Preamble: let the client know about the room, with a self-sent leave
    // event whose display name and avatar are null.
    let mut sync_builder = SyncResponseBuilder::new();
    let response = sync_builder
        .add_joined_room(matrix_sdk_test::JoinedRoomBuilder::new(room_id).add_state_event(
            StateTestEvent::Custom(json!({
                "content": {
                    "avatar_url": null,
                    "displayname": null,
                    "membership": "leave"
                },
                "event_id": "$151803140217rkvjc:localhost",
                "origin_server_ts": 151800139,
                "room_id": room_id,
                "sender": user_id,
                "state_key": user_id,
                "type": "m.room.member",
            })),
        ))
        .build_sync_response();
    client.receive_sync_response(response).await.unwrap();

    // When processing a members response that holds an invite for that
    // same user,
    let request = api::membership::get_member_events::v3::Request::new(room_id.to_owned());

    let raw_member_event = json!({
        "content": {
            "avatar_url": "mxc://localhost/fewjilfewjil42",
            "displayname": "Invited Alice",
            "membership": "invite"
        },
        "event_id": "$151800140517rfvjc:localhost",
        "origin_server_ts": 151800140,
        "room_id": room_id,
        "sender": inviter_user_id,
        "state_key": user_id,
        "type": "m.room.member",
        "unsigned": {
            "age": 13374242,
        }
    });
    let response = api::membership::get_member_events::v3::Response::new(vec![Raw::from_json(
        to_raw_value(&raw_member_event).unwrap(),
    )]);

    client.receive_all_members(room_id, &request, &response).await.unwrap();

    let room = client.get_room(room_id).unwrap();

    // Then the room knows the invited member,
    let member = room.get_member(user_id).await.expect("ok").expect("exists");

    // with the profile data of the invite rather than the empty one of the
    // earlier leave event.
    assert_eq!(member.user_id(), user_id);
    assert_eq!(member.display_name().unwrap(), "Invited Alice");
    assert_eq!(member.avatar_url().unwrap().to_string(), "mxc://localhost/fewjilfewjil42");
}
2452
/// Subscribers of the ignored user list are only notified when the list
/// actually changes.
#[async_test]
async fn test_ignored_user_list_changes() {
    let user_id = user_id!("@alice:example.org");
    let client =
        BaseClient::new(StoreConfig::new("cross-process-store-locks-holder-name".to_owned()));
    client
        .activate(
            SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
            #[cfg(feature = "e2e-encryption")]
            None,
        )
        .await
        .unwrap();

    // Nothing is pending right after subscribing.
    let mut subscriber = client.subscribe_to_ignore_user_list_changes();
    assert!(subscriber.next().now_or_never().is_none());

    // Receive a sync response that ignores Bob.
    let mut sync_builder = SyncResponseBuilder::new();
    let response = sync_builder
        .add_global_account_data_event(matrix_sdk_test::GlobalAccountDataTestEvent::Custom(
            json!({
                "content": {
                    "ignored_users": {
                        *BOB: {}
                    }
                },
                "type": "m.ignored_user_list",
            }),
        ))
        .build_sync_response();
    client.receive_sync_response(response).await.unwrap();

    // The subscriber is told about Bob.
    assert_let!(Some(ignored) = subscriber.next().await);
    assert_eq!(ignored, [BOB.to_string()]);

    // Receive the same ignored list once more.
    let response = sync_builder
        .add_global_account_data_event(matrix_sdk_test::GlobalAccountDataTestEvent::Custom(
            json!({
                "content": {
                    "ignored_users": {
                        *BOB: {}
                    }
                },
                "type": "m.ignored_user_list",
            }),
        ))
        .build_sync_response();
    client.receive_sync_response(response).await.unwrap();

    // The list did not change, so there is no notification.
    assert!(subscriber.next().now_or_never().is_none());

    // Now remove Bob from the ignored list.
    let response = sync_builder
        .add_global_account_data_event(matrix_sdk_test::GlobalAccountDataTestEvent::Custom(
            json!({
                "content": {
                    "ignored_users": {}
                },
                "type": "m.ignored_user_list",
            }),
        ))
        .build_sync_response();
    client.receive_sync_response(response).await.unwrap();

    // The subscriber receives the now-empty list.
    assert_let!(Some(ignored) = subscriber.next().await);
    assert!(ignored.is_empty());
}
2522}